Skip to content

Instantly share code, notes, and snippets.

Last active February 19, 2021 08:35
Show Gist options
  • Save jdaigle/cc1449a99d4e6448672d to your computer and use it in GitHub Desktop.
Save jdaigle/cc1449a99d4e6448672d to your computer and use it in GitHub Desktop.
public QueueEntry Enqueue(object item)
var entry = new QueueEntry(item);
while (true)
var prevTail = this.tail;
var prevTailNext = prevTail.Next;
if (prevTail == this.tail)
if (prevTailNext == null)
entry.SeqNum = prevTail.SeqNum + 1;
if (prevTail.TrySetNext(entry, null)) // compare and swap, returns if successful
// it shouldn't be possible for two threads to get here at the same time
Interlocked.CompareExchange(ref this.tail, entry, this.tail); // compare and swap
return entry;
// the tail has been updated before we had a chance. so compare and swap
// if it fails... that's okay
Interlocked.CompareExchange(ref this.tail, prevTailNext, prevTail);
private readonly AutoResetEvent messageDeliveryPumpSignal = new AutoResetEvent(false);
private readonly RegisteredWaitHandle messageDeliveryPumpSignalWaitHandle;
private volatile int messageDeliveryPumpLoopIsRunning = 0; // 0 = false, 1 = true
public ConcurrentQueue()
= ThreadPool.RegisterWaitForSingleObject(messageDeliveryPumpSignal
, (state, timedOut) => MessageDeliveryPumpLoop()
, null, -1, false);
messageDeliveryPumpSignal.Set(); // signal to start pump right away
public void Enqueue()
private void MessageDeliveryPumpLoop()
if (Interlocked.CompareExchange(ref messageDeliveryPumpLoopIsRunning, 1, 0) != 0)
return; // if the pump is already running, so we don't want to execute again
trace.Debug("MessageDeliveryPumpLoop Started.");
while (true)
var head = queueList.Head;
var next = head.GetNextValidEntry();
if (next == null)
return; // no more work to do
lock (consumers) // lock to prevent adding new consumers
if (consumers.IsEmpty)
// Loop over consumers and attempt delivery
var consumersToIterate = new List<Consumer>(consumers);
foreach (var consumer in consumersToIterate)
Interlocked.Exchange(ref messageDeliveryPumpLoopIsRunning, 0); // indicate that the pump has stopped
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment