X-Git-Url: https://git.toastfreeware.priv.at/toast/stream2beamer.git/blobdiff_plain/5d1faedf5e38f0cd3f5e1e4d9b77c05a6b896549..ade37d1eda7962b36c64b2dd79a0bf7c08053e6b:/laplace_client.py diff --git a/laplace_client.py b/laplace_client.py index aeca450..7378c62 100755 --- a/laplace_client.py +++ b/laplace_client.py @@ -1,3 +1,5 @@ +#!/usr/bin/python3 + import argparse import asyncio import json @@ -29,10 +31,11 @@ class WebRTCClient: self.ssl_context.verify_mode = ssl.CERT_NONE self.websocket = None self.session_id = None + self.userfragments = [] def send_sdp_offer(self, offer): text = offer.sdp.as_text() - log.info('Sending offer:\n%s' % text) + log.info(f'send_sdp_offer with {text}') msg = json.dumps({ 'SessionID': self.session_id, 'Type': "gotAnswer", @@ -46,15 +49,20 @@ class WebRTCClient: loop.close() def on_offer_created(self, promise, _, __): + log.info('on_offer_created') promise.wait() reply = promise.get_reply() - offer = reply['offer'] + offer = reply.get_value('offer') promise = Gst.Promise.new() self.webrtc.emit('set-local-description', offer, promise) promise.interrupt() self.send_sdp_offer(offer) + sdp = offer.sdp + self.userfragments = [sdp.get_media(i).get_attribute_val('ice-ufrag') for i in range(sdp.medias_len())] + def on_negotiation_needed(self, element): + log.info('on_negotiation_needed') promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) element.emit('create-offer', None, promise) @@ -64,15 +72,18 @@ class WebRTCClient: 'Type': 'addCalleeIceCandidate', 'Value': json.dumps({ "candidate": candidate, - "sdpMid": "0", + "sdpMid": f"{mlineindex}", "sdpMLineIndex": mlineindex, + "usernameFragment": self.userfragments[mlineindex], }) }) + log.info(f'send_ice_candidate_message with {icemsg}') loop = asyncio.new_event_loop() loop.run_until_complete(self.websocket.send(icemsg)) loop.close() def on_incoming_decodebin_stream(self, _, pad): + log.info('on_incoming_decodebin_stream') if not pad.has_current_caps(): log.info(pad, 'has no caps, ignoring') return @@ -103,6 +114,7 @@ class WebRTCClient: resample.link(sink) def on_incoming_stream(self, _, pad): + log.info('on_incoming_stream') if pad.direction != Gst.PadDirection.SRC: return decodebin = Gst.ElementFactory.make('decodebin') @@ -113,16 +125,19 @@ class WebRTCClient: def start_pipeline(self): self.webrtc = Gst.ElementFactory.make('webrtcbin', 'laplace') - self.webrtc.set_property("bundle-policy", 3) + # self.webrtc.set_property("bundle-policy", 3) direction = GstWebRTC.WebRTCRTPTransceiverDirection.RECVONLY - caps = Gst.caps_from_string("application/x-rtp,media=video,encoding-name=VP8/9000,payload=96") - self.webrtc.emit('add-transceiver', direction, caps) + video_caps = Gst.caps_from_string("application/x-rtp,media=video,encoding-name=VP8/9000,payload=96") + audio_caps = Gst.caps_from_string("application/x-rtp,media=audio,encoding-name=OPUS,clock-rate=48000,payload=111") + self.webrtc.emit('add-transceiver', direction, video_caps) + self.webrtc.emit('add-transceiver', direction, audio_caps) self.pipe = Gst.Pipeline.new("pipeline") Gst.Bin.do_add_element(self.pipe, self.webrtc) self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) self.webrtc.connect('pad-added', self.on_incoming_stream) self.pipe.set_state(Gst.State.PLAYING) + self.webrtc.emit('create-data-channel', 'laplace', None) def close_pipeline(self): self.pipe.set_state(Gst.State.NULL) @@ -130,6 +145,7 @@ class WebRTCClient: self.webrtc = None def handle_sdp(self, sdp): + log.info(f'handle_sdp: {sdp}') res, sdpmsg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) @@ -138,32 +154,37 @@ class WebRTCClient: promise.interrupt() def handle_ice(self, ice): + log.info(f'handle_ice: {ice}') candidate = ice['candidate'] sdpmlineindex = ice['sdpMLineIndex'] self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) async def run(self): - async with websockets.connect(self.uri, ssl=self.ssl_context) as websocket: - self.websocket = websocket - self.start_pipeline() - async for msg in websocket: - msg_json = json.loads(msg) - msg_type = msg_json['Type'] - msg_value = msg_json['Value'] - session_id = msg_json['SessionID'] - log.info(f"receive for session {session_id} type {msg_type}") - if msg_type == 'newSession': - self.session_id = session_id - elif msg_type == 'gotOffer': - value_json = json.loads(msg_value) - sdp = value_json['sdp'] - self.handle_sdp(sdp) - elif msg_type == 'addCallerIceCandidate': - value_json = json.loads(msg_value) - self.handle_ice(value_json) - self.close_pipeline() - self.websocket = None - self.session_id = None + try: + async with websockets.connect(self.uri, ssl=self.ssl_context) as websocket: + self.websocket = websocket + self.start_pipeline() + async for msg in websocket: + msg_json = json.loads(msg) + msg_type = msg_json['Type'] + msg_value = msg_json['Value'] + session_id = msg_json['SessionID'] + log.info(f"receive for session {session_id} type {msg_type}") + if msg_type == 'newSession': + self.session_id = session_id + elif msg_type == 'gotOffer': + value_json = json.loads(msg_value) + sdp = value_json['sdp'] + self.handle_sdp(sdp) + elif msg_type == 'addCallerIceCandidate': + value_json = json.loads(msg_value) + self.handle_ice(value_json) + self.close_pipeline() + self.websocket = None + self.session_id = None + except: + log.error(f'Connection to "{self.uri}" failed') + sys.exit(1) def check_plugins(): @@ -181,7 +202,7 @@ def main(): if not check_plugins(): sys.exit(1) parser = argparse.ArgumentParser() - parser.add_argument('--uri', default='wss://localhost:2222/ws_connect?id=cug', + parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug', help='Signalling server URI') args = parser.parse_args() c = WebRTCClient(args.uri)