Files
ajarbot/google_tools/gmail_client.py

221 lines
6.4 KiB
Python
Raw Permalink Normal View History

"""Gmail API client for sending and reading emails."""
from typing import Dict, List, Optional
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .oauth_manager import GoogleOAuthManager
from .utils import (
create_mime_message,
format_email_summary,
get_email_body,
parse_email_message,
)
class GmailClient:
    """Client for Gmail API operations.

    Every public operation returns a plain dict carrying a ``"success"``
    flag. Gmail ``HttpError`` failures are reported through the dict's
    ``"error"`` entry instead of being raised to the caller.
    """

    # Upper bound on messages fetched per search, to avoid token overload.
    _MAX_SEARCH_RESULTS = 50

    # Error text returned when no authorized Gmail service can be built.
    _NOT_AUTHORIZED_ERROR = (
        "Not authorized. Run: python bot_runner.py --setup-google"
    )

    def __init__(self, oauth_manager: "GoogleOAuthManager"):
        """Initialize Gmail client.

        Service creation is attempted immediately; if it fails,
        ``self.service`` stays ``None`` and each operation retries it later.

        Args:
            oauth_manager: Initialized OAuth manager with valid credentials
        """
        self.oauth_manager = oauth_manager
        self.service = None
        self._initialize_service()

    def _initialize_service(self) -> bool:
        """Initialize Gmail API service.

        Returns:
            True if initialized successfully, False otherwise
        """
        credentials = self.oauth_manager.get_credentials()
        if not credentials:
            return False
        try:
            self.service = build("gmail", "v1", credentials=credentials)
            return True
        except Exception as e:
            # Best-effort: any failure here simply leaves the client
            # unauthorized, which callers see as an error dict.
            print(f"[Gmail] Failed to initialize service: {e}")
            return False

    def _ensure_service(self) -> bool:
        """Return True when a Gmail service is available, creating it if needed."""
        if self.service:
            return True
        return self._initialize_service()

    def _not_authorized(self) -> Dict:
        """Build the error result used when the client has no Gmail service."""
        return {"success": False, "error": self._NOT_AUTHORIZED_ERROR}

    @staticmethod
    def _collect_attachments(payload: Dict) -> List[Dict]:
        """Describe every named part found anywhere below *payload*.

        Message parts can themselves contain ``parts`` (multipart
        containers), so the tree is walked recursively rather than reading
        only the first level. Results are in document order.

        Args:
            payload: A message payload or message part dict

        Returns:
            List of dicts with ``filename``, ``mime_type`` and ``size``
        """
        attachments = []
        for part in payload.get("parts") or []:
            if part.get("filename"):
                attachments.append({
                    "filename": part["filename"],
                    "mime_type": part.get("mimeType"),
                    "size": part.get("body", {}).get("size", 0),
                })
            attachments.extend(GmailClient._collect_attachments(part))
        return attachments

    def send_email(
        self,
        to: str,
        subject: str,
        body: str,
        cc: Optional[List[str]] = None,
        reply_to_message_id: Optional[str] = None,
    ) -> Dict:
        """Send an email.

        Args:
            to: Recipient email address
            subject: Email subject
            body: Email body (plain text or HTML)
            cc: Optional list of CC recipients
            reply_to_message_id: Optional message ID to reply to (for threading)

        Returns:
            Dict with ``success`` plus ``message_id`` and ``thread_id`` on
            success, or ``success`` and ``error`` on failure
        """
        if not self._ensure_service():
            return self._not_authorized()
        try:
            message = create_mime_message(
                to=to,
                subject=subject,
                body=body,
                cc=cc,
                reply_to_message_id=reply_to_message_id,
            )
            result = (
                self.service.users()
                .messages()
                .send(userId="me", body=message)
                .execute()
            )
            return {
                "success": True,
                "message_id": result.get("id"),
                "thread_id": result.get("threadId"),
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}

    def search_emails(
        self,
        query: str = "",
        max_results: int = 10,
        include_body: bool = False,
    ) -> Dict:
        """Search emails using Gmail search syntax.

        Args:
            query: Gmail search query (e.g., "from:john@example.com after:2026/02/10")
            max_results: Maximum number of results to return; values outside
                1..50 are clamped into that range
            include_body: Whether to include full email body

        Returns:
            Dict with ``success``, ``emails``, ``count`` and ``summary`` on
            success, or ``success`` and ``error`` on failure
        """
        if not self._ensure_service():
            return self._not_authorized()
        try:
            # Clamp on both sides: the upper bound avoids token overload,
            # the lower bound keeps zero/negative values away from the API.
            max_results = max(1, min(max_results, self._MAX_SEARCH_RESULTS))

            # Search for messages
            results = (
                self.service.users()
                .messages()
                .list(userId="me", q=query, maxResults=max_results)
                .execute()
            )
            messages = results.get("messages", [])
            if not messages:
                return {
                    "success": True,
                    "emails": [],
                    "count": 0,
                    "summary": "No emails found.",
                }

            # The listing only yields message references, so each message
            # is fetched individually for its details.
            emails = []
            for msg in messages:
                message = (
                    self.service.users()
                    .messages()
                    .get(userId="me", id=msg["id"], format="full")
                    .execute()
                )
                email_data = parse_email_message(message)
                if include_body:
                    email_data["body"] = get_email_body(message)
                emails.append(email_data)

            summary = format_email_summary(emails, include_body=include_body)
            return {
                "success": True,
                "emails": emails,
                "count": len(emails),
                "summary": summary,
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}

    def get_email(self, message_id: str, format_type: str = "text") -> Dict:
        """Get full email by ID.

        Args:
            message_id: Gmail message ID
            format_type: "text" or "html" (default: text). Accepted for
                interface compatibility; this method does not currently
                read it, so the body is whatever ``get_email_body`` returns.

        Returns:
            Dict with ``success`` and ``email`` (parsed fields plus ``body``
            and ``attachments``) on success, or ``success`` and ``error``
            on failure
        """
        if not self._ensure_service():
            return self._not_authorized()
        try:
            message = (
                self.service.users()
                .messages()
                .get(userId="me", id=message_id, format="full")
                .execute()
            )
            email_data = parse_email_message(message)
            email_data["body"] = get_email_body(message)

            # Attachments may sit inside nested multipart containers, so
            # the whole part tree is searched, not just the first level.
            email_data["attachments"] = self._collect_attachments(
                message.get("payload", {})
            )
            return {
                "success": True,
                "email": email_data,
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}