Skip to content

Instantly share code, notes, and snippets.

@h0tw1r3
Created September 3, 2024 04:38
Show Gist options
  • Save h0tw1r3/d80ac9bad8e6eea4e4a1a6f25136b9af to your computer and use it in GitHub Desktop.
Save h0tw1r3/d80ac9bad8e6eea4e4a1a6f25136b9af to your computer and use it in GitHub Desktop.
Python async subprocess example module with stdout and stderr support.
#!/usr/bin/env python3
import collections
import asyncio
import sys
class AsyncSubProcess:
    """Run a program through asyncio while capturing stdout and stderr.

    Both streams are read line by line as the child produces them. Each
    decoded line is kept for the final result and, when ``output`` is true,
    echoed to this process's stdout/stderr as it arrives.

    Typical use is ``await AsyncSubProcess.main(program, *args)``, which
    returns ``(returncode, stdout_text, stderr_text)``.
    """

    # NOTE: the ``env={}`` default is kept for backward compatibility; it is
    # copied below, so the shared default dict is never handed out.
    def __init__(self, program, *args, env={}, output=True, cwd=None):
        """Store the command line and options; nothing is started yet.

        Args:
            program: Executable to run.
            *args: Arguments passed to the executable.
            env: Environment mapping for the child. The default is an empty
                mapping (the child gets no environment variables); pass
                ``None`` to inherit the parent's environment.
            output: When true, echo each line to stdout/stderr as it is read.
            cwd: Working directory for the child, or ``None`` for the
                current one.
        """
        self.proc = None
        self.program = program
        self.args = args
        # Copy so instances never share the mutable default (or alias a
        # caller's dict); None is preserved because it means "inherit".
        self.env = None if env is None else dict(env)
        self.cwd = cwd
        # Lines already end with their own newline, hence end="".
        self.callback_stdout = (lambda x: print(x, end="")) if output else None
        self.callback_stderr = (
            (lambda x: print(x, end="", file=sys.stderr)) if output else None
        )
        self.tasks = {}

    async def exec(self):
        """Start the child process with stdout and stderr connected to pipes."""
        self.proc = await asyncio.create_subprocess_exec(
            self.program,
            *self.args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self.env,
            cwd=self.cwd,
        )

    @staticmethod
    async def read_stream(stream, callback=None):
        """Read ``stream`` line by line until EOF.

        Args:
            stream: Object with an awaitable ``readline()`` returning bytes
                (``b""`` at EOF), such as ``asyncio.StreamReader``.
            callback: Optional callable invoked with each decoded line.

        Returns:
            A ``collections.deque`` of decoded lines. Only the most recent
            50000 lines are kept; older ones are dropped.
        """
        output = collections.deque(maxlen=50000)
        while True:
            line = await stream.readline()
            if not line:
                # readline() yields b"" only at EOF; do not record it.
                break
            # Replace undecodable bytes instead of raising, so a child that
            # emits non-UTF-8 data cannot abort the reader mid-stream.
            text = line.decode("UTF-8", errors="replace")
            output.append(text)
            if callback:
                callback(text)
        return output

    def create_tasks(self):
        """Schedule one reader task per pipe; requires a running event loop."""
        self.tasks["stdout"] = asyncio.create_task(
            self.read_stream(self.proc.stdout, self.callback_stdout)
        )
        self.tasks["stderr"] = asyncio.create_task(
            self.read_stream(self.proc.stderr, self.callback_stderr)
        )

    async def wait(self):
        """Drain both pipes, wait for the child to exit, and return the result.

        Returns:
            A tuple ``(returncode, stdout_text, stderr_text)``.
        """
        self.create_tasks()
        await asyncio.wait(self.tasks.values())
        # result() re-raises any reader failure before we block on the
        # process, which might never exit if its pipes are no longer drained.
        stdout_text = "".join(self.tasks["stdout"].result())
        stderr_text = "".join(self.tasks["stderr"].result())
        # EOF on the pipes does not imply the child has been reaped yet;
        # without this, returncode can still be None here.
        await self.proc.wait()
        return (self.proc.returncode, stdout_text, stderr_text)

    @classmethod
    async def main(cls, *cmd, **kw):
        """Build a runner from ``cmd``/``kw``, start it, and await its result.

        Arguments are forwarded unchanged to the constructor.

        Returns:
            The ``(returncode, stdout_text, stderr_text)`` tuple from
            :meth:`wait`.
        """
        runner = cls(*cmd, **kw)
        await runner.exec()
        return await runner.wait()
def run(*cmd, **kw):
    """Blocking entry point: run a command and return its result.

    All positional and keyword arguments are forwarded to
    ``AsyncSubProcess.main``; the coroutine it returns is driven to
    completion with ``asyncio.run`` and its result is returned as-is.
    """
    coroutine = AsyncSubProcess.main(*cmd, **kw)
    result = asyncio.run(coroutine)
    return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment