"""Gmail API client for sending and reading emails."""

import base64
import os
from pathlib import Path
from typing import Dict, List, Optional

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from .oauth_manager import GoogleOAuthManager
from .utils import (
    create_mime_message,
    format_email_summary,
    get_email_body,
    parse_email_message,
)


class GmailClient:
    """Client for Gmail API operations.

    Every public method returns a plain ``dict`` carrying a boolean
    ``"success"`` key. On failure the dict also carries an ``"error"``
    string; on success it carries the method-specific payload keys.
    """

    # Hard cap on the number of search hits, to avoid token overload.
    MAX_SEARCH_RESULTS = 50

    # Message returned when no Gmail service could be built from the
    # OAuth manager's credentials.
    _NOT_AUTHORIZED_ERROR = (
        "Not authorized. Run: python bot_runner.py --setup-google"
    )

    def __init__(self, oauth_manager: GoogleOAuthManager):
        """Initialize Gmail client.

        Construction attempts to build the API service immediately. If that
        fails, ``self.service`` stays ``None`` and each public method retries
        initialization on its next call.

        Args:
            oauth_manager: Initialized OAuth manager with valid credentials
        """
        self.oauth_manager = oauth_manager
        self.service = None
        self._initialize_service()

    def _initialize_service(self) -> bool:
        """Initialize Gmail API service.

        Returns:
            True if initialized successfully, False otherwise
        """
        credentials = self.oauth_manager.get_credentials()
        if not credentials:
            return False

        try:
            self.service = build("gmail", "v1", credentials=credentials)
            return True
        except Exception as e:
            # Best-effort: report the failure and let callers see False
            # instead of propagating whatever the discovery client raised.
            print(f"[Gmail] Failed to initialize service: {e}")
            return False

    def _ensure_service(self) -> bool:
        """Return True when a Gmail service is available, building it if needed."""
        if self.service:
            return True
        return self._initialize_service()

    @classmethod
    def _not_authorized(cls) -> Dict:
        """Build a fresh failure dict for the "no usable credentials" case."""
        return {"success": False, "error": cls._NOT_AUTHORIZED_ERROR}

    @staticmethod
    def _collect_attachments(parts: Optional[List[Dict]]) -> List[Dict]:
        """Recursively gather attachment descriptors from message parts.

        A part counts as an attachment when it has both a non-empty
        ``filename`` and a ``body.attachmentId``. Nested ``parts`` are
        visited depth-first, so the result keeps document order.

        Args:
            parts: List of message-part dicts (may be empty or None)

        Returns:
            List of dicts with filename, attachment_id, mime_type and size
        """
        found: List[Dict] = []
        for part in parts or []:
            filename = part.get("filename")
            body = part.get("body") or {}
            attachment_id = body.get("attachmentId")
            if filename and attachment_id:
                found.append({
                    "filename": filename,
                    "attachment_id": attachment_id,
                    "mime_type": part.get("mimeType"),
                    "size": body.get("size", 0),
                })
            # Recursively check nested parts
            found.extend(GmailClient._collect_attachments(part.get("parts")))
        return found

    def send_email(
        self,
        to: str,
        subject: str,
        body: str,
        cc: Optional[List[str]] = None,
        reply_to_message_id: Optional[str] = None,
    ) -> Dict:
        """Send an email.

        Args:
            to: Recipient email address
            subject: Email subject
            body: Email body (plain text or HTML)
            cc: Optional list of CC recipients
            reply_to_message_id: Optional message ID to reply to (for threading)

        Returns:
            Dict with success status and message_id/thread_id, or error
        """
        if not self._ensure_service():
            return self._not_authorized()

        try:
            message = create_mime_message(
                to=to,
                subject=subject,
                body=body,
                cc=cc,
                reply_to_message_id=reply_to_message_id,
            )
            result = (
                self.service.users()
                .messages()
                .send(userId="me", body=message)
                .execute()
            )
            return {
                "success": True,
                "message_id": result.get("id"),
                "thread_id": result.get("threadId"),
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}

    def search_emails(
        self,
        query: str = "",
        max_results: int = 10,
        include_body: bool = False,
    ) -> Dict:
        """Search emails using Gmail search syntax.

        One list request is issued, followed by one ``get`` request per
        matching message to fetch its full details.

        Args:
            query: Gmail search query
                (e.g., "from:john@example.com after:2026/02/10")
            max_results: Maximum number of results to return; clamped to the
                range 1..50
            include_body: Whether to include full email body

        Returns:
            Dict with success status and emails/count/summary, or error
        """
        if not self._ensure_service():
            return self._not_authorized()

        try:
            # Clamp to [1, MAX_SEARCH_RESULTS]. The lower bound keeps zero or
            # negative values from reaching the API unclamped.
            max_results = max(1, min(max_results, self.MAX_SEARCH_RESULTS))

            # Search for messages
            results = (
                self.service.users()
                .messages()
                .list(userId="me", q=query, maxResults=max_results)
                .execute()
            )
            messages = results.get("messages", [])
            if not messages:
                return {
                    "success": True,
                    "emails": [],
                    "count": 0,
                    "summary": "No emails found.",
                }

            # Fetch full message details
            emails = []
            for msg in messages:
                message = (
                    self.service.users()
                    .messages()
                    .get(userId="me", id=msg["id"], format="full")
                    .execute()
                )
                email_data = parse_email_message(message)
                if include_body:
                    email_data["body"] = get_email_body(message)
                emails.append(email_data)

            summary = format_email_summary(emails, include_body=include_body)
            return {
                "success": True,
                "emails": emails,
                "count": len(emails),
                "summary": summary,
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}

    def get_email(self, message_id: str, format_type: str = "text") -> Dict:
        """Get full email by ID.

        Args:
            message_id: Gmail message ID
            format_type: "text" or "html" (default: text). NOTE: this method
                does not currently read the value; it is kept for interface
                compatibility.

        Returns:
            Dict with success status and email data (including its body and
            an "attachments" list), or error
        """
        if not self._ensure_service():
            return self._not_authorized()

        try:
            message = (
                self.service.users()
                .messages()
                .get(userId="me", id=message_id, format="full")
                .execute()
            )
            email_data = parse_email_message(message)
            email_data["body"] = get_email_body(message)

            # Get attachment info if any
            payload = message.get("payload", {})
            email_data["attachments"] = self._collect_attachments(
                payload.get("parts")
            )

            return {
                "success": True,
                "email": email_data,
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}

    def download_attachment(
        self,
        message_id: str,
        attachment_id: str,
        filename: str,
        output_dir: str = "downloads",
    ) -> Dict:
        """Download an email attachment.

        The attachment is always written directly inside ``output_dir``: any
        directory components in ``filename`` are discarded, because the name
        originates from an email and must not be able to steer the write to
        another location.

        Args:
            message_id: Gmail message ID
            attachment_id: Attachment ID from the message
            filename: Original filename of the attachment
            output_dir: Directory to save the attachment (default: "downloads")

        Returns:
            Dict with success status, file path, saved filename and size,
            or error
        """
        if not self._ensure_service():
            return self._not_authorized()

        try:
            # Keep only the final path component. Backslashes are normalised
            # first so Windows-style separators are stripped on every platform.
            safe_name = os.path.basename(filename.replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                return {
                    "success": False,
                    "error": f"Invalid attachment filename: {filename!r}",
                }

            # Get the attachment data
            attachment = (
                self.service.users()
                .messages()
                .attachments()
                .get(userId="me", messageId=message_id, id=attachment_id)
                .execute()
            )

            # Decode the attachment data. base64url payloads may arrive
            # without trailing "=" padding, so re-pad before decoding
            # (a no-op when the data is already padded).
            encoded = attachment["data"]
            encoded += "=" * (-len(encoded) % 4)
            file_data = base64.urlsafe_b64decode(encoded)

            # Create output directory if it doesn't exist
            output_path = Path(output_dir)
            output_path.mkdir(parents=True, exist_ok=True)

            # Save the file
            file_path = output_path / safe_name
            with open(file_path, "wb") as f:
                f.write(file_data)

            return {
                "success": True,
                "file_path": str(file_path),
                "filename": safe_name,
                "size": len(file_data),
            }
        except HttpError as e:
            return {"success": False, "error": str(e)}
        except Exception as e:
            return {"success": False, "error": f"Failed to save attachment: {str(e)}"}