Last active
November 24, 2023 15:23
-
-
Save milanaleksic/556a01fb2d6a9916919beb74a12e660a to your computer and use it in GitHub Desktop.
Download GMAILFS email attachments from Gmail
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE: Gmail doesn't allow usage of service accounts for _personal GMail accounts_, you need OAuth client key | |
import base64 | |
import json | |
import logging | |
import os.path | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
def get_attachments(service, msg_id): | |
message = service.users().messages().get(userId='me', id=msg_id).execute() | |
subject = [x['value'] for x in message['payload']['headers'] if x.get('name') == 'Subject'][0] | |
if subject.startswith('GMAILFS:'): | |
filepath = subject[10:subject[10:].find('[')+10-1] | |
if os.path.isfile(filepath): | |
print(f'Skip {filepath}, already there') | |
return | |
else: | |
print(f'Analyzing {filepath}') | |
d = os.path.dirname(filepath) | |
file = os.path.basename(filepath) | |
if d != '' and not os.path.isdir(d): | |
try: | |
os.makedirs(d, exist_ok=True) | |
except: | |
logging.exception("failed to create dir: " + d) | |
if os.path.isfile(file): | |
# peviously downoaded into root | |
os.rename(file, filepath) | |
return | |
for part in message['payload']['parts']: | |
if part['filename']: | |
if 'data' in part['body']: | |
data = part['body']['data'] | |
else: | |
att_id = part['body']['attachmentId'] | |
att = service.users().messages().attachments().get(userId='me', messageId=msg_id, | |
id=att_id).execute() | |
data = att['data'] | |
file_data = base64.urlsafe_b64decode(data.encode('UTF-8')) | |
try: | |
with open(filepath, 'wb') as f: | |
f.write(file_data) | |
except: | |
logging.exception("failed to save an email with path: " + filepath + ", json:" + json.dumps(message)) | |
def authenticate_and_get_service(): | |
# Set the scopes for Gmail API | |
scopes = ['https://www.googleapis.com/auth/gmail.readonly'] | |
flow = InstalledAppFlow.from_client_secrets_file( | |
'client_secret_....apps.googleusercontent.com.json', | |
scopes=scopes | |
) | |
credentials = flow.run_local_server(port=0) | |
# Save the credentials for the next run | |
with open('token.json', 'w') as token: | |
token.write(credentials.to_json()) | |
# Create a Gmail API service client | |
service = build('gmail', 'v1', credentials=credentials) | |
return service | |
if __name__ == '__main__': | |
service = authenticate_and_get_service() | |
pageToken = None | |
while True: | |
messages = service.users().messages().list(userId='me', maxResults=50, pageToken=pageToken).execute() | |
print('Messages downloaded: ', len(messages['messages'])) | |
for msg in messages['messages']: | |
get_attachments(service, msg['id']) | |
if messages.get('nextPageToken', None) is None: | |
break | |
pageToken = messages['nextPageToken'] | |
service.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
google-api-python-client | |
google_auth_oauthlib |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment