view sat/plugins/plugin_xep_0374.py @ 4007:1d5a81e3c9e8

plugin XEP-0384: skip MessageReceived trigger when in a component: OMEMO is not used in components so far, but the trigger is trying to request OMEMO PEP nodes, which causes an error with virtual pubsub service of AP component.
author Goffi <goffi@goffi.org>
date Thu, 16 Mar 2023 12:31:24 +0100
parents f461f11ea176
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for OpenPGP for XMPP Instant Messaging
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Dict, Optional, Set, cast

from typing_extensions import Final
from wokkel import muc  # type: ignore[import]

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import _, D_
from sat.core.log import getLogger, Logger
from sat.core.sat_main import SAT
from sat.core.xmpp import SatXMPPClient
from sat.plugins.plugin_xep_0045 import XEP_0045
from sat.plugins.plugin_xep_0334 import XEP_0334
from sat.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
from sat.tools import xml_tools
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish


__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "XEP_0374",
    "NS_OXIM"
]


log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


PLUGIN_INFO = {
    C.PI_NAME: "OXIM",
    C.PI_IMPORT_NAME: "XEP-0374",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0374" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
    C.PI_MAIN: "XEP_0374",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
}


# The disco feature
NS_OXIM: Final = "urn:xmpp:openpgp:im:0"


class XEP_0374:
    """
    Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
    namespace. MUC messages are supported next to one to one messages. For trust
    management, the two trust models "BTBV" and "manual" are supported.
    """

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance.
        """

        self.__sat = sat

        # Plugins
        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])

        # Triggers
        sat.trigger.add(
            "messageReceived",
            self.__message_received_trigger,
            priority=100050
        )
        sat.trigger.add("send", self.__send_trigger, priority=0)

        # Register the encryption plugin
        sat.registerEncryptionPlugin(self, "OXIM", NS_OX, 102)

    async def getTrustUI(  # pylint: disable=invalid-name
        self,
        client: SatXMPPClient,
        entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        @param client: The client.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI instance which opens a form to manage the trust level of all
            devices belonging to the entity.
        """

        return await self.__xep_0373.getTrustUI(client, entity)

    @staticmethod
    def __get_joined_muc_users(
        client: SatXMPPClient,
        xep_0045: XEP_0045,
        room_jid: jid.JID
    ) -> Set[jid.JID]:
        """
        @param client: The client.
        @param xep_0045: A MUC plugin instance.
        @param room_jid: The room JID.
        @return: A set containing the bare JIDs of the MUC participants.
        @raise InternalError: if the MUC is not joined or the entity information of a
            participant isn't available.
        """

        bare_jids: Set[jid.JID] = set()

        try:
            room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
        except exceptions.NotFound as e:
            raise exceptions.InternalError(
                "Participant list of unjoined MUC requested."
            ) from e

        for user in cast(Dict[str, muc.User], room.roster).values():
            entity = cast(Optional[SatXMPPEntity], user.entity)
            if entity is None:
                raise exceptions.InternalError(
                    f"Participant list of MUC requested, but the entity information of"
                    f" the participant {user} is not available."
                )

            bare_jids.add(entity.jid.userhostJID())

        return bare_jids

    async def __message_received_trigger(
        self,
        client: SatXMPPClient,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """
        @param client: The client which received the message.
        @param message_elt: The message element. Can be modified.
        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
            message has fully progressed through the message receiving flow. Can be used
            to apply treatments to the fully processed message, like marking it as
            encrypted.
        @return: Whether to continue the message received flow.
        """
        sender_jid = jid.JID(message_elt["from"])
        feedback_jid: jid.JID

        message_type = message_elt.getAttribute("type", "unknown")
        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
        if is_muc_message:
            if self.__xep_0045 is None:
                log.warning(
                    "Ignoring MUC message since plugin XEP-0045 is not available."
                )
                # Can't handle a MUC message without XEP-0045, let the flow continue
                # normally
                return True

            room_jid = feedback_jid = sender_jid.userhostJID()

            try:
                room = cast(muc.Room, self.__xep_0045.getRoom(client, room_jid))
            except exceptions.NotFound:
                log.warning(
                    f"Ignoring MUC message from a room that has not been joined:"
                    f" {room_jid}"
                )
                # Whatever, let the flow continue
                return True

            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
            if sender_user is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's user"
                    f" wasn't found {sender_jid.resource}"
                )
                # Whatever, let the flow continue
                return True

            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
            if sender_user_jid is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
                    f" JID couldn't be found from its user information: {sender_user}"
                )
                # Whatever, let the flow continue
                return True

            sender_jid = sender_user_jid
        else:
            # I'm not sure why this check is required, this code is copied from XEP-0384
            if sender_jid.userhostJID() == client.jid.userhostJID():
                try:
                    feedback_jid = jid.JID(message_elt["to"])
                except KeyError:
                    feedback_jid = client.server_jid
            else:
                feedback_jid = sender_jid

        sender_bare_jid = sender_jid.userhost()

        openpgp_elt = cast(Optional[domish.Element], next(
            message_elt.elements(NS_OX, "openpgp"),
            None
        ))

        if openpgp_elt is None:
            # None of our business, let the flow continue
            return True

        try:
            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
                client,
                openpgp_elt,
                "signcrypt",
                jid.JID(sender_bare_jid)
            )
        except Exception as e:
            # TODO: More specific exception handling
            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
                reason=e,
                xml=message_elt.toXml()
            ))
            client.feedback(
                feedback_jid,
                D_(
                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
                    f" {e}"
                ),
                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
            )
            # No point in further processing this message
            return False

        message_elt.children.remove(openpgp_elt)

        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")

        # Remove all body elements from the original element, since those act as
        # fallbacks in case the encryption protocol is not supported
        for child in message_elt.elements():
            if child.name == "body":
                message_elt.children.remove(child)

        # Move all extension elements from the payload to the stanza root
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(payload_elt.elements()):
            # Remove the child from the content element
            payload_elt.children.remove(child)

            # Add the child to the stanza
            message_elt.addChild(child)

        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
        if trust_level is TrustLevel.TRUSTED:
            post_treat.addCallback(client.encryption.markAsTrusted)
        else:
            post_treat.addCallback(client.encryption.markAsUntrusted)

        # Mark the message as originally encrypted
        post_treat.addCallback(
            client.encryption.markAsEncrypted,
            namespace=NS_OX
        )

        # Message processed successfully, continue with the flow
        return True

    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
        """
        @param client: The client sending this message.
        @param stanza: The stanza that is about to be sent. Can be modified.
        @return: Whether the send message flow should continue or not.
        """
        # OXIM only handles message stanzas
        if stanza.name != "message":
            return True

        # Get the intended recipient
        recipient = stanza.getAttribute("to", None)
        if recipient is None:
            raise exceptions.InternalError(
                f"Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza.toXml()}"
            )

        # Parse the JID
        recipient_bare_jid = jid.JID(recipient).userhostJID()

        # Check whether encryption with OXIM is requested
        encryption = client.encryption.getSession(recipient_bare_jid)

        if encryption is None:
            # Encryption is not requested for this recipient
            return True

        if encryption["plugin"].namespace != NS_OX:
            # Encryption is requested for this recipient, but not with OXIM
            return True

        # All pre-checks done, we can start encrypting!
        await self.__encrypt(
            client,
            stanza,
            recipient_bare_jid,
            stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT
        )

        # Add a store hint if this is a message stanza
        self.__xep_0334.addHintElements(stanza, [ "store" ])

        # Let the flow continue.
        return True

    async def __encrypt(
        self,
        client: SatXMPPClient,
        stanza: domish.Element,
        recipient_jid: jid.JID,
        is_muc_message: bool
    ) -> None:
        """
        @param client: The client.
        @param stanza: The stanza, which is modified by this call.
        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
            but doesn't have to.
        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.

        @warning: The calling code MUST take care of adding the store message processing
            hint to the stanza if applicable! This can be done before or after this call,
            the order doesn't matter.
        """

        recipient_bare_jids: Set[jid.JID]
        feedback_jid: jid.JID

        if is_muc_message:
            if self.__xep_0045 is None:
                raise exceptions.InternalError(
                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
                    " available."
                )

            room_jid = feedback_jid = recipient_jid.userhostJID()

            recipient_bare_jids = self.__get_joined_muc_users(
                client,
                self.__xep_0045,
                room_jid
            )
        else:
            recipient_bare_jids = { recipient_jid.userhostJID() }
            feedback_jid = recipient_jid.userhostJID()

        log.debug(
            f"Intercepting message that is to be encrypted by {NS_OX} for"
            f" {recipient_bare_jids}"
        )

        signcrypt_elt, payload_elt = \
            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)

        # Move elements from the stanza to the content element.
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(stanza.elements()):
            if child.name == "openpgp" and child.uri == NS_OX:
                log.debug("not re-encrypting encrypted OX element")
                continue
            # Remove the child from the stanza
            stanza.children.remove(child)

            # A namespace of ``None`` can be used on domish elements to inherit the
            # namespace from the parent. When moving elements from the stanza root to
            # the content element, however, we don't want elements to inherit the
            # namespace of the content element. Thus, check for elements with ``None``
            # for their namespace and set the namespace to jabber:client, which is the
            # namespace of the parent element.
            if child.uri is None:
                child.uri = C.NS_CLIENT
                child.defaultUri = C.NS_CLIENT

            # Add the child with corrected namespaces to the content element
            payload_elt.addChild(child)

        try:
            openpgp_elt = await self.__xep_0373.build_openpgp_element(
                client,
                signcrypt_elt,
                recipient_bare_jids
            )
        except Exception as e:
            msg = _(
                # pylint: disable=consider-using-f-string
                "Can't encrypt message for {entities}: {reason}".format(
                    entities=', '.join(jid.userhost() for jid in recipient_bare_jids),
                    reason=e
                )
            )
            log.warning(msg)
            client.feedback(feedback_jid, msg, {
                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
            })
            raise e

        stanza.addChild(openpgp_elt)