changeset 3682:7c990aaa49d3

comp AP Gateway: ActivityPub Component first draft: this implement the base component, it is for the moment usable only through a developer API. rel: 362
author Goffi <goffi@goffi.org>
date Sun, 26 Sep 2021 16:41:55 +0200
parents 742e466fa000
children a1eff4e32848
files sat/plugins/plugin_comp_ap_gateway.py
diffstat 1 files changed, 415 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway.py	Sun Sep 26 16:41:55 2021 +0200
@@ -0,0 +1,415 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import time
+import json
+import base64
+import hashlib
+from urllib import parse
+from typing import Tuple
+from pathlib import Path
+import shortuuid
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import padding
+from twisted.internet import reactor, threads, defer
+from twisted.web import server, resource as web_resource, http
+from twisted.words.protocols.jabber import jid
+import treq
+from treq.response import _Response as TReqResponse
+from sat.core.i18n import _
+from sat.core.constants import Const as C
+from sat.core import exceptions
+from sat.core.log import getLogger
+from sat.core.core_types import SatXMPPEntity
+from sat.tools.common import tls, data_format
+from sat.tools import utils
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "ap-gateway"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "ActivityPub Gateway component",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
+    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "APGateway",
+    C.PI_HANDLER: C.BOOL_FALSE,
+    C.PI_DESCRIPTION: _(
+        "Gateway for bidirectional communication between XMPP and ActivityPub."
+    ),
+}
+
+CONF_SECTION = f"component {IMPORT_NAME}"
+CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
+TYPE_ACTOR = "actor"
+TYPE_INBOX = "inbox"
+TYPE_ITEM = "item"
+MEDIA_TYPE_AP = "application/activity+json"
+
+
+class HTTPAPGServer(web_resource.Resource):
+    """HTTP Server handling ActivityPub S2S protocol"""
+    isLeaf = True
+
+    def __init__(self, ap_gateway):
+        self.apg = ap_gateway
+        super().__init__()
+
+    def webfinger(self, request):
+        url_parsed = parse.urlparse(request.uri.decode())
+        query = parse.parse_qs(url_parsed.query)
+        resource = query.get("resource", [""])[0]
+        account = resource[5:].strip()
+        log.info(f"request pour {account}")
+        if not resource.startswith("acct:") or not account:
+            return web_resource.ErrorPage(
+                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
+            ).render(request)
+
+        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)
+
+        resp = {
+            "subject": resource,
+            "links": [
+                {
+                    "rel": "self",
+                    "type": "application/activity+json",
+                    "href": actor_url
+                }
+            ]
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        return json.dumps(resp).encode()
+
+    def APRequest(self, request):
+        path = request.path.decode()
+        actor_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        __, account = self.apg.parseAPURL(actor_url)
+        inbox_url = self.apg.buildAPURL(TYPE_INBOX, account)
+        username = account.split("@", 1)[0]
+        actor = {
+            "@context": [
+                "https://www.w3.org/ns/activitystreams",
+                "https://w3id.org/security/v1"
+            ],
+
+            "id": actor_url,
+            "type": "Person",
+            "preferredUsername": username,
+            "inbox": inbox_url,
+
+            "publicKey": {
+                "id": f"{actor_url}#main-key",
+                "owner": actor_url,
+                "publicKeyPem": self.apg.public_key_pem
+            }
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        return json.dumps(actor).encode()
+
+    def render_GET(self, request):
+        path = request.path.decode().lstrip("/")
+        if path.startswith(".well-known/webfinger"):
+            return self.webfinger(request)
+        elif path.startswith(self.apg.ap_path):
+            return self.APRequest(request)
+        return web_resource.NoResource().render(request)
+
+
+class HTTPRequest(server.Request):
+    pass
+
+
+class HTTPServer(server.Site):
+    requestFactory = HTTPRequest
+
+    def __init__(self, ap_gateway):
+        super().__init__(HTTPAPGServer(ap_gateway))
+
+
+class APGateway:
+
+    def __init__(self, host):
+        self.host = host
+        self.initialised = False
+
+        host.bridge.addMethod(
+            "APSend",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._publishMessage,
+            async_=True,
+        )
+
+    async def init(self, client):
+        if self.initialised:
+            return
+
+        self.initialised = True
+        log.info(_("ActivityPub Gateway initialization"))
+
+        # RSA keys
+        stored_data = await self.host.memory.storage.getPrivates(
+            IMPORT_NAME, ["rsa_key"], profile=client.profile
+        )
+        private_key_pem = stored_data.get("rsa_key")
+        if private_key_pem is None:
+            self.private_key = await threads.deferToThread(
+                rsa.generate_private_key,
+                public_exponent=65537,
+                key_size=4096,
+            )
+            private_key_pem = self.private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption()
+            ).decode()
+            await self.host.memory.storage.setPrivateValue(
+                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
+            )
+        else:
+            self.private_key = serialization.load_pem_private_key(
+                private_key_pem.encode(),
+                password=None,
+            )
+        self.public_key = self.private_key.public_key()
+        self.public_key_pem = self.public_key.public_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PublicFormat.SubjectPublicKeyInfo
+        ).decode()
+
+        # params (URL and port)
+        self.public_url = self.host.memory.getConfig(
+            CONF_SECTION, "public_url"
+        ) or self.host.memory.getConfig(
+            CONF_SECTION, "xmpp_domain"
+        )
+        if self.public_url is None:
+            log.error(
+                '"public_url" not set in configuration, this is mandatory to have'
+                "ActivityPub Gateway running. Please set this option it to public facing "
+                f"url in {CONF_SECTION!r} configuration section."
+            )
+            return
+        if parse.urlparse(self.public_url).scheme:
+            log.error(
+                "Scheme must not be specified in \"public_url\", please remove it from "
+                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
+            )
+            return
+        self.http_port = int(self.host.memory.getConfig(
+            CONF_SECTION, 'http_port', 8123))
+        connection_type = self.host.memory.getConfig(
+            CONF_SECTION, 'http_connection_type', 'https')
+        if connection_type not in ('http', 'https'):
+            raise exceptions.ConfigError(
+                'bad ap-gateay http_connection_type, you must use one of "http" or '
+                '"https"'
+            )
+        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
+        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
+
+        # HTTP server launch
+        self.server = HTTPServer(self)
+        if connection_type == 'http':
+            reactor.listenTCP(self.http_port, self.server)
+        else:
+            options = tls.getOptionsFromConfig(
+                self.host.memory.config, CONF_SECTION)
+            tls.TLSOptionsCheck(options)
+            context_factory = tls.getTLSContextFactory(options)
+            reactor.listenSSL(self.http_port, self.server, context_factory)
+
+    async def profileConnecting(self, client):
+        await self.init(client)
+
+    def parseAPURL(self, url: str) -> Tuple[str, str]:
+        """Parse an URL leading to an AP endpoint
+
+        @param url: URL to parse (schema is not mandatory)
+        @return: endpoint type and AP account
+        """
+        path = parse.urlparse(url).path.lstrip("/")
+        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
+        return type_, parse.unquote(account)
+
+    def buildAPURL(self, type_:str , *args: str) -> str:
+        """Build an AP endpoint URL
+
+        @param type_: type of AP endpoing
+        @param arg: endpoint dependant arguments
+        """
+        return parse.urljoin(
+            self.base_ap_url,
+            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
+        )
+
+    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
+        """Sign a documentent and post it to AP server
+
+        @param url: AP server endpoint
+        @param url_actor: URL generated by this gateway for local actor
+        @param doc: document to send
+        """
+        p_url = parse.urlparse(url)
+        date = http.datetimeToString().decode()
+        body = json.dumps(doc).encode()
+        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
+        digest = f"sha-256={digest_hash}"
+        to_sign = (
+            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
+            f"date: {date}\ndigest: {digest}"
+        )
+        signature = base64.b64encode(self.private_key.sign(
+            to_sign.encode(),
+            # we have to use PKCS1v15 padding to be compatible with Mastodon
+            padding.PKCS1v15(),
+            hashes.SHA256()
+        )).decode()
+        h_signature = (
+            f'keyId="{url_actor}",headers="(request-target) host date digest",'
+            f'signature="{signature}"'
+        )
+        return await treq.post(
+            url,
+            body,
+            headers={
+                "Host": [p_url.hostname],
+                "Date": [date],
+                "Digest": [digest],
+                "Signature": [h_signature],
+            }
+        )
+
+    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
+        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
+        service = jid.JID(service_s)
+        client = self.host.getClient(profile)
+        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
+
+    async def getAPActorData(self, account: str) -> dict:
+        """Retrieve ActivityPub Actor data
+
+        @param account: ActivityPub Actor identifier
+        """
+        if account.count("@") != 1 or "/" in account:
+            raise ValueError("Invalid account: {account!r}")
+        host = account.split("@")[1]
+        try:
+            finger_data = await treq.json_content(await treq.get(
+                f"https://{host}/.well-known/webfinger?"
+                f"resource=acct:{parse.quote_plus(account)}",
+            ))
+        except Exception as e:
+            raise exceptions.DataError(f"Can't get webfinger data: {e}")
+        for link in finger_data.get("links", []):
+            if (
+                link.get("type") == "application/activity+json"
+                and link.get("rel") == "self"
+            ):
+                href = link.get("href", "").strip()
+                if not href:
+                    raise ValueError(
+                        f"Invalid webfinger data for {account:r}: missing href"
+                    )
+                break
+        else:
+            raise ValueError(
+                f"No ActivityPub link found for {account!r}"
+            )
+        try:
+            ap_actor_data = await treq.json_content(await treq.get(
+                href,
+                headers = {
+                    "Accept": [MEDIA_TYPE_AP],
+                    "Content-Type": [MEDIA_TYPE_AP],
+                }
+            ))
+        except Exception as e:
+            raise exceptions.DataError(f"Can't get ActivityPub actor data: {e}")
+
+        return ap_actor_data
+
+    async def publishMessage(
+        self,
+        client: SatXMPPEntity,
+        mess_data: dict,
+        service: jid.JID
+    ) -> None:
+        """Send an AP message
+
+        .. note::
+
+            This is a temporary method used for development only
+
+        @param mess_data: message data. Following keys must be set:
+
+            ``node``
+              identifier of message which is being replied (this will
+              correspond to pubsub node in the future)
+
+            ``content_xhtml`` or ``content``
+              message body (respectively in XHTML or plain text)
+
+        @param service: JID corresponding to the AP actor.
+        """
+        if not service.user:
+            raise ValueError("service must have a local part")
+        account = self.host.plugins["XEP-0106"].unescape(service.user)
+        ap_actor_data = await self.getAPActorData(account)
+
+        try:
+            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
+        except KeyError:
+            raise exceptions.DataError("Can't get ActivityPub actor inbox")
+
+        if not mess_data.get("id"):
+            mess_data["id"] = shortuuid.uuid()
+        url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
+        url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
+        now = time.time()
+        item_data = {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url_item,
+            "type": "Create",
+            "actor": url_actor,
+
+            "object": {
+                "id": url_item,
+                "type": "Note",
+                "published": utils.xmpp_date(now),
+                "attributedTo": url_actor,
+                "inReplyTo": mess_data["node"],
+                "content": mess_data.get("content_xhtml") or mess_data["content"],
+                "to": "https://www.w3.org/ns/activitystreams#Public"
+            }
+        }
+        resp = await self.signAndPost(inbox_url, url_actor, item_data)
+        if resp.code == 202:
+            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")