Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4259:49019947cc76
component AP Gateway: implement HTTP GET signature.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 05 Jun 2024 22:34:09 +0200 |
parents | 6784d07b99c8 |
children | d366d90a71aa |
comparison
equal
deleted
inserted
replaced
4258:ba28ca268f4a | 4259:49019947cc76 |
---|---|
41 from cryptography.hazmat.primitives import serialization | 41 from cryptography.hazmat.primitives import serialization |
42 from cryptography.hazmat.primitives import hashes | 42 from cryptography.hazmat.primitives import hashes |
43 from cryptography.hazmat.primitives.asymmetric import rsa | 43 from cryptography.hazmat.primitives.asymmetric import rsa |
44 from cryptography.hazmat.primitives.asymmetric import padding | 44 from cryptography.hazmat.primitives.asymmetric import padding |
45 import dateutil | 45 import dateutil |
46 from dateutil.parser import parserinfo | |
47 import shortuuid | 46 import shortuuid |
48 from sqlalchemy.exc import IntegrityError | |
49 import treq | 47 import treq |
50 from treq.response import _Response as TReqResponse | 48 from treq.response import _Response as TReqResponse |
51 from twisted.internet import defer, reactor, threads | 49 from twisted.internet import defer, reactor, threads |
52 from twisted.web import http | 50 from twisted.web import http |
53 from twisted.words.protocols.jabber import error, jid | 51 from twisted.words.protocols.jabber import error, jid |
139 | 137 |
140 def __init__(self, host): | 138 def __init__(self, host): |
141 self.host = host | 139 self.host = host |
142 self.initialised = False | 140 self.initialised = False |
143 self.client = None | 141 self.client = None |
142 self.http_sign_get = True | |
144 self._p = host.plugins["XEP-0060"] | 143 self._p = host.plugins["XEP-0060"] |
145 self._a = host.plugins["XEP-0084"] | 144 self._a = host.plugins["XEP-0084"] |
146 self._e = host.plugins["XEP-0106"] | 145 self._e = host.plugins["XEP-0106"] |
147 self._m = host.plugins["XEP-0277"] | 146 self._m = host.plugins["XEP-0277"] |
148 self._v = host.plugins["XEP-0292"] | 147 self._v = host.plugins["XEP-0292"] |
259 if connection_type not in ('http', 'https'): | 258 if connection_type not in ('http', 'https'): |
260 raise exceptions.ConfigError( | 259 raise exceptions.ConfigError( |
261 'bad ap-gateay http_connection_type, you must use one of "http" or ' | 260 'bad ap-gateay http_connection_type, you must use one of "http" or ' |
262 '"https"' | 261 '"https"' |
263 ) | 262 ) |
263 self.http_sign_get = C.bool( | |
264 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE) | |
265 ) | |
264 self.max_items = int(self.host.memory.config_get( | 266 self.max_items = int(self.host.memory.config_get( |
265 CONF_SECTION, 'new_node_max_items', 50 | 267 CONF_SECTION, 'new_node_max_items', 50 |
266 | 268 |
267 )) | 269 )) |
268 self.comments_max_depth = int(self.host.memory.config_get( | 270 self.comments_max_depth = int(self.host.memory.config_get( |
360 await self.convert_and_post_items( | 362 await self.convert_and_post_items( |
361 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, | 363 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, |
362 itemsEvent.items | 364 itemsEvent.items |
363 ) | 365 ) |
364 | 366 |
365 async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity: | 367 async def get_virtual_client( |
368 self, | |
369 requestor_actor_id: str, | |
370 actor_id: str | |
371 ) -> SatXMPPEntity: | |
366 """Get client for this component with a specified jid | 372 """Get client for this component with a specified jid |
367 | 373 |
368 This is needed to perform operations with the virtual JID corresponding to the AP | 374 This is needed to perform operations with the virtual JID corresponding to the AP |
369 actor instead of the JID of the gateway itself. | 375 actor instead of the JID of the gateway itself. |
376 @param requestor_actor_id: originating actor ID (URL) | |
370 @param actor_id: ID of the actor | 377 @param actor_id: ID of the actor |
371 @return: virtual client | 378 @return: virtual client |
372 """ | 379 """ |
373 local_jid = await self.get_jid_from_id(actor_id) | 380 local_jid = await self.get_jid_from_id(requestor_actor_id, actor_id) |
374 return self.client.get_virtual_client(local_jid) | 381 return self.client.get_virtual_client(local_jid) |
375 | 382 |
376 def is_activity(self, data: dict) -> bool: | 383 def is_activity(self, data: dict) -> bool: |
377 """Return True if the data has an activity type""" | 384 """Return True if the data has an activity type""" |
378 try: | 385 try: |
379 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER | 386 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER |
380 except (KeyError, TypeError): | 387 except (KeyError, TypeError): |
381 return False | 388 return False |
382 | 389 |
383 async def ap_get(self, url: str) -> dict: | 390 async def ap_get(self, url: str, requestor_actor_id: str) -> dict: |
384 """Retrieve AP JSON from given URL | 391 """Retrieve AP JSON from given URL with HTTP Signature |
385 | 392 |
393 @param url: AP server endpoint | |
394 @param requestor_actor_id: originating actor ID (URL) | |
386 @raise error.StanzaError: "service-unavailable" is sent when something went wrong | 395 @raise error.StanzaError: "service-unavailable" is sent when something went wrong |
387 with AP server | 396 with AP server |
388 """ | 397 """ |
389 resp = await treq.get( | 398 if self.http_sign_get: |
390 url, | 399 headers = self._generate_signed_headers(url, requestor_actor_id, method="get") |
391 headers = { | 400 else: |
392 "Accept": [MEDIA_TYPE_AP], | 401 headers = {} |
393 } | 402 headers["Accept"] = MEDIA_TYPE_AP |
394 ) | 403 |
404 resp = await treq.get(url, headers=headers) | |
395 if resp.code >= 300: | 405 if resp.code >= 300: |
396 text = await resp.text() | 406 text = await resp.text() |
397 if resp.code == 404: | 407 if resp.code == 404: |
398 raise exceptions.NotFound(f"Can't find resource at {url}") | 408 raise exceptions.NotFound(f"Can't find resource at {url}") |
399 else: | 409 else: |
405 raise error.StanzaError( | 415 raise error.StanzaError( |
406 "service-unavailable", | 416 "service-unavailable", |
407 text=f"Can't get AP data at {url}: {e}" | 417 text=f"Can't get AP data at {url}: {e}" |
408 ) | 418 ) |
409 | 419 |
410 @overload | 420 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse: |
411 async def ap_get_object(self, data: dict, key: str) -> Optional[dict]: | 421 """Sign a document and post it to AP server |
412 ... | 422 |
423 @param url: AP server endpoint | |
424 @param requestor_actor_id: originating actor ID (URL) | |
425 @param doc: document to send | |
426 """ | |
427 if self.verbose: | |
428 __, actor_args = self.parse_apurl(requestor_actor_id) | |
429 actor_account = actor_args[0] | |
430 to_log = [ | |
431 "", | |
432 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" | |
433 ] | |
434 | |
435 body = json.dumps(doc).encode() | |
436 headers = self._generate_signed_headers(url, requestor_actor_id, method="post", body=body) | |
437 headers["Content-Type"] = MEDIA_TYPE_AP | |
438 | |
439 if self.verbose: | |
440 if self.verbose >= 3: | |
441 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items()) | |
442 to_log.append(f" headers:\n{h_to_log}") | |
443 to_log.append("---") | |
444 log.info("\n".join(to_log)) | |
445 | |
446 resp = await treq.post( | |
447 url, | |
448 body, | |
449 headers=headers | |
450 ) | |
451 if resp.code >= 300: | |
452 text = await resp.text() | |
453 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") | |
454 elif self.verbose: | |
455 log.info(f"==> response code: {resp.code}") | |
456 return resp | |
457 | |
458 def _generate_signed_headers( | |
459 self, | |
460 url: str, | |
461 actor_id: str, | |
462 method: str, | |
463 body: bytes|None = None | |
464 ) -> dict[str, str]: | |
465 """Generate HTTP headers with signature for a given request | |
466 | |
467 @param url: AP server endpoint | |
468 @param actor_id: originating actor ID (URL) | |
469 @param method: HTTP method (e.g., 'get', 'post') | |
470 @param body: request body if any | |
471 @return: signed headers | |
472 """ | |
473 p_url = parse.urlparse(url) | |
474 headers = { | |
475 "(request-target)": f"{method} {p_url.path}", | |
476 "Host": p_url.hostname, | |
477 "Date": http.datetimeToString().decode() | |
478 } | |
479 | |
480 if body: | |
481 digest_algo, digest_hash = self.get_digest(body) | |
482 headers["Digest"] = f"{digest_algo}={digest_hash}" | |
483 | |
484 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers) | |
485 return headers | |
413 | 486 |
414 @overload | 487 @overload |
415 async def ap_get_object( | 488 async def ap_get_object( |
416 self, data: Union[str, dict], key: None = None | 489 self, requestor_actor_id: str, data: dict, key: str |
490 ) -> dict|None: | |
491 ... | |
492 | |
493 @overload | |
494 async def ap_get_object( | |
495 self, requestor_actor_id: str, data: Union[str, dict], key: None = None | |
417 ) -> dict: | 496 ) -> dict: |
418 ... | 497 ... |
419 | 498 |
420 async def ap_get_object(self, data, key = None): | 499 async def ap_get_object(self, requestor_actor_id: str, data, key = None) -> dict|None: |
421 """Retrieve an AP object, dereferencing when necessary | 500 """Retrieve an AP object, dereferencing when necessary |
422 | 501 |
423 This method is to be used with attributes marked as "Functional" in | 502 This method is to be used with attributes marked as "Functional" in |
424 https://www.w3.org/TR/activitystreams-vocabulary | 503 https://www.w3.org/TR/activitystreams-vocabulary |
504 @param requestor_actor_id: ID of the actor doing the request. | |
425 @param data: AP object where an other object is looked for, or the object itself | 505 @param data: AP object where an other object is looked for, or the object itself |
426 @param key: name of the object to look for, or None if data is the object directly | 506 @param key: name of the object to look for, or None if data is the object directly |
427 @return: found object if any | 507 @return: found object if any |
428 """ | 508 """ |
429 if key is not None: | 509 if key is not None: |
438 return value | 518 return value |
439 elif isinstance(value, str): | 519 elif isinstance(value, str): |
440 if self.is_local_url(value): | 520 if self.is_local_url(value): |
441 return await self.ap_get_local_object(value) | 521 return await self.ap_get_local_object(value) |
442 else: | 522 else: |
443 return await self.ap_get(value) | 523 return await self.ap_get(value, requestor_actor_id) |
444 else: | 524 else: |
445 raise NotImplementedError( | 525 raise NotImplementedError( |
446 "was expecting a string or a dict, got {type(value)}: {value!r}}" | 526 "was expecting a string or a dict, got {type(value)}: {value!r}}" |
447 ) | 527 ) |
448 | 528 |
515 'only object from "item" URLs can be retrieved for now' | 595 'only object from "item" URLs can be retrieved for now' |
516 ) | 596 ) |
517 | 597 |
518 async def ap_get_list( | 598 async def ap_get_list( |
519 self, | 599 self, |
600 requestor_actor_id: str, | |
520 data: dict, | 601 data: dict, |
521 key: str, | 602 key: str, |
522 only_ids: bool = False | 603 only_ids: bool = False |
523 ) -> Optional[List[Dict[str, Any]]]: | 604 ) -> Optional[List[Dict[str, Any]]]: |
524 """Retrieve a list of objects from AP data, dereferencing when necessary | 605 """Retrieve a list of objects from AP data, dereferencing when necessary |
525 | 606 |
526 This method is to be used with non functional vocabularies. Use ``ap_get_object`` | 607 This method is to be used with non functional vocabularies. Use ``ap_get_object`` |
527 otherwise. | 608 otherwise. |
528 If the value is a dictionary, it will be wrapped in a list | 609 If the value is a dictionary, it will be wrapped in a list |
610 @param requestor_actor_id: ID of the actor doing the request. | |
529 @param data: AP object where a list of objects is looked for | 611 @param data: AP object where a list of objects is looked for |
530 @param key: key of the list to look for | 612 @param key: key of the list to look for |
531 @param only_ids: if Trye, only items IDs are retrieved | 613 @param only_ids: if Trye, only items IDs are retrieved |
532 @return: list of objects, or None if the key is not present | 614 @return: list of objects, or None if the key is not present |
533 """ | 615 """ |
536 return None | 618 return None |
537 elif isinstance(value, str): | 619 elif isinstance(value, str): |
538 if self.is_local_url(value): | 620 if self.is_local_url(value): |
539 value = await self.ap_get_local_object(value) | 621 value = await self.ap_get_local_object(value) |
540 else: | 622 else: |
541 value = await self.ap_get(value) | 623 value = await self.ap_get(value, requestor_actor_id) |
542 if isinstance(value, dict): | 624 if isinstance(value, dict): |
543 return [value] | 625 return [value] |
544 if not isinstance(value, list): | 626 if not isinstance(value, list): |
545 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") | 627 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") |
546 if only_ids: | 628 if only_ids: |
547 return [ | 629 return [ |
548 {"id": v["id"]} if isinstance(v, dict) else {"id": v} | 630 {"id": v["id"]} if isinstance(v, dict) else {"id": v} |
549 for v in value | 631 for v in value |
550 ] | 632 ] |
551 else: | 633 else: |
552 return [await self.ap_get_object(i) for i in value] | 634 return [await self.ap_get_object(requestor_actor_id, i) for i in value] |
553 | 635 |
554 async def ap_get_actors( | 636 async def ap_get_actors( |
555 self, | 637 self, |
638 requestor_actor_id: str, | |
556 data: dict, | 639 data: dict, |
557 key: str, | 640 key: str, |
558 as_account: bool = True | 641 as_account: bool = True |
559 ) -> List[str]: | 642 ) -> List[str]: |
560 """Retrieve AP actors from data | 643 """Retrieve AP actors from data |
561 | 644 |
645 @param requestor_actor_id: ID of the actor doing the request. | |
562 @param data: AP object containing a field with actors | 646 @param data: AP object containing a field with actors |
563 @param key: field to use to retrieve actors | 647 @param key: field to use to retrieve actors |
564 @param as_account: if True returns account handles, otherwise will return actor | 648 @param as_account: if True returns account handles, otherwise will return actor |
565 IDs | 649 IDs |
566 @raise exceptions.DataError: there is not actor data or it is invalid | 650 @raise exceptions.DataError: there is not actor data or it is invalid |
589 if not value: | 673 if not value: |
590 raise exceptions.DataError( | 674 raise exceptions.DataError( |
591 f"list of actors is empty" | 675 f"list of actors is empty" |
592 ) | 676 ) |
593 if as_account: | 677 if as_account: |
594 return [await self.get_ap_account_from_id(actor_id) for actor_id in value] | 678 return [ |
679 await self.get_ap_account_from_id(requestor_actor_id, actor_id) | |
680 for actor_id in value | |
681 ] | |
595 else: | 682 else: |
596 return value | 683 return value |
597 | 684 |
598 async def ap_get_sender_actor( | 685 async def ap_get_sender_actor( |
599 self, | 686 self, |
687 requestor_actor_id: str, | |
600 data: dict, | 688 data: dict, |
601 ) -> str: | 689 ) -> str: |
602 """Retrieve actor who sent data | 690 """Retrieve actor who sent data |
603 | 691 |
604 This is done by checking "actor" field first, then "attributedTo" field. | 692 This is done by checking "actor" field first, then "attributedTo" field. |
605 Only the first found actor is taken into account | 693 Only the first found actor is taken into account |
694 @param requestor_actor_id: ID of the actor doing the request. | |
606 @param data: AP object | 695 @param data: AP object |
607 @return: actor id of the sender | 696 @return: actor id of the sender |
608 @raise exceptions.NotFound: no actor has been found in data | 697 @raise exceptions.NotFound: no actor has been found in data |
609 """ | 698 """ |
610 try: | 699 try: |
611 actors = await self.ap_get_actors(data, "actor", as_account=False) | 700 actors = await self.ap_get_actors(requestor_actor_id, data, "actor", as_account=False) |
612 except exceptions.DataError: | 701 except exceptions.DataError: |
613 actors = None | 702 actors = None |
614 if not actors: | 703 if not actors: |
615 try: | 704 try: |
616 actors = await self.ap_get_actors(data, "attributedTo", as_account=False) | 705 actors = await self.ap_get_actors(requestor_actor_id, data, "attributedTo", as_account=False) |
617 except exceptions.DataError: | 706 except exceptions.DataError: |
618 raise exceptions.NotFound( | 707 raise exceptions.NotFound( |
619 'actor not specified in "actor" or "attributedTo"' | 708 'actor not specified in "actor" or "attributedTo"' |
620 ) | 709 ) |
621 try: | 710 try: |
706 return ( | 795 return ( |
707 ("pubsub", "service") in host_disco.identities | 796 ("pubsub", "service") in host_disco.identities |
708 and not ("pubsub", "pep") in host_disco.identities | 797 and not ("pubsub", "pep") in host_disco.identities |
709 ) | 798 ) |
710 | 799 |
711 async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: | 800 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str|None]: |
712 """Decode raw AP account handle to get XMPP JID and Pubsub Node | 801 """Decode raw AP account handle to get XMPP JID and Pubsub Node |
713 | 802 |
714 Username are case insensitive. | 803 Username are case insensitive. |
715 | 804 |
716 By default, the username correspond to local username (i.e. username from | 805 By default, the username correspond to local username (i.e. username from |
817 self.client.jid.host, | 906 self.client.jid.host, |
818 None | 907 None |
819 ) | 908 ) |
820 ) | 909 ) |
821 | 910 |
822 async def get_jid_from_id(self, actor_id: str) -> jid.JID: | 911 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID: |
823 """Compute JID linking to an AP Actor ID | 912 """Compute JID linking to an AP Actor ID |
824 | 913 |
825 The local jid is computer by escaping AP actor handle and using it as local part | 914 The local jid is computer by escaping AP actor handle and using it as local part |
826 of JID, where domain part is this gateway own JID | 915 of JID, where domain part is this gateway own JID |
827 If the actor_id comes from local server (checked with self.public_url), it means | 916 If the actor_id comes from local server (checked with self.public_url), it means |
828 that we have an XMPP entity, and the original JID is returned | 917 that we have an XMPP entity, and the original JID is returned |
918 | |
919 @param requestor_actor_id: ID of the actor doing the request. | |
920 @param actor_id: ID of the actor to generate JID from. | |
921 @return: generated JID. | |
829 """ | 922 """ |
830 if self.is_local_url(actor_id): | 923 if self.is_local_url(actor_id): |
831 request_type, extra_args = self.parse_apurl(actor_id) | 924 request_type, extra_args = self.parse_apurl(actor_id) |
832 if request_type != TYPE_ACTOR or len(extra_args) != 1: | 925 if request_type != TYPE_ACTOR or len(extra_args) != 1: |
833 raise ValueError(f"invalid actor id: {actor_id!r}") | 926 raise ValueError(f"invalid actor id: {actor_id!r}") |
834 actor_jid, __ = await self.get_jid_and_node(extra_args[0]) | 927 actor_jid, __ = await self.get_jid_and_node(extra_args[0]) |
835 return actor_jid | 928 return actor_jid |
836 | 929 |
837 account = await self.get_ap_account_from_id(actor_id) | 930 account = await self.get_ap_account_from_id(requestor_actor_id, actor_id) |
838 return self.get_local_jid_from_account(account) | 931 return self.get_local_jid_from_account(account) |
839 | 932 |
840 def parse_apurl(self, url: str) -> Tuple[str, List[str]]: | 933 def parse_apurl(self, url: str) -> tuple[str, list[str]]: |
841 """Parse an URL leading to an AP endpoint | 934 """Parse an URL leading to an AP endpoint |
842 | 935 |
843 @param url: URL to parse (schema is not mandatory) | 936 @param url: URL to parse (schema is not mandatory) |
844 @return: endpoint type and extra arguments | 937 @return: endpoint type and extra arguments |
845 """ | 938 """ |
893 if algo != "SHA-256": | 986 if algo != "SHA-256": |
894 raise NotImplementedError("only SHA-256 is implemented for now") | 987 raise NotImplementedError("only SHA-256 is implemented for now") |
895 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() | 988 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() |
896 | 989 |
897 @async_lru(maxsize=LRU_MAX_SIZE) | 990 @async_lru(maxsize=LRU_MAX_SIZE) |
898 async def get_actor_data(self, actor_id) -> dict: | 991 async def get_actor_data(self, requestor_actor_id: str, actor_id: str) -> dict: |
899 """Retrieve actor data with LRU cache""" | 992 """Retrieve actor data with LRU cache""" |
900 return await self.ap_get(actor_id) | 993 return await self.ap_get(actor_id, requestor_actor_id) |
901 | 994 |
902 @async_lru(maxsize=LRU_MAX_SIZE) | 995 @async_lru(maxsize=LRU_MAX_SIZE) |
903 async def get_actor_pub_key_data( | 996 async def get_actor_pub_key_data( |
904 self, | 997 self, |
998 requestor_actor_id: str, | |
905 actor_id: str | 999 actor_id: str |
906 ) -> Tuple[str, str, rsa.RSAPublicKey]: | 1000 ) -> Tuple[str, str, rsa.RSAPublicKey]: |
907 """Retrieve Public Key data from actor ID | 1001 """Retrieve Public Key data from actor ID |
908 | 1002 |
1003 @param requestor_actor_id: ID of the actor doing the request. | |
909 @param actor_id: actor ID (url) | 1004 @param actor_id: actor ID (url) |
910 @return: key_id, owner and public_key | 1005 @return: key_id, owner and public_key |
911 @raise KeyError: publicKey is missing from actor data | 1006 @raise KeyError: publicKey is missing from actor data |
912 """ | 1007 """ |
913 actor_data = await self.get_actor_data(actor_id) | 1008 actor_data = await self.get_actor_data(requestor_actor_id, actor_id) |
914 pub_key_data = actor_data["publicKey"] | 1009 pub_key_data = actor_data["publicKey"] |
915 key_id = pub_key_data["id"] | 1010 key_id = pub_key_data["id"] |
916 owner = pub_key_data["owner"] | 1011 owner = pub_key_data["owner"] |
917 pub_key_pem = pub_key_data["publicKeyPem"] | 1012 pub_key_pem = pub_key_data["publicKeyPem"] |
918 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) | 1013 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) |
967 """Get local key ID from actor ID""" | 1062 """Get local key ID from actor ID""" |
968 return f"{actor_id}#main-key" | 1063 return f"{actor_id}#main-key" |
969 | 1064 |
970 async def check_signature( | 1065 async def check_signature( |
971 self, | 1066 self, |
1067 requestor_actor_id: str, | |
972 signature: str, | 1068 signature: str, |
973 key_id: str, | 1069 key_id: str, |
974 headers: Dict[str, str] | 1070 headers: Dict[str, str] |
975 ) -> str: | 1071 ) -> str: |
976 """Verify that signature matches given headers | 1072 """Verify that signature matches given headers |
977 | 1073 |
978 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 | 1074 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 |
979 | 1075 |
1076 @param requestor_actor_id: ID of the actor doing the request. | |
980 @param signature: Base64 encoded signature | 1077 @param signature: Base64 encoded signature |
981 @param key_id: ID of the key used to sign the data | 1078 @param key_id: ID of the key used to sign the data |
982 @param headers: headers and their values, including pseudo-headers | 1079 @param headers: headers and their values, including pseudo-headers |
983 @return: id of the signing actor | 1080 @return: id of the signing actor |
984 | 1081 |
989 actor = key_id[5:] | 1086 actor = key_id[5:] |
990 actor_id = await self.get_ap_actor_id_from_account(actor) | 1087 actor_id = await self.get_ap_actor_id_from_account(actor) |
991 else: | 1088 else: |
992 actor_id = key_id.split("#", 1)[0] | 1089 actor_id = key_id.split("#", 1)[0] |
993 | 1090 |
994 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id) | 1091 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data( |
1092 requestor_actor_id, | |
1093 actor_id | |
1094 ) | |
995 if pub_key_id != key_id or pub_key_owner != actor_id: | 1095 if pub_key_id != key_id or pub_key_owner != actor_id: |
996 raise exceptions.EncryptionError("Public Key mismatch") | 1096 raise exceptions.EncryptionError("Public Key mismatch") |
997 | 1097 |
998 try: | 1098 try: |
999 pub_key.verify( | 1099 pub_key.verify( |
1064 @param node: (virtual) node corresponding where the item has been published | 1164 @param node: (virtual) node corresponding where the item has been published |
1065 @param subscribe_extra_nodes: if True, extra data nodes will be automatically | 1165 @param subscribe_extra_nodes: if True, extra data nodes will be automatically |
1066 subscribed, that is comment nodes if present and attachments nodes. | 1166 subscribed, that is comment nodes if present and attachments nodes. |
1067 """ | 1167 """ |
1068 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 1168 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
1069 inbox = await self.get_ap_inbox_from_id(actor_id) | 1169 requestor_actor_id = self.build_apurl( |
1170 TYPE_ACTOR, | |
1171 await self.get_ap_account_from_jid_and_node(service, node) | |
1172 ) | |
1173 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) | |
1070 for item in items: | 1174 for item in items: |
1071 if item.name == "item": | 1175 if item.name == "item": |
1072 cached_item = await self.host.memory.storage.search_pubsub_items({ | 1176 cached_item = await self.host.memory.storage.search_pubsub_items({ |
1073 "profiles": [self.client.profile], | 1177 "profiles": [self.client.profile], |
1074 "services": [service], | 1178 "services": [service], |
1144 url_actor, ap_item = await self.ap_delete_item( | 1248 url_actor, ap_item = await self.ap_delete_item( |
1145 client.jid, node, item["id"] | 1249 client.jid, node, item["id"] |
1146 ) | 1250 ) |
1147 else: | 1251 else: |
1148 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") | 1252 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") |
1149 await self.sign_and_post(inbox, url_actor, ap_item) | 1253 await self.ap_post(inbox, url_actor, ap_item) |
1150 | 1254 |
1151 async def convert_and_post_attachments( | 1255 async def convert_and_post_attachments( |
1152 self, | 1256 self, |
1153 client: SatXMPPEntity, | 1257 client: SatXMPPEntity, |
1154 ap_account: str, | 1258 ap_account: str, |
1177 "we should get exactly one attachment item for an entity, got " | 1281 "we should get exactly one attachment item for an entity, got " |
1178 f"{len(items)})" | 1282 f"{len(items)})" |
1179 ) | 1283 ) |
1180 | 1284 |
1181 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 1285 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
1182 inbox = await self.get_ap_inbox_from_id(actor_id) | 1286 requestor_actor_id = self.build_apurl( |
1287 TYPE_ACTOR, | |
1288 await self.get_ap_account_from_jid_and_node(service, node) | |
1289 ) | |
1290 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) | |
1183 | 1291 |
1184 item_elt = items[0] | 1292 item_elt = items[0] |
1185 item_id = item_elt["id"] | 1293 item_id = item_elt["id"] |
1186 | 1294 |
1187 if publisher is None: | 1295 if publisher is None: |
1201 | 1309 |
1202 if publisher is not None: | 1310 if publisher is not None: |
1203 item_elt["publisher"] = publisher.userhost() | 1311 item_elt["publisher"] = publisher.userhost() |
1204 | 1312 |
1205 item_service, item_node, item_id = self._pa.attachment_node_2_item(node) | 1313 item_service, item_node, item_id = self._pa.attachment_node_2_item(node) |
1206 item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node) | 1314 item_account = await self.get_ap_account_from_jid_and_node( |
1315 item_service, item_node | |
1316 ) | |
1207 if self.is_virtual_jid(item_service): | 1317 if self.is_virtual_jid(item_service): |
1208 # it's a virtual JID mapping to an external AP actor, we can use the | 1318 # it's a virtual JID mapping to an external AP actor, we can use the |
1209 # item_id directly | 1319 # item_id directly |
1210 item_url = item_id | 1320 item_url = item_id |
1211 if not item_url.startswith("https:"): | 1321 if not item_url.startswith("https:"): |
1225 }) | 1335 }) |
1226 if not old_attachment_pubsub_items: | 1336 if not old_attachment_pubsub_items: |
1227 old_attachment = {} | 1337 old_attachment = {} |
1228 else: | 1338 else: |
1229 old_attachment_items = [i.data for i in old_attachment_pubsub_items] | 1339 old_attachment_items = [i.data for i in old_attachment_pubsub_items] |
1230 old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items) | 1340 old_attachments = self._pa.items_2_attachment_data( |
1341 client, old_attachment_items | |
1342 ) | |
1231 try: | 1343 try: |
1232 old_attachment = old_attachments[0] | 1344 old_attachment = old_attachments[0] |
1233 except IndexError: | 1345 except IndexError: |
1234 # no known element was present in attachments | 1346 # no known element was present in attachments |
1235 old_attachment = {} | 1347 old_attachment = {} |
1252 activity = self.create_activity( | 1364 activity = self.create_activity( |
1253 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id | 1365 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id |
1254 ) | 1366 ) |
1255 activity["to"] = [ap_account] | 1367 activity["to"] = [ap_account] |
1256 activity["cc"] = [NS_AP_PUBLIC] | 1368 activity["cc"] = [NS_AP_PUBLIC] |
1257 await self.sign_and_post(inbox, publisher_actor_id, activity) | 1369 await self.ap_post(inbox, publisher_actor_id, activity) |
1258 else: | 1370 else: |
1259 if "noticed" in old_attachment: | 1371 if "noticed" in old_attachment: |
1260 # "noticed" attachment has been removed, we undo the "Like" activity | 1372 # "noticed" attachment has been removed, we undo the "Like" activity |
1261 activity_id = self.build_apurl("like", item_account, item_id) | 1373 activity_id = self.build_apurl("like", item_account, item_id) |
1262 activity = self.create_activity( | 1374 activity = self.create_activity( |
1263 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id | 1375 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id |
1264 ) | 1376 ) |
1265 activity["to"] = [ap_account] | 1377 activity["to"] = [ap_account] |
1266 activity["cc"] = [NS_AP_PUBLIC] | 1378 activity["cc"] = [NS_AP_PUBLIC] |
1267 undo = self.create_activity("Undo", publisher_actor_id, activity) | 1379 undo = self.create_activity("Undo", publisher_actor_id, activity) |
1268 await self.sign_and_post(inbox, publisher_actor_id, undo) | 1380 await self.ap_post(inbox, publisher_actor_id, undo) |
1269 | 1381 |
1270 # reactions | 1382 # reactions |
1271 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) | 1383 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) |
1272 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) | 1384 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) |
1273 reactions_remove = old_reactions - new_reactions | 1385 reactions_remove = old_reactions - new_reactions |
1288 activy = self.create_activity( | 1400 activy = self.create_activity( |
1289 "Undo", publisher_actor_id, reaction_activity | 1401 "Undo", publisher_actor_id, reaction_activity |
1290 ) | 1402 ) |
1291 else: | 1403 else: |
1292 activy = reaction_activity | 1404 activy = reaction_activity |
1293 await self.sign_and_post(inbox, publisher_actor_id, activy) | 1405 await self.ap_post(inbox, publisher_actor_id, activy) |
1294 | 1406 |
1295 # RSVP | 1407 # RSVP |
1296 if "rsvp" in attachments: | 1408 if "rsvp" in attachments: |
1297 attending = attachments["rsvp"].get("attending", "no") | 1409 attending = attachments["rsvp"].get("attending", "no") |
1298 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | 1410 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") |
1302 activity = self.create_activity( | 1414 activity = self.create_activity( |
1303 activity_type, publisher_actor_id, item_url, activity_id=activity_id | 1415 activity_type, publisher_actor_id, item_url, activity_id=activity_id |
1304 ) | 1416 ) |
1305 activity["to"] = [ap_account] | 1417 activity["to"] = [ap_account] |
1306 activity["cc"] = [NS_AP_PUBLIC] | 1418 activity["cc"] = [NS_AP_PUBLIC] |
1307 await self.sign_and_post(inbox, publisher_actor_id, activity) | 1419 await self.ap_post(inbox, publisher_actor_id, activity) |
1308 else: | 1420 else: |
1309 if "rsvp" in old_attachment: | 1421 if "rsvp" in old_attachment: |
1310 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | 1422 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") |
1311 if old_attending == "yes": | 1423 if old_attending == "yes": |
1312 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) | 1424 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) |
1313 activity = self.create_activity( | 1425 activity = self.create_activity( |
1314 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id | 1426 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id |
1315 ) | 1427 ) |
1316 activity["to"] = [ap_account] | 1428 activity["to"] = [ap_account] |
1317 activity["cc"] = [NS_AP_PUBLIC] | 1429 activity["cc"] = [NS_AP_PUBLIC] |
1318 await self.sign_and_post(inbox, publisher_actor_id, activity) | 1430 await self.ap_post(inbox, publisher_actor_id, activity) |
1319 | 1431 |
1320 if service.user and self.is_virtual_jid(service): | 1432 if service.user and self.is_virtual_jid(service): |
1321 # the item is on a virtual service, we need to store it in cache | 1433 # the item is on a virtual service, we need to store it in cache |
1322 log.debug("storing attachments item in cache") | 1434 log.debug("storing attachments item in cache") |
1323 cached_node = await self.host.memory.storage.get_pubsub_node( | 1435 cached_node = await self.host.memory.storage.get_pubsub_node( |
1327 self.client, | 1439 self.client, |
1328 cached_node, | 1440 cached_node, |
1329 [item_elt], | 1441 [item_elt], |
1330 [attachments] | 1442 [attachments] |
1331 ) | 1443 ) |
1332 | |
1333 async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse: | |
1334 """Sign a documentent and post it to AP server | |
1335 | |
1336 @param url: AP server endpoint | |
1337 @param actor_id: originating actor ID (URL) | |
1338 @param doc: document to send | |
1339 """ | |
1340 if self.verbose: | |
1341 __, actor_args = self.parse_apurl(actor_id) | |
1342 actor_account = actor_args[0] | |
1343 to_log = [ | |
1344 "", | |
1345 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" | |
1346 ] | |
1347 | |
1348 p_url = parse.urlparse(url) | |
1349 body = json.dumps(doc).encode() | |
1350 digest_algo, digest_hash = self.get_digest(body) | |
1351 digest = f"{digest_algo}={digest_hash}" | |
1352 | |
1353 headers = { | |
1354 "(request-target)": f"post {p_url.path}", | |
1355 "Host": p_url.hostname, | |
1356 "Date": http.datetimeToString().decode(), | |
1357 "Digest": digest | |
1358 } | |
1359 headers["Content-Type"] = ( | |
1360 MEDIA_TYPE_AP | |
1361 ) | |
1362 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers) | |
1363 | |
1364 if self.verbose: | |
1365 if self.verbose>=3: | |
1366 h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items()) | |
1367 to_log.append(f" headers:\n{h_to_log}") | |
1368 to_log.append("---") | |
1369 log.info("\n".join(to_log)) | |
1370 | |
1371 resp = await treq.post( | |
1372 url, | |
1373 body, | |
1374 headers=headers, | |
1375 ) | |
1376 if resp.code >= 300: | |
1377 text = await resp.text() | |
1378 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") | |
1379 elif self.verbose: | |
1380 log.info(f"==> response code: {resp.code}") | |
1381 return resp | |
1382 | 1444 |
1383 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): | 1445 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): |
1384 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | 1446 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore |
1385 service = jid.JID(service_s) | 1447 service = jid.JID(service_s) |
1386 client = self.host.get_client(profile) | 1448 client = self.host.get_client(profile) |
1422 raise ValueError( | 1484 raise ValueError( |
1423 f"No ActivityPub link found for {account!r}" | 1485 f"No ActivityPub link found for {account!r}" |
1424 ) | 1486 ) |
1425 return href | 1487 return href |
1426 | 1488 |
1427 async def get_ap_actor_data_from_account(self, account: str) -> dict: | 1489 async def get_ap_actor_data_from_account( |
1490 self, | |
1491 requestor_actor_id: str, | |
1492 account: str | |
1493 ) -> dict: | |
1428 """Retrieve ActivityPub Actor data | 1494 """Retrieve ActivityPub Actor data |
1429 | 1495 |
1430 @param account: ActivityPub Actor identifier | 1496 @param account: ActivityPub Actor identifier |
1431 """ | 1497 """ |
1432 href = await self.get_ap_actor_id_from_account(account) | 1498 href = await self.get_ap_actor_id_from_account(account) |
1433 return await self.ap_get(href) | 1499 return await self.ap_get(href, requestor_actor_id) |
1434 | 1500 |
1435 async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str: | 1501 async def get_ap_inbox_from_id( |
1502 self, | |
1503 requestor_actor_id: str, | |
1504 actor_id: str, | |
1505 use_shared: bool = True | |
1506 ) -> str: | |
1436 """Retrieve inbox of an actor_id | 1507 """Retrieve inbox of an actor_id |
1437 | 1508 |
1509 @param requestor_actor_id: ID of the actor doing the request. | |
1510 @param actor_id: ID of the actor from whom Inbox must be retrieved. | |
1438 @param use_shared: if True, and a shared inbox exists, it will be used instead of | 1511 @param use_shared: if True, and a shared inbox exists, it will be used instead of |
1439 the user inbox | 1512 the user inbox |
1440 """ | 1513 """ |
1441 data = await self.get_actor_data(actor_id) | 1514 data = await self.get_actor_data(requestor_actor_id, actor_id) |
1442 if use_shared: | 1515 if use_shared: |
1443 try: | 1516 try: |
1444 return data["endpoints"]["sharedInbox"] | 1517 return data["endpoints"]["sharedInbox"] |
1445 except KeyError: | 1518 except KeyError: |
1446 pass | 1519 pass |
1447 return data["inbox"] | 1520 return data["inbox"] |
1448 | 1521 |
1449 @async_lru(maxsize=LRU_MAX_SIZE) | 1522 @async_lru(maxsize=LRU_MAX_SIZE) |
1450 async def get_ap_account_from_id(self, actor_id: str) -> str: | 1523 async def get_ap_account_from_id(self, requestor_actor_id: str, actor_id: str) -> str: |
1451 """Retrieve AP account from the ID URL | 1524 """Retrieve AP account from the ID URL |
1452 | 1525 |
1453 Works with external or local actor IDs. | 1526 Works with external or local actor IDs. |
1527 @param requestor_actor_id: ID of the actor doing the request. | |
1454 @param actor_id: AP ID of the actor (URL to the actor data) | 1528 @param actor_id: AP ID of the actor (URL to the actor data) |
1455 @return: AP handle | 1529 @return: AP handle |
1456 """ | 1530 """ |
1457 if self.is_local_url(actor_id): | 1531 if self.is_local_url(actor_id): |
1458 url_type, url_args = self.parse_apurl(actor_id) | 1532 url_type, url_args = self.parse_apurl(actor_id) |
1472 f"{account!r} is not a valid local account (from {actor_id})" | 1546 f"{account!r} is not a valid local account (from {actor_id})" |
1473 ) | 1547 ) |
1474 return account | 1548 return account |
1475 | 1549 |
1476 url_parsed = parse.urlparse(actor_id) | 1550 url_parsed = parse.urlparse(actor_id) |
1477 actor_data = await self.get_actor_data(actor_id) | 1551 actor_data = await self.get_actor_data(requestor_actor_id, actor_id) |
1478 username = actor_data.get("preferredUsername") | 1552 username = actor_data.get("preferredUsername") |
1479 if not username: | 1553 if not username: |
1480 raise exceptions.DataError( | 1554 raise exceptions.DataError( |
1481 'No "preferredUsername" field found, can\'t retrieve actor account' | 1555 'No "preferredUsername" field found, can\'t retrieve actor account' |
1482 ) | 1556 ) |
1494 raise exceptions.DataError(msg) | 1568 raise exceptions.DataError(msg) |
1495 return account | 1569 return account |
1496 | 1570 |
1497 async def get_ap_items( | 1571 async def get_ap_items( |
1498 self, | 1572 self, |
1573 actor_id: str, | |
1499 collection: dict, | 1574 collection: dict, |
1500 max_items: Optional[int] = None, | 1575 max_items: Optional[int] = None, |
1501 chronological_pagination: bool = True, | 1576 chronological_pagination: bool = True, |
1502 after_id: Optional[str] = None, | 1577 after_id: Optional[str] = None, |
1503 start_index: Optional[int] = None, | 1578 start_index: Optional[int] = None, |
1504 parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None, | 1579 parser: Optional[Callable[[str, dict], Awaitable[domish.Element]]] = None, |
1505 only_ids: bool = False, | 1580 only_ids: bool = False, |
1506 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: | 1581 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: |
1507 """Retrieve AP items and convert them to XMPP items | 1582 """Retrieve AP items and convert them to XMPP items |
1508 | 1583 |
1509 @param account: AP account handle to get items from | 1584 @param actor_id: ID of the actor doing the request. |
1585 @param collection: AP collection data. | |
1586 Items will be retrieved from this collection. | |
1510 @param max_items: maximum number of items to retrieve | 1587 @param max_items: maximum number of items to retrieve |
1511 retrieve all items by default | 1588 retrieve all items by default |
1512 @param chronological_pagination: get pages in chronological order | 1589 @param chronological_pagination: get pages in chronological order |
1513 AP use reversed chronological order for pagination, "first" page returns more | 1590 AP use reversed chronological order for pagination, "first" page returns more |
1514 recent items. If "chronological_pagination" is True, "last" AP page will be | 1591 recent items. If "chronological_pagination" is True, "last" AP page will be |
1567 previous_index = start_index - 1 | 1644 previous_index = start_index - 1 |
1568 retrieved_items = 0 | 1645 retrieved_items = 0 |
1569 current_page = collection["last"] | 1646 current_page = collection["last"] |
1570 while retrieved_items < count: | 1647 while retrieved_items < count: |
1571 page_data, items = await self.parse_ap_page( | 1648 page_data, items = await self.parse_ap_page( |
1572 current_page, parser, only_ids | 1649 actor_id, current_page, parser, only_ids |
1573 ) | 1650 ) |
1574 if not items: | 1651 if not items: |
1575 log.warning(f"found an empty AP page at {current_page}") | 1652 log.warning(f"found an empty AP page at {current_page}") |
1576 return [], rsm_resp | 1653 return [], rsm_resp |
1577 page_start_idx = retrieved_items | 1654 page_start_idx = retrieved_items |
1602 page_items = [] | 1679 page_items = [] |
1603 retrieved_items = 0 | 1680 retrieved_items = 0 |
1604 found_after_id = False | 1681 found_after_id = False |
1605 | 1682 |
1606 while retrieved_items < count: | 1683 while retrieved_items < count: |
1607 __, page_items = await self.parse_ap_page(page, parser, only_ids) | 1684 __, page_items = await self.parse_ap_page(actor_id, page, parser, only_ids) |
1608 if not page_items: | 1685 if not page_items: |
1609 break | 1686 break |
1610 retrieved_items += len(page_items) | 1687 retrieved_items += len(page_items) |
1611 if after_id is not None and not found_after_id: | 1688 if after_id is not None and not found_after_id: |
1612 # if we have an after_id, we ignore all items until the requested one is | 1689 # if we have an after_id, we ignore all items until the requested one is |
1658 "last": items[-1]["id"] | 1735 "last": items[-1]["id"] |
1659 }) | 1736 }) |
1660 | 1737 |
1661 return items, rsm.RSMResponse(**rsm_resp) | 1738 return items, rsm.RSMResponse(**rsm_resp) |
1662 | 1739 |
1663 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: | 1740 async def ap_item_2_mb_data_and_elt(self, requestor_actor_id: str, ap_item: dict) -> tuple[dict, domish.Element]: |
1664 """Convert AP item to parsed microblog data and corresponding item element""" | 1741 """Convert AP item to parsed microblog data and corresponding item element |
1665 mb_data = await self.ap_item_2_mb_data(ap_item) | 1742 |
1743 @param requestor_actor_id: ID of the actor requesting the conversion. | |
1744 @param ap_item: AP item to convert. | |
1745 @return: microblog and correspondign <item> element. | |
1746 """ | |
1747 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, ap_item) | |
1666 item_elt = await self._m.mb_data_2_entry_elt( | 1748 item_elt = await self._m.mb_data_2_entry_elt( |
1667 self.client, mb_data, mb_data["id"], None, self._m.namespace | 1749 self.client, mb_data, mb_data["id"], None, self._m.namespace |
1668 ) | 1750 ) |
1669 if "repeated" in mb_data["extra"]: | 1751 if "repeated" in mb_data["extra"]: |
1670 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] | 1752 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] |
1671 else: | 1753 else: |
1672 item_elt["publisher"] = mb_data["author_jid"] | 1754 item_elt["publisher"] = mb_data["author_jid"] |
1673 return mb_data, item_elt | 1755 return mb_data, item_elt |
1674 | 1756 |
1675 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: | 1757 async def ap_item_2_mb_elt(self, requestor_actor_id: str, ap_item: dict) -> domish.Element: |
1676 """Convert AP item to XMPP item element""" | 1758 """Convert AP item to XMPP item element |
1677 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) | 1759 |
1760 @param requestor_actor_id: ID of the actor requesting the conversion. | |
1761 @param ap_item: AP item to convert. | |
1762 @return: <item> element | |
1763 """ | |
1764 __, item_elt = await self.ap_item_2_mb_data_and_elt(requestor_actor_id, ap_item) | |
1678 return item_elt | 1765 return item_elt |
1679 | 1766 |
1680 async def parse_ap_page( | 1767 async def parse_ap_page( |
1681 self, | 1768 self, |
1769 requestor_actor_id: str, | |
1682 page: Union[str, dict], | 1770 page: Union[str, dict], |
1683 parser: Callable[[dict], Awaitable[domish.Element]], | 1771 parser: Callable[[str, dict], Awaitable[domish.Element]], |
1684 only_ids: bool = False | 1772 only_ids: bool = False |
1685 ) -> Tuple[dict, List[domish.Element]]: | 1773 ) -> Tuple[dict, List[domish.Element]]: |
1686 """Convert AP objects from an AP page to XMPP items | 1774 """Convert AP objects from an AP page to XMPP items |
1687 | 1775 |
1776 @param requestor_actor_id: ID of the actor doing the request. | |
1688 @param page: Can be either url linking and AP page, or the page data directly | 1777 @param page: Can be either url linking and AP page, or the page data directly |
1689 @param parser: method to use to parse AP items and get XMPP item elements | 1778 @param parser: method to use to parse AP items and get XMPP item elements |
1690 @param only_ids: if True, only retrieve items IDs | 1779 @param only_ids: if True, only retrieve items IDs |
1691 @return: page data, pubsub items | 1780 @return: page data, pubsub items |
1692 """ | 1781 """ |
1693 page_data = await self.ap_get_object(page) | 1782 page_data = await self.ap_get_object(requestor_actor_id, page) |
1694 if page_data is None: | 1783 if page_data is None: |
1695 log.warning('No data found in collection') | 1784 log.warning('No data found in collection') |
1696 return {}, [] | 1785 return {}, [] |
1697 ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids) | 1786 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "orderedItems", only_ids=only_ids) |
1698 if ap_items is None: | 1787 if ap_items is None: |
1699 ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids) | 1788 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "items", only_ids=only_ids) |
1700 if not ap_items: | 1789 if not ap_items: |
1701 log.warning(f'No item field found in collection: {page_data!r}') | 1790 log.warning(f'No item field found in collection: {page_data!r}') |
1702 return page_data, [] | 1791 return page_data, [] |
1703 else: | 1792 else: |
1704 log.warning( | 1793 log.warning( |
1707 items = [] | 1796 items = [] |
1708 # AP Collections are in antichronological order, but we expect chronological in | 1797 # AP Collections are in antichronological order, but we expect chronological in |
1709 # Pubsub, thus we reverse it | 1798 # Pubsub, thus we reverse it |
1710 for ap_item in reversed(ap_items): | 1799 for ap_item in reversed(ap_items): |
1711 try: | 1800 try: |
1712 items.append(await parser(ap_item)) | 1801 items.append(await parser(requestor_actor_id, ap_item)) |
1713 except (exceptions.DataError, NotImplementedError, error.StanzaError): | 1802 except (exceptions.DataError, NotImplementedError, error.StanzaError): |
1714 continue | 1803 continue |
1715 | 1804 |
1716 return page_data, items | 1805 return page_data, items |
1717 | 1806 |
1718 async def get_comments_nodes( | 1807 async def get_comments_nodes( |
1719 self, | 1808 self, |
1809 requestor_actor_id: str, | |
1720 item_id: str, | 1810 item_id: str, |
1721 parent_id: Optional[str] | 1811 parent_id: Optional[str] |
1722 ) -> Tuple[Optional[str], Optional[str]]: | 1812 ) -> Tuple[Optional[str], Optional[str]]: |
1723 """Get node where this item is and node to use for comments | 1813 """Get node where this item is and node to use for comments |
1724 | 1814 |
1725 if config option "comments_max_depth" is set, a common node will be used below the | 1815 if config option "comments_max_depth" is set, a common node will be used below the |
1726 given depth | 1816 given depth |
1817 @param requestor_actor_id: ID of the actor doing the request. | |
1727 @param item_id: ID of the reference item | 1818 @param item_id: ID of the reference item |
1728 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") | 1819 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") |
1729 @return: a tuple with parent_node_id, comments_node_id: | 1820 @return: a tuple with parent_node_id, comments_node_id: |
1730 - parent_node_id is the ID of the node where reference item must be. None is | 1821 - parent_node_id is the ID of the node where reference item must be. None is |
1731 returned when the root node (i.e. not a comments node) must be used. | 1822 returned when the root node (i.e. not a comments node) must be used. |
1739 self._m.get_comments_node(item_id) | 1830 self._m.get_comments_node(item_id) |
1740 ) | 1831 ) |
1741 parent_url = parent_id | 1832 parent_url = parent_id |
1742 parents = [] | 1833 parents = [] |
1743 for __ in range(COMMENTS_MAX_PARENTS): | 1834 for __ in range(COMMENTS_MAX_PARENTS): |
1744 parent_item = await self.ap_get(parent_url) | 1835 parent_item = await self.ap_get(parent_url, requestor_actor_id) |
1745 parents.insert(0, parent_item) | 1836 parents.insert(0, parent_item) |
1746 parent_url = parent_item.get("inReplyTo") | 1837 parent_url = parent_item.get("inReplyTo") |
1747 if parent_url is None: | 1838 if parent_url is None: |
1748 break | 1839 break |
1749 parent_limit = self.comments_max_depth-1 | 1840 parent_limit = self.comments_max_depth-1 |
1757 return ( | 1848 return ( |
1758 self._m.get_comments_node(last_level_item["id"]), | 1849 self._m.get_comments_node(last_level_item["id"]), |
1759 None | 1850 None |
1760 ) | 1851 ) |
1761 | 1852 |
1762 async def ap_item_2_mb_data(self, ap_item: dict) -> dict: | 1853 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict: |
1763 """Convert AP activity or object to microblog data | 1854 """Convert AP activity or object to microblog data |
1764 | 1855 |
1856 @param actor_id: ID of the actor doing the request. | |
1765 @param ap_item: ActivityPub item to convert | 1857 @param ap_item: ActivityPub item to convert |
1766 Can be either an activity of an object | 1858 Can be either an activity of an object |
1767 @return: AP Item's Object and microblog data | 1859 @return: AP Item's Object and microblog data |
1768 @raise exceptions.DataError: something is invalid in the AP item | 1860 @raise exceptions.DataError: something is invalid in the AP item |
1769 @raise NotImplementedError: some AP data is not handled yet | 1861 @raise NotImplementedError: some AP data is not handled yet |
1770 @raise error.StanzaError: error while contacting the AP server | 1862 @raise error.StanzaError: error while contacting the AP server |
1771 """ | 1863 """ |
1772 is_activity = self.is_activity(ap_item) | 1864 is_activity = self.is_activity(ap_item) |
1773 if is_activity: | 1865 if is_activity: |
1774 ap_object = await self.ap_get_object(ap_item, "object") | 1866 ap_object = await self.ap_get_object(requestor_actor_id, ap_item, "object") |
1775 if not ap_object: | 1867 if not ap_object: |
1776 log.warning(f'No "object" found in AP item {ap_item!r}') | 1868 log.warning(f'No "object" found in AP item {ap_item!r}') |
1777 raise exceptions.DataError | 1869 raise exceptions.DataError |
1778 else: | 1870 else: |
1779 ap_object = ap_item | 1871 ap_object = ap_item |
1829 attachment[key] = value | 1921 attachment[key] = value |
1830 attachments.append(attachment) | 1922 attachments.append(attachment) |
1831 | 1923 |
1832 # author | 1924 # author |
1833 if is_activity: | 1925 if is_activity: |
1834 authors = await self.ap_get_actors(ap_item, "actor") | 1926 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor") |
1835 else: | 1927 else: |
1836 authors = await self.ap_get_actors(ap_object, "attributedTo") | 1928 authors = await self.ap_get_actors(requestor_actor_id, ap_object, "attributedTo") |
1837 if len(authors) > 1: | 1929 if len(authors) > 1: |
1838 # we only keep first item as author | 1930 # we only keep first item as author |
1839 # TODO: handle multiple actors | 1931 # TODO: handle multiple actors |
1840 log.warning("multiple actors are not managed") | 1932 log.warning("multiple actors are not managed") |
1841 | 1933 |
1862 if "_repeated" in ap_item: | 1954 if "_repeated" in ap_item: |
1863 mb_data["extra"]["repeated"] = ap_item["_repeated"] | 1955 mb_data["extra"]["repeated"] = ap_item["_repeated"] |
1864 | 1956 |
1865 # comments | 1957 # comments |
1866 in_reply_to = ap_object.get("inReplyTo") | 1958 in_reply_to = ap_object.get("inReplyTo") |
1867 __, comments_node = await self.get_comments_nodes(item_id, in_reply_to) | 1959 __, comments_node = await self.get_comments_nodes( |
1960 requestor_actor_id, item_id, in_reply_to | |
1961 ) | |
1868 if comments_node is not None: | 1962 if comments_node is not None: |
1869 comments_data = { | 1963 comments_data = { |
1870 "service": author_jid, | 1964 "service": author_jid, |
1871 "node": comments_node, | 1965 "node": comments_node, |
1872 "uri": uri.build_xmpp_uri( | 1966 "uri": uri.build_xmpp_uri( |
2129 target_ap_account = await self.get_ap_account_from_jid_and_node( | 2223 target_ap_account = await self.get_ap_account_from_jid_and_node( |
2130 service, node | 2224 service, node |
2131 ) | 2225 ) |
2132 if self.is_virtual_jid(service): | 2226 if self.is_virtual_jid(service): |
2133 # service is a proxy JID for AP account | 2227 # service is a proxy JID for AP account |
2134 actor_data = await self.get_ap_actor_data_from_account(target_ap_account) | 2228 actor_data = await self.get_ap_actor_data_from_account( |
2229 url_actor, | |
2230 target_ap_account | |
2231 ) | |
2135 followers = actor_data.get("followers") | 2232 followers = actor_data.get("followers") |
2136 else: | 2233 else: |
2137 # service is a real XMPP entity | 2234 # service is a real XMPP entity |
2138 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) | 2235 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) |
2139 if followers: | 2236 if followers: |
2190 except KeyError: | 2287 except KeyError: |
2191 raise exceptions.DataError("Can't get ActivityPub actor inbox") | 2288 raise exceptions.DataError("Can't get ActivityPub actor inbox") |
2192 | 2289 |
2193 item_data = await self.mb_data_2_ap_item(client, mess_data) | 2290 item_data = await self.mb_data_2_ap_item(client, mess_data) |
2194 url_actor = item_data["actor"] | 2291 url_actor = item_data["actor"] |
2195 resp = await self.sign_and_post(inbox_url, url_actor, item_data) | 2292 await self.ap_post(inbox_url, url_actor, item_data) |
2196 | 2293 |
2197 async def ap_delete_item( | 2294 async def ap_delete_item( |
2198 self, | 2295 self, |
2199 jid_: jid.JID, | 2296 jid_: jid.JID, |
2200 node: Optional[str], | 2297 node: Optional[str], |
2264 """add the gateway workflow on post treatment""" | 2361 """add the gateway workflow on post treatment""" |
2265 if self.client is None: | 2362 if self.client is None: |
2266 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") | 2363 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") |
2267 return True | 2364 return True |
2268 post_treat.addCallback( | 2365 post_treat.addCallback( |
2269 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) | 2366 lambda mess_data: defer.ensureDeferred(self.on_message(client, mess_data)) |
2270 ) | 2367 ) |
2271 return True | 2368 return True |
2272 | 2369 |
2273 async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: | 2370 async def on_message(self, client: SatXMPPEntity, mess_data: dict) -> dict: |
2274 """Called once message has been parsed | 2371 """Called once message has been parsed |
2275 | 2372 |
2276 this method handle the conversion to AP items and posting | 2373 this method handle the conversion to AP items and posting |
2277 """ | 2374 """ |
2278 if client != self.client: | 2375 if client != self.client: |
2286 if not mess_data["to"].user: | 2383 if not mess_data["to"].user: |
2287 log.warning( | 2384 log.warning( |
2288 f"ignoring message addressed to gateway itself: {mess_data}" | 2385 f"ignoring message addressed to gateway itself: {mess_data}" |
2289 ) | 2386 ) |
2290 return mess_data | 2387 return mess_data |
2388 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost()) | |
2291 | 2389 |
2292 actor_account = self._e.unescape(mess_data["to"].user) | 2390 actor_account = self._e.unescape(mess_data["to"].user) |
2293 try: | 2391 try: |
2294 actor_id = await self.get_ap_actor_id_from_account(actor_account) | 2392 actor_id = await self.get_ap_actor_id_from_account(actor_account) |
2295 except Exception as e: | 2393 except Exception as e: |
2296 log.warning( | 2394 log.warning( |
2297 f"Can't retrieve data on actor {actor_account}: {e}" | 2395 f"Can't retrieve data on actor {actor_account}: {e}" |
2298 ) | 2396 ) |
2299 # TODO: send an error <message> | 2397 # TODO: send an error <message> |
2300 return mess_data | 2398 return mess_data |
2301 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) | 2399 inbox = await self.get_ap_inbox_from_id( |
2400 requestor_actor_id, actor_id, use_shared=False | |
2401 ) | |
2302 | 2402 |
2303 try: | 2403 try: |
2304 language, message = next(iter(mess_data["message"].items())) | 2404 language, message = next(iter(mess_data["message"].items())) |
2305 except (KeyError, StopIteration): | 2405 except (KeyError, StopIteration): |
2306 log.warning(f"ignoring empty message: {mess_data}") | 2406 log.warning(f"ignoring empty message: {mess_data}") |
2332 "href": actor_id, | 2432 "href": actor_id, |
2333 "name": f"@{actor_account}", | 2433 "name": f"@{actor_account}", |
2334 }) | 2434 }) |
2335 | 2435 |
2336 try: | 2436 try: |
2337 await self.sign_and_post(inbox, ap_item["actor"], ap_item) | 2437 await self.ap_post(inbox, ap_item["actor"], ap_item) |
2338 except Exception as e: | 2438 except Exception as e: |
2339 # TODO: send an error <message> | 2439 # TODO: send an error <message> |
2340 log.warning( | 2440 log.warning( |
2341 f"Can't send message to {inbox}: {e}" | 2441 f"Can't send message to {inbox}: {e}" |
2342 ) | 2442 ) |
2355 if not self.is_local(from_jid): | 2455 if not self.is_local(from_jid): |
2356 log.debug( | 2456 log.debug( |
2357 f"ignoring retract request from non local jid {from_jid}" | 2457 f"ignoring retract request from non local jid {from_jid}" |
2358 ) | 2458 ) |
2359 return False | 2459 return False |
2460 requestor_actor_id = self.build_apurl( | |
2461 TYPE_ACTOR, | |
2462 from_jid.userhost() | |
2463 ) | |
2360 to_jid = jid.JID(message_elt["to"]) | 2464 to_jid = jid.JID(message_elt["to"]) |
2361 if (to_jid.host != self.client.jid.full() or not to_jid.user): | 2465 if (to_jid.host != self.client.jid.full() or not to_jid.user): |
2362 # to_jid should be a virtual JID from this gateway | 2466 # to_jid should be a virtual JID from this gateway |
2363 raise exceptions.InternalError( | 2467 raise exceptions.InternalError( |
2364 f"Invalid destinee's JID: {to_jid.full()}" | 2468 f"Invalid destinee's JID: {to_jid.full()}" |
2365 ) | 2469 ) |
2366 ap_account = self._e.unescape(to_jid.user) | 2470 ap_account = self._e.unescape(to_jid.user) |
2367 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 2471 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
2368 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) | 2472 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) |
2369 url_actor, ap_item = await self.ap_delete_item( | 2473 url_actor, ap_item = await self.ap_delete_item( |
2370 from_jid.userhostJID(), None, retract_elt["id"], public=False | 2474 from_jid.userhostJID(), None, retract_elt["id"], public=False |
2371 ) | 2475 ) |
2372 resp = await self.sign_and_post(inbox, url_actor, ap_item) | 2476 resp = await self.ap_post(inbox, url_actor, ap_item) |
2373 return False | 2477 return False |
2374 | 2478 |
2375 async def _on_reference_received( | 2479 async def _on_reference_received( |
2376 self, | 2480 self, |
2377 client: SatXMPPEntity, | 2481 client: SatXMPPEntity, |
2453 "type": TYPE_MENTION, | 2557 "type": TYPE_MENTION, |
2454 "href": actor_id, | 2558 "href": actor_id, |
2455 "name": ap_account, | 2559 "name": ap_account, |
2456 }) | 2560 }) |
2457 | 2561 |
2458 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) | 2562 requestor_actor_id = ap_item["actor"] |
2459 | 2563 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) |
2460 resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item) | 2564 |
2565 await self.ap_post(inbox, requestor_actor_id, ap_item) | |
2461 | 2566 |
2462 return False | 2567 return False |
2463 | 2568 |
2464 async def new_reply_to_xmpp_item( | 2569 async def new_reply_to_xmpp_item( |
2465 self, | 2570 self, |
2510 # we don't have a comment node set for this item | 2615 # we don't have a comment node set for this item |
2511 from libervia.backend.tools.xml_tools import pp_elt | 2616 from libervia.backend.tools.xml_tools import pp_elt |
2512 log.info(f"{pp_elt(parent_item_elt.toXml())}") | 2617 log.info(f"{pp_elt(parent_item_elt.toXml())}") |
2513 raise NotImplementedError() | 2618 raise NotImplementedError() |
2514 else: | 2619 else: |
2515 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) | 2620 requestor_actor_id = self.build_apurl( |
2621 TYPE_ACTOR, | |
2622 await self.get_ap_account_from_jid_and_node(comment_service, comment_node) | |
2623 ) | |
2624 __, item_elt = await self.ap_item_2_mb_data_and_elt( | |
2625 requestor_actor_id, | |
2626 ap_item | |
2627 ) | |
2516 await self._p.publish(client, comment_service, comment_node, [item_elt]) | 2628 await self._p.publish(client, comment_service, comment_node, [item_elt]) |
2517 await self.notify_mentions( | 2629 await self.notify_mentions( |
2518 targets, mentions, comment_service, comment_node, item_elt["id"] | 2630 targets, mentions, comment_service, comment_node, item_elt["id"] |
2519 ) | 2631 ) |
2520 | 2632 |
2546 continue | 2658 continue |
2547 if not self.is_local_url(value): | 2659 if not self.is_local_url(value): |
2548 continue | 2660 continue |
2549 target_type = self.parse_apurl(value)[0] | 2661 target_type = self.parse_apurl(value)[0] |
2550 if target_type != TYPE_ACTOR: | 2662 if target_type != TYPE_ACTOR: |
2551 log.debug(f"ignoring non actor type as a target: {href}") | 2663 log.debug(f"ignoring non actor type as a target: {value}") |
2552 else: | 2664 else: |
2553 targets.setdefault(target_type, set()).add(value) | 2665 targets.setdefault(target_type, set()).add(value) |
2554 | 2666 |
2555 mentions = [] | 2667 mentions = [] |
2556 tags = item.get("tag") | 2668 tags = item.get("tag") |
2598 else: | 2710 else: |
2599 await self.handle_pubsub_ap_item( | 2711 await self.handle_pubsub_ap_item( |
2600 client, targets, mentions, destinee, node, item, is_public | 2712 client, targets, mentions, destinee, node, item, is_public |
2601 ) | 2713 ) |
2602 | 2714 |
2715 def get_requestor_actor_id_from_targets( | |
2716 self, | |
2717 targets: set[str] | |
2718 ) -> str: | |
2719 """Find local actor to use as requestor_actor_id from request targets. | |
2720 | |
2721 A local actor must be used to sign HTTP request, notably HTTP GET request for AP | |
2722 instance checking signature, such as Mastodon when set in "secure mode". | |
2723 | |
2724 This method check a set of targets and use the first local one. | |
2725 | |
2726 If none match, a generic local actor is used. | |
2727 | |
2728 @param targets: set of actor IDs to which the current request is sent. | |
2729 @return: local actor ID to use as requestor_actor_id. | |
2730 """ | |
2731 try: | |
2732 return next(t for t in targets if self.is_local_url(t)) | |
2733 except StopIteration: | |
2734 log.warning( | |
2735 f"Can't find local target to use as requestor ID: {targets!r}" | |
2736 ) | |
2737 return self.build_apurl( | |
2738 TYPE_ACTOR, f"libervia@{self.public_url}" | |
2739 ) | |
2740 | |
2603 async def handle_message_ap_item( | 2741 async def handle_message_ap_item( |
2604 self, | 2742 self, |
2605 client: SatXMPPEntity, | 2743 client: SatXMPPEntity, |
2606 targets: Dict[str, Set[str]], | 2744 targets: dict[str, Set[str]], |
2607 mentions: List[Dict[str, str]], | 2745 mentions: list[Dict[str, str]], |
2608 destinee: Optional[jid.JID], | 2746 destinee: jid.JID|None, |
2609 item: dict, | 2747 item: dict, |
2610 ) -> None: | 2748 ) -> None: |
2611 """Parse and deliver direct AP items translating to XMPP messages | 2749 """Parse and deliver direct AP items translating to XMPP messages |
2612 | 2750 |
2613 @param targets: actors where the item must be delivered | 2751 @param targets: actors where the item must be delivered |
2614 @param destinee: jid of the destinee, | 2752 @param destinee: jid of the destinee, |
2615 @param item: AP object payload | 2753 @param item: AP object payload |
2616 """ | 2754 """ |
2755 targets_urls = {t for t_set in targets.values() for t in t_set} | |
2756 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) | |
2617 targets_jids = { | 2757 targets_jids = { |
2618 await self.get_jid_from_id(t) | 2758 await self.get_jid_from_id(requestor_actor_id, url) |
2619 for t_set in targets.values() | 2759 for url in targets_urls |
2620 for t in t_set | |
2621 } | 2760 } |
2622 if destinee is not None: | 2761 if destinee is not None: |
2623 targets_jids.add(destinee) | 2762 targets_jids.add(destinee) |
2624 mb_data = await self.ap_item_2_mb_data(item) | 2763 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item) |
2625 extra = { | 2764 extra = { |
2626 "origin_id": mb_data["id"] | 2765 "origin_id": mb_data["id"] |
2627 } | 2766 } |
2628 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) | 2767 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) |
2629 if attachments: | 2768 if attachments: |
2641 ) | 2780 ) |
2642 await defer.DeferredList(defer_l) | 2781 await defer.DeferredList(defer_l) |
2643 | 2782 |
2644 async def notify_mentions( | 2783 async def notify_mentions( |
2645 self, | 2784 self, |
2646 targets: Dict[str, Set[str]], | 2785 targets: dict[str, set[str]], |
2647 mentions: List[Dict[str, str]], | 2786 mentions: list[dict[str, str]], |
2648 service: jid.JID, | 2787 service: jid.JID, |
2649 node: str, | 2788 node: str, |
2650 item_id: str, | 2789 item_id: str, |
2651 ) -> None: | 2790 ) -> None: |
2652 """Send mention notifications to recipients and mentioned entities | 2791 """Send mention notifications to recipients and mentioned entities |
2655 | 2794 |
2656 Mentions are also sent to recipients as they are primary audience (see | 2795 Mentions are also sent to recipients as they are primary audience (see |
2657 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). | 2796 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). |
2658 | 2797 |
2659 """ | 2798 """ |
2799 targets_urls = {t for t_set in targets.values() for t in t_set} | |
2800 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) | |
2660 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) | 2801 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) |
2661 seen = set() | 2802 seen = set() |
2662 # we start with explicit mentions because mentions' content will be used in the | 2803 # we start with explicit mentions because mentions' content will be used in the |
2663 # future to fill "begin" and "end" reference attributes (we can't do it at the | 2804 # future to fill "begin" and "end" reference attributes (we can't do it at the |
2664 # moment as there is no way to specify the XML element to use in the blog item). | 2805 # moment as there is no way to specify the XML element to use in the blog item). |
2665 for mention in mentions: | 2806 for mention in mentions: |
2666 mentioned_jid = await self.get_jid_from_id(mention["uri"]) | 2807 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"]) |
2667 self._refs.send_reference( | 2808 self._refs.send_reference( |
2668 self.client, | 2809 self.client, |
2669 to_jid=mentioned_jid, | 2810 to_jid=mentioned_jid, |
2670 anchor=anchor | 2811 anchor=anchor |
2671 ) | 2812 ) |
2672 seen.add(mentioned_jid) | 2813 seen.add(mentioned_jid) |
2673 | 2814 |
2674 remaining = { | 2815 remaining = { |
2675 await self.get_jid_from_id(t) | 2816 await self.get_jid_from_id(requestor_actor_id, t) |
2676 for t_set in targets.values() | 2817 for t_set in targets.values() |
2677 for t in t_set | 2818 for t in t_set |
2678 } - seen | 2819 } - seen |
2679 for target in remaining: | 2820 for target in remaining: |
2680 self._refs.send_reference( | 2821 self._refs.send_reference( |
2684 ) | 2825 ) |
2685 | 2826 |
2686 async def handle_pubsub_ap_item( | 2827 async def handle_pubsub_ap_item( |
2687 self, | 2828 self, |
2688 client: SatXMPPEntity, | 2829 client: SatXMPPEntity, |
2689 targets: Dict[str, Set[str]], | 2830 targets: dict[str, set[str]], |
2690 mentions: List[Dict[str, str]], | 2831 mentions: list[dict[str, str]], |
2691 destinee: Optional[jid.JID], | 2832 destinee: jid.JID|None, |
2692 node: str, | 2833 node: str, |
2693 item: dict, | 2834 item: dict, |
2694 public: bool | 2835 public: bool |
2695 ) -> None: | 2836 ) -> None: |
2696 """Analyse, cache and deliver AP items translating to Pubsub | 2837 """Analyse, cache and deliver AP items translating to Pubsub |
2700 @param node: XMPP pubsub node | 2841 @param node: XMPP pubsub node |
2701 @param item: AP object payload | 2842 @param item: AP object payload |
2702 @param public: True if the item is public | 2843 @param public: True if the item is public |
2703 """ | 2844 """ |
2704 # XXX: "public" is not used for now | 2845 # XXX: "public" is not used for now |
2846 targets_urls = {t for t_set in targets.values() for t in t_set} | |
2847 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) | |
2705 service = client.jid | 2848 service = client.jid |
2706 in_reply_to = item.get("inReplyTo") | 2849 in_reply_to = item.get("inReplyTo") |
2707 | 2850 |
2708 if in_reply_to and isinstance(in_reply_to, list): | 2851 if in_reply_to and isinstance(in_reply_to, list): |
2709 in_reply_to = in_reply_to[0] | 2852 in_reply_to = in_reply_to[0] |
2713 await self.new_reply_to_xmpp_item(client, item, targets, mentions) | 2856 await self.new_reply_to_xmpp_item(client, item, targets, mentions) |
2714 return | 2857 return |
2715 | 2858 |
2716 # this item is a reply to an AP item, we use or create a corresponding node | 2859 # this item is a reply to an AP item, we use or create a corresponding node |
2717 # for comments | 2860 # for comments |
2718 parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to) | 2861 parent_node, __ = await self.get_comments_nodes( |
2862 requestor_actor_id, | |
2863 item["id"], | |
2864 in_reply_to | |
2865 ) | |
2719 node = parent_node or node | 2866 node = parent_node or node |
2720 cached_node = await self.host.memory.storage.get_pubsub_node( | 2867 cached_node = await self.host.memory.storage.get_pubsub_node( |
2721 client, service, node, with_subscriptions=True, create=True, | 2868 client, service, node, with_subscriptions=True, create=True, |
2722 create_kwargs={"subscribed": True} | 2869 create_kwargs={"subscribed": True} |
2723 ) | 2870 ) |
2733 f"due to a cache purge. We synchronise the node\n{item}" | 2880 f"due to a cache purge. We synchronise the node\n{item}" |
2734 | 2881 |
2735 ) | 2882 ) |
2736 return | 2883 return |
2737 if item.get("type") == TYPE_EVENT: | 2884 if item.get("type") == TYPE_EVENT: |
2738 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) | 2885 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt( |
2739 else: | 2886 requestor_actor_id, |
2740 data, item_elt = await self.ap_item_2_mb_data_and_elt(item) | 2887 item |
2888 ) | |
2889 else: | |
2890 data, item_elt = await self.ap_item_2_mb_data_and_elt( | |
2891 requestor_actor_id, | |
2892 item | |
2893 ) | |
2741 await self.host.memory.storage.cache_pubsub_items( | 2894 await self.host.memory.storage.cache_pubsub_items( |
2742 client, | 2895 client, |
2743 cached_node, | 2896 cached_node, |
2744 [item_elt], | 2897 [item_elt], |
2745 [data] | 2898 [data] |