Last active
May 24, 2021 08:52
-
-
Save hrach/a552727f98e3eb7a241632a05c4ca9cb to your computer and use it in GitHub Desktop.
Kotlin Channels Debounce & Throttle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.* | |
import kotlinx.coroutines.experimental.channels.ReceiveChannel | |
import kotlinx.coroutines.experimental.channels.consumeEach | |
import kotlinx.coroutines.experimental.channels.produce | |
import kotlin.coroutines.experimental.CoroutineContext | |
/**
 * Debounces this channel: every received element schedules an emission [wait] milliseconds
 * later, and any emission that is still pending is cancelled as soon as a newer element arrives.
 *
 * Once this channel has been fully consumed, the last scheduled emission (if any) is awaited
 * before the producer block finishes.
 *
 * @param wait delay, in milliseconds, applied before an element is forwarded.
 * @param context coroutine context passed to [produce].
 * @return a new channel carrying the debounced elements.
 */
fun <E> ReceiveChannel<E>.debounce(
    wait: Long = 50,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> {
    return produce(context) {
        // Handle of the most recently scheduled emission; null until the first element arrives.
        var pendingEmission: Job? = null
        consumeEach { element ->
            // A newer element supersedes whatever was still waiting to be sent.
            pendingEmission?.cancel()
            pendingEmission = launch {
                delay(wait)
                send(element)
            }
        }
        // Give the final scheduled emission a chance to complete.
        pendingEmission?.join()
    }
}
/**
 * Throttles this channel: an element is forwarded only when at least [wait] milliseconds have
 * elapsed since the previously forwarded element. Elements arriving inside that window are
 * dropped. The first element is always forwarded.
 *
 * Elapsed time is measured with [System.nanoTime], a monotonic source, so changes to the
 * system wall clock neither stall the throttle nor let extra elements through.
 *
 * @param wait minimum spacing between forwarded elements, in milliseconds.
 * @param context coroutine context passed to [produce].
 * @return a new channel carrying the throttled elements.
 */
fun <E> ReceiveChannel<E>.throttle(
    wait: Long = 200,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> = produce(context) {
    // TimeUnit.toNanos saturates at Long.MAX_VALUE / Long.MIN_VALUE instead of overflowing.
    val minGapNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    // nanoTime() of the last forwarded element; null until something has been forwarded.
    var lastSentAt: Long? = null
    consumeEach { element ->
        val now = System.nanoTime()
        val previous = lastSentAt
        // Compare via subtraction: nanoTime values are only meaningful as differences.
        if (previous == null || now - previous >= minGapNanos) {
            // Record the time before sending so time spent suspended in send() counts
            // towards the window, as in the original implementation.
            lastSentAt = now
            send(element)
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.* | |
import kotlinx.coroutines.experimental.channels.ReceiveChannel | |
import kotlinx.coroutines.experimental.channels.consumeEach | |
import kotlinx.coroutines.experimental.channels.produce | |
import kotlin.coroutines.experimental.CoroutineContext | |
/**
 * Demo entry point: produces the integers 0..100, pausing 20 ms after each one, and prints
 * whatever passes through `throttle()` with its default settings.
 */
fun main(args: Array<String>) = runBlocking {
    val numbers = produce<Int> {
        for (value in 0..100) {
            send(value)
            delay(20)
        }
    }
    val throttled = numbers.throttle()
    throttled.consumeEach { value -> println(value) }
}
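`System.currentTimeMillis()` is not reliable here: it reads the wall clock, which the user can change at any time. `SystemClock.uptimeMillis()` (Android) is much better; on a plain JVM, `System.nanoTime()` is the monotonic equivalent.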
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated to work with newer Kotlin versions, where the `experimental` package has been removed and the API has moved to `kotlinx.coroutines`:
` import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlin.coroutines.CoroutineContext
/**
 * Debounces this channel: every received element schedules an emission [wait] milliseconds
 * later, and any emission that is still pending is cancelled as soon as a newer element arrives.
 *
 * The producer is started in a scope built from the caller's coroutine context plus [context],
 * so this function returns the new channel immediately while the producer remains a child of
 * the caller's job. Starting it inside `withContext { ... }` would not work: `withContext`
 * only returns after its children complete, and the producer cannot complete a `send` before
 * the returned channel has a receiver.
 *
 * @param wait delay, in milliseconds, applied before an element is forwarded.
 * @param context additional coroutine context (by default [Dispatchers.Default]) for the producer.
 * @return a new channel carrying the debounced elements.
 */
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <E> ReceiveChannel<E>.debounce(
    wait: Long = 50,
    context: CoroutineContext = Dispatchers.Default
): ReceiveChannel<E> {
    // Fully qualified so no extra import is needed; this is the caller's coroutine context.
    val producerScope = CoroutineScope(kotlin.coroutines.coroutineContext + context)
    return producerScope.produce {
        // Handle of the most recently scheduled emission; null until the first element arrives.
        var pendingEmission: Job? = null
        this@debounce.consumeEach { element ->
            // A newer element supersedes whatever was still waiting to be sent.
            pendingEmission?.cancel()
            pendingEmission = launch {
                delay(wait)
                send(element)
            }
        }
        // Give the final scheduled emission a chance to complete.
        pendingEmission?.join()
    }
}
/**
 * Throttles this channel: an element is forwarded only when at least [wait] milliseconds have
 * elapsed since the previously forwarded element. Elements arriving inside that window are
 * dropped. The first element is always forwarded.
 *
 * Elapsed time is measured with [System.nanoTime], a monotonic source, so changes to the
 * system wall clock neither stall the throttle nor let extra elements through.
 *
 * The producer is started in a scope built from the caller's coroutine context plus [context],
 * so this function returns the new channel immediately while the producer remains a child of
 * the caller's job. Starting it inside `withContext { ... }` would not work: `withContext`
 * only returns after its children complete, and the producer cannot complete a `send` before
 * the returned channel has a receiver.
 *
 * @param wait minimum spacing between forwarded elements, in milliseconds.
 * @param context additional coroutine context (by default [Dispatchers.Default]) for the producer.
 * @return a new channel carrying the throttled elements.
 */
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <E> ReceiveChannel<E>.throttle(
    wait: Long = 200,
    context: CoroutineContext = Dispatchers.Default
): ReceiveChannel<E> {
    // Fully qualified so no extra import is needed; this is the caller's coroutine context.
    val producerScope = CoroutineScope(kotlin.coroutines.coroutineContext + context)
    // TimeUnit.toNanos saturates at Long.MAX_VALUE / Long.MIN_VALUE instead of overflowing.
    val minGapNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    return producerScope.produce {
        // nanoTime() of the last forwarded element; null until something has been forwarded.
        var lastSentAt: Long? = null
        this@throttle.consumeEach { element ->
            val now = System.nanoTime()
            val previous = lastSentAt
            // Compare via subtraction: nanoTime values are only meaningful as differences.
            if (previous == null || now - previous >= minGapNanos) {
                // Record the time before sending so time spent suspended in send() counts
                // towards the window.
                lastSentAt = now
                send(element)
            }
        }
    }
}