"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle. Handles: - Sub-agent spawning and tracking - Progress monitoring and hang detection - Automatic cleanup and restart on timeout """ import time import threading from typing import Dict, Optional, Any from dataclasses import dataclass import logging logger = logging.getLogger(__name__) @dataclass class SubAgentState: """Track state of a running sub-agent.""" agent_id: str task_description: str started_at: float last_activity: float is_complete: bool = False result: Optional[str] = None error: Optional[str] = None class SubAgentManager: """Manages sub-agent lifecycle with hang detection and auto-restart.""" def __init__(self, timeout_seconds: int = 300): # 5 minutes default """Initialize manager. Args: timeout_seconds: Maximum time without progress before killing sub-agent """ self.timeout_seconds = timeout_seconds self.sub_agents: Dict[str, SubAgentState] = {} self._lock = threading.Lock() self._watchdog_thread: Optional[threading.Thread] = None self._watchdog_running = False def start_watchdog(self) -> None: """Start the watchdog thread that monitors for hung sub-agents.""" if self._watchdog_running: return self._watchdog_running = True self._watchdog_thread = threading.Thread( target=self._watchdog_loop, daemon=True, name="SubAgentWatchdog" ) self._watchdog_thread.start() logger.info("[SubAgentManager] Watchdog started (timeout: %ds)", self.timeout_seconds) def stop_watchdog(self) -> None: """Stop the watchdog thread.""" self._watchdog_running = False if self._watchdog_thread: self._watchdog_thread.join(timeout=2) def register_sub_agent( self, agent_id: str, task_description: str ) -> None: """Register a new sub-agent for monitoring.""" with self._lock: now = time.time() self.sub_agents[agent_id] = SubAgentState( agent_id=agent_id, task_description=task_description, started_at=now, last_activity=now ) logger.info("[SubAgentManager] Registered sub-agent: %s - %s", agent_id, task_description) def update_activity(self, agent_id: str) -> None: """Update last activity timestamp for a sub-agent.""" with self._lock: if agent_id in self.sub_agents: self.sub_agents[agent_id].last_activity = time.time() def mark_complete( self, agent_id: str, result: Optional[str] = None, error: Optional[str] = None ) -> None: """Mark a sub-agent as complete.""" with self._lock: if agent_id in self.sub_agents: self.sub_agents[agent_id].is_complete = True self.sub_agents[agent_id].result = result self.sub_agents[agent_id].error = error logger.info("[SubAgentManager] Sub-agent completed: %s (success=%s)", agent_id, error is None) def get_hung_agents(self) -> list: """Get list of sub-agent IDs that appear to be hung.""" now = time.time() hung = [] with self._lock: for agent_id, state in self.sub_agents.items(): if state.is_complete: continue time_since_activity = now - state.last_activity if time_since_activity > self.timeout_seconds: hung.append(agent_id) logger.warning( "[SubAgentManager] Sub-agent appears hung: %s - %s (no activity for %.1fs)", agent_id, state.task_description, time_since_activity ) return hung def cleanup_agent(self, agent_id: str) -> None: """Clean up a hung sub-agent.""" with self._lock: if agent_id in self.sub_agents: state = self.sub_agents[agent_id] logger.error( "[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)", agent_id, state.task_description, time.time() - state.last_activity ) # Mark as failed state.is_complete = True state.error = f"Timeout: No progress for {self.timeout_seconds}s" def _watchdog_loop(self) -> None: """Watchdog loop that runs in background thread.""" while self._watchdog_running: try: hung_agents = self.get_hung_agents() for agent_id in hung_agents: self.cleanup_agent(agent_id) # Check every 30 seconds time.sleep(30) except Exception as e: logger.error("[SubAgentManager] Watchdog error: %s", e) time.sleep(30) def get_status(self) -> Dict[str, Any]: """Get current status of all sub-agents.""" now = time.time() status = { "total": len(self.sub_agents), "complete": 0, "running": 0, "hung": 0, "agents": [] } with self._lock: for agent_id, state in self.sub_agents.items(): agent_status = { "id": agent_id, "task": state.task_description, "runtime": now - state.started_at, "idle_time": now - state.last_activity, "complete": state.is_complete, "has_error": state.error is not None } if state.is_complete: status["complete"] += 1 elif (now - state.last_activity) > self.timeout_seconds: status["hung"] += 1 else: status["running"] += 1 status["agents"].append(agent_status) return status def clear_completed(self) -> None: """Remove completed sub-agents from tracking.""" with self._lock: completed = [ agent_id for agent_id, state in self.sub_agents.items() if state.is_complete ] for agent_id in completed: del self.sub_agents[agent_id] if completed: logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))