From 31259b8120ae4f3068ca708215b2e50fe41a57f6 Mon Sep 17 00:00:00 2001 From: Philipp Spitzer Date: Thu, 18 Jun 2020 00:02:45 +0200 Subject: [PATCH] Start to work on a laplace client. --- laplace_client.py | 193 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 laplace_client.py diff --git a/laplace_client.py b/laplace_client.py new file mode 100644 index 0000000..63fb8b8 --- /dev/null +++ b/laplace_client.py @@ -0,0 +1,193 @@ +import argparse +import asyncio +import json +import logging +import pathlib +import ssl +import sys + +import websockets + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +gi.require_version('GstWebRTC', '1.0') +from gi.repository import GstWebRTC +gi.require_version('GstSdp', '1.0') +from gi.repository import GstSdp + + +log = logging.Logger(__name__) + + +class WebRTCClient: + + def __init__(self, uri: str): + self.uri = uri + self.ssl_context = ssl.SSLContext() + self.ssl_context.check_hostname = False + self.ssl_context.verify_mode = ssl.CERT_NONE + self.websocket = None + self.session_id = None + + def send_sdp_offer(self, offer): + text = offer.sdp.as_text() + log.info('Sending offer:\n%s' % text) + msg = json.dumps({ + 'SessionID': self.session_id, + 'Type': "gotAnswer", + 'Value': json.dumps({ + 'type': 'answer', + 'sdp': text + }) + }) + loop = asyncio.new_event_loop() + loop.run_until_complete(self.websocket.send(msg)) + loop.close() + + def on_offer_created(self, promise, _, __): + promise.wait() + reply = promise.get_reply() + offer = reply['offer'] + promise = Gst.Promise.new() + self.webrtc.emit('set-local-description', offer, promise) + promise.interrupt() + self.send_sdp_offer(offer) + + def on_negotiation_needed(self, element): + promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) + element.emit('create-offer', None, promise) + + def send_ice_candidate_message(self, _, mlineindex, candidate): + icemsg = json.dumps({ + 'SessionID': self.session_id, + 'Type': 'addCalleeIceCandidate', + 'Value': json.dumps({ + "candidate": candidate, + "sdpMid": "0", + "sdpMLineIndex": mlineindex, + }) + }) + loop = asyncio.new_event_loop() + loop.run_until_complete(self.websocket.send(icemsg)) + loop.close() + + def on_incoming_decodebin_stream(self, _, pad): + if not pad.has_current_caps(): + log.info(pad, 'has no caps, ignoring') + return + + caps = pad.get_current_caps() + assert (len(caps)) + s = caps[0] + name = s.get_name() + if name.startswith('video'): + q = Gst.ElementFactory.make('queue') + conv = Gst.ElementFactory.make('videoconvert') + sink = Gst.ElementFactory.make('autovideosink') + self.pipe.add(q, conv, sink) + self.pipe.sync_children_states() + pad.link(q.get_static_pad('sink')) + q.link(conv) + conv.link(sink) + elif name.startswith('audio'): + q = Gst.ElementFactory.make('queue') + conv = Gst.ElementFactory.make('audioconvert') + resample = Gst.ElementFactory.make('audioresample') + sink = Gst.ElementFactory.make('autoaudiosink') + self.pipe.add(q, conv, resample, sink) + self.pipe.sync_children_states() + pad.link(q.get_static_pad('sink')) + q.link(conv) + conv.link(resample) + resample.link(sink) + + def on_incoming_stream(self, _, pad): + if pad.direction != Gst.PadDirection.SRC: + return + decodebin = Gst.ElementFactory.make('decodebin') + decodebin.connect('pad-added', self.on_incoming_decodebin_stream) + self.pipe.add(decodebin) + decodebin.sync_state_with_parent() + self.webrtc.link(decodebin) + + def start_pipeline(self): + self.webrtc = Gst.ElementFactory.make('webrtcbin', 'laplace') + self.webrtc.set_property("bundle-policy", 3) + direction = GstWebRTC.WebRTCRTPTransceiverDirection.RECVONLY + caps = Gst.caps_from_string("application/x-rtp,media=video,encoding-name=VP8/9000,payload=96") + self.webrtc.emit('add-transceiver', direction, caps) + self.pipe = Gst.Pipeline.new("pipeline") + Gst.Bin.do_add_element(self.pipe, self.webrtc) + self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) + self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) + self.webrtc.connect('pad-added', self.on_incoming_stream) + self.pipe.set_state(Gst.State.PLAYING) + + def close_pipeline(self): + self.pipe.set_state(Gst.State.NULL) + self.pipe = None + self.webrtc = None + + def handle_sdp(self, sdp): + res, sdpmsg = GstSdp.SDPMessage.new() + GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) + answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) + promise = Gst.Promise.new() + self.webrtc.emit('set-remote-description', answer, promise) + promise.interrupt() + + def handle_ice(self, ice): + candidate = ice['candidate'] + sdpmlineindex = ice['sdpMLineIndex'] + self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) + + async def run(self): + async with websockets.connect(self.uri, ssl=self.ssl_context) as websocket: + self.websocket = websocket + self.start_pipeline() + async for msg in websocket: + msg_json = json.loads(msg) + msg_type = msg_json['Type'] + msg_value = msg_json['Value'] + session_id = msg_json['SessionID'] + log.info(f"receive for session {session_id} type {msg_type}") + if msg_type == 'newSession': + self.session_id = session_id + elif msg_type == 'gotOffer': + value_json = json.loads(msg_value) + sdp = value_json['sdp'] + self.handle_sdp(sdp) + elif msg_type == 'addCallerIceCandidate': + value_json = json.loads(msg_value) + self.handle_ice(value_json) + self.close_pipeline() + self.websocket = None + self.session_id = None + + +def check_plugins(): + for plugin in ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp", + "rtpmanager", "videotestsrc", "audiotestsrc"]: + if Gst.Registry.get().find_plugin(plugin) is None: + print('Missing gstreamer plugin:', plugin) + return False + return True + + +def main(): + logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s') + Gst.init(None) + if not check_plugins(): + sys.exit(1) + parser = argparse.ArgumentParser() + parser.add_argument('--uri', default='wss://localhost:2222/ws_connect?id=cug', + help='Signalling server URI') + args = parser.parse_args() + c = WebRTCClient(args.uri) + loop = asyncio.get_event_loop() + loop.run_until_complete(c.run()) + + +if __name__=='__main__': + main() -- 2.39.5