view libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4309:b56b1eae7994

component email gateway: add multicasting: XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033 metadata are converted to suitable Email headers and vice versa. Email address and JID are both supported, and delivery is done by the gateway when suitable on incoming messages. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import html
from typing import Optional, Dict, List, Any
import json
from urllib import parse
from collections import deque
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.web import static
from twisted.web import util as web_util
from twisted.python import failure
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import date_utils, uri
from libervia.backend.memory.sqla_mapping import SubscriptionState

from .constants import (
    NS_AP,
    MEDIA_TYPE_AP,
    MEDIA_TYPE_AP_ALT,
    CONTENT_TYPE_WEBFINGER,
    CONTENT_TYPE_AP,
    TYPE_ACTOR,
    TYPE_INBOX,
    TYPE_SHARED_INBOX,
    TYPE_OUTBOX,
    TYPE_EVENT,
    AP_REQUEST_TYPES,
    PAGE_SIZE,
    ACTIVITY_TYPES_LOWER,
    ACTIVIY_NO_ACCOUNT_ALLOWED,
    SIGN_HEADERS,
    HS2019,
    SIGN_EXP,
    TYPE_FOLLOWERS,
    TYPE_FOLLOWING,
    TYPE_ITEM,
    TYPE_LIKE,
    TYPE_REACTION,
    ST_AP_CACHE,
)
from .regex import RE_SIG_PARAM


# module-level logger, named after this module
log = getLogger(__name__)

# Version string built from the application name and version; the text is passed
# through Unicode NFKD (compatibility decomposition) normalisation.
VERSION = unicodedata.normalize(
    "NFKD", f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""

    isLeaf = True

    def __init__(self, ap_gateway):
        """Bind this HTTP resource to its ActivityPub gateway

        @param ap_gateway: gateway instance, kept as ``self.apg``
        """
        super().__init__()
        self.apg = ap_gateway
        # bounded queue: only the 50 most recently added entries are kept
        self._seen_digest = deque(maxlen=50)

    def response_code(
        self, request: "HTTPRequest", http_code: int, msg: Optional[str] = None
    ) -> None:
        """Set the HTTP status of a response, logging the message if there is one

        @param request: request whose response code must be set
        @param http_code: HTTP status code to return
        @param msg: optional human readable message; when given it is logged as a
            warning and sent (encoded to bytes) as the status message
        """
        if msg is None:
            request.setResponseCode(http_code, None)
            return
        log.warning(msg)
        request.setResponseCode(http_code, msg.encode())

    def _on_request_error(
        self, failure_: failure.Failure, request: "HTTPRequest"
    ) -> None:
        """Convert a failed request into an HTTP error response

        A ``NotFound`` error results in a 404 response. Any other error is logged,
        results in a 500 response, and the failure is raised again once the request
        has been finished.

        @param failure_: failure wrapping the error which happened
        @param request: request to answer and finish
        """
        error_value = failure_.value
        is_not_found = isinstance(error_value, exceptions.NotFound)
        if is_not_found:
            self.response_code(request, http.NOT_FOUND, str(error_value))
        else:
            log.exception(f"Internal error: {error_value}")
            self.response_code(
                request, http.INTERNAL_SERVER_ERROR, f"internal error: {error_value}"
            )

        # the request is finished in every case, unexpected errors are then propagated
        request.finish()
        if not is_not_found:
            raise failure_

    async def webfinger(self, request):
        """Answer a WebFinger query for an ``acct:`` resource

        The ``resource`` query argument must start with ``acct:`` and be followed by
        a non empty account, otherwise a "Bad Request" error page is rendered.
        On success, a JSON document linking to the actor URL is written and the
        request is finished.

        @param request: HTTP request holding the WebFinger query
        """
        scheme = "acct:"
        query = parse.parse_qs(parse.urlparse(request.uri.decode()).query)
        resource = query.get("resource", [""])[0]
        account = resource[len(scheme):].strip()
        if not (resource.startswith(scheme) and account):
            error_page = web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            )
            return error_page.render(request)

        actor_url = self.apg.build_apurl(TYPE_ACTOR, account)
        self_link = {
            "rel": "self",
            "type": "application/activity+json",
            "href": actor_url,
        }
        payload = {
            "aliases": [actor_url],
            "subject": resource,
            "links": [self_link],
        }
        request.setHeader("content-type", CONTENT_TYPE_WEBFINGER)
        request.write(json.dumps(payload).encode())
        request.finish()

    async def handle_undo_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle an "Undo" activity

        The undone objects are taken from the AP cache when the "object" field is an
        ID which has been cached, otherwise they are retrieved from the activity.
        Objects whose sender is not the signing actor are ignored.

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Undo" activity data
        @param account_jid: JID of the targeted account
        @param node: targeted pubsub node, None to use microblog namespace
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)

        undone_ref = data.get("object")
        cached = None
        if isinstance(undone_ref, str):
            # we check first if it's not a cached object
            ap_cache_key = f"{ST_AP_CACHE}{undone_ref}"
            cached = await self.apg.client._ap_storage.get(ap_cache_key)

        if cached is None:
            objects = await self.apg.ap_get_list(requestor_actor_id, data, "object")
        else:
            objects = [cached]
            # because we'll undo the activity, we can remove it from cache
            await self.apg.client._ap_storage.remove(ap_cache_key)

        for undone in objects:
            undone_type = undone.get("type")
            actor = await self.apg.ap_get_sender_actor(requestor_actor_id, undone)
            if actor != signing_actor:
                log.warning(f"ignoring object not attributed to signing actor: {data}")
                continue

            if undone_type == "Follow":
                # undoing a follow is an unsubscription, only valid on local actors
                try:
                    target_account = undone["object"]
                except KeyError:
                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
                    continue
                if not self.apg.is_local_url(target_account):
                    log.warning(f"ignoring unfollow request to non local actor: {data}")
                    continue
                await self.apg._p.unsubscribe(
                    client, account_jid, node, sender=client.jid
                )
            elif undone_type == "Announce":
                # we can use directly the Announce object, as only the "id" field is
                # needed
                await self.apg.new_ap_delete_item(client, None, node, undone)
            elif undone_type == TYPE_LIKE:
                await self.handle_attachment_item(client, undone, {"noticed": False})
            elif undone_type == TYPE_REACTION:
                reactions_update = {
                    "operation": "update",
                    "remove": [undone["content"]],
                }
                await self.handle_attachment_item(
                    client, undone, {"reactions": reactions_update}
                )
            else:
                log.warning(f"Unmanaged undo type: {undone_type!r}")

    async def handle_follow_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle a "Follow" activity by subscribing to the targeted pubsub node

        When the subscription is immediately accepted, an "Accept" activity is
        signed and posted to the inbox of the signing actor. A pending subscription
        is only logged. In both cases the node is then synchronised.

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Follow" activity data, used as object of the "Accept" activity
        @param account_jid: JID of the followed account
        @param node: followed pubsub node, None to use microblog namespace
        @param ap_account: followed AP account, used to build the accepting actor ID
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        @raise exceptions.InternalError: the subscription is in an unexpected state
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        # subscriptions from AP are always public
        public_options = self.apg._pps.set_public_opt()
        try:
            subscription = await self.apg._p.subscribe(
                client, account_jid, node, options=public_options
            )
        except pubsub.SubscriptionPending:
            log.info(f"subscription to node {node!r} of {account_jid} is pending")
        # TODO: manage SubscriptionUnconfigured
        else:
            if subscription.state != "subscribed":
                # other states should raise an Exception
                raise exceptions.InternalError('"subscribed" state was expected')
            # the follow request is accepted, we notify the signing actor
            follower_inbox = await self.apg.get_ap_inbox_from_id(
                signing_actor, use_shared=False
            )
            followed_actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account)
            accept_activity = self.apg.create_activity(
                "Accept", followed_actor_id, object_=data
            )
            await self.apg.sign_and_post(
                follower_inbox, followed_actor_id, accept_activity
            )
        await self.apg._c.synchronise(client, account_jid, node, resync=False)

    async def handle_accept_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle an "Accept" activity

        Only accepted "Follow" objects are managed: the matching pending
        subscription stored for the node is marked as subscribed.

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Accept" activity data
        @param account_jid: JID of the subscriber whose subscription is accepted
        @param node: pubsub node of the subscription, None to use microblog namespace
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        @raise exceptions.InternalError: the stored subscription is neither
            subscribed nor pending
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        accepted_objects = await self.apg.ap_get_list(
            requestor_actor_id, data, "object"
        )
        for accepted in accepted_objects:
            accepted_type = accepted.get("type")
            if accepted_type != "Follow":
                log.warning(f"Unmanaged accept type: {accepted_type!r}")
                continue

            follow_node = await self.apg.host.memory.storage.get_pubsub_node(
                client, client.jid, node, with_subscriptions=True
            )
            if follow_node is None:
                log.warning(
                    f"Received a follow accept on an unknown node: {node!r} at "
                    f"{client.jid}. Ignoring it"
                )
                continue

            # first subscription of the accepted subscriber, if any
            sub = next(
                (s for s in follow_node.subscriptions if s.subscriber == account_jid),
                None,
            )
            if sub is None:
                log.warning(
                    "Received a follow accept on a node without subscription: "
                    f"{node!r} at {client.jid}. Ignoring it"
                )
            elif sub.state == SubscriptionState.SUBSCRIBED:
                log.warning(f"Already subscribed to {node!r} at {client.jid}")
            elif sub.state == SubscriptionState.PENDING:
                follow_node.subscribed = True
                sub.state = SubscriptionState.SUBSCRIBED
                await self.apg.host.memory.storage.add(follow_node)
            else:
                raise exceptions.InternalError(
                    f"Unhandled subscription state {sub.state!r}"
                )

    async def handle_delete_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ):
        """Handle a "Delete" activity: every object of the activity is deleted

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Delete" activity data
        @param account_jid: JID of the targeted account, if any
        @param node: targeted pubsub node, None to use microblog namespace
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        target_node = self.apg._m.namespace if node is None else node
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        to_delete = await self.apg.ap_get_list(requestor_actor_id, data, "object")
        for deleted_obj in to_delete:
            await self.apg.new_ap_delete_item(
                client, account_jid, target_node, deleted_obj
            )

    async def handle_new_ap_items(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        signing_actor: str,
        repeated: bool = False,
    ):
        """Helper method to handle workflow for new AP items

        accept globally the same parameter as for handle_create_activity

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: activity data, its objects are the new items
        @param account_jid: JID of the targeted account, if any
        @param node: targeted pubsub node. If None, it is set from the first handled
            object: events namespace for an event, microblog namespace otherwise
        @param signing_actor: actor who signed the request
        @param repeated: if True, the item is an item republished from somewhere else
        @raise exceptions.DataError: the activity already has a "_repeated" field, or
            a repeated local item has an invalid URL or doesn't match its sender
        """
        if "_repeated" in data:
            log.error(
                '"_repeated" field already present in given AP item, this should not '
                f"happen. Ignoring object from {signing_actor}\n{data}"
            )
            raise exceptions.DataError("unexpected field in item")
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        objects = await self.apg.ap_get_list(requestor_actor_id, data, "object")
        for obj in objects:
            if node is None:
                if obj.get("type") == TYPE_EVENT:
                    node = self.apg._events.namespace
                else:
                    node = self.apg._m.namespace
            sender = await self.apg.ap_get_sender_actor(requestor_actor_id, obj)
            if repeated:
                # we don't check sender when item is repeated, as it should be different
                # from post author in this case
                sender_jid = await self.apg.get_jid_from_id(requestor_actor_id, sender)
                repeater_jid = await self.apg.get_jid_from_id(
                    requestor_actor_id, signing_actor
                )
                repeated_item_id = obj["id"]
                if self.apg.is_local_url(repeated_item_id):
                    # the repeated object is from XMPP, we need to parse the URL to find
                    # the right ID
                    url_type, url_args = self.apg.parse_apurl(repeated_item_id)
                    if url_type != "item":
                        raise exceptions.DataError(
                            f"local URI is not an item: {repeated_item_id}"
                        )
                    try:
                        url_account, url_item_id = url_args
                        if not url_account or not url_item_id:
                            raise ValueError
                    except (RuntimeError, ValueError):
                        raise exceptions.DataError(
                            f"local URI is invalid: {repeated_item_id}"
                        )
                    else:
                        url_jid, url_node = await self.apg.get_jid_and_node(url_account)
                        if (
                            url_jid != sender_jid
                            or url_node
                            and url_node != self.apg._m.namespace
                        ):
                            raise exceptions.DataError(
                                f"announced ID doesn't match sender ({sender}): "
                                f"{repeated_item_id}"
                            )

                    repeated_item_id = url_item_id

                obj["_repeated"] = {
                    "by": repeater_jid.full(),
                    "at": data.get("published"),
                    "uri": uri.build_xmpp_uri(
                        "pubsub",
                        path=sender_jid.full(),
                        node=self.apg._m.namespace,
                        item=repeated_item_id,
                    ),
                }
                # we must use activity's id and targets, not the original item ones
                for field in ("id", "to", "bto", "cc", "bcc"):
                    obj[field] = data.get(field)
            else:
                if sender != signing_actor:
                    log.warning(
                        f"Ignoring object not attributed to signing actor: {obj}"
                    )
                    continue

            await self.apg.new_ap_item(client, account_jid, node, obj)

    async def handle_create_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ):
        """Handle a "Create" activity: its objects are handled as new AP items

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request
        @param data: "Create" activity data
        @param account_jid: JID of the targeted account, if any
        @param node: targeted pubsub node, if any
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        await self.handle_new_ap_items(
            requestor_actor_id=requestor_actor_id,
            request=request,
            data=data,
            account_jid=account_jid,
            node=node,
            signing_actor=signing_actor,
        )

    async def handle_update_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ):
        """Handle an "Update" activity

        Update is the same as create: the item ID stays the same, thus the item will
        be overwritten.

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request
        @param data: "Update" activity data
        @param account_jid: JID of the targeted account, if any
        @param node: targeted pubsub node, if any
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        await self.handle_new_ap_items(
            requestor_actor_id=requestor_actor_id,
            request=request,
            data=data,
            account_jid=account_jid,
            node=node,
            signing_actor=signing_actor,
        )

    async def handle_announce_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ):
        """Handle an "Announce" activity: a new item flagged as repeated is created

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request
        @param data: "Announce" activity data
        @param account_jid: JID of the targeted account, if any
        @param node: targeted pubsub node, if any
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        await self.handle_new_ap_items(
            requestor_actor_id=requestor_actor_id,
            request=request,
            data=data,
            account_jid=account_jid,
            node=node,
            signing_actor=signing_actor,
            repeated=True,
        )

    async def handle_attachment_item(
        self, client: SatXMPPEntity, data: dict, attachment_data: dict
    ) -> None:
        """Apply attachment data to the local items targeted by an AP activity

        The "object" field of the activity may be an ID, an object with an "id" key,
        or a list of objects with an "id" key. The activity is cached, then for each
        target which is a local item URL, the attachment item is updated in the
        pubsub cache and either notified to subscribers (virtual pubsub service) or
        published to the attachment node (XMPP item).

        @param client: client of the entity doing the attachment
        @param data: AP activity data, must have "object" and "id" fields
        @param attachment_data: attachment data to apply (used as "extra")
        @raise exceptions.DataError: "object" field is missing or invalid
        @raise ValueError: a local item URL doesn't have the expected arguments
        """
        target = data.get("object")
        if not target:
            raise exceptions.DataError("object should be set")

        if isinstance(target, str):
            target_ids = [target]
        elif isinstance(target, dict):
            obj_id = target.get("id")
            if not obj_id or not isinstance(obj_id, str):
                raise exceptions.DataError(f"invalid object: {target!r}")
            target_ids = [obj_id]
        elif isinstance(target, list):
            try:
                target_ids = [o["id"] for o in target]
            except (KeyError, TypeError):
                raise exceptions.DataError(f"invalid object: {target!r}")
        else:
            # other types are used as they are
            target_ids = target

        # XXX: we have to cache AP items because some implementation (Pleroma notably)
        #   don't keep object accessible, and we need to be able to retrieve them for
        #   UNDO. Current implementation will grow, we need to add a way to flush it after
        #   a while.
        # TODO: add a way to flush old cached AP items.
        await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)

        for target_id in target_ids:
            if not self.apg.is_local_url(target_id):
                log.debug(f"ignoring non local target ID: {target_id}")
                continue
            url_type, url_args = self.apg.parse_apurl(target_id)
            if url_type != TYPE_ITEM:
                log.warning(f"unexpected local URL for attachment on item {target_id}")
                continue
            try:
                account, item_id = url_args
            except ValueError:
                raise ValueError(f"invalid URL: {target_id}")

            author_jid, item_node = await self.apg.get_jid_and_node(account)
            if item_node is None:
                item_node = self.apg._m.namespace
            attachment_node = self.apg._pa.get_attachment_node_name(
                author_jid, item_node, item_id
            )
            cached_node = await self.apg.host.memory.storage.get_pubsub_node(
                client, author_jid, attachment_node, with_subscriptions=True, create=True
            )
            # the attachment item uses the bare JID of the client as ID
            found_items, __ = await self.apg.host.memory.storage.get_items(
                cached_node, item_ids=[client.jid.userhost()]
            )
            old_item_elt = found_items[0].data if found_items else None

            item_elt = await self.apg._pa.apply_set_handler(
                client, {"extra": attachment_data}, old_item_elt, None
            )
            # we reparse the element, as there can be other attachments
            attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt])
            # and we update the cache
            await self.apg.host.memory.storage.cache_pubsub_items(
                client, cached_node, [item_elt], attachments_data or [{}]
            )

            if not self.apg.is_virtual_jid(author_jid):
                # the attachment is on an XMPP item, we publish it to the attachment node
                await self.apg._p.send_items(
                    client, author_jid, attachment_node, [item_elt]
                )
                continue

            # the attachment is on a virtual pubsub service (linking to an AP item),
            # we notify all subscribers
            for subscription in cached_node.subscriptions:
                if subscription.state == SubscriptionState.SUBSCRIBED:
                    self.apg.pubsub_service.notifyPublish(
                        author_jid,
                        attachment_node,
                        [(subscription.subscriber, None, [item_elt])],
                    )

    async def handle_like_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle a "Like" activity: a "noticed" attachment is set on the target

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Like" activity data
        @param account_jid: JID of the targeted account (unused here)
        @param node: targeted pubsub node (unused here)
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        noticed_attachment = {"noticed": True}
        await self.handle_attachment_item(client, data, noticed_attachment)

    async def handle_emojireact_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle an emoji reaction: the "content" of the activity is added as reaction

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: reaction activity data, its "content" field is the reaction
        @param account_jid: JID of the targeted account (unused here)
        @param node: targeted pubsub node (unused here)
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        reactions_update = {"operation": "update", "add": [data["content"]]}
        await self.handle_attachment_item(
            client, data, {"reactions": reactions_update}
        )

    async def handle_join_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle a "Join" activity: a RSVP attachment is set with attending "yes"

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Join" activity data
        @param account_jid: JID of the targeted account (unused here)
        @param node: targeted pubsub node (unused here)
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        rsvp_attachment = {"rsvp": {"attending": "yes"}}
        await self.handle_attachment_item(client, data, rsvp_attachment)

    async def handle_leave_activity(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str,
    ) -> None:
        """Handle a "Leave" activity: a RSVP attachment is set with attending "no"

        @param requestor_actor_id: ID of the actor doing the request
        @param request: HTTP request (unused here)
        @param data: "Leave" activity data
        @param account_jid: JID of the targeted account (unused here)
        @param node: targeted pubsub node (unused here)
        @param ap_account: targeted AP account (unused here)
        @param ap_url: requested URL (unused here)
        @param signing_actor: actor who signed the request
        """
        client = await self.apg.get_virtual_client(requestor_actor_id, signing_actor)
        rsvp_attachment = {"rsvp": {"attending": "no"}}
        await self.handle_attachment_item(client, data, rsvp_attachment)

    async def ap_actor_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str],
    ) -> dict:
        """Build the AP actor document of an account

        @param request: HTTP request (unused here)
        @param data: request data (unused here)
        @param account_jid: JID of the account, used to retrieve its identity
        @param node: pubsub node of the actor, if any
        @param ap_account: AP account of the actor
        @param ap_url: URL of the actor
        @param signing_actor: actor who signed the request (unused here)
        @return: actor data, with "name", "summary" and "icon" set when the identity
            has nicknames, description and complete avatar data respectively
        """
        inbox = self.apg.build_apurl(TYPE_INBOX, ap_account)
        shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX)
        outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account)
        followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account)
        following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]

        identity_data = await self.apg._i.get_identity(self.apg.client, account_jid)

        events_namespace = self.apg._events.namespace
        if node and node.startswith(events_namespace):
            # the actor is already an events node, its outbox holds the events
            events = outbox
        else:
            events_account = await self.apg.get_ap_account_from_jid_and_node(
                account_jid, events_namespace
            )
            events = self.apg.build_apurl(TYPE_OUTBOX, events_account)

        public_key = {
            "id": f"{ap_url}#main-key",
            "owner": ap_url,
            "publicKeyPem": self.apg.public_key_pem,
        }
        endpoints = {
            "sharedInbox": shared_inbox,
            "events": events,
        }
        actor_data = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1",
            ],
            # XXX: Mastodon doesn't like percent-encoded "@", so we have to unescape
            #   it if it is escaped
            "id": ap_url.replace("%40", "@"),
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox,
            "outbox": outbox,
            "events": events,
            "followers": followers,
            "following": following,
            "publicKey": public_key,
            "endpoints": endpoints,
        }

        nicknames = identity_data.get("nicknames")
        if nicknames:
            actor_data["name"] = nicknames[0]

        description = identity_data.get("description")
        if description:
            # description is plain text while summary expects HTML
            actor_data["summary"] = html.escape(description)

        avatar_data = identity_data.get("avatar")
        if avatar_data:
            try:
                filename = avatar_data["filename"]
                media_type = avatar_data["media_type"]
            except KeyError:
                log.error(f"incomplete avatar data: {identity_data!r}")
            else:
                actor_data["icon"] = {
                    "type": "Image",
                    "url": self.apg.build_apurl("avatar", filename),
                    "mediaType": media_type,
                }

        return actor_data

    def get_canonical_url(self, request: "HTTPRequest") -> str:
        """Get the public HTTPS URL matching the path of a request

        Trailing slashes of the path are removed, and percent-encoded "@" are
        unescaped (Mastodon doesn't like them percent-encoded).

        @param request: HTTP request whose path is used
        @return: canonical URL, based on the public URL of the gateway
        """
        public_base = f"https://{self.apg.public_url}"
        path = request.path.decode().rstrip("/")
        canonical = parse.urljoin(public_base, path)
        return canonical.replace("%40", "@")

    def query_data_2_rsm_request(
        self, query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Build the RSM request matching the query data of a paginated URL

        @param query_data: parsed query string. ``page`` set to "first" or "last" is
            checked first, then "index", "before" and "after" keys are tried in
            this order
        @return: RSM request, limited to PAGE_SIZE items
        @raise ValueError: no usable key has been found in query data
        """
        page = query_data.get("page")
        if page == ["first"]:
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        if page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)

        for query_key in ("index", "before", "after"):
            try:
                value = query_data[query_key][0]
            except (KeyError, IndexError, ValueError):
                continue
            return rsm.RSMRequest(**{query_key: value, "max_": PAGE_SIZE})

        raise ValueError(f"Invalid query data: {query_data!r}")

    async def ap_outbox_page_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]],
    ) -> dict:
        """Build one page ("OrderedCollectionPage") of the outbox of an actor

        @param request: HTTP request, used to build the canonical URL
        @param data: request data (unused here)
        @param account_jid: JID of the pubsub service to request
        @param node: pubsub node to request, None to use microblog namespace
        @param ap_account: AP account of the actor (unused here)
        @param ap_url: requested URL (unused here)
        @param query_data: parsed query string, only "page", "index", "before" and
            "after" keys are used
        @return: page data, or an empty dict if the pubsub items can't be retrieved.
            "prev" and "next" links are only set when the RSM metadata provide the
            item ID needed to build them
        """
        if node is None:
            node = self.apg._m.namespace
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.get_items(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.query_data_2_rsm_request(query_data),
                extra={C.KEY_USE_CACHE: False},
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.get_canonical_url(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        if node and node.startswith(self.apg._events.namespace):
            ordered_items = [
                await self.apg.ap_events.event_data_2_ap_item(
                    self.apg._events.event_elt_2_event_data(item), account_jid
                )
                for item in reversed(items)
            ]
        else:
            ordered_items = [
                await self.apg.mb_data_2_ap_item(
                    self.apg.client,
                    await self.apg._m.item_2_mb_data(
                        self.apg.client, item, account_jid, node
                    ),
                )
                for item in reversed(items)
            ]
        ret_data = {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems": ordered_items,
        }

        if "rsm" not in metadata:
            # no RSM available, we return what we have
            return ret_data

        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        if not metadata["complete"]:
            try:
                last = metadata["rsm"]["last"]
            except KeyError:
                last = None
            # without the ID of the last item, the link would be built with the
            # literal "None" as item ID, so we don't set it
            if last is not None:
                ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if metadata["rsm"]["index"] != 0:
            try:
                first = metadata["rsm"]["first"]
            except KeyError:
                first = None
            # same as for "prev": no link if the ID of the first item is unknown
            if first is not None:
                ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return ret_data

    async def ap_outbox_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str],
    ) -> dict:
        """Build the outbox ("OrderedCollection") of an actor

        If the requested URL has a query string, the request is handled as a page
        request instead.

        @param request: HTTP request
        @param data: request data
        @param account_jid: JID of the pubsub service to request
        @param node: pubsub node to request, None to use microblog namespace
        @param ap_account: AP account of the actor
        @param ap_url: requested URL
        @param signing_actor: actor who signed the request (unused here)
        @return: collection data, or an empty dict if the pubsub node can't be
            requested
        """
        if node is None:
            node = self.apg._m.namespace

        query_data = parse.parse_qs(parse.urlparse(request.uri.decode()).query)
        if query_data:
            return await self.ap_outbox_page_request(
                request, data, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.get_items(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0),
                extra={C.KEY_USE_CACHE: False},
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        collection_url = self.get_canonical_url(request)
        first_page_query = parse.urlencode({"page": "first"})
        last_page_query = parse.urlencode({"page": "last"})
        return {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "id": collection_url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": f"{collection_url}?{first_page_query}",
            "last": f"{collection_url}?{last_page_query}",
        }

    async def ap_inbox_request(
        self,
        requestor_actor_id: str,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str],
    ) -> None:
        """Validate an activity received on an inbox and dispatch it to its handler.

        The activity is handled by the ``handle_<type>_activity`` method matching the
        lower-cased ``type`` of the payload.

        @param requestor_actor_id: ID of the local actor used to do the requests needed
            to handle the activity.
        @param request: HTTP request being answered.
        @param data: parsed JSON payload of the activity, must be set.
        @param account_jid: JID of the targeted account, None if no account is targeted.
        @param node: forwarded as is to the activity handler.
        @param ap_account: forwarded as is to the activity handler.
        @param ap_url: forwarded as is to the activity handler.
        @param signing_actor: ID of the actor which signed the request, must be set.
        @raise exceptions.InternalError: ``data`` or ``signing_actor`` is not set.
        """
        if data is None:
            # explicit check instead of ``assert``, which is stripped with ``-O``
            raise exceptions.InternalError("data must be set for inbox requests")
        if signing_actor is None:
            raise exceptions.InternalError("signing_actor must be set for inbox requests")
        await self.check_signing_actor(requestor_actor_id, data, signing_actor)

        # "type" comes from a remote payload and may be anything (e.g. a list), we only
        # handle plain strings here.
        raw_type = data.get("type")
        activity_type = raw_type.lower() if isinstance(raw_type, str) else ""
        if activity_type not in ACTIVITY_TYPES_LOWER:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                "request is not an activity, ignoring",
            )

        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()!r} activity must target an account",
            )

        try:
            method = getattr(self, f"handle_{activity_type}_activity")
        except AttributeError:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()} activity is not yet supported",
            )

        await method(
            requestor_actor_id,
            request,
            data,
            account_jid,
            node,
            ap_account,
            ap_url,
            signing_actor,
        )

    async def ap_followers_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str],
    ) -> dict:
        """Build the followers collection from public subscriptions to a node.

        @param request: HTTP request being answered, used to build the collection ID.
        @param data: not used by this method.
        @param account_jid: pubsub service whose node subscribers are listed.
        @param node: pubsub node, ``self.apg._m.namespace`` is used if None.
        @param ap_account: not used by this method.
        @param ap_url: not used by this method.
        @param signing_actor: not used by this method.
        @return: an ``OrderedCollection`` with a single page holding one AP account
            per subscriber.
        """
        apg = self.apg
        target_node = apg._m.namespace if node is None else node
        subscribers = await apg._pps.get_public_node_subscriptions(
            apg.client, account_jid, target_node
        )
        # Subscribers with a virtual JID are AP users subscribed through this gateway,
        # the other ones are regular XMPP users.
        followers = [
            apg._e.unescape(subscriber.user)
            if apg.is_virtual_jid(subscriber)
            else await apg.get_ap_account_from_jid_and_node(subscriber, target_node)
            for subscriber in subscribers
        ]

        collection_id = self.get_canonical_url(request)
        first_page = {
            "type": "OrderedCollectionPage",
            "id": collection_id,
            "orderedItems": followers,
        }
        return {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "type": "OrderedCollection",
            "id": collection_id,
            "totalItems": len(subscribers),
            "first": first_page,
        }

    async def ap_following_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str],
    ) -> dict[str, Any]:
        """Build the following collection from the subscriptions of an account.

        @param request: HTTP request being answered, used to build the collection ID.
        @param data: not used by this method.
        @param account_jid: entity whose subscriptions are listed.
        @param node: forwarded as is to the subscriptions request.
        @param ap_account: not used by this method.
        @param ap_url: not used by this method.
        @param signing_actor: not used by this method.
        @return: an ``OrderedCollection`` with a single page holding one AP account
            per subscription.
        """
        apg = self.apg

        async def to_ap_account(subscription: dict) -> str:
            """Convert a subscription to the AP account of the followed entity"""
            service_jid = jid.JID(subscription["service"])
            if not apg.is_virtual_jid(service_jid):
                # regular XMPP user
                return await apg.get_ap_account_from_jid_and_node(
                    service_jid, subscription["node"]
                )
            # the subscription is to an AP actor with this gateway
            return apg._e.unescape(service_jid.user)

        subscriptions = await apg._pps.subscriptions(apg.client, account_jid, node)
        following = [await to_ap_account(subscription) for subscription in subscriptions]

        collection_id = self.get_canonical_url(request)
        first_page = {
            "type": "OrderedCollectionPage",
            "id": collection_id,
            "orderedItems": following,
        }
        return {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "type": "OrderedCollection",
            "id": collection_id,
            "totalItems": len(subscriptions),
            "first": first_page,
        }

    def _get_to_log(
        self,
        request: "HTTPRequest",
        data: Optional[dict] = None,
    ) -> List[str]:
        """Get base data to logs in verbose mode"""
        from pprint import pformat

        to_log = [
            "",
            f"<<< got {request.method.decode()} request - {request.uri.decode()}",
        ]
        if data is not None:
            to_log.append(pformat(data))
        if self.apg.verbose >= 3:
            headers = "\n".join(
                f"    {k.decode()}: {v.decode()}"
                for k, v in request.getAllHeaders().items()
            )
            to_log.append(f"  headers:\n{headers}")
        return to_log

    def get_requestor_actor_id(
        self, data: dict | None = None, uri_extra_args: list[str] | None = None
    ) -> str:
        """Find the actor ID of the requestor.

        The requestor here is actually the local actor which will do the requests to
        achieve the task (e.g. retrieve external actor data), not the requestor of the
        received AP request.

        It will notably be used as requestor actor ID to sign HTTP requests. We need to
        sign GET request too to access instance checking HTTP GET signature (e.g. Mastodon
        instances set in "secure mode").

        We look for the destinee of the request and check if it's a local actor, and
        default to a generic one if we can't find it.

        Destinee is first checked in data if any, otherwise in request URI.

        @param data: parsed JSON data of original AP request, if any.
        @param uri_extra_args: arguments of the AP request as returned by
            [self.apg.parse_apurl]. It is most of time the destinee of the request.
        @return: requestor_actor_id to use to sign HTTP request needed to answer the
            original request.
        """
        local_suffix = f"@{self.apg.public_url}"

        # We first check for destinee in data.
        if data:
            # "to" comes from a remote payload: it may be missing, be a single value
            # instead of a list, or hold something which is not an URL at all.
            recipients = data.get("to")
            if isinstance(recipients, str):
                recipients = [recipients]
            elif not isinstance(recipients, (list, tuple)):
                recipients = []
            for to_ in recipients:
                if not isinstance(to_, str) or not self.apg.is_local_url(to_):
                    continue
                url_type, url_args = self.apg.parse_apurl(to_)
                if url_type != TYPE_ACTOR or not url_args:
                    continue
                ap_account = url_args[0]
                if ap_account.endswith(local_suffix) and ap_account.count("@") == 1:
                    return to_

        # If nothing relevant, we try URI arguments.
        if uri_extra_args:
            ap_account = uri_extra_args[0]
            if ap_account.endswith(local_suffix) and ap_account.count("@") == 1:
                return self.apg.build_apurl(TYPE_ACTOR, ap_account)

        # Still nothing, we'll have to use a generic actor.
        log.warning(
            'Can\'t find destinee in "to" field, using generic requestor for signature.'
        )
        return self.apg.build_apurl(TYPE_ACTOR, f"libervia@{self.apg.public_url}")

    async def ap_request(
        self,
        request: "HTTPRequest",
        data: dict | None = None,
        signing_actor: str | None = None,
        requestor_actor_id: str | None = None,
    ) -> None:
        """Handle a request done on an AP URL, and write the response.

        If the ``Accept`` header doesn't contain an AP media type and an HTML
        redirection is set for the request type, the client is redirected. Otherwise
        the request is dispatched according to its type, the returned data (if any) is
        written as JSON, and the request is finished.

        @param request: HTTP request being answered.
        @param data: parsed JSON payload of the request, if any.
        @param signing_actor: ID of the actor which signed the request, if any.
        @param requestor_actor_id: ID of the local actor to use to sign requests. If
            None, it is found with [get_requestor_actor_id].
        @raise exceptions.DataError: the request type is not valid for this URL or
            HTTP method, or an avatar URL doesn't have exactly one argument.
        """
        import inspect

        # verbosity is read once, so ``to_log`` is always consistent with the checks
        # done at the end of the method
        verbose = self.apg.verbose
        to_log = self._get_to_log(request, data) if verbose else []

        path = request.path.decode()
        ap_url = parse.urljoin(f"https://{self.apg.public_url}", path)
        request_type, extra_args = self.apg.parse_apurl(ap_url)

        header_accept = request.getHeader("accept") or ""
        if (
            MEDIA_TYPE_AP not in header_accept
            and MEDIA_TYPE_AP_ALT not in header_accept
            and request_type in self.apg.html_redirect
        ):
            # this is not a AP request, and we have a redirections for it
            kw = {}
            if extra_args:
                kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0])
                kw["jid_user"] = kw["jid"].user
                if kw["node"] is None:
                    kw["node"] = self.apg._m.namespace
                kw["item"] = extra_args[1] if len(extra_args) > 1 else ""
            else:
                kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", ""

            redirections = self.apg.html_redirect[request_type]
            for redirection in redirections:
                filters = redirection["filters"]
                if not filters:
                    break
                # if we have filter, they must all match
                elif all(v in kw[k] for k, v in filters.items()):
                    break
            else:
                # no redirection is matching
                redirection = None

            if redirection is not None:
                kw = {k: parse.quote(str(v), safe="") for k, v in kw.items()}
                target_url = redirection["url"].format(**kw)
                content = web_util.redirectTo(target_url.encode(), request)
                request.write(content)
                request.finish()
                return

        if requestor_actor_id is None:
            requestor_actor_id = self.get_requestor_actor_id(data, extra_args)
        if not extra_args:
            if request_type != "shared_inbox":
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            ret_data = await self.ap_inbox_request(
                requestor_actor_id, request, data, None, None, None, ap_url, signing_actor
            )
        elif request_type == "avatar":
            if len(extra_args) != 1:
                raise exceptions.DataError("avatar argument expected in URL")
            avatar_filename = extra_args[0]
            avatar_path = self.apg.host.common_cache.get_path(avatar_filename)
            # the file resource writes the response itself
            return static.File(str(avatar_path)).render(request)
        elif request_type == "item":
            ret_data = await self.apg.ap_get_local_object(ap_url)
            if "@context" not in ret_data:
                ret_data["@context"] = [NS_AP]
        else:
            if len(extra_args) > 1:
                log.warning(f"unexpected extra arguments: {extra_args!r}")
            ap_account = extra_args[0]
            account_jid, node = await self.apg.get_jid_and_node(ap_account)
            if request_type not in AP_REQUEST_TYPES.get(
                request.method.decode().upper(), []
            ):
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            method = getattr(self, f"ap_{request_type}_request")
            handler_args = [
                request,
                data,
                account_jid,
                node,
                ap_account,
                ap_url,
                signing_actor,
            ]
            # Handlers don't all share the same signature: some of them (e.g. the
            # outbox, followers and following ones) don't declare
            # ``requestor_actor_id``, and passing it unconditionally raises a
            # TypeError. We only give it to handlers which declare it.
            if "requestor_actor_id" in inspect.signature(method).parameters:
                handler_args.insert(0, requestor_actor_id)
            ret_data = await method(*handler_args)

        if ret_data is not None:
            request.setHeader("content-type", CONTENT_TYPE_AP)
            request.write(json.dumps(ret_data).encode())
        if verbose:
            to_log.append(f"--- RET (code: {request.code})---")
            if verbose >= 2 and ret_data is not None:
                from pprint import pformat

                to_log.append(f"{pformat(ret_data)}")
                to_log.append("---")
            log.info("\n".join(to_log))
        request.finish()

    async def ap_post_request(self, request: "HTTPRequest") -> None:
        """Handle a POST request done on an AP URL.

        The body must be a JSON object and the request must have a valid HTTP
        signature. Actor deletions are acknowledged without further processing, and
        requests whose ``Digest`` header has already been seen are ignored. Other
        requests are handled by [ap_request].

        @param request: HTTP request being answered.
        """
        try:
            data = json.load(request.content)
        except ValueError as e:
            # json.JSONDecodeError is a subclass of ValueError
            self.response_code(
                request, http.BAD_REQUEST, f"invalid json in inbox request: {e}"
            )
            request.finish()
            return

        if not isinstance(data, dict):
            log.warning(f"JSON data should be an object (uri={request.uri.decode()})")
            self.response_code(
                request,
                http.BAD_REQUEST,
                "invalid body, was expecting a JSON object",
            )
            request.finish()
            return

        # the body will be read again to check the signature
        request.content.seek(0)

        requestor_actor_id = self.get_requestor_actor_id(data)

        try:
            if data["type"] == "Delete" and data["actor"] == data["object"]:
                # we don't handle actor deletion
                request.setResponseCode(http.ACCEPTED)
                log.debug(f"ignoring actor deletion ({data['actor']})")
                # TODO: clean data in cache coming from this actor, maybe with a tombstone
                request.finish()
                return
        except KeyError:
            pass

        try:
            signing_actor = await self.check_signature(requestor_actor_id, request)
        except exceptions.EncryptionError as e:
            if self.apg.verbose:
                to_log = self._get_to_log(request)
                to_log.append(f"  body: {request.content.read()!r}")
                request.content.seek(0)
                log.info("\n".join(to_log))
            self.response_code(request, http.FORBIDDEN, f"invalid signature: {e}")
            request.finish()
            return
        except Exception as e:
            self.response_code(
                request, http.INTERNAL_SERVER_ERROR, f"Can't check signature: {e}"
            )
            request.finish()
            return

        # default response code, may be changed, e.g. in case of exception
        request.setResponseCode(http.ACCEPTED)

        digest = request.getHeader("digest")
        # Without a digest there is nothing to compare: storing None would make every
        # later request lacking the header look like a duplicate.
        if digest is not None:
            if digest in self._seen_digest:
                log.debug(f"Ignoring duplicated request (digest: {digest!r})")
                request.finish()
                return
            self._seen_digest.append(digest)

        try:
            return await self.ap_request(
                request, data, signing_actor, requestor_actor_id=requestor_actor_id
            )
        except Exception as e:
            self._on_request_error(failure.Failure(e), request)

    async def check_signing_actor(
        self, requestor_actor_id: str, data: dict, signing_actor: str
    ) -> None:
        """Check that the signing actor corresponds to the actor declared in data

        @param requestor_actor_id: ID of the actor doing the request.
        @param data: request payload
        @param signing_actor: actor ID of the signing entity, as returned by
            check_signature
        @raise exceptions.NotFound: no actor found in data
        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
        """
        declared_actor = await self.apg.ap_get_sender_actor(requestor_actor_id, data)
        if signing_actor == declared_actor:
            return

        mismatch = (
            f"signing actor ({signing_actor}) doesn't match actor in data "
            f"({declared_actor})"
        )
        raise exceptions.EncryptionError(mismatch)

    async def check_signature(
        self, requestor_actor_id: str, request: "HTTPRequest"
    ) -> str:
        """Check and validate HTTP signature

        The ``Signature`` header is parsed, the headers which must be signed for the
        HTTP method are checked, the signed headers are collected (validating the
        ``Digest`` against the body on the way), the signature date is checked, and
        the signature itself is finally verified with [self.apg.check_signature].

        @param requestor_actor_id: ID of the actor doing the request.
        @param request: HTTP request whose signature must be checked.
        @return: id of the signing actor

        @raise exceptions.EncryptionError: signature is not present or doesn't match
        @raise exceptions.InternalError: no usable list of headers to check is set
            for the HTTP method of the request
        """
        signature = request.getHeader("Signature")
        if signature is None:
            raise exceptions.EncryptionError("No signature found")
        sign_data = {
            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
            for m in RE_SIG_PARAM.finditer(signature)
        }
        try:
            key_id = sign_data["keyId"]
        except KeyError:
            raise exceptions.EncryptionError('"keyId" is missing from signature')
        try:
            signature_value = sign_data["signature"]
        except KeyError:
            raise exceptions.EncryptionError('"signature" is missing from signature')
        algorithm = sign_data.get("algorithm", HS2019)
        signed_headers = (
            sign_data.get("headers", "(created)" if algorithm == HS2019 else "date")
            .lower()
            .split()
        )
        try:
            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
        except KeyError:
            raise exceptions.InternalError(
                f"there should be a list of headers for {request.method} method"
            )
        if not headers_to_check:
            raise exceptions.InternalError("headers_to_check must not be empty")

        for header in headers_to_check:
            if isinstance(header, tuple):
                # a tuple is a set of alternatives
                if not set(header).intersection(signed_headers):
                    raise exceptions.EncryptionError(
                        f"at least one of following header must be signed: {header}"
                    )
            elif header not in signed_headers:
                raise exceptions.EncryptionError(f"the {header!r} header must be signed")

        body = request.content.read()
        request.content.seek(0)
        headers = {}
        for to_sign in signed_headers:
            if to_sign == "(request-target)":
                http_method = request.method.decode().lower()
                request_uri = request.uri.decode()
                headers[to_sign] = f"{http_method} /{request_uri.lstrip('/')}"
            elif to_sign in ("(created)", "(expires)"):
                if algorithm != HS2019:
                    raise exceptions.EncryptionError(
                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
                        "algorithm"
                    )
                # the value is found in the signature parameter of the same name,
                # without the parentheses
                key = to_sign[1:-1]
                value = sign_data.get(key)
                if not value:
                    raise exceptions.EncryptionError(
                        f"{key!r} parameter is missing from signature"
                    )
                try:
                    if float(value) < 0:
                        raise ValueError
                except ValueError:
                    raise exceptions.EncryptionError(
                        f"{to_sign} must be a Unix timestamp"
                    )
                headers[to_sign] = value
            else:
                value = request.getHeader(to_sign)
                if not value:
                    raise exceptions.EncryptionError(
                        f"value of header {to_sign!r} is missing!"
                    )
                elif to_sign == "host":
                    # we check Forwarded/X-Forwarded-Host headers
                    # as we need original host if a proxy has modified the header
                    host = None
                    forwarded = request.getHeader("forwarded")
                    if forwarded is not None:
                        host = (
                            next(
                                (
                                    f[5:]
                                    for f in forwarded.split(";")
                                    if f.startswith("host=")
                                ),
                                None,
                            )
                            or None
                        )
                    if host is None:
                        host = request.getHeader("x-forwarded-host")
                    if host:
                        value = host
                elif to_sign == "digest":
                    hashes = {}
                    for digest in value.split(","):
                        algo, sep, hash_ = digest.strip().partition("=")
                        if not sep:
                            # a malformed header is a client error, not an internal
                            # one
                            raise exceptions.EncryptionError(
                                f"invalid digest header: {value!r}"
                            )
                        hashes[algo.lower()] = hash_
                    try:
                        given_digest = hashes["sha-256"]
                    except KeyError:
                        raise exceptions.EncryptionError(
                            "Only SHA-256 algorithm is currently supported for digest"
                        )
                    __, computed_digest = self.apg.get_digest(body)
                    if given_digest != computed_digest:
                        raise exceptions.EncryptionError(
                            f"SHA-256 given and computed digest differ:\n"
                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
                        )
                headers[to_sign] = value

        # date check
        # Values are stored in ``headers`` under the name used in the signature, i.e.
        # with the parentheses for pseudo-headers.
        now = time.time()
        if "(created)" in headers:
            created = float(headers["(created)"])
        else:
            try:
                created = date_utils.date_parse(headers["date"])
            except KeyError:
                raise exceptions.EncryptionError(
                    'either "(created)" or "date" must be signed'
                )

        # The signature is accepted for SIGN_EXP seconds after its creation, or until
        # (expires) if it comes sooner.
        limit_ts = created + SIGN_EXP
        if "(expires)" in headers:
            expires = float(headers["(expires)"])
            if expires < created:
                log.warning(
                    f"(expires) [{expires}] set in the past of (created) [{created}] "
                    "ignoring it according to specs"
                )
            else:
                limit_ts = min(limit_ts, expires)

        if created > now + SIGN_EXP:
            raise exceptions.EncryptionError(
                "Signature creation date is too far in the future"
            )
        if now > limit_ts:
            raise exceptions.EncryptionError("Signature has expired")

        try:
            return await self.apg.check_signature(
                requestor_actor_id, signature_value, key_id, headers
            )
        except exceptions.EncryptionError:
            if "(request-target)" not in headers:
                # the workaround below can't be applied
                raise
            target_method, url = headers["(request-target)"].rsplit(" ", 1)
            headers["(request-target)"] = f"{target_method} {parse.unquote(url)}"
            log.debug(
                "Using workaround for (request-target) encoding bug in signature, "
                "see https://github.com/mastodon/mastodon/issues/18871"
            )
            return await self.apg.check_signature(
                requestor_actor_id, signature_value, key_id, headers
            )

    def render(self, request):
        """Set the ``server`` response header to VERSION, then render normally

        @param request: HTTP request being answered.
        @return: result of the parent class rendering.
        """
        request.setHeader("server", VERSION)
        rendered = super().render(request)
        return rendered

    def render_GET(self, request):
        """Route GET requests to the webfinger or AP handlers

        Paths which are neither webfinger nor under the AP path are answered with a
        ``NoResource``.

        @param request: HTTP request being answered.
        @return: ``server.NOT_DONE_YET`` when the request is handled asynchronously,
            rendering of ``NoResource`` otherwise.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            coroutine = self.webfinger(request)
        elif path.startswith(self.apg.ap_path):
            coroutine = self.ap_request(request)
        else:
            return web_resource.NoResource().render(request)

        d = defer.ensureDeferred(coroutine)
        # without errback, a failure would be left unhandled and the request would
        # never be answered
        d.addErrback(self._on_request_error, request)
        return server.NOT_DONE_YET

    def render_POST(self, request):
        """Route POST requests done under the AP path to [ap_post_request]

        Other paths are answered with a ``NoResource``.

        @param request: HTTP request being answered.
        @return: ``server.NOT_DONE_YET`` when the request is handled asynchronously,
            rendering of ``NoResource`` otherwise.
        """
        path = request.path.decode().lstrip("/")
        if not path.startswith(self.apg.ap_path):
            return web_resource.NoResource().render(request)
        d = defer.ensureDeferred(self.ap_post_request(request))
        # errors raised outside of the parts protected in ap_post_request would
        # otherwise be left unhandled, and the request would never be answered
        d.addErrback(self._on_request_error, request)
        return server.NOT_DONE_YET


class HTTPRequest(server.Request):
    """Request class used by the gateway HTTP server, without any customisation"""


class HTTPServer(server.Site):
    """Site serving the AP gateway resource, with HTTPRequest as request class"""

    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance given to the root resource
        """
        root_resource = HTTPAPGServer(ap_gateway)
        super().__init__(root_resource)