view libervia/backend/plugins/plugin_xep_0353.py @ 4336:6e0918e638ee

plugin XEP-0498: "Pubsub File Sharing" implementation: Partial implementation of XEP-0498, necessary to implement the service part in email gateway. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Jingle Message Initiation (XEP-0353)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import cast
from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from libervia.backend.plugins.plugin_xep_0166.models import ApplicationData
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.xml_tools import element_copy

try:
    from .plugin_xep_0167 import NS_JINGLE_RTP
except ImportError:
    NS_JINGLE_RTP = None

log = getLogger(__name__)


NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Message Initiation",
    C.PI_IMPORT_NAME: "XEP-0353",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0353"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"],
    C.PI_MAIN: "XEP_0353",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""),
}


class RejectException(exceptions.CancelError):

    def __init__(self, reason: str, text: str | None = None):
        super().__init__(text)
        self.reason = reason


class TakenByOtherDeviceException(exceptions.CancelError):
    reason: str = "taken_by_other_device"

    def __init__(self, device_jid: jid.JID):
        super().__init__(device_jid.full())
        self.device_jid = device_jid


class RetractException(exceptions.CancelError):
    pass


class XEP_0353:
    def __init__(self, host):
        log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
        self.host = host
        host.register_namespace("jingle-message", NS_JINGLE_MESSAGE)
        self._j = host.plugins["XEP-0166"]
        self._h = host.plugins["XEP-0334"]
        host.trigger.add_with_check(
            "XEP-0166_initiate_elt_built",
            self,
            self._on_initiate_trigger,
            # this plugin set the resource, we want it to happen first so other triggers
            # can get the full peer JID
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add_with_check(
            "XEP-0166_terminate",
            self,
            self._terminate_trigger,
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add("message_received", self._on_message_received)

    def get_handler(self, client):
        return Handler()

    def profile_connecting(self, client):
        # mapping from session id to deferred used to wait for destinee answer
        client._xep_0353_pending_sessions = {}

    def build_message_data(self, client, peer_jid, verb, session_id):
        mess_data = {
            "from": client.jid,
            "to": peer_jid,
            "uid": "",
            "message": {},
            "type": C.MESS_TYPE_CHAT,
            "subject": {},
            "extra": {},
        }
        client.generate_message_xml(mess_data)
        message_elt = mess_data["xml"]
        verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb))
        verb_elt["id"] = session_id
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        return mess_data

    async def _on_initiate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        iq_elt: domish.Element,
        jingle_elt: domish.Element,
    ) -> bool:
        peer_jid = session["peer_jid"]
        if peer_jid.resource:
            return True

        try:
            infos = await self.host.memory.disco.get_infos(client, peer_jid)
        except error.StanzaError as e:
            if e.condition == "service-unavailable":
                categories = {}
            else:
                raise e
        else:
            categories = {c for c, __ in infos.identities}
        if "component" in categories:
            # we don't use message initiation with components
            return True

        if peer_jid.userhostJID() not in client.roster:
            # if the contact is not in our roster, we need to send a directed presence
            # according to XEP-0353 §3.1
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
        message_elt = mess_data["xml"]
        for content_data in session["contents"].values():
            # we get the full element build by the application plugin
            jingle_description_elt = content_data["application_data"]["desc_elt"]

            # we need to copy the element
            if jingle_description_elt.uri == NS_JINGLE_RTP:
                # for RTP, we only keep the root <description> element, no children
                description_elt = domish.Element(
                    (jingle_description_elt.uri, jingle_description_elt.name),
                    defaultUri=jingle_description_elt.defaultUri,
                    attribs=jingle_description_elt.attributes,
                    localPrefixes=jingle_description_elt.localPrefixes,
                )
            else:
                # Otherwise we keep the children to have application useful data
                description_elt = element_copy(jingle_description_elt, with_parent=False)

            message_elt.propose.addChild(description_elt)
        response_d = defer.Deferred()
        # we wait for 2 min before cancelling the session init
        # FIXME: let's application decide timeout?
        response_d.addTimeout(2 * 60, reactor)
        client._xep_0353_pending_sessions[session["id"]] = response_d
        try:
            await client.send_message_data(mess_data)
            accepting_jid = await response_d
        except defer.TimeoutError:
            log.warning(
                _("Message initiation with {peer_jid} timed out").format(
                    peer_jid=peer_jid
                )
            )
        except exceptions.CancelError as e:
            for content in session["contents"].values():
                await content["application"].handler.jingle_preflight_cancel(
                    client, session, e
                )

            self._j.delete_session(client, session["id"])
            return False
        else:
            if iq_elt["to"] != accepting_jid.userhost():
                raise exceptions.InternalError(
                    f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ "
                    f"from bare JID of the accepting entity ({accepting_jid!r}), this "
                    "may be a sign of an internal bug, a hack attempt, or a MITM attack!"
                )
            iq_elt["to"] = accepting_jid.full()
            session["peer_jid"] = accepting_jid
        finally:
            del client._xep_0353_pending_sessions[session["id"]]
        return True

    def _terminate_trigger(
        self, client: SatXMPPEntity, session: dict, reason_elt: domish.Element
    ) -> bool:
        session_id = session["id"]
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            return True
        # we have a XEP-0353 session, that means that we are retracting a proposed session
        mess_data = self.build_message_data(
            client, session["peer_jid"], "retract", session_id
        )
        defer.ensureDeferred(client.send_message_data(mess_data))
        response_d.errback(RetractException())

        return False

    async def _on_message_received(self, client, message_elt, post_treat):
        for elt in message_elt.elements():
            if elt.uri == NS_JINGLE_MESSAGE:
                # We use ensureDeferred to process the message initiation workflow in
                # parallel and to avoid blocking the message queue.
                defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt))
                return False
        return True

    async def _handle_mess_init(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        mess_init_elt: domish.Element,
    ) -> None:
        if mess_init_elt.name == "propose":
            await self._handle_propose(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "retract":
            self._handle_retract(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "proceed":
            self._handle_proceed(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "accept":
            self._handle_accept(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "reject":
            self._handle_reject(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "ringing":
            await self._handle_ringing(client, message_elt, mess_init_elt)
        else:
            log.warning(f"invalid element: {mess_init_elt.toXml}")

    def _get_sid_and_session_d(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, defer.Deferred | list[defer.Deferred]]:
        """Retrieve session ID and deferred or list of deferred from response element"""
        try:
            session_id = elt["id"]
        except KeyError as e:
            assert elt.parent is not None
            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
            raise e
        try:
            session_d = client._xep_0353_pending_sessions[session_id]
        except KeyError as e:
            log.warning(
                _(
                    "no pending session found with id {session_id}, did it timed out?"
                ).format(session_id=session_id)
            )
            raise e
        return session_id, session_d

    def _get_sid_and_response_d(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, defer.Deferred]:
        """Retrieve session ID and response_d from response element"""
        session_id, response_d = self._get_sid_and_session_d(client, elt)
        assert isinstance(response_d, defer.Deferred)
        return session_id, response_d

    def _get_sid_and_preflight_d_list(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, list[defer.Deferred]]:
        """Retrieve session ID and list of preflight_d from response element"""
        session_id, preflight_d_list = self._get_sid_and_session_d(client, elt)
        assert isinstance(preflight_d_list, list)
        return session_id, preflight_d_list

    async def _handle_propose(
        self, client: SatXMPPEntity, message_elt: domish.Element, elt: domish.Element
    ) -> None:
        peer_jid = jid.JID(message_elt["from"])
        local_jid = jid.JID(message_elt["to"])
        session_id = elt["id"]
        try:
            desc_and_apps = [
                (description_elt, self._j.get_application(description_elt.uri))
                for description_elt in elt.elements()
                if description_elt.name == "description"
            ]
            if not desc_and_apps:
                raise AttributeError
        except AttributeError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return
        except exceptions.NotFound:
            log.warning(
                f"There is not registered application to handle this "
                f"proposal: {elt.toXml()}"
            )
            return

        if not desc_and_apps:
            log.warning("No application specified: {message_elt.toXml()}")
            return

        cast(list[tuple[domish.Element, ApplicationData]], desc_and_apps)
        desc_and_apps.sort(
            key=lambda desc_and_app: desc_and_app[1].priority, reverse=True
        )

        session = self._j.create_session(
            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
        )

        is_in_roster = peer_jid.userhostJID() in client.roster
        if is_in_roster:
            # we indicate that device is ringing as explained in
            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
            # is in roster to avoid presence leak of all our devices.
            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
            await client.send_message_data(mess_data)

        try:
            for description_elt, application in desc_and_apps:
                try:
                    preflight_d = defer.ensureDeferred(
                        application.handler.jingle_preflight(
                            client, session, description_elt
                        )
                    )
                    client._xep_0353_pending_sessions.setdefault(session_id, []).append(
                        preflight_d
                    )
                    await preflight_d
                except TakenByOtherDeviceException as e:
                    log.info(f"The call has been takend by {e.device_jid}")
                    await application.handler.jingle_preflight_cancel(client, session, e)
                    self._j.delete_session(client, session_id)
                    return
                except exceptions.CancelError as e:
                    log.info(f"{client.profile} refused the session: {e}")

                    if is_in_roster:
                        # peer is in our roster, we send reject to them, ou other devices
                        # will get carbon copies
                        reject_dest_jid = peer_jid
                    else:
                        # peer is not in our roster, we send the "reject" only to our own
                        # devices to make them stop ringing/doing notification, and we
                        # don't send anything to peer to avoid presence leak.
                        reject_dest_jid = client.jid.userhostJID()

                    mess_data = self.build_message_data(
                        client, reject_dest_jid, "reject", session_id
                    )
                    await client.send_message_data(mess_data)
                    self._j.delete_session(client, session_id)
                    return
                except defer.CancelledError:
                    # raised when call is retracted before user can reply
                    self._j.delete_session(client, session_id)
                    return
        finally:
            try:
                del client._xep_0353_pending_sessions[session_id]
            except KeyError:
                pass

        if peer_jid.userhostJID() not in client.roster:
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
        await client.send_message_data(mess_data)

    def _handle_retract(self, client, message_elt, retract_elt):
        try:
            session = self._j.get_session(client, retract_elt["id"])
        except KeyError:
            log.warning(f"invalid retract element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(f"no session found with ID {retract_elt['id']}")
            return False
        log.debug(
            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
        )
        try:
            cancellable_deferred = session["cancellable_deferred"]
            if not cancellable_deferred:
                raise KeyError
        except KeyError:
            self._j.delete_session(client, session["id"])
        else:
            for d in cancellable_deferred:
                d.cancel()
        return False

    def _handle_proceed(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        proceed_elt: domish.Element,
    ) -> None:
        from_jid = jid.JID(message_elt["from"])
        # session_d is the deferred of the session, it can be preflight_d or response_d
        if from_jid.userhostJID() == client.jid.userhostJID():
            # an other device took the session
            try:
                sid, preflight_d_list = self._get_sid_and_preflight_d_list(
                    client, proceed_elt
                )
            except KeyError:
                return
            for preflight_d in preflight_d_list:
                if not preflight_d.called:
                    preflight_d.errback(TakenByOtherDeviceException(from_jid))

                try:
                    session = self._j.get_session(client, sid)
                except exceptions.NotFound:
                    log.warning("No session found with sid {sid!r}.")
                else:
                    # jingle_preflight_cancel?
                    pass

            # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always
            #   cleaned correctly (use a timeout?)

        else:
            try:
                __, response_d = self._get_sid_and_response_d(client, proceed_elt)
            except KeyError:
                return
            # we have a response deferred
            response_d.callback(jid.JID(message_elt["from"]))

    def _handle_accept(self, client, message_elt, accept_elt):
        pass

    def _handle_reject(self, client, message_elt, reject_elt):
        try:
            __, response_d = self._get_sid_and_response_d(client, reject_elt)
        except KeyError:
            return True
        reason_elt = self._j.get_reason_elt(reject_elt)
        reason, text = self._j.parse_reason_elt(reason_elt)
        if reason is None:
            reason = "busy"

        response_d.errback(RejectException(reason, text))
        return False

    async def _handle_ringing(self, client, message_elt, ringing_elt):
        session_id = ringing_elt["id"]
        try:
            session = self._j.get_session(client, session_id)
        except exceptions.NotFound:
            log.warning(f"Session {session_id!r} unknown, ignoring ringing.")
            return False
        for __, content_data in session["contents"].items():
            await content_data["application"].handler.jingle_preflight_info(
                client, session, "ringing", None
            )

        return False


@implementer(iwokkel.IDisco)
class Handler(xmlstream.XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_MESSAGE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []