Core agent improvements: - RSO (Relevance Scoring & Observation) system: interaction_logger, memory_scorer, signal_detector - Memory access logging (memory_access_log table) for relevance scoring; high-signal turn detection - Rich conversation storage for notable turns; compact_conversation truncates long user messages - Task-type classifier (query/action/analysis/creative) for observation tagging - Nested sub-agent visibility: deep delegations now register against the main agent's manager Child safety (Gabriel profile): - child_safety.py: filtering, audit logging, prompt constants for restricted sessions - .kiro/specs/child-safety-profile: requirements, design, tasks specs - GABRIEL_BOT_PROPOSAL.md: initial proposal doc - Reduced context window (10 msgs) and tutor-mode identity for restricted users Telegram adapter: - Polling watchdog: auto-restarts updater if polling drops unexpectedly - get_me() with exponential-backoff retry on NetworkError at startup - Correct stop() ordering: signal watchdog before cancelling tasks Email / Gmail: - send_email: supports file attachments (attachments list param) - get_email: surfaces attachment metadata in response Scheduled tasks / weather: - Remove OpenWeatherMap API calls from morning-weather task; use wttr.in exclusively - New scheduled tasks and scheduler state persistence Discord: - adapters/discord/__init__.py scaffold - discord-plugin: MCP plugin for Claude Code Discord integration (server.ts, skills, config) Infrastructure: - n8n workflow exports (garvis_webhook, content_pipeline variants) - memory_workspace: context, homelab-repo-updates, weekly observation summaries, error logs - UCS C240 migration plan doc - requirements.txt: new deps - .claude/settings.json, fix_hooks.py: hook/permission tuning
326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle.
|
|
|
|
Handles:
|
|
- Sub-agent spawning and tracking
|
|
- Progress monitoring and hang detection
|
|
- Automatic cleanup and restart on timeout
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
from concurrent.futures import Future
|
|
from typing import Dict, Optional, Any
|
|
from dataclasses import dataclass, field
|
|
|
|
from logging_config import StructuredLogger as _StructuredLogger
|
|
# Module-level logger used by every log call in this file.
# Use the project's structured logger so watchdog/state lines go to ajarbot.log.
logger = _StructuredLogger("sub_agent_manager").logger
|
|
|
|
|
|
@dataclass
class SubAgentState:
    """Mutable bookkeeping record for a single monitored sub-agent.

    The first four fields are required and positional; everything else
    starts from a neutral default and is filled in while the agent runs.
    """

    # --- Identity and timing (required) ---------------------------------
    agent_id: str              # key under which the agent is tracked
    task_description: str      # human-readable task text, used in log lines
    started_at: float          # timestamp taken when the agent was registered
    last_activity: float       # timestamp of the most recent recorded progress

    # --- Outcome --------------------------------------------------------
    is_complete: bool = False      # True once finished or forcibly cleaned up
    result: Optional[str] = None   # result text, if one was reported
    error: Optional[str] = None    # failure / timeout description, if any

    # --- Progress and loop detection -------------------------------------
    message_count: int = 0             # latest reported message count
    last_message_count: int = 0        # message count before the latest report
    error_count: int = 0               # number of errors reported so far
    last_error: Optional[str] = None   # most recent error message

    # --- Cancellation ---------------------------------------------------
    # The Future is left out of repr() so printing a state stays compact.
    future: Optional[Future] = field(repr=False, default=None)
    cancel_requested: bool = False     # set once cancellation has been asked for
|
|
|
|
|
|
class SubAgentManager:
    """Track running sub-agents and fail the ones that appear to hang.

    An incomplete sub-agent counts as hung when any of these holds:

    - it has been running longer than ``total_timeout_seconds`` (hard cap);
    - it has recorded no progress for ``idle_timeout_seconds``;
    - it has reported more than ``_LOOP_ERROR_THRESHOLD`` errors.

    An optional daemon watchdog thread sweeps for hung agents, marks them
    complete with a descriptive error, and asks their attached ``Future``
    (if any) to cancel. This class does not restart agents itself; callers
    read the recorded error and decide what to do next.

    Every method that touches ``sub_agents`` does so under an internal lock.
    """

    # Seconds the watchdog waits between sweeps.
    _WATCHDOG_INTERVAL_SECONDS = 30
    # update_error() starts logging warnings above this many errors.
    _ERROR_WARN_THRESHOLD = 3
    # An agent with more errors than this is treated as stuck in a loop.
    _LOOP_ERROR_THRESHOLD = 5

    def __init__(
        self,
        idle_timeout_seconds: int = 300,  # 5 minutes idle (no progress)
        total_timeout_seconds: int = 900,  # 15 minutes total (hard cap)
    ):
        """Initialize manager.

        Args:
            idle_timeout_seconds: Max time without progress before an agent
                is considered hung (distinguishes slow from stuck).
            total_timeout_seconds: Absolute maximum runtime (safety net for
                legitimately slow tasks).
        """
        self.idle_timeout_seconds = idle_timeout_seconds
        self.total_timeout_seconds = total_timeout_seconds
        self.sub_agents: Dict[str, SubAgentState] = {}
        self._lock = threading.Lock()
        self._watchdog_thread: Optional[threading.Thread] = None
        self._watchdog_running = False
        # Set to wake and stop the current watchdog thread; replaced with a
        # fresh Event on every start so an old thread can never be revived.
        self._watchdog_stop = threading.Event()

    def start_watchdog(self) -> None:
        """Start the watchdog thread that monitors for hung sub-agents.

        Does nothing if the watchdog is already running.
        """
        if self._watchdog_running:
            return

        # Each run gets its own stop event. A thread from a previous run
        # keeps its own (already set) event and therefore still exits even
        # though _watchdog_running flips back to True here.
        stop_event = threading.Event()
        self._watchdog_stop = stop_event
        self._watchdog_running = True
        self._watchdog_thread = threading.Thread(
            target=self._watchdog_loop,
            args=(stop_event,),
            daemon=True,
            name="SubAgentWatchdog",
        )
        self._watchdog_thread.start()
        logger.info(
            "[SubAgentManager] Watchdog started (idle: %ds, total: %ds)",
            self.idle_timeout_seconds,
            self.total_timeout_seconds,
        )

    def stop_watchdog(self) -> None:
        """Stop the watchdog thread and wait up to 2 seconds for it to exit."""
        self._watchdog_running = False
        # Wake the thread out of its inter-sweep wait so it exits promptly
        # instead of after the remainder of the sweep interval.
        self._watchdog_stop.set()
        thread = self._watchdog_thread
        # A thread cannot join itself (RuntimeError), so skip the join when
        # this is called from inside the watchdog thread.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)

    def register_sub_agent(
        self,
        agent_id: str,
        task_description: str
    ) -> None:
        """Register a new sub-agent for monitoring.

        An existing entry with the same ``agent_id`` is replaced by a fresh
        state whose start and last-activity times are both "now".

        Args:
            agent_id: Key under which the sub-agent is tracked.
            task_description: Human-readable task text (truncated to 80
                characters in the log line only).
        """
        with self._lock:
            now = time.time()
            self.sub_agents[agent_id] = SubAgentState(
                agent_id=agent_id,
                task_description=task_description,
                started_at=now,
                last_activity=now,
            )
        logger.info("[SubAgentManager] STATE[register] id=%s task=%s", agent_id, task_description[:80])

    def attach_future(self, agent_id: str, future: Future) -> None:
        """Attach the running Future for a sub-agent so it can be cancelled on timeout.

        Unknown ``agent_id`` values are ignored.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            state.future = future
        logger.info("[SubAgentManager] STATE[attach_future] id=%s", agent_id)

    def update_activity(self, agent_id: str, message_count: Optional[int] = None) -> None:
        """Update last activity timestamp and message count for a sub-agent.

        Unknown ``agent_id`` values are ignored.

        Args:
            agent_id: The sub-agent ID
            message_count: Current message count (indicates progress). When
                given, the activity timestamp is refreshed only if the count
                grew since the previous report; when omitted, the call is a
                plain heartbeat and always refreshes the timestamp.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return

            now = time.time()
            if message_count is None:
                # No message count provided - basic heartbeat
                state.last_activity = now
                return

            # Remember the previous count, then record the new one.
            state.last_message_count = state.message_count
            state.message_count = message_count

            # Only a growing count is real progress; an unchanged count is an
            # idle heartbeat and must not reset the idle timer.
            if message_count > state.last_message_count:
                state.last_activity = now

    def update_error(self, agent_id: str, error_message: str) -> None:
        """Track error occurrences for loop detection.

        Unknown ``agent_id`` values are ignored.

        Args:
            agent_id: The sub-agent ID
            error_message: The error message that occurred
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return

            state.error_count += 1
            # Keep the latest message for debugging and loop reports.
            state.last_error = error_message

            # Start warning once errors become repetitive.
            if state.error_count > self._ERROR_WARN_THRESHOLD:
                logger.warning(
                    "[SubAgentManager] Sub-agent %s has %d errors (last: %s...)",
                    agent_id, state.error_count, error_message[:80]
                )

    def mark_complete(
        self,
        agent_id: str,
        result: Optional[str] = None,
        error: Optional[str] = None
    ) -> None:
        """Mark a sub-agent as complete.

        Unknown ``agent_id`` values are ignored.

        Args:
            agent_id: The sub-agent ID
            result: Result text to store (may be None).
            error: Error text to store; None is logged as success.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            state.is_complete = True
            state.result = result
            state.error = error
        logger.info("[SubAgentManager] STATE[complete] id=%s success=%s",
                    agent_id, error is None)

    def _hang_reason(self, state: "SubAgentState", now: float) -> Optional[str]:
        """Classify why ``state`` counts as hung at time ``now``.

        Returns ``"total"``, ``"idle"`` or ``"loop"`` (checked in that
        order), or None when no criterion applies. Does not look at
        ``is_complete``; callers filter finished agents themselves.
        """
        if now - state.started_at > self.total_timeout_seconds:
            return "total"
        if now - state.last_activity > self.idle_timeout_seconds:
            return "idle"
        if state.error_count > self._LOOP_ERROR_THRESHOLD:
            return "loop"
        return None

    def get_hung_agents(self) -> list:
        """Get list of sub-agent IDs that appear to be hung.

        Uses adaptive detection:
        - Idle timeout: No progress (message count unchanged) for idle_timeout_seconds
        - Total timeout: Running longer than total_timeout_seconds regardless of progress
        - Loop detection: Repetitive errors

        Completed agents are never reported. A warning is logged for every
        agent that is reported.

        Returns:
            List of agent IDs, in tracking order.
        """
        now = time.time()
        hung = []

        with self._lock:
            for agent_id, state in self.sub_agents.items():
                if state.is_complete:
                    continue

                reason = self._hang_reason(state, now)
                if reason is None:
                    continue
                hung.append(agent_id)

                if reason == "total":
                    # Check 1: Total timeout (hard cap)
                    logger.warning(
                        "[SubAgentManager] Sub-agent exceeded total timeout: %s - %s "
                        "(running %.1fs > %ds total limit)",
                        agent_id, state.task_description,
                        now - state.started_at, self.total_timeout_seconds
                    )
                elif reason == "idle":
                    # Check 2: Idle timeout (no progress)
                    logger.warning(
                        "[SubAgentManager] Sub-agent idle timeout: %s - %s "
                        "(no progress for %.1fs, messages: %d)",
                        agent_id, state.task_description,
                        now - state.last_activity, state.message_count
                    )
                else:
                    # Check 3: Loop detection - high error count
                    logger.warning(
                        "[SubAgentManager] Sub-agent loop detected: %s - %s "
                        "(error count: %d, last error: %s)",
                        agent_id, state.task_description, state.error_count,
                        state.last_error[:100] if state.last_error else "None"
                    )

        return hung

    def cleanup_agent(self, agent_id: str) -> None:
        """Clean up a hung sub-agent.

        Marks the agent complete, flags it as cancel-requested, records an
        error describing which limit was hit, and asks its attached Future
        (if any) to cancel.

        Unknown agents and agents that are already complete are left
        untouched, so an agent that finished after being reported hung does
        not have its real outcome overwritten.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None or state.is_complete:
                return

            logger.error(
                "[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)",
                agent_id,
                state.task_description,
                time.time() - state.last_activity
            )

            # Mark as failed and request cancellation
            state.is_complete = True
            state.cancel_requested = True

            now = time.time()
            total_runtime = now - state.started_at
            idle_time = now - state.last_activity

            # Build detailed error message based on timeout type
            if total_runtime > self.total_timeout_seconds:
                state.error = (
                    f"Total timeout: Exceeded {self.total_timeout_seconds}s limit "
                    f"(ran {total_runtime:.1f}s, {state.message_count} messages)"
                )
            elif idle_time > self.idle_timeout_seconds:
                state.error = (
                    f"Idle timeout: No progress for {idle_time:.1f}s "
                    f"(limit: {self.idle_timeout_seconds}s, {state.message_count} messages)"
                )
            else:
                state.error = (
                    f"Loop detected: {state.error_count} errors, "
                    f"last: {state.last_error[:100] if state.last_error else 'None'}"
                )

            future = state.future

        # Cancel only after releasing the lock: Future.cancel() runs
        # done-callbacks synchronously in this thread, and a callback that
        # calls back into the manager would deadlock on the non-reentrant lock.
        if future is not None:
            cancelled = future.cancel()
            logger.info(
                "[SubAgentManager] STATE[cancel] id=%s cancel_returned=%s (False means already running — thread may linger but caller will unblock)",
                agent_id, cancelled
            )

    def _watchdog_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Watchdog loop that runs in background thread.

        Sweeps for hung agents and cleans them up, then waits
        ``_WATCHDOG_INTERVAL_SECONDS`` before the next sweep. The wait ends
        early as soon as the stop event is set.

        Args:
            stop_event: Event that ends this loop when set. Defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._watchdog_stop

        while self._watchdog_running and not stop_event.is_set():
            try:
                for agent_id in self.get_hung_agents():
                    self.cleanup_agent(agent_id)
            except Exception as e:
                # Keep the watchdog alive; one bad sweep must not end monitoring.
                logger.error("[SubAgentManager] Watchdog error: %s", e)

            # Interruptible pause between sweeps.
            stop_event.wait(self._WATCHDOG_INTERVAL_SECONDS)

    def get_status(self) -> Dict[str, Any]:
        """Get current status of all sub-agents.

        Returns:
            Dict with keys ``total``, ``complete``, ``running``, ``hung``
            (counts) and ``agents`` (one dict per agent with ``id``,
            ``task``, ``runtime``, ``idle_time``, ``complete`` and
            ``has_error``). "hung" uses the same criteria as
            ``get_hung_agents``. The whole snapshot is taken under the lock,
            so ``total`` always matches the per-bucket counts.
        """
        now = time.time()

        with self._lock:
            status = {
                "total": len(self.sub_agents),
                "complete": 0,
                "running": 0,
                "hung": 0,
                "agents": [],
            }

            for agent_id, state in self.sub_agents.items():
                if state.is_complete:
                    bucket = "complete"
                elif self._hang_reason(state, now) is not None:
                    bucket = "hung"
                else:
                    bucket = "running"
                status[bucket] += 1

                status["agents"].append({
                    "id": agent_id,
                    "task": state.task_description,
                    "runtime": now - state.started_at,
                    "idle_time": now - state.last_activity,
                    "complete": state.is_complete,
                    "has_error": state.error is not None,
                })

        return status

    def clear_completed(self) -> None:
        """Remove completed sub-agents from tracking."""
        with self._lock:
            completed = [
                agent_id for agent_id, state in self.sub_agents.items()
                if state.is_complete
            ]
            for agent_id in completed:
                del self.sub_agents[agent_id]

        if completed:
            logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))
|