-
-
Save peddamat/ac8f78a375d003d69669d75a012a6c46 to your computer and use it in GitHub Desktop.
An example of how to have komorebi subscribe and send events to a named pipe in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import win32pipe | |
import win32file | |
import pywintypes | |
import subprocess | |
import json | |
import time | |
KOMOREBI_BUFF_SIZE = 64 * 1024 | |
KOMOREBI_PIPE_NAME = "yasb" | |
if __name__ == "__main__": | |
print("Creating named pipe:", KOMOREBI_PIPE_NAME) | |
pipe = win32pipe.CreateNamedPipe( | |
f"\\\\.\\pipe\\{KOMOREBI_PIPE_NAME}", | |
win32pipe.PIPE_ACCESS_DUPLEX, | |
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, | |
1, | |
KOMOREBI_BUFF_SIZE, | |
KOMOREBI_BUFF_SIZE, | |
0, | |
None | |
) | |
print("Waiting for Komorebi") | |
proc = subprocess.Popen(["komorebic.exe", "subscribe", KOMOREBI_PIPE_NAME]) | |
proc.communicate() | |
while proc.returncode != 0: | |
print(f"Komorebic.exe failed with error code {proc.returncode}. Retrying in 5 seconds...") | |
time.sleep(5) | |
proc = subprocess.Popen(["komorebic.exe", "subscribe", KOMOREBI_PIPE_NAME]) | |
proc.communicate() | |
try: | |
win32pipe.ConnectNamedPipe(pipe, None) | |
print("Komorebi Connected") | |
while True: | |
result, data = win32file.ReadFile(pipe, KOMOREBI_BUFF_SIZE, None) | |
# Filters out newlines | |
if not data.strip(): | |
continue | |
event = json.loads(data.decode("utf-8")) | |
print("Received event:", event) | |
except pywintypes.error as e: | |
if e.args[0] == 109: | |
print("Komorebi Disconnected") | |
else: | |
print("Unknown Error", e) | |
finally: | |
win32file.CloseHandle(pipe) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment