view sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children a8c7e5cef0cb
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, List
import json
from urllib import parse
import re
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger

from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
                       AP_REQUEST_TYPES, PAGE_SIZE)


log = getLogger(__name__)

VERSION = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        self.apg = ap_gateway
        super().__init__()

    async def webfinger(self, request):
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            return web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            ).render(request)

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str
    ) -> dict:
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]
        return {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get RSM kwargs to use with RSMRequest from query data"""
        page = query_data.get("page")

        if page == ["first"]:
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            for query_key in ("index", "before", "after"):
                try:
                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError, ValueError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.queryData2RSMRequest(query_data),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems" : [
                await self.apg.mbdata2APitem(
                    self.apg.client,
                    await self.apg._m.item2mbdata(
                        self.apg.client,
                        item,
                        account_jid,
                        node
                    )
                )
                for item in reversed(items)
            ]
        }

        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        if not metadata["complete"]:
            try:
                last= metadata["rsm"]["last"]
            except KeyError:
                last = None
            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if metadata["rsm"]["index"] != 0:
            try:
                first= metadata["rsm"]["first"]
            except KeyError:
                first = None
            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str
    ) -> dict:
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0)
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APRequest(self, request):
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, ap_account = self.apg.parseAPURL(ap_url)
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        if request_type not in AP_REQUEST_TYPES:
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        method = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await method(request, account_jid, node, ap_account, ap_url)
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
        request.finish()

    def render(self, request):
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            defer.ensureDeferred(self.webfinger(request))
            return server.NOT_DONE_YET
        elif path.startswith(self.apg.ap_path):
            defer.ensureDeferred(self.APRequest(request))
            return server.NOT_DONE_YET

        return web_resource.NoResource().render(request)


class HTTPRequest(server.Request):
    pass


class HTTPServer(server.Site):
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))