Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and threads management:
- HTTP signature is checked for incoming messages
- AP actor can now be followed using pubsub subscription. When following is accepted, the
node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation will come
to explain this).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | bf0505d41c09 |
children | 125c7043b277 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue Mar 22 17:00:42 2022 +0100 @@ -17,25 +17,28 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 +import calendar import hashlib import json from pathlib import Path -from typing import Optional, Dict, Tuple, List, Union +from pprint import pformat +import re +from typing import Any, Dict, List, Optional, Tuple, Union, overload from urllib import parse -import calendar -import re -import dateutil +from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding +import dateutil import shortuuid +from sqlalchemy.exc import IntegrityError import treq from treq.response import _Response as TReqResponse from twisted.internet import defer, reactor, threads from twisted.web import http -from twisted.words.protocols.jabber import jid, error +from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish from wokkel import rsm @@ -44,12 +47,25 @@ from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger +from sat.memory.sqla_mapping import PubsubSub, SubscriptionState from sat.tools import utils -from sat.tools.common import data_format, tls +from sat.tools.common import data_format, tls, uri from sat.tools.common.async_utils import async_lru -from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, - AP_MB_MAP, LRU_MAX_SIZE) +from .constants import ( + ACTIVITY_OBJECT_MANDATORY, + ACTIVITY_TARGET_MANDATORY, + ACTIVITY_TYPES, + ACTIVITY_TYPES_LOWER, + AP_MB_MAP, + COMMENTS_MAX_PARENTS, + CONF_SECTION, + IMPORT_NAME, + LRU_MAX_SIZE, + MEDIA_TYPE_AP, + TYPE_ACTOR, + TYPE_ITEM, +) from .http_server import HTTPServer from .pubsub_service import APPubsubService @@ -64,7 +80,7 @@ C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", C.PI_HANDLER: C.BOOL_TRUE, @@ -86,6 +102,9 @@ self.initialised = False self._m = host.plugins["XEP-0277"] self._p = host.plugins["XEP-0060"] + self._e = host.plugins["XEP-0106"] + self._c = host.plugins["PUBSUB_CACHE"] + self.pubsub_service = APPubsubService(self) host.bridge.addMethod( "APSend", @@ -97,7 +116,7 @@ ) def getHandler(self, __): - return APPubsubService(self) + return self.pubsub_service async def init(self, client): if self.initialised: @@ -165,10 +184,13 @@ 'bad ap-gateay http_connection_type, you must use one of "http" or ' '"https"' ) - self.max_items = self.host.memory.getConfig( + self.max_items = int(self.host.memory.getConfig( CONF_SECTION, 'new_node_max_items', 50 - ) + )) + self.comments_max_depth = int(self.host.memory.getConfig( + CONF_SECTION, 'comments_max_depth', 0 + )) self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") # True (default) if we provide gateway only to entities/services from our server @@ -191,6 +213,25 @@ self.client = client await self.init(client) + async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: + """Get client for this component with a specified jid + + This is needed to perform operations with the virtual JID corresponding to the AP + actor instead of the JID of the gateway itself. + @param actor_id: ID of the actor + @return: virtual client + """ + account = await self.getAPAccountFromId(actor_id) + local_jid = self.getLocalJIDFromAccount(account) + return self.client.getVirtualClient(local_jid) + + def isActivity(self, data: dict) -> bool: + """Return True if the data has an activity type""" + try: + return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER + except (KeyError, TypeError): + return False + async def apGet(self, url: str) -> dict: """Retrieve AP JSON from given URL @@ -514,15 +555,30 @@ return jid_, node - def parseAPURL(self, url: str) -> Tuple[str, str]: + def getLocalJIDFromAccount(self, account: str) -> jid.JID: + """Compute JID linking to an AP account + + The local jid is computer by escaping AP actor handle and using it as local part + of JID, where domain part is this gateway own JID + """ + return jid.JID( + None, + ( + self._e.escape(account), + self.client.jid.host, + None + ) + ) + + def parseAPURL(self, url: str) -> Tuple[str, List[str]]: """Parse an URL leading to an AP endpoint @param url: URL to parse (schema is not mandatory) - @return: endpoint type and AP account + @return: endpoint type and extra arguments """ path = parse.urlparse(url).path.lstrip("/") - type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) - return type_, parse.unquote(account) + type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/", 1) + return type_, [parse.unquote(a) for a in extra_args] def buildAPURL(self, type_:str , *args: str) -> str: """Build an AP endpoint URL @@ -535,42 +591,199 @@ str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) ) - async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: + def buildSignatureHeader(self, values: Dict[str, str]) -> str: + """Build key="<value>" signature header from signature data""" + fields = [] + for key, value in values.items(): + if key not in ("(created)", "(expired)"): + if '"' in value: + raise NotImplementedError( + "string escaping is not implemented, double-quote can't be used " + f"in {value!r}" + ) + value = f'"{value}"' + fields.append(f"{key}={value}") + + return ",".join(fields) + + def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: + """Get digest data to use in header and signature + + @param body: body of the request + @return: hash name and digest + """ + if algo != "SHA-256": + raise NotImplementedError("only SHA-256 is implemented for now") + return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getActorPubKeyData( + self, + actor_id: str + ) -> Tuple[str, str, rsa.RSAPublicKey]: + """Retrieve Public Key data from actor ID + + @param actor_id: actor ID (url) + @return: key_id, owner and public_key + @raise KeyError: publicKey is missing from actor data + """ + actor_data = await self.apGet(actor_id) + pub_key_data = actor_data["publicKey"] + key_id = pub_key_data["id"] + owner = pub_key_data["owner"] + pub_key_pem = pub_key_data["publicKeyPem"] + pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) + return key_id, owner, pub_key + + def createActivity( + self, + activity: str, + actor_id: str, + object_: Optional[Union[str, dict]] = None, + target: Optional[Union[str, dict]] = None, + **kwargs, + ) -> Dict[str, Any]: + """Generate base data for an activity + + @param activity: one of ACTIVITY_TYPES + """ + if activity not in ACTIVITY_TYPES: + raise exceptions.InternalError(f"invalid activity: {activity!r}") + if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: + raise exceptions.InternalError( + f'"object_" is mandatory for activity {activity!r}' + ) + if target is None and activity in ACTIVITY_TARGET_MANDATORY: + raise exceptions.InternalError( + f'"target" is mandatory for activity {activity!r}' + ) + activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" + data: Dict[str, Any] = { + "@context": "https://www.w3.org/ns/activitystreams", + "actor": actor_id, + "id": activity_id, + "type": activity, + } + data.update(kwargs) + if object_ is not None: + data["object"] = object_ + if target is not None: + data["target"] = target + + return data + + def getKeyId(self, actor_id: str) -> str: + """Get local key ID from actor ID""" + return f"{actor_id}#main-key" + + async def checkSignature( + self, + signature: str, + key_id: str, + headers: Dict[str, str] + ) -> str: + """Verify that signature matches given headers + + see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 + + @param signature: Base64 encoded signature + @param key_id: ID of the key used to sign the data + @param headers: headers and their values, including pseudo-headers + @return: id of the signing actor + + @raise InvalidSignature: signature doesn't match headers + """ + to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + if key_id.startswith("acct:"): + actor = key_id[5:] + actor_id = await self.getAPActorIdFromAccount(actor) + else: + actor_id = key_id.split("#", 1)[0] + + pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id) + if pub_key_id != key_id or pub_key_owner != actor_id: + raise exceptions.EncryptionError("Public Key mismatch") + + try: + pub_key.verify( + base64.b64decode(signature), + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + ) + except InvalidSignature: + raise exceptions.EncryptionError("Invalid signature (using PKC0S1 v1.5 and SHA-256)") + + return actor_id + + def getSignatureData( + self, + key_id: str, + headers: Dict[str, str] + ) -> Tuple[Dict[str, str], Dict[str, str]]: + """Generate and return signature and corresponding headers + + @param parsed_url: URL where the request is sent/has been received + @param key_id: ID of the key (URL linking to the data with public key) + @param date: HTTP datetime string of signature generation + @param body: body of the HTTP request + @param headers: headers to sign and their value: + default value will be used if not specified + + @return: headers and signature data + ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers + removed, and ``Signature`` added. + """ + to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + signature = base64.b64encode(self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + )).decode() + sign_data = { + "keyId": key_id, + "Algorithm": "rsa-sha256", + "headers": " ".join(headers.keys()), + "signature": signature + } + new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} + new_headers["Signature"] = self.buildSignatureHeader(sign_data) + return new_headers, sign_data + + async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: """Sign a documentent and post it to AP server @param url: AP server endpoint - @param url_actor: URL generated by this gateway for local actor + @param actor_id: originating actor ID (URL) @param doc: document to send """ p_url = parse.urlparse(url) - date = http.datetimeToString().decode() body = json.dumps(doc).encode() - digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() - digest = f"sha-256={digest_hash}" - to_sign = ( - f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" - f"date: {date}\ndigest: {digest}" + digest_algo, digest_hash = self.getDigest(body) + digest = f"{digest_algo}={digest_hash}" + + headers = { + "(request-target)": f"post {p_url.path}", + "Host": p_url.hostname, + "Date": http.datetimeToString().decode(), + "Digest": digest + } + headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers) + + headers["Content-Type"] = ( + 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' ) - signature = base64.b64encode(self.private_key.sign( - to_sign.encode(), - # we have to use PKCS1v15 padding to be compatible with Mastodon - padding.PKCS1v15(), - hashes.SHA256() - )).decode() - h_signature = ( - f'keyId="{url_actor}",headers="(request-target) host date digest",' - f'signature="{signature}"' - ) - return await treq.post( + resp = await treq.post( url, body, - headers={ - "Host": [p_url.hostname], - "Date": [date], - "Digest": [digest], - "Signature": [h_signature], - } + headers=headers, ) + if resp.code >= 400: + text = await resp.text() + log.warning(f"POST request to {url} failed: {text}") + return resp def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore @@ -578,6 +791,7 @@ client = self.host.getClient(profile) return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) + @async_lru(maxsize=LRU_MAX_SIZE) async def getAPActorIdFromAccount(self, account: str) -> str: """Retrieve account ID from it's handle using WebFinger @@ -611,7 +825,7 @@ ) return href - async def getAPActorDataFromId(self, account: str) -> dict: + async def getAPActorDataFromAccount(self, account: str) -> dict: """Retrieve ActivityPub Actor data @param account: ActivityPub Actor identifier @@ -620,7 +834,13 @@ return await self.apGet(href) @async_lru(maxsize=LRU_MAX_SIZE) - async def getAPAccountFromId(self, actor_id: str): + async def getAPInboxFromId(self, actor_id: str) -> str: + """Retrieve inbox of an actor_id""" + data = await self.apGet(actor_id) + return data["inbox"] + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getAPAccountFromId(self, actor_id: str) -> str: """Retrieve AP account from the ID URL @param actor_id: AP ID of the actor (URL to the actor data) @@ -648,7 +868,7 @@ async def getAPItems( self, - account: str, + collection: dict, max_items: Optional[int] = None, chronological_pagination: bool = True, after_id: Optional[str] = None, @@ -675,21 +895,18 @@ @return: XMPP Pubsub items and corresponding RSM Response Items are always returned in chronological order in the result """ - actor_data = await self.getAPActorDataFromId(account) - outbox = actor_data.get("outbox") rsm_resp: Dict[str, Union[bool, int]] = {} - if not outbox: - raise exceptions.DataError(f"No outbox found for actor {account}") - outbox_data = await self.apGet(outbox) try: - count = outbox_data["totalItems"] + count = collection["totalItems"] except KeyError: log.warning( - f'"totalItems" not found in outbox of {account}, defaulting to 20' + f'"totalItems" not found in collection {collection.get("id")}, ' + "defaulting to 20" ) count = 20 else: - log.info(f"{account}'s outbox has {count} item(s)") + log.info(f"{collection.get('id')} has {count} item(s)") + rsm_resp["count"] = count if start_index is not None: @@ -710,7 +927,7 @@ # before "start_index" previous_index = start_index - 1 retrieved_items = 0 - current_page = outbox_data["last"] + current_page = collection["last"] while retrieved_items < count: page_data, items = await self.parseAPPage(current_page) if not items: @@ -734,10 +951,11 @@ ) init_page = "last" if chronological_pagination else "first" - page = outbox_data.get(init_page) + page = collection.get(init_page) if not page: raise exceptions.DataError( - f"Initial page {init_page!r} not found for outbox {outbox}" + f"Initial page {init_page!r} not found for collection " + f"{collection.get('id')})" ) items = [] page_items = [] @@ -746,6 +964,8 @@ while retrieved_items < count: __, page_items = await self.parseAPPage(page) + if not page_items: + break retrieved_items += len(page_items) if after_id is not None and not found_after_id: # if we have an after_id, we ignore all items until the requested one is @@ -754,7 +974,8 @@ limit_idx = [i["id"] for i in page_items].index(after_id) except ValueError: # if "after_id" is not found, we don't add any item from this page - log.debug(f"{after_id!r} not found at {page}, skipping") + page_id = page.get("id") if isinstance(page, dict) else page + log.debug(f"{after_id!r} not found at {page_id}, skipping") else: found_after_id = True if chronological_pagination: @@ -773,25 +994,24 @@ else: items = items[-max_items:] break - page = outbox_data.get("prev" if chronological_pagination else "next") + page = collection.get("prev" if chronological_pagination else "next") if not page: break if after_id is not None and not found_after_id: raise error.StanzaError("item-not-found") - if after_id is None: - rsm_resp["index"] = 0 if chronological_pagination else count - len(items) - - if start_index is not None: - rsm_resp["index"] = start_index - elif after_id is not None: - log.warning("Can't determine index of first element") - elif chronological_pagination: - rsm_resp["index"] = 0 - else: - rsm_resp["index"] = count - len(items) if items: + if after_id is None: + rsm_resp["index"] = 0 if chronological_pagination else count - len(items) + if start_index is not None: + rsm_resp["index"] = start_index + elif after_id is not None: + log.warning("Can't determine index of first element") + elif chronological_pagination: + rsm_resp["index"] = 0 + else: + rsm_resp["index"] = count - len(items) rsm_resp.update({ "first": items[0]["id"], "last": items[-1]["id"] @@ -799,55 +1019,119 @@ return items, rsm.RSMResponse(**rsm_resp) - async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: + async def parseAPPage( + self, + page: Union[str, dict] + ) -> Tuple[dict, List[domish.Element]]: """Convert AP objects from an AP page to XMPP items - @param url: url linking and AP page + @param page: Can be either url linking and AP page, or the page data directly @return: page data, pubsub items """ - page_data = await self.apGet(url) - ap_items = page_data.get("orderedItems") - if not ap_items: - log.warning('No "orderedItems" collection found') - return page_data, [] + page_data = await self.apGetObject(page) + if page_data is None: + log.warning('No data found in collection') + return {}, [] + ap_items = await self.apGetList(page_data, "orderedItems") + if ap_items is None: + ap_items = await self.apGetList(page_data, "items") + if not ap_items: + log.warning(f'No item field found in collection: {page_data!r}') + return page_data, [] + else: + log.warning( + "Items are not ordered, this is not spec compliant" + ) items = [] # AP Collections are in antichronological order, but we expect chronological in # Pubsub, thus we reverse it for ap_item in reversed(ap_items): try: - ap_object, mb_data = await self.apItem2MBdata(ap_item) + items.append(await self.apItem2Elt(ap_item)) except (exceptions.DataError, NotImplementedError, error.StanzaError): continue - item_elt = await self._m.data2entry( - self.client, mb_data, ap_object["id"], None, self._m.namespace - ) - item_elt["publisher"] = mb_data["author_jid"].full() - items.append(item_elt) - return page_data, items - async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: - """Convert AP item to microblog data + async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: + """Convert AP item to parsed microblog data and corresponding item element""" + mb_data = await self.apItem2MBdata(ap_item) + item_elt = await self._m.data2entry( + self.client, mb_data, mb_data["id"], None, self._m.namespace + ) + item_elt["publisher"] = mb_data["author_jid"] + return mb_data, item_elt + + async def apItem2Elt(self, ap_item: dict) -> domish.Element: + """Convert AP item to XMPP item element""" + __, item_elt = await self.apItem2MbDataAndElt(ap_item) + return item_elt + + async def getCommentsNodes( + self, + item_id: str, + parent_id: Optional[str] + ) -> Tuple[Optional[str], Optional[str]]: + """Get node where this item is and node to use for comments + + if config option "comments_max_depth" is set, a common node will be used below the + given depth + @param item_id: ID of the reference item + @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") + @return: a tuple with parent_node_id, comments_node_id: + - parent_node_id is the ID of the node where reference item must be. None is + returned when the root node (i.e. not a comments node) must be used. + - comments_node_id: is the ID of the node to use for comments. None is + returned when no comment node must be used (happens when we have reached + "comments_max_depth") + """ + if parent_id is None or not self.comments_max_depth: + return ( + self._m.getCommentsNode(parent_id) if parent_id is not None else None, + self._m.getCommentsNode(item_id) + ) + parent_url = parent_id + parents = [] + for __ in range(COMMENTS_MAX_PARENTS): + parent_item = await self.apGet(parent_url) + parents.insert(0, parent_item) + parent_url = parent_item.get("inReplyTo") + if parent_url is None: + break + parent_limit = self.comments_max_depth-1 + if len(parents) <= parent_limit: + return ( + self._m.getCommentsNode(parents[-1]["id"]), + self._m.getCommentsNode(item_id) + ) + else: + last_level_item = parents[parent_limit] + return ( + self._m.getCommentsNode(last_level_item["id"]), + None + ) + + async def apItem2MBdata(self, ap_item: dict) -> dict: + """Convert AP activity or object to microblog data @return: AP Item's Object and microblog data @raise exceptions.DataError: something is invalid in the AP item @raise NotImplemented: some AP data is not handled yet @raise error.StanzaError: error while contacting the AP server """ - ap_object = ap_item.get("object") - if not ap_object: - log.warning(f'No "object" found in AP item {ap_item!r}') + is_activity = self.isActivity(ap_item) + if is_activity: + ap_object = await self.apGetObject(ap_item, "object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + else: + ap_object = ap_item + item_id = ap_object.get("id") + if not item_id: + log.warning(f'No "id" found in AP item: {ap_object!r}') raise exceptions.DataError - if isinstance(ap_object, str): - ap_object = await self.apGet(ap_object) - obj_id = ap_object.get("id") - if not obj_id: - log.warning(f'No "id" found in AP object: {ap_object!r}') - raise exceptions.DataError - if ap_object.get("inReplyTo") is not None: - raise NotImplementedError - mb_data = {} + mb_data = {"id": item_id} for ap_key, mb_key in AP_MB_MAP.items(): data = ap_object.get(ap_key) if data is None: @@ -868,37 +1152,20 @@ mb_data["content_xhtml"] = content_xhtml # author - actor = ap_item.get("actor") - if not actor: - log.warning(f"no actor associated to object id {obj_id!r}") - raise exceptions.DataError - elif isinstance(actor, list): - # we only keep first item of list as author + if is_activity: + authors = await self.apGetActors(ap_item, "actor") + else: + authors = await self.apGetActors(ap_object, "attributedTo") + if len(authors) > 1: + # we only keep first item as author # TODO: handle multiple actors - if len(actor) > 1: - log.warning("multiple actors are not managed") - actor = actor[0] + log.warning("multiple actors are not managed") - if isinstance(actor, dict): - actor = actor.get("id") - if not actor: - log.warning(f"no actor id found: {actor!r}") - raise exceptions.DataError + account = authors[0] + author_jid = self.getLocalJIDFromAccount(account).full() - if isinstance(actor, str): - account = await self.getAPAccountFromId(actor) - mb_data["author"] = account.split("@", 1)[0] - author_jid = mb_data["author_jid"] = jid.JID( - None, - ( - self.host.plugins["XEP-0106"].escape(account), - self.client.jid.host, - None - ) - ) - else: - log.warning(f"unknown actor type found: {actor!r}") - raise exceptions.DataError + mb_data["author"] = account.split("@", 1)[0] + mb_data["author_jid"] = author_jid # published/updated for field in ("published", "updated"): @@ -912,14 +1179,30 @@ ) except dateutil.parser.ParserError as e: log.warning(f"Can't parse {field!r} field: {e}") - return ap_object, mb_data + + # comments + in_reply_to = ap_object.get("inReplyTo") + __, comments_node = await self.getCommentsNodes(item_id, in_reply_to) + if comments_node is not None: + comments_data = { + "service": author_jid, + "node": comments_node, + "uri": uri.buildXMPPUri( + "pubsub", + path=author_jid, + node=comments_node + ) + } + mb_data["comments"] = [comments_data] + + return mb_data async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: """Convert Libervia Microblog Data to ActivityPub item""" if not mb_data.get("id"): mb_data["id"] = shortuuid.uuid() if not mb_data.get("author_jid"): - mb_data["author_jid"] = client.jid + mb_data["author_jid"] = client.jid.full() ap_account = await self.getAPAccountFromJidAndNode( jid.JID(mb_data["author_jid"]), None @@ -967,8 +1250,8 @@ """ if not service.user: raise ValueError("service must have a local part") - account = self.host.plugins["XEP-0106"].unescape(service.user) - ap_actor_data = await self.getAPActorDataFromId(account) + account = self._e.unescape(service.user) + ap_actor_data = await self.getAPActorDataFromAccount(account) try: inbox_url = ap_actor_data["endpoints"]["sharedInbox"] @@ -980,3 +1263,72 @@ resp = await self.signAndPost(inbox_url, url_actor, item_data) if resp.code != 202: raise exceptions.NetworkError(f"unexpected return code: {resp.code}") + + async def newAPItem( + self, + client: SatXMPPEntity, + destinee: Optional[jid.JID], + node: str, + item: dict, + ) -> None: + """Analyse, cache and send notification for received AP item + + @param destinee: jid of the destinee, + @param node: XMPP pubsub node + @param item: AP object payload + """ + service = client.jid + in_reply_to = item.get("inReplyTo") + if in_reply_to and isinstance(in_reply_to, str): + # this item is a reply, we use or create a corresponding node for comments + parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) + node = parent_node or node + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + if cached_node is None: + try: + cached_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + subscribed=True + ) + except IntegrityError as e: + if "unique" in str(e.orig).lower(): + # the node may already exist, if it has been created just after + # getPubsubNode above + log.debug("ignoring UNIQUE constraint error") + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + else: + raise e + + else: + # it is a root item (i.e. not a reply to an other item) + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True + ) + if cached_node is None: + log.warning( + f"Received item in unknown node {node!r} at {service}\n{item}" + + ) + return + mb_data, item_elt = await self.apItem2MbDataAndElt(item) + await self.host.memory.storage.cachePubsubItems( + client, + cached_node, + [item_elt], + [mb_data] + ) + + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.pubsub_service.notifyPublish( + service, + node, + [(subscription.subscriber, None, [item_elt])] + )