Files
ajarbot/adapters/runtime.py
Jordan Ramos fe7c146dc6 feat: Add Gitea MCP integration and project cleanup
## New Features
- **Gitea MCP Tools** (zero API cost):
  - gitea_read_file: Read files from homelab repo
  - gitea_list_files: Browse directories
  - gitea_search_code: Search by filename
  - gitea_get_tree: Get directory tree
- **Gitea Client** (gitea_tools/client.py): REST API wrapper with OAuth
- **Proxmox SSH Scripts** (scripts/): Homelab data collection utilities
- **Obsidian MCP Support** (obsidian_mcp.py): Advanced vault operations
- **Voice Integration Plan** (JARVIS_VOICE_INTEGRATION_PLAN.md)

## Improvements
- **Increased timeout**: 5min → 10min for complex tasks (llm_interface.py)
- **Removed Direct API fallback**: Gitea tools are MCP-only (zero cost)
- **Updated .env.example**: Added Obsidian MCP configuration
- **Enhanced .gitignore**: Protect personal memory files (SOUL.md, MEMORY.md)

## Cleanup
- Deleted 24 obsolete files (temp/test/experimental scripts, outdated docs)
- Untracked personal memory files (SOUL.md, MEMORY.md now in .gitignore)
- Removed: AGENT_SDK_IMPLEMENTATION.md, HYBRID_SEARCH_SUMMARY.md,
  IMPLEMENTATION_SUMMARY.md, MIGRATION.md, test_agent_sdk.py, etc.

## Configuration
- Added config/gitea_config.example.yaml (Gitea setup template)
- Added config/obsidian_mcp.example.yaml (Obsidian MCP template)
- Updated scheduled_tasks.yaml with new task examples

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-18 20:31:32 -07:00

320 lines
11 KiB
Python

"""
Adapter runtime system for ajarbot.
Connects messaging platform adapters to the Agent instance.
"""
import asyncio
import re
import traceback
from typing import Any, Callable, Dict, List, Optional
from adapters.base import (
AdapterRegistry,
BaseAdapter,
InboundMessage,
OutboundMessage,
)
from agent import Agent
class AdapterRuntime:
    """
    Runtime system that connects adapters to the Agent.

    Acts as the bridge between messaging platforms (Slack, Telegram, etc.)
    and the Agent (memory + LLM).

    Adapters hand inbound messages to ``_on_message_received``, which puts
    them on an internal asyncio queue. A background task created by
    ``start()`` drains that queue: each message goes through the registered
    preprocessors, is passed to ``agent.chat`` in a worker thread, goes
    through the registered postprocessors, and the reply is sent back via
    the adapter registered for the message's platform.
    """

    def __init__(
        self,
        agent: Agent,
        registry: Optional[AdapterRegistry] = None,
    ) -> None:
        """
        Create a runtime bound to ``agent``.

        Args:
            agent: The Agent whose ``chat`` method answers messages.
            registry: Adapter registry to use. A new ``AdapterRegistry`` is
                created only when this is ``None``.
        """
        self.agent = agent
        # Compare against None explicitly so a caller-supplied registry is
        # never discarded just because it evaluates as falsy (e.g. empty).
        self.registry = (
            registry if registry is not None else AdapterRegistry()
        )
        self.message_loop_task: Optional[asyncio.Task] = None
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._is_running = False

        # Event loop that runs the processing task. Recorded by start() so
        # that messages arriving from other loops/threads can be enqueued
        # on the loop that actually consumes the queue.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # User ID mapping: "platform:platform_user_id" -> username
        # (this is the key format get_username() looks up).
        self._user_mapping: Dict[str, str] = {}

        self._preprocessors: List[
            Callable[[InboundMessage], InboundMessage]
        ] = []
        self._postprocessors: List[
            Callable[[str, InboundMessage], str]
        ] = []

    def add_adapter(self, adapter: BaseAdapter) -> None:
        """Register an adapter and route its inbound messages to this runtime."""
        self.registry.register(adapter)
        adapter.register_message_handler(self._on_message_received)

    def map_user(self, platform_user_id: str, username: str) -> None:
        """
        Map a platform user ID to an ajarbot username.

        The key is stored exactly as given. ``get_username`` looks entries
        up as ``"{platform}:{platform_user_id}"``, so pass the ID in that
        combined form (e.g. ``"slack:U123"``) for the mapping to be found.
        """
        self._user_mapping[platform_user_id] = username

    def get_username(
        self, platform: str, platform_user_id: str
    ) -> str:
        """
        Get ajarbot username for a platform user.

        Returns the mapped username for ``"{platform}:{platform_user_id}"``,
        or ``"{platform}_{platform_user_id}"`` if no mapping exists.
        """
        key = f"{platform}:{platform_user_id}"
        # Use underscore for fallback to match validation rules
        fallback = f"{platform}_{platform_user_id}"
        return self._user_mapping.get(key, fallback)

    def add_preprocessor(
        self,
        preprocessor: Callable[[InboundMessage], InboundMessage],
    ) -> None:
        """Add a message preprocessor (e.g., for commands, filters).

        Preprocessors run in registration order; each receives the message
        returned by the previous one.
        """
        self._preprocessors.append(preprocessor)

    def add_postprocessor(
        self,
        postprocessor: Callable[[str, InboundMessage], str],
    ) -> None:
        """Add a response postprocessor (e.g., for formatting).

        Postprocessors run in registration order; each receives the response
        text returned by the previous one plus the preprocessed message.
        """
        self._postprocessors.append(postprocessor)

    def _on_message_received(self, message: InboundMessage) -> None:
        """Handle incoming message from an adapter.

        This may be called from different event loop contexts (e.g.,
        python-telegram-bot's internal loop vs. our main asyncio loop),
        so the enqueue is scheduled with ``call_soon_threadsafe`` on the
        loop recorded by ``start()`` - the loop that consumes the queue -
        rather than on whichever loop happens to be running in the caller.
        """
        try:
            target_loop = self._loop
            if target_loop is None or target_loop.is_closed():
                # start() has not run yet (or its loop is gone): fall back
                # to the caller's running loop, as before.
                target_loop = asyncio.get_running_loop()
            target_loop.call_soon_threadsafe(
                self._message_queue.put_nowait, message
            )
        except RuntimeError:
            # No usable loop - should not happen in normal operation
            # but handle gracefully by enqueueing directly.
            print("[Runtime] Warning: No event loop for message dispatch")
            self._message_queue.put_nowait(message)

    async def _process_message_queue(self) -> None:
        """Background task to process incoming messages.

        Polls the queue with a 1 second timeout so that a change of
        ``_is_running`` (set by ``stop()``) is noticed promptly. Errors from
        a single message are logged and do not end the loop.
        """
        print("[Runtime] Message processing loop started")
        while self._is_running:
            try:
                message = await asyncio.wait_for(
                    self._message_queue.get(), timeout=1.0
                )
                await self._process_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"[Runtime] Error processing message: {e}")
                traceback.print_exc()
        print("[Runtime] Message processing loop stopped")

    async def _process_message(self, message: InboundMessage) -> None:
        """Process a single message.

        Runs preprocessors, calls ``agent.chat`` in a worker thread, runs
        postprocessors and sends the reply through the adapter registered
        for ``message.platform`` (if any). On failure the error is logged,
        reported to ``agent.healing_system`` when the agent has one, and a
        generic error reply is attempted.
        """
        preview = message.text[:50]
        print(
            f"[{message.platform.upper()}] "
            f"Message from {message.username}: {preview}..."
        )

        try:
            # Apply preprocessors
            processed_message = message
            for preprocessor in self._preprocessors:
                processed_message = preprocessor(processed_message)

            username = self.get_username(
                message.platform, message.user_id
            )

            adapter = self.registry.get(message.platform)
            if adapter:
                await adapter.send_typing_indicator(message.channel_id)

            # Capture the event loop for thread-safe progress updates
            event_loop = asyncio.get_running_loop()

            def report_progress_result(future) -> None:
                """Log a progress send that failed after being scheduled."""
                if future.cancelled():
                    return
                send_error = future.exception()
                if send_error is not None:
                    print(
                        "[Runtime] Failed to send progress update: "
                        f"{send_error}"
                    )

            # Create progress callback to send updates to the user
            def progress_callback(update_message: str):
                """Send progress updates to the user during long operations."""
                if adapter:
                    try:
                        # Create outbound message for progress update
                        progress_msg = OutboundMessage(
                            platform=message.platform,
                            channel_id=message.channel_id,
                            text=update_message,
                            thread_id=message.thread_id,
                        )
                        # Run async send in a thread-safe way.
                        # Use the captured event loop instead of
                        # get_running_loop() since this callback runs from
                        # a thread (agent.chat via to_thread).
                        pending = asyncio.run_coroutine_threadsafe(
                            adapter.send_message(progress_msg),
                            event_loop,
                        )
                        # Without this, an exception raised inside
                        # send_message would be stored on the future and
                        # never surfaced anywhere.
                        pending.add_done_callback(report_progress_result)
                    except Exception as e:
                        print(
                            f"[Runtime] Failed to send progress update: {e}"
                        )

            # Get response from agent (synchronous call in thread)
            response = await asyncio.to_thread(
                self.agent.chat,
                user_message=processed_message.text,
                username=username,
                progress_callback=progress_callback,
            )

            # Apply postprocessors
            for postprocessor in self._postprocessors:
                response = postprocessor(response, processed_message)

            # Send response back
            if adapter:
                # Reply target: "ts" if present in metadata, otherwise
                # "message_id".
                reply_to = (
                    message.metadata.get("ts")
                    or message.metadata.get("message_id")
                )
                outbound = OutboundMessage(
                    platform=message.platform,
                    channel_id=message.channel_id,
                    text=response,
                    thread_id=message.thread_id,
                    reply_to_id=reply_to,
                )
                result = await adapter.send_message(outbound)
                platform_tag = message.platform.upper()
                if result.get("success"):
                    print(
                        f"[{platform_tag}] Response sent "
                        f"({len(response)} chars)"
                    )
                else:
                    print(
                        f"[{platform_tag}] Failed to send response: "
                        f"{result.get('error')}"
                    )
        except Exception as e:
            print(f"[Runtime] Error processing message: {e}")
            traceback.print_exc()
            if hasattr(self.agent, 'healing_system'):
                try:
                    self.agent.healing_system.capture_error(
                        error=e,
                        component="adapters/runtime.py:_process_message",
                        intent=f"Processing message from {message.platform}",
                        context={
                            "platform": message.platform,
                            "user": message.username,
                            "message_preview": message.text[:100],
                        },
                    )
                except Exception as capture_failure:
                    # Error capture is diagnostic only; a failure here must
                    # not stop the user from getting the error reply below.
                    print(
                        "[Runtime] Failed to capture error: "
                        f"{capture_failure}"
                    )
            await self._send_error_reply(message)

    async def _send_error_reply(self, message: InboundMessage) -> None:
        """Attempt to send an error message back to the user.

        Best effort: any failure while sending is logged and swallowed so
        that error handling never raises a second error.
        """
        try:
            adapter = self.registry.get(message.platform)
            if adapter:
                error_msg = OutboundMessage(
                    platform=message.platform,
                    channel_id=message.channel_id,
                    text=(
                        "Sorry, I encountered an error processing "
                        "your message. Please try again."
                    ),
                    thread_id=message.thread_id,
                )
                await adapter.send_message(error_msg)
        except Exception as e:
            print(f"[Runtime] Failed to send error reply: {e}")

    async def start(self) -> None:
        """Start the runtime and all adapters."""
        print("[Runtime] Starting adapter runtime...")

        # Record the loop before adapters start so that any message they
        # deliver during startup is already enqueued on the right loop.
        loop = asyncio.get_running_loop()
        self._loop = loop

        await self.registry.start_all()

        # Pass the main event loop to the LLM interface so that Agent SDK
        # async calls (from worker threads created by asyncio.to_thread)
        # can be scheduled back onto this loop via run_coroutine_threadsafe.
        if hasattr(self.agent, 'llm') and hasattr(self.agent.llm, 'set_event_loop'):
            self.agent.llm.set_event_loop(loop)
            print("[Runtime] Event loop reference passed to LLM interface")

        self._is_running = True
        self.message_loop_task = asyncio.create_task(
            self._process_message_queue()
        )
        print("[Runtime] Runtime started")

    async def stop(self) -> None:
        """Stop the runtime and all adapters.

        Clears the running flag, waits for the processing task to finish,
        stops every adapter and then calls ``agent.shutdown()``.
        """
        print("[Runtime] Stopping adapter runtime...")
        self._is_running = False
        if self.message_loop_task:
            await self.message_loop_task
        await self.registry.stop_all()
        self.agent.shutdown()
        print("[Runtime] Runtime stopped")

    async def health_check(self) -> Dict[str, Any]:
        """Get health status of all adapters.

        Returns:
            ``{"runtime_running": bool, "adapters": {platform_name: health}}``
            where ``health`` is whatever the adapter's ``health_check``
            returned. If an adapter's check raises, its entry is
            ``{"healthy": False, "error": "<message>"}`` instead, so one
            broken adapter does not hide the state of the others.
            NOTE(review): the ``healthy`` key is this method's own choice;
            confirm it fits what consumers of adapter health dicts expect.
        """
        status: Dict[str, Any] = {
            "runtime_running": self._is_running,
            "adapters": {},
        }
        for adapter in self.registry.get_all():
            try:
                adapter_health = await adapter.health_check()
            except Exception as e:
                adapter_health = {"healthy": False, "error": str(e)}
            status["adapters"][adapter.platform_name] = adapter_health
        return status
# --- Example Preprocessors and Postprocessors ---
def command_preprocessor(message: InboundMessage) -> InboundMessage:
    """Example preprocessor: turn known slash commands into plain prompts.

    If the text starts with "/", its first whitespace-separated token is
    looked up as a command. "/status" and "/help" replace ``message.text``
    with a natural-language request; any other text is left untouched.
    The message is modified in place and the same object is returned.
    """
    text = message.text
    if text.startswith("/"):
        prompt_for_command = {
            "/status": "What is your current status?",
            "/help": (
                "Please provide help information about what you can do."
            ),
        }
        command = text.split(maxsplit=1)[0]
        prompt = prompt_for_command.get(command)
        if prompt is not None:
            message.text = prompt
    return message
def markdown_postprocessor(
    response: str, original_message: InboundMessage
) -> str:
    """Example: Ensure markdown compatibility for Slack.

    For messages whose platform is "slack", converts standard markdown
    bold (``**text**``) to Slack mrkdwn bold (``*text*``) and removes
    ``#`` header prefixes at the start of a line. Fenced code blocks and
    inline code spans are left exactly as written, so snippets such as
    ``f(**kwargs)``, ``2**3`` or ``# comment`` are not corrupted.
    Responses for any other platform are returned unchanged.

    Args:
        response: The agent's response text.
        original_message: The message being answered; only its
            ``platform`` attribute is read.

    Returns:
        The response text, adapted for Slack when applicable.
    """
    if original_message.platform != "slack":
        return response

    # One left-to-right pass. Alternatives in priority order:
    #   1. a code span (fenced ```...``` or inline `...`) - kept verbatim
    #   2. a "**" bold marker                              - becomes "*"
    #   3. a header prefix at the start of a line          - removed
    # Matching code spans first is what shields their contents from the
    # other two rewrites. An unterminated fence matches nothing and is
    # therefore treated as ordinary text.
    token = re.compile(
        r"(```.*?```|`[^`\n]+`)|(\*\*)|^#+\s+",
        flags=re.DOTALL | re.MULTILINE,
    )

    def rewrite(match) -> str:
        code_span = match.group(1)
        if code_span is not None:
            return code_span
        if match.group(2) is not None:
            # Convert standard markdown bold to Slack mrkdwn
            return "*"
        # Slack doesn't support ## headers
        return ""

    return token.sub(rewrite, response)