diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3844:65e5718e7710

component AP gateway: `Announce` activity implementation: `Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277 "repeat" items, or retract. rel 370
author Goffi <goffi@goffi.org>
date Thu, 14 Jul 2022 12:55:30 +0200
parents 381340b9a9ee
children cc13efdd8360
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 14 12:55:30 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 14 12:55:30 2022 +0200
@@ -35,7 +35,7 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.tools.common import date_utils
+from sat.tools.common import date_utils, uri
 from sat.memory.sqla_mapping import SubscriptionState
 
 from .constants import (
@@ -129,22 +129,26 @@
             if actor != signing_actor:
                 log.warning(f"ignoring object not attributed to signing actor: {data}")
                 continue
-            try:
-                target_account = obj["object"]
-            except KeyError:
-                log.warning(f'ignoring invalid object, missing "object" key: {data}')
-                continue
-            if not self.apg.isLocalURL(target_account):
-                log.warning(f"ignoring unfollow request to non local actor: {data}")
-                continue
 
             if type_ == "Follow":
+                try:
+                    target_account = obj["object"]
+                except KeyError:
+                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                    continue
+                if not self.apg.isLocalURL(target_account):
+                    log.warning(f"ignoring unfollow request to non local actor: {data}")
+                    continue
                 await self.apg._p.unsubscribe(
                     client,
                     account_jid,
                     node,
                     sender=client.jid,
                 )
+            elif type_ == "Announce":
+                # we can use directly the Announce object, as only the "id" field is
+                # needed
+                await self.apg.newAPDeleteItem(client, None, node, obj)
             else:
                 log.warning(f"Unmanaged undo type: {type_!r}")
 
@@ -243,11 +247,6 @@
         ap_url: str,
         signing_actor: str
     ):
-        digest = request.getHeader("digest")
-        if digest in self._seen_digest:
-            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
-            return
-        self._seen_digest.append(digest)
         if node is None:
             node = self.apg._m.namespace
         client = await self.apg.getVirtualClient(signing_actor)
@@ -255,6 +254,59 @@
         for obj in objects:
             await self.apg.newAPDeleteItem(client, account_jid, node, obj)
 
+    async def handleNewAPItems(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        signing_actor: str,
+        repeated: bool = False,
+    ):
+        """Helper method to handle workflow for new AP items
+
+        accept globally the same parameter as for handleCreateActivity
+        @param repeated: if True, the item is an item republished from somewhere else
+        """
+        if "_repeated" in data:
+            log.error(
+                '"_repeated" field already present in given AP item, this should not '
+                f"happen. Ignoring object from {signing_actor}\n{data}"
+            )
+            raise exceptions.DataError("unexpected field in item")
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            sender = await self.apg.apGetSenderActor(obj)
+            if repeated:
+                # we don't check sender when item is repeated, as it should be different
+                # from post author in this case
+                sender_jid = await self.apg.getJIDFromId(sender)
+                repeater_jid = await self.apg.getJIDFromId(signing_actor)
+
+                obj["_repeated"] = {
+                    "by": repeater_jid.full(),
+                    "at": data.get("published"),
+                    "uri": uri.buildXMPPUri(
+                        "pubsub",
+                        path=sender_jid.full(),
+                        node=self.apg._m.namespace,
+                        item=obj["id"]
+                    )
+                }
+                # we must use activity's id and targets, not the original item ones
+                for field in ("id", "to", "bto", "cc", "bcc"):
+                    obj[field] = data.get(field)
+            else:
+                if sender != signing_actor:
+                    log.warning(
+                        "Ignoring object not attributed to signing actor: {obj}"
+                    )
+                    continue
+            await self.apg.newAPItem(client, account_jid, node, obj)
+
     async def handleCreateActivity(
         self,
         request: "HTTPRequest",
@@ -265,23 +317,27 @@
         ap_url: str,
         signing_actor: str
     ):
-        digest = request.getHeader("digest")
-        if digest in self._seen_digest:
-            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
-            return
-        self._seen_digest.append(digest)
-        if node is None:
-            node = self.apg._m.namespace
-        client = await self.apg.getVirtualClient(signing_actor)
-        objects = await self.apg.apGetList(data, "object")
-        for obj in objects:
-            sender = await self.apg.apGetSenderActor(obj)
-            if sender != signing_actor:
-                log.warning(
-                    "Ignoring object not attributed to signing actor: {obj}"
-                )
-            else:
-                await self.apg.newAPItem(client, account_jid, node, obj)
+        await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
+
+    async def handleAnnounceActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # we create a new item
+        await self.handleNewAPItems(
+            request,
+            data,
+            account_jid,
+            node,
+            signing_actor,
+            repeated=True
+        )
 
     async def APActorRequest(
         self,
@@ -680,8 +736,16 @@
             request.finish()
             return
 
+        request.setResponseCode(http.ACCEPTED)
+
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            request.finish()
+            return
+        self._seen_digest.append(digest)
+
         # default response code, may be changed, e.g. in case of exception
-        request.setResponseCode(http.ACCEPTED)
         try:
             return await self.APRequest(request, signing_actor)
         except Exception as e: