Last active
September 15, 2022 18:39
-
-
Save tebeco/cb38900b18e5e0920d4f37fbdd8c23a5 to your computer and use it in GitHub Desktop.
UDP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk"> | |
<PropertyGroup> | |
<OutputType>Exe</OutputType> | |
<TargetFramework>net7.0</TargetFramework> | |
<ImplicitUsings>enable</ImplicitUsings> | |
<Nullable>enable</Nullable> | |
</PropertyGroup> | |
<ItemGroup> | |
<PackageReference Include="System.IO.Pipelines" Version="7.0.0-*" /> | |
</ItemGroup> | |
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using System.IO.Pipelines; | |
using System.Net; | |
using System.Net.Sockets; | |
using System.Text; | |
// This constructor arbitrarily assigns the local port number. | |
var udpClient = new UdpClient(); | |
udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); | |
var anyEndpoint = new IPEndPoint(IPAddress.Any, 11000); | |
udpClient.Client.Bind(anyEndpoint); | |
var pipe = new Pipe(); | |
Task writing = FillPipeAsync(udpClient, anyEndpoint, pipe.Writer); | |
Task reading = ReadPipeAsync(pipe.Reader); | |
await Task.WhenAll(reading, writing); | |
async Task FillPipeAsync(UdpClient udpClient, EndPoint endpoint, PipeWriter writer) | |
{ | |
const int minimumBufferSize = 512; | |
while (true) | |
{ | |
// Allocate at least 512 bytes from the PipeWriter. | |
var memory = writer.GetMemory(minimumBufferSize); | |
var receiveResult = await udpClient.Client.ReceiveMessageFromAsync(memory, endpoint); | |
var bytesRead = receiveResult.ReceivedBytes; | |
if (bytesRead == 0) | |
{ | |
break; | |
} | |
// Tell the PipeWriter how much was read from the Socket. | |
writer.Advance(bytesRead); | |
// Make the data available to the PipeReader. | |
FlushResult result = await writer.FlushAsync(); | |
if (result.IsCompleted) | |
{ | |
break; | |
} | |
} | |
// By completing PipeWriter, tell the PipeReader that there's no more data coming. | |
await writer.CompleteAsync(); | |
} | |
async Task ReadPipeAsync(PipeReader reader) | |
{ | |
while (true) | |
{ | |
var result = await reader.ReadAsync(); | |
var buffer = result.Buffer; | |
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)) | |
{ | |
// Process the line. | |
ProcessLine(line); | |
} | |
// Tell the PipeReader how much of the buffer has been consumed. | |
reader.AdvanceTo(buffer.Start, buffer.End); | |
// Stop reading if there's no more data coming. | |
if (result.IsCompleted) | |
{ | |
break; | |
} | |
} | |
// Mark the PipeReader as complete. | |
await reader.CompleteAsync(); | |
} | |
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) | |
{ | |
// Look for a EOL in the buffer. | |
SequencePosition? position = buffer.PositionOf((byte)'\n'); | |
if (position == null) | |
{ | |
line = default; | |
return false; | |
} | |
// Skip the line + the \n. | |
line = buffer.Slice(0, position.Value); | |
buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); | |
return true; | |
} | |
void ProcessLine(ReadOnlySequence<byte> payload) | |
{ | |
var line = Encoding.UTF8.GetString(payload); | |
Console.WriteLine($"[RECEIVED] {line}"); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers.Text; | |
using System.IO.Pipelines; | |
using System.Net.Sockets; | |
using System.Text; | |
const string messageHeader = "frame "; | |
const string messageSeparator = "\n"; | |
int messageHeaderLength = messageHeader.Length * sizeof(char); | |
int messageLength = messageHeaderLength + int.MaxValue.ToString().Length + (sizeof(char) * messageSeparator.Length); | |
// This constructor arbitrarily assigns the local port number. | |
var udpClient = new UdpClient(); | |
udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); | |
udpClient.Connect("localhost", 11000); | |
var pipe = new Pipe(); | |
Task writing = FillPipeAsync(udpClient, pipe.Writer); | |
Task reading = ReadPipeAsync(pipe.Reader); | |
await Task.WhenAll(reading, writing); | |
async Task FillPipeAsync(UdpClient udpClient, PipeWriter writer) | |
{ | |
var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); | |
var i = 0; | |
while (await timer.WaitForNextTickAsync()) | |
{ | |
++i; | |
var memory = writer.GetMemory(messageLength); | |
Encoding.UTF8.GetBytes(messageHeader, memory.Span); | |
Utf8Formatter.TryFormat(i, memory.Span.Slice(messageHeaderLength), out var charsWritten); | |
Encoding.UTF8.GetBytes(messageSeparator, memory.Span.Slice(messageHeaderLength + charsWritten)); | |
var writtenInPipe = messageHeaderLength + charsWritten + sizeof(char); | |
Console.WriteLine($"[SENT TO PIPE WRITER] {writtenInPipe} bytes"); | |
writer.Advance(writtenInPipe); | |
var result = await writer.FlushAsync(); | |
if (result.IsCompleted) | |
{ | |
break; | |
} | |
} | |
udpClient.Close(); | |
} | |
async Task ReadPipeAsync(PipeReader reader) | |
{ | |
while (true) | |
{ | |
var result = await reader.ReadAsync(); | |
foreach (var buffer in result.Buffer) | |
{ | |
Console.WriteLine($"[RECEIVED FROM PIPE READER] {buffer.Length} bytes"); | |
var remainingToSend = buffer; | |
do | |
{ | |
var written = await udpClient.Client.SendAsync(remainingToSend); | |
remainingToSend = remainingToSend.Slice(written); | |
} while (!remainingToSend.IsEmpty); | |
} | |
reader.AdvanceTo(result.Buffer.End); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment