Add wrapper that listens to the availability of the room.
[toast/stream2beamer.git] / lagarde.py
index 841f125516e2d27df70cce018351f918be8538a3..c31a45a98958f760620b85f5a5f2b44c52fe9e6e 100755 (executable)
@@ -20,9 +20,31 @@ from gi.repository import GstWebRTC
 gi.require_version('GstSdp', '1.0')
 from gi.repository import GstSdp
 
+gi.require_version('GstRtspServer', '1.0')
+from gi.repository import Gst, GstRtspServer, GObject, GLib
+
 log = logging.getLogger(__name__)
 
 
+class GstreamerRtspServer:
+    """RTSP server that publishes the ``intervideosrc`` feed as an RTP stream.
+
+    The server is attached to a GLib main context in the constructor (no
+    context is passed to ``attach()``), so it only serves clients while
+    that context is being iterated.
+    """
+
+    # Alternative launch lines, kept for reference:
+    # "intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0"
+    # "intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0"
+    # "intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0"
+    # "intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0"
+    # "intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0"
+    DEFAULT_LAUNCH = "intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0"
+
+    def __init__(self, address="::", service='8554', mount_point="/cug", launch=DEFAULT_LAUNCH):
+        """Create the server, mount one shared media factory and attach it.
+
+        Args:
+            address: address the server binds to.
+            service: port to listen on, given as a string.
+            mount_point: path under which the media factory is mounted.
+            launch: launch line handed to the media factory.
+        """
+        server = GstRtspServer.RTSPServer()
+        server.set_address(address)
+        server.set_service(service)  # port as string
+        factory = GstRtspServer.RTSPMediaFactory()
+        factory.set_launch(launch)
+        factory.set_shared(True)
+        mount_points = server.get_mount_points()
+        mount_points.add_factory(mount_point, factory)
+        # attach() returns the id of the GLib source, which is 0 when the
+        # server could not be set up (e.g. the port is already in use).
+        # Report that instead of continuing silently without a server.
+        if not server.attach():
+            log.error(f'Could not attach the RTSP server (address {address}, service {service})')
+        self.server = server
+
+
 class Lagarde:
     def __init__(self):
         self.sdp_offer: Optional[str] = None
@@ -61,13 +83,12 @@ class Lagarde:
         caps = pad.get_current_caps()
         padsize = caps.get_size()
         for i in range(padsize):
-            s = caps.get_structure(i) # Gst.Structure
+            s = caps.get_structure(i)  # Gst.Structure
             name = s.get_name()
             if name.startswith('video'):
                 q = Gst.ElementFactory.make('queue')
                 conv = Gst.ElementFactory.make('videoconvert')
-                # sink = Gst.ElementFactory.make('autovideosink') # needs XDG_RUNTIME_DIR
-                sink = Gst.ElementFactory.make('xvimagesink')
+                sink = Gst.ElementFactory.make('intervideosink')
                 self.pipe.add(q)
                 self.pipe.add(conv)
                 self.pipe.add(sink)
@@ -75,23 +96,9 @@ class Lagarde:
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
                 conv.link(sink)
-            elif name.startswith('audio'):
-                q = Gst.ElementFactory.make('queue')
-                conv = Gst.ElementFactory.make('audioconvert')
-                resample = Gst.ElementFactory.make('audioresample')
-                sink = Gst.ElementFactory.make('autoaudiosink')
-                self.pipe.add(q)
-                self.pipe.add(conv)
-                self.pipe.add(resample)
-                self.pipe.add(sink)
-                self.pipe.sync_children_states()
-                pad.link(q.get_static_pad('sink'))
-                q.link(conv)
-                conv.link(resample)
-                resample.link(sink)
+                self.pipe.set_state(Gst.State.PLAYING)
 
     async def listen_to_gstreamer_bus(self):
-        Gst.init(None)
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
         Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
@@ -175,59 +182,80 @@ class Lagarde:
         finally:
             self.pipe.set_state(Gst.State.NULL)
 
-    async def talk_to_websocket(self, uri):
-        async for msg in self.websocket:
-            msg_json = json.loads(msg)
-            msg_type = msg_json['Type']
-            msg_value = msg_json['Value']
-            assert self.session_id is None or self.session_id == msg_json['SessionID']
-            if msg_type == 'newSession':
-                self.session_id = msg_json['SessionID']
-                log.info(f"New session {self.session_id}")
-            elif msg_type == 'gotOffer':
-                value_json = json.loads(msg_value)
-                sdp = value_json['sdp']
-                log.info(f'Got SDP offer:\n{sdp}')
-                self.sdp_offer = sdp
-            elif msg_type == 'addCallerIceCandidate':
-                value_json = json.loads(msg_value)
-                log.info(f'Got ICE candidate: {value_json}')
-                self.received_ice_candidates.put_nowait(value_json)
-            elif msg_type == 'roomNotFound':
-                log.error(f'The room was not found: {uri}')
-                return
-            elif msg_type == 'roomClosed':
-                log.info(f'Oh noes, the room went away (session {self.session_id})!')
-                self.session_id = None
-                return
-            else:
-                log.error(f'Unknown message type {msg_type}')
+    async def talk_to_websocket(self, uri, ssl_context):
+        """Open one signalling connection and handle its messages until it ends.
+
+        Every message is parsed as JSON and must carry the keys ``Type``,
+        ``Value`` and ``SessionID``.  Offers are stored in ``self.sdp_offer``
+        and ICE candidates are queued in ``self.received_ice_candidates``.
+
+        Args:
+            uri: URI passed to ``websockets.connect``.
+            ssl_context: SSL context passed to ``websockets.connect``.
+
+        Returns:
+            None, once the server reports ``roomNotFound`` or ``roomClosed``
+            or the message stream ends.
+        """
+        async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket:
+            # This method is called again after every disconnect.  A previous
+            # connection may have ended without a 'roomClosed' message, which
+            # would leave a stale id behind and make the assertion below fail
+            # on the first message of the new session.
+            self.session_id = None
+            async for msg in self.websocket:
+                msg_json = json.loads(msg)
+                msg_type = msg_json['Type']
+                msg_value = msg_json['Value']
+                assert self.session_id is None or self.session_id == msg_json['SessionID']
+                if msg_type == 'newSession':
+                    self.session_id = msg_json['SessionID']
+                    log.info(f"New session {self.session_id}")
+                elif msg_type == 'gotOffer':
+                    value_json = json.loads(msg_value)
+                    sdp = value_json['sdp']
+                    log.info(f'Got SDP offer:\n{sdp}')
+                    self.sdp_offer = sdp
+                elif msg_type == 'addCallerIceCandidate':
+                    value_json = json.loads(msg_value)
+                    log.info(f'Got ICE candidate: {value_json}')
+                    self.received_ice_candidates.put_nowait(value_json)
+                elif msg_type == 'roomNotFound':
+                    log.error(f'The room was not found: {uri}')
+                    return
+                elif msg_type == 'roomClosed':
+                    log.info(f'Oh noes, the room went away (session {self.session_id})!')
+                    self.session_id = None
+                    return
+                else:
+                    log.error(f'Unknown message type {msg_type}')
 
-    async def run(self, uri):
+    async def talk_to_signaling_server(self, uri):
+        """Keep talking to the signalling server at ``uri``, reconnecting forever.
+
+        Whenever ``talk_to_websocket`` returns, wait 0.1 s and call it again
+        with the same URI and SSL context.  The loop has no exit condition
+        of its own; it only ends through an exception or cancellation.
+        """
+        # NOTE(review): hostname checking and certificate verification are
+        # both switched off below, so the server is not authenticated --
+        # confirm this is intended.
         ssl_context = ssl.SSLContext()
         ssl_context.check_hostname = False
         ssl_context.verify_mode = ssl.CERT_NONE
+        while True:
+            await self.talk_to_websocket(uri, ssl_context)
+            # short pause before the next connection attempt
+            await asyncio.sleep(0.1)
+
+    async def run(self, uri):
+        """Run the signalling client, the bus listener and the GLib loop together.
+
+        The three coroutines are started as tasks.  As soon as the first one
+        finishes, the remaining ones are cancelled and the result of the
+        finished one is retrieved, which re-raises its exception if it
+        failed.  An ``OSError`` is printed instead of being propagated.
+
+        Args:
+            uri: signalling server URI handed to ``talk_to_signaling_server``.
+        """
         try:
-            async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
-                talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
-                listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-                done, pending = await asyncio.wait(
-                    [talk_to_websocket_task, listen_to_gstreamer_bus_task],
-                    return_when=asyncio.FIRST_COMPLETED)
-                for d in done:
-                    d.result()
-                for p in pending:
-                    p.cancel()
+            talk_to_signaling_server_task = asyncio.create_task(self.talk_to_signaling_server(uri))
+            listen_to_gstreamer_bus_task = asyncio.create_task(self.listen_to_gstreamer_bus())
+            main_loop = asyncio.create_task(gstreamer_main_loop())
+            done, pending = await asyncio.wait(
+                [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop],
+                return_when=asyncio.FIRST_COMPLETED)
+            # Cancel the survivors first, so that they are stopped even when
+            # result() below re-raises the exception of a failed task.
+            for p in pending:
+                p.cancel()
+            for d in done:
+                d.result()
         except OSError as e:
             print(e)
 
 
+async def gstreamer_main_loop():
+    """Drive the GLib main context from inside the asyncio event loop.
+
+    Async friendly stand-in for:
+        loop = GLib.MainLoop()
+        loop.run()
+
+    The context is iterated without blocking.  After an iteration that
+    dispatched events the coroutine yields and continues right away;
+    otherwise it sleeps for 10 ms before polling again.
+    """
+    glib_loop = GLib.MainLoop()
+    glib_context = glib_loop.get_context()
+    while True:
+        if glib_context.iteration(False):
+            pause = 0.
+        else:
+            pause = 0.01
+        await asyncio.sleep(pause)
+
+
 def main():
+    """Parse the command line, start the RTSP server and run the Lagarde client."""
     logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
     parser = argparse.ArgumentParser()
     parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                         help='Signalling server URI')
     args = parser.parse_args()
+
+    # GStreamer is initialised once here, before the objects below are created.
+    Gst.init(None)
+    # `rtsp` is not used again in this function; presumably the name only
+    # keeps the server object alive -- TODO confirm.
+    rtsp = GstreamerRtspServer()
     lagarde = Lagarde()
     asyncio.run(lagarde.run(args.uri), debug=True)