Skip to content

Instantly share code, notes, and snippets.

@arowla
Created December 11, 2015 16:47
Show Gist options
  • Save arowla/f93b28f014feb4acb180 to your computer and use it in GitHub Desktop.
Save arowla/f93b28f014feb4acb180 to your computer and use it in GitHub Desktop.
api.data.gov Elasticsearch JSON -> CSV multithreaded converter
import csv, json, sys, codecs, cStringIO, multiprocessing
class UnicodeWriter:
    """A CSV writer that accepts rows of unicode values (Python 2).

    Each row is serialized as UTF-8 through an in-memory csv.writer
    (the Python 2 csv module handles only byte strings), then decoded
    and re-encoded with the target codec before being written to the
    wrapped stream.  The default "utf-8-sig" codec emits a BOM on the
    first write via the incremental encoder.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
        # Scratch byte buffer that the underlying csv.writer targets.
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        # Incremental encoder: stateful, so a BOM is produced only once.
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        """Encode one row of unicode-coercible cells and write it out."""
        encoded_cells = [unicode(cell).encode("utf-8") for cell in row]
        self.writer.writerow(encoded_cells)
        # Round-trip the buffered CSV bytes through unicode so the
        # incremental encoder can re-encode them for the target stream.
        buffered = self.queue.getvalue().decode("utf-8")
        self.stream.write(self.encoder.encode(buffered))
        # Reset the scratch buffer for the next row.
        self.queue.truncate(0)

    def writerows(self, rows):
        """Write every row in *rows* via writerow."""
        for one_row in rows:
            self.writerow(one_row)
def get_record(line):
    """Parse one line of Elasticsearch JSON export and return its '_source' dict."""
    return json.loads(line)['_source']


def process_line(line):
    """Worker task: write one JSON line's '_source' values as a CSV row."""
    output.writerow(get_record(line).values())


# Module-level writer: each forked worker inherits its own copy, all of
# which write to the same underlying stdout file descriptor.
output = UnicodeWriter(sys.stdout)

if __name__ == '__main__':
    # BUGFIX: the pool used to be created before process_line was even
    # defined — forked workers snapshot the module namespace at fork
    # time, so the worker function could fail to resolve.  Create the
    # pool only after all definitions exist, and only in the parent
    # process (the __main__ guard also prevents re-execution under
    # spawn-style multiprocessing).
    pool = multiprocessing.Pool(4)
    with open(sys.argv[1]) as fh:
        # Write the header line from the first record's keys...
        data = get_record(fh.readline())
        output.writerow(data.keys())
        # ...then back up the file to the beginning so the first
        # record's VALUES are also emitted (previously they were
        # silently dropped because the seek was missing), and fan all
        # lines out to the workers.
        fh.seek(0)
        pool.map(process_line, fh)
    # Shut the pool down cleanly so all worker output is flushed.
    pool.close()
    pool.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment