Mix.install([
{:websockex, "~> 0.4.3"},
{:jason, "~> 1.3"},
{:kino, "~> 0.6.2"},
{:kino_vega_lite, "~> 0.1.3"},
{:contex, "~> 0.4.0"}
])
# Render the price chart using Contex.
defmodule Render do
  @moduledoc """
  Renders a price series as a Contex line chart inside a Kino frame.
  """

  alias Contex.Dataset
  alias Contex.LinePlot
  alias Contex.Plot

  @doc """
  Draws `data` as a 600x400 line plot titled `title` and renders it into `frame`.

  `data` is handed to `Contex.Dataset.new/2` with the column headers `"x"` and
  `"y"`. The plot is converted to SVG, wrapped in a `Kino.Image` and pushed to
  `frame`. Returns the result of `Kino.Frame.render/2`.
  """
  def render_prices(title, frame, data) do
    line_plot =
      data
      |> Dataset.new(["x", "y"])
      |> LinePlot.new()

    plot =
      Plot.new(600, 400, line_plot)
      |> Plot.plot_options(%{legend_setting: :legend_right})
      |> Plot.titles(title, "")

    {:safe, svg} = Plot.to_svg(plot)

    # The SVG comes back as iodata; flatten it with the dedicated iodata
    # function instead of Enum.join/1, which runs to_string/1 on every
    # top-level element.
    image =
      svg
      |> IO.iodata_to_binary()
      |> Kino.Image.new(:svg)

    Kino.Frame.render(frame, image)
  end
end
defmodule Streamer.Binance.TradeEvent do
  @moduledoc """
  Struct holding the fields of a single trade event.

  Every field defaults to `nil`.
  """

  defstruct ~w(
    event_type
    event_time
    symbol
    trade_id
    price
    quantity
    buyer_order_id
    seller_order_id
    trade_time
    buyer_market_maker
  )a
end
defmodule Streamer.Binance do
  @moduledoc """
  WebSockex client that subscribes to a Binance `<symbol>@trade` stream and
  re-renders a price chart for every trade event it receives.

  The process state is a `{frame, prices}` tuple: the Kino frame to draw into
  and the most recently seen prices, newest first.
  """

  use WebSockex
  require Logger

  alias Streamer.Binance.TradeEvent

  @endpoint "wss://stream.binance.com:9443/ws/"

  # Number of most recent prices kept in the state and plotted.
  @max_points 101

  @doc """
  Opens the websocket for `symbol` (lower-cased before building the URL).

  `state` is the initial `{frame, prices}` tuple. Returns the result of
  `WebSockex.start_link/3`.
  """
  def start_link(symbol, state) do
    WebSockex.start_link(
      "#{@endpoint}#{String.downcase(symbol)}@trade",
      __MODULE__,
      state
    )
  end

  @doc """
  Decodes an incoming frame as JSON and folds it into the state.

  Payloads that fail to decode are logged and leave the state unchanged.
  """
  @impl true
  def handle_frame({_type, msg}, state) do
    case Jason.decode(msg) do
      {:ok, event} ->
        {:ok, process_event(event, state)}

      {:error, _reason} ->
        Logger.error("Unable to parse msg: #{msg}")
        {:ok, state}
    end
  end

  # Records the trade price, redraws the chart and returns the updated state.
  defp process_event(%{"e" => "trade"} = event, {frame, prices}) do
    trade_event = %TradeEvent{
      event_type: event["e"],
      event_time: event["E"],
      symbol: event["s"],
      trade_id: event["t"],
      price: String.to_float(event["p"]),
      quantity: event["q"],
      buyer_order_id: event["b"],
      seller_order_id: event["a"],
      trade_time: event["T"],
      buyer_market_maker: event["m"]
    }

    # Newest price goes first; the list is capped so the state stays bounded.
    prices = Enum.take([trade_event.price | prices], @max_points)

    # Plot oldest-to-newest with a 1-based position as the x value.
    points =
      prices
      |> Enum.reverse()
      |> Enum.with_index(1)
      |> Enum.map(fn {price, index} -> {index, price} end)

    Render.render_prices("binance #{trade_event.symbol} realtime price", frame, points)

    {frame, prices}
  end

  # Any other decoded message is ignored instead of crashing the socket
  # process with a FunctionClauseError.
  defp process_event(_event, state), do: state
end
defmodule Streamer do
  @moduledoc """
  Entry point for starting a price stream.
  """

  @doc """
  Starts streaming trades for `symbol` with the given initial `state`.

  Forwards both arguments to `Streamer.Binance.start_link/2` and returns its
  result.
  """
  defdelegate start_streaming(symbol, state), to: Streamer.Binance, as: :start_link
end
# Frame that the streamer renders the price chart into.
frame = Kino.Frame.new()

# Read the symbol to stream from a text input (defaults to "ethusdt").
input = Kino.Input.text("symbol", default: "ethusdt")
symbol = input |> Kino.Input.read() |> String.trim()
{:ok, pid} = Streamer.start_streaming(symbol, {frame, []})

# Kill the previously started streamer process and clear its frame.
# NOTE(review): presumably meant to be run as a separate notebook cell; when
# executed straight after the lines above it stops the stream immediately —
# confirm the intended execution order.
Process.exit(pid, :kill)
Kino.Frame.clear(frame)