Mercurial > libervia-backend
changeset 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and threads management:
- HTTP signature is checked for incoming messages
- AP actor can now be followed using pubsub subscription. When following is accepted, the
node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation will come
to explain this).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 658ddbabaf36 |
children | fa3dc4ed7906 |
files | sat/core/exceptions.py sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/http_server.py sat/plugins/plugin_comp_ap_gateway/pubsub_service.py |
diffstat | 5 files changed, 1146 insertions(+), 189 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/exceptions.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/core/exceptions.py Tue Mar 22 17:00:42 2022 +0100 @@ -123,6 +123,11 @@ pass +class EncryptionError(Exception): + """Invalid encryption""" + pass + + # Something which need to be done is not available yet class NotReady(Exception): pass
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue Mar 22 17:00:42 2022 +0100 @@ -17,25 +17,28 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 +import calendar import hashlib import json from pathlib import Path -from typing import Optional, Dict, Tuple, List, Union +from pprint import pformat +import re +from typing import Any, Dict, List, Optional, Tuple, Union, overload from urllib import parse -import calendar -import re -import dateutil +from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding +import dateutil import shortuuid +from sqlalchemy.exc import IntegrityError import treq from treq.response import _Response as TReqResponse from twisted.internet import defer, reactor, threads from twisted.web import http -from twisted.words.protocols.jabber import jid, error +from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish from wokkel import rsm @@ -44,12 +47,25 @@ from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger +from sat.memory.sqla_mapping import PubsubSub, SubscriptionState from sat.tools import utils -from sat.tools.common import data_format, tls +from sat.tools.common import data_format, tls, uri from sat.tools.common.async_utils import async_lru -from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, - AP_MB_MAP, LRU_MAX_SIZE) +from .constants import ( + ACTIVITY_OBJECT_MANDATORY, + ACTIVITY_TARGET_MANDATORY, + ACTIVITY_TYPES, + ACTIVITY_TYPES_LOWER, + AP_MB_MAP, + COMMENTS_MAX_PARENTS, + CONF_SECTION, + IMPORT_NAME, + LRU_MAX_SIZE, + MEDIA_TYPE_AP, + TYPE_ACTOR, + TYPE_ITEM, +) from .http_server import HTTPServer from .pubsub_service import APPubsubService @@ -64,7 +80,7 @@ C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", C.PI_HANDLER: C.BOOL_TRUE, @@ -86,6 +102,9 @@ self.initialised = False self._m = host.plugins["XEP-0277"] self._p = host.plugins["XEP-0060"] + self._e = host.plugins["XEP-0106"] + self._c = host.plugins["PUBSUB_CACHE"] + self.pubsub_service = APPubsubService(self) host.bridge.addMethod( "APSend", @@ -97,7 +116,7 @@ ) def getHandler(self, __): - return APPubsubService(self) + return self.pubsub_service async def init(self, client): if self.initialised: @@ -165,10 +184,13 @@ 'bad ap-gateay http_connection_type, you must use one of "http" or ' '"https"' ) - self.max_items = self.host.memory.getConfig( + self.max_items = int(self.host.memory.getConfig( CONF_SECTION, 'new_node_max_items', 50 - ) + )) + self.comments_max_depth = int(self.host.memory.getConfig( + CONF_SECTION, 'comments_max_depth', 0 + )) self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") # True (default) if we provide gateway only to entities/services from our server @@ -191,6 +213,25 @@ self.client = client await self.init(client) + async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: + """Get client for this component with a specified jid + + This is needed to perform operations with the virtual JID corresponding to the AP + actor instead of the JID of the gateway itself. + @param actor_id: ID of the actor + @return: virtual client + """ + account = await self.getAPAccountFromId(actor_id) + local_jid = self.getLocalJIDFromAccount(account) + return self.client.getVirtualClient(local_jid) + + def isActivity(self, data: dict) -> bool: + """Return True if the data has an activity type""" + try: + return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER + except (KeyError, TypeError): + return False + async def apGet(self, url: str) -> dict: """Retrieve AP JSON from given URL @@ -514,15 +555,30 @@ return jid_, node - def parseAPURL(self, url: str) -> Tuple[str, str]: + def getLocalJIDFromAccount(self, account: str) -> jid.JID: + """Compute JID linking to an AP account + + The local jid is computer by escaping AP actor handle and using it as local part + of JID, where domain part is this gateway own JID + """ + return jid.JID( + None, + ( + self._e.escape(account), + self.client.jid.host, + None + ) + ) + + def parseAPURL(self, url: str) -> Tuple[str, List[str]]: """Parse an URL leading to an AP endpoint @param url: URL to parse (schema is not mandatory) - @return: endpoint type and AP account + @return: endpoint type and extra arguments """ path = parse.urlparse(url).path.lstrip("/") - type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) - return type_, parse.unquote(account) + type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/", 1) + return type_, [parse.unquote(a) for a in extra_args] def buildAPURL(self, type_:str , *args: str) -> str: """Build an AP endpoint URL @@ -535,42 +591,199 @@ str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) ) - async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: + def buildSignatureHeader(self, values: Dict[str, str]) -> str: + """Build key="<value>" signature header from signature data""" + fields = [] + for key, value in values.items(): + if key not in ("(created)", "(expired)"): + if '"' in value: + raise NotImplementedError( + "string escaping is not implemented, double-quote can't be used " + f"in {value!r}" + ) + value = f'"{value}"' + fields.append(f"{key}={value}") + + return ",".join(fields) + + def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: + """Get digest data to use in header and signature + + @param body: body of the request + @return: hash name and digest + """ + if algo != "SHA-256": + raise NotImplementedError("only SHA-256 is implemented for now") + return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getActorPubKeyData( + self, + actor_id: str + ) -> Tuple[str, str, rsa.RSAPublicKey]: + """Retrieve Public Key data from actor ID + + @param actor_id: actor ID (url) + @return: key_id, owner and public_key + @raise KeyError: publicKey is missing from actor data + """ + actor_data = await self.apGet(actor_id) + pub_key_data = actor_data["publicKey"] + key_id = pub_key_data["id"] + owner = pub_key_data["owner"] + pub_key_pem = pub_key_data["publicKeyPem"] + pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) + return key_id, owner, pub_key + + def createActivity( + self, + activity: str, + actor_id: str, + object_: Optional[Union[str, dict]] = None, + target: Optional[Union[str, dict]] = None, + **kwargs, + ) -> Dict[str, Any]: + """Generate base data for an activity + + @param activity: one of ACTIVITY_TYPES + """ + if activity not in ACTIVITY_TYPES: + raise exceptions.InternalError(f"invalid activity: {activity!r}") + if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: + raise exceptions.InternalError( + f'"object_" is mandatory for activity {activity!r}' + ) + if target is None and activity in ACTIVITY_TARGET_MANDATORY: + raise exceptions.InternalError( + f'"target" is mandatory for activity {activity!r}' + ) + activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" + data: Dict[str, Any] = { + "@context": "https://www.w3.org/ns/activitystreams", + "actor": actor_id, + "id": activity_id, + "type": activity, + } + data.update(kwargs) + if object_ is not None: + data["object"] = object_ + if target is not None: + data["target"] = target + + return data + + def getKeyId(self, actor_id: str) -> str: + """Get local key ID from actor ID""" + return f"{actor_id}#main-key" + + async def checkSignature( + self, + signature: str, + key_id: str, + headers: Dict[str, str] + ) -> str: + """Verify that signature matches given headers + + see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 + + @param signature: Base64 encoded signature + @param key_id: ID of the key used to sign the data + @param headers: headers and their values, including pseudo-headers + @return: id of the signing actor + + @raise InvalidSignature: signature doesn't match headers + """ + to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + if key_id.startswith("acct:"): + actor = key_id[5:] + actor_id = await self.getAPActorIdFromAccount(actor) + else: + actor_id = key_id.split("#", 1)[0] + + pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id) + if pub_key_id != key_id or pub_key_owner != actor_id: + raise exceptions.EncryptionError("Public Key mismatch") + + try: + pub_key.verify( + base64.b64decode(signature), + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + ) + except InvalidSignature: + raise exceptions.EncryptionError("Invalid signature (using PKC0S1 v1.5 and SHA-256)") + + return actor_id + + def getSignatureData( + self, + key_id: str, + headers: Dict[str, str] + ) -> Tuple[Dict[str, str], Dict[str, str]]: + """Generate and return signature and corresponding headers + + @param parsed_url: URL where the request is sent/has been received + @param key_id: ID of the key (URL linking to the data with public key) + @param date: HTTP datetime string of signature generation + @param body: body of the HTTP request + @param headers: headers to sign and their value: + default value will be used if not specified + + @return: headers and signature data + ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers + removed, and ``Signature`` added. + """ + to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + signature = base64.b64encode(self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + )).decode() + sign_data = { + "keyId": key_id, + "Algorithm": "rsa-sha256", + "headers": " ".join(headers.keys()), + "signature": signature + } + new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} + new_headers["Signature"] = self.buildSignatureHeader(sign_data) + return new_headers, sign_data + + async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: """Sign a documentent and post it to AP server @param url: AP server endpoint - @param url_actor: URL generated by this gateway for local actor + @param actor_id: originating actor ID (URL) @param doc: document to send """ p_url = parse.urlparse(url) - date = http.datetimeToString().decode() body = json.dumps(doc).encode() - digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() - digest = f"sha-256={digest_hash}" - to_sign = ( - f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" - f"date: {date}\ndigest: {digest}" + digest_algo, digest_hash = self.getDigest(body) + digest = f"{digest_algo}={digest_hash}" + + headers = { + "(request-target)": f"post {p_url.path}", + "Host": p_url.hostname, + "Date": http.datetimeToString().decode(), + "Digest": digest + } + headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers) + + headers["Content-Type"] = ( + 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' ) - signature = base64.b64encode(self.private_key.sign( - to_sign.encode(), - # we have to use PKCS1v15 padding to be compatible with Mastodon - padding.PKCS1v15(), - hashes.SHA256() - )).decode() - h_signature = ( - f'keyId="{url_actor}",headers="(request-target) host date digest",' - f'signature="{signature}"' - ) - return await treq.post( + resp = await treq.post( url, body, - headers={ - "Host": [p_url.hostname], - "Date": [date], - "Digest": [digest], - "Signature": [h_signature], - } + headers=headers, ) + if resp.code >= 400: + text = await resp.text() + log.warning(f"POST request to {url} failed: {text}") + return resp def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore @@ -578,6 +791,7 @@ client = self.host.getClient(profile) return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) + @async_lru(maxsize=LRU_MAX_SIZE) async def getAPActorIdFromAccount(self, account: str) -> str: """Retrieve account ID from it's handle using WebFinger @@ -611,7 +825,7 @@ ) return href - async def getAPActorDataFromId(self, account: str) -> dict: + async def getAPActorDataFromAccount(self, account: str) -> dict: """Retrieve ActivityPub Actor data @param account: ActivityPub Actor identifier @@ -620,7 +834,13 @@ return await self.apGet(href) @async_lru(maxsize=LRU_MAX_SIZE) - async def getAPAccountFromId(self, actor_id: str): + async def getAPInboxFromId(self, actor_id: str) -> str: + """Retrieve inbox of an actor_id""" + data = await self.apGet(actor_id) + return data["inbox"] + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getAPAccountFromId(self, actor_id: str) -> str: """Retrieve AP account from the ID URL @param actor_id: AP ID of the actor (URL to the actor data) @@ -648,7 +868,7 @@ async def getAPItems( self, - account: str, + collection: dict, max_items: Optional[int] = None, chronological_pagination: bool = True, after_id: Optional[str] = None, @@ -675,21 +895,18 @@ @return: XMPP Pubsub items and corresponding RSM Response Items are always returned in chronological order in the result """ - actor_data = await self.getAPActorDataFromId(account) - outbox = actor_data.get("outbox") rsm_resp: Dict[str, Union[bool, int]] = {} - if not outbox: - raise exceptions.DataError(f"No outbox found for actor {account}") - outbox_data = await self.apGet(outbox) try: - count = outbox_data["totalItems"] + count = collection["totalItems"] except KeyError: log.warning( - f'"totalItems" not found in outbox of {account}, defaulting to 20' + f'"totalItems" not found in collection {collection.get("id")}, ' + "defaulting to 20" ) count = 20 else: - log.info(f"{account}'s outbox has {count} item(s)") + log.info(f"{collection.get('id')} has {count} item(s)") + rsm_resp["count"] = count if start_index is not None: @@ -710,7 +927,7 @@ # before "start_index" previous_index = start_index - 1 retrieved_items = 0 - current_page = outbox_data["last"] + current_page = collection["last"] while retrieved_items < count: page_data, items = await self.parseAPPage(current_page) if not items: @@ -734,10 +951,11 @@ ) init_page = "last" if chronological_pagination else "first" - page = outbox_data.get(init_page) + page = collection.get(init_page) if not page: raise exceptions.DataError( - f"Initial page {init_page!r} not found for outbox {outbox}" + f"Initial page {init_page!r} not found for collection " + f"{collection.get('id')})" ) items = [] page_items = [] @@ -746,6 +964,8 @@ while retrieved_items < count: __, page_items = await self.parseAPPage(page) + if not page_items: + break retrieved_items += len(page_items) if after_id is not None and not found_after_id: # if we have an after_id, we ignore all items until the requested one is @@ -754,7 +974,8 @@ limit_idx = [i["id"] for i in page_items].index(after_id) except ValueError: # if "after_id" is not found, we don't add any item from this page - log.debug(f"{after_id!r} not found at {page}, skipping") + page_id = page.get("id") if isinstance(page, dict) else page + log.debug(f"{after_id!r} not found at {page_id}, skipping") else: found_after_id = True if chronological_pagination: @@ -773,25 +994,24 @@ else: items = items[-max_items:] break - page = outbox_data.get("prev" if chronological_pagination else "next") + page = collection.get("prev" if chronological_pagination else "next") if not page: break if after_id is not None and not found_after_id: raise error.StanzaError("item-not-found") - if after_id is None: - rsm_resp["index"] = 0 if chronological_pagination else count - len(items) - - if start_index is not None: - rsm_resp["index"] = start_index - elif after_id is not None: - log.warning("Can't determine index of first element") - elif chronological_pagination: - rsm_resp["index"] = 0 - else: - rsm_resp["index"] = count - len(items) if items: + if after_id is None: + rsm_resp["index"] = 0 if chronological_pagination else count - len(items) + if start_index is not None: + rsm_resp["index"] = start_index + elif after_id is not None: + log.warning("Can't determine index of first element") + elif chronological_pagination: + rsm_resp["index"] = 0 + else: + rsm_resp["index"] = count - len(items) rsm_resp.update({ "first": items[0]["id"], "last": items[-1]["id"] @@ -799,55 +1019,119 @@ return items, rsm.RSMResponse(**rsm_resp) - async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: + async def parseAPPage( + self, + page: Union[str, dict] + ) -> Tuple[dict, List[domish.Element]]: """Convert AP objects from an AP page to XMPP items - @param url: url linking and AP page + @param page: Can be either url linking and AP page, or the page data directly @return: page data, pubsub items """ - page_data = await self.apGet(url) - ap_items = page_data.get("orderedItems") - if not ap_items: - log.warning('No "orderedItems" collection found') - return page_data, [] + page_data = await self.apGetObject(page) + if page_data is None: + log.warning('No data found in collection') + return {}, [] + ap_items = await self.apGetList(page_data, "orderedItems") + if ap_items is None: + ap_items = await self.apGetList(page_data, "items") + if not ap_items: + log.warning(f'No item field found in collection: {page_data!r}') + return page_data, [] + else: + log.warning( + "Items are not ordered, this is not spec compliant" + ) items = [] # AP Collections are in antichronological order, but we expect chronological in # Pubsub, thus we reverse it for ap_item in reversed(ap_items): try: - ap_object, mb_data = await self.apItem2MBdata(ap_item) + items.append(await self.apItem2Elt(ap_item)) except (exceptions.DataError, NotImplementedError, error.StanzaError): continue - item_elt = await self._m.data2entry( - self.client, mb_data, ap_object["id"], None, self._m.namespace - ) - item_elt["publisher"] = mb_data["author_jid"].full() - items.append(item_elt) - return page_data, items - async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: - """Convert AP item to microblog data + async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: + """Convert AP item to parsed microblog data and corresponding item element""" + mb_data = await self.apItem2MBdata(ap_item) + item_elt = await self._m.data2entry( + self.client, mb_data, mb_data["id"], None, self._m.namespace + ) + item_elt["publisher"] = mb_data["author_jid"] + return mb_data, item_elt + + async def apItem2Elt(self, ap_item: dict) -> domish.Element: + """Convert AP item to XMPP item element""" + __, item_elt = await self.apItem2MbDataAndElt(ap_item) + return item_elt + + async def getCommentsNodes( + self, + item_id: str, + parent_id: Optional[str] + ) -> Tuple[Optional[str], Optional[str]]: + """Get node where this item is and node to use for comments + + if config option "comments_max_depth" is set, a common node will be used below the + given depth + @param item_id: ID of the reference item + @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") + @return: a tuple with parent_node_id, comments_node_id: + - parent_node_id is the ID of the node where reference item must be. None is + returned when the root node (i.e. not a comments node) must be used. + - comments_node_id: is the ID of the node to use for comments. None is + returned when no comment node must be used (happens when we have reached + "comments_max_depth") + """ + if parent_id is None or not self.comments_max_depth: + return ( + self._m.getCommentsNode(parent_id) if parent_id is not None else None, + self._m.getCommentsNode(item_id) + ) + parent_url = parent_id + parents = [] + for __ in range(COMMENTS_MAX_PARENTS): + parent_item = await self.apGet(parent_url) + parents.insert(0, parent_item) + parent_url = parent_item.get("inReplyTo") + if parent_url is None: + break + parent_limit = self.comments_max_depth-1 + if len(parents) <= parent_limit: + return ( + self._m.getCommentsNode(parents[-1]["id"]), + self._m.getCommentsNode(item_id) + ) + else: + last_level_item = parents[parent_limit] + return ( + self._m.getCommentsNode(last_level_item["id"]), + None + ) + + async def apItem2MBdata(self, ap_item: dict) -> dict: + """Convert AP activity or object to microblog data @return: AP Item's Object and microblog data @raise exceptions.DataError: something is invalid in the AP item @raise NotImplemented: some AP data is not handled yet @raise error.StanzaError: error while contacting the AP server """ - ap_object = ap_item.get("object") - if not ap_object: - log.warning(f'No "object" found in AP item {ap_item!r}') + is_activity = self.isActivity(ap_item) + if is_activity: + ap_object = await self.apGetObject(ap_item, "object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + else: + ap_object = ap_item + item_id = ap_object.get("id") + if not item_id: + log.warning(f'No "id" found in AP item: {ap_object!r}') raise exceptions.DataError - if isinstance(ap_object, str): - ap_object = await self.apGet(ap_object) - obj_id = ap_object.get("id") - if not obj_id: - log.warning(f'No "id" found in AP object: {ap_object!r}') - raise exceptions.DataError - if ap_object.get("inReplyTo") is not None: - raise NotImplementedError - mb_data = {} + mb_data = {"id": item_id} for ap_key, mb_key in AP_MB_MAP.items(): data = ap_object.get(ap_key) if data is None: @@ -868,37 +1152,20 @@ mb_data["content_xhtml"] = content_xhtml # author - actor = ap_item.get("actor") - if not actor: - log.warning(f"no actor associated to object id {obj_id!r}") - raise exceptions.DataError - elif isinstance(actor, list): - # we only keep first item of list as author + if is_activity: + authors = await self.apGetActors(ap_item, "actor") + else: + authors = await self.apGetActors(ap_object, "attributedTo") + if len(authors) > 1: + # we only keep first item as author # TODO: handle multiple actors - if len(actor) > 1: - log.warning("multiple actors are not managed") - actor = actor[0] + log.warning("multiple actors are not managed") - if isinstance(actor, dict): - actor = actor.get("id") - if not actor: - log.warning(f"no actor id found: {actor!r}") - raise exceptions.DataError + account = authors[0] + author_jid = self.getLocalJIDFromAccount(account).full() - if isinstance(actor, str): - account = await self.getAPAccountFromId(actor) - mb_data["author"] = account.split("@", 1)[0] - author_jid = mb_data["author_jid"] = jid.JID( - None, - ( - self.host.plugins["XEP-0106"].escape(account), - self.client.jid.host, - None - ) - ) - else: - log.warning(f"unknown actor type found: {actor!r}") - raise exceptions.DataError + mb_data["author"] = account.split("@", 1)[0] + mb_data["author_jid"] = author_jid # published/updated for field in ("published", "updated"): @@ -912,14 +1179,30 @@ ) except dateutil.parser.ParserError as e: log.warning(f"Can't parse {field!r} field: {e}") - return ap_object, mb_data + + # comments + in_reply_to = ap_object.get("inReplyTo") + __, comments_node = await self.getCommentsNodes(item_id, in_reply_to) + if comments_node is not None: + comments_data = { + "service": author_jid, + "node": comments_node, + "uri": uri.buildXMPPUri( + "pubsub", + path=author_jid, + node=comments_node + ) + } + mb_data["comments"] = [comments_data] + + return mb_data async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: """Convert Libervia Microblog Data to ActivityPub item""" if not mb_data.get("id"): mb_data["id"] = shortuuid.uuid() if not mb_data.get("author_jid"): - mb_data["author_jid"] = client.jid + mb_data["author_jid"] = client.jid.full() ap_account = await self.getAPAccountFromJidAndNode( jid.JID(mb_data["author_jid"]), None @@ -967,8 +1250,8 @@ """ if not service.user: raise ValueError("service must have a local part") - account = self.host.plugins["XEP-0106"].unescape(service.user) - ap_actor_data = await self.getAPActorDataFromId(account) + account = self._e.unescape(service.user) + ap_actor_data = await self.getAPActorDataFromAccount(account) try: inbox_url = ap_actor_data["endpoints"]["sharedInbox"] @@ -980,3 +1263,72 @@ resp = await self.signAndPost(inbox_url, url_actor, item_data) if resp.code != 202: raise exceptions.NetworkError(f"unexpected return code: {resp.code}") + + async def newAPItem( + self, + client: SatXMPPEntity, + destinee: Optional[jid.JID], + node: str, + item: dict, + ) -> None: + """Analyse, cache and send notification for received AP item + + @param destinee: jid of the destinee, + @param node: XMPP pubsub node + @param item: AP object payload + """ + service = client.jid + in_reply_to = item.get("inReplyTo") + if in_reply_to and isinstance(in_reply_to, str): + # this item is a reply, we use or create a corresponding node for comments + parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) + node = parent_node or node + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + if cached_node is None: + try: + cached_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + subscribed=True + ) + except IntegrityError as e: + if "unique" in str(e.orig).lower(): + # the node may already exist, if it has been created just after + # getPubsubNode above + log.debug("ignoring UNIQUE constraint error") + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + else: + raise e + + else: + # it is a root item (i.e. not a reply to an other item) + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + if cached_node is None: + log.warning( + f"Received item in unknown node {node!r} at {service}\n{item}" + + ) + return + mb_data, item_elt = await self.apItem2MbDataAndElt(item) + await self.host.memory.storage.cachePubsubItems( + client, + cached_node, + [item_elt], + [mb_data] + ) + + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.pubsub_service.notifyPublish( + service, + node, + [(subscription.subscriber, None, [item_elt])] + )
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Tue Mar 22 17:00:42 2022 +0100 @@ -22,6 +22,7 @@ CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" TYPE_ACTOR = "actor" TYPE_INBOX = "inbox" +TYPE_SHARED_INBOX = "shared_inbox" TYPE_OUTBOX = "outbox" TYPE_ITEM = "item" MEDIA_TYPE_AP = "application/activity+json" @@ -30,7 +31,39 @@ "content": "content_xhtml", } -AP_REQUEST_TYPES = {"actor", "outbox"} +AP_REQUEST_TYPES = { + "GET": {"actor", "outbox"}, + "POST": {"inbox"}, +} +# headers to check for signature +SIGN_HEADERS = { + # headers needed for all HTTP methods + None: [ + # tuples are equivalent headers/pseudo headers, one of them must be present + ("date", "(created)"), + ("digest", "(request-target)"), + ], + b"GET": ["host"], + b"POST": ["digest"] +} PAGE_SIZE = 10 +HS2019 = "hs2019" +# delay after which a signed request is not accepted anymore +SIGN_EXP = 12*60*60 # 12 hours (same value as for Mastodon) LRU_MAX_SIZE = 200 +ACTIVITY_TYPES = ( + "Accept", "Add", "Announce", "Arrive", "Block", "Create", "Delete", "Dislike", "Flag", + "Follow", "Ignore", "Invite", "Join", "Leave", "Like", "Listen", "Move", "Offer", + "Question", "Reject", "Read", "Remove", "TentativeReject", "TentativeAccept", + "Travel", "Undo", "Update", "View" +) +ACTIVITY_TYPES_LOWER = [a.lower() for a in ACTIVITY_TYPES] +ACTIVITY_OBJECT_MANDATORY = ( + "Create", "Update", "Delete", "Follow", "Add", "Remove", "Like", "Block", "Undo" +) +ACTIVITY_TARGET_MANDATORY = ("Add", "Remove") +# activities which can be used with Shared Inbox (i.e. with not account specified) +ACTIVIY_NO_ACCOUNT_ALLOWED = ("create",) +# maximum number of parents to retrieve when comments_max_depth option is set +COMMENTS_MAX_PARENTS = 100
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Tue Mar 22 17:00:42 2022 +0100 @@ -16,14 +16,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import time from typing import Optional, Dict, List import json from urllib import parse -import re +from collections import deque import unicodedata +from pprint import pformat from twisted.web import http, resource as web_resource, server -from twisted.internet import defer +from twisted.internet import reactor, defer from twisted.words.protocols.jabber import jid, error from wokkel import pubsub, rsm @@ -31,9 +33,15 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger +from sat.tools.common import date_utils +from sat.memory.sqla_mapping import SubscriptionState -from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, - AP_REQUEST_TYPES, PAGE_SIZE) +from .constants import ( + CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, + AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, + SIGN_HEADERS, HS2019, SIGN_EXP +) +from .regex import RE_SIG_PARAM log = getLogger(__name__) @@ -50,8 +58,19 @@ def __init__(self, ap_gateway): self.apg = ap_gateway + self._seen_digest = deque(maxlen=50) super().__init__() + def responseCode( + self, + request: "HTTPRequest", + http_code: int, + msg: Optional[str] = None + ) -> None: + """Log and set HTTP return code and associated message""" + log.warning(msg) + request.setResponseCode(http_code, None if msg is None else msg.encode()) + async def webfinger(self, request): url_parsed = parse.urlparse(request.uri.decode()) query = parse.parse_qs(url_parsed.query) @@ -78,15 +97,127 @@ request.write(json.dumps(resp).encode()) request.finish() + async def handleFollowActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + signing_actor: str + ) -> None: + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + try: + subscription = await self.apg._p.subscribe( + client, + account_jid, + node + ) + except pubsub.SubscriptionPending: + log.info(f"subscription to node {node!r} of {account_jid} is pending") + # TODO: manage SubscriptionUnconfigured + else: + if subscription.state != "subscribed": + # other states should raise an Exception + raise exceptions.InternalError('"subscribed" state was expected') + inbox = await self.apg.getAPInboxFromId(signing_actor) + actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) + accept_data = self.apg.createActivity( + "Accept", actor_id, object_=data + ) + await self.apg.signAndPost(inbox, actor_id, accept_data) + + async def handleAcceptActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + signing_actor: str + ) -> None: + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + type_ = obj.get("type") + if type_ == "Follow": + follow_node = await self.apg.host.memory.storage.getPubsubNode( + client, client.jid, node, with_subscriptions=True + ) + if follow_node is None: + log.warning( + f"Received a follow accept on an unknown node: {node!r} at " + f"{client.jid}. Ignoring it" + ) + continue + try: + sub = next( + s for s in follow_node.subscriptions if s.subscriber==account_jid + ) + except StopIteration: + log.warning( + "Received a follow accept on a node without subscription: " + f"{node!r} at {client.jid}. Ignoring it" + ) + else: + if sub.state == SubscriptionState.SUBSCRIBED: + log.warning(f"Already subscribed to {node!r} at {client.jid}") + elif sub.state == SubscriptionState.PENDING: + follow_node.subscribed = True + sub.state = SubscriptionState.SUBSCRIBED + await self.apg.host.memory.storage.add(follow_node) + else: + raise exceptions.InternalError( + f"Unhandled subscription state {sub.state!r}" + ) + else: + log.warning(f"Unmanaged accept type: {type_!r}") + + async def handleCreateActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + digest = request.getHeader("digest") + if digest in self._seen_digest: + log.debug(f"Ignoring duplicated request (digest: {digest!r})") + return + self._seen_digest.append(digest) + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + sender = await self.apg.apGetSenderActor(obj) + if sender != signing_actor: + log.warning( + "Ignoring object not attributed to signing actor: {obj}" + ) + else: + await self.apg.newAPItem(client, account_jid, node, obj) + async def APActorRequest( self, request: "HTTPRequest", account_jid: jid.JID, node: Optional[str], ap_account: str, - actor_url: str + actor_url: str, + signing_actor: Optional[str] ) -> dict: inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) + shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) # we have to use AP account as preferredUsername because it is used to retrieve @@ -107,7 +238,10 @@ "id": f"{actor_url}#main-key", "owner": actor_url, "publicKeyPem": self.apg.public_key_pem - } + }, + "endpoints": { + "sharedInbox": shared_inbox + }, } def getCanonicalURL(self, request: "HTTPRequest") -> str: @@ -206,7 +340,8 @@ account_jid: jid.JID, node: Optional[str], ap_account: str, - ap_url: str + ap_url: str, + signing_actor: Optional[str] ) -> dict: if node is None: node = self.apg._m.namespace @@ -229,7 +364,8 @@ service=account_jid, node=node, max_items=0, - rsm_request=rsm.RSMRequest(max_=0) + rsm_request=rsm.RSMRequest(max_=0), + extra = {C.KEY_USE_CACHE: False} ) except error.StanzaError as e: log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") @@ -255,22 +391,268 @@ "last": url_last_page, } - async def APRequest(self, request): + async def APInboxRequest( + self, + request: "HTTPRequest", + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: Optional[str] + ) -> None: + if signing_actor is None: + raise exceptions.InternalError("signing_actor must be set for inbox requests") + if node is None: + node = self.apg._m.namespace + try: + data = json.load(request.content) + if not isinstance(data, dict): + raise ValueError("data should be an object") + except (json.JSONDecodeError, ValueError) as e: + return self.responseCode( + request, + http.BAD_REQUEST, + f"invalid json in inbox request: {e}" + ) + await self.checkSigningActor(data, signing_actor) + activity_type = (data.get("type") or "").lower() + if not activity_type in ACTIVITY_TYPES_LOWER: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"request is not an activity, ignoring" + ) + + if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"{activity_type.title()!r} activity must target an account" + ) + + try: + method = getattr(self, f"handle{activity_type.title()}Activity") + except AttributeError: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"{activity_type.title()} activity is not yet supported" + ) + else: + await method( + request, data, account_jid, node, ap_account, ap_url, signing_actor + ) + + async def APRequest( + self, + request: "HTTPRequest", + signing_actor: Optional[str] = None + ) -> None: path = request.path.decode() ap_url = parse.urljoin( f"https://{self.apg.public_url}", path ) - request_type, ap_account = self.apg.parseAPURL(ap_url) - account_jid, node = await self.apg.getJIDAndNode(ap_account) - if request_type not in AP_REQUEST_TYPES: - raise exceptions.DataError(f"Invalid request type: {request_type!r}") - method = getattr(self, f"AP{request_type.title()}Request") - ret_data = await method(request, account_jid, node, ap_account, ap_url) - request.setHeader("content-type", CONTENT_TYPE_AP) - request.write(json.dumps(ret_data).encode()) + request_type, extra_args = self.apg.parseAPURL(ap_url) + if len(extra_args) == 0: + if request_type != "shared_inbox": + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + ret_data = await self.APInboxRequest( + request, None, None, None, ap_url, signing_actor + ) + else: + if len(extra_args) > 1: + log.warning(f"unexpected extra arguments: {extra_args!r}") + ap_account = extra_args[0] + account_jid, node = await self.apg.getJIDAndNode(ap_account) + if request_type not in AP_REQUEST_TYPES.get( + request.method.decode().upper(), [] + ): + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + method = getattr(self, f"AP{request_type.title()}Request") + ret_data = await method( + request, account_jid, node, ap_account, ap_url, signing_actor + ) + if ret_data is not None: + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(ret_data).encode()) request.finish() + async def APPostRequest(self, request: "HTTPRequest"): + try: + signing_actor = await self.checkSignature(request) + except exceptions.EncryptionError as e: + self.responseCode( + request, + http.FORBIDDEN, + f"invalid signature: {e}" + ) + request.finish() + return + + return await self.APRequest(request, signing_actor) + + async def checkSigningActor(self, data: dict, signing_actor: str) -> None: + """That that signing actor correspond to actor declared in data + + @param data: request payload + @param signing_actor: actor ID of the signing entity, as returned by + checkSignature + @raise exceptions.NotFound: no actor found in data + @raise exceptions.EncryptionError: signing actor doesn't match actor in data + """ + actor = await self.apg.apGetSenderActor(data) + + if signing_actor != actor: + raise exceptions.EncryptionError( + f"signing actor ({signing_actor}) doesn't match actor in data ({actor})" + ) + + async def checkSignature(self, request: "HTTPRequest") -> str: + """Check and validate HTTP signature + + @return: id of the signing actor + + @raise exceptions.EncryptionError: signature is not present or doesn't match + """ + signature = request.getHeader("Signature") + if signature is None: + raise exceptions.EncryptionError("No signature found") + sign_data = { + m["key"]: m["uq_value"] or m["quoted_value"][1:-1] + for m in RE_SIG_PARAM.finditer(signature) + } + try: + key_id = sign_data["keyId"] + except KeyError: + raise exceptions.EncryptionError('"keyId" is missing from signature') + algorithm = sign_data.get("algorithm", HS2019) + signed_headers = sign_data.get( + "headers", + "(created)" if algorithm==HS2019 else "date" + ).lower().split() + try: + headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method] + except KeyError: + raise exceptions.InternalError( + f"there should be a list of headers for {request.method} method" + ) + if not headers_to_check: + raise exceptions.InternalError("headers_to_check must not be empty") + + for header in headers_to_check: + if isinstance(header, tuple): + if len(set(header).intersection(signed_headers)) == 0: + raise exceptions.EncryptionError( + f"at least one of following header must be signed: {header}" + ) + elif header not in signed_headers: + raise exceptions.EncryptionError( + f"the {header!r} header must be signed" + ) + + body = request.content.read() + request.content.seek(0) + headers = {} + for to_sign in signed_headers: + if to_sign == "(request-target)": + method = request.method.decode().lower() + uri = parse.unquote(request.uri.decode()) + headers[to_sign] = f"{method} /{uri.lstrip('/')}" + elif to_sign in ("(created)", "(expires)"): + if algorithm != HS2019: + raise exceptions.EncryptionError( + f"{to_sign!r} pseudo-header can only be used with {HS2019} " + "algorithm" + ) + key = to_sign[1:-1] + value = sign_data.get(key) + if not value: + raise exceptions.EncryptionError( + "{key!r} parameter is missing from signature" + ) + try: + if float(value) < 0: + raise ValueError + except ValueError: + raise exceptions.EncryptionError( + f"{to_sign} must be a Unix timestamp" + ) + headers[to_sign] = value + else: + value = request.getHeader(to_sign) + if not value: + raise exceptions.EncryptionError( + f"value of header {to_sign!r} is missing!" + ) + elif to_sign == "host": + # we check Forwarded/X-Forwarded-Host headers + # as we need original host if a proxy has modified the header + forwarded = request.getHeader("forwarded") + if forwarded is not None: + try: + host = [ + f[5:] for f in forwarded.split(";") + if f.startswith("host=") + ][0] or None + except IndexError: + host = None + else: + host = None + if host is None: + host = request.getHeader("x-forwarded-host") + if host: + value = host + elif to_sign == "digest": + hashes = { + algo.lower(): hash_ for algo, hash_ in ( + digest.split("=", 1) for digest in value.split(",") + ) + } + try: + given_digest = hashes["sha-256"] + except KeyError: + raise exceptions.EncryptionError( + "Only SHA-256 algorithm is currently supported for digest" + ) + __, computed_digest = self.apg.getDigest(body) + if given_digest != computed_digest: + raise exceptions.EncryptionError( + f"SHA-256 given and computed digest differ:\n" + f"given: {given_digest!r}\ncomputed: {computed_digest!r}" + ) + headers[to_sign] = value + + # date check + limit_ts = time.time() + SIGN_EXP + if "(created)" in headers: + created = float(headers["created"]) + else: + created = date_utils.date_parse(headers["date"]) + + + try: + expires = float(headers["expires"]) + except KeyError: + pass + else: + if expires < created: + log.warning( + f"(expires) [{expires}] set in the past of (created) [{created}] " + "ignoring it according to specs" + ) + else: + limit_ts = min(limit_ts, expires) + + if created > limit_ts: + raise exceptions.EncryptionError("Signature has expired") + + return await self.apg.checkSignature( + sign_data["signature"], + key_id, + headers + ) + def render(self, request): request.setHeader("server", VERSION) return super().render(request) @@ -286,6 +668,13 @@ return web_resource.NoResource().render(request) + def render_POST(self, request): + path = request.path.decode().lstrip("/") + if not path.startswith(self.apg.ap_path): + return web_resource.NoResource().render(request) + defer.ensureDeferred(self.APPostRequest(request)) + return server.NOT_DONE_YET + class HTTPRequest(server.Request): pass
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Tue Mar 22 17:00:42 2022 +0100 @@ -16,19 +16,41 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional, List +from typing import Optional, Tuple, List, Dict, Any +from twisted.internet import defer from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish -from wokkel import rsm +from wokkel import rsm, pubsub, data_form from sat.core.i18n import _ +from sat.core import exceptions from sat.core.log import getLogger +from sat.core.constants import Const as C from sat.tools.utils import ensure_deferred +from sat.memory.sqla_mapping import PubsubSub, SubscriptionState + +from .constants import ( + TYPE_ACTOR, +) log = getLogger(__name__) +# all nodes have the same config +NODE_CONFIG = [ + {"var": "pubsub#persist_items", "type": "boolean", "value": True}, + {"var": "pubsub#max_items", "value": "max"}, + {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, + {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, + +] + +NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} +NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} +for c in NODE_CONFIG: + NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) + class APPubsubService(rsm.PubSubService): """Pubsub service for XMPP requests""" @@ -43,6 +65,31 @@ "name": "Libervia ActivityPub Gateway", } + async def getAPActorIdsAndInbox( + self, + requestor: jid.JID, + recipient: jid.JID, + ) -> Tuple[str, str, str]: + """Get AP actor IDs from requestor and destinee JIDs + + @param requestor: XMPP entity doing a request to an AP actor via the gateway + @param recipient: JID mapping an AP actor via the gateway + @return: requestor actor ID, recipient actor ID and recipient inbox + @raise error.StanzaError: "item-not-found" is raised if not user part is specified + in requestor + """ + if not recipient.user: + raise error.StanzaError( + "item-not-found", + text="No user part specified" + ) + requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) + recipient_account = self.apg._e.unescape(recipient.user) + recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) + inbox = await self.apg.getAPInboxFromId(recipient_actor_id) + return requestor_actor_id, recipient_actor_id, inbox + + @ensure_deferred async def publish(self, requestor, service, nodeIdentifier, items): raise NotImplementedError @@ -56,55 +103,193 @@ maxItems: Optional[int], itemIdentifiers: Optional[List[str]], rsm_req: Optional[rsm.RSMRequest] - ) -> List[domish.Element]: + ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: if not service.user: - return [] + return [], None ap_account = self.host.plugins["XEP-0106"].unescape(service.user) if ap_account.count("@") != 1: log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") - return [] - if node != self.apg._m.namespace: + return [], None + if not node.startswith(self.apg._m.namespace): raise error.StanzaError( "feature-not-implemented", - text=f"{VERSION} only supports {self.apg._m.namespace} " + text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " "node for now" ) - if rsm_req is None: - if maxItems is None: - maxItems = 20 - kwargs = { - "max_items": maxItems, - "chronological_pagination": False, - } + client = self.apg.client + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + # TODO: check if node is synchronised + if cached_node is not None: + # the node is cached, we return items from cache + log.debug(f"node {node!r} from {service} is in cache") + pubsub_items, metadata = await self.apg._c.getItemsFromCache( + client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req + ) + try: + rsm_resp = rsm.RSMResponse(**metadata["rsm"]) + except KeyError: + rsm_resp = None + return [i.data for i in pubsub_items], rsm_resp + + if itemIdentifiers: + items = [] + for item_id in itemIdentifiers: + item_data = await self.apg.apGet(item_id) + item_elt = await self.apg.apItem2Elt(item_data) + items.append(item_elt) + return items, None else: - if len( - [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) - if v is not None] - ) > 1: + if rsm_req is None: + if maxItems is None: + maxItems = 20 + kwargs = { + "max_items": maxItems, + "chronological_pagination": False, + } + else: + if len( + [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) + if v is not None] + ) > 1: + raise error.StanzaError( + "bad-request", + text="You can't use after, before and index at the same time" + ) + kwargs = {"max_items": rsm_req.max} + if rsm_req.after is not None: + kwargs["after_id"] = rsm_req.after + elif rsm_req.before is not None: + kwargs["chronological_pagination"] = False + if rsm_req.before != "": + kwargs["after_id"] = rsm_req.before + elif rsm_req.index is not None: + kwargs["start_index"] = rsm_req.index + + log.info( + f"No cache found for node {node} at {service} (AP account {ap_account}), " + "using Collection Paging to RSM translation" + ) + if self.apg._m.isCommentsNode(node): + parent_node = self.apg._m.getParentNode(node) + try: + parent_data = await self.apg.apGet(parent_node) + collection = await self.apg.apGetObject( + parent_data.get("object", {}), + "replies" + ) + except Exception as e: + raise error.StanzaError( + "item-not-found", + text=e + ) + else: + actor_data = await self.apg.getAPActorDataFromAccount(ap_account) + collection = await self.apg.apGetObject(actor_data, "outbox") + if not collection: raise error.StanzaError( - "bad-request", - text="You can't use after, before and index at the same time" + "item-not-found", + text=f"No collection found for node {node!r} (account: {ap_account})" ) - kwargs = {"max_items": rsm_req.max} - if rsm_req.after is not None: - kwargs["after_id"] = rsm_req.after - elif rsm_req.before is not None: - kwargs["chronological_pagination"] = False - if rsm_req.before != "": - kwargs["after_id"] = rsm_req.before - elif rsm_req.index is not None: - kwargs["start_index"] = rsm_req.index - - log.info( - f"No cache found for node {node} at {service} (AP account {ap_account}), " - "using Collection Paging to RSM translation" - ) - return await self.apg.getAPItems(ap_account, **kwargs) + return await self.apg.getAPItems(collection, **kwargs) @ensure_deferred async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): raise NotImplementedError + @ensure_deferred + async def subscribe(self, requestor, service, nodeIdentifier, subscriber): + # TODO: handle comments nodes + client = self.apg.client + node = await self.host.memory.storage.getPubsubNode( + client, service, nodeIdentifier, with_subscriptions=True + ) + if node is None: + node = await self.host.memory.storage.setPubsubNode( + client, + service, + nodeIdentifier, + ) + subscription = None + else: + try: + subscription = next( + s for s in node.subscriptions + if s.subscriber == requestor.userhostJID() + ) + except StopIteration: + subscription = None + + if subscription is None: + subscription = PubsubSub( + subscriber=requestor.userhostJID(), + state=SubscriptionState.PENDING + ) + node.subscriptions.append(subscription) + await self.host.memory.storage.add(node) + else: + if subscription.state is None: + subscription.state = SubscriptionState.PENDING + await self.host.memory.storage.add(node) + elif subscription.state == SubscriptionState.SUBSCRIBED: + log.info( + f"{requestor.userhostJID()} has already a subscription to {node!r} " + f"at {service}. Doing the request anyway." + ) + elif subscription.state == SubscriptionState.PENDING: + log.info( + f"{requestor.userhostJID()} has already a pending subscription to " + f"{node!r} at {service}. Doing the request anyway." + ) + else: + raise exceptions.InternalError( + f"unmanaged subscription state: {subscription.state}" + ) + + req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( + requestor, service + ) + + data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) + + resp = await self.apg.signAndPost(inbox, req_actor_id, data) + if resp.code >= 400: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") + + @ensure_deferred + async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( + requestor, service + ) + data = self.apg.createActivity( + "Undo", + req_actor_id, + self.apg.createActivity( + "Follow", + req_actor_id, + recip_actor_id + ) + ) + + resp = await self.apg.signAndPost(inbox, req_actor_id, data) + if resp.code >= 400: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + + def getConfigurationOptions(self): + return NODE_OPTIONS + + def getConfiguration( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str + ) -> defer.Deferred: + return defer.succeed(NODE_CONFIG_VALUES) + def getNodeInfo( self, requestor: jid.JID, @@ -117,13 +302,6 @@ return None info = { "type": "leaf", - "meta-data": [ - {"var": "pubsub#persist_items", "type": "boolean", "value": True}, - {"var": "pubsub#max_items", "value": "max"}, - {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, - {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, - - ] - + "meta-data": NODE_CONFIG } return info