Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0374.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 7c5654c54fed |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for OpenPGP for XMPP Instant Messaging
# Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Dict, Optional, Set, cast
from typing_extensions import Final
from wokkel import muc  # type: ignore[import]

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger, Logger
from libervia.backend.core.main import LiberviaBackend
from libervia.backend.core.xmpp import SatXMPPClient
from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
from libervia.backend.plugins.plugin_xep_0334 import XEP_0334
from libervia.backend.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
from libervia.backend.tools import xml_tools
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish


__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "XEP_0374",
    "NS_OXIM"
]


log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


PLUGIN_INFO = {
    C.PI_NAME: "OXIM",
    C.PI_IMPORT_NAME: "XEP-0374",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0374" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
    C.PI_MAIN: "XEP_0374",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
}


# The disco feature
NS_OXIM: Final = "urn:xmpp:openpgp:im:0"


class XEP_0374:
    """
    Plugin equipping Libervia with OXIM capabilities under the
    ``urn:xmpp:openpgp:im:0`` namespace. MUC messages are supported next to one to one
    messages. For trust management, the two trust models "BTBV" and "manual" are
    supported.
    """

    def __init__(self, sat: LiberviaBackend) -> None:
        """
        Look up the plugins this one builds on, hook into the message receiving and
        sending flows and register this instance as an encryption plugin.

        @param sat: The SAT instance.
        """

        self.__sat = sat

        # Plugins
        # XEP-0045 is only a recommendation, thus it may be missing (``None``).
        # XEP-0334 and XEP-0373 are declared as hard dependencies in PLUGIN_INFO.
        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])

        # Triggers
        sat.trigger.add(
            "message_received",
            self.__message_received_trigger,
            priority=100050
        )
        sat.trigger.add("send", self.__send_trigger, priority=0)

        # Register the encryption plugin
        sat.register_encryption_plugin(self, "OXIM", NS_OX, 102)

    async def get_trust_ui(  # pylint: disable=invalid-name
        self,
        client: SatXMPPClient,
        entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        Delegates to the trust UI of the XEP-0373 plugin.

        @param client: The client.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI instance which opens a form to manage the trust level of all
            devices belonging to the entity.
        """

        return await self.__xep_0373.get_trust_ui(client, entity)

    @staticmethod
    def __get_joined_muc_users(
        client: SatXMPPClient,
        xep_0045: XEP_0045,
        room_jid: jid.JID
    ) -> Set[jid.JID]:
        """
        @param client: The client.
        @param xep_0045: A MUC plugin instance.
        @param room_jid: The room JID.
        @return: A set containing the bare JIDs of the MUC participants.
        @raise InternalError: if the MUC is not joined or the entity information of a
            participant isn't available.
        """

        bare_jids: Set[jid.JID] = set()

        try:
            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
        except exceptions.NotFound as e:
            raise exceptions.InternalError(
                "Participant list of unjoined MUC requested."
            ) from e

        for user in cast(Dict[str, muc.User], room.roster).values():
            # NOTE(review): ``user.entity`` is treated as a ``SatXMPPEntity`` (with a
            # ``jid`` attribute) here, while ``__message_received_trigger`` treats the
            # same attribute as a plain ``jid.JID`` - confirm which one is correct.
            entity = cast(Optional[SatXMPPEntity], user.entity)
            if entity is None:
                raise exceptions.InternalError(
                    f"Participant list of MUC requested, but the entity information of"
                    f" the participant {user} is not available."
                )

            bare_jids.add(entity.jid.userhostJID())

        return bare_jids

    async def __message_received_trigger(
        self,
        client: SatXMPPClient,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """
        Decrypt an incoming OXIM message in place: the ``<openpgp/>`` element and the
        fallback ``<body/>`` elements are removed from the stanza and replaced by the
        children of the decrypted payload.

        @param client: The client which received the message.
        @param message_elt: The message element. Can be modified.
        @param post_treat: A deferred which evaluates to a :class:`MessageData` once
            the message has fully progressed through the message receiving flow. Can
            be used to apply treatments to the fully processed message, like marking
            it as encrypted.
        @return: Whether to continue the message received flow. ``False`` is only
            returned if an OXIM payload was found but could not be unpacked.
        """

        sender_jid = jid.JID(message_elt["from"])
        feedback_jid: jid.JID

        message_type = message_elt.getAttribute("type", "unknown")
        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
        if is_muc_message:
            if self.__xep_0045 is None:
                log.warning(
                    "Ignoring MUC message since plugin XEP-0045 is not available."
                )
                # Can't handle a MUC message without XEP-0045, let the flow continue
                # normally
                return True

            room_jid = feedback_jid = sender_jid.userhostJID()

            try:
                room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
            except exceptions.NotFound:
                log.warning(
                    f"Ignoring MUC message from a room that has not been joined:"
                    f" {room_jid}"
                )
                # Whatever, let the flow continue
                return True

            # In a MUC, the resource part of the sender JID is the nickname used to
            # look the user up in the room.
            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
            if sender_user is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's user"
                    f" wasn't found {sender_jid.resource}"
                )
                # Whatever, let the flow continue
                return True

            # NOTE(review): the entity is treated as a ``jid.JID`` here, while
            # ``__get_joined_muc_users`` treats it as a ``SatXMPPEntity`` - confirm.
            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
            if sender_user_jid is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
                    f" JID couldn't be found from its user information: {sender_user}"
                )
                # Whatever, let the flow continue
                return True

            sender_jid = sender_user_jid
        else:
            # I'm not sure why this check is required, this code is copied from XEP-0384
            if sender_jid.userhostJID() == client.jid.userhostJID():
                try:
                    feedback_jid = jid.JID(message_elt["to"])
                except KeyError:
                    feedback_jid = client.server_jid
            else:
                feedback_jid = sender_jid

        sender_bare_jid = sender_jid.userhost()

        openpgp_elt = cast(Optional[domish.Element], next(
            message_elt.elements(NS_OX, "openpgp"),
            None
        ))

        if openpgp_elt is None:
            # None of our business, let the flow continue
            return True

        try:
            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
                client,
                openpgp_elt,
                "signcrypt",
                jid.JID(sender_bare_jid)
            )
        except Exception as e:
            # TODO: More specific exception handling
            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
                reason=e,
                xml=message_elt.toXml()
            ))
            client.feedback(
                feedback_jid,
                D_(
                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
                    f" {e}"
                ),
                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
            )
            # No point in further processing this message
            return False

        message_elt.children.remove(openpgp_elt)

        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")

        # Remove all body elements from the original element, since those act as
        # fallbacks in case the encryption protocol is not supported.
        # Iterate over a snapshot, as the other loops in this file do, because
        # children are removed from the element inside the loop.
        for child in list(message_elt.elements()):
            if child.name == "body":
                message_elt.children.remove(child)

        # Move all extension elements from the payload to the stanza root
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(payload_elt.elements()):
            # Remove the child from the content element
            payload_elt.children.remove(child)

            # Add the child to the stanza
            message_elt.addChild(child)

        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
        if trust_level is TrustLevel.TRUSTED:
            post_treat.addCallback(client.encryption.mark_as_trusted)
        else:
            post_treat.addCallback(client.encryption.mark_as_untrusted)

        # Mark the message as originally encrypted
        post_treat.addCallback(
            client.encryption.mark_as_encrypted,
            namespace=NS_OX
        )

        # Message processed successfully, continue with the flow
        return True

    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
        """
        Encrypt outgoing message stanzas for recipients with an active OXIM
        encryption session. All other stanzas pass through untouched.

        @param client: The client sending this message.
        @param stanza: The stanza that is about to be sent. Can be modified.
        @return: Whether the send message flow should continue or not.
        @raise InternalError: if a message stanza without recipient is encountered.
        """
        # OXIM only handles message stanzas
        if stanza.name != "message":
            return True

        # Get the intended recipient
        recipient = stanza.getAttribute("to", None)
        if recipient is None:
            raise exceptions.InternalError(
                f"Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza.toXml()}"
            )

        # Parse the JID
        recipient_bare_jid = jid.JID(recipient).userhostJID()

        # Check whether encryption with OXIM is requested
        encryption = client.encryption.getSession(recipient_bare_jid)

        if encryption is None:
            # Encryption is not requested for this recipient
            return True

        if encryption["plugin"].namespace != NS_OX:
            # Encryption is requested for this recipient, but not with OXIM
            return True

        # All pre-checks done, we can start encrypting!
        await self.__encrypt(
            client,
            stanza,
            recipient_bare_jid,
            stanza.getAttribute("type", "unknown") == C.MESS_TYPE_GROUPCHAT
        )

        # Add a store hint if this is a message stanza
        self.__xep_0334.add_hint_elements(stanza, [ "store" ])

        # Let the flow continue.
        return True

    async def __encrypt(
        self,
        client: SatXMPPClient,
        stanza: domish.Element,
        recipient_jid: jid.JID,
        is_muc_message: bool
    ) -> None:
        """
        Move the children of the stanza into a signcrypt payload, encrypt that payload
        and attach the resulting ``<openpgp/>`` element to the stanza.

        @param client: The client.
        @param stanza: The stanza, which is modified by this call.
        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost")
            JID but doesn't have to.
        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
        @raise InternalError: if a MUC message is to be encrypted but plugin XEP-0045
            is not available, or if the participants of the MUC can't be listed.
        @raise Exception: any exception raised while building the openpgp element is
            re-raised after it was logged and fed back to the user.

        @warning: The calling code MUST take care of adding the store message
            processing hint to the stanza if applicable! This can be done before or
            after this call, the order doesn't matter.
        """

        recipient_bare_jids: Set[jid.JID]
        feedback_jid: jid.JID

        if is_muc_message:
            if self.__xep_0045 is None:
                raise exceptions.InternalError(
                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
                    " available."
                )

            room_jid = feedback_jid = recipient_jid.userhostJID()

            recipient_bare_jids = self.__get_joined_muc_users(
                client,
                self.__xep_0045,
                room_jid
            )
        else:
            recipient_bare_jids = { recipient_jid.userhostJID() }
            feedback_jid = recipient_jid.userhostJID()

        log.debug(
            f"Intercepting message that is to be encrypted by {NS_OX} for"
            f" {recipient_bare_jids}"
        )

        signcrypt_elt, payload_elt = \
            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)

        # Move elements from the stanza to the content element.
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(stanza.elements()):
            if child.name == "openpgp" and child.uri == NS_OX:
                log.debug("not re-encrypting encrypted OX element")
                continue

            # Remove the child from the stanza
            stanza.children.remove(child)

            # A namespace of ``None`` can be used on domish elements to inherit the
            # namespace from the parent. When moving elements from the stanza root to
            # the content element, however, we don't want elements to inherit the
            # namespace of the content element. Thus, check for elements with ``None``
            # for their namespace and set the namespace to jabber:client, which is the
            # namespace of the parent element.
            if child.uri is None:
                child.uri = C.NS_CLIENT
                child.defaultUri = C.NS_CLIENT

            # Add the child with corrected namespaces to the content element
            payload_elt.addChild(child)

        try:
            openpgp_elt = await self.__xep_0373.build_openpgp_element(
                client,
                signcrypt_elt,
                recipient_bare_jids
            )
        except Exception as e:
            # Translate the template first and fill in the values afterwards, so that
            # the translation lookup is done on the constant message.
            msg = _(  # pylint: disable=consider-using-f-string
                "Can't encrypt message for {entities}: {reason}"
            ).format(
                entities=', '.join(
                    bare_jid.userhost() for bare_jid in recipient_bare_jids
                ),
                reason=e
            )
            log.warning(msg)
            client.feedback(feedback_jid, msg, {
                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
            })
            # Re-raise so that the plaintext stanza is not sent
            raise

        stanza.addChild(openpgp_elt)