Skip to content

Instantly share code, notes, and snippets.

@Zimins
Last active June 19, 2021 07:55
Show Gist options
  • Save Zimins/5d843771dfc2a05fb75e4aff82027c1e to your computer and use it in GitHub Desktop.
Save Zimins/5d843771dfc2a05fb75e4aff82027c1e to your computer and use it in GitHub Desktop.
Part of StateFlow.kt(coroutines)
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
val newState = _state.value
// always check for cancellation
collectorJob?.ensureActive()
// Conflate value emissions using equality
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
if (!slot.takePending()) { // try fast-path without suspending first
slot.awaitPending() // only suspend for new values when needed
}
}
} finally {
freeSlot(slot)
}
}
@Zimins
Copy link
Author

Zimins commented Jun 19, 2021

Difference between flow

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment