Last active
May 8, 2022 19:35
-
-
Save zopieux/d81d4dfd3c1352fb737dcc5af42c9344 to your computer and use it in GitHub Desktop.
Xorg clipboard + streamlink + mpv in a mosaic / tile layout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env nix-shell | |
#! nix-shell -p clipnotify -p xclip -p mpv -p streamlink -i python3 | |
""" | |
Takes multiple URIs (including URLs, local files should work as well), | |
resolves them through streamlink to get the stream link when applicable, | |
then plays them all in a nicely packed tiled layout. | |
If no URI is provided as an argument, starts an interactive selection where
URLs copied to the Xorg clipboard are accumulated until the program is
interrupted with ^C.
Layouts: | |
a | |
/ | |
a b | |
/ | |
a b | |
c d | |
/ | |
a b | |
c d | |
e f | |
/ | |
a b c | |
d e f | |
g h i | |
/ | |
(up to 16 tiles) | |
""" | |
import asyncio | |
import math | |
import hashlib | |
import subprocess | |
import time | |
import sys | |
import pathlib | |
from typing import Iterable | |
async def cache_get_or_set(key: str, cb) -> str:
    """Return the cached text for ``key``, computing it with ``cb`` on a miss.

    The cache is one file per key under ``/tmp``, named after the SHA-1 of
    the key. An entry older than two minutes is deleted before the lookup,
    so it is recomputed.

    Args:
        key: Cache key; it is also the single argument passed to ``cb``.
        cb: Async callable awaited as ``cb(key)`` on a miss. Its result is
            written to the cache file in text mode, so it must be a ``str``.

    Returns:
        The cached or freshly computed value.
    """
    digest = hashlib.sha1(key.encode()).hexdigest()
    cache_file = pathlib.Path('/tmp') / f'.scache.{digest}'

    # Evict the entry once it is more than two minutes old. A missing file
    # (never cached, or removed concurrently) is simply a cache miss.
    try:
        age = time.time() - cache_file.stat().st_mtime
        if age > 60 * 2:
            cache_file.unlink()
    except FileNotFoundError:
        pass

    try:
        return cache_file.read_text()
    except FileNotFoundError:
        fresh = await cb(key)
        with cache_file.open('w') as sink:
            sink.write(fresh)
        return fresh
def tiles_to_xstack(n: int) -> 'str | None':
    """Build the ``--lavfi-complex`` filter graph that tiles ``n`` videos.

    Each input ``[vidK]`` is scaled to a width of 1280 (height follows the
    aspect ratio) and the scaled streams are combined by ffmpeg's ``xstack``
    filter into ``[vo]``. Tiles are placed row by row, using 2 columns for
    2-6 tiles, 3 columns for 7-9 tiles and 4 columns for 10-16 tiles.

    Args:
        n: Number of videos to tile.

    Returns:
        The filter graph string, or ``None`` when ``n == 1`` (a single video
        needs no filter).

    Raises:
        ValueError: If ``n`` is not between 1 and 16.
    """
    if n == 1:
        return None
    if 2 <= n <= 6:
        cols = 2
    elif 7 <= n <= 9:
        cols = 3
    elif 10 <= n <= 16:
        cols = 4
    else:
        raise ValueError(f'unsupported tile count {n}')

    def gen():
        # In an xstack layout, wK / hK are the width / height of input K.
        for i in range(n):
            row, col = divmod(i, cols)
            # x offset: widths of the tiles to the left. Row 0 holds inputs
            # 0..cols-1, so input c is the tile sitting in column c.
            scol = '+'.join(f'w{c}' for c in range(col)) or '0'
            # y offset: heights of the rows above, each measured by its
            # first tile (input r * cols), since placement is row-major.
            srow = '+'.join(f'h{r * cols}' for r in range(row)) or '0'
            yield f'{scol}_{srow}'

    layout = '|'.join(gen())
    scale = ';'.join(f'[vid{i+1}]scale=1280:-1[vs{i+1}]' for i in range(n))
    svids = ''.join(f'[vs{i+1}]' for i in range(n))
    return f'{scale};{svids}xstack=inputs={n}:layout={layout}:fill=black[vo]'
def mpv_cmd_line(uris: Iterable[str]):
    """Build the mpv argument list that plays all ``uris`` tiled together.

    The first URI is passed as the file to play and every other one as an
    ``--external-file``. When ``tiles_to_xstack`` returns a filter graph
    for this number of URIs, it is appended as ``--lavfi-complex``.

    Args:
        uris: URIs to play, in tile order. Any iterable is accepted.

    Returns:
        The command as a list of strings, starting with ``'mpv'``.

    Raises:
        ValueError: If ``uris`` is empty, or if ``tiles_to_xstack`` rejects
            the tile count.
    """
    # Materialise first: the signature promises any iterable, but the code
    # below needs a length and positional access.
    uris = list(uris)
    if not uris:
        raise ValueError('no URIs to play')
    first, *others = uris
    cmd = ['mpv', first] + [f'--external-file={f}' for f in others]
    if (xstack := tiles_to_xstack(len(uris))):
        cmd.append(f"--lavfi-complex={xstack}")
    return cmd
def from_clipboard_observe():
    """Collect http(s) URLs copied to the X clipboard until interrupted.

    Repeatedly runs ``clipnotify`` and waits for it to exit (it is expected
    to exit when a selection changes), then reads the clipboard with
    ``xclip``. Text that starts with ``http://`` or ``https://`` and has not
    been seen yet is recorded and reported on stdout. Collection ends when
    the program is interrupted (^C).

    Returns:
        The distinct URLs, in the order they were first seen.
    """
    uris = {}  # dict used as an insertion-ordered set

    async def wait_for_clipboard_change():
        p = await asyncio.create_subprocess_exec('clipnotify')
        await p.wait()

    async def read_clipboard() -> bytes:
        # Fixed argv, so no shell is needed.
        p = await asyncio.create_subprocess_exec(
            'xclip', '-selection', 'clipboard', '-out',
            stdout=asyncio.subprocess.PIPE)
        out, _ = await p.communicate()
        return out

    async def observe():
        try:
            while True:
                await wait_for_clipboard_change()
                raw = await read_clipboard()
                try:
                    text = raw.decode().strip()
                except UnicodeDecodeError:
                    # Clipboard holds something that is not UTF-8 text.
                    continue
                if not text.startswith(('http://', 'https://')):
                    continue
                if text not in uris:
                    uris[text] = None
                    print(f"collected {len(uris)}, last is {text[:64]}")
        except asyncio.CancelledError:
            pass

    try:
        # Deliberately ignore run()'s result: observe() yields None, and
        # when the interrupt reaches it as a task cancellation (which it
        # swallows) run() returns normally instead of raising
        # KeyboardInterrupt. The collected URLs must be returned either way.
        asyncio.run(observe())
    except KeyboardInterrupt:
        pass
    return list(uris)
async def gather_with_concurrency(n, tasks):
    """Await every awaitable in ``tasks``, at most ``n`` of them at a time.

    Each awaitable is only awaited while holding a slot of a semaphore
    initialised to ``n``.

    Args:
        n: Number of semaphore slots.
        tasks: Iterable of awaitables.

    Returns:
        The results as returned by ``asyncio.gather``, in input order.
    """
    gate = asyncio.Semaphore(n)

    async def throttled(awaitable):
        await gate.acquire()
        try:
            return await awaitable
        finally:
            gate.release()

    wrapped = [throttled(item) for item in tasks]
    return await asyncio.gather(*wrapped)
async def streamlink(uri: str):
    """Resolve ``uri`` to a stream URL via ``streamlink --stream-url``.

    Runs ``streamlink --stream-url <uri> best`` with stdout and stderr
    captured.

    Returns:
        The stripped stdout of streamlink when it exits with status 0 and
        printed something; otherwise ``uri`` unchanged.
    """
    proc = await asyncio.create_subprocess_exec(
        'streamlink', '--stream-url', uri, 'best',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, _stderr = await proc.communicate()
    succeeded = proc.returncode == 0 and bool(stdout.strip())
    if succeeded:
        return stdout.decode().strip()
    # streamlink could not resolve it: hand the original URI back.
    return uri
async def cached_resolve(uri: str):
    """Resolve ``uri`` through ``streamlink``, going through the file cache.

    Only URIs starting with ``http:`` or ``https:`` are resolved (via
    ``cache_get_or_set`` with ``streamlink`` as the producer); anything else
    is returned unchanged.
    """
    is_web = uri.startswith(("http:", "https:"))
    if not is_web:
        return uri
    return await cache_get_or_set(uri, streamlink)
async def resolve(uris):
    """Resolve every URI with ``cached_resolve``, three at a time.

    Returns:
        The resolved URIs, as returned by ``gather_with_concurrency``.
    """
    pending = list(map(cached_resolve, uris))
    resolved = await gather_with_concurrency(3, pending)
    return resolved
if __name__ == '__main__':
    # Local import: only the script entry point needs it.
    import shlex

    if len(sys.argv) > 1:
        # dict.fromkeys removes duplicate arguments while keeping order.
        uris = list(dict.fromkeys(sys.argv[1:]))
    else:
        print("collecting clipboard URIs, ^C when done")
        uris = from_clipboard_observe()
        print()
    if not uris:
        # Nothing collected: stop here rather than fail further down.
        sys.exit("no URIs to play")
    print(f"resolving {len(uris)} URIs")
    uris = asyncio.run(resolve(uris))
    cmd_line = mpv_cmd_line(uris)
    # Shell-quote the echoed command: the filter graph contains ';', '|'
    # and '[', so a plain space-join cannot be pasted back into a shell.
    print(shlex.join(cmd_line))
    subprocess.run(cmd_line)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment