# ajarbot/agent.py
"""AI Agent with Memory and LLM Integration."""
import threading
import time
from typing import List, Optional, Callable
from hooks import HooksSystem
from llm_interface import LLMInterface
from memory_system import MemorySystem
from self_healing import SelfHealingSystem
from tools import TOOL_DEFINITIONS, execute_tool
# Maximum number of recent messages to include in LLM context
MAX_CONTEXT_MESSAGES = 20 # Optimized for Agent SDK flat-rate subscription
# Maximum characters of agent response to store in memory
# NOTE(review): not referenced anywhere in this chunk — confirm it is used elsewhere.
MEMORY_RESPONSE_PREVIEW_LENGTH = 500 # Store more context for better memory retrieval
# Maximum conversation history entries before pruning
MAX_CONVERSATION_HISTORY = 100 # Higher limit with flat-rate subscription
# Maximum tool execution iterations (generous limit for complex operations like zettelkasten)
MAX_TOOL_ITERATIONS = 30 # Allows complex multi-step workflows with auto-linking, hybrid search, etc.
class Agent:
    """AI Agent with memory, LLM, and hooks."""

    def __init__(
        self,
        provider: str = "claude",
        workspace_dir: str = "./memory_workspace",
        is_sub_agent: bool = False,
        specialist_prompt: Optional[str] = None,
    ) -> None:
        """Wire up the agent's subsystems.

        Args:
            provider: Key handed to LLMInterface to pick the backing LLM.
            workspace_dir: Directory backing the memory system.
            is_sub_agent: True when this instance is a spawned specialist.
            specialist_prompt: Custom system prompt for specialist sub-agents.
        """
        # Core subsystems (order preserved: healing needs memory, and
        # receives a reference to this agent).
        self.memory = MemorySystem(workspace_dir)
        self.llm = LLMInterface(provider)
        self.hooks = HooksSystem()

        # Conversation state; the lock keeps chat() safe across threads.
        self.conversation_history: List[dict] = []
        self._chat_lock = threading.Lock()
        self.healing_system = SelfHealingSystem(self.memory, self)

        # Progress reporting for long-running chats.
        self._progress_callback: Optional[Callable[[str], None]] = None
        self._progress_timer: Optional[threading.Timer] = None

        # Sub-agent orchestration.
        self.is_sub_agent = is_sub_agent
        self.specialist_prompt = specialist_prompt
        self.sub_agents: dict = {}  # Spawned specialists cached by agent_id.

        self.memory.sync()
        # Startup hook fires for the main agent only.
        if not is_sub_agent:
            self.hooks.trigger("agent", "startup", {"workspace_dir": workspace_dir})
def spawn_sub_agent(
self,
specialist_prompt: str,
agent_id: Optional[str] = None,
share_memory: bool = True,
) -> 'Agent':
"""Spawn a sub-agent with specialized system prompt.
Args:
specialist_prompt: Custom system prompt for the specialist
agent_id: Optional ID for caching (reuse same specialist)
share_memory: Whether to share memory workspace with main agent
Returns:
Agent instance configured as a specialist
Example:
sub = agent.spawn_sub_agent(
specialist_prompt="You are a zettelkasten expert. Focus ONLY on note-taking.",
agent_id="zettelkasten_processor"
)
result = sub.chat("Process my fleeting notes", username="jordan")
"""
# Check cache if agent_id provided
if agent_id and agent_id in self.sub_agents:
return self.sub_agents[agent_id]
# Create new sub-agent
workspace = self.memory.workspace_dir if share_memory else f"{self.memory.workspace_dir}/sub_agents/{agent_id}"
sub_agent = Agent(
provider=self.llm.provider,
workspace_dir=workspace,
is_sub_agent=True,
specialist_prompt=specialist_prompt,
)
# Cache if ID provided
if agent_id:
self.sub_agents[agent_id] = sub_agent
return sub_agent
def delegate(
self,
task: str,
specialist_prompt: str,
username: str = "default",
agent_id: Optional[str] = None,
) -> str:
"""Delegate a task to a specialist sub-agent (convenience method).
Args:
task: The task/message to send to the specialist
specialist_prompt: System prompt defining the specialist's role
username: Username for context
agent_id: Optional ID for caching the specialist
Returns:
Response from the specialist
Example:
# One-off delegation
result = agent.delegate(
task="Process my fleeting notes and find connections",
specialist_prompt="You are a zettelkasten expert. Focus on note organization and linking.",
username="jordan"
)
# Cached specialist (reused across multiple calls)
result = agent.delegate(
task="Summarize my emails from today",
specialist_prompt="You are an email analyst. Focus on extracting key information.",
username="jordan",
agent_id="email_analyst"
)
"""
sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=agent_id)
return sub_agent.chat(task, username=username)
def _get_context_messages(self, max_messages: int) -> List[dict]:
"""Get recent messages without breaking tool_use/tool_result pairs.
Ensures that:
1. A tool_result message always has its preceding tool_use message
2. A tool_use message always has its following tool_result message
3. The first message is never a tool_result without its tool_use
"""
if len(self.conversation_history) <= max_messages:
return list(self.conversation_history)
# Start with the most recent messages
start_idx = len(self.conversation_history) - max_messages
# Track original start_idx before adjustments for end-of-list check
original_start_idx = start_idx
# Check if we split a tool pair at the start
if start_idx > 0:
candidate = self.conversation_history[start_idx]
# If first message is a tool_result, include the tool_use before it
if candidate["role"] == "user" and isinstance(candidate.get("content"), list):
if any(isinstance(block, dict) and block.get("type") == "tool_result"
for block in candidate["content"]):
start_idx -= 1
# Build result slice using adjusted start
result = list(self.conversation_history[start_idx:])
# Check if we split a tool pair at the end
# Use original_start_idx + max_messages to find end of original slice
original_end_idx = original_start_idx + max_messages
if original_end_idx < len(self.conversation_history):
end_msg = self.conversation_history[original_end_idx - 1]
if end_msg["role"] == "assistant" and isinstance(end_msg.get("content"), list):
has_tool_use = any(
(hasattr(block, "type") and block.type == "tool_use") or
(isinstance(block, dict) and block.get("type") == "tool_use")
for block in end_msg["content"]
)
if has_tool_use:
# The tool_result at original_end_idx is already in result
# if start_idx was adjusted, so only add if it's not there
next_msg = self.conversation_history[original_end_idx]
if next_msg not in result:
result.append(next_msg)
return result
def _prune_conversation_history(self) -> None:
"""Prune conversation history to prevent unbounded growth.
Removes oldest messages while preserving tool_use/tool_result pairs.
"""
if len(self.conversation_history) <= MAX_CONVERSATION_HISTORY:
return
# Keep the most recent half
keep_count = MAX_CONVERSATION_HISTORY // 2
start_idx = len(self.conversation_history) - keep_count
# Ensure we don't split a tool pair
if start_idx > 0:
candidate = self.conversation_history[start_idx]
if candidate["role"] == "user" and isinstance(candidate.get("content"), list):
if any(isinstance(block, dict) and block.get("type") == "tool_result"
for block in candidate["content"]):
start_idx -= 1
self.conversation_history = self.conversation_history[start_idx:]
def chat(
self,
user_message: str,
username: str = "default",
progress_callback: Optional[Callable[[str], None]] = None
) -> str:
"""Chat with context from memory and tool use.
Thread-safe: uses a lock to prevent concurrent modification of
conversation history from multiple threads (e.g., scheduled tasks
and live messages).
Args:
user_message: The user's message
username: The user's name (default: "default")
progress_callback: Optional callback for sending progress updates during long operations
"""
# Store the callback for use during the chat
self._progress_callback = progress_callback
# Handle model switching commands (no lock needed, read-only on history)
if user_message.lower().startswith("/model "):
model_name = user_message[7:].strip()
self.llm.set_model(model_name)
return f"Switched to model: {model_name}"
elif user_message.lower() == "/sonnet":
self.llm.set_model("claude-sonnet-4-5-20250929")
return "Switched to Claude Sonnet 4.5 (more capable, higher cost)"
elif user_message.lower() == "/haiku":
self.llm.set_model("claude-haiku-4-5-20251001")
return "Switched to Claude Haiku 4.5 (faster, cheaper)"
elif user_message.lower() == "/status":
current_model = self.llm.model
is_sonnet = "sonnet" in current_model.lower()
cache_status = "enabled" if is_sonnet else "disabled (Haiku active)"
return (
f"Current model: {current_model}\n"
f"Prompt caching: {cache_status}\n"
f"Context messages: {MAX_CONTEXT_MESSAGES}\n"
f"Memory results: 2\n\n"
f"Commands: /sonnet, /haiku, /status"
)
with self._chat_lock:
try:
return self._chat_inner(user_message, username)
finally:
# Clear the callback when done
self._progress_callback = None
def _build_system_prompt(self, user_message: str, username: str) -> str:
"""Build the system prompt with SOUL, user profile, and memory context."""
if self.specialist_prompt:
return (
f"{self.specialist_prompt}\n\n"
f"You have access to tools for file operations, command execution, "
f"web fetching, note-taking, and Google services. "
f"Use them to accomplish your specialized task efficiently."
)
soul = self.memory.get_soul()
user_profile = self.memory.get_user(username)
relevant_memory = self.memory.search_hybrid(user_message, max_results=5)
memory_lines = [f"- {mem['snippet']}" for mem in relevant_memory]
return (
f"{soul}\n\nUser Profile:\n{user_profile}\n\n"
f"Relevant Memory:\n" + "\n".join(memory_lines) +
f"\n\nYou have access to tools for file operations, command execution, "
f"web fetching, note-taking, and Google services (Gmail, Calendar, Contacts). "
f"Use them freely to help the user."
)
def _chat_inner(self, user_message: str, username: str) -> str:
"""Inner chat logic, called while holding _chat_lock."""
system = self._build_system_prompt(user_message, username)
self.conversation_history.append(
{"role": "user", "content": user_message}
)
self._prune_conversation_history()
# In Agent SDK mode, query() handles tool calls automatically via MCP.
# The tool loop is only needed for Direct API mode.
if self.llm.mode == "agent_sdk":
return self._chat_agent_sdk(user_message, system)
else:
return self._chat_direct_api(user_message, system)
def _send_progress_update(self, elapsed_seconds: int):
"""Send a progress update if callback is set."""
if self._progress_callback:
messages = [
f"⏳ Still working... ({elapsed_seconds}s elapsed)",
f"🔄 Processing your request... ({elapsed_seconds}s)",
f"⚙️ Working on it, this might take a moment... ({elapsed_seconds}s)",
]
# Rotate through messages
message = messages[(elapsed_seconds // 90) % len(messages)]
try:
self._progress_callback(message)
except Exception as e:
print(f"[Agent] Failed to send progress update: {e}")
def _start_progress_updates(self):
"""Start periodic progress updates (every 90 seconds)."""
def send_update(elapsed: int):
self._send_progress_update(elapsed)
# Schedule next update
self._progress_timer = threading.Timer(90.0, send_update, args=[elapsed + 90])
self._progress_timer.daemon = True
self._progress_timer.start()
# Send first update after 90 seconds
self._progress_timer = threading.Timer(90.0, send_update, args=[90])
self._progress_timer.daemon = True
self._progress_timer.start()
def _stop_progress_updates(self):
"""Stop progress updates."""
if self._progress_timer:
self._progress_timer.cancel()
self._progress_timer = None
    def _chat_agent_sdk(self, user_message: str, system: str) -> str:
        """Chat using Agent SDK. Tools are handled automatically by MCP.

        Args:
            user_message: The user's message (used for memory and error context).
            system: Fully built system prompt.

        Returns:
            The assistant's final text response, or a user-facing error string
            on timeout / SDK failure.
        """
        context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES)
        # Keep the user informed while the (potentially slow) SDK call runs.
        self._start_progress_updates()
        try:
            # chat_with_tools() in Agent SDK mode returns a string directly.
            # The SDK handles all tool calls via MCP servers internally.
            response = self.llm.chat_with_tools(
                context_messages,
                tools=[],  # Ignored in Agent SDK mode; tools come from MCP
                system=system,
            )
        except TimeoutError as e:
            error_msg = "⏱️ Task timed out after 5 minutes. The task might be too complex - try breaking it into smaller steps."
            print(f"[Agent] TIMEOUT: {error_msg}")
            # Record the timeout so the self-healing system can analyze it.
            self.healing_system.capture_error(
                error=e,
                component="agent.py:_chat_agent_sdk",
                intent="Calling Agent SDK for chat response (TIMEOUT)",
                context={
                    "model": self.llm.model,
                    "message_preview": user_message[:100],
                    "error_type": "timeout",
                },
            )
            return error_msg
        except Exception as e:
            error_msg = f"Agent SDK error: {e}"
            print(f"[Agent] {error_msg}")
            # Record the failure for later self-healing analysis.
            self.healing_system.capture_error(
                error=e,
                component="agent.py:_chat_agent_sdk",
                intent="Calling Agent SDK for chat response",
                context={
                    "model": self.llm.model,
                    "message_preview": user_message[:100],
                },
            )
            return "Sorry, I encountered an error communicating with the AI model. Please try again."
        finally:
            # Always stop progress updates when done (success or failure).
            self._stop_progress_updates()
        # In Agent SDK mode, response is always a string
        final_response = response if isinstance(response, str) else str(response)
        if not final_response.strip():
            final_response = "(No response generated)"
        self.conversation_history.append(
            {"role": "assistant", "content": final_response}
        )
        # Persist a compact summary of this exchange to daily memory.
        compact_summary = self.memory.compact_conversation(
            user_message=user_message,
            assistant_response=final_response,
            tools_used=None  # SDK handles tools internally; we don't track them here
        )
        self.memory.write_memory(compact_summary, daily=True)
        return final_response
    def _chat_direct_api(self, user_message: str, system: str) -> str:
        """Chat using Direct API with manual tool execution loop.

        Repeatedly calls the LLM, executing any requested tools and feeding
        their results back, until the model ends its turn or the iteration
        cap is reached.

        Args:
            user_message: The user's message (used for memory and error context).
            system: Fully built system prompt.

        Returns:
            The assistant's final text response, or a user-facing error string.
        """
        max_iterations = MAX_TOOL_ITERATIONS
        # Prompt caching is only enabled for Sonnet models (mirrors /status).
        use_caching = "sonnet" in self.llm.model.lower()
        tools_used = []  # Names of tools invoked, for the memory summary.
        for iteration in range(max_iterations):
            context_messages = self._get_context_messages(MAX_CONTEXT_MESSAGES)
            try:
                response = self.llm.chat_with_tools(
                    context_messages,
                    tools=TOOL_DEFINITIONS,
                    system=system,
                    use_cache=use_caching,
                )
            except Exception as e:
                error_msg = f"LLM API error: {e}"
                print(f"[Agent] {error_msg}")
                # Record the failure for later self-healing analysis.
                self.healing_system.capture_error(
                    error=e,
                    component="agent.py:_chat_direct_api",
                    intent="Calling Direct API for chat response",
                    context={
                        "model": self.llm.model,
                        "message_preview": user_message[:100],
                        "iteration": iteration,
                    },
                )
                return "Sorry, I encountered an error communicating with the AI model. Please try again."
            if response.stop_reason == "end_turn":
                # Model finished: join all text blocks into the final reply.
                text_content = []
                for block in response.content:
                    if block.type == "text":
                        text_content.append(block.text)
                final_response = "\n".join(text_content)
                if not final_response.strip():
                    final_response = "(No response generated)"
                self.conversation_history.append(
                    {"role": "assistant", "content": final_response}
                )
                # Persist a compact summary of this exchange to daily memory.
                compact_summary = self.memory.compact_conversation(
                    user_message=user_message,
                    assistant_response=final_response,
                    tools_used=tools_used if tools_used else None
                )
                self.memory.write_memory(compact_summary, daily=True)
                return final_response
            elif response.stop_reason == "tool_use":
                # Mirror the assistant turn (text + tool_use blocks) into
                # history as plain dicts, then execute each requested tool.
                assistant_content = []
                tool_uses = []
                for block in response.content:
                    if block.type == "text":
                        assistant_content.append({
                            "type": "text",
                            "text": block.text
                        })
                    elif block.type == "tool_use":
                        assistant_content.append({
                            "type": "tool_use",
                            "id": block.id,
                            "name": block.name,
                            "input": block.input
                        })
                        tool_uses.append(block)
                self.conversation_history.append({
                    "role": "assistant",
                    "content": assistant_content
                })
                tool_results = []
                for tool_use in tool_uses:
                    if tool_use.name not in tools_used:
                        tools_used.append(tool_use.name)
                    result = execute_tool(tool_use.name, tool_use.input, healing_system=self.healing_system)
                    # Cap tool output so one result can't blow up the context.
                    if len(result) > 5000:
                        result = result[:5000] + "\n... (output truncated)"
                    print(f"[Tool] {tool_use.name}: {result[:100]}...")
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tool_use.id,
                        "content": result
                    })
                # Tool results go back to the model as a user message.
                self.conversation_history.append({
                    "role": "user",
                    "content": tool_results
                })
            else:
                return f"Unexpected stop reason: {response.stop_reason}"
        return "Error: Maximum tool use iterations exceeded"
    def switch_model(self, provider: str) -> None:
        """Switch LLM provider by constructing a fresh LLMInterface.

        NOTE(review): this replaces self.llm entirely, so any model selected
        via /model, /sonnet, or /haiku on the old interface is discarded.
        """
        self.llm = LLMInterface(provider)
    def shutdown(self) -> None:
        """Cleanup and stop background services.

        NOTE(review): the shutdown hook fires AFTER memory is closed —
        confirm no hook handler needs memory access.
        """
        self.memory.close()
        self.hooks.trigger("agent", "shutdown", {})
if __name__ == "__main__":
    # Smoke test: one chat round-trip against the default Claude provider.
    demo = Agent(provider="claude")
    print(demo.chat("What's my current project?", username="alice"))