-
-
Save Bklyn/958105208fbd1d7bec49 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
sqla2hdfstore
===============
Input:
    sqlalchemy uri
Output:
    pytables HDFStore
"""
# The docstring above must be the first statement of the module to be
# picked up as ``__doc__``; ``from __future__`` imports are allowed to
# follow it.
from __future__ import print_function

import logging

import pandas
import sqlalchemy as sa

# Root logger is configured at import time with DEBUG verbosity; ``log`` is
# the root logger itself and is shared by every function in this module.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
def sqla2hdfstore(db_uri, hdf_uri, mode='a', complib='blosc'):
    """
    Copy every table of a database into an HDFStore with pandas and pytables.

    Each table reported by the database's default schema is read in full
    with ``SELECT *`` and stored under a key equal to the table name.

    caveat emptor:

    * this will likely exceed RAM capacity with large datasets
      (each table is loaded into a single DataFrame)
    * there is no progressbar

    Args:
        db_uri (str): `SQLAlchemy connection uri
            <http://docs.sqlalchemy.org/en/rel_0_9/dialects/>`_
            (e.g. ``db_uri = 'oracle://user:pass@dsn'``)
        hdf_uri (str): Local path to `.h5` HDF5 pytables database
            (e.g. ``hdf_uri = './example.h5'``)
        mode (str): ``{'a', 'w', 'r', 'r+'}, default 'a'``
            (see ``pandas.HDFStore.__doc__``)
        complib (str): ``{'zlib', 'bzip2', 'lzo', 'blosc', None},``
            default: ``blosc``

    Returns:
        pandas.HDFStore: An open HDFStore instance with keys
            SQLAlchemy database table names. The caller is responsible
            for closing it.

    Raises:
        Exception: anything raised while connecting or reading is
            propagated after the HDFStore has been closed.
    """
    log.debug("creating engine")
    engine = sa.create_engine(db_uri)
    store = pandas.HDFStore(hdf_uri, mode=mode, complib=complib)
    try:
        # One DBAPI connection is shared by all reads and always released,
        # instead of opening (and leaking) a new one per table.
        con = engine.raw_connection()
        try:
            # Dialect-aware quoting keeps table names that are reserved
            # words or contain special characters valid in the query.
            quote = engine.dialect.identifier_preparer.quote
            for table_name in sa.inspect(engine).get_table_names():
                query = "SELECT * FROM %s" % quote(table_name)
                log.debug(table_name)
                log.debug(query)
                store[table_name] = pandas.read_sql(query, con)
        finally:
            con.close()
    except Exception:
        # Do not hand back (or leak) a half-populated, still-open store.
        store.close()
        raise
    return store
import unittest | |
import random | |
import string | |
class StringGenerator(object):
    """Generate random strings from a table of characters.

    The table is a mapping; its *values* are the characters that may appear
    in generated strings. The default table maps the code point of each
    lowercase ASCII letter and of ``'_'`` to that character.
    """

    # Default character table: code point -> character.
    RAND_CHARS = {ord(char): char for char in string.ascii_lowercase + '_'}

    def __init__(self, chars=None, seed=None):
        """Set up the generator.

        Args:
            chars (dict): character table to draw from; a falsy value
                (``None`` or an empty mapping) selects ``RAND_CHARS``.
            seed: value passed to ``random.seed``. Note that this reseeds
                the module-level ``random`` generator, which is shared
                with every other user of ``random`` in the process.
        """
        random.seed(seed)
        self.chars = chars if chars else self.RAND_CHARS

    def randstr(self, n):
        """Return a unicode string of ``n`` characters drawn at random.

        Args:
            n (int): number of characters to generate; ``0`` (or a
                negative number) yields an empty string.

        Returns:
            unicode/str: the generated string.
        """
        chars = self.chars
        # Materialise the keys: dict views cannot be indexed on Python 3.
        keys = list(chars)
        last = len(keys) - 1
        # ``range`` exists on both Python 2 and 3 (``xrange`` does not).
        return u''.join(
            chars[keys[random.randint(0, last)]] for _ in range(n))
def generate_sqlitedb(db_uri):
    """Fill a database with two small tables of random rows for testing.

    Tables ``one`` and ``two`` each receive 100 rows with the columns
    ``time`` (current timestamp), ``n`` (row counter, 0-99) and two random
    8-character strings, ``key`` and ``value``.

    Args:
        db_uri (str): connection uri handed to ``dataset.connect``.

    Returns:
        bool: always ``True``.
    """
    import datetime
    import dataset

    table_names = ['one', 'two']
    sg = StringGenerator()
    db = dataset.connect(db_uri)
    for table_name in table_names:
        table = db[table_name]
        table.insert_many(
            dict(
                time=datetime.datetime.now(),
                n=n,
                key=sg.randstr(8),
                value=sg.randstr(8))
            # ``range`` works on Python 2 and 3; ``xrange`` is Python 2 only.
            for n in range(100)
        )
    return True
class Test_sqla2hdfstore(unittest.TestCase):
    """Smoke tests for the string generator, the fixture database and the
    database-to-HDFStore conversion.

    The tests write ``test_sql2hdfstore.sqlite`` and ``test_sql2hdfstore.h5``
    into the current working directory.
    """

    # Locations of the fixture database and of the HDF5 output file.
    conf = {
        'db_uri': 'sqlite:///test_sql2hdfstore.sqlite',
        'hdf_path': 'test_sql2hdfstore.h5'}

    def test_00_randstr(self):
        """A generated string of positive length is non-empty."""
        sg = StringGenerator()
        output = sg.randstr(10)
        print(output)
        self.assertTrue(output)

    def test_20_generate_sqlitedb(self):
        """The fixture database can be generated."""
        # TODO: conditional
        output = generate_sqlitedb(self.conf['db_uri'])
        self.assertTrue(output)

    def test_90_sqla2hdfstore(self):
        """Converting the fixture database yields a non-empty HDFStore."""
        db_uri = self.conf['db_uri']
        hdf_path = self.conf['hdf_path']
        generate_sqlitedb(self.conf['db_uri'])
        store = sqla2hdfstore(db_uri, hdf_path)
        try:
            self.assertTrue(store)
            for key in store.keys():
                print(key, len(store[key]))
        finally:
            # Release the HDF5 file handle even when an assertion fails.
            store.close()
def main():
    """Command-line entry point.

    Parses ``sys.argv`` and either runs the unit tests (``-t``) or converts
    the database given with ``-i`` into the HDF5 file given with ``-o``
    (default ``hdfstore.h5``).

    Returns:
        int: ``0`` after a successful conversion.

    Raises:
        SystemExit: with status 2 when a required option is missing
            (raised by ``OptionParser.error``), and always when ``-t`` is
            given (the test run ends the process).
    """
    import logging
    import optparse
    import sys

    prs = optparse.OptionParser(
        usage="%prog [-t/--test] [-i <uri>] [-o <path>]")
    prs.add_option('-i', '--sqlalchemy-uri',
                   dest='sqlalchemy_uri',
                   action='store')
    prs.add_option('-o', '--hdf-path',
                   dest='hdf_path',
                   default='hdfstore.h5',
                   action='store')
    prs.add_option('-v', '--verbose',
                   dest='verbose',
                   action='store_true',)
    prs.add_option('-q', '--quiet',
                   dest='quiet',
                   action='store_true',)
    prs.add_option('-t', '--test',
                   dest='run_tests',
                   action='store_true',)
    (opts, args) = prs.parse_args()

    if not opts.quiet:
        logging.basicConfig()
    else:
        # The root logger may already have been configured at DEBUG by the
        # time main() runs; raise its threshold so -q actually silences
        # debug/info output.
        logging.getLogger().setLevel(logging.WARNING)
    if opts.verbose:
        # As before, -v wins when combined with -q.
        logging.getLogger().setLevel(logging.DEBUG)

    if opts.run_tests:
        import unittest
        # Hand only the leftover positional arguments to unittest.
        sys.argv = [sys.argv[0]] + args
        sys.exit(unittest.main())

    # OptionParser.error() prints the message and exits with status 2; it
    # never returns, so there is nothing to ``raise``.
    if not opts.sqlalchemy_uri:
        prs.error("-i/--sqlalchemy-uri required")
    if not opts.hdf_path:
        prs.error("-o/--hdf-path required")

    store = sqla2hdfstore(opts.sqlalchemy_uri, opts.hdf_path)
    # Close explicitly so the HDF5 file is flushed before the process ends.
    store.close()
    return 0
if __name__ == "__main__":
    # Script entry: run the CLI and use its return value as the process
    # exit status.
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment