]> ToastFreeware Gitweb - toast/stream2beamer.git/blobdiff - lagarde.py
Info about RTSP.
[toast/stream2beamer.git] / lagarde.py
index a5f8fbdffbc1e76c6954af16436002c28d8cd4bc..b8295953999531d93264eb07cd6333502800dd0b 100755 (executable)
@@ -58,14 +58,14 @@ class Lagarde:
         self.webrtcbin = None
 
     def on_negotiation_needed(self, element):
-        log.debug('on_negotiation_needed')
+        log.info('on_negotiation_needed')
 
     def on_ice_candidate(self, element, mlineindex, candidate):
-        log.debug('on_ice_candidate')
+        log.info('on_ice_candidate')
         self.generated_ice_candidates.put_nowait((mlineindex, candidate))
 
     def webrtcbin_pad_added(self, element, pad):
-        log.debug('webrtcbin_pad_added')
+        log.info('webrtcbin_pad_added')
         if pad.direction != Gst.PadDirection.SRC:
             return
         decodebin = Gst.ElementFactory.make('decodebin')
@@ -75,9 +75,9 @@ class Lagarde:
         self.webrtcbin.link(decodebin)
 
     def decodebin_pad_added(self, element, pad):
-        log.debug('decodebin_pad_added')
+        log.info('decodebin_pad_added')
         if not pad.has_current_caps():
-            log.debug(pad, 'has no caps, ignoring')
+            log.info(pad, 'has no caps, ignoring')
             return
 
         caps = pad.get_current_caps()
@@ -141,7 +141,8 @@ class Lagarde:
                     self.mids = [sdp_message.get_media(i).get_attribute_val('mid')
                                           for i in range(sdp_message.medias_len())]
                     sdp_answer = sdp_message.as_text()
-                    log.info(f'Send SDP answer:\n{sdp_answer}')
+                    log.info(f'Send SDP answer')
+                    log.debug(f'SDP answer:\n{sdp_answer}')
                     sdp_answer_msg = json.dumps({
                         'SessionID': self.session_id,
                         'Type': "gotAnswer",
@@ -174,7 +175,8 @@ class Lagarde:
                         'Type': 'addCalleeIceCandidate',
                         'Value': icemsg_value,
                     })
-                    log.info(f'Send ICE candidate: {icemsg_value}')
+                    log.info(f'Send ICE candidate')
+                    log.debug(f'ICE candidate: {icemsg_value}')
                     await self.websocket.send(icemsg)
 
                 else:
@@ -182,50 +184,57 @@ class Lagarde:
         finally:
             self.pipe.set_state(Gst.State.NULL)
 
-    async def talk_to_websocket(self, uri):
-        async for msg in self.websocket:
-            msg_json = json.loads(msg)
-            msg_type = msg_json['Type']
-            msg_value = msg_json['Value']
-            assert self.session_id is None or self.session_id == msg_json['SessionID']
-            if msg_type == 'newSession':
-                self.session_id = msg_json['SessionID']
-                log.info(f"New session {self.session_id}")
-            elif msg_type == 'gotOffer':
-                value_json = json.loads(msg_value)
-                sdp = value_json['sdp']
-                log.info(f'Got SDP offer:\n{sdp}')
-                self.sdp_offer = sdp
-            elif msg_type == 'addCallerIceCandidate':
-                value_json = json.loads(msg_value)
-                log.info(f'Got ICE candidate: {value_json}')
-                self.received_ice_candidates.put_nowait(value_json)
-            elif msg_type == 'roomNotFound':
-                log.error(f'The room was not found: {uri}')
-                return
-            elif msg_type == 'roomClosed':
-                log.info(f'Oh noes, the room went away (session {self.session_id})!')
-                self.session_id = None
-                return
-            else:
-                log.error(f'Unknown message type {msg_type}')
+    async def talk_to_websocket(self, uri, ssl_context):
+        async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket:
+            async for msg in self.websocket:
+                msg_json = json.loads(msg)
+                msg_type = msg_json['Type']
+                msg_value = msg_json['Value']
+                assert self.session_id is None or self.session_id == msg_json['SessionID']
+                if msg_type == 'newSession':
+                    self.session_id = msg_json['SessionID']
+                    log.info(f"New session {self.session_id}")
+                elif msg_type == 'gotOffer':
+                    value_json = json.loads(msg_value)
+                    sdp = value_json['sdp']
+                    log.info(f'Got SDP offer')
+                    log.debug(f'SDP offer:\n{sdp}')
+                    self.sdp_offer = sdp
+                elif msg_type == 'addCallerIceCandidate':
+                    value_json = json.loads(msg_value)
+                    log.info(f'Got ICE candidate')
+                    log.debug(f'ICE candidate: {value_json}')
+                    self.received_ice_candidates.put_nowait(value_json)
+                elif msg_type == 'roomNotFound':
+                    log.error(f'The room was not found: {uri}')
+                    return
+                elif msg_type == 'roomClosed':
+                    log.info(f'Oh noes, the room went away (session {self.session_id})!')
+                    self.session_id = None
+                    return
+                else:
+                    log.error(f'Unknown message type {msg_type}')
 
-    async def run(self, uri):
+    async def talk_to_signaling_server(self, uri):
         ssl_context = ssl.SSLContext()
         ssl_context.check_hostname = False
         ssl_context.verify_mode = ssl.CERT_NONE
+        while True:
+            await self.talk_to_websocket(uri, ssl_context)
+            await asyncio.sleep(0.1)
+
+    async def run(self, uri):
         try:
-            async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
-                talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
-                listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-                main_loop = asyncio.Task(gstreamer_main_loop())
-                done, pending = await asyncio.wait(
-                    [talk_to_websocket_task, listen_to_gstreamer_bus_task, main_loop],
-                    return_when=asyncio.FIRST_COMPLETED)
-                for d in done:
-                    d.result()
-                for p in pending:
-                    p.cancel()
+            talk_to_signaling_server_task = asyncio.Task(self.talk_to_signaling_server(uri))
+            listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
+            main_loop = asyncio.Task(gstreamer_main_loop())
+            done, pending = await asyncio.wait(
+                [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop],
+                return_when=asyncio.FIRST_COMPLETED)
+            for d in done:
+                d.result()
+            for p in pending:
+                p.cancel()
         except OSError as e:
             print(e)