Created
September 22, 2023 03:41
-
-
Save brnls/0e06787bcc759e9529b338611e3b9ed7 to your computer and use it in GitHub Desktop.
Concurrent Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Channels; | |
using Confluent.Kafka; | |
using Microsoft.Extensions.Logging; | |
namespace Worker1; | |
/// <summary>
/// Callback invoked for each message consumed from a topic partition.
/// </summary>
/// <param name="result">The consumed Kafka message.</param>
/// <param name="acknowledge">
/// Call once the message has been handled. It stores the message offset on the consumer and removes
/// the message from the partition's in-memory queue; a message that is not acknowledged stays at the
/// head of the queue and is handed to the handler again.
/// </param>
/// <param name="cancellationToken">
/// Signalled when the partition is being stopped and the grace period granted to in-flight work has elapsed.
/// </param>
public delegate Task MessageHandler(ConsumeResult<string, byte[]> result, Action acknowledge, CancellationToken cancellationToken);
/// <summary>
/// Consumes the configured topics with a single Kafka consumer and hands each message to a
/// per-partition <see cref="TopicPartitionConsumer"/>, so that partitions are processed concurrently
/// while messages within a partition stay ordered.
/// </summary>
/// <remarks>
/// Every assigned partition gets a bounded queue of 100 messages. When a queue is full the partition
/// is paused and rewound to the rejected message; a periodic task requests a resume once the queue
/// has drained. Offsets are stored only when the message handler acknowledges a message.
/// </remarks>
public class ConcurrentKafkaReceiver : IAsyncDisposable
{
    private readonly IConsumer<string, byte[]> _consumer;
    private readonly IEnumerable<string> _topics;
    private readonly ILogger<ConcurrentKafkaReceiver> _logger;
    private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();
    private readonly CancellationToken _disposeCt;

    // Mutated by the rebalance handlers and enumerated by the unpause timer task, which run on
    // different threads. A plain Dictionary could throw "collection was modified" in the timer task
    // and silently stop all future resumes, hence the concurrent collection.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<TopicPartition, TopicPartitionConsumer> _topicPartitionConsumers = new();

    private MessageHandler? _messageHandler;
    private Task? _consumeTask;
    private Task? _unpauseTask;

    // Resume requests produced by the timer task and applied by the consume loop, so that every
    // call on the consumer is made from the consume loop.
    private readonly Channel<TopicPartition> _unpauseChannel = Channel.CreateUnbounded<TopicPartition>();

    /// <summary>
    /// Builds the underlying consumer and wires up the rebalance and commit callbacks.
    /// No subscription is made until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="config">Configuration for the underlying Kafka consumer.</param>
    /// <param name="topics">Topics to subscribe to when the receiver is started.</param>
    /// <param name="loggerFactory">Factory used to create the receiver and per-partition loggers.</param>
    public ConcurrentKafkaReceiver(
        ConsumerConfig config,
        IEnumerable<string> topics,
        ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<ConcurrentKafkaReceiver>();
        _disposeCt = _disposeCts.Token;
        _consumer = new ConsumerBuilder<string, byte[]>(config)
            .SetPartitionsAssignedHandler((c, topicPartitions) =>
            {
                foreach (var topicPartition in topicPartitions)
                {
                    _logger.LogInformation($"Assigned {topicPartition}");
                    _topicPartitionConsumers[topicPartition] = new TopicPartitionConsumer(
                        topicPartition,
                        Channel.CreateBounded<ConsumeResult<string, byte[]>>(
                            new BoundedChannelOptions(100)
                            {
                                SingleReader = true,
                                SingleWriter = true,
                                AllowSynchronousContinuations = false,
                            }),
                        _disposeCt,
                        loggerFactory.CreateLogger<TopicPartitionConsumer>(),
                        // Start() sets the handler before subscribing, so it is non-null by the time partitions are assigned.
                        _messageHandler!,
                        consumeResult =>
                        {
                            c.StoreOffset(consumeResult);
                            _logger.LogInformation($"stored {consumeResult.TopicPartitionOffset}");
                        });
                }
            })
            .SetPartitionsRevokedHandler((c, topicPartitions) =>
            {
                var revokedConsumers = new List<TopicPartitionConsumer>();
                foreach (var topicPartition in topicPartitions)
                {
                    _logger.LogInformation($"Revoked {topicPartition}");
                    if (_topicPartitionConsumers.TryGetValue(topicPartition.TopicPartition, out var revokedConsumer))
                    {
                        _logger.LogInformation($"Queued messages {revokedConsumer.Messages}");
                        revokedConsumers.Add(revokedConsumer);
                    }
                }

                // Give currently in flight messages 10 seconds to stop processing before cancelling
                var stopProcessingTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
                try
                {
                    var stoppedProcessing = Task.WhenAll(revokedConsumers.Select(
                            x => x.WaitForStop(stopProcessingTokenSource.Token)))
                        .Wait(TimeSpan.FromSeconds(10));
                    if (!stoppedProcessing)
                    {
                        _logger.LogWarning("Timeout occurred while stopping one or more more topic partition consumers");

                        // The wait and the token share the same 10 second budget; cancel explicitly so the
                        // in-flight handlers are always signalled before the source is disposed below.
                        stopProcessingTokenSource.Cancel();
                    }
                }
                catch (AggregateException ex)
                {
                    // A faulted partition consumer must not prevent the revocation from completing.
                    _logger.LogError(ex, "Error while stopping one or more topic partition consumers");
                }
                finally
                {
                    stopProcessingTokenSource.Dispose();
                    foreach (var topicPartition in topicPartitions)
                    {
                        _topicPartitionConsumers.TryRemove(topicPartition.TopicPartition, out _);
                    }
                }

                _logger.LogInformation("Revoke partitions completed");
            })
            .SetOffsetsCommittedHandler((c, off) =>
            {
                foreach (var com in off.Offsets)
                {
                    _logger.LogInformation($"Committing offset: {com}");
                }
            })
            .Build();
        _topics = topics;
    }

    /// <summary>
    /// Subscribes to the configured topics and starts the background consume loop and the
    /// periodic task that requests resumption of paused partitions.
    /// </summary>
    /// <param name="messageHandler">Handler invoked for every consumed message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The receiver has already been started.</exception>
    public void Start(MessageHandler messageHandler)
    {
        ArgumentNullException.ThrowIfNull(messageHandler);
        if (_consumeTask is not null)
        {
            // A second consume loop would drive the same consumer from two threads.
            throw new InvalidOperationException("The receiver has already been started.");
        }

        _messageHandler = messageHandler;

        _unpauseTask = Task.Run(async () =>
        {
            using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
            try
            {
                while (await timer.WaitForNextTickAsync(_disposeCt))
                {
                    foreach (var topicPartitionConsumer in _topicPartitionConsumers.Values)
                    {
                        // Only resume once the backlog that caused the pause has been fully processed.
                        if (topicPartitionConsumer.Messages == 0 && topicPartitionConsumer.Paused)
                        {
                            await _unpauseChannel.Writer.WriteAsync(topicPartitionConsumer.TopicPartition);
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the receiver is disposed.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unpause timer stopped unexpectedly; paused partitions will not be resumed");
            }
        });

        _consumeTask = Task.Run(async () =>
        {
            try
            {
                _logger.LogInformation($"Subscribing to {string.Join(", ", _topics)}");
                _consumer.Subscribe(_topics);
                while (!_disposeCt.IsCancellationRequested)
                {
                    while (_unpauseChannel.Reader.TryRead(out var topicPartition))
                    {
                        // The partition may have been revoked after the resume was requested.
                        if (!_topicPartitionConsumers.TryGetValue(topicPartition, out var pausedConsumer))
                        {
                            continue;
                        }

                        _logger.LogInformation($"Resuming {topicPartition}");
                        _consumer.Resume(new[] { topicPartition });

                        // Clear the flag so the timer does not request the same resume again on every tick.
                        pausedConsumer.Paused = false;
                    }

                    try
                    {
                        var consumeResult = _consumer.Consume(100);
                        _disposeCt.ThrowIfCancellationRequested();
                        if (consumeResult is not null)
                        {
                            var topicPartitionConsumer = _topicPartitionConsumers[consumeResult.TopicPartition];
                            bool posted = topicPartitionConsumer.TryPostMessage(consumeResult);
                            if (!posted)
                            {
                                // Queue is full: stop fetching this partition and rewind so the rejected
                                // message is delivered again after the partition is resumed.
                                _logger.LogInformation($"Pausing {consumeResult.TopicPartition}");
                                _consumer.Pause(new[] { consumeResult.TopicPartition });
                                _consumer.Seek(consumeResult.TopicPartitionOffset);
                                topicPartitionConsumer.Paused = true;
                            }
                        }
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "Error while consuming");

                        // Back off before retrying, but do not hold up disposal for the full delay.
                        await Task.Delay(TimeSpan.FromSeconds(5), _disposeCt);
                    }
                }
            }
            catch (OperationCanceledException) when (_disposeCt.IsCancellationRequested)
            {
                // Expected when the receiver is disposed.
            }
            catch (Exception ex)
            {
                // Log at the point of failure; otherwise the fault would stay hidden until disposal.
                _logger.LogCritical(ex, "Consume loop terminated unexpectedly");
                throw;
            }
        });
    }

    /// <summary>
    /// Stops the background tasks, then closes and disposes the underlying consumer.
    /// Calls after the first one return immediately.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposeCt.IsCancellationRequested) return;
        _disposeCts.Cancel();

        try
        {
            await (_consumeTask ?? Task.CompletedTask);
        }
        catch (Exception)
        {
            // Already logged by the consume loop; disposal must still close the consumer.
        }

        // The unpause task handles its own exceptions, so awaiting it does not throw.
        await (_unpauseTask ?? Task.CompletedTask);

        try
        {
            _consumer.Close();
        }
        finally
        {
            _consumer.Dispose();

            // Disposed last: the background tasks and partition consumers observe its token until here.
            _disposeCts.Dispose();
        }

        _logger.LogInformation("consumer closed and disposed");
    }
}
/// <summary>
/// Processes the messages of a single topic partition sequentially on a background task,
/// reading them from a channel that the owner fills through <see cref="TryPostMessage"/>.
/// </summary>
public class TopicPartitionConsumer
{
    /// <summary>The partition this instance processes.</summary>
    public TopicPartition TopicPartition { get; }

    private readonly Channel<ConsumeResult<string, byte[]>> _channel;
    private readonly ILogger<TopicPartitionConsumer> _logger;
    private readonly MessageHandler _messageHandler;
    private readonly Action<ConsumeResult<string, byte[]>> _storeOffset;
    private readonly Task _processTask;

    // Cancelled to stop picking up new messages; linked to the owner's stopping token.
    private readonly CancellationTokenSource _gracefulShutdownCts;

    // Cancelled to interrupt the message that is currently being handled.
    private readonly CancellationTokenSource _ungraceulShutdownCts;

    // Volatile because the flag is written and read by different threads.
    private volatile bool _paused;

    /// <summary>Number of messages currently queued, including the one being handled.</summary>
    public int Messages => _channel.Reader.Count;

    /// <summary>Flag maintained by the owner to record that consumption of this partition is paused.</summary>
    public bool Paused
    {
        get => _paused;
        set => _paused = value;
    }

    /// <summary>
    /// Creates the consumer and immediately starts its background processing task.
    /// </summary>
    /// <param name="topicPartition">The partition being processed.</param>
    /// <param name="channel">Queue of consumed messages waiting to be handled.</param>
    /// <param name="stoppingToken">Token that, when cancelled, stops the pick-up of new messages.</param>
    /// <param name="logger">Logger for processing diagnostics.</param>
    /// <param name="messageHandler">Handler invoked for each message.</param>
    /// <param name="storeOffset">Callback that stores the offset of an acknowledged message.</param>
    public TopicPartitionConsumer(
        TopicPartition topicPartition,
        Channel<ConsumeResult<string, byte[]>> channel,
        CancellationToken stoppingToken,
        ILogger<TopicPartitionConsumer> logger,
        MessageHandler messageHandler,
        Action<ConsumeResult<string, byte[]>> storeOffset)
    {
        TopicPartition = topicPartition;
        _channel = channel;
        _logger = logger;
        _messageHandler = messageHandler;
        _storeOffset = storeOffset;

        // Both token sources must exist before the processing task starts, because the task
        // reads them as soon as it runs.
        _gracefulShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        _ungraceulShutdownCts = new CancellationTokenSource();
        _processTask = Task.Run(ProcessPartition);
    }

    /// <summary>Offset of the most recently acknowledged message, or null if none has been acknowledged.</summary>
    public TopicPartitionOffset? LatestOffset { get; private set; }

    /// <summary>
    /// Attempts to queue a message for processing without waiting.
    /// </summary>
    /// <param name="result">The consumed message.</param>
    /// <returns><c>true</c> if the message was queued; <c>false</c> if the channel did not accept it.</returns>
    public bool TryPostMessage(ConsumeResult<string, byte[]> result)
    {
        return _channel.Writer.TryWrite(result);
    }

    /// <summary>
    /// Stop processing the partition, waiting for the currently processing message to complete.
    /// </summary>
    /// <param name="token">
    /// When cancelled, the token passed to the in-flight message handler is cancelled as well.
    /// </param>
    public async Task WaitForStop(CancellationToken token)
    {
        // Dispose the registration when done so the caller's token source does not keep this instance alive.
        using var ungracefulRegistration = token.Register(() => _ungraceulShutdownCts.Cancel());
        _gracefulShutdownCts.Cancel();
        try
        {
            await _processTask;
        }
        catch (OperationCanceledException e) when (e.CancellationToken == _gracefulShutdownCts.Token)
        {
            // Expected: the processing loop was waiting for the next message when the stop was requested.
        }
    }

    /// <summary>
    /// Processing loop: waits for a message, passes it to the handler, and removes it from the queue
    /// only when the handler acknowledges it.
    /// </summary>
    private async Task ProcessPartition()
    {
        // WaitToReadAsync returns false only when the channel has been completed; leave the loop in
        // that case instead of spinning.
        while (await _channel.Reader.WaitToReadAsync(_gracefulShutdownCts.Token))
        {
            // Peek rather than read so the message keeps occupying its slot until it is acknowledged.
            if (!_channel.Reader.TryPeek(out var consumeResult)) continue;

            _logger.LogInformation($"{TopicPartition} - Handling message offset {consumeResult.Offset}");
            var acknowledged = 0;
            try
            {
                await _messageHandler!(consumeResult, () =>
                    {
                        // A repeated acknowledge must not dequeue the next, still unprocessed, message.
                        if (Interlocked.Exchange(ref acknowledged, 1) != 0) return;

                        LatestOffset = consumeResult.TopicPartitionOffset;
                        _storeOffset(consumeResult);
                        _channel.Reader.TryRead(out _);
                    },
                    _ungraceulShutdownCts.Token);
            }
            catch (Exception ex)
            {
                // NOTE(review): an unacknowledged message stays at the head of the queue and is handed
                // to the handler again immediately; confirm that retrying without a back-off is intended.
                _logger.LogError(ex, $"{TopicPartition} Uncaught exception while handling message.");
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment