Last active
April 30, 2021 08:05
-
-
Save amoerie/65bf3b077a696b1d65cb5fb27cbb7206 to your computer and use it in GitHub Desktop.
PriorityChannelTest - A proof of concept for a thread-safe concurrent priority queue built on channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using FluentAssertions; | |
using Microsoft.Extensions.Logging; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Globalization; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
using Xunit; | |
using Xunit.Abstractions; | |
namespace Dobco.PACSONWEB3.LocalImageCache.Tests | |
{ | |
/// <summary>
/// Proof of concept for a priority queue built from several unbounded channels:
/// one channel per priority level, with consumers that always try the
/// highest-priority channel first.
/// </summary>
public class PriorityChannelTest
{
    private const int NumberOfProducers = 10;
    private const int NumberOfConsumers = 3;
    private const int NumberOfItemsPerProducer = 100;

    private readonly ITestOutputHelper _testOutputHelper;

    /// <summary>Creates the test class with the xUnit output sink used by the log helper.</summary>
    /// <param name="testOutputHelper">Receives the timestamped log lines written during the test.</param>
    public PriorityChannelTest(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    /// <summary>
    /// Runs several producers that write to random priority channels and several
    /// consumers that drain them in priority order, then asserts that every item was
    /// consumed and that the average wait time grows as the priority gets lower.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the wait time assertions depend on real delays and scheduling,
    /// so they are presumably sensitive to machine load - confirm before relying on
    /// this test in CI.
    /// </remarks>
    [Fact]
    public async Task PriorityChannels()
    {
        // Arrange
        var channel1 = Channel.CreateUnbounded<PriorityItem>();
        var channel2 = Channel.CreateUnbounded<PriorityItem>();
        var channel3 = Channel.CreateUnbounded<PriorityItem>();
        var consumedItems = new ConcurrentBag<PriorityItem>();

        // Act
        var producerTasks = Enumerable.Range(0, NumberOfProducers)
            .Select(i => Task.Run(() => Producer($"Producer {i}")))
            .ToList();
        var consumerTasks = Enumerable.Range(0, NumberOfConsumers)
            .Select(i => Task.Run(() => Consumer($"Consumer {i}")))
            .ToList();

        try
        {
            await Task.WhenAll(producerTasks).ConfigureAwait(false);
        }
        finally
        {
            // Always complete the writers, even when a producer failed; otherwise the
            // consumers would keep waiting for items that will never arrive.
            channel1.Writer.TryComplete();
            channel2.Writer.TryComplete();
            channel3.Writer.TryComplete();
        }

        await Task.WhenAll(consumerTasks).ConfigureAwait(false);

        // Assert
        var consumedItemsList = consumedItems.ToList();
        consumedItemsList.Count.Should().Be(NumberOfProducers * NumberOfItemsPerProducer);

        var averageWaitTimePerPriority = consumedItemsList
            .GroupBy(i => i.Priority)
            .ToDictionary(
                group => group.Key,
                group => group.Average(item => (item.ConsumptionDateTime!.Value - item.ProductionDateTime).TotalMilliseconds));

        // Fail with a readable message instead of a KeyNotFoundException below
        averageWaitTimePerPriority.Should().ContainKeys(1, 2, 3);

        var channel1AverageWaitTime = averageWaitTimePerPriority[1];
        var channel2AverageWaitTime = averageWaitTimePerPriority[2];
        var channel3AverageWaitTime = averageWaitTimePerPriority[3];

        Log($"Channel 1 average wait time: {channel1AverageWaitTime}ms");
        Log($"Channel 2 average wait time: {channel2AverageWaitTime}ms");
        Log($"Channel 3 average wait time: {channel3AverageWaitTime}ms");

        channel1AverageWaitTime.Should().BeLessThan(channel2AverageWaitTime);
        channel2AverageWaitTime.Should().BeLessThan(channel3AverageWaitTime);

        // A producer pumps items into random channels
        async Task Producer(string producerName)
        {
            var writers = new[] { channel1.Writer, channel2.Writer, channel3.Writer };
            var random = new Random();

            for (var i = 0; i < NumberOfItemsPerProducer; i++)
            {
                // 1-9 ms random delay (the upper bound of Random.Next is exclusive)
                await Task.Delay(TimeSpan.FromMilliseconds(random.Next(1, 10))).ConfigureAwait(false);

                // High priority items go in channel 1, low priority items in channel 3
                var priority = random.Next(1, writers.Length + 1);
                var writer = writers[priority - 1];

                var priorityItem = new PriorityItem
                {
                    Label = $"Item {i}",
                    ProducerName = producerName,
                    // UTC so the measured wait time cannot be skewed by a DST or time zone change
                    ProductionDateTime = DateTime.UtcNow,
                    Priority = priority,
                    ConsumerName = null,
                    ConsumptionDateTime = null
                };

                if (!writer.TryWrite(priorityItem))
                {
                    throw new InvalidOperationException($"{producerName} could not write to channel {priority}");
                }
            }
        }

        // A consumer reads from all channels, trying to read from the highest priority channels first
        async Task Consumer(string consumerName)
        {
            var readers = new List<ChannelReader<PriorityItem>> { channel1.Reader, channel2.Reader, channel3.Reader };
            // One generator per consumer instead of one per consumed item
            var random = new Random();

            // A channel is removed from the list once it is completed and drained
            while (readers.Count > 0)
            {
                // Try to synchronously read from the channels in order of priority
                PriorityItem item = null;
                var didRead = false;
                foreach (var reader in readers)
                {
                    if (reader.TryRead(out item))
                    {
                        didRead = true;
                        break;
                    }
                }

                if (didRead)
                {
                    await OnConsume(consumerName, item, random).ConfigureAwait(false);
                    continue;
                }

                // Nothing available right now: wait until any channel has data or is completed
                var waits = new Task<bool>[readers.Count];
                for (var i = 0; i < readers.Count; i++)
                {
                    waits[i] = readers[i].WaitToReadAsync(CancellationToken.None).AsTask();
                }

                await Task.WhenAny(waits).ConfigureAwait(false);

                // WaitToReadAsync yields false when the channel is completed and empty, i.e. there
                // will never be any more items. Drop every such channel; iterate backwards so
                // removing an entry does not shift the indices still to be visited.
                for (var i = waits.Length - 1; i >= 0; i--)
                {
                    if (waits[i].IsCompleted && !await waits[i].ConfigureAwait(false))
                    {
                        readers.RemoveAt(i);
                    }
                }

                // Loop back to the priority-ordered scan rather than reading from whichever
                // channel woke us: a higher priority item may have arrived in the meantime.
            }
        }

        // Stamps the item with its consumer and consumption time, records it, then simulates work
        async Task OnConsume(string consumerName, PriorityItem item, Random random)
        {
            if (item == null)
            {
                throw new ArgumentNullException(nameof(item));
            }

            item.ConsumerName = consumerName;
            item.ConsumptionDateTime = DateTime.UtcNow;
            Log($"{consumerName} consumed item {item.Label} with priority {item.Priority}");
            consumedItems.Add(item);

            // 1-9 ms random delay (the upper bound of Random.Next is exclusive)
            await Task.Delay(TimeSpan.FromMilliseconds(random.Next(1, 10))).ConfigureAwait(false);
        }

        // Writes a message to the test output, prefixed with the local wall-clock time
        void Log(string message)
        {
            _testOutputHelper.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff", CultureInfo.InvariantCulture)} {message}");
        }
    }
}
/// <summary>
/// A unit of work that travels through one of the priority channels. The producer
/// fills in the production details; the consumer stamps the consumption details.
/// </summary>
public class PriorityItem
{
    /// <summary>Display label of the item.</summary>
    public string Label { get; set; }

    /// <summary>Priority level; the test uses 1 for the highest priority and 3 for the lowest.</summary>
    public int Priority { get; set; }

    /// <summary>Name of the producer that created the item.</summary>
    public string ProducerName { get; set; }

    /// <summary>Moment the producer created the item.</summary>
    public DateTime ProductionDateTime { get; set; }

    /// <summary>Name of the consumer that handled the item; null until it is consumed.</summary>
    public string ConsumerName { get; set; }

    /// <summary>Moment the item was consumed; null until it is consumed.</summary>
    public DateTime? ConsumptionDateTime { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment