Exit when room closed.
[toast/stream2beamer.git] / lagarde.py
index 08c37b0f428adf66d4e0d09d92c04823f3de9229..487b88c43816a15a3744d4d218780c4ac64f7149 100755 (executable)
@@ -2,17 +2,13 @@
 
 import argparse
 import asyncio
-import datetime
 import json
 import logging
-import pathlib
 import ssl
-import sys
 from typing import Optional, List
 
-import websockets
-
 import gi
+import websockets
 
 gi.require_version('Gst', '1.0')
 from gi.repository import Gst
@@ -62,18 +58,18 @@ class Lagarde:
             return
 
         caps = pad.get_current_caps()
-        # assert (len(caps)) # we have a Gst.Caps object and it has no length
-        # s = caps[0] # also, it's not a list
         padsize = caps.get_size()
-        assert(padsize > 0)
-        for i in range(padsize): # pythonic?!
+        for i in range(padsize):
             s = caps.get_structure(i) # Gst.Structure
             name = s.get_name()
             if name.startswith('video'):
                 q = Gst.ElementFactory.make('queue')
                 conv = Gst.ElementFactory.make('videoconvert')
-                sink = Gst.ElementFactory.make('autovideosink')
-                self.pipe.add(q, conv, sink)
+                # sink = Gst.ElementFactory.make('autovideosink') # needs XDG_RUNTIME_DIR
+                sink = Gst.ElementFactory.make('xvimagesink')
+                self.pipe.add(q)
+                self.pipe.add(conv)
+                self.pipe.add(sink)
                 self.pipe.sync_children_states()
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
@@ -83,7 +79,10 @@ class Lagarde:
                 conv = Gst.ElementFactory.make('audioconvert')
                 resample = Gst.ElementFactory.make('audioresample')
                 sink = Gst.ElementFactory.make('autoaudiosink')
-                self.pipe.add(q, conv, resample, sink)
+                self.pipe.add(q)
+                self.pipe.add(conv)
+                self.pipe.add(resample)
+                self.pipe.add(sink)
                 self.pipe.sync_children_states()
                 pad.link(q.get_static_pad('sink'))
                 q.link(conv)
@@ -103,12 +102,13 @@ class Lagarde:
         try:
             while True:
                 if bus.have_pending():
-                    msg = bus.pop()  # Gst.Message, has to be unref'ed.
-                    if msg.type != Gst.MessageType.STATE_CHANGED:
-                        # log.info(f'Receive Gst.Message: {msg.type}, {msg.seqnum}, {msg.get_structure()}')
-                        # log.info(f'{webrtcbin.props.signaling_state} {webrtcbin.props.ice_gathering_state} {webrtcbin.props.ice_connection_state}')
-                        # Gst.Message.unref(msg)
-                        pass
+                    msg = bus.pop()
+                    if msg.type == Gst.MessageType.ERROR:
+                        log.error(f'Error from gstreamer message bus: {msg.get_structure()}')
+                        return
+                    elif msg.type == Gst.MessageType.EOS:  # end of stream
+                        log.info(f'Gstreamer message bus reports end of stream')
+                        return
                 elif self.sdp_offer is not None:
                     res, sm = GstSdp.SDPMessage.new()
                     assert res == GstSdp.SDPResult.OK
@@ -183,22 +183,24 @@ class Lagarde:
                 msg_json = json.loads(msg)
                 msg_type = msg_json['Type']
                 msg_value = msg_json['Value']
-                self.session_id = msg_json['SessionID']
                 log.info(f"receive for session {self.session_id} type {msg_type}")
                 if msg_type == 'newSession':
-                    pass
+                    self.session_id = msg_json['SessionID']
                 elif msg_type == 'gotOffer':
+                    assert self.session_id == msg_json['SessionID']
                     value_json = json.loads(msg_value)
                     sdp = value_json['sdp']
                     log.info(f'SDP: {sdp}')
                     self.sdp_offer = sdp
                 elif msg_type == 'addCallerIceCandidate':
+                    assert self.session_id == msg_json['SessionID']
                     value_json = json.loads(msg_value)
                     log.info(f'ICE: {value_json}')
                     self.received_ice_candidates.append(value_json)
                 elif msg_type == 'roomClosed':
                     log.info('Oh noes, the room went away!')
-                    # and here we should clean up
+                    self.session_id = None
+                    return
                 else:
                     log.error(f'Unknown message type {msg_type}')