"""Tool definitions and execution for agent capabilities.""" import os import subprocess from pathlib import Path from typing import Any, Dict, List, Optional # Google tools (lazy loaded when needed) _gmail_client: Optional[Any] = None _calendar_client: Optional[Any] = None _people_client: Optional[Any] = None # Tool definitions in Anthropic's tool use format TOOL_DEFINITIONS = [ { "name": "read_file", "description": "Read the contents of a file. Use this to view configuration files, code, or any text file.", "input_schema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the file to read (relative or absolute)", } }, "required": ["file_path"], }, }, { "name": "write_file", "description": "Write content to a file. Creates a new file or overwrites existing file completely.", "input_schema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the file to write", }, "content": { "type": "string", "description": "Content to write to the file", }, }, "required": ["file_path", "content"], }, }, { "name": "edit_file", "description": "Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.", "input_schema": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the file to edit", }, "old_text": { "type": "string", "description": "Exact text to find and replace", }, "new_text": { "type": "string", "description": "New text to replace with", }, }, "required": ["file_path", "old_text", "new_text"], }, }, { "name": "list_directory", "description": "List files and directories in a given path. Useful for exploring the file system.", "input_schema": { "type": "object", "properties": { "path": { "type": "string", "description": "Directory path to list (defaults to current directory)", "default": ".", } }, }, }, { "name": "run_command", "description": "Execute a shell command. Use for git operations, running scripts, installing packages, etc.", "input_schema": { "type": "object", "properties": { "command": { "type": "string", "description": "Shell command to execute", }, "working_dir": { "type": "string", "description": "Working directory for command execution (defaults to current directory)", "default": ".", }, }, "required": ["command"], }, }, { "name": "get_weather", "description": "Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and brief summary.", "input_schema": { "type": "object", "properties": { "location": { "type": "string", "description": "City name or 'City, Country' (e.g., 'Phoenix, US' or 'London, GB'). Defaults to Phoenix, AZ if not specified.", "default": "Phoenix, US", } }, "required": [], }, }, # Gmail tools { "name": "send_email", "description": "Send an email from the bot's Gmail account. Requires prior OAuth setup (--setup-google).", "input_schema": { "type": "object", "properties": { "to": { "type": "string", "description": "Recipient email address", }, "subject": { "type": "string", "description": "Email subject line", }, "body": { "type": "string", "description": "Email body (plain text or HTML)", }, "cc": { "type": "array", "items": {"type": "string"}, "description": "Optional list of CC recipients", }, "reply_to_message_id": { "type": "string", "description": "Optional Gmail message ID to reply to (for threading)", }, }, "required": ["to", "subject", "body"], }, }, { "name": "read_emails", "description": "Search and read emails from the bot's Gmail account using Gmail search syntax (e.g., 'from:user@example.com after:2026/02/10').", "input_schema": { "type": "object", "properties": { "query": { "type": "string", "description": "Gmail search query (supports from, to, subject, after, before, has:attachment, etc.)", "default": "", }, "max_results": { "type": "integer", "description": "Maximum number of emails to return (default: 10, max: 50)", "default": 10, }, "include_body": { "type": "boolean", "description": "Whether to include full email body (default: false, shows snippet only)", "default": False, }, }, }, }, { "name": "get_email", "description": "Get full content of a specific email by its Gmail message ID.", "input_schema": { "type": "object", "properties": { "message_id": { "type": "string", "description": "Gmail message ID", }, "format": { "type": "string", "description": "Format type: 'text' or 'html' (default: text)", "default": "text", }, }, "required": ["message_id"], }, }, # Calendar tools { "name": "read_calendar", "description": "Read upcoming events from Google Calendar. Shows events from today onwards.", "input_schema": { "type": "object", "properties": { "days_ahead": { "type": "integer", "description": "Number of days ahead to look (default: 7, max: 30)", "default": 7, }, "calendar_id": { "type": "string", "description": "Calendar ID (default: 'primary' for main calendar)", "default": "primary", }, "max_results": { "type": "integer", "description": "Maximum number of events to return (default: 20)", "default": 20, }, }, }, }, { "name": "create_calendar_event", "description": "Create a new event in Google Calendar. Use ISO 8601 format for times (e.g., '2026-02-14T10:00:00Z').", "input_schema": { "type": "object", "properties": { "summary": { "type": "string", "description": "Event title/summary", }, "start_time": { "type": "string", "description": "Event start time in ISO 8601 format (e.g., '2026-02-14T10:00:00Z')", }, "end_time": { "type": "string", "description": "Event end time in ISO 8601 format", }, "description": { "type": "string", "description": "Optional event description", "default": "", }, "location": { "type": "string", "description": "Optional event location", "default": "", }, "calendar_id": { "type": "string", "description": "Calendar ID (default: 'primary')", "default": "primary", }, }, "required": ["summary", "start_time", "end_time"], }, }, { "name": "search_calendar", "description": "Search calendar events by text query. Searches event titles and descriptions.", "input_schema": { "type": "object", "properties": { "query": { "type": "string", "description": "Search query text", }, "calendar_id": { "type": "string", "description": "Calendar ID (default: 'primary')", "default": "primary", }, }, "required": ["query"], }, }, # Contacts tools { "name": "create_contact", "description": "Create a new Google contact. Requires prior OAuth setup (--setup-google).", "input_schema": { "type": "object", "properties": { "given_name": { "type": "string", "description": "Contact's first name", }, "family_name": { "type": "string", "description": "Contact's last name", "default": "", }, "email": { "type": "string", "description": "Contact's email address", "default": "", }, "phone": { "type": "string", "description": "Contact's phone number", }, "notes": { "type": "string", "description": "Optional notes about the contact", }, }, "required": ["given_name"], }, }, { "name": "list_contacts", "description": "List or search Google contacts. Without a query, lists all contacts sorted by last name.", "input_schema": { "type": "object", "properties": { "max_results": { "type": "integer", "description": "Maximum number of contacts to return (default: 100, max: 1000)", "default": 100, }, "query": { "type": "string", "description": "Optional search query to filter contacts by name, email, etc.", }, }, }, }, { "name": "get_contact", "description": "Get full details of a specific Google contact by resource name.", "input_schema": { "type": "object", "properties": { "resource_name": { "type": "string", "description": "Contact resource name (e.g., 'people/c1234567890')", }, }, "required": ["resource_name"], }, }, ] def execute_tool(tool_name: str, tool_input: Dict[str, Any], healing_system: Any = None) -> str: """Execute a tool and return the result as a string.""" try: # MCP tools (zettelkasten + web_fetch) - route to mcp_tools.py MCP_TOOLS = { "web_fetch", "fleeting_note", "daily_note", "literature_note", "permanent_note", "search_vault", "search_by_tags" } if tool_name in MCP_TOOLS: # Route to MCP tool handlers import anyio from mcp_tools import ( web_fetch_tool, fleeting_note_tool, daily_note_tool, literature_note_tool, permanent_note_tool, search_vault_tool, search_by_tags_tool ) # Map tool names to their handlers mcp_handlers = { "web_fetch": web_fetch_tool, "fleeting_note": fleeting_note_tool, "daily_note": daily_note_tool, "literature_note": literature_note_tool, "permanent_note": permanent_note_tool, "search_vault": search_vault_tool, "search_by_tags": search_by_tags_tool, } # Execute MCP tool asynchronously handler = mcp_handlers[tool_name] result = anyio.run(handler, tool_input) # Convert result to string if needed if isinstance(result, dict): if "error" in result: return f"Error: {result['error']}" elif "content" in result: return result["content"] else: return str(result) return str(result) # File tools (traditional handlers - kept for backward compatibility) if tool_name == "read_file": return _read_file(tool_input["file_path"]) elif tool_name == "write_file": return _write_file(tool_input["file_path"], tool_input["content"]) elif tool_name == "edit_file": return _edit_file( tool_input["file_path"], tool_input["old_text"], tool_input["new_text"], ) elif tool_name == "list_directory": path = tool_input.get("path", ".") return _list_directory(path) elif tool_name == "run_command": command = tool_input["command"] working_dir = tool_input.get("working_dir", ".") return _run_command(command, working_dir) elif tool_name == "get_weather": location = tool_input.get("location", "Phoenix, US") return _get_weather(location) # Gmail tools elif tool_name == "send_email": return _send_email( to=tool_input["to"], subject=tool_input["subject"], body=tool_input["body"], cc=tool_input.get("cc"), reply_to_message_id=tool_input.get("reply_to_message_id"), ) elif tool_name == "read_emails": return _read_emails( query=tool_input.get("query", ""), max_results=tool_input.get("max_results", 10), include_body=tool_input.get("include_body", False), ) elif tool_name == "get_email": return _get_email( message_id=tool_input["message_id"], format_type=tool_input.get("format", "text"), ) # Calendar tools elif tool_name == "read_calendar": return _read_calendar( days_ahead=tool_input.get("days_ahead", 7), calendar_id=tool_input.get("calendar_id", "primary"), max_results=tool_input.get("max_results", 20), ) elif tool_name == "create_calendar_event": return _create_calendar_event( summary=tool_input["summary"], start_time=tool_input["start_time"], end_time=tool_input["end_time"], description=tool_input.get("description", ""), location=tool_input.get("location", ""), calendar_id=tool_input.get("calendar_id", "primary"), ) elif tool_name == "search_calendar": return _search_calendar( query=tool_input["query"], calendar_id=tool_input.get("calendar_id", "primary"), ) # Contacts tools elif tool_name == "create_contact": return _create_contact( given_name=tool_input["given_name"], family_name=tool_input.get("family_name", ""), email=tool_input.get("email", ""), phone=tool_input.get("phone"), notes=tool_input.get("notes"), ) elif tool_name == "list_contacts": return _list_contacts( max_results=tool_input.get("max_results", 100), query=tool_input.get("query"), ) elif tool_name == "get_contact": return _get_contact( resource_name=tool_input["resource_name"], ) else: return f"Error: Unknown tool '{tool_name}'" except Exception as e: if healing_system: healing_system.capture_error( error=e, component=f"tools.py:{tool_name}", intent=f"Executing {tool_name} tool", context={"tool_name": tool_name, "input": tool_input}, ) return f"Error executing {tool_name}: {str(e)}" # Maximum characters of tool output to return (prevents token explosion) _MAX_TOOL_OUTPUT = 5000 def _read_file(file_path: str) -> str: """Read and return file contents.""" path = Path(file_path) if not path.exists(): return f"Error: File not found: {file_path}" try: content = path.read_text(encoding="utf-8") if len(content) > _MAX_TOOL_OUTPUT: content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)" return f"Content of {file_path}:\n\n{content}" except Exception as e: return f"Error reading file: {str(e)}" def _write_file(file_path: str, content: str) -> str: """Write content to a file.""" path = Path(file_path) try: # Create parent directories if they don't exist path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") return f"Successfully wrote to {file_path} ({len(content)} characters)" except Exception as e: return f"Error writing file: {str(e)}" def _edit_file(file_path: str, old_text: str, new_text: str) -> str: """Edit file by replacing text.""" path = Path(file_path) if not path.exists(): return f"Error: File not found: {file_path}" try: content = path.read_text(encoding="utf-8") if old_text not in content: return f"Error: Text not found in file. Could not find:\n{old_text[:100]}..." new_content = content.replace(old_text, new_text, 1) path.write_text(new_content, encoding="utf-8") return f"Successfully edited {file_path}. Replaced 1 occurrence." except Exception as e: return f"Error editing file: {str(e)}" def _list_directory(path: str) -> str: """List directory contents.""" dir_path = Path(path) if not dir_path.exists(): return f"Error: Directory not found: {path}" if not dir_path.is_dir(): return f"Error: Not a directory: {path}" try: items = [] for item in sorted(dir_path.iterdir()): item_type = "DIR " if item.is_dir() else "FILE" size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)" items.append(f" {item_type} {item.name}{size}") if not items: return f"Directory {path} is empty" return f"Contents of {path}:\n" + "\n".join(items) except Exception as e: return f"Error listing directory: {str(e)}" def _run_command(command: str, working_dir: str) -> str: """Execute a shell command.""" try: result = subprocess.run( command, shell=True, cwd=working_dir, capture_output=True, text=True, timeout=30, ) output = [] if result.stdout: stdout = result.stdout if len(stdout) > _MAX_TOOL_OUTPUT: stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)" output.append(f"STDOUT:\n{stdout}") if result.stderr: stderr = result.stderr if len(stderr) > _MAX_TOOL_OUTPUT: stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)" output.append(f"STDERR:\n{stderr}") status = f"Command exited with code {result.returncode}" if not output: return status return status + "\n\n" + "\n\n".join(output) except subprocess.TimeoutExpired: return "Error: Command timed out after 30 seconds" except Exception as e: return f"Error running command: {str(e)}" def _get_weather(location: str = "Phoenix, US") -> str: """Get current weather for a location using OpenWeatherMap API. Args: location: City name or 'City, Country' (e.g., 'Phoenix, US') Returns: Weather summary string """ import requests api_key = os.getenv("OPENWEATHERMAP_API_KEY") if not api_key: return "Error: OPENWEATHERMAP_API_KEY not found in environment variables. Please add it to your .env file." try: # OpenWeatherMap API endpoint base_url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": location, "appid": api_key, "units": "imperial" # Fahrenheit } response = requests.get(base_url, params=params, timeout=10) response.raise_for_status() data = response.json() # Extract weather data temp = data["main"]["temp"] feels_like = data["main"]["feels_like"] description = data["weather"][0]["description"].capitalize() humidity = data["main"]["humidity"] wind_speed = data["wind"]["speed"] city = data["name"] # Format weather summary summary = f"**{city} Weather:**\n" summary += f"🌡️ {temp}°F (feels like {feels_like}°F)\n" summary += f"☁️ {description}\n" summary += f"💧 Humidity: {humidity}%\n" summary += f"💨 Wind: {wind_speed} mph" return summary except requests.exceptions.HTTPError as e: if e.response.status_code == 401: return "Error: Invalid OpenWeatherMap API key. Please check your OPENWEATHERMAP_API_KEY in .env file." elif e.response.status_code == 404: return f"Error: Location '{location}' not found. Try format: 'City, Country' (e.g., 'Phoenix, US')" else: return f"Error: OpenWeatherMap API error: {e}" except requests.exceptions.Timeout: return "Error: Weather API request timed out. Please try again." except Exception as e: return f"Error getting weather: {str(e)}" # Google Tools Handlers def _initialize_google_clients() -> tuple[Optional[Any], Optional[Any], Optional[Any]]: """Lazy initialization of Google clients. Returns: Tuple of (gmail_client, calendar_client, people_client) or (None, None, None) if not authorized """ global _gmail_client, _calendar_client, _people_client if _gmail_client is not None and _calendar_client is not None and _people_client is not None: return _gmail_client, _calendar_client, _people_client try: from google_tools.oauth_manager import GoogleOAuthManager from google_tools.gmail_client import GmailClient from google_tools.calendar_client import CalendarClient from google_tools.people_client import PeopleClient oauth_manager = GoogleOAuthManager() credentials = oauth_manager.get_credentials() if not credentials: return None, None, None _gmail_client = GmailClient(oauth_manager) _calendar_client = CalendarClient(oauth_manager) _people_client = PeopleClient(oauth_manager) return _gmail_client, _calendar_client, _people_client except Exception as e: print(f"[Google Tools] Failed to initialize: {e}") return None, None, None def _send_email( to: str, subject: str, body: str, cc: Optional[List[str]] = None, reply_to_message_id: Optional[str] = None, ) -> str: """Send an email via Gmail API.""" gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = gmail_client.send_email( to=to, subject=subject, body=body, cc=cc, reply_to_message_id=reply_to_message_id, ) if result["success"]: msg_id = result.get("message_id", "unknown") return f"✓ Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}" else: return f"Error sending email: {result.get('error', 'Unknown error')}" def _read_emails( query: str = "", max_results: int = 10, include_body: bool = False, ) -> str: """Search and read emails from Gmail.""" gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = gmail_client.search_emails( query=query, max_results=max_results, include_body=include_body, ) if result["success"]: summary = result.get("summary", "") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" return summary else: return f"Error reading emails: {result.get('error', 'Unknown error')}" def _get_email(message_id: str, format_type: str = "text") -> str: """Get full content of a specific email.""" gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = gmail_client.get_email(message_id=message_id, format_type=format_type) if result["success"]: email_data = result.get("email", {}) output = [] output.append(f"From: {email_data.get('from', 'Unknown')}") output.append(f"To: {email_data.get('to', 'Unknown')}") if email_data.get("cc"): output.append(f"CC: {email_data['cc']}") output.append(f"Subject: {email_data.get('subject', 'No subject')}") output.append(f"Date: {email_data.get('date', 'Unknown')}") output.append(f"\n{email_data.get('body', '')}") if email_data.get("attachments"): output.append(f"\nAttachments: {', '.join(email_data['attachments'])}") full_output = "\n".join(output) if len(full_output) > _MAX_TOOL_OUTPUT: full_output = full_output[:_MAX_TOOL_OUTPUT] + "\n... (email truncated)" return full_output else: return f"Error getting email: {result.get('error', 'Unknown error')}" def _read_calendar( days_ahead: int = 7, calendar_id: str = "primary", max_results: int = 20, ) -> str: """Read upcoming calendar events.""" _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = calendar_client.list_events( days_ahead=days_ahead, calendar_id=calendar_id, max_results=max_results, ) if result["success"]: summary = result.get("summary", "No events found") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" return f"Upcoming events (next {days_ahead} days):\n\n{summary}" else: return f"Error reading calendar: {result.get('error', 'Unknown error')}" def _create_calendar_event( summary: str, start_time: str, end_time: str, description: str = "", location: str = "", calendar_id: str = "primary", ) -> str: """Create a new calendar event.""" _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = calendar_client.create_event( summary=summary, start_time=start_time, end_time=end_time, description=description, location=location, calendar_id=calendar_id, ) if result["success"]: event_id = result.get("event_id", "unknown") html_link = result.get("html_link", "") start = result.get("start", start_time) output = [ f"✓ Calendar event created successfully!", f"Title: {summary}", f"Start: {start}", f"Event ID: {event_id}", ] if html_link: output.append(f"Link: {html_link}") if location: output.append(f"Location: {location}") return "\n".join(output) else: return f"Error creating calendar event: {result.get('error', 'Unknown error')}" def _search_calendar(query: str, calendar_id: str = "primary") -> str: """Search calendar events by text query.""" _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = calendar_client.search_events(query=query, calendar_id=calendar_id) if result["success"]: summary = result.get("summary", "No events found") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" return f'Search results for "{query}":\n\n{summary}' else: return f"Error searching calendar: {result.get('error', 'Unknown error')}" # Contacts Tools Handlers def _create_contact( given_name: str, family_name: str = "", email: str = "", phone: Optional[str] = None, notes: Optional[str] = None, ) -> str: """Create a new Google contact.""" _, _, people_client = _initialize_google_clients() if not people_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = people_client.create_contact( given_name=given_name, family_name=family_name, email=email, phone=phone, notes=notes, ) if result["success"]: name = result.get("name", given_name) resource = result.get("resource_name", "") return f"Contact created: {name}\nResource: {resource}" else: return f"Error creating contact: {result.get('error', 'Unknown error')}" def _list_contacts( max_results: int = 100, query: Optional[str] = None, ) -> str: """List or search Google contacts.""" _, _, people_client = _initialize_google_clients() if not people_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = people_client.list_contacts(max_results=max_results, query=query) if result["success"]: summary = result.get("summary", "No contacts found.") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" return f"Contacts ({result.get('count', 0)} found):\n\n{summary}" else: return f"Error listing contacts: {result.get('error', 'Unknown error')}" def _get_contact(resource_name: str) -> str: """Get full details of a specific contact.""" _, _, people_client = _initialize_google_clients() if not people_client: return "Error: Google not authorized. Run: python bot_runner.py --setup-google" result = people_client.get_contact(resource_name=resource_name) if result["success"]: c = result.get("contact", {}) output = [] name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip() output.append(f"Name: {name or '(no name)'}") if c.get("email"): output.append(f"Email: {c['email']}") if c.get("phone"): output.append(f"Phone: {c['phone']}") if c.get("notes"): output.append(f"Notes: {c['notes']}") output.append(f"Resource: {c.get('resource_name', resource_name)}") return "\n".join(output) else: return f"Error getting contact: {result.get('error', 'Unknown error')}"