Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3904:0aa7023dcd08
component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification on item published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account
- handle `Update` activity
- on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of
the item is already present in cache
- `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the
actor mapping the same JID with the Events node (i.e. it links to the Events node of the
entity)
- fix subscription to nodes which are not the microblog one
rel 372
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Sep 2022 00:01:41 +0200 |
parents | aa7197b67c26 |
children | 78b5f356900c |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Thu Sep 22 00:01:41 2022 +0200 @@ -127,6 +127,22 @@ await self.apg.convertAndPostItems( client, ap_account, service, nodeIdentifier, items ) + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, nodeIdentifier, with_subscriptions=True, create=True + ) + await self.host.memory.storage.cachePubsubItems( + client, + cached_node, + items + ) + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.notifyPublish( + service, + nodeIdentifier, + [(subscription.subscriber, None, items)] + ) async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from following collection to XMPP item""" @@ -334,14 +350,17 @@ if cached_node is None: raise error.StanzaError("item-not-found") else: - if not node.startswith(self.apg._m.namespace): + if node.startswith(self.apg._m.namespace): + parser = self.apg.ap_item_2_mb_elt + elif node.startswith(self.apg._events.namespace): + parser = self.apg.ap_events.ap_item_2_event_elt + else: raise error.StanzaError( "feature-not-implemented", text=f"AP Gateway {C.APP_VERSION} only supports " f"{self.apg._m.namespace} node for now" ) collection_name = "outbox" - parser = self.apg.apItem2Elt use_cache = True if use_cache: @@ -433,6 +452,13 @@ async def subscribe(self, requestor, service, nodeIdentifier, subscriber): # TODO: handle comments nodes client = self.apg.client + # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow + # is accepted. Other nodes are directly set to subscribed, their subscriptions + # being internal. + if nodeIdentifier == self.apg._m.namespace: + sub_state = SubscriptionState.PENDING + else: + sub_state = SubscriptionState.SUBSCRIBED node = await self.host.memory.storage.getPubsubNode( client, service, nodeIdentifier, with_subscriptions=True ) @@ -455,13 +481,13 @@ if subscription is None: subscription = PubsubSub( subscriber=requestor.userhostJID(), - state=SubscriptionState.PENDING + state=sub_state ) node.subscriptions.append(subscription) await self.host.memory.storage.add(node) else: if subscription.state is None: - subscription.state = SubscriptionState.PENDING + subscription.state = sub_state await self.host.memory.storage.add(node) elif subscription.state == SubscriptionState.SUBSCRIBED: log.info( @@ -473,13 +499,17 @@ f"{requestor.userhostJID()} has already a pending subscription to " f"{node!r} at {service}. Doing the request anyway." ) + if sub_state != SubscriptionState.PENDING: + subscription.state = sub_state + await self.host.memory.storage.add(node) else: raise exceptions.InternalError( f"unmanaged subscription state: {subscription.state}" ) - if nodeIdentifier == self.apg._m.namespace: - # if we subscribe to microblog node, we follow the corresponding account + if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): + # if we subscribe to microblog or events node, we follow the corresponding + # account req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) @@ -490,7 +520,7 @@ if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) - return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") + return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") @ensure_deferred async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):