Created
December 6, 2017 17:38
-
-
Save myselfhimself/ca488f2cb52673c7f04a57f5d91a5b7a to your computer and use it in GitHub Desktop.
Convert an online BigQuery dataset's table schema into a PostgreSQL (eg. on CloudSQL) CREATE TABLE command
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.cloud import bigquery | |
""" | |
Use this to convert an online BigQuery dataset's table schema into a PostgreSQL (e.g. on Cloud SQL) CREATE TABLE command.
""" | |
def get_table_schema(project_name, dataset_name, table_name):
    """Return the schema of the BigQuery table 'project_name.dataset_name.table_name'.

    Args:
        project_name: ID of the project that owns the dataset; also used as
            the project of the BigQuery client created here.
        dataset_name: ID of the dataset that contains the table.
        table_name: ID of the table whose schema is wanted.

    Returns:
        The ``schema`` attribute of the table object returned by
        ``Client.get_table`` (the rest of this module iterates over it and
        reads ``name`` and ``field_type`` from each entry).
    """
    bq_client = bigquery.Client(project=project_name)
    # Build the table reference explicitly instead of going through the
    # deprecated Client.dataset() helper.
    table_ref = bigquery.DatasetReference(project_name, dataset_name).table(table_name)
    # get_table() is the call that actually fetches the table metadata.
    return bq_client.get_table(table_ref).schema
# Standard SQL spellings of BigQuery types, normalized to the legacy names
# used as keys of _BQ_TO_PG_TYPES so both spellings map identically.
_BQ_TYPE_ALIASES = {
    'INT64': 'INTEGER',
    'FLOAT64': 'FLOAT',
    'BOOL': 'BOOLEAN',
}

# BigQuery field type -> PostgreSQL column type.
# NOTE(review): BigQuery INTEGER and FLOAT are 64-bit while PostgreSQL INTEGER
# and REAL are 32-bit. The original mappings are kept so existing output does
# not change; consider BIGINT / DOUBLE PRECISION if full range is needed.
_BQ_TO_PG_TYPES = {
    'STRING': 'TEXT',
    'INTEGER': 'INTEGER',
    'FLOAT': 'REAL',
    'BOOLEAN': 'BOOLEAN',
    'BYTES': 'BYTEA',
    'DATE': 'DATE',
    'TIME': 'TIME',
    # BigQuery DATETIME carries no time zone; TIMESTAMP is an absolute instant.
    'DATETIME': 'TIMESTAMP',
    'TIMESTAMP': 'TIMESTAMP WITH TIME ZONE',
    'NUMERIC': 'NUMERIC',
}


def bq_to_pg_field_type(field_type):
    """Return the PostgreSQL column type equivalent to a BigQuery field type.

    Args:
        field_type: BigQuery type name in upper case, e.g. 'STRING'. The
            standard SQL aliases 'INT64', 'FLOAT64' and 'BOOL' are accepted
            as well.

    Returns:
        The PostgreSQL type name as a string, e.g. 'TEXT'.

    Raises:
        TypeError: if the BigQuery type has no mapping here (for example
            'RECORD').
    """
    canonical_type = _BQ_TYPE_ALIASES.get(field_type, field_type)
    pg_type = _BQ_TO_PG_TYPES.get(canonical_type)
    if pg_type is None:
        raise TypeError(
            'UNKNOWN TYPE: {}. Unable to convert that BigQuery field type to PostgreSQL. Giving up.'.format(field_type))
    return pg_type
def bq_schema_to_pg_query(bq_schema, destination_table_name, primary_keys, add_if_not_exists=True):
    """Return a PostgreSQL CREATE TABLE query as a string from a BigQuery schema.

    Args:
        bq_schema: iterable of field objects exposing ``name`` and
            ``field_type`` attributes.
        destination_table_name: name of the PostgreSQL table to create.
        primary_keys: column names forming the primary key. When empty, no
            PRIMARY KEY clause is emitted.
        add_if_not_exists: when true, emit ``CREATE TABLE IF NOT EXISTS``.

    Returns:
        The CREATE TABLE statement, terminated by a semicolon.

    Raises:
        TypeError: propagated from bq_to_pg_field_type for unsupported types.

    Note:
        Table and column names are interpolated verbatim (no quoting or
        escaping), so they must come from a trusted source.
    """
    table_clauses = [
        '{} {}'.format(field.name, bq_to_pg_field_type(field.field_type))
        for field in bq_schema
    ]
    # An empty "PRIMARY KEY()" is invalid SQL, so only add the constraint
    # when at least one key column was given.
    if primary_keys:
        table_clauses.append('PRIMARY KEY({})'.format(', '.join(primary_keys)))
    # Choosing the whole prefix avoids a stray double space when the
    # IF NOT EXISTS part is left out.
    statement_head = 'CREATE TABLE IF NOT EXISTS' if add_if_not_exists else 'CREATE TABLE'
    return '{} {} ({});'.format(statement_head, destination_table_name, ', '.join(table_clauses))
def get_pg_schema_from_bq_table(project_name, dataset_name, table_name, destination_table_name=None, primary_keys=(),
                                add_if_not_exists=True):
    """Return a PostgreSQL CREATE TABLE query string for a given BigQuery table.

    Args:
        project_name: project that owns the BigQuery dataset.
        dataset_name: dataset that contains the table.
        table_name: BigQuery table whose schema is converted.
        destination_table_name: name of the PostgreSQL table; defaults to
            table_name when omitted.
        primary_keys: list or tuple of column names forming the primary key.
        add_if_not_exists: forwarded to bq_schema_to_pg_query.

    Returns:
        The query string built by bq_schema_to_pg_query.

    Raises:
        TypeError: if primary_keys is not a list or tuple. A bare string is
            rejected on purpose: joining it would split it into characters.
    """
    if destination_table_name is None:
        destination_table_name = table_name
    # Validate before fetching the schema so bad arguments fail fast.
    if not isinstance(primary_keys, (list, tuple)):
        raise TypeError('primary_keys argument must be a list or tuple')
    schema = get_table_schema(project_name, dataset_name, table_name)
    return bq_schema_to_pg_query(schema, destination_table_name, primary_keys, add_if_not_exists)
if __name__ == '__main__':
    # Running this module as a script does nothing; this block only documents usage.
    #
    # Example:
    #     print(get_pg_schema_from_bq_table('some_project', 'dataset', 'table', 'dest_table', ['my_primary_key'], True))
    #
    # Yields, for a table with these three fields (a single line, wrapped here
    # for readability):
    #     CREATE TABLE IF NOT EXISTS dest_table
    #     (my_primary_key INTEGER, some_field TEXT, another_field REAL, PRIMARY KEY(my_primary_key));
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment