Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0461.py @ 4371:ed683d56b64c default tip
test (XEP-0461): some tests for XEP-0461:
rel 457
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 May 2025 00:34:01 +0200 |
parents | 0eaa50f21efb |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Message Replies (XEP-0461)
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING, Any, Final, Self, cast

from pydantic import BaseModel, Field
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend import G
from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.sqla import Storage
from libervia.backend.memory.sqla_mapping import History
from libervia.backend.models.core import MessageData
from libervia.backend.models.types import JIDType
from libervia.backend.tools.utils import ensure_deferred

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Message Replies",
    C.PI_IMPORT_NAME: "XEP-0461",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0461",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Message replies support."),
}

NS_REPLY = "urn:xmpp:reply:0"
host: "LiberviaBackend | None" = None


class ReplyTo(BaseModel):
    """Reference to the message being replied to.

    This model maps the ``<reply>`` element (namespace ``urn:xmpp:reply:0``): it can be
    parsed from such an element with [from_element] and serialised back with
    [to_element].
    """

    # Value of the "to" attribute of the <reply> element, if any.
    to: JIDType | None = None
    # ID of the message replied to; its nature depends on ``internal_uid``.
    id: str
    internal_uid: bool = Field(
        default=True,
        description="True if the id is a message UID, i.e. Libervia internal ID of "
        "message, otherwise it's an XMPP stanza ID or origin ID as specified in the XEP",
    )

    async def ensure_xmpp_id(self, client: SatXMPPEntity) -> None:
        """Convert ``id`` from an internal message UID to an XMPP ID, if necessary.

        Nothing is done if ``internal_uid`` is already False. Otherwise the message is
        looked up in history: the stanza ID is used for groupchat messages, the origin
        ID for any other message type. On success ``internal_uid`` is set to False.

        @param client: Client session used for the history lookup.
        @raise exceptions.NotFound: No history entry has been found for this UID.
        @raise exceptions.InternalError: The retrieved history entry doesn't match the
            requested UID.
        @raise exceptions.DataError: The history entry has no usable XMPP ID.
        """
        if not self.internal_uid:
            return

        history = await G.storage.get(
            client,
            History,
            History.uid,
            self.id,
        )
        # Guard in case the storage lookup yields nothing: fail with an explicit error
        # rather than with an AttributeError on ``None``.
        if history is None:
            raise exceptions.NotFound(
                f"No message found in history with UID {self.id!r}."
            )

        if history.uid != self.id:
            raise exceptions.InternalError(
                f"Inconsistency between given history UID {history.uid!r} and ReplyTo "
                f"id ({self.id!r})."
            )

        if history.type == C.MESS_TYPE_GROUPCHAT:
            xmpp_id = history.stanza_id
        else:
            xmpp_id = history.origin_id

        # The instance is only modified once we know that we have a valid ID, so it is
        # never left with an empty "id".
        if not xmpp_id:
            raise exceptions.DataError(
                f"No XMPP ID available for message with UID {self.id!r}."
            )
        self.id = xmpp_id
        self.internal_uid = False

    @classmethod
    def from_element(cls, reply_elt: domish.Element) -> Self:
        """Create a ReplyTo instance from a <reply> element or its parent.

        @param reply_elt: The <reply> element or a parent element.
        @return: ReplyTo instance.
        @raise exceptions.NotFound: If the <reply> element is not found.
        @raise exceptions.DataError: If the <reply> element has no "id" attribute.
        """
        if reply_elt.uri != NS_REPLY or reply_elt.name != "reply":
            # We have been given a parent element, the <reply> element is expected to
            # be one of its direct children.
            child_reply_elt = next(reply_elt.elements(NS_REPLY, "reply"), None)
            if child_reply_elt is None:
                raise exceptions.NotFound("<reply> element not found")
            reply_elt = child_reply_elt

        try:
            message_id = reply_elt["id"]
        except KeyError as e:
            raise exceptions.DataError('Missing "id" attribute.') from e

        # The ID found in the element is an XMPP ID, hence ``internal_uid=False``.
        return cls(to=reply_elt.getAttribute("to"), id=message_id, internal_uid=False)

    def to_element(self) -> domish.Element:
        """Build the <reply> element from this instance's data.

        @return: <reply> element.
        @raise exceptions.DataError: ``id`` is still an internal UID.
        """
        if self.internal_uid:
            raise exceptions.DataError(
                '"id" must be converted to XMPP id before calling "to_element". Please '
                'use "ensure_xmpp_id" first.'
            )
        reply_elt = domish.Element((NS_REPLY, "reply"), attribs={"id": self.id})
        if self.to is not None:
            reply_elt["to"] = self.to.full()
        return reply_elt


class XEP_0461:
    """Implementation of XEP-0461 Message Replies."""

    namespace: Final[str] = NS_REPLY

    def __init__(self, host: Any):
        """Register the reply namespace and the message triggers.

        @param host: Backend instance, used to register namespace and triggers.
        """
        log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")
        self.host = host
        host.register_namespace("reply", NS_REPLY)
        host.trigger.add("message_received", self._message_received_trigger)
        host.trigger.add("sendMessage", self._sendMessage_trigger)

    def get_handler(self, client):
        """Return the handler advertising the feature through service discovery."""
        return XEP_0461_handler()

    async def _parse_reply(
        self, client: SatXMPPEntity, message_elt: domish.Element, mess_data: MessageData
    ) -> MessageData:
        """Parse the <reply> element of a received message, if any.

        When a <reply> element is found, its data are stored in
        ``mess_data["extra"]["reply"]``. If the message also has a thread and the
        replied message is found in history, the thread is added to this history entry
        when it has none, and the reply ID is replaced by the internal message UID.

        @param client: Client session which received the message.
        @param message_elt: Received <message> element.
        @param mess_data: Parsed message data, updated in place.
        @return: The message data.
        """
        try:
            reply_to = ReplyTo.from_element(message_elt)
        except exceptions.NotFound:
            # This message is not a reply.
            return mess_data

        try:
            thread_id = mess_data["extra"]["thread"]
        except KeyError:
            # No thread to associate: the XMPP ID of the <reply> element is kept as is.
            pass
        else:
            message_type = message_elt.getAttribute("type")
            storage = G.storage
            history = await storage.get_history_from_xmpp_id(
                client, reply_to.id, message_type
            )
            if history is None:
                log.warning(f"Received a <reply> to an unknown message: {reply_to!r}")
            else:
                if not history.thread:
                    await storage.add_thread(
                        history.uid, thread_id, None, is_retroactive=True
                    )
                # We use the internal UID to make frontends' life easier.
                reply_to.id = history.uid
                reply_to.internal_uid = True

        # FIXME: We use mode=json to have a serialisable data in storage. When
        #   Pydantic models are fully integrated with MessageData and storage, the
        #   ReplyTo model should be used directly here instead.
        mess_data["extra"]["reply"] = reply_to.model_dump(mode="json")
        return mess_data

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """Schedule reply parsing on the post-treatment chain of a received message.

        @param client: Client session which received the message.
        @param message_elt: Received <message> element.
        @param post_treat: Deferred chain to which [_parse_reply] is added.
        @return: Always True.
        """
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self._parse_reply(client, message_elt, mess_data)
            )
        )
        return True

    @ensure_deferred
    async def _add_reply_to_elt(
        self, mess_data: MessageData, client: SatXMPPEntity
    ) -> MessageData:
        """Add the <reply> element to the <message> element about to be sent.

        @param mess_data: Message data, with the <message> element in ``"xml"`` and
            the reply data in ``extra["reply"]``.
        @param client: Client session sending the message.
        @return: The message data.
        """
        try:
            # FIXME: Once MessageData is fully moved to Pydantic, we should have directly
            #   the ReplyTo instance here.
            reply_to = ReplyTo(**mess_data["extra"]["reply"])
        except KeyError:
            log.error('"_add_reply_to_elt" should not be called when there is no reply.')
        else:
            # The <reply> element needs an XMPP ID, not our internal UID.
            await reply_to.ensure_xmpp_id(client)
            message_elt: domish.Element = mess_data["xml"]
            message_elt.addChild(reply_to.to_element())

        return mess_data

    def _sendMessage_trigger(
        self,
        client: SatXMPPEntity,
        mess_data: MessageData,
        pre_xml_treatments: defer.Deferred,
        post_xml_treatments: defer.Deferred,
    ) -> bool:
        """Handle reply data of an outgoing message.

        If ``extra["reply"]`` is set, [_add_reply_to_elt] is added to the post XML
        treatments chain. If the message has no thread yet, the ID of the replied
        message is used as thread ID.

        @param client: Client session sending the message.
        @param mess_data: Data of the message being sent, updated in place.
        @param pre_xml_treatments: Unused here.
        @param post_xml_treatments: Deferred chain run once the <message> element has
            been built.
        @return: Always True.
        """
        try:
            extra = mess_data["extra"]
            reply_to = ReplyTo(**extra["reply"])
        except KeyError:
            # This message is not a reply.
            pass
        else:
            if "thread" not in extra:
                # FIXME: once MessageData is fully moved to Pydantic, ReplyTo should be
                #   used directly here instead of a dump.
                extra["reply"] = reply_to.model_dump(mode="json")
                # We use parent message ID as thread ID.
                extra["thread"] = reply_to.id

            post_xml_treatments.addCallback(self._add_reply_to_elt, client)
        return True


@implementer(iwokkel.IDisco)
class XEP_0461_handler(XMPPHandler):
    """Service discovery handler advertising the message replies namespace."""

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Return the message replies feature."""
        return [disco.DiscoFeature(NS_REPLY)]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItem]:
        """Return an empty list, as this plugin exposes no disco item."""
        return []