import logging
import ssl
import queue
-from typing import Optional, List
import gi
import websockets
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
-gi.require_version('GstRtspServer', '1.0')
-from gi.repository import Gst, GstRtspServer, GObject, GLib
-
log = logging.getLogger(__name__)
# NOTE(review): every line of this class carries a leading '-': this excerpt is
# a patch, and RtspServer is being deleted by it.
# As written, the constructor creates a GstRtspServer.RTSPServer, sets its
# address to "::" and its service to '8554', mounts a single media factory at
# "/cug", attaches the server and keeps it on self.server.
-class RtspServer:
- def __init__(self):
- server = GstRtspServer.RTSPServer()
- server.set_address("::")
- server.set_service('8554') # port as string
- factory = GstRtspServer.RTSPMediaFactory()
# The commented-out set_launch() lines are alternative pipelines (theora vs.
# x264, with or without videorate/videoconvert); only the uncommented x264
# one below is active.
- # factory.set_launch("intervideosrc ! decodebin ! theoraenc ! queue ! rtptheorapay name=pay0")
- # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! theoraenc ! queue ! rtptheorapay name=pay0")
- # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,format=I420,framerate=10/1 ! theoraenc ! queue ! rtptheorapay name=pay0")
- # factory.set_launch("intervideosrc ! decodebin ! videoconvert ! video/x-raw,format=I420 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
- factory.set_launch("intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
- # factory.set_launch("intervideosrc ! decodebin ! videorate ! videoconvert ! video/x-raw,framerate=10/1 ! x264enc ! queue ! rtph264pay pt=96 name=pay0")
# NOTE(review): set_shared(True) presumably lets all clients share one media
# pipeline - confirm against the GstRtspServer documentation.
- factory.set_shared(True)
- mountPoints = server.get_mount_points()
- mountPoints.add_factory("/cug", factory)
- server.attach()
- self.server = server
-
-
# Event/queue holder that run() creates once and passes to both
# WebRTCClient and SignalingClient.
# NOTE(review): only `sdp_offer` is assigned in the visible lines, but
# WebRTCClient also uses `events.sdp_info`; further attributes are presumably
# elided from this patch excerpt - confirm against the full file.
class Events:
def __init__(self):
# Thread-safe stdlib queue (queue.Queue), not an asyncio queue.
self.sdp_offer = queue.Queue()
# WebRTC side of the bridge. This patch adds an `rtmp_uri` constructor
# argument, which the video branch below passes to rtmpsink's 'location'
# property in place of a hard-coded address.
# NOTE(review): the class is shown as patch fragments - several hunks are
# concatenated without headers and indentation is lost, so method boundaries
# are not visible. The gaps are marked below.
class WebRTCClient:
- def __init__(self, events: Events):
+ def __init__(self, events: Events, rtmp_uri: str):
self.events = events
+ self.rtmp_uri = rtmp_uri
# The webrtcbin element is named 'laplace' and added to a fresh pipeline.
self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
self.pipe = Gst.Pipeline.new("pipeline")
Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
# NOTE(review): fragment boundary. `pad` is not defined in the visible lines;
# presumably what follows is the body of a pad-added style callback - TODO confirm.
caps = pad.get_current_caps()
padsize = caps.get_size()
- log.info(f'>>>> {padsize} {caps}')
-
# One pass per caps structure; the structure name picks the video or audio branch.
for i in range(padsize):
s = caps.get_structure(i) # Gst.Structure
name = s.get_name()
- log.info(f'###### {name}')
if name.startswith('video'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('videoconvert')
# NOTE(review): `enc` and `capsfilter` are used below but their creation is
# not visible in this excerpt.
capsfilter.set_properties(Gst.Caps.from_string('video/x-h264,stream-format=(string)avc'))
flmux = Gst.ElementFactory.make('flvmux')
sink = Gst.ElementFactory.make('rtmpsink')
# The patch replaces the hard-coded RTMP address (and a debug print) with the
# URI given to the constructor.
- sink.set_property('location', 'rtmp://192.168.1.46:1935/gregoa')
- # sink.set_property('location', 'rtmp://bla:1936/gregoa')
- print(sink.props.location, dir(sink.props))
+ sink.set_property('location', self.rtmp_uri)
# ElementFactory.make results are checked for truthiness before use.
assert q and conv and enc and capsfilter and flmux and sink
self.pipe.add(q)
self.pipe.add(capsfilter)
self.pipe.add(flmux)
self.pipe.add(sink)
# The patch moves sync_children_states() from here (before linking) to after
# the pipeline is set to PLAYING further down.
- self.pipe.sync_children_states()
q_pad_sink = q.get_static_pad('sink')
assert q_pad_sink
# The incoming pad is linked directly to the queue's sink pad.
pad_link_return = pad.link(q_pad_sink)
assert pad_link_return == Gst.PadLinkReturn.OK
- # ok = element.link(q)
- # assert ok
-
ok = q.link(conv)
assert ok
# NOTE(review): the result of conv.link(enc) is overwritten by the next line
# before any visible assert, and the enc -> capsfilter -> flmux links are not
# visible; lines are presumably elided here - confirm against the full file.
ok = conv.link(enc)
ok = flmux.link(sink)
assert ok
self.pipe.set_state(Gst.State.PLAYING)
- #print(dir(Gst.DebugGraphDetails))
- #Gst.debug_bin_to_dot_data(element, Gst.DebugGraphDetails.ALL)
+ self.pipe.sync_children_states()
elif name.startswith('audio'):
q = Gst.ElementFactory.make('queue')
# NOTE(review): fragment boundary. `reply` is not defined in the visible lines;
# presumably this is the handler for a created SDP answer - TODO confirm.
answer = reply.get_value('answer')
sdp_message = answer.sdp
# Collect the 'mid' and 'ice-ufrag' attribute of every media section, in order.
# The -/+ pairs below differ only in continuation indentation in the real patch.
mids = [sdp_message.get_media(i).get_attribute_val('mid')
- for i in range(sdp_message.medias_len())]
+ for i in range(sdp_message.medias_len())]
user_fragments = [sdp_message.get_media(i).get_attribute_val('ice-ufrag')
- for i in range(sdp_message.medias_len())]
+ for i in range(sdp_message.medias_len())]
# Both lists are published together as one tuple on events.sdp_info.
self.events.sdp_info.put_nowait((mids, user_fragments))
sdp_answer = sdp_message.as_text()
log.info(f'Send SDP answer')
# NOTE(review): likely another fragment boundary; this sets the pipeline to NULL.
self.pipe.set_state(Gst.State.NULL)
# NOTE(review): deleted by this patch (all lines carry '-'). This old helper
# re-awaited a single coroutine function forever with a fixed 0.1 s pause; the
# patch introduces a different run_repeated(laplace_uri, rtmp_uri, sleep_time)
# further down that repeats the whole run() instead.
-async def run_repeated(task):
- while True:
- await task()
- await asyncio.sleep(0.1)
-
-
# One bridge session: build the shared Events object, the WebRTC client and
# the signalling client, run both concurrently and stop waiting as soon as
# either task finishes.
# The patch changes the signature from run(uri) to run(laplace_uri, rtmp_uri).
-async def run(uri):
+async def run(laplace_uri: str, rtmp_uri: str):
try:
events = Events()
+ webrtc = WebRTCClient(events, rtmp_uri)
+ signaling = SignalingClient(events, laplace_uri)
- # rtsp = RtspServer()
- webrtc = WebRTCClient(events)
- signaling = SignalingClient(events, uri)
-
# After the patch each client's run() is scheduled once; the per-task
# run_repeated wrapper is removed.
- webrtc_task = asyncio.Task(run_repeated(webrtc.run))
- signaling_task = asyncio.Task(run_repeated(signaling.run))
+ webrtc_task = asyncio.Task(webrtc.run())
+ signaling_task = asyncio.Task(signaling.run())
- done, pending = await asyncio.wait([webrtc_task, signaling_task],
- return_when=asyncio.FIRST_COMPLETED)
+ done, pending = await asyncio.wait([webrtc_task, signaling_task], return_when=asyncio.FIRST_COMPLETED)
# Task.result() re-raises the exception of a finished task, if it had one.
for task in done:
task.result()
# NOTE(review): `e` is not bound in the visible lines; an `except ... as e:`
# clause (and any handling of `pending`) is presumably elided from this
# excerpt - confirm against the full file.
print(e)
async def run_repeated(laplace_uri: str, rtmp_uri: str, sleep_time: float):
    """Run the bridge session over and over, pausing between attempts.

    Each pass awaits ``run(laplace_uri, rtmp_uri)`` to completion and then
    sleeps for ``sleep_time`` seconds before starting the next one. The loop
    has no exit condition of its own.

    Args:
        laplace_uri: Signalling websocket URI, forwarded unchanged to ``run``.
        rtmp_uri: RTMP destination URI, forwarded unchanged to ``run``.
        sleep_time: Pause between consecutive sessions, in seconds.
    """
    while True:
        await run(laplace_uri, rtmp_uri)
        await asyncio.sleep(sleep_time)
+
+
def main(argv=None):
    """Parse the command line and run the WebRTC-to-RTMP bridge.

    Configures DEBUG-level logging, reads the signalling source and RTMP
    destination from the command line, initialises GStreamer and then runs
    either a single session or, with ``--retry``, an endless retry loop.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the plain command-line behaviour, so existing
            ``main()`` callers are unaffected.
    """
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)-15s %(message)s')

    default_source = 'wss://localhost:1234/ws_connect?id=cug'
    default_dest = 'rtmp://localhost:1935/cug'
    retry_delay = 2.0  # seconds to wait between sessions when --retry is given

    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--source', default=default_source,
                        help=f'Laplace signalling websocket URI, default: {default_source}')
    parser.add_argument('-d', '--destination', default=default_dest,
                        help=f'RTMP server URI, default: {default_dest}')
    # Plain string: the original used an f-string with no placeholders here.
    parser.add_argument('-r', '--retry', action='store_true',
                        help='Retry forever if room not found or closed')
    args = parser.parse_args(argv)

    # GStreamer is initialised before any element is created by run().
    Gst.init(None)

    if args.retry:
        job = run_repeated(args.source, args.destination, retry_delay)
    else:
        job = run(args.source, args.destination)
    asyncio.run(job)
if __name__ == '__main__':