Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3904:0aa7023dcd08
component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification of items published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account
- handle `Update` activity
- on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of
the item is already present in cache
- an `events` field is added to actor data (and to `endpoints`); it links to the `outbox` of
the actor mapping the same JID with the Events node (i.e. it links to the Events node of the
entity)
- fix subscription to nodes which are not the microblog one
rel 372
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Sep 2022 00:01:41 +0200 |
parents | aa7197b67c26 |
children | 6fa4ca0c047e |
comparison
equal
deleted
inserted
replaced
3903:384b7e6c2dbf | 3904:0aa7023dcd08 |
---|---|
39 from sat.tools.common import date_utils, uri | 39 from sat.tools.common import date_utils, uri |
40 from sat.memory.sqla_mapping import SubscriptionState | 40 from sat.memory.sqla_mapping import SubscriptionState |
41 | 41 |
42 from .constants import ( | 42 from .constants import ( |
43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, | 43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, |
44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, | 44 TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, |
45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, | 45 ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, |
46 TYPE_REACTION, ST_AP_CACHE | 46 TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE |
47 ) | 47 ) |
48 from .regex import RE_SIG_PARAM | 48 from .regex import RE_SIG_PARAM |
49 | 49 |
50 | 50 |
51 log = getLogger(__name__) | 51 log = getLogger(__name__) |
82 request, | 82 request, |
83 http.INTERNAL_SERVER_ERROR, | 83 http.INTERNAL_SERVER_ERROR, |
84 f"internal error: {failure_.value}" | 84 f"internal error: {failure_.value}" |
85 ) | 85 ) |
86 request.finish() | 86 request.finish() |
87 raise failure_ | |
87 | 88 |
88 async def webfinger(self, request): | 89 async def webfinger(self, request): |
89 url_parsed = parse.urlparse(request.uri.decode()) | 90 url_parsed = parse.urlparse(request.uri.decode()) |
90 query = parse.parse_qs(url_parsed.query) | 91 query = parse.parse_qs(url_parsed.query) |
91 resource = query.get("resource", [""])[0] | 92 resource = query.get("resource", [""])[0] |
293 log.error( | 294 log.error( |
294 '"_repeated" field already present in given AP item, this should not ' | 295 '"_repeated" field already present in given AP item, this should not ' |
295 f"happen. Ignoring object from {signing_actor}\n{data}" | 296 f"happen. Ignoring object from {signing_actor}\n{data}" |
296 ) | 297 ) |
297 raise exceptions.DataError("unexpected field in item") | 298 raise exceptions.DataError("unexpected field in item") |
298 if node is None: | |
299 node = self.apg._m.namespace | |
300 client = await self.apg.getVirtualClient(signing_actor) | 299 client = await self.apg.getVirtualClient(signing_actor) |
301 objects = await self.apg.apGetList(data, "object") | 300 objects = await self.apg.apGetList(data, "object") |
302 for obj in objects: | 301 for obj in objects: |
302 if node is None: | |
303 if obj.get("type") == TYPE_EVENT: | |
304 node = self.apg._events.namespace | |
305 else: | |
306 node = self.apg._m.namespace | |
303 sender = await self.apg.apGetSenderActor(obj) | 307 sender = await self.apg.apGetSenderActor(obj) |
304 if repeated: | 308 if repeated: |
305 # we don't check sender when item is repeated, as it should be different | 309 # we don't check sender when item is repeated, as it should be different |
306 # from post author in this case | 310 # from post author in this case |
307 sender_jid = await self.apg.getJIDFromId(sender) | 311 sender_jid = await self.apg.getJIDFromId(sender) |
351 if sender != signing_actor: | 355 if sender != signing_actor: |
352 log.warning( | 356 log.warning( |
353 "Ignoring object not attributed to signing actor: {obj}" | 357 "Ignoring object not attributed to signing actor: {obj}" |
354 ) | 358 ) |
355 continue | 359 continue |
360 | |
356 await self.apg.newAPItem(client, account_jid, node, obj) | 361 await self.apg.newAPItem(client, account_jid, node, obj) |
357 | 362 |
358 async def handleCreateActivity( | 363 async def handleCreateActivity( |
359 self, | 364 self, |
360 request: "HTTPRequest", | 365 request: "HTTPRequest", |
363 node: Optional[str], | 368 node: Optional[str], |
364 ap_account: Optional[str], | 369 ap_account: Optional[str], |
365 ap_url: str, | 370 ap_url: str, |
366 signing_actor: str | 371 signing_actor: str |
367 ): | 372 ): |
373 await self.handleNewAPItems(request, data, account_jid, node, signing_actor) | |
374 | |
375 async def handleUpdateActivity( | |
376 self, | |
377 request: "HTTPRequest", | |
378 data: dict, | |
379 account_jid: Optional[jid.JID], | |
380 node: Optional[str], | |
381 ap_account: Optional[str], | |
382 ap_url: str, | |
383 signing_actor: str | |
384 ): | |
385 # Update is the same as create: the item ID stays the same, thus the item will be | |
386 # overwritten | |
368 await self.handleNewAPItems(request, data, account_jid, node, signing_actor) | 387 await self.handleNewAPItems(request, data, account_jid, node, signing_actor) |
369 | 388 |
370 async def handleAnnounceActivity( | 389 async def handleAnnounceActivity( |
371 self, | 390 self, |
372 request: "HTTPRequest", | 391 request: "HTTPRequest", |
509 client = await self.apg.getVirtualClient(signing_actor) | 528 client = await self.apg.getVirtualClient(signing_actor) |
510 await self.handleAttachmentItem(client, data, { | 529 await self.handleAttachmentItem(client, data, { |
511 "reactions": {"operation": "update", "add": [data["content"]]} | 530 "reactions": {"operation": "update", "add": [data["content"]]} |
512 }) | 531 }) |
513 | 532 |
533 async def handleJoinActivity( | |
534 self, | |
535 request: "HTTPRequest", | |
536 data: dict, | |
537 account_jid: Optional[jid.JID], | |
538 node: Optional[str], | |
539 ap_account: Optional[str], | |
540 ap_url: str, | |
541 signing_actor: str | |
542 ) -> None: | |
543 client = await self.apg.getVirtualClient(signing_actor) | |
544 await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "yes"}}) | |
545 | |
546 async def handleLeaveActivity( | |
547 self, | |
548 request: "HTTPRequest", | |
549 data: dict, | |
550 account_jid: Optional[jid.JID], | |
551 node: Optional[str], | |
552 ap_account: Optional[str], | |
553 ap_url: str, | |
554 signing_actor: str | |
555 ) -> None: | |
556 client = await self.apg.getVirtualClient(signing_actor) | |
557 await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "no"}}) | |
558 | |
514 async def APActorRequest( | 559 async def APActorRequest( |
515 self, | 560 self, |
516 request: "HTTPRequest", | 561 request: "HTTPRequest", |
517 account_jid: jid.JID, | 562 account_jid: jid.JID, |
518 node: Optional[str], | 563 node: Optional[str], |
529 # we have to use AP account as preferredUsername because it is used to retrieve | 574 # we have to use AP account as preferredUsername because it is used to retrieve |
530 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) | 575 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) |
531 preferred_username = ap_account.split("@", 1)[0] | 576 preferred_username = ap_account.split("@", 1)[0] |
532 | 577 |
533 identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid) | 578 identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid) |
579 if node and node.startswith(self.apg._events.namespace): | |
580 events = outbox | |
581 else: | |
582 events_account = await self.apg.getAPAccountFromJidAndNode( | |
583 account_jid, self.apg._events.namespace | |
584 ) | |
585 events = self.apg.buildAPURL(TYPE_OUTBOX, events_account) | |
534 | 586 |
535 actor_data = { | 587 actor_data = { |
536 "@context": [ | 588 "@context": [ |
537 "https://www.w3.org/ns/activitystreams", | 589 "https://www.w3.org/ns/activitystreams", |
538 "https://w3id.org/security/v1" | 590 "https://w3id.org/security/v1" |
541 "id": ap_url, | 593 "id": ap_url, |
542 "type": "Person", | 594 "type": "Person", |
543 "preferredUsername": preferred_username, | 595 "preferredUsername": preferred_username, |
544 "inbox": inbox, | 596 "inbox": inbox, |
545 "outbox": outbox, | 597 "outbox": outbox, |
598 "events": events, | |
546 "followers": followers, | 599 "followers": followers, |
547 "following": following, | 600 "following": following, |
548 "publicKey": { | 601 "publicKey": { |
549 "id": f"{ap_url}#main-key", | 602 "id": f"{ap_url}#main-key", |
550 "owner": ap_url, | 603 "owner": ap_url, |
551 "publicKeyPem": self.apg.public_key_pem | 604 "publicKeyPem": self.apg.public_key_pem |
552 }, | 605 }, |
553 "endpoints": { | 606 "endpoints": { |
554 "sharedInbox": shared_inbox | 607 "sharedInbox": shared_inbox, |
608 "events": events, | |
555 }, | 609 }, |
556 } | 610 } |
557 | 611 |
558 if identity_data.get("nicknames"): | 612 if identity_data.get("nicknames"): |
559 actor_data["name"] = identity_data["nicknames"][0] | 613 actor_data["name"] = identity_data["nicknames"][0] |
631 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") | 685 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") |
632 return {} | 686 return {} |
633 | 687 |
634 base_url = self.getCanonicalURL(request) | 688 base_url = self.getCanonicalURL(request) |
635 url = f"{base_url}?{parse.urlencode(query_data, True)}" | 689 url = f"{base_url}?{parse.urlencode(query_data, True)}" |
636 data = { | 690 if node and node.startswith(self.apg._events.namespace): |
637 "@context": "https://www.w3.org/ns/activitystreams", | 691 ordered_items = [ |
638 "id": url, | 692 await self.apg.ap_events.event_data_2_ap_item( |
639 "type": "OrderedCollectionPage", | 693 self.apg._events.event_elt_2_event_data(item), |
640 "partOf": base_url, | 694 account_jid |
641 "orderedItems" : [ | 695 ) |
642 await self.apg.mbdata2APitem( | 696 for item in reversed(items) |
697 ] | |
698 else: | |
699 ordered_items = [ | |
700 await self.apg.mb_data_2_ap_item( | |
643 self.apg.client, | 701 self.apg.client, |
644 await self.apg._m.item2mbdata( | 702 await self.apg._m.item2mbdata( |
645 self.apg.client, | 703 self.apg.client, |
646 item, | 704 item, |
647 account_jid, | 705 account_jid, |
648 node | 706 node |
649 ) | 707 ) |
650 ) | 708 ) |
651 for item in reversed(items) | 709 for item in reversed(items) |
652 ] | 710 ] |
711 data = { | |
712 "@context": ["https://www.w3.org/ns/activitystreams"], | |
713 "id": url, | |
714 "type": "OrderedCollectionPage", | |
715 "partOf": base_url, | |
716 "orderedItems": ordered_items | |
653 } | 717 } |
654 | 718 |
655 # AP OrderedCollection must be in reversed chronological order, thus the opposite | 719 # AP OrderedCollection must be in reversed chronological order, thus the opposite |
656 # of what we get with RSM (at least with Libervia Pubsub) | 720 # of what we get with RSM (at least with Libervia Pubsub) |
657 if not metadata["complete"]: | 721 if not metadata["complete"]: |
716 | 780 |
717 url = self.getCanonicalURL(request) | 781 url = self.getCanonicalURL(request) |
718 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" | 782 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" |
719 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" | 783 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" |
720 return { | 784 return { |
721 "@context": "https://www.w3.org/ns/activitystreams", | 785 "@context": ["https://www.w3.org/ns/activitystreams"], |
722 "id": url, | 786 "id": url, |
723 "totalItems": items_count, | 787 "totalItems": items_count, |
724 "type": "OrderedCollection", | 788 "type": "OrderedCollection", |
725 "first": url_first_page, | 789 "first": url_first_page, |
726 "last": url_last_page, | 790 "last": url_last_page, |
735 ap_url: str, | 799 ap_url: str, |
736 signing_actor: Optional[str] | 800 signing_actor: Optional[str] |
737 ) -> None: | 801 ) -> None: |
738 if signing_actor is None: | 802 if signing_actor is None: |
739 raise exceptions.InternalError("signing_actor must be set for inbox requests") | 803 raise exceptions.InternalError("signing_actor must be set for inbox requests") |
740 if node is None: | |
741 node = self.apg._m.namespace | |
742 try: | 804 try: |
743 data = json.load(request.content) | 805 data = json.load(request.content) |
744 if not isinstance(data, dict): | 806 if not isinstance(data, dict): |
745 raise ValueError("data should be an object") | 807 raise ValueError("data should be an object") |
746 except (json.JSONDecodeError, ValueError) as e: | 808 except (json.JSONDecodeError, ValueError) as e: |
803 ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node) | 865 ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node) |
804 followers.append(ap_account) | 866 followers.append(ap_account) |
805 | 867 |
806 url = self.getCanonicalURL(request) | 868 url = self.getCanonicalURL(request) |
807 return { | 869 return { |
808 "@context": "https://www.w3.org/ns/activitystreams", | 870 "@context": ["https://www.w3.org/ns/activitystreams"], |
809 "type": "OrderedCollection", | 871 "type": "OrderedCollection", |
810 "id": url, | 872 "id": url, |
811 "totalItems": len(subscribers), | 873 "totalItems": len(subscribers), |
812 "first": { | 874 "first": { |
813 "type": "OrderedCollectionPage", | 875 "type": "OrderedCollectionPage", |
842 ) | 904 ) |
843 following.append(ap_account) | 905 following.append(ap_account) |
844 | 906 |
845 url = self.getCanonicalURL(request) | 907 url = self.getCanonicalURL(request) |
846 return { | 908 return { |
847 "@context": "https://www.w3.org/ns/activitystreams", | 909 "@context": ["https://www.w3.org/ns/activitystreams"], |
848 "type": "OrderedCollection", | 910 "type": "OrderedCollection", |
849 "id": url, | 911 "id": url, |
850 "totalItems": len(subscriptions), | 912 "totalItems": len(subscriptions), |
851 "first": { | 913 "first": { |
852 "type": "OrderedCollectionPage", | 914 "type": "OrderedCollectionPage", |
899 avatar_filename = extra_args[0] | 961 avatar_filename = extra_args[0] |
900 avatar_path = self.apg.host.common_cache.getPath(avatar_filename) | 962 avatar_path = self.apg.host.common_cache.getPath(avatar_filename) |
901 return static.File(str(avatar_path)).render(request) | 963 return static.File(str(avatar_path)).render(request) |
902 elif request_type == "item": | 964 elif request_type == "item": |
903 ret_data = await self.apg.apGetLocalObject(ap_url) | 965 ret_data = await self.apg.apGetLocalObject(ap_url) |
904 ret_data["@context"] = NS_AP | 966 if "@context" not in ret_data: |
967 ret_data["@context"] = [NS_AP] | |
905 else: | 968 else: |
906 if len(extra_args) > 1: | 969 if len(extra_args) > 1: |
907 log.warning(f"unexpected extra arguments: {extra_args!r}") | 970 log.warning(f"unexpected extra arguments: {extra_args!r}") |
908 ap_account = extra_args[0] | 971 ap_account = extra_args[0] |
909 account_jid, node = await self.apg.getJIDAndNode(ap_account) | 972 account_jid, node = await self.apg.getJIDAndNode(ap_account) |