Last active
December 14, 2017 21:14
-
-
Save minidfx/9599b64d858ad53cbedcc224e7f4ae95 to your computer and use it in GitHub Desktop.
Contains modules for downloading media from a friend's Plex server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Plex.Downloader do
  @moduledoc """
  Module to download a media from a PlexServer asynchronously.

  The downloader is a `GenServer` registered under the name `:plex`. Its state
  is a map keyed by the HTTPoison async response id; each value is a
  `t:download_arguments/0` tuple holding the media, the originating request and
  the bytes buffered in memory that have not been written to disk yet.

  A download is started by casting
  `{:start, %Plex.Profile{}, media_id, callback_process}` to `:plex` and can be
  aborted by casting `{:stop, response_id}`.

  Progress and failures are reported to `callback_process` through
  `GenServer.cast/2` as `{:plex_message, text}` or
  `{:plex_important_message, text}`.
  """

  use GenServer

  require Logger

  alias Plex.PlexDownloadRequest
  alias Plex.PlexMedia
  alias Plex.Profile

  # Registered name of the downloader process. HTTPoison streams the async
  # response messages to this name as well.
  @server :plex

  # The in-memory buffer is appended to the temporary file once it holds at
  # least this percentage of the total media size. NOTE(review): for large
  # media this keeps up to a quarter of the file in memory between flushes.
  @flush_threshold_percent 25

  @typedoc """
  Type representing the tuple of a download media.
  """
  @type download_arguments :: {PlexMedia.t(), PlexDownloadRequest.t(), binary}

  ## Client API

  @doc """
  Starts the downloader and registers it under the name `:plex`.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link, do: GenServer.start_link(__MODULE__, %{}, name: @server)

  @doc """
  Returns the downloads which are downloading a media.
  """
  @spec downloads() :: %{reference => download_arguments}
  def downloads, do: GenServer.call(@server, :downloads)

  ## Server callbacks

  @impl true
  def init(args) do
    Logger.debug("Plex GenServer started.")
    {:ok, args}
  end

  @impl true
  def terminate(reason, _state) do
    Logger.error("Plex GenServer stopping, please check next message.")
    Logger.error(inspect(reason))
    Logger.debug("Plex GenServer stopped.")
  end

  @impl true
  def handle_call(:downloads, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info(%HTTPoison.AsyncStatus{code: code}, state) do
    Logger.debug("Server response: #{code}")
    {:noreply, state}
  end

  # Chunks for ids that are not tracked (for instance after a `:stop`) are
  # ignored. A write failure stops the server.
  def handle_info(%HTTPoison.AsyncChunk{id: response_id} = response, state) do
    if Map.has_key?(state, response_id) do
      case handle_chunk(response, state) do
        {:ok, new_state} ->
          {:noreply, new_state}

        {:error, new_state} ->
          {:stop, "Cannot write the file, please check permissions", new_state}
      end
    else
      {:noreply, state}
    end
  end

  # End of stream: flush what is left in the buffer and move the temporary
  # file to its final name. An end message for an untracked id is ignored.
  def handle_info(%HTTPoison.AsyncEnd{id: response_id}, state) do
    Logger.debug("Download finished.")

    case Map.pop(state, response_id) do
      {nil, _unchanged} ->
        {:noreply, state}

      {download, remaining} ->
        case handle_last_chunk(download) do
          :ok ->
            {:noreply, remaining}

          :error ->
            Logger.debug("Error to write the file.")
            {:stop, "Cannot write the file, please check permissions.", remaining}
        end
    end
  end

  def handle_info(%HTTPoison.AsyncHeaders{}, state), do: {:noreply, state}

  # Transport error reported by HTTPoison while streaming: forget the download
  # and tell the callback process about it.
  def handle_info(%HTTPoison.Error{id: response_id, reason: reason}, state) do
    Logger.error("Error!")
    Logger.error(inspect(reason))

    case Map.pop(state, response_id) do
      {nil, _unchanged} ->
        {:noreply, state}

      {{%PlexMedia{file_name: file_name},
        %PlexDownloadRequest{callback_server_id: callback_server_id}, _buffer}, remaining} ->
        GenServer.cast(
          callback_server_id,
          {:plex_important_message, "Failed to download the #{file_name}."}
        )

        {:noreply, remaining}
    end
  end

  def handle_info(request, state) do
    Logger.debug("Unknown state received.")
    Logger.debug(inspect(request))
    {:noreply, state}
  end

  @impl true
  def handle_cast({:stop, response_id}, state) do
    :hackney.stop_async(response_id)
    {:noreply, Map.delete(state, response_id)}
  end

  # The download is registered in the state before this callback returns, so
  # every chunk message (processed only after this callback) finds its entry.
  def handle_cast(
        {:start, %Profile{host: host, token: token, port: port}, media_id, callback_process},
        state
      ) do
    request = %PlexDownloadRequest{
      host: host,
      port: port,
      media_id: media_id,
      plex_token: token,
      callback_server_id: callback_process
    }

    case download(request) do
      {:ok, response_id, media} ->
        {:noreply, Map.put(state, response_id, {media, request, <<>>})}

      # The failure has already been logged and sent to the callback process.
      {:error, _reason} ->
        {:noreply, state}
    end
  end

  # Kept for senders of the `{:init, ...}` message: creates the entry, or
  # replaces media and request while keeping the bytes buffered so far.
  def handle_cast(
        {:init, response_id, %PlexMedia{file_name: file_name} = media, request},
        state
      ) do
    Logger.debug("Filename updated for #{file_name}")

    new_state =
      Map.update(state, response_id, {media, request, <<>>}, fn {_media, _request, chunks} ->
        {media, request, chunks}
      end)

    {:noreply, new_state}
  end

  ## Chunk handling

  # Writes the remaining buffer and renames the temporary file. Returns
  # `:error` only when the buffer cannot be written; a failed rename is
  # reported to the callback process but does not stop the server.
  @spec handle_last_chunk(download_arguments) :: :ok | :error
  defp handle_last_chunk(
         {%PlexMedia{file_name: file_name} = media,
          %PlexDownloadRequest{callback_server_id: callback_server_id}, _buffer} = download
       ) do
    case save_latest_chunk_to_disk(download) do
      :ok ->
        Logger.debug("Latest chunk saved.")

        message =
          case rename_file(media) do
            :ok -> "#{file_name} downloaded successfully and saved."
            {:error, _reason} -> "Unable to save the media #{file_name}."
          end

        GenServer.cast(callback_server_id, {:plex_important_message, message})
        :ok

      {:error, reason} ->
        Logger.error("Unable to write the latest buffer to the disk: #{inspect(reason)}")

        GenServer.cast(
          callback_server_id,
          {:plex_important_message, "Unable to write the latest buffer to the disk."}
        )

        :error
    end
  end

  # Appends the chunk to the buffer of the download and flushes the buffer to
  # disk when it is large enough. The caller guarantees the id is tracked.
  @spec handle_chunk(HTTPoison.AsyncChunk.t(), state :: map) ::
          {:ok, state :: map} | {:error, state :: map}
  defp handle_chunk(%HTTPoison.AsyncChunk{id: response_id, chunk: chunk}, state) do
    download = state |> Map.fetch!(response_id) |> update_chunks(chunk)

    {%PlexMedia{file_name: file_name} = media,
     %PlexDownloadRequest{callback_server_id: callback_server_id} = request, _buffer} = download

    case save_to_disk(download) do
      :ok ->
        print_download_stats(download)
        # The buffer is on disk: keep the updated progress, drop the bytes.
        {:ok, Map.put(state, response_id, {media, request, <<>>})}

      :pending ->
        {:ok, Map.put(state, response_id, download)}

      {:error, reason} ->
        Logger.error("Unable to write the buffer for #{file_name}: #{reason}")

        GenServer.cast(
          callback_server_id,
          {:plex_important_message, "An error occurred to write the buffer to the disk."}
        )

        {:error, Map.delete(state, response_id)}
    end
  end

  # Returns the download with the chunk appended to its buffer and the
  # progress counter increased by the chunk size in bytes.
  @spec update_chunks(download_arguments, binary) :: download_arguments
  defp update_chunks({%PlexMedia{progress: progress} = media, request, chunks}, chunk),
    do: {%{media | progress: progress + byte_size(chunk)}, request, chunks <> chunk}

  # Percentage of the media size represented by `bytes`, rounded to two
  # decimals. A media without a known size counts as complete, which avoids a
  # division by zero and makes every chunk flush immediately.
  @spec percentage(PlexMedia.t(), number) :: float
  defp percentage(%PlexMedia{size: size}, _bytes) when size <= 0, do: 100.0
  defp percentage(%PlexMedia{size: size}, bytes), do: Float.round(bytes * 100 / size, 2)

  ## Disk access

  @spec save_to_disk(download_arguments) :: :ok | {:error, File.posix()} | :pending
  defp save_to_disk({media, _request, chunks}) do
    if percentage(media, byte_size(chunks)) >= @flush_threshold_percent do
      append_to_tmp_file(media, chunks)
    else
      :pending
    end
  end

  @spec save_latest_chunk_to_disk(download_arguments) :: :ok | {:error, File.posix()}
  defp save_latest_chunk_to_disk({media, _request, chunks}),
    do: append_to_tmp_file(media, chunks)

  @spec append_to_tmp_file(PlexMedia.t(), binary) :: :ok | {:error, File.posix()}
  defp append_to_tmp_file(media, chunks) do
    media
    |> absolute_file_path()
    |> append_tmp_extension()
    |> File.write(chunks, [:append])
  end

  # Moves the temporary file to the final path of the media.
  @spec rename_file(PlexMedia.t()) :: :ok | {:error, File.posix()}
  defp rename_file(media) do
    path = absolute_file_path(media)

    case File.rename(append_tmp_extension(path), path) do
      :ok ->
        Logger.info("Media saved: #{path}")
        :ok

      {:error, reason} = error ->
        Logger.error("Unable to save the media: #{reason}")
        error
    end
  end

  @spec append_tmp_extension(bitstring) :: bitstring
  defp append_tmp_extension(path), do: "#{path}.tmp"

  @spec absolute_file_path(PlexMedia.t()) :: path :: bitstring
  defp absolute_file_path(%PlexMedia{file_name: file_name}) do
    download_path = Application.get_env(:bjbot, :download_path)
    "#{download_path}/#{file_name}"
  end

  ## HTTP

  # Fetches the metadata of the media, then starts streaming the media itself.
  @spec download(PlexDownloadRequest.t()) ::
          {:ok, reference, PlexMedia.t()} | {:error, bitstring}
  defp download(%PlexDownloadRequest{plex_token: plex_token} = request) do
    request
    |> metadata_url()
    |> HTTPoison.get(headers(plex_token, "text/xml"), ssl: [server_name_indication: :disable])
    |> download_media_metadata(request)
    |> download_media(request)
  end

  @spec headers(token :: bitstring, content_type :: bitstring) :: headers :: any
  defp headers(plex_token, content_type),
    do: [Accept: content_type, "X-Plex-Token": plex_token]

  @spec metadata_url(PlexDownloadRequest.t()) :: bitstring
  defp metadata_url(%PlexDownloadRequest{host: host, port: port, media_id: media_id}) do
    url = "https://#{host}:#{port}/library/metadata/#{media_id}"
    Logger.debug(url)
    url
  end

  # Turns the metadata HTTP response into a `PlexMedia`, or into an error
  # message suitable for the callback process.
  @spec download_media_metadata(
          {:ok, HTTPoison.Response.t()} | {:error, HTTPoison.Error.t()},
          PlexDownloadRequest.t()
        ) :: {:ok, PlexMedia.t()} | {:error, reason :: bitstring}
  defp download_media_metadata(
         {:ok, %HTTPoison.Response{status_code: 200, body: xml}},
         %PlexDownloadRequest{host: host, port: port}
       ) do
    doc = Exml.parse(xml)

    attributes = [
      media_path: Exml.get(doc, "//Part[1]/@key"),
      file_name: Exml.get(doc, "//Part[1]/@file"),
      size: Exml.get(doc, "//Part[1]/@size")
    ]

    missing = for {key, nil} <- attributes, do: key

    case missing do
      [] -> build_media(attributes, host, port)
      keys -> {:error, unreadable_values_message(keys)}
    end
  end

  defp download_media_metadata(
         {:ok, %HTTPoison.Response{status_code: 401}},
         %PlexDownloadRequest{host: host}
       ),
       do: {:error, "Unauthorized to access to the server #{host}, please check the Plex token."}

  defp download_media_metadata(
         {:ok, %HTTPoison.Response{status_code: status_code}},
         %PlexDownloadRequest{host: host}
       ),
       do: {:error, "HTTP error response: #{host}, #{status_code}"}

  defp download_media_metadata({:error, error}, %PlexDownloadRequest{host: host, port: port}) do
    Logger.error(inspect(error))
    {:error, "Cannot contact the host #{host}:#{port}, please check server logs."}
  end

  # Builds the media from the attributes read in the XML. Only the base name
  # of the remote file is kept as local file name.
  @spec build_media(Keyword.t(), bitstring, integer) ::
          {:ok, PlexMedia.t()} | {:error, reason :: bitstring}
  defp build_media(attributes, host, port) do
    case attributes |> Keyword.fetch!(:size) |> Integer.parse() do
      {size, _rest} ->
        file_name = attributes |> Keyword.fetch!(:file_name) |> Path.basename()
        url = "https://#{host}:#{port}#{Keyword.fetch!(attributes, :media_path)}"
        Logger.debug("Media: file_name(#{file_name}), url(#{url}), size(#{size})")
        {:ok, %PlexMedia{file_name: file_name, url: url, size: size}}

      :error ->
        {:error, unreadable_values_message([:size])}
    end
  end

  @spec unreadable_values_message([atom]) :: bitstring
  defp unreadable_values_message(keys) do
    invalid_keys = Enum.join(keys, ",")

    "Following values cannot be read from the XML response, perhaps an invalid media id: #{invalid_keys}"
  end

  # Refuses to overwrite an existing output file; a leftover temporary file
  # is removed before the download starts.
  @spec download_media(
          {:ok, PlexMedia.t()} | {:error, reason :: bitstring},
          PlexDownloadRequest.t()
        ) :: {:ok, reference, PlexMedia.t()} | {:error, reason :: bitstring}
  defp download_media(
         {:ok, media},
         %PlexDownloadRequest{callback_server_id: callback_server_id} = request
       ) do
    path = absolute_file_path(media)
    path_temp = append_tmp_extension(path)

    if File.exists?(path) do
      message = "The output file #{path} or the temp file #{path_temp} already exist."
      GenServer.cast(callback_server_id, {:plex_important_message, message})
      {:error, message}
    else
      if File.exists?(path_temp), do: File.rm!(path_temp)
      start_download(media, request)
    end
  end

  defp download_media(
         {:error, reason} = error,
         %PlexDownloadRequest{callback_server_id: callback_server_id}
       ) do
    Logger.error(reason)
    GenServer.cast(callback_server_id, {:plex_message, reason})
    error
  end

  # Starts the asynchronous request streaming the media to the downloader
  # process. A request that cannot be started is reported instead of raised,
  # so the downloads already running are not affected.
  @spec start_download(PlexMedia.t(), PlexDownloadRequest.t()) ::
          {:ok, reference, PlexMedia.t()} | {:error, reason :: bitstring}
  defp start_download(
         %PlexMedia{file_name: file_name, url: url} = media,
         %PlexDownloadRequest{plex_token: plex_token, callback_server_id: callback_server_id}
       ) do
    options = [stream_to: @server, ssl: [server_name_indication: :disable]]

    case HTTPoison.get(url, headers(plex_token, "application/octet-stream"), options) do
      {:ok, %{id: response_id}} ->
        GenServer.cast(callback_server_id, {:plex_message, "#{file_name} download started."})
        {:ok, response_id, %{media | start_downloading_datetime: NaiveDateTime.utc_now()}}

      {:error, %HTTPoison.Error{reason: reason}} ->
        Logger.error("Unable to start the download of #{file_name}: #{inspect(reason)}")

        GenServer.cast(
          callback_server_id,
          {:plex_important_message, "Failed to download the #{file_name}."}
        )

        {:error, "Unable to start the download of #{file_name}."}
    end
  end

  ## Reporting

  # Logs the progress of the download and sends it to the callback process.
  # The average speed is computed over whole elapsed seconds.
  @spec print_download_stats(download_arguments) :: :ok
  defp print_download_stats(
         {%PlexMedia{
            start_downloading_datetime: start,
            file_name: file_name,
            size: size,
            progress: progress
          } = media, %PlexDownloadRequest{callback_server_id: callback_server_id}, _buffer}
       ) do
    local_percentage = percentage(media, progress)
    total = to_megabytes(size)
    downloaded = to_megabytes(progress)
    time_elapsed = NaiveDateTime.diff(NaiveDateTime.utc_now(), start)

    speed =
      case time_elapsed do
        # Less than one second elapsed: report what has been downloaded so far.
        0 -> downloaded
        seconds -> Float.round(downloaded / seconds, 2)
      end

    Logger.debug(
      "#{file_name} downloaded: #{local_percentage}%, #{downloaded}MB, Avg. speed: #{speed}MB/s"
    )

    Logger.debug("Total: #{downloaded}, Time elasped: #{time_elapsed}")

    message =
      "#{file_name}: #{downloaded}/#{total}MB, #{local_percentage}%, Avg. speed: #{speed}MB/s"

    GenServer.cast(callback_server_id, {:plex_message, message})
    :ok
  end

  @spec to_megabytes(number) :: float
  defp to_megabytes(bytes), do: Float.round(bytes / 1024 / 1024, 2)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Plex.PlexDownloadRequest do
  @moduledoc """
  Model representing the request which will be sent to the downloader.

  `callback_server_id` identifies the process that receives the messages sent
  by the downloader through `GenServer.cast/2`.
  """

  defstruct [:host, :port, :media_id, :plex_token, :callback_server_id]

  @type t :: %__MODULE__{
          host: bitstring,
          port: integer,
          media_id: integer,
          plex_token: bitstring,
          callback_server_id: GenServer.server()
        }
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Plex.PlexMedia do
  @moduledoc """
  Model representing the media for the downloader.

  `size` is the total size of the media in bytes and `progress` the number of
  bytes received so far; `start_downloading_datetime` stays `nil` until the
  download has been started.
  """

  defstruct [:file_name, :url, :start_downloading_datetime, size: 0.0, progress: 0.0]

  @type t :: %__MODULE__{
          file_name: bitstring,
          url: bitstring,
          start_downloading_datetime: NaiveDateTime.t() | nil,
          size: number,
          progress: number
        }
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Plex.Profile do
  @moduledoc """
  Connection settings of a Plex server: a display name, the access token, the
  host and the port (32400 unless specified otherwise).
  """

  defstruct name: nil, token: nil, host: nil, port: 32400

  @type t :: %__MODULE__{
          name: bitstring,
          token: bitstring,
          host: bitstring,
          port: integer
        }
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add the downloader to your supervision tree and send a message to the process :plex as follows to start the download:
Callback methods: