Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in a same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit to have methods called in parallels is very low, and it cause a lot of trouble
as we can't predict order. Methods are now called sequentially so workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some method to XEP-0167
- XEP-0353: use new `priority` feature to call preflight methods of applications according
to it.
- frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism
based on Pydantic models. It is now possible to have has many Data Channel as necessary,
to have them in addition to A/V streams, to specify manually GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | e11b13418ba6 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat May 11 13:25:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat May 11 13:52:41 2024 +0200 @@ -327,29 +327,24 @@ ## errors which doesn't imply a stanza sending ## - def _iq_error(self, failure_, sid, client): - """Called when we got an <iq/> error - - @param failure_(failure.Failure): the exceptions raised - @param sid(unicode): jingle session id - """ - log.warning( - "Error while sending jingle <iq/> stanza: {failure_}".format( - failure_=failure_.value - ) - ) - self.delete_session(client, sid) - - def _jingle_error_cb(self, failure_, session, request, client): + def _jingle_error_cb( + self, + failure_: failure.Failure|BaseException, + session: dict, + request: domish.Element, + client: SatXMPPEntity + ) -> defer.Deferred: """Called when something is going wrong while parsing jingle request The error condition depend of the exceptions raised: exceptions.DataError raise a bad-request condition - @param fail(failure.Failure): the exceptions raised - @param session(dict): data of the session - @param request(domsih.Element): jingle request - @param client: %(doc_client)s + @param fail: the exceptions raised + @param session: data of the session + @param request: jingle request + @param client: SatXMPPEntity instance """ + if not isinstance(failure_, failure.Failure): + failure_ = failure.Failure(failure_) del session["jingle_elt"] log.warning(f"Error while processing jingle request [{client.profile}]") if isinstance(failure_.value, defer.FirstError): @@ -369,7 +364,8 @@ def register_application( self, namespace: str, - handler: BaseApplicationHandler + handler: BaseApplicationHandler, + priority: int = 0 ) -> None: """Register an application plugin @@ -390,13 +386,21 @@ called on several action to negociate the application or transport - jingle_terminate: called on session terminate, with reason_elt May be used to clean session + @param priority: Priority of the application. It is used when several contents + from different applications are used to determine in which order methods must + be called. An example use case is for remote control: when using remote + control, contents with call application may be used at the same time, but the + overall session is a remote control one (and so, a remote control confirmation + must be requested to the user, not a call one). + If two applications have the same priority, methods are called in the same + order as the content appears. """ if namespace in self._applications: raise exceptions.ConflictError( f"Trying to register already registered namespace {namespace}" ) self._applications[namespace] = ApplicationData( - namespace=namespace, handler=handler + namespace=namespace, handler=handler, priority=priority ) log.debug("new jingle application registered") @@ -687,10 +691,10 @@ try: await iq_elt.send() - except Exception as e: - failure_ = failure.Failure(e) - self._iq_error(failure_, sid, client) - raise failure_ + except Exception: + log.exception("Error while sending jingle <iq/> stanza") + self.delete_session(client, sid) + raise return sid def delayed_content_terminate(self, *args, **kwargs): @@ -823,13 +827,13 @@ elif action == XEP_0166.A_SESSION_ACCEPT: await self.on_session_accept(client, request, jingle_elt, session) elif action == XEP_0166.A_SESSION_INFO: - self.on_session_info(client, request, jingle_elt, session) + await self.on_session_info(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_INFO: self.on_transport_info(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_REPLACE: await self.on_transport_replace(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_ACCEPT: - self.on_transport_accept(client, request, jingle_elt, session) + await self.on_transport_accept(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_REJECT: self.on_transport_reject(client, request, jingle_elt, session) else: @@ -983,7 +987,7 @@ """ return elt - def _call_plugins( + async def _call_plugins( self, client: SatXMPPEntity, action: str, @@ -995,7 +999,7 @@ delete: bool = True, elements: bool = True, force_element: Optional[domish.Element] = None - ) -> List[defer.Deferred]: + ) -> list[Any]: """Call application and transport plugin methods for all contents @param action: jingle action name @@ -1006,7 +1010,8 @@ None to ignore @param app_default_cb: default callback to use if plugin has not app_method_name None to raise an exception instead - @param transp_default_cb: default callback to use if plugin has not transp_method_name + @param transp_default_cb: default callback to use if plugin has not + transp_method_name None to raise an exception instead @param delete: if True, remove desc_elt and transport_elt from session ignored if elements is False @@ -1015,11 +1020,11 @@ after a request (i.e. on <iq> result or error) @param force_element: if elements is False, it is used as element parameter else it is ignored - @return : list of launched Deferred + @return : list of launched methods results @raise exceptions.NotFound: method is not implemented """ contents_dict = session["contents"] - defers_list = [] + results = [] for content_name, content_data in contents_dict.items(): for method_name, handler_key, default_cb, elt_name in ( (app_method_name, "application", app_default_cb, "desc_elt"), @@ -1042,12 +1047,12 @@ elt = content_data.pop(elt_name) if delete else content_data[elt_name] else: elt = force_element - d = utils.as_deferred( + result = await utils.as_deferred( method, client, action, session, content_name, elt ) - defers_list.append(d) + results.append(result) - return defers_list + return results async def on_session_initiate( self, @@ -1097,42 +1102,50 @@ ): return - await defer.DeferredList(self._call_plugins( + await self._call_plugins( client, XEP_0166.A_PREPARE_CONFIRMATION, session, delete=False - )) + ) # we now request each application plugin confirmation # and if all are accepted, we can accept the session - confirm_defers = self._call_plugins( - client, - XEP_0166.A_SESSION_INITIATE, - session, - "jingle_request_confirmation", - None, - self.jingle_request_confirmation_default, - delete=False, - ) + try: + confirmations = await self._call_plugins( + client, + XEP_0166.A_SESSION_INITIATE, + session, + "jingle_request_confirmation", + None, + self.jingle_request_confirmation_default, + delete=False, + ) + except Exception as e: + await self._jingle_error_cb(e, session, jingle_elt, client) + else: + await self._confirmation_cb(confirmations, session, jingle_elt, client) - confirm_dlist = defer.gatherResults(confirm_defers) - confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) - confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) - - def _confirmation_cb(self, confirm_results, session, jingle_elt, client): + async def _confirmation_cb( + self, + confirmations: list[bool], + session: dict, + jingle_elt: domish.Element, + client: SatXMPPEntity + ) -> None: """Method called when confirmation from user has been received This method is only called for the responder - @param confirm_results(list[bool]): all True if session is accepted - @param session(dict): session data - @param jingle_elt(domish.Element): jingle data of this session - @param client: %(doc_client)s + @param confirm_results: all True if session is accepted + @param session: session data + @param jingle_elt: jingle data of this session + @param client: SatXMPPEntity """ del session["jingle_elt"] - confirmed = all(confirm_results) + confirmed = all(confirmations) if not confirmed: - return self.terminate(client, XEP_0166.REASON_DECLINE, session) + await self.terminate(client, XEP_0166.REASON_DECLINE, session) + return iq_elt, jingle_elt = self._build_jingle_elt( client, session, XEP_0166.A_SESSION_ACCEPT @@ -1142,68 +1155,56 @@ # contents - def addElement(domish_elt, content_elt): - content_elt.addChild(domish_elt) - - defers_list = [] + try: + for content_name, content_data in session["contents"].items(): + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = XEP_0166.ROLE_INITIATOR + content_elt["name"] = content_name - for content_name, content_data in session["contents"].items(): - content_elt = jingle_elt.addElement("content") - content_elt["creator"] = XEP_0166.ROLE_INITIATOR - content_elt["name"] = content_name - - application = content_data["application"] - app_session_accept_cb = application.handler.jingle_handler + application = content_data["application"] + app_session_accept_cb = application.handler.jingle_handler - app_d = utils.as_deferred( - app_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("desc_elt"), - ) - app_d.addCallback(addElement, content_elt) - defers_list.append(app_d) - - transport = content_data["transport"] - transport_session_accept_cb = transport.handler.jingle_handler + updated_desc_elt = await utils.as_deferred( + app_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("desc_elt"), + ) + content_elt.addChild(updated_desc_elt) - transport_d = utils.as_deferred( - transport_session_accept_cb, - client, - XEP_0166.A_SESSION_INITIATE, - session, - content_name, - content_data.pop("transport_elt"), - ) - transport_d.addCallback(addElement, content_elt) - defers_list.append(transport_d) + transport = content_data["transport"] + transport_session_accept_cb = transport.handler.jingle_handler - d_list = defer.DeferredList(defers_list) - d_list.addCallback( - lambda __: self._call_plugins( + updated_transport_elt = await utils.as_deferred( + transport_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("transport_elt"), + ) + content_elt.addChild(updated_transport_elt) + + await self._call_plugins( client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False, ) - ) - d_list.addCallback(lambda __: session.pop("jingle_elt")) - d_list.addCallback(lambda __: iq_elt.send()) + session.pop("jingle_elt") + await iq_elt.send() - def change_state(__, session): session["state"] = XEP_0166.STATE_ACTIVE - d_list.addCallback(change_state, session) - d_list.addCallback( - lambda __: self._call_plugins( + await self._call_plugins( client, XEP_0166.A_ACCEPTED_ACK, session, elements=False ) - ) - d_list.addErrback(self._iq_error, session["id"], client) - return d_list + except Exception: + log.exception("Error while sending jingle <iq/> stanza") + self.delete_session(client, session["id"]) def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element: """Find a <reason> element in parent_elt @@ -1247,7 +1248,7 @@ log.debug(f"Jingle Session {session['id']} terminated") reason_elt = self.get_reason_elt(jingle_elt) - terminate_defers = self._call_plugins( + await self._call_plugins( client, XEP_0166.A_SESSION_TERMINATE, session, @@ -1258,10 +1259,9 @@ elements=False, force_element=reason_elt, ) - terminate_dlist = defer.DeferredList(terminate_defers) - terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"])) - client.send(xmlstream.toResponse(request, "result")) + self.delete_session(client, session["id"]) + await client.a_send(xmlstream.toResponse(request, "result")) async def on_session_accept(self, client, request, jingle_elt, session): """Method called once session is accepted @@ -1285,37 +1285,22 @@ session["state"] = XEP_0166.STATE_ACTIVE session["jingle_elt"] = jingle_elt - await defer.DeferredList(self._call_plugins( + await self._call_plugins( client, XEP_0166.A_PREPARE_INITIATOR, session, delete=False - )) + ) - negociate_defers = [] - negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) - - negociate_dlist = defer.gatherResults(negociate_defers) + await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) + await self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False ) - negociate_dlist.addCallback(lambda __: session.pop("jingle_elt")) + session.pop("jingle_elt") - def _on_session_cb(self, result, client, request, jingle_elt, session): - client.send(xmlstream.toResponse(request, "result")) - - def _on_session_eb(self, failure_, client, request, jingle_elt, session): - log.error("Error while handling on_session_info: {}".format(failure_.value)) - # XXX: only error managed so far, maybe some applications/transports need more - self.sendError( - client, "feature-not-implemented", None, request, "unsupported-info" - ) - - def on_session_info(self, client, request, jingle_elt, session): + async def on_session_info(self, client, request, jingle_elt, session): """Method called when a session-info action is received from other peer This method is only called for initiator @@ -1330,9 +1315,10 @@ return try: - # XXX: session-info is most likely only used for application, so we don't call transport plugins - # if a future transport use it, this behaviour must be adapted - defers = self._call_plugins( + # XXX: session-info is most likely only used for application, so we don't call + # transport plugins if a future transport use it, this behaviour must be + # adapted + await self._call_plugins( client, XEP_0166.A_SESSION_INFO, session, @@ -1342,12 +1328,15 @@ force_element=jingle_elt, ) except exceptions.NotFound as e: - self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) - return - - dlist = defer.DeferredList(defers, fireOnOneErrback=True) - dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) - dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) + log.exception("Error while handling on_session_info") + # XXX: only error managed so far, maybe some applications/transports need more + self.sendError( + client, "feature-not-implemented", None, request, "unsupported-info" + ) + except Exception: + log.exception("Error while managing session info") + else: + client.send(xmlstream.toResponse(request, "result")) async def on_transport_replace(self, client, request, jingle_elt, session): """A transport change is requested @@ -1431,13 +1420,19 @@ iq_elt.send() - def on_transport_accept(self, client, request, jingle_elt, session): + async def on_transport_accept( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: dict + ) -> None: """Method called once transport replacement is accepted - @param client: %(doc_client)s - @param request(domish.Element): full <iq> request - @param jingle_elt(domish.Element): the <jingle> element - @param session(dict): session data + @param client: SatXMPPEntity instance + @param request: full <iq> request + @param jingle_elt: the <jingle> element + @param session: session data """ log.debug("new transport has been accepted") @@ -1449,20 +1444,15 @@ return # at this point we can send the <iq/> result to confirm reception of the request - client.send(xmlstream.toResponse(request, "result")) + await client.a_send(xmlstream.toResponse(request, "result")) - negociate_defers = [] - negociate_defers = self._call_plugins( + await self._call_plugins( client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None ) - negociate_dlist = defer.DeferredList(negociate_defers) - # after negociations we start the transfer - negociate_dlist.addCallback( - lambda __: self._call_plugins( - client, XEP_0166.A_START, session, app_method_name=None, elements=False - ) + await self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False ) def on_transport_reject(self, client, request, jingle_elt, session):