209 lines
5.5 KiB
Python
209 lines
5.5 KiB
Python
|
|
"""Utility functions for Gmail/Calendar tools."""
|
||
|
|
|
||
|
|
import base64
|
||
|
|
import email
|
||
|
|
import re
|
||
|
|
from email.mime.multipart import MIMEMultipart
|
||
|
|
from email.mime.text import MIMEText
|
||
|
|
from html.parser import HTMLParser
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
|
||
|
|
|
||
|
|
class HTMLToText(HTMLParser):
|
||
|
|
"""Convert HTML to plain text."""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
super().__init__()
|
||
|
|
self.text = []
|
||
|
|
self.skip = False
|
||
|
|
|
||
|
|
def handle_data(self, data):
|
||
|
|
if not self.skip:
|
||
|
|
self.text.append(data)
|
||
|
|
|
||
|
|
def handle_starttag(self, tag, attrs):
|
||
|
|
if tag in ["script", "style"]:
|
||
|
|
self.skip = True
|
||
|
|
elif tag == "br":
|
||
|
|
self.text.append("\n")
|
||
|
|
elif tag == "p":
|
||
|
|
self.text.append("\n\n")
|
||
|
|
|
||
|
|
def handle_endtag(self, tag):
|
||
|
|
if tag in ["script", "style"]:
|
||
|
|
self.skip = False
|
||
|
|
elif tag in ["p", "div"]:
|
||
|
|
self.text.append("\n")
|
||
|
|
|
||
|
|
def get_text(self):
|
||
|
|
return "".join(self.text).strip()
|
||
|
|
|
||
|
|
|
||
|
|
def html_to_text(html: str) -> str:
|
||
|
|
"""Convert HTML to plain text.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
html: HTML content
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Plain text content
|
||
|
|
"""
|
||
|
|
parser = HTMLToText()
|
||
|
|
parser.feed(html)
|
||
|
|
return parser.get_text()
|
||
|
|
|
||
|
|
|
||
|
|
def create_mime_message(
|
||
|
|
to: str,
|
||
|
|
subject: str,
|
||
|
|
body: str,
|
||
|
|
from_email: str = "me",
|
||
|
|
cc: Optional[List[str]] = None,
|
||
|
|
reply_to_message_id: Optional[str] = None,
|
||
|
|
) -> Dict:
|
||
|
|
"""Create a MIME message for Gmail API.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
to: Recipient email address
|
||
|
|
subject: Email subject
|
||
|
|
body: Email body (plain text or HTML)
|
||
|
|
from_email: Sender email (default: "me")
|
||
|
|
cc: Optional list of CC recipients
|
||
|
|
reply_to_message_id: Optional message ID to reply to
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with 'raw' key containing base64url-encoded message
|
||
|
|
"""
|
||
|
|
message = MIMEMultipart("alternative")
|
||
|
|
message["To"] = to
|
||
|
|
message["From"] = from_email
|
||
|
|
message["Subject"] = subject
|
||
|
|
|
||
|
|
if cc:
|
||
|
|
message["Cc"] = ", ".join(cc)
|
||
|
|
|
||
|
|
if reply_to_message_id:
|
||
|
|
message["In-Reply-To"] = reply_to_message_id
|
||
|
|
message["References"] = reply_to_message_id
|
||
|
|
|
||
|
|
# Try to detect if body is HTML
|
||
|
|
is_html = bool(re.search(r"<[a-z][\s\S]*>", body, re.IGNORECASE))
|
||
|
|
|
||
|
|
if is_html:
|
||
|
|
# Add both plain text and HTML versions
|
||
|
|
text_part = MIMEText(html_to_text(body), "plain")
|
||
|
|
html_part = MIMEText(body, "html")
|
||
|
|
message.attach(text_part)
|
||
|
|
message.attach(html_part)
|
||
|
|
else:
|
||
|
|
# Plain text only
|
||
|
|
text_part = MIMEText(body, "plain")
|
||
|
|
message.attach(text_part)
|
||
|
|
|
||
|
|
# Encode as base64url
|
||
|
|
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
|
||
|
|
return {"raw": raw_message}
|
||
|
|
|
||
|
|
|
||
|
|
def parse_email_message(message: Dict) -> Dict:
|
||
|
|
"""Parse Gmail API message into readable format.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
message: Gmail API message object
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Dict with parsed fields: from, to, subject, date, body, snippet
|
||
|
|
"""
|
||
|
|
headers = {
|
||
|
|
h["name"].lower(): h["value"]
|
||
|
|
for h in message.get("payload", {}).get("headers", [])
|
||
|
|
}
|
||
|
|
|
||
|
|
result = {
|
||
|
|
"id": message.get("id"),
|
||
|
|
"thread_id": message.get("threadId"),
|
||
|
|
"from": headers.get("from", ""),
|
||
|
|
"to": headers.get("to", ""),
|
||
|
|
"cc": headers.get("cc", ""),
|
||
|
|
"subject": headers.get("subject", ""),
|
||
|
|
"date": headers.get("date", ""),
|
||
|
|
"snippet": message.get("snippet", ""),
|
||
|
|
"labels": message.get("labelIds", []),
|
||
|
|
}
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
def get_email_body(message: Dict) -> str:
|
||
|
|
"""Extract email body from Gmail API message.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
message: Gmail API message object
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Email body as plain text
|
||
|
|
"""
|
||
|
|
payload = message.get("payload", {})
|
||
|
|
|
||
|
|
def get_body_from_part(part: Dict) -> Optional[str]:
|
||
|
|
"""Recursively extract body from MIME parts."""
|
||
|
|
mime_type = part.get("mimeType", "")
|
||
|
|
body_data = part.get("body", {}).get("data")
|
||
|
|
|
||
|
|
if body_data:
|
||
|
|
decoded = base64.urlsafe_b64decode(body_data).decode("utf-8", errors="ignore")
|
||
|
|
if mime_type == "text/html":
|
||
|
|
return html_to_text(decoded)
|
||
|
|
elif mime_type == "text/plain":
|
||
|
|
return decoded
|
||
|
|
|
||
|
|
# Check nested parts
|
||
|
|
for subpart in part.get("parts", []):
|
||
|
|
result = get_body_from_part(subpart)
|
||
|
|
if result:
|
||
|
|
return result
|
||
|
|
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Try to get body
|
||
|
|
body = get_body_from_part(payload)
|
||
|
|
|
||
|
|
if not body:
|
||
|
|
# Fallback to snippet
|
||
|
|
body = message.get("snippet", "")
|
||
|
|
|
||
|
|
return body
|
||
|
|
|
||
|
|
|
||
|
|
def format_email_summary(emails: List[Dict], include_body: bool = False) -> str:
|
||
|
|
"""Format emails into a readable summary.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
emails: List of parsed email dicts
|
||
|
|
include_body: Whether to include full body
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Formatted string summary
|
||
|
|
"""
|
||
|
|
if not emails:
|
||
|
|
return "No emails found."
|
||
|
|
|
||
|
|
lines = []
|
||
|
|
for i, email_data in enumerate(emails, 1):
|
||
|
|
lines.append(f"{i}. From: {email_data['from']}")
|
||
|
|
lines.append(f" Subject: {email_data['subject']}")
|
||
|
|
lines.append(f" Date: {email_data['date']}")
|
||
|
|
|
||
|
|
if include_body and "body" in email_data:
|
||
|
|
# Truncate long bodies
|
||
|
|
body = email_data["body"]
|
||
|
|
if len(body) > 500:
|
||
|
|
body = body[:500] + "..."
|
||
|
|
lines.append(f" Body: {body}")
|
||
|
|
else:
|
||
|
|
lines.append(f" Snippet: {email_data['snippet']}")
|
||
|
|
|
||
|
|
lines.append("") # Blank line
|
||
|
|
|
||
|
|
return "\n".join(lines)
|