Skip to content

Instantly share code, notes, and snippets.

@samvv
Created December 1, 2023 15:28
Show Gist options
  • Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.
Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.
Download a file in Python 3 with progress reporting
#!/usr/bin/env python3
from hashlib import sha512
from pathlib import Path
from tarfile import TarFile
from tempfile import TemporaryDirectory
from typing import Callable, TypeVar, cast
from urllib.parse import urlparse, urlunparse, ParseResult
import argparse
import errno
import math
import os
import shutil
import sys
import urllib3
# Name of the application that is downloading things.
# Used, for example, to name the per-user directory that is created inside
# the user's home directory.
APP_NAME = 'myapp'
K = TypeVar('K')
V = TypeVar('V')


def omit(d: dict[K, V], *keys) -> dict[K, V]:
    """Return a new dict with the entries of *d* whose key is not in *keys*.

    The input mapping is left untouched and the original entry order is kept.
    """
    return {name: value for name, value in d.items() if name not in keys}
# Binary (power-of-1024) unit suffixes, smallest first.
UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(bs: int):
    """Format a byte count as a short human-readable string such as '1.50KiB'.

    Args:
        bs: Number of bytes; must not be negative.

    Returns:
        '0B' for zero, otherwise the value scaled to the largest fitting unit
        in UNITS and printed with two decimals.

    Raises:
        ValueError: If *bs* is negative.
    """
    if bs == 0:
        return '0B'
    if bs < 0:
        raise ValueError(f'byte count must be non-negative, got {bs}')
    value = float(bs)
    unit = 0
    # Repeated division by 1024 is exact in binary floating point, which
    # avoids the rounding a float logarithm can suffer at unit boundaries.
    # Stopping at the last unit keeps huge values from indexing past UNITS.
    while value >= 1024 and unit < len(UNITS) - 1:
        value /= 1024
        unit += 1
    return f'{value:.2f}{UNITS[unit]}'
def rimraf(filepath: Path):
    """Remove *filepath* whether it is a file, a symlink or a directory tree.

    Removal is best-effort: a missing path or a failing delete is ignored.

    Args:
        filepath: Path to remove.

    Raises:
        RuntimeError: If *filepath* is the current working directory, the
            root of a drive, or the user's home directory.
    """
    filepath = Path(filepath)
    # Compare absolute paths so that relative spellings such as '.' are
    # caught too. Symlinks are deliberately not resolved: removing a link
    # that points at a protected directory only removes the link.
    absolute = Path(os.path.abspath(filepath))
    if absolute == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    # `root`/`anchor` are strings; wrap in Path so the comparison can match.
    if absolute == Path(absolute.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if absolute == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')
    if filepath.is_dir() and not filepath.is_symlink():
        shutil.rmtree(filepath, ignore_errors=True)
        return
    # shutil.rmtree cannot delete regular files or symlinks, so unlink them.
    try:
        filepath.unlink()
    except OSError:
        # Best-effort, mirroring ignore_errors=True for directories.
        pass
# Module-wide urllib3 connection pool; download() issues its requests through it.
http = urllib3.PoolManager()
# Per-user application directory: a dot-directory named after APP_NAME in the
# user's home directory. download() stores files under its 'downloads' folder
# when no explicit destination is given.
_homedir = Path.home() / ('.' + APP_NAME)
def mkdirp(path: str | Path) -> None:
Path(path).mkdir(parents=True, exist_ok=True)
def download(url: str, dest: Path | None = None, chunk_size = 1024 * 50) -> Path:
    """Download *url* to disk, resuming an earlier partial download if possible.

    Data is first written to '<name>.downloading' next to the final location
    and only moved to the final name once the transfer has finished, so an
    interrupted download is never mistaken for a complete one.

    Args:
        url: The resource to fetch, as a string or an already parsed
            ``urllib.parse.ParseResult``.
        dest: Final file path. When omitted, the file is stored in the
            'downloads' folder of the application directory under a name
            derived from the SHA-512 of the URL, keeping the URL's file
            extensions.
        chunk_size: Number of bytes to request per read.

    Returns:
        The path of the downloaded file. If that file already exists it is
        returned immediately without contacting the server.

    Raises:
        RuntimeError: If the server answers with an unexpected HTTP status
            or delivers fewer/more bytes than it announced.
    """
    if isinstance(url, ParseResult):
        parsed = url
        url = urlunparse(url)
    else:
        url = str(url)
        parsed = urlparse(url)

    if dest is None:
        basename = parsed.path.split('/')[-1]
        extnames = ''.join(f'.{ext}' for ext in basename.split('.')[1:])
        filename = sha512(url.encode('utf8')).hexdigest()
        out_file_final_path = _homedir / 'downloads' / (filename + extnames)
    else:
        out_file_final_path = Path(dest)
    # Always stage into a separate file, also for the cache location, so a
    # half-written file can never be returned as a finished download.
    out_file_path = out_file_final_path.parent / (out_file_final_path.name + '.downloading')

    if out_file_final_path.exists():
        return out_file_final_path

    mkdirp(out_file_path.parent)

    try:
        start_byte = out_file_path.stat().st_size
    except FileNotFoundError:
        start_byte = 0

    # Only ask for a byte range when there is something to resume.
    request_headers = {'Range': f'bytes={start_byte}-'} if start_byte > 0 else {}
    response = http.request('GET', url, preload_content=False, headers=request_headers)
    try:
        if response.status == 416 and start_byte > 0:
            # The staged file is not a valid prefix of the resource (for
            # example it is already as long as the resource). Start over.
            response.close()
            response.release_conn()
            start_byte = 0
            response = http.request('GET', url, preload_content=False)

        if response.status == 206:
            # Server honoured the range: append to the staged data.
            mode = 'ab'
        elif response.status == 200:
            # Full body: discard whatever was staged by truncating the file.
            start_byte = 0
            mode = 'wb'
        else:
            # Close rather than reuse a connection with an unread body.
            response.close()
            raise RuntimeError(f'failed to download {url}: server responded with HTTP status {response.status}')

        # Content-Length counts the bytes of this response only. It is not
        # comparable to the decoded byte count when the body is compressed.
        expected_bytes = None
        if 'Content-Length' in response.headers and 'Content-Encoding' not in response.headers:
            expected_bytes = start_byte + int(response.headers['Content-Length'])

        bytes_read = start_byte
        with open(out_file_path, mode) as f:
            while True:
                print(f'[{humanbytes(bytes_read)}] Downloading {url}', end='\r')
                buf = response.read(chunk_size)
                if not buf:
                    break
                bytes_read += len(buf)
                f.write(buf)
    finally:
        response.release_conn()

    if expected_bytes is not None and bytes_read != expected_bytes:
        # Keep the staged file so the next call can resume from it.
        raise RuntimeError(f'download of {url} is incomplete: received {bytes_read} of {expected_bytes} bytes')

    print(f'Downloaded {url}')
    os.replace(out_file_path, out_file_final_path)
    return out_file_final_path
T = TypeVar('T')


def ident(x: T) -> T:
    """Identity function: hand back the argument unchanged."""
    return x
def nonnull(value: T | None) -> T:
    """Tell the type checker that *value* is not None.

    This is a static cast only; nothing is checked at runtime and the value
    is returned as-is.
    """
    narrowed = cast(T, value)
    return narrowed
def extract_tar(tar: TarFile, dest, update_path: Callable[[str], str | None] | None = None, chunk_size=1024 * 50):
if update_path is None:
update_path = ident
dest_path = Path(dest)
while True:
f = tar.next()
if f is None:
break
updated_path = update_path(f.name)
if updated_path is None:
continue
out_path = dest_path / updated_path
if f.isfile():
try:
mtime = out_path.stat().st_mtime
except OSError as e:
if e.errno == errno.ENOENT:
mtime = None
else:
raise
if mtime is None or f.mtime > mtime:
print(f"Extracting {f.name}", end='\r')
mkdirp(out_path.parent)
with open(out_path, 'wb') as out, nonnull(tar.extractfile(f)) as i:
bytes_written = 0
while True:
if f.size > 1024 * 1024 * 5:
progress = bytes_written / f.size
print(f"[{progress:.2f}] Extracting {f.name}", end='\r')
buf = i.read(chunk_size)
if not buf:
break
bytes_written += len(buf)
out.write(buf)
os.utime(out_path, (f.mtime, f.mtime))
os.chmod(out_path, f.mode)
else:
print(f"Skipping {f.name}", end='\r')
elif f.isdir():
mkdirp(out_path)
os.utime(out_path, (f.mtime, f.mtime))
os.chmod(out_path, f.mode)
elif f.issym():
mkdirp(out_path.parent)
rimraf(out_path)
os.symlink(f.linkname, out_path)
else:
raise ValueError(f'unsupported tar entry for {f.name}')
def _extract_zip(filepath: Path, dest: Path, update_path) -> None:
    """Unpack a zip archive into *dest*, renaming entries via *update_path*.

    The archive is first unpacked into a temporary staging directory with
    ZipFile.extractall and the staged entries are then moved to their final
    location. Entries for which *update_path* returns None are dropped.
    """
    from zipfile import ZipFile
    with TemporaryDirectory(suffix='-extracted', prefix=filepath.stem) as d:
        with ZipFile(filepath) as f:
            f.extractall(d)
        staging = Path(d)
        # Snapshot the listing before moving anything out of the tree.
        for entry in sorted(staging.rglob('*')):
            updated = update_path(entry.relative_to(staging).as_posix())
            if updated is None:
                continue
            target = dest / updated
            if entry.is_dir():
                target.mkdir(parents=True, exist_ok=True)
            else:
                target.parent.mkdir(parents=True, exist_ok=True)
                shutil.move(str(entry), str(target))


def extract(filepath: Path, dest: Path, strip_path=0):
    """Extract the zip or (optionally compressed) tar archive *filepath*.

    The archive type is taken from the file name's suffixes, scanned from
    the last one backwards; the first '.zip' or '.tar' found decides, other
    suffixes (such as '.gz') are skipped. A file without such a suffix is
    left alone.

    Args:
        filepath: The archive to extract.
        dest: Directory that receives the archive's content.
        strip_path: Number of leading path components to drop from every
            entry name. Entries with no components left are skipped.
    """
    filepath = Path(filepath)
    dest = Path(dest)

    def update_path(path: str):
        chunks = path.split('/')
        if len(chunks) > strip_path:
            return os.path.sep.join(chunks[strip_path:])
        return None

    for ext in reversed(filepath.suffixes):
        if ext == '.zip':
            _extract_zip(filepath, dest, update_path)
            break
        if ext == '.tar':
            import tarfile
            # Mode 'r' lets tarfile detect the compression by itself.
            with tarfile.open(filepath, 'r') as tar:
                extract_tar(tar, dest, update_path=update_path)
            break
def is_extractable(path: Path) -> bool:
    """Tell from the file name whether *path* looks like a zip or tar archive.

    Extensions are examined from the end of the name: compression suffixes
    (xz, gz, bz, bz2, lz) are skipped and the first other extension decides.
    Scanning backwards lets dotted names such as 'pkg-1.2.3.tar.gz' be
    recognised, while 'pkg.tar.sig' is not.
    """
    for ext in reversed(path.name.split('.')[1:]):
        if ext in ('xz', 'gz', 'bz', 'bz2', 'lz'):
            continue
        return ext in ('zip', 'tar')
    return False
def main(argv = None) -> int:
    """Command-line entry point: download URL to DEST and unpack archives.

    Args:
        argv: Argument list to parse; None makes argparse use sys.argv.

    Returns:
        0 on success, 1 when the target file already exists or no file name
        can be derived from the URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('url', metavar='URL', help="The resource to download")
    parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name")
    parsed = parser.parse_args(argv)
    url = urlparse(parsed.url)
    basename = url.path.split('/')[-1]
    # DEST defaults to '.', so it is never None here.
    dest = Path(parsed.dest)
    if dest.is_dir():
        if not basename:
            # e.g. 'https://host/' has no last path component to name the file.
            print(f'Error: cannot derive a file name from {parsed.url}; pass DEST as a file name')
            return 1
        dest /= basename
    if os.path.exists(dest):
        print(f'Error: file {dest} already exists')
        return 1
    download(url, dest)
    if is_extractable(dest):
        # extract() requires a destination directory.
        # NOTE(review): unpacking next to the downloaded archive is an
        # assumption; confirm the intended target directory.
        extract(dest, dest.parent, strip_path=1)
    return 0
if __name__ == '__main__':
    # Run the command-line interface; its return value becomes the exit status.
    raise SystemExit(main())
#!/usr/bin/env python3
from hashlib import sha512
from pathlib import Path
from tarfile import TarFile
from tempfile import TemporaryDirectory
from typing import Callable, TypeVar, cast
from urllib.parse import urlparse, urlunparse, ParseResult
import argparse
import errno
import math
import os
import shutil
import sys
import urllib3
# Name of the application that is downloading things.
# Used, for example, to name the per-user directory that is created inside
# the user's home directory.
APP_NAME = 'myapp'
K = TypeVar('K')
V = TypeVar('V')


def omit(d: dict[K, V], *keys) -> dict[K, V]:
    """Return a new dict with the entries of *d* whose key is not in *keys*.

    The input mapping is left untouched and the original entry order is kept.
    """
    return {name: value for name, value in d.items() if name not in keys}
# Binary (power-of-1024) unit suffixes, smallest first.
UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(bs: int):
    """Format a byte count as a short human-readable string such as '1.50KiB'.

    Args:
        bs: Number of bytes; must not be negative.

    Returns:
        '0B' for zero, otherwise the value scaled to the largest fitting unit
        in UNITS and printed with two decimals.

    Raises:
        ValueError: If *bs* is negative.
    """
    if bs == 0:
        return '0B'
    if bs < 0:
        raise ValueError(f'byte count must be non-negative, got {bs}')
    value = float(bs)
    unit = 0
    # Repeated division by 1024 is exact in binary floating point, which
    # avoids the rounding a float logarithm can suffer at unit boundaries.
    # Stopping at the last unit keeps huge values from indexing past UNITS.
    while value >= 1024 and unit < len(UNITS) - 1:
        value /= 1024
        unit += 1
    return f'{value:.2f}{UNITS[unit]}'
def rimraf(filepath: Path):
    """Remove *filepath* whether it is a file, a symlink or a directory tree.

    Removal is best-effort: a missing path or a failing delete is ignored.

    Args:
        filepath: Path to remove.

    Raises:
        RuntimeError: If *filepath* is the current working directory, the
            root of a drive, or the user's home directory.
    """
    filepath = Path(filepath)
    # Compare absolute paths so that relative spellings such as '.' are
    # caught too. Symlinks are deliberately not resolved: removing a link
    # that points at a protected directory only removes the link.
    absolute = Path(os.path.abspath(filepath))
    if absolute == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    # `root`/`anchor` are strings; wrap in Path so the comparison can match.
    if absolute == Path(absolute.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if absolute == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')
    if filepath.is_dir() and not filepath.is_symlink():
        shutil.rmtree(filepath, ignore_errors=True)
        return
    # shutil.rmtree cannot delete regular files or symlinks, so unlink them.
    try:
        filepath.unlink()
    except OSError:
        # Best-effort, mirroring ignore_errors=True for directories.
        pass
# Module-wide urllib3 connection pool; download() issues its requests through it.
http = urllib3.PoolManager()
# Per-user application directory: a dot-directory named after APP_NAME in the
# user's home directory. download() stores files under its 'downloads' folder
# when no explicit destination is given.
_homedir = Path.home() / ('.' + APP_NAME)
def mkdirp(path: str | Path) -> None:
Path(path).mkdir(parents=True, exist_ok=True)
def download(url: str, dest: Path | None = None, chunk_size = 1024 * 50) -> Path:
    """Download *url* to disk, resuming an earlier partial download if possible.

    Data is first written to '<name>.downloading' next to the final location
    and only moved to the final name once the transfer has finished, so an
    interrupted download is never mistaken for a complete one.

    Args:
        url: The resource to fetch, as a string or an already parsed
            ``urllib.parse.ParseResult``.
        dest: Final file path. When omitted, the file is stored in the
            'downloads' folder of the application directory under a name
            derived from the SHA-512 of the URL, keeping the URL's file
            extensions.
        chunk_size: Number of bytes to request per read.

    Returns:
        The path of the downloaded file. If that file already exists it is
        returned immediately without contacting the server.

    Raises:
        RuntimeError: If the server answers with an unexpected HTTP status
            or delivers fewer/more bytes than it announced.
    """
    if isinstance(url, ParseResult):
        parsed = url
        url = urlunparse(url)
    else:
        url = str(url)
        parsed = urlparse(url)

    if dest is None:
        basename = parsed.path.split('/')[-1]
        extnames = ''.join(f'.{ext}' for ext in basename.split('.')[1:])
        filename = sha512(url.encode('utf8')).hexdigest()
        out_file_final_path = _homedir / 'downloads' / (filename + extnames)
    else:
        out_file_final_path = Path(dest)
    # Always stage into a separate file, also for the cache location, so a
    # half-written file can never be returned as a finished download.
    out_file_path = out_file_final_path.parent / (out_file_final_path.name + '.downloading')

    if out_file_final_path.exists():
        return out_file_final_path

    mkdirp(out_file_path.parent)

    try:
        start_byte = out_file_path.stat().st_size
    except FileNotFoundError:
        start_byte = 0

    # Only ask for a byte range when there is something to resume.
    request_headers = {'Range': f'bytes={start_byte}-'} if start_byte > 0 else {}
    response = http.request('GET', url, preload_content=False, headers=request_headers)
    try:
        if response.status == 416 and start_byte > 0:
            # The staged file is not a valid prefix of the resource (for
            # example it is already as long as the resource). Start over.
            response.close()
            response.release_conn()
            start_byte = 0
            response = http.request('GET', url, preload_content=False)

        if response.status == 206:
            # Server honoured the range: append to the staged data.
            mode = 'ab'
        elif response.status == 200:
            # Full body: discard whatever was staged by truncating the file.
            start_byte = 0
            mode = 'wb'
        else:
            # Close rather than reuse a connection with an unread body.
            response.close()
            raise RuntimeError(f'failed to download {url}: server responded with HTTP status {response.status}')

        # Content-Length counts the bytes of this response only. It is not
        # comparable to the decoded byte count when the body is compressed.
        expected_bytes = None
        if 'Content-Length' in response.headers and 'Content-Encoding' not in response.headers:
            expected_bytes = start_byte + int(response.headers['Content-Length'])

        bytes_read = start_byte
        with open(out_file_path, mode) as f:
            while True:
                print(f'[{humanbytes(bytes_read)}] Downloading {url}', end='\r')
                buf = response.read(chunk_size)
                if not buf:
                    break
                bytes_read += len(buf)
                f.write(buf)
    finally:
        response.release_conn()

    if expected_bytes is not None and bytes_read != expected_bytes:
        # Keep the staged file so the next call can resume from it.
        raise RuntimeError(f'download of {url} is incomplete: received {bytes_read} of {expected_bytes} bytes')

    print(f'Downloaded {url}')
    os.replace(out_file_path, out_file_final_path)
    return out_file_final_path
def main(argv = None) -> int:
    """Command-line entry point: download URL to DEST.

    Args:
        argv: Argument list to parse; None makes argparse use sys.argv.

    Returns:
        0 on success, 1 when the target file already exists or no file name
        can be derived from the URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('url', metavar='URL', help="The resource to download")
    parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name")
    parsed = parser.parse_args(argv)
    url = urlparse(parsed.url)
    basename = url.path.split('/')[-1]
    # DEST defaults to '.', so it is never None here.
    dest = Path(parsed.dest)
    if dest.is_dir():
        if not basename:
            # e.g. 'https://host/' has no last path component to name the file.
            print(f'Error: cannot derive a file name from {parsed.url}; pass DEST as a file name')
            return 1
        dest /= basename
    if os.path.exists(dest):
        print(f'Error: file {dest} already exists')
        return 1
    download(url, dest)
    return 0
if __name__ == '__main__':
    # Run the command-line interface; its return value becomes the exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment