Files
ajarbot/google_tools/utils.py
Jordan Ramos dc14baf426 Add Gmail and Google Calendar integration
Implements on-demand Google tools (not an adapter) for email and calendar access via OAuth2.

Features:
- OAuth2 user consent flow with automatic token refresh
- 3 Gmail tools: send_email, read_emails, get_email
- 3 Calendar tools: read_calendar, create_calendar_event, search_calendar
- Lazy loading pattern for Google clients
- Secure token storage with file permissions
- Browser-based setup: python bot_runner.py --setup-google

Architecture:
- Tools-only approach (zero API calls when not in use)
- User-initiated actions only (no continuous polling)
- MIME message creation for emails with threading support
- HTML to text conversion for email parsing
- ISO 8601 timestamp handling for calendar events

Files added:
- google_tools/oauth_manager.py: OAuth2 flow and token management
- google_tools/gmail_client.py: Gmail API wrapper
- google_tools/calendar_client.py: Calendar API wrapper
- google_tools/utils.py: Email/MIME helpers
- config/scheduled_tasks.yaml: Example scheduled tasks config

Files modified:
- tools.py: Added 6 Google tool handlers with lazy initialization
- bot_runner.py: Added --setup-google command for OAuth authorization
- requirements.txt: Added Google API dependencies
- .gitignore: Added google_credentials.yaml and google_oauth_token.json

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-14 10:29:28 -07:00

209 lines
5.5 KiB
Python

"""Utility functions for Gmail/Calendar tools."""
import base64
import email
import re
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html.parser import HTMLParser
from typing import Dict, List, Optional
class HTMLToText(HTMLParser):
    """Parser that accumulates the visible text of an HTML document.

    Text found inside ``<script>`` and ``<style>`` elements is discarded.
    A ``<br>`` adds one newline, an opening ``<p>`` adds two newlines, and a
    closing ``</p>`` or ``</div>`` adds one newline.
    """

    def __init__(self):
        super().__init__()
        # Text fragments collected so far, in document order.
        self.text = []
        # True while the parser is inside a <script> or <style> element.
        self.skip = False

    def handle_starttag(self, tag, attrs):
        """Start suppressing text for script/style; emit breaks for br/p."""
        if tag in ("script", "style"):
            self.skip = True
            return
        if tag == "br":
            self.text.append("\n")
            return
        if tag == "p":
            self.text.append("\n\n")

    def handle_endtag(self, tag):
        """Stop suppressing text after script/style; end p/div with a newline."""
        if tag in ("script", "style"):
            self.skip = False
            return
        if tag in ("p", "div"):
            self.text.append("\n")

    def handle_data(self, data):
        """Record a text fragment unless it is currently being suppressed."""
        if self.skip:
            return
        self.text.append(data)

    def get_text(self):
        """Return everything collected so far, joined and stripped."""
        collected = "".join(self.text)
        return collected.strip()
def html_to_text(html: str) -> str:
    """Convert HTML to plain text.

    Args:
        html: HTML content

    Returns:
        Plain text content, as collected by ``HTMLToText`` and stripped of
        leading/trailing whitespace.
    """
    parser = HTMLToText()
    parser.feed(html)
    # feed() may hold back trailing input while it waits for more data (for
    # example text ending in a bare "&..." sequence such as "R&D", or an
    # unterminated tag). close() forces that buffered remainder through the
    # handlers so the end of the document is not silently dropped.
    parser.close()
    return parser.get_text()
def create_mime_message(
    to: str,
    subject: str,
    body: str,
    from_email: str = "me",
    cc: Optional[List[str]] = None,
    reply_to_message_id: Optional[str] = None,
) -> Dict:
    """Build a base64url-encoded MIME message wrapped in a dict.

    Args:
        to: Recipient email address
        subject: Email subject
        body: Email body (plain text or HTML; HTML is detected heuristically)
        from_email: Value for the From header (default: "me")
        cc: Optional list of CC recipients, joined with ", "
        reply_to_message_id: Optional message ID; when given it is written to
            both the In-Reply-To and References headers

    Returns:
        Dict with a single 'raw' key holding the base64url-encoded message
    """
    mime = MIMEMultipart("alternative")

    # Collect headers first so they are written in one place, in a fixed order.
    header_items = [("To", to), ("From", from_email), ("Subject", subject)]
    if cc:
        header_items.append(("Cc", ", ".join(cc)))
    if reply_to_message_id:
        header_items.append(("In-Reply-To", reply_to_message_id))
        header_items.append(("References", reply_to_message_id))
    for header_name, header_value in header_items:
        mime[header_name] = header_value

    # Heuristic: anything that looks like "<tag ...>" is treated as HTML.
    looks_like_html = (
        re.search(r"<[a-z][\s\S]*>", body, re.IGNORECASE) is not None
    )
    if looks_like_html:
        # Plain-text alternative first, HTML second.
        parts = [MIMEText(html_to_text(body), "plain"), MIMEText(body, "html")]
    else:
        parts = [MIMEText(body, "plain")]
    for part in parts:
        mime.attach(part)

    encoded = base64.urlsafe_b64encode(mime.as_bytes())
    return {"raw": encoded.decode()}
def parse_email_message(message: Dict) -> Dict:
    """Flatten a Gmail API message object into a simple dict.

    Header names are matched case-insensitively; if a header name occurs more
    than once, the last occurrence is used.

    Args:
        message: Gmail API message object

    Returns:
        Dict with keys: id, thread_id, from, to, cc, subject, date, snippet,
        labels. Missing headers and a missing snippet become empty strings;
        missing labels become an empty list. The body is not included.
    """
    payload = message.get("payload", {})

    # Lower-cased header name -> value lookup table.
    header_lookup = {}
    for header in payload.get("headers", []):
        header_lookup[header["name"].lower()] = header["value"]

    parsed = {
        "id": message.get("id"),
        "thread_id": message.get("threadId"),
    }
    for field in ("from", "to", "cc", "subject", "date"):
        parsed[field] = header_lookup.get(field, "")
    parsed["snippet"] = message.get("snippet", "")
    parsed["labels"] = message.get("labelIds", [])
    return parsed
def get_email_body(message: Dict) -> str:
    """Extract email body from Gmail API message.

    The payload is searched depth-first and the first part that yields
    non-empty text wins. ``text/plain`` parts are returned as decoded;
    ``text/html`` parts are converted with ``html_to_text``. If no part
    yields text, the message's ``snippet`` is returned instead.

    Args:
        message: Gmail API message object

    Returns:
        Email body as plain text ("" when neither a body nor a snippet exists)
    """
    payload = message.get("payload", {})

    def get_body_from_part(part: Dict) -> Optional[str]:
        """Recursively extract body from MIME parts."""
        mime_type = part.get("mimeType", "")
        body_data = part.get("body", {}).get("data")
        if body_data:
            # base64url data may legally omit its trailing "=" padding, but
            # the decoder insists on a length that is a multiple of 4, so
            # restore whatever padding is missing before decoding.
            pad_char = b"=" if isinstance(body_data, (bytes, bytearray)) else "="
            padded = body_data + pad_char * (-len(body_data) % 4)
            # Undecodable bytes are dropped rather than raising.
            decoded = base64.urlsafe_b64decode(padded).decode(
                "utf-8", errors="ignore"
            )
            if mime_type == "text/html":
                return html_to_text(decoded)
            elif mime_type == "text/plain":
                return decoded
        # Check nested parts
        for subpart in part.get("parts", []):
            result = get_body_from_part(subpart)
            if result:
                return result
        return None

    # Try to get body
    body = get_body_from_part(payload)
    if not body:
        # Fallback to snippet
        body = message.get("snippet", "")
    return body
def format_email_summary(emails: List[Dict], include_body: bool = False) -> str:
    """Render parsed emails as a numbered, human-readable listing.

    Each email produces From/Subject/Date lines, then either its body
    (cut to 500 characters plus "..." when longer) or its snippet, followed
    by an empty separator line.

    Args:
        emails: List of parsed email dicts
        include_body: Show the "body" entry when an email has one; emails
            without it fall back to the snippet

    Returns:
        Formatted string summary, or "No emails found." for an empty list
    """
    if not emails:
        return "No emails found."

    max_body_chars = 500
    output = []
    for position, entry in enumerate(emails, 1):
        output.extend(
            [
                f"{position}. From: {entry['from']}",
                f" Subject: {entry['subject']}",
                f" Date: {entry['date']}",
            ]
        )
        if include_body and "body" in entry:
            text = entry["body"]
            # Keep long bodies from flooding the summary.
            if len(text) > max_body_chars:
                text = text[:max_body_chars] + "..."
            detail = f" Body: {text}"
        else:
            detail = f" Snippet: {entry['snippet']}"
        output.append(detail)
        output.append("")  # separator between entries
    return "\n".join(output)