Created
October 19, 2012 17:31
-
-
Save sajal/3919506 to your computer and use it in GitHub Desktop.
S3Output for storing disco reduce items directly to Amazon S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from disco.worker.classic.func import task_output_stream | |
class S3Output(object):
    """
    Output stream that stores each reduce item as an object in Amazon S3.

    The key of every item added becomes the S3 key name and the value
    becomes the object's contents. If the ``gzip`` param is true, the value
    is gzip-compressed before uploading and ``.gz`` is appended to the key
    name.

    ``AWS_KEY``, ``AWS_SECRET`` and ``BUCKET_NAME`` are required in params.
    """

    def __init__(self, stream, params):
        """
        Read the S3 settings from *params*.

        *stream* is accepted but not used by this class. Raises ``KeyError``
        if ``AWS_KEY``, ``AWS_SECRET`` or ``BUCKET_NAME`` is missing.
        """
        self.aws_key = params["AWS_KEY"]
        self.aws_secret = params["AWS_SECRET"]
        self.bucket_name = params["BUCKET_NAME"]
        self.go_gzip = params.get("gzip", False)
        # Bucket handle; created on the first add() and reused afterwards so
        # that each item does not pay for a new connection and bucket lookup.
        self._bucket = None

    def _get_bucket(self):
        """Return the target bucket, connecting to S3 on first use."""
        if self._bucket is None:
            # Imported locally, like every other import in this file.
            from boto.s3.connection import S3Connection
            connection = S3Connection(self.aws_key, self.aws_secret)
            self._bucket = connection.get_bucket(self.bucket_name)
        return self._bucket

    @staticmethod
    def _gzip_to_buffer(val):
        """Return a rewound in-memory buffer holding *val* gzip-compressed."""
        import gzip
        from io import BytesIO
        # GzipFile needs bytes; encode text so unicode values work as well.
        if isinstance(val, type(u"")):
            val = val.encode("utf-8")
        buf = BytesIO()
        gz = gzip.GzipFile(fileobj=buf, mode="wb")
        try:
            gz.write(val)
        finally:
            # close() writes the gzip trailer; without it the data is truncated.
            gz.close()
        # Rewind so the upload reads from the start of the compressed data.
        buf.seek(0)
        return buf

    def add(self, key, val):
        """
        Upload *val* to the bucket under the name *key*.

        When gzip is enabled the object is stored compressed as
        ``key + ".gz"``; otherwise *val* is stored as-is under *key*.
        """
        from boto.s3.key import Key
        k = Key(self._get_bucket())
        if self.go_gzip:
            k.key = key + ".gz"
            k.set_contents_from_file(self._gzip_to_buffer(val))
        else:
            k.key = key
            k.set_contents_from_string(val)

    def close(self):
        """Do nothing; every item is uploaded in full by add()."""
        pass
def s3_output(stream, partition, url, params):
    """
    Build the S3Output object for an output stream.

    Only *stream* and *params* are forwarded to S3Output; *partition* and
    *url* are accepted but not used here.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably because disco runs this function on worker
    # nodes where module-level names are not available - confirm.
    import s3output
    return s3output.S3Output(stream, params)
# Output stream chain: disco's task_output_stream followed by s3_output.
# NOTE(review): presumably meant to be passed as a job's reduce output
# stream - confirm against the job definition that uses it.
s3_output_stream = (task_output_stream, s3_output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment