diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers: this patch implements those major features: - `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish to AP using this service - replies to XMPP items are managed - `inReplyTo` is filled when converting XMPP items to AP objects - `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public. - `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions. - new helper methods to check if an URL is local and to get JID from actor ID doc will follow to explain behaviour rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children 865167c34b82
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Fri May 13 19:12:33 2022 +0200
@@ -16,12 +16,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional, Tuple, List, Dict, Any
+from typing import Optional, Tuple, List, Union
 
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
-from wokkel import rsm, pubsub, data_form
+from wokkel import rsm, pubsub, disco
 
 from sat.core.i18n import _
 from sat.core import exceptions
@@ -92,7 +92,45 @@
 
     @ensure_deferred
     async def publish(self, requestor, service, nodeIdentifier, items):
-        raise NotImplementedError
+        if self.apg.local_only and not self.apg.isLocal(requestor):
+            raise error.StanzaError(
+                "forbidden",
+                "Only local users can publish on this gateway."
+            )
+        if not service.user:
+            raise error.StanzaError(
+                "bad-request",
+                "You must specify an ActivityPub actor account in JID user part."
+            )
+        ap_account = self.apg._e.unescape(service.user)
+        if ap_account.count("@") != 1:
+            raise error.StanzaError(
+                "bad-request",
+                f"{ap_account!r} is not a valid ActivityPub actor account."
+            )
+
+        client = self.apg.client.getVirtualClient(requestor)
+        await self.apg.convertAndPostItems(
+            client, ap_account, service, nodeIdentifier, items
+        )
+
+    async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from following collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.getJIDFromId(actor_id)
+        subscription_elt = self.apg._pps.buildSubscriptionElt(
+            self.apg._m.namespace, actor_jid
+        )
+        item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
+        return item_elt
+
+    async def apFollower2Elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from followers collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.getJIDFromId(actor_id)
+        subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid)
+        item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
+        return item_elt
 
     @ensure_deferred
     async def items(
@@ -110,44 +148,63 @@
         if ap_account.count("@") != 1:
             log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
             return [], None
-        if not node.startswith(self.apg._m.namespace):
-            raise error.StanzaError(
-                "feature-not-implemented",
-                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
-                "node for now"
-            )
+
+        kwargs = {}
+
+        if node == self.apg._pps.subscriptions_node:
+            collection_name = "following"
+            parser = self.apFollowing2Elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        elif node.startswith(self.apg._pps.subscribers_node_prefix):
+            collection_name = "followers"
+            parser = self.apFollower2Elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        else:
+            if not node.startswith(self.apg._m.namespace):
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text=f"AP Gateway {C.APP_VERSION} only supports "
+                    f"{self.apg._m.namespace} node for now"
+                )
+            collection_name = "outbox"
+            parser = self.apg.apItem2Elt
+            use_cache = True
+
         client = self.apg.client
-        cached_node = await self.host.memory.storage.getPubsubNode(
-            client, service, node
-        )
-        # TODO: check if node is synchronised
-        if cached_node is not None:
-            # the node is cached, we return items from cache
-            log.debug(f"node {node!r} from {service} is in cache")
-            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
-                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+        if use_cache:
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node
             )
-            try:
-                rsm_resp = rsm.RSMResponse(**metadata["rsm"])
-            except KeyError:
-                rsm_resp = None
-            return [i.data for i in pubsub_items], rsm_resp
+            # TODO: check if node is synchronised
+            if cached_node is not None:
+                # the node is cached, we return items from cache
+                log.debug(f"node {node!r} from {service} is in cache")
+                pubsub_items, metadata = await self.apg._c.getItemsFromCache(
+                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+                )
+                try:
+                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
+                except KeyError:
+                    rsm_resp = None
+                return [i.data for i in pubsub_items], rsm_resp
 
         if itemIdentifiers:
             items = []
             for item_id in itemIdentifiers:
                 item_data = await self.apg.apGet(item_id)
-                item_elt = await self.apg.apItem2Elt(item_data)
+                item_elt = await parser(item_data)
                 items.append(item_elt)
             return items, None
         else:
             if rsm_req is None:
                 if maxItems is None:
                     maxItems = 20
-                kwargs = {
+                kwargs.update({
                     "max_items": maxItems,
                     "chronological_pagination": False,
-                }
+                })
             else:
                 if len(
                     [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
@@ -157,7 +214,7 @@
                         "bad-request",
                         text="You can't use after, before and index at the same time"
                     )
-                kwargs = {"max_items": rsm_req.max}
+                kwargs.update({"max_items": rsm_req.max})
                 if rsm_req.after is not None:
                     kwargs["after_id"] = rsm_req.after
                 elif rsm_req.before is not None:
@@ -171,10 +228,10 @@
                 f"No cache found for node {node} at {service} (AP account {ap_account}), "
                 "using Collection Paging to RSM translation"
             )
-            if self.apg._m.isCommentsNode(node):
-                parent_node = self.apg._m.getParentNode(node)
+            if self.apg._m.isCommentNode(node):
+                parent_item = self.apg._m.getParentItem(node)
                 try:
-                    parent_data = await self.apg.apGet(parent_node)
+                    parent_data = await self.apg.apGet(parent_item)
                     collection = await self.apg.apGetObject(
                         parent_data.get("object", {}),
                         "replies"
@@ -186,12 +243,14 @@
                     )
             else:
                 actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
-                collection = await self.apg.apGetObject(actor_data, "outbox")
+                collection = await self.apg.apGetObject(actor_data, collection_name)
             if not collection:
                 raise error.StanzaError(
                     "item-not-found",
                     text=f"No collection found for node {node!r} (account: {ap_account})"
                 )
+
+            kwargs["parser"] = parser
             return await self.apg.getAPItems(collection, **kwargs)
 
     @ensure_deferred