Created
November 5, 2018 18:37
-
-
Save javidcf/2d57e8dd19f6cd3972097c6349930d41 to your computer and use it in GitHub Desktop.
Basic msgpack server for Unreal Engine Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unreal_engine as ue | |
import asyncio | |
import threading | |
import queue | |
import msgpack | |
try: | |
import msgpack_numpy as mnp | |
mnp.patch() | |
except ModuleNotFoundError: pass | |
class MyServer: | |
DEFAULT_HOST = 'localhost' | |
DEFAULT_PORT = 7025 | |
def __init__(self): | |
self._host = MyServer.DEFAULT_HOST | |
self._port = MyServer.DEFAULT_PORT | |
# Managed by main thread | |
self._queue = None | |
self._thread_ready = None | |
self._thread_closed = None | |
# Managed by server thread | |
self._loop = None | |
self._server = None | |
self._server_future = None | |
self._client_tasks = None | |
def begin_play(self): | |
if self._thread_ready is not None or self._thread_closed is not None: | |
ue.log_error('Server already initiated.') | |
raise Exception | |
self._queue = queue.Queue() | |
self._thread_ready = threading.Event() | |
self._thread_closed = threading.Event() | |
threading.Thread(target=self._run_server).start() | |
def tick(self, delta_time): | |
try: | |
while True: | |
self._queue.get(block=False)() | |
except queue.Empty: pass | |
def end_play(self, reason): | |
if self._thread_ready is None or self._thread_closed is None: | |
ue.log_error('Server not started.') | |
raise Exception | |
self._thread_ready.wait() | |
asyncio.run_coroutine_threadsafe(self._stop_server(), self._loop) | |
ue.log(f'Waiting for server to stop.') | |
self._thread_closed.wait() | |
self._queue = None | |
self._thread_ready = None | |
self._thread_closed = None | |
def _process(self, obj): | |
ue.log(f'Received: {obj}') | |
return 'Received' | |
def _run_server(self): | |
self._loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self._loop) | |
self._server_future = asyncio.ensure_future(self._spawn_server()) | |
self._client_tasks = set() | |
self._thread_ready.set() | |
try: | |
try: | |
self._loop.run_forever() | |
finally: | |
tasks = asyncio.Task.all_tasks() | |
for task in tasks: | |
if not task.done(): | |
task.cancel() | |
self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) | |
except Exception as e: | |
ue.log_error(e) | |
finally: | |
self._client_tasks = None | |
self._server_future = None | |
self._loop.close() | |
self._loop = None | |
self._thread_closed.set() | |
async def _spawn_server(self): | |
try: | |
ue.log(f'Trying to start listen server on {self._host}:{self._port}.') | |
self._server = await asyncio.start_server(self._new_client_connected, self._host, self._port) | |
await self._server.wait_closed() | |
except asyncio.CancelledError: pass | |
except Exception as e: | |
ue.log_error(e) | |
finally: | |
self._server = None | |
ue.log('Tcp server ended') | |
async def _new_client_connected(self, reader, writer): | |
task = asyncio.Task.current_task() | |
self._client_tasks.add(task) | |
try: | |
name = writer.get_extra_info('peername') | |
ue.log(f'New client connection from {name}.') | |
packer = msgpack.Packer() | |
unpacker = msgpack.Unpacker() | |
finished = False | |
while not finished: | |
data = await reader.read(1 << 20) | |
if not data: | |
ue.log(f'Client {name} disconnected.') | |
break | |
unpacker.feed(data) | |
for obj in unpacker: | |
response = await self._run_in_main_thread(self._process, obj) | |
writer.write(packer.pack(response)) | |
await writer.drain() | |
if obj is None: | |
finished = True | |
except asyncio.CancelledError: | |
ue.log(f'Dropping connection with {name}.') | |
except Exception as e: | |
ue.log_error(f'Error on connection with {name}: {e}.') | |
finally: | |
try: | |
writer.close() | |
if hasattr(writer, 'wait_closed'): # Since Python 3.7 | |
await writer.wait_closed() | |
except Exception: pass | |
finally: | |
self._client_tasks.remove(task) | |
ue.log(f'Connection with {name} terminated.') | |
async def _stop_server(self): | |
if self._server is not None: | |
self._server.close() | |
for task in self._client_tasks: | |
if not task.done(): | |
task.cancel() | |
await self._server_future | |
if self._loop is not None: | |
self._loop.stop() | |
async def _run_in_main_thread(self, callback, *args, **kwargs): | |
future = asyncio.Future() | |
cb = lambda: self._main_thread_callback_wrapper(future, callback, *args, **kwargs) | |
self._queue.put(cb, block=False) | |
result = await future | |
return result | |
def _main_thread_callback_wrapper(self, future, callback, *args, **kwargs): | |
result = callback(*args, **kwargs) | |
self._loop.call_soon_threadsafe(lambda: future.set_result(result)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment