Skip to content

Instantly share code, notes, and snippets.

@guissalustiano
Created January 21, 2022 00:26
Show Gist options
  • Save guissalustiano/340ca8f3ca6a141860994eb5707b2739 to your computer and use it in GitHub Desktop.
Save guissalustiano/340ca8f3ca6a141860994eb5707b2739 to your computer and use it in GitHub Desktop.
extract data from http://ftp.dadosabertos.ans.gov.br and save in s3
from datetime import datetime
from ftputil import FTPHost
import tempfile
import awswrangler as wr
import zipfile
from io import BytesIO
import pandas as pd
host = FTPHost('ftp.dadosabertos.ans.gov.br', 'anonymous')
BUCKET_NAME = 'vidi-gift'
bucket_path = f's3://{BUCKET_NAME}/'
def is_newer(path: str, time: datetime):
file_mtime = datetime.fromtimestamp(host.path.getmtime(path))
return file_mtime > time
def is_file(path: str):
return host.path.isfile(path)
def list_newer(basepath: str, time: datetime):
for path in host.listdir(basepath)[::-1]:
complete_path = host.path.join(basepath, path)
if is_newer(complete_path, time):
yield complete_path
def list_recursive_newer(basepath: str, time: datetime):
for path in list_newer(basepath, time):
if is_file(path):
yield path
else:
yield from list_recursive_newer(path, time)
def extract_fpt_zip(source: str):
with tempfile.NamedTemporaryFile(suffix='.zip') as tmp:
host.download(source, tmp.name)
with zipfile.ZipFile(tmp,"r") as zip_ref:
for filename in zip_ref.namelist():
if not filename.endswith('.csv'):
continue
yield filename, BytesIO(zip_ref.read(filename))
def process(df):
# escrever processamento aqui
return df
if __name__ == '__main__':
time = datetime(2021, 11, 1)
path = 'FTP/PDA/informacoes_consolidadas_de_beneficiarios'
for ftpfile in list_recursive_newer(path, time):
for filename, file in extract_fpt_zip(ftpfile):
s3path = host.path.join(bucket_path, host.path.dirname(ftpfile), filename)
df = pd.read_csv(file, sep=";", encoding='cp1252')
df = process(df)
wr.s3.to_parquet(
df=df,
path=s3path,
dataset=True,
database="ans",
table="informacoes_consolidadas_de_beneficiarios"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment