comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3865:59fbb66b2923

component AP gateway: handle XMPP attachments -> AP likes conversion: convert `noticed` attachments coming from XMPP into suitable `Like` activities. A virtual node is created in cache when necessary, and virtual items published to the virtual JIDs mapping AP accounts are cached too. rel 370
author Goffi <goffi@goffi.org>
date Wed, 20 Jul 2022 17:53:12 +0200
parents 6329ee6b6df4
children 37d2c0282304
comparison
equal deleted inserted replaced
3864:ac255a0fbd4c 3865:59fbb66b2923
117 "bad-request", 117 "bad-request",
118 f"{ap_account!r} is not a valid ActivityPub actor account." 118 f"{ap_account!r} is not a valid ActivityPub actor account."
119 ) 119 )
120 120
121 client = self.apg.client.getVirtualClient(requestor) 121 client = self.apg.client.getVirtualClient(requestor)
122 await self.apg.convertAndPostItems( 122 if self.apg._pa.isAttachmentNode(nodeIdentifier):
123 client, ap_account, service, nodeIdentifier, items 123 await self.apg.convertAndPostAttachments(
124 ) 124 client, ap_account, service, nodeIdentifier, items, publisher=requestor
125 )
126 else:
127 await self.apg.convertAndPostItems(
128 client, ap_account, service, nodeIdentifier, items
129 )
125 130
126 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: 131 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
127 """Convert actor ID from following collection to XMPP item""" 132 """Convert actor ID from following collection to XMPP item"""
128 actor_id = ap_item["id"] 133 actor_id = ap_item["id"]
129 actor_jid = await self.apg.getJIDFromId(actor_id) 134 actor_jid = await self.apg.getJIDFromId(actor_id)
289 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) 294 ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
290 if ap_account.count("@") != 1: 295 if ap_account.count("@") != 1:
291 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") 296 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
292 return [], None 297 return [], None
293 298
299 # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
300 # otherwise it is filled when suitable
301 cached_node = None
302 client = self.apg.client
294 kwargs = {} 303 kwargs = {}
295 304
296 if node == self.apg._pps.subscriptions_node: 305 if node == self.apg._pps.subscriptions_node:
297 collection_name = "following" 306 collection_name = "following"
298 parser = self.apFollowing2Elt 307 parser = self.apFollowing2Elt
313 elif node == self.apg._a.namespace_data: 322 elif node == self.apg._a.namespace_data:
314 item_elt = await self.generateAvatarData( 323 item_elt = await self.generateAvatarData(
315 self.apg.client, ap_account, itemIdentifiers 324 self.apg.client, ap_account, itemIdentifiers
316 ) 325 )
317 return [item_elt], None 326 return [item_elt], None
327 elif self.apg._pa.isAttachmentNode(node):
328 use_cache = True
329 # we check cache here because we emit an item-not-found error if the node is
330 # not in cache, as we are not dealing with real AP items
331 cached_node = await self.host.memory.storage.getPubsubNode(
332 client, service, node
333 )
334 if cached_node is None:
335 raise error.StanzaError("item-not-found")
318 else: 336 else:
319 if not node.startswith(self.apg._m.namespace): 337 if not node.startswith(self.apg._m.namespace):
320 raise error.StanzaError( 338 raise error.StanzaError(
321 "feature-not-implemented", 339 "feature-not-implemented",
322 text=f"AP Gateway {C.APP_VERSION} only supports " 340 text=f"AP Gateway {C.APP_VERSION} only supports "
324 ) 342 )
325 collection_name = "outbox" 343 collection_name = "outbox"
326 parser = self.apg.apItem2Elt 344 parser = self.apg.apItem2Elt
327 use_cache = True 345 use_cache = True
328 346
329 client = self.apg.client
330 if use_cache: 347 if use_cache:
331 cached_node = await self.host.memory.storage.getPubsubNode( 348 if cached_node is None:
332 client, service, node 349 cached_node = await self.host.memory.storage.getPubsubNode(
333 ) 350 client, service, node
351 )
334 # TODO: check if node is synchronised 352 # TODO: check if node is synchronised
335 if cached_node is not None: 353 if cached_node is not None:
336 # the node is cached, we return items from cache 354 # the node is cached, we return items from cache
337 log.debug(f"node {node!r} from {service} is in cache") 355 log.debug(f"node {node!r} from {service} is in cache")
338 pubsub_items, metadata = await self.apg._c.getItemsFromCache( 356 pubsub_items, metadata = await self.apg._c.getItemsFromCache(
465 ) 483 )
466 484
467 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) 485 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
468 486
469 resp = await self.apg.signAndPost(inbox, req_actor_id, data) 487 resp = await self.apg.signAndPost(inbox, req_actor_id, data)
470 if resp.code >= 400: 488 if resp.code >= 300:
471 text = await resp.text() 489 text = await resp.text()
472 raise error.StanzaError("service-unavailable", text=text) 490 raise error.StanzaError("service-unavailable", text=text)
473 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") 491 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
474 492
475 @ensure_deferred 493 @ensure_deferred
486 recip_actor_id 504 recip_actor_id
487 ) 505 )
488 ) 506 )
489 507
490 resp = await self.apg.signAndPost(inbox, req_actor_id, data) 508 resp = await self.apg.signAndPost(inbox, req_actor_id, data)
491 if resp.code >= 400: 509 if resp.code >= 300:
492 text = await resp.text() 510 text = await resp.text()
493 raise error.StanzaError("service-unavailable", text=text) 511 raise error.StanzaError("service-unavailable", text=text)
494 512
495 def getConfigurationOptions(self): 513 def getConfigurationOptions(self):
496 return NODE_OPTIONS 514 return NODE_OPTIONS