# HG changeset patch # User Goffi # Date 1700662204 -3600 # Node ID 04cdcb3fd7136d59805b9d1777a75eab68cdccfe # Parent 2729d424dee78c2ff4d7efb0ce5250dea008cd27 plugin XEP-0444: complete implementation: "Message Reactions" implementation is now working and updating history. The `message_reactions_set` bridge method can be used by frontend to `replace`, `add`, `remove` or `toggle` reactions. History is properly updated, and signal are sent when necessary. diff -r 2729d424dee7 -r 04cdcb3fd713 libervia/backend/plugins/plugin_xep_0444.py --- a/libervia/backend/plugins/plugin_xep_0444.py Wed Nov 22 15:05:41 2023 +0100 +++ b/libervia/backend/plugins/plugin_xep_0444.py Wed Nov 22 15:10:04 2023 +0100 @@ -16,21 +16,25 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import List, Iterable -from copy import deepcopy +from typing import Iterable, List +from twisted.internet import defer from twisted.words.protocols.jabber import jid, xmlstream from twisted.words.xish import domish -from twisted.internet import defer from wokkel import disco, iwokkel from zope.interface import implementer +import emoji +from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger -from libervia.backend.core import exceptions -from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.memory.sqla_mapping import History +from libervia.backend.models.core import MessageReactionData +from libervia.backend.tools.utils import aio +from libervia.backend.memory.sqla import select +from sqlalchemy.orm.attributes import flag_modified log = getLogger(__name__) @@ -40,7 +44,7 @@ C.PI_TYPE: C.PLUG_TYPE_XEP, C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0444"], - C.PI_DEPENDENCIES: ["XEP-0334"], + C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0334"], C.PI_MAIN: "XEP_0444", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Message Reactions implementation"""), @@ -50,16 +54,18 @@ class XEP_0444: - + # TODO: implement and use occupant-ID (XEP-0421), and check sender (see + # https://xmpp.org/extensions/xep-0444.html#acceptable-reactions). def __init__(self, host): log.info(_("Message Reactions initialization")) host.register_namespace("reactions", NS_REACTIONS) self.host = host + self._m = host.plugins["XEP-0045"] self._h = host.plugins["XEP-0334"] host.bridge.add_method( "message_reactions_set", ".plugin", - in_sign="ssas", + in_sign="sasss", out_sign="", method=self._reactions_set, async_=True, @@ -69,26 +75,111 @@ def get_handler(self, client): return XEP_0444_Handler() + @aio + async def get_history_from_reaction_id( + self, client: SatXMPPEntity, message_elt: domish.Element, reaction_id: str + ) -> History | None: + """Retrieves history rows that match a specific reaction_id. + + The retrieval criteria vary based on the message type, according to XEP-0444. + + @param message_elt: The message element, used to determine the type of message. + @param reaction_id: The reaction ID to match in the history. + """ + profile_id = self.host.memory.storage.profiles[client.profile] + async with self.host.memory.storage.session() as session: + if message_elt.type == C.MESS_TYPE_GROUPCHAT: + query = select(History).where( + History.profile_id == profile_id, History.stanza_id == reaction_id + ) + else: + query = select(History).where( + History.profile_id == profile_id, History.origin_id == reaction_id + ) + + result = await session.execute(query) + history = result.scalars().first() + + return history + async def _message_received_trigger( self, client: SatXMPPEntity, message_elt: domish.Element, - post_treat: defer.Deferred + post_treat: defer.Deferred, ) -> bool: + reactions_elt = next(message_elt.elements(NS_REACTIONS, "reactions"), None) + if reactions_elt is not None: + reaction_id = reactions_elt.getAttribute("id") + history = await self.get_history_from_reaction_id( + client, message_elt, reaction_id + ) + if history is None: + log.warning( + f"Can't find matching message for reaction: {reactions_elt.toXml()}" + ) + else: + if not reaction_id: + log.warning(f"Invalid reaction: {reactions_elt.toXml()}") + return False + from_jid = jid.JID(message_elt["from"]) + reactions = set() + for reaction_elt in reactions_elt.elements("reaction"): + reaction = str(reaction_elt) + if not emoji.is_emoji(reaction): + log.warning(f"ignoring invalide reaction: {reaction_elt.toXml()}") + continue + reactions.add(reaction) + await self.update_history_reactions(client, history, from_jid, reactions) + + return False + return True - def _reactions_set(self, message_id: str, profile: str, reactions: List[str]) -> None: + def _reactions_set( + self, + message_id: str, + reactions: List[str], + update_type: str = "replace", + profile: str = C.PROF_KEY_NONE, + ) -> defer.Deferred[None]: client = self.host.get_client(profile) return defer.ensureDeferred( - self.set_reactions(client, message_id) + self.set_reactions(client, message_id, reactions, update_type) ) + async def get_history_from_uid( + self, client: SatXMPPEntity, message_id: str + ) -> History: + """Retrieve the chat history associated with a given message ID. + + @param message_id: The Libervia specific identifier of the message + + @return: An instance of History containing the chat history, messages, and + subjects related to the specified message ID. + + @raises exceptions.NotFound: The history corresponding to the given message ID is + not found in the database. + """ + history = await self.host.memory.storage.get( + client, + History, + History.uid, + message_id, + ) + if history is None: + raise exceptions.NotFound( + f"message to retract not found in database ({message_id})" + ) + return history + def send_reactions( self, client: SatXMPPEntity, dest_jid: jid.JID, + message_type: str, message_id: str, - reactions: Iterable[str] + reactions: Iterable[str], ) -> None: """Send the stanza containing the reactions @@ -99,6 +190,7 @@ message_elt = domish.Element((None, "message")) message_elt["from"] = client.jid.full() message_elt["to"] = dest_jid.full() + message_elt["type"] = message_type reactions_elt = message_elt.addElement((NS_REACTIONS, "reactions")) reactions_elt["id"] = message_id for r in set(reactions): @@ -106,64 +198,163 @@ self._h.add_hint_elements(message_elt, [self._h.HINT_STORE]) client.send(message_elt) - async def add_reactions_to_history( + def convert_to_replace_update( self, + current_reactions: set[str], + reactions_new: Iterable[str], + update_type: str, + ) -> set[str]: + """Convert the given update to a replace update. + + @param current_reactions: reaction of reacting JID before update + @param reactions_new: New reactions to be updated. + @param update_type: Original type of update (add, remove, toggle, replace). + @return: reactions for a replace operation. + + @raise ValueError: invalid ``update_type``. + """ + new_reactions_set = set(reactions_new) + + if update_type == "replace": + return new_reactions_set + + if update_type == "add": + replace_reactions = current_reactions | new_reactions_set + elif update_type == "remove": + replace_reactions = current_reactions - new_reactions_set + elif update_type == "toggle": + replace_reactions = current_reactions ^ new_reactions_set + else: + raise ValueError(f"Invalid update type: {update_type!r}") + + return replace_reactions + + async def update_history_reactions( + self, + client: SatXMPPEntity, history: History, - from_jid: jid.JID, - reactions: Iterable[str] - ) -> None: - """Update History instance with given reactions + reacting_jid: jid.JID, + reactions_new: Iterable[str], + update_type: str = "replace", + store: bool = True, + ) -> set[str]: + """Update reactions in History instance and optionally store and signal it. - @param history: storage History instance - will be updated in DB - "summary" field of history.extra["reactions"] will also be updated - @param from_jid: author of the reactions - @param reactions: list of reactions + @param history: storage History instance to be updated + @param reacting_jid: author of the reactions + @param reactions_new: new reactions to update + @param update_type: Original type of update (add, remove, toggle, replace). + @param store: if True, update history in storage, and send a `message_update` + signal with the new reactions. + @return: set of reactions for this JID for a "replace" update """ - history.extra = deepcopy(history.extra) if history.extra else {} - h_reactions = history.extra.setdefault("reactions", {}) - # reactions mapped by originating JID - by_jid = h_reactions.setdefault("by_jid", {}) - # reactions are sorted to in summary to keep a consistent order - h_reactions["by_jid"][from_jid.userhost()] = sorted(list(set(reactions))) - h_reactions["summary"] = sorted(list(set().union(*by_jid.values()))) - await self.host.memory.storage.session_add(history) + # FIXME: handle race conditions + if history.type == C.MESS_TYPE_GROUPCHAT: + entity_jid_s = reacting_jid.full() + else: + entity_jid_s = reacting_jid.userhost() + if history.extra is None: + history.extra = {} + extra = history.extra + reactions = extra.get("reactions", {}) + + current_reactions = { + reaction for reaction, jids in reactions.items() if entity_jid_s in jids + } + reactions_replace_set = self.convert_to_replace_update( + current_reactions, reactions_new, update_type + ) + + for reaction in current_reactions - reactions_replace_set: + reaction_jids = reactions[reaction] + reaction_jids.remove(entity_jid_s) + if not reaction_jids: + del reactions[reaction] + + for reaction in reactions_replace_set - current_reactions: + reactions.setdefault(reaction, []).append(entity_jid_s) + + # we want to have a constant order in reactions + extra["reactions"] = dict(sorted(reactions.items())) + + if store: + # FIXME: this is not a clean way to flag "extra" as modified, but a deepcopy + # is for some reason not working here. + flag_modified(history, "extra") + await self.host.memory.storage.add(history) + # we send the signal for frontends update + data = MessageReactionData(reactions=extra["reactions"]) + self.host.bridge.message_update( + history.uid, + C.MESS_UPDATE_REACTION, + data.model_dump_json(), + client.profile, + ) + + return reactions_replace_set async def set_reactions( self, client: SatXMPPEntity, message_id: str, - reactions: Iterable[str] + reactions: Iterable[str], + update_type: str = "replace", ) -> None: """Set and replace reactions to a message @param message_id: internal ID of the message @param rections: lsit of emojis to used to react to the message use empty list to remove all reactions + @param update_type: how to use the reaction to make the update, can be "replace", + "add", "remove" or "toggle". """ if not message_id: raise ValueError("message_id can't be empty") - history = await self.host.memory.storage.get( - client, History, History.uid, message_id, - joined_loads=[History.messages, History.subjects] - ) - if history is None: - raise exceptions.NotFound( - f"message to retract not found in database ({message_id})" - ) - mess_id = history.origin_id or history.stanza_id - if not mess_id: + + history = await self.get_history_from_uid(client, message_id) + if history.origin_id is not None: + mess_id = history.origin_id + else: + mess_id = history.stanza_id + + if mess_id is None: raise exceptions.DataError( "target message has neither origin-id nor message-id, we can't send a " "reaction" ) - await self.add_reactions_to_history(history, client.jid, reactions) - self.send_reactions(client, history.dest_jid, mess_id, reactions) + + if history.source == client.jid.userhost(): + dest_jid = history.dest_jid + else: + dest_jid = history.source_jid + + if history.type == C.MESS_TYPE_GROUPCHAT: + # the reaction is for the chat room itself + dest_jid = dest_jid.userhostJID() + reacting_jid = self._m.get_room_user_jid(client, dest_jid) + # we don't store reactions for MUC, at this will be done when we receive back + # the reaction + store = False + else: + # we use bare JIDs for one2one message, except if the recipient is from a MUC + if self._m.is_room(client, dest_jid): + # this is a private message in a room, we need to use the MUC JID + reacting_jid = self._m.get_room_user_jid(client, dest_jid.userhostJID()) + else: + # this is a classic one2one message, we need the bare JID + reacting_jid = client.jid.userhostJID() + dest_jid = dest_jid.userhostJID() + store = True + + reaction_replace_set = await self.update_history_reactions( + client, history, reacting_jid, reactions, update_type, store + ) + + self.send_reactions(client, dest_jid, history.type, mess_id, reaction_replace_set) @implementer(iwokkel.IDisco) class XEP_0444_Handler(xmlstream.XMPPHandler): - def getDiscoInfo(self, requestor, service, nodeIdentifier=""): return [disco.DiscoFeature(NS_REACTIONS)] diff -r 2729d424dee7 -r 04cdcb3fd713 pyproject.toml --- a/pyproject.toml Wed Nov 22 15:05:41 2023 +0100 +++ b/pyproject.toml Wed Nov 22 15:10:04 2023 +0100 @@ -26,6 +26,7 @@ "babel < 3", "cryptography >= 41.0.1", "dbus-python < 1.3", + "emoji ~= 2.8", "html2text < 2020.2", "jinja2 >= 3.1.2", "langid < 2",