Last active
August 16, 2023 01:14
-
-
Save tomasoak/81949ef962e8b61786be2355d81153f5 to your computer and use it in GitHub Desktop.
edb011_activity_03
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Main ETL script | |
Formato table final | |
Nome do Banco | |
CNPJ | |
Classificação do Banco | |
Quantidade de Clientes do Bancos | |
Índice de reclamações | |
Quantidade de reclamações | |
Índice de satisfação dos funcionários dos bancos | |
Índice de satisfação com salários dos funcionários dos bancos. | |
""" | |
import io | |
import polars as pl | |
import boto3 | |
s3 = boto3.client("s3") | |
bucket_input_name = "edb011-input" | |
bancos_path_filename = "Bancos/EnquadramentoInicia_v2.tsv" | |
empregados_path_filename = "Empregados/glassdoor_consolidado_join_match_v2.csv" | |
class Extract: | |
def __init__(self, file_name=None): | |
self.bucket = bucket_input_name | |
self.file_name = file_name | |
def read_file(self, separator=None): | |
obj = s3.get_object(Bucket=self.bucket, Key=self.file_name) | |
df = pl.read_csv(obj["Body"], separator=separator) | |
return df | |
def _transform(): | |
"""Minimal transformation to be able to extract data""" | |
COLUMN_TYPES = {"Quantidade total de clientes \x96 CCS e SCR": pl.Int64(), | |
"Quantidade de clientes \x96 CCS": pl.Int64(), | |
"Quantidade de clientes \x96 SCR": pl.Int64()} | |
COLUMN_RENAME = {"Quantidade total de clientes \x96 CCS e SCR": "Quantidade total de clientes CCS e SCR", | |
"Quantidade de clientes \x96 CCS": "Quantidade de clientes CCS", | |
"Quantidade de clientes \x96 SCR": "Quantidade de clientes SCR"} | |
return COLUMN_TYPES, COLUMN_RENAME | |
def read_multi_files(self): | |
COLUMN_TYPES, COLUMN_RENAME = Extract._transform() | |
bucket_input_obj = boto3.resource('s3').Bucket(bucket_input_name) | |
prefix_objs = bucket_input_obj.objects.filter(Prefix="Reclamacoes/") | |
df = pl.concat([(pl.read_csv(io.BytesIO(obj.get()['Body'].read()), | |
encoding="latin1", | |
separator=";", | |
dtypes=COLUMN_TYPES) | |
.rename(COLUMN_RENAME)) | |
for obj in prefix_objs]) | |
return df | |
def transform(): | |
"""Join needed tables, transform and export it""" | |
BANK_NAME = { | |
"BANCO TOKYO": "BANCO TOKYO-MITSUBISHI BM S.A.", | |
"MERCEDES": "MERCEDES-BENZ" | |
} | |
bancos = Extract(bancos_path_filename).read_file("\t") | |
bancos = ( | |
bancos.with_columns(pl.col("Nome").str.split_exact("-", 0)).unnest("Nome").rename({"field_0": "Nome"}) | |
.with_columns(pl.col("Nome").str.strip(" ")) | |
.with_columns(pl.col("Nome").str.replace("BANCO TOKYO", "BANCO TOKYO-MITSUBISHI BM S.A.")) | |
.with_columns(pl.col("Nome").str.replace("MERCEDES", "MERCEDES-BENZ")) | |
) | |
empregados = Extract(empregados_path_filename).read_file("|") | |
bancos_empregados = bancos.join(empregados, on="Nome", how="left") | |
bancos_empregados = bancos_empregados.with_columns(pl.col("CNPJ").cast(pl.Utf8)) | |
bancos_empregados = bancos_empregados.groupby(["CNPJ", "Nome"]).agg([pl.mean("Remuneração e benefícios") | |
, pl.mean("Perspectiva positiva da empresa(%)")]) | |
reclamacoes = Extract().read_multi_files() | |
reclamacoes = reclamacoes.groupby(["CNPJ IF", "Segmento"]).agg([pl.sum("Quantidade total de reclamações") | |
, pl.sum("Quantidade total de clientes CCS e SCR")]) | |
df = bancos_empregados.join(reclamacoes, left_on="CNPJ", right_on="CNPJ IF") | |
df = df.rename({"Nome": "nome", "CNPJ": "cnpj", | |
"Segmento": "classificacao_banco", | |
"Quantidade total de clientes CCS e SCR": "quantidade_total_clientes", | |
"Quantidade total de reclamações": "quantidade_total_reclamacoes", | |
"Remuneração e benefícios": "idx_satisfacao_salario"}) | |
df = df.with_columns((pl.col("quantidade_total_reclamacoes") / pl.col("quantidade_total_clientes")).alias("idx_reclamacoes")) | |
df.select(pl.col(["nome", "cnpj", "classificacao_banco", "quantidade_total_clientes", "idx_reclamacoes", | |
"quantidade_total_reclamacoes", "idx_satisfacao_salario"])) | |
return df | |
def export(): | |
"""Exports data as csv to S3 Bucket""" | |
df = transform() | |
df.write_csv("final_table.csv", separator=",") | |
if __name__ == "__main__": | |
export() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment