Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 79c8a70e1813 |
children | f46891f2c9cb |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py Wed Jun 19 18:44:57 2024 +0200 @@ -51,15 +51,15 @@ log = getLogger(__name__) -IQ_SET : Final = '/iq[@type="set"]' -NS_JINGLE : Final = "urn:xmpp:jingle:1" -NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1" -JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]' -CONFIRM_TXT : Final = D_( +IQ_SET: Final = '/iq[@type="set"]' +NS_JINGLE: Final = "urn:xmpp:jingle:1" +NS_JINGLE_ERROR: Final = "urn:xmpp:jingle:errors:1" +JINGLE_REQUEST: Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]' +CONFIRM_TXT: Final = D_( "{entity} want to start a jingle session with you, do you accept ?" ) -PLUGIN_INFO : Final = { +PLUGIN_INFO: Final = { C.PI_NAME: "Jingle", C.PI_IMPORT_NAME: "XEP-0166", C.PI_TYPE: "XEP", @@ -72,54 +72,50 @@ class XEP_0166: - namespace : Final = NS_JINGLE + namespace: Final = NS_JINGLE - ROLE_INITIATOR : Final = "initiator" - ROLE_RESPONDER : Final = "responder" + ROLE_INITIATOR: Final = "initiator" + ROLE_RESPONDER: Final = "responder" - TRANSPORT_DATAGRAM : Final = "UDP" - TRANSPORT_STREAMING : Final = "TCP" + TRANSPORT_DATAGRAM: Final = "UDP" + TRANSPORT_STREAMING: Final = "TCP" - REASON_SUCCESS : Final = "success" - REASON_DECLINE : Final = "decline" - REASON_FAILED_APPLICATION : Final = "failed-application" - REASON_FAILED_TRANSPORT : Final = "failed-transport" - REASON_CONNECTIVITY_ERROR : Final = "connectivity-error" + REASON_SUCCESS: Final = "success" + REASON_DECLINE: Final = "decline" + REASON_FAILED_APPLICATION: Final = "failed-application" + REASON_FAILED_TRANSPORT: Final = "failed-transport" + REASON_CONNECTIVITY_ERROR: Final = "connectivity-error" - STATE_PENDING : Final = "PENDING" - STATE_ACTIVE : Final = "ACTIVE" - STATE_ENDED : Final = "ENDED" + STATE_PENDING: Final = "PENDING" + STATE_ACTIVE: Final = "ACTIVE" + STATE_ENDED: Final = "ENDED" # standard actions - A_SESSION_INITIATE : Final = "session-initiate" - A_SESSION_ACCEPT : Final = "session-accept" - A_SESSION_TERMINATE : Final = "session-terminate" - A_SESSION_INFO : Final = "session-info" - A_TRANSPORT_REPLACE : Final = "transport-replace" - A_TRANSPORT_ACCEPT : Final = "transport-accept" - A_TRANSPORT_REJECT : Final = "transport-reject" - A_TRANSPORT_INFO : Final = "transport-info" + A_SESSION_INITIATE: Final = "session-initiate" + A_SESSION_ACCEPT: Final = "session-accept" + A_SESSION_TERMINATE: Final = "session-terminate" + A_SESSION_INFO: Final = "session-info" + A_TRANSPORT_REPLACE: Final = "transport-replace" + A_TRANSPORT_ACCEPT: Final = "transport-accept" + A_TRANSPORT_REJECT: Final = "transport-reject" + A_TRANSPORT_INFO: Final = "transport-info" # non standard actions #: called before the confirmation request, first event for responder, useful for #: parsing - A_PREPARE_CONFIRMATION : Final = "prepare-confirmation" + A_PREPARE_CONFIRMATION: Final = "prepare-confirmation" #: initiator must prepare tranfer - A_PREPARE_INITIATOR : Final = "prepare-initiator" + A_PREPARE_INITIATOR: Final = "prepare-initiator" #: responder must prepare tranfer - A_PREPARE_RESPONDER : Final = "prepare-responder" - #; session accepted ack has been received from initiator - A_ACCEPTED_ACK : Final = ( - "accepted-ack" - ) - A_START : Final = "start" # application can start + A_PREPARE_RESPONDER: Final = "prepare-responder" + # ; session accepted ack has been received from initiator + A_ACCEPTED_ACK: Final = "accepted-ack" + A_START: Final = "start" # application can start #: called when a transport is destroyed (e.g. because it is remplaced). Used to do #: cleaning operations - A_DESTROY : Final = ( - "destroy" - ) + A_DESTROY: Final = "destroy" def __init__(self, host): log.info(_("plugin Jingle initialization")) @@ -157,9 +153,7 @@ try: return client.jingle_sessions[session_id] except KeyError: - raise exceptions.NotFound( - f"No session with SID {session_id} found" - ) + raise exceptions.NotFound(f"No session with SID {session_id} found") def create_session( self, @@ -167,8 +161,8 @@ sid: str, role: str, peer_jid: jid.JID, - local_jid: jid.JID|None = None, - **kwargs + local_jid: jid.JID | None = None, + **kwargs, ) -> dict: """Create a new jingle session. @@ -192,7 +186,6 @@ if role not in [XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER]: raise ValueError(f"Invalid role {role}. Expected initiator or responder.") - session_data = { "id": sid, "state": XEP_0166.STATE_PENDING, @@ -201,7 +194,7 @@ "local_jid": local_jid or client.jid, "peer_jid": peer_jid, "started": time.time(), - "contents": {} + "contents": {}, } # If extra kw args are provided, merge them into the session_data @@ -213,27 +206,24 @@ return session_data - def delete_session(self, client, sid): try: del client.jingle_sessions[sid] except KeyError: log.debug( f"Jingle session id {sid!r} is unknown, nothing to delete " - f"[{client.profile}]") + f"[{client.profile}]" + ) else: log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]") ## helpers methods to build stanzas ## def _build_jingle_elt( - self, - client: SatXMPPEntity, - session: dict, - action: str + self, client: SatXMPPEntity, session: dict, action: str ) -> Tuple[xmlstream.IQ, domish.Element]: iq_elt = client.IQ("set") - iq_elt["from"] = session['local_jid'].full() + iq_elt["from"] = session["local_jid"].full() iq_elt["to"] = session["peer_jid"].full() jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) jingle_elt["sid"] = session["id"] @@ -264,11 +254,7 @@ log.warning(_("Error while terminating session: {msg}").format(msg=failure_)) def _terminate( - self, - session_id: str, - reason: str, - reason_txt: str, - profile: str + self, session_id: str, reason: str, reason_txt: str, profile: str ) -> defer.Deferred: client = self.host.get_client(profile) session = self.get_session(client, session_id) @@ -276,19 +262,14 @@ raise ValueError( 'only "cancel", "decline" and "busy" and empty value are allowed' ) - return self.terminate( - client, - reason or None, - session, - text=reason_txt or None - ) + return self.terminate(client, reason or None, session, text=reason_txt or None) def terminate( - self, - client: SatXMPPEntity, - reason: str|list[domish.Element]|None, - session: dict, - text: str|None = None + self, + client: SatXMPPEntity, + reason: str | list[domish.Element] | None, + session: dict, + text: str | None = None, ) -> defer.Deferred: """Terminate the session @@ -311,14 +292,9 @@ reason_elt = None if text is not None: if reason_elt is None: - raise ValueError( - "You have to specify a reason if text is specified" - ) + raise ValueError("You have to specify a reason if text is specified") reason_elt.addElement("text", content=text) - if not self.host.trigger.point( - "XEP-0166_terminate", - client, session, reason_elt - ): + if not self.host.trigger.point("XEP-0166_terminate", client, session, reason_elt): return defer.succeed(None) self.delete_session(client, session["id"]) d = iq_elt.send() @@ -328,11 +304,11 @@ ## errors which doesn't imply a stanza sending ## def _jingle_error_cb( - self, - failure_: failure.Failure|BaseException, - session: dict, - request: domish.Element, - client: SatXMPPEntity + self, + failure_: failure.Failure | BaseException, + session: dict, + request: domish.Element, + client: SatXMPPEntity, ) -> defer.Deferred: """Called when something is going wrong while parsing jingle request @@ -352,20 +328,19 @@ if isinstance(failure_, exceptions.DataError): return self.sendError(client, "bad-request", session["id"], request) elif isinstance(failure_, error.StanzaError): - return self.terminate(client, self.REASON_FAILED_APPLICATION, session, - text=str(failure_)) + return self.terminate( + client, self.REASON_FAILED_APPLICATION, session, text=str(failure_) + ) else: log.error(f"Unmanaged jingle exception: {failure_}") - return self.terminate(client, self.REASON_FAILED_APPLICATION, session, - text=str(failure_)) + return self.terminate( + client, self.REASON_FAILED_APPLICATION, session, text=str(failure_) + ) ## methods used by other plugins ## def register_application( - self, - namespace: str, - handler: BaseApplicationHandler, - priority: int = 0 + self, namespace: str, handler: BaseApplicationHandler, priority: int = 0 ) -> None: """Register an application plugin @@ -405,11 +380,11 @@ log.debug("new jingle application registered") def register_transport( - self, - namespace: str, - transport_type: str, - handler: BaseTransportHandler, - priority: int = 0 + self, + namespace: str, + transport_type: str, + handler: BaseTransportHandler, + priority: int = 0, ) -> None: """Register a transport plugin @@ -466,7 +441,9 @@ content_elt["name"] = content_name content_elt["creator"] = content_data["creator"] - transport_elt = transport.handler.jingle_session_init(client, session, content_name) + transport_elt = transport.handler.jingle_session_init( + client, session, content_name + ) content_elt.addChild(transport_elt) iq_elt.send() @@ -477,7 +454,7 @@ session: dict, content_name: str, iq_elt: Optional[xmlstream.IQ] = None, - context_elt: Optional[domish.Element] = None + context_elt: Optional[domish.Element] = None, ) -> Tuple[xmlstream.IQ, domish.Element]: """Build an element according to requested action @@ -534,12 +511,10 @@ try: return self._applications[namespace] except KeyError: - raise exceptions.NotFound( - f"No application registered for {namespace}" - ) + raise exceptions.NotFound(f"No application registered for {namespace}") def get_content_data(self, content: dict, content_idx: int) -> ContentData: - """"Retrieve application and its argument from content""" + """ "Retrieve application and its argument from content""" app_ns = content["app_ns"] try: application = self.get_application(app_ns) @@ -553,11 +528,7 @@ except KeyError: content_name = content["name"] = str(content_idx) return ContentData( - application, - app_args, - app_kwargs, - transport_data, - content_name + application, app_args, app_kwargs, transport_data, content_name ) async def initiate( @@ -566,8 +537,8 @@ peer_jid: jid.JID, contents: List[dict], encrypted: bool = False, - sid: str|None = None, - **extra_data: Any + sid: str | None = None, + **extra_data: Any, ) -> str: """Send a session initiation request @@ -591,8 +562,11 @@ @return: Sesson ID """ assert contents # there must be at least one content - if (peer_jid == client.jid - or client.is_component and peer_jid.host == client.jid.host): + if ( + peer_jid == client.jid + or client.is_component + and peer_jid.host == client.jid.host + ): raise ValueError(_("You can't do a jingle session with yourself")) if sid is None: sid = str(uuid.uuid4()) @@ -602,8 +576,7 @@ initiator = session["initiator"] if not await self.host.trigger.async_point( - "XEP-0166_initiate", - client, session, contents + "XEP-0166_initiate", client, session, contents ): return sid @@ -658,21 +631,25 @@ # then the description element application_data["desc_elt"] = desc_elt = await utils.as_deferred( content_data.application.handler.jingle_session_init, - client, session, content_data.content_name, - *content_data.app_args, **content_data.app_kwargs + client, + session, + content_data.content_name, + *content_data.app_args, + **content_data.app_kwargs, ) content_elt.addChild(desc_elt) # and the transport one transport_data["transport_elt"] = transport_elt = await utils.as_deferred( transport.handler.jingle_session_init, - client, session, content_data.content_name, + client, + session, + content_data.content_name, ) content_elt.addChild(transport_elt) if not await self.host.trigger.async_point( - "XEP-0166_initiate_elt_built", - client, session, iq_elt, jingle_elt + "XEP-0166_initiate_elt_built", client, session, iq_elt, jingle_elt ): return sid @@ -737,9 +714,7 @@ defer.ensureDeferred(self.on_jingle_request(client, request)) async def on_jingle_request( - self, - client: SatXMPPEntity, - request: domish.Element + self, client: SatXMPPEntity, request: domish.Element ) -> None: """Called when any jingle request is received @@ -803,15 +778,13 @@ # XXX: we store local_jid using request['to'] because for a component the # jid used may not be client.jid (if a local part is used). session = self.create_session( - client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request['to']) + client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request["to"]) ) else: if session["peer_jid"] != peer_jid: log.warning( "sid conflict ({}), the jid doesn't match. Can be a collision, a " - "hack attempt, or a bad sid generation".format( - sid - ) + "hack attempt, or a bad sid generation".format(sid) ) self.sendError(client, "service-unavailable", sid, request) return @@ -849,7 +822,7 @@ client: SatXMPPEntity, new: bool = False, creator: str = ROLE_INITIATOR, - with_application: bool =True, + with_application: bool = True, with_transport: bool = True, store_in_session: bool = True, ) -> Dict[str, dict]: @@ -926,9 +899,7 @@ try: application = self._applications[app_ns] except KeyError: - log.warning( - "Unmanaged application namespace [{}]".format(app_ns) - ) + log.warning("Unmanaged application namespace [{}]".format(app_ns)) self.sendError( client, "service-unavailable", session["id"], request ) @@ -998,7 +969,7 @@ transp_default_cb: Optional[Callable] = None, delete: bool = True, elements: bool = True, - force_element: Optional[domish.Element] = None + force_element: Optional[domish.Element] = None, ) -> list[Any]: """Call application and transport plugin methods for all contents @@ -1059,7 +1030,7 @@ client: SatXMPPEntity, request: domish.Element, jingle_elt: domish.Element, - session: Dict[str, Any] + session: Dict[str, Any], ) -> None: """Called on session-initiate action @@ -1093,20 +1064,15 @@ # at this point we can send the <iq/> result to confirm reception of the request client.send(xmlstream.toResponse(request, "result")) - assert "jingle_elt" not in session session["jingle_elt"] = jingle_elt if not await self.host.trigger.async_point( - "XEP-0166_on_session_initiate", - client, session, request, jingle_elt + "XEP-0166_on_session_initiate", client, session, request, jingle_elt ): return await self._call_plugins( - client, - XEP_0166.A_PREPARE_CONFIRMATION, - session, - delete=False + client, XEP_0166.A_PREPARE_CONFIRMATION, session, delete=False ) # we now request each application plugin confirmation @@ -1131,7 +1097,7 @@ confirmations: list[bool], session: dict, jingle_elt: domish.Element, - client: SatXMPPEntity + client: SatXMPPEntity, ) -> None: """Method called when confirmation from user has been received @@ -1150,7 +1116,7 @@ iq_elt, jingle_elt = self._build_jingle_elt( client, session, XEP_0166.A_SESSION_ACCEPT ) - jingle_elt["responder"] = session['local_jid'].full() + jingle_elt["responder"] = session["local_jid"].full() session["jingle_elt"] = jingle_elt # contents @@ -1219,7 +1185,9 @@ reason_elt = parent_elt.addElement("reason") return reason_elt - def parse_reason_elt(self, reason_elt: domish.Element) -> tuple[str|None, str|None]: + def parse_reason_elt( + self, reason_elt: domish.Element + ) -> tuple[str | None, str | None]: """Parse a <reason> element @return: reason found, and text if any @@ -1242,7 +1210,7 @@ client: SatXMPPEntity, request: domish.Element, jingle_elt: domish.Element, - session: dict + session: dict, ) -> None: # TODO: check reason, display a message to user if needed log.debug(f"Jingle Session {session['id']} terminated") @@ -1286,10 +1254,7 @@ session["jingle_elt"] = jingle_elt await self._call_plugins( - client, - XEP_0166.A_PREPARE_INITIATOR, - session, - delete=False + client, XEP_0166.A_PREPARE_INITIATOR, session, delete=False ) await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) @@ -1398,7 +1363,11 @@ # we can now actually replace the transport await utils.as_deferred( content_data["transport"].handler.jingle_handler, - client, XEP_0166.A_DESTROY, session, content_name, None + client, + XEP_0166.A_DESTROY, + session, + content_name, + None, ) content_data["transport"] = transport content_data["transport_data"].clear() @@ -1409,13 +1378,21 @@ # we notify the transport and insert its <transport/> in the answer accept_transport_elt = await utils.as_deferred( transport.handler.jingle_handler, - client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt + client, + XEP_0166.A_TRANSPORT_REPLACE, + session, + content_name, + transport_elt, ) content_elt.addChild(accept_transport_elt) # there is no confirmation needed here, so we can directly prepare it await utils.as_deferred( transport.handler.jingle_handler, - client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None + client, + XEP_0166.A_PREPARE_RESPONDER, + session, + content_name, + None, ) iq_elt.send() @@ -1425,7 +1402,7 @@ client: SatXMPPEntity, request: domish.Element, jingle_elt: domish.Element, - session: dict + session: dict, ) -> None: """Method called once transport replacement is accepted @@ -1472,7 +1449,7 @@ client: SatXMPPEntity, request: domish.Element, jingle_elt: domish.Element, - session: dict + session: dict, ) -> None: """Method called when a transport-info action is received from other peer @@ -1487,8 +1464,12 @@ try: parsed_contents = self._parse_elements( - jingle_elt, session, request, client, with_application=False, - store_in_session=False + jingle_elt, + session, + request, + client, + with_application=False, + store_in_session=False, ) except exceptions.CancelError: return