In my case, I download about 80k jpg files from S3 with three different method:
method | total download time | avg. files download/sec |
---|---|---|
Seq | ~7hr | 3.2 |
Async | ~40min | 35 |
Async+MP | ~10min | 135 |
In my case, I download about 80k jpg files from S3 with three different method:
method | total download time | avg. files download/sec |
---|---|---|
Seq | ~7hr | 3.2 |
Async | ~40min | 35 |
Async+MP | ~10min | 135 |
import os | |
import asyncio | |
import aiobotocore | |
import io | |
from PIL import Image | |
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] | |
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] | |
async def getS3Obj(client, s3_bucket, s3_key): | |
"""send request and retrieve the obj from S3""" | |
resp = await client.get_object( | |
Bucket=s3_bucket, | |
Key=s3_key | |
) | |
obj = await resp['Body'].read() | |
return obj | |
async def downloadImg(client, s3_bucket, s3_key, save_path): | |
"""convert the obj to image and save""" | |
obj = await getS3Obj(client, s3_bucket, s3_key) | |
img = Image.open(io.BytesIO(obj)) | |
img.save(save_path) | |
async def go(loop, download_list): | |
"""launch the process to download multiple files""" | |
session = aiobotocore.get_session(loop=loop) | |
async with session.create_client( | |
's3', | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
aws_access_key_id=AWS_ACCESS_KEY_ID) as client: | |
tasks = [downloadImg(client, *file_) for file_ in download_list] | |
await asyncio.gather(*tasks) | |
if __name__ == "__main__": | |
download_list = [ | |
("s3_bucket1", "s3_key1", "save_path1"), | |
("s3_bucket2", "s3_key2", "save_path2"), | |
("s3_bucket3", "s3_key3", "save_path3") | |
] | |
loop = asyncio.new_event_loop() | |
loop.run_until_complete(go(loop, download_list)) |
import os | |
import asyncio | |
import aiobotocore | |
import multiprocessing as mp | |
import io | |
from PIL import Image | |
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] | |
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] | |
async def getS3Obj(client, s3_bucket, s3_key): | |
"""send request and retrieve the obj from S3""" | |
resp = await client.get_object( | |
Bucket=s3_bucket, | |
Key=s3_key | |
) | |
obj = await resp['Body'].read() | |
return obj | |
async def downloadImg(client, s3_bucket, s3_key, save_path): | |
"""convert the obj to image and save""" | |
obj = await getS3Obj(client, s3_bucket, s3_key) | |
img = Image.open(io.BytesIO(obj)) | |
img.save(save_path) | |
async def go(loop, download_list): | |
"""launch the process to download multiple files""" | |
session = aiobotocore.get_session(loop=loop) | |
async with session.create_client( | |
's3', | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
aws_access_key_id=AWS_ACCESS_KEY_ID) as client: | |
tasks = [downloadImg(client, *file_) for file_ in download_list] | |
await asyncio.gather(*tasks) | |
def singleProcess(download_list_): | |
"""mission for single process""" | |
loop = asyncio.new_event_loop() | |
loop.run_until_complete(go(loop, download_list_)) | |
if __name__ == "__main__": | |
download_list = [ | |
("s3_bucket1", "s3_key1", "save_path1"), | |
("s3_bucket2", "s3_key2", "save_path2"), | |
("s3_bucket3", "s3_key3", "save_path3") | |
] | |
# number of process | |
no_mp = 8 # or no_mp = mp.cpu_count() | |
# number of tasks | |
no_tasks = 16 | |
download_list_chunk = [download_list[i::no_tasks] for i in range(no_tasks)] | |
with mp.Pool(no_mp) as p: | |
p.map(singleProcess, download_list_chunk) |
import os | |
import botocore.session | |
import io | |
from PIL import Image | |
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID'] | |
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY'] | |
def getS3Obj(client, s3_bucket, s3_key): | |
"""send request and retrieve the obj from S3""" | |
resp = client.get_object( | |
Bucket=s3_bucket, | |
Key=s3_key | |
) | |
obj = resp['Body'].read() | |
return obj | |
def downloadImg(client, s3_bucket, s3_key, save_path): | |
"""convert the obj to image and save""" | |
obj = getS3Obj(client, s3_bucket, s3_key) | |
img = Image.open(io.BytesIO(obj)) | |
img.save(save_path) | |
def go(download_list): | |
"""launch the process to download multiple files""" | |
session = botocore.session.get_session() | |
with session.create_client( | |
's3', | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
aws_access_key_id=AWS_ACCESS_KEY_ID) as client: | |
tasks = [downloadImg(client, *file_) for file_ in download_list] | |
if __name__ == "__main__": | |
download_list = [ | |
("s3_bucket1", "s3_key1", "save_path1"), | |
("s3_bucket2", "s3_key2", "save_path2"), | |
("s3_bucket3", "s3_key3", "save_path3") | |
] | |
go(download_list) |