Created
January 29, 2021 18:35
-
-
Save ConnorRigby/53edaa88c03405271c22cc03b9f8b970 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule NervesHubLinkCommon.Downloader do
  @moduledoc """
  Handles downloading files via HTTP.

  Internally caches several interesting properties about
  the download such as:

  * the URI of the request
  * the total content length (in bytes) of the file being downloaded
  * the total amount of bytes downloaded at any given time

  Using this information, it can restart a download using the
  [`Range` HTTP header](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range).

  This process's **only** focus is obtaining data reliably. It doesn't have any
  side effects on the system.
  """

  require Logger
  use GenServer

  alias NervesHubLinkCommon.{Downloader, Downloader.RetryConfig}

  defstruct uri: nil,
            conn: nil,
            request_ref: nil,
            status: nil,
            response_headers: [],
            content_length: 0,
            downloaded_length: 0,
            retry_number: 0,
            handler_fun: nil,
            retry_args: nil,
            max_timeout: nil,
            retry_timeout: nil

  @type handler_event :: {:data, binary()} | {:error, any()} | :complete
  @type event_handler_fun :: (handler_event -> any())
  @type retry_args :: RetryConfig.t()

  @type t :: %Downloader{
          uri: nil | URI.t(),
          conn: nil | Mint.HTTP.t(),
          request_ref: nil | reference(),
          status: nil | Mint.Types.status(),
          response_headers: Mint.Types.headers(),
          content_length: non_neg_integer(),
          downloaded_length: non_neg_integer(),
          retry_number: non_neg_integer(),
          handler_fun: event_handler_fun,
          retry_args: retry_args(),
          max_timeout: reference(),
          retry_timeout: nil | reference()
        }

  @type initialized_download :: %Downloader{
          uri: URI.t(),
          conn: Mint.HTTP.t(),
          request_ref: reference(),
          status: nil | Mint.Types.status(),
          response_headers: Mint.Types.headers(),
          content_length: non_neg_integer(),
          downloaded_length: non_neg_integer(),
          retry_number: non_neg_integer(),
          handler_fun: event_handler_fun
        }

  # todo, this should be `t`, but with retry_timeout
  @type resume_rescheduled :: t()

  @doc """
  Begins downloading a file at `url` handled by `fun`.

  An optional `RetryConfig` may be supplied as the third argument to
  customize retry/timeout behavior; it defaults to `%RetryConfig{}`.

  # Example

      iex> pid = self()
      #PID<0.110.0>
      iex> fun = fn {:data, data} -> File.write("index.html", data)
      ...>   {:error, error} -> IO.puts("error streaming file: \#{inspect(error)}")
      ...>   :complete -> send pid, :complete
      ...> end
      #Function<44.97283095/1 in :erl_eval.expr/5>
      iex> NervesHubLinkCommon.Downloader.start_download("https://httpbin.com/", fun)
      {:ok, #PID<0.111.0>}
      iex> flush()
      :complete
  """
  @spec start_download(String.t() | URI.t(), event_handler_fun(), RetryConfig.t()) ::
          GenServer.on_start()
  def start_download(url, fun, retry_args \\ %RetryConfig{})

  def start_download(url, fun, %RetryConfig{} = retry_args) when is_function(fun, 1) do
    GenServer.start_link(__MODULE__, [URI.parse(url), fun, retry_args])
  end

  @impl GenServer
  def init([%URI{} = uri, fun, %RetryConfig{} = retry_args]) do
    # Absolute ceiling on the lifetime of this download process,
    # independent of how many retries/resumes happen.
    timer = Process.send_after(self(), :max_timeout, retry_args.max_timeout)

    state =
      reset(%Downloader{
        handler_fun: fun,
        retry_args: retry_args,
        max_timeout: timer,
        uri: uri
      })

    # Kick off the first connection attempt asynchronously so init/1 returns fast.
    send(self(), :resume)
    {:ok, state}
  end

  @impl GenServer
  # this message is scheduled during init/1
  # it is a extreme condition where regardless of download attempts,
  # idle timeouts etc, this entire process has lived for TOO long.
  def handle_info(:max_timeout, %Downloader{} = state) do
    {:stop, :max_timeout_reached, state}
  end

  # this message is delivered after `state.retry_args.idle_timeout`
  # milliseconds have occurred. It indicates that many milliseconds have elapsed since
  # the last "chunk" from the HTTP server
  def handle_info(:timeout, %Downloader{handler_fun: handler} = state) do
    _ = handler.({:error, :idle_timeout})
    state = reschedule_resume(state)
    {:noreply, state}
  end

  # message is scheduled when a resumable event happens.
  # This head only matches when retry_number has reached max_disconnects,
  # in which case we give up entirely.
  def handle_info(
        :resume,
        %Downloader{
          retry_number: retry_number,
          retry_args: %RetryConfig{max_disconnects: retry_number}
        } = state
      ) do
    {:stop, :max_disconnects_reached, state}
  end

  def handle_info(:resume, %Downloader{handler_fun: handler} = state) do
    case resume_download(state.uri, state) do
      {:ok, state} ->
        # The integer third element arms the GenServer idle timeout; if no
        # message arrives within `idle_timeout` ms, `:timeout` is delivered.
        {:noreply, state, state.retry_args.idle_timeout}

      error ->
        _ = handler.(error)
        state = reschedule_resume(state)
        {:noreply, state}
    end
  end

  def handle_info(message, %Downloader{handler_fun: handler} = state) do
    case Mint.HTTP.stream(state.conn, message) do
      {:ok, conn, responses} ->
        handle_responses(responses, %{state | conn: conn})

      # i think there's probably a race condition here...
      {:error, conn, error, responses} ->
        _ = handler.({:error, error})
        handle_responses(responses, reschedule_resume(%{state | conn: conn}))

      :unknown ->
        # Not a message for our connection; log instead of printing to stdout.
        Logger.warn("[Downloader] unhandled message: #{inspect(message)}")
        {:noreply, state}
    end
  end

  # schedules a message to be delivered based on retry args
  @spec reschedule_resume(t()) :: resume_rescheduled()
  defp reschedule_resume(%Downloader{retry_number: retry_number} = state) do
    timer = Process.send_after(self(), :resume, state.retry_args.time_between_retries)
    %Downloader{state | retry_timeout: timer, retry_number: retry_number + 1}
  end

  defp handle_responses([response | rest], %Downloader{} = state) do
    case handle_response(response, state) do
      # this `status != nil` thing seems really weird. Shouldn't be needed.
      %Downloader{status: status} = state when status != nil and status >= 400 ->
        {:stop, {:http_error, status}, state}

      state ->
        handle_responses(rest, state)
    end
  end

  # All responses consumed and we've downloaded exactly `content_length` bytes:
  # the transfer is complete.
  defp handle_responses(
         [],
         %Downloader{downloaded_length: downloaded, content_length: downloaded} = state
       )
       when downloaded != 0 do
    _ = state.handler_fun.(:complete)
    {:stop, :normal, state}
  end

  defp handle_responses([], %Downloader{} = state) do
    {:noreply, state, state.retry_args.idle_timeout}
  end

  # Redirect status (3xx); the actual redirect happens on the :headers response.
  def handle_response(
        {:status, request_ref, status},
        %Downloader{request_ref: request_ref} = state
      )
      when status >= 300 and status < 400 do
    %Downloader{state | status: status}
  end

  # the handle_responses/2 function checks this value again because this function only handles state
  def handle_response(
        {:status, request_ref, status},
        %Downloader{request_ref: request_ref} = state
      )
      when status >= 400 do
    # kind of a hack to make the error type uniform
    state.handler_fun.({:error, %Mint.HTTPError{reason: {:http_error, status}}})
    %Downloader{state | status: status}
  end

  def handle_response(
        {:status, request_ref, status},
        %Downloader{request_ref: request_ref} = state
      )
      when status >= 200 and status < 300 do
    %Downloader{state | status: status}
  end

  # handles HTTP redirects.
  def handle_response(
        {:headers, request_ref, headers},
        %Downloader{request_ref: request_ref, status: status, handler_fun: handler} = state
      )
      when status >= 300 and status < 400 do
    location = fetch_location(headers)
    Logger.info("Redirecting to #{location}")
    state = reset(state)

    case resume_download(location, state) do
      {:ok, %Downloader{} = state} ->
        state

      error ->
        handler.(error)
        state
    end
  end

  # if we already have the content-length header, don't fetch it again.
  # range requests will change this value
  def handle_response(
        {:headers, request_ref, headers},
        %Downloader{request_ref: request_ref, content_length: content_length} = state
      )
      when content_length > 0 do
    %Downloader{state | response_headers: headers}
  end

  def handle_response(
        {:headers, request_ref, headers},
        %Downloader{request_ref: request_ref, content_length: 0} = state
      ) do
    # Warn when the server can't honor Range requests — resuming will
    # restart the download from byte 0 in that case.
    case fetch_accept_ranges(headers) do
      accept_ranges when accept_ranges in ["none", nil] ->
        Logger.error("HTTP Server does not support the Range header")

      _ ->
        :ok
    end

    %Downloader{state | response_headers: headers, content_length: fetch_content_length(headers)}
  end

  def handle_response(
        {:data, request_ref, data},
        %Downloader{request_ref: request_ref, downloaded_length: downloaded} = state
      ) do
    _ = state.handler_fun.({:data, data})
    %Downloader{state | downloaded_length: downloaded + byte_size(data)}
  end

  def handle_response({:done, request_ref}, %Downloader{request_ref: request_ref} = state) do
    state
  end

  # ignore other messages when redirecting
  def handle_response(_, %Downloader{status: nil} = state) do
    state
  end

  # Clears per-attempt bookkeeping; used on init and on redirect.
  defp reset(%Downloader{} = state) do
    %Downloader{
      state
      | retry_number: 0,
        downloaded_length: 0,
        content_length: 0
    }
  end

  # Opens a fresh connection and issues a GET, adding a Range header when
  # a previous attempt already downloaded part of the body.
  @spec resume_download(URI.t(), t()) ::
          {:ok, initialized_download()} | {:error, Mint.Types.error()}
  defp resume_download(
         %URI{scheme: scheme, host: host, port: port, path: path, query: query} = uri,
         %Downloader{} = state
       )
       when scheme in ["https", "http"] do
    request_headers =
      [{"content-type", "application/octet-stream"}]
      |> add_range_header(state)
      |> add_retry_number_header(state)
      |> add_user_agent_header(state)

    # Mint takes the full request target, so re-attach the query string.
    path =
      if query do
        "#{path}?#{query}"
      else
        path
      end

    Logger.info("Resuming download attempt number #{state.retry_number} #{uri}")

    # `String.to_existing_atom/1` is safe here: the guard restricts `scheme`
    # to "http"/"https", both of which exist as atoms already.
    with {:ok, conn} <- Mint.HTTP.connect(String.to_existing_atom(scheme), host, port),
         {:ok, conn, request_ref} <- Mint.HTTP.request(conn, "GET", path, request_headers, nil) do
      {:ok,
       %Downloader{
         state
         | uri: uri,
           conn: conn,
           request_ref: request_ref,
           status: nil,
           response_headers: []
       }}
    end
  end

  @spec fetch_content_length(Mint.Types.headers()) :: 0 | pos_integer()
  defp fetch_content_length(headers)
  defp fetch_content_length([{"content-length", value} | _]), do: String.to_integer(value)
  defp fetch_content_length([_ | rest]), do: fetch_content_length(rest)
  defp fetch_content_length([]), do: 0

  @spec fetch_location(Mint.Types.headers()) :: nil | URI.t()
  defp fetch_location(headers)
  defp fetch_location([{"location", uri} | _]), do: URI.parse(uri)
  defp fetch_location([_ | rest]), do: fetch_location(rest)
  defp fetch_location([]), do: nil

  @spec fetch_accept_ranges(Mint.Types.headers()) :: nil | String.t()
  defp fetch_accept_ranges(headers)
  defp fetch_accept_ranges([{"accept-ranges", value} | _]), do: value
  defp fetch_accept_ranges([_ | rest]), do: fetch_accept_ranges(rest)
  defp fetch_accept_ranges([]), do: nil

  @spec add_range_header(Mint.Types.headers(), t()) :: Mint.Types.headers()
  defp add_range_header(headers, state)
  defp add_range_header(headers, %Downloader{content_length: 0}), do: headers

  # `Range` last-byte-pos is inclusive (RFC 7233), so the final valid byte
  # index is `total - 1`; using `total` would ask for one byte past the end.
  defp add_range_header(headers, %Downloader{downloaded_length: r, content_length: total})
       when total > 0,
       do: [{"Range", "bytes=#{r}-#{total - 1}"} | headers]

  @spec add_retry_number_header(Mint.Types.headers(), t()) :: Mint.Types.headers()
  defp add_retry_number_header(headers, %Downloader{retry_number: retry_number}),
    do: [{"X-Retry-Number", "#{retry_number}"} | headers]

  @spec add_user_agent_header(Mint.Types.headers(), t()) :: Mint.Types.headers()
  defp add_user_agent_header(headers, _),
    do: [{"User-Agent", "NHL/#{Application.spec(:nerves_hub_link_common)[:vsn]}"} | headers]
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment