Created
November 20, 2018 03:10
-
-
Save brookskindle/fe1fdffe622eb0cd8995d7d1d68d6b8f to your computer and use it in GitHub Desktop.
Populate a Postgres table with information from HOTSLogs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
Build a heroes of the storm database
"""
import csv
import logging
import time

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Boolean, create_engine
from sqlalchemy.orm import sessionmaker

logging.basicConfig(level=logging.DEBUG)

# Database plumbing: a lazily-connecting engine for the local "hots"
# Postgres database, plus the usual declarative/session scaffolding.
engine = create_engine("postgresql://brooks:@localhost/hots")
Session = sessionmaker(bind=engine)
Base = declarative_base()

# Number of CSV rows buffered before each bulk insert.
OBJECTS_PER_COMMIT = 100_000
class Character(Base):
    """One player's row from a HOTSLogs ReplayCharacters CSV export.

    Only the small subset of columns we care about is mapped.
    """

    __tablename__ = "character"

    id = Column(Integer, primary_key=True)
    replay_id = Column(Integer, nullable=False)         # HOTSLogs replay this row belongs to
    is_winner = Column(Boolean, nullable=False)         # did this player's team win?
    team_level = Column(Integer, nullable=False)        # team level at game end
    exp_contribution = Column(Integer, nullable=False)  # player's experience contribution

    def __str__(self):
        return (
            "<Character: Replay={}, Winner={}, Lvl={}, Exp={}>"
            .format(self.replay_id, self.is_winner, self.team_level,
                    self.exp_contribution)
        )

    def __repr__(self):
        return str(self)

    @staticmethod
    def as_dict(d):
        """Project a verbose CSV row dict onto this table's column names.

        csv.DictReader yields string values, so the numeric columns are
        cast to int explicitly here instead of relying on driver-side
        coercion when the row is inserted.
        """
        return {
            "replay_id": int(d["ReplayID"]),
            "is_winner": bool(int(d["Is Winner"])),
            "team_level": int(d["In Game Level"]),
            "exp_contribution": int(d["Experience Contribution"]),
        }

    @staticmethod
    def from_dict(d):
        """Return a new Character built from a verbose CSV row dict.

        Delegates to as_dict() so the key mapping and type casting live
        in exactly one place.
        """
        return Character(**Character.as_dict(d))
def bulk_insert(dictionaries):
    """Bulk insert the given character CSV-row dicts into the database.

    We only care about a small subset of columns, so the rest are
    ignored (Character.as_dict performs the projection).

    A traditional session.add_all() + commit() takes ~1.2s per 1,000
    inserts -- much too slow for ~12 million rows -- so a Core-level
    executemany is used instead: https://stackoverflow.com/a/34344200
    """
    # Guard the empty batch: executing insert() with an empty parameter
    # list would attempt a single all-defaults INSERT and fail on the
    # NOT NULL columns. main() can legitimately pass an empty leftover
    # buffer when the row count is an exact multiple of the batch size.
    if not dictionaries:
        return
    engine.execute(
        Character.__table__.insert(),
        [Character.as_dict(d) for d in dictionaries],
    )
def main():
    """Create the schema and stream the HOTSLogs CSV into Postgres."""
    logger = logging.getLogger(__name__)

    # Base.metadata.drop_all(engine)  # TODO: only use this when testing
    Base.metadata.create_all(engine)

    filename = "/home/brooks/data/hots/ReplayCharacters.csv"
    with open(filename) as file:
        logger.debug(f"Successfully opened {filename}")
        reader = csv.DictReader(file)

        lines_to_insert = []
        start = time.time()
        per_commit_start = time.time()
        for i, line in enumerate(reader):
            lines_to_insert.append(line)
            # Flush a full batch. Testing the buffer length (rather than
            # `i != 0 and i % OBJECTS_PER_COMMIT == 0`) makes every batch
            # exactly OBJECTS_PER_COMMIT rows; the modulo form made the
            # first batch one row larger than the rest.
            if len(lines_to_insert) >= OBJECTS_PER_COMMIT:
                bulk_insert(lines_to_insert)
                lines_to_insert = []
                end = time.time()
                logger.debug(
                    f"Insert {OBJECTS_PER_COMMIT} objs in "
                    f"{end - per_commit_start:.3f}s "
                    # i + 1 rows have been processed so far (i is a
                    # zero-based index).
                    f"({end - start:.3f}s total for {i + 1} inserts)."
                )
                per_commit_start = time.time()
        # Insert whatever is left after the last full batch.
        bulk_insert(lines_to_insert)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment