Mercurial > libervia-backend
changeset 3682:7c990aaa49d3
comp AP Gateway: ActivityPub Component first draft:
This implements the base component; for the moment it is usable only through a developer
API.
rel: 362
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 26 Sep 2021 16:41:55 +0200 |
parents | 742e466fa000 |
children | a1eff4e32848 |
files | sat/plugins/plugin_comp_ap_gateway.py |
diffstat | 1 files changed, 415 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
import json
import base64
import hashlib
from urllib import parse
from typing import Tuple
from pathlib import Path
import shortuuid
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from twisted.internet import reactor, threads, defer
from twisted.web import server, resource as web_resource, http
from twisted.words.protocols.jabber import jid
import treq
from treq.response import _Response as TReqResponse
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools.common import tls, data_format
from sat.tools import utils


log = getLogger(__name__)

IMPORT_NAME = "ap-gateway"

PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_FALSE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"
CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
# WebFinger answers are JSON Resource Descriptors, which have their own media type
CONTENT_TYPE_WEBFINGER = "application/jrd+json; charset=utf-8"
TYPE_ACTOR = "actor"
TYPE_INBOX = "inbox"
TYPE_ITEM = "item"
MEDIA_TYPE_AP = "application/activity+json"


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance used to build/parse AP URLs and to get
            the public key
        """
        self.apg = ap_gateway
        super().__init__()

    def webfinger(self, request):
        """Answer a WebFinger query with a link to the matching AP actor

        @param request: HTTP request, the account is read from its ``resource``
            query argument which must use the ``acct:`` scheme
        @return: encoded JSON document, or a "Bad Request" page if the resource is
            missing or invalid
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # what follows the 5 chars of the "acct:" prefix is the account itself
        account = resource[5:].strip()
        log.info(f"request pour {account}")
        if not resource.startswith("acct:") or not account:
            return web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            ).render(request)

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_WEBFINGER)
        return json.dumps(resp).encode()

    def APRequest(self, request):
        """Serve the actor document corresponding to the requested AP path

        @param request: HTTP request, its path must have the
            ``<ap_path>/<type>/<account>`` form
        @return: encoded JSON actor document, or a "No Such Resource" page if no
            account can be extracted from the path
        """
        path = request.path.decode()
        actor_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        try:
            __, account = self.apg.parseAPURL(actor_url)
        except ValueError:
            # the path doesn't contain both an endpoint type and an account
            return web_resource.NoResource().render(request)
        if not account:
            return web_resource.NoResource().render(request)
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, account)
        username = account.split("@", 1)[0]
        actor = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": username,
            "inbox": inbox_url,

            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        return json.dumps(actor).encode()

    def render_GET(self, request):
        """Dispatch GET requests to the WebFinger or ActivityPub handlers

        Any path which is neither a WebFinger one nor below the gateway's AP path
        results in a "No Such Resource" page.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            return self.webfinger(request)
        elif path.startswith(self.apg.ap_path):
            return self.APRequest(request)
        return web_resource.NoResource().render(request)


class HTTPRequest(server.Request):
    pass


class HTTPServer(server.Site):
    """Site serving the ActivityPub gateway resource"""
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))


class APGateway:
    """ActivityPub gateway component

    It serves actor and WebFinger documents over HTTP(S), and sends signed
    activities to ActivityPub servers.
    """

    def __init__(self, host):
        self.host = host
        self.initialised = False

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    async def init(self, client):
        """Load or generate the RSA key, read configuration and start HTTP server

        Only the first call has an effect. If ``public_url`` is missing or
        includes a scheme, an error is logged and the HTTP server is not started.

        @param client: client whose profile is used to store the private key
        @raise exceptions.ConfigError: ``http_connection_type`` is neither "http"
            nor "https"
        """
        if self.initialised:
            return

        self.initialised = True
        log.info(_("ActivityPub Gateway initialization"))

        # RSA keys
        stored_data = await self.host.memory.storage.getPrivates(
            IMPORT_NAME, ["rsa_key"], profile=client.profile
        )
        private_key_pem = stored_data.get("rsa_key")
        if private_key_pem is None:
            # key generation is done in a thread to avoid blocking the reactor
            self.private_key = await threads.deferToThread(
                rsa.generate_private_key,
                public_exponent=65537,
                key_size=4096,
            )
            private_key_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()
            await self.host.memory.storage.setPrivateValue(
                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
            )
        self.public_key = self.private_key.public_key()
        self.public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

        # params (URL and port)
        self.public_url = self.host.memory.getConfig(
            CONF_SECTION, "public_url"
        ) or self.host.memory.getConfig(
            CONF_SECTION, "xmpp_domain"
        )
        if self.public_url is None:
            log.error(
                '"public_url" not set in configuration, this is mandatory to have '
                "ActivityPub Gateway running. Please set this option to public facing "
                f"url in {CONF_SECTION!r} configuration section."
            )
            return
        if parse.urlparse(self.public_url).scheme:
            log.error(
                "Scheme must not be specified in \"public_url\", please remove it from "
                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
            )
            return
        self.http_port = int(self.host.memory.getConfig(
            CONF_SECTION, 'http_port', 8123))
        connection_type = self.host.memory.getConfig(
            CONF_SECTION, 'http_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                'bad ap-gateway http_connection_type, you must use one of "http" or '
                '"https"'
            )
        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
        self.base_ap_url = parse.urljoin(
            f"https://{self.public_url}", f"{self.ap_path}/"
        )

        # HTTP server launch
        self.server = HTTPServer(self)
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(
                self.host.memory.config, CONF_SECTION)
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)

    async def profileConnecting(self, client):
        await self.init(client)

    def parseAPURL(self, url: str) -> Tuple[str, str]:
        """Parse an URL leading to an AP endpoint

        @param url: URL to parse (schema is not mandatory)
        @return: endpoint type and AP account
        @raise ValueError: the path doesn't contain both a type and an account
        """
        path = parse.urlparse(url).path.lstrip("/")
        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
        return type_, parse.unquote(account)

    def buildAPURL(self, type_: str, *args: str) -> str:
        """Build an AP endpoint URL

        @param type_: type of AP endpoint
        @param args: endpoint dependant arguments, each one is quoted and used as
            a path element
        @return: URL of the endpoint, relative to the gateway's base AP URL
        """
        return parse.urljoin(
            self.base_ap_url,
            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
        )

    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
        """Sign a document and post it to AP server

        The signature covers the request target, the host, the date and the
        SHA-256 digest of the body.

        @param url: AP server endpoint
        @param url_actor: URL generated by this gateway for local actor
        @param doc: document to send
        @return: response of the AP server
        """
        p_url = parse.urlparse(url)
        date = http.datetimeToString().decode()
        body = json.dumps(doc).encode()
        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
        digest = f"sha-256={digest_hash}"
        # order of the lines must match the "headers" field of the signature
        to_sign = (
            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
            f"date: {date}\ndigest: {digest}"
        )
        signature = base64.b64encode(self.private_key.sign(
            to_sign.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),
            hashes.SHA256()
        )).decode()
        h_signature = (
            f'keyId="{url_actor}",headers="(request-target) host date digest",'
            f'signature="{signature}"'
        )
        return await treq.post(
            url,
            body,
            headers={
                "Host": [p_url.hostname],
                "Date": [date],
                "Digest": [digest],
                "Signature": [h_signature],
            }
        )

    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
        """Bridge wrapper around [publishMessage], working on serialised arguments"""
        mess_data: dict = data_format.deserialise(mess_data_s)  # type: ignore
        service = jid.JID(service_s)
        client = self.host.getClient(profile)
        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))

    async def getAPActorData(self, account: str) -> dict:
        """Retrieve ActivityPub Actor data

        The actor URL is first discovered with a WebFinger request on the host of
        the account, then the actor document is requested from this URL.

        @param account: ActivityPub Actor identifier
        @return: actor document
        @raise ValueError: the account is invalid, or WebFinger data have no usable
            ActivityPub link
        @raise exceptions.DataError: WebFinger or actor data can't be retrieved
        """
        if account.count("@") != 1 or "/" in account:
            raise ValueError(f"Invalid account: {account!r}")
        host = account.split("@")[1]
        try:
            finger_data = await treq.json_content(await treq.get(
                f"https://{host}/.well-known/webfinger?"
                f"resource=acct:{parse.quote_plus(account)}",
            ))
        except Exception as e:
            raise exceptions.DataError(f"Can't get webfinger data: {e}") from e
        for link in finger_data.get("links", []):
            if (
                link.get("type") == "application/activity+json"
                and link.get("rel") == "self"
            ):
                href = link.get("href", "").strip()
                if not href:
                    raise ValueError(
                        f"Invalid webfinger data for {account!r}: missing href"
                    )
                break
        else:
            raise ValueError(
                f"No ActivityPub link found for {account!r}"
            )
        try:
            ap_actor_data = await treq.json_content(await treq.get(
                href,
                headers={
                    "Accept": [MEDIA_TYPE_AP],
                    "Content-Type": [MEDIA_TYPE_AP],
                }
            ))
        except Exception as e:
            raise exceptions.DataError(
                f"Can't get ActivityPub actor data: {e}"
            ) from e

        return ap_actor_data

    async def publishMessage(
        self,
        client: SatXMPPEntity,
        mess_data: dict,
        service: jid.JID
    ) -> None:
        """Send an AP message

        .. note::

            This is a temporary method used for development only

        @param mess_data: message data. Following keys must be set:

            ``node``
              identifier of message which is being replied (this will
              correspond to pubsub node in the future)

            ``content_xhtml`` or ``content``
              message body (respectively in XHTML or plain text)

            If ``id`` is missing or empty, a new one is generated and set.

        @param service: JID corresponding to the AP actor.
        @raise ValueError: ``service`` has no local part
        @raise exceptions.DataError: no inbox can be found in actor data
        @raise exceptions.NetworkError: the AP server didn't answer with a 202 code
        """
        if not service.user:
            raise ValueError("service must have a local part")
        account = self.host.plugins["XEP-0106"].unescape(service.user)
        ap_actor_data = await self.getAPActorData(account)

        try:
            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
        except KeyError:
            # the shared inbox is optional, we fall back to the actor's own inbox
            try:
                inbox_url = ap_actor_data["inbox"]
            except KeyError:
                raise exceptions.DataError("Can't get ActivityPub actor inbox")

        if not mess_data.get("id"):
            mess_data["id"] = shortuuid.uuid()
        url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
        url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
        now = time.time()
        item_data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url_item,
            "type": "Create",
            "actor": url_actor,

            "object": {
                "id": url_item,
                "type": "Note",
                "published": utils.xmpp_date(now),
                "attributedTo": url_actor,
                "inReplyTo": mess_data["node"],
                "content": mess_data.get("content_xhtml") or mess_data["content"],
                "to": "https://www.w3.org/ns/activitystreams#Public"
            }
        }
        resp = await self.signAndPost(inbox_url, url_actor, item_data)
        # 202 (Accepted) is the only code handled as a success
        if resp.code != 202:
            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")