Skip to content

Instantly share code, notes, and snippets.

@jstclair
Created November 18, 2020 10:29
Show Gist options
  • Save jstclair/6170bbeb96d6446e83592e44147690ab to your computer and use it in GitHub Desktop.
Save jstclair/6170bbeb96d6446e83592e44147690ab to your computer and use it in GitHub Desktop.
Commented demo for using System.Threading.Channels
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
namespace ChannelsDemo
{
/// <summary>
/// Simple "getting-started" demo of System.Threading.Channels
/// </summary>
/// <remarks>
/// There is no error handling demonstrated here. Typically, you would surround `WaitTo(Read|Write)Async` with `try {} catch (ChannelClosedException) {}`
/// but that would obscure some of the code here.
/// </remarks>
class Program
{
// NOTE: You can adjust these to see how various combinations work
// NOTE: You might want to put some artificial delays in OutputGreeting (via `await Task.Delay(TimeSpan.FromSeconds(2))`
private const int MaxItemsToProduce = 5_000;
private static readonly int MaxReaders = Environment.ProcessorCount; // or, hard-coded value, like 1, 2, or 10
private static readonly int MaxBoundedSize = 2 * MaxReaders; // or, hard-coded value, like 1, 2, or 400
// NOTE: in all these examples, the time is dominated by the slowness of the Console. Try disabling this and comparing elapsed times.
private const bool WriteToConsole = true;
// NOTE: in scenarios where you want to simulate CPU-bound work, you can disable writing to console and set this instead. Calls Thread.SpinWait(n)
private const int CpuSpinTime = 0;
// NOTE: when experimenting with different strategies, this can help you visualize where tasks are in the channel
private const bool UseTracing = false;
/* Additional Note:
* Performance is constrained when running a Debug build and/or in VS. Try building a Release build and running in a separate console.
*
* Examples from my machine using MaxItemsToProduce = 5000 and the MultipleCompetingTransformsAndConsumers example
*
* [WriteToConsole = true]
* Debug/F5: 1_183 items/sec
* Debug/Cmd: 1_350 items/sec
* Release/F5: 1_204 items/sec
* Release/Cmd: 1_371 items/sec
*
* [WriteToConsole = false]
* Debug/F5: 49_341 items/sec
* Debug/Cmd: 204_788 items/sec
* Release/F5: 44_856 items/sec
* Release/Cmd: 204_611 items/sec
*
* NOTE: There is also overhead for such a small number of items. Increasing MaxItemsToProduce to 500_000:
*
* [WriteToConsole = false]
* Release/F5: 845_265 items/sec (MultipleCompetingTransformsAndConsumers)
* Release/F5: 1_638_367 items/sec (Simple)
*/
static async Task Main(string[] args)
{
Console.WriteLine("Welcome to the Channels Demo app");
// NOTE: 2 types of channels - unbounded and bounded. Bounded channels have a limit - writes will block until reads have drained the channel.
// Unbounded channels will never block reads or writes, but assume you have capacity to store all items in memory.
// Our first channel is just a list of ids that we want channel consumers to read, so we can easily hold hundreds of thousands in memory.
// But for observing the flow of processing (i.e., with UseTracing = true), you might want to look at a Bounded channel
var idChannel = Channel.CreateUnbounded<string>();
//var idChannel = Channel.CreateBounded<string>(MaxBoundedSize);
// Our second channel is doing work, and we want to constrain the amount of simultaneous work (both for memory and resource consumption).
// But you can test various scenarios where you use an unbounded channel here as well.
var greetChannel = Channel.CreateBounded<string>(MaxBoundedSize);
//var greetChannel = Channel.CreateUnbounded<string>();
var sw = Stopwatch.StartNew();
// NOTE: because the examples close the channel, you can only run a single example at a time
await Simple(idChannel, greetChannel);
//await MultipleConsumers(idChannel, greetChannel);
//await MultipleTransformsAndConsumers(idChannel, greetChannel);
//var otherChannel = Channel.CreateBounded<string>(MaxBoundedSize);
//await MultipleCompetingTransformsAndConsumers(idChannel, greetChannel, otherChannel);
sw.Stop();
Console.WriteLine($"{MaxItemsToProduce:N0} consumed in {sw.Elapsed.TotalSeconds:F1} seconds [{(MaxItemsToProduce/sw.Elapsed.TotalSeconds):N0} items/second]");
}
// Example task of producing data and writing it to a ChannelWriter
private static async Task ProduceIds(ChannelWriter<string> writer, int max)
{
var ids = Enumerable.Range(1, max);
foreach (var id in ids)
{
Trace($"{nameof(ProduceIds)}: writing id {id}");
// NOTE: this works just fine...
await writer.WriteAsync($"{id}");
// NOTE: This is an optimization for avoiding the await when the channel isn't blocking, or has a large capacity
// In the current case, since we have an unbounded writer, we would never need to await.
//if (writer.TryWrite($"{id}") == false)
//{
// await writer.WriteAsync($"{id}");
//}
// NOTE: you can signal an error by calling `Complete(Exception)` - this will be an orderly shutdown.
//if (id == 1337)
//{
// Trace("Closing the channel for 1337");
// var e = new Exception($"You should never get a 1337");
// writer.Complete(e);
// // In the simple `writer.WriteAsync` above, if you `Complete` the channel, you will attempt to write an additional item to it.
//}
}
Trace($"{nameof(ProduceIds)}: completed writing");
// when we are done, signal the channel that no more data is coming
writer.Complete();
}
// Example task of reading data from one channel (ChannelReader<T>) and writing it to another channel (ChannelWriter<T>)
private static async Task TransformIdAndPublishGreeting(ChannelReader<string> reader,
ChannelWriter<string> writer, string greeting, bool autoClose = false)
{
while (await reader.WaitToReadAsync())
{
Trace($"{nameof(TransformIdAndPublishGreeting)}: entered WaitToReadAsync");
// NOTE: this will also work just fine..
while (reader.TryRead(out var id))
{
await writer.WriteAsync(FormatGreeting(greeting, id));
}
// NOTE: but again, like above, we can avoid the unnecessary writing await as long as possible
//while (reader.TryRead(out var id))
//{
// var msg = FormatGreeting(greeting, id);
// if (writer.TryWrite(msg) == false)
// {
// Trace($"{nameof(TransformIdAndPublishGreeting)}: TryWrite: had to take slow path");
// await writer.WriteAsync(msg);
// }
//}
Trace($"{nameof(TransformIdAndPublishGreeting)}: exited WaitToReadAsync");
}
// NOTE: if we have a single instance of this task, we can close the writer when we're done;
// but if we have multiple instances of this task, that would fail (because `Complete()` can only
// be called once on a channel.
if (autoClose)
{
Trace($"{nameof(TransformIdAndPublishGreeting)}: completing writer");
writer.Complete();
}
}
private static string FormatGreeting(string greeting, string id) => $"{greeting} {id}";
// Example task of reading from a channel
private static async Task OutputGreeting(ChannelReader<string> reader, string optionalPrefix = "")
{
while (await reader.WaitToReadAsync())
{
Trace($"{nameof(OutputGreeting)}{optionalPrefix}: entered WaitToReadAsync");
while (reader.TryRead(out var greeting))
{
if (WriteToConsole)
Console.WriteLine($"{optionalPrefix}{greeting}");
// NOTE: this is here to heat up your CPUs
else if (CpuSpinTime >= 0)
Thread.SpinWait(CpuSpinTime);
}
Trace($"{nameof(OutputGreeting)}{optionalPrefix}: exited WaitToReadAsync");
}
}
/* NOTE: Simple example
* In this example, we are using a single task for each process - this is closest to a traditional pipe architecture, with the advantage that
* we natively handle async actions.
*/
private static async Task Simple(Channel<string> idChannel, Channel<string> greetChannel)
{
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce));
var transformTask = Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, "Hello", autoClose: true));
var greetTask = Task.Run(() => OutputGreeting(greetChannel.Reader));
// NOTE: we await each task - in addition, we want to await any reader channels. Only readers have a `Task Completion`; writers have a `.Complete()` method.
await Task.WhenAll(idChannel.Reader.Completion, produceTask, transformTask, greetChannel.Reader.Completion, greetTask);
}
/* NOTE: Multiple Consumers example
* In this example, we are using a single task for producing and transforming, but multiple greeting processes. Since the greeting only reads,
* there's minimal changes required.
*/
private static async Task MultipleConsumers(Channel<string> idChannel, Channel<string> greetChannel)
{
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce));
var transformTask = Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, "Hello", autoClose: true));
var greetTasks = Enumerable.Range(0, MaxReaders)
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"{i}")))
.ToArray(); // NOTE: *Very* important that we materialize the running tasks, since `.Select` is lazy
await Task.WhenAll(new[] { idChannel.Reader.Completion, produceTask, transformTask, greetChannel.Reader.Completion }.Concat(greetTasks));
}
/* NOTE: Multiple Transforms And Consumers example
* In this example, we are using a single task for producing, but multiple transform and greeting processes. Since the transform task was
* responsible for signalling that it was done writing in the single-task scenario, we now need to handle this explicitly.
*/
private static async Task MultipleTransformsAndConsumers(Channel<string> idChannel, Channel<string> greetChannel)
{
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce));
var transformTasks = Enumerable.Range(0, MaxReaders * 2)
.Select(i => Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, $"'{i}' Hello", autoClose: false)))
.ToArray();
var greetTasks = Enumerable.Range(0, MaxReaders * 2)
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"{i}")))
.ToArray();
// NOTE: once the transform tasks have completed writing to their channel, we can signal the greet tasks that no more data is coming.
await Task.WhenAll(new [] {idChannel.Reader.Completion, produceTask }.Concat(transformTasks));
greetChannel.Writer.Complete();
await Task.WhenAll(new[] { greetChannel.Reader.Completion }.Concat(greetTasks));
}
/* NOTE: Multiple Competing Transforms And Consumers example
* In this example, we are using a single task for producing, but multiple transform and greeting processes. In addition, the transform task
* is now writing to multiple channels. In real code, this might be producing a read model and updating a search index. Like the MultipleTransformsAndConsumers
* example, we are responsible for explicitly signalling that the transform tasks are completed writing.
*/
private static async Task MultipleCompetingTransformsAndConsumers(Channel<string> idChannel, Channel<string> greetChannel, Channel<string> otherChannel)
{
// NOTE: in this example, tweaking the running tasks is important for throughput. In my case, I have 12 processors.
// If I leave these all set to `MaxReaders`, the best result is:
// 205_000 items/sec
// If I adjust them to MaxReaders/3 (so that consumers roughly equal CPUs), the best result is:
// 218_702 items/sec
// NOTE: this will also depend on the work each of your tasks is doing. If some are slow, you may wish to create more of them...
// NOTE: the produceTask will also burn a CPU while it's working (for large # of items), so you may wish to calculate based on MaxReaders - 1
var produceTask = Task.Run(() => ProduceIds(idChannel.Writer, MaxItemsToProduce));
var readersPerTask = MaxReaders; // MaxReaders / 3;
var transformTasks = Enumerable.Range(0, readersPerTask)
.Select(i => Task.Run(() => TransformIdAndPublishGreeting(idChannel.Reader, greetChannel.Writer, otherChannel.Writer, $"'{i}' Hello")))
.ToArray();
// NOTE: We want to have multiple consumers, but we can use the same underlying function; we will use the optional prefix to help
// distinguish between the two tasks in the console. Each set of greet tasks reads from it's own channel (so slowing down one will only
// affect those tasks. If you want to see this, you can add a `if (optionalPrefix.StartsWith("OTHER") Thread.SpinWait(CpuSpinTime);` in
// the `OutputGreeting` method.
var greetTasks = Enumerable.Range(0, readersPerTask)
.Select(i => Task.Run(() => OutputGreeting(greetChannel.Reader, $"GREET [{i:00}] ")))
.ToArray();
var otherTasks = Enumerable.Range(0, readersPerTask)
.Select(i => Task.Run(() => OutputGreeting(otherChannel.Reader, $"OTHER: [{i:00}] ")))
.ToArray();
// NOTE: like in MultipleTransformsAndConsumers, we signal consumers that there's no more data to come. However, in this case, we need to
// signal to both channels.
await Task.WhenAll(new[] { idChannel.Reader.Completion, produceTask }.Concat(transformTasks));
greetChannel.Writer.Complete();
otherChannel.Writer.Complete();
await Task.WhenAll(new[] { greetChannel.Reader.Completion, otherChannel.Reader.Completion }.Concat(greetTasks).Concat(otherTasks));
}
// NOTE: Overload of TransformIdAndPublishGreeting that takes a multiple channel writers (for the MultipleCompetingTransformsAndConsumers example)
private static async Task TransformIdAndPublishGreeting(ChannelReader<string> reader,
ChannelWriter<string> writer, ChannelWriter<string> otherWriter, string greeting)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out var id))
{
await writer.WriteAsync($"{greeting}, {id}");
await otherWriter.WriteAsync($"{greeting}, {id}");
}
}
}
private static void Trace(string msg)
{
if (UseTracing) Console.WriteLine($" {msg}");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment