Add RTSP server with videotestsrc.
authorPhilipp Spitzer <philipp@spitzer.priv.at>
Wed, 16 Sep 2020 19:56:28 +0000 (21:56 +0200)
committerPhilipp Spitzer <philipp@spitzer.priv.at>
Wed, 16 Sep 2020 19:56:28 +0000 (21:56 +0200)
RTSP and WebRTC are both working, but not connected yet.

lagarde.py

index 841f125516e2d27df70cce018351f918be8538a3..b8cd22bd5a76cb8bd216680d297918053f9f88e9 100755 (executable)
@@ -20,9 +20,26 @@ from gi.repository import GstWebRTC
 gi.require_version('GstSdp', '1.0')
 from gi.repository import GstSdp
 
 gi.require_version('GstSdp', '1.0')
 from gi.repository import GstSdp
 
+gi.require_version('GstRtspServer', '1.0')
+from gi.repository import Gst, GstRtspServer, GObject, GLib
+
 log = logging.getLogger(__name__)
 
 
 log = logging.getLogger(__name__)
 
 
+class GstreamerRtspServer():
+    def __init__(self):
+        server = GstRtspServer.RTSPServer()
+        server.set_address("::")
+        server.set_service('8554')  # port as string
+        factory = GstRtspServer.RTSPMediaFactory()
+        factory.set_launch("videotestsrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
+        factory.set_shared(True)
+        mountPoints = server.get_mount_points()
+        mountPoints.add_factory("/cug", factory)
+        server.attach()
+        self.server = server
+
+
 class Lagarde:
     def __init__(self):
         self.sdp_offer: Optional[str] = None
 class Lagarde:
     def __init__(self):
         self.sdp_offer: Optional[str] = None
@@ -91,7 +108,6 @@ class Lagarde:
                 resample.link(sink)
 
     async def listen_to_gstreamer_bus(self):
                 resample.link(sink)
 
     async def listen_to_gstreamer_bus(self):
-        Gst.init(None)
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
         Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
         self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
         self.pipe = Gst.Pipeline.new("pipeline")
         Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
@@ -211,8 +227,9 @@ class Lagarde:
             async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
                 talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
                 listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
             async with websockets.connect(uri, ssl=ssl_context) as self.websocket:
                 talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri))
                 listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus())
+                main_loop = asyncio.Task(gstreamer_main_loop())
                 done, pending = await asyncio.wait(
                 done, pending = await asyncio.wait(
-                    [talk_to_websocket_task, listen_to_gstreamer_bus_task],
+                    [talk_to_websocket_task, listen_to_gstreamer_bus_task, main_loop],
                     return_when=asyncio.FIRST_COMPLETED)
                 for d in done:
                     d.result()
                     return_when=asyncio.FIRST_COMPLETED)
                 for d in done:
                     d.result()
@@ -222,12 +239,27 @@ class Lagarde:
             print(e)
 
 
             print(e)
 
 
+async def gstreamer_main_loop():
+    """Does the equivalent of the following lines in an async friendly way:
+        loop = GLib.MainLoop()
+        loop.run()
+    """
+    gst_loop = GLib.MainLoop()
+    context = gst_loop.get_context()
+    while True:
+        events_dispatched = context.iteration(False)
+        await asyncio.sleep(0. if events_dispatched else 0.01)
+
+
 def main():
     logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
     parser = argparse.ArgumentParser()
     parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                         help='Signalling server URI')
     args = parser.parse_args()
 def main():
     logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
     parser = argparse.ArgumentParser()
     parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug',
                         help='Signalling server URI')
     args = parser.parse_args()
+
+    Gst.init(None)
+    rtsp = GstreamerRtspServer()
     lagarde = Lagarde()
     asyncio.run(lagarde.run(args.uri), debug=True)
 
     lagarde = Lagarde()
     asyncio.run(lagarde.run(args.uri), debug=True)