Skip to content

Instantly share code, notes, and snippets.

@ColeMurray
Created August 8, 2024 06:07
Show Gist options
  • Save ColeMurray/63baface4dcbede69390c5a3c9f22940 to your computer and use it in GitHub Desktop.
Save ColeMurray/63baface4dcbede69390c5a3c9f22940 to your computer and use it in GitHub Desktop.
Using GPT to auto-label gmail
import os
import base64
import json
import logging
from datetime import datetime, timedelta
from typing import List
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import Resource, build
from googleapiclient.errors import HttpError
from dotenv import load_dotenv
from openai import OpenAI
import sqlite3
import requests
import json
import logging
import time
from ratelimit import limits, sleep_and_retry
from google.auth.transport.requests import Request
# Load environment variables (via dotenv) so the os.getenv() lookups below can see them
load_dotenv()
# Setup logging: INFO level, records formatted as "<time> - <level> - <message>"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# JSON-lines file to which every LLM request/response pair is appended
llm_log_file = 'llm_interactions.jsonl'
# Constants and configurations
# OAuth scopes passed to the credential helpers in get_gmail_client()
SCOPES = [
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.labels",
"https://www.googleapis.com/auth/gmail.modify",
]
# Cached user credentials, read and (re)written by get_gmail_client()
TOKEN_FILE = "token.json"
# OAuth client secrets used by get_gmail_client() when no usable token exists
CREDENTIALS_FILE = "credentials.json"
# JSON file holding {"last_run": <ISO timestamp>} between runs
LAST_RUN_FILE = "last_run.json"
# Label added (together with the category label) to every categorized email
PROCESSED_LABEL = "Processed"
# The only categories accepted from the LLM; any other answer is mapped to "Other"
CATEGORY_LABELS = [
"Marketing",
"Response Needed / High Priority",
"Bills",
"Subscriptions",
"Newsletters",
"Personal",
"Work",
"Events",
"Travel",
"Receipts",
"Low quality",
"Notifications"
]
# SQLite file storing each email's original label ids (table email_states)
DATABASE_FILE = "email_states.db"
# When True, add_labels_to_email() only logs what it would do
PREVIEW_MODE = False
# OpenAI configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = "gpt-4o-mini"
# Selects the backend in categorize_email(): "OpenAI" or "Ollama"
LLM_SERVICE = os.getenv("LLM_SERVICE", "OpenAI") # Default to OpenAI if not specified
# Ollama API URL
# NOTE(review): 0.0.0.0 is a wildcard bind address; as a client destination it
# may not be reachable on every platform -- consider localhost/127.0.0.1. TODO confirm.
OLLAMA_API_URL = "http://0.0.0.0:11434/api/chat"
# Set up rate limiting: adjust as needed for your local setup
# (the decorators below allow at most 500 calls per 60-second period).
@sleep_and_retry
@limits(calls=500, period=60)
def call_ollama_api(prompt, timeout=None):
    """Send ``prompt`` as a single user message to the Ollama chat endpoint.

    The request payload, the parsed response and the call timing are appended
    as one JSON line to ``llm_log_file``.

    Args:
        prompt: Text sent as the content of the only ``user`` message.
        timeout: Optional ``requests`` timeout (seconds, or a
            ``(connect, read)`` tuple). ``None`` keeps the previous behaviour
            of waiting indefinitely for the model to answer.

    Returns:
        The ``message.content`` field of the JSON response (a string; because
        ``"format": "json"`` is requested it is expected to contain JSON).

    Raises:
        requests.exceptions.RequestException: On connection problems or a
            non-success HTTP status; the error is logged and re-raised.
        KeyError: If the response JSON has no ``message``/``content`` entry.
    """
    payload = {
        "stream": False,
        "format": "json",
        "model": "llama3.1",
        "messages": [
            {"role": "user", "content": prompt}
        ]
    }
    try:
        start_time = time.time()
        response = requests.post(OLLAMA_API_URL, json=payload, timeout=timeout)
        response.raise_for_status()
        end_time = time.time()
        # Parse the body once and reuse it for both the log entry and the
        # return value instead of decoding the same JSON twice.
        response_body = response.json()
        # Log the request and response
        log_entry = {
            "request_timestamp": start_time,
            "response_timestamp": end_time,
            "duration": end_time - start_time,
            "request": payload,
            "response": response_body
        }
        with open(llm_log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry) + '\n')
        return response_body['message']['content']
    except requests.exceptions.RequestException as e:
        logging.error(f"Ollama API error: {str(e)}")
        raise
def initialize_db():
    """Initialize the SQLite database and create the necessary tables.

    Creates the ``email_states`` table (``email_id`` primary key, ``labels``
    text) in ``DATABASE_FILE`` if it does not exist yet. The connection is
    always closed, even when the statement or the commit fails.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        conn.execute('''
CREATE TABLE IF NOT EXISTS email_states (
email_id TEXT PRIMARY KEY,
labels TEXT
)
''')
        conn.commit()
    finally:
        # Release the file handle regardless of success or failure.
        conn.close()
def store_email_state(email_id: str, labels: List[str]):
    """Store (or overwrite) the recorded label ids of an email.

    Args:
        email_id: Gmail message id, used as the primary key.
        labels: Label ids to remember; stored as a JSON-encoded list.

    Raises:
        sqlite3.Error: If the write fails (for example when the
            ``email_states`` table has not been created).
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # Upsert: a second call for the same email replaces the stored labels.
        conn.execute('''
INSERT INTO email_states (email_id, labels) VALUES (?, ?)
ON CONFLICT(email_id) DO UPDATE SET labels=excluded.labels;
''', (email_id, json.dumps(labels)))
        conn.commit()
    finally:
        # Close the connection even if the statement or commit raised.
        conn.close()
def retrieve_email_state(email_id: str) -> List[str]:
    """Return the label ids previously stored for an email.

    Args:
        email_id: Gmail message id to look up.

    Returns:
        The decoded list of label ids, or an empty list when no row exists
        for ``email_id``.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        row = conn.execute(
            'SELECT labels FROM email_states WHERE email_id = ?', (email_id,)
        ).fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()
    return json.loads(row[0]) if row else []
def get_gmail_client() -> Resource:
    """Build an authorized Gmail API (v1) client.

    Credentials are loaded from ``TOKEN_FILE`` when it exists. If they are
    missing or not valid they are either refreshed (expired, with a refresh
    token) or obtained through the local-server OAuth flow on port 8080 using
    ``CREDENTIALS_FILE``; the resulting credentials are written back to
    ``TOKEN_FILE``.
    """
    credentials = (
        Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
        if os.path.exists(TOKEN_FILE)
        else None
    )

    # Fast path: a cached token that is still valid needs no further work.
    if credentials and credentials.valid:
        return build("gmail", "v1", credentials=credentials)

    if credentials and credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())
    else:
        oauth_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
        credentials = oauth_flow.run_local_server(port=8080)

    # Persist whatever we ended up with so the next run can reuse it.
    with open(TOKEN_FILE, "w") as token_handle:
        token_handle.write(credentials.to_json())

    return build("gmail", "v1", credentials=credentials)
def get_last_run_time() -> datetime:
    """Return the timestamp of the previous run.

    Reads the ``last_run`` ISO timestamp from ``LAST_RUN_FILE``.

    Returns:
        The stored timestamp, or "now minus 7 days" when the file does not
        exist or cannot be interpreted (unreadable, invalid JSON, missing
        ``last_run`` key, malformed timestamp). The fallback is logged as a
        warning so a damaged state file does not block every later run.
    """
    default_time = datetime.now() - timedelta(days=7)  # Default to 7 days ago if no last run
    if not os.path.exists(LAST_RUN_FILE):
        return default_time
    try:
        with open(LAST_RUN_FILE, 'r') as f:
            data = json.load(f)
        return datetime.fromisoformat(data['last_run'])
    except (OSError, ValueError, KeyError, TypeError) as error:
        # json.JSONDecodeError is a ValueError; KeyError/TypeError cover a
        # well-formed file with an unexpected structure.
        logging.warning(
            f"Could not read last run time from {LAST_RUN_FILE}: {error}. "
            "Falling back to 7 days ago."
        )
        return default_time
def update_last_run_time():
    """Record the current local time as the last run in ``LAST_RUN_FILE``.

    The file is overwritten with a JSON object of the form
    ``{"last_run": "<ISO-8601 timestamp>"}``.
    """
    with open(LAST_RUN_FILE, 'w') as state_file:
        run_record = {'last_run': datetime.now().isoformat()}
        json.dump(run_record, state_file)
def build_query(last_run: datetime) -> str:
    """Return the Gmail search string for unread mail after ``last_run``.

    Only the date part of ``last_run`` is used, formatted as ``YYYY/MM/DD``.
    Messages already carrying the processed label are not excluded (that
    variant of the query is disabled).
    """
    since_date = last_run.strftime('%Y/%m/%d')
    return "is:unread after:" + since_date
def fetch_emails(gmail: Resource, query: str) -> List[dict]:
    """Fetch every message reference matching ``query``.

    The list endpoint returns results one page at a time; this follows
    ``nextPageToken`` until it is absent so that matches beyond the first
    page are not silently dropped.

    Args:
        gmail: Authorized Gmail API client.
        query: Gmail search string.

    Returns:
        The concatenated ``messages`` entries of all pages (possibly empty).

    Raises:
        HttpError: If any list request fails; the error is logged first.
    """
    collected: List[dict] = []
    page_token = None
    try:
        while True:
            list_kwargs = {"userId": "me", "q": query}
            if page_token:
                list_kwargs["pageToken"] = page_token
            results = gmail.users().messages().list(**list_kwargs).execute()
            collected.extend(results.get("messages", []))
            page_token = results.get("nextPageToken")
            if not page_token:
                return collected
    except HttpError as error:
        logging.error(f"Failed to fetch emails: {error}")
        raise
def get_or_create_label(gmail: Resource, label_name: str) -> str:
    """Return the id of the label called ``label_name``, creating it if absent.

    The user's labels are listed and searched by exact name. When there is no
    match, a new label (shown in both the label list and the message list) is
    created. If the Gmail API raises ``HttpError`` the error is logged and
    ``None`` is returned.
    """
    try:
        listing = gmail.users().labels().list(userId="me").execute()
        matching = (
            entry for entry in listing.get("labels", [])
            if entry["name"] == label_name
        )
        found = next(matching, None)
        if found is not None:
            return found["id"]

        # No label with that name yet: create one and hand back its id.
        new_label_body = {
            "name": label_name,
            "labelListVisibility": "labelShow",
            "messageListVisibility": "show"
        }
        return gmail.users().labels().create(userId="me", body=new_label_body).execute()["id"]
    except HttpError as error:
        logging.error(f"An error occurred while managing label {label_name}: {error}")
        return None
def categorize_email_with_openai(email_content: str) -> str:
    """Categorize an email using OpenAI's chat completions API.

    The model is asked to answer with exactly one of ``CATEGORY_LABELS``.
    The prompt, the answer and the call timing are appended as one JSON line
    to ``llm_log_file``.

    Args:
        email_content: Text of the email (headers and body) to classify.

    Returns:
        The matching category label, or ``"Other"`` when the answer is not
        one of ``CATEGORY_LABELS`` or any error occurs during the call.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = f"""
Categorize the following email into one of these categories: {', '.join(CATEGORY_LABELS)}.
Respond with only the category name.
Email content:
{email_content}
"""
    try:
        start_time = time.time()
        response = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[
                {"role": "system", "content": "You are an AI assistant that categorizes emails."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=10,
            temperature=0.3
        )
        end_time = time.time()
        # Chat completion choices expose the text as ``message.content``;
        # reading a ``text`` attribute here raised AttributeError, which the
        # handler below turned into "Other" for every single email.
        # ``content`` may be None, hence the ``or ""``.
        category = (response.choices[0].message.content or "").strip()
        log_entry = {
            "request_timestamp": start_time,
            "response_timestamp": end_time,
            "duration": end_time - start_time,
            "request": {"prompt": prompt},
            "response": category
        }
        with open(llm_log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')
        return category if category in CATEGORY_LABELS else "Other"
    except Exception as e:
        # Best-effort: a failed categorization must not abort the whole run.
        logging.error(f"Error in OpenAI categorization: {e}")
        return "Other"
def categorize_email_with_ollama(email_content: str) -> str:
    """Categorize an email using the local Ollama LLM.

    The email (wrapped in ``<Email>`` tags) followed by the instructions is
    sent through ``call_ollama_api``; the reply is parsed as JSON and its
    ``category`` field is used.

    Args:
        email_content: Text of the email (headers and body) to classify.

    Returns:
        The category if it is one of ``CATEGORY_LABELS``; otherwise
        ``"Other"``. Any exception (request failure, invalid JSON, missing
        ``category`` key) is logged and also yields ``"Other"``.
    """
    system_prompt = f"""You are an AI trained to categorize emails into predefined categories.
1. Provide a concise explanation for the selected category basd on the email.
2. Categorize the following email into one of these categories. Only use the provided category labels and their descriptions:
CATEGORY_LABELS = [
"Marketing" – Emails promoting products, services, or sales,
"Response Needed / High Priority" – Emails requiring urgent attention or action,
"Bills" – Emails related to payments or invoices,
"Subscriptions" – Emails about subscription services or renewals,
"Newsletters" – Regularly scheduled updates or informational emails,
"Personal" – Emails from friends, family, or personal contacts,
"Work" – Emails related to your job or professional activities,
"Events" – Emails about upcoming events or invitations,
"Travel" – Emails regarding travel plans, bookings, or itineraries,
"Receipts" – Emails containing proof of purchase or transaction details,
"Low quality" – Emails with spammy content or poor quality,
"Notifications" – Automated emails notifying about account activities or updates
].
Please respond in the following JSON format and restrict your response to the provided category labels:
{{
"explanation": "string",
"category": "string"
}}
"""
    prompt = f"""
<Email>
{email_content}
</Email>
"""
    try:
        # The instructions are appended after the email in a single message.
        response = call_ollama_api(prompt + system_prompt)
        # Debug-level logging replaces the stray print() calls so normal runs
        # no longer dump raw model output to stdout.
        logging.debug(f"Ollama raw response: {response}")
        category = json.loads(response)['category']
        logging.debug(f"Ollama category: {category}")
        return category if category in CATEGORY_LABELS else "Other"
    except Exception as e:
        # Best-effort: a failed categorization must not abort the whole run.
        logging.error(f"Error in Ollama categorization: {str(e)}")
        return "Other"
def categorize_email(email_content: str) -> str:
    """Route ``email_content`` to the backend named by ``LLM_SERVICE``.

    ``"OpenAI"`` and ``"Ollama"`` are recognised. Any other value is logged
    as an error and results in ``"Other"``.
    """
    backends = {
        "OpenAI": categorize_email_with_openai,
        "Ollama": categorize_email_with_ollama,
    }
    categorizer = backends.get(LLM_SERVICE)
    if categorizer is None:
        logging.error("Invalid LLM service specified.")
        return "Other"
    return categorizer(email_content)
def _decode_body_data(data: str) -> str:
    """Decode one base64url-encoded Gmail body payload into text."""
    # Re-add any stripped '=' padding; undecodable bytes are replaced rather
    # than discarding the whole email.
    padded = data + "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _collect_body_text(payload: dict) -> str:
    """Return the decoded text of a payload, descending into nested parts."""
    data = payload.get("body", {}).get("data")
    if data:
        return _decode_body_data(data)
    return "".join(_collect_body_text(part) for part in payload.get("parts", []))


def _header_value(headers: list, name: str, default: str) -> str:
    """Return the first header named ``name`` (case-insensitive) or ``default``."""
    return next(
        (h["value"] for h in headers if h["name"].lower() == name),
        default,
    )


def get_email_content(gmail: Resource, email_id: str) -> str:
    """Retrieve the subject, sender and decoded body text of an email.

    Compared with a plain two-level walk this:
    * decodes the body of single-part messages (it used to be returned still
      base64-encoded),
    * tolerates missing Subject/From headers instead of failing,
    * descends into multipart structures of any depth.

    Args:
        gmail: Authorized Gmail API client.
        email_id: Id of the message to fetch.

    Returns:
        ``"Subject: ...\\nFrom: ...\\n\\n<body>"``, or an empty string if the
        message could not be fetched or processed (the error is logged).
    """
    try:
        message = gmail.users().messages().get(userId="me", id=email_id, format="full").execute()
        payload = message.get("payload", {})
        headers = payload.get("headers", [])
        subject = _header_value(headers, "subject", "(no subject)")
        from_header = _header_value(headers, "from", "(unknown sender)")
        # Process the email body, considering both simple and multipart emails
        body = _collect_body_text(payload)
        # If no 'data' is found in any parts, body remains empty
        if not body:
            logging.warning(f"No content found in the body of email {email_id}")
        logging.info(f"Fetched email {email_id}: Subject: {subject} | From: {from_header}")
        return f"Subject: {subject}\nFrom: {from_header}\n\n{body}"
    except HttpError as error:
        logging.error(f"An error occurred while retrieving email {email_id}: {error}")
        return ""
    except Exception as e:
        # Best-effort: one malformed message must not abort the whole run.
        logging.error(f"Unexpected error when processing email {email_id}: {e}")
        return ""
def add_labels_to_email(gmail: Resource, email_id: str, label_ids: List[str]):
    """Attach the given label ids to one email.

    With ``PREVIEW_MODE`` enabled nothing is sent to Gmail; the intended
    change is only logged. API failures are logged and not re-raised.
    """
    if not PREVIEW_MODE:
        modification = {"addLabelIds": label_ids}
        try:
            request = gmail.users().messages().modify(
                userId="me", id=email_id, body=modification
            )
            request.execute()
            logging.info(f"Labels added to email {email_id}")
        except HttpError as error:
            logging.error(f"An error occurred while adding labels to email {email_id}: {error}")
    else:
        logging.info(f"Preview: Would add labels {label_ids} to email {email_id}")
def remove_from_inbox(gmail: Resource, email_id: str):
    """Remove an email from the inbox by dropping its ``INBOX`` label.

    Honours ``PREVIEW_MODE`` in the same way as ``add_labels_to_email``: when
    it is enabled the mailbox is left untouched and the intended change is
    only logged. API failures are logged and not re-raised.

    Args:
        gmail: Authorized Gmail API client.
        email_id: Id of the message to archive.
    """
    if PREVIEW_MODE:
        logging.info(f"Preview: Would remove email {email_id} from the inbox")
        return
    try:
        gmail.users().messages().modify(
            userId='me',
            id=email_id,
            body={'removeLabelIds': ['INBOX']}
        ).execute()
        logging.info(f"Email {email_id} has been removed from the inbox.")
    except HttpError as error:
        logging.error(f"Failed to remove email {email_id} from the inbox: {error}")
def main():
    """Label unread emails received since the last run.

    Steps:
    1. Ensure the state database exists, then build the Gmail client.
    2. Fetch unread emails dated after the last run.
    3. Resolve (or create) the processed label and every category label.
    4. For each email: read its content, store its current label ids,
       categorize it, add the processed + category labels, and remove
       Marketing / Newsletters / Low quality mail from the inbox.
    5. Record the time of this run.

    Emails whose content cannot be retrieved, or that are categorized as
    ``"Other"``, are skipped and left unchanged.
    """
    # store_email_state() writes to the email_states table, so make sure it
    # exists; without this a fresh database fails on the first email.
    initialize_db()
    gm = get_gmail_client()
    last_run = get_last_run_time()
    query = build_query(last_run)
    mails = fetch_emails(gm, query)
    if not mails:
        logging.info("No new unread emails found since the last run.")
        return
    logging.info(f"Found {len(mails)} new unread emails to process.")
    processed_label_id = get_or_create_label(gm, PROCESSED_LABEL)
    category_label_ids = {label: get_or_create_label(gm, label) for label in CATEGORY_LABELS}
    if not processed_label_id or not all(category_label_ids.values()):
        logging.error("Failed to get or create labels. Aborting...")
        return
    labelled_count = 0
    for mail in mails:
        mail_id = mail["id"]
        email_content = get_email_content(gm, mail_id)
        if not email_content:
            # An empty string means retrieval failed; do not ask the LLM to
            # classify nothing.
            logging.warning(f"Skipping email {mail_id}: content could not be retrieved.")
            continue
        # Remember the labels the email had before we touch it.
        original_labels = gm.users().messages().get(
            userId="me", id=mail_id, format="minimal"
        ).execute().get('labelIds', [])
        store_email_state(mail_id, original_labels)
        category = categorize_email(email_content)
        if category == 'Other':
            logging.warning(f"Could not categorize email {mail['id']}.")
            # rollback_email(mail["id"])
            continue
        label_ids_to_add = [processed_label_id, category_label_ids[category]]
        add_labels_to_email(gm, mail_id, label_ids_to_add)
        # Check if the email needs to be moved out of the inbox
        if category in ["Marketing", "Newsletters", "Low quality"]:
            remove_from_inbox(gm, mail_id)
        logging.info(f"Processed email {mail['id']} and categorized as {category}")
        labelled_count += 1
    # Report how many emails were actually labelled, not merely seen.
    logging.info(f"Processed {labelled_count} of {len(mails)} emails.")
    update_last_run_time()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment