Last active
August 29, 2015 14:21
-
-
Save Cartroo/fa55b9f7c9aad84324c8 to your computer and use it in GitHub Desktop.
procpoller.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import select | |
import signal | |
import subprocess | |
import time | |
class ProcPoller(object): | |
"""Watches multiple processes for output on stdout and stderr.""" | |
def __init__(self): | |
self.results = {} | |
self.__fd_map = {} | |
self.__procs = {} | |
self.__closed_procs = set() | |
self.__poller = select.poll() | |
def __del__(self): | |
for context, proc_list in self.__procs.iteritems(): | |
try: | |
proc_list[0].kill() | |
proc_list[0].wait() | |
except Exception: | |
pass | |
def run_command(self, cmdline, context, cmd_input=None): | |
"""Executes the specified command-line. | |
By default, stdin of the process is not monitored and will remain | |
attached to the controlling terminal. Passing a string for | |
cmd_input will cause stdin to be attached to a pipe instead | |
and the specified input to be sent to the process. Passing an | |
empty string allows attaching stdin without sending any initial | |
input. In either case, passing further input can be done later | |
with send_input(), but if cmd_input was originally passed as None | |
this will fail with an exception. | |
""" | |
if context in self.__procs: | |
raise ValueError("duplicate context value supplied") | |
popen_args = {} | |
if cmd_input is not None: | |
popen_args["stdin"] = subprocess.PIPE | |
proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, close_fds=True, | |
**popen_args) | |
try: | |
proc.__context = context | |
self.results[context] = None | |
self.__procs[context] = [proc, None if cmd_input is None else ""] | |
self.handle_create(self, context) | |
for fd in (i.fileno() for i in (proc.stdout, proc.stderr)): | |
self.__fd_map[fd] = proc | |
self.__poller.register(fd, select.POLLIN | select.POLLHUP) | |
if cmd_input is not None: | |
self.__poller.register(proc.stdin.fileno(), 0) | |
self.__fd_map[proc.stdin.fileno()] = proc | |
self.send_input(context, cmd_input) | |
except: | |
# In case of problems, make sure we terminate the subprocess, but | |
# make sure the exception we re-throw isn't replaced in case the | |
# cleanup code throws its own exception. | |
try: | |
raise | |
finally: | |
try: | |
proc.kill() | |
proc.wait() | |
except: | |
pass | |
def send_input(self, context, cmd_input): | |
"""Send further output to an active process.""" | |
if context not in self.__procs: | |
raise KeyError("context not found") | |
if self.__procs[context][1] is None: | |
raise ValueError("specified process not open for input") | |
old_input = bool(self.__procs[context][1]) | |
self.__procs[context][1] += cmd_input | |
if not old_input and cmd_input: | |
self.__poller.modify(self.__procs[context][0].stdin.fileno(), | |
select.POLLOUT) | |
def send_signal(self, context, sig=signal.SIGTERM): | |
"""Send signal to specified process.""" | |
if context not in self.__procs: | |
raise KeyError("context not found") | |
self.__procs[context][0].send_signal(sig) | |
def poll(self, timeout=None): | |
"""Collect output from processes until stopped or timeout (in secs).""" | |
poll_timeout = timeout * 1000 if timeout is not None else None | |
timeout = time.time() + timeout if timeout is not None else None | |
while self.__fd_map or self.__closed_procs: | |
# While there are processes who have closed file descriptors but | |
# not yet terminated, check their status at least every 500ms. | |
if self.__closed_procs: | |
poll_timeout = min(poll_timeout, 500) | |
for fd, events in self.__poller.poll(poll_timeout): | |
proc = self.__fd_map[fd] | |
if events & select.POLLOUT: | |
proc_list = self.__procs[proc.__context] | |
if proc_list[1]: | |
sent = os.write(fd, proc_list[1]) | |
proc_list[1] = proc_list[1][sent:] | |
if not proc_list[1]: | |
self.__poller.modify(fd, 0) | |
if events & select.POLLIN: | |
data = os.read(fd, 4096) | |
if data: | |
self.handle_output(self, proc.__context, | |
fd == proc.stderr.fileno(), data) | |
if events & select.POLLHUP: | |
self.__poller.unregister(fd) | |
del self.__fd_map[fd] | |
if not proc.stdout.closed and proc.stdout.fileno() == fd: | |
proc.stdout.close() | |
elif not proc.stderr.closed and proc.stderr.fileno() == fd: | |
proc.stderr.close() | |
if proc.stdout.closed and proc.stderr.closed: | |
if self.__procs[proc.__context][1] is not None: | |
self.__poller.unregister(proc.stdin.fileno()) | |
del self.__fd_map[proc.stdin.fileno()] | |
proc.stdin.close() | |
self.__closed_procs.add(proc) | |
dead_procs = set() | |
try: | |
for proc in self.__closed_procs: | |
ret = proc.poll() | |
if ret is not None: | |
self.results[proc.__context] = ret | |
dead_procs.add(proc) | |
self.handle_terminate(self, proc.__context) | |
finally: | |
self.__closed_procs -= dead_procs | |
if timeout is not None: | |
now = time.time() | |
if now >= timeout: | |
break | |
poll_timeout = (timeout - now) * 1000 | |
def handle_output(self, poller, context, is_stderr, data): | |
"""Derived classes can override to intercept output.""" | |
pass | |
def handle_terminate(self, poller, context): | |
"""Derived classes can override to detect terminate.""" | |
pass | |
def handle_create(self, poller, context): | |
"""Derived classes can override to hook in to creation.""" | |
pass | |
class BufferingProcPoller(ProcPoller): | |
"""A sample ProcPoller implementation which simply buffers output.""" | |
def __init__(self): | |
"""Initialise output buffer dict.""" | |
super(BufferingProcPoller, self).__init__() | |
self.output = {} | |
def handle_create(self, poller, context): | |
"""Add empty output buffer entries.""" | |
self.output[context] = ["", ""] | |
def handle_output(self, poller, context, is_stderr, data): | |
"""Buffer up output.""" | |
index = 1 if is_stderr else 0 | |
self.output[context][index] += data | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment