chmod +x laplace_client_2.py
[toast/stream2beamer.git] / laplace_client_2.py
1 #!/usr/bin/python3
2
3 import argparse
4 import asyncio
5 import json
6 import logging
7 import pathlib
8 import ssl
9 import sys
10 from typing import Optional
11
12 import websockets
13
14 import gi
15 gi.require_version('Gst', '1.0')
16 from gi.repository import Gst
17 gi.require_version('GstWebRTC', '1.0')
18 from gi.repository import GstWebRTC
19 gi.require_version('GstSdp', '1.0')
20 from gi.repository import GstSdp
21
22
# Module-level logger; main() configures its output via logging.basicConfig().
log = logging.getLogger(__name__)


# Hand-over state shared by the two coroutines in this module:
# talk_to_websocket() fills it, listen_to_gstreamer_bus() drains it.
# SDP text of the most recent 'gotOffer' message, or None when no offer is
# waiting to be applied.
sdp_offer: Optional[str] = None
# Decoded 'addCallerIceCandidate' payloads, consumed oldest-first.
ice_candidates = []
28
29
async def listen_to_gstreamer_bus():
    """Run a pipeline holding one ``webrtcbin`` and feed it signalling data.

    The coroutine loops forever and, on every iteration, does exactly one of
    the following (in this priority order):

    1. pops and logs one pending message from the pipeline bus,
    2. applies the SDP offer stored in the module-level ``sdp_offer`` as the
       remote description of ``webrtcbin`` and resets ``sdp_offer`` to None,
    3. forwards the oldest entry of the module-level ``ice_candidates`` list
       to ``webrtcbin``,
    4. sleeps for 0.1 s so that other asyncio tasks can run.

    The pipeline is set to ``Gst.State.NULL`` when the coroutine exits,
    including when it is cancelled.

    Raises:
        RuntimeError: if the ``webrtcbin`` element cannot be created or an
            empty SDP message cannot be allocated.
    """
    global sdp_offer, ice_candidates
    Gst.init(None)
    webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
    if webrtcbin is None:
        # ElementFactory.make() returns None instead of raising when the
        # element (plugin) is not installed; fail with a readable message.
        raise RuntimeError("GStreamer element 'webrtcbin' is not available")
    pipe = Gst.Pipeline.new("pipeline")
    # Use the public Bin.add() API rather than the do_add_element() hook.
    pipe.add(webrtcbin)
    bus = pipe.get_bus()
    pipe.set_state(Gst.State.PLAYING)
    try:
        while True:
            if bus.have_pending():
                # NOTE(review): the binding presumably owns the returned
                # message reference, so no explicit unref is done here.
                msg = bus.pop()
                log.info(f'Receive Gst.Message: {msg.type}')
            elif sdp_offer is not None:
                # Take the offer and clear the slot first, so a bad offer is
                # never retried on the next iteration.
                offer_text, sdp_offer = sdp_offer, None
                res, sm = GstSdp.SDPMessage.new()
                if res != GstSdp.SDPResult.OK:
                    raise RuntimeError(f'Cannot allocate SDP message: {res}')
                res = GstSdp.sdp_message_parse_buffer(offer_text.encode(), sm)
                # the lines above can also be done this way in new versions of GStreamer:
                # sm = GstSdp.SDPMessage.new_from_text(offer_text)
                if res != GstSdp.SDPResult.OK:
                    # Do not hand a half-parsed description to webrtcbin.
                    log.error(f'Cannot parse SDP offer: {res}')
                    continue
                rd = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sm)
                webrtcbin.emit('set-remote-description', rd, None)
            elif len(ice_candidates) > 0:
                ic = ice_candidates.pop(0)
                webrtcbin.emit('add-ice-candidate', ic['sdpMLineIndex'], ic['candidate'])
            else:
                # Nothing to do: yield to the event loop for a while.
                await asyncio.sleep(0.1)
    finally:
        pipe.set_state(Gst.State.NULL)
60
61
async def talk_to_websocket(uri):
    """Receive signalling messages from ``uri`` and publish them as globals.

    Every incoming websocket message is decoded as JSON and must carry the
    keys ``Type``, ``Value`` and ``SessionID``. Depending on ``Type``:

    * ``newSession`` is ignored,
    * ``gotOffer``: ``Value`` is JSON-decoded and its ``sdp`` field is stored
      in the module-level ``sdp_offer``,
    * ``addCallerIceCandidate``: ``Value`` is JSON-decoded and appended to
      the module-level ``ice_candidates`` list,
    * anything else is logged as an error.

    The coroutine returns when the message stream of the connection ends.

    Args:
        uri: websocket URI of the signalling server (``ws://`` or ``wss://``).
    """
    global sdp_offer, ice_candidates
    connect_kwargs = {}
    if uri.lower().startswith('wss:'):
        # An explicit protocol avoids the deprecated bare SSLContext() form.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # SECURITY: certificate and hostname verification are switched off
        # (as before), so the server is NOT authenticated. Acceptable only
        # for self-signed test servers. check_hostname must be cleared
        # before verify_mode may be set to CERT_NONE.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        # Only hand over an SSL context for TLS URIs; websockets rejects an
        # ssl argument combined with a plain ws:// URI.
        connect_kwargs['ssl'] = ssl_context
    async with websockets.connect(uri, **connect_kwargs) as websocket:
        async for msg in websocket:
            msg_json = json.loads(msg)
            msg_type = msg_json['Type']
            msg_value = msg_json['Value']
            session_id = msg_json['SessionID']
            log.info(f"receive for session {session_id} type {msg_type}")
            if msg_type == 'newSession':
                pass
            elif msg_type == 'gotOffer':
                # The payload is itself a JSON document inside a string.
                value_json = json.loads(msg_value)
                sdp = value_json['sdp']
                log.info(f'SDP: {sdp}')
                sdp_offer = sdp
            elif msg_type == 'addCallerIceCandidate':
                value_json = json.loads(msg_value)
                log.info(f'ICE: {value_json}')
                ice_candidates.append(value_json)
            else:
                log.error(f'Unknown message type {msg_type}')
87
88
async def run(uri):
    """Run the websocket reader and the GStreamer loop until one of them ends.

    Both coroutines are started as tasks. As soon as the first one finishes
    (normally or with an exception) the other one is cancelled and awaited,
    so that its cleanup code has run before this coroutine returns. After
    that, an exception raised by a finished task is re-raised to the caller.

    Args:
        uri: websocket URI of the signalling server, passed on to
            ``talk_to_websocket``.
    """
    tasks = [
        asyncio.create_task(talk_to_websocket(uri)),
        asyncio.create_task(listen_to_gstreamer_bus()),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the remaining task first: calling result() below may raise, and
    # the cancellation must not be skipped in that case.
    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind (their ``finally`` blocks run here);
    # their CancelledError is collected instead of being propagated.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        # Re-raises the exception of a failed task, if there is one.
        task.result()
99
100
def main():
    """Command-line entry point.

    Sets up INFO-level logging with a timestamp prefix, reads the optional
    ``--uri`` argument (signalling server URI) and executes ``run`` for that
    URI on a new asyncio event loop with debug mode switched on.
    """
    logging.basicConfig(format='%(asctime)-15s %(message)s', level=logging.INFO)

    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--uri',
        help='Signalling server URI',
        default='wss://localhost:1234/ws_connect?id=cug',
    )
    options = cli.parse_args()

    asyncio.run(run(options.uri), debug=True)
108
109
# Run the client only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()