changeset 3865:59fbb66b2923

component AP gateway: handle XMPP attachments -> AP likes conversion: convert `noticed` attachments coming from XMPP to suitable `Like` activities. Virtual node is created in cache when necessary, and virtual items published to virtual JID mapping AP accounts are cached too. rel 370
author Goffi <goffi@goffi.org>
date Wed, 20 Jul 2022 17:53:12 +0200 (2022-07-20)
parents ac255a0fbd4c
children 915fb230cb28
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/pubsub_service.py
diffstat 3 files changed, 183 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Wed Jul 20 17:49:51 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Wed Jul 20 17:53:12 2022 +0200
@@ -70,6 +70,7 @@
     TYPE_FOLLOWERS,
     TYPE_TOMBSTONE,
     TYPE_MENTION,
+    TYPE_LIKE,
     NS_AP,
     NS_AP_PUBLIC,
     PUBLIC_TUPLE
@@ -90,9 +91,9 @@
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
     C.PI_DEPENDENCIES: [
-        "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329",
-        "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY",
-        "XEP-0054"
+        "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292",
+        "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES",
+        "IDENTITY", "PUBSUB_ATTACHMENTS"
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
@@ -131,6 +132,7 @@
         self._c = host.plugins["PUBSUB_CACHE"]
         self._t = host.plugins["TEXT_SYNTAXES"]
         self._i = host.plugins["IDENTITY"]
+        self._pa = host.plugins["PUBSUB_ATTACHMENTS"]
         self._p.addManagedNode(
             "",
             items_cb=self._itemsReceived,
@@ -287,9 +289,17 @@
             return
 
         ap_account = self._e.unescape(recipient.user)
-        await self.convertAndPostItems(
-            client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items
-        )
+
+        if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier):
+            await self.convertAndPostAttachments(
+                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
+                itemsEvent.items, publisher=itemsEvent.sender
+            )
+        else:
+            await self.convertAndPostItems(
+                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
+                itemsEvent.items
+            )
 
     async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
         """Get client for this component with a specified jid
@@ -406,11 +416,14 @@
 
             if found_item is None:
                 # the node is not in cache, we have to make a request to retrieve the item
-                # if doesn't exist, getItems will raise a NotFound exception
+                # If the node doesn't exist, getItems will raise a NotFound exception
                 found_items, __ = await self._p.getItems(
                     self.client, author_jid, node, item_ids=[item_id]
                 )
-                found_item = found_items[0]
+                try:
+                    found_item = found_items[0]
+                except KeyError:
+                    raise exceptions.NotFound("requested items can't be found")
 
             mb_data = await self._m.item2mbdata(
                 self.client, found_item, author_jid, node
@@ -834,6 +847,12 @@
         """Generate base data for an activity
 
         @param activity: one of ACTIVITY_TYPES
+        @param actor_id: AP actor ID of the sender
+        @param object_: content of "object" field
+        @param target: content of "target" field
+        @param activity_id: ID to use for the activity
+            if not set it will be automatically generated, but it is usually desirable to
+            set the ID manually so it can be retrieved (e.g. for Undo)
         """
         if activity not in ACTIVITY_TYPES:
             raise exceptions.InternalError(f"invalid activity: {activity!r}")
@@ -948,27 +967,41 @@
         service: jid.JID,
         node: str,
         items: List[domish.Element],
-        subscribe_comments_nodes: bool = False,
+        subscribe_extra_nodes: bool = True,
     ) -> None:
         """Convert XMPP items to AP items and post them to actor inbox
 
-        @param ap_account: account of ActivityPub actor
-        @param service: JID of the virtual pubsub service corresponding to the AP actor
-        @param node: virtual node corresponding to the AP actor and items
-        @param subscribe_comments_nodes: if True, comment nodes present in given items,
-            they will be automatically subscribed
+        @param ap_account: account of ActivityPub actor receiving the item
+        @param service: JID of the (virtual) pubsub service where the item has been
+            published
+        @param node: (virtual) node corresponding where the item has been published
+        @param subscribe_extra_nodes: if True, extra data nodes will be automatically
+        subscribed, that is comment nodes if present and attachments nodes.
         """
         actor_id = await self.getAPActorIdFromAccount(ap_account)
         inbox = await self.getAPInboxFromId(actor_id)
         for item in items:
             if item.name == "item":
                 mb_data = await self._m.item2mbdata(client, item, service, node)
-                if subscribe_comments_nodes:
+                if subscribe_extra_nodes:
                     # we subscribe automatically to comment nodes if any
+                    recipient_jid = self.getLocalJIDFromAccount(ap_account)
+                    recipient_client = self.client.getVirtualClient(recipient_jid)
                     for comment_data in mb_data.get("comments", []):
                         comment_service = jid.JID(comment_data["service"])
                         comment_node = comment_data["node"]
-                        await self._p.subscribe(client, comment_service, comment_node)
+                        await self._p.subscribe(
+                            recipient_client, comment_service, comment_node
+                        )
+                    try:
+                        await self._pa.subscribe(
+                            recipient_client, service, node, mb_data["id"]
+                        )
+                    except exceptions.NotFound:
+                        log.debug(
+                            f"no attachment node found for item {mb_data['id']!r} on "
+                            f"{node!r} at {service}"
+                        )
                 ap_item = await self.mbdata2APitem(client, mb_data)
                 url_actor = ap_item["actor"]
             elif item.name == "retract":
@@ -978,12 +1011,110 @@
             else:
                 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
             resp = await self.signAndPost(inbox, url_actor, ap_item)
-            if resp.code >= 300:
-                text = await resp.text()
+
+    async def convertAndPostAttachments(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        service: jid.JID,
+        node: str,
+        items: List[domish.Element],
+        publisher: Optional[jid.JID]
+    ) -> None:
+        """Convert XMPP item attachments to AP activities and post them to actor inbox
+
+        @param ap_account: account of ActivityPub actor receiving the item
+        @param service: JID of the (virtual) pubsub service where the item has been
+            published
+        @param node: (virtual) node corresponding where the item has been published
+        @param subscribe_extra_nodes: if True, extra data nodes will be automatically
+        subscribed, that is comment nodes if present and attachments nodes.
+        """
+        if len(items) != 1:
+            log.warning(
+                "we should get exactly one attachment item for an entity, got "
+                f"{len(items)})"
+            )
+
+        actor_id = await self.getAPActorIdFromAccount(ap_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+
+        item_elt = items[0]
+        if publisher is not None:
+            item_elt["publisher"] = publisher.userhost()
+        item_service, item_node, item_id = self._pa.attachmentNode2Item(node)
+        item_account = await self.getAPAccountFromJidAndNode(item_service, item_node)
+        if item_service.host == self.client.jid.userhost():
+            # it's a virtual JID mapping to an external AP actor, we can use the
+            # item_id directly
+            item_url = item_id
+            if not item_url.startswith("https:"):
                 log.warning(
-                    f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
-                    f"{pformat(ap_item)}"
+                    "item ID of external AP actor is not an https link, ignoring: "
+                    f"{item_id!r}"
                 )
+                return
+        else:
+            item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id)
+
+        old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({
+            "profiles": [self.client.profile],
+            "services": [service],
+            "names": [item_elt["id"]]
+        })
+        if not old_attachment_pubsub_items:
+            old_attachment = {}
+        else:
+            old_attachment_items = [i.data for i in old_attachment_pubsub_items]
+            old_attachments = self._pa.items2attachmentData(client, old_attachment_items)
+            try:
+                old_attachment = old_attachments[0]
+            except IndexError:
+                # no known element was present in attachments
+                old_attachment = {}
+        sender_account = await self.getAPAccountFromJidAndNode(
+            client.jid,
+            None
+        )
+        sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account)
+        try:
+            attachments = self._pa.items2attachmentData(client, [item_elt])[0]
+        except IndexError:
+            # no known element was present in attachments
+            attachments = {}
+
+        if "noticed" in attachments:
+            if not "noticed" in old_attachment:
+                # new "noticed" attachment, we translate to "Like" activity
+                activity_id = self.buildAPURL("like", item_account, item_id)
+                like = self.createActivity(
+                    TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
+                )
+                like["to"] = [NS_AP_PUBLIC]
+                await self.signAndPost(inbox, sender_actor_id, like)
+        else:
+            if "noticed" in old_attachment:
+                # "noticed" attachment has been removed, we undo the "Like" activity
+                activity_id = self.buildAPURL("like", item_account, item_id)
+                like = self.createActivity(
+                    TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
+                )
+                like["to"] = [NS_AP_PUBLIC]
+                undo = self.createActivity("Undo", sender_actor_id, like)
+                await self.signAndPost(inbox, sender_actor_id, undo)
+
+        if service.user and service.host == self.client.jid.userhost():
+            # the item is on a virtual service, we need to store it in cache
+            log.debug("storing attachments item in cache")
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node, with_subscriptions=True, create=True
+            )
+            await self.host.memory.storage.cachePubsubItems(
+                self.client,
+                cached_node,
+                [item_elt],
+                [attachments]
+            )
 
     async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
         """Sign a documentent and post it to AP server
@@ -1020,7 +1151,7 @@
             body,
             headers=headers,
         )
-        if resp.code >= 400:
+        if resp.code >= 300:
             text = await resp.text()
             log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
         elif self.verbose:
@@ -1694,7 +1825,7 @@
             target_ap_account = await self.getAPAccountFromJidAndNode(
                 service, node
             )
-            if service.host == self.client.jid.userhost:
+            if service.host == self.client.jid.userhost():
                 # service is a proxy JID for AP account
                 actor_data = await self.getAPActorDataFromAccount(target_ap_account)
                 followers = actor_data.get("followers")
@@ -1758,8 +1889,6 @@
         item_data = await self.mbdata2APitem(client, mess_data)
         url_actor = item_data["actor"]
         resp = await self.signAndPost(inbox_url, url_actor, item_data)
-        if resp.code != 202:
-            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
     async def apDeleteItem(
         self,
@@ -1909,12 +2038,6 @@
             from_jid.userhostJID(), None, fastened_elts.id, public=False
         )
         resp = await self.signAndPost(inbox, url_actor, ap_item)
-        if resp.code >= 300:
-            text = await resp.text()
-            log.warning(
-                f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
-                f"{pformat(ap_item)}"
-            )
         return False
 
     async def _onReferenceReceived(
@@ -2003,12 +2126,6 @@
         inbox = await self.getAPInboxFromId(actor_id)
 
         resp = await self.signAndPost(inbox, ap_item["actor"], ap_item)
-        if resp.code >= 300:
-            text = await resp.text()
-            log.warning(
-                f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
-                f"{pformat(ap_item)}"
-            )
 
         return False
 
@@ -2263,27 +2380,9 @@
             parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
             node = parent_node or node
             cached_node = await self.host.memory.storage.getPubsubNode(
-                client, service, node, with_subscriptions=True
+                client, service, node, with_subscriptions=True, create=True,
+                create_kwargs={"subscribed": True}
             )
-            if cached_node is None:
-                try:
-                    cached_node = await self.host.memory.storage.setPubsubNode(
-                        client,
-                        service,
-                        node,
-                        subscribed=True
-                    )
-                except IntegrityError as e:
-                    if "unique" in str(e.orig).lower():
-                        # the node may already exist, if it has been created just after
-                        # getPubsubNode above
-                        log.debug("ignoring UNIQUE constraint error")
-                        cached_node = await self.host.memory.storage.getPubsubNode(
-                            client, service, node, with_subscriptions=True
-                        )
-                    else:
-                        raise e
-
         else:
             # it is a root item (i.e. not a reply to an other item)
             cached_node = await self.host.memory.storage.getPubsubNode(
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Wed Jul 20 17:49:51 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Wed Jul 20 17:53:12 2022 +0200
@@ -29,6 +29,7 @@
 TYPE_ITEM = "item"
 TYPE_TOMBSTONE = "Tombstone"
 TYPE_MENTION = "Mention"
+TYPE_LIKE = "Like"
 MEDIA_TYPE_AP = "application/activity+json"
 NS_AP = "https://www.w3.org/ns/activitystreams"
 NS_AP_PUBLIC = f"{NS_AP}#Public"
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Wed Jul 20 17:49:51 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Wed Jul 20 17:53:12 2022 +0200
@@ -119,9 +119,14 @@
             )
 
         client = self.apg.client.getVirtualClient(requestor)
-        await self.apg.convertAndPostItems(
-            client, ap_account, service, nodeIdentifier, items
-        )
+        if self.apg._pa.isAttachmentNode(nodeIdentifier):
+            await self.apg.convertAndPostAttachments(
+                client, ap_account, service, nodeIdentifier, items, publisher=requestor
+            )
+        else:
+            await self.apg.convertAndPostItems(
+                client, ap_account, service, nodeIdentifier, items
+            )
 
     async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
         """Convert actor ID from following collection to XMPP item"""
@@ -291,6 +296,10 @@
             log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
             return [], None
 
+        # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
+        # otherwise it is filled when suitable
+        cached_node = None
+        client = self.apg.client
         kwargs = {}
 
         if node == self.apg._pps.subscriptions_node:
@@ -315,6 +324,15 @@
                 self.apg.client, ap_account, itemIdentifiers
             )
             return [item_elt], None
+        elif self.apg._pa.isAttachmentNode(node):
+            use_cache = True
+            # we check cache here because we emit an item-not-found error if the node is
+            # not in cache, as we are not dealing with real AP items
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node
+            )
+            if cached_node is None:
+                raise error.StanzaError("item-not-found")
         else:
             if not node.startswith(self.apg._m.namespace):
                 raise error.StanzaError(
@@ -326,11 +344,11 @@
             parser = self.apg.apItem2Elt
             use_cache = True
 
-        client = self.apg.client
         if use_cache:
-            cached_node = await self.host.memory.storage.getPubsubNode(
-                client, service, node
-            )
+            if cached_node is None:
+                cached_node = await self.host.memory.storage.getPubsubNode(
+                    client, service, node
+                )
             # TODO: check if node is synchronised
             if cached_node is not None:
                 # the node is cached, we return items from cache
@@ -467,7 +485,7 @@
         data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
 
         resp = await self.apg.signAndPost(inbox, req_actor_id, data)
-        if resp.code >= 400:
+        if resp.code >= 300:
             text = await resp.text()
             raise error.StanzaError("service-unavailable", text=text)
         return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
@@ -488,7 +506,7 @@
         )
 
         resp = await self.apg.signAndPost(inbox, req_actor_id, data)
-        if resp.code >= 400:
+        if resp.code >= 300:
             text = await resp.text()
             raise error.StanzaError("service-unavailable", text=text)