Add wrapper that listens to the availability of the room.
[toast/stream2beamer.git] / lagarde.py
index 1994893fc2f81c9aa26587fa2912eedf1459ca36..c31a45a98958f760620b85f5a5f2b44c52fe9e6e 100755 (executable)
@@ -5,6 +5,7 @@ import asyncio
 import json
 import logging
 import ssl
+import queue
 from typing import Optional, List
 
 import gi
@@ -19,16 +20,38 @@ from gi.repository import GstWebRTC
 gi.require_version('GstSdp', '1.0')
 from gi.repository import GstSdp
 
+gi.require_version('GstRtspServer', '1.0')
+from gi.repository import Gst, GstRtspServer, GObject, GLib
+
 log = logging.getLogger(__name__)
 
 
+class GstreamerRtspServer():
+    def __init__(self):
+        server = GstRtspServer.RTSPServer()
+        server.set_address("::")
+        server.set_service('8554')  # port as string
+        factory = GstRtspServer.RTSPMediaFactory()
+        # factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        factory.set_launch("intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        factory.set_shared(True)
+        mountPoints = server.get_mount_points()
+        mountPoints.add_factory("/cug", factory)
+        server.attach()
+        self.server = server
+
+
 class Lagarde:
     def __init__(self):
         self.sdp_offer: Optional[str] = None
         self.websocket: Optional[websockets.client.WebSocketClientProtocol] = None
         self.session_id = None
-        self.received_ice_candidates = []
-        self.generated_ice_candidates = []
+        self.received_ice_candidates = queue.Queue()
+        self.generated_ice_candidates = queue.Queue()
         self.user_fragments: Optional[List] = None
         self.mids: Optional[List] = None
         self.pipe = None
@@ -39,7 +62,7 @@ class Lagarde:
 
     def on_ice_candidate(self, element, mlineindex, candidate):
         log.debug('on_ice_candidate')
-        self.generated_ice_candidates.append((mlineindex, candidate))
+        self.generated_ice_candidates.put_nowait((mlineindex, candidate))
 
     def webrtcbin_pad_added(self, element, pad):
         log.debug('webrtcbin_pad_added')
@@ -60,13 +83,12 @@ class Lagarde:
         caps = pad.get_current_caps()
         padsize = caps.get_size()
         for i in range(padsize):
-            s = caps.get_structure(i) # Gst.Structure
+            s = caps.get_structure(i)  # Gst.Structure
             name = s.get_name()
             if name.startswith('video'):
                 q = Gst.ElementFactory.make('queue')
                 conv = Gst.ElementFactory.make('videoconvert')
-                # sink = Gst.ElementFactory.make('autovideosink') # needs XDG_RUNTIME_DIR
-                sink = Gst.ElementFactory.make('xvimagesink')
+                sink = Gst.ElementFactory.make('intervideosink')
                 self.pipe.add(q)
                 self.pipe.add(conv)
                 self.pipe.add(sink)
@@ -74,23 +96,9 @@ class Lagarde:
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
                 conv.link(sink)
-            elif name.startswith('audio'):
-                q = Gst.ElementFactory.make('queue')
-                conv = Gst.ElementFactory.make('audioconvert')
-                resample = Gst.ElementFactory.make('audioresample')
-                sink = Gst.ElementFactory.make('autoaudiosink')
-                self.pipe.add(q)
-                self.pipe.add(conv)
-                self.pipe.add(resample)
-                self.pipe.add(sink)
-                self.pipe.sync_children_states()
-                pad.link(q.get_static_pad('sink'))
-                q.link(conv)
-                conv.link(resample)
-                resample.link(sink)
+                self.pipe.set_state(Gst.State.PLAYING)
 
     async def listen_to_gstreamer_bus(self):
-        Gst.init(None)
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
         Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
@@ -148,13 +156,13 @@ class Lagarde:
                     gst_promise.get_reply()
                     await self.websocket.send(sdp_answer_msg)
 
-                elif len(self.received_ice_candidates) > 0:
-                    ic = self.received_ice_candidates.pop(0)
+                elif self.received_ice_candidates.qsize() > 0:
+                    ic = self.received_ice_candidates.get_nowait()
                     if ic['candidate'] != '':
                         self.webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate'])
 
-                elif len(self.generated_ice_candidates) > 0:
-                    mlineindex, candidate = self.generated_ice_candidates.pop(0)
+                elif self.generated_ice_candidates.qsize() > 0:
+                    mlineindex, candidate = self.generated_ice_candidates.get_nowait()
                     icemsg_value = json.dumps({
                         "candidate": candidate,
                         "sdpMid": self.mids[mlineindex],
@@ -174,59 +182,80 @@ class Lagarde:
         finally:
             self.pipe.set_state(Gst.State.NULL)
 
-    async def talk_to_websocket(self, uri):
-        async for msg in self.websocket:
-            msg_json = json.loads(msg)
-            msg_type = msg_json['Type']
-            msg_value = msg_json['Value']
-            assert self.session_id is None or self.session_id == msg_json['SessionID']
-            if msg_type == 'newSession':
-                self.session_id = msg_json['SessionID']
-                log.info(f"New session {self.session_id}")
-            elif msg_type == 'gotOffer':
-                value_json = json.loads(msg_value)
-                sdp = value_json['sdp']
-                log.info(f'Got SDP offer:\n{sdp}')
-                self.sdp_offer = sdp
-            elif msg_type == 'addCallerIceCandidate':
-                value_json = json.loads(msg_value)
-                log.info(f'Got ICE candidate: {value_json}')
-                self.received_ice_candidates.append(value_json)
-            elif msg_type == 'roomNotFound':
-                log.error(f'The room was not found: {uri}')
-                return
-            elif msg_type == 'roomClosed':
-                log.info(f'Oh noes, the room went away (session {self.session_id})!')
-                self.session_id = None
-                return
-            else:
-                log.error(f'Unknown message type {msg_type}')
+    async def talk_to_websocket(self, uri, ssl_context):
+        async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket:
+            async for msg in self.websocket:
+                msg_json = json.loads(msg)
+                msg_type = msg_json['Type']
+                msg_value = msg_json['Value']
+                assert self.session_id is None or self.session_id == msg_json['SessionID']
+                if msg_type == 'newSession':
+                    self.session_id = msg_json['SessionID']
+                    log.info(f"New session {self.session_id}")
+                elif msg_type == 'gotOffer':
+                    value_json = json.loads(msg_value)
+                    sdp = value_json['sdp']
+                    log.info(f'Got SDP offer:\n{sdp}')
+                    self.sdp_offer = sdp
+                elif msg_type == 'addCallerIceCandidate':
+                    value_json = json.loads(msg_value)
+                    log.info(f'Got ICE candidate: {value_json}')
+                    self.received_ice_candidates.put_nowait(value_json)
+                elif msg_type == 'roomNotFound':
+                    log.error(f'The room was not found: {uri}')
+                    return
+                elif msg_type == 'roomClosed':
+                    log.info(f'Oh noes, the room went away (session {self.session_id})!')
+                    self.session_id = None
+                    return
+                else:
+                    log.error(f'Unknown message type {msg_type}')
 
-    async def run(self, uri):
+    async def talk_to_signaling_server(self, uri):
         ssl_context = ssl.SSLContext()
         ssl_context.check_hostname = False
         ssl_context.verify_mode = ssl.CERT_NONE
+        while True:
+            await self.talk_to_websocket(uri, ssl_context)
+            await asyncio.sleep(0.1)
+
+    async def run(self, uri):
         try:
-            async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
-                talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
-                listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-                done, pending = await asyncio.wait(
-                    [talk_to_websocket_task, listen_to_gstreamer_bus_task],
-                    return_when=asyncio.FIRST_COMPLETED)
-                for d in done:
-                    d.result()
-                for p in pending:
-                    p.cancel()
+            talk_to_signaling_server_task = asyncio.Task(self.talk_to_signaling_server(uri))
+            listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
+            main_loop = asyncio.Task(gstreamer_main_loop())
+            done, pending = await asyncio.wait(
+                [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop],
+                return_when=asyncio.FIRST_COMPLETED)
+            for d in done:
+                d.result()
+            for p in pending:
+                p.cancel()
         except OSError as e:
             print(e)
 
 
+async def gstreamer_main_loop():
+    """Does the equivalent of the following lines in an async friendly way:
+        loop = GLib.MainLoop()
+        loop.run()
+    """
+    gst_loop = GLib.MainLoop()
+    context = gst_loop.get_context()
+    while True:
+        events_dispatched = context.iteration(False)
+        await asyncio.sleep(0. if events_dispatched else 0.01)
+
+
 def main():
     logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
     parser = argparse.ArgumentParser()
     parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                         help='Signalling server URI')
     args = parser.parse_args()
+
+    Gst.init(None)
+    rtsp = GstreamerRtspServer()
     lagarde = Lagarde()
     asyncio.run(lagarde.run(args.uri), debug=True)