Created
October 29, 2012 18:40
-
-
Save sajal/3975607 to your computer and use it in GitHub Desktop.
Output stream for S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from disco.worker.classic.func import task_output_stream | |
class S3LineOutput(object):
    """
    Output stream for S3.

    Values passed to add() are appended to one local temporary file per key.
    close() optionally post-processes each file and then uploads it to S3,
    using the key as the S3 object name.

    Required params: AWS_KEY, AWS_SECRET and BUCKET_NAME.
    Optional params:
      gzip         -- if true, compress with pigz before uploading and
                      append ".gz" to the S3 key name.
      unique       -- if true, keep one line per distinct first field
                      (``sort -u -k1,1``).
      numeric_sort -- a column number; if set, sort the file in reverse
                      numeric order on that column (``sort -r -n -kN,N``).

    External commands are run without a shell and their exit status is
    checked, so a failed sort/pigz raises instead of silently uploading a
    truncated or missing file.
    """

    def __init__(self, stream, params, partition, url):
        # ``stream`` and ``url`` are accepted but not used by this class.
        self.aws_key = params["AWS_KEY"]
        self.aws_secret = params["AWS_SECRET"]
        self.bucket_name = params["BUCKET_NAME"]
        self.do_gzip = params.get("gzip", False)
        self.do_unique = params.get("unique", False)
        self.do_numeric_sort = params.get("numeric_sort")
        self.partition = partition
        # key -> NamedTemporaryFile holding that key's buffered values.
        self.files = {}

    def add(self, key, val):
        """Append ``val`` verbatim to the temporary file buffered for ``key``.

        Nothing is written between values, so callers are expected to
        include their own line endings (the sort steps in close() operate
        on lines).
        """
        from tempfile import NamedTemporaryFile
        # Dict lookup is O(1); the old ``key not in self.files.keys()``
        # built and scanned a list on Python 2 for every added value.
        handle = self.files.get(key)
        if handle is None:
            # delete=False: the file must survive close() so the external
            # tools and the uploader can read it by name.
            handle = self.files[key] = NamedTemporaryFile(delete=False)
        handle.write(val)

    def runcommand(self, command):
        """Run ``command`` and return its stdout as a list of lines.

        ``command`` may be an argument list (executed directly, no shell)
        or, for backward compatibility, a string run through the shell.

        Raises subprocess.CalledProcessError if the command exits non-zero.
        """
        import subprocess
        use_shell = not isinstance(command, (list, tuple))
        pipe = subprocess.Popen(command, stdout=subprocess.PIPE,
                                shell=use_shell, universal_newlines=True)
        output, _ = pipe.communicate()
        if pipe.returncode != 0:
            raise subprocess.CalledProcessError(pipe.returncode, command,
                                                output=output)
        return output.splitlines(True)

    def gzip(self, filename):
        """Compress ``filename`` with pigz and return the ``.gz`` path.

        pigz replaces the input file with ``filename + ".gz"``;
        ``--no-name`` keeps the original file name and timestamp out of the
        gzip header.
        """
        self.runcommand(["pigz", "--no-name", filename])
        return filename + ".gz"

    def unique(self, filename):
        """Keep one line per distinct first field; return the new path.

        The input file is deleted on success.
        """
        return self._sort_into(filename, ["-u", "-k1,1"])

    def numeric_sort(self, filename, col):
        """Sort lines in reverse numeric order on column ``col``.

        Returns the path of the sorted copy; the input file is deleted on
        success.
        """
        return self._sort_into(filename, ["-r", "-n", "-k%s,%s" % (col, col)])

    def _sort_into(self, filename, options):
        # Run ``sort <options> filename`` into "<filename>.sorted". On success
        # delete the input and return the new path; on failure remove the
        # partial output and re-raise (the input is left in place).
        import os
        import subprocess
        targetfile = "%s.sorted" % (filename,)
        try:
            with open(targetfile, "wb") as out:
                subprocess.check_call(["sort"] + list(options) + [filename],
                                      stdout=out)
        except Exception:
            self._remove_quietly(targetfile)
            raise
        os.remove(filename)
        return targetfile

    def _remove_quietly(self, path):
        # Best-effort delete used during cleanup; a missing file is fine.
        import os
        try:
            os.remove(path)
        except OSError:
            pass

    def close(self):
        """Post-process and upload every buffered file, then delete it locally.

        For each key, the temporary file is optionally passed through
        unique(), numeric_sort() and gzip() (in that order) and uploaded to
        BUCKET_NAME under the key (plus ".gz" when gzipping). Local files
        are removed whether or not the upload succeeds.
        """
        from boto.s3.connection import S3Connection
        try:
            s3conn = S3Connection(self.aws_key, self.aws_secret)
            bucket = s3conn.get_bucket(self.bucket_name)
            for key, handle in list(self.files.items()):
                # The external tools read the file by name; flush and
                # release our handle first.
                handle.close()
                self._upload(bucket, key, handle.name)
        finally:
            # If we bailed out early, don't leave the remaining temp files
            # on disk.
            for handle in self.files.values():
                handle.close()
                self._remove_quietly(handle.name)

    def _upload(self, bucket, key, filename):
        # Apply the configured post-processing to ``filename`` and upload the
        # result to ``bucket`` under ``key``; always delete the local file.
        from boto.s3.key import Key
        try:
            if self.do_unique:
                filename = self.unique(filename)
            if self.do_numeric_sort:
                filename = self.numeric_sort(filename, self.do_numeric_sort)
            if self.do_gzip:
                filename = self.gzip(filename)
                key = key + ".gz"
            k = Key(bucket)
            k.key = key
            k.set_contents_from_filename(filename)
        finally:
            self._remove_quietly(filename)
def s3_line_output(stream, partition, url, params):
    """Build the S3LineOutput writer for this task's output.

    Note the argument order differs from the S3LineOutput constructor,
    which takes (stream, params, partition, url).
    """
    # Imported inside the function rather than at module level -- presumably
    # so the class still resolves when Disco ships this function to a worker
    # on its own (TODO confirm).
    from s3lineoutput import S3LineOutput as output_class
    return output_class(stream, params, partition, url)
# Disco output-stream chain: the stock task_output_stream followed by the
# S3 writer built by s3_line_output.
s3_line_output_stream = (
    task_output_stream,
    s3_line_output,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment