view sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3753:10b71e3526bd

core (memory/sqla): add attribute to filter on `item_ids` in `getItems`
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 17:58:51 +0200
parents a8c7e5cef0cb
children 125c7043b277
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
from typing import Optional, Dict, List
import json
from urllib import parse
from collections import deque
import unicodedata
from pprint import pformat

from twisted.web import http, resource as web_resource, server
from twisted.internet import reactor, defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.tools.common import date_utils
from sat.memory.sqla_mapping import SubscriptionState

from .constants import (
    CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
    AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
    SIGN_HEADERS, HS2019, SIGN_EXP
)
from .regex import RE_SIG_PARAM


log = getLogger(__name__)

# Identification string of the gateway, sent in the "server" HTTP header by
# HTTPAPGServer.render. The string is NFKD-normalised before use.
VERSION = unicodedata.normalize(
    "NFKD", f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        """Initialise the resource

        @param ap_gateway: gateway instance, kept as ``self.apg`` and used by all the
            handlers of this resource
        """
        super().__init__()
        self.apg = ap_gateway
        # digests of the most recent "Create" requests (at most 50 are kept), used by
        # handleCreateActivity to ignore duplicated requests
        self._seen_digest = deque(maxlen=50)

    def responseCode(
        self,
        request: "HTTPRequest",
        http_code: int,
        msg: Optional[str] = None
    ) -> None:
        """Log and set HTTP return code and associated message

        @param request: request being answered
        @param http_code: HTTP status code to set on the response
        @param msg: human readable reason. It is logged as a warning and used
            (encoded) as status message of the response.
            When None, nothing is logged and None is given as message to
            ``request.setResponseCode``.
        """
        if msg is None:
            # there is nothing meaningful to log without a message
            request.setResponseCode(http_code, None)
            return
        log.warning(msg)
        request.setResponseCode(http_code, msg.encode())

    async def webfinger(self, request):
        """Answer a WebFinger query with a link to the ActivityPub actor

        The account is read from the ``resource`` query argument, which must have the
        ``acct:<account>`` form. The response is written to the request, and the
        request is finished in all cases.

        @param request: HTTP request received on the webfinger endpoint
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # 5 is the length of the "acct:" prefix
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            # This coroutine is launched by render_GET, which returns NOT_DONE_YET
            # without waiting for it: a returned value would be discarded. The
            # rendered error page must be written and the request finished here.
            error_body = web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            ).render(request)
            request.write(error_body)
            request.finish()
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def handleFollowActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Handle a "Follow" activity by subscribing the actor to a pubsub node

        Once the subscription is done, an "Accept" activity wrapping the received data
        is signed and posted to the inbox of the signing actor. Nothing is sent while
        the subscription is pending.

        @param request: HTTP request (not used here)
        @param data: payload of the "Follow" activity
        @param account_jid: pubsub service to subscribe to
        @param node: pubsub node to subscribe to, None to use the namespace of
            ``self.apg._m``
        @param ap_account: AP account being followed
        @param ap_url: URL of the request (not used here)
        @param signing_actor: actor ID of the entity which has signed the request
        @raise exceptions.InternalError: the subscription is in an unexpected state
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        try:
            subscription = await self.apg._p.subscribe(client, account_jid, node)
        except pubsub.SubscriptionPending:
            log.info(f"subscription to node {node!r} of {account_jid} is pending")
            return
        # TODO: manage SubscriptionUnconfigured

        if subscription.state != "subscribed":
            # other states should raise an Exception
            raise exceptions.InternalError('"subscribed" state was expected')

        # the subscription is effective, we can notify the follower
        actor_id = None
        inbox = await self.apg.getAPInboxFromId(signing_actor)
        actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
        accept_data = self.apg.createActivity("Accept", actor_id, object_=data)
        await self.apg.signAndPost(inbox, actor_id, accept_data)

    async def handleAcceptActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Handle an "Accept" activity

        Only accepted "Follow" objects are managed: the matching pending subscription
        found in storage is marked as subscribed. Other object types are logged and
        ignored.

        @param request: HTTP request (not used here)
        @param data: payload of the "Accept" activity
        @param account_jid: entity whose subscription is looked for in the node
        @param node: name of the followed node, None to use the namespace of
            ``self.apg._m``
        @param ap_account: AP account targeted by the request (not used here)
        @param ap_url: URL of the request (not used here)
        @param signing_actor: actor ID of the entity which has signed the request
        @raise exceptions.InternalError: the stored subscription is neither subscribed
            nor pending
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        storage = self.apg.host.memory.storage
        for obj in await self.apg.apGetList(data, "object"):
            type_ = obj.get("type")
            if type_ != "Follow":
                log.warning(f"Unmanaged accept type: {type_!r}")
                continue

            follow_node = await storage.getPubsubNode(
                client, client.jid, node, with_subscriptions=True
            )
            if follow_node is None:
                log.warning(
                    f"Received a follow accept on an unknown node: {node!r} at "
                    f"{client.jid}. Ignoring it"
                )
                continue

            # first subscription of the accepted entity, if any
            sub = next(
                (s for s in follow_node.subscriptions if s.subscriber == account_jid),
                None
            )
            if sub is None:
                log.warning(
                    "Received a follow accept on a node without subscription: "
                    f"{node!r} at {client.jid}. Ignoring it"
                )
                continue

            if sub.state == SubscriptionState.SUBSCRIBED:
                log.warning(f"Already subscribed to {node!r} at {client.jid}")
            elif sub.state == SubscriptionState.PENDING:
                follow_node.subscribed = True
                sub.state = SubscriptionState.SUBSCRIBED
                await storage.add(follow_node)
            else:
                raise exceptions.InternalError(
                    f"Unhandled subscription state {sub.state!r}"
                )

    async def handleCreateActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        digest = request.getHeader("digest")
        if digest in self._seen_digest:
            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
            return
        self._seen_digest.append(digest)
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        objects = await self.apg.apGetList(data, "object")
        for obj in objects:
            sender = await self.apg.apGetSenderActor(obj)
            if sender != signing_actor:
                log.warning(
                    "Ignoring object not attributed to signing actor: {obj}"
                )
            else:
                await self.apg.newAPItem(client, account_jid, node, obj)

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        """Build the actor document of an AP account

        @param request: HTTP request (not used here)
        @param account_jid: entity corresponding to the account (not used here)
        @param node: node corresponding to the account (not used here)
        @param ap_account: AP account to describe
        @param actor_url: URL of the actor, used as its ID and as owner of the key
        @param signing_actor: actor which has signed the request (not used here)
        @return: actor data, with inbox, outbox, shared inbox and public key
        """
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        endpoints = {"sharedInbox": self.apg.buildAPURL(TYPE_SHARED_INBOX)}
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # preferredUsername is the part of the AP account located before the first
        # "@": it is used by other software to retrieve actor handle (see
        # https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username, *__ = ap_account.split("@", 1)

        public_key = {
            "id": f"{actor_url}#main-key",
            "owner": actor_url,
            "publicKeyPem": self.apg.public_key_pem,
        }
        contexts = [
            "https://www.w3.org/ns/activitystreams",
            "https://w3id.org/security/v1",
        ]
        return {
            "@context": contexts,
            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": public_key,
            "endpoints": endpoints,
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Get the public URL of a request, without query nor trailing slash

        @param request: HTTP request, whose path is used
        @return: HTTPS URL made of the public URL of the gateway and of the path of the
            request
        """
        public_base = f"https://{self.apg.public_url}"
        # trailing slashes are removed to always get the same URL for a resource
        clean_path = request.path.decode().rstrip("/")
        return parse.urljoin(public_base, clean_path)

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Convert the query arguments of a page request to a RSM request

        ``page=first`` and ``page=last`` are handled first, then the first argument
        found among ``index``, ``before`` and ``after`` is used. At most PAGE_SIZE
        items are requested in all cases.

        @param query_data: parsed query arguments of the URL
        @return: RSM request to use to retrieve the page
        @raise ValueError: no usable argument has been found
        """
        page = query_data.get("page")
        if page == ["first"]:
            # "first" is mapped to an empty "before"; presumably because AP
            # collections are in reverse chronological order
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        if page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)

        for query_key in ("index", "before", "after"):
            values = query_data.get(query_key)
            if not values:
                # argument is missing or has no value, we try the next one
                continue
            return rsm.RSMRequest(**{query_key: values[0], "max_": PAGE_SIZE})

        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build a page of the outbox of an account

        @param request: HTTP request, used to build the URLs of the pages
        @param account_jid: pubsub service where the items are retrieved
        @param node: pubsub node where the items are retrieved
        @param ap_account: AP account whose outbox is requested (not used here)
        @param ap_url: URL of the request (not used here)
        @param query_data: parsed query arguments, selecting the page to return
        @return: "OrderedCollectionPage" data, or an empty dict if the items can't be
            retrieved from the pubsub node.
            "prev" and "next" links are only set when the identifier needed to build
            them is available in RSM metadata.
        @raise ValueError: query arguments can't be converted to a RSM request
        """
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.queryData2RSMRequest(query_data),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"

        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        ordered_items = []
        for item in reversed(items):
            mb_data = await self.apg._m.item2mbdata(
                self.apg.client,
                item,
                account_jid,
                node
            )
            ordered_items.append(
                await self.apg.mbdata2APitem(self.apg.client, mb_data)
            )

        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems" : ordered_items
        }

        # RSM metadata may be missing (e.g. if the service doesn't support RSM): in this
        # case no link can be built, but the page itself is still valid
        rsm_metadata = metadata.get("rsm") or {}

        # due to the reversed order, the previous AP page is the one located after the
        # last item in RSM order, and the next AP page is the one located before the
        # first item
        if not metadata["complete"]:
            last = rsm_metadata.get("last")
            if last is not None:
                data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if rsm_metadata.get("index") != 0:
            first = rsm_metadata.get("first")
            if first is not None:
                data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        """Build the outbox of an account, or one of its pages

        When the URL of the request has query arguments, the corresponding page is
        returned, otherwise the collection itself is described.

        @param request: HTTP request
        @param account_jid: pubsub service where the items are located
        @param node: pubsub node where the items are located, None to use the
            namespace of ``self.apg._m``
        @param ap_account: AP account whose outbox is requested
        @param ap_url: URL of the request
        @param signing_actor: actor which has signed the request (not used here)
        @return: "OrderedCollection" data (or page data, see above), or an empty dict
            if the pubsub node can't be requested
        """
        if node is None:
            node = self.apg._m.namespace

        query_data = parse.parse_qs(parse.urlparse(request.uri.decode()).query)
        if query_data:
            # a specific page is requested
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            items_count = 20
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )

        url = self.getCanonicalURL(request)
        first_query = parse.urlencode({'page': 'first'})
        last_query = parse.urlencode({'page': 'last'})
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": f"{url}?{first_query}",
            "last": f"{url}?{last_query}",
        }

    async def APInboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> None:
        """Handle an activity received on an inbox

        The payload is parsed and checked, then the ``handle<Type>Activity`` method
        matching the type of the activity is called. On invalid request, the response
        code is set and the handling is stopped: finishing the request is left to the
        caller in all cases.

        @param request: HTTP request, whose content is the JSON encoded activity
        @param account_jid: entity targeted by the request, None for shared inbox
        @param node: node targeted by the request, None to use the namespace of
            ``self.apg._m``
        @param ap_account: AP account targeted by the request, None for shared inbox
        @param ap_url: URL of the request
        @param signing_actor: actor ID of the entity which has signed the request
        @raise exceptions.InternalError: signing_actor is not set
        """
        if signing_actor is None:
            raise exceptions.InternalError("signing_actor must be set for inbox requests")
        if node is None:
            node = self.apg._m.namespace
        try:
            data = json.load(request.content)
            if not isinstance(data, dict):
                raise ValueError("data should be an object")
        except (json.JSONDecodeError, ValueError) as e:
            return self.responseCode(
                request,
                http.BAD_REQUEST,
                f"invalid json in inbox request: {e}"
            )
        # An exception raised here would not be caught by any caller, and the request
        # would be left without answer, thus failures are converted to HTTP errors
        try:
            await self.checkSigningActor(data, signing_actor)
        except exceptions.EncryptionError as e:
            return self.responseCode(
                request,
                http.FORBIDDEN,
                f"invalid signing actor: {e}"
            )
        except exceptions.NotFound as e:
            return self.responseCode(
                request,
                http.BAD_REQUEST,
                f"no actor found in inbox request: {e}"
            )
        activity_type = (data.get("type") or "").lower()
        if activity_type not in ACTIVITY_TYPES_LOWER:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                "request is not an activity, ignoring"
            )

        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()!r} activity must target an account"
            )

        # handlers are looked up by name, e.g. "follow" is managed by
        # handleFollowActivity
        try:
            method = getattr(self, f"handle{activity_type.title()}Activity")
        except AttributeError:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()} activity is not yet supported"
            )
        else:
            await method(
                request, data, account_jid, node, ap_account, ap_url, signing_actor
            )

    async def APRequest(
        self,
        request: "HTTPRequest",
        signing_actor: Optional[str] = None
    ) -> None:
        """Dispatch an ActivityPub request to the method handling its type

        The URL of the request is parsed to get the request type and the targeted
        account. A request without account is only accepted for the shared inbox.
        Data returned by the handler, if any, are written as JSON before the request is
        finished.

        @param request: HTTP request to handle
        @param signing_actor: actor ID of the entity which has signed the request, if
            the signature has been checked
        @raise exceptions.DataError: the request type is not valid for this request
        """
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}", request.path.decode()
        )
        request_type, extra_args = self.apg.parseAPURL(ap_url)
        if not extra_args:
            # no account is specified, only the shared inbox works this way
            if request_type != "shared_inbox":
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            ret_data = await self.APInboxRequest(
                request, None, None, None, ap_url, signing_actor
            )
        else:
            if len(extra_args) > 1:
                log.warning(f"unexpected extra arguments: {extra_args!r}")
            ap_account = extra_args[0]
            account_jid, node = await self.apg.getJIDAndNode(ap_account)
            http_method = request.method.decode().upper()
            if request_type not in AP_REQUEST_TYPES.get(http_method, []):
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            # e.g. "outbox" request type is handled by APOutboxRequest
            handler = getattr(self, f"AP{request_type.title()}Request")
            ret_data = await handler(
                request, account_jid, node, ap_account, ap_url, signing_actor
            )

        if ret_data is not None:
            request.setHeader("content-type", CONTENT_TYPE_AP)
            request.write(json.dumps(ret_data).encode())
        request.finish()

    async def APPostRequest(self, request: "HTTPRequest"):
        """Check the signature of a POST request, then handle the request

        If the signature is not valid, the request is finished with a "forbidden"
        response code.

        @param request: HTTP POST request
        """
        try:
            signing_actor = await self.checkSignature(request)
        except exceptions.EncryptionError as e:
            self.responseCode(request, http.FORBIDDEN, f"invalid signature: {e}")
            request.finish()
            return None
        else:
            # the signature is valid, the request can be handled normally
            return await self.APRequest(request, signing_actor)

    async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
        """Check that signing actor corresponds to the actor declared in data

        @param data: request payload
        @param signing_actor: actor ID of the signing entity, as returned by
            checkSignature
        @raise exceptions.NotFound: no actor found in data
        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
        """
        actor = await self.apg.apGetSenderActor(data)
        if actor == signing_actor:
            return

        raise exceptions.EncryptionError(
            f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
        )

    async def checkSignature(self, request: "HTTPRequest") -> str:
        """Check and validate HTTP signature

        The ``Signature`` header is parsed, then the following checks are done:
            - all mandatory headers (according to SIGN_HEADERS) are signed
            - each signed header or pseudo-header has a value
            - the digest of the body matches the ``digest`` header, if it is signed
            - the signature is neither expired, nor created too far in the future
        The cryptographic verification itself is delegated to the gateway.

        @param request: HTTP request to check
        @return: id of the signing actor

        @raise exceptions.EncryptionError: signature is not present or doesn't match
        @raise exceptions.InternalError: the list of headers to check for the method of
            the request is missing or empty
        """
        signature = request.getHeader("Signature")
        if signature is None:
            raise exceptions.EncryptionError("No signature found")
        sign_data = {
            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
            for m in RE_SIG_PARAM.finditer(signature)
        }
        try:
            key_id = sign_data["keyId"]
        except KeyError:
            raise exceptions.EncryptionError('"keyId" is missing from signature')
        try:
            signature_value = sign_data["signature"]
        except KeyError:
            raise exceptions.EncryptionError('"signature" is missing from signature')
        algorithm = sign_data.get("algorithm", HS2019)
        signed_headers = sign_data.get(
            "headers",
            "(created)" if algorithm==HS2019 else "date"
        ).lower().split()
        try:
            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
        except KeyError:
            raise exceptions.InternalError(
                f"there should be a list of headers for {request.method} method"
            )
        if not headers_to_check:
            raise exceptions.InternalError("headers_to_check must not be empty")

        for header in headers_to_check:
            if isinstance(header, tuple):
                # a tuple is a group of alternatives: at least one must be signed
                if len(set(header).intersection(signed_headers)) == 0:
                    raise exceptions.EncryptionError(
                        f"at least one of following header must be signed: {header}"
                    )
            elif header not in signed_headers:
                raise exceptions.EncryptionError(
                    f"the {header!r} header must be signed"
                )

        body = request.content.read()
        # the content will be read again when the request is handled
        request.content.seek(0)
        # signed (pseudo-)headers and their values, keyed by name as found in the
        # signature (pseudo-headers keep their parenthesis)
        headers = {}
        for to_sign in signed_headers:
            if to_sign == "(request-target)":
                method = request.method.decode().lower()
                uri = parse.unquote(request.uri.decode())
                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
            elif to_sign in ("(created)", "(expires)"):
                if algorithm != HS2019:
                    raise exceptions.EncryptionError(
                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
                        "algorithm"
                    )
                # the value of those pseudo-headers is a parameter of the signature
                key = to_sign[1:-1]
                value = sign_data.get(key)
                if not value:
                    raise exceptions.EncryptionError(
                        f"{key!r} parameter is missing from signature"
                    )
                try:
                    if float(value) < 0:
                        raise ValueError
                except ValueError:
                    raise exceptions.EncryptionError(
                        f"{to_sign} must be a Unix timestamp"
                    )
                headers[to_sign] = value
            else:
                value = request.getHeader(to_sign)
                if not value:
                    raise exceptions.EncryptionError(
                        f"value of header {to_sign!r} is missing!"
                    )
                elif to_sign == "host":
                    # we check Forwarded/X-Forwarded-Host headers
                    # as we need original host if a proxy has modified the header
                    forwarded = request.getHeader("forwarded")
                    if forwarded is not None:
                        try:
                            host = [
                                f[5:] for f in forwarded.split(";")
                                if f.startswith("host=")
                            ][0] or None
                        except IndexError:
                            host = None
                    else:
                        host = None
                    if host is None:
                        host = request.getHeader("x-forwarded-host")
                    if host:
                        value = host
                elif to_sign == "digest":
                    # the header may hold several "<algorithm>=<hash>" digests,
                    # separated by commas
                    hashes = {}
                    for digest in value.split(","):
                        algo, sep, hash_ = digest.strip().partition("=")
                        if not sep:
                            raise exceptions.EncryptionError(
                                f"invalid digest header: {value!r}"
                            )
                        hashes[algo.lower()] = hash_
                    try:
                        given_digest = hashes["sha-256"]
                    except KeyError:
                        raise exceptions.EncryptionError(
                            "Only SHA-256 algorithm is currently supported for digest"
                        )
                    __, computed_digest = self.apg.getDigest(body)
                    if given_digest != computed_digest:
                        raise exceptions.EncryptionError(
                            f"SHA-256 given and computed digest differ:\n"
                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
                        )
                headers[to_sign] = value

        # date check
        if "(created)" in headers:
            created = float(headers["(created)"])
        elif "date" in headers:
            created = date_utils.date_parse(headers["date"])
        else:
            raise exceptions.EncryptionError(
                'either "date" header or "(created)" pseudo-header must be signed'
            )

        # the signature is valid during SIGN_EXP seconds after its creation, or until
        # "(expires)" if it happens sooner
        limit_ts = created + SIGN_EXP
        if "(expires)" in headers:
            expires = float(headers["(expires)"])
            if expires < created:
                log.warning(
                    f"(expires) [{expires}] set in the past of (created) [{created}] "
                    "ignoring it according to specs"
                )
            else:
                limit_ts = min(limit_ts, expires)

        now = time.time()
        # a signature created too far in the future is rejected too
        if created > now + SIGN_EXP or now > limit_ts:
            raise exceptions.EncryptionError("Signature has expired")

        return await self.apg.checkSignature(
            signature_value,
            key_id,
            headers
        )

    def render(self, request):
        """Render the request after having set the "server" header

        @param request: HTTP request to render
        @return: result of the rendering done by the parent class
        """
        request.setHeader("server", VERSION)
        rendered = super().render(request)
        return rendered

    def render_GET(self, request):
        """Handle GET requests for webfinger and ActivityPub paths

        @param request: HTTP GET request
        @return: NOT_DONE_YET when the request is handled asynchronously, the rendering
            of a "no resource" page for any other path
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            handler = self.webfinger
        elif path.startswith(self.apg.ap_path):
            handler = self.APRequest
        else:
            return web_resource.NoResource().render(request)

        # the coroutine writes the response and finishes the request by itself
        defer.ensureDeferred(handler(request))
        return server.NOT_DONE_YET

    def render_POST(self, request):
        """Handle POST requests done on ActivityPub paths

        @param request: HTTP POST request
        @return: NOT_DONE_YET when the request is handled asynchronously, the rendering
            of a "no resource" page if the path is not an ActivityPub one
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(self.apg.ap_path):
            # the coroutine writes the response and finishes the request by itself
            defer.ensureDeferred(self.APPostRequest(request))
            return server.NOT_DONE_YET
        return web_resource.NoResource().render(request)


class HTTPRequest(server.Request):
    """Request class used by HTTPServer, adding nothing to ``server.Request``"""


class HTTPServer(server.Site):
    """Site serving the ActivityPub gateway resource"""
    # class used to instantiate the requests
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        """Create the site with a HTTPAPGServer as root resource

        @param ap_gateway: gateway instance, given to the root resource
        """
        root_resource = HTTPAPGServer(ap_gateway)
        super().__init__(root_resource)