Last active
June 23, 2022 16:19
-
-
Save szaydel/c9989bbedd108faf4c1f to your computer and use it in GitHub Desktop.
In parallel spin-up multiple sub-processes and poll/wait on each process to make sure it completes. Originally from http://stackoverflow.com/questions/636561/how-can-i-run-an-external-command-asynchronously-from-python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Cross-platform asynchronous version of subprocess.Popen | |
# Copyright (c) 2011 James Buchwald | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
''' | |
Provides an asynchronous wrapper around the subprocess module. | |
This is based on a code snippet by J. F. Sebastian, posted at StackOverflow | |
at the following URL: http://stackoverflow.com/questions/375427/ | |
''' | |
from subprocess import PIPE, Popen | |
from threading import Thread, Lock | |
from warnings import warn | |
from collections import deque | |
class AsyncPopen(Popen): | |
''' | |
Asynchronous wrapper around subprocess.Popen. | |
Do not directly access AsyncPopen.stdout, AsyncPopen.stderr, or | |
AsyncPopen.stdin. Instead, use the (non-blocking asynchronous) | |
AsyncPopen.communicate() method. | |
This reads entire lines from stdout and stderr at once. | |
Based on a code snippet by J. F. Sebastian, found at the following URL: | |
http://stackoverflow.com/questions/375427/ | |
''' | |
def __init__(self, args, bufsize=0, executable=None, | |
stdin=None, stdout=None, stderr=None, | |
preexec_fn=None, close_fds=False, shell=False, | |
cwd=None, env=None, universal_newlines=False, | |
startupinfo=None, creationflags=0): | |
''' | |
Creates a new AsyncPopen instance. | |
All of the arguments are the same as for subprocess.Popen with several | |
exceptions: | |
* stdin, stdout, and stderr can only be None or PIPE. | |
In Python 3, all data read from stdout and stderr will be treated as | |
the "bytes" built-in type; it is up to the user to convert this type | |
to the appropriate character type, if desired. | |
''' | |
# Check for use of stdin, stdout, stderr values other than NONE, PIPE | |
if stdin != None and stdin != PIPE: | |
warn("stdin must be either None or subprocess.PIPE.") | |
_stdin = None | |
else: | |
_stdin = stdin | |
if stdout != None and stdout != PIPE: | |
warn("stdout must be either None or subprocess.PIPE.") | |
_stdout = None | |
else: | |
_stdout = stdout | |
if stderr != None and stderr != PIPE: | |
warn("stderr must be either None or subprocess.PIPE.") | |
_stderr = None | |
else: | |
_stderr = stderr | |
# Inherit base class behavior. | |
super(AsyncPopen, self).__init__(args, bufsize=bufsize, | |
executable=executable, stdin=_stdin, | |
stdout=_stdout, stderr=_stderr, | |
preexec_fn=preexec_fn, | |
close_fds=close_fds, | |
shell=shell, cwd=cwd, env=env, | |
universal_newlines=universal_newlines, | |
startupinfo=startupinfo, | |
creationflags=creationflags) | |
# Start the I/O polling threads. | |
self.use_stdout = False | |
'''Flag to use stdout.''' | |
self.use_stderr = False | |
'''Flag to use stderr.''' | |
self.use_stdin = False | |
'''Flag to use stdin.''' | |
if _stdout == PIPE: | |
self.use_stdout = True | |
self.stdout_queue = deque() | |
'''Queue of data read from stdout.''' | |
self.stdout_lock = Lock() | |
'''Lock used for stdout queue synchronization.''' | |
self.stdout_thread = Thread(target=self._ThreadedOutputQueue, | |
args=(self.stdout, self.stdout_queue, | |
self.stdout_lock)) | |
'''Queue management thread for stdout.''' | |
self.stdout_thread.daemon = True | |
self.stdout_thread.start() | |
if _stderr == PIPE: | |
self.use_stderr = True | |
self.stderr_queue = deque() | |
'''Queue of data read from stderr.''' | |
self.stderr_lock = Lock() | |
'''Lock used for stderr queue synchronization.''' | |
self.stderr_thread = Thread(target=self._ThreadedOutputQueue, | |
args=(self.stderr, self.stderr_queue, | |
self.stderr_lock)) | |
'''Queue management thread for stderr.''' | |
self.stderr_thread.daemon = True | |
self.stderr_thread.start() | |
if _stdin == PIPE: | |
self.use_stdin = True | |
self.stdin_queue = deque() | |
'''Queue of data to write to stdin.''' | |
self.stdin_lock = Lock() | |
'''Lock used for stdin queue synchronization.''' | |
self.stdin_thread = Thread(target=self._ThreadedInputQueue, | |
args=(self.stdin, self.stdin_queue, | |
self.stdin_lock)) | |
'''Queue management thread for stdin.''' | |
self.stdin_thread.daemon = True | |
self.stdin_thread.start() | |
def _ThreadedOutputQueue(self, pipe, queue, lock): | |
''' | |
Called from the thread to update an output (stdout, stderr) queue. | |
''' | |
try: | |
while True: | |
chunk = pipe.readline() | |
if not chunk: | |
# hit end-of-file | |
break | |
lock.acquire() | |
queue.append(chunk) | |
lock.release() | |
except: | |
pass | |
finally: | |
pipe.close() | |
def _ThreadedInputQueue(self, pipe, queue, lock): | |
''' | |
Called from the thread to update an input (stdin) queue. | |
''' | |
try: | |
while True: | |
lock.acquire() | |
while len(queue) > 0: | |
chunk = queue.popleft() | |
pipe.write(chunk) | |
lock.release() | |
pipe.flush() | |
except: | |
pass | |
finally: | |
pipe.close() | |
def communicate(self, input=None): | |
''' | |
Interact with process: Enqueue data to be sent to stdin. Return data | |
read from stdout and stderr as a tuple (stdoutdata, stderrdata). Do | |
NOT wait for process to terminate. | |
''' | |
if self.use_stdin and input: | |
# enqueue data | |
self.stdin_lock.acquire() | |
self.stdin_queue.append(input) | |
self.stdin_lock.release() | |
stdoutdata = None | |
stderrdata = None | |
if self.use_stdout: | |
# get data | |
data = b"" | |
self.stderr_lock.acquire() | |
try: | |
while len(self.stdout_queue) > 0: | |
data += self.stdout_queue.popleft() | |
except: | |
self.stderr_lock.release() | |
raise | |
self.stderr_lock.release() | |
if data: stdoutdata = data | |
if self.use_stderr: | |
# get data | |
data = b"" | |
self.stderr_lock.acquire() | |
try: | |
while len(self.stderr_queue) > 0: | |
data += self.stderr_queue.popleft() | |
except: | |
self.stderr_lock.release() | |
raise | |
self.stderr_lock.release() | |
if data: stderrdata = data | |
return (stdoutdata, stderrdata) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from subprocess import Popen, PIPE | |
import time | |
running_procs = [ | |
Popen(['/bin/cat', '%s' % path], stdout=PIPE, stderr=PIPE) | |
for path in ['/tmp/file0', '/tmp/file1', '/tmp/file2'] | |
] | |
def handle_results(x): | |
return | |
while running_procs: | |
for proc in running_procs: | |
print(proc) | |
retcode = proc.poll() | |
if retcode is not None: # Process finished. | |
running_procs.remove(proc) | |
break | |
else: # No process is done, wait a bit and check again. | |
time.sleep(.1) | |
continue | |
# Here, `proc` has finished with return code `retcode` | |
if retcode != 0: | |
"""Error handling.""" | |
handle_results(proc.stdout) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment