ToastFreeware Gitweb - toast/stream2beamer.git/commitdiff
Build pipeline on incoming connection. under-construction
author Philipp Spitzer <philipp@spitzer.priv.at>
Wed, 12 Aug 2020 21:53:44 +0000 (23:53 +0200)
committer Philipp Spitzer <philipp@spitzer.priv.at>
Wed, 12 Aug 2020 21:53:44 +0000 (23:53 +0200)
lagarde.py

index 84ff1bb7b4596acab110131214b8f49beedd4a32..d0a077cc70cb1d24106123a639700a3502bdc660 100755 (executable)
@@ -5,6 +5,7 @@ import asyncio
 import json
 import logging
 import ssl
+import threading
 from typing import Optional, List
 
 import gi
@@ -31,13 +32,15 @@ log = logging.getLogger(__name__)
 # https://stackoverflow.com/questions/52562499/is-it-possible-to-stream-an-existing-gstreamer-pipeline-through-gstrtspserver
 # https://stackoverflow.com/questions/59858898/how-to-convert-a-video-on-disk-to-a-rtsp-stream
 class CustomRTSPMediaFactory(GstRtspServer.RTSPMediaFactory):
-    def __init__(self, lagarde):
+    def __init__(self, lagarde, do_create_element_event: threading.Event):
         GstRtspServer.RTSPMediaFactory.__init__(self)
         self.lagarde = lagarde
-        log.info("CustomRTSPMediaFactory created")
+        self.do_create_element_event = do_create_element_event
 
     def do_create_element(self, url):
-        log.info("CustomRTSPMediaFactory do_create_element called")
+        log.info(f"CustomRTSPMediaFactory do_create_element called; thread ID: {threading.get_ident()}")
+        self.do_create_element_event.set()
+        self.lagarde.pipeline_is_ready_event.wait()
         return self.lagarde.pipe
 
 
@@ -53,15 +56,54 @@ class Lagarde:
         self.pipe = None
         self.webrtcbin = None
         self.loop = asyncio.get_event_loop()
+        self.pipeline_is_ready_event = threading.Event()
 
     def create_pipeline(self):
-        self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
-        Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
+        self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
+        self.decodebin = Gst.ElementFactory.make('decodebin')
+        self.q1 = Gst.ElementFactory.make('queue')
+        convert = Gst.ElementFactory.make('videoconvert')
+        capsfilter = Gst.ElementFactory.make('capsfilter')
+        enc = Gst.ElementFactory.make('theoraenc')
+        q2 = Gst.ElementFactory.make('queue')
+        pay = Gst.ElementFactory.make('rtptheorapay', 'pay0')
+
+        assert self.pipe and self.webrtcbin and self.decodebin and self.q1 and convert and capsfilter \
+               and enc and q2 and pay
+
+        self.pipe.add(self.webrtcbin)
+        self.pipe.add(self.decodebin)
+        self.pipe.add(self.q1)
+        self.pipe.add(convert)
+        self.pipe.add(capsfilter)
+        self.pipe.add(enc)
+        self.pipe.add(q2)
+        self.pipe.add(pay)
+
+        self.pipe.sync_children_states()
+
+        capsfilter.set_properties(Gst.Caps.from_string('video/x-raw,format=I420'))
+        pay.set_properties(pt=96)
+
+        ok = self.q1.link(convert)
+        assert ok
+        ok = convert.link(capsfilter)
+        assert ok
+        ok = capsfilter.link(enc)
+        assert ok
+        ok = enc.link(q2)
+        assert ok
+        ok = q2.link(pay)
+        assert ok
+
         self.webrtcbin.connect('on-negotiation-needed', self.on_negotiation_needed)
         self.webrtcbin.connect('on-ice-candidate', self.on_ice_candidate)
         self.webrtcbin.connect('on-new-transceiver', self.on_new_transceiver)
         self.webrtcbin.connect('pad-added', self.webrtcbin_pad_added)
+        self.decodebin.connect('pad-added', self.decodebin_pad_added)
+
+        self.pipeline_is_ready_event.set()
 
     def on_negotiation_needed(self, element):
         log.debug('on_negotiation_needed')
@@ -77,16 +119,11 @@ class Lagarde:
         log.debug('webrtcbin_pad_added')
         if pad.direction != Gst.PadDirection.SRC:
             return
-        decodebin = Gst.ElementFactory.make('decodebin')
-        assert decodebin
-        decodebin.connect('pad-added', self.decodebin_pad_added)
-        self.pipe.add(decodebin)
-        decodebin.sync_state_with_parent()
-        ok = self.webrtcbin.link(decodebin)
+        ok = self.webrtcbin.link(self.decodebin)
         assert ok
 
     def decodebin_pad_added(self, element, pad):
-        log.debug('decodebin_pad_added')
+        log.debug(f'decodebin_pad_added {pad.name}')
         if not pad.has_current_caps():
             log.debug(pad, 'has no caps, ignoring')
             return
@@ -99,55 +136,9 @@ class Lagarde:
             if name.startswith('video'):
                 # rtsp
                 # https://github.com/Enne2/PyGObject-GstRtspServer/blob/master/rtsp-server.py
-
-                q1 = Gst.ElementFactory.make('queue')
-                convert = Gst.ElementFactory.make('videoconvert')
-                capsfilter = Gst.ElementFactory.make('capsfilter')
-                capsfilter.set_properties(Gst.Caps.from_string('video/x-raw,format=I420'))
-                enc = Gst.ElementFactory.make('theoraenc')
-                q2 = Gst.ElementFactory.make('queue')
-                pay = Gst.ElementFactory.make('rtptheorapay')
-                pay.set_properties(name="pay0", pt=96)
-
-                assert q1 and convert and capsfilter and enc and q2 and pay
-
-                self.pipe.add(q1)
-                self.pipe.add(convert)
-                self.pipe.add(capsfilter)
-                self.pipe.add(enc)
-                self.pipe.add(q2)
-                self.pipe.add(pay)
-                self.pipe.sync_children_states()
-
-                pad_link_return = pad.link(q1.get_static_pad('sink'))
+                pad_link_return = pad.link(self.q1.get_static_pad('sink'))
                 assert pad_link_return == Gst.PadLinkReturn.OK
 
-                ok = q1.link(convert)
-                assert ok
-                ok = convert.link(capsfilter)
-                assert ok
-                ok = capsfilter.link(enc)
-                assert ok
-                ok = enc.link(q2)
-                assert ok
-                ok = q2.link(pay)
-                assert ok
-
-            elif name.startswith('audio'):
-                q = Gst.ElementFactory.make('queue')
-                conv = Gst.ElementFactory.make('audioconvert')
-                resample = Gst.ElementFactory.make('audioresample')
-                sink = Gst.ElementFactory.make('autoaudiosink')
-                self.pipe.add(q)
-                self.pipe.add(conv)
-                self.pipe.add(resample)
-                self.pipe.add(sink)
-                self.pipe.sync_children_states()
-                pad.link(q.get_static_pad('sink'))
-                q.link(conv)
-                conv.link(resample)
-                resample.link(sink)
-
     async def gstreamer_main_loop(self):
         gst_loop = GLib.MainLoop()
         context = gst_loop.get_context()
@@ -157,8 +148,10 @@ class Lagarde:
 
     async def listen_to_gstreamer_bus(self):
         self.create_pipeline()
-        self.pipe.set_state(Gst.State.PLAYING)
+        # self.pipe.set_state(Gst.State.PLAYING)
+        assert self.pipe
         bus = Gst.Pipeline.get_bus(self.pipe)
+        assert bus
         try:
             while True:
                 if bus.have_pending():
@@ -267,20 +260,26 @@ class Lagarde:
         ssl_context.check_hostname = False
         ssl_context.verify_mode = ssl.CERT_NONE
 
+        print(f'Main thread ID: {threading.get_ident()}')
+        do_create_element_event = threading.Event()
+
         # start RTSP server
         rtspserver = GstRtspServer.RTSPServer.new()
         rtspserver.set_address("::")
         rtspserver.set_service("8554")
-        factory = CustomRTSPMediaFactory(self)
+        factory = CustomRTSPMediaFactory(self, do_create_element_event)
         factory.set_shared(True)
         mounts = rtspserver.get_mount_points()
         mounts.add_factory("/cug", factory)
         rtspserver.attach()
+
+        gstreamer_main_loop_task = asyncio.create_task(self.gstreamer_main_loop())
+        await asyncio.get_event_loop().run_in_executor(None, do_create_element_event.wait)
         try:
             async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
                 talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
                 listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
-                gstreamer_main_loop_task = asyncio.Task(self.gstreamer_main_loop())
+
                 done, pending = await asyncio.wait(
                     [talk_to_websocket_task, listen_to_gstreamer_bus_task, gstreamer_main_loop_task],
                     return_when=asyncio.FIRST_COMPLETED)