Last active
June 19, 2021 07:55
-
-
Save Zimins/5d843771dfc2a05fb75e4aff82027c1e to your computer and use it in GitHub Desktop.
Part of StateFlow.kt(coroutines)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
override suspend fun collect(collector: FlowCollector<T>) { | |
val slot = allocateSlot() | |
try { | |
if (collector is SubscribedFlowCollector) collector.onSubscription() | |
val collectorJob = currentCoroutineContext()[Job] | |
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet) | |
// The loop is arranged so that it starts delivering current value without waiting first | |
while (true) { | |
// Here the coroutine could have waited for a while to be dispatched, | |
// so we use the most recent state here to ensure the best possible conflation of stale values | |
val newState = _state.value | |
// always check for cancellation | |
collectorJob?.ensureActive() | |
// Conflate value emissions using equality | |
if (oldState == null || oldState != newState) { | |
collector.emit(NULL.unbox(newState)) | |
oldState = newState | |
} | |
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot | |
if (!slot.takePending()) { // try fast-path without suspending first | |
slot.awaitPending() // only suspend for new values when needed | |
} | |
} | |
} finally { | |
freeSlot(slot) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Difference between flow