Skip to content

Instantly share code, notes, and snippets.

@guissalustiano
Created October 3, 2023 20:49
Show Gist options
  • Save guissalustiano/496c9172756a098335ff9e5a228be4d0 to your computer and use it in GitHub Desktop.
Save guissalustiano/496c9172756a098335ff9e5a228be4d0 to your computer and use it in GitHub Desktop.
# Based in https://github.com/cuducos/chunk
from asyncio import Semaphore, gather, run
import httpx
from loguru import logger
url = "https://dadosabertos.rfb.gov.br/CNPJ/Empresas1.zip"
default_chunk_size = 2**20 # 1MB
# DefaultMaxRetries sets the maximum download attempt for each chunk
default_max_retries = 32
# DefaultMaxParallel sets the maximum parallels downloads per server
default_max_parallel = 16
# DefaultTimeout sets the timeout for each HTTP request.
# NOTE: httpx expresses timeouts in SECONDS, not milliseconds — the original
# value of 3 * 60 * 1000 was ~50 hours, not the intended 3 minutes.
default_timeout = 3 * 60 # 3 minutes, in seconds
def chunk_range(content_length: int, chunk_size: int) -> list[tuple[int, int]]:
    """Split a content length into inclusive (start, end) byte ranges.

    Each range is suitable for an HTTP ``Range: bytes=start-end`` header.
    The last range is truncated to ``content_length - 1``.

    Fix: the original used ``content_length // chunk_size + 1`` iterations,
    which emits a trailing invalid range (start > end) whenever
    content_length is an exact multiple of chunk_size. Ceiling division
    yields exactly the right number of chunks (and [] for length 0).
    """
    num_chunks = (content_length + chunk_size - 1) // chunk_size  # ceil division
    return [
        (i * chunk_size, min((i + 1) * chunk_size - 1, content_length - 1))
        for i in range(num_chunks)
    ]
# from https://stackoverflow.com/a/64283770
async def download(
    url: str,
    chunk_size: int = default_chunk_size,
    max_retries: int = default_max_retries,
    max_parallel: int = default_max_parallel,
    timeout: int = default_timeout
) -> bytes:
    """Download *url* in parallel byte-range chunks and return the full body.

    Sends a HEAD request to learn the content length and confirm the server
    supports range requests, then fans out ``download_chunk`` tasks bounded
    by a semaphore of *max_parallel* concurrent downloads.

    Raises:
        RuntimeError: if the HEAD request is not 200 or the server does not
            advertise ``Accept-Ranges: bytes``.

    Note: the original used bare ``assert`` statements for these checks,
    which are silently stripped under ``python -O``.
    """
    request_head = httpx.head(url)
    if request_head.status_code != 200:
        raise RuntimeError(f"HEAD {url} returned status {request_head.status_code}, expected 200")
    if request_head.headers.get("accept-ranges") != "bytes":
        raise RuntimeError(f"{url} does not support byte-range requests")
    content_length = int(request_head.headers["content-length"])
    logger.info(f"Downloading {url} with {content_length} bytes / {chunk_size} chunks and {max_parallel} parallel downloads")
    # TODO: pool http connections
    semaphore = Semaphore(max_parallel)
    tasks = [download_chunk(url, (start, end), max_retries, timeout, semaphore) for start, end in chunk_range(content_length, chunk_size)]
    # gather preserves task order, so joining reassembles the bytes in sequence
    return b"".join(await gather(*tasks))
async def download_chunk(
    url: str,
    chunk_range: tuple[int, int],
    max_retries: int,
    timeout: int,
    semaphore: Semaphore
) -> bytes:
    """Download one inclusive byte range of *url*, retrying on HTTP errors.

    Acquires *semaphore* to bound server-wide parallelism, then attempts the
    ranged GET up to *max_retries* times.

    Raises:
        httpx.HTTPError: if every attempt fails; chained to the last
            underlying error (the original raise discarded it).
    """
    # Unpack immediately: the parameter name shadows the module-level
    # chunk_range() function (kept for caller keyword-compatibility).
    start, end = chunk_range
    async with semaphore:
        logger.info(f"Downloading chunk {start}-{end}")
        last_error: httpx.HTTPError | None = None
        for i in range(max_retries):
            try:
                # NOTE: a fresh client per attempt; connection pooling is a
                # known TODO in download() above.
                async with httpx.AsyncClient(timeout=timeout) as client:
                    headers = {"Range": f"bytes={start}-{end}"}
                    response = await client.get(url, headers=headers)
                    response.raise_for_status()
                    return response.content
            except httpx.HTTPError as e:
                last_error = e
                logger.warning(f"Download failed with {e}. Retrying ({i+1}/{max_retries})...")
        raise httpx.HTTPError(f"Download failed after {max_retries} retries") from last_error
async def main():
    """Fetch the sample archive and write it to the working directory."""
    payload = await download(url)
    with open("Empresas1.zip", "wb") as output_file:
        output_file.write(payload)


if __name__ == "__main__":
    run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment