Files
ajarbot/tools.py

651 lines
22 KiB
Python
Raw Normal View History

"""Tool definitions and execution for agent capabilities."""
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional
# Google tools (lazy loaded when needed)
_gmail_client: Optional[Any] = None
_calendar_client: Optional[Any] = None
# Tool definitions in Anthropic's tool use format
TOOL_DEFINITIONS = [
{
"name": "read_file",
"description": "Read the contents of a file. Use this to view configuration files, code, or any text file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to read (relative or absolute)",
}
},
"required": ["file_path"],
},
},
{
"name": "write_file",
"description": "Write content to a file. Creates a new file or overwrites existing file completely.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to write",
},
"content": {
"type": "string",
"description": "Content to write to the file",
},
},
"required": ["file_path", "content"],
},
},
{
"name": "edit_file",
"description": "Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to edit",
},
"old_text": {
"type": "string",
"description": "Exact text to find and replace",
},
"new_text": {
"type": "string",
"description": "New text to replace with",
},
},
"required": ["file_path", "old_text", "new_text"],
},
},
{
"name": "list_directory",
"description": "List files and directories in a given path. Useful for exploring the file system.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list (defaults to current directory)",
"default": ".",
}
},
},
},
{
"name": "run_command",
"description": "Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute",
},
"working_dir": {
"type": "string",
"description": "Working directory for command execution (defaults to current directory)",
"default": ".",
},
},
"required": ["command"],
},
},
# Gmail tools
{
"name": "send_email",
"description": "Send an email from the bot's Gmail account. Requires prior OAuth setup (--setup-google).",
"input_schema": {
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "Recipient email address",
},
"subject": {
"type": "string",
"description": "Email subject line",
},
"body": {
"type": "string",
"description": "Email body (plain text or HTML)",
},
"cc": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of CC recipients",
},
"reply_to_message_id": {
"type": "string",
"description": "Optional Gmail message ID to reply to (for threading)",
},
},
"required": ["to", "subject", "body"],
},
},
{
"name": "read_emails",
"description": "Search and read emails from the bot's Gmail account using Gmail search syntax (e.g., 'from:user@example.com after:2026/02/10').",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Gmail search query (supports from, to, subject, after, before, has:attachment, etc.)",
"default": "",
},
"max_results": {
"type": "integer",
"description": "Maximum number of emails to return (default: 10, max: 50)",
"default": 10,
},
"include_body": {
"type": "boolean",
"description": "Whether to include full email body (default: false, shows snippet only)",
"default": False,
},
},
},
},
{
"name": "get_email",
"description": "Get full content of a specific email by its Gmail message ID.",
"input_schema": {
"type": "object",
"properties": {
"message_id": {
"type": "string",
"description": "Gmail message ID",
},
"format": {
"type": "string",
"description": "Format type: 'text' or 'html' (default: text)",
"default": "text",
},
},
"required": ["message_id"],
},
},
# Calendar tools
{
"name": "read_calendar",
"description": "Read upcoming events from Google Calendar. Shows events from today onwards.",
"input_schema": {
"type": "object",
"properties": {
"days_ahead": {
"type": "integer",
"description": "Number of days ahead to look (default: 7, max: 30)",
"default": 7,
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary' for main calendar)",
"default": "primary",
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return (default: 20)",
"default": 20,
},
},
},
},
{
"name": "create_calendar_event",
"description": "Create a new event in Google Calendar. Use ISO 8601 format for times (e.g., '2026-02-14T10:00:00Z').",
"input_schema": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "Event title/summary",
},
"start_time": {
"type": "string",
"description": "Event start time in ISO 8601 format (e.g., '2026-02-14T10:00:00Z')",
},
"end_time": {
"type": "string",
"description": "Event end time in ISO 8601 format",
},
"description": {
"type": "string",
"description": "Optional event description",
"default": "",
},
"location": {
"type": "string",
"description": "Optional event location",
"default": "",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["summary", "start_time", "end_time"],
},
},
{
"name": "search_calendar",
"description": "Search calendar events by text query. Searches event titles and descriptions.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query text",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["query"],
},
},
]
def execute_tool(tool_name: str, tool_input: Dict[str, Any]) -> str:
"""Execute a tool and return the result as a string."""
try:
# File tools
if tool_name == "read_file":
return _read_file(tool_input["file_path"])
elif tool_name == "write_file":
return _write_file(tool_input["file_path"], tool_input["content"])
elif tool_name == "edit_file":
return _edit_file(
tool_input["file_path"],
tool_input["old_text"],
tool_input["new_text"],
)
elif tool_name == "list_directory":
path = tool_input.get("path", ".")
return _list_directory(path)
elif tool_name == "run_command":
command = tool_input["command"]
working_dir = tool_input.get("working_dir", ".")
return _run_command(command, working_dir)
# Gmail tools
elif tool_name == "send_email":
return _send_email(
to=tool_input["to"],
subject=tool_input["subject"],
body=tool_input["body"],
cc=tool_input.get("cc"),
reply_to_message_id=tool_input.get("reply_to_message_id"),
)
elif tool_name == "read_emails":
return _read_emails(
query=tool_input.get("query", ""),
max_results=tool_input.get("max_results", 10),
include_body=tool_input.get("include_body", False),
)
elif tool_name == "get_email":
return _get_email(
message_id=tool_input["message_id"],
format_type=tool_input.get("format", "text"),
)
# Calendar tools
elif tool_name == "read_calendar":
return _read_calendar(
days_ahead=tool_input.get("days_ahead", 7),
calendar_id=tool_input.get("calendar_id", "primary"),
max_results=tool_input.get("max_results", 20),
)
elif tool_name == "create_calendar_event":
return _create_calendar_event(
summary=tool_input["summary"],
start_time=tool_input["start_time"],
end_time=tool_input["end_time"],
description=tool_input.get("description", ""),
location=tool_input.get("location", ""),
calendar_id=tool_input.get("calendar_id", "primary"),
)
elif tool_name == "search_calendar":
return _search_calendar(
query=tool_input["query"],
calendar_id=tool_input.get("calendar_id", "primary"),
)
else:
return f"Error: Unknown tool '{tool_name}'"
except Exception as e:
return f"Error executing {tool_name}: {str(e)}"
# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000
def _read_file(file_path: str) -> str:
"""Read and return file contents."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if len(content) > _MAX_TOOL_OUTPUT:
content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
return f"Content of {file_path}:\n\n{content}"
except Exception as e:
return f"Error reading file: {str(e)}"
def _write_file(file_path: str, content: str) -> str:
"""Write content to a file."""
path = Path(file_path)
try:
# Create parent directories if they don't exist
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return f"Successfully wrote to {file_path} ({len(content)} characters)"
except Exception as e:
return f"Error writing file: {str(e)}"
def _edit_file(file_path: str, old_text: str, new_text: str) -> str:
"""Edit file by replacing text."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
new_content = content.replace(old_text, new_text, 1)
path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {file_path}. Replaced 1 occurrence."
except Exception as e:
return f"Error editing file: {str(e)}"
def _list_directory(path: str) -> str:
"""List directory contents."""
dir_path = Path(path)
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
items = []
for item in sorted(dir_path.iterdir()):
item_type = "DIR " if item.is_dir() else "FILE"
size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)"
items.append(f" {item_type} {item.name}{size}")
if not items:
return f"Directory {path} is empty"
return f"Contents of {path}:\n" + "\n".join(items)
except Exception as e:
return f"Error listing directory: {str(e)}"
def _run_command(command: str, working_dir: str) -> str:
"""Execute a shell command."""
try:
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=30,
)
output = []
if result.stdout:
stdout = result.stdout
if len(stdout) > _MAX_TOOL_OUTPUT:
stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)"
output.append(f"STDOUT:\n{stdout}")
if result.stderr:
stderr = result.stderr
if len(stderr) > _MAX_TOOL_OUTPUT:
stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)"
output.append(f"STDERR:\n{stderr}")
status = f"Command exited with code {result.returncode}"
if not output:
return status
return status + "\n\n" + "\n\n".join(output)
except subprocess.TimeoutExpired:
return "Error: Command timed out after 30 seconds"
except Exception as e:
return f"Error running command: {str(e)}"
# Google Tools Handlers
def _initialize_google_clients() -> tuple[Optional[Any], Optional[Any]]:
"""Lazy initialization of Google clients.
Returns:
Tuple of (gmail_client, calendar_client) or (None, None) if not authorized
"""
global _gmail_client, _calendar_client
if _gmail_client is not None and _calendar_client is not None:
return _gmail_client, _calendar_client
try:
from google_tools.oauth_manager import GoogleOAuthManager
from google_tools.gmail_client import GmailClient
from google_tools.calendar_client import CalendarClient
oauth_manager = GoogleOAuthManager()
credentials = oauth_manager.get_credentials()
if not credentials:
return None, None
_gmail_client = GmailClient(oauth_manager)
_calendar_client = CalendarClient(oauth_manager)
return _gmail_client, _calendar_client
except Exception as e:
print(f"[Google Tools] Failed to initialize: {e}")
return None, None
def _send_email(
to: str,
subject: str,
body: str,
cc: Optional[List[str]] = None,
reply_to_message_id: Optional[str] = None,
) -> str:
"""Send an email via Gmail API."""
gmail_client, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.send_email(
to=to,
subject=subject,
body=body,
cc=cc,
reply_to_message_id=reply_to_message_id,
)
if result["success"]:
msg_id = result.get("message_id", "unknown")
return f"✓ Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}"
else:
return f"Error sending email: {result.get('error', 'Unknown error')}"
def _read_emails(
query: str = "",
max_results: int = 10,
include_body: bool = False,
) -> str:
"""Search and read emails from Gmail."""
gmail_client, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.search_emails(
query=query,
max_results=max_results,
include_body=include_body,
)
if result["success"]:
summary = result.get("summary", "")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return summary
else:
return f"Error reading emails: {result.get('error', 'Unknown error')}"
def _get_email(message_id: str, format_type: str = "text") -> str:
"""Get full content of a specific email."""
gmail_client, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.get_email(message_id=message_id, format_type=format_type)
if result["success"]:
email_data = result.get("email", {})
output = []
output.append(f"From: {email_data.get('from', 'Unknown')}")
output.append(f"To: {email_data.get('to', 'Unknown')}")
if email_data.get("cc"):
output.append(f"CC: {email_data['cc']}")
output.append(f"Subject: {email_data.get('subject', 'No subject')}")
output.append(f"Date: {email_data.get('date', 'Unknown')}")
output.append(f"\n{email_data.get('body', '')}")
if email_data.get("attachments"):
output.append(f"\nAttachments: {', '.join(email_data['attachments'])}")
full_output = "\n".join(output)
if len(full_output) > _MAX_TOOL_OUTPUT:
full_output = full_output[:_MAX_TOOL_OUTPUT] + "\n... (email truncated)"
return full_output
else:
return f"Error getting email: {result.get('error', 'Unknown error')}"
def _read_calendar(
days_ahead: int = 7,
calendar_id: str = "primary",
max_results: int = 20,
) -> str:
"""Read upcoming calendar events."""
_, calendar_client = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.list_events(
days_ahead=days_ahead,
calendar_id=calendar_id,
max_results=max_results,
)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f"Upcoming events (next {days_ahead} days):\n\n{summary}"
else:
return f"Error reading calendar: {result.get('error', 'Unknown error')}"
def _create_calendar_event(
summary: str,
start_time: str,
end_time: str,
description: str = "",
location: str = "",
calendar_id: str = "primary",
) -> str:
"""Create a new calendar event."""
_, calendar_client = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.create_event(
summary=summary,
start_time=start_time,
end_time=end_time,
description=description,
location=location,
calendar_id=calendar_id,
)
if result["success"]:
event_id = result.get("event_id", "unknown")
html_link = result.get("html_link", "")
start = result.get("start", start_time)
output = [
f"✓ Calendar event created successfully!",
f"Title: {summary}",
f"Start: {start}",
f"Event ID: {event_id}",
]
if html_link:
output.append(f"Link: {html_link}")
if location:
output.append(f"Location: {location}")
return "\n".join(output)
else:
return f"Error creating calendar event: {result.get('error', 'Unknown error')}"
def _search_calendar(query: str, calendar_id: str = "primary") -> str:
"""Search calendar events by text query."""
_, calendar_client = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.search_events(query=query, calendar_id=calendar_id)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f'Search results for "{query}":\n\n{summary}'
else:
return f"Error searching calendar: {result.get('error', 'Unknown error')}"