"""AI Agent with Memory and LLM Integration.""" import random import threading import time from typing import Any, List, Optional, Callable from hooks import HooksSystem from llm_interface import LLMInterface from memory_system import MemorySystem from self_healing import SelfHealingSystem from tools import TOOL_DEFINITIONS, execute_tool from sub_agent_manager import SubAgentManager # Maximum number of recent messages to include in LLM context MAX_CONTEXT_MESSAGES = 20 # Optimized for Agent SDK flat-rate subscription # Maximum conversation history entries before pruning MAX_CONVERSATION_HISTORY = 50 # Conservative limit to prevent Agent SDK JSON buffer overflow (1MB max) # Maximum tool execution iterations (generous limit for complex operations like zettelkasten) MAX_TOOL_ITERATIONS = 30 # Allows complex multi-step workflows with auto-linking, hybrid search, etc. class Agent: """AI Agent with memory, LLM, and hooks.""" def __init__( self, provider: str = "claude", workspace_dir: str = "./memory_workspace", is_sub_agent: bool = False, specialist_prompt: Optional[str] = None, ) -> None: self.memory = MemorySystem(workspace_dir) self.llm = LLMInterface(provider) self.hooks = HooksSystem() self.conversation_history: List[dict] = [] self._chat_lock = threading.Lock() self.healing_system = SelfHealingSystem(self.memory, self) self._progress_callback: Optional[Callable[[str], None]] = None self._progress_timer: Optional[threading.Timer] = None # Sub-agent orchestration self.is_sub_agent = is_sub_agent self.specialist_prompt = specialist_prompt self.sub_agent_manager = SubAgentManager(timeout_seconds=300) # 5 min timeout if not is_sub_agent: self.sub_agent_manager.start_watchdog() # Only main agent runs watchdog self.sub_agents: dict = {} # Cache for spawned sub-agents self.agent_id: Optional[str] = None # Set when this is a sub-agent self.memory.sync() if not is_sub_agent: # Only trigger hooks for main agent self.hooks.trigger("agent", "startup", {"workspace_dir": workspace_dir}) def spawn_sub_agent( self, specialist_prompt: str, agent_id: Optional[str] = None, share_memory: bool = True, ) -> 'Agent': """Spawn a sub-agent with specialized system prompt. Args: specialist_prompt: Custom system prompt for the specialist agent_id: Optional ID for caching (reuse same specialist) share_memory: Whether to share memory workspace with main agent Returns: Agent instance configured as a specialist Example: sub = agent.spawn_sub_agent( specialist_prompt="You are a zettelkasten expert. Focus ONLY on note-taking.", agent_id="zettelkasten_processor" ) result = sub.chat("Process my fleeting notes", username="jordan") """ # Check cache if agent_id provided if agent_id and agent_id in self.sub_agents: return self.sub_agents[agent_id] # Create new sub-agent workspace = self.memory.workspace_dir if share_memory else f"{self.memory.workspace_dir}/sub_agents/{agent_id}" sub_agent = Agent( provider=self.llm.provider, workspace_dir=workspace, is_sub_agent=True, specialist_prompt=specialist_prompt, ) # DEFENSIVE: Ensure sub-agent never inherits main event loop # Sub-agents run in dedicated threads with isolated loops sub_agent.llm._event_loop = None # Set agent_id for activity tracking sub_agent.agent_id = agent_id # Cache if ID provided if agent_id: self.sub_agents[agent_id] = sub_agent # Register with sub-agent manager for monitoring if not self.is_sub_agent: self.sub_agent_manager.register_sub_agent(agent_id, specialist_prompt[:100]) return sub_agent def delegate( self, task: str, specialist_prompt: str, username: str = "default", agent_id: Optional[str] = None, max_retries: int = 1, ) -> str: """Delegate a task to a specialist sub-agent with automatic retry on hang.""" # Generate unique agent IDs to prevent caching race conditions in parallel delegations if not agent_id: agent_id = f"sub_{int(time.time()*1000)}_{random.randint(1000,9999)}" else: # Add timestamp to user-provided ID to ensure uniqueness agent_id = f"{agent_id}_{int(time.time()*1000)}" for attempt in range(max_retries + 1): if attempt > 0: print(f"[Agent] Retrying {agent_id} (attempt {attempt+1}/{max_retries+1})") if agent_id and not self.is_sub_agent: self.sub_agent_manager.cleanup_agent(f"{agent_id}_prev") retry_id = f"{agent_id}_r{attempt}" if (agent_id and attempt > 0) else agent_id sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=retry_id) # Heartbeat for activity tracking heartbeat_running = [True] def heartbeat(): while heartbeat_running[0]: if retry_id and not self.is_sub_agent: self.sub_agent_manager.update_activity(retry_id) time.sleep(10) heartbeat_thread = None if retry_id and not self.is_sub_agent: heartbeat_thread = threading.Thread(target=heartbeat, daemon=True) heartbeat_thread.start() try: result = sub_agent.chat(task, username=username) if retry_id and not self.is_sub_agent: self.sub_agent_manager.mark_complete(retry_id, result=result) return result except Exception as e: if retry_id and not self.is_sub_agent: self.sub_agent_manager.mark_complete(retry_id, error=str(e)) # If this was the last attempt, raise the error if attempt >= max_retries: raise # Otherwise, retry will happen in next loop iteration finally: heartbeat_running[0] = False if heartbeat_thread: heartbeat_thread.join(timeout=1) def _get_context_messages(self, max_messages: int) -> List[dict]: """Get recent messages without breaking tool_use/tool_result pairs. Ensures that: 1. A tool_result message always has its preceding tool_use message 2. A tool_use message always has its following tool_result message 3. The first message is never a tool_result without its tool_use """ if len(self.conversation_history) <= max_messages: return list(self.conversation_history) # Start with the most recent messages start_idx = len(self.conversation_history) - max_messages # Track original start_idx before adjustments for end-of-list check original_start_idx = start_idx # Check if we split a tool pair at the start if start_idx > 0: candidate = self.conversation_history[start_idx] # If first message is a tool_result, include the tool_use before it if candidate["role"] == "user" and isinstance(candidate.get("content"), list): if any(isinstance(block, dict) and block.get("type") == "tool_result" for block in candidate["content"]): start_idx -= 1 # Build result slice using adjusted start result = list(self.conversation_history[start_idx:]) # Check if we split a tool pair at the end # Use original_start_idx + max_messages to find end of original slice original_end_idx = original_start_idx + max_messages if original_end_idx < len(self.conversation_history): end_msg = self.conversation_history[original_end_idx - 1] if end_msg["role"] == "assistant" and isinstance(end_msg.get("content"), list): has_tool_use = any( (hasattr(block, "type") and block.type == "tool_use") or (isinstance(block, dict) and block.get("type") == "tool_use") for block in end_msg["content"] ) if has_tool_use: # The tool_result at original_end_idx is already in result # if start_idx was adjusted, so only add if it's not there next_msg = self.conversation_history[original_end_idx] if next_msg not in result: result.append(next_msg) return result def _prune_conversation_history(self) -> None: """Prune conversation history to prevent unbounded growth. Removes oldest messages while preserving tool_use/tool_result pairs. """ if len(self.conversation_history) <= MAX_CONVERSATION_HISTORY: return # Keep the most recent half keep_count = MAX_CONVERSATION_HISTORY // 2 start_idx = len(self.conversation_history) - keep_count # Ensure we don't split a tool pair if start_idx > 0: candidate = self.conversation_history[start_idx] if candidate["role"] == "user" and isinstance(candidate.get("content"), list): if any(isinstance(block, dict) and block.get("type") == "tool_result" for block in candidate["content"]): start_idx -= 1 self.conversation_history = self.conversation_history[start_idx:] def _strip_images_from_history(self) -> None: """Remove image blocks from conversation history to prevent token bloat. Images are huge (tens of thousands of tokens) and rarely needed after the initial response. This prevents old images from polluting context and confusing the agent about which image the user is referring to. """ images_removed = 0 for msg in self.conversation_history: if isinstance(msg.get("content"), list): original_len = len(msg["content"]) # Keep only non-image blocks msg["content"] = [ block for block in msg["content"] if not (isinstance(block, dict) and block.get("type") == "image") ] images_removed += original_len - len(msg["content"]) # If all blocks were images and there's no text, add placeholder if not msg["content"] and msg["role"] == "user": msg["content"] = "[Image was removed from history]" # If content is now a single text block, simplify to string elif len(msg["content"]) == 1 and isinstance(msg["content"][0], dict) and msg["content"][0].get("type") == "text": msg["content"] = msg["content"][0]["text"] if images_removed > 0: print(f"[Agent] Removed {images_removed} image(s) from conversation history") def _prune_old_tool_results(self, keep_recent: int = 10) -> None: """Remove old tool_result blocks to prevent buffer overflow during diagram generation. When creating complex diagrams, each add_element call creates a tool_result. These accumulate quickly and can exceed the Agent SDK's 1MB JSON buffer. We keep only the most recent tool results. """ if len(self.conversation_history) < keep_recent: return tool_results_removed = 0 # Process all but the most recent messages for msg in self.conversation_history[:-keep_recent]: if isinstance(msg.get("content"), list): original_blocks = msg["content"][:] # Remove tool_result blocks but keep text, tool_use, etc. msg["content"] = [ block for block in msg["content"] if not (isinstance(block, dict) and block.get("type") == "tool_result") ] tool_results_removed += len(original_blocks) - len(msg["content"]) # If all blocks were tool_results, add placeholder if not msg["content"] and msg["role"] == "user": msg["content"] = "[Tool results removed from history]" # Simplify single text blocks elif len(msg["content"]) == 1 and isinstance(msg["content"][0], dict) and msg["content"][0].get("type") == "text": msg["content"] = msg["content"][0]["text"] if tool_results_removed > 0: print(f"[Agent] Pruned {tool_results_removed} old tool_result(s) from conversation history") def chat( self, user_message: str, username: str = "default", progress_callback: Optional[Callable[[str], None]] = None, inbound_message: Optional['InboundMessage'] = None ) -> str: """Chat with context from memory and tool use. Thread-safe: uses a lock to prevent concurrent modification of conversation history from multiple threads (e.g., scheduled tasks and live messages). Args: user_message: The user's message username: The user's name (default: "default") progress_callback: Optional callback for sending progress updates during long operations inbound_message: Optional full message object (for file/image handling) """ # Update activity if this is a sub-agent if self.is_sub_agent and self.agent_id: # Find parent agent to update activity # (parent has the sub_agent_manager) # For now, we'll add this in delegate() instead pass # Store the callback for use during the chat self._progress_callback = progress_callback # Handle model switching commands (no lock needed, read-only on history) if user_message.lower().startswith("/model "): model_name = user_message[7:].strip() self.llm.set_model(model_name) return f"Switched to model: {model_name}" elif user_message.lower() == "/sonnet": self.llm.set_model("claude-sonnet-4-5-20250929") return "Switched to Claude Sonnet 4.5 (more capable, higher cost)" elif user_message.lower() == "/haiku": self.llm.set_model("claude-haiku-4-5-20251001") return "Switched to Claude Haiku 4.5 (faster, cheaper)" elif user_message.lower() == "/status": current_model = self.llm.model is_sonnet = "sonnet" in current_model.lower() cache_status = "enabled" if is_sonnet else "disabled (Haiku active)" return ( f"Current model: {current_model}\n" f"Prompt caching: {cache_status}\n" f"Context messages: {MAX_CONTEXT_MESSAGES}\n" f"Memory results: 2\n\n" f"Commands: /sonnet, /haiku, /status" ) with self._chat_lock: try: return self._chat_inner(user_message, username, inbound_message) finally: # Clear the callback when done self._progress_callback = None def _build_system_prompt(self, user_message: str, username: str) -> str: """Build the system prompt with SOUL, user profile, and memory context.""" if self.specialist_prompt: return ( f"{self.specialist_prompt}\n\n" f"You have access to tools for file operations, command execution, " f"web fetching, note-taking, and Google services. " f"Use them to accomplish your specialized task efficiently." ) soul = self.memory.get_soul() user_profile = self.memory.get_user(username) relevant_memory = self.memory.search_hybrid(user_message, max_results=5) memory_lines = [f"- {mem['snippet']}" for mem in relevant_memory] return ( f"{soul}\n\nUser Profile:\n{user_profile}\n\n" f"Relevant Memory:\n" + "\n".join(memory_lines) + f"\n\nYou have access to tools for file operations, command execution, " f"web fetching, note-taking, and Google services (Gmail, Calendar, Contacts). " f"Use them freely to help the user." ) def _prepare_message_content( self, user_message: str, inbound_message: Optional['InboundMessage'] ) -> tuple[Any, bool]: """Prepare message content, embedding images if present. Args: user_message: The text message inbound_message: Optional message object with file metadata Returns: Tuple of (content, has_images) - content: str (text only) or List[Dict] (text + images) - has_images: bool """ import base64 from pathlib import Path if not inbound_message or "files" not in inbound_message.metadata: return user_message, False files = inbound_message.metadata.get("files", []) if not files: return user_message, False # Separate images from documents images = [f for f in files if f.get("mimetype", "").startswith("image/")] documents = [f for f in files if not f.get("mimetype", "").startswith("image/")] # If no images AND no documents, return early if not images and not documents: return user_message, False # Build multi-modal content content_blocks = [] # Add text if user_message.strip(): content_blocks.append({"type": "text", "text": user_message}) # Add images (base64 encoded) for img in images: try: file_path = Path(img["file_path"]) # Check file exists if not file_path.exists(): print(f"[Agent] Image file not found: {file_path}") continue # Check file size (Claude max: 5MB per image) file_size = file_path.stat().st_size if file_size > 5 * 1024 * 1024: # 5MB print(f"[Agent] Image too large ({file_size / 1024 / 1024:.1f}MB): {file_path.name}") content_blocks.append({ "type": "text", "text": f"\n[Image {file_path.name} is too large ({file_size / 1024 / 1024:.1f}MB). Max: 5MB]" }) continue # Read and encode image_data = file_path.read_bytes() base64_data = base64.standard_b64encode(image_data).decode("utf-8") # Validate mimetype mimetype = img["mimetype"] if mimetype not in ["image/jpeg", "image/jpg", "image/png", "image/gif", "image/webp"]: print(f"[Agent] Unsupported image format: {mimetype}") content_blocks.append({ "type": "text", "text": f"\n[Image {file_path.name} has unsupported format: {mimetype}]" }) continue # Verify base64 data is not empty if not base64_data: print(f"[Agent ERROR] Base64 encoding failed for {file_path.name}") continue print(f"[Agent DEBUG] Embedding image: {file_path.name} ({mimetype}, {file_size / 1024:.1f}KB)") print(f"[Agent DEBUG] Base64 encoded: {len(base64_data)} chars, content_blocks count: {len(content_blocks)}") image_block = { "type": "image", "source": { "type": "base64", "media_type": mimetype, "data": base64_data } } content_blocks.append(image_block) print(f"[Agent DEBUG] Image block added to content_blocks (new count: {len(content_blocks)})") except Exception as e: print(f"[Agent] Failed to load image {img.get('file_path')}: {e}") # Add note about documents if documents: doc_list = "\n".join( f"- {d['filename']} at {d['file_path']}" for d in documents ) content_blocks.append({ "type": "text", "text": f"\n\nAttached documents:\n{doc_list}\n(Use read_file tool to access)" }) # Final validation image_block_count = sum(1 for block in content_blocks if block.get("type") == "image") text_block_count = sum(1 for block in content_blocks if block.get("type") == "text") print(f"[Agent DEBUG] Final content_blocks: {image_block_count} image(s), {text_block_count} text block(s)") # Return has_images=True only if there are actual image blocks has_images = image_block_count > 0 return content_blocks, has_images def _chat_inner(self, user_message: str, username: str, inbound_message: Optional['InboundMessage'] = None) -> str: """Inner chat logic, called while holding _chat_lock.""" # Prepare content (may include images) content, has_images = self._prepare_message_content(user_message, inbound_message) # Build system prompt system = self._build_system_prompt(user_message, username) # Enhance prompt for images if has_images: system += ( "\n\nThe user has shared one or more images. " "Analyze the visual content and respond helpfully." ) # Add to conversation history (content may be str or List[Dict]) self.conversation_history.append( {"role": "user", "content": content} ) self._prune_conversation_history() # In Agent SDK mode, query() handles tool calls automatically via MCP. # The tool loop is only needed for Direct API mode. if self.llm.mode == "agent_sdk": return self._chat_agent_sdk(user_message, system) else: return self._chat_direct_api(user_message, system) def _send_progress_update(self, elapsed_seconds: int): """Send a progress update if callback is set.""" if self._progress_callback: messages = [ f"⏳ Still working... ({elapsed_seconds}s elapsed)", f"🔄 Processing your request... ({elapsed_seconds}s)", f"⚙️ Working on it, this might take a moment... ({elapsed_seconds}s)", ] # Rotate through messages message = messages[(elapsed_seconds // 90) % len(messages)] try: self._progress_callback(message) except Exception as e: print(f"[Agent] Failed to send progress update: {e}") def _start_progress_updates(self): """Start periodic progress updates (every 90 seconds).""" def send_update(elapsed: int): self._send_progress_update(elapsed) # Schedule next update self._progress_timer = threading.Timer(90.0, send_update, args=[elapsed + 90]) self._progress_timer.daemon = True self._progress_timer.start() # Send first update after 90 seconds self._progress_timer = threading.Timer(90.0, send_update, args=[90]) self._progress_timer.daemon = True self._progress_timer.start() def _stop_progress_updates(self): """Stop progress updates.""" if self._progress_timer: self._progress_timer.cancel() self._progress_timer = None def _chat_agent_sdk(self, user_message: str, system: str) -> str: """Chat using Agent SDK. Tools are handled automatically by MCP.""" context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES) # DEBUG: Count images in conversation history image_count = 0 for msg in context_messages: if isinstance(msg.get("content"), list): image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image") if image_count > 0: print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context") # Start progress updates self._start_progress_updates() try: # chat_with_tools() in Agent SDK mode returns a string directly. # The SDK handles all tool calls via MCP servers internally. response = self.llm.chat_with_tools( context_messages, tools=[], # Ignored in Agent SDK mode; tools come from MCP system=system, ) except TimeoutError as e: # Use the detailed timeout message from llm_interface.py error_msg = str(e) if str(e) else "⏱️ Task timed out - consider breaking it into smaller steps or using delegate_task." print(f"[Agent] TIMEOUT: {error_msg}") self.healing_system.capture_error( error=e, component="agent.py:_chat_agent_sdk", intent="Calling Agent SDK for chat response (TIMEOUT)", context={ "model": self.llm.model, "message_preview": user_message[:100], "error_type": "timeout", }, ) return error_msg except Exception as e: # Include actual error message for better debugging error_msg = f"Agent SDK error: {e}" print(f"[Agent] {error_msg}") self.healing_system.capture_error( error=e, component="agent.py:_chat_agent_sdk", intent="Calling Agent SDK for chat response", context={ "model": self.llm.model, "message_preview": user_message[:100], }, ) # Return the actual error message instead of generic text return f"Sorry, I encountered an error: {str(e)[:500]}" finally: # Always stop progress updates when done self._stop_progress_updates() # In Agent SDK mode, response is always a string final_response = response if isinstance(response, str) else str(response) if not final_response.strip(): final_response = "(No response generated)" self.conversation_history.append( {"role": "assistant", "content": final_response} ) # Remove images from conversation history to prevent token bloat and confusion self._strip_images_from_history() # Prune old tool results to prevent buffer overflow during diagram generation self._prune_old_tool_results(keep_recent=10) # Write compact summary to memory compact_summary = self.memory.compact_conversation( user_message=user_message, assistant_response=final_response, tools_used=None # SDK handles tools internally; we don't track them here ) self.memory.write_memory(compact_summary, daily=True) return final_response def _chat_direct_api(self, user_message: str, system: str) -> str: """Chat using Direct API with manual tool execution loop.""" max_iterations = MAX_TOOL_ITERATIONS use_caching = "sonnet" in self.llm.model.lower() tools_used = [] for iteration in range(max_iterations): context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES) # DEBUG: Count images in conversation history (only log on first iteration) if iteration == 0: image_count = 0 for msg in context_messages: if isinstance(msg.get("content"), list): image_count += sum(1 for block in msg["content"] if isinstance(block, dict) and block.get("type") == "image") if image_count > 0: print(f"[Agent DEBUG] Sending {len(context_messages)} messages to Claude with {image_count} total image(s) in context") try: response = self.llm.chat_with_tools( context_messages, tools=TOOL_DEFINITIONS, system=system, use_cache=use_caching, ) except Exception as e: error_msg = f"LLM API error: {e}" print(f"[Agent] {error_msg}") self.healing_system.capture_error( error=e, component="agent.py:_chat_direct_api", intent="Calling Direct API for chat response", context={ "model": self.llm.model, "message_preview": user_message[:100], "iteration": iteration, }, ) # Return actual error message instead of generic text return f"Sorry, I encountered an error: {str(e)[:500]}" if response.stop_reason == "end_turn": text_content = [] for block in response.content: if block.type == "text": text_content.append(block.text) final_response = "\n".join(text_content) if not final_response.strip(): final_response = "(No response generated)" self.conversation_history.append( {"role": "assistant", "content": final_response} ) # Remove images from conversation history to prevent token bloat and confusion self._strip_images_from_history() # Prune old tool results to prevent buffer overflow during diagram generation self._prune_old_tool_results(keep_recent=10) compact_summary = self.memory.compact_conversation( user_message=user_message, assistant_response=final_response, tools_used=tools_used if tools_used else None ) self.memory.write_memory(compact_summary, daily=True) return final_response elif response.stop_reason == "tool_use": assistant_content = [] tool_uses = [] for block in response.content: if block.type == "text": assistant_content.append({ "type": "text", "text": block.text }) elif block.type == "tool_use": assistant_content.append({ "type": "tool_use", "id": block.id, "name": block.name, "input": block.input }) tool_uses.append(block) self.conversation_history.append({ "role": "assistant", "content": assistant_content }) tool_results = [] for tool_use in tool_uses: if tool_use.name not in tools_used: tools_used.append(tool_use.name) result = execute_tool(tool_use.name, tool_use.input, healing_system=self.healing_system) if len(result) > 5000: result = result[:5000] + "\n... (output truncated)" print(f"[Tool] {tool_use.name}: {result[:100]}...") tool_results.append({ "type": "tool_result", "tool_use_id": tool_use.id, "content": result }) self.conversation_history.append({ "role": "user", "content": tool_results }) else: return f"Unexpected stop reason: {response.stop_reason}" return "Error: Maximum tool use iterations exceeded" def switch_model(self, provider: str) -> None: """Switch LLM provider.""" self.llm = LLMInterface(provider) def shutdown(self) -> None: """Cleanup and stop background services.""" self.memory.close() self.hooks.trigger("agent", "shutdown", {}) if __name__ == "__main__": agent = Agent(provider="claude") response = agent.chat("What's my current project?", username="alice") print(response)