libervia-backend: comparison of sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and thread management:
- HTTP signatures are now checked for incoming messages (see the sketch below)
- AP actors can now be followed using a pubsub subscription; when the follow is
accepted, the node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option limiting the number of comment nodes for a root message (documentation
explaining this will follow)
ticket 364
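The HTTP signature scheme used here is draft-cavage HTTP signatures, with PKCS#1 v1.5 padding and SHA-256 for Mastodon compatibility. A minimal standalone sketch of the verification step (assuming a PEM public key taken from the actor's `publicKey` and headers already ordered as listed in the `Signature` header; the gateway's own implementation is in `checkSignature` below):

```python
import base64

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding


def verify_cavage_signature(
    pub_key_pem: str, signature_b64: str, headers: dict
) -> bool:
    """Check a draft-cavage HTTP signature against ordered (pseudo-)headers"""
    # signing string: "name: value" pairs, names lower-cased, joined with "\n"
    to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items())
    pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
    try:
        pub_key.verify(
            base64.b64decode(signature_b64),
            to_sign.encode(),
            padding.PKCS1v15(),  # PKCS#1 v1.5, as used by Mastodon
            hashes.SHA256(),
        )
    except InvalidSignature:
        return False
    return True
```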
author | Goffi <goffi@goffi.org>
date | Tue, 22 Mar 2022 17:00:42 +0100
parents | bf0505d41c09
children | 125c7043b277
3744:658ddbabaf36 | 3745:a8c7e5cef0cb
---|---
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import base64 | 19 import base64 |
20 import calendar | |
20 import hashlib | 21 import hashlib |
21 import json | 22 import json |
22 from pathlib import Path | 23 from pathlib import Path |
23 from typing import Optional, Dict, Tuple, List, Union | 24 from pprint import pformat |
25 import re | |
26 from typing import Any, Dict, List, Optional, Tuple, Union, overload | |
24 from urllib import parse | 27 from urllib import parse |
25 import calendar | 28 |
26 import re | 29 from cryptography.exceptions import InvalidSignature |
27 | |
28 import dateutil | |
29 from cryptography.hazmat.primitives import serialization | 30 from cryptography.hazmat.primitives import serialization |
30 from cryptography.hazmat.primitives import hashes | 31 from cryptography.hazmat.primitives import hashes |
31 from cryptography.hazmat.primitives.asymmetric import rsa | 32 from cryptography.hazmat.primitives.asymmetric import rsa |
32 from cryptography.hazmat.primitives.asymmetric import padding | 33 from cryptography.hazmat.primitives.asymmetric import padding |
34 import dateutil | |
33 import shortuuid | 35 import shortuuid |
36 from sqlalchemy.exc import IntegrityError | |
34 import treq | 37 import treq |
35 from treq.response import _Response as TReqResponse | 38 from treq.response import _Response as TReqResponse |
36 from twisted.internet import defer, reactor, threads | 39 from twisted.internet import defer, reactor, threads |
37 from twisted.web import http | 40 from twisted.web import http |
38 from twisted.words.protocols.jabber import jid, error | 41 from twisted.words.protocols.jabber import error, jid |
39 from twisted.words.xish import domish | 42 from twisted.words.xish import domish |
40 from wokkel import rsm | 43 from wokkel import rsm |
41 | 44 |
42 from sat.core import exceptions | 45 from sat.core import exceptions |
43 from sat.core.constants import Const as C | 46 from sat.core.constants import Const as C |
44 from sat.core.core_types import SatXMPPEntity | 47 from sat.core.core_types import SatXMPPEntity |
45 from sat.core.i18n import _ | 48 from sat.core.i18n import _ |
46 from sat.core.log import getLogger | 49 from sat.core.log import getLogger |
50 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState | |
47 from sat.tools import utils | 51 from sat.tools import utils |
48 from sat.tools.common import data_format, tls | 52 from sat.tools.common import data_format, tls, uri |
49 from sat.tools.common.async_utils import async_lru | 53 from sat.tools.common.async_utils import async_lru |
50 | 54 |
51 from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, | 55 from .constants import ( |
52 AP_MB_MAP, LRU_MAX_SIZE) | 56 ACTIVITY_OBJECT_MANDATORY, |
57 ACTIVITY_TARGET_MANDATORY, | |
58 ACTIVITY_TYPES, | |
59 ACTIVITY_TYPES_LOWER, | |
60 AP_MB_MAP, | |
61 COMMENTS_MAX_PARENTS, | |
62 CONF_SECTION, | |
63 IMPORT_NAME, | |
64 LRU_MAX_SIZE, | |
65 MEDIA_TYPE_AP, | |
66 TYPE_ACTOR, | |
67 TYPE_ITEM, | |
68 ) | |
53 from .http_server import HTTPServer | 69 from .http_server import HTTPServer |
54 from .pubsub_service import APPubsubService | 70 from .pubsub_service import APPubsubService |
55 | 71 |
56 | 72 |
57 log = getLogger(__name__) | 73 log = getLogger(__name__) |
62 C.PI_NAME: "ActivityPub Gateway component", | 78 C.PI_NAME: "ActivityPub Gateway component", |
63 C.PI_IMPORT_NAME: IMPORT_NAME, | 79 C.PI_IMPORT_NAME: IMPORT_NAME, |
64 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
65 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
66 C.PI_PROTOCOLS: [], | 82 C.PI_PROTOCOLS: [], |
67 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], | 83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"], |
68 C.PI_RECOMMENDATIONS: [], | 84 C.PI_RECOMMENDATIONS: [], |
69 C.PI_MAIN: "APGateway", | 85 C.PI_MAIN: "APGateway", |
70 C.PI_HANDLER: C.BOOL_TRUE, | 86 C.PI_HANDLER: C.BOOL_TRUE, |
71 C.PI_DESCRIPTION: _( | 87 C.PI_DESCRIPTION: _( |
72 "Gateway for bidirectional communication between XMPP and ActivityPub." | 88 "Gateway for bidirectional communication between XMPP and ActivityPub." |
84 def __init__(self, host): | 100 def __init__(self, host): |
85 self.host = host | 101 self.host = host |
86 self.initialised = False | 102 self.initialised = False |
87 self._m = host.plugins["XEP-0277"] | 103 self._m = host.plugins["XEP-0277"] |
88 self._p = host.plugins["XEP-0060"] | 104 self._p = host.plugins["XEP-0060"] |
105 self._e = host.plugins["XEP-0106"] | |
106 self._c = host.plugins["PUBSUB_CACHE"] | |
107 self.pubsub_service = APPubsubService(self) | |
89 | 108 |
90 host.bridge.addMethod( | 109 host.bridge.addMethod( |
91 "APSend", | 110 "APSend", |
92 ".plugin", | 111 ".plugin", |
93 in_sign="sss", | 112 in_sign="sss", |
95 method=self._publishMessage, | 114 method=self._publishMessage, |
96 async_=True, | 115 async_=True, |
97 ) | 116 ) |
98 | 117 |
99 def getHandler(self, __): | 118 def getHandler(self, __): |
100 return APPubsubService(self) | 119 return self.pubsub_service |
101 | 120 |
102 async def init(self, client): | 121 async def init(self, client): |
103 if self.initialised: | 122 if self.initialised: |
104 return | 123 return |
105 | 124 |
163 if connection_type not in ('http', 'https'): | 182 if connection_type not in ('http', 'https'): |
164 raise exceptions.ConfigError( | 183 raise exceptions.ConfigError( |
165 'bad ap-gateway http_connection_type, you must use one of "http" or ' | 184 'bad ap-gateway http_connection_type, you must use one of "http" or ' |
166 '"https"' | 185 '"https"' |
167 ) | 186 ) |
168 self.max_items = self.host.memory.getConfig( | 187 self.max_items = int(self.host.memory.getConfig( |
169 CONF_SECTION, 'new_node_max_items', 50 | 188 CONF_SECTION, 'new_node_max_items', 50 |
170 | 189 |
171 ) | 190 )) |
191 self.comments_max_depth = int(self.host.memory.getConfig( | |
192 CONF_SECTION, 'comments_max_depth', 0 | |
193 )) | |
172 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') | 194 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') |
173 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") | 195 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") |
174 # True (default) if we provide gateway only to entities/services from our server | 196 # True (default) if we provide gateway only to entities/services from our server |
175 self.local_only = C.bool( | 197 self.local_only = C.bool( |
176 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) | 198 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) |
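For reference, the options read above could be set in the component's configuration section along these lines (a hypothetical sketch: the actual section name is CONF_SECTION from constants.py, which is not part of this diff):

```ini
[<CONF_SECTION>]
http_connection_type = https
new_node_max_items = 50
comments_max_depth = 2
ap_path = _ap
local_only = true
```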
188 reactor.listenSSL(self.http_port, self.server, context_factory) | 210 reactor.listenSSL(self.http_port, self.server, context_factory) |
189 | 211 |
190 async def profileConnecting(self, client): | 212 async def profileConnecting(self, client): |
191 self.client = client | 213 self.client = client |
192 await self.init(client) | 214 await self.init(client) |
215 | |
216 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: | |
217 """Get client for this component with a specified jid | |
218 | |
219 This is needed to perform operations with the virtual JID corresponding to the AP | |
220 actor instead of the JID of the gateway itself. | |
221 @param actor_id: ID of the actor | |
222 @return: virtual client | |
223 """ | |
224 account = await self.getAPAccountFromId(actor_id) | |
225 local_jid = self.getLocalJIDFromAccount(account) | |
226 return self.client.getVirtualClient(local_jid) | |
227 | |
228 def isActivity(self, data: dict) -> bool: | |
229 """Return True if the data has an activity type""" | |
230 try: | |
231 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER | |
232 except (KeyError, TypeError): | |
233 return False | |
193 | 234 |
194 async def apGet(self, url: str) -> dict: | 235 async def apGet(self, url: str) -> dict: |
195 """Retrieve AP JSON from given URL | 236 """Retrieve AP JSON from given URL |
196 | 237 |
197 @raise error.StanzaError: "service-unavailable" is sent when something went wrong | 238 @raise error.StanzaError: "service-unavailable" is sent when something went wrong |
512 "This gateway is configured to map only local entities and services" | 553 "This gateway is configured to map only local entities and services" |
513 ) | 554 ) |
514 | 555 |
515 return jid_, node | 556 return jid_, node |
516 | 557 |
517 def parseAPURL(self, url: str) -> Tuple[str, str]: | 558 def getLocalJIDFromAccount(self, account: str) -> jid.JID: |
559 """Compute JID linking to an AP account | |
560 | |
561 The local JID is computed by escaping the AP actor handle and using it as the |
562 local part of the JID, with this gateway's own domain as the domain part |
563 """ | |
564 return jid.JID( | |
565 None, | |
566 ( | |
567 self._e.escape(account), | |
568 self.client.jid.host, | |
569 None | |
570 ) | |
571 ) | |
572 | |
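For illustration, a simplified sketch of this mapping which only escapes "@" (the plugin itself delegates to the XEP-0106 plugin, which handles the full escape set):

```python
from twisted.words.protocols.jabber import jid


def local_jid(account: str, gateway_host: str) -> jid.JID:
    # XEP-0106 escapes "@" in the local part as "\40"
    escaped = account.replace("@", "\\40")
    return jid.JID(None, (escaped, gateway_host, None))


print(local_jid("alice@mastodon.example", "ap.gateway.example").full())
# alice\40mastodon.example@ap.gateway.example
```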
573 def parseAPURL(self, url: str) -> Tuple[str, List[str]]: | |
518 """Parse an URL leading to an AP endpoint | 574 """Parse an URL leading to an AP endpoint |
519 | 575 |
520 @param url: URL to parse (schema is not mandatory) | 576 @param url: URL to parse (schema is not mandatory) |
521 @return: endpoint type and AP account | 577 @return: endpoint type and extra arguments |
522 """ | 578 """ |
523 path = parse.urlparse(url).path.lstrip("/") | 579 path = parse.urlparse(url).path.lstrip("/") |
524 type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) | 580 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/", 1) |
525 return type_, parse.unquote(account) | 581 return type_, [parse.unquote(a) for a in extra_args] |
526 | 582 |
527 def buildAPURL(self, type_: str, *args: str) -> str: | 583 def buildAPURL(self, type_: str, *args: str) -> str: |
528 """Build an AP endpoint URL | 584 """Build an AP endpoint URL |
529 | 585 |
530 @param type_: type of AP endpoint | 586 @param type_: type of AP endpoint |
533 return parse.urljoin( | 589 return parse.urljoin( |
534 self.base_ap_url, | 590 self.base_ap_url, |
535 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) | 591 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) |
536 ) | 592 ) |
537 | 593 |
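A quick round trip through these two helpers, assuming the default `ap_path` of "_ap":

```python
from pathlib import Path
from urllib import parse

base_ap_url = "https://example.org/_ap/"

# buildAPURL logic
url = parse.urljoin(
    base_ap_url, str(Path("actor").joinpath(parse.quote_plus("alice@example.org")))
)
print(url)  # https://example.org/_ap/actor/alice%40example.org

# parseAPURL logic
path = parse.urlparse(url).path.lstrip("/")
type_, *extra_args = path[len("_ap"):].lstrip("/").split("/", 1)
print(type_, [parse.unquote(a) for a in extra_args])
# actor ['alice@example.org']
```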
538 async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: | 594 def buildSignatureHeader(self, values: Dict[str, str]) -> str: |
539 """Sign a documentent and post it to AP server | 595 """Build key="<value>" signature header from signature data""" |
540 | 596 fields = [] |
541 @param url: AP server endpoint | 597 for key, value in values.items(): |
542 @param url_actor: URL generated by this gateway for local actor | 598 if key not in ("(created)", "(expired)"): |
543 @param doc: document to send | 599 if '"' in value: |
544 """ | 600 raise NotImplementedError( |
545 p_url = parse.urlparse(url) | 601 "string escaping is not implemented, double-quote can't be used " |
546 date = http.datetimeToString().decode() | 602 f"in {value!r}" |
547 body = json.dumps(doc).encode() | 603 ) |
548 digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() | 604 value = f'"{value}"' |
549 digest = f"sha-256={digest_hash}" | 605 fields.append(f"{key}={value}") |
550 to_sign = ( | 606 |
551 f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" | 607 return ",".join(fields) |
552 f"date: {date}\ndigest: {digest}" | 608 |
553 ) | 609 def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: |
610 """Get digest data to use in header and signature | |
611 | |
612 @param body: body of the request | |
613 @return: hash name and digest | |
614 """ | |
615 if algo != "SHA-256": | |
616 raise NotImplementedError("only SHA-256 is implemented for now") | |
617 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() | |
618 | |
619 @async_lru(maxsize=LRU_MAX_SIZE) | |
620 async def getActorPubKeyData( | |
621 self, | |
622 actor_id: str | |
623 ) -> Tuple[str, str, rsa.RSAPublicKey]: | |
624 """Retrieve Public Key data from actor ID | |
625 | |
626 @param actor_id: actor ID (url) | |
627 @return: key_id, owner and public_key | |
628 @raise KeyError: publicKey is missing from actor data | |
629 """ | |
630 actor_data = await self.apGet(actor_id) | |
631 pub_key_data = actor_data["publicKey"] | |
632 key_id = pub_key_data["id"] | |
633 owner = pub_key_data["owner"] | |
634 pub_key_pem = pub_key_data["publicKeyPem"] | |
635 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) | |
636 return key_id, owner, pub_key | |
637 | |
638 def createActivity( | |
639 self, | |
640 activity: str, | |
641 actor_id: str, | |
642 object_: Optional[Union[str, dict]] = None, | |
643 target: Optional[Union[str, dict]] = None, | |
644 **kwargs, | |
645 ) -> Dict[str, Any]: | |
646 """Generate base data for an activity | |
647 | |
648 @param activity: one of ACTIVITY_TYPES | |
649 """ | |
650 if activity not in ACTIVITY_TYPES: | |
651 raise exceptions.InternalError(f"invalid activity: {activity!r}") | |
652 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: | |
653 raise exceptions.InternalError( | |
654 f'"object_" is mandatory for activity {activity!r}' | |
655 ) | |
656 if target is None and activity in ACTIVITY_TARGET_MANDATORY: | |
657 raise exceptions.InternalError( | |
658 f'"target" is mandatory for activity {activity!r}' | |
659 ) | |
660 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" | |
661 data: Dict[str, Any] = { | |
662 "@context": "https://www.w3.org/ns/activitystreams", | |
663 "actor": actor_id, | |
664 "id": activity_id, | |
665 "type": activity, | |
666 } | |
667 data.update(kwargs) | |
668 if object_ is not None: | |
669 data["object"] = object_ | |
670 if target is not None: | |
671 data["target"] = target | |
672 | |
673 return data | |
674 | |
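For instance, a `createActivity("Follow", actor_id, object_=target_actor_id)` call would produce a payload like this (illustrative URLs, with the random shortuuid suffix shown as a placeholder):

```python
{
    "@context": "https://www.w3.org/ns/activitystreams",
    "actor": "https://example.org/_ap/actor/alice%40example.org",
    "id": "https://example.org/_ap/actor/alice%40example.org#follow_<shortuuid>",
    "type": "Follow",
    "object": "https://mastodon.example/users/bob",
}
```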
675 def getKeyId(self, actor_id: str) -> str: | |
676 """Get local key ID from actor ID""" | |
677 return f"{actor_id}#main-key" | |
678 | |
679 async def checkSignature( | |
680 self, | |
681 signature: str, | |
682 key_id: str, | |
683 headers: Dict[str, str] | |
684 ) -> str: | |
685 """Verify that signature matches given headers | |
686 | |
687 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 | |
688 | |
689 @param signature: Base64 encoded signature | |
690 @param key_id: ID of the key used to sign the data | |
691 @param headers: headers and their values, including pseudo-headers | |
692 @return: id of the signing actor | |
693 | |
694 @raise InvalidSignature: signature doesn't match headers | |
695 """ | |
696 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) | |
697 if key_id.startswith("acct:"): | |
698 actor = key_id[5:] | |
699 actor_id = await self.getAPActorIdFromAccount(actor) | |
700 else: | |
701 actor_id = key_id.split("#", 1)[0] | |
702 | |
703 pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id) | |
704 if pub_key_id != key_id or pub_key_owner != actor_id: | |
705 raise exceptions.EncryptionError("Public Key mismatch") | |
706 | |
707 try: | |
708 pub_key.verify( | |
709 base64.b64decode(signature), | |
710 to_sign.encode(), | |
711 # we have to use PKCS1v15 padding to be compatible with Mastodon | |
712 padding.PKCS1v15(), # type: ignore | |
713 hashes.SHA256() # type: ignore | |
714 ) | |
715 except InvalidSignature: | |
716 raise exceptions.EncryptionError("Invalid signature (using PKCS#1 v1.5 and SHA-256)") |
717 | |
718 return actor_id | |
719 | |
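The `Signature` header of an incoming request has to be parsed into its key/value pairs before this method can be called (the gateway does this in http_server.py, not shown in this diff). A hypothetical sketch of that parsing step:

```python
import re

SIGN_PARAM_RE = re.compile(r'(\w+)="([^"]+)"')


def parse_signature_header(value: str) -> dict:
    """Split a Signature header into its key/value parameters"""
    return dict(SIGN_PARAM_RE.findall(value))


sign_data = parse_signature_header(
    'keyId="https://mastodon.example/users/bob#main-key",'
    'algorithm="rsa-sha256",headers="(request-target) host date digest",'
    'signature="Y2Zha2U="'
)
# the headers to verify are then rebuilt in the order given by
# sign_data["headers"] and passed to checkSignature() together with
# sign_data["signature"] and sign_data["keyId"]
```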
720 def getSignatureData( | |
721 self, | |
722 key_id: str, | |
723 headers: Dict[str, str] | |
724 ) -> Tuple[Dict[str, str], Dict[str, str]]: | |
725 """Generate and return signature and corresponding headers | |
726 | |
727 @param key_id: ID of the key (URL linking to the data with public key) |
728 @param headers: headers to sign and their values: |
729     pseudo-headers (e.g. "(request-target)") must be included here; |
730     they are part of the signed data, but are stripped from the |
731     returned headers |
732 |
733 @return: headers and signature data |
734     ``headers`` is an updated copy of the ``headers`` argument, with |
735     pseudo-headers removed and ``Signature`` added. |
736 |
737 """ |
738 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) | |
554 signature = base64.b64encode(self.private_key.sign( | 739 signature = base64.b64encode(self.private_key.sign( |
555 to_sign.encode(), | 740 to_sign.encode(), |
556 # we have to use PKCS1v15 padding to be compatible with Mastodon | 741 # we have to use PKCS1v15 padding to be compatible with Mastodon |
557 padding.PKCS1v15(), | 742 padding.PKCS1v15(), # type: ignore |
558 hashes.SHA256() | 743 hashes.SHA256() # type: ignore |
559 )).decode() | 744 )).decode() |
560 h_signature = ( | 745 sign_data = { |
561 f'keyId="{url_actor}",headers="(request-target) host date digest",' | 746 "keyId": key_id, |
562 f'signature="{signature}"' | 747 "Algorithm": "rsa-sha256", |
748 "headers": " ".join(headers.keys()), | |
749 "signature": signature | |
750 } | |
751 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} | |
752 new_headers["Signature"] = self.buildSignatureHeader(sign_data) | |
753 return new_headers, sign_data | |
754 | |
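A hedged usage sketch, assuming `gateway` is an initialised APGateway instance (note that the header set above uses the capitalised "Algorithm" key, as written in the code):

```python
headers = {
    "(request-target)": "post /inbox",
    "Host": "mastodon.example",
    "Date": "Tue, 22 Mar 2022 16:00:42 GMT",
    "Digest": "SHA-256=<Base64 digest of the body>",
}
key_id = gateway.getKeyId("https://example.org/_ap/actor/alice%40example.org")
new_headers, sign_data = gateway.getSignatureData(key_id, headers)
# new_headers no longer contains "(request-target)" and gains a Signature
# header of the form:
#   keyId="<actor id>#main-key",Algorithm="rsa-sha256",
#   headers="(request-target) host date digest",signature="<Base64>"
```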
755 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: | |
756 """Sign a documentent and post it to AP server | |
757 | |
758 @param url: AP server endpoint | |
759 @param actor_id: originating actor ID (URL) | |
760 @param doc: document to send | |
761 """ | |
762 p_url = parse.urlparse(url) | |
763 body = json.dumps(doc).encode() | |
764 digest_algo, digest_hash = self.getDigest(body) | |
765 digest = f"{digest_algo}={digest_hash}" | |
766 | |
767 headers = { | |
768 "(request-target)": f"post {p_url.path}", | |
769 "Host": p_url.hostname, | |
770 "Date": http.datetimeToString().decode(), | |
771 "Digest": digest | |
772 } | |
773 headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers) | |
774 | |
775 headers["Content-Type"] = ( | |
776 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' | |
563 ) | 777 ) |
564 return await treq.post( | 778 resp = await treq.post( |
565 url, | 779 url, |
566 body, | 780 body, |
567 headers={ | 781 headers=headers, |
568 "Host": [p_url.hostname], | |
569 "Date": [date], | |
570 "Digest": [digest], | |
571 "Signature": [h_signature], | |
572 } | |
573 ) | 782 ) |
783 if resp.code >= 400: | |
784 text = await resp.text() | |
785 log.warning(f"POST request to {url} failed: {text}") | |
786 return resp | |
574 | 787 |
575 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): | 788 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): |
576 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | 789 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore |
577 service = jid.JID(service_s) | 790 service = jid.JID(service_s) |
578 client = self.host.getClient(profile) | 791 client = self.host.getClient(profile) |
579 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) | 792 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) |
580 | 793 |
794 @async_lru(maxsize=LRU_MAX_SIZE) | |
581 async def getAPActorIdFromAccount(self, account: str) -> str: | 795 async def getAPActorIdFromAccount(self, account: str) -> str: |
582 """Retrieve account ID from it's handle using WebFinger | 796 """Retrieve account ID from it's handle using WebFinger |
583 | 797 |
584 @param account: AP handle (user@domain.tld) | 798 @param account: AP handle (user@domain.tld) |
585 @return: Actor ID (which is an URL) | 799 @return: Actor ID (which is an URL) |
609 raise ValueError( | 823 raise ValueError( |
610 f"No ActivityPub link found for {account!r}" | 824 f"No ActivityPub link found for {account!r}" |
611 ) | 825 ) |
612 return href | 826 return href |
613 | 827 |
614 async def getAPActorDataFromId(self, account: str) -> dict: | 828 async def getAPActorDataFromAccount(self, account: str) -> dict: |
615 """Retrieve ActivityPub Actor data | 829 """Retrieve ActivityPub Actor data |
616 | 830 |
617 @param account: ActivityPub Actor identifier | 831 @param account: AP account handle (user@domain.tld) |
618 """ | 832 """ |
619 href = await self.getAPActorIdFromAccount(account) | 833 href = await self.getAPActorIdFromAccount(account) |
620 return await self.apGet(href) | 834 return await self.apGet(href) |
621 | 835 |
622 @async_lru(maxsize=LRU_MAX_SIZE) | 836 @async_lru(maxsize=LRU_MAX_SIZE) |
623 async def getAPAccountFromId(self, actor_id: str): | 837 async def getAPInboxFromId(self, actor_id: str) -> str: |
838 """Retrieve inbox of an actor_id""" | |
839 data = await self.apGet(actor_id) | |
840 return data["inbox"] | |
841 | |
842 @async_lru(maxsize=LRU_MAX_SIZE) | |
843 async def getAPAccountFromId(self, actor_id: str) -> str: | |
624 """Retrieve AP account from the ID URL | 844 """Retrieve AP account from the ID URL |
625 | 845 |
626 @param actor_id: AP ID of the actor (URL to the actor data) | 846 @param actor_id: AP ID of the actor (URL to the actor data) |
627 """ | 847 """ |
628 url_parsed = parse.urlparse(actor_id) | 848 url_parsed = parse.urlparse(actor_id) |
646 raise exceptions.DataError(msg) | 866 raise exceptions.DataError(msg) |
647 return account | 867 return account |
648 | 868 |
649 async def getAPItems( | 869 async def getAPItems( |
650 self, | 870 self, |
651 account: str, | 871 collection: dict, |
652 max_items: Optional[int] = None, | 872 max_items: Optional[int] = None, |
653 chronological_pagination: bool = True, | 873 chronological_pagination: bool = True, |
654 after_id: Optional[str] = None, | 874 after_id: Optional[str] = None, |
655 start_index: Optional[int] = None, | 875 start_index: Optional[int] = None, |
656 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: | 876 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: |
673 Due to ActivityStream Collection Paging limitations, this is inefficient and | 893 Due to ActivityStream Collection Paging limitations, this is inefficient and |
674 all pages before the requested index will be retrieved to count items. | 894 all pages before the requested index will be retrieved to count items. |
675 @return: XMPP Pubsub items and corresponding RSM Response | 895 @return: XMPP Pubsub items and corresponding RSM Response |
676 Items are always returned in chronological order in the result | 896 Items are always returned in chronological order in the result |
677 """ | 897 """ |
678 actor_data = await self.getAPActorDataFromId(account) | |
679 outbox = actor_data.get("outbox") | |
680 rsm_resp: Dict[str, Union[bool, int]] = {} | 898 rsm_resp: Dict[str, Union[bool, int]] = {} |
681 if not outbox: | |
682 raise exceptions.DataError(f"No outbox found for actor {account}") | |
683 outbox_data = await self.apGet(outbox) | |
684 try: | 899 try: |
685 count = outbox_data["totalItems"] | 900 count = collection["totalItems"] |
686 except KeyError: | 901 except KeyError: |
687 log.warning( | 902 log.warning( |
688 f'"totalItems" not found in outbox of {account}, defaulting to 20' | 903 f'"totalItems" not found in collection {collection.get("id")}, ' |
904 "defaulting to 20" | |
689 ) | 905 ) |
690 count = 20 | 906 count = 20 |
691 else: | 907 else: |
692 log.info(f"{account}'s outbox has {count} item(s)") | 908 log.info(f"{collection.get('id')} has {count} item(s)") |
909 | |
693 rsm_resp["count"] = count | 910 rsm_resp["count"] = count |
694 | 911 |
695 if start_index is not None: | 912 if start_index is not None: |
696 assert chronological_pagination and after_id is None | 913 assert chronological_pagination and after_id is None |
697 if start_index >= count: | 914 if start_index >= count: |
708 else: | 925 else: |
709 # we'll convert "start_index" to "after_id", thus we need the item just | 926 # we'll convert "start_index" to "after_id", thus we need the item just |
710 # before "start_index" | 927 # before "start_index" |
711 previous_index = start_index - 1 | 928 previous_index = start_index - 1 |
712 retrieved_items = 0 | 929 retrieved_items = 0 |
713 current_page = outbox_data["last"] | 930 current_page = collection["last"] |
714 while retrieved_items < count: | 931 while retrieved_items < count: |
715 page_data, items = await self.parseAPPage(current_page) | 932 page_data, items = await self.parseAPPage(current_page) |
716 if not items: | 933 if not items: |
717 log.warning(f"found an empty AP page at {current_page}") | 934 log.warning(f"found an empty AP page at {current_page}") |
718 return [], rsm_resp | 935 return [], rsm_resp |
732 "Error while retrieving previous page from AP service at " | 949 "Error while retrieving previous page from AP service at " |
733 f"{current_page}" | 950 f"{current_page}" |
734 ) | 951 ) |
735 | 952 |
736 init_page = "last" if chronological_pagination else "first" | 953 init_page = "last" if chronological_pagination else "first" |
737 page = outbox_data.get(init_page) | 954 page = collection.get(init_page) |
738 if not page: | 955 if not page: |
739 raise exceptions.DataError( | 956 raise exceptions.DataError( |
740 f"Initial page {init_page!r} not found for outbox {outbox}" | 957 f"Initial page {init_page!r} not found for collection " |
958 f"{collection.get('id')})" | |
741 ) | 959 ) |
742 items = [] | 960 items = [] |
743 page_items = [] | 961 page_items = [] |
744 retrieved_items = 0 | 962 retrieved_items = 0 |
745 found_after_id = False | 963 found_after_id = False |
746 | 964 |
747 while retrieved_items < count: | 965 while retrieved_items < count: |
748 __, page_items = await self.parseAPPage(page) | 966 __, page_items = await self.parseAPPage(page) |
967 if not page_items: | |
968 break | |
749 retrieved_items += len(page_items) | 969 retrieved_items += len(page_items) |
750 if after_id is not None and not found_after_id: | 970 if after_id is not None and not found_after_id: |
751 # if we have an after_id, we ignore all items until the requested one is | 971 # if we have an after_id, we ignore all items until the requested one is |
752 # found | 972 # found |
753 try: | 973 try: |
754 limit_idx = [i["id"] for i in page_items].index(after_id) | 974 limit_idx = [i["id"] for i in page_items].index(after_id) |
755 except ValueError: | 975 except ValueError: |
756 # if "after_id" is not found, we don't add any item from this page | 976 # if "after_id" is not found, we don't add any item from this page |
757 log.debug(f"{after_id!r} not found at {page}, skipping") | 977 page_id = page.get("id") if isinstance(page, dict) else page |
978 log.debug(f"{after_id!r} not found at {page_id}, skipping") | |
758 else: | 979 else: |
759 found_after_id = True | 980 found_after_id = True |
760 if chronological_pagination: | 981 if chronological_pagination: |
761 start_index = retrieved_items - len(page_items) + limit_idx + 1 | 982 start_index = retrieved_items - len(page_items) + limit_idx + 1 |
762 page_items = page_items[limit_idx+1:] | 983 page_items = page_items[limit_idx+1:] |
771 if chronological_pagination: | 992 if chronological_pagination: |
772 items = items[:max_items] | 993 items = items[:max_items] |
773 else: | 994 else: |
774 items = items[-max_items:] | 995 items = items[-max_items:] |
775 break | 996 break |
776 page = outbox_data.get("prev" if chronological_pagination else "next") | 997 page = collection.get("prev" if chronological_pagination else "next") |
777 if not page: | 998 if not page: |
778 break | 999 break |
779 | 1000 |
780 if after_id is not None and not found_after_id: | 1001 if after_id is not None and not found_after_id: |
781 raise error.StanzaError("item-not-found") | 1002 raise error.StanzaError("item-not-found") |
782 | 1003 |
783 if after_id is None: | |
784 rsm_resp["index"] = 0 if chronological_pagination else count - len(items) | |
785 | |
786 if start_index is not None: | |
787 rsm_resp["index"] = start_index | |
788 elif after_id is not None: | |
789 log.warning("Can't determine index of first element") | |
790 elif chronological_pagination: | |
791 rsm_resp["index"] = 0 | |
792 else: | |
793 rsm_resp["index"] = count - len(items) | |
794 if items: | 1004 if items: |
1005 if after_id is None: | |
1006 rsm_resp["index"] = 0 if chronological_pagination else count - len(items) | |
1007 if start_index is not None: | |
1008 rsm_resp["index"] = start_index | |
1009 elif after_id is not None: | |
1010 log.warning("Can't determine index of first element") | |
1011 elif chronological_pagination: | |
1012 rsm_resp["index"] = 0 | |
1013 else: | |
1014 rsm_resp["index"] = count - len(items) | |
795 rsm_resp.update({ | 1015 rsm_resp.update({ |
796 "first": items[0]["id"], | 1016 "first": items[0]["id"], |
797 "last": items[-1]["id"] | 1017 "last": items[-1]["id"] |
798 }) | 1018 }) |
799 | 1019 |
800 return items, rsm.RSMResponse(**rsm_resp) | 1020 return items, rsm.RSMResponse(**rsm_resp) |
801 | 1021 |
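For example (hypothetical numbers), with a 42-item collection and a chronological request limited to 10 items, the method walks pages starting from the collection's "last" page and returns the oldest items first:

```python
# assuming "gateway" is an initialised APGateway and "collection" is a fetched
# AP OrderedCollection dict with totalItems == 42
items, rsm_resp = await gateway.getAPItems(collection, max_items=10)
# items: the 10 oldest entries as pubsub <item> elements, oldest first
# rsm_resp: RSMResponse(count=42, index=0,
#                       first=<id of items[0]>, last=<id of items[9]>)
```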
802 async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: | 1022 async def parseAPPage( |
1023 self, | |
1024 page: Union[str, dict] | |
1025 ) -> Tuple[dict, List[domish.Element]]: | |
803 """Convert AP objects from an AP page to XMPP items | 1026 """Convert AP objects from an AP page to XMPP items |
804 | 1027 |
805 @param url: URL linking to an AP page | 1028 @param page: either a URL linking to an AP page, or the page data directly |
806 @return: page data, pubsub items | 1029 @return: page data, pubsub items |
807 """ | 1030 """ |
808 page_data = await self.apGet(url) | 1031 page_data = await self.apGetObject(page) |
809 ap_items = page_data.get("orderedItems") | 1032 if page_data is None: |
810 if not ap_items: | 1033 log.warning('No data found in collection') |
811 log.warning('No "orderedItems" collection found') | 1034 return {}, [] |
812 return page_data, [] | 1035 ap_items = await self.apGetList(page_data, "orderedItems") |
1036 if ap_items is None: | |
1037 ap_items = await self.apGetList(page_data, "items") | |
1038 if not ap_items: | |
1039 log.warning(f'No item field found in collection: {page_data!r}') | |
1040 return page_data, [] | |
1041 else: | |
1042 log.warning( | |
1043 "Items are not ordered, this is not spec compliant" | |
1044 ) | |
813 items = [] | 1045 items = [] |
814 # AP Collections are in antichronological order, but we expect chronological in | 1046 # AP Collections are in antichronological order, but we expect chronological in |
815 # Pubsub, thus we reverse it | 1047 # Pubsub, thus we reverse it |
816 for ap_item in reversed(ap_items): | 1048 for ap_item in reversed(ap_items): |
817 try: | 1049 try: |
818 ap_object, mb_data = await self.apItem2MBdata(ap_item) | 1050 items.append(await self.apItem2Elt(ap_item)) |
819 except (exceptions.DataError, NotImplementedError, error.StanzaError): | 1051 except (exceptions.DataError, NotImplementedError, error.StanzaError): |
820 continue | 1052 continue |
821 | 1053 |
822 item_elt = await self._m.data2entry( | |
823 self.client, mb_data, ap_object["id"], None, self._m.namespace | |
824 ) | |
825 item_elt["publisher"] = mb_data["author_jid"].full() | |
826 items.append(item_elt) | |
827 | |
828 return page_data, items | 1054 return page_data, items |
829 | 1055 |
830 async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: | 1056 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: |
831 """Convert AP item to microblog data | 1057 """Convert AP item to parsed microblog data and corresponding item element""" |
1058 mb_data = await self.apItem2MBdata(ap_item) | |
1059 item_elt = await self._m.data2entry( | |
1060 self.client, mb_data, mb_data["id"], None, self._m.namespace | |
1061 ) | |
1062 item_elt["publisher"] = mb_data["author_jid"] | |
1063 return mb_data, item_elt | |
1064 | |
1065 async def apItem2Elt(self, ap_item: dict) -> domish.Element: | |
1066 """Convert AP item to XMPP item element""" | |
1067 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | |
1068 return item_elt | |
1069 | |
1070 async def getCommentsNodes( | |
1071 self, | |
1072 item_id: str, | |
1073 parent_id: Optional[str] | |
1074 ) -> Tuple[Optional[str], Optional[str]]: | |
1075 """Get node where this item is and node to use for comments | |
1076 | |
1077 if config option "comments_max_depth" is set, a common node will be used below the | |
1078 given depth | |
1079 @param item_id: ID of the reference item | |
1080 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") | |
1081 @return: a tuple with parent_node_id, comments_node_id: | |
1082     - parent_node_id is the ID of the node where the reference item must be; |
1083     None is returned when the root node (i.e. not a comments node) must be used |
1084     - comments_node_id is the ID of the node to use for comments; None is |
1085     returned when no comment node must be used (i.e. when "comments_max_depth" |
1086     has been reached) |
1087 """ | |
1088 if parent_id is None or not self.comments_max_depth: | |
1089 return ( | |
1090 self._m.getCommentsNode(parent_id) if parent_id is not None else None, | |
1091 self._m.getCommentsNode(item_id) | |
1092 ) | |
1093 parent_url = parent_id | |
1094 parents = [] | |
1095 for __ in range(COMMENTS_MAX_PARENTS): | |
1096 parent_item = await self.apGet(parent_url) | |
1097 parents.insert(0, parent_item) | |
1098 parent_url = parent_item.get("inReplyTo") | |
1099 if parent_url is None: | |
1100 break | |
1101 parent_limit = self.comments_max_depth-1 | |
1102 if len(parents) <= parent_limit: | |
1103 return ( | |
1104 self._m.getCommentsNode(parents[-1]["id"]), | |
1105 self._m.getCommentsNode(item_id) | |
1106 ) | |
1107 else: | |
1108 last_level_item = parents[parent_limit] | |
1109 return ( | |
1110 self._m.getCommentsNode(last_level_item["id"]), | |
1111 None | |
1112 ) | |
1113 | |
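Since the documentation announced in the commit message is still to come, here is a worked sketch of the depth logic above (hypothetical node naming; real node IDs come from XEP-0277's getCommentsNode). With `comments_max_depth = 2`, a depth-1 reply still gets its own comment node, while depth-2 and deeper replies are all flattened into the depth-1 item's node:

```python
def comments_nodes(parents: list, item_id: str, max_depth: int = 2):
    """Mimic getCommentsNodes' depth logic with placeholder node names"""
    # parents: chain of ancestor item IDs, root first (as built from inReplyTo)
    def node(item):
        return f"comments/{item}"

    parent_limit = max_depth - 1
    if len(parents) <= parent_limit:
        # shallow enough: the item gets its own comment node
        return node(parents[-1]), node(item_id)
    # too deep: further replies are flattened into the last allowed level
    return node(parents[parent_limit]), None


print(comments_nodes(["A"], "B"))       # ('comments/A', 'comments/B')
print(comments_nodes(["A", "B"], "C"))  # ('comments/B', None)
```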
1114 async def apItem2MBdata(self, ap_item: dict) -> dict: | |
1115 """Convert AP activity or object to microblog data | |
832 | 1116 |
833 @return: AP Item's Object and microblog data | 1117 @return: microblog data |
834 @raise exceptions.DataError: something is invalid in the AP item | 1118 @raise exceptions.DataError: something is invalid in the AP item |
835 @raise NotImplementedError: some AP data is not handled yet | 1119 @raise NotImplementedError: some AP data is not handled yet |
836 @raise error.StanzaError: error while contacting the AP server | 1120 @raise error.StanzaError: error while contacting the AP server |
837 """ | 1121 """ |
838 ap_object = ap_item.get("object") | 1122 is_activity = self.isActivity(ap_item) |
839 if not ap_object: | 1123 if is_activity: |
840 log.warning(f'No "object" found in AP item {ap_item!r}') | 1124 ap_object = await self.apGetObject(ap_item, "object") |
1125 if not ap_object: | |
1126 log.warning(f'No "object" found in AP item {ap_item!r}') | |
1127 raise exceptions.DataError | |
1128 else: | |
1129 ap_object = ap_item | |
1130 item_id = ap_object.get("id") | |
1131 if not item_id: | |
1132 log.warning(f'No "id" found in AP item: {ap_object!r}') | |
841 raise exceptions.DataError | 1133 raise exceptions.DataError |
842 if isinstance(ap_object, str): | 1134 mb_data = {"id": item_id} |
843 ap_object = await self.apGet(ap_object) | |
844 obj_id = ap_object.get("id") | |
845 if not obj_id: | |
846 log.warning(f'No "id" found in AP object: {ap_object!r}') | |
847 raise exceptions.DataError | |
848 if ap_object.get("inReplyTo") is not None: | |
849 raise NotImplementedError | |
850 mb_data = {} | |
851 for ap_key, mb_key in AP_MB_MAP.items(): | 1135 for ap_key, mb_key in AP_MB_MAP.items(): |
852 data = ap_object.get(ap_key) | 1136 data = ap_object.get(ap_key) |
853 if data is None: | 1137 if data is None: |
854 continue | 1138 continue |
855 mb_data[mb_key] = data | 1139 mb_data[mb_key] = data |
866 else: | 1150 else: |
867 mb_data["language"] = language | 1151 mb_data["language"] = language |
868 mb_data["content_xhtml"] = content_xhtml | 1152 mb_data["content_xhtml"] = content_xhtml |
869 | 1153 |
870 # author | 1154 # author |
871 actor = ap_item.get("actor") | 1155 if is_activity: |
872 if not actor: | 1156 authors = await self.apGetActors(ap_item, "actor") |
873 log.warning(f"no actor associated to object id {obj_id!r}") | 1157 else: |
874 raise exceptions.DataError | 1158 authors = await self.apGetActors(ap_object, "attributedTo") |
875 elif isinstance(actor, list): | 1159 if len(authors) > 1: |
876 # we only keep first item of list as author | 1160 # we only keep first item as author |
877 # TODO: handle multiple actors | 1161 # TODO: handle multiple actors |
878 if len(actor) > 1: | 1162 log.warning("multiple actors are not managed") |
879 log.warning("multiple actors are not managed") | 1163 |
880 actor = actor[0] | 1164 account = authors[0] |
881 | 1165 author_jid = self.getLocalJIDFromAccount(account).full() |
882 if isinstance(actor, dict): | 1166 |
883 actor = actor.get("id") | 1167 mb_data["author"] = account.split("@", 1)[0] |
884 if not actor: | 1168 mb_data["author_jid"] = author_jid |
885 log.warning(f"no actor id found: {actor!r}") | |
886 raise exceptions.DataError | |
887 | |
888 if isinstance(actor, str): | |
889 account = await self.getAPAccountFromId(actor) | |
890 mb_data["author"] = account.split("@", 1)[0] | |
891 author_jid = mb_data["author_jid"] = jid.JID( | |
892 None, | |
893 ( | |
894 self.host.plugins["XEP-0106"].escape(account), | |
895 self.client.jid.host, | |
896 None | |
897 ) | |
898 ) | |
899 else: | |
900 log.warning(f"unknown actor type found: {actor!r}") | |
901 raise exceptions.DataError | |
902 | 1169 |
903 # published/updated | 1170 # published/updated |
904 for field in ("published", "updated"): | 1171 for field in ("published", "updated"): |
905 value = ap_object.get(field) | 1172 value = ap_object.get(field) |
906 if not value and field == "updated": | 1173 if not value and field == "updated": |
910 mb_data[field] = calendar.timegm( | 1177 mb_data[field] = calendar.timegm( |
911 dateutil.parser.parse(str(value)).utctimetuple() | 1178 dateutil.parser.parse(str(value)).utctimetuple() |
912 ) | 1179 ) |
913 except dateutil.parser.ParserError as e: | 1180 except dateutil.parser.ParserError as e: |
914 log.warning(f"Can't parse {field!r} field: {e}") | 1181 log.warning(f"Can't parse {field!r} field: {e}") |
915 return ap_object, mb_data | 1182 |
1183 # comments | |
1184 in_reply_to = ap_object.get("inReplyTo") | |
1185 __, comments_node = await self.getCommentsNodes(item_id, in_reply_to) | |
1186 if comments_node is not None: | |
1187 comments_data = { | |
1188 "service": author_jid, | |
1189 "node": comments_node, | |
1190 "uri": uri.buildXMPPUri( | |
1191 "pubsub", | |
1192 path=author_jid, | |
1193 node=comments_node | |
1194 ) | |
1195 } | |
1196 mb_data["comments"] = [comments_data] | |
1197 | |
1198 return mb_data | |
916 | 1199 |
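Illustrative input and output for this conversion (hypothetical values; the exact key mapping comes from AP_MB_MAP in constants.py, which is not part of this diff):

```python
ap_object = {
    "id": "https://mastodon.example/users/bob/statuses/1",
    "type": "Note",
    "content": "<p>hello</p>",
    "attributedTo": "https://mastodon.example/users/bob",
    "published": "2022-03-22T17:00:42Z",
}
# apItem2MBdata(ap_object) would yield roughly:
# {
#     "id": "https://mastodon.example/users/bob/statuses/1",
#     "content_xhtml": "<p>hello</p>",
#     "author": "bob",
#     "author_jid": "bob\40mastodon.example@gateway.example",
#     "published": 1647968442,  # Unix time of 2022-03-22T17:00:42Z
#     "comments": [{"service": ..., "node": ..., "uri": ...}],  # if applicable
# }
```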
917 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: | 1200 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: |
918 """Convert Libervia Microblog Data to ActivityPub item""" | 1201 """Convert Libervia Microblog Data to ActivityPub item""" |
919 if not mb_data.get("id"): | 1202 if not mb_data.get("id"): |
920 mb_data["id"] = shortuuid.uuid() | 1203 mb_data["id"] = shortuuid.uuid() |
921 if not mb_data.get("author_jid"): | 1204 if not mb_data.get("author_jid"): |
922 mb_data["author_jid"] = client.jid | 1205 mb_data["author_jid"] = client.jid.full() |
923 ap_account = await self.getAPAccountFromJidAndNode( | 1206 ap_account = await self.getAPAccountFromJidAndNode( |
924 jid.JID(mb_data["author_jid"]), | 1207 jid.JID(mb_data["author_jid"]), |
925 None | 1208 None |
926 ) | 1209 ) |
927 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) | 1210 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) |
965 | 1248 |
966 @param service: JID corresponding to the AP actor. | 1249 @param service: JID corresponding to the AP actor. |
967 """ | 1250 """ |
968 if not service.user: | 1251 if not service.user: |
969 raise ValueError("service must have a local part") | 1252 raise ValueError("service must have a local part") |
970 account = self.host.plugins["XEP-0106"].unescape(service.user) | 1253 account = self._e.unescape(service.user) |
971 ap_actor_data = await self.getAPActorDataFromId(account) | 1254 ap_actor_data = await self.getAPActorDataFromAccount(account) |
972 | 1255 |
973 try: | 1256 try: |
974 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] | 1257 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] |
975 except KeyError: | 1258 except KeyError: |
976 raise exceptions.DataError("Can't get ActivityPub actor inbox") | 1259 raise exceptions.DataError("Can't get ActivityPub actor inbox") |
978 item_data = await self.mbdata2APitem(client, mess_data) | 1261 item_data = await self.mbdata2APitem(client, mess_data) |
979 url_actor = item_data["object"]["attributedTo"] | 1262 url_actor = item_data["object"]["attributedTo"] |
980 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1263 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
981 if resp.code != 202: | 1264 if resp.code != 202: |
982 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | 1265 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |
1266 | |
1267 async def newAPItem( | |
1268 self, | |
1269 client: SatXMPPEntity, | |
1270 destinee: Optional[jid.JID], | |
1271 node: str, | |
1272 item: dict, | |
1273 ) -> None: | |
1274 """Analyse, cache and send notification for received AP item | |
1275 | |
1276 @param destinee: jid of the destinee, | |
1277 @param node: XMPP pubsub node | |
1278 @param item: AP object payload | |
1279 """ | |
1280 service = client.jid | |
1281 in_reply_to = item.get("inReplyTo") | |
1282 if in_reply_to and isinstance(in_reply_to, str): | |
1283 # this item is a reply, we use or create a corresponding node for comments | |
1284 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) | |
1285 node = parent_node or node | |
1286 cached_node = await self.host.memory.storage.getPubsubNode( | |
1287 client, service, node, with_subscriptions=True | |
1288 ) | |
1289 if cached_node is None: | |
1290 try: | |
1291 cached_node = await self.host.memory.storage.setPubsubNode( | |
1292 client, | |
1293 service, | |
1294 node, | |
1295 subscribed=True | |
1296 ) | |
1297 except IntegrityError as e: | |
1298 if "unique" in str(e.orig).lower(): | |
1299 # the node may already exist, if it has been created just after | |
1300 # getPubsubNode above | |
1301 log.debug("ignoring UNIQUE constraint error") | |
1302 cached_node = await self.host.memory.storage.getPubsubNode( | |
1303 client, service, node, with_subscriptions=True | |
1304 ) | |
1305 else: | |
1306 raise e | |
1307 | |
1308 else: | |
1309 # it is a root item (i.e. not a reply to an other item) | |
1310 cached_node = await self.host.memory.storage.getPubsubNode( | |
1311 client, service, node, with_subscriptions=True | |
1312 ) | |
1313 if cached_node is None: | |
1314 log.warning( | |
1315 f"Received item in unknown node {node!r} at {service}\n{item}" | |
1316 | |
1317 ) | |
1318 return | |
1319 mb_data, item_elt = await self.apItem2MbDataAndElt(item) | |
1320 await self.host.memory.storage.cachePubsubItems( | |
1321 client, | |
1322 cached_node, | |
1323 [item_elt], | |
1324 [mb_data] | |
1325 ) | |
1326 | |
1327 for subscription in cached_node.subscriptions: | |
1328 if subscription.state != SubscriptionState.SUBSCRIBED: | |
1329 continue | |
1330 self.pubsub_service.notifyPublish( | |
1331 service, | |
1332 node, | |
1333 [(subscription.subscriber, None, [item_elt])] | |
1334 ) |