10 from typing import Optional
15 gi.require_version('Gst', '1.0')
16 from gi.repository import Gst
17 gi.require_version('GstWebRTC', '1.0')
18 from gi.repository import GstWebRTC
19 gi.require_version('GstSdp', '1.0')
20 from gi.repository import GstSdp
# Module-level logger, named after this module.
23 log = logging.getLogger(__name__)
# Text of an SDP offer, or None when there is none. listen_to_gstreamer_bus()
# tests this for a non-None value and applies it to webrtcbin as the remote
# description.
26 sdp_offer: Optional[str] = None
# Coroutine: builds a pipeline containing a single 'webrtcbin' element, sets it
# to PLAYING, and then services three kinds of work in priority order:
#   1. a pending message on the pipeline bus (popped and logged),
#   2. a pending SDP offer in the module-level `sdp_offer`,
#   3. a queued ICE candidate in the module-level `ice_candidates`.
# Takes no arguments and returns nothing.
30 async def listen_to_gstreamer_bus():
# Both names are module-level state shared with talk_to_websocket(), which
# appends to ice_candidates.
31 global sdp_offer, ice_candidates
# Create the webrtcbin element, put it in a fresh pipeline and start it.
33 webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
34 pipe = Gst.Pipeline.new("pipeline")
35 Gst.Bin.do_add_element(pipe, webrtcbin)
36 bus = Gst.Pipeline.get_bus(pipe)
37 pipe.set_state(Gst.State.PLAYING)
# Bus messages take precedence over signalling work: at most one message is
# popped per pass and only its type is logged.
40 if bus.have_pending():
41 msg = bus.pop() # Gst.Message, has to be unref'ed.
42 log.info(f'Receive Gst.Message: {msg.type}')
43 # Gst.Message.unref(msg)
# An SDP offer is available: parse its text into an SDPMessage, wrap it as an
# OFFER session description and hand it to webrtcbin.
# NOTE(review): confirm sdp_offer is cleared once applied; otherwise this
# branch would fire again on every pass.
44 elif sdp_offer is not None:
45 res, sm = GstSdp.SDPMessage.new()
# NOTE(review): assert statements are stripped when Python runs with -O, so
# this check disappears in optimized mode.
46 assert res == GstSdp.SDPResult.OK
# The offer string is encoded to bytes before parsing into `sm`.
47 GstSdp.sdp_message_parse_buffer(bytes(sdp_offer.encode()), sm)
48 # the three lines above can also be done this way in new versions of GStreamer:
49 # sm = GstSdp.SDPMessage.new_from_text(sdp_offer)
50 rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm)
# None is passed as the signal's last argument (presumably the optional
# promise -- TODO confirm against the webrtcbin signal documentation).
51 webrtcbin.emit('set-remote-description', rd, None)
# ICE candidates are handled one per pass, oldest first (pop(0) removes the
# first list element). Each candidate is a mapping with 'sdpMLineIndex' and
# 'candidate' keys.
53 elif len(ice_candidates) > 0:
54 ic = ice_candidates.pop(0)
55 webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate'])
# Sleep 0.1 s, which also yields control to the asyncio event loop.
57 await asyncio.sleep(0.1)
# Shut the pipeline down by moving it to the NULL state.
59 pipe.set_state(Gst.State.NULL)
# Coroutine: connects to the signalling server at `uri` over a WebSocket and
# dispatches every incoming JSON message on its 'Type' field.
#   uri: WebSocket URI passed straight to websockets.connect().
# A message lacking the 'Type', 'Value' or 'SessionID' key raises KeyError from
# the subscript lookups below.
62 async def talk_to_websocket(uri):
63 global sdp_offer, ice_candidates
# TLS context with hostname checking disabled and certificate verification
# turned off (CERT_NONE), so any server certificate is accepted.
# NOTE(review): only appropriate for local / self-signed test servers. Also,
# calling ssl.SSLContext() without a protocol argument is deprecated since
# Python 3.10.
64 ssl_context = ssl.SSLContext()
65 ssl_context.check_hostname = False
66 ssl_context.verify_mode = ssl.CERT_NONE
67 async with websockets.connect(uri, ssl=ssl_context) as websocket:
# Iterate over messages as they arrive on the connection.
68 async for msg in websocket:
# Each message is a JSON object carrying 'Type', 'Value' and 'SessionID'.
69 msg_json = json.loads(msg)
70 msg_type = msg_json['Type']
71 msg_value = msg_json['Value']
72 session_id = msg_json['SessionID']
73 log.info(f"receive for session {session_id} type {msg_type}")
# Dispatch on the message type.
74 if msg_type == 'newSession':
# 'gotOffer': 'Value' is itself a JSON-encoded string; decode it and read the
# 'sdp' field, which is then logged.
# NOTE(review): confirm the SDP is stored in sdp_offer so that
# listen_to_gstreamer_bus() can pick it up.
76 elif msg_type == 'gotOffer':
77 value_json = json.loads(msg_value)
78 sdp = value_json['sdp']
79 log.info(f'SDP: {sdp}')
# 'addCallerIceCandidate': decode the candidate and queue it on the shared
# ice_candidates list, which listen_to_gstreamer_bus() drains.
81 elif msg_type == 'addCallerIceCandidate':
82 value_json = json.loads(msg_value)
83 log.info(f'ICE: {value_json}')
84 ice_candidates.append(value_json)
# Report a message type that is not handled.
86 log.error(f'Unknown message type {msg_type}')
# Wrap both coroutines in tasks so they run concurrently, then wait until the
# first of them completes (FIRST_COMPLETED). asyncio.wait() returns the sets of
# finished and still-pending tasks.
# NOTE(review): presumably the body of the `run(uri)` coroutine that is invoked
# via asyncio.run() -- confirm. The asyncio docs recommend
# asyncio.create_task() over instantiating asyncio.Task directly.
90 talk_to_websocket_task = asyncio.Task(talk_to_websocket(uri))
91 listen_to_gstreamer_bus_task = asyncio.Task(listen_to_gstreamer_bus())
92 done, pending = await asyncio.wait(
93 [talk_to_websocket_task, listen_to_gstreamer_bus_task],
94 return_when=asyncio.FIRST_COMPLETED)
# Command-line entry logic: configure INFO-level logging with a timestamped
# format, parse the optional --uri argument (signalling server URI, with a
# localhost default), and run the `run` coroutine to completion.
# NOTE(review): presumably the body of a `main()` function -- confirm.
102 logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
103 parser = argparse.ArgumentParser()
104 parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
105 help='Signalling server URI')
106 args = parser.parse_args()
# debug=True runs the event loop in asyncio debug mode.
107 asyncio.run(run(args.uri), debug=True)
110 if __name__ == '__main__':