view libervia/backend/plugins/plugin_xep_0353.py @ 4234:67de9ed101aa

docker (e2e): add GStreamer dependencies to test WebRTC stack: rel 424
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 15:04:01 +0200
parents e11b13418ba6
children 79c8a70e1813
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Jingle Message Initiation (XEP-0353)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.xml_tools import element_copy

try:
    from .plugin_xep_0167 import NS_JINGLE_RTP
except ImportError:
    NS_JINGLE_RTP = None

log = getLogger(__name__)


NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Message Initiation",
    C.PI_IMPORT_NAME: "XEP-0353",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0353"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"],
    C.PI_MAIN: "XEP_0353",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""),
}


class RejectException(exceptions.CancelError):

    def __init__(self, reason: str, text: str|None = None):
        super().__init__(text)
        self.reason = reason


class TakenByOtherDeviceException(exceptions.CancelError):
    reason: str = "taken_by_other_device"

    def __init__(self, device_jid: jid.JID):
        super().__init__(device_jid.full())
        self.device_jid = device_jid


class RetractException(exceptions.CancelError):
    pass


class XEP_0353:
    def __init__(self, host):
        log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
        self.host = host
        host.register_namespace("jingle-message", NS_JINGLE_MESSAGE)
        self._j = host.plugins["XEP-0166"]
        self._h = host.plugins["XEP-0334"]
        host.trigger.add_with_check(
            "XEP-0166_initiate_elt_built",
            self,
            self._on_initiate_trigger,
            # this plugin set the resource, we want it to happen first so other triggers
            # can get the full peer JID
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add_with_check(
            "XEP-0166_terminate",
            self,
            self._terminate_trigger,
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add("message_received", self._on_message_received)

    def get_handler(self, client):
        return Handler()

    def profile_connecting(self, client):
        # mapping from session id to deferred used to wait for destinee answer
        client._xep_0353_pending_sessions = {}

    def build_message_data(self, client, peer_jid, verb, session_id):
        mess_data = {
            "from": client.jid,
            "to": peer_jid,
            "uid": "",
            "message": {},
            "type": C.MESS_TYPE_CHAT,
            "subject": {},
            "extra": {},
        }
        client.generate_message_xml(mess_data)
        message_elt = mess_data["xml"]
        verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb))
        verb_elt["id"] = session_id
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        return mess_data

    async def _on_initiate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        iq_elt: domish.Element,
        jingle_elt: domish.Element,
    ) -> bool:
        peer_jid = session["peer_jid"]
        if peer_jid.resource:
            return True

        try:
            infos = await self.host.memory.disco.get_infos(client, peer_jid)
        except error.StanzaError as e:
            if e.condition == "service-unavailable":
                categories = {}
            else:
                raise e
        else:
            categories = {c for c, __ in infos.identities}
        if "component" in categories:
            # we don't use message initiation with components
            return True

        if peer_jid.userhostJID() not in client.roster:
            # if the contact is not in our roster, we need to send a directed presence
            # according to XEP-0353 §3.1
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
        message_elt = mess_data["xml"]
        for content_data in session["contents"].values():
            # we get the full element build by the application plugin
            jingle_description_elt = content_data["application_data"]["desc_elt"]

            # we need to copy the element
            if jingle_description_elt.uri == NS_JINGLE_RTP:
                # for RTP, we only keep the root <description> element, no children
                description_elt = domish.Element(
                    (jingle_description_elt.uri, jingle_description_elt.name),
                    defaultUri=jingle_description_elt.defaultUri,
                    attribs=jingle_description_elt.attributes,
                    localPrefixes=jingle_description_elt.localPrefixes,
                )
            else:
                # Otherwise we keep the children to have application useful data
                description_elt = element_copy(jingle_description_elt, with_parent=False)

            message_elt.propose.addChild(description_elt)
        response_d = defer.Deferred()
        # we wait for 2 min before cancelling the session init
        # FIXME: let's application decide timeout?
        response_d.addTimeout(2 * 60, reactor)
        client._xep_0353_pending_sessions[session["id"]] = response_d
        try:
            await client.send_message_data(mess_data)
            accepting_jid = await response_d
        except defer.TimeoutError:
            log.warning(
                _("Message initiation with {peer_jid} timed out").format(
                    peer_jid=peer_jid
                )
            )
        except exceptions.CancelError as e:
            for content in session["contents"].values():
                await content["application"].handler.jingle_preflight_cancel(
                    client, session, e
                )

            self._j.delete_session(client, session["id"])
            return False
        else:
            if iq_elt["to"] != accepting_jid.userhost():
                raise exceptions.InternalError(
                    f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ "
                    f"from bare JID of the accepting entity ({accepting_jid!r}), this "
                    "may be a sign of an internal bug, a hack attempt, or a MITM attack!"
                )
            iq_elt["to"] = accepting_jid.full()
            session["peer_jid"] = accepting_jid
        finally:
            del client._xep_0353_pending_sessions[session["id"]]
        return True

    def _terminate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        reason_elt: domish.Element
    ) -> bool:
        session_id = session["id"]
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            return True
        # we have a XEP-0353 session, that means that we are retracting a proposed session
        mess_data = self.build_message_data(
            client, session["peer_jid"], "retract", session_id
        )
        defer.ensureDeferred(client.send_message_data(mess_data))
        response_d.errback(RetractException())

        return False

    async def _on_message_received(self, client, message_elt, post_treat):
        for elt in message_elt.elements():
            if elt.uri == NS_JINGLE_MESSAGE:
                # We use ensureDeferred to process the message initiation workflow in
                # parallel and to avoid blocking the message queue.
                defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt))
                return False
        return True

    async def _handle_mess_init(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        mess_init_elt: domish.Element
    ) -> None:
        if mess_init_elt.name == "propose":
            await self._handle_propose(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "retract":
            self._handle_retract(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "proceed":
            self._handle_proceed(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "accept":
            self._handle_accept(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "reject":
            self._handle_reject(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "ringing":
            await self._handle_ringing(client, message_elt, mess_init_elt)
        else:
            log.warning(f"invalid element: {mess_init_elt.toXml}")

    def _get_sid_and_session_d(
        self,
        client: SatXMPPEntity,
        elt: domish.Element
    ) -> tuple[str, defer.Deferred|list[defer.Deferred]]:
        """Retrieve session ID and deferred or list of deferred from response element"""
        try:
            session_id = elt["id"]
        except KeyError as e:
            assert elt.parent is not None
            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
            raise e
        try:
            session_d = client._xep_0353_pending_sessions[session_id]
        except KeyError as e:
            log.warning(
                _(
                    "no pending session found with id {session_id}, did it timed out?"
                ).format(session_id=session_id)
            )
            raise e
        return session_id, session_d

    def _get_sid_and_response_d(
        self,
        client: SatXMPPEntity,
        elt: domish.Element
    ) -> tuple[str, defer.Deferred]:
        """Retrieve session ID and response_d from response element"""
        session_id, response_d = self._get_sid_and_session_d(client, elt)
        assert isinstance(response_d, defer.Deferred)
        return session_id, response_d

    def _get_sid_and_preflight_d_list(
        self,
        client: SatXMPPEntity,
        elt: domish.Element
    ) -> tuple[str, list[defer.Deferred]]:
        """Retrieve session ID and list of preflight_d from response element"""
        session_id, preflight_d_list = self._get_sid_and_session_d(client, elt)
        assert isinstance(preflight_d_list, list)
        return session_id, preflight_d_list

    async def _handle_propose(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        elt: domish.Element
    ) -> None:
        peer_jid = jid.JID(message_elt["from"])
        local_jid = jid.JID(message_elt["to"])
        session_id = elt["id"]
        try:
            desc_and_apps = [
                (description_elt, self._j.get_application(description_elt.uri))
                for description_elt in elt.elements()
                if description_elt.name == "description"
            ]
            if not desc_and_apps:
                raise AttributeError
        except AttributeError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return
        except exceptions.NotFound:
            log.warning(
                f"There is not registered application to handle this "
                f"proposal: {elt.toXml()}"
            )
            return

        if not desc_and_apps:
            log.warning("No application specified: {message_elt.toXml()}")
            return

        session = self._j.create_session(
            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
        )

        is_in_roster = peer_jid.userhostJID() in client.roster
        if is_in_roster:
            # we indicate that device is ringing as explained in
            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
            # is in roster to avoid presence leak of all our devices.
            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
            await client.send_message_data(mess_data)

        try:
            for description_elt, application in desc_and_apps:
                try:
                    preflight_d = defer.ensureDeferred(
                        application.handler.jingle_preflight(
                            client, session, description_elt
                        )
                    )
                    client._xep_0353_pending_sessions.setdefault(session_id, []).append(
                        preflight_d
                    )
                    await preflight_d
                except TakenByOtherDeviceException as e:
                    log.info(f"The call has been takend by {e.device_jid}")
                    await application.handler.jingle_preflight_cancel(client, session, e)
                    self._j.delete_session(client, session_id)
                    return
                except exceptions.CancelError as e:
                    log.info(f"{client.profile} refused the session: {e}")

                    if is_in_roster:
                        # peer is in our roster, we send reject to them, ou other devices
                        # will get carbon copies
                        reject_dest_jid = peer_jid
                    else:
                        # peer is not in our roster, we send the "reject" only to our own
                        # devices to make them stop ringing/doing notification, and we
                        # don't send anything to peer to avoid presence leak.
                        reject_dest_jid = client.jid.userhostJID()

                    mess_data = self.build_message_data(
                        client, reject_dest_jid, "reject", session_id
                    )
                    await client.send_message_data(mess_data)
                    self._j.delete_session(client, session_id)
                    return
                except defer.CancelledError:
                    # raised when call is retracted before user can reply
                    self._j.delete_session(client, session_id)
                    return
        finally:
            try:
                del client._xep_0353_pending_sessions[session_id]
            except KeyError:
                pass

        if peer_jid.userhostJID() not in client.roster:
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
        await client.send_message_data(mess_data)

    def _handle_retract(self, client, message_elt, retract_elt):
        try:
            session = self._j.get_session(client, retract_elt["id"])
        except KeyError:
            log.warning(f"invalid retract element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(f"no session found with ID {retract_elt['id']}")
            return False
        log.debug(
            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
        )
        try:
            cancellable_deferred = session["cancellable_deferred"]
            if not cancellable_deferred:
                raise KeyError
        except KeyError:
            self._j.delete_session(client, session["id"])
        else:
            for d in cancellable_deferred:
                d.cancel()
        return False

    def _handle_proceed(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        proceed_elt: domish.Element
    ) -> None:
        from_jid = jid.JID(message_elt["from"])
        # session_d is the deferred of the session, it can be preflight_d or response_d
        if from_jid.userhostJID() == client.jid.userhostJID():
            # an other device took the session
            try:
                sid, preflight_d_list = self._get_sid_and_preflight_d_list(
                    client, proceed_elt
                )
            except KeyError:
                return
            for preflight_d in preflight_d_list:
                if not preflight_d.called:
                    preflight_d.errback(TakenByOtherDeviceException(from_jid))

                try:
                    session = self._j.get_session(client, sid)
                except exceptions.NotFound:
                    log.warning("No session found with sid {sid!r}.")
                else:
                    # jingle_preflight_cancel?
                    pass

            # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always
            #   cleaned correctly (use a timeout?)

        else:
            try:
                __, response_d = self._get_sid_and_response_d(client, proceed_elt)
            except KeyError:
                return
            # we have a response deferred
            response_d.callback(jid.JID(message_elt["from"]))

    def _handle_accept(self, client, message_elt, accept_elt):
        pass

    def _handle_reject(self, client, message_elt, reject_elt):
        try:
            __, response_d = self._get_sid_and_response_d(client, reject_elt)
        except KeyError:
            return True
        reason_elt = self._j.get_reason_elt(reject_elt)
        reason, text = self._j.parse_reason_elt(reason_elt)
        if reason is None:
            reason = "busy"

        response_d.errback(RejectException(reason, text))
        return False

    async def _handle_ringing(self, client, message_elt, ringing_elt):
        session_id = ringing_elt["id"]
        try:
            session = self._j.get_session(client, session_id)
        except exceptions.NotFound:
            log.warning(f"Session {session_id!r} unknown, ignoring ringing.")
            return False
        for __, content_data in session["contents"].items():
            await content_data["application"].handler.jingle_preflight_info(
                client, session, "ringing", None
            )

        return False


@implementer(iwokkel.IDisco)
class Handler(xmlstream.XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_MESSAGE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []