Play with new structure of laplace client.
author Philipp Spitzer <philipp@spitzer.priv.at>
Tue, 7 Jul 2020 21:21:28 +0000 (23:21 +0200)
committer Philipp Spitzer <philipp@spitzer.priv.at>
Tue, 7 Jul 2020 21:21:28 +0000 (23:21 +0200)
laplace_client_2.py

index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..a43c5a607c1febd907d1883f71a1211d4bf64d76 100644 (file)
@@ -0,0 +1,111 @@
+#!/usr/bin/python3
+
+import argparse
+import asyncio
+import json
+import logging
+import pathlib
+import ssl
+import sys
+from typing import Optional
+
+import websockets
+
+import gi
+gi.require_version('Gst', '1.0')
+from gi.repository import Gst
+gi.require_version('GstWebRTC', '1.0')
+from gi.repository import GstWebRTC
+gi.require_version('GstSdp', '1.0')
+from gi.repository import GstSdp
+
+
+# Module-level logger, named after this module.
+log = logging.getLogger(__name__)
+
+
+# Hand-over state shared by the two coroutines below: talk_to_websocket()
+# stores incoming signalling data here, listen_to_gstreamer_bus() consumes it.
+# SDP offer text not yet handed to webrtcbin; None when no offer is pending.
+sdp_offer: Optional[str] = None
+# Decoded ICE candidate messages not yet handed to webrtcbin, oldest first.
+ice_candidates = []
+
+
+async def listen_to_gstreamer_bus():
+    """Run a pipeline holding one webrtcbin and feed it signalling data.
+
+    Loops forever and handles one item per iteration, in this priority:
+
+    1. a pending message on the pipeline bus (it is only logged),
+    2. a pending offer in the module-level ``sdp_offer`` (parsed and set as
+       the remote description of webrtcbin),
+    3. the oldest entry of the module-level ``ice_candidates`` (added to
+       webrtcbin).
+
+    When nothing is pending the coroutine sleeps for 0.1 s; this is the only
+    point where it yields to other tasks. The pipeline is set to the NULL
+    state when the loop is left, for example when the task is cancelled.
+
+    Raises:
+        RuntimeError: if the webrtcbin element cannot be created or added
+            to the pipeline, or if no SDP message can be allocated.
+    """
+    global sdp_offer
+    Gst.init(None)
+    webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
+    if webrtcbin is None:
+        # make() returns None when the element could not be created.
+        raise RuntimeError("GStreamer element 'webrtcbin' is not available")
+    pipe = Gst.Pipeline.new("pipeline")
+    # Public API; do_add_element() is only the wrapper of the add_element
+    # virtual method and is not meant to be called by applications.
+    if not pipe.add(webrtcbin):
+        raise RuntimeError('Could not add webrtcbin to the pipeline')
+    bus = pipe.get_bus()
+    pipe.set_state(Gst.State.PLAYING)
+    try:
+        while True:
+            if bus.have_pending():
+                msg = bus.pop()
+                log.info(f'Receive Gst.Message: {msg.type}')
+            elif sdp_offer is not None:
+                # Claim the offer first so it is handled exactly once, even
+                # when it turns out to be unusable.
+                offer, sdp_offer = sdp_offer, None
+                res, sm = GstSdp.SDPMessage.new()
+                if res != GstSdp.SDPResult.OK:
+                    raise RuntimeError(f'Could not create SDP message: {res}')
+                # Newer GStreamer versions can do the allocation and parsing
+                # in one step: GstSdp.SDPMessage.new_from_text(offer)
+                res = GstSdp.sdp_message_parse_buffer(offer.encode(), sm)
+                if res != GstSdp.SDPResult.OK:
+                    # The offer comes from the remote peer: drop a malformed
+                    # one instead of handing it to webrtcbin.
+                    log.error(f'Ignoring SDP offer that cannot be parsed: {res}')
+                    continue
+                rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm)
+                webrtcbin.emit('set-remote-description', rd, None)
+            elif ice_candidates:
+                ic = ice_candidates.pop(0)
+                webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate'])
+            else:
+                await asyncio.sleep(0.1)
+    finally:
+        pipe.set_state(Gst.State.NULL)
+
+
+async def talk_to_websocket(uri):
+    """Receive signalling messages from the server and queue them.
+
+    Connects to ``uri`` and reads JSON messages until the connection ends.
+    Every message must have a ``Type`` field:
+
+    * ``newSession``: ignored.
+    * ``gotOffer``: ``Value`` is a JSON string; its ``sdp`` entry is stored
+      in the module-level ``sdp_offer``.
+    * ``addCallerIceCandidate``: ``Value`` is a JSON string; the decoded
+      object is appended to the module-level ``ice_candidates``.
+    * anything else is logged as an error.
+
+    Args:
+        uri: WebSocket URI of the signalling server. A TLS context is only
+            created and passed on for ``wss:`` URIs.
+    """
+    global sdp_offer
+    ssl_context = None
+    if uri.lower().startswith('wss:'):
+        # SECURITY: certificate and host name verification are switched off,
+        # so the server is not authenticated. That is acceptable only for
+        # self-signed development servers.
+        # check_hostname has to be cleared before verify_mode may be set to
+        # CERT_NONE.
+        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        ssl_context.check_hostname = False
+        ssl_context.verify_mode = ssl.CERT_NONE
+    async with websockets.connect(uri, ssl=ssl_context) as websocket:
+        async for msg in websocket:
+            msg_json = json.loads(msg)
+            msg_type = msg_json['Type']
+            # Only some message types use these fields, so do not insist on
+            # them being present.
+            msg_value = msg_json.get('Value')
+            session_id = msg_json.get('SessionID')
+            log.info(f"receive for session {session_id} type {msg_type}")
+            if msg_type == 'newSession':
+                pass
+            elif msg_type == 'gotOffer':
+                value_json = json.loads(msg_value)
+                sdp = value_json['sdp']
+                log.info(f'SDP: {sdp}')
+                sdp_offer = sdp
+            elif msg_type == 'addCallerIceCandidate':
+                value_json = json.loads(msg_value)
+                log.info(f'ICE: {value_json}')
+                ice_candidates.append(value_json)
+            else:
+                log.error(f'Unknown message type {msg_type}')
+
+
+async def run(uri):
+    """Run the websocket client and the GStreamer loop until one of them ends.
+
+    Both coroutines are started as tasks. As soon as the first one finishes
+    (normally or with an exception) the other one is cancelled and awaited,
+    so that its cleanup code has run before this coroutine returns.
+
+    Args:
+        uri: WebSocket URI of the signalling server.
+
+    Raises:
+        Whatever exception ended the task that finished first.
+    """
+    tasks = [
+        asyncio.create_task(talk_to_websocket(uri)),
+        asyncio.create_task(listen_to_gstreamer_bus()),
+    ]
+    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+    # Cancel before looking at results: result() may raise, and the
+    # remaining task must not be left running in that case.
+    for p in pending:
+        p.cancel()
+    # Wait for the cancellation to be processed; return_exceptions keeps the
+    # resulting CancelledError from propagating here.
+    await asyncio.gather(*pending, return_exceptions=True)
+    for d in done:
+        d.result()
+
+
+def main():
+    """Command line entry point: set up logging, read --uri and run the client."""
+    logging.basicConfig(
+        format='%(asctime)-15s %(message)s',
+        level=logging.INFO,
+    )
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument(
+        '--uri',
+        default='wss://localhost:1234/ws_connect?id=cug',
+        help='Signalling server URI',
+    )
+    options = arg_parser.parse_args()
+    # The event loop is run in asyncio debug mode.
+    asyncio.run(run(options.uri), debug=True)
+
+
+# Start the client only when executed as a script, not when imported.
+if __name__ == '__main__':
+    main()