Created
January 21, 2022 00:26
-
-
Save guissalustiano/340ca8f3ca6a141860994eb5707b2739 to your computer and use it in GitHub Desktop.
extract data from http://ftp.dadosabertos.ans.gov.br and save in s3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from ftputil import FTPHost | |
import tempfile | |
import awswrangler as wr | |
import zipfile | |
from io import BytesIO | |
import pandas as pd | |
host = FTPHost('ftp.dadosabertos.ans.gov.br', 'anonymous') | |
BUCKET_NAME = 'vidi-gift' | |
bucket_path = f's3://{BUCKET_NAME}/' | |
def is_newer(path: str, time: datetime): | |
file_mtime = datetime.fromtimestamp(host.path.getmtime(path)) | |
return file_mtime > time | |
def is_file(path: str): | |
return host.path.isfile(path) | |
def list_newer(basepath: str, time: datetime): | |
for path in host.listdir(basepath)[::-1]: | |
complete_path = host.path.join(basepath, path) | |
if is_newer(complete_path, time): | |
yield complete_path | |
def list_recursive_newer(basepath: str, time: datetime): | |
for path in list_newer(basepath, time): | |
if is_file(path): | |
yield path | |
else: | |
yield from list_recursive_newer(path, time) | |
def extract_fpt_zip(source: str): | |
with tempfile.NamedTemporaryFile(suffix='.zip') as tmp: | |
host.download(source, tmp.name) | |
with zipfile.ZipFile(tmp,"r") as zip_ref: | |
for filename in zip_ref.namelist(): | |
if not filename.endswith('.csv'): | |
continue | |
yield filename, BytesIO(zip_ref.read(filename)) | |
def process(df): | |
# escrever processamento aqui | |
return df | |
if __name__ == '__main__': | |
time = datetime(2021, 11, 1) | |
path = 'FTP/PDA/informacoes_consolidadas_de_beneficiarios' | |
for ftpfile in list_recursive_newer(path, time): | |
for filename, file in extract_fpt_zip(ftpfile): | |
s3path = host.path.join(bucket_path, host.path.dirname(ftpfile), filename) | |
df = pd.read_csv(file, sep=";", encoding='cp1252') | |
df = process(df) | |
wr.s3.to_parquet( | |
df=df, | |
path=s3path, | |
dataset=True, | |
database="ans", | |
table="informacoes_consolidadas_de_beneficiarios" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment