Files
ajarbot/adapters/runtime.py
Jordan Ramos e909cc0044 Add MCP delegation bridge and diagram tools
**Features Added**:

1. **Agent Registry (agent_registry.py)**
   - Thread-safe global singleton for MCP tool access to Agent instance
   - Enables MCP tools to call Agent.delegate() without circular imports
   - Registered at bot startup in bot_runner.py

2. **Sub-Agent Manager (sub_agent_manager.py)**
   - Watchdog system monitoring sub-agent lifecycle
   - Detects hung agents (5min timeout, 30s check interval)
   - Auto-cleanup and status tracking

3. **delegate_task MCP Tool (mcp_tools.py)**
   - Exposes Agent.delegate() to Claude via MCP protocol
   - Enables parallel sub-agent execution via tool calls
   - Supports specialist prompts and agent ID caching

4. **Memory Write Locks (memory_system.py)**
   - Thread-safe writes to prevent file corruption
   - Protects write_memory(), update_soul(), update_user()

5. **Diagram Tools**
   - Mermaid MCP server (flowcharts, sequence diagrams, etc.)
   - Excalidraw MCP server (hand-drawn style diagrams)
   - Config files in config/ directory

6. **Adapter Improvements**
   - Enhanced error handling across all adapters
   - Unified logging patterns

**Testing**: Ready for parallel sub-agent testing

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-01 14:34:24 -07:00

348 lines
13 KiB
Python

"""
Adapter runtime system for ajarbot.
Connects messaging platform adapters to the Agent instance.
"""
import asyncio
import traceback
from typing import Any, Callable, Dict, List, Optional
from adapters.base import (
AdapterRegistry,
BaseAdapter,
InboundMessage,
OutboundMessage,
)
from agent import Agent
class AdapterRuntime:
"""
Runtime system that connects adapters to the Agent.
Acts as the bridge between messaging platforms (Slack, Telegram, etc.)
and the Agent (memory + LLM).
"""
def __init__(
self,
agent: Agent,
registry: Optional[AdapterRegistry] = None,
) -> None:
self.agent = agent
self.registry = registry or AdapterRegistry()
self.message_loop_task: Optional[asyncio.Task] = None
self._message_queue: asyncio.Queue = asyncio.Queue()
self._is_running = False
# User ID mapping: platform_user_id -> username
self._user_mapping: Dict[str, str] = {}
self._preprocessors: List[
Callable[[InboundMessage], InboundMessage]
] = []
self._postprocessors: List[
Callable[[str, InboundMessage], str]
] = []
def add_adapter(self, adapter: BaseAdapter) -> None:
"""Add and configure an adapter."""
self.registry.register(adapter)
adapter.register_message_handler(self._on_message_received)
def map_user(self, platform_user_id: str, username: str) -> None:
"""Map a platform user ID to an ajarbot username."""
self._user_mapping[platform_user_id] = username
def get_username(
self, platform: str, platform_user_id: str
) -> str:
"""
Get ajarbot username for a platform user.
Falls back to platform_user_id format if no mapping exists.
"""
key = f"{platform}:{platform_user_id}"
# Use underscore for fallback to match validation rules
fallback = f"{platform}_{platform_user_id}"
return self._user_mapping.get(key, fallback)
def add_preprocessor(
self,
preprocessor: Callable[[InboundMessage], InboundMessage],
) -> None:
"""Add a message preprocessor (e.g., for commands, filters)."""
self._preprocessors.append(preprocessor)
def add_postprocessor(
self,
postprocessor: Callable[[str, InboundMessage], str],
) -> None:
"""Add a response postprocessor (e.g., for formatting)."""
self._postprocessors.append(postprocessor)
def _on_message_received(self, message: InboundMessage) -> None:
"""Handle incoming message from an adapter.
This may be called from different event loop contexts (e.g.,
python-telegram-bot's internal loop vs. our main asyncio loop),
so we use loop-safe scheduling instead of create_task().
"""
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(self._message_queue.put_nowait, message)
except RuntimeError:
# No running loop - should not happen in normal operation
# but handle gracefully
print("[Runtime] Warning: No event loop for message dispatch")
self._message_queue.put_nowait(message)
async def _detect_and_send_diagrams(
self,
response: str,
adapter: BaseAdapter,
channel_id: str,
thread_id: Optional[str]
) -> None:
"""Detect diagram file paths in response and auto-send them.
Args:
response: Agent's text response
adapter: Platform adapter to send files with
channel_id: Channel/chat ID
thread_id: Thread/message ID for replies
"""
import re
from pathlib import Path
# Match diagram file paths: "Saved to: path/to/diagram.png"
# Pattern matches common phrases followed by file path with image/diagram extensions
pattern = r"(?:Saved|Created|Generated|Exported|File saved|Output file)\s*(?:to|at)?[:\s]+([^\s]+\.(?:png|svg|pdf|jpg|jpeg))"
matches = re.findall(pattern, response, re.IGNORECASE)
if not matches:
return
sent_files = []
for file_path_str in matches:
try:
file_path = Path(file_path_str)
# Check if file exists
if not file_path.exists():
print(f"[Runtime] Diagram file not found: {file_path}")
continue
# Send file via adapter
result = await adapter.send_file(
channel_id=channel_id,
file_path=str(file_path.absolute()),
caption=f"Diagram: {file_path.name}",
thread_id=thread_id,
)
if result.get("success"):
sent_files.append(file_path.name)
print(f"[Runtime] Sent diagram file: {file_path.name}")
else:
print(f"[Runtime] Failed to send diagram: {result.get('error')}")
except Exception as e:
print(f"[Runtime] Error sending diagram file: {e}")
if sent_files:
print(f"[Runtime] Successfully sent {len(sent_files)} diagram file(s)")
async def _process_message_queue(self) -> None:
"""Background task to process incoming messages."""
print("[Runtime] Message processing loop started")
while self._is_running:
try:
message = await asyncio.wait_for(
self._message_queue.get(), timeout=1.0
)
await self._process_message(message)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"[Runtime] Error processing message: {e}")
traceback.print_exc()
print("[Runtime] Message processing loop stopped")
async def _process_message(self, message: InboundMessage) -> None:
"""Process a single message."""
preview = message.text[:50]
print(
f"[{message.platform.upper()}] "
f"Message from {message.username}: {preview}..."
)
try:
# Apply preprocessors
processed_message = message
for preprocessor in self._preprocessors:
processed_message = preprocessor(processed_message)
username = self.get_username(
message.platform, message.user_id
)
adapter = self.registry.get(message.platform)
if adapter:
await adapter.send_typing_indicator(message.channel_id)
# Capture the event loop for thread-safe progress updates
event_loop = asyncio.get_running_loop()
# Create progress callback to send updates to the user
def progress_callback(update_message: str):
"""Send progress updates to the user during long operations."""
if adapter:
try:
# Create outbound message for progress update
progress_msg = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=update_message,
thread_id=message.thread_id,
)
# Run async send in a thread-safe way
# Use the captured event loop instead of get_running_loop()
# since this callback runs from a thread (agent.chat via to_thread)
asyncio.run_coroutine_threadsafe(
adapter.send_message(progress_msg),
event_loop
)
except Exception as e:
print(f"[Runtime] Failed to send progress update: {e}")
# Get response from agent (synchronous call in thread)
response = await asyncio.to_thread(
self.agent.chat,
user_message=processed_message.text,
username=username,
progress_callback=progress_callback,
inbound_message=processed_message,
)
# Apply postprocessors
for postprocessor in self._postprocessors:
response = postprocessor(response, processed_message)
# NEW: Detect and send diagram files mentioned in response
if adapter:
await self._detect_and_send_diagrams(
response,
adapter,
message.channel_id,
message.thread_id,
)
# Send response back
if adapter:
reply_to = (
message.metadata.get("ts")
or message.metadata.get("message_id")
)
outbound = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=response,
thread_id=message.thread_id,
reply_to_id=reply_to,
)
result = await adapter.send_message(outbound)
platform_tag = message.platform.upper()
if result.get("success"):
print(
f"[{platform_tag}] Response sent "
f"({len(response)} chars)"
)
else:
print(
f"[{platform_tag}] Failed to send response: "
f"{result.get('error')}"
)
except Exception as e:
print(f"[Runtime] Error processing message: {e}")
traceback.print_exc()
if hasattr(self.agent, 'healing_system'):
self.agent.healing_system.capture_error(
error=e,
component="adapters/runtime.py:_process_message",
intent=f"Processing message from {message.platform}",
context={
"platform": message.platform,
"user": message.username,
"message_preview": message.text[:100],
},
)
await self._send_error_reply(message)
async def _send_error_reply(self, message: InboundMessage) -> None:
"""Attempt to send an error message back to the user."""
try:
adapter = self.registry.get(message.platform)
if adapter:
error_msg = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=(
"Sorry, I encountered an error processing "
"your message. Please try again."
),
thread_id=message.thread_id,
)
await adapter.send_message(error_msg)
except Exception:
pass
async def start(self) -> None:
"""Start the runtime and all adapters."""
print("[Runtime] Starting adapter runtime...")
await self.registry.start_all()
# Pass the main event loop to the LLM interface so that Agent SDK
# async calls (from worker threads created by asyncio.to_thread)
# can be scheduled back onto this loop via run_coroutine_threadsafe.
loop = asyncio.get_running_loop()
if hasattr(self.agent, 'llm') and hasattr(self.agent.llm, 'set_event_loop'):
self.agent.llm.set_event_loop(loop)
print("[Runtime] Event loop reference passed to LLM interface")
self._is_running = True
self.message_loop_task = asyncio.create_task(
self._process_message_queue()
)
print("[Runtime] Runtime started")
async def stop(self) -> None:
"""Stop the runtime and all adapters."""
print("[Runtime] Stopping adapter runtime...")
self._is_running = False
if self.message_loop_task:
await self.message_loop_task
await self.registry.stop_all()
self.agent.shutdown()
print("[Runtime] Runtime stopped")
async def health_check(self) -> Dict[str, Any]:
"""Get health status of all adapters."""
status: Dict[str, Any] = {
"runtime_running": self._is_running,
"adapters": {},
}
for adapter in self.registry.get_all():
adapter_health = await adapter.health_check()
status["adapters"][adapter.platform_name] = adapter_health
return status