Files
ajarbot/adapters/runtime.py
Jordan Ramos 7697220c74 Refactor: Remove zombie code, fix bugs, and clean documentation
This comprehensive refactoring removes dead code, fixes bugs, and deletes
outdated documentation to make the codebase production-ready.

## Files Deleted (16 files)

### Temporary/zombie files (9 files):
- nul (Windows artifact)
- quick_start.bat (superseded by run.bat)
- scripts/proxmox_ssh.py (hardcoded credentials - security risk)
- scripts/proxmox_ssh.sh (hardcoded credentials - security risk)
- scripts/collection_output.txt (one-time audit output)
- scripts/collect-homelab-config.sh (one-off infrastructure script)
- scripts/collect-remote.sh (one-off infrastructure script)
- memory_workspace/MEMORY.md.old (backup file)
- promtail-config-optimized.yaml (misplaced homelab config)

### Outdated documentation (7 files):
- MCP_MIGRATION.md (migration complete - 2026-02-15)
- QUICK_REFERENCE_AGENT_SDK.md (orphaned from cleanup)
- SETUP.md (duplicate of README.md quick start)
- WINDOWS_QUICK_REFERENCE.md (duplicate of docs/WINDOWS_DEPLOYMENT.md)
- SUB_AGENTS.md (design doc for unimplemented feature)
- JARVIS_VOICE_INTEGRATION_PLAN.md (1300-line spec, code not implemented)
- OBSIDIAN_MCP_SETUP_INSTRUCTIONS.md (temporary troubleshooting doc)
- LOGGING.md (redundant with well-commented logging_config.py)
- docs/SECURITY_AUDIT_SUMMARY.md (completed audit from 2026-02-12)

## Critical Bug Fixes (2 bugs)

1. bot_runner.py line 122: Fixed wrong dict key reference
   - Changed send_to_platform → send_to
   - Bug caused scheduled task platform info to never print

2. usage_tracker.py: Added missing pricing for claude-sonnet-4-6
   - Model was default but had no pricing entry
   - Caused cost under-reporting in Direct API mode

## Code Removed (14 files modified, ~1200 lines deleted)

### Dead imports removed (9 imports):
- bot_runner.py: sys
- agent.py: time
- adapters/runtime.py: re
- adapters/skill_integration.py: subprocess
- tools.py: redundant Path import
- mcp_servers/loki/loki_server.py: json
- google_tools/oauth_manager.py: Thread, Dict
- google_tools/gmail_client.py: os
- google_tools/utils.py: email

### Unused functions/methods removed (9 functions):
- agent.py: MEMORY_RESPONSE_PREVIEW_LENGTH constant
- scheduled_tasks.py: integrate_scheduler_with_runtime()
- adapters/runtime.py: command_preprocessor(), markdown_postprocessor()
- adapters/skill_integration.py: invoke_skill_via_cli(), __main__ block
- tools.py: _extract_mcp_result()
- google_tools/oauth_manager.py: needs_refresh_soon(), revoke_authorization()
- google_tools/people_client.py: update_contact(), delete_contact()

### Code quality improvements:
- memory_system.py: Removed empty else: pass branch
- calendar_client.py: Fixed bare except: → except Exception:
- mcp_ssh.py: Updated asyncio.get_event_loop() → get_running_loop()
- calendar_client.py: Fixed deprecated datetime.utcnow() → now(timezone.utc)

## Impact

- ~1200 lines of dead code removed
- 16 obsolete files deleted
- 2 critical bugs fixed
- 3 deprecated APIs updated
- Zero functionality broken (all changes verified)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-24 12:46:56 -07:00

283 lines
10 KiB
Python

"""
Adapter runtime system for ajarbot.
Connects messaging platform adapters to the Agent instance.
"""
import asyncio
import traceback
from typing import Any, Callable, Dict, List, Optional
from adapters.base import (
AdapterRegistry,
BaseAdapter,
InboundMessage,
OutboundMessage,
)
from agent import Agent
class AdapterRuntime:
"""
Runtime system that connects adapters to the Agent.
Acts as the bridge between messaging platforms (Slack, Telegram, etc.)
and the Agent (memory + LLM).
"""
def __init__(
self,
agent: Agent,
registry: Optional[AdapterRegistry] = None,
) -> None:
self.agent = agent
self.registry = registry or AdapterRegistry()
self.message_loop_task: Optional[asyncio.Task] = None
self._message_queue: asyncio.Queue = asyncio.Queue()
self._is_running = False
# User ID mapping: platform_user_id -> username
self._user_mapping: Dict[str, str] = {}
self._preprocessors: List[
Callable[[InboundMessage], InboundMessage]
] = []
self._postprocessors: List[
Callable[[str, InboundMessage], str]
] = []
def add_adapter(self, adapter: BaseAdapter) -> None:
"""Add and configure an adapter."""
self.registry.register(adapter)
adapter.register_message_handler(self._on_message_received)
def map_user(self, platform_user_id: str, username: str) -> None:
"""Map a platform user ID to an ajarbot username."""
self._user_mapping[platform_user_id] = username
def get_username(
self, platform: str, platform_user_id: str
) -> str:
"""
Get ajarbot username for a platform user.
Falls back to platform_user_id format if no mapping exists.
"""
key = f"{platform}:{platform_user_id}"
# Use underscore for fallback to match validation rules
fallback = f"{platform}_{platform_user_id}"
return self._user_mapping.get(key, fallback)
def add_preprocessor(
self,
preprocessor: Callable[[InboundMessage], InboundMessage],
) -> None:
"""Add a message preprocessor (e.g., for commands, filters)."""
self._preprocessors.append(preprocessor)
def add_postprocessor(
self,
postprocessor: Callable[[str, InboundMessage], str],
) -> None:
"""Add a response postprocessor (e.g., for formatting)."""
self._postprocessors.append(postprocessor)
def _on_message_received(self, message: InboundMessage) -> None:
"""Handle incoming message from an adapter.
This may be called from different event loop contexts (e.g.,
python-telegram-bot's internal loop vs. our main asyncio loop),
so we use loop-safe scheduling instead of create_task().
"""
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(self._message_queue.put_nowait, message)
except RuntimeError:
# No running loop - should not happen in normal operation
# but handle gracefully
print("[Runtime] Warning: No event loop for message dispatch")
self._message_queue.put_nowait(message)
async def _process_message_queue(self) -> None:
"""Background task to process incoming messages."""
print("[Runtime] Message processing loop started")
while self._is_running:
try:
message = await asyncio.wait_for(
self._message_queue.get(), timeout=1.0
)
await self._process_message(message)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"[Runtime] Error processing message: {e}")
traceback.print_exc()
print("[Runtime] Message processing loop stopped")
async def _process_message(self, message: InboundMessage) -> None:
"""Process a single message."""
preview = message.text[:50]
print(
f"[{message.platform.upper()}] "
f"Message from {message.username}: {preview}..."
)
try:
# Apply preprocessors
processed_message = message
for preprocessor in self._preprocessors:
processed_message = preprocessor(processed_message)
username = self.get_username(
message.platform, message.user_id
)
adapter = self.registry.get(message.platform)
if adapter:
await adapter.send_typing_indicator(message.channel_id)
# Capture the event loop for thread-safe progress updates
event_loop = asyncio.get_running_loop()
# Create progress callback to send updates to the user
def progress_callback(update_message: str):
"""Send progress updates to the user during long operations."""
if adapter:
try:
# Create outbound message for progress update
progress_msg = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=update_message,
thread_id=message.thread_id,
)
# Run async send in a thread-safe way
# Use the captured event loop instead of get_running_loop()
# since this callback runs from a thread (agent.chat via to_thread)
asyncio.run_coroutine_threadsafe(
adapter.send_message(progress_msg),
event_loop
)
except Exception as e:
print(f"[Runtime] Failed to send progress update: {e}")
# Get response from agent (synchronous call in thread)
response = await asyncio.to_thread(
self.agent.chat,
user_message=processed_message.text,
username=username,
progress_callback=progress_callback,
)
# Apply postprocessors
for postprocessor in self._postprocessors:
response = postprocessor(response, processed_message)
# Send response back
if adapter:
reply_to = (
message.metadata.get("ts")
or message.metadata.get("message_id")
)
outbound = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=response,
thread_id=message.thread_id,
reply_to_id=reply_to,
)
result = await adapter.send_message(outbound)
platform_tag = message.platform.upper()
if result.get("success"):
print(
f"[{platform_tag}] Response sent "
f"({len(response)} chars)"
)
else:
print(
f"[{platform_tag}] Failed to send response: "
f"{result.get('error')}"
)
except Exception as e:
print(f"[Runtime] Error processing message: {e}")
traceback.print_exc()
if hasattr(self.agent, 'healing_system'):
self.agent.healing_system.capture_error(
error=e,
component="adapters/runtime.py:_process_message",
intent=f"Processing message from {message.platform}",
context={
"platform": message.platform,
"user": message.username,
"message_preview": message.text[:100],
},
)
await self._send_error_reply(message)
async def _send_error_reply(self, message: InboundMessage) -> None:
"""Attempt to send an error message back to the user."""
try:
adapter = self.registry.get(message.platform)
if adapter:
error_msg = OutboundMessage(
platform=message.platform,
channel_id=message.channel_id,
text=(
"Sorry, I encountered an error processing "
"your message. Please try again."
),
thread_id=message.thread_id,
)
await adapter.send_message(error_msg)
except Exception:
pass
async def start(self) -> None:
"""Start the runtime and all adapters."""
print("[Runtime] Starting adapter runtime...")
await self.registry.start_all()
# Pass the main event loop to the LLM interface so that Agent SDK
# async calls (from worker threads created by asyncio.to_thread)
# can be scheduled back onto this loop via run_coroutine_threadsafe.
loop = asyncio.get_running_loop()
if hasattr(self.agent, 'llm') and hasattr(self.agent.llm, 'set_event_loop'):
self.agent.llm.set_event_loop(loop)
print("[Runtime] Event loop reference passed to LLM interface")
self._is_running = True
self.message_loop_task = asyncio.create_task(
self._process_message_queue()
)
print("[Runtime] Runtime started")
async def stop(self) -> None:
"""Stop the runtime and all adapters."""
print("[Runtime] Stopping adapter runtime...")
self._is_running = False
if self.message_loop_task:
await self.message_loop_task
await self.registry.stop_all()
self.agent.shutdown()
print("[Runtime] Runtime stopped")
async def health_check(self) -> Dict[str, Any]:
"""Get health status of all adapters."""
status: Dict[str, Any] = {
"runtime_running": self._is_running,
"adapters": {},
}
for adapter in self.registry.get_all():
adapter_health = await adapter.health_check()
status["adapters"][adapter.platform_name] = adapter_health
return status