Skip to content

Instantly share code, notes, and snippets.

@CarlosLanderas
Last active January 8, 2020 07:50
Show Gist options
  • Save CarlosLanderas/14898c02b3828431bdf2b3d919ca6630 to your computer and use it in GitHub Desktop.
Save CarlosLanderas/14898c02b3828431bdf2b3d919ca6630 to your computer and use it in GitHub Desktop.
AspNetCore Response Streaming
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Http
{
internal static class TaskHelper
{
internal static Task WaitAsync(this CancellationToken cancellationToken)
{
TaskCompletionSource<bool> cancelationTaskCompletionSource = new TaskCompletionSource<bool>();
cancellationToken.Register(CancellationTokenCallback, cancelationTaskCompletionSource);
return cancellationToken.IsCancellationRequested ? Task.CompletedTask : cancelationTaskCompletionSource.Task;
}
private static void CancellationTokenCallback(object taskCompletionSource)
{
((TaskCompletionSource<bool>)taskCompletionSource).SetResult(true);
}
}
class Program
{
static async Task Main(string[] args)
{
var queue = new Queue<string>();
Task.Run(async () =>
{
while (true)
{
await Task.Delay(1000);
queue.Enqueue($"New event: {Guid.NewGuid().ToString()}");
}
});
using var host = new HostBuilder().ConfigureWebHostDefaults(config =>
{
config.UseKestrel(config =>
{
config.Listen(IPAddress.Loopback, 8080);
})
.ConfigureServices(config =>
{
config.AddLogging(setup => setup.AddConsole());
})
.Configure(app =>
{
app.Map("/stream", builder =>
{
builder.Run(async context =>
{
var body = context.Features.Get<IHttpResponseBodyFeature>();
body.DisableBuffering();
Task.Run(async () =>
{
while (!context.RequestAborted.IsCancellationRequested)
{
if (queue.Count > 0)
{
var item = $"{queue.Dequeue()}/n";
await body.Writer.WriteAsync(Encoding.UTF8.GetBytes($"{item}\n"));
await body.Writer.FlushAsync();
}
}
});
await context.RequestAborted.WaitAsync();
Console.WriteLine("Connection Aborted by client");
});
});
});
}).Build();
await host.StartAsync();
using (var httpClient = new HttpClient())
{
var resp = await httpClient.GetStreamAsync("http://localhost:8080/stream");
using var reader = new StreamReader(resp);
Task.Run(async () =>
{
while (true)
{
var line = await reader.ReadLineAsync();
Console.WriteLine(line);
await Task.Delay(400);
}
});
var reset = new ManualResetEventSlim();
Console.CancelKeyPress += (s, e) =>
{
resp.Close();
reset.Set();
};
reset.Wait();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment