comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3888:aa7197b67c26

component AP gateway: AP <=> XMPP reactions conversions: - Pubsub Attachments plugin has been renamed to XEP-0470 following publication - XEP-0470 has been updated to follow 0.2 changes - AP reactions (as implemented in Pleroma) are converted to XEP-0470 - XEP-0470 events are converted to AP reactions (again, using "EmojiReact" from Pleroma) - AP activities related to attachments (likes/reactions) are cached in Libervia because it's not possible to retrieve them from Pleroma instances once they have been emitted (doing an HTTP GET on their ID returns a 404). For now this cache is not flushed; this should be improved in the future. - `sharedInbox` is used when available. Pleroma returns a 500 HTTP error when ``to`` or ``cc`` are used in a direct inbox. - reactions and likes are not currently used for direct messages, because they can't be emitted from Pleroma in this case, thus there is no point in implementing them for the moment. rel 371
author Goffi <goffi@goffi.org>
date Wed, 31 Aug 2022 17:07:03 +0200
parents cea52400623d
children 0aa7023dcd08
comparison
equal deleted inserted replaced
3887:6090141b1b70 3888:aa7197b67c26
40 from sat.memory.sqla_mapping import SubscriptionState 40 from sat.memory.sqla_mapping import SubscriptionState
41 41
42 from .constants import ( 42 from .constants import (
43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, 43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, 44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE 45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE,
46 TYPE_REACTION, ST_AP_CACHE
46 ) 47 )
47 from .regex import RE_SIG_PARAM 48 from .regex import RE_SIG_PARAM
48 49
49 50
50 log = getLogger(__name__) 51 log = getLogger(__name__)
122 signing_actor: str 123 signing_actor: str
123 ) -> None: 124 ) -> None:
124 if node is None: 125 if node is None:
125 node = self.apg._m.namespace 126 node = self.apg._m.namespace
126 client = await self.apg.getVirtualClient(signing_actor) 127 client = await self.apg.getVirtualClient(signing_actor)
127 objects = await self.apg.apGetList(data, "object") 128 object_ = data.get("object")
129 if isinstance(object_, str):
130 # we check first if it's not a cached object
131 ap_cache_key = f"{ST_AP_CACHE}{object_}"
132 value = await self.apg.client._ap_storage.get(ap_cache_key)
133 else:
134 value = None
135 if value is not None:
136 objects = [value]
137 # because we'll undo the activity, we can remove it from cache
138 await self.apg.client._ap_storage.remove(ap_cache_key)
139 else:
140 objects = await self.apg.apGetList(data, "object")
128 for obj in objects: 141 for obj in objects:
129 type_ = obj.get("type") 142 type_ = obj.get("type")
130 actor = await self.apg.apGetSenderActor(obj) 143 actor = await self.apg.apGetSenderActor(obj)
131 if actor != signing_actor: 144 if actor != signing_actor:
132 log.warning(f"ignoring object not attributed to signing actor: {data}") 145 log.warning(f"ignoring object not attributed to signing actor: {data}")
150 elif type_ == "Announce": 163 elif type_ == "Announce":
151 # we can use directly the Announce object, as only the "id" field is 164 # we can use directly the Announce object, as only the "id" field is
152 # needed 165 # needed
153 await self.apg.newAPDeleteItem(client, None, node, obj) 166 await self.apg.newAPDeleteItem(client, None, node, obj)
154 elif type_ == TYPE_LIKE: 167 elif type_ == TYPE_LIKE:
155 await self.handleNewLikeItem(client, obj, True) 168 await self.handleAttachmentItem(client, obj, {"noticed": False})
169 elif type_ == TYPE_REACTION:
170 await self.handleAttachmentItem(client, obj, {
171 "reactions": {"operation": "update", "remove": [obj["content"]]}
172 })
156 else: 173 else:
157 log.warning(f"Unmanaged undo type: {type_!r}") 174 log.warning(f"Unmanaged undo type: {type_!r}")
158 175
159 async def handleFollowActivity( 176 async def handleFollowActivity(
160 self, 177 self,
182 # TODO: manage SubscriptionUnconfigured 199 # TODO: manage SubscriptionUnconfigured
183 else: 200 else:
184 if subscription.state != "subscribed": 201 if subscription.state != "subscribed":
185 # other states should raise an Exception 202 # other states should raise an Exception
186 raise exceptions.InternalError('"subscribed" state was expected') 203 raise exceptions.InternalError('"subscribed" state was expected')
187 inbox = await self.apg.getAPInboxFromId(signing_actor) 204 inbox = await self.apg.getAPInboxFromId(signing_actor, use_shared=False)
188 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) 205 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
189 accept_data = self.apg.createActivity( 206 accept_data = self.apg.createActivity(
190 "Accept", actor_id, object_=data 207 "Accept", actor_id, object_=data
191 ) 208 )
192 await self.apg.signAndPost(inbox, actor_id, accept_data) 209 await self.apg.signAndPost(inbox, actor_id, accept_data)
368 node, 385 node,
369 signing_actor, 386 signing_actor,
370 repeated=True 387 repeated=True
371 ) 388 )
372 389
373 async def handleNewLikeItem( 390 async def handleAttachmentItem(
374 self, 391 self,
375 client: SatXMPPEntity, 392 client: SatXMPPEntity,
376 data: dict, 393 data: dict,
377 undo: bool = False, 394 attachment_data: dict
378 ) -> None: 395 ) -> None:
379 liked_ids = data.get("object") 396 target_ids = data.get("object")
380 if not liked_ids: 397 if not target_ids:
381 raise exceptions.DataError("object should be set") 398 raise exceptions.DataError("object should be set")
382 elif isinstance(liked_ids, list): 399 elif isinstance(target_ids, list):
383 try: 400 try:
384 liked_ids = [o["id"] for o in liked_ids] 401 target_ids = [o["id"] for o in target_ids]
385 except (KeyError, TypeError): 402 except (KeyError, TypeError):
386 raise exceptions.DataError(f"invalid object: {liked_ids!r}") 403 raise exceptions.DataError(f"invalid object: {target_ids!r}")
387 elif isinstance(liked_ids, dict): 404 elif isinstance(target_ids, dict):
388 obj_id = liked_ids.get("id") 405 obj_id = target_ids.get("id")
389 if not obj_id or not isinstance(obj_id, str): 406 if not obj_id or not isinstance(obj_id, str):
390 raise exceptions.DataError(f"invalid object: {liked_ids!r}") 407 raise exceptions.DataError(f"invalid object: {target_ids!r}")
391 liked_ids = [obj_id] 408 target_ids = [obj_id]
392 elif isinstance(liked_ids, str): 409 elif isinstance(target_ids, str):
393 liked_ids = [liked_ids] 410 target_ids = [target_ids]
394 411
395 for liked_id in liked_ids: 412 # XXX: we have to cache AP items because some implementation (Pleroma notably)
396 if not self.apg.isLocalURL(liked_id): 413 # don't keep object accessible, and we need to be able to retrieve them for
397 log.debug(f"ignoring non local liked ID: {liked_id}") 414 # UNDO. Current implementation will grow, we need to add a way to flush it after
415 # a while.
416 # TODO: add a way to flush old cached AP items.
417 await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)
418
419 for target_id in target_ids:
420 if not self.apg.isLocalURL(target_id):
421 log.debug(f"ignoring non local target ID: {target_id}")
398 continue 422 continue
399 url_type, url_args = self.apg.parseAPURL(liked_id) 423 url_type, url_args = self.apg.parseAPURL(target_id)
400 if url_type != TYPE_ITEM: 424 if url_type != TYPE_ITEM:
401 log.warning(f"unexpected local URL for liked item: {liked_id}") 425 log.warning(f"unexpected local URL for attachment on item {target_id}")
402 continue 426 continue
403 try: 427 try:
404 account, item_id = url_args 428 account, item_id = url_args
405 except ValueError: 429 except ValueError:
406 raise ValueError(f"invalid URL: {liked_id}") 430 raise ValueError(f"invalid URL: {target_id}")
407 author_jid, item_node = await self.apg.getJIDAndNode(account) 431 author_jid, item_node = await self.apg.getJIDAndNode(account)
408 if item_node is None: 432 if item_node is None:
409 item_node = self.apg._m.namespace 433 item_node = self.apg._m.namespace
410 attachment_node = self.apg._pa.getAttachmentNodeName( 434 attachment_node = self.apg._pa.getAttachmentNodeName(
411 author_jid, item_node, item_id 435 author_jid, item_node, item_id
416 attachment_node, 440 attachment_node,
417 with_subscriptions=True, 441 with_subscriptions=True,
418 create=True 442 create=True
419 ) 443 )
420 found_items, __ = await self.apg.host.memory.storage.getItems( 444 found_items, __ = await self.apg.host.memory.storage.getItems(
421 cached_node, item_ids=[item_id] 445 cached_node, item_ids=[client.jid.userhost()]
422 ) 446 )
423 if not found_items: 447 if not found_items:
424 old_item_elt = None 448 old_item_elt = None
425 else: 449 else:
426 found_item = found_items[0] 450 found_item = found_items[0]
427 old_item_elt = found_item.data 451 old_item_elt = found_item.data
428 452
429 item_elt = self.apg._pa.applySetHandler( 453 item_elt = self.apg._pa.applySetHandler(
430 client, 454 client,
431 {"extra": {"noticed": not undo}}, 455 {"extra": attachment_data},
432 old_item_elt, 456 old_item_elt,
433 [("noticed", self.apg._pa.namespace)] 457 None
434 ) 458 )
435 # we reparse the element, as there can be other attachments 459 # we reparse the element, as there can be other attachments
436 attachments_data = self.apg._pa.items2attachmentData(client, [item_elt]) 460 attachments_data = self.apg._pa.items2attachmentData(client, [item_elt])
437 # and we update the cache 461 # and we update the cache
438 await self.apg.host.memory.storage.cachePubsubItems( 462 await self.apg.host.memory.storage.cachePubsubItems(
466 account_jid: Optional[jid.JID], 490 account_jid: Optional[jid.JID],
467 node: Optional[str], 491 node: Optional[str],
468 ap_account: Optional[str], 492 ap_account: Optional[str],
469 ap_url: str, 493 ap_url: str,
470 signing_actor: str 494 signing_actor: str
471 ): 495 ) -> None:
472 client = await self.apg.getVirtualClient(signing_actor) 496 client = await self.apg.getVirtualClient(signing_actor)
473 await self.handleNewLikeItem(client, data) 497 await self.handleAttachmentItem(client, data, {"noticed": True})
498
499 async def handleEmojireactActivity(
500 self,
501 request: "HTTPRequest",
502 data: dict,
503 account_jid: Optional[jid.JID],
504 node: Optional[str],
505 ap_account: Optional[str],
506 ap_url: str,
507 signing_actor: str
508 ) -> None:
509 client = await self.apg.getVirtualClient(signing_actor)
510 await self.handleAttachmentItem(client, data, {
511 "reactions": {"operation": "update", "add": [data["content"]]}
512 })
474 513
475 async def APActorRequest( 514 async def APActorRequest(
476 self, 515 self,
477 request: "HTTPRequest", 516 request: "HTTPRequest",
478 account_jid: jid.JID, 517 account_jid: jid.JID,