Skip to content

Instantly share code, notes, and snippets.

@aspett
Created May 9, 2021 23:53
Show Gist options
  • Save aspett/c18fff08fd58a62f28448864c027666a to your computer and use it in GitHub Desktop.
Save aspett/c18fff08fd58a62f28448864c027666a to your computer and use it in GitHub Desktop.
defmodule Producer do
  @moduledoc """
  GenStage producer, usable as a `Broadway.Producer`, that serves demand by
  draining `BlockingBuffer`.

  Demand that cannot be met immediately is remembered in the process state and
  retried on a timer, because GenStage only invokes `handle_demand/2` when a
  consumer asks for more; nothing would otherwise wake the producer up once the
  buffer has refilled.

  State is a map with two keys:

    * `:demand` - outstanding demand that has not been satisfied yet
    * `:timer`  - reference of the pending `:kick_start` timer, or `nil`

  Only one retry timer is kept in flight. Arming a new timer on every
  unsatisfied `handle_demand/2` call would start an independent polling chain
  per consumer request, all of them draining the same buffer redundantly.
  """

  use GenStage
  @behaviour Broadway.Producer

  # Delay, in milliseconds, before retrying a drain that came up short.
  @kick_start_interval_ms 1_000

  @impl true
  def init(_opts) do
    {:producer, %{demand: 0, timer: nil}}
  end

  # Adds the newly requested demand to whatever is still outstanding and tries
  # to satisfy all of it from the buffer.
  @impl true
  def handle_demand(demand, %{demand: stored_demand} = state) do
    dispatch(%{state | demand: stored_demand + demand})
  end

  # Retry tick: the timer that delivered this message has fired, so forget its
  # reference before attempting another drain (which may arm a fresh one).
  @impl true
  def handle_info(:kick_start, state) do
    dispatch(%{state | timer: nil})
  end

  # Drains the buffer for the outstanding demand, emits what was obtained and
  # stores the remainder.
  # NOTE(review): assumes BlockingBuffer.drain/1 returns a list of at most the
  # requested number of items; confirm against its implementation.
  defp dispatch(%{demand: total_demand} = state) do
    messages =
      total_demand
      |> BlockingBuffer.drain()
      |> Enum.map(&to_message/1)

    remaining = total_demand - length(messages)

    {:noreply, messages, schedule_kick_start(%{state | demand: remaining})}
  end

  # Wraps a buffered item (a Kafka message, per the original author's note) in
  # a Broadway message whose acknowledger does nothing.
  defp to_message(data) do
    %Broadway.Message{data: data, acknowledger: {Broadway.NoopAcknowledger, nil, nil}}
  end

  # If we've run out of messages we need to kickstart the pipeline again, so
  # try draining again soon, but only when no retry is already pending.
  defp schedule_kick_start(%{demand: demand, timer: nil} = state) when demand > 0 do
    %{state | timer: Process.send_after(self(), :kick_start, @kick_start_interval_ms)}
  end

  defp schedule_kick_start(state), do: state
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment