Created
April 11, 2024 07:31
-
-
Save crozone/4eb5096fb2b67490e3481b4fcd796e6f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Runtime.CompilerServices; | |
using System.Threading; | |
using System.Threading.Channels; | |
/// <summary>
/// Extension methods for <see cref="ChannelReader{T}"/>.
/// </summary>
public static class ChannelReaderExtensions
{
    /// <summary>
    /// Creates an <see cref="IAsyncEnumerable{T}"/> that enables reading all of the data from the channel
    /// by peeking each value. Each item is only read (removed) from the channel upon the next iteration,
    /// keeping the item in the channel while the consumer is processing it.
    /// </summary>
    /// <typeparam name="T">The reference type of the items carried by the channel.</typeparam>
    /// <param name="reader">The channel reader to enumerate. It must support peeking.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to use to cancel the enumeration.</param>
    /// <remarks>
    /// <para>
    /// This method should only be used in single reader scenarios.
    /// Multiple concurrent readers can cause incorrect behavior as the peeked value may differ from the read value.
    /// </para>
    /// <para>
    /// Because this is an async iterator, argument and state checks run when enumeration starts
    /// (on the first <c>MoveNextAsync</c> call), not when this method is called.
    /// </para>
    /// </remarks>
    /// <returns>The created async enumerable.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The reader does not support peeking, or the item removed from the channel after an iteration
    /// was not the same instance that had been peeked.
    /// </exception>
    public static async IAsyncEnumerable<T> ReadAllWithPeekAsync<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    ) where T : class
    {
        // Fail with a descriptive exception instead of a NullReferenceException on reader.CanPeek.
        ArgumentNullException.ThrowIfNull(reader);

        if (!reader.CanPeek)
        {
            throw new InvalidOperationException($"This {nameof(ChannelReader<T>)} does not support peeking");
        }

        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (reader.TryPeek(out T? peekedItem))
            {
                bool error;
                try
                {
                    // Hand the item to the consumer while it is still sitting in the channel.
                    yield return peekedItem;
                }
                finally
                {
                    // Runs when the consumer asks for the next item, and also when the enumerator is
                    // disposed mid-iteration: remove the item that was just handed out.
                    // We're not supposed to throw from within a finally clause, so instead just set an error flag.
                    // If we are able to execute past the end of the finally clause, throw the exception then.
                    // The comparison is by reference identity: the removed item must be the peeked instance.
                    error = !(reader.TryRead(out T? readItem) && ReferenceEquals(readItem, peekedItem));
                }

                if (error)
                {
                    throw new InvalidOperationException("Unexpected value was read from the channel reader");
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment