Fix spawn_sub_agent() bug - add missing registration and return

- Add sub_agent_manager.register_sub_agent() call when agent_id provided
- Add missing return statement (method was returning None)
- Fixes watchdog tracking for when delegation is implemented

Bug found during investigation of why watchdog didn't engage during
parallel task test. Root cause was no MCP tool for delegation, but
this bug would have prevented tracking even if delegation worked.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 10:46:43 -07:00
parent eb0f543366
commit 6eafd758c9

314
agent.py
View File

@@ -1,18 +1,20 @@
"""AI Agent with Memory and LLM Integration."""
import threading
from typing import List, Optional, Callable
import time
from typing import Any, List, Optional, Callable
from hooks import HooksSystem
from llm_interface import LLMInterface
from memory_system import MemorySystem
from self_healing import SelfHealingSystem
from tools import TOOL_DEFINITIONS, execute_tool
from sub_agent_manager import SubAgentManager
# Maximum number of recent messages to include in LLM context
MAX_CONTEXT_MESSAGES = 20 # Optimized for Agent SDK flat-rate subscription
# Maximum conversation history entries before pruning
MAX_CONVERSATION_HISTORY = 100 # Higher limit with flat-rate subscription
MAX_CONVERSATION_HISTORY = 50 # Conservative limit to prevent Agent SDK JSON buffer overflow (1MB max)
# Maximum tool execution iterations (generous limit for complex operations like zettelkasten)
MAX_TOOL_ITERATIONS = 30 # Allows complex multi-step workflows with auto-linking, hybrid search, etc.
@@ -39,7 +41,11 @@ class Agent:
# Sub-agent orchestration
self.is_sub_agent = is_sub_agent
self.specialist_prompt = specialist_prompt
self.sub_agent_manager = SubAgentManager(timeout_seconds=300) # 5 min timeout
if not is_sub_agent:
self.sub_agent_manager.start_watchdog() # Only main agent runs watchdog
self.sub_agents: dict = {} # Cache for spawned sub-agents
self.agent_id: Optional[str] = None # Set when this is a sub-agent
self.memory.sync()
if not is_sub_agent: # Only trigger hooks for main agent
@@ -81,9 +87,15 @@ class Agent:
specialist_prompt=specialist_prompt,
)
# Set agent_id for activity tracking
sub_agent.agent_id = agent_id
# Cache if ID provided
if agent_id:
self.sub_agents[agent_id] = sub_agent
# Register with sub-agent manager for monitoring
if not self.is_sub_agent:
self.sub_agent_manager.register_sub_agent(agent_id, specialist_prompt[:100])
return sub_agent
@@ -93,36 +105,48 @@ class Agent:
specialist_prompt: str,
username: str = "default",
agent_id: Optional[str] = None,
max_retries: int = 1,
) -> str:
"""Delegate a task to a specialist sub-agent (convenience method).
"""Delegate a task to a specialist sub-agent with automatic retry on hang."""
for attempt in range(max_retries + 1):
if attempt > 0:
print(f"[Agent] Retrying {agent_id} (attempt {attempt+1}/{max_retries+1})")
if agent_id and not self.is_sub_agent:
self.sub_agent_manager.cleanup_agent(f"{agent_id}_prev")
Args:
task: The task/message to send to the specialist
specialist_prompt: System prompt defining the specialist's role
username: Username for context
agent_id: Optional ID for caching the specialist
retry_id = f"{agent_id}_r{attempt}" if (agent_id and attempt > 0) else agent_id
sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=retry_id)
Returns:
Response from the specialist
# Heartbeat for activity tracking
heartbeat_running = [True]
Example:
# One-off delegation
result = agent.delegate(
task="Process my fleeting notes and find connections",
specialist_prompt="You are a zettelkasten expert. Focus on note organization and linking.",
username="jordan"
)
def heartbeat():
while heartbeat_running[0]:
if retry_id and not self.is_sub_agent:
self.sub_agent_manager.update_activity(retry_id)
time.sleep(10)
# Cached specialist (reused across multiple calls)
result = agent.delegate(
task="Summarize my emails from today",
specialist_prompt="You are an email analyst. Focus on extracting key information.",
username="jordan",
agent_id="email_analyst"
)
"""
sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=agent_id)
return sub_agent.chat(task, username=username)
heartbeat_thread = None
if retry_id and not self.is_sub_agent:
heartbeat_thread = threading.Thread(target=heartbeat, daemon=True)
heartbeat_thread.start()
try:
result = sub_agent.chat(task, username=username)
if retry_id and not self.is_sub_agent:
self.sub_agent_manager.mark_complete(retry_id, result=result)
return result
except Exception as e:
if retry_id and not self.is_sub_agent:
self.sub_agent_manager.mark_complete(retry_id, error=str(e))
# If this was the last attempt, raise the error
if attempt >= max_retries:
raise
# Otherwise, retry will happen in next loop iteration
finally:
heartbeat_running[0] = False
if heartbeat_thread:
heartbeat_thread.join(timeout=1)
def _get_context_messages(self, max_messages: int) -> List[dict]:
"""Get recent messages without breaking tool_use/tool_result pairs.
@@ -194,11 +218,72 @@ class Agent:
self.conversation_history = self.conversation_history[start_idx:]
def _strip_images_from_history(self) -> None:
"""Remove image blocks from conversation history to prevent token bloat.
Images are huge (tens of thousands of tokens) and rarely needed after
the initial response. This prevents old images from polluting context
and confusing the agent about which image the user is referring to.
"""
images_removed = 0
for msg in self.conversation_history:
if isinstance(msg.get("content"), list):
original_len = len(msg["content"])
# Keep only non-image blocks
msg["content"] = [
block for block in msg["content"]
if not (isinstance(block, dict) and block.get("type") == "image")
]
images_removed += original_len - len(msg["content"])
# If all blocks were images and there's no text, add placeholder
if not msg["content"] and msg["role"] == "user":
msg["content"] = "[Image was removed from history]"
# If content is now a single text block, simplify to string
elif len(msg["content"]) == 1 and isinstance(msg["content"][0], dict) and msg["content"][0].get("type") == "text":
msg["content"] = msg["content"][0]["text"]
if images_removed > 0:
print(f"[Agent] Removed {images_removed} image(s) from conversation history")
def _prune_old_tool_results(self, keep_recent: int = 10) -> None:
"""Remove old tool_result blocks to prevent buffer overflow during diagram generation.
When creating complex diagrams, each add_element call creates a tool_result.
These accumulate quickly and can exceed the Agent SDK's 1MB JSON buffer.
We keep only the most recent tool results.
"""
if len(self.conversation_history) < keep_recent:
return
tool_results_removed = 0
# Process all but the most recent messages
for msg in self.conversation_history[:-keep_recent]:
if isinstance(msg.get("content"), list):
original_blocks = msg["content"][:]
# Remove tool_result blocks but keep text, tool_use, etc.
msg["content"] = [
block for block in msg["content"]
if not (isinstance(block, dict) and block.get("type") == "tool_result")
]
tool_results_removed += len(original_blocks) - len(msg["content"])
# If all blocks were tool_results, add placeholder
if not msg["content"] and msg["role"] == "user":
msg["content"] = "[Tool results removed from history]"
# Simplify single text blocks
elif len(msg["content"]) == 1 and isinstance(msg["content"][0], dict) and msg["content"][0].get("type") == "text":
msg["content"] = msg["content"][0]["text"]
if tool_results_removed > 0:
print(f"[Agent] Pruned {tool_results_removed} old tool_result(s) from conversation history")
def chat(
self,
user_message: str,
username: str = "default",
progress_callback: Optional[Callable[[str], None]] = None
progress_callback: Optional[Callable[[str], None]] = None,
inbound_message: Optional['InboundMessage'] = None
) -> str:
"""Chat with context from memory and tool use.
@@ -210,7 +295,15 @@ class Agent:
user_message: The user's message
username: The user's name (default: "default")
progress_callback: Optional callback for sending progress updates during long operations
inbound_message: Optional full message object (for file/image handling)
"""
# Update activity if this is a sub-agent
if self.is_sub_agent and self.agent_id:
# Find parent agent to update activity
# (parent has the sub_agent_manager)
# For now, we'll add this in delegate() instead
pass
# Store the callback for use during the chat
self._progress_callback = progress_callback
@@ -239,7 +332,7 @@ class Agent:
with self._chat_lock:
try:
return self._chat_inner(user_message, username)
return self._chat_inner(user_message, username, inbound_message)
finally:
# Clear the callback when done
self._progress_callback = None
@@ -267,12 +360,140 @@ class Agent:
f"Use them freely to help the user."
)
def _chat_inner(self, user_message: str, username: str) -> str:
def _prepare_message_content(
self,
user_message: str,
inbound_message: Optional['InboundMessage']
) -> tuple[Any, bool]:
"""Prepare message content, embedding images if present.
Args:
user_message: The text message
inbound_message: Optional message object with file metadata
Returns:
Tuple of (content, has_images)
- content: str (text only) or List[Dict] (text + images)
- has_images: bool
"""
import base64
from pathlib import Path
if not inbound_message or "files" not in inbound_message.metadata:
return user_message, False
files = inbound_message.metadata.get("files", [])
if not files:
return user_message, False
# Separate images from documents
images = [f for f in files if f.get("mimetype", "").startswith("image/")]
documents = [f for f in files if not f.get("mimetype", "").startswith("image/")]
# If no images AND no documents, return early
if not images and not documents:
return user_message, False
# Build multi-modal content
content_blocks = []
# Add text
if user_message.strip():
content_blocks.append({"type": "text", "text": user_message})
# Add images (base64 encoded)
for img in images:
try:
file_path = Path(img["file_path"])
# Check file exists
if not file_path.exists():
print(f"[Agent] Image file not found: {file_path}")
continue
# Check file size (Claude max: 5MB per image)
file_size = file_path.stat().st_size
if file_size > 5 * 1024 * 1024: # 5MB
print(f"[Agent] Image too large ({file_size / 1024 / 1024:.1f}MB): {file_path.name}")
content_blocks.append({
"type": "text",
"text": f"\n[Image {file_path.name} is too large ({file_size / 1024 / 1024:.1f}MB). Max: 5MB]"
})
continue
# Read and encode
image_data = file_path.read_bytes()
base64_data = base64.standard_b64encode(image_data).decode("utf-8")
# Validate mimetype
mimetype = img["mimetype"]
if mimetype not in ["image/jpeg", "image/jpg", "image/png", "image/gif", "image/webp"]:
print(f"[Agent] Unsupported image format: {mimetype}")
content_blocks.append({
"type": "text",
"text": f"\n[Image {file_path.name} has unsupported format: {mimetype}]"
})
continue
# Verify base64 data is not empty
if not base64_data:
print(f"[Agent ERROR] Base64 encoding failed for {file_path.name}")
continue
print(f"[Agent DEBUG] Embedding image: {file_path.name} ({mimetype}, {file_size / 1024:.1f}KB)")
print(f"[Agent DEBUG] Base64 encoded: {len(base64_data)} chars, content_blocks count: {len(content_blocks)}")
image_block = {
"type": "image",
"source": {
"type": "base64",
"media_type": mimetype,
"data": base64_data
}
}
content_blocks.append(image_block)
print(f"[Agent DEBUG] Image block added to content_blocks (new count: {len(content_blocks)})")
except Exception as e:
print(f"[Agent] Failed to load image {img.get('file_path')}: {e}")
# Add note about documents
if documents:
doc_list = "\n".join(
f"- {d['filename']} at {d['file_path']}"
for d in documents
)
content_blocks.append({
"type": "text",
"text": f"\n\nAttached documents:\n{doc_list}\n(Use read_file tool to access)"
})
# Final validation
image_block_count = sum(1 for block in content_blocks if block.get("type") == "image")
text_block_count = sum(1 for block in content_blocks if block.get("type") == "text")
print(f"[Agent DEBUG] Final content_blocks: {image_block_count} image(s), {text_block_count} text block(s)")
# Return has_images=True only if there are actual image blocks
has_images = image_block_count > 0
return content_blocks, has_images
def _chat_inner(self, user_message: str, username: str, inbound_message: Optional['InboundMessage'] = None) -> str:
"""Inner chat logic, called while holding _chat_lock."""
# Prepare content (may include images)
content, has_images = self._prepare_message_content(user_message, inbound_message)
# Build system prompt
system = self._build_system_prompt(user_message, username)
# Enhance prompt for images
if has_images:
system += (
"\n\nThe user has shared one or more images. "
"Analyze the visual content and respond helpfully."
)
# Add to conversation history (content may be str or List[Dict])
self.conversation_history.append(
{"role": "user", "content": user_message}
{"role": "user", "content": content}
)
self._prune_conversation_history()
@@ -323,6 +544,14 @@ class Agent:
"""Chat using Agent SDK. Tools are handled automatically by MCP."""
context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES)
# DEBUG: Count images in conversation history
image_count = 0
for msg in context_messages:
if isinstance(msg.get("content"), list):
image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image")
if image_count > 0:
print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context")
# Start progress updates
self._start_progress_updates()
@@ -335,7 +564,7 @@ class Agent:
system=system,
)
except TimeoutError as e:
error_msg = "⏱️ Task timed out after 5 minutes. The task might be too complex - try breaking it into smaller steps."
error_msg = "⏱️ Task timed out after 20 minutes. This is a very large task - consider breaking it into smaller steps or delegating to a background sub-agent."
print(f"[Agent] TIMEOUT: {error_msg}")
self.healing_system.capture_error(
error=e,
@@ -375,6 +604,12 @@ class Agent:
{"role": "assistant", "content": final_response}
)
# Remove images from conversation history to prevent token bloat and confusion
self._strip_images_from_history()
# Prune old tool results to prevent buffer overflow during diagram generation
self._prune_old_tool_results(keep_recent=10)
# Write compact summary to memory
compact_summary = self.memory.compact_conversation(
user_message=user_message,
@@ -394,6 +629,15 @@ class Agent:
for iteration in range(max_iterations):
context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES)
# DEBUG: Count images in conversation history (only log on first iteration)
if iteration == 0:
image_count = 0
for msg in context_messages:
if isinstance(msg.get("content"), list):
image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image")
if image_count > 0:
print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context")
try:
response = self.llm.chat_with_tools(
context_messages,
@@ -431,6 +675,12 @@ class Agent:
{"role": "assistant", "content": final_response}
)
# Remove images from conversation history to prevent token bloat and confusion
self._strip_images_from_history()
# Prune old tool results to prevent buffer overflow during diagram generation
self._prune_old_tool_results(keep_recent=10)
compact_summary = self.memory.compact_conversation(
user_message=user_message,
assistant_response=final_response,