Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | 4b842c1fb686 |
children | c3b68fdc2de7 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia ActivityPub Gateway # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 import calendar import hashlib import json from pathlib import Path from pprint import pformat import re from typing import ( Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union, overload, ) from urllib import parse from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding import dateutil from dateutil.parser import parserinfo import shortuuid from sqlalchemy.exc import IntegrityError import treq from treq.response import _Response as TReqResponse from twisted.internet import defer, reactor, threads from twisted.web import http from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish from wokkel import pubsub, rsm from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.memory import persistent from libervia.backend.memory.sqla_mapping import History, SubscriptionState from libervia.backend.tools import utils from libervia.backend.tools.common import data_format, date_utils, tls, uri from libervia.backend.tools.common.async_utils import async_lru from .ad_hoc import APAdHocService from .events import APEvents from .constants import ( ACTIVITY_OBJECT_MANDATORY, ACTIVITY_TARGET_MANDATORY, ACTIVITY_TYPES, ACTIVITY_TYPES_LOWER, COMMENTS_MAX_PARENTS, CONF_SECTION, IMPORT_NAME, LRU_MAX_SIZE, MEDIA_TYPE_AP, NS_AP, NS_AP_PUBLIC, PUBLIC_TUPLE, TYPE_ACTOR, TYPE_EVENT, TYPE_FOLLOWERS, TYPE_ITEM, TYPE_LIKE, TYPE_MENTION, TYPE_REACTION, TYPE_TOMBSTONE, TYPE_JOIN, TYPE_LEAVE ) from .http_server import HTTPServer from .pubsub_service import APPubsubService from .regex import RE_MENTION log = getLogger(__name__) IMPORT_NAME = "ap-gateway" PLUGIN_INFO = { C.PI_NAME: "ActivityPub Gateway component", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", C.PI_HANDLER: C.BOOL_TRUE, C.PI_DESCRIPTION: _( "Gateway for bidirectional communication between XMPP and ActivityPub." ), } HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})" RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") class APGateway: IMPORT_NAME = IMPORT_NAME # show data send or received through HTTP, used for debugging # 1: log POST objects # 2: log POST and GET objects # 3: log POST and GET objects with HTTP headers for GET requests verbose = 0 def __init__(self, host): self.host = host self.initialised = False self.client = None self._p = host.plugins["XEP-0060"] self._a = host.plugins["XEP-0084"] self._e = host.plugins["XEP-0106"] self._m = host.plugins["XEP-0277"] self._v = host.plugins["XEP-0292"] self._refs = host.plugins["XEP-0372"] self._r = host.plugins["XEP-0424"] self._sfs = host.plugins["XEP-0447"] self._pps = host.plugins["XEP-0465"] self._pa = host.plugins["XEP-0470"] self._c = host.plugins["PUBSUB_CACHE"] self._t = host.plugins["TEXT_SYNTAXES"] self._i = host.plugins["IDENTITY"] self._events = host.plugins["XEP-0471"] self._p.add_managed_node( "", items_cb=self._items_received, # we want to be sure that the callbacks are launched before pubsub cache's # one, as we need to inspect items before they are actually removed from cache # or updated priority=1000 ) self.pubsub_service = APPubsubService(self) self.ad_hoc = APAdHocService(self) self.ap_events = APEvents(self) host.trigger.add("message_received", self._message_received_trigger, priority=-1000) host.trigger.add("XEP-0424_retractReceived", self._on_message_retract) host.trigger.add("XEP-0372_ref_received", self._on_reference_received) host.bridge.add_method( "ap_send", ".plugin", in_sign="sss", out_sign="", method=self._publish_message, async_=True, ) def get_handler(self, __): return self.pubsub_service async def init(self, client): if self.initialised: return self.initialised = True log.info(_("ActivityPub Gateway initialization")) # RSA keys stored_data = await self.host.memory.storage.get_privates( IMPORT_NAME, ["rsa_key"], profile=client.profile ) private_key_pem = stored_data.get("rsa_key") if private_key_pem is None: self.private_key = await threads.deferToThread( rsa.generate_private_key, public_exponent=65537, key_size=4096, ) private_key_pem = self.private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ).decode() await self.host.memory.storage.set_private_value( IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile ) else: self.private_key = serialization.load_pem_private_key( private_key_pem.encode(), password=None, ) self.public_key = self.private_key.public_key() self.public_key_pem = self.public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ).decode() # params # URL and port self.public_url = self.host.memory.config_get( CONF_SECTION, "public_url" ) or self.host.memory.config_get( CONF_SECTION, "xmpp_domain" ) if self.public_url is None: log.error( '"public_url" not set in configuration, this is mandatory to have' "ActivityPub Gateway running. Please set this option it to public facing " f"url in {CONF_SECTION!r} configuration section." ) return if parse.urlparse(self.public_url).scheme: log.error( "Scheme must not be specified in \"public_url\", please remove it from " "\"public_url\" configuration option. ActivityPub Gateway won't be run." ) return self.http_port = int(self.host.memory.config_get( CONF_SECTION, 'http_port', 8123)) connection_type = self.host.memory.config_get( CONF_SECTION, 'http_connection_type', 'https') if connection_type not in ('http', 'https'): raise exceptions.ConfigError( 'bad ap-gateay http_connection_type, you must use one of "http" or ' '"https"' ) self.max_items = int(self.host.memory.config_get( CONF_SECTION, 'new_node_max_items', 50 )) self.comments_max_depth = int(self.host.memory.config_get( CONF_SECTION, 'comments_max_depth', 0 )) self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap') self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") # True (default) if we provide gateway only to entities/services from our server self.local_only = C.bool( self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) ) # if True (default), mention will be parsed in non-private content coming from # XMPP. This is necessary as XEP-0372 are coming separately from item where the # mention is done, which is hard to impossible to translate to ActivityPub (where # mention specified inside the item directly). See documentation for details. self.auto_mentions = C.bool( self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) ) html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( CONF_SECTION, 'html_redirect_dict', {} ) self.html_redirect: Dict[str, List[dict]] = {} for url_type, target in html_redirect.items(): if isinstance(target, str): target = {"url": target} elif not isinstance(target, dict): raise exceptions.ConfigError( f"html_redirect target must be a URL or a dict, not {target!r}" ) filters = target.setdefault("filters", {}) if "url" not in target: log.warning(f"invalid HTML redirection, missing target URL: {target}") continue # a slash in the url_type is a syntactic shortcut to have a node filter if "/" in url_type: url_type, node_filter = url_type.split("/", 1) filters["node"] = node_filter self.html_redirect.setdefault(url_type, []).append(target) # HTTP server launch self.server = HTTPServer(self) if connection_type == 'http': reactor.listenTCP(self.http_port, self.server) else: options = tls.get_options_from_config( self.host.memory.config, CONF_SECTION) tls.tls_options_check(options) context_factory = tls.get_tls_context_factory(options) reactor.listenSSL(self.http_port, self.server, context_factory) async def profile_connecting(self, client): self.client = client client.sendHistory = True client._ap_storage = persistent.LazyPersistentBinaryDict( IMPORT_NAME, client.profile ) await self.init(client) def profile_connected(self, client): self.ad_hoc.init(client) async def _items_received( self, client: SatXMPPEntity, itemsEvent: pubsub.ItemsEvent ) -> None: """Callback called when pubsub items are received if the items are adressed to a JID corresponding to an AP actor, they are converted to AP items and sent to the corresponding AP server. If comments nodes are linked, they are automatically subscribed to get new items from there too. """ if client != self.client: return # we need recipient as JID and not gateway own JID to be able to use methods such # as "subscribe" client = self.client.get_virtual_client(itemsEvent.sender) recipient = itemsEvent.recipient if not recipient.user: log.debug("ignoring items event without local part specified") return ap_account = self._e.unescape(recipient.user) if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): await self.convert_and_post_attachments( client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, itemsEvent.items ) else: await self.convert_and_post_items( client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, itemsEvent.items ) async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity: """Get client for this component with a specified jid This is needed to perform operations with the virtual JID corresponding to the AP actor instead of the JID of the gateway itself. @param actor_id: ID of the actor @return: virtual client """ local_jid = await self.get_jid_from_id(actor_id) return self.client.get_virtual_client(local_jid) def is_activity(self, data: dict) -> bool: """Return True if the data has an activity type""" try: return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER except (KeyError, TypeError): return False async def ap_get(self, url: str) -> dict: """Retrieve AP JSON from given URL @raise error.StanzaError: "service-unavailable" is sent when something went wrong with AP server """ resp = await treq.get( url, headers = { "Accept": [MEDIA_TYPE_AP], "Content-Type": [MEDIA_TYPE_AP], } ) if resp.code >= 300: text = await resp.text() if resp.code == 404: raise exceptions.NotFound(f"Can't find resource at {url}") else: msg = f"HTTP error {resp.code} (url: {url}): {text}" raise exceptions.ExternalRequestError(msg) try: return await treq.json_content(resp) except Exception as e: raise error.StanzaError( "service-unavailable", text=f"Can't get AP data at {url}: {e}" ) @overload async def ap_get_object(self, data: dict, key: str) -> Optional[dict]: ... @overload async def ap_get_object( self, data: Union[str, dict], key: None = None ) -> dict: ... async def ap_get_object(self, data, key = None): """Retrieve an AP object, dereferencing when necessary This method is to be used with attributes marked as "Functional" in https://www.w3.org/TR/activitystreams-vocabulary @param data: AP object where an other object is looked for, or the object itself @param key: name of the object to look for, or None if data is the object directly @return: found object if any """ if key is not None: value = data.get(key) else: value = data if value is None: if key is None: raise ValueError("None can't be used with ap_get_object is key is None") return None elif isinstance(value, dict): return value elif isinstance(value, str): if self.is_local_url(value): return await self.ap_get_local_object(value) else: return await self.ap_get(value) else: raise NotImplementedError( "was expecting a string or a dict, got {type(value)}: {value!r}}" ) async def ap_get_local_object( self, url: str ) -> dict: """Retrieve or generate local object for now, only handle XMPP items to convert to AP """ url_type, url_args = self.parse_apurl(url) if url_type == TYPE_ITEM: try: account, item_id = url_args except ValueError: raise ValueError(f"invalid URL: {url}") author_jid, node = await self.get_jid_and_node(account) if node is None: node = self._m.namespace cached_node = await self.host.memory.storage.get_pubsub_node( self.client, author_jid, node ) if not cached_node: log.debug(f"node {node!r} at {author_jid} is not found in cache") found_item = None else: cached_items, __ = await self.host.memory.storage.get_items( cached_node, item_ids=[item_id] ) if not cached_items: log.debug( f"item {item_id!r} of {node!r} at {author_jid} is not found in " "cache" ) found_item = None else: found_item = cached_items[0].data if found_item is None: # the node is not in cache, we have to make a request to retrieve the item # If the node doesn't exist, get_items will raise a NotFound exception found_items, __ = await self._p.get_items( self.client, author_jid, node, item_ids=[item_id] ) try: found_item = found_items[0] except IndexError: raise exceptions.NotFound(f"requested item at {url} can't be found") if node.startswith(self._events.namespace): # this is an event event_data = self._events.event_elt_2_event_data(found_item) ap_item = await self.ap_events.event_data_2_ap_item( event_data, author_jid ) # the URL must return the object and not the activity ap_item["object"]["@context"] = ap_item["@context"] return ap_item["object"] else: # this is a blog item mb_data = await self._m.item_2_mb_data( self.client, found_item, author_jid, node ) ap_item = await self.mb_data_2_ap_item(self.client, mb_data) # the URL must return the object and not the activity return ap_item["object"] else: raise NotImplementedError( 'only object from "item" URLs can be retrieved for now' ) async def ap_get_list( self, data: dict, key: str, only_ids: bool = False ) -> Optional[List[Dict[str, Any]]]: """Retrieve a list of objects from AP data, dereferencing when necessary This method is to be used with non functional vocabularies. Use ``ap_get_object`` otherwise. If the value is a dictionary, it will be wrapped in a list @param data: AP object where a list of objects is looked for @param key: key of the list to look for @param only_ids: if Trye, only items IDs are retrieved @return: list of objects, or None if the key is not present """ value = data.get(key) if value is None: return None elif isinstance(value, str): if self.is_local_url(value): value = await self.ap_get_local_object(value) else: value = await self.ap_get(value) if isinstance(value, dict): return [value] if not isinstance(value, list): raise ValueError(f"A list was expected, got {type(value)}: {value!r}") if only_ids: return [ {"id": v["id"]} if isinstance(v, dict) else {"id": v} for v in value ] else: return [await self.ap_get_object(i) for i in value] async def ap_get_actors( self, data: dict, key: str, as_account: bool = True ) -> List[str]: """Retrieve AP actors from data @param data: AP object containing a field with actors @param key: field to use to retrieve actors @param as_account: if True returns account handles, otherwise will return actor IDs @raise exceptions.DataError: there is not actor data or it is invalid """ value = data.get(key) if value is None: raise exceptions.DataError( f"no actor associated to object {data.get('id')!r}" ) elif isinstance(value, dict): actor_id = value.get("id") if actor_id is None: raise exceptions.DataError( f"invalid actor associated to object {data.get('id')!r}: {value!r}" ) value = [actor_id] elif isinstance(value, str): value = [value] elif isinstance(value, list): try: value = [a if isinstance(a, str) else a["id"] for a in value] except (TypeError, KeyError): raise exceptions.DataError( f"invalid actors list to object {data.get('id')!r}: {value!r}" ) if not value: raise exceptions.DataError( f"list of actors is empty" ) if as_account: return [await self.get_ap_account_from_id(actor_id) for actor_id in value] else: return value async def ap_get_sender_actor( self, data: dict, ) -> str: """Retrieve actor who sent data This is done by checking "actor" field first, then "attributedTo" field. Only the first found actor is taken into account @param data: AP object @return: actor id of the sender @raise exceptions.NotFound: no actor has been found in data """ try: actors = await self.ap_get_actors(data, "actor", as_account=False) except exceptions.DataError: actors = None if not actors: try: actors = await self.ap_get_actors(data, "attributedTo", as_account=False) except exceptions.DataError: raise exceptions.NotFound( 'actor not specified in "actor" or "attributedTo"' ) try: return actors[0] except IndexError: raise exceptions.NotFound("list of actors is empty") def must_encode(self, text: str) -> bool: """Indicate if a text must be period encoded""" return ( not RE_ALLOWED_UNQUOTED.match(text) or text.startswith("___") or "---" in text ) def period_encode(self, text: str) -> str: """Period encode a text see [get_jid_and_node] for reasons of period encoding """ return ( parse.quote(text, safe="") .replace("---", "%2d%2d%2d") .replace("___", "%5f%5f%5f") .replace(".", "%2e") .replace("~", "%7e") .replace("%", ".") ) async def get_ap_account_from_jid_and_node( self, jid_: jid.JID, node: Optional[str] ) -> str: """Construct AP account from JID and node The account construction will use escaping when necessary """ if not node or node == self._m.namespace: node = None if self.client is None: raise exceptions.InternalError("Client is not set yet") if self.is_virtual_jid(jid_): # this is an proxy JID to an AP Actor return self._e.unescape(jid_.user) if node and not jid_.user and not self.must_encode(node): is_pubsub = await self.is_pubsub(jid_) # when we have a pubsub service, the user part can be used to set the node # this produces more user-friendly AP accounts if is_pubsub: jid_.user = node node = None is_local = self.is_local(jid_) user = jid_.user if is_local else jid_.userhost() if user is None: user = "" account_elts = [] if node and self.must_encode(node) or self.must_encode(user): account_elts = ["___"] if node: node = self.period_encode(node) user = self.period_encode(user) if not user: raise exceptions.InternalError("there should be a user part") if node: account_elts.extend((node, "---")) account_elts.extend(( user, "@", jid_.host if is_local else self.client.jid.userhost() )) return "".join(account_elts) def is_local(self, jid_: jid.JID) -> bool: """Returns True if jid_ use a domain or subdomain of gateway's host""" local_host = self.client.host.split(".") assert local_host return jid_.host.split(".")[-len(local_host):] == local_host async def is_pubsub(self, jid_: jid.JID) -> bool: """Indicate if a JID is a Pubsub service""" host_disco = await self.host.get_disco_infos(self.client, jid_) return ( ("pubsub", "service") in host_disco.identities and not ("pubsub", "pep") in host_disco.identities ) async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: """Decode raw AP account handle to get XMPP JID and Pubsub Node Username are case insensitive. By default, the username correspond to local username (i.e. username from component's server). If local name's domain is a pubsub service (and not PEP), the username is taken as a pubsub node. If ``---`` is present in username, the part before is used as pubsub node, and the rest as a JID user part. If username starts with ``___``, characters are encoded using period encoding (i.e. percent encoding where a ``.`` is used instead of ``%``). This horror is necessary due to limitation in some AP implementation (notably Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 examples: ``toto@example.org`` => JID = toto@example.org, node = None ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a non-local JID, and will work only if setings ``local_only`` is False), node = None ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => JID = pubsub.example.org, node = toto ``tata---toto@example.org`` => JID = toto@example.org, node = tata ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 @param ap_account: ActivityPub account handle (``username@domain.tld``) @return: service JID and pubsub node if pubsub node is None, default microblog pubsub node (and possibly other nodes that plugins may hanlde) will be used @raise ValueError: invalid account @raise PermissionError: non local jid is used when gateway doesn't allow them """ if ap_account.count("@") != 1: raise ValueError("Invalid AP account") if ap_account.startswith("___"): encoded = True ap_account = ap_account[3:] else: encoded = False username, domain = ap_account.split("@") if "---" in username: node, username = username.rsplit("---", 1) else: node = None if encoded: username = parse.unquote( RE_PERIOD_ENC.sub(r"%\g<hex>", username), errors="strict" ) if node: node = parse.unquote( RE_PERIOD_ENC.sub(r"%\g<hex>", node), errors="strict" ) if "@" in username: username, domain = username.rsplit("@", 1) if not node: # we need to check host disco, because disco request to user may be # blocked for privacy reason (see # https://xmpp.org/extensions/xep-0030.html#security) is_pubsub = await self.is_pubsub(jid.JID(domain)) if is_pubsub: # if the host is a pubsub service and not a PEP, we consider that username # is in fact the node name node = username username = None jid_s = f"{username}@{domain}" if username else domain try: jid_ = jid.JID(jid_s) except RuntimeError: raise ValueError(f"Invalid jid: {jid_s!r}") if self.local_only and not self.is_local(jid_): raise exceptions.PermissionError( "This gateway is configured to map only local entities and services" ) return jid_, node def get_local_jid_from_account(self, account: str) -> jid.JID: """Compute JID linking to an AP account The local jid is computer by escaping AP actor handle and using it as local part of JID, where domain part is this gateway own JID """ return jid.JID( None, ( self._e.escape(account), self.client.jid.host, None ) ) async def get_jid_from_id(self, actor_id: str) -> jid.JID: """Compute JID linking to an AP Actor ID The local jid is computer by escaping AP actor handle and using it as local part of JID, where domain part is this gateway own JID If the actor_id comes from local server (checked with self.public_url), it means that we have an XMPP entity, and the original JID is returned """ if self.is_local_url(actor_id): request_type, extra_args = self.parse_apurl(actor_id) if request_type != TYPE_ACTOR or len(extra_args) != 1: raise ValueError(f"invalid actor id: {actor_id!r}") actor_jid, __ = await self.get_jid_and_node(extra_args[0]) return actor_jid account = await self.get_ap_account_from_id(actor_id) return self.get_local_jid_from_account(account) def parse_apurl(self, url: str) -> Tuple[str, List[str]]: """Parse an URL leading to an AP endpoint @param url: URL to parse (schema is not mandatory) @return: endpoint type and extra arguments """ path = parse.urlparse(url).path.lstrip("/") type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") return type_, [parse.unquote(a) for a in extra_args] def build_apurl(self, type_:str , *args: str) -> str: """Build an AP endpoint URL @param type_: type of AP endpoing @param arg: endpoint dependant arguments """ return parse.urljoin( self.base_ap_url, str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) ) def is_local_url(self, url: str) -> bool: """Tells if an URL link to this component ``public_url`` and ``ap_path`` are used to check the URL """ return url.startswith(self.base_ap_url) def is_virtual_jid(self, jid_: jid.JID) -> bool: """Tell if a JID is an AP actor mapped through this gateway""" return jid_.host == self.client.jid.userhost() def build_signature_header(self, values: Dict[str, str]) -> str: """Build key="<value>" signature header from signature data""" fields = [] for key, value in values.items(): if key not in ("(created)", "(expired)"): if '"' in value: raise NotImplementedError( "string escaping is not implemented, double-quote can't be used " f"in {value!r}" ) value = f'"{value}"' fields.append(f"{key}={value}") return ",".join(fields) def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]: """Get digest data to use in header and signature @param body: body of the request @return: hash name and digest """ if algo != "SHA-256": raise NotImplementedError("only SHA-256 is implemented for now") return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() @async_lru(maxsize=LRU_MAX_SIZE) async def get_actor_data(self, actor_id) -> dict: """Retrieve actor data with LRU cache""" return await self.ap_get(actor_id) @async_lru(maxsize=LRU_MAX_SIZE) async def get_actor_pub_key_data( self, actor_id: str ) -> Tuple[str, str, rsa.RSAPublicKey]: """Retrieve Public Key data from actor ID @param actor_id: actor ID (url) @return: key_id, owner and public_key @raise KeyError: publicKey is missing from actor data """ actor_data = await self.get_actor_data(actor_id) pub_key_data = actor_data["publicKey"] key_id = pub_key_data["id"] owner = pub_key_data["owner"] pub_key_pem = pub_key_data["publicKeyPem"] pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) return key_id, owner, pub_key def create_activity( self, activity: str, actor_id: str, object_: Optional[Union[str, dict]] = None, target: Optional[Union[str, dict]] = None, activity_id: Optional[str] = None, **kwargs, ) -> Dict[str, Any]: """Generate base data for an activity @param activity: one of ACTIVITY_TYPES @param actor_id: AP actor ID of the sender @param object_: content of "object" field @param target: content of "target" field @param activity_id: ID to use for the activity if not set it will be automatically generated, but it is usually desirable to set the ID manually so it can be retrieved (e.g. for Undo) """ if activity not in ACTIVITY_TYPES: raise exceptions.InternalError(f"invalid activity: {activity!r}") if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: raise exceptions.InternalError( f'"object_" is mandatory for activity {activity!r}' ) if target is None and activity in ACTIVITY_TARGET_MANDATORY: raise exceptions.InternalError( f'"target" is mandatory for activity {activity!r}' ) if activity_id is None: activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" data: Dict[str, Any] = { "@context": [NS_AP], "actor": actor_id, "id": activity_id, "type": activity, } data.update(kwargs) if object_ is not None: data["object"] = object_ if target is not None: data["target"] = target return data def get_key_id(self, actor_id: str) -> str: """Get local key ID from actor ID""" return f"{actor_id}#main-key" async def check_signature( self, signature: str, key_id: str, headers: Dict[str, str] ) -> str: """Verify that signature matches given headers see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 @param signature: Base64 encoded signature @param key_id: ID of the key used to sign the data @param headers: headers and their values, including pseudo-headers @return: id of the signing actor @raise InvalidSignature: signature doesn't match headers """ to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) if key_id.startswith("acct:"): actor = key_id[5:] actor_id = await self.get_ap_actor_id_from_account(actor) else: actor_id = key_id.split("#", 1)[0] pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id) if pub_key_id != key_id or pub_key_owner != actor_id: raise exceptions.EncryptionError("Public Key mismatch") try: pub_key.verify( base64.b64decode(signature), to_sign.encode(), # we have to use PKCS1v15 padding to be compatible with Mastodon padding.PKCS1v15(), # type: ignore hashes.SHA256() # type: ignore ) except InvalidSignature: raise exceptions.EncryptionError( "Invalid signature (using PKC0S1 v1.5 and SHA-256)" ) return actor_id def get_signature_data( self, key_id: str, headers: Dict[str, str] ) -> Tuple[Dict[str, str], Dict[str, str]]: """Generate and return signature and corresponding headers @param parsed_url: URL where the request is sent/has been received @param key_id: ID of the key (URL linking to the data with public key) @param date: HTTP datetime string of signature generation @param body: body of the HTTP request @param headers: headers to sign and their value: default value will be used if not specified @return: headers and signature data ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers removed, and ``Signature`` added. """ # headers must be lower case l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items()) signature = base64.b64encode(self.private_key.sign( to_sign.encode(), # we have to use PKCS1v15 padding to be compatible with Mastodon padding.PKCS1v15(), # type: ignore hashes.SHA256() # type: ignore )).decode() sign_data = { "keyId": key_id, "Algorithm": "rsa-sha256", "headers": " ".join(l_headers.keys()), "signature": signature } new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} new_headers["Signature"] = self.build_signature_header(sign_data) return new_headers, sign_data async def convert_and_post_items( self, client: SatXMPPEntity, ap_account: str, service: jid.JID, node: str, items: List[domish.Element], subscribe_extra_nodes: bool = True, ) -> None: """Convert XMPP items to AP items and post them to actor inbox @param ap_account: account of ActivityPub actor receiving the item @param service: JID of the (virtual) pubsub service where the item has been published @param node: (virtual) node corresponding where the item has been published @param subscribe_extra_nodes: if True, extra data nodes will be automatically subscribed, that is comment nodes if present and attachments nodes. """ actor_id = await self.get_ap_actor_id_from_account(ap_account) inbox = await self.get_ap_inbox_from_id(actor_id) for item in items: if item.name == "item": cached_item = await self.host.memory.storage.search_pubsub_items({ "profiles": [self.client.profile], "services": [service], "nodes": [node], "names": [item["id"]] }) is_new = not bool(cached_item) if node.startswith(self._events.namespace): # event item event_data = self._events.event_elt_2_event_data(item) try: author_jid = jid.JID(item["publisher"]).userhostJID() except (KeyError, RuntimeWarning): root_elt = item while root_elt.parent is not None: root_elt = root_elt.parent author_jid = jid.JID(root_elt["from"]).userhostJID() if subscribe_extra_nodes and not self.is_virtual_jid(author_jid): # we subscribe automatically to comment nodes if any recipient_jid = self.get_local_jid_from_account(ap_account) recipient_client = self.client.get_virtual_client(recipient_jid) comments_data = event_data.get("comments") if comments_data: comment_service = jid.JID(comments_data["jid"]) comment_node = comments_data["node"] await self._p.subscribe( recipient_client, comment_service, comment_node ) try: await self._pa.subscribe( recipient_client, service, node, event_data["id"] ) except exceptions.NotFound: log.debug( f"no attachment node found for item {event_data['id']!r} " f"on {node!r} at {service}" ) ap_item = await self.ap_events.event_data_2_ap_item( event_data, author_jid, is_new=is_new ) else: # blog item mb_data = await self._m.item_2_mb_data(client, item, service, node) author_jid = jid.JID(mb_data["author_jid"]) if subscribe_extra_nodes and not self.is_virtual_jid(author_jid): # we subscribe automatically to comment nodes if any recipient_jid = self.get_local_jid_from_account(ap_account) recipient_client = self.client.get_virtual_client(recipient_jid) for comment_data in mb_data.get("comments", []): comment_service = jid.JID(comment_data["service"]) if self.is_virtual_jid(comment_service): log.debug( f"ignoring virtual comment service: {comment_data}" ) continue comment_node = comment_data["node"] await self._p.subscribe( recipient_client, comment_service, comment_node ) try: await self._pa.subscribe( recipient_client, service, node, mb_data["id"] ) except exceptions.NotFound: log.debug( f"no attachment node found for item {mb_data['id']!r} on " f"{node!r} at {service}" ) ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) url_actor = ap_item["actor"] elif item.name == "retract": url_actor, ap_item = await self.ap_delete_item( client.jid, node, item["id"] ) else: raise exceptions.InternalError(f"unexpected element: {item.toXml()}") await self.sign_and_post(inbox, url_actor, ap_item) async def convert_and_post_attachments( self, client: SatXMPPEntity, ap_account: str, service: jid.JID, node: str, items: List[domish.Element], publisher: Optional[jid.JID] = None ) -> None: """Convert XMPP item attachments to AP activities and post them to actor inbox @param ap_account: account of ActivityPub actor receiving the item @param service: JID of the (virtual) pubsub service where the item has been published @param node: (virtual) node corresponding where the item has been published subscribed, that is comment nodes if present and attachments nodes. @param items: attachments items @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub service, it's the publisher of the item). To be filled only when the publisher is known for sure, otherwise publisher will be determined either if "publisher" attribute is set by pubsub service, or as a last resort, using item's ID (which MUST be publisher bare JID according to pubsub-attachments specification). """ if len(items) != 1: log.warning( "we should get exactly one attachment item for an entity, got " f"{len(items)})" ) actor_id = await self.get_ap_actor_id_from_account(ap_account) inbox = await self.get_ap_inbox_from_id(actor_id) item_elt = items[0] item_id = item_elt["id"] if publisher is None: item_pub_s = item_elt.getAttribute("publisher") publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id) if publisher.userhost() != item_id: log.warning( "attachments item ID must be publisher's bare JID, ignoring: " f"{item_elt.toXml()}" ) return if self.is_virtual_jid(publisher): log.debug(f"ignoring item coming from local virtual JID {publisher}") return if publisher is not None: item_elt["publisher"] = publisher.userhost() item_service, item_node, item_id = self._pa.attachment_node_2_item(node) item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node) if self.is_virtual_jid(item_service): # it's a virtual JID mapping to an external AP actor, we can use the # item_id directly item_url = item_id if not item_url.startswith("https:"): log.warning( "item ID of external AP actor is not an https link, ignoring: " f"{item_id!r}" ) return else: item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({ "profiles": [self.client.profile], "services": [service], "nodes": [node], "names": [item_elt["id"]] }) if not old_attachment_pubsub_items: old_attachment = {} else: old_attachment_items = [i.data for i in old_attachment_pubsub_items] old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items) try: old_attachment = old_attachments[0] except IndexError: # no known element was present in attachments old_attachment = {} publisher_account = await self.get_ap_account_from_jid_and_node( publisher, None ) publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) try: attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] except IndexError: # no known element was present in attachments attachments = {} # noticed if "noticed" in attachments: if not "noticed" in old_attachment: # new "noticed" attachment, we translate to "Like" activity activity_id = self.build_apurl("like", item_account, item_id) activity = self.create_activity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) activity["to"] = [ap_account] activity["cc"] = [NS_AP_PUBLIC] await self.sign_and_post(inbox, publisher_actor_id, activity) else: if "noticed" in old_attachment: # "noticed" attachment has been removed, we undo the "Like" activity activity_id = self.build_apurl("like", item_account, item_id) activity = self.create_activity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) activity["to"] = [ap_account] activity["cc"] = [NS_AP_PUBLIC] undo = self.create_activity("Undo", publisher_actor_id, activity) await self.sign_and_post(inbox, publisher_actor_id, undo) # reactions new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) reactions_remove = old_reactions - new_reactions reactions_add = new_reactions - old_reactions for reactions, undo in ((reactions_remove, True), (reactions_add, False)): for reaction in reactions: activity_id = self.build_apurl( "reaction", item_account, item_id, reaction.encode().hex() ) reaction_activity = self.create_activity( TYPE_REACTION, publisher_actor_id, item_url, activity_id=activity_id ) reaction_activity["content"] = reaction reaction_activity["to"] = [ap_account] reaction_activity["cc"] = [NS_AP_PUBLIC] if undo: activy = self.create_activity( "Undo", publisher_actor_id, reaction_activity ) else: activy = reaction_activity await self.sign_and_post(inbox, publisher_actor_id, activy) # RSVP if "rsvp" in attachments: attending = attachments["rsvp"].get("attending", "no") old_attending = old_attachment.get("rsvp", {}).get("attending", "no") if attending != old_attending: activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE activity_id = self.build_apurl(activity_type.lower(), item_account, item_id) activity = self.create_activity( activity_type, publisher_actor_id, item_url, activity_id=activity_id ) activity["to"] = [ap_account] activity["cc"] = [NS_AP_PUBLIC] await self.sign_and_post(inbox, publisher_actor_id, activity) else: if "rsvp" in old_attachment: old_attending = old_attachment.get("rsvp", {}).get("attending", "no") if old_attending == "yes": activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) activity = self.create_activity( TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id ) activity["to"] = [ap_account] activity["cc"] = [NS_AP_PUBLIC] await self.sign_and_post(inbox, publisher_actor_id, activity) if service.user and self.is_virtual_jid(service): # the item is on a virtual service, we need to store it in cache log.debug("storing attachments item in cache") cached_node = await self.host.memory.storage.get_pubsub_node( client, service, node, with_subscriptions=True, create=True ) await self.host.memory.storage.cache_pubsub_items( self.client, cached_node, [item_elt], [attachments] ) async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse: """Sign a documentent and post it to AP server @param url: AP server endpoint @param actor_id: originating actor ID (URL) @param doc: document to send """ if self.verbose: __, actor_args = self.parse_apurl(actor_id) actor_account = actor_args[0] to_log = [ "", f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" ] p_url = parse.urlparse(url) body = json.dumps(doc).encode() digest_algo, digest_hash = self.get_digest(body) digest = f"{digest_algo}={digest_hash}" headers = { "(request-target)": f"post {p_url.path}", "Host": p_url.hostname, "Date": http.datetimeToString().decode(), "Digest": digest } headers["Content-Type"] = ( 'application/activity+json' ) headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers) if self.verbose: if self.verbose>=3: h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items()) to_log.append(f" headers:\n{h_to_log}") to_log.append("---") log.info("\n".join(to_log)) resp = await treq.post( url, body, headers=headers, ) if resp.code >= 300: text = await resp.text() log.warning(f"POST request to {url} failed [{resp.code}]: {text}") elif self.verbose: log.info(f"==> response code: {resp.code}") return resp def _publish_message(self, mess_data_s: str, service_s: str, profile: str): mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore service = jid.JID(service_s) client = self.host.get_client(profile) return defer.ensureDeferred(self.publish_message(client, mess_data, service)) @async_lru(maxsize=LRU_MAX_SIZE) async def get_ap_actor_id_from_account(self, account: str) -> str: """Retrieve account ID from it's handle using WebFinger Don't use this method to get local actor id from a local account derivated for JID: in this case, the actor ID is retrieve with ``self.build_apurl(TYPE_ACTOR, ap_account)`` @param account: AP handle (user@domain.tld) @return: Actor ID (which is an URL) """ if account.count("@") != 1 or "/" in account: raise ValueError(f"Invalid account: {account!r}") host = account.split("@")[1] try: finger_data = await treq.json_content(await treq.get( f"https://{host}/.well-known/webfinger?" f"resource=acct:{parse.quote_plus(account)}", )) except Exception as e: raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") for link in finger_data.get("links", []): if ( link.get("type") == "application/activity+json" and link.get("rel") == "self" ): href = link.get("href", "").strip() if not href: raise ValueError( f"Invalid webfinger data for {account:r}: missing href" ) break else: raise ValueError( f"No ActivityPub link found for {account!r}" ) return href async def get_ap_actor_data_from_account(self, account: str) -> dict: """Retrieve ActivityPub Actor data @param account: ActivityPub Actor identifier """ href = await self.get_ap_actor_id_from_account(account) return await self.ap_get(href) async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str: """Retrieve inbox of an actor_id @param use_shared: if True, and a shared inbox exists, it will be used instead of the user inbox """ data = await self.get_actor_data(actor_id) if use_shared: try: return data["endpoints"]["sharedInbox"] except KeyError: pass return data["inbox"] @async_lru(maxsize=LRU_MAX_SIZE) async def get_ap_account_from_id(self, actor_id: str) -> str: """Retrieve AP account from the ID URL Works with external or local actor IDs. @param actor_id: AP ID of the actor (URL to the actor data) @return: AP handle """ if self.is_local_url(actor_id): url_type, url_args = self.parse_apurl(actor_id) if url_type != "actor" or not url_args: raise exceptions.DataError( f"invalid local actor ID: {actor_id}" ) account = url_args[0] try: account_user, account_host = account.split('@') except ValueError: raise exceptions.DataError( f"invalid account from url: {actor_id}" ) if not account_user or account_host != self.public_url: raise exceptions.DataError( f"{account!r} is not a valid local account (from {actor_id})" ) return account url_parsed = parse.urlparse(actor_id) actor_data = await self.get_actor_data(actor_id) username = actor_data.get("preferredUsername") if not username: raise exceptions.DataError( 'No "preferredUsername" field found, can\'t retrieve actor account' ) account = f"{username}@{url_parsed.hostname}" # we try to retrieve the actor ID from the account to check it found_id = await self.get_ap_actor_id_from_account(account) if found_id != actor_id: # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 msg = ( f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " f"({actor_id!r}). This AP instance doesn't seems to use " '"preferredUsername" as we expect.' ) log.warning(msg) raise exceptions.DataError(msg) return account async def get_ap_items( self, collection: dict, max_items: Optional[int] = None, chronological_pagination: bool = True, after_id: Optional[str] = None, start_index: Optional[int] = None, parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None, only_ids: bool = False, ) -> Tuple[List[domish.Element], rsm.RSMResponse]: """Retrieve AP items and convert them to XMPP items @param account: AP account handle to get items from @param max_items: maximum number of items to retrieve retrieve all items by default @param chronological_pagination: get pages in chronological order AP use reversed chronological order for pagination, "first" page returns more recent items. If "chronological_pagination" is True, "last" AP page will be retrieved first. @param after_id: if set, retrieve items starting from given ID Due to ActivityStream Collection Paging limitations, this is inefficient and if ``after_id`` is not already in cache, we have to retrieve every page until we find it. In most common cases, ``after_id`` should be in cache though (client usually use known ID when in-order pagination is used). @param start_index: start retrieving items from the one with given index Due to ActivityStream Collection Paging limitations, this is inefficient and all pages before the requested index will be retrieved to count items. @param parser: method to use to parse AP items and get XMPP item elements if None, use default generic parser @param only_ids: if True, only retrieve items IDs Retrieving only item IDs avoid HTTP requests to retrieve items, it may be sufficient in some use cases (e.g. when retrieving following/followers collections) @return: XMPP Pubsub items and corresponding RSM Response Items are always returned in chronological order in the result """ if parser is None: parser = self.ap_item_2_mb_elt rsm_resp: Dict[str, Union[bool, int]] = {} try: count = collection["totalItems"] except KeyError: log.warning( f'"totalItems" not found in collection {collection.get("id")}, ' "defaulting to 20" ) count = 20 else: log.info(f"{collection.get('id')} has {count} item(s)") rsm_resp["count"] = count if start_index is not None: assert chronological_pagination and after_id is None if start_index >= count: return [], rsm_resp elif start_index == 0: # this is the default behaviour pass elif start_index > 5000: raise error.StanzaError( "feature-not-implemented", text="Maximum limit for previous_index has been reached, this limit" "is set to avoid DoS" ) else: # we'll convert "start_index" to "after_id", thus we need the item just # before "start_index" previous_index = start_index - 1 retrieved_items = 0 current_page = collection["last"] while retrieved_items < count: page_data, items = await self.parse_ap_page( current_page, parser, only_ids ) if not items: log.warning(f"found an empty AP page at {current_page}") return [], rsm_resp page_start_idx = retrieved_items retrieved_items += len(items) if previous_index <= retrieved_items: after_id = items[previous_index - page_start_idx]["id"] break try: current_page = page_data["prev"] except KeyError: log.warning( f"missing previous page link at {current_page}: {page_data!r}" ) raise error.StanzaError( "service-unavailable", "Error while retrieving previous page from AP service at " f"{current_page}" ) init_page = "last" if chronological_pagination else "first" page = collection.get(init_page) if not page: raise exceptions.DataError( f"Initial page {init_page!r} not found for collection " f"{collection.get('id')})" ) items = [] page_items = [] retrieved_items = 0 found_after_id = False while retrieved_items < count: __, page_items = await self.parse_ap_page(page, parser, only_ids) if not page_items: break retrieved_items += len(page_items) if after_id is not None and not found_after_id: # if we have an after_id, we ignore all items until the requested one is # found try: limit_idx = [i["id"] for i in page_items].index(after_id) except ValueError: # if "after_id" is not found, we don't add any item from this page page_id = page.get("id") if isinstance(page, dict) else page log.debug(f"{after_id!r} not found at {page_id}, skipping") else: found_after_id = True if chronological_pagination: start_index = retrieved_items - len(page_items) + limit_idx + 1 page_items = page_items[limit_idx+1:] else: start_index = count - (retrieved_items - len(page_items) + limit_idx + 1) page_items = page_items[:limit_idx] items.extend(page_items) else: items.extend(page_items) if max_items is not None and len(items) >= max_items: if chronological_pagination: items = items[:max_items] else: items = items[-max_items:] break page = collection.get("prev" if chronological_pagination else "next") if not page: break if after_id is not None and not found_after_id: raise error.StanzaError("item-not-found") if items: if after_id is None: rsm_resp["index"] = 0 if chronological_pagination else count - len(items) if start_index is not None: rsm_resp["index"] = start_index elif after_id is not None: log.warning("Can't determine index of first element") elif chronological_pagination: rsm_resp["index"] = 0 else: rsm_resp["index"] = count - len(items) rsm_resp.update({ "first": items[0]["id"], "last": items[-1]["id"] }) return items, rsm.RSMResponse(**rsm_resp) async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: """Convert AP item to parsed microblog data and corresponding item element""" mb_data = await self.ap_item_2_mb_data(ap_item) item_elt = await self._m.mb_data_2_entry_elt( self.client, mb_data, mb_data["id"], None, self._m.namespace ) if "repeated" in mb_data["extra"]: item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] else: item_elt["publisher"] = mb_data["author_jid"] return mb_data, item_elt async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: """Convert AP item to XMPP item element""" __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) return item_elt async def parse_ap_page( self, page: Union[str, dict], parser: Callable[[dict], Awaitable[domish.Element]], only_ids: bool = False ) -> Tuple[dict, List[domish.Element]]: """Convert AP objects from an AP page to XMPP items @param page: Can be either url linking and AP page, or the page data directly @param parser: method to use to parse AP items and get XMPP item elements @param only_ids: if True, only retrieve items IDs @return: page data, pubsub items """ page_data = await self.ap_get_object(page) if page_data is None: log.warning('No data found in collection') return {}, [] ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids) if ap_items is None: ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids) if not ap_items: log.warning(f'No item field found in collection: {page_data!r}') return page_data, [] else: log.warning( "Items are not ordered, this is not spec compliant" ) items = [] # AP Collections are in antichronological order, but we expect chronological in # Pubsub, thus we reverse it for ap_item in reversed(ap_items): try: items.append(await parser(ap_item)) except (exceptions.DataError, NotImplementedError, error.StanzaError): continue return page_data, items async def get_comments_nodes( self, item_id: str, parent_id: Optional[str] ) -> Tuple[Optional[str], Optional[str]]: """Get node where this item is and node to use for comments if config option "comments_max_depth" is set, a common node will be used below the given depth @param item_id: ID of the reference item @param parent_id: ID of the parent item if any (the ID set in "inReplyTo") @return: a tuple with parent_node_id, comments_node_id: - parent_node_id is the ID of the node where reference item must be. None is returned when the root node (i.e. not a comments node) must be used. - comments_node_id: is the ID of the node to use for comments. None is returned when no comment node must be used (happens when we have reached "comments_max_depth") """ if parent_id is None or not self.comments_max_depth: return ( self._m.get_comments_node(parent_id) if parent_id is not None else None, self._m.get_comments_node(item_id) ) parent_url = parent_id parents = [] for __ in range(COMMENTS_MAX_PARENTS): parent_item = await self.ap_get(parent_url) parents.insert(0, parent_item) parent_url = parent_item.get("inReplyTo") if parent_url is None: break parent_limit = self.comments_max_depth-1 if len(parents) <= parent_limit: return ( self._m.get_comments_node(parents[-1]["id"]), self._m.get_comments_node(item_id) ) else: last_level_item = parents[parent_limit] return ( self._m.get_comments_node(last_level_item["id"]), None ) async def ap_item_2_mb_data(self, ap_item: dict) -> dict: """Convert AP activity or object to microblog data @param ap_item: ActivityPub item to convert Can be either an activity of an object @return: AP Item's Object and microblog data @raise exceptions.DataError: something is invalid in the AP item @raise NotImplementedError: some AP data is not handled yet @raise error.StanzaError: error while contacting the AP server """ is_activity = self.is_activity(ap_item) if is_activity: ap_object = await self.ap_get_object(ap_item, "object") if not ap_object: log.warning(f'No "object" found in AP item {ap_item!r}') raise exceptions.DataError else: ap_object = ap_item item_id = ap_object.get("id") if not item_id: log.warning(f'No "id" found in AP item: {ap_object!r}') raise exceptions.DataError mb_data = {"id": item_id, "extra": {}} # content try: language, content_xhtml = ap_object["contentMap"].popitem() except (KeyError, AttributeError): try: mb_data["content_xhtml"] = ap_object["content"] except KeyError: log.warning(f"no content found:\n{ap_object!r}") raise exceptions.DataError else: mb_data["language"] = language mb_data["content_xhtml"] = content_xhtml mb_data["content"] = await self._t.convert( mb_data["content_xhtml"], self._t.SYNTAX_XHTML, self._t.SYNTAX_TEXT, False, ) if "attachment" in ap_object: attachments = mb_data["extra"][C.KEY_ATTACHMENTS] = [] for ap_attachment in ap_object["attachment"]: try: url = ap_attachment["url"] except KeyError: log.warning( f'"url" missing in AP attachment, ignoring: {ap_attachment}' ) continue if not url.startswith("http"): log.warning(f"non HTTP URL in attachment, ignoring: {ap_attachment}") continue attachment = {"url": url} for ap_key, key in ( ("mediaType", "media_type"), # XXX: as weird as it seems, "name" is actually used for description # in AP world ("name", "desc"), ): value = ap_attachment.get(ap_key) if value: attachment[key] = value attachments.append(attachment) # author if is_activity: authors = await self.ap_get_actors(ap_item, "actor") else: authors = await self.ap_get_actors(ap_object, "attributedTo") if len(authors) > 1: # we only keep first item as author # TODO: handle multiple actors log.warning("multiple actors are not managed") account = authors[0] author_jid = self.get_local_jid_from_account(account).full() mb_data["author"] = account.split("@", 1)[0] mb_data["author_jid"] = author_jid # published/updated for field in ("published", "updated"): value = ap_object.get(field) if not value and field == "updated": value = ap_object.get("published") if value: try: mb_data[field] = calendar.timegm( dateutil.parser.parse(str(value)).utctimetuple() ) except dateutil.parser.ParserError as e: log.warning(f"Can't parse {field!r} field: {e}") # repeat if "_repeated" in ap_item: mb_data["extra"]["repeated"] = ap_item["_repeated"] # comments in_reply_to = ap_object.get("inReplyTo") __, comments_node = await self.get_comments_nodes(item_id, in_reply_to) if comments_node is not None: comments_data = { "service": author_jid, "node": comments_node, "uri": uri.build_xmpp_uri( "pubsub", path=author_jid, node=comments_node ) } mb_data["comments"] = [comments_data] return mb_data async def get_reply_to_id_from_xmpp_node( self, client: SatXMPPEntity, ap_account: str, parent_item: str, mb_data: dict ) -> str: """Get URL to use for ``inReplyTo`` field in AP item. There is currently no way to know the parent service of a comment with XEP-0277. To work around that, we try to check if we have this item in the cache (we should). If there is more that one item with this ID, we first try to find one with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`. @param ap_account: AP account corresponding to the publication author @param parent_item: ID of the node where the publication this item is replying to has been posted @param mb_data: microblog data of the publication @return: URL to use in ``inReplyTo`` field """ # FIXME: propose a protoXEP to properly get parent item, node and service found_items = await self.host.memory.storage.search_pubsub_items({ "profiles": [client.profile], "names": [parent_item] }) if not found_items: log.warning(f"parent item {parent_item!r} not found in cache") parent_ap_account = ap_account elif len(found_items) == 1: cached_node = found_items[0].node parent_ap_account = await self.get_ap_account_from_jid_and_node( cached_node.service, cached_node.name ) else: # we found several cached item with given ID, we check if there is one # corresponding to this author try: author = jid.JID(mb_data["author_jid"]).userhostJID() cached_item = next( i for i in found_items if jid.JID(i.data["publisher"]).userhostJID() == author ) except StopIteration: # no item corresponding to this author, we use ap_account log.warning( "Can't find a single cached item for parent item " f"{parent_item!r}" ) parent_ap_account = ap_account else: cached_node = cached_item.node parent_ap_account = await self.get_ap_account_from_jid_and_node( cached_node.service, cached_node.name ) return self.build_apurl( TYPE_ITEM, parent_ap_account, parent_item ) async def repeated_mb_2_ap_item( self, mb_data: dict ) -> dict: """Convert repeated blog item to suitable AP Announce activity @param mb_data: microblog metadata of an item repeating an other blog post @return: Announce activity linking to the repeated item """ repeated = mb_data["extra"]["repeated"] repeater = jid.JID(repeated["by"]) repeater_account = await self.get_ap_account_from_jid_and_node( repeater, None ) repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) repeated_uri = repeated["uri"] if not repeated_uri.startswith("xmpp:"): log.warning( "Only xmpp: URL are handled for repeated item at the moment, ignoring " f"item {mb_data}" ) raise NotImplementedError parsed_url = uri.parse_xmpp_uri(repeated_uri) if parsed_url["type"] != "pubsub": log.warning( "Only pubsub URL are handled for repeated item at the moment, ignoring " f"item {mb_data}" ) raise NotImplementedError rep_service = jid.JID(parsed_url["path"]) rep_item = parsed_url["item"] activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"]) if self.is_virtual_jid(rep_service): # it's an AP actor linked through this gateway # in this case we can simply use the item ID if not rep_item.startswith("https:"): log.warning( f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n" f"{mb_data}" ) announced_uri = rep_item repeated_account = self._e.unescape(rep_service.user) else: # the repeated item is an XMPP publication, we build the corresponding ID rep_node = parsed_url["node"] repeated_account = await self.get_ap_account_from_jid_and_node( rep_service, rep_node ) announced_uri = self.build_apurl("item", repeated_account, rep_item) announce = self.create_activity( "Announce", repeater_id, announced_uri, activity_id=activity_id ) announce["to"] = [NS_AP_PUBLIC] announce["cc"] = [ self.build_apurl(TYPE_FOLLOWERS, repeater_account), await self.get_ap_actor_id_from_account(repeated_account) ] return announce async def mb_data_2_ap_item( self, client: SatXMPPEntity, mb_data: dict, public: bool =True, is_new: bool = True, ) -> dict: """Convert Libervia Microblog Data to ActivityPub item @param mb_data: microblog data (as used in plugin XEP-0277) to convert If ``public`` is True, ``service`` and ``node`` keys must be set. If ``published`` is not set, current datetime will be used @param public: True if the message is not a private/direct one if True, the AP Item will be marked as public, and AP followers of target AP account (which retrieve from ``service``) will be put in ``cc``. ``inReplyTo`` will also be set if suitable if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). This is usually used for direct messages. @param is_new: if True, the item is a new one (no instance has been found in cache). If True, a "Create" activity will be generated, otherwise an "Update" one will be. @return: Activity item """ extra = mb_data.get("extra", {}) if "repeated" in extra: return await self.repeated_mb_2_ap_item(mb_data) if not mb_data.get("id"): mb_data["id"] = shortuuid.uuid() if not mb_data.get("author_jid"): mb_data["author_jid"] = client.jid.userhost() ap_account = await self.get_ap_account_from_jid_and_node( jid.JID(mb_data["author_jid"]), None ) url_actor = self.build_apurl(TYPE_ACTOR, ap_account) url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) ap_object = { "id": url_item, "type": "Note", "published": utils.xmpp_date(mb_data.get("published")), "attributedTo": url_actor, "content": mb_data.get("content_xhtml") or mb_data["content"], } language = mb_data.get("language") if language: ap_object["contentMap"] = {language: ap_object["content"]} attachments = extra.get(C.KEY_ATTACHMENTS) if attachments: ap_attachments = ap_object["attachment"] = [] for attachment in attachments: try: url = next( s['url'] for s in attachment["sources"] if 'url' in s ) except (StopIteration, KeyError): log.warning( f"Ignoring attachment without URL: {attachment}" ) continue ap_attachment = { "url": url } for key, ap_key in ( ("media_type", "mediaType"), # XXX: yes "name", cf. [ap_item_2_mb_data] ("desc", "name"), ): value = attachment.get(key) if value: ap_attachment[ap_key] = value ap_attachments.append(ap_attachment) if public: ap_object["to"] = [NS_AP_PUBLIC] if self.auto_mentions: for m in RE_MENTION.finditer(ap_object["content"]): mention = m.group() mentioned = mention[1:] __, m_host = mentioned.split("@", 1) if m_host in (self.public_url, self.client.jid.host): # we ignore mention of local users, they should be sent as XMPP # references continue try: mentioned_id = await self.get_ap_actor_id_from_account(mentioned) except Exception as e: log.warning(f"Can't add mention to {mentioned!r}: {e}") else: ap_object["to"].append(mentioned_id) ap_object.setdefault("tag", []).append({ "type": TYPE_MENTION, "href": mentioned_id, "name": mention, }) try: node = mb_data["node"] service = jid.JID(mb_data["service"]) except KeyError: # node and service must always be specified when this method is used raise exceptions.InternalError( "node or service is missing in mb_data" ) target_ap_account = await self.get_ap_account_from_jid_and_node( service, node ) if self.is_virtual_jid(service): # service is a proxy JID for AP account actor_data = await self.get_ap_actor_data_from_account(target_ap_account) followers = actor_data.get("followers") else: # service is a real XMPP entity followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) if followers: ap_object["cc"] = [followers] if self._m.is_comment_node(node): parent_item = self._m.get_parent_item(node) if self.is_virtual_jid(service): # the publication is on a virtual node (i.e. an XMPP node managed by # this gateway and linking to an ActivityPub actor) ap_object["inReplyTo"] = parent_item else: # the publication is from a followed real XMPP node ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( client, ap_account, parent_item, mb_data ) return self.create_activity( "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item ) async def publish_message( self, client: SatXMPPEntity, mess_data: dict, service: jid.JID ) -> None: """Send an AP message .. note:: This is a temporary method used for development only @param mess_data: message data. Following keys must be set: ``node`` identifier of message which is being replied (this will correspond to pubsub node in the future) ``content_xhtml`` or ``content`` message body (respectively in XHTML or plain text) @param service: JID corresponding to the AP actor. """ if not service.user: raise ValueError("service must have a local part") account = self._e.unescape(service.user) ap_actor_data = await self.get_ap_actor_data_from_account(account) try: inbox_url = ap_actor_data["endpoints"]["sharedInbox"] except KeyError: raise exceptions.DataError("Can't get ActivityPub actor inbox") item_data = await self.mb_data_2_ap_item(client, mess_data) url_actor = item_data["actor"] resp = await self.sign_and_post(inbox_url, url_actor, item_data) async def ap_delete_item( self, jid_: jid.JID, node: Optional[str], item_id: str, public: bool = True ) -> Tuple[str, Dict[str, Any]]: """Build activity to delete an AP item @param jid_: JID of the entity deleting an item @param node: node where the item is deleted None if it's microblog or a message @param item_id: ID of the item to delete it's the Pubsub ID or message's origin ID @param public: if True, the activity will be addressed to public namespace @return: actor_id of the entity deleting the item, activity to send """ if node is None: node = self._m.namespace author_account = await self.get_ap_account_from_jid_and_node(jid_, node) author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) items = await self.host.memory.storage.search_pubsub_items({ "profiles": [self.client.profile], "services": [jid_], "names": [item_id] }) if not items: log.warning( f"Deleting an unknown item at service {jid_}, node {node} and id " f"{item_id}" ) else: try: mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node) if "repeated" in mb_data["extra"]: # we are deleting a repeated item, we must translate this to an # "Undo" of the "Announce" activity instead of a "Delete" one announce = await self.repeated_mb_2_ap_item(mb_data) undo = self.create_activity("Undo", author_actor_id, announce) return author_actor_id, undo except Exception as e: log.debug( f"Can't parse item, maybe it's not a blog item: {e}\n" f"{items[0].toXml()}" ) url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) ap_item = self.create_activity( "Delete", author_actor_id, { "id": url_item, "type": TYPE_TOMBSTONE } ) if public: ap_item["to"] = [NS_AP_PUBLIC] return author_actor_id, ap_item def _message_received_trigger( self, client: SatXMPPEntity, message_elt: domish.Element, post_treat: defer.Deferred ) -> bool: """add the gateway workflow on post treatment""" if self.client is None: log.debug(f"no client set, ignoring message: {message_elt.toXml()}") return True post_treat.addCallback( lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) ) return True async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: """Called once message has been parsed this method handle the conversion to AP items and posting """ if client != self.client: return mess_data if mess_data["type"] not in ("chat", "normal"): log.warning(f"ignoring message with unexpected type: {mess_data}") return mess_data if not self.is_local(mess_data["from"]): log.warning(f"ignoring non local message: {mess_data}") return mess_data if not mess_data["to"].user: log.warning( f"ignoring message addressed to gateway itself: {mess_data}" ) return mess_data actor_account = self._e.unescape(mess_data["to"].user) actor_id = await self.get_ap_actor_id_from_account(actor_account) inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) try: language, message = next(iter(mess_data["message"].items())) except (KeyError, StopIteration): log.warning(f"ignoring empty message: {mess_data}") return mess_data mb_data = { "content": message, } if language: mb_data["language"] = language origin_id = mess_data["extra"].get("origin_id") if origin_id: # we need to use origin ID when present to be able to retract the message mb_data["id"] = origin_id attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) if attachments: mb_data["extra"] = { C.KEY_ATTACHMENTS: attachments } client = self.client.get_virtual_client(mess_data["from"]) ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) ap_object = ap_item["object"] ap_object["to"] = ap_item["to"] = [actor_id] # we add a mention to direct message, otherwise peer is not notified in some AP # implementations (notably Mastodon), and the message may be missed easily. ap_object.setdefault("tag", []).append({ "type": TYPE_MENTION, "href": actor_id, "name": f"@{actor_account}", }) await self.sign_and_post(inbox, ap_item["actor"], ap_item) return mess_data async def _on_message_retract( self, client: SatXMPPEntity, message_elt: domish.Element, retract_elt: domish.Element, fastened_elts ) -> bool: if client != self.client: return True from_jid = jid.JID(message_elt["from"]) if not self.is_local(from_jid): log.debug( f"ignoring retract request from non local jid {from_jid}" ) return False to_jid = jid.JID(message_elt["to"]) if (to_jid.host != self.client.jid.full() or not to_jid.user): # to_jid should be a virtual JID from this gateway raise exceptions.InternalError( f"Invalid destinee's JID: {to_jid.full()}" ) ap_account = self._e.unescape(to_jid.user) actor_id = await self.get_ap_actor_id_from_account(ap_account) inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) url_actor, ap_item = await self.ap_delete_item( from_jid.userhostJID(), None, fastened_elts.id, public=False ) resp = await self.sign_and_post(inbox, url_actor, ap_item) return False async def _on_reference_received( self, client: SatXMPPEntity, message_elt: domish.Element, reference_data: Dict[str, Union[str, int]] ) -> bool: parsed_uri: dict = reference_data.get("parsed_uri") if not parsed_uri: log.warning(f"no parsed URI available in reference {reference_data}") return False try: mentioned = jid.JID(parsed_uri["path"]) except RuntimeError: log.warning(f"invalid target: {reference_data['uri']}") return False if mentioned.host != self.client.jid.full() or not mentioned.user: log.warning( f"ignoring mentioned user {mentioned}, it's not a JID mapping an AP " "account" ) return False ap_account = self._e.unescape(mentioned.user) actor_id = await self.get_ap_actor_id_from_account(ap_account) parsed_anchor: dict = reference_data.get("parsed_anchor") if not parsed_anchor: log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}") return False if parsed_anchor["type"] != "pubsub": log.warning( f"ignoring reference with non pubsub anchor, this is not supported: " "{reference_data!r}" ) return False try: pubsub_service = jid.JID(parsed_anchor["path"]) except RuntimeError: log.warning(f"invalid anchor: {reference_data['anchor']}") return False pubsub_node = parsed_anchor.get("node") if not pubsub_node: log.warning(f"missing pubsub node in anchor: {reference_data['anchor']}") return False pubsub_item = parsed_anchor.get("item") if not pubsub_item: log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}") return False cached_node = await self.host.memory.storage.get_pubsub_node( client, pubsub_service, pubsub_node ) if not cached_node: log.warning(f"Anchored node not found in cache: {reference_data['anchor']}") return False cached_items, __ = await self.host.memory.storage.get_items( cached_node, item_ids=[pubsub_item] ) if not cached_items: log.warning( f"Anchored pubsub item not found in cache: {reference_data['anchor']}" ) return False cached_item = cached_items[0] mb_data = await self._m.item_2_mb_data( client, cached_item.data, pubsub_service, pubsub_node ) ap_item = await self.mb_data_2_ap_item(client, mb_data) ap_object = ap_item["object"] ap_object["to"] = [actor_id] ap_object.setdefault("tag", []).append({ "type": TYPE_MENTION, "href": actor_id, "name": ap_account, }) inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item) return False async def new_reply_to_xmpp_item( self, client: SatXMPPEntity, ap_item: dict, targets: Dict[str, Set[str]], mentions: List[Dict[str, str]], ) -> None: """We got an AP item which is a reply to an XMPP item""" in_reply_to = ap_item["inReplyTo"] url_type, url_args = self.parse_apurl(in_reply_to) if url_type != "item": log.warning( "Ignoring AP item replying to an XMPP item with an unexpected URL " f"type({url_type!r}):\n{pformat(ap_item)}" ) return try: parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:]) except (IndexError, ValueError): log.warning( "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " f"({in_reply_to!r}):\n{pformat(ap_item)}" ) return parent_item_service, parent_item_node = await self.get_jid_and_node( parent_item_account ) if parent_item_node is None: parent_item_node = self._m.namespace items, __ = await self._p.get_items( client, parent_item_service, parent_item_node, item_ids=[parent_item_id] ) try: parent_item_elt = items[0] except IndexError: log.warning( f"Can't find parent item at {parent_item_service} (node " f"{parent_item_node!r})\n{pformat(ap_item)}") return parent_item_parsed = await self._m.item_2_mb_data( client, parent_item_elt, parent_item_service, parent_item_node ) try: comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) comment_node = parent_item_parsed["comments"][0]["node"] except (KeyError, IndexError): # we don't have a comment node set for this item from libervia.backend.tools.xml_tools import pp_elt log.info(f"{pp_elt(parent_item_elt.toXml())}") raise NotImplementedError() else: __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) await self._p.publish(client, comment_service, comment_node, [item_elt]) await self.notify_mentions( targets, mentions, comment_service, comment_node, item_elt["id"] ) def get_ap_item_targets( self, item: Dict[str, Any] ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: """Retrieve targets of an AP item, and indicate if it's a public one @param item: AP object payload @return: Are returned: - is_public flag, indicating if the item is world-readable - a dict mapping target type to targets """ targets: Dict[str, Set[str]] = {} is_public = False # TODO: handle "audience" for key in ("to", "bto", "cc", "bcc"): values = item.get(key) if not values: continue if isinstance(values, str): values = [values] for value in values: if value in PUBLIC_TUPLE: is_public = True continue if not value: continue if not self.is_local_url(value): continue target_type = self.parse_apurl(value)[0] if target_type != TYPE_ACTOR: log.debug(f"ignoring non actor type as a target: {href}") else: targets.setdefault(target_type, set()).add(value) mentions = [] tags = item.get("tag") if tags: for tag in tags: if tag.get("type") != TYPE_MENTION: continue href = tag.get("href") if not href: log.warning('Missing "href" field from mention object: {tag!r}') continue if not self.is_local_url(href): continue uri_type = self.parse_apurl(href)[0] if uri_type != TYPE_ACTOR: log.debug(f"ignoring non actor URI as a target: {href}") continue mention = {"uri": href} mentions.append(mention) name = tag.get("name") if name: mention["content"] = name return is_public, targets, mentions async def new_ap_item( self, client: SatXMPPEntity, destinee: Optional[jid.JID], node: str, item: dict, ) -> None: """Analyse, cache and send notification for received AP item @param destinee: jid of the destinee, @param node: XMPP pubsub node @param item: AP object payload """ is_public, targets, mentions = self.get_ap_item_targets(item) if not is_public and targets.keys() == {TYPE_ACTOR}: # this is a direct message await self.handle_message_ap_item( client, targets, mentions, destinee, item ) else: await self.handle_pubsub_ap_item( client, targets, mentions, destinee, node, item, is_public ) async def handle_message_ap_item( self, client: SatXMPPEntity, targets: Dict[str, Set[str]], mentions: List[Dict[str, str]], destinee: Optional[jid.JID], item: dict, ) -> None: """Parse and deliver direct AP items translating to XMPP messages @param targets: actors where the item must be delivered @param destinee: jid of the destinee, @param item: AP object payload """ targets_jids = { await self.get_jid_from_id(t) for t_set in targets.values() for t in t_set } if destinee is not None: targets_jids.add(destinee) mb_data = await self.ap_item_2_mb_data(item) extra = { "origin_id": mb_data["id"] } attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) if attachments: extra[C.KEY_ATTACHMENTS] = attachments defer_l = [] for target_jid in targets_jids: defer_l.append( client.sendMessage( target_jid, {'': mb_data.get("content", "")}, mb_data.get("title"), extra=extra ) ) await defer.DeferredList(defer_l) async def notify_mentions( self, targets: Dict[str, Set[str]], mentions: List[Dict[str, str]], service: jid.JID, node: str, item_id: str, ) -> None: """Send mention notifications to recipients and mentioned entities XEP-0372 (References) is used. Mentions are also sent to recipients as they are primary audience (see https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). """ anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) seen = set() # we start with explicit mentions because mentions' content will be used in the # future to fill "begin" and "end" reference attributes (we can't do it at the # moment as there is no way to specify the XML element to use in the blog item). for mention in mentions: mentioned_jid = await self.get_jid_from_id(mention["uri"]) self._refs.send_reference( self.client, to_jid=mentioned_jid, anchor=anchor ) seen.add(mentioned_jid) remaining = { await self.get_jid_from_id(t) for t_set in targets.values() for t in t_set } - seen for target in remaining: self._refs.send_reference( self.client, to_jid=target, anchor=anchor ) async def handle_pubsub_ap_item( self, client: SatXMPPEntity, targets: Dict[str, Set[str]], mentions: List[Dict[str, str]], destinee: Optional[jid.JID], node: str, item: dict, public: bool ) -> None: """Analyse, cache and deliver AP items translating to Pubsub @param targets: actors/collections where the item must be delivered @param destinee: jid of the destinee, @param node: XMPP pubsub node @param item: AP object payload @param public: True if the item is public """ # XXX: "public" is not used for now service = client.jid in_reply_to = item.get("inReplyTo") if in_reply_to and isinstance(in_reply_to, list): in_reply_to = in_reply_to[0] if in_reply_to and isinstance(in_reply_to, str): if self.is_local_url(in_reply_to): # this is a reply to an XMPP item await self.new_reply_to_xmpp_item(client, item, targets, mentions) return # this item is a reply to an AP item, we use or create a corresponding node # for comments parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to) node = parent_node or node cached_node = await self.host.memory.storage.get_pubsub_node( client, service, node, with_subscriptions=True, create=True, create_kwargs={"subscribed": True} ) else: # it is a root item (i.e. not a reply to an other item) create = node == self._events.namespace cached_node = await self.host.memory.storage.get_pubsub_node( client, service, node, with_subscriptions=True, create=create ) if cached_node is None: log.warning( f"Received item in unknown node {node!r} at {service}. This may be " f"due to a cache purge. We synchronise the node\n{item}" ) return if item.get("type") == TYPE_EVENT: data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) else: data, item_elt = await self.ap_item_2_mb_data_and_elt(item) await self.host.memory.storage.cache_pubsub_items( client, cached_node, [item_elt], [data] ) for subscription in cached_node.subscriptions: if subscription.state != SubscriptionState.SUBSCRIBED: continue self.pubsub_service.notifyPublish( service, node, [(subscription.subscriber, None, [item_elt])] ) await self.notify_mentions(targets, mentions, service, node, item_elt["id"]) async def new_ap_delete_item( self, client: SatXMPPEntity, destinee: Optional[jid.JID], node: str, item: dict, ) -> None: """Analyse, cache and send notification for received AP item @param destinee: jid of the destinee, @param node: XMPP pubsub node @param activity: parent AP activity @param item: AP object payload only the "id" field is used """ item_id = item.get("id") if not item_id: raise exceptions.DataError('"id" attribute is missing in item') if not item_id.startswith("http"): raise exceptions.DataError(f"invalid id: {item_id!r}") if self.is_local_url(item_id): raise ValueError("Local IDs should not be used") # we have no way to know if a deleted item is a direct one (thus a message) or one # converted to pubsub. We check if the id is in message history to decide what to # do. history = await self.host.memory.storage.get( client, History, History.origin_id, item_id, (History.messages, History.subjects) ) if history is not None: # it's a direct message if history.source_jid != client.jid: log.warning( f"retraction received from an entity ''{client.jid}) which is " f"not the original sender of the message ({history.source_jid}), " "hack attemps?" ) raise exceptions.PermissionError("forbidden") await self._r.retract_by_history(client, history) else: # no history in cache with this ID, it's probably a pubsub item cached_node = await self.host.memory.storage.get_pubsub_node( client, client.jid, node, with_subscriptions=True ) if cached_node is None: log.warning( f"Received an item retract for node {node!r} at {client.jid} " "which is not cached" ) raise exceptions.NotFound await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id]) # notifyRetract is expecting domish.Element instances item_elt = domish.Element((None, "item")) item_elt["id"] = item_id for subscription in cached_node.subscriptions: if subscription.state != SubscriptionState.SUBSCRIBED: continue self.pubsub_service.notifyRetract( client.jid, node, [(subscription.subscriber, None, [item_elt])] )