defmodule StageB do
  @moduledoc """
  An updated version of Stage (B) from the post at

  Hope this example helps, but understand that I wrote it on the fly and it may contain bugs.
  I tried to comment thoroughly, so you should have no trouble implementing it on your own.
  """

  # Newer GenStage releases dropped the `Experimental` namespace; on those, remove this alias.
  alias Experimental.GenStage
  use GenStage

  # This struct represents our internal state for this stage. For now we just care about our work buffer.
  # You could easily add counts of how many events were received, how many were finished and sent
  # to the next stage, etc.
  defstruct buffer: []

  # Public API

  # Start our GenStage and set the initial state.
  # Note that this uses a named process. If you go with a pid-based approach, be sure to change
  # the other public `GenStage.cast(__MODULE__, ...)` calls below to take the pid instead.
  def start_link do
    GenStage.start_link(__MODULE__, %__MODULE__{}, name: __MODULE__)
  end

  @doc """
  This can be called from an external process that is performing the long-running work asynchronously.
  `StageB.work_finished(%{some_result: :foo, other_info: [1, 2, 3]})`
  """
  def work_finished(result) do
    # We use a cast here since there is no good reason to send a response
    # back to a pool-based process that could be dead or doing new work by now.
    GenStage.cast(__MODULE__, {:work_finished, result})
  end
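
  @doc """
  Force the buffer to be processed without delay.
  """
  def process_buffer do
    # The message must match the `handle_cast(:process_pending_work, ...)` clauses below.
    GenStage.cast(__MODULE__, :process_pending_work)
  end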
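
  @doc """
  Tell the VM to schedule the buffer to be processed after `delay` milliseconds.

  This sends the message to `self()`, so call it from inside the stage process.
  """
  def process_buffer(delay) when is_integer(delay) and delay >= 0 do
    # A cast arrives at the process as `{:"$gen_cast", message}`, so sending that
    # tuple after a delay behaves like a delayed cast.
    Process.send_after(self(), {:"$gen_cast", :process_pending_work}, delay)
  end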

  # Private API

  def init(state) do
    {:producer_consumer, state}
  end

  # Callback to handle processing of items in the buffer.
  def handle_cast(:process_pending_work, %__MODULE__{buffer: []} = state), do: {:noreply, [], state}

  def handle_cast(:process_pending_work, %__MODULE__{buffer: buffer} = state) do
    # Assuming we have work in the buffer, let's attempt to move it to the worker pool.
    # Depending on your implementation, you may want to limit how many items you actually begin processing.
    # The code below tries to handle this in `queue_pending_work/1`; anything that cannot be queued
    # now remains in the buffer until it can be.
    remaining_work = queue_pending_work(buffer)
    {:noreply, [], %__MODULE__{state | buffer: remaining_work}}
  end

  # Callback to handle messages from the worker pool process with the finished result.
  def handle_cast({:work_finished, result}, %__MODULE__{} = state) do
    # If you need to do anything else with the work, or keep track of how many
    # things you've delivered/processed, this is the place to do it.
    #
    # Notice that we need to send the data in list form to the next stage, even though
    # we are only handling a single item at the moment.
    {:noreply, [result], state}
  end

  # Events come in here from the previous stage.
  def handle_events(events, _from, %__MODULE__{buffer: buffer} = state) do
    # Defer the processing until after we store the received events in state, just in case we cannot
    # queue everything up right now. You can adjust the delay as needed in the argument below
    # (the 0 ms used here is an assumed value). See the implementation of `process_buffer/1` above.
    process_buffer(0)

    # We return `[]` to not send any data to the subscribing stage(s) at this point.
    {:noreply, [], %__MODULE__{state | buffer: buffer ++ events}}
  end

  defp queue_pending_work([]), do: [] # no work to do, return an empty list

  defp queue_pending_work([work_item | buffer]) do
    # You can optionally check the work queue on your pool before moving an item there.
    # If you do not care and just want to dump everything into the worker pool, feel free
    # to skip this check.
    if queue_limit_reached?() do
      # Assuming we do not want to queue more work at the moment, let's tell the VM to
      # try processing items from the buffer again in 4 seconds. Otherwise we would not try
      # again until we received more events, which may or may not happen.
      process_buffer(4_000)

      # Everything that could not be queued stays in the buffer.
      [work_item | buffer]
    else
      # Send the `work_item` to the pool to be processed.
      # IMPLEMENT your call here

      # Then keep going with the rest of the buffer.
      queue_pending_work(buffer)
    end
  end

  defp queue_limit_reached? do
    # If you are using any queue mechanism for your worker pool and want to limit the items queued up,
    # this is the place to check that queue level and return true or false.
    # Placeholder: never report the limit as reached until you implement the check.
    false
  end
end
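
# A minimal, hedged sketch of the two pieces the comments above leave to the reader:
# limiting how much of the buffer is handed to the pool, and checking the pool's
# queue level. Nothing here comes from the original post. `StageBSketch`,
# `split_by_capacity/2`, and `pool_queue_full?/2` are hypothetical names, and using
# the pool process's mailbox length as the "queue level" is an assumption that only
# fits pools that receive their work as plain messages.
defmodule StageBSketch do
  # Split the buffer into the items that fit in the pool right now and the
  # items that must stay buffered. `capacity` is the number of free slots.
  def split_by_capacity(buffer, capacity)
      when is_list(buffer) and is_integer(capacity) and capacity >= 0 do
    Enum.split(buffer, capacity)
  end

  # Returns true when the pool process already has `limit` or more messages
  # waiting in its mailbox. A dead process counts as full so that no work is
  # sent to it.
  def pool_queue_full?(pool_pid, limit) when is_pid(pool_pid) and is_integer(limit) do
    case Process.info(pool_pid, :message_queue_len) do
      {:message_queue_len, len} -> len >= limit
      nil -> true
    end
  end
end

# Usage, e.g. in IEx:
#   StageBSketch.split_by_capacity([:a, :b, :c], 2)
#   #=> {[:a, :b], [:c]}
#   StageBSketch.pool_queue_full?(self(), 10)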
