Skip to content

Instantly share code, notes, and snippets.

@liberaid2
Created March 24, 2019 16:32
Show Gist options
  • Save liberaid2/2e22b6c71bc7f6ac321a5a8fb55a2618 to your computer and use it in GitHub Desktop.
Save liberaid2/2e22b6c71bc7f6ac321a5a8fb55a2618 to your computer and use it in GitHub Desktop.
Coroutines Pool for task handling
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select
class TaskPool<T, R> (private val taskChannel: ReceiveChannel<T>, private val taskHandler: suspend CoroutineScope.(T) -> R, poolCapacity: Int = 5) {
private val resultChannel = Channel<R>(poolCapacity)
val results: ReceiveChannel<R>
get() = resultChannel
private var tasksRun = CustomAtomic(false)
private val workers = List(poolCapacity) {
val ack = CompletableDeferred<Boolean>()
GlobalScope.actor<T> {
for (task in this) {
resultChannel.send(taskHandler(task))
}
ack.complete(true)
} to ack
}
fun runTasks() = GlobalScope.launch {
if(tasksRun.getValue()){
throw RuntimeException("Tasks are already running")
}
tasksRun.setValue(true)
for (task in taskChannel) {
select<Unit> {
workers.forEach { (worker, _) ->
worker.onSend(task) {}
}
}
}
workers.forEach { (worker, ack) ->
worker.close()
ack.await()
}
resultChannel.close()
}
}
@liberaid2
Copy link
Author

Usage example

/* Hard work imitation */
val handler: suspend CoroutineScope.(Int) -> Int = {
    delay(250)
    it * 2
}

runBlocking {
    val taskPool = TaskPool(produce { repeat(100) { send(it) } }, handler)
    taskPool.runTasks()

    /* The result is array or doubled integers */
    val result = taskPool.results.toList().sorted()
    println(result)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment