Files
ajarbot/sub_agent_manager.py
Jordan Ramos 916f86725d feat: RSO observation system, child safety, Discord adapter, Telegram watchdog, email attachments
Core agent improvements:
- RSO (Relevance Scoring & Observation) system: interaction_logger, memory_scorer, signal_detector
- Memory access logging (memory_access_log table) for relevance scoring; high-signal turn detection
- Rich conversation storage for notable turns; compact_conversation truncates long user messages
- Task-type classifier (query/action/analysis/creative) for observation tagging
- Nested sub-agent visibility: deep delegations now register against the main agent's manager

Child safety (Gabriel profile):
- child_safety.py: filtering, audit logging, prompt constants for restricted sessions
- .kiro/specs/child-safety-profile: requirements, design, tasks specs
- GABRIEL_BOT_PROPOSAL.md: initial proposal doc
- Reduced context window (10 msgs) and tutor-mode identity for restricted users

Telegram adapter:
- Polling watchdog: auto-restarts updater if polling drops unexpectedly
- get_me() with exponential-backoff retry on NetworkError at startup
- Correct stop() ordering: signal watchdog before cancelling tasks

Email / Gmail:
- send_email: supports file attachments (attachments list param)
- get_email: surfaces attachment metadata in response

Scheduled tasks / weather:
- Remove OpenWeatherMap API calls from morning-weather task; use wttr.in exclusively
- New scheduled tasks and scheduler state persistence

Discord:
- adapters/discord/__init__.py scaffold
- discord-plugin: MCP plugin for Claude Code Discord integration (server.ts, skills, config)

Infrastructure:
- n8n workflow exports (garvis_webhook, content_pipeline variants)
- memory_workspace: context, homelab-repo-updates, weekly observation summaries, error logs
- UCS C240 migration plan doc
- requirements.txt: new deps
- .claude/settings.json, fix_hooks.py: hook/permission tuning
2026-04-23 07:54:01 -06:00

326 lines
12 KiB
Python

"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle.
Handles:
- Sub-agent spawning and tracking
- Progress monitoring and hang detection
- Automatic cleanup and restart on timeout
"""
import time
import threading
from concurrent.futures import Future
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from logging_config import StructuredLogger as _StructuredLogger
# Module-level logger taken from the project's StructuredLogger wrapper (its
# ``.logger`` attribute). Intent, per the original note: route the watchdog and
# STATE[...] lines emitted in this module to ajarbot.log. The actual handler
# configuration lives in logging_config and is not visible from this file.
logger = _StructuredLogger("sub_agent_manager").logger
@dataclass
class SubAgentState:
    """Mutable tracking record for a single delegated sub-agent.

    Groups the agent's identity and timing, its final outcome, the counters
    used for stall/loop detection, and the handle used for cancellation.
    The manager writes the timestamps using ``time.time()`` values.
    """

    # Identity and timing (required, positional in this order).
    agent_id: str
    task_description: str
    started_at: float  # time the agent was registered
    last_activity: float  # time of the most recent observed progress

    # Outcome.
    is_complete: bool = field(default=False)
    result: Optional[str] = field(default=None)
    error: Optional[str] = field(default=None)

    # Stall / loop detection counters.
    message_count: int = field(default=0)  # most recently reported count
    last_message_count: int = field(default=0)  # count reported before that
    error_count: int = field(default=0)
    last_error: Optional[str] = field(default=None)

    # Cancellation. The Future is kept out of repr so logging a state does
    # not dump the Future object.
    future: Optional[Future] = field(default=None, repr=False)
    cancel_requested: bool = field(default=False)
class SubAgentManager:
    """Manages sub-agent lifecycle with hang detection and timeout cancellation.

    Sub-agents are registered by id and tracked as ``SubAgentState`` records in
    ``self.sub_agents``. An optional background watchdog periodically flags
    agents that look hung and cleans them up. An agent counts as hung when it
    exceeds the total runtime cap, goes idle without progress, or accumulates
    too many errors. Cleanup marks the record complete with an error and
    cancels any attached Future that has not started yet. This class does not
    restart sub-agents itself.

    All reads and writes of ``self.sub_agents`` happen under ``self._lock``.
    """

    def __init__(
        self,
        idle_timeout_seconds: int = 300,  # 5 minutes idle (no progress)
        total_timeout_seconds: int = 900,  # 15 minutes total (hard cap)
        loop_error_threshold: int = 5,  # error_count above this => loop
        watchdog_interval_seconds: float = 30.0,  # pause between checks
    ):
        """Initialize manager.

        Args:
            idle_timeout_seconds: Max time without progress before an agent is
                considered hung (distinguishes slow from stuck).
            total_timeout_seconds: Absolute maximum runtime (safety net for
                legitimately slow tasks).
            loop_error_threshold: An agent whose error count exceeds this value
                is treated as stuck in an error loop.
            watchdog_interval_seconds: Seconds the watchdog waits between
                checks.
        """
        self.idle_timeout_seconds = idle_timeout_seconds
        self.total_timeout_seconds = total_timeout_seconds
        self.loop_error_threshold = loop_error_threshold
        self.watchdog_interval_seconds = watchdog_interval_seconds
        self.sub_agents: Dict[str, SubAgentState] = {}
        self._lock = threading.Lock()
        self._watchdog_thread: Optional[threading.Thread] = None
        self._watchdog_running = False
        # Setting this wakes the watchdog immediately instead of letting it
        # sleep out the full interval. Replaced with a fresh Event on every
        # start_watchdog().
        self._stop_event = threading.Event()

    def start_watchdog(self) -> None:
        """Start the background watchdog thread (no-op if already running)."""
        if self._watchdog_running:
            return
        self._watchdog_running = True
        # A fresh event per thread: a previous watchdog that is still winding
        # down keeps its own (already set) event and cannot be revived.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._watchdog_thread = threading.Thread(
            target=self._watchdog_loop,
            args=(stop_event,),
            daemon=True,
            name="SubAgentWatchdog",
        )
        self._watchdog_thread.start()
        logger.info(
            "[SubAgentManager] Watchdog started (idle: %ds, total: %ds)",
            self.idle_timeout_seconds,
            self.total_timeout_seconds,
        )

    def stop_watchdog(self) -> None:
        """Signal the watchdog thread to stop and wait up to 2s for it to exit."""
        self._watchdog_running = False
        self._stop_event.set()
        thread = self._watchdog_thread
        # Joining the current thread raises RuntimeError, so skip it if stop is
        # requested from inside the watchdog itself.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)

    def register_sub_agent(self, agent_id: str, task_description: str) -> None:
        """Register a new sub-agent for monitoring.

        Registering an ``agent_id`` that is already tracked replaces its record.
        """
        with self._lock:
            now = time.time()
            self.sub_agents[agent_id] = SubAgentState(
                agent_id=agent_id,
                task_description=task_description,
                started_at=now,
                last_activity=now,
            )
        logger.info(
            "[SubAgentManager] STATE[register] id=%s task=%s",
            agent_id, task_description[:80],
        )

    def attach_future(self, agent_id: str, future: Future) -> None:
        """Attach the running Future for a sub-agent so it can be cancelled on timeout.

        If the agent was already cleaned up before its Future arrived, the
        Future is cancelled right away so it cannot keep running untracked.
        Unknown ids are ignored.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            state.future = future
            cancel_now = state.cancel_requested
        logger.info("[SubAgentManager] STATE[attach_future] id=%s", agent_id)
        if cancel_now:
            # Outside the lock: Future.cancel() runs done-callbacks synchronously.
            cancelled = future.cancel()
            logger.info(
                "[SubAgentManager] STATE[cancel] id=%s cancel_returned=%s (future attached after cleanup)",
                agent_id, cancelled,
            )

    def update_activity(self, agent_id: str, message_count: Optional[int] = None) -> None:
        """Update last activity timestamp and message count for a sub-agent.

        Args:
            agent_id: The sub-agent ID (unknown ids are ignored).
            message_count: Current message count (indicates progress). When
                omitted, the call is a plain heartbeat and always counts as
                activity.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            now = time.time()
            if message_count is None:
                # No message count provided - basic heartbeat.
                state.last_activity = now
                return
            state.last_message_count = state.message_count
            state.message_count = message_count
            # Only a growing message count resets the idle timer, so a heartbeat
            # with an unchanged count does not mask a stalled agent.
            if message_count > state.last_message_count:
                state.last_activity = now

    def update_error(self, agent_id: str, error_message: str) -> None:
        """Track error occurrences for loop detection.

        Args:
            agent_id: The sub-agent ID (unknown ids are ignored).
            error_message: The error message that occurred.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            state.error_count += 1
            state.last_error = error_message  # kept for the cleanup error text
            if state.error_count > 3:
                logger.warning(
                    "[SubAgentManager] Sub-agent %s has %d errors (last: %s...)",
                    agent_id, state.error_count, error_message[:80],
                )

    def mark_complete(
        self,
        agent_id: str,
        result: Optional[str] = None,
        error: Optional[str] = None,
    ) -> None:
        """Mark a sub-agent as complete, recording its result or error.

        Unknown ids are ignored.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                return
            state.is_complete = True
            state.result = result
            state.error = error
        logger.info(
            "[SubAgentManager] STATE[complete] id=%s success=%s",
            agent_id, error is None,
        )

    def _hang_reason(self, state: "SubAgentState", now: float) -> Optional[str]:
        """Return why *state* counts as hung at time *now*, or None if it does not.

        Checks in priority order: ``"total"`` (runtime above the hard cap),
        ``"idle"`` (no progress for too long), then ``"loop"`` (error count
        above ``loop_error_threshold``). Completed agents are never hung.
        Callers must hold ``self._lock``.
        """
        if state.is_complete:
            return None
        if now - state.started_at > self.total_timeout_seconds:
            return "total"
        if now - state.last_activity > self.idle_timeout_seconds:
            return "idle"
        if state.error_count > self.loop_error_threshold:
            return "loop"
        return None

    def get_hung_agents(self) -> list:
        """Get list of sub-agent IDs that appear to be hung.

        Uses adaptive detection:
        - Total timeout: running longer than total_timeout_seconds regardless of progress
        - Idle timeout: no progress (message count unchanged) for idle_timeout_seconds
        - Loop detection: more than loop_error_threshold recorded errors
        A warning is logged for each hung agent found.
        """
        now = time.time()
        hung = []
        with self._lock:
            for agent_id, state in self.sub_agents.items():
                reason = self._hang_reason(state, now)
                if reason is None:
                    continue
                hung.append(agent_id)
                if reason == "total":
                    logger.warning(
                        "[SubAgentManager] Sub-agent exceeded total timeout: %s - %s "
                        "(running %.1fs > %ds total limit)",
                        agent_id, state.task_description,
                        now - state.started_at, self.total_timeout_seconds,
                    )
                elif reason == "idle":
                    logger.warning(
                        "[SubAgentManager] Sub-agent idle timeout: %s - %s "
                        "(no progress for %.1fs, messages: %d)",
                        agent_id, state.task_description,
                        now - state.last_activity, state.message_count,
                    )
                else:
                    logger.warning(
                        "[SubAgentManager] Sub-agent loop detected: %s - %s "
                        "(error count: %d, last error: %s)",
                        agent_id, state.task_description, state.error_count,
                        state.last_error[:100] if state.last_error else "None",
                    )
        return hung

    def cleanup_agent(self, agent_id: str) -> None:
        """Clean up a hung sub-agent.

        Marks the record complete with cancellation requested and records an
        error describing why it was judged hung. It then tries to cancel the
        attached Future. Unknown ids, and agents that already completed (for
        example, one that finished between detection and cleanup), are left
        untouched so their real outcome is not overwritten.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None or state.is_complete:
                return
            now = time.time()
            total_runtime = now - state.started_at
            idle_time = now - state.last_activity
            # Classify before flipping is_complete: completed agents never
            # count as hung.
            reason = self._hang_reason(state, now)
            logger.error(
                "[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)",
                agent_id, state.task_description, idle_time,
            )
            # Mark as failed and request cancellation.
            state.is_complete = True
            state.cancel_requested = True
            if reason == "total":
                state.error = (
                    f"Total timeout: Exceeded {self.total_timeout_seconds}s limit "
                    f"(ran {total_runtime:.1f}s, {state.message_count} messages)"
                )
            elif reason == "idle":
                state.error = (
                    f"Idle timeout: No progress for {idle_time:.1f}s "
                    f"(limit: {self.idle_timeout_seconds}s, {state.message_count} messages)"
                )
            else:
                # Loop detection, or a manual cleanup of an agent that was not
                # otherwise hung (same message as before).
                state.error = (
                    f"Loop detected: {state.error_count} errors, "
                    f"last: {state.last_error[:100] if state.last_error else 'None'}"
                )
            future = state.future
        # Cancel outside the lock: Future.cancel() runs done-callbacks in this
        # thread, and a callback that re-enters the manager would otherwise
        # deadlock on the non-reentrant lock.
        if future is not None:
            cancelled = future.cancel()
            logger.info(
                "[SubAgentManager] STATE[cancel] id=%s cancel_returned=%s (False means already running — thread may linger but caller will unblock)",
                agent_id, cancelled,
            )

    def _watchdog_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Detect and clean up hung sub-agents until the watchdog is stopped.

        Args:
            stop_event: Event that ends the loop when set; defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self._watchdog_running and not stop_event.is_set():
            try:
                for agent_id in self.get_hung_agents():
                    self.cleanup_agent(agent_id)
            except Exception as e:
                # Best effort: keep the watchdog alive, but keep the traceback.
                logger.exception("[SubAgentManager] Watchdog error: %s", e)
            # Interruptible sleep: returns early as soon as stop is signalled.
            stop_event.wait(self.watchdog_interval_seconds)

    def get_status(self) -> Dict[str, Any]:
        """Get current status of all sub-agents.

        The counts and per-agent entries are taken in one pass under the lock,
        so ``total == complete + running + hung``. "hung" uses the same rules as
        get_hung_agents (total, idle and loop detection).
        """
        now = time.time()
        status: Dict[str, Any] = {
            "total": 0,
            "complete": 0,
            "running": 0,
            "hung": 0,
            "agents": [],
        }
        with self._lock:
            status["total"] = len(self.sub_agents)
            for agent_id, state in self.sub_agents.items():
                status["agents"].append({
                    "id": agent_id,
                    "task": state.task_description,
                    "runtime": now - state.started_at,
                    "idle_time": now - state.last_activity,
                    "complete": state.is_complete,
                    "has_error": state.error is not None,
                })
                if state.is_complete:
                    status["complete"] += 1
                elif self._hang_reason(state, now) is not None:
                    status["hung"] += 1
                else:
                    status["running"] += 1
        return status

    def clear_completed(self) -> None:
        """Remove completed sub-agents from tracking."""
        with self._lock:
            completed = [
                agent_id for agent_id, state in self.sub_agents.items()
                if state.is_complete
            ]
            for agent_id in completed:
                del self.sub_agents[agent_id]
        if completed:
            logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))