diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3904:0aa7023dcd08

component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents aa7197b67c26
children 78b5f356900c
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Thu Sep 22 00:01:41 2022 +0200
@@ -127,6 +127,22 @@
             await self.apg.convertAndPostItems(
                 client, ap_account, service, nodeIdentifier, items
             )
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, nodeIdentifier, with_subscriptions=True, create=True
+            )
+            await self.host.memory.storage.cachePubsubItems(
+                client,
+                cached_node,
+                items
+            )
+            for subscription in cached_node.subscriptions:
+                if subscription.state != SubscriptionState.SUBSCRIBED:
+                    continue
+                self.notifyPublish(
+                    service,
+                    nodeIdentifier,
+                    [(subscription.subscriber, None, items)]
+                )
 
     async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
         """Convert actor ID from following collection to XMPP item"""
@@ -334,14 +350,17 @@
             if cached_node is None:
                 raise error.StanzaError("item-not-found")
         else:
-            if not node.startswith(self.apg._m.namespace):
+            if node.startswith(self.apg._m.namespace):
+                parser = self.apg.ap_item_2_mb_elt
+            elif node.startswith(self.apg._events.namespace):
+                parser = self.apg.ap_events.ap_item_2_event_elt
+            else:
                 raise error.StanzaError(
                     "feature-not-implemented",
                     text=f"AP Gateway {C.APP_VERSION} only supports "
                     f"{self.apg._m.namespace} node for now"
                 )
             collection_name = "outbox"
-            parser = self.apg.apItem2Elt
             use_cache = True
 
         if use_cache:
@@ -433,6 +452,13 @@
     async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
         # TODO: handle comments nodes
         client = self.apg.client
+        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
+        # is accepted. Other nodes are directly set to subscribed, their subscriptions
+        # being internal.
+        if nodeIdentifier == self.apg._m.namespace:
+            sub_state = SubscriptionState.PENDING
+        else:
+            sub_state = SubscriptionState.SUBSCRIBED
         node = await self.host.memory.storage.getPubsubNode(
             client, service, nodeIdentifier, with_subscriptions=True
         )
@@ -455,13 +481,13 @@
         if subscription is None:
             subscription = PubsubSub(
                 subscriber=requestor.userhostJID(),
-                state=SubscriptionState.PENDING
+                state=sub_state
             )
             node.subscriptions.append(subscription)
             await self.host.memory.storage.add(node)
         else:
             if subscription.state is None:
-                subscription.state = SubscriptionState.PENDING
+                subscription.state = sub_state
                 await self.host.memory.storage.add(node)
             elif subscription.state == SubscriptionState.SUBSCRIBED:
                 log.info(
@@ -473,13 +499,17 @@
                     f"{requestor.userhostJID()} has already a pending subscription to "
                     f"{node!r} at {service}. Doing the request anyway."
                 )
+                if sub_state != SubscriptionState.PENDING:
+                    subscription.state = sub_state
+                    await self.host.memory.storage.add(node)
             else:
                 raise exceptions.InternalError(
                     f"unmanaged subscription state: {subscription.state}"
                 )
 
-        if nodeIdentifier == self.apg._m.namespace:
-            # if we subscribe to microblog node, we follow the corresponding account
+        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
+            # if we subscribe to microblog or events node, we follow the corresponding
+            # account
             req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
                 requestor, service
             )
@@ -490,7 +520,7 @@
             if resp.code >= 300:
                 text = await resp.text()
                 raise error.StanzaError("service-unavailable", text=text)
-            return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
+        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
 
     @ensure_deferred
     async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):