comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3904:0aa7023dcd08

component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents aa7197b67c26
children 6fa4ca0c047e
comparison
equal deleted inserted replaced
3903:384b7e6c2dbf 3904:0aa7023dcd08
39 from sat.tools.common import date_utils, uri 39 from sat.tools.common import date_utils, uri
40 from sat.memory.sqla_mapping import SubscriptionState 40 from sat.memory.sqla_mapping import SubscriptionState
41 41
42 from .constants import ( 42 from .constants import (
43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, 43 NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
44 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, 44 TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER,
45 SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, 45 ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS,
46 TYPE_REACTION, ST_AP_CACHE 46 TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE
47 ) 47 )
48 from .regex import RE_SIG_PARAM 48 from .regex import RE_SIG_PARAM
49 49
50 50
51 log = getLogger(__name__) 51 log = getLogger(__name__)
82 request, 82 request,
83 http.INTERNAL_SERVER_ERROR, 83 http.INTERNAL_SERVER_ERROR,
84 f"internal error: {failure_.value}" 84 f"internal error: {failure_.value}"
85 ) 85 )
86 request.finish() 86 request.finish()
87 raise failure_
87 88
88 async def webfinger(self, request): 89 async def webfinger(self, request):
89 url_parsed = parse.urlparse(request.uri.decode()) 90 url_parsed = parse.urlparse(request.uri.decode())
90 query = parse.parse_qs(url_parsed.query) 91 query = parse.parse_qs(url_parsed.query)
91 resource = query.get("resource", [""])[0] 92 resource = query.get("resource", [""])[0]
293 log.error( 294 log.error(
294 '"_repeated" field already present in given AP item, this should not ' 295 '"_repeated" field already present in given AP item, this should not '
295 f"happen. Ignoring object from {signing_actor}\n{data}" 296 f"happen. Ignoring object from {signing_actor}\n{data}"
296 ) 297 )
297 raise exceptions.DataError("unexpected field in item") 298 raise exceptions.DataError("unexpected field in item")
298 if node is None:
299 node = self.apg._m.namespace
300 client = await self.apg.getVirtualClient(signing_actor) 299 client = await self.apg.getVirtualClient(signing_actor)
301 objects = await self.apg.apGetList(data, "object") 300 objects = await self.apg.apGetList(data, "object")
302 for obj in objects: 301 for obj in objects:
302 if node is None:
303 if obj.get("type") == TYPE_EVENT:
304 node = self.apg._events.namespace
305 else:
306 node = self.apg._m.namespace
303 sender = await self.apg.apGetSenderActor(obj) 307 sender = await self.apg.apGetSenderActor(obj)
304 if repeated: 308 if repeated:
305 # we don't check sender when item is repeated, as it should be different 309 # we don't check sender when item is repeated, as it should be different
306 # from post author in this case 310 # from post author in this case
307 sender_jid = await self.apg.getJIDFromId(sender) 311 sender_jid = await self.apg.getJIDFromId(sender)
351 if sender != signing_actor: 355 if sender != signing_actor:
352 log.warning( 356 log.warning(
353 "Ignoring object not attributed to signing actor: {obj}" 357 "Ignoring object not attributed to signing actor: {obj}"
354 ) 358 )
355 continue 359 continue
360
356 await self.apg.newAPItem(client, account_jid, node, obj) 361 await self.apg.newAPItem(client, account_jid, node, obj)
357 362
358 async def handleCreateActivity( 363 async def handleCreateActivity(
359 self, 364 self,
360 request: "HTTPRequest", 365 request: "HTTPRequest",
363 node: Optional[str], 368 node: Optional[str],
364 ap_account: Optional[str], 369 ap_account: Optional[str],
365 ap_url: str, 370 ap_url: str,
366 signing_actor: str 371 signing_actor: str
367 ): 372 ):
373 await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
374
375 async def handleUpdateActivity(
376 self,
377 request: "HTTPRequest",
378 data: dict,
379 account_jid: Optional[jid.JID],
380 node: Optional[str],
381 ap_account: Optional[str],
382 ap_url: str,
383 signing_actor: str
384 ):
385 # Update is the same as create: the item ID stays the same, thus the item will be
386 # overwritten
368 await self.handleNewAPItems(request, data, account_jid, node, signing_actor) 387 await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
369 388
370 async def handleAnnounceActivity( 389 async def handleAnnounceActivity(
371 self, 390 self,
372 request: "HTTPRequest", 391 request: "HTTPRequest",
509 client = await self.apg.getVirtualClient(signing_actor) 528 client = await self.apg.getVirtualClient(signing_actor)
510 await self.handleAttachmentItem(client, data, { 529 await self.handleAttachmentItem(client, data, {
511 "reactions": {"operation": "update", "add": [data["content"]]} 530 "reactions": {"operation": "update", "add": [data["content"]]}
512 }) 531 })
513 532
533 async def handleJoinActivity(
534 self,
535 request: "HTTPRequest",
536 data: dict,
537 account_jid: Optional[jid.JID],
538 node: Optional[str],
539 ap_account: Optional[str],
540 ap_url: str,
541 signing_actor: str
542 ) -> None:
543 client = await self.apg.getVirtualClient(signing_actor)
544 await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "yes"}})
545
546 async def handleLeaveActivity(
547 self,
548 request: "HTTPRequest",
549 data: dict,
550 account_jid: Optional[jid.JID],
551 node: Optional[str],
552 ap_account: Optional[str],
553 ap_url: str,
554 signing_actor: str
555 ) -> None:
556 client = await self.apg.getVirtualClient(signing_actor)
557 await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "no"}})
558
514 async def APActorRequest( 559 async def APActorRequest(
515 self, 560 self,
516 request: "HTTPRequest", 561 request: "HTTPRequest",
517 account_jid: jid.JID, 562 account_jid: jid.JID,
518 node: Optional[str], 563 node: Optional[str],
529 # we have to use AP account as preferredUsername because it is used to retrieve 574 # we have to use AP account as preferredUsername because it is used to retrieve
530 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) 575 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
531 preferred_username = ap_account.split("@", 1)[0] 576 preferred_username = ap_account.split("@", 1)[0]
532 577
533 identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid) 578 identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid)
579 if node and node.startswith(self.apg._events.namespace):
580 events = outbox
581 else:
582 events_account = await self.apg.getAPAccountFromJidAndNode(
583 account_jid, self.apg._events.namespace
584 )
585 events = self.apg.buildAPURL(TYPE_OUTBOX, events_account)
534 586
535 actor_data = { 587 actor_data = {
536 "@context": [ 588 "@context": [
537 "https://www.w3.org/ns/activitystreams", 589 "https://www.w3.org/ns/activitystreams",
538 "https://w3id.org/security/v1" 590 "https://w3id.org/security/v1"
541 "id": ap_url, 593 "id": ap_url,
542 "type": "Person", 594 "type": "Person",
543 "preferredUsername": preferred_username, 595 "preferredUsername": preferred_username,
544 "inbox": inbox, 596 "inbox": inbox,
545 "outbox": outbox, 597 "outbox": outbox,
598 "events": events,
546 "followers": followers, 599 "followers": followers,
547 "following": following, 600 "following": following,
548 "publicKey": { 601 "publicKey": {
549 "id": f"{ap_url}#main-key", 602 "id": f"{ap_url}#main-key",
550 "owner": ap_url, 603 "owner": ap_url,
551 "publicKeyPem": self.apg.public_key_pem 604 "publicKeyPem": self.apg.public_key_pem
552 }, 605 },
553 "endpoints": { 606 "endpoints": {
554 "sharedInbox": shared_inbox 607 "sharedInbox": shared_inbox,
608 "events": events,
555 }, 609 },
556 } 610 }
557 611
558 if identity_data.get("nicknames"): 612 if identity_data.get("nicknames"):
559 actor_data["name"] = identity_data["nicknames"][0] 613 actor_data["name"] = identity_data["nicknames"][0]
631 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") 685 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
632 return {} 686 return {}
633 687
634 base_url = self.getCanonicalURL(request) 688 base_url = self.getCanonicalURL(request)
635 url = f"{base_url}?{parse.urlencode(query_data, True)}" 689 url = f"{base_url}?{parse.urlencode(query_data, True)}"
636 data = { 690 if node and node.startswith(self.apg._events.namespace):
637 "@context": "https://www.w3.org/ns/activitystreams", 691 ordered_items = [
638 "id": url, 692 await self.apg.ap_events.event_data_2_ap_item(
639 "type": "OrderedCollectionPage", 693 self.apg._events.event_elt_2_event_data(item),
640 "partOf": base_url, 694 account_jid
641 "orderedItems" : [ 695 )
642 await self.apg.mbdata2APitem( 696 for item in reversed(items)
697 ]
698 else:
699 ordered_items = [
700 await self.apg.mb_data_2_ap_item(
643 self.apg.client, 701 self.apg.client,
644 await self.apg._m.item2mbdata( 702 await self.apg._m.item2mbdata(
645 self.apg.client, 703 self.apg.client,
646 item, 704 item,
647 account_jid, 705 account_jid,
648 node 706 node
649 ) 707 )
650 ) 708 )
651 for item in reversed(items) 709 for item in reversed(items)
652 ] 710 ]
711 data = {
712 "@context": ["https://www.w3.org/ns/activitystreams"],
713 "id": url,
714 "type": "OrderedCollectionPage",
715 "partOf": base_url,
716 "orderedItems": ordered_items
653 } 717 }
654 718
655 # AP OrderedCollection must be in reversed chronological order, thus the opposite 719 # AP OrderedCollection must be in reversed chronological order, thus the opposite
656 # of what we get with RSM (at least with Libervia Pubsub) 720 # of what we get with RSM (at least with Libervia Pubsub)
657 if not metadata["complete"]: 721 if not metadata["complete"]:
716 780
717 url = self.getCanonicalURL(request) 781 url = self.getCanonicalURL(request)
718 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" 782 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
719 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" 783 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
720 return { 784 return {
721 "@context": "https://www.w3.org/ns/activitystreams", 785 "@context": ["https://www.w3.org/ns/activitystreams"],
722 "id": url, 786 "id": url,
723 "totalItems": items_count, 787 "totalItems": items_count,
724 "type": "OrderedCollection", 788 "type": "OrderedCollection",
725 "first": url_first_page, 789 "first": url_first_page,
726 "last": url_last_page, 790 "last": url_last_page,
735 ap_url: str, 799 ap_url: str,
736 signing_actor: Optional[str] 800 signing_actor: Optional[str]
737 ) -> None: 801 ) -> None:
738 if signing_actor is None: 802 if signing_actor is None:
739 raise exceptions.InternalError("signing_actor must be set for inbox requests") 803 raise exceptions.InternalError("signing_actor must be set for inbox requests")
740 if node is None:
741 node = self.apg._m.namespace
742 try: 804 try:
743 data = json.load(request.content) 805 data = json.load(request.content)
744 if not isinstance(data, dict): 806 if not isinstance(data, dict):
745 raise ValueError("data should be an object") 807 raise ValueError("data should be an object")
746 except (json.JSONDecodeError, ValueError) as e: 808 except (json.JSONDecodeError, ValueError) as e:
803 ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node) 865 ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node)
804 followers.append(ap_account) 866 followers.append(ap_account)
805 867
806 url = self.getCanonicalURL(request) 868 url = self.getCanonicalURL(request)
807 return { 869 return {
808 "@context": "https://www.w3.org/ns/activitystreams", 870 "@context": ["https://www.w3.org/ns/activitystreams"],
809 "type": "OrderedCollection", 871 "type": "OrderedCollection",
810 "id": url, 872 "id": url,
811 "totalItems": len(subscribers), 873 "totalItems": len(subscribers),
812 "first": { 874 "first": {
813 "type": "OrderedCollectionPage", 875 "type": "OrderedCollectionPage",
842 ) 904 )
843 following.append(ap_account) 905 following.append(ap_account)
844 906
845 url = self.getCanonicalURL(request) 907 url = self.getCanonicalURL(request)
846 return { 908 return {
847 "@context": "https://www.w3.org/ns/activitystreams", 909 "@context": ["https://www.w3.org/ns/activitystreams"],
848 "type": "OrderedCollection", 910 "type": "OrderedCollection",
849 "id": url, 911 "id": url,
850 "totalItems": len(subscriptions), 912 "totalItems": len(subscriptions),
851 "first": { 913 "first": {
852 "type": "OrderedCollectionPage", 914 "type": "OrderedCollectionPage",
899 avatar_filename = extra_args[0] 961 avatar_filename = extra_args[0]
900 avatar_path = self.apg.host.common_cache.getPath(avatar_filename) 962 avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
901 return static.File(str(avatar_path)).render(request) 963 return static.File(str(avatar_path)).render(request)
902 elif request_type == "item": 964 elif request_type == "item":
903 ret_data = await self.apg.apGetLocalObject(ap_url) 965 ret_data = await self.apg.apGetLocalObject(ap_url)
904 ret_data["@context"] = NS_AP 966 if "@context" not in ret_data:
967 ret_data["@context"] = [NS_AP]
905 else: 968 else:
906 if len(extra_args) > 1: 969 if len(extra_args) > 1:
907 log.warning(f"unexpected extra arguments: {extra_args!r}") 970 log.warning(f"unexpected extra arguments: {extra_args!r}")
908 ap_account = extra_args[0] 971 ap_account = extra_args[0]
909 account_jid, node = await self.apg.getJIDAndNode(ap_account) 972 account_jid, node = await self.apg.getJIDAndNode(ap_account)