view libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4151:18026ce0819c

core (xmpp): message reception workflow refactoring: - Call methods from a root async one instead of using Deferred callbacks chain. - Use a queue to be sure to process messages in order.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 14:50:35 +0100
parents 23fa52acf72c
children e11b13418ba6
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Jingle (XEP-0166)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
from typing import Any, Callable, Dict, Final, List, Optional, Tuple
import uuid

from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import failure
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from libervia.backend.tools import utils

from .models import (
    ApplicationData,
    BaseApplicationHandler,
    BaseTransportHandler,
    ContentData,
    TransportData,
)


log = getLogger(__name__)


IQ_SET : Final = '/iq[@type="set"]'
NS_JINGLE : Final = "urn:xmpp:jingle:1"
NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1"
JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'
CONFIRM_TXT : Final = D_(
    "{entity} want to start a jingle session with you, do you accept ?"
)

PLUGIN_INFO : Final = {
    C.PI_NAME: "Jingle",
    C.PI_IMPORT_NAME: "XEP-0166",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0166"],
    C.PI_MAIN: "XEP_0166",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle"""),
}


class XEP_0166:
    namespace : Final = NS_JINGLE

    ROLE_INITIATOR : Final = "initiator"
    ROLE_RESPONDER : Final = "responder"

    TRANSPORT_DATAGRAM : Final = "UDP"
    TRANSPORT_STREAMING : Final = "TCP"

    REASON_SUCCESS : Final = "success"
    REASON_DECLINE : Final = "decline"
    REASON_FAILED_APPLICATION : Final = "failed-application"
    REASON_FAILED_TRANSPORT : Final = "failed-transport"
    REASON_CONNECTIVITY_ERROR : Final = "connectivity-error"

    STATE_PENDING : Final = "PENDING"
    STATE_ACTIVE : Final = "ACTIVE"
    STATE_ENDED : Final = "ENDED"

    # standard actions

    A_SESSION_INITIATE : Final = "session-initiate"
    A_SESSION_ACCEPT : Final = "session-accept"
    A_SESSION_TERMINATE : Final = "session-terminate"
    A_SESSION_INFO : Final = "session-info"
    A_TRANSPORT_REPLACE : Final = "transport-replace"
    A_TRANSPORT_ACCEPT : Final = "transport-accept"
    A_TRANSPORT_REJECT : Final = "transport-reject"
    A_TRANSPORT_INFO : Final = "transport-info"

    # non standard actions

    #: called before the confirmation request, first event for responder, useful for
    #: parsing
    A_PREPARE_CONFIRMATION : Final = "prepare-confirmation"
    #: initiator must prepare tranfer
    A_PREPARE_INITIATOR : Final = "prepare-initiator"
    #: responder must prepare tranfer
    A_PREPARE_RESPONDER : Final = "prepare-responder"
    #; session accepted ack has been received from initiator
    A_ACCEPTED_ACK : Final = (
        "accepted-ack"
    )
    A_START : Final = "start"  # application can start
    #: called when a transport is destroyed (e.g. because it is remplaced). Used to do
    #: cleaning operations
    A_DESTROY : Final = (
        "destroy"
    )

    def __init__(self, host):
        log.info(_("plugin Jingle initialization"))
        self.host = host
        self._applications = {}  # key: namespace, value: application data
        self._transports = {}  # key: namespace, value: transport data
        # we also keep transports by type, they are then sorted by priority
        self._type_transports = {
            XEP_0166.TRANSPORT_DATAGRAM: [],
            XEP_0166.TRANSPORT_STREAMING: [],
        }
        host.bridge.add_method(
            "jingle_terminate",
            ".plugin",
            in_sign="ssss",
            out_sign="",
            method=self._terminate,
            async_=True,
        )

    def profile_connected(self, client):
        client.jingle_sessions = {}  # key = sid, value = session_data

    def get_handler(self, client):
        return XEP_0166_handler(self)

    def get_session(self, client: SatXMPPEntity, session_id: str) -> dict:
        """Retrieve session from its SID

        @param session_id: session ID
        @return: found session

        @raise exceptions.NotFound: no session with this SID has been found
        """
        try:
            return client.jingle_sessions[session_id]
        except KeyError:
            raise exceptions.NotFound(
                f"No session with SID {session_id} found"
            )

    def create_session(
        self,
        client: SatXMPPEntity,
        sid: str,
        role: str,
        peer_jid: jid.JID,
        local_jid: jid.JID|None = None,
        **kwargs
    ) -> dict:
        """Create a new jingle session.

        @param client: The client entity.
        @param sid: Session ID.
        @param role: Session role (initiator or responder).
        @param peer_jid: JID of the peer.
        @param local_jid: JID of the local entity.
         If None, defaults to client.jid.
        @param extra_data: Additional data to be added to the session. Defaults to None.

        @return: The created session.

        @raise ValueError: If the provided role is neither initiator nor responder.
        """
        # TODO: session cleaning after timeout ?

        if not sid:
            raise exceptions.DataError("Empty session ID is not allowed")

        if role not in [XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER]:
            raise ValueError(f"Invalid role {role}. Expected initiator or responder.")


        session_data = {
            "id": sid,
            "state": XEP_0166.STATE_PENDING,
            "initiator": client.jid if role == XEP_0166.ROLE_INITIATOR else peer_jid,
            "role": role,
            "local_jid": local_jid or client.jid,
            "peer_jid": peer_jid,
            "started": time.time(),
            "contents": {}
        }

        # If extra kw args are provided, merge them into the session_data
        if kwargs:
            session_data.update(kwargs)

        # Add the session to the client's jingle sessions
        client.jingle_sessions[sid] = session_data

        return session_data


    def delete_session(self, client, sid):
        try:
            del client.jingle_sessions[sid]
        except KeyError:
            log.debug(
                f"Jingle session id {sid!r} is unknown, nothing to delete "
                f"[{client.profile}]")
        else:
            log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]")

    ## helpers methods to build stanzas ##

    def _build_jingle_elt(
        self,
        client: SatXMPPEntity,
        session: dict,
        action: str
    ) -> Tuple[xmlstream.IQ, domish.Element]:
        iq_elt = client.IQ("set")
        iq_elt["from"] = session['local_jid'].full()
        iq_elt["to"] = session["peer_jid"].full()
        jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
        jingle_elt["sid"] = session["id"]
        jingle_elt["action"] = action
        return iq_elt, jingle_elt

    def sendError(self, client, error_condition, sid, request, jingle_condition=None):
        """Send error stanza

        @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
        @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
        @param request(domish.Element): original request
        @param jingle_condition(None, unicode): if not None, additional jingle-specific error information
        """
        iq_elt = error.StanzaError(error_condition).toResponse(request)
        if jingle_condition is not None:
            iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
        if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid:
            self.delete_session(client, sid)
            log.warning(
                "Error while managing jingle session, cancelling: {condition}".format(
                    condition=error_condition
                )
            )
        return client.send(iq_elt)

    def _terminate_eb(self, failure_):
        log.warning(_("Error while terminating session: {msg}").format(msg=failure_))

    def _terminate(
        self,
        session_id: str,
        reason: str,
        reason_txt: str,
        profile: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile)
        session = self.get_session(client, session_id)
        if reason not in ("", "cancel", "decline", "busy"):
            raise ValueError(
                'only "cancel", "decline" and "busy" and empty value are allowed'
            )
        return self.terminate(
            client,
            reason or None,
            session,
            text=reason_txt or None
        )

    def terminate(
            self,
            client: SatXMPPEntity,
            reason: str|list[domish.Element]|None,
            session: dict,
            text: str|None = None
    ) -> defer.Deferred:
        """Terminate the session

        send the session-terminate action, and delete the session data
        @param reason: if unicode, will be transformed to an element
            if a list of element, add them as children of the <reason/> element
        @param session: data of the session
        """
        iq_elt, jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_SESSION_TERMINATE
        )
        if reason is not None:
            reason_elt = jingle_elt.addElement("reason")
            if isinstance(reason, str):
                reason_elt.addElement(reason)
            else:
                for elt in reason:
                    reason_elt.addChild(elt)
        else:
            reason_elt = None
        if text is not None:
            if reason_elt is None:
                raise ValueError(
                    "You have to specify a reason if text is specified"
                )
            reason_elt.addElement("text", content=text)
        if not self.host.trigger.point(
            "XEP-0166_terminate",
            client, session, reason_elt
        ):
            return defer.succeed(None)
        self.delete_session(client, session["id"])
        d = iq_elt.send()
        d.addErrback(self._terminate_eb)
        return d

    ## errors which doesn't imply a stanza sending ##

    def _iq_error(self, failure_, sid, client):
        """Called when we got an <iq/> error

        @param failure_(failure.Failure): the exceptions raised
        @param sid(unicode): jingle session id
        """
        log.warning(
            "Error while sending jingle <iq/> stanza: {failure_}".format(
                failure_=failure_.value
            )
        )
        self.delete_session(client, sid)

    def _jingle_error_cb(self, failure_, session, request, client):
        """Called when something is going wrong while parsing jingle request

        The error condition depend of the exceptions raised:
            exceptions.DataError raise a bad-request condition
        @param fail(failure.Failure): the exceptions raised
        @param session(dict): data of the session
        @param request(domsih.Element): jingle request
        @param client: %(doc_client)s
        """
        del session["jingle_elt"]
        log.warning(f"Error while processing jingle request [{client.profile}]")
        if isinstance(failure_.value, defer.FirstError):
            failure_ = failure_.value.subFailure.value
        if isinstance(failure_, exceptions.DataError):
            return self.sendError(client, "bad-request", session["id"], request)
        elif isinstance(failure_, error.StanzaError):
            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
                                  text=str(failure_))
        else:
            log.error(f"Unmanaged jingle exception: {failure_}")
            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
                                  text=str(failure_))

    ## methods used by other plugins ##

    def register_application(
        self,
        namespace: str,
        handler: BaseApplicationHandler
    ) -> None:
        """Register an application plugin

        @param namespace(unicode): application namespace managed by the plugin
        @param handler(object): instance of a class which manage the application.
            May have the following methods:
                - request_confirmation(session, desc_elt, client):
                    - if present, it is called on when session must be accepted.
                    - if it return True the session is accepted, else rejected.
                        A Deferred can be returned
                    - if not present, a generic accept dialog will be used
                - jingle_session_init(
                        client, self, session, content_name[, *args, **kwargs]
                    ): must return the domish.Element used for initial content
                - jingle_handler(
                        client, self, action, session, content_name, transport_elt
                    ):
                    called on several action to negociate the application or transport
                - jingle_terminate: called on session terminate, with reason_elt
                    May be used to clean session
        """
        if namespace in self._applications:
            raise exceptions.ConflictError(
                f"Trying to register already registered namespace {namespace}"
            )
        self._applications[namespace] = ApplicationData(
            namespace=namespace, handler=handler
        )
        log.debug("new jingle application registered")

    def register_transport(
            self,
            namespace: str,
            transport_type: str,
            handler: BaseTransportHandler,
            priority: int = 0
    ) -> None:
        """Register a transport plugin

        @param namespace: the XML namespace used for this transport
        @param transport_type: type of transport to use (see XEP-0166 §8)
        @param handler: instance of a class which manage the application.
        @param priority: priority of this transport
        """
        assert transport_type in (
            XEP_0166.TRANSPORT_DATAGRAM,
            XEP_0166.TRANSPORT_STREAMING,
        )
        if namespace in self._transports:
            raise exceptions.ConflictError(
                "Trying to register already registered namespace {}".format(namespace)
            )
        transport_data = TransportData(
            namespace=namespace, handler=handler, priority=priority
        )
        self._type_transports[transport_type].append(transport_data)
        self._type_transports[transport_type].sort(
            key=lambda transport_data: transport_data.priority, reverse=True
        )
        self._transports[namespace] = transport_data
        log.debug("new jingle transport registered")

    @defer.inlineCallbacks
    def transport_replace(self, client, transport_ns, session, content_name):
        """Replace a transport

        @param transport_ns(unicode): namespace of the new transport to use
        @param session(dict): jingle session data
        @param content_name(unicode): name of the content
        """
        # XXX: for now we replace the transport before receiving confirmation from other peer
        #      this is acceptable because we terminate the session if transport is rejected.
        #      this behavious may change in the future.
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        try:
            transport = self._transports[transport_ns]
        except KeyError:
            raise exceptions.InternalError("Unkown transport")
        yield content_data["transport"].handler.jingle_handler(
            client, XEP_0166.A_DESTROY, session, content_name, None
        )
        content_data["transport"] = transport
        transport_data.clear()

        iq_elt, jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_TRANSPORT_REPLACE
        )
        content_elt = jingle_elt.addElement("content")
        content_elt["name"] = content_name
        content_elt["creator"] = content_data["creator"]

        transport_elt = transport.handler.jingle_session_init(client, session, content_name)
        content_elt.addChild(transport_elt)
        iq_elt.send()

    def build_action(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        iq_elt: Optional[xmlstream.IQ] = None,
        context_elt: Optional[domish.Element] = None
    ) -> Tuple[xmlstream.IQ, domish.Element]:
        """Build an element according to requested action

        @param action: a jingle action (see XEP-0166 §7.2),
            session-* actions are not managed here
            transport-replace is managed in the dedicated [transport_replace] method
        @param session: jingle session data
        @param content_name: name of the content
        @param iq_elt: use this IQ instead of creating a new one if provided
        @param context_elt: use this element instead of creating a new one if provided
        @return: parent <iq> element, <transport> or <description> element, according to action
        """
        # we first build iq, jingle and content element which are the same in every cases
        if iq_elt is not None:
            try:
                jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle"))
            except StopIteration:
                raise exceptions.InternalError(
                    "The <iq> element provided doesn't have a <jingle> element"
                )
        else:
            iq_elt, jingle_elt = self._build_jingle_elt(client, session, action)
        # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked
        content_data = session["contents"][content_name]
        content_elt = jingle_elt.addElement("content")
        content_elt["name"] = content_name
        content_elt["creator"] = content_data["creator"]

        if context_elt is not None:
            if context_elt.parent is None:
                content_elt.addChild(context_elt)
        elif action == XEP_0166.A_TRANSPORT_INFO:
            context_elt = transport_elt = content_elt.addElement(
                "transport", content_data["transport"].namespace
            )
        else:
            raise exceptions.InternalError(f"unmanaged action {action}")

        return iq_elt, context_elt

    def build_session_info(self, client, session):
        """Build a session-info action

        @param session(dict): jingle session data
        @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element
        """
        return self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO)

    def get_application(self, namespace: str) -> ApplicationData:
        """Retreive application corresponding to a namespace

        @raise exceptions.NotFound if application can't be found
        """
        try:
            return self._applications[namespace]
        except KeyError:
            raise exceptions.NotFound(
                f"No application registered for {namespace}"
            )

    def get_content_data(self, content: dict) -> ContentData:
        """"Retrieve application and its argument from content"""
        app_ns = content["app_ns"]
        try:
            application = self.get_application(app_ns)
        except exceptions.NotFound as e:
            raise exceptions.InternalError(str(e))
        app_args = content.get("app_args", [])
        app_kwargs = content.get("app_kwargs", {})
        transport_data = content.get("transport_data", {})
        try:
            content_name = content["name"]
        except KeyError:
            content_name = content["name"] = str(uuid.uuid4())
        return ContentData(
            application,
            app_args,
            app_kwargs,
            transport_data,
            content_name
        )

    async def initiate(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        contents: List[dict],
        encrypted: bool = False,
        sid: str|None = None,
        **extra_data: Any
    ) -> str:
        """Send a session initiation request

        @param peer_jid: jid to establith session with
        @param contents: list of contents to use:
            The dict must have the following keys:
                - app_ns(str): namespace of the application
            the following keys are optional:
                - transport_type(str): type of transport to use (see XEP-0166 §8)
                    default to TRANSPORT_STREAMING
                - name(str): name of the content
                - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER,
                    both or none
                    Defaults to BOTH (see XEP-0166 §7.3)
                - app_args(list): args to pass to the application plugin
                - app_kwargs(dict): keyword args to pass to the application plugin
        @param encrypted: if True, session must be encrypted and "encryption" must be set
            to all content data of session
        @param sid: Session ID.
            If None, one will be generated (and used as return value)
        @return: Sesson ID
        """
        assert contents  # there must be at least one content
        if (peer_jid == client.jid
            or client.is_component and peer_jid.host == client.jid.host):
            raise ValueError(_("You can't do a jingle session with yourself"))
        if sid is None:
            sid = str(uuid.uuid4())
        session = self.create_session(
            client, sid, XEP_0166.ROLE_INITIATOR, peer_jid, **extra_data
        )
        initiator = session["initiator"]

        if not await self.host.trigger.async_point(
            "XEP-0166_initiate",
            client, session, contents
        ):
            return sid

        iq_elt, jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_SESSION_INITIATE
        )
        jingle_elt["initiator"] = initiator.full()
        session["jingle_elt"] = jingle_elt

        session_contents = session["contents"]

        for content in contents:
            # we get the application plugin
            content_data = self.get_content_data(content)

            # and the transport plugin
            transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING)
            try:
                transport = self._type_transports[transport_type][0]
            except IndexError:
                raise exceptions.InternalError(
                    "No transport registered for {}".format(transport_type)
                )

            # we build the session data for this content
            application_data = {}
            transport_data = content_data.transport_data
            session_content = {
                "application": content_data.application,
                "application_data": application_data,
                "transport": transport,
                "transport_data": transport_data,
                "creator": XEP_0166.ROLE_INITIATOR,
                "senders": content.get("senders", "both"),
            }
            if content_data.content_name in session_contents:
                raise exceptions.InternalError(
                    "There is already a content with this name"
                )
            session_contents[content_data.content_name] = session_content

            # we construct the content element
            content_elt = jingle_elt.addElement("content")
            content_elt["creator"] = session_content["creator"]
            content_elt["name"] = content_data.content_name
            try:
                content_elt["senders"] = content["senders"]
            except KeyError:
                pass

            # then the description element
            application_data["desc_elt"] = desc_elt = await utils.as_deferred(
                content_data.application.handler.jingle_session_init,
                client, session, content_data.content_name,
                *content_data.app_args, **content_data.app_kwargs
            )
            content_elt.addChild(desc_elt)

            # and the transport one
            transport_data["transport_elt"] = transport_elt = await utils.as_deferred(
                transport.handler.jingle_session_init,
                client, session, content_data.content_name,
            )
            content_elt.addChild(transport_elt)

        if not await self.host.trigger.async_point(
            "XEP-0166_initiate_elt_built",
            client, session, iq_elt, jingle_elt
        ):
            return sid

        # processing is done, we can remove elements
        for content_data in session_contents.values():
            del content_data["application_data"]["desc_elt"]
            del content_data["transport_data"]["transport_elt"]
        del session["jingle_elt"]

        if encrypted:
            for content in session["contents"].values():
                if "encryption" not in content:
                    raise exceptions.EncryptionError(
                        "Encryption is requested, but no encryption has been set"
                    )

        try:
            await iq_elt.send()
        except Exception as e:
            failure_ = failure.Failure(e)
            self._iq_error(failure_, sid, client)
            raise failure_
        return sid

    def delayed_content_terminate(self, *args, **kwargs):
        """Put content_terminate in queue but don't execute immediately

        This is used to terminate a content inside a handler, to avoid modifying contents
        """
        reactor.callLater(0, self.content_terminate, *args, **kwargs)

    def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS):
        """Terminate and remove a content

        if there is no more content, then session is terminated
        @param session(dict): jingle session
        @param content_name(unicode): name of the content terminated
        @param reason(unicode): reason of the termination
        """
        contents = session["contents"]
        del contents[content_name]
        if not contents:
            self.terminate(client, reason, session)

    ## defaults methods called when plugin doesn't have them ##

    def jingle_request_confirmation_default(
        self, client, action, session, content_name, desc_elt
    ):
        """This method request confirmation for a jingle session"""
        log.debug("Using generic jingle confirmation method")
        return xml_tools.defer_confirm(
            self.host,
            _(CONFIRM_TXT).format(entity=session["peer_jid"].full()),
            _("Confirm Jingle session"),
            profile=client.profile,
        )

    ## jingle events ##

    def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self.on_jingle_request(client, request))

    async def on_jingle_request(
        self,
        client: SatXMPPEntity,
        request: domish.Element
    ) -> None:
        """Called when any jingle request is received

        The request will then be dispatched to appropriate method
        according to current state
        @param request(domish.Element): received IQ request
        """
        request.handled = True
        jingle_elt = next(request.elements(NS_JINGLE, "jingle"))

        # first we need the session id
        try:
            sid = jingle_elt["sid"]
            if not sid:
                raise KeyError
        except KeyError:
            log.warning("Received jingle request has no sid attribute")
            self.sendError(client, "bad-request", None, request)
            return

        # then the action
        try:
            action = jingle_elt["action"]
            if not action:
                raise KeyError
        except KeyError:
            log.warning("Received jingle request has no action")
            self.sendError(client, "bad-request", None, request)
            return

        peer_jid = jid.JID(request["from"])

        # we get or create the session
        try:
            session = client.jingle_sessions[sid]
        except KeyError:
            if action == XEP_0166.A_SESSION_INITIATE:
                pass
            elif action == XEP_0166.A_SESSION_TERMINATE:
                log.debug(
                    "ignoring session terminate action (inexisting session id): "
                    "{request_id} [{profile}]".format(
                        request_id=sid, profile=client.profile
                    )
                )
                return
            else:
                log.warning(
                    "Received request for an unknown session id: {request_id} [{profile}]".format(
                        request_id=sid, profile=client.profile
                    )
                )
                self.sendError(client, "item-not-found", None, request, "unknown-session")
                return

            try:
                # session may have been already created in a jingle_preflight, in this
                # case we re-use it.
                session = self.get_session(client, sid)
            except exceptions.NotFound:
                # XXX: we store local_jid using request['to'] because for a component the
                # jid used may not be client.jid (if a local part is used).
                session = self.create_session(
                    client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request['to'])
                )
        else:
            if session["peer_jid"] != peer_jid:
                log.warning(
                    "sid conflict ({}), the jid doesn't match. Can be a collision, a "
                    "hack attempt, or a bad sid generation".format(
                        sid
                    )
                )
                self.sendError(client, "service-unavailable", sid, request)
                return
            if session["id"] != sid:
                log.error("session id doesn't match")
                self.sendError(client, "service-unavailable", sid, request)
                raise exceptions.InternalError

        if action == XEP_0166.A_SESSION_INITIATE:
            await self.on_session_initiate(client, request, jingle_elt, session)
        elif action == XEP_0166.A_SESSION_TERMINATE:
            await self.on_session_terminate(client, request, jingle_elt, session)
        elif action == XEP_0166.A_SESSION_ACCEPT:
            await self.on_session_accept(client, request, jingle_elt, session)
        elif action == XEP_0166.A_SESSION_INFO:
            self.on_session_info(client, request, jingle_elt, session)
        elif action == XEP_0166.A_TRANSPORT_INFO:
            self.on_transport_info(client, request, jingle_elt, session)
        elif action == XEP_0166.A_TRANSPORT_REPLACE:
            await self.on_transport_replace(client, request, jingle_elt, session)
        elif action == XEP_0166.A_TRANSPORT_ACCEPT:
            self.on_transport_accept(client, request, jingle_elt, session)
        elif action == XEP_0166.A_TRANSPORT_REJECT:
            self.on_transport_reject(client, request, jingle_elt, session)
        else:
            raise exceptions.InternalError(f"Unknown action {action}")

    ## Actions callbacks ##

    def _parse_elements(
        self,
        jingle_elt: domish.Element,
        session: dict,
        request: domish.Element,
        client: SatXMPPEntity,
        new: bool = False,
        creator: str = ROLE_INITIATOR,
        with_application: bool =True,
        with_transport: bool = True,
        store_in_session: bool = True,
    ) -> Dict[str, dict]:
        """Parse contents elements and fill contents_dict accordingly

        after the parsing, contents_dict will containt handlers, "desc_elt" and
        "transport_elt"
        @param jingle_elt: parent <jingle> element, containing one or more <content>
        @param session: session data
        @param request: the whole request
        @param client: %(doc_client)s
        @param new: True if the content is new and must be created,
            else the content must exists, and session data will be filled
        @param creator: only used if new is True: creating pear (see § 7.3)
        @param with_application: if True, raise an error if there is no <description>
            element else ignore it
        @param with_transport: if True, raise an error if there is no <transport> element
            else ignore it
        @param store_in_session: if True, the ``session`` contents will be updated with
        the parsed elements.
            Use False when you parse an action which can happen at any time (e.g.
            transport-info) and meaning that a parsed element may already be present in
            the session (e.g. if an authorisation request is waiting for user answer),
            This can't be used when ``new`` is set.
        @return: contents_dict (from session, or a new one if "store_in_session" is False)
        @raise exceptions.CancelError: the error is treated and the calling method can
            cancel the treatment (i.e. return)
        """
        if store_in_session:
            contents_dict = session["contents"]
        else:
            if new:
                raise exceptions.InternalError(
                    '"store_in_session" must not be used when "new" is set'
                )
            contents_dict = {n: {} for n in session["contents"]}
        content_elts = jingle_elt.elements(NS_JINGLE, "content")

        for content_elt in content_elts:
            name = content_elt["name"]

            if new:
                # the content must not exist, we check it
                if not name or name in contents_dict:
                    self.sendError(client, "bad-request", session["id"], request)
                    raise exceptions.CancelError
                content_data = contents_dict[name] = {
                    "creator": creator,
                    "senders": content_elt.attributes.get("senders", "both"),
                }
            else:
                # the content must exist, we check it
                try:
                    content_data = contents_dict[name]
                except KeyError:
                    log.warning("Other peer try to access an unknown content")
                    self.sendError(client, "bad-request", session["id"], request)
                    raise exceptions.CancelError

            # application
            if with_application:
                desc_elt = content_elt.description
                if not desc_elt:
                    self.sendError(client, "bad-request", session["id"], request)
                    raise exceptions.CancelError

                if new:
                    # the content is new, we need to check and link the application
                    app_ns = desc_elt.uri
                    if not app_ns or app_ns == NS_JINGLE:
                        self.sendError(client, "bad-request", session["id"], request)
                        raise exceptions.CancelError

                    try:
                        application = self._applications[app_ns]
                    except KeyError:
                        log.warning(
                            "Unmanaged application namespace [{}]".format(app_ns)
                        )
                        self.sendError(
                            client, "service-unavailable", session["id"], request
                        )
                        raise exceptions.CancelError

                    content_data["application"] = application
                    content_data["application_data"] = {}
                else:
                    # the content exists, we check that we have not a former desc_elt
                    if "desc_elt" in content_data:
                        raise exceptions.InternalError(
                            "desc_elt should not exist at this point"
                        )

                content_data["desc_elt"] = desc_elt

            # transport
            if with_transport:
                transport_elt = content_elt.transport
                if not transport_elt:
                    self.sendError(client, "bad-request", session["id"], request)
                    raise exceptions.CancelError

                if new:
                    # the content is new, we need to check and link the transport
                    transport_ns = transport_elt.uri
                    if not app_ns or app_ns == NS_JINGLE:
                        self.sendError(client, "bad-request", session["id"], request)
                        raise exceptions.CancelError

                    try:
                        transport = self._transports[transport_ns]
                    except KeyError:
                        raise exceptions.InternalError(
                            "No transport registered for namespace {}".format(
                                transport_ns
                            )
                        )
                    content_data["transport"] = transport
                    content_data["transport_data"] = {}
                else:
                    # the content exists, we check that we have not a former transport_elt
                    if "transport_elt" in content_data:
                        raise exceptions.InternalError(
                            "transport_elt should not exist at this point"
                        )

                content_data["transport_elt"] = transport_elt

        return contents_dict

    def _ignore(self, client, action, session, content_name, elt):
        """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins

        must be used as app_default_cb and/or transp_default_cb
        """
        return elt

    def _call_plugins(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        app_method_name: Optional[str] = "jingle_handler",
        transp_method_name: Optional[str] = "jingle_handler",
        app_default_cb: Optional[Callable] = None,
        transp_default_cb: Optional[Callable] = None,
        delete: bool = True,
        elements: bool = True,
        force_element: Optional[domish.Element] = None
    ) -> List[defer.Deferred]:
        """Call application and transport plugin methods for all contents

        @param action: jingle action name
        @param session: jingle session data
        @param app_method_name: name of the method to call for applications
            None to ignore
        @param transp_method_name: name of the method to call for transports
            None to ignore
        @param app_default_cb: default callback to use if plugin has not app_method_name
            None to raise an exception instead
        @param transp_default_cb: default callback to use if plugin has not transp_method_name
            None to raise an exception instead
        @param delete: if True, remove desc_elt and transport_elt from session
            ignored if elements is False
        @param elements: True if elements(desc_elt and tranport_elt) must be managed
            must be True if _call_plugins is used in a request, and False if it is used
            after a request (i.e. on <iq> result or error)
        @param force_element: if elements is False, it is used as element parameter
            else it is ignored
        @return : list of launched Deferred
        @raise exceptions.NotFound: method is not implemented
        """
        contents_dict = session["contents"]
        defers_list = []
        for content_name, content_data in contents_dict.items():
            for method_name, handler_key, default_cb, elt_name in (
                (app_method_name, "application", app_default_cb, "desc_elt"),
                (transp_method_name, "transport", transp_default_cb, "transport_elt"),
            ):
                if method_name is None:
                    continue

                handler = content_data[handler_key].handler
                try:
                    method = getattr(handler, method_name)
                except AttributeError:
                    if default_cb is None:
                        raise exceptions.NotFound(
                            "{} not implemented !".format(method_name)
                        )
                    else:
                        method = default_cb
                if elements:
                    elt = content_data.pop(elt_name) if delete else content_data[elt_name]
                else:
                    elt = force_element
                d = utils.as_deferred(
                    method, client, action, session, content_name, elt
                )
                defers_list.append(d)

        return defers_list

    async def on_session_initiate(
        self,
        client: SatXMPPEntity,
        request: domish.Element,
        jingle_elt: domish.Element,
        session: Dict[str, Any]
    ) -> None:
        """Called on session-initiate action

        The "jingle_request_confirmation" method of each application will be called
        (or self.jingle_request_confirmation_default if the former doesn't exist).
        The session is only accepted if all application are confirmed.
        The application must manage itself multiple contents scenari (e.g. audio/video).
        @param client: %(doc_client)s
        @param request(domish.Element): full request
        @param jingle_elt(domish.Element): <jingle> element
        @param session(dict): session data
        """
        contents_dict = session["contents"]
        if contents_dict:
            raise exceptions.InternalError(
                "Contents dict should not already be set at this point"
            )

        try:
            self._parse_elements(
                jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR
            )
        except exceptions.CancelError:
            return

        if not contents_dict:
            # there MUST be at least one content
            self.sendError(client, "bad-request", session["id"], request)
            return

        # at this point we can send the <iq/> result to confirm reception of the request
        client.send(xmlstream.toResponse(request, "result"))


        assert "jingle_elt" not in session
        session["jingle_elt"] = jingle_elt
        if not await self.host.trigger.async_point(
            "XEP-0166_on_session_initiate",
            client, session, request, jingle_elt
        ):
            return

        await defer.DeferredList(self._call_plugins(
            client,
            XEP_0166.A_PREPARE_CONFIRMATION,
            session,
            delete=False
        ))

        # we now request each application plugin confirmation
        # and if all are accepted, we can accept the session
        confirm_defers = self._call_plugins(
            client,
            XEP_0166.A_SESSION_INITIATE,
            session,
            "jingle_request_confirmation",
            None,
            self.jingle_request_confirmation_default,
            delete=False,
        )

        confirm_dlist = defer.gatherResults(confirm_defers)
        confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client)
        confirm_dlist.addErrback(self._jingle_error_cb, session, request, client)

    def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
        """Method called when confirmation from user has been received

        This method is only called for the responder
        @param confirm_results(list[bool]): all True if session is accepted
        @param session(dict): session data
        @param jingle_elt(domish.Element): jingle data of this session
        @param client: %(doc_client)s
        """
        del session["jingle_elt"]
        confirmed = all(confirm_results)
        if not confirmed:
            return self.terminate(client, XEP_0166.REASON_DECLINE, session)

        iq_elt, jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_SESSION_ACCEPT
        )
        jingle_elt["responder"] = session['local_jid'].full()
        session["jingle_elt"] = jingle_elt

        # contents

        def addElement(domish_elt, content_elt):
            content_elt.addChild(domish_elt)

        defers_list = []

        for content_name, content_data in session["contents"].items():
            content_elt = jingle_elt.addElement("content")
            content_elt["creator"] = XEP_0166.ROLE_INITIATOR
            content_elt["name"] = content_name

            application = content_data["application"]
            app_session_accept_cb = application.handler.jingle_handler

            app_d = utils.as_deferred(
                app_session_accept_cb,
                client,
                XEP_0166.A_SESSION_INITIATE,
                session,
                content_name,
                content_data.pop("desc_elt"),
            )
            app_d.addCallback(addElement, content_elt)
            defers_list.append(app_d)

            transport = content_data["transport"]
            transport_session_accept_cb = transport.handler.jingle_handler

            transport_d = utils.as_deferred(
                transport_session_accept_cb,
                client,
                XEP_0166.A_SESSION_INITIATE,
                session,
                content_name,
                content_data.pop("transport_elt"),
            )
            transport_d.addCallback(addElement, content_elt)
            defers_list.append(transport_d)

        d_list = defer.DeferredList(defers_list)
        d_list.addCallback(
            lambda __: self._call_plugins(
                client,
                XEP_0166.A_PREPARE_RESPONDER,
                session,
                app_method_name=None,
                elements=False,
            )
        )
        d_list.addCallback(lambda __: session.pop("jingle_elt"))
        d_list.addCallback(lambda __: iq_elt.send())

        def change_state(__, session):
            session["state"] = XEP_0166.STATE_ACTIVE

        d_list.addCallback(change_state, session)
        d_list.addCallback(
            lambda __: self._call_plugins(
                client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
            )
        )
        d_list.addErrback(self._iq_error, session["id"], client)
        return d_list

    def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element:
        """Find a <reason> element in parent_elt

        if none is found, add an empty one to the element
        @return: the <reason> element
        """
        try:
            return next(parent_elt.elements(NS_JINGLE, "reason"))
        except StopIteration:
            log.warning("No reason given for session termination")
            reason_elt = parent_elt.addElement("reason")
            return reason_elt

    def parse_reason_elt(self, reason_elt: domish.Element) -> tuple[str|None, str|None]:
        """Parse a <reason> element

        @return: reason found, and text if any
        """
        reason, text = None, None
        for elt in reason_elt.elements():
            if elt.uri == NS_JINGLE:
                if elt.name == "text":
                    text = str(elt)
                else:
                    reason = elt.name

        if reason is None:
            log.debug("no reason specified,")

        return reason, text

    async def on_session_terminate(
        self,
        client: SatXMPPEntity,
        request: domish.Element,
        jingle_elt: domish.Element,
        session: dict
    ) -> None:
        # TODO: check reason, display a message to user if needed
        log.debug(f"Jingle Session {session['id']} terminated")
        reason_elt = self.get_reason_elt(jingle_elt)

        terminate_defers = self._call_plugins(
            client,
            XEP_0166.A_SESSION_TERMINATE,
            session,
            "jingle_terminate",
            "jingle_terminate",
            self._ignore,
            self._ignore,
            elements=False,
            force_element=reason_elt,
        )
        terminate_dlist = defer.DeferredList(terminate_defers)

        terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"]))
        client.send(xmlstream.toResponse(request, "result"))

    async def on_session_accept(self, client, request, jingle_elt, session):
        """Method called once session is accepted

        This method is only called for initiator
        @param client: %(doc_client)s
        @param request(domish.Element): full <iq> request
        @param jingle_elt(domish.Element): the <jingle> element
        @param session(dict): session data
        """
        log.debug(f"Jingle session {session['id']} has been accepted")

        try:
            self._parse_elements(jingle_elt, session, request, client)
        except exceptions.CancelError:
            return

        # at this point we can send the <iq/> result to confirm reception of the request
        client.send(xmlstream.toResponse(request, "result"))
        # and change the state
        session["state"] = XEP_0166.STATE_ACTIVE
        session["jingle_elt"] = jingle_elt

        await defer.DeferredList(self._call_plugins(
            client,
            XEP_0166.A_PREPARE_INITIATOR,
            session,
            delete=False
        ))

        negociate_defers = []
        negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)

        negociate_dlist = defer.gatherResults(negociate_defers)

        # after negociations we start the transfer
        negociate_dlist.addCallback(
            lambda __: self._call_plugins(
                client, XEP_0166.A_START, session, app_method_name=None, elements=False
            )
        )
        negociate_dlist.addCallback(lambda __: session.pop("jingle_elt"))

    def _on_session_cb(self, result, client, request, jingle_elt, session):
        client.send(xmlstream.toResponse(request, "result"))

    def _on_session_eb(self, failure_, client, request, jingle_elt, session):
        log.error("Error while handling on_session_info: {}".format(failure_.value))
        # XXX: only error managed so far, maybe some applications/transports need more
        self.sendError(
            client, "feature-not-implemented", None, request, "unsupported-info"
        )

    def on_session_info(self, client, request, jingle_elt, session):
        """Method called when a session-info action is received from other peer

        This method is only called for initiator
        @param client: %(doc_client)s
        @param request(domish.Element): full <iq> request
        @param jingle_elt(domish.Element): the <jingle> element
        @param session(dict): session data
        """
        if not jingle_elt.children:
            # this is a session ping, see XEP-0166 §6.8
            client.send(xmlstream.toResponse(request, "result"))
            return

        try:
            # XXX: session-info is most likely only used for application, so we don't call transport plugins
            #      if a future transport use it, this behaviour must be adapted
            defers = self._call_plugins(
                client,
                XEP_0166.A_SESSION_INFO,
                session,
                "jingle_session_info",
                None,
                elements=False,
                force_element=jingle_elt,
            )
        except exceptions.NotFound as e:
            self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session)
            return

        dlist = defer.DeferredList(defers, fireOnOneErrback=True)
        dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session)
        dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session)

    async def on_transport_replace(self, client, request, jingle_elt, session):
        """A transport change is requested

        The request is parsed, and jingle_handler is called on concerned transport plugin(s)
        @param client: %(doc_client)s
        @param request(domish.Element): full <iq> request
        @param jingle_elt(domish.Element): the <jingle> element
        @param session(dict): session data
        """
        log.debug("Other peer wants to replace the transport")
        try:
            self._parse_elements(
                jingle_elt, session, request, client, with_application=False
            )
        except exceptions.CancelError:
            defer.returnValue(None)

        client.send(xmlstream.toResponse(request, "result"))

        content_name = None
        to_replace = []

        for content_name, content_data in session["contents"].items():
            try:
                transport_elt = content_data.pop("transport_elt")
            except KeyError:
                continue
            transport_ns = transport_elt.uri
            try:
                transport = self._transports[transport_ns]
            except KeyError:
                log.warning(
                    "Other peer want to replace current transport with an unknown one: {}".format(
                        transport_ns
                    )
                )
                content_name = None
                break
            to_replace.append((content_name, content_data, transport, transport_elt))

        if content_name is None:
            # wa can't accept the replacement
            iq_elt, reject_jingle_elt = self._build_jingle_elt(
                client, session, XEP_0166.A_TRANSPORT_REJECT
            )
            for child in jingle_elt.children:
                reject_jingle_elt.addChild(child)

            iq_elt.send()
            defer.returnValue(None)

        # at this point, everything is alright and we can replace the transport(s)
        # this is similar to an session-accept action, but for transports only
        iq_elt, accept_jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_TRANSPORT_ACCEPT
        )
        for content_name, content_data, transport, transport_elt in to_replace:
            # we can now actually replace the transport
            await utils.as_deferred(
                content_data["transport"].handler.jingle_handler,
                client, XEP_0166.A_DESTROY, session, content_name, None
            )
            content_data["transport"] = transport
            content_data["transport_data"].clear()
            # and build the element
            content_elt = accept_jingle_elt.addElement("content")
            content_elt["name"] = content_name
            content_elt["creator"] = content_data["creator"]
            # we notify the transport and insert its <transport/> in the answer
            accept_transport_elt = await utils.as_deferred(
                transport.handler.jingle_handler,
                client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt
            )
            content_elt.addChild(accept_transport_elt)
            # there is no confirmation needed here, so we can directly prepare it
            await utils.as_deferred(
                transport.handler.jingle_handler,
                client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
            )

        iq_elt.send()

    def on_transport_accept(self, client, request, jingle_elt, session):
        """Method called once transport replacement is accepted

        @param client: %(doc_client)s
        @param request(domish.Element): full <iq> request
        @param jingle_elt(domish.Element): the <jingle> element
        @param session(dict): session data
        """
        log.debug("new transport has been accepted")

        try:
            self._parse_elements(
                jingle_elt, session, request, client, with_application=False
            )
        except exceptions.CancelError:
            return

        # at this point we can send the <iq/> result to confirm reception of the request
        client.send(xmlstream.toResponse(request, "result"))

        negociate_defers = []
        negociate_defers = self._call_plugins(
            client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
        )

        negociate_dlist = defer.DeferredList(negociate_defers)

        # after negociations we start the transfer
        negociate_dlist.addCallback(
            lambda __: self._call_plugins(
                client, XEP_0166.A_START, session, app_method_name=None, elements=False
            )
        )

    def on_transport_reject(self, client, request, jingle_elt, session):
        """Method called when a transport replacement is refused

        @param client: %(doc_client)s
        @param request(domish.Element): full <iq> request
        @param jingle_elt(domish.Element): the <jingle> element
        @param session(dict): session data
        """
        # XXX: for now, we terminate the session in case of transport-reject
        #      this behaviour may change in the future
        self.terminate(client, "failed-transport", session)

    def on_transport_info(
        self,
        client: SatXMPPEntity,
        request: domish.Element,
        jingle_elt: domish.Element,
        session: dict
    ) -> None:
        """Method called when a transport-info action is received from other peer

        The request is parsed, and jingle_handler is called on concerned transport
        plugin(s)
        @param client: %(doc_client)s
        @param request: full <iq> request
        @param jingle_elt: the <jingle> element
        @param session: session data
        """
        log.debug(f"Jingle session {session['id']} has been accepted")

        try:
            parsed_contents = self._parse_elements(
                jingle_elt, session, request, client, with_application=False,
                store_in_session=False
            )
        except exceptions.CancelError:
            return

        # The parsing was OK, we send the <iq> result
        client.send(xmlstream.toResponse(request, "result"))

        for content_name, content_data in session["contents"].items():
            try:
                transport_elt = parsed_contents[content_name]["transport_elt"]
            except KeyError:
                continue
            else:
                utils.as_deferred(
                    content_data["transport"].handler.jingle_handler,
                    client,
                    XEP_0166.A_TRANSPORT_INFO,
                    session,
                    content_name,
                    transport_elt,
                )


@implementer(iwokkel.IDisco)
class XEP_0166_handler(xmlstream.XMPPHandler):

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        self.xmlstream.addObserver(
            JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []