changeset 3844:65e5718e7710

component AP gateway: `Announce` activity implementation: `Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277 "repeat" items, or retract. rel 370
author Goffi <goffi@goffi.org>
date Thu, 14 Jul 2022 12:55:30 +0200
parents 17c757bd74bc
children 4f9d4650eab5
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/http_server.py
diffstat 3 files changed, 109 insertions(+), 35 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Jul 14 12:55:30 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Jul 14 12:55:30 2022 +0200
@@ -1267,7 +1267,10 @@
         item_elt = await self._m.data2entry(
             self.client, mb_data, mb_data["id"], None, self._m.namespace
         )
-        item_elt["publisher"] = mb_data["author_jid"]
+        if "repeated" in mb_data["extra"]:
+            item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
+        else:
+            item_elt["publisher"] = mb_data["author_jid"]
         return mb_data, item_elt
 
     async def apItem2Elt(self, ap_item: dict) -> domish.Element:
@@ -1360,6 +1363,8 @@
     async def apItem2MBdata(self, ap_item: dict) -> dict:
         """Convert AP activity or object to microblog data
 
+        @param ap_item: ActivityPub item to convert
+            Can be either an activity of an object
         @return: AP Item's Object and microblog data
         @raise exceptions.DataError: something is invalid in the AP item
         @raise NotImplemented: some AP data is not handled yet
@@ -1377,7 +1382,7 @@
         if not item_id:
             log.warning(f'No "id" found in AP item: {ap_object!r}')
             raise exceptions.DataError
-        mb_data = {"id": item_id}
+        mb_data = {"id": item_id, "extra": {}}
 
         # content
         try:
@@ -1428,6 +1433,10 @@
                 except dateutil.parser.ParserError as e:
                     log.warning(f"Can't parse {field!r} field: {e}")
 
+        # repeat
+        if "_repeated" in ap_item:
+            mb_data["extra"]["repeated"] = ap_item["_repeated"]
+
         # comments
         in_reply_to = ap_object.get("inReplyTo")
         __, comments_node = await self.getCommentsNodes(item_id, in_reply_to)
@@ -2194,6 +2203,7 @@
         @param node: XMPP pubsub node
         @param activity: parent AP activity
         @param item: AP object payload
+            only the "id" field is used
         """
         item_id = item.get("id")
         if not item_id:
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Thu Jul 14 12:55:30 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Thu Jul 14 12:55:30 2022 +0200
@@ -67,7 +67,7 @@
 ACTIVITY_TARGET_MANDATORY = ("Add", "Remove")
 # activities which can be used with Shared Inbox (i.e. with no account specified)
 # must be lowercase
-ACTIVIY_NO_ACCOUNT_ALLOWED = ("create", "delete")
+ACTIVIY_NO_ACCOUNT_ALLOWED = ("create", "delete", "announce", "undo")
 # maximum number of parents to retrieve when comments_max_depth option is set
 COMMENTS_MAX_PARENTS = 100
 # maximum size of avatar, in bytes
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 14 12:55:30 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 14 12:55:30 2022 +0200
@@ -35,7 +35,7 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.tools.common import date_utils
+from sat.tools.common import date_utils, uri
 from sat.memory.sqla_mapping import SubscriptionState
 
 from .constants import (
@@ -129,22 +129,26 @@
             if actor != signing_actor:
                 log.warning(f"ignoring object not attributed to signing actor: {data}")
                 continue
-            try:
-                target_account = obj["object"]
-            except KeyError:
-                log.warning(f'ignoring invalid object, missing "object" key: {data}')
-                continue
-            if not self.apg.isLocalURL(target_account):
-                log.warning(f"ignoring unfollow request to non local actor: {data}")
-                continue
 
             if type_ == "Follow":
+                try:
+                    target_account = obj["object"]
+                except KeyError:
+                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                    continue
+                if not self.apg.isLocalURL(target_account):
+                    log.warning(f"ignoring unfollow request to non local actor: {data}")
+                    continue
                 await self.apg._p.unsubscribe(
                     client,
                     account_jid,
                     node,
                     sender=client.jid,
                 )
+            elif type_ == "Announce":
+                # we can use directly the Announce object, as only the "id" field is
+                # needed
+                await self.apg.newAPDeleteItem(client, None, node, obj)
             else:
                 log.warning(f"Unmanaged undo type: {type_!r}")
 
@@ -243,11 +247,6 @@
         ap_url: str,
         signing_actor: str
     ):
-        digest = request.getHeader("digest")
-        if digest in self._seen_digest:
-            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
-            return
-        self._seen_digest.append(digest)
         if node is None:
             node = self.apg._m.namespace
         client = await self.apg.getVirtualClient(signing_actor)
@@ -255,6 +254,59 @@
         for obj in objects:
             await self.apg.newAPDeleteItem(client, account_jid, node, obj)
 
+    async def handleNewAPItems(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        signing_actor: str,
+        repeated: bool = False,
+    ):
+        """Helper method to handle workflow for new AP items
+
+        accept globally the same parameter as for handleCreateActivity
+        @param repeated: if True, the item is an item republished from somewhere else
+        """
+        if "_repeated" in data:
+            log.error(
+                '"_repeated" field already present in given AP item, this should not '
+                f"happen. Ignoring object from {signing_actor}\n{data}"
+            )
+            raise exceptions.DataError("unexpected field in item")
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            sender = await self.apg.apGetSenderActor(obj)
+            if repeated:
+                # we don't check sender when item is repeated, as it should be different
+                # from post author in this case
+                sender_jid = await self.apg.getJIDFromId(sender)
+                repeater_jid = await self.apg.getJIDFromId(signing_actor)
+
+                obj["_repeated"] = {
+                    "by": repeater_jid.full(),
+                    "at": data.get("published"),
+                    "uri": uri.buildXMPPUri(
+                        "pubsub",
+                        path=sender_jid.full(),
+                        node=self.apg._m.namespace,
+                        item=obj["id"]
+                    )
+                }
+                # we must use activity's id and targets, not the original item ones
+                for field in ("id", "to", "bto", "cc", "bcc"):
+                    obj[field] = data.get(field)
+            else:
+                if sender != signing_actor:
+                    log.warning(
+                        "Ignoring object not attributed to signing actor: {obj}"
+                    )
+                    continue
+            await self.apg.newAPItem(client, account_jid, node, obj)
+
     async def handleCreateActivity(
         self,
         request: "HTTPRequest",
@@ -265,23 +317,27 @@
         ap_url: str,
         signing_actor: str
     ):
-        digest = request.getHeader("digest")
-        if digest in self._seen_digest:
-            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
-            return
-        self._seen_digest.append(digest)
-        if node is None:
-            node = self.apg._m.namespace
-        client = await self.apg.getVirtualClient(signing_actor)
-        objects = await self.apg.apGetList(data, "object")
-        for obj in objects:
-            sender = await self.apg.apGetSenderActor(obj)
-            if sender != signing_actor:
-                log.warning(
-                    "Ignoring object not attributed to signing actor: {obj}"
-                )
-            else:
-                await self.apg.newAPItem(client, account_jid, node, obj)
+        await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
+
+    async def handleAnnounceActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # we create a new item
+        await self.handleNewAPItems(
+            request,
+            data,
+            account_jid,
+            node,
+            signing_actor,
+            repeated=True
+        )
 
     async def APActorRequest(
         self,
@@ -680,8 +736,16 @@
             request.finish()
             return
 
+        request.setResponseCode(http.ACCEPTED)
+
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            request.finish()
+            return
+        self._seen_digest.append(digest)
+
         # default response code, may be changed, e.g. in case of exception
-        request.setResponseCode(http.ACCEPTED)
         try:
             return await self.APRequest(request, signing_actor)
         except Exception as e: