Files
ajarbot/tools.py
Jordan Ramos f018800d94 Implement self-healing system Phase 1: Error capture and logging
- Add SelfHealingSystem with error observation infrastructure
- Capture errors with full context: type, message, stack trace, intent, inputs
- Log to MEMORY.md with deduplication (max 3 attempts per error signature)
- Integrate error capture in agent, tools, runtime, and scheduler
- Non-invasive: preserves all existing error handling behavior
- Foundation for future diagnosis and auto-fixing capabilities

Phase 1 of 4-phase rollout - observation only, no auto-fixing yet.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-14 18:03:42 -07:00

823 lines
28 KiB
Python

"""Tool definitions and execution for agent capabilities."""
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional
# Google tools (lazy loaded when needed)
_gmail_client: Optional[Any] = None
_calendar_client: Optional[Any] = None
_people_client: Optional[Any] = None
# Tool definitions in Anthropic's tool use format
TOOL_DEFINITIONS = [
{
"name": "read_file",
"description": "Read the contents of a file. Use this to view configuration files, code, or any text file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to read (relative or absolute)",
}
},
"required": ["file_path"],
},
},
{
"name": "write_file",
"description": "Write content to a file. Creates a new file or overwrites existing file completely.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to write",
},
"content": {
"type": "string",
"description": "Content to write to the file",
},
},
"required": ["file_path", "content"],
},
},
{
"name": "edit_file",
"description": "Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to edit",
},
"old_text": {
"type": "string",
"description": "Exact text to find and replace",
},
"new_text": {
"type": "string",
"description": "New text to replace with",
},
},
"required": ["file_path", "old_text", "new_text"],
},
},
{
"name": "list_directory",
"description": "List files and directories in a given path. Useful for exploring the file system.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list (defaults to current directory)",
"default": ".",
}
},
},
},
{
"name": "run_command",
"description": "Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute",
},
"working_dir": {
"type": "string",
"description": "Working directory for command execution (defaults to current directory)",
"default": ".",
},
},
"required": ["command"],
},
},
# Gmail tools
{
"name": "send_email",
"description": "Send an email from the bot's Gmail account. Requires prior OAuth setup (--setup-google).",
"input_schema": {
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "Recipient email address",
},
"subject": {
"type": "string",
"description": "Email subject line",
},
"body": {
"type": "string",
"description": "Email body (plain text or HTML)",
},
"cc": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of CC recipients",
},
"reply_to_message_id": {
"type": "string",
"description": "Optional Gmail message ID to reply to (for threading)",
},
},
"required": ["to", "subject", "body"],
},
},
{
"name": "read_emails",
"description": "Search and read emails from the bot's Gmail account using Gmail search syntax (e.g., 'from:user@example.com after:2026/02/10').",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Gmail search query (supports from, to, subject, after, before, has:attachment, etc.)",
"default": "",
},
"max_results": {
"type": "integer",
"description": "Maximum number of emails to return (default: 10, max: 50)",
"default": 10,
},
"include_body": {
"type": "boolean",
"description": "Whether to include full email body (default: false, shows snippet only)",
"default": False,
},
},
},
},
{
"name": "get_email",
"description": "Get full content of a specific email by its Gmail message ID.",
"input_schema": {
"type": "object",
"properties": {
"message_id": {
"type": "string",
"description": "Gmail message ID",
},
"format": {
"type": "string",
"description": "Format type: 'text' or 'html' (default: text)",
"default": "text",
},
},
"required": ["message_id"],
},
},
# Calendar tools
{
"name": "read_calendar",
"description": "Read upcoming events from Google Calendar. Shows events from today onwards.",
"input_schema": {
"type": "object",
"properties": {
"days_ahead": {
"type": "integer",
"description": "Number of days ahead to look (default: 7, max: 30)",
"default": 7,
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary' for main calendar)",
"default": "primary",
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return (default: 20)",
"default": 20,
},
},
},
},
{
"name": "create_calendar_event",
"description": "Create a new event in Google Calendar. Use ISO 8601 format for times (e.g., '2026-02-14T10:00:00Z').",
"input_schema": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "Event title/summary",
},
"start_time": {
"type": "string",
"description": "Event start time in ISO 8601 format (e.g., '2026-02-14T10:00:00Z')",
},
"end_time": {
"type": "string",
"description": "Event end time in ISO 8601 format",
},
"description": {
"type": "string",
"description": "Optional event description",
"default": "",
},
"location": {
"type": "string",
"description": "Optional event location",
"default": "",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["summary", "start_time", "end_time"],
},
},
{
"name": "search_calendar",
"description": "Search calendar events by text query. Searches event titles and descriptions.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query text",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["query"],
},
},
# Contacts tools
{
"name": "create_contact",
"description": "Create a new Google contact. Requires prior OAuth setup (--setup-google).",
"input_schema": {
"type": "object",
"properties": {
"given_name": {
"type": "string",
"description": "Contact's first name",
},
"family_name": {
"type": "string",
"description": "Contact's last name",
"default": "",
},
"email": {
"type": "string",
"description": "Contact's email address",
"default": "",
},
"phone": {
"type": "string",
"description": "Contact's phone number",
},
"notes": {
"type": "string",
"description": "Optional notes about the contact",
},
},
"required": ["given_name"],
},
},
{
"name": "list_contacts",
"description": "List or search Google contacts. Without a query, lists all contacts sorted by last name.",
"input_schema": {
"type": "object",
"properties": {
"max_results": {
"type": "integer",
"description": "Maximum number of contacts to return (default: 100, max: 1000)",
"default": 100,
},
"query": {
"type": "string",
"description": "Optional search query to filter contacts by name, email, etc.",
},
},
},
},
{
"name": "get_contact",
"description": "Get full details of a specific Google contact by resource name.",
"input_schema": {
"type": "object",
"properties": {
"resource_name": {
"type": "string",
"description": "Contact resource name (e.g., 'people/c1234567890')",
},
},
"required": ["resource_name"],
},
},
]
def execute_tool(tool_name: str, tool_input: Dict[str, Any], healing_system: Any = None) -> str:
"""Execute a tool and return the result as a string."""
try:
# File tools
if tool_name == "read_file":
return _read_file(tool_input["file_path"])
elif tool_name == "write_file":
return _write_file(tool_input["file_path"], tool_input["content"])
elif tool_name == "edit_file":
return _edit_file(
tool_input["file_path"],
tool_input["old_text"],
tool_input["new_text"],
)
elif tool_name == "list_directory":
path = tool_input.get("path", ".")
return _list_directory(path)
elif tool_name == "run_command":
command = tool_input["command"]
working_dir = tool_input.get("working_dir", ".")
return _run_command(command, working_dir)
# Gmail tools
elif tool_name == "send_email":
return _send_email(
to=tool_input["to"],
subject=tool_input["subject"],
body=tool_input["body"],
cc=tool_input.get("cc"),
reply_to_message_id=tool_input.get("reply_to_message_id"),
)
elif tool_name == "read_emails":
return _read_emails(
query=tool_input.get("query", ""),
max_results=tool_input.get("max_results", 10),
include_body=tool_input.get("include_body", False),
)
elif tool_name == "get_email":
return _get_email(
message_id=tool_input["message_id"],
format_type=tool_input.get("format", "text"),
)
# Calendar tools
elif tool_name == "read_calendar":
return _read_calendar(
days_ahead=tool_input.get("days_ahead", 7),
calendar_id=tool_input.get("calendar_id", "primary"),
max_results=tool_input.get("max_results", 20),
)
elif tool_name == "create_calendar_event":
return _create_calendar_event(
summary=tool_input["summary"],
start_time=tool_input["start_time"],
end_time=tool_input["end_time"],
description=tool_input.get("description", ""),
location=tool_input.get("location", ""),
calendar_id=tool_input.get("calendar_id", "primary"),
)
elif tool_name == "search_calendar":
return _search_calendar(
query=tool_input["query"],
calendar_id=tool_input.get("calendar_id", "primary"),
)
# Contacts tools
elif tool_name == "create_contact":
return _create_contact(
given_name=tool_input["given_name"],
family_name=tool_input.get("family_name", ""),
email=tool_input.get("email", ""),
phone=tool_input.get("phone"),
notes=tool_input.get("notes"),
)
elif tool_name == "list_contacts":
return _list_contacts(
max_results=tool_input.get("max_results", 100),
query=tool_input.get("query"),
)
elif tool_name == "get_contact":
return _get_contact(
resource_name=tool_input["resource_name"],
)
else:
return f"Error: Unknown tool '{tool_name}'"
except Exception as e:
if healing_system:
healing_system.capture_error(
error=e,
component=f"tools.py:{tool_name}",
intent=f"Executing {tool_name} tool",
context={"tool_name": tool_name, "input": tool_input},
)
return f"Error executing {tool_name}: {str(e)}"
# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000
def _read_file(file_path: str) -> str:
"""Read and return file contents."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if len(content) > _MAX_TOOL_OUTPUT:
content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
return f"Content of {file_path}:\n\n{content}"
except Exception as e:
return f"Error reading file: {str(e)}"
def _write_file(file_path: str, content: str) -> str:
"""Write content to a file."""
path = Path(file_path)
try:
# Create parent directories if they don't exist
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return f"Successfully wrote to {file_path} ({len(content)} characters)"
except Exception as e:
return f"Error writing file: {str(e)}"
def _edit_file(file_path: str, old_text: str, new_text: str) -> str:
"""Edit file by replacing text."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
new_content = content.replace(old_text, new_text, 1)
path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {file_path}. Replaced 1 occurrence."
except Exception as e:
return f"Error editing file: {str(e)}"
def _list_directory(path: str) -> str:
"""List directory contents."""
dir_path = Path(path)
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
items = []
for item in sorted(dir_path.iterdir()):
item_type = "DIR " if item.is_dir() else "FILE"
size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)"
items.append(f" {item_type} {item.name}{size}")
if not items:
return f"Directory {path} is empty"
return f"Contents of {path}:\n" + "\n".join(items)
except Exception as e:
return f"Error listing directory: {str(e)}"
def _run_command(command: str, working_dir: str) -> str:
"""Execute a shell command."""
try:
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=30,
)
output = []
if result.stdout:
stdout = result.stdout
if len(stdout) > _MAX_TOOL_OUTPUT:
stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)"
output.append(f"STDOUT:\n{stdout}")
if result.stderr:
stderr = result.stderr
if len(stderr) > _MAX_TOOL_OUTPUT:
stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)"
output.append(f"STDERR:\n{stderr}")
status = f"Command exited with code {result.returncode}"
if not output:
return status
return status + "\n\n" + "\n\n".join(output)
except subprocess.TimeoutExpired:
return "Error: Command timed out after 30 seconds"
except Exception as e:
return f"Error running command: {str(e)}"
# Google Tools Handlers
def _initialize_google_clients() -> tuple[Optional[Any], Optional[Any], Optional[Any]]:
"""Lazy initialization of Google clients.
Returns:
Tuple of (gmail_client, calendar_client, people_client) or (None, None, None) if not authorized
"""
global _gmail_client, _calendar_client, _people_client
if _gmail_client is not None and _calendar_client is not None and _people_client is not None:
return _gmail_client, _calendar_client, _people_client
try:
from google_tools.oauth_manager import GoogleOAuthManager
from google_tools.gmail_client import GmailClient
from google_tools.calendar_client import CalendarClient
from google_tools.people_client import PeopleClient
oauth_manager = GoogleOAuthManager()
credentials = oauth_manager.get_credentials()
if not credentials:
return None, None, None
_gmail_client = GmailClient(oauth_manager)
_calendar_client = CalendarClient(oauth_manager)
_people_client = PeopleClient(oauth_manager)
return _gmail_client, _calendar_client, _people_client
except Exception as e:
print(f"[Google Tools] Failed to initialize: {e}")
return None, None, None
def _send_email(
to: str,
subject: str,
body: str,
cc: Optional[List[str]] = None,
reply_to_message_id: Optional[str] = None,
) -> str:
"""Send an email via Gmail API."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.send_email(
to=to,
subject=subject,
body=body,
cc=cc,
reply_to_message_id=reply_to_message_id,
)
if result["success"]:
msg_id = result.get("message_id", "unknown")
return f"✓ Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}"
else:
return f"Error sending email: {result.get('error', 'Unknown error')}"
def _read_emails(
query: str = "",
max_results: int = 10,
include_body: bool = False,
) -> str:
"""Search and read emails from Gmail."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.search_emails(
query=query,
max_results=max_results,
include_body=include_body,
)
if result["success"]:
summary = result.get("summary", "")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return summary
else:
return f"Error reading emails: {result.get('error', 'Unknown error')}"
def _get_email(message_id: str, format_type: str = "text") -> str:
"""Get full content of a specific email."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.get_email(message_id=message_id, format_type=format_type)
if result["success"]:
email_data = result.get("email", {})
output = []
output.append(f"From: {email_data.get('from', 'Unknown')}")
output.append(f"To: {email_data.get('to', 'Unknown')}")
if email_data.get("cc"):
output.append(f"CC: {email_data['cc']}")
output.append(f"Subject: {email_data.get('subject', 'No subject')}")
output.append(f"Date: {email_data.get('date', 'Unknown')}")
output.append(f"\n{email_data.get('body', '')}")
if email_data.get("attachments"):
output.append(f"\nAttachments: {', '.join(email_data['attachments'])}")
full_output = "\n".join(output)
if len(full_output) > _MAX_TOOL_OUTPUT:
full_output = full_output[:_MAX_TOOL_OUTPUT] + "\n... (email truncated)"
return full_output
else:
return f"Error getting email: {result.get('error', 'Unknown error')}"
def _read_calendar(
days_ahead: int = 7,
calendar_id: str = "primary",
max_results: int = 20,
) -> str:
"""Read upcoming calendar events."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.list_events(
days_ahead=days_ahead,
calendar_id=calendar_id,
max_results=max_results,
)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f"Upcoming events (next {days_ahead} days):\n\n{summary}"
else:
return f"Error reading calendar: {result.get('error', 'Unknown error')}"
def _create_calendar_event(
summary: str,
start_time: str,
end_time: str,
description: str = "",
location: str = "",
calendar_id: str = "primary",
) -> str:
"""Create a new calendar event."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.create_event(
summary=summary,
start_time=start_time,
end_time=end_time,
description=description,
location=location,
calendar_id=calendar_id,
)
if result["success"]:
event_id = result.get("event_id", "unknown")
html_link = result.get("html_link", "")
start = result.get("start", start_time)
output = [
f"✓ Calendar event created successfully!",
f"Title: {summary}",
f"Start: {start}",
f"Event ID: {event_id}",
]
if html_link:
output.append(f"Link: {html_link}")
if location:
output.append(f"Location: {location}")
return "\n".join(output)
else:
return f"Error creating calendar event: {result.get('error', 'Unknown error')}"
def _search_calendar(query: str, calendar_id: str = "primary") -> str:
"""Search calendar events by text query."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.search_events(query=query, calendar_id=calendar_id)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f'Search results for "{query}":\n\n{summary}'
else:
return f"Error searching calendar: {result.get('error', 'Unknown error')}"
# Contacts Tools Handlers
def _create_contact(
given_name: str,
family_name: str = "",
email: str = "",
phone: Optional[str] = None,
notes: Optional[str] = None,
) -> str:
"""Create a new Google contact."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.create_contact(
given_name=given_name,
family_name=family_name,
email=email,
phone=phone,
notes=notes,
)
if result["success"]:
name = result.get("name", given_name)
resource = result.get("resource_name", "")
return f"Contact created: {name}\nResource: {resource}"
else:
return f"Error creating contact: {result.get('error', 'Unknown error')}"
def _list_contacts(
max_results: int = 100,
query: Optional[str] = None,
) -> str:
"""List or search Google contacts."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.list_contacts(max_results=max_results, query=query)
if result["success"]:
summary = result.get("summary", "No contacts found.")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f"Contacts ({result.get('count', 0)} found):\n\n{summary}"
else:
return f"Error listing contacts: {result.get('error', 'Unknown error')}"
def _get_contact(resource_name: str) -> str:
"""Get full details of a specific contact."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.get_contact(resource_name=resource_name)
if result["success"]:
c = result.get("contact", {})
output = []
name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip()
output.append(f"Name: {name or '(no name)'}")
if c.get("email"):
output.append(f"Email: {c['email']}")
if c.get("phone"):
output.append(f"Phone: {c['phone']}")
if c.get("notes"):
output.append(f"Notes: {c['notes']}")
output.append(f"Resource: {c.get('resource_name', resource_name)}")
return "\n".join(output)
else:
return f"Error getting contact: {result.get('error', 'Unknown error')}"