Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0374.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0374.py@c23cad65ae99 |
children | 040095a5dc7f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0374.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,425 @@ +#!/usr/bin/env python3 + +# Libervia plugin for OpenPGP for XMPP Instant Messaging +# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Dict, Optional, Set, cast + +from typing_extensions import Final +from wokkel import muc # type: ignore[import] + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _, D_ +from libervia.backend.core.log import getLogger, Logger +from libervia.backend.core.sat_main import SAT +from libervia.backend.core.xmpp import SatXMPPClient +from libervia.backend.plugins.plugin_xep_0045 import XEP_0045 +from libervia.backend.plugins.plugin_xep_0334 import XEP_0334 +from libervia.backend.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel +from libervia.backend.tools import xml_tools +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish + + +__all__ = [ # pylint: disable=unused-variable + "PLUGIN_INFO", + "XEP_0374", + "NS_OXIM" +] + + +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] + + +PLUGIN_INFO = { + C.PI_NAME: "OXIM", + C.PI_IMPORT_NAME: "XEP-0374", + C.PI_TYPE: "SEC", + C.PI_PROTOCOLS: [ "XEP-0374" ], + C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ], + C.PI_RECOMMENDATIONS: [ "XEP-0045" ], + C.PI_MAIN: "XEP_0374", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Implementation of OXIM"""), +} + + +# The disco feature +NS_OXIM: Final = "urn:xmpp:openpgp:im:0" + + +class XEP_0374: + """ + Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0`` + namespace. MUC messages are supported next to one to one messages. For trust + management, the two trust models "BTBV" and "manual" are supported. + """ + + def __init__(self, sat: SAT) -> None: + """ + @param sat: The SAT instance. + """ + + self.__sat = sat + + # Plugins + self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) + self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) + self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"]) + + # Triggers + sat.trigger.add( + "message_received", + self.__message_received_trigger, + priority=100050 + ) + sat.trigger.add("send", self.__send_trigger, priority=0) + + # Register the encryption plugin + sat.register_encryption_plugin(self, "OXIM", NS_OX, 102) + + async def get_trust_ui( # pylint: disable=invalid-name + self, + client: SatXMPPClient, + entity: jid.JID + ) -> xml_tools.XMLUI: + """ + @param client: The client. + @param entity: The entity whose device trust levels to manage. + @return: An XMLUI instance which opens a form to manage the trust level of all + devices belonging to the entity. + """ + + return await self.__xep_0373.get_trust_ui(client, entity) + + @staticmethod + def __get_joined_muc_users( + client: SatXMPPClient, + xep_0045: XEP_0045, + room_jid: jid.JID + ) -> Set[jid.JID]: + """ + @param client: The client. + @param xep_0045: A MUC plugin instance. + @param room_jid: The room JID. + @return: A set containing the bare JIDs of the MUC participants. + @raise InternalError: if the MUC is not joined or the entity information of a + participant isn't available. + """ + + bare_jids: Set[jid.JID] = set() + + try: + room = cast(muc.Room, xep_0045.get_room(client, room_jid)) + except exceptions.NotFound as e: + raise exceptions.InternalError( + "Participant list of unjoined MUC requested." + ) from e + + for user in cast(Dict[str, muc.User], room.roster).values(): + entity = cast(Optional[SatXMPPEntity], user.entity) + if entity is None: + raise exceptions.InternalError( + f"Participant list of MUC requested, but the entity information of" + f" the participant {user} is not available." + ) + + bare_jids.add(entity.jid.userhostJID()) + + return bare_jids + + async def __message_received_trigger( + self, + client: SatXMPPClient, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """ + @param client: The client which received the message. + @param message_elt: The message element. Can be modified. + @param post_treat: A deferred which evaluates to a :class:`MessageData` once the + message has fully progressed through the message receiving flow. Can be used + to apply treatments to the fully processed message, like marking it as + encrypted. + @return: Whether to continue the message received flow. + """ + sender_jid = jid.JID(message_elt["from"]) + feedback_jid: jid.JID + + message_type = message_elt.getAttribute("type", "unknown") + is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT + if is_muc_message: + if self.__xep_0045 is None: + log.warning( + "Ignoring MUC message since plugin XEP-0045 is not available." + ) + # Can't handle a MUC message without XEP-0045, let the flow continue + # normally + return True + + room_jid = feedback_jid = sender_jid.userhostJID() + + try: + room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid)) + except exceptions.NotFound: + log.warning( + f"Ignoring MUC message from a room that has not been joined:" + f" {room_jid}" + ) + # Whatever, let the flow continue + return True + + sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource)) + if sender_user is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's user" + f" wasn't found {sender_jid.resource}" + ) + # Whatever, let the flow continue + return True + + sender_user_jid = cast(Optional[jid.JID], sender_user.entity) + if sender_user_jid is None: + log.warning( + f"Ignoring MUC message from room {room_jid} since the sender's bare" + f" JID couldn't be found from its user information: {sender_user}" + ) + # Whatever, let the flow continue + return True + + sender_jid = sender_user_jid + else: + # I'm not sure why this check is required, this code is copied from XEP-0384 + if sender_jid.userhostJID() == client.jid.userhostJID(): + try: + feedback_jid = jid.JID(message_elt["to"]) + except KeyError: + feedback_jid = client.server_jid + else: + feedback_jid = sender_jid + + sender_bare_jid = sender_jid.userhost() + + openpgp_elt = cast(Optional[domish.Element], next( + message_elt.elements(NS_OX, "openpgp"), + None + )) + + if openpgp_elt is None: + # None of our business, let the flow continue + return True + + try: + payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element( + client, + openpgp_elt, + "signcrypt", + jid.JID(sender_bare_jid) + ) + except Exception as e: + # TODO: More specific exception handling + log.warning(_("Can't decrypt message: {reason}\n{xml}").format( + reason=e, + xml=message_elt.toXml() + )) + client.feedback( + feedback_jid, + D_( + f"An OXIM message from {sender_jid.full()} can't be decrypted:" + f" {e}" + ), + { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } + ) + # No point in further processing this message + return False + + message_elt.children.remove(openpgp_elt) + + log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}") + + # Remove all body elements from the original element, since those act as + # fallbacks in case the encryption protocol is not supported + for child in message_elt.elements(): + if child.name == "body": + message_elt.children.remove(child) + + # Move all extension elements from the payload to the stanza root + # TODO: There should probably be explicitly forbidden elements here too, just as + # for XEP-0420 + for child in list(payload_elt.elements()): + # Remove the child from the content element + payload_elt.children.remove(child) + + # Add the child to the stanza + message_elt.addChild(child) + + # Mark the message as trusted or untrusted. Undecided counts as untrusted here. + trust_level = TrustLevel.UNDECIDED # TODO: Load the actual trust level + if trust_level is TrustLevel.TRUSTED: + post_treat.addCallback(client.encryption.mark_as_trusted) + else: + post_treat.addCallback(client.encryption.mark_as_untrusted) + + # Mark the message as originally encrypted + post_treat.addCallback( + client.encryption.mark_as_encrypted, + namespace=NS_OX + ) + + # Message processed successfully, continue with the flow + return True + + async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: + """ + @param client: The client sending this message. + @param stanza: The stanza that is about to be sent. Can be modified. + @return: Whether the send message flow should continue or not. + """ + # OXIM only handles message stanzas + if stanza.name != "message": + return True + + # Get the intended recipient + recipient = stanza.getAttribute("to", None) + if recipient is None: + raise exceptions.InternalError( + f"Message without recipient encountered. Blocking further processing to" + f" avoid leaking plaintext data: {stanza.toXml()}" + ) + + # Parse the JID + recipient_bare_jid = jid.JID(recipient).userhostJID() + + # Check whether encryption with OXIM is requested + encryption = client.encryption.getSession(recipient_bare_jid) + + if encryption is None: + # Encryption is not requested for this recipient + return True + + if encryption["plugin"].namespace != NS_OX: + # Encryption is requested for this recipient, but not with OXIM + return True + + # All pre-checks done, we can start encrypting! + await self.__encrypt( + client, + stanza, + recipient_bare_jid, + stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT + ) + + # Add a store hint if this is a message stanza + self.__xep_0334.add_hint_elements(stanza, [ "store" ]) + + # Let the flow continue. + return True + + async def __encrypt( + self, + client: SatXMPPClient, + stanza: domish.Element, + recipient_jid: jid.JID, + is_muc_message: bool + ) -> None: + """ + @param client: The client. + @param stanza: The stanza, which is modified by this call. + @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID + but doesn't have to. + @param is_muc_message: Whether the stanza is a message stanza to a MUC room. + + @warning: The calling code MUST take care of adding the store message processing + hint to the stanza if applicable! This can be done before or after this call, + the order doesn't matter. + """ + + recipient_bare_jids: Set[jid.JID] + feedback_jid: jid.JID + + if is_muc_message: + if self.__xep_0045 is None: + raise exceptions.InternalError( + "Encryption of MUC message requested, but plugin XEP-0045 is not" + " available." + ) + + room_jid = feedback_jid = recipient_jid.userhostJID() + + recipient_bare_jids = self.__get_joined_muc_users( + client, + self.__xep_0045, + room_jid + ) + else: + recipient_bare_jids = { recipient_jid.userhostJID() } + feedback_jid = recipient_jid.userhostJID() + + log.debug( + f"Intercepting message that is to be encrypted by {NS_OX} for" + f" {recipient_bare_jids}" + ) + + signcrypt_elt, payload_elt = \ + self.__xep_0373.build_signcrypt_element(recipient_bare_jids) + + # Move elements from the stanza to the content element. + # TODO: There should probably be explicitly forbidden elements here too, just as + # for XEP-0420 + for child in list(stanza.elements()): + if child.name == "openpgp" and child.uri == NS_OX: + log.debug("not re-encrypting encrypted OX element") + continue + # Remove the child from the stanza + stanza.children.remove(child) + + # A namespace of ``None`` can be used on domish elements to inherit the + # namespace from the parent. When moving elements from the stanza root to + # the content element, however, we don't want elements to inherit the + # namespace of the content element. Thus, check for elements with ``None`` + # for their namespace and set the namespace to jabber:client, which is the + # namespace of the parent element. + if child.uri is None: + child.uri = C.NS_CLIENT + child.defaultUri = C.NS_CLIENT + + # Add the child with corrected namespaces to the content element + payload_elt.addChild(child) + + try: + openpgp_elt = await self.__xep_0373.build_openpgp_element( + client, + signcrypt_elt, + recipient_bare_jids + ) + except Exception as e: + msg = _( + # pylint: disable=consider-using-f-string + "Can't encrypt message for {entities}: {reason}".format( + entities=', '.join(jid.userhost() for jid in recipient_bare_jids), + reason=e + ) + ) + log.warning(msg) + client.feedback(feedback_jid, msg, { + C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR + }) + raise e + + stanza.addChild(openpgp_elt)