-
-
Save luke/5697511 to your computer and use it in GitHub Desktop.
import logging | |
import cStringIO | |
import csv | |
DEBUG = False | |
def data2csv(data):
    """Serialize *data* rows into an in-memory tab-delimited buffer.

    ``None`` values are encoded as the literal ``\\N`` marker that
    PostgreSQL's COPY command treats as NULL.

    Args:
        data: iterable of rows, each a sequence of column values.

    Returns:
        An ``io.StringIO`` rewound to position 0, ready to be passed to
        ``cursor.copy_from()``.
    """
    from io import StringIO  # cStringIO was removed in Python 3

    buf = StringIO()
    writer = csv.writer(buf, delimiter='\t', lineterminator='\n')
    for row in data:
        # r'\N' (a bare '\N' is a SyntaxError in Python 3) is COPY's
        # NULL marker; a conditional expression replaces the fragile
        # `x is None and ... or x` idiom.
        writer.writerow([r'\N' if value is None else value for value in row])
    buf.seek(0)
    return buf
def upsert(cursor, table_name, selector_fields, setter_fields, data):
    """Bulk upsert *data* into *table_name* via a temp table and a writable CTE.

    Rows whose selector columns match an existing row are UPDATEd; the
    remaining source rows are INSERTed.  The caller owns the transaction
    and must call ``connection.commit()`` afterwards.

    Args:
        cursor: an open psycopg2 cursor.
        table_name: target table name.  NOTE(security): table and column
            names are interpolated directly into SQL — pass only trusted
            identifiers, never user input.
        selector_fields: column names acting as the match key.
        setter_fields: column names written on both paths.
        data: iterable of rows ordered as selector_fields + setter_fields.
    """
    csv_data = data2csv(data)
    # Writable CTE: update the matches, then insert every source row that
    # found no match.  DISTINCT ON replaces the original GROUP BY, which
    # raised an error whenever setter columns were selected without an
    # aggregate; it simply keeps one source row per key.
    sql_template = """ WITH updates AS (
        UPDATE %(target)s t
            SET %(set)s
        FROM source s
        WHERE %(where_t_pk_eq_s_pk)s
        RETURNING %(s_pk)s
    )
    INSERT INTO %(target)s (%(columns)s)
    SELECT DISTINCT ON (%(pk)s) %(source_columns)s
    FROM source s LEFT JOIN updates t USING(%(pk)s)
    WHERE %(where_t_pk_is_null)s
    """
    statement = sql_template % dict(
        target=table_name,
        set=',\n'.join('%s = s.%s' % (x, x) for x in setter_fields),
        where_t_pk_eq_s_pk=' AND '.join('t.%s = s.%s' % (x, x) for x in selector_fields),
        s_pk=','.join('s.%s' % x for x in selector_fields),
        columns=','.join(selector_fields + setter_fields),
        source_columns=','.join('s.%s' % x for x in selector_fields + setter_fields),
        pk=','.join(selector_fields),
        where_t_pk_is_null=' AND '.join('t.%s IS NULL' % x for x in selector_fields),
    )
    if DEBUG:
        logging.debug(statement)
    cur = cursor
    try:
        # ON COMMIT DROP guarantees cleanup even if the explicit DROP
        # below never runs because an earlier statement failed.
        cur.execute('CREATE TEMP TABLE source(LIKE %s INCLUDING ALL) ON COMMIT DROP;' % table_name)
        cur.copy_from(csv_data, 'source', columns=selector_fields + setter_fields)
        cur.execute(statement)
        cur.execute('DROP TABLE source')
    finally:
        # Close the buffer even when a database error propagates.
        csv_data.close()
Thanks so much for this!! A couple tweaks I made:
- took me a while to realize that this code doesn't show the
connection.commit()
command. Added it in at the very end and promptly stopped tearing my hair out. - since I'm upserting files that are already tab-delimited and sitting on my hard drive, I was able to skip the
data2csv()
step. Briefly:- format the file without quotes and with the correct null character (which you can also specify in the
cur.copy_from()
command on line 51 if\\N
doesn't work for your use case) - on line 18,
csv_data = open(filename, "r")
- format the file without quotes and with the correct null character (which you can also specify in the
Thank you for sharing!
Thanks for this, really helpful! Don't you need some aggregation due to the group by for the setter_fields? I get an error without specifying MAX or something
can you please give an example of how to use this bulkinsert?
Could you please share an example of usage? That would be really helpful.
Thanks a lot, that's a smarter way to do it! I had to make a couple of modifications for my use case. First, the cStringIO library is deprecated, so I replaced it with io, and everything works the same. I added a function to manage the encoding, because I use UTF-8 in the database, but the files I load are in cp1252. The GROUP BY clause doesn't work because there is no aggregation, as @chemeng said, but in my case it isn't needed.
Nice! I've actually just decided to go with the same solution after realizing the COPY FROM STDIN/STDOUT is easier to use than anticipated. Every day I use Postgres I love it a little more...