Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3735:04ecc8eeb81a
tests (ap-gateway): fix use of outbox URL to get items
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 21 Mar 2022 16:37:19 +0100 |
parents | 86eea17cafa7 |
children | bf0505d41c09 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import hashlib
import json
from pathlib import Path
from typing import Optional, Dict, Tuple, List, Union
from urllib import parse
import calendar
import re

import dateutil
# "dateutil.parser" is used below; it is imported explicitly so that we don't rely
# on another module having loaded the submodule beforehand
import dateutil.parser
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
import shortuuid
import treq
from treq.response import _Response as TReqResponse
from twisted.internet import defer, reactor, threads
from twisted.web import http
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.tools import utils
from sat.tools.common import data_format, tls
from sat.tools.common.async_utils import async_lru

from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP,
                        AP_MB_MAP, LRU_MAX_SIZE)
from .http_server import HTTPServer
from .pubsub_service import APPubsubService


log = getLogger(__name__)

IMPORT_NAME = "ap-gateway"

PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}

# two hexadecimal digits, as used after the escape character in percent/period encoding
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# texts matching this pattern can be used as is in an AP account
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")


class APGateway:
    """Component mapping XMPP entities/pubsub nodes to ActivityPub actors and back"""

    def __init__(self, host):
        self.host = host
        # set to True on first call of [init]
        self.initialised = False
        self._m = host.plugins["XEP-0277"]
        self._p = host.plugins["XEP-0060"]

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    def getHandler(self, __):
        """Return the Pubsub service handler of the gateway"""
        return APPubsubService(self)

    async def init(self, client):
        """Load or generate RSA keys, read configuration and launch the HTTP server

        This method does the work only once, following calls return immediately.
        If ``public_url`` is missing or invalid, an error is logged and the HTTP server
        is not launched.
        @raise exceptions.ConfigError: ``http_connection_type`` has an invalid value
        """
        if self.initialised:
            return

        self.initialised = True
        log.info(_("ActivityPub Gateway initialization"))

        # RSA keys
        stored_data = await self.host.memory.storage.getPrivates(
            IMPORT_NAME, ["rsa_key"], profile=client.profile
        )
        private_key_pem = stored_data.get("rsa_key")
        if private_key_pem is None:
            # no key stored yet, we generate one in a thread to avoid blocking the
            # reactor, and we store it for next runs
            self.private_key = await threads.deferToThread(
                rsa.generate_private_key,
                public_exponent=65537,
                key_size=4096,
            )
            private_key_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()
            await self.host.memory.storage.setPrivateValue(
                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
            )
        self.public_key = self.private_key.public_key()
        self.public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

        # params
        # URL and port
        self.public_url = self.host.memory.getConfig(
            CONF_SECTION, "public_url"
        ) or self.host.memory.getConfig(
            CONF_SECTION, "xmpp_domain"
        )
        if self.public_url is None:
            log.error(
                '"public_url" not set in configuration, this is mandatory to have '
                "ActivityPub Gateway running. Please set this option to public facing "
                f"url in {CONF_SECTION!r} configuration section."
            )
            return
        if parse.urlparse(self.public_url).scheme:
            log.error(
                "Scheme must not be specified in \"public_url\", please remove it from "
                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
            )
            return
        self.http_port = int(self.host.memory.getConfig(
            CONF_SECTION, 'http_port', 8123))
        connection_type = self.host.memory.getConfig(
            CONF_SECTION, 'http_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                'bad ap-gateway http_connection_type, you must use one of "http" or '
                '"https"'
            )
        self.max_items = self.host.memory.getConfig(
            CONF_SECTION, 'new_node_max_items', 50
        )
        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
        # True (default) if we provide gateway only to entities/services from our server
        self.local_only = C.bool(
            self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
        )

        # HTTP server launch
        self.server = HTTPServer(self)
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(
                self.host.memory.config, CONF_SECTION)
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)

    async def profileConnecting(self, client):
        """Keep a reference to the client and initialise the gateway"""
        self.client = client
        await self.init(client)

    async def apGet(self, url: str) -> dict:
        """Retrieve AP JSON from given URL

        @param url: URL to request
        @return: decoded JSON data
        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
            with AP server
        """
        try:
            return await treq.json_content(await treq.get(
                url,
                headers={
                    "Accept": [MEDIA_TYPE_AP],
                    "Content-Type": [MEDIA_TYPE_AP],
                }
            ))
        except Exception as e:
            raise error.StanzaError(
                "service-unavailable",
                text=f"Can't get AP data at {url}: {e}"
            ) from e

    def mustEncode(self, text: str) -> bool:
        """Indicate if a text must be period encoded"""
        return (
            not RE_ALLOWED_UNQUOTED.match(text)
            or text.startswith("___")
            or "---" in text
        )

    def periodEncode(self, text: str) -> str:
        """Period encode a text

        see [getJIDAndNode] for reasons of period encoding
        """
        return (
            parse.quote(text, safe="")
            # "---" and "___" have a special meaning in AP accounts, and ".", "~" are
            # not escaped by "quote", thus we escape them explicitly
            .replace("---", "%2d%2d%2d")
            .replace("___", "%5f%5f%5f")
            .replace(".", "%2e")
            .replace("~", "%7e")
            # must be done last: the period replaces the percent as escape character
            .replace("%", ".")
        )

    async def getAPAccountFromJidAndNode(
        self,
        jid_: jid.JID,
        node: Optional[str]
    ) -> str:
        """Construct AP account from JID and node

        The account construction will use escaping when necessary

        NOTE: when ``jid_`` has no user part and is a pubsub service, the node is moved
            to the user part of ``jid_``, i.e. the given JID is modified in place.
        @param jid_: JID of the entity or pubsub service
        @param node: pubsub node
            None (or microblog namespace) for the default node
        @return: AP account handle
        @raise exceptions.InternalError: no user part can be found
        """
        if not node or node == self._m.namespace:
            node = None

        if node and not jid_.user and not self.mustEncode(node):
            is_pubsub = await self.isPubsub(jid_)
            # when we have a pubsub service, the user part can be used to set the node
            # this produces more user-friendly AP accounts
            if is_pubsub:
                jid_.user = node
                node = None

        is_local = self.isLocal(jid_)
        user = jid_.user if is_local else jid_.userhost()
        if user is None:
            user = ""
        account_elts = []
        if node and self.mustEncode(node) or self.mustEncode(user):
            # "___" prefix indicates that period encoding is used
            account_elts = ["___"]
            if node:
                node = self.periodEncode(node)
            user = self.periodEncode(user)

        if not user:
            raise exceptions.InternalError("there should be a user part")

        if node:
            account_elts.extend((node, "---"))

        account_elts.extend((
            user, "@", jid_.host if is_local else self.client.jid.userhost()
        ))
        return "".join(account_elts)

    def isLocal(self, jid_: jid.JID) -> bool:
        """Returns True if jid_ use a domain or subdomain of gateway's host"""
        local_host = self.client.host.split(".")
        assert local_host
        return jid_.host.split(".")[-len(local_host):] == local_host

    async def isPubsub(self, jid_: jid.JID) -> bool:
        """Indicate if a JID is a Pubsub service"""
        host_disco = await self.host.getDiscoInfos(self.client, jid_)
        return (
            ("pubsub", "service") in host_disco.identities
            and ("pubsub", "pep") not in host_disco.identities
        )

    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
        """Decode raw AP account handle to get XMPP JID and Pubsub Node

        Username are case insensitive.

        By default, the username correspond to local username (i.e. username from
        component's server).

        If local name's domain is a pubsub service (and not PEP), the username is taken as
        a pubsub node.

        If ``---`` is present in username, the part before is used as pubsub node, and the
        rest as a JID user part.

        If username starts with ``___``, characters are encoded using period encoding
        (i.e. percent encoding where a ``.`` is used instead of ``%``).

        This horror is necessary due to limitation in some AP implementation (notably
        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222

        examples:

        ``toto@example.org`` => JID = toto@example.org, node = None

        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
        non-local JID, and will work only if setting ``local_only`` is False), node = None

        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
        JID = pubsub.example.org, node = toto

        ``tata---toto@example.org`` => JID = toto@example.org, node = tata

        ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0

        @param ap_account: ActivityPub account handle (``username@domain.tld``)
        @return: service JID and pubsub node
            if pubsub is None, default microblog pubsub node (and possibly other nodes
            that plugins may handle) will be used
        @raise ValueError: invalid account
        @raise PermissionError: non local jid is used when gateway doesn't allow them
        """
        if ap_account.count("@") != 1:
            raise ValueError("Invalid AP account")
        if ap_account.startswith("___"):
            encoded = True
            ap_account = ap_account[3:]
        else:
            encoded = False

        username, domain = ap_account.split("@")

        if "---" in username:
            node, username = username.rsplit("---", 1)
        else:
            node = None

        if encoded:
            # period encoding is converted back to percent encoding before unquoting
            username = parse.unquote(
                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
                errors="strict"
            )
            if node:
                node = parse.unquote(
                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
                    errors="strict"
                )

            if "@" in username:
                username, domain = username.rsplit("@", 1)

        if not node:
            # we need to check host disco, because disco request to user may be
            # blocked for privacy reason (see
            # https://xmpp.org/extensions/xep-0030.html#security)
            is_pubsub = await self.isPubsub(jid.JID(domain))

            if is_pubsub:
                # if the host is a pubsub service and not a PEP, we consider that username
                # is in fact the node name
                node = username
                username = None

        jid_s = f"{username}@{domain}" if username else domain
        try:
            jid_ = jid.JID(jid_s)
        except (RuntimeError, jid.InvalidFormat):
            # InvalidFormat is the exception raised by Twisted on malformed JID
            raise ValueError(f"Invalid jid: {jid_s!r}")

        if self.local_only and not self.isLocal(jid_):
            raise exceptions.PermissionError(
                "This gateway is configured to map only local entities and services"
            )

        return jid_, node

    def parseAPURL(self, url: str) -> Tuple[str, str]:
        """Parse an URL leading to an AP endpoint

        @param url: URL to parse (schema is not mandatory)
        @return: endpoint type and AP account
        """
        path = parse.urlparse(url).path.lstrip("/")
        # the "ap_path" prefix is removed, then we have "<type>/<account>"
        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
        return type_, parse.unquote(account)

    def buildAPURL(self, type_: str, *args: str) -> str:
        """Build an AP endpoint URL

        @param type_: type of AP endpoint
        @param args: endpoint dependant arguments
            each argument is URL quoted and added as a path element
        @return: full URL of the endpoint
        """
        return parse.urljoin(
            self.base_ap_url,
            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
        )

    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
        """Sign a document and post it to AP server

        @param url: AP server endpoint
        @param url_actor: URL generated by this gateway for local actor
        @param doc: document to send
        @return: response of the AP server
        """
        p_url = parse.urlparse(url)
        date = http.datetimeToString().decode()
        body = json.dumps(doc).encode()
        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
        digest = f"sha-256={digest_hash}"
        # the headers below must be listed in the same order in "headers" field of the
        # signature
        to_sign = (
            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
            f"date: {date}\ndigest: {digest}"
        )
        signature = base64.b64encode(self.private_key.sign(
            to_sign.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),
            hashes.SHA256()
        )).decode()
        h_signature = (
            f'keyId="{url_actor}",headers="(request-target) host date digest",'
            f'signature="{signature}"'
        )
        return await treq.post(
            url,
            body,
            headers={
                "Host": [p_url.hostname],
                "Date": [date],
                "Digest": [digest],
                "Signature": [h_signature],
            }
        )

    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
        """Bridge frontend of [publishMessage], arguments are serialised"""
        mess_data: dict = data_format.deserialise(mess_data_s)  # type: ignore
        service = jid.JID(service_s)
        client = self.host.getClient(profile)
        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))

    async def getAPActorIdFromAccount(self, account: str) -> str:
        """Retrieve account ID from it's handle using WebFinger

        @param account: AP handle (user@domain.tld)
        @return: Actor ID (which is an URL)
        @raise ValueError: the account is invalid, or no usable ActivityPub link has been
            found in WebFinger data
        @raise exceptions.DataError: WebFinger data can't be retrieved
        """
        if account.count("@") != 1 or "/" in account:
            raise ValueError(f"Invalid account: {account!r}")
        host = account.split("@")[1]
        try:
            finger_data = await treq.json_content(await treq.get(
                f"https://{host}/.well-known/webfinger?"
                f"resource=acct:{parse.quote_plus(account)}",
            ))
        except Exception as e:
            raise exceptions.DataError(f"Can't get webfinger data: {e}") from e
        for link in finger_data.get("links", []):
            if (
                link.get("type") == "application/activity+json"
                and link.get("rel") == "self"
            ):
                href = link.get("href", "").strip()
                if not href:
                    raise ValueError(
                        f"Invalid webfinger data for {account!r}: missing href"
                    )
                break
        else:
            raise ValueError(
                f"No ActivityPub link found for {account!r}"
            )
        return href

    async def getAPActorDataFromId(self, account: str) -> dict:
        """Retrieve ActivityPub Actor data

        @param account: ActivityPub Actor identifier
            the actor ID is resolved with [getAPActorIdFromAccount], then requested
        @return: actor data
        """
        href = await self.getAPActorIdFromAccount(account)
        return await self.apGet(href)

    @async_lru(maxsize=LRU_MAX_SIZE)
    async def getAPAccountFromId(self, actor_id: str):
        """Retrieve AP account from the ID URL

        @param actor_id: AP ID of the actor (URL to the actor data)
        @return: AP account handle (``username@domain.tld``)
        @raise exceptions.DataError: "preferredUsername" is missing, or the account
            found doesn't lead back to ``actor_id``
        """
        url_parsed = parse.urlparse(actor_id)
        actor_data = await self.apGet(actor_id)
        username = actor_data.get("preferredUsername")
        if not username:
            raise exceptions.DataError(
                'No "preferredUsername" field found, can\'t retrieve actor account'
            )
        account = f"{username}@{url_parsed.hostname}"
        # we try to retrieve the actor ID from the account to check it
        found_id = await self.getAPActorIdFromAccount(account)
        if found_id != actor_id:
            # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
            msg = (
                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
                f"({actor_id!r}). This AP instance doesn't seems to use "
                '"preferredUsername" as we expect.'
            )
            log.warning(msg)
            raise exceptions.DataError(msg)
        return account

    async def getAPItems(
        self,
        account: str,
        max_items: Optional[int] = None,
        chronological_pagination: bool = True,
        after_id: Optional[str] = None,
        start_index: Optional[int] = None,
    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
        """Retrieve AP items and convert them to XMPP items

        @param account: AP account handle to get items from
        @param max_items: maximum number of items to retrieve
            retrieve all items by default
        @param chronological_pagination: get pages in chronological order
            AP use reversed chronological order for pagination, "first" page returns more
            recent items. If "chronological_pagination" is True, "last" AP page will be
            retrieved first.
        @param after_id: if set, retrieve items starting from given ID
            Due to ActivityStream Collection Paging limitations, this is inefficient and
            if ``after_id`` is not already in cache, we have to retrieve every page until
            we find it.
            In most common cases, ``after_id`` should be in cache though (client usually
            use known ID when in-order pagination is used).
        @param start_index: start retrieving items from the one with given index
            Due to ActivityStream Collection Paging limitations, this is inefficient and
            all pages before the requested index will be retrieved to count items.
        @return: XMPP Pubsub items and corresponding RSM Response
            Items are always returned in chronological order in the result
        @raise exceptions.DataError: outbox or initial page can't be found
        @raise error.StanzaError: ``after_id`` is not found, ``start_index`` is too
            high, or a page link is missing
        """
        actor_data = await self.getAPActorDataFromId(account)
        outbox = actor_data.get("outbox")
        rsm_resp: Dict[str, Union[bool, int, str]] = {}
        if not outbox:
            raise exceptions.DataError(f"No outbox found for actor {account}")
        outbox_data = await self.apGet(outbox)
        try:
            count = outbox_data["totalItems"]
        except KeyError:
            log.warning(
                f'"totalItems" not found in outbox of {account}, defaulting to 20'
            )
            count = 20
        else:
            log.info(f"{account}'s outbox has {count} item(s)")
            rsm_resp["count"] = count

        if start_index is not None:
            assert chronological_pagination and after_id is None
            if start_index >= count:
                return [], rsm.RSMResponse(**rsm_resp)
            elif start_index == 0:
                # this is the default behaviour
                pass
            elif start_index > 5000:
                raise error.StanzaError(
                    "feature-not-implemented",
                    text="Maximum limit for previous_index has been reached, this limit "
                    "is set to avoid DoS"
                )
            else:
                # we'll convert "start_index" to "after_id", thus we need the item just
                # before "start_index"
                previous_index = start_index - 1
                retrieved_items = 0
                current_page = outbox_data["last"]
                while retrieved_items < count:
                    page_data, items = await self.parseAPPage(current_page)
                    if not items:
                        log.warning(f"found an empty AP page at {current_page}")
                        return [], rsm.RSMResponse(**rsm_resp)
                    page_start_idx = retrieved_items
                    retrieved_items += len(items)
                    # current page holds indexes from page_start_idx (included) to
                    # retrieved_items (excluded)
                    if previous_index < retrieved_items:
                        after_id = items[previous_index - page_start_idx]["id"]
                        break
                    try:
                        current_page = page_data["prev"]
                    except KeyError:
                        log.warning(
                            f"missing previous page link at {current_page}: {page_data!r}"
                        )
                        raise error.StanzaError(
                            "service-unavailable",
                            text="Error while retrieving previous page from AP service "
                            f"at {current_page}"
                        )

        init_page = "last" if chronological_pagination else "first"
        page = outbox_data.get(init_page)
        if not page:
            raise exceptions.DataError(
                f"Initial page {init_page!r} not found for outbox {outbox}"
            )
        items = []
        page_items = []
        retrieved_items = 0
        found_after_id = False

        while retrieved_items < count:
            page_data, page_items = await self.parseAPPage(page)
            retrieved_items += len(page_items)
            if after_id is not None and not found_after_id:
                # if we have an after_id, we ignore all items until the requested one is
                # found
                try:
                    limit_idx = [i["id"] for i in page_items].index(after_id)
                except ValueError:
                    # if "after_id" is not found, we don't add any item from this page
                    log.debug(f"{after_id!r} not found at {page}, skipping")
                else:
                    found_after_id = True
                    if chronological_pagination:
                        start_index = retrieved_items - len(page_items) + limit_idx + 1
                        page_items = page_items[limit_idx+1:]
                    else:
                        start_index = count - (
                            retrieved_items - len(page_items) + limit_idx + 1
                        )
                        page_items = page_items[:limit_idx]
                    items.extend(page_items)
            else:
                items.extend(page_items)
            if max_items is not None and len(items) >= max_items:
                if chronological_pagination:
                    items = items[:max_items]
                else:
                    items = items[-max_items:]
                break
            # links to adjacent pages are in the page which has just been retrieved,
            # not in the outbox collection itself
            page = page_data.get("prev" if chronological_pagination else "next")
            if not page:
                break

        if after_id is not None and not found_after_id:
            raise error.StanzaError("item-not-found")

        if start_index is not None:
            rsm_resp["index"] = start_index
        elif after_id is not None:
            log.warning("Can't determine index of first element")
        elif chronological_pagination:
            rsm_resp["index"] = 0
        else:
            rsm_resp["index"] = count - len(items)
        if items:
            rsm_resp.update({
                "first": items[0]["id"],
                "last": items[-1]["id"]
            })

        return items, rsm.RSMResponse(**rsm_resp)

    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
        """Convert AP objects from an AP page to XMPP items

        @param url: url linking and AP page
        @return: page data, pubsub items
            items which can't be converted are skipped
        """
        page_data = await self.apGet(url)
        ap_items = page_data.get("orderedItems")
        if not ap_items:
            log.warning('No "orderedItems" collection found')
            return page_data, []
        items = []
        # AP Collections are in antichronological order, but we expect chronological in
        # Pubsub, thus we reverse it
        for ap_item in reversed(ap_items):
            try:
                ap_object, mb_data = await self.apItem2MBdata(ap_item)
            except (exceptions.DataError, NotImplementedError, error.StanzaError):
                continue

            item_elt = await self._m.data2entry(
                self.client, mb_data, ap_object["id"], None, self._m.namespace
            )
            item_elt["publisher"] = mb_data["author_jid"].full()
            items.append(item_elt)

        return page_data, items

    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
        """Convert AP item to microblog data

        @param ap_item: AP item (activity), its "object" may be embedded or be an URL
        @return: AP Item's Object and microblog data
        @raise exceptions.DataError: something is invalid in the AP item
        @raise NotImplementedError: some AP data is not handled yet
        @raise error.StanzaError: error while contacting the AP server
        """
        ap_object = ap_item.get("object")
        if not ap_object:
            log.warning(f'No "object" found in AP item {ap_item!r}')
            raise exceptions.DataError
        if isinstance(ap_object, str):
            # the object is referenced by its URL, we have to retrieve it
            ap_object = await self.apGet(ap_object)
        obj_id = ap_object.get("id")
        if not obj_id:
            log.warning(f'No "id" found in AP object: {ap_object!r}')
            raise exceptions.DataError
        if ap_object.get("inReplyTo") is not None:
            # replies are not handled yet
            raise NotImplementedError
        mb_data = {}
        for ap_key, mb_key in AP_MB_MAP.items():
            data = ap_object.get(ap_key)
            if data is None:
                continue
            mb_data[mb_key] = data

        # content
        try:
            language, content_xhtml = ap_object["contentMap"].popitem()
        except (KeyError, AttributeError):
            try:
                mb_data["content_xhtml"] = mb_data["content"]
            except KeyError:
                log.warning(f"no content found:\n{ap_object!r}")
                raise exceptions.DataError
        else:
            mb_data["language"] = language
            mb_data["content_xhtml"] = content_xhtml

        # author
        actor = ap_item.get("actor")
        if not actor:
            log.warning(f"no actor associated to object id {obj_id!r}")
            raise exceptions.DataError
        elif isinstance(actor, list):
            # we only keep first item of list as author
            # TODO: handle multiple actors
            if len(actor) > 1:
                log.warning("multiple actors are not managed")
            actor = actor[0]

        if isinstance(actor, dict):
            actor = actor.get("id")
            if not actor:
                log.warning(f"no actor id found: {actor!r}")
                raise exceptions.DataError

        if isinstance(actor, str):
            account = await self.getAPAccountFromId(actor)
            mb_data["author"] = account.split("@", 1)[0]
            # the AP account is escaped to be used as user part of a gateway's JID
            mb_data["author_jid"] = jid.JID(
                None,
                (
                    self.host.plugins["XEP-0106"].escape(account),
                    self.client.jid.host,
                    None
                )
            )
        else:
            log.warning(f"unknown actor type found: {actor!r}")
            raise exceptions.DataError

        # published/updated
        for field in ("published", "updated"):
            value = ap_object.get(field)
            if not value and field == "updated":
                # "published" is used as fallback when "updated" is missing
                value = ap_object.get("published")
            if value:
                try:
                    mb_data[field] = calendar.timegm(
                        dateutil.parser.parse(str(value)).utctimetuple()
                    )
                except dateutil.parser.ParserError as e:
                    log.warning(f"Can't parse {field!r} field: {e}")
        return ap_object, mb_data

    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
        """Convert Libervia Microblog Data to ActivityPub item

        @param client: client used to set the author when ``author_jid`` is missing
        @param mb_data: microblog data
            ``id`` and ``author_jid`` are set in place if they are missing
        @return: "Create" activity with a "Note" object
        """
        if not mb_data.get("id"):
            mb_data["id"] = shortuuid.uuid()
        if not mb_data.get("author_jid"):
            mb_data["author_jid"] = client.jid
        ap_account = await self.getAPAccountFromJidAndNode(
            jid.JID(mb_data["author_jid"]),
            None
        )
        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url_item,
            "type": "Create",
            "actor": url_actor,

            "object": {
                "id": url_item,
                "type": "Note",
                "published": utils.xmpp_date(mb_data["published"]),
                "attributedTo": url_actor,
                "content": mb_data.get("content_xhtml") or mb_data["content"],
                "to": "https://www.w3.org/ns/activitystreams#Public"
            }
        }

    async def publishMessage(
        self,
        client: SatXMPPEntity,
        mess_data: dict,
        service: jid.JID
    ) -> None:
        """Send an AP message

        .. note::

            This is a temporary method used for development only

        @param mess_data: message data. Following keys must be set:

            ``node``
              identifier of message which is being replied (this will
              correspond to pubsub node in the future)

            ``content_xhtml`` or ``content``
              message body (respectively in XHTML or plain text)

        @param service: JID corresponding to the AP actor.
        @raise ValueError: ``service`` has no local part
        @raise exceptions.DataError: shared inbox of the actor can't be found
        @raise exceptions.NetworkError: the AP server didn't answer with a 202 code
        """
        if not service.user:
            raise ValueError("service must have a local part")
        account = self.host.plugins["XEP-0106"].unescape(service.user)
        ap_actor_data = await self.getAPActorDataFromId(account)

        try:
            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
        except KeyError:
            raise exceptions.DataError("Can't get ActivityPub actor inbox")

        item_data = await self.mbdata2APitem(client, mess_data)
        url_actor = item_data["object"]["attributedTo"]
        resp = await self.signAndPost(inbox_url, url_actor, item_data)
        if resp.code != 202:
            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")