Skip to content

Instantly share code, notes, and snippets.

@RoryKelly
Created November 14, 2020 23:40
Show Gist options
  • Save RoryKelly/695e5f61340ac37a461a8d6b905b187a to your computer and use it in GitHub Desktop.
Save RoryKelly/695e5f61340ac37a461a8d6b905b187a to your computer and use it in GitHub Desktop.
FlowRecorder for recording emissions from flows.
/**
* Starts subscribing / collecting the given Flow and records it's emission to verify them later
*/
suspend fun <T> Flow<T>.recordEmissions(
recordingScope: CoroutineScope = GlobalScope,
verify: suspend SharedFlow<T>.() -> Unit
) {
val recording = shareIn(recordingScope, SharingStarted.Eagerly, Int.MAX_VALUE)
verify(recording)
}
suspend fun <T> SharedFlow<T>.verifyAll(vararg emissions: T, emissionTimeoutMilliseconds: Long = 1000L) {
verifyAllList(emissions.toList(), emissionTimeoutMilliseconds)
}
suspend fun <T> SharedFlow<T>.verifyAllList(nextEmissions: List<T>, emissionTimeoutMilliseconds: Long) {
val result = waitForIndex(emissionTimeoutMilliseconds, nextEmissions.size)
if (result == null) {
error(
"\nWaiting for : $nextEmissions \n" +
"Emissions so far: $replayCache"
)
} else {
assertThat(nextEmissions).isEqualTo(replayCache)
assertThat(nextEmissions).hasSameSizeAs(replayCache)
}
}
suspend fun <T> SharedFlow<T>.verityItemAt(emissionTimeoutMilliseconds: Long, index: Int, verifyBlock: (a: T) -> Unit) {
val result = waitForIndex(emissionTimeoutMilliseconds, index + 1)
if (result == null) {
error(
"Waiting for $index but no new emission within " +
"${emissionTimeoutMilliseconds}ms. Emissions so far: $replayCache"
)
} else {
verifyBlock(replayCache[index])
}
}
suspend fun <T> SharedFlow<T>.waitForIndex(
emissionTimeoutMilliseconds: Long,
size: Int
): Unit? {
return withTimeoutOrNull(emissionTimeoutMilliseconds) {
while (replayCache.size < size) {
delay(100)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment