Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3904:0aa7023dcd08
component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification of items published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account
- handle `Update` activity
- in `convertAndPostItems`, an `Update` activity is used instead of `Create` if a version of
the item is already present in the cache
- an `events` field is added to actor data (and to `endpoints`); it links to the `outbox` of
the actor which maps the same JID with the Events node (i.e. it links to the Events node of
the entity)
- fix subscription to nodes which are not the microblog one
rel 372
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Sep 2022 00:01:41 +0200 |
parents | aa7197b67c26 |
children | 78b5f356900c |
comparison
equal
deleted
inserted
replaced
3903:384b7e6c2dbf | 3904:0aa7023dcd08 |
---|---|
125 ) | 125 ) |
126 else: | 126 else: |
127 await self.apg.convertAndPostItems( | 127 await self.apg.convertAndPostItems( |
128 client, ap_account, service, nodeIdentifier, items | 128 client, ap_account, service, nodeIdentifier, items |
129 ) | 129 ) |
130 cached_node = await self.host.memory.storage.getPubsubNode( | |
131 client, service, nodeIdentifier, with_subscriptions=True, create=True | |
132 ) | |
133 await self.host.memory.storage.cachePubsubItems( | |
134 client, | |
135 cached_node, | |
136 items | |
137 ) | |
138 for subscription in cached_node.subscriptions: | |
139 if subscription.state != SubscriptionState.SUBSCRIBED: | |
140 continue | |
141 self.notifyPublish( | |
142 service, | |
143 nodeIdentifier, | |
144 [(subscription.subscriber, None, items)] | |
145 ) | |
130 | 146 |
131 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: | 147 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: |
132 """Convert actor ID from following collection to XMPP item""" | 148 """Convert actor ID from following collection to XMPP item""" |
133 actor_id = ap_item["id"] | 149 actor_id = ap_item["id"] |
134 actor_jid = await self.apg.getJIDFromId(actor_id) | 150 actor_jid = await self.apg.getJIDFromId(actor_id) |
332 client, service, node | 348 client, service, node |
333 ) | 349 ) |
334 if cached_node is None: | 350 if cached_node is None: |
335 raise error.StanzaError("item-not-found") | 351 raise error.StanzaError("item-not-found") |
336 else: | 352 else: |
337 if not node.startswith(self.apg._m.namespace): | 353 if node.startswith(self.apg._m.namespace): |
354 parser = self.apg.ap_item_2_mb_elt | |
355 elif node.startswith(self.apg._events.namespace): | |
356 parser = self.apg.ap_events.ap_item_2_event_elt | |
357 else: | |
338 raise error.StanzaError( | 358 raise error.StanzaError( |
339 "feature-not-implemented", | 359 "feature-not-implemented", |
340 text=f"AP Gateway {C.APP_VERSION} only supports " | 360 text=f"AP Gateway {C.APP_VERSION} only supports " |
341 f"{self.apg._m.namespace} node for now" | 361 f"{self.apg._m.namespace} node for now" |
342 ) | 362 ) |
343 collection_name = "outbox" | 363 collection_name = "outbox" |
344 parser = self.apg.apItem2Elt | |
345 use_cache = True | 364 use_cache = True |
346 | 365 |
347 if use_cache: | 366 if use_cache: |
348 if cached_node is None: | 367 if cached_node is None: |
349 cached_node = await self.host.memory.storage.getPubsubNode( | 368 cached_node = await self.host.memory.storage.getPubsubNode( |
431 | 450 |
432 @ensure_deferred | 451 @ensure_deferred |
433 async def subscribe(self, requestor, service, nodeIdentifier, subscriber): | 452 async def subscribe(self, requestor, service, nodeIdentifier, subscriber): |
434 # TODO: handle comments nodes | 453 # TODO: handle comments nodes |
435 client = self.apg.client | 454 client = self.apg.client |
455 # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow | |
456 # is accepted. Other nodes are directly set to subscribed, their subscriptions | |
457 # being internal. | |
458 if nodeIdentifier == self.apg._m.namespace: | |
459 sub_state = SubscriptionState.PENDING | |
460 else: | |
461 sub_state = SubscriptionState.SUBSCRIBED | |
436 node = await self.host.memory.storage.getPubsubNode( | 462 node = await self.host.memory.storage.getPubsubNode( |
437 client, service, nodeIdentifier, with_subscriptions=True | 463 client, service, nodeIdentifier, with_subscriptions=True |
438 ) | 464 ) |
439 if node is None: | 465 if node is None: |
440 node = await self.host.memory.storage.setPubsubNode( | 466 node = await self.host.memory.storage.setPubsubNode( |
453 subscription = None | 479 subscription = None |
454 | 480 |
455 if subscription is None: | 481 if subscription is None: |
456 subscription = PubsubSub( | 482 subscription = PubsubSub( |
457 subscriber=requestor.userhostJID(), | 483 subscriber=requestor.userhostJID(), |
458 state=SubscriptionState.PENDING | 484 state=sub_state |
459 ) | 485 ) |
460 node.subscriptions.append(subscription) | 486 node.subscriptions.append(subscription) |
461 await self.host.memory.storage.add(node) | 487 await self.host.memory.storage.add(node) |
462 else: | 488 else: |
463 if subscription.state is None: | 489 if subscription.state is None: |
464 subscription.state = SubscriptionState.PENDING | 490 subscription.state = sub_state |
465 await self.host.memory.storage.add(node) | 491 await self.host.memory.storage.add(node) |
466 elif subscription.state == SubscriptionState.SUBSCRIBED: | 492 elif subscription.state == SubscriptionState.SUBSCRIBED: |
467 log.info( | 493 log.info( |
468 f"{requestor.userhostJID()} has already a subscription to {node!r} " | 494 f"{requestor.userhostJID()} has already a subscription to {node!r} " |
469 f"at {service}. Doing the request anyway." | 495 f"at {service}. Doing the request anyway." |
471 elif subscription.state == SubscriptionState.PENDING: | 497 elif subscription.state == SubscriptionState.PENDING: |
472 log.info( | 498 log.info( |
473 f"{requestor.userhostJID()} has already a pending subscription to " | 499 f"{requestor.userhostJID()} has already a pending subscription to " |
474 f"{node!r} at {service}. Doing the request anyway." | 500 f"{node!r} at {service}. Doing the request anyway." |
475 ) | 501 ) |
502 if sub_state != SubscriptionState.PENDING: | |
503 subscription.state = sub_state | |
504 await self.host.memory.storage.add(node) | |
476 else: | 505 else: |
477 raise exceptions.InternalError( | 506 raise exceptions.InternalError( |
478 f"unmanaged subscription state: {subscription.state}" | 507 f"unmanaged subscription state: {subscription.state}" |
479 ) | 508 ) |
480 | 509 |
481 if nodeIdentifier == self.apg._m.namespace: | 510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): |
482 # if we subscribe to microblog node, we follow the corresponding account | 511 # if we subscribe to microblog or events node, we follow the corresponding |
512 # account | |
483 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | 513 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( |
484 requestor, service | 514 requestor, service |
485 ) | 515 ) |
486 | 516 |
487 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) | 517 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) |
488 | 518 |
489 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | 519 resp = await self.apg.signAndPost(inbox, req_actor_id, data) |
490 if resp.code >= 300: | 520 if resp.code >= 300: |
491 text = await resp.text() | 521 text = await resp.text() |
492 raise error.StanzaError("service-unavailable", text=text) | 522 raise error.StanzaError("service-unavailable", text=text) |
493 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") | 523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") |
494 | 524 |
495 @ensure_deferred | 525 @ensure_deferred |
496 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): | 526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): |
497 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | 527 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( |
498 requestor, service | 528 requestor, service |