comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3904:0aa7023dcd08

component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification on item published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP JID/Node to virtual AP Account
- handle `Update` activity
- on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache
- `events` field is added to actor data (and to `endpoints`); it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity)
- fix subscription to nodes which are not the microblog one
rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents aa7197b67c26
children 78b5f356900c
comparison
equal deleted inserted replaced
3903:384b7e6c2dbf 3904:0aa7023dcd08
125 ) 125 )
126 else: 126 else:
127 await self.apg.convertAndPostItems( 127 await self.apg.convertAndPostItems(
128 client, ap_account, service, nodeIdentifier, items 128 client, ap_account, service, nodeIdentifier, items
129 ) 129 )
130 cached_node = await self.host.memory.storage.getPubsubNode(
131 client, service, nodeIdentifier, with_subscriptions=True, create=True
132 )
133 await self.host.memory.storage.cachePubsubItems(
134 client,
135 cached_node,
136 items
137 )
138 for subscription in cached_node.subscriptions:
139 if subscription.state != SubscriptionState.SUBSCRIBED:
140 continue
141 self.notifyPublish(
142 service,
143 nodeIdentifier,
144 [(subscription.subscriber, None, items)]
145 )
130 146
131 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: 147 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
132 """Convert actor ID from following collection to XMPP item""" 148 """Convert actor ID from following collection to XMPP item"""
133 actor_id = ap_item["id"] 149 actor_id = ap_item["id"]
134 actor_jid = await self.apg.getJIDFromId(actor_id) 150 actor_jid = await self.apg.getJIDFromId(actor_id)
332 client, service, node 348 client, service, node
333 ) 349 )
334 if cached_node is None: 350 if cached_node is None:
335 raise error.StanzaError("item-not-found") 351 raise error.StanzaError("item-not-found")
336 else: 352 else:
337 if not node.startswith(self.apg._m.namespace): 353 if node.startswith(self.apg._m.namespace):
354 parser = self.apg.ap_item_2_mb_elt
355 elif node.startswith(self.apg._events.namespace):
356 parser = self.apg.ap_events.ap_item_2_event_elt
357 else:
338 raise error.StanzaError( 358 raise error.StanzaError(
339 "feature-not-implemented", 359 "feature-not-implemented",
340 text=f"AP Gateway {C.APP_VERSION} only supports " 360 text=f"AP Gateway {C.APP_VERSION} only supports "
341 f"{self.apg._m.namespace} node for now" 361 f"{self.apg._m.namespace} node for now"
342 ) 362 )
343 collection_name = "outbox" 363 collection_name = "outbox"
344 parser = self.apg.apItem2Elt
345 use_cache = True 364 use_cache = True
346 365
347 if use_cache: 366 if use_cache:
348 if cached_node is None: 367 if cached_node is None:
349 cached_node = await self.host.memory.storage.getPubsubNode( 368 cached_node = await self.host.memory.storage.getPubsubNode(
431 450
432 @ensure_deferred 451 @ensure_deferred
433 async def subscribe(self, requestor, service, nodeIdentifier, subscriber): 452 async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
434 # TODO: handle comments nodes 453 # TODO: handle comments nodes
435 client = self.apg.client 454 client = self.apg.client
455 # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
456 # is accepted. Other nodes are directly set to subscribed, their subscriptions
457 # being internal.
458 if nodeIdentifier == self.apg._m.namespace:
459 sub_state = SubscriptionState.PENDING
460 else:
461 sub_state = SubscriptionState.SUBSCRIBED
436 node = await self.host.memory.storage.getPubsubNode( 462 node = await self.host.memory.storage.getPubsubNode(
437 client, service, nodeIdentifier, with_subscriptions=True 463 client, service, nodeIdentifier, with_subscriptions=True
438 ) 464 )
439 if node is None: 465 if node is None:
440 node = await self.host.memory.storage.setPubsubNode( 466 node = await self.host.memory.storage.setPubsubNode(
453 subscription = None 479 subscription = None
454 480
455 if subscription is None: 481 if subscription is None:
456 subscription = PubsubSub( 482 subscription = PubsubSub(
457 subscriber=requestor.userhostJID(), 483 subscriber=requestor.userhostJID(),
458 state=SubscriptionState.PENDING 484 state=sub_state
459 ) 485 )
460 node.subscriptions.append(subscription) 486 node.subscriptions.append(subscription)
461 await self.host.memory.storage.add(node) 487 await self.host.memory.storage.add(node)
462 else: 488 else:
463 if subscription.state is None: 489 if subscription.state is None:
464 subscription.state = SubscriptionState.PENDING 490 subscription.state = sub_state
465 await self.host.memory.storage.add(node) 491 await self.host.memory.storage.add(node)
466 elif subscription.state == SubscriptionState.SUBSCRIBED: 492 elif subscription.state == SubscriptionState.SUBSCRIBED:
467 log.info( 493 log.info(
468 f"{requestor.userhostJID()} has already a subscription to {node!r} " 494 f"{requestor.userhostJID()} has already a subscription to {node!r} "
469 f"at {service}. Doing the request anyway." 495 f"at {service}. Doing the request anyway."
471 elif subscription.state == SubscriptionState.PENDING: 497 elif subscription.state == SubscriptionState.PENDING:
472 log.info( 498 log.info(
473 f"{requestor.userhostJID()} has already a pending subscription to " 499 f"{requestor.userhostJID()} has already a pending subscription to "
474 f"{node!r} at {service}. Doing the request anyway." 500 f"{node!r} at {service}. Doing the request anyway."
475 ) 501 )
502 if sub_state != SubscriptionState.PENDING:
503 subscription.state = sub_state
504 await self.host.memory.storage.add(node)
476 else: 505 else:
477 raise exceptions.InternalError( 506 raise exceptions.InternalError(
478 f"unmanaged subscription state: {subscription.state}" 507 f"unmanaged subscription state: {subscription.state}"
479 ) 508 )
480 509
481 if nodeIdentifier == self.apg._m.namespace: 510 if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
482 # if we subscribe to microblog node, we follow the corresponding account 511 # if we subscribe to microblog or events node, we follow the corresponding
512 # account
483 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( 513 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
484 requestor, service 514 requestor, service
485 ) 515 )
486 516
487 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) 517 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
488 518
489 resp = await self.apg.signAndPost(inbox, req_actor_id, data) 519 resp = await self.apg.signAndPost(inbox, req_actor_id, data)
490 if resp.code >= 300: 520 if resp.code >= 300:
491 text = await resp.text() 521 text = await resp.text()
492 raise error.StanzaError("service-unavailable", text=text) 522 raise error.StanzaError("service-unavailable", text=text)
493 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") 523 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
494 524
495 @ensure_deferred 525 @ensure_deferred
496 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): 526 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
497 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( 527 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
498 requestor, service 528 requestor, service