From 65152b22746b3111f0ea324dbbfcd211691e09f3 Mon Sep 17 00:00:00 2001 From: Philipp Spitzer Date: Tue, 7 Jul 2020 23:21:28 +0200 Subject: [PATCH] Play with new structure of laplace client. --- laplace_client_2.py | 111 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/laplace_client_2.py b/laplace_client_2.py index e69de29..a43c5a6 100644 --- a/laplace_client_2.py +++ b/laplace_client_2.py @@ -0,0 +1,111 @@ +#!/usr/bin/python3 + +import argparse +import asyncio +import json +import logging +import pathlib +import ssl +import sys +from typing import Optional + +import websockets + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +gi.require_version('GstWebRTC', '1.0') +from gi.repository import GstWebRTC +gi.require_version('GstSdp', '1.0') +from gi.repository import GstSdp + + +log = logging.getLogger(__name__) + + +sdp_offer: Optional[str] = None +ice_candidates = [] + + +async def listen_to_gstreamer_bus(): + global sdp_offer, ice_candidates + Gst.init(None) + webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace') + pipe = Gst.Pipeline.new("pipeline") + Gst.Bin.do_add_element(pipe, webrtcbin) + bus = Gst.Pipeline.get_bus(pipe) + pipe.set_state(Gst.State.PLAYING) + try: + while True: + if bus.have_pending(): + msg = bus.pop() # Gst.Message, has to be unref'ed. + log.info(f'Receive Gst.Message: {msg.type}') + # Gst.Message.unref(msg) + elif sdp_offer is not None: + res, sm = GstSdp.SDPMessage.new() + assert res == GstSdp.SDPResult.OK + GstSdp.sdp_message_parse_buffer(bytes(sdp_offer.encode()), sm) + # the three lines above can also be done this way in new versions of GStreamer: + # sm = GstSdp.SDPMessage.new_from_text(sdp_offer) + rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm) + webrtcbin.emit('set-remote-description', rd, None) + sdp_offer = None + elif len(ice_candidates) > 0: + ic = ice_candidates.pop(0) + webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate']) + else: + await asyncio.sleep(0.1) + finally: + pipe.set_state(Gst.State.NULL) + + +async def talk_to_websocket(uri): + global sdp_offer, ice_candidates + ssl_context = ssl.SSLContext() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + async with websockets.connect(uri, ssl=ssl_context) as websocket: + async for msg in websocket: + msg_json = json.loads(msg) + msg_type = msg_json['Type'] + msg_value = msg_json['Value'] + session_id = msg_json['SessionID'] + log.info(f"receive for session {session_id} type {msg_type}") + if msg_type == 'newSession': + pass + elif msg_type == 'gotOffer': + value_json = json.loads(msg_value) + sdp = value_json['sdp'] + log.info(f'SDP: {sdp}') + sdp_offer = sdp + elif msg_type == 'addCallerIceCandidate': + value_json = json.loads(msg_value) + log.info(f'ICE: {value_json}') + ice_candidates.append(value_json) + else: + log.error(f'Unknown message type {msg_type}') + + +async def run(uri): + talk_to_websocket_task = asyncio.Task(talk_to_websocket(uri)) + listen_to_gstreamer_bus_task = asyncio.Task(listen_to_gstreamer_bus()) + done, pending = await asyncio.wait( + [talk_to_websocket_task, listen_to_gstreamer_bus_task], + return_when=asyncio.FIRST_COMPLETED) + for d in done: + d.result() + for p in pending: + p.cancel() + + +def main(): + logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s') + parser = argparse.ArgumentParser() + parser.add_argument('--uri', default='wss://localhost:1234/ws_connect?id=cug', + help='Signalling server URI') + args = parser.parse_args() + asyncio.run(run(args.uri), debug=True) + + +if __name__ == '__main__': + main() -- 2.39.5