diff --git a/agent.py b/agent.py index 74bd856..646c6cb 100644 --- a/agent.py +++ b/agent.py @@ -1,5 +1,6 @@ """AI Agent with Memory and LLM Integration.""" +import random import threading import time from typing import Any, List, Optional, Callable @@ -87,6 +88,10 @@ class Agent: specialist_prompt=specialist_prompt, ) + # DEFENSIVE: Ensure sub-agent never inherits main event loop + # Sub-agents run in dedicated threads with isolated loops + sub_agent.llm._event_loop = None + # Set agent_id for activity tracking sub_agent.agent_id = agent_id @@ -108,6 +113,13 @@ class Agent: max_retries: int = 1, ) -> str: """Delegate a task to a specialist sub-agent with automatic retry on hang.""" + # Generate unique agent IDs to prevent caching race conditions in parallel delegations + if not agent_id: + agent_id = f"sub_{int(time.time()*1000)}_{random.randint(1000,9999)}" + else: + # Add timestamp to user-provided ID to ensure uniqueness + agent_id = f"{agent_id}_{int(time.time()*1000)}" + for attempt in range(max_retries + 1): if attempt > 0: print(f"[Agent] Retrying {agent_id} (attempt {attempt+1}/{max_retries+1})") diff --git a/llm_interface.py b/llm_interface.py index a62b738..7cfe3a4 100644 --- a/llm_interface.py +++ b/llm_interface.py @@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set import requests from anthropic import Anthropic +from claude_agent_sdk import TextBlock, ToolUseBlock from usage_tracker import UsageTracker logger = logging.getLogger(__name__) @@ -607,12 +608,13 @@ class LLMInterface: assistant_messages.append(message.content) elif isinstance(message.content, list): for block in message.content: - if hasattr(block, 'type'): - if block.type == 'text' and hasattr(block, 'text'): - assistant_messages.append(block.text) - elif block.type == 'tool_use' and hasattr(block, 'name'): - tool_names.append(block.name) - self._last_tool_names = tool_names.copy() + # Use isinstance() checks instead of hasattr(block, 'type') + # ToolUseBlock dataclass has no .type attribute + if isinstance(block, TextBlock): + assistant_messages.append(block.text) + elif isinstance(block, ToolUseBlock): + tool_names.append(block.name) + self._last_tool_names = tool_names.copy() if isinstance(message, ResultMessage): # DEBUG: Log what we captured during message processing diff --git a/mcp_tools.py b/mcp_tools.py index 99bbfae..992456d 100644 --- a/mcp_tools.py +++ b/mcp_tools.py @@ -1,2020 +1,4074 @@ """MCP Tools - In-process tools using Claude Agent SDK. + + These tools run directly in the Python process for better performance + compared to the traditional tool execution flow. File and system tools + are ideal candidates for MCP since they don't require external APIs. + """ + + from pathlib import Path + import subprocess + from typing import Any, Dict, List, Optional + from urllib.parse import urlparse + from datetime import datetime + import threading + from claude_agent_sdk import tool, create_sdk_mcp_server + import httpx + from bs4 import BeautifulSoup + + # Import memory system for hybrid search + try: + from memory_system import MemorySystem + MEMORY_AVAILABLE = True + except ImportError: + MEMORY_AVAILABLE = False + + # Import agent registry for delegate_task tool + try: + from agent_registry import get_agent + AGENT_REGISTRY_AVAILABLE = True + except ImportError: + AGENT_REGISTRY_AVAILABLE = False + + + # Maximum characters of tool output to return (prevents token explosion) + _MAX_TOOL_OUTPUT = 5000 # Restored for complex diagram generation + + # Maximum page size for web fetching (500KB) + _MAX_WEB_PAGE_SIZE = 500_000 + + # Maximum text content from web page (10,000 chars ≈ 2,500 tokens) + _MAX_WEB_TEXT = 10_000 + + +# Delegation settings + +_DELEGATE_TIMEOUT = 600 # 10 minutes max per sub-agent + +_MAX_CONCURRENT_DELEGATES = 4 # Prevent unbounded thread creation + +_delegate_semaphore = threading.Semaphore(_MAX_CONCURRENT_DELEGATES) + + + # Zettelkasten vault paths + _VAULT_ROOT = Path("memory_workspace/obsidian") + _VAULT_FLEETING = _VAULT_ROOT / "fleeting" + _VAULT_DAILY = _VAULT_ROOT / "daily" + _VAULT_PERMANENT = _VAULT_ROOT / "permanent" + _VAULT_LITERATURE = _VAULT_ROOT / "literature" + + + def _generate_note_id() -> str: + """Generate unique timestamp-based note ID (YYYYMMDDHHmmss).""" + return datetime.now().strftime("%Y%m%d%H%M%S") + + + def _create_frontmatter(note_id: str, title: str, tags: list = None, note_type: str = "fleeting") -> str: + """Create YAML front matter for zettelkasten note.""" + now = datetime.now() + tags_str = ", ".join(tags) if tags else "" + + return f"""--- + id: {note_id} + title: {title} + created: {now.strftime("%Y-%m-%dT%H:%M:%S")} + modified: {now.strftime("%Y-%m-%dT%H:%M:%S")} + type: {note_type} + tags: [{tags_str}] + --- + + """ + + + def _ensure_vault_structure(): + """Ensure zettelkasten vault directories exist.""" + for dir_path in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + dir_path.mkdir(parents=True, exist_ok=True) + + + def _get_memory_system() -> Optional['MemorySystem']: + """Get memory system instance for hybrid search.""" + if not MEMORY_AVAILABLE: + return None + + try: + # Initialize memory system pointing to obsidian vault + memory = MemorySystem(workspace_dir=_VAULT_ROOT) + return memory + except Exception: + return None + + + def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]: + """Find related notes using hybrid search (vector + keyword). + + Returns list of (note_title, score) tuples. + """ + memory = _get_memory_system() + + if memory: + # Use hybrid search for better results + try: + # Index the vault if needed + for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if vault_dir.exists(): + for note_path in vault_dir.glob("*.md"): + memory.index_file(note_path) + + # Search with content + title as query + query = f"{title} {content[:500]}" # Limit content to first 500 chars + results = memory.search_hybrid(query, max_results=max_results) + + # Convert results to (title, score) tuples + related = [] + for result in results: + note_path = Path(result["path"]) + note_title = note_path.stem + if ' - ' in note_title: + note_title = note_title.split(' - ', 1)[1] + + # Use snippet match percentage as score + score = result.get("snippet", "").count("**") // 2 # Count of matches + related.append((note_title, score)) + + return related + except Exception: + pass # Fall back to keyword search + + # Fallback: keyword search + related_notes = [] + search_terms = title.lower().split() + content.lower().split()[:20] + search_terms = [term for term in search_terms if len(term) > 4] + + for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + note_content = note_path.read_text(encoding="utf-8").lower() + matches = sum(1 for term in search_terms if term in note_content) + if matches >= 2: + note_title = note_path.stem + if ' - ' in note_title: + note_title = note_title.split(' - ', 1)[1] + related_notes.append((note_title, matches)) + except Exception: + continue + + related_notes.sort(key=lambda x: x[1], reverse=True) + return related_notes[:max_results] + + + def _is_safe_url(url: str) -> bool: + """Validate URL safety - blocks localhost, private IPs, and file:// URLs.""" + try: + parsed = urlparse(url) + + # Must be HTTP/HTTPS + if parsed.scheme not in ['http', 'https']: + return False + + # Must have a hostname + if not parsed.hostname: + return False + + # Block localhost and loopback IPs + blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1'] + if parsed.hostname in blocked_hosts: + return False + + # Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x) + if parsed.hostname: + parts = parsed.hostname.split('.') + if len(parts) == 4 and parts[0].isdigit(): + first_octet = int(parts[0]) + # Check for private ranges + if first_octet == 10: # 10.0.0.0/8 + return False + if first_octet == 192 and len(parts) > 1 and parts[1].isdigit() and int(parts[1]) == 168: # 192.168.0.0/16 + return False + if first_octet == 172 and len(parts) > 1 and parts[1].isdigit(): + second_octet = int(parts[1]) + if 16 <= second_octet <= 31: # 172.16.0.0/12 + return False + + return True + + except Exception: + return False + + + @tool( + name="read_file", + description="Read the contents of a file. Use this to view configuration files, code, or any text file.", + input_schema={ + "file_path": str, + }, + ) + async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Read and return file contents with auto-retry for PDFs.""" + file_path = args["file_path"] + path = Path(file_path) + + if not path.exists(): + return { + "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], + "isError": True + } + + # Check if it's a PDF + is_pdf = path.suffix.lower() == ".pdf" + + if is_pdf: + # Try reading PDF with multiple methods + for attempt, method in enumerate(["pypdf", "pdfplumber", "pdfminer"], 1): + try: + if method == "pypdf": + try: + from pypdf import PdfReader + except ImportError: + continue # Try next method + + reader = PdfReader(path) + + # Check if actually password-protected + if reader.is_encrypted: + # Try with empty password first (some PDFs are "encrypted" with no password) + try: + reader.decrypt("") + except Exception: + return { + "content": [{"type": "text", "text": f"PDF is password-protected and cannot be read without the password."}], + "isError": True + } + + # Extract text from all pages with early truncation + text_parts = [] + total_length = 0 + truncated = False + + for i, page in enumerate(reader.pages, 1): + page_text = page.extract_text() + if page_text.strip(): + page_section = f"--- Page {i} ---\n{page_text}" + + # Check if adding this page would exceed limit + if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT: # +2 for "\n\n" + # Add partial page if there's room + remaining = _MAX_TOOL_OUTPUT - total_length - 2 + if remaining > 100: # Only add if we can fit meaningful content + text_parts.append(page_section[:remaining]) + truncated = True + break + + text_parts.append(page_section) + total_length += len(page_section) + 2 + + content = "\n\n".join(text_parts) + if truncated: + content += f"\n... (PDF truncated - showing first {len(text_parts)} of {len(reader.pages)} pages)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path} ({len(reader.pages)} pages):\n\n{content}"}] + } + + elif method == "pdfplumber": + try: + import pdfplumber + except ImportError: + continue + + with pdfplumber.open(path) as pdf: + text_parts = [] + total_length = 0 + truncated = False + total_pages = len(pdf.pages) + + for i, page in enumerate(pdf.pages, 1): + page_text = page.extract_text() + if page_text and page_text.strip(): + page_section = f"--- Page {i} ---\n{page_text}" + + # Check if adding this page would exceed limit + if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT: + remaining = _MAX_TOOL_OUTPUT - total_length - 2 + if remaining > 100: + text_parts.append(page_section[:remaining]) + truncated = True + break + + text_parts.append(page_section) + total_length += len(page_section) + 2 + + content = "\n\n".join(text_parts) + if truncated: + content += f"\n... (PDF truncated - showing first {len(text_parts)} of {total_pages} pages)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path} ({total_pages} pages):\n\n{content}"}] + } + + elif method == "pdfminer": + try: + from pdfminer.high_level import extract_text as pdfminer_extract + except ImportError: + continue + + content = pdfminer_extract(path) + if len(content) > _MAX_TOOL_OUTPUT: + content = content[:_MAX_TOOL_OUTPUT] + "\n... (PDF truncated)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] + } + + except Exception as e: + # If this is the last attempt, return the error + if attempt == 3: + error_msg = str(e).lower() + if "password" in error_msg or "encrypted" in error_msg: + return { + "content": [{"type": "text", "text": f"PDF appears to be password-protected: {str(e)}"}], + "isError": True + } + else: + return { + "content": [{"type": "text", "text": f"Error reading PDF after trying multiple methods: {str(e)}. The PDF might be corrupted or use an unsupported format."}], + "isError": True + } + # Otherwise, continue to next method + continue + + # If we get here, no PDF library is installed + return { + "content": [{"type": "text", "text": f"Cannot read PDF: No PDF library installed. Install with: pip install pypdf pdfplumber"}], + "isError": True + } + + # Non-PDF files: try reading as text + try: + content = path.read_text(encoding="utf-8") + if len(content) > _MAX_TOOL_OUTPUT: + content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] + } + except UnicodeDecodeError: + # Binary file that's not a PDF + return { + "content": [{"type": "text", "text": f"Error: {file_path} appears to be a binary file. Only text files and PDFs are supported."}], + "isError": True + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}], + "isError": True + } + + + @tool( + name="write_file", + description="Write content to a file. Creates a new file or overwrites existing file completely.", + input_schema={ + "file_path": str, + "content": str, + }, + ) + async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Write content to a file.""" + file_path = args["file_path"] + content = args["content"] + path = Path(file_path) + + try: + # Create parent directories if they don't exist + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Successfully wrote to {file_path} ({len(content)} characters)" + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error writing file: {str(e)}"}], + "isError": True + } + + + @tool( + name="edit_file", + description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.", + input_schema={ + "file_path": str, + "old_text": str, + "new_text": str, + }, + ) + async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Edit file by replacing text.""" + file_path = args["file_path"] + old_text = args["old_text"] + new_text = args["new_text"] + path = Path(file_path) + + if not path.exists(): + return { + "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], + "isError": True + } + + try: + content = path.read_text(encoding="utf-8") + + if old_text not in content: + return { + "content": [{ + "type": "text", + "text": f"Error: Text not found in file. Could not find:\n{old_text[:100]}..." + }], + "isError": True + } + + new_content = content.replace(old_text, new_text, 1) + path.write_text(new_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Successfully edited {file_path}. Replaced 1 occurrence." + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error editing file: {str(e)}"}], + "isError": True + } + + + @tool( + name="list_directory", + description="List files and directories in a given path. Useful for exploring the file system.", + input_schema={ + "path": str, + }, + ) + async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """List directory contents.""" + dir_path = Path(args.get("path", ".")) + + if not dir_path.exists(): + return { + "content": [{"type": "text", "text": f"Error: Directory not found: {dir_path}"}], + "isError": True + } + + if not dir_path.is_dir(): + return { + "content": [{"type": "text", "text": f"Error: Not a directory: {dir_path}"}], + "isError": True + } + + try: + items = [] + for item in sorted(dir_path.iterdir()): + item_type = "DIR " if item.is_dir() else "FILE" + size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)" + items.append(f" {item_type} {item.name}{size}") + + if not items: + return { + "content": [{"type": "text", "text": f"Directory {dir_path} is empty"}] + } + + return { + "content": [{ + "type": "text", + "text": f"Contents of {dir_path}:\n" + "\n".join(items) + }] + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error listing directory: {str(e)}"}], + "isError": True + } + + + @tool( + name="run_command", + description="Execute a shell command. Use for git operations, running scripts, installing packages, etc.", + input_schema={ + "command": str, + "working_dir": str, + }, + ) + async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Execute a shell command.""" + command = args["command"] + working_dir = args.get("working_dir", ".") + + try: + result = subprocess.run( + command, + shell=True, + cwd=working_dir, + capture_output=True, + text=True, + timeout=30, + ) + + output = [] + if result.stdout: + stdout = result.stdout + if len(stdout) > _MAX_TOOL_OUTPUT: + stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)" + output.append(f"STDOUT:\n{stdout}") + if result.stderr: + stderr = result.stderr + if len(stderr) > _MAX_TOOL_OUTPUT: + stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)" + output.append(f"STDERR:\n{stderr}") + + status = f"Command exited with code {result.returncode}" + if not output: + text = status + else: + text = status + "\n\n" + "\n\n".join(output) + + return { + "content": [{"type": "text", "text": text}], + "isError": result.returncode != 0 + } + except subprocess.TimeoutExpired: + return { + "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}], + "isError": True + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error running command: {str(e)}"}], + "isError": True + } + + + @tool( + name="web_fetch", + description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.", + input_schema={ + "url": str, + }, + ) + async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Fetch webpage and return parsed text content. + + This is a zero-cost MCP tool - it fetches the HTML and converts to text, + then returns it to the main Agent SDK query for processing (no extra API cost). + """ + url = args["url"] + + # Security: Validate URL + if not _is_safe_url(url): + return { + "content": [{ + "type": "text", + "text": f"Error: Blocked unsafe URL - {url}\n\nOnly http/https URLs to public domains are allowed." + }], + "isError": True + } + + try: + # Fetch page with timeout and size limit + headers = { + "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" + } + async with httpx.AsyncClient( + timeout=10.0, + follow_redirects=True, + limits=httpx.Limits(max_connections=5), + headers=headers + ) as client: + response = await client.get(url) + response.raise_for_status() + + # Check size limit + if len(response.content) > _MAX_WEB_PAGE_SIZE: + return { + "content": [{ + "type": "text", + "text": f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})" + }], + "isError": True + } + + # Parse HTML to text + soup = BeautifulSoup(response.content, 'html.parser') + + # Remove unwanted elements + for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): + element.decompose() + + # Extract text + text = soup.get_text(separator='\n', strip=True) + + # Clean up excessive whitespace + lines = [line.strip() for line in text.split('\n') if line.strip()] + text = '\n'.join(lines) + + # Truncate if needed + if len(text) > _MAX_WEB_TEXT: + text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)" + + # Get title if available + title = soup.title.string if soup.title else "No title" + + return { + "content": [{ + "type": "text", + "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}" + }] + } + + except httpx.TimeoutException: + return { + "content": [{ + "type": "text", + "text": f"Error: Request to {url} timed out after 10 seconds" + }], + "isError": True + } + except httpx.HTTPStatusError as e: + return { + "content": [{ + "type": "text", + "text": f"Error: HTTP {e.response.status_code} - {url}" + }], + "isError": True + } + except Exception as e: + return { + "content": [{ + "type": "text", + "text": f"Error fetching webpage: {str(e)}" + }], + "isError": True + } + + + @tool( + name="fleeting_note", + description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.", + input_schema={ + "content": str, + "tags": str, # Comma-separated tags (optional) + }, + ) + async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create a fleeting note (quick capture) in the zettelkasten vault. + + Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly quick notes. + """ + content = args["content"] + tags_str = args.get("tags", "") + + try: + _ensure_vault_structure() + + # Generate note ID and extract title from first line + note_id = _generate_note_id() + lines = content.split('\n', 1) + title = lines[0][:50] if lines else "Quick Note" + + # Parse tags + tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] + tags.append("fleeting") # Always tag as fleeting + + # Create note file + filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md" + note_path = _VAULT_FLEETING / filename + + # Write note with front matter + frontmatter = _create_frontmatter(note_id, title, tags, "fleeting") + full_content = frontmatter + content + + note_path.write_text(full_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Created fleeting note: {filename}\nID: {note_id}\nLocation: {note_path}" + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}], + "isError": True + } + + + @tool( + name="daily_note", + description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.", + input_schema={ + "entry": str, + }, + ) + async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Append timestamped entry to today's daily note. + + Zero-cost MCP tool for daily journaling and activity logging. + """ + entry = args["entry"] + + try: + _ensure_vault_structure() + + # Get today's date + now = datetime.now() + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%H:%M") + + # Daily note path + note_path = _VAULT_DAILY / f"{date_str}.md" + + # Create or update daily note + if note_path.exists(): + # Append to existing note + current_content = note_path.read_text(encoding="utf-8") + new_entry = f"\n## {time_str}\n{entry}\n" + updated_content = current_content + new_entry + else: + # Create new daily note with front matter + frontmatter = f"""--- + date: {date_str} + type: daily + tags: [daily-note] + --- + + # {date_str} + + ## {time_str} + {entry} + """ + updated_content = frontmatter + + note_path.write_text(updated_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}" + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error updating daily note: {str(e)}"}], + "isError": True + } + + + @tool( + name="literature_note", + description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.", + input_schema={ + "url": str, + "tags": str, # Comma-separated tags (optional) + }, + ) + async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create literature note from web article. + + Zero-cost MCP tool. Combines web_fetch with note creation. + """ + url = args["url"] + tags_str = args.get("tags", "") + + try: + _ensure_vault_structure() + + # Fetch article content using web_fetch logic + if not _is_safe_url(url): + return { + "content": [{ + "type": "text", + "text": f"Error: Blocked unsafe URL - {url}" + }], + "isError": True + } + + # Fetch the article + headers = { + "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" + } + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client: + response = await client.get(url) + response.raise_for_status() + + if len(response.content) > _MAX_WEB_PAGE_SIZE: + return { + "content": [{ + "type": "text", + "text": f"Error: Page too large" + }], + "isError": True + } + + # Parse content + soup = BeautifulSoup(response.content, 'html.parser') + for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): + element.decompose() + + text = soup.get_text(separator='\n', strip=True) + lines = [line.strip() for line in text.split('\n') if line.strip()] + text = '\n'.join(lines)[:_MAX_WEB_TEXT] + + title = soup.title.string if soup.title else "Untitled Article" + + # Generate note + note_id = _generate_note_id() + tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] + tags.extend(["literature", "web-article"]) + + # Create note content + note_content = f"""# {title} + + **Source**: {url} + **Captured**: {datetime.now().strftime("%Y-%m-%d")} + + ## Content + + {text} + + ## Notes + + (Add your thoughts, key takeaways, and connections to other notes here) + + ## Related + - + + --- + *This is a literature note. Process it into permanent notes with key insights.* + """ + + # Create filename and path + filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" + note_path = _VAULT_LITERATURE / filename + + # Write with frontmatter + frontmatter = _create_frontmatter(note_id, title, tags, "literature") + full_content = frontmatter + note_content + + note_path.write_text(full_content, encoding="utf-8") + + return { + "content": [{ + "type": "text", + "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}" + }] + } + + except httpx.TimeoutException: + return { + "content": [{"type": "text", "text": f"Error: Request timed out"}], + "isError": True + } + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error creating literature note: {str(e)}"}], + "isError": True + } + + + @tool( + name="permanent_note", + description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.", + input_schema={ + "title": str, + "content": str, + "tags": str, # Comma-separated tags (optional) + }, + ) + async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Create permanent note with smart linking suggestions. + + Zero-cost MCP tool. Uses simple search for now. + Future: Will use hybrid search for better suggestions. + """ + title = args["title"] + content = args["content"] + tags_str = args.get("tags", "") + + try: + _ensure_vault_structure() + + # Generate note ID + note_id = _generate_note_id() + + # Parse tags + tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] + tags.append("permanent") + + # Search for related notes using hybrid search (vector + keyword) + related_notes = _find_related_notes_hybrid(title, content, max_results=5) + + # Build related section + related_section = "\n## Related Notes\n" + if related_notes: + for note_title, _ in related_notes: + related_section += f"- [[{note_title}]]\n" + else: + related_section += "- (No related notes found yet)\n" + + # Create full note content + note_content = f"""# {title} + + {content} + + {related_section} + + --- + *Created: {datetime.now().strftime("%Y-%m-%d %H:%M")}* + """ + + # Create filename + filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" + note_path = _VAULT_PERMANENT / filename + + # Write with frontmatter + frontmatter = _create_frontmatter(note_id, title, tags, "permanent") + full_content = frontmatter + note_content + + note_path.write_text(full_content, encoding="utf-8") + + # Build result message + result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}" + if related_notes: + result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):" + for note_title, matches in related_notes: + result_msg += f"\n- [[{note_title}]] ({matches} keyword matches)" + + return { + "content": [{ + "type": "text", + "text": result_msg + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}], + "isError": True + } + + + @tool( + name="search_by_tags", + description="Search zettelkasten vault by tags. Find all notes with specific tags or tag combinations.", + input_schema={ + "tags": str, # Comma-separated tags to search for + "match_all": bool, # If true, note must have ALL tags. If false, ANY tag matches (optional, default false) + "limit": int, # Max results (optional, default 10) + }, + ) + async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Search vault by tags. + + Zero-cost MCP tool. Searches YAML frontmatter for tag matches. + """ + tags_str = args["tags"] + match_all = args.get("match_all", False) + limit = args.get("limit", 10) + + try: + _ensure_vault_structure() + + # Parse search tags + search_tags = [tag.strip().lower() for tag in tags_str.split(',')] + + results = [] + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + content = note_path.read_text(encoding="utf-8") + + # Extract tags from frontmatter + if content.startswith("---"): + parts = content.split("---", 2) + if len(parts) >= 2: + frontmatter = parts[1] + # Look for tags line + for line in frontmatter.split('\n'): + if line.strip().startswith('tags:'): + tags_part = line.split(':', 1)[1].strip() + # Remove brackets and split + tags_part = tags_part.strip('[]') + note_tags = [t.strip().lower() for t in tags_part.split(',')] + + # Check match + if match_all: + if all(tag in note_tags for tag in search_tags): + results.append(note_path.name) + else: + if any(tag in note_tags for tag in search_tags): + results.append(note_path.name) + break + except Exception: + continue + + # Limit results + results = results[:limit] + + if not results: + match_type = "all" if match_all else "any" + return { + "content": [{ + "type": "text", + "text": f"No notes found with {match_type} of tags: {tags_str}" + }] + } + + result_text = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n" + for i, note_name in enumerate(results, 1): + result_text += f"{i}. {note_name}\n" + + return { + "content": [{ + "type": "text", + "text": result_text + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}], + "isError": True + } + + + @tool( + name="search_vault", + description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.", + input_schema={ + "query": str, + "tags": str, # Optional comma-separated tags to filter by + "limit": int, # Max results (optional, default 5) + }, + ) + async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Search zettelkasten vault using hybrid search (vector + keyword). + + Zero-cost MCP tool. Uses hybrid search when available for better results. + """ + query = args["query"] + tags_filter = args.get("tags", "") + limit = args.get("limit", 5) + + try: + _ensure_vault_structure() + + # Try hybrid search first + memory = _get_memory_system() + results = [] + + if memory and MEMORY_AVAILABLE: + try: + # Index vault + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if vault_dir.exists(): + for note_path in vault_dir.glob("*.md"): + memory.index_file(note_path) + + # Hybrid search + search_results = memory.search_hybrid(query, max_results=limit * 2) + + # Convert to results format + for result in search_results: + note_path = Path(result["path"]) + results.append({ + "file": note_path.name, + "path": str(note_path), + "snippet": result.get("snippet", "")[:300] + }) + except Exception: + pass # Fall back to simple search + + # Fallback: simple keyword search + if not results: + query_lower = query.lower() + for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: + if not vault_dir.exists(): + continue + + for note_path in vault_dir.glob("*.md"): + try: + content = note_path.read_text(encoding="utf-8") + + if query_lower in content.lower(): + # Extract snippet + lines = content.split('\n') + for i, line in enumerate(lines): + if query_lower in line.lower(): + start = max(0, i - 2) + end = min(len(lines), i + 3) + snippet = '\n'.join(lines[start:end]) + results.append({ + "file": note_path.name, + "path": str(note_path), + "snippet": snippet[:300] + }) + break + except Exception: + continue + + # Filter by tags if specified + if tags_filter: + filter_tags = [t.strip().lower() for t in tags_filter.split(',')] + filtered_results = [] + + for result in results: + try: + note_path = Path(result["path"]) + content = note_path.read_text(encoding="utf-8") + + # Check tags in frontmatter + if content.startswith("---"): + parts = content.split("---", 2) + if len(parts) >= 2: + frontmatter = parts[1] + for line in frontmatter.split('\n'): + if line.strip().startswith('tags:'): + tags_part = line.split(':', 1)[1].strip().strip('[]') + note_tags = [t.strip().lower() for t in tags_part.split(',')] + if any(tag in note_tags for tag in filter_tags): + filtered_results.append(result) + break + except Exception: + continue + + results = filtered_results + + # Limit results + results = results[:limit] + + if not results: + tag_msg = f" with tags '{tags_filter}'" if tags_filter else "" + return { + "content": [{ + "type": "text", + "text": f"No notes found matching: {query}{tag_msg}" + }] + } + + # Format results + tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else "" + result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n" + for i, result in enumerate(results, 1): + result_text += f"{i}. {result['file']}\n" + result_text += f" {result['snippet']}\n\n" + + return { + "content": [{ + "type": "text", + "text": result_text + }] + } + + except Exception as e: + return { + "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}], + "isError": True + } + + + # ============================================ + # Google and Weather Tools (MCP Migration) + # ============================================ + # Lazy-loaded Google clients + _gmail_client: Optional[Any] = None + _calendar_client: Optional[Any] = None + _people_client: Optional[Any] = None + + + def _initialize_google_clients(): + """Lazy-load Google API clients when needed.""" + global _gmail_client, _calendar_client, _people_client + + if _gmail_client is not None: + return _gmail_client, _calendar_client, _people_client + + try: + from google_tools.gmail_client import GmailClient + from google_tools.calendar_client import CalendarClient + from google_tools.people_client import PeopleClient + from google_tools.oauth_manager import GoogleOAuthManager + + oauth_manager = GoogleOAuthManager() + credentials = oauth_manager.get_credentials() + + if not credentials: + return None, None, None + + _gmail_client = GmailClient(oauth_manager) + _calendar_client = CalendarClient(oauth_manager) + _people_client = PeopleClient(oauth_manager) + + return _gmail_client, _calendar_client, _people_client + except Exception as e: + print(f"[MCP Google] Failed to initialize: {e}") + return None, None, None + + + @tool( + name="get_weather", + description="Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and description.", + input_schema={"location": str}, + ) + async def get_weather(args: Dict[str, Any]) -> Dict[str, Any]: + """Get current weather for a location using OpenWeatherMap API.""" + location = args.get("location", "Phoenix, US") + import os + import requests + + api_key = os.getenv("OPENWEATHERMAP_API_KEY") + if not api_key: + return { + "content": [{ + "type": "text", + "text": "Error: OPENWEATHERMAP_API_KEY not found in environment variables" + }], + "isError": True + } + + try: + base_url = "http://api.openweathermap.org/data/2.5/weather" + params = { + "q": location, + "appid": api_key, + "units": "imperial" + } + + response = requests.get(base_url, params=params, timeout=10) + response.raise_for_status() + data = response.json() + + temp = data["main"]["temp"] + feels_like = data["main"]["feels_like"] + humidity = data["main"]["humidity"] + conditions = data["weather"][0]["main"] + description = data["weather"][0]["description"] + city_name = data["name"] + + summary = ( + f"Weather in {city_name}:\n" + f"Temperature: {temp}°F (feels like {feels_like}°F)\n" + f"Conditions: {conditions} - {description}\n" + f"Humidity: {humidity}%" + ) + + return { + "content": [{"type": "text", "text": summary}] + } + + except Exception as e: + return { + "content": [{ + "type": "text", + "text": f"Error getting weather: {str(e)}" + }], + "isError": True + } + + + @tool( + name="send_email", + description="Send an email via Gmail API. Requires prior OAuth setup (--setup-google).", + input_schema={"to": str, "subject": str, "body": str, "cc": str, "reply_to_message_id": str}, + ) + async def send_email(args: Dict[str, Any]) -> Dict[str, Any]: + """Send an email via Gmail API.""" + to = args["to"] + subject = args["subject"] + body = args["body"] + cc = args.get("cc") + reply_to_message_id = args.get("reply_to_message_id") + + gmail_client, _, _ = _initialize_google_clients() + + if not gmail_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = gmail_client.send_email( + to=to, subject=subject, body=body, cc=cc, + reply_to_message_id=reply_to_message_id, + ) + + if result["success"]: + msg_id = result.get("message_id", "unknown") + text = f"Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}" + else: + text = f"Error sending email: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="read_emails", + description="Search and read emails from Gmail using Gmail query syntax (e.g., 'from:user@example.com after:2026/02/10').", + input_schema={"query": str, "max_results": int, "include_body": bool}, + ) + async def read_emails(args: Dict[str, Any]) -> Dict[str, Any]: + """Search and read emails from Gmail.""" + query = args.get("query", "") + max_results = args.get("max_results", 10) + include_body = args.get("include_body", False) + + gmail_client, _, _ = _initialize_google_clients() + + if not gmail_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = gmail_client.search_emails(query=query, max_results=max_results, include_body=include_body) + + if result["success"]: + summary = result.get("summary", "") + if len(summary) > _MAX_TOOL_OUTPUT: + summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" + else: + summary = f"Error reading emails: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": summary}], "isError": not result["success"]} + + + @tool( + name="get_email", + description="Get full content of a specific email by its Gmail message ID.", + input_schema={"message_id": str, "format_type": str}, + ) + async def get_email(args: Dict[str, Any]) -> Dict[str, Any]: + """Get full content of a specific email.""" + message_id = args["message_id"] + format_type = args.get("format_type", "text") + + gmail_client, _, _ = _initialize_google_clients() + + if not gmail_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = gmail_client.get_email(message_id=message_id, format_type=format_type) + + if result["success"]: + email_data = result.get("email", {}) + text = ( + f"From: {email_data.get('from', 'Unknown')}\n" + f"To: {email_data.get('to', 'Unknown')}\n" + f"Subject: {email_data.get('subject', 'No subject')}\n" + f"Date: {email_data.get('date', 'Unknown')}\n\n" + f"{email_data.get('body', 'No content')}" + ) + if len(text) > _MAX_TOOL_OUTPUT: + text = text[:_MAX_TOOL_OUTPUT] + "\n... (content truncated)" + else: + text = f"Error getting email: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="read_calendar", + description="Read upcoming events from Google Calendar. Shows events from today onwards.", + input_schema={"days_ahead": int, "calendar_id": str, "max_results": int}, + ) + async def read_calendar(args: Dict[str, Any]) -> Dict[str, Any]: + """Read upcoming calendar events.""" + days_ahead = args.get("days_ahead", 7) + calendar_id = args.get("calendar_id", "primary") + max_results = args.get("max_results", 20) + + _, calendar_client, _ = _initialize_google_clients() + + if not calendar_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = calendar_client.list_events( + days_ahead=days_ahead, calendar_id=calendar_id, max_results=max_results, + ) + + if result["success"]: + summary = result.get("summary", "No events found") + if len(summary) > _MAX_TOOL_OUTPUT: + summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" + text = f"Upcoming events (next {days_ahead} days):\n\n{summary}" + else: + text = f"Error reading calendar: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="create_calendar_event", + description="Create a new event in Google Calendar. Use ISO 8601 format for times.", + input_schema={ + "summary": str, "start_time": str, "end_time": str, + "description": str, "location": str, "calendar_id": str, + }, + ) + async def create_calendar_event(args: Dict[str, Any]) -> Dict[str, Any]: + """Create a new calendar event.""" + summary = args["summary"] + start_time = args["start_time"] + end_time = args["end_time"] + description = args.get("description", "") + location = args.get("location", "") + calendar_id = args.get("calendar_id", "primary") + + _, calendar_client, _ = _initialize_google_clients() + + if not calendar_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = calendar_client.create_event( + summary=summary, start_time=start_time, end_time=end_time, + description=description, location=location, calendar_id=calendar_id, + ) + + if result["success"]: + event_id = result.get("event_id", "unknown") + html_link = result.get("html_link", "") + start = result.get("start", start_time) + text = ( + f"Calendar event created successfully!\n" + f"Title: {summary}\nStart: {start}\n" + f"Event ID: {event_id}\nLink: {html_link}" + ) + else: + text = f"Error creating calendar event: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="search_calendar", + description="Search calendar events by text query. Searches event titles and descriptions.", + input_schema={"query": str, "calendar_id": str}, + ) + async def search_calendar(args: Dict[str, Any]) -> Dict[str, Any]: + """Search calendar events by text query.""" + query = args["query"] + calendar_id = args.get("calendar_id", "primary") + + _, calendar_client, _ = _initialize_google_clients() + + if not calendar_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = calendar_client.search_events(query=query, calendar_id=calendar_id) + + if result["success"]: + summary = result.get("summary", "No events found") + if len(summary) > _MAX_TOOL_OUTPUT: + summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" + text = f"Calendar search results for '{query}':\n\n{summary}" + else: + text = f"Error searching calendar: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + # ============================================ + # Contacts Tools (MCP) + # ============================================ + + + @tool( + name="create_contact", + description="Create a new Google contact. Requires prior OAuth setup (--setup-google).", + input_schema={"given_name": str, "family_name": str, "email": str, "phone": str, "notes": str}, + ) + async def create_contact(args: Dict[str, Any]) -> Dict[str, Any]: + """Create a new Google contact.""" + given_name = args["given_name"] + family_name = args.get("family_name", "") + email = args.get("email", "") + phone = args.get("phone") + notes = args.get("notes") + + _, _, people_client = _initialize_google_clients() + + if not people_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = people_client.create_contact( + given_name=given_name, family_name=family_name, + email=email, phone=phone, notes=notes, + ) + + if result["success"]: + name = result.get("name", given_name) + resource = result.get("resource_name", "") + text = f"Contact created: {name}\nResource: {resource}" + else: + text = f"Error creating contact: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="list_contacts", + description="List or search Google contacts. Without a query, lists all contacts sorted by last name.", + input_schema={"max_results": int, "query": str}, + ) + async def list_contacts(args: Dict[str, Any]) -> Dict[str, Any]: + """List or search Google contacts.""" + max_results = args.get("max_results", 100) + query = args.get("query") + + _, _, people_client = _initialize_google_clients() + + if not people_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = people_client.list_contacts(max_results=max_results, query=query) + + if result["success"]: + summary = result.get("summary", "No contacts found.") + if len(summary) > _MAX_TOOL_OUTPUT: + summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" + text = f"Contacts ({result.get('count', 0)} found):\n\n{summary}" + else: + text = f"Error listing contacts: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} + + + @tool( + name="get_contact", + description="Get full details of a specific Google contact by resource name.", + input_schema={"resource_name": str}, + ) + async def get_contact(args: Dict[str, Any]) -> Dict[str, Any]: + """Get full details of a specific Google contact.""" + resource_name = args["resource_name"] + + _, _, people_client = _initialize_google_clients() + + if not people_client: + return { + "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], + "isError": True + } + + result = people_client.get_contact(resource_name=resource_name) + + if result["success"]: + c = result.get("contact", {}) + output = [] + name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip() + output.append(f"Name: {name or '(no name)'}") + if c.get("email"): + output.append(f"Email: {c['email']}") + if c.get("phone"): + output.append(f"Phone: {c['phone']}") + if c.get("notes"): + output.append(f"Notes: {c['notes']}") + output.append(f"Resource: {c.get('resource_name', resource_name)}") + text = "\n".join(output) + else: + text = f"Error getting contact: {result.get('error', 'Unknown error')}" + + return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} -# ============================================ -# Gitea Tools (MCP) - Private repo access + + + # ============================================ +# Gitea Tools (MCP) - Private repo access + +# ============================================ + + + # Lazy-loaded Gitea client + _gitea_client: Optional[Any] = None + + + def _get_gitea_client(): + """Lazy-load Gitea client when first needed.""" + global _gitea_client + + if _gitea_client is not None: + return _gitea_client + + try: + from gitea_tools.client import get_gitea_client + _gitea_client = get_gitea_client() + return _gitea_client + except Exception as e: + print(f"[MCP Gitea] Failed to initialize: {e}") + return None + + + @tool( + name="gitea_read_file", + description="Read a file from a Gitea repository. Use this to access files from Jordan's homelab repo or any configured Gitea repo. Returns the file content as text.", + input_schema={ + "file_path": str, + "repo": str, + "branch": str, + }, + ) + async def gitea_read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Read a file from a Gitea repository. + + Zero-cost MCP tool for accessing private Gitea repos. + """ + file_path = args.get("file_path", "") + repo = args.get("repo") + branch = args.get("branch") + + if not file_path: + return { + "content": [{"type": "text", "text": "Error: file_path is required"}], + "isError": True, + } + + client = _get_gitea_client() + if not client: + return { + "content": [{ + "type": "text", + "text": ( + "Error: Gitea not configured. " + "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " + "and add your Personal Access Token." + ), + }], + "isError": True, + } + + # Parse owner/repo if provided + owner = None + if repo and "/" in repo: + parts = repo.split("/", 1) + owner = parts[0] + repo = parts[1] + + result = await client.get_file_content( + file_path=file_path, + owner=owner, + repo=repo, + branch=branch, + ) + + if result["success"]: + content = result["content"] + metadata = result.get("metadata", {}) + path_info = metadata.get("path", file_path) + size = metadata.get("size", 0) + + header = f"File: {path_info} ({size:,} bytes)" + if metadata.get("truncated"): + header += " [TRUNCATED]" + + return { + "content": [{"type": "text", "text": f"{header}\n\n{content}"}], + } + else: + return { + "content": [{"type": "text", "text": f"Error: {result['error']}"}], + "isError": True, + } + + + @tool( + name="gitea_list_files", + description="List files and folders in a directory in a Gitea repository. Use this to explore the structure of Jordan's homelab repo or any configured Gitea repo.", + input_schema={ + "path": str, + "repo": str, + "branch": str, + }, + ) + async def gitea_list_files_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """List files and directories in a Gitea repo path. + + Zero-cost MCP tool for browsing private Gitea repos. + """ + path = args.get("path", "") + repo = args.get("repo") + branch = args.get("branch") + + client = _get_gitea_client() + if not client: + return { + "content": [{ + "type": "text", + "text": ( + "Error: Gitea not configured. " + "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " + "and add your Personal Access Token." + ), + }], + "isError": True, + } + + # Parse owner/repo if provided + owner = None + if repo and "/" in repo: + parts = repo.split("/", 1) + owner = parts[0] + repo = parts[1] + + result = await client.list_files( + path=path, + owner=owner, + repo=repo, + branch=branch, + ) + + if result["success"]: + files = result["files"] + repo_name = result.get("repo", "") + display_path = result.get("path", "/") + count = result.get("count", 0) + + # Format output + lines = [f"Directory: {repo_name}/{display_path} ({count} items)\n"] + for f in files: + if f["type"] == "dir": + lines.append(f" DIR {f['name']}/") + else: + size_str = f"({f['size']:,} bytes)" if f["size"] else "" + lines.append(f" FILE {f['name']} {size_str}") + + return { + "content": [{"type": "text", "text": "\n".join(lines)}], + } + else: + return { + "content": [{"type": "text", "text": f"Error: {result['error']}"}], + "isError": True, + } + + + @tool( + name="gitea_search_code", + description="Search for files by name/path in a Gitea repository. Searches file and directory names. For content search, use gitea_read_file on specific files.", + input_schema={ + "query": str, + "repo": str, + }, + ) + async def gitea_search_code_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Search for code/files in a Gitea repository. + + Zero-cost MCP tool. Searches file/directory names in the repo tree. + """ + query = args.get("query", "") + repo = args.get("repo") + + if not query: + return { + "content": [{"type": "text", "text": "Error: query is required"}], + "isError": True, + } + + client = _get_gitea_client() + if not client: + return { + "content": [{ + "type": "text", + "text": ( + "Error: Gitea not configured. " + "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " + "and add your Personal Access Token." + ), + }], + "isError": True, + } + + # Parse owner/repo if provided + owner = None + if repo and "/" in repo: + parts = repo.split("/", 1) + owner = parts[0] + repo = parts[1] + + result = await client.search_code( + query=query, + owner=owner, + repo=repo, + ) + + if result["success"]: + results = result.get("results", []) + count = result.get("count", 0) + repo_name = result.get("repo", "") + + if not results: + message = result.get("message", f"No results for '{query}'") + return { + "content": [{"type": "text", "text": message}], + } + + lines = [f"Search results for '{query}' in {repo_name} ({count} matches):\n"] + for r in results: + type_icon = "DIR " if r["type"] == "dir" else "FILE" + size_str = f"({r['size']:,} bytes)" if r.get("size") else "" + lines.append(f" {type_icon} {r['path']} {size_str}") + + return { + "content": [{"type": "text", "text": "\n".join(lines)}], + } + else: + return { + "content": [{"type": "text", "text": f"Error: {result['error']}"}], + "isError": True, + } + + + @tool( + name="gitea_get_tree", + description="Get the directory tree structure from a Gitea repository. Shows all files and folders. Use recursive=true for the full tree.", + input_schema={ + "repo": str, + "branch": str, + "recursive": bool, + }, + ) + async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Get directory tree from a Gitea repository. + + Zero-cost MCP tool for viewing repo structure. + """ + repo = args.get("repo") + branch = args.get("branch") + recursive = args.get("recursive", False) + + client = _get_gitea_client() + if not client: + return { + "content": [{ + "type": "text", + "text": ( + "Error: Gitea not configured. " + "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " + "and add your Personal Access Token." + ), + }], + "isError": True, + } + + # Parse owner/repo if provided + owner = None + if repo and "/" in repo: + parts = repo.split("/", 1) + owner = parts[0] + repo = parts[1] + + result = await client.get_tree( + owner=owner, + repo=repo, + branch=branch, + recursive=recursive, + ) + + if result["success"]: + entries = result.get("entries", []) + repo_name = result.get("repo", "") + branch_name = result.get("branch", "main") + total = result.get("total", 0) + truncated = result.get("truncated", False) + + lines = [f"Tree: {repo_name} (branch: {branch_name}, {total} entries)"] + if truncated: + lines[0] += " [TRUNCATED - tree too large]" + lines.append("") + + for entry in entries: + if entry["type"] == "dir": + lines.append(f" {entry['path']}/") + else: + size_str = f"({entry['size']:,} bytes)" if entry.get("size") else "" + lines.append(f" {entry['path']} {size_str}") + + # Truncate output if too long + text = "\n".join(lines) + if len(text) > _MAX_TOOL_OUTPUT: + text = text[:_MAX_TOOL_OUTPUT] + "\n\n... (tree truncated, use gitea_list_files for specific directories)" + + return { + "content": [{"type": "text", "text": text}], + } + else: + return { + "content": [{"type": "text", "text": f"Error: {result['error']}"}], + "isError": True, + } + + + + + # ============================================ + # Sub-Agent Delegation Tool (MCP Bridge) + # ============================================ + + @tool( + name="delegate_task", + description=( + "Delegate a task to a specialist sub-agent. The sub-agent runs in a separate " + "thread with its own conversation context but shares the memory workspace. " + "Use this to parallelize work (e.g., creating multiple diagrams, researching " + "multiple topics). Each sub-agent gets a specialist prompt defining its role. " + "Returns the sub-agent's final response text." + ), + input_schema={ + "task": str, + "specialist_prompt": str, + "agent_id": str, + }, + ) + async def delegate_task_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Delegate a task to a specialist sub-agent via the main Agent. + + This MCP tool bridges the gap between the Agent SDK subprocess (claude.exe) + and the in-process Agent.delegate() method. It retrieves the main Agent + from the global registry and calls delegate() synchronously. + + Thread-safe: Agent.delegate() uses Agent._chat_lock internally, and + MemorySystem.write_memory() uses _write_lock for file operations. + """ + task = args.get("task", "") + specialist_prompt = args.get("specialist_prompt", "") + agent_id = args.get("agent_id", "") + + # Validate required fields + if not task: + return { + "content": [{"type": "text", "text": "Error: 'task' is required"}], + "isError": True, + } + if not specialist_prompt: + return { + "content": [{ + "type": "text", + "text": "Error: 'specialist_prompt' is required (defines the sub-agent role)", + }], + "isError": True, + } + + # Check agent registry availability + if not AGENT_REGISTRY_AVAILABLE: + return { + "content": [{ + "type": "text", + "text": "Error: agent_registry module not available. Cannot delegate tasks.", + }], + "isError": True, + } + + # Get the main agent from the global registry + agent = get_agent() + if agent is None: + + return { + + "content": [{ + + "type": "text", + + "text": ( + + "Error: No agent registered. The bot may still be starting up, " + + "or agent_registry.register_agent() was not called at startup." + + ), + + }], + + "isError": True, + + } + + + + # Generate agent_id if not provided + + if not agent_id: + + agent_id = f"sub_{threading.current_thread().name}_{id(args)}" + + + + # Acquire semaphore to limit concurrent delegations + if not _delegate_semaphore.acquire(blocking=False): return { "content": [{ "type": "text", "text": ( - "Error: No agent registered. The bot may still be starting up, " - "or agent_registry.register_agent() was not called at startup." + f"Error: Maximum concurrent delegations reached ({_MAX_CONCURRENT_DELEGATES}). " + "Wait for existing sub-agents to complete." ), }], "isError": True, } - # Generate agent_id if not provided - if not agent_id: - agent_id = f"sub_{threading.current_thread().name}_{id(args)}" - try: - # Run delegate in a thread to avoid blocking the async event loop. - # Agent.delegate() is synchronous (calls sub_agent.chat() which holds _chat_lock). + # Create Future tied to CURRENT event loop (MCP handler's loop) import asyncio - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, # Use default thread pool - lambda: agent.delegate( - task=task, - specialist_prompt=specialist_prompt, - username="default", - agent_id=agent_id, - max_retries=1, - ), + loop = asyncio.get_running_loop() + result_future = loop.create_future() + + def _run_delegation(): + """Run sub-agent in dedicated thread with isolated event loop.""" + try: + # Create completely fresh event loop for this thread + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + # agent.delegate() is synchronous - runs in this thread's loop context + result = agent.delegate( + task=task, + specialist_prompt=specialist_prompt, + username="default", + agent_id=agent_id, + max_retries=1, + ) + # Report result back to caller's event loop (thread-safe) + loop.call_soon_threadsafe(result_future.set_result, result) + except Exception as e: + loop.call_soon_threadsafe(result_future.set_exception, e) + finally: + new_loop.close() + except Exception as e: + # Fallback if even loop creation fails + try: + loop.call_soon_threadsafe(result_future.set_exception, e) + except Exception: + pass + + # Start dedicated daemon thread (NOT from thread pool) + thread = threading.Thread( + target=_run_delegation, + name=f"delegate-{agent_id}", + daemon=True, ) + thread.start() + + # Await result from dedicated thread (properly yields to event loop) + result = await asyncio.wait_for(result_future, timeout=_DELEGATE_TIMEOUT) # Truncate result if too large if len(result) > _MAX_TOOL_OUTPUT: @@ -2031,7 +4085,10 @@ async def delegate_task_tool(args: Dict[str, Any]) -> Dict[str, Any]: return { "content": [{ "type": "text", - "text": f"Error: Sub-agent '{agent_id}' timed out. Task may be too complex.", + "text": ( + f"Error: Sub-agent '{agent_id}' timed out after {_DELEGATE_TIMEOUT}s. " + "Task may be too complex." + ), }], "isError": True, } @@ -2043,6 +4100,9 @@ async def delegate_task_tool(args: Dict[str, Any]) -> Dict[str, Any]: }], "isError": True, } + finally: + # Always release semaphore + _delegate_semaphore.release() # Create the MCP server with all tools