diff sat/plugins/plugin_xep_0166/__init__.py @ 4044:3900626bc100

plugin XEP-0166: refactoring, and various improvments: - add models for transport and applications handlers and linked data - split models into separate file - some type hints - some documentation comments - add actions to prepare confirmation, useful to do initial parsing of all contents - application arg/kwargs and some transport data can be initialised during Jingle `initiate` call, this is notably useful when a call is made with transport data (this is the call for A/V calls where codecs and ICE candidate can be specified when starting a call) - session data can be specified during Jingle `initiate` call - new `store_in_session` argument in `_parse_elements`, which can be used to avoid race-condition when a context element (<decription> or <transport>) is being parsed for an action while an other action happens (like `transport-info`) - don't sed `sid` in `transport_elt` during a `transport-info` action anymore in `build_action`: this is specific to Jingle File Transfer and has been moved there rel 419
author Goffi <goffi@goffi.org>
date Mon, 15 May 2023 16:23:11 +0200
parents sat/plugins/plugin_xep_0166.py@524856bd7b19
children dd39e60ca2aa
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0166/__init__.py	Mon May 15 16:23:11 2023 +0200
@@ -0,0 +1,1390 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Jingle (XEP-0166)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import time
+from typing import Any, Callable, Dict, Final, List, Optional, Tuple
+import uuid
+
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.python import failure
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber import error
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import D_, _
+from sat.core.log import getLogger
+from sat.tools import xml_tools
+from sat.tools import utils
+
+from .models import (
+    ApplicationData,
+    BaseApplicationHandler,
+    BaseTransportHandler,
+    ContentData,
+    TransportData,
+)
+
+
+log = getLogger(__name__)
+
+
+IQ_SET : Final = '/iq[@type="set"]'
+NS_JINGLE : Final = "urn:xmpp:jingle:1"
+NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1"
+JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'
+STATE_PENDING : Final = "PENDING"
+STATE_ACTIVE : Final = "ACTIVE"
+STATE_ENDED : Final = "ENDED"
+CONFIRM_TXT : Final = D_(
+    "{entity} want to start a jingle session with you, do you accept ?"
+)
+
+PLUGIN_INFO : Final = {
+    C.PI_NAME: "Jingle",
+    C.PI_IMPORT_NAME: "XEP-0166",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0166"],
+    C.PI_MAIN: "XEP_0166",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of Jingle"""),
+}
+
+
+class XEP_0166:
+    namespace : Final = NS_JINGLE
+
+    ROLE_INITIATOR : Final = "initiator"
+    ROLE_RESPONDER : Final = "responder"
+
+    TRANSPORT_DATAGRAM : Final = "UDP"
+    TRANSPORT_STREAMING : Final = "TCP"
+
+    REASON_SUCCESS : Final = "success"
+    REASON_DECLINE : Final = "decline"
+    REASON_FAILED_APPLICATION : Final = "failed-application"
+    REASON_FAILED_TRANSPORT : Final = "failed-transport"
+    REASON_CONNECTIVITY_ERROR : Final = "connectivity-error"
+
+    # standard actions
+
+    A_SESSION_INITIATE : Final = "session-initiate"
+    A_SESSION_ACCEPT : Final = "session-accept"
+    A_SESSION_TERMINATE : Final = "session-terminate"
+    A_SESSION_INFO : Final = "session-info"
+    A_TRANSPORT_REPLACE : Final = "transport-replace"
+    A_TRANSPORT_ACCEPT : Final = "transport-accept"
+    A_TRANSPORT_REJECT : Final = "transport-reject"
+    A_TRANSPORT_INFO : Final = "transport-info"
+
+    # non standard actions
+
+    #: called before the confirmation request, first event for responder, useful for
+    #: parsing
+    A_PREPARE_CONFIRMATION : Final = "prepare-confirmation"
+    #: initiator must prepare tranfer
+    A_PREPARE_INITIATOR : Final = "prepare-initiator"
+    #: responder must prepare tranfer
+    A_PREPARE_RESPONDER : Final = "prepare-responder"
+    #; session accepted ack has been received from initiator
+    A_ACCEPTED_ACK : Final = (
+        "accepted-ack"
+    )
+    A_START : Final = "start"  # application can start
+    #: called when a transport is destroyed (e.g. because it is remplaced). Used to do
+    #: cleaning operations
+    A_DESTROY : Final = (
+        "destroy"
+    )
+
+    def __init__(self, host):
+        log.info(_("plugin Jingle initialization"))
+        self.host = host
+        self._applications = {}  # key: namespace, value: application data
+        self._transports = {}  # key: namespace, value: transport data
+        # we also keep transports by type, they are then sorted by priority
+        self._type_transports = {
+            XEP_0166.TRANSPORT_DATAGRAM: [],
+            XEP_0166.TRANSPORT_STREAMING: [],
+        }
+
+    def profile_connected(self, client):
+        client.jingle_sessions = {}  # key = sid, value = session_data
+
+    def get_handler(self, client):
+        return XEP_0166_handler(self)
+
+    def get_session(self, client: SatXMPPEntity, session_id: str) -> dict:
+        """Retrieve session from its SID
+
+        @param session_id: session ID
+        @return: found session
+
+        @raise exceptions.NotFound: no session with this SID has been found
+        """
+        try:
+            return client.jingle_sessions[session_id]
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No session with SID {session_id} found"
+            )
+
+
+    def _del_session(self, client, sid):
+        try:
+            del client.jingle_sessions[sid]
+        except KeyError:
+            log.debug(
+                f"Jingle session id {sid!r} is unknown, nothing to delete "
+                f"[{client.profile}]")
+        else:
+            log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]")
+
+    ## helpers methods to build stanzas ##
+
+    def _build_jingle_elt(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        action: str
+    ) -> Tuple[xmlstream.IQ, domish.Element]:
+        iq_elt = client.IQ("set")
+        iq_elt["from"] = session['local_jid'].full()
+        iq_elt["to"] = session["peer_jid"].full()
+        jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
+        jingle_elt["sid"] = session["id"]
+        jingle_elt["action"] = action
+        return iq_elt, jingle_elt
+
+    def sendError(self, client, error_condition, sid, request, jingle_condition=None):
+        """Send error stanza
+
+        @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
+        @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
+        @param request(domish.Element): original request
+        @param jingle_condition(None, unicode): if not None, additional jingle-specific error information
+        """
+        iq_elt = error.StanzaError(error_condition).toResponse(request)
+        if jingle_condition is not None:
+            iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
+        if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid:
+            self._del_session(client, sid)
+            log.warning(
+                "Error while managing jingle session, cancelling: {condition}".format(
+                    condition=error_condition
+                )
+            )
+        return client.send(iq_elt)
+
+    def _terminate_eb(self, failure_):
+        log.warning(_("Error while terminating session: {msg}").format(msg=failure_))
+
+    def terminate(self, client, reason, session, text=None):
+        """Terminate the session
+
+        send the session-terminate action, and delete the session data
+        @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element
+            if a list of element, add them as children of the <reason/> element
+        @param session(dict): data of the session
+        """
+        iq_elt, jingle_elt = self._build_jingle_elt(
+            client, session, XEP_0166.A_SESSION_TERMINATE
+        )
+        reason_elt = jingle_elt.addElement("reason")
+        if isinstance(reason, str):
+            reason_elt.addElement(reason)
+        else:
+            for elt in reason:
+                reason_elt.addChild(elt)
+        if text is not None:
+            reason_elt.addElement("text", content=text)
+        self._del_session(client, session["id"])
+        d = iq_elt.send()
+        d.addErrback(self._terminate_eb)
+        return d
+
+    ## errors which doesn't imply a stanza sending ##
+
+    def _iq_error(self, failure_, sid, client):
+        """Called when we got an <iq/> error
+
+        @param failure_(failure.Failure): the exceptions raised
+        @param sid(unicode): jingle session id
+        """
+        log.warning(
+            "Error while sending jingle <iq/> stanza: {failure_}".format(
+                failure_=failure_.value
+            )
+        )
+        self._del_session(client, sid)
+
+    def _jingle_error_cb(self, failure_, session, request, client):
+        """Called when something is going wrong while parsing jingle request
+
+        The error condition depend of the exceptions raised:
+            exceptions.DataError raise a bad-request condition
+        @param fail(failure.Failure): the exceptions raised
+        @param session(dict): data of the session
+        @param request(domsih.Element): jingle request
+        @param client: %(doc_client)s
+        """
+        log.warning(f"Error while processing jingle request [{client.profile}]")
+        if isinstance(failure_.value, defer.FirstError):
+            failure_ = failure_.value.subFailure.value
+        if isinstance(failure_, exceptions.DataError):
+            return self.sendError(client, "bad-request", session["id"], request)
+        elif isinstance(failure_, error.StanzaError):
+            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
+                                  text=str(failure_))
+        else:
+            log.error(f"Unmanaged jingle exception: {failure_}")
+            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
+                                  text=str(failure_))
+
+    ## methods used by other plugins ##
+
+    def register_application(
+        self,
+        namespace: str,
+        handler: BaseApplicationHandler
+    ) -> None:
+        """Register an application plugin
+
+        @param namespace(unicode): application namespace managed by the plugin
+        @param handler(object): instance of a class which manage the application.
+            May have the following methods:
+                - request_confirmation(session, desc_elt, client):
+                    - if present, it is called on when session must be accepted.
+                    - if it return True the session is accepted, else rejected.
+                        A Deferred can be returned
+                    - if not present, a generic accept dialog will be used
+                - jingle_session_init(
+                        client, self, session, content_name[, *args, **kwargs]
+                    ): must return the domish.Element used for initial content
+                - jingle_handler(
+                        client, self, action, session, content_name, transport_elt
+                    ):
+                    called on several action to negociate the application or transport
+                - jingle_terminate: called on session terminate, with reason_elt
+                    May be used to clean session
+        """
+        if namespace in self._applications:
+            raise exceptions.ConflictError(
+                f"Trying to register already registered namespace {namespace}"
+            )
+        self._applications[namespace] = ApplicationData(
+            namespace=namespace, handler=handler
+        )
+        log.debug("new jingle application registered")
+
+    def register_transport(
+            self,
+            namespace: str,
+            transport_type: str,
+            handler: BaseTransportHandler,
+            priority: int = 0
+    ) -> None:
+        """Register a transport plugin
+
+        @param namespace: the XML namespace used for this transport
+        @param transport_type: type of transport to use (see XEP-0166 §8)
+        @param handler: instance of a class which manage the application.
+        @param priority: priority of this transport
+        """
+        assert transport_type in (
+            XEP_0166.TRANSPORT_DATAGRAM,
+            XEP_0166.TRANSPORT_STREAMING,
+        )
+        if namespace in self._transports:
+            raise exceptions.ConflictError(
+                "Trying to register already registered namespace {}".format(namespace)
+            )
+        transport_data = TransportData(
+            namespace=namespace, handler=handler, priority=priority
+        )
+        self._type_transports[transport_type].append(transport_data)
+        self._type_transports[transport_type].sort(
+            key=lambda transport_data: transport_data.priority, reverse=True
+        )
+        self._transports[namespace] = transport_data
+        log.debug("new jingle transport registered")
+
+    @defer.inlineCallbacks
+    def transport_replace(self, client, transport_ns, session, content_name):
+        """Replace a transport
+
+        @param transport_ns(unicode): namespace of the new transport to use
+        @param session(dict): jingle session data
+        @param content_name(unicode): name of the content
+        """
+        # XXX: for now we replace the transport before receiving confirmation from other peer
+        #      this is acceptable because we terminate the session if transport is rejected.
+        #      this behavious may change in the future.
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        try:
+            transport = self._transports[transport_ns]
+        except KeyError:
+            raise exceptions.InternalError("Unkown transport")
+        yield content_data["transport"].handler.jingle_handler(
+            client, XEP_0166.A_DESTROY, session, content_name, None
+        )
+        content_data["transport"] = transport
+        transport_data.clear()
+
+        iq_elt, jingle_elt = self._build_jingle_elt(
+            client, session, XEP_0166.A_TRANSPORT_REPLACE
+        )
+        content_elt = jingle_elt.addElement("content")
+        content_elt["name"] = content_name
+        content_elt["creator"] = content_data["creator"]
+
+        transport_elt = transport.handler.jingle_session_init(client, session, content_name)
+        content_elt.addChild(transport_elt)
+        iq_elt.send()
+
+    def build_action(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        iq_elt: Optional[xmlstream.IQ] = None,
+        context_elt: Optional[domish.Element] = None
+    ) -> Tuple[xmlstream.IQ, domish.Element]:
+        """Build an element according to requested action
+
+        @param action: a jingle action (see XEP-0166 §7.2),
+            session-* actions are not managed here
+            transport-replace is managed in the dedicated [transport_replace] method
+        @param session: jingle session data
+        @param content_name: name of the content
+        @param iq_elt: use this IQ instead of creating a new one if provided
+        @param context_elt: use this element instead of creating a new one if provided
+        @return: parent <iq> element, <transport> or <description> element, according to action
+        """
+        # we first build iq, jingle and content element which are the same in every cases
+        if iq_elt is not None:
+            try:
+                jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle"))
+            except StopIteration:
+                raise exceptions.InternalError(
+                    "The <iq> element provided doesn't have a <jingle> element"
+                )
+        else:
+            iq_elt, jingle_elt = self._build_jingle_elt(client, session, action)
+        # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked
+        content_data = session["contents"][content_name]
+        content_elt = jingle_elt.addElement("content")
+        content_elt["name"] = content_name
+        content_elt["creator"] = content_data["creator"]
+
+        if context_elt is not None:
+            pass
+        elif action == XEP_0166.A_TRANSPORT_INFO:
+            context_elt = transport_elt = content_elt.addElement(
+                "transport", content_data["transport"].namespace
+            )
+        else:
+            raise exceptions.InternalError(f"unmanaged action {action}")
+
+        return iq_elt, context_elt
+
+    def build_session_info(self, client, session):
+        """Build a session-info action
+
+        @param session(dict): jingle session data
+        @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element
+        """
+        return self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO)
+
+    def get_application(self, namespace: str) -> ApplicationData:
+        """Retreive application corresponding to a namespace
+
+        @raise exceptions.NotFound if application can't be found
+        """
+        try:
+            return self._applications[namespace]
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No application registered for {namespace}"
+            )
+
+    def get_content_data(self, content: dict) -> ContentData:
+        """"Retrieve application and its argument from content"""
+        app_ns = content["app_ns"]
+        try:
+            application = self.get_application(app_ns)
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(str(e))
+        app_args = content.get("app_args", [])
+        app_kwargs = content.get("app_kwargs", {})
+        transport_data = content.get("transport_data", {})
+        try:
+            content_name = content["name"]
+        except KeyError:
+            content_name = content["name"] = str(uuid.uuid4())
+        return ContentData(
+            application,
+            app_args,
+            app_kwargs,
+            transport_data,
+            content_name
+        )
+
+    async def initiate(
+        self,
+        client: SatXMPPEntity,
+        peer_jid: jid.JID,
+        contents: List[dict],
+        encrypted: bool = False,
+        **extra_data: Any
+    ) -> str:
+        """Send a session initiation request
+
+        @param peer_jid: jid to establith session with
+        @param contents: list of contents to use:
+            The dict must have the following keys:
+                - app_ns(str): namespace of the application
+            the following keys are optional:
+                - transport_type(str): type of transport to use (see XEP-0166 §8)
+                    default to TRANSPORT_STREAMING
+                - name(str): name of the content
+                - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none
+                    default to BOTH (see XEP-0166 §7.3)
+                - app_args(list): args to pass to the application plugin
+                - app_kwargs(dict): keyword args to pass to the application plugin
+        @param encrypted: if True, session must be encrypted and "encryption" must be set
+            to all content data of session
+        @return: jingle session id
+        """
+        assert contents  # there must be at least one content
+        if (peer_jid == client.jid
+            or client.is_component and peer_jid.host == client.jid.host):
+            raise ValueError(_("You can't do a jingle session with yourself"))
+        initiator = client.jid
+        sid = str(uuid.uuid4())
+        # TODO: session cleaning after timeout ?
+        session = client.jingle_sessions[sid] = {
+            "id": sid,
+            "state": STATE_PENDING,
+            "initiator": initiator,
+            "role": XEP_0166.ROLE_INITIATOR,
+            "local_jid": client.jid,
+            "peer_jid": peer_jid,
+            "started": time.time(),
+            "contents": {},
+            **extra_data,
+        }
+
+        if not await self.host.trigger.async_point(
+            "XEP-0166_initiate",
+            client, session, contents
+        ):
+            return sid
+
+        iq_elt, jingle_elt = self._build_jingle_elt(
+            client, session, XEP_0166.A_SESSION_INITIATE
+        )
+        jingle_elt["initiator"] = initiator.full()
+
+        session_contents = session["contents"]
+
+        for content in contents:
+            # we get the application plugin
+            content_data = self.get_content_data(content)
+
+            # and the transport plugin
+            transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING)
+            try:
+                transport = self._type_transports[transport_type][0]
+            except IndexError:
+                raise exceptions.InternalError(
+                    "No transport registered for {}".format(transport_type)
+                )
+
+            # we build the session data for this content
+            session_content = {
+                "application": content_data.application,
+                "application_data": {},
+                "transport": transport,
+                "transport_data": content_data.transport_data,
+                "creator": XEP_0166.ROLE_INITIATOR,
+                "senders": content.get("senders", "both"),
+            }
+            if content_data.content_name in session_contents:
+                raise exceptions.InternalError(
+                    "There is already a content with this name"
+                )
+            session_contents[content_data.content_name] = session_content
+
+            # we construct the content element
+            content_elt = jingle_elt.addElement("content")
+            content_elt["creator"] = session_content["creator"]
+            content_elt["name"] = content_data.content_name
+            try:
+                content_elt["senders"] = content["senders"]
+            except KeyError:
+                pass
+
+            # then the description element
+            desc_elt = await utils.as_deferred(
+                content_data.application.handler.jingle_session_init,
+                client, session, content_data.content_name,
+                *content_data.app_args, **content_data.app_kwargs
+            )
+            content_elt.addChild(desc_elt)
+
+            # and the transport one
+            transport_elt = await utils.as_deferred(
+                transport.handler.jingle_session_init,
+                client, session, content_data.content_name,
+            )
+            content_elt.addChild(transport_elt)
+
+        if not await self.host.trigger.async_point(
+            "XEP-0166_initiate_elt_built",
+            client, session, iq_elt, jingle_elt
+        ):
+            return sid
+        if encrypted:
+            for content in session["contents"].values():
+                if "encryption" not in content:
+                    raise exceptions.EncryptionError(
+                        "Encryption is requested, but no encryption has been set"
+                    )
+
+        try:
+            await iq_elt.send()
+        except Exception as e:
+            failure_ = failure.Failure(e)
+            self._iq_error(failure_, sid, client)
+            raise failure_
+        return sid
+
+    def delayed_content_terminate(self, *args, **kwargs):
+        """Put content_terminate in queue but don't execute immediately
+
+        This is used to terminate a content inside a handler, to avoid modifying contents
+        """
+        reactor.callLater(0, self.content_terminate, *args, **kwargs)
+
+    def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS):
+        """Terminate and remove a content
+
+        if there is no more content, then session is terminated
+        @param session(dict): jingle session
+        @param content_name(unicode): name of the content terminated
+        @param reason(unicode): reason of the termination
+        """
+        contents = session["contents"]
+        del contents[content_name]
+        if not contents:
+            self.terminate(client, reason, session)
+
+    ## defaults methods called when plugin doesn't have them ##
+
+    def jingle_request_confirmation_default(
+        self, client, action, session, content_name, desc_elt
+    ):
+        """This method request confirmation for a jingle session"""
+        log.debug("Using generic jingle confirmation method")
+        return xml_tools.defer_confirm(
+            self.host,
+            _(CONFIRM_TXT).format(entity=session["peer_jid"].full()),
+            _("Confirm Jingle session"),
+            profile=client.profile,
+        )
+
+    ## jingle events ##
+
+    def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None:
+        defer.ensureDeferred(self.on_jingle_request(client, request))
+
+    async def on_jingle_request(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element
+    ) -> None:
+        """Called when any jingle request is received
+
+        The request will then be dispatched to appropriate method
+        according to current state
+        @param request(domish.Element): received IQ request
+        """
+        request.handled = True
+        jingle_elt = next(request.elements(NS_JINGLE, "jingle"))
+
+        # first we need the session id
+        try:
+            sid = jingle_elt["sid"]
+            if not sid:
+                raise KeyError
+        except KeyError:
+            log.warning("Received jingle request has no sid attribute")
+            self.sendError(client, "bad-request", None, request)
+            return
+
+        # then the action
+        try:
+            action = jingle_elt["action"]
+            if not action:
+                raise KeyError
+        except KeyError:
+            log.warning("Received jingle request has no action")
+            self.sendError(client, "bad-request", None, request)
+            return
+
+        peer_jid = jid.JID(request["from"])
+
+        # we get or create the session
+        try:
+            session = client.jingle_sessions[sid]
+        except KeyError:
+            if action == XEP_0166.A_SESSION_INITIATE:
+                pass
+            elif action == XEP_0166.A_SESSION_TERMINATE:
+                log.debug(
+                    "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format(
+                        request_id=sid, profile=client.profile
+                    )
+                )
+                return
+            else:
+                log.warning(
+                    "Received request for an unknown session id: {request_id} [{profile}]".format(
+                        request_id=sid, profile=client.profile
+                    )
+                )
+                self.sendError(client, "item-not-found", None, request, "unknown-session")
+                return
+
+            session = client.jingle_sessions[sid] = {
+                "id": sid,
+                "state": STATE_PENDING,
+                "initiator": peer_jid,
+                "role": XEP_0166.ROLE_RESPONDER,
+                # we store local_jid using request['to'] because for a component the jid
+                # used may not be client.jid (if a local part is used).
+                "local_jid": jid.JID(request['to']),
+                "peer_jid": peer_jid,
+                "started": time.time(),
+            }
+        else:
+            if session["peer_jid"] != peer_jid:
+                log.warning(
+                    "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(
+                        sid
+                    )
+                )
+                self.sendError(client, "service-unavailable", sid, request)
+                return
+            if session["id"] != sid:
+                log.error("session id doesn't match")
+                self.sendError(client, "service-unavailable", sid, request)
+                raise exceptions.InternalError
+
+        if action == XEP_0166.A_SESSION_INITIATE:
+            await self.on_session_initiate(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_SESSION_TERMINATE:
+            self.on_session_terminate(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_SESSION_ACCEPT:
+            await self.on_session_accept(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_SESSION_INFO:
+            self.on_session_info(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_INFO:
+            self.on_transport_info(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_REPLACE:
+            await self.on_transport_replace(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_ACCEPT:
+            self.on_transport_accept(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_REJECT:
+            self.on_transport_reject(client, request, jingle_elt, session)
+        else:
+            raise exceptions.InternalError(f"Unknown action {action}")
+
+    ## Actions callbacks ##
+
+    def _parse_elements(
+        self,
+        jingle_elt: domish.Element,
+        session: dict,
+        request: domish.Element,
+        client: SatXMPPEntity,
+        new: bool = False,
+        creator: str = ROLE_INITIATOR,
+        with_application: bool =True,
+        with_transport: bool = True,
+        store_in_session: bool = True,
+    ) -> Dict[str, dict]:
+        """Parse contents elements and fill contents_dict accordingly
+
+        after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt"
+        @param jingle_elt: parent <jingle> element, containing one or more <content>
+        @param session: session data
+        @param request: the whole request
+        @param client: %(doc_client)s
+        @param new: True if the content is new and must be created,
+            else the content must exists, and session data will be filled
+        @param creator: only used if new is True: creating pear (see § 7.3)
+        @param with_application: if True, raise an error if there is no <description>
+            element else ignore it
+        @param with_transport: if True, raise an error if there is no <transport> element
+            else ignore it
+        @param store_in_session: if True, the ``session`` contents will be updated with
+        the parsed elements.
+            Use False when you parse an action which can happen at any time (e.g.
+            transport-info) and meaning that a parsed element may already be present in
+            the session (e.g. if an authorisation request is waiting for user answer),
+            This can't be used when ``new`` is set.
+        @return: contents_dict (from session, or a new one if "store_in_session" is False)
+        @raise exceptions.CancelError: the error is treated and the calling method can
+            cancel the treatment (i.e. return)
+        """
+        if store_in_session:
+            contents_dict = session["contents"]
+        else:
+            if new:
+                raise exceptions.InternalError(
+                    '"store_in_session" must not be used when "new" is set'
+                )
+            contents_dict = {n: {} for n in session["contents"]}
+        content_elts = jingle_elt.elements(NS_JINGLE, "content")
+
+        for content_elt in content_elts:
+            name = content_elt["name"]
+
+            if new:
+                # the content must not exist, we check it
+                if not name or name in contents_dict:
+                    self.sendError(client, "bad-request", session["id"], request)
+                    raise exceptions.CancelError
+                content_data = contents_dict[name] = {
+                    "creator": creator,
+                    "senders": content_elt.attributes.get("senders", "both"),
+                }
+            else:
+                # the content must exist, we check it
+                try:
+                    content_data = contents_dict[name]
+                except KeyError:
+                    log.warning("Other peer try to access an unknown content")
+                    self.sendError(client, "bad-request", session["id"], request)
+                    raise exceptions.CancelError
+
+            # application
+            if with_application:
+                desc_elt = content_elt.description
+                if not desc_elt:
+                    self.sendError(client, "bad-request", session["id"], request)
+                    raise exceptions.CancelError
+
+                if new:
+                    # the content is new, we need to check and link the application
+                    app_ns = desc_elt.uri
+                    if not app_ns or app_ns == NS_JINGLE:
+                        self.sendError(client, "bad-request", session["id"], request)
+                        raise exceptions.CancelError
+
+                    try:
+                        application = self._applications[app_ns]
+                    except KeyError:
+                        log.warning(
+                            "Unmanaged application namespace [{}]".format(app_ns)
+                        )
+                        self.sendError(
+                            client, "service-unavailable", session["id"], request
+                        )
+                        raise exceptions.CancelError
+
+                    content_data["application"] = application
+                    content_data["application_data"] = {}
+                else:
+                    # the content exists, we check that we have not a former desc_elt
+                    if "desc_elt" in content_data:
+                        raise exceptions.InternalError(
+                            "desc_elt should not exist at this point"
+                        )
+
+                content_data["desc_elt"] = desc_elt
+
+            # transport
+            if with_transport:
+                transport_elt = content_elt.transport
+                if not transport_elt:
+                    self.sendError(client, "bad-request", session["id"], request)
+                    raise exceptions.CancelError
+
+                if new:
+                    # the content is new, we need to check and link the transport
+                    transport_ns = transport_elt.uri
+                    if not app_ns or app_ns == NS_JINGLE:
+                        self.sendError(client, "bad-request", session["id"], request)
+                        raise exceptions.CancelError
+
+                    try:
+                        transport = self._transports[transport_ns]
+                    except KeyError:
+                        raise exceptions.InternalError(
+                            "No transport registered for namespace {}".format(
+                                transport_ns
+                            )
+                        )
+                    content_data["transport"] = transport
+                    content_data["transport_data"] = {}
+                else:
+                    # the content exists, we check that we have not a former transport_elt
+                    if "transport_elt" in content_data:
+                        raise exceptions.InternalError(
+                            "transport_elt should not exist at this point"
+                        )
+
+                content_data["transport_elt"] = transport_elt
+
+        return contents_dict
+
+    def _ignore(self, client, action, session, content_name, elt):
+        """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins
+
+        must be used as app_default_cb and/or transp_default_cb
+        """
+        return elt
+
+    def _call_plugins(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        app_method_name: Optional[str] = "jingle_handler",
+        transp_method_name: Optional[str] = "jingle_handler",
+        app_default_cb: Optional[Callable] = None,
+        transp_default_cb: Optional[Callable] = None,
+        delete: bool = True,
+        elements: bool = True,
+        force_element: Optional[domish.Element] = None
+    ) -> List[defer.Deferred]:
+        """Call application and transport plugin methods for all contents
+
+        @param action: jingle action name
+        @param session: jingle session data
+        @param app_method_name: name of the method to call for applications
+            None to ignore
+        @param transp_method_name: name of the method to call for transports
+            None to ignore
+        @param app_default_cb: default callback to use if plugin has not app_method_name
+            None to raise an exception instead
+        @param transp_default_cb: default callback to use if plugin has not transp_method_name
+            None to raise an exception instead
+        @param delete: if True, remove desc_elt and transport_elt from session
+            ignored if elements is False
+        @param elements: True if elements(desc_elt and tranport_elt) must be managed
+            must be True if _call_plugins is used in a request, and False if it is used
+            after a request (i.e. on <iq> result or error)
+        @param force_element: if elements is False, it is used as element parameter
+            else it is ignored
+        @return : list of launched Deferred
+        @raise exceptions.NotFound: method is not implemented
+        """
+        contents_dict = session["contents"]
+        defers_list = []
+        for content_name, content_data in contents_dict.items():
+            for method_name, handler_key, default_cb, elt_name in (
+                (app_method_name, "application", app_default_cb, "desc_elt"),
+                (transp_method_name, "transport", transp_default_cb, "transport_elt"),
+            ):
+                if method_name is None:
+                    continue
+
+                handler = content_data[handler_key].handler
+                try:
+                    method = getattr(handler, method_name)
+                except AttributeError:
+                    if default_cb is None:
+                        raise exceptions.NotFound(
+                            "{} not implemented !".format(method_name)
+                        )
+                    else:
+                        method = default_cb
+                if elements:
+                    elt = content_data.pop(elt_name) if delete else content_data[elt_name]
+                else:
+                    elt = force_element
+                d = utils.as_deferred(
+                    method, client, action, session, content_name, elt
+                )
+                defers_list.append(d)
+
+        return defers_list
+
+    async def on_session_initiate(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element,
+        jingle_elt: domish.Element,
+        session: Dict[str, Any]
+    ) -> None:
+        """Called on session-initiate action
+
+        The "jingle_request_confirmation" method of each application will be called
+        (or self.jingle_request_confirmation_default if the former doesn't exist).
+        The session is only accepted if all application are confirmed.
+        The application must manage itself multiple contents scenari (e.g. audio/video).
+        @param client: %(doc_client)s
+        @param request(domish.Element): full request
+        @param jingle_elt(domish.Element): <jingle> element
+        @param session(dict): session data
+        """
+        if "contents" in session:
+            raise exceptions.InternalError(
+                "Contents dict should not already exist at this point"
+            )
+        session["contents"] = contents_dict = {}
+
+        try:
+            self._parse_elements(
+                jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR
+            )
+        except exceptions.CancelError:
+            return
+
+        if not contents_dict:
+            # there MUST be at least one content
+            self.sendError(client, "bad-request", session["id"], request)
+            return
+
+        # at this point we can send the <iq/> result to confirm reception of the request
+        client.send(xmlstream.toResponse(request, "result"))
+
+
+        if not await self.host.trigger.async_point(
+            "XEP-0166_on_session_initiate",
+            client, session, request, jingle_elt
+        ):
+            return
+
+        await defer.DeferredList(self._call_plugins(
+            client,
+            XEP_0166.A_PREPARE_CONFIRMATION,
+            session,
+            delete=False
+        ))
+
+        # we now request each application plugin confirmation
+        # and if all are accepted, we can accept the session
+        confirm_defers = self._call_plugins(
+            client,
+            XEP_0166.A_SESSION_INITIATE,
+            session,
+            "jingle_request_confirmation",
+            None,
+            self.jingle_request_confirmation_default,
+            delete=False,
+        )
+
+        confirm_dlist = defer.gatherResults(confirm_defers)
+        confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client)
+        confirm_dlist.addErrback(self._jingle_error_cb, session, request, client)
+
+    def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
+        """Method called when confirmation from user has been received
+
+        This method is only called for the responder
+        @param confirm_results(list[bool]): all True if session is accepted
+        @param session(dict): session data
+        @param jingle_elt(domish.Element): jingle data of this session
+        @param client: %(doc_client)s
+        """
+        confirmed = all(confirm_results)
+        if not confirmed:
+            return self.terminate(client, XEP_0166.REASON_DECLINE, session)
+
+        iq_elt, jingle_elt = self._build_jingle_elt(
+            client, session, XEP_0166.A_SESSION_ACCEPT
+        )
+        jingle_elt["responder"] = session['local_jid'].full()
+
+        # contents
+
+        def addElement(domish_elt, content_elt):
+            content_elt.addChild(domish_elt)
+
+        defers_list = []
+
+        for content_name, content_data in session["contents"].items():
+            content_elt = jingle_elt.addElement("content")
+            content_elt["creator"] = XEP_0166.ROLE_INITIATOR
+            content_elt["name"] = content_name
+
+            application = content_data["application"]
+            app_session_accept_cb = application.handler.jingle_handler
+
+            app_d = utils.as_deferred(
+                app_session_accept_cb,
+                client,
+                XEP_0166.A_SESSION_INITIATE,
+                session,
+                content_name,
+                content_data.pop("desc_elt"),
+            )
+            app_d.addCallback(addElement, content_elt)
+            defers_list.append(app_d)
+
+            transport = content_data["transport"]
+            transport_session_accept_cb = transport.handler.jingle_handler
+
+            transport_d = utils.as_deferred(
+                transport_session_accept_cb,
+                client,
+                XEP_0166.A_SESSION_INITIATE,
+                session,
+                content_name,
+                content_data.pop("transport_elt"),
+            )
+            transport_d.addCallback(addElement, content_elt)
+            defers_list.append(transport_d)
+
+        d_list = defer.DeferredList(defers_list)
+        d_list.addCallback(
+            lambda __: self._call_plugins(
+                client,
+                XEP_0166.A_PREPARE_RESPONDER,
+                session,
+                app_method_name=None,
+                elements=False,
+            )
+        )
+        d_list.addCallback(lambda __: iq_elt.send())
+
+        def change_state(__, session):
+            session["state"] = STATE_ACTIVE
+
+        d_list.addCallback(change_state, session)
+        d_list.addCallback(
+            lambda __: self._call_plugins(
+                client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
+            )
+        )
+        d_list.addErrback(self._iq_error, session["id"], client)
+        return d_list
+
+    def on_session_terminate(self, client, request, jingle_elt, session):
+        # TODO: check reason, display a message to user if needed
+        log.debug("Jingle Session {} terminated".format(session["id"]))
+        try:
+            reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason"))
+        except StopIteration:
+            log.warning("No reason given for session termination")
+            reason_elt = jingle_elt.addElement("reason")
+
+        terminate_defers = self._call_plugins(
+            client,
+            XEP_0166.A_SESSION_TERMINATE,
+            session,
+            "jingle_terminate",
+            "jingle_terminate",
+            self._ignore,
+            self._ignore,
+            elements=False,
+            force_element=reason_elt,
+        )
+        terminate_dlist = defer.DeferredList(terminate_defers)
+
+        terminate_dlist.addCallback(lambda __: self._del_session(client, session["id"]))
+        client.send(xmlstream.toResponse(request, "result"))
+
+    async def on_session_accept(self, client, request, jingle_elt, session):
+        """Method called once session is accepted
+
+        This method is only called for initiator
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        log.debug(f"Jingle session {session['id']} has been accepted")
+
+        try:
+            self._parse_elements(jingle_elt, session, request, client)
+        except exceptions.CancelError:
+            return
+
+        # at this point we can send the <iq/> result to confirm reception of the request
+        client.send(xmlstream.toResponse(request, "result"))
+        # and change the state
+        session["state"] = STATE_ACTIVE
+
+        await defer.DeferredList(self._call_plugins(
+            client,
+            XEP_0166.A_PREPARE_INITIATOR,
+            session,
+            delete=False
+        ))
+
+        negociate_defers = []
+        negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
+
+        negociate_dlist = defer.gatherResults(negociate_defers)
+
+        # after negociations we start the transfer
+        negociate_dlist.addCallback(
+            lambda __: self._call_plugins(
+                client, XEP_0166.A_START, session, app_method_name=None, elements=False
+            )
+        )
+
+    def _on_session_cb(self, result, client, request, jingle_elt, session):
+        client.send(xmlstream.toResponse(request, "result"))
+
+    def _on_session_eb(self, failure_, client, request, jingle_elt, session):
+        log.error("Error while handling on_session_info: {}".format(failure_.value))
+        # XXX: only error managed so far, maybe some applications/transports need more
+        self.sendError(
+            client, "feature-not-implemented", None, request, "unsupported-info"
+        )
+
+    def on_session_info(self, client, request, jingle_elt, session):
+        """Method called when a session-info action is received from other peer
+
+        This method is only called for initiator
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        if not jingle_elt.children:
+            # this is a session ping, see XEP-0166 §6.8
+            client.send(xmlstream.toResponse(request, "result"))
+            return
+
+        try:
+            # XXX: session-info is most likely only used for application, so we don't call transport plugins
+            #      if a future transport use it, this behaviour must be adapted
+            defers = self._call_plugins(
+                client,
+                XEP_0166.A_SESSION_INFO,
+                session,
+                "jingle_session_info",
+                None,
+                elements=False,
+                force_element=jingle_elt,
+            )
+        except exceptions.NotFound as e:
+            self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session)
+            return
+
+        dlist = defer.DeferredList(defers, fireOnOneErrback=True)
+        dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session)
+        dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session)
+
+    async def on_transport_replace(self, client, request, jingle_elt, session):
+        """A transport change is requested
+
+        The request is parsed, and jingle_handler is called on concerned transport plugin(s)
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        log.debug("Other peer wants to replace the transport")
+        try:
+            self._parse_elements(
+                jingle_elt, session, request, client, with_application=False
+            )
+        except exceptions.CancelError:
+            defer.returnValue(None)
+
+        client.send(xmlstream.toResponse(request, "result"))
+
+        content_name = None
+        to_replace = []
+
+        for content_name, content_data in session["contents"].items():
+            try:
+                transport_elt = content_data.pop("transport_elt")
+            except KeyError:
+                continue
+            transport_ns = transport_elt.uri
+            try:
+                transport = self._transports[transport_ns]
+            except KeyError:
+                log.warning(
+                    "Other peer want to replace current transport with an unknown one: {}".format(
+                        transport_ns
+                    )
+                )
+                content_name = None
+                break
+            to_replace.append((content_name, content_data, transport, transport_elt))
+
+        if content_name is None:
+            # wa can't accept the replacement
+            iq_elt, reject_jingle_elt = self._build_jingle_elt(
+                client, session, XEP_0166.A_TRANSPORT_REJECT
+            )
+            for child in jingle_elt.children:
+                reject_jingle_elt.addChild(child)
+
+            iq_elt.send()
+            defer.returnValue(None)
+
+        # at this point, everything is alright and we can replace the transport(s)
+        # this is similar to an session-accept action, but for transports only
+        iq_elt, accept_jingle_elt = self._build_jingle_elt(
+            client, session, XEP_0166.A_TRANSPORT_ACCEPT
+        )
+        for content_name, content_data, transport, transport_elt in to_replace:
+            # we can now actually replace the transport
+            await utils.as_deferred(
+                content_data["transport"].handler.jingle_handler,
+                client, XEP_0166.A_DESTROY, session, content_name, None
+            )
+            content_data["transport"] = transport
+            content_data["transport_data"].clear()
+            # and build the element
+            content_elt = accept_jingle_elt.addElement("content")
+            content_elt["name"] = content_name
+            content_elt["creator"] = content_data["creator"]
+            # we notify the transport and insert its <transport/> in the answer
+            accept_transport_elt = await utils.as_deferred(
+                transport.handler.jingle_handler,
+                client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt
+            )
+            content_elt.addChild(accept_transport_elt)
+            # there is no confirmation needed here, so we can directly prepare it
+            await utils.as_deferred(
+                transport.handler.jingle_handler,
+                client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
+            )
+
+        iq_elt.send()
+
+    def on_transport_accept(self, client, request, jingle_elt, session):
+        """Method called once transport replacement is accepted
+
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        log.debug("new transport has been accepted")
+
+        try:
+            self._parse_elements(
+                jingle_elt, session, request, client, with_application=False
+            )
+        except exceptions.CancelError:
+            return
+
+        # at this point we can send the <iq/> result to confirm reception of the request
+        client.send(xmlstream.toResponse(request, "result"))
+
+        negociate_defers = []
+        negociate_defers = self._call_plugins(
+            client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
+        )
+
+        negociate_dlist = defer.DeferredList(negociate_defers)
+
+        # after negociations we start the transfer
+        negociate_dlist.addCallback(
+            lambda __: self._call_plugins(
+                client, XEP_0166.A_START, session, app_method_name=None, elements=False
+            )
+        )
+
+    def on_transport_reject(self, client, request, jingle_elt, session):
+        """Method called when a transport replacement is refused
+
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        # XXX: for now, we terminate the session in case of transport-reject
+        #      this behaviour may change in the future
+        self.terminate(client, "failed-transport", session)
+
+    def on_transport_info(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element,
+        jingle_elt: domish.Element,
+        session: dict
+    ) -> None:
+        """Method called when a transport-info action is received from other peer
+
+        The request is parsed, and jingle_handler is called on concerned transport
+        plugin(s)
+        @param client: %(doc_client)s
+        @param request: full <iq> request
+        @param jingle_elt: the <jingle> element
+        @param session: session data
+        """
+        log.debug(f"Jingle session {session['id']} has been accepted")
+
+        try:
+            parsed_contents = self._parse_elements(
+                jingle_elt, session, request, client, with_application=False,
+                store_in_session=False
+            )
+        except exceptions.CancelError:
+            return
+
+        # The parsing was OK, we send the <iq> result
+        client.send(xmlstream.toResponse(request, "result"))
+
+        for content_name, content_data in session["contents"].items():
+            try:
+                transport_elt = parsed_contents[content_name]["transport_elt"]
+            except KeyError:
+                continue
+            else:
+                utils.as_deferred(
+                    content_data["transport"].handler.jingle_handler,
+                    client,
+                    XEP_0166.A_TRANSPORT_INFO,
+                    session,
+                    content_name,
+                    transport_elt,
+                )
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0166_handler(xmlstream.XMPPHandler):
+
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        self.xmlstream.addObserver(
+            JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent
+        )
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_JINGLE)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []