Call PowerShell from Python with support for color output and exception handling using TTY for stdin, stdout, stderr.
New version without need for TTYs here: https://gist.github.com/nathan815/6e894b3394d42bd1c9e7dfae5142a678
import subprocess | |
import json | |
from json.decoder import JSONDecodeError | |
import errno | |
import os | |
import pty | |
import select | |
import subprocess | |
import re | |
from typing import Tuple, Any, Union | |
def run_powershell(
    script: str,
    function: str = '',
    print_stdout: bool = True,
    print_stderr: bool = True,
    output: str = 'json',
) -> Tuple[Any, list]:
    """
    Invoke a PowerShell script, or a specific function defined in it.

    The script is dot-sourced and ``function`` is then executed inside a
    PowerShell ``try``/``catch``.  If PowerShell catches an error it prints a
    one-line JSON document describing it and exits with code 1, which this
    function turns into a ``PowerShellException``.

    Parameters:
        script: path to the PS script.  Interpolated verbatim into the
            generated PowerShell code, so it must come from a trusted source.
        function: PS function to call, can also carry any arguments for it
            (e.g. ``'myPsFunction x y'``).  Also interpolated verbatim.
        print_stdout: echo PS stdout lines to the console as they arrive.
        print_stderr: echo PS stderr lines to the console as they arrive.
        output: 'json' | 'raw'
            json: parse output as JSON.  The last output line from the PS
                  script/function MUST be a single line of JSON.
            raw:  return all stdout lines without parsing.

    Returns:
        ``(output, err)`` where ``output`` is the parsed JSON value
        ('json') or the list of stdout lines ('raw'), and ``err`` is the list
        of stderr lines.  Captured lines have ANSI escape sequences removed
        but keep their line terminator.

    Raises:
        ValueError: ``output`` is neither 'json' nor 'raw'.
        PowerShellException: PowerShell exited with a non-zero code, or
            ``output='json'`` and the last stdout line is not valid JSON.

    Example:
        my_script.ps1
            function myPsFunction($a, $b) {
                $hosts = 'esx01', 'esx02', $a, $b
                return $hosts | ConvertTo-Json -Compress
            }
        test.py
            data, err = run_powershell('my_script.ps1', function='myPsFunction x y')
            print(data)
        output:
            ['esx01', 'esx02', 'x', 'y']
    """
    if output not in ('json', 'raw'):
        raise ValueError('invalid value for argument output')

    caught_error_key = '_PsUnhandledException_'

    # Import the PS script and run the specified function
    ps_code = f'''
$ErrorActionPreference = "stop";
try {{
    . {script};
    {function};
}} catch {{
    $e = $_ | Select-Object ErrorDetails, ErrorRecord, CategoryInfo, ScriptStackTrace
    $errorJson = @{{ {caught_error_key} = $e }} | ConvertTo-Json -Compress -Depth 10
    Write-Host $errorJson
    exit 1
}}
'''
    cmd = ['pwsh', '-command', ps_code]
    print(f'[PowerShell => {script} {function}]')

    echo = {'stdout': print_stdout, 'stderr': print_stderr}
    output_lines = {'stdout': [], 'stderr': []}
    pending = {'stdout': b'', 'stderr': b''}  # bytes of the not-yet-complete line
    ansi_escape_re = re.compile(r'(?:\x1b[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
    exit_code = None

    # NOTE: the loop variable must not be called ``output`` -- that would
    # clobber the ``output`` parameter that selects the return mode below.
    for chunk in _run_cmd_tty_stream(cmd):
        returncode = chunk.get('returncode')
        if returncode is not None:
            exit_code = returncode
            continue

        name = chunk['name']
        pending[name] += chunk['data']

        # A single read may hold several lines and/or end mid-line: emit every
        # complete line and keep the unterminated remainder for the next chunk.
        pieces = pending[name].split(b'\n')
        pending[name] = pieces.pop()
        for piece in pieces:
            # Decoding whole lines only means a multi-byte character split
            # across two reads is reassembled before it is decoded.
            line = (piece + b'\n').decode('utf-8', errors='replace')
            if echo[name]:
                print(line, end='')
            output_lines[name].append(ansi_escape_re.sub('', line).replace('\x1b=', ''))

    # NOTE(review): bytes left in ``pending`` (output not terminated by a
    # newline) are never emitted; this matches the previous behaviour.

    stdout = output_lines['stdout']
    stderr = output_lines['stderr']

    # Sentinel so that valid-but-falsy JSON ([], 0, false, null) is still
    # distinguishable from "the last line is not JSON".
    not_json = object()
    last_line_json = not_json
    if stdout:
        try:
            last_line_json = json.loads(stdout[-1])
        except JSONDecodeError:
            pass

    if exit_code != 0:
        # Check if an exception occurred in PS script
        msg = f'PowerShell exited with non-zero code {exit_code}'
        if isinstance(last_line_json, dict) and caught_error_key in last_line_json:
            error = last_line_json[caught_error_key]
            info = error.get('CategoryInfo') if isinstance(error, dict) else None
            if not isinstance(info, dict):
                info = {}
            raise PowerShellException(
                f"{msg} Exception: {info.get('Reason')} {info.get('TargetName')}", error)
        raise PowerShellException(msg)

    if output == 'raw':
        return stdout, stderr

    # output == 'json' (validated at the top of the function)
    if last_line_json is not_json:
        raise PowerShellException('JSON not detected on last line of PS stdout', stdout)
    return last_line_json, stderr
def _run_cmd_tty_stream(cmd, bytes_input=b''):
    """Streams the output of cmd with bytes_input to stdin,
    with stdin, stdout and stderr as TTYs.

    Each yield from this function is a dict of the form:
        { "name": "stdout/stderr", "data": b"...", "returncode": None }
    for every non-empty chunk read from the child, followed by exactly one
    final item once the child has been reaped:
        { "name": None, "data": None, "returncode": <exit code> }

    The final item is only produced when the stream is consumed to the end.
    If the generator is closed early or an error is raised, the ptys are
    closed and the child is killed (if still running) and waited for, but no
    return code is yielded.

    Adapted from https://stackoverflow.com/a/52954716/507629
    and https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e
    """
    master_fds = []  # parent side of each pty, kept open while streaming
    slave_fds = []   # child side of each pty, closed in the parent after spawn
    p = None
    try:
        # provide tty to enable line-buffering
        for _ in range(3):
            master, slave = pty.openpty()
            master_fds.append(master)
            slave_fds.append(slave)
        mo, me, mi = master_fds  # stdout, stderr, stdin
        so, se, si = slave_fds

        p = subprocess.Popen(
            cmd,
            bufsize=1, stdin=si, stdout=so, stderr=se,
            close_fds=True)

        # Drop the parent's copies of the slave ends so that reads on the
        # masters report EOF/EIO once the child has closed its side.
        while slave_fds:
            os.close(slave_fds.pop())

        if bytes_input:
            os.write(mi, bytes_input)

        timeout = 0.04  # seconds
        readable = [mo, me]
        fd_name = {mo: 'stdout', me: 'stderr'}
        while readable:
            ready, _, _ = select.select(readable, [], [], timeout)
            for fd in ready:
                try:
                    data = os.read(fd, 512)
                except OSError as e:
                    if e.errno != errno.EIO:
                        raise
                    # EIO means EOF on some systems
                    readable.remove(fd)
                    continue
                if not data:  # EOF
                    readable.remove(fd)
                    continue
                yield {'name': fd_name[fd], 'data': data, 'returncode': None}
    finally:
        # Runs on normal completion, on error (e.g. the command could not be
        # started) and when the consumer closes the generator early, so no
        # file descriptor or child process is leaked.
        for fd in master_fds + slave_fds:
            os.close(fd)
        if p is not None:
            if p.poll() is None:
                p.kill()
            p.wait()

    # Yield outside the ``finally`` block: yielding while the generator is
    # being closed would raise RuntimeError ("generator ignored GeneratorExit").
    yield {'name': None, 'data': None, 'returncode': p.returncode}
class PowerShellException(Exception):
    """Error reported while running PowerShell.

    Adds nothing to ``Exception``; it exists so callers can catch PowerShell
    failures specifically.  The positional arguments given at raise time are
    available through ``args``.
    """
function myPsFunction {
    # Demo fixture: writes coloured host output, an error record and a raw
    # error line, then returns its data as one compressed line of JSON.
    param($a, $b)

    # Two fixed values followed by the two caller-supplied arguments.
    $data = @('abc', 'def', $a, $b)

    Write-Host "This should be green" -ForegroundColor green

    # Emits an error record; whether execution continues past this line
    # depends on the caller's $ErrorActionPreference.
    Write-Error "Test error!"

    Write-Host "This should be magenta" -ForegroundColor magenta

    # Writes a line through the host's error channel directly.
    $host.ui.WriteErrorLine("test raw stderror output")

    Write-Host "No color"

    # Single-line JSON array as the function's output.
    return $data | ConvertTo-Json -Compress
}
from powershell import run_powershell, PowerShellException | |
# Demo driver: call myPsFunction from test.ps1 and show what came back.
try:
    data, err = run_powershell('test.ps1', function='myPsFunction arg1 arg2')
except PowerShellException as e:
    # Raised by run_powershell when the PowerShell side fails.
    print('PS Exception: ', e)
else:
    print('parsed array from PS: ', data)
    # One element of the parsed result per line.
    for d in data:
        print(d)
Call PowerShell from Python with support for color output and exception handling using TTY for stdin, stdout, stderr.
New version without need for TTYs here: https://gist.github.com/nathan815/6e894b3394d42bd1c9e7dfae5142a678