diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management: - HTTP signature is checked for incoming messages - AP actor can now be followed using pubsub subscription. When following is accepted, the node is cached - replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation will come to explain this). ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children 125c7043b277
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Tue Mar 22 17:00:42 2022 +0100
@@ -16,19 +16,41 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional, List
+from typing import Optional, Tuple, List, Dict, Any
 
+from twisted.internet import defer
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
-from wokkel import rsm
+from wokkel import rsm, pubsub, data_form
 
 from sat.core.i18n import _
+from sat.core import exceptions
 from sat.core.log import getLogger
+from sat.core.constants import Const as C
 from sat.tools.utils import ensure_deferred
+from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
+
+from .constants import (
+    TYPE_ACTOR,
+)
 
 
 log = getLogger(__name__)
 
+# all nodes have the same config
+NODE_CONFIG = [
+    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+    {"var": "pubsub#max_items", "value": "max"},
+    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+
+]
+
+NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
+NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
+for c in NODE_CONFIG:
+    NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")})
+
 
 class APPubsubService(rsm.PubSubService):
     """Pubsub service for XMPP requests"""
@@ -43,6 +65,31 @@
             "name": "Libervia ActivityPub Gateway",
         }
 
+    async def getAPActorIdsAndInbox(
+        self,
+        requestor: jid.JID,
+        recipient: jid.JID,
+    ) -> Tuple[str, str, str]:
+        """Get AP actor IDs from requestor and destinee JIDs
+
+        @param requestor: XMPP entity doing a request to an AP actor via the gateway
+        @param recipient: JID mapping an AP actor via the gateway
+        @return: requestor actor ID, recipient actor ID and recipient inbox
+        @raise error.StanzaError: "item-not-found" is raised if not user part is specified
+            in requestor
+        """
+        if not recipient.user:
+            raise error.StanzaError(
+                "item-not-found",
+                text="No user part specified"
+            )
+        requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
+        recipient_account = self.apg._e.unescape(recipient.user)
+        recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account)
+        inbox = await self.apg.getAPInboxFromId(recipient_actor_id)
+        return requestor_actor_id, recipient_actor_id, inbox
+
+
     @ensure_deferred
     async def publish(self, requestor, service, nodeIdentifier, items):
         raise NotImplementedError
@@ -56,55 +103,193 @@
         maxItems: Optional[int],
         itemIdentifiers: Optional[List[str]],
         rsm_req: Optional[rsm.RSMRequest]
-    ) -> List[domish.Element]:
+    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
         if not service.user:
-            return []
+            return [], None
         ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
         if ap_account.count("@") != 1:
             log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
-            return []
-        if node != self.apg._m.namespace:
+            return [], None
+        if not node.startswith(self.apg._m.namespace):
             raise error.StanzaError(
                 "feature-not-implemented",
-                text=f"{VERSION} only supports {self.apg._m.namespace} "
+                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
                 "node for now"
             )
-        if rsm_req is None:
-            if maxItems is None:
-                maxItems = 20
-            kwargs = {
-                "max_items": maxItems,
-                "chronological_pagination": False,
-            }
+        client = self.apg.client
+        cached_node = await self.host.memory.storage.getPubsubNode(
+            client, service, node
+        )
+        # TODO: check if node is synchronised
+        if cached_node is not None:
+            # the node is cached, we return items from cache
+            log.debug(f"node {node!r} from {service} is in cache")
+            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
+                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+            )
+            try:
+                rsm_resp = rsm.RSMResponse(**metadata["rsm"])
+            except KeyError:
+                rsm_resp = None
+            return [i.data for i in pubsub_items], rsm_resp
+
+        if itemIdentifiers:
+            items = []
+            for item_id in itemIdentifiers:
+                item_data = await self.apg.apGet(item_id)
+                item_elt = await self.apg.apItem2Elt(item_data)
+                items.append(item_elt)
+            return items, None
         else:
-            if len(
-                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
-                 if v is not None]
-            ) > 1:
+            if rsm_req is None:
+                if maxItems is None:
+                    maxItems = 20
+                kwargs = {
+                    "max_items": maxItems,
+                    "chronological_pagination": False,
+                }
+            else:
+                if len(
+                    [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                     if v is not None]
+                ) > 1:
+                    raise error.StanzaError(
+                        "bad-request",
+                        text="You can't use after, before and index at the same time"
+                    )
+                kwargs = {"max_items": rsm_req.max}
+                if rsm_req.after is not None:
+                    kwargs["after_id"] = rsm_req.after
+                elif rsm_req.before is not None:
+                    kwargs["chronological_pagination"] = False
+                    if rsm_req.before != "":
+                        kwargs["after_id"] = rsm_req.before
+                elif rsm_req.index is not None:
+                    kwargs["start_index"] = rsm_req.index
+
+            log.info(
+                f"No cache found for node {node} at {service} (AP account {ap_account}), "
+                "using Collection Paging to RSM translation"
+            )
+            if self.apg._m.isCommentsNode(node):
+                parent_node = self.apg._m.getParentNode(node)
+                try:
+                    parent_data = await self.apg.apGet(parent_node)
+                    collection = await self.apg.apGetObject(
+                        parent_data.get("object", {}),
+                        "replies"
+                    )
+                except Exception as e:
+                    raise error.StanzaError(
+                        "item-not-found",
+                        text=e
+                    )
+            else:
+                actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
+                collection = await self.apg.apGetObject(actor_data, "outbox")
+            if not collection:
                 raise error.StanzaError(
-                    "bad-request",
-                    text="You can't use after, before and index at the same time"
+                    "item-not-found",
+                    text=f"No collection found for node {node!r} (account: {ap_account})"
                 )
-            kwargs = {"max_items": rsm_req.max}
-            if rsm_req.after is not None:
-                kwargs["after_id"] = rsm_req.after
-            elif rsm_req.before is not None:
-                kwargs["chronological_pagination"] = False
-                if rsm_req.before != "":
-                    kwargs["after_id"] = rsm_req.before
-            elif rsm_req.index is not None:
-                kwargs["start_index"] = rsm_req.index
-
-        log.info(
-            f"No cache found for node {node} at {service} (AP account {ap_account}), "
-            "using Collection Paging to RSM translation"
-        )
-        return await self.apg.getAPItems(ap_account, **kwargs)
+            return await self.apg.getAPItems(collection, **kwargs)
 
     @ensure_deferred
     async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
         raise NotImplementedError
 
+    @ensure_deferred
+    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
+        # TODO: handle comments nodes
+        client = self.apg.client
+        node = await self.host.memory.storage.getPubsubNode(
+            client, service, nodeIdentifier, with_subscriptions=True
+        )
+        if node is None:
+            node = await self.host.memory.storage.setPubsubNode(
+                client,
+                service,
+                nodeIdentifier,
+            )
+            subscription = None
+        else:
+            try:
+                subscription = next(
+                    s for s in node.subscriptions
+                    if s.subscriber == requestor.userhostJID()
+                )
+            except StopIteration:
+                subscription = None
+
+        if subscription is None:
+            subscription = PubsubSub(
+                subscriber=requestor.userhostJID(),
+                state=SubscriptionState.PENDING
+            )
+            node.subscriptions.append(subscription)
+            await self.host.memory.storage.add(node)
+        else:
+            if subscription.state is None:
+                subscription.state = SubscriptionState.PENDING
+                await self.host.memory.storage.add(node)
+            elif subscription.state == SubscriptionState.SUBSCRIBED:
+                log.info(
+                    f"{requestor.userhostJID()} has already a subscription to {node!r} "
+                    f"at {service}. Doing the request anyway."
+                )
+            elif subscription.state == SubscriptionState.PENDING:
+                log.info(
+                    f"{requestor.userhostJID()} has already a pending subscription to "
+                    f"{node!r} at {service}. Doing the request anyway."
+                )
+            else:
+                raise exceptions.InternalError(
+                    f"unmanaged subscription state: {subscription.state}"
+                )
+
+        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
+            requestor, service
+        )
+
+        data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
+
+        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
+        if resp.code >= 400:
+            text = await resp.text()
+            raise error.StanzaError("service-unavailable", text=text)
+        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
+
+    @ensure_deferred
+    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
+        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
+            requestor, service
+        )
+        data = self.apg.createActivity(
+            "Undo",
+            req_actor_id,
+            self.apg.createActivity(
+                "Follow",
+                req_actor_id,
+                recip_actor_id
+            )
+        )
+
+        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
+        if resp.code >= 400:
+            text = await resp.text()
+            raise error.StanzaError("service-unavailable", text=text)
+
+    def getConfigurationOptions(self):
+        return NODE_OPTIONS
+
+    def getConfiguration(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str
+    ) -> defer.Deferred:
+        return defer.succeed(NODE_CONFIG_VALUES)
+
     def getNodeInfo(
         self,
         requestor: jid.JID,
@@ -117,13 +302,6 @@
             return None
         info = {
             "type": "leaf",
-            "meta-data": [
-                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
-                {"var": "pubsub#max_items", "value": "max"},
-                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
-                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
-
-            ]
-
+            "meta-data": NODE_CONFIG
         }
         return info