"""
Adapter runtime system for ajarbot.

Connects messaging platform adapters to the Agent instance.
"""

import asyncio
import traceback
from typing import Any, Callable, Dict, List, Optional

from adapters.base import (
    AdapterRegistry,
    BaseAdapter,
    InboundMessage,
    OutboundMessage,
)
from agent import Agent


class AdapterRuntime:
    """
    Runtime system that connects adapters to the Agent.

    Acts as the bridge between messaging platforms (Slack, Telegram, etc.)
    and the Agent (memory + LLM).

    Adapters hand inbound messages to ``_on_message_received``, which
    enqueues them. A background task started by ``start()`` drains the
    queue one message at a time, runs preprocessors, calls ``agent.chat``
    in a worker thread, runs postprocessors and sends the reply through
    the adapter registered for the message's platform.
    """

    def __init__(
        self,
        agent: Agent,
        registry: Optional[AdapterRegistry] = None,
    ) -> None:
        """
        Create a runtime bound to ``agent``.

        Args:
            agent: Agent whose ``chat`` method produces the replies.
            registry: Adapter registry to use. A new ``AdapterRegistry``
                is created only when this is ``None``.
        """
        self.agent = agent
        # Compare against None explicitly so that a caller-supplied
        # registry is never discarded just because it evaluates falsy.
        self.registry = (
            registry if registry is not None else AdapterRegistry()
        )
        self.message_loop_task: Optional[asyncio.Task] = None
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._is_running = False

        # Event loop that owns the processing task; captured by start()
        # so adapters can enqueue messages onto it from any thread/loop.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # User ID mapping: platform_user_id -> username
        self._user_mapping: Dict[str, str] = {}

        self._preprocessors: List[
            Callable[[InboundMessage], InboundMessage]
        ] = []
        self._postprocessors: List[
            Callable[[str, InboundMessage], str]
        ] = []

    def add_adapter(self, adapter: BaseAdapter) -> None:
        """Register ``adapter`` and route its inbound messages here."""
        self.registry.register(adapter)
        adapter.register_message_handler(self._on_message_received)

    def map_user(self, platform_user_id: str, username: str) -> None:
        """
        Map a platform user ID to an ajarbot username.

        The ID is stored verbatim as the mapping key. ``get_username``
        looks keys up as ``"{platform}:{platform_user_id}"``, so the ID
        passed here must already be in that form for the mapping to be
        found.
        """
        self._user_mapping[platform_user_id] = username

    def get_username(
        self, platform: str, platform_user_id: str
    ) -> str:
        """
        Get ajarbot username for a platform user.

        Falls back to ``"{platform}_{platform_user_id}"`` if no mapping
        exists for the key ``"{platform}:{platform_user_id}"``.
        """
        key = f"{platform}:{platform_user_id}"
        # Use underscore for fallback to match validation rules
        fallback = f"{platform}_{platform_user_id}"
        return self._user_mapping.get(key, fallback)

    def add_preprocessor(
        self,
        preprocessor: Callable[[InboundMessage], InboundMessage],
    ) -> None:
        """Add a message preprocessor (e.g., for commands, filters)."""
        self._preprocessors.append(preprocessor)

    def add_postprocessor(
        self,
        postprocessor: Callable[[str, InboundMessage], str],
    ) -> None:
        """Add a response postprocessor (e.g., for formatting)."""
        self._postprocessors.append(postprocessor)

    def _on_message_received(self, message: InboundMessage) -> None:
        """
        Handle an incoming message from an adapter by enqueueing it.

        This may be called from different event loop contexts (e.g.,
        python-telegram-bot's internal loop vs. our main asyncio loop),
        so the put is always scheduled on the loop captured by
        ``start()`` via ``call_soon_threadsafe``. Scheduling on whatever
        loop happens to be running in the caller would mutate the queue
        from a loop/thread other than the one consuming it.
        """
        runtime_loop = self._loop
        if runtime_loop is not None and not runtime_loop.is_closed():
            runtime_loop.call_soon_threadsafe(
                self._message_queue.put_nowait, message
            )
            return

        # Runtime loop not captured yet (message arrived before start()):
        # fall back to the caller's running loop, if there is one.
        try:
            loop = asyncio.get_running_loop()
            loop.call_soon_threadsafe(
                self._message_queue.put_nowait, message
            )
        except RuntimeError:
            # No running loop - should not happen in normal operation
            # but handle gracefully
            print("[Runtime] Warning: No event loop for message dispatch")
            self._message_queue.put_nowait(message)

    async def _process_message_queue(self) -> None:
        """
        Background task to process incoming messages.

        Polls the queue with a one second timeout so the loop re-checks
        ``_is_running`` regularly and exits soon after ``stop()`` clears
        it. Errors from a single message are logged and do not end the
        loop.
        """
        print("[Runtime] Message processing loop started")
        while self._is_running:
            try:
                message = await asyncio.wait_for(
                    self._message_queue.get(), timeout=1.0
                )
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"[Runtime] Error processing message: {e}")
                traceback.print_exc()
        print("[Runtime] Message processing loop stopped")

    async def _process_message(self, message: InboundMessage) -> None:
        """
        Process a single message end to end.

        Runs the preprocessors, asks the agent for a response in a worker
        thread (forwarding progress updates to the user), runs the
        postprocessors and sends the reply through the platform's
        adapter. On any exception the error is logged, reported to the
        agent's ``healing_system`` when it has one, and an error reply is
        attempted.
        """
        preview = message.text[:50]
        print(
            f"[{message.platform.upper()}] "
            f"Message from {message.username}: {preview}..."
        )

        try:
            # Apply preprocessors
            processed_message = message
            for preprocessor in self._preprocessors:
                processed_message = preprocessor(processed_message)

            username = self.get_username(
                message.platform, message.user_id
            )

            adapter = self.registry.get(message.platform)
            if adapter:
                await adapter.send_typing_indicator(message.channel_id)

            # Capture the event loop for thread-safe progress updates
            event_loop = asyncio.get_running_loop()

            def _report_progress_result(done_future: Any) -> None:
                """Log a progress update whose send ultimately failed."""
                if done_future.cancelled():
                    return
                send_error = done_future.exception()
                if send_error is not None:
                    print(
                        "[Runtime] Failed to send progress update: "
                        f"{send_error}"
                    )

            # Create progress callback to send updates to the user
            def progress_callback(update_message: str) -> None:
                """Send progress updates to the user during long operations."""
                if not adapter:
                    return
                try:
                    # Create outbound message for progress update
                    progress_msg = OutboundMessage(
                        platform=message.platform,
                        channel_id=message.channel_id,
                        text=update_message,
                        thread_id=message.thread_id,
                    )
                    # Run async send in a thread-safe way. Use the
                    # captured event loop instead of get_running_loop()
                    # since this callback runs from a thread
                    # (agent.chat via to_thread).
                    pending = asyncio.run_coroutine_threadsafe(
                        adapter.send_message(progress_msg),
                        event_loop,
                    )
                    # Without this, an exception raised inside
                    # send_message would be silently dropped.
                    pending.add_done_callback(_report_progress_result)
                except Exception as e:
                    print(f"[Runtime] Failed to send progress update: {e}")

            # Get response from agent (synchronous call in thread)
            response = await asyncio.to_thread(
                self.agent.chat,
                user_message=processed_message.text,
                username=username,
                progress_callback=progress_callback,
            )

            # Apply postprocessors
            for postprocessor in self._postprocessors:
                response = postprocessor(response, processed_message)

            # Send response back
            if adapter:
                # Tolerate a missing metadata mapping so an already
                # computed response is not lost to an AttributeError.
                metadata = message.metadata or {}
                reply_to = (
                    metadata.get("ts")
                    or metadata.get("message_id")
                )
                outbound = OutboundMessage(
                    platform=message.platform,
                    channel_id=message.channel_id,
                    text=response,
                    thread_id=message.thread_id,
                    reply_to_id=reply_to,
                )

                result = await adapter.send_message(outbound)
                platform_tag = message.platform.upper()

                if result.get("success"):
                    print(
                        f"[{platform_tag}] Response sent "
                        f"({len(response)} chars)"
                    )
                else:
                    print(
                        f"[{platform_tag}] Failed to send response: "
                        f"{result.get('error')}"
                    )

        except Exception as e:
            print(f"[Runtime] Error processing message: {e}")
            traceback.print_exc()

            if hasattr(self.agent, 'healing_system'):
                self.agent.healing_system.capture_error(
                    error=e,
                    component="adapters/runtime.py:_process_message",
                    intent=f"Processing message from {message.platform}",
                    context={
                        "platform": message.platform,
                        "user": message.username,
                        "message_preview": message.text[:100],
                    },
                )

            await self._send_error_reply(message)

    async def _send_error_reply(self, message: InboundMessage) -> None:
        """
        Attempt to send an error message back to the user.

        Best effort: a failure to deliver the reply is logged and never
        raised to the caller.
        """
        try:
            adapter = self.registry.get(message.platform)
            if adapter:
                error_msg = OutboundMessage(
                    platform=message.platform,
                    channel_id=message.channel_id,
                    text=(
                        "Sorry, I encountered an error processing "
                        "your message. Please try again."
                    ),
                    thread_id=message.thread_id,
                )
                await adapter.send_message(error_msg)
        except Exception as e:
            # Still best effort, but leave a trace instead of hiding it.
            print(f"[Runtime] Failed to send error reply: {e}")

    async def start(self) -> None:
        """
        Start the runtime and all adapters.

        Captures the running event loop first so that messages delivered
        while adapters are starting are already enqueued onto the loop
        that will process them, then starts the adapters and the
        background processing task.
        """
        print("[Runtime] Starting adapter runtime...")

        loop = asyncio.get_running_loop()
        self._loop = loop

        await self.registry.start_all()

        # Pass the main event loop to the LLM interface so that Agent SDK
        # async calls (from worker threads created by asyncio.to_thread)
        # can be scheduled back onto this loop via
        # run_coroutine_threadsafe.
        if (
            hasattr(self.agent, 'llm')
            and hasattr(self.agent.llm, 'set_event_loop')
        ):
            self.agent.llm.set_event_loop(loop)
            print("[Runtime] Event loop reference passed to LLM interface")

        self._is_running = True
        self.message_loop_task = asyncio.create_task(
            self._process_message_queue()
        )
        print("[Runtime] Runtime started")

    async def stop(self) -> None:
        """
        Stop the runtime and all adapters.

        Clears the running flag, waits for the processing task to finish
        its current iteration, stops every adapter and shuts the agent
        down.
        """
        print("[Runtime] Stopping adapter runtime...")
        self._is_running = False

        if self.message_loop_task:
            await self.message_loop_task
            self.message_loop_task = None

        await self.registry.stop_all()
        self.agent.shutdown()
        # Drop the loop reference so later dispatches do not target a
        # loop this runtime no longer uses.
        self._loop = None
        print("[Runtime] Runtime stopped")

    async def health_check(self) -> Dict[str, Any]:
        """
        Get health status of all adapters.

        Returns:
            A dict with ``"runtime_running"`` (the running flag) and
            ``"adapters"``, mapping each adapter's ``platform_name`` to
            the result of its own ``health_check()``.
        """
        status: Dict[str, Any] = {
            "runtime_running": self._is_running,
            "adapters": {},
        }

        for adapter in self.registry.get_all():
            adapter_health = await adapter.health_check()
            status["adapters"][adapter.platform_name] = adapter_health

        return status