Skip to content

Instantly share code, notes, and snippets.

@sonya75
Created August 2, 2017 03:22
Show Gist options
  • Save sonya75/cdd8e8841ae682ee862b9b79d94bf0f7 to your computer and use it in GitHub Desktop.
Save sonya75/cdd8e8841ae682ee862b9b79d94bf0f7 to your computer and use it in GitHub Desktop.
from multiprocessing import Pipe
from threading import Thread, Lock
import _thread
from threadedqueue import ThreadedQueue
class MPObjectWrapper:
def __init__(self, baseobj):
self.baseobject = baseobj
def get_pipe(self):
u, v = Pipe()
def f():
while True:
try:
(threadid, (cmd, args)) = v.recv()
except:
return
try:
res = getattr(self.baseobject, cmd)(*args)
except Exception as e:
res = e
try:
v.send((threadid, res))
except:
return
Thread(target=f).start()
return u
class MPEndpointHandler:
def __init__(self, mainpipe):
self.mainpipe = mainpipe
self.resultqueue = ThreadedQueue()
self.sendlock = Lock()
self.recvlock = Lock()
self.globallock = Lock()
def __getattr__(self, item):
self.globallock.acquire()
if hasattr(self, item):
self.globallock.release()
return getattr(self, item)
def f(s, *args):
id = _thread.get_ident()
with s.sendlock:
s.mainpipe.send((id, (item, args)))
with s.recvlock:
v, w = s.mainpipe.recv()
if v == id:
return w
else:
self.resultqueue.put(w, v)
l = self.resultqueue.get()
if isinstance(l, Exception):
raise l
return l
setattr(self, item, f)
self.globallock.release()
return f
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment