From dfb3ebe45015288a1d3e44bf0869ba52433d3621 Mon Sep 17 00:00:00 2001 From: Philipp Spitzer Date: Tue, 22 Sep 2020 23:29:41 +0200 Subject: [PATCH] Add wrapper that listens to the availability of the room. --- lagarde.py | 83 +++++++++++++++++++++++++++++------------------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/lagarde.py b/lagarde.py index a5f8fbd..c31a45a 100755 --- a/lagarde.py +++ b/lagarde.py @@ -182,50 +182,55 @@ class Lagarde: finally: self.pipe.set_state(Gst.State.NULL) - async def talk_to_websocket(self, uri): - async for msg in self.websocket: - msg_json = json.loads(msg) - msg_type = msg_json['Type'] - msg_value = msg_json['Value'] - assert self.session_id is None or self.session_id == msg_json['SessionID'] - if msg_type == 'newSession': - self.session_id = msg_json['SessionID'] - log.info(f"New session {self.session_id}") - elif msg_type == 'gotOffer': - value_json = json.loads(msg_value) - sdp = value_json['sdp'] - log.info(f'Got SDP offer:\n{sdp}') - self.sdp_offer = sdp - elif msg_type == 'addCallerIceCandidate': - value_json = json.loads(msg_value) - log.info(f'Got ICE candidate: {value_json}') - self.received_ice_candidates.put_nowait(value_json) - elif msg_type == 'roomNotFound': - log.error(f'The room was not found: {uri}') - return - elif msg_type == 'roomClosed': - log.info(f'Oh noes, the room went away (session {self.session_id})!') - self.session_id = None - return - else: - log.error(f'Unknown message type {msg_type}') + async def talk_to_websocket(self, uri, ssl_context): + async with websockets.connect(uri, ssl=ssl_context, close_timeout=0.5) as self.websocket: + async for msg in self.websocket: + msg_json = json.loads(msg) + msg_type = msg_json['Type'] + msg_value = msg_json['Value'] + assert self.session_id is None or self.session_id == msg_json['SessionID'] + if msg_type == 'newSession': + self.session_id = msg_json['SessionID'] + log.info(f"New session {self.session_id}") + elif msg_type == 'gotOffer': + value_json = json.loads(msg_value) + sdp = value_json['sdp'] + log.info(f'Got SDP offer:\n{sdp}') + self.sdp_offer = sdp + elif msg_type == 'addCallerIceCandidate': + value_json = json.loads(msg_value) + log.info(f'Got ICE candidate: {value_json}') + self.received_ice_candidates.put_nowait(value_json) + elif msg_type == 'roomNotFound': + log.error(f'The room was not found: {uri}') + return + elif msg_type == 'roomClosed': + log.info(f'Oh noes, the room went away (session {self.session_id})!') + self.session_id = None + return + else: + log.error(f'Unknown message type {msg_type}') - async def run(self, uri): + async def talk_to_signaling_server(self, uri): ssl_context = ssl.SSLContext() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE + while True: + await self.talk_to_websocket(uri, ssl_context) + await asyncio.sleep(0.1) + + async def run(self, uri): try: - async with websockets.connect(uri, ssl=ssl_context) as self.websocket: - talk_to_websocket_task = asyncio.Task(self.talk_to_websocket(uri)) - listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus()) - main_loop = asyncio.Task(gstreamer_main_loop()) - done, pending = await asyncio.wait( - [talk_to_websocket_task, listen_to_gstreamer_bus_task, main_loop], - return_when=asyncio.FIRST_COMPLETED) - for d in done: - d.result() - for p in pending: - p.cancel() + talk_to_signaling_server_task = asyncio.Task(self.talk_to_signaling_server(uri)) + listen_to_gstreamer_bus_task = asyncio.Task(self.listen_to_gstreamer_bus()) + main_loop = asyncio.Task(gstreamer_main_loop()) + done, pending = await asyncio.wait( + [talk_to_signaling_server_task, listen_to_gstreamer_bus_task, main_loop], + return_when=asyncio.FIRST_COMPLETED) + for d in done: + d.result() + for p in pending: + p.cancel() except OSError as e: print(e) -- 2.39.5