Last active
October 6, 2023 10:22
-
-
Save antont/237c23b3edf35f1a9861727c52cfb3fe to your computer and use it in GitHub Desktop.
Python thread to run async tasks in the background, in a single eventloop & thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import asyncio | |
from queue import Queue | |
import time | |
from typing import Awaitable | |
import flask | |
from firebase_functions.private import serving | |
# Module-level counter, initialised to 0. It is not referenced again in the
# visible part of this file -- presumably request handlers elsewhere use it
# (TODO confirm).
concurrent_handler_id = 0
# The currently active TaskRunnerThread, or None when no runner is registered.
# Assigned by init() and reset to None by stop().
runner = None  # set in init()
""" | |
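NOTE: the Google Cloud community thread linked below discusses problems with Python threading on Cloud Functions and may contain useful hints: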
https://www.googlecloudcommunity.com/gc/Serverless/The-issue-with-pythons-s-threading-on-Google-Function/m-p/610388 | |
""" | |
def run_once(loop: asyncio.AbstractEventLoop, duration: float = 0.1) -> None:
    """Drive *loop* for about *duration* seconds and then return.

    A ``loop.stop`` call is scheduled *duration* seconds ahead and the loop is
    then run until that stop fires, so tasks already created on the loop make
    progress for one time slice.

    Args:
        loop: The event loop to drive. It must not already be running.
        duration: Length of the slice in seconds. The default of 0.1 lets the
            tasks run efficiently while the caller still checks for new jobs
            often enough to stay responsive.
    """
    # Scheduling the stop with call_soon() instead would give the tasks only a
    # single loop iteration per slice, which is less efficient for them.
    loop.call_later(duration, loop.stop)
    loop.run_forever()
class TaskRunnerThread(threading.Thread):
    """Thread that runs queued coroutines as tasks on one private event loop.

    Other threads hand coroutine objects over with :meth:`add_job`. The
    thread's :meth:`run` loop turns every queued job into an asyncio task and
    drives the event loop in short slices (see ``run_once``) so that newly
    queued jobs are picked up between slices. The thread finishes once at
    least one job has been started and no queued or running work is left.

    Attributes are set in ``__init__``: ``loop`` (the private event loop),
    ``queue`` (hand-over queue for jobs), ``tasks`` (tasks still running),
    ``in_flask`` (stored as given) and ``did_jobs`` (True once any job has
    been started).
    """

    def __init__(self, in_flask=True):
        """Create the runner with its own event loop; does not start the thread.

        Args:
            in_flask: Stored on the instance as ``self.in_flask``. The code
                in this class does not read it.
        """
        super().__init__()
        print("[TaskRunnerThread] init")
        self.loop = asyncio.new_event_loop()
        self.queue = Queue()
        self.tasks: set[Awaitable] = set()
        self.in_flask = in_flask
        self.did_jobs = False

    def _start_queued_jobs(self):
        """Create an asyncio task for every job currently waiting in the queue."""
        # run() is the only reader of the queue in this class, so a non-empty
        # queue means get() returns immediately.
        while not self.queue.empty():
            job = self.queue.get()
            print("[TaskRunnerThread] Creating task for", job)
            task = self.loop.create_task(job)
            # The set holds a strong reference while the task runs; discard()
            # drops it on completion and never raises.
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)
            self.did_jobs = True

    def run(self):
        """Thread body: start queued jobs and drive the loop until all work is done.

        The loop keeps going while no job has been started yet, while tasks
        are still running, or while jobs are still waiting in the queue. The
        last condition prevents queued jobs from being dropped when the
        running tasks happen to finish first. The event loop is closed on the
        way out, after which ``stop(self)`` unregisters this runner.
        """
        print("[TaskRunnerThread] run")
        try:
            while not self.did_jobs or self.tasks or not self.queue.empty():
                self._start_queued_jobs()
                run_once(self.loop)
        finally:
            # Release the loop's resources even if a slice raised.
            self.loop.close()
        print("[TaskRunnerThread] run Is DONE!")
        stop(self)

    def add_job(self, job):
        """Queue *job* (a coroutine object) to be run on this thread's loop.

        The hand-over goes through a ``queue.Queue``, so this may be called
        from a thread other than the runner itself.
        """
        print("[TaskRunnerThread] add_job", job)
        self.queue.put(job)

    # TODO: a finish() that runs the remaining tasks to completion would have
    # to be requested through the queue so that it executes on this thread.
def init():
    """Create a TaskRunnerThread, register it, and start it.

    The new thread is stored both in this module's ``runner`` global and on
    ``serving.runner`` before it is started.
    """
    global runner
    thread = TaskRunnerThread()
    runner = thread
    serving.runner = thread
    thread.start()
def stop(done_runner):
    """Unregister *done_runner* once it has finished.

    Asserts that *done_runner* is the runner currently held in this module's
    ``runner`` global, then resets both that global and ``serving.runner``
    to None.
    """
    global runner
    assert runner is done_runner
    # Chained assignment binds left to right: the module global first, then
    # the attribute on serving.
    runner = serving.runner = None
async def jobtest(name, count=10, delay=1):
    """Demo job: print a numbered line, then sleep, *count* times.

    Args:
        name: Label printed at the start of every line.
        count: Number of lines to print (default 10).
        delay: Seconds to sleep after each line (default 1).
    """
    for i in range(count):
        print(f"{name} @{i}")
        await asyncio.sleep(delay)
if __name__ == '__main__':
    # Manual check outside Flask: queue job 'A', wait 2 s, queue job 'B',
    # wait 5 s, then block until the runner thread has finished.
    runner = TaskRunnerThread(False)  # testing outside Flask
    runner.start()
    for label, pause in (('A', 2), ('B', 5)):
        runner.add_job(jobtest(label))
        time.sleep(pause)
    #runner.finish()
    runner.join()
"""whoa this works :o | |
i functions: Loaded environment variables from .env.***** | |
> [TaskRunnerThread] init | |
> [TaskRunnerThread] run | |
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b189640> | |
i functions: Beginning execution of "us-central1-test_concurrent" | |
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b189640> | |
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b18aa40> | |
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b18aa40> | |
i functions: Finished "us-central1-test_concurrent" in 2.017917ms | |
i functions: Beginning execution of "us-central1-test_concurrent" | |
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b18b040> | |
i functions: Finished "us-central1-test_concurrent" in 2.567125ms | |
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b18b040> | |
> [do_concurrent] 1: sleeping at 0 in 22728 | |
> [do_concurrent] 2: sleeping at 0 in 22728 | |
> [do_concurrent] 3: sleeping at 0 in 22728 | |
> [do_concurrent] 1: sleeping at 1 in 22728 | |
> [do_concurrent] 2: sleeping at 1 in 22728 | |
i functions: Beginning execution of "us-central1-test_concurrent" | |
> [TaskRunnerThread] add_job <coroutine object do_concurrent at 0x10b189f40> | |
i functions: Finished "us-central1-test_concurrent" in 1.852792ms | |
> [TaskRunnerThread] Creating task for <coroutine object do_concurrent at 0x10b189f40> | |
> [do_concurrent] 3: sleeping at 1 in 22728 | |
> [do_concurrent] 1: sleeping at 2 in 22728 | |
> [do_concurrent] 2: sleeping at 2 in 22728 | |
> [do_concurrent] 4: sleeping at 0 in 22728 | |
> [do_concurrent] 3: sleeping at 2 in 22728 | |
> [do_concurrent] 1: sleeping at 3 in 22728 | |
> [do_concurrent] 2: sleeping at 3 in 22728 | |
> [do_concurrent] 4: sleeping at 1 in 22728 | |
> [do_concurrent] 3: sleeping at 3 in 22728 | |
> [do_concurrent] 1: sleeping at 4 in 22728 | |
> [do_concurrent] 2: sleeping at 4 in 22728 | |
> [do_concurrent] 4: sleeping at 2 in 22728 | |
> [do_concurrent] 3: sleeping at 4 in 22728 | |
> [do_concurrent] 1: sleeping at 5 in 22728 | |
> [do_concurrent] 2: sleeping at 5 in 22728 | |
> [do_concurrent] 4: sleeping at 3 in 22728 | |
> [do_concurrent] 3: sleeping at 5 in 22728 | |
> [do_concurrent] 1: sleeping at 6 in 22728 | |
> [do_concurrent] 2: sleeping at 6 in 22728 | |
> [do_concurrent] 4: sleeping at 4 in 22728 | |
> [do_concurrent] 3: sleeping at 6 in 22728 | |
> [do_concurrent] 1: sleeping at 7 in 22728 | |
> [do_concurrent] 2: sleeping at 7 in 22728 | |
> [do_concurrent] 4: sleeping at 5 in 22728 | |
> [do_concurrent] 3: sleeping at 7 in 22728 | |
> [do_concurrent] 1: sleeping at 8 in 22728 | |
> [do_concurrent] 2: sleeping at 8 in 22728 | |
> [do_concurrent] 4: sleeping at 6 in 22728 | |
> [do_concurrent] 3: sleeping at 8 in 22728 | |
> [do_concurrent] 1: sleeping at 9 in 22728 | |
> [do_concurrent] 2: sleeping at 9 in 22728 | |
> [do_concurrent] 4: sleeping at 7 in 22728 | |
> [do_concurrent] 3: sleeping at 9 in 22728 | |
> [do_concurrent] 4: sleeping at 8 in 22728 | |
> [do_concurrent] 4: sleeping at 9 in 22728 | |
> [TaskRunnerThread] run Is DONE! | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment