@rodolfo42 · Created June 12, 2024 21:44
Loading DataFrames into BigQuery
from google.colab import userdata
from google.oauth2 import service_account
from google.cloud import bigquery
import json
import pandas as pd
import pyarrow as pa
# Configure the BigQuery client.
# Requires a Colab secret named 'svc_account' holding the service account JSON.
credentials = service_account.Credentials.from_service_account_info(
    info=json.loads(userdata.get('svc_account'), strict=False),
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials)
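# Optional sanity check (illustrative; assumes the service account is allowed to run query jobs):
# print(client.query("SELECT 1 AS ok").result().to_dataframe())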
# Replace None/NaN values with type-appropriate defaults
def replace_none_with_default(df):
    for col in df.columns:
        if df[col].dtype == 'int64':
            df[col] = df[col].fillna(0)
        elif df[col].dtype == 'float64':
            df[col] = df[col].fillna(0.0)
        elif df[col].dtype == 'object':
            df[col] = df[col].fillna('')
    return df
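# Illustrative example with a hypothetical DataFrame:
# df = pd.DataFrame({"qty": [1.5, None], "name": ["a", None]})
# replace_none_with_default(df)  # qty -> [1.5, 0.0], name -> ["a", ""]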
# Convert date columns to pandas datetime
def convert_date_columns(df, date_columns):
    for date_col in date_columns:
        if date_col in df.columns:
            df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
    return df
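# Illustrative example: unparseable values become NaT instead of raising
# df = pd.DataFrame({"dt": ["2024-06-12", "oops"]})
# convert_date_columns(df, ["dt"])  # dt -> [Timestamp("2024-06-12"), NaT]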
# List the columns that the BigQuery schema declares as date/time types
def get_date_columns(bq_schema):
    return [field.name for field in bq_schema if field.field_type in ['DATE', 'DATETIME', 'TIME', 'TIMESTAMP']]
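# Example: for a schema containing bigquery.SchemaField("created_at", "TIMESTAMP"),
# get_date_columns(schema) returns ["created_at"]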
# Convert a BigQuery SchemaField to a pyarrow field
def bq_to_pyarrow_field(bq_field):
    type_map = {
        'string': pa.string(),
        'int64': pa.int64(),
        'integer': pa.int64(),
        'float': pa.float64(),
        'float64': pa.float64(),
        'bool': pa.bool_(),
        'boolean': pa.bool_(),
        'timestamp': pa.timestamp('us'),
        'date': pa.date32(),
        'datetime': pa.timestamp('us'),  # BigQuery DATETIME has microsecond precision
        'time': pa.time64('us'),  # microsecond times need time64; time32 only accepts 's'/'ms'
        'bytes': pa.binary(),
        'numeric': pa.decimal128(38, 9),
    }
    field_type = bq_field.field_type.lower()
    if field_type not in type_map:
        raise ValueError(f"Unsupported field type: {field_type}")
    return pa.field(bq_field.name, type_map[field_type], nullable=bq_field.is_nullable)
# Convert a full BigQuery schema to a pyarrow schema
def bq_schema_to_pyarrow_schema(bq_schema):
    pyarrow_fields = [bq_to_pyarrow_field(field) for field in bq_schema]
    return pa.schema(pyarrow_fields)
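# Illustrative example:
# bq_schema_to_pyarrow_schema([bigquery.SchemaField("id", "INT64"),
#                              bigquery.SchemaField("dt", "DATE")])
# -> schema with fields id: int64, dt: date32[day]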
# Coerce the DataFrame to the target schema by round-tripping through Arrow:
# parse the date columns first, then cast every column to its BigQuery type.
def prepare_df(df, bq_schema):
    pa_schema = bq_schema_to_pyarrow_schema(bq_schema)
    df = convert_date_columns(df, get_date_columns(bq_schema))
    return pa.Table.from_pandas(df, preserve_index=False).cast(pa_schema).to_pandas()
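# Illustrative example (assumes `schema` as defined in the usage example below;
# the DataFrame's columns must match the schema's fields in order):
# raw = pd.DataFrame({"ANO_MES": ["2024-06"], "ID_ASSISTENCIA": ["1"]})
# prepare_df(raw, schema)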
# Load a DataFrame into a BigQuery table and report the row counts
def load_bq(df, schema, table_name, write_disposition="WRITE_EMPTY"):
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=write_disposition,
    )
    job = client.load_table_from_dataframe(
        df, table_name, job_config=job_config
    )  # Makes an API request.
    loaded_rows = job.result().output_rows
    table_info = client.get_table(table_name)
    print(
        "Loaded {} rows and {} columns to {}, table now has {}".format(
            loaded_rows, len(table_info.schema), table_name, table_info.num_rows
        )
    )
# Usage example
some_df = seu_dataframe_ja_modelado  # placeholder: your already-modeled DataFrame
# Define the schema
# Reference: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
schema = [
    bigquery.SchemaField("ANO_MES", bigquery.enums.SqlTypeNames.STRING),
    bigquery.SchemaField("ID_ASSISTENCIA", bigquery.enums.SqlTypeNames.STRING),
]
# df_load is the DataFrame that will be loaded
df_load = prepare_df(some_df, schema)
# table_name uses the dataset.table format (the dataset must already exist in BigQuery)
table_name = "dataset.tabela"
# Load without overwriting (fails if the table already exists)
load_bq(df_load, schema, table_name)
# Load by appending records if the table already exists
load_bq(df_load, schema, table_name, "WRITE_APPEND")
# Load by overwriting everything
load_bq(df_load, schema, table_name, "WRITE_TRUNCATE")
Output:

Loaded 929 rows and 23 columns to dataset.tabela, table now has 3716