#!/usr/bin/python3
"""Receive-only WebRTC client built on GStreamer's ``webrtcbin``.

The client connects to a signalling server over a websocket, creates an SDP
offer with one receive-only video and one receive-only audio transceiver,
exchanges ICE candidates, and renders whatever media arrives through
``decodebin`` into automatic audio/video sinks.
"""
import argparse
import asyncio
import json
import logging
import pathlib
import ssl
import sys

import websockets
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp

log = logging.getLogger(__name__)


class WebRTCClient:
    """Signalling + media pipeline glue for a single WebRTC session."""

    def __init__(self, uri: str):
        """Store the signalling URI and prepare the TLS context.

        Args:
            uri: Websocket URI of the signalling server.
        """
        self.uri = uri
        # SECURITY: certificate and hostname verification are deliberately
        # disabled (as in the original code), so the peer is NOT
        # authenticated. check_hostname must be cleared before verify_mode
        # can be set to CERT_NONE.
        self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
        self.websocket = None
        self.session_id = None
        self.userfragments = []
        # Event loop that owns the websocket; captured in run() so that
        # callbacks fired outside that loop can schedule sends on it.
        self.loop = None
        # Created by start_pipeline(); kept as None until then so that
        # close_pipeline() is always safe to call.
        self.pipe = None
        self.webrtc = None

    @staticmethod
    def _report_send_result(future):
        """Log the outcome of a scheduled websocket send if it failed."""
        if future.cancelled():
            log.warning('websocket send was cancelled')
            return
        exc = future.exception()
        if exc is not None:
            log.error('websocket send failed: %s', exc)

    def _send_threadsafe(self, msg: str) -> None:
        """Schedule ``msg`` to be sent on the loop that owns the websocket.

        The webrtcbin callbacks are not coroutines, and the websocket is
        bound to the loop running :meth:`run`; driving ``send()`` from a
        freshly created loop is not safe. The send is scheduled without
        blocking so a callback that happens to run on the loop's own thread
        cannot deadlock.
        """
        websocket, loop = self.websocket, self.loop
        if websocket is None or loop is None:
            log.warning('dropping outgoing message: signalling connection is not open')
            return
        future = asyncio.run_coroutine_threadsafe(websocket.send(msg), loop)
        future.add_done_callback(self._report_send_result)

    def send_sdp_offer(self, offer):
        """Serialise the local session description and send it to the server.

        NOTE(review): the message is labelled 'gotAnswer'/'answer' although
        it carries the locally created offer; this is kept as-is because it
        appears to be what the signalling server expects -- confirm.
        """
        text = offer.sdp.as_text()
        log.info('send_sdp_offer with %s', text)
        msg = json.dumps({
            'SessionID': self.session_id,
            'Type': "gotAnswer",
            'Value': json.dumps({
                'type': 'answer',
                'sdp': text,
            }),
        })
        self._send_threadsafe(msg)

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the created offer locally and send it."""
        log.info('on_offer_created')
        promise.wait()
        reply = promise.get_reply()
        if reply is None:
            log.error('create-offer promise finished without a reply')
            return
        offer = reply.get_value('offer')

        # Record the per-media ICE username fragments *before* the local
        # description is applied, so they are already available when ICE
        # candidate callbacks start referring to them.
        sdp = offer.sdp
        self.userfragments = [
            sdp.get_media(i).get_attribute_val('ice-ufrag')
            for i in range(sdp.medias_len())
        ]

        local_promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, local_promise)
        local_promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """Signal handler: ask webrtcbin to create an offer."""
        log.info('on_negotiation_needed')
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Signal handler: forward a local ICE candidate to the server."""
        # Bounds-checked lookup: a missing fragment is sent as null instead
        # of raising IndexError inside the signal handler.
        if 0 <= mlineindex < len(self.userfragments):
            ufrag = self.userfragments[mlineindex]
        else:
            ufrag = None
        icemsg = json.dumps({
            'SessionID': self.session_id,
            'Type': 'addCalleeIceCandidate',
            'Value': json.dumps({
                "candidate": candidate,
                "sdpMid": str(mlineindex),
                "sdpMLineIndex": mlineindex,
                "usernameFragment": ufrag,
            }),
        })
        log.info('send_ice_candidate_message with %s', icemsg)
        self._send_threadsafe(icemsg)

    def _attach_sink_chain(self, pad, factory_names):
        """Create the named elements, add them to the pipeline, link them in
        order, and feed the chain from ``pad``."""
        elements = [Gst.ElementFactory.make(name) for name in factory_names]
        if any(element is None for element in elements):
            log.error('could not create one of the elements %s', list(factory_names))
            return
        self.pipe.add(*elements)
        self.pipe.sync_children_states()
        pad.link(elements[0].get_static_pad('sink'))
        for upstream, downstream in zip(elements, elements[1:]):
            upstream.link(downstream)

    def on_incoming_decodebin_stream(self, _, pad):
        """Signal handler: route a decoded pad to a video or audio sink."""
        log.info('on_incoming_decodebin_stream')
        if not pad.has_current_caps():
            log.info('%s has no caps, ignoring', pad)
            return

        caps = pad.get_current_caps()
        if caps is None or caps.get_size() == 0:
            log.warning('%s has empty caps, ignoring', pad)
            return

        name = caps.get_structure(0).get_name()
        if name.startswith('video'):
            self._attach_sink_chain(pad, ('queue', 'videoconvert', 'autovideosink'))
        elif name.startswith('audio'):
            self._attach_sink_chain(
                pad, ('queue', 'audioconvert', 'audioresample', 'autoaudiosink'))

    def on_incoming_stream(self, _, pad):
        """Signal handler: plug a decodebin onto a new webrtcbin source pad."""
        log.info('on_incoming_stream')
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline around webrtcbin and set it to PLAYING."""
        self.webrtc = Gst.ElementFactory.make('webrtcbin', 'laplace')
        # self.webrtc.set_property("bundle-policy", 3)

        direction = GstWebRTC.WebRTCRTPTransceiverDirection.RECVONLY
        # NOTE(review): 'VP8/9000' looks like it may be intended as
        # encoding-name=VP8 with clock-rate=90000; left untouched -- confirm.
        video_caps = Gst.caps_from_string(
            "application/x-rtp,media=video,encoding-name=VP8/9000,payload=96")
        audio_caps = Gst.caps_from_string(
            "application/x-rtp,media=audio,encoding-name=OPUS,clock-rate=48000,payload=111")
        self.webrtc.emit('add-transceiver', direction, video_caps)
        self.webrtc.emit('add-transceiver', direction, audio_caps)

        self.pipe = Gst.Pipeline.new("pipeline")
        Gst.Bin.do_add_element(self.pipe, self.webrtc)

        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)
        self.webrtc.emit('create-data-channel', 'laplace', None)

    def close_pipeline(self):
        """Stop the pipeline (if any) and drop the references to it."""
        if self.pipe is not None:
            self.pipe.set_state(Gst.State.NULL)
        self.pipe = None
        self.webrtc = None

    def handle_sdp(self, sdp):
        """Parse the remote SDP text and apply it as the remote description.

        NOTE(review): the remote description is applied with type ANSWER even
        though it arrives in a 'gotOffer' message; kept as-is -- confirm
        against the signalling server.
        """
        log.info('handle_sdp: %s', sdp)
        _, sdpmsg = GstSdp.SDPMessage.new()
        result = GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
        if result != GstSdp.SDPResult.OK:
            log.error('failed to parse remote SDP: %s', result)
            return
        answer = GstWebRTC.WebRTCSessionDescription.new(
            GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
        promise = Gst.Promise.new()
        self.webrtc.emit('set-remote-description', answer, promise)
        promise.interrupt()

    def handle_ice(self, ice):
        """Hand a remote ICE candidate (decoded JSON dict) to webrtcbin."""
        log.info('handle_ice: %s', ice)
        candidate = ice.get('candidate')
        sdpmlineindex = ice.get('sdpMLineIndex')
        if candidate is None or sdpmlineindex is None:
            # An incomplete message cannot be applied; skip it rather than
            # let a KeyError tear down the whole receive loop.
            log.warning('ignoring incomplete ICE candidate message')
            return
        self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    async def run(self):
        """Connect to the signalling server and process messages until the
        connection ends; the pipeline is always torn down on exit."""
        self.loop = asyncio.get_running_loop()
        async with websockets.connect(self.uri, ssl=self.ssl_context) as websocket:
            self.websocket = websocket
            try:
                self.start_pipeline()
                async for msg in websocket:
                    msg_json = json.loads(msg)
                    msg_type = msg_json['Type']
                    msg_value = msg_json['Value']
                    session_id = msg_json['SessionID']
                    log.info("receive for session %s type %s", session_id, msg_type)
                    if msg_type == 'newSession':
                        self.session_id = session_id
                    elif msg_type == 'gotOffer':
                        self.handle_sdp(json.loads(msg_value)['sdp'])
                    elif msg_type == 'addCallerIceCandidate':
                        self.handle_ice(json.loads(msg_value))
            finally:
                self.close_pipeline()
                self.websocket = None
                self.session_id = None


def check_plugins():
    """Return True if every required GStreamer plugin is installed.

    Every missing plugin is reported on stdout, not just the first one.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    registry = Gst.Registry.get()
    missing = [plugin for plugin in needed if registry.find_plugin(plugin) is None]
    for plugin in missing:
        print('Missing gstreamer plugin:', plugin)
    return not missing


def main():
    """Script entry point: check the environment, parse arguments, run."""
    logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                        help='Signalling server URI')
    args = parser.parse_args()
    c = WebRTCClient(args.uri)
    asyncio.run(c.run())


if __name__ == '__main__':
    main()