2019-11-07
import io
import os.path
import pickle
import traceback
from collections import defaultdict
from pathlib import Path

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
class GDriveSharedFolderApi:
    # If modifying these scopes, delete the file token.pickle.
    SCOPES = ['https://www.googleapis.com/auth/drive']

    ''' Initialization '''

    def init(self):
        creds = self.__load_credentials()
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file('credentials.json', self.SCOPES)
                creds = flow.run_local_server()
            self.__save_credentials(creds)
        self.service = build('drive', 'v3', credentials=creds)

    def __load_credentials(self):
        credentials = None
        # The file token.pickle stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                credentials = pickle.load(token)
        return credentials

    def __save_credentials(self, credentials):
        # Save the credentials for the next run.
        with open('token.pickle', 'wb') as token:
            pickle.dump(credentials, token)

    ''' API wrappers '''

    def get_items(self, drive_or_folder_id, page_token=None, page_size=1000):
        results = self.service.files().list(
            fields='nextPageToken, files(id, name, mimeType, size)',
            pageToken=page_token,
            pageSize=page_size,
            q=f"'{drive_or_folder_id}' in parents"
        ).execute()
        return results.get('files', [])

    def get_media(self, file_id):
        return self.service.files().get_media(fileId=file_id)

    def is_folder(self, mime_type):
        return mime_type == 'application/vnd.google-apps.folder'
class Downloader:
    def __init__(self, api):
        self.api = api

    def download(self, output_folder, drive_id):
        self.base_folder_len = len(output_folder) + 1  # Plus '/'
        output_folder = Path(output_folder)
        self.current_folder = output_folder
        self.current_folder.mkdir(exist_ok=True, parents=True)
        self.__load_data(output_folder)
        try:
            self.__walk_through_drive(drive_id)
        except Exception:
            print(traceback.format_exc())
        self.__save_data(output_folder)

    ''' Helpful data '''

    def __load_data(self, output_folder):
        # file_names.bin keeps the list of already-downloaded Drive paths,
        # so an interrupted run can be resumed without re-downloading.
        file = output_folder / 'file_names.bin'
        if file.exists():
            with file.open('rb') as f:
                self.file_names = pickle.load(f)
        else:
            self.file_names = []

    def __save_data(self, output_folder):
        with open(output_folder / 'file_names.bin', 'wb') as f:
            pickle.dump(self.file_names, f)

    ''' Recursive walker '''

    def __walk_through_drive(self, drive_or_folder_id):
        items = self.api.get_items(drive_or_folder_id)
        items.sort(key=lambda i: i['name'])
        self.__fix_duplicates(items)
        for item in items:
            item_id = item['id']
            name = item['name']
            if self.api.is_folder(item['mimeType']):
                self.__handle_folder(item_id, name)
            else:
                # 'size' is missing for Google Docs files; default to 0 (temp hack).
                self.__handle_file(item_id, name, int(item.get('size', 0)))

    def __handle_folder(self, folder_id, folder_name):
        self.current_folder /= folder_name
        try:
            # Yes, somehow this can still throw an exception, ha-ha-ha...
            self.current_folder.mkdir(exist_ok=True, parents=True)
        except Exception:
            pass
        self.__walk_through_drive(folder_id)
        self.current_folder = self.current_folder.parent

    def __handle_file(self, file_id, file_name, file_size):
        real_file_path = self.current_folder / file_name
        drive_path = self.__get_drive_path(real_file_path)
        if drive_path in self.file_names:
            print(f"'{drive_path}' file was skipped because it is already downloaded.")
            return
        if file_size == 0:
            print(f"'{drive_path}' file was skipped because it has 0B size.")
            return
        try:
            self.__download_file(real_file_path, drive_path, file_id)
        except Exception:
            print(f"Failed to download the '{drive_path}' file.")
            print(traceback.format_exc())
            return
        self.file_names.append(drive_path)

    ''' File downloader '''

    def __download_file_in_memory(self, real_file_path, drive_path, file_id):
        # Unused alternative that buffers the whole file in memory before writing it out.
        target = self.api.get_media(file_id)
        fh = io.BytesIO()
        downloader = MediaIoBaseDownload(fh, target)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"Downloaded {int(status.progress() * 100)}% of '{drive_path}' file...\r", end='', flush=True)
        print(f"Finished downloading '{drive_path}' file.")
        with open(real_file_path, 'wb') as f:
            fh.seek(0)
            f.write(fh.read())

    def __download_file(self, real_file_path, drive_path, file_id):
        with open(real_file_path, 'wb') as f:
            target = self.api.get_media(file_id)
            downloader = MediaIoBaseDownload(f, target)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                print(f"Downloaded {int(status.progress() * 100)}% of '{drive_path}' file...\r", end='', flush=True)
        print(f"Finished downloading '{drive_path}' file.")

    ''' Utils '''

    def __get_drive_path(self, path):
        return str(path)[self.base_folder_len:]

    def __fix_duplicates(self, seq):
        # Collect all duplicates.
        tally = defaultdict(list)
        for i, item in enumerate(seq):
            tally[item['name']].append(i)
        # Fix all found duplicates by appending the Drive ID to the name.
        for locs in tally.values():
            if len(locs) > 1:
                for loc in locs:
                    item = seq[loc]
                    item['name'] = f"{item['name']}-{item['id']}"
if __name__ == '__main__':
    api = GDriveSharedFolderApi()
    api.init()
    downloader = Downloader(api)
    downloader.download('your_output_folder', 'your_drive_id')
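One caveat: GDriveSharedFolderApi.get_items returns only a single page of results, so a folder with more than page_size entries would be silently truncated. A minimal sketch of how the nextPageToken could be followed, assuming the same service object built above (get_all_items is a hypothetical helper, not part of the original gist):

def get_all_items(service, drive_or_folder_id, page_size=1000):
    # Hypothetical helper: keep requesting pages until the Drive API
    # stops returning a nextPageToken, collecting every child entry.
    items = []
    page_token = None
    while True:
        results = service.files().list(
            fields='nextPageToken, files(id, name, mimeType, size)',
            pageToken=page_token,
            pageSize=page_size,
            q=f"'{drive_or_folder_id}' in parents",
        ).execute()
        items.extend(results.get('files', []))
        page_token = results.get('nextPageToken')
        if not page_token:
            return items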
Based on several Stack Overflow threads. Because a Drive folder can contain several files with the same name, __fix_duplicates suffixes duplicate names with their Drive IDs before anything is written to disk.
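A small standalone illustration of that renaming logic (the IDs are made-up placeholders):

from collections import defaultdict

# Two entries share the name 'report.pdf'; one is unique.
items = [
    {'id': '1AbC', 'name': 'report.pdf'},
    {'id': '9XyZ', 'name': 'report.pdf'},
    {'id': '5QrS', 'name': 'notes.txt'},
]

tally = defaultdict(list)
for i, item in enumerate(items):
    tally[item['name']].append(i)

for locs in tally.values():
    if len(locs) > 1:
        for loc in locs:
            items[loc]['name'] = f"{items[loc]['name']}-{items[loc]['id']}"

print([item['name'] for item in items])
# ['report.pdf-1AbC', 'report.pdf-9XyZ', 'notes.txt']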