Files
ajarbot/sub_agent_manager.py
Jordan Ramos e909cc0044 Add MCP delegation bridge and diagram tools
**Features Added**:

1. **Agent Registry (agent_registry.py)**
   - Thread-safe global singleton for MCP tool access to Agent instance
   - Enables MCP tools to call Agent.delegate() without circular imports
   - Registered at bot startup in bot_runner.py

2. **Sub-Agent Manager (sub_agent_manager.py)**
   - Watchdog system monitoring sub-agent lifecycle
   - Detects hung agents (5-minute inactivity timeout, checked every 30 seconds)
   - Auto-cleanup and status tracking

3. **delegate_task MCP Tool (mcp_tools.py)**
   - Exposes Agent.delegate() to Claude via MCP protocol
   - Enables parallel sub-agent execution via tool calls
   - Supports specialist prompts and agent ID caching

4. **Memory Write Locks (memory_system.py)**
   - Thread-safe writes to prevent file corruption
   - Protects write_memory(), update_soul(), update_user()

5. **Diagram Tools**
   - Mermaid MCP server (flowcharts, sequence diagrams, etc.)
   - Excalidraw MCP server (hand-drawn style diagrams)
   - Config files in config/ directory

6. **Adapter Improvements**
   - Enhanced error handling across all adapters
   - Unified logging patterns

**Testing**: Ready for parallel sub-agent testing

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-01 14:34:24 -07:00

198 lines
6.5 KiB
Python

"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle.

Handles:
- Sub-agent registration and tracking
- Progress monitoring and hang detection
- Automatic cleanup on timeout: hung sub-agents are marked complete with a
  timeout error (nothing in this module kills or restarts the underlying work)
"""
import time
import threading
from typing import Dict, Optional, Any
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass()
class SubAgentState:
    """Mutable record describing one tracked sub-agent.

    Fields appear in constructor order; the first four are required, the
    outcome fields default to "not finished, no result, no error".
    """

    # --- identity -----------------------------------------------------------
    agent_id: str
    task_description: str

    # --- timing (float timestamps; SubAgentManager fills these from time.time())
    started_at: float
    last_activity: float

    # --- outcome (set when the agent finishes or is timed out) ---------------
    is_complete: bool = False
    result: Optional[str] = None
    error: Optional[str] = None
class SubAgentManager:
    """Track sub-agent lifecycle and flag agents that stop making progress.

    Each registered sub-agent records when it started and when it last
    reported activity. An optional background watchdog thread periodically
    looks for running agents idle longer than ``timeout_seconds`` and marks
    them complete with a timeout error. This class only records state;
    nothing here terminates or restarts the underlying work.

    All reads and writes of ``sub_agents`` made by these methods happen
    under ``self._lock``.
    """

    def __init__(self, timeout_seconds: int = 300, check_interval: float = 30.0):  # 5 minutes default
        """Initialize manager.

        Args:
            timeout_seconds: Maximum time (seconds) without reported activity
                before a running sub-agent is considered hung.
            check_interval: Seconds between watchdog scans (default 30).

        Raises:
            ValueError: If ``check_interval`` is not positive (a zero wait
                would turn the watchdog into a busy loop).
        """
        if check_interval <= 0:
            raise ValueError("check_interval must be positive")
        self.timeout_seconds = timeout_seconds
        self.check_interval = check_interval
        self.sub_agents: Dict[str, SubAgentState] = {}
        self._lock = threading.Lock()
        self._watchdog_thread: Optional[threading.Thread] = None
        self._watchdog_running = False
        # A fresh Event is created for every watchdog thread, so a thread that
        # has not finished exiting after stop_watchdog() can never be revived
        # by a subsequent start_watchdog().
        self._stop_event = threading.Event()

    def start_watchdog(self) -> None:
        """Start the watchdog thread that monitors for hung sub-agents.

        Does nothing if the watchdog is already running.
        """
        if self._watchdog_running:
            return
        self._watchdog_running = True
        self._stop_event = threading.Event()
        self._watchdog_thread = threading.Thread(
            target=self._watchdog_loop,
            args=(self._stop_event,),
            daemon=True,
            name="SubAgentWatchdog",
        )
        self._watchdog_thread.start()
        logger.info("[SubAgentManager] Watchdog started (timeout: %ds)", self.timeout_seconds)

    def stop_watchdog(self) -> None:
        """Signal the watchdog thread to exit and wait up to 2s for it."""
        self._watchdog_running = False
        # Wakes the thread immediately instead of waiting out its interval.
        self._stop_event.set()
        thread = self._watchdog_thread
        # Joining the current thread raises RuntimeError, so skip it if
        # stop_watchdog() is invoked from inside the watchdog itself.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)

    def register_sub_agent(
        self,
        agent_id: str,
        task_description: str
    ) -> None:
        """Register a new sub-agent for monitoring.

        Re-registering an existing ``agent_id`` replaces its previous state.
        """
        with self._lock:
            now = time.time()
            self.sub_agents[agent_id] = SubAgentState(
                agent_id=agent_id,
                task_description=task_description,
                started_at=now,
                last_activity=now
            )
            logger.info("[SubAgentManager] Registered sub-agent: %s - %s", agent_id, task_description)

    def update_activity(self, agent_id: str) -> None:
        """Update last activity timestamp for a sub-agent (unknown IDs are ignored)."""
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is not None:
                state.last_activity = time.time()

    def mark_complete(
        self,
        agent_id: str,
        result: Optional[str] = None,
        error: Optional[str] = None
    ) -> None:
        """Mark a sub-agent as complete, recording its result or error.

        Success is logged as ``error is None``. Unknown IDs are logged as a
        warning and otherwise ignored.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                logger.warning("[SubAgentManager] mark_complete for unknown sub-agent: %s", agent_id)
                return
            state.is_complete = True
            state.result = result
            state.error = error
        logger.info("[SubAgentManager] Sub-agent completed: %s (success=%s)",
                    agent_id, error is None)

    def get_hung_agents(self) -> list:
        """Return IDs of incomplete sub-agents idle longer than ``timeout_seconds``.

        Logs a warning for each hung agent found.
        """
        now = time.time()
        hung = []
        with self._lock:
            for agent_id, state in self.sub_agents.items():
                if state.is_complete:
                    continue
                time_since_activity = now - state.last_activity
                if time_since_activity > self.timeout_seconds:
                    hung.append(agent_id)
                    logger.warning(
                        "[SubAgentManager] Sub-agent appears hung: %s - %s (no activity for %.1fs)",
                        agent_id, state.task_description, time_since_activity
                    )
        return hung

    def cleanup_agent(self, agent_id: str) -> None:
        """Mark a hung sub-agent as failed with a timeout error.

        Unknown or already-completed agents are left untouched. Otherwise a
        result recorded by mark_complete() between hang detection and this
        call would be overwritten by the timeout error.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None or state.is_complete:
                return
            logger.error(
                "[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)",
                agent_id,
                state.task_description,
                time.time() - state.last_activity
            )
            # Mark as failed
            state.is_complete = True
            state.error = f"Timeout: No progress for {self.timeout_seconds}s"

    def _watchdog_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Scan for hung sub-agents every ``check_interval`` seconds until stopped."""
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            try:
                for agent_id in self.get_hung_agents():
                    self.cleanup_agent(agent_id)
            except Exception as e:
                # Keep the watchdog alive, but preserve the traceback for debugging.
                logger.exception("[SubAgentManager] Watchdog error: %s", e)
            # Unlike time.sleep, Event.wait returns as soon as stop_watchdog()
            # sets the event, so shutdown is not delayed by a full interval.
            stop_event.wait(self.check_interval)

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of all tracked sub-agents.

        Returns:
            Dict with counts ``total``, ``complete``, ``running``, ``hung``
            and an ``agents`` list of per-agent dicts (``id``, ``task``,
            ``runtime``, ``idle_time``, ``complete``, ``has_error``).
            ``hung`` counts incomplete agents idle past the timeout that the
            watchdog has not yet cleaned up.
        """
        status: Dict[str, Any] = {
            "total": 0,
            "complete": 0,
            "running": 0,
            "hung": 0,
            "agents": []
        }
        with self._lock:
            now = time.time()
            # Read the size under the lock so it agrees with the per-agent counts.
            status["total"] = len(self.sub_agents)
            for agent_id, state in self.sub_agents.items():
                idle_time = now - state.last_activity
                agent_status = {
                    "id": agent_id,
                    "task": state.task_description,
                    "runtime": now - state.started_at,
                    "idle_time": idle_time,
                    "complete": state.is_complete,
                    "has_error": state.error is not None
                }
                if state.is_complete:
                    status["complete"] += 1
                elif idle_time > self.timeout_seconds:
                    status["hung"] += 1
                else:
                    status["running"] += 1
                status["agents"].append(agent_status)
        return status

    def clear_completed(self) -> None:
        """Remove completed sub-agents (including timed-out ones) from tracking."""
        with self._lock:
            completed = [
                agent_id for agent_id, state in self.sub_agents.items()
                if state.is_complete
            ]
            for agent_id in completed:
                del self.sub_agents[agent_id]
        if completed:
            logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))