Features: - Multi-platform bot (Slack, Telegram) - Memory system with SQLite FTS - Tool use capabilities (file ops, commands) - Scheduled tasks system - Dynamic model switching (/sonnet, /haiku) - Prompt caching for cost optimization Optimizations: - Default to Haiku 4.5 (12x cheaper) - Reduced context: 3 messages, 2 memory results - Optimized SOUL.md (48% smaller) - Automatic caching when using Sonnet (90% savings) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
368 lines
11 KiB
Python
368 lines
11 KiB
Python
"""
Telegram adapter for ajarbot.

Uses python-telegram-bot library for async Telegram Bot API integration.
"""
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from telegram import Bot, Update
|
|
from telegram.error import TelegramError
|
|
from telegram.ext import (
|
|
Application,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
)
|
|
|
|
from adapters.base import (
|
|
AdapterCapabilities,
|
|
AdapterConfig,
|
|
BaseAdapter,
|
|
InboundMessage,
|
|
MessageType,
|
|
OutboundMessage,
|
|
)
|
|
|
|
|
|
class TelegramAdapter(BaseAdapter):
    """
    Telegram adapter using python-telegram-bot.

    Configuration required:
    - bot_token: Telegram Bot API Token (from @BotFather)

    Optional settings:
    - allowed_users: List of user IDs allowed to interact (for privacy).
      When the list is missing or empty, every user is allowed.
    - parse_mode: "HTML" or "Markdown" (default: "Markdown")
    """

    def __init__(self, config: AdapterConfig) -> None:
        """Store the config; the bot itself is only created in ``start()``."""
        super().__init__(config)
        self.application: Optional[Application] = None
        self.bot: Optional[Bot] = None

    @property
    def platform_name(self) -> str:
        """Platform identifier used for this adapter."""
        return "telegram"

    @property
    def capabilities(self) -> AdapterCapabilities:
        """Feature flags and limits advertised by this adapter."""
        return AdapterCapabilities(
            supports_threads=False,
            supports_reactions=True,
            supports_media=True,
            supports_files=True,
            supports_markdown=True,
            max_message_length=4096,
            chunking_strategy="markdown",
        )

    def validate_config(self) -> bool:
        """
        Validate Telegram configuration.

        Returns:
            True when credentials contain a string ``bot_token`` longer
            than 20 characters, False otherwise.
        """
        if not self.config.credentials:
            return False

        bot_token = self.config.credentials.get("bot_token", "")
        # Guard the type first so a non-string token (e.g. an int from a
        # config file) yields False instead of raising in len().
        return isinstance(bot_token, str) and len(bot_token) > 20

    async def start(self) -> None:
        """
        Start the Telegram bot and begin polling for updates.

        Raises:
            ValueError: If the configuration fails ``validate_config()``.
        """
        if not self.validate_config():
            raise ValueError(
                "Invalid Telegram configuration. Need bot_token"
            )

        bot_token = self.config.credentials["bot_token"]

        self.application = (
            Application.builder().token(bot_token).build()
        )
        self.bot = self.application.bot

        self._register_handlers()

        print("[Telegram] Starting bot...")
        await self.application.initialize()
        await self.application.start()
        await self.application.updater.start_polling(
            allowed_updates=Update.ALL_TYPES,
            drop_pending_updates=True,
        )

        self.is_running = True

        me = await self.bot.get_me()
        print(
            f"[Telegram] Bot started: @{me.username} ({me.first_name})"
        )

    async def stop(self) -> None:
        """
        Stop the Telegram bot.

        Safe to call after a partially failed ``start()``: the updater and
        the application are only stopped when they report ``running``, and
        ``is_running`` is reset even if shutdown raises.
        """
        if not self.application:
            self.is_running = False
            return

        print("[Telegram] Stopping bot...")
        try:
            updater = self.application.updater
            # Stopping an updater/application that is not running raises
            # RuntimeError, so check the state before each call.
            if updater and updater.running:
                await updater.stop()
            if self.application.running:
                await self.application.stop()
            await self.application.shutdown()
        finally:
            self.is_running = False
        print("[Telegram] Bot stopped")

    def _register_handlers(self) -> None:
        """
        Register Telegram message handlers on ``self.application``.

        Raises:
            RuntimeError: If called before the application was built.
        """
        if self.application is None:
            raise RuntimeError(
                "Telegram application must be built before registering "
                "handlers"
            )

        async def handle_message(
            update: Update, context: ContextTypes.DEFAULT_TYPE
        ) -> None:
            """Handle incoming text messages."""
            message = update.message
            if not message or not message.text:
                return

            user = update.effective_user
            if user is None:
                # No sender to authorize or attribute the message to.
                return

            if not self._is_user_allowed(user.id):
                await message.reply_text(
                    "Sorry, you are not authorized to use this bot."
                )
                return

            reply_to_id = None
            if message.reply_to_message:
                reply_to_id = str(message.reply_to_message.message_id)

            # Sanitize username: replace spaces/special chars with underscores
            raw_username = user.username or user.first_name or str(user.id)
            sanitized_username = "".join(
                c if c.isalnum() or c in "-_" else "_" for c in raw_username
            )

            inbound_msg = InboundMessage(
                platform="telegram",
                user_id=str(user.id),
                username=sanitized_username,
                text=message.text,
                channel_id=str(message.chat.id),
                thread_id=None,
                reply_to_id=reply_to_id,
                message_type=MessageType.TEXT,
                metadata={
                    "message_id": message.message_id,
                    "chat_type": message.chat.type,
                    "date": (
                        message.date.isoformat()
                        if message.date
                        else None
                    ),
                    "user_full_name": user.full_name,
                    "is_bot": user.is_bot,
                },
                raw=update,
            )

            self._dispatch_message(inbound_msg)

        async def handle_start(
            update: Update, context: ContextTypes.DEFAULT_TYPE
        ) -> None:
            """Handle /start command."""
            # effective_message also covers edited commands, for which
            # update.message is None.
            target = update.effective_message
            if target is None:
                return
            await target.reply_text(
                "Hello! I'm an AI assistant bot.\n\n"
                "Just send me a message and I'll respond!"
            )

        async def handle_help(
            update: Update, context: ContextTypes.DEFAULT_TYPE
        ) -> None:
            """Handle /help command."""
            target = update.effective_message
            if target is None:
                return
            await target.reply_text(
                "*Ajarbot Help*\n\n"
                "I'm an AI assistant. You can:\n"
                "- Send me messages and I'll respond\n"
                "- Have natural conversations\n"
                "- Ask me questions\n\n"
                "Commands:\n"
                "/start - Start the bot\n"
                "/help - Show this help message",
                parse_mode="Markdown",
            )

        self.application.add_handler(
            CommandHandler("start", handle_start)
        )
        self.application.add_handler(
            CommandHandler("help", handle_help)
        )
        self.application.add_handler(
            MessageHandler(
                filters.TEXT & ~filters.COMMAND, handle_message
            )
        )

    async def send_message(
        self, message: OutboundMessage
    ) -> Dict[str, Any]:
        """
        Send a message to Telegram, split into chunks when too long.

        Returns:
            On success: ``success=True``, the first chunk's ``message_id``,
            ``chunks_sent`` and per-chunk ``results``.
            On failure: ``success=False``, an ``error`` string and the
            number of chunks delivered before the failure (``chunks_sent``).
        """
        if not self.bot:
            return {"success": False, "error": "Bot not started"}

        # Parse IDs up front: a malformed ID is a caller error, not a
        # TelegramError, and must not escape as an uncaught exception.
        try:
            chat_id = int(message.channel_id)
            reply_to_id = (
                int(message.reply_to_id)
                if message.reply_to_id
                else None
            )
        except (TypeError, ValueError) as e:
            print(f"[Telegram] Invalid chat or message ID: {e}")
            return {
                "success": False,
                "error": f"Invalid chat or message ID: {e}",
            }

        parse_mode = "Markdown"
        if self.config.settings:
            parse_mode = self.config.settings.get(
                "parse_mode", "Markdown"
            )

        chunks = self.chunk_text(message.text)
        results: List[Dict[str, Any]] = []

        try:
            for chunk in chunks:
                sent_message = await self.bot.send_message(
                    chat_id=chat_id,
                    text=chunk,
                    parse_mode=parse_mode,
                    reply_to_message_id=reply_to_id,
                )

                results.append({
                    "message_id": sent_message.message_id,
                    "chat_id": sent_message.chat_id,
                    "date": (
                        sent_message.date.isoformat()
                        if sent_message.date
                        else None
                    ),
                })
        except TelegramError as e:
            print(f"[Telegram] Error sending message: {e}")
            return {
                "success": False,
                "error": str(e),
                "chunks_sent": len(results),
            }

        return {
            "success": True,
            "message_id": results[0]["message_id"],
            "chunks_sent": len(chunks),
            "results": results,
        }

    async def send_reaction(
        self, channel_id: str, message_id: str, emoji: str
    ) -> bool:
        """
        Send a reaction to a message.

        Returns:
            True when the reaction was set; False when the bot is not
            started, an ID is not numeric, or Telegram reports an error.
        """
        if not self.bot:
            return False

        try:
            await self.bot.set_message_reaction(
                chat_id=int(channel_id),
                message_id=int(message_id),
                reaction=emoji,
            )
            return True
        except (TelegramError, TypeError, ValueError) as e:
            print(f"[Telegram] Error adding reaction: {e}")
            return False

    async def send_typing_indicator(self, channel_id: str) -> None:
        """Show typing indicator; failures are logged, never raised."""
        if not self.bot:
            return

        try:
            await self.bot.send_chat_action(
                chat_id=int(channel_id), action="typing"
            )
        except (TelegramError, TypeError, ValueError) as e:
            print(f"[Telegram] Error sending typing indicator: {e}")

    async def health_check(self) -> Dict[str, Any]:
        """
        Perform health check.

        Extends the base adapter's health dict with bot identity fields
        from ``get_me()``, or with ``healthy=False`` and the error text
        when that call fails.
        """
        base_health = await super().health_check()

        if not self.bot:
            return {**base_health, "details": "Bot not initialized"}

        try:
            me = await self.bot.get_me()
            return {
                **base_health,
                "bot_id": me.id,
                "username": me.username,
                "first_name": me.first_name,
                "can_join_groups": me.can_join_groups,
                "can_read_all_group_messages": (
                    me.can_read_all_group_messages
                ),
                "connected": True,
            }
        except TelegramError as e:
            return {
                **base_health,
                "healthy": False,
                "error": str(e),
            }

    def _is_user_allowed(self, user_id: int) -> bool:
        """
        Check if user is allowed to interact with the bot.

        With no settings, or a missing/empty ``allowed_users`` list, every
        user is allowed. IDs are compared as strings so an allowlist
        written with quoted IDs matches the same users as one with ints.
        """
        if not self.config.settings:
            return True

        allowed_users = self.config.settings.get("allowed_users", [])
        if not allowed_users:
            return True

        return str(user_id) in {str(entry) for entry in allowed_users}

    @staticmethod
    def _split_oversized(text: str, limit: int) -> List[str]:
        """
        Split ``text`` into non-empty pieces of at most ``limit`` characters.

        Prefers to cut at the last newline inside the window, then at the
        last space, and falls back to a hard cut. The separator character
        at a newline/space cut is dropped.
        """
        limit = max(1, limit)
        pieces: List[str] = []
        remaining = text

        while len(remaining) > limit:
            # Search from index 1 so a cut never yields an empty piece.
            cut = remaining.rfind("\n", 1, limit + 1)
            if cut == -1:
                cut = remaining.rfind(" ", 1, limit + 1)

            if cut == -1:
                pieces.append(remaining[:limit])
                remaining = remaining[limit:]
            else:
                pieces.append(remaining[:cut])
                remaining = remaining[cut + 1:]

        if remaining:
            pieces.append(remaining)
        return pieces

    def chunk_text(self, text: str) -> List[str]:
        """
        Override chunk_text for Telegram-specific markdown handling.

        Text that fits in one message is returned unchanged. Longer text
        is split on ``` fences and blank lines, and the resulting units
        are packed greedily so fenced code blocks stay intact where they
        fit. A paragraph or code block that alone exceeds the limit is
        split further (code pieces are re-wrapped in their own fences),
        so no returned chunk is longer than the platform limit unless the
        text is whitespace only. An unterminated fence is closed.

        Returns:
            A non-empty list of message chunks.
        """
        max_len = self.capabilities.max_message_length

        if len(text) <= max_len:
            return [text]

        chunks: List[str] = []
        current = ""

        def flush(buffer: str) -> None:
            trimmed = buffer.strip()
            if trimmed:
                chunks.append(trimmed)

        # Splitting on ``` yields alternating text / code segments:
        # even indexes are regular text, odd indexes are code bodies.
        for index, part in enumerate(text.split("```")):
            if index % 2:
                units = [(True, f"```{part}```")]
            else:
                paragraphs = part.split("\n\n")
                # Keep the separator with every paragraph but the last so
                # that concatenating the units reproduces the segment.
                units = [(False, para + "\n\n") for para in paragraphs[:-1]]
                units.append((False, paragraphs[-1]))

            for is_code, unit in units:
                # Trailing whitespace is stripped on flush, so only the
                # visible length counts against the limit.
                visible = unit.rstrip()

                if len(visible) > max_len:
                    flush(current)
                    current = ""
                    if is_code:
                        # Reserve room for "```\n" + "```" around each piece.
                        bodies = self._split_oversized(
                            unit[3:-3], max_len - 7
                        )
                        # Continuation pieces start on a fresh line so their
                        # first code line is not read as a language tag.
                        pieces = [
                            ("```" if i == 0 else "```\n") + body + "```"
                            for i, body in enumerate(bodies)
                            if body.strip()
                        ]
                    else:
                        pieces = [
                            piece.strip()
                            for piece in self._split_oversized(
                                visible, max_len
                            )
                            if piece.strip()
                        ]
                    if pieces:
                        chunks.extend(pieces[:-1])
                        # Keep the tail open so following text can join it.
                        current = pieces[-1] + unit[len(visible):]
                elif len(current) + len(visible) > max_len:
                    flush(current)
                    current = unit
                else:
                    current += unit

        flush(current)

        return chunks if chunks else [text]
|