Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3865:59fbb66b2923
component AP gateway: handle XMPP attachments -> AP likes conversion:
Convert `noticed` attachments coming from XMPP to suitable `Like` activities. A virtual node
is created in the cache when necessary, and virtual items published to the virtual JIDs that
map to AP accounts are cached too.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 20 Jul 2022 17:53:12 +0200 |
parents | 6329ee6b6df4 |
children | 37d2c0282304 |
comparison
equal
deleted
inserted
replaced
3864:ac255a0fbd4c | 3865:59fbb66b2923 |
---|---|
117 "bad-request", | 117 "bad-request", |
118 f"{ap_account!r} is not a valid ActivityPub actor account." | 118 f"{ap_account!r} is not a valid ActivityPub actor account." |
119 ) | 119 ) |
120 | 120 |
121 client = self.apg.client.getVirtualClient(requestor) | 121 client = self.apg.client.getVirtualClient(requestor) |
122 await self.apg.convertAndPostItems( | 122 if self.apg._pa.isAttachmentNode(nodeIdentifier): |
123 client, ap_account, service, nodeIdentifier, items | 123 await self.apg.convertAndPostAttachments( |
124 ) | 124 client, ap_account, service, nodeIdentifier, items, publisher=requestor |
125 ) | |
126 else: | |
127 await self.apg.convertAndPostItems( | |
128 client, ap_account, service, nodeIdentifier, items | |
129 ) | |
125 | 130 |
126 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: | 131 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: |
127 """Convert actor ID from following collection to XMPP item""" | 132 """Convert actor ID from following collection to XMPP item""" |
128 actor_id = ap_item["id"] | 133 actor_id = ap_item["id"] |
129 actor_jid = await self.apg.getJIDFromId(actor_id) | 134 actor_jid = await self.apg.getJIDFromId(actor_id) |
289 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) | 294 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) |
290 if ap_account.count("@") != 1: | 295 if ap_account.count("@") != 1: |
291 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") | 296 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") |
292 return [], None | 297 return [], None |
293 | 298 |
299 # cached_node may be pre-filled with some nodes (e.g. attachments nodes), | |
300 # otherwise it is filled when suitable | |
301 cached_node = None | |
302 client = self.apg.client | |
294 kwargs = {} | 303 kwargs = {} |
295 | 304 |
296 if node == self.apg._pps.subscriptions_node: | 305 if node == self.apg._pps.subscriptions_node: |
297 collection_name = "following" | 306 collection_name = "following" |
298 parser = self.apFollowing2Elt | 307 parser = self.apFollowing2Elt |
313 elif node == self.apg._a.namespace_data: | 322 elif node == self.apg._a.namespace_data: |
314 item_elt = await self.generateAvatarData( | 323 item_elt = await self.generateAvatarData( |
315 self.apg.client, ap_account, itemIdentifiers | 324 self.apg.client, ap_account, itemIdentifiers |
316 ) | 325 ) |
317 return [item_elt], None | 326 return [item_elt], None |
327 elif self.apg._pa.isAttachmentNode(node): | |
328 use_cache = True | |
329 # we check cache here because we emit an item-not-found error if the node is | |
330 # not in cache, as we are not dealing with real AP items | |
331 cached_node = await self.host.memory.storage.getPubsubNode( | |
332 client, service, node | |
333 ) | |
334 if cached_node is None: | |
335 raise error.StanzaError("item-not-found") | |
318 else: | 336 else: |
319 if not node.startswith(self.apg._m.namespace): | 337 if not node.startswith(self.apg._m.namespace): |
320 raise error.StanzaError( | 338 raise error.StanzaError( |
321 "feature-not-implemented", | 339 "feature-not-implemented", |
322 text=f"AP Gateway {C.APP_VERSION} only supports " | 340 text=f"AP Gateway {C.APP_VERSION} only supports " |
324 ) | 342 ) |
325 collection_name = "outbox" | 343 collection_name = "outbox" |
326 parser = self.apg.apItem2Elt | 344 parser = self.apg.apItem2Elt |
327 use_cache = True | 345 use_cache = True |
328 | 346 |
329 client = self.apg.client | |
330 if use_cache: | 347 if use_cache: |
331 cached_node = await self.host.memory.storage.getPubsubNode( | 348 if cached_node is None: |
332 client, service, node | 349 cached_node = await self.host.memory.storage.getPubsubNode( |
333 ) | 350 client, service, node |
351 ) | |
334 # TODO: check if node is synchronised | 352 # TODO: check if node is synchronised |
335 if cached_node is not None: | 353 if cached_node is not None: |
336 # the node is cached, we return items from cache | 354 # the node is cached, we return items from cache |
337 log.debug(f"node {node!r} from {service} is in cache") | 355 log.debug(f"node {node!r} from {service} is in cache") |
338 pubsub_items, metadata = await self.apg._c.getItemsFromCache( | 356 pubsub_items, metadata = await self.apg._c.getItemsFromCache( |
465 ) | 483 ) |
466 | 484 |
467 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) | 485 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) |
468 | 486 |
469 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | 487 resp = await self.apg.signAndPost(inbox, req_actor_id, data) |
470 if resp.code >= 400: | 488 if resp.code >= 300: |
471 text = await resp.text() | 489 text = await resp.text() |
472 raise error.StanzaError("service-unavailable", text=text) | 490 raise error.StanzaError("service-unavailable", text=text) |
473 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") | 491 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") |
474 | 492 |
475 @ensure_deferred | 493 @ensure_deferred |
486 recip_actor_id | 504 recip_actor_id |
487 ) | 505 ) |
488 ) | 506 ) |
489 | 507 |
490 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | 508 resp = await self.apg.signAndPost(inbox, req_actor_id, data) |
491 if resp.code >= 400: | 509 if resp.code >= 300: |
492 text = await resp.text() | 510 text = await resp.text() |
493 raise error.StanzaError("service-unavailable", text=text) | 511 raise error.StanzaError("service-unavailable", text=text) |
494 | 512 |
495 def getConfigurationOptions(self): | 513 def getConfigurationOptions(self): |
496 return NODE_OPTIONS | 514 return NODE_OPTIONS |