changeset 3869:c0bcbcf5b4b7

component AP gateway: handle `Like` and `Undo`/`Like` activities: rel 370
author Goffi <goffi@goffi.org>
date Thu, 21 Jul 2022 18:07:35 +0200
parents 37d2c0282304
children bd84d289fc94
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/http_server.py sat/plugins/plugin_pubsub_attachments.py
diffstat 3 files changed, 246 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Jul 21 18:05:20 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Jul 21 18:07:35 2022 +0200
@@ -293,7 +293,7 @@
         if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier):
             await self.convertAndPostAttachments(
                 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
-                itemsEvent.items, publisher=itemsEvent.sender
+                itemsEvent.items
             )
         else:
             await self.convertAndPostItems(
@@ -581,7 +581,7 @@
         if self.client is None:
             raise exceptions.InternalError("Client is not set yet")
 
-        if jid_.host == self.client.jid.userhost():
+        if self.isVirtualJID(jid_):
             # this is an proxy JID to an AP Actor
             return self._e.unescape(jid_.user)
 
@@ -786,6 +786,10 @@
         """
         return url.startswith(self.base_ap_url)
 
+    def isVirtualJID(self, jid_: jid.JID) -> bool:
+        """Tell if a JID is an AP actor mapped through this gateway"""
+        return jid_.host == self.client.jid.userhost()
+
     def buildSignatureHeader(self, values: Dict[str, str]) -> str:
         """Build key="<value>" signature header from signature data"""
         fields = []
@@ -983,12 +987,16 @@
         for item in items:
             if item.name == "item":
                 mb_data = await self._m.item2mbdata(client, item, service, node)
-                if subscribe_extra_nodes:
+                author_jid = jid.JID(mb_data["author_jid"])
+                if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
                     # we subscribe automatically to comment nodes if any
                     recipient_jid = self.getLocalJIDFromAccount(ap_account)
                     recipient_client = self.client.getVirtualClient(recipient_jid)
                     for comment_data in mb_data.get("comments", []):
                         comment_service = jid.JID(comment_data["service"])
+                        if self.isVirtualJID(comment_service):
+                            log.debug(f"ignoring virtual comment service: {comment_data}")
+                            continue
                         comment_node = comment_data["node"]
                         await self._p.subscribe(
                             recipient_client, comment_service, comment_node
@@ -1019,7 +1027,7 @@
         service: jid.JID,
         node: str,
         items: List[domish.Element],
-        publisher: Optional[jid.JID]
+        publisher: Optional[jid.JID] = None
     ) -> None:
         """Convert XMPP item attachments to AP activities and post them to actor inbox
 
@@ -1027,8 +1035,14 @@
         @param service: JID of the (virtual) pubsub service where the item has been
             published
         @param node: (virtual) node corresponding where the item has been published
-        @param subscribe_extra_nodes: if True, extra data nodes will be automatically
-        subscribed, that is comment nodes if present and attachments nodes.
+            subscribed, that is comment nodes if present and attachments nodes.
+        @param items: attachments items
+        @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub
+            service, it's the publisher of the item). To be filled only when the publisher
+            is known for sure, otherwise publisher will be determined either if
+            "publisher" attribute is set by pubsub service, or as a last resort, using
+            item's ID (which MUST be publisher bare JID according to pubsub-attachments
+            specification).
         """
         if len(items) != 1:
             log.warning(
@@ -1040,11 +1054,29 @@
         inbox = await self.getAPInboxFromId(actor_id)
 
         item_elt = items[0]
+        item_id = item_elt["id"]
+
+        if publisher is None:
+            item_pub_s = item_elt.getAttribute("publisher")
+            publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id)
+
+        if publisher.userhost() != item_id:
+            log.warning(
+                "attachments item ID must be publisher's bare JID, ignoring: "
+                f"{item_elt.toXml()}"
+            )
+            return
+
+        if self.isVirtualJID(publisher):
+            log.debug(f"ignoring item coming from local virtual JID {publisher}")
+            return
+
         if publisher is not None:
             item_elt["publisher"] = publisher.userhost()
+
         item_service, item_node, item_id = self._pa.attachmentNode2Item(node)
         item_account = await self.getAPAccountFromJidAndNode(item_service, item_node)
-        if item_service.host == self.client.jid.userhost():
+        if self.isVirtualJID(item_service):
             # it's a virtual JID mapping to an external AP actor, we can use the
             # item_id directly
             item_url = item_id
@@ -1072,11 +1104,11 @@
             except IndexError:
                 # no known element was present in attachments
                 old_attachment = {}
-        sender_account = await self.getAPAccountFromJidAndNode(
-            client.jid,
+        publisher_account = await self.getAPAccountFromJidAndNode(
+            publisher,
             None
         )
-        sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account)
+        publisher_actor_id = self.buildAPURL(TYPE_ACTOR, publisher_account)
         try:
             attachments = self._pa.items2attachmentData(client, [item_elt])[0]
         except IndexError:
@@ -1088,22 +1120,22 @@
                 # new "noticed" attachment, we translate to "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
                 like = self.createActivity(
-                    TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
+                    TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
                 like["to"] = [NS_AP_PUBLIC]
-                await self.signAndPost(inbox, sender_actor_id, like)
+                await self.signAndPost(inbox, publisher_actor_id, like)
         else:
             if "noticed" in old_attachment:
                 # "noticed" attachment has been removed, we undo the "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
                 like = self.createActivity(
-                    TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
+                    TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
                 like["to"] = [NS_AP_PUBLIC]
-                undo = self.createActivity("Undo", sender_actor_id, like)
-                await self.signAndPost(inbox, sender_actor_id, undo)
+                undo = self.createActivity("Undo", publisher_actor_id, like)
+                await self.signAndPost(inbox, publisher_actor_id, undo)
 
-        if service.user and service.host == self.client.jid.userhost():
+        if service.user and self.isVirtualJID(service):
             # the item is on a virtual service, we need to store it in cache
             log.debug("storing attachments item in cache")
             cached_node = await self.host.memory.storage.getPubsubNode(
@@ -1720,7 +1752,7 @@
         rep_item = parsed_url["item"]
         activity_id = self.buildAPURL("item", repeater.userhost(), mb_data["id"])
 
-        if rep_service.host == self.client.jid.userhost():
+        if self.isVirtualJID(rep_service):
             # it's an AP actor linked through this gateway
             # in this case we can simply use the item ID
             if not rep_item.startswith("https:"):
@@ -1825,7 +1857,7 @@
             target_ap_account = await self.getAPAccountFromJidAndNode(
                 service, node
             )
-            if service.host == self.client.jid.userhost():
+            if self.isVirtualJID(service):
                 # service is a proxy JID for AP account
                 actor_data = await self.getAPActorDataFromAccount(target_ap_account)
                 followers = actor_data.get("followers")
@@ -1836,7 +1868,7 @@
                 ap_object["cc"] = [followers]
             if self._m.isCommentNode(node):
                 parent_item = self._m.getParentItem(node)
-                if service.host == self.client.jid.userhost():
+                if self.isVirtualJID(service):
                     # the publication is on a virtual node (i.e. an XMPP node managed by
                     # this gateway and linking to an ActivityPub actor)
                     ap_object["inReplyTo"] = parent_item
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 21 18:05:20 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Jul 21 18:07:35 2022 +0200
@@ -34,6 +34,7 @@
 from sat.core import exceptions
 from sat.core.constants import Const as C
 from sat.core.i18n import _
+from sat.core.core_types import SatXMPPEntity
 from sat.core.log import getLogger
 from sat.tools.common import date_utils, uri
 from sat.memory.sqla_mapping import SubscriptionState
@@ -41,7 +42,7 @@
 from .constants import (
     NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
     AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
-    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING
+    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE
 )
 from .regex import RE_SIG_PARAM
 
@@ -149,6 +150,8 @@
                 # we can use directly the Announce object, as only the "id" field is
                 # needed
                 await self.apg.newAPDeleteItem(client, None, node, obj)
+            elif type_ == TYPE_LIKE:
+                await self.handleNewLikeItem(client, obj, True)
             else:
                 log.warning(f"Unmanaged undo type: {type_!r}")
 
@@ -339,6 +342,108 @@
             repeated=True
         )
 
+    async def handleNewLikeItem(
+        self,
+        client: SatXMPPEntity,
+        data: dict,
+        undo: bool = False,
+    ) -> None:
+        liked_ids = data.get("object")
+        if not liked_ids:
+            raise exceptions.DataError("object should be set")
+        elif isinstance(liked_ids, list):
+            try:
+                liked_ids = [o["id"] for o in liked_ids]
+            except (KeyError, TypeError):
+                raise exceptions.DataError(f"invalid object: {liked_ids!r}")
+        elif isinstance(liked_ids, dict):
+            obj_id = liked_ids.get("id")
+            if not obj_id or not isinstance(obj_id, str):
+                raise exceptions.DataError(f"invalid object: {liked_ids!r}")
+            liked_ids = [obj_id]
+        elif isinstance(liked_ids, str):
+            liked_ids = [liked_ids]
+
+        for liked_id in liked_ids:
+            if not self.apg.isLocalURL(liked_id):
+                log.debug(f"ignoring non local liked ID: {liked_id}")
+                continue
+            url_type, url_args = self.apg.parseAPURL(liked_id)
+            if url_type != TYPE_ITEM:
+                log.warning(f"unexpected local URL for liked item: {liked_id}")
+                continue
+            try:
+                account, item_id = url_args
+            except ValueError:
+                raise ValueError(f"invalid URL: {liked_id}")
+            author_jid, item_node = await self.apg.getJIDAndNode(account)
+            if item_node is None:
+                item_node = self.apg._m.namespace
+            attachment_node = self.apg._pa.getAttachmentNodeName(
+                author_jid, item_node, item_id
+            )
+            cached_node = await self.apg.host.memory.storage.getPubsubNode(
+                client,
+                author_jid,
+                attachment_node,
+                with_subscriptions=True,
+                create=True
+            )
+            found_items, __ = await self.apg.host.memory.storage.getItems(
+                cached_node, item_ids=[item_id]
+            )
+            if not found_items:
+                old_item_elt = None
+            else:
+                found_item = found_items[0]
+                old_item_elt = found_item.data
+
+            item_elt = self.apg._pa.applySetHandler(
+                client,
+                {"extra": {"noticed": not undo}},
+                old_item_elt,
+                [("noticed", self.apg._pa.namespace)]
+            )
+            # we reparse the element, as there can be other attachments
+            attachments_data = self.apg._pa.items2attachmentData(client, [item_elt])
+            # and we update the cache
+            await self.apg.host.memory.storage.cachePubsubItems(
+                client,
+                cached_node,
+                [item_elt],
+                attachments_data or [{}]
+            )
+
+            if self.apg.isVirtualJID(author_jid):
+                # the attachment is on t a virtual pubsub service (linking to an AP item),
+                # we notify all subscribers
+                for subscription in cached_node.subscriptions:
+                    if subscription.state != SubscriptionState.SUBSCRIBED:
+                        continue
+                    self.apg.pubsub_service.notifyPublish(
+                        author_jid,
+                        attachment_node,
+                        [(subscription.subscriber, None, [item_elt])]
+                    )
+            else:
+                # the attachment is on an XMPP item, we publish it to the attachment node
+                await self.apg._p.sendItems(
+                    client, author_jid, attachment_node, [item_elt]
+                )
+
+    async def handleLikeActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        client = await self.apg.getVirtualClient(signing_actor)
+        await self.handleNewLikeItem(client, data)
+
     async def APActorRequest(
         self,
         request: "HTTPRequest",
@@ -623,7 +728,7 @@
         )
         followers = []
         for subscriber in subscribers.keys():
-            if subscriber.host == self.apg.client.jid.userhost():
+            if self.apg.isVirtualJID(subscriber):
                 # the subscriber is an AP user subscribed with this gateway
                 ap_account = self.apg._e.unescape(subscriber.user)
             else:
@@ -660,7 +765,7 @@
         following = []
         for sub_dict in subscriptions:
             service = jid.JID(sub_dict["service"])
-            if service.host == self.apg.client.jid.userhost():
+            if self.apg.isVirtualJID(service):
                 # the subscription is to an AP actor with this gateway
                 ap_account = self.apg._e.unescape(service.user)
             else:
--- a/sat/plugins/plugin_pubsub_attachments.py	Thu Jul 21 18:05:20 2022 +0200
+++ b/sat/plugins/plugin_pubsub_attachments.py	Thu Jul 21 18:07:35 2022 +0200
@@ -61,7 +61,7 @@
         host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
         self.host = host
         self._p = host.plugins["XEP-0060"]
-        self.handlers = {}
+        self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
         host.trigger.add("XEP-0277_send", self.onMBSend)
         self.registerAttachmentHandler(
             "noticed", NS_PUBSUB_ATTACHMENTS, self.noticedGet, self.noticedSet
@@ -270,12 +270,73 @@
     ) -> None:
         client = self.host.getClient(profile_key)
         attachments = data_format.deserialise(attachments_s)  or {}
-        return defer.ensureDeferred(self.setAttachments( client, attachments))
+        return defer.ensureDeferred(self.setAttachments(client, attachments))
+
+    def applySetHandler(
+        self,
+        client: SatXMPPEntity,
+        attachments_data: dict,
+        item_elt: Optional[domish.Element],
+        handlers: Optional[List[Tuple[str, str]]] = None,
+        from_jid: Optional[jid.JID] = None,
+    ) -> domish.Element:
+        """Apply all ``set`` callbacks to an attachments item
+
+        @param attachments_data: data describing the attachments
+            ``extra`` key will be used, and created if not found
+        @param from_jid: jid of the author of the attachments
+            ``client.jid.userhostJID()`` will be used if not specified
+        @param item_elt: item containing an <attachments> element
+            will be modified in place
+            if None, a new element will be created
+        @param handlers: list of (name, namespace) of handlers to use.
+            if None, all registered handlers will be used.
+        @return: updated item_elt if given, otherwise a new item_elt
+        """
+        attachments_data.setdefault("extra", {})
+        if item_elt is None:
+            item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
+            item_elt = pubsub.Item(item_id)
+            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+
+        try:
+            attachments_elt = next(
+                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+            )
+        except StopIteration:
+            log.warning(
+                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
+            )
+            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+
+        if handlers is None:
+            handlers = list(self.handlers.keys())
+
+        for name, namespace in handlers:
+            try:
+                handler = self.handlers[(name, namespace)]
+            except KeyError:
+                log.error(
+                    f"unregistered handler ({name!r}, {namespace!r}) is requested, "
+                    "ignoring"
+                )
+                continue
+            try:
+                former_elt = next(attachments_elt.elements(namespace, name))
+            except StopIteration:
+                former_elt = None
+            new_elt = handler["set"](client, attachments_data, former_elt)
+            if new_elt != former_elt:
+                if former_elt is not None:
+                    attachments_elt.children.remove(former_elt)
+                if new_elt is not None:
+                    attachments_elt.addChild(new_elt)
+        return item_elt
 
     async def setAttachments(
         self,
         client: SatXMPPEntity,
-        data: Dict[str, Any]
+        attachments_data: Dict[str, Any]
     ) -> None:
         """Set or update attachments
 
@@ -287,51 +348,39 @@
         used in attachments where "update" makes sense (e.g. it's used for "reactions"
         but not for "noticed").
 
-        @param data: microblog data data. Various keys (usually stored in
-            data["extra"]) may be used depending on the attachments handlers
-            registered. The keys "service", "node" and "id" MUST be set.
+        @param attachments_data: data describing attachments. Various keys (usually stored
+            in attachments_data["extra"]) may be used depending on the attachments
+            handlers registered. The keys "service", "node" and "id" MUST be set.
+            ``attachments_data`` is thought to be compatible with microblog data.
+
         """
-        data.setdefault("extra", {})
         try:
-            service = jid.JID(data["service"])
-            node = data["node"]
-            item = data["id"]
+            service = jid.JID(attachments_data["service"])
+            node = attachments_data["node"]
+            item = attachments_data["id"]
         except (KeyError, RuntimeError):
             raise ValueError(
                 'data must have "service", "node" and "id" set'
             )
         attachment_node = self.getAttachmentNodeName(service, node, item)
-        items, __ = await self._p.getItems(
-            client, service, attachment_node, item_ids=[client.jid.userhost()]
-        )
-        if not items:
-            # the item doesn't exist, we create a new one
-            item_elt = pubsub.Item(client.jid.userhost())
-            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
-        else:
-            item_elt = items[0]
-
         try:
-            attachments_elt = next(
-                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+            items, __ = await self._p.getItems(
+                client, service, attachment_node, item_ids=[client.jid.userhost()]
             )
-        except StopIteration:
-            log.warning(
-                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
-            )
-            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+        except exceptions.NotFound:
+            item_elt = None
+        else:
+            if not items:
+                item_elt = None
+            else:
+                item_elt = items[0]
 
-        for (name, namespace), handler in self.handlers.items():
-            try:
-                former_elt = next(attachments_elt.elements(namespace, name))
-            except StopIteration:
-                former_elt = None
-            new_elt = handler["set"](client, data, former_elt)
-            if new_elt != former_elt:
-                if former_elt is not None:
-                    attachments_elt.children.remove(former_elt)
-                if new_elt is not None:
-                    attachments_elt.addChild(new_elt)
+        item_elt = self.applySetHandler(
+            client,
+            attachments_data,
+            item_elt=item_elt,
+        )
+
         try:
             await self._p.sendItems(client, service, attachment_node, [item_elt])
         except error.StanzaError as e: