Skip to content

Instantly share code, notes, and snippets.

View ddelange's full-sized avatar
["translatio", "imitatio", "aemulatio"]

ddelange ddelange

["translatio", "imitatio", "aemulatio"]
View GitHub Profile
ddelange /
Last active August 7, 2024 20:47
Convert iTunes Music Library xml to m3u8 playlists
# pip install click
# python --help
import logging
import plistlib
import re
import typing as tp
from pathlib import Path
from urllib.parse import unquote
import click
ddelange /
Created April 25, 2024 09:34
Conditionally copy or encode video stream into avc (x264) mp4 with ffmpeg and ffprobe
#!/usr/bin/env bash
set -euxo pipefail
# full metadata payload: $(ffprobe -loglevel error -print_format json -show_format -show_streams "${video_in}")
is_avc=$(ffprobe -loglevel error -select_streams v:0 -show_entries stream=is_avc -of default=noprint_wrappers=1:nokey=1 "${video_in}")
ddelange /
Created March 25, 2024 11:38
Multiprocessing numpy using zero-copy SharedNumpyArray and ProcessPoolExecutor.imap
from collections import deque
from concurrent.futures import ProcessPoolExecutor as _ProcessPoolExecutor
from multiprocessing.shared_memory import SharedMemory
import numpy as np
class ProcessPoolExecutor(_ProcessPoolExecutor):
"""Subclass with a lazy consuming imap method."""
ddelange /
Last active February 15, 2024 01:51 — forked from ivan-c/
compile git with openssl instead of gnutls
#!/usr/bin/env bash
# original gist from pescobar/
# changes by LaggAt:
# * to be usable on Raspbian / tested RPi3 and
# * for automatic depency resolving
# changes by ivan-c:
# * add `apt-get update`
# changes by ddelange:
# * add `set -euxo pipefail`
# * remove `--allow-downgrades`
ddelange /
Last active July 14, 2024 21:09
Handling the live output stream of a command
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
ddelange /
Last active July 16, 2024 12:12
The missing ThreadPoolExecutor.imap
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
class ThreadPoolExecutor(_ThreadPoolExecutor):
"""Subclass with a lazy consuming imap method."""
def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
"""Ordered imap that lazily consumes iterables ref"""
futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
ddelange /
Last active May 15, 2023 08:27
Multithreaded S3 downloads
# pip install smart_open[s3]
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
from functools import partial
from typing import Callable, Dict, Optional, Iterable, Iterator, Sequence
import boto3
import botocore
import smart_open
ddelange /
Last active September 22, 2022 06:29
Create a PIP_FIND_LINKS page containing all release assets
ddelange /
Last active December 19, 2022 07:09
Mux multiple subtitle files into an mp4 file
# pip install sh pycountry
import re
from pathlib import Path
import pycountry
from sh import ffmpeg
def mux_mp4_subs(inp, outp, *subs, _cwd=None, **subs_map):
"""Mux multiple subtitle files into an mp4 file.
ddelange /
Last active July 4, 2023 14:12
Scrape the GitHub Stars Lists for a user
#!/usr/bin/env python3
# $ pip install pandas playwright tabulate && python -m playwright install --with-deps webkit
# $ GITHUB_REPOSITORY_OWNER=octocat python
import asyncio
import os
from contextlib import asynccontextmanager
import pandas as pd
from playwright.async_api import async_playwright
from playwright.async_api._generated import Browser, Locator