Last active
August 10, 2023 18:00
-
-
Save egemenertugrul/29d33c1b5f5f69aa528cba2611e09e93 to your computer and use it in GitHub Desktop.
Python Websockets Full Duplex Asynchronous Multiprocessing Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import multiprocessing as mp | |
from websockets.server import serve | |
from websockets.exceptions import ConnectionClosedOK | |
import time | |
class DuplexWebsocketsServerProcess(mp.Process): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._send_queue = mp.Queue() | |
def run(self): | |
asyncio.run(self.main()) | |
def send(self, item): | |
self._send_queue.put(item) | |
async def consumer_handler(self, websocket): | |
async for message in websocket: | |
print(f"Received: {message}") | |
print(f"--") | |
async def producer_handler(self, websocket): | |
while True: | |
message = await asyncio.get_event_loop().run_in_executor(None, self._send_queue.get) | |
print(f"Sending: {message}") | |
try: | |
await websocket.send(message) | |
except ConnectionClosedOK: | |
print("Connection closed..") | |
break | |
async def handler(self, websocket): | |
await asyncio.gather( | |
self.consumer_handler(websocket), | |
self.producer_handler(websocket) | |
) | |
async def main(self): | |
async with serve(self.handler, "0.0.0.0", 8765) as server: | |
await asyncio.Future() | |
if __name__ == '__main__': | |
socket_process = DuplexWebsocketsServerProcess(daemon=True) | |
socket_process.start() | |
time.sleep(2) | |
for i in range(10): | |
socket_process.send(f"Message from server-{i}") | |
print(f"--\n") | |
time.sleep(4) | |
socket_process.join() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import websockets | |
class TestWebsocketsClient: | |
def __init__(self): | |
asyncio.run(self.main()) | |
async def main(self): | |
async with websockets.connect("ws://127.0.0.1:8765") as websocket: | |
while True: | |
async for message in websocket: | |
print(f"Received: {message}") | |
client_msg = f"Message from client-{message.split('-')[-1]}" | |
await asyncio.sleep(2) | |
print(f"Sending: {client_msg}") | |
print(f"--") | |
await websocket.send(client_msg) | |
if __name__ == '__main__': | |
websocketsClient = TestWebsocketsClient() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment