view sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3826:81c79b7cafa7

tests (unit/ap-gateway): tests for XMPP identity/vCard4 <=> AP actor data conversion: rel 368
author Goffi <goffi@goffi.org>
date Wed, 29 Jun 2022 14:06:33 +0200
parents 6329ee6b6df4
children 201a22bfbb74
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import calendar
import hashlib
import json
from pathlib import Path
from pprint import pformat
import re
from typing import (
    Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
)
from urllib import parse

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
import dateutil
import shortuuid
from sqlalchemy.exc import IntegrityError
import treq
from treq.response import _Response as TReqResponse
from twisted.internet import defer, reactor, threads
from twisted.web import http
from twisted.words.protocols.jabber import error, jid
from twisted.words.xish import domish
from wokkel import rsm, pubsub

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.memory.sqla_mapping import SubscriptionState, History
from sat.memory import persistent
from sat.tools import utils
from sat.tools.common import data_format, tls, uri
from sat.tools.common.async_utils import async_lru

from .constants import (
    ACTIVITY_OBJECT_MANDATORY,
    ACTIVITY_TARGET_MANDATORY,
    ACTIVITY_TYPES,
    ACTIVITY_TYPES_LOWER,
    AP_MB_MAP,
    COMMENTS_MAX_PARENTS,
    CONF_SECTION,
    IMPORT_NAME,
    LRU_MAX_SIZE,
    MEDIA_TYPE_AP,
    TYPE_ACTOR,
    TYPE_ITEM,
    TYPE_FOLLOWERS,
    TYPE_TOMBSTONE,
    NS_AP_PUBLIC,
    PUBLIC_TUPLE
)
from .http_server import HTTPServer
from .pubsub_service import APPubsubService


# logger of this module
log = getLogger(__name__)

# NOTE(review): IMPORT_NAME is also imported from ``.constants`` above; this
# assignment rebinds the name at module level — confirm both values agree
IMPORT_NAME = "ap-gateway"

# plugin metadata: this plugin is a component entry point whose main class is
# ``APGateway``
PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    # import names of the plugins which must be available
    C.PI_DEPENDENCIES: [
        "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329",
        "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "XEP-0054"
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}

# two hexadecimal digits, captured in the "hex" group
# (the upper case range must be "A-F": "A-f" would also match "G" to "Z" and
# several punctuation characters, including "_")
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
# period encoded byte (".3a"), see ``APGateway.getJIDAndNode`` for details
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
# percent encoded byte ("%3a")
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# text made only of characters which never need to be period encoded
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")


class APGateway:
    IMPORT_NAME = IMPORT_NAME

    def __init__(self, host):
        """Bind needed plugins, then register callbacks, triggers and bridge method

        @param host: main host object, it is used to access plugins, triggers and
            bridge
        """
        self.host = host
        # set to True the first time ``init`` is run
        self.initialised = False
        # set in ``profileConnecting``
        self.client = None

        # shortcuts to the plugins used by the gateway
        plugins = host.plugins
        self._p = plugins["XEP-0060"]
        self._a = plugins["XEP-0084"]
        self._e = plugins["XEP-0106"]
        self._m = plugins["XEP-0277"]
        self._v = plugins["XEP-0292"]
        self._r = plugins["XEP-0424"]
        self._pps = plugins["XEP-0465"]
        self._c = plugins["PUBSUB_CACHE"]
        self._t = plugins["TEXT_SYNTAXES"]
        self._i = plugins["IDENTITY"]

        # pubsub items are handled by ``_itemsReceived``
        self._p.addManagedNode("", items_cb=self._itemsReceived)
        self.pubsub_service = APPubsubService(self)

        host.trigger.add(
            "messageReceived", self._messageReceivedTrigger, priority=-1000
        )
        host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    def getHandler(self, __):
        """Return the pubsub service created in ``__init__``

        The positional argument is ignored.
        """
        return self.pubsub_service

    async def init(self, client):
        """Initialise the gateway: RSA keys, settings and HTTP server

        The initialisation is done only once, further calls return immediately.
        If the public URL is not set or contains a scheme, an error is logged and the
        method returns before the HTTP server is started.
        @param client: client whose profile is used to load/store the RSA key
        @raise exceptions.ConfigError: "http_connection_type" is neither "http" nor
            "https"
        """
        if self.initialised:
            return

        self.initialised = True
        log.info(_("ActivityPub Gateway initialization"))

        # RSA keys
        stored_data = await self.host.memory.storage.getPrivates(
            IMPORT_NAME, ["rsa_key"], profile=client.profile
        )
        private_key_pem = stored_data.get("rsa_key")
        if private_key_pem is None:
            # no key stored yet: a new one is generated in a thread, then stored in
            # PEM format
            self.private_key = await threads.deferToThread(
                rsa.generate_private_key,
                public_exponent=65537,
                key_size=4096,
            )
            private_key_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()
            await self.host.memory.storage.setPrivateValue(
                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
            )
        self.public_key = self.private_key.public_key()
        self.public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

        # params
        # URL and port
        # "xmpp_domain" is used as a fallback when "public_url" is not set
        self.public_url = self.host.memory.getConfig(
            CONF_SECTION, "public_url"
        ) or self.host.memory.getConfig(
            CONF_SECTION, "xmpp_domain"
        )
        if self.public_url is None:
            log.error(
                '"public_url" not set in configuration, this is mandatory to have '
                "ActivityPub Gateway running. Please set this option to the public "
                f"facing url in {CONF_SECTION!r} configuration section."
            )
            return
        if parse.urlparse(self.public_url).scheme:
            log.error(
                "Scheme must not be specified in \"public_url\", please remove it from "
                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
            )
            return
        self.http_port = int(self.host.memory.getConfig(
            CONF_SECTION, 'http_port', 8123))
        connection_type = self.host.memory.getConfig(
            CONF_SECTION, 'http_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                'bad ap-gateway http_connection_type, you must use one of "http" or '
                '"https"'
            )
        self.max_items = int(self.host.memory.getConfig(
            CONF_SECTION, 'new_node_max_items', 50
        ))
        self.comments_max_depth = int(self.host.memory.getConfig(
            CONF_SECTION, 'comments_max_depth', 0
        ))
        self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
        # the base URL always uses "https", whatever the connection type is
        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
        # True (default) if we provide gateway only to entities/services from our server
        self.local_only = C.bool(
            self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE)
        )

        # HTTP server launch
        self.server = HTTPServer(self)
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(
                self.host.memory.config, CONF_SECTION)
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)

    async def profileConnecting(self, client):
        """Keep the connecting client, attach AP storage to it, then run ``init``

        @param client: client of the connecting profile
        """
        # the client is kept, other methods of this class use ``self.client``
        self.client = client
        client.sendHistory = True
        # storage attached to the client, created with this plugin's import name and
        # the client's profile
        client._ap_storage = persistent.LazyPersistentBinaryDict(
            IMPORT_NAME,
            client.profile
        )
        await self.init(client)

    async def _itemsReceived(
        self,
        client: SatXMPPEntity,
        itemsEvent: pubsub.ItemsEvent
    ) -> None:
        """Handle received pubsub items

        Items addressed to a JID with a local part are converted to AP items and
        posted to the AP account obtained by unescaping this local part.
        Events received for another client than the gateway one, or addressed to a
        JID without local part, are ignored.
        @param client: client which received the event
        @param itemsEvent: received pubsub event
        """
        if client != self.client:
            return
        # a virtual client is built from the sender of the event, so that methods
        # such as "subscribe" are not run with the gateway's own JID
        sender_client = self.client.getVirtualClient(itemsEvent.sender)
        target_jid = itemsEvent.recipient
        local_part = target_jid.user
        if not local_part:
            log.debug("ignoring items event without local part specified")
            return

        await self.convertAndPostItems(
            sender_client,
            self._e.unescape(local_part),
            target_jid,
            itemsEvent.nodeIdentifier,
            itemsEvent.items,
        )

    async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
        """Get client for this component with a specified jid

        This is needed to perform operations with the virtual JID corresponding to the AP
        actor instead of the JID of the gateway itself.
        @param actor_id: ID of the actor
        @return: virtual client
        """
        # the JID is computed first, then the virtual client is requested from the
        # gateway client
        local_jid = await self.getJIDFromId(actor_id)
        return self.client.getVirtualClient(local_jid)

    def isActivity(self, data: dict) -> bool:
        """Return True if the data has an activity type

        @param data: AP object to check
        @return: True if "type" is a string matching, case insensitively, one of the
            known activity types.
            False is returned when "type" is missing, empty or is not a string (e.g.
            a list of types)
        """
        type_ = data.get("type")
        if not isinstance(type_, str):
            # only single string types can be compared; other values used to
            # raise an AttributeError on ``lower``
            return False
        return type_.lower() in ACTIVITY_TYPES_LOWER

    async def apGet(self, url: str) -> dict:
        """Request an URL and return its parsed JSON content

        The request is done with the AP media type as "Accept" and "Content-Type"
        headers.
        @param url: URL to request
        @return: parsed JSON
        @raise error.StanzaError: "service-unavailable" is raised for any failure
            happening during the request or the JSON parsing
        """
        request_headers = {
            "Accept": [MEDIA_TYPE_AP],
            "Content-Type": [MEDIA_TYPE_AP],
        }
        try:
            response = await treq.get(url, headers=request_headers)
            return await treq.json_content(response)
        except Exception as e:
            raise error.StanzaError(
                "service-unavailable",
                text=f"Can't get AP data at {url}: {e}"
            )

    @overload
    async def apGetObject(self, data: dict, key: str) -> Optional[dict]:
        ...

    @overload
    async def apGetObject(
        self, data: Union[str, dict], key: None = None
    ) -> dict:
        ...

    async def apGetObject(self, data, key = None):
        """Retrieve an AP object, dereferencing when necessary

        This method is to be used with attributes marked as "Functional" in
        https://www.w3.org/TR/activitystreams-vocabulary
        @param data: AP object where an other object is looked for, or the object itself
        @param key: name of the object to look for, or None if data is the object directly
        @return: found object if any:
            - a dict is returned as is
            - a string is used as URL and requested with ``apGet``
            - None is returned if ``key`` is set but not found in ``data``
        @raise ValueError: ``data`` is None while ``key`` is None
        @raise NotImplementedError: the value is neither a string nor a dict
        """
        value = data if key is None else data.get(key)
        if value is None:
            if key is None:
                raise ValueError("None can't be used with apGetObject if key is None")
            return None
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            return await self.apGet(value)
        raise NotImplementedError(
            f"was expecting a string or a dict, got {type(value)}: {value!r}"
        )

    async def apGetList(
        self,
        data: dict,
        key: str,
        only_ids: bool = False
    ) -> Optional[List[Dict[str, Any]]]:
        """Get a list of AP objects from a field, dereferencing when necessary

        To be used with non functional vocabularies, ``apGetObject`` is to be used
        for the functional ones.
        A string value is used as URL and requested with ``apGet``.
        A dictionary (found directly or after dereferencing) is wrapped in a list.
        @param data: AP object holding the field
        @param key: name of the field to look for
        @param only_ids: if True, each element of a list is reduced to a dict with
            only its "id"; otherwise each element is resolved with ``apGetObject``
        @return: list of objects, or None if the field is not present
        @raise ValueError: the value is not a list
        """
        found = data.get(key)
        if found is None:
            return None
        if isinstance(found, str):
            found = await self.apGet(found)
        if isinstance(found, dict):
            return [found]
        if not isinstance(found, list):
            raise ValueError(f"A list was expected, got {type(found)}: {found!r}")
        if not only_ids:
            return [await self.apGetObject(entry) for entry in found]
        return [
            {"id": entry["id"] if isinstance(entry, dict) else entry}
            for entry in found
        ]

    async def apGetActors(
        self,
        data: dict,
        key: str,
        as_account: bool = True
    ) -> List[str]:
        """Retrieve AP actors from data

        The field may hold an actor ID, an actor object with an "id", or a list mixing
        both.
        @param data: AP object containing a field with actors
        @param key: field to use to retrieve actors
        @param as_account: if True returns account handles, otherwise will return actor
            IDs
        @return: list of account handles or actor IDs
        @raise exceptions.DataError: there is no actor data or it is invalid
        """
        value = data.get(key)
        if value is None:
            raise exceptions.DataError(
                f"no actor associated to object {data.get('id')!r}"
            )
        elif isinstance(value, dict):
            actor_id = value.get("id")
            if actor_id is None:
                raise exceptions.DataError(
                    f"invalid actor associated to object {data.get('id')!r}: {value!r}"
                )
            value = [actor_id]
        elif isinstance(value, str):
            value = [value]
        elif isinstance(value, list):
            try:
                value = [a if isinstance(a, str) else a["id"] for a in value]
            except (TypeError, KeyError):
                raise exceptions.DataError(
                    f"invalid actors list to object {data.get('id')!r}: {value!r}"
                )
        else:
            # any other type can't hold actors: without this check the raw value
            # would be returned as is, or would fail while being iterated
            raise exceptions.DataError(
                f"invalid actor associated to object {data.get('id')!r}: {value!r}"
            )
        if not value:
            raise exceptions.DataError("list of actors is empty")
        if as_account:
            return [await self.getAPAccountFromId(actor_id) for actor_id in value]
        else:
            return value

    async def apGetSenderActor(
        self,
        data: dict,
    ) -> str:
        """Get the ID of the actor who sent an AP object

        The "attributedTo" field is checked first, then the "actor" field is used as
        a fallback. When several actors are found, only the first one is used.
        @param data: AP object
        @return: actor id of the sender
        @raise exceptions.NotFound: no actor has been found in data
        """
        try:
            senders = await self.apGetActors(data, "attributedTo", as_account=False)
        except exceptions.DataError:
            # no usable "attributedTo", "actor" is tried below
            senders = None

        if not senders:
            try:
                senders = await self.apGetActors(data, "actor", as_account=False)
            except exceptions.DataError:
                raise exceptions.NotFound(
                    'actor not specified in "attributedTo" or "actor"'
                )

        try:
            first_sender = senders[0]
        except IndexError:
            raise exceptions.NotFound("list of actors is empty")
        return first_sender

    def mustEncode(self, text: str) -> bool:
        """Tell if a text needs to be period encoded

        @param text: text to check
        @return: True if the text has characters outside of the allowed unquoted ones,
            if it starts with ``___`` or if it contains ``---``
        """
        if not RE_ALLOWED_UNQUOTED.match(text):
            return True
        # "___" and "---" have a special meaning in AP accounts
        return text.startswith("___") or "---" in text

    def periodEncode(self, text: str) -> str:
        """Encode a text with period encoding

        Period encoding is a percent encoding where ``.`` is used instead of ``%``,
        see [getJIDAndNode] for the reasons of this encoding.
        @param text: text to encode
        @return: period encoded text
        """
        # everything but unreserved characters is percent encoded first
        encoded = parse.quote(text, safe="")
        # the replacements are applied in this order, the last one converts percent
        # encoding to period encoding
        substitutions = (
            ("---", "%2d%2d%2d"),
            ("___", "%5f%5f%5f"),
            (".", "%2e"),
            ("~", "%7e"),
            ("%", "."),
        )
        for old, new in substitutions:
            encoded = encoded.replace(old, new)
        return encoded

    async def getAPAccountFromJidAndNode(
        self,
        jid_: jid.JID,
        node: Optional[str]
    ) -> str:
        """Construct AP account from JID and node

        The account construction will use escaping when necessary
        @param jid_: JID of the XMPP entity, it is not modified
        @param node: pubsub node, an empty value or the microblog namespace means that
            no node is put in the account
        @return: AP account handle
        @raise exceptions.InternalError: the client is not set yet, or no user part
            can be found for the account
        """
        if not node or node == self._m.namespace:
            node = None

        if self.client is None:
            raise exceptions.InternalError("Client is not set yet")

        if jid_.host == self.client.jid.userhost():
            # this is an proxy JID to an AP Actor
            return self._e.unescape(jid_.user)

        # the user part is tracked in a local variable: the JID given by the caller
        # must not be modified
        user_part = jid_.user
        node_as_user = False
        if node and not user_part and not self.mustEncode(node):
            is_pubsub = await self.isPubsub(jid_)
            # when we have a pubsub service, the user part can be used to set the node
            # this produces more user-friendly AP accounts
            if is_pubsub:
                user_part = node
                node_as_user = True
                node = None

        is_local = self.isLocal(jid_)
        if is_local:
            user = user_part
        elif node_as_user:
            # bare JID with the node used as user part
            user = f"{user_part}@{jid_.host}"
        else:
            user = jid_.userhost()
        if user is None:
            user = ""
        account_elts = []
        if node and self.mustEncode(node) or self.mustEncode(user):
            # "___" prefix indicates that period encoding is used
            account_elts = ["___"]
            if node:
                node = self.periodEncode(node)
            user = self.periodEncode(user)

        if not user:
            raise exceptions.InternalError("there should be a user part")

        if node:
            account_elts.extend((node, "---"))

        account_elts.extend((
            user, "@", jid_.host if is_local else self.client.jid.userhost()
        ))
        return "".join(account_elts)

    def isLocal(self, jid_: jid.JID) -> bool:
        """Tell if a JID uses the domain of the gateway's client or one of its subdomains

        @param jid_: JID to check
        @return: True if the last labels of the JID host are the labels of the
            client host
        """
        gateway_labels = self.client.host.split(".")
        assert gateway_labels
        candidate_labels = jid_.host.split(".")
        return candidate_labels[-len(gateway_labels):] == gateway_labels

    async def isPubsub(self, jid_: jid.JID) -> bool:
        """Tell if a JID is a Pubsub service

        @param jid_: JID of the entity to check with disco
        @return: True if the entity has a ("pubsub", "service") identity and no
            ("pubsub", "pep") one
        """
        identities = (await self.host.getDiscoInfos(self.client, jid_)).identities
        if ("pubsub", "service") not in identities:
            return False
        return ("pubsub", "pep") not in identities

    async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
        """Decode raw AP account handle to get XMPP JID and Pubsub Node

        Username are case insensitive.

        By default, the username correspond to local username (i.e. username from
        component's server).

        If local name's domain is a pubsub service (and not PEP), the username is taken as
        a pubsub node.

        If ``---`` is present in username, the part before is used as pubsub node, and the
        rest as a JID user part.

        If username starts with ``___``, characters are encoded using period encoding
        (i.e. percent encoding where a ``.`` is used instead of ``%``).

        This horror is necessary due to limitation in some AP implementation (notably
        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222

        examples:

        ``toto@example.org`` => JID = toto@example.org, node = None

        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
        non-local JID, and will work only if setting ``local_only`` is False), node = None

        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
        JID = pubsub.example.org, node = toto

        ``tata---toto@example.org`` => JID = toto@example.org, node = tata

        ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0

        @param ap_account: ActivityPub account handle (``username@domain.tld``)
        @return: service JID and pubsub node
            if pubsub node is None, default microblog pubsub node (and possibly other
            nodes that plugins may handle) will be used
        @raise ValueError: invalid account
        @raise exceptions.PermissionError: non local jid is used when gateway doesn't
            allow them
        """
        if ap_account.count("@") != 1:
            raise ValueError("Invalid AP account")
        if ap_account.startswith("___"):
            encoded = True
            ap_account = ap_account[3:]
        else:
            encoded = False

        username, domain = ap_account.split("@")

        if "---" in username:
            node, username = username.rsplit("---", 1)
        else:
            node = None

        if encoded:
            # period encoding is converted to percent encoding, then decoded
            username = parse.unquote(
                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
                errors="strict"
            )
            if node:
                node = parse.unquote(
                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
                    errors="strict"
                )

        if "@" in username:
            # the decoded username is a full JID: its domain replaces the one of the
            # account
            username, domain = username.rsplit("@", 1)

        if not node:
            # we need to check host disco, because disco request to user may be
            # blocked for privacy reason (see
            # https://xmpp.org/extensions/xep-0030.html#security)
            is_pubsub = await self.isPubsub(jid.JID(domain))

            if is_pubsub:
                # if the host is a pubsub service and not a PEP, we consider that username
                # is in fact the node name
                node = username
                username = None

        jid_s = f"{username}@{domain}" if username else domain
        try:
            jid_ = jid.JID(jid_s)
        except (RuntimeError, jid.InvalidFormat) as e:
            # jid.InvalidFormat is the exception raised when the string can't be
            # parsed as a JID
            raise ValueError(f"Invalid jid: {jid_s!r}") from e

        if self.local_only and not self.isLocal(jid_):
            raise exceptions.PermissionError(
                "This gateway is configured to map only local entities and services"
            )

        return jid_, node

    def getLocalJIDFromAccount(self, account: str) -> jid.JID:
        """Compute the JID linked to an AP account

        The escaped AP account handle is used as local part, and the host of the
        gateway client's JID is used as domain part.
        @param account: AP account handle
        @return: JID without resource
        """
        local_part = self._e.escape(account)
        gateway_host = self.client.jid.host
        return jid.JID(None, (local_part, gateway_host, None))

    async def getJIDFromId(self, actor_id: str) -> jid.JID:
        """Compute the JID linked to an AP actor ID

        If the actor ID is not a local URL (see ``isLocalURL``), the corresponding AP
        account is retrieved and converted with ``getLocalJIDFromAccount``.
        If the actor ID is a local URL, it corresponds to an XMPP entity, and the JID
        decoded from the URL is returned.
        @param actor_id: ID (URL) of the actor
        @return: JID corresponding to the actor
        @raise ValueError: a local URL is not an actor endpoint with a single argument
        """
        if not self.isLocalURL(actor_id):
            ap_account = await self.getAPAccountFromId(actor_id)
            return self.getLocalJIDFromAccount(ap_account)

        endpoint_type, endpoint_args = self.parseAPURL(actor_id)
        if endpoint_type != TYPE_ACTOR or len(endpoint_args) != 1:
            raise ValueError(f"invalid actor id: {actor_id!r}")
        # the node is not needed here
        xmpp_jid, __ = await self.getJIDAndNode(endpoint_args[0])
        return xmpp_jid

    def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
        """Split the URL of an AP endpoint in endpoint type and arguments

        The first ``len(self.ap_path)`` characters of the path are skipped, without
        checking that they match ``self.ap_path``.
        @param url: URL to parse (schema is not mandatory)
        @return: endpoint type and extra arguments
            everything found after the endpoint type is returned as a single unquoted
            argument
        """
        url_path = parse.urlparse(url).path.lstrip("/")
        endpoint = url_path[len(self.ap_path):].lstrip("/")
        type_, separator, remaining = endpoint.partition("/")
        if not separator:
            return type_, []
        return type_, [parse.unquote(remaining)]

    def buildAPURL(self, type_:str , *args: str) -> str:
        """Build the URL of an AP endpoint

        @param type_: type of AP endpoint
        @param args: endpoint dependent arguments, each one is quoted with
            ``quote_plus`` and used as a path element
        @return: URL relative to ``self.base_ap_url``
        """
        quoted_args = (parse.quote_plus(arg) for arg in args)
        endpoint_path = Path(type_).joinpath(*quoted_args)
        return parse.urljoin(self.base_ap_url, str(endpoint_path))

    def isLocalURL(self, url: str) -> bool:
        """Tell if a URL links to this component

        ``public_url`` and ``ap_path`` are used to check the URL, through
        ``self.base_ap_url`` which is built from them.
        @param url: URL to check
        @return: True if the URL starts with ``self.base_ap_url``
        """
        return url.startswith(self.base_ap_url)

    def buildSignatureHeader(self, values: Dict[str, str]) -> str:
        """Serialise signature data to a comma separated list of key="<value>" fields

        @param values: signature data
        @return: value to use for the signature header
        @raise NotImplementedError: a value which must be quoted contains a
            double-quote
        """
        # values of those keys are added without quotes
        # NOTE(review): the HTTP signatures draft names its pseudo-headers
        #   "(created)" and "(expires)", "(expired)" may be a typo — to be confirmed
        unquoted_keys = ("(created)", "(expired)")
        parts = []
        for name, raw in values.items():
            if name in unquoted_keys:
                parts.append(f"{name}={raw}")
                continue
            if '"' in raw:
                raise NotImplementedError(
                    "string escaping is not implemented, double-quote can't be used "
                    f"in {raw!r}"
                )
            parts.append(f'{name}="{raw}"')

        return ",".join(parts)

    def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
        """Compute the digest of a request body, to use in header and signature

        @param body: body of the request
        @param algo: name of the hash algorithm, only "SHA-256" is accepted
        @return: hash name and base64 encoded digest
        @raise NotImplementedError: an other algorithm than SHA-256 is requested
        """
        if algo != "SHA-256":
            raise NotImplementedError("only SHA-256 is implemented for now")
        raw_digest = hashlib.sha256(body).digest()
        encoded_digest = base64.b64encode(raw_digest).decode()
        return algo, encoded_digest

    @async_lru(maxsize=LRU_MAX_SIZE)
    async def getActorData(self, actor_id: str) -> dict:
        """Retrieve actor data with LRU cache

        @param actor_id: ID (URL) of the actor, it is requested with ``apGet``
        @return: data returned by ``apGet``
        """
        return await self.apGet(actor_id)

    @async_lru(maxsize=LRU_MAX_SIZE)
    async def getActorPubKeyData(
        self,
        actor_id: str
    ) -> Tuple[str, str, rsa.RSAPublicKey]:
        """Get the public key data found in the "publicKey" field of an actor

        @param actor_id: actor ID (url)
        @return: key_id, owner and public_key
            the public key is loaded from the PEM found in "publicKeyPem"
        @raise KeyError: publicKey (or one of its fields) is missing from actor data
        """
        key_data = (await self.getActorData(actor_id))["publicKey"]
        key_id = key_data["id"]
        owner = key_data["owner"]
        public_key = serialization.load_pem_public_key(
            key_data["publicKeyPem"].encode()
        )
        return key_id, owner, public_key

    def createActivity(
        self,
        activity: str,
        actor_id: str,
        object_: Optional[Union[str, dict]] = None,
        target: Optional[Union[str, dict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Build the base data of an activity

        The activity ID is built from the actor ID, the lower cased activity type and
        a newly generated short UUID.
        @param activity: one of ACTIVITY_TYPES
        @param actor_id: ID of the actor doing the activity
        @param object_: value of the "object" field, if any
        @param target: value of the "target" field, if any
        @param kwargs: extra fields, they may override the base ones ("@context",
            "actor", "id" and "type")
        @return: activity data
        @raise exceptions.InternalError: the activity type is unknown, or a mandatory
            "object_" or "target" is missing
        """
        if activity not in ACTIVITY_TYPES:
            raise exceptions.InternalError(f"invalid activity: {activity!r}")
        if activity in ACTIVITY_OBJECT_MANDATORY and object_ is None:
            raise exceptions.InternalError(
                f'"object_" is mandatory for activity {activity!r}'
            )
        if activity in ACTIVITY_TARGET_MANDATORY and target is None:
            raise exceptions.InternalError(
                f'"target" is mandatory for activity {activity!r}'
            )

        data: Dict[str, Any] = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "actor": actor_id,
            "id": f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}",
            "type": activity,
            **kwargs,
        }
        # "object" and "target" are set last, so they can't be overridden by kwargs
        for field, content in (("object", object_), ("target", target)):
            if content is not None:
                data[field] = content

        return data

    def getKeyId(self, actor_id: str) -> str:
        """Get local key ID from actor ID

        @param actor_id: ID of the actor
        @return: actor ID with the ``#main-key`` fragment appended
        """
        return f"{actor_id}#main-key"

    async def checkSignature(
        self,
        signature: str,
        key_id: str,
        headers: Dict[str, str]
    ) -> str:
        """Verify that signature matches given headers

        see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2

        @param signature: Base64 encoded signature
        @param key_id: ID of the key used to sign the data
            if it starts with "acct:", the rest is used as an AP account, otherwise
            the actor ID is the part found before the first "#"
        @param headers: headers and their values, including pseudo-headers
        @return: id of the signing actor

        @raise exceptions.EncryptionError: the public key of the actor doesn't match
            ``key_id`` or the actor, the signature can't be decoded, or the signature
            doesn't match headers
        """
        # signed string: one "<lower cased name>: <value>" line per header
        to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
        if key_id.startswith("acct:"):
            actor = key_id[5:]
            actor_id = await self.getAPActorIdFromAccount(actor)
        else:
            actor_id = key_id.split("#", 1)[0]

        pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id)
        if pub_key_id != key_id or pub_key_owner != actor_id:
            raise exceptions.EncryptionError("Public Key mismatch")

        try:
            # binascii.Error, raised for malformed base64, is a ValueError
            raw_signature = base64.b64decode(signature)
        except ValueError as e:
            raise exceptions.EncryptionError(
                f"Invalid signature, it can't be decoded: {e}"
            ) from e

        try:
            pub_key.verify(
                raw_signature,
                to_sign.encode(),
                # we have to use PKCS1v15 padding to be compatible with Mastodon
                padding.PKCS1v15(),  # type: ignore
                hashes.SHA256()  # type: ignore
            )
        except InvalidSignature as e:
            raise exceptions.EncryptionError(
                "Invalid signature (using PKCS1 v1.5 and SHA-256)"
            ) from e

        return actor_id

    def getSignatureData(
            self,
            key_id: str,
            headers: Dict[str, str]
    ) -> Tuple[Dict[str, str], Dict[str, str]]:
        """Sign headers with the gateway private key

        The signed string is made of one "<lower cased name>: <value>" line per
        header, in the order of ``headers``.
        @param key_id: ID of the key (URL linking to the data with public key)
        @param headers: headers to sign and their value, including pseudo-headers

        @return: headers and signature data
            ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers
            (names starting with "(") removed, and ``Signature`` added.
        """
        signed_string = "\n".join(
            f"{name.lower()}: {value}" for name, value in headers.items()
        )
        raw_signature = self.private_key.sign(
            signed_string.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),  # type: ignore
            hashes.SHA256()  # type: ignore
        )
        sign_data = {
            "keyId": key_id,
            "Algorithm": "rsa-sha256",
            "headers": " ".join(headers.keys()),
            "signature": base64.b64encode(raw_signature).decode()
        }
        new_headers = {}
        for name, value in headers.items():
            if not name.startswith("("):
                new_headers[name] = value
        new_headers["Signature"] = self.buildSignatureHeader(sign_data)
        return new_headers, sign_data

    async def convertAndPostItems(
        self,
        client: SatXMPPEntity,
        ap_account: str,
        service: jid.JID,
        node: str,
        items: List[domish.Element],
        subscribe_comments_nodes: bool = False,
    ) -> None:
        """Convert XMPP items to AP items, and post them to the inbox of an actor

        "item" elements are converted from microblog data, "retract" elements are
        converted to a deletion. Each resulting AP item is signed and posted, a
        warning is logged if the HTTP return code is 300 or more.
        @param client: client used for the conversions and the subscriptions
        @param ap_account: account of ActivityPub actor
        @param service: JID of the virtual pubsub service corresponding to the AP actor
        @param node: virtual node corresponding to the AP actor and items
        @param items: elements to convert
        @param subscribe_comments_nodes: if True, comment nodes present in given items
            will be automatically subscribed
        @raise exceptions.InternalError: an element is neither an "item" nor a
            "retract"
        """
        recipient_id = await self.getAPActorIdFromAccount(ap_account)
        inbox_url = await self.getAPInboxFromId(recipient_id)
        for element in items:
            kind = element.name
            if kind == "item":
                microblog = await self._m.item2mbdata(client, element, service, node)
                if subscribe_comments_nodes:
                    # we subscribe automatically to comment nodes if any
                    for comments in microblog.get("comments", []):
                        comments_service = jid.JID(comments["service"])
                        comments_node = comments["node"]
                        await self._p.subscribe(client, comments_service, comments_node)
                ap_item = await self.mbdata2APitem(client, microblog)
                sender_url = ap_item["object"]["attributedTo"]
            elif kind == "retract":
                sender_url, ap_item = await self.apDeleteItem(
                    client.jid, node, element["id"]
                )
            else:
                raise exceptions.InternalError(f"unexpected element: {element.toXml()}")

            response = await self.signAndPost(inbox_url, sender_url, ap_item)
            if response.code < 300:
                continue
            body = await response.text()
            log.warning(
                f"unexpected return code while sending AP item: {response.code}\n{body}\n"
                f"{pformat(ap_item)}"
            )

    async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
        """Sign a document and POST it to an AP server

        The request target, ``Host``, ``Date`` and ``Digest`` headers are signed with
        the key of the originating actor.

        @param url: AP server endpoint
        @param actor_id: originating actor ID (URL)
        @param doc: document to send (it is serialised to JSON)
        @return: response of the server
            responses with a code >= 400 are logged, but still returned
        """
        parsed_url = parse.urlparse(url)
        payload = json.dumps(doc).encode()
        algo, hash_ = self.getDigest(payload)

        to_sign = {
            "(request-target)": f"post {parsed_url.path}",
            "Host": parsed_url.hostname,
            "Date": http.datetimeToString().decode(),
            "Digest": f"{algo}={hash_}",
        }
        key_id = self.getKeyId(actor_id)
        signed_headers, __ = self.getSignatureData(key_id, to_sign)
        # Content-Type is not part of the signed headers, it is added afterwards
        signed_headers["Content-Type"] = (
            'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
        )

        response = await treq.post(url, payload, headers=signed_headers)
        if response.code >= 400:
            text = await response.text()
            log.warning(f"POST request to {url} failed [{response.code}]: {text}")
        return response

    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
        """Deserialise arguments and run ``publishMessage``

        @param mess_data_s: serialised message data
        @param service_s: JID of the service, as a string
        @param profile: profile used to retrieve the client
        @return: Deferred wrapping the ``publishMessage`` coroutine
        """
        parsed_data: dict = data_format.deserialise(mess_data_s)  # type: ignore
        target = jid.JID(service_s)
        coro = self.publishMessage(self.host.getClient(profile), parsed_data, target)
        return defer.ensureDeferred(coro)

    @async_lru(maxsize=LRU_MAX_SIZE)
    async def getAPActorIdFromAccount(self, account: str) -> str:
        """Retrieve account ID from its handle using WebFinger

        Don't use this method to get local actor id from a local account derived from
        JID: in this case, the actor ID is retrieved with
        ``self.buildAPURL(TYPE_ACTOR, ap_account)``

        @param account: AP handle (user@domain.tld)
        @return: Actor ID (which is an URL)
        @raise ValueError: the handle is malformed, or WebFinger data doesn't contain a
            usable ActivityPub link
        @raise exceptions.DataError: WebFinger data can't be retrieved or parsed
        """
        if account.count("@") != 1 or "/" in account:
            raise ValueError(f"Invalid account: {account!r}")
        host = account.split("@")[1]
        try:
            finger_data = await treq.json_content(await treq.get(
                f"https://{host}/.well-known/webfinger?"
                f"resource=acct:{parse.quote_plus(account)}",
            ))
        except Exception as e:
            raise exceptions.DataError(
                f"Can't get webfinger data for {account!r}: {e}"
            ) from e
        for link in finger_data.get("links", []):
            if (
                link.get("type") == "application/activity+json"
                and link.get("rel") == "self"
            ):
                href = link.get("href", "").strip()
                if not href:
                    # "!r" is a conversion; "{account:r}" would be an (invalid) format
                    # spec, failing while the error message is built
                    raise ValueError(
                        f"Invalid webfinger data for {account!r}: missing href"
                    )
                break
        else:
            raise ValueError(
                f"No ActivityPub link found for {account!r}"
            )
        return href

    async def getAPActorDataFromAccount(self, account: str) -> dict:
        """Retrieve ActivityPub Actor data from an account handle

        The actor ID is first resolved from the account, then the data is retrieved
        from it.

        @param account: ActivityPub Actor identifier
        @return: data retrieved at the actor ID
        """
        actor_id = await self.getAPActorIdFromAccount(account)
        actor_data = await self.apGet(actor_id)
        return actor_data

    async def getAPInboxFromId(self, actor_id: str) -> str:
        """Retrieve inbox of an actor

        @param actor_id: ID of the actor to get the inbox from
        @return: value of the ``inbox`` field of the actor data
        @raise KeyError: actor data has no ``inbox`` field
        """
        actor_data = await self.getActorData(actor_id)
        return actor_data["inbox"]

    @async_lru(maxsize=LRU_MAX_SIZE)
    async def getAPAccountFromId(self, actor_id: str) -> str:
        """Retrieve AP account (``user@host`` handle) from the actor ID

        The account is built from the ``preferredUsername`` of the actor and the host
        of its ID, then it is checked that this account resolves to the same ID.

        @param actor_id: AP ID of the actor (URL to the actor data)
        @return: AP account of the actor
        @raise exceptions.DataError: ``preferredUsername`` is missing, or the account
            doesn't resolve to ``actor_id``
        """
        parsed_id = parse.urlparse(actor_id)
        actor_data = await self.getActorData(actor_id)
        username = actor_data.get("preferredUsername")
        if not username:
            raise exceptions.DataError(
                'No "preferredUsername" field found, can\'t retrieve actor account'
            )
        account = f"{username}@{parsed_id.hostname}"
        # we try to retrieve the actor ID from the account to check it
        webfinger_id = await self.getAPActorIdFromAccount(account)
        if webfinger_id == actor_id:
            return account

        # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
        msg = (
            f"Account ID found on WebFinger {webfinger_id!r} doesn't match our actor ID "
            f"({actor_id!r}). This AP instance doesn't seems to use "
            '"preferredUsername" as we expect.'
        )
        log.warning(msg)
        raise exceptions.DataError(msg)

    async def getAPItems(
        self,
        collection: dict,
        max_items: Optional[int] = None,
        chronological_pagination: bool = True,
        after_id: Optional[str] = None,
        start_index: Optional[int] = None,
        parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
        only_ids: bool = False,
    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
        """Retrieve AP items and convert them to XMPP items

        @param collection: AP collection data to get items from
            ``totalItems`` is used when present (20 is assumed otherwise), and the
            ``last`` or ``first`` field is used to get the initial page
        @param max_items: maximum number of items to retrieve
            retrieve all items by default
        @param chronological_pagination: get pages in chronological order
            AP use reversed chronological order for pagination, "first" page returns more
            recent items. If "chronological_pagination" is True, "last" AP page will be
            retrieved first.
        @param after_id: if set, retrieve items starting from given ID
            Due to ActivityStream Collection Paging limitations, this is inefficient and
            if ``after_id`` is not already in cache, we have to retrieve every page until
            we find it.
            In most common cases, ``after_id`` should be in cache though (client usually
            use known ID when in-order pagination is used).
        @param start_index: start retrieving items from the one with given index
            Due to ActivityStream Collection Paging limitations, this is inefficient and
            all pages before the requested index will be retrieved to count items.
            Can only be used with chronological pagination and without ``after_id``.
        @param parser: method to use to parse AP items and get XMPP item elements
            if None, use default generic parser
        @param only_ids: if True, only retrieve items IDs
            Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
            sufficient in some use cases (e.g. when retrieving following/followers
            collections)
        @return: XMPP Pubsub items and corresponding RSM Response
            Items are always returned in chronological order in the result
        @raise exceptions.DataError: the initial page is not found in the collection
        @raise error.StanzaError: ``start_index`` is too high, a page link is missing
            while looking for ``start_index``, or ``after_id`` has not been found
        """
        if parser is None:
            parser = self.apItem2Elt

        rsm_resp: Dict[str, Union[bool, int]] = {}
        try:
            count = collection["totalItems"]
        except KeyError:
            log.warning(
                f'"totalItems" not found in collection {collection.get("id")}, '
                "defaulting to 20"
            )
            count = 20
        else:
            log.info(f"{collection.get('id')} has {count} item(s)")

            rsm_resp["count"] = count

        if start_index is not None:
            assert chronological_pagination and after_id is None
            if start_index >= count:
                # nothing to return, but callers still expect a RSMResponse
                return [], rsm.RSMResponse(**rsm_resp)
            elif start_index == 0:
                # this is the default behaviour
                pass
            elif start_index > 5000:
                raise error.StanzaError(
                    "feature-not-implemented",
                    text="Maximum limit for previous_index has been reached, this limit "
                    "is set to avoid DoS"
                )
            else:
                # we'll convert "start_index" to "after_id", thus we need the item just
                # before "start_index"
                previous_index = start_index - 1
                retrieved_items = 0
                current_page = collection["last"]
                while retrieved_items < count:
                    page_data, items = await self.parseAPPage(
                        current_page, parser, only_ids
                    )
                    if not items:
                        log.warning(f"found an empty AP page at {current_page}")
                        return [], rsm.RSMResponse(**rsm_resp)
                    page_start_idx = retrieved_items
                    retrieved_items += len(items)
                    # this page holds indexes from page_start_idx (included) to
                    # retrieved_items (excluded), hence the strict comparison
                    if previous_index < retrieved_items:
                        after_id = items[previous_index - page_start_idx]["id"]
                        break
                    try:
                        current_page = page_data["prev"]
                    except KeyError:
                        log.warning(
                            f"missing previous page link at {current_page}: {page_data!r}"
                        )
                        # "text" must be a keyword: second positional argument of
                        # StanzaError is the error type
                        raise error.StanzaError(
                            "service-unavailable",
                            text="Error while retrieving previous page from AP service "
                            f"at {current_page}"
                        )

        init_page = "last" if chronological_pagination else "first"
        page = collection.get(init_page)
        if not page:
            raise exceptions.DataError(
                f"Initial page {init_page!r} not found for collection "
                f"{collection.get('id')})"
            )
        items = []
        retrieved_items = 0
        found_after_id = False

        while retrieved_items < count:
            page_data, page_items = await self.parseAPPage(page, parser, only_ids)
            if not page_items:
                break
            retrieved_items += len(page_items)
            if after_id is not None and not found_after_id:
                # if we have an after_id, we ignore all items until the requested one is
                # found
                try:
                    limit_idx = [i["id"] for i in page_items].index(after_id)
                except ValueError:
                    # if "after_id" is not found, we don't add any item from this page
                    page_id = page.get("id") if isinstance(page, dict) else page
                    log.debug(f"{after_id!r} not found at {page_id}, skipping")
                    page_items = []
                else:
                    found_after_id = True
                    page_offset = retrieved_items - len(page_items)
                    if chronological_pagination:
                        start_index = page_offset + limit_idx + 1
                        page_items = page_items[limit_idx+1:]
                    else:
                        start_index = count - (page_offset + limit_idx + 1)
                        page_items = page_items[:limit_idx]
            if chronological_pagination:
                items.extend(page_items)
            else:
                # pages are retrieved from the most recent to the oldest, older items
                # must be put first to keep the result in chronological order
                items[:0] = page_items
            if max_items is not None and len(items) >= max_items:
                if chronological_pagination:
                    items = items[:max_items]
                else:
                    items = items[-max_items:]
                break
            # links to sibling pages are in the page itself, not in the collection
            page = page_data.get("prev" if chronological_pagination else "next")
            if not page:
                break

        if after_id is not None and not found_after_id:
            raise error.StanzaError("item-not-found")

        if items:
            if start_index is not None:
                rsm_resp["index"] = start_index
            elif after_id is not None:
                log.warning("Can't determine index of first element")
            elif chronological_pagination:
                rsm_resp["index"] = 0
            else:
                rsm_resp["index"] = count - len(items)
            rsm_resp.update({
                "first": items[0]["id"],
                "last": items[-1]["id"]
            })

        return items, rsm.RSMResponse(**rsm_resp)

    async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
        """Convert AP item to parsed microblog data and corresponding item element

        @param ap_item: AP item to convert
        @return: microblog data and the item element built from it
            the ``publisher`` attribute of the element is set to the JID of the author
        """
        mb_data = await self.apItem2MBdata(ap_item)
        item_id = mb_data["id"]
        microblog = self._m
        item_elt = await microblog.data2entry(
            self.client, mb_data, item_id, None, microblog.namespace
        )
        item_elt["publisher"] = mb_data["author_jid"]
        return mb_data, item_elt

    async def apItem2Elt(self, ap_item: dict) -> domish.Element:
        """Convert AP item to XMPP item element

        This is ``apItem2MbDataAndElt`` with only the element returned.

        @param ap_item: AP item to convert
        @return: item element
        """
        converted = await self.apItem2MbDataAndElt(ap_item)
        return converted[1]

    async def parseAPPage(
        self,
        page: Union[str, dict],
        parser: Callable[[dict], Awaitable[domish.Element]],
        only_ids: bool = False
    ) -> Tuple[dict, List[domish.Element]]:
        """Convert AP objects from an AP page to XMPP items

        Items are looked for in ``orderedItems`` first, then in ``items``. AP items
        which can't be converted by ``parser`` are ignored.

        @param page: Can be either url linking and AP page, or the page data directly
        @param parser: method to use to parse AP items and get XMPP item elements
        @param only_ids: if True, only retrieve items IDs
        @return: page data, pubsub items
            page data is an empty dict if the page can't be retrieved
        """
        page_data = await self.apGetObject(page)
        if page_data is None:
            log.warning('No data found in collection')
            return {}, []

        ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids)
        if ap_items is None:
            # no ordered items, we fall back to unordered ones
            ap_items = await self.apGetList(page_data, "items", only_ids=only_ids)
            if not ap_items:
                log.warning(f'No item field found in collection: {page_data!r}')
                return page_data, []
            log.warning(
                "Items are not ordered, this is not spec compliant"
            )

        # AP Collections are in antichronological order, but we expect chronological in
        # Pubsub, thus we reverse it
        item_elts = []
        for ap_item in reversed(ap_items):
            try:
                item_elt = await parser(ap_item)
            except (exceptions.DataError, NotImplementedError, error.StanzaError):
                continue
            item_elts.append(item_elt)

        return page_data, item_elts

    async def getCommentsNodes(
        self,
        item_id: str,
        parent_id: Optional[str]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Get node where this item is and node to use for comments

        if config option "comments_max_depth" is set, a common node will be used below the
        given depth
        @param item_id: ID of the reference item
        @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
        @return: a tuple with parent_node_id, comments_node_id:
            - parent_node_id is the ID of the node where reference item must be. None is
              returned when the root node (i.e. not a comments node) must be used.
            - comments_node_id: is the ID of the node to use for comments. None is
              returned when no comment node must be used (happens when we have reached
              "comments_max_depth")
        """
        get_comments_node = self._m.getCommentsNode
        if parent_id is None:
            # the item is not a reply
            return None, get_comments_node(item_id)
        if not self.comments_max_depth:
            # no depth limit, each item gets its own comments node
            return get_comments_node(parent_id), get_comments_node(item_id)

        # we follow "inReplyTo" links, from the direct parent to older ancestors
        ancestors = []
        ancestor_url = parent_id
        for __ in range(COMMENTS_MAX_PARENTS):
            ancestor = await self.apGet(ancestor_url)
            ancestors.append(ancestor)
            ancestor_url = ancestor.get("inReplyTo")
            if ancestor_url is None:
                break
        # oldest retrieved ancestor first, direct parent last
        ancestors.reverse()

        parent_limit = self.comments_max_depth-1
        if len(ancestors) > parent_limit:
            # max depth is reached: comments node of the ancestor at the limit is used,
            # and no comments node is created for this item
            return get_comments_node(ancestors[parent_limit]["id"]), None
        return (
            get_comments_node(ancestors[-1]["id"]),
            get_comments_node(item_id)
        )

    async def apItem2MBdata(self, ap_item: dict) -> dict:
        """Convert AP activity or object to microblog data

        ``ap_item`` is not modified by the conversion.

        @param ap_item: AP activity (its ``object`` is then converted), or AP object
        @return: microblog data
        @raise exceptions.DataError: something is invalid in the AP item
        @raise NotImplementedError: some AP data is not handled yet
        @raise error.StanzaError: error while contacting the AP server
        """
        is_activity = self.isActivity(ap_item)
        if is_activity:
            ap_object = await self.apGetObject(ap_item, "object")
            if not ap_object:
                log.warning(f'No "object" found in AP item {ap_item!r}')
                raise exceptions.DataError
        else:
            ap_object = ap_item
        item_id = ap_object.get("id")
        if not item_id:
            log.warning(f'No "id" found in AP item: {ap_object!r}')
            raise exceptions.DataError
        mb_data = {"id": item_id}
        for ap_key, mb_key in AP_MB_MAP.items():
            data = ap_object.get(ap_key)
            if data is None:
                continue
            mb_data[mb_key] = data

        # content
        content_map = ap_object.get("contentMap")
        if isinstance(content_map, dict) and content_map:
            # the last entry is used; the map is only read (and not popped), because
            # the AP object belongs to the caller and may be converted again
            language, content_xhtml = list(content_map.items())[-1]
            mb_data["language"] = language
            mb_data["content_xhtml"] = content_xhtml
            if not mb_data.get("content"):
                mb_data["content"] = await self._t.convert(
                    content_xhtml,
                    self._t.SYNTAX_XHTML,
                    self._t.SYNTAX_TEXT,
                    False,
                )
        else:
            # no usable "contentMap", we fall back to the plain "content"
            try:
                mb_data["content_xhtml"] = mb_data["content"]
            except KeyError:
                log.warning(f"no content found:\n{ap_object!r}")
                raise exceptions.DataError

        # author
        if is_activity:
            authors = await self.apGetActors(ap_item, "actor")
        else:
            authors = await self.apGetActors(ap_object, "attributedTo")
        if len(authors) > 1:
            # we only keep first item as author
            # TODO: handle multiple actors
            log.warning("multiple actors are not managed")

        account = authors[0]
        author_jid = self.getLocalJIDFromAccount(account).full()

        mb_data["author"] = account.split("@", 1)[0]
        mb_data["author_jid"] = author_jid

        # published/updated
        for field in ("published", "updated"):
            value = ap_object.get(field)
            if not value and field == "updated":
                # publication date is used when there is no update date
                value = ap_object.get("published")
            if value:
                try:
                    mb_data[field] = calendar.timegm(
                        dateutil.parser.parse(str(value)).utctimetuple()
                    )
                except dateutil.parser.ParserError as e:
                    log.warning(f"Can't parse {field!r} field: {e}")

        # comments
        in_reply_to = ap_object.get("inReplyTo")
        __, comments_node = await self.getCommentsNodes(item_id, in_reply_to)
        if comments_node is not None:
            comments_data = {
                "service": author_jid,
                "node": comments_node,
                "uri": uri.buildXMPPUri(
                    "pubsub",
                    path=author_jid,
                    node=comments_node
                )
            }
            mb_data["comments"] = [comments_data]

        return mb_data

    async def getReplyToIdFromXMPPNode(
        self,
        client: SatXMPPEntity,
        ap_account: str,
        parent_item: str,
        mb_data: dict
    ) -> str:
        """Get URL to use for ``inReplyTo`` field in AP item.

        There is currently no way to know the parent service of a comment with XEP-0277.
        To work around that, the parent item is searched in the cache:
            - if a single item is found, its node is used to get the parent AP account
            - if several items are found, the first one published by the author of
              ``mb_data`` is used
            - if nothing suitable is found, ``ap_account`` is used

        @param client: client whose profile is used to search the cache
        @param ap_account: AP account corresponding to the publication author
        @param parent_item: ID of the node where the publication this item is replying to
             has been posted
        @param mb_data: microblog data of the publication
        @return: URL to use in ``inReplyTo`` field
        """
        # FIXME: propose a protoXEP to properly get parent item, node and service

        async def account_from_cache(cached_item) -> str:
            # the AP account is deduced from the node where the cached item is
            cached_node = cached_item.node
            return await self.getAPAccountFromJidAndNode(
                cached_node.service,
                cached_node.name
            )

        search_filter = {
            "profiles": [client.profile],
            "names": [parent_item]
        }
        found_items = await self.host.memory.storage.searchPubsubItems(search_filter)

        if not found_items:
            log.warning(f"parent item {parent_item!r} not found in cache")
            parent_ap_account = ap_account
        elif len(found_items) == 1:
            parent_ap_account = await account_from_cache(found_items[0])
        else:
            # we found several cached item with given ID, we check if there is one
            # corresponding to this author
            author = jid.JID(mb_data["author_jid"]).userhostJID()
            for candidate in found_items:
                if jid.JID(candidate.data["publisher"]).userhostJID() == author:
                    parent_ap_account = await account_from_cache(candidate)
                    break
            else:
                # no item corresponding to this author, we use ap_account
                log.warning(
                    "Can't find a single cached item for parent item "
                    f"{parent_item!r}"
                )
                parent_ap_account = ap_account

        return self.buildAPURL(TYPE_ITEM, parent_ap_account, parent_item)

    async def mbdata2APitem(
        self,
        client: SatXMPPEntity,
        mb_data: dict,
        public=True
    ) -> dict:
        """Convert Libervia Microblog Data to ActivityPub item

        @param client: client of the entity publishing the item
            its JID is used as author when ``author_jid`` is not set in ``mb_data``
        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
            If ``public`` is True, ``service`` and ``node`` keys must be set.
            If ``published`` is not set, current datetime will be used
            ``id`` and ``author_jid`` are set in place when they are missing
        @param public: True if the message is not a private/direct one
            if True, the AP Item will be marked as public, and AP followers of target AP
            account (which retrieve from ``service``) will be put in ``cc``.
            ``inReplyTo`` will also be set if suitable
            if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
            This is usually used for direct messages.
        @return: AP item
        @raise exceptions.InternalError: ``public`` is True but ``node`` or ``service``
            is missing in ``mb_data``
        """
        if not mb_data.get("id"):
            mb_data["id"] = shortuuid.uuid()
        if not mb_data.get("author_jid"):
            mb_data["author_jid"] = client.jid.userhost()
        ap_account = await self.getAPAccountFromJidAndNode(
            jid.JID(mb_data["author_jid"]),
            None
        )
        url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
        url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
        ap_object = {
            "id": url_item,
            "type": "Note",
            "published": utils.xmpp_date(mb_data.get("published")),
            "attributedTo": url_actor,
            "content": mb_data.get("content_xhtml") or mb_data["content"],
        }

        language = mb_data.get("language")
        if language:
            ap_object["contentMap"] = {language: ap_object["content"]}

        if public:
            ap_object["to"] = [NS_AP_PUBLIC]
            try:
                node = mb_data["node"]
                service = jid.JID(mb_data["service"])
            except KeyError:
                # node and service must always be specified when this method is used
                raise exceptions.InternalError(
                    "node or service is missing in mb_data"
                )
            target_ap_account = await self.getAPAccountFromJidAndNode(
                service, node
            )
            # "userhost" is a method and must be called: comparing the host with the
            # bound method itself is always False
            is_virtual_service = service.host == self.client.jid.userhost()
            if is_virtual_service:
                # service is a proxy JID for AP account
                actor_data = await self.getAPActorDataFromAccount(target_ap_account)
                followers = actor_data.get("followers")
            else:
                # service is a real XMPP entity
                followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account)
            if followers:
                ap_object["cc"] = [followers]
            if self._m.isCommentNode(node):
                parent_item = self._m.getParentItem(node)
                if is_virtual_service:
                    # the publication is on a virtual node (i.e. an XMPP node managed by
                    # this gateway and linking to an ActivityPub actor)
                    ap_object["inReplyTo"] = parent_item
                else:
                    # the publication is from a followed real XMPP node
                    ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
                        client,
                        ap_account,
                        parent_item,
                        mb_data
                    )

        ap_item = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url_item,
            "type": "Create",
            "actor": url_actor,
            "object": ap_object
        }

        return ap_item

    async def publishMessage(
        self,
        client: SatXMPPEntity,
        mess_data: dict,
        service: jid.JID
    ) -> None:
        """Send an AP message

        .. note::

            This is a temporary method used for development only

        The message is posted to the shared inbox of the actor.

        @param client: client of the entity sending the message
        @param mess_data: message data. Following keys must be set:

            ``node``
              identifier of message which is being replied (this will
              correspond to pubsub node in the future)

            ``content_xhtml`` or ``content``
              message body (respectively in XHTML or plain text)

        @param service: JID corresponding to the AP actor.
        @raise ValueError: ``service`` has no local part
        @raise exceptions.DataError: the actor data has no shared inbox
        @raise exceptions.NetworkError: the AP server didn't answer with a 202 code
        """
        local_part = service.user
        if not local_part:
            raise ValueError("service must have a local part")
        actor_data = await self.getAPActorDataFromAccount(
            self._e.unescape(local_part)
        )

        try:
            shared_inbox = actor_data["endpoints"]["sharedInbox"]
        except KeyError:
            raise exceptions.DataError("Can't get ActivityPub actor inbox")

        ap_item = await self.mbdata2APitem(client, mess_data)
        sender_actor = ap_item["object"]["attributedTo"]
        response = await self.signAndPost(shared_inbox, sender_actor, ap_item)
        if response.code == 202:
            return
        raise exceptions.NetworkError(f"unexpected return code: {response.code}")

    async def apDeleteItem(
        self,
        jid_: jid.JID,
        node: Optional[str],
        item_id: str,
        public: bool = True
    ) -> Tuple[str, Dict[str, Any]]:
        """Build activity to delete an AP item

        The activity is only built here, it is not sent.

        @param jid_: JID of the entity deleting an item
        @param node: node where the item is deleted
            None if it's microblog or a message
        @param item_id: ID of the item to delete
            it's the Pubsub ID or message's origin ID
        @param public: if True, the activity will be addressed to public namespace
        @return: actor_id of the entity deleting the item, activity to send
        """
        author_account = await self.getAPAccountFromJidAndNode(jid_, node)
        author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account)
        # the deleted item is referenced by a tombstone object
        tombstone = {
            "id": self.buildAPURL(TYPE_ITEM, author_account, item_id),
            "type": TYPE_TOMBSTONE
        }
        delete_activity = self.createActivity("Delete", author_actor_id, tombstone)
        if public:
            delete_activity["to"] = [NS_AP_PUBLIC]
        return author_actor_id, delete_activity

    def _messageReceivedTrigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Add the gateway workflow on post treatment

        @param client: client which has received the message
        @param message_elt: received message
        @param post_treat: Deferred to which ``onMessage`` is added as a callback
        @return: always True
        """
        if self.client:
            def gateway_workflow(mess_data):
                return defer.ensureDeferred(self.onMessage(client, mess_data))

            post_treat.addCallback(gateway_workflow)
        else:
            log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
        return True

    async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
        """Called once message has been parsed

        this method handle the conversion to AP items and posting.
        Only "chat" and "normal" messages sent from a local entity to a JID with a
        local part are handled, other messages are ignored.

        @param client: client which has received the message
        @param mess_data: parsed message data
        @return: ``mess_data``, in all cases
        """
        if client != self.client:
            return mess_data

        # reason to ignore the message, if any (checks are done in this order)
        ignore_reason = None
        if mess_data["type"] not in ("chat", "normal"):
            ignore_reason = "ignoring message with unexpected type"
        elif not self.isLocal(mess_data["from"]):
            ignore_reason = "ignoring non local message"
        elif not mess_data["to"].user:
            ignore_reason = "ignoring message addressed to gateway itself"
        if ignore_reason is not None:
            log.warning(f"{ignore_reason}: {mess_data['xml'].toXml()}")
            return mess_data

        actor_account = self._e.unescape(mess_data["to"].user)
        actor_id = await self.getAPActorIdFromAccount(actor_account)
        inbox = await self.getAPInboxFromId(actor_id)

        try:
            # only the first body is used
            language, message = next(iter(mess_data["message"].items()))
        except (KeyError, StopIteration):
            log.warning(f"ignoring empty message: {mess_data}")
            return mess_data

        mb_data = {"content": message}
        if language:
            mb_data["language"] = language
        origin_id = mess_data["extra"].get("origin_id")
        if origin_id:
            # we need to use origin ID when present to be able to retract the message
            mb_data["id"] = origin_id

        virtual_client = self.client.getVirtualClient(mess_data["from"])
        ap_item = await self.mbdata2APitem(virtual_client, mb_data, public=False)
        # the message is only addressed to the recipient actor
        recipients = [actor_id]
        ap_item["object"]["to"] = recipients
        ap_item["to"] = recipients
        await self.signAndPost(inbox, ap_item["actor"], ap_item)
        return mess_data

    async def _onMessageRetract(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        retract_elt: domish.Element,
        fastened_elts
    ) -> bool:
        """Convert a message retraction to an AP "Delete" activity and post it

        The activity is built with ``apDeleteItem`` and posted to the inbox of the
        AP actor matching the virtual JID the retraction is addressed to.

        @param client: client which received the retraction
        @param message_elt: <message> element holding the retraction
        @param retract_elt: retraction element (not used here)
        @param fastened_elts: fastened elements data, its ``id`` attribute is used as
            the ID of the item to delete
        @return: True if the retraction is not received by the gateway client,
            False in all other cases
        @raise exceptions.InternalError: the recipient is not a virtual JID of this
            gateway
        """
        if client != self.client:
            return True
        from_jid = jid.JID(message_elt["from"])
        if not self.isLocal(from_jid):
            log.debug(
                f"ignoring retract request from non local jid {from_jid}"
            )
            return False
        to_jid = jid.JID(message_elt["to"])
        # to_jid should be a virtual JID from this gateway
        is_gateway_virtual_jid = (
            to_jid.host == self.client.jid.full() and bool(to_jid.user)
        )
        if not is_gateway_virtual_jid:
            raise exceptions.InternalError(
                f"Invalid destinee's JID: {to_jid.full()}"
            )
        actor_id = await self.getAPActorIdFromAccount(self._e.unescape(to_jid.user))
        inbox = await self.getAPInboxFromId(actor_id)
        url_actor, ap_item = await self.apDeleteItem(
            from_jid.userhostJID(), None, fastened_elts.id, public=False
        )
        resp = await self.signAndPost(inbox, url_actor, ap_item)
        if resp.code < 300:
            return False
        # the remote server has not accepted the activity, we keep a trace of it
        text = await resp.text()
        log.warning(
            f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
            f"{pformat(ap_item)}"
        )
        return False

    async def newReplyToXMPPItem(
        self,
        client: SatXMPPEntity,
        ap_item: dict,
    ) -> None:
        """We got an AP item which is a reply to an XMPP item

        The parent XMPP item is retrieved from the "inReplyTo" URL, and the AP item
        is published to the first comments node declared by this parent item.

        The item is ignored (with a warning) if the "inReplyTo" URL is not an item
        URL, if it can't be split in account and item ID, or if the parent item
        can't be found.

        @param client: client used to retrieve the parent item and to publish the
            reply
        @param ap_item: AP object payload, must have an "inReplyTo" key
        @raise NotImplementedError: the parent item has no comments node
        """
        in_reply_to = ap_item["inReplyTo"]
        url_type, url_args = self.parseAPURL(in_reply_to)
        if url_type != "item":
            log.warning(
                "Ignoring AP item replying to an XMPP item with an unexpected URL "
                f"type({url_type!r}):\n{pformat(ap_item)}"
            )
            return
        try:
            # first URL argument is expected to be "<account>/<item id>"
            parent_item_account, parent_item_id = url_args[0].split("/", 1)
        except (IndexError, ValueError):
            log.warning(
                "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
                f"({in_reply_to!r}):\n{pformat(ap_item)}"
            )
            return
        parent_item_service, parent_item_node = await self.getJIDAndNode(
            parent_item_account
        )
        if parent_item_node is None:
            # no node specified, we use the microblog namespace
            parent_item_node = self._m.namespace
        items, __ = await self._p.getItems(
            client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
        )
        try:
            parent_item_elt = items[0]
        except IndexError:
            log.warning(
                f"Can't find parent item at {parent_item_service} (node "
                f"{parent_item_node!r})\n{pformat(ap_item)}")
            return
        parent_item_parsed = await self._m.item2mbdata(
            client, parent_item_elt, parent_item_service, parent_item_node
        )
        try:
            comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
            comment_node = parent_item_parsed["comments"][0]["node"]
        except (KeyError, IndexError):
            # we don't have a comment node set for this item
            from sat.tools.xml_tools import ppElt
            log.info(f"{ppElt(parent_item_elt.toXml())}")
            # NotImplemented is a constant which is not callable (calling it raises
            # a TypeError), the exception to use is NotImplementedError
            raise NotImplementedError(
                "reply to an XMPP item without comments node is not handled yet"
            )
        else:
            __, item_elt = await self.apItem2MbDataAndElt(ap_item)
            await self._p.publish(client, comment_service, comment_node, [item_elt])

    def getAPItemTargets(self, item: Dict[str, Any]) -> Tuple[bool, Set[str], Set[str]]:
        """Retrieve targets of an AP item, and indicate if it's a public one

        Addressing is read from the "to", "bto", "cc" and "bcc" keys. Only the
        values which are local URLs are kept as targets.

        @param item: AP object payload
        @return: Are returned:
            - is_public flag, set if one of the addressing values is a public one
            - targets of the item (local URLs only)
            - types of the targets, as returned by ``parseAPURL``
        """
        targets: Set[str] = set()
        is_public = False
        # TODO: handle "audience"
        for field in ("to", "bto", "cc", "bcc"):
            addressing = item.get(field)
            if not addressing:
                continue
            # a single value may be used directly instead of a list
            recipients = [addressing] if isinstance(addressing, str) else addressing
            for recipient in recipients:
                if recipient in PUBLIC_TUPLE:
                    is_public = True
                elif recipient and self.isLocalURL(recipient):
                    targets.add(recipient)

        targets_types = set()
        for target in targets:
            target_type = self.parseAPURL(target)[0]
            targets_types.add(target_type)
        return is_public, targets, targets_types

    async def newAPItem(
        self,
        client: SatXMPPEntity,
        destinee: Optional[jid.JID],
        node: str,
        item: dict,
    ) -> None:
        """Dispatch a received AP item to the message or the pubsub workflow

        An item which is not public and whose targets are all actors is a direct
        message, and is handled by ``handleMessageAPItem``. Any other item is
        handled by ``handlePubsubAPItem``.

        @param destinee: jid of the destinee,
        @param node: XMPP pubsub node
        @param item: AP object payload
        """
        is_public, targets, targets_types = self.getAPItemTargets(item)
        if is_public or targets_types != {TYPE_ACTOR}:
            await self.handlePubsubAPItem(
                client, targets, destinee, node, item, is_public
            )
            return
        # this is a direct message
        await self.handleMessageAPItem(
            client, targets, destinee, item
        )

    async def handleMessageAPItem(
        self,
        client: SatXMPPEntity,
        targets: Set[str],
        destinee: Optional[jid.JID],
        item: dict,
    ) -> None:
        """Parse and deliver direct AP items translating to XMPP messages

        One message is sent to the JID of each target, and to ``destinee`` when it
        is set. The ID of the converted item is used as origin ID of the messages.

        @param targets: actors where the item must be delivered
        @param destinee: jid of the destinee,
        @param item: AP object payload
        """
        recipients = set()
        for target in targets:
            recipients.add(await self.getJIDFromId(target))
        if destinee is not None:
            recipients.add(destinee)
        mb_data = await self.apItem2MBdata(item)
        sendings = [
            client.sendMessage(
                recipient,
                {'': mb_data.get("content", "")},
                mb_data.get("title"),
                extra={"origin_id": mb_data["id"]}
            )
            for recipient in recipients
        ]
        # we wait for all the sendings to be done
        await defer.DeferredList(sendings)

    async def handlePubsubAPItem(
        self,
        client: SatXMPPEntity,
        targets: Set[str],
        destinee: Optional[jid.JID],
        node: str,
        item: dict,
        public: bool
    ) -> None:
        """Analyse, cache and deliver AP items translating to Pubsub

        Three cases are handled:
            - the item is a reply to an XMPP item (local "inReplyTo" URL): it is
              delegated to ``newReplyToXMPPItem``
            - the item is a reply to an AP item: the node returned by
              ``getCommentsNodes`` is used (falling back to ``node``), and it is
              created in cache if it doesn't exist yet
            - the item is a root item: it is cached only if ``node`` is already in
              cache, otherwise it is dropped with a warning

        In the last 2 cases, the item is converted, put in cache, and subscribers of
        the cached node are notified.

        @param targets: actors/collections where the item must be delivered
            (not used in this method for now)
        @param destinee: jid of the destinee (not used in this method for now)
        @param node: XMPP pubsub node
        @param item: AP object payload
        @param public: True if the item is public
        """
        # XXX: "public" is not used for now

        # the pubsub service is the JID of the client itself
        service = client.jid
        in_reply_to = item.get("inReplyTo")

        # when "inReplyTo" is a list, only its first element is used
        if in_reply_to and isinstance(in_reply_to, list):
            in_reply_to = in_reply_to[0]
        if in_reply_to and isinstance(in_reply_to, str):
            if self.isLocalURL(in_reply_to):
                # this is a reply to an XMPP item
                return await self.newReplyToXMPPItem(client, item)

            # this item is a reply to an AP item, we use or create a corresponding node
            # for comments
            parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
            # if no parent node is returned, the node given in argument is used
            node = parent_node or node
            cached_node = await self.host.memory.storage.getPubsubNode(
                client, service, node, with_subscriptions=True
            )
            if cached_node is None:
                try:
                    cached_node = await self.host.memory.storage.setPubsubNode(
                        client,
                        service,
                        node,
                        subscribed=True
                    )
                except IntegrityError as e:
                    if "unique" in str(e.orig).lower():
                        # the node may already exist, if it has been created just after
                        # getPubsubNode above
                        log.debug("ignoring UNIQUE constraint error")
                        # NOTE(review): the result is not checked against None here,
                        # while cached_node attributes are used below — confirm that
                        # the node is always found at this point
                        cached_node = await self.host.memory.storage.getPubsubNode(
                            client, service, node, with_subscriptions=True
                        )
                    else:
                        raise e

        else:
            # it is a root item (i.e. not a reply to an other item)
            # note that this branch is also used when "inReplyTo" is set but is not a
            # string (after the list unwrapping above)
            cached_node = await self.host.memory.storage.getPubsubNode(
                client, service, node, with_subscriptions=True
            )
            if cached_node is None:
                # NOTE(review): the message below says that the node is synchronised,
                # but nothing is done here except returning: the item is dropped
                log.warning(
                    f"Received item in unknown node {node!r} at {service}. This may be "
                    f"due to a cache purge. We synchronise the node\n{item}"

                )
                return
        # conversion of the AP item to microblog data and to the element to cache
        mb_data, item_elt = await self.apItem2MbDataAndElt(item)
        await self.host.memory.storage.cachePubsubItems(
            client,
            cached_node,
            [item_elt],
            [mb_data]
        )

        # only entities with an active subscription are notified of the new item
        for subscription in cached_node.subscriptions:
            if subscription.state != SubscriptionState.SUBSCRIBED:
                continue
            self.pubsub_service.notifyPublish(
                service,
                node,
                [(subscription.subscriber, None, [item_elt])]
            )

    async def newAPDeleteItem(
        self,
        client: SatXMPPEntity,
        destinee: Optional[jid.JID],
        node: str,
        item: dict,
    ) -> None:
        """Retract the message or the pubsub item matching a deleted AP item

        If the ID of the item is found in message history, the item is a direct
        message and the message is retracted. Otherwise, the item is deleted from
        the cached pubsub node, and subscribers of the node are notified.

        @param destinee: jid of the destinee (not used in this method),
        @param node: XMPP pubsub node
        @param item: AP object payload, only its "id" is used
        @raise exceptions.DataError: "id" is missing or doesn't start with "http"
        @raise ValueError: "id" is a local URL
        @raise exceptions.PermissionError: the message to retract has not been sent
            by the entity requesting the retraction
        @raise exceptions.NotFound: the item is not a message and the node is not
            in cache
        """
        item_id = item.get("id")
        if not item_id:
            raise exceptions.DataError('"id" attribute is missing in item')
        if not item_id.startswith("http"):
            raise exceptions.DataError(f"invalid id: {item_id!r}")
        if self.isLocalURL(item_id):
            raise ValueError("Local IDs should not be used")

        # we have no way to know if a deleted item is a direct one (thus a message) or one
        # converted to pubsub. We check if the id is in message history to decide what to
        # do.
        history = await self.host.memory.storage.get(
            client,
            History,
            History.origin_id,
            item_id,
            (History.messages, History.subjects)
        )

        if history is not None:
            # it's a direct message
            if history.source_jid != client.jid:
                log.warning(
                    f"retraction received from an entity ({client.jid}) which is "
                    f"not the original sender of the message ({history.source_jid}), "
                    "hack attempt?"
                )
                raise exceptions.PermissionError("forbidden")

            await self._r.retractByHistory(client, history)
        else:
            # no history in cache with this ID, it's probably a pubsub item
            cached_node = await self.host.memory.storage.getPubsubNode(
                client, client.jid, node, with_subscriptions=True
            )
            if cached_node is None:
                log.warning(
                    f"Received an item retract for node {node!r} at {client.jid} "
                    "which is not cached"
                )
                raise exceptions.NotFound
            await self.host.memory.storage.deletePubsubItems(cached_node, [item_id])
            # notifyRetract is expecting domish.Element instances
            item_elt = domish.Element((None, "item"))
            item_elt["id"] = item_id
            # only entities with an active subscription are notified
            for subscription in cached_node.subscriptions:
                if subscription.state != SubscriptionState.SUBSCRIBED:
                    continue
                self.pubsub_service.notifyRetract(
                    client.jid,
                    node,
                    [(subscription.subscriber, None, [item_elt])]
                )