Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0166/__init__.py @ 4044:3900626bc100
plugin XEP-0166: refactoring, and various improvments:
- add models for transport and applications handlers and linked data
- split models into separate file
- some type hints
- some documentation comments
- add actions to prepare confirmation, useful to do initial parsing of all contents
- application arg/kwargs and some transport data can be initialised during Jingle
`initiate` call, this is notably useful when a call is made with transport data (this is
the call for A/V calls where codecs and ICE candidate can be specified when starting a
call)
- session data can be specified during Jingle `initiate` call
- new `store_in_session` argument in `_parse_elements`, which can be used to avoid
race-condition when a context element (<decription> or <transport>) is being parsed for
an action while an other action happens (like `transport-info`)
- don't sed `sid` in `transport_elt` during a `transport-info` action anymore in
`build_action`: this is specific to Jingle File Transfer and has been moved there
rel 419
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 15 May 2023 16:23:11 +0200 |
parents | sat/plugins/plugin_xep_0166.py@524856bd7b19 |
children | dd39e60ca2aa |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0166/__init__.py Mon May 15 16:23:11 2023 +0200 @@ -0,0 +1,1390 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Jingle (XEP-0166) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import time +from typing import Any, Callable, Dict, Final, List, Optional, Tuple +import uuid + +from twisted.internet import defer +from twisted.internet import reactor +from twisted.python import failure +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import error +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import D_, _ +from sat.core.log import getLogger +from sat.tools import xml_tools +from sat.tools import utils + +from .models import ( + ApplicationData, + BaseApplicationHandler, + BaseTransportHandler, + ContentData, + TransportData, +) + + +log = getLogger(__name__) + + +IQ_SET : Final = '/iq[@type="set"]' +NS_JINGLE : Final = "urn:xmpp:jingle:1" +NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1" +JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]' +STATE_PENDING : Final = "PENDING" +STATE_ACTIVE : Final = "ACTIVE" +STATE_ENDED : Final = "ENDED" +CONFIRM_TXT : Final = D_( + "{entity} want to start a jingle session with you, do you accept ?" +) + +PLUGIN_INFO : Final = { + C.PI_NAME: "Jingle", + C.PI_IMPORT_NAME: "XEP-0166", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0166"], + C.PI_MAIN: "XEP_0166", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of Jingle"""), +} + + +class XEP_0166: + namespace : Final = NS_JINGLE + + ROLE_INITIATOR : Final = "initiator" + ROLE_RESPONDER : Final = "responder" + + TRANSPORT_DATAGRAM : Final = "UDP" + TRANSPORT_STREAMING : Final = "TCP" + + REASON_SUCCESS : Final = "success" + REASON_DECLINE : Final = "decline" + REASON_FAILED_APPLICATION : Final = "failed-application" + REASON_FAILED_TRANSPORT : Final = "failed-transport" + REASON_CONNECTIVITY_ERROR : Final = "connectivity-error" + + # standard actions + + A_SESSION_INITIATE : Final = "session-initiate" + A_SESSION_ACCEPT : Final = "session-accept" + A_SESSION_TERMINATE : Final = "session-terminate" + A_SESSION_INFO : Final = "session-info" + A_TRANSPORT_REPLACE : Final = "transport-replace" + A_TRANSPORT_ACCEPT : Final = "transport-accept" + A_TRANSPORT_REJECT : Final = "transport-reject" + A_TRANSPORT_INFO : Final = "transport-info" + + # non standard actions + + #: called before the confirmation request, first event for responder, useful for + #: parsing + A_PREPARE_CONFIRMATION : Final = "prepare-confirmation" + #: initiator must prepare tranfer + A_PREPARE_INITIATOR : Final = "prepare-initiator" + #: responder must prepare tranfer + A_PREPARE_RESPONDER : Final = "prepare-responder" + #; session accepted ack has been received from initiator + A_ACCEPTED_ACK : Final = ( + "accepted-ack" + ) + A_START : Final = "start" # application can start + #: called when a transport is destroyed (e.g. because it is remplaced). Used to do + #: cleaning operations + A_DESTROY : Final = ( + "destroy" + ) + + def __init__(self, host): + log.info(_("plugin Jingle initialization")) + self.host = host + self._applications = {} # key: namespace, value: application data + self._transports = {} # key: namespace, value: transport data + # we also keep transports by type, they are then sorted by priority + self._type_transports = { + XEP_0166.TRANSPORT_DATAGRAM: [], + XEP_0166.TRANSPORT_STREAMING: [], + } + + def profile_connected(self, client): + client.jingle_sessions = {} # key = sid, value = session_data + + def get_handler(self, client): + return XEP_0166_handler(self) + + def get_session(self, client: SatXMPPEntity, session_id: str) -> dict: + """Retrieve session from its SID + + @param session_id: session ID + @return: found session + + @raise exceptions.NotFound: no session with this SID has been found + """ + try: + return client.jingle_sessions[session_id] + except KeyError: + raise exceptions.NotFound( + f"No session with SID {session_id} found" + ) + + + def _del_session(self, client, sid): + try: + del client.jingle_sessions[sid] + except KeyError: + log.debug( + f"Jingle session id {sid!r} is unknown, nothing to delete " + f"[{client.profile}]") + else: + log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]") + + ## helpers methods to build stanzas ## + + def _build_jingle_elt( + self, + client: SatXMPPEntity, + session: dict, + action: str + ) -> Tuple[xmlstream.IQ, domish.Element]: + iq_elt = client.IQ("set") + iq_elt["from"] = session['local_jid'].full() + iq_elt["to"] = session["peer_jid"].full() + jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) + jingle_elt["sid"] = session["id"] + jingle_elt["action"] = action + return iq_elt, jingle_elt + + def sendError(self, client, error_condition, sid, request, jingle_condition=None): + """Send error stanza + + @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys + @param sid(unicode,None): jingle session id, or None, if session must not be destroyed + @param request(domish.Element): original request + @param jingle_condition(None, unicode): if not None, additional jingle-specific error information + """ + iq_elt = error.StanzaError(error_condition).toResponse(request) + if jingle_condition is not None: + iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) + if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid: + self._del_session(client, sid) + log.warning( + "Error while managing jingle session, cancelling: {condition}".format( + condition=error_condition + ) + ) + return client.send(iq_elt) + + def _terminate_eb(self, failure_): + log.warning(_("Error while terminating session: {msg}").format(msg=failure_)) + + def terminate(self, client, reason, session, text=None): + """Terminate the session + + send the session-terminate action, and delete the session data + @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element + if a list of element, add them as children of the <reason/> element + @param session(dict): data of the session + """ + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_TERMINATE + ) + reason_elt = jingle_elt.addElement("reason") + if isinstance(reason, str): + reason_elt.addElement(reason) + else: + for elt in reason: + reason_elt.addChild(elt) + if text is not None: + reason_elt.addElement("text", content=text) + self._del_session(client, session["id"]) + d = iq_elt.send() + d.addErrback(self._terminate_eb) + return d + + ## errors which doesn't imply a stanza sending ## + + def _iq_error(self, failure_, sid, client): + """Called when we got an <iq/> error + + @param failure_(failure.Failure): the exceptions raised + @param sid(unicode): jingle session id + """ + log.warning( + "Error while sending jingle <iq/> stanza: {failure_}".format( + failure_=failure_.value + ) + ) + self._del_session(client, sid) + + def _jingle_error_cb(self, failure_, session, request, client): + """Called when something is going wrong while parsing jingle request + + The error condition depend of the exceptions raised: + exceptions.DataError raise a bad-request condition + @param fail(failure.Failure): the exceptions raised + @param session(dict): data of the session + @param request(domsih.Element): jingle request + @param client: %(doc_client)s + """ + log.warning(f"Error while processing jingle request [{client.profile}]") + if isinstance(failure_.value, defer.FirstError): + failure_ = failure_.value.subFailure.value + if isinstance(failure_, exceptions.DataError): + return self.sendError(client, "bad-request", session["id"], request) + elif isinstance(failure_, error.StanzaError): + return self.terminate(client, self.REASON_FAILED_APPLICATION, session, + text=str(failure_)) + else: + log.error(f"Unmanaged jingle exception: {failure_}") + return self.terminate(client, self.REASON_FAILED_APPLICATION, session, + text=str(failure_)) + + ## methods used by other plugins ## + + def register_application( + self, + namespace: str, + handler: BaseApplicationHandler + ) -> None: + """Register an application plugin + + @param namespace(unicode): application namespace managed by the plugin + @param handler(object): instance of a class which manage the application. + May have the following methods: + - request_confirmation(session, desc_elt, client): + - if present, it is called on when session must be accepted. + - if it return True the session is accepted, else rejected. + A Deferred can be returned + - if not present, a generic accept dialog will be used + - jingle_session_init( + client, self, session, content_name[, *args, **kwargs] + ): must return the domish.Element used for initial content + - jingle_handler( + client, self, action, session, content_name, transport_elt + ): + called on several action to negociate the application or transport + - jingle_terminate: called on session terminate, with reason_elt + May be used to clean session + """ + if namespace in self._applications: + raise exceptions.ConflictError( + f"Trying to register already registered namespace {namespace}" + ) + self._applications[namespace] = ApplicationData( + namespace=namespace, handler=handler + ) + log.debug("new jingle application registered") + + def register_transport( + self, + namespace: str, + transport_type: str, + handler: BaseTransportHandler, + priority: int = 0 + ) -> None: + """Register a transport plugin + + @param namespace: the XML namespace used for this transport + @param transport_type: type of transport to use (see XEP-0166 §8) + @param handler: instance of a class which manage the application. + @param priority: priority of this transport + """ + assert transport_type in ( + XEP_0166.TRANSPORT_DATAGRAM, + XEP_0166.TRANSPORT_STREAMING, + ) + if namespace in self._transports: + raise exceptions.ConflictError( + "Trying to register already registered namespace {}".format(namespace) + ) + transport_data = TransportData( + namespace=namespace, handler=handler, priority=priority + ) + self._type_transports[transport_type].append(transport_data) + self._type_transports[transport_type].sort( + key=lambda transport_data: transport_data.priority, reverse=True + ) + self._transports[namespace] = transport_data + log.debug("new jingle transport registered") + + @defer.inlineCallbacks + def transport_replace(self, client, transport_ns, session, content_name): + """Replace a transport + + @param transport_ns(unicode): namespace of the new transport to use + @param session(dict): jingle session data + @param content_name(unicode): name of the content + """ + # XXX: for now we replace the transport before receiving confirmation from other peer + # this is acceptable because we terminate the session if transport is rejected. + # this behavious may change in the future. + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError("Unkown transport") + yield content_data["transport"].handler.jingle_handler( + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport + transport_data.clear() + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_REPLACE + ) + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + + transport_elt = transport.handler.jingle_session_init(client, session, content_name) + content_elt.addChild(transport_elt) + iq_elt.send() + + def build_action( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + iq_elt: Optional[xmlstream.IQ] = None, + context_elt: Optional[domish.Element] = None + ) -> Tuple[xmlstream.IQ, domish.Element]: + """Build an element according to requested action + + @param action: a jingle action (see XEP-0166 §7.2), + session-* actions are not managed here + transport-replace is managed in the dedicated [transport_replace] method + @param session: jingle session data + @param content_name: name of the content + @param iq_elt: use this IQ instead of creating a new one if provided + @param context_elt: use this element instead of creating a new one if provided + @return: parent <iq> element, <transport> or <description> element, according to action + """ + # we first build iq, jingle and content element which are the same in every cases + if iq_elt is not None: + try: + jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle")) + except StopIteration: + raise exceptions.InternalError( + "The <iq> element provided doesn't have a <jingle> element" + ) + else: + iq_elt, jingle_elt = self._build_jingle_elt(client, session, action) + # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked + content_data = session["contents"][content_name] + content_elt = jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + + if context_elt is not None: + pass + elif action == XEP_0166.A_TRANSPORT_INFO: + context_elt = transport_elt = content_elt.addElement( + "transport", content_data["transport"].namespace + ) + else: + raise exceptions.InternalError(f"unmanaged action {action}") + + return iq_elt, context_elt + + def build_session_info(self, client, session): + """Build a session-info action + + @param session(dict): jingle session data + @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element + """ + return self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO) + + def get_application(self, namespace: str) -> ApplicationData: + """Retreive application corresponding to a namespace + + @raise exceptions.NotFound if application can't be found + """ + try: + return self._applications[namespace] + except KeyError: + raise exceptions.NotFound( + f"No application registered for {namespace}" + ) + + def get_content_data(self, content: dict) -> ContentData: + """"Retrieve application and its argument from content""" + app_ns = content["app_ns"] + try: + application = self.get_application(app_ns) + except exceptions.NotFound as e: + raise exceptions.InternalError(str(e)) + app_args = content.get("app_args", []) + app_kwargs = content.get("app_kwargs", {}) + transport_data = content.get("transport_data", {}) + try: + content_name = content["name"] + except KeyError: + content_name = content["name"] = str(uuid.uuid4()) + return ContentData( + application, + app_args, + app_kwargs, + transport_data, + content_name + ) + + async def initiate( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + contents: List[dict], + encrypted: bool = False, + **extra_data: Any + ) -> str: + """Send a session initiation request + + @param peer_jid: jid to establith session with + @param contents: list of contents to use: + The dict must have the following keys: + - app_ns(str): namespace of the application + the following keys are optional: + - transport_type(str): type of transport to use (see XEP-0166 §8) + default to TRANSPORT_STREAMING + - name(str): name of the content + - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none + default to BOTH (see XEP-0166 §7.3) + - app_args(list): args to pass to the application plugin + - app_kwargs(dict): keyword args to pass to the application plugin + @param encrypted: if True, session must be encrypted and "encryption" must be set + to all content data of session + @return: jingle session id + """ + assert contents # there must be at least one content + if (peer_jid == client.jid + or client.is_component and peer_jid.host == client.jid.host): + raise ValueError(_("You can't do a jingle session with yourself")) + initiator = client.jid + sid = str(uuid.uuid4()) + # TODO: session cleaning after timeout ? + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": initiator, + "role": XEP_0166.ROLE_INITIATOR, + "local_jid": client.jid, + "peer_jid": peer_jid, + "started": time.time(), + "contents": {}, + **extra_data, + } + + if not await self.host.trigger.async_point( + "XEP-0166_initiate", + client, session, contents + ): + return sid + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_INITIATE + ) + jingle_elt["initiator"] = initiator.full() + + session_contents = session["contents"] + + for content in contents: + # we get the application plugin + content_data = self.get_content_data(content) + + # and the transport plugin + transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) + try: + transport = self._type_transports[transport_type][0] + except IndexError: + raise exceptions.InternalError( + "No transport registered for {}".format(transport_type) + ) + + # we build the session data for this content + session_content = { + "application": content_data.application, + "application_data": {}, + "transport": transport, + "transport_data": content_data.transport_data, + "creator": XEP_0166.ROLE_INITIATOR, + "senders": content.get("senders", "both"), + } + if content_data.content_name in session_contents: + raise exceptions.InternalError( + "There is already a content with this name" + ) + session_contents[content_data.content_name] = session_content + + # we construct the content element + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = session_content["creator"] + content_elt["name"] = content_data.content_name + try: + content_elt["senders"] = content["senders"] + except KeyError: + pass + + # then the description element + desc_elt = await utils.as_deferred( + content_data.application.handler.jingle_session_init, + client, session, content_data.content_name, + *content_data.app_args, **content_data.app_kwargs + ) + content_elt.addChild(desc_elt) + + # and the transport one + transport_elt = await utils.as_deferred( + transport.handler.jingle_session_init, + client, session, content_data.content_name, + ) + content_elt.addChild(transport_elt) + + if not await self.host.trigger.async_point( + "XEP-0166_initiate_elt_built", + client, session, iq_elt, jingle_elt + ): + return sid + if encrypted: + for content in session["contents"].values(): + if "encryption" not in content: + raise exceptions.EncryptionError( + "Encryption is requested, but no encryption has been set" + ) + + try: + await iq_elt.send() + except Exception as e: + failure_ = failure.Failure(e) + self._iq_error(failure_, sid, client) + raise failure_ + return sid + + def delayed_content_terminate(self, *args, **kwargs): + """Put content_terminate in queue but don't execute immediately + + This is used to terminate a content inside a handler, to avoid modifying contents + """ + reactor.callLater(0, self.content_terminate, *args, **kwargs) + + def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS): + """Terminate and remove a content + + if there is no more content, then session is terminated + @param session(dict): jingle session + @param content_name(unicode): name of the content terminated + @param reason(unicode): reason of the termination + """ + contents = session["contents"] + del contents[content_name] + if not contents: + self.terminate(client, reason, session) + + ## defaults methods called when plugin doesn't have them ## + + def jingle_request_confirmation_default( + self, client, action, session, content_name, desc_elt + ): + """This method request confirmation for a jingle session""" + log.debug("Using generic jingle confirmation method") + return xml_tools.defer_confirm( + self.host, + _(CONFIRM_TXT).format(entity=session["peer_jid"].full()), + _("Confirm Jingle session"), + profile=client.profile, + ) + + ## jingle events ## + + def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None: + defer.ensureDeferred(self.on_jingle_request(client, request)) + + async def on_jingle_request( + self, + client: SatXMPPEntity, + request: domish.Element + ) -> None: + """Called when any jingle request is received + + The request will then be dispatched to appropriate method + according to current state + @param request(domish.Element): received IQ request + """ + request.handled = True + jingle_elt = next(request.elements(NS_JINGLE, "jingle")) + + # first we need the session id + try: + sid = jingle_elt["sid"] + if not sid: + raise KeyError + except KeyError: + log.warning("Received jingle request has no sid attribute") + self.sendError(client, "bad-request", None, request) + return + + # then the action + try: + action = jingle_elt["action"] + if not action: + raise KeyError + except KeyError: + log.warning("Received jingle request has no action") + self.sendError(client, "bad-request", None, request) + return + + peer_jid = jid.JID(request["from"]) + + # we get or create the session + try: + session = client.jingle_sessions[sid] + except KeyError: + if action == XEP_0166.A_SESSION_INITIATE: + pass + elif action == XEP_0166.A_SESSION_TERMINATE: + log.debug( + "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) + return + else: + log.warning( + "Received request for an unknown session id: {request_id} [{profile}]".format( + request_id=sid, profile=client.profile + ) + ) + self.sendError(client, "item-not-found", None, request, "unknown-session") + return + + session = client.jingle_sessions[sid] = { + "id": sid, + "state": STATE_PENDING, + "initiator": peer_jid, + "role": XEP_0166.ROLE_RESPONDER, + # we store local_jid using request['to'] because for a component the jid + # used may not be client.jid (if a local part is used). + "local_jid": jid.JID(request['to']), + "peer_jid": peer_jid, + "started": time.time(), + } + else: + if session["peer_jid"] != peer_jid: + log.warning( + "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format( + sid + ) + ) + self.sendError(client, "service-unavailable", sid, request) + return + if session["id"] != sid: + log.error("session id doesn't match") + self.sendError(client, "service-unavailable", sid, request) + raise exceptions.InternalError + + if action == XEP_0166.A_SESSION_INITIATE: + await self.on_session_initiate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_TERMINATE: + self.on_session_terminate(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_ACCEPT: + await self.on_session_accept(client, request, jingle_elt, session) + elif action == XEP_0166.A_SESSION_INFO: + self.on_session_info(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_INFO: + self.on_transport_info(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REPLACE: + await self.on_transport_replace(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_ACCEPT: + self.on_transport_accept(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REJECT: + self.on_transport_reject(client, request, jingle_elt, session) + else: + raise exceptions.InternalError(f"Unknown action {action}") + + ## Actions callbacks ## + + def _parse_elements( + self, + jingle_elt: domish.Element, + session: dict, + request: domish.Element, + client: SatXMPPEntity, + new: bool = False, + creator: str = ROLE_INITIATOR, + with_application: bool =True, + with_transport: bool = True, + store_in_session: bool = True, + ) -> Dict[str, dict]: + """Parse contents elements and fill contents_dict accordingly + + after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" + @param jingle_elt: parent <jingle> element, containing one or more <content> + @param session: session data + @param request: the whole request + @param client: %(doc_client)s + @param new: True if the content is new and must be created, + else the content must exists, and session data will be filled + @param creator: only used if new is True: creating pear (see § 7.3) + @param with_application: if True, raise an error if there is no <description> + element else ignore it + @param with_transport: if True, raise an error if there is no <transport> element + else ignore it + @param store_in_session: if True, the ``session`` contents will be updated with + the parsed elements. + Use False when you parse an action which can happen at any time (e.g. + transport-info) and meaning that a parsed element may already be present in + the session (e.g. if an authorisation request is waiting for user answer), + This can't be used when ``new`` is set. + @return: contents_dict (from session, or a new one if "store_in_session" is False) + @raise exceptions.CancelError: the error is treated and the calling method can + cancel the treatment (i.e. return) + """ + if store_in_session: + contents_dict = session["contents"] + else: + if new: + raise exceptions.InternalError( + '"store_in_session" must not be used when "new" is set' + ) + contents_dict = {n: {} for n in session["contents"]} + content_elts = jingle_elt.elements(NS_JINGLE, "content") + + for content_elt in content_elts: + name = content_elt["name"] + + if new: + # the content must not exist, we check it + if not name or name in contents_dict: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + content_data = contents_dict[name] = { + "creator": creator, + "senders": content_elt.attributes.get("senders", "both"), + } + else: + # the content must exist, we check it + try: + content_data = contents_dict[name] + except KeyError: + log.warning("Other peer try to access an unknown content") + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + # application + if with_application: + desc_elt = content_elt.description + if not desc_elt: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the application + app_ns = desc_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + try: + application = self._applications[app_ns] + except KeyError: + log.warning( + "Unmanaged application namespace [{}]".format(app_ns) + ) + self.sendError( + client, "service-unavailable", session["id"], request + ) + raise exceptions.CancelError + + content_data["application"] = application + content_data["application_data"] = {} + else: + # the content exists, we check that we have not a former desc_elt + if "desc_elt" in content_data: + raise exceptions.InternalError( + "desc_elt should not exist at this point" + ) + + content_data["desc_elt"] = desc_elt + + # transport + if with_transport: + transport_elt = content_elt.transport + if not transport_elt: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + if new: + # the content is new, we need to check and link the transport + transport_ns = transport_elt.uri + if not app_ns or app_ns == NS_JINGLE: + self.sendError(client, "bad-request", session["id"], request) + raise exceptions.CancelError + + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError( + "No transport registered for namespace {}".format( + transport_ns + ) + ) + content_data["transport"] = transport + content_data["transport_data"] = {} + else: + # the content exists, we check that we have not a former transport_elt + if "transport_elt" in content_data: + raise exceptions.InternalError( + "transport_elt should not exist at this point" + ) + + content_data["transport_elt"] = transport_elt + + return contents_dict + + def _ignore(self, client, action, session, content_name, elt): + """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins + + must be used as app_default_cb and/or transp_default_cb + """ + return elt + + def _call_plugins( + self, + client: SatXMPPEntity, + action: str, + session: dict, + app_method_name: Optional[str] = "jingle_handler", + transp_method_name: Optional[str] = "jingle_handler", + app_default_cb: Optional[Callable] = None, + transp_default_cb: Optional[Callable] = None, + delete: bool = True, + elements: bool = True, + force_element: Optional[domish.Element] = None + ) -> List[defer.Deferred]: + """Call application and transport plugin methods for all contents + + @param action: jingle action name + @param session: jingle session data + @param app_method_name: name of the method to call for applications + None to ignore + @param transp_method_name: name of the method to call for transports + None to ignore + @param app_default_cb: default callback to use if plugin has not app_method_name + None to raise an exception instead + @param transp_default_cb: default callback to use if plugin has not transp_method_name + None to raise an exception instead + @param delete: if True, remove desc_elt and transport_elt from session + ignored if elements is False + @param elements: True if elements(desc_elt and tranport_elt) must be managed + must be True if _call_plugins is used in a request, and False if it is used + after a request (i.e. on <iq> result or error) + @param force_element: if elements is False, it is used as element parameter + else it is ignored + @return : list of launched Deferred + @raise exceptions.NotFound: method is not implemented + """ + contents_dict = session["contents"] + defers_list = [] + for content_name, content_data in contents_dict.items(): + for method_name, handler_key, default_cb, elt_name in ( + (app_method_name, "application", app_default_cb, "desc_elt"), + (transp_method_name, "transport", transp_default_cb, "transport_elt"), + ): + if method_name is None: + continue + + handler = content_data[handler_key].handler + try: + method = getattr(handler, method_name) + except AttributeError: + if default_cb is None: + raise exceptions.NotFound( + "{} not implemented !".format(method_name) + ) + else: + method = default_cb + if elements: + elt = content_data.pop(elt_name) if delete else content_data[elt_name] + else: + elt = force_element + d = utils.as_deferred( + method, client, action, session, content_name, elt + ) + defers_list.append(d) + + return defers_list + + async def on_session_initiate( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: Dict[str, Any] + ) -> None: + """Called on session-initiate action + + The "jingle_request_confirmation" method of each application will be called + (or self.jingle_request_confirmation_default if the former doesn't exist). + The session is only accepted if all application are confirmed. + The application must manage itself multiple contents scenari (e.g. audio/video). + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): <jingle> element + @param session(dict): session data + """ + if "contents" in session: + raise exceptions.InternalError( + "Contents dict should not already exist at this point" + ) + session["contents"] = contents_dict = {} + + try: + self._parse_elements( + jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR + ) + except exceptions.CancelError: + return + + if not contents_dict: + # there MUST be at least one content + self.sendError(client, "bad-request", session["id"], request) + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + + + if not await self.host.trigger.async_point( + "XEP-0166_on_session_initiate", + client, session, request, jingle_elt + ): + return + + await defer.DeferredList(self._call_plugins( + client, + XEP_0166.A_PREPARE_CONFIRMATION, + session, + delete=False + )) + + # we now request each application plugin confirmation + # and if all are accepted, we can accept the session + confirm_defers = self._call_plugins( + client, + XEP_0166.A_SESSION_INITIATE, + session, + "jingle_request_confirmation", + None, + self.jingle_request_confirmation_default, + delete=False, + ) + + confirm_dlist = defer.gatherResults(confirm_defers) + confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) + confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) + + def _confirmation_cb(self, confirm_results, session, jingle_elt, client): + """Method called when confirmation from user has been received + + This method is only called for the responder + @param confirm_results(list[bool]): all True if session is accepted + @param session(dict): session data + @param jingle_elt(domish.Element): jingle data of this session + @param client: %(doc_client)s + """ + confirmed = all(confirm_results) + if not confirmed: + return self.terminate(client, XEP_0166.REASON_DECLINE, session) + + iq_elt, jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_SESSION_ACCEPT + ) + jingle_elt["responder"] = session['local_jid'].full() + + # contents + + def addElement(domish_elt, content_elt): + content_elt.addChild(domish_elt) + + defers_list = [] + + for content_name, content_data in session["contents"].items(): + content_elt = jingle_elt.addElement("content") + content_elt["creator"] = XEP_0166.ROLE_INITIATOR + content_elt["name"] = content_name + + application = content_data["application"] + app_session_accept_cb = application.handler.jingle_handler + + app_d = utils.as_deferred( + app_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("desc_elt"), + ) + app_d.addCallback(addElement, content_elt) + defers_list.append(app_d) + + transport = content_data["transport"] + transport_session_accept_cb = transport.handler.jingle_handler + + transport_d = utils.as_deferred( + transport_session_accept_cb, + client, + XEP_0166.A_SESSION_INITIATE, + session, + content_name, + content_data.pop("transport_elt"), + ) + transport_d.addCallback(addElement, content_elt) + defers_list.append(transport_d) + + d_list = defer.DeferredList(defers_list) + d_list.addCallback( + lambda __: self._call_plugins( + client, + XEP_0166.A_PREPARE_RESPONDER, + session, + app_method_name=None, + elements=False, + ) + ) + d_list.addCallback(lambda __: iq_elt.send()) + + def change_state(__, session): + session["state"] = STATE_ACTIVE + + d_list.addCallback(change_state, session) + d_list.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_ACCEPTED_ACK, session, elements=False + ) + ) + d_list.addErrback(self._iq_error, session["id"], client) + return d_list + + def on_session_terminate(self, client, request, jingle_elt, session): + # TODO: check reason, display a message to user if needed + log.debug("Jingle Session {} terminated".format(session["id"])) + try: + reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason")) + except StopIteration: + log.warning("No reason given for session termination") + reason_elt = jingle_elt.addElement("reason") + + terminate_defers = self._call_plugins( + client, + XEP_0166.A_SESSION_TERMINATE, + session, + "jingle_terminate", + "jingle_terminate", + self._ignore, + self._ignore, + elements=False, + force_element=reason_elt, + ) + terminate_dlist = defer.DeferredList(terminate_defers) + + terminate_dlist.addCallback(lambda __: self._del_session(client, session["id"])) + client.send(xmlstream.toResponse(request, "result")) + + async def on_session_accept(self, client, request, jingle_elt, session): + """Method called once session is accepted + + This method is only called for initiator + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug(f"Jingle session {session['id']} has been accepted") + + try: + self._parse_elements(jingle_elt, session, request, client) + except exceptions.CancelError: + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + # and change the state + session["state"] = STATE_ACTIVE + + await defer.DeferredList(self._call_plugins( + client, + XEP_0166.A_PREPARE_INITIATOR, + session, + delete=False + )) + + negociate_defers = [] + negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) + + negociate_dlist = defer.gatherResults(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) + + def _on_session_cb(self, result, client, request, jingle_elt, session): + client.send(xmlstream.toResponse(request, "result")) + + def _on_session_eb(self, failure_, client, request, jingle_elt, session): + log.error("Error while handling on_session_info: {}".format(failure_.value)) + # XXX: only error managed so far, maybe some applications/transports need more + self.sendError( + client, "feature-not-implemented", None, request, "unsupported-info" + ) + + def on_session_info(self, client, request, jingle_elt, session): + """Method called when a session-info action is received from other peer + + This method is only called for initiator + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + if not jingle_elt.children: + # this is a session ping, see XEP-0166 §6.8 + client.send(xmlstream.toResponse(request, "result")) + return + + try: + # XXX: session-info is most likely only used for application, so we don't call transport plugins + # if a future transport use it, this behaviour must be adapted + defers = self._call_plugins( + client, + XEP_0166.A_SESSION_INFO, + session, + "jingle_session_info", + None, + elements=False, + force_element=jingle_elt, + ) + except exceptions.NotFound as e: + self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) + return + + dlist = defer.DeferredList(defers, fireOnOneErrback=True) + dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) + dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) + + async def on_transport_replace(self, client, request, jingle_elt, session): + """A transport change is requested + + The request is parsed, and jingle_handler is called on concerned transport plugin(s) + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug("Other peer wants to replace the transport") + try: + self._parse_elements( + jingle_elt, session, request, client, with_application=False + ) + except exceptions.CancelError: + defer.returnValue(None) + + client.send(xmlstream.toResponse(request, "result")) + + content_name = None + to_replace = [] + + for content_name, content_data in session["contents"].items(): + try: + transport_elt = content_data.pop("transport_elt") + except KeyError: + continue + transport_ns = transport_elt.uri + try: + transport = self._transports[transport_ns] + except KeyError: + log.warning( + "Other peer want to replace current transport with an unknown one: {}".format( + transport_ns + ) + ) + content_name = None + break + to_replace.append((content_name, content_data, transport, transport_elt)) + + if content_name is None: + # wa can't accept the replacement + iq_elt, reject_jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_REJECT + ) + for child in jingle_elt.children: + reject_jingle_elt.addChild(child) + + iq_elt.send() + defer.returnValue(None) + + # at this point, everything is alright and we can replace the transport(s) + # this is similar to an session-accept action, but for transports only + iq_elt, accept_jingle_elt = self._build_jingle_elt( + client, session, XEP_0166.A_TRANSPORT_ACCEPT + ) + for content_name, content_data, transport, transport_elt in to_replace: + # we can now actually replace the transport + await utils.as_deferred( + content_data["transport"].handler.jingle_handler, + client, XEP_0166.A_DESTROY, session, content_name, None + ) + content_data["transport"] = transport + content_data["transport_data"].clear() + # and build the element + content_elt = accept_jingle_elt.addElement("content") + content_elt["name"] = content_name + content_elt["creator"] = content_data["creator"] + # we notify the transport and insert its <transport/> in the answer + accept_transport_elt = await utils.as_deferred( + transport.handler.jingle_handler, + client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt + ) + content_elt.addChild(accept_transport_elt) + # there is no confirmation needed here, so we can directly prepare it + await utils.as_deferred( + transport.handler.jingle_handler, + client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None + ) + + iq_elt.send() + + def on_transport_accept(self, client, request, jingle_elt, session): + """Method called once transport replacement is accepted + + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + log.debug("new transport has been accepted") + + try: + self._parse_elements( + jingle_elt, session, request, client, with_application=False + ) + except exceptions.CancelError: + return + + # at this point we can send the <iq/> result to confirm reception of the request + client.send(xmlstream.toResponse(request, "result")) + + negociate_defers = [] + negociate_defers = self._call_plugins( + client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None + ) + + negociate_dlist = defer.DeferredList(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback( + lambda __: self._call_plugins( + client, XEP_0166.A_START, session, app_method_name=None, elements=False + ) + ) + + def on_transport_reject(self, client, request, jingle_elt, session): + """Method called when a transport replacement is refused + + @param client: %(doc_client)s + @param request(domish.Element): full <iq> request + @param jingle_elt(domish.Element): the <jingle> element + @param session(dict): session data + """ + # XXX: for now, we terminate the session in case of transport-reject + # this behaviour may change in the future + self.terminate(client, "failed-transport", session) + + def on_transport_info( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: dict + ) -> None: + """Method called when a transport-info action is received from other peer + + The request is parsed, and jingle_handler is called on concerned transport + plugin(s) + @param client: %(doc_client)s + @param request: full <iq> request + @param jingle_elt: the <jingle> element + @param session: session data + """ + log.debug(f"Jingle session {session['id']} has been accepted") + + try: + parsed_contents = self._parse_elements( + jingle_elt, session, request, client, with_application=False, + store_in_session=False + ) + except exceptions.CancelError: + return + + # The parsing was OK, we send the <iq> result + client.send(xmlstream.toResponse(request, "result")) + + for content_name, content_data in session["contents"].items(): + try: + transport_elt = parsed_contents[content_name]["transport_elt"] + except KeyError: + continue + else: + utils.as_deferred( + content_data["transport"].handler.jingle_handler, + client, + XEP_0166.A_TRANSPORT_INFO, + session, + content_name, + transport_elt, + ) + + +@implementer(iwokkel.IDisco) +class XEP_0166_handler(xmlstream.XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver( + JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []