changeset 3804:36b167ddbfca

component AP gateway: AP delete activity => message retract: handle retractation of messages. As it is not possible to know from the AP item alone if we need to to a message retractation (XEP-0424) or a pubsub retractation (XEP-0060), we now cache sent message, and decide which method to use according to how the item is cached (i.e. in message history or in pubsub cache). rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents d5f343939239
children 33ab258df0de
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/http_server.py
diffstat 2 files changed, 64 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri Jun 17 14:15:23 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri Jun 17 14:15:23 2022 +0200
@@ -49,7 +49,7 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.memory.sqla_mapping import SubscriptionState
+from sat.memory.sqla_mapping import SubscriptionState, History
 from sat.tools import utils
 from sat.tools.common import data_format, tls, uri
 from sat.tools.common.async_utils import async_lru
@@ -229,6 +229,7 @@
 
     async def profileConnecting(self, client):
         self.client = client
+        client.sendHistory = True
         await self.init(client)
 
     async def _itemsReceived(self, client, itemsEvent):
@@ -1694,7 +1695,9 @@
                 f"({in_reply_to!r}):\n{pformat(ap_item)}"
             )
             return
-        parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account)
+        parent_item_service, parent_item_node = await self.getJIDAndNode(
+            parent_item_account
+        )
         if parent_item_node is None:
             parent_item_node = self._m.namespace
         items, __ = await self._p.getItems(
@@ -1722,18 +1725,14 @@
             __, item_elt = await self.apItem2MbDataAndElt(ap_item)
             await self._p.publish(client, comment_service, comment_node, [item_elt])
 
-    async def newAPItem(
-        self,
-        client: SatXMPPEntity,
-        destinee: Optional[jid.JID],
-        node: str,
-        item: dict,
-    ) -> None:
-        """Analyse, cache and send notification for received AP item
+    def getAPItemTargets(self, item: Dict[str, Any]) -> Tuple[bool, Set[str], Set[str]]:
+        """Retrieve targets of an AP item, and indicate if it's a public one
 
-        @param destinee: jid of the destinee,
-        @param node: XMPP pubsub node
         @param item: AP object payload
+        @return: Are returned:
+            - is_public flag, indicating if the item is world-readable
+            - targets of the item
+            - targets of the items
         """
         targets: Set[str] = set()
         is_public = False
@@ -1755,6 +1754,22 @@
                 targets.add(value)
 
         targets_types = {self.parseAPURL(t)[0] for t in targets}
+        return is_public, targets, targets_types
+
+    async def newAPItem(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse, cache and send notification for received AP item
+
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        """
+        is_public, targets, targets_types = self.getAPItemTargets(item)
         if not is_public and targets_types == {TYPE_ACTOR}:
             # this is a direct message
             await self.handleMessageAPItem(
@@ -1789,7 +1804,7 @@
                     target_jid,
                     {'': mb_data.get("content", "")},
                     mb_data.get("title"),
-
+                    extra={"origin_id": mb_data["id"]}
                 )
             )
         await defer.DeferredList(defer_l)
@@ -1883,29 +1898,57 @@
         client: SatXMPPEntity,
         destinee: Optional[jid.JID],
         node: str,
+        activity: dict,
         item: dict,
     ) -> None:
         """Analyse, cache and send notification for received AP item
 
         @param destinee: jid of the destinee,
         @param node: XMPP pubsub node
+        @param activity: parent AP activity
         @param item: AP object payload
         """
         item_id = item.get("id")
         if not item_id:
             raise exceptions.DataError('"id" attribute is missing in item')
+        if not item_id.startswith("http"):
+            raise exceptions.DataError(f"invalid id: {item_id!r}")
         if self.isLocalURL(item_id):
             raise ValueError("Local IDs should not be used")
 
-        cached_node = await self.host.memory.storage.getPubsubNode(
-            client, client.jid, node, with_subscriptions=True
+        # we have no way to know if a deleted item is a direct one (thus a message) or one
+        # converted to pubsub. We check if the id is in message history to decide what to
+        # do.
+        history = await self.host.memory.storage.get(
+            client,
+            History,
+            History.origin_id,
+            item_id,
+            (History.messages, History.subjects)
         )
-        if cached_node is None:
-            log.warning(
-                f"Received an item retract for node {node!r} at {client.jid} which is "
-                "not cached"
+
+        if history is not None:
+            # it's a direct message
+            if history.source_jid != client.jid:
+                log.warning(
+                    f"retractation received from an entity ''{client.jid}) which is "
+                    f"not the original sender of the message ({history.source_jid}), "
+                    "hack attemps?"
+                )
+                raise exceptions.PermissionError("forbidden")
+
+            await self._r.retractByHistory(client, history)
+        else:
+            # no history in cache with this ID, it's probably a pubsub item
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, client.jid, node, with_subscriptions=True
             )
-        else:
+            if cached_node is None:
+                log.warning(
+                    f"Received an item retract for node {node!r} at {client.jid} "
+                    "which is not cached"
+                )
+                raise exceptions.NotFound
             await self.host.memory.storage.deletePubsubItems(cached_node, [item_id])
             # notifyRetract is expecting domish.Element instances
             item_elt = domish.Element((None, "item"))
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri Jun 17 14:15:23 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri Jun 17 14:15:23 2022 +0200
@@ -252,7 +252,7 @@
         client = await self.apg.getVirtualClient(signing_actor)
         objects = await self.apg.apGetList(data, "object")
         for obj in objects:
-            await self.apg.newAPDeleteItem(client, account_jid, node, obj)
+            await self.apg.newAPDeleteItem(client, account_jid, node, data, obj)
 
     async def handleCreateActivity(
         self,