Skip to content

Instantly share code, notes, and snippets.

@wixb50
Created January 25, 2018 02:13
Show Gist options
  • Save wixb50/6a95a255f0f7aca32cbd718124a46ba7 to your computer and use it in GitHub Desktop.
Save wixb50/6a95a255f0f7aca32cbd718124a46ba7 to your computer and use it in GitHub Desktop.
A thread-friendly zerorpc client.
# -*- coding: utf-8 -*-
# @Author: xiewenqian <int>
# @Date: 2016-09-12T09:13:01+08:00
# @Email: wixb50@gmail.com
# @Last modified by: int
# @Last modified time: 2016-09-22T10:24:14+08:00
import os
import zmq
import logging
from zerorpc.events import Events
from zerorpc.context import Context
from zerorpc.exceptions import TimeoutExpired, RemoteError
from redis.connection import BlockingConnectionPool
# Default server endpoint; RPCSock passes it to RPClient as ``connect_to``.
# NOTE(review): 120.0.0.1 is a public address -- possibly a typo for the
# loopback address 127.0.0.1; confirm against the deployment.
ZRPC_HOST_SOCK = 'tcp://120.0.0.1:8080'
import msgpack_numpy as m
# Executed at import time. Presumably this patches msgpack so numpy values can
# be serialized in RPC payloads -- confirm against the msgpack_numpy docs.
m.patch()
# Module-level logger; used by Connection.disconnect for debug output.
logger = logging.getLogger(__name__)
class RContext(Context):
    """zerorpc ``Context`` variant that hands out plain ``zmq.Socket`` objects.

    A single shared instance is kept in ``_instance`` and obtained through
    ``get_instance()``; ``Connection`` uses that shared instance for every
    socket it opens.
    """

    # Lazily created shared instance (see get_instance).
    _instance = None

    def __init__(self):
        super(RContext, self).__init__()

    @staticmethod
    def get_instance():
        """Return the shared ``RContext``, creating it on first use.

        NOTE(review): the check-then-create below is not guarded by a lock,
        so two threads racing on the very first call could each build a
        context -- confirm whether that matters for callers.
        """
        shared = RContext._instance
        if shared is None:
            shared = RContext._instance = RContext()
        return shared

    def socket(self, socket_type):
        """Create a ``zmq.Socket`` of ``socket_type`` bound to this context.

        Presumably this override exists to bypass whatever socket class the
        parent ``Context`` would create -- confirm against zerorpc.

        Raises:
            zmq.ZMQError: with ``zmq.ENOTSUP`` when the context is closed.
        """
        if not self.closed:
            return zmq.Socket(self, socket_type)
        raise zmq.ZMQError(zmq.ENOTSUP)
class Connection(Events):
    """One request/reply channel to a zerorpc server over a ``zmq.REQ`` socket.

    The socket is created from the shared ``RContext``. ``RPClient`` passes
    this class to redis' ``BlockingConnectionPool`` as ``connection_class``,
    which is why the constructor takes plain keyword-friendly arguments and
    ``disconnect()`` can be called without an endpoint.
    """

    def __init__(self, connect_to, heartbeat=30):
        """Open a REQ socket and connect it to ``connect_to``.

        Args:
            connect_to: endpoint passed to ``connect`` (and remembered so
                ``disconnect`` can undo it).
            heartbeat: default reply timeout used by ``__call__`` when the
                caller gives no ``timeout`` keyword.
        """
        super(Connection, self).__init__(zmq.REQ, RContext.get_instance())
        self._heartbeat = heartbeat
        self.connect_to = connect_to
        self.connect(connect_to)
        # Creating process id; presumably inspected by the redis pool when a
        # connection is released -- confirm against redis-py.
        self.pid = os.getpid()

    def _process_answer(self, context, req_event, rep_event,
                        handle_remote_error):
        """Turn a reply event into a return value or a raised exception.

        The ``hook_client_after_request`` hook of ``context`` is invoked in
        both outcomes; for an ``ERR`` reply it also receives the exception
        built by ``handle_remote_error``, which is then raised.

        Returns:
            The first element of the reply's ``args``.
        """
        if rep_event.name != u'ERR':
            context.hook_client_after_request(req_event, rep_event)
            return rep_event.args[0]

        error = handle_remote_error(rep_event)
        context.hook_client_after_request(req_event, rep_event, error)
        raise error

    def _handle_remote_error(self, event):
        """Build the exception object describing an ``ERR`` reply.

        The context's ``hook_client_handle_remote_error`` gets the first
        chance; if it returns nothing truthy a ``RemoteError`` is built from
        the event arguments, whose layout depends on the ``v`` header
        (three fields when it is >= 2, a single message otherwise).
        """
        hooked = self._context.hook_client_handle_remote_error(event)
        if hooked:
            return hooked

        if event.header.get(u'v', 1) >= 2:
            name, msg, traceback = event.args
            return RemoteError(name, msg, traceback)

        (msg,) = event.args
        return RemoteError('RemoteError', msg, None)

    def process_response(self, request_event, timeout):
        """Wait up to ``timeout`` for the reply to ``request_event``.

        Returns:
            The value produced by ``_process_answer`` for the reply.

        Raises:
            TimeoutExpired: when ``recv`` times out; the after-request hook
                is notified with the exception first.

        NOTE(review): after a timeout the request is still outstanding on
        this REQ socket; reusing the connection for another request may fail
        -- confirm and consider discarding timed-out connections.
        """
        try:
            reply_event = self.recv(timeout=timeout)
        except TimeoutExpired:
            # Re-raise with the remote method name attached for context.
            expired = TimeoutExpired(
                timeout,
                'calling remote method {0}'.format(request_event.name))
            self._context.hook_client_after_request(
                request_event, None, expired)
            raise expired

        return self._process_answer(
            self._context, request_event, reply_event,
            self._handle_remote_error)

    def __call__(self, method, *args, **kargs):
        """Invoke remote ``method`` with ``args`` and return its result.

        Only the ``timeout`` keyword is read from ``kargs`` (defaulting to
        the heartbeat given at construction); other keywords are ignored.
        A ``bytes`` method name is decoded as UTF-8 first.
        """
        method_name = (
            method.decode('utf-8') if isinstance(method, bytes) else method)
        timeout = kargs.get('timeout', self._heartbeat)
        request_event = self.new_event(method_name, args)
        self.emit_event(request_event)
        return self.process_response(request_event, timeout)

    def disconnect(self, resolve=True):
        """Disconnect the socket from every endpoint ``connect_to`` resolves to.

        Returns:
            A list with the result of each ``socket.disconnect`` call, in
            resolution order.
        """
        statuses = []
        for endpoint in self._resolve_endpoint(self.connect_to, resolve):
            status = self._socket.disconnect(endpoint)
            statuses.append(status)
            logger.debug('disconnected from %s (status=%s)', endpoint, status)
        return statuses
class RPClient(object):
    """zerorpc client that borrows a pooled ``Connection`` for each call.

    Remote methods can be invoked either as attributes
    (``client.add(1, 2)``) or by name (``client('add', 1, 2)``). Every call
    takes a connection from a redis ``BlockingConnectionPool`` and gives it
    back when the call finishes, whether it returned or raised.
    """

    def __init__(self, connect_to, heartbeat=30):
        """Create the connection pool.

        Args:
            connect_to: server endpoint handed to each ``Connection``.
            heartbeat: passed to the pool as ``timeout`` and to each
                ``Connection`` as ``heartbeat`` (its default reply timeout).
        """
        self.connection_pool = BlockingConnectionPool(
            connection_class=Connection,
            timeout=heartbeat,
            connect_to=connect_to,
            heartbeat=heartbeat
        )

    def close(self):
        """Ask the pool to disconnect its connections."""
        self.connection_pool.disconnect()

    def __getattr__(self, name):
        """Return a callable that invokes the remote method ``name``.

        Only reached for attributes not found normally. Dunder names are
        refused so that protocol probes such as
        ``hasattr(client, '__iter__')`` or ``copy.deepcopy`` looking for
        ``__deepcopy__`` see a missing attribute instead of receiving a
        callable that would fire a remote call. Single-underscore names are
        still proxied; a remote method that really has a dunder name can be
        reached with ``client('__name__', ...)``.

        Raises:
            AttributeError: for names of the form ``__xxx__``.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(
                '{0!r} object has no attribute {1!r}'.format(
                    type(self).__name__, name))
        return lambda *args, **kwargs: self(name, *args, **kwargs)

    def __call__(self, name, *args, **kwargs):
        """Call remote method ``name`` on a pooled connection.

        Positional and keyword arguments are forwarded to the connection
        unchanged.

        Returns:
            Whatever the connection call returns.
        """
        connection = self.connection_pool.get_connection('')
        try:
            return connection(name, *args, **kwargs)
        finally:
            # Always hand the connection back, even when the call raised.
            self.connection_pool.release(connection)
class RPCSock(RPClient):
    """``RPClient`` preconfigured for the module default ``ZRPC_HOST_SOCK``."""

    def __init__(self, heartbeat=60):
        """Build the client for ``ZRPC_HOST_SOCK`` with the given heartbeat."""
        super(RPCSock, self).__init__(
            connect_to=ZRPC_HOST_SOCK, heartbeat=heartbeat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment