diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children bf0505d41c09
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,853 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+import hashlib
+import json
+from pathlib import Path
+from typing import Optional, Dict, Tuple, List, Union
+from urllib import parse
+import calendar
+import re
+
+import dateutil
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import padding
+import shortuuid
+import treq
+from treq.response import _Response as TReqResponse
+from twisted.internet import defer, reactor, threads
+from twisted.web import http
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import rsm
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.tools import utils
+from sat.tools.common import data_format, tls
+from sat.tools.common.async_utils import async_lru
+
+from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP,
+                       AP_MB_MAP, LRU_MAX_SIZE)
+from .http_server import HTTPServer
+from .pubsub_service import APPubsubService
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "ap-gateway"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "ActivityPub Gateway component",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
+    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "APGateway",
+    C.PI_HANDLER: C.BOOL_TRUE,
+    C.PI_DESCRIPTION: _(
+        "Gateway for bidirectional communication between XMPP and ActivityPub."
+    ),
+}
+
+HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})"
+RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
+RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
+RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
+
+
+class APGateway:
+
+    def __init__(self, host):
+        self.host = host
+        self.initialised = False
+        self._m = host.plugins["XEP-0277"]
+        self._p = host.plugins["XEP-0060"]
+
+        host.bridge.addMethod(
+            "APSend",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._publishMessage,
+            async_=True,
+        )
+
+    def getHandler(self, __):
+        return APPubsubService(self)
+
+    async def init(self, client):
+        if self.initialised:
+            return
+
+        self.initialised = True
+        log.info(_("ActivityPub Gateway initialization"))
+
+        # RSA keys
+        stored_data = await self.host.memory.storage.getPrivates(
+            IMPORT_NAME, ["rsa_key"], profile=client.profile
+        )
+        private_key_pem = stored_data.get("rsa_key")
+        if private_key_pem is None:
+            self.private_key = await threads.deferToThread(
+                rsa.generate_private_key,
+                public_exponent=65537,
+                key_size=4096,
+            )
+            private_key_pem = self.private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption()
+            ).decode()
+            await self.host.memory.storage.setPrivateValue(
+                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
+            )
+        else:
+            self.private_key = serialization.load_pem_private_key(
+                private_key_pem.encode(),
+                password=None,
+            )
+        self.public_key = self.private_key.public_key()
+        self.public_key_pem = self.public_key.public_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PublicFormat.SubjectPublicKeyInfo
+        ).decode()
+
+        # params
+        # URL and port
+        self.public_url = self.host.memory.getConfig(
+            CONF_SECTION, "public_url"
+        ) or self.host.memory.getConfig(
+            CONF_SECTION, "xmpp_domain"
+        )
+        if self.public_url is None:
+            log.error(
+                '"public_url" not set in configuration, this is mandatory to have'
+                "ActivityPub Gateway running. Please set this option it to public facing "
+                f"url in {CONF_SECTION!r} configuration section."
+            )
+            return
+        if parse.urlparse(self.public_url).scheme:
+            log.error(
+                "Scheme must not be specified in \"public_url\", please remove it from "
+                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
+            )
+            return
+        self.http_port = int(self.host.memory.getConfig(
+            CONF_SECTION, 'http_port', 8123))
+        connection_type = self.host.memory.getConfig(
+            CONF_SECTION, 'http_connection_type', 'https')
+        if connection_type not in ('http', 'https'):
+            raise exceptions.ConfigError(
+                'bad ap-gateay http_connection_type, you must use one of "http" or '
+                '"https"'
+            )
+        self.max_items = self.host.memory.getConfig(
+            CONF_SECTION, 'new_node_max_items', 50
+
+        )
+        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
+        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
+        # True (default) if we provide gateway only to entities/services from our server
+        self.local_only = C.bool(
+            self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
+        )
+
+        # HTTP server launch
+        self.server = HTTPServer(self)
+        if connection_type == 'http':
+            reactor.listenTCP(self.http_port, self.server)
+        else:
+            options = tls.getOptionsFromConfig(
+                self.host.memory.config, CONF_SECTION)
+            tls.TLSOptionsCheck(options)
+            context_factory = tls.getTLSContextFactory(options)
+            reactor.listenSSL(self.http_port, self.server, context_factory)
+
+    async def profileConnecting(self, client):
+        self.client = client
+        await self.init(client)
+
+    async def apGet(self, url: str) -> dict:
+        """Retrieve AP JSON from given URL
+
+        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
+            with AP server
+        """
+        try:
+            return await treq.json_content(await treq.get(
+                url,
+                headers = {
+                    "Accept": [MEDIA_TYPE_AP],
+                    "Content-Type": [MEDIA_TYPE_AP],
+                }
+            ))
+        except Exception as e:
+            raise error.StanzaError(
+                "service-unavailable",
+                text=f"Can't get AP data at {url}: {e}"
+            )
+
+    def mustEncode(self, text: str) -> bool:
+        """Indicate if a text must be period encoded"""
+        return (
+            not RE_ALLOWED_UNQUOTED.match(text)
+            or text.startswith("___")
+            or "---" in text
+        )
+
+    def periodEncode(self, text: str) -> str:
+        """Period encode a text
+
+        see [getJIDAndNode] for reasons of period encoding
+        """
+        return (
+            parse.quote(text, safe="")
+            .replace("---", "%2d%2d%2d")
+            .replace("___", "%5f%5f%5f")
+            .replace(".", "%2e")
+            .replace("~", "%7e")
+            .replace("%", ".")
+        )
+
+    async def getAPAccountFromJidAndNode(
+        self,
+        jid_: jid.JID,
+        node: Optional[str]
+    ) -> str:
+        """Construct AP account from JID and node
+
+        The account construction will use escaping when necessary
+        """
+        if not node or node == self._m.namespace:
+            node = None
+
+        if node and not jid_.user and not self.mustEncode(node):
+            is_pubsub = await self.isPubsub(jid_)
+            # when we have a pubsub service, the user part can be used to set the node
+            # this produces more user-friendly AP accounts
+            if is_pubsub:
+                jid_.user = node
+                node = None
+
+        is_local = self.isLocal(jid_)
+        user = jid_.user if is_local else jid_.userhost()
+        if user is None:
+            user = ""
+        account_elts = []
+        if node and self.mustEncode(node) or self.mustEncode(user):
+            account_elts = ["___"]
+            if node:
+                node = self.periodEncode(node)
+            user = self.periodEncode(user)
+
+        if not user:
+            raise exceptions.InternalError("there should be a user part")
+
+        if node:
+            account_elts.extend((node, "---"))
+
+        account_elts.extend((
+            user, "@", jid_.host if is_local else self.client.jid.userhost()
+        ))
+        return "".join(account_elts)
+
+    def isLocal(self, jid_: jid.JID) -> bool:
+        """Returns True if jid_ use a domain or subdomain of gateway's host"""
+        local_host = self.client.host.split(".")
+        assert local_host
+        return jid_.host.split(".")[-len(local_host):] == local_host
+
+    async def isPubsub(self, jid_: jid.JID) -> bool:
+        """Indicate if a JID is a Pubsub service"""
+        host_disco = await self.host.getDiscoInfos(self.client, jid_)
+        return (
+            ("pubsub", "service") in host_disco.identities
+            and not ("pubsub", "pep") in host_disco.identities
+        )
+
+    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
+        """Decode raw AP account handle to get XMPP JID and Pubsub Node
+
+        Username are case insensitive.
+
+        By default, the username correspond to local username (i.e. username from
+        component's server).
+
+        If local name's domain is a pubsub service (and not PEP), the username is taken as
+        a pubsub node.
+
+        If ``---`` is present in username, the part before is used as pubsub node, and the
+        rest as a JID user part.
+
+        If username starts with ``___``, characters are encoded using period encoding
+        (i.e. percent encoding where a ``.`` is used instead of ``%``).
+
+        This horror is necessary due to limitation in some AP implementation (notably
+        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222
+
+        examples:
+
+        ``toto@example.org`` => JID = toto@example.org, node = None
+
+        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
+        non-local JID, and will work only if setings ``local_only`` is False), node = None
+
+        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
+        JID = pubsub.example.org, node = toto
+
+        ``tata---toto@example.org`` => JID = toto@example.org, node = tata
+
+        ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
+        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0
+
+        @param ap_account: ActivityPub account handle (``username@domain.tld``)
+        @return: service JID and pubsub node
+            if pubsub is None, default microblog pubsub node (and possibly other nodes
+            that plugins may hanlde) will be used
+        @raise ValueError: invalid account
+        @raise PermissionError: non local jid is used when gateway doesn't allow them
+        """
+        if ap_account.count("@") != 1:
+            raise ValueError("Invalid AP account")
+        if ap_account.startswith("___"):
+            encoded = True
+            ap_account = ap_account[3:]
+        else:
+            encoded = False
+
+        username, domain = ap_account.split("@")
+
+        if "---" in username:
+            node, username = username.rsplit("---", 1)
+        else:
+            node = None
+
+        if encoded:
+            username = parse.unquote(
+                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
+                errors="strict"
+            )
+            if node:
+                node = parse.unquote(
+                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
+                    errors="strict"
+                )
+
+        if "@" in username:
+            username, domain = username.rsplit("@", 1)
+
+        if not node:
+            # we need to check host disco, because disco request to user may be
+            # blocked for privacy reason (see
+            # https://xmpp.org/extensions/xep-0030.html#security)
+            is_pubsub = await self.isPubsub(jid.JID(domain))
+
+            if is_pubsub:
+                # if the host is a pubsub service and not a PEP, we consider that username
+                # is in fact the node name
+                node = username
+                username = None
+
+        jid_s = f"{username}@{domain}" if username else domain
+        try:
+            jid_ = jid.JID(jid_s)
+        except RuntimeError:
+            raise ValueError(f"Invalid jid: {jid_s!r}")
+
+        if self.local_only and not self.isLocal(jid_):
+            raise exceptions.PermissionError(
+                "This gateway is configured to map only local entities and services"
+            )
+
+        return jid_, node
+
+    def parseAPURL(self, url: str) -> Tuple[str, str]:
+        """Parse an URL leading to an AP endpoint
+
+        @param url: URL to parse (schema is not mandatory)
+        @return: endpoint type and AP account
+        """
+        path = parse.urlparse(url).path.lstrip("/")
+        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
+        return type_, parse.unquote(account)
+
+    def buildAPURL(self, type_:str , *args: str) -> str:
+        """Build an AP endpoint URL
+
+        @param type_: type of AP endpoing
+        @param arg: endpoint dependant arguments
+        """
+        return parse.urljoin(
+            self.base_ap_url,
+            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
+        )
+
+    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
+        """Sign a documentent and post it to AP server
+
+        @param url: AP server endpoint
+        @param url_actor: URL generated by this gateway for local actor
+        @param doc: document to send
+        """
+        p_url = parse.urlparse(url)
+        date = http.datetimeToString().decode()
+        body = json.dumps(doc).encode()
+        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
+        digest = f"sha-256={digest_hash}"
+        to_sign = (
+            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
+            f"date: {date}\ndigest: {digest}"
+        )
+        signature = base64.b64encode(self.private_key.sign(
+            to_sign.encode(),
+            # we have to use PKCS1v15 padding to be compatible with Mastodon
+            padding.PKCS1v15(),
+            hashes.SHA256()
+        )).decode()
+        h_signature = (
+            f'keyId="{url_actor}",headers="(request-target) host date digest",'
+            f'signature="{signature}"'
+        )
+        return await treq.post(
+            url,
+            body,
+            headers={
+                "Host": [p_url.hostname],
+                "Date": [date],
+                "Digest": [digest],
+                "Signature": [h_signature],
+            }
+        )
+
+    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
+        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
+        service = jid.JID(service_s)
+        client = self.host.getClient(profile)
+        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
+
+    async def getAPActorIdFromAccount(self, account: str) -> str:
+        """Retrieve account ID from it's handle using WebFinger
+
+        @param account: AP handle (user@domain.tld)
+        @return: Actor ID (which is an URL)
+        """
+        if account.count("@") != 1 or "/" in account:
+            raise ValueError("Invalid account: {account!r}")
+        host = account.split("@")[1]
+        try:
+            finger_data = await treq.json_content(await treq.get(
+                f"https://{host}/.well-known/webfinger?"
+                f"resource=acct:{parse.quote_plus(account)}",
+            ))
+        except Exception as e:
+            raise exceptions.DataError(f"Can't get webfinger data: {e}")
+        for link in finger_data.get("links", []):
+            if (
+                link.get("type") == "application/activity+json"
+                and link.get("rel") == "self"
+            ):
+                href = link.get("href", "").strip()
+                if not href:
+                    raise ValueError(
+                        f"Invalid webfinger data for {account:r}: missing href"
+                    )
+                break
+        else:
+            raise ValueError(
+                f"No ActivityPub link found for {account!r}"
+            )
+        return href
+
+    async def getAPActorDataFromId(self, account: str) -> dict:
+        """Retrieve ActivityPub Actor data
+
+        @param account: ActivityPub Actor identifier
+        """
+        href = await self.getAPActorIdFromAccount(account)
+        return await self.apGet(href)
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getAPAccountFromId(self, actor_id: str):
+        """Retrieve AP account from the ID URL
+
+        @param actor_id: AP ID of the actor (URL to the actor data)
+        """
+        url_parsed = parse.urlparse(actor_id)
+        actor_data = await self.apGet(actor_id)
+        username = actor_data.get("preferredUsername")
+        if not username:
+            raise exceptions.DataError(
+                'No "preferredUsername" field found, can\'t retrieve actor account'
+            )
+        account = f"{username}@{url_parsed.hostname}"
+        # we try to retrieve the actor ID from the account to check it
+        found_id = await self.getAPActorIdFromAccount(account)
+        if found_id != actor_id:
+            # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
+            msg = (
+                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
+                f"({actor_id!r}). This AP instance doesn't seems to use "
+                '"preferredUsername" as we expect.'
+            )
+            log.warning(msg)
+            raise exceptions.DataError(msg)
+        return account
+
+    async def getAPItems(
+        self,
+        account: str,
+        max_items: Optional[int] = None,
+        chronological_pagination: bool = True,
+        after_id: Optional[str] = None,
+        start_index: Optional[int] = None,
+    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
+        """Retrieve AP items and convert them to XMPP items
+
+        @param account: AP account handle to get items from
+        @param max_items: maximum number of items to retrieve
+            retrieve all items by default
+        @param chronological_pagination: get pages in chronological order
+            AP use reversed chronological order for pagination, "first" page returns more
+            recent items. If "chronological_pagination" is True, "last" AP page will be
+            retrieved first.
+        @param after_id: if set, retrieve items starting from given ID
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            if ``after_id`` is not already in cache, we have to retrieve every page until
+            we find it.
+            In most common cases, ``after_id`` should be in cache though (client usually
+            use known ID when in-order pagination is used).
+        @param start_index: start retrieving items from the one with given index
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            all pages before the requested index will be retrieved to count items.
+        @return: XMPP Pubsub items and corresponding RSM Response
+            Items are always returned in chronological order in the result
+        """
+        actor_data = await self.getAPActorDataFromId(account)
+        outbox = actor_data.get("outbox")
+        rsm_resp: Dict[str, Union[bool, int]] = {}
+        if not outbox:
+            raise exceptions.DataError(f"No outbox found for actor {account}")
+        outbox_data = await self.apGet(outbox)
+        try:
+            count = outbox_data["totalItems"]
+        except KeyError:
+            log.warning(
+                f'"totalItems" not found in outbox of {account}, defaulting to 20'
+            )
+            count = 20
+        else:
+            log.info(f"{account}'s outbox has {count} item(s)")
+            rsm_resp["count"] = count
+
+        if start_index is not None:
+            assert chronological_pagination and after_id is None
+            if start_index >= count:
+                return [], rsm_resp
+            elif start_index == 0:
+                # this is the default behaviour
+                pass
+            elif start_index > 5000:
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text="Maximum limit for previous_index has been reached, this limit"
+                    "is set to avoid DoS"
+                )
+            else:
+                # we'll convert "start_index" to "after_id", thus we need the item just
+                # before "start_index"
+                previous_index = start_index - 1
+                retrieved_items = 0
+                current_page = outbox_data["last"]
+                while retrieved_items < count:
+                    page_data, items = await self.parseAPPage(current_page)
+                    if not items:
+                        log.warning(f"found an empty AP page at {current_page}")
+                        return [], rsm_resp
+                    page_start_idx = retrieved_items
+                    retrieved_items += len(items)
+                    if previous_index <= retrieved_items:
+                        after_id = items[previous_index - page_start_idx]["id"]
+                        break
+                    try:
+                        current_page = page_data["prev"]
+                    except KeyError:
+                        log.warning(
+                            f"missing previous page link at {current_page}: {page_data!r}"
+                        )
+                        raise error.StanzaError(
+                            "service-unavailable",
+                            "Error while retrieving previous page from AP service at "
+                            f"{current_page}"
+                        )
+
+        init_page = "last" if chronological_pagination else "first"
+        page = outbox_data.get(init_page)
+        if not page:
+            raise exceptions.DataError(
+                f"Initial page {init_page!r} not found for outbox {outbox}"
+            )
+        items = []
+        page_items = []
+        retrieved_items = 0
+        found_after_id = False
+
+        while retrieved_items < count:
+            __, page_items = await self.parseAPPage(page)
+            retrieved_items += len(page_items)
+            if after_id is not None and not found_after_id:
+                # if we have an after_id, we ignore all items until the requested one is
+                # found
+                try:
+                    limit_idx = [i["id"] for i in page_items].index(after_id)
+                except ValueError:
+                    # if "after_id" is not found, we don't add any item from this page
+                    log.debug(f"{after_id!r} not found at {page}, skipping")
+                else:
+                    found_after_id = True
+                    if chronological_pagination:
+                        start_index = retrieved_items - len(page_items) + limit_idx + 1
+                        page_items = page_items[limit_idx+1:]
+                    else:
+                        start_index = count - (retrieved_items - len(page_items) +
+                                               limit_idx + 1)
+                        page_items = page_items[:limit_idx]
+                    items.extend(page_items)
+            else:
+                items.extend(page_items)
+            if max_items is not None and len(items) >= max_items:
+                if chronological_pagination:
+                    items = items[:max_items]
+                else:
+                    items = items[-max_items:]
+                break
+            page = outbox_data.get("prev" if chronological_pagination else "next")
+            if not page:
+                break
+
+        if after_id is not None and not found_after_id:
+            raise error.StanzaError("item-not-found")
+
+        if after_id is None:
+            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
+
+        if start_index is not None:
+            rsm_resp["index"] = start_index
+        elif after_id is not None:
+            log.warning("Can't determine index of first element")
+        elif chronological_pagination:
+            rsm_resp["index"] = 0
+        else:
+            rsm_resp["index"] = count - len(items)
+        if items:
+            rsm_resp.update({
+                "first": items[0]["id"],
+                "last": items[-1]["id"]
+            })
+
+        return items, rsm.RSMResponse(**rsm_resp)
+
+    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
+
+        @param url: url linking and AP page
+        @return: page data, pubsub items
+        """
+        page_data = await self.apGet(url)
+        ap_items = page_data.get("orderedItems")
+        if not ap_items:
+            log.warning('No "orderedItems" collection found')
+            return page_data, []
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                ap_object, mb_data = await self.apItem2MBdata(ap_item)
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+
+            item_elt = await self._m.data2entry(
+                self.client, mb_data, ap_object["id"], None, self._m.namespace
+            )
+            item_elt["publisher"] = mb_data["author_jid"].full()
+            items.append(item_elt)
+
+        return page_data, items
+
+    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
+        """Convert AP item to microblog data
+
+        @return: AP Item's Object and microblog data
+        @raise exceptions.DataError: something is invalid in the AP item
+        @raise NotImplemented: some AP data is not handled yet
+        @raise error.StanzaError: error while contacting the AP server
+        """
+        ap_object = ap_item.get("object")
+        if not ap_object:
+            log.warning(f'No "object" found in AP item {ap_item!r}')
+            raise exceptions.DataError
+        if isinstance(ap_object, str):
+            ap_object = await self.apGet(ap_object)
+        obj_id = ap_object.get("id")
+        if not obj_id:
+            log.warning(f'No "id" found in AP object: {ap_object!r}')
+            raise exceptions.DataError
+        if ap_object.get("inReplyTo") is not None:
+            raise NotImplementedError
+        mb_data = {}
+        for ap_key, mb_key in AP_MB_MAP.items():
+            data = ap_object.get(ap_key)
+            if data is None:
+                continue
+            mb_data[mb_key] = data
+
+        # content
+        try:
+            language, content_xhtml = ap_object["contentMap"].popitem()
+        except (KeyError, AttributeError):
+            try:
+                mb_data["content_xhtml"] = mb_data["content"]
+            except KeyError:
+                log.warning(f"no content found:\n{ap_object!r}")
+                raise exceptions.DataError
+        else:
+            mb_data["language"] = language
+            mb_data["content_xhtml"] = content_xhtml
+
+        # author
+        actor = ap_item.get("actor")
+        if not actor:
+            log.warning(f"no actor associated to object id {obj_id!r}")
+            raise exceptions.DataError
+        elif isinstance(actor, list):
+            # we only keep first item of list as author
+            # TODO: handle multiple actors
+            if len(actor) > 1:
+                log.warning("multiple actors are not managed")
+            actor = actor[0]
+
+        if isinstance(actor, dict):
+            actor = actor.get("id")
+            if not actor:
+                log.warning(f"no actor id found: {actor!r}")
+                raise exceptions.DataError
+
+        if isinstance(actor, str):
+            account = await self.getAPAccountFromId(actor)
+            mb_data["author"] = account.split("@", 1)[0]
+            author_jid = mb_data["author_jid"] = jid.JID(
+                None,
+                (
+                    self.host.plugins["XEP-0106"].escape(account),
+                    self.client.jid.host,
+                    None
+                )
+            )
+        else:
+            log.warning(f"unknown actor type found: {actor!r}")
+            raise exceptions.DataError
+
+        # published/updated
+        for field in ("published", "updated"):
+            value = ap_object.get(field)
+            if not value and field == "updated":
+                value = ap_object.get("published")
+            if value:
+                try:
+                    mb_data[field] = calendar.timegm(
+                        dateutil.parser.parse(str(value)).utctimetuple()
+                    )
+                except dateutil.parser.ParserError as e:
+                    log.warning(f"Can't parse {field!r} field: {e}")
+        return ap_object, mb_data
+
+    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item"""
+        if not mb_data.get("id"):
+            mb_data["id"] = shortuuid.uuid()
+        if not mb_data.get("author_jid"):
+            mb_data["author_jid"] = client.jid
+        ap_account = await self.getAPAccountFromJidAndNode(
+            jid.JID(mb_data["author_jid"]),
+            None
+        )
+        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
+        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
+        return {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url_item,
+            "type": "Create",
+            "actor": url_actor,
+
+            "object": {
+                "id": url_item,
+                "type": "Note",
+                "published": utils.xmpp_date(mb_data["published"]),
+                "attributedTo": url_actor,
+                "content": mb_data.get("content_xhtml") or mb_data["content"],
+                "to": "https://www.w3.org/ns/activitystreams#Public"
+            }
+        }
+
+    async def publishMessage(
+        self,
+        client: SatXMPPEntity,
+        mess_data: dict,
+        service: jid.JID
+    ) -> None:
+        """Send an AP message
+
+        .. note::
+
+            This is a temporary method used for development only
+
+        @param mess_data: message data. Following keys must be set:
+
+            ``node``
+              identifier of message which is being replied (this will
+              correspond to pubsub node in the future)
+
+            ``content_xhtml`` or ``content``
+              message body (respectively in XHTML or plain text)
+
+        @param service: JID corresponding to the AP actor.
+        """
+        if not service.user:
+            raise ValueError("service must have a local part")
+        account = self.host.plugins["XEP-0106"].unescape(service.user)
+        ap_actor_data = await self.getAPActorDataFromId(account)
+
+        try:
+            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
+        except KeyError:
+            raise exceptions.DataError("Can't get ActivityPub actor inbox")
+
+        item_data = await self.mbdata2APitem(client, mess_data)
+        url_actor = item_data["object"]["attributedTo"]
+        resp = await self.signAndPost(inbox_url, url_actor, item_data)
+        if resp.code != 202:
+            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")