@asvinours
Last active October 6, 2017 21:37
Update storage class of objects on S3 from STANDARD to STANDARD_IA


These scripts are intended to run on AWS Lambda, triggered by S3 event notifications; a minimal local test invocation is sketched after the list below.

  • The first script can only update the storage class of objects smaller than 5 GB, the limit of a single copy request.
  • The second one attempts the multipart copy with a pool of worker processes; it is not fully tested because Lambda does not give access to /dev/shm, which multiprocessing relies on.
  • The third one uses a multipart copy to split the object into 10 MB parts and copies them one by one.
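
Each handler reads the bucket name and object key from the first record of an S3 event notification. As a rough illustration (the bucket and key below are made-up placeholder values, not part of the gist), any of the handlers can be smoke-tested locally with a hand-built event of that shape:

# Illustration only: fake S3 notification event for a local smoke test.
# 'my-example-bucket' and the key are placeholder values.
fake_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-example-bucket'},
            'object': {'key': 'path/to/some-object.bin'}
        }
    }]
}

if __name__ == '__main__':
    lambda_handler(fake_event, None)  # the context argument is never used by these handlers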
from __future__ import print_function
import urllib

import boto3

s3 = boto3.resource('s3')


def lambda_handler(event, context):
    # Triggered by an S3 event notification; pull the bucket and key out of the record
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                # copy_from is a single-request copy, limited to objects up to 5 GB
                print("Multi part copy is required, size is >= 5GB")
            else:
                o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                            MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
from __future__ import print_function
import math
import time
import urllib

import boto3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class MyManager(BaseManager): pass


def Manager():
    m = MyManager()
    m.start()
    return m


class S3MPU(object):
    # Shared container for the completed-part list, accessed by workers through a manager proxy
    def __init__(self):
        self._parts = {
            'Parts': []
        }

    def add_part(self, part):
        self._parts['Parts'].append(part)

    def get_parts(self):
        # CompleteMultipartUpload requires parts in ascending PartNumber order,
        # but workers may finish out of order, so sort before returning
        return {'Parts': sorted(self._parts['Parts'], key=lambda p: p['PartNumber'])}


class Copier(object):
    # Picklable callable handed to Pool.map_async
    def __call__(self, args):
        do_part_copy(args)


MyManager.register('S3MPU', S3MPU)

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')


def do_part_copy(args):
    bucket, key, mpu_id, part_num, start_pos, end_pos, s3mpu = args
    print("Copying part {} to STANDARD_IA".format(part_num))
    # Each worker builds its own client: boto3 clients should not be shared across processes
    s3_client = boto3.client('s3')
    res = s3_client.upload_part_copy(Bucket=bucket, CopySource={'Bucket': bucket, 'Key': key},
                                     CopySourceRange='bytes=%d-%d' % (start_pos, end_pos),
                                     Key=key,
                                     PartNumber=part_num, UploadId=mpu_id)
    s3mpu.add_part({
        'PartNumber': part_num,
        'ETag': res['CopyPartResult']['ETag']
    })
    print("Part {} copied to STANDARD_IA".format(part_num))


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                print("Multi part copy is required, size is >= 5GB")
                manager = Manager()
                s3mpu = manager.S3MPU()
                mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key, StorageClass='STANDARD_IA')
                # 10 MB parts; note S3 allows at most 10,000 parts per upload
                part_size = 10 * 1024 * 1024
                if file_size > part_size:
                    full_parts = int(file_size / part_size)
                else:
                    full_parts = 1
                    part_size = file_size
                total_parts = full_parts
                if full_parts * part_size < file_size:
                    total_parts += 1
                print("Number of parts: {}".format(total_parts))

                # Generate arguments for invocations of do_part_copy
                def gen_args(num_parts):
                    cur_pos = 0
                    for i in range(num_parts):
                        part_start = cur_pos
                        cur_pos = cur_pos + part_size
                        part_end = min(cur_pos - 1, file_size - 1)
                        part_num = i + 1
                        yield (bucket, key, mpu["UploadId"], part_num, part_start, part_end, s3mpu)

                try:
                    print("Trying to create the processing pool")
                    pool = Pool(processes=10)
                    t1 = time.time()
                    print("Processing pool created")
                    pool.map_async(Copier(), gen_args(total_parts)).get(9999999)
                    s3_client.complete_multipart_upload(Bucket=bucket, Key=key,
                                                        UploadId=mpu["UploadId"],
                                                        MultipartUpload=s3mpu.get_parts())
                    t2 = time.time() - t1
                    s = file_size / 1024. / 1024.
                    print("Multi part copy is done")
                    print("Finished copying %0.2fM in %0.2fs (%0.2f MB/s)" % (s, t2, s / t2))
                except KeyboardInterrupt:
                    print("Received KeyboardInterrupt, canceling copy")
                    pool.terminate()
                    s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"])
                except Exception as err:
                    print("Encountered an error, canceling copy")
                    print(err)
                    s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"])
            else:
                o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                            MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
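
Both multipart variants address each part with an inclusive byte range of at most 10 MB. For a hypothetical 25 MB (26,214,400 byte) object this gives three parts: bytes 0-10485759, 10485760-20971519 and 20971520-26214399. A small sketch of that arithmetic, separate from the scripts themselves:

# Illustration only: reproduce the byte-range arithmetic used by the two multipart scripts.
def part_ranges(file_size, part_size=10 * 1024 * 1024):
    pos = 0
    while pos < file_size:
        # inclusive range, as expected by CopySourceRange='bytes=start-end'
        yield pos, min(pos + part_size - 1, file_size - 1)
        pos += part_size

for start, end in part_ranges(25 * 1024 * 1024):
    print('bytes=%d-%d' % (start, end))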
from __future__ import print_function
import urllib

import boto3

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        o = s3.Object(bucket, key)
        if o.storage_class != 'STANDARD_IA':
            file_size = o.content_length
            if file_size >= 5000000000:
                print("Multi part copy is required, size is >= 5GB")
                mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key, StorageClass='STANDARD_IA')
                byte_position = 0
                i = 1
                # 10 MB parts; note S3 allows at most 10,000 parts per upload
                part_size = 10 * 1024 * 1024
                if file_size > part_size:
                    full_parts = int(file_size / part_size)
                else:
                    full_parts = 1
                    part_size = file_size
                total_parts = full_parts
                if full_parts * part_size < file_size:
                    total_parts += 1
                print("Number of parts: ~ {}".format(total_parts))
                part_info = {
                    'Parts': []
                }
                # Copy the source object onto the same key part by part, changing the storage class
                while byte_position < file_size:
                    lastbyte = min(byte_position + part_size - 1, file_size - 1)
                    res = s3_client.upload_part_copy(Bucket=bucket, CopySource={'Bucket': bucket, 'Key': key},
                                                     CopySourceRange='bytes=%d-%d' % (byte_position, lastbyte),
                                                     Key=key,
                                                     PartNumber=i, UploadId=mpu["UploadId"])
                    part_info['Parts'].append({
                        'PartNumber': i,
                        'ETag': res['CopyPartResult']['ETag']
                    })
                    print("Part {} copied to STANDARD_IA".format(i))
                    i = i + 1
                    byte_position += part_size
                s3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"], MultipartUpload=part_info)
                print("Multi part copy is done")
            else:
                o.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                            MetadataDirective='COPY', StorageClass='STANDARD_IA')
                print("Setting object to STANDARD_IA: B: {}, K: {}".format(bucket, key))
        else:
            print("Received notify on {}, but storage class is {}".format(key, o.storage_class))
        return
    except Exception as e:
        print(e)
        print('Failed on B: {} K: {}.'.format(bucket, key))
        raise e
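
For the handlers to fire automatically, the bucket needs an event notification that invokes the Lambda function whenever an object is created. A rough sketch of that wiring with boto3 follows; the function ARN and bucket name are placeholders, and the same configuration can be done from the AWS console instead:

# Sketch only: subscribe a Lambda function to ObjectCreated events on a bucket.
# The ARN and bucket name are placeholders, not values from the gist.
import boto3

s3_client = boto3.client('s3')
s3_client.put_bucket_notification_configuration(
    Bucket='my-example-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:update-storage-class',
            'Events': ['s3:ObjectCreated:*']
        }]
    }
)

The function also needs a resource policy allowing S3 to invoke it (Lambda add_permission), which is omitted here. Copying an object onto its own key emits another ObjectCreated event, which is presumably why each handler first checks the storage class and does nothing when it is already STANDARD_IA.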