Created
September 30, 2016 22:55
-
-
Save myronmarston/42e3d2261e42751fea0959e88538a6ba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alias Experimental.GenStage | |
defmodule WorkerPool.Worker do | |
@type subscription_options :: Keyword.t | |
@type producer :: pid | {pid, subscription_options} | |
@doc """ | |
Starts a worker, subscribed to the provided job producers. | |
""" | |
@spec start_link([producer], Keyword.t) :: {:ok, pid} | |
def start_link(producers, options \\ []) do | |
GenStage.start_link(__MODULE__.Server, producers, options) | |
end | |
defmodule Server do | |
use GenStage | |
require Logger | |
@moduledoc false | |
@doc "Implementation of c:GenStage.init/1." | |
def init(producers) do | |
{:ok, task_supervisor} = Task.Supervisor.start_link() | |
{:consumer, task_supervisor, subscribe_to: producers} | |
end | |
@doc "Implementation of c:GenStage.handle_events/3." | |
def handle_events(jobs, _from, task_supervisor) do | |
Enum.each(jobs, &process_job_in_isolated_process(&1, task_supervisor)) | |
{:noreply, [], task_supervisor} | |
end | |
defp process_job_in_isolated_process(job, task_supervisor) do | |
task_supervisor | |
|> Task.Supervisor.async_nolink(fn -> process_job(job) end) | |
|> wait_for_process_completion(job) | |
end | |
defp wait_for_process_completion(task, job) do | |
case Task.yield(task, job.timeout) do | |
{:ok, :task_finished} -> :ok | |
{:exit, reason} -> notify_observer(job, :job_failed, reason) | |
nil -> | |
Task.shutdown(task) | |
notify_observer(job, :job_timed_out, job.timeout) | |
end | |
end | |
defp process_job(job) do | |
Logger.metadata(job.logger_metadata) | |
if Process.alive?(job.observer_pid) do | |
result = job.job_fun.() | |
notify_observer(job, :job_finished, result) | |
else | |
Logger.warn "#{__MODULE__} aborted job #{inspect job.id}, " <> | |
"since observer #{inspect job.observer_pid} is down" | |
end | |
:task_finished | |
end | |
defp notify_observer(job, notification, data) do | |
send(job.observer_pid, {notification, job.id, data}) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment