Skip to content

Instantly share code, notes, and snippets.

@lothrop
Last active August 29, 2015 14:19
Show Gist options
  • Save lothrop/917d1454e9c0ccd293d3 to your computer and use it in GitHub Desktop.
Save lothrop/917d1454e9c0ccd293d3 to your computer and use it in GitHub Desktop.
Reading messages from the serial port using Reactive Extensions (Rx)
// Turns the serial port's DataReceived event into a flat stream of bytes.
// Every event reads the bytes that are available on the port and the resulting
// list is flattened into the observable sequence by SelectMany.
IObservable<byte> serialPortSource = Observable.FromEventPattern<
        SerialDataReceivedEventHandler,
        SerialDataReceivedEventArgs>
    (
        handler => _serialPort.DataReceived += handler,
        handler => _serialPort.DataReceived -= handler
    ).SelectMany(_ =>
    {
        var buffer = new byte[1024];
        var ret = new List<byte>();

        // Must be declared outside the loop body: a local declared inside the
        // do-block is not in scope in the while condition.
        int bytesRead;
        do
        {
            bytesRead = _serialPort.Read(buffer, 0, buffer.Length);
            ret.AddRange(buffer.Take(bytesRead));

            // Keep reading only if the buffer was filled completely AND the port
            // still has data; otherwise another Read could block waiting for bytes
            // that will be reported by a later DataReceived event anyway.
        } while (bytesRead >= buffer.Length && _serialPort.BytesToRead > 0);

        return ret;
    });
/// <summary>
/// Groups a stream of bytes into messages, using <c>_messageDelimiter</c> as the
/// separator. The delimiter itself is not part of the emitted message.
/// </summary>
/// <param name="input">The raw byte stream to split.</param>
/// <returns>
/// An observable that emits one byte sequence per delimiter seen in
/// <paramref name="input"/>. Errors and completion of <paramref name="input"/> are
/// forwarded; bytes received after the last delimiter are not emitted.
/// </returns>
private IObservable<IEnumerable<byte>> ToMessage(IObservable<byte> input)
{
    return Observable.Create<IEnumerable<byte>>(observer =>
    {
        // One buffer per subscription, so independent subscribers do not interfere.
        var buffer = new List<byte>();
        return input.Subscribe(
            b =>
            {
                if (b == _messageDelimiter)
                {
                    // Emit a snapshot: the working buffer is cleared right below, which
                    // would otherwise empty the message a consumer is still holding.
                    observer.OnNext(buffer.ToArray());
                    buffer.Clear();
                }
                else
                {
                    buffer.Add(b);
                }
            },
            // Forward termination so downstream operators see errors and completion
            // instead of an unhandled exception or a sequence that never ends.
            observer.OnError,
            observer.OnCompleted);
    });
}
/// <summary>
/// Mutable accumulator state: the bytes collected so far plus a flag telling
/// whether they currently form a finished message.
/// </summary>
private class FillingCollection
{
    /// <summary>True once the bytes in <see cref="Message"/> form a complete message.</summary>
    public bool Complete { get; set; }

    /// <summary>The bytes gathered for the message being assembled.</summary>
    public List<byte> Message { get; set; }
}
/// <summary>
/// Groups a stream of bytes into messages, using <c>_messageDelimiter</c> as the
/// separator. The delimiter itself is not part of the emitted message.
/// </summary>
/// <param name="input">The raw byte stream to split.</param>
/// <returns>
/// An observable that emits one byte sequence per delimiter seen in
/// <paramref name="input"/>. Bytes received after the last delimiter are not emitted.
/// </returns>
private IObservable<IEnumerable<byte>> ToMessage(IObservable<byte> input)
{
    // Defer creates a fresh accumulator for every subscription; without it the
    // single seed object would be shared (and mutated) by all subscribers.
    return Observable.Defer<IEnumerable<byte>>(() =>
        input.Scan(new FillingCollection { Message = new List<byte>() }, (buffer, newByte) =>
        {
            // The previous byte finished a message: start collecting the next one.
            if (buffer.Complete)
            {
                buffer.Message.Clear();
                buffer.Complete = false;
            }
            if (newByte == _messageDelimiter)
            {
                buffer.Complete = true;
            }
            else
            {
                buffer.Message.Add(newByte);
            }
            return buffer;
        })
        .Where(fc => fc.Complete)
        // Emit a snapshot: the accumulator's list is cleared when the next byte
        // arrives, which would corrupt messages held by Replay or other consumers.
        .Select(fc => (IEnumerable<byte>)fc.Message.ToArray()));
}
// Publish() wraps the message stream in a connectable observable, so all subscribers
// share one underlying subscription to the serial-port source.
IConnectableObservable<IEnumerable<byte>> messageSourceConnectable = ToMessage(serialPortSource).Publish();
// Same object typed as a plain IObservable; presumably what consumers subscribe to,
// so they cannot call Connect() themselves.
IObservable<IEnumerable<byte>> messageSource = messageSourceConnectable;
// Connect() subscribes the published stream to its source; disposing the returned
// handle disconnects it again.
IDisposable serialPortSubscription = messageSourceConnectable.Connect();
// Replay() records the messages seen while connected, so a response that arrives
// before the subscription below is established is still delivered to it.
IConnectableObservable<DeserializedMessage> connectableObservable = DeserializedMessageSource.Replay();
using (connectableObservable.Connect())
{
    // Send only after connecting, so the recording is already running.
    var messageId = await SendMessageAsync(command).ConfigureAwait(false);

    // Only responses that correlate with the message just sent are of interest.
    var matchingResponses = connectableObservable.Where(
        message => message.Type == MessageType.Response && message.CorrelatingMessageID == messageId);

    // Take the first match, or fail once the timeout has elapsed.
    var firstMatch = matchingResponses
        .Timeout(TimeSpan.FromSeconds(_timeoutSeconds))
        .FirstAsync();

    response = await firstMatch;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment