"""Utility functions for Gmail/Calendar tools.""" import base64 import mimetypes import re from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email import encoders from html.parser import HTMLParser from pathlib import Path from typing import Dict, List, Optional class HTMLToText(HTMLParser): """Convert HTML to plain text.""" def __init__(self): super().__init__() self.text = [] self.skip = False def handle_data(self, data): if not self.skip: self.text.append(data) def handle_starttag(self, tag, attrs): if tag in ["script", "style"]: self.skip = True elif tag == "br": self.text.append("\n") elif tag == "p": self.text.append("\n\n") def handle_endtag(self, tag): if tag in ["script", "style"]: self.skip = False elif tag in ["p", "div"]: self.text.append("\n") def get_text(self): return "".join(self.text).strip() def html_to_text(html: str) -> str: """Convert HTML to plain text. Args: html: HTML content Returns: Plain text content """ parser = HTMLToText() parser.feed(html) return parser.get_text() def create_mime_message( to: str, subject: str, body: str, from_email: str = "me", cc: Optional[List[str]] = None, reply_to_message_id: Optional[str] = None, attachments: Optional[List[str]] = None, ) -> Dict: """Create a MIME message for Gmail API. Args: to: Recipient email address subject: Email subject body: Email body (plain text or HTML) from_email: Sender email (default: "me") cc: Optional list of CC recipients reply_to_message_id: Optional message ID to reply to attachments: Optional list of file paths to attach Returns: Dict with 'raw' key containing base64url-encoded message """ is_html = bool(re.search(r"<[a-z][\s\S]*>", body, re.IGNORECASE)) # Build the body part first if attachments: # mixed wraps body alternative + file parts message = MIMEMultipart("mixed") body_part = MIMEMultipart("alternative") else: message = MIMEMultipart("alternative") body_part = message message["To"] = to message["From"] = from_email message["Subject"] = subject if cc: message["Cc"] = ", ".join(cc) if reply_to_message_id: message["In-Reply-To"] = reply_to_message_id message["References"] = reply_to_message_id if is_html: body_part.attach(MIMEText(html_to_text(body), "plain")) body_part.attach(MIMEText(body, "html")) else: body_part.attach(MIMEText(body, "plain")) if attachments: message.attach(body_part) for file_path in attachments: path = Path(file_path) mime_type, _ = mimetypes.guess_type(str(path)) main_type, sub_type = (mime_type or "application/octet-stream").split("/", 1) with open(path, "rb") as f: part = MIMEBase(main_type, sub_type) part.set_payload(f.read()) encoders.encode_base64(part) part.add_header("Content-Disposition", "attachment", filename=path.name) message.attach(part) raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode() return {"raw": raw_message} def parse_email_message(message: Dict) -> Dict: """Parse Gmail API message into readable format. Args: message: Gmail API message object Returns: Dict with parsed fields: from, to, subject, date, body, snippet """ headers = { h["name"].lower(): h["value"] for h in message.get("payload", {}).get("headers", []) } result = { "id": message.get("id"), "thread_id": message.get("threadId"), "from": headers.get("from", ""), "to": headers.get("to", ""), "cc": headers.get("cc", ""), "subject": headers.get("subject", ""), "date": headers.get("date", ""), "snippet": message.get("snippet", ""), "labels": message.get("labelIds", []), } return result def get_email_body(message: Dict) -> str: """Extract email body from Gmail API message. Args: message: Gmail API message object Returns: Email body as plain text """ payload = message.get("payload", {}) def get_body_from_part(part: Dict) -> Optional[str]: """Recursively extract body from MIME parts.""" mime_type = part.get("mimeType", "") body_data = part.get("body", {}).get("data") if body_data: decoded = base64.urlsafe_b64decode(body_data).decode("utf-8", errors="ignore") if mime_type == "text/html": return html_to_text(decoded) elif mime_type == "text/plain": return decoded # Check nested parts for subpart in part.get("parts", []): result = get_body_from_part(subpart) if result: return result return None # Try to get body body = get_body_from_part(payload) if not body: # Fallback to snippet body = message.get("snippet", "") return body def format_email_summary(emails: List[Dict], include_body: bool = False) -> str: """Format emails into a readable summary. Args: emails: List of parsed email dicts include_body: Whether to include full body Returns: Formatted string summary """ if not emails: return "No emails found." lines = [] for i, email_data in enumerate(emails, 1): lines.append(f"{i}. From: {email_data['from']}") lines.append(f" Subject: {email_data['subject']}") lines.append(f" Date: {email_data['date']}") lines.append(f" Message-ID: {email_data.get('id', 'N/A')}") if include_body and "body" in email_data: # Truncate long bodies body = email_data["body"] if len(body) > 2000: body = body[:2000] + "..." lines.append(f" Body: {body}") else: lines.append(f" Snippet: {email_data['snippet']}") lines.append("") # Blank line return "\n".join(lines)