Last active
March 18, 2023 06:37
-
-
Save paulbatum/2037b6d13ab53bc36e2a87eb9ae91772 to your computer and use it in GitHub Desktop.
Channel Factories
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Diagnostics; | |
using System.Threading.Channels; | |
// Producer stage: writes up to `total` IronOre items into `destination`, then marks
// the channel as complete.
//   destination - writer that receives each newly created IronOre.
//   total       - number of ore items to produce; zero or negative produces nothing.
//   metrics     - shared counters; IronOreProduced is incremented atomically per item written.
// The returned task finishes once all ore has been written, or earlier if the
// destination stops accepting writes.
static async Task MinerLoop(ChannelWriter<IronOre> destination, int total, ProductionMetrics metrics)
{
    // Outer loop: wait asynchronously until the channel can accept a write
    // (WaitToWriteAsync yields false once the channel has been completed).
    while (total > 0 && await destination.WaitToWriteAsync())
    {
        // Inner loop: keep writing synchronously for as long as TryWrite succeeds.
        while (total > 0 && destination.TryWrite(new IronOre()))
        {
            total -= 1;
            Interlocked.Increment(ref metrics.IronOreProduced);
            Debug.WriteLine("Produced 1 iron ore.");
        }
    }

    // TryComplete rather than Complete: when the loop ended because the channel was
    // already completed elsewhere, Complete() would throw ChannelClosedException.
    destination.TryComplete();
}
// Transfer stage: moves items one at a time from `source` to `destination`, then
// marks `destination` as complete.
//   source           - reader to take items from.
//   destination      - writer to hand items to.
//   iterationMessage - text written to the debug output after every transferred item.
// An item is only taken from `source` after `destination` has signalled that it can
// accept a write, so the inserter never holds an item it cannot deliver.
// NOTE(review): the TryRead/TryWrite results are only checked with Debug.Assert, which
// presumes this loop is the sole reader of `source` and sole writer of `destination`.
static async Task InserterLoop<T>(ChannelReader<T> source, ChannelWriter<T> destination, string iterationMessage)
{
    while (await source.WaitToReadAsync() && await destination.WaitToWriteAsync())
    {
        var read = source.TryRead(out var item);
        Debug.Assert(read);
        var wrote = destination.TryWrite(item);
        Debug.Assert(wrote);
        Debug.WriteLine(iterationMessage);
    }

    // TryComplete rather than Complete: the loop also exits when `destination` has
    // already been completed, and Complete() would then throw ChannelClosedException.
    destination.TryComplete();
}
// Conversion stage: consumes IronOre from `source` and writes one IronPlate to
// `destination` for every two ore read, then marks `destination` as complete.
//   source      - reader supplying iron ore.
//   destination - writer receiving iron plates.
//   metrics     - shared counters; IronPlateProduced is incremented atomically per plate written.
// If `source` ends after an odd number of ore, the final unpaired ore produces no plate.
static async Task SmelterLoop(ChannelReader<IronOre> source, ChannelWriter<IronPlate> destination, ProductionMetrics metrics)
{
    var readCount = 0;
    while (await source.WaitToReadAsync() && await destination.WaitToWriteAsync())
    {
        // Drain whatever ore is currently available without awaiting.
        while (source.TryRead(out _))
        {
            readCount++;
            if (readCount == 2)
            {
                var plate = new IronPlate();
                while (!destination.TryWrite(plate))
                {
                    // WaitToWriteAsync returns false once the destination has been completed.
                    // Without this check TryWrite would keep failing and the loop would spin
                    // forever; there is nowhere left to deliver plates, so stop.
                    if (!await destination.WaitToWriteAsync())
                    {
                        return;
                    }
                }
                Debug.WriteLine("Converted 2 iron ore into 1 iron plate.");
                Interlocked.Increment(ref metrics.IronPlateProduced);
                readCount = 0;
            }
        }
    }

    // TryComplete rather than Complete: the outer loop also exits when `destination`
    // is already completed, in which case Complete() would throw ChannelClosedException.
    destination.TryComplete();
}
// Builds one production line (miner -> inserter -> smelter -> inserter -> chest) and
// runs it until `total` ore has been mined and processed.
//   total   - number of iron ore items the miner should produce.
//   metrics - shared counters updated by the miner and smelter loops.
// The returned task completes when every stage of the line has finished.
static async Task Setup(int total, ProductionMetrics metrics)
{
    // Every stage is connected by a bounded channel so a slow consumer applies
    // back-pressure to the stage before it.
    Channel<IronOre> miner = CreateBelt<IronOre>(10);
    Channel<IronOre> smelterInput = CreateBelt<IronOre>(2);
    Channel<IronPlate> smelterOutput = CreateBelt<IronPlate>(1);
    Channel<IronPlate> chest = CreateBelt<IronPlate>(10);

    List<Task> tasks = new List<Task>();

    var minerLoop = MinerLoop(miner.Writer, total, metrics);
    tasks.Add(minerLoop);

    var ironOreInserterLoop = InserterLoop<IronOre>(miner.Reader, smelterInput.Writer, "Inserted 1 iron ore into smelter");
    tasks.Add(ironOreInserterLoop);

    var ironSmelterLoop = SmelterLoop(smelterInput.Reader, smelterOutput.Writer, metrics);
    tasks.Add(ironSmelterLoop);

    var ironPlateInserterLoop = InserterLoop<IronPlate>(smelterOutput.Reader, chest.Writer, "Inserted 1 iron plate into chest");
    tasks.Add(ironPlateInserterLoop);

    // The chest is bounded and is the last channel in the line. Without a reader it
    // fills up after 10 plates, the plate inserter then waits forever for free space,
    // and Task.WhenAll below never completes. Draining it keeps the line moving while
    // memory use stays bounded.
    var chestDrainLoop = DrainChest(chest.Reader);
    tasks.Add(chestDrainLoop);

    await Task.WhenAll(tasks);

    // Creates a bounded channel of the given capacity with synchronous continuations allowed.
    static Channel<T> CreateBelt<T>(int capacity)
    {
        return Channel.CreateBounded<T>(
            new BoundedChannelOptions(capacity)
            {
                AllowSynchronousContinuations = true,
            }
        );
    }

    // Reads and discards plates until the chest channel is completed and empty.
    static async Task DrainChest(ChannelReader<IronPlate> reader)
    {
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out _))
            {
            }
        }
    }
}
// Entry point: runs `parallelSetups` production lines concurrently, sharing one
// ProductionMetrics instance, then prints the totals and the elapsed time.
var watch = Stopwatch.StartNew();
ProductionMetrics metrics = new ProductionMetrics();
var tasks = new List<Task>();
var total = 40; //;50_000_000;
var parallelSetups = 1;

// Split the work evenly; when `total` is not a multiple of `parallelSetups`, the
// first `remainder` setups take one extra ore so nothing is silently dropped.
var perSetup = total / parallelSetups;
var remainder = total % parallelSetups;
for (int i = 0; i < parallelSetups; i++)
{
    var share = i < remainder ? perSetup + 1 : perSetup;
    tasks.Add(Setup(share, metrics));
}

await Task.WhenAll(tasks);
watch.Stop();

// Diagnostic alternative to the WhenAll above: poll once per second and report progress.
//while (true)
//{
//    await Task.Delay(TimeSpan.FromSeconds(1));
//    Console.WriteLine($"Iron ore mined: {metrics.IronOreProduced}");
//    Console.WriteLine($"Iron plates smelted: {metrics.IronPlateProduced}");
//    foreach (var t in tasks)
//    {
//        Console.WriteLine(t.IsCompleted);
//    }
//    if (tasks.All(t => t.IsCompleted))
//    {
//        Console.WriteLine("Done.");
//        break;
//    }
//}

Console.WriteLine($"Iron ore mined: {metrics.IronOreProduced:n0}");
Console.WriteLine($"Iron plates smelted: {metrics.IronPlateProduced:n0}");
Console.WriteLine($"Elapsed: {watch.ElapsedMilliseconds}ms");
//Console.WriteLine("Press any key to continue.");
//Console.ReadKey();
/// <summary>
/// Data-less marker value for one unit of iron ore travelling through the channels.
/// </summary>
public readonly record struct IronOre
{
}
/// <summary>
/// Data-less marker value for one iron plate travelling through the channels.
/// </summary>
public readonly record struct IronPlate
{
}
/// <summary>
/// Running totals of what has been produced so far.
/// </summary>
/// <remarks>
/// The counters are public fields rather than properties so they can be passed
/// by reference to <c>Interlocked.Increment</c>.
/// </remarks>
public class ProductionMetrics
{
    /// <summary>Number of iron ore items produced; starts at zero.</summary>
    public int IronOreProduced;

    /// <summary>Number of iron plates produced; starts at zero.</summary>
    public int IronPlateProduced;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment