comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4259:49019947cc76

component AP Gateway: implement HTTP GET signature.
author Goffi <goffi@goffi.org>
date Wed, 05 Jun 2024 22:34:09 +0200
parents 6784d07b99c8
children d366d90a71aa
comparison
equal deleted inserted replaced
4258:ba28ca268f4a 4259:49019947cc76
41 from cryptography.hazmat.primitives import serialization 41 from cryptography.hazmat.primitives import serialization
42 from cryptography.hazmat.primitives import hashes 42 from cryptography.hazmat.primitives import hashes
43 from cryptography.hazmat.primitives.asymmetric import rsa 43 from cryptography.hazmat.primitives.asymmetric import rsa
44 from cryptography.hazmat.primitives.asymmetric import padding 44 from cryptography.hazmat.primitives.asymmetric import padding
45 import dateutil 45 import dateutil
46 from dateutil.parser import parserinfo
47 import shortuuid 46 import shortuuid
48 from sqlalchemy.exc import IntegrityError
49 import treq 47 import treq
50 from treq.response import _Response as TReqResponse 48 from treq.response import _Response as TReqResponse
51 from twisted.internet import defer, reactor, threads 49 from twisted.internet import defer, reactor, threads
52 from twisted.web import http 50 from twisted.web import http
53 from twisted.words.protocols.jabber import error, jid 51 from twisted.words.protocols.jabber import error, jid
139 137
140 def __init__(self, host): 138 def __init__(self, host):
141 self.host = host 139 self.host = host
142 self.initialised = False 140 self.initialised = False
143 self.client = None 141 self.client = None
142 self.http_sign_get = True
144 self._p = host.plugins["XEP-0060"] 143 self._p = host.plugins["XEP-0060"]
145 self._a = host.plugins["XEP-0084"] 144 self._a = host.plugins["XEP-0084"]
146 self._e = host.plugins["XEP-0106"] 145 self._e = host.plugins["XEP-0106"]
147 self._m = host.plugins["XEP-0277"] 146 self._m = host.plugins["XEP-0277"]
148 self._v = host.plugins["XEP-0292"] 147 self._v = host.plugins["XEP-0292"]
259 if connection_type not in ('http', 'https'): 258 if connection_type not in ('http', 'https'):
260 raise exceptions.ConfigError( 259 raise exceptions.ConfigError(
261 'bad ap-gateay http_connection_type, you must use one of "http" or ' 260 'bad ap-gateay http_connection_type, you must use one of "http" or '
262 '"https"' 261 '"https"'
263 ) 262 )
263 self.http_sign_get = C.bool(
264 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE)
265 )
264 self.max_items = int(self.host.memory.config_get( 266 self.max_items = int(self.host.memory.config_get(
265 CONF_SECTION, 'new_node_max_items', 50 267 CONF_SECTION, 'new_node_max_items', 50
266 268
267 )) 269 ))
268 self.comments_max_depth = int(self.host.memory.config_get( 270 self.comments_max_depth = int(self.host.memory.config_get(
360 await self.convert_and_post_items( 362 await self.convert_and_post_items(
361 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, 363 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
362 itemsEvent.items 364 itemsEvent.items
363 ) 365 )
364 366
365 async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity: 367 async def get_virtual_client(
368 self,
369 requestor_actor_id: str,
370 actor_id: str
371 ) -> SatXMPPEntity:
366 """Get client for this component with a specified jid 372 """Get client for this component with a specified jid
367 373
368 This is needed to perform operations with the virtual JID corresponding to the AP 374 This is needed to perform operations with the virtual JID corresponding to the AP
369 actor instead of the JID of the gateway itself. 375 actor instead of the JID of the gateway itself.
376 @param requestor_actor_id: originating actor ID (URL)
370 @param actor_id: ID of the actor 377 @param actor_id: ID of the actor
371 @return: virtual client 378 @return: virtual client
372 """ 379 """
373 local_jid = await self.get_jid_from_id(actor_id) 380 local_jid = await self.get_jid_from_id(requestor_actor_id, actor_id)
374 return self.client.get_virtual_client(local_jid) 381 return self.client.get_virtual_client(local_jid)
375 382
376 def is_activity(self, data: dict) -> bool: 383 def is_activity(self, data: dict) -> bool:
377 """Return True if the data has an activity type""" 384 """Return True if the data has an activity type"""
378 try: 385 try:
379 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER 386 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
380 except (KeyError, TypeError): 387 except (KeyError, TypeError):
381 return False 388 return False
382 389
383 async def ap_get(self, url: str) -> dict: 390 async def ap_get(self, url: str, requestor_actor_id: str) -> dict:
384 """Retrieve AP JSON from given URL 391 """Retrieve AP JSON from given URL with HTTP Signature
385 392
393 @param url: AP server endpoint
394 @param requestor_actor_id: originating actor ID (URL)
386 @raise error.StanzaError: "service-unavailable" is sent when something went wrong 395 @raise error.StanzaError: "service-unavailable" is sent when something went wrong
387 with AP server 396 with AP server
388 """ 397 """
389 resp = await treq.get( 398 if self.http_sign_get:
390 url, 399 headers = self._generate_signed_headers(url, requestor_actor_id, method="get")
391 headers = { 400 else:
392 "Accept": [MEDIA_TYPE_AP], 401 headers = {}
393 } 402 headers["Accept"] = MEDIA_TYPE_AP
394 ) 403
404 resp = await treq.get(url, headers=headers)
395 if resp.code >= 300: 405 if resp.code >= 300:
396 text = await resp.text() 406 text = await resp.text()
397 if resp.code == 404: 407 if resp.code == 404:
398 raise exceptions.NotFound(f"Can't find resource at {url}") 408 raise exceptions.NotFound(f"Can't find resource at {url}")
399 else: 409 else:
405 raise error.StanzaError( 415 raise error.StanzaError(
406 "service-unavailable", 416 "service-unavailable",
407 text=f"Can't get AP data at {url}: {e}" 417 text=f"Can't get AP data at {url}: {e}"
408 ) 418 )
409 419
410 @overload 420 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse:
411 async def ap_get_object(self, data: dict, key: str) -> Optional[dict]: 421 """Sign a document and post it to AP server
412 ... 422
423 @param url: AP server endpoint
424 @param requestor_actor_id: originating actor ID (URL)
425 @param doc: document to send
426 """
427 if self.verbose:
428 __, actor_args = self.parse_apurl(requestor_actor_id)
429 actor_account = actor_args[0]
430 to_log = [
431 "",
432 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
433 ]
434
435 body = json.dumps(doc).encode()
436 headers = self._generate_signed_headers(url, requestor_actor_id, method="post", body=body)
437 headers["Content-Type"] = MEDIA_TYPE_AP
438
439 if self.verbose:
440 if self.verbose >= 3:
441 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items())
442 to_log.append(f" headers:\n{h_to_log}")
443 to_log.append("---")
444 log.info("\n".join(to_log))
445
446 resp = await treq.post(
447 url,
448 body,
449 headers=headers
450 )
451 if resp.code >= 300:
452 text = await resp.text()
453 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
454 elif self.verbose:
455 log.info(f"==> response code: {resp.code}")
456 return resp
457
458 def _generate_signed_headers(
459 self,
460 url: str,
461 actor_id: str,
462 method: str,
463 body: bytes|None = None
464 ) -> dict[str, str]:
465 """Generate HTTP headers with signature for a given request
466
467 @param url: AP server endpoint
468 @param actor_id: originating actor ID (URL)
469 @param method: HTTP method (e.g., 'get', 'post')
470 @param body: request body if any
471 @return: signed headers
472 """
473 p_url = parse.urlparse(url)
474 headers = {
475 "(request-target)": f"{method} {p_url.path}",
476 "Host": p_url.hostname,
477 "Date": http.datetimeToString().decode()
478 }
479
480 if body:
481 digest_algo, digest_hash = self.get_digest(body)
482 headers["Digest"] = f"{digest_algo}={digest_hash}"
483
484 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)
485 return headers
413 486
414 @overload 487 @overload
415 async def ap_get_object( 488 async def ap_get_object(
416 self, data: Union[str, dict], key: None = None 489 self, requestor_actor_id: str, data: dict, key: str
490 ) -> dict|None:
491 ...
492
493 @overload
494 async def ap_get_object(
495 self, requestor_actor_id: str, data: Union[str, dict], key: None = None
417 ) -> dict: 496 ) -> dict:
418 ... 497 ...
419 498
420 async def ap_get_object(self, data, key = None): 499 async def ap_get_object(self, requestor_actor_id: str, data, key = None) -> dict|None:
421 """Retrieve an AP object, dereferencing when necessary 500 """Retrieve an AP object, dereferencing when necessary
422 501
423 This method is to be used with attributes marked as "Functional" in 502 This method is to be used with attributes marked as "Functional" in
424 https://www.w3.org/TR/activitystreams-vocabulary 503 https://www.w3.org/TR/activitystreams-vocabulary
504 @param requestor_actor_id: ID of the actor doing the request.
425 @param data: AP object where an other object is looked for, or the object itself 505 @param data: AP object where an other object is looked for, or the object itself
426 @param key: name of the object to look for, or None if data is the object directly 506 @param key: name of the object to look for, or None if data is the object directly
427 @return: found object if any 507 @return: found object if any
428 """ 508 """
429 if key is not None: 509 if key is not None:
438 return value 518 return value
439 elif isinstance(value, str): 519 elif isinstance(value, str):
440 if self.is_local_url(value): 520 if self.is_local_url(value):
441 return await self.ap_get_local_object(value) 521 return await self.ap_get_local_object(value)
442 else: 522 else:
443 return await self.ap_get(value) 523 return await self.ap_get(value, requestor_actor_id)
444 else: 524 else:
445 raise NotImplementedError( 525 raise NotImplementedError(
446 "was expecting a string or a dict, got {type(value)}: {value!r}}" 526 "was expecting a string or a dict, got {type(value)}: {value!r}}"
447 ) 527 )
448 528
515 'only object from "item" URLs can be retrieved for now' 595 'only object from "item" URLs can be retrieved for now'
516 ) 596 )
517 597
518 async def ap_get_list( 598 async def ap_get_list(
519 self, 599 self,
600 requestor_actor_id: str,
520 data: dict, 601 data: dict,
521 key: str, 602 key: str,
522 only_ids: bool = False 603 only_ids: bool = False
523 ) -> Optional[List[Dict[str, Any]]]: 604 ) -> Optional[List[Dict[str, Any]]]:
524 """Retrieve a list of objects from AP data, dereferencing when necessary 605 """Retrieve a list of objects from AP data, dereferencing when necessary
525 606
526 This method is to be used with non functional vocabularies. Use ``ap_get_object`` 607 This method is to be used with non functional vocabularies. Use ``ap_get_object``
527 otherwise. 608 otherwise.
528 If the value is a dictionary, it will be wrapped in a list 609 If the value is a dictionary, it will be wrapped in a list
610 @param requestor_actor_id: ID of the actor doing the request.
529 @param data: AP object where a list of objects is looked for 611 @param data: AP object where a list of objects is looked for
530 @param key: key of the list to look for 612 @param key: key of the list to look for
531 @param only_ids: if True, only item IDs are retrieved 613 @param only_ids: if True, only item IDs are retrieved
532 @return: list of objects, or None if the key is not present 614 @return: list of objects, or None if the key is not present
533 """ 615 """
536 return None 618 return None
537 elif isinstance(value, str): 619 elif isinstance(value, str):
538 if self.is_local_url(value): 620 if self.is_local_url(value):
539 value = await self.ap_get_local_object(value) 621 value = await self.ap_get_local_object(value)
540 else: 622 else:
541 value = await self.ap_get(value) 623 value = await self.ap_get(value, requestor_actor_id)
542 if isinstance(value, dict): 624 if isinstance(value, dict):
543 return [value] 625 return [value]
544 if not isinstance(value, list): 626 if not isinstance(value, list):
545 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") 627 raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
546 if only_ids: 628 if only_ids:
547 return [ 629 return [
548 {"id": v["id"]} if isinstance(v, dict) else {"id": v} 630 {"id": v["id"]} if isinstance(v, dict) else {"id": v}
549 for v in value 631 for v in value
550 ] 632 ]
551 else: 633 else:
552 return [await self.ap_get_object(i) for i in value] 634 return [await self.ap_get_object(requestor_actor_id, i) for i in value]
553 635
554 async def ap_get_actors( 636 async def ap_get_actors(
555 self, 637 self,
638 requestor_actor_id: str,
556 data: dict, 639 data: dict,
557 key: str, 640 key: str,
558 as_account: bool = True 641 as_account: bool = True
559 ) -> List[str]: 642 ) -> List[str]:
560 """Retrieve AP actors from data 643 """Retrieve AP actors from data
561 644
645 @param requestor_actor_id: ID of the actor doing the request.
562 @param data: AP object containing a field with actors 646 @param data: AP object containing a field with actors
563 @param key: field to use to retrieve actors 647 @param key: field to use to retrieve actors
564 @param as_account: if True returns account handles, otherwise will return actor 648 @param as_account: if True returns account handles, otherwise will return actor
565 IDs 649 IDs
566 @raise exceptions.DataError: there is no actor data or it is invalid 650 @raise exceptions.DataError: there is no actor data or it is invalid
589 if not value: 673 if not value:
590 raise exceptions.DataError( 674 raise exceptions.DataError(
591 f"list of actors is empty" 675 f"list of actors is empty"
592 ) 676 )
593 if as_account: 677 if as_account:
594 return [await self.get_ap_account_from_id(actor_id) for actor_id in value] 678 return [
679 await self.get_ap_account_from_id(requestor_actor_id, actor_id)
680 for actor_id in value
681 ]
595 else: 682 else:
596 return value 683 return value
597 684
598 async def ap_get_sender_actor( 685 async def ap_get_sender_actor(
599 self, 686 self,
687 requestor_actor_id: str,
600 data: dict, 688 data: dict,
601 ) -> str: 689 ) -> str:
602 """Retrieve actor who sent data 690 """Retrieve actor who sent data
603 691
604 This is done by checking "actor" field first, then "attributedTo" field. 692 This is done by checking "actor" field first, then "attributedTo" field.
605 Only the first found actor is taken into account 693 Only the first found actor is taken into account
694 @param requestor_actor_id: ID of the actor doing the request.
606 @param data: AP object 695 @param data: AP object
607 @return: actor id of the sender 696 @return: actor id of the sender
608 @raise exceptions.NotFound: no actor has been found in data 697 @raise exceptions.NotFound: no actor has been found in data
609 """ 698 """
610 try: 699 try:
611 actors = await self.ap_get_actors(data, "actor", as_account=False) 700 actors = await self.ap_get_actors(requestor_actor_id, data, "actor", as_account=False)
612 except exceptions.DataError: 701 except exceptions.DataError:
613 actors = None 702 actors = None
614 if not actors: 703 if not actors:
615 try: 704 try:
616 actors = await self.ap_get_actors(data, "attributedTo", as_account=False) 705 actors = await self.ap_get_actors(requestor_actor_id, data, "attributedTo", as_account=False)
617 except exceptions.DataError: 706 except exceptions.DataError:
618 raise exceptions.NotFound( 707 raise exceptions.NotFound(
619 'actor not specified in "actor" or "attributedTo"' 708 'actor not specified in "actor" or "attributedTo"'
620 ) 709 )
621 try: 710 try:
706 return ( 795 return (
707 ("pubsub", "service") in host_disco.identities 796 ("pubsub", "service") in host_disco.identities
708 and not ("pubsub", "pep") in host_disco.identities 797 and not ("pubsub", "pep") in host_disco.identities
709 ) 798 )
710 799
711 async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: 800 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str|None]:
712 """Decode raw AP account handle to get XMPP JID and Pubsub Node 801 """Decode raw AP account handle to get XMPP JID and Pubsub Node
713 802
714 Usernames are case insensitive. 803 Usernames are case insensitive.
715 804
716 By default, the username correspond to local username (i.e. username from 805 By default, the username correspond to local username (i.e. username from
817 self.client.jid.host, 906 self.client.jid.host,
818 None 907 None
819 ) 908 )
820 ) 909 )
821 910
822 async def get_jid_from_id(self, actor_id: str) -> jid.JID: 911 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID:
823 """Compute JID linking to an AP Actor ID 912 """Compute JID linking to an AP Actor ID
824 913
825 The local jid is computed by escaping AP actor handle and using it as local part 914 The local jid is computed by escaping AP actor handle and using it as local part
826 of JID, where domain part is this gateway own JID 915 of JID, where domain part is this gateway own JID
827 If the actor_id comes from local server (checked with self.public_url), it means 916 If the actor_id comes from local server (checked with self.public_url), it means
828 that we have an XMPP entity, and the original JID is returned 917 that we have an XMPP entity, and the original JID is returned
918
919 @param requestor_actor_id: ID of the actor doing the request.
920 @param actor_id: ID of the actor to generate JID from.
921 @return: generated JID.
829 """ 922 """
830 if self.is_local_url(actor_id): 923 if self.is_local_url(actor_id):
831 request_type, extra_args = self.parse_apurl(actor_id) 924 request_type, extra_args = self.parse_apurl(actor_id)
832 if request_type != TYPE_ACTOR or len(extra_args) != 1: 925 if request_type != TYPE_ACTOR or len(extra_args) != 1:
833 raise ValueError(f"invalid actor id: {actor_id!r}") 926 raise ValueError(f"invalid actor id: {actor_id!r}")
834 actor_jid, __ = await self.get_jid_and_node(extra_args[0]) 927 actor_jid, __ = await self.get_jid_and_node(extra_args[0])
835 return actor_jid 928 return actor_jid
836 929
837 account = await self.get_ap_account_from_id(actor_id) 930 account = await self.get_ap_account_from_id(requestor_actor_id, actor_id)
838 return self.get_local_jid_from_account(account) 931 return self.get_local_jid_from_account(account)
839 932
840 def parse_apurl(self, url: str) -> Tuple[str, List[str]]: 933 def parse_apurl(self, url: str) -> tuple[str, list[str]]:
841 """Parse an URL leading to an AP endpoint 934 """Parse an URL leading to an AP endpoint
842 935
843 @param url: URL to parse (schema is not mandatory) 936 @param url: URL to parse (schema is not mandatory)
844 @return: endpoint type and extra arguments 937 @return: endpoint type and extra arguments
845 """ 938 """
893 if algo != "SHA-256": 986 if algo != "SHA-256":
894 raise NotImplementedError("only SHA-256 is implemented for now") 987 raise NotImplementedError("only SHA-256 is implemented for now")
895 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() 988 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
896 989
897 @async_lru(maxsize=LRU_MAX_SIZE) 990 @async_lru(maxsize=LRU_MAX_SIZE)
898 async def get_actor_data(self, actor_id) -> dict: 991 async def get_actor_data(self, requestor_actor_id: str, actor_id: str) -> dict:
899 """Retrieve actor data with LRU cache""" 992 """Retrieve actor data with LRU cache"""
900 return await self.ap_get(actor_id) 993 return await self.ap_get(actor_id, requestor_actor_id)
901 994
902 @async_lru(maxsize=LRU_MAX_SIZE) 995 @async_lru(maxsize=LRU_MAX_SIZE)
903 async def get_actor_pub_key_data( 996 async def get_actor_pub_key_data(
904 self, 997 self,
998 requestor_actor_id: str,
905 actor_id: str 999 actor_id: str
906 ) -> Tuple[str, str, rsa.RSAPublicKey]: 1000 ) -> Tuple[str, str, rsa.RSAPublicKey]:
907 """Retrieve Public Key data from actor ID 1001 """Retrieve Public Key data from actor ID
908 1002
1003 @param requestor_actor_id: ID of the actor doing the request.
909 @param actor_id: actor ID (url) 1004 @param actor_id: actor ID (url)
910 @return: key_id, owner and public_key 1005 @return: key_id, owner and public_key
911 @raise KeyError: publicKey is missing from actor data 1006 @raise KeyError: publicKey is missing from actor data
912 """ 1007 """
913 actor_data = await self.get_actor_data(actor_id) 1008 actor_data = await self.get_actor_data(requestor_actor_id, actor_id)
914 pub_key_data = actor_data["publicKey"] 1009 pub_key_data = actor_data["publicKey"]
915 key_id = pub_key_data["id"] 1010 key_id = pub_key_data["id"]
916 owner = pub_key_data["owner"] 1011 owner = pub_key_data["owner"]
917 pub_key_pem = pub_key_data["publicKeyPem"] 1012 pub_key_pem = pub_key_data["publicKeyPem"]
918 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) 1013 pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
967 """Get local key ID from actor ID""" 1062 """Get local key ID from actor ID"""
968 return f"{actor_id}#main-key" 1063 return f"{actor_id}#main-key"
969 1064
970 async def check_signature( 1065 async def check_signature(
971 self, 1066 self,
1067 requestor_actor_id: str,
972 signature: str, 1068 signature: str,
973 key_id: str, 1069 key_id: str,
974 headers: Dict[str, str] 1070 headers: Dict[str, str]
975 ) -> str: 1071 ) -> str:
976 """Verify that signature matches given headers 1072 """Verify that signature matches given headers
977 1073
978 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 1074 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
979 1075
1076 @param requestor_actor_id: ID of the actor doing the request.
980 @param signature: Base64 encoded signature 1077 @param signature: Base64 encoded signature
981 @param key_id: ID of the key used to sign the data 1078 @param key_id: ID of the key used to sign the data
982 @param headers: headers and their values, including pseudo-headers 1079 @param headers: headers and their values, including pseudo-headers
983 @return: id of the signing actor 1080 @return: id of the signing actor
984 1081
989 actor = key_id[5:] 1086 actor = key_id[5:]
990 actor_id = await self.get_ap_actor_id_from_account(actor) 1087 actor_id = await self.get_ap_actor_id_from_account(actor)
991 else: 1088 else:
992 actor_id = key_id.split("#", 1)[0] 1089 actor_id = key_id.split("#", 1)[0]
993 1090
994 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id) 1091 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(
1092 requestor_actor_id,
1093 actor_id
1094 )
995 if pub_key_id != key_id or pub_key_owner != actor_id: 1095 if pub_key_id != key_id or pub_key_owner != actor_id:
996 raise exceptions.EncryptionError("Public Key mismatch") 1096 raise exceptions.EncryptionError("Public Key mismatch")
997 1097
998 try: 1098 try:
999 pub_key.verify( 1099 pub_key.verify(
1064 @param node: (virtual) node corresponding where the item has been published 1164 @param node: (virtual) node corresponding where the item has been published
1065 @param subscribe_extra_nodes: if True, extra data nodes will be automatically 1165 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
1066 subscribed, that is comment nodes if present and attachments nodes. 1166 subscribed, that is comment nodes if present and attachments nodes.
1067 """ 1167 """
1068 actor_id = await self.get_ap_actor_id_from_account(ap_account) 1168 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1069 inbox = await self.get_ap_inbox_from_id(actor_id) 1169 requestor_actor_id = self.build_apurl(
1170 TYPE_ACTOR,
1171 await self.get_ap_account_from_jid_and_node(service, node)
1172 )
1173 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id)
1070 for item in items: 1174 for item in items:
1071 if item.name == "item": 1175 if item.name == "item":
1072 cached_item = await self.host.memory.storage.search_pubsub_items({ 1176 cached_item = await self.host.memory.storage.search_pubsub_items({
1073 "profiles": [self.client.profile], 1177 "profiles": [self.client.profile],
1074 "services": [service], 1178 "services": [service],
1144 url_actor, ap_item = await self.ap_delete_item( 1248 url_actor, ap_item = await self.ap_delete_item(
1145 client.jid, node, item["id"] 1249 client.jid, node, item["id"]
1146 ) 1250 )
1147 else: 1251 else:
1148 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") 1252 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
1149 await self.sign_and_post(inbox, url_actor, ap_item) 1253 await self.ap_post(inbox, url_actor, ap_item)
1150 1254
1151 async def convert_and_post_attachments( 1255 async def convert_and_post_attachments(
1152 self, 1256 self,
1153 client: SatXMPPEntity, 1257 client: SatXMPPEntity,
1154 ap_account: str, 1258 ap_account: str,
1177 "we should get exactly one attachment item for an entity, got " 1281 "we should get exactly one attachment item for an entity, got "
1178 f"{len(items)})" 1282 f"{len(items)})"
1179 ) 1283 )
1180 1284
1181 actor_id = await self.get_ap_actor_id_from_account(ap_account) 1285 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1182 inbox = await self.get_ap_inbox_from_id(actor_id) 1286 requestor_actor_id = self.build_apurl(
1287 TYPE_ACTOR,
1288 await self.get_ap_account_from_jid_and_node(service, node)
1289 )
1290 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id)
1183 1291
1184 item_elt = items[0] 1292 item_elt = items[0]
1185 item_id = item_elt["id"] 1293 item_id = item_elt["id"]
1186 1294
1187 if publisher is None: 1295 if publisher is None:
1201 1309
1202 if publisher is not None: 1310 if publisher is not None:
1203 item_elt["publisher"] = publisher.userhost() 1311 item_elt["publisher"] = publisher.userhost()
1204 1312
1205 item_service, item_node, item_id = self._pa.attachment_node_2_item(node) 1313 item_service, item_node, item_id = self._pa.attachment_node_2_item(node)
1206 item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node) 1314 item_account = await self.get_ap_account_from_jid_and_node(
1315 item_service, item_node
1316 )
1207 if self.is_virtual_jid(item_service): 1317 if self.is_virtual_jid(item_service):
1208 # it's a virtual JID mapping to an external AP actor, we can use the 1318 # it's a virtual JID mapping to an external AP actor, we can use the
1209 # item_id directly 1319 # item_id directly
1210 item_url = item_id 1320 item_url = item_id
1211 if not item_url.startswith("https:"): 1321 if not item_url.startswith("https:"):
1225 }) 1335 })
1226 if not old_attachment_pubsub_items: 1336 if not old_attachment_pubsub_items:
1227 old_attachment = {} 1337 old_attachment = {}
1228 else: 1338 else:
1229 old_attachment_items = [i.data for i in old_attachment_pubsub_items] 1339 old_attachment_items = [i.data for i in old_attachment_pubsub_items]
1230 old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items) 1340 old_attachments = self._pa.items_2_attachment_data(
1341 client, old_attachment_items
1342 )
1231 try: 1343 try:
1232 old_attachment = old_attachments[0] 1344 old_attachment = old_attachments[0]
1233 except IndexError: 1345 except IndexError:
1234 # no known element was present in attachments 1346 # no known element was present in attachments
1235 old_attachment = {} 1347 old_attachment = {}
1252 activity = self.create_activity( 1364 activity = self.create_activity(
1253 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1365 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1254 ) 1366 )
1255 activity["to"] = [ap_account] 1367 activity["to"] = [ap_account]
1256 activity["cc"] = [NS_AP_PUBLIC] 1368 activity["cc"] = [NS_AP_PUBLIC]
1257 await self.sign_and_post(inbox, publisher_actor_id, activity) 1369 await self.ap_post(inbox, publisher_actor_id, activity)
1258 else: 1370 else:
1259 if "noticed" in old_attachment: 1371 if "noticed" in old_attachment:
1260 # "noticed" attachment has been removed, we undo the "Like" activity 1372 # "noticed" attachment has been removed, we undo the "Like" activity
1261 activity_id = self.build_apurl("like", item_account, item_id) 1373 activity_id = self.build_apurl("like", item_account, item_id)
1262 activity = self.create_activity( 1374 activity = self.create_activity(
1263 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1375 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1264 ) 1376 )
1265 activity["to"] = [ap_account] 1377 activity["to"] = [ap_account]
1266 activity["cc"] = [NS_AP_PUBLIC] 1378 activity["cc"] = [NS_AP_PUBLIC]
1267 undo = self.create_activity("Undo", publisher_actor_id, activity) 1379 undo = self.create_activity("Undo", publisher_actor_id, activity)
1268 await self.sign_and_post(inbox, publisher_actor_id, undo) 1380 await self.ap_post(inbox, publisher_actor_id, undo)
1269 1381
1270 # reactions 1382 # reactions
1271 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) 1383 new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
1272 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) 1384 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
1273 reactions_remove = old_reactions - new_reactions 1385 reactions_remove = old_reactions - new_reactions
1288 activy = self.create_activity( 1400 activy = self.create_activity(
1289 "Undo", publisher_actor_id, reaction_activity 1401 "Undo", publisher_actor_id, reaction_activity
1290 ) 1402 )
1291 else: 1403 else:
1292 activy = reaction_activity 1404 activy = reaction_activity
1293 await self.sign_and_post(inbox, publisher_actor_id, activy) 1405 await self.ap_post(inbox, publisher_actor_id, activy)
1294 1406
1295 # RSVP 1407 # RSVP
1296 if "rsvp" in attachments: 1408 if "rsvp" in attachments:
1297 attending = attachments["rsvp"].get("attending", "no") 1409 attending = attachments["rsvp"].get("attending", "no")
1298 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1410 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1302 activity = self.create_activity( 1414 activity = self.create_activity(
1303 activity_type, publisher_actor_id, item_url, activity_id=activity_id 1415 activity_type, publisher_actor_id, item_url, activity_id=activity_id
1304 ) 1416 )
1305 activity["to"] = [ap_account] 1417 activity["to"] = [ap_account]
1306 activity["cc"] = [NS_AP_PUBLIC] 1418 activity["cc"] = [NS_AP_PUBLIC]
1307 await self.sign_and_post(inbox, publisher_actor_id, activity) 1419 await self.ap_post(inbox, publisher_actor_id, activity)
1308 else: 1420 else:
1309 if "rsvp" in old_attachment: 1421 if "rsvp" in old_attachment:
1310 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1422 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1311 if old_attending == "yes": 1423 if old_attending == "yes":
1312 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) 1424 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id)
1313 activity = self.create_activity( 1425 activity = self.create_activity(
1314 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id 1426 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
1315 ) 1427 )
1316 activity["to"] = [ap_account] 1428 activity["to"] = [ap_account]
1317 activity["cc"] = [NS_AP_PUBLIC] 1429 activity["cc"] = [NS_AP_PUBLIC]
1318 await self.sign_and_post(inbox, publisher_actor_id, activity) 1430 await self.ap_post(inbox, publisher_actor_id, activity)
1319 1431
1320 if service.user and self.is_virtual_jid(service): 1432 if service.user and self.is_virtual_jid(service):
1321 # the item is on a virtual service, we need to store it in cache 1433 # the item is on a virtual service, we need to store it in cache
1322 log.debug("storing attachments item in cache") 1434 log.debug("storing attachments item in cache")
1323 cached_node = await self.host.memory.storage.get_pubsub_node( 1435 cached_node = await self.host.memory.storage.get_pubsub_node(
1327 self.client, 1439 self.client,
1328 cached_node, 1440 cached_node,
1329 [item_elt], 1441 [item_elt],
1330 [attachments] 1442 [attachments]
1331 ) 1443 )
1332
1333 async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
1334 """Sign a documentent and post it to AP server
1335
1336 @param url: AP server endpoint
1337 @param actor_id: originating actor ID (URL)
1338 @param doc: document to send
1339 """
1340 if self.verbose:
1341 __, actor_args = self.parse_apurl(actor_id)
1342 actor_account = actor_args[0]
1343 to_log = [
1344 "",
1345 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
1346 ]
1347
1348 p_url = parse.urlparse(url)
1349 body = json.dumps(doc).encode()
1350 digest_algo, digest_hash = self.get_digest(body)
1351 digest = f"{digest_algo}={digest_hash}"
1352
1353 headers = {
1354 "(request-target)": f"post {p_url.path}",
1355 "Host": p_url.hostname,
1356 "Date": http.datetimeToString().decode(),
1357 "Digest": digest
1358 }
1359 headers["Content-Type"] = (
1360 MEDIA_TYPE_AP
1361 )
1362 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)
1363
1364 if self.verbose:
1365 if self.verbose>=3:
1366 h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items())
1367 to_log.append(f" headers:\n{h_to_log}")
1368 to_log.append("---")
1369 log.info("\n".join(to_log))
1370
1371 resp = await treq.post(
1372 url,
1373 body,
1374 headers=headers,
1375 )
1376 if resp.code >= 300:
1377 text = await resp.text()
1378 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
1379 elif self.verbose:
1380 log.info(f"==> response code: {resp.code}")
1381 return resp
1382 1444
1383 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): 1445 def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
1384 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 1446 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
1385 service = jid.JID(service_s) 1447 service = jid.JID(service_s)
1386 client = self.host.get_client(profile) 1448 client = self.host.get_client(profile)
1422 raise ValueError( 1484 raise ValueError(
1423 f"No ActivityPub link found for {account!r}" 1485 f"No ActivityPub link found for {account!r}"
1424 ) 1486 )
1425 return href 1487 return href
1426 1488
1427 async def get_ap_actor_data_from_account(self, account: str) -> dict: 1489 async def get_ap_actor_data_from_account(
1490 self,
1491 requestor_actor_id: str,
1492 account: str
1493 ) -> dict:
1428 """Retrieve ActivityPub Actor data 1494 """Retrieve ActivityPub Actor data
1429 1495
1430 @param account: ActivityPub Actor identifier 1496 @param account: ActivityPub Actor identifier
1431 """ 1497 """
1432 href = await self.get_ap_actor_id_from_account(account) 1498 href = await self.get_ap_actor_id_from_account(account)
1433 return await self.ap_get(href) 1499 return await self.ap_get(href, requestor_actor_id)
1434 1500
1435 async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str: 1501 async def get_ap_inbox_from_id(
1502 self,
1503 requestor_actor_id: str,
1504 actor_id: str,
1505 use_shared: bool = True
1506 ) -> str:
1436 """Retrieve inbox of an actor_id 1507 """Retrieve inbox of an actor_id
1437 1508
1509 @param requestor_actor_id: ID of the actor doing the request.
1510 @param actor_id: ID of the actor from whom Inbox must be retrieved.
1438 @param use_shared: if True, and a shared inbox exists, it will be used instead of 1511 @param use_shared: if True, and a shared inbox exists, it will be used instead of
1439 the user inbox 1512 the user inbox
1440 """ 1513 """
1441 data = await self.get_actor_data(actor_id) 1514 data = await self.get_actor_data(requestor_actor_id, actor_id)
1442 if use_shared: 1515 if use_shared:
1443 try: 1516 try:
1444 return data["endpoints"]["sharedInbox"] 1517 return data["endpoints"]["sharedInbox"]
1445 except KeyError: 1518 except KeyError:
1446 pass 1519 pass
1447 return data["inbox"] 1520 return data["inbox"]
1448 1521
1449 @async_lru(maxsize=LRU_MAX_SIZE) 1522 @async_lru(maxsize=LRU_MAX_SIZE)
1450 async def get_ap_account_from_id(self, actor_id: str) -> str: 1523 async def get_ap_account_from_id(self, requestor_actor_id: str, actor_id: str) -> str:
1451 """Retrieve AP account from the ID URL 1524 """Retrieve AP account from the ID URL
1452 1525
1453 Works with external or local actor IDs. 1526 Works with external or local actor IDs.
1527 @param requestor_actor_id: ID of the actor doing the request.
1454 @param actor_id: AP ID of the actor (URL to the actor data) 1528 @param actor_id: AP ID of the actor (URL to the actor data)
1455 @return: AP handle 1529 @return: AP handle
1456 """ 1530 """
1457 if self.is_local_url(actor_id): 1531 if self.is_local_url(actor_id):
1458 url_type, url_args = self.parse_apurl(actor_id) 1532 url_type, url_args = self.parse_apurl(actor_id)
1472 f"{account!r} is not a valid local account (from {actor_id})" 1546 f"{account!r} is not a valid local account (from {actor_id})"
1473 ) 1547 )
1474 return account 1548 return account
1475 1549
1476 url_parsed = parse.urlparse(actor_id) 1550 url_parsed = parse.urlparse(actor_id)
1477 actor_data = await self.get_actor_data(actor_id) 1551 actor_data = await self.get_actor_data(requestor_actor_id, actor_id)
1478 username = actor_data.get("preferredUsername") 1552 username = actor_data.get("preferredUsername")
1479 if not username: 1553 if not username:
1480 raise exceptions.DataError( 1554 raise exceptions.DataError(
1481 'No "preferredUsername" field found, can\'t retrieve actor account' 1555 'No "preferredUsername" field found, can\'t retrieve actor account'
1482 ) 1556 )
1494 raise exceptions.DataError(msg) 1568 raise exceptions.DataError(msg)
1495 return account 1569 return account
1496 1570
1497 async def get_ap_items( 1571 async def get_ap_items(
1498 self, 1572 self,
1573 actor_id: str,
1499 collection: dict, 1574 collection: dict,
1500 max_items: Optional[int] = None, 1575 max_items: Optional[int] = None,
1501 chronological_pagination: bool = True, 1576 chronological_pagination: bool = True,
1502 after_id: Optional[str] = None, 1577 after_id: Optional[str] = None,
1503 start_index: Optional[int] = None, 1578 start_index: Optional[int] = None,
1504 parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None, 1579 parser: Optional[Callable[[str, dict], Awaitable[domish.Element]]] = None,
1505 only_ids: bool = False, 1580 only_ids: bool = False,
1506 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: 1581 ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
1507 """Retrieve AP items and convert them to XMPP items 1582 """Retrieve AP items and convert them to XMPP items
1508 1583
1509 @param account: AP account handle to get items from 1584 @param actor_id: ID of the actor doing the request.
1585 @param collection: AP collection data.
1586 Items will be retrieved from this collection.
1510 @param max_items: maximum number of items to retrieve 1587 @param max_items: maximum number of items to retrieve
1511 retrieve all items by default 1588 retrieve all items by default
1512 @param chronological_pagination: get pages in chronological order 1589 @param chronological_pagination: get pages in chronological order
1513 AP use reversed chronological order for pagination, "first" page returns more 1590 AP use reversed chronological order for pagination, "first" page returns more
1514 recent items. If "chronological_pagination" is True, "last" AP page will be 1591 recent items. If "chronological_pagination" is True, "last" AP page will be
1567 previous_index = start_index - 1 1644 previous_index = start_index - 1
1568 retrieved_items = 0 1645 retrieved_items = 0
1569 current_page = collection["last"] 1646 current_page = collection["last"]
1570 while retrieved_items < count: 1647 while retrieved_items < count:
1571 page_data, items = await self.parse_ap_page( 1648 page_data, items = await self.parse_ap_page(
1572 current_page, parser, only_ids 1649 actor_id, current_page, parser, only_ids
1573 ) 1650 )
1574 if not items: 1651 if not items:
1575 log.warning(f"found an empty AP page at {current_page}") 1652 log.warning(f"found an empty AP page at {current_page}")
1576 return [], rsm_resp 1653 return [], rsm_resp
1577 page_start_idx = retrieved_items 1654 page_start_idx = retrieved_items
1602 page_items = [] 1679 page_items = []
1603 retrieved_items = 0 1680 retrieved_items = 0
1604 found_after_id = False 1681 found_after_id = False
1605 1682
1606 while retrieved_items < count: 1683 while retrieved_items < count:
1607 __, page_items = await self.parse_ap_page(page, parser, only_ids) 1684 __, page_items = await self.parse_ap_page(actor_id, page, parser, only_ids)
1608 if not page_items: 1685 if not page_items:
1609 break 1686 break
1610 retrieved_items += len(page_items) 1687 retrieved_items += len(page_items)
1611 if after_id is not None and not found_after_id: 1688 if after_id is not None and not found_after_id:
1612 # if we have an after_id, we ignore all items until the requested one is 1689 # if we have an after_id, we ignore all items until the requested one is
1658 "last": items[-1]["id"] 1735 "last": items[-1]["id"]
1659 }) 1736 })
1660 1737
1661 return items, rsm.RSMResponse(**rsm_resp) 1738 return items, rsm.RSMResponse(**rsm_resp)
1662 1739
1663 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: 1740 async def ap_item_2_mb_data_and_elt(self, requestor_actor_id: str, ap_item: dict) -> tuple[dict, domish.Element]:
1664 """Convert AP item to parsed microblog data and corresponding item element""" 1741 """Convert AP item to parsed microblog data and corresponding item element
1665 mb_data = await self.ap_item_2_mb_data(ap_item) 1742
1743 @param requestor_actor_id: ID of the actor requesting the conversion.
1744 @param ap_item: AP item to convert.
1745 @return: microblog and corresponding <item> element.
1746 """
1747 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, ap_item)
1666 item_elt = await self._m.mb_data_2_entry_elt( 1748 item_elt = await self._m.mb_data_2_entry_elt(
1667 self.client, mb_data, mb_data["id"], None, self._m.namespace 1749 self.client, mb_data, mb_data["id"], None, self._m.namespace
1668 ) 1750 )
1669 if "repeated" in mb_data["extra"]: 1751 if "repeated" in mb_data["extra"]:
1670 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] 1752 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
1671 else: 1753 else:
1672 item_elt["publisher"] = mb_data["author_jid"] 1754 item_elt["publisher"] = mb_data["author_jid"]
1673 return mb_data, item_elt 1755 return mb_data, item_elt
1674 1756
1675 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: 1757 async def ap_item_2_mb_elt(self, requestor_actor_id: str, ap_item: dict) -> domish.Element:
1676 """Convert AP item to XMPP item element""" 1758 """Convert AP item to XMPP item element
1677 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) 1759
1760 @param requestor_actor_id: ID of the actor requesting the conversion.
1761 @param ap_item: AP item to convert.
1762 @return: <item> element
1763 """
1764 __, item_elt = await self.ap_item_2_mb_data_and_elt(requestor_actor_id, ap_item)
1678 return item_elt 1765 return item_elt
1679 1766
1680 async def parse_ap_page( 1767 async def parse_ap_page(
1681 self, 1768 self,
1769 requestor_actor_id: str,
1682 page: Union[str, dict], 1770 page: Union[str, dict],
1683 parser: Callable[[dict], Awaitable[domish.Element]], 1771 parser: Callable[[str, dict], Awaitable[domish.Element]],
1684 only_ids: bool = False 1772 only_ids: bool = False
1685 ) -> Tuple[dict, List[domish.Element]]: 1773 ) -> Tuple[dict, List[domish.Element]]:
1686 """Convert AP objects from an AP page to XMPP items 1774 """Convert AP objects from an AP page to XMPP items
1687 1775
1776 @param requestor_actor_id: ID of the actor doing the request.
1688 @param page: Can be either url linking and AP page, or the page data directly 1777 @param page: Can be either url linking and AP page, or the page data directly
1689 @param parser: method to use to parse AP items and get XMPP item elements 1778 @param parser: method to use to parse AP items and get XMPP item elements
1690 @param only_ids: if True, only retrieve items IDs 1779 @param only_ids: if True, only retrieve items IDs
1691 @return: page data, pubsub items 1780 @return: page data, pubsub items
1692 """ 1781 """
1693 page_data = await self.ap_get_object(page) 1782 page_data = await self.ap_get_object(requestor_actor_id, page)
1694 if page_data is None: 1783 if page_data is None:
1695 log.warning('No data found in collection') 1784 log.warning('No data found in collection')
1696 return {}, [] 1785 return {}, []
1697 ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids) 1786 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "orderedItems", only_ids=only_ids)
1698 if ap_items is None: 1787 if ap_items is None:
1699 ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids) 1788 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "items", only_ids=only_ids)
1700 if not ap_items: 1789 if not ap_items:
1701 log.warning(f'No item field found in collection: {page_data!r}') 1790 log.warning(f'No item field found in collection: {page_data!r}')
1702 return page_data, [] 1791 return page_data, []
1703 else: 1792 else:
1704 log.warning( 1793 log.warning(
1707 items = [] 1796 items = []
1708 # AP Collections are in antichronological order, but we expect chronological in 1797 # AP Collections are in antichronological order, but we expect chronological in
1709 # Pubsub, thus we reverse it 1798 # Pubsub, thus we reverse it
1710 for ap_item in reversed(ap_items): 1799 for ap_item in reversed(ap_items):
1711 try: 1800 try:
1712 items.append(await parser(ap_item)) 1801 items.append(await parser(requestor_actor_id, ap_item))
1713 except (exceptions.DataError, NotImplementedError, error.StanzaError): 1802 except (exceptions.DataError, NotImplementedError, error.StanzaError):
1714 continue 1803 continue
1715 1804
1716 return page_data, items 1805 return page_data, items
1717 1806
1718 async def get_comments_nodes( 1807 async def get_comments_nodes(
1719 self, 1808 self,
1809 requestor_actor_id: str,
1720 item_id: str, 1810 item_id: str,
1721 parent_id: Optional[str] 1811 parent_id: Optional[str]
1722 ) -> Tuple[Optional[str], Optional[str]]: 1812 ) -> Tuple[Optional[str], Optional[str]]:
1723 """Get node where this item is and node to use for comments 1813 """Get node where this item is and node to use for comments
1724 1814
1725 if config option "comments_max_depth" is set, a common node will be used below the 1815 if config option "comments_max_depth" is set, a common node will be used below the
1726 given depth 1816 given depth
1817 @param requestor_actor_id: ID of the actor doing the request.
1727 @param item_id: ID of the reference item 1818 @param item_id: ID of the reference item
1728 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") 1819 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
1729 @return: a tuple with parent_node_id, comments_node_id: 1820 @return: a tuple with parent_node_id, comments_node_id:
1730 - parent_node_id is the ID of the node where reference item must be. None is 1821 - parent_node_id is the ID of the node where reference item must be. None is
1731 returned when the root node (i.e. not a comments node) must be used. 1822 returned when the root node (i.e. not a comments node) must be used.
1739 self._m.get_comments_node(item_id) 1830 self._m.get_comments_node(item_id)
1740 ) 1831 )
1741 parent_url = parent_id 1832 parent_url = parent_id
1742 parents = [] 1833 parents = []
1743 for __ in range(COMMENTS_MAX_PARENTS): 1834 for __ in range(COMMENTS_MAX_PARENTS):
1744 parent_item = await self.ap_get(parent_url) 1835 parent_item = await self.ap_get(parent_url, requestor_actor_id)
1745 parents.insert(0, parent_item) 1836 parents.insert(0, parent_item)
1746 parent_url = parent_item.get("inReplyTo") 1837 parent_url = parent_item.get("inReplyTo")
1747 if parent_url is None: 1838 if parent_url is None:
1748 break 1839 break
1749 parent_limit = self.comments_max_depth-1 1840 parent_limit = self.comments_max_depth-1
1757 return ( 1848 return (
1758 self._m.get_comments_node(last_level_item["id"]), 1849 self._m.get_comments_node(last_level_item["id"]),
1759 None 1850 None
1760 ) 1851 )
1761 1852
1762 async def ap_item_2_mb_data(self, ap_item: dict) -> dict: 1853 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict:
1763 """Convert AP activity or object to microblog data 1854 """Convert AP activity or object to microblog data
1764 1855
1856 @param requestor_actor_id: ID of the actor doing the request.
1765 @param ap_item: ActivityPub item to convert 1857 @param ap_item: ActivityPub item to convert
1766 Can be either an activity of an object 1858 Can be either an activity of an object
1767 @return: AP Item's Object and microblog data 1859 @return: AP Item's Object and microblog data
1768 @raise exceptions.DataError: something is invalid in the AP item 1860 @raise exceptions.DataError: something is invalid in the AP item
1769 @raise NotImplementedError: some AP data is not handled yet 1861 @raise NotImplementedError: some AP data is not handled yet
1770 @raise error.StanzaError: error while contacting the AP server 1862 @raise error.StanzaError: error while contacting the AP server
1771 """ 1863 """
1772 is_activity = self.is_activity(ap_item) 1864 is_activity = self.is_activity(ap_item)
1773 if is_activity: 1865 if is_activity:
1774 ap_object = await self.ap_get_object(ap_item, "object") 1866 ap_object = await self.ap_get_object(requestor_actor_id, ap_item, "object")
1775 if not ap_object: 1867 if not ap_object:
1776 log.warning(f'No "object" found in AP item {ap_item!r}') 1868 log.warning(f'No "object" found in AP item {ap_item!r}')
1777 raise exceptions.DataError 1869 raise exceptions.DataError
1778 else: 1870 else:
1779 ap_object = ap_item 1871 ap_object = ap_item
1829 attachment[key] = value 1921 attachment[key] = value
1830 attachments.append(attachment) 1922 attachments.append(attachment)
1831 1923
1832 # author 1924 # author
1833 if is_activity: 1925 if is_activity:
1834 authors = await self.ap_get_actors(ap_item, "actor") 1926 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor")
1835 else: 1927 else:
1836 authors = await self.ap_get_actors(ap_object, "attributedTo") 1928 authors = await self.ap_get_actors(requestor_actor_id, ap_object, "attributedTo")
1837 if len(authors) > 1: 1929 if len(authors) > 1:
1838 # we only keep first item as author 1930 # we only keep first item as author
1839 # TODO: handle multiple actors 1931 # TODO: handle multiple actors
1840 log.warning("multiple actors are not managed") 1932 log.warning("multiple actors are not managed")
1841 1933
1862 if "_repeated" in ap_item: 1954 if "_repeated" in ap_item:
1863 mb_data["extra"]["repeated"] = ap_item["_repeated"] 1955 mb_data["extra"]["repeated"] = ap_item["_repeated"]
1864 1956
1865 # comments 1957 # comments
1866 in_reply_to = ap_object.get("inReplyTo") 1958 in_reply_to = ap_object.get("inReplyTo")
1867 __, comments_node = await self.get_comments_nodes(item_id, in_reply_to) 1959 __, comments_node = await self.get_comments_nodes(
1960 requestor_actor_id, item_id, in_reply_to
1961 )
1868 if comments_node is not None: 1962 if comments_node is not None:
1869 comments_data = { 1963 comments_data = {
1870 "service": author_jid, 1964 "service": author_jid,
1871 "node": comments_node, 1965 "node": comments_node,
1872 "uri": uri.build_xmpp_uri( 1966 "uri": uri.build_xmpp_uri(
2129 target_ap_account = await self.get_ap_account_from_jid_and_node( 2223 target_ap_account = await self.get_ap_account_from_jid_and_node(
2130 service, node 2224 service, node
2131 ) 2225 )
2132 if self.is_virtual_jid(service): 2226 if self.is_virtual_jid(service):
2133 # service is a proxy JID for AP account 2227 # service is a proxy JID for AP account
2134 actor_data = await self.get_ap_actor_data_from_account(target_ap_account) 2228 actor_data = await self.get_ap_actor_data_from_account(
2229 url_actor,
2230 target_ap_account
2231 )
2135 followers = actor_data.get("followers") 2232 followers = actor_data.get("followers")
2136 else: 2233 else:
2137 # service is a real XMPP entity 2234 # service is a real XMPP entity
2138 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) 2235 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
2139 if followers: 2236 if followers:
2190 except KeyError: 2287 except KeyError:
2191 raise exceptions.DataError("Can't get ActivityPub actor inbox") 2288 raise exceptions.DataError("Can't get ActivityPub actor inbox")
2192 2289
2193 item_data = await self.mb_data_2_ap_item(client, mess_data) 2290 item_data = await self.mb_data_2_ap_item(client, mess_data)
2194 url_actor = item_data["actor"] 2291 url_actor = item_data["actor"]
2195 resp = await self.sign_and_post(inbox_url, url_actor, item_data) 2292 await self.ap_post(inbox_url, url_actor, item_data)
2196 2293
2197 async def ap_delete_item( 2294 async def ap_delete_item(
2198 self, 2295 self,
2199 jid_: jid.JID, 2296 jid_: jid.JID,
2200 node: Optional[str], 2297 node: Optional[str],
2264 """add the gateway workflow on post treatment""" 2361 """add the gateway workflow on post treatment"""
2265 if self.client is None: 2362 if self.client is None:
2266 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") 2363 log.debug(f"no client set, ignoring message: {message_elt.toXml()}")
2267 return True 2364 return True
2268 post_treat.addCallback( 2365 post_treat.addCallback(
2269 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) 2366 lambda mess_data: defer.ensureDeferred(self.on_message(client, mess_data))
2270 ) 2367 )
2271 return True 2368 return True
2272 2369
2273 async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: 2370 async def on_message(self, client: SatXMPPEntity, mess_data: dict) -> dict:
2274 """Called once message has been parsed 2371 """Called once message has been parsed
2275 2372
2276 this method handle the conversion to AP items and posting 2373 this method handle the conversion to AP items and posting
2277 """ 2374 """
2278 if client != self.client: 2375 if client != self.client:
2286 if not mess_data["to"].user: 2383 if not mess_data["to"].user:
2287 log.warning( 2384 log.warning(
2288 f"ignoring message addressed to gateway itself: {mess_data}" 2385 f"ignoring message addressed to gateway itself: {mess_data}"
2289 ) 2386 )
2290 return mess_data 2387 return mess_data
2388 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost())
2291 2389
2292 actor_account = self._e.unescape(mess_data["to"].user) 2390 actor_account = self._e.unescape(mess_data["to"].user)
2293 try: 2391 try:
2294 actor_id = await self.get_ap_actor_id_from_account(actor_account) 2392 actor_id = await self.get_ap_actor_id_from_account(actor_account)
2295 except Exception as e: 2393 except Exception as e:
2296 log.warning( 2394 log.warning(
2297 f"Can't retrieve data on actor {actor_account}: {e}" 2395 f"Can't retrieve data on actor {actor_account}: {e}"
2298 ) 2396 )
2299 # TODO: send an error <message> 2397 # TODO: send an error <message>
2300 return mess_data 2398 return mess_data
2301 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) 2399 inbox = await self.get_ap_inbox_from_id(
2400 requestor_actor_id, actor_id, use_shared=False
2401 )
2302 2402
2303 try: 2403 try:
2304 language, message = next(iter(mess_data["message"].items())) 2404 language, message = next(iter(mess_data["message"].items()))
2305 except (KeyError, StopIteration): 2405 except (KeyError, StopIteration):
2306 log.warning(f"ignoring empty message: {mess_data}") 2406 log.warning(f"ignoring empty message: {mess_data}")
2332 "href": actor_id, 2432 "href": actor_id,
2333 "name": f"@{actor_account}", 2433 "name": f"@{actor_account}",
2334 }) 2434 })
2335 2435
2336 try: 2436 try:
2337 await self.sign_and_post(inbox, ap_item["actor"], ap_item) 2437 await self.ap_post(inbox, ap_item["actor"], ap_item)
2338 except Exception as e: 2438 except Exception as e:
2339 # TODO: send an error <message> 2439 # TODO: send an error <message>
2340 log.warning( 2440 log.warning(
2341 f"Can't send message to {inbox}: {e}" 2441 f"Can't send message to {inbox}: {e}"
2342 ) 2442 )
2355 if not self.is_local(from_jid): 2455 if not self.is_local(from_jid):
2356 log.debug( 2456 log.debug(
2357 f"ignoring retract request from non local jid {from_jid}" 2457 f"ignoring retract request from non local jid {from_jid}"
2358 ) 2458 )
2359 return False 2459 return False
2460 requestor_actor_id = self.build_apurl(
2461 TYPE_ACTOR,
2462 from_jid.userhost()
2463 )
2360 to_jid = jid.JID(message_elt["to"]) 2464 to_jid = jid.JID(message_elt["to"])
2361 if (to_jid.host != self.client.jid.full() or not to_jid.user): 2465 if (to_jid.host != self.client.jid.full() or not to_jid.user):
2362 # to_jid should be a virtual JID from this gateway 2466 # to_jid should be a virtual JID from this gateway
2363 raise exceptions.InternalError( 2467 raise exceptions.InternalError(
2364 f"Invalid destinee's JID: {to_jid.full()}" 2468 f"Invalid destinee's JID: {to_jid.full()}"
2365 ) 2469 )
2366 ap_account = self._e.unescape(to_jid.user) 2470 ap_account = self._e.unescape(to_jid.user)
2367 actor_id = await self.get_ap_actor_id_from_account(ap_account) 2471 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2368 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) 2472 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False)
2369 url_actor, ap_item = await self.ap_delete_item( 2473 url_actor, ap_item = await self.ap_delete_item(
2370 from_jid.userhostJID(), None, retract_elt["id"], public=False 2474 from_jid.userhostJID(), None, retract_elt["id"], public=False
2371 ) 2475 )
2372 resp = await self.sign_and_post(inbox, url_actor, ap_item) 2476 resp = await self.ap_post(inbox, url_actor, ap_item)
2373 return False 2477 return False
2374 2478
2375 async def _on_reference_received( 2479 async def _on_reference_received(
2376 self, 2480 self,
2377 client: SatXMPPEntity, 2481 client: SatXMPPEntity,
2453 "type": TYPE_MENTION, 2557 "type": TYPE_MENTION,
2454 "href": actor_id, 2558 "href": actor_id,
2455 "name": ap_account, 2559 "name": ap_account,
2456 }) 2560 })
2457 2561
2458 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) 2562 requestor_actor_id = ap_item["actor"]
2459 2563 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False)
2460 resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item) 2564
2565 await self.ap_post(inbox, requestor_actor_id, ap_item)
2461 2566
2462 return False 2567 return False
2463 2568
2464 async def new_reply_to_xmpp_item( 2569 async def new_reply_to_xmpp_item(
2465 self, 2570 self,
2510 # we don't have a comment node set for this item 2615 # we don't have a comment node set for this item
2511 from libervia.backend.tools.xml_tools import pp_elt 2616 from libervia.backend.tools.xml_tools import pp_elt
2512 log.info(f"{pp_elt(parent_item_elt.toXml())}") 2617 log.info(f"{pp_elt(parent_item_elt.toXml())}")
2513 raise NotImplementedError() 2618 raise NotImplementedError()
2514 else: 2619 else:
2515 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) 2620 requestor_actor_id = self.build_apurl(
2621 TYPE_ACTOR,
2622 await self.get_ap_account_from_jid_and_node(comment_service, comment_node)
2623 )
2624 __, item_elt = await self.ap_item_2_mb_data_and_elt(
2625 requestor_actor_id,
2626 ap_item
2627 )
2516 await self._p.publish(client, comment_service, comment_node, [item_elt]) 2628 await self._p.publish(client, comment_service, comment_node, [item_elt])
2517 await self.notify_mentions( 2629 await self.notify_mentions(
2518 targets, mentions, comment_service, comment_node, item_elt["id"] 2630 targets, mentions, comment_service, comment_node, item_elt["id"]
2519 ) 2631 )
2520 2632
2546 continue 2658 continue
2547 if not self.is_local_url(value): 2659 if not self.is_local_url(value):
2548 continue 2660 continue
2549 target_type = self.parse_apurl(value)[0] 2661 target_type = self.parse_apurl(value)[0]
2550 if target_type != TYPE_ACTOR: 2662 if target_type != TYPE_ACTOR:
2551 log.debug(f"ignoring non actor type as a target: {href}") 2663 log.debug(f"ignoring non actor type as a target: {value}")
2552 else: 2664 else:
2553 targets.setdefault(target_type, set()).add(value) 2665 targets.setdefault(target_type, set()).add(value)
2554 2666
2555 mentions = [] 2667 mentions = []
2556 tags = item.get("tag") 2668 tags = item.get("tag")
2598 else: 2710 else:
2599 await self.handle_pubsub_ap_item( 2711 await self.handle_pubsub_ap_item(
2600 client, targets, mentions, destinee, node, item, is_public 2712 client, targets, mentions, destinee, node, item, is_public
2601 ) 2713 )
2602 2714
2715 def get_requestor_actor_id_from_targets(
2716 self,
2717 targets: set[str]
2718 ) -> str:
2719 """Find local actor to use as requestor_actor_id from request targets.
2720
2721 A local actor must be used to sign HTTP requests, notably HTTP GET requests for AP
2722 instances checking signatures, such as Mastodon when set in "secure mode".
2723
2724 This method checks a set of targets and uses the first local one.
2725
2726 If none match, a generic local actor is used.
2727
2728 @param targets: set of actor IDs to which the current request is sent.
2729 @return: local actor ID to use as requestor_actor_id.
2730 """
2731 try:
2732 return next(t for t in targets if self.is_local_url(t))
2733 except StopIteration:
2734 log.warning(
2735 f"Can't find local target to use as requestor ID: {targets!r}"
2736 )
2737 return self.build_apurl(
2738 TYPE_ACTOR, f"libervia@{self.public_url}"
2739 )
2740
2603 async def handle_message_ap_item( 2741 async def handle_message_ap_item(
2604 self, 2742 self,
2605 client: SatXMPPEntity, 2743 client: SatXMPPEntity,
2606 targets: Dict[str, Set[str]], 2744 targets: dict[str, Set[str]],
2607 mentions: List[Dict[str, str]], 2745 mentions: list[Dict[str, str]],
2608 destinee: Optional[jid.JID], 2746 destinee: jid.JID|None,
2609 item: dict, 2747 item: dict,
2610 ) -> None: 2748 ) -> None:
2611 """Parse and deliver direct AP items translating to XMPP messages 2749 """Parse and deliver direct AP items translating to XMPP messages
2612 2750
2613 @param targets: actors where the item must be delivered 2751 @param targets: actors where the item must be delivered
2614 @param destinee: jid of the destinee, 2752 @param destinee: jid of the destinee,
2615 @param item: AP object payload 2753 @param item: AP object payload
2616 """ 2754 """
2755 targets_urls = {t for t_set in targets.values() for t in t_set}
2756 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls)
2617 targets_jids = { 2757 targets_jids = {
2618 await self.get_jid_from_id(t) 2758 await self.get_jid_from_id(requestor_actor_id, url)
2619 for t_set in targets.values() 2759 for url in targets_urls
2620 for t in t_set
2621 } 2760 }
2622 if destinee is not None: 2761 if destinee is not None:
2623 targets_jids.add(destinee) 2762 targets_jids.add(destinee)
2624 mb_data = await self.ap_item_2_mb_data(item) 2763 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item)
2625 extra = { 2764 extra = {
2626 "origin_id": mb_data["id"] 2765 "origin_id": mb_data["id"]
2627 } 2766 }
2628 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) 2767 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS)
2629 if attachments: 2768 if attachments:
2641 ) 2780 )
2642 await defer.DeferredList(defer_l) 2781 await defer.DeferredList(defer_l)
2643 2782
2644 async def notify_mentions( 2783 async def notify_mentions(
2645 self, 2784 self,
2646 targets: Dict[str, Set[str]], 2785 targets: dict[str, set[str]],
2647 mentions: List[Dict[str, str]], 2786 mentions: list[dict[str, str]],
2648 service: jid.JID, 2787 service: jid.JID,
2649 node: str, 2788 node: str,
2650 item_id: str, 2789 item_id: str,
2651 ) -> None: 2790 ) -> None:
2652 """Send mention notifications to recipients and mentioned entities 2791 """Send mention notifications to recipients and mentioned entities
2655 2794
2656 Mentions are also sent to recipients as they are primary audience (see 2795 Mentions are also sent to recipients as they are primary audience (see
2657 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). 2796 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes).
2658 2797
2659 """ 2798 """
2799 targets_urls = {t for t_set in targets.values() for t in t_set}
2800 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls)
2660 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) 2801 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id)
2661 seen = set() 2802 seen = set()
2662 # we start with explicit mentions because mentions' content will be used in the 2803 # we start with explicit mentions because mentions' content will be used in the
2663 # future to fill "begin" and "end" reference attributes (we can't do it at the 2804 # future to fill "begin" and "end" reference attributes (we can't do it at the
2664 # moment as there is no way to specify the XML element to use in the blog item). 2805 # moment as there is no way to specify the XML element to use in the blog item).
2665 for mention in mentions: 2806 for mention in mentions:
2666 mentioned_jid = await self.get_jid_from_id(mention["uri"]) 2807 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"])
2667 self._refs.send_reference( 2808 self._refs.send_reference(
2668 self.client, 2809 self.client,
2669 to_jid=mentioned_jid, 2810 to_jid=mentioned_jid,
2670 anchor=anchor 2811 anchor=anchor
2671 ) 2812 )
2672 seen.add(mentioned_jid) 2813 seen.add(mentioned_jid)
2673 2814
2674 remaining = { 2815 remaining = {
2675 await self.get_jid_from_id(t) 2816 await self.get_jid_from_id(requestor_actor_id, t)
2676 for t_set in targets.values() 2817 for t_set in targets.values()
2677 for t in t_set 2818 for t in t_set
2678 } - seen 2819 } - seen
2679 for target in remaining: 2820 for target in remaining:
2680 self._refs.send_reference( 2821 self._refs.send_reference(
2684 ) 2825 )
2685 2826
2686 async def handle_pubsub_ap_item( 2827 async def handle_pubsub_ap_item(
2687 self, 2828 self,
2688 client: SatXMPPEntity, 2829 client: SatXMPPEntity,
2689 targets: Dict[str, Set[str]], 2830 targets: dict[str, set[str]],
2690 mentions: List[Dict[str, str]], 2831 mentions: list[dict[str, str]],
2691 destinee: Optional[jid.JID], 2832 destinee: jid.JID|None,
2692 node: str, 2833 node: str,
2693 item: dict, 2834 item: dict,
2694 public: bool 2835 public: bool
2695 ) -> None: 2836 ) -> None:
2696 """Analyse, cache and deliver AP items translating to Pubsub 2837 """Analyse, cache and deliver AP items translating to Pubsub
2700 @param node: XMPP pubsub node 2841 @param node: XMPP pubsub node
2701 @param item: AP object payload 2842 @param item: AP object payload
2702 @param public: True if the item is public 2843 @param public: True if the item is public
2703 """ 2844 """
2704 # XXX: "public" is not used for now 2845 # XXX: "public" is not used for now
2846 targets_urls = {t for t_set in targets.values() for t in t_set}
2847 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls)
2705 service = client.jid 2848 service = client.jid
2706 in_reply_to = item.get("inReplyTo") 2849 in_reply_to = item.get("inReplyTo")
2707 2850
2708 if in_reply_to and isinstance(in_reply_to, list): 2851 if in_reply_to and isinstance(in_reply_to, list):
2709 in_reply_to = in_reply_to[0] 2852 in_reply_to = in_reply_to[0]
2713 await self.new_reply_to_xmpp_item(client, item, targets, mentions) 2856 await self.new_reply_to_xmpp_item(client, item, targets, mentions)
2714 return 2857 return
2715 2858
2716 # this item is a reply to an AP item, we use or create a corresponding node 2859 # this item is a reply to an AP item, we use or create a corresponding node
2717 # for comments 2860 # for comments
2718 parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to) 2861 parent_node, __ = await self.get_comments_nodes(
2862 requestor_actor_id,
2863 item["id"],
2864 in_reply_to
2865 )
2719 node = parent_node or node 2866 node = parent_node or node
2720 cached_node = await self.host.memory.storage.get_pubsub_node( 2867 cached_node = await self.host.memory.storage.get_pubsub_node(
2721 client, service, node, with_subscriptions=True, create=True, 2868 client, service, node, with_subscriptions=True, create=True,
2722 create_kwargs={"subscribed": True} 2869 create_kwargs={"subscribed": True}
2723 ) 2870 )
2733 f"due to a cache purge. We synchronise the node\n{item}" 2880 f"due to a cache purge. We synchronise the node\n{item}"
2734 2881
2735 ) 2882 )
2736 return 2883 return
2737 if item.get("type") == TYPE_EVENT: 2884 if item.get("type") == TYPE_EVENT:
2738 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) 2885 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(
2739 else: 2886 requestor_actor_id,
2740 data, item_elt = await self.ap_item_2_mb_data_and_elt(item) 2887 item
2888 )
2889 else:
2890 data, item_elt = await self.ap_item_2_mb_data_and_elt(
2891 requestor_actor_id,
2892 item
2893 )
2741 await self.host.memory.storage.cache_pubsub_items( 2894 await self.host.memory.storage.cache_pubsub_items(
2742 client, 2895 client,
2743 cached_node, 2896 cached_node,
2744 [item_elt], 2897 [item_elt],
2745 [data] 2898 [data]