Created
June 28, 2023 19:43
-
-
Save oeway/4073b408ebf79c8cdde79e43835e64f6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import threading | |
from concurrent.futures import ThreadPoolExecutor | |
from imjoy_rpc.hypha import connect_to_server | |
import time | |
class BackgroundServer:
    """Keep a Hypha server connection alive on an asyncio loop in a background thread.

    The event loop runs forever in a daemon thread, so the calling (main)
    thread stays free. Callables in a registered service are executed on a
    single-worker thread pool rather than on the loop thread, so a blocking
    service function does not stall the event loop.
    """

    def __init__(self, server_url):
        """Store the connection settings; nothing is started until ``run()``.

        Args:
            server_url: URL passed to ``connect_to_server`` as ``server_url``.
        """
        self.loop = None
        self.thread = None
        self.server = None
        # One worker: service calls are executed one at a time, off the loop thread.
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.server_url = server_url
        # Set from inside the loop once it is actually running (see start_loop).
        self._loop_ready = threading.Event()

    async def start_server(self):
        """Connect to the server at ``self.server_url`` and keep the handle in ``self.server``."""
        self.server = await connect_to_server({"server_url": self.server_url})

    def start_loop(self):
        """Create an event loop for the current thread and run it forever.

        Intended to be the target of the background thread started by ``run()``.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = loop
        # Scheduled as the loop's first callback, so the event is only set
        # once the loop is really running and can accept thread-safe submissions.
        loop.call_soon(self._loop_ready.set)
        loop.run_forever()

    def run(self):
        """Start the background loop if needed, then block until the server is connected."""
        if not self.loop:
            self.thread = threading.Thread(target=self.start_loop, daemon=True)
            self.thread.start()
        # Block on an event instead of spinning in a tight `while ...: pass`
        # loop, which burns CPU and competes with the loop thread for the GIL.
        self._loop_ready.wait()
        future = asyncio.run_coroutine_threadsafe(self.start_server(), self.loop)
        future.result()  # Wait for the server to start

    def _wrap_in_executor(self, func):
        """Return an async function that runs ``func`` on ``self.executor``.

        ``func`` is bound as a parameter here, so every wrapper keeps its own
        target. Defining the wrapper directly inside a ``for`` loop would make
        all wrappers share the loop variable and call the last callable.
        """

        async def wrapper(*args, **kwargs):
            # run_in_executor forwards both the result and any exception of
            # func back to the awaiting coroutine on the loop.
            return await self.loop.run_in_executor(
                self.executor, lambda: func(*args, **kwargs)
            )

        return wrapper

    def register_service(self, service):
        """Register ``service`` with the connected server from any thread.

        Every callable value in ``service`` is replaced, in a copy, by an
        async wrapper that executes it on the thread pool. The caller's dict
        is left untouched.

        Args:
            service: Mapping describing the service; must contain an ``'id'`` key.

        Returns:
            The ``concurrent.futures.Future`` of the scheduled registration.
            Call ``.result()`` on it to wait for completion and to surface
            registration errors; ignoring it keeps the fire-and-forget behavior.

        Raises:
            AssertionError: If ``service`` has no ``'id'`` or the server has
                not been started yet.
        """
        # Raised explicitly (rather than via `assert`) so the checks are not
        # stripped under `python -O`; the exception type is unchanged.
        if 'id' not in service:
            raise AssertionError("Each service must have an 'id'")
        if not self.server:
            raise AssertionError("Server has not been started yet")

        # Wrap functions in service with the executor.
        wrapped_service = {
            key: self._wrap_in_executor(value) if callable(value) else value
            for key, value in service.items()
        }

        async def register_service_async():
            await self.server.register_service(wrapped_service)
            print("==============>Service registered: ", wrapped_service)
            print(f"Services registered at workspace: {self.server.config.workspace}")
            # NOTE: the URL below is hard-coded for the example "hello-world" service.
            print(f"Test them with the HTTP proxy: {self.server.config.public_base_url}/{self.server.config.workspace}/services/hello-world/hello?name=World")

        return asyncio.run_coroutine_threadsafe(register_service_async(), self.loop)
if __name__ == "__main__":
    # Demo: connect in the background, expose one blocking function as a
    # public service, and keep the main thread visibly alive.
    server_url = "https://ai.imjoy.io"
    bg_server = BackgroundServer(server_url)
    bg_server.run()

    def hello(name):
        """Greet ``name``, report the executing thread, then block for 5 seconds."""
        greeting = "Hello " + name
        print(greeting)
        # Print the current thread id to check whether this runs on the main thread.
        print("Current thread id: ", threading.get_ident(), threading.current_thread())
        time.sleep(5)
        return greeting

    service_config = {
        "visibility": "public",
        "run_in_executor": True,
    }
    hello_service = {
        "name": "Hello World",
        "id": "hello-world",
        "config": service_config,
        "hello": hello,
    }
    bg_server.register_service(hello_service)

    # Heartbeat: one dot per second shows the main thread is not blocked.
    while True:
        print('.', end='', flush=True)
        time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment