Mercurial > libervia-backend
view tests/unit/test_ap-gateway.py @ 3794:a58715ffa445
core (constants): type hints
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 14:15:23 +0200 |
parents | 0b54be42d0aa |
children | 39fc2e1b3793 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from copy import deepcopy
from unittest.mock import MagicMock, AsyncMock, patch
from urllib import parse
from functools import partial

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.web.server import Request
from twisted.words.xish import domish
from wokkel import rsm, pubsub

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.plugins import plugin_comp_ap_gateway
from sat.plugins.plugin_comp_ap_gateway import constants as ap_const
from sat.plugins.plugin_comp_ap_gateway.http_server import HTTPServer
from sat.plugins.plugin_xep_0277 import NS_ATOM
from sat.plugins.plugin_xep_0465 import NS_PPS
from sat.tools.utils import xmpp_date
from sat.tools import xml_tools
from sat.plugins.plugin_comp_ap_gateway import TYPE_ACTOR


TEST_BASE_URL = "https://example.org"
TEST_USER = "test_user"
TEST_AP_ACCOUNT = f"{TEST_USER}@example.org"
TEST_AP_ACTOR_ID = f"{TEST_BASE_URL}/users/{TEST_USER}"
PUBLIC_URL = "test.example"
TEST_JID = jid.JID(f"some_user@{PUBLIC_URL}")


def _build_create_activity(idx: int, published: str) -> dict:
    """Build the "Create" activity wrapping test note number ``idx``

    All notes of the fake outbox share the same structure and only differ by their
    index (used in IDs, URL and content) and their publication date.

    @param idx: index of the note
    @param published: publication date used for both the activity and the note
    @return: activity as it appears in "orderedItems" of the outbox page
    """
    status_id = f"{TEST_AP_ACTOR_ID}/statuses/{idx}"
    content = f"<p>test message {idx}</p>"
    return {
        "actor": TEST_AP_ACTOR_ID,
        "cc": [
            f"{TEST_AP_ACTOR_ID}/followers",
        ],
        "id": f"{status_id}/activity",
        "object": {
            "attributedTo": TEST_AP_ACTOR_ID,
            "cc": [
                f"{TEST_AP_ACTOR_ID}/followers",
            ],
            "content": content,
            "contentMap": {
                "en": content
            },
            "id": status_id,
            "inReplyTo": None,
            "published": published,
            "sensitive": False,
            "summary": None,
            "tag": [],
            "to": [
                "https://www.w3.org/ns/activitystreams#Public"
            ],
            "type": "Note",
            "url": f"{TEST_BASE_URL}/@{TEST_USER}/{idx}"
        },
        "published": published,
        "to": [
            "https://www.w3.org/ns/activitystreams#Public"
        ],
        "type": "Create"
    }


# Canned responses, indexed by requested URL, returned by ``mock_ap_get``
AP_REQUESTS = {
    f"{TEST_BASE_URL}/.well-known/webfinger?"
    f"resource=acct:{parse.quote(TEST_AP_ACCOUNT)}": {
        "aliases": [
            f"{TEST_BASE_URL}/@{TEST_USER}",
            f"{TEST_BASE_URL}/users/{TEST_USER}"
        ],
        "links": [
            {
                "href": f"{TEST_BASE_URL}/users/{TEST_USER}",
                "rel": "self",
                "type": "application/activity+json"
            },
        ],
        "subject": f"acct:{TEST_AP_ACCOUNT}"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "endpoints": {
            "sharedInbox": f"{TEST_BASE_URL}/inbox"
        },
        "followers": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "following": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}",
        "inbox": f"{TEST_BASE_URL}/users/{TEST_USER}/inbox",
        "name": "",
        "outbox": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "preferredUsername": TEST_USER,
        "type": "Person",
        "url": f"{TEST_BASE_URL}/@{TEST_USER}"
    },

    f"{TEST_BASE_URL}/.well-known/webfinger?"
    f"resource=acct:{parse.quote('ext_user@example.org')}": {
        "aliases": [
            f"{TEST_BASE_URL}/@ext_user",
            f"{TEST_BASE_URL}/users/ext_user"
        ],
        "links": [
            {
                "href": f"{TEST_BASE_URL}/users/ext_user",
                "rel": "self",
                "type": "application/activity+json"
            },
        ],
        "subject": "acct:ext_user@example.org"
    },

    f"{TEST_BASE_URL}/users/ext_user": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "endpoints": {
            "sharedInbox": f"{TEST_BASE_URL}/inbox"
        },
        "followers": f"{TEST_BASE_URL}/users/ext_user/followers",
        "following": f"{TEST_BASE_URL}/users/ext_user/following",
        "id": f"{TEST_BASE_URL}/users/ext_user",
        "inbox": f"{TEST_BASE_URL}/users/ext_user/inbox",
        "name": "",
        "outbox": f"{TEST_BASE_URL}/users/ext_user/outbox",
        "preferredUsername": "ext_user",
        "type": "Person",
        "url": f"{TEST_BASE_URL}/@ext_user"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/outbox": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "last": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        "totalItems": 4,
        "type": "OrderedCollection"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true": {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
        ],
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox?page=true",
        # message 1 is the most recent one, message 4 the oldest
        "orderedItems": [
            _build_create_activity(1, "2021-12-16T17:28:03Z"),
            _build_create_activity(2, "2021-12-16T17:27:03Z"),
            _build_create_activity(3, "2021-12-16T17:26:03Z"),
            _build_create_activity(4, "2021-12-16T17:25:03Z"),
        ],
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/outbox",
        "prev": None,
        "type": "OrderedCollectionPage"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/following": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "totalItems": 2,
        "type": "OrderedCollection"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/following?page=1",
        "orderedItems": [
            f"{TEST_BASE_URL}/users/ext_user",
            f"https://{PUBLIC_URL}/_ap/{TYPE_ACTOR}/local_user%40{PUBLIC_URL}",
        ],
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/following",
        "totalItems": 2,
        "type": "OrderedCollectionPage"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/followers": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "first": f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "totalItems": 2,
        "type": "OrderedCollection"
    },

    f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1": {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": f"{TEST_BASE_URL}/users/{TEST_USER}/followers?page=1",
        "orderedItems": [
            f"{TEST_BASE_URL}/users/ext_user",
            f"https://{PUBLIC_URL}/_ap/{TYPE_ACTOR}/local_user%40{PUBLIC_URL}",
        ],
        "partOf": f"{TEST_BASE_URL}/users/{TEST_USER}/followers",
        "totalItems": 2,
        "type": "OrderedCollectionPage"
    },
}

# Each line of this template is stripped and the lines are joined before parsing (see
# XMPP_ITEMS below), so the indentation used here does not end up in the parsed item.
XMPP_ITEM_TPL = """
<item id='{id}' publisher='{publisher_jid}'>
  <entry xmlns='http://www.w3.org/2005/Atom' xml:lang='en'>
    <title type='xhtml'>
      <div xmlns='http://www.w3.org/1999/xhtml'>
        <p>
          XMPP item {id}
        </p>
      </div>
    </title>
    <title type='text'>
      XMPP item {id}
    </title>
    <author>
      <name>
        test_user
      </name>
      <uri>
        xmpp:{publisher_jid}
      </uri>
    </author>
    <updated>
      {updated}
    </updated>
    <published>
      {published}
    </published>
    <id>
      xmpp:{publisher_jid}?;node=urn%3Axmpp%3Amicroblog%3A0;item={id}
    </id>
  </entry>
</item>
"""

ITEM_BASE_TS = 1643385499

# 4 items with IDs from 1 to 4, each one dated one minute after the previous one
XMPP_ITEMS = [
    xml_tools.parse(
        "".join(
            line.strip() for line in XMPP_ITEM_TPL.format(
                id=i,
                publisher_jid="some_user@test.example",
                updated=xmpp_date(ITEM_BASE_TS + i * 60),
                published=xmpp_date(ITEM_BASE_TS + i * 60),
            ).split("\n")
        ),
        namespace=pubsub.NS_PUBSUB
    )
    for i in range(1, 5)
]


async def mock_ap_get(url):
    """Return a copy of the canned response registered for ``url`` in AP_REQUESTS

    A deep copy is returned so the caller can modify the data without altering the
    fixtures used by other tests.
    @raise KeyError: no response is registered for this URL
    """
    return deepcopy(AP_REQUESTS[url])


async def mock_treq_json(data):
    """Replacement for ``treq.json_content``: return a shallow dict copy of ``data``"""
    return dict(data)


async def mock_getItems(*args, **kwargs):
    """Replacement for the pubsub plugin's ``getItems``

    Positional arguments are ignored.
    @param ret_items: (keyword only) items to return, XMPP_ITEMS is used by default.
        Must not be empty, as first and last items are used to build RSM response.
    @return: tuple with returned items and metadata (RSM response and "complete" flag)
    """
    ret_items = kwargs.pop("ret_items", XMPP_ITEMS)
    rsm_resp = rsm.RSMResponse(
        first=ret_items[0]["id"],
        last=ret_items[-1]["id"],
        index=0,
        count=len(ret_items)
    )
    return ret_items, {"rsm": rsm_resp.toDict(), "complete": True}


def getVirtualClient(jid):
    """Return a mocked client using given ``jid``

    NOTE: the parameter shadows the ``jid`` module inside this function, its name is
    kept as is so existing callers are not affected.
    """
    client = MagicMock()
    client.jid = jid
    return client


def patch_ap_requests(monkeypatch, gateway) -> None:
    """Make AP requests of the gateway answer from AP_REQUESTS

    ``treq.get``, ``treq.json_content`` (as seen from the gateway plugin) and
    ``gateway.apGet`` are replaced by the mock helpers of this module.
    @param monkeypatch: monkeypatch fixture of the running test
    @param gateway: gateway instance to patch
    """
    monkeypatch.setattr(plugin_comp_ap_gateway.treq, "get", mock_ap_get)
    monkeypatch.setattr(plugin_comp_ap_gateway.treq, "json_content", mock_treq_json)
    monkeypatch.setattr(gateway, "apGet", mock_ap_get)


@pytest.fixture(scope="session")
def ap_gateway(host):
    """APGateway instance plugged to a mocked client and marked as initialised"""
    gateway = plugin_comp_ap_gateway.APGateway(host)
    gateway.initialised = True
    gateway.isPubsub = AsyncMock()
    gateway.isPubsub.return_value = False
    client = MagicMock()
    client.jid = jid.JID("ap.test.example")
    client.host = "test.example"
    client.getVirtualClient = getVirtualClient
    gateway.client = client
    gateway.local_only = True
    gateway.public_url = PUBLIC_URL
    gateway.ap_path = '_ap'
    gateway.base_ap_url = parse.urljoin(
        f"https://{gateway.public_url}",
        f"{gateway.ap_path}/"
    )
    gateway.server = HTTPServer(gateway)
    return gateway


class TestActivityPubGateway:

    def getTitleXHTML(self, item_elt: domish.Element) -> domish.Element:
        """Return the Atom <title> element of type "xhtml" of the item

        @raise StopIteration: the entry has no XHTML title
        """
        return next(
            t for t in item_elt.entry.elements(NS_ATOM, "title")
            if t.getAttribute("type") == "xhtml"
        )

    @ed
    async def test_jid_and_node_convert_to_ap_handle(self, ap_gateway):
        """JID and pubsub node are converted correctly to an AP actor handle"""
        get_account = ap_gateway.getAPAccountFromJidAndNode

        # local jid
        assert await get_account(
            jid_=jid.JID("simple@test.example"),
            node=None
        ) == "simple@test.example"

        # non local jid
        assert await get_account(
            jid_=jid.JID("simple@example.org"),
            node=None
        ) == "___simple.40example.2eorg@ap.test.example"

        # local jid with non microblog node
        assert await get_account(
            jid_=jid.JID("simple@test.example"),
            node="some_other_node"
        ) == "some_other_node---simple@test.example"

        # local pubsub node
        with patch.object(ap_gateway, "isPubsub") as isPubsub:
            isPubsub.return_value = True
            assert await get_account(
                jid_=jid.JID("pubsub.test.example"),
                node="some_node"
            ) == "some_node@pubsub.test.example"

        # non local pubsub node
        with patch.object(ap_gateway, "isPubsub") as isPubsub:
            isPubsub.return_value = True
            assert await get_account(
                jid_=jid.JID("pubsub.example.org"),
                node="some_node"
            ) == "___some_node.40pubsub.2eexample.2eorg@ap.test.example"

    @ed
    async def test_ap_handle_convert_to_jid_and_node(self, ap_gateway, monkeypatch):
        """AP actor handle convert correctly to JID and pubsub node"""
        get_jid_node = ap_gateway.getJIDAndNode

        # for following assertion, host is not a pubsub service
        with patch.object(ap_gateway, "isPubsub") as isPubsub:
            isPubsub.return_value = False

            # simple local jid
            assert await get_jid_node(
                "toto@test.example"
            ) == (jid.JID("toto@test.example"), None)

            # simple external jid

            ## with "local_only" set, it should raise an exception
            with pytest.raises(exceptions.PermissionError):
                await get_jid_node("toto@example.org")

            ## with "local_only" unset, it should work
            with monkeypatch.context() as m:
                m.setattr(ap_gateway, "local_only", False, raising=True)
                assert await get_jid_node(
                    "toto@example.org"
                ) == (jid.JID("toto@example.org"), None)

            # explicit node
            assert await get_jid_node(
                "tata---toto@test.example"
            ) == (jid.JID("toto@test.example"), "tata")

        # for following assertion, host is a pubsub service
        with patch.object(ap_gateway, "isPubsub") as isPubsub:
            isPubsub.return_value = True

            # simple local node
            assert await get_jid_node(
                "toto@pubsub.test.example"
            ) == (jid.JID("pubsub.test.example"), "toto")

            # encoded local node
            assert await get_jid_node(
                "___urn.3axmpp.3amicroblog.3a0@pubsub.test.example"
            ) == (jid.JID("pubsub.test.example"), "urn:xmpp:microblog:0")

    @ed
    async def test_ap_to_pubsub_conversion(self, ap_gateway, monkeypatch):
        """AP requests are converted to pubsub"""
        patch_ap_requests(monkeypatch, ap_gateway)

        actor_data = await ap_gateway.getAPActorDataFromAccount(TEST_AP_ACCOUNT)
        outbox = await ap_gateway.apGetObject(actor_data, "outbox")

        # first 2 items, in chronological order
        items, rsm_resp = await ap_gateway.getAPItems(outbox, 2)

        assert rsm_resp.count == 4
        assert rsm_resp.index == 0
        assert rsm_resp.first == "https://example.org/users/test_user/statuses/4"
        assert rsm_resp.last == "https://example.org/users/test_user/statuses/3"

        title_xhtml = self.getTitleXHTML(items[0])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 4</p></div>"
            "</title>"
        )
        author_uri = str(
            [e for e in items[0].entry.author.elements() if e.name == "uri"][0]
        )
        assert author_uri == "xmpp:test_user\\40example.org@ap.test.example"
        assert str(items[0].entry.published) == "2021-12-16T17:25:03Z"

        title_xhtml = self.getTitleXHTML(items[1])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 3</p></div>"
            "</title>"
        )
        author_uri = str(
            [e for e in items[1].entry.author.elements() if e.name == "uri"][0]
        )
        assert author_uri == "xmpp:test_user\\40example.org@ap.test.example"
        assert str(items[1].entry.published) == "2021-12-16T17:26:03Z"

        # 2 items following a given item ID
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=2,
            after_id="https://example.org/users/test_user/statuses/3",
        )

        assert rsm_resp.count == 4
        assert rsm_resp.index == 2
        assert rsm_resp.first == "https://example.org/users/test_user/statuses/2"
        assert rsm_resp.last == "https://example.org/users/test_user/statuses/1"

        title_xhtml = self.getTitleXHTML(items[0])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 2</p></div>"
            "</title>"
        )
        author_uri = str(
            [e for e in items[0].entry.author.elements() if e.name == "uri"][0]
        )
        assert author_uri == "xmpp:test_user\\40example.org@ap.test.example"
        assert str(items[0].entry.published) == "2021-12-16T17:27:03Z"

        title_xhtml = self.getTitleXHTML(items[1])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 1</p></div>"
            "</title>"
        )
        author_uri = str(
            [e for e in items[1].entry.author.elements() if e.name == "uri"][0]
        )
        assert author_uri == "xmpp:test_user\\40example.org@ap.test.example"
        assert str(items[1].entry.published) == "2021-12-16T17:28:03Z"

        # single item, selected by its index
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=1,
            start_index=2
        )

        assert rsm_resp.count == 4
        assert rsm_resp.index == 2
        assert rsm_resp.first == "https://example.org/users/test_user/statuses/2"
        assert rsm_resp.last == "https://example.org/users/test_user/statuses/2"
        assert len(items) == 1

        title_xhtml = self.getTitleXHTML(items[0])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 2</p></div>"
            "</title>"
        )
        assert str(items[0].entry.published) == "2021-12-16T17:27:03Z"

        # 3 items, without chronological pagination
        items, rsm_resp = await ap_gateway.getAPItems(
            outbox,
            max_items=3,
            chronological_pagination=False
        )
        assert rsm_resp.count == 4
        assert rsm_resp.index == 1
        assert rsm_resp.first == "https://example.org/users/test_user/statuses/3"
        assert rsm_resp.last == "https://example.org/users/test_user/statuses/1"
        assert len(items) == 3
        title_xhtml = self.getTitleXHTML(items[0])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 3</p></div>"
            "</title>"
        )
        title_xhtml = self.getTitleXHTML(items[2])
        assert title_xhtml.toXml() == (
            "<title xmlns='http://www.w3.org/2005/Atom' type='xhtml'>"
            "<div xmlns='http://www.w3.org/1999/xhtml'><p>test message 1</p></div>"
            "</title>"
        )

    def ap_request_params(self, ap_gateway, type_=None, url=None, query_data=None):
        """Generate parameters for HTTPAPGServer's AP*Request

        @param type_: one of the AP query type (e.g. "outbox")
        @param url: URL to query (mutually exclusive with type_)
        @param query_data: query data as returned by parse.parse_qs
            when ``url`` is used and ``query_data`` is None, it is parsed from the URL
        @return dict with kw params to use
        """
        assert (type_ and url is None) or (url and type_ is None), (
            "exactly one of type_ and url must be specified"
        )
        if type_ is not None:
            path = f"_ap/{type_}/some_user%40test.example"
        else:
            url_parsed = parse.urlparse(url)
            path = url_parsed.path.lstrip("/")
            # path is expected to look like "_ap/<type>/<account>"
            type_ = path.split("/")[1]
            if query_data is None:
                query_data = parse.parse_qs(url_parsed.query)

        if query_data:
            uri = f"{path}?{parse.urlencode(query_data, doseq=True)}"
        else:
            uri = path

        test_jid = jid.JID("some_user@test.example")
        request = Request(MagicMock())
        request.path = path.encode()
        request.uri = uri.encode()
        ap_url = parse.urljoin(
            f"https://{ap_gateway.public_url}",
            path
        )
        kwargs = {
            "request": request,
            "account_jid": test_jid,
            "node": None,
            "ap_account": test_jid.full(),
            "ap_url": ap_url,
            "signing_actor": None
        }
        if type_ == "outbox" and query_data:
            kwargs["query_data"] = query_data
            # signing_actor is not used for page requests
            del kwargs["signing_actor"]
        return kwargs

    @ed
    async def test_pubsub_to_ap_conversion(self, ap_gateway, monkeypatch):
        """Pubsub nodes are converted to AP collections"""
        monkeypatch.setattr(ap_gateway._p, "getItems", mock_getItems)
        outbox = await ap_gateway.server.resource.APOutboxRequest(
            **self.ap_request_params(ap_gateway, "outbox")
        )
        assert outbox["@context"] == "https://www.w3.org/ns/activitystreams"
        assert outbox["id"] == (
            "https://test.example/_ap/outbox/some_user%40test.example"
        )
        assert outbox["totalItems"] == len(XMPP_ITEMS)
        assert outbox["type"] == "OrderedCollection"
        assert outbox["first"]
        assert outbox["last"]

        first_page = await ap_gateway.server.resource.APOutboxPageRequest(
            **self.ap_request_params(ap_gateway, url=outbox["first"])
        )
        assert first_page["@context"] == "https://www.w3.org/ns/activitystreams"
        assert first_page["id"] == (
            "https://test.example/_ap/outbox/some_user%40test.example?page=first"
        )
        assert first_page["type"] == "OrderedCollectionPage"
        assert first_page["partOf"] == outbox["id"]
        assert len(first_page["orderedItems"]) == len(XMPP_ITEMS)
        first_item = first_page["orderedItems"][0]
        assert first_item["@context"] == "https://www.w3.org/ns/activitystreams"
        assert first_item["id"] == (
            "https://test.example/_ap/item/some_user%40test.example/4"
        )
        assert first_item["actor"] == (
            "https://test.example/_ap/actor/some_user%40test.example"
        )
        assert first_item["type"] == "Create"
        first_item_obj = first_item["object"]
        assert first_item_obj["id"] == first_item["id"]
        assert first_item_obj["type"] == "Note"
        assert first_item_obj["published"] == "2022-01-28T16:02:19Z"
        assert first_item_obj["attributedTo"] == first_item["actor"]
        assert first_item_obj["content"] == "<div><p>XMPP item 4</p></div>"
        assert first_item_obj["to"] == ["https://www.w3.org/ns/activitystreams#Public"]

    @ed
    async def test_following_to_pps(self, ap_gateway, monkeypatch):
        """AP following items are converted to Public Pubsub Subscription subscriptions"""
        patch_ap_requests(monkeypatch, ap_gateway)
        items, __ = await ap_gateway.pubsub_service.items(
            jid.JID("toto@example.org"),
            ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT),
            ap_gateway._pps.subscriptions_node,
            None,
            None,
            None
        )
        assert len(items) == 2
        expected_entities = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example"
        )
        for item, entity in zip(items, expected_entities):
            subscription_elt = next(item.elements(NS_PPS, "subscription"), None)
            assert subscription_elt is not None
            assert subscription_elt["node"] == ap_gateway._m.namespace
            assert subscription_elt["service"] == entity

    @ed
    async def test_followers_to_pps(self, ap_gateway, monkeypatch):
        """AP followers items are converted to Public Pubsub Subscription subscribers"""
        patch_ap_requests(monkeypatch, ap_gateway)
        items, __ = await ap_gateway.pubsub_service.items(
            jid.JID("toto@example.org"),
            ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT),
            ap_gateway._pps.getPublicSubscribersNode(ap_gateway._m.namespace),
            None,
            None,
            None
        )
        assert len(items) == 2
        expected_entities = (
            "local_user@test.example",
            "ext_user\\40example.org@ap.test.example"
        )
        for item, entity in zip(items, expected_entities):
            subscriber_elt = next(item.elements(NS_PPS, "subscriber"), None)
            assert subscriber_elt is not None
            assert subscriber_elt["jid"] == entity

    @ed
    async def test_pps_to_following(self, ap_gateway, monkeypatch):
        """Public Pubsub Subscription subscriptions are converted to AP following items"""
        subscriptions = [
            pubsub.Item(
                id="subscription_1",
                payload=ap_gateway._pps.buildSubscriptionElt(
                    ap_gateway._m.namespace,
                    jid.JID("local_user@test.example")
                )
            ),
            pubsub.Item(
                id="subscription_2",
                payload=ap_gateway._pps.buildSubscriptionElt(
                    ap_gateway._m.namespace,
                    jid.JID("ext_user\\40example.org@ap.test.example")
                )
            )
        ]
        monkeypatch.setattr(ap_gateway._p, "getItems", partial(
            mock_getItems,
            ret_items=subscriptions
        ))
        following = await ap_gateway.server.resource.APFollowingRequest(
            **self.ap_request_params(ap_gateway, "following")
        )
        assert following["@context"] == "https://www.w3.org/ns/activitystreams"
        assert following["id"] == (
            "https://test.example/_ap/following/some_user%40test.example"
        )
        assert following["totalItems"] == len(subscriptions)
        assert following["type"] == "OrderedCollection"
        assert following.get("first")

        first_page = following["first"]
        assert first_page["type"] == "OrderedCollectionPage"
        assert len(first_page["orderedItems"]) == len(subscriptions)
        items = first_page["orderedItems"]
        assert items == ['local_user@test.example', 'ext_user@example.org']

    @ed
    async def test_pps_to_followers(self, ap_gateway, monkeypatch):
        """Public Pubsub Subscription subscribers are converted to AP followers"""
        subscribers = [
            pubsub.Item(
                id="subscriber_1",
                payload=ap_gateway._pps.buildSubscriberElt(
                    jid.JID("local_user@test.example")
                )
            ),
            pubsub.Item(
                id="subscriber_2",
                payload=ap_gateway._pps.buildSubscriberElt(
                    jid.JID("ext_user\\40example.org@ap.test.example")
                )
            )
        ]
        monkeypatch.setattr(ap_gateway._p, "getItems", partial(
            mock_getItems,
            ret_items=subscribers
        ))
        followers = await ap_gateway.server.resource.APFollowersRequest(
            **self.ap_request_params(ap_gateway, "followers")
        )
        assert followers["@context"] == "https://www.w3.org/ns/activitystreams"
        assert followers["id"] == (
            "https://test.example/_ap/followers/some_user%40test.example"
        )
        assert followers["totalItems"] == len(subscribers)
        assert followers["type"] == "OrderedCollection"
        assert followers.get("first")

        first_page = followers["first"]
        assert first_page["type"] == "OrderedCollectionPage"
        assert len(first_page["orderedItems"]) == len(subscribers)
        items = first_page["orderedItems"]
        assert items == ['local_user@test.example', 'ext_user@example.org']

    @ed
    async def test_xmpp_message_to_ap_direct_message(self, ap_gateway, monkeypatch):
        """XMPP message are sent as AP direct message"""
        patch_ap_requests(monkeypatch, ap_gateway)
        message_elt = domish.Element((None, "message"))
        message_elt.addElement("body", content="This is a test message.")
        message_elt["from"] = TEST_JID.full()
        message_elt["to"] = ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT).full()
        with patch.object(ap_gateway, "signAndPost") as signAndPost:
            await ap_gateway.onMessage(message_elt)
            url, actor_id, doc = signAndPost.call_args[0]
        assert url == "https://example.org/users/test_user/inbox"
        assert actor_id == "https://test.example/_ap/actor/some_user%40test.example"
        obj = doc["object"]
        assert doc["@context"] == "https://www.w3.org/ns/activitystreams"
        assert doc["actor"] == "https://test.example/_ap/actor/some_user%40test.example"
        assert obj["type"] == "Note"
        assert obj["content"] == "This is a test message."
        assert obj["attributedTo"] == (
            "https://test.example/_ap/actor/some_user%40test.example"
        )
        # we must have a direct message, thus the item must be only addressed to the
        # recipient ("to" attribute of the message), and the "Public" namespace must not
        # be set
        assert doc["to"] == ["https://example.org/users/test_user"]
        assert obj["to"] == ["https://example.org/users/test_user"]
        for field in ("bto", "cc", "bcc", "audience"):
            assert field not in doc
            assert field not in obj

    @ed
    async def test_ap_direct_message_to_xmpp_message(self, ap_gateway, monkeypatch):
        """AP direct message are sent as XMPP message (not Pubsub)"""
        patch_ap_requests(monkeypatch, ap_gateway)
        # we have to patch DeferredList to not wait forever
        monkeypatch.setattr(defer, "DeferredList", AsyncMock())

        xmpp_actor_id = ap_gateway.buildAPURL(ap_const.TYPE_ACTOR, TEST_JID.userhost())
        direct_ap_message = {
            'attributedTo': TEST_AP_ACTOR_ID,
            'cc': [],
            'content': '<p>test direct message</p>',
            'contentMap': {'en': '<p>test direct message</p>'},
            'id': f'{TEST_AP_ACTOR_ID}/statuses/123',
            'published': '2022-05-20T08:14:39Z',
            'to': [xmpp_actor_id],
            'type': 'Note',
        }
        client = ap_gateway.client.getVirtualClient(
            ap_gateway.getLocalJIDFromAccount(TEST_AP_ACCOUNT)
        )
        with patch.object(client, "sendMessage") as sendMessage:
            await ap_gateway.newAPItem(
                client, None, ap_gateway._m.namespace, direct_ap_message
            )

        # sendMessage must be called for <message> stanza, and the "message" argument must
        # be set to the content of the original AP message
        assert sendMessage.called
        assert sendMessage.call_args.args[0] == TEST_JID
        assert sendMessage.call_args.args[1] == {"": "test direct message"}