Created
January 14, 2020 12:03
-
-
Save juzna/120abd08e1f6cfc052660a829a788290 to your computer and use it in GitHub Desktop.
Bulk retrieve data from S3 Glacier storage class.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
import sys | |
import aiobotocore | |
import asyncio_pool | |
import botocore | |
import botocore.session, botocore.credentials | |
from botocore.exceptions import ClientError | |
from tqdm import tqdm | |
# S3 bucket that this script lists, inspects and restores objects in.
BUCKET = "kiwi-datadog-logs-archive"
def patch_auth():
    """Monkey-patch botocore's AssumeRoleCredentialFetcher STS client factory.

    Replaces ``AssumeRoleCredentialFetcher._create_client`` (class-wide) with
    a version that builds the STS client from a plain, synchronous
    ``botocore.session.Session``. The client is created using the fetcher's
    frozen source credentials. The original comment says this is needed
    "to work with asyncio". Presumably the stock implementation yields a
    client that the synchronous fetcher cannot use when running under
    aiobotocore — TODO confirm against the installed botocore/aiobotocore
    versions.

    Calling this more than once is harmless: it simply re-assigns the same
    replacement.
    """
    def _create_client(self):
        frozen_credentials = self._source_credentials.get_frozen_credentials()
        # Lazily create one synchronous botocore session per fetcher
        # instance and reuse it on later refreshes.
        if not getattr(self, '_boto_session', None):
            self._boto_session = botocore.session.Session()
        return self._boto_session.create_client(
            'sts',
            aws_access_key_id=frozen_credentials.access_key,
            aws_secret_access_key=frozen_credentials.secret_key,
            aws_session_token=frozen_credentials.token,
        )

    botocore.credentials.AssumeRoleCredentialFetcher._create_client = _create_client
async def list_all_objects(c, prefix, bucket=None):
    """Return every object summary under ``prefix``, following pagination.

    Args:
        c: async S3 client exposing ``list_objects`` (ListObjects v1).
        prefix: key prefix to list.
        bucket: bucket to list; defaults to the module-level ``BUCKET``.

    Returns:
        A list of the ``Contents`` entries from every page, in order. The
        list is empty when nothing matches ``prefix``.
    """
    if bucket is None:
        bucket = BUCKET
    marker = ''
    ret = []
    while True:
        r = await c.list_objects(Bucket=bucket, Prefix=prefix, Marker=marker)
        # S3 omits 'Contents' entirely when no key matches, so don't index it.
        contents = r.get('Contents', [])
        ret.extend(contents)
        if not r.get('IsTruncated'):
            return ret
        # NextMarker is only returned in some cases (e.g. with a Delimiter);
        # otherwise the last key of this page is the continuation marker.
        marker = r.get('NextMarker') or contents[-1]['Key']
async def f_head(c, k):
    """HEAD object ``k`` in ``BUCKET`` and report its archive state.

    Returns:
        ``(storage_class, restore_state)`` taken from the response's
        ``StorageClass`` and ``Restore`` fields. Each is None when the
        response does not contain that field.
    """
    response = await c.head_object(Bucket=BUCKET, Key=k)
    storage_class = response.get('StorageClass')
    restore_state = response.get('Restore')
    return storage_class, restore_state
async def f_restore(c, k, days=3, bucket=None):
    """Request a temporary restore of archived object ``k``.

    Args:
        c: async S3 client exposing ``restore_object``.
        k: object key to restore.
        days: how many days the restored copy should be kept (default 3).
        bucket: bucket holding ``k``; defaults to the module-level ``BUCKET``.

    Returns:
        None.

    Raises:
        ClientError: for any S3 error other than ``RestoreAlreadyInProgress``.
            That error is swallowed, because a restore is then already
            under way and there is nothing more to do.
    """
    if bucket is None:
        bucket = BUCKET
    try:
        await c.restore_object(Bucket=bucket, Key=k, RestoreRequest=dict(Days=days))
    except ClientError as e:
        if e.response['Error']['Code'] != 'RestoreAlreadyInProgress':
            raise
async def _restore_all(c, ks):
    """Issue a restore request for every key in ``ks``, 20 concurrently.

    Returns the truthy entries of the pool's result list. The per-key
    coroutine itself returns None, so anything truthy is presumably an
    exception object that asyncio_pool put in place of a failed task's
    result (the original code relied on this to "print errors") — TODO
    confirm against the installed asyncio_pool version.
    """
    with tqdm(total=len(ks)) as bar:
        bar.update(0)  # Make sure the bar is shown before the first result.

        async def f(k):
            await f_restore(c, k)
            bar.update()

        r = await asyncio_pool.AioPool(size=20).map(f, ks)
    return [x for x in r if x]


async def _status_all(c, ks):
    """HEAD every key in ``ks`` (20 concurrently) and tally the results.

    Returns:
        ``(cnt, errors)``. ``cnt`` is a Counter that receives each object's
        storage class and restore value as two separate elements, so a None
        from either field lands in the same ``None`` bucket. ``errors``
        holds the truthy pool results, as described in ``_restore_all``.
    """
    cnt = collections.Counter()
    with tqdm(total=len(ks)) as bar:
        bar.update(0)  # Make sure it's shown.

        async def fh(k):
            sc, restore = await f_head(c, k)
            # NOTE(review): counts sc and restore independently rather than
            # as a (sc, restore) pair; kept to preserve the original tally.
            cnt.update((sc, restore))
            bar.set_description_str(f"{cnt.most_common()}", refresh=False)
            bar.update()

        r = await asyncio_pool.AioPool(size=20).map(fh, ks)
    return cnt, [x for x in r if x]


async def main():
    """Command-line entry point: ``<action> <prefix> [<prefix> ...]``.

    Lists every object under the given key prefixes in ``BUCKET``, then:
      * ``retrieve`` -- issues a restore request for each object;
      * ``status``   -- HEADs each object and prints a tally of storage
        classes and restore states.
    Errors collected from the worker pool are printed at the end.

    Raises:
        ValueError: if the action is missing or unknown, or no prefixes
            are given.
    """
    patch_auth()
    if len(sys.argv) < 2:
        raise ValueError('Usage: <retrieve|status> <prefix> [<prefix> ...]')
    action = sys.argv[1]
    paths = sys.argv[2:]
    if not paths:
        raise ValueError('Run with list of path prefixes to restore.')
    # Reject a bad action before the (possibly very long) listing phase.
    if action not in ('retrieve', 'status'):
        raise ValueError("Bad action")

    s = aiobotocore.get_session()
    # 'async with' closes the client even if listing or a worker raises.
    async with s.create_client('s3') as c:
        print("Listing objects...")
        ks = []
        for p in tqdm(paths):
            objs = await list_all_objects(c, p)
            ks.extend(o['Key'] for o in objs)
        print(f"Found {len(ks):_d} objects to restore")

        if action == 'retrieve':
            errors = await _restore_all(c, ks)
        else:
            cnt, errors = await _status_all(c, ks)
            print(cnt.most_common())
        # Print the actual error list; print(filter(...)) only showed the
        # filter object's repr.
        print(errors)


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment