#!/usr/bin/python3 import argparse import asyncio import json import logging import pathlib import ssl import sys from typing import Optional import websockets import gi gi.require_version('Gst', '1.0') from gi.repository import Gst gi.require_version('GstWebRTC', '1.0') from gi.repository import GstWebRTC gi.require_version('GstSdp', '1.0') from gi.repository import GstSdp log = logging.getLogger(__name__) sdp_offer: Optional[str] = None ice_candidates = [] async def listen_to_gstreamer_bus(): global sdp_offer, ice_candidates Gst.init(None) webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace') pipe = Gst.Pipeline.new("pipeline") Gst.Bin.do_add_element(pipe, webrtcbin) bus = Gst.Pipeline.get_bus(pipe) pipe.set_state(Gst.State.PLAYING) try: while True: if bus.have_pending(): msg = bus.pop() # Gst.Message, has to be unref'ed. log.info(f'Receive Gst.Message: {msg.type}') # Gst.Message.unref(msg) elif sdp_offer is not None: res, sm = GstSdp.SDPMessage.new() assert res == GstSdp.SDPResult.OK GstSdp.sdp_message_parse_buffer(bytes(sdp_offer.encode()), sm) # the three lines above can also be done this way in new versions of GStreamer: # sm = GstSdp.SDPMessage.new_from_text(sdp_offer) rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm) webrtcbin.emit('set-remote-description', rd, None) sdp_offer = None elif len(ice_candidates) > 0: ic = ice_candidates.pop(0) webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate']) else: await asyncio.sleep(0.1) finally: pipe.set_state(Gst.State.NULL) async def talk_to_websocket(uri): global sdp_offer, ice_candidates ssl_context = ssl.SSLContext() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE async with websockets.connect(uri, ssl=ssl_context) as websocket: async for msg in websocket: msg_json = json.loads(msg) msg_type = msg_json['Type'] msg_value = msg_json['Value'] session_id = msg_json['SessionID'] log.info(f"receive for session {session_id} type {msg_type}") if msg_type == 'newSession': pass elif msg_type == 'gotOffer': value_json = json.loads(msg_value) sdp = value_json['sdp'] log.info(f'SDP: {sdp}') sdp_offer = sdp elif msg_type == 'addCallerIceCandidate': value_json = json.loads(msg_value) log.info(f'ICE: {value_json}') ice_candidates.append(value_json) else: log.error(f'Unknown message type {msg_type}') async def run(uri): talk_to_websocket_task = asyncio.Task(talk_to_websocket(uri)) listen_to_gstreamer_bus_task = asyncio.Task(listen_to_gstreamer_bus()) done, pending = await asyncio.wait( [talk_to_websocket_task, listen_to_gstreamer_bus_task], return_when=asyncio.FIRST_COMPLETED) for d in done: d.result() for p in pending: p.cancel() def main(): logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s') parser = argparse.ArgumentParser() parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug', help='Signalling server URI') args = parser.parse_args() asyncio.run(run(args.uri), debug=True) if __name__ == '__main__': main()