Created
March 8, 2021 18:30
-
-
Save samdmarshall/f41ce155423776a2dfb44204d50958f4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ======= | |
# Imports | |
# ======= | |
import json | |
import strformat | |
import asyncdispatch | |
import threadproxy | |
# ===== | |
# Types | |
# ===== | |
type | |
Queue = object | |
name: string | |
token: ThreadToken | |
thread: Thread[QueueTask] | |
QueueTask = object | |
token: ThreadToken | |
QueueManager = object | |
main: MainThreadProxy | |
QueueMain = proc (task: QueueTask) {.thread, nimcall.} | |
# ========= | |
# Functions | |
# ========= | |
proc initQueueManager(name: string = "main"): QueueManager = | |
let proxy = newMainThreadProxy(name) | |
asyncCheck proxy.poll() | |
proxy.onData "new": | |
echo "action: new" | |
return newJNull() | |
proxy.onData "stop": | |
proxy.stop() | |
result = QueueManager(main: proxy) | |
proc createQueue(ctx: QueueManager, name: string, main: QueueMain): Queue = | |
let proxy = ctx.main | |
echo "creating queue!" | |
let token = proxy.createToken(name) | |
var worker: Thread[QueueTask] | |
createThread(worker, main, QueueTask(token: token)) | |
let init_future = waitFor proxy.ask(name, "init", newJNull()) | |
result = Queue(name: name, token: token, thread: worker) | |
proc dispatchAsync(ctx: QueueManager, queue: Queue, action: string, data: JsonNode): Future[JsonNode] = | |
let proxy = ctx.main | |
echo fmt"pushing ({action}) with '{data}' to thread: {queue.name}" | |
result = proxy.ask(queue.name, action, data) | |
# ======= | |
# Threads | |
# ======= | |
proc workerMain(task: QueueTask) {.thread.} = | |
let proxy = newThreadProxy(task.token) | |
# register action handler | |
proxy.onData "multiply": | |
let x = data["value"].getInt() | |
echo fmt"start processing task: {x}" | |
return %*{"value": x + 20} | |
# start processing channel | |
waitFor proxy.poll() | |
proc response(f: Future[JsonNode]) = | |
echo "future callback!" | |
let data = f.read() | |
let value = data["value"].getInt() | |
echo fmt"{value - 20} -> {value}" | |
echo fmt"finish processing task: {value - 20}" | |
# =========== | |
# Entry Point | |
# =========== | |
proc main() = | |
var ctx = initQueueManager() | |
var queue = ctx.createQueue("thread_1", workerMain) | |
let index = 8 | |
# for index in 0..10: | |
let data = %*{"value": index} | |
echo "dispatching task: " & $data | |
let answer = ctx.dispatchAsync(queue, "multiply", data) | |
answer.addCallback response | |
echo "done dispatching" | |
runForever() | |
when isMainModule: | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment