Files
ajarbot/tools.py
Jordan Ramos 8afff96bb5 Add API usage tracking and dynamic task reloading
Features:
- Usage tracking system (usage_tracker.py)
  - Tracks input/output tokens per API call
  - Calculates costs with support for cache pricing
  - Stores data in usage_data.json (gitignored)
  - Integrated into llm_interface.py

- Dynamic task scheduler reloading
  - Auto-detects YAML changes every 60s
  - No restart needed for new tasks
  - reload_tasks() method for manual refresh

- Example cost tracking scheduled task
  - Daily API usage report
  - Budget tracking ($5/month target)
  - Disabled by default in scheduled_tasks.yaml

Improvements:
- Fixed tool_use/tool_result pair splitting bug (CRITICAL)
- Added thread safety to agent.chat()
- Fixed N+1 query problem in hybrid search
- Optimized database batch queries
- Added conversation history pruning (50 messages max)

Updated .gitignore:
- Exclude user profiles (memory_workspace/users/*.md)
- Exclude usage data (usage_data.json)
- Exclude vector index (vectors.usearch)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-13 23:38:44 -07:00

238 lines
7.8 KiB
Python

"""Tool definitions and execution for agent capabilities."""
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List
# Tool definitions in Anthropic's tool use format
TOOL_DEFINITIONS = [
{
"name": "read_file",
"description": "Read the contents of a file. Use this to view configuration files, code, or any text file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to read (relative or absolute)",
}
},
"required": ["file_path"],
},
},
{
"name": "write_file",
"description": "Write content to a file. Creates a new file or overwrites existing file completely.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to write",
},
"content": {
"type": "string",
"description": "Content to write to the file",
},
},
"required": ["file_path", "content"],
},
},
{
"name": "edit_file",
"description": "Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
"input_schema": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to edit",
},
"old_text": {
"type": "string",
"description": "Exact text to find and replace",
},
"new_text": {
"type": "string",
"description": "New text to replace with",
},
},
"required": ["file_path", "old_text", "new_text"],
},
},
{
"name": "list_directory",
"description": "List files and directories in a given path. Useful for exploring the file system.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list (defaults to current directory)",
"default": ".",
}
},
},
},
{
"name": "run_command",
"description": "Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Shell command to execute",
},
"working_dir": {
"type": "string",
"description": "Working directory for command execution (defaults to current directory)",
"default": ".",
},
},
"required": ["command"],
},
},
]
def execute_tool(tool_name: str, tool_input: Dict[str, Any]) -> str:
"""Execute a tool and return the result as a string."""
try:
if tool_name == "read_file":
return _read_file(tool_input["file_path"])
elif tool_name == "write_file":
return _write_file(tool_input["file_path"], tool_input["content"])
elif tool_name == "edit_file":
return _edit_file(
tool_input["file_path"],
tool_input["old_text"],
tool_input["new_text"],
)
elif tool_name == "list_directory":
path = tool_input.get("path", ".")
return _list_directory(path)
elif tool_name == "run_command":
command = tool_input["command"]
working_dir = tool_input.get("working_dir", ".")
return _run_command(command, working_dir)
else:
return f"Error: Unknown tool '{tool_name}'"
except Exception as e:
return f"Error executing {tool_name}: {str(e)}"
# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000
def _read_file(file_path: str) -> str:
"""Read and return file contents."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if len(content) > _MAX_TOOL_OUTPUT:
content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
return f"Content of {file_path}:\n\n{content}"
except Exception as e:
return f"Error reading file: {str(e)}"
def _write_file(file_path: str, content: str) -> str:
"""Write content to a file."""
path = Path(file_path)
try:
# Create parent directories if they don't exist
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return f"Successfully wrote to {file_path} ({len(content)} characters)"
except Exception as e:
return f"Error writing file: {str(e)}"
def _edit_file(file_path: str, old_text: str, new_text: str) -> str:
"""Edit file by replacing text."""
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
try:
content = path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
new_content = content.replace(old_text, new_text, 1)
path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {file_path}. Replaced 1 occurrence."
except Exception as e:
return f"Error editing file: {str(e)}"
def _list_directory(path: str) -> str:
"""List directory contents."""
dir_path = Path(path)
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
try:
items = []
for item in sorted(dir_path.iterdir()):
item_type = "DIR " if item.is_dir() else "FILE"
size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)"
items.append(f" {item_type} {item.name}{size}")
if not items:
return f"Directory {path} is empty"
return f"Contents of {path}:\n" + "\n".join(items)
except Exception as e:
return f"Error listing directory: {str(e)}"
def _run_command(command: str, working_dir: str) -> str:
"""Execute a shell command."""
try:
result = subprocess.run(
command,
shell=True,
cwd=working_dir,
capture_output=True,
text=True,
timeout=30,
)
output = []
if result.stdout:
stdout = result.stdout
if len(stdout) > _MAX_TOOL_OUTPUT:
stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)"
output.append(f"STDOUT:\n{stdout}")
if result.stderr:
stderr = result.stderr
if len(stderr) > _MAX_TOOL_OUTPUT:
stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)"
output.append(f"STDERR:\n{stderr}")
status = f"Command exited with code {result.returncode}"
if not output:
return status
return status + "\n\n" + "\n\n".join(output)
except subprocess.TimeoutExpired:
return "Error: Command timed out after 30 seconds"
except Exception as e:
return f"Error running command: {str(e)}"