# HG changeset patch # User Goffi # Date 1643650549 -3600 # Node ID 86eea17cafa7d11cce42376b4f04aa1d3364bcab # Parent b15644cae50db805db57917f82c401cfef04f9cf component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363 diff -r b15644cae50d -r 86eea17cafa7 sat/plugins/plugin_comp_ap_gateway.py --- a/sat/plugins/plugin_comp_ap_gateway.py Tue Jan 25 17:54:06 2022 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1223 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia ActivityPub Gateway -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import base64 -import hashlib -import json -from pathlib import Path -import time -from typing import Optional, Dict, Tuple, List, Union -from urllib import parse -import calendar -import re -import unicodedata - -import dateutil -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives.asymmetric import padding -import shortuuid -import treq -from treq.response import _Response as TReqResponse -from twisted.internet import defer, reactor, threads -from twisted.web import http, resource as web_resource, server -from twisted.words.protocols.jabber import jid, error -from twisted.words.xish import domish -from wokkel import pubsub, rsm - -from sat.core import exceptions -from sat.core.constants import Const as C -from sat.core.core_types import SatXMPPEntity -from sat.core.i18n import _ -from sat.core.log import getLogger -from sat.tools import utils -from sat.tools.common import data_format, tls -from sat.tools.common.async_utils import async_lru -from sat.tools.utils import ensure_deferred - - -log = getLogger(__name__) - -IMPORT_NAME = "ap-gateway" - -PLUGIN_INFO = { - C.PI_NAME: "ActivityPub Gateway component", - C.PI_IMPORT_NAME: IMPORT_NAME, - C.PI_MODES: [C.PLUG_MODE_COMPONENT], - C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, - C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], - C.PI_RECOMMENDATIONS: [], - C.PI_MAIN: "APGateway", - C.PI_HANDLER: C.BOOL_TRUE, - C.PI_DESCRIPTION: _( - "Gateway for bidirectional communication between XMPP and ActivityPub." - ), -} - -VERSION = unicodedata.normalize( - 'NFKD', - f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}" -) -HEXA_ENC = r"(?P[0-9a-fA-f]{2})" -RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") -RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") -RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") -CONF_SECTION = f"component {IMPORT_NAME}" -CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" -TYPE_ACTOR = "actor" -TYPE_INBOX = "inbox" -TYPE_OUTBOX = "outbox" -TYPE_ITEM = "item" -MEDIA_TYPE_AP = "application/activity+json" -# mapping from AP metadata to microblog data -AP_MB_MAP = { - "content": "content_xhtml", - -} -AP_REQUEST_TYPES = {"actor", "outbox"} -PAGE_SIZE = 10 - -LRU_MAX_SIZE = 200 - - -class HTTPAPGServer(web_resource.Resource): - """HTTP Server handling ActivityPub S2S protocol""" - isLeaf = True - - def __init__(self, ap_gateway): - self.apg = ap_gateway - super().__init__() - - async def webfinger(self, request): - url_parsed = parse.urlparse(request.uri.decode()) - query = parse.parse_qs(url_parsed.query) - resource = query.get("resource", [""])[0] - account = resource[5:].strip() - if not resource.startswith("acct:") or not account: - return web_resource.ErrorPage( - http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" - ).render(request) - - actor_url = self.apg.buildAPURL(TYPE_ACTOR, account) - - resp = { - "subject": resource, - "links": [ - { - "rel": "self", - "type": "application/activity+json", - "href": actor_url - } - ] - } - request.setHeader("content-type", CONTENT_TYPE_AP) - request.write(json.dumps(resp).encode()) - request.finish() - - async def APActorRequest( - self, - request: "HTTPRequest", - account_jid: jid.JID, - node: Optional[str], - ap_account: str, - actor_url: str - ) -> dict: - inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) - outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) - - # we have to use AP account as preferredUsername because it is used to retrieve - # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) - preferred_username = ap_account.split("@", 1)[0] - return { - "@context": [ - "https://www.w3.org/ns/activitystreams", - "https://w3id.org/security/v1" - ], - - "id": actor_url, - "type": "Person", - "preferredUsername": preferred_username, - "inbox": inbox_url, - "outbox": outbox_url, - "publicKey": { - "id": f"{actor_url}#main-key", - "owner": actor_url, - "publicKeyPem": self.apg.public_key_pem - } - } - - def getCanonicalURL(self, request: "HTTPRequest") -> str: - return parse.urljoin( - f"https://{self.apg.public_url}", - request.path.decode().rstrip("/") - ) - - def queryData2RSMRequest( - self, - query_data: Dict[str, List[str]] - ) -> rsm.RSMRequest: - """Get RSM kwargs to use with RSMRequest from query data""" - page = query_data.get("page") - - if page == ["first"]: - return rsm.RSMRequest(max_=PAGE_SIZE, before="") - elif page == ["last"]: - return rsm.RSMRequest(max_=PAGE_SIZE) - else: - for query_key in ("index", "before", "after"): - try: - kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE} - except (KeyError, IndexError, ValueError): - pass - else: - return rsm.RSMRequest(**kwargs) - raise ValueError(f"Invalid query data: {query_data!r}") - - async def APOutboxPageRequest( - self, - request: "HTTPRequest", - account_jid: jid.JID, - node: Optional[str], - ap_account: str, - actor_url: str, - query_data: Dict[str, List[str]] - ) -> dict: - # we only keep useful keys, and sort to have consistent URL which can - # be used as ID - url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) - query_data = {k: query_data[k] for k in url_keys} - rsm_kwargs = self.queryData2RSMRequest(query_data) - try: - items, metadata = await self.apg._p.getItems( - client=self.apg.client, - service=account_jid, - node=node, - rsm_request=self.queryData2RSMRequest(query_data), - extra = {C.KEY_USE_CACHE: False} - ) - except error.StanzaError as e: - log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") - return {} - - base_url = self.getCanonicalURL(request) - url = f"{base_url}?{parse.urlencode(query_data, True)}" - data = { - "@context": "https://www.w3.org/ns/activitystreams", - "id": url, - "type": "OrderedCollectionPage", - "partOf": base_url, - "orderedItems" : [ - await self.apg.mbdata2APitem( - self.apg.client, - await self.apg._m.item2mbdata( - self.apg.client, - item, - account_jid, - node - ) - ) - for item in reversed(items) - ] - } - - # AP OrderedCollection must be in reversed chronological order, thus the opposite - # of what we get with RSM (at least with Libervia Pubsub) - if not metadata["complete"]: - try: - last= metadata["rsm"]["last"] - except KeyError: - last = None - data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}" - if metadata["rsm"]["index"] != 0: - try: - first= metadata["rsm"]["first"] - except KeyError: - first = None - data["next"] = f"{base_url}?{parse.urlencode({'before': first})}" - - return data - - async def APOutboxRequest( - self, - request: "HTTPRequest", - account_jid: jid.JID, - node: Optional[str], - ap_account: str, - actor_url: str - ) -> dict: - if node is None: - node = self.apg._m.namespace - - parsed_url = parse.urlparse(request.uri.decode()) - query_data = parse.parse_qs(parsed_url.query) - if query_data: - return await self.APOutboxPageRequest( - request, account_jid, node, ap_account, actor_url, query_data - ) - - # XXX: we can't use disco#info here because this request won't work on a bare jid - # due to security considerations of XEP-0030 (we don't have presence - # subscription). - # The current workaround is to do a request as if RSM was available, and actually - # check its availability according to result. - try: - __, metadata = await self.apg._p.getItems( - client=self.apg.client, - service=account_jid, - node=node, - max_items=0, - rsm_request=rsm.RSMRequest(max_=0) - ) - except error.StanzaError as e: - log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") - return {} - try: - items_count = metadata["rsm"]["count"] - except KeyError: - log.warning( - f"No RSM metadata found when requesting pubsub node {node} at " - f"{account_jid}, defaulting to items_count=20" - ) - items_count = 20 - - url = self.getCanonicalURL(request) - url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" - url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" - return { - "@context": "https://www.w3.org/ns/activitystreams", - "id": url, - "totalItems": items_count, - "type": "OrderedCollection", - "first": url_first_page, - "last": url_last_page, - } - - async def APRequest(self, request): - path = request.path.decode() - actor_url = parse.urljoin( - f"https://{self.apg.public_url}", - path - ) - request_type, ap_account = self.apg.parseAPURL(actor_url) - account_jid, node = await self.apg.getJIDAndNode(ap_account) - if request_type not in AP_REQUEST_TYPES: - raise exceptions.DataError(f"Invalid request type: {request_type!r}") - method = getattr(self, f"AP{request_type.title()}Request") - ret_data = await method(request, account_jid, node, ap_account, actor_url) - request.setHeader("content-type", CONTENT_TYPE_AP) - request.write(json.dumps(ret_data).encode()) - request.finish() - - def render(self, request): - request.setHeader("server", VERSION) - return super().render(request) - - def render_GET(self, request): - path = request.path.decode().lstrip("/") - if path.startswith(".well-known/webfinger"): - defer.ensureDeferred(self.webfinger(request)) - return server.NOT_DONE_YET - elif path.startswith(self.apg.ap_path): - defer.ensureDeferred(self.APRequest(request)) - return server.NOT_DONE_YET - - return web_resource.NoResource().render(request) - - -class HTTPRequest(server.Request): - pass - - -class HTTPServer(server.Site): - requestFactory = HTTPRequest - - def __init__(self, ap_gateway): - super().__init__(HTTPAPGServer(ap_gateway)) - - -class APGateway: - - def __init__(self, host): - self.host = host - self.initialised = False - self._m = host.plugins["XEP-0277"] - self._p = host.plugins["XEP-0060"] - - host.bridge.addMethod( - "APSend", - ".plugin", - in_sign="sss", - out_sign="", - method=self._publishMessage, - async_=True, - ) - - def getHandler(self, __): - return APPubsubService(self) - - async def init(self, client): - if self.initialised: - return - - self.initialised = True - log.info(_("ActivityPub Gateway initialization")) - - # RSA keys - stored_data = await self.host.memory.storage.getPrivates( - IMPORT_NAME, ["rsa_key"], profile=client.profile - ) - private_key_pem = stored_data.get("rsa_key") - if private_key_pem is None: - self.private_key = await threads.deferToThread( - rsa.generate_private_key, - public_exponent=65537, - key_size=4096, - ) - private_key_pem = self.private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - ).decode() - await self.host.memory.storage.setPrivateValue( - IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile - ) - else: - self.private_key = serialization.load_pem_private_key( - private_key_pem.encode(), - password=None, - ) - self.public_key = self.private_key.public_key() - self.public_key_pem = self.public_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo - ).decode() - - # params - # URL and port - self.public_url = self.host.memory.getConfig( - CONF_SECTION, "public_url" - ) or self.host.memory.getConfig( - CONF_SECTION, "xmpp_domain" - ) - if self.public_url is None: - log.error( - '"public_url" not set in configuration, this is mandatory to have' - "ActivityPub Gateway running. Please set this option it to public facing " - f"url in {CONF_SECTION!r} configuration section." - ) - return - if parse.urlparse(self.public_url).scheme: - log.error( - "Scheme must not be specified in \"public_url\", please remove it from " - "\"public_url\" configuration option. ActivityPub Gateway won't be run." - ) - return - self.http_port = int(self.host.memory.getConfig( - CONF_SECTION, 'http_port', 8123)) - connection_type = self.host.memory.getConfig( - CONF_SECTION, 'http_connection_type', 'https') - if connection_type not in ('http', 'https'): - raise exceptions.ConfigError( - 'bad ap-gateay http_connection_type, you must use one of "http" or ' - '"https"' - ) - self.max_items = self.host.memory.getConfig( - CONF_SECTION, 'new_node_max_items', 50 - - ) - self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') - self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") - # True (default) if we provide gateway only to entities/services from our server - self.local_only = C.bool( - self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) - ) - - # HTTP server launch - self.server = HTTPServer(self) - if connection_type == 'http': - reactor.listenTCP(self.http_port, self.server) - else: - options = tls.getOptionsFromConfig( - self.host.memory.config, CONF_SECTION) - tls.TLSOptionsCheck(options) - context_factory = tls.getTLSContextFactory(options) - reactor.listenSSL(self.http_port, self.server, context_factory) - - async def profileConnecting(self, client): - self.client = client - await self.init(client) - - async def apGet(self, url: str) -> dict: - """Retrieve AP JSON from given URL - - @raise error.StanzaError: "service-unavailable" is sent when something went wrong - with AP server - """ - try: - return await treq.json_content(await treq.get( - url, - headers = { - "Accept": [MEDIA_TYPE_AP], - "Content-Type": [MEDIA_TYPE_AP], - } - )) - except Exception as e: - raise error.StanzaError( - "service-unavailable", - text=f"Can't get AP data at {url}: {e}" - ) - - def mustEncode(self, text: str) -> bool: - """Indicate if a text must be period encoded""" - return ( - not RE_ALLOWED_UNQUOTED.match(text) - or text.startswith("___") - or "---" in text - ) - - def periodEncode(self, text: str) -> str: - """Period encode a text - - see [getJIDAndNode] for reasons of period encoding - """ - return ( - parse.quote(text, safe="") - .replace("---", "%2d%2d%2d") - .replace("___", "%5f%5f%5f") - .replace(".", "%2e") - .replace("~", "%7e") - .replace("%", ".") - ) - - async def getAPAccountFromJidAndNode( - self, - jid_: jid.JID, - node: Optional[str] - ) -> str: - """Construct AP account from JID and node - - The account construction will use escaping when necessary - """ - if not node or node == self._m.namespace: - node = None - - if node and not jid_.user and not self.mustEncode(node): - is_pubsub = self.isPubsub(jid_) - # when we have a pubsub service, the user part can be used to set the node - # this produces more user-friendly AP accounts - if is_pubsub: - jid_.user = node - node = None - - is_local = self.isLocal(jid_) - user = jid_.user if is_local else jid_.userhost() - if user is None: - user = "" - account_elts = [] - if node and self.mustEncode(node) or self.mustEncode(user): - account_elts = ["___"] - if node: - node = self.periodEncode(node) - user = self.periodEncode(user) - - if not user: - raise exceptions.InternalError("there should be a user part") - - if node: - account_elts.extend((node, "---")) - - account_elts.extend(( - user, "@", jid_.host if is_local else self.client.jid.userhost() - )) - return "".join(account_elts) - - def isLocal(self, jid_: jid.JID) -> bool: - """Returns True if jid_ use a domain or subdomain of gateway's host""" - local_host = self.client.host.split(".") - assert local_host - return jid_.host.split(".")[-len(local_host):] == local_host - - async def isPubsub(self, jid_: jid.JID) -> bool: - """Indicate if a JID is a Pubsub service""" - host_disco = await self.host.getDiscoInfos(self.client, jid_) - return ( - ("pubsub", "service") in host_disco.identities - and not ("pubsub", "pep") in host_disco.identities - ) - - async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: - """Decode raw AP account handle to get XMPP JID and Pubsub Node - - Username are case insensitive. - - By default, the username correspond to local username (i.e. username from - component's server). - - If local name's domain is a pubsub service (and not PEP), the username is taken as - a pubsub node. - - If ``---`` is present in username, the part before is used as pubsub node, and the - rest as a JID user part. - - If username starts with ``___``, characters are encoded using period encoding - (i.e. percent encoding where a ``.`` is used instead of ``%``). - - This horror is necessary due to limitation in some AP implementation (notably - Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 - - examples:: - - ``toto@example.org`` => JID = toto@example.org, node = None - - ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a non-local JID, and will work only if setings ``local_only`` is False), node = None - - ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => - JID = pubsub.example.org, node = toto - - ``tata---toto@example.org`` => JID = toto@example.org, node = tata - - ``___urn.3axmpp.3a.microblog.3a0@pubsub.example.org`` (with pubsub.example.org - being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 - - @param ap_account: ActivityPub account handle (``username@domain.tld``) - @return: service JID and pubsub node - if pubsub is None, default microblog pubsub node (and possibly other nodes - that plugins may hanlde) will be used - @raise ValueError: invalid account - @raise PermissionError: non local jid is used when gateway doesn't allow them - """ - if ap_account.count("@") != 1: - raise ValueError("Invalid AP account") - if ap_account.startswith("___"): - encoded = True - ap_account = ap_account[3:] - else: - encoded = False - - username, domain = ap_account.split("@") - - if "---" in username: - node, username = username.rsplit("---", 1) - else: - node = None - - if encoded: - username = parse.unquote( - RE_PERIOD_ENC.sub(r"%\g", username), - errors="strict" - ) - if node: - node = parse.unquote( - RE_PERIOD_ENC.sub(r"%\g", node), - errors="strict" - ) - - if "@" in username: - username, domain = username.rsplit("@", 1) - - if not node: - # we need to check host disco, because disco request to user may be - # blocked for privacy reason (see - # https://xmpp.org/extensions/xep-0030.html#security) - is_pubsub = await self.isPubsub(jid.JID(domain)) - - if is_pubsub: - # if the host is a pubsub service and not a PEP, we consider that username - # is in fact the node name - node = username - username = None - - jid_s = f"{username}@{domain}" if username else domain - try: - jid_ = jid.JID(jid_s) - except RuntimeError: - raise ValueError(f"Invalid jid: {jid_s!r}") - - if self.local_only and not self.isLocal(jid_): - raise exceptions.PermissionError( - "This gateway is configured to map only local entities and services" - ) - - return jid_, node - - def parseAPURL(self, url: str) -> Tuple[str, str]: - """Parse an URL leading to an AP endpoint - - @param url: URL to parse (schema is not mandatory) - @return: endpoint type and AP account - """ - path = parse.urlparse(url).path.lstrip("/") - type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) - return type_, parse.unquote(account) - - def buildAPURL(self, type_:str , *args: str) -> str: - """Build an AP endpoint URL - - @param type_: type of AP endpoing - @param arg: endpoint dependant arguments - """ - return parse.urljoin( - self.base_ap_url, - str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) - ) - - async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: - """Sign a documentent and post it to AP server - - @param url: AP server endpoint - @param url_actor: URL generated by this gateway for local actor - @param doc: document to send - """ - p_url = parse.urlparse(url) - date = http.datetimeToString().decode() - body = json.dumps(doc).encode() - digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() - digest = f"sha-256={digest_hash}" - to_sign = ( - f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" - f"date: {date}\ndigest: {digest}" - ) - signature = base64.b64encode(self.private_key.sign( - to_sign.encode(), - # we have to use PKCS1v15 padding to be compatible with Mastodon - padding.PKCS1v15(), - hashes.SHA256() - )).decode() - h_signature = ( - f'keyId="{url_actor}",headers="(request-target) host date digest",' - f'signature="{signature}"' - ) - return await treq.post( - url, - body, - headers={ - "Host": [p_url.hostname], - "Date": [date], - "Digest": [digest], - "Signature": [h_signature], - } - ) - - def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): - mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore - service = jid.JID(service_s) - client = self.host.getClient(profile) - return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) - - async def getAPActorIdFromAccount(self, account: str) -> str: - """Retrieve account ID from it's handle using WebFinger - - @param account: AP handle (user@domain.tld) - @return: Actor ID (which is an URL) - """ - if account.count("@") != 1 or "/" in account: - raise ValueError("Invalid account: {account!r}") - host = account.split("@")[1] - try: - finger_data = await treq.json_content(await treq.get( - f"https://{host}/.well-known/webfinger?" - f"resource=acct:{parse.quote_plus(account)}", - )) - except Exception as e: - raise exceptions.DataError(f"Can't get webfinger data: {e}") - for link in finger_data.get("links", []): - if ( - link.get("type") == "application/activity+json" - and link.get("rel") == "self" - ): - href = link.get("href", "").strip() - if not href: - raise ValueError( - f"Invalid webfinger data for {account:r}: missing href" - ) - break - else: - raise ValueError( - f"No ActivityPub link found for {account!r}" - ) - return href - - async def getAPActorDataFromId(self, account: str) -> dict: - """Retrieve ActivityPub Actor data - - @param account: ActivityPub Actor identifier - """ - href = await self.getAPActorIdFromAccount(account) - return await self.apGet(href) - - @async_lru(maxsize=LRU_MAX_SIZE) - async def getAPAccountFromId(self, actor_id: str): - """Retrieve AP account from the ID URL - - @param actor_id: AP ID of the actor (URL to the actor data) - """ - url_parsed = parse.urlparse(actor_id) - actor_data = await self.apGet(actor_id) - username = actor_data.get("preferredUsername") - if not username: - raise exceptions.DataError( - 'No "preferredUsername" field found, can\'t retrieve actor account' - ) - account = f"{username}@{url_parsed.hostname}" - # we try to retrieve the actor ID from the account to check it - found_id = await self.getAPActorIdFromAccount(account) - if found_id != actor_id: - # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 - msg = ( - f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " - f"({actor_id!r}). This AP instance doesn't seems to use " - '"preferredUsername" as we expect.' - ) - log.warning(msg) - raise exceptions.DataError(msg) - return account - - async def getAPItems( - self, - account: str, - max_items: Optional[int] = None, - chronological_pagination: bool = True, - after_id: Optional[str] = None, - start_index: Optional[int] = None, - ) -> Tuple[List[domish.Element], rsm.RSMResponse]: - """Retrieve AP items and convert them to XMPP items - - @param account: AP account to get items from - @param max_items: maximum number of items to retrieve - retrieve all items by default - @param chronological_pagination: get pages in chronological order - AP use reversed chronological order for pagination, "first" page returns more - recent items. If "chronological_pagination" is True, "last" AP page will be - retrieved first. - @param after_id: if set, retrieve items starting from given ID - Due to ActivityStream Collection Paging limitations, this is inefficient and - if ``after_id`` is not already in cache, we have to retrieve every page until - we find it. - In most common cases, ``after_id`` should be in cache though (client usually - use known ID when in-order pagination is used). - @param start_index: start retrieving items from the one with given index - Due to ActivityStream Collection Paging limitations, this is inefficient and - all pages before the requested index will be retrieved to count items. - @return: XMPP Pubsub items and corresponding RSM Response - Items are always returned in chronological order in the result - """ - actor_data = await self.getAPActorDataFromId(account) - outbox = actor_data.get("outbox") - rsm_resp: Dict[str, Union[bool, int]] = {} - if not outbox: - raise exceptions.DataError(f"No outbox found for actor {account}") - outbox_data = await self.apGet(outbox) - try: - count = outbox_data["totalItems"] - except KeyError: - log.warning( - f'"totalItems" not found in outbox of {account}, defaulting to 20' - ) - count = 20 - else: - log.info(f"{account}'s outbox has {count} item(s)") - rsm_resp["count"] = count - - if start_index is not None: - assert chronological_pagination and after_id is None - if start_index >= count: - return [], rsm_resp - elif start_index == 0: - # this is the default behaviour - pass - elif start_index > 5000: - raise error.StanzaError( - "feature-not-implemented", - text="Maximum limit for previous_index has been reached, this limit" - "is set to avoid DoS" - ) - else: - # we'll convert "start_index" to "after_id", thus we need the item just - # before "start_index" - previous_index = start_index - 1 - retrieved_items = 0 - current_page = outbox_data["last"] - while retrieved_items < count: - page_data, items = await self.parseAPPage(current_page) - if not items: - log.warning(f"found an empty AP page at {current_page}") - return [], rsm_resp - page_start_idx = retrieved_items - retrieved_items += len(items) - if previous_index <= retrieved_items: - after_id = items[previous_index - page_start_idx]["id"] - break - try: - current_page = page_data["prev"] - except KeyError: - log.warning( - f"missing previous page link at {current_page}: {page_data!r}" - ) - raise error.StanzaError( - "service-unavailable", - "Error while retrieving previous page from AP service at " - f"{current_page}" - ) - - init_page = "last" if chronological_pagination else "first" - page = outbox_data.get(init_page) - if not page: - raise exceptions.DataError( - f"Initial page {init_page!r} not found for outbox {outbox}" - ) - items = [] - page_items = [] - retrieved_items = 0 - found_after_id = False - - while retrieved_items < count: - __, page_items = await self.parseAPPage(page) - retrieved_items += len(page_items) - if after_id is not None and not found_after_id: - # if we have an after_id, we ignore all items until the requested one is - # found - limit_idx = [i["id"] for i in page_items].index(after_id) - if limit_idx == -1: - # if "after_id" is not found, we don't add any item from this page - log.debug(f"{after_id!r} not found at {page}, skipping") - else: - found_after_id = True - if chronological_pagination: - page_items = page_items[limit_idx+1:] - start_index = retrieved_items - len(page_items) + limit_idx + 1 - else: - page_items = page_items[:limit_idx] - start_index = count - (retrieved_items - len(page_items) + - limit_idx + 1) - items.extend(page_items) - else: - items.extend(page_items) - if max_items is not None and len(items) >= max_items: - if chronological_pagination: - items = items[:max_items] - else: - items = items[-max_items:] - break - page = outbox_data.get("prev" if chronological_pagination else "next") - if not page: - break - - if after_id is not None and not found_after_id: - raise error.StanzaError("item-not-found") - - if after_id is None: - rsm_resp["index"] = 0 if chronological_pagination else count - len(items) - - if start_index is not None: - rsm_resp["index"] = start_index - elif after_id is not None: - log.warning("Can't determine index of first element") - elif chronological_pagination: - rsm_resp["index"] = 0 - else: - rsm_resp["index"] = count - len(items) - if items: - rsm_resp.update({ - "first": items[0]["id"], - "last": items[-1]["id"] - }) - - return items, rsm.RSMResponse(**rsm_resp) - - async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: - """Convert AP objects from an AP page to XMPP items - - @param url: url linking and AP page - @return: page data, pubsub items - """ - page_data = await self.apGet(url) - ap_items = page_data.get("orderedItems") - if not ap_items: - log.warning('No "orderedItems" collection found') - return page_data, [] - items = [] - # AP Collections are in antichronological order, but we expect chronological in - # Pubsub, thus we reverse it - for ap_item in reversed(ap_items): - try: - ap_object, mb_data = await self.apItem2MBdata(ap_item) - except (exceptions.DataError, NotImplementedError, error.StanzaError): - continue - - item_elt = await self._m.data2entry( - self.client, mb_data, ap_object["id"], None, self._m.namespace - ) - item_elt["publisher"] = mb_data["author_jid"].full() - items.append(item_elt) - - return page_data, items - - async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: - """Convert AP item to microblog data - - @return: AP Item's Object and microblog data - @raise exceptions.DataError: something is invalid in the AP item - @raise NotImplemented: some AP data is not handled yet - @raise error.StanzaError: error while contacting the AP server - """ - ap_object = ap_item.get("object") - if not ap_object: - log.warning(f'No "object" found in AP item {ap_item!r}') - raise exceptions.DataError - if isinstance(ap_object, str): - ap_object = await self.apGet(ap_object) - obj_id = ap_object.get("id") - if not obj_id: - log.warning(f'No "id" found in AP object: {ap_object!r}') - raise exceptions.DataError - if ap_object.get("inReplyTo") is not None: - raise NotImplementedError - mb_data = {} - for ap_key, mb_key in AP_MB_MAP.items(): - data = ap_object.get(ap_key) - if data is None: - continue - mb_data[mb_key] = data - - # content - try: - language, content_xhtml = ap_object["contentMap"].popitem() - except (KeyError, AttributeError): - try: - mb_data["content_xhtml"] = mb_data["content"] - except KeyError: - log.warning(f"no content found:\n{ap_object!r}") - raise exceptions.DataError - else: - mb_data["language"] = language - mb_data["content_xhtml"] = content_xhtml - - # author - actor = ap_item.get("actor") - if not actor: - log.warning(f"no actor associated to object id {obj_id!r}") - raise exceptions.DataError - elif isinstance(actor, list): - # we only keep first item of list as author - # TODO: handle multiple actors - if len(actor) > 1: - log.warning("multiple actors are not managed") - actor = actor[0] - - if isinstance(actor, dict): - actor = actor.get("id") - if not actor: - log.warning(f"no actor id found: {actor!r}") - raise exceptions.DataError - - if isinstance(actor, str): - account = await self.getAPAccountFromId(actor) - mb_data["author"] = account.split("@", 1)[0] - author_jid = mb_data["author_jid"] = jid.JID( - None, - ( - self.host.plugins["XEP-0106"].escape(account), - self.client.jid.host, - None - ) - ) - else: - log.warning(f"unknown actor type found: {actor!r}") - raise exceptions.DataError - - # published/updated - for field in ("published", "updated"): - value = ap_object.get(field) - if not value and field == "updated": - value = ap_object.get("published") - if value: - try: - mb_data[field] = calendar.timegm( - dateutil.parser.parse(str(value)).utctimetuple() - ) - except dateutil.parser.ParserError as e: - log.warning(f"Can't parse {field!r} field: {e}") - return ap_object, mb_data - - async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: - """Convert Libervia Microblog Data to ActivityPub item""" - if not mb_data.get("id"): - mb_data["id"] = shortuuid.uuid() - if not mb_data.get("author_jid"): - mb_data["author_jid"] = client.jid - ap_account = await self.getAPAccountFromJidAndNode( - jid.JID(mb_data["author_jid"]), - None - ) - url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) - url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) - return { - "@context": "https://www.w3.org/ns/activitystreams", - "id": url_item, - "type": "Create", - "actor": url_actor, - - "object": { - "id": url_item, - "type": "Note", - "published": utils.xmpp_date(mb_data["published"]), - "attributedTo": url_actor, - "content": mb_data.get("content_xhtml") or mb_data["content"], - "to": "https://www.w3.org/ns/activitystreams#Public" - } - } - - async def publishMessage( - self, - client: SatXMPPEntity, - mess_data: dict, - service: jid.JID - ) -> None: - """Send an AP message - - .. note:: - - This is a temporary method used for development only - - @param mess_data: message data. Following keys must be set: - - ``node`` - identifier of message which is being replied (this will - correspond to pubsub node in the future) - - ``content_xhtml`` or ``content`` - message body (respectively in XHTML or plain text) - - @param service: JID corresponding to the AP actor. - """ - if not service.user: - raise ValueError("service must have a local part") - account = self.host.plugins["XEP-0106"].unescape(service.user) - ap_actor_data = await self.getAPActorDataFromId(account) - - try: - inbox_url = ap_actor_data["endpoints"]["sharedInbox"] - except KeyError: - raise exceptions.DataError("Can't get ActivityPub actor inbox") - - item_data = await self.mbdata2APitem(client, mess_data) - url_actor = item_data["object"]["attributedTo"] - resp = await self.signAndPost(inbox_url, url_actor, item_data) - if resp.code != 202: - raise exceptions.NetworkError(f"unexpected return code: {resp.code}") - - -class APPubsubService(rsm.PubSubService): - """Pubsub service for XMPP requests""" - - def __init__(self, apg): - super(APPubsubService, self).__init__() - self.host = apg.host - self.apg = apg - self.discoIdentity = { - "category": "pubsub", - "type": "service", - "name": "Libervia ActivityPub Gateway", - } - - @ensure_deferred - async def publish(self, requestor, service, nodeIdentifier, items): - raise NotImplementedError - - @ensure_deferred - async def items( - self, - requestor: jid.JID, - service: jid.JID, - node: str, - maxItems: Optional[int], - itemIdentifiers: Optional[List[str]], - rsm_req: Optional[rsm.RSMRequest] - ) -> List[domish.Element]: - if not service.user: - return [] - ap_account = self.host.plugins["XEP-0106"].unescape(service.user) - if ap_account.count("@") != 1: - log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") - return [] - if node != self.apg._m.namespace: - raise error.StanzaError( - "feature-not-implemented", - text=f"{VERSION} only supports {self.apg._m.namespace} " - "node for now" - ) - if rsm_req is None: - if maxItems is None: - maxItems = 20 - kwargs = { - "max_items": maxItems, - "chronological_pagination": False, - } - else: - if len( - [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) - if v is not None] - ) > 1: - raise error.StanzaError( - "bad-request", - text="You can't use after, before and index at the same time" - ) - kwargs = {"max_items": rsm_req.max} - if rsm_req.after is not None: - kwargs["after_id"] = rsm_req.after - elif rsm_req.before is not None: - kwargs["chronological_pagination"] = False - if rsm_req.before != "": - kwargs["after_id"] = rsm_req.before - elif rsm_req.index is not None: - kwargs["start_index"] = rsm_req.index - - log.info( - f"No cache found for node {node} at {service} (AP account {ap_account}), " - "using Collection Paging to RSM translation" - ) - return await self.apg.getAPItems(ap_account, **kwargs) - - @ensure_deferred - async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): - raise NotImplementedError - - def getNodeInfo( - self, - requestor: jid.JID, - service: jid.JID, - nodeIdentifier: str, - pep: bool = False, - recipient: Optional[jid.JID] = None - ) -> Optional[dict]: - if not nodeIdentifier: - return None - info = { - "type": "leaf", - "meta-data": [ - {"var": "pubsub#persist_items", "type": "boolean", "value": True}, - {"var": "pubsub#max_items", "value": "max"}, - {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, - {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, - - ] - - } - return info diff -r b15644cae50d -r 86eea17cafa7 sat/plugins/plugin_comp_ap_gateway/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,853 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +import hashlib +import json +from pathlib import Path +from typing import Optional, Dict, Tuple, List, Union +from urllib import parse +import calendar +import re + +import dateutil +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import padding +import shortuuid +import treq +from treq.response import _Response as TReqResponse +from twisted.internet import defer, reactor, threads +from twisted.web import http +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from wokkel import rsm + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools import utils +from sat.tools.common import data_format, tls +from sat.tools.common.async_utils import async_lru + +from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, + AP_MB_MAP, LRU_MAX_SIZE) +from .http_server import HTTPServer +from .pubsub_service import APPubsubService + + +log = getLogger(__name__) + +IMPORT_NAME = "ap-gateway" + +PLUGIN_INFO = { + C.PI_NAME: "ActivityPub Gateway component", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_MODES: [C.PLUG_MODE_COMPONENT], + C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "APGateway", + C.PI_HANDLER: C.BOOL_TRUE, + C.PI_DESCRIPTION: _( + "Gateway for bidirectional communication between XMPP and ActivityPub." + ), +} + +HEXA_ENC = r"(?P[0-9a-fA-f]{2})" +RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") +RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") +RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") + + +class APGateway: + + def __init__(self, host): + self.host = host + self.initialised = False + self._m = host.plugins["XEP-0277"] + self._p = host.plugins["XEP-0060"] + + host.bridge.addMethod( + "APSend", + ".plugin", + in_sign="sss", + out_sign="", + method=self._publishMessage, + async_=True, + ) + + def getHandler(self, __): + return APPubsubService(self) + + async def init(self, client): + if self.initialised: + return + + self.initialised = True + log.info(_("ActivityPub Gateway initialization")) + + # RSA keys + stored_data = await self.host.memory.storage.getPrivates( + IMPORT_NAME, ["rsa_key"], profile=client.profile + ) + private_key_pem = stored_data.get("rsa_key") + if private_key_pem is None: + self.private_key = await threads.deferToThread( + rsa.generate_private_key, + public_exponent=65537, + key_size=4096, + ) + private_key_pem = self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ).decode() + await self.host.memory.storage.setPrivateValue( + IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile + ) + else: + self.private_key = serialization.load_pem_private_key( + private_key_pem.encode(), + password=None, + ) + self.public_key = self.private_key.public_key() + self.public_key_pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ).decode() + + # params + # URL and port + self.public_url = self.host.memory.getConfig( + CONF_SECTION, "public_url" + ) or self.host.memory.getConfig( + CONF_SECTION, "xmpp_domain" + ) + if self.public_url is None: + log.error( + '"public_url" not set in configuration, this is mandatory to have' + "ActivityPub Gateway running. Please set this option it to public facing " + f"url in {CONF_SECTION!r} configuration section." + ) + return + if parse.urlparse(self.public_url).scheme: + log.error( + "Scheme must not be specified in \"public_url\", please remove it from " + "\"public_url\" configuration option. ActivityPub Gateway won't be run." + ) + return + self.http_port = int(self.host.memory.getConfig( + CONF_SECTION, 'http_port', 8123)) + connection_type = self.host.memory.getConfig( + CONF_SECTION, 'http_connection_type', 'https') + if connection_type not in ('http', 'https'): + raise exceptions.ConfigError( + 'bad ap-gateay http_connection_type, you must use one of "http" or ' + '"https"' + ) + self.max_items = self.host.memory.getConfig( + CONF_SECTION, 'new_node_max_items', 50 + + ) + self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') + self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") + # True (default) if we provide gateway only to entities/services from our server + self.local_only = C.bool( + self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) + ) + + # HTTP server launch + self.server = HTTPServer(self) + if connection_type == 'http': + reactor.listenTCP(self.http_port, self.server) + else: + options = tls.getOptionsFromConfig( + self.host.memory.config, CONF_SECTION) + tls.TLSOptionsCheck(options) + context_factory = tls.getTLSContextFactory(options) + reactor.listenSSL(self.http_port, self.server, context_factory) + + async def profileConnecting(self, client): + self.client = client + await self.init(client) + + async def apGet(self, url: str) -> dict: + """Retrieve AP JSON from given URL + + @raise error.StanzaError: "service-unavailable" is sent when something went wrong + with AP server + """ + try: + return await treq.json_content(await treq.get( + url, + headers = { + "Accept": [MEDIA_TYPE_AP], + "Content-Type": [MEDIA_TYPE_AP], + } + )) + except Exception as e: + raise error.StanzaError( + "service-unavailable", + text=f"Can't get AP data at {url}: {e}" + ) + + def mustEncode(self, text: str) -> bool: + """Indicate if a text must be period encoded""" + return ( + not RE_ALLOWED_UNQUOTED.match(text) + or text.startswith("___") + or "---" in text + ) + + def periodEncode(self, text: str) -> str: + """Period encode a text + + see [getJIDAndNode] for reasons of period encoding + """ + return ( + parse.quote(text, safe="") + .replace("---", "%2d%2d%2d") + .replace("___", "%5f%5f%5f") + .replace(".", "%2e") + .replace("~", "%7e") + .replace("%", ".") + ) + + async def getAPAccountFromJidAndNode( + self, + jid_: jid.JID, + node: Optional[str] + ) -> str: + """Construct AP account from JID and node + + The account construction will use escaping when necessary + """ + if not node or node == self._m.namespace: + node = None + + if node and not jid_.user and not self.mustEncode(node): + is_pubsub = await self.isPubsub(jid_) + # when we have a pubsub service, the user part can be used to set the node + # this produces more user-friendly AP accounts + if is_pubsub: + jid_.user = node + node = None + + is_local = self.isLocal(jid_) + user = jid_.user if is_local else jid_.userhost() + if user is None: + user = "" + account_elts = [] + if node and self.mustEncode(node) or self.mustEncode(user): + account_elts = ["___"] + if node: + node = self.periodEncode(node) + user = self.periodEncode(user) + + if not user: + raise exceptions.InternalError("there should be a user part") + + if node: + account_elts.extend((node, "---")) + + account_elts.extend(( + user, "@", jid_.host if is_local else self.client.jid.userhost() + )) + return "".join(account_elts) + + def isLocal(self, jid_: jid.JID) -> bool: + """Returns True if jid_ use a domain or subdomain of gateway's host""" + local_host = self.client.host.split(".") + assert local_host + return jid_.host.split(".")[-len(local_host):] == local_host + + async def isPubsub(self, jid_: jid.JID) -> bool: + """Indicate if a JID is a Pubsub service""" + host_disco = await self.host.getDiscoInfos(self.client, jid_) + return ( + ("pubsub", "service") in host_disco.identities + and not ("pubsub", "pep") in host_disco.identities + ) + + async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: + """Decode raw AP account handle to get XMPP JID and Pubsub Node + + Username are case insensitive. + + By default, the username correspond to local username (i.e. username from + component's server). + + If local name's domain is a pubsub service (and not PEP), the username is taken as + a pubsub node. + + If ``---`` is present in username, the part before is used as pubsub node, and the + rest as a JID user part. + + If username starts with ``___``, characters are encoded using period encoding + (i.e. percent encoding where a ``.`` is used instead of ``%``). + + This horror is necessary due to limitation in some AP implementation (notably + Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 + + examples: + + ``toto@example.org`` => JID = toto@example.org, node = None + + ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a + non-local JID, and will work only if setings ``local_only`` is False), node = None + + ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => + JID = pubsub.example.org, node = toto + + ``tata---toto@example.org`` => JID = toto@example.org, node = tata + + ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org + being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 + + @param ap_account: ActivityPub account handle (``username@domain.tld``) + @return: service JID and pubsub node + if pubsub is None, default microblog pubsub node (and possibly other nodes + that plugins may hanlde) will be used + @raise ValueError: invalid account + @raise PermissionError: non local jid is used when gateway doesn't allow them + """ + if ap_account.count("@") != 1: + raise ValueError("Invalid AP account") + if ap_account.startswith("___"): + encoded = True + ap_account = ap_account[3:] + else: + encoded = False + + username, domain = ap_account.split("@") + + if "---" in username: + node, username = username.rsplit("---", 1) + else: + node = None + + if encoded: + username = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g", username), + errors="strict" + ) + if node: + node = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g", node), + errors="strict" + ) + + if "@" in username: + username, domain = username.rsplit("@", 1) + + if not node: + # we need to check host disco, because disco request to user may be + # blocked for privacy reason (see + # https://xmpp.org/extensions/xep-0030.html#security) + is_pubsub = await self.isPubsub(jid.JID(domain)) + + if is_pubsub: + # if the host is a pubsub service and not a PEP, we consider that username + # is in fact the node name + node = username + username = None + + jid_s = f"{username}@{domain}" if username else domain + try: + jid_ = jid.JID(jid_s) + except RuntimeError: + raise ValueError(f"Invalid jid: {jid_s!r}") + + if self.local_only and not self.isLocal(jid_): + raise exceptions.PermissionError( + "This gateway is configured to map only local entities and services" + ) + + return jid_, node + + def parseAPURL(self, url: str) -> Tuple[str, str]: + """Parse an URL leading to an AP endpoint + + @param url: URL to parse (schema is not mandatory) + @return: endpoint type and AP account + """ + path = parse.urlparse(url).path.lstrip("/") + type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) + return type_, parse.unquote(account) + + def buildAPURL(self, type_:str , *args: str) -> str: + """Build an AP endpoint URL + + @param type_: type of AP endpoing + @param arg: endpoint dependant arguments + """ + return parse.urljoin( + self.base_ap_url, + str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) + ) + + async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: + """Sign a documentent and post it to AP server + + @param url: AP server endpoint + @param url_actor: URL generated by this gateway for local actor + @param doc: document to send + """ + p_url = parse.urlparse(url) + date = http.datetimeToString().decode() + body = json.dumps(doc).encode() + digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() + digest = f"sha-256={digest_hash}" + to_sign = ( + f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" + f"date: {date}\ndigest: {digest}" + ) + signature = base64.b64encode(self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), + hashes.SHA256() + )).decode() + h_signature = ( + f'keyId="{url_actor}",headers="(request-target) host date digest",' + f'signature="{signature}"' + ) + return await treq.post( + url, + body, + headers={ + "Host": [p_url.hostname], + "Date": [date], + "Digest": [digest], + "Signature": [h_signature], + } + ) + + def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): + mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore + service = jid.JID(service_s) + client = self.host.getClient(profile) + return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) + + async def getAPActorIdFromAccount(self, account: str) -> str: + """Retrieve account ID from it's handle using WebFinger + + @param account: AP handle (user@domain.tld) + @return: Actor ID (which is an URL) + """ + if account.count("@") != 1 or "/" in account: + raise ValueError("Invalid account: {account!r}") + host = account.split("@")[1] + try: + finger_data = await treq.json_content(await treq.get( + f"https://{host}/.well-known/webfinger?" + f"resource=acct:{parse.quote_plus(account)}", + )) + except Exception as e: + raise exceptions.DataError(f"Can't get webfinger data: {e}") + for link in finger_data.get("links", []): + if ( + link.get("type") == "application/activity+json" + and link.get("rel") == "self" + ): + href = link.get("href", "").strip() + if not href: + raise ValueError( + f"Invalid webfinger data for {account:r}: missing href" + ) + break + else: + raise ValueError( + f"No ActivityPub link found for {account!r}" + ) + return href + + async def getAPActorDataFromId(self, account: str) -> dict: + """Retrieve ActivityPub Actor data + + @param account: ActivityPub Actor identifier + """ + href = await self.getAPActorIdFromAccount(account) + return await self.apGet(href) + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getAPAccountFromId(self, actor_id: str): + """Retrieve AP account from the ID URL + + @param actor_id: AP ID of the actor (URL to the actor data) + """ + url_parsed = parse.urlparse(actor_id) + actor_data = await self.apGet(actor_id) + username = actor_data.get("preferredUsername") + if not username: + raise exceptions.DataError( + 'No "preferredUsername" field found, can\'t retrieve actor account' + ) + account = f"{username}@{url_parsed.hostname}" + # we try to retrieve the actor ID from the account to check it + found_id = await self.getAPActorIdFromAccount(account) + if found_id != actor_id: + # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 + msg = ( + f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " + f"({actor_id!r}). This AP instance doesn't seems to use " + '"preferredUsername" as we expect.' + ) + log.warning(msg) + raise exceptions.DataError(msg) + return account + + async def getAPItems( + self, + account: str, + max_items: Optional[int] = None, + chronological_pagination: bool = True, + after_id: Optional[str] = None, + start_index: Optional[int] = None, + ) -> Tuple[List[domish.Element], rsm.RSMResponse]: + """Retrieve AP items and convert them to XMPP items + + @param account: AP account handle to get items from + @param max_items: maximum number of items to retrieve + retrieve all items by default + @param chronological_pagination: get pages in chronological order + AP use reversed chronological order for pagination, "first" page returns more + recent items. If "chronological_pagination" is True, "last" AP page will be + retrieved first. + @param after_id: if set, retrieve items starting from given ID + Due to ActivityStream Collection Paging limitations, this is inefficient and + if ``after_id`` is not already in cache, we have to retrieve every page until + we find it. + In most common cases, ``after_id`` should be in cache though (client usually + use known ID when in-order pagination is used). + @param start_index: start retrieving items from the one with given index + Due to ActivityStream Collection Paging limitations, this is inefficient and + all pages before the requested index will be retrieved to count items. + @return: XMPP Pubsub items and corresponding RSM Response + Items are always returned in chronological order in the result + """ + actor_data = await self.getAPActorDataFromId(account) + outbox = actor_data.get("outbox") + rsm_resp: Dict[str, Union[bool, int]] = {} + if not outbox: + raise exceptions.DataError(f"No outbox found for actor {account}") + outbox_data = await self.apGet(outbox) + try: + count = outbox_data["totalItems"] + except KeyError: + log.warning( + f'"totalItems" not found in outbox of {account}, defaulting to 20' + ) + count = 20 + else: + log.info(f"{account}'s outbox has {count} item(s)") + rsm_resp["count"] = count + + if start_index is not None: + assert chronological_pagination and after_id is None + if start_index >= count: + return [], rsm_resp + elif start_index == 0: + # this is the default behaviour + pass + elif start_index > 5000: + raise error.StanzaError( + "feature-not-implemented", + text="Maximum limit for previous_index has been reached, this limit" + "is set to avoid DoS" + ) + else: + # we'll convert "start_index" to "after_id", thus we need the item just + # before "start_index" + previous_index = start_index - 1 + retrieved_items = 0 + current_page = outbox_data["last"] + while retrieved_items < count: + page_data, items = await self.parseAPPage(current_page) + if not items: + log.warning(f"found an empty AP page at {current_page}") + return [], rsm_resp + page_start_idx = retrieved_items + retrieved_items += len(items) + if previous_index <= retrieved_items: + after_id = items[previous_index - page_start_idx]["id"] + break + try: + current_page = page_data["prev"] + except KeyError: + log.warning( + f"missing previous page link at {current_page}: {page_data!r}" + ) + raise error.StanzaError( + "service-unavailable", + "Error while retrieving previous page from AP service at " + f"{current_page}" + ) + + init_page = "last" if chronological_pagination else "first" + page = outbox_data.get(init_page) + if not page: + raise exceptions.DataError( + f"Initial page {init_page!r} not found for outbox {outbox}" + ) + items = [] + page_items = [] + retrieved_items = 0 + found_after_id = False + + while retrieved_items < count: + __, page_items = await self.parseAPPage(page) + retrieved_items += len(page_items) + if after_id is not None and not found_after_id: + # if we have an after_id, we ignore all items until the requested one is + # found + try: + limit_idx = [i["id"] for i in page_items].index(after_id) + except ValueError: + # if "after_id" is not found, we don't add any item from this page + log.debug(f"{after_id!r} not found at {page}, skipping") + else: + found_after_id = True + if chronological_pagination: + start_index = retrieved_items - len(page_items) + limit_idx + 1 + page_items = page_items[limit_idx+1:] + else: + start_index = count - (retrieved_items - len(page_items) + + limit_idx + 1) + page_items = page_items[:limit_idx] + items.extend(page_items) + else: + items.extend(page_items) + if max_items is not None and len(items) >= max_items: + if chronological_pagination: + items = items[:max_items] + else: + items = items[-max_items:] + break + page = outbox_data.get("prev" if chronological_pagination else "next") + if not page: + break + + if after_id is not None and not found_after_id: + raise error.StanzaError("item-not-found") + + if after_id is None: + rsm_resp["index"] = 0 if chronological_pagination else count - len(items) + + if start_index is not None: + rsm_resp["index"] = start_index + elif after_id is not None: + log.warning("Can't determine index of first element") + elif chronological_pagination: + rsm_resp["index"] = 0 + else: + rsm_resp["index"] = count - len(items) + if items: + rsm_resp.update({ + "first": items[0]["id"], + "last": items[-1]["id"] + }) + + return items, rsm.RSMResponse(**rsm_resp) + + async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: + """Convert AP objects from an AP page to XMPP items + + @param url: url linking and AP page + @return: page data, pubsub items + """ + page_data = await self.apGet(url) + ap_items = page_data.get("orderedItems") + if not ap_items: + log.warning('No "orderedItems" collection found') + return page_data, [] + items = [] + # AP Collections are in antichronological order, but we expect chronological in + # Pubsub, thus we reverse it + for ap_item in reversed(ap_items): + try: + ap_object, mb_data = await self.apItem2MBdata(ap_item) + except (exceptions.DataError, NotImplementedError, error.StanzaError): + continue + + item_elt = await self._m.data2entry( + self.client, mb_data, ap_object["id"], None, self._m.namespace + ) + item_elt["publisher"] = mb_data["author_jid"].full() + items.append(item_elt) + + return page_data, items + + async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: + """Convert AP item to microblog data + + @return: AP Item's Object and microblog data + @raise exceptions.DataError: something is invalid in the AP item + @raise NotImplemented: some AP data is not handled yet + @raise error.StanzaError: error while contacting the AP server + """ + ap_object = ap_item.get("object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + if isinstance(ap_object, str): + ap_object = await self.apGet(ap_object) + obj_id = ap_object.get("id") + if not obj_id: + log.warning(f'No "id" found in AP object: {ap_object!r}') + raise exceptions.DataError + if ap_object.get("inReplyTo") is not None: + raise NotImplementedError + mb_data = {} + for ap_key, mb_key in AP_MB_MAP.items(): + data = ap_object.get(ap_key) + if data is None: + continue + mb_data[mb_key] = data + + # content + try: + language, content_xhtml = ap_object["contentMap"].popitem() + except (KeyError, AttributeError): + try: + mb_data["content_xhtml"] = mb_data["content"] + except KeyError: + log.warning(f"no content found:\n{ap_object!r}") + raise exceptions.DataError + else: + mb_data["language"] = language + mb_data["content_xhtml"] = content_xhtml + + # author + actor = ap_item.get("actor") + if not actor: + log.warning(f"no actor associated to object id {obj_id!r}") + raise exceptions.DataError + elif isinstance(actor, list): + # we only keep first item of list as author + # TODO: handle multiple actors + if len(actor) > 1: + log.warning("multiple actors are not managed") + actor = actor[0] + + if isinstance(actor, dict): + actor = actor.get("id") + if not actor: + log.warning(f"no actor id found: {actor!r}") + raise exceptions.DataError + + if isinstance(actor, str): + account = await self.getAPAccountFromId(actor) + mb_data["author"] = account.split("@", 1)[0] + author_jid = mb_data["author_jid"] = jid.JID( + None, + ( + self.host.plugins["XEP-0106"].escape(account), + self.client.jid.host, + None + ) + ) + else: + log.warning(f"unknown actor type found: {actor!r}") + raise exceptions.DataError + + # published/updated + for field in ("published", "updated"): + value = ap_object.get(field) + if not value and field == "updated": + value = ap_object.get("published") + if value: + try: + mb_data[field] = calendar.timegm( + dateutil.parser.parse(str(value)).utctimetuple() + ) + except dateutil.parser.ParserError as e: + log.warning(f"Can't parse {field!r} field: {e}") + return ap_object, mb_data + + async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: + """Convert Libervia Microblog Data to ActivityPub item""" + if not mb_data.get("id"): + mb_data["id"] = shortuuid.uuid() + if not mb_data.get("author_jid"): + mb_data["author_jid"] = client.jid + ap_account = await self.getAPAccountFromJidAndNode( + jid.JID(mb_data["author_jid"]), + None + ) + url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) + url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url_item, + "type": "Create", + "actor": url_actor, + + "object": { + "id": url_item, + "type": "Note", + "published": utils.xmpp_date(mb_data["published"]), + "attributedTo": url_actor, + "content": mb_data.get("content_xhtml") or mb_data["content"], + "to": "https://www.w3.org/ns/activitystreams#Public" + } + } + + async def publishMessage( + self, + client: SatXMPPEntity, + mess_data: dict, + service: jid.JID + ) -> None: + """Send an AP message + + .. note:: + + This is a temporary method used for development only + + @param mess_data: message data. Following keys must be set: + + ``node`` + identifier of message which is being replied (this will + correspond to pubsub node in the future) + + ``content_xhtml`` or ``content`` + message body (respectively in XHTML or plain text) + + @param service: JID corresponding to the AP actor. + """ + if not service.user: + raise ValueError("service must have a local part") + account = self.host.plugins["XEP-0106"].unescape(service.user) + ap_actor_data = await self.getAPActorDataFromId(account) + + try: + inbox_url = ap_actor_data["endpoints"]["sharedInbox"] + except KeyError: + raise exceptions.DataError("Can't get ActivityPub actor inbox") + + item_data = await self.mbdata2APitem(client, mess_data) + url_actor = item_data["object"]["attributedTo"] + resp = await self.signAndPost(inbox_url, url_actor, item_data) + if resp.code != 202: + raise exceptions.NetworkError(f"unexpected return code: {resp.code}") diff -r b15644cae50d -r 86eea17cafa7 sat/plugins/plugin_comp_ap_gateway/constants.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +IMPORT_NAME = "ap-gateway" +CONF_SECTION = f"component {IMPORT_NAME}" +CONTENT_TYPE_AP = "application/activity+json; charset=utf-8" +TYPE_ACTOR = "actor" +TYPE_INBOX = "inbox" +TYPE_OUTBOX = "outbox" +TYPE_ITEM = "item" +MEDIA_TYPE_AP = "application/activity+json" +# mapping from AP metadata to microblog data +AP_MB_MAP = { + "content": "content_xhtml", + +} +AP_REQUEST_TYPES = {"actor", "outbox"} +PAGE_SIZE = 10 + +LRU_MAX_SIZE = 200 diff -r b15644cae50d -r 86eea17cafa7 sat/plugins/plugin_comp_ap_gateway/http_server.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Optional, Dict, List +import json +from urllib import parse +import re +import unicodedata + +from twisted.web import http, resource as web_resource, server +from twisted.internet import defer +from twisted.words.protocols.jabber import jid, error +from wokkel import pubsub, rsm + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger + +from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, + AP_REQUEST_TYPES, PAGE_SIZE) + + +log = getLogger(__name__) + +VERSION = unicodedata.normalize( + 'NFKD', + f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}" +) + + +class HTTPAPGServer(web_resource.Resource): + """HTTP Server handling ActivityPub S2S protocol""" + isLeaf = True + + def __init__(self, ap_gateway): + self.apg = ap_gateway + super().__init__() + + async def webfinger(self, request): + url_parsed = parse.urlparse(request.uri.decode()) + query = parse.parse_qs(url_parsed.query) + resource = query.get("resource", [""])[0] + account = resource[5:].strip() + if not resource.startswith("acct:") or not account: + return web_resource.ErrorPage( + http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" + ).render(request) + + actor_url = self.apg.buildAPURL(TYPE_ACTOR, account) + + resp = { + "subject": resource, + "links": [ + { + "rel": "self", + "type": "application/activity+json", + "href": actor_url + } + ] + } + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(resp).encode()) + request.finish() + + async def APActorRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + actor_url: str + ) -> dict: + inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) + outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) + + # we have to use AP account as preferredUsername because it is used to retrieve + # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) + preferred_username = ap_account.split("@", 1)[0] + return { + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1" + ], + + "id": actor_url, + "type": "Person", + "preferredUsername": preferred_username, + "inbox": inbox_url, + "outbox": outbox_url, + "publicKey": { + "id": f"{actor_url}#main-key", + "owner": actor_url, + "publicKeyPem": self.apg.public_key_pem + } + } + + def getCanonicalURL(self, request: "HTTPRequest") -> str: + return parse.urljoin( + f"https://{self.apg.public_url}", + request.path.decode().rstrip("/") + ) + + def queryData2RSMRequest( + self, + query_data: Dict[str, List[str]] + ) -> rsm.RSMRequest: + """Get RSM kwargs to use with RSMRequest from query data""" + page = query_data.get("page") + + if page == ["first"]: + return rsm.RSMRequest(max_=PAGE_SIZE, before="") + elif page == ["last"]: + return rsm.RSMRequest(max_=PAGE_SIZE) + else: + for query_key in ("index", "before", "after"): + try: + kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE} + except (KeyError, IndexError, ValueError): + pass + else: + return rsm.RSMRequest(**kwargs) + raise ValueError(f"Invalid query data: {query_data!r}") + + async def APOutboxPageRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + query_data: Dict[str, List[str]] + ) -> dict: + # we only keep useful keys, and sort to have consistent URL which can + # be used as ID + url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) + query_data = {k: query_data[k] for k in url_keys} + try: + items, metadata = await self.apg._p.getItems( + client=self.apg.client, + service=account_jid, + node=node, + rsm_request=self.queryData2RSMRequest(query_data), + extra = {C.KEY_USE_CACHE: False} + ) + except error.StanzaError as e: + log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") + return {} + + base_url = self.getCanonicalURL(request) + url = f"{base_url}?{parse.urlencode(query_data, True)}" + data = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url, + "type": "OrderedCollectionPage", + "partOf": base_url, + "orderedItems" : [ + await self.apg.mbdata2APitem( + self.apg.client, + await self.apg._m.item2mbdata( + self.apg.client, + item, + account_jid, + node + ) + ) + for item in reversed(items) + ] + } + + # AP OrderedCollection must be in reversed chronological order, thus the opposite + # of what we get with RSM (at least with Libervia Pubsub) + if not metadata["complete"]: + try: + last= metadata["rsm"]["last"] + except KeyError: + last = None + data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}" + if metadata["rsm"]["index"] != 0: + try: + first= metadata["rsm"]["first"] + except KeyError: + first = None + data["next"] = f"{base_url}?{parse.urlencode({'before': first})}" + + return data + + async def APOutboxRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str + ) -> dict: + if node is None: + node = self.apg._m.namespace + + parsed_url = parse.urlparse(request.uri.decode()) + query_data = parse.parse_qs(parsed_url.query) + if query_data: + return await self.APOutboxPageRequest( + request, account_jid, node, ap_account, ap_url, query_data + ) + + # XXX: we can't use disco#info here because this request won't work on a bare jid + # due to security considerations of XEP-0030 (we don't have presence + # subscription). + # The current workaround is to do a request as if RSM was available, and actually + # check its availability according to result. + try: + __, metadata = await self.apg._p.getItems( + client=self.apg.client, + service=account_jid, + node=node, + max_items=0, + rsm_request=rsm.RSMRequest(max_=0) + ) + except error.StanzaError as e: + log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") + return {} + try: + items_count = metadata["rsm"]["count"] + except KeyError: + log.warning( + f"No RSM metadata found when requesting pubsub node {node} at " + f"{account_jid}, defaulting to items_count=20" + ) + items_count = 20 + + url = self.getCanonicalURL(request) + url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" + url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url, + "totalItems": items_count, + "type": "OrderedCollection", + "first": url_first_page, + "last": url_last_page, + } + + async def APRequest(self, request): + path = request.path.decode() + ap_url = parse.urljoin( + f"https://{self.apg.public_url}", + path + ) + request_type, ap_account = self.apg.parseAPURL(ap_url) + account_jid, node = await self.apg.getJIDAndNode(ap_account) + if request_type not in AP_REQUEST_TYPES: + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + method = getattr(self, f"AP{request_type.title()}Request") + ret_data = await method(request, account_jid, node, ap_account, ap_url) + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(ret_data).encode()) + request.finish() + + def render(self, request): + request.setHeader("server", VERSION) + return super().render(request) + + def render_GET(self, request): + path = request.path.decode().lstrip("/") + if path.startswith(".well-known/webfinger"): + defer.ensureDeferred(self.webfinger(request)) + return server.NOT_DONE_YET + elif path.startswith(self.apg.ap_path): + defer.ensureDeferred(self.APRequest(request)) + return server.NOT_DONE_YET + + return web_resource.NoResource().render(request) + + +class HTTPRequest(server.Request): + pass + + +class HTTPServer(server.Site): + requestFactory = HTTPRequest + + def __init__(self, ap_gateway): + super().__init__(HTTPAPGServer(ap_gateway)) diff -r b15644cae50d -r 86eea17cafa7 sat/plugins/plugin_comp_ap_gateway/pubsub_service.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Optional, List + +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from wokkel import rsm + +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools.utils import ensure_deferred + + +log = getLogger(__name__) + + +class APPubsubService(rsm.PubSubService): + """Pubsub service for XMPP requests""" + + def __init__(self, apg): + super(APPubsubService, self).__init__() + self.host = apg.host + self.apg = apg + self.discoIdentity = { + "category": "pubsub", + "type": "service", + "name": "Libervia ActivityPub Gateway", + } + + @ensure_deferred + async def publish(self, requestor, service, nodeIdentifier, items): + raise NotImplementedError + + @ensure_deferred + async def items( + self, + requestor: jid.JID, + service: jid.JID, + node: str, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + rsm_req: Optional[rsm.RSMRequest] + ) -> List[domish.Element]: + if not service.user: + return [] + ap_account = self.host.plugins["XEP-0106"].unescape(service.user) + if ap_account.count("@") != 1: + log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") + return [] + if node != self.apg._m.namespace: + raise error.StanzaError( + "feature-not-implemented", + text=f"{VERSION} only supports {self.apg._m.namespace} " + "node for now" + ) + if rsm_req is None: + if maxItems is None: + maxItems = 20 + kwargs = { + "max_items": maxItems, + "chronological_pagination": False, + } + else: + if len( + [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) + if v is not None] + ) > 1: + raise error.StanzaError( + "bad-request", + text="You can't use after, before and index at the same time" + ) + kwargs = {"max_items": rsm_req.max} + if rsm_req.after is not None: + kwargs["after_id"] = rsm_req.after + elif rsm_req.before is not None: + kwargs["chronological_pagination"] = False + if rsm_req.before != "": + kwargs["after_id"] = rsm_req.before + elif rsm_req.index is not None: + kwargs["start_index"] = rsm_req.index + + log.info( + f"No cache found for node {node} at {service} (AP account {ap_account}), " + "using Collection Paging to RSM translation" + ) + return await self.apg.getAPItems(ap_account, **kwargs) + + @ensure_deferred + async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + raise NotImplementedError + + def getNodeInfo( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str, + pep: bool = False, + recipient: Optional[jid.JID] = None + ) -> Optional[dict]: + if not nodeIdentifier: + return None + info = { + "type": "leaf", + "meta-data": [ + {"var": "pubsub#persist_items", "type": "boolean", "value": True}, + {"var": "pubsub#max_items", "value": "max"}, + {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, + {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, + + ] + + } + return info