Files

180 lines
8.7 KiB
Python

import asyncio
import json
from contextlib import suppress
from typing import Any
from telegram import BotCommand, InputMediaPhoto, Update
from telegram.constants import ChatAction
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from app.orchestrator import WiseClawOrchestrator
class TelegramBotService:
MAX_MESSAGE_LEN = 3500
def __init__(self, token: str, orchestrator_factory: Any) -> None:
self.token = token
self.orchestrator_factory = orchestrator_factory
self.application: Application | None = None
async def process_message(self, telegram_user_id: int, text: str) -> str:
with self.orchestrator_factory() as session:
orchestrator = WiseClawOrchestrator(session)
return await orchestrator.handle_text_message(telegram_user_id=telegram_user_id, text=text)
async def process_message_payload(self, telegram_user_id: int, text: str) -> dict[str, object]:
with self.orchestrator_factory() as session:
orchestrator = WiseClawOrchestrator(session)
payload = await orchestrator.handle_message_payload(telegram_user_id=telegram_user_id, text=text)
text_value = str(payload.get("text", ""))
if text_value.startswith("__WC_MEDIA__"):
try:
decoded = json.loads(text_value[len("__WC_MEDIA__") :])
except json.JSONDecodeError:
return {"text": text_value, "media": []}
return {
"text": str(decoded.get("text", "")),
"media": decoded.get("media", []) if isinstance(decoded.get("media"), list) else [],
}
return payload
async def send_message(self, chat_id: int, text: str) -> None:
if self.application is None:
return
for chunk in self._chunk_message(text):
await self.application.bot.send_message(chat_id=chat_id, text=chunk)
async def send_media(self, chat_id: int, media: list[dict[str, str]]) -> None:
if self.application is None:
return
clean_media = [item for item in media[:3] if item.get("url")]
if not clean_media:
return
if len(clean_media) == 1:
item = clean_media[0]
try:
await self.application.bot.send_photo(chat_id=chat_id, photo=item["url"], caption=item.get("caption", "")[:1024])
except Exception:
return
return
media_group = []
for item in clean_media:
media_group.append(InputMediaPhoto(media=item["url"], caption=item.get("caption", "")[:1024]))
try:
await self.application.bot.send_media_group(chat_id=chat_id, media=media_group)
except Exception:
for item in clean_media:
try:
await self.application.bot.send_photo(chat_id=chat_id, photo=item["url"], caption=item.get("caption", "")[:1024])
except Exception:
continue
async def start(self) -> None:
if not self.token:
return
self.application = Application.builder().token(self.token).build()
self.application.add_handler(CommandHandler("start", self._on_start))
self.application.add_handler(CommandHandler("tanisalim", self._on_command_passthrough))
self.application.add_handler(CommandHandler("profilim", self._on_command_passthrough))
self.application.add_handler(CommandHandler("tercihlerim", self._on_command_passthrough))
self.application.add_handler(CommandHandler("tanisalim_sifirla", self._on_command_passthrough))
self.application.add_handler(CommandHandler("otomasyon_ekle", self._on_command_passthrough))
self.application.add_handler(CommandHandler("otomasyonlar", self._on_command_passthrough))
self.application.add_handler(CommandHandler("otomasyon_durdur", self._on_command_passthrough))
self.application.add_handler(CommandHandler("otomasyon_baslat", self._on_command_passthrough))
self.application.add_handler(CommandHandler("otomasyon_sil", self._on_command_passthrough))
self.application.add_handler(CommandHandler("notlarima_ekle", self._on_command_passthrough))
self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_text))
await self.application.initialize()
await self.application.bot.set_my_commands(self._telegram_commands())
await self.application.start()
await self.application.updater.start_polling(drop_pending_updates=True)
async def stop(self) -> None:
if self.application is None:
return
await self.application.updater.stop()
await self.application.stop()
await self.application.shutdown()
self.application = None
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
del context
if update.message is None or update.effective_user is None:
return
await update.message.reply_text(
"WiseClaw is online. If your Telegram user is whitelisted, send a message to start."
)
async def _on_text(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message is None or update.effective_user is None or update.message.text is None:
return
typing_task = asyncio.create_task(self._send_typing(update.effective_chat.id, context))
try:
reply = await self.process_message_payload(update.effective_user.id, update.message.text)
finally:
typing_task.cancel()
with suppress(asyncio.CancelledError):
await typing_task
media = reply.get("media", []) if isinstance(reply, dict) else []
if isinstance(media, list) and media:
await self.send_media(
update.effective_chat.id,
[item for item in media if isinstance(item, dict)],
)
text_reply = str(reply.get("text", "")) if isinstance(reply, dict) else str(reply)
for chunk in self._chunk_message(text_reply):
await update.message.reply_text(chunk)
async def _on_command_passthrough(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
del context
if update.message is None or update.effective_user is None or update.message.text is None:
return
reply = await self.process_message_payload(update.effective_user.id, update.message.text)
media = reply.get("media", []) if isinstance(reply, dict) else []
if isinstance(media, list) and media:
await self.send_media(
update.effective_chat.id,
[item for item in media if isinstance(item, dict)],
)
text_reply = str(reply.get("text", "")) if isinstance(reply, dict) else str(reply)
for chunk in self._chunk_message(text_reply):
await update.message.reply_text(chunk)
async def _send_typing(self, chat_id: int, context: ContextTypes.DEFAULT_TYPE) -> None:
while True:
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
await asyncio.sleep(4)
def _chunk_message(self, text: str) -> list[str]:
if len(text) <= self.MAX_MESSAGE_LEN:
return [text]
chunks: list[str] = []
remaining = text
while len(remaining) > self.MAX_MESSAGE_LEN:
split_at = remaining.rfind("\n", 0, self.MAX_MESSAGE_LEN)
if split_at <= 0:
split_at = self.MAX_MESSAGE_LEN
chunks.append(remaining[:split_at].strip())
remaining = remaining[split_at:].strip()
if remaining:
chunks.append(remaining)
return chunks
def _telegram_commands(self) -> list[BotCommand]:
return [
BotCommand("start", "WiseClaw'i baslat (wc)"),
BotCommand("tanisalim", "12 soruluk tanisma akisini baslat (wc)"),
BotCommand("profilim", "Kayitli profil ozetimi goster (wc)"),
BotCommand("tercihlerim", "Kayitli iletisim tercihlerini goster (wc)"),
BotCommand("tanisalim_sifirla", "Tanisma profilini sifirla (wc)"),
BotCommand("otomasyon_ekle", "Yeni otomasyon wizard'ini baslat (wc)"),
BotCommand("otomasyonlar", "Otomasyon listesini goster (wc)"),
BotCommand("otomasyon_durdur", "Bir otomasyonu durdur: /otomasyon_durdur <id> (wc)"),
BotCommand("otomasyon_baslat", "Bir otomasyonu yeniden baslat: /otomasyon_baslat <id> (wc)"),
BotCommand("otomasyon_sil", "Bir otomasyonu sil: /otomasyon_sil <id> (wc)"),
BotCommand("notlarima_ekle", "Ikinci beyne yeni not ekle (wc)"),
]