Created
December 1, 2023 15:28
-
-
Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.
Download a file in Python 3 with progress reporting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from hashlib import sha512 | |
from pathlib import Path | |
from tarfile import TarFile | |
from tempfile import TemporaryDirectory | |
from typing import Callable, TypeVar, cast | |
from urllib.parse import urlparse, urlunparse, ParseResult | |
import argparse | |
import errno | |
import math | |
import os | |
import shutil | |
import sys | |
import urllib3 | |
# Name of the app that is downloading things | |
# Used for e.g. creating a directory in the home directory of the user. | |
APP_NAME = 'myapp' | |
# Key and value types of the mapping handled by omit().
K = TypeVar('K')
V = TypeVar('V')


def omit(d: dict[K, V], *keys) -> dict[K, V]:
    """Return a new dict holding the entries of ``d`` whose key is not in ``keys``.

    ``d`` itself is not modified, and keys that are absent from ``d`` are
    simply ignored.
    """
    return {key: value for key, value in d.items() if key not in keys}
# Binary (power-of-1024) unit suffixes, smallest first.
UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(bs: int):
    """Format a byte count as a short human-readable string such as ``1.50KiB``.

    Zero is rendered as ``0B``; every other value is printed with two
    decimals and the largest unit from ``UNITS`` that does not exceed it.
    Negative counts keep their sign, and values beyond the last unit are
    expressed in that last unit instead of failing.
    """
    if bs == 0:
        return '0B'
    sign = '-' if bs < 0 else ''
    magnitude = abs(bs)
    # Select the unit by comparing against exact powers of 1024 instead of a
    # floating point logarithm: this cannot be off by one through rounding,
    # never yields a negative index and is clamped to the last known unit.
    index = 0
    while index < len(UNITS) - 1 and magnitude >= 1024 ** (index + 1):
        index += 1
    return f'{sign}{magnitude / 1024 ** index:.2f}{UNITS[index]}'
def rimraf(filepath: Path):
    """Remove ``filepath``, whether it is a file, a symlink or a directory tree.

    Removal is best effort: a missing path or a failure while deleting is
    ignored. A symlink is removed itself; its target is left alone.

    Raises:
        RuntimeError: if ``filepath`` denotes the current working directory,
            the root of a drive/filesystem or the user's home directory.
    """
    # Normalise without following symlinks so that relative spellings such as
    # '.' or 'sub/..' are recognised by the guards below.
    target = Path(os.path.abspath(filepath))
    if target == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    # Path.root / Path.anchor are strings, so they must be wrapped in a Path
    # before they can compare equal to a Path.
    if target == Path(target.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if target == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')
    if target.is_symlink() or not target.is_dir():
        # shutil.rmtree() only handles real directories; files and symlinks
        # have to be unlinked instead.
        try:
            target.unlink()
        except OSError:
            # Best effort, mirroring ignore_errors=True for directories.
            pass
        return
    shutil.rmtree(target, ignore_errors=True)
# Shared urllib3 connection pool; download() issues its HTTP requests through it.
http = urllib3.PoolManager()
# Per-user application directory: a dot-directory named after APP_NAME inside
# the home directory. download() stores files in its 'downloads' subdirectory
# when the caller gives no destination.
_homedir = Path.home() / ('.' + APP_NAME)
def mkdirp(path: str | Path) -> None: | |
Path(path).mkdir(parents=True, exist_ok=True) | |
def download(url: str, dest: Path | None = None, chunk_size = 1024 * 50) -> Path: | |
if isinstance(url, ParseResult): | |
parsed = url | |
url = urlunparse(url) | |
else: | |
url = str(url) | |
parsed = urlparse(url) | |
if dest is None: | |
extnames = ''.join(f'.{ext}' for ext in parsed.path.split('/')[-1].split('.')[1:]) | |
filename = sha512(url.encode('utf8')).hexdigest() | |
out_file_path = _homedir / 'downloads' / (filename + extnames) | |
out_file_final_path = _homedir / 'downloads' / (filename + extnames) | |
else: | |
out_file_path = dest.parent / (dest.name + '.downloading') | |
out_file_final_path = dest | |
if os.path.exists(out_file_final_path): | |
return out_file_final_path | |
mkdirp(out_file_path.parent) | |
try: | |
start_byte = out_file_path.stat().st_size | |
except OSError as e: | |
if e.errno == errno.ENOENT: | |
start_byte = 0 | |
else: | |
raise e | |
request_headers = { | |
'Range': f'bytes={start_byte}-', | |
} | |
response = http.request('GET', url, preload_content=False, headers=request_headers) | |
total_bytes = int(response.headers['Content-Length']) if 'Content-Length' in response.headers else None | |
if 'Content-Range' not in response.headers: | |
start_byte = 0 | |
rimraf(out_file_path) | |
bytes_read = start_byte | |
with open(out_file_path, 'ab') as f: | |
while True: | |
print(f'[{humanbytes(bytes_read)}] Downloading {url}', end='\r') | |
buf = response.read(chunk_size) | |
if not buf: | |
break | |
bytes_read += len(buf) | |
f.write(buf) | |
print(f'Downloaded {url}') | |
# sys.stdout.write('\n') | |
os.rename(out_file_path, out_file_final_path) | |
return out_file_final_path | |
T = TypeVar('T') | |
def ident(x: T) -> T: return x | |
def nonnull(value: T | None) -> T: return cast(T, value) | |
def extract_tar(tar: TarFile, dest, update_path: Callable[[str], str | None] | None = None, chunk_size=1024 * 50): | |
if update_path is None: | |
update_path = ident | |
dest_path = Path(dest) | |
while True: | |
f = tar.next() | |
if f is None: | |
break | |
updated_path = update_path(f.name) | |
if updated_path is None: | |
continue | |
out_path = dest_path / updated_path | |
if f.isfile(): | |
try: | |
mtime = out_path.stat().st_mtime | |
except OSError as e: | |
if e.errno == errno.ENOENT: | |
mtime = None | |
else: | |
raise | |
if mtime is None or f.mtime > mtime: | |
print(f"Extracting {f.name}", end='\r') | |
mkdirp(out_path.parent) | |
with open(out_path, 'wb') as out, nonnull(tar.extractfile(f)) as i: | |
bytes_written = 0 | |
while True: | |
if f.size > 1024 * 1024 * 5: | |
progress = bytes_written / f.size | |
print(f"[{progress:.2f}] Extracting {f.name}", end='\r') | |
buf = i.read(chunk_size) | |
if not buf: | |
break | |
bytes_written += len(buf) | |
out.write(buf) | |
os.utime(out_path, (f.mtime, f.mtime)) | |
os.chmod(out_path, f.mode) | |
else: | |
print(f"Skipping {f.name}", end='\r') | |
elif f.isdir(): | |
mkdirp(out_path) | |
os.utime(out_path, (f.mtime, f.mtime)) | |
os.chmod(out_path, f.mode) | |
elif f.issym(): | |
mkdirp(out_path.parent) | |
rimraf(out_path) | |
os.symlink(f.linkname, out_path) | |
else: | |
raise ValueError(f'unsupported tar entry for {f.name}') | |
def extract(filepath: Path, dest: Path, strip_path=0): | |
filepath = Path(filepath) | |
dest = Path(dest) | |
compression = [] | |
def update_path(path: str): | |
chunks = path.split('/') | |
if len(chunks) > strip_path: | |
return os.path.sep.join(chunks[strip_path:]) | |
with TemporaryDirectory('-extracted', prefix=filepath.stem) as d: | |
for ext in reversed(filepath.suffixes): | |
if ext == '.zip': | |
from zipfile import ZipFile | |
with ZipFile(filepath) as f: | |
f.extractall(d) | |
print(d) | |
break | |
elif ext == '.gz': | |
compression.append('gz') | |
elif ext == '.tar': | |
import tarfile | |
with tarfile.open(filepath, 'r') as tar: | |
extract_tar(tar, dest, update_path=update_path) | |
def is_extractable(path: Path) -> bool:
    """Tell from the file name whether ``path`` is a zip or tar archive.

    The extensions are inspected from the last one backwards: compression
    suffixes (``xz``, ``gz``, ``bz``, ``bz2``, ``lz``) are skipped and the
    first other extension has to be ``zip`` or ``tar``. Scanning from the end
    keeps dots earlier in the name (``app-1.2.3.tar.gz``) from hiding the
    archive extension.
    """
    compression_exts = ('xz', 'gz', 'bz', 'bz2', 'lz')
    for ext in reversed(path.name.split('.')[1:]):
        if ext in compression_exts:
            continue
        return ext in ('zip', 'tar')
    return False
def main(argv = None) -> int:
    """Command line entry point: download URL to DEST and unpack archives.

    DEST may be a directory (the file keeps the name of the URL's last path
    segment) or a file name, and defaults to the current directory. A
    downloaded zip/tar archive is extracted next to itself with its
    top-level directory stripped.

    Args:
        argv: Argument list; ``None`` makes argparse read ``sys.argv``.

    Returns:
        The process exit status: 0 on success, 1 when the target already
        exists or no file name can be derived from the URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('url', metavar='URL', help="The resource to download")
    parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name")
    parsed = parser.parse_args(argv)
    url = urlparse(parsed.url)
    basename = url.path.split('/')[-1]
    # DEST always has a value because of default='.'.
    dest = Path(parsed.dest)
    if dest.is_dir():
        if not basename:
            # E.g. 'https://host/': without a name the target would be the
            # directory itself and be reported as an existing file.
            print(f'Error: cannot derive a file name from {parsed.url}')
            return 1
        dest /= basename
    if os.path.exists(dest):
        print(f'Error: file {dest} already exists')
        return 1
    download(url, dest)
    if is_extractable(dest):
        # extract() needs the target directory: unpack beside the archive.
        extract(dest, dest.parent, strip_path=1)
    return 0
# Run the command line interface when executed as a script; the value
# returned by main() becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
#!/usr/bin/env python3 | |
from hashlib import sha512 | |
from pathlib import Path | |
from tarfile import TarFile | |
from tempfile import TemporaryDirectory | |
from typing import Callable, TypeVar, cast | |
from urllib.parse import urlparse, urlunparse, ParseResult | |
import argparse | |
import errno | |
import math | |
import os | |
import shutil | |
import sys | |
import urllib3 | |
# Name of the app that is downloading things | |
# Used for e.g. creating a directory in the home directory of the user. | |
APP_NAME = 'myapp' | |
# Key and value types of the mapping handled by omit().
K = TypeVar('K')
V = TypeVar('V')


def omit(d: dict[K, V], *keys) -> dict[K, V]:
    """Return a new dict holding the entries of ``d`` whose key is not in ``keys``.

    ``d`` itself is not modified, and keys that are absent from ``d`` are
    simply ignored.
    """
    return {key: value for key, value in d.items() if key not in keys}
# Binary (power-of-1024) unit suffixes, smallest first.
UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(bs: int):
    """Format a byte count as a short human-readable string such as ``1.50KiB``.

    Zero is rendered as ``0B``; every other value is printed with two
    decimals and the largest unit from ``UNITS`` that does not exceed it.
    Negative counts keep their sign, and values beyond the last unit are
    expressed in that last unit instead of failing.
    """
    if bs == 0:
        return '0B'
    sign = '-' if bs < 0 else ''
    magnitude = abs(bs)
    # Select the unit by comparing against exact powers of 1024 instead of a
    # floating point logarithm: this cannot be off by one through rounding,
    # never yields a negative index and is clamped to the last known unit.
    index = 0
    while index < len(UNITS) - 1 and magnitude >= 1024 ** (index + 1):
        index += 1
    return f'{sign}{magnitude / 1024 ** index:.2f}{UNITS[index]}'
def rimraf(filepath: Path):
    """Remove ``filepath``, whether it is a file, a symlink or a directory tree.

    Removal is best effort: a missing path or a failure while deleting is
    ignored. A symlink is removed itself; its target is left alone.

    Raises:
        RuntimeError: if ``filepath`` denotes the current working directory,
            the root of a drive/filesystem or the user's home directory.
    """
    # Normalise without following symlinks so that relative spellings such as
    # '.' or 'sub/..' are recognised by the guards below.
    target = Path(os.path.abspath(filepath))
    if target == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    # Path.root / Path.anchor are strings, so they must be wrapped in a Path
    # before they can compare equal to a Path.
    if target == Path(target.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if target == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')
    if target.is_symlink() or not target.is_dir():
        # shutil.rmtree() only handles real directories; files and symlinks
        # have to be unlinked instead.
        try:
            target.unlink()
        except OSError:
            # Best effort, mirroring ignore_errors=True for directories.
            pass
        return
    shutil.rmtree(target, ignore_errors=True)
# Shared urllib3 connection pool; download() issues its HTTP requests through it.
http = urllib3.PoolManager()
# Per-user application directory: a dot-directory named after APP_NAME inside
# the home directory. download() stores files in its 'downloads' subdirectory
# when the caller gives no destination.
_homedir = Path.home() / ('.' + APP_NAME)
def mkdirp(path: str | Path) -> None: | |
Path(path).mkdir(parents=True, exist_ok=True) | |
def download(url: str, dest: Path | None = None, chunk_size = 1024 * 50) -> Path: | |
if isinstance(url, ParseResult): | |
parsed = url | |
url = urlunparse(url) | |
else: | |
url = str(url) | |
parsed = urlparse(url) | |
if dest is None: | |
extnames = ''.join(f'.{ext}' for ext in parsed.path.split('/')[-1].split('.')[1:]) | |
filename = sha512(url.encode('utf8')).hexdigest() | |
out_file_path = _homedir / 'downloads' / (filename + extnames) | |
out_file_final_path = _homedir / 'downloads' / (filename + extnames) | |
else: | |
out_file_path = dest.parent / (dest.name + '.downloading') | |
out_file_final_path = dest | |
if os.path.exists(out_file_final_path): | |
return out_file_final_path | |
mkdirp(out_file_path.parent) | |
try: | |
start_byte = out_file_path.stat().st_size | |
except OSError as e: | |
if e.errno == errno.ENOENT: | |
start_byte = 0 | |
else: | |
raise e | |
request_headers = { | |
'Range': f'bytes={start_byte}-', | |
} | |
response = http.request('GET', url, preload_content=False, headers=request_headers) | |
total_bytes = int(response.headers['Content-Length']) if 'Content-Length' in response.headers else None | |
if 'Content-Range' not in response.headers: | |
start_byte = 0 | |
rimraf(out_file_path) | |
bytes_read = start_byte | |
with open(out_file_path, 'ab') as f: | |
while True: | |
print(f'[{humanbytes(bytes_read)}] Downloading {url}', end='\r') | |
buf = response.read(chunk_size) | |
if not buf: | |
break | |
bytes_read += len(buf) | |
f.write(buf) | |
print(f'Downloaded {url}') | |
# sys.stdout.write('\n') | |
os.rename(out_file_path, out_file_final_path) | |
return out_file_final_path | |
def main(argv = None) -> int:
    """Command line entry point: download URL to DEST.

    DEST may be a directory (the file keeps the name of the URL's last path
    segment) or a file name, and defaults to the current directory.

    Args:
        argv: Argument list; ``None`` makes argparse read ``sys.argv``.

    Returns:
        The process exit status: 0 on success, 1 when the target already
        exists or no file name can be derived from the URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('url', metavar='URL', help="The resource to download")
    parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name")
    parsed = parser.parse_args(argv)
    url = urlparse(parsed.url)
    basename = url.path.split('/')[-1]
    # DEST always has a value because of default='.'.
    dest = Path(parsed.dest)
    if dest.is_dir():
        if not basename:
            # E.g. 'https://host/': without a name the target would be the
            # directory itself and be reported as an existing file.
            print(f'Error: cannot derive a file name from {parsed.url}')
            return 1
        dest /= basename
    if os.path.exists(dest):
        print(f'Error: file {dest} already exists')
        return 1
    download(url, dest)
    return 0
# Run the command line interface when executed as a script; the value
# returned by main() becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment