Skip to content

Instantly share code, notes, and snippets.

@ddelange
Last active July 14, 2024 21:09
Show Gist options
  • Save ddelange/6517e3267fb74eeee804e3b1490b1c1d to your computer and use it in GitHub Desktop.
Save ddelange/6517e3267fb74eeee804e3b1490b1c1d to your computer and use it in GitHub Desktop.
Handling the live output stream of a command
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    The child's stdout and stderr are read concurrently, one worker thread per
    stream, and every line is passed to the matching handler as soon as it is
    read.

    Args:
        args: Command to execute; passed to ``Popen`` unchanged.
        stdout_handler: Callable invoked with each stdout line, without its
            trailing newline.
        stderr_handler: Callable invoked with each stderr line, without its
            trailing newline.
        check: When true, a non-zero exit code raises ``CalledProcessError``.
        text: Forwarded to ``Popen``.
        stdout: Forwarded to ``Popen``. If the resulting ``process.stdout`` is
            ``None`` (anything but a pipe), ``stdout_handler`` is never called.
        stderr: Forwarded to ``Popen``. If the resulting ``process.stderr`` is
            ``None`` (anything but a pipe), ``stderr_handler`` is never called.
        **kwargs: Any other ``Popen`` keyword arguments.

    Returns:
        ``CompletedProcess(process.args, returncode)``. The output itself is
        not stored on the result; it has already been given to the handlers.

    Raises:
        CalledProcessError: If ``check`` is true and the exit code is non-zero.
        Exception: Whatever a handler raised, re-raised once the process has
            finished (stdout handler errors take precedence).
    """

    def _drain(stream, handler):
        # Runs in a worker thread. After a handler failure the stream is still
        # read to the end, so the child can never block on a full pipe while
        # the other stream is being waited for.
        failure = None
        for line in stream:
            if failure is not None:
                continue
            newline = "\n" if isinstance(line, str) else b"\n"
            try:
                # Only strip an actual line terminator: the last line of the
                # output may not end with one.
                handler(line[:-1] if line.endswith(newline) else line)
            except Exception as exc:
                failure = exc
        if failure is not None:
            raise failure

    with (
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        futures = [
            pool.submit(_drain, stream, handler)
            for stream, handler in (
                (process.stdout, stdout_handler),
                (process.stderr, stderr_handler),
            )
            if stream is not None  # Popen only exposes streams that are pipes
        ]
    # Leaving the ``with`` block joined both workers and waited for the child.
    for future in futures:
        future.result()  # surface handler exceptions instead of dropping them
    retcode = process.wait()  # already finished: returns the stored exit code
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
import logging
from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen
def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    The child's stderr is merged into its stdout, so ``stdout_handler``
    receives the lines of both streams in the order they are read from the
    shared pipe.

    Args:
        args: Command to execute; passed to ``Popen`` unchanged.
        stdout_handler: Callable invoked with each output line, without its
            trailing newline.
        check: When true, a non-zero exit code raises ``CalledProcessError``.
        text: Forwarded to ``Popen``.
        **kwargs: Any other ``Popen`` keyword arguments (``stdout`` and
            ``stderr`` are set by this function and must not be passed).

    Returns:
        ``CompletedProcess(process.args, returncode)``. The output itself is
        not stored on the result; it has already been given to the handler.

    Raises:
        CalledProcessError: If ``check`` is true and the exit code is non-zero.
    """
    with Popen(args, text=text, stdout=PIPE, stderr=STDOUT, **kwargs) as process:
        for line in process.stdout:
            newline = "\n" if isinstance(line, str) else b"\n"
            # Only strip an actual line terminator: the last line of the
            # output may not end with one.
            stdout_handler(line[:-1] if line.endswith(newline) else line)
    # Leaving the ``with`` block has already waited for the child to exit.
    retcode = process.wait()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
# The root logger only emits WARNING and above by default, so the INFO output
# shown below would never appear without raising the level first.
logging.basicConfig(level=logging.INFO)

# Default handlers: each output line is forwarded to the logging module.
stream_command(["echo", "test"])
# INFO:root:test

# check=False: the non-zero exit code of `cat` does not raise.
# (With the merged-stderr variant of stream_command, this line reaches
# stdout_handler and is therefore logged at INFO level instead.)
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory

# Any callable taking a single line can be used as a handler.
stream_command(["echo", "test"], stdout_handler=print)
# test
# Lines seen by ``handler`` are accumulated here, in arrival order.
stdout_lines = []


def handler(line):
    """Echo *line* to stdout, log it at INFO level, and keep it in ``stdout_lines``."""
    print(line)
    logging.info(line)
    stdout_lines.append(line)


# A custom handler can fan a single line out to several destinations.
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']
# stream to log file
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s:%(levelname)-7s %(filename)20s:%(lineno)-4d %(name)s:%(message)s",
    filename="./capture.log",
    filemode="w",
    encoding="utf-8",
    # basicConfig() does nothing when the root logger already has handlers,
    # which is the case as soon as logging.info()/logging.error() has been
    # called once; force=True replaces them so the file handler is installed.
    force=True,
)
# Both the Python-side record and the subprocess output end up in capture.log.
logging.info("test from python")
stream_command(["echo", "test from subprocess"])
@ddelange
Copy link
Author

ddelange commented Jul 6, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment