Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3888:aa7197b67c26
component AP gateway: AP <=> XMPP reactions conversions:
- Pubsub Attachments plugin has been renamed to XEP-0470 following publication
- XEP-0470 has been updated to follow 0.2 changes
- AP reactions (as implemented in Pleroma) are converted to XEP-0470
- XEP-0470 events are converted to AP reactions (again, using "EmojiReact" from Pleroma)
- AP activities related to attachments (like/reactions) are cached in Libervia because
it's not possible to retrieve them from Pleroma instances once they have been emitted
(doing an HTTP GET on their ID returns a 404). For now this cache is never flushed; this
should be improved in the future.
- `sharedInbox` is used when available. Pleroma returns a 500 HTTP error when ``to`` or
``cc`` are used in a direct inbox.
- reactions and likes are not currently used for direct messages, because they can't be
emitted from Pleroma in this case, thus there is no point in implementing them for the
moment.
rel 371
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 31 Aug 2022 17:07:03 +0200 |
parents | cea52400623d |
children | 0aa7023dcd08 |
comparison
equal
deleted
inserted
replaced
3887:6090141b1b70 | 3888:aa7197b67c26 |
---|---|
40 from sat.memory.sqla_mapping import SubscriptionState | 40 from sat.memory.sqla_mapping import SubscriptionState |
41 | 41 |
42 from .constants import ( | 42 from .constants import ( |
43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, | 43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, |
44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, | 44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, |
45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE | 45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, |
46 TYPE_REACTION, ST_AP_CACHE | |
46 ) | 47 ) |
47 from .regex import RE_SIG_PARAM | 48 from .regex import RE_SIG_PARAM |
48 | 49 |
49 | 50 |
50 log = getLogger(__name__) | 51 log = getLogger(__name__) |
122 signing_actor: str | 123 signing_actor: str |
123 ) -> None: | 124 ) -> None: |
124 if node is None: | 125 if node is None: |
125 node = self.apg._m.namespace | 126 node = self.apg._m.namespace |
126 client = await self.apg.getVirtualClient(signing_actor) | 127 client = await self.apg.getVirtualClient(signing_actor) |
127 objects = await self.apg.apGetList(data, "object") | 128 object_ = data.get("object") |
129 if isinstance(object_, str): | |
130 # we check first if it's not a cached object | |
131 ap_cache_key = f"{ST_AP_CACHE}{object_}" | |
132 value = await self.apg.client._ap_storage.get(ap_cache_key) | |
133 else: | |
134 value = None | |
135 if value is not None: | |
136 objects = [value] | |
137 # because we'll undo the activity, we can remove it from cache | |
138 await self.apg.client._ap_storage.remove(ap_cache_key) | |
139 else: | |
140 objects = await self.apg.apGetList(data, "object") | |
128 for obj in objects: | 141 for obj in objects: |
129 type_ = obj.get("type") | 142 type_ = obj.get("type") |
130 actor = await self.apg.apGetSenderActor(obj) | 143 actor = await self.apg.apGetSenderActor(obj) |
131 if actor != signing_actor: | 144 if actor != signing_actor: |
132 log.warning(f"ignoring object not attributed to signing actor: {data}") | 145 log.warning(f"ignoring object not attributed to signing actor: {data}") |
150 elif type_ == "Announce": | 163 elif type_ == "Announce": |
151 # we can use directly the Announce object, as only the "id" field is | 164 # we can use directly the Announce object, as only the "id" field is |
152 # needed | 165 # needed |
153 await self.apg.newAPDeleteItem(client, None, node, obj) | 166 await self.apg.newAPDeleteItem(client, None, node, obj) |
154 elif type_ == TYPE_LIKE: | 167 elif type_ == TYPE_LIKE: |
155 await self.handleNewLikeItem(client, obj, True) | 168 await self.handleAttachmentItem(client, obj, {"noticed": False}) |
169 elif type_ == TYPE_REACTION: | |
170 await self.handleAttachmentItem(client, obj, { | |
171 "reactions": {"operation": "update", "remove": [obj["content"]]} | |
172 }) | |
156 else: | 173 else: |
157 log.warning(f"Unmanaged undo type: {type_!r}") | 174 log.warning(f"Unmanaged undo type: {type_!r}") |
158 | 175 |
159 async def handleFollowActivity( | 176 async def handleFollowActivity( |
160 self, | 177 self, |
182 # TODO: manage SubscriptionUnconfigured | 199 # TODO: manage SubscriptionUnconfigured |
183 else: | 200 else: |
184 if subscription.state != "subscribed": | 201 if subscription.state != "subscribed": |
185 # other states should raise an Exception | 202 # other states should raise an Exception |
186 raise exceptions.InternalError('"subscribed" state was expected') | 203 raise exceptions.InternalError('"subscribed" state was expected') |
187 inbox = await self.apg.getAPInboxFromId(signing_actor) | 204 inbox = await self.apg.getAPInboxFromId(signing_actor, use_shared=False) |
188 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) | 205 actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) |
189 accept_data = self.apg.createActivity( | 206 accept_data = self.apg.createActivity( |
190 "Accept", actor_id, object_=data | 207 "Accept", actor_id, object_=data |
191 ) | 208 ) |
192 await self.apg.signAndPost(inbox, actor_id, accept_data) | 209 await self.apg.signAndPost(inbox, actor_id, accept_data) |
368 node, | 385 node, |
369 signing_actor, | 386 signing_actor, |
370 repeated=True | 387 repeated=True |
371 ) | 388 ) |
372 | 389 |
373 async def handleNewLikeItem( | 390 async def handleAttachmentItem( |
374 self, | 391 self, |
375 client: SatXMPPEntity, | 392 client: SatXMPPEntity, |
376 data: dict, | 393 data: dict, |
377 undo: bool = False, | 394 attachment_data: dict |
378 ) -> None: | 395 ) -> None: |
379 liked_ids = data.get("object") | 396 target_ids = data.get("object") |
380 if not liked_ids: | 397 if not target_ids: |
381 raise exceptions.DataError("object should be set") | 398 raise exceptions.DataError("object should be set") |
382 elif isinstance(liked_ids, list): | 399 elif isinstance(target_ids, list): |
383 try: | 400 try: |
384 liked_ids = [o["id"] for o in liked_ids] | 401 target_ids = [o["id"] for o in target_ids] |
385 except (KeyError, TypeError): | 402 except (KeyError, TypeError): |
386 raise exceptions.DataError(f"invalid object: {liked_ids!r}") | 403 raise exceptions.DataError(f"invalid object: {target_ids!r}") |
387 elif isinstance(liked_ids, dict): | 404 elif isinstance(target_ids, dict): |
388 obj_id = liked_ids.get("id") | 405 obj_id = target_ids.get("id") |
389 if not obj_id or not isinstance(obj_id, str): | 406 if not obj_id or not isinstance(obj_id, str): |
390 raise exceptions.DataError(f"invalid object: {liked_ids!r}") | 407 raise exceptions.DataError(f"invalid object: {target_ids!r}") |
391 liked_ids = [obj_id] | 408 target_ids = [obj_id] |
392 elif isinstance(liked_ids, str): | 409 elif isinstance(target_ids, str): |
393 liked_ids = [liked_ids] | 410 target_ids = [target_ids] |
394 | 411 |
395 for liked_id in liked_ids: | 412 # XXX: we have to cache AP items because some implementation (Pleroma notably) |
396 if not self.apg.isLocalURL(liked_id): | 413 # don't keep object accessible, and we need to be able to retrieve them for |
397 log.debug(f"ignoring non local liked ID: {liked_id}") | 414 # UNDO. Current implementation will grow, we need to add a way to flush it after |
415 # a while. | |
416 # TODO: add a way to flush old cached AP items. | |
417 await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data) | |
418 | |
419 for target_id in target_ids: | |
420 if not self.apg.isLocalURL(target_id): | |
421 log.debug(f"ignoring non local target ID: {target_id}") | |
398 continue | 422 continue |
399 url_type, url_args = self.apg.parseAPURL(liked_id) | 423 url_type, url_args = self.apg.parseAPURL(target_id) |
400 if url_type != TYPE_ITEM: | 424 if url_type != TYPE_ITEM: |
401 log.warning(f"unexpected local URL for liked item: {liked_id}") | 425 log.warning(f"unexpected local URL for attachment on item {target_id}") |
402 continue | 426 continue |
403 try: | 427 try: |
404 account, item_id = url_args | 428 account, item_id = url_args |
405 except ValueError: | 429 except ValueError: |
406 raise ValueError(f"invalid URL: {liked_id}") | 430 raise ValueError(f"invalid URL: {target_id}") |
407 author_jid, item_node = await self.apg.getJIDAndNode(account) | 431 author_jid, item_node = await self.apg.getJIDAndNode(account) |
408 if item_node is None: | 432 if item_node is None: |
409 item_node = self.apg._m.namespace | 433 item_node = self.apg._m.namespace |
410 attachment_node = self.apg._pa.getAttachmentNodeName( | 434 attachment_node = self.apg._pa.getAttachmentNodeName( |
411 author_jid, item_node, item_id | 435 author_jid, item_node, item_id |
416 attachment_node, | 440 attachment_node, |
417 with_subscriptions=True, | 441 with_subscriptions=True, |
418 create=True | 442 create=True |
419 ) | 443 ) |
420 found_items, __ = await self.apg.host.memory.storage.getItems( | 444 found_items, __ = await self.apg.host.memory.storage.getItems( |
421 cached_node, item_ids=[item_id] | 445 cached_node, item_ids=[client.jid.userhost()] |
422 ) | 446 ) |
423 if not found_items: | 447 if not found_items: |
424 old_item_elt = None | 448 old_item_elt = None |
425 else: | 449 else: |
426 found_item = found_items[0] | 450 found_item = found_items[0] |
427 old_item_elt = found_item.data | 451 old_item_elt = found_item.data |
428 | 452 |
429 item_elt = self.apg._pa.applySetHandler( | 453 item_elt = self.apg._pa.applySetHandler( |
430 client, | 454 client, |
431 {"extra": {"noticed": not undo}}, | 455 {"extra": attachment_data}, |
432 old_item_elt, | 456 old_item_elt, |
433 [("noticed", self.apg._pa.namespace)] | 457 None |
434 ) | 458 ) |
435 # we reparse the element, as there can be other attachments | 459 # we reparse the element, as there can be other attachments |
436 attachments_data = self.apg._pa.items2attachmentData(client, [item_elt]) | 460 attachments_data = self.apg._pa.items2attachmentData(client, [item_elt]) |
437 # and we update the cache | 461 # and we update the cache |
438 await self.apg.host.memory.storage.cachePubsubItems( | 462 await self.apg.host.memory.storage.cachePubsubItems( |
466 account_jid: Optional[jid.JID], | 490 account_jid: Optional[jid.JID], |
467 node: Optional[str], | 491 node: Optional[str], |
468 ap_account: Optional[str], | 492 ap_account: Optional[str], |
469 ap_url: str, | 493 ap_url: str, |
470 signing_actor: str | 494 signing_actor: str |
471 ): | 495 ) -> None: |
472 client = await self.apg.getVirtualClient(signing_actor) | 496 client = await self.apg.getVirtualClient(signing_actor) |
473 await self.handleNewLikeItem(client, data) | 497 await self.handleAttachmentItem(client, data, {"noticed": True}) |
498 | |
499 async def handleEmojireactActivity( | |
500 self, | |
501 request: "HTTPRequest", | |
502 data: dict, | |
503 account_jid: Optional[jid.JID], | |
504 node: Optional[str], | |
505 ap_account: Optional[str], | |
506 ap_url: str, | |
507 signing_actor: str | |
508 ) -> None: | |
509 client = await self.apg.getVirtualClient(signing_actor) | |
510 await self.handleAttachmentItem(client, data, { | |
511 "reactions": {"operation": "update", "add": [data["content"]]} | |
512 }) | |
474 | 513 |
475 async def APActorRequest( | 514 async def APActorRequest( |
476 self, | 515 self, |
477 request: "HTTPRequest", | 516 request: "HTTPRequest", |
478 account_jid: jid.JID, | 517 account_jid: jid.JID, |