Skip to content

Instantly share code, notes, and snippets.

@vrutik2809
Last active April 7, 2022 17:31
Show Gist options
  • Save vrutik2809/25cb32d2fd760bf8905a8e44ac2f6a17 to your computer and use it in GitHub Desktop.
Save vrutik2809/25cb32d2fd760bf8905a8e44ac2f6a17 to your computer and use it in GitHub Desktop.
Function prototypes for exporting tables from database
from sqlalchemy import create_engine
from sqlalchemy import inspect, Table, MetaData
from xlsxwriter import Workbook
# TODO replace variable with appropiate values
DB_URL = f"postgresql://{user}:{password}@{hostname}:{port}/{database-name}"
engine = create_engine(DB_URL)
def reflect_table(name, schema, engine, metadata=None, connection_to_use=None):
if metadata is None:
metadata = MetaData(bind=engine)
autoload_with = engine if connection_to_use is None else connection_to_use
return Table(name, metadata, schema=schema, autoload_with=autoload_with, extend_existing=True)
def export_table_to_dict(table_name,schema,engine):
with engine.connect() as connection:
inspector = inspect(connection)
if table_name not in inspector.get_table_names(schema=schema):
raise Exception(f'Table "{schema + "." + table_name}" not found')
table_data = {}
oid = inspector.get_table_oid(table_name,schema=schema)
table_data['table_oid'] = oid
table_data['table_name'] = table_name
table_data['table_schema'] = schema
table = reflect_table(table_name, schema, engine,connection_to_use=connection)
table_data['columns'] = []
for col in table.columns:
col_data = {}
col_data['name'] = col.name
col_data['type'] = col.type.__class__.__name__
col_data['nullable'] = col.nullable
col_data['primary_key'] = col.primary_key
col_data['autoincrement'] = col.autoincrement
col_data['default'] = col.default
table_data['columns'].append(col_data)
table_data['records'] = [dict(row._mapping.items()) for row in table.select().execute()]
return table_data
def export_view_to_dict(view_name,schema,engine):
with engine.connect() as connection:
inspector = inspect(connection)
if view_name not in inspector.get_view_names(schema=schema):
raise Exception(f'View "{schema + "." + view_name}" not found')
view_data = {}
view_data['view_name'] = view_name
view_data['view_schema'] = schema
table = reflect_table(view_name, schema, engine,connection_to_use=connection)
view_data['columns'] = []
for col in table.columns:
col_data = {}
col_data['name'] = col.name
col_data['type'] = col.type.__class__.__name__
col_data['nullable'] = col.nullable
col_data['primary_key'] = col.primary_key
col_data['autoincrement'] = col.autoincrement
col_data['default'] = col.default
view_data['columns'].append(col_data)
view_data['records'] = [dict(row._mapping.items()) for row in table.select().execute()]
return view_data
def export_schema_to_dict(schema,engine):
with engine.connect() as connection:
inspector = inspect(connection)
if schema not in inspector.get_schema_names():
raise Exception(f'Schema "{schema}" not found')
schema_data = {}
schema_data['schema_name'] = schema
schema_data['tables'] = []
for table_name in inspector.get_table_names(schema=schema):
table_data = export_table_to_dict(table_name,schema,engine)
schema_data['tables'].append(table_data)
return schema_data
def export_table_to_csv(table_name,schema,engine):
with engine.connect() as connection:
inspector = inspect(connection)
if table_name not in inspector.get_table_names(schema=schema):
raise Exception(f'Table "{schema + "." + table_name}" not found')
table = reflect_table(table_name, schema, engine, connection_to_use=connection)
csv_data = str()
for col in table.columns:
csv_data += col.name + ','
csv_data = csv_data[:-1]
csv_data += '\n'
for record in table.select().execute():
row = str()
for col in table.columns:
row += str(record[col.name]) + ','
row = row[:-1]
row += '\n'
csv_data += row
return csv_data
def export_table_to_tsv(table_name,schema,engine):
return export_table_to_csv(table_name,schema,engine).replace(',','\t')
def write_table_data_to_xlsx_worksheet(table_name,schema,engine,worksheet:Workbook.worksheet_class):
with engine.connect() as connection:
inspector = inspect(connection)
if table_name not in inspector.get_table_names(schema=schema):
raise Exception(f'Table "{schema + "." + table_name}" not found')
table = reflect_table(table_name, schema, engine, connection_to_use=connection)
cols = table.columns
for col in range(len(cols)):
worksheet.write(0,col,cols[col].name)
row_idx = 1
for row in table.select().execute():
for col in range(len(cols)):
worksheet.write(row_idx,col,row[cols[col].name])
row_idx += 1
def export_table_to_xlsx(table_name,schema,engine,excel_file_name='test.xlsx'):
with engine.connect() as connection:
inspector = inspect(connection)
if table_name not in inspector.get_table_names(schema=schema):
raise Exception(f'Table "{schema + "." + table_name}" not found')
workbook = Workbook(excel_file_name)
worksheet = workbook.add_worksheet(table_name)
write_table_data_to_xlsx_worksheet(table_name,schema,engine,worksheet)
workbook.close()
def export_schema_to_xlsx(schema,engine,excel_file_name='test.xlsx'):
with engine.connect() as connection:
inspector = inspect(connection)
if schema not in inspector.get_schema_names():
raise Exception(f'Schema "{schema}" not found')
workbook = Workbook(excel_file_name)
for table_name in inspector.get_table_names(schema=schema):
worksheet = workbook.add_worksheet(table_name)
write_table_data_to_xlsx_worksheet(table_name,schema,engine,worksheet)
workbook.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment