diff libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/http_server.py@524856bd7b19
children 13b1079c27ec
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,1328 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import time
+import html
+from typing import Optional, Dict, List, Any
+import json
+from urllib import parse
+from collections import deque
+import unicodedata
+
+from twisted.web import http, resource as web_resource, server
+from twisted.web import static
+from twisted.web import util as web_util
+from twisted.python import failure
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error
+from wokkel import pubsub, rsm
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools.common import date_utils, uri
+from libervia.backend.memory.sqla_mapping import SubscriptionState
+
+from .constants import (
+    NS_AP, MEDIA_TYPE_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX,
+    TYPE_OUTBOX, TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER,
+    ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS,
+    TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE
+)
+from .regex import RE_SIG_PARAM
+
+
+log = getLogger(__name__)
+
+VERSION = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
+)
+
+
+class HTTPAPGServer(web_resource.Resource):
+    """HTTP Server handling ActivityPub S2S protocol"""
+    isLeaf = True
+
+    def __init__(self, ap_gateway):
+        self.apg = ap_gateway
+        self._seen_digest = deque(maxlen=50)
+        super().__init__()
+
+    def response_code(
+        self,
+        request: "HTTPRequest",
+        http_code: int,
+        msg: Optional[str] = None
+    ) -> None:
+        """Log and set HTTP return code and associated message"""
+        if msg is not None:
+            log.warning(msg)
+        request.setResponseCode(http_code, None if msg is None else msg.encode())
+
+    def _on_request_error(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
+        exc = failure_.value
+        if isinstance(exc, exceptions.NotFound):
+            self.response_code(
+                request,
+                http.NOT_FOUND,
+                str(exc)
+            )
+        else:
+            log.exception(f"Internal error: {failure_.value}")
+            self.response_code(
+                request,
+                http.INTERNAL_SERVER_ERROR,
+                f"internal error: {failure_.value}"
+            )
+            request.finish()
+            raise failure_
+
+        request.finish()
+
+    async def webfinger(self, request):
+        url_parsed = parse.urlparse(request.uri.decode())
+        query = parse.parse_qs(url_parsed.query)
+        resource = query.get("resource", [""])[0]
+        account = resource[5:].strip()
+        if not resource.startswith("acct:") or not account:
+            return web_resource.ErrorPage(
+                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
+            ).render(request)
+
+        actor_url = self.apg.build_apurl(TYPE_ACTOR, account)
+
+        resp = {
+            "aliases": [actor_url],
+            "subject": resource,
+            "links": [
+                {
+                    "rel": "self",
+                    "type": "application/activity+json",
+                    "href": actor_url
+                }
+            ]
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(resp).encode())
+        request.finish()
+
+    async def handle_undo_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        object_ = data.get("object")
+        if isinstance(object_, str):
+            # we check first if it's not a cached object
+            ap_cache_key = f"{ST_AP_CACHE}{object_}"
+            value = await self.apg.client._ap_storage.get(ap_cache_key)
+        else:
+            value = None
+        if value is not None:
+            objects = [value]
+            # because we'll undo the activity, we can remove it from cache
+            await self.apg.client._ap_storage.remove(ap_cache_key)
+        else:
+            objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            actor = await self.apg.ap_get_sender_actor(obj)
+            if actor != signing_actor:
+                log.warning(f"ignoring object not attributed to signing actor: {data}")
+                continue
+
+            if type_ == "Follow":
+                try:
+                    target_account = obj["object"]
+                except KeyError:
+                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                    continue
+                if not self.apg.is_local_url(target_account):
+                    log.warning(f"ignoring unfollow request to non local actor: {data}")
+                    continue
+                await self.apg._p.unsubscribe(
+                    client,
+                    account_jid,
+                    node,
+                    sender=client.jid,
+                )
+            elif type_ == "Announce":
+                # we can use directly the Announce object, as only the "id" field is
+                # needed
+                await self.apg.new_ap_delete_item(client, None, node, obj)
+            elif type_ == TYPE_LIKE:
+                await self.handle_attachment_item(client, obj, {"noticed": False})
+            elif type_ == TYPE_REACTION:
+                await self.handle_attachment_item(client, obj, {
+                    "reactions": {"operation": "update", "remove": [obj["content"]]}
+                })
+            else:
+                log.warning(f"Unmanaged undo type: {type_!r}")
+
+    async def handle_follow_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        try:
+            subscription = await self.apg._p.subscribe(
+                client,
+                account_jid,
+                node,
+                # subscriptions from AP are always public
+                options=self.apg._pps.set_public_opt()
+            )
+        except pubsub.SubscriptionPending:
+            log.info(f"subscription to node {node!r} of {account_jid} is pending")
+        # TODO: manage SubscriptionUnconfigured
+        else:
+            if subscription.state != "subscribed":
+                # other states should raise an Exception
+                raise exceptions.InternalError('"subscribed" state was expected')
+            inbox = await self.apg.get_ap_inbox_from_id(signing_actor, use_shared=False)
+            actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account)
+            accept_data = self.apg.create_activity(
+                "Accept", actor_id, object_=data
+            )
+            await self.apg.sign_and_post(inbox, actor_id, accept_data)
+        await self.apg._c.synchronise(client, account_jid, node, resync=False)
+
+    async def handle_accept_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            if type_ == "Follow":
+                follow_node = await self.apg.host.memory.storage.get_pubsub_node(
+                    client, client.jid, node, with_subscriptions=True
+                )
+                if follow_node is None:
+                    log.warning(
+                        f"Received a follow accept on an unknown node: {node!r} at "
+                        f"{client.jid}. Ignoring it"
+                    )
+                    continue
+                try:
+                    sub = next(
+                        s for s in follow_node.subscriptions if s.subscriber==account_jid
+                    )
+                except StopIteration:
+                    log.warning(
+                        "Received a follow accept on a node without subscription: "
+                        f"{node!r} at {client.jid}. Ignoring it"
+                    )
+                else:
+                    if sub.state == SubscriptionState.SUBSCRIBED:
+                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
+                    elif sub.state == SubscriptionState.PENDING:
+                        follow_node.subscribed = True
+                        sub.state = SubscriptionState.SUBSCRIBED
+                        await self.apg.host.memory.storage.add(follow_node)
+                    else:
+                        raise exceptions.InternalError(
+                            f"Unhandled subscription state {sub.state!r}"
+                        )
+            else:
+                log.warning(f"Unmanaged accept type: {type_!r}")
+
+    async def handle_delete_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            await self.apg.new_ap_delete_item(client, account_jid, node, obj)
+
+    async def handle_new_ap_items(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        signing_actor: str,
+        repeated: bool = False,
+    ):
+        """Helper method to handle workflow for new AP items
+
+        accept globally the same parameter as for handle_create_activity
+        @param repeated: if True, the item is an item republished from somewhere else
+        """
+        if "_repeated" in data:
+            log.error(
+                '"_repeated" field already present in given AP item, this should not '
+                f"happen. Ignoring object from {signing_actor}\n{data}"
+            )
+            raise exceptions.DataError("unexpected field in item")
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            if node is None:
+                if obj.get("type") == TYPE_EVENT:
+                    node = self.apg._events.namespace
+                else:
+                    node = self.apg._m.namespace
+            sender = await self.apg.ap_get_sender_actor(obj)
+            if repeated:
+                # we don't check sender when item is repeated, as it should be different
+                # from post author in this case
+                sender_jid = await self.apg.get_jid_from_id(sender)
+                repeater_jid = await self.apg.get_jid_from_id(signing_actor)
+                repeated_item_id = obj["id"]
+                if self.apg.is_local_url(repeated_item_id):
+                    # the repeated object is from XMPP, we need to parse the URL to find
+                    # the right ID
+                    url_type, url_args = self.apg.parse_apurl(repeated_item_id)
+                    if url_type != "item":
+                        raise exceptions.DataError(
+                            "local URI is not an item: {repeated_id}"
+                        )
+                    try:
+                        url_account, url_item_id = url_args
+                        if not url_account or not url_item_id:
+                            raise ValueError
+                    except (RuntimeError, ValueError):
+                        raise exceptions.DataError(
+                            "local URI is invalid: {repeated_id}"
+                        )
+                    else:
+                        url_jid, url_node = await self.apg.get_jid_and_node(url_account)
+                        if ((url_jid != sender_jid
+                             or url_node and url_node != self.apg._m.namespace)):
+                            raise exceptions.DataError(
+                                "announced ID doesn't match sender ({sender}): "
+                                f"[repeated_item_id]"
+                            )
+
+                    repeated_item_id = url_item_id
+
+                obj["_repeated"] = {
+                    "by": repeater_jid.full(),
+                    "at": data.get("published"),
+                    "uri": uri.build_xmpp_uri(
+                        "pubsub",
+                        path=sender_jid.full(),
+                        node=self.apg._m.namespace,
+                        item=repeated_item_id
+                    )
+                }
+                # we must use activity's id and targets, not the original item ones
+                for field in ("id", "to", "bto", "cc", "bcc"):
+                    obj[field] = data.get(field)
+            else:
+                if sender != signing_actor:
+                    log.warning(
+                        "Ignoring object not attributed to signing actor: {obj}"
+                    )
+                    continue
+
+            await self.apg.new_ap_item(client, account_jid, node, obj)
+
+    async def handle_create_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
+
+    async def handle_update_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # Update is the same as create: the item ID stays the same, thus the item will be
+        # overwritten
+        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
+
+    async def handle_announce_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # we create a new item
+        await self.handle_new_ap_items(
+            request,
+            data,
+            account_jid,
+            node,
+            signing_actor,
+            repeated=True
+        )
+
+    async def handle_attachment_item(
+        self,
+        client: SatXMPPEntity,
+        data: dict,
+        attachment_data: dict
+    ) -> None:
+        target_ids = data.get("object")
+        if not target_ids:
+            raise exceptions.DataError("object should be set")
+        elif isinstance(target_ids, list):
+            try:
+                target_ids = [o["id"] for o in target_ids]
+            except (KeyError, TypeError):
+                raise exceptions.DataError(f"invalid object: {target_ids!r}")
+        elif isinstance(target_ids, dict):
+            obj_id = target_ids.get("id")
+            if not obj_id or not isinstance(obj_id, str):
+                raise exceptions.DataError(f"invalid object: {target_ids!r}")
+            target_ids = [obj_id]
+        elif isinstance(target_ids, str):
+            target_ids = [target_ids]
+
+        # XXX: we have to cache AP items because some implementation (Pleroma notably)
+        #   don't keep object accessible, and we need to be able to retrieve them for
+        #   UNDO. Current implementation will grow, we need to add a way to flush it after
+        #   a while.
+        # TODO: add a way to flush old cached AP items.
+        await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)
+
+        for target_id in target_ids:
+            if not self.apg.is_local_url(target_id):
+                log.debug(f"ignoring non local target ID: {target_id}")
+                continue
+            url_type, url_args = self.apg.parse_apurl(target_id)
+            if url_type != TYPE_ITEM:
+                log.warning(f"unexpected local URL for attachment on item {target_id}")
+                continue
+            try:
+                account, item_id = url_args
+            except ValueError:
+                raise ValueError(f"invalid URL: {target_id}")
+            author_jid, item_node = await self.apg.get_jid_and_node(account)
+            if item_node is None:
+                item_node = self.apg._m.namespace
+            attachment_node = self.apg._pa.get_attachment_node_name(
+                author_jid, item_node, item_id
+            )
+            cached_node = await self.apg.host.memory.storage.get_pubsub_node(
+                client,
+                author_jid,
+                attachment_node,
+                with_subscriptions=True,
+                create=True
+            )
+            found_items, __ = await self.apg.host.memory.storage.get_items(
+                cached_node, item_ids=[client.jid.userhost()]
+            )
+            if not found_items:
+                old_item_elt = None
+            else:
+                found_item = found_items[0]
+                old_item_elt = found_item.data
+
+            item_elt = await self.apg._pa.apply_set_handler(
+                client,
+                {"extra": attachment_data},
+                old_item_elt,
+                None
+            )
+            # we reparse the element, as there can be other attachments
+            attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt])
+            # and we update the cache
+            await self.apg.host.memory.storage.cache_pubsub_items(
+                client,
+                cached_node,
+                [item_elt],
+                attachments_data or [{}]
+            )
+
+            if self.apg.is_virtual_jid(author_jid):
+                # the attachment is on t a virtual pubsub service (linking to an AP item),
+                # we notify all subscribers
+                for subscription in cached_node.subscriptions:
+                    if subscription.state != SubscriptionState.SUBSCRIBED:
+                        continue
+                    self.apg.pubsub_service.notifyPublish(
+                        author_jid,
+                        attachment_node,
+                        [(subscription.subscriber, None, [item_elt])]
+                    )
+            else:
+                # the attachment is on an XMPP item, we publish it to the attachment node
+                await self.apg._p.send_items(
+                    client, author_jid, attachment_node, [item_elt]
+                )
+
+    async def handle_like_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"noticed": True})
+
+    async def handle_emojireact_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {
+            "reactions": {"operation": "update", "add": [data["content"]]}
+        })
+
+    async def handle_join_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "yes"}})
+
+    async def handle_leave_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "no"}})
+
+    async def ap_actor_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        inbox = self.apg.build_apurl(TYPE_INBOX, ap_account)
+        shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX)
+        outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account)
+        followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account)
+        following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account)
+
+        # we have to use AP account as preferredUsername because it is used to retrieve
+        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
+        preferred_username = ap_account.split("@", 1)[0]
+
+        identity_data = await self.apg._i.get_identity(self.apg.client, account_jid)
+        if node and node.startswith(self.apg._events.namespace):
+            events = outbox
+        else:
+            events_account = await self.apg.get_ap_account_from_jid_and_node(
+                account_jid, self.apg._events.namespace
+            )
+            events = self.apg.build_apurl(TYPE_OUTBOX, events_account)
+
+        actor_data = {
+            "@context": [
+                "https://www.w3.org/ns/activitystreams",
+                "https://w3id.org/security/v1"
+            ],
+
+            # XXX: Mastodon doesn't like percent-encode arobas, so we have to unescape it
+            #   if it is escaped
+            "id": ap_url.replace("%40", "@"),
+            "type": "Person",
+            "preferredUsername": preferred_username,
+            "inbox": inbox,
+            "outbox": outbox,
+            "events": events,
+            "followers": followers,
+            "following": following,
+            "publicKey": {
+                "id": f"{ap_url}#main-key",
+                "owner": ap_url,
+                "publicKeyPem": self.apg.public_key_pem
+            },
+            "endpoints": {
+                "sharedInbox": shared_inbox,
+                "events": events,
+            },
+        }
+
+        if identity_data.get("nicknames"):
+            actor_data["name"] = identity_data["nicknames"][0]
+        if identity_data.get("description"):
+            # description is plain text while summary expects HTML
+            actor_data["summary"] = html.escape(identity_data["description"])
+        if identity_data.get("avatar"):
+            avatar_data = identity_data["avatar"]
+            try:
+                filename = avatar_data["filename"]
+                media_type = avatar_data["media_type"]
+            except KeyError:
+                log.error(f"incomplete avatar data: {identity_data!r}")
+            else:
+                avatar_url = self.apg.build_apurl("avatar", filename)
+                actor_data["icon"] = {
+                    "type": "Image",
+                    "url": avatar_url,
+                    "mediaType": media_type
+                }
+
+        return actor_data
+
+    def get_canonical_url(self, request: "HTTPRequest") -> str:
+        return parse.urljoin(
+            f"https://{self.apg.public_url}",
+            request.path.decode().rstrip("/")
+        # we unescape "@" for the same reason as in [ap_actor_request]
+        ).replace("%40", "@")
+
+    def query_data_2_rsm_request(
+        self,
+        query_data: Dict[str, List[str]]
+    ) -> rsm.RSMRequest:
+        """Get RSM kwargs to use with RSMRequest from query data"""
+        page = query_data.get("page")
+
+        if page == ["first"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
+        elif page == ["last"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE)
+        else:
+            for query_key in ("index", "before", "after"):
+                try:
+                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
+                except (KeyError, IndexError, ValueError):
+                    pass
+                else:
+                    return rsm.RSMRequest(**kwargs)
+        raise ValueError(f"Invalid query data: {query_data!r}")
+
+    async def ap_outbox_page_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        query_data: Dict[str, List[str]]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        # we only keep useful keys, and sort to have consistent URL which can
+        # be used as ID
+        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
+        query_data = {k: query_data[k] for k in url_keys}
+        try:
+            items, metadata = await self.apg._p.get_items(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                rsm_request=self.query_data_2_rsm_request(query_data),
+                extra = {C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+
+        base_url = self.get_canonical_url(request)
+        url = f"{base_url}?{parse.urlencode(query_data, True)}"
+        if node and node.startswith(self.apg._events.namespace):
+            ordered_items = [
+                await self.apg.ap_events.event_data_2_ap_item(
+                    self.apg._events.event_elt_2_event_data(item),
+                    account_jid
+                )
+                for item in reversed(items)
+            ]
+        else:
+            ordered_items = [
+                await self.apg.mb_data_2_ap_item(
+                    self.apg.client,
+                    await self.apg._m.item_2_mb_data(
+                        self.apg.client,
+                        item,
+                        account_jid,
+                        node
+                    )
+                )
+                for item in reversed(items)
+            ]
+        ret_data = {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems": ordered_items
+        }
+
+        if "rsm" not in metadata:
+            # no RSM available, we return what we have
+            return ret_data
+
+        # AP OrderedCollection must be in reversed chronological order, thus the opposite
+        # of what we get with RSM (at least with Libervia Pubsub)
+        if not metadata["complete"]:
+            try:
+                last= metadata["rsm"]["last"]
+            except KeyError:
+                last = None
+            ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
+        if metadata["rsm"]["index"] != 0:
+            try:
+                first= metadata["rsm"]["first"]
+            except KeyError:
+                first = None
+            ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
+
+        return ret_data
+
+    async def ap_outbox_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+
+        parsed_url = parse.urlparse(request.uri.decode())
+        query_data = parse.parse_qs(parsed_url.query)
+        if query_data:
+            return await self.ap_outbox_page_request(
+                request, data, account_jid, node, ap_account, ap_url, query_data
+            )
+
+        # XXX: we can't use disco#info here because this request won't work on a bare jid
+        # due to security considerations of XEP-0030 (we don't have presence
+        # subscription).
+        # The current workaround is to do a request as if RSM was available, and actually
+        # check its availability according to result.
+        try:
+            __, metadata = await self.apg._p.get_items(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                max_items=0,
+                rsm_request=rsm.RSMRequest(max_=0),
+                extra = {C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        try:
+            items_count = metadata["rsm"]["count"]
+        except KeyError:
+            log.warning(
+                f"No RSM metadata found when requesting pubsub node {node} at "
+                f"{account_jid}, defaulting to items_count=20"
+            )
+            items_count = 20
+
+        url = self.get_canonical_url(request)
+        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
+        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
+        return {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "id": url,
+            "totalItems": items_count,
+            "type": "OrderedCollection",
+            "first": url_first_page,
+            "last": url_last_page,
+        }
+
+    async def ap_inbox_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> None:
+        assert data is not None
+        if signing_actor is None:
+            raise exceptions.InternalError("signing_actor must be set for inbox requests")
+        await self.check_signing_actor(data, signing_actor)
+        activity_type = (data.get("type") or "").lower()
+        if not activity_type in ACTIVITY_TYPES_LOWER:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"request is not an activity, ignoring"
+            )
+
+        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()!r} activity must target an account"
+            )
+
+        try:
+            method = getattr(self, f"handle_{activity_type}_activity")
+        except AttributeError:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()} activity is not yet supported"
+            )
+        else:
+            await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+
+    async def ap_followers_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        client = self.apg.client
+        subscribers = await self.apg._pps.get_public_node_subscriptions(
+            client, account_jid, node
+        )
+        followers = []
+        for subscriber in subscribers.keys():
+            if self.apg.is_virtual_jid(subscriber):
+                # the subscriber is an AP user subscribed with this gateway
+                ap_account = self.apg._e.unescape(subscriber.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.get_ap_account_from_jid_and_node(subscriber, node)
+            followers.append(ap_account)
+
+        url = self.get_canonical_url(request)
+        return {
+          "@context": ["https://www.w3.org/ns/activitystreams"],
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscribers),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": followers
+          }
+        }
+
+    async def ap_following_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict[str, Any]:
+        client = self.apg.client
+        subscriptions = await self.apg._pps.subscriptions(
+            client, account_jid, node
+        )
+        following = []
+        for sub_dict in subscriptions:
+            service = jid.JID(sub_dict["service"])
+            if self.apg.is_virtual_jid(service):
+                # the subscription is to an AP actor with this gateway
+                ap_account = self.apg._e.unescape(service.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.get_ap_account_from_jid_and_node(
+                    service, sub_dict["node"]
+                )
+            following.append(ap_account)
+
+        url = self.get_canonical_url(request)
+        return {
+          "@context": ["https://www.w3.org/ns/activitystreams"],
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscriptions),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": following
+          }
+        }
+
+    def _get_to_log(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict] = None,
+    ) -> List[str]:
+        """Get base data to logs in verbose mode"""
+        from pprint import pformat
+        to_log = [
+            "",
+            f"<<< got {request.method.decode()} request - {request.uri.decode()}"
+        ]
+        if data is not None:
+            to_log.append(pformat(data))
+        if self.apg.verbose>=3:
+            headers = "\n".join(
+                f"    {k.decode()}: {v.decode()}"
+                for k,v in request.getAllHeaders().items()
+            )
+            to_log.append(f"  headers:\n{headers}")
+        return to_log
+
+    async def ap_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict] = None,
+        signing_actor: Optional[str] = None
+    ) -> None:
+        if self.apg.verbose:
+            to_log = self._get_to_log(request, data)
+
+        path = request.path.decode()
+        ap_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        request_type, extra_args = self.apg.parse_apurl(ap_url)
+        if ((MEDIA_TYPE_AP not in (request.getHeader("accept") or "")
+             and request_type in self.apg.html_redirect)):
+            # this is not a AP request, and we have a redirections for it
+            kw = {}
+            if extra_args:
+                kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0])
+                kw["jid_user"] = kw["jid"].user
+                if kw["node"] is None:
+                    kw["node"] = self.apg._m.namespace
+                if len(extra_args) > 1:
+                    kw["item"] = extra_args[1]
+                else:
+                    kw["item"] = ""
+            else:
+                kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", ""
+
+            redirections = self.apg.html_redirect[request_type]
+            for redirection in redirections:
+                filters = redirection["filters"]
+                if not filters:
+                    break
+                # if we have filter, they must all match
+                elif all(v in kw[k] for k,v in filters.items()):
+                    break
+            else:
+                # no redirection is matching
+                redirection = None
+
+            if redirection is not None:
+                kw = {k: parse.quote(str(v), safe="") for k,v in kw.items()}
+                target_url = redirection["url"].format(**kw)
+                content = web_util.redirectTo(target_url.encode(), request)
+                request.write(content)
+                request.finish()
+                return
+
+        if len(extra_args) == 0:
+            if request_type != "shared_inbox":
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            ret_data = await self.ap_inbox_request(
+                request, data, None, None, None, ap_url, signing_actor
+            )
+        elif request_type == "avatar":
+            if len(extra_args) != 1:
+                raise exceptions.DataError("avatar argument expected in URL")
+            avatar_filename = extra_args[0]
+            avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
+            return static.File(str(avatar_path)).render(request)
+        elif request_type == "item":
+            ret_data = await self.apg.ap_get_local_object(ap_url)
+            if "@context" not in ret_data:
+                ret_data["@context"] = [NS_AP]
+        else:
+            if len(extra_args) > 1:
+                log.warning(f"unexpected extra arguments: {extra_args!r}")
+            ap_account = extra_args[0]
+            account_jid, node = await self.apg.get_jid_and_node(ap_account)
+            if request_type not in AP_REQUEST_TYPES.get(
+                    request.method.decode().upper(), []
+            ):
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            method = getattr(self, f"AP{request_type.title()}Request")
+            ret_data = await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+        if ret_data is not None:
+            request.setHeader("content-type", CONTENT_TYPE_AP)
+            request.write(json.dumps(ret_data).encode())
+        if self.apg.verbose:
+            to_log.append(f"--- RET (code: {request.code})---")
+            if self.apg.verbose>=2:
+                if ret_data is not None:
+                    from pprint import pformat
+                    to_log.append(f"{pformat(ret_data)}")
+                    to_log.append("---")
+            log.info("\n".join(to_log))
+        request.finish()
+
+    async def ap_post_request(self, request: "HTTPRequest") -> None:
+        try:
+            data = json.load(request.content)
+            if not isinstance(data, dict):
+                log.warning(f"JSON data should be an object (uri={request.uri.decode()})")
+                self.response_code(
+                    request,
+                    http.BAD_REQUEST,
+                    f"invalid body, was expecting a JSON object"
+                )
+                request.finish()
+                return
+        except (json.JSONDecodeError, ValueError) as e:
+            self.response_code(
+                request,
+                http.BAD_REQUEST,
+                f"invalid json in inbox request: {e}"
+            )
+            request.finish()
+            return
+        else:
+            request.content.seek(0)
+
+        try:
+            if data["type"] == "Delete" and data["actor"] == data["object"]:
+                # we don't handle actor deletion
+                request.setResponseCode(http.ACCEPTED)
+                log.debug(f"ignoring actor deletion ({data['actor']})")
+                # TODO: clean data in cache coming from this actor, maybe with a tombstone
+                request.finish()
+                return
+        except KeyError:
+            pass
+
+        try:
+            signing_actor = await self.check_signature(request)
+        except exceptions.EncryptionError as e:
+            if self.apg.verbose:
+                to_log = self._get_to_log(request)
+                to_log.append(f"  body: {request.content.read()!r}")
+                request.content.seek(0)
+                log.info("\n".join(to_log))
+            self.response_code(
+                request,
+                http.FORBIDDEN,
+                f"invalid signature: {e}"
+            )
+            request.finish()
+            return
+        except Exception as e:
+            self.response_code(
+                request,
+                http.INTERNAL_SERVER_ERROR,
+                f"Can't check signature: {e}"
+            )
+            request.finish()
+            return
+
+        request.setResponseCode(http.ACCEPTED)
+
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            request.finish()
+            return
+        self._seen_digest.append(digest)
+
+        # default response code, may be changed, e.g. in case of exception
+        try:
+            return await self.ap_request(request, data, signing_actor)
+        except Exception as e:
+            self._on_request_error(failure.Failure(e), request)
+
+    async def check_signing_actor(self, data: dict, signing_actor: str) -> None:
+        """That that signing actor correspond to actor declared in data
+
+        @param data: request payload
+        @param signing_actor: actor ID of the signing entity, as returned by
+            check_signature
+        @raise exceptions.NotFound: no actor found in data
+        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
+        """
+        actor = await self.apg.ap_get_sender_actor(data)
+
+        if signing_actor != actor:
+            raise exceptions.EncryptionError(
+                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
+            )
+
+    async def check_signature(self, request: "HTTPRequest") -> str:
+        """Check and validate HTTP signature
+
+        @return: id of the signing actor
+
+        @raise exceptions.EncryptionError: signature is not present or doesn't match
+        """
+        signature = request.getHeader("Signature")
+        if signature is None:
+            raise exceptions.EncryptionError("No signature found")
+        sign_data = {
+            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
+            for m in RE_SIG_PARAM.finditer(signature)
+        }
+        try:
+            key_id = sign_data["keyId"]
+        except KeyError:
+            raise exceptions.EncryptionError('"keyId" is missing from signature')
+        algorithm = sign_data.get("algorithm", HS2019)
+        signed_headers = sign_data.get(
+            "headers",
+            "(created)" if algorithm==HS2019 else "date"
+        ).lower().split()
+        try:
+            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
+        except KeyError:
+            raise exceptions.InternalError(
+                f"there should be a list of headers for {request.method} method"
+            )
+        if not headers_to_check:
+            raise exceptions.InternalError("headers_to_check must not be empty")
+
+        for header in headers_to_check:
+            if isinstance(header, tuple):
+                if len(set(header).intersection(signed_headers)) == 0:
+                    raise exceptions.EncryptionError(
+                        f"at least one of following header must be signed: {header}"
+                    )
+            elif header not in signed_headers:
+                raise exceptions.EncryptionError(
+                    f"the {header!r} header must be signed"
+                )
+
+        body = request.content.read()
+        request.content.seek(0)
+        headers = {}
+        for to_sign in signed_headers:
+            if to_sign == "(request-target)":
+                method = request.method.decode().lower()
+                uri = request.uri.decode()
+                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
+            elif to_sign in ("(created)", "(expires)"):
+                if algorithm != HS2019:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
+                        "algorithm"
+                    )
+                key = to_sign[1:-1]
+                value = sign_data.get(key)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        "{key!r} parameter is missing from signature"
+                    )
+                try:
+                    if float(value) < 0:
+                        raise ValueError
+                except ValueError:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign} must be a Unix timestamp"
+                    )
+                headers[to_sign] = value
+            else:
+                value = request.getHeader(to_sign)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        f"value of header {to_sign!r} is missing!"
+                    )
+                elif to_sign == "host":
+                    # we check Forwarded/X-Forwarded-Host headers
+                    # as we need original host if a proxy has modified the header
+                    forwarded = request.getHeader("forwarded")
+                    if forwarded is not None:
+                        try:
+                            host = [
+                                f[5:] for f in forwarded.split(";")
+                                if f.startswith("host=")
+                            ][0] or None
+                        except IndexError:
+                            host = None
+                    else:
+                        host = None
+                    if host is None:
+                        host = request.getHeader("x-forwarded-host")
+                    if host:
+                        value = host
+                elif to_sign == "digest":
+                    hashes = {
+                        algo.lower(): hash_ for algo, hash_ in (
+                            digest.split("=", 1) for digest in value.split(",")
+                        )
+                    }
+                    try:
+                        given_digest = hashes["sha-256"]
+                    except KeyError:
+                        raise exceptions.EncryptionError(
+                            "Only SHA-256 algorithm is currently supported for digest"
+                        )
+                    __, computed_digest = self.apg.get_digest(body)
+                    if given_digest != computed_digest:
+                        raise exceptions.EncryptionError(
+                            f"SHA-256 given and computed digest differ:\n"
+                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
+                        )
+                headers[to_sign] = value
+
+        # date check
+        limit_ts = time.time() + SIGN_EXP
+        if "(created)" in headers:
+            created = float(headers["created"])
+        else:
+            created = date_utils.date_parse(headers["date"])
+
+
+        try:
+            expires = float(headers["expires"])
+        except KeyError:
+            pass
+        else:
+            if expires < created:
+                log.warning(
+                    f"(expires) [{expires}] set in the past of (created) [{created}] "
+                    "ignoring it according to specs"
+                )
+            else:
+                limit_ts = min(limit_ts, expires)
+
+        if created > limit_ts:
+            raise exceptions.EncryptionError("Signature has expired")
+
+        try:
+            return await self.apg.check_signature(
+                sign_data["signature"],
+                key_id,
+                headers
+            )
+        except exceptions.EncryptionError:
+            method, url = headers["(request-target)"].rsplit(' ', 1)
+            headers["(request-target)"] = f"{method} {parse.unquote(url)}"
+            log.debug(
+                "Using workaround for (request-target) encoding bug in signature, "
+                "see https://github.com/mastodon/mastodon/issues/18871"
+            )
+            return await self.apg.check_signature(
+                sign_data["signature"],
+                key_id,
+                headers
+            )
+
+    def render(self, request):
+        request.setHeader("server", VERSION)
+        return super().render(request)
+
+    def render_GET(self, request):
+        path = request.path.decode().lstrip("/")
+        if path.startswith(".well-known/webfinger"):
+            defer.ensureDeferred(self.webfinger(request))
+            return server.NOT_DONE_YET
+        elif path.startswith(self.apg.ap_path):
+            d = defer.ensureDeferred(self.ap_request(request))
+            d.addErrback(self._on_request_error, request)
+            return server.NOT_DONE_YET
+
+        return web_resource.NoResource().render(request)
+
+    def render_POST(self, request):
+        path = request.path.decode().lstrip("/")
+        if not path.startswith(self.apg.ap_path):
+            return web_resource.NoResource().render(request)
+        defer.ensureDeferred(self.ap_post_request(request))
+        return server.NOT_DONE_YET
+
+
+class HTTPRequest(server.Request):
+    pass
+
+
+class HTTPServer(server.Site):
+    requestFactory = HTTPRequest
+
+    def __init__(self, ap_gateway):
+        super().__init__(HTTPAPGServer(ap_gateway))