Skip to content

Instantly share code, notes, and snippets.

@hkolbeck
Last active June 22, 2023 19:44
Show Gist options
  • Save hkolbeck/e1d5fa50c6418c13f53576fb1c049c14 to your computer and use it in GitHub Desktop.
Save hkolbeck/e1d5fa50c6418c13f53576fb1c049c14 to your computer and use it in GitHub Desktop.
A sketch of a system for sharing buffers where they pass through a broadcast and are only returned to the pool once all expected consumers have handled them
package industries.hannah.pixelblaze.sensor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.Closeable
import java.util.*
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
/**
 * Envelope that travels through the shared flow together with the payload buffer.
 *
 * Note: because [buf] is an array, the generated `equals`/`hashCode` compare it by reference,
 * not by content.
 *
 * @property buf the payload buffer handed to each listener.
 * @property remainingConsumers number of listeners that still have to finish with [buf].
 * @property generation number of the listener generation that was current when this was sent.
 * @property generationMembers IDs of the listeners that belonged to [generation].
 */
private data class BufWrapper(
    val buf: ByteArray,
    val remainingConsumers: AtomicInteger,
    val generation: ULong,
    val generationMembers: Set<UUID>,
)
// Number of buffers pre-allocated into the pool; also the pool's maximum capacity.
private const val BUFFER_POOL = 10
// Size in bytes of every pooled buffer and of every fallback buffer allocated when the pool is empty.
private const val BUFFER_SIZE = 1024
/**
 * Broadcasts pooled byte buffers to a changing set of listeners and puts each buffer back into
 * the pool once every listener that was expected to handle it has done so.
 *
 * Listener membership is versioned: every add/remove publishes a new "generation" (a number plus
 * the set of member IDs). Each sent buffer is stamped with the generation that was current when
 * it was sent, and only members of that generation dispatch it and count towards its release.
 *
 * @param handlerScope scope in which one collector coroutine per listener is launched.
 */
class SharingIsCaring(
    private val handlerScope: CoroutineScope
) : Closeable {
    // Serializes membership changes; the listener loops themselves never take this lock.
    private val listenerGuard = Mutex(false)

    // IDs of the listeners that belong to the current generation.
    private val activeListeners = ConcurrentHashMap<UUID, Unit>()

    // Job of every collector coroutine that is still running. This includes listeners that were
    // already removed but have not yet seen a newer-generation wrapper and cancelled themselves.
    private val collectorJobs = ConcurrentHashMap<UUID, Job>()

    // Current generation: (generation number, IDs of its members).
    private val generationGuard = AtomicReference<Pair<ULong, Set<UUID>>>(Pair(0u, setOf()))

    // The pool of buffers being shared.
    private val bufPool = ArrayBlockingQueue<ByteArray>(BUFFER_POOL)

    // Multi-producer in, multi-consumer out.
    private val flow = MutableSharedFlow<BufWrapper>()

    init {
        repeat(BUFFER_POOL) {
            bufPool.add(ByteArray(BUFFER_SIZE))
        }
    }

    /**
     * Registers [handler] as a listener and starts a collector coroutine for it in the handler scope.
     *
     * The handler is invoked with every buffer whose generation includes this listener. Anything
     * it throws is printed and otherwise ignored, so one failing listener does not stop the rest.
     *
     * @return the ID to pass to [removeListener].
     */
    suspend fun addListener(handler: (ByteArray) -> Unit): UUID {
        val id = UUID.randomUUID()
        listenerGuard.withLock {
            activeListeners[id] = Unit
            val newGen = Pair(generationGuard.get().first + 1u, activeListeners.keys.toSet())

            // Start undispatched so the collector is already subscribed to the flow by the time
            // launch() returns. The new generation is published only afterwards; otherwise a
            // concurrent send() could count this listener for a buffer it will never receive.
            val job = handlerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                var generation = newGen.first
                flow.collect { bufWrapper ->
                    if (id !in bufWrapper.generationMembers) {
                        // We were not counted in remainingConsumers, so we must not touch the
                        // buffer or the counter. A newer generation without us means we were
                        // removed: cancel ourselves. An older one simply predates our
                        // registration: skip it.
                        if (generation < bufWrapper.generation) {
                            coroutineContext.job.cancel()
                        }
                        return@collect
                    }
                    if (generation < bufWrapper.generation) {
                        generation = bufWrapper.generation
                    }
                    try {
                        handler(bufWrapper.buf)
                    } catch (t: Throwable) {
                        // Deliberately best-effort: report and keep the listener alive.
                        println("Exception in listener: $id: ${t.message}")
                        t.printStackTrace()
                    } finally {
                        // If we're the last consumer expected, return the buffer to the pool.
                        if (bufWrapper.remainingConsumers.decrementAndGet() == 0) {
                            bufPool.offer(bufWrapper.buf) // If this fails it'll be GCed
                        }
                    }
                }
            }
            // Track the job so close() can cancel it; drop the entry once the collector ends.
            collectorJobs[id] = job
            job.invokeOnCompletion { collectorJobs.remove(id) }

            generationGuard.set(newGen)
        }
        return id
    }

    /**
     * Removes the listener [id] from the current generation.
     *
     * The collector coroutine is not cancelled here because buffers already in flight may still
     * expect it to consume them; it cancels itself when the first newer-generation buffer
     * reaches it. The generation number is bumped even if [id] was not registered.
     */
    suspend fun removeListener(id: UUID) {
        listenerGuard.withLock {
            activeListeners.remove(id)
            val nextGeneration = generationGuard.get().first + 1u
            generationGuard.set(Pair(nextGeneration, activeListeners.keys.toSet()))
        }
    }

    /**
     * Emits one buffer, stamped with the current generation, to all collectors.
     *
     * Assumes all serialization produces the same data length, equal to BUFFER_SIZE.
     *
     * @param toSend the value to serialize into the buffer (serialization is still a placeholder).
     */
    suspend fun send(toSend: Any) {
        // Try to use the shared buffers, but just alloc if we can't. Never blocks.
        val buf = bufPool.poll() ?: ByteArray(BUFFER_SIZE)
        val (generation, members) = generationGuard.get()
        /* Do some serialization */
        flow.emit(BufWrapper(buf, AtomicInteger(members.size), generation, members))
        if (members.isEmpty()) {
            // Nobody is expected to consume this buffer, so nobody will return it. The emit above
            // is still needed so that removed listeners see the new generation and cancel.
            bufPool.offer(buf)
        }
    }

    /**
     * Removes every listener from the current generation and publishes a new, empty generation.
     *
     * As with [removeListener], collectors are not cancelled here; each one cancels itself on the
     * next buffer it receives.
     */
    suspend fun removeAllListeners() {
        listenerGuard.withLock {
            activeListeners.clear()
            val nextGeneration = generationGuard.get().first + 1u
            generationGuard.set(Pair<ULong, Set<UUID>>(nextGeneration, setOf()))
        }
    }

    /**
     * Removes all listeners and cancels every collector coroutine that is still running, so none
     * is left suspended in the handler scope waiting for a buffer that may never be sent.
     *
     * Blocks the calling thread while the listener lock is acquired.
     */
    override fun close() {
        runBlocking {
            removeAllListeners()
        }
        collectorJobs.values.forEach { it.cancel() }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment