Skip to content

Instantly share code, notes, and snippets.

@davidfowl
Last active April 12, 2018 19:16
Show Gist options
  • Save davidfowl/e8b68d5f4572c11263151c3efcf9c785 to your computer and use it in GitHub Desktop.
Save davidfowl/e8b68d5f4572c11263151c3efcf9c785 to your computer and use it in GitHub Desktop.
const int maxBuffer = 1024;
var nagle = new NagleAwaitable(TimeSpan.FromMilliseconds(1));
while (true)
{
var result = await application.Output.ReadAsync();
var buffer = result.Buffer;
// Check if we have enough to perform a write
if (buffer.Length >= maxBuffer)
{
await WriteAsync(buffer);
}
else
{
while (buffer.Length < maxBuffer)
{
var length = buffer.Length;
// If we're under, we want to start buffering, act like we consumed
// nothing but looked at everything so we can wait for more data
application.Output.AdvanceTo(buffer.Start, buffer.End);
// Dispatch to the thread pool to inject some latency (this *could* be another queue)
await nagle;
// TryRead should *always* return true since we know we have something to write
var hasData = application.Output.TryRead(out result);
buffer = result.Buffer;
Debug.Assert(hasData, "We should have data buffered");
if (buffer.Length == length)
{
// We didn't get more data so write what we have
break;
}
}
await WriteAsync(buffer);
}
application.Output.AdvanceTo(buffer.End);
}
public class NagleAwaitable : ICriticalNotifyCompletion
{
private readonly ConcurrentQueue<Action> _work = new ConcurrentQueue<Action>();
private readonly TimerAwaitable _timerAwaitable;
private bool _timerResult;
public NagleAwaitable(TimeSpan period)
{
_timerAwaitable = new TimerAwaitable(period, period);
_timerAwaitable.OnCompleted(FireCallbacks);
_timerAwaitable.Start();
}
public NagleAwaitable GetAwaiter() => this;
public bool GetResult()
{
return _timerResult;
}
public bool IsCompleted => _timerAwaitable.IsCompleted;
public void OnCompleted(Action continuation)
{
_work.Enqueue(continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
public void FireCallbacks()
{
_timerResult = _timerAwaitable.GetResult();
while (_work.TryDequeue(out var action))
{
Task.Run(action);
}
_timerAwaitable.OnCompleted(FireCallbacks);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment