changeset 3728:b15644cae50d

component AP gateway: JID/node ⟺ AP outbox conversion: - convert a combination of JID and optional pubsub node to AP actor handle (see `getJIDAndNode` for details) and vice versa - the gateway now provides a Pubsub service - retrieve pubsub node and convert it to AP collection, AP pagination is converted to RSM - do the opposite: convert AP collection to pubsub and handle RSM request. Due to ActivityStream collection pagination limitations, some RSM request produce inefficient requests, but caching should be used most of the time in the future and avoid the problem. - set specific name to HTTP Server - new `local_only` setting (`True` by default) to indicate if the gateway can request or not XMPP Pubsub nodes from other servers - disco info now specifies important features such as Pubsub RSM, and nodes metadata ticket 363
author Goffi <goffi@goffi.org>
date Tue, 25 Jan 2022 17:54:06 +0100 (2022-01-25)
parents a6dfd3db372b
children 86eea17cafa7
files sat/plugins/plugin_comp_ap_gateway.py
diffstat 1 files changed, 877 insertions(+), 69 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway.py	Tue Jan 25 17:27:39 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway.py	Tue Jan 25 17:54:06 2022 +0100
@@ -16,30 +16,40 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-import time
-import json
 import base64
 import hashlib
-from urllib import parse
-from typing import Tuple
+import json
 from pathlib import Path
-import shortuuid
-from cryptography.hazmat.primitives.asymmetric import rsa
+import time
+from typing import Optional, Dict, Tuple, List, Union
+from urllib import parse
+import calendar
+import re
+import unicodedata
+
+import dateutil
 from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives.asymmetric import padding
-from twisted.internet import reactor, threads, defer
-from twisted.web import server, resource as web_resource, http
-from twisted.words.protocols.jabber import jid
+import shortuuid
 import treq
 from treq.response import _Response as TReqResponse
-from sat.core.i18n import _
+from twisted.internet import defer, reactor, threads
+from twisted.web import http, resource as web_resource, server
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import pubsub, rsm
+
+from sat.core import exceptions
 from sat.core.constants import Const as C
-from sat.core import exceptions
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.core.core_types import SatXMPPEntity
-from sat.tools.common import tls, data_format
 from sat.tools import utils
+from sat.tools.common import data_format, tls
+from sat.tools.common.async_utils import async_lru
+from sat.tools.utils import ensure_deferred
 
 
 log = getLogger(__name__)
@@ -52,21 +62,39 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106"],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
-    C.PI_HANDLER: C.BOOL_FALSE,
+    C.PI_HANDLER: C.BOOL_TRUE,
     C.PI_DESCRIPTION: _(
         "Gateway for bidirectional communication between XMPP and ActivityPub."
     ),
 }
 
+VERSION = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
+)
+HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})"
+RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
+RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
+RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
 CONF_SECTION = f"component {IMPORT_NAME}"
 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
 TYPE_ACTOR = "actor"
 TYPE_INBOX = "inbox"
+TYPE_OUTBOX = "outbox"
 TYPE_ITEM = "item"
 MEDIA_TYPE_AP = "application/activity+json"
+# mapping from AP metadata to microblog data
+AP_MB_MAP = {
+    "content": "content_xhtml",
+
+}
+AP_REQUEST_TYPES = {"actor", "outbox"}
+PAGE_SIZE = 10
+
+LRU_MAX_SIZE = 200
 
 
 class HTTPAPGServer(web_resource.Resource):
@@ -77,12 +105,11 @@
         self.apg = ap_gateway
         super().__init__()
 
-    def webfinger(self, request):
+    async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
         query = parse.parse_qs(url_parsed.query)
         resource = query.get("resource", [""])[0]
         account = resource[5:].strip()
-        log.info(f"request pour {account}")
         if not resource.startswith("acct:") or not account:
             return web_resource.ErrorPage(
                 http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
@@ -101,18 +128,24 @@
             ]
         }
         request.setHeader("content-type", CONTENT_TYPE_AP)
-        return json.dumps(resp).encode()
+        request.write(json.dumps(resp).encode())
+        request.finish()
 
-    def APRequest(self, request):
-        path = request.path.decode()
-        actor_url = parse.urljoin(
-            f"https://{self.apg.public_url}",
-            path
-        )
-        __, account = self.apg.parseAPURL(actor_url)
-        inbox_url = self.apg.buildAPURL(TYPE_INBOX, account)
-        username = account.split("@", 1)[0]
-        actor = {
+    async def APActorRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        actor_url: str
+    ) -> dict:
+        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+
+        # we have to use AP account as preferredUsername because it is used to retrieve
+        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
+        preferred_username = ap_account.split("@", 1)[0]
+        return {
             "@context": [
                 "https://www.w3.org/ns/activitystreams",
                 "https://w3id.org/security/v1"
@@ -120,24 +153,191 @@
 
             "id": actor_url,
             "type": "Person",
-            "preferredUsername": username,
+            "preferredUsername": preferred_username,
             "inbox": inbox_url,
-
+            "outbox": outbox_url,
             "publicKey": {
                 "id": f"{actor_url}#main-key",
                 "owner": actor_url,
                 "publicKeyPem": self.apg.public_key_pem
             }
         }
+
+    def getCanonicalURL(self, request: "HTTPRequest") -> str:
+        return parse.urljoin(
+            f"https://{self.apg.public_url}",
+            request.path.decode().rstrip("/")
+        )
+
+    def queryData2RSMRequest(
+        self,
+        query_data: Dict[str, List[str]]
+    ) -> rsm.RSMRequest:
+        """Get RSM kwargs to use with RSMRequest from query data"""
+        page = query_data.get("page")
+
+        if page == ["first"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
+        elif page == ["last"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE)
+        else:
+            for query_key in ("index", "before", "after"):
+                try:
+                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
+                except (KeyError, IndexError, ValueError):
+                    pass
+                else:
+                    return rsm.RSMRequest(**kwargs)
+        raise ValueError(f"Invalid query data: {query_data!r}")
+
+    async def APOutboxPageRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        actor_url: str,
+        query_data: Dict[str, List[str]]
+    ) -> dict:
+        # we only keep useful keys, and sort to have consistent URL which can
+        # be used as ID
+        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
+        query_data = {k: query_data[k] for k in url_keys}
+        rsm_kwargs = self.queryData2RSMRequest(query_data)
+        try:
+            items, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                rsm_request=self.queryData2RSMRequest(query_data),
+                extra = {C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+
+        base_url = self.getCanonicalURL(request)
+        url = f"{base_url}?{parse.urlencode(query_data, True)}"
+        data = {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems" : [
+                await self.apg.mbdata2APitem(
+                    self.apg.client,
+                    await self.apg._m.item2mbdata(
+                        self.apg.client,
+                        item,
+                        account_jid,
+                        node
+                    )
+                )
+                for item in reversed(items)
+            ]
+        }
+
+        # AP OrderedCollection must be in reversed chronological order, thus the opposite
+        # of what we get with RSM (at least with Libervia Pubsub)
+        if not metadata["complete"]:
+            try:
+                last= metadata["rsm"]["last"]
+            except KeyError:
+                last = None
+            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
+        if metadata["rsm"]["index"] != 0:
+            try:
+                first= metadata["rsm"]["first"]
+            except KeyError:
+                first = None
+            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
+
+        return data
+
+    async def APOutboxRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        actor_url: str
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+
+        parsed_url = parse.urlparse(request.uri.decode())
+        query_data = parse.parse_qs(parsed_url.query)
+        if query_data:
+            return await self.APOutboxPageRequest(
+                request, account_jid, node, ap_account, actor_url, query_data
+            )
+
+        # XXX: we can't use disco#info here because this request won't work on a bare jid
+        # due to security considerations of XEP-0030 (we don't have presence
+        # subscription).
+        # The current workaround is to do a request as if RSM was available, and actually
+        # check its availability according to result.
+        try:
+            __, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                max_items=0,
+                rsm_request=rsm.RSMRequest(max_=0)
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        try:
+            items_count = metadata["rsm"]["count"]
+        except KeyError:
+            log.warning(
+                f"No RSM metadata found when requesting pubsub node {node} at "
+                f"{account_jid}, defaulting to items_count=20"
+            )
+            items_count = 20
+
+        url = self.getCanonicalURL(request)
+        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
+        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
+        return {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url,
+            "totalItems": items_count,
+            "type": "OrderedCollection",
+            "first": url_first_page,
+            "last": url_last_page,
+        }
+
+    async def APRequest(self, request):
+        path = request.path.decode()
+        actor_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        request_type, ap_account = self.apg.parseAPURL(actor_url)
+        account_jid, node = await self.apg.getJIDAndNode(ap_account)
+        if request_type not in AP_REQUEST_TYPES:
+            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+        method = getattr(self, f"AP{request_type.title()}Request")
+        ret_data = await method(request, account_jid, node, ap_account, actor_url)
         request.setHeader("content-type", CONTENT_TYPE_AP)
-        return json.dumps(actor).encode()
+        request.write(json.dumps(ret_data).encode())
+        request.finish()
+
+    def render(self, request):
+        request.setHeader("server", VERSION)
+        return super().render(request)
 
     def render_GET(self, request):
         path = request.path.decode().lstrip("/")
         if path.startswith(".well-known/webfinger"):
-            return self.webfinger(request)
+            defer.ensureDeferred(self.webfinger(request))
+            return server.NOT_DONE_YET
         elif path.startswith(self.apg.ap_path):
-            return self.APRequest(request)
+            defer.ensureDeferred(self.APRequest(request))
+            return server.NOT_DONE_YET
+
         return web_resource.NoResource().render(request)
 
 
@@ -157,6 +357,8 @@
     def __init__(self, host):
         self.host = host
         self.initialised = False
+        self._m = host.plugins["XEP-0277"]
+        self._p = host.plugins["XEP-0060"]
 
         host.bridge.addMethod(
             "APSend",
@@ -167,6 +369,9 @@
             async_=True,
         )
 
+    def getHandler(self, __):
+        return APPubsubService(self)
+
     async def init(self, client):
         if self.initialised:
             return
@@ -204,7 +409,8 @@
             format=serialization.PublicFormat.SubjectPublicKeyInfo
         ).decode()
 
-        # params (URL and port)
+        # params
+        # URL and port
         self.public_url = self.host.memory.getConfig(
             CONF_SECTION, "public_url"
         ) or self.host.memory.getConfig(
@@ -232,8 +438,16 @@
                 'bad ap-gateay http_connection_type, you must use one of "http" or '
                 '"https"'
             )
+        self.max_items = self.host.memory.getConfig(
+            CONF_SECTION, 'new_node_max_items', 50
+
+        )
         self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
         self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
+        # True (default) if we provide gateway only to entities/services from our server
+        self.local_only = C.bool(
+            self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
+        )
 
         # HTTP server launch
         self.server = HTTPServer(self)
@@ -247,8 +461,202 @@
             reactor.listenSSL(self.http_port, self.server, context_factory)
 
     async def profileConnecting(self, client):
+        self.client = client
         await self.init(client)
 
+    async def apGet(self, url: str) -> dict:
+        """Retrieve AP JSON from given URL
+
+        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
+            with AP server
+        """
+        try:
+            return await treq.json_content(await treq.get(
+                url,
+                headers = {
+                    "Accept": [MEDIA_TYPE_AP],
+                    "Content-Type": [MEDIA_TYPE_AP],
+                }
+            ))
+        except Exception as e:
+            raise error.StanzaError(
+                "service-unavailable",
+                text=f"Can't get AP data at {url}: {e}"
+            )
+
+    def mustEncode(self, text: str) -> bool:
+        """Indicate if a text must be period encoded"""
+        return (
+            not RE_ALLOWED_UNQUOTED.match(text)
+            or text.startswith("___")
+            or "---" in text
+        )
+
+    def periodEncode(self, text: str) -> str:
+        """Period encode a text
+
+        see [getJIDAndNode] for reasons of period encoding
+        """
+        return (
+            parse.quote(text, safe="")
+            .replace("---", "%2d%2d%2d")
+            .replace("___", "%5f%5f%5f")
+            .replace(".", "%2e")
+            .replace("~", "%7e")
+            .replace("%", ".")
+        )
+
+    async def getAPAccountFromJidAndNode(
+        self,
+        jid_: jid.JID,
+        node: Optional[str]
+    ) -> str:
+        """Construct AP account from JID and node
+
+        The account construction will use escaping when necessary
+        """
+        if not node or node == self._m.namespace:
+            node = None
+
+        if node and not jid_.user and not self.mustEncode(node):
+            is_pubsub = self.isPubsub(jid_)
+            # when we have a pubsub service, the user part can be used to set the node
+            # this produces more user-friendly AP accounts
+            if is_pubsub:
+                jid_.user = node
+                node = None
+
+        is_local = self.isLocal(jid_)
+        user = jid_.user if is_local else jid_.userhost()
+        if user is None:
+            user = ""
+        account_elts = []
+        if node and self.mustEncode(node) or self.mustEncode(user):
+            account_elts = ["___"]
+            if node:
+                node = self.periodEncode(node)
+            user = self.periodEncode(user)
+
+        if not user:
+            raise exceptions.InternalError("there should be a user part")
+
+        if node:
+            account_elts.extend((node, "---"))
+
+        account_elts.extend((
+            user, "@", jid_.host if is_local else self.client.jid.userhost()
+        ))
+        return "".join(account_elts)
+
+    def isLocal(self, jid_: jid.JID) -> bool:
+        """Returns True if jid_ use a domain or subdomain of gateway's host"""
+        local_host = self.client.host.split(".")
+        assert local_host
+        return jid_.host.split(".")[-len(local_host):] == local_host
+
+    async def isPubsub(self, jid_: jid.JID) -> bool:
+        """Indicate if a JID is a Pubsub service"""
+        host_disco = await self.host.getDiscoInfos(self.client, jid_)
+        return (
+            ("pubsub", "service") in host_disco.identities
+            and not ("pubsub", "pep") in host_disco.identities
+        )
+
+    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
+        """Decode raw AP account handle to get XMPP JID and Pubsub Node
+
+        Username are case insensitive.
+
+        By default, the username correspond to local username (i.e. username from
+        component's server).
+
+        If local name's domain is a pubsub service (and not PEP), the username is taken as
+        a pubsub node.
+
+        If ``---`` is present in username, the part before is used as pubsub node, and the
+        rest as a JID user part.
+
+        If username starts with ``___``, characters are encoded using period encoding
+        (i.e. percent encoding where a ``.`` is used instead of ``%``).
+
+        This horror is necessary due to limitation in some AP implementation (notably
+        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222
+
+        examples::
+
+        ``toto@example.org`` => JID = toto@example.org, node = None
+
+        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a non-local JID, and will work only if setings ``local_only`` is False), node = None
+
+        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
+        JID = pubsub.example.org, node = toto
+
+        ``tata---toto@example.org`` => JID = toto@example.org, node = tata
+
+        ``___urn.3axmpp.3a.microblog.3a0@pubsub.example.org`` (with pubsub.example.org
+        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0
+
+        @param ap_account: ActivityPub account handle (``username@domain.tld``)
+        @return: service JID and pubsub node
+            if pubsub is None, default microblog pubsub node (and possibly other nodes
+            that plugins may hanlde) will be used
+        @raise ValueError: invalid account
+        @raise PermissionError: non local jid is used when gateway doesn't allow them
+        """
+        if ap_account.count("@") != 1:
+            raise ValueError("Invalid AP account")
+        if ap_account.startswith("___"):
+            encoded = True
+            ap_account = ap_account[3:]
+        else:
+            encoded = False
+
+        username, domain = ap_account.split("@")
+
+        if "---" in username:
+            node, username = username.rsplit("---", 1)
+        else:
+            node = None
+
+        if encoded:
+            username = parse.unquote(
+                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
+                errors="strict"
+            )
+            if node:
+                node = parse.unquote(
+                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
+                    errors="strict"
+                )
+
+        if "@" in username:
+            username, domain = username.rsplit("@", 1)
+
+        if not node:
+            # we need to check host disco, because disco request to user may be
+            # blocked for privacy reason (see
+            # https://xmpp.org/extensions/xep-0030.html#security)
+            is_pubsub = await self.isPubsub(jid.JID(domain))
+
+            if is_pubsub:
+                # if the host is a pubsub service and not a PEP, we consider that username
+                # is in fact the node name
+                node = username
+                username = None
+
+        jid_s = f"{username}@{domain}" if username else domain
+        try:
+            jid_ = jid.JID(jid_s)
+        except RuntimeError:
+            raise ValueError(f"Invalid jid: {jid_s!r}")
+
+        if self.local_only and not self.isLocal(jid_):
+            raise exceptions.PermissionError(
+                "This gateway is configured to map only local entities and services"
+            )
+
+        return jid_, node
+
     def parseAPURL(self, url: str) -> Tuple[str, str]:
         """Parse an URL leading to an AP endpoint
 
@@ -313,10 +721,11 @@
         client = self.host.getClient(profile)
         return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
 
-    async def getAPActorData(self, account: str) -> dict:
-        """Retrieve ActivityPub Actor data
+    async def getAPActorIdFromAccount(self, account: str) -> str:
+        """Retrieve account ID from it's handle using WebFinger
 
-        @param account: ActivityPub Actor identifier
+        @param account: AP handle (user@domain.tld)
+        @return: Actor ID (which is an URL)
         """
         if account.count("@") != 1 or "/" in account:
             raise ValueError("Invalid account: {account!r}")
@@ -343,18 +752,337 @@
             raise ValueError(
                 f"No ActivityPub link found for {account!r}"
             )
+        return href
+
+    async def getAPActorDataFromId(self, account: str) -> dict:
+        """Retrieve ActivityPub Actor data
+
+        @param account: ActivityPub Actor identifier
+        """
+        href = await self.getAPActorIdFromAccount(account)
+        return await self.apGet(href)
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getAPAccountFromId(self, actor_id: str):
+        """Retrieve AP account from the ID URL
+
+        @param actor_id: AP ID of the actor (URL to the actor data)
+        """
+        url_parsed = parse.urlparse(actor_id)
+        actor_data = await self.apGet(actor_id)
+        username = actor_data.get("preferredUsername")
+        if not username:
+            raise exceptions.DataError(
+                'No "preferredUsername" field found, can\'t retrieve actor account'
+            )
+        account = f"{username}@{url_parsed.hostname}"
+        # we try to retrieve the actor ID from the account to check it
+        found_id = await self.getAPActorIdFromAccount(account)
+        if found_id != actor_id:
+            # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
+            msg = (
+                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
+                f"({actor_id!r}). This AP instance doesn't seems to use "
+                '"preferredUsername" as we expect.'
+            )
+            log.warning(msg)
+            raise exceptions.DataError(msg)
+        return account
+
+    async def getAPItems(
+        self,
+        account: str,
+        max_items: Optional[int] = None,
+        chronological_pagination: bool = True,
+        after_id: Optional[str] = None,
+        start_index: Optional[int] = None,
+    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
+        """Retrieve AP items and convert them to XMPP items
+
+        @param account: AP account to get items from
+        @param max_items: maximum number of items to retrieve
+            retrieve all items by default
+        @param chronological_pagination: get pages in chronological order
+            AP use reversed chronological order for pagination, "first" page returns more
+            recent items. If "chronological_pagination" is True, "last" AP page will be
+            retrieved first.
+        @param after_id: if set, retrieve items starting from given ID
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            if ``after_id`` is not already in cache, we have to retrieve every page until
+            we find it.
+            In most common cases, ``after_id`` should be in cache though (client usually
+            use known ID when in-order pagination is used).
+        @param start_index: start retrieving items from the one with given index
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            all pages before the requested index will be retrieved to count items.
+        @return: XMPP Pubsub items and corresponding RSM Response
+            Items are always returned in chronological order in the result
+        """
+        actor_data = await self.getAPActorDataFromId(account)
+        outbox = actor_data.get("outbox")
+        rsm_resp: Dict[str, Union[bool, int]] = {}
+        if not outbox:
+            raise exceptions.DataError(f"No outbox found for actor {account}")
+        outbox_data = await self.apGet(outbox)
         try:
-            ap_actor_data = await treq.json_content(await treq.get(
-                href,
-                headers = {
-                    "Accept": [MEDIA_TYPE_AP],
-                    "Content-Type": [MEDIA_TYPE_AP],
-                }
-            ))
-        except Exception as e:
-            raise exceptions.DataError(f"Can't get ActivityPub actor data: {e}")
+            count = outbox_data["totalItems"]
+        except KeyError:
+            log.warning(
+                f'"totalItems" not found in outbox of {account}, defaulting to 20'
+            )
+            count = 20
+        else:
+            log.info(f"{account}'s outbox has {count} item(s)")
+            rsm_resp["count"] = count
+
+        if start_index is not None:
+            assert chronological_pagination and after_id is None
+            if start_index >= count:
+                return [], rsm_resp
+            elif start_index == 0:
+                # this is the default behaviour
+                pass
+            elif start_index > 5000:
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text="Maximum limit for previous_index has been reached, this limit"
+                    "is set to avoid DoS"
+                )
+            else:
+                # we'll convert "start_index" to "after_id", thus we need the item just
+                # before "start_index"
+                previous_index = start_index - 1
+                retrieved_items = 0
+                current_page = outbox_data["last"]
+                while retrieved_items < count:
+                    page_data, items = await self.parseAPPage(current_page)
+                    if not items:
+                        log.warning(f"found an empty AP page at {current_page}")
+                        return [], rsm_resp
+                    page_start_idx = retrieved_items
+                    retrieved_items += len(items)
+                    if previous_index <= retrieved_items:
+                        after_id = items[previous_index - page_start_idx]["id"]
+                        break
+                    try:
+                        current_page = page_data["prev"]
+                    except KeyError:
+                        log.warning(
+                            f"missing previous page link at {current_page}: {page_data!r}"
+                        )
+                        raise error.StanzaError(
+                            "service-unavailable",
+                            "Error while retrieving previous page from AP service at "
+                            f"{current_page}"
+                        )
+
+        init_page = "last" if chronological_pagination else "first"
+        page = outbox_data.get(init_page)
+        if not page:
+            raise exceptions.DataError(
+                f"Initial page {init_page!r} not found for outbox {outbox}"
+            )
+        items = []
+        page_items = []
+        retrieved_items = 0
+        found_after_id = False
+
+        while retrieved_items < count:
+            __, page_items = await self.parseAPPage(page)
+            retrieved_items += len(page_items)
+            if after_id is not None and not found_after_id:
+                # if we have an after_id, we ignore all items until the requested one is
+                # found
+                limit_idx = [i["id"] for i in page_items].index(after_id)
+                if limit_idx == -1:
+                    # if "after_id" is not found, we don't add any item from this page
+                    log.debug(f"{after_id!r} not found at {page}, skipping")
+                else:
+                    found_after_id = True
+                    if chronological_pagination:
+                        page_items = page_items[limit_idx+1:]
+                        start_index = retrieved_items - len(page_items) + limit_idx + 1
+                    else:
+                        page_items = page_items[:limit_idx]
+                        start_index = count - (retrieved_items - len(page_items) +
+                                               limit_idx + 1)
+                    items.extend(page_items)
+            else:
+                items.extend(page_items)
+            if max_items is not None and len(items) >= max_items:
+                if chronological_pagination:
+                    items = items[:max_items]
+                else:
+                    items = items[-max_items:]
+                break
+            page = outbox_data.get("prev" if chronological_pagination else "next")
+            if not page:
+                break
+
+        if after_id is not None and not found_after_id:
+            raise error.StanzaError("item-not-found")
+
+        if after_id is None:
+            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
+
+        if start_index is not None:
+            rsm_resp["index"] = start_index
+        elif after_id is not None:
+            log.warning("Can't determine index of first element")
+        elif chronological_pagination:
+            rsm_resp["index"] = 0
+        else:
+            rsm_resp["index"] = count - len(items)
+        if items:
+            rsm_resp.update({
+                "first": items[0]["id"],
+                "last": items[-1]["id"]
+            })
+
+        return items, rsm.RSMResponse(**rsm_resp)
+
+    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
 
-        return ap_actor_data
+        @param url: url linking and AP page
+        @return: page data, pubsub items
+        """
+        page_data = await self.apGet(url)
+        ap_items = page_data.get("orderedItems")
+        if not ap_items:
+            log.warning('No "orderedItems" collection found')
+            return page_data, []
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                ap_object, mb_data = await self.apItem2MBdata(ap_item)
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+
+            item_elt = await self._m.data2entry(
+                self.client, mb_data, ap_object["id"], None, self._m.namespace
+            )
+            item_elt["publisher"] = mb_data["author_jid"].full()
+            items.append(item_elt)
+
+        return page_data, items
+
+    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
+        """Convert AP item to microblog data
+
+        @return: AP Item's Object and microblog data
+        @raise exceptions.DataError: something is invalid in the AP item
+        @raise NotImplemented: some AP data is not handled yet
+        @raise error.StanzaError: error while contacting the AP server
+        """
+        ap_object = ap_item.get("object")
+        if not ap_object:
+            log.warning(f'No "object" found in AP item {ap_item!r}')
+            raise exceptions.DataError
+        if isinstance(ap_object, str):
+            ap_object = await self.apGet(ap_object)
+        obj_id = ap_object.get("id")
+        if not obj_id:
+            log.warning(f'No "id" found in AP object: {ap_object!r}')
+            raise exceptions.DataError
+        if ap_object.get("inReplyTo") is not None:
+            raise NotImplementedError
+        mb_data = {}
+        for ap_key, mb_key in AP_MB_MAP.items():
+            data = ap_object.get(ap_key)
+            if data is None:
+                continue
+            mb_data[mb_key] = data
+
+        # content
+        try:
+            language, content_xhtml = ap_object["contentMap"].popitem()
+        except (KeyError, AttributeError):
+            try:
+                mb_data["content_xhtml"] = mb_data["content"]
+            except KeyError:
+                log.warning(f"no content found:\n{ap_object!r}")
+                raise exceptions.DataError
+        else:
+            mb_data["language"] = language
+            mb_data["content_xhtml"] = content_xhtml
+
+        # author
+        actor = ap_item.get("actor")
+        if not actor:
+            log.warning(f"no actor associated to object id {obj_id!r}")
+            raise exceptions.DataError
+        elif isinstance(actor, list):
+            # we only keep first item of list as author
+            # TODO: handle multiple actors
+            if len(actor) > 1:
+                log.warning("multiple actors are not managed")
+            actor = actor[0]
+
+        if isinstance(actor, dict):
+            actor = actor.get("id")
+            if not actor:
+                log.warning(f"no actor id found: {actor!r}")
+                raise exceptions.DataError
+
+        if isinstance(actor, str):
+            account = await self.getAPAccountFromId(actor)
+            mb_data["author"] = account.split("@", 1)[0]
+            author_jid = mb_data["author_jid"] = jid.JID(
+                None,
+                (
+                    self.host.plugins["XEP-0106"].escape(account),
+                    self.client.jid.host,
+                    None
+                )
+            )
+        else:
+            log.warning(f"unknown actor type found: {actor!r}")
+            raise exceptions.DataError
+
+        # published/updated
+        for field in ("published", "updated"):
+            value = ap_object.get(field)
+            if not value and field == "updated":
+                value = ap_object.get("published")
+            if value:
+                try:
+                    mb_data[field] = calendar.timegm(
+                        dateutil.parser.parse(str(value)).utctimetuple()
+                    )
+                except dateutil.parser.ParserError as e:
+                    log.warning(f"Can't parse {field!r} field: {e}")
+        return ap_object, mb_data
+
+    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item"""
+        if not mb_data.get("id"):
+            mb_data["id"] = shortuuid.uuid()
+        if not mb_data.get("author_jid"):
+            mb_data["author_jid"] = client.jid
+        ap_account = await self.getAPAccountFromJidAndNode(
+            jid.JID(mb_data["author_jid"]),
+            None
+        )
+        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
+        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
+        return {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url_item,
+            "type": "Create",
+            "actor": url_actor,
+
+            "object": {
+                "id": url_item,
+                "type": "Note",
+                "published": utils.xmpp_date(mb_data["published"]),
+                "attributedTo": url_actor,
+                "content": mb_data.get("content_xhtml") or mb_data["content"],
+                "to": "https://www.w3.org/ns/activitystreams#Public"
+            }
+        }
 
     async def publishMessage(
         self,
@@ -382,34 +1110,114 @@
         if not service.user:
             raise ValueError("service must have a local part")
         account = self.host.plugins["XEP-0106"].unescape(service.user)
-        ap_actor_data = await self.getAPActorData(account)
+        ap_actor_data = await self.getAPActorDataFromId(account)
 
         try:
             inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
         except KeyError:
             raise exceptions.DataError("Can't get ActivityPub actor inbox")
 
-        if not mess_data.get("id"):
-            mess_data["id"] = shortuuid.uuid()
-        url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
-        url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
-        now = time.time()
-        item_data = {
-            "@context": "https://www.w3.org/ns/activitystreams",
-            "id": url_item,
-            "type": "Create",
-            "actor": url_actor,
-
-            "object": {
-                "id": url_item,
-                "type": "Note",
-                "published": utils.xmpp_date(now),
-                "attributedTo": url_actor,
-                "inReplyTo": mess_data["node"],
-                "content": mess_data.get("content_xhtml") or mess_data["content"],
-                "to": "https://www.w3.org/ns/activitystreams#Public"
-            }
-        }
+        item_data = await self.mbdata2APitem(client, mess_data)
+        url_actor = item_data["object"]["attributedTo"]
         resp = await self.signAndPost(inbox_url, url_actor, item_data)
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
+
+
+class APPubsubService(rsm.PubSubService):
+    """Pubsub service for XMPP requests"""
+
+    def __init__(self, apg):
+        super(APPubsubService, self).__init__()
+        self.host = apg.host
+        self.apg = apg
+        self.discoIdentity = {
+            "category": "pubsub",
+            "type": "service",
+            "name": "Libervia ActivityPub Gateway",
+        }
+
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
+        raise NotImplementedError
+
+    @ensure_deferred
+    async def items(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        node: str,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        rsm_req: Optional[rsm.RSMRequest]
+    ) -> List[domish.Element]:
+        if not service.user:
+            return []
+        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
+        if ap_account.count("@") != 1:
+            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
+            return []
+        if node != self.apg._m.namespace:
+            raise error.StanzaError(
+                "feature-not-implemented",
+                text=f"{VERSION} only supports {self.apg._m.namespace} "
+                "node for now"
+            )
+        if rsm_req is None:
+            if maxItems is None:
+                maxItems = 20
+            kwargs = {
+                "max_items": maxItems,
+                "chronological_pagination": False,
+            }
+        else:
+            if len(
+                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                 if v is not None]
+            ) > 1:
+                raise error.StanzaError(
+                    "bad-request",
+                    text="You can't use after, before and index at the same time"
+                )
+            kwargs = {"max_items": rsm_req.max}
+            if rsm_req.after is not None:
+                kwargs["after_id"] = rsm_req.after
+            elif rsm_req.before is not None:
+                kwargs["chronological_pagination"] = False
+                if rsm_req.before != "":
+                    kwargs["after_id"] = rsm_req.before
+            elif rsm_req.index is not None:
+                kwargs["start_index"] = rsm_req.index
+
+        log.info(
+            f"No cache found for node {node} at {service} (AP account {ap_account}), "
+            "using Collection Paging to RSM translation"
+        )
+        return await self.apg.getAPItems(ap_account, **kwargs)
+
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        raise NotImplementedError
+
+    def getNodeInfo(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> Optional[dict]:
+        if not nodeIdentifier:
+            return None
+        info = {
+            "type": "leaf",
+            "meta-data": [
+                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+                {"var": "pubsub#max_items", "value": "max"},
+                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+
+            ]
+
+        }
+        return info