diff --git a/agent.py b/agent.py index f519e7b..031b2b9 100644 --- a/agent.py +++ b/agent.py @@ -1,18 +1,20 @@ """AI Agent with Memory and LLM Integration.""" import threading -from typing import List, Optional, Callable +import time +from typing import Any, List, Optional, Callable from hooks import HooksSystem from llm_interface import LLMInterface from memory_system import MemorySystem from self_healing import SelfHealingSystem from tools import TOOL_DEFINITIONS, execute_tool +from sub_agent_manager import SubAgentManager # Maximum number of recent messages to include in LLM context MAX_CONTEXT_MESSAGES = 20 # Optimized for Agent SDK flat-rate subscription # Maximum conversation history entries before pruning -MAX_CONVERSATION_HISTORY = 100 # Higher limit with flat-rate subscription +MAX_CONVERSATION_HISTORY = 50 # Conservative limit to prevent Agent SDK JSON buffer overflow (1MB max) # Maximum tool execution iterations (generous limit for complex operations like zettelkasten) MAX_TOOL_ITERATIONS = 30 # Allows complex multi-step workflows with auto-linking, hybrid search, etc. 
def delegate(
    self,
    task: str,
    specialist_prompt: str,
    username: str = "default",
    agent_id: Optional[str] = None,
    max_retries: int = 1,
) -> str:
    """Delegate a task to a specialist sub-agent, retrying if the attempt raises.

    Each attempt spawns a sub-agent via ``spawn_sub_agent`` and forwards
    ``task`` to its ``chat`` method. When ``agent_id`` is given and this
    agent is not itself a sub-agent, the attempt is tracked through
    ``self.sub_agent_manager``: a background heartbeat calls
    ``update_activity`` while the chat runs, and ``mark_complete`` records
    the result or the error afterwards.

    Args:
        task: The task/message to send to the specialist.
        specialist_prompt: System prompt defining the specialist's role.
        username: Username forwarded to the sub-agent's ``chat``.
        agent_id: Optional ID for the specialist. Retries use
            ``"<agent_id>_r<attempt>"`` so every attempt has its own ID.
        max_retries: Extra attempts after the first one fails. Negative
            values are treated as 0.

    Returns:
        Response from the specialist.

    Raises:
        Exception: Whatever the final attempt raised is re-raised unchanged.

    NOTE(review): a retry is only triggered by an exception from
    ``sub_agent.chat``; a call that blocks forever is not interrupted here.
    Presumably the SubAgentManager watchdog covers that case - confirm.
    """
    heartbeat_interval_seconds = 10
    track = bool(agent_id) and not self.is_sub_agent
    attempts = max(0, max_retries) + 1
    previous_id: Optional[str] = None

    for attempt in range(attempts):
        retry_id = f"{agent_id}_r{attempt}" if (agent_id and attempt > 0) else agent_id

        if attempt > 0:
            print(f"[Agent] Retrying {agent_id} (attempt {attempt+1}/{attempts})")
            # Clean up the attempt that actually ran, not a made-up "<id>_prev".
            if track and previous_id:
                self.sub_agent_manager.cleanup_agent(previous_id)

        sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=retry_id)

        # An Event (instead of sleep + flag) lets the heartbeat stop immediately.
        stop_heartbeat = threading.Event()
        heartbeat_thread = None
        if track:
            # Bind the per-attempt values as defaults so a thread can never
            # pick up the next attempt's id or stop flag (late-binding closure).
            def heartbeat(tracked_id=retry_id, stop=stop_heartbeat):
                while not stop.is_set():
                    self.sub_agent_manager.update_activity(tracked_id)
                    stop.wait(heartbeat_interval_seconds)

            heartbeat_thread = threading.Thread(target=heartbeat, daemon=True)
            heartbeat_thread.start()

        try:
            try:
                result = sub_agent.chat(task, username=username)
            finally:
                # Stop the heartbeat before recording the outcome so no late
                # update_activity() lands after mark_complete().
                stop_heartbeat.set()
                if heartbeat_thread is not None:
                    heartbeat_thread.join(timeout=1)
        except Exception as e:
            if track:
                self.sub_agent_manager.mark_complete(retry_id, error=str(e))
            # If this was the last attempt, surface the error to the caller.
            if attempt >= attempts - 1:
                raise
            print(f"[Agent] Sub-agent attempt {attempt+1}/{attempts} failed: {e}")
            previous_id = retry_id
            continue

        if track:
            self.sub_agent_manager.mark_complete(retry_id, result=result)
        return result
def _strip_images_from_history(self) -> None:
    """Remove image blocks from conversation history to prevent token bloat.

    Images are large and rarely needed after the initial response. Every
    message in ``self.conversation_history`` whose content is a list is
    rewritten in place:

    - top-level ``image`` blocks are dropped;
    - ``image`` blocks nested inside a ``tool_result`` block's list content
      are dropped as well (the tool_result itself is kept);
    - a user message left with no blocks gets a text placeholder;
    - a message left with exactly one text block is simplified to a string.

    Messages whose content is already a string are left untouched.
    """
    placeholder = "[Image was removed from history]"

    def without_images(blocks):
        """Return (kept_blocks, removed_count) for one list of content blocks."""
        kept = []
        removed = 0
        for block in blocks:
            if not isinstance(block, dict):
                kept.append(block)
                continue
            block_type = block.get("type")
            if block_type == "image":
                removed += 1
                continue
            nested = block.get("content")
            if block_type == "tool_result" and isinstance(nested, list):
                nested_kept, nested_removed = without_images(nested)
                if nested_removed:
                    removed += nested_removed
                    # Never leave a tool_result with an empty content list.
                    block = {
                        **block,
                        "content": nested_kept or [{"type": "text", "text": placeholder}],
                    }
            kept.append(block)
        return kept, removed

    images_removed = 0
    for msg in self.conversation_history:
        content = msg.get("content")
        if not isinstance(content, list):
            continue

        kept, removed = without_images(content)
        images_removed += removed

        if not kept and msg.get("role") == "user":
            # All blocks were images and there is no text left.
            msg["content"] = placeholder
        elif len(kept) == 1 and isinstance(kept[0], dict) and kept[0].get("type") == "text":
            # A single text block is equivalent to a plain string.
            msg["content"] = kept[0]["text"]
        else:
            msg["content"] = kept

    if images_removed > 0:
        print(f"[Agent] Removed {images_removed} image(s) from conversation history")
def _prune_old_tool_results(self, keep_recent: int = 10) -> None:
    """Shrink old tool_result payloads to keep the conversation history small.

    Tool-heavy workflows produce one tool_result per tool call, and these
    payloads accumulate quickly. For every message except the
    ``keep_recent`` most recent ones, the content of each ``tool_result``
    block is replaced by a short placeholder.

    The block itself (including its ``tool_use_id``) is kept rather than
    deleted: the matching ``tool_use`` block stays in the assistant message,
    and dropping only the result would leave an unpaired tool call in the
    history.

    Args:
        keep_recent: Number of most recent messages left untouched. Zero or
            a negative value means every message is processed.
    """
    placeholder = "[Tool result removed from history]"
    history = self.conversation_history

    # Slice by explicit cutoff: history[:-0] would be empty, not "everything".
    cutoff = len(history) - max(keep_recent, 0)
    if cutoff <= 0:
        return

    tool_results_pruned = 0
    for msg in history[:cutoff]:
        content = msg.get("content")
        if not isinstance(content, list):
            continue

        compacted = []
        for block in content:
            if (
                isinstance(block, dict)
                and block.get("type") == "tool_result"
                and block.get("content") != placeholder  # already pruned earlier
            ):
                block = {**block, "content": placeholder}
                tool_results_pruned += 1
            compacted.append(block)

        if not compacted and msg.get("role") == "user":
            # An empty content list is not a usable message; keep a placeholder.
            msg["content"] = "[Tool results removed from history]"
        elif (
            len(compacted) == 1
            and isinstance(compacted[0], dict)
            and compacted[0].get("type") == "text"
        ):
            # Simplify a single text block to a plain string.
            msg["content"] = compacted[0]["text"]
        else:
            msg["content"] = compacted

    if tool_results_pruned > 0:
        print(f"[Agent] Pruned {tool_results_pruned} old tool_result(s) from conversation history")
def _prepare_message_content(
    self,
    user_message: str,
    inbound_message: Optional['InboundMessage']
) -> tuple[Any, bool]:
    """Prepare message content, embedding images if present.

    Files are read from ``inbound_message.metadata["files"]``; each entry is
    a dict with ``file_path``, ``mimetype`` and (for documents) ``filename``.
    Entries whose mimetype starts with ``image/`` are base64-embedded as
    image blocks; all other entries are listed in a trailing text note.

    Args:
        user_message: The text message.
        inbound_message: Optional message object with file metadata.

    Returns:
        Tuple of (content, has_images)
        - content: ``user_message`` unchanged when there is nothing to
          attach, otherwise a list of content-block dicts (text + images).
        - has_images: True only if at least one image block was embedded.
    """
    import base64
    from pathlib import Path

    max_image_bytes = 5 * 1024 * 1024  # 5MB per image
    supported_image_types = {"image/jpeg", "image/png", "image/gif", "image/webp"}
    # "image/jpg" is a common but non-standard spelling; send the canonical type.
    mimetype_aliases = {"image/jpg": "image/jpeg"}

    def mimetype_of(entry) -> str:
        """Return the entry's mimetype, lower-cased, or "" when absent/None."""
        return (entry.get("mimetype") or "").lower()

    metadata = getattr(inbound_message, "metadata", None) or {}
    files = metadata.get("files") or []
    if not files:
        return user_message, False

    # Separate images from documents
    images = [f for f in files if mimetype_of(f).startswith("image/")]
    documents = [f for f in files if not mimetype_of(f).startswith("image/")]

    # Build multi-modal content
    content_blocks = []

    # Add text
    if user_message.strip():
        content_blocks.append({"type": "text", "text": user_message})

    # Add images (base64 encoded)
    for img in images:
        try:
            file_path = Path(img["file_path"])

            if not file_path.exists():
                print(f"[Agent] Image file not found: {file_path}")
                continue

            # Validate the format before touching the file contents.
            raw_mimetype = mimetype_of(img)
            mimetype = mimetype_aliases.get(raw_mimetype, raw_mimetype)
            if mimetype not in supported_image_types:
                print(f"[Agent] Unsupported image format: {raw_mimetype}")
                content_blocks.append({
                    "type": "text",
                    "text": f"\n[Image {file_path.name} has unsupported format: {raw_mimetype}]"
                })
                continue

            file_size = file_path.stat().st_size
            if file_size > max_image_bytes:
                print(f"[Agent] Image too large ({file_size / 1024 / 1024:.1f}MB): {file_path.name}")
                content_blocks.append({
                    "type": "text",
                    "text": f"\n[Image {file_path.name} is too large ({file_size / 1024 / 1024:.1f}MB). Max: 5MB]"
                })
                continue

            image_data = file_path.read_bytes()
            if not image_data:
                # An empty file would produce an empty (invalid) image block.
                print(f"[Agent ERROR] Image file is empty: {file_path.name}")
                continue
            base64_data = base64.standard_b64encode(image_data).decode("utf-8")

            print(f"[Agent DEBUG] Embedding image: {file_path.name} ({mimetype}, {file_size / 1024:.1f}KB)")
            print(f"[Agent DEBUG] Base64 encoded: {len(base64_data)} chars, content_blocks count: {len(content_blocks)}")

            content_blocks.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": mimetype,
                    "data": base64_data
                }
            })
            print(f"[Agent DEBUG] Image block added to content_blocks (new count: {len(content_blocks)})")
        except (OSError, KeyError, TypeError, ValueError) as e:
            # Best effort: one unreadable image must not abort the whole message.
            print(f"[Agent] Failed to load image {img.get('file_path')}: {e}")

    # Add note about documents
    if documents:
        doc_lines = []
        for doc in documents:
            doc_path = doc.get("file_path") or "(unknown path)"
            doc_name = doc.get("filename") or Path(str(doc_path)).name
            doc_lines.append(f"- {doc_name} at {doc_path}")
        doc_list = "\n".join(doc_lines)
        content_blocks.append({
            "type": "text",
            "text": f"\n\nAttached documents:\n{doc_list}\n(Use read_file tool to access)"
        })

    # Nothing usable was produced (e.g. no text and every image failed):
    # fall back to the plain message instead of returning an empty list.
    if not content_blocks:
        return user_message, False

    # Final validation
    image_block_count = sum(1 for block in content_blocks if block.get("type") == "image")
    text_block_count = sum(1 for block in content_blocks if block.get("type") == "text")
    print(f"[Agent DEBUG] Final content_blocks: {image_block_count} image(s), {text_block_count} text block(s)")

    # has_images is True only if there are actual image blocks
    return content_blocks, image_block_count > 0
Optional['InboundMessage'] = None) -> str: """Inner chat logic, called while holding _chat_lock.""" + # Prepare content (may include images) + content, has_images = self._prepare_message_content(user_message, inbound_message) + + # Build system prompt system = self._build_system_prompt(user_message, username) + # Enhance prompt for images + if has_images: + system += ( + "\n\nThe user has shared one or more images. " + "Analyze the visual content and respond helpfully." + ) + + # Add to conversation history (content may be str or List[Dict]) self.conversation_history.append( - {"role": "user", "content": user_message} + {"role": "user", "content": content} ) self._prune_conversation_history() @@ -323,6 +544,14 @@ class Agent: """Chat using Agent SDK. Tools are handled automatically by MCP.""" context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES) + # DEBUG: Count images in conversation history + image_count = 0 + for msg in context_messages: + if isinstance(msg.get("content"), list): + image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image") + if image_count > 0: + print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context") + # Start progress updates self._start_progress_updates() @@ -335,7 +564,7 @@ class Agent: system=system, ) except TimeoutError as e: - error_msg = "⏱️ Task timed out after 5 minutes. The task might be too complex - try breaking it into smaller steps." + error_msg = "⏱️ Task timed out after 20 minutes. This is a very large task - consider breaking it into smaller steps or delegating to a background sub-agent." 
print(f"[Agent] TIMEOUT: {error_msg}") self.healing_system.capture_error( error=e, @@ -375,6 +604,12 @@ class Agent: {"role": "assistant", "content": final_response} ) + # Remove images from conversation history to prevent token bloat and confusion + self._strip_images_from_history() + + # Prune old tool results to prevent buffer overflow during diagram generation + self._prune_old_tool_results(keep_recent=10) + # Write compact summary to memory compact_summary = self.memory.compact_conversation( user_message=user_message, @@ -394,6 +629,15 @@ class Agent: for iteration in range(max_iterations): context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES) + # DEBUG: Count images in conversation history (only log on first iteration) + if iteration == 0: + image_count = 0 + for msg in context_messages: + if isinstance(msg.get("content"), list): + image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image") + if image_count > 0: + print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context") + try: response = self.llm.chat_with_tools( context_messages, @@ -431,6 +675,12 @@ class Agent: {"role": "assistant", "content": final_response} ) + # Remove images from conversation history to prevent token bloat and confusion + self._strip_images_from_history() + + # Prune old tool results to prevent buffer overflow during diagram generation + self._prune_old_tool_results(keep_recent=10) + compact_summary = self.memory.compact_conversation( user_message=user_message, assistant_response=final_response,