view sat/plugins/plugin_comp_ap_gateway.py @ 3682:7c990aaa49d3

comp AP Gateway: ActivityPub Component first draft: this implement the base component, it is for the moment usable only through a developer API. rel: 362
author Goffi <goffi@goffi.org>
date Sun, 26 Sep 2021 16:41:55 +0200
parents
children 8353cc3b8db9
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import json
import base64
import hashlib
from urllib import parse
from typing import Tuple
from pathlib import Path
import shortuuid
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from twisted.internet import reactor, threads, defer
from twisted.web import server, resource as web_resource, http
from twisted.words.protocols.jabber import jid
import treq
from treq.response import _Response as TReqResponse
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools.common import tls, data_format
from sat.tools import utils


log = getLogger(__name__)

IMPORT_NAME = "ap-gateway"

PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_FALSE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"
CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
TYPE_ACTOR = "actor"
TYPE_INBOX = "inbox"
TYPE_ITEM = "item"
MEDIA_TYPE_AP = "application/activity+json"


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        self.apg = ap_gateway
        super().__init__()

    def webfinger(self, request):
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        account = resource[5:].strip()
        log.info(f"request pour {account}")
        if not resource.startswith("acct:") or not account:
            return web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            ).render(request)

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        return json.dumps(resp).encode()

    def APRequest(self, request):
        path = request.path.decode()
        actor_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        __, account = self.apg.parseAPURL(actor_url)
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, account)
        username = account.split("@", 1)[0]
        actor = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": username,
            "inbox": inbox_url,

            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        return json.dumps(actor).encode()

    def render_GET(self, request):
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            return self.webfinger(request)
        elif path.startswith(self.apg.ap_path):
            return self.APRequest(request)
        return web_resource.NoResource().render(request)


class HTTPRequest(server.Request):
    pass


class HTTPServer(server.Site):
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))


class APGateway:

    def __init__(self, host):
        self.host = host
        self.initialised = False

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    async def init(self, client):
        if self.initialised:
            return

        self.initialised = True
        log.info(_("ActivityPub Gateway initialization"))

        # RSA keys
        stored_data = await self.host.memory.storage.getPrivates(
            IMPORT_NAME, ["rsa_key"], profile=client.profile
        )
        private_key_pem = stored_data.get("rsa_key")
        if private_key_pem is None:
            self.private_key = await threads.deferToThread(
                rsa.generate_private_key,
                public_exponent=65537,
                key_size=4096,
            )
            private_key_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()
            await self.host.memory.storage.setPrivateValue(
                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
            )
        self.public_key = self.private_key.public_key()
        self.public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

        # params (URL and port)
        self.public_url = self.host.memory.getConfig(
            CONF_SECTION, "public_url"
        ) or self.host.memory.getConfig(
            CONF_SECTION, "xmpp_domain"
        )
        if self.public_url is None:
            log.error(
                '"public_url" not set in configuration, this is mandatory to have'
                "ActivityPub Gateway running. Please set this option it to public facing "
                f"url in {CONF_SECTION!r} configuration section."
            )
            return
        if parse.urlparse(self.public_url).scheme:
            log.error(
                "Scheme must not be specified in \"public_url\", please remove it from "
                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
            )
            return
        self.http_port = int(self.host.memory.getConfig(
            CONF_SECTION, 'http_port', 8123))
        connection_type = self.host.memory.getConfig(
            CONF_SECTION, 'http_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                'bad ap-gateay http_connection_type, you must use one of "http" or '
                '"https"'
            )
        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")

        # HTTP server launch
        self.server = HTTPServer(self)
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(
                self.host.memory.config, CONF_SECTION)
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)

    async def profileConnecting(self, client):
        await self.init(client)

    def parseAPURL(self, url: str) -> Tuple[str, str]:
        """Parse an URL leading to an AP endpoint

        @param url: URL to parse (schema is not mandatory)
        @return: endpoint type and AP account
        """
        path = parse.urlparse(url).path.lstrip("/")
        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
        return type_, parse.unquote(account)

    def buildAPURL(self, type_:str , *args: str) -> str:
        """Build an AP endpoint URL

        @param type_: type of AP endpoing
        @param arg: endpoint dependant arguments
        """
        return parse.urljoin(
            self.base_ap_url,
            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
        )

    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
        """Sign a documentent and post it to AP server

        @param url: AP server endpoint
        @param url_actor: URL generated by this gateway for local actor
        @param doc: document to send
        """
        p_url = parse.urlparse(url)
        date = http.datetimeToString().decode()
        body = json.dumps(doc).encode()
        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
        digest = f"sha-256={digest_hash}"
        to_sign = (
            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
            f"date: {date}\ndigest: {digest}"
        )
        signature = base64.b64encode(self.private_key.sign(
            to_sign.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),
            hashes.SHA256()
        )).decode()
        h_signature = (
            f'keyId="{url_actor}",headers="(request-target) host date digest",'
            f'signature="{signature}"'
        )
        return await treq.post(
            url,
            body,
            headers={
                "Host": [p_url.hostname],
                "Date": [date],
                "Digest": [digest],
                "Signature": [h_signature],
            }
        )

    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
        service = jid.JID(service_s)
        client = self.host.getClient(profile)
        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))

    async def getAPActorData(self, account: str) -> dict:
        """Retrieve ActivityPub Actor data

        @param account: ActivityPub Actor identifier
        """
        if account.count("@") != 1 or "/" in account:
            raise ValueError("Invalid account: {account!r}")
        host = account.split("@")[1]
        try:
            finger_data = await treq.json_content(await treq.get(
                f"https://{host}/.well-known/webfinger?"
                f"resource=acct:{parse.quote_plus(account)}",
            ))
        except Exception as e:
            raise exceptions.DataError(f"Can't get webfinger data: {e}")
        for link in finger_data.get("links", []):
            if (
                link.get("type") == "application/activity+json"
                and link.get("rel") == "self"
            ):
                href = link.get("href", "").strip()
                if not href:
                    raise ValueError(
                        f"Invalid webfinger data for {account:r}: missing href"
                    )
                break
        else:
            raise ValueError(
                f"No ActivityPub link found for {account!r}"
            )
        try:
            ap_actor_data = await treq.json_content(await treq.get(
                href,
                headers = {
                    "Accept": [MEDIA_TYPE_AP],
                    "Content-Type": [MEDIA_TYPE_AP],
                }
            ))
        except Exception as e:
            raise exceptions.DataError(f"Can't get ActivityPub actor data: {e}")

        return ap_actor_data

    async def publishMessage(
        self,
        client: SatXMPPEntity,
        mess_data: dict,
        service: jid.JID
    ) -> None:
        """Send an AP message

        .. note::

            This is a temporary method used for development only

        @param mess_data: message data. Following keys must be set:

            ``node``
              identifier of message which is being replied (this will
              correspond to pubsub node in the future)

            ``content_xhtml`` or ``content``
              message body (respectively in XHTML or plain text)

        @param service: JID corresponding to the AP actor.
        """
        if not service.user:
            raise ValueError("service must have a local part")
        account = self.host.plugins["XEP-0106"].unescape(service.user)
        ap_actor_data = await self.getAPActorData(account)

        try:
            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
        except KeyError:
            raise exceptions.DataError("Can't get ActivityPub actor inbox")

        if not mess_data.get("id"):
            mess_data["id"] = shortuuid.uuid()
        url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
        url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
        now = time.time()
        item_data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url_item,
            "type": "Create",
            "actor": url_actor,

            "object": {
                "id": url_item,
                "type": "Note",
                "published": utils.xmpp_date(now),
                "attributedTo": url_actor,
                "inReplyTo": mess_data["node"],
                "content": mess_data.get("content_xhtml") or mess_data["content"],
                "to": "https://www.w3.org/ns/activitystreams#Public"
            }
        }
        resp = await self.signAndPost(inbox_url, url_actor, item_data)
        if resp.code == 202:
            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")