Created January 11, 2019 20:20
Python code snippet for listing bucket objects in parallel.
#!/usr/bin/env python3
"""List S3 bucket objects in parallel.

This module contains a parallel implementation of the S3 list_objects_v2()
API call. The implementation lists objects under every distinct prefix
in parallel. Hence, a speedup is achieved if objects are spread under
multiple distinct prefixes.
"""
import argparse
import concurrent.futures
import functools
import json
import logging
import multiprocessing
import threading
import time

import botocore.session

LOGGER = logging.getLogger('list_objects')

@functools.lru_cache()
def _get_s3_client(_):
    """Get a thread-specific S3 client.

    Each worker thread gets its own client: the calling thread object is
    used as the lru_cache key, so repeated calls from the same thread
    reuse one client instead of creating a new one per request.

    Args:
        _ (threading.Thread): the calling thread, used only as a cache key

    Returns:
        (S3.Client) botocore S3 client
    """
    session = botocore.session.get_session()
    return session.create_client('s3')

def _list_objects_v2(Bucket, Prefix='', Delimiter='/'):  # pylint: disable=invalid-name
    """List ALL objects of a bucket under the given prefix.

    Args:
        :Bucket (str): the name of the bucket to list
        :Prefix (str, optional): the prefix to list (Default: '', i.e. the whole bucket)
        :Delimiter (str, optional): delimiter used to separate directories in S3 (Default: '/')

    Returns:
        obj: The list of objects and directories under the given Prefix::

            {
                'Contents': [{
                    'Key': 'prefix/file.json',
                    'LastModified': datetime.datetime(2018, 12, 13, 14, 15, 16),
                    'ETag': '"58bcd9641b1176ea012b6377eb5ce050"',
                    'Size': 262756,
                    'StorageClass': 'STANDARD'
                }],
                'CommonPrefixes': [{
                    'Prefix': 'prefix/another/'
                }]
            }
    """
    s3_client = _get_s3_client(threading.current_thread())
    paginator = s3_client.get_paginator('list_objects_v2')

    objects = []
    prefixes = []

    LOGGER.debug('Starting to list s3://%s/%s', Bucket, Prefix)
    for resp in paginator.paginate(Bucket=Bucket, Prefix=Prefix, Delimiter=Delimiter):
        objects.extend(resp.get('Contents', []))
        prefixes.extend(resp.get('CommonPrefixes', []))

    return {'Contents': objects, 'CommonPrefixes': prefixes}

def list_objects_parallel(Bucket, Prefix='', Delimiter='/', Parallelism=None):  # pylint: disable=invalid-name
    """List objects of a bucket in parallel.

    The bucket must have a directory structure for speedups to be
    realized (each common prefix is listed in parallel).

    Args:
        :Bucket (str): the name of the bucket to list
        :Prefix (str, optional): the prefix to list (Default: '', i.e. the whole bucket)
        :Delimiter (str, optional): delimiter used to separate directories in S3 (Default: '/')
        :Parallelism (int, optional): the number of threads to use (Default: 10 x CPUs)

    Returns:
        obj: The list of objects under the given bucket / prefix::

            {
                'Contents': [{
                    'Key': 'prefix/file.json',
                    'LastModified': datetime.datetime(2018, 12, 13, 14, 15, 16),
                    'ETag': '"58bcd9641b1176ea012b6377eb5ce050"',
                    'Size': 262756,
                    'StorageClass': 'STANDARD'
                }]
            }
    """
    objects = []
    tasks = set()

    if not Parallelism:
        # Heavily oversubscribe the CPU as these operations are mostly
        # network-bound
        Parallelism = multiprocessing.cpu_count() * 10

    with concurrent.futures.ThreadPoolExecutor(max_workers=Parallelism) as tpe:
        # Seed the task set with the root prefix; every completed listing
        # may fan out into new tasks, one per discovered common prefix.
        tasks.add(tpe.submit(_list_objects_v2, Bucket=Bucket, Prefix=Prefix, Delimiter=Delimiter))
        while tasks:
            done, _ = concurrent.futures.wait(tasks, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                res = future.result()
                objects.extend(res['Contents'])
                for prefix in res['CommonPrefixes']:
                    tasks.add(
                        tpe.submit(_list_objects_v2, Bucket=Bucket, Prefix=prefix['Prefix'], Delimiter=Delimiter))
            tasks = tasks - done

    return {'Contents': objects}

def main():
    """Entrypoint"""
    parser = argparse.ArgumentParser(description='''
        Parallel S3 bucket listing utility.
    ''')
    parser.add_argument('-b', '--bucket', type=str, required=True)
    parser.add_argument('-p', '--prefix', type=str, default='')
    parser.add_argument('-l', '--log-level', type=str, default='INFO')
    args = parser.parse_args()

    logging.basicConfig(level=args.log_level.upper())

    start = time.time()
    res = list_objects_parallel(args.bucket, args.prefix)
    logging.info('Finished in %s seconds', time.time() - start)

    for obj in res.get('Contents', []):
        print(json.dumps(obj, default=str))


if __name__ == '__main__':
    main()
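
A minimal usage sketch, assuming the file is saved as list_objects.py and AWS credentials are available in the environment; the bucket name and prefix below are placeholders:

# From the command line: prints one JSON document per object.
#   ./list_objects.py --bucket my-example-bucket --prefix data/

# From Python: import the module and call list_objects_parallel() directly.
from list_objects import list_objects_parallel

res = list_objects_parallel('my-example-bucket', Prefix='data/')
print('Found %d objects' % len(res['Contents']))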
I was looking for exactly something like this. It helped me reduce a retrieval task from 38 sec (using bucket.obj.filter) to 9 sec. Thank you very much 🙏
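
For context, the serial approach the comment above refers to is, roughly, the boto3 resource collection API; a minimal sketch with placeholder bucket name and prefix:

# Serial baseline: boto3 paginates sequentially behind the scenes.
# 'my-example-bucket' and 'data/' are placeholders.
import boto3

bucket = boto3.resource('s3').Bucket('my-example-bucket')
objects = list(bucket.objects.filter(Prefix='data/'))
print(len(objects))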