Last active
September 27, 2021 18:44
-
-
Save saket424/4920977cebaf49ab17c64a1162578eed to your computer and use it in GitHub Desktop.
To publish to videoroom 1234 as user gstwebrtcdemo, run 'python3 janusvideoroom.py --server wss://janus.example.com:8989 gstwebrtcdemo'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import ssl | |
import websockets | |
import asyncio | |
import os | |
import sys | |
import json | |
import argparse | |
import string | |
from websockets.exceptions import ConnectionClosed | |
import attr | |
@attr.s
class JanusEvent:
    """Common base for the typed gateway events built by the message parser."""

    # Value of the message's "sender" field; validated to be an int.
    sender = attr.ib(
        validator=attr.validators.instance_of(int),
    )
@attr.s
class PluginData(JanusEvent):
    """Plugin payload carried by a Janus ``event`` message."""

    # Name of the plugin that produced the data; validated to be a str.
    plugin = attr.ib(
        validator=attr.validators.instance_of(str),
    )
    # Plugin-specific payload, stored without validation.
    data = attr.ib()
    # Optional JSEP (SDP) description attached to the event, or None.
    jsep = attr.ib()
@attr.s
class WebrtcUp(JanusEvent):
    """Janus ``webrtcup`` notification; carries only the inherited ``sender``."""
@attr.s
class Media(JanusEvent):
    """Janus ``media`` notification for one media kind."""

    # Validated to be a bool.
    receiving = attr.ib(
        validator=attr.validators.instance_of(bool),
    )
    # Either "audio" or "video".
    kind = attr.ib(
        validator=attr.validators.in_(["audio", "video"]),
    )

    @kind.validator
    def validate_kind(self, attribute, kind):
        """Reject any ``kind`` other than "video" or "audio".

        This runs in addition to the ``in_`` validator declared on the
        attribute itself.
        """
        allowed = ["video", "audio"]
        if kind in allowed:
            return
        raise ValueError("kind must equal video or audio")
@attr.s
class SlowLink(JanusEvent):
    """Janus ``slowlink`` notification."""

    # Validated to be a bool.
    uplink = attr.ib(
        validator=attr.validators.instance_of(bool),
    )
    # Validated to be an int.
    lost = attr.ib(
        validator=attr.validators.instance_of(int),
    )
@attr.s
class HangUp(JanusEvent):
    """Janus ``hangup`` notification."""

    # Text taken from the message's "reason" field; validated to be a str.
    reason = attr.ib(
        validator=attr.validators.instance_of(str),
    )
@attr.s(cmp=False)
class Ack:
    """Janus ``ack`` reply; comparison methods are not generated (``cmp=False``)."""

    # Transaction string taken from the ack message; validated to be a str.
    transaction = attr.ib(
        validator=attr.validators.instance_of(str),
    )
@attr.s
class Jsep:
    """Session description: an SDP body plus its JSEP type."""

    # SDP payload, stored without validation.
    sdp = attr.ib()
    # One of "offer", "pranswer", "answer" or "rollback".
    type = attr.ib(
        validator=attr.validators.in_(["offer", "pranswer", "answer", "rollback"]),
    )
import gi

# Pin every GObject-introspection namespace to API version 1.0 before any of
# them is imported from gi.repository.
gi.require_version('Gst', '1.0')
gi.require_version('GstWebRTC', '1.0')
gi.require_version('GstSdp', '1.0')
from gi.repository import Gst, GstWebRTC, GstSdp
# gst-launch style description handed to Gst.parse_launch(): a test video
# source is H264-encoded, RTP-payloaded (payload type 97) and fed into the
# webrtcbin element named "sendrecv", which the client looks up by that name.
PIPELINE_DESC = '''
webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302 videotestsrc pattern=ball ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! x264enc ! rtph264pay !
queue ! application/x-rtp,media=video,encoding-name=H264,payload=97 ! sendrecv.
'''
# Alternative description that re-payloads H264 from an RTSP camera instead of
# encoding a test pattern; uncomment to use it.
#PIPELINE_DESC = '''
#webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302 rtspsrc location=rtsp://user2:passwd2@192.168.155.166:88/videoMain ! queue ! application/x-rtp, media=video, encoding-name=H264, payload=96 ! rtph264depay ! queue ! rtph264pay config-interval=1 ! application/x-rtp, media=video, encoding-name=H264, payload=97 ! sendrecv.
#'''
def transaction_id():
    """Return a random 8-character tag made of uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(8):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
@attr.s
class JanusGateway:
    """Small asyncio client for the Janus WebSocket ("janus-protocol") API.

    Call ``connect()`` to open the socket and create a session, then
    ``attach(plugin)`` to obtain a plugin handle. After that ``sendmessage`` /
    ``sendtrickle`` write requests and ``recv`` returns parsed gateway messages.

    Attributes set at runtime (not constructor arguments):
        conn: the websocket connection, assigned by ``connect``.
        session: session id returned by the gateway, assigned by ``connect``.
        handle: plugin handle id, assigned by ``attach``.
    """

    # WebSocket URI of the gateway (ws:// or wss://).
    server = attr.ib(validator=attr.validators.instance_of(str))
    #secure = attr.ib(default=True)
    # Parsed messages waiting to be handed out by ``recv``; nothing in this
    # class currently adds to it.
    _messages = attr.ib(factory=set)

    def _require_attached(self):
        """Assert that both ``connect`` and ``attach`` have completed."""
        assert hasattr(self, "session"), "Must connect before sending messages"
        assert hasattr(self, "handle"), "Must attach before sending messages"

    async def connect(self):
        """Open the websocket and create a Janus session.

        Raises:
            AssertionError: if the gateway does not answer "success" for the
                transaction that was sent.
        """
        connect_kwargs = {"subprotocols": ['janus-protocol']}
        # websockets raises ValueError when ``ssl`` is supplied for a plain
        # ws:// URI, so only hand over a TLS context for wss://.
        if self.server.lower().startswith("wss://"):
            # NOTE(review): a bare SSLContext() does not verify the server
            # certificate; kept as-is so self-signed gateways keep working.
            connect_kwargs["ssl"] = ssl.SSLContext()
        self.conn = await websockets.connect(self.server, **connect_kwargs)
        transaction = transaction_id()
        await self.conn.send(json.dumps({
            "janus": "create",
            "transaction": transaction
        }))
        resp = await self.conn.recv()
        print (resp)
        parsed = json.loads(resp)
        assert parsed["janus"] == "success", "Failed creating session"
        assert parsed["transaction"] == transaction, "Incorrect transaction"
        self.session = parsed["data"]["id"]

    async def close(self):
        """Close the websocket; a no-op when ``connect`` never succeeded."""
        conn = getattr(self, "conn", None)
        if conn is not None:
            await conn.close()

    async def attach(self, plugin):
        """Attach the session to ``plugin`` and remember the returned handle id.

        Raises:
            AssertionError: if called before ``connect`` or if the gateway does
                not answer "success" for the transaction that was sent.
        """
        assert hasattr(self, "session"), "Must connect before attaching to plugin"
        transaction = transaction_id()
        await self.conn.send(json.dumps({
            "janus": "attach",
            "session_id": self.session,
            "plugin": plugin,
            "transaction": transaction
        }))
        resp = await self.conn.recv()
        parsed = json.loads(resp)
        assert parsed["janus"] == "success", "Failed attaching to {}".format(plugin)
        assert parsed["transaction"] == transaction, "Incorrect transaction"
        self.handle = parsed["data"]["id"]

    async def sendtrickle(self, candidate):
        """Send one ICE ``candidate`` dict as a "trickle" request.

        The reply is not awaited here; it arrives through ``recv``.
        """
        self._require_attached()
        janus_message = {
            "janus": "trickle",
            "session_id": self.session,
            "handle_id": self.handle,
            "transaction": transaction_id(),
            "candidate": candidate
        }
        await self.conn.send(json.dumps(janus_message))

    async def sendmessage(self, body, jsep=None):
        """Send ``body`` to the attached plugin, optionally with a ``jsep`` dict.

        The reply is not awaited here; it arrives through ``recv``.
        """
        self._require_attached()
        janus_message = {
            "janus": "message",
            "session_id": self.session,
            "handle_id": self.handle,
            "transaction": transaction_id(),
            "body": body
        }
        if jsep is not None:
            janus_message["jsep"] = jsep
        await self.conn.send(json.dumps(janus_message))

    async def keepalive(self):
        """Send a "keepalive" request every 10 seconds until the socket closes."""
        self._require_attached()
        try:
            while True:
                await asyncio.sleep(10)
                await self.conn.send(json.dumps({
                    "janus": "keepalive",
                    "session_id": self.session,
                    "handle_id": self.handle,
                    "transaction": transaction_id()
                }))
        except (KeyboardInterrupt, ConnectionClosed):
            # A closed connection simply ends the keepalive loop instead of
            # leaving an unretrieved exception on the background task.
            return

    async def recv(self):
        """Return a queued message if one exists, otherwise read the next one."""
        if self._messages:
            return self._messages.pop()
        return await self._recv_and_parse()

    async def _recv_and_parse(self):
        """Read one websocket message and convert it to a typed event.

        Messages whose "janus" field is not recognised are returned as the raw
        decoded dict.
        """
        raw = json.loads(await self.conn.recv())
        janus = raw["janus"]
        if janus == "event":
            return PluginData(
                sender=raw["sender"],
                plugin=raw["plugindata"]["plugin"],
                data=raw["plugindata"]["data"],
                jsep=raw.get("jsep")
            )
        if janus == "webrtcup":
            return WebrtcUp(sender=raw["sender"])
        if janus == "media":
            return Media(
                sender=raw["sender"],
                receiving=raw["receiving"],
                kind=raw["type"]
            )
        if janus == "slowlink":
            return SlowLink(
                sender=raw["sender"],
                uplink=raw["uplink"],
                lost=raw["lost"]
            )
        if janus == "hangup":
            return HangUp(
                sender=raw["sender"],
                reason=raw["reason"]
            )
        if janus == "ack":
            return Ack(transaction=raw["transaction"])
        return raw
class WebRTCClient:
    """Publishes a GStreamer ``webrtcbin`` stream into Janus videoroom 1234.

    ``loop`` drives the whole session: it connects the signaling object,
    joins the room, starts the pipeline and dispatches incoming messages.
    The GStreamer signal callbacks hand their outgoing messages back to the
    asyncio loop that owns the websocket instead of running them on a
    throw-away loop.
    """

    def __init__(self, id_, peer_id, server, signaling):
        self.id_ = id_
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.peer_id = peer_id
        self.server = server or 'wss://127.0.0.1:8989'
        self.signaling = signaling
        self.request = None
        self.offermsg = None
        # asyncio loop that owns the signaling websocket; captured in ``loop``.
        self.event_loop = None

    @staticmethod
    def _report_send_error(future):
        """Print the exception of a failed background send, if any."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print ('Failed sending signaling message: %r' % (error,))

    def _send_soon(self, coro):
        """Run ``coro`` on the loop that owns the websocket.

        When that loop is running, the coroutine is scheduled thread-safely
        and this method returns immediately. Otherwise (the client is used
        without ``loop``) a temporary event loop runs it and is closed
        afterwards so it does not leak.
        """
        owner = self.event_loop
        if owner is not None and owner.is_running():
            future = asyncio.run_coroutine_threadsafe(coro, owner)
            future.add_done_callback(self._report_send_error)
            return
        temp_loop = asyncio.new_event_loop()
        try:
            temp_loop.run_until_complete(coro)
        finally:
            temp_loop.close()

    def send_sdp_offer(self, offer):
        """Send the local SDP ``offer`` to Janus inside a "publish" request."""
        text = offer.sdp.as_text()
        print ('Sending offer:\n%s' % text)
        # configure media
        self.request = {'request': 'publish', 'audio': True, 'video': True}
        self.offermsg = { 'sdp': text, 'trickle': True, 'type': 'offer' }
        print (self.offermsg)
        self._send_soon(self.signaling.sendmessage(self.request, self.offermsg))

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the created offer locally, then send it."""
        promise.wait()
        reply = promise.get_reply()
        offer = reply.get_value('offer')
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, promise)
        promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """Signal handler: ask ``element`` to create an SDP offer."""
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Signal handler: trickle a locally gathered ICE candidate to Janus."""
        icemsg = {'candidate': candidate, 'sdpMLineIndex': mlineindex, 'sdpMid': "0"}
        # NOTE(review): this feeds the locally gathered candidate back into our
        # own webrtcbin; kept from the original, confirm it is intended.
        self.webrtc.emit('add-ice-candidate', mlineindex, candidate)
        print (icemsg)
        self._send_soon(self.signaling.sendtrickle(icemsg))

    def _attach_sink_chain(self, pad, factory_names):
        """Create the named elements, add them to the pipeline and link them after ``pad``."""
        elements = [Gst.ElementFactory.make(name) for name in factory_names]
        for element in elements:
            self.pipe.add(element)
        self.pipe.sync_children_states()
        pad.link(elements[0].get_static_pad('sink'))
        for upstream, downstream in zip(elements, elements[1:]):
            upstream.link(downstream)

    def on_incoming_decodebin_stream(self, _, pad):
        """Signal handler: route a decoded pad to a video or audio sink chain."""
        if not pad.has_current_caps():
            print (pad, 'has no caps, ignoring')
            return
        caps = pad.get_current_caps()
        name = caps.to_string()
        if name.startswith('video'):
            self._attach_sink_chain(pad, ('queue', 'videoconvert', 'autovideosink'))
        elif name.startswith('audio'):
            self._attach_sink_chain(
                pad, ('queue', 'audioconvert', 'audioresample', 'autoaudiosink'))

    def on_incoming_stream(self, _, pad):
        """Signal handler: plug a decodebin onto each new source pad of webrtcbin."""
        if pad.direction != Gst.PadDirection.SRC:
            return
        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline from PIPELINE_DESC, hook up signals and start playing."""
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    async def handle_sdp(self, msg):
        """Apply a remote description (``sdp`` key) or a remote candidate (``ice`` key).

        Raises:
            AssertionError: if an SDP message is not of type "answer".
        """
        print (msg)
        if 'sdp' in msg:
            sdp = msg['sdp']
            assert(msg['type'] == 'answer')
            print ('Received answer:\n%s' % sdp)
            res, sdpmsg = GstSdp.SDPMessage.new()
            GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            self.webrtc.emit('set-remote-description', answer, promise)
            # The original referenced the method without calling it, which did nothing.
            promise.interrupt()
            # Candidates embedded in the answer SDP are added with m-line index 0.
            for line in sdp.splitlines():
                if line.startswith("a=candidate"):
                    candidate = line[2:]
                    print ('Received remote ice-candidate : %s\n' % candidate)
                    self.webrtc.emit('add-ice-candidate', 0, candidate)
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    async def _handle_message(self, msg):
        """Dispatch one message returned by ``signaling.recv()``."""
        if isinstance(msg, PluginData):
            if msg.jsep is not None:
                await self.handle_sdp(msg.jsep)
        elif isinstance(msg, (Media, WebrtcUp, SlowLink, HangUp)):
            print (msg)
        elif not isinstance(msg, Ack):
            # Anything else is the raw decoded dict, e.g. a trickled candidate.
            if 'candidate' in msg:
                ice = msg['candidate']
                print (ice)
                if 'candidate' in ice:
                    candidate = ice['candidate']
                    sdpmlineindex = ice['sdpMLineIndex']
                    self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
            print(msg)

    async def loop(self, signaling):
        """Join the videoroom as a publisher and process messages until closed.

        Returns None when the connection closes or on KeyboardInterrupt. The
        keepalive task is cancelled and the pipeline stopped on the way out.
        """
        await signaling.connect()
        await signaling.attach("janus.plugin.videoroom")
        # Remember the running loop so GStreamer callbacks can schedule sends on it.
        self.event_loop = asyncio.get_event_loop()
        keepalive_task = self.event_loop.create_task(signaling.keepalive())
        try:
            joinmessage = { "request": "join", "ptype": "publisher", "room": 1234, "display": self.peer_id }
            await signaling.sendmessage(joinmessage)
            assert signaling.conn
            self.start_pipeline()
            while True:
                try:
                    msg = await signaling.recv()
                    await self._handle_message(msg)
                except (KeyboardInterrupt, ConnectionClosed):
                    return
        finally:
            keepalive_task.cancel()
            if self.pipe is not None:
                self.pipe.set_state(Gst.State.NULL)
def check_plugins():
    """Return True when every required GStreamer plugin is in the registry.

    Prints the list of missing plugin names and returns False otherwise.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    missing = [name for name in needed
               if Gst.Registry.get().find_plugin(name) is None]
    if not missing:
        return True
    print('Missing gstreamer plugins:', missing)
    return False
if __name__=='__main__':
    # Entry point: parse arguments, then run the client until interrupted or
    # until the gateway closes the connection.
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('label', help='videoroom label')
    # A default is required: JanusGateway validates ``server`` as a str, so
    # omitting --server used to fail with a TypeError on None.
    parser.add_argument('--server', default='wss://127.0.0.1:8989',
                        help='Signalling server to connect to, eg "wss://127.0.0.1:8989"')
    args = parser.parse_args()
    our_id = random.randrange(10, 10000)
    signaling = JanusGateway(args.server)
    c = WebRTCClient(our_id, args.label, args.server, signaling)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(
            c.loop(signaling)
        )
    except KeyboardInterrupt:
        print("Interrupted, cleaning up")
    finally:
        # connect() may have failed before the websocket existed; closing
        # unconditionally would raise AttributeError and hide the real error.
        if getattr(signaling, "conn", None) is not None:
            loop.run_until_complete(signaling.close())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
websockets
gobject
PyGObject
attrs
Try ws://127.0.0.1:8188 or get a proper ssl cert
…On Thu, Nov 14, 2019 at 3:48 PM mayankag87 ***@***.***> wrote:
Hi,
I tried executing your python code but facing difficulties connecting to
the janus server which is installed on the same server I am executing your
code on.
I am using the following command:
***@***.*** to_Janus]# python3 janusvideoroom.py --server wss://
127.0.0.1:8989 gstwebrtcdemo
And I am getting the following error:
Interrupted, cleaning up
Traceback (most recent call last):
File "janusvideoroom.py", line 411, in
c.loop(signaling)
File "/usr/lib64/python3.6/asyncio/base_events.py", line 484, in
run_until_complete
return future.result()
File "janusvideoroom.py", line 345, in loop
await signaling.connect()
File "janusvideoroom.py", line 84, in connect
self.conn = await websockets.connect(self.server,
subprotocols=['janus-protocol'], ssl=ssl.SSLContext())
File "/root/.local/lib/python3.6/site-packages/websockets/client.py", line
535, in *await_impl*
transport, protocol = await self._create_connection()
File "/usr/lib64/python3.6/asyncio/base_events.py", line 794, in
create_connection
raise exceptions[0]
File "/usr/lib64/python3.6/asyncio/base_events.py", line 781, in
create_connection
yield from self.sock_connect(sock, address)
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 439, in
sock_connect
return (yield from fut)
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 469, in
_sock_connect_cb
raise OSError(err, 'Connect call failed %s' % (address,))
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 8989)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "janusvideoroom.py", line 417, in
loop.run_until_complete(signaling.close())
File "/usr/lib64/python3.6/asyncio/base_events.py", line 484, in
run_until_complete
return future.result()
File "janusvideoroom.py", line 98, in close
await self.conn.close()
AttributeError: 'JanusGateway' object has no attribute 'conn'
I am able to run the gstreamer webRTC demos, including the Python and C
demos. I am able to connect to Nirbheek's server using wss but not my own
server. I can confirm that Janus has websockets enabled.
Could you please help me figure out the issue here?
Thanks in advance
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<https://gist.github.com/4920977cebaf49ab17c64a1162578eed?email_source=notifications&email_token=ABCCOS2A4TLNN5ZTUYJZ5QLQTXBTDA5CNFSM4JNSFEHKYY3PNVWWK3TUL52HS4DFVNDWS43UINXW23LFNZ2KUY3PNVWWK3TUL5UWJTQAF4GVQ#gistcomment-3083608>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ABCCOS6YGG7WHMMBKLCL4I3QTXBTDANCNFSM4JNSFEHA>
.
I tried ws://127.0.0.1:8188. I am still getting the same error.
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 469, in _sock_connect_cb
raise OSError(err, 'Connect call failed %s' % (address,))
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 8188)
I have a ssl certificate on my server. I use it for HTTPS and it works fine.
Any other idea of what I might be missing here?
Thanks
make sure wss is enabled as transport
…On Thu, Nov 14, 2019 at 4:17 PM mayankag87 ***@***.***> wrote:
I tried ws://127.0.0.1:8188. I am still getting the same error.
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 469, in
_sock_connect_cb
raise OSError(err, 'Connect call failed %s' % (address,))
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 8188)
I have a ssl certificate on my server. I use it for HTTPS and it works
fine.
Any other idea of what I might be missing here?
Thanks
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<https://gist.github.com/4920977cebaf49ab17c64a1162578eed?email_source=notifications&email_token=ABCCOSY2VCQKJQT4SGKIHHTQTXFANA5CNFSM4JNSFEHKYY3PNVWWK3TUL52HS4DFVNDWS43UINXW23LFNZ2KUY3PNVWWK3TUL5UWJTQAF4GW4#gistcomment-3083630>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ABCCOS5HJGITY2DJECRDPSLQTXFANANCNFSM4JNSFEHA>
.
Yes, wss is enabled. However, there is some issue with the libwebsockets installation on my server. I can see that Janus is not able to load websockets properly. I will investigate this. Thanks.
@saket424 is there a way to receive a video/audio stream from other publishers?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
I tried executing your python code but facing difficulties connecting to the janus server which is installed on the same server I am executing your code on.
I am using the following command:
[root@dev-mtlrec01 to_Janus]# python3 janusvideoroom.py --server wss://127.0.0.1:8989 gstwebrtcdemo
And I am getting the following error:
Interrupted, cleaning up
Traceback (most recent call last):
File "janusvideoroom.py", line 411, in
c.loop(signaling)
File "/usr/lib64/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "janusvideoroom.py", line 345, in loop
await signaling.connect()
File "janusvideoroom.py", line 84, in connect
self.conn = await websockets.connect(self.server, subprotocols=['janus-protocol'], ssl=ssl.SSLContext())
File "/root/.local/lib/python3.6/site-packages/websockets/client.py", line 535, in await_impl
transport, protocol = await self._create_connection()
File "/usr/lib64/python3.6/asyncio/base_events.py", line 794, in create_connection
raise exceptions[0]
File "/usr/lib64/python3.6/asyncio/base_events.py", line 781, in create_connection
yield from self.sock_connect(sock, address)
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 439, in sock_connect
return (yield from fut)
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 469, in _sock_connect_cb
raise OSError(err, 'Connect call failed %s' % (address,))
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 8989)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "janusvideoroom.py", line 417, in
loop.run_until_complete(signaling.close())
File "/usr/lib64/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "janusvideoroom.py", line 98, in close
await self.conn.close()
AttributeError: 'JanusGateway' object has no attribute 'conn'
I am able to run the gstreamer webRTC demos, including the Python and C demos. I am able to connect to Nirbheek's server using wss but not my own server. I can confirm that Janus has websockets enabled.
Could you please help me figure out the issue here?
Thanks in advance