Last active
October 22, 2020 22:09
-
-
Save ConnorRigby/0cbd7a37c8d1af3339e319d683eef3f3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import the opencv library | |
import cv2 | |
import erlang, os, struct | |
# define a video capture object | |
vid = cv2.VideoCapture(0) | |
AtomOk = erlang.OtpErlangAtom(bytes("ok", "utf-8")) | |
AtomErr = erlang.OtpErlangAtom(bytes("error", "utf-8")) | |
AtomClose = erlang.OtpErlangAtom(bytes("close", "utf-8")) | |
AtomGetFrame = erlang.OtpErlangAtom(bytes("get_frame", "utf-8")) | |
def send(term, stream): | |
"""Write an Erlang term to an output stream.""" | |
payload = erlang.term_to_binary(term) | |
header = struct.pack('!I', len(payload)) | |
stream.write(header) | |
stream.write(payload) | |
stream.flush() | |
def recv(stream): | |
"""Read an Erlang term from an input stream.""" | |
header = stream.read(4) | |
if len(header) != 4: | |
return None # EOF | |
(length,) = struct.unpack('!I', header) | |
payload = stream.read(length) | |
if len(payload) != length: | |
return None | |
term = erlang.binary_to_term(payload) | |
return term | |
def recv_loop(stream): | |
"""Yield Erlang terms from an input stream.""" | |
message = recv(stream) | |
while message: | |
yield message | |
message = recv(stream) | |
if __name__ == '__main__': | |
input, output = os.fdopen(3, 'rb'), os.fdopen(4, 'wb') | |
for message in recv_loop(input): | |
operation = message[0] | |
ref = message[1] | |
if operation == AtomGetFrame: | |
ret, frame = vid.read() | |
_, jpeg = cv2.imencode('.JPEG', frame) | |
send([ref, (AtomOk, jpeg.tobytes())], output) | |
elif operation == AtomClose: | |
# After the loop release the cap object | |
vid.release() | |
# Destroy all the windows | |
cv2.destroyAllWindows() | |
send([ref, AtomOk], output) | |
else: | |
print("unknown message") | |
send([ref, (AtomErr, 'unknwon message')], output) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Test do | |
@moduledoc """ | |
usage: | |
iex> {:ok, pid} = Test.start_link [] | |
{:ok, #PID<0.119.0>} | |
{:ok, jpeg} = Test.get_frame(pid) | |
{:ok, | |
<<255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 255, | |
219, 0, 67, 0, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 2, 2, 2, 2, 4, 3, 2, 2, 2, 2, | |
5, 4, ...>>} | |
""" | |
use GenServer | |
def get_frame(pid), do: GenServer.call(pid, :get_frame) | |
def start_link(args) do | |
GenServer.start_link(__MODULE__, args) | |
end | |
@impl GenServer | |
def init(_args) do | |
port = open_port() | |
{:ok, %{port: port, caller: nil}} | |
end | |
@impl GenServer | |
def handle_call(:get_frame, from, state) do | |
true = :erlang.port_command(state.port, :erlang.term_to_binary([:get_frame, make_ref(), []])) | |
{:noreply, %{state | caller: from}} | |
end | |
@impl GenServer | |
def handle_info({port, {:data, response}}, %{port: port} = state) do | |
[_ref, response] = :erlang.binary_to_term(response) | |
case response do | |
{:ok, jpeg} when is_list(jpeg) -> | |
GenServer.reply(state.caller, {:ok, :erlang.list_to_binary(jpeg)}) | |
error -> | |
GenServer.reply(state.caller, error) | |
end | |
{:noreply, %{state | caller: nil}} | |
end | |
defp open_port do | |
Port.open( | |
{:spawn_executable, "/usr/bin/env"}, | |
[:binary, {:packet, 4}, {:args, ["python", "-u", "test.py"]}, :nouse_stdio] | |
) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment