Compare commits

..

3 Commits

Author SHA1 Message Date
0eb5d2cab4 Add Google Contacts integration and fix critical Windows issues
- Add People API integration with contact management tools (create, list, get)
- Fix OAuth flow: replace deprecated OOB with localhost callback
- Fix Ctrl+C handling with proper signal handlers for graceful shutdown
- Fix UTF-8 encoding in scheduled_tasks.py for Windows compatibility
- Add users/ directory to gitignore to protect personal data

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-14 15:12:01 -07:00
19af20e700 Fix syntax error in OAuth callback handler reference 2026-02-14 11:42:37 -07:00
dc14baf426 Add Gmail and Google Calendar integration
Implements on-demand Google tools (not adapter) for email and calendar access via OAuth2.

Features:
- OAuth2 user consent flow with automatic token refresh
- 3 Gmail tools: send_email, read_emails, get_email
- 3 Calendar tools: read_calendar, create_calendar_event, search_calendar
- Lazy loading pattern for Google clients
- Secure token storage with file permissions
- Browser-based setup: python bot_runner.py --setup-google

Architecture:
- Tools-only approach (zero API calls when not in use)
- User-initiated actions only (no continuous polling)
- MIME message creation for emails with threading support
- HTML to text conversion for email parsing
- ISO 8601 timestamp handling for calendar events

Files added:
- google_tools/oauth_manager.py: OAuth2 flow and token management
- google_tools/gmail_client.py: Gmail API wrapper
- google_tools/calendar_client.py: Calendar API wrapper
- google_tools/utils.py: Email/MIME helpers
- config/scheduled_tasks.yaml: Example scheduled tasks config

Files modified:
- tools.py: Added 6 Google tool handlers with lazy initialization
- bot_runner.py: Added --setup-google command for OAuth authorization
- requirements.txt: Added Google API dependencies
- .gitignore: Added google_credentials.yaml and google_oauth_token.json

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-14 10:29:28 -07:00
12 changed files with 2066 additions and 7 deletions

7
.gitignore vendored
View File

@@ -50,8 +50,15 @@ memory_workspace/memory_index.db
memory_workspace/users/*.md # User profiles (jordan.md, etc.) memory_workspace/users/*.md # User profiles (jordan.md, etc.)
memory_workspace/vectors.usearch memory_workspace/vectors.usearch
# User profiles (personal info)
users/
# Usage tracking # Usage tracking
usage_data.json usage_data.json
# Google OAuth tokens
config/google_credentials.yaml
config/google_oauth_token.json
# Logs # Logs
*.log *.log

View File

@@ -14,6 +14,8 @@ Environment variables:
import argparse import argparse
import asyncio import asyncio
import signal
import sys
import traceback import traceback
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -26,6 +28,7 @@ from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter from adapters.telegram.adapter import TelegramAdapter
from agent import Agent from agent import Agent
from config.config_loader import ConfigLoader from config.config_loader import ConfigLoader
from google_tools.oauth_manager import GoogleOAuthManager
from scheduled_tasks import TaskScheduler from scheduled_tasks import TaskScheduler
# Adapter class registry mapping platform names to their classes # Adapter class registry mapping platform names to their classes
@@ -44,6 +47,7 @@ class BotRunner:
self.runtime: AdapterRuntime = None self.runtime: AdapterRuntime = None
self.agent: Agent = None self.agent: Agent = None
self.scheduler: TaskScheduler = None self.scheduler: TaskScheduler = None
self.shutdown_event = asyncio.Event()
def _load_adapter(self, platform: str) -> bool: def _load_adapter(self, platform: str) -> bool:
"""Load and register a single adapter. Returns True if loaded.""" """Load and register a single adapter. Returns True if loaded."""
@@ -126,6 +130,17 @@ class BotRunner:
if not self.setup(): if not self.setup():
return return
# Set up signal handlers
loop = asyncio.get_running_loop()
def signal_handler(signum, frame):
print(f"\n\n[Shutdown] Received signal {signum}...")
loop.call_soon_threadsafe(self.shutdown_event.set)
# Register signal handlers (works on both Windows and Unix)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("Starting bot...") print("Starting bot...")
print("=" * 60 + "\n") print("=" * 60 + "\n")
@@ -142,11 +157,9 @@ class BotRunner:
print("Bot is running! Press Ctrl+C to stop.") print("Bot is running! Press Ctrl+C to stop.")
print("=" * 60 + "\n") print("=" * 60 + "\n")
while True: # Wait for shutdown signal
await asyncio.sleep(1) await self.shutdown_event.wait()
except KeyboardInterrupt:
print("\n\n[Shutdown] Received interrupt signal...")
except Exception as e: except Exception as e:
print(f"\n[Error] {e}") print(f"\n[Error] {e}")
traceback.print_exc() traceback.print_exc()
@@ -195,6 +208,16 @@ def main() -> None:
action="store_true", action="store_true",
help="Run health check", help="Run health check",
) )
parser.add_argument(
"--setup-google",
action="store_true",
help="Set up Google OAuth for Gmail/Calendar integration",
)
parser.add_argument(
"--manual",
action="store_true",
help="Use manual OAuth code entry (for headless servers)",
)
args = parser.parse_args() args = parser.parse_args()
@@ -209,6 +232,32 @@ def main() -> None:
print("3. Run: python bot_runner.py") print("3. Run: python bot_runner.py")
return return
if args.setup_google:
print("=" * 60)
print("Google OAuth Setup")
print("=" * 60)
print()
oauth_manager = GoogleOAuthManager()
if oauth_manager.is_authorized():
print("✓ Already authorized!")
print(f"✓ Tokens found at {oauth_manager.token_file}")
print("\nTo re-authorize, delete the token file and run this command again.")
return
success = oauth_manager.run_oauth_flow(manual=args.manual)
if success:
print("You can now use Gmail and Calendar tools!")
print("\nTest it:")
print(" Via Telegram: \"What's on my calendar?\"")
print(" Via Telegram: \"Send an email to john@example.com\"")
else:
print("\nSetup failed. Please check:")
print("1. config/google_credentials.yaml exists with valid client_id/client_secret")
print("2. You authorized the app in your browser")
print("3. No firewall blocking localhost:8080")
return
runner = BotRunner(config_file=args.config) runner = BotRunner(config_file=args.config)
if args.health: if args.health:

116
config/scheduled_tasks.yaml Normal file
View File

@@ -0,0 +1,116 @@
# Scheduled Tasks Configuration
# Tasks that require the Agent/LLM to execute
tasks:
# Morning briefing - sent to Slack/Telegram
- name: morning-weather
prompt: |
Current weather report for my location. Just the weather - keep it brief.
schedule: "daily 06:00"
enabled: true
send_to_platform: "telegram"
send_to_channel: "8088983654" # Your Telegram user ID
# Daily API cost report
- name: daily-cost-report
prompt: |
Generate a daily API usage and cost report:
Read the usage_data.json file to get today's API usage statistics.
Format the report as follows:
📊 **Daily API Usage Report**
**Today's Stats:**
- Total API calls: [count]
- Input tokens: [count]
- Output tokens: [count]
- Cache hits: [count] (if any)
**Costs:**
- Today: $[amount]
- Model breakdown: [breakdown by model]
**Budget Tracking:**
- Remaining budget: $19.86
- 75% threshold: $14.90 (⚠️ WARN IF EXCEEDED)
- Status: [On track / Warning - approaching 75% / Critical - over 75%]
⚠️ **IMPORTANT:** If cumulative cost exceeds $14.90 (75% of $19.86), display a clear warning message.
Keep it clear and actionable!
schedule: "daily 23:00"
enabled: true
send_to_platform: "telegram"
send_to_channel: "8088983654"
# Evening summary
- name: evening-report
prompt: |
Good evening! Time for the daily wrap-up:
1. What was accomplished today?
2. Any tasks still pending?
3. Preview of tomorrow's priorities
4. Weather forecast for tomorrow (infer or say API needed)
Keep it concise and positive.
schedule: "daily 18:00"
enabled: false
send_to_platform: "telegram"
send_to_channel: "123456789" # Replace with chat ID
# Hourly health check (no message sending)
- name: system-health-check
prompt: |
Quick health check:
1. Are there any tasks that have been pending > 24 hours?
2. Is the memory system healthy?
3. Any alerts or issues?
Respond with "HEALTHY" if all is well, otherwise describe the issue.
schedule: "hourly"
enabled: false
username: "health-checker"
# Weekly review on Friday
- name: weekly-summary
prompt: |
It's Friday! Time for the weekly review:
1. Major accomplishments this week
2. Challenges faced and lessons learned
3. Key metrics (tasks completed, etc.)
4. Goals for next week
5. Team shoutouts (if applicable)
Make it comprehensive but engaging.
schedule: "weekly fri 17:00"
enabled: false
send_to_platform: "slack"
send_to_channel: "C12345"
# Custom: Midday standup
- name: midday-standup
prompt: |
Midday check-in! Quick standup report:
1. Morning accomplishments
2. Current focus
3. Any blockers?
4. Afternoon plan
Keep it brief - standup style.
schedule: "daily 12:00"
enabled: false
send_to_platform: "slack"
send_to_channel: "C12345"
# Configuration notes:
# - schedule formats:
# - "hourly" - Every hour on the hour
# - "daily HH:MM" - Every day at specified time (24h format)
# - "weekly day HH:MM" - Every week on specified day (mon, tue, wed, thu, fri, sat, sun)
# - send_to_platform: null = don't send to messaging (only log)
# - username: Agent memory username to use for this task

8
google_tools/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
"""Google Tools - Gmail, Calendar, and Contacts integration for ajarbot."""
from .calendar_client import CalendarClient
from .gmail_client import GmailClient
from .oauth_manager import GoogleOAuthManager
from .people_client import PeopleClient
__all__ = ["GoogleOAuthManager", "GmailClient", "CalendarClient", "PeopleClient"]

View File

@@ -0,0 +1,296 @@
"""Google Calendar API client for managing events."""
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .oauth_manager import GoogleOAuthManager
class CalendarClient:
"""Client for Google Calendar API operations."""
def __init__(self, oauth_manager: GoogleOAuthManager):
"""Initialize Calendar client.
Args:
oauth_manager: Initialized OAuth manager with valid credentials
"""
self.oauth_manager = oauth_manager
self.service = None
self._initialize_service()
def _initialize_service(self) -> bool:
"""Initialize Calendar API service.
Returns:
True if initialized successfully, False otherwise
"""
credentials = self.oauth_manager.get_credentials()
if not credentials:
return False
try:
self.service = build("calendar", "v3", credentials=credentials)
return True
except Exception as e:
print(f"[Calendar] Failed to initialize service: {e}")
return False
def list_events(
self,
days_ahead: int = 7,
calendar_id: str = "primary",
max_results: int = 20,
) -> Dict:
"""List upcoming events.
Args:
days_ahead: Number of days ahead to look (max: 30)
calendar_id: Calendar ID (default: "primary")
max_results: Maximum number of events to return
Returns:
Dict with success status and events or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
# Limit days_ahead to 30
days_ahead = min(days_ahead, 30)
now = datetime.utcnow()
time_min = now.isoformat() + "Z"
time_max = (now + timedelta(days=days_ahead)).isoformat() + "Z"
events_result = (
self.service.events()
.list(
calendarId=calendar_id,
timeMin=time_min,
timeMax=time_max,
maxResults=max_results,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if not events:
return {
"success": True,
"events": [],
"count": 0,
"summary": f"No events in the next {days_ahead} days.",
}
# Format events
formatted_events = []
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
end = event["end"].get("dateTime", event["end"].get("date"))
formatted_events.append({
"id": event["id"],
"summary": event.get("summary", "No title"),
"start": start,
"end": end,
"location": event.get("location", ""),
"description": event.get("description", ""),
"html_link": event.get("htmlLink", ""),
})
summary = self._format_events_summary(formatted_events)
return {
"success": True,
"events": formatted_events,
"count": len(formatted_events),
"summary": summary,
}
except HttpError as e:
return {"success": False, "error": str(e)}
def create_event(
self,
summary: str,
start_time: str,
end_time: str,
description: str = "",
location: str = "",
calendar_id: str = "primary",
) -> Dict:
"""Create a new calendar event.
Args:
summary: Event title
start_time: Start time (ISO 8601 format)
end_time: End time (ISO 8601 format)
description: Optional event description
location: Optional event location
calendar_id: Calendar ID (default: "primary")
Returns:
Dict with success status and event details or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
# Detect if all-day event (time is 00:00:00)
is_all_day = "T00:00:00" in start_time
if is_all_day:
# Use date format for all-day events
event_body = {
"summary": summary,
"start": {"date": start_time.split("T")[0]},
"end": {"date": end_time.split("T")[0]},
}
else:
# Use dateTime format for timed events
event_body = {
"summary": summary,
"start": {"dateTime": start_time, "timeZone": "UTC"},
"end": {"dateTime": end_time, "timeZone": "UTC"},
}
if description:
event_body["description"] = description
if location:
event_body["location"] = location
event = (
self.service.events()
.insert(calendarId=calendar_id, body=event_body)
.execute()
)
return {
"success": True,
"event_id": event.get("id"),
"html_link": event.get("htmlLink"),
"summary": event.get("summary"),
"start": event["start"].get("dateTime", event["start"].get("date")),
}
except HttpError as e:
return {"success": False, "error": str(e)}
def search_events(
self,
query: str,
calendar_id: str = "primary",
) -> Dict:
"""Search calendar events by text query.
Args:
query: Search query string
calendar_id: Calendar ID (default: "primary")
Returns:
Dict with success status and matching events or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
events_result = (
self.service.events()
.list(
calendarId=calendar_id,
q=query,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if not events:
return {
"success": True,
"events": [],
"count": 0,
"summary": f'No events found matching "{query}".',
}
# Format events
formatted_events = []
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
end = event["end"].get("dateTime", event["end"].get("date"))
formatted_events.append({
"id": event["id"],
"summary": event.get("summary", "No title"),
"start": start,
"end": end,
"location": event.get("location", ""),
"description": event.get("description", ""),
})
summary = self._format_events_summary(formatted_events)
return {
"success": True,
"events": formatted_events,
"count": len(formatted_events),
"summary": summary,
}
except HttpError as e:
return {"success": False, "error": str(e)}
def _format_events_summary(self, events: List[Dict]) -> str:
"""Format events into readable summary.
Args:
events: List of event dicts
Returns:
Formatted string summary
"""
if not events:
return "No events."
lines = []
for i, event in enumerate(events, 1):
start = event["start"]
# Parse datetime for better formatting
try:
if "T" in start:
dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
start_str = dt.strftime("%b %d at %I:%M %p")
else:
dt = datetime.fromisoformat(start)
start_str = dt.strftime("%b %d (all day)")
except:
start_str = start
lines.append(f"{i}. {event['summary']} - {start_str}")
if event.get("location"):
lines.append(f" Location: {event['location']}")
return "\n".join(lines)

View File

@@ -0,0 +1,220 @@
"""Gmail API client for sending and reading emails."""
from typing import Dict, List, Optional
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .oauth_manager import GoogleOAuthManager
from .utils import (
create_mime_message,
format_email_summary,
get_email_body,
parse_email_message,
)
class GmailClient:
"""Client for Gmail API operations."""
def __init__(self, oauth_manager: GoogleOAuthManager):
"""Initialize Gmail client.
Args:
oauth_manager: Initialized OAuth manager with valid credentials
"""
self.oauth_manager = oauth_manager
self.service = None
self._initialize_service()
def _initialize_service(self) -> bool:
"""Initialize Gmail API service.
Returns:
True if initialized successfully, False otherwise
"""
credentials = self.oauth_manager.get_credentials()
if not credentials:
return False
try:
self.service = build("gmail", "v1", credentials=credentials)
return True
except Exception as e:
print(f"[Gmail] Failed to initialize service: {e}")
return False
def send_email(
self,
to: str,
subject: str,
body: str,
cc: Optional[List[str]] = None,
reply_to_message_id: Optional[str] = None,
) -> Dict:
"""Send an email.
Args:
to: Recipient email address
subject: Email subject
body: Email body (plain text or HTML)
cc: Optional list of CC recipients
reply_to_message_id: Optional message ID to reply to (for threading)
Returns:
Dict with success status and message_id or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
message = create_mime_message(
to=to,
subject=subject,
body=body,
cc=cc,
reply_to_message_id=reply_to_message_id,
)
result = (
self.service.users()
.messages()
.send(userId="me", body=message)
.execute()
)
return {
"success": True,
"message_id": result.get("id"),
"thread_id": result.get("threadId"),
}
except HttpError as e:
return {"success": False, "error": str(e)}
def search_emails(
self,
query: str = "",
max_results: int = 10,
include_body: bool = False,
) -> Dict:
"""Search emails using Gmail search syntax.
Args:
query: Gmail search query (e.g., "from:john@example.com after:2026/02/10")
max_results: Maximum number of results to return (max: 50)
include_body: Whether to include full email body
Returns:
Dict with success status and emails or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
# Limit max_results to 50 to avoid token overload
max_results = min(max_results, 50)
# Search for messages
results = (
self.service.users()
.messages()
.list(userId="me", q=query, maxResults=max_results)
.execute()
)
messages = results.get("messages", [])
if not messages:
return {
"success": True,
"emails": [],
"count": 0,
"summary": "No emails found.",
}
# Fetch full message details
emails = []
for msg in messages:
message = (
self.service.users()
.messages()
.get(userId="me", id=msg["id"], format="full")
.execute()
)
email_data = parse_email_message(message)
if include_body:
email_data["body"] = get_email_body(message)
emails.append(email_data)
summary = format_email_summary(emails, include_body=include_body)
return {
"success": True,
"emails": emails,
"count": len(emails),
"summary": summary,
}
except HttpError as e:
return {"success": False, "error": str(e)}
def get_email(self, message_id: str, format_type: str = "text") -> Dict:
"""Get full email by ID.
Args:
message_id: Gmail message ID
format_type: "text" or "html" (default: text)
Returns:
Dict with success status and email data or error
"""
if not self.service:
if not self._initialize_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
message = (
self.service.users()
.messages()
.get(userId="me", id=message_id, format="full")
.execute()
)
email_data = parse_email_message(message)
email_data["body"] = get_email_body(message)
# Get attachment info if any
payload = message.get("payload", {})
attachments = []
for part in payload.get("parts", []):
if part.get("filename"):
attachments.append({
"filename": part["filename"],
"mime_type": part.get("mimeType"),
"size": part.get("body", {}).get("size", 0),
})
email_data["attachments"] = attachments
return {
"success": True,
"email": email_data,
}
except HttpError as e:
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,244 @@
"""OAuth2 Manager for Google APIs (Gmail, Calendar, Contacts)."""
import json
import os
import webbrowser
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from threading import Thread
from typing import Dict, Optional
from urllib.parse import parse_qs, urlparse
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
# OAuth scopes
SCOPES = [
"https://www.googleapis.com/auth/gmail.modify",
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/contacts",
]
# Token file location
TOKEN_FILE = Path("config/google_oauth_token.json")
CREDENTIALS_FILE = Path("config/google_credentials.yaml")
class OAuthCallbackHandler(BaseHTTPRequestHandler):
"""Handles OAuth2 callback from browser."""
authorization_code: Optional[str] = None
def do_GET(self) -> None:
"""Handle GET request from OAuth callback."""
query = urlparse(self.path).query
params = parse_qs(query)
if "code" in params:
OAuthCallbackHandler.authorization_code = params["code"][0]
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
b"<html><body><h1>Authorization successful!</h1>"
b"<p>You can close this window and return to the terminal.</p>"
b"</body></html>"
)
else:
self.send_response(400)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
b"<html><body><h1>Authorization failed</h1>"
b"<p>No authorization code received.</p></body></html>"
)
def log_message(self, format: str, *args) -> None:
"""Suppress logging to keep console clean."""
pass
class GoogleOAuthManager:
"""Manages OAuth2 authentication for Google APIs."""
def __init__(
self,
token_file: Path = TOKEN_FILE,
credentials_file: Path = CREDENTIALS_FILE,
) -> None:
self.token_file = token_file
self.credentials_file = credentials_file
self.credentials: Optional[Credentials] = None
def is_authorized(self) -> bool:
"""Check if valid OAuth tokens exist."""
if not self.token_file.exists():
return False
try:
self.credentials = Credentials.from_authorized_user_file(
str(self.token_file), SCOPES
)
return self.credentials and self.credentials.valid
except Exception:
return False
def get_credentials(self) -> Optional[Credentials]:
"""Get valid OAuth credentials, refreshing if needed.
Returns:
Credentials object if authorized, None otherwise.
"""
# Load existing tokens
if self.token_file.exists():
try:
self.credentials = Credentials.from_authorized_user_file(
str(self.token_file), SCOPES
)
except Exception as e:
print(f"[OAuth] Error loading tokens: {e}")
return None
# Refresh if expired
if self.credentials and not self.credentials.valid:
if self.credentials.expired and self.credentials.refresh_token:
try:
print("[OAuth] Refreshing access token...")
self.credentials.refresh(Request())
self._save_credentials()
print("[OAuth] Token refreshed successfully")
except Exception as e:
print(f"[OAuth] Token refresh failed: {e}")
print("[OAuth] Please run: python bot_runner.py --setup-google")
return None
else:
print("[OAuth] Credentials invalid, re-authorization needed")
print("[OAuth] Please run: python bot_runner.py --setup-google")
return None
return self.credentials
def needs_refresh_soon(self) -> bool:
"""Check if token will expire within 5 minutes."""
if not self.credentials or not self.credentials.expiry:
return False
expiry_threshold = datetime.utcnow() + timedelta(minutes=5)
return self.credentials.expiry < expiry_threshold
def run_oauth_flow(self, manual: bool = False) -> bool:
"""Run OAuth2 authorization flow.
Args:
manual: If True, requires manual code paste instead of browser callback.
Returns:
True if authorization successful, False otherwise.
"""
if not self.credentials_file.exists():
print(f"[OAuth] Error: {self.credentials_file} not found")
print("[OAuth] Please create config/google_credentials.yaml with:")
print(" client_id: YOUR_CLIENT_ID")
print(" client_secret: YOUR_CLIENT_SECRET")
return False
try:
# Load client credentials
import yaml
with open(self.credentials_file) as f:
creds_config = yaml.safe_load(f)
client_config = {
"installed": {
"client_id": creds_config["client_id"],
"client_secret": creds_config["client_secret"],
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": ["http://localhost:8081"],
}
}
# Both manual and automatic modes use local server
# (Google deprecated OOB flow in 2022)
flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
flow.redirect_uri = "http://localhost:8081"
auth_url, _ = flow.authorization_url(prompt="consent")
print("\nPlease visit this URL to authorize:")
print(f"\n{auth_url}\n")
# Auto-open browser unless manual mode
if not manual:
try:
webbrowser.open(auth_url)
print("Opening browser automatically...\n")
except Exception:
print("Could not open browser automatically. Please visit the URL above.\n")
else:
print("Please open the URL above in your browser manually.\n")
# Start local server to receive callback
server = HTTPServer(("localhost", 8081), OAuthCallbackHandler)
print("Waiting for authorization... (press Ctrl+C to cancel)\n")
# Wait for callback
server.handle_request()
if OAuthCallbackHandler.authorization_code:
flow.fetch_token(code=OAuthCallbackHandler.authorization_code)
self.credentials = flow.credentials
else:
print("[OAuth] Authorization failed - no code received")
return False
# Save tokens
self._save_credentials()
print("\n✓ Authorization successful!")
print(f"✓ Tokens saved to {self.token_file}\n")
return True
except Exception as e:
print(f"[OAuth] Authorization failed: {e}")
return False
def _save_credentials(self) -> None:
"""Save credentials to token file with secure permissions."""
if not self.credentials:
return
# Ensure config directory exists
self.token_file.parent.mkdir(parents=True, exist_ok=True)
# Write to temp file first (atomic write)
temp_file = self.token_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
f.write(self.credentials.to_json())
# Set file permissions to 600 (owner read/write only) on Unix systems
if os.name != "nt": # Not Windows
os.chmod(temp_file, 0o600)
# Atomic rename
temp_file.replace(self.token_file)
def revoke_authorization(self) -> bool:
"""Revoke OAuth authorization and delete tokens.
Returns:
True if revoked successfully, False otherwise.
"""
if not self.credentials:
return False
try:
self.credentials.revoke(Request())
if self.token_file.exists():
self.token_file.unlink()
print("[OAuth] Authorization revoked successfully")
return True
except Exception as e:
print(f"[OAuth] Failed to revoke authorization: {e}")
return False

View File

@@ -0,0 +1,327 @@
"""Google People API client for managing contacts."""
from typing import Any, Dict, List, Optional
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .oauth_manager import GoogleOAuthManager
PERSON_FIELDS = "names,emailAddresses,phoneNumbers,biographies"
class PeopleClient:
"""Client for Google People API operations."""
def __init__(self, oauth_manager: GoogleOAuthManager):
"""Initialize People client.
Args:
oauth_manager: Initialized OAuth manager with valid credentials
"""
self.oauth_manager = oauth_manager
self.service = None
self._initialize_service()
def _initialize_service(self) -> bool:
"""Initialize People API service.
Returns:
True if initialized successfully, False otherwise
"""
credentials = self.oauth_manager.get_credentials()
if not credentials:
return False
try:
self.service = build("people", "v1", credentials=credentials)
return True
except Exception as e:
print(f"[People] Failed to initialize service: {e}")
return False
def _ensure_service(self) -> bool:
"""Ensure the service is initialized."""
if not self.service:
return self._initialize_service()
return True
def create_contact(
self,
given_name: str,
family_name: str = "",
email: str = "",
phone: Optional[str] = None,
notes: Optional[str] = None,
) -> Dict:
"""Create a new contact.
Args:
given_name: First name
family_name: Last name
email: Email address
phone: Optional phone number
notes: Optional notes/biography
Returns:
Dict with success status and contact details or error
"""
if not self._ensure_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
body: Dict[str, Any] = {
"names": [{"givenName": given_name, "familyName": family_name}],
}
if email:
body["emailAddresses"] = [{"value": email}]
if phone:
body["phoneNumbers"] = [{"value": phone}]
if notes:
body["biographies"] = [{"value": notes, "contentType": "TEXT_PLAIN"}]
result = (
self.service.people()
.createContact(body=body)
.execute()
)
return {
"success": True,
"resource_name": result.get("resourceName", ""),
"name": f"{given_name} {family_name}".strip(),
}
except HttpError as e:
return {"success": False, "error": str(e)}
def list_contacts(
self,
max_results: int = 100,
query: Optional[str] = None,
) -> Dict:
"""List or search contacts.
Args:
max_results: Maximum number of contacts to return (max: 1000)
query: Optional search query string
Returns:
Dict with success status and contacts or error
"""
if not self._ensure_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
max_results = min(max_results, 1000)
if query:
# Use searchContacts for query-based search
result = (
self.service.people()
.searchContacts(
query=query,
readMask="names,emailAddresses,phoneNumbers,biographies",
pageSize=min(max_results, 30), # searchContacts max is 30
)
.execute()
)
people = [r["person"] for r in result.get("results", [])]
else:
# Use connections.list for full listing
result = (
self.service.people()
.connections()
.list(
resourceName="people/me",
pageSize=max_results,
personFields=PERSON_FIELDS,
sortOrder="LAST_NAME_ASCENDING",
)
.execute()
)
people = result.get("connections", [])
contacts = [self._format_contact(p) for p in people]
return {
"success": True,
"contacts": contacts,
"count": len(contacts),
"summary": self._format_contacts_summary(contacts),
}
except HttpError as e:
return {"success": False, "error": str(e)}
def get_contact(self, resource_name: str) -> Dict:
"""Get a single contact by resource name.
Args:
resource_name: Contact resource name (e.g., "people/c1234567890")
Returns:
Dict with success status and contact details or error
"""
if not self._ensure_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
result = (
self.service.people()
.get(resourceName=resource_name, personFields=PERSON_FIELDS)
.execute()
)
return {
"success": True,
"contact": self._format_contact(result),
}
except HttpError as e:
return {"success": False, "error": str(e)}
def update_contact(self, resource_name: str, updates: Dict) -> Dict:
"""Update an existing contact.
Args:
resource_name: Contact resource name (e.g., "people/c1234567890")
updates: Dict with fields to update (given_name, family_name, email, phone, notes)
Returns:
Dict with success status or error
"""
if not self._ensure_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
# Get current contact to obtain etag
current = (
self.service.people()
.get(resourceName=resource_name, personFields=PERSON_FIELDS)
.execute()
)
body: Dict[str, Any] = {"etag": current["etag"]}
update_fields = []
if "given_name" in updates or "family_name" in updates:
names = current.get("names", [{}])
name = names[0] if names else {}
body["names"] = [{
"givenName": updates.get("given_name", name.get("givenName", "")),
"familyName": updates.get("family_name", name.get("familyName", "")),
}]
update_fields.append("names")
if "email" in updates:
body["emailAddresses"] = [{"value": updates["email"]}]
update_fields.append("emailAddresses")
if "phone" in updates:
body["phoneNumbers"] = [{"value": updates["phone"]}]
update_fields.append("phoneNumbers")
if "notes" in updates:
body["biographies"] = [{"value": updates["notes"], "contentType": "TEXT_PLAIN"}]
update_fields.append("biographies")
if not update_fields:
return {"success": False, "error": "No valid fields to update"}
result = (
self.service.people()
.updateContact(
resourceName=resource_name,
body=body,
updatePersonFields=",".join(update_fields),
)
.execute()
)
return {
"success": True,
"resource_name": result.get("resourceName", resource_name),
"updated_fields": update_fields,
}
except HttpError as e:
return {"success": False, "error": str(e)}
def delete_contact(self, resource_name: str) -> Dict:
"""Delete a contact.
Args:
resource_name: Contact resource name (e.g., "people/c1234567890")
Returns:
Dict with success status or error
"""
if not self._ensure_service():
return {
"success": False,
"error": "Not authorized. Run: python bot_runner.py --setup-google",
}
try:
self.service.people().deleteContact(
resourceName=resource_name,
).execute()
return {"success": True, "deleted": resource_name}
except HttpError as e:
return {"success": False, "error": str(e)}
def _format_contact(self, person: Dict) -> Dict:
"""Format a person resource into a simple contact dict."""
names = person.get("names", [])
emails = person.get("emailAddresses", [])
phones = person.get("phoneNumbers", [])
bios = person.get("biographies", [])
name = names[0] if names else {}
return {
"resource_name": person.get("resourceName", ""),
"given_name": name.get("givenName", ""),
"family_name": name.get("familyName", ""),
"display_name": name.get("displayName", ""),
"email": emails[0].get("value", "") if emails else "",
"phone": phones[0].get("value", "") if phones else "",
"notes": bios[0].get("value", "") if bios else "",
}
def _format_contacts_summary(self, contacts: List[Dict]) -> str:
"""Format contacts into a readable summary."""
if not contacts:
return "No contacts found."
lines = []
for i, c in enumerate(contacts, 1):
name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip()
parts = [f"{i}. {name or '(no name)'}"]
if c.get("email"):
parts.append(f" Email: {c['email']}")
if c.get("phone"):
parts.append(f" Phone: {c['phone']}")
lines.append("\n".join(parts))
return "\n".join(lines)

208
google_tools/utils.py Normal file
View File

@@ -0,0 +1,208 @@
"""Utility functions for Gmail/Calendar tools."""
import base64
import email
import re
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html.parser import HTMLParser
from typing import Dict, List, Optional
class HTMLToText(HTMLParser):
"""Convert HTML to plain text."""
def __init__(self):
super().__init__()
self.text = []
self.skip = False
def handle_data(self, data):
if not self.skip:
self.text.append(data)
def handle_starttag(self, tag, attrs):
if tag in ["script", "style"]:
self.skip = True
elif tag == "br":
self.text.append("\n")
elif tag == "p":
self.text.append("\n\n")
def handle_endtag(self, tag):
if tag in ["script", "style"]:
self.skip = False
elif tag in ["p", "div"]:
self.text.append("\n")
def get_text(self):
return "".join(self.text).strip()
def html_to_text(html: str) -> str:
"""Convert HTML to plain text.
Args:
html: HTML content
Returns:
Plain text content
"""
parser = HTMLToText()
parser.feed(html)
return parser.get_text()
def create_mime_message(
to: str,
subject: str,
body: str,
from_email: str = "me",
cc: Optional[List[str]] = None,
reply_to_message_id: Optional[str] = None,
) -> Dict:
"""Create a MIME message for Gmail API.
Args:
to: Recipient email address
subject: Email subject
body: Email body (plain text or HTML)
from_email: Sender email (default: "me")
cc: Optional list of CC recipients
reply_to_message_id: Optional message ID to reply to
Returns:
Dict with 'raw' key containing base64url-encoded message
"""
message = MIMEMultipart("alternative")
message["To"] = to
message["From"] = from_email
message["Subject"] = subject
if cc:
message["Cc"] = ", ".join(cc)
if reply_to_message_id:
message["In-Reply-To"] = reply_to_message_id
message["References"] = reply_to_message_id
# Try to detect if body is HTML
is_html = bool(re.search(r"<[a-z][\s\S]*>", body, re.IGNORECASE))
if is_html:
# Add both plain text and HTML versions
text_part = MIMEText(html_to_text(body), "plain")
html_part = MIMEText(body, "html")
message.attach(text_part)
message.attach(html_part)
else:
# Plain text only
text_part = MIMEText(body, "plain")
message.attach(text_part)
# Encode as base64url
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {"raw": raw_message}
def parse_email_message(message: Dict) -> Dict:
"""Parse Gmail API message into readable format.
Args:
message: Gmail API message object
Returns:
Dict with parsed fields: from, to, subject, date, body, snippet
"""
headers = {
h["name"].lower(): h["value"]
for h in message.get("payload", {}).get("headers", [])
}
result = {
"id": message.get("id"),
"thread_id": message.get("threadId"),
"from": headers.get("from", ""),
"to": headers.get("to", ""),
"cc": headers.get("cc", ""),
"subject": headers.get("subject", ""),
"date": headers.get("date", ""),
"snippet": message.get("snippet", ""),
"labels": message.get("labelIds", []),
}
return result
def get_email_body(message: Dict) -> str:
"""Extract email body from Gmail API message.
Args:
message: Gmail API message object
Returns:
Email body as plain text
"""
payload = message.get("payload", {})
def get_body_from_part(part: Dict) -> Optional[str]:
"""Recursively extract body from MIME parts."""
mime_type = part.get("mimeType", "")
body_data = part.get("body", {}).get("data")
if body_data:
decoded = base64.urlsafe_b64decode(body_data).decode("utf-8", errors="ignore")
if mime_type == "text/html":
return html_to_text(decoded)
elif mime_type == "text/plain":
return decoded
# Check nested parts
for subpart in part.get("parts", []):
result = get_body_from_part(subpart)
if result:
return result
return None
# Try to get body
body = get_body_from_part(payload)
if not body:
# Fallback to snippet
body = message.get("snippet", "")
return body
def format_email_summary(emails: List[Dict], include_body: bool = False) -> str:
"""Format emails into a readable summary.
Args:
emails: List of parsed email dicts
include_body: Whether to include full body
Returns:
Formatted string summary
"""
if not emails:
return "No emails found."
lines = []
for i, email_data in enumerate(emails, 1):
lines.append(f"{i}. From: {email_data['from']}")
lines.append(f" Subject: {email_data['subject']}")
lines.append(f" Date: {email_data['date']}")
if include_body and "body" in email_data:
# Truncate long bodies
body = email_data["body"]
if len(body) > 500:
body = body[:500] + "..."
lines.append(f" Body: {body}")
else:
lines.append(f" Snippet: {email_data['snippet']}")
lines.append("") # Blank line
return "\n".join(lines)

View File

@@ -17,3 +17,9 @@ slack-sdk>=3.23.0
# Telegram adapter # Telegram adapter
python-telegram-bot>=20.7 python-telegram-bot>=20.7
# Google API dependencies (Gmail and Calendar)
google-auth>=2.23.0
google-auth-oauthlib>=1.1.0
google-auth-httplib2>=0.1.1
google-api-python-client>=2.108.0

View File

@@ -93,7 +93,7 @@ class TaskScheduler:
# Track file modification time # Track file modification time
self._last_mtime = self.config_file.stat().st_mtime self._last_mtime = self.config_file.stat().st_mtime
with open(self.config_file) as f: with open(self.config_file, encoding="utf-8") as f:
config = yaml.safe_load(f) or {} config = yaml.safe_load(f) or {}
self.tasks.clear() # Clear existing tasks before reload self.tasks.clear() # Clear existing tasks before reload
@@ -155,7 +155,7 @@ class TaskScheduler:
} }
self.config_file.parent.mkdir(parents=True, exist_ok=True) self.config_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_file, "w") as f: with open(self.config_file, "w", encoding="utf-8") as f:
yaml.dump( yaml.dump(
default_config, f, default_config, f,
default_flow_style=False, default_flow_style=False,

580
tools.py
View File

@@ -3,7 +3,12 @@
import os import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
# Google tools (lazy loaded when needed)
_gmail_client: Optional[Any] = None
_calendar_client: Optional[Any] = None
_people_client: Optional[Any] = None
# Tool definitions in Anthropic's tool use format # Tool definitions in Anthropic's tool use format
@@ -95,12 +100,234 @@ TOOL_DEFINITIONS = [
"required": ["command"], "required": ["command"],
}, },
}, },
# Gmail tools
{
"name": "send_email",
"description": "Send an email from the bot's Gmail account. Requires prior OAuth setup (--setup-google).",
"input_schema": {
"type": "object",
"properties": {
"to": {
"type": "string",
"description": "Recipient email address",
},
"subject": {
"type": "string",
"description": "Email subject line",
},
"body": {
"type": "string",
"description": "Email body (plain text or HTML)",
},
"cc": {
"type": "array",
"items": {"type": "string"},
"description": "Optional list of CC recipients",
},
"reply_to_message_id": {
"type": "string",
"description": "Optional Gmail message ID to reply to (for threading)",
},
},
"required": ["to", "subject", "body"],
},
},
{
"name": "read_emails",
"description": "Search and read emails from the bot's Gmail account using Gmail search syntax (e.g., 'from:user@example.com after:2026/02/10').",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Gmail search query (supports from, to, subject, after, before, has:attachment, etc.)",
"default": "",
},
"max_results": {
"type": "integer",
"description": "Maximum number of emails to return (default: 10, max: 50)",
"default": 10,
},
"include_body": {
"type": "boolean",
"description": "Whether to include full email body (default: false, shows snippet only)",
"default": False,
},
},
},
},
{
"name": "get_email",
"description": "Get full content of a specific email by its Gmail message ID.",
"input_schema": {
"type": "object",
"properties": {
"message_id": {
"type": "string",
"description": "Gmail message ID",
},
"format": {
"type": "string",
"description": "Format type: 'text' or 'html' (default: text)",
"default": "text",
},
},
"required": ["message_id"],
},
},
# Calendar tools
{
"name": "read_calendar",
"description": "Read upcoming events from Google Calendar. Shows events from today onwards.",
"input_schema": {
"type": "object",
"properties": {
"days_ahead": {
"type": "integer",
"description": "Number of days ahead to look (default: 7, max: 30)",
"default": 7,
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary' for main calendar)",
"default": "primary",
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return (default: 20)",
"default": 20,
},
},
},
},
{
"name": "create_calendar_event",
"description": "Create a new event in Google Calendar. Use ISO 8601 format for times (e.g., '2026-02-14T10:00:00Z').",
"input_schema": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "Event title/summary",
},
"start_time": {
"type": "string",
"description": "Event start time in ISO 8601 format (e.g., '2026-02-14T10:00:00Z')",
},
"end_time": {
"type": "string",
"description": "Event end time in ISO 8601 format",
},
"description": {
"type": "string",
"description": "Optional event description",
"default": "",
},
"location": {
"type": "string",
"description": "Optional event location",
"default": "",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["summary", "start_time", "end_time"],
},
},
{
"name": "search_calendar",
"description": "Search calendar events by text query. Searches event titles and descriptions.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query text",
},
"calendar_id": {
"type": "string",
"description": "Calendar ID (default: 'primary')",
"default": "primary",
},
},
"required": ["query"],
},
},
# Contacts tools
{
"name": "create_contact",
"description": "Create a new Google contact. Requires prior OAuth setup (--setup-google).",
"input_schema": {
"type": "object",
"properties": {
"given_name": {
"type": "string",
"description": "Contact's first name",
},
"family_name": {
"type": "string",
"description": "Contact's last name",
"default": "",
},
"email": {
"type": "string",
"description": "Contact's email address",
"default": "",
},
"phone": {
"type": "string",
"description": "Contact's phone number",
},
"notes": {
"type": "string",
"description": "Optional notes about the contact",
},
},
"required": ["given_name"],
},
},
{
"name": "list_contacts",
"description": "List or search Google contacts. Without a query, lists all contacts sorted by last name.",
"input_schema": {
"type": "object",
"properties": {
"max_results": {
"type": "integer",
"description": "Maximum number of contacts to return (default: 100, max: 1000)",
"default": 100,
},
"query": {
"type": "string",
"description": "Optional search query to filter contacts by name, email, etc.",
},
},
},
},
{
"name": "get_contact",
"description": "Get full details of a specific Google contact by resource name.",
"input_schema": {
"type": "object",
"properties": {
"resource_name": {
"type": "string",
"description": "Contact resource name (e.g., 'people/c1234567890')",
},
},
"required": ["resource_name"],
},
},
] ]
def execute_tool(tool_name: str, tool_input: Dict[str, Any]) -> str: def execute_tool(tool_name: str, tool_input: Dict[str, Any]) -> str:
"""Execute a tool and return the result as a string.""" """Execute a tool and return the result as a string."""
try: try:
# File tools
if tool_name == "read_file": if tool_name == "read_file":
return _read_file(tool_input["file_path"]) return _read_file(tool_input["file_path"])
elif tool_name == "write_file": elif tool_name == "write_file":
@@ -118,6 +345,65 @@ def execute_tool(tool_name: str, tool_input: Dict[str, Any]) -> str:
command = tool_input["command"] command = tool_input["command"]
working_dir = tool_input.get("working_dir", ".") working_dir = tool_input.get("working_dir", ".")
return _run_command(command, working_dir) return _run_command(command, working_dir)
# Gmail tools
elif tool_name == "send_email":
return _send_email(
to=tool_input["to"],
subject=tool_input["subject"],
body=tool_input["body"],
cc=tool_input.get("cc"),
reply_to_message_id=tool_input.get("reply_to_message_id"),
)
elif tool_name == "read_emails":
return _read_emails(
query=tool_input.get("query", ""),
max_results=tool_input.get("max_results", 10),
include_body=tool_input.get("include_body", False),
)
elif tool_name == "get_email":
return _get_email(
message_id=tool_input["message_id"],
format_type=tool_input.get("format", "text"),
)
# Calendar tools
elif tool_name == "read_calendar":
return _read_calendar(
days_ahead=tool_input.get("days_ahead", 7),
calendar_id=tool_input.get("calendar_id", "primary"),
max_results=tool_input.get("max_results", 20),
)
elif tool_name == "create_calendar_event":
return _create_calendar_event(
summary=tool_input["summary"],
start_time=tool_input["start_time"],
end_time=tool_input["end_time"],
description=tool_input.get("description", ""),
location=tool_input.get("location", ""),
calendar_id=tool_input.get("calendar_id", "primary"),
)
elif tool_name == "search_calendar":
return _search_calendar(
query=tool_input["query"],
calendar_id=tool_input.get("calendar_id", "primary"),
)
# Contacts tools
elif tool_name == "create_contact":
return _create_contact(
given_name=tool_input["given_name"],
family_name=tool_input.get("family_name", ""),
email=tool_input.get("email", ""),
phone=tool_input.get("phone"),
notes=tool_input.get("notes"),
)
elif tool_name == "list_contacts":
return _list_contacts(
max_results=tool_input.get("max_results", 100),
query=tool_input.get("query"),
)
elif tool_name == "get_contact":
return _get_contact(
resource_name=tool_input["resource_name"],
)
else: else:
return f"Error: Unknown tool '{tool_name}'" return f"Error: Unknown tool '{tool_name}'"
except Exception as e: except Exception as e:
@@ -235,3 +521,295 @@ def _run_command(command: str, working_dir: str) -> str:
return "Error: Command timed out after 30 seconds" return "Error: Command timed out after 30 seconds"
except Exception as e: except Exception as e:
return f"Error running command: {str(e)}" return f"Error running command: {str(e)}"
# Google Tools Handlers
def _initialize_google_clients() -> tuple[Optional[Any], Optional[Any], Optional[Any]]:
"""Lazy initialization of Google clients.
Returns:
Tuple of (gmail_client, calendar_client, people_client) or (None, None, None) if not authorized
"""
global _gmail_client, _calendar_client, _people_client
if _gmail_client is not None and _calendar_client is not None and _people_client is not None:
return _gmail_client, _calendar_client, _people_client
try:
from google_tools.oauth_manager import GoogleOAuthManager
from google_tools.gmail_client import GmailClient
from google_tools.calendar_client import CalendarClient
from google_tools.people_client import PeopleClient
oauth_manager = GoogleOAuthManager()
credentials = oauth_manager.get_credentials()
if not credentials:
return None, None, None
_gmail_client = GmailClient(oauth_manager)
_calendar_client = CalendarClient(oauth_manager)
_people_client = PeopleClient(oauth_manager)
return _gmail_client, _calendar_client, _people_client
except Exception as e:
print(f"[Google Tools] Failed to initialize: {e}")
return None, None, None
def _send_email(
to: str,
subject: str,
body: str,
cc: Optional[List[str]] = None,
reply_to_message_id: Optional[str] = None,
) -> str:
"""Send an email via Gmail API."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.send_email(
to=to,
subject=subject,
body=body,
cc=cc,
reply_to_message_id=reply_to_message_id,
)
if result["success"]:
msg_id = result.get("message_id", "unknown")
return f"✓ Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}"
else:
return f"Error sending email: {result.get('error', 'Unknown error')}"
def _read_emails(
query: str = "",
max_results: int = 10,
include_body: bool = False,
) -> str:
"""Search and read emails from Gmail."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.search_emails(
query=query,
max_results=max_results,
include_body=include_body,
)
if result["success"]:
summary = result.get("summary", "")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return summary
else:
return f"Error reading emails: {result.get('error', 'Unknown error')}"
def _get_email(message_id: str, format_type: str = "text") -> str:
"""Get full content of a specific email."""
gmail_client, _, _ = _initialize_google_clients()
if not gmail_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = gmail_client.get_email(message_id=message_id, format_type=format_type)
if result["success"]:
email_data = result.get("email", {})
output = []
output.append(f"From: {email_data.get('from', 'Unknown')}")
output.append(f"To: {email_data.get('to', 'Unknown')}")
if email_data.get("cc"):
output.append(f"CC: {email_data['cc']}")
output.append(f"Subject: {email_data.get('subject', 'No subject')}")
output.append(f"Date: {email_data.get('date', 'Unknown')}")
output.append(f"\n{email_data.get('body', '')}")
if email_data.get("attachments"):
output.append(f"\nAttachments: {', '.join(email_data['attachments'])}")
full_output = "\n".join(output)
if len(full_output) > _MAX_TOOL_OUTPUT:
full_output = full_output[:_MAX_TOOL_OUTPUT] + "\n... (email truncated)"
return full_output
else:
return f"Error getting email: {result.get('error', 'Unknown error')}"
def _read_calendar(
days_ahead: int = 7,
calendar_id: str = "primary",
max_results: int = 20,
) -> str:
"""Read upcoming calendar events."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.list_events(
days_ahead=days_ahead,
calendar_id=calendar_id,
max_results=max_results,
)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f"Upcoming events (next {days_ahead} days):\n\n{summary}"
else:
return f"Error reading calendar: {result.get('error', 'Unknown error')}"
def _create_calendar_event(
summary: str,
start_time: str,
end_time: str,
description: str = "",
location: str = "",
calendar_id: str = "primary",
) -> str:
"""Create a new calendar event."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.create_event(
summary=summary,
start_time=start_time,
end_time=end_time,
description=description,
location=location,
calendar_id=calendar_id,
)
if result["success"]:
event_id = result.get("event_id", "unknown")
html_link = result.get("html_link", "")
start = result.get("start", start_time)
output = [
f"✓ Calendar event created successfully!",
f"Title: {summary}",
f"Start: {start}",
f"Event ID: {event_id}",
]
if html_link:
output.append(f"Link: {html_link}")
if location:
output.append(f"Location: {location}")
return "\n".join(output)
else:
return f"Error creating calendar event: {result.get('error', 'Unknown error')}"
def _search_calendar(query: str, calendar_id: str = "primary") -> str:
"""Search calendar events by text query."""
_, calendar_client, _ = _initialize_google_clients()
if not calendar_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = calendar_client.search_events(query=query, calendar_id=calendar_id)
if result["success"]:
summary = result.get("summary", "No events found")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f'Search results for "{query}":\n\n{summary}'
else:
return f"Error searching calendar: {result.get('error', 'Unknown error')}"
# Contacts Tools Handlers
def _create_contact(
given_name: str,
family_name: str = "",
email: str = "",
phone: Optional[str] = None,
notes: Optional[str] = None,
) -> str:
"""Create a new Google contact."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.create_contact(
given_name=given_name,
family_name=family_name,
email=email,
phone=phone,
notes=notes,
)
if result["success"]:
name = result.get("name", given_name)
resource = result.get("resource_name", "")
return f"Contact created: {name}\nResource: {resource}"
else:
return f"Error creating contact: {result.get('error', 'Unknown error')}"
def _list_contacts(
max_results: int = 100,
query: Optional[str] = None,
) -> str:
"""List or search Google contacts."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.list_contacts(max_results=max_results, query=query)
if result["success"]:
summary = result.get("summary", "No contacts found.")
if len(summary) > _MAX_TOOL_OUTPUT:
summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
return f"Contacts ({result.get('count', 0)} found):\n\n{summary}"
else:
return f"Error listing contacts: {result.get('error', 'Unknown error')}"
def _get_contact(resource_name: str) -> str:
"""Get full details of a specific contact."""
_, _, people_client = _initialize_google_clients()
if not people_client:
return "Error: Google not authorized. Run: python bot_runner.py --setup-google"
result = people_client.get_contact(resource_name=resource_name)
if result["success"]:
c = result.get("contact", {})
output = []
name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip()
output.append(f"Name: {name or '(no name)'}")
if c.get("email"):
output.append(f"Email: {c['email']}")
if c.get("phone"):
output.append(f"Phone: {c['phone']}")
if c.get("notes"):
output.append(f"Notes: {c['notes']}")
output.append(f"Resource: {c.get('resource_name', resource_name)}")
return "\n".join(output)
else:
return f"Error getting contact: {result.get('error', 'Unknown error')}"