Created
August 27, 2018 17:11
-
-
Save tarruda/129c374d8f59a04d35a852e125ce77f0 to your computer and use it in GitHub Desktop.
msgpack-rpc implementation for Python asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
import mpack | |
class ErrorResponse(Exception):
    """Error reported by the remote peer in reply to a request.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers and asyncio's task machinery treat it like
    any other application-level error.
    """
class MpackRpcSession(object):
    """Msgpack-rpc session layered on an asyncio stream reader/writer pair.

    Outgoing requests, notifications and replies are serialized by an
    ``mpack`` session object and written to ``writer``. Incoming bytes are
    read from ``reader`` and fed to the same session object: responses
    resolve the future returned by :meth:`request`, while incoming requests
    and notifications are queued and handed out by :meth:`next_message`.

    Only one coroutine may poll the reader at a time; a second concurrent
    poller fails with ``Exception('Already polling')``.
    """

    def __init__(self, reader, writer, mpack_session=None):
        """Create a session over an established stream.

        Args:
            reader: stream reader providing ``read()`` and ``at_eof()``.
            writer: stream writer providing ``write()`` and ``drain()``.
            mpack_session: optional pre-built session object; a new
                ``mpack.Session()`` is created when omitted.
        """
        self._reader = reader
        self._writer = writer
        self._session = mpack_session or mpack.Session()
        self._polling = False
        # FIXME _loop is a private member of StreamReader, but it also seems
        # redundant to accept an extra loop parameter since reader/writer are
        # already associated with loop. Maybe there's a cleaner way?
        self._loop = reader._loop
        # Incoming requests/notifications waiting for next_message().
        self._message_queue = collections.deque()
        # Not consulted by any method of this class.
        self._eof = False
        # Bytes received after the end of the last decoded message, if any.
        self._buf = None

    def _poll_start(self):
        """Mark the reader as being polled.

        Raises:
            Exception: if another coroutine is already polling.
        """
        if self._polling:
            raise Exception('Already polling')
        self._polling = True

    def _poll_stop(self):
        """Mark the reader as no longer being polled."""
        self._polling = False

    async def _read(self):
        """Return the next chunk of input bytes.

        Bytes left over from a previously received chunk are returned first;
        otherwise up to 0xfff bytes are read from the reader.

        Raises:
            Exception: if there is no leftover data and the reader is at EOF.
        """
        if self._buf:
            # Hand back the saved remainder; previously it was cleared but
            # never returned, which silently dropped pipelined messages.
            leftover, self._buf = self._buf, None
            return leftover
        if self._reader.at_eof():
            raise Exception('Connection was closed')
        return await self._reader.read(0xfff)

    async def _receive(self):
        """Read until one complete message is decoded, then dispatch it.

        A response settles the future that was attached to the matching
        request; a request or notification is appended to the message queue.
        Returns early, without dispatching anything, if a read yields no data.
        """
        msg_type = None
        while not msg_type:
            chunk = await self._read()
            if not chunk:
                return
            # A falsy msg_type means no complete message yet: keep reading.
            offs, msg_type, name_or_err, args_or_result, id_or_data = (
                self._session.receive(chunk))
        # NOTE(review): offs is presumably the number of bytes of chunk the
        # session consumed -- confirm against the mpack documentation.
        remainder = chunk[offs:]
        if remainder:
            # received more than one message, save the extra chunk for later
            self._buf = remainder
        if msg_type == 'response':
            # id_or_data is the future passed as ``data`` in request()
            assert isinstance(id_or_data, asyncio.Future)
            if id_or_data.done():
                # The caller already cancelled/settled the future (e.g. a
                # timeout); settling it again would raise InvalidStateError.
                return
            if name_or_err:
                # NOTE(review): assumes the error payload is a sequence whose
                # second item is the message -- confirm against the peer.
                id_or_data.set_exception(ErrorResponse(name_or_err[1]))
            else:
                id_or_data.set_result(args_or_result)
        else:
            assert msg_type in ['request', 'notification']
            # enqueue the message for later processing
            self._message_queue.append((msg_type, name_or_err,
                                        args_or_result, id_or_data))

    async def _wait_for(self, future):
        """Poll the reader until ``future`` is done.

        If receiving fails while ``future`` is still pending, the failure is
        delivered through ``future`` so that its awaiter does not hang.

        Raises:
            Exception: if another coroutine is already polling.
        """
        self._poll_start()
        try:
            while not future.done():
                await self._receive()
        except Exception as exc:
            if future.done():
                raise
            future.set_exception(exc)
        finally:
            # Always release the polling flag, even when receiving failed.
            self._poll_stop()

    async def next_message(self):
        """Wait for and return the next incoming request or notification.

        Returns:
            The ``(msg_type, name, args, id_or_data)`` tuple queued by
            ``_receive``; ``msg_type`` is 'request' or 'notification'.

        Raises:
            Exception: if another coroutine is already polling, or the
                connection was closed.
        """
        self._poll_start()
        try:
            while not self._message_queue:
                await self._receive()
        finally:
            # Always release the polling flag, even when receiving failed.
            self._poll_stop()
        return self._message_queue.popleft()

    def request(self, method, *args):
        """Send a request and return a future for its response.

        The future resolves to the result, or raises ``ErrorResponse`` if the
        peer replies with an error. When nothing is polling the reader at call
        time, a background task is started to poll until the future is done.
        """
        future = asyncio.Future(loop=self._loop)
        request_data = self._session.request(method, args, data=future)
        self._writer.write(request_data)
        # NOTE(review): the flag is only set once the task actually starts, so
        # several request() calls made without yielding to the loop each spawn
        # a poller and all but the first fail with 'Already polling'.
        if not self._polling:
            self._loop.create_task(self._wait_for(future))
        return future

    def notify(self, method, *args):
        """Send a notification; returns the writer's ``drain()`` awaitable."""
        notification_data = self._session.notify(method, args)
        self._writer.write(notification_data)
        return self._writer.drain()

    def reply(self, request_id, result, error=False):
        """Send a response to ``request_id``.

        ``result`` and ``error`` are passed unchanged to the mpack session's
        ``reply``. Returns the writer's ``drain()`` awaitable.
        """
        response_data = self._session.reply(request_id, result, error)
        self._writer.write(response_data)
        return self._writer.drain()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment