Python: upload files to S3 in parallel using concurrent.futures and boto3
""" | |
An example of how to use boto3 and a concurrent.futures process pool to upload files in parallel. | |
Bonus: when uploading files ending in .gz, the Content-Encoding metadata will be set automatically. | |
""" | |
import os | |
from concurrent.futures import ProcessPoolExecutor, as_completed | |
import boto3 | |
def concurrent_s3_upload(files, bucket, prefix, metadata, remove_gz_extension=True):
    """
    Upload a list of files to s3 concurrently.
    :param files: list of files to upload
    :param bucket: s3 bucket to upload to
    :param prefix: s3 prefix to upload to
    :param metadata: s3 metadata to apply to each file
    :param remove_gz_extension: whether to remove the .gz extension from the filename and apply ContentEncoding=gzip
    """
    prefix = prefix.strip('/')
    batch_size = 1000  # to reduce memory usage, futures are submitted in batches
    offset = 0
    completed = 0
    file_futures = {}
    failed = []
    # spawn slightly more workers than available cpus to keep the bottleneck at IO
    with ProcessPoolExecutor(max_workers=int(os.cpu_count() * 1.5)) as executor:
        while offset < len(files):
            # submit futures for the next batch of files
            futures = []
            for file in files[offset:offset + batch_size]:
                future = executor.submit(_concurrent_s3_upload, file, bucket, prefix, metadata, remove_gz_extension)
                futures.append(future)
                # keep track of the futures, so we can retry them if they fail
                file_futures[future] = file
            for future in as_completed(futures):
                try:
                    future.result()
                    completed += 1
                except Exception:
                    failed.append(file_futures[future])
                print(f'{completed}/{len(files)} uploaded. {len(failed)} failed, pending retry', end='\r')
            offset += batch_size
        # retry failed uploads once
        if not failed:
            print(f'{completed}/{len(files)} uploaded. 0 failed, no retry')
        else:
            print(f'retrying {len(failed)} failed uploads')
            futures = []
            for file in failed:
                future = executor.submit(_concurrent_s3_upload, file, bucket, prefix, metadata, remove_gz_extension)
                futures.append(future)
                # record the future-to-file mapping at submission time, so we can list files that fail again
                file_futures[future] = file
            failed = []
            for future in as_completed(futures):
                try:
                    future.result()
                    completed += 1
                except Exception:
                    failed.append(file_futures[future])
                print(f'{completed}/{len(files)} uploaded, {len(failed)} failed', end='\r')
    if failed:
        print(f'{len(failed)} failed uploads:')
        for file in failed:
            print(file)
# per-process boto3 resource, created lazily in each worker process
s3 = None


def _concurrent_s3_upload(file, bucket, prefix, metadata=None, remove_gz_extension=True):
    global s3
    extra_args = {}
    if metadata is not None:
        extra_args['Metadata'] = metadata
    s3_path = f'{prefix}/{file}'
    # if the file has the .gz extension, remove it and mark the object as gzip-encoded
    if remove_gz_extension and file.endswith('.gz'):
        s3_path = f'{prefix}/{file[:-3]}'
        # See also: s3transfer/manager.py:159
        extra_args['ContentEncoding'] = 'gzip'
    if s3_path.endswith('.json'):
        extra_args['ContentType'] = 'application/json'
    if s3 is None:
        s3 = boto3.resource('s3')
    s3.Bucket(bucket).upload_file(file, s3_path, ExtraArgs=extra_args)
    return f's3://{bucket}/{s3_path}'
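
# Note: upload_file performs managed transfers, and multipart uploads are
# themselves parallelized. The defaults can be tuned by passing a
# TransferConfig; a hypothetical variant of the upload call above, with
# assumed threshold and concurrency values (not part of the original gist):
#
#     from boto3.s3.transfer import TransferConfig
#     config = TransferConfig(multipart_threshold=64 * 1024 * 1024, max_concurrency=4)
#     s3.Bucket(bucket).upload_file(file, s3_path, ExtraArgs=extra_args, Config=config)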
if __name__ == '__main__':
    # the guard matters: ProcessPoolExecutor re-imports this module in each worker on spawn platforms (Windows, macOS)
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
    files = glob.glob('path/to/files/*.json.gz')
    concurrent_s3_upload(files, 'mybucket', '/custom/prefix/', {})
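
Since the work here is IO-bound, a thread pool is often a simpler alternative to the process pool above: boto3's low-level clients are thread-safe (unlike Resource objects), so a single client can be shared by every worker, and nothing has to be picklable. Below is a minimal sketch of that variant; thread_s3_upload, the 32-worker count, and the max_pool_connections setting are assumed names and values, not part of the gist.

from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.config import Config


def thread_s3_upload(files, bucket, prefix):
    """Upload files with one shared, thread-safe S3 client; return the files that failed."""
    prefix = prefix.strip('/')
    # size the client's connection pool to match the worker count
    s3 = boto3.client('s3', config=Config(max_pool_connections=32))
    with ThreadPoolExecutor(max_workers=32) as executor:
        # map each future back to its file so failures can be reported
        futures = {executor.submit(s3.upload_file, file, bucket, f'{prefix}/{file}'): file
                   for file in files}
        return [futures[f] for f in as_completed(futures) if f.exception() is not None]

Because upload_file already parallelizes multipart transfers internally, a large outer worker count mostly pays off when the individual files are small.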