diff libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org>
date | Fri, 02 Jun 2023 11:49:51 +0200
parents | sat/plugins/plugin_comp_ap_gateway/__init__.py@c23cad65ae99
children | c3b68fdc2de7
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,2781 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +import calendar +import hashlib +import json +from pathlib import Path +from pprint import pformat +import re +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Union, + overload, +) +from urllib import parse + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import padding +import dateutil +from dateutil.parser import parserinfo +import shortuuid +from sqlalchemy.exc import IntegrityError +import treq +from treq.response import _Response as TReqResponse +from twisted.internet import defer, reactor, threads +from twisted.web import http +from twisted.words.protocols.jabber import error, jid +from twisted.words.xish import domish +from wokkel import pubsub, rsm + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.memory import persistent +from libervia.backend.memory.sqla_mapping import History, SubscriptionState +from libervia.backend.tools import utils +from libervia.backend.tools.common import data_format, date_utils, tls, uri +from libervia.backend.tools.common.async_utils import async_lru + +from .ad_hoc import APAdHocService +from .events import APEvents +from .constants import ( + ACTIVITY_OBJECT_MANDATORY, + ACTIVITY_TARGET_MANDATORY, + ACTIVITY_TYPES, + ACTIVITY_TYPES_LOWER, + COMMENTS_MAX_PARENTS, + CONF_SECTION, + IMPORT_NAME, + LRU_MAX_SIZE, + MEDIA_TYPE_AP, + NS_AP, + NS_AP_PUBLIC, + PUBLIC_TUPLE, + TYPE_ACTOR, + TYPE_EVENT, + TYPE_FOLLOWERS, + TYPE_ITEM, + TYPE_LIKE, + TYPE_MENTION, + TYPE_REACTION, + TYPE_TOMBSTONE, + TYPE_JOIN, + TYPE_LEAVE +) +from .http_server import HTTPServer +from .pubsub_service import APPubsubService +from .regex import RE_MENTION + + +log = getLogger(__name__) + +IMPORT_NAME = "ap-gateway" + +PLUGIN_INFO = { + C.PI_NAME: "ActivityPub Gateway component", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_MODES: [C.PLUG_MODE_COMPONENT], + C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: [ + "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", + "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", + "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY" + 
], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "APGateway", + C.PI_HANDLER: C.BOOL_TRUE, + C.PI_DESCRIPTION: _( + "Gateway for bidirectional communication between XMPP and ActivityPub." + ), +} + +HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})" +RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") +RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") +RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") + + +class APGateway: + IMPORT_NAME = IMPORT_NAME + # show data send or received through HTTP, used for debugging + # 1: log POST objects + # 2: log POST and GET objects + # 3: log POST and GET objects with HTTP headers for GET requests + verbose = 0 + + def __init__(self, host): + self.host = host + self.initialised = False + self.client = None + self._p = host.plugins["XEP-0060"] + self._a = host.plugins["XEP-0084"] + self._e = host.plugins["XEP-0106"] + self._m = host.plugins["XEP-0277"] + self._v = host.plugins["XEP-0292"] + self._refs = host.plugins["XEP-0372"] + self._r = host.plugins["XEP-0424"] + self._sfs = host.plugins["XEP-0447"] + self._pps = host.plugins["XEP-0465"] + self._pa = host.plugins["XEP-0470"] + self._c = host.plugins["PUBSUB_CACHE"] + self._t = host.plugins["TEXT_SYNTAXES"] + self._i = host.plugins["IDENTITY"] + self._events = host.plugins["XEP-0471"] + self._p.add_managed_node( + "", + items_cb=self._items_received, + # we want to be sure that the callbacks are launched before pubsub cache's + # one, as we need to inspect items before they are actually removed from cache + # or updated + priority=1000 + ) + self.pubsub_service = APPubsubService(self) + self.ad_hoc = APAdHocService(self) + self.ap_events = APEvents(self) + host.trigger.add("message_received", self._message_received_trigger, priority=-1000) + host.trigger.add("XEP-0424_retractReceived", self._on_message_retract) + host.trigger.add("XEP-0372_ref_received", self._on_reference_received) + + host.bridge.add_method( + "ap_send", + ".plugin", + in_sign="sss", + out_sign="", + method=self._publish_message, + async_=True, + ) + + def get_handler(self, __): + return self.pubsub_service + + async def init(self, client): + if self.initialised: + return + + self.initialised = True + log.info(_("ActivityPub Gateway initialization")) + + # RSA keys + stored_data = await self.host.memory.storage.get_privates( + IMPORT_NAME, ["rsa_key"], profile=client.profile + ) + private_key_pem = stored_data.get("rsa_key") + if private_key_pem is None: + self.private_key = await threads.deferToThread( + rsa.generate_private_key, + public_exponent=65537, + key_size=4096, + ) + private_key_pem = self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ).decode() + await self.host.memory.storage.set_private_value( + IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile + ) + else: + self.private_key = serialization.load_pem_private_key( + private_key_pem.encode(), + password=None, + ) + self.public_key = self.private_key.public_key() + self.public_key_pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ).decode() + + # params + # URL and port + self.public_url = self.host.memory.config_get( + CONF_SECTION, "public_url" + ) or self.host.memory.config_get( + CONF_SECTION, "xmpp_domain" + ) + if self.public_url is None: + log.error( + '"public_url" not set in configuration, this is mandatory to have' + "ActivityPub Gateway running. 
Please set this option it to public facing " + f"url in {CONF_SECTION!r} configuration section." + ) + return + if parse.urlparse(self.public_url).scheme: + log.error( + "Scheme must not be specified in \"public_url\", please remove it from " + "\"public_url\" configuration option. ActivityPub Gateway won't be run." + ) + return + self.http_port = int(self.host.memory.config_get( + CONF_SECTION, 'http_port', 8123)) + connection_type = self.host.memory.config_get( + CONF_SECTION, 'http_connection_type', 'https') + if connection_type not in ('http', 'https'): + raise exceptions.ConfigError( + 'bad ap-gateay http_connection_type, you must use one of "http" or ' + '"https"' + ) + self.max_items = int(self.host.memory.config_get( + CONF_SECTION, 'new_node_max_items', 50 + + )) + self.comments_max_depth = int(self.host.memory.config_get( + CONF_SECTION, 'comments_max_depth', 0 + )) + self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap') + self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") + # True (default) if we provide gateway only to entities/services from our server + self.local_only = C.bool( + self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) + ) + # if True (default), mention will be parsed in non-private content coming from + # XMPP. This is necessary as XEP-0372 are coming separately from item where the + # mention is done, which is hard to impossible to translate to ActivityPub (where + # mention specified inside the item directly). See documentation for details. + self.auto_mentions = C.bool( + self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) + ) + + html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( + CONF_SECTION, 'html_redirect_dict', {} + ) + self.html_redirect: Dict[str, List[dict]] = {} + for url_type, target in html_redirect.items(): + if isinstance(target, str): + target = {"url": target} + elif not isinstance(target, dict): + raise exceptions.ConfigError( + f"html_redirect target must be a URL or a dict, not {target!r}" + ) + filters = target.setdefault("filters", {}) + if "url" not in target: + log.warning(f"invalid HTML redirection, missing target URL: {target}") + continue + # a slash in the url_type is a syntactic shortcut to have a node filter + if "/" in url_type: + url_type, node_filter = url_type.split("/", 1) + filters["node"] = node_filter + self.html_redirect.setdefault(url_type, []).append(target) + + # HTTP server launch + self.server = HTTPServer(self) + if connection_type == 'http': + reactor.listenTCP(self.http_port, self.server) + else: + options = tls.get_options_from_config( + self.host.memory.config, CONF_SECTION) + tls.tls_options_check(options) + context_factory = tls.get_tls_context_factory(options) + reactor.listenSSL(self.http_port, self.server, context_factory) + + async def profile_connecting(self, client): + self.client = client + client.sendHistory = True + client._ap_storage = persistent.LazyPersistentBinaryDict( + IMPORT_NAME, + client.profile + ) + await self.init(client) + + def profile_connected(self, client): + self.ad_hoc.init(client) + + async def _items_received( + self, + client: SatXMPPEntity, + itemsEvent: pubsub.ItemsEvent + ) -> None: + """Callback called when pubsub items are received + + if the items are adressed to a JID corresponding to an AP actor, they are + converted to AP items and sent to the corresponding AP server. 
+ + If comments nodes are linked, they are automatically subscribed to get new items + from there too. + """ + if client != self.client: + return + # we need recipient as JID and not gateway own JID to be able to use methods such + # as "subscribe" + client = self.client.get_virtual_client(itemsEvent.sender) + recipient = itemsEvent.recipient + if not recipient.user: + log.debug("ignoring items event without local part specified") + return + + ap_account = self._e.unescape(recipient.user) + + if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): + await self.convert_and_post_attachments( + client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, + itemsEvent.items + ) + else: + await self.convert_and_post_items( + client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, + itemsEvent.items + ) + + async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity: + """Get client for this component with a specified jid + + This is needed to perform operations with the virtual JID corresponding to the AP + actor instead of the JID of the gateway itself. + @param actor_id: ID of the actor + @return: virtual client + """ + local_jid = await self.get_jid_from_id(actor_id) + return self.client.get_virtual_client(local_jid) + + def is_activity(self, data: dict) -> bool: + """Return True if the data has an activity type""" + try: + return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER + except (KeyError, TypeError): + return False + + async def ap_get(self, url: str) -> dict: + """Retrieve AP JSON from given URL + + @raise error.StanzaError: "service-unavailable" is sent when something went wrong + with AP server + """ + resp = await treq.get( + url, + headers = { + "Accept": [MEDIA_TYPE_AP], + "Content-Type": [MEDIA_TYPE_AP], + } + ) + if resp.code >= 300: + text = await resp.text() + if resp.code == 404: + raise exceptions.NotFound(f"Can't find resource at {url}") + else: + msg = f"HTTP error {resp.code} (url: {url}): {text}" + raise exceptions.ExternalRequestError(msg) + try: + return await treq.json_content(resp) + except Exception as e: + raise error.StanzaError( + "service-unavailable", + text=f"Can't get AP data at {url}: {e}" + ) + + @overload + async def ap_get_object(self, data: dict, key: str) -> Optional[dict]: + ... + + @overload + async def ap_get_object( + self, data: Union[str, dict], key: None = None + ) -> dict: + ... 
+ + async def ap_get_object(self, data, key = None): + """Retrieve an AP object, dereferencing when necessary + + This method is to be used with attributes marked as "Functional" in + https://www.w3.org/TR/activitystreams-vocabulary + @param data: AP object where an other object is looked for, or the object itself + @param key: name of the object to look for, or None if data is the object directly + @return: found object if any + """ + if key is not None: + value = data.get(key) + else: + value = data + if value is None: + if key is None: + raise ValueError("None can't be used with ap_get_object is key is None") + return None + elif isinstance(value, dict): + return value + elif isinstance(value, str): + if self.is_local_url(value): + return await self.ap_get_local_object(value) + else: + return await self.ap_get(value) + else: + raise NotImplementedError( + "was expecting a string or a dict, got {type(value)}: {value!r}}" + ) + + async def ap_get_local_object( + self, + url: str + ) -> dict: + """Retrieve or generate local object + + for now, only handle XMPP items to convert to AP + """ + url_type, url_args = self.parse_apurl(url) + if url_type == TYPE_ITEM: + try: + account, item_id = url_args + except ValueError: + raise ValueError(f"invalid URL: {url}") + author_jid, node = await self.get_jid_and_node(account) + if node is None: + node = self._m.namespace + cached_node = await self.host.memory.storage.get_pubsub_node( + self.client, author_jid, node + ) + if not cached_node: + log.debug(f"node {node!r} at {author_jid} is not found in cache") + found_item = None + else: + cached_items, __ = await self.host.memory.storage.get_items( + cached_node, item_ids=[item_id] + ) + if not cached_items: + log.debug( + f"item {item_id!r} of {node!r} at {author_jid} is not found in " + "cache" + ) + found_item = None + else: + found_item = cached_items[0].data + + if found_item is None: + # the node is not in cache, we have to make a request to retrieve the item + # If the node doesn't exist, get_items will raise a NotFound exception + found_items, __ = await self._p.get_items( + self.client, author_jid, node, item_ids=[item_id] + ) + try: + found_item = found_items[0] + except IndexError: + raise exceptions.NotFound(f"requested item at {url} can't be found") + + if node.startswith(self._events.namespace): + # this is an event + event_data = self._events.event_elt_2_event_data(found_item) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid + ) + # the URL must return the object and not the activity + ap_item["object"]["@context"] = ap_item["@context"] + return ap_item["object"] + else: + # this is a blog item + mb_data = await self._m.item_2_mb_data( + self.client, found_item, author_jid, node + ) + ap_item = await self.mb_data_2_ap_item(self.client, mb_data) + # the URL must return the object and not the activity + return ap_item["object"] + else: + raise NotImplementedError( + 'only object from "item" URLs can be retrieved for now' + ) + + async def ap_get_list( + self, + data: dict, + key: str, + only_ids: bool = False + ) -> Optional[List[Dict[str, Any]]]: + """Retrieve a list of objects from AP data, dereferencing when necessary + + This method is to be used with non functional vocabularies. Use ``ap_get_object`` + otherwise. 
+ If the value is a dictionary, it will be wrapped in a list + @param data: AP object where a list of objects is looked for + @param key: key of the list to look for + @param only_ids: if Trye, only items IDs are retrieved + @return: list of objects, or None if the key is not present + """ + value = data.get(key) + if value is None: + return None + elif isinstance(value, str): + if self.is_local_url(value): + value = await self.ap_get_local_object(value) + else: + value = await self.ap_get(value) + if isinstance(value, dict): + return [value] + if not isinstance(value, list): + raise ValueError(f"A list was expected, got {type(value)}: {value!r}") + if only_ids: + return [ + {"id": v["id"]} if isinstance(v, dict) else {"id": v} + for v in value + ] + else: + return [await self.ap_get_object(i) for i in value] + + async def ap_get_actors( + self, + data: dict, + key: str, + as_account: bool = True + ) -> List[str]: + """Retrieve AP actors from data + + @param data: AP object containing a field with actors + @param key: field to use to retrieve actors + @param as_account: if True returns account handles, otherwise will return actor + IDs + @raise exceptions.DataError: there is not actor data or it is invalid + """ + value = data.get(key) + if value is None: + raise exceptions.DataError( + f"no actor associated to object {data.get('id')!r}" + ) + elif isinstance(value, dict): + actor_id = value.get("id") + if actor_id is None: + raise exceptions.DataError( + f"invalid actor associated to object {data.get('id')!r}: {value!r}" + ) + value = [actor_id] + elif isinstance(value, str): + value = [value] + elif isinstance(value, list): + try: + value = [a if isinstance(a, str) else a["id"] for a in value] + except (TypeError, KeyError): + raise exceptions.DataError( + f"invalid actors list to object {data.get('id')!r}: {value!r}" + ) + if not value: + raise exceptions.DataError( + f"list of actors is empty" + ) + if as_account: + return [await self.get_ap_account_from_id(actor_id) for actor_id in value] + else: + return value + + async def ap_get_sender_actor( + self, + data: dict, + ) -> str: + """Retrieve actor who sent data + + This is done by checking "actor" field first, then "attributedTo" field. 
+ Only the first found actor is taken into account + @param data: AP object + @return: actor id of the sender + @raise exceptions.NotFound: no actor has been found in data + """ + try: + actors = await self.ap_get_actors(data, "actor", as_account=False) + except exceptions.DataError: + actors = None + if not actors: + try: + actors = await self.ap_get_actors(data, "attributedTo", as_account=False) + except exceptions.DataError: + raise exceptions.NotFound( + 'actor not specified in "actor" or "attributedTo"' + ) + try: + return actors[0] + except IndexError: + raise exceptions.NotFound("list of actors is empty") + + def must_encode(self, text: str) -> bool: + """Indicate if a text must be period encoded""" + return ( + not RE_ALLOWED_UNQUOTED.match(text) + or text.startswith("___") + or "---" in text + ) + + def period_encode(self, text: str) -> str: + """Period encode a text + + see [get_jid_and_node] for reasons of period encoding + """ + return ( + parse.quote(text, safe="") + .replace("---", "%2d%2d%2d") + .replace("___", "%5f%5f%5f") + .replace(".", "%2e") + .replace("~", "%7e") + .replace("%", ".") + ) + + async def get_ap_account_from_jid_and_node( + self, + jid_: jid.JID, + node: Optional[str] + ) -> str: + """Construct AP account from JID and node + + The account construction will use escaping when necessary + """ + if not node or node == self._m.namespace: + node = None + + if self.client is None: + raise exceptions.InternalError("Client is not set yet") + + if self.is_virtual_jid(jid_): + # this is an proxy JID to an AP Actor + return self._e.unescape(jid_.user) + + if node and not jid_.user and not self.must_encode(node): + is_pubsub = await self.is_pubsub(jid_) + # when we have a pubsub service, the user part can be used to set the node + # this produces more user-friendly AP accounts + if is_pubsub: + jid_.user = node + node = None + + is_local = self.is_local(jid_) + user = jid_.user if is_local else jid_.userhost() + if user is None: + user = "" + account_elts = [] + if node and self.must_encode(node) or self.must_encode(user): + account_elts = ["___"] + if node: + node = self.period_encode(node) + user = self.period_encode(user) + + if not user: + raise exceptions.InternalError("there should be a user part") + + if node: + account_elts.extend((node, "---")) + + account_elts.extend(( + user, "@", jid_.host if is_local else self.client.jid.userhost() + )) + return "".join(account_elts) + + def is_local(self, jid_: jid.JID) -> bool: + """Returns True if jid_ use a domain or subdomain of gateway's host""" + local_host = self.client.host.split(".") + assert local_host + return jid_.host.split(".")[-len(local_host):] == local_host + + async def is_pubsub(self, jid_: jid.JID) -> bool: + """Indicate if a JID is a Pubsub service""" + host_disco = await self.host.get_disco_infos(self.client, jid_) + return ( + ("pubsub", "service") in host_disco.identities + and not ("pubsub", "pep") in host_disco.identities + ) + + async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: + """Decode raw AP account handle to get XMPP JID and Pubsub Node + + Username are case insensitive. + + By default, the username correspond to local username (i.e. username from + component's server). + + If local name's domain is a pubsub service (and not PEP), the username is taken as + a pubsub node. + + If ``---`` is present in username, the part before is used as pubsub node, and the + rest as a JID user part. 
+ + If username starts with ``___``, characters are encoded using period encoding + (i.e. percent encoding where a ``.`` is used instead of ``%``). + + This horror is necessary due to limitation in some AP implementation (notably + Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 + + examples: + + ``toto@example.org`` => JID = toto@example.org, node = None + + ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a + non-local JID, and will work only if setings ``local_only`` is False), node = None + + ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => + JID = pubsub.example.org, node = toto + + ``tata---toto@example.org`` => JID = toto@example.org, node = tata + + ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org + being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 + + @param ap_account: ActivityPub account handle (``username@domain.tld``) + @return: service JID and pubsub node + if pubsub node is None, default microblog pubsub node (and possibly other + nodes that plugins may hanlde) will be used + @raise ValueError: invalid account + @raise PermissionError: non local jid is used when gateway doesn't allow them + """ + if ap_account.count("@") != 1: + raise ValueError("Invalid AP account") + if ap_account.startswith("___"): + encoded = True + ap_account = ap_account[3:] + else: + encoded = False + + username, domain = ap_account.split("@") + + if "---" in username: + node, username = username.rsplit("---", 1) + else: + node = None + + if encoded: + username = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g<hex>", username), + errors="strict" + ) + if node: + node = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g<hex>", node), + errors="strict" + ) + + if "@" in username: + username, domain = username.rsplit("@", 1) + + if not node: + # we need to check host disco, because disco request to user may be + # blocked for privacy reason (see + # https://xmpp.org/extensions/xep-0030.html#security) + is_pubsub = await self.is_pubsub(jid.JID(domain)) + + if is_pubsub: + # if the host is a pubsub service and not a PEP, we consider that username + # is in fact the node name + node = username + username = None + + jid_s = f"{username}@{domain}" if username else domain + try: + jid_ = jid.JID(jid_s) + except RuntimeError: + raise ValueError(f"Invalid jid: {jid_s!r}") + + if self.local_only and not self.is_local(jid_): + raise exceptions.PermissionError( + "This gateway is configured to map only local entities and services" + ) + + return jid_, node + + def get_local_jid_from_account(self, account: str) -> jid.JID: + """Compute JID linking to an AP account + + The local jid is computer by escaping AP actor handle and using it as local part + of JID, where domain part is this gateway own JID + """ + return jid.JID( + None, + ( + self._e.escape(account), + self.client.jid.host, + None + ) + ) + + async def get_jid_from_id(self, actor_id: str) -> jid.JID: + """Compute JID linking to an AP Actor ID + + The local jid is computer by escaping AP actor handle and using it as local part + of JID, where domain part is this gateway own JID + If the actor_id comes from local server (checked with self.public_url), it means + that we have an XMPP entity, and the original JID is returned + """ + if self.is_local_url(actor_id): + request_type, extra_args = self.parse_apurl(actor_id) + if request_type != TYPE_ACTOR or len(extra_args) != 1: + raise ValueError(f"invalid actor id: {actor_id!r}") + actor_jid, 
__ = await self.get_jid_and_node(extra_args[0]) + return actor_jid + + account = await self.get_ap_account_from_id(actor_id) + return self.get_local_jid_from_account(account) + + def parse_apurl(self, url: str) -> Tuple[str, List[str]]: + """Parse an URL leading to an AP endpoint + + @param url: URL to parse (schema is not mandatory) + @return: endpoint type and extra arguments + """ + path = parse.urlparse(url).path.lstrip("/") + type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") + return type_, [parse.unquote(a) for a in extra_args] + + def build_apurl(self, type_:str , *args: str) -> str: + """Build an AP endpoint URL + + @param type_: type of AP endpoing + @param arg: endpoint dependant arguments + """ + return parse.urljoin( + self.base_ap_url, + str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) + ) + + def is_local_url(self, url: str) -> bool: + """Tells if an URL link to this component + + ``public_url`` and ``ap_path`` are used to check the URL + """ + return url.startswith(self.base_ap_url) + + def is_virtual_jid(self, jid_: jid.JID) -> bool: + """Tell if a JID is an AP actor mapped through this gateway""" + return jid_.host == self.client.jid.userhost() + + def build_signature_header(self, values: Dict[str, str]) -> str: + """Build key="<value>" signature header from signature data""" + fields = [] + for key, value in values.items(): + if key not in ("(created)", "(expired)"): + if '"' in value: + raise NotImplementedError( + "string escaping is not implemented, double-quote can't be used " + f"in {value!r}" + ) + value = f'"{value}"' + fields.append(f"{key}={value}") + + return ",".join(fields) + + def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: + """Get digest data to use in header and signature + + @param body: body of the request + @return: hash name and digest + """ + if algo != "SHA-256": + raise NotImplementedError("only SHA-256 is implemented for now") + return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() + + @async_lru(maxsize=LRU_MAX_SIZE) + async def get_actor_data(self, actor_id) -> dict: + """Retrieve actor data with LRU cache""" + return await self.ap_get(actor_id) + + @async_lru(maxsize=LRU_MAX_SIZE) + async def get_actor_pub_key_data( + self, + actor_id: str + ) -> Tuple[str, str, rsa.RSAPublicKey]: + """Retrieve Public Key data from actor ID + + @param actor_id: actor ID (url) + @return: key_id, owner and public_key + @raise KeyError: publicKey is missing from actor data + """ + actor_data = await self.get_actor_data(actor_id) + pub_key_data = actor_data["publicKey"] + key_id = pub_key_data["id"] + owner = pub_key_data["owner"] + pub_key_pem = pub_key_data["publicKeyPem"] + pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) + return key_id, owner, pub_key + + def create_activity( + self, + activity: str, + actor_id: str, + object_: Optional[Union[str, dict]] = None, + target: Optional[Union[str, dict]] = None, + activity_id: Optional[str] = None, + **kwargs, + ) -> Dict[str, Any]: + """Generate base data for an activity + + @param activity: one of ACTIVITY_TYPES + @param actor_id: AP actor ID of the sender + @param object_: content of "object" field + @param target: content of "target" field + @param activity_id: ID to use for the activity + if not set it will be automatically generated, but it is usually desirable to + set the ID manually so it can be retrieved (e.g. 
for Undo) + """ + if activity not in ACTIVITY_TYPES: + raise exceptions.InternalError(f"invalid activity: {activity!r}") + if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: + raise exceptions.InternalError( + f'"object_" is mandatory for activity {activity!r}' + ) + if target is None and activity in ACTIVITY_TARGET_MANDATORY: + raise exceptions.InternalError( + f'"target" is mandatory for activity {activity!r}' + ) + if activity_id is None: + activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" + data: Dict[str, Any] = { + "@context": [NS_AP], + "actor": actor_id, + "id": activity_id, + "type": activity, + } + data.update(kwargs) + if object_ is not None: + data["object"] = object_ + if target is not None: + data["target"] = target + + return data + + def get_key_id(self, actor_id: str) -> str: + """Get local key ID from actor ID""" + return f"{actor_id}#main-key" + + async def check_signature( + self, + signature: str, + key_id: str, + headers: Dict[str, str] + ) -> str: + """Verify that signature matches given headers + + see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 + + @param signature: Base64 encoded signature + @param key_id: ID of the key used to sign the data + @param headers: headers and their values, including pseudo-headers + @return: id of the signing actor + + @raise InvalidSignature: signature doesn't match headers + """ + to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + if key_id.startswith("acct:"): + actor = key_id[5:] + actor_id = await self.get_ap_actor_id_from_account(actor) + else: + actor_id = key_id.split("#", 1)[0] + + pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id) + if pub_key_id != key_id or pub_key_owner != actor_id: + raise exceptions.EncryptionError("Public Key mismatch") + + try: + pub_key.verify( + base64.b64decode(signature), + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + ) + except InvalidSignature: + raise exceptions.EncryptionError( + "Invalid signature (using PKC0S1 v1.5 and SHA-256)" + ) + + return actor_id + + def get_signature_data( + self, + key_id: str, + headers: Dict[str, str] + ) -> Tuple[Dict[str, str], Dict[str, str]]: + """Generate and return signature and corresponding headers + + @param parsed_url: URL where the request is sent/has been received + @param key_id: ID of the key (URL linking to the data with public key) + @param date: HTTP datetime string of signature generation + @param body: body of the HTTP request + @param headers: headers to sign and their value: + default value will be used if not specified + + @return: headers and signature data + ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers + removed, and ``Signature`` added. 
+ """ + # headers must be lower case + l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} + to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items()) + signature = base64.b64encode(self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256() # type: ignore + )).decode() + sign_data = { + "keyId": key_id, + "Algorithm": "rsa-sha256", + "headers": " ".join(l_headers.keys()), + "signature": signature + } + new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} + new_headers["Signature"] = self.build_signature_header(sign_data) + return new_headers, sign_data + + async def convert_and_post_items( + self, + client: SatXMPPEntity, + ap_account: str, + service: jid.JID, + node: str, + items: List[domish.Element], + subscribe_extra_nodes: bool = True, + ) -> None: + """Convert XMPP items to AP items and post them to actor inbox + + @param ap_account: account of ActivityPub actor receiving the item + @param service: JID of the (virtual) pubsub service where the item has been + published + @param node: (virtual) node corresponding where the item has been published + @param subscribe_extra_nodes: if True, extra data nodes will be automatically + subscribed, that is comment nodes if present and attachments nodes. + """ + actor_id = await self.get_ap_actor_id_from_account(ap_account) + inbox = await self.get_ap_inbox_from_id(actor_id) + for item in items: + if item.name == "item": + cached_item = await self.host.memory.storage.search_pubsub_items({ + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item["id"]] + }) + is_new = not bool(cached_item) + if node.startswith(self._events.namespace): + # event item + event_data = self._events.event_elt_2_event_data(item) + try: + author_jid = jid.JID(item["publisher"]).userhostJID() + except (KeyError, RuntimeWarning): + root_elt = item + while root_elt.parent is not None: + root_elt = root_elt.parent + author_jid = jid.JID(root_elt["from"]).userhostJID() + if subscribe_extra_nodes and not self.is_virtual_jid(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.get_local_jid_from_account(ap_account) + recipient_client = self.client.get_virtual_client(recipient_jid) + comments_data = event_data.get("comments") + if comments_data: + comment_service = jid.JID(comments_data["jid"]) + comment_node = comments_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, event_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {event_data['id']!r} " + f"on {node!r} at {service}" + ) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid, is_new=is_new + ) + else: + # blog item + mb_data = await self._m.item_2_mb_data(client, item, service, node) + author_jid = jid.JID(mb_data["author_jid"]) + if subscribe_extra_nodes and not self.is_virtual_jid(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.get_local_jid_from_account(ap_account) + recipient_client = self.client.get_virtual_client(recipient_jid) + for comment_data in mb_data.get("comments", []): + comment_service = jid.JID(comment_data["service"]) + if self.is_virtual_jid(comment_service): + log.debug( + f"ignoring virtual comment service: {comment_data}" + ) + continue + comment_node 
= comment_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, mb_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {mb_data['id']!r} on " + f"{node!r} at {service}" + ) + ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) + + url_actor = ap_item["actor"] + elif item.name == "retract": + url_actor, ap_item = await self.ap_delete_item( + client.jid, node, item["id"] + ) + else: + raise exceptions.InternalError(f"unexpected element: {item.toXml()}") + await self.sign_and_post(inbox, url_actor, ap_item) + + async def convert_and_post_attachments( + self, + client: SatXMPPEntity, + ap_account: str, + service: jid.JID, + node: str, + items: List[domish.Element], + publisher: Optional[jid.JID] = None + ) -> None: + """Convert XMPP item attachments to AP activities and post them to actor inbox + + @param ap_account: account of ActivityPub actor receiving the item + @param service: JID of the (virtual) pubsub service where the item has been + published + @param node: (virtual) node corresponding where the item has been published + subscribed, that is comment nodes if present and attachments nodes. + @param items: attachments items + @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub + service, it's the publisher of the item). To be filled only when the publisher + is known for sure, otherwise publisher will be determined either if + "publisher" attribute is set by pubsub service, or as a last resort, using + item's ID (which MUST be publisher bare JID according to pubsub-attachments + specification). + """ + if len(items) != 1: + log.warning( + "we should get exactly one attachment item for an entity, got " + f"{len(items)})" + ) + + actor_id = await self.get_ap_actor_id_from_account(ap_account) + inbox = await self.get_ap_inbox_from_id(actor_id) + + item_elt = items[0] + item_id = item_elt["id"] + + if publisher is None: + item_pub_s = item_elt.getAttribute("publisher") + publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id) + + if publisher.userhost() != item_id: + log.warning( + "attachments item ID must be publisher's bare JID, ignoring: " + f"{item_elt.toXml()}" + ) + return + + if self.is_virtual_jid(publisher): + log.debug(f"ignoring item coming from local virtual JID {publisher}") + return + + if publisher is not None: + item_elt["publisher"] = publisher.userhost() + + item_service, item_node, item_id = self._pa.attachment_node_2_item(node) + item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node) + if self.is_virtual_jid(item_service): + # it's a virtual JID mapping to an external AP actor, we can use the + # item_id directly + item_url = item_id + if not item_url.startswith("https:"): + log.warning( + "item ID of external AP actor is not an https link, ignoring: " + f"{item_id!r}" + ) + return + else: + item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) + + old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({ + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item_elt["id"]] + }) + if not old_attachment_pubsub_items: + old_attachment = {} + else: + old_attachment_items = [i.data for i in old_attachment_pubsub_items] + old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items) + try: + old_attachment = old_attachments[0] + except IndexError: + # 
no known element was present in attachments + old_attachment = {} + publisher_account = await self.get_ap_account_from_jid_and_node( + publisher, + None + ) + publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) + try: + attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] + except IndexError: + # no known element was present in attachments + attachments = {} + + # noticed + if "noticed" in attachments: + if not "noticed" in old_attachment: + # new "noticed" attachment, we translate to "Like" activity + activity_id = self.build_apurl("like", item_account, item_id) + activity = self.create_activity( + TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.sign_and_post(inbox, publisher_actor_id, activity) + else: + if "noticed" in old_attachment: + # "noticed" attachment has been removed, we undo the "Like" activity + activity_id = self.build_apurl("like", item_account, item_id) + activity = self.create_activity( + TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + undo = self.create_activity("Undo", publisher_actor_id, activity) + await self.sign_and_post(inbox, publisher_actor_id, undo) + + # reactions + new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) + old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) + reactions_remove = old_reactions - new_reactions + reactions_add = new_reactions - old_reactions + for reactions, undo in ((reactions_remove, True), (reactions_add, False)): + for reaction in reactions: + activity_id = self.build_apurl( + "reaction", item_account, item_id, reaction.encode().hex() + ) + reaction_activity = self.create_activity( + TYPE_REACTION, publisher_actor_id, item_url, + activity_id=activity_id + ) + reaction_activity["content"] = reaction + reaction_activity["to"] = [ap_account] + reaction_activity["cc"] = [NS_AP_PUBLIC] + if undo: + activy = self.create_activity( + "Undo", publisher_actor_id, reaction_activity + ) + else: + activy = reaction_activity + await self.sign_and_post(inbox, publisher_actor_id, activy) + + # RSVP + if "rsvp" in attachments: + attending = attachments["rsvp"].get("attending", "no") + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if attending != old_attending: + activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE + activity_id = self.build_apurl(activity_type.lower(), item_account, item_id) + activity = self.create_activity( + activity_type, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.sign_and_post(inbox, publisher_actor_id, activity) + else: + if "rsvp" in old_attachment: + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if old_attending == "yes": + activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) + activity = self.create_activity( + TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.sign_and_post(inbox, publisher_actor_id, activity) + + if service.user and self.is_virtual_jid(service): + # the item is on a virtual service, we need to store it in cache + log.debug("storing attachments item in cache") + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, node, 
with_subscriptions=True, create=True + ) + await self.host.memory.storage.cache_pubsub_items( + self.client, + cached_node, + [item_elt], + [attachments] + ) + + async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse: + """Sign a documentent and post it to AP server + + @param url: AP server endpoint + @param actor_id: originating actor ID (URL) + @param doc: document to send + """ + if self.verbose: + __, actor_args = self.parse_apurl(actor_id) + actor_account = actor_args[0] + to_log = [ + "", + f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" + ] + + p_url = parse.urlparse(url) + body = json.dumps(doc).encode() + digest_algo, digest_hash = self.get_digest(body) + digest = f"{digest_algo}={digest_hash}" + + headers = { + "(request-target)": f"post {p_url.path}", + "Host": p_url.hostname, + "Date": http.datetimeToString().decode(), + "Digest": digest + } + headers["Content-Type"] = ( + 'application/activity+json' + ) + headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers) + + if self.verbose: + if self.verbose>=3: + h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items()) + to_log.append(f" headers:\n{h_to_log}") + to_log.append("---") + log.info("\n".join(to_log)) + + resp = await treq.post( + url, + body, + headers=headers, + ) + if resp.code >= 300: + text = await resp.text() + log.warning(f"POST request to {url} failed [{resp.code}]: {text}") + elif self.verbose: + log.info(f"==> response code: {resp.code}") + return resp + + def _publish_message(self, mess_data_s: str, service_s: str, profile: str): + mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore + service = jid.JID(service_s) + client = self.host.get_client(profile) + return defer.ensureDeferred(self.publish_message(client, mess_data, service)) + + @async_lru(maxsize=LRU_MAX_SIZE) + async def get_ap_actor_id_from_account(self, account: str) -> str: + """Retrieve account ID from it's handle using WebFinger + + Don't use this method to get local actor id from a local account derivated for + JID: in this case, the actor ID is retrieve with + ``self.build_apurl(TYPE_ACTOR, ap_account)`` + + @param account: AP handle (user@domain.tld) + @return: Actor ID (which is an URL) + """ + if account.count("@") != 1 or "/" in account: + raise ValueError(f"Invalid account: {account!r}") + host = account.split("@")[1] + try: + finger_data = await treq.json_content(await treq.get( + f"https://{host}/.well-known/webfinger?" 
+ f"resource=acct:{parse.quote_plus(account)}", + )) + except Exception as e: + raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") + for link in finger_data.get("links", []): + if ( + link.get("type") == "application/activity+json" + and link.get("rel") == "self" + ): + href = link.get("href", "").strip() + if not href: + raise ValueError( + f"Invalid webfinger data for {account:r}: missing href" + ) + break + else: + raise ValueError( + f"No ActivityPub link found for {account!r}" + ) + return href + + async def get_ap_actor_data_from_account(self, account: str) -> dict: + """Retrieve ActivityPub Actor data + + @param account: ActivityPub Actor identifier + """ + href = await self.get_ap_actor_id_from_account(account) + return await self.ap_get(href) + + async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str: + """Retrieve inbox of an actor_id + + @param use_shared: if True, and a shared inbox exists, it will be used instead of + the user inbox + """ + data = await self.get_actor_data(actor_id) + if use_shared: + try: + return data["endpoints"]["sharedInbox"] + except KeyError: + pass + return data["inbox"] + + @async_lru(maxsize=LRU_MAX_SIZE) + async def get_ap_account_from_id(self, actor_id: str) -> str: + """Retrieve AP account from the ID URL + + Works with external or local actor IDs. + @param actor_id: AP ID of the actor (URL to the actor data) + @return: AP handle + """ + if self.is_local_url(actor_id): + url_type, url_args = self.parse_apurl(actor_id) + if url_type != "actor" or not url_args: + raise exceptions.DataError( + f"invalid local actor ID: {actor_id}" + ) + account = url_args[0] + try: + account_user, account_host = account.split('@') + except ValueError: + raise exceptions.DataError( + f"invalid account from url: {actor_id}" + ) + if not account_user or account_host != self.public_url: + raise exceptions.DataError( + f"{account!r} is not a valid local account (from {actor_id})" + ) + return account + + url_parsed = parse.urlparse(actor_id) + actor_data = await self.get_actor_data(actor_id) + username = actor_data.get("preferredUsername") + if not username: + raise exceptions.DataError( + 'No "preferredUsername" field found, can\'t retrieve actor account' + ) + account = f"{username}@{url_parsed.hostname}" + # we try to retrieve the actor ID from the account to check it + found_id = await self.get_ap_actor_id_from_account(account) + if found_id != actor_id: + # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 + msg = ( + f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " + f"({actor_id!r}). This AP instance doesn't seems to use " + '"preferredUsername" as we expect.' + ) + log.warning(msg) + raise exceptions.DataError(msg) + return account + + async def get_ap_items( + self, + collection: dict, + max_items: Optional[int] = None, + chronological_pagination: bool = True, + after_id: Optional[str] = None, + start_index: Optional[int] = None, + parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None, + only_ids: bool = False, + ) -> Tuple[List[domish.Element], rsm.RSMResponse]: + """Retrieve AP items and convert them to XMPP items + + @param account: AP account handle to get items from + @param max_items: maximum number of items to retrieve + retrieve all items by default + @param chronological_pagination: get pages in chronological order + AP use reversed chronological order for pagination, "first" page returns more + recent items. 
If "chronological_pagination" is True, "last" AP page will be + retrieved first. + @param after_id: if set, retrieve items starting from given ID + Due to ActivityStream Collection Paging limitations, this is inefficient and + if ``after_id`` is not already in cache, we have to retrieve every page until + we find it. + In most common cases, ``after_id`` should be in cache though (client usually + use known ID when in-order pagination is used). + @param start_index: start retrieving items from the one with given index + Due to ActivityStream Collection Paging limitations, this is inefficient and + all pages before the requested index will be retrieved to count items. + @param parser: method to use to parse AP items and get XMPP item elements + if None, use default generic parser + @param only_ids: if True, only retrieve items IDs + Retrieving only item IDs avoid HTTP requests to retrieve items, it may be + sufficient in some use cases (e.g. when retrieving following/followers + collections) + @return: XMPP Pubsub items and corresponding RSM Response + Items are always returned in chronological order in the result + """ + if parser is None: + parser = self.ap_item_2_mb_elt + + rsm_resp: Dict[str, Union[bool, int]] = {} + try: + count = collection["totalItems"] + except KeyError: + log.warning( + f'"totalItems" not found in collection {collection.get("id")}, ' + "defaulting to 20" + ) + count = 20 + else: + log.info(f"{collection.get('id')} has {count} item(s)") + + rsm_resp["count"] = count + + if start_index is not None: + assert chronological_pagination and after_id is None + if start_index >= count: + return [], rsm_resp + elif start_index == 0: + # this is the default behaviour + pass + elif start_index > 5000: + raise error.StanzaError( + "feature-not-implemented", + text="Maximum limit for previous_index has been reached, this limit" + "is set to avoid DoS" + ) + else: + # we'll convert "start_index" to "after_id", thus we need the item just + # before "start_index" + previous_index = start_index - 1 + retrieved_items = 0 + current_page = collection["last"] + while retrieved_items < count: + page_data, items = await self.parse_ap_page( + current_page, parser, only_ids + ) + if not items: + log.warning(f"found an empty AP page at {current_page}") + return [], rsm_resp + page_start_idx = retrieved_items + retrieved_items += len(items) + if previous_index <= retrieved_items: + after_id = items[previous_index - page_start_idx]["id"] + break + try: + current_page = page_data["prev"] + except KeyError: + log.warning( + f"missing previous page link at {current_page}: {page_data!r}" + ) + raise error.StanzaError( + "service-unavailable", + "Error while retrieving previous page from AP service at " + f"{current_page}" + ) + + init_page = "last" if chronological_pagination else "first" + page = collection.get(init_page) + if not page: + raise exceptions.DataError( + f"Initial page {init_page!r} not found for collection " + f"{collection.get('id')})" + ) + items = [] + page_items = [] + retrieved_items = 0 + found_after_id = False + + while retrieved_items < count: + __, page_items = await self.parse_ap_page(page, parser, only_ids) + if not page_items: + break + retrieved_items += len(page_items) + if after_id is not None and not found_after_id: + # if we have an after_id, we ignore all items until the requested one is + # found + try: + limit_idx = [i["id"] for i in page_items].index(after_id) + except ValueError: + # if "after_id" is not found, we don't add any item from this page + page_id = 
page.get("id") if isinstance(page, dict) else page + log.debug(f"{after_id!r} not found at {page_id}, skipping") + else: + found_after_id = True + if chronological_pagination: + start_index = retrieved_items - len(page_items) + limit_idx + 1 + page_items = page_items[limit_idx+1:] + else: + start_index = count - (retrieved_items - len(page_items) + + limit_idx + 1) + page_items = page_items[:limit_idx] + items.extend(page_items) + else: + items.extend(page_items) + if max_items is not None and len(items) >= max_items: + if chronological_pagination: + items = items[:max_items] + else: + items = items[-max_items:] + break + page = collection.get("prev" if chronological_pagination else "next") + if not page: + break + + if after_id is not None and not found_after_id: + raise error.StanzaError("item-not-found") + + if items: + if after_id is None: + rsm_resp["index"] = 0 if chronological_pagination else count - len(items) + if start_index is not None: + rsm_resp["index"] = start_index + elif after_id is not None: + log.warning("Can't determine index of first element") + elif chronological_pagination: + rsm_resp["index"] = 0 + else: + rsm_resp["index"] = count - len(items) + rsm_resp.update({ + "first": items[0]["id"], + "last": items[-1]["id"] + }) + + return items, rsm.RSMResponse(**rsm_resp) + + async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: + """Convert AP item to parsed microblog data and corresponding item element""" + mb_data = await self.ap_item_2_mb_data(ap_item) + item_elt = await self._m.mb_data_2_entry_elt( + self.client, mb_data, mb_data["id"], None, self._m.namespace + ) + if "repeated" in mb_data["extra"]: + item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] + else: + item_elt["publisher"] = mb_data["author_jid"] + return mb_data, item_elt + + async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: + """Convert AP item to XMPP item element""" + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) + return item_elt + + async def parse_ap_page( + self, + page: Union[str, dict], + parser: Callable[[dict], Awaitable[domish.Element]], + only_ids: bool = False + ) -> Tuple[dict, List[domish.Element]]: + """Convert AP objects from an AP page to XMPP items + + @param page: Can be either url linking and AP page, or the page data directly + @param parser: method to use to parse AP items and get XMPP item elements + @param only_ids: if True, only retrieve items IDs + @return: page data, pubsub items + """ + page_data = await self.ap_get_object(page) + if page_data is None: + log.warning('No data found in collection') + return {}, [] + ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids) + if ap_items is None: + ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids) + if not ap_items: + log.warning(f'No item field found in collection: {page_data!r}') + return page_data, [] + else: + log.warning( + "Items are not ordered, this is not spec compliant" + ) + items = [] + # AP Collections are in antichronological order, but we expect chronological in + # Pubsub, thus we reverse it + for ap_item in reversed(ap_items): + try: + items.append(await parser(ap_item)) + except (exceptions.DataError, NotImplementedError, error.StanzaError): + continue + + return page_data, items + + async def get_comments_nodes( + self, + item_id: str, + parent_id: Optional[str] + ) -> Tuple[Optional[str], Optional[str]]: + """Get node where this item is and node to use for comments + + if config option 
"comments_max_depth" is set, a common node will be used below the + given depth + @param item_id: ID of the reference item + @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") + @return: a tuple with parent_node_id, comments_node_id: + - parent_node_id is the ID of the node where reference item must be. None is + returned when the root node (i.e. not a comments node) must be used. + - comments_node_id: is the ID of the node to use for comments. None is + returned when no comment node must be used (happens when we have reached + "comments_max_depth") + """ + if parent_id is None or not self.comments_max_depth: + return ( + self._m.get_comments_node(parent_id) if parent_id is not None else None, + self._m.get_comments_node(item_id) + ) + parent_url = parent_id + parents = [] + for __ in range(COMMENTS_MAX_PARENTS): + parent_item = await self.ap_get(parent_url) + parents.insert(0, parent_item) + parent_url = parent_item.get("inReplyTo") + if parent_url is None: + break + parent_limit = self.comments_max_depth-1 + if len(parents) <= parent_limit: + return ( + self._m.get_comments_node(parents[-1]["id"]), + self._m.get_comments_node(item_id) + ) + else: + last_level_item = parents[parent_limit] + return ( + self._m.get_comments_node(last_level_item["id"]), + None + ) + + async def ap_item_2_mb_data(self, ap_item: dict) -> dict: + """Convert AP activity or object to microblog data + + @param ap_item: ActivityPub item to convert + Can be either an activity of an object + @return: AP Item's Object and microblog data + @raise exceptions.DataError: something is invalid in the AP item + @raise NotImplementedError: some AP data is not handled yet + @raise error.StanzaError: error while contacting the AP server + """ + is_activity = self.is_activity(ap_item) + if is_activity: + ap_object = await self.ap_get_object(ap_item, "object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + else: + ap_object = ap_item + item_id = ap_object.get("id") + if not item_id: + log.warning(f'No "id" found in AP item: {ap_object!r}') + raise exceptions.DataError + mb_data = {"id": item_id, "extra": {}} + + # content + try: + language, content_xhtml = ap_object["contentMap"].popitem() + except (KeyError, AttributeError): + try: + mb_data["content_xhtml"] = ap_object["content"] + except KeyError: + log.warning(f"no content found:\n{ap_object!r}") + raise exceptions.DataError + else: + mb_data["language"] = language + mb_data["content_xhtml"] = content_xhtml + + mb_data["content"] = await self._t.convert( + mb_data["content_xhtml"], + self._t.SYNTAX_XHTML, + self._t.SYNTAX_TEXT, + False, + ) + + if "attachment" in ap_object: + attachments = mb_data["extra"][C.KEY_ATTACHMENTS] = [] + for ap_attachment in ap_object["attachment"]: + try: + url = ap_attachment["url"] + except KeyError: + log.warning( + f'"url" missing in AP attachment, ignoring: {ap_attachment}' + ) + continue + + if not url.startswith("http"): + log.warning(f"non HTTP URL in attachment, ignoring: {ap_attachment}") + continue + attachment = {"url": url} + for ap_key, key in ( + ("mediaType", "media_type"), + # XXX: as weird as it seems, "name" is actually used for description + # in AP world + ("name", "desc"), + ): + value = ap_attachment.get(ap_key) + if value: + attachment[key] = value + attachments.append(attachment) + + # author + if is_activity: + authors = await self.ap_get_actors(ap_item, "actor") + else: + authors = await self.ap_get_actors(ap_object, "attributedTo") + 
if len(authors) > 1: + # we only keep first item as author + # TODO: handle multiple actors + log.warning("multiple actors are not managed") + + account = authors[0] + author_jid = self.get_local_jid_from_account(account).full() + + mb_data["author"] = account.split("@", 1)[0] + mb_data["author_jid"] = author_jid + + # published/updated + for field in ("published", "updated"): + value = ap_object.get(field) + if not value and field == "updated": + value = ap_object.get("published") + if value: + try: + mb_data[field] = calendar.timegm( + dateutil.parser.parse(str(value)).utctimetuple() + ) + except dateutil.parser.ParserError as e: + log.warning(f"Can't parse {field!r} field: {e}") + + # repeat + if "_repeated" in ap_item: + mb_data["extra"]["repeated"] = ap_item["_repeated"] + + # comments + in_reply_to = ap_object.get("inReplyTo") + __, comments_node = await self.get_comments_nodes(item_id, in_reply_to) + if comments_node is not None: + comments_data = { + "service": author_jid, + "node": comments_node, + "uri": uri.build_xmpp_uri( + "pubsub", + path=author_jid, + node=comments_node + ) + } + mb_data["comments"] = [comments_data] + + return mb_data + + async def get_reply_to_id_from_xmpp_node( + self, + client: SatXMPPEntity, + ap_account: str, + parent_item: str, + mb_data: dict + ) -> str: + """Get URL to use for ``inReplyTo`` field in AP item. + + There is currently no way to know the parent service of a comment with XEP-0277. + To work around that, we try to check if we have this item in the cache (we + should). If there is more that one item with this ID, we first try to find one + with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`. + + @param ap_account: AP account corresponding to the publication author + @param parent_item: ID of the node where the publication this item is replying to + has been posted + @param mb_data: microblog data of the publication + @return: URL to use in ``inReplyTo`` field + """ + # FIXME: propose a protoXEP to properly get parent item, node and service + + found_items = await self.host.memory.storage.search_pubsub_items({ + "profiles": [client.profile], + "names": [parent_item] + }) + if not found_items: + log.warning(f"parent item {parent_item!r} not found in cache") + parent_ap_account = ap_account + elif len(found_items) == 1: + cached_node = found_items[0].node + parent_ap_account = await self.get_ap_account_from_jid_and_node( + cached_node.service, + cached_node.name + ) + else: + # we found several cached item with given ID, we check if there is one + # corresponding to this author + try: + author = jid.JID(mb_data["author_jid"]).userhostJID() + cached_item = next( + i for i in found_items + if jid.JID(i.data["publisher"]).userhostJID() + == author + ) + except StopIteration: + # no item corresponding to this author, we use ap_account + log.warning( + "Can't find a single cached item for parent item " + f"{parent_item!r}" + ) + parent_ap_account = ap_account + else: + cached_node = cached_item.node + parent_ap_account = await self.get_ap_account_from_jid_and_node( + cached_node.service, + cached_node.name + ) + + return self.build_apurl( + TYPE_ITEM, parent_ap_account, parent_item + ) + + async def repeated_mb_2_ap_item( + self, + mb_data: dict + ) -> dict: + """Convert repeated blog item to suitable AP Announce activity + + @param mb_data: microblog metadata of an item repeating an other blog post + @return: Announce activity linking to the repeated item + """ + repeated = mb_data["extra"]["repeated"] + repeater = 
jid.JID(repeated["by"])
+        repeater_account = await self.get_ap_account_from_jid_and_node(
+            repeater,
+            None
+        )
+        repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account)
+        repeated_uri = repeated["uri"]
+
+        if not repeated_uri.startswith("xmpp:"):
+            log.warning(
+                "Only xmpp: URLs are handled for repeated items at the moment, ignoring "
+                f"item {mb_data}"
+            )
+            raise NotImplementedError
+        parsed_url = uri.parse_xmpp_uri(repeated_uri)
+        if parsed_url["type"] != "pubsub":
+            log.warning(
+                "Only pubsub URLs are handled for repeated items at the moment, ignoring "
+                f"item {mb_data}"
+            )
+            raise NotImplementedError
+        rep_service = jid.JID(parsed_url["path"])
+        rep_item = parsed_url["item"]
+        activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"])
+
+        if self.is_virtual_jid(rep_service):
+            # it's an AP actor linked through this gateway
+            # in this case we can simply use the item ID
+            if not rep_item.startswith("https:"):
+                log.warning(
+                    f"Was expecting an HTTPS URL as item ID and got {rep_item!r}\n"
+                    f"{mb_data}"
+                )
+            announced_uri = rep_item
+            repeated_account = self._e.unescape(rep_service.user)
+        else:
+            # the repeated item is an XMPP publication, we build the corresponding ID
+            rep_node = parsed_url["node"]
+            repeated_account = await self.get_ap_account_from_jid_and_node(
+                rep_service, rep_node
+            )
+            announced_uri = self.build_apurl("item", repeated_account, rep_item)
+
+        announce = self.create_activity(
+            "Announce", repeater_id, announced_uri, activity_id=activity_id
+        )
+        announce["to"] = [NS_AP_PUBLIC]
+        announce["cc"] = [
+            self.build_apurl(TYPE_FOLLOWERS, repeater_account),
+            await self.get_ap_actor_id_from_account(repeated_account)
+        ]
+        return announce
+
+    async def mb_data_2_ap_item(
+        self,
+        client: SatXMPPEntity,
+        mb_data: dict,
+        public: bool = True,
+        is_new: bool = True,
+    ) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item
+
+        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
+            If ``public`` is True, ``service`` and ``node`` keys must be set.
+            If ``published`` is not set, the current datetime will be used.
+        @param public: True if the message is not a private/direct one
+            if True, the AP item will be marked as public, and AP followers of the
+            target AP account (which is retrieved from ``service``) will be put in
+            ``cc``. ``inReplyTo`` will also be set if suitable.
+            if False, no destinee will be set (i.e. no ``to``, ``cc`` or public flag);
+            this is usually used for direct messages.
+        @param is_new: if True, the item is a new one (no instance has been found in
+            cache), and a "Create" activity will be generated; otherwise an "Update"
+            one will be.
+ @return: Activity item + """ + extra = mb_data.get("extra", {}) + if "repeated" in extra: + return await self.repeated_mb_2_ap_item(mb_data) + if not mb_data.get("id"): + mb_data["id"] = shortuuid.uuid() + if not mb_data.get("author_jid"): + mb_data["author_jid"] = client.jid.userhost() + ap_account = await self.get_ap_account_from_jid_and_node( + jid.JID(mb_data["author_jid"]), + None + ) + url_actor = self.build_apurl(TYPE_ACTOR, ap_account) + url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) + ap_object = { + "id": url_item, + "type": "Note", + "published": utils.xmpp_date(mb_data.get("published")), + "attributedTo": url_actor, + "content": mb_data.get("content_xhtml") or mb_data["content"], + } + + language = mb_data.get("language") + if language: + ap_object["contentMap"] = {language: ap_object["content"]} + + attachments = extra.get(C.KEY_ATTACHMENTS) + if attachments: + ap_attachments = ap_object["attachment"] = [] + for attachment in attachments: + try: + url = next( + s['url'] for s in attachment["sources"] if 'url' in s + ) + except (StopIteration, KeyError): + log.warning( + f"Ignoring attachment without URL: {attachment}" + ) + continue + ap_attachment = { + "url": url + } + for key, ap_key in ( + ("media_type", "mediaType"), + # XXX: yes "name", cf. [ap_item_2_mb_data] + ("desc", "name"), + ): + value = attachment.get(key) + if value: + ap_attachment[ap_key] = value + ap_attachments.append(ap_attachment) + + if public: + ap_object["to"] = [NS_AP_PUBLIC] + if self.auto_mentions: + for m in RE_MENTION.finditer(ap_object["content"]): + mention = m.group() + mentioned = mention[1:] + __, m_host = mentioned.split("@", 1) + if m_host in (self.public_url, self.client.jid.host): + # we ignore mention of local users, they should be sent as XMPP + # references + continue + try: + mentioned_id = await self.get_ap_actor_id_from_account(mentioned) + except Exception as e: + log.warning(f"Can't add mention to {mentioned!r}: {e}") + else: + ap_object["to"].append(mentioned_id) + ap_object.setdefault("tag", []).append({ + "type": TYPE_MENTION, + "href": mentioned_id, + "name": mention, + }) + try: + node = mb_data["node"] + service = jid.JID(mb_data["service"]) + except KeyError: + # node and service must always be specified when this method is used + raise exceptions.InternalError( + "node or service is missing in mb_data" + ) + target_ap_account = await self.get_ap_account_from_jid_and_node( + service, node + ) + if self.is_virtual_jid(service): + # service is a proxy JID for AP account + actor_data = await self.get_ap_actor_data_from_account(target_ap_account) + followers = actor_data.get("followers") + else: + # service is a real XMPP entity + followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) + if followers: + ap_object["cc"] = [followers] + if self._m.is_comment_node(node): + parent_item = self._m.get_parent_item(node) + if self.is_virtual_jid(service): + # the publication is on a virtual node (i.e. an XMPP node managed by + # this gateway and linking to an ActivityPub actor) + ap_object["inReplyTo"] = parent_item + else: + # the publication is from a followed real XMPP node + ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( + client, + ap_account, + parent_item, + mb_data + ) + + return self.create_activity( + "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item + ) + + async def publish_message( + self, + client: SatXMPPEntity, + mess_data: dict, + service: jid.JID + ) -> None: + """Send an AP message + + .. 
note:: + + This is a temporary method used for development only + + @param mess_data: message data. Following keys must be set: + + ``node`` + identifier of message which is being replied (this will + correspond to pubsub node in the future) + + ``content_xhtml`` or ``content`` + message body (respectively in XHTML or plain text) + + @param service: JID corresponding to the AP actor. + """ + if not service.user: + raise ValueError("service must have a local part") + account = self._e.unescape(service.user) + ap_actor_data = await self.get_ap_actor_data_from_account(account) + + try: + inbox_url = ap_actor_data["endpoints"]["sharedInbox"] + except KeyError: + raise exceptions.DataError("Can't get ActivityPub actor inbox") + + item_data = await self.mb_data_2_ap_item(client, mess_data) + url_actor = item_data["actor"] + resp = await self.sign_and_post(inbox_url, url_actor, item_data) + + async def ap_delete_item( + self, + jid_: jid.JID, + node: Optional[str], + item_id: str, + public: bool = True + ) -> Tuple[str, Dict[str, Any]]: + """Build activity to delete an AP item + + @param jid_: JID of the entity deleting an item + @param node: node where the item is deleted + None if it's microblog or a message + @param item_id: ID of the item to delete + it's the Pubsub ID or message's origin ID + @param public: if True, the activity will be addressed to public namespace + @return: actor_id of the entity deleting the item, activity to send + """ + if node is None: + node = self._m.namespace + + author_account = await self.get_ap_account_from_jid_and_node(jid_, node) + author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) + + items = await self.host.memory.storage.search_pubsub_items({ + "profiles": [self.client.profile], + "services": [jid_], + "names": [item_id] + }) + if not items: + log.warning( + f"Deleting an unknown item at service {jid_}, node {node} and id " + f"{item_id}" + ) + else: + try: + mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node) + if "repeated" in mb_data["extra"]: + # we are deleting a repeated item, we must translate this to an + # "Undo" of the "Announce" activity instead of a "Delete" one + announce = await self.repeated_mb_2_ap_item(mb_data) + undo = self.create_activity("Undo", author_actor_id, announce) + return author_actor_id, undo + except Exception as e: + log.debug( + f"Can't parse item, maybe it's not a blog item: {e}\n" + f"{items[0].toXml()}" + ) + + url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) + ap_item = self.create_activity( + "Delete", + author_actor_id, + { + "id": url_item, + "type": TYPE_TOMBSTONE + } + ) + if public: + ap_item["to"] = [NS_AP_PUBLIC] + return author_actor_id, ap_item + + def _message_received_trigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """add the gateway workflow on post treatment""" + if self.client is None: + log.debug(f"no client set, ignoring message: {message_elt.toXml()}") + return True + post_treat.addCallback( + lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) + ) + return True + + async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: + """Called once message has been parsed + + this method handle the conversion to AP items and posting + """ + if client != self.client: + return mess_data + if mess_data["type"] not in ("chat", "normal"): + log.warning(f"ignoring message with unexpected type: {mess_data}") + return mess_data + if not 
self.is_local(mess_data["from"]): + log.warning(f"ignoring non local message: {mess_data}") + return mess_data + if not mess_data["to"].user: + log.warning( + f"ignoring message addressed to gateway itself: {mess_data}" + ) + return mess_data + + actor_account = self._e.unescape(mess_data["to"].user) + actor_id = await self.get_ap_actor_id_from_account(actor_account) + inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) + + try: + language, message = next(iter(mess_data["message"].items())) + except (KeyError, StopIteration): + log.warning(f"ignoring empty message: {mess_data}") + return mess_data + + mb_data = { + "content": message, + } + if language: + mb_data["language"] = language + origin_id = mess_data["extra"].get("origin_id") + if origin_id: + # we need to use origin ID when present to be able to retract the message + mb_data["id"] = origin_id + attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) + if attachments: + mb_data["extra"] = { + C.KEY_ATTACHMENTS: attachments + } + + client = self.client.get_virtual_client(mess_data["from"]) + ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) + ap_object = ap_item["object"] + ap_object["to"] = ap_item["to"] = [actor_id] + # we add a mention to direct message, otherwise peer is not notified in some AP + # implementations (notably Mastodon), and the message may be missed easily. + ap_object.setdefault("tag", []).append({ + "type": TYPE_MENTION, + "href": actor_id, + "name": f"@{actor_account}", + }) + + await self.sign_and_post(inbox, ap_item["actor"], ap_item) + return mess_data + + async def _on_message_retract( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + retract_elt: domish.Element, + fastened_elts + ) -> bool: + if client != self.client: + return True + from_jid = jid.JID(message_elt["from"]) + if not self.is_local(from_jid): + log.debug( + f"ignoring retract request from non local jid {from_jid}" + ) + return False + to_jid = jid.JID(message_elt["to"]) + if (to_jid.host != self.client.jid.full() or not to_jid.user): + # to_jid should be a virtual JID from this gateway + raise exceptions.InternalError( + f"Invalid destinee's JID: {to_jid.full()}" + ) + ap_account = self._e.unescape(to_jid.user) + actor_id = await self.get_ap_actor_id_from_account(ap_account) + inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) + url_actor, ap_item = await self.ap_delete_item( + from_jid.userhostJID(), None, fastened_elts.id, public=False + ) + resp = await self.sign_and_post(inbox, url_actor, ap_item) + return False + + async def _on_reference_received( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + reference_data: Dict[str, Union[str, int]] + ) -> bool: + parsed_uri: dict = reference_data.get("parsed_uri") + if not parsed_uri: + log.warning(f"no parsed URI available in reference {reference_data}") + return False + + try: + mentioned = jid.JID(parsed_uri["path"]) + except RuntimeError: + log.warning(f"invalid target: {reference_data['uri']}") + return False + + if mentioned.host != self.client.jid.full() or not mentioned.user: + log.warning( + f"ignoring mentioned user {mentioned}, it's not a JID mapping an AP " + "account" + ) + return False + + ap_account = self._e.unescape(mentioned.user) + actor_id = await self.get_ap_actor_id_from_account(ap_account) + + parsed_anchor: dict = reference_data.get("parsed_anchor") + if not parsed_anchor: + log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}") + return False + + if parsed_anchor["type"] != 
"pubsub": + log.warning( + f"ignoring reference with non pubsub anchor, this is not supported: " + "{reference_data!r}" + ) + return False + + try: + pubsub_service = jid.JID(parsed_anchor["path"]) + except RuntimeError: + log.warning(f"invalid anchor: {reference_data['anchor']}") + return False + pubsub_node = parsed_anchor.get("node") + if not pubsub_node: + log.warning(f"missing pubsub node in anchor: {reference_data['anchor']}") + return False + pubsub_item = parsed_anchor.get("item") + if not pubsub_item: + log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}") + return False + + cached_node = await self.host.memory.storage.get_pubsub_node( + client, pubsub_service, pubsub_node + ) + if not cached_node: + log.warning(f"Anchored node not found in cache: {reference_data['anchor']}") + return False + + cached_items, __ = await self.host.memory.storage.get_items( + cached_node, item_ids=[pubsub_item] + ) + if not cached_items: + log.warning( + f"Anchored pubsub item not found in cache: {reference_data['anchor']}" + ) + return False + + cached_item = cached_items[0] + + mb_data = await self._m.item_2_mb_data( + client, cached_item.data, pubsub_service, pubsub_node + ) + ap_item = await self.mb_data_2_ap_item(client, mb_data) + ap_object = ap_item["object"] + ap_object["to"] = [actor_id] + ap_object.setdefault("tag", []).append({ + "type": TYPE_MENTION, + "href": actor_id, + "name": ap_account, + }) + + inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) + + resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item) + + return False + + async def new_reply_to_xmpp_item( + self, + client: SatXMPPEntity, + ap_item: dict, + targets: Dict[str, Set[str]], + mentions: List[Dict[str, str]], + ) -> None: + """We got an AP item which is a reply to an XMPP item""" + in_reply_to = ap_item["inReplyTo"] + url_type, url_args = self.parse_apurl(in_reply_to) + if url_type != "item": + log.warning( + "Ignoring AP item replying to an XMPP item with an unexpected URL " + f"type({url_type!r}):\n{pformat(ap_item)}" + ) + return + try: + parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:]) + except (IndexError, ValueError): + log.warning( + "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " + f"({in_reply_to!r}):\n{pformat(ap_item)}" + ) + return + parent_item_service, parent_item_node = await self.get_jid_and_node( + parent_item_account + ) + if parent_item_node is None: + parent_item_node = self._m.namespace + items, __ = await self._p.get_items( + client, parent_item_service, parent_item_node, item_ids=[parent_item_id] + ) + try: + parent_item_elt = items[0] + except IndexError: + log.warning( + f"Can't find parent item at {parent_item_service} (node " + f"{parent_item_node!r})\n{pformat(ap_item)}") + return + parent_item_parsed = await self._m.item_2_mb_data( + client, parent_item_elt, parent_item_service, parent_item_node + ) + try: + comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) + comment_node = parent_item_parsed["comments"][0]["node"] + except (KeyError, IndexError): + # we don't have a comment node set for this item + from libervia.backend.tools.xml_tools import pp_elt + log.info(f"{pp_elt(parent_item_elt.toXml())}") + raise NotImplementedError() + else: + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) + await self._p.publish(client, comment_service, comment_node, [item_elt]) + await self.notify_mentions( + targets, mentions, comment_service, comment_node, item_elt["id"] + ) + + 
+    def get_ap_item_targets(
+        self,
+        item: Dict[str, Any]
+    ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]:
+        """Retrieve targets of an AP item, and indicate if it's a public one
+
+        @param item: AP object payload
+        @return: a tuple with:
+            - is_public flag, indicating if the item is world-readable
+            - a dict mapping target type to targets
+            - a list of mentions data
+        """
+        targets: Dict[str, Set[str]] = {}
+        is_public = False
+        # TODO: handle "audience"
+        for key in ("to", "bto", "cc", "bcc"):
+            values = item.get(key)
+            if not values:
+                continue
+            if isinstance(values, str):
+                values = [values]
+            for value in values:
+                if value in PUBLIC_TUPLE:
+                    is_public = True
+                    continue
+                if not value:
+                    continue
+                if not self.is_local_url(value):
+                    continue
+                target_type = self.parse_apurl(value)[0]
+                if target_type != TYPE_ACTOR:
+                    log.debug(f"ignoring non actor type as a target: {value}")
+                else:
+                    targets.setdefault(target_type, set()).add(value)
+
+        mentions = []
+        tags = item.get("tag")
+        if tags:
+            for tag in tags:
+                if tag.get("type") != TYPE_MENTION:
+                    continue
+                href = tag.get("href")
+                if not href:
+                    log.warning(f'Missing "href" field from mention object: {tag!r}')
+                    continue
+                if not self.is_local_url(href):
+                    continue
+                uri_type = self.parse_apurl(href)[0]
+                if uri_type != TYPE_ACTOR:
+                    log.debug(f"ignoring non actor URI as a target: {href}")
+                    continue
+                mention = {"uri": href}
+                mentions.append(mention)
+                name = tag.get("name")
+                if name:
+                    mention["content"] = name
+
+        return is_public, targets, mentions
+
+    async def new_ap_item(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse, cache and send notification for received AP item
+
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        """
+        is_public, targets, mentions = self.get_ap_item_targets(item)
+        if not is_public and targets.keys() == {TYPE_ACTOR}:
+            # this is a direct message
+            await self.handle_message_ap_item(
+                client, targets, mentions, destinee, item
+            )
+        else:
+            await self.handle_pubsub_ap_item(
+                client, targets, mentions, destinee, node, item, is_public
+            )
+
+    async def handle_message_ap_item(
+        self,
+        client: SatXMPPEntity,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+        destinee: Optional[jid.JID],
+        item: dict,
+    ) -> None:
+        """Parse and deliver direct AP items translating to XMPP messages
+
+        @param targets: actors where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param item: AP object payload
+        """
+        targets_jids = {
+            await self.get_jid_from_id(t)
+            for t_set in targets.values()
+            for t in t_set
+        }
+        if destinee is not None:
+            targets_jids.add(destinee)
+        mb_data = await self.ap_item_2_mb_data(item)
+        extra = {
+            "origin_id": mb_data["id"]
+        }
+        attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS)
+        if attachments:
+            extra[C.KEY_ATTACHMENTS] = attachments
+
+        defer_l = []
+        for target_jid in targets_jids:
+            defer_l.append(
+                client.sendMessage(
+                    target_jid,
+                    {'': mb_data.get("content", "")},
+                    mb_data.get("title"),
+                    extra=extra
+                )
+            )
+        await defer.DeferredList(defer_l)
+
+    async def notify_mentions(
+        self,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+        service: jid.JID,
+        node: str,
+        item_id: str,
+    ) -> None:
+        """Send mention notifications to recipients and mentioned entities
+
+        XEP-0372 (References) is used.
+ + Mentions are also sent to recipients as they are primary audience (see + https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). + + """ + anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) + seen = set() + # we start with explicit mentions because mentions' content will be used in the + # future to fill "begin" and "end" reference attributes (we can't do it at the + # moment as there is no way to specify the XML element to use in the blog item). + for mention in mentions: + mentioned_jid = await self.get_jid_from_id(mention["uri"]) + self._refs.send_reference( + self.client, + to_jid=mentioned_jid, + anchor=anchor + ) + seen.add(mentioned_jid) + + remaining = { + await self.get_jid_from_id(t) + for t_set in targets.values() + for t in t_set + } - seen + for target in remaining: + self._refs.send_reference( + self.client, + to_jid=target, + anchor=anchor + ) + + async def handle_pubsub_ap_item( + self, + client: SatXMPPEntity, + targets: Dict[str, Set[str]], + mentions: List[Dict[str, str]], + destinee: Optional[jid.JID], + node: str, + item: dict, + public: bool + ) -> None: + """Analyse, cache and deliver AP items translating to Pubsub + + @param targets: actors/collections where the item must be delivered + @param destinee: jid of the destinee, + @param node: XMPP pubsub node + @param item: AP object payload + @param public: True if the item is public + """ + # XXX: "public" is not used for now + service = client.jid + in_reply_to = item.get("inReplyTo") + + if in_reply_to and isinstance(in_reply_to, list): + in_reply_to = in_reply_to[0] + if in_reply_to and isinstance(in_reply_to, str): + if self.is_local_url(in_reply_to): + # this is a reply to an XMPP item + await self.new_reply_to_xmpp_item(client, item, targets, mentions) + return + + # this item is a reply to an AP item, we use or create a corresponding node + # for comments + parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to) + node = parent_node or node + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, node, with_subscriptions=True, create=True, + create_kwargs={"subscribed": True} + ) + else: + # it is a root item (i.e. not a reply to an other item) + create = node == self._events.namespace + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, node, with_subscriptions=True, create=create + ) + if cached_node is None: + log.warning( + f"Received item in unknown node {node!r} at {service}. This may be " + f"due to a cache purge. 
We synchronise the node\n{item}"
+            )
+            return
+        if item.get("type") == TYPE_EVENT:
+            data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
+        else:
+            data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
+        await self.host.memory.storage.cache_pubsub_items(
+            client,
+            cached_node,
+            [item_elt],
+            [data]
+        )
+
+        for subscription in cached_node.subscriptions:
+            if subscription.state != SubscriptionState.SUBSCRIBED:
+                continue
+            self.pubsub_service.notifyPublish(
+                service,
+                node,
+                [(subscription.subscriber, None, [item_elt])]
+            )
+
+        await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
+
+    async def new_ap_delete_item(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse a received AP item deletion, and handle it accordingly
+
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+            only the "id" field is used
+        """
+        item_id = item.get("id")
+        if not item_id:
+            raise exceptions.DataError('"id" attribute is missing in item')
+        if not item_id.startswith("http"):
+            raise exceptions.DataError(f"invalid id: {item_id!r}")
+        if self.is_local_url(item_id):
+            raise ValueError("Local IDs should not be used")
+
+        # we have no way to know if a deleted item is a direct one (thus a message) or one
+        # converted to pubsub. We check if the id is in message history to decide what to
+        # do.
+        history = await self.host.memory.storage.get(
+            client,
+            History,
+            History.origin_id,
+            item_id,
+            (History.messages, History.subjects)
+        )
+
+        if history is not None:
+            # it's a direct message
+            if history.source_jid != client.jid:
+                log.warning(
+                    f"retraction received from an entity ({client.jid}) which is "
+                    f"not the original sender of the message ({history.source_jid}), "
+                    "hack attempt?"
+                )
+                raise exceptions.PermissionError("forbidden")
+
+            await self._r.retract_by_history(client, history)
+        else:
+            # no history in cache with this ID, it's probably a pubsub item
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, client.jid, node, with_subscriptions=True
+            )
+            if cached_node is None:
+                log.warning(
+                    f"Received an item retract for node {node!r} at {client.jid} "
+                    "which is not cached"
+                )
+                raise exceptions.NotFound
+            await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id])
+            # notifyRetract is expecting domish.Element instances
+            item_elt = domish.Element((None, "item"))
+            item_elt["id"] = item_id
+            for subscription in cached_node.subscriptions:
+                if subscription.state != SubscriptionState.SUBSCRIBED:
+                    continue
+                self.pubsub_service.notifyRetract(
+                    client.jid,
+                    node,
+                    [(subscription.subscriber, None, [item_elt])]
+                )
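+    # Illustrative shape of the deletions handled above (all URLs are hypothetical):
+    # a "Delete" activity built by ap_delete_item() looks roughly like
+    #     {
+    #         "type": "Delete",
+    #         "actor": "https://<public_url>/<ap path prefix>/actor/louise%40example.org",
+    #         "object": {
+    #             "id": "https://<public_url>/<ap path prefix>/item/louise%40example.org/some-id",
+    #             "type": "Tombstone"
+    #         },
+    #         "to": ["https://www.w3.org/ns/activitystreams#Public"]
+    #     }
+    # while new_ap_delete_item() goes the other way: it receives such a deletion and
+    # turns it into either a message retraction (when the id is found in message
+    # history) or a pubsub item retract notification.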