view libervia/backend/plugins/plugin_xep_0424.py @ 4184:50c919dfe61b

plugin XEP-0050: small code quality improvements + add `C.ENTITY_ADMINS` magic key to allow administrators profiles
author Goffi <goffi@goffi.org>
date Sun, 10 Dec 2023 15:08:08 +0100
parents a1f7040b5a15
children 7eda7cb8a15c
line wrap: on
line source

#!/usr/bin/env python3

# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
from typing import Any, Dict

from sqlalchemy.orm.attributes import flag_modified
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import MessageData, SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.sqla_mapping import History
from libervia.backend.tools.common import data_format

log = getLogger(__name__)


# Plugin metadata, keyed by the ``PI_*`` constants (presumably consumed by the host's
# plugin loader -- confirm against the loader code).
PLUGIN_INFO = {
    C.PI_NAME: "Message Retraction",
    C.PI_IMPORT_NAME: "XEP-0424",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0424"],
    C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0428"],
    C.PI_MAIN: "XEP_0424",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Implementation Message Retraction"),
}

# XML namespace of the <retract/> element, also advertised through service discovery.
NS_MESSAGE_RETRACT = "urn:xmpp:message-retract:1"

# Individual boolean parameter (disabled by default) telling whether the content of
# a retracted message is kept in the "old_versions" of the history entry.
CATEGORY = "Privacy"
NAME = "retract_history"
LABEL = D_("Keep History of Retracted Messages")
PARAMS = """
    <params>
    <individual>
    <category name="{category_name}">
        <param name="{name}" label="{label}" type="bool" value="false" />
    </category>
    </individual>
    </params>
    """.format(
    category_name=CATEGORY,
    name=NAME,
    label=_(LABEL),
)


class XEP_0424:
    """Message retraction plugin.

    Sends retraction requests for stored messages and handles incoming ``<retract/>``
    elements by marking the matching history entry as retracted.
    """

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.memory.update_params(PARAMS)
        # XEP-0334 plugin, used to add processing hints to outgoing retractions
        self._h = host.plugins["XEP-0334"]
        host.register_namespace("message-retract", NS_MESSAGE_RETRACT)
        host.trigger.add("message_received", self._message_received_trigger, 100)
        host.bridge.add_method(
            "message_retract",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._retract,
            async_=True,
        )

    def get_handler(self, __):
        """Return the protocol handler advertising the retraction feature"""
        return XEP_0424_handler()

    def _retract(self, message_id: str, profile: str) -> defer.Deferred:
        """Bridge entry point for [retract]

        @param message_id: Libervia internal ID of the message to retract
        @param profile: profile used to retrieve the client
        @return: Deferred wrapping the [retract] coroutine
        """
        client = self.host.get_client(profile)
        return defer.ensureDeferred(
            self.retract(client, message_id)
        )

    def retract_by_origin_id(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        origin_id: str
    ) -> None:
        """Send a message retraction using origin-id

        [retract] should be preferred: internal ID should be used as it is independent
        of XEPs changes. However, in some case messages may not be stored in database
        (notably for some components), and then this method can be used
        @param peer_jid: entity to which the retraction is sent
        @param origin_id: origin-id as specified in XEP-0359
        """
        message_elt = domish.Element((None, "message"))
        message_elt["from"] = client.jid.full()
        message_elt["to"] = peer_jid.full()
        retract_elt = message_elt.addElement((NS_MESSAGE_RETRACT, "retract"))
        retract_elt["id"] = origin_id
        # fallback body for clients which don't handle retraction
        self.host.plugins["XEP-0428"].add_fallback_elt(
            message_elt,
            "[A message retraction has been requested, but your client doesn't support "
            "it]"
        )
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)

    async def retract_by_history(
        self,
        client: SatXMPPEntity,
        history: History
    ) -> None:
        """Send a message retraction using History instance

        This method is to use instead of [retract] when the history instance is already
        retrieved. Note that the instance must have messages and subjects loaded
        @param history: history instance of the message to retract
        @raise exceptions.FeatureNotFound: the message has no origin-id
        """
        try:
            origin_id = history.origin_id
        except KeyError:
            # kept for backward compatibility, attribute access is not expected to
            # raise KeyError
            origin_id = None
        # History.origin_id is used as a database column elsewhere in this class, so a
        # missing origin-id presumably shows up as None rather than as an exception.
        if not origin_id:
            raise exceptions.FeatureNotFound(
                "message to retract doesn't have the necessary origin-id, the sending "
                "client is probably not supporting message retraction."
            )

        is_group_chat = history.type == C.MESS_TYPE_GROUPCHAT
        if is_group_chat:
            peer_jid = history.dest_jid
        else:
            peer_jid = jid.JID(history.dest)
        self.retract_by_origin_id(client, peer_jid, origin_id)
        if not is_group_chat:
            # The history is updated right away only for non group chat messages. For
            # group chat, retraction will happen when <retract> message will be
            # received in the chat.
            await self.retract_db_history(client, history)

    async def retract(
        self,
        client: SatXMPPEntity,
        message_id: str,
    ) -> None:
        """Send a message retraction request

        @param message_id: ID of the message
            This ID is the Libervia internal ID of the message. It will be retrieved
            from database to find the ID used by XMPP (i.e. XEP-0359's "origin ID").
            If the message is not found in database, an exception will be raised
        @raise ValueError: message_id is empty
        @raise exceptions.NotFound: no message with this ID is found in database
        """
        if not message_id:
            raise ValueError("message_id can't be empty")
        history = await self.host.memory.storage.get(
            client, History, History.uid, message_id,
            joined_loads=[History.messages, History.subjects, History.thread]
        )
        if history is None:
            raise exceptions.NotFound(
                f"message to retract not found in database ({message_id})"
            )
        await self.retract_by_history(client, history)

    async def retract_db_history(self, client, history: History) -> None:
        """Mark an history instance in database as retracted

        Messages and subjects are cleared, "editions" and "attachments" are removed
        from extra data, and frontends are notified through the ``message_update``
        bridge signal.
        @param history: history instance
            "messages" and "subjects" must be loaded too
        """
        # FIXME: should we keep history? This is useful to check why a message has been
        #   retracted, but it may be bad if the user think it's really deleted
        # "extra" is modified in place below, so it is explicitly flagged as modified
        flag_modified(history, "extra")
        keep_history = await self.host.memory.param_get_a_async(
            NAME, CATEGORY, profile_key=client.profile
        )
        # the retraction time is always recorded, the content only if requested
        old_version: Dict[str, Any] = {
            "timestamp": time.time()
        }
        if keep_history:
            old_version.update({
                "messages": [m.serialise() for m in history.messages],
                "subjects": [s.serialise() for s in history.subjects],
            })

        history.extra.setdefault("old_versions", []).append(old_version)

        history.messages.clear()
        history.subjects.clear()
        history.extra["retracted"] = True
        # we remove editions history to keep no trace of old messages
        if "editions" in history.extra:
            del history.extra["editions"]
        if "attachments" in history.extra:
            del history.extra["attachments"]
        await self.host.memory.storage.add(history)

        retract_data = MessageData(history.serialise())
        self.host.bridge.message_update(
            history.uid,
            C.MESS_UPDATE_RETRACT,
            data_format.serialise(retract_data),
            client.profile,
        )

    async def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Handle a received message holding a <retract/> element

        @param message_elt: received <message/> stanza
        @param post_treat: unused here
        @return: True if the message has no <retract/> element, False in all other
            cases (presumably stopping the normal message workflow -- the exact
            semantics are defined by the host trigger system)
        """
        retract_elt = next(message_elt.elements(NS_MESSAGE_RETRACT, "retract"), None)
        if retract_elt is None:
            return True
        # Only the attribute access is guarded, so that a KeyError raised while
        # querying the storage is not misreported as a missing "id" and swallowed.
        try:
            retracted_id = retract_elt["id"]
        except KeyError:
            log.warning(f"invalid retract element, missing id: {retract_elt.toXml()}")
            return False
        # group chat messages are looked up by stanza ID, other ones by origin ID
        if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT:
            col_id = History.stanza_id
        else:
            col_id = History.origin_id
        history = await self.host.memory.storage.get(
            client, History, col_id, retracted_id,
            joined_loads=[History.messages, History.subjects, History.thread]
        )
        from_jid = jid.JID(message_elt["from"])

        # only the bare JID which sent the message is allowed to retract it
        if (
            history is not None
            and history.source_jid.userhostJID() != from_jid.userhostJID()
        ):
            log.warning(
                f"Received message retraction from {from_jid.full()}, but the message to "
                f"retract is from {history.source_jid.full()}. This maybe a hack "
                f"attempt.\n{message_elt.toXml()}"
            )
            return False

        if not await self.host.trigger.async_point(
            "XEP-0424_retract_received", client, message_elt, retract_elt, history
        ):
            return False

        if history is None:
            # we check history after the trigger because we may be in a component which
            # doesn't store messages in database.
            log.warning(
                f"No message found with given origin-id: {message_elt.toXml()}"
            )
            return False
        log.info(f"[{client.profile}] retracting message {history.uid!r}")
        await self.retract_db_history(client, history)
        return False


@implementer(disco.IDisco)
class XEP_0424_handler(xmlstream.XMPPHandler):
    """Service discovery handler advertising the message retraction namespace"""

    def getDiscoInfo(self, __, target, nodeIdentifier=""):
        """Return the disco features of this plugin

        The retraction namespace is returned whatever the arguments are.
        """
        retract_feature = disco.DiscoFeature(NS_MESSAGE_RETRACT)
        return [retract_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items of this plugin: there are none"""
        return list()