changeset 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <>
date Mon, 31 Jan 2022 18:35:49 +0100 (2022-01-31)
parents b15644cae50d
children 43cc8c27adc7
files sat/plugins/ sat/plugins/plugin_comp_ap_gateway/ sat/plugins/plugin_comp_ap_gateway/ sat/plugins/plugin_comp_ap_gateway/ sat/plugins/plugin_comp_ap_gateway/
diffstat 5 files changed, 1316 insertions(+), 1223 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/	Tue Jan 25 17:54:06 2022 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1223 +0,0 @@
-#!/usr/bin/env python3
-# Libervia ActivityPub Gateway
-# Copyright (C) 2009-2021 Jérôme Poisson (
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# GNU Affero General Public License for more details.
-# You should have received a copy of the GNU Affero General Public License
-# along with this program.  If not, see <>.
-import base64
-import hashlib
-import json
-from pathlib import Path
-import time
-from typing import Optional, Dict, Tuple, List, Union
-from urllib import parse
-import calendar
-import re
-import unicodedata
-import dateutil
-from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives import hashes
-from cryptography.hazmat.primitives.asymmetric import rsa
-from cryptography.hazmat.primitives.asymmetric import padding
-import shortuuid
-import treq
-from treq.response import _Response as TReqResponse
-from twisted.internet import defer, reactor, threads
-from twisted.web import http, resource as web_resource, server
-from twisted.words.protocols.jabber import jid, error
-from twisted.words.xish import domish
-from wokkel import pubsub, rsm
-from sat.core import exceptions
-from sat.core.constants import Const as C
-from sat.core.core_types import SatXMPPEntity
-from sat.core.i18n import _
-from sat.core.log import getLogger
-from import utils
-from import data_format, tls
-from import async_lru
-from import ensure_deferred
-log = getLogger(__name__)
-IMPORT_NAME = "ap-gateway"
-    C.PI_NAME: "ActivityPub Gateway component",
-    C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
-    C.PI_MAIN: "APGateway",
-        "Gateway for bidirectional communication between XMPP and ActivityPub."
-    ),
-VERSION = unicodedata.normalize(
-    'NFKD',
-    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
-HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})"
-RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
-RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
-RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
-CONF_SECTION = f"component {IMPORT_NAME}"
-CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
-TYPE_ACTOR = "actor"
-TYPE_INBOX = "inbox"
-TYPE_OUTBOX = "outbox"
-TYPE_ITEM = "item"
-MEDIA_TYPE_AP = "application/activity+json"
-# mapping from AP metadata to microblog data
-AP_MB_MAP = {
-    "content": "content_xhtml",
-AP_REQUEST_TYPES = {"actor", "outbox"}
-class HTTPAPGServer(web_resource.Resource):
-    """HTTP Server handling ActivityPub S2S protocol"""
-    isLeaf = True
-    def __init__(self, ap_gateway):
-        self.apg = ap_gateway
-        super().__init__()
-    async def webfinger(self, request):
-        url_parsed = parse.urlparse(request.uri.decode())
-        query = parse.parse_qs(url_parsed.query)
-        resource = query.get("resource", [""])[0]
-        account = resource[5:].strip()
-        if not resource.startswith("acct:") or not account:
-            return web_resource.ErrorPage(
-                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
-            ).render(request)
-        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)
-        resp = {
-            "subject": resource,
-            "links": [
-                {
-                    "rel": "self",
-                    "type": "application/activity+json",
-                    "href": actor_url
-                }
-            ]
-        }
-        request.setHeader("content-type", CONTENT_TYPE_AP)
-        request.write(json.dumps(resp).encode())
-        request.finish()
-    async def APActorRequest(
-        self,
-        request: "HTTPRequest",
-        account_jid: jid.JID,
-        node: Optional[str],
-        ap_account: str,
-        actor_url: str
-    ) -> dict:
-        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
-        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
-        # we have to use AP account as preferredUsername because it is used to retrieve
-        # actor handle (see
-        preferred_username = ap_account.split("@", 1)[0]
-        return {
-            "@context": [
-                "",
-                ""
-            ],
-            "id": actor_url,
-            "type": "Person",
-            "preferredUsername": preferred_username,
-            "inbox": inbox_url,
-            "outbox": outbox_url,
-            "publicKey": {
-                "id": f"{actor_url}#main-key",
-                "owner": actor_url,
-                "publicKeyPem": self.apg.public_key_pem
-            }
-        }
-    def getCanonicalURL(self, request: "HTTPRequest") -> str:
-        return parse.urljoin(
-            f"https://{self.apg.public_url}",
-            request.path.decode().rstrip("/")
-        )
-    def queryData2RSMRequest(
-        self,
-        query_data: Dict[str, List[str]]
-    ) -> rsm.RSMRequest:
-        """Get RSM kwargs to use with RSMRequest from query data"""
-        page = query_data.get("page")
-        if page == ["first"]:
-            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
-        elif page == ["last"]:
-            return rsm.RSMRequest(max_=PAGE_SIZE)
-        else:
-            for query_key in ("index", "before", "after"):
-                try:
-                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
-                except (KeyError, IndexError, ValueError):
-                    pass
-                else:
-                    return rsm.RSMRequest(**kwargs)
-        raise ValueError(f"Invalid query data: {query_data!r}")
-    async def APOutboxPageRequest(
-        self,
-        request: "HTTPRequest",
-        account_jid: jid.JID,
-        node: Optional[str],
-        ap_account: str,
-        actor_url: str,
-        query_data: Dict[str, List[str]]
-    ) -> dict:
-        # we only keep useful keys, and sort to have consistent URL which can
-        # be used as ID
-        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
-        query_data = {k: query_data[k] for k in url_keys}
-        rsm_kwargs = self.queryData2RSMRequest(query_data)
-        try:
-            items, metadata = await self.apg._p.getItems(
-                client=self.apg.client,
-                service=account_jid,
-                node=node,
-                rsm_request=self.queryData2RSMRequest(query_data),
-                extra = {C.KEY_USE_CACHE: False}
-            )
-        except error.StanzaError as e:
-            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
-            return {}
-        base_url = self.getCanonicalURL(request)
-        url = f"{base_url}?{parse.urlencode(query_data, True)}"
-        data = {
-            "@context": "",
-            "id": url,
-            "type": "OrderedCollectionPage",
-            "partOf": base_url,
-            "orderedItems" : [
-                await self.apg.mbdata2APitem(
-                    self.apg.client,
-                    await self.apg._m.item2mbdata(
-                        self.apg.client,
-                        item,
-                        account_jid,
-                        node
-                    )
-                )
-                for item in reversed(items)
-            ]
-        }
-        # AP OrderedCollection must be in reversed chronological order, thus the opposite
-        # of what we get with RSM (at least with Libervia Pubsub)
-        if not metadata["complete"]:
-            try:
-                last= metadata["rsm"]["last"]
-            except KeyError:
-                last = None
-            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
-        if metadata["rsm"]["index"] != 0:
-            try:
-                first= metadata["rsm"]["first"]
-            except KeyError:
-                first = None
-            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
-        return data
-    async def APOutboxRequest(
-        self,
-        request: "HTTPRequest",
-        account_jid: jid.JID,
-        node: Optional[str],
-        ap_account: str,
-        actor_url: str
-    ) -> dict:
-        if node is None:
-            node = self.apg._m.namespace
-        parsed_url = parse.urlparse(request.uri.decode())
-        query_data = parse.parse_qs(parsed_url.query)
-        if query_data:
-            return await self.APOutboxPageRequest(
-                request, account_jid, node, ap_account, actor_url, query_data
-            )
-        # XXX: we can't use disco#info here because this request won't work on a bare jid
-        # due to security considerations of XEP-0030 (we don't have presence
-        # subscription).
-        # The current workaround is to do a request as if RSM was available, and actually
-        # check its availability according to result.
-        try:
-            __, metadata = await self.apg._p.getItems(
-                client=self.apg.client,
-                service=account_jid,
-                node=node,
-                max_items=0,
-                rsm_request=rsm.RSMRequest(max_=0)
-            )
-        except error.StanzaError as e:
-            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
-            return {}
-        try:
-            items_count = metadata["rsm"]["count"]
-        except KeyError:
-            log.warning(
-                f"No RSM metadata found when requesting pubsub node {node} at "
-                f"{account_jid}, defaulting to items_count=20"
-            )
-            items_count = 20
-        url = self.getCanonicalURL(request)
-        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
-        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
-        return {
-            "@context": "",
-            "id": url,
-            "totalItems": items_count,
-            "type": "OrderedCollection",
-            "first": url_first_page,
-            "last": url_last_page,
-        }
-    async def APRequest(self, request):
-        path = request.path.decode()
-        actor_url = parse.urljoin(
-            f"https://{self.apg.public_url}",
-            path
-        )
-        request_type, ap_account = self.apg.parseAPURL(actor_url)
-        account_jid, node = await self.apg.getJIDAndNode(ap_account)
-        if request_type not in AP_REQUEST_TYPES:
-            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
-        method = getattr(self, f"AP{request_type.title()}Request")
-        ret_data = await method(request, account_jid, node, ap_account, actor_url)
-        request.setHeader("content-type", CONTENT_TYPE_AP)
-        request.write(json.dumps(ret_data).encode())
-        request.finish()
-    def render(self, request):
-        request.setHeader("server", VERSION)
-        return super().render(request)
-    def render_GET(self, request):
-        path = request.path.decode().lstrip("/")
-        if path.startswith(".well-known/webfinger"):
-            defer.ensureDeferred(self.webfinger(request))
-            return server.NOT_DONE_YET
-        elif path.startswith(self.apg.ap_path):
-            defer.ensureDeferred(self.APRequest(request))
-            return server.NOT_DONE_YET
-        return web_resource.NoResource().render(request)
-class HTTPRequest(server.Request):
-    pass
-class HTTPServer(server.Site):
-    requestFactory = HTTPRequest
-    def __init__(self, ap_gateway):
-        super().__init__(HTTPAPGServer(ap_gateway))
-class APGateway:
-    def __init__(self, host):
- = host
-        self.initialised = False
-        self._m = host.plugins["XEP-0277"]
-        self._p = host.plugins["XEP-0060"]
-        host.bridge.addMethod(
-            "APSend",
-            ".plugin",
-            in_sign="sss",
-            out_sign="",
-            method=self._publishMessage,
-            async_=True,
-        )
-    def getHandler(self, __):
-        return APPubsubService(self)
-    async def init(self, client):
-        if self.initialised:
-            return
-        self.initialised = True
-"ActivityPub Gateway initialization"))
-        # RSA keys
-        stored_data = await
-            IMPORT_NAME, ["rsa_key"], profile=client.profile
-        )
-        private_key_pem = stored_data.get("rsa_key")
-        if private_key_pem is None:
-            self.private_key = await threads.deferToThread(
-                rsa.generate_private_key,
-                public_exponent=65537,
-                key_size=4096,
-            )
-            private_key_pem = self.private_key.private_bytes(
-                encoding=serialization.Encoding.PEM,
-                format=serialization.PrivateFormat.PKCS8,
-                encryption_algorithm=serialization.NoEncryption()
-            ).decode()
-            await
-                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
-            )
-        else:
-            self.private_key = serialization.load_pem_private_key(
-                private_key_pem.encode(),
-                password=None,
-            )
-        self.public_key = self.private_key.public_key()
-        self.public_key_pem = self.public_key.public_bytes(
-            encoding=serialization.Encoding.PEM,
-            format=serialization.PublicFormat.SubjectPublicKeyInfo
-        ).decode()
-        # params
-        # URL and port
-        self.public_url =
-            CONF_SECTION, "public_url"
-        ) or
-            CONF_SECTION, "xmpp_domain"
-        )
-        if self.public_url is None:
-            log.error(
-                '"public_url" not set in configuration, this is mandatory to have'
-                "ActivityPub Gateway running. Please set this option it to public facing "
-                f"url in {CONF_SECTION!r} configuration section."
-            )
-            return
-        if parse.urlparse(self.public_url).scheme:
-            log.error(
-                "Scheme must not be specified in \"public_url\", please remove it from "
-                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
-            )
-            return
-        self.http_port = int(
-            CONF_SECTION, 'http_port', 8123))
-        connection_type =
-            CONF_SECTION, 'http_connection_type', 'https')
-        if connection_type not in ('http', 'https'):
-            raise exceptions.ConfigError(
-                'bad ap-gateay http_connection_type, you must use one of "http" or '
-                '"https"'
-            )
-        self.max_items =
-            CONF_SECTION, 'new_node_max_items', 50
-        )
-        self.ap_path =, 'ap_path', '_ap')
-        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
-        # True (default) if we provide gateway only to entities/services from our server
-        self.local_only = C.bool(
-  , 'local_only', C.BOOL_TRUE)
-        )
-        # HTTP server launch
-        self.server = HTTPServer(self)
-        if connection_type == 'http':
-            reactor.listenTCP(self.http_port, self.server)
-        else:
-            options = tls.getOptionsFromConfig(
-      , CONF_SECTION)
-            tls.TLSOptionsCheck(options)
-            context_factory = tls.getTLSContextFactory(options)
-            reactor.listenSSL(self.http_port, self.server, context_factory)
-    async def profileConnecting(self, client):
-        self.client = client
-        await self.init(client)
-    async def apGet(self, url: str) -> dict:
-        """Retrieve AP JSON from given URL
-        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
-            with AP server
-        """
-        try:
-            return await treq.json_content(await treq.get(
-                url,
-                headers = {
-                    "Accept": [MEDIA_TYPE_AP],
-                    "Content-Type": [MEDIA_TYPE_AP],
-                }
-            ))
-        except Exception as e:
-            raise error.StanzaError(
-                "service-unavailable",
-                text=f"Can't get AP data at {url}: {e}"
-            )
-    def mustEncode(self, text: str) -> bool:
-        """Indicate if a text must be period encoded"""
-        return (
-            not RE_ALLOWED_UNQUOTED.match(text)
-            or text.startswith("___")
-            or "---" in text
-        )
-    def periodEncode(self, text: str) -> str:
-        """Period encode a text
-        see [getJIDAndNode] for reasons of period encoding
-        """
-        return (
-            parse.quote(text, safe="")
-            .replace("---", "%2d%2d%2d")
-            .replace("___", "%5f%5f%5f")
-            .replace(".", "%2e")
-            .replace("~", "%7e")
-            .replace("%", ".")
-        )
-    async def getAPAccountFromJidAndNode(
-        self,
-        jid_: jid.JID,
-        node: Optional[str]
-    ) -> str:
-        """Construct AP account from JID and node
-        The account construction will use escaping when necessary
-        """
-        if not node or node == self._m.namespace:
-            node = None
-        if node and not jid_.user and not self.mustEncode(node):
-            is_pubsub = self.isPubsub(jid_)
-            # when we have a pubsub service, the user part can be used to set the node
-            # this produces more user-friendly AP accounts
-            if is_pubsub:
-                jid_.user = node
-                node = None
-        is_local = self.isLocal(jid_)
-        user = jid_.user if is_local else jid_.userhost()
-        if user is None:
-            user = ""
-        account_elts = []
-        if node and self.mustEncode(node) or self.mustEncode(user):
-            account_elts = ["___"]
-            if node:
-                node = self.periodEncode(node)
-            user = self.periodEncode(user)
-        if not user:
-            raise exceptions.InternalError("there should be a user part")
-        if node:
-            account_elts.extend((node, "---"))
-        account_elts.extend((
-            user, "@", if is_local else self.client.jid.userhost()
-        ))
-        return "".join(account_elts)
-    def isLocal(self, jid_: jid.JID) -> bool:
-        """Returns True if jid_ use a domain or subdomain of gateway's host"""
-        local_host =".")
-        assert local_host
-        return".")[-len(local_host):] == local_host
-    async def isPubsub(self, jid_: jid.JID) -> bool:
-        """Indicate if a JID is a Pubsub service"""
-        host_disco = await, jid_)
-        return (
-            ("pubsub", "service") in host_disco.identities
-            and not ("pubsub", "pep") in host_disco.identities
-        )
-    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
-        """Decode raw AP account handle to get XMPP JID and Pubsub Node
-        Username are case insensitive.
-        By default, the username correspond to local username (i.e. username from
-        component's server).
-        If local name's domain is a pubsub service (and not PEP), the username is taken as
-        a pubsub node.
-        If ``---`` is present in username, the part before is used as pubsub node, and the
-        rest as a JID user part.
-        If username starts with ``___``, characters are encoded using period encoding
-        (i.e. percent encoding where a ``.`` is used instead of ``%``).
-        This horror is necessary due to limitation in some AP implementation (notably
-        Mastodon), cf.
-        examples::
-        ```` => JID =, node = None
-        ```` => JID = (this one is a non-local JID, and will work only if setings ``local_only`` is False), node = None
-        ```` (with being a pubsub service) =>
-        JID =, node = toto
-        ```` => JID =, node = tata
-        ```` (with
-        being a pubsub service) ==> JID =, node = urn:xmpp:microblog:0
-        @param ap_account: ActivityPub account handle (``username@domain.tld``)
-        @return: service JID and pubsub node
-            if pubsub is None, default microblog pubsub node (and possibly other nodes
-            that plugins may hanlde) will be used
-        @raise ValueError: invalid account
-        @raise PermissionError: non local jid is used when gateway doesn't allow them
-        """
-        if ap_account.count("@") != 1:
-            raise ValueError("Invalid AP account")
-        if ap_account.startswith("___"):
-            encoded = True
-            ap_account = ap_account[3:]
-        else:
-            encoded = False
-        username, domain = ap_account.split("@")
-        if "---" in username:
-            node, username = username.rsplit("---", 1)
-        else:
-            node = None
-        if encoded:
-            username = parse.unquote(
-                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
-                errors="strict"
-            )
-            if node:
-                node = parse.unquote(
-                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
-                    errors="strict"
-                )
-        if "@" in username:
-            username, domain = username.rsplit("@", 1)
-        if not node:
-            # we need to check host disco, because disco request to user may be
-            # blocked for privacy reason (see
-            #
-            is_pubsub = await self.isPubsub(jid.JID(domain))
-            if is_pubsub:
-                # if the host is a pubsub service and not a PEP, we consider that username
-                # is in fact the node name
-                node = username
-                username = None
-        jid_s = f"{username}@{domain}" if username else domain
-        try:
-            jid_ = jid.JID(jid_s)
-        except RuntimeError:
-            raise ValueError(f"Invalid jid: {jid_s!r}")
-        if self.local_only and not self.isLocal(jid_):
-            raise exceptions.PermissionError(
-                "This gateway is configured to map only local entities and services"
-            )
-        return jid_, node
-    def parseAPURL(self, url: str) -> Tuple[str, str]:
-        """Parse an URL leading to an AP endpoint
-        @param url: URL to parse (schema is not mandatory)
-        @return: endpoint type and AP account
-        """
-        path = parse.urlparse(url).path.lstrip("/")
-        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
-        return type_, parse.unquote(account)
-    def buildAPURL(self, type_:str , *args: str) -> str:
-        """Build an AP endpoint URL
-        @param type_: type of AP endpoing
-        @param arg: endpoint dependant arguments
-        """
-        return parse.urljoin(
-            self.base_ap_url,
-            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
-        )
-    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
-        """Sign a documentent and post it to AP server
-        @param url: AP server endpoint
-        @param url_actor: URL generated by this gateway for local actor
-        @param doc: document to send
-        """
-        p_url = parse.urlparse(url)
-        date = http.datetimeToString().decode()
-        body = json.dumps(doc).encode()
-        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
-        digest = f"sha-256={digest_hash}"
-        to_sign = (
-            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
-            f"date: {date}\ndigest: {digest}"
-        )
-        signature = base64.b64encode(self.private_key.sign(
-            to_sign.encode(),
-            # we have to use PKCS1v15 padding to be compatible with Mastodon
-            padding.PKCS1v15(),
-            hashes.SHA256()
-        )).decode()
-        h_signature = (
-            f'keyId="{url_actor}",headers="(request-target) host date digest",'
-            f'signature="{signature}"'
-        )
-        return await
-            url,
-            body,
-            headers={
-                "Host": [p_url.hostname],
-                "Date": [date],
-                "Digest": [digest],
-                "Signature": [h_signature],
-            }
-        )
-    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
-        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
-        service = jid.JID(service_s)
-        client =
-        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
-    async def getAPActorIdFromAccount(self, account: str) -> str:
-        """Retrieve account ID from it's handle using WebFinger
-        @param account: AP handle (user@domain.tld)
-        @return: Actor ID (which is an URL)
-        """
-        if account.count("@") != 1 or "/" in account:
-            raise ValueError("Invalid account: {account!r}")
-        host = account.split("@")[1]
-        try:
-            finger_data = await treq.json_content(await treq.get(
-                f"https://{host}/.well-known/webfinger?"
-                f"resource=acct:{parse.quote_plus(account)}",
-            ))
-        except Exception as e:
-            raise exceptions.DataError(f"Can't get webfinger data: {e}")
-        for link in finger_data.get("links", []):
-            if (
-                link.get("type") == "application/activity+json"
-                and link.get("rel") == "self"
-            ):
-                href = link.get("href", "").strip()
-                if not href:
-                    raise ValueError(
-                        f"Invalid webfinger data for {account:r}: missing href"
-                    )
-                break
-        else:
-            raise ValueError(
-                f"No ActivityPub link found for {account!r}"
-            )
-        return href
-    async def getAPActorDataFromId(self, account: str) -> dict:
-        """Retrieve ActivityPub Actor data
-        @param account: ActivityPub Actor identifier
-        """
-        href = await self.getAPActorIdFromAccount(account)
-        return await self.apGet(href)
-    @async_lru(maxsize=LRU_MAX_SIZE)
-    async def getAPAccountFromId(self, actor_id: str):
-        """Retrieve AP account from the ID URL
-        @param actor_id: AP ID of the actor (URL to the actor data)
-        """
-        url_parsed = parse.urlparse(actor_id)
-        actor_data = await self.apGet(actor_id)
-        username = actor_data.get("preferredUsername")
-        if not username:
-            raise exceptions.DataError(
-                'No "preferredUsername" field found, can\'t retrieve actor account'
-            )
-        account = f"{username}@{url_parsed.hostname}"
-        # we try to retrieve the actor ID from the account to check it
-        found_id = await self.getAPActorIdFromAccount(account)
-        if found_id != actor_id:
-            # cf.
-            msg = (
-                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
-                f"({actor_id!r}). This AP instance doesn't seems to use "
-                '"preferredUsername" as we expect.'
-            )
-            log.warning(msg)
-            raise exceptions.DataError(msg)
-        return account
-    async def getAPItems(
-        self,
-        account: str,
-        max_items: Optional[int] = None,
-        chronological_pagination: bool = True,
-        after_id: Optional[str] = None,
-        start_index: Optional[int] = None,
-    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
-        """Retrieve AP items and convert them to XMPP items
-        @param account: AP account to get items from
-        @param max_items: maximum number of items to retrieve
-            retrieve all items by default
-        @param chronological_pagination: get pages in chronological order
-            AP use reversed chronological order for pagination, "first" page returns more
-            recent items. If "chronological_pagination" is True, "last" AP page will be
-            retrieved first.
-        @param after_id: if set, retrieve items starting from given ID
-            Due to ActivityStream Collection Paging limitations, this is inefficient and
-            if ``after_id`` is not already in cache, we have to retrieve every page until
-            we find it.
-            In most common cases, ``after_id`` should be in cache though (client usually
-            use known ID when in-order pagination is used).
-        @param start_index: start retrieving items from the one with given index
-            Due to ActivityStream Collection Paging limitations, this is inefficient and
-            all pages before the requested index will be retrieved to count items.
-        @return: XMPP Pubsub items and corresponding RSM Response
-            Items are always returned in chronological order in the result
-        """
-        actor_data = await self.getAPActorDataFromId(account)
-        outbox = actor_data.get("outbox")
-        rsm_resp: Dict[str, Union[bool, int]] = {}
-        if not outbox:
-            raise exceptions.DataError(f"No outbox found for actor {account}")
-        outbox_data = await self.apGet(outbox)
-        try:
-            count = outbox_data["totalItems"]
-        except KeyError:
-            log.warning(
-                f'"totalItems" not found in outbox of {account}, defaulting to 20'
-            )
-            count = 20
-        else:
-  "{account}'s outbox has {count} item(s)")
-            rsm_resp["count"] = count
-        if start_index is not None:
-            assert chronological_pagination and after_id is None
-            if start_index >= count:
-                return [], rsm_resp
-            elif start_index == 0:
-                # this is the default behaviour
-                pass
-            elif start_index > 5000:
-                raise error.StanzaError(
-                    "feature-not-implemented",
-                    text="Maximum limit for previous_index has been reached, this limit"
-                    "is set to avoid DoS"
-                )
-            else:
-                # we'll convert "start_index" to "after_id", thus we need the item just
-                # before "start_index"
-                previous_index = start_index - 1
-                retrieved_items = 0
-                current_page = outbox_data["last"]
-                while retrieved_items < count:
-                    page_data, items = await self.parseAPPage(current_page)
-                    if not items:
-                        log.warning(f"found an empty AP page at {current_page}")
-                        return [], rsm_resp
-                    page_start_idx = retrieved_items
-                    retrieved_items += len(items)
-                    if previous_index <= retrieved_items:
-                        after_id = items[previous_index - page_start_idx]["id"]
-                        break
-                    try:
-                        current_page = page_data["prev"]
-                    except KeyError:
-                        log.warning(
-                            f"missing previous page link at {current_page}: {page_data!r}"
-                        )
-                        raise error.StanzaError(
-                            "service-unavailable",
-                            "Error while retrieving previous page from AP service at "
-                            f"{current_page}"
-                        )
-        init_page = "last" if chronological_pagination else "first"
-        page = outbox_data.get(init_page)
-        if not page:
-            raise exceptions.DataError(
-                f"Initial page {init_page!r} not found for outbox {outbox}"
-            )
-        items = []
-        page_items = []
-        retrieved_items = 0
-        found_after_id = False
-        while retrieved_items < count:
-            __, page_items = await self.parseAPPage(page)
-            retrieved_items += len(page_items)
-            if after_id is not None and not found_after_id:
-                # if we have an after_id, we ignore all items until the requested one is
-                # found
-                limit_idx = [i["id"] for i in page_items].index(after_id)
-                if limit_idx == -1:
-                    # if "after_id" is not found, we don't add any item from this page
-                    log.debug(f"{after_id!r} not found at {page}, skipping")
-                else:
-                    found_after_id = True
-                    if chronological_pagination:
-                        page_items = page_items[limit_idx+1:]
-                        start_index = retrieved_items - len(page_items) + limit_idx + 1
-                    else:
-                        page_items = page_items[:limit_idx]
-                        start_index = count - (retrieved_items - len(page_items) +
-                                               limit_idx + 1)
-                    items.extend(page_items)
-            else:
-                items.extend(page_items)
-            if max_items is not None and len(items) >= max_items:
-                if chronological_pagination:
-                    items = items[:max_items]
-                else:
-                    items = items[-max_items:]
-                break
-            page = outbox_data.get("prev" if chronological_pagination else "next")
-            if not page:
-                break
-        if after_id is not None and not found_after_id:
-            raise error.StanzaError("item-not-found")
-        if after_id is None:
-            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
-        if start_index is not None:
-            rsm_resp["index"] = start_index
-        elif after_id is not None:
-            log.warning("Can't determine index of first element")
-        elif chronological_pagination:
-            rsm_resp["index"] = 0
-        else:
-            rsm_resp["index"] = count - len(items)
-        if items:
-            rsm_resp.update({
-                "first": items[0]["id"],
-                "last": items[-1]["id"]
-            })
-        return items, rsm.RSMResponse(**rsm_resp)
-    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
-        """Convert AP objects from an AP page to XMPP items
-        @param url: url linking and AP page
-        @return: page data, pubsub items
-        """
-        page_data = await self.apGet(url)
-        ap_items = page_data.get("orderedItems")
-        if not ap_items:
-            log.warning('No "orderedItems" collection found')
-            return page_data, []
-        items = []
-        # AP Collections are in antichronological order, but we expect chronological in
-        # Pubsub, thus we reverse it
-        for ap_item in reversed(ap_items):
-            try:
-                ap_object, mb_data = await self.apItem2MBdata(ap_item)
-            except (exceptions.DataError, NotImplementedError, error.StanzaError):
-                continue
-            item_elt = await self._m.data2entry(
-                self.client, mb_data, ap_object["id"], None, self._m.namespace
-            )
-            item_elt["publisher"] = mb_data["author_jid"].full()
-            items.append(item_elt)
-        return page_data, items
-    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
-        """Convert AP item to microblog data
-        @return: AP Item's Object and microblog data
-        @raise exceptions.DataError: something is invalid in the AP item
-        @raise NotImplemented: some AP data is not handled yet
-        @raise error.StanzaError: error while contacting the AP server
-        """
-        ap_object = ap_item.get("object")
-        if not ap_object:
-            log.warning(f'No "object" found in AP item {ap_item!r}')
-            raise exceptions.DataError
-        if isinstance(ap_object, str):
-            ap_object = await self.apGet(ap_object)
-        obj_id = ap_object.get("id")
-        if not obj_id:
-            log.warning(f'No "id" found in AP object: {ap_object!r}')
-            raise exceptions.DataError
-        if ap_object.get("inReplyTo") is not None:
-            raise NotImplementedError
-        mb_data = {}
-        for ap_key, mb_key in AP_MB_MAP.items():
-            data = ap_object.get(ap_key)
-            if data is None:
-                continue
-            mb_data[mb_key] = data
-        # content
-        try:
-            language, content_xhtml = ap_object["contentMap"].popitem()
-        except (KeyError, AttributeError):
-            try:
-                mb_data["content_xhtml"] = mb_data["content"]
-            except KeyError:
-                log.warning(f"no content found:\n{ap_object!r}")
-                raise exceptions.DataError
-        else:
-            mb_data["language"] = language
-            mb_data["content_xhtml"] = content_xhtml
-        # author
-        actor = ap_item.get("actor")
-        if not actor:
-            log.warning(f"no actor associated to object id {obj_id!r}")
-            raise exceptions.DataError
-        elif isinstance(actor, list):
-            # we only keep first item of list as author
-            # TODO: handle multiple actors
-            if len(actor) > 1:
-                log.warning("multiple actors are not managed")
-            actor = actor[0]
-        if isinstance(actor, dict):
-            actor = actor.get("id")
-            if not actor:
-                log.warning(f"no actor id found: {actor!r}")
-                raise exceptions.DataError
-        if isinstance(actor, str):
-            account = await self.getAPAccountFromId(actor)
-            mb_data["author"] = account.split("@", 1)[0]
-            author_jid = mb_data["author_jid"] = jid.JID(
-                None,
-                (
-          ["XEP-0106"].escape(account),
-          ,
-                    None
-                )
-            )
-        else:
-            log.warning(f"unknown actor type found: {actor!r}")
-            raise exceptions.DataError
-        # published/updated
-        for field in ("published", "updated"):
-            value = ap_object.get(field)
-            if not value and field == "updated":
-                value = ap_object.get("published")
-            if value:
-                try:
-                    mb_data[field] = calendar.timegm(
-                        dateutil.parser.parse(str(value)).utctimetuple()
-                    )
-                except dateutil.parser.ParserError as e:
-                    log.warning(f"Can't parse {field!r} field: {e}")
-        return ap_object, mb_data
-    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
-        """Convert Libervia Microblog Data to ActivityPub item"""
-        if not mb_data.get("id"):
-            mb_data["id"] = shortuuid.uuid()
-        if not mb_data.get("author_jid"):
-            mb_data["author_jid"] = client.jid
-        ap_account = await self.getAPAccountFromJidAndNode(
-            jid.JID(mb_data["author_jid"]),
-            None
-        )
-        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
-        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
-        return {
-            "@context": "",
-            "id": url_item,
-            "type": "Create",
-            "actor": url_actor,
-            "object": {
-                "id": url_item,
-                "type": "Note",
-                "published": utils.xmpp_date(mb_data["published"]),
-                "attributedTo": url_actor,
-                "content": mb_data.get("content_xhtml") or mb_data["content"],
-                "to": ""
-            }
-        }
-    async def publishMessage(
-        self,
-        client: SatXMPPEntity,
-        mess_data: dict,
-        service: jid.JID
-    ) -> None:
-        """Send an AP message
-        .. note::
-            This is a temporary method used for development only
-        @param mess_data: message data. Following keys must be set:
-            ``node``
-              identifier of message which is being replied (this will
-              correspond to pubsub node in the future)
-            ``content_xhtml`` or ``content``
-              message body (respectively in XHTML or plain text)
-        @param service: JID corresponding to the AP actor.
-        """
-        if not service.user:
-            raise ValueError("service must have a local part")
-        account =["XEP-0106"].unescape(service.user)
-        ap_actor_data = await self.getAPActorDataFromId(account)
-        try:
-            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
-        except KeyError:
-            raise exceptions.DataError("Can't get ActivityPub actor inbox")
-        item_data = await self.mbdata2APitem(client, mess_data)
-        url_actor = item_data["object"]["attributedTo"]
-        resp = await self.signAndPost(inbox_url, url_actor, item_data)
-        if resp.code != 202:
-            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
-class APPubsubService(rsm.PubSubService):
-    """Pubsub service for XMPP requests"""
-    def __init__(self, apg):
-        super(APPubsubService, self).__init__()
- =
-        self.apg = apg
-        self.discoIdentity = {
-            "category": "pubsub",
-            "type": "service",
-            "name": "Libervia ActivityPub Gateway",
-        }
-    @ensure_deferred
-    async def publish(self, requestor, service, nodeIdentifier, items):
-        raise NotImplementedError
-    @ensure_deferred
-    async def items(
-        self,
-        requestor: jid.JID,
-        service: jid.JID,
-        node: str,
-        maxItems: Optional[int],
-        itemIdentifiers: Optional[List[str]],
-        rsm_req: Optional[rsm.RSMRequest]
-    ) -> List[domish.Element]:
-        if not service.user:
-            return []
-        ap_account =["XEP-0106"].unescape(service.user)
-        if ap_account.count("@") != 1:
-            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
-            return []
-        if node != self.apg._m.namespace:
-            raise error.StanzaError(
-                "feature-not-implemented",
-                text=f"{VERSION} only supports {self.apg._m.namespace} "
-                "node for now"
-            )
-        if rsm_req is None:
-            if maxItems is None:
-                maxItems = 20
-            kwargs = {
-                "max_items": maxItems,
-                "chronological_pagination": False,
-            }
-        else:
-            if len(
-                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
-                 if v is not None]
-            ) > 1:
-                raise error.StanzaError(
-                    "bad-request",
-                    text="You can't use after, before and index at the same time"
-                )
-            kwargs = {"max_items": rsm_req.max}
-            if rsm_req.after is not None:
-                kwargs["after_id"] = rsm_req.after
-            elif rsm_req.before is not None:
-                kwargs["chronological_pagination"] = False
-                if rsm_req.before != "":
-                    kwargs["after_id"] = rsm_req.before
-            elif rsm_req.index is not None:
-                kwargs["start_index"] = rsm_req.index
-            f"No cache found for node {node} at {service} (AP account {ap_account}), "
-            "using Collection Paging to RSM translation"
-        )
-        return await self.apg.getAPItems(ap_account, **kwargs)
-    @ensure_deferred
-    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
-        raise NotImplementedError
-    def getNodeInfo(
-        self,
-        requestor: jid.JID,
-        service: jid.JID,
-        nodeIdentifier: str,
-        pep: bool = False,
-        recipient: Optional[jid.JID] = None
-    ) -> Optional[dict]:
-        if not nodeIdentifier:
-            return None
-        info = {
-            "type": "leaf",
-            "meta-data": [
-                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
-                {"var": "pubsub#max_items", "value": "max"},
-                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
-                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
-            ]
-        }
-        return info
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,853 @@
+#!/usr/bin/env python3
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+import base64
+import hashlib
+import json
+from pathlib import Path
+from typing import Optional, Dict, Tuple, List, Union
+from urllib import parse
+import calendar
+import re
+import dateutil
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import padding
+import shortuuid
+import treq
+from treq.response import _Response as TReqResponse
+from twisted.internet import defer, reactor, threads
+from twisted.web import http
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import rsm
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from import utils
+from import data_format, tls
+from import async_lru
+                       AP_MB_MAP, LRU_MAX_SIZE)
+from .http_server import HTTPServer
+from .pubsub_service import APPubsubService
+log = getLogger(__name__)
+IMPORT_NAME = "ap-gateway"
+    C.PI_NAME: "ActivityPub Gateway component",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
+    C.PI_MAIN: "APGateway",
+        "Gateway for bidirectional communication between XMPP and ActivityPub."
+    ),
+HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})"
+RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
+RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
+RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
+class APGateway:
+    def __init__(self, host):
+ = host
+        self.initialised = False
+        self._m = host.plugins["XEP-0277"]
+        self._p = host.plugins["XEP-0060"]
+        host.bridge.addMethod(
+            "APSend",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._publishMessage,
+            async_=True,
+        )
+    def getHandler(self, __):
+        return APPubsubService(self)
+    async def init(self, client):
+        if self.initialised:
+            return
+        self.initialised = True
+"ActivityPub Gateway initialization"))
+        # RSA keys
+        stored_data = await
+            IMPORT_NAME, ["rsa_key"], profile=client.profile
+        )
+        private_key_pem = stored_data.get("rsa_key")
+        if private_key_pem is None:
+            self.private_key = await threads.deferToThread(
+                rsa.generate_private_key,
+                public_exponent=65537,
+                key_size=4096,
+            )
+            private_key_pem = self.private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption()
+            ).decode()
+            await
+                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
+            )
+        else:
+            self.private_key = serialization.load_pem_private_key(
+                private_key_pem.encode(),
+                password=None,
+            )
+        self.public_key = self.private_key.public_key()
+        self.public_key_pem = self.public_key.public_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PublicFormat.SubjectPublicKeyInfo
+        ).decode()
+        # params
+        # URL and port
+        self.public_url =
+            CONF_SECTION, "public_url"
+        ) or
+            CONF_SECTION, "xmpp_domain"
+        )
+        if self.public_url is None:
+            log.error(
+                '"public_url" not set in configuration, this is mandatory to have'
+                "ActivityPub Gateway running. Please set this option it to public facing "
+                f"url in {CONF_SECTION!r} configuration section."
+            )
+            return
+        if parse.urlparse(self.public_url).scheme:
+            log.error(
+                "Scheme must not be specified in \"public_url\", please remove it from "
+                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
+            )
+            return
+        self.http_port = int(
+            CONF_SECTION, 'http_port', 8123))
+        connection_type =
+            CONF_SECTION, 'http_connection_type', 'https')
+        if connection_type not in ('http', 'https'):
+            raise exceptions.ConfigError(
+                'bad ap-gateay http_connection_type, you must use one of "http" or '
+                '"https"'
+            )
+        self.max_items =
+            CONF_SECTION, 'new_node_max_items', 50
+        )
+        self.ap_path =, 'ap_path', '_ap')
+        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
+        # True (default) if we provide gateway only to entities/services from our server
+        self.local_only = C.bool(
+  , 'local_only', C.BOOL_TRUE)
+        )
+        # HTTP server launch
+        self.server = HTTPServer(self)
+        if connection_type == 'http':
+            reactor.listenTCP(self.http_port, self.server)
+        else:
+            options = tls.getOptionsFromConfig(
+      , CONF_SECTION)
+            tls.TLSOptionsCheck(options)
+            context_factory = tls.getTLSContextFactory(options)
+            reactor.listenSSL(self.http_port, self.server, context_factory)
+    async def profileConnecting(self, client):
+        self.client = client
+        await self.init(client)
+    async def apGet(self, url: str) -> dict:
+        """Retrieve AP JSON from given URL
+        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
+            with AP server
+        """
+        try:
+            return await treq.json_content(await treq.get(
+                url,
+                headers = {
+                    "Accept": [MEDIA_TYPE_AP],
+                    "Content-Type": [MEDIA_TYPE_AP],
+                }
+            ))
+        except Exception as e:
+            raise error.StanzaError(
+                "service-unavailable",
+                text=f"Can't get AP data at {url}: {e}"
+            )
+    def mustEncode(self, text: str) -> bool:
+        """Indicate if a text must be period encoded"""
+        return (
+            not RE_ALLOWED_UNQUOTED.match(text)
+            or text.startswith("___")
+            or "---" in text
+        )
+    def periodEncode(self, text: str) -> str:
+        """Period encode a text
+        see [getJIDAndNode] for reasons of period encoding
+        """
+        return (
+            parse.quote(text, safe="")
+            .replace("---", "%2d%2d%2d")
+            .replace("___", "%5f%5f%5f")
+            .replace(".", "%2e")
+            .replace("~", "%7e")
+            .replace("%", ".")
+        )
+    async def getAPAccountFromJidAndNode(
+        self,
+        jid_: jid.JID,
+        node: Optional[str]
+    ) -> str:
+        """Construct AP account from JID and node
+        The account construction will use escaping when necessary
+        """
+        if not node or node == self._m.namespace:
+            node = None
+        if node and not jid_.user and not self.mustEncode(node):
+            is_pubsub = await self.isPubsub(jid_)
+            # when we have a pubsub service, the user part can be used to set the node
+            # this produces more user-friendly AP accounts
+            if is_pubsub:
+                jid_.user = node
+                node = None
+        is_local = self.isLocal(jid_)
+        user = jid_.user if is_local else jid_.userhost()
+        if user is None:
+            user = ""
+        account_elts = []
+        if node and self.mustEncode(node) or self.mustEncode(user):
+            account_elts = ["___"]
+            if node:
+                node = self.periodEncode(node)
+            user = self.periodEncode(user)
+        if not user:
+            raise exceptions.InternalError("there should be a user part")
+        if node:
+            account_elts.extend((node, "---"))
+        account_elts.extend((
+            user, "@", if is_local else self.client.jid.userhost()
+        ))
+        return "".join(account_elts)
+    def isLocal(self, jid_: jid.JID) -> bool:
+        """Returns True if jid_ use a domain or subdomain of gateway's host"""
+        local_host =".")
+        assert local_host
+        return".")[-len(local_host):] == local_host
+    async def isPubsub(self, jid_: jid.JID) -> bool:
+        """Indicate if a JID is a Pubsub service"""
+        host_disco = await, jid_)
+        return (
+            ("pubsub", "service") in host_disco.identities
+            and not ("pubsub", "pep") in host_disco.identities
+        )
+    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
+        """Decode raw AP account handle to get XMPP JID and Pubsub Node
+        Username are case insensitive.
+        By default, the username correspond to local username (i.e. username from
+        component's server).
+        If local name's domain is a pubsub service (and not PEP), the username is taken as
+        a pubsub node.
+        If ``---`` is present in username, the part before is used as pubsub node, and the
+        rest as a JID user part.
+        If username starts with ``___``, characters are encoded using period encoding
+        (i.e. percent encoding where a ``.`` is used instead of ``%``).
+        This horror is necessary due to limitation in some AP implementation (notably
+        Mastodon), cf.
+        examples:
+        ```` => JID =, node = None
+        ```` => JID = (this one is a
+        non-local JID, and will work only if setings ``local_only`` is False), node = None
+        ```` (with being a pubsub service) =>
+        JID =, node = toto
+        ```` => JID =, node = tata
+        ```` (with
+        being a pubsub service) ==> JID =, node = urn:xmpp:microblog:0
+        @param ap_account: ActivityPub account handle (``username@domain.tld``)
+        @return: service JID and pubsub node
+            if pubsub is None, default microblog pubsub node (and possibly other nodes
+            that plugins may hanlde) will be used
+        @raise ValueError: invalid account
+        @raise PermissionError: non local jid is used when gateway doesn't allow them
+        """
+        if ap_account.count("@") != 1:
+            raise ValueError("Invalid AP account")
+        if ap_account.startswith("___"):
+            encoded = True
+            ap_account = ap_account[3:]
+        else:
+            encoded = False
+        username, domain = ap_account.split("@")
+        if "---" in username:
+            node, username = username.rsplit("---", 1)
+        else:
+            node = None
+        if encoded:
+            username = parse.unquote(
+                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
+                errors="strict"
+            )
+            if node:
+                node = parse.unquote(
+                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
+                    errors="strict"
+                )
+        if "@" in username:
+            username, domain = username.rsplit("@", 1)
+        if not node:
+            # we need to check host disco, because disco request to user may be
+            # blocked for privacy reason (see
+            #
+            is_pubsub = await self.isPubsub(jid.JID(domain))
+            if is_pubsub:
+                # if the host is a pubsub service and not a PEP, we consider that username
+                # is in fact the node name
+                node = username
+                username = None
+        jid_s = f"{username}@{domain}" if username else domain
+        try:
+            jid_ = jid.JID(jid_s)
+        except RuntimeError:
+            raise ValueError(f"Invalid jid: {jid_s!r}")
+        if self.local_only and not self.isLocal(jid_):
+            raise exceptions.PermissionError(
+                "This gateway is configured to map only local entities and services"
+            )
+        return jid_, node
+    def parseAPURL(self, url: str) -> Tuple[str, str]:
+        """Parse an URL leading to an AP endpoint
+        @param url: URL to parse (schema is not mandatory)
+        @return: endpoint type and AP account
+        """
+        path = parse.urlparse(url).path.lstrip("/")
+        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
+        return type_, parse.unquote(account)
+    def buildAPURL(self, type_:str , *args: str) -> str:
+        """Build an AP endpoint URL
+        @param type_: type of AP endpoing
+        @param arg: endpoint dependant arguments
+        """
+        return parse.urljoin(
+            self.base_ap_url,
+            str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
+        )
+    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
+        """Sign a documentent and post it to AP server
+        @param url: AP server endpoint
+        @param url_actor: URL generated by this gateway for local actor
+        @param doc: document to send
+        """
+        p_url = parse.urlparse(url)
+        date = http.datetimeToString().decode()
+        body = json.dumps(doc).encode()
+        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
+        digest = f"sha-256={digest_hash}"
+        to_sign = (
+            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
+            f"date: {date}\ndigest: {digest}"
+        )
+        signature = base64.b64encode(self.private_key.sign(
+            to_sign.encode(),
+            # we have to use PKCS1v15 padding to be compatible with Mastodon
+            padding.PKCS1v15(),
+            hashes.SHA256()
+        )).decode()
+        h_signature = (
+            f'keyId="{url_actor}",headers="(request-target) host date digest",'
+            f'signature="{signature}"'
+        )
+        return await
+            url,
+            body,
+            headers={
+                "Host": [p_url.hostname],
+                "Date": [date],
+                "Digest": [digest],
+                "Signature": [h_signature],
+            }
+        )
+    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
+        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
+        service = jid.JID(service_s)
+        client =
+        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
+    async def getAPActorIdFromAccount(self, account: str) -> str:
+        """Retrieve account ID from it's handle using WebFinger
+        @param account: AP handle (user@domain.tld)
+        @return: Actor ID (which is an URL)
+        """
+        if account.count("@") != 1 or "/" in account:
+            raise ValueError("Invalid account: {account!r}")
+        host = account.split("@")[1]
+        try:
+            finger_data = await treq.json_content(await treq.get(
+                f"https://{host}/.well-known/webfinger?"
+                f"resource=acct:{parse.quote_plus(account)}",
+            ))
+        except Exception as e:
+            raise exceptions.DataError(f"Can't get webfinger data: {e}")
+        for link in finger_data.get("links", []):
+            if (
+                link.get("type") == "application/activity+json"
+                and link.get("rel") == "self"
+            ):
+                href = link.get("href", "").strip()
+                if not href:
+                    raise ValueError(
+                        f"Invalid webfinger data for {account:r}: missing href"
+                    )
+                break
+        else:
+            raise ValueError(
+                f"No ActivityPub link found for {account!r}"
+            )
+        return href
+    async def getAPActorDataFromId(self, account: str) -> dict:
+        """Retrieve ActivityPub Actor data
+        @param account: ActivityPub Actor identifier
+        """
+        href = await self.getAPActorIdFromAccount(account)
+        return await self.apGet(href)
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getAPAccountFromId(self, actor_id: str):
+        """Retrieve AP account from the ID URL
+        @param actor_id: AP ID of the actor (URL to the actor data)
+        """
+        url_parsed = parse.urlparse(actor_id)
+        actor_data = await self.apGet(actor_id)
+        username = actor_data.get("preferredUsername")
+        if not username:
+            raise exceptions.DataError(
+                'No "preferredUsername" field found, can\'t retrieve actor account'
+            )
+        account = f"{username}@{url_parsed.hostname}"
+        # we try to retrieve the actor ID from the account to check it
+        found_id = await self.getAPActorIdFromAccount(account)
+        if found_id != actor_id:
+            # cf.
+            msg = (
+                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
+                f"({actor_id!r}). This AP instance doesn't seems to use "
+                '"preferredUsername" as we expect.'
+            )
+            log.warning(msg)
+            raise exceptions.DataError(msg)
+        return account
+    async def getAPItems(
+        self,
+        account: str,
+        max_items: Optional[int] = None,
+        chronological_pagination: bool = True,
+        after_id: Optional[str] = None,
+        start_index: Optional[int] = None,
+    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
+        """Retrieve AP items and convert them to XMPP items
+        @param account: AP account handle to get items from
+        @param max_items: maximum number of items to retrieve
+            retrieve all items by default
+        @param chronological_pagination: get pages in chronological order
+            AP use reversed chronological order for pagination, "first" page returns more
+            recent items. If "chronological_pagination" is True, "last" AP page will be
+            retrieved first.
+        @param after_id: if set, retrieve items starting from given ID
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            if ``after_id`` is not already in cache, we have to retrieve every page until
+            we find it.
+            In most common cases, ``after_id`` should be in cache though (client usually
+            use known ID when in-order pagination is used).
+        @param start_index: start retrieving items from the one with given index
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            all pages before the requested index will be retrieved to count items.
+        @return: XMPP Pubsub items and corresponding RSM Response
+            Items are always returned in chronological order in the result
+        """
+        actor_data = await self.getAPActorDataFromId(account)
+        outbox = actor_data.get("outbox")
+        rsm_resp: Dict[str, Union[bool, int]] = {}
+        if not outbox:
+            raise exceptions.DataError(f"No outbox found for actor {account}")
+        outbox_data = await self.apGet(outbox)
+        try:
+            count = outbox_data["totalItems"]
+        except KeyError:
+            log.warning(
+                f'"totalItems" not found in outbox of {account}, defaulting to 20'
+            )
+            count = 20
+        else:
+  "{account}'s outbox has {count} item(s)")
+            rsm_resp["count"] = count
+        if start_index is not None:
+            assert chronological_pagination and after_id is None
+            if start_index >= count:
+                return [], rsm_resp
+            elif start_index == 0:
+                # this is the default behaviour
+                pass
+            elif start_index > 5000:
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text="Maximum limit for previous_index has been reached, this limit"
+                    "is set to avoid DoS"
+                )
+            else:
+                # we'll convert "start_index" to "after_id", thus we need the item just
+                # before "start_index"
+                previous_index = start_index - 1
+                retrieved_items = 0
+                current_page = outbox_data["last"]
+                while retrieved_items < count:
+                    page_data, items = await self.parseAPPage(current_page)
+                    if not items:
+                        log.warning(f"found an empty AP page at {current_page}")
+                        return [], rsm_resp
+                    page_start_idx = retrieved_items
+                    retrieved_items += len(items)
+                    if previous_index <= retrieved_items:
+                        after_id = items[previous_index - page_start_idx]["id"]
+                        break
+                    try:
+                        current_page = page_data["prev"]
+                    except KeyError:
+                        log.warning(
+                            f"missing previous page link at {current_page}: {page_data!r}"
+                        )
+                        raise error.StanzaError(
+                            "service-unavailable",
+                            "Error while retrieving previous page from AP service at "
+                            f"{current_page}"
+                        )
+        init_page = "last" if chronological_pagination else "first"
+        page = outbox_data.get(init_page)
+        if not page:
+            raise exceptions.DataError(
+                f"Initial page {init_page!r} not found for outbox {outbox}"
+            )
+        items = []
+        page_items = []
+        retrieved_items = 0
+        found_after_id = False
+        while retrieved_items < count:
+            __, page_items = await self.parseAPPage(page)
+            retrieved_items += len(page_items)
+            if after_id is not None and not found_after_id:
+                # if we have an after_id, we ignore all items until the requested one is
+                # found
+                try:
+                    limit_idx = [i["id"] for i in page_items].index(after_id)
+                except ValueError:
+                    # if "after_id" is not found, we don't add any item from this page
+                    log.debug(f"{after_id!r} not found at {page}, skipping")
+                else:
+                    found_after_id = True
+                    if chronological_pagination:
+                        start_index = retrieved_items - len(page_items) + limit_idx + 1
+                        page_items = page_items[limit_idx+1:]
+                    else:
+                        start_index = count - (retrieved_items - len(page_items) +
+                                               limit_idx + 1)
+                        page_items = page_items[:limit_idx]
+                    items.extend(page_items)
+            else:
+                items.extend(page_items)
+            if max_items is not None and len(items) >= max_items:
+                if chronological_pagination:
+                    items = items[:max_items]
+                else:
+                    items = items[-max_items:]
+                break
+            page = outbox_data.get("prev" if chronological_pagination else "next")
+            if not page:
+                break
+        if after_id is not None and not found_after_id:
+            raise error.StanzaError("item-not-found")
+        if after_id is None:
+            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
+        if start_index is not None:
+            rsm_resp["index"] = start_index
+        elif after_id is not None:
+            log.warning("Can't determine index of first element")
+        elif chronological_pagination:
+            rsm_resp["index"] = 0
+        else:
+            rsm_resp["index"] = count - len(items)
+        if items:
+            rsm_resp.update({
+                "first": items[0]["id"],
+                "last": items[-1]["id"]
+            })
+        return items, rsm.RSMResponse(**rsm_resp)
+    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
+        @param url: url linking and AP page
+        @return: page data, pubsub items
+        """
+        page_data = await self.apGet(url)
+        ap_items = page_data.get("orderedItems")
+        if not ap_items:
+            log.warning('No "orderedItems" collection found')
+            return page_data, []
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                ap_object, mb_data = await self.apItem2MBdata(ap_item)
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+            item_elt = await self._m.data2entry(
+                self.client, mb_data, ap_object["id"], None, self._m.namespace
+            )
+            item_elt["publisher"] = mb_data["author_jid"].full()
+            items.append(item_elt)
+        return page_data, items
+    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
+        """Convert AP item to microblog data
+        @return: AP Item's Object and microblog data
+        @raise exceptions.DataError: something is invalid in the AP item
+        @raise NotImplemented: some AP data is not handled yet
+        @raise error.StanzaError: error while contacting the AP server
+        """
+        ap_object = ap_item.get("object")
+        if not ap_object:
+            log.warning(f'No "object" found in AP item {ap_item!r}')
+            raise exceptions.DataError
+        if isinstance(ap_object, str):
+            ap_object = await self.apGet(ap_object)
+        obj_id = ap_object.get("id")
+        if not obj_id:
+            log.warning(f'No "id" found in AP object: {ap_object!r}')
+            raise exceptions.DataError
+        if ap_object.get("inReplyTo") is not None:
+            raise NotImplementedError
+        mb_data = {}
+        for ap_key, mb_key in AP_MB_MAP.items():
+            data = ap_object.get(ap_key)
+            if data is None:
+                continue
+            mb_data[mb_key] = data
+        # content
+        try:
+            language, content_xhtml = ap_object["contentMap"].popitem()
+        except (KeyError, AttributeError):
+            try:
+                mb_data["content_xhtml"] = mb_data["content"]
+            except KeyError:
+                log.warning(f"no content found:\n{ap_object!r}")
+                raise exceptions.DataError
+        else:
+            mb_data["language"] = language
+            mb_data["content_xhtml"] = content_xhtml
+        # author
+        actor = ap_item.get("actor")
+        if not actor:
+            log.warning(f"no actor associated to object id {obj_id!r}")
+            raise exceptions.DataError
+        elif isinstance(actor, list):
+            # we only keep first item of list as author
+            # TODO: handle multiple actors
+            if len(actor) > 1:
+                log.warning("multiple actors are not managed")
+            actor = actor[0]
+        if isinstance(actor, dict):
+            actor = actor.get("id")
+            if not actor:
+                log.warning(f"no actor id found: {actor!r}")
+                raise exceptions.DataError
+        if isinstance(actor, str):
+            account = await self.getAPAccountFromId(actor)
+            mb_data["author"] = account.split("@", 1)[0]
+            author_jid = mb_data["author_jid"] = jid.JID(
+                None,
+                (
+          ["XEP-0106"].escape(account),
+          ,
+                    None
+                )
+            )
+        else:
+            log.warning(f"unknown actor type found: {actor!r}")
+            raise exceptions.DataError
+        # published/updated
+        for field in ("published", "updated"):
+            value = ap_object.get(field)
+            if not value and field == "updated":
+                value = ap_object.get("published")
+            if value:
+                try:
+                    mb_data[field] = calendar.timegm(
+                        dateutil.parser.parse(str(value)).utctimetuple()
+                    )
+                except dateutil.parser.ParserError as e:
+                    log.warning(f"Can't parse {field!r} field: {e}")
+        return ap_object, mb_data
+    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item"""
+        if not mb_data.get("id"):
+            mb_data["id"] = shortuuid.uuid()
+        if not mb_data.get("author_jid"):
+            mb_data["author_jid"] = client.jid
+        ap_account = await self.getAPAccountFromJidAndNode(
+            jid.JID(mb_data["author_jid"]),
+            None
+        )
+        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
+        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
+        return {
+            "@context": "",
+            "id": url_item,
+            "type": "Create",
+            "actor": url_actor,
+            "object": {
+                "id": url_item,
+                "type": "Note",
+                "published": utils.xmpp_date(mb_data["published"]),
+                "attributedTo": url_actor,
+                "content": mb_data.get("content_xhtml") or mb_data["content"],
+                "to": ""
+            }
+        }
+    async def publishMessage(
+        self,
+        client: SatXMPPEntity,
+        mess_data: dict,
+        service: jid.JID
+    ) -> None:
+        """Send an AP message
+        .. note::
+            This is a temporary method used for development only
+        @param mess_data: message data. Following keys must be set:
+            ``node``
+              identifier of message which is being replied (this will
+              correspond to pubsub node in the future)
+            ``content_xhtml`` or ``content``
+              message body (respectively in XHTML or plain text)
+        @param service: JID corresponding to the AP actor.
+        """
+        if not service.user:
+            raise ValueError("service must have a local part")
+        account =["XEP-0106"].unescape(service.user)
+        ap_actor_data = await self.getAPActorDataFromId(account)
+        try:
+            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
+        except KeyError:
+            raise exceptions.DataError("Can't get ActivityPub actor inbox")
+        item_data = await self.mbdata2APitem(client, mess_data)
+        url_actor = item_data["object"]["attributedTo"]
+        resp = await self.signAndPost(inbox_url, url_actor, item_data)
+        if resp.code != 202:
+            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+IMPORT_NAME = "ap-gateway"
+CONF_SECTION = f"component {IMPORT_NAME}"
+CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
+TYPE_ACTOR = "actor"
+TYPE_INBOX = "inbox"
+TYPE_OUTBOX = "outbox"
+TYPE_ITEM = "item"
+MEDIA_TYPE_AP = "application/activity+json"
+# mapping from AP metadata to microblog data
+AP_MB_MAP = {
+    "content": "content_xhtml",
+AP_REQUEST_TYPES = {"actor", "outbox"}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,298 @@
+#!/usr/bin/env python3
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+from typing import Optional, Dict, List
+import json
+from urllib import parse
+import re
+import unicodedata
+from twisted.web import http, resource as web_resource, server
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error
+from wokkel import pubsub, rsm
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.i18n import _
+from sat.core.log import getLogger
+                       AP_REQUEST_TYPES, PAGE_SIZE)
+log = getLogger(__name__)
+VERSION = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
+class HTTPAPGServer(web_resource.Resource):
+    """HTTP Server handling ActivityPub S2S protocol"""
+    isLeaf = True
+    def __init__(self, ap_gateway):
+        self.apg = ap_gateway
+        super().__init__()
+    async def webfinger(self, request):
+        url_parsed = parse.urlparse(request.uri.decode())
+        query = parse.parse_qs(url_parsed.query)
+        resource = query.get("resource", [""])[0]
+        account = resource[5:].strip()
+        if not resource.startswith("acct:") or not account:
+            return web_resource.ErrorPage(
+                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
+            ).render(request)
+        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)
+        resp = {
+            "subject": resource,
+            "links": [
+                {
+                    "rel": "self",
+                    "type": "application/activity+json",
+                    "href": actor_url
+                }
+            ]
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(resp).encode())
+        request.finish()
+    async def APActorRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        actor_url: str
+    ) -> dict:
+        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+        # we have to use AP account as preferredUsername because it is used to retrieve
+        # actor handle (see
+        preferred_username = ap_account.split("@", 1)[0]
+        return {
+            "@context": [
+                "",
+                ""
+            ],
+            "id": actor_url,
+            "type": "Person",
+            "preferredUsername": preferred_username,
+            "inbox": inbox_url,
+            "outbox": outbox_url,
+            "publicKey": {
+                "id": f"{actor_url}#main-key",
+                "owner": actor_url,
+                "publicKeyPem": self.apg.public_key_pem
+            }
+        }
+    def getCanonicalURL(self, request: "HTTPRequest") -> str:
+        return parse.urljoin(
+            f"https://{self.apg.public_url}",
+            request.path.decode().rstrip("/")
+        )
+    def queryData2RSMRequest(
+        self,
+        query_data: Dict[str, List[str]]
+    ) -> rsm.RSMRequest:
+        """Get RSM kwargs to use with RSMRequest from query data"""
+        page = query_data.get("page")
+        if page == ["first"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
+        elif page == ["last"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE)
+        else:
+            for query_key in ("index", "before", "after"):
+                try:
+                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
+                except (KeyError, IndexError, ValueError):
+                    pass
+                else:
+                    return rsm.RSMRequest(**kwargs)
+        raise ValueError(f"Invalid query data: {query_data!r}")
+    async def APOutboxPageRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        query_data: Dict[str, List[str]]
+    ) -> dict:
+        # we only keep useful keys, and sort to have consistent URL which can
+        # be used as ID
+        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
+        query_data = {k: query_data[k] for k in url_keys}
+        try:
+            items, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                rsm_request=self.queryData2RSMRequest(query_data),
+                extra = {C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        base_url = self.getCanonicalURL(request)
+        url = f"{base_url}?{parse.urlencode(query_data, True)}"
+        data = {
+            "@context": "",
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems" : [
+                await self.apg.mbdata2APitem(
+                    self.apg.client,
+                    await self.apg._m.item2mbdata(
+                        self.apg.client,
+                        item,
+                        account_jid,
+                        node
+                    )
+                )
+                for item in reversed(items)
+            ]
+        }
+        # AP OrderedCollection must be in reversed chronological order, thus the opposite
+        # of what we get with RSM (at least with Libervia Pubsub)
+        if not metadata["complete"]:
+            try:
+                last= metadata["rsm"]["last"]
+            except KeyError:
+                last = None
+            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
+        if metadata["rsm"]["index"] != 0:
+            try:
+                first= metadata["rsm"]["first"]
+            except KeyError:
+                first = None
+            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
+        return data
+    async def APOutboxRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        parsed_url = parse.urlparse(request.uri.decode())
+        query_data = parse.parse_qs(parsed_url.query)
+        if query_data:
+            return await self.APOutboxPageRequest(
+                request, account_jid, node, ap_account, ap_url, query_data
+            )
+        # XXX: we can't use disco#info here because this request won't work on a bare jid
+        # due to security considerations of XEP-0030 (we don't have presence
+        # subscription).
+        # The current workaround is to do a request as if RSM was available, and actually
+        # check its availability according to result.
+        try:
+            __, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                max_items=0,
+                rsm_request=rsm.RSMRequest(max_=0)
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        try:
+            items_count = metadata["rsm"]["count"]
+        except KeyError:
+            log.warning(
+                f"No RSM metadata found when requesting pubsub node {node} at "
+                f"{account_jid}, defaulting to items_count=20"
+            )
+            items_count = 20
+        url = self.getCanonicalURL(request)
+        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
+        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
+        return {
+            "@context": "",
+            "id": url,
+            "totalItems": items_count,
+            "type": "OrderedCollection",
+            "first": url_first_page,
+            "last": url_last_page,
+        }
+    async def APRequest(self, request):
+        path = request.path.decode()
+        ap_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        request_type, ap_account = self.apg.parseAPURL(ap_url)
+        account_jid, node = await self.apg.getJIDAndNode(ap_account)
+        if request_type not in AP_REQUEST_TYPES:
+            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+        method = getattr(self, f"AP{request_type.title()}Request")
+        ret_data = await method(request, account_jid, node, ap_account, ap_url)
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(ret_data).encode())
+        request.finish()
+    def render(self, request):
+        request.setHeader("server", VERSION)
+        return super().render(request)
+    def render_GET(self, request):
+        path = request.path.decode().lstrip("/")
+        if path.startswith(".well-known/webfinger"):
+            defer.ensureDeferred(self.webfinger(request))
+            return server.NOT_DONE_YET
+        elif path.startswith(self.apg.ap_path):
+            defer.ensureDeferred(self.APRequest(request))
+            return server.NOT_DONE_YET
+        return web_resource.NoResource().render(request)
+class HTTPRequest(server.Request):
+    pass
+class HTTPServer(server.Site):
+    requestFactory = HTTPRequest
+    def __init__(self, ap_gateway):
+        super().__init__(HTTPAPGServer(ap_gateway))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,129 @@
+#!/usr/bin/env python3
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+from typing import Optional, List
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import rsm
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from import ensure_deferred
+log = getLogger(__name__)
+class APPubsubService(rsm.PubSubService):
+    """Pubsub service for XMPP requests"""
+    def __init__(self, apg):
+        super(APPubsubService, self).__init__()
+ =
+        self.apg = apg
+        self.discoIdentity = {
+            "category": "pubsub",
+            "type": "service",
+            "name": "Libervia ActivityPub Gateway",
+        }
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
+        raise NotImplementedError
+    @ensure_deferred
+    async def items(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        node: str,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        rsm_req: Optional[rsm.RSMRequest]
+    ) -> List[domish.Element]:
+        if not service.user:
+            return []
+        ap_account =["XEP-0106"].unescape(service.user)
+        if ap_account.count("@") != 1:
+            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
+            return []
+        if node != self.apg._m.namespace:
+            raise error.StanzaError(
+                "feature-not-implemented",
+                text=f"{VERSION} only supports {self.apg._m.namespace} "
+                "node for now"
+            )
+        if rsm_req is None:
+            if maxItems is None:
+                maxItems = 20
+            kwargs = {
+                "max_items": maxItems,
+                "chronological_pagination": False,
+            }
+        else:
+            if len(
+                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                 if v is not None]
+            ) > 1:
+                raise error.StanzaError(
+                    "bad-request",
+                    text="You can't use after, before and index at the same time"
+                )
+            kwargs = {"max_items": rsm_req.max}
+            if rsm_req.after is not None:
+                kwargs["after_id"] = rsm_req.after
+            elif rsm_req.before is not None:
+                kwargs["chronological_pagination"] = False
+                if rsm_req.before != "":
+                    kwargs["after_id"] = rsm_req.before
+            elif rsm_req.index is not None:
+                kwargs["start_index"] = rsm_req.index
+            f"No cache found for node {node} at {service} (AP account {ap_account}), "
+            "using Collection Paging to RSM translation"
+        )
+        return await self.apg.getAPItems(ap_account, **kwargs)
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        raise NotImplementedError
+    def getNodeInfo(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> Optional[dict]:
+        if not nodeIdentifier:
+            return None
+        info = {
+            "type": "leaf",
+            "meta-data": [
+                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+                {"var": "pubsub#max_items", "value": "max"},
+                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+            ]
+        }
+        return info