From 50cf7165cb28caf236620abcacca42a200802db1 Mon Sep 17 00:00:00 2001 From: Jordan Ramos Date: Mon, 16 Feb 2026 07:43:31 -0700 Subject: [PATCH] Add sub-agent orchestration, MCP tools, and critical bug fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major Features: - Sub-agent orchestration system with dynamic specialist spawning * spawn_sub_agent(): Create specialists with custom prompts * delegate(): Convenience method for task delegation * Cached specialists for reuse * Separate conversation histories and focused context - MCP (Model Context Protocol) tool integration * Zettelkasten: fleeting_note, daily_note, permanent_note, literature_note * Search: search_vault (hybrid search), search_by_tags * Web: web_fetch for real-time data * Zero-cost file/system operations on Pro subscription Critical Bug Fixes: - Fixed max tool iterations (15 → 30, configurable) - Fixed max_tokens error in Agent SDK query() call - Fixed MCP tool routing in execute_tool() * Routes zettelkasten + web tools to async handlers * Prevents "Unknown tool" errors Documentation: - SUB_AGENTS.md: Complete guide to sub-agent system - MCP_MIGRATION.md: Agent SDK migration details - SOUL.example.md: Sanitized bot identity template - scheduled_tasks.example.yaml: Sanitized task config template Security: - Added obsidian vault to .gitignore - Protected SOUL.md and MEMORY.md (personal configs) - Sanitized example configs with placeholders Dependencies: - Added beautifulsoup4, httpx, lxml for web scraping - Updated requirements.txt Co-Authored-By: Claude Sonnet 4.5 --- .gitignore | 3 + MCP_MIGRATION.md | 152 ++++ SUB_AGENTS.md | 205 ++++++ agent.py | 127 +++- config/scheduled_tasks.example.yaml | 102 +-- examples/sub_agent_example.py | 173 +++++ llm_interface.py | 149 +++- mcp_tools.py | 1054 +++++++++++++++++++++++++++ memory_workspace/SOUL.example.md | 79 ++ requirements.txt | 4 + tools.py | 42 +- 11 files changed, 1987 insertions(+), 103 deletions(-) 
create mode 100644 MCP_MIGRATION.md create mode 100644 SUB_AGENTS.md create mode 100644 examples/sub_agent_example.py create mode 100644 mcp_tools.py create mode 100644 memory_workspace/SOUL.example.md diff --git a/.gitignore b/.gitignore index 5ecf925..44c46d8 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,9 @@ memory_workspace/memory/*.md memory_workspace/memory_index.db memory_workspace/users/*.md # User profiles (jordan.md, etc.) memory_workspace/vectors.usearch +memory_workspace/obsidian/ # Zettelkasten vault (personal notes) +memory_workspace/SOUL.md # Personal config (use SOUL.example.md) +memory_workspace/MEMORY.md # Personal memory (use MEMORY.example.md) # User profiles (personal info) users/ diff --git a/MCP_MIGRATION.md b/MCP_MIGRATION.md new file mode 100644 index 0000000..a020813 --- /dev/null +++ b/MCP_MIGRATION.md @@ -0,0 +1,152 @@ +# MCP Tools Migration Guide + +## Overview + +Successfully migrated file/system tools to MCP (Model Context Protocol) servers for better performance and integration with Claude Agent SDK. 
+ +## Architecture + +### MCP Tools (In-Process - No API Costs) +**File**: `mcp_tools.py` +**Server**: `file_system` (v1.0.0) + +These tools run directly in the Python process using the Claude Agent SDK: +- ✅ `read_file` - Read file contents +- ✅ `write_file` - Create/overwrite files +- ✅ `edit_file` - Replace text in files +- ✅ `list_directory` - List directory contents +- ✅ `run_command` - Execute shell commands + +**Benefits**: +- Zero per-token API costs when using Agent SDK +- Better performance (no IPC overhead) +- Direct access to application state +- Simpler deployment (single process) + +### Traditional Tools (API-Based - Consumes Tokens) +**File**: `tools.py` + +These tools require external APIs and fall back to Direct API even in Agent SDK mode: +- 🌤️ `get_weather` - OpenWeatherMap API +- 📧 `send_email`, `read_emails`, `get_email` - Gmail API +- 📅 `read_calendar`, `create_calendar_event`, `search_calendar` - Google Calendar API +- 👤 `create_contact`, `list_contacts`, `get_contact` - Google People API + +**Why not MCP?**: These tools need OAuth state, external API calls, and async HTTP clients that are better suited to the traditional tool execution model. 
+ +## Model Configuration + +### Agent SDK Mode (DEFAULT) +```python +USE_AGENT_SDK=true # Default +``` + +**Model Configuration**: +- Default: **claude-sonnet-4-5-20250929** (all operations - chat, tools, coding) +- Optional: **claude-opus-4-6** (requires `USE_OPUS_FOR_TOOLS=true`, only for extremely intensive tasks) + +**Usage**: +- Regular chat: Uses Sonnet (flat-rate, no API costs) +- File operations: Uses Sonnet via MCP tools (flat-rate, no API costs) +- Google/Weather: Uses Sonnet via Direct API fallback (requires ANTHROPIC_API_KEY, consumes tokens) +- Intensive tasks: Optionally enable Opus with `USE_OPUS_FOR_TOOLS=true` (flat-rate, no extra cost) + +**Cost Structure**: +- Chat + MCP tools: Flat-rate subscription (Pro plan) +- Traditional tools (Google/Weather): Pay-per-token at Sonnet rates (requires API key) + +### Direct API Mode +```python +USE_DIRECT_API=true +Model: claude-sonnet-4-5-20250929 # Cost-effective (never uses Opus - too expensive) +``` + +**Usage**: +- All operations: Pay-per-token +- Requires: ANTHROPIC_API_KEY in .env +- All tools: Traditional execution (same token cost) + +## Implementation Details + +### MCP Server Integration + +**In `llm_interface.py`**: +```python +from mcp_tools import file_system_server + +options = ClaudeAgentOptions( + mcp_servers={"file_system": file_system_server}, + allowed_tools=[ + "read_file", "write_file", "edit_file", + "list_directory", "run_command" + ], +) + +response = await query( + messages=sdk_messages, + max_tokens=max_tokens, + options=options, +) +``` + +### Tool Definition Format + +**MCP Tool Example**: +```python +@tool( + name="read_file", + description="Read the contents of a file.", + input_schema={"file_path": str}, +) +async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + return { + "content": [{"type": "text", "text": "..."}], + "isError": False # Optional + } +``` + +**Traditional Tool Example**: +```python +{ + "name": "send_email", + "description": "Send an email from 
the bot's Gmail account.", + "input_schema": { + "type": "object", + "properties": {"to": {"type": "string"}, ...}, + "required": ["to", "subject", "body"] + } +} +``` + +## Future Enhancements + +### Potential MCP Candidates +- [ ] Weather tool (if we cache API responses in-process) +- [ ] Memory search tools (direct DB access) +- [ ] Configuration management tools + +### Google Tools Migration (Optional) +To fully migrate Google tools to MCP, we would need to: +1. Embed OAuth manager in MCP server lifecycle +2. Handle async HTTP clients within MCP context +3. Manage token refresh in-process + +**Recommendation**: Keep Google tools as traditional tools for now. The complexity of OAuth state management outweighs the token cost savings for infrequent API calls. + +## Testing + +```bash +# Test MCP server creation +python -c "from mcp_tools import file_system_server; print(file_system_server)" + +# Test Agent SDK with Opus +python -c "import os; os.environ['USE_AGENT_SDK']='true'; from llm_interface import LLMInterface; llm = LLMInterface(provider='claude'); print(f'Model: {llm.model}')" + +# Expected: Model: claude-opus-4-6 +``` + +## References + +- Claude Agent SDK Docs: https://github.com/anthropics/claude-agent-sdk +- MCP Protocol: https://modelcontextprotocol.io +- Tool Decorators: `claude_agent_sdk.tool`, `create_sdk_mcp_server` diff --git a/SUB_AGENTS.md b/SUB_AGENTS.md new file mode 100644 index 0000000..7d6b295 --- /dev/null +++ b/SUB_AGENTS.md @@ -0,0 +1,205 @@ +# Sub-Agent Orchestration System + +## Overview + +Ajarbot now supports **dynamic sub-agent spawning** - the ability to create specialized agents on-demand for complex tasks. The main agent can delegate work to specialists with focused system prompts, reducing context window bloat and improving task efficiency. 
+ +## Architecture + +``` +Main Agent (Garvis) +├─> Handles general chat, memory, scheduling +├─> Can spawn sub-agents dynamically +└─> Sub-agents share tools and (optionally) memory + +Sub-Agent (Specialist) +├─> Focused system prompt (no SOUL, user profile overhead) +├─> Own conversation history (isolated context) +├─> Can use all 24 tools +└─> Returns result to main agent +``` + +## Key Features + +- **Dynamic spawning**: Create specialists at runtime, no hardcoded definitions +- **Caching**: Reuse specialists across multiple calls (agent_id parameter) +- **Memory sharing**: Sub-agents can share memory workspace with main agent +- **Tool access**: All tools available to sub-agents (file, web, zettelkasten, Google) +- **Isolation**: Each sub-agent has separate conversation history + +## Usage + +### Method 1: Manual Spawning + +```python +# Spawn a specialist +specialist = agent.spawn_sub_agent( + specialist_prompt="You are a zettelkasten expert. Focus ONLY on note organization.", + agent_id="zettelkasten_processor" # Optional: cache for reuse +) + +# Use the specialist +result = specialist.chat("Process my fleeting notes", username="jordan") +``` + +### Method 2: Delegation (Recommended) + +```python +# One-off delegation (specialist not cached) +result = agent.delegate( + task="Analyze my emails and extract action items", + specialist_prompt="You are an email analyst. Extract action items and deadlines.", + username="jordan" +) + +# Cached delegation (specialist reused) +result = agent.delegate( + task="Create permanent notes from my fleeting notes", + specialist_prompt="You are a zettelkasten specialist. 
Focus on note linking.", + username="jordan", + agent_id="zettelkasten_processor" # Cached for future use +) +``` + +### Method 3: LLM-Driven Orchestration (Future) + +The main agent can analyze requests and decide when to delegate: + +```python +def _should_delegate(self, user_message: str) -> Optional[str]: + """Let LLM decide if delegation is needed.""" + # Ask LLM: "Should this be delegated? If yes, generate specialist prompt" + # Return specialist_prompt if delegation needed, None otherwise + pass +``` + +## Use Cases + +### Complex Zettelkasten Operations +```python +# Main agent detects: "This requires deep note processing" +specialist = agent.spawn_sub_agent( + specialist_prompt="""You are a zettelkasten expert. Your ONLY job is: + - Process fleeting notes into permanent notes + - Find semantic connections using hybrid search + - Create wiki-style links between related concepts + Stay focused on knowledge management.""", + agent_id="zettelkasten_processor" +) +``` + +### Email Intelligence +```python +specialist = agent.spawn_sub_agent( + specialist_prompt="""You are an email analyst. Your ONLY job is: + - Summarize email threads + - Extract action items and deadlines + - Identify patterns in communication + Stay focused on email analysis.""", + agent_id="email_analyst" +) +``` + +### Calendar Optimization +```python +specialist = agent.spawn_sub_agent( + specialist_prompt="""You are a calendar optimization expert. Your ONLY job is: + - Find scheduling conflicts + - Suggest optimal meeting times + - Identify time-blocking opportunities + Stay focused on schedule management.""", + agent_id="calendar_optimizer" +) +``` + +## Benefits + +1. **Reduced Context Window**: Specialists don't load SOUL.md, user profiles, or irrelevant memory +2. **Focused Performance**: Specialists stay on-task without distractions +3. **Token Efficiency**: Smaller system prompts = lower token usage +4. 
**Parallel Execution**: Can spawn multiple specialists simultaneously (future) +5. **Learning Over Time**: Main agent learns when to delegate based on patterns + +## Configuration + +No configuration needed! The infrastructure is ready to use. You can: + +1. **Add specialists later**: Define common specialists in a config file +2. **LLM-driven delegation**: Let the main agent decide when to delegate +3. **Parallel execution**: Spawn multiple specialists for complex workflows +4. **Custom workspaces**: Give specialists isolated memory (set `share_memory=False`) + +## Implementation Details + +### Code Location +- **agent.py**: Lines 25-90 (sub-agent infrastructure) + - `spawn_sub_agent()`: Create specialist with custom prompt + - `delegate()`: Convenience method for one-off delegation + - `is_sub_agent`, `specialist_prompt`: Instance variables + - `sub_agents`: Cache dictionary + +### Thread Safety +- Sub-agents have their own `_chat_lock` +- Safe to spawn from multiple threads +- Cached specialists are reused (no duplicate spawning) + +### Memory Sharing +- Default: Sub-agents share main memory workspace +- Optional: Isolated workspace at `memory_workspace/sub_agents/{agent_id}/` +- Shared memory = specialists can access/update zettelkasten vault + +## Future Enhancements + +1. **Specialist Registry**: Define common specialists in `config/specialists.yaml` +2. **Auto-Delegation**: Main agent auto-detects when to delegate +3. **Parallel Execution**: Run multiple specialists concurrently +4. **Result Synthesis**: Main agent combines outputs from multiple specialists +5. 
**Learning System**: Track which specialists work best for which tasks + +## Example Workflows + +### Workflow 1: Zettelkasten Processing with Delegation +```python +# User: "Process my fleeting notes about AI and machine learning" +# Main agent detects: complex zettelkasten task + +result = agent.delegate( + task="Find all fleeting notes tagged 'AI' or 'machine-learning', process into permanent notes, and discover connections", + specialist_prompt="You are a zettelkasten expert. Use hybrid search to find semantic connections. Create permanent notes with smart links.", + username="jordan", + agent_id="zettelkasten_processor" +) + +# Specialist: +# 1. search_by_tags(tags=["AI", "machine-learning", "fleeting"]) +# 2. For each note: permanent_note() with auto-linking +# 3. Returns: "Created 5 permanent notes with 18 discovered connections" + +# Main agent synthesizes: +# "Sir, I've processed your AI and ML notes. Five concepts emerged with particularly +# interesting connections to your existing work on neural architecture..." +``` + +### Workflow 2: Email + Calendar Coordination +```python +# User: "Find meetings next week and check if I have email threads about them" + +# Spawn two specialists in parallel (future feature) +email_result = agent.delegate( + task="Search emails for threads about meetings", + specialist_prompt="Email analyst. Extract meeting context.", + agent_id="email_analyst" +) + +calendar_result = agent.delegate( + task="List all meetings next week", + specialist_prompt="Calendar expert. Get meeting details.", + agent_id="calendar_optimizer" +) + +# Main agent synthesizes both results +``` + +--- + +**Status**: Infrastructure complete, ready to use. Add specialists as patterns emerge! 
diff --git a/agent.py b/agent.py index a053a3f..f3a3bfa 100644 --- a/agent.py +++ b/agent.py @@ -15,6 +15,8 @@ MAX_CONTEXT_MESSAGES = 20 # Optimized for Agent SDK flat-rate subscription MEMORY_RESPONSE_PREVIEW_LENGTH = 500 # Store more context for better memory retrieval # Maximum conversation history entries before pruning MAX_CONVERSATION_HISTORY = 100 # Higher limit with flat-rate subscription +# Maximum tool execution iterations (generous limit for complex operations like zettelkasten) +MAX_TOOL_ITERATIONS = 30 # Allows complex multi-step workflows with auto-linking, hybrid search, etc. class Agent: @@ -24,6 +26,8 @@ class Agent: self, provider: str = "claude", workspace_dir: str = "./memory_workspace", + is_sub_agent: bool = False, + specialist_prompt: Optional[str] = None, ) -> None: self.memory = MemorySystem(workspace_dir) self.llm = LLMInterface(provider) @@ -32,8 +36,93 @@ class Agent: self._chat_lock = threading.Lock() self.healing_system = SelfHealingSystem(self.memory, self) + # Sub-agent orchestration + self.is_sub_agent = is_sub_agent + self.specialist_prompt = specialist_prompt + self.sub_agents: dict = {} # Cache for spawned sub-agents + self.memory.sync() - self.hooks.trigger("agent", "startup", {"workspace_dir": workspace_dir}) + if not is_sub_agent: # Only trigger hooks for main agent + self.hooks.trigger("agent", "startup", {"workspace_dir": workspace_dir}) + + def spawn_sub_agent( + self, + specialist_prompt: str, + agent_id: Optional[str] = None, + share_memory: bool = True, + ) -> 'Agent': + """Spawn a sub-agent with specialized system prompt. + + Args: + specialist_prompt: Custom system prompt for the specialist + agent_id: Optional ID for caching (reuse same specialist) + share_memory: Whether to share memory workspace with main agent + + Returns: + Agent instance configured as a specialist + + Example: + sub = agent.spawn_sub_agent( + specialist_prompt="You are a zettelkasten expert. 
Focus ONLY on note-taking.", + agent_id="zettelkasten_processor" + ) + result = sub.chat("Process my fleeting notes", username="jordan") + """ + # Check cache if agent_id provided + if agent_id and agent_id in self.sub_agents: + return self.sub_agents[agent_id] + + # Create new sub-agent + workspace = self.memory.workspace_dir if share_memory else f"{self.memory.workspace_dir}/sub_agents/{agent_id}" + sub_agent = Agent( + provider=self.llm.provider, + workspace_dir=workspace, + is_sub_agent=True, + specialist_prompt=specialist_prompt, + ) + + # Cache if ID provided + if agent_id: + self.sub_agents[agent_id] = sub_agent + + return sub_agent + + def delegate( + self, + task: str, + specialist_prompt: str, + username: str = "default", + agent_id: Optional[str] = None, + ) -> str: + """Delegate a task to a specialist sub-agent (convenience method). + + Args: + task: The task/message to send to the specialist + specialist_prompt: System prompt defining the specialist's role + username: Username for context + agent_id: Optional ID for caching the specialist + + Returns: + Response from the specialist + + Example: + # One-off delegation + result = agent.delegate( + task="Process my fleeting notes and find connections", + specialist_prompt="You are a zettelkasten expert. Focus on note organization and linking.", + username="jordan" + ) + + # Cached specialist (reused across multiple calls) + result = agent.delegate( + task="Summarize my emails from today", + specialist_prompt="You are an email analyst. Focus on extracting key information.", + username="jordan", + agent_id="email_analyst" + ) + """ + sub_agent = self.spawn_sub_agent(specialist_prompt, agent_id=agent_id) + return sub_agent.chat(task, username=username) def _get_context_messages(self, max_messages: int) -> List[dict]: """Get recent messages without breaking tool_use/tool_result pairs. 
@@ -140,19 +229,29 @@ class Agent: def _chat_inner(self, user_message: str, username: str) -> str: """Inner chat logic, called while holding _chat_lock.""" - soul = self.memory.get_soul() - user_profile = self.memory.get_user(username) - relevant_memory = self.memory.search_hybrid(user_message, max_results=5) + # Use specialist prompt if this is a sub-agent, otherwise use full context + if self.specialist_prompt: + # Sub-agent: Use focused specialist prompt + system = ( + f"{self.specialist_prompt}\n\n" + f"You have access to {len(TOOL_DEFINITIONS)} tools. Use them to accomplish your specialized task. " + f"Stay focused on your specialty and complete the task efficiently." + ) + else: + # Main agent: Use full SOUL, user profile, and memory context + soul = self.memory.get_soul() + user_profile = self.memory.get_user(username) + relevant_memory = self.memory.search_hybrid(user_message, max_results=5) - memory_lines = [f"- {mem['snippet']}" for mem in relevant_memory] - system = ( - f"{soul}\n\nUser Profile:\n{user_profile}\n\n" - f"Relevant Memory:\n" + "\n".join(memory_lines) + - f"\n\nYou have access to {len(TOOL_DEFINITIONS)} tools for file operations, " - f"command execution, and Google services. Use them freely to help the user. " - f"Note: You're running on a flat-rate Agent SDK subscription, so don't worry " - f"about API costs when making multiple tool calls or processing large contexts." - ) + memory_lines = [f"- {mem['snippet']}" for mem in relevant_memory] + system = ( + f"{soul}\n\nUser Profile:\n{user_profile}\n\n" + f"Relevant Memory:\n" + "\n".join(memory_lines) + + f"\n\nYou have access to {len(TOOL_DEFINITIONS)} tools for file operations, " + f"command execution, and Google services. Use them freely to help the user. " + f"Note: You're running on a flat-rate Agent SDK subscription, so don't worry " + f"about API costs when making multiple tool calls or processing large contexts." 
+ ) self.conversation_history.append( {"role": "user", "content": user_message} @@ -162,7 +261,7 @@ class Agent: self._prune_conversation_history() # Tool execution loop - max_iterations = 15 # Increased for complex multi-step operations + max_iterations = MAX_TOOL_ITERATIONS # Enable caching for Sonnet to save 90% on repeated system prompts use_caching = "sonnet" in self.llm.model.lower() diff --git a/config/scheduled_tasks.example.yaml b/config/scheduled_tasks.example.yaml index 86cb259..dd583cf 100644 --- a/config/scheduled_tasks.example.yaml +++ b/config/scheduled_tasks.example.yaml @@ -1,85 +1,63 @@ -# Scheduled Tasks Configuration (EXAMPLE) -# Copy this to scheduled_tasks.yaml and customize with your values +# Scheduled Tasks Configuration +# Tasks that require the Agent/LLM to execute +# +# Copy this file to scheduled_tasks.yaml and customize with your settings +# scheduled_tasks.yaml is gitignored to protect personal information tasks: # Morning briefing - sent to Slack/Telegram - name: morning-weather prompt: | - Good morning! Please provide a weather report and daily briefing: + Check the user profile ([username].md) for the location. Use the get_weather tool to fetch current weather. - 1. Current weather (you can infer or say you need an API key) - 2. Any pending tasks from yesterday - 3. Priorities for today - 4. A motivational quote to start the day + Format the report as: - Keep it brief and friendly. + 🌤️ **Weather Report for [Your City]** + - Current: [current]°F + - High: [high]°F + - Low: [low]°F + - Conditions: [conditions] + - Recommendation: [brief clothing/activity suggestion] + + Keep it brief and friendly! 
schedule: "daily 06:00" enabled: true - send_to_platform: "telegram" - send_to_channel: "YOUR_TELEGRAM_USER_ID" # Replace with your Telegram user ID + send_to_platform: "telegram" # or "slack" + send_to_channel: "YOUR_TELEGRAM_USER_ID" - # Evening summary - - name: evening-report + # Daily Zettelkasten Review + - name: zettelkasten-daily-review prompt: | - Good evening! Time for the daily wrap-up: + Time for your daily zettelkasten review! Help process fleeting notes: - 1. What was accomplished today? - 2. Any tasks still pending? - 3. Preview of tomorrow's priorities - 4. Weather forecast for tomorrow (infer or API needed) + 1. Use search_by_tags to find all notes tagged with "fleeting" + 2. Show the list of fleeting notes + 3. For each note, ask: "Would you like to: + a) Process this into a permanent note + b) Keep as fleeting for now + c) Delete (not useful)" - Keep it concise and positive. - schedule: "daily 18:00" - enabled: false + Keep it conversational and low-pressure! + schedule: "daily 20:00" + enabled: true send_to_platform: "telegram" send_to_channel: "YOUR_TELEGRAM_USER_ID" - # Hourly health check (no message sending) - - name: system-health-check + # Daily API cost report + - name: daily-cost-report prompt: | - Quick health check: + Generate a daily API usage and cost report: - 1. Are there any tasks that have been pending > 24 hours? - 2. Is the memory system healthy? - 3. Any alerts or issues? + Read the usage_data.json file to get today's API usage statistics. - Respond with "HEALTHY" if all is well, otherwise describe the issue. - schedule: "hourly" + Format the report with today's costs, token usage, and budget tracking. + Warn if cumulative cost exceeds 75% of budget. + + Keep it clear and actionable! + schedule: "daily 23:00" enabled: false - username: "health-checker" - - # Weekly review on Friday - - name: weekly-summary - prompt: | - It's Friday! Time for the weekly review: - - 1. Major accomplishments this week - 2. 
Challenges faced and lessons learned - 3. Key metrics (tasks completed, etc.) - 4. Goals for next week - 5. Team shoutouts (if applicable) - - Make it comprehensive but engaging. - schedule: "weekly fri 17:00" - enabled: false - send_to_platform: "slack" - send_to_channel: "YOUR_SLACK_CHANNEL_ID" - - # Custom: Midday standup - - name: midday-standup - prompt: | - Midday check-in! Quick standup report: - - 1. Morning accomplishments - 2. Current focus - 3. Any blockers? - 4. Afternoon plan - - Keep it brief - standup style. - schedule: "daily 12:00" - enabled: false - send_to_platform: "slack" - send_to_channel: "YOUR_SLACK_CHANNEL_ID" + send_to_platform: "telegram" + send_to_channel: "YOUR_TELEGRAM_USER_ID" # Configuration notes: # - schedule formats: diff --git a/examples/sub_agent_example.py b/examples/sub_agent_example.py new file mode 100644 index 0000000..9597066 --- /dev/null +++ b/examples/sub_agent_example.py @@ -0,0 +1,173 @@ +""" +Example: Using Sub-Agent Orchestration + +This example demonstrates how to use the sub-agent system to delegate +specialized tasks to focused agents. +""" + +import sys +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from agent import Agent + + +def example_1_manual_spawning(): + """Example 1: Manually spawn and use a specialist.""" + print("=== Example 1: Manual Spawning ===\n") + + # Create main agent + agent = Agent(provider="claude") + + # Spawn a zettelkasten specialist + zettel_specialist = agent.spawn_sub_agent( + specialist_prompt="""You are a zettelkasten expert. Your ONLY job is: + - Process fleeting notes into permanent notes + - Find semantic connections using hybrid search + - Create wiki-style links between related concepts + + Stay focused on knowledge management. 
Be concise.""", + agent_id="zettelkasten_processor" # Cached for reuse + ) + + # Use the specialist + result = zettel_specialist.chat( + "Search for all fleeting notes tagged 'AI' and show me what you find.", + username="jordan" + ) + + print(f"Specialist Response:\n{result}\n") + + # Reuse the cached specialist + result2 = zettel_specialist.chat( + "Now create a permanent note summarizing key AI concepts.", + username="jordan" + ) + + print(f"Second Response:\n{result2}\n") + + +def example_2_delegation(): + """Example 2: One-off delegation (convenience method).""" + print("=== Example 2: Delegation ===\n") + + agent = Agent(provider="claude") + + # One-off delegation (specialist not cached) + result = agent.delegate( + task="List all files in the memory_workspace/obsidian directory", + specialist_prompt="""You are a file system expert. Your job is to: + - Navigate directories efficiently + - Provide clear, organized file listings + + Be concise and focused.""", + username="jordan" + ) + + print(f"Delegation Result:\n{result}\n") + + +def example_3_cached_delegation(): + """Example 3: Cached delegation (reuse specialist).""" + print("=== Example 3: Cached Delegation ===\n") + + agent = Agent(provider="claude") + + # First call: Creates and caches the specialist + result1 = agent.delegate( + task="Search the zettelkasten vault for notes about 'architecture'", + specialist_prompt="""You are a zettelkasten search expert. 
Your job is: + - Use hybrid search to find relevant notes + - Summarize key findings concisely + + Stay focused on search and retrieval.""", + username="jordan", + agent_id="zettel_search" # This specialist will be cached + ) + + print(f"First Search:\n{result1}\n") + + # Second call: Reuses the cached specialist + result2 = agent.delegate( + task="Now search for notes about 'design patterns'", + specialist_prompt="(ignored - using cached specialist)", + username="jordan", + agent_id="zettel_search" # Same ID = reuse cached specialist + ) + + print(f"Second Search:\n{result2}\n") + + +def example_4_multiple_specialists(): + """Example 4: Use multiple specialists for different tasks.""" + print("=== Example 4: Multiple Specialists ===\n") + + agent = Agent(provider="claude") + + # Email specialist + email_result = agent.delegate( + task="Check if there are any unread emails in the last 24 hours", + specialist_prompt="""You are an email analyst. Your job is: + - Search and filter emails efficiently + - Summarize key information concisely + + Focus on email intelligence.""", + username="jordan", + agent_id="email_analyst" + ) + + print(f"Email Analysis:\n{email_result}\n") + + # Calendar specialist + calendar_result = agent.delegate( + task="Show me my calendar events for the next 3 days", + specialist_prompt="""You are a calendar expert. Your job is: + - Retrieve calendar events efficiently + - Present schedules clearly + + Focus on time management.""", + username="jordan", + agent_id="calendar_manager" + ) + + print(f"Calendar Review:\n{calendar_result}\n") + + +def example_5_isolated_memory(): + """Example 5: Create specialist with isolated memory.""" + print("=== Example 5: Isolated Memory ===\n") + + agent = Agent(provider="claude") + + # Specialist with its own memory workspace + specialist = agent.spawn_sub_agent( + specialist_prompt="You are a research assistant. 
Focus on gathering information.", + agent_id="researcher", + share_memory=False # Isolated workspace + ) + + # This specialist's memory is stored in: + # memory_workspace/sub_agents/researcher/ + + result = specialist.chat( + "Research the concept of 'emergence' and save findings.", + username="jordan" + ) + + print(f"Research Result:\n{result}\n") + + +if __name__ == "__main__": + # Run examples + # Uncomment the examples you want to try: + + # example_1_manual_spawning() + # example_2_delegation() + # example_3_cached_delegation() + # example_4_multiple_specialists() + # example_5_isolated_memory() + + print("\nℹ️ Uncomment the examples you want to run in the __main__ block") + print("ℹ️ Note: Some examples require Google OAuth setup and active API keys") diff --git a/llm_interface.py b/llm_interface.py index 0519a8f..6904d0a 100644 --- a/llm_interface.py +++ b/llm_interface.py @@ -1,9 +1,21 @@ """LLM Interface - Claude API, GLM, and other models. Supports three modes for Claude: -1. Agent SDK (uses Pro subscription) - DEFAULT - Set USE_AGENT_SDK=true (default) +1. Agent SDK (v0.1.36+) - DEFAULT - Uses query() API with Pro subscription + - Set USE_AGENT_SDK=true (default) + - Model: claude-sonnet-4-5-20250929 (default for all operations) + - Optional: USE_OPUS_FOR_TOOLS=true (enables Opus for extremely intensive tasks only) + - MCP Tools: File/system tools (read_file, write_file, edit_file, list_directory, run_command) + - Traditional Tools: Google tools & weather (fall back to Direct API, requires ANTHROPIC_API_KEY) + - Flat-rate subscription cost (no per-token charges for MCP tools) + 2. Direct API (pay-per-token) - Set USE_DIRECT_API=true + - Model: claude-sonnet-4-5-20250929 (cost-effective, never uses Opus) + - Requires ANTHROPIC_API_KEY in .env + - Full tool support built-in (all tools via traditional API) + 3. 
Legacy: Local Claude Code server - Set USE_CLAUDE_CODE_SERVER=true (deprecated) + - For backward compatibility only """ import os @@ -17,7 +29,13 @@ from usage_tracker import UsageTracker # Try to import Agent SDK (optional dependency) try: - from claude_agent_sdk import AgentSDK + from claude_agent_sdk import ( + query, + UserMessage, + AssistantMessage, + SystemMessage, + ClaudeAgentOptions, + ) import anyio AGENT_SDK_AVAILABLE = True except ImportError: @@ -38,11 +56,15 @@ _USE_AGENT_SDK = os.getenv("USE_AGENT_SDK", "true").lower() == "true" # Default models by provider _DEFAULT_MODELS = { - "claude": "claude-haiku-4-5-20251001", # For Direct API (pay-per-token) - "claude_agent_sdk": "claude-sonnet-4-5-20250929", # For Agent SDK (flat-rate subscription) + "claude": "claude-sonnet-4-5-20250929", # For Direct API (pay-per-token) - Sonnet is cost-effective + "claude_agent_sdk": "claude-sonnet-4-5-20250929", # For Agent SDK (flat-rate) - Sonnet for normal operations + "claude_agent_sdk_opus": "claude-opus-4-6", # For Agent SDK extremely intensive tasks only (flat-rate) "glm": "glm-4-plus", } +# When to use Opus (only on Agent SDK flat-rate mode) +_USE_OPUS_FOR_TOOLS = os.getenv("USE_OPUS_FOR_TOOLS", "false").lower() == "true" + _GLM_BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions" @@ -60,7 +82,6 @@ class LLMInterface: _API_KEY_ENV_VARS.get(provider, ""), ) self.client: Optional[Anthropic] = None - self.agent_sdk: Optional[Any] = None # Model will be set after determining mode # Determine mode (priority: direct API > legacy server > agent SDK) @@ -96,7 +117,7 @@ class LLMInterface: if provider == "claude": if self.mode == "agent_sdk": print(f"[LLM] Using Claude Agent SDK (flat-rate subscription) with model: {self.model}") - self.agent_sdk = AgentSDK() + # No initialization needed - query() is a standalone function elif self.mode == "direct_api": print(f"[LLM] Using Direct API (pay-per-token) with model: {self.model}") self.client = 
Anthropic(api_key=self.api_key) @@ -115,7 +136,7 @@ class LLMInterface: self, messages: List[Dict], system: Optional[str] = None, - max_tokens: int = 4096, + max_tokens: int = 16384, ) -> str: """Send chat request and get response. @@ -126,8 +147,8 @@ class LLMInterface: # Agent SDK mode (Pro subscription) if self.mode == "agent_sdk": try: - # Use anyio to bridge async SDK to sync interface - response = anyio.from_thread.run( + # Use anyio.run to create event loop for async SDK + response = anyio.run( self._agent_sdk_chat, messages, system, @@ -208,15 +229,65 @@ class LLMInterface: max_tokens: int ) -> str: """Internal async method for Agent SDK chat (called via anyio bridge).""" - response = await self.agent_sdk.chat( - messages=messages, - system=system, - max_tokens=max_tokens, - model=self.model + # Convert messages to SDK format + sdk_messages = [] + for msg in messages: + if msg["role"] == "user": + sdk_messages.append(UserMessage(content=msg["content"])) + elif msg["role"] == "assistant": + sdk_messages.append(AssistantMessage(content=msg["content"])) + + # Add system message if provided + if system: + sdk_messages.insert(0, SystemMessage(content=system)) + + # Configure MCP server for file/system tools + try: + from mcp_tools import file_system_server + + options = ClaudeAgentOptions( + mcp_servers={"file_system": file_system_server}, + # Allow all MCP tools (file/system + web + zettelkasten) + allowed_tools=[ + "read_file", + "write_file", + "edit_file", + "list_directory", + "run_command", + "web_fetch", + "fleeting_note", + "daily_note", + "literature_note", + "permanent_note", + "search_vault", + "search_by_tags", + ], + ) + except ImportError: + # Fallback if mcp_tools not available + options = None + + # Call the new query() API + # Note: Agent SDK handles max_tokens internally, don't pass it explicitly + response = await query( + messages=sdk_messages, + options=options, + # model parameter is handled by the SDK based on settings ) + # Extract text 
from response - if isinstance(response, dict): - return response.get("content", "") + if hasattr(response, "content"): + # Handle list of content blocks + if isinstance(response.content, list): + text_parts = [] + for block in response.content: + if hasattr(block, "text"): + text_parts.append(block.text) + return "".join(text_parts) + # Handle single text content + elif isinstance(response.content, str): + return response.content + return str(response) async def _agent_sdk_chat_with_tools( @@ -226,17 +297,43 @@ class LLMInterface: system: Optional[str], max_tokens: int ) -> Message: - """Internal async method for Agent SDK chat with tools (called via anyio bridge).""" - response = await self.agent_sdk.chat( + """Internal async method for Agent SDK chat with tools (called via anyio bridge). + + NOTE: The new Claude Agent SDK (v0.1.36+) uses MCP servers for tools. + For backward compatibility with the existing tool system, we fall back + to the Direct API for tool calls. This means tool calls will consume API tokens + even when Agent SDK mode is enabled. + + Uses Sonnet by default. Opus can be enabled via USE_OPUS_FOR_TOOLS=true for + extremely intensive tasks (only recommended for Agent SDK flat-rate mode). + """ + # Fallback to Direct API for tool calls (SDK tools use MCP servers) + from anthropic import Anthropic + + if not self.api_key: + raise ValueError( + "ANTHROPIC_API_KEY required for tool calls in Agent SDK mode. " + "Set the API key in .env or migrate tools to MCP servers." 
+ ) + + temp_client = Anthropic(api_key=self.api_key) + + # Use Opus only if explicitly enabled (for intensive tasks on flat-rate) + # Otherwise default to Sonnet (cost-effective for normal tool operations) + if _USE_OPUS_FOR_TOOLS and self.mode == "agent_sdk": + model = _DEFAULT_MODELS.get("claude_agent_sdk_opus", "claude-opus-4-6") + else: + model = self.model # Use Sonnet (default) + + response = temp_client.messages.create( + model=model, + max_tokens=max_tokens, + system=system or "", messages=messages, tools=tools, - system=system, - max_tokens=max_tokens, - model=self.model ) - # Convert Agent SDK response to anthropic.types.Message format - return self._convert_sdk_response_to_message(response) + return response def _convert_sdk_response_to_message(self, sdk_response: Dict[str, Any]) -> Message: """Convert Agent SDK response to anthropic.types.Message format. @@ -302,7 +399,7 @@ class LLMInterface: messages: List[Dict], tools: List[Dict[str, Any]], system: Optional[str] = None, - max_tokens: int = 4096, + max_tokens: int = 16384, use_cache: bool = False, ) -> Message: """Send chat request with tool support. Returns full Message object. @@ -316,8 +413,8 @@ class LLMInterface: # Agent SDK mode (Pro subscription) if self.mode == "agent_sdk": try: - # Use anyio to bridge async SDK to sync interface - response = anyio.from_thread.run( + # Use anyio.run to create event loop for async SDK + response = anyio.run( self._agent_sdk_chat_with_tools, messages, tools, diff --git a/mcp_tools.py b/mcp_tools.py new file mode 100644 index 0000000..68f4353 --- /dev/null +++ b/mcp_tools.py @@ -0,0 +1,1054 @@ +"""MCP Tools - In-process tools using Claude Agent SDK. + +These tools run directly in the Python process for better performance +compared to the traditional tool execution flow. File and system tools +are ideal candidates for MCP since they don't require external APIs. 
+""" + +from pathlib import Path +import subprocess +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse +from datetime import datetime +from claude_agent_sdk import tool, create_sdk_mcp_server +import httpx +from bs4 import BeautifulSoup + +# Import memory system for hybrid search +try: + from memory_system import MemorySystem + MEMORY_AVAILABLE = True +except ImportError: + MEMORY_AVAILABLE = False + + +# Maximum characters of tool output to return (prevents token explosion) +_MAX_TOOL_OUTPUT = 5000 + +# Maximum page size for web fetching (500KB) +_MAX_WEB_PAGE_SIZE = 500_000 + +# Maximum text content from web page (10,000 chars ≈ 2,500 tokens) +_MAX_WEB_TEXT = 10_000 + +# Zettelkasten vault paths +_VAULT_ROOT = Path("memory_workspace/obsidian") +_VAULT_FLEETING = _VAULT_ROOT / "fleeting" +_VAULT_DAILY = _VAULT_ROOT / "daily" +_VAULT_PERMANENT = _VAULT_ROOT / "permanent" +_VAULT_LITERATURE = _VAULT_ROOT / "literature" + + +def _generate_note_id() -> str: + """Generate unique timestamp-based note ID (YYYYMMDDHHmmss).""" + return datetime.now().strftime("%Y%m%d%H%M%S") + + +def _create_frontmatter(note_id: str, title: str, tags: list = None, note_type: str = "fleeting") -> str: + """Create YAML front matter for zettelkasten note.""" + now = datetime.now() + tags_str = ", ".join(tags) if tags else "" + + return f"""--- +id: {note_id} +title: {title} +created: {now.strftime("%Y-%m-%dT%H:%M:%S")} +modified: {now.strftime("%Y-%m-%dT%H:%M:%S")} +type: {note_type} +tags: [{tags_str}] +--- + +""" + + +def _ensure_vault_structure(): + """Ensure zettelkasten vault directories exist.""" + for dir_path in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + dir_path.mkdir(parents=True, exist_ok=True) + + +def _get_memory_system() -> Optional['MemorySystem']: + """Get memory system instance for hybrid search.""" + if not MEMORY_AVAILABLE: + return None + + try: + # Initialize memory system pointing to obsidian vault + memory = 
MemorySystem(workspace_dir=_VAULT_ROOT) + return memory + except Exception: + return None + + +def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]: + """Find related notes using hybrid search (vector + keyword). + + Returns list of (note_title, score) tuples. + """ + memory = _get_memory_system() + + if memory: + # Use hybrid search for better results + try: + # Index the vault if needed + for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if vault_dir.exists(): + for note_path in vault_dir.glob("*.md"): + memory.index_file(note_path) + + # Search with content + title as query + query = f"{title} {content[:500]}" # Limit content to first 500 chars + results = memory.search_hybrid(query, max_results=max_results) + + # Convert results to (title, score) tuples + related = [] + for result in results: + note_path = Path(result["path"]) + note_title = note_path.stem + if ' - ' in note_title: + note_title = note_title.split(' - ', 1)[1] + + # Use snippet match percentage as score + score = result.get("snippet", "").count("**") // 2 # Count of matches + related.append((note_title, score)) + + return related + except Exception: + pass # Fall back to keyword search + + # Fallback: keyword search + related_notes = [] + search_terms = title.lower().split() + content.lower().split()[:20] + search_terms = [term for term in search_terms if len(term) > 4] + + for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + note_content = note_path.read_text(encoding="utf-8").lower() + matches = sum(1 for term in search_terms if term in note_content) + if matches >= 2: + note_title = note_path.stem + if ' - ' in note_title: + note_title = note_title.split(' - ', 1)[1] + related_notes.append((note_title, matches)) + except Exception: + continue + + related_notes.sort(key=lambda x: x[1], reverse=True) + return 
related_notes[:max_results] + + +def _is_safe_url(url: str) -> bool: + """Validate URL safety - blocks localhost, private IPs, and file:// URLs.""" + try: + parsed = urlparse(url) + + # Must be HTTP/HTTPS + if parsed.scheme not in ['http', 'https']: + return False + + # Must have a hostname + if not parsed.hostname: + return False + + # Block localhost and loopback IPs + blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1'] + if parsed.hostname in blocked_hosts: + return False + + # Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x) + if parsed.hostname: + parts = parsed.hostname.split('.') + if len(parts) == 4 and parts[0].isdigit(): + first_octet = int(parts[0]) + # Check for private ranges + if first_octet == 10: # 10.0.0.0/8 + return False + if first_octet == 192 and len(parts) > 1 and parts[1].isdigit() and int(parts[1]) == 168: # 192.168.0.0/16 + return False + if first_octet == 172 and len(parts) > 1 and parts[1].isdigit(): + second_octet = int(parts[1]) + if 16 <= second_octet <= 31: # 172.16.0.0/12 + return False + + return True + + except Exception: + return False + + +@tool( + name="read_file", + description="Read the contents of a file. Use this to view configuration files, code, or any text file.", + input_schema={ + "file_path": str, + }, +) +async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Read and return file contents.""" + file_path = args["file_path"] + path = Path(file_path) + + if not path.exists(): + return { + "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], + "isError": True + } + + try: + content = path.read_text(encoding="utf-8") + if len(content) > _MAX_TOOL_OUTPUT: + content = content[:_MAX_TOOL_OUTPUT] + "\n... 
(file truncated)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}], + "isError": True + } + + +@tool( + name="write_file", + description="Write content to a file. Creates a new file or overwrites existing file completely.", + input_schema={ + "file_path": str, + "content": str, + }, +) +async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Write content to a file.""" + file_path = args["file_path"] + content = args["content"] + path = Path(file_path) + + try: + # Create parent directories if they don't exist + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Successfully wrote to {file_path} ({len(content)} characters)" + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error writing file: {str(e)}"}], + "isError": True + } + + +@tool( + name="edit_file", + description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.", + input_schema={ + "file_path": str, + "old_text": str, + "new_text": str, + }, +) +async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Edit file by replacing text.""" + file_path = args["file_path"] + old_text = args["old_text"] + new_text = args["new_text"] + path = Path(file_path) + + if not path.exists(): + return { + "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], + "isError": True + } + + try: + content = path.read_text(encoding="utf-8") + + if old_text not in content: + return { + "content": [{ + "type": "text", + "text": f"Error: Text not found in file. Could not find:\n{old_text[:100]}..." 
+ }], + "isError": True + } + + new_content = content.replace(old_text, new_text, 1) + path.write_text(new_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Successfully edited {file_path}. Replaced 1 occurrence." + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error editing file: {str(e)}"}], + "isError": True + } + + +@tool( + name="list_directory", + description="List files and directories in a given path. Useful for exploring the file system.", + input_schema={ + "path": str, + }, +) +async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """List directory contents.""" + dir_path = Path(args.get("path", ".")) + + if not dir_path.exists(): + return { + "content": [{"type": "text", "text": f"Error: Directory not found: {dir_path}"}], + "isError": True + } + + if not dir_path.is_dir(): + return { + "content": [{"type": "text", "text": f"Error: Not a directory: {dir_path}"}], + "isError": True + } + + try: + items = [] + for item in sorted(dir_path.iterdir()): + item_type = "DIR " if item.is_dir() else "FILE" + size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)" + items.append(f" {item_type} {item.name}{size}") + + if not items: + return { + "content": [{"type": "text", "text": f"Directory {dir_path} is empty"}] + } + + return { + "content": [{ + "type": "text", + "text": f"Contents of {dir_path}:\n" + "\n".join(items) + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error listing directory: {str(e)}"}], + "isError": True + } + + +@tool( + name="run_command", + description="Execute a shell command. 
Use for git operations, running scripts, installing packages, etc.", + input_schema={ + "command": str, + "working_dir": str, + }, +) +async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Execute a shell command.""" + command = args["command"] + working_dir = args.get("working_dir", ".") + + try: + result = subprocess.run( + command, + shell=True, + cwd=working_dir, + capture_output=True, + text=True, + timeout=30, + ) + + output = [] + if result.stdout: + stdout = result.stdout + if len(stdout) > _MAX_TOOL_OUTPUT: + stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)" + output.append(f"STDOUT:\n{stdout}") + if result.stderr: + stderr = result.stderr + if len(stderr) > _MAX_TOOL_OUTPUT: + stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)" + output.append(f"STDERR:\n{stderr}") + + status = f"Command exited with code {result.returncode}" + if not output: + text = status + else: + text = status + "\n\n" + "\n\n".join(output) + + return { + "content": [{"type": "text", "text": text}], + "isError": result.returncode != 0 + } + except subprocess.TimeoutExpired: + return { + "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}], + "isError": True + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error running command: {str(e)}"}], + "isError": True + } + + +@tool( + name="web_fetch", + description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.", + input_schema={ + "url": str, + }, +) +async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Fetch webpage and return parsed text content. + + This is a zero-cost MCP tool - it fetches the HTML and converts to text, + then returns it to the main Agent SDK query for processing (no extra API cost). 
+ """ + url = args["url"] + + # Security: Validate URL + if not _is_safe_url(url): + return { + "content": [{ + "type": "text", + "text": f"Error: Blocked unsafe URL - {url}\n\nOnly http/https URLs to public domains are allowed." + }], + "isError": True + } + + try: + # Fetch page with timeout and size limit + headers = { + "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" + } + async with httpx.AsyncClient( + timeout=10.0, + follow_redirects=True, + limits=httpx.Limits(max_connections=5), + headers=headers + ) as client: + response = await client.get(url) + response.raise_for_status() + + # Check size limit + if len(response.content) > _MAX_WEB_PAGE_SIZE: + return { + "content": [{ + "type": "text", + "text": f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})" + }], + "isError": True + } + + # Parse HTML to text + soup = BeautifulSoup(response.content, 'html.parser') + + # Remove unwanted elements + for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): + element.decompose() + + # Extract text + text = soup.get_text(separator='\n', strip=True) + + # Clean up excessive whitespace + lines = [line.strip() for line in text.split('\n') if line.strip()] + text = '\n'.join(lines) + + # Truncate if needed + if len(text) > _MAX_WEB_TEXT: + text = text[:_MAX_WEB_TEXT] + "\n\n... 
(content truncated, page is very long)" + + # Get title if available + title = soup.title.string if soup.title else "No title" + + return { + "content": [{ + "type": "text", + "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}" + }] + } + + except httpx.TimeoutException: + return { + "content": [{ + "type": "text", + "text": f"Error: Request to {url} timed out after 10 seconds" + }], + "isError": True + } + except httpx.HTTPStatusError as e: + return { + "content": [{ + "type": "text", + "text": f"Error: HTTP {e.response.status_code} - {url}" + }], + "isError": True + } + except Exception as e: + return { + "content": [{ + "type": "text", + "text": f"Error fetching webpage: {str(e)}" + }], + "isError": True + } + + +@tool( + name="fleeting_note", + description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.", + input_schema={ + "content": str, + "tags": str, # Comma-separated tags (optional) + }, +) +async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create a fleeting note (quick capture) in the zettelkasten vault. + + Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly quick notes. 
+    """
+    content = args["content"]
+    tags_str = args.get("tags", "")
+
+    try:
+        _ensure_vault_structure()
+
+        # Generate note ID and extract title from first line
+        note_id = _generate_note_id()
+        lines = content.split('\n', 1)
+        title = lines[0][:50] if lines else "Quick Note"
+
+        # Parse tags
+        tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
+        tags.append("fleeting")  # Always tag as fleeting
+
+        # Create note file
+        filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md"
+        note_path = _VAULT_FLEETING / filename
+
+        # Write note with front matter
+        frontmatter = _create_frontmatter(note_id, title, tags, "fleeting")
+        full_content = frontmatter + content
+
+        note_path.write_text(full_content, encoding="utf-8")
+
+        return {
+            "content": [{
+                "type": "text",
+                "text": f"Created fleeting note: {title}\nID: {note_id}\nLocation: {note_path}"
+            }]
+        }
+
+    except Exception as e:
+        return {
+            "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}],
+            "isError": True
+        }
+
+
+@tool(
+    name="daily_note",
+    description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.",
+    input_schema={
+        "entry": str,
+    },
+)
+async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
+    """Append timestamped entry to today's daily note.
+
+    Zero-cost MCP tool for daily journaling and activity logging.
+ """ + entry = args["entry"] + + try: + _ensure_vault_structure() + + # Get today's date + now = datetime.now() + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%H:%M") + + # Daily note path + note_path = _VAULT_DAILY / f"{date_str}.md" + + # Create or update daily note + if note_path.exists(): + # Append to existing note + current_content = note_path.read_text(encoding="utf-8") + new_entry = f"\n## {time_str}\n{entry}\n" + updated_content = current_content + new_entry + else: + # Create new daily note with front matter + frontmatter = f"""--- +date: {date_str} +type: daily +tags: [daily-note] +--- + +# {date_str} + +## {time_str} +{entry} +""" + updated_content = frontmatter + + note_path.write_text(updated_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}" + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error updating daily note: {str(e)}"}], + "isError": True + } + + +@tool( + name="literature_note", + description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.", + input_schema={ + "url": str, + "tags": str, # Comma-separated tags (optional) + }, +) +async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create literature note from web article. + + Zero-cost MCP tool. Combines web_fetch with note creation. 
+ """ + url = args["url"] + tags_str = args.get("tags", "") + + try: + _ensure_vault_structure() + + # Fetch article content using web_fetch logic + if not _is_safe_url(url): + return { + "content": [{ + "type": "text", + "text": f"Error: Blocked unsafe URL - {url}" + }], + "isError": True + } + + # Fetch the article + headers = { + "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" + } + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client: + response = await client.get(url) + response.raise_for_status() + + if len(response.content) > _MAX_WEB_PAGE_SIZE: + return { + "content": [{ + "type": "text", + "text": f"Error: Page too large" + }], + "isError": True + } + + # Parse content + soup = BeautifulSoup(response.content, 'html.parser') + for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): + element.decompose() + + text = soup.get_text(separator='\n', strip=True) + lines = [line.strip() for line in text.split('\n') if line.strip()] + text = '\n'.join(lines)[:_MAX_WEB_TEXT] + + title = soup.title.string if soup.title else "Untitled Article" + + # Generate note + note_id = _generate_note_id() + tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] + tags.extend(["literature", "web-article"]) + + # Create note content + note_content = f"""# {title} + +**Source**: {url} +**Captured**: {datetime.now().strftime("%Y-%m-%d")} + +## Content + +{text} + +## Notes + +(Add your thoughts, key takeaways, and connections to other notes here) + +## Related +- + +--- +*This is a literature note. 
Process it into permanent notes with key insights.* +""" + + # Create filename and path + filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" + note_path = _VAULT_LITERATURE / filename + + # Write with frontmatter + frontmatter = _create_frontmatter(note_id, title, tags, "literature") + full_content = frontmatter + note_content + + note_path.write_text(full_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}" + }] + } + + except httpx.TimeoutException: + return { + "content": [{"type": "text", "text": f"Error: Request timed out"}], + "isError": True + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error creating literature note: {str(e)}"}], + "isError": True + } + + +@tool( + name="permanent_note", + description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.", + input_schema={ + "title": str, + "content": str, + "tags": str, # Comma-separated tags (optional) + }, +) +async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create permanent note with smart linking suggestions. + + Zero-cost MCP tool. Uses simple search for now. + Future: Will use hybrid search for better suggestions. 
+ """ + title = args["title"] + content = args["content"] + tags_str = args.get("tags", "") + + try: + _ensure_vault_structure() + + # Generate note ID + note_id = _generate_note_id() + + # Parse tags + tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] + tags.append("permanent") + + # Search for related notes using hybrid search (vector + keyword) + related_notes = _find_related_notes_hybrid(title, content, max_results=5) + + # Build related section + related_section = "\n## Related Notes\n" + if related_notes: + for note_title, _ in related_notes: + related_section += f"- [[{note_title}]]\n" + else: + related_section += "- (No related notes found yet)\n" + + # Create full note content + note_content = f"""# {title} + +{content} + +{related_section} + +--- +*Created: {datetime.now().strftime("%Y-%m-%d %H:%M")}* +""" + + # Create filename + filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" + note_path = _VAULT_PERMANENT / filename + + # Write with frontmatter + frontmatter = _create_frontmatter(note_id, title, tags, "permanent") + full_content = frontmatter + note_content + + note_path.write_text(full_content, encoding="utf-8") + + # Build result message + result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}" + if related_notes: + result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):" + for note_title, matches in related_notes: + result_msg += f"\n- [[{note_title}]] ({matches} keyword matches)" + + return { + "content": [{ + "type": "text", + "text": result_msg + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}], + "isError": True + } + + +@tool( + name="search_by_tags", + description="Search zettelkasten vault by tags. 
Find all notes with specific tags or tag combinations.", + input_schema={ + "tags": str, # Comma-separated tags to search for + "match_all": bool, # If true, note must have ALL tags. If false, ANY tag matches (optional, default false) + "limit": int, # Max results (optional, default 10) + }, +) +async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Search vault by tags. + + Zero-cost MCP tool. Searches YAML frontmatter for tag matches. + """ + tags_str = args["tags"] + match_all = args.get("match_all", False) + limit = args.get("limit", 10) + + try: + _ensure_vault_structure() + + # Parse search tags + search_tags = [tag.strip().lower() for tag in tags_str.split(',')] + + results = [] + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + content = note_path.read_text(encoding="utf-8") + + # Extract tags from frontmatter + if content.startswith("---"): + parts = content.split("---", 2) + if len(parts) >= 2: + frontmatter = parts[1] + # Look for tags line + for line in frontmatter.split('\n'): + if line.strip().startswith('tags:'): + tags_part = line.split(':', 1)[1].strip() + # Remove brackets and split + tags_part = tags_part.strip('[]') + note_tags = [t.strip().lower() for t in tags_part.split(',')] + + # Check match + if match_all: + if all(tag in note_tags for tag in search_tags): + results.append(note_path.name) + else: + if any(tag in note_tags for tag in search_tags): + results.append(note_path.name) + break + except Exception: + continue + + # Limit results + results = results[:limit] + + if not results: + match_type = "all" if match_all else "any" + return { + "content": [{ + "type": "text", + "text": f"No notes found with {match_type} of tags: {tags_str}" + }] + } + + result_text = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n" + for i, note_name in enumerate(results, 1): + result_text += f"{i}. 
{note_name}\n" + + return { + "content": [{ + "type": "text", + "text": result_text + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}], + "isError": True + } + + +@tool( + name="search_vault", + description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.", + input_schema={ + "query": str, + "tags": str, # Optional comma-separated tags to filter by + "limit": int, # Max results (optional, default 5) + }, +) +async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Search zettelkasten vault using hybrid search (vector + keyword). + + Zero-cost MCP tool. Uses hybrid search when available for better results. + """ + query = args["query"] + tags_filter = args.get("tags", "") + limit = args.get("limit", 5) + + try: + _ensure_vault_structure() + + # Try hybrid search first + memory = _get_memory_system() + results = [] + + if memory and MEMORY_AVAILABLE: + try: + # Index vault + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if vault_dir.exists(): + for note_path in vault_dir.glob("*.md"): + memory.index_file(note_path) + + # Hybrid search + search_results = memory.search_hybrid(query, max_results=limit * 2) + + # Convert to results format + for result in search_results: + note_path = Path(result["path"]) + results.append({ + "file": note_path.name, + "path": str(note_path), + "snippet": result.get("snippet", "")[:300] + }) + except Exception: + pass # Fall back to simple search + + # Fallback: simple keyword search + if not results: + query_lower = query.lower() + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + content = note_path.read_text(encoding="utf-8") + + if query_lower in content.lower(): + # Extract snippet + lines = 
content.split('\n') + for i, line in enumerate(lines): + if query_lower in line.lower(): + start = max(0, i - 2) + end = min(len(lines), i + 3) + snippet = '\n'.join(lines[start:end]) + results.append({ + "file": note_path.name, + "path": str(note_path), + "snippet": snippet[:300] + }) + break + except Exception: + continue + + # Filter by tags if specified + if tags_filter: + filter_tags = [t.strip().lower() for t in tags_filter.split(',')] + filtered_results = [] + + for result in results: + try: + note_path = Path(result["path"]) + content = note_path.read_text(encoding="utf-8") + + # Check tags in frontmatter + if content.startswith("---"): + parts = content.split("---", 2) + if len(parts) >= 2: + frontmatter = parts[1] + for line in frontmatter.split('\n'): + if line.strip().startswith('tags:'): + tags_part = line.split(':', 1)[1].strip().strip('[]') + note_tags = [t.strip().lower() for t in tags_part.split(',')] + if any(tag in note_tags for tag in filter_tags): + filtered_results.append(result) + break + except Exception: + continue + + results = filtered_results + + # Limit results + results = results[:limit] + + if not results: + tag_msg = f" with tags '{tags_filter}'" if tags_filter else "" + return { + "content": [{ + "type": "text", + "text": f"No notes found matching: {query}{tag_msg}" + }] + } + + # Format results + tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else "" + result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n" + for i, result in enumerate(results, 1): + result_text += f"{i}. 
{result['file']}\n" + result_text += f" {result['snippet']}\n\n" + + return { + "content": [{ + "type": "text", + "text": result_text + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}], + "isError": True + } + + +# Create the MCP server with all tools (file/system + web + zettelkasten) +file_system_server = create_sdk_mcp_server( + name="file_system", + version="1.4.0", + tools=[ + read_file_tool, + write_file_tool, + edit_file_tool, + list_directory_tool, + run_command_tool, + web_fetch_tool, + fleeting_note_tool, + daily_note_tool, + literature_note_tool, + permanent_note_tool, + search_vault_tool, + search_by_tags_tool, + ] +) diff --git a/memory_workspace/SOUL.example.md b/memory_workspace/SOUL.example.md new file mode 100644 index 0000000..05710e6 --- /dev/null +++ b/memory_workspace/SOUL.example.md @@ -0,0 +1,79 @@ +# SOUL - Bot Identity & Instructions + +## Identity +- **Name**: [Your bot name] +- **Email**: [your-email@gmail.com] (your Gmail account for Gmail API) +- **Owner**: [Your name] (see users/[username].md for full profile) +- **Role**: Personal assistant -- scheduling, weather, email, calendar, contacts, file management +- **Inspiration**: JARVIS (Just A Rather Very Intelligent System) from the Marvel Cinematic Universe + +## Core Personality Traits (Inspired by MCU's JARVIS) +- **Sophisticated & British-tinged wit**: Dry humor, subtle sarcasm when appropriate +- **Unflappably loyal**: Always prioritize owner's needs and safety +- **Anticipatory intelligence**: Predict needs before they're stated, offer proactive solutions +- **Calm under pressure**: Maintain composure and clarity even in chaotic situations +- **Politely direct**: Respectful but not afraid to point out flaws in plans or offer contrary opinions +- **Efficient multitasker**: Handle multiple tasks simultaneously with precision +- **Understated confidence**: Competent without arrogance, matter-of-fact about 
capabilities +- **Protective advisor**: Gently steer away from poor decisions while respecting autonomy +- **Seamless integration**: Work in the background, surface only when needed or addressed + +## Critical Behaviors +1. **Always check the user's profile** (users/{username}.md) before answering location/preference questions +2. **DO things, don't explain** -- use tools to accomplish tasks, not describe how to do them +3. **Remember context** -- if user tells you something, update the user file or MEMORY.md +4. **Use appropriate timezone** for all scheduling ([Your timezone] - [Your location]) + +## Available Tools (22) +### File & System (MCP - Zero Cost) +- read_file, write_file, edit_file, list_directory, run_command + +### Web Access (MCP - Zero Cost) +- web_fetch (fetch real-time data from any public URL) + +### Zettelkasten / Knowledge Management (MCP - Zero Cost) +- fleeting_note (quick thought capture with auto-ID) +- daily_note (append to today's daily journal) +- literature_note (create note from web article with citation) +- permanent_note (create refined note with SMART auto-link suggestions using hybrid search) +- search_vault (search notes with hybrid search - vector + keyword, optional tag filter) +- search_by_tags (find notes by tag combinations) + +### Weather (API Cost) +- get_weather (OpenWeatherMap API -- default location: [Your city, Country]) + +### Gmail ([your-email@gmail.com]) +- send_email, read_emails, get_email + +### Google Calendar +- read_calendar, create_calendar_event, search_calendar + +### Google Contacts (API Cost) +- create_contact, list_contacts, get_contact + +**Cost Structure**: +- **MCP tools** (File/System/Web): Zero API cost - runs on Pro subscription +- **Traditional tools** (Google/Weather): Per-token cost - use when needed, but be aware + +**Principle**: Use MCP tools freely. Use traditional tools when needed for external services.
+ +## Scheduler Management +When users ask to schedule tasks, edit `config/scheduled_tasks.yaml` directly. +Schedule formats: `hourly`, `daily HH:MM`, `weekly day HH:MM` + +## Memory System +- SOUL.md: This file (identity + instructions) +- MEMORY.md: Project context and important facts +- users/{username}.md: Per-user preferences and info +- memory/YYYY-MM-DD.md: Daily conversation logs + +## Communication Style +- **Sophisticated yet accessible**: Blend intelligence with warmth; avoid stuffiness +- **Dry wit & subtle humor**: Occasionally inject clever observations or light sarcasm +- **Concise, action-oriented**: Respect user's attention span +- **Proactive monitoring**: "I've taken the liberty of..." or "May I suggest..." phrasing +- **Deferential but honest**: Respectful, but willing to respectfully challenge bad ideas +- **Break tasks into small chunks**: Digestible steps with clear next actions +- **Vary language to maintain interest**: Keep interactions fresh and engaging +- **Frame suggestions as exploration opportunities**: Not obligations, but intriguing possibilities +- **Status updates without being asked**: Brief, relevant information delivered at appropriate moments diff --git a/requirements.txt b/requirements.txt index 9cb797e..1e59a48 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,3 +28,7 @@ google-api-python-client>=2.108.0 claude-agent-sdk>=0.1.0 anyio>=4.0.0 python-dotenv>=1.0.0 + +# Web fetching dependencies +httpx>=0.27.0 +beautifulsoup4>=4.12.0 diff --git a/tools.py b/tools.py index c236b28..fcdeba6 100644 --- a/tools.py +++ b/tools.py @@ -342,7 +342,47 @@ TOOL_DEFINITIONS = [ def execute_tool(tool_name: str, tool_input: Dict[str, Any], healing_system: Any = None) -> str: """Execute a tool and return the result as a string.""" try: - # File tools + # MCP tools (zettelkasten + web_fetch) - route to mcp_tools.py + MCP_TOOLS = { + "web_fetch", "fleeting_note", "daily_note", "literature_note", + "permanent_note", "search_vault", 
"search_by_tags" + } + + if tool_name in MCP_TOOLS: + # Route to MCP tool handlers + import anyio + from mcp_tools import ( + web_fetch_tool, fleeting_note_tool, daily_note_tool, + literature_note_tool, permanent_note_tool, + search_vault_tool, search_by_tags_tool + ) + + # Map tool names to their handlers + mcp_handlers = { + "web_fetch": web_fetch_tool, + "fleeting_note": fleeting_note_tool, + "daily_note": daily_note_tool, + "literature_note": literature_note_tool, + "permanent_note": permanent_note_tool, + "search_vault": search_vault_tool, + "search_by_tags": search_by_tags_tool, + } + + # Execute MCP tool asynchronously + handler = mcp_handlers[tool_name] + result = anyio.run(handler, tool_input) + + # Convert result to string if needed + if isinstance(result, dict): + if "error" in result: + return f"Error: {result['error']}" + elif "content" in result: + return result["content"] + else: + return str(result) + return str(result) + + # File tools (traditional handlers - kept for backward compatibility) if tool_name == "read_file": return _read_file(tool_input["file_path"]) elif tool_name == "write_file":