## New Features - **Gitea MCP Tools** (zero API cost): - gitea_read_file: Read files from homelab repo - gitea_list_files: Browse directories - gitea_search_code: Search by filename - gitea_get_tree: Get directory tree - **Gitea Client** (gitea_tools/client.py): REST API wrapper with OAuth - **Proxmox SSH Scripts** (scripts/): Homelab data collection utilities - **Obsidian MCP Support** (obsidian_mcp.py): Advanced vault operations - **Voice Integration Plan** (JARVIS_VOICE_INTEGRATION_PLAN.md) ## Improvements - **Increased timeout**: 5min → 10min for complex tasks (llm_interface.py) - **Removed Direct API fallback**: Gitea tools are MCP-only (zero cost) - **Updated .env.example**: Added Obsidian MCP configuration - **Enhanced .gitignore**: Protect personal memory files (SOUL.md, MEMORY.md) ## Cleanup - Deleted 24 obsolete files (temp/test/experimental scripts, outdated docs) - Untracked personal memory files (SOUL.md, MEMORY.md now in .gitignore) - Removed: AGENT_SDK_IMPLEMENTATION.md, HYBRID_SEARCH_SUMMARY.md, IMPLEMENTATION_SUMMARY.md, MIGRATION.md, test_agent_sdk.py, etc. ## Configuration - Added config/gitea_config.example.yaml (Gitea setup template) - Added config/obsidian_mcp.example.yaml (Obsidian MCP template) - Updated scheduled_tasks.yaml with new task examples Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1822 lines
59 KiB
Python
1822 lines
59 KiB
Python
"""MCP Tools - In-process tools using Claude Agent SDK.
|
|
|
|
These tools run directly in the Python process for better performance
|
|
compared to the traditional tool execution flow. File and system tools
|
|
are ideal candidates for MCP since they don't require external APIs.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
import subprocess
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import urlparse
|
|
from datetime import datetime
|
|
from claude_agent_sdk import tool, create_sdk_mcp_server
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Import memory system for hybrid search
|
|
try:
|
|
from memory_system import MemorySystem
|
|
MEMORY_AVAILABLE = True
|
|
except ImportError:
|
|
MEMORY_AVAILABLE = False
|
|
|
|
|
|
# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000

# Maximum page size for web fetching (500KB); checked before HTML parsing
_MAX_WEB_PAGE_SIZE = 500_000

# Maximum text content from web page (10,000 chars ≈ 2,500 tokens)
_MAX_WEB_TEXT = 10_000

# Zettelkasten vault paths (relative to the process working directory)
_VAULT_ROOT = Path("memory_workspace/obsidian")
_VAULT_FLEETING = _VAULT_ROOT / "fleeting"      # quick captures, processed later
_VAULT_DAILY = _VAULT_ROOT / "daily"            # one journal note per day
_VAULT_PERMANENT = _VAULT_ROOT / "permanent"    # refined evergreen notes
_VAULT_LITERATURE = _VAULT_ROOT / "literature"  # notes captured from web articles
|
|
|
|
|
|
def _generate_note_id() -> str:
|
|
"""Generate unique timestamp-based note ID (YYYYMMDDHHmmss)."""
|
|
return datetime.now().strftime("%Y%m%d%H%M%S")
|
|
|
|
|
|
def _create_frontmatter(note_id: str, title: str, tags: list = None, note_type: str = "fleeting") -> str:
|
|
"""Create YAML front matter for zettelkasten note."""
|
|
now = datetime.now()
|
|
tags_str = ", ".join(tags) if tags else ""
|
|
|
|
return f"""---
|
|
id: {note_id}
|
|
title: {title}
|
|
created: {now.strftime("%Y-%m-%dT%H:%M:%S")}
|
|
modified: {now.strftime("%Y-%m-%dT%H:%M:%S")}
|
|
type: {note_type}
|
|
tags: [{tags_str}]
|
|
---
|
|
|
|
"""
|
|
|
|
|
|
def _ensure_vault_structure():
    """Create the zettelkasten vault directories if any are missing."""
    required = (_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE)
    for directory in required:
        directory.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def _get_memory_system() -> Optional['MemorySystem']:
    """Return a MemorySystem rooted at the obsidian vault, or None.

    Returns None when the memory_system package was not importable or when
    construction fails for any reason; callers then fall back to plain
    keyword search.
    """
    if not MEMORY_AVAILABLE:
        return None
    try:
        # Point the memory system at the obsidian vault root.
        return MemorySystem(workspace_dir=_VAULT_ROOT)
    except Exception:
        # Best-effort: any initialization failure degrades gracefully.
        return None
|
|
|
|
|
|
def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]:
    """Find related notes using hybrid search (vector + keyword).

    Tries the MemorySystem hybrid search first; on any failure falls back
    to a simple keyword-overlap scan of the vault's markdown files.

    Args:
        title: Title of the note being linked.
        content: Body of the note being linked.
        max_results: Maximum number of related notes to return.

    Returns list of (note_title, score) tuples.
    """
    memory = _get_memory_system()

    if memory:
        # Use hybrid search for better results
        try:
            # Index the vault if needed (daily notes are intentionally excluded)
            for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                if vault_dir.exists():
                    for note_path in vault_dir.glob("*.md"):
                        memory.index_file(note_path)

            # Search with content + title as query
            query = f"{title} {content[:500]}"  # Limit content to first 500 chars
            results = memory.search_hybrid(query, max_results=max_results)

            # Convert results to (title, score) tuples
            related = []
            for result in results:
                note_path = Path(result["path"])
                note_title = note_path.stem
                # Filenames look like "<id> - <title>.md"; strip the id prefix.
                if ' - ' in note_title:
                    note_title = note_title.split(' - ', 1)[1]

                # Use snippet match percentage as score: "**" pairs in the
                # snippet presumably mark highlighted hits — verify against
                # MemorySystem.search_hybrid's snippet format.
                score = result.get("snippet", "").count("**") // 2  # Count of matches
                related.append((note_title, score))

            return related
        except Exception:
            pass  # Fall back to keyword search

    # Fallback: keyword search
    related_notes = []
    # Build search terms from the title plus the first 20 words of content;
    # drop short words (<= 4 chars), which are mostly stopwords.
    search_terms = title.lower().split() + content.lower().split()[:20]
    search_terms = [term for term in search_terms if len(term) > 4]

    for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]:
        if not vault_dir.exists():
            continue

        for note_path in vault_dir.glob("*.md"):
            try:
                note_content = note_path.read_text(encoding="utf-8").lower()
                matches = sum(1 for term in search_terms if term in note_content)
                # Require at least two overlapping terms to count as related.
                if matches >= 2:
                    note_title = note_path.stem
                    if ' - ' in note_title:
                        note_title = note_title.split(' - ', 1)[1]
                    related_notes.append((note_title, matches))
            except Exception:
                # Unreadable note files are skipped silently.
                continue

    # Most keyword matches first.
    related_notes.sort(key=lambda x: x[1], reverse=True)
    return related_notes[:max_results]
|
|
|
|
|
|
def _is_safe_url(url: str) -> bool:
|
|
"""Validate URL safety - blocks localhost, private IPs, and file:// URLs."""
|
|
try:
|
|
parsed = urlparse(url)
|
|
|
|
# Must be HTTP/HTTPS
|
|
if parsed.scheme not in ['http', 'https']:
|
|
return False
|
|
|
|
# Must have a hostname
|
|
if not parsed.hostname:
|
|
return False
|
|
|
|
# Block localhost and loopback IPs
|
|
blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1']
|
|
if parsed.hostname in blocked_hosts:
|
|
return False
|
|
|
|
# Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x)
|
|
if parsed.hostname:
|
|
parts = parsed.hostname.split('.')
|
|
if len(parts) == 4 and parts[0].isdigit():
|
|
first_octet = int(parts[0])
|
|
# Check for private ranges
|
|
if first_octet == 10: # 10.0.0.0/8
|
|
return False
|
|
if first_octet == 192 and len(parts) > 1 and parts[1].isdigit() and int(parts[1]) == 168: # 192.168.0.0/16
|
|
return False
|
|
if first_octet == 172 and len(parts) > 1 and parts[1].isdigit():
|
|
second_octet = int(parts[1])
|
|
if 16 <= second_octet <= 31: # 172.16.0.0/12
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
@tool(
    name="read_file",
    description="Read the contents of a file. Use this to view configuration files, code, or any text file.",
    input_schema={
        "file_path": str,
    },
)
async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Return a file's text content, capped at _MAX_TOOL_OUTPUT characters."""
    file_path = args["file_path"]
    target = Path(file_path)

    if not target.exists():
        return {
            "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}],
            "isError": True
        }

    try:
        text = target.read_text(encoding="utf-8")
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}],
            "isError": True
        }

    # Cap output so huge files don't blow up the token budget.
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"

    return {
        "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{text}"}]
    }
|
|
|
|
|
|
@tool(
    name="write_file",
    description="Write content to a file. Creates a new file or overwrites existing file completely.",
    input_schema={
        "file_path": str,
        "content": str,
    },
)
async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create or completely overwrite a file with the given content."""
    file_path = args["file_path"]
    content = args["content"]
    destination = Path(file_path)

    try:
        # Make sure the target directory exists before writing.
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(content, encoding="utf-8")
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error writing file: {str(e)}"}],
            "isError": True
        }

    return {
        "content": [{
            "type": "text",
            "text": f"Successfully wrote to {file_path} ({len(content)} characters)"
        }]
    }
|
|
|
|
|
|
@tool(
    name="edit_file",
    description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
    input_schema={
        "file_path": str,
        "old_text": str,
        "new_text": str,
    },
)
async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Replace the first occurrence of old_text with new_text in a file."""
    file_path = args["file_path"]
    old_text = args["old_text"]
    new_text = args["new_text"]
    target = Path(file_path)

    if not target.exists():
        return {
            "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}],
            "isError": True
        }

    try:
        original = target.read_text(encoding="utf-8")

        if old_text not in original:
            # Show a prefix of the missing text to help the caller debug.
            return {
                "content": [{
                    "type": "text",
                    "text": f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
                }],
                "isError": True
            }

        # Replace only the first occurrence to keep the edit targeted.
        target.write_text(original.replace(old_text, new_text, 1), encoding="utf-8")

        return {
            "content": [{
                "type": "text",
                "text": f"Successfully edited {file_path}. Replaced 1 occurrence."
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error editing file: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="list_directory",
    description="List files and directories in a given path. Useful for exploring the file system.",
    input_schema={
        "path": str,
    },
)
async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List a directory's entries with DIR/FILE markers and file sizes."""
    dir_path = Path(args.get("path", "."))

    if not dir_path.exists():
        return {
            "content": [{"type": "text", "text": f"Error: Directory not found: {dir_path}"}],
            "isError": True
        }

    if not dir_path.is_dir():
        return {
            "content": [{"type": "text", "text": f"Error: Not a directory: {dir_path}"}],
            "isError": True
        }

    try:
        rows = []
        for entry in sorted(dir_path.iterdir()):
            # Directories get a "DIR " marker and no size; files show bytes.
            marker = "DIR " if entry.is_dir() else "FILE"
            size_info = "" if entry.is_dir() else f" ({entry.stat().st_size} bytes)"
            rows.append(f" {marker} {entry.name}{size_info}")

        if not rows:
            return {
                "content": [{"type": "text", "text": f"Directory {dir_path} is empty"}]
            }

        return {
            "content": [{
                "type": "text",
                "text": f"Contents of {dir_path}:\n" + "\n".join(rows)
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error listing directory: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="run_command",
    description="Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
    input_schema={
        "command": str,
        "working_dir": str,
    },
)
async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a shell command with a 30-second timeout and capped output."""
    command = args["command"]
    working_dir = args.get("working_dir", ".")

    try:
        completed = subprocess.run(
            command,
            shell=True,  # intentional: callers pass complete shell command lines
            cwd=working_dir,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return {
            "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}],
            "isError": True
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error running command: {str(e)}"}],
            "isError": True
        }

    # Assemble the non-empty streams, each truncated to the output cap.
    sections = []
    for label, stream in (("STDOUT", completed.stdout), ("STDERR", completed.stderr)):
        if not stream:
            continue
        if len(stream) > _MAX_TOOL_OUTPUT:
            stream = stream[:_MAX_TOOL_OUTPUT] + f"\n... ({label.lower()} truncated)"
        sections.append(f"{label}:\n{stream}")

    status = f"Command exited with code {completed.returncode}"
    text = status if not sections else status + "\n\n" + "\n\n".join(sections)

    return {
        "content": [{"type": "text", "text": text}],
        # Non-zero exit codes are surfaced as tool errors.
        "isError": completed.returncode != 0
    }
|
|
|
|
|
|
@tool(
    name="web_fetch",
    description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.",
    input_schema={
        "url": str,
    },
)
async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch webpage and return parsed text content.

    This is a zero-cost MCP tool - it fetches the HTML and converts to text,
    then returns it to the main Agent SDK query for processing (no extra API cost).

    Flow: SSRF guard -> HTTP GET (10s timeout, redirects followed) -> size
    check -> strip boilerplate tags -> collapse whitespace -> truncate.
    """
    url = args["url"]

    # Security: Validate URL (rejects non-http(s) schemes, localhost, private IPs)
    if not _is_safe_url(url):
        return {
            "content": [{
                "type": "text",
                "text": f"Error: Blocked unsafe URL - {url}\n\nOnly http/https URLs to public domains are allowed."
            }],
            "isError": True
        }

    try:
        # Fetch page with timeout and size limit
        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            limits=httpx.Limits(max_connections=5),
            headers=headers
        ) as client:
            response = await client.get(url)
            response.raise_for_status()

            # Check size limit before parsing to bound memory/CPU use
            if len(response.content) > _MAX_WEB_PAGE_SIZE:
                return {
                    "content": [{
                        "type": "text",
                        "text": f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})"
                    }],
                    "isError": True
                }

            # Parse HTML to text
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove unwanted elements (navigation/boilerplate, not article content)
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                element.decompose()

            # Extract text
            text = soup.get_text(separator='\n', strip=True)

            # Clean up excessive whitespace (drop blank lines, trim each line)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            text = '\n'.join(lines)

            # Truncate if needed (keeps token usage bounded)
            if len(text) > _MAX_WEB_TEXT:
                text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)"

            # Get title if available
            title = soup.title.string if soup.title else "No title"

            return {
                "content": [{
                    "type": "text",
                    # response.url reflects the final URL after any redirects
                    "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}"
                }]
            }

    except httpx.TimeoutException:
        return {
            "content": [{
                "type": "text",
                "text": f"Error: Request to {url} timed out after 10 seconds"
            }],
            "isError": True
        }
    except httpx.HTTPStatusError as e:
        # raise_for_status() raised: report the HTTP status to the caller.
        return {
            "content": [{
                "type": "text",
                "text": f"Error: HTTP {e.response.status_code} - {url}"
            }],
            "isError": True
        }
    except Exception as e:
        return {
            "content": [{
                "type": "text",
                "text": f"Error fetching webpage: {str(e)}"
            }],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="fleeting_note",
    description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.",
    input_schema={
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a fleeting note (quick capture) in the zettelkasten vault.

    Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly quick notes.

    Args:
        args: Dict with "content" (note body; its first line, truncated to
            50 chars, becomes the title) and optional "tags" (comma-separated).

    Returns:
        MCP-style result dict; on failure the dict carries isError=True.
    """
    content = args["content"]
    tags_str = args.get("tags", "")

    try:
        _ensure_vault_structure()

        # Generate note ID and extract title from first line
        note_id = _generate_note_id()
        lines = content.split('\n', 1)
        title = lines[0][:50] if lines else "Quick Note"

        # Parse tags
        tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
        tags.append("fleeting")  # Always tag as fleeting

        # Create note file (strip characters that are unsafe in filenames)
        filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_FLEETING / filename

        # Write note with front matter
        frontmatter = _create_frontmatter(note_id, title, tags, "fleeting")
        full_content = frontmatter + content

        note_path.write_text(full_content, encoding="utf-8")

        return {
            "content": [{
                "type": "text",
                # Bug fix: the success message previously printed the literal
                # "(unknown)" instead of the note's title.
                "text": f"Created fleeting note: {title}\nID: {note_id}\nLocation: {note_path}"
            }]
        }

    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="daily_note",
    description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.",
    input_schema={
        "entry": str,
    },
)
async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Append timestamped entry to today's daily note.

    Zero-cost MCP tool for daily journaling and activity logging.
    Creates the daily note (with YAML front matter) on the first call of the
    day; subsequent calls append a new "## HH:MM" section.
    """
    entry = args["entry"]

    try:
        _ensure_vault_structure()

        # Get today's date
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d")
        time_str = now.strftime("%H:%M")

        # Daily note path: exactly one file per calendar day
        note_path = _VAULT_DAILY / f"{date_str}.md"

        # Create or update daily note
        if note_path.exists():
            # Append to existing note
            current_content = note_path.read_text(encoding="utf-8")
            new_entry = f"\n## {time_str}\n{entry}\n"
            updated_content = current_content + new_entry
        else:
            # Create new daily note with front matter
            frontmatter = f"""---
date: {date_str}
type: daily
tags: [daily-note]
---

# {date_str}

## {time_str}
{entry}
"""
            updated_content = frontmatter

        # Full rewrite either way (read-modify-write; not append-mode I/O).
        note_path.write_text(updated_content, encoding="utf-8")

        return {
            "content": [{
                "type": "text",
                "text": f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}"
            }]
        }

    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error updating daily note: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="literature_note",
    description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.",
    input_schema={
        "url": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create literature note from web article.

    Zero-cost MCP tool. Combines web_fetch with note creation: fetches the
    page, strips boilerplate HTML, and saves the text into the literature
    vault with YAML front matter and a source citation.
    """
    url = args["url"]
    tags_str = args.get("tags", "")

    try:
        _ensure_vault_structure()

        # Fetch article content using web_fetch logic (same SSRF guard)
        if not _is_safe_url(url):
            return {
                "content": [{
                    "type": "text",
                    "text": f"Error: Blocked unsafe URL - {url}"
                }],
                "isError": True
            }

        # Fetch the article
        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client:
            response = await client.get(url)
            response.raise_for_status()

            # Reject oversized pages before parsing
            if len(response.content) > _MAX_WEB_PAGE_SIZE:
                return {
                    "content": [{
                        "type": "text",
                        "text": f"Error: Page too large"
                    }],
                    "isError": True
                }

            # Parse content: drop non-content elements before text extraction
            soup = BeautifulSoup(response.content, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                element.decompose()

            text = soup.get_text(separator='\n', strip=True)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            # Collapse blank lines and cap the captured text length
            text = '\n'.join(lines)[:_MAX_WEB_TEXT]

            title = soup.title.string if soup.title else "Untitled Article"

            # Generate note
            note_id = _generate_note_id()
            tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
            tags.extend(["literature", "web-article"])

            # Create note content
            note_content = f"""# {title}

**Source**: {url}
**Captured**: {datetime.now().strftime("%Y-%m-%d")}

## Content

{text}

## Notes

(Add your thoughts, key takeaways, and connections to other notes here)

## Related
-

---
*This is a literature note. Process it into permanent notes with key insights.*
"""

            # Create filename and path (cap title length, strip unsafe chars)
            filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
            note_path = _VAULT_LITERATURE / filename

            # Write with frontmatter
            frontmatter = _create_frontmatter(note_id, title, tags, "literature")
            full_content = frontmatter + note_content

            note_path.write_text(full_content, encoding="utf-8")

            return {
                "content": [{
                    "type": "text",
                    "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}"
                }]
            }

    except httpx.TimeoutException:
        return {
            "content": [{"type": "text", "text": f"Error: Request timed out"}],
            "isError": True
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating literature note: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="permanent_note",
    description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.",
    input_schema={
        "title": str,
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create permanent note with smart linking suggestions.

    Zero-cost MCP tool. Uses simple search for now.
    Future: Will use hybrid search for better suggestions.
    """
    title = args["title"]
    content = args["content"]
    tags_str = args.get("tags", "")

    try:
        _ensure_vault_structure()

        # Generate note ID
        note_id = _generate_note_id()

        # Parse tags
        tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
        tags.append("permanent")

        # Search for related notes using hybrid search (vector + keyword)
        related_notes = _find_related_notes_hybrid(title, content, max_results=5)

        # Build related section with [[wiki-style]] links (Obsidian syntax)
        related_section = "\n## Related Notes\n"
        if related_notes:
            for note_title, _ in related_notes:
                related_section += f"- [[{note_title}]]\n"
        else:
            related_section += "- (No related notes found yet)\n"

        # Create full note content
        note_content = f"""# {title}

{content}

{related_section}

---
*Created: {datetime.now().strftime("%Y-%m-%d %H:%M")}*
"""

        # Create filename (cap title length, strip filename-unsafe chars)
        filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_PERMANENT / filename

        # Write with frontmatter
        frontmatter = _create_frontmatter(note_id, title, tags, "permanent")
        full_content = frontmatter + note_content

        note_path.write_text(full_content, encoding="utf-8")

        # Build result message, listing suggested links when any were found
        result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}"
        if related_notes:
            result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):"
            for note_title, matches in related_notes:
                result_msg += f"\n- [[{note_title}]] ({matches} keyword matches)"

        return {
            "content": [{
                "type": "text",
                "text": result_msg
            }]
        }

    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="search_by_tags",
    description="Search zettelkasten vault by tags. Find all notes with specific tags or tag combinations.",
    input_schema={
        "tags": str,  # Comma-separated tags to search for
        "match_all": bool,  # If true, note must have ALL tags. If false, ANY tag matches (optional, default false)
        "limit": int,  # Max results (optional, default 10)
    },
)
async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search vault by tags.

    Zero-cost MCP tool. Searches YAML frontmatter for tag matches.
    Comparison is case-insensitive; only the inline-list form
    "tags: [a, b]" written by _create_frontmatter is recognized.
    """
    tags_str = args["tags"]
    match_all = args.get("match_all", False)
    limit = args.get("limit", 10)

    try:
        _ensure_vault_structure()

        # Parse search tags (normalized to lowercase for comparison)
        search_tags = [tag.strip().lower() for tag in tags_str.split(',')]

        results = []
        for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
            if not vault_dir.exists():
                continue

            for note_path in vault_dir.glob("*.md"):
                try:
                    content = note_path.read_text(encoding="utf-8")

                    # Extract tags from frontmatter (delimited by "---" lines)
                    if content.startswith("---"):
                        parts = content.split("---", 2)
                        if len(parts) >= 2:
                            frontmatter = parts[1]
                            # Look for tags line
                            for line in frontmatter.split('\n'):
                                if line.strip().startswith('tags:'):
                                    tags_part = line.split(':', 1)[1].strip()
                                    # Remove brackets and split
                                    tags_part = tags_part.strip('[]')
                                    note_tags = [t.strip().lower() for t in tags_part.split(',')]

                                    # Check match (AND vs OR semantics)
                                    if match_all:
                                        if all(tag in note_tags for tag in search_tags):
                                            results.append(note_path.name)
                                    else:
                                        if any(tag in note_tags for tag in search_tags):
                                            results.append(note_path.name)
                                    break  # only the first tags: line is used
                except Exception:
                    # Skip unreadable notes rather than failing the search
                    continue

        # Limit results
        results = results[:limit]

        if not results:
            match_type = "all" if match_all else "any"
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found with {match_type} of tags: {tags_str}"
                }]
            }

        result_text = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n"
        for i, note_name in enumerate(results, 1):
            result_text += f"{i}. {note_name}\n"

        return {
            "content": [{
                "type": "text",
                "text": result_text
            }]
        }

    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="search_vault",
    description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.",
    input_schema={
        "query": str,
        "tags": str,  # Optional comma-separated tags to filter by
        "limit": int,  # Max results (optional, default 5)
    },
)
async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search zettelkasten vault using hybrid search (vector + keyword).

    Zero-cost MCP tool. Uses hybrid search when available for better results;
    otherwise falls back to a case-insensitive substring scan of every note.
    """
    query = args["query"]
    tags_filter = args.get("tags", "")
    limit = args.get("limit", 5)

    try:
        _ensure_vault_structure()

        # Try hybrid search first
        memory = _get_memory_system()
        results = []

        if memory and MEMORY_AVAILABLE:
            try:
                # Index vault (re-indexing existing notes on every call —
                # presumably cheap/idempotent in MemorySystem; verify)
                for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                    if vault_dir.exists():
                        for note_path in vault_dir.glob("*.md"):
                            memory.index_file(note_path)

                # Hybrid search; over-fetch (limit * 2) so the tag filter
                # below still has enough candidates left
                search_results = memory.search_hybrid(query, max_results=limit * 2)

                # Convert to results format
                for result in search_results:
                    note_path = Path(result["path"])
                    results.append({
                        "file": note_path.name,
                        "path": str(note_path),
                        "snippet": result.get("snippet", "")[:300]
                    })
            except Exception:
                pass  # Fall back to simple search

        # Fallback: simple keyword search (case-insensitive substring match)
        if not results:
            query_lower = query.lower()
            for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                if not vault_dir.exists():
                    continue

                for note_path in vault_dir.glob("*.md"):
                    try:
                        content = note_path.read_text(encoding="utf-8")

                        if query_lower in content.lower():
                            # Extract snippet: two lines of context around the first hit
                            lines = content.split('\n')
                            for i, line in enumerate(lines):
                                if query_lower in line.lower():
                                    start = max(0, i - 2)
                                    end = min(len(lines), i + 3)
                                    snippet = '\n'.join(lines[start:end])
                                    results.append({
                                        "file": note_path.name,
                                        "path": str(note_path),
                                        "snippet": snippet[:300]
                                    })
                                    break
                    except Exception:
                        # Skip unreadable notes
                        continue

        # Filter by tags if specified
        if tags_filter:
            filter_tags = [t.strip().lower() for t in tags_filter.split(',')]
            filtered_results = []

            for result in results:
                try:
                    note_path = Path(result["path"])
                    content = note_path.read_text(encoding="utf-8")

                    # Check tags in frontmatter (inline "tags: [a, b]" form only)
                    if content.startswith("---"):
                        parts = content.split("---", 2)
                        if len(parts) >= 2:
                            frontmatter = parts[1]
                            for line in frontmatter.split('\n'):
                                if line.strip().startswith('tags:'):
                                    tags_part = line.split(':', 1)[1].strip().strip('[]')
                                    note_tags = [t.strip().lower() for t in tags_part.split(',')]
                                    if any(tag in note_tags for tag in filter_tags):
                                        filtered_results.append(result)
                                    break
                except Exception:
                    continue

            results = filtered_results

        # Limit results
        results = results[:limit]

        if not results:
            tag_msg = f" with tags '{tags_filter}'" if tags_filter else ""
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found matching: {query}{tag_msg}"
                }]
            }

        # Format results
        tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else ""
        result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n"
        for i, result in enumerate(results, 1):
            result_text += f"{i}. {result['file']}\n"
            result_text += f" {result['snippet']}\n\n"

        return {
            "content": [{
                "type": "text",
                "text": result_text
            }]
        }

    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}],
            "isError": True
        }
|
|
|
|
|
|
# ============================================
# Google and Weather Tools (MCP Migration)
# ============================================

# Lazy-loaded Google clients: None until _initialize_google_clients()
# succeeds, after which they are cached here for reuse across tool calls.
_gmail_client: Optional[Any] = None
_calendar_client: Optional[Any] = None
_people_client: Optional[Any] = None
|
|
|
|
|
|
def _initialize_google_clients():
    """Lazy-load Google API clients when needed.

    Returns:
        A (gmail, calendar, people) client tuple, or (None, None, None)
        when OAuth credentials are missing or any import/initialization
        step fails. Successful results are cached in module globals, so
        subsequent calls are cheap.
    """
    global _gmail_client, _calendar_client, _people_client

    # Already initialized: return the cached clients.
    if _gmail_client is not None:
        return _gmail_client, _calendar_client, _people_client

    try:
        # Imported lazily so this module loads even without google_tools installed.
        from google_tools.gmail_client import GmailClient
        from google_tools.calendar_client import CalendarClient
        from google_tools.people_client import PeopleClient
        from google_tools.oauth_manager import GoogleOAuthManager

        oauth_manager = GoogleOAuthManager()
        credentials = oauth_manager.get_credentials()

        # No credentials means the user has not completed the OAuth flow.
        if not credentials:
            return None, None, None

        _gmail_client = GmailClient(oauth_manager)
        _calendar_client = CalendarClient(oauth_manager)
        _people_client = PeopleClient(oauth_manager)

        return _gmail_client, _calendar_client, _people_client
    except Exception as e:
        print(f"[MCP Google] Failed to initialize: {e}")
        return None, None, None
|
|
|
|
|
|
@tool(
    name="get_weather",
    description="Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and description.",
    input_schema={"location": str},
)
async def get_weather(args: Dict[str, Any]) -> Dict[str, Any]:
    """Get current weather for a location via the OpenWeatherMap API.

    Args (in ``args``):
        location: "City, CC" query string (defaults to "Phoenix, US").

    Returns:
        MCP content payload with a human-readable weather summary, or an
        error payload (``isError: True``) when the API key is missing or
        the request fails.
    """
    location = args.get("location", "Phoenix, US")
    import os
    import requests

    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        return {
            "content": [{
                "type": "text",
                "text": "Error: OPENWEATHERMAP_API_KEY not found in environment variables"
            }],
            "isError": True
        }

    try:
        # Use HTTPS so the API key is not transmitted in cleartext (the
        # previous http:// endpoint exposed the key to on-path observers).
        base_url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            "q": location,
            "appid": api_key,
            "units": "imperial"  # Fahrenheit
        }

        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        temp = data["main"]["temp"]
        feels_like = data["main"]["feels_like"]
        humidity = data["main"]["humidity"]
        conditions = data["weather"][0]["main"]
        description = data["weather"][0]["description"]
        city_name = data["name"]

        summary = (
            f"Weather in {city_name}:\n"
            f"Temperature: {temp}°F (feels like {feels_like}°F)\n"
            f"Conditions: {conditions} - {description}\n"
            f"Humidity: {humidity}%"
        )

        return {
            "content": [{"type": "text", "text": summary}]
        }

    except Exception as e:
        # Covers network failures, non-2xx responses, and unexpected payloads.
        return {
            "content": [{
                "type": "text",
                "text": f"Error getting weather: {str(e)}"
            }],
            "isError": True
        }
|
|
|
|
|
|
@tool(
    name="send_email",
    description="Send an email via Gmail API. Requires prior OAuth setup (--setup-google).",
    input_schema={"to": str, "subject": str, "body": str, "cc": str, "reply_to_message_id": str},
)
async def send_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Send an email through the Gmail client and report the outcome."""
    to = args["to"]
    subject = args["subject"]
    body = args["body"]

    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = gmail_client.send_email(
        to=to,
        subject=subject,
        body=body,
        cc=args.get("cc"),
        reply_to_message_id=args.get("reply_to_message_id"),
    )

    if not resp["success"]:
        text = f"Error sending email: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    text = (
        f"Email sent successfully to {to}\n"
        f"Message ID: {resp.get('message_id', 'unknown')}\n"
        f"Subject: {subject}"
    )
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="read_emails",
    description="Search and read emails from Gmail using Gmail query syntax (e.g., 'from:user@example.com after:2026/02/10').",
    input_schema={"query": str, "max_results": int, "include_body": bool},
)
async def read_emails(args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a Gmail search and return the result summary as MCP text."""
    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = gmail_client.search_emails(
        query=args.get("query", ""),
        max_results=args.get("max_results", 10),
        include_body=args.get("include_body", False),
    )

    if not resp["success"]:
        text = f"Error reading emails: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    text = resp.get("summary", "")
    # Cap the payload so a huge mailbox search can't blow up the context.
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="get_email",
    description="Get full content of a specific email by its Gmail message ID.",
    input_schema={"message_id": str, "format_type": str},
)
async def get_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch one email by Gmail message ID and render headers plus body."""
    message_id = args["message_id"]

    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = gmail_client.get_email(
        message_id=message_id,
        format_type=args.get("format_type", "text"),
    )

    if not resp["success"]:
        text = f"Error getting email: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    msg = resp.get("email", {})
    headers = "\n".join([
        f"From: {msg.get('from', 'Unknown')}",
        f"To: {msg.get('to', 'Unknown')}",
        f"Subject: {msg.get('subject', 'No subject')}",
        f"Date: {msg.get('date', 'Unknown')}",
    ])
    text = f"{headers}\n\n{msg.get('body', 'No content')}"
    # Keep oversized bodies from overwhelming the tool result.
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n... (content truncated)"
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="read_calendar",
    description="Read upcoming events from Google Calendar. Shows events from today onwards.",
    input_schema={"days_ahead": int, "calendar_id": str, "max_results": int},
)
async def read_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """List upcoming Google Calendar events as an MCP text payload."""
    days_ahead = args.get("days_ahead", 7)

    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = calendar_client.list_events(
        days_ahead=days_ahead,
        calendar_id=args.get("calendar_id", "primary"),
        max_results=args.get("max_results", 20),
    )

    if not resp["success"]:
        text = f"Error reading calendar: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    events = resp.get("summary", "No events found")
    if len(events) > _MAX_TOOL_OUTPUT:
        events = events[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    text = f"Upcoming events (next {days_ahead} days):\n\n{events}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="create_calendar_event",
    description="Create a new event in Google Calendar. Use ISO 8601 format for times.",
    input_schema={
        "summary": str, "start_time": str, "end_time": str,
        "description": str, "location": str, "calendar_id": str,
    },
)
async def create_calendar_event(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a Google Calendar event; start/end are ISO 8601 strings."""
    title = args["summary"]
    start_time = args["start_time"]
    end_time = args["end_time"]

    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = calendar_client.create_event(
        summary=title,
        start_time=start_time,
        end_time=end_time,
        description=args.get("description", ""),
        location=args.get("location", ""),
        calendar_id=args.get("calendar_id", "primary"),
    )

    if not resp["success"]:
        text = f"Error creating calendar event: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    text = (
        "Calendar event created successfully!\n"
        f"Title: {title}\n"
        f"Start: {resp.get('start', start_time)}\n"
        f"Event ID: {resp.get('event_id', 'unknown')}\n"
        f"Link: {resp.get('html_link', '')}"
    )
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="search_calendar",
    description="Search calendar events by text query. Searches event titles and descriptions.",
    input_schema={"query": str, "calendar_id": str},
)
async def search_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """Free-text search over calendar events, returned as a text summary."""
    query = args["query"]

    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = calendar_client.search_events(
        query=query,
        calendar_id=args.get("calendar_id", "primary"),
    )

    if not resp["success"]:
        text = f"Error searching calendar: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    found = resp.get("summary", "No events found")
    if len(found) > _MAX_TOOL_OUTPUT:
        found = found[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    text = f"Calendar search results for '{query}':\n\n{found}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
# ============================================
|
|
# Contacts Tools (MCP)
|
|
# ============================================
|
|
|
|
|
|
@tool(
    name="create_contact",
    description="Create a new Google contact. Requires prior OAuth setup (--setup-google).",
    input_schema={"given_name": str, "family_name": str, "email": str, "phone": str, "notes": str},
)
async def create_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a contact via the Google People API."""
    given_name = args["given_name"]

    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = people_client.create_contact(
        given_name=given_name,
        family_name=args.get("family_name", ""),
        email=args.get("email", ""),
        phone=args.get("phone"),
        notes=args.get("notes"),
    )

    if not resp["success"]:
        text = f"Error creating contact: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    text = (
        f"Contact created: {resp.get('name', given_name)}\n"
        f"Resource: {resp.get('resource_name', '')}"
    )
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="list_contacts",
    description="List or search Google contacts. Without a query, lists all contacts sorted by last name.",
    input_schema={"max_results": int, "query": str},
)
async def list_contacts(args: Dict[str, Any]) -> Dict[str, Any]:
    """List (or search) Google contacts and return a text summary."""
    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = people_client.list_contacts(
        max_results=args.get("max_results", 100),
        query=args.get("query"),
    )

    if not resp["success"]:
        text = f"Error listing contacts: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    listing = resp.get("summary", "No contacts found.")
    if len(listing) > _MAX_TOOL_OUTPUT:
        listing = listing[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    text = f"Contacts ({resp.get('count', 0)} found):\n\n{listing}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
|
|
|
|
|
|
@tool(
    name="get_contact",
    description="Get full details of a specific Google contact by resource name.",
    input_schema={"resource_name": str},
)
async def get_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Render one Google contact's details given its resource name."""
    resource_name = args["resource_name"]

    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }

    resp = people_client.get_contact(resource_name=resource_name)

    if not resp["success"]:
        text = f"Error getting contact: {resp.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}

    card = resp.get("contact", {})
    # Prefer the stored display name; fall back to "given family".
    full_name = card.get("display_name") or f"{card.get('given_name', '')} {card.get('family_name', '')}".strip()
    details = [f"Name: {full_name or '(no name)'}"]
    for key, label in (("email", "Email"), ("phone", "Phone"), ("notes", "Notes")):
        if card.get(key):
            details.append(f"{label}: {card[key]}")
    details.append(f"Resource: {card.get('resource_name', resource_name)}")
    return {"content": [{"type": "text", "text": "\n".join(details)}], "isError": False}
|
|
|
|
|
|
# ============================================
|
|
# Gitea Tools (MCP) - Private repo access
|
|
# ============================================
|
|
|
|
# Cached Gitea client singleton, built lazily by _get_gitea_client() so repo
# access is only configured when a Gitea tool is actually invoked.
_gitea_client: Optional[Any] = None
|
|
|
|
|
|
def _get_gitea_client():
    """Return the cached Gitea client, creating it on first call.

    Returns None when the client cannot be constructed (e.g. missing or
    invalid configuration).
    """
    global _gitea_client

    if _gitea_client is None:
        try:
            from gitea_tools.client import get_gitea_client

            _gitea_client = get_gitea_client()
        except Exception as e:
            print(f"[MCP Gitea] Failed to initialize: {e}")
            return None

    return _gitea_client
|
|
|
|
|
|
@tool(
    name="gitea_read_file",
    description="Read a file from a Gitea repository. Use this to access files from Jordan's homelab repo or any configured Gitea repo. Returns the file content as text.",
    input_schema={
        "file_path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch a single file from Gitea and return it as MCP text content."""
    file_path = args.get("file_path", "")
    if not file_path:
        return {
            "content": [{"type": "text", "text": "Error: file_path is required"}],
            "isError": True,
        }

    client = _get_gitea_client()
    if not client:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }

    # "owner/repo" shorthand: split into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)

    resp = await client.get_file_content(
        file_path=file_path,
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
    )

    if not resp["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {resp['error']}"}],
            "isError": True,
        }

    meta = resp.get("metadata", {})
    header = f"File: {meta.get('path', file_path)} ({meta.get('size', 0):,} bytes)"
    if meta.get("truncated"):
        header += " [TRUNCATED]"

    return {
        "content": [{"type": "text", "text": f"{header}\n\n{resp['content']}"}],
    }
|
|
|
|
|
|
@tool(
    name="gitea_list_files",
    description="List files and folders in a directory in a Gitea repository. Use this to explore the structure of Jordan's homelab repo or any configured Gitea repo.",
    input_schema={
        "path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_list_files_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List one directory level of a Gitea repo as MCP text content."""
    client = _get_gitea_client()
    if not client:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }

    # "owner/repo" shorthand: split into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)

    resp = await client.list_files(
        path=args.get("path", ""),
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
    )

    if not resp["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {resp['error']}"}],
            "isError": True,
        }

    header = (
        f"Directory: {resp.get('repo', '')}/{resp.get('path', '/')} "
        f"({resp.get('count', 0)} items)\n"
    )
    lines = [header]
    for entry in resp["files"]:
        if entry["type"] == "dir":
            lines.append(f"  DIR {entry['name']}/")
        else:
            size_note = f"({entry['size']:,} bytes)" if entry["size"] else ""
            lines.append(f"  FILE {entry['name']} {size_note}")

    return {
        "content": [{"type": "text", "text": "\n".join(lines)}],
    }
|
|
|
|
|
|
@tool(
    name="gitea_search_code",
    description="Search for files by name/path in a Gitea repository. Searches file and directory names. For content search, use gitea_read_file on specific files.",
    input_schema={
        "query": str,
        "repo": str,
    },
)
async def gitea_search_code_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search file/directory names in a Gitea repo tree (not file contents)."""
    query = args.get("query", "")
    if not query:
        return {
            "content": [{"type": "text", "text": "Error: query is required"}],
            "isError": True,
        }

    client = _get_gitea_client()
    if not client:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }

    # "owner/repo" shorthand: split into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)

    resp = await client.search_code(
        query=query,
        owner=owner,
        repo=repo,
    )

    if not resp["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {resp['error']}"}],
            "isError": True,
        }

    matches = resp.get("results", [])
    if not matches:
        return {
            "content": [{"type": "text", "text": resp.get("message", f"No results for '{query}'")}],
        }

    lines = [
        f"Search results for '{query}' in {resp.get('repo', '')} "
        f"({resp.get('count', 0)} matches):\n"
    ]
    for hit in matches:
        label = "DIR " if hit["type"] == "dir" else "FILE"
        size_note = f"({hit['size']:,} bytes)" if hit.get("size") else ""
        lines.append(f"  {label} {hit['path']} {size_note}")

    return {
        "content": [{"type": "text", "text": "\n".join(lines)}],
    }
|
|
|
|
|
|
@tool(
    name="gitea_get_tree",
    description="Get the directory tree structure from a Gitea repository. Shows all files and folders. Use recursive=true for the full tree.",
    input_schema={
        "repo": str,
        "branch": str,
        "recursive": bool,
    },
)
async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Render a Gitea repo's tree (optionally recursive) as MCP text."""
    client = _get_gitea_client()
    if not client:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }

    # "owner/repo" shorthand: split into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)

    resp = await client.get_tree(
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
        recursive=args.get("recursive", False),
    )

    if not resp["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {resp['error']}"}],
            "isError": True,
        }

    header = (
        f"Tree: {resp.get('repo', '')} "
        f"(branch: {resp.get('branch', 'main')}, {resp.get('total', 0)} entries)"
    )
    if resp.get("truncated", False):
        header += " [TRUNCATED - tree too large]"
    lines = [header, ""]

    for node in resp.get("entries", []):
        if node["type"] == "dir":
            lines.append(f"  {node['path']}/")
        else:
            size_note = f"({node['size']:,} bytes)" if node.get("size") else ""
            lines.append(f"  {node['path']} {size_note}")

    # Cap output; point at gitea_list_files for targeted browsing instead.
    text = "\n".join(lines)
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n\n... (tree truncated, use gitea_list_files for specific directories)"

    return {
        "content": [{"type": "text", "text": text}],
    }
|
|
|
|
|
|
# Create the in-process MCP server exposing every tool defined in this module.
# Runs at import time, so importing this module is enough to register the
# server; tools execute directly in this Python process (no subprocess).
file_system_server = create_sdk_mcp_server(
    name="file_system",
    version="2.0.0",
    tools=[
        # File and system tools
        read_file_tool,
        write_file_tool,
        edit_file_tool,
        list_directory_tool,
        run_command_tool,
        # Web tool
        web_fetch_tool,
        # Zettelkasten tools
        fleeting_note_tool,
        daily_note_tool,
        literature_note_tool,
        permanent_note_tool,
        search_vault_tool,
        search_by_tags_tool,
        # Weather
        get_weather,
        # Gmail tools
        send_email,
        read_emails,
        get_email,
        # Calendar tools
        read_calendar,
        create_calendar_event,
        search_calendar,
        # Contacts tools
        create_contact,
        list_contacts,
        get_contact,
        # Gitea tools
        gitea_read_file_tool,
        gitea_list_files_tool,
        gitea_search_code_tool,
        gitea_get_tree_tool,
    ]
)
|