view libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4143:849721e1563b

cli: `call` command: This command has 2 subcommands: `make` and `receive` to make a new call or wait for one. When call is in progress, a window will be created to show incoming stream and local feedback, and a text UI is available to (un)mute audio or video, and hang up. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:10:00 +0100
parents 7067b0d73183
children 5f2d496c633f
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import html
from typing import Optional, Dict, List, Any
import json
from urllib import parse
from collections import deque
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.web import static
from twisted.web import util as web_util
from twisted.python import failure
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import date_utils, uri
from libervia.backend.memory.sqla_mapping import SubscriptionState

from .constants import (
    NS_AP, MEDIA_TYPE_AP, MEDIA_TYPE_AP_ALT, CONTENT_TYPE_WEBFINGER, CONTENT_TYPE_AP,
    TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, TYPE_EVENT, AP_REQUEST_TYPES,
    PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019,
    SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION,
    ST_AP_CACHE
)
from .regex import RE_SIG_PARAM


log = getLogger(__name__)

VERSION = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        self.apg = ap_gateway
        self._seen_digest = deque(maxlen=50)
        super().__init__()

    def response_code(
        self,
        request: "HTTPRequest",
        http_code: int,
        msg: Optional[str] = None
    ) -> None:
        """Log and set HTTP return code and associated message"""
        if msg is not None:
            log.warning(msg)
        request.setResponseCode(http_code, None if msg is None else msg.encode())

    def _on_request_error(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
        exc = failure_.value
        if isinstance(exc, exceptions.NotFound):
            self.response_code(
                request,
                http.NOT_FOUND,
                str(exc)
            )
        else:
            log.exception(f"Internal error: {failure_.value}")
            self.response_code(
                request,
                http.INTERNAL_SERVER_ERROR,
                f"internal error: {failure_.value}"
            )
            request.finish()
            raise failure_

        request.finish()

    async def webfinger(self, request):
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            return web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            ).render(request)

        actor_url = self.apg.build_apurl(TYPE_ACTOR, account)

        resp = {
            "aliases": [actor_url],
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_WEBFINGER)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def handle_undo_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(signing_actor)
        object_ = data.get("object")
        if isinstance(object_, str):
            # we check first if it's not a cached object
            ap_cache_key = f"{ST_AP_CACHE}{object_}"
            value = await self.apg.client._ap_storage.get(ap_cache_key)
        else:
            value = None
        if value is not None:
            objects = [value]
            # because we'll undo the activity, we can remove it from cache
            await self.apg.client._ap_storage.remove(ap_cache_key)
        else:
            objects = await self.apg.ap_get_list(data, "object")
        for obj in objects:
            type_ = obj.get("type")
            actor = await self.apg.ap_get_sender_actor(obj)
            if actor != signing_actor:
                log.warning(f"ignoring object not attributed to signing actor: {data}")
                continue

            if type_ == "Follow":
                try:
                    target_account = obj["object"]
                except KeyError:
                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
                    continue
                if not self.apg.is_local_url(target_account):
                    log.warning(f"ignoring unfollow request to non local actor: {data}")
                    continue
                await self.apg._p.unsubscribe(
                    client,
                    account_jid,
                    node,
                    sender=client.jid,
                )
            elif type_ == "Announce":
                # we can use directly the Announce object, as only the "id" field is
                # needed
                await self.apg.new_ap_delete_item(client, None, node, obj)
            elif type_ == TYPE_LIKE:
                await self.handle_attachment_item(client, obj, {"noticed": False})
            elif type_ == TYPE_REACTION:
                await self.handle_attachment_item(client, obj, {
                    "reactions": {"operation": "update", "remove": [obj["content"]]}
                })
            else:
                log.warning(f"Unmanaged undo type: {type_!r}")

    async def handle_follow_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(signing_actor)
        try:
            subscription = await self.apg._p.subscribe(
                client,
                account_jid,
                node,
                # subscriptions from AP are always public
                options=self.apg._pps.set_public_opt()
            )
        except pubsub.SubscriptionPending:
            log.info(f"subscription to node {node!r} of {account_jid} is pending")
        # TODO: manage SubscriptionUnconfigured
        else:
            if subscription.state != "subscribed":
                # other states should raise an Exception
                raise exceptions.InternalError('"subscribed" state was expected')
            inbox = await self.apg.get_ap_inbox_from_id(signing_actor, use_shared=False)
            actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account)
            accept_data = self.apg.create_activity(
                "Accept", actor_id, object_=data
            )
            await self.apg.sign_and_post(inbox, actor_id, accept_data)
        await self.apg._c.synchronise(client, account_jid, node, resync=False)

    async def handle_accept_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(signing_actor)
        objects = await self.apg.ap_get_list(data, "object")
        for obj in objects:
            type_ = obj.get("type")
            if type_ == "Follow":
                follow_node = await self.apg.host.memory.storage.get_pubsub_node(
                    client, client.jid, node, with_subscriptions=True
                )
                if follow_node is None:
                    log.warning(
                        f"Received a follow accept on an unknown node: {node!r} at "
                        f"{client.jid}. Ignoring it"
                    )
                    continue
                try:
                    sub = next(
                        s for s in follow_node.subscriptions if s.subscriber==account_jid
                    )
                except StopIteration:
                    log.warning(
                        "Received a follow accept on a node without subscription: "
                        f"{node!r} at {client.jid}. Ignoring it"
                    )
                else:
                    if sub.state == SubscriptionState.SUBSCRIBED:
                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
                    elif sub.state == SubscriptionState.PENDING:
                        follow_node.subscribed = True
                        sub.state = SubscriptionState.SUBSCRIBED
                        await self.apg.host.memory.storage.add(follow_node)
                    else:
                        raise exceptions.InternalError(
                            f"Unhandled subscription state {sub.state!r}"
                        )
            else:
                log.warning(f"Unmanaged accept type: {type_!r}")

    async def handle_delete_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.get_virtual_client(signing_actor)
        objects = await self.apg.ap_get_list(data, "object")
        for obj in objects:
            await self.apg.new_ap_delete_item(client, account_jid, node, obj)

    async def handle_new_ap_items(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        signing_actor: str,
        repeated: bool = False,
    ):
        """Helper method to handle workflow for new AP items

        accept globally the same parameter as for handle_create_activity
        @param repeated: if True, the item is an item republished from somewhere else
        """
        if "_repeated" in data:
            log.error(
                '"_repeated" field already present in given AP item, this should not '
                f"happen. Ignoring object from {signing_actor}\n{data}"
            )
            raise exceptions.DataError("unexpected field in item")
        client = await self.apg.get_virtual_client(signing_actor)
        objects = await self.apg.ap_get_list(data, "object")
        for obj in objects:
            if node is None:
                if obj.get("type") == TYPE_EVENT:
                    node = self.apg._events.namespace
                else:
                    node = self.apg._m.namespace
            sender = await self.apg.ap_get_sender_actor(obj)
            if repeated:
                # we don't check sender when item is repeated, as it should be different
                # from post author in this case
                sender_jid = await self.apg.get_jid_from_id(sender)
                repeater_jid = await self.apg.get_jid_from_id(signing_actor)
                repeated_item_id = obj["id"]
                if self.apg.is_local_url(repeated_item_id):
                    # the repeated object is from XMPP, we need to parse the URL to find
                    # the right ID
                    url_type, url_args = self.apg.parse_apurl(repeated_item_id)
                    if url_type != "item":
                        raise exceptions.DataError(
                            "local URI is not an item: {repeated_id}"
                        )
                    try:
                        url_account, url_item_id = url_args
                        if not url_account or not url_item_id:
                            raise ValueError
                    except (RuntimeError, ValueError):
                        raise exceptions.DataError(
                            "local URI is invalid: {repeated_id}"
                        )
                    else:
                        url_jid, url_node = await self.apg.get_jid_and_node(url_account)
                        if ((url_jid != sender_jid
                             or url_node and url_node != self.apg._m.namespace)):
                            raise exceptions.DataError(
                                "announced ID doesn't match sender ({sender}): "
                                f"[repeated_item_id]"
                            )

                    repeated_item_id = url_item_id

                obj["_repeated"] = {
                    "by": repeater_jid.full(),
                    "at": data.get("published"),
                    "uri": uri.build_xmpp_uri(
                        "pubsub",
                        path=sender_jid.full(),
                        node=self.apg._m.namespace,
                        item=repeated_item_id
                    )
                }
                # we must use activity's id and targets, not the original item ones
                for field in ("id", "to", "bto", "cc", "bcc"):
                    obj[field] = data.get(field)
            else:
                if sender != signing_actor:
                    log.warning(
                        "Ignoring object not attributed to signing actor: {obj}"
                    )
                    continue

            await self.apg.new_ap_item(client, account_jid, node, obj)

    async def handle_create_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)

    async def handle_update_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        # Update is the same as create: the item ID stays the same, thus the item will be
        # overwritten
        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)

    async def handle_announce_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        # we create a new item
        await self.handle_new_ap_items(
            request,
            data,
            account_jid,
            node,
            signing_actor,
            repeated=True
        )

    async def handle_attachment_item(
        self,
        client: SatXMPPEntity,
        data: dict,
        attachment_data: dict
    ) -> None:
        target_ids = data.get("object")
        if not target_ids:
            raise exceptions.DataError("object should be set")
        elif isinstance(target_ids, list):
            try:
                target_ids = [o["id"] for o in target_ids]
            except (KeyError, TypeError):
                raise exceptions.DataError(f"invalid object: {target_ids!r}")
        elif isinstance(target_ids, dict):
            obj_id = target_ids.get("id")
            if not obj_id or not isinstance(obj_id, str):
                raise exceptions.DataError(f"invalid object: {target_ids!r}")
            target_ids = [obj_id]
        elif isinstance(target_ids, str):
            target_ids = [target_ids]

        # XXX: we have to cache AP items because some implementation (Pleroma notably)
        #   don't keep object accessible, and we need to be able to retrieve them for
        #   UNDO. Current implementation will grow, we need to add a way to flush it after
        #   a while.
        # TODO: add a way to flush old cached AP items.
        await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)

        for target_id in target_ids:
            if not self.apg.is_local_url(target_id):
                log.debug(f"ignoring non local target ID: {target_id}")
                continue
            url_type, url_args = self.apg.parse_apurl(target_id)
            if url_type != TYPE_ITEM:
                log.warning(f"unexpected local URL for attachment on item {target_id}")
                continue
            try:
                account, item_id = url_args
            except ValueError:
                raise ValueError(f"invalid URL: {target_id}")
            author_jid, item_node = await self.apg.get_jid_and_node(account)
            if item_node is None:
                item_node = self.apg._m.namespace
            attachment_node = self.apg._pa.get_attachment_node_name(
                author_jid, item_node, item_id
            )
            cached_node = await self.apg.host.memory.storage.get_pubsub_node(
                client,
                author_jid,
                attachment_node,
                with_subscriptions=True,
                create=True
            )
            found_items, __ = await self.apg.host.memory.storage.get_items(
                cached_node, item_ids=[client.jid.userhost()]
            )
            if not found_items:
                old_item_elt = None
            else:
                found_item = found_items[0]
                old_item_elt = found_item.data

            item_elt = await self.apg._pa.apply_set_handler(
                client,
                {"extra": attachment_data},
                old_item_elt,
                None
            )
            # we reparse the element, as there can be other attachments
            attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt])
            # and we update the cache
            await self.apg.host.memory.storage.cache_pubsub_items(
                client,
                cached_node,
                [item_elt],
                attachments_data or [{}]
            )

            if self.apg.is_virtual_jid(author_jid):
                # the attachment is on t a virtual pubsub service (linking to an AP item),
                # we notify all subscribers
                for subscription in cached_node.subscriptions:
                    if subscription.state != SubscriptionState.SUBSCRIBED:
                        continue
                    self.apg.pubsub_service.notifyPublish(
                        author_jid,
                        attachment_node,
                        [(subscription.subscriber, None, [item_elt])]
                    )
            else:
                # the attachment is on an XMPP item, we publish it to the attachment node
                await self.apg._p.send_items(
                    client, author_jid, attachment_node, [item_elt]
                )

    async def handle_like_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        client = await self.apg.get_virtual_client(signing_actor)
        await self.handle_attachment_item(client, data, {"noticed": True})

    async def handle_emojireact_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        client = await self.apg.get_virtual_client(signing_actor)
        await self.handle_attachment_item(client, data, {
            "reactions": {"operation": "update", "add": [data["content"]]}
        })

    async def handle_join_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        client = await self.apg.get_virtual_client(signing_actor)
        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "yes"}})

    async def handle_leave_activity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        client = await self.apg.get_virtual_client(signing_actor)
        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "no"}})

    async def ap_actor_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        inbox = self.apg.build_apurl(TYPE_INBOX, ap_account)
        shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX)
        outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account)
        followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account)
        following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]

        identity_data = await self.apg._i.get_identity(self.apg.client, account_jid)
        if node and node.startswith(self.apg._events.namespace):
            events = outbox
        else:
            events_account = await self.apg.get_ap_account_from_jid_and_node(
                account_jid, self.apg._events.namespace
            )
            events = self.apg.build_apurl(TYPE_OUTBOX, events_account)

        actor_data = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            # XXX: Mastodon doesn't like percent-encode arobas, so we have to unescape it
            #   if it is escaped
            "id": ap_url.replace("%40", "@"),
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox,
            "outbox": outbox,
            "events": events,
            "followers": followers,
            "following": following,
            "publicKey": {
                "id": f"{ap_url}#main-key",
                "owner": ap_url,
                "publicKeyPem": self.apg.public_key_pem
            },
            "endpoints": {
                "sharedInbox": shared_inbox,
                "events": events,
            },
        }

        if identity_data.get("nicknames"):
            actor_data["name"] = identity_data["nicknames"][0]
        if identity_data.get("description"):
            # description is plain text while summary expects HTML
            actor_data["summary"] = html.escape(identity_data["description"])
        if identity_data.get("avatar"):
            avatar_data = identity_data["avatar"]
            try:
                filename = avatar_data["filename"]
                media_type = avatar_data["media_type"]
            except KeyError:
                log.error(f"incomplete avatar data: {identity_data!r}")
            else:
                avatar_url = self.apg.build_apurl("avatar", filename)
                actor_data["icon"] = {
                    "type": "Image",
                    "url": avatar_url,
                    "mediaType": media_type
                }

        return actor_data

    def get_canonical_url(self, request: "HTTPRequest") -> str:
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        # we unescape "@" for the same reason as in [ap_actor_request]
        ).replace("%40", "@")

    def query_data_2_rsm_request(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get RSM kwargs to use with RSMRequest from query data"""
        page = query_data.get("page")

        if page == ["first"]:
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            for query_key in ("index", "before", "after"):
                try:
                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError, ValueError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def ap_outbox_page_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        if node is None:
            node = self.apg._m.namespace
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.get_items(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.query_data_2_rsm_request(query_data),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.get_canonical_url(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        if node and node.startswith(self.apg._events.namespace):
            ordered_items = [
                await self.apg.ap_events.event_data_2_ap_item(
                    self.apg._events.event_elt_2_event_data(item),
                    account_jid
                )
                for item in reversed(items)
            ]
        else:
            ordered_items = [
                await self.apg.mb_data_2_ap_item(
                    self.apg.client,
                    await self.apg._m.item_2_mb_data(
                        self.apg.client,
                        item,
                        account_jid,
                        node
                    )
                )
                for item in reversed(items)
            ]
        ret_data = {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems": ordered_items
        }

        if "rsm" not in metadata:
            # no RSM available, we return what we have
            return ret_data

        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        if not metadata["complete"]:
            try:
                last= metadata["rsm"]["last"]
            except KeyError:
                last = None
            ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if metadata["rsm"]["index"] != 0:
            try:
                first= metadata["rsm"]["first"]
            except KeyError:
                first = None
            ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return ret_data

    async def ap_outbox_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.ap_outbox_page_request(
                request, data, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.get_items(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.get_canonical_url(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": ["https://www.w3.org/ns/activitystreams"],
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def ap_inbox_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> None:
        assert data is not None
        if signing_actor is None:
            raise exceptions.InternalError("signing_actor must be set for inbox requests")
        await self.check_signing_actor(data, signing_actor)
        activity_type = (data.get("type") or "").lower()
        if not activity_type in ACTIVITY_TYPES_LOWER:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"request is not an activity, ignoring"
            )

        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()!r} activity must target an account"
            )

        try:
            method = getattr(self, f"handle_{activity_type}_activity")
        except AttributeError:
            return self.response_code(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()} activity is not yet supported"
            )
        else:
            await method(
                request, data, account_jid, node, ap_account, ap_url, signing_actor
            )

    async def ap_followers_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        if node is None:
            node = self.apg._m.namespace
        client = self.apg.client
        subscribers = await self.apg._pps.get_public_node_subscriptions(
            client, account_jid, node
        )
        followers = []
        for subscriber in subscribers.keys():
            if self.apg.is_virtual_jid(subscriber):
                # the subscriber is an AP user subscribed with this gateway
                ap_account = self.apg._e.unescape(subscriber.user)
            else:
                # regular XMPP user
                ap_account = await self.apg.get_ap_account_from_jid_and_node(subscriber, node)
            followers.append(ap_account)

        url = self.get_canonical_url(request)
        return {
          "@context": ["https://www.w3.org/ns/activitystreams"],
          "type": "OrderedCollection",
          "id": url,
          "totalItems": len(subscribers),
          "first": {
            "type": "OrderedCollectionPage",
            "id": url,
            "orderedItems": followers
          }
        }

    async def ap_following_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict],
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict[str, Any]:
        client = self.apg.client
        subscriptions = await self.apg._pps.subscriptions(
            client, account_jid, node
        )
        following = []
        for sub_dict in subscriptions:
            service = jid.JID(sub_dict["service"])
            if self.apg.is_virtual_jid(service):
                # the subscription is to an AP actor with this gateway
                ap_account = self.apg._e.unescape(service.user)
            else:
                # regular XMPP user
                ap_account = await self.apg.get_ap_account_from_jid_and_node(
                    service, sub_dict["node"]
                )
            following.append(ap_account)

        url = self.get_canonical_url(request)
        return {
          "@context": ["https://www.w3.org/ns/activitystreams"],
          "type": "OrderedCollection",
          "id": url,
          "totalItems": len(subscriptions),
          "first": {
            "type": "OrderedCollectionPage",
            "id": url,
            "orderedItems": following
          }
        }

    def _get_to_log(
        self,
        request: "HTTPRequest",
        data: Optional[dict] = None,
    ) -> List[str]:
        """Get base data to logs in verbose mode"""
        from pprint import pformat
        to_log = [
            "",
            f"<<< got {request.method.decode()} request - {request.uri.decode()}"
        ]
        if data is not None:
            to_log.append(pformat(data))
        if self.apg.verbose>=3:
            headers = "\n".join(
                f"    {k.decode()}: {v.decode()}"
                for k,v in request.getAllHeaders().items()
            )
            to_log.append(f"  headers:\n{headers}")
        return to_log

    async def ap_request(
        self,
        request: "HTTPRequest",
        data: Optional[dict] = None,
        signing_actor: Optional[str] = None
    ) -> None:
        if self.apg.verbose:
            to_log = self._get_to_log(request, data)

        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, extra_args = self.apg.parse_apurl(ap_url)
        header_accept = request.getHeader("accept") or ""
        if ((MEDIA_TYPE_AP not in header_accept
             and MEDIA_TYPE_AP_ALT not in header_accept
             and request_type in self.apg.html_redirect)):
            # this is not a AP request, and we have a redirections for it
            kw = {}
            if extra_args:
                kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0])
                kw["jid_user"] = kw["jid"].user
                if kw["node"] is None:
                    kw["node"] = self.apg._m.namespace
                if len(extra_args) > 1:
                    kw["item"] = extra_args[1]
                else:
                    kw["item"] = ""
            else:
                kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", ""

            redirections = self.apg.html_redirect[request_type]
            for redirection in redirections:
                filters = redirection["filters"]
                if not filters:
                    break
                # if we have filter, they must all match
                elif all(v in kw[k] for k,v in filters.items()):
                    break
            else:
                # no redirection is matching
                redirection = None

            if redirection is not None:
                kw = {k: parse.quote(str(v), safe="") for k,v in kw.items()}
                target_url = redirection["url"].format(**kw)
                content = web_util.redirectTo(target_url.encode(), request)
                request.write(content)
                request.finish()
                return

        if len(extra_args) == 0:
            if request_type != "shared_inbox":
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            ret_data = await self.ap_inbox_request(
                request, data, None, None, None, ap_url, signing_actor
            )
        elif request_type == "avatar":
            if len(extra_args) != 1:
                raise exceptions.DataError("avatar argument expected in URL")
            avatar_filename = extra_args[0]
            avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
            return static.File(str(avatar_path)).render(request)
        elif request_type == "item":
            ret_data = await self.apg.ap_get_local_object(ap_url)
            if "@context" not in ret_data:
                ret_data["@context"] = [NS_AP]
        else:
            if len(extra_args) > 1:
                log.warning(f"unexpected extra arguments: {extra_args!r}")
            ap_account = extra_args[0]
            account_jid, node = await self.apg.get_jid_and_node(ap_account)
            if request_type not in AP_REQUEST_TYPES.get(
                    request.method.decode().upper(), []
            ):
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            method = getattr(self, f"ap_{request_type}_request")
            ret_data = await method(
                request, data, account_jid, node, ap_account, ap_url, signing_actor
            )
        if ret_data is not None:
            request.setHeader("content-type", CONTENT_TYPE_AP)
            request.write(json.dumps(ret_data).encode())
        if self.apg.verbose:
            to_log.append(f"--- RET (code: {request.code})---")
            if self.apg.verbose>=2:
                if ret_data is not None:
                    from pprint import pformat
                    to_log.append(f"{pformat(ret_data)}")
                    to_log.append("---")
            log.info("\n".join(to_log))
        request.finish()

    async def ap_post_request(self, request: "HTTPRequest") -> None:
        try:
            data = json.load(request.content)
            if not isinstance(data, dict):
                log.warning(f"JSON data should be an object (uri={request.uri.decode()})")
                self.response_code(
                    request,
                    http.BAD_REQUEST,
                    f"invalid body, was expecting a JSON object"
                )
                request.finish()
                return
        except (json.JSONDecodeError, ValueError) as e:
            self.response_code(
                request,
                http.BAD_REQUEST,
                f"invalid json in inbox request: {e}"
            )
            request.finish()
            return
        else:
            request.content.seek(0)

        try:
            if data["type"] == "Delete" and data["actor"] == data["object"]:
                # we don't handle actor deletion
                request.setResponseCode(http.ACCEPTED)
                log.debug(f"ignoring actor deletion ({data['actor']})")
                # TODO: clean data in cache coming from this actor, maybe with a tombstone
                request.finish()
                return
        except KeyError:
            pass

        try:
            signing_actor = await self.check_signature(request)
        except exceptions.EncryptionError as e:
            if self.apg.verbose:
                to_log = self._get_to_log(request)
                to_log.append(f"  body: {request.content.read()!r}")
                request.content.seek(0)
                log.info("\n".join(to_log))
            self.response_code(
                request,
                http.FORBIDDEN,
                f"invalid signature: {e}"
            )
            request.finish()
            return
        except Exception as e:
            self.response_code(
                request,
                http.INTERNAL_SERVER_ERROR,
                f"Can't check signature: {e}"
            )
            request.finish()
            return

        request.setResponseCode(http.ACCEPTED)

        digest = request.getHeader("digest")
        if digest in self._seen_digest:
            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
            request.finish()
            return
        self._seen_digest.append(digest)

        # default response code, may be changed, e.g. in case of exception
        try:
            return await self.ap_request(request, data, signing_actor)
        except Exception as e:
            self._on_request_error(failure.Failure(e), request)

    async def check_signing_actor(self, data: dict, signing_actor: str) -> None:
        """That that signing actor correspond to actor declared in data

        @param data: request payload
        @param signing_actor: actor ID of the signing entity, as returned by
            check_signature
        @raise exceptions.NotFound: no actor found in data
        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
        """
        actor = await self.apg.ap_get_sender_actor(data)

        if signing_actor != actor:
            raise exceptions.EncryptionError(
                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
            )

    async def check_signature(self, request: "HTTPRequest") -> str:
        """Check and validate HTTP signature

        @return: id of the signing actor

        @raise exceptions.EncryptionError: signature is not present or doesn't match
        """
        signature = request.getHeader("Signature")
        if signature is None:
            raise exceptions.EncryptionError("No signature found")
        sign_data = {
            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
            for m in RE_SIG_PARAM.finditer(signature)
        }
        try:
            key_id = sign_data["keyId"]
        except KeyError:
            raise exceptions.EncryptionError('"keyId" is missing from signature')
        algorithm = sign_data.get("algorithm", HS2019)
        signed_headers = sign_data.get(
            "headers",
            "(created)" if algorithm==HS2019 else "date"
        ).lower().split()
        try:
            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
        except KeyError:
            raise exceptions.InternalError(
                f"there should be a list of headers for {request.method} method"
            )
        if not headers_to_check:
            raise exceptions.InternalError("headers_to_check must not be empty")

        for header in headers_to_check:
            if isinstance(header, tuple):
                if len(set(header).intersection(signed_headers)) == 0:
                    raise exceptions.EncryptionError(
                        f"at least one of following header must be signed: {header}"
                    )
            elif header not in signed_headers:
                raise exceptions.EncryptionError(
                    f"the {header!r} header must be signed"
                )

        body = request.content.read()
        request.content.seek(0)
        headers = {}
        for to_sign in signed_headers:
            if to_sign == "(request-target)":
                method = request.method.decode().lower()
                uri = request.uri.decode()
                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
            elif to_sign in ("(created)", "(expires)"):
                if algorithm != HS2019:
                    raise exceptions.EncryptionError(
                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
                        "algorithm"
                    )
                key = to_sign[1:-1]
                value = sign_data.get(key)
                if not value:
                    raise exceptions.EncryptionError(
                        "{key!r} parameter is missing from signature"
                    )
                try:
                    if float(value) < 0:
                        raise ValueError
                except ValueError:
                    raise exceptions.EncryptionError(
                        f"{to_sign} must be a Unix timestamp"
                    )
                headers[to_sign] = value
            else:
                value = request.getHeader(to_sign)
                if not value:
                    raise exceptions.EncryptionError(
                        f"value of header {to_sign!r} is missing!"
                    )
                elif to_sign == "host":
                    # we check Forwarded/X-Forwarded-Host headers
                    # as we need original host if a proxy has modified the header
                    forwarded = request.getHeader("forwarded")
                    if forwarded is not None:
                        try:
                            host = [
                                f[5:] for f in forwarded.split(";")
                                if f.startswith("host=")
                            ][0] or None
                        except IndexError:
                            host = None
                    else:
                        host = None
                    if host is None:
                        host = request.getHeader("x-forwarded-host")
                    if host:
                        value = host
                elif to_sign == "digest":
                    hashes = {
                        algo.lower(): hash_ for algo, hash_ in (
                            digest.split("=", 1) for digest in value.split(",")
                        )
                    }
                    try:
                        given_digest = hashes["sha-256"]
                    except KeyError:
                        raise exceptions.EncryptionError(
                            "Only SHA-256 algorithm is currently supported for digest"
                        )
                    __, computed_digest = self.apg.get_digest(body)
                    if given_digest != computed_digest:
                        raise exceptions.EncryptionError(
                            f"SHA-256 given and computed digest differ:\n"
                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
                        )
                headers[to_sign] = value

        # date check
        limit_ts = time.time() + SIGN_EXP
        if "(created)" in headers:
            created = float(headers["created"])
        else:
            created = date_utils.date_parse(headers["date"])


        try:
            expires = float(headers["expires"])
        except KeyError:
            pass
        else:
            if expires < created:
                log.warning(
                    f"(expires) [{expires}] set in the past of (created) [{created}] "
                    "ignoring it according to specs"
                )
            else:
                limit_ts = min(limit_ts, expires)

        if created > limit_ts:
            raise exceptions.EncryptionError("Signature has expired")

        try:
            return await self.apg.check_signature(
                sign_data["signature"],
                key_id,
                headers
            )
        except exceptions.EncryptionError:
            method, url = headers["(request-target)"].rsplit(' ', 1)
            headers["(request-target)"] = f"{method} {parse.unquote(url)}"
            log.debug(
                "Using workaround for (request-target) encoding bug in signature, "
                "see https://github.com/mastodon/mastodon/issues/18871"
            )
            return await self.apg.check_signature(
                sign_data["signature"],
                key_id,
                headers
            )

    def render(self, request):
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            defer.ensureDeferred(self.webfinger(request))
            return server.NOT_DONE_YET
        elif path.startswith(self.apg.ap_path):
            d = defer.ensureDeferred(self.ap_request(request))
            d.addErrback(self._on_request_error, request)
            return server.NOT_DONE_YET

        return web_resource.NoResource().render(request)

    def render_POST(self, request):
        path = request.path.decode().lstrip("/")
        if not path.startswith(self.apg.ap_path):
            return web_resource.NoResource().render(request)
        defer.ensureDeferred(self.ap_post_request(request))
        return server.NOT_DONE_YET


class HTTPRequest(server.Request):
    pass


class HTTPServer(server.Site):
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))