+#!/usr/bin/python3
+
+import argparse
+import asyncio
+import json
+import logging
+import pathlib
+import ssl
+import sys
+from typing import Optional
+
+import websockets
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+gi.require_version('GstWebRTC', '1.0')
+from gi.repository import GstWebRTC
+gi.require_version('GstSdp', '1.0')
+from gi.repository import GstSdp
+
+
# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)


# Signalling state shared by the coroutines in this file: the websocket
# reader stores incoming data here and the GStreamer loop consumes it.
# Parsed ICE candidate messages, oldest first.
ice_candidates: list = []
# Most recent SDP offer text that has not been applied yet, or None.
sdp_offer: Optional[str] = None
+
+
def _build_remote_offer(sdp_text):
    """Parse *sdp_text* into a WebRTC session description of type OFFER.

    Raises:
        RuntimeError: if the SDP message cannot be allocated or parsed.
    """
    res, sdp_msg = GstSdp.SDPMessage.new()
    if res != GstSdp.SDPResult.OK:
        raise RuntimeError(f'Cannot allocate SDP message: {res}')
    res = GstSdp.sdp_message_parse_buffer(sdp_text.encode(), sdp_msg)
    if res != GstSdp.SDPResult.OK:
        raise RuntimeError(f'Cannot parse SDP offer: {res}')
    # The steps above can also be done this way in new versions of GStreamer:
    # GstSdp.SDPMessage.new_from_text(sdp_text)
    return GstWebRTC.WebRTCSessionDescription.new(
        GstWebRTC.WebRTCSDPType.OFFER, sdp_msg)


async def listen_to_gstreamer_bus():
    """Drive a ``webrtcbin`` pipeline from the module-level signalling state.

    Builds a pipeline containing a single ``webrtcbin`` element, sets it to
    PLAYING and then polls forever, handling at most one item per iteration
    in this priority order:

    1. a pending message on the pipeline bus (only logged);
    2. a pending ``sdp_offer``, which is applied as the remote description
       and then reset to ``None``;
    3. the oldest entry of ``ice_candidates``.

    When nothing is pending the coroutine sleeps for 0.1 s. The loop has no
    normal exit: it ends through cancellation or an exception, and the
    pipeline is set to the NULL state on the way out.

    Raises:
        RuntimeError: if the ``webrtcbin`` element cannot be created or the
            SDP offer cannot be parsed.
    """
    global sdp_offer, ice_candidates
    Gst.init(None)
    webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
    if webrtcbin is None:
        # make() signals failure by returning None instead of raising.
        raise RuntimeError(
            "Cannot create 'webrtcbin' element; "
            "is the GStreamer WebRTC plugin installed?")
    pipe = Gst.Pipeline.new("pipeline")
    # Use the public API rather than invoking the add_element vfunc directly.
    pipe.add(webrtcbin)
    bus = pipe.get_bus()
    pipe.set_state(Gst.State.PLAYING)
    try:
        while True:
            if bus.have_pending():
                # NOTE(review): the binding is expected to release the
                # message once it is no longer referenced, so there is no
                # manual unref here - confirm against PyGObject docs.
                msg = bus.pop()
                if msg is not None:
                    log.info(f'Receive Gst.Message: {msg.type}')
            elif sdp_offer is not None:
                remote_desc = _build_remote_offer(sdp_offer)
                webrtcbin.emit('set-remote-description', remote_desc, None)
                sdp_offer = None
            elif ice_candidates:
                ic = ice_candidates.pop(0)
                webrtcbin.emit('add-ice-candidate',
                               ic['sdpMLineIndex'], ic['candidate'])
            else:
                # Nothing to do: back off before polling again.
                await asyncio.sleep(0.1)
                continue
            # Work was done: yield once so other tasks (the websocket
            # reader) still get scheduled while items keep arriving.
            await asyncio.sleep(0)
    finally:
        pipe.set_state(Gst.State.NULL)
+
+
async def talk_to_websocket(uri):
    """Read signalling messages from *uri* and publish them to module state.

    Each incoming message is decoded as JSON and must contain the keys
    ``Type``, ``Value`` and ``SessionID``. Depending on ``Type``:

    * ``newSession`` - ignored;
    * ``gotOffer`` - ``Value`` is decoded as JSON and its ``sdp`` entry is
      stored in the module-level ``sdp_offer``;
    * ``addCallerIceCandidate`` - ``Value`` is decoded as JSON and appended
      to the module-level ``ice_candidates``;
    * anything else - logged as an error.

    The coroutine returns when the websocket iteration ends.

    Args:
        uri: Websocket URI of the signalling server (``ws://`` or ``wss://``).

    Raises:
        KeyError: if a message lacks one of the expected keys.
        json.JSONDecodeError: if a message or its ``Value`` is not valid JSON.
    """
    global sdp_offer, ice_candidates
    connect_kwargs = {}
    if uri.lower().startswith('wss:'):
        # SECURITY: hostname and certificate verification are switched off,
        # so the server is not authenticated. Acceptable only for servers
        # with self-signed certificates in a trusted environment.
        # An explicit protocol is given because the no-argument constructor
        # is deprecated; check_hostname must be cleared before verify_mode
        # can be set to CERT_NONE.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # Only hand over a TLS context for secure URIs; plain ws:// URIs
        # are connected without one.
        connect_kwargs['ssl'] = ssl_context
    async with websockets.connect(uri, **connect_kwargs) as websocket:
        async for msg in websocket:
            msg_json = json.loads(msg)
            msg_type = msg_json['Type']
            msg_value = msg_json['Value']
            session_id = msg_json['SessionID']
            log.info(f"receive for session {session_id} type {msg_type}")
            if msg_type == 'newSession':
                continue
            if msg_type == 'gotOffer':
                sdp = json.loads(msg_value)['sdp']
                log.info(f'SDP: {sdp}')
                sdp_offer = sdp
            elif msg_type == 'addCallerIceCandidate':
                candidate = json.loads(msg_value)
                log.info(f'ICE: {candidate}')
                ice_candidates.append(candidate)
            else:
                log.error(f'Unknown message type {msg_type}')
+
+
async def run(uri):
    """Run the websocket reader and the GStreamer loop until one of them ends.

    Both coroutines are started as tasks. As soon as the first one finishes
    (normally or with an exception) the other is cancelled and awaited, and
    then the result of the finished task is retrieved so that its exception,
    if any, propagates to the caller.

    Args:
        uri: Websocket URI of the signalling server.

    Raises:
        Exception: whatever the first finished task raised.
    """
    tasks = [
        asyncio.create_task(talk_to_websocket(uri)),
        asyncio.create_task(listen_to_gstreamer_bus()),
    ]
    try:
        done, _ = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Clean up even when this coroutine is itself cancelled or a task
        # failed: cancel whatever is still running and wait for it to unwind
        # so its own cleanup (e.g. stopping the pipeline) completes.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    for task in done:
        # Re-raises the task's exception, if it ended with one.
        task.result()
+
+
def main():
    """Command-line entry point.

    Sets up INFO-level logging, reads the optional ``--uri`` argument and
    runs the signalling/GStreamer coroutines in a debug-mode event loop.
    """
    logging.basicConfig(
        format='%(asctime)-15s %(message)s',
        level=logging.INFO,
    )
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--uri',
        help='Signalling server URI',
        default='wss://localhost:1234/ws_connect?id=cug',
    )
    uri = arg_parser.parse_args().uri
    asyncio.run(run(uri), debug=True)


# Only run when executed as a script, not when imported.
if __name__ == '__main__':
    main()