from typing import Any

from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters

from app.orchestrator import WiseClawOrchestrator


class TelegramBotService:
    """Long-polling Telegram bot that relays text messages to WiseClaw.

    Incoming non-command text messages are handed to a
    ``WiseClawOrchestrator`` and the returned string is sent back as the
    reply. The ``/start`` command answers with a fixed greeting.
    """

    def __init__(self, token: str, orchestrator_factory: Any) -> None:
        """Store the configuration; nothing is started until ``start()``.

        Args:
            token: Telegram bot token. A falsy token turns ``start()``
                into a no-op.
            orchestrator_factory: Zero-argument callable whose result is
                used as a context manager yielding the session passed to
                ``WiseClawOrchestrator``.
        """
        self.token = token
        self.orchestrator_factory = orchestrator_factory
        # Set while the bot is (being) started; ``None`` when stopped.
        self.application: Application | None = None

    async def process_message(self, telegram_user_id: int, text: str) -> str:
        """Run one text message through the orchestrator and return its reply.

        A fresh session is opened from ``orchestrator_factory`` for each
        message and closed when the context manager exits.

        Args:
            telegram_user_id: Numeric Telegram id of the sender.
            text: Message text to handle.

        Returns:
            The value returned by ``handle_text_message``.
        """
        with self.orchestrator_factory() as session:
            orchestrator = WiseClawOrchestrator(session)
            # NOTE(review): this call is not awaited, so it presumably runs
            # synchronously on the event loop; if it can block for long,
            # consider off-loading it to a worker thread - confirm first
            # that the session may be used that way.
            return orchestrator.handle_text_message(
                telegram_user_id=telegram_user_id,
                text=text,
            )

    async def start(self) -> None:
        """Build the Telegram application and begin polling for updates.

        Does nothing when no token is configured or when the service is
        already started. If any start-up step fails, whatever was started
        is torn down again and ``self.application`` is reset, so that a
        later ``start()`` can retry from a clean state; the original
        exception still propagates.
        """
        if not self.token:
            return
        if self.application is not None:
            # Already started: a second poller on the same token would
            # conflict with the first and orphan its Application.
            return

        application = Application.builder().token(self.token).build()
        application.add_handler(CommandHandler("start", self._on_start))
        application.add_handler(
            MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_text)
        )
        self.application = application

        started = False
        try:
            await application.initialize()
            await application.start()
            if application.updater is not None:
                await application.updater.start_polling(drop_pending_updates=True)
            started = True
        finally:
            if not started:
                # Undo the partial start-up before the error propagates.
                await self.stop()

    async def stop(self) -> None:
        """Stop polling and shut the Telegram application down.

        Safe to call when the service was never started or only partially
        started: each component is stopped only if it reports itself as
        running, and ``shutdown()`` is always attempted.
        """
        application = self.application
        if application is None:
            return
        # Drop the reference first so the service is restartable even if
        # one of the teardown steps below raises.
        self.application = None

        updater = application.updater
        try:
            if updater is not None and updater.running:
                await updater.stop()
            if application.running:
                await application.stop()
        finally:
            await application.shutdown()

    async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle ``/start`` by replying with a fixed greeting."""
        del context  # Unused; required by the handler callback signature.
        if update.message is None or update.effective_user is None:
            return
        await update.message.reply_text(
            "WiseClaw is online. If your Telegram user is whitelisted, send a message to start."
        )

    async def _on_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle a plain text message: process it and send the reply back."""
        del context  # Unused; required by the handler callback signature.
        if (
            update.message is None
            or update.effective_user is None
            or update.message.text is None
        ):
            return
        reply = await self.process_message(update.effective_user.id, update.message.text)
        # NOTE(review): Telegram's sendMessage accepts 1-4096 characters; an
        # empty or longer reply would be rejected here - confirm whether the
        # orchestrator can produce such replies.
        await update.message.reply_text(reply)