Created
March 8, 2021 18:26
-
-
Save samdmarshall/8c2879196e45a57384f7a54bffa70c28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ======= | |
# Imports | |
# ======= | |
import json | |
import strformat | |
import asyncdispatch | |
import threadproxy | |
# ===== | |
# Types | |
# ===== | |
type | |
Queue = object | |
name: string | |
token: ThreadToken | |
thread: Thread[QueueTask] | |
QueueTask = object | |
token: ThreadToken | |
QueueManager = object | |
main: MainThreadProxy | |
QueueMain = proc (task: QueueTask) {.thread, nimcall.} | |
# ========= | |
# Functions | |
# ========= | |
proc initQueueManager(name: string = "main"): QueueManager = | |
let proxy = newMainThreadProxy(name) | |
asyncCheck proxy.poll() | |
proxy.onData "new": | |
echo "action: new" | |
return newJNull() | |
proxy.onData "stop": | |
proxy.stop() | |
result = QueueManager(main: proxy) | |
proc createQueue(ctx: QueueManager, name: string, main: QueueMain): Queue = | |
let proxy = ctx.main | |
echo "creating queue!" | |
let token = proxy.createToken(name) | |
var worker: Thread[QueueTask] | |
createThread(worker, main, QueueTask(token: token)) | |
result = Queue(name: name, token: token, thread: worker) | |
proc dispatchAsync(ctx: QueueManager, queue: Queue, action: string, data: JsonNode): Future[JsonNode] = | |
let proxy = ctx.main | |
echo fmt"pushing ({action}) with '{data}' to thread: {queue.name}" | |
result = proxy.ask(queue.name, action, data) | |
# ======= | |
# Threads | |
# ======= | |
proc workerMain(task: QueueTask) {.thread.} = | |
let proxy = newThreadProxy(task.token) | |
# register action handler | |
proxy.onData "multiply": | |
let x = data["value"].getInt() | |
echo fmt"start processing task: {x}" | |
return %*{"value": x + 20} | |
# start processing channel | |
waitFor proxy.poll() | |
proc response(f: Future[JsonNode]) = | |
echo "future callback!" | |
let data = f.read() | |
let value = data["value"].getInt() | |
echo fmt"{value - 20} -> {value}" | |
echo fmt"finish processing task: {value - 20}" | |
# =========== | |
# Entry Point | |
# =========== | |
proc main() = | |
var ctx = initQueueManager() | |
var queue = ctx.createQueue("thread_1", workerMain) | |
let index = 8 | |
# for index in 0..10: | |
let data = %*{"value": index} | |
echo "dispatching task: " & $data | |
let answer = ctx.dispatchAsync(queue, "multiply", data) | |
answer.addCallback response | |
echo "done dispatching" | |
runForever() | |
when isMainModule: | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment