X-Git-Url: https://git.toastfreeware.priv.at/toast/stream2beamer.git/blobdiff_plain/d3591c6bffd74dbe0ca413f8d12ebcd81ca9870b..e97741140341d1c98f6cb350d819a988c8b73bbd:/lagarde.py diff --git a/lagarde.py b/lagarde.py index e9fca23..f31bc2d 100755 --- a/lagarde.py +++ b/lagarde.py @@ -5,7 +5,7 @@ import json import logging import ssl import queue -from typing import Optional, List +from typing import List import gi import websockets @@ -19,31 +19,9 @@ from gi.repository import GstWebRTC gi.require_version('GstSdp', '1.0') from gi.repository import GstSdp -gi.require_version('GstRtspServer', '1.0') -from gi.repository import Gst, GstRtspServer, GObject, GLib - log = logging.getLogger(__name__) -class RtspServer: - def __init__(self): - server = GstRtspServer.RTSPServer() - server.set_address("::") - server.set_service('8554') # port as string - factory = GstRtspServer.RTSPMediaFactory() - # factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0") - # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0") - # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0") - # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0") - factory.set_launch("intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0") - # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0") - factory.set_shared(True) - mountPoints = server.get_mount_points() - mountPoints.add_factory("/cug", factory) - server.attach() - self.server = server - - class Events: def __init__(self): self.sdp_offer = queue.Queue() @@ -146,8 +124,9 @@ class SignalingClient: class WebRTCClient: - def __init__(self, events: Events): + def __init__(self, events: Events, rtmp_uri: str): self.events = events + self.rtmp_uri = rtmp_uri self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace') self.pipe = Gst.Pipeline.new("pipeline") Gst.Bin.do_add_element(self.pipe, self.webrtcbin) @@ -177,24 +156,65 @@ class WebRTCClient: if not pad.has_current_caps(): log.info(pad, 'has no caps, ignoring') return - caps = pad.get_current_caps() padsize = caps.get_size() + for i in range(padsize): s = caps.get_structure(i) # Gst.Structure name = s.get_name() if name.startswith('video'): q = Gst.ElementFactory.make('queue') conv = Gst.ElementFactory.make('videoconvert') - sink = Gst.ElementFactory.make('intervideosink') + enc = Gst.ElementFactory.make('x264enc') + enc.set_property('bitrate', 1000) + enc.set_property('tune', 'zerolatency') + capsfilter = Gst.ElementFactory.make('capsfilter') + capsfilter.set_properties(Gst.Caps.from_string('video/x-h264,stream-format=(string)avc')) + flvmux = Gst.ElementFactory.make('flvmux') + flvmux.set_property('streamable', True) + sink = Gst.ElementFactory.make('rtmpsink') + sink.set_property('location', self.rtmp_uri) + assert q and conv and enc and capsfilter and flvmux and sink + self.pipe.add(q) self.pipe.add(conv) + self.pipe.add(enc) + self.pipe.add(capsfilter) + self.pipe.add(flvmux) + self.pipe.add(sink) + + q_pad_sink = q.get_static_pad('sink') + assert q_pad_sink + pad_link_return = pad.link(q_pad_sink) + assert pad_link_return == Gst.PadLinkReturn.OK + + ok = q.link(conv) + assert ok + ok = conv.link(enc) + assert ok + ok = enc.link(capsfilter) + assert ok + ok = capsfilter.link(flvmux) + assert ok + ok = flvmux.link(sink) + assert ok + self.pipe.set_state(Gst.State.PLAYING) + self.pipe.sync_children_states() + + elif name.startswith('audio'): + q = Gst.ElementFactory.make('queue') + conv = Gst.ElementFactory.make('audioconvert') + resample = Gst.ElementFactory.make('audioresample') + sink = Gst.ElementFactory.make('autoaudiosink') + self.pipe.add(q) + self.pipe.add(conv) + self.pipe.add(resample) self.pipe.add(sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) - conv.link(sink) - self.pipe.set_state(Gst.State.PLAYING) + conv.link(resample) + resample.link(sink) def set_remote_desciption_done(self, gst_promise): gst_promise = Gst.Promise.new_with_change_func(self.create_answer_done) @@ -203,22 +223,28 @@ class WebRTCClient: def create_answer_done(self, gst_promise): reply = gst_promise.get_reply() answer = reply.get_value('answer') + gst_promise = Gst.Promise.new_with_change_func(self.set_local_description_done) + self.webrtcbin.emit('set-local-description', answer, gst_promise) + sdp_message = answer.sdp mids = [sdp_message.get_media(i).get_attribute_val('mid') - for i in range(sdp_message.medias_len())] + for i in range(sdp_message.medias_len())] user_fragments = [sdp_message.get_media(i).get_attribute_val('ice-ufrag') - for i in range(sdp_message.medias_len())] - self.events.sdp_info.put_nowait((mids, user_fragments)) + for i in range(sdp_message.medias_len())] sdp_answer = sdp_message.as_text() - log.info(f'Send SDP answer') - log.debug(f'SDP answer:\n{sdp_answer}') - self.events.sdp_answer.put_nowait(sdp_answer) - gst_promise = Gst.Promise.new_with_change_func(self.set_local_description_done) - self.webrtcbin.emit('set-local-description', answer, gst_promise) + self.mids_uf = mids, user_fragments + self.answer = sdp_answer def set_local_description_done(self, gst_promise): gst_promise.get_reply() + sdp_answer = self.answer + log.info(f'Send SDP answer') + log.debug(f'SDP answer:\n{sdp_answer}') + self.events.sdp_answer.put_nowait(sdp_answer) + mids, user_fragments = self.mids_uf + self.events.sdp_info.put_nowait((mids, user_fragments)) + async def run(self): bus = Gst.Pipeline.get_bus(self.pipe) self.pipe.set_state(Gst.State.PLAYING) @@ -234,11 +260,8 @@ class WebRTCClient: return elif self.events.sdp_offer.qsize() > 0: sdp_offer = self.events.sdp_offer.get_nowait() - res, sm = GstSdp.SDPMessage.new() + res, sm = GstSdp.SDPMessage.new_from_text(sdp_offer) assert res == GstSdp.SDPResult.OK - GstSdp.sdp_message_parse_buffer(bytes(sdp_offer.encode()), sm) - # the three lines above can also be done this way in new versions of GStreamer: - # sm = GstSdp.SDPMessage.new_from_text(sdp_offer) rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm) gst_promise = Gst.Promise.new_with_change_func(self.set_remote_desciption_done) self.webrtcbin.emit('set-remote-description', rd, gst_promise) @@ -258,38 +281,16 @@ class WebRTCClient: self.pipe.set_state(Gst.State.NULL) -async def gstreamer_main_loop(): - """Does the equivalent of the following lines in an async friendly way: - loop = GLib.MainLoop() - loop.run() - """ - gst_loop = GLib.MainLoop() - context = gst_loop.get_context() - while True: - events_dispatched = context.iteration(False) - await asyncio.sleep(0. if events_dispatched else 0.01) - - -async def run_repeated(task): - while True: - await task() - await asyncio.sleep(0.1) - - -async def run(uri): +async def run_room(laplace_uri: str, rtmp_uri: str): try: events = Events() + webrtc = WebRTCClient(events, rtmp_uri) + signaling = SignalingClient(events, laplace_uri) - rtsp = RtspServer() - webrtc = WebRTCClient(events) - signaling = SignalingClient(events, uri) - - main_loop_task = asyncio.Task(gstreamer_main_loop()) - webrtc_task = asyncio.Task(run_repeated(webrtc.run)) - signaling_task = asyncio.Task(run_repeated(signaling.run)) + webrtc_task = asyncio.Task(webrtc.run()) + signaling_task = asyncio.Task(signaling.run()) - done, pending = await asyncio.wait([main_loop_task, webrtc_task, signaling_task], - return_when=asyncio.FIRST_COMPLETED) + done, pending = await asyncio.wait([webrtc_task, signaling_task], return_when=asyncio.FIRST_COMPLETED) for task in done: task.result() @@ -299,15 +300,43 @@ async def run(uri): print(e) +async def run_room_repeated(laplace_uri: str, rtmp_uri: str, sleep_time: float): + while True: + await run_room(laplace_uri, rtmp_uri) + await asyncio.sleep(sleep_time) + + +async def run_rooms(laplace_base_uri: str, rtmp_base_uri: str, rooms: List[str], retry: bool): + tasks = [] + for room in rooms: + laplace_uri = laplace_base_uri + room # TODO: encode + rtmp_uri = rtmp_base_uri + room # TODO: encode + if retry: + tasks.append(run_room_repeated(laplace_uri, rtmp_uri, 2.)) + else: + tasks.append(run_room(laplace_uri, rtmp_uri)) + await asyncio.gather(*tasks) + + def main(): - logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s') + logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s %(message)s') + default_source = 'wss://localhost:1234/ws_connect?id=' + default_dest = 'rtmp://localhost:1935/' + default_room = 'cug' parser = argparse.ArgumentParser() - parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug', - help='Signalling server URI') + parser.add_argument('-s', '--source', default=default_source, + help=f'Laplace signalling websocket base URI, default: {default_source}') + parser.add_argument('-d', '--destination', default=default_dest, + help=f'RTMP server base URI, default: {default_dest}') + parser.add_argument('-r', '--retry', action='store_true', help=f'Retry forever if room not found or closed') + parser.add_argument('room', nargs='*', help=f'Room names to be used, "{default_room}" if omitted') args = parser.parse_args() Gst.init(None) - asyncio.run(run(args.uri), debug=True) + rooms = args.room + if len(rooms) == 0: + rooms = [default_room] + asyncio.run(run_rooms(args.source, args.destination, rooms, args.retry)) if __name__ == '__main__':