view libervia/backend/plugins/plugin_xep_0461.py @ 4370:0eaa50f21efb

plugin XEP-0461: Message Replies implementation: Implement message replies. Thread ID are always added when a reply is initiated from Libervia, so a thread can continue the reply. rel 457
author Goffi <goffi@goffi.org>
date Tue, 06 May 2025 00:34:01 +0200
parents
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Extended Channel Search (XEP-0461)
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING, Any, Final, Self, cast
from pydantic import BaseModel, Field
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer
from libervia.backend import G
from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.sqla import Storage
from libervia.backend.memory.sqla_mapping import History
from libervia.backend.models.core import MessageData
from libervia.backend.models.types import JIDType
from libervia.backend.tools.utils import ensure_deferred

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Message Replies",
    C.PI_IMPORT_NAME: "XEP-0461",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0461",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Message replies support."),
}

NS_REPLY = "urn:xmpp:reply:0"
host: "LiberviaBackend | None" = None


class ReplyTo(BaseModel):
    to: JIDType | None = None
    id: str
    internal_uid: bool = Field(
        default=True,
        description="True if the id is a message UID, i.e. Libervia internal ID of "
        "message, otherwise it's an XMPP stanza ID or origin ID as specified in the XEP"
    )

    async def ensure_xmpp_id(self, client: SatXMPPEntity) -> None:
        if not self.internal_uid:
            return
        history = await G.storage.get(
            client,
            History,
            History.uid,
            self.id,
        )

        if history.uid != self.id:
            raise exceptions.InternalError(
                f"Inconsistency between given history UID {history.uid!r} and ReplyTo "
                f"id ({self.id!r})."
            )
        if history.type == C.MESS_TYPE_GROUPCHAT:
            self.id = history.stanza_id
        else:
            self.id = history.origin_id
        assert self.id
        self.internal_uid = False

    @classmethod
    def from_element(cls, reply_elt: domish.Element) -> Self:
        """Create a ReplyTo instance from a <reply> element or its parent.

        @param reply_elt: The <reply> element or a parent element.
        @return: ReplyTo instance.
        @raise exceptions.NotFound: If the <reply> element is not found.
        """
        if reply_elt.uri != NS_REPLY or reply_elt.name != "reply":
            child_file_metadata_elt = next(reply_elt.elements(NS_REPLY, "reply"), None)
            if child_file_metadata_elt is None:
                raise exceptions.NotFound("<reply> element not found")
            else:
                reply_elt = child_file_metadata_elt

        try:
            message_id = reply_elt["id"]
        except KeyError:
            raise exceptions.DataError('Missing "id" attribute.')

        return cls(to=reply_elt.getAttribute("to"), id=message_id, internal_uid=False)

    def to_element(self) -> domish.Element:
        """Build the <reply> element from this instance's data.

        @return: <reply> element.
        """
        if self.internal_uid:
            raise exceptions.DataError(
                '"id" must be converted to XMPP id before calling "to_element". Please '
                '"use ensure_xmpp_id" first.')
        reply_elt = domish.Element((NS_REPLY, "reply"), attribs={"id": self.id})
        if self.to is not None:
            reply_elt["to"] = self.to.full()
        return reply_elt


class XEP_0461:
    """Implementation of XEP-0461 Message Replies."""

    namespace: Final[str] = NS_REPLY

    def __init__(self, host: Any):
        log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")
        self.host = host
        host.register_namespace("reply", NS_REPLY)
        host.trigger.add("message_received", self._message_received_trigger)
        host.trigger.add("sendMessage", self._sendMessage_trigger)

    def get_handler(self, client):
        return XEP_0461_handler()

    async def _parse_reply(
        self, client: SatXMPPEntity, message_elt: domish.Element, mess_data: MessageData
    ) -> MessageData:
        try:
            reply_to = ReplyTo.from_element(message_elt)
        except exceptions.NotFound:
            pass
        else:
            try:
                thread_id = mess_data["extra"]["thread"]
            except KeyError:
                pass
            else:
                message_type = message_elt.getAttribute("type")
                storage = G.storage
                history = await storage.get_history_from_xmpp_id(
                    client, reply_to.id, message_type
                )
                if history is None:
                    log.warning(f"Received a <reply> to an unknown message: {reply_to!r}")
                else:
                    if not history.thread:
                        await storage.add_thread(
                            history.uid, thread_id, None, is_retroactive=True
                        )
                    # We can to use internal UID to make frontends life easier.
                    reply_to.id = history.uid
                    reply_to.internal_uid = True
            # FIXME: We use mode=json to have a serialisable data in storage. When
            #  Pydantic models are fully integrated with MessageData and storage, the
            #  ReplyTo model should be used directly here instead.
            mess_data["extra"]["reply"] = reply_to.model_dump(mode="json")
        return mess_data

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self._parse_reply(client, message_elt, mess_data)
            )
        )
        return True

    @ensure_deferred
    async def _add_reply_to_elt(self, mess_data: MessageData, client: SatXMPPEntity) -> MessageData:
        try:
            # FIXME: Once MessageData is fully moved to Pydantic, we should have directly
            #   the ReplyTo instance here.
            reply_to = ReplyTo(**mess_data["extra"]["reply"])
        except KeyError:
            log.error('"_add_reply_to_elt" should not be called when there is no reply.')
        else:
            await reply_to.ensure_xmpp_id(client)
            message_elt: domish.Element = mess_data["xml"]
            message_elt.addChild(reply_to.to_element())

        return mess_data

    def _sendMessage_trigger(
        self,
        client: SatXMPPEntity,
        mess_data: MessageData,
        pre_xml_treatments: defer.Deferred,
        post_xml_treatments: defer.Deferred,
    ) -> bool:
        try:
            extra = mess_data["extra"]
            reply_to = ReplyTo(**extra["reply"])
        except KeyError:
            pass
        else:
            if "thread" not in extra:
                # FIXME: once MessageData is fully moved to Pydantic, ReplyTo should be
                #   used directly here instead of a dump.
                extra["reply"] = reply_to.model_dump(mode="json")
                # We use parent message ID as thread ID.
                extra["thread"] = reply_to.id

            post_xml_treatments.addCallback(self._add_reply_to_elt, client)
        return True


@implementer(iwokkel.IDisco)
class XEP_0461_handler(XMPPHandler):

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        return [disco.DiscoFeature(NS_REPLY)]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItems]:
        return []