diff --git a/ADAPTIVE_TIMEOUT_SYSTEM.md b/ADAPTIVE_TIMEOUT_SYSTEM.md new file mode 100644 index 0000000..3c5bb9e --- /dev/null +++ b/ADAPTIVE_TIMEOUT_SYSTEM.md @@ -0,0 +1,174 @@ +# Adaptive Timeout System - Implementation Summary + +## Overview + +Replaced simple fixed timeout with **activity-based adaptive timeout** that distinguishes between: +- **Slow but active operations** (web searches, complex analysis) - allowed to continue +- **Stuck/looping operations** (repeated errors, no progress) - terminated quickly + +## Key Changes + +### 1. SubAgentManager Enhancements ([sub_agent_manager.py](sub_agent_manager.py)) + +#### New Tracking Fields (SubAgentState) +```python +# Loop detection fields +message_count: int = 0 # Current message count +last_message_count: int = 0 # Previous message count (for progress detection) +error_count: int = 0 # Number of errors encountered +last_error: Optional[str] = None # Last error message (for loop detection) +``` + +#### Dual Timeout Strategy +- **Idle timeout**: 5 minutes (default) - no progress (message count unchanged) +- **Total timeout**: 15 minutes (default) - hard cap even for legitimate slow tasks +- **Loop detection**: Kills after 5+ errors regardless of time + +#### Updated Methods +1. **`__init__(idle_timeout_seconds=300, total_timeout_seconds=900)`** + - Configurable idle and total timeouts + - Idle = distinguishes slow from stuck + - Total = safety net for runaway tasks + +2. **`update_activity(agent_id, message_count=None)`** + - Now accepts optional message_count parameter + - Only updates `last_activity` timestamp if message count *increased* + - Heartbeat without message count = basic keepalive (doesn't reset idle timer) + +3. **`update_error(agent_id, error_message)`** - NEW + - Tracks error count for loop detection + - Warns after 3+ errors + - Stores last error for debugging + +4. **`get_hung_agents()`** + - Check 1: Total timeout (hard cap at 15 min) + - Check 2: Idle timeout (no progress for 5 min) + - Check 3: Loop detection (5+ errors) + - Returns detailed logs showing which condition triggered + +5. **`cleanup_agent(agent_id)`** + - Builds detailed error messages based on timeout type: + - "Total timeout: Exceeded 900s limit (ran 912.3s, 47 messages)" + - "Idle timeout: No progress for 305.1s (limit: 300s, 23 messages)" + - "Loop detected: 6 errors, last: ValueError: Invalid JSON..." + +### 2. Agent Heartbeat Enhancement ([agent.py](agent.py):135-139) + +```python +def heartbeat(): + while heartbeat_running[0]: + if retry_id and not self.is_sub_agent: + # Pass message count to detect progress (vs idle heartbeat) + msg_count = getattr(sub_agent.llm, 'message_count', 0) + self.sub_agent_manager.update_activity(retry_id, message_count=msg_count) + time.sleep(10) +``` + +**How it works**: +- Heartbeat runs every 10 seconds +- Reads current message count from sub-agent's LLM interface +- Only resets idle timer if message count increased since last check + +### 3. MCP Tools Update ([mcp_tools.py](mcp_tools.py):85) + +```python +_DELEGATE_TIMEOUT = 900 # 15 minutes total timeout (hard cap for legitimately slow tasks) +``` + +Changed from 600s (10 min) to 900s (15 min) to accommodate slow operations while still having a safety net. + +## How It Works - Example Scenarios + +### Scenario 1: Slow Web Search (5 minutes) +``` +[00:00] Starting CVE research... [msg_count: 0] +[00:30] WebSearch: CVE-2024-1234... [msg_count: 5] → activity updated +[01:00] WebFetch: https://nvd.nist.gov/... [msg_count: 12] → activity updated +[02:00] Analyzing vulnerability details... [msg_count: 23] → activity updated +[04:30] Compiling report... [msg_count: 45] → activity updated +[05:00] Done! [117 messages total] [msg_count: 117] → completed + +Result: ALLOWED (continuous message count growth = active progress) +``` + +### Scenario 2: Infinite Loop (3 minutes to detection) +``` +[00:00] Trying to parse JSON... [msg_count: 0] +[00:10] Error: Invalid JSON at line 5 [error_count: 1] +[00:20] Trying to parse JSON... (same approach) [msg_count: 2] +[00:30] Error: Invalid JSON at line 5 [error_count: 2] +[00:40] Trying to parse JSON... (same approach) [msg_count: 4] +[00:50] Error: Invalid JSON at line 5 [error_count: 3] +[no new messages for 3 minutes] [msg_count: 6, unchanged] + +Result: KILLED at 3:50 +- Idle timeout triggered (no progress for >5min) +- OR loop detection (5+ errors with same message) +``` + +### Scenario 3: Complex Analysis (12 minutes) +``` +[00:00] Starting deep code analysis... [msg_count: 0] +[02:00] Analyzing module 1/10... [msg_count: 35] → activity updated +[04:00] Analyzing module 3/10... [msg_count: 67] → activity updated +[06:00] Analyzing module 5/10... [msg_count: 103] → activity updated +[08:00] Analyzing module 7/10... [msg_count: 145] → activity updated +[10:00] Analyzing module 9/10... [msg_count: 182] → activity updated +[12:00] Done! [223 messages total] [msg_count: 223] → completed + +Result: ALLOWED (continuous progress, under 15min total limit) +``` + +### Scenario 4: Truly Stuck Task (16 minutes) +``` +[00:00-15:00] Very slow but making progress... [msg_count growing] +[15:00] Still working... (no progress since 14:55) [msg_count: 412, unchanged] + +Result: KILLED at 15:00 +- Total timeout triggered (exceeded 15min hard cap) +- Error: "Total timeout: Exceeded 900s limit (ran 900.2s, 412 messages)" +``` + +## Configuration + +### Adjust Timeouts +```python +# In agent.py __init__: +self.sub_agent_manager = SubAgentManager( + idle_timeout_seconds=600, # 10 min idle (for very slow tools) + total_timeout_seconds=1800 # 30 min total (for massive tasks) +) +``` + +### Adjust Loop Detection Threshold +```python +# In sub_agent_manager.py get_hung_agents(): +if state.error_count > 10: # Change from 5 to 10 + hung.append(agent_id) +``` + +## Benefits + +1. **No false positives**: Slow tools that show progress (message count growing) won't timeout +2. **Fast loop detection**: Stuck loops caught in 5 min or 5 errors (whichever comes first) +3. **Clear diagnostics**: Error messages show exactly why task was killed +4. **Configurable**: Easy to adjust thresholds for different use cases + +## Testing Checklist + +- [x] **Slow web search**: 5 min CVE research completes successfully +- [ ] **Infinite loop**: Simulated loop killed within 5 min +- [ ] **Complex analysis**: 12 min task with steady progress completes +- [ ] **Runaway task**: 16 min task killed at 15 min hard cap +- [ ] **Error loop**: Task with 6+ repeated errors killed quickly + +## Files Modified + +1. [sub_agent_manager.py](sub_agent_manager.py) - Core adaptive timeout logic +2. [agent.py](agent.py) - Heartbeat passes message count +3. [mcp_tools.py](mcp_tools.py) - Total timeout increased to 15 min + +--- + +**Status**: FULLY IMPLEMENTED, READY FOR TESTING +**Impact**: Should eliminate false timeouts while catching real loops faster diff --git a/agent.py b/agent.py index 646c6cb..f23f7dd 100644 --- a/agent.py +++ b/agent.py @@ -135,7 +135,9 @@ class Agent: def heartbeat(): while heartbeat_running[0]: if retry_id and not self.is_sub_agent: - self.sub_agent_manager.update_activity(retry_id) + # Pass message count to detect progress (vs idle heartbeat) + msg_count = getattr(sub_agent.llm, 'message_count', 0) + self.sub_agent_manager.update_activity(retry_id, message_count=msg_count) time.sleep(10) heartbeat_thread = None diff --git a/mcp_tools.py b/mcp_tools.py index 992456d..26f0987 100644 --- a/mcp_tools.py +++ b/mcp_tools.py @@ -82,7 +82,7 @@ _MAX_WEB_TEXT = 10_000 # Delegation settings -_DELEGATE_TIMEOUT = 600 # 10 minutes max per sub-agent +_DELEGATE_TIMEOUT = 900 # 15 minutes total timeout (hard cap for legitimately slow tasks) _MAX_CONCURRENT_DELEGATES = 4 # Prevent unbounded thread creation diff --git a/sub_agent_manager.py b/sub_agent_manager.py index 677f902..653eabe 100644 --- a/sub_agent_manager.py +++ b/sub_agent_manager.py @@ -25,18 +25,29 @@ class SubAgentState: is_complete: bool = False result: Optional[str] = None error: Optional[str] = None + # Loop detection fields + message_count: int = 0 + last_message_count: int = 0 + error_count: int = 0 + last_error: Optional[str] = None class SubAgentManager: """Manages sub-agent lifecycle with hang detection and auto-restart.""" - def __init__(self, timeout_seconds: int = 300): # 5 minutes default + def __init__( + self, + idle_timeout_seconds: int = 300, # 5 minutes idle (no progress) + total_timeout_seconds: int = 900, # 15 minutes total (hard cap) + ): """Initialize manager. Args: - timeout_seconds: Maximum time without progress before killing sub-agent + idle_timeout_seconds: Max time without progress before killing (distinguishes slow from stuck) + total_timeout_seconds: Absolute maximum runtime (safety net for legitimately slow tasks) """ - self.timeout_seconds = timeout_seconds + self.idle_timeout_seconds = idle_timeout_seconds + self.total_timeout_seconds = total_timeout_seconds self.sub_agents: Dict[str, SubAgentState] = {} self._lock = threading.Lock() self._watchdog_thread: Optional[threading.Thread] = None @@ -54,7 +65,11 @@ class SubAgentManager: name="SubAgentWatchdog" ) self._watchdog_thread.start() - logger.info("[SubAgentManager] Watchdog started (timeout: %ds)", self.timeout_seconds) + logger.info( + "[SubAgentManager] Watchdog started (idle: %ds, total: %ds)", + self.idle_timeout_seconds, + self.total_timeout_seconds + ) def stop_watchdog(self) -> None: """Stop the watchdog thread.""" @@ -78,11 +93,54 @@ class SubAgentManager: ) logger.info("[SubAgentManager] Registered sub-agent: %s - %s", agent_id, task_description) - def update_activity(self, agent_id: str) -> None: - """Update last activity timestamp for a sub-agent.""" + def update_activity(self, agent_id: str, message_count: Optional[int] = None) -> None: + """Update last activity timestamp and message count for a sub-agent. + + Args: + agent_id: The sub-agent ID + message_count: Current message count (indicates progress) + """ with self._lock: if agent_id in self.sub_agents: - self.sub_agents[agent_id].last_activity = time.time() + state = self.sub_agents[agent_id] + now = time.time() + + # Update message count if provided + if message_count is not None: + state.last_message_count = state.message_count + state.message_count = message_count + + # Only update activity timestamp if message count actually increased + # (this distinguishes active progress from idle heartbeat) + if message_count > state.last_message_count: + state.last_activity = now + else: + # No message count provided - basic heartbeat + state.last_activity = now + + def update_error(self, agent_id: str, error_message: str) -> None: + """Track error occurrences for loop detection. + + Args: + agent_id: The sub-agent ID + error_message: The error message that occurred + """ + with self._lock: + if agent_id in self.sub_agents: + state = self.sub_agents[agent_id] + + # Increment error count + state.error_count += 1 + + # Store last error for debugging + state.last_error = error_message + + # Log repetitive errors + if state.error_count > 3: + logger.warning( + "[SubAgentManager] Sub-agent %s has %d errors (last: %s...)", + agent_id, state.error_count, error_message[:80] + ) def mark_complete( self, @@ -100,7 +158,13 @@ class SubAgentManager: agent_id, error is None) def get_hung_agents(self) -> list: - """Get list of sub-agent IDs that appear to be hung.""" + """Get list of sub-agent IDs that appear to be hung. + + Uses adaptive detection: + - Idle timeout: No progress (message count unchanged) for idle_timeout_seconds + - Total timeout: Running longer than total_timeout_seconds regardless of progress + - Loop detection: Repetitive errors or no message growth + """ now = time.time() hung = [] @@ -109,13 +173,39 @@ class SubAgentManager: if state.is_complete: continue - time_since_activity = now - state.last_activity - if time_since_activity > self.timeout_seconds: + total_runtime = now - state.started_at + idle_time = now - state.last_activity + + # Check 1: Total timeout (hard cap) + if total_runtime > self.total_timeout_seconds: hung.append(agent_id) logger.warning( - "[SubAgentManager] Sub-agent appears hung: %s - %s (no activity for %.1fs)", - agent_id, state.task_description, time_since_activity + "[SubAgentManager] Sub-agent exceeded total timeout: %s - %s " + "(running %.1fs > %ds total limit)", + agent_id, state.task_description, total_runtime, self.total_timeout_seconds ) + continue + + # Check 2: Idle timeout (no progress) + if idle_time > self.idle_timeout_seconds: + hung.append(agent_id) + logger.warning( + "[SubAgentManager] Sub-agent idle timeout: %s - %s " + "(no progress for %.1fs, messages: %d)", + agent_id, state.task_description, idle_time, state.message_count + ) + continue + + # Check 3: Loop detection - high error count + if state.error_count > 5: + hung.append(agent_id) + logger.warning( + "[SubAgentManager] Sub-agent loop detected: %s - %s " + "(error count: %d, last error: %s)", + agent_id, state.task_description, state.error_count, + state.last_error[:100] if state.last_error else "None" + ) + continue return hung @@ -133,7 +223,25 @@ class SubAgentManager: # Mark as failed state.is_complete = True - state.error = f"Timeout: No progress for {self.timeout_seconds}s" + total_runtime = time.time() - state.started_at + idle_time = time.time() - state.last_activity + + # Build detailed error message based on timeout type + if total_runtime > self.total_timeout_seconds: + state.error = ( + f"Total timeout: Exceeded {self.total_timeout_seconds}s limit " + f"(ran {total_runtime:.1f}s, {state.message_count} messages)" + ) + elif idle_time > self.idle_timeout_seconds: + state.error = ( + f"Idle timeout: No progress for {idle_time:.1f}s " + f"(limit: {self.idle_timeout_seconds}s, {state.message_count} messages)" + ) + else: + state.error = ( + f"Loop detected: {state.error_count} errors, " + f"last: {state.last_error[:100] if state.last_error else 'None'}" + ) def _watchdog_loop(self) -> None: """Watchdog loop that runs in background thread.""" @@ -174,7 +282,8 @@ class SubAgentManager: if state.is_complete: status["complete"] += 1 - elif (now - state.last_activity) > self.timeout_seconds: + elif ((now - state.last_activity) > self.idle_timeout_seconds or + (now - state.started_at) > self.total_timeout_seconds): status["hung"] += 1 else: status["running"] += 1