diff libervia/backend/plugins/plugin_xep_0374.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0374.py@c23cad65ae99
children 040095a5dc7f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0374.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,425 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for OpenPGP for XMPP Instant Messaging
+# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Dict, Optional, Set, cast
+
+from typing_extensions import Final
+from wokkel import muc  # type: ignore[import]
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _, D_
+from libervia.backend.core.log import getLogger, Logger
+from libervia.backend.core.sat_main import SAT
+from libervia.backend.core.xmpp import SatXMPPClient
+from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
+from libervia.backend.plugins.plugin_xep_0334 import XEP_0334
+from libervia.backend.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
+from libervia.backend.tools import xml_tools
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+
+__all__ = [  # pylint: disable=unused-variable
+    "PLUGIN_INFO",
+    "XEP_0374",
+    "NS_OXIM"
+]
+
+
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OXIM",
+    C.PI_IMPORT_NAME: "XEP-0374",
+    C.PI_TYPE: "SEC",
+    C.PI_PROTOCOLS: [ "XEP-0374" ],
+    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
+    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
+    C.PI_MAIN: "XEP_0374",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
+}
+
+
+# The disco feature
+NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
+
+
+class XEP_0374:
+    """
+    Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
+    namespace. MUC messages are supported next to one to one messages. For trust
+    management, the two trust models "BTBV" and "manual" are supported.
+    """
+
+    def __init__(self, sat: SAT) -> None:
+        """
+        @param sat: The SAT instance.
+        """
+
+        self.__sat = sat
+
+        # Plugins
+        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
+        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
+        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])
+
+        # Triggers
+        sat.trigger.add(
+            "message_received",
+            self.__message_received_trigger,
+            priority=100050
+        )
+        sat.trigger.add("send", self.__send_trigger, priority=0)
+
+        # Register the encryption plugin
+        sat.register_encryption_plugin(self, "OXIM", NS_OX, 102)
+
+    async def get_trust_ui(  # pylint: disable=invalid-name
+        self,
+        client: SatXMPPClient,
+        entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI instance which opens a form to manage the trust level of all
+            devices belonging to the entity.
+        """
+
+        return await self.__xep_0373.get_trust_ui(client, entity)
+
+    @staticmethod
+    def __get_joined_muc_users(
+        client: SatXMPPClient,
+        xep_0045: XEP_0045,
+        room_jid: jid.JID
+    ) -> Set[jid.JID]:
+        """
+        @param client: The client.
+        @param xep_0045: A MUC plugin instance.
+        @param room_jid: The room JID.
+        @return: A set containing the bare JIDs of the MUC participants.
+        @raise InternalError: if the MUC is not joined or the entity information of a
+            participant isn't available.
+        """
+
+        bare_jids: Set[jid.JID] = set()
+
+        try:
+            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(
+                "Participant list of unjoined MUC requested."
+            ) from e
+
+        for user in cast(Dict[str, muc.User], room.roster).values():
+            entity = cast(Optional[SatXMPPEntity], user.entity)
+            if entity is None:
+                raise exceptions.InternalError(
+                    f"Participant list of MUC requested, but the entity information of"
+                    f" the participant {user} is not available."
+                )
+
+            bare_jids.add(entity.jid.userhostJID())
+
+        return bare_jids
+
+    async def __message_received_trigger(
+        self,
+        client: SatXMPPClient,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """
+        @param client: The client which received the message.
+        @param message_elt: The message element. Can be modified.
+        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
+            message has fully progressed through the message receiving flow. Can be used
+            to apply treatments to the fully processed message, like marking it as
+            encrypted.
+        @return: Whether to continue the message received flow.
+        """
+        sender_jid = jid.JID(message_elt["from"])
+        feedback_jid: jid.JID
+
+        message_type = message_elt.getAttribute("type", "unknown")
+        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                log.warning(
+                    "Ignoring MUC message since plugin XEP-0045 is not available."
+                )
+                # Can't handle a MUC message without XEP-0045, let the flow continue
+                # normally
+                return True
+
+            room_jid = feedback_jid = sender_jid.userhostJID()
+
+            try:
+                room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
+            except exceptions.NotFound:
+                log.warning(
+                    f"Ignoring MUC message from a room that has not been joined:"
+                    f" {room_jid}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
+            if sender_user is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's user"
+                    f" wasn't found {sender_jid.resource}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
+            if sender_user_jid is None:
+                log.warning(
+                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
+                    f" JID couldn't be found from its user information: {sender_user}"
+                )
+                # Whatever, let the flow continue
+                return True
+
+            sender_jid = sender_user_jid
+        else:
+            # I'm not sure why this check is required, this code is copied from XEP-0384
+            if sender_jid.userhostJID() == client.jid.userhostJID():
+                try:
+                    feedback_jid = jid.JID(message_elt["to"])
+                except KeyError:
+                    feedback_jid = client.server_jid
+            else:
+                feedback_jid = sender_jid
+
+        sender_bare_jid = sender_jid.userhost()
+
+        openpgp_elt = cast(Optional[domish.Element], next(
+            message_elt.elements(NS_OX, "openpgp"),
+            None
+        ))
+
+        if openpgp_elt is None:
+            # None of our business, let the flow continue
+            return True
+
+        try:
+            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
+                client,
+                openpgp_elt,
+                "signcrypt",
+                jid.JID(sender_bare_jid)
+            )
+        except Exception as e:
+            # TODO: More specific exception handling
+            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
+                reason=e,
+                xml=message_elt.toXml()
+            ))
+            client.feedback(
+                feedback_jid,
+                D_(
+                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
+                    f" {e}"
+                ),
+                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
+            )
+            # No point in further processing this message
+            return False
+
+        message_elt.children.remove(openpgp_elt)
+
+        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")
+
+        # Remove all body elements from the original element, since those act as
+        # fallbacks in case the encryption protocol is not supported
+        for child in message_elt.elements():
+            if child.name == "body":
+                message_elt.children.remove(child)
+
+        # Move all extension elements from the payload to the stanza root
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(payload_elt.elements()):
+            # Remove the child from the content element
+            payload_elt.children.remove(child)
+
+            # Add the child to the stanza
+            message_elt.addChild(child)
+
+        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
+        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
+        if trust_level is TrustLevel.TRUSTED:
+            post_treat.addCallback(client.encryption.mark_as_trusted)
+        else:
+            post_treat.addCallback(client.encryption.mark_as_untrusted)
+
+        # Mark the message as originally encrypted
+        post_treat.addCallback(
+            client.encryption.mark_as_encrypted,
+            namespace=NS_OX
+        )
+
+        # Message processed successfully, continue with the flow
+        return True
+
+    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
+        """
+        @param client: The client sending this message.
+        @param stanza: The stanza that is about to be sent. Can be modified.
+        @return: Whether the send message flow should continue or not.
+        """
+        # OXIM only handles message stanzas
+        if stanza.name != "message":
+            return True
+
+        # Get the intended recipient
+        recipient = stanza.getAttribute("to", None)
+        if recipient is None:
+            raise exceptions.InternalError(
+                f"Message without recipient encountered. Blocking further processing to"
+                f" avoid leaking plaintext data: {stanza.toXml()}"
+            )
+
+        # Parse the JID
+        recipient_bare_jid = jid.JID(recipient).userhostJID()
+
+        # Check whether encryption with OXIM is requested
+        encryption = client.encryption.getSession(recipient_bare_jid)
+
+        if encryption is None:
+            # Encryption is not requested for this recipient
+            return True
+
+        if encryption["plugin"].namespace != NS_OX:
+            # Encryption is requested for this recipient, but not with OXIM
+            return True
+
+        # All pre-checks done, we can start encrypting!
+        await self.__encrypt(
+            client,
+            stanza,
+            recipient_bare_jid,
+            stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT
+        )
+
+        # Add a store hint if this is a message stanza
+        self.__xep_0334.add_hint_elements(stanza, [ "store" ])
+
+        # Let the flow continue.
+        return True
+
+    async def __encrypt(
+        self,
+        client: SatXMPPClient,
+        stanza: domish.Element,
+        recipient_jid: jid.JID,
+        is_muc_message: bool
+    ) -> None:
+        """
+        @param client: The client.
+        @param stanza: The stanza, which is modified by this call.
+        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
+            but doesn't have to.
+        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
+
+        @warning: The calling code MUST take care of adding the store message processing
+            hint to the stanza if applicable! This can be done before or after this call,
+            the order doesn't matter.
+        """
+
+        recipient_bare_jids: Set[jid.JID]
+        feedback_jid: jid.JID
+
+        if is_muc_message:
+            if self.__xep_0045 is None:
+                raise exceptions.InternalError(
+                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
+                    " available."
+                )
+
+            room_jid = feedback_jid = recipient_jid.userhostJID()
+
+            recipient_bare_jids = self.__get_joined_muc_users(
+                client,
+                self.__xep_0045,
+                room_jid
+            )
+        else:
+            recipient_bare_jids = { recipient_jid.userhostJID() }
+            feedback_jid = recipient_jid.userhostJID()
+
+        log.debug(
+            f"Intercepting message that is to be encrypted by {NS_OX} for"
+            f" {recipient_bare_jids}"
+        )
+
+        signcrypt_elt, payload_elt = \
+            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)
+
+        # Move elements from the stanza to the content element.
+        # TODO: There should probably be explicitly forbidden elements here too, just as
+        # for XEP-0420
+        for child in list(stanza.elements()):
+            if child.name == "openpgp" and child.uri == NS_OX:
+                log.debug("not re-encrypting encrypted OX element")
+                continue
+            # Remove the child from the stanza
+            stanza.children.remove(child)
+
+            # A namespace of ``None`` can be used on domish elements to inherit the
+            # namespace from the parent. When moving elements from the stanza root to
+            # the content element, however, we don't want elements to inherit the
+            # namespace of the content element. Thus, check for elements with ``None``
+            # for their namespace and set the namespace to jabber:client, which is the
+            # namespace of the parent element.
+            if child.uri is None:
+                child.uri = C.NS_CLIENT
+                child.defaultUri = C.NS_CLIENT
+
+            # Add the child with corrected namespaces to the content element
+            payload_elt.addChild(child)
+
+        try:
+            openpgp_elt = await self.__xep_0373.build_openpgp_element(
+                client,
+                signcrypt_elt,
+                recipient_bare_jids
+            )
+        except Exception as e:
+            msg = _(
+                # pylint: disable=consider-using-f-string
+                "Can't encrypt message for {entities}: {reason}".format(
+                    entities=', '.join(jid.userhost() for jid in recipient_bare_jids),
+                    reason=e
+                )
+            )
+            log.warning(msg)
+            client.feedback(feedback_jid, msg, {
+                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
+            })
+            raise e
+
+        stanza.addChild(openpgp_elt)