changeset 4157:04cdcb3fd713

plugin XEP-0444: complete implementation: "Message Reactions" implementation is now working and updating history. The `message_reactions_set` bridge method can be used by frontend to `replace`, `add`, `remove` or `toggle` reactions. History is properly updated, and signal are sent when necessary.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 15:10:04 +0100
parents 2729d424dee7
children 83d8d8500bc2
files libervia/backend/plugins/plugin_xep_0444.py pyproject.toml
diffstat 2 files changed, 236 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0444.py	Wed Nov 22 15:05:41 2023 +0100
+++ b/libervia/backend/plugins/plugin_xep_0444.py	Wed Nov 22 15:10:04 2023 +0100
@@ -16,21 +16,25 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import List, Iterable
-from copy import deepcopy
+from typing import Iterable, List
 
+from twisted.internet import defer
 from twisted.words.protocols.jabber import jid, xmlstream
 from twisted.words.xish import domish
-from twisted.internet import defer
 from wokkel import disco, iwokkel
 from zope.interface import implementer
 
+import emoji
+from libervia.backend.core import exceptions
 from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import _
 from libervia.backend.core.log import getLogger
-from libervia.backend.core import exceptions
-from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.memory.sqla_mapping import History
+from libervia.backend.models.core import MessageReactionData
+from libervia.backend.tools.utils import aio
+from libervia.backend.memory.sqla import select
+from sqlalchemy.orm.attributes import flag_modified
 
 log = getLogger(__name__)
 
@@ -40,7 +44,7 @@
     C.PI_TYPE: C.PLUG_TYPE_XEP,
     C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_PROTOCOLS: ["XEP-0444"],
-    C.PI_DEPENDENCIES: ["XEP-0334"],
+    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0334"],
     C.PI_MAIN: "XEP_0444",
     C.PI_HANDLER: "yes",
     C.PI_DESCRIPTION: _("""Message Reactions implementation"""),
@@ -50,16 +54,18 @@
 
 
 class XEP_0444:
-
+    # TODO: implement and use occupant-ID (XEP-0421), and check sender (see
+    #   https://xmpp.org/extensions/xep-0444.html#acceptable-reactions).
     def __init__(self, host):
         log.info(_("Message Reactions initialization"))
         host.register_namespace("reactions", NS_REACTIONS)
         self.host = host
+        self._m = host.plugins["XEP-0045"]
         self._h = host.plugins["XEP-0334"]
         host.bridge.add_method(
             "message_reactions_set",
             ".plugin",
-            in_sign="ssas",
+            in_sign="sasss",
             out_sign="",
             method=self._reactions_set,
             async_=True,
@@ -69,26 +75,111 @@
     def get_handler(self, client):
         return XEP_0444_Handler()
 
+    @aio
+    async def get_history_from_reaction_id(
+        self, client: SatXMPPEntity, message_elt: domish.Element, reaction_id: str
+    ) -> History | None:
+        """Retrieves history rows that match a specific reaction_id.
+
+        The retrieval criteria vary based on the message type, according to XEP-0444.
+
+        @param message_elt: The message element, used to determine the type of message.
+        @param reaction_id: The reaction ID to match in the history.
+        """
+        profile_id = self.host.memory.storage.profiles[client.profile]
+        async with self.host.memory.storage.session() as session:
+            if message_elt.type == C.MESS_TYPE_GROUPCHAT:
+                query = select(History).where(
+                    History.profile_id == profile_id, History.stanza_id == reaction_id
+                )
+            else:
+                query = select(History).where(
+                    History.profile_id == profile_id, History.origin_id == reaction_id
+                )
+
+            result = await session.execute(query)
+            history = result.scalars().first()
+
+            return history
+
     async def _message_received_trigger(
         self,
         client: SatXMPPEntity,
         message_elt: domish.Element,
-        post_treat: defer.Deferred
+        post_treat: defer.Deferred,
     ) -> bool:
+        reactions_elt = next(message_elt.elements(NS_REACTIONS, "reactions"), None)
+        if reactions_elt is not None:
+            reaction_id = reactions_elt.getAttribute("id")
+            history = await self.get_history_from_reaction_id(
+                client, message_elt, reaction_id
+            )
+            if history is None:
+                log.warning(
+                    f"Can't find matching message for reaction: {reactions_elt.toXml()}"
+                )
+            else:
+                if not reaction_id:
+                    log.warning(f"Invalid reaction: {reactions_elt.toXml()}")
+                    return False
+                from_jid = jid.JID(message_elt["from"])
+                reactions = set()
+                for reaction_elt in reactions_elt.elements("reaction"):
+                    reaction = str(reaction_elt)
+                    if not emoji.is_emoji(reaction):
+                        log.warning(f"ignoring invalide reaction: {reaction_elt.toXml()}")
+                        continue
+                    reactions.add(reaction)
+                await self.update_history_reactions(client, history, from_jid, reactions)
+
+            return False
+
         return True
 
-    def _reactions_set(self, message_id: str, profile: str, reactions: List[str]) -> None:
+    def _reactions_set(
+        self,
+        message_id: str,
+        reactions: List[str],
+        update_type: str = "replace",
+        profile: str = C.PROF_KEY_NONE,
+    ) -> defer.Deferred[None]:
         client = self.host.get_client(profile)
         return defer.ensureDeferred(
-            self.set_reactions(client, message_id)
+            self.set_reactions(client, message_id, reactions, update_type)
         )
 
+    async def get_history_from_uid(
+        self, client: SatXMPPEntity, message_id: str
+    ) -> History:
+        """Retrieve the chat history associated with a given message ID.
+
+        @param message_id: The Libervia specific identifier of the message
+
+        @return: An instance of History containing the chat history, messages, and
+            subjects related to the specified message ID.
+
+        @raises exceptions.NotFound: The history corresponding to the given message ID is
+            not found in the database.
+        """
+        history = await self.host.memory.storage.get(
+            client,
+            History,
+            History.uid,
+            message_id,
+        )
+        if history is None:
+            raise exceptions.NotFound(
+                f"message to retract not found in database ({message_id})"
+            )
+        return history
+
     def send_reactions(
         self,
         client: SatXMPPEntity,
         dest_jid: jid.JID,
+        message_type: str,
         message_id: str,
-        reactions: Iterable[str]
+        reactions: Iterable[str],
     ) -> None:
         """Send the <message> stanza containing the reactions
 
@@ -99,6 +190,7 @@
         message_elt = domish.Element((None, "message"))
         message_elt["from"] = client.jid.full()
         message_elt["to"] = dest_jid.full()
+        message_elt["type"] = message_type
         reactions_elt = message_elt.addElement((NS_REACTIONS, "reactions"))
         reactions_elt["id"] = message_id
         for r in set(reactions):
@@ -106,64 +198,163 @@
         self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
         client.send(message_elt)
 
-    async def add_reactions_to_history(
+    def convert_to_replace_update(
         self,
+        current_reactions: set[str],
+        reactions_new: Iterable[str],
+        update_type: str,
+    ) -> set[str]:
+        """Convert the given update to a replace update.
+
+        @param current_reactions: reaction of reacting JID before update
+        @param reactions_new: New reactions to be updated.
+        @param update_type: Original type of update (add, remove, toggle, replace).
+        @return: reactions for a replace operation.
+
+        @raise ValueError: invalid ``update_type``.
+        """
+        new_reactions_set = set(reactions_new)
+
+        if update_type == "replace":
+            return new_reactions_set
+
+        if update_type == "add":
+            replace_reactions = current_reactions | new_reactions_set
+        elif update_type == "remove":
+            replace_reactions = current_reactions - new_reactions_set
+        elif update_type == "toggle":
+            replace_reactions = current_reactions ^ new_reactions_set
+        else:
+            raise ValueError(f"Invalid update type: {update_type!r}")
+
+        return replace_reactions
+
+    async def update_history_reactions(
+        self,
+        client: SatXMPPEntity,
         history: History,
-        from_jid: jid.JID,
-        reactions: Iterable[str]
-    ) -> None:
-        """Update History instance with given reactions
+        reacting_jid: jid.JID,
+        reactions_new: Iterable[str],
+        update_type: str = "replace",
+        store: bool = True,
+    ) -> set[str]:
+        """Update reactions in History instance and optionally store and signal it.
 
-        @param history: storage History instance
-            will be updated in DB
-            "summary" field of history.extra["reactions"] will also be updated
-        @param from_jid: author of the reactions
-        @param reactions: list of reactions
+        @param history: storage History instance to be updated
+        @param reacting_jid: author of the reactions
+        @param reactions_new: new reactions to update
+        @param update_type: Original type of update (add, remove, toggle, replace).
+        @param store: if True, update history in storage, and send a `message_update`
+            signal with the new reactions.
+        @return: set of reactions for this JID for a "replace" update
         """
-        history.extra = deepcopy(history.extra) if history.extra else {}
-        h_reactions = history.extra.setdefault("reactions", {})
-        # reactions mapped by originating JID
-        by_jid = h_reactions.setdefault("by_jid", {})
-        # reactions are sorted to in summary to keep a consistent order
-        h_reactions["by_jid"][from_jid.userhost()] = sorted(list(set(reactions)))
-        h_reactions["summary"] = sorted(list(set().union(*by_jid.values())))
-        await self.host.memory.storage.session_add(history)
+        # FIXME: handle race conditions
+        if history.type == C.MESS_TYPE_GROUPCHAT:
+            entity_jid_s = reacting_jid.full()
+        else:
+            entity_jid_s = reacting_jid.userhost()
+        if history.extra is None:
+            history.extra = {}
+        extra = history.extra
+        reactions = extra.get("reactions", {})
+
+        current_reactions = {
+            reaction for reaction, jids in reactions.items() if entity_jid_s in jids
+        }
+        reactions_replace_set = self.convert_to_replace_update(
+            current_reactions, reactions_new, update_type
+        )
+
+        for reaction in current_reactions - reactions_replace_set:
+            reaction_jids = reactions[reaction]
+            reaction_jids.remove(entity_jid_s)
+            if not reaction_jids:
+                del reactions[reaction]
+
+        for reaction in reactions_replace_set - current_reactions:
+            reactions.setdefault(reaction, []).append(entity_jid_s)
+
+        # we want to have a constant order in reactions
+        extra["reactions"] = dict(sorted(reactions.items()))
+
+        if store:
+            # FIXME: this is not a clean way to flag "extra" as modified, but a deepcopy
+            # is for some reason not working here.
+            flag_modified(history, "extra")
+            await self.host.memory.storage.add(history)
+            # we send the signal for frontends update
+            data = MessageReactionData(reactions=extra["reactions"])
+            self.host.bridge.message_update(
+                history.uid,
+                C.MESS_UPDATE_REACTION,
+                data.model_dump_json(),
+                client.profile,
+            )
+
+        return reactions_replace_set
 
     async def set_reactions(
         self,
         client: SatXMPPEntity,
         message_id: str,
-        reactions: Iterable[str]
+        reactions: Iterable[str],
+        update_type: str = "replace",
     ) -> None:
         """Set and replace reactions to a message
 
         @param message_id: internal ID of the message
         @param rections: lsit of emojis to used to react to the message
             use empty list to remove all reactions
+        @param update_type: how to use the reaction to make the update, can be "replace",
+            "add", "remove" or "toggle".
         """
         if not message_id:
             raise ValueError("message_id can't be empty")
-        history = await self.host.memory.storage.get(
-            client, History, History.uid, message_id,
-            joined_loads=[History.messages, History.subjects]
-        )
-        if history is None:
-            raise exceptions.NotFound(
-                f"message to retract not found in database ({message_id})"
-            )
-        mess_id = history.origin_id or history.stanza_id
-        if not mess_id:
+
+        history = await self.get_history_from_uid(client, message_id)
+        if history.origin_id is not None:
+            mess_id = history.origin_id
+        else:
+            mess_id = history.stanza_id
+
+        if mess_id is None:
             raise exceptions.DataError(
                 "target message has neither origin-id nor message-id, we can't send a "
                 "reaction"
             )
-        await self.add_reactions_to_history(history, client.jid, reactions)
-        self.send_reactions(client, history.dest_jid, mess_id, reactions)
+
+        if history.source == client.jid.userhost():
+            dest_jid = history.dest_jid
+        else:
+            dest_jid = history.source_jid
+
+        if history.type == C.MESS_TYPE_GROUPCHAT:
+            # the reaction is for the chat room itself
+            dest_jid = dest_jid.userhostJID()
+            reacting_jid = self._m.get_room_user_jid(client, dest_jid)
+            # we don't store reactions for MUC, at this will be done when we receive back
+            # the reaction
+            store = False
+        else:
+            # we use bare JIDs for one2one message, except if the recipient is from a MUC
+            if self._m.is_room(client, dest_jid):
+                # this is a private message in a room, we need to use the MUC JID
+                reacting_jid = self._m.get_room_user_jid(client, dest_jid.userhostJID())
+            else:
+                # this is a classic one2one message, we need the bare JID
+                reacting_jid = client.jid.userhostJID()
+                dest_jid = dest_jid.userhostJID()
+            store = True
+
+        reaction_replace_set = await self.update_history_reactions(
+            client, history, reacting_jid, reactions, update_type, store
+        )
+
+        self.send_reactions(client, dest_jid, history.type, mess_id, reaction_replace_set)
 
 
 @implementer(iwokkel.IDisco)
 class XEP_0444_Handler(xmlstream.XMPPHandler):
-
     def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
         return [disco.DiscoFeature(NS_REACTIONS)]
 
--- a/pyproject.toml	Wed Nov 22 15:05:41 2023 +0100
+++ b/pyproject.toml	Wed Nov 22 15:10:04 2023 +0100
@@ -26,6 +26,7 @@
     "babel < 3",
     "cryptography >= 41.0.1",
     "dbus-python < 1.3",
+    "emoji ~= 2.8",
     "html2text < 2020.2",
     "jinja2 >= 3.1.2",
     "langid < 2",