view libervia/backend/plugins/plugin_xep_0353.py @ 4161:2074b2bbe616

core (memory/sqla_mapping): `delete-orphan` in History: add `cascade=delete-orphan` to History's `messages`, `subjects` and `thread`, to make modification of those attributes easier.
author Goffi <goffi@goffi.org>
date Tue, 28 Nov 2023 17:35:20 +0100
parents 0da563780ffc
children 6784d07b99c8
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Jingle Message Initiation (XEP-0353)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger

log = getLogger(__name__)


NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Message Initiation",
    C.PI_IMPORT_NAME: "XEP-0353",
    C.PI_TYPE: "XEP",
    C.PI_MODES: [C.PLUG_MODE_CLIENT],
    C.PI_PROTOCOLS: ["XEP-0353"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"],
    C.PI_MAIN: "XEP_0353",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""),
}


class RejectException(exceptions.CancelError):

    def __init__(self, reason: str, text: str|None = None):
        super().__init__(text)
        self.reason = reason


class RetractException(exceptions.CancelError):
    pass


class XEP_0353:
    def __init__(self, host):
        log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
        self.host = host
        host.register_namespace("jingle-message", NS_JINGLE_MESSAGE)
        self._j = host.plugins["XEP-0166"]
        self._h = host.plugins["XEP-0334"]
        host.trigger.add(
            "XEP-0166_initiate_elt_built",
            self._on_initiate_trigger,
            # this plugin set the resource, we want it to happen first to other trigger
            # can get the full peer JID
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add(
            "XEP-0166_terminate",
            self._terminate_trigger,
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add("message_received", self._on_message_received)

    def get_handler(self, client):
        return Handler()

    def profile_connecting(self, client):
        # mapping from session id to deferred used to wait for destinee answer
        client._xep_0353_pending_sessions = {}

    def build_message_data(self, client, peer_jid, verb, session_id):
        mess_data = {
            "from": client.jid,
            "to": peer_jid,
            "uid": "",
            "message": {},
            "type": C.MESS_TYPE_CHAT,
            "subject": {},
            "extra": {},
        }
        client.generate_message_xml(mess_data)
        message_elt = mess_data["xml"]
        verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb))
        verb_elt["id"] = session_id
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        return mess_data

    async def _on_initiate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        iq_elt: domish.Element,
        jingle_elt: domish.Element,
    ) -> bool:
        peer_jid = session["peer_jid"]
        if peer_jid.resource:
            return True

        try:
            infos = await self.host.memory.disco.get_infos(client, peer_jid)
        except error.StanzaError as e:
            if e.condition == "service-unavailable":
                categories = {}
            else:
                raise e
        else:
            categories = {c for c, __ in infos.identities}
        if "component" in categories:
            # we don't use message initiation with components
            return True

        if peer_jid.userhostJID() not in client.roster:
            # if the contact is not in our roster, we need to send a directed presence
            # according to XEP-0353 §3.1
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
        message_elt = mess_data["xml"]
        for content_data in session["contents"].values():
            # we get the full element build by the application plugin
            jingle_description_elt = content_data["application_data"]["desc_elt"]
            # and copy it to only keep the root <description> element, no children
            description_elt = domish.Element(
                (jingle_description_elt.uri, jingle_description_elt.name),
                defaultUri=jingle_description_elt.defaultUri,
                attribs=jingle_description_elt.attributes,
                localPrefixes=jingle_description_elt.localPrefixes,
            )
            message_elt.propose.addChild(description_elt)
        response_d = defer.Deferred()
        # we wait for 2 min before cancelling the session init
        # FIXME: let's application decide timeout?
        response_d.addTimeout(2 * 60, reactor)
        client._xep_0353_pending_sessions[session["id"]] = response_d
        try:
            await client.send_message_data(mess_data)
            accepting_jid = await response_d
        except defer.TimeoutError:
            log.warning(
                _("Message initiation with {peer_jid} timed out").format(
                    peer_jid=peer_jid
                )
            )
        except exceptions.CancelError as e:
            for content in session["contents"].values():
                await content["application"].handler.jingle_preflight_cancel(
                    client, session, e
                )

            self._j.delete_session(client, session["id"])
            return False
        else:
            if iq_elt["to"] != accepting_jid.userhost():
                raise exceptions.InternalError(
                    f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ "
                    f"from bare JID of the accepting entity ({accepting_jid!r}), this "
                    "may be a sign of an internal bug, a hack attempt, or a MITM attack!"
                )
            iq_elt["to"] = accepting_jid.full()
            session["peer_jid"] = accepting_jid
        finally:
            del client._xep_0353_pending_sessions[session["id"]]
        return True

    def _terminate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        reason_elt: domish.Element
    ) -> bool:
        session_id = session["id"]
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            return True
        # we have a XEP-0353 session, that means that we are retracting a proposed session
        mess_data = self.build_message_data(
            client, session["peer_jid"], "retract", session_id
        )
        defer.ensureDeferred(client.send_message_data(mess_data))
        response_d.errback(RetractException())

        return False

    async def _on_message_received(self, client, message_elt, post_treat):
        for elt in message_elt.elements():
            if elt.uri == NS_JINGLE_MESSAGE:
                if elt.name == "propose":
                    return await self._handle_propose(client, message_elt, elt)
                elif elt.name == "retract":
                    return self._handle_retract(client, message_elt, elt)
                elif elt.name == "proceed":
                    return self._handle_proceed(client, message_elt, elt)
                elif elt.name == "accept":
                    return self._handle_accept(client, message_elt, elt)
                elif elt.name == "reject":
                    return self._handle_reject(client, message_elt, elt)
                elif elt.name == "ringing":
                    return await self._handle_ringing(client, message_elt, elt)
                else:
                    log.warning(f"invalid element: {elt.toXml}")
                    return True
        return True

    def _get_sid_and_response_d(
        self,
        client: SatXMPPEntity,
        elt: domish.Element
    ) -> tuple[str, defer.Deferred]:
        """Retrieve session ID and response_d from response element"""
        try:
            session_id = elt["id"]
        except KeyError as e:
            assert elt.parent is not None
            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
            raise e
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError as e:
            log.warning(
                _(
                    "no pending session found with id {session_id}, did it timed out?"
                ).format(session_id=session_id)
            )
            raise e
        return session_id, response_d

    async def _handle_propose(self, client, message_elt, elt):
        peer_jid = jid.JID(message_elt["from"])
        local_jid = jid.JID(message_elt["to"])
        session_id = elt["id"]
        try:
            desc_and_apps = [
                (description_elt, self._j.get_application(description_elt.uri))
                for description_elt in elt.elements()
                if description_elt.name == "description"
            ]
            if not desc_and_apps:
                raise AttributeError
        except AttributeError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(
                f"There is not registered application to handle this "
                f"proposal: {elt.toXml()}"
            )
            return False

        if not desc_and_apps:
            log.warning("No application specified: {message_elt.toXml()}")
            return False

        session = self._j.create_session(
            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
        )

        is_in_roster = peer_jid.userhostJID() in client.roster
        if is_in_roster:
            # we indicate that device is ringing as explained in
            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
            # is in roster to avoid presence leak of all our devices.
            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
            await client.send_message_data(mess_data)

        for description_elt, application in desc_and_apps:
            try:
                await application.handler.jingle_preflight(
                    client, session, description_elt
                )
            except exceptions.CancelError as e:
                log.info(f"{client.profile} refused the session: {e}")

                if is_in_roster:
                    # peer is in our roster, we send reject to them, ou other devices will
                    # get carbon copies
                    reject_dest_jid = peer_jid
                else:
                    # peer is not in our roster, we send the "reject" only to our own
                    # devices to make them stop ringing/doing notification, and we don't
                    # send anything to peer to avoid presence leak.
                    reject_dest_jid = client.jid.userhostJID()

                mess_data = self.build_message_data(
                    client, reject_dest_jid, "reject", session_id
                )
                await client.send_message_data(mess_data)
                self._j.delete_session(client, session_id)

                return False
            except defer.CancelledError:
                # raised when call is retracted before user can reply
                self._j.delete_session(client, session_id)
                return False

        if peer_jid.userhostJID() not in client.roster:
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
        await client.send_message_data(mess_data)

        return False

    def _handle_retract(self, client, message_elt, retract_elt):
        try:
            session = self._j.get_session(client, retract_elt["id"])
        except KeyError:
            log.warning(f"invalid retract element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(f"no session found with ID {retract_elt['id']}")
            return False
        log.debug(
            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
        )
        try:
            cancellable_deferred = session["cancellable_deferred"]
            if not cancellable_deferred:
                raise KeyError
        except KeyError:
            self._j.delete_session(client, session["id"])
        else:
            for d in cancellable_deferred:
                d.cancel()
        return False

    def _handle_proceed(self, client, message_elt, proceed_elt):
        try:
            __, response_d = self._get_sid_and_response_d(client, proceed_elt)
        except KeyError:
            return True

        response_d.callback(jid.JID(message_elt["from"]))
        return False

    def _handle_accept(self, client, message_elt, accept_elt):
        pass

    def _handle_reject(self, client, message_elt, reject_elt):
        try:
            __, response_d = self._get_sid_and_response_d(client, reject_elt)
        except KeyError:
            return True
        reason_elt = self._j.get_reason_elt(reject_elt)
        reason, text = self._j.parse_reason_elt(reason_elt)
        if reason is None:
            reason = "busy"

        response_d.errback(RejectException(reason, text))
        return False

    async def _handle_ringing(self, client, message_elt, ringing_elt):
        session_id = ringing_elt["id"]
        try:
            session = self._j.get_session(client, session_id)
        except exceptions.NotFound:
            log.warning(f"Session {session_id!r} unknown, ignoring ringing.")
            return False
        for __, content_data in session["contents"].items():
            await content_data["application"].handler.jingle_preflight_info(
                client, session, "ringing", None
            )

        return False


@implementer(iwokkel.IDisco)
class Handler(xmlstream.XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_MESSAGE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []