Add a wrapper that keeps reconnecting to the signaling server, waiting for the room to become available.
[toast/stream2beamer.git] / lagarde.py
index 1724d290d2cb99562031c045a212d40e77e2dfcc..c31a45a98958f760620b85f5a5f2b44c52fe9e6e 100755 (executable)
@@ -32,7 +32,12 @@ class GstreamerRtspServer():
         server.set_address("::")
         server.set_service('8554')  # port as string
         factory = GstRtspServer.RTSPMediaFactory()
-        factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        factory.set_launch("intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
+        # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
         factory.set_shared(True)
         mountPoints = server.get_mount_points()
         mountPoints.add_factory("/cug", factory)
@@ -78,7 +83,7 @@ class Lagarde:
         caps = pad.get_current_caps()
         padsize = caps.get_size()
         for i in range(padsize):
-            s = caps.get_structure(i) # Gst.Structure
+            s = caps.get_structure(i)  # Gst.Structure
             name = s.get_name()
             if name.startswith('video'):
                 q = Gst.ElementFactory.make('queue')
@@ -91,6 +96,7 @@ class Lagarde:
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
                 conv.link(sink)
+                self.pipe.set_state(Gst.State.PLAYING)
 
     async def listen_to_gstreamer_bus(self):
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
@@ -176,50 +182,55 @@ class Lagarde:
         finally:
             self.pipe.set_state(Gst.State.NULL)
 
-    async def talk_to_websocket(self, uri):
-        async for msg in self.websocket:
-            msg_json = json.loads(msg)
-            msg_type = msg_json['Type']
-            msg_value = msg_json['Value']
-            assert self.session_id is None or self.session_id == msg_json['SessionID']
-            if msg_type == 'newSession':
-                self.session_id = msg_json['SessionID']
-                log.info(f"New session {self.session_id}")
-            elif msg_type == 'gotOffer':
-                value_json = json.loads(msg_value)
-                sdp = value_json['sdp']
-                log.info(f'Got SDP offer:\n{sdp}')
-                self.sdp_offer = sdp
-            elif msg_type == 'addCallerIceCandidate':
-                value_json = json.loads(msg_value)
-                log.info(f'Got ICE candidate: {value_json}')
-                self.received_ice_candidates.put_nowait(value_json)
-            elif msg_type == 'roomNotFound':
-                log.error(f'The room was not found: {uri}')
-                return
-            elif msg_type == 'roomClosed':
-                log.info(f'Oh noes, the room went away (session {self.session_id})!')
-                self.session_id = None
-                return
-            else:
-                log.error(f'Unknown message type {msg_type}')
+    async def talk_to_websocket(self, uri, ssl_context):
+        async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket:
+            async for msg in self.websocket:
+                msg_json = json.loads(msg)
+                msg_type = msg_json['Type']
+                msg_value = msg_json['Value']
+                assert self.session_id is None or self.session_id == msg_json['SessionID']
+                if msg_type == 'newSession':
+                    self.session_id = msg_json['SessionID']
+                    log.info(f"New session {self.session_id}")
+                elif msg_type == 'gotOffer':
+                    value_json = json.loads(msg_value)
+                    sdp = value_json['sdp']
+                    log.info(f'Got SDP offer:\n{sdp}')
+                    self.sdp_offer = sdp
+                elif msg_type == 'addCallerIceCandidate':
+                    value_json = json.loads(msg_value)
+                    log.info(f'Got ICE candidate: {value_json}')
+                    self.received_ice_candidates.put_nowait(value_json)
+                elif msg_type == 'roomNotFound':
+                    log.error(f'The room was not found: {uri}')
+                    return
+                elif msg_type == 'roomClosed':
+                    log.info(f'Oh noes, the room went away (session {self.session_id})!')
+                    self.session_id = None
+                    return
+                else:
+                    log.error(f'Unknown message type {msg_type}')
 
-    async def run(self, uri):
+    async def talk_to_signaling_server(self, uri):
         ssl_context = ssl.SSLContext()
         ssl_context.check_hostname = False
         ssl_context.verify_mode = ssl.CERT_NONE
+        while True:
+            await self.talk_to_websocket(uri, ssl_context)
+            await asyncio.sleep(0.1)
+
+    async def run(self, uri):
         try:
-            async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
-                talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
-                listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-                main_loop = asyncio.Task(gstreamer_main_loop())
-                done, pending = await asyncio.wait(
-                    [talk_to_websocket_task, listen_to_gstreamer_bus_task, main_loop],
-                    return_when=asyncio.FIRST_COMPLETED)
-                for d in done:
-                    d.result()
-                for p in pending:
-                    p.cancel()
+            talk_to_signaling_server_task = asyncio.Task(self.talk_to_signaling_server(uri))
+            listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
+            main_loop = asyncio.Task(gstreamer_main_loop())
+            done, pending = await asyncio.wait(
+                [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop],
+                return_when=asyncio.FIRST_COMPLETED)
+            for d in done:
+                d.result()
+            for p in pending:
+                p.cancel()
         except OSError as e:
             print(e)