view tests/unit/test_ap-gateway.py @ 3813:1a10b8b4f169

core (memory/sqla): `session_add` must have a default value in `delete`
author Goffi <goffi@goffi.org>
date Wed, 29 Jun 2022 10:16:03 +0200
parents 04b57c0b2278
children 81c79b7cafa7
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from copy import deepcopy
from unittest.mock import MagicMock, AsyncMock, patch, DEFAULT
from urllib import parse
from functools import partial

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.web.server import Request
from twisted.words.xish import domish
from wokkel import rsm, pubsub
from treq.response import _Response as TReqResponse

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.plugins import plugin_comp_ap_gateway
from sat.plugins.plugin_comp_ap_gateway import constants as ap_const
from sat.plugins.plugin_comp_ap_gateway.http_server import HTTPServer
from sat.plugins.plugin_xep_0277 import NS_ATOM
from sat.plugins.plugin_xep_0422 import NS_FASTEN
from sat.plugins.plugin_xep_0424 import NS_MESSAGE_RETRACT
from sat.plugins.plugin_xep_0465 import NS_PPS
from sat.tools.utils import xmpp_date
from sat.tools import xml_tools
from sat.plugins.plugin_comp_ap_gateway import TYPE_ACTOR
from sat.memory.sqla_mapping import SubscriptionState


# Base URL of the fake remote ActivityPub server served by AP_REQUESTS
TEST_BASE_URL = "https://example.org"
# Local part of the fake remote ActivityPub account
TEST_USER = "test_user"
# Full handle (user@host) of the fake remote ActivityPub account
TEST_AP_ACCOUNT = f"{TEST_USER}@example.org"
# Actor ID (URL) of the fake remote ActivityPub account
TEST_AP_ACTOR_ID = f"{TEST_BASE_URL}/users/{TEST_USER}"
# Value assigned to ``gateway.public_url`` by the ``ap_gateway`` fixture
PUBLIC_URL = "test.example"
# JID used as sender of the XMPP message in the XMPP → AP direct message test
TEST_JID = jid.JID(f"some_user@{PUBLIC_URL}")

# Canned ActivityPub documents, indexed by the URL which would be requested.
# ``mock_ap_get`` looks URLs up in this mapping instead of doing HTTP requests.
# It holds, for TEST_USER and "ext_user": the webfinger answer and the actor
# document; and for TEST_USER only: the outbox (4 notes, most recent first),
# the "following" and the "followers" collections with their single page.
AP_REQUESTS = {
    f"{TEST_BASE_URL}/.well-known/webfinger?"
    f"resource=acct:{parse.quote(TEST_AP_ACCOUNT)}": {
        "aliases": [
            f"{TEST_BASE_URL}/@{TEST_USER}",
            f"{TEST_BASE_URL}/users/{TEST_USER}"
        ],
        "links": [
            {
                "href": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "rel": "self",
                "type": "application/activity+json"
            },
        ],
        "subject": f"acct:{TEST_AP_ACCOUNT}"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "endpoints": {
            "sharedInbox": f"{TEST_BASE_URL}/inbox"
        },
        "followers": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "following": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}",
        "inbox": f"{TEST_BASE_URL}/users/{TEST_USER}/inbox",
        "name": "",
        "outbox": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "preferredUsername": f"{TEST_USER}",
        "type": "Person",
        "url": f"{TEST_BASE_URL}/@{TEST_USER}"
    },
    f"{TEST_BASE_URL}/.well-known/webfinger?"
    f"resource=acct:{parse.quote('ext_user@example.org')}": {
        "aliases": [
            f"{TEST_BASE_URL}/@ext_user",
            f"{TEST_BASE_URL}/users/ext_user"
        ],
        "links": [
            {
                "href": f"{TEST_BASE_URL}/users/ext_user",
                "rel": "self",
                "type": "application/activity+json"
            },
        ],
        "subject": "acct:ext_user@example.org"
    },
    f"{TEST_BASE_URL}/users/ext_user": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "endpoints": {
            "sharedInbox": f"{TEST_BASE_URL}/inbox"
        },
        "followers": f"{TEST_BASE_URL}/users/ext_user/followers",
        "following": f"{TEST_BASE_URL}/users/ext_user/following",
        "id": f"{TEST_BASE_URL}/users/ext_user",
        "inbox": f"{TEST_BASE_URL}/users/ext_user/inbox",
        "name": "",
        "outbox": f"{TEST_BASE_URL}/users/ext_user/outbox",
        "preferredUsername": "ext_user",
        "type": "Person",
        "url": f"{TEST_BASE_URL}/@ext_user"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/outbox": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "last": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        "totalItems": 4,
        "type": "OrderedCollection"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        "orderedItems": [
            {
                "actor": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "cc": [
                    f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                ],
                "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/1/activity",
                "object": {
                    "attributedTo": f"{TEST_BASE_URL}/users/{TEST_USER}",
                    "cc": [
                        f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                    ],
                    "content": "<p>test message 1</p>",
                    "contentMap": {
                        "en": "<p>test message 1</p>"
                    },
                    "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/1",
                    "inReplyTo": None,
                    "published": "2021-12-16T17:28:03Z",
                    "sensitive": False,
                    "summary": None,
                    "tag": [],
                    "to": [
                        "https://www.w3.org/ns/activitystreams#Public"
                    ],
                    "type": "Note",
                    "url": f"{TEST_BASE_URL}/@{TEST_USER}/1"
                },
                "published": "2021-12-16T17:28:03Z",
                "to": [
                    "https://www.w3.org/ns/activitystreams#Public"
                ],
                "type": "Create"
            },
            {
                "actor": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "cc": [
                    f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                ],
                "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/2/activity",
                "object": {
                    "attributedTo": f"{TEST_BASE_URL}/users/{TEST_USER}",
                    "cc": [
                        f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                    ],
                    "content": "<p>test message 2</p>",
                    "contentMap": {
                        "en": "<p>test message 2</p>"
                    },
                    "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/2",
                    "inReplyTo": None,
                    "published": "2021-12-16T17:27:03Z",
                    "sensitive": False,
                    "summary": None,
                    "tag": [],
                    "to": [
                        "https://www.w3.org/ns/activitystreams#Public"
                    ],
                    "type": "Note",
                    "url": f"{TEST_BASE_URL}/@{TEST_USER}/2"
                },
                "published": "2021-12-16T17:27:03Z",
                "to": [
                    "https://www.w3.org/ns/activitystreams#Public"
                ],
                "type": "Create"
            },
            {
                "actor": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "cc": [
                    f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                ],
                "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/3/activity",
                "object": {
                    "attributedTo": f"{TEST_BASE_URL}/users/{TEST_USER}",
                    "cc": [
                        f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                    ],
                    "content": "<p>test message 3</p>",
                    "contentMap": {
                        "en": "<p>test message 3</p>"
                    },
                    "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/3",
                    "inReplyTo": None,
                    "published": "2021-12-16T17:26:03Z",
                    "sensitive": False,
                    "summary": None,
                    "tag": [],
                    "to": [
                        "https://www.w3.org/ns/activitystreams#Public"
                    ],
                    "type": "Note",
                    "url": f"{TEST_BASE_URL}/@{TEST_USER}/3"
                },
                "published": "2021-12-16T17:26:03Z",
                "to": [
                    "https://www.w3.org/ns/activitystreams#Public"
                ],
                "type": "Create"
            },
            {
                "actor": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "cc": [
                    f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                ],
                "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/4/activity",
                "object": {
                    "attributedTo": f"{TEST_BASE_URL}/users/{TEST_USER}",
                    "cc": [
                        f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
                    ],
                    "content": "<p>test message 4</p>",
                    "contentMap": {
                        "en": "<p>test message 4</p>"
                    },
                    "id": f"{TEST_BASE_URL}/users/{TEST_USER}/statuses/4",
                    "inReplyTo": None,
                    "published": "2021-12-16T17:25:03Z",
                    "sensitive": False,
                    "summary": None,
                    "tag": [],
                    "to": [
                        "https://www.w3.org/ns/activitystreams#Public"
                    ],
                    "type": "Note",
                    "url": f"{TEST_BASE_URL}/@{TEST_USER}/4"
                },
                "published": "2021-12-16T17:25:03Z",
                "to": [
                    "https://www.w3.org/ns/activitystreams#Public"
                ],
                "type": "Create"
            },
        ],
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "prev": None,
        "type": "OrderedCollectionPage"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/following": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "totalItems": 2,
        "type": "OrderedCollection"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1",
        "orderedItems": [
            f"{TEST_BASE_URL}/users/ext_user",
            f"https://{PUBLIC_URL}/_ap/{TYPE_ACTOR}/local_user%40{PUBLIC_URL}",
        ],
        # must be an f-string, otherwise the placeholders are kept literally
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "totalItems": 2,
        "type": "OrderedCollectionPage"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/followers": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "totalItems": 2,
        "type": "OrderedCollection"
    },
    f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1",
        "orderedItems": [
            f"{TEST_BASE_URL}/users/ext_user",
            f"https://{PUBLIC_URL}/_ap/{TYPE_ACTOR}/local_user%40{PUBLIC_URL}",
        ],
        # must be an f-string, otherwise the placeholders are kept literally
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "totalItems": 2,
        "type": "OrderedCollectionPage"
    },

}

# Template of a pubsub item holding an Atom entry, filled with ``str.format``.
# Expected fields are ``id``, ``publisher_jid``, ``updated`` and ``published``.
# The indentation is only there for readability: XMPP_ITEMS strips each line and
# joins them before parsing.
XMPP_ITEM_TPL = """
<item id='{id}' publisher='{publisher_jid}'>
  <entry xmlns='http://www.w3.org/2005/Atom' xml:lang='en'>
    <title type='xhtml'>
      <div xmlns='http://www.w3.org/1999/xhtml'>
        <p>
          XMPP item {id}
        </p>
      </div>
    </title>
    <title type='text'>
      XMPP item {id}
    </title>
    <author>
      <name>
        test_user
      </name>
      <uri>
        xmpp:{publisher_jid}
      </uri>
    </author>
    <updated>
      {updated}
    </updated>
    <published>
      {published}
    </published>
    <id>
      xmpp:{publisher_jid}?;node=urn%3Axmpp%3Amicroblog%3A0;item={id}
    </id>
  </entry>
</item>
"""

# Reference timestamp: item ``n`` is dated ``n`` minutes after it
ITEM_BASE_TS = 1643385499


def _build_xmpp_item(idx):
    """Build the parsed pubsub item number ``idx`` from XMPP_ITEM_TPL

    @param idx: item number, used as item ID and as offset (in minutes) from
        ITEM_BASE_TS for both "updated" and "published" dates
    @return: element returned by ``xml_tools.parse`` for the filled template
    """
    item_date = xmpp_date(ITEM_BASE_TS + idx * 60)
    filled_tpl = XMPP_ITEM_TPL.format(
        id=idx,
        publisher_jid="some_user@test.example",
        updated=item_date,
        published=item_date,
    )
    # each line is stripped and all lines are joined, so the indentation of the
    # template doesn't end up in the parsed text nodes
    compact_xml = "".join(line.strip() for line in filled_tpl.split("\n"))
    return xml_tools.parse(compact_xml, namespace=pubsub.NS_PUBSUB)


# 4 items, with IDs from 1 to 4
XMPP_ITEMS = [_build_xmpp_item(idx) for idx in range(1, 5)]


async def mock_ap_get(url):
    """Return a deep copy of the canned AP document registered for ``url``

    A copy is returned so the caller can modify it without altering AP_REQUESTS.

    @param url: requested URL, must be a key of AP_REQUESTS
    @return: deep copy of the matching document
    @raise KeyError: ``url`` is not in AP_REQUESTS
    """
    canned_doc = AP_REQUESTS[url]
    return deepcopy(canned_doc)


async def mock_treq_json(data):
    """Return a shallow ``dict`` copy of ``data``

    Used to replace ``treq.json_content`` in tests.

    @param data: mapping (or anything accepted by ``dict``)
    @return: new dict with the same items
    """
    json_data = dict(data)
    return json_data


async def mock_getItems(*args, **kwargs):
    """Return canned pubsub items with matching RSM metadata

    Positional arguments and other keyword arguments are ignored.

    @param ret_items: (keyword only) items to return, XMPP_ITEMS by default;
        must not be empty as first and last items are used for RSM
    @return: tuple of items and metadata; metadata hold the RSM response as a dict
        under "rsm" key, and "complete" is always True
    """
    items = kwargs.pop("ret_items", XMPP_ITEMS)
    first_id, last_id = items[0]["id"], items[-1]["id"]
    rsm_resp = rsm.RSMResponse(
        first=first_id, last=last_id, index=0, count=len(items)
    )
    metadata = {"rsm": rsm_resp.toDict(), "complete": True}
    return items, metadata


def getVirtualClient(jid):
    """Return a new mocked client whose ``jid`` attribute is the given one

    @param jid: value to set as ``jid`` attribute of the mock
    @return: a fresh MagicMock instance
    """
    # keyword arguments of the mock constructor are set as attributes
    return MagicMock(jid=jid)


class FakeTReqPostResponse:
    """Minimal fake response object, exposing only a ``code`` attribute"""

    # status code reported by the fake response
    code = 202


@pytest.fixture(scope="session")
def ap_gateway(host):
    """Session-wide AP gateway instance, configured by hand with a mocked client

    @param host: value given to the APGateway constructor (``host`` fixture)
    @return: APGateway instance with ``local_only`` set, ``isPubsub`` mocked to
        answer False, and an HTTPServer attached as ``server``
    """
    gateway = plugin_comp_ap_gateway.APGateway(host)
    gateway.initialised = True
    # by default, no entity is seen as a pubsub service
    gateway.isPubsub = AsyncMock(return_value=False)

    component_client = MagicMock()
    component_client.configure_mock(
        jid=jid.JID("ap.test.example"),
        host="test.example",
        getVirtualClient=getVirtualClient,
    )
    gateway.client = component_client

    gateway.local_only = True
    gateway.public_url = PUBLIC_URL
    gateway.ap_path = '_ap'
    public_root = f"https://{gateway.public_url}"
    gateway.base_ap_url = parse.urljoin(public_root, f"{gateway.ap_path}/")
    gateway.server = HTTPServer(gateway)
    return gateway


class TestActivityPubGateway:

    def getTitleXHTML(self, item_elt: domish.Element) -> domish.Element:
        """Retrieve the XHTML title of the Atom entry of an item

        @param item_elt: item element, with the Atom entry as ``entry`` child
        @return: first Atom <title/> whose "type" attribute is "xhtml"
        @raise StopIteration: the entry has no XHTML title
        """
        for title_elt in item_elt.entry.elements(NS_ATOM, "title"):
            if title_elt.getAttribute("type") == "xhtml":
                return title_elt
        raise StopIteration


    @ed
    async def test_jid_and_node_convert_to_ap_handle(self, ap_gateway):
        """JID and pubsub node are converted correctly to an AP actor handle"""
        to_account = ap_gateway.getAPAccountFromJidAndNode

        # local jid
        account = await to_account(jid_=jid.JID("simple@test.example"), node=None)
        assert account == "simple@test.example"

        # non local jid
        account = await to_account(jid_=jid.JID("simple@example.org"), node=None)
        assert account == "___simple.40example.2eorg@ap.test.example"

        # local jid with non microblog node
        account = await to_account(
            jid_=jid.JID("simple@test.example"), node="some_other_node"
        )
        assert account == "some_other_node---simple@test.example"

        # local pubsub node
        with patch.object(ap_gateway, "isPubsub", return_value=True):
            account = await to_account(
                jid_=jid.JID("pubsub.test.example"), node="some_node"
            )
            assert account == "some_node@pubsub.test.example"

        # non local pubsub node
        with patch.object(ap_gateway, "isPubsub", return_value=True):
            account = await to_account(
                jid_=jid.JID("pubsub.example.org"), node="some_node"
            )
            assert account == "___some_node.40pubsub.2eexample.2eorg@ap.test.example"

    @ed
    async def test_ap_handle_convert_to_jid_and_node(self, ap_gateway, monkeypatch):
        """AP actor handle convert correctly to JID and pubsub node"""
        to_jid_node = ap_gateway.getJIDAndNode

        # first batch of checks: the host is NOT a pubsub service
        with patch.object(ap_gateway, "isPubsub", return_value=False):
            # simple local jid
            jid_node = await to_jid_node("toto@test.example")
            assert jid_node == (jid.JID("toto@test.example"), None)

            # simple external jid: it must be rejected while "local_only" is set…
            with pytest.raises(exceptions.PermissionError):
                await to_jid_node("toto@example.org")

            # …and accepted once "local_only" is unset
            with monkeypatch.context() as m:
                m.setattr(ap_gateway, "local_only", False, raising=True)
                jid_node = await to_jid_node("toto@example.org")
                assert jid_node == (jid.JID("toto@example.org"), None)

            # explicit node
            jid_node = await to_jid_node("tata---toto@test.example")
            assert jid_node == (jid.JID("toto@test.example"), "tata")

        # second batch of checks: the host IS a pubsub service
        with patch.object(ap_gateway, "isPubsub", return_value=True):
            pubsub_jid = jid.JID("pubsub.test.example")

            # simple local node
            jid_node = await to_jid_node("toto@pubsub.test.example")
            assert jid_node == (pubsub_jid, "toto")

            # encoded local node
            jid_node = await to_jid_node(
                "___urn.3axmpp.3amicroblog.3a0@pubsub.test.example"
            )
            assert jid_node == (pubsub_jid, "urn:xmpp:microblog:0")

    @ed
    async def test_ap_to_pubsub_conversion(self, ap_gateway, monkeypatch):
        """AP requests are converted to pubsub"""
        for target, name, replacement in (
            (plugin_comp_ap_gateway.treq, "get", mock_ap_get),
            (plugin_comp_ap_gateway.treq, "json_content", mock_treq_json),
            (ap_gateway, "apGet", mock_ap_get),
        ):
            monkeypatch.setattr(target, name, replacement)

        status_url = "https://example.org/users/test_user/statuses/{}".format
        # publication date of each "test message <n>" note of the fixture
        published = {
            1: "2021-12-16T17:28:03Z",
            2: "2021-12-16T17:27:03Z",
            3: "2021-12-16T17:26:03Z",
            4: "2021-12-16T17:25:03Z",
        }

        def check_rsm(rsm_resp, index, first, last):
            # the outbox always holds 4 items, only the window changes
            assert rsm_resp.count == 4
            assert rsm_resp.index == index
            assert rsm_resp.first == status_url(first)
            assert rsm_resp.last == status_url(last)

        def check_title(item_elt, msg_idx):
            expected_xml = (
                "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
                "<div xmlns='http://www.w3.org/1999/xhtml'>"
                f"<p>test message {msg_idx}</p></div>"
                "</title>"
            )
            assert self.getTitleXHTML(item_elt).toXml() == expected_xml

        def check_author(item_elt):
            uri_elts = [
                e for e in item_elt.entry.author.elements() if e.name == "uri"
            ]
            assert str(uri_elts[0]) == (
                "xmpp:test_user\\40example.org@ap.test.example"
            )

        def check_published(item_elt, msg_idx):
            assert str(item_elt.entry.published) == published[msg_idx]

        def check_full_item(item_elt, msg_idx):
            check_title(item_elt, msg_idx)
            check_author(item_elt)
            check_published(item_elt, msg_idx)

        actor_data = await ap_gateway.getAPActorDataFromAccount(TEST_AP_ACCOUNT)
        outbox = await ap_gateway.apGetObject(actor_data, "outbox")

        # first 2 items
        items, rsm_resp = await ap_gateway.getAPItems(outbox, 2)
        check_rsm(rsm_resp, index=0, first=4, last=3)
        check_full_item(items[0], 4)
        check_full_item(items[1], 3)

        # 2 items following the last one retrieved above
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=2,
            after_id="https://example.org/users/test_user/statuses/3",
        )
        check_rsm(rsm_resp, index=2, first=2, last=1)
        check_full_item(items[0], 2)
        check_full_item(items[1], 1)

        # single item, selected by index
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=1,
            start_index=2
        )
        check_rsm(rsm_resp, index=2, first=2, last=2)
        assert len(items) == 1
        check_title(items[0], 2)
        check_published(items[0], 2)

        # 3 items without chronological pagination
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=3,
            chronological_pagination=False
        )
        check_rsm(rsm_resp, index=1, first=3, last=1)
        assert len(items) == 3
        check_title(items[0], 3)
        check_title(items[2], 1)

    def ap_request_params(self, ap_gateway, type_=None, url=None, query_data=None):
        """Generate parameters for HTTPAPGServer's AP*Request

        Exactly one of ``type_`` and ``url`` must be specified.

        @param ap_gateway: gateway under test (its ``public_url`` is used to build
            the AP URL)
        @param type_: one of the AP query type (e.g. "outbox")
        @param url: URL to query (mutually exclusive with type_)
        @param query_data: query data as returned by parse.parse_qs
            when None and ``url`` is used, it is parsed from the URL query
        @return dict with kw params to use
        """
        assert type_ and url is None or url and type_ is None
        if type_ is None:
            # path, request type and (if not specified) query come from the URL
            url_parsed = parse.urlparse(url)
            path = url_parsed.path.lstrip("/")
            type_ = path.split("/")[1]
            if query_data is None:
                query_data = parse.parse_qs(url_parsed.query)
        else:
            path = f"_ap/{type_}/some_user%40test.example"

        uri = (
            f"{path}?{parse.urlencode(query_data, doseq=True)}"
            if query_data else path
        )

        request = Request(MagicMock())
        request.path = path.encode()
        request.uri = uri.encode()

        test_jid = jid.JID("some_user@test.example")
        kwargs = {
            "request": request,
            "account_jid": test_jid,
            "node": None,
            "ap_account": test_jid.full(),
            "ap_url": parse.urljoin(f"https://{ap_gateway.public_url}", path),
            "signing_actor": None
        }
        if type_ == "outbox" and query_data:
            # signing_actor is not used for page requests
            kwargs.pop("signing_actor")
            kwargs["query_data"] = query_data
        return kwargs

    @ed
    async def test_pubsub_to_ap_conversion(self, ap_gateway, monkeypatch):
        """Pubsub nodes are converted to AP collections"""
        monkeypatch.setattr(ap_gateway._p, "getItems", mock_getItems)
        as_context = "https://www.w3.org/ns/activitystreams"
        outbox_url = "https://test.example/_ap/outbox/some_user%40test.example"

        # the collection itself
        outbox_params = self.ap_request_params(ap_gateway, "outbox")
        outbox = await ap_gateway.server.resource.APOutboxRequest(**outbox_params)
        assert outbox["@context"] == as_context
        assert outbox["id"] == outbox_url
        assert outbox["totalItems"] == len(XMPP_ITEMS)
        assert outbox["type"] == "OrderedCollection"
        assert outbox["first"]
        assert outbox["last"]

        # first page of the collection
        page_params = self.ap_request_params(ap_gateway, url=outbox["first"])
        first_page = await ap_gateway.server.resource.APOutboxPageRequest(
            **page_params
        )
        assert first_page["@context"] == as_context
        assert first_page["id"] == f"{outbox_url}?page=first"
        assert first_page["type"] == "OrderedCollectionPage"
        assert first_page["partOf"] == outbox["id"]
        ordered_items = first_page["orderedItems"]
        assert len(ordered_items) == len(XMPP_ITEMS)

        # first item of the page: last XMPP item, wrapped in a "Create" activity
        first_item = ordered_items[0]
        assert first_item["@context"] == as_context
        assert first_item["id"] == (
            "https://test.example/_ap/item/some_user%40test.example/4"
        )
        assert first_item["actor"] == (
            "https://test.example/_ap/actor/some_user%40test.example"
        )
        assert first_item["type"] == "Create"

        note = first_item["object"]
        assert note["id"] == first_item["id"]
        assert note["type"] == "Note"
        assert note["published"] == "2022-01-28T16:02:19Z"
        assert note["attributedTo"] == first_item["actor"]
        assert note["content"] == "<div><p>XMPP item 4</p></div>"
        assert note["to"] == ["https://www.w3.org/ns/activitystreams#Public"]

    @ed
    async def test_following_to_pps(self, ap_gateway, monkeypatch):
        """AP following items are converted to Public Pubsub Subscription subscriptions"""
        for target, name, replacement in (
            (plugin_comp_ap_gateway.treq, "get", mock_ap_get),
            (plugin_comp_ap_gateway.treq, "json_content", mock_treq_json),
            (ap_gateway, "apGet", mock_ap_get),
        ):
            monkeypatch.setattr(target, name, replacement)

        items, __ = await ap_gateway.pubsub_service.items(
            jid.JID("toto@example.org"),
            ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT),
            ap_gateway._pps.subscriptions_node,
            None,
            None,
            None
        )

        expected_services = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example",
        )
        assert len(items) == 2
        # items are expected in the same order as the expected services
        for item_elt, service in zip(items, expected_services):
            subscription_elt = next(item_elt.elements(NS_PPS, "subscription"), None)
            assert subscription_elt is not None
            assert subscription_elt["node"] == ap_gateway._m.namespace
            assert subscription_elt["service"] == service

    @ed
    async def test_followers_to_pps(self, ap_gateway, monkeypatch):
        """AP followers items are converted to Public Pubsub Subscription subscribers"""
        for target, name, replacement in (
            (plugin_comp_ap_gateway.treq, "get", mock_ap_get),
            (plugin_comp_ap_gateway.treq, "json_content", mock_treq_json),
            (ap_gateway, "apGet", mock_ap_get),
        ):
            monkeypatch.setattr(target, name, replacement)

        items, __ = await ap_gateway.pubsub_service.items(
            jid.JID("toto@example.org"),
            ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT),
            ap_gateway._pps.getPublicSubscribersNode(ap_gateway._m.namespace),
            None,
            None,
            None
        )

        expected_subscribers = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example",
        )
        assert len(items) == 2
        # items are expected in the same order as the expected subscribers
        for item_elt, subscriber in zip(items, expected_subscribers):
            subscriber_elt = next(item_elt.elements(NS_PPS, "subscriber"), None)
            assert subscriber_elt is not None
            assert subscriber_elt["jid"] == subscriber

    @ed
    async def test_pps_to_following(self, ap_gateway, monkeypatch):
        """Public Pubsub Subscription subscriptions are converted to AP following items"""
        subscribed_services = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example",
        )
        # one subscription item per service: "subscription_1", "subscription_2"
        subscriptions = [
            pubsub.Item(
                id=f"subscription_{idx}",
                payload=ap_gateway._pps.buildSubscriptionElt(
                    ap_gateway._m.namespace,
                    jid.JID(service)
                )
            )
            for idx, service in enumerate(subscribed_services, start=1)
        ]
        monkeypatch.setattr(
            ap_gateway._p,
            "getItems",
            partial(mock_getItems, ret_items=subscriptions)
        )

        request_params = self.ap_request_params(ap_gateway, "following")
        following = await ap_gateway.server.resource.APFollowingRequest(
            **request_params
        )
        assert following["@context"] == "https://www.w3.org/ns/activitystreams"
        assert following["id"] == (
            "https://test.example/_ap/following/some_user%40test.example"
        )
        assert following["totalItems"] == len(subscriptions)
        assert following["type"] == "OrderedCollection"
        assert following.get("first")

        # the first page is embedded in the collection
        first_page = following["first"]
        assert first_page["type"] == "OrderedCollectionPage"
        ordered_items = first_page["orderedItems"]
        assert len(ordered_items) == len(subscriptions)
        assert ordered_items == ['local_user@test.example', 'ext_user@example.org']

    @ed
    async def test_pps_to_followers(self, ap_gateway, monkeypatch):
        """Public Pubsub Subscription subscribers are converted to AP followers"""
        subscriber_jids = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example",
        )
        # one subscriber item per JID: "subscriber_1", "subscriber_2"
        subscribers = [
            pubsub.Item(
                id=f"subscriber_{idx}",
                payload=ap_gateway._pps.buildSubscriberElt(jid.JID(subscriber))
            )
            for idx, subscriber in enumerate(subscriber_jids, start=1)
        ]
        monkeypatch.setattr(
            ap_gateway._p,
            "getItems",
            partial(mock_getItems, ret_items=subscribers)
        )

        request_params = self.ap_request_params(ap_gateway, "followers")
        followers = await ap_gateway.server.resource.APFollowersRequest(
            **request_params
        )
        assert followers["@context"] == "https://www.w3.org/ns/activitystreams"
        assert followers["id"] == (
            "https://test.example/_ap/followers/some_user%40test.example"
        )
        assert followers["totalItems"] == len(subscribers)
        assert followers["type"] == "OrderedCollection"
        assert followers.get("first")

        # the first page is embedded in the collection
        first_page = followers["first"]
        assert first_page["type"] == "OrderedCollectionPage"
        ordered_items = first_page["orderedItems"]
        assert len(ordered_items) == len(subscribers)
        assert ordered_items == ['local_user@test.example', 'ext_user@example.org']

    @ed
    async def test_xmpp_message_to_ap_direct_message(self, ap_gateway, monkeypatch):
        """A ``chat`` message handled by ``onMessage`` is posted as AP direct message"""
        # treq helpers and ``apGet`` are replaced by the mocks of this module
        treq_module = plugin_comp_ap_gateway.treq
        monkeypatch.setattr(treq_module, "get", mock_ap_get)
        monkeypatch.setattr(treq_module, "json_content", mock_treq_json)
        monkeypatch.setattr(ap_gateway, "apGet", mock_ap_get)

        recipient_jid = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        mess_data = {
            "from": TEST_JID,
            "to": recipient_jid,
            "type": "chat",
            "message": {"": "This is a test message."},
            "extra": {"origin-id": "123"},
        }
        with patch.object(ap_gateway, "signAndPost") as sign_and_post:
            await ap_gateway.onMessage(ap_gateway.client, mess_data)
            # positional arguments of the last ``signAndPost`` call
            url, actor_id, doc = sign_and_post.call_args.args

        expected_actor = "https://test.example/_ap/actor/some_user%40test.example"
        expected_recipients = ["https://example.org/users/test_user"]

        assert url == "https://example.org/users/test_user/inbox"
        assert actor_id == expected_actor
        obj = doc["object"]
        assert doc["@context"] == "https://www.w3.org/ns/activitystreams"
        assert doc["actor"] == expected_actor
        assert obj["type"] == "Note"
        assert obj["content"] == "This is a test message."
        assert obj["attributedTo"] == expected_actor

        # a direct message must only be addressed to the destinee ("to" attribute of
        # the message): no other addressing field may be present, neither in the
        # activity nor in its object
        assert doc["to"] == expected_recipients
        assert obj["to"] == expected_recipients
        for payload in (doc, obj):
            for field in ("bto", "cc", "bcc", "audience"):
                assert field not in payload

    @ed
    async def test_ap_direct_message_to_xmpp_message(self, ap_gateway, monkeypatch):
        """An AP ``Note`` addressed only to an XMPP actor triggers ``sendMessage``"""
        # treq helpers and ``apGet`` are replaced by the mocks of this module
        treq_module = plugin_comp_ap_gateway.treq
        monkeypatch.setattr(treq_module, "get", mock_ap_get)
        monkeypatch.setattr(treq_module, "json_content", mock_treq_json)
        monkeypatch.setattr(ap_gateway, "apGet", mock_ap_get)
        # DeferredList is patched too, otherwise the test would wait forever
        monkeypatch.setattr(defer, "DeferredList", AsyncMock())

        xmpp_actor_id = ap_gateway.buildAPURL(ap_const.TYPE_ACTOR, TEST_JID.userhost())
        html_content = "<p>test direct message</p>"
        direct_ap_message = {
            "attributedTo": TEST_AP_ACTOR_ID,
            "cc": [],
            "content": html_content,
            "contentMap": {"en": html_content},
            "id": f"{TEST_AP_ACTOR_ID}/statuses/123",
            "published": "2022-05-20T08:14:39Z",
            "to": [xmpp_actor_id],
            "type": "Note",
        }
        gateway_jid = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        client = ap_gateway.client.getVirtualClient(gateway_jid)
        with patch.object(client, "sendMessage") as send_message:
            await ap_gateway.newAPItem(
                client, None, ap_gateway._m.namespace, direct_ap_message
            )

        # a <message> stanza must have been requested through sendMessage: first
        # positional argument is the destinee, second one is the "message" mapping
        # holding the content of the original AP message
        assert send_message.called
        positional = send_message.call_args.args
        assert positional[0] == TEST_JID
        assert positional[1] == {"": "test direct message"}

    @ed
    async def test_pubsub_retract_to_ap_delete(self, ap_gateway, monkeypatch):
        """A pubsub ``retract`` event is posted as an AP ``Delete`` activity"""
        # treq helpers and ``apGet`` are replaced by the mocks of this module
        treq_module = plugin_comp_ap_gateway.treq
        monkeypatch.setattr(treq_module, "get", mock_ap_get)
        monkeypatch.setattr(treq_module, "json_content", mock_treq_json)
        monkeypatch.setattr(ap_gateway, "apGet", mock_ap_get)

        # build the items event carrying a single <retract/> element
        retract_id = "retract_123"
        retract_elt = domish.Element((pubsub.NS_PUBSUB_EVENT, "retract"))
        retract_elt["id"] = retract_id
        event_recipient = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        items_event = pubsub.ItemsEvent(
            sender=TEST_JID,
            recipient=event_recipient,
            nodeIdentifier=ap_gateway._m.namespace,
            items=[retract_elt],
            headers={},
        )

        with patch.object(ap_gateway, "signAndPost") as sign_and_post:
            sign_and_post.return_value = FakeTReqPostResponse()
            # reception of the retract event is simulated
            await ap_gateway._itemsReceived(ap_gateway.client, items_event)
            url, actor_id, doc = sign_and_post.call_args.args

        # expected values are derived from the JID which sent the retract
        jid_account = await ap_gateway.getAPAccountFromJidAndNode(TEST_JID, None)
        jid_actor_id = ap_gateway.buildAPURL(ap_const.TYPE_ACTOR, jid_account)
        assert url == f"{TEST_BASE_URL}/users/{TEST_USER}/inbox"
        assert actor_id == jid_actor_id
        assert doc["type"] == "Delete"
        assert doc["actor"] == jid_actor_id
        deleted_obj = doc["object"]
        assert deleted_obj["type"] == ap_const.TYPE_TOMBSTONE
        expected_item_url = ap_gateway.buildAPURL(
            ap_const.TYPE_ITEM, jid_account, retract_id
        )
        assert deleted_obj["id"] == expected_item_url

    @ed
    async def test_ap_delete_to_pubsub_retract(self, ap_gateway):
        """An AP ``Delete`` activity deletes the cached item and notifies a retract"""
        gateway_jid = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        client = ap_gateway.client.getVirtualClient(gateway_jid)

        ap_item = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "actor": TEST_AP_ACTOR_ID,
            "id": "https://test.example/retract_123",
            "type": "Delete",
            "object": {
                "id": f"{TEST_AP_ACTOR_ID}/item/123",
                "type": "Tombstone",
            },
            "to": ["https://www.w3.org/ns/activitystreams#Public"],
        }

        storage = ap_gateway.host.memory.storage
        with patch.multiple(
            storage, get=DEFAULT, getPubsubNode=DEFAULT, deletePubsubItems=DEFAULT
        ) as storage_mocks:
            # ``storage.get`` finds nothing, while ``getPubsubNode`` returns a fake
            # node with a single subscribed entity
            storage_mocks["get"].return_value = None
            subscription = MagicMock(
                state=SubscriptionState.SUBSCRIBED,
                subscriber=TEST_JID,
            )
            cached_node = MagicMock(subscriptions=[subscription])
            storage_mocks["getPubsubNode"].return_value = cached_node
            with patch.object(
                ap_gateway.pubsub_service, "notifyRetract"
            ) as notify_retract:
                # reception of the Delete activity is simulated
                await ap_gateway.newAPDeleteItem(
                    client=client,
                    destinee=None,
                    node=ap_gateway._m.namespace,
                    item=ap_item,
                )

        # the item must have been deleted from database
        delete_items = storage_mocks["deletePubsubItems"]
        assert delete_items.call_count == 1
        assert delete_items.call_args.args[1] == [ap_item["id"]]

        # and the retraction must have been notified to the subscriber
        assert notify_retract.call_count == 1
        notify_args = notify_retract.call_args.args
        assert notify_args[0] == client.jid
        assert notify_args[1] == ap_gateway._m.namespace
        notifications = notify_args[2]
        assert len(notifications) == 1
        subscriber, _ignored, item_elts = notifications[0]
        assert subscriber == TEST_JID
        assert len(item_elts) == 1
        retracted_elt = item_elts[0]
        assert isinstance(retracted_elt, domish.Element)
        assert retracted_elt.name == "item"
        assert retracted_elt["id"] == ap_item["id"]

    @ed
    async def test_message_retract_to_ap_delete(self, ap_gateway, monkeypatch):
        """A message retraction request is posted as an AP ``Delete`` activity"""
        # treq helpers and ``apGet`` are replaced by the mocks of this module
        treq_module = plugin_comp_ap_gateway.treq
        monkeypatch.setattr(treq_module, "get", mock_ap_get)
        monkeypatch.setattr(treq_module, "json_content", mock_treq_json)
        monkeypatch.setattr(ap_gateway, "apGet", mock_ap_get)

        # ID of the message to retract
        origin_id = "mess_retract_123"

        # retractByOriginId is called with a fake client, so the message element of
        # the retraction request can be read back from the mocked ``send``
        fake_client = MagicMock(jid=TEST_JID)
        dest_jid = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        ap_gateway._r.retractByOriginId(fake_client, dest_jid, origin_id)
        message_retract_elt = fake_client.send.call_args.args[0]
        fastening_elts = message_retract_elt.elements(NS_FASTEN, "apply-to")
        apply_to_elt = next(fastening_elts)
        retract_elt = apply_to_elt.retract

        fake_fastened_elts = MagicMock(id=origin_id)
        with patch.object(ap_gateway, "signAndPost") as sign_and_post:
            sign_and_post.return_value = FakeTReqPostResponse()
            # reception of the retraction is simulated with the message element
            # generated above
            await ap_gateway._onMessageRetract(
                ap_gateway.client,
                message_retract_elt,
                retract_elt,
                fake_fastened_elts,
            )
            url, actor_id, doc = sign_and_post.call_args.args

        # the AP Delete activity goes through signAndPost, its values are checked
        # against the ones derived from the retracting JID
        jid_account = await ap_gateway.getAPAccountFromJidAndNode(TEST_JID, None)
        jid_actor_id = ap_gateway.buildAPURL(ap_const.TYPE_ACTOR, jid_account)
        assert url == f"{TEST_BASE_URL}/users/{TEST_USER}/inbox"
        assert actor_id == jid_actor_id
        assert doc["type"] == "Delete"
        assert doc["actor"] == jid_actor_id
        deleted_obj = doc["object"]
        assert deleted_obj["type"] == ap_const.TYPE_TOMBSTONE
        expected_item_url = ap_gateway.buildAPURL(
            ap_const.TYPE_ITEM, jid_account, origin_id
        )
        assert deleted_obj["id"] == expected_item_url

    @ed
    async def test_ap_delete_to_message_retract(self, ap_gateway, monkeypatch):
        """AP ``Delete`` of a message found in history is sent as a message retract

        A message retract is used when a suitable message is found in history,
        otherwise the item should be in pubsub cache and a pubsub retract is done
        (that case is covered by ``test_ap_delete_to_pubsub_retract``).
        """
        # no actual database query is wanted
        retractDBHistory = AsyncMock()
        monkeypatch.setattr(ap_gateway._r, "retractDBHistory", retractDBHistory)

        gateway_jid = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        client = ap_gateway.client.getVirtualClient(gateway_jid)
        fake_send = MagicMock()
        monkeypatch.setattr(client, "send", fake_send)

        ap_item = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "actor": TEST_AP_ACTOR_ID,
            "id": "https://test.example/retract_123",
            "type": "Delete",
            "object": {
                "id": f"{TEST_AP_ACTOR_ID}/item/123",
                "type": "Tombstone",
            },
            "to": ["https://www.w3.org/ns/activitystreams#Public"],
        }
        # history entry which will be returned by the patched ``storage.get``
        fake_history = MagicMock(
            source_jid=client.jid,
            dest_jid=TEST_JID,
            origin_id=ap_item["id"],
        )
        with patch.object(ap_gateway.host.memory.storage, "get") as storage_get:
            storage_get.return_value = fake_history
            # reception of the Delete activity is simulated
            await ap_gateway.newAPDeleteItem(
                client=client,
                destinee=None,
                node=ap_gateway._m.namespace,
                item=ap_item,
            )

        # the history entry must have been retracted from database
        assert retractDBHistory.call_count == 1
        retract_db_args = retractDBHistory.call_args.args
        assert retract_db_args[0] == client
        assert retract_db_args[1] == fake_history

        # and a retraction message must have been sent to the destinee
        assert fake_send.call_count == 1
        sent_elt = fake_send.call_args.args[0]
        assert sent_elt.name == "message"
        assert sent_elt["from"] == client.jid.full()
        assert sent_elt["to"] == TEST_JID.full()
        apply_to_elt = next(sent_elt.elements(NS_FASTEN, "apply-to"))
        assert apply_to_elt["id"] == ap_item["id"]
        retract_elt = apply_to_elt.retract
        assert retract_elt is not None
        assert retract_elt.uri == NS_MESSAGE_RETRACT