+ self.received_ice_candidates = queue.Queue()
+ self.sdp_info = queue.Queue() # (sdp_mids, user_fragments)
+ self.room_left = queue.Queue()
+
+
class SignalingClient:
    """Exchange signaling messages with a room server over a websocket.

    Incoming messages are decoded in :meth:`receive` and forwarded to the
    queues on ``events``; outgoing SDP answers and ICE candidates are read
    from the queues on ``events`` and transmitted by :meth:`send`.
    :meth:`run` opens the connection and drives both loops.
    """

    def __init__(self, events: "Events", uri):
        """Store the shared event queues and prepare the TLS context.

        Args:
            events: Object exposing the ``sdp_offer``, ``sdp_answer``,
                ``sdp_info``, ``generated_ice_candidates``,
                ``received_ice_candidates`` and ``room_left`` queues.
            uri: Websocket URI of the room to connect to.
        """
        self.events = events
        self.uri = uri
        # PROTOCOL_TLS_CLIENT replaces the deprecated argument-less constructor.
        self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # SECURITY NOTE(review): certificate and hostname verification are
        # disabled, so the server is not authenticated. Kept as in the
        # original (presumably for self-signed servers) -- confirm this is
        # intended. check_hostname must be cleared before verify_mode.
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
        self.session_id = None
        # Set by run() once the connection is established.
        self.websocket = None

    async def receive(self, uri):
        """Consume messages from the websocket until the room goes away.

        Args:
            uri: Room URI, used only in the "room not found" log message.

        Returns:
            None, when the server reports ``roomNotFound`` or ``roomClosed``
            or when the websocket iterator is exhausted.

        Raises:
            AssertionError: If a message carries a session ID different from
                the one already established for this connection.
        """
        async for msg in self.websocket:
            msg_json = json.loads(msg)
            msg_type = msg_json['Type']
            incoming_session = msg_json['SessionID']
            # Raised explicitly (instead of `assert`) so the invariant is
            # still enforced when Python runs with -O.
            if self.session_id is not None and self.session_id != incoming_session:
                raise AssertionError(
                    f'Unexpected session ID {incoming_session!r} '
                    f'(current session is {self.session_id!r})'
                )

            # 'Value' is only read by the branches that need a payload, so a
            # control message without one does not fail with KeyError.
            if msg_type == 'newSession':
                self.session_id = incoming_session
                log.info(f"New session {self.session_id}")
            elif msg_type == 'gotOffer':
                value_json = json.loads(msg_json['Value'])
                sdp = value_json['sdp']
                log.info('Got SDP offer')
                log.debug(f'SDP offer:\n{sdp}')
                self.events.sdp_offer.put_nowait(sdp)
            elif msg_type == 'addCallerIceCandidate':
                value_json = json.loads(msg_json['Value'])
                log.info('Got ICE candidate')
                log.debug(f'ICE candidate: {value_json}')
                self.events.received_ice_candidates.put_nowait(value_json)
            elif msg_type == 'roomNotFound':
                log.error(f'The room was not found: {uri}')
                return
            elif msg_type == 'roomClosed':
                log.info(f'Oh noes, the room went away (session {self.session_id})!')
                self.events.room_left.put_nowait(True)
                return
            else:
                log.error(f'Unknown message type {msg_type}')

    async def send(self):
        """Forward queued SDP answers and ICE candidates to the server.

        Loops forever, polling the event queues. ICE candidates are held
        back until an ``sdp_info`` item (``sdp_mids``, ``user_fragments``)
        has been received, because both are needed to build the candidate
        message. The coroutine only ends by cancellation or an exception.
        """
        sdp_mids = None
        user_fragments = None
        while True:
            if self.events.sdp_answer.qsize() > 0:
                sdp_answer = self.events.sdp_answer.get_nowait()
                sdp_answer_msg = json.dumps({
                    'SessionID': self.session_id,
                    'Type': "gotAnswer",
                    'Value': json.dumps({
                        'type': 'answer',
                        'sdp': sdp_answer,
                    }),
                })
                await self.websocket.send(sdp_answer_msg)

            elif self.events.sdp_info.qsize() > 0:
                sdp_mids, user_fragments = self.events.sdp_info.get_nowait()

            elif (self.events.generated_ice_candidates.qsize() > 0
                    and sdp_mids is not None and user_fragments is not None):
                mlineindex, candidate = self.events.generated_ice_candidates.get_nowait()
                icemsg_value = json.dumps({
                    "candidate": candidate,
                    "sdpMid": sdp_mids[mlineindex],
                    "sdpMLineIndex": mlineindex,
                    "usernameFragment": user_fragments[mlineindex],
                })
                icemsg = json.dumps({
                    'SessionID': self.session_id,
                    'Type': 'addCalleeIceCandidate',
                    'Value': icemsg_value,
                })
                log.info('Send ICE candidate')
                log.debug(f'ICE candidate: {icemsg_value}')
                await self.websocket.send(icemsg)

            else:
                # Nothing to send right now; yield to the event loop before
                # polling again.
                await asyncio.sleep(0.2)

    async def run(self):
        """Connect to the server and run the receive and send loops.

        Returns as soon as either loop finishes; the other one is cancelled
        and awaited so no task outlives the connection. An exception raised
        by a loop is logged rather than propagated to the caller.
        """
        self.session_id = None
        async with websockets.connect(self.uri, ssl=self.ssl_context, close_timeout=0.5) as websocket:
            self.websocket = websocket
            tasks = [
                asyncio.ensure_future(self.receive(self.uri)),
                asyncio.ensure_future(self.send()),
            ]
            try:
                done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Also runs when run() itself is cancelled, so neither loop
                # is left running against a closed connection. cancel() is a
                # no-op on a task that has already finished.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

            for task in done:
                if task.cancelled():
                    continue
                error = task.exception()
                if error is not None:
                    log.error(f'Signaling task failed: {error!r}')
+
+
+class WebRTCClient:
+ def __init__(self, events: Events):
+ self.events = events
+ self.webrtcbin = Gst.ElementFactory.make('webrtcbin', 'laplace')
+ self.pipe = Gst.Pipeline.new("pipeline")
+ Gst.Bin.do_add_element(self.pipe, self.webrtcbin)
+ self.webrtcbin.connect('on-negotiation-needed', self.on_negotiation_needed)
+ self.webrtcbin.connect('on-ice-candidate', self.on_ice_candidate)
+ self.webrtcbin.connect('pad-added', self.webrtcbin_pad_added)