Add wrapper that listens to the availability of the room.
[toast/stream2beamer.git] / lagarde.py
index eca29c9f60512771d8543bb212964d561c99ce7e..c31a45a98958f760620b85f5a5f2b44c52fe9e6e 100755 (executable)
@@ -5,6 +5,7 @@ import asyncio
 import json
 import logging
 import ssl
+import queue
 from typing import Optional, List
 
 import gi
@@ -19,16 +20,38 @@ from gi.repository import GstWebRTC
 gi.require_version('GstSdp', '1.0')
 from gi.repository import GstSdp
 
+gi.require_version('GstRtspServer', '1.0')
+from gi.repository import Gst, GstRtspServer, GObject, GLib
+
 log = logging.getLogger(__name__)
 
 
+class GstreamerRtspServer():
+    def __init__(self):
+        server = GstRtspServer.RTSPServer()
+        server.set_address("::")
+        server.set_service('8554')  # port as string
+        factory = GstRtspServer.RTSPMediaFactory()
+        # factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        factory.set_launch("intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        factory.set_shared(True)
+        mountPoints = server.get_mount_points()
+        mountPoints.add_factory("/cug", factory)
+        server.attach()
+        self.server = server
+
+
 class Lagarde:
     def __init__(self):
         self.sdp_offer: Optional[str] = None
         self.websocket: Optional[websockets.client.WebSocketClientProtocol] = None
         self.session_id = None
-        self.received_ice_candidates = []
-        self.generated_ice_candidates = []
+        self.received_ice_candidates = queue.Queue()
+        self.generated_ice_candidates = queue.Queue()
         self.user_fragments: Optional[List] = None
         self.mids: Optional[List] = None
         self.pipe = None
@@ -39,7 +62,7 @@ class Lagarde:
 
     def on_ice_candidate(self, element, mlineindex, candidate):
         log.debug('on_ice_candidate')
-        self.generated_ice_candidates.append((mlineindex, candidate))
+        self.generated_ice_candidates.put_nowait((mlineindex, candidate))
 
     def webrtcbin_pad_added(self, element, pad):
         log.debug('webrtcbin_pad_added')
@@ -60,13 +83,12 @@ class Lagarde:
         caps = pad.get_current_caps()
         padsize = caps.get_size()
         for i in range(padsize):
-            s = caps.get_structure(i) # Gst.Structure
+            s = caps.get_structure(i)  # Gst.Structure
             name = s.get_name()
             if name.startswith('video'):
                 q = Gst.ElementFactory.make('queue')
                 conv = Gst.ElementFactory.make('videoconvert')
-                # sink = Gst.ElementFactory.make('autovideosink') # needs XDG_RUNTIME_DIR
-                sink = Gst.ElementFactory.make('xvimagesink')
+                sink = Gst.ElementFactory.make('intervideosink')
                 self.pipe.add(q)
                 self.pipe.add(conv)
                 self.pipe.add(sink)
@@ -74,23 +96,9 @@ class Lagarde:
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
                 conv.link(sink)
-            elif name.startswith('audio'):
-                q = Gst.ElementFactory.make('queue')
-                conv = Gst.ElementFactory.make('audioconvert')
-                resample = Gst.ElementFactory.make('audioresample')
-                sink = Gst.ElementFactory.make('autoaudiosink')
-                self.pipe.add(q)
-                self.pipe.add(conv)
-                self.pipe.add(resample)
-                self.pipe.add(sink)
-                self.pipe.sync_children_states()
-                pad.link(q.get_static_pad('sink'))
-                q.link(conv)
-                conv.link(resample)
-                resample.link(sink)
+                self.pipe.set_state(Gst.State.PLAYING)
 
     async def listen_to_gstreamer_bus(self):
-        Gst.init(None)
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
         Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
@@ -148,13 +156,13 @@ class Lagarde:
                     gst_promise.get_reply()
                     await self.websocket.send(sdp_answer_msg)
 
-                elif len(self.received_ice_candidates) > 0:
-                    ic = self.received_ice_candidates.pop(0)
+                elif self.received_ice_candidates.qsize() > 0:
+                    ic = self.received_ice_candidates.get_nowait()
                     if ic['candidate'] != '':
                         self.webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate'])
 
-                elif len(self.generated_ice_candidates) > 0:
-                    mlineindex, candidate = self.generated_ice_candidates.pop(0)
+                elif self.generated_ice_candidates.qsize() > 0:
+                    mlineindex, candidate = self.generated_ice_candidates.get_nowait()
                     icemsg_value = json.dumps({
                         "candidate": candidate,
                         "sdpMid": self.mids[mlineindex],
@@ -174,11 +182,8 @@ class Lagarde:
         finally:
             self.pipe.set_state(Gst.State.NULL)
 
-    async def talk_to_websocket(self, uri):
-        ssl_context = ssl.SSLContext()
-        ssl_context.check_hostname = False
-        ssl_context.verify_mode = ssl.CERT_NONE
-        async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
+    async def talk_to_websocket(self, uri, ssl_context):
+        async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket:
             async for msg in self.websocket:
                 msg_json = json.loads(msg)
                 msg_type = msg_json['Type']
@@ -195,7 +200,10 @@ class Lagarde:
                 elif msg_type == 'addCallerIceCandidate':
                     value_json = json.loads(msg_value)
                     log.info(f'Got ICE candidate: {value_json}')
-                    self.received_ice_candidates.append(value_json)
+                    self.received_ice_candidates.put_nowait(value_json)
+                elif msg_type == 'roomNotFound':
+                    log.error(f'The room was not found: {uri}')
+                    return
                 elif msg_type == 'roomClosed':
                     log.info(f'Oh noes, the room went away (session {self.session_id})!')
                     self.session_id = None
@@ -203,16 +211,40 @@ class Lagarde:
                 else:
                     log.error(f'Unknown message type {msg_type}')
 
+    async def talk_to_signaling_server(self, uri):
+        ssl_context = ssl.SSLContext()
+        ssl_context.check_hostname = False
+        ssl_context.verify_mode = ssl.CERT_NONE
+        while True:
+            await self.talk_to_websocket(uri, ssl_context)
+            await asyncio.sleep(0.1)
+
     async def run(self, uri):
-        talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
-        listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-        done, pending = await asyncio.wait(
-            [talk_to_websocket_task, listen_to_gstreamer_bus_task],
-            return_when=asyncio.FIRST_COMPLETED)
-        for d in done:
-            d.result()
-        for p in pending:
-            p.cancel()
+        try:
+            talk_to_signaling_server_task = asyncio.Task(self.talk_to_signaling_server(uri))
+            listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
+            main_loop = asyncio.Task(gstreamer_main_loop())
+            done, pending = await asyncio.wait(
+                [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop],
+                return_when=asyncio.FIRST_COMPLETED)
+            for d in done:
+                d.result()
+            for p in pending:
+                p.cancel()
+        except OSError as e:
+            print(e)
+
+
+async def gstreamer_main_loop():
+    """Does the equivalent of the following lines in an async friendly way:
+        loop = GLib.MainLoop()
+        loop.run()
+    """
+    gst_loop = GLib.MainLoop()
+    context = gst_loop.get_context()
+    while True:
+        events_dispatched = context.iteration(False)
+        await asyncio.sleep(0. if events_dispatched else 0.01)
 
 
 def main():
@@ -221,6 +253,9 @@ def main():
     parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                         help='Signalling server URI')
     args = parser.parse_args()
+
+    Gst.init(None)
+    rtsp = GstreamerRtspServer()
     lagarde = Lagarde()
     asyncio.run(lagarde.run(args.uri), debug=True)