|
import random |
|
import ssl |
|
import websockets |
|
import asyncio |
|
import os |
|
import sys |
|
import json |
|
import argparse |
|
|
|
import gi |
|
gi.require_version('Gst', '1.0') |
|
from gi.repository import Gst |
|
gi.require_version('GstWebRTC', '1.0') |
|
from gi.repository import GstWebRTC |
|
gi.require_version('GstSdp', '1.0') |
|
from gi.repository import GstSdp |
|
from gi.repository import GObject |
|
|
|
|
|
# Process environment used by GStreamer and the Wayland display source.
# These are applied at import time, i.e. before Gst.init() is called from the
# script entry point.
# NOTE(review): the plugin path, display name and runtime dir (uid 1000) are
# machine-specific values - confirm them for the target host.
os.environ.update({
    'GST_PLUGIN_PATH': "/home/<USER>/gst-wayland-display/",
    'DISPLAY': "wayland-1",
    'WAYLAND_DISPLAY': "wayland-1",
    'XDG_RUNTIME_DIR': "/run/user/1000/",
})

# Pipeline description handed to Gst.parse_launch():
#   * a webrtcbin element named "sendrecv";
#   * a VP8 video branch fed by waylanddisplaysrc (RTP payload type 97);
#   * an Opus audio branch fed by audiotestsrc (RTP payload type 96).
# Both branches end in "sendrecv.", i.e. they are linked into the webrtcbin.
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
waylanddisplaysrc ! video/x-raw,width=1280,height=720,format=RGBx,framerate=60/1 ! queue ! videoconvert ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,width=1280,height=720,format=RGBx,framerate=60/1,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
|
|
|
|
|
from websockets.version import version as wsv |
|
|
|
class WebRTCClient:
    """WebRTC peer that streams PIPELINE_DESC through a webrtcbin element.

    Signalling (HELLO / SESSION handshake, SDP and ICE exchange) runs over a
    websocket owned by an asyncio event loop.  The GStreamer signal handlers
    defined here are invoked by GStreamer, not by asyncio, so they never touch
    the websocket directly: outgoing messages are scheduled onto the owning
    loop with ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, id_, peer_id, server):
        """Store the call parameters; no I/O happens here.

        Args:
            id_: Numeric id announced to the signalling server in ``HELLO``.
            peer_id: Id of the remote peer requested in ``SESSION``.
            server: Signalling server URL; a falsy value selects the default
                ``wss://webrtc.nirbheek.in:8443``.
        """
        self.id_ = id_
        self.conn = None
        self.pipe = None
        self.webrtc = None
        # Guards add_devices() so the custom event is sent only once.
        self.set_devices = False
        self.peer_id = peer_id
        self.server = server or 'wss://webrtc.nirbheek.in:8443'
        # Event loop that owns self.conn; captured in connect() and loop().
        self.event_loop = None

    async def connect(self):
        """Open the signalling websocket and register with ``HELLO <id>``."""
        print("connect")
        # Remember the loop the connection lives on so that GStreamer
        # callbacks can hand their messages back to it.
        self.event_loop = asyncio.get_running_loop()
        sslctx = ssl.create_default_context()
        self.conn = await websockets.connect(self.server, ssl=sslctx)
        await self.conn.send('HELLO %d' % self.id_)

    async def setup_call(self):
        """Ask the signalling server for a session with ``self.peer_id``."""
        print("setup_call")
        await self.conn.send('SESSION {}'.format(self.peer_id))

    @staticmethod
    def _report_send_result(future):
        """Print the error of a scheduled websocket send, if it failed."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print('Failed to send signalling message:', error)

    def _send_soon(self, msg):
        """Schedule ``self.conn.send(msg)`` on the loop owning the websocket.

        Safe to call from any thread.  The call does not wait for the send to
        complete; failures are reported by ``_report_send_result``.
        """
        future = asyncio.run_coroutine_threadsafe(
            self.conn.send(msg), self.event_loop)
        future.add_done_callback(self._report_send_result)

    def send_sdp_offer(self, offer):
        """Serialise *offer* as JSON and send it to the peer.

        Args:
            offer: Session description whose ``sdp.as_text()`` gives the SDP.
        """
        text = offer.sdp.as_text()
        print ('Sending offer:\n%s' % text)
        msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
        self._send_soon(msg)

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the created offer locally and send it.

        Args:
            promise: The Gst.Promise passed to ``create-offer``.
            _: User data given in on_negotiation_needed (unused).
            __: Second user data argument (unused).
        """
        print("on_offer_created")
        promise.wait()
        reply = promise.get_reply()
        offer = reply.get_value('offer')
        # A fresh promise is required by the signal; its result is not used,
        # so it is interrupted right after the emit.
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, promise)
        promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """Handler for ``on-negotiation-needed``: request an SDP offer."""
        print("on_negotiation_needed")
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Handler for ``on-ice-candidate``: forward the candidate to the peer.

        Args:
            _: Emitting element (unused).
            mlineindex: SDP media line index the candidate belongs to.
            candidate: ICE candidate string.
        """
        print("send_ice_candidate_message")
        # NOTE(review): 'remoteDescription': "asd" looks like a leftover
        # placeholder; kept so the message format stays unchanged.
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex, 'remoteDescription': "asd"}})
        self._send_soon(icemsg)

    def add_devices(self):
        """Send the custom ``VirtualDevicesReady`` upstream event once.

        The event carries a ``paths`` field listing three input device nodes.
        Subsequent calls are no-ops.
        """
        if self.set_devices:
            return
        print("add_devices")
        self.set_devices = True
        custom_structure = Gst.Structure.new_empty("VirtualDevicesReady")
        value = GObject.Value(GObject.ValueArray)
        value.set_value([
            "/dev/input/event20",
            "/dev/input/event21",
            "/dev/input/event22",
        ])
        custom_structure.set_value("paths", value)
        event = Gst.Event.new_custom(Gst.EventType.CUSTOM_UPSTREAM, custom_structure)
        self.pipe.send_event(event)

    def on_incoming_decodebin_stream(self, _, pad):
        """Handler for decodebin ``pad-added``: attach a playback branch.

        Video-like caps get ``queue ! videoconvert ! autovideosink``; audio
        caps get ``queue ! audioconvert ! audioresample ! autoaudiosink``.
        Pads without current caps are ignored.

        Args:
            _: Emitting decodebin (unused).
            pad: Newly added decodebin pad.
        """
        print("on_incoming_decodebin_stream")
        self.add_devices()
        if not pad.has_current_caps():
            print (pad, 'has no caps, ignoring')
            return

        caps = pad.get_current_caps()
        s = caps.get_structure(0)
        name = s.get_name()
        print("name "+name)
        if name.startswith(('way', 'rtspsrc', 'video')):
            q = Gst.ElementFactory.make('queue')
            conv = Gst.ElementFactory.make('videoconvert')
            sink = Gst.ElementFactory.make('autovideosink')
            self.pipe.add(q)
            self.pipe.add(conv)
            self.pipe.add(sink)
            self.pipe.sync_children_states()
            pad.link(q.get_static_pad('sink'))
            q.link(conv)
            conv.link(sink)
        elif name.startswith('audio'):
            q = Gst.ElementFactory.make('queue')
            conv = Gst.ElementFactory.make('audioconvert')
            resample = Gst.ElementFactory.make('audioresample')
            sink = Gst.ElementFactory.make('autoaudiosink')
            self.pipe.add(q)
            self.pipe.add(conv)
            self.pipe.add(resample)
            self.pipe.add(sink)
            self.pipe.sync_children_states()
            pad.link(q.get_static_pad('sink'))
            q.link(conv)
            conv.link(resample)
            resample.link(sink)

    def on_incoming_stream(self, _, pad):
        """Handler for webrtcbin ``pad-added``: decode incoming media.

        Only source pads are handled; each gets its own decodebin whose pads
        are routed by on_incoming_decodebin_stream.
        """
        print("on_incoming_stream")
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline from PIPELINE_DESC, hook signals and play it."""
        print("start_pipeline")
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    def handle_sdp(self, message):
        """Apply a JSON signalling message (SDP answer or ICE candidate).

        Args:
            message: JSON text with either an ``sdp`` or an ``ice`` object.

        Raises:
            RuntimeError: If no pipeline has been started yet.
            ValueError: If the SDP is not of type ``answer``.
        """
        print("handle_sdp")
        # Explicit checks instead of assert: the message comes from the
        # network and asserts vanish under ``python -O``.
        if self.webrtc is None:
            raise RuntimeError('Received a signalling message before the pipeline was started')
        msg = json.loads(message)
        if 'sdp' in msg:
            sdp = msg['sdp']
            if sdp['type'] != 'answer':
                raise ValueError('Expected an SDP answer, got %r' % (sdp['type'],))
            sdp = sdp['sdp']
            print ('Received answer:\n%s' % sdp)
            _, sdpmsg = GstSdp.SDPMessage.new()
            GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            self.webrtc.emit('set-remote-description', answer, promise)
            promise.interrupt()
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def close_pipeline(self):
        """Stop and drop the pipeline; safe to call when none was started."""
        print("close_pipeline")
        # The pipeline only exists after SESSION_OK, but this is also reached
        # on early errors, so tolerate a missing pipeline.
        if self.pipe is not None:
            self.pipe.set_state(Gst.State.NULL)
        self.pipe = None
        self.webrtc = None

    async def loop(self):
        """Dispatch signalling messages until the connection ends.

        Returns:
            1 if the server sent an ``ERROR...`` message, 0 if the websocket
            iteration finished normally.

        Raises:
            RuntimeError: If called before a connection exists.
        """
        if not self.conn:
            raise RuntimeError('loop() called without a signalling connection')
        # The loop iterating the connection is the one sends must go through.
        self.event_loop = asyncio.get_running_loop()
        try:
            async for message in self.conn:
                if message == 'HELLO':
                    await self.setup_call()
                elif message == 'SESSION_OK':
                    self.start_pipeline()
                elif message.startswith('ERROR'):
                    print (message)
                    return 1
                else:
                    self.handle_sdp(message)
            return 0
        finally:
            # Always release the pipeline, including when an exception
            # propagates out of the message loop.
            self.close_pipeline()

    async def stop(self):
        """Close the signalling websocket if it is open."""
        if self.conn:
            await self.conn.close()
        self.conn = None
|
|
|
|
|
def check_plugins():
    """Check that every required GStreamer plugin is in the registry.

    The names of any plugins the registry cannot find are printed.

    Returns:
        True if all required plugins were found, False otherwise.
    """
    required = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
                "rtpmanager", "videotestsrc", "audiotestsrc"]
    absent = [plugin for plugin in required
              if Gst.Registry.get().find_plugin(plugin) is None]
    if absent:
        print('Missing gstreamer plugins:', absent)
        return False
    return True
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: initialise GStreamer, verify plugins, parse the
    # command line, then run the signalling client until the call ends.
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('peerid', help='String ID of the peer to connect to')
    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    args = parser.parse_args()
    our_id = random.randrange(10, 10000)
    c = WebRTCClient(our_id, args.peerid, args.server)
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(c.connect())
        res = loop.run_until_complete(c.loop())
    finally:
        # Close the websocket and the loop on every exit path.
        loop.run_until_complete(c.stop())
        loop.close()
    # The exit status is the value returned by WebRTCClient.loop().
    sys.exit(res)