import logging
import ssl
import queue
-from typing import Optional, List
import gi
import websockets
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
-gi.require_version('GstRtspServer', '1.0')
-from gi.repository import Gst, GstRtspServer, GObject, GLib
-
log = logging.getLogger(__name__)
class RtspServer:
    """Minimal GStreamer RTSP server publishing one shared media mount.

    With the default arguments this builds the same server as before:
    bound to ``::`` on service ``8554``, serving an H.264 encoding of
    ``intervideosrc`` at the ``/cug`` mount point.

    Attributes:
        server: The underlying ``GstRtspServer.RTSPServer``.
        source_id: Value returned by ``RTSPServer.attach()``; ``0`` means
            the server could not be attached and will not serve clients.
    """

    # Launch line used when the caller does not supply one. The payloader
    # has to be called ``pay0`` for RTSPMediaFactory to pick it up as the
    # first stream.
    DEFAULT_LAUNCH = "intervideosrc ! decodebin ! x264enc ! queue ! rtph264pay pt=96 name=pay0"

    def __init__(self, address="::", service="8554", mount_path="/cug",
                 launch=None):
        """Create, configure and attach the RTSP server.

        Args:
            address: Address to bind to.
            service: Port number or service name. ``set_service`` wants a
                string, so integers are converted.
            mount_path: Path under which the media factory is mounted.
            launch: ``gst-launch`` style pipeline description for the
                media factory; ``None`` selects ``DEFAULT_LAUNCH``. Other
                encoders can be passed here, e.g.
                ``"intervideosrc ! decodebin ! theoraenc ! queue ! "
                "rtptheorapay name=pay0"``.
        """
        if launch is None:
            launch = self.DEFAULT_LAUNCH

        server = GstRtspServer.RTSPServer()
        server.set_address(address)
        server.set_service(str(service))

        factory = GstRtspServer.RTSPMediaFactory()
        factory.set_launch(launch)
        # One media pipeline is shared by all connected clients.
        factory.set_shared(True)

        mount_points = server.get_mount_points()
        mount_points.add_factory(mount_path, factory)

        # attach() returns the GSource id, or 0 when attaching failed.
        # Log the failure instead of dropping it silently.
        source_id = server.attach()
        if not source_id:
            log.error(
                "RTSP server failed to attach (address=%s, service=%s)",
                address, service,
            )

        self.server = server
        self.source_id = source_id
-
-
class Events:
def __init__(self):
self.sdp_offer = queue.Queue()
caps = pad.get_current_caps()
padsize = caps.get_size()
- log.info(f'>>>> {padsize} {caps}')
-
for i in range(padsize):
s = caps.get_structure(i) # Gst.Structure
name = s.get_name()
- log.info(f'###### {name}')
if name.startswith('video'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('videoconvert')
flmux = Gst.ElementFactory.make('flvmux')
sink = Gst.ElementFactory.make('rtmpsink')
sink.set_property('location', 'rtmp://192.168.1.46:1935/gregoa')
- # sink.set_property('location', 'rtmp://bla:1936/gregoa')
- print(sink.props.location, dir(sink.props))
assert q and conv and enc and capsfilter and flmux and sink
self.pipe.add(q)
pad_link_return = pad.link(q_pad_sink)
assert pad_link_return == Gst.PadLinkReturn.OK
- # ok = element.link(q)
- # assert ok
-
ok = q.link(conv)
assert ok
ok = conv.link(enc)
assert ok
self.pipe.set_state(Gst.State.PLAYING)
self.pipe.sync_children_states()
- #print(dir(Gst.DebugGraphDetails))
- #Gst.debug_bin_to_dot_data(element, Gst.DebugGraphDetails.ALL)
elif name.startswith('audio'):
q = Gst.ElementFactory.make('queue')
async def run(uri):
try:
events = Events()
- # rtsp = RtspServer()
webrtc = WebRTCClient(events)
signaling = SignalingClient(events, uri)
webrtc_task = asyncio.Task(webrtc.run())
signaling_task = asyncio.Task(signaling.run())
- done, pending = await asyncio.wait([webrtc_task, signaling_task],
- return_when=asyncio.FIRST_COMPLETED)
+ done, pending = await asyncio.wait([webrtc_task, signaling_task], return_when=asyncio.FIRST_COMPLETED)
for task in done:
task.result()