Last active
February 18, 2021 18:27
-
-
Save Forevka/b31ae42fee2b4dd029d1558604488425 to your computer and use it in GitHub Desktop.
Generate Python data models (dataclasses) from a PostgreSQL database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Generate Python @dataclass source code for every table of a schema.
--
-- Parameters:
--   $1 (text)    : name of the schema whose tables are turned into classes.
--   $2 (boolean) : TRUE  -> return one row per table, each row holding the
--                           import header plus that table's class;
--                  FALSE -> return a single row named 'FullDump' holding the
--                           import header plus all classes.
-- Returns:
--   table_name_generated  : CamelCase class name (or 'FullDump').
--   declaration_generated : Python source text.
--
-- Each class gets one annotated field per column (nullable columns are
-- wrapped in typing.Optional) and a __select__ attribute holding a SELECT
-- statement listing the table's columns.
CREATE OR REPLACE FUNCTION public.generate_dataclass(text, BOOLEAN)
RETURNS TABLE("table_name_generated" text, "declaration_generated" text)
AS $function$
DECLARE
    schema_name_ ALIAS FOR $1;
    separate_table ALIAS FOR $2;
    -- Header prepended to every returned declaration. UUID is imported
    -- because uuid columns are annotated with the bare name "UUID".
    import_def text DEFAULT 'import datetime' || chr(10)
        || 'import typing' || chr(10)
        || 'from dataclasses import dataclass' || chr(10)
        || 'from uuid import UUID' || chr(10) || chr(10);
    full_def text DEFAULT chr(10);
    class_def text DEFAULT '';
    property_def text DEFAULT '';
    column_def text DEFAULT '';
    select_cols text DEFAULT '';
    py_type text;
    name_part text;
    t_name text;
    tables_ CURSOR (sc_name text) FOR
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = sc_name
        ORDER BY table_name;
    -- Filter on the schema as well as the table name: a table with the same
    -- name in another schema must not contribute columns. ordinal_position
    -- keeps the fields in the table's declared column order.
    columns_ CURSOR (sc_name text, tbl_name text) FOR
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = sc_name
          AND table_name = tbl_name
        ORDER BY ordinal_position;
BEGIN
    -- A FOR loop over a bound cursor opens and closes it automatically and
    -- declares the record variable itself.
    FOR table_ IN tables_(schema_name_) LOOP
        -- snake_case table name -> CamelCase class name.
        t_name := '';
        FOREACH name_part IN ARRAY regexp_split_to_array(table_.table_name::text, '_') LOOP
            t_name := t_name || upper(substring(name_part from 1 for 1)) || substring(name_part from 2);
        END LOOP;

        property_def := '';
        select_cols := '';
        FOR column_ IN columns_(schema_name_, table_.table_name) LOOP
            -- Comma-separated, double-quoted column list for __select__.
            IF select_cols <> '' THEN
                select_cols := select_cols || ', ';
            END IF;
            select_cols := select_cols || '"' || column_.column_name || '"';

            -- Map the PostgreSQL type to a Python annotation; anything not
            -- listed falls back to typing.Any.
            py_type := CASE column_.data_type
                WHEN 'uuid' THEN 'UUID'
                WHEN 'bigint' THEN 'int'
                WHEN 'integer' THEN 'int'
                WHEN 'timestamp without time zone' THEN 'datetime.datetime'
                WHEN 'boolean' THEN 'bool'
                WHEN 'text' THEN 'str'
                ELSE 'typing.Any'
            END;
            IF column_.is_nullable = 'YES' THEN
                py_type := 'typing.Optional[' || py_type || ']';
            END IF;

            column_def := column_.column_name || ': ' || py_type;
            RAISE NOTICE '%', column_def;
            property_def := property_def || chr(9) || column_def || chr(10);
        END LOOP;

        class_def := '@dataclass' || chr(10)
            || 'class ' || t_name || ':' || chr(10)
            || property_def || chr(10)
            || chr(9) || '__select__ = """ select ' || select_cols
            || ' from ' || table_.table_name || '"""' || chr(10);
        RAISE NOTICE E'%', class_def;

        full_def := full_def || class_def || chr(10);
        IF separate_table = TRUE THEN
            RETURN QUERY SELECT t_name AS table_name_generated, CONCAT(import_def, full_def) AS declaration_generated;
            -- Start the next table's declaration from scratch.
            full_def := chr(10);
        END IF;
    END LOOP;

    RAISE NOTICE '%', import_def || full_def;
    IF separate_table = FALSE THEN
        RETURN QUERY SELECT 'FullDump'::text AS table_name_generated, CONCAT(import_def, full_def) AS declaration_generated;
    END IF;
END;
$function$
LANGUAGE plpgsql
;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import typing | |
import asyncio | |
import asyncpg | |
import re | |
import logging | |
# Show INFO-level (and above) messages on the root logger.
logging.basicConfig(level=logging.INFO)

# Connection settings for the PostgreSQL server; fill in before running.
postgres = dict(
    host='',
    user='postgres',
    password='',
    database='',
    port=5432,
)

# Matches the empty position before every capital letter except one at the
# very start of the string; substituting '_' there splits CamelCase words.
pattern = re.compile(r'(?<!^)(?=[A-Z])')
def write_model_to_file(file_name: str, body: str) -> None:
    """Write *body* to *file_name*, replacing any existing content.

    The file is opened through a context manager so the handle is closed
    even when the write fails, and it is encoded as UTF-8 (the default
    source encoding of Python modules) rather than the platform locale.

    Args:
        file_name: Path of the file to create or overwrite.
        body: Text to store in the file.

    Raises:
        OSError: If the file cannot be opened or written.
    """
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(body)
async def get_pool(host: str, port: typing.Union[int, str], database: str, user: str, password: str):
    """Create an asyncpg connection pool for the given server.

    Args:
        host: Server host name or address.
        port: Server port; a numeric string is converted to ``int``.
        database: Name of the database to connect to.
        user: Role used for authentication.
        password: Password of that role.

    Returns:
        The value produced by awaiting ``asyncpg.create_pool``.

    Raises:
        ValueError: If *port* is a string that is not a valid integer.
    """
    return await asyncpg.create_pool(
        user=user,
        password=password,
        database=database,
        host=host,
        # Previously the port was accepted but never forwarded, so a
        # non-default port in the settings was silently ignored.
        port=int(port),
    )
async def generate(schema_name: str = 'public', separate_file: bool = True, model_directory: str = 'models', overwrite: bool = True, create_folder_if_not_exist: bool = True):
    """Fetch generated dataclass source from the database and write it to files.

    Calls the ``generate_dataclass`` SQL function, looked up in
    *schema_name*, and stores every returned declaration as
    ``<model_directory>/<snake_case_name>.py``.

    Args:
        schema_name: Schema to inspect; also the schema in which the
            ``generate_dataclass`` function is looked up.
        separate_file: Passed to the SQL function as its second argument;
            ``True`` asks for one result row per table, ``False`` for a
            single ``FullDump`` row.
        model_directory: Directory that receives the generated files.
        overwrite: When ``False``, existing files are left untouched.
        create_folder_if_not_exist: Create *model_directory* when missing
            instead of raising.

    Raises:
        FileNotFoundError: If *model_directory* does not exist and
            *create_folder_if_not_exist* is ``False``.
    """
    # Validate the output directory before touching the database so a
    # failure here cannot leave an open connection pool behind.
    if not os.path.isdir(model_directory):
        if not create_folder_if_not_exist:
            raise FileNotFoundError(f"{model_directory} directory doesn't exist")
        # makedirs also handles nested paths such as "out/models".
        os.makedirs(model_directory, exist_ok=True)
        logging.info(f'Creating directory {model_directory}')

    # The schema name is an identifier and cannot be sent as a query
    # parameter; double embedded quotes so it stays one quoted identifier.
    quoted_schema = schema_name.replace('"', '""')
    sql = f"""select * from "{quoted_schema}".generate_dataclass($1, $2)"""

    logging.info(f'Data models will be writed into {"separated files" if separate_file else "one file called FullDump.py"}')

    pool = await get_pool(**postgres)
    try:
        async with pool.acquire() as con:
            logging.debug("Fetching results from db...")
            result = await con.fetch(sql, schema_name, separate_file)
            logging.debug("Fetched results from db")
    finally:
        # Release every pooled connection, whether or not the query worked.
        await pool.close()

    for table_data in result:
        t_name = table_data['table_name_generated']
        t_declaration = table_data['declaration_generated']

        logging.info(f"Processing {t_name}")
        # CamelCase class name -> snake_case module file name.
        file_name = os.path.join(model_directory, pattern.sub('_', t_name).lower() + '.py')
        if os.path.isfile(file_name) and not overwrite:
            logging.warning(f"{overwrite=} and file {file_name} is exist can't write file")
            continue

        write_model_to_file(file_name, t_declaration)
        logging.info(f"Writed {t_name} to file {file_name}")
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion and closes the loop; asyncio.get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    asyncio.run(generate(separate_file=True))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment