comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and thread management:
- HTTP signatures are now checked on incoming messages
- an AP actor can now be followed through a pubsub subscription; when the follow is accepted, the node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation explaining this will follow)
ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents bf0505d41c09
children 125c7043b277
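For reference, the configuration this changeset reads in init() can be sketched as below. This is a hedged example: the section header and the values are assumptions (CONF_SECTION is defined in .constants, outside this diff); only the option names appear in the changeset.

    # assumed section name, check CONF_SECTION in .constants
    [component ap-gateway]
    http_connection_type = https
    new_node_max_items = 50
    comments_max_depth = 2
    ap_path = _ap
    local_only = true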
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import base64 19 import base64
20 import calendar
20 import hashlib 21 import hashlib
21 import json 22 import json
22 from pathlib import Path 23 from pathlib import Path
23 from typing import Optional, Dict, Tuple, List, Union 24 from pprint import pformat
25 import re
26 from typing import Any, Dict, List, Optional, Tuple, Union, overload
24 from urllib import parse 27 from urllib import parse
25 import calendar 28
26 import re 29 from cryptography.exceptions import InvalidSignature
27
28 import dateutil
29 from cryptography.hazmat.primitives import serialization 30 from cryptography.hazmat.primitives import serialization
30 from cryptography.hazmat.primitives import hashes 31 from cryptography.hazmat.primitives import hashes
31 from cryptography.hazmat.primitives.asymmetric import rsa 32 from cryptography.hazmat.primitives.asymmetric import rsa
32 from cryptography.hazmat.primitives.asymmetric import padding 33 from cryptography.hazmat.primitives.asymmetric import padding
34 import dateutil
33 import shortuuid 35 import shortuuid
36 from sqlalchemy.exc import IntegrityError
34 import treq 37 import treq
35 from treq.response import _Response as TReqResponse 38 from treq.response import _Response as TReqResponse
36 from twisted.internet import defer, reactor, threads 39 from twisted.internet import defer, reactor, threads
37 from twisted.web import http 40 from twisted.web import http
38 from twisted.words.protocols.jabber import jid, error 41 from twisted.words.protocols.jabber import error, jid
39 from twisted.words.xish import domish 42 from twisted.words.xish import domish
40 from wokkel import rsm 43 from wokkel import rsm
41 44
42 from sat.core import exceptions 45 from sat.core import exceptions
43 from sat.core.constants import Const as C 46 from sat.core.constants import Const as C
44 from sat.core.core_types import SatXMPPEntity 47 from sat.core.core_types import SatXMPPEntity
45 from sat.core.i18n import _ 48 from sat.core.i18n import _
46 from sat.core.log import getLogger 49 from sat.core.log import getLogger
50 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
47 from sat.tools import utils 51 from sat.tools import utils
48 from sat.tools.common import data_format, tls 52 from sat.tools.common import data_format, tls, uri
49 from sat.tools.common.async_utils import async_lru 53 from sat.tools.common.async_utils import async_lru
50 54
51 from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, 55 from .constants import (
52 AP_MB_MAP, LRU_MAX_SIZE) 56 ACTIVITY_OBJECT_MANDATORY,
57 ACTIVITY_TARGET_MANDATORY,
58 ACTIVITY_TYPES,
59 ACTIVITY_TYPES_LOWER,
60 AP_MB_MAP,
61 COMMENTS_MAX_PARENTS,
62 CONF_SECTION,
63 IMPORT_NAME,
64 LRU_MAX_SIZE,
65 MEDIA_TYPE_AP,
66 TYPE_ACTOR,
67 TYPE_ITEM,
68 )
53 from .http_server import HTTPServer 69 from .http_server import HTTPServer
54 from .pubsub_service import APPubsubService 70 from .pubsub_service import APPubsubService
55 71
56 72
57 log = getLogger(__name__) 73 log = getLogger(__name__)
62 C.PI_NAME: "ActivityPub Gateway component", 78 C.PI_NAME: "ActivityPub Gateway component",
63 C.PI_IMPORT_NAME: IMPORT_NAME, 79 C.PI_IMPORT_NAME: IMPORT_NAME,
64 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 80 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
65 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
66 C.PI_PROTOCOLS: [], 82 C.PI_PROTOCOLS: [],
67 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], 83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"],
68 C.PI_RECOMMENDATIONS: [], 84 C.PI_RECOMMENDATIONS: [],
69 C.PI_MAIN: "APGateway", 85 C.PI_MAIN: "APGateway",
70 C.PI_HANDLER: C.BOOL_TRUE, 86 C.PI_HANDLER: C.BOOL_TRUE,
71 C.PI_DESCRIPTION: _( 87 C.PI_DESCRIPTION: _(
72 "Gateway for bidirectional communication between XMPP and ActivityPub." 88 "Gateway for bidirectional communication between XMPP and ActivityPub."
84 def __init__(self, host): 100 def __init__(self, host):
85 self.host = host 101 self.host = host
86 self.initialised = False 102 self.initialised = False
87 self._m = host.plugins["XEP-0277"] 103 self._m = host.plugins["XEP-0277"]
88 self._p = host.plugins["XEP-0060"] 104 self._p = host.plugins["XEP-0060"]
105 self._e = host.plugins["XEP-0106"]
106 self._c = host.plugins["PUBSUB_CACHE"]
107 self.pubsub_service = APPubsubService(self)
89 108
90 host.bridge.addMethod( 109 host.bridge.addMethod(
91 "APSend", 110 "APSend",
92 ".plugin", 111 ".plugin",
93 in_sign="sss", 112 in_sign="sss",
95 method=self._publishMessage, 114 method=self._publishMessage,
96 async_=True, 115 async_=True,
97 ) 116 )
98 117
99 def getHandler(self, __): 118 def getHandler(self, __):
100 return APPubsubService(self) 119 return self.pubsub_service
101 120
102 async def init(self, client): 121 async def init(self, client):
103 if self.initialised: 122 if self.initialised:
104 return 123 return
105 124
163 if connection_type not in ('http', 'https'): 182 if connection_type not in ('http', 'https'):
164 raise exceptions.ConfigError( 183 raise exceptions.ConfigError(
165 'bad ap-gateway http_connection_type, you must use one of "http" or ' 184 'bad ap-gateway http_connection_type, you must use one of "http" or '
166 '"https"' 185 '"https"'
167 ) 186 )
168 self.max_items = self.host.memory.getConfig( 187 self.max_items = int(self.host.memory.getConfig(
169 CONF_SECTION, 'new_node_max_items', 50 188 CONF_SECTION, 'new_node_max_items', 50
170 189
171 ) 190 ))
191 self.comments_max_depth = int(self.host.memory.getConfig(
192 CONF_SECTION, 'comments_max_depth', 0
193 ))
172 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') 194 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
173 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") 195 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
174 # True (default) if we provide gateway only to entities/services from our server 196 # True (default) if we provide gateway only to entities/services from our server
175 self.local_only = C.bool( 197 self.local_only = C.bool(
176 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) 198 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
188 reactor.listenSSL(self.http_port, self.server, context_factory) 210 reactor.listenSSL(self.http_port, self.server, context_factory)
189 211
190 async def profileConnecting(self, client): 212 async def profileConnecting(self, client):
191 self.client = client 213 self.client = client
192 await self.init(client) 214 await self.init(client)
215
216 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
217 """Get client for this component with a specified jid
218
219 This is needed to perform operations with the virtual JID corresponding to the AP
220 actor instead of the JID of the gateway itself.
221 @param actor_id: ID of the actor
222 @return: virtual client
223 """
224 account = await self.getAPAccountFromId(actor_id)
225 local_jid = self.getLocalJIDFromAccount(account)
226 return self.client.getVirtualClient(local_jid)
227
228 def isActivity(self, data: dict) -> bool:
229 """Return True if the data has an activity type"""
230 try:
231 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
232 except (KeyError, TypeError):
233 return False
193 234
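A quick sanity sketch of the new isActivity() helper, assuming ACTIVITY_TYPES_LOWER holds the standard ActivityStreams activity names ("Note" is an object type, not an activity); gateway stands for the APGateway instance:

    gateway.isActivity({"type": "Create", "object": "https://social.example/objects/1"})  # True
    gateway.isActivity({"type": "Note", "content": "hello"})                              # False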
194 async def apGet(self, url: str) -> dict: 235 async def apGet(self, url: str) -> dict:
195 """Retrieve AP JSON from given URL 236 """Retrieve AP JSON from given URL
196 237
197 @raise error.StanzaError: "service-unavailable" is sent when something went wrong 238 @raise error.StanzaError: "service-unavailable" is sent when something went wrong
512 "This gateway is configured to map only local entities and services" 553 "This gateway is configured to map only local entities and services"
513 ) 554 )
514 555
515 return jid_, node 556 return jid_, node
516 557
517 def parseAPURL(self, url: str) -> Tuple[str, str]: 558 def getLocalJIDFromAccount(self, account: str) -> jid.JID:
559 """Compute JID linking to an AP account
560
561 The local jid is computed by escaping the AP actor handle and using it as the
562 local part of the JID, whose domain part is this gateway's own JID
563 """
564 return jid.JID(
565 None,
566 (
567 self._e.escape(account),
568 self.client.jid.host,
569 None
570 )
571 )
572
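A minimal sketch of the resulting mapping, assuming the component runs as ap.example.org; XEP-0106 escapes "@" to "\40":

    local_jid = gateway.getLocalJIDFromAccount("alice@social.example")
    # -> jid.JID("alice\40social.example@ap.example.org")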
573 def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
518 """Parse an URL leading to an AP endpoint 574 """Parse an URL leading to an AP endpoint
519 575
520 @param url: URL to parse (schema is not mandatory) 576 @param url: URL to parse (schema is not mandatory)
521 @return: endpoint type and AP account 577 @return: endpoint type and extra arguments
522 """ 578 """
523 path = parse.urlparse(url).path.lstrip("/") 579 path = parse.urlparse(url).path.lstrip("/")
524 type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) 580 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/", 1)
525 return type_, parse.unquote(account) 581 return type_, [parse.unquote(a) for a in extra_args]
526 582
527 def buildAPURL(self, type_: str, *args: str) -> str: 583 def buildAPURL(self, type_: str, *args: str) -> str:
528 """Build an AP endpoint URL 584 """Build an AP endpoint URL
529 585
530 @param type_: type of AP endpoint 586 @param type_: type of AP endpoint
533 return parse.urljoin( 589 return parse.urljoin(
534 self.base_ap_url, 590 self.base_ap_url,
535 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) 591 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
536 ) 592 )
537 593
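A hedged round-trip of the two URL helpers, assuming public_url = "example.org", ap_path = "_ap" and TYPE_ACTOR = "actor" (the constant's value is not shown in this diff):

    url = gateway.buildAPURL(TYPE_ACTOR, "alice@social.example")
    # -> "https://example.org/_ap/actor/alice%40social.example"
    type_, args = gateway.parseAPURL(url)
    # -> ("actor", ["alice@social.example"])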
538 async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: 594 def buildSignatureHeader(self, values: Dict[str, str]) -> str:
539 """Sign a documentent and post it to AP server 595 """Build key="<value>" signature header from signature data"""
540 596 fields = []
541 @param url: AP server endpoint 597 for key, value in values.items():
542 @param url_actor: URL generated by this gateway for local actor 598 if key not in ("(created)", "(expires)"):
543 @param doc: document to send 599 if '"' in value:
544 """ 600 raise NotImplementedError(
545 p_url = parse.urlparse(url) 601 "string escaping is not implemented, double-quote can't be used "
546 date = http.datetimeToString().decode() 602 f"in {value!r}"
547 body = json.dumps(doc).encode() 603 )
548 digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() 604 value = f'"{value}"'
549 digest = f"sha-256={digest_hash}" 605 fields.append(f"{key}={value}")
550 to_sign = ( 606
551 f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" 607 return ",".join(fields)
552 f"date: {date}\ndigest: {digest}" 608
553 ) 609 def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
610 """Get digest data to use in header and signature
611
612 @param body: body of the request
613 @return: hash name and digest
614 """
615 if algo != "SHA-256":
616 raise NotImplementedError("only SHA-256 is implemented for now")
617 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
618
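Illustration of how the digest tuple is consumed further down in signAndPost(); doc is a placeholder for any AP document:

    body = json.dumps(doc).encode()
    algo, hash_b64 = gateway.getDigest(body)
    digest_header = f"{algo}={hash_b64}"  # "SHA-256=<base64 of the body hash>"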
619 @async_lru(maxsize=LRU_MAX_SIZE)
620 async def getActorPubKeyData(
621 self,
622 actor_id: str
623 ) -> Tuple[str, str, rsa.RSAPublicKey]:
624 """Retrieve Public Key data from actor ID
625
626 @param actor_id: actor ID (url)
627 @return: key_id, owner and public_key
628 @raise KeyError: publicKey is missing from actor data
629 """
630 actor_data = await self.apGet(actor_id)
631 pub_key_data = actor_data["publicKey"]
632 key_id = pub_key_data["id"]
633 owner = pub_key_data["owner"]
634 pub_key_pem = pub_key_data["publicKeyPem"]
635 pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
636 return key_id, owner, pub_key
637
638 def createActivity(
639 self,
640 activity: str,
641 actor_id: str,
642 object_: Optional[Union[str, dict]] = None,
643 target: Optional[Union[str, dict]] = None,
644 **kwargs,
645 ) -> Dict[str, Any]:
646 """Generate base data for an activity
647
648 @param activity: one of ACTIVITY_TYPES
649 """
650 if activity not in ACTIVITY_TYPES:
651 raise exceptions.InternalError(f"invalid activity: {activity!r}")
652 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
653 raise exceptions.InternalError(
654 f'"object_" is mandatory for activity {activity!r}'
655 )
656 if target is None and activity in ACTIVITY_TARGET_MANDATORY:
657 raise exceptions.InternalError(
658 f'"target" is mandatory for activity {activity!r}'
659 )
660 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
661 data: Dict[str, Any] = {
662 "@context": "https://www.w3.org/ns/activitystreams",
663 "actor": actor_id,
664 "id": activity_id,
665 "type": activity,
666 }
667 data.update(kwargs)
668 if object_ is not None:
669 data["object"] = object_
670 if target is not None:
671 data["target"] = target
672
673 return data
674
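A usage sketch for the new activity factory; both URLs are placeholders:

    activity = gateway.createActivity(
        "Follow",
        actor_id="https://example.org/_ap/actor/alice%40social.example",
        object_="https://social.example/users/bob",
    )
    # activity["id"] is f"{actor_id}#follow_<shortuuid>", ready for signAndPost()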
675 def getKeyId(self, actor_id: str) -> str:
676 """Get local key ID from actor ID"""
677 return f"{actor_id}#main-key"
678
679 async def checkSignature(
680 self,
681 signature: str,
682 key_id: str,
683 headers: Dict[str, str]
684 ) -> str:
685 """Verify that signature matches given headers
686
687 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
688
689 @param signature: Base64 encoded signature
690 @param key_id: ID of the key used to sign the data
691 @param headers: headers and their values, including pseudo-headers
692 @return: id of the signing actor
693
694 @raise InvalidSignature: signature doesn't match headers
695 """
696 to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items())
697 if key_id.startswith("acct:"):
698 actor = key_id[5:]
699 actor_id = await self.getAPActorIdFromAccount(actor)
700 else:
701 actor_id = key_id.split("#", 1)[0]
702
703 pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id)
704 if pub_key_id != key_id or pub_key_owner != actor_id:
705 raise exceptions.EncryptionError("Public Key mismatch")
706
707 try:
708 pub_key.verify(
709 base64.b64decode(signature),
710 to_sign.encode(),
711 # we have to use PKCS1v15 padding to be compatible with Mastodon
712 padding.PKCS1v15(), # type: ignore
713 hashes.SHA256() # type: ignore
714 )
715 except InvalidSignature:
716 raise exceptions.EncryptionError("Invalid signature (using PKCS#1 v1.5 and SHA-256)")
717
718 return actor_id
719
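A hedged verification sketch for an incoming signed POST; header values are placeholders taken from the HTTP request, and must be listed in the order given by the Signature header's "headers" parameter:

    headers = {
        "(request-target)": "post /_ap/inbox/alice%40social.example",
        "host": "example.org",
        "date": request_date,      # value of the request's Date header
        "digest": request_digest,  # value of the request's Digest header
    }
    actor_id = await gateway.checkSignature(signature_b64, key_id, headers)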
720 def getSignatureData(
721 self,
722 key_id: str,
723 headers: Dict[str, str]
724 ) -> Tuple[Dict[str, str], Dict[str, str]]:
725 """Generate and return signature and corresponding headers
726
727 @param parsed_url: URL where the request is sent/has been received
728 @param key_id: ID of the key (URL linking to the data with public key)
729 @param date: HTTP datetime string of signature generation
730 @param body: body of the HTTP request
731 @param headers: headers to sign and their value:
732 default value will be used if not specified
733
734 @return: headers and signature data
735 ``headers`` is an updated copy of the ``headers`` argument, with pseudo-headers
736 removed and ``Signature`` added.
737 """
738 to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items())
554 signature = base64.b64encode(self.private_key.sign( 739 signature = base64.b64encode(self.private_key.sign(
555 to_sign.encode(), 740 to_sign.encode(),
556 # we have to use PKCS1v15 padding to be compatible with Mastodon 741 # we have to use PKCS1v15 padding to be compatible with Mastodon
557 padding.PKCS1v15(), 742 padding.PKCS1v15(), # type: ignore
558 hashes.SHA256() 743 hashes.SHA256() # type: ignore
559 )).decode() 744 )).decode()
560 h_signature = ( 745 sign_data = {
561 f'keyId="{url_actor}",headers="(request-target) host date digest",' 746 "keyId": key_id,
562 f'signature="{signature}"' 747 "Algorithm": "rsa-sha256",
748 "headers": " ".join(headers.keys()),
749 "signature": signature
750 }
751 new_headers = {k: v for k, v in headers.items() if not k.startswith("(")}
752 new_headers["Signature"] = self.buildSignatureHeader(sign_data)
753 return new_headers, sign_data
754
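Sketch of the output shape, with the signature value abbreviated:

    headers, sign_data = gateway.getSignatureData(gateway.getKeyId(actor_id), headers)
    # headers now has the pseudo-headers removed and gains:
    #   Signature: keyId="<actor_id>#main-key",algorithm="rsa-sha256",
    #              headers="(request-target) host date digest",signature="<base64>"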
755 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
756 """Sign a documentent and post it to AP server
757
758 @param url: AP server endpoint
759 @param actor_id: originating actor ID (URL)
760 @param doc: document to send
761 """
762 p_url = parse.urlparse(url)
763 body = json.dumps(doc).encode()
764 digest_algo, digest_hash = self.getDigest(body)
765 digest = f"{digest_algo}={digest_hash}"
766
767 headers = {
768 "(request-target)": f"post {p_url.path}",
769 "Host": p_url.hostname,
770 "Date": http.datetimeToString().decode(),
771 "Digest": digest
772 }
773 headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers)
774
775 headers["Content-Type"] = (
776 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
563 ) 777 )
564 return await treq.post( 778 resp = await treq.post(
565 url, 779 url,
566 body, 780 body,
567 headers={ 781 headers=headers,
568 "Host": [p_url.hostname],
569 "Date": [date],
570 "Digest": [digest],
571 "Signature": [h_signature],
572 }
573 ) 782 )
783 if resp.code >= 400:
784 text = await resp.text()
785 log.warning(f"POST request to {url} failed: {text}")
786 return resp
574 787
575 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): 788 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
576 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 789 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
577 service = jid.JID(service_s) 790 service = jid.JID(service_s)
578 client = self.host.getClient(profile) 791 client = self.host.getClient(profile)
579 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) 792 return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
580 793
794 @async_lru(maxsize=LRU_MAX_SIZE)
581 async def getAPActorIdFromAccount(self, account: str) -> str: 795 async def getAPActorIdFromAccount(self, account: str) -> str:
582 """Retrieve account ID from it's handle using WebFinger 796 """Retrieve account ID from it's handle using WebFinger
583 797
584 @param account: AP handle (user@domain.tld) 798 @param account: AP handle (user@domain.tld)
585 @return: Actor ID (which is a URL) 799 @return: Actor ID (which is a URL)
609 raise ValueError( 823 raise ValueError(
610 f"No ActivityPub link found for {account!r}" 824 f"No ActivityPub link found for {account!r}"
611 ) 825 )
612 return href 826 return href
613 827
614 async def getAPActorDataFromId(self, account: str) -> dict: 828 async def getAPActorDataFromAccount(self, account: str) -> dict:
615 """Retrieve ActivityPub Actor data 829 """Retrieve ActivityPub Actor data
616 830
617 @param account: ActivityPub Actor identifier 831 @param account: ActivityPub Actor identifier
618 """ 832 """
619 href = await self.getAPActorIdFromAccount(account) 833 href = await self.getAPActorIdFromAccount(account)
620 return await self.apGet(href) 834 return await self.apGet(href)
621 835
622 @async_lru(maxsize=LRU_MAX_SIZE) 836 @async_lru(maxsize=LRU_MAX_SIZE)
623 async def getAPAccountFromId(self, actor_id: str): 837 async def getAPInboxFromId(self, actor_id: str) -> str:
838 """Retrieve inbox of an actor_id"""
839 data = await self.apGet(actor_id)
840 return data["inbox"]
841
842 @async_lru(maxsize=LRU_MAX_SIZE)
843 async def getAPAccountFromId(self, actor_id: str) -> str:
624 """Retrieve AP account from the ID URL 844 """Retrieve AP account from the ID URL
625 845
626 @param actor_id: AP ID of the actor (URL to the actor data) 846 @param actor_id: AP ID of the actor (URL to the actor data)
627 """ 847 """
628 url_parsed = parse.urlparse(actor_id) 848 url_parsed = parse.urlparse(actor_id)
646 raise exceptions.DataError(msg) 866 raise exceptions.DataError(msg)
647 return account 867 return account
648 868
649 async def getAPItems( 869 async def getAPItems(
650 self, 870 self,
651 account: str, 871 collection: dict,
652 max_items: Optional[int] = None, 872 max_items: Optional[int] = None,
653 chronological_pagination: bool = True, 873 chronological_pagination: bool = True,
654 after_id: Optional[str] = None, 874 after_id: Optional[str] = None,
655 start_index: Optional[int] = None, 875 start_index: Optional[int] = None,
656 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: 876 ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
673 Due to ActivityStream Collection Paging limitations, this is inefficient and 893 Due to ActivityStream Collection Paging limitations, this is inefficient and
674 all pages before the requested index will be retrieved to count items. 894 all pages before the requested index will be retrieved to count items.
675 @return: XMPP Pubsub items and corresponding RSM Response 895 @return: XMPP Pubsub items and corresponding RSM Response
676 Items are always returned in chronological order in the result 896 Items are always returned in chronological order in the result
677 """ 897 """
678 actor_data = await self.getAPActorDataFromId(account)
679 outbox = actor_data.get("outbox")
680 rsm_resp: Dict[str, Union[bool, int]] = {} 898 rsm_resp: Dict[str, Union[bool, int]] = {}
681 if not outbox:
682 raise exceptions.DataError(f"No outbox found for actor {account}")
683 outbox_data = await self.apGet(outbox)
684 try: 899 try:
685 count = outbox_data["totalItems"] 900 count = collection["totalItems"]
686 except KeyError: 901 except KeyError:
687 log.warning( 902 log.warning(
688 f'"totalItems" not found in outbox of {account}, defaulting to 20' 903 f'"totalItems" not found in collection {collection.get("id")}, '
904 "defaulting to 20"
689 ) 905 )
690 count = 20 906 count = 20
691 else: 907 else:
692 log.info(f"{account}'s outbox has {count} item(s)") 908 log.info(f"{collection.get('id')} has {count} item(s)")
909
693 rsm_resp["count"] = count 910 rsm_resp["count"] = count
694 911
695 if start_index is not None: 912 if start_index is not None:
696 assert chronological_pagination and after_id is None 913 assert chronological_pagination and after_id is None
697 if start_index >= count: 914 if start_index >= count:
708 else: 925 else:
709 # we'll convert "start_index" to "after_id", thus we need the item just 926 # we'll convert "start_index" to "after_id", thus we need the item just
710 # before "start_index" 927 # before "start_index"
711 previous_index = start_index - 1 928 previous_index = start_index - 1
712 retrieved_items = 0 929 retrieved_items = 0
713 current_page = outbox_data["last"] 930 current_page = collection["last"]
714 while retrieved_items < count: 931 while retrieved_items < count:
715 page_data, items = await self.parseAPPage(current_page) 932 page_data, items = await self.parseAPPage(current_page)
716 if not items: 933 if not items:
717 log.warning(f"found an empty AP page at {current_page}") 934 log.warning(f"found an empty AP page at {current_page}")
718 return [], rsm_resp 935 return [], rsm_resp
732 "Error while retrieving previous page from AP service at " 949 "Error while retrieving previous page from AP service at "
733 f"{current_page}" 950 f"{current_page}"
734 ) 951 )
735 952
736 init_page = "last" if chronological_pagination else "first" 953 init_page = "last" if chronological_pagination else "first"
737 page = outbox_data.get(init_page) 954 page = collection.get(init_page)
738 if not page: 955 if not page:
739 raise exceptions.DataError( 956 raise exceptions.DataError(
740 f"Initial page {init_page!r} not found for outbox {outbox}" 957 f"Initial page {init_page!r} not found for collection "
958 f"{collection.get('id')})"
741 ) 959 )
742 items = [] 960 items = []
743 page_items = [] 961 page_items = []
744 retrieved_items = 0 962 retrieved_items = 0
745 found_after_id = False 963 found_after_id = False
746 964
747 while retrieved_items < count: 965 while retrieved_items < count:
748 __, page_items = await self.parseAPPage(page) 966 __, page_items = await self.parseAPPage(page)
967 if not page_items:
968 break
749 retrieved_items += len(page_items) 969 retrieved_items += len(page_items)
750 if after_id is not None and not found_after_id: 970 if after_id is not None and not found_after_id:
751 # if we have an after_id, we ignore all items until the requested one is 971 # if we have an after_id, we ignore all items until the requested one is
752 # found 972 # found
753 try: 973 try:
754 limit_idx = [i["id"] for i in page_items].index(after_id) 974 limit_idx = [i["id"] for i in page_items].index(after_id)
755 except ValueError: 975 except ValueError:
756 # if "after_id" is not found, we don't add any item from this page 976 # if "after_id" is not found, we don't add any item from this page
757 log.debug(f"{after_id!r} not found at {page}, skipping") 977 page_id = page.get("id") if isinstance(page, dict) else page
978 log.debug(f"{after_id!r} not found at {page_id}, skipping")
758 else: 979 else:
759 found_after_id = True 980 found_after_id = True
760 if chronological_pagination: 981 if chronological_pagination:
761 start_index = retrieved_items - len(page_items) + limit_idx + 1 982 start_index = retrieved_items - len(page_items) + limit_idx + 1
762 page_items = page_items[limit_idx+1:] 983 page_items = page_items[limit_idx+1:]
771 if chronological_pagination: 992 if chronological_pagination:
772 items = items[:max_items] 993 items = items[:max_items]
773 else: 994 else:
774 items = items[-max_items:] 995 items = items[-max_items:]
775 break 996 break
776 page = outbox_data.get("prev" if chronological_pagination else "next") 997 page = collection.get("prev" if chronological_pagination else "next")
777 if not page: 998 if not page:
778 break 999 break
779 1000
780 if after_id is not None and not found_after_id: 1001 if after_id is not None and not found_after_id:
781 raise error.StanzaError("item-not-found") 1002 raise error.StanzaError("item-not-found")
782 1003
783 if after_id is None:
784 rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
785
786 if start_index is not None:
787 rsm_resp["index"] = start_index
788 elif after_id is not None:
789 log.warning("Can't determine index of first element")
790 elif chronological_pagination:
791 rsm_resp["index"] = 0
792 else:
793 rsm_resp["index"] = count - len(items)
794 if items: 1004 if items:
1005 if after_id is None:
1006 rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
1007 if start_index is not None:
1008 rsm_resp["index"] = start_index
1009 elif after_id is not None:
1010 log.warning("Can't determine index of first element")
1011 elif chronological_pagination:
1012 rsm_resp["index"] = 0
1013 else:
1014 rsm_resp["index"] = count - len(items)
795 rsm_resp.update({ 1015 rsm_resp.update({
796 "first": items[0]["id"], 1016 "first": items[0]["id"],
797 "last": items[-1]["id"] 1017 "last": items[-1]["id"]
798 }) 1018 })
799 1019
800 return items, rsm.RSMResponse(**rsm_resp) 1020 return items, rsm.RSMResponse(**rsm_resp)
801 1021
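A hedged paging sketch over an already-fetched outbox collection (actor_data is assumed to have been retrieved beforehand):

    outbox = await self.apGet(actor_data["outbox"])
    items, rsm_resp = await self.getAPItems(outbox, max_items=20)
    # items: pubsub <item> elements, oldest first; rsm_resp: XEP-0059 metadata
    # (count/index/first/last) for the pubsub response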
802 async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: 1022 async def parseAPPage(
1023 self,
1024 page: Union[str, dict]
1025 ) -> Tuple[dict, List[domish.Element]]:
803 """Convert AP objects from an AP page to XMPP items 1026 """Convert AP objects from an AP page to XMPP items
804 1027
805 @param url: URL linking to an AP page 1028 @param page: can be either a URL linking to an AP page, or the page data directly
806 @return: page data, pubsub items 1029 @return: page data, pubsub items
807 """ 1030 """
808 page_data = await self.apGet(url) 1031 page_data = await self.apGetObject(page)
809 ap_items = page_data.get("orderedItems") 1032 if page_data is None:
810 if not ap_items: 1033 log.warning('No data found in collection')
811 log.warning('No "orderedItems" collection found') 1034 return {}, []
812 return page_data, [] 1035 ap_items = await self.apGetList(page_data, "orderedItems")
1036 if ap_items is None:
1037 ap_items = await self.apGetList(page_data, "items")
1038 if not ap_items:
1039 log.warning(f'No item field found in collection: {page_data!r}')
1040 return page_data, []
1041 else:
1042 log.warning(
1043 "Items are not ordered, this is not spec compliant"
1044 )
813 items = [] 1045 items = []
814 # AP Collections are in antichronological order, but we expect chronological in 1046 # AP Collections are in antichronological order, but we expect chronological in
815 # Pubsub, thus we reverse it 1047 # Pubsub, thus we reverse it
816 for ap_item in reversed(ap_items): 1048 for ap_item in reversed(ap_items):
817 try: 1049 try:
818 ap_object, mb_data = await self.apItem2MBdata(ap_item) 1050 items.append(await self.apItem2Elt(ap_item))
819 except (exceptions.DataError, NotImplementedError, error.StanzaError): 1051 except (exceptions.DataError, NotImplementedError, error.StanzaError):
820 continue 1052 continue
821 1053
822 item_elt = await self._m.data2entry(
823 self.client, mb_data, ap_object["id"], None, self._m.namespace
824 )
825 item_elt["publisher"] = mb_data["author_jid"].full()
826 items.append(item_elt)
827
828 return page_data, items 1054 return page_data, items
829 1055
830 async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: 1056 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
831 """Convert AP item to microblog data 1057 """Convert AP item to parsed microblog data and corresponding item element"""
1058 mb_data = await self.apItem2MBdata(ap_item)
1059 item_elt = await self._m.data2entry(
1060 self.client, mb_data, mb_data["id"], None, self._m.namespace
1061 )
1062 item_elt["publisher"] = mb_data["author_jid"]
1063 return mb_data, item_elt
1064
1065 async def apItem2Elt(self, ap_item: dict) -> domish.Element:
1066 """Convert AP item to XMPP item element"""
1067 __, item_elt = await self.apItem2MbDataAndElt(ap_item)
1068 return item_elt
1069
1070 async def getCommentsNodes(
1071 self,
1072 item_id: str,
1073 parent_id: Optional[str]
1074 ) -> Tuple[Optional[str], Optional[str]]:
1075 """Get node where this item is and node to use for comments
1076
1077 If the config option "comments_max_depth" is set, a common node will be used
1078 below the given depth.
1079 @param item_id: ID of the reference item
1080 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
1081 @return: a tuple with parent_node_id, comments_node_id:
1082 - parent_node_id is the ID of the node where the reference item must be. None
1083 is returned when the root node (i.e. not a comments node) must be used.
1084 - comments_node_id is the ID of the node to use for comments. None is
1085 returned when no comments node must be used (happens when we have reached
1086 "comments_max_depth")
1087 """
1088 if parent_id is None or not self.comments_max_depth:
1089 return (
1090 self._m.getCommentsNode(parent_id) if parent_id is not None else None,
1091 self._m.getCommentsNode(item_id)
1092 )
1093 parent_url = parent_id
1094 parents = []
1095 for __ in range(COMMENTS_MAX_PARENTS):
1096 parent_item = await self.apGet(parent_url)
1097 parents.insert(0, parent_item)
1098 parent_url = parent_item.get("inReplyTo")
1099 if parent_url is None:
1100 break
1101 parent_limit = self.comments_max_depth-1
1102 if len(parents) <= parent_limit:
1103 return (
1104 self._m.getCommentsNode(parents[-1]["id"]),
1105 self._m.getCommentsNode(item_id)
1106 )
1107 else:
1108 last_level_item = parents[parent_limit]
1109 return (
1110 self._m.getCommentsNode(last_level_item["id"]),
1111 None
1112 )
1113
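A worked example of the depth logic, assuming comments_max_depth = 2 and a reply chain A ← B ← C ← D (each item replying to the previous one):

    # B (parents [A])       -> (comments node of A, comments node of B)
    # C (parents [A, B])    -> (comments node of B, None)
    # D (parents [A, B, C]) -> (comments node of B, None)
    # i.e. replies deeper than the configured depth are flattened into the
    # comments node of the last item within the allowed depth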
1114 async def apItem2MBdata(self, ap_item: dict) -> dict:
1115 """Convert AP activity or object to microblog data
832 1116
833 @return: AP Item's Object and microblog data 1117 @return: microblog data
834 @raise exceptions.DataError: something is invalid in the AP item 1118 @raise exceptions.DataError: something is invalid in the AP item
835 @raise NotImplementedError: some AP data is not handled yet 1119 @raise NotImplementedError: some AP data is not handled yet
836 @raise error.StanzaError: error while contacting the AP server 1120 @raise error.StanzaError: error while contacting the AP server
837 """ 1121 """
838 ap_object = ap_item.get("object") 1122 is_activity = self.isActivity(ap_item)
839 if not ap_object: 1123 if is_activity:
840 log.warning(f'No "object" found in AP item {ap_item!r}') 1124 ap_object = await self.apGetObject(ap_item, "object")
1125 if not ap_object:
1126 log.warning(f'No "object" found in AP item {ap_item!r}')
1127 raise exceptions.DataError
1128 else:
1129 ap_object = ap_item
1130 item_id = ap_object.get("id")
1131 if not item_id:
1132 log.warning(f'No "id" found in AP item: {ap_object!r}')
841 raise exceptions.DataError 1133 raise exceptions.DataError
842 if isinstance(ap_object, str): 1134 mb_data = {"id": item_id}
843 ap_object = await self.apGet(ap_object)
844 obj_id = ap_object.get("id")
845 if not obj_id:
846 log.warning(f'No "id" found in AP object: {ap_object!r}')
847 raise exceptions.DataError
848 if ap_object.get("inReplyTo") is not None:
849 raise NotImplementedError
850 mb_data = {}
851 for ap_key, mb_key in AP_MB_MAP.items(): 1135 for ap_key, mb_key in AP_MB_MAP.items():
852 data = ap_object.get(ap_key) 1136 data = ap_object.get(ap_key)
853 if data is None: 1137 if data is None:
854 continue 1138 continue
855 mb_data[mb_key] = data 1139 mb_data[mb_key] = data
866 else: 1150 else:
867 mb_data["language"] = language 1151 mb_data["language"] = language
868 mb_data["content_xhtml"] = content_xhtml 1152 mb_data["content_xhtml"] = content_xhtml
869 1153
870 # author 1154 # author
871 actor = ap_item.get("actor") 1155 if is_activity:
872 if not actor: 1156 authors = await self.apGetActors(ap_item, "actor")
873 log.warning(f"no actor associated to object id {obj_id!r}") 1157 else:
874 raise exceptions.DataError 1158 authors = await self.apGetActors(ap_object, "attributedTo")
875 elif isinstance(actor, list): 1159 if len(authors) > 1:
876 # we only keep first item of list as author 1160 # we only keep first item as author
877 # TODO: handle multiple actors 1161 # TODO: handle multiple actors
878 if len(actor) > 1: 1162 log.warning("multiple actors are not managed")
879 log.warning("multiple actors are not managed") 1163
880 actor = actor[0] 1164 account = authors[0]
881 1165 author_jid = self.getLocalJIDFromAccount(account).full()
882 if isinstance(actor, dict): 1166
883 actor = actor.get("id") 1167 mb_data["author"] = account.split("@", 1)[0]
884 if not actor: 1168 mb_data["author_jid"] = author_jid
885 log.warning(f"no actor id found: {actor!r}")
886 raise exceptions.DataError
887
888 if isinstance(actor, str):
889 account = await self.getAPAccountFromId(actor)
890 mb_data["author"] = account.split("@", 1)[0]
891 author_jid = mb_data["author_jid"] = jid.JID(
892 None,
893 (
894 self.host.plugins["XEP-0106"].escape(account),
895 self.client.jid.host,
896 None
897 )
898 )
899 else:
900 log.warning(f"unknown actor type found: {actor!r}")
901 raise exceptions.DataError
902 1169
903 # published/updated 1170 # published/updated
904 for field in ("published", "updated"): 1171 for field in ("published", "updated"):
905 value = ap_object.get(field) 1172 value = ap_object.get(field)
906 if not value and field == "updated": 1173 if not value and field == "updated":
910 mb_data[field] = calendar.timegm( 1177 mb_data[field] = calendar.timegm(
911 dateutil.parser.parse(str(value)).utctimetuple() 1178 dateutil.parser.parse(str(value)).utctimetuple()
912 ) 1179 )
913 except dateutil.parser.ParserError as e: 1180 except dateutil.parser.ParserError as e:
914 log.warning(f"Can't parse {field!r} field: {e}") 1181 log.warning(f"Can't parse {field!r} field: {e}")
915 return ap_object, mb_data 1182
1183 # comments
1184 in_reply_to = ap_object.get("inReplyTo")
1185 __, comments_node = await self.getCommentsNodes(item_id, in_reply_to)
1186 if comments_node is not None:
1187 comments_data = {
1188 "service": author_jid,
1189 "node": comments_node,
1190 "uri": uri.buildXMPPUri(
1191 "pubsub",
1192 path=author_jid,
1193 node=comments_node
1194 )
1195 }
1196 mb_data["comments"] = [comments_data]
1197
1198 return mb_data
916 1199
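An illustrative input/output pair, assuming AP_MB_MAP maps AP content fields to their microblog counterparts (the exact mapping lives in .constants and is not shown here):

    # in:  {"id": "https://social.example/objects/123", "type": "Note",
    #       "content": "<p>hello</p>", "published": "2022-03-22T17:00:42Z",
    #       "attributedTo": "https://social.example/users/alice"}
    # out: {"id": "https://social.example/objects/123", "author": "alice",
    #       "author_jid": "alice\40social.example@<gateway host>",
    #       "published": <unix timestamp>, ...content fields per AP_MB_MAP...}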
917 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: 1200 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
918 """Convert Libervia Microblog Data to ActivityPub item""" 1201 """Convert Libervia Microblog Data to ActivityPub item"""
919 if not mb_data.get("id"): 1202 if not mb_data.get("id"):
920 mb_data["id"] = shortuuid.uuid() 1203 mb_data["id"] = shortuuid.uuid()
921 if not mb_data.get("author_jid"): 1204 if not mb_data.get("author_jid"):
922 mb_data["author_jid"] = client.jid 1205 mb_data["author_jid"] = client.jid.full()
923 ap_account = await self.getAPAccountFromJidAndNode( 1206 ap_account = await self.getAPAccountFromJidAndNode(
924 jid.JID(mb_data["author_jid"]), 1207 jid.JID(mb_data["author_jid"]),
925 None 1208 None
926 ) 1209 )
927 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) 1210 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
965 1248
966 @param service: JID corresponding to the AP actor. 1249 @param service: JID corresponding to the AP actor.
967 """ 1250 """
968 if not service.user: 1251 if not service.user:
969 raise ValueError("service must have a local part") 1252 raise ValueError("service must have a local part")
970 account = self.host.plugins["XEP-0106"].unescape(service.user) 1253 account = self._e.unescape(service.user)
971 ap_actor_data = await self.getAPActorDataFromId(account) 1254 ap_actor_data = await self.getAPActorDataFromAccount(account)
972 1255
973 try: 1256 try:
974 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] 1257 inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
975 except KeyError: 1258 except KeyError:
976 raise exceptions.DataError("Can't get ActivityPub actor inbox") 1259 raise exceptions.DataError("Can't get ActivityPub actor inbox")
978 item_data = await self.mbdata2APitem(client, mess_data) 1261 item_data = await self.mbdata2APitem(client, mess_data)
979 url_actor = item_data["object"]["attributedTo"] 1262 url_actor = item_data["object"]["attributedTo"]
980 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1263 resp = await self.signAndPost(inbox_url, url_actor, item_data)
981 if resp.code != 202: 1264 if resp.code != 202:
982 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") 1265 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1266
1267 async def newAPItem(
1268 self,
1269 client: SatXMPPEntity,
1270 destinee: Optional[jid.JID],
1271 node: str,
1272 item: dict,
1273 ) -> None:
1274 """Analyse, cache and send notification for received AP item
1275
1276 @param destinee: jid of the destinee,
1277 @param node: XMPP pubsub node
1278 @param item: AP object payload
1279 """
1280 service = client.jid
1281 in_reply_to = item.get("inReplyTo")
1282 if in_reply_to and isinstance(in_reply_to, str):
1283 # this item is a reply, we use or create a corresponding node for comments
1284 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
1285 node = parent_node or node
1286 cached_node = await self.host.memory.storage.getPubsubNode(
1287 client, service, node, with_subscriptions=True
1288 )
1289 if cached_node is None:
1290 try:
1291 cached_node = await self.host.memory.storage.setPubsubNode(
1292 client,
1293 service,
1294 node,
1295 subscribed=True
1296 )
1297 except IntegrityError as e:
1298 if "unique" in str(e.orig).lower():
1299 # the node may already exist, if it has been created just after
1300 # getPubsubNode above
1301 log.debug("ignoring UNIQUE constraint error")
1302 cached_node = await self.host.memory.storage.getPubsubNode(
1303 client, service, node, with_subscriptions=True
1304 )
1305 else:
1306 raise e
1307
1308 else:
1309 # it is a root item (i.e. not a reply to an other item)
1310 cached_node = await self.host.memory.storage.getPubsubNode(
1311 client, service, node, with_subscriptions=True
1312 )
1313 if cached_node is None:
1314 log.warning(
1315 f"Received item in unknown node {node!r} at {service}\n{item}"
1316 )
1318 return
1319 mb_data, item_elt = await self.apItem2MbDataAndElt(item)
1320 await self.host.memory.storage.cachePubsubItems(
1321 client,
1322 cached_node,
1323 [item_elt],
1324 [mb_data]
1325 )
1326
1327 for subscription in cached_node.subscriptions:
1328 if subscription.state != SubscriptionState.SUBSCRIBED:
1329 continue
1330 self.pubsub_service.notifyPublish(
1331 service,
1332 node,
1333 [(subscription.subscriber, None, [item_elt])]
1334 )
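Finally, a hedged end-to-end sketch of how the HTTP side is expected to hand a received object to this method; passing the microblog namespace as the default node is an assumption:

    await gateway.newAPItem(client, None, gateway._m.namespace, ap_object)
    # a reply is rerouted to its comments node, cached together with its parsed
    # microblog data, then pushed to every SUBSCRIBED subscriber through
    # pubsub_service.notifyPublish()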