Please see dotfiles/s3up.py
#!/usr/bin/env python
# coding=utf-8
#
# s3up.py
# 2010-2011, Mike Tigas
# http://mike.tig.as/
#
# Usage:
#   s3up filename
#       Uploads the given file to the DEFAULT_BUCKET (see below)
#       at the following path:
#           files/YYYYMMDD/(filename)
#
#   s3up filename [remote_directory]
#       As above, except to the given directory:
#           (remote_directory)/(filename)
#
#   s3up filename [bucket] [remote_filename] [cache_time]
#   s3up filename [bucket] [remote_filename] [cache_time] [policy]
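#
#       Example invocations (bucket and filenames are hypothetical):
#           s3up notes.txt
#           s3up notes.txt my-bucket docs/notes.txt 3600 public-read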
#
#
# Please set the following options below before using:
#   AWS_ACCESS_KEY_ID
#   AWS_SECRET_ACCESS_KEY
#   DEFAULT_BUCKET
#   BUCKET_CNAME
#   UPLOAD_PARALLELIZATION
#   CHUNKING_MIN_SIZE
#   CHUNK_RETRIES
import sys
import os
import traceback
from mimetypes import guess_type
from datetime import datetime
from time import sleep
from boto.s3.connection import S3Connection
from cStringIO import StringIO
from threading import Thread
from math import floor
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

# When only giving one or two args, the following bucket is used:
DEFAULT_BUCKET = ''

# Optional CNAME (or other web address) that maps to DEFAULT_BUCKET, used when
# printing the URL of an uploaded file. Leave blank to print the standard
# s3.amazonaws.com URL instead.
BUCKET_CNAME = ''

# Number of simultaneous upload threads to execute.
UPLOAD_PARALLELIZATION = 4

# Minimum size for a file chunk (except the final chunk). Must be >= 5242880
# (5 MB), S3's minimum for all but the last part of a multipart upload.
CHUNKING_MIN_SIZE = 5242880

# For robustness, we can retry uploading any chunk up to this many times. (Set to
# 1 or less to only attempt one upload per chunk.) Because we chunk large uploads,
# an error in a single chunk doesn't necessarily mean we need to re-upload the
# entire thing.
CHUNK_RETRIES = 10
# ========== "MultiPart" (chunked) upload utility methods ==========

def mem_chunk_file(local_file):
    """
    Given the file at `local_file`, returns a generator of CHUNKING_MIN_SIZE
    (default 5MB) StringIO file-like chunks for that file.
    """
    fstat = os.stat(local_file)
    fsize = fstat.st_size

    num_chunks = int(floor(float(fsize) / float(CHUNKING_MIN_SIZE)))

    fp = open(local_file, 'rb')
    for i in range(num_chunks):
        if i == (num_chunks - 1):
            size_hint = 0
        else:
            size_hint = fsize / num_chunks
        tfp = StringIO()
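        # NOTE: readlines(size_hint) reads *at least* size_hint bytes, rounding
        # up to the end of the current line, so chunk sizes here are only
        # approximate; see the discussion in the comments below this gist.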
        tfp.writelines(fp.readlines(size_hint))
        tfp.seek(0)
        yield tfp
    fp.close()

def upload_worker(multipart_key, fp, index, headers=None):
    """
    Uploads a file chunk in a MultiPart S3 upload. If an error occurs uploading
    this chunk, retry up to `CHUNK_RETRIES` times.
    """
    success = False
    attempts = 0
    while not success:
        try:
            fp.seek(0)
            multipart_key.upload_part_from_file(fp, index, headers=headers)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            success = False
            attempts += 1
            if attempts >= CHUNK_RETRIES:
                break
            sleep(0.5)
        else:
            success = True
    if not success:
        raise Exception("Upload of chunk %d failed after %d attempts." % (index, CHUNK_RETRIES))
    fp.close()

def upload_chunk(arg_list):
    thread = Thread(
        target=upload_worker,
        args=arg_list
    )
    thread.daemon = False
    thread.start()
    return thread

# ========== Uploader methods ==========

def easy_up(local_file, rdir=None):
    if os.path.isfile(local_file):
        print "File:"
        print os.path.abspath(local_file)

        if not rdir:
            rpath = "files/" + datetime.now().strftime("%Y%m%d")
        else:
            rpath = rdir

        remote_path = rpath + "/" + os.path.basename(local_file)
        upload_file(os.path.abspath(local_file), DEFAULT_BUCKET, remote_path, 0)

        print "File uploaded to:"
        if BUCKET_CNAME:
            print "%s/%s" % (BUCKET_CNAME, remote_path)
        else:
            print "https://s3.amazonaws.com/%s/%s" % (DEFAULT_BUCKET, remote_path)
    else:
        print "Path given is not a file."

def upload_file(local_file, bucket, remote_path, cache_time=0, policy="public-read", force_download=False):
    # Expiration time:
    cache_time = int(cache_time)

    # Metadata that we need to pass in before attempting an upload.
    content_type = guess_type(local_file, False)[0] or "application/octet-stream"
    basic_headers = {
        "Content-Type": content_type,
    }
    if force_download:
        basic_headers["Content-Disposition"] = "attachment; filename=%s" % os.path.basename(local_file)

    # Set up a connection to S3.
    s3 = S3Connection(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    bucket = s3.get_bucket(bucket)

    # Get info on the local file to determine whether it's large enough that we can
    # perform upload parallelization.
    fstat = os.stat(local_file)
    fsize = fstat.st_size

    # Can only chunk in increments of 5MB, so if the file size is smaller than that,
    # fall back to the "standard" upload procedure.
    if fsize <= CHUNKING_MIN_SIZE:
        print "Standard upload: File size is under %.1f MB\n" % (CHUNKING_MIN_SIZE / 1024**2)
        key = bucket.new_key(remote_path)
        key.content_type = content_type
        key.set_contents_from_filename(local_file, policy=policy, headers=basic_headers)
    else:
        print "Parallelized upload\n"
        mp_key = bucket.initiate_multipart_upload(remote_path, headers=basic_headers)
        try:
            # Chunk the given file into `CHUNKING_MIN_SIZE` (default: 5MB) chunks that can
            # be uploaded in parallel.
            chunk_generator = mem_chunk_file(local_file)

            # Use `UPLOAD_PARALLELIZATION` (default: 4) threads at a time to churn through
            # the `chunk_generator` queue.
            active_threads = []
            for i, chunk in enumerate(chunk_generator):
                args = (mp_key, chunk, i + 1, basic_headers)

                # If we don't have enough concurrent threads yet, spawn an upload thread to
                # handle this chunk.
                if len(active_threads) < UPLOAD_PARALLELIZATION:
                    # Upload this chunk in a background thread and hold on to the thread for polling.
                    t = upload_chunk(args)
                    active_threads.append(t)

                # Poll until an upload thread finishes before allowing more upload threads to spawn.
                while len(active_threads) >= UPLOAD_PARALLELIZATION:
                    # Iterate over a copy of the list so we can safely remove
                    # finished threads while looping.
                    for thread in active_threads[:]:
                        # Reap threads that have completed.
                        if not thread.isAlive():
                            thread.join()
                            active_threads.remove(thread)
                    # A polling delay, since there's no point in constantly spinning and taxing the CPU.
                    sleep(0.1)

            # We've exhausted the queue, so join all of our threads so that we wait on the last
            # pieces to complete uploading.
            for thread in active_threads:
                thread.join()
        except:
            # Since we have threads running around and possibly partial data up on the server,
            # we need to clean up before propagating an exception.
            sys.stderr.write("Exception! Waiting for existing child threads to stop.\n\n")
            for thread in active_threads:
                thread.join()

            # Remove any already-uploaded chunks from the server.
            mp_key.cancel_upload()
            for mp in bucket.list_multipart_uploads():
                if mp.key_name == remote_path:
                    mp.cancel_upload()

            # Propagate the error.
            raise
        else:
            # We finished the upload successfully.
            mp_key.complete_upload()
            key = bucket.get_key(mp_key.key_name)
    # ===== / chunked upload =====
    if cache_time != 0:
        key.set_metadata('Cache-Control', 'max-age=%d, must-revalidate' % int(cache_time))
    else:
        key.set_metadata('Cache-Control', 'no-cache, no-store')

    if policy == "public-read":
        key.make_public()
    else:
        key.set_canned_acl(policy)

def main(args):
    if len(args) == 5:
        upload_file(args[0], args[1], args[2], args[3], args[4])
    elif len(args) == 4:
        upload_file(args[0], args[1], args[2], args[3])
    elif len(args) == 3:
        upload_file(args[0], args[1], args[2])
    elif len(args) == 2:
        easy_up(args[0], args[1])
    elif len(args) == 1:
        easy_up(args[0], None)
    else:
        print "Usage:"
        print "s3up filename"
        print "    Uploads the given file to DEFAULT_BUCKET (%s) at the following path:" % DEFAULT_BUCKET
        print "    files/YYYYMMDD/(filename)"
        print "s3up filename [remote_directory]"
        print "    As above, except the file is uploaded to the given directory:"
        print "    (remote_directory)/(filename)"
        print "s3up filename [bucket] [remote_filename] [cache_time]"
        print "s3up filename [bucket] [remote_filename] [cache_time] [policy]"

if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except Exception:
        sys.stderr.write('\n')
        traceback.print_exc(file=sys.stderr)
        sys.stderr.write('\n')
        sys.exit(1)
S3 always ignores the file size of the last chunk, so this is either due to a counting error in the number of chunks the file actually needs, or the file was modified while it was being read.
I'll experiment with this to make sure I haven't done anything stupid here.
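For context, the S3 rule being alluded to is that every part of a multipart upload except the last must be at least 5 MB; completing an upload with an undersized earlier part fails with an EntityTooSmall error. A toy illustration of that constraint (the helper name and sizes here are made up):

MIN_PART = 5242880  # 5 MB: S3's minimum size for every part except the last

def parts_ok(part_sizes):
    # All parts but the last must meet the minimum; the last may be any size.
    return all(size >= MIN_PART for size in part_sizes[:-1])

print parts_ok([5242880, 5242880, 123])   # True:  a small final part is fine
print parts_ok([123, 5242880, 5242880])   # False: an undersized earlier part fails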
I also tried a much higher value for CHUNKING_MIN_SIZE, one where filesize mod CHUNKING_MIN_SIZE = 0.
One time I got an internal server error; the next time, after the upload finished, nothing happened: no more network activity at all, so I killed it. Finally I split the file into 4 GB chunks and uploaded them via s3cmd. FWIW, I was uploading from an EC2 instance to S3.
This upload only chunks reliably for text files, because "file.readlines" is used ("If given an optional parameter sizehint, it reads that many bytes from the file and enough more to complete a line, and returns the lines from that."). The chunk sizes are therefore only approximate, and binary data containing few or no newlines can be swallowed in a single oversized chunk.
To make the chunking exact for binary files you can simply replace (a quick self-check comparing the two approaches follows the replacement snippet below):
if i == (num_chunks-1):
    size_hint = 0
else:
    size_hint = fsize / num_chunks
tfp = StringIO()
tfp.writelines(fp.readlines(size_hint))
tfp.seek(0)
yield tfp
with
size = fsize / num_chunks
tfp = StringIO()
if i == (num_chunks-1):
    tfp.write(fp.read())
else:
    tfp.write(fp.read(size))
tfp.seek(0)
yield tfp
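To see the difference, here is a minimal self-check (hypothetical, not part of the gist) that chunks a 12 MB newline-free temp file both ways, using the same chunk-count math as mem_chunk_file:

import tempfile
from math import floor

fp = tempfile.TemporaryFile()
fp.write("\0" * (12 * 1024 * 1024))   # 12 MB containing no '\n' bytes at all
fsize = 12 * 1024 * 1024
num_chunks = int(floor(float(fsize) / 5242880.0))   # = 2, same math as the gist

fp.seek(0)
# readlines(size_hint) keeps reading until it completes a line; with no
# newlines present, the first call swallows all 12 MB and the second gets nothing.
print [len("".join(fp.readlines(fsize / num_chunks))) for _ in range(num_chunks)]
# -> [12582912, 0]

fp.seek(0)
# read(size) stops at exactly `size` bytes, so both chunks come out at 6 MB.
print [len(fp.read(fsize / num_chunks)) for _ in range(num_chunks)]
# -> [6291456, 6291456]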
Traceback (most recent call last):
  File "s3up.py", line 284, in <module>
    main(sys.argv[1:])
  File "s3up.py", line 264, in main
    easy_up(args[0],args[1])
  File "s3up.py", line 148, in easy_up
    if BUCKET_CNAME:
NameError: global name 'BUCKET_CNAME' is not defined
I see this message when I am uploading a 6 GB file. Am I missing some arguments?
Where should I set BUCKET_CNAME?
Please help...
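For what it's worth, BUCKET_CNAME is referenced in easy_up when printing the final URL; a minimal fix, assuming an empty default is acceptable, is to define it alongside the other settings at the top of the script:

# Optional CNAME (or other web address) that maps to DEFAULT_BUCKET; leave
# blank to print the standard s3.amazonaws.com URL instead.
BUCKET_CNAME = ''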
When trying this with a 24 GB file ... based on my data transfer log, this was probably the last chunk.