Exit when room closed.
[toast/stream2beamer.git] / lagarde.py
index a658a4ea27a1eb0da4ef7aadfff89ced674d43a0..487b88c43816a15a3744d4d218780c4ac64f7149 100755 (executable)
@@ -2,17 +2,13 @@
 
 import argparse
 import asyncio
-import datetime
 import json
 import logging
-import pathlib
 import ssl
-import sys
 from typing import Optional, List
 
-import websockets
-
 import gi
+import websockets
 
 gi.require_version('Gst', '1.0')
 from gi.repository import Gst
@@ -62,29 +58,36 @@ class Lagarde:
             return
 
         caps = pad.get_current_caps()
-        assert (len(caps))
-        s = caps[0]
-        name = s.get_name()
-        if name.startswith('video'):
-            q = Gst.ElementFactory.make('queue')
-            conv = Gst.ElementFactory.make('videoconvert')
-            sink = Gst.ElementFactory.make('autovideosink')
-            self.pipe.add(q, conv, sink)
-            self.pipe.sync_children_states()
-            pad.link(q.get_static_pad('sink'))
-            q.link(conv)
-            conv.link(sink)
-        elif name.startswith('audio'):
-            q = Gst.ElementFactory.make('queue')
-            conv = Gst.ElementFactory.make('audioconvert')
-            resample = Gst.ElementFactory.make('audioresample')
-            sink = Gst.ElementFactory.make('autoaudiosink')
-            self.pipe.add(q, conv, resample, sink)
-            self.pipe.sync_children_states()
-            pad.link(q.get_static_pad('sink'))
-            q.link(conv)
-            conv.link(resample)
-            resample.link(sink)
+        padsize = caps.get_size()
+        for i in range(padsize):
+            s = caps.get_structure(i) # Gst.Structure
+            name = s.get_name()
+            if name.startswith('video'):
+                q = Gst.ElementFactory.make('queue')
+                conv = Gst.ElementFactory.make('videoconvert')
+                # sink = Gst.ElementFactory.make('autovideosink') # needs XDG_RUNTIME_DIR
+                sink = Gst.ElementFactory.make('xvimagesink')
+                self.pipe.add(q)
+                self.pipe.add(conv)
+                self.pipe.add(sink)
+                self.pipe.sync_children_states()
+                pad.link(q.get_static_pad('sink'))
+                q.link(conv)
+                conv.link(sink)
+            elif name.startswith('audio'):
+                q = Gst.ElementFactory.make('queue')
+                conv = Gst.ElementFactory.make('audioconvert')
+                resample = Gst.ElementFactory.make('audioresample')
+                sink = Gst.ElementFactory.make('autoaudiosink')
+                self.pipe.add(q)
+                self.pipe.add(conv)
+                self.pipe.add(resample)
+                self.pipe.add(sink)
+                self.pipe.sync_children_states()
+                pad.link(q.get_static_pad('sink'))
+                q.link(conv)
+                conv.link(resample)
+                resample.link(sink)
 
     async def listen_to_gstreamer_bus(self):
         Gst.init(None)
@@ -99,12 +102,13 @@ class Lagarde:
         try:
             while True:
                 if bus.have_pending():
-                    msg = bus.pop()  # Gst.Message, has to be unref'ed.
-                    if msg.type != Gst.MessageType.STATE_CHANGED:
-                        # log.info(f'Receive Gst.Message: {msg.type}, {msg.seqnum}, {msg.get_structure()}')
-                        # log.info(f'{webrtcbin.props.signaling_state} {webrtcbin.props.ice_gathering_state} {webrtcbin.props.ice_connection_state}')
-                        # Gst.Message.unref(msg)
-                        pass
+                    msg = bus.pop()
+                    if msg.type == Gst.MessageType.ERROR:
+                        log.error(f'Error from gstreamer message bus: {msg.get_structure()}')
+                        return
+                    elif msg.type == Gst.MessageType.EOS:  # end of stream
+                        log.info(f'Gstreamer message bus reports end of stream')
+                        return
                 elif self.sdp_offer is not None:
                     res, sm = GstSdp.SDPMessage.new()
                     assert res == GstSdp.SDPResult.OK
@@ -115,7 +119,6 @@ class Lagarde:
                     gst_promise = Gst.Promise.new()
                     self.webrtcbin.emit('set-remote-description', rd, gst_promise)
                     gst_promise.wait()
-                    print(gst_promise.get_reply())
                     self.sdp_offer = None
 
                     log.info('create-answer')
@@ -180,19 +183,24 @@ class Lagarde:
                 msg_json = json.loads(msg)
                 msg_type = msg_json['Type']
                 msg_value = msg_json['Value']
-                self.session_id = msg_json['SessionID']
                 log.info(f"receive for session {self.session_id} type {msg_type}")
                 if msg_type == 'newSession':
-                    pass
+                    self.session_id = msg_json['SessionID']
                 elif msg_type == 'gotOffer':
+                    assert self.session_id == msg_json['SessionID']
                     value_json = json.loads(msg_value)
                     sdp = value_json['sdp']
                     log.info(f'SDP: {sdp}')
                     self.sdp_offer = sdp
                 elif msg_type == 'addCallerIceCandidate':
+                    assert self.session_id == msg_json['SessionID']
                     value_json = json.loads(msg_value)
                     log.info(f'ICE: {value_json}')
                     self.received_ice_candidates.append(value_json)
+                elif msg_type == 'roomClosed':
+                    log.info('Oh noes, the room went away!')
+                    self.session_id = None
+                    return
                 else:
                     log.error(f'Unknown message type {msg_type}')