Skip to content

Instantly share code, notes, and snippets.

@mrenouf
Last active April 10, 2024 20:58
Show Gist options
  • Save mrenouf/1803b8b3a95ad2090935785a9bffcaac to your computer and use it in GitHub Desktop.
Save mrenouf/1803b8b3a95ad2090935785a9bffcaac to your computer and use it in GitHub Desktop.
Original slow version; scales terribly (O(N^2) in the number of allocations)
package adb.io
import adb.io.ByteBufferSlicePool.Region.Alloc
import adb.io.ByteBufferSlicePool.Region.Free
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.lang.ref.ReferenceQueue
import java.lang.ref.WeakReference
import java.nio.ByteBuffer
/**
 * A [ByteBufferAllocator] that hands out slices of one direct [ByteBuffer] allocated up front.
 *
 * The backing buffer is tracked as an ordered list of regions, each either allocated or free.
 * Allocation takes the first free region that is large enough (first fit) and carves the slice
 * from its head; freeing turns the region back into free space and merges it with any free
 * neighbours. Every operation scans the region list linearly.
 *
 * All access to the region list is guarded by `synchronized(regions)`.
 *
 * Each allocated slice is tracked through a [WeakReference] registered with a [ReferenceQueue].
 * A background coroutine takes references from that queue and reclaims the region of any slice
 * that was garbage collected without being passed to [free], printing a warning when it does.
 *
 * @param capacity size in bytes of the backing direct buffer.
 */
class ByteBufferSlicePool(
    capacity: Int,
) : ByteBufferAllocator {
    /** One contiguous span of the backing buffer, described by its start offset and size. */
    private sealed interface Region {
        /** A span currently handed out; [ref] weakly references the slice returned to the caller. */
        data class Alloc(val start: Int, val size: Int, val ref: WeakReference<ByteBuffer>) : Region

        /** A span that is available for allocation. */
        data class Free(val start: Int, val size: Int) : Region
    }

    private val source: ByteBuffer = ByteBuffer.allocateDirect(capacity)

    // Ordered by start offset; initially a single free region covering the whole buffer.
    private val regions = mutableListOf<Region>(Free(0, capacity))
    private val refQueue = ReferenceQueue<ByteBuffer>()

    /**
     * Returns the size of the largest single free region, i.e. the largest allocation that can
     * currently succeed, or 0 when there is no free region at all.
     */
    fun available(): Int = synchronized(regions) {
        // maxOfOrNull instead of maxBy: a fully allocated pool has no Free region, and maxBy
        // throws NoSuchElementException on an empty collection.
        regions.filterIsInstance<Free>().maxOfOrNull { it.size } ?: 0
    }

    /** Returns the total number of free bytes, summed over all free regions. */
    fun remaining(): Int = synchronized(regions) {
        regions.filterIsInstance<Free>().sumOf { it.size }
    }

    init {
        // NOTE(review): this scope is never cancelled and ReferenceQueue.remove() blocks, so the
        // isActive check is only re-evaluated after a reference has been dequeued — confirm that
        // a reaper living as long as the process is intended.
        MainScope().launch(Dispatchers.IO) {
            while (isActive) {
                val ref = refQueue.remove()
                synchronized(regions) {
                    val allocIndex = regions.indexOfFirst { it is Alloc && it.ref === ref }
                    if (allocIndex >= 0) {
                        // Mop up after any forgotten slices which are about to be finalized.
                        printLeakWarning(regions[allocIndex] as Alloc)
                        freeAt(allocIndex)
                    }
                }
                ref.clear()
            }
        }
    }

    /**
     * Allocates a slice of [size] bytes from the first free region that can hold it.
     *
     * @param size number of bytes to allocate.
     * @return a slice of the backing buffer covering the newly allocated region.
     * @throws IllegalStateException if no single free region has at least [size] bytes.
     */
    override fun alloc(size: Int): ByteBuffer = synchronized(regions) {
        // TODO, pick the most closely sized region instead of the first?
        val index = regions.indexOfFirst { it is Free && it.size >= size }
        if (index < 0) error("No free regions with at least $size bytes available!")
        val freeRegion = regions[index] as Free

        // allocate a slice from the head of the free region
        val allocSlice = source.slice(freeRegion.start, size)
        val allocRegion = Alloc(freeRegion.start, size, WeakReference(allocSlice, refQueue))

        if (size == freeRegion.size) {
            // Exact fit: the allocation replaces the free region outright.
            regions[index] = allocRegion
        } else {
            // Shrink the free region to its tail and insert the allocation in front of it.
            regions[index] = Free(
                start = freeRegion.start + size,
                size = freeRegion.size - size,
            )
            regions.add(index, allocRegion)
        }
        allocSlice
    }

    /**
     * Returns a slice obtained from [alloc] to the pool. The slice is matched by identity.
     *
     * @param buffer the exact [ByteBuffer] instance previously returned by [alloc].
     * @throws IllegalStateException if [buffer] does not belong to any allocated region.
     */
    override fun free(buffer: ByteBuffer) {
        synchronized(regions) {
            freeAt(regions.indexOfFirst { it is Alloc && it.ref.get() === buffer })
        }
    }

    /**
     * Converts the allocated region at [index] into free space, merging it with a free region
     * directly before and/or after it.
     *
     * @param index position of the allocated region in the region list; a negative value means
     *   the lookup failed.
     * @throws IllegalStateException if [index] is negative.
     */
    private fun freeAt(index: Int) {
        synchronized(regions) {
            if (index < 0) error("Buffer to free was not found in an alloc region!")
            val allocRegion = regions[index] as Alloc
            // Clear the reference so it is not enqueued later for a region already reclaimed.
            allocRegion.ref.clear()

            // Check for and merge with adjacent Free regions
            val freeBefore = regions.getOrNull(index - 1) as? Free
            val freeAfter = regions.getOrNull(index + 1) as? Free
            regions[index] = Free(
                start = freeBefore?.start ?: allocRegion.start,
                size = (freeBefore?.size ?: 0) + allocRegion.size + (freeAfter?.size ?: 0),
            )

            // Remove the higher index first so that the lower index stays valid.
            if (freeAfter != null) regions.removeAt(index + 1)
            if (freeBefore != null) regions.removeAt(index - 1)
        }
    }

    /** Prints a warning that the slice backing [alloc] was garbage collected without being freed. */
    private fun printLeakWarning(alloc: Alloc) {
        println("A byte buffer was not freed! (size=${alloc.size})")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment