diff libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/__init__.py@c23cad65ae99
children c3b68fdc2de7
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,2781 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+import calendar
+import hashlib
+import json
+from pathlib import Path
+from pprint import pformat
+import re
+from typing import (
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+    overload,
+)
+from urllib import parse
+
+from cryptography.exceptions import InvalidSignature
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import padding
+import dateutil
+from dateutil.parser import parserinfo
+import shortuuid
+from sqlalchemy.exc import IntegrityError
+import treq
+from treq.response import _Response as TReqResponse
+from twisted.internet import defer, reactor, threads
+from twisted.web import http
+from twisted.words.protocols.jabber import error, jid
+from twisted.words.xish import domish
+from wokkel import pubsub, rsm
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory import persistent
+from libervia.backend.memory.sqla_mapping import History, SubscriptionState
+from libervia.backend.tools import utils
+from libervia.backend.tools.common import data_format, date_utils, tls, uri
+from libervia.backend.tools.common.async_utils import async_lru
+
+from .ad_hoc import APAdHocService
+from .events import APEvents
+from .constants import (
+    ACTIVITY_OBJECT_MANDATORY,
+    ACTIVITY_TARGET_MANDATORY,
+    ACTIVITY_TYPES,
+    ACTIVITY_TYPES_LOWER,
+    COMMENTS_MAX_PARENTS,
+    CONF_SECTION,
+    IMPORT_NAME,
+    LRU_MAX_SIZE,
+    MEDIA_TYPE_AP,
+    NS_AP,
+    NS_AP_PUBLIC,
+    PUBLIC_TUPLE,
+    TYPE_ACTOR,
+    TYPE_EVENT,
+    TYPE_FOLLOWERS,
+    TYPE_ITEM,
+    TYPE_LIKE,
+    TYPE_MENTION,
+    TYPE_REACTION,
+    TYPE_TOMBSTONE,
+    TYPE_JOIN,
+    TYPE_LEAVE
+)
+from .http_server import HTTPServer
+from .pubsub_service import APPubsubService
+from .regex import RE_MENTION
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "ap-gateway"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "ActivityPub Gateway component",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
+    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: [
+        "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
+        "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
+        "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY"
+    ],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "APGateway",
+    C.PI_HANDLER: C.BOOL_TRUE,
+    C.PI_DESCRIPTION: _(
+        "Gateway for bidirectional communication between XMPP and ActivityPub."
+    ),
+}
+
+HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})"
+RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
+RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
+RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
+
+
+class APGateway:
+    IMPORT_NAME = IMPORT_NAME
+    # show data send or received through HTTP, used for debugging
+    # 1: log POST objects
+    # 2: log POST and GET objects
+    # 3: log POST and GET objects with HTTP headers for GET requests
+    verbose = 0
+
+    def __init__(self, host):
+        self.host = host
+        self.initialised = False
+        self.client = None
+        self._p = host.plugins["XEP-0060"]
+        self._a = host.plugins["XEP-0084"]
+        self._e = host.plugins["XEP-0106"]
+        self._m = host.plugins["XEP-0277"]
+        self._v = host.plugins["XEP-0292"]
+        self._refs = host.plugins["XEP-0372"]
+        self._r = host.plugins["XEP-0424"]
+        self._sfs = host.plugins["XEP-0447"]
+        self._pps = host.plugins["XEP-0465"]
+        self._pa = host.plugins["XEP-0470"]
+        self._c = host.plugins["PUBSUB_CACHE"]
+        self._t = host.plugins["TEXT_SYNTAXES"]
+        self._i = host.plugins["IDENTITY"]
+        self._events = host.plugins["XEP-0471"]
+        self._p.add_managed_node(
+            "",
+            items_cb=self._items_received,
+            # we want to be sure that the callbacks are launched before pubsub cache's
+            # one, as we need to inspect items before they are actually removed from cache
+            # or updated
+            priority=1000
+        )
+        self.pubsub_service = APPubsubService(self)
+        self.ad_hoc = APAdHocService(self)
+        self.ap_events = APEvents(self)
+        host.trigger.add("message_received", self._message_received_trigger, priority=-1000)
+        host.trigger.add("XEP-0424_retractReceived", self._on_message_retract)
+        host.trigger.add("XEP-0372_ref_received", self._on_reference_received)
+
+        host.bridge.add_method(
+            "ap_send",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._publish_message,
+            async_=True,
+        )
+
+    def get_handler(self, __):
+        return self.pubsub_service
+
+    async def init(self, client):
+        if self.initialised:
+            return
+
+        self.initialised = True
+        log.info(_("ActivityPub Gateway initialization"))
+
+        # RSA keys
+        stored_data = await self.host.memory.storage.get_privates(
+            IMPORT_NAME, ["rsa_key"], profile=client.profile
+        )
+        private_key_pem = stored_data.get("rsa_key")
+        if private_key_pem is None:
+            self.private_key = await threads.deferToThread(
+                rsa.generate_private_key,
+                public_exponent=65537,
+                key_size=4096,
+            )
+            private_key_pem = self.private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption()
+            ).decode()
+            await self.host.memory.storage.set_private_value(
+                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
+            )
+        else:
+            self.private_key = serialization.load_pem_private_key(
+                private_key_pem.encode(),
+                password=None,
+            )
+        self.public_key = self.private_key.public_key()
+        self.public_key_pem = self.public_key.public_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PublicFormat.SubjectPublicKeyInfo
+        ).decode()
+
+        # params
+        # URL and port
+        self.public_url = self.host.memory.config_get(
+            CONF_SECTION, "public_url"
+        ) or self.host.memory.config_get(
+            CONF_SECTION, "xmpp_domain"
+        )
+        if self.public_url is None:
+            log.error(
+                '"public_url" not set in configuration, this is mandatory to have'
+                "ActivityPub Gateway running. Please set this option it to public facing "
+                f"url in {CONF_SECTION!r} configuration section."
+            )
+            return
+        if parse.urlparse(self.public_url).scheme:
+            log.error(
+                "Scheme must not be specified in \"public_url\", please remove it from "
+                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
+            )
+            return
+        self.http_port = int(self.host.memory.config_get(
+            CONF_SECTION, 'http_port', 8123))
+        connection_type = self.host.memory.config_get(
+            CONF_SECTION, 'http_connection_type', 'https')
+        if connection_type not in ('http', 'https'):
+            raise exceptions.ConfigError(
+                'bad ap-gateay http_connection_type, you must use one of "http" or '
+                '"https"'
+            )
+        self.max_items = int(self.host.memory.config_get(
+            CONF_SECTION, 'new_node_max_items', 50
+
+        ))
+        self.comments_max_depth = int(self.host.memory.config_get(
+            CONF_SECTION, 'comments_max_depth', 0
+        ))
+        self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap')
+        self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
+        # True (default) if we provide gateway only to entities/services from our server
+        self.local_only = C.bool(
+            self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE)
+        )
+        # if True (default), mention will be parsed in non-private content coming from
+        # XMPP. This is necessary as XEP-0372 are coming separately from item where the
+        # mention is done, which is hard to impossible to translate to ActivityPub (where
+        # mention specified inside the item directly). See documentation for details.
+        self.auto_mentions = C.bool(
+            self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE)
+        )
+
+        html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get(
+            CONF_SECTION, 'html_redirect_dict', {}
+        )
+        self.html_redirect: Dict[str, List[dict]] = {}
+        for url_type, target in html_redirect.items():
+            if isinstance(target, str):
+                target = {"url": target}
+            elif not isinstance(target, dict):
+                raise exceptions.ConfigError(
+                    f"html_redirect target must be a URL or a dict, not {target!r}"
+                )
+            filters = target.setdefault("filters", {})
+            if "url" not in target:
+                log.warning(f"invalid HTML redirection, missing target URL: {target}")
+                continue
+            # a slash in the url_type is a syntactic shortcut to have a node filter
+            if "/" in url_type:
+                url_type, node_filter = url_type.split("/", 1)
+                filters["node"] = node_filter
+            self.html_redirect.setdefault(url_type, []).append(target)
+
+        # HTTP server launch
+        self.server = HTTPServer(self)
+        if connection_type == 'http':
+            reactor.listenTCP(self.http_port, self.server)
+        else:
+            options = tls.get_options_from_config(
+                self.host.memory.config, CONF_SECTION)
+            tls.tls_options_check(options)
+            context_factory = tls.get_tls_context_factory(options)
+            reactor.listenSSL(self.http_port, self.server, context_factory)
+
+    async def profile_connecting(self, client):
+        self.client = client
+        client.sendHistory = True
+        client._ap_storage = persistent.LazyPersistentBinaryDict(
+            IMPORT_NAME,
+            client.profile
+        )
+        await self.init(client)
+
+    def profile_connected(self, client):
+        self.ad_hoc.init(client)
+
+    async def _items_received(
+        self,
+        client: SatXMPPEntity,
+        itemsEvent: pubsub.ItemsEvent
+    ) -> None:
+        """Callback called when pubsub items are received
+
+        if the items are adressed to a JID corresponding to an AP actor, they are
+        converted to AP items and sent to the corresponding AP server.
+
+        If comments nodes are linked, they are automatically subscribed to get new items
+        from there too.
+        """
+        if client != self.client:
+            return
+        # we need recipient as JID and not gateway own JID to be able to use methods such
+        # as "subscribe"
+        client = self.client.get_virtual_client(itemsEvent.sender)
+        recipient = itemsEvent.recipient
+        if not recipient.user:
+            log.debug("ignoring items event without local part specified")
+            return
+
+        ap_account = self._e.unescape(recipient.user)
+
+        if self._pa.is_attachment_node(itemsEvent.nodeIdentifier):
+            await self.convert_and_post_attachments(
+                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
+                itemsEvent.items
+            )
+        else:
+            await self.convert_and_post_items(
+                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
+                itemsEvent.items
+            )
+
+    async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity:
+        """Get client for this component with a specified jid
+
+        This is needed to perform operations with the virtual JID corresponding to the AP
+        actor instead of the JID of the gateway itself.
+        @param actor_id: ID of the actor
+        @return: virtual client
+        """
+        local_jid = await self.get_jid_from_id(actor_id)
+        return self.client.get_virtual_client(local_jid)
+
+    def is_activity(self, data: dict) -> bool:
+        """Return True if the data has an activity type"""
+        try:
+            return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
+        except (KeyError, TypeError):
+            return False
+
+    async def ap_get(self, url: str) -> dict:
+        """Retrieve AP JSON from given URL
+
+        @raise error.StanzaError: "service-unavailable" is sent when something went wrong
+            with AP server
+        """
+        resp = await treq.get(
+            url,
+            headers = {
+                "Accept": [MEDIA_TYPE_AP],
+                "Content-Type": [MEDIA_TYPE_AP],
+            }
+        )
+        if resp.code >= 300:
+            text = await resp.text()
+            if resp.code == 404:
+                raise exceptions.NotFound(f"Can't find resource at {url}")
+            else:
+                msg = f"HTTP error {resp.code} (url: {url}): {text}"
+                raise exceptions.ExternalRequestError(msg)
+        try:
+            return await treq.json_content(resp)
+        except Exception as e:
+            raise error.StanzaError(
+                "service-unavailable",
+                text=f"Can't get AP data at {url}: {e}"
+            )
+
+    @overload
+    async def ap_get_object(self, data: dict, key: str) -> Optional[dict]:
+        ...
+
+    @overload
+    async def ap_get_object(
+        self, data: Union[str, dict], key: None = None
+    ) -> dict:
+        ...
+
+    async def ap_get_object(self, data, key = None):
+        """Retrieve an AP object, dereferencing when necessary
+
+        This method is to be used with attributes marked as "Functional" in
+        https://www.w3.org/TR/activitystreams-vocabulary
+        @param data: AP object where an other object is looked for, or the object itself
+        @param key: name of the object to look for, or None if data is the object directly
+        @return: found object if any
+        """
+        if key is not None:
+            value = data.get(key)
+        else:
+            value = data
+        if value is None:
+            if key is None:
+                raise ValueError("None can't be used with ap_get_object is key is None")
+            return None
+        elif isinstance(value, dict):
+            return value
+        elif isinstance(value, str):
+            if self.is_local_url(value):
+                return await self.ap_get_local_object(value)
+            else:
+                return await self.ap_get(value)
+        else:
+            raise NotImplementedError(
+                "was expecting a string or a dict, got {type(value)}: {value!r}}"
+            )
+
+    async def ap_get_local_object(
+        self,
+        url: str
+    ) -> dict:
+        """Retrieve or generate local object
+
+        for now, only handle XMPP items to convert to AP
+        """
+        url_type, url_args = self.parse_apurl(url)
+        if url_type == TYPE_ITEM:
+            try:
+                account, item_id = url_args
+            except ValueError:
+                raise ValueError(f"invalid URL: {url}")
+            author_jid, node = await self.get_jid_and_node(account)
+            if node is None:
+                node = self._m.namespace
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                self.client, author_jid, node
+            )
+            if not cached_node:
+                log.debug(f"node {node!r} at {author_jid} is not found in cache")
+                found_item = None
+            else:
+                cached_items, __ = await self.host.memory.storage.get_items(
+                    cached_node, item_ids=[item_id]
+                )
+                if not cached_items:
+                    log.debug(
+                        f"item {item_id!r} of {node!r} at {author_jid} is not found in "
+                        "cache"
+                    )
+                    found_item = None
+                else:
+                    found_item = cached_items[0].data
+
+            if found_item is None:
+                # the node is not in cache, we have to make a request to retrieve the item
+                # If the node doesn't exist, get_items will raise a NotFound exception
+                found_items, __ = await self._p.get_items(
+                    self.client, author_jid, node, item_ids=[item_id]
+                )
+                try:
+                    found_item = found_items[0]
+                except IndexError:
+                    raise exceptions.NotFound(f"requested item at {url} can't be found")
+
+            if node.startswith(self._events.namespace):
+                # this is an event
+                event_data = self._events.event_elt_2_event_data(found_item)
+                ap_item = await self.ap_events.event_data_2_ap_item(
+                    event_data, author_jid
+                )
+                # the URL must return the object and not the activity
+                ap_item["object"]["@context"] = ap_item["@context"]
+                return ap_item["object"]
+            else:
+                # this is a blog item
+                mb_data = await self._m.item_2_mb_data(
+                    self.client, found_item, author_jid, node
+                )
+                ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
+                # the URL must return the object and not the activity
+                return ap_item["object"]
+        else:
+            raise NotImplementedError(
+                'only object from "item" URLs can be retrieved for now'
+            )
+
+    async def ap_get_list(
+        self,
+        data: dict,
+        key: str,
+        only_ids: bool = False
+    ) -> Optional[List[Dict[str, Any]]]:
+        """Retrieve a list of objects from AP data, dereferencing when necessary
+
+        This method is to be used with non functional vocabularies. Use ``ap_get_object``
+        otherwise.
+        If the value is a dictionary, it will be wrapped in a list
+        @param data: AP object where a list of objects is looked for
+        @param key: key of the list to look for
+        @param only_ids: if Trye, only items IDs are retrieved
+        @return: list of objects, or None if the key is not present
+        """
+        value = data.get(key)
+        if value is None:
+            return None
+        elif isinstance(value, str):
+            if self.is_local_url(value):
+                value = await self.ap_get_local_object(value)
+            else:
+                value = await self.ap_get(value)
+        if isinstance(value, dict):
+            return [value]
+        if not isinstance(value, list):
+            raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
+        if only_ids:
+            return [
+                {"id": v["id"]} if isinstance(v, dict) else {"id": v}
+                for v in value
+            ]
+        else:
+            return [await self.ap_get_object(i) for i in value]
+
+    async def ap_get_actors(
+        self,
+        data: dict,
+        key: str,
+        as_account: bool = True
+    ) -> List[str]:
+        """Retrieve AP actors from data
+
+        @param data: AP object containing a field with actors
+        @param key: field to use to retrieve actors
+        @param as_account: if True returns account handles, otherwise will return actor
+            IDs
+        @raise exceptions.DataError: there is not actor data or it is invalid
+        """
+        value = data.get(key)
+        if value is None:
+            raise exceptions.DataError(
+                f"no actor associated to object {data.get('id')!r}"
+            )
+        elif isinstance(value, dict):
+            actor_id = value.get("id")
+            if actor_id is None:
+                raise exceptions.DataError(
+                    f"invalid actor associated to object {data.get('id')!r}: {value!r}"
+                )
+            value = [actor_id]
+        elif isinstance(value, str):
+            value = [value]
+        elif isinstance(value, list):
+            try:
+                value = [a if isinstance(a, str) else a["id"] for a in value]
+            except (TypeError, KeyError):
+                raise exceptions.DataError(
+                    f"invalid actors list to object {data.get('id')!r}: {value!r}"
+                )
+        if not value:
+            raise exceptions.DataError(
+                f"list of actors is empty"
+            )
+        if as_account:
+            return [await self.get_ap_account_from_id(actor_id) for actor_id in value]
+        else:
+            return value
+
+    async def ap_get_sender_actor(
+        self,
+        data: dict,
+    ) -> str:
+        """Retrieve actor who sent data
+
+        This is done by checking "actor" field first, then "attributedTo" field.
+        Only the first found actor is taken into account
+        @param data: AP object
+        @return: actor id of the sender
+        @raise exceptions.NotFound: no actor has been found in data
+        """
+        try:
+            actors = await self.ap_get_actors(data, "actor", as_account=False)
+        except exceptions.DataError:
+            actors = None
+        if not actors:
+            try:
+                actors = await self.ap_get_actors(data, "attributedTo", as_account=False)
+            except exceptions.DataError:
+                raise exceptions.NotFound(
+                    'actor not specified in "actor" or "attributedTo"'
+                )
+        try:
+            return actors[0]
+        except IndexError:
+            raise exceptions.NotFound("list of actors is empty")
+
+    def must_encode(self, text: str) -> bool:
+        """Indicate if a text must be period encoded"""
+        return (
+            not RE_ALLOWED_UNQUOTED.match(text)
+            or text.startswith("___")
+            or "---" in text
+        )
+
+    def period_encode(self, text: str) -> str:
+        """Period encode a text
+
+        see [get_jid_and_node] for reasons of period encoding
+        """
+        return (
+            parse.quote(text, safe="")
+            .replace("---", "%2d%2d%2d")
+            .replace("___", "%5f%5f%5f")
+            .replace(".", "%2e")
+            .replace("~", "%7e")
+            .replace("%", ".")
+        )
+
+    async def get_ap_account_from_jid_and_node(
+        self,
+        jid_: jid.JID,
+        node: Optional[str]
+    ) -> str:
+        """Construct AP account from JID and node
+
+        The account construction will use escaping when necessary
+        """
+        if not node or node == self._m.namespace:
+            node = None
+
+        if self.client is None:
+            raise exceptions.InternalError("Client is not set yet")
+
+        if self.is_virtual_jid(jid_):
+            # this is an proxy JID to an AP Actor
+            return self._e.unescape(jid_.user)
+
+        if node and not jid_.user and not self.must_encode(node):
+            is_pubsub = await self.is_pubsub(jid_)
+            # when we have a pubsub service, the user part can be used to set the node
+            # this produces more user-friendly AP accounts
+            if is_pubsub:
+                jid_.user = node
+                node = None
+
+        is_local = self.is_local(jid_)
+        user = jid_.user if is_local else jid_.userhost()
+        if user is None:
+            user = ""
+        account_elts = []
+        if node and self.must_encode(node) or self.must_encode(user):
+            account_elts = ["___"]
+            if node:
+                node = self.period_encode(node)
+            user = self.period_encode(user)
+
+        if not user:
+            raise exceptions.InternalError("there should be a user part")
+
+        if node:
+            account_elts.extend((node, "---"))
+
+        account_elts.extend((
+            user, "@", jid_.host if is_local else self.client.jid.userhost()
+        ))
+        return "".join(account_elts)
+
+    def is_local(self, jid_: jid.JID) -> bool:
+        """Returns True if jid_ use a domain or subdomain of gateway's host"""
+        local_host = self.client.host.split(".")
+        assert local_host
+        return jid_.host.split(".")[-len(local_host):] == local_host
+
+    async def is_pubsub(self, jid_: jid.JID) -> bool:
+        """Indicate if a JID is a Pubsub service"""
+        host_disco = await self.host.get_disco_infos(self.client, jid_)
+        return (
+            ("pubsub", "service") in host_disco.identities
+            and not ("pubsub", "pep") in host_disco.identities
+        )
+
+    async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
+        """Decode raw AP account handle to get XMPP JID and Pubsub Node
+
+        Username are case insensitive.
+
+        By default, the username correspond to local username (i.e. username from
+        component's server).
+
+        If local name's domain is a pubsub service (and not PEP), the username is taken as
+        a pubsub node.
+
+        If ``---`` is present in username, the part before is used as pubsub node, and the
+        rest as a JID user part.
+
+        If username starts with ``___``, characters are encoded using period encoding
+        (i.e. percent encoding where a ``.`` is used instead of ``%``).
+
+        This horror is necessary due to limitation in some AP implementation (notably
+        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222
+
+        examples:
+
+        ``toto@example.org`` => JID = toto@example.org, node = None
+
+        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
+        non-local JID, and will work only if setings ``local_only`` is False), node = None
+
+        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
+        JID = pubsub.example.org, node = toto
+
+        ``tata---toto@example.org`` => JID = toto@example.org, node = tata
+
+        ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
+        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0
+
+        @param ap_account: ActivityPub account handle (``username@domain.tld``)
+        @return: service JID and pubsub node
+            if pubsub node is None, default microblog pubsub node (and possibly other
+            nodes that plugins may hanlde) will be used
+        @raise ValueError: invalid account
+        @raise PermissionError: non local jid is used when gateway doesn't allow them
+        """
+        if ap_account.count("@") != 1:
+            raise ValueError("Invalid AP account")
+        if ap_account.startswith("___"):
+            encoded = True
+            ap_account = ap_account[3:]
+        else:
+            encoded = False
+
+        username, domain = ap_account.split("@")
+
+        if "---" in username:
+            node, username = username.rsplit("---", 1)
+        else:
+            node = None
+
+        if encoded:
+            username = parse.unquote(
+                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
+                errors="strict"
+            )
+            if node:
+                node = parse.unquote(
+                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
+                    errors="strict"
+                )
+
+        if "@" in username:
+            username, domain = username.rsplit("@", 1)
+
+        if not node:
+            # we need to check host disco, because disco request to user may be
+            # blocked for privacy reason (see
+            # https://xmpp.org/extensions/xep-0030.html#security)
+            is_pubsub = await self.is_pubsub(jid.JID(domain))
+
+            if is_pubsub:
+                # if the host is a pubsub service and not a PEP, we consider that username
+                # is in fact the node name
+                node = username
+                username = None
+
+        jid_s = f"{username}@{domain}" if username else domain
+        try:
+            jid_ = jid.JID(jid_s)
+        except RuntimeError:
+            raise ValueError(f"Invalid jid: {jid_s!r}")
+
+        if self.local_only and not self.is_local(jid_):
+            raise exceptions.PermissionError(
+                "This gateway is configured to map only local entities and services"
+            )
+
+        return jid_, node
+
+    def get_local_jid_from_account(self, account: str) -> jid.JID:
+        """Compute JID linking to an AP account
+
+        The local jid is computer by escaping AP actor handle and using it as local part
+        of JID, where domain part is this gateway own JID
+        """
+        return jid.JID(
+            None,
+            (
+                self._e.escape(account),
+                self.client.jid.host,
+                None
+            )
+        )
+
+    async def get_jid_from_id(self, actor_id: str) -> jid.JID:
+        """Compute JID linking to an AP Actor ID
+
+        The local jid is computer by escaping AP actor handle and using it as local part
+        of JID, where domain part is this gateway own JID
+        If the actor_id comes from local server (checked with self.public_url), it means
+        that we have an XMPP entity, and the original JID is returned
+        """
+        if self.is_local_url(actor_id):
+            request_type, extra_args = self.parse_apurl(actor_id)
+            if request_type != TYPE_ACTOR or len(extra_args) != 1:
+                raise ValueError(f"invalid actor id: {actor_id!r}")
+            actor_jid, __ = await self.get_jid_and_node(extra_args[0])
+            return actor_jid
+
+        account = await self.get_ap_account_from_id(actor_id)
+        return self.get_local_jid_from_account(account)
+
+    def parse_apurl(self, url: str) -> Tuple[str, List[str]]:
+        """Parse an URL leading to an AP endpoint
+
+        @param url: URL to parse (schema is not mandatory)
+        @return: endpoint type and extra arguments
+        """
+        path = parse.urlparse(url).path.lstrip("/")
+        type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/")
+        return type_, [parse.unquote(a) for a in extra_args]
+
+    def build_apurl(self, type_:str , *args: str) -> str:
+        """Build an AP endpoint URL
+
+        @param type_: type of AP endpoing
+        @param arg: endpoint dependant arguments
+        """
+        return parse.urljoin(
+            self.base_ap_url,
+            str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args)))
+        )
+
+    def is_local_url(self, url: str) -> bool:
+        """Tells if an URL link to this component
+
+        ``public_url`` and ``ap_path`` are used to check the URL
+        """
+        return url.startswith(self.base_ap_url)
+
+    def is_virtual_jid(self, jid_: jid.JID) -> bool:
+        """Tell if a JID is an AP actor mapped through this gateway"""
+        return jid_.host == self.client.jid.userhost()
+
+    def build_signature_header(self, values: Dict[str, str]) -> str:
+        """Build key="<value>" signature header from signature data"""
+        fields = []
+        for key, value in values.items():
+            if key not in ("(created)", "(expired)"):
+                if '"' in value:
+                    raise NotImplementedError(
+                        "string escaping is not implemented, double-quote can't be used "
+                        f"in {value!r}"
+                    )
+                value = f'"{value}"'
+            fields.append(f"{key}={value}")
+
+        return ",".join(fields)
+
+    def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
+        """Get digest data to use in header and signature
+
+        @param body: body of the request
+        @return: hash name and digest
+        """
+        if algo != "SHA-256":
+            raise NotImplementedError("only SHA-256 is implemented for now")
+        return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def get_actor_data(self, actor_id) -> dict:
+        """Retrieve actor data with LRU cache"""
+        return await self.ap_get(actor_id)
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def get_actor_pub_key_data(
+        self,
+        actor_id: str
+    ) -> Tuple[str, str, rsa.RSAPublicKey]:
+        """Retrieve Public Key data from actor ID
+
+        @param actor_id: actor ID (url)
+        @return: key_id, owner and public_key
+        @raise KeyError: publicKey is missing from actor data
+        """
+        actor_data = await self.get_actor_data(actor_id)
+        pub_key_data = actor_data["publicKey"]
+        key_id = pub_key_data["id"]
+        owner = pub_key_data["owner"]
+        pub_key_pem = pub_key_data["publicKeyPem"]
+        pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
+        return key_id, owner, pub_key
+
+    def create_activity(
+        self,
+        activity: str,
+        actor_id: str,
+        object_: Optional[Union[str, dict]] = None,
+        target: Optional[Union[str, dict]] = None,
+        activity_id: Optional[str] = None,
+        **kwargs,
+    ) -> Dict[str, Any]:
+        """Generate base data for an activity
+
+        @param activity: one of ACTIVITY_TYPES
+        @param actor_id: AP actor ID of the sender
+        @param object_: content of "object" field
+        @param target: content of "target" field
+        @param activity_id: ID to use for the activity
+            if not set it will be automatically generated, but it is usually desirable to
+            set the ID manually so it can be retrieved (e.g. for Undo)
+        """
+        if activity not in ACTIVITY_TYPES:
+            raise exceptions.InternalError(f"invalid activity: {activity!r}")
+        if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
+            raise exceptions.InternalError(
+                f'"object_" is mandatory for activity {activity!r}'
+            )
+        if target is None and activity in ACTIVITY_TARGET_MANDATORY:
+            raise exceptions.InternalError(
+                f'"target" is mandatory for activity {activity!r}'
+            )
+        if activity_id is None:
+            activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
+        data: Dict[str, Any] = {
+            "@context": [NS_AP],
+            "actor": actor_id,
+            "id": activity_id,
+            "type": activity,
+        }
+        data.update(kwargs)
+        if object_ is not None:
+            data["object"] = object_
+        if target is not None:
+            data["target"] = target
+
+        return data
+
+    def get_key_id(self, actor_id: str) -> str:
+        """Get local key ID from actor ID"""
+        return f"{actor_id}#main-key"
+
+    async def check_signature(
+        self,
+        signature: str,
+        key_id: str,
+        headers: Dict[str, str]
+    ) -> str:
+        """Verify that signature matches given headers
+
+        see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
+
+        @param signature: Base64 encoded signature
+        @param key_id: ID of the key used to sign the data
+        @param headers: headers and their values, including pseudo-headers
+        @return: id of the signing actor
+
+        @raise InvalidSignature: signature doesn't match headers
+        """
+        to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
+        if key_id.startswith("acct:"):
+            actor = key_id[5:]
+            actor_id = await self.get_ap_actor_id_from_account(actor)
+        else:
+            actor_id = key_id.split("#", 1)[0]
+
+        pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id)
+        if pub_key_id != key_id or pub_key_owner != actor_id:
+            raise exceptions.EncryptionError("Public Key mismatch")
+
+        try:
+            pub_key.verify(
+                base64.b64decode(signature),
+                to_sign.encode(),
+                # we have to use PKCS1v15 padding to be compatible with Mastodon
+                padding.PKCS1v15(),  # type: ignore
+                hashes.SHA256()  # type: ignore
+            )
+        except InvalidSignature:
+            raise exceptions.EncryptionError(
+                "Invalid signature (using PKC0S1 v1.5 and SHA-256)"
+            )
+
+        return actor_id
+
+    def get_signature_data(
+            self,
+            key_id: str,
+            headers: Dict[str, str]
+    ) -> Tuple[Dict[str, str], Dict[str, str]]:
+        """Generate and return signature and corresponding headers
+
+        @param parsed_url: URL where the request is sent/has been received
+        @param key_id: ID of the key (URL linking to the data with public key)
+        @param date: HTTP datetime string of signature generation
+        @param body: body of the HTTP request
+        @param headers: headers to sign and their value:
+            default value will be used if not specified
+
+        @return: headers and signature data
+            ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers
+            removed, and ``Signature`` added.
+        """
+        # headers must be lower case
+        l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()}
+        to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items())
+        signature = base64.b64encode(self.private_key.sign(
+            to_sign.encode(),
+            # we have to use PKCS1v15 padding to be compatible with Mastodon
+            padding.PKCS1v15(),  # type: ignore
+            hashes.SHA256()  # type: ignore
+        )).decode()
+        sign_data = {
+            "keyId": key_id,
+            "Algorithm": "rsa-sha256",
+            "headers": " ".join(l_headers.keys()),
+            "signature": signature
+        }
+        new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
+        new_headers["Signature"] = self.build_signature_header(sign_data)
+        return new_headers, sign_data
+
+    async def convert_and_post_items(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        service: jid.JID,
+        node: str,
+        items: List[domish.Element],
+        subscribe_extra_nodes: bool = True,
+    ) -> None:
+        """Convert XMPP items to AP items and post them to actor inbox
+
+        @param ap_account: account of ActivityPub actor receiving the item
+        @param service: JID of the (virtual) pubsub service where the item has been
+            published
+        @param node: (virtual) node corresponding where the item has been published
+        @param subscribe_extra_nodes: if True, extra data nodes will be automatically
+            subscribed, that is comment nodes if present and attachments nodes.
+        """
+        actor_id = await self.get_ap_actor_id_from_account(ap_account)
+        inbox = await self.get_ap_inbox_from_id(actor_id)
+        for item in items:
+            if item.name == "item":
+                cached_item = await self.host.memory.storage.search_pubsub_items({
+                    "profiles": [self.client.profile],
+                    "services": [service],
+                    "nodes": [node],
+                    "names": [item["id"]]
+                })
+                is_new = not bool(cached_item)
+                if node.startswith(self._events.namespace):
+                    # event item
+                    event_data = self._events.event_elt_2_event_data(item)
+                    try:
+                        author_jid = jid.JID(item["publisher"]).userhostJID()
+                    except (KeyError, RuntimeWarning):
+                        root_elt = item
+                        while root_elt.parent is not None:
+                            root_elt = root_elt.parent
+                        author_jid = jid.JID(root_elt["from"]).userhostJID()
+                    if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.get_local_jid_from_account(ap_account)
+                        recipient_client = self.client.get_virtual_client(recipient_jid)
+                        comments_data = event_data.get("comments")
+                        if comments_data:
+                            comment_service = jid.JID(comments_data["jid"])
+                            comment_node = comments_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, event_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {event_data['id']!r} "
+                                f"on {node!r} at {service}"
+                            )
+                    ap_item = await self.ap_events.event_data_2_ap_item(
+                        event_data, author_jid, is_new=is_new
+                    )
+                else:
+                    # blog item
+                    mb_data = await self._m.item_2_mb_data(client, item, service, node)
+                    author_jid = jid.JID(mb_data["author_jid"])
+                    if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.get_local_jid_from_account(ap_account)
+                        recipient_client = self.client.get_virtual_client(recipient_jid)
+                        for comment_data in mb_data.get("comments", []):
+                            comment_service = jid.JID(comment_data["service"])
+                            if self.is_virtual_jid(comment_service):
+                                log.debug(
+                                    f"ignoring virtual comment service: {comment_data}"
+                                )
+                                continue
+                            comment_node = comment_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, mb_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {mb_data['id']!r} on "
+                                f"{node!r} at {service}"
+                            )
+                    ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
+
+                url_actor = ap_item["actor"]
+            elif item.name == "retract":
+                url_actor, ap_item = await self.ap_delete_item(
+                    client.jid, node, item["id"]
+                )
+            else:
+                raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
+            await self.sign_and_post(inbox, url_actor, ap_item)
+
+    async def convert_and_post_attachments(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        service: jid.JID,
+        node: str,
+        items: List[domish.Element],
+        publisher: Optional[jid.JID] = None
+    ) -> None:
+        """Convert XMPP item attachments to AP activities and post them to actor inbox
+
+        @param ap_account: account of ActivityPub actor receiving the item
+        @param service: JID of the (virtual) pubsub service where the item has been
+            published
+        @param node: (virtual) node corresponding where the item has been published
+            subscribed, that is comment nodes if present and attachments nodes.
+        @param items: attachments items
+        @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub
+            service, it's the publisher of the item). To be filled only when the publisher
+            is known for sure, otherwise publisher will be determined either if
+            "publisher" attribute is set by pubsub service, or as a last resort, using
+            item's ID (which MUST be publisher bare JID according to pubsub-attachments
+            specification).
+        """
+        if len(items) != 1:
+            log.warning(
+                "we should get exactly one attachment item for an entity, got "
+                f"{len(items)})"
+            )
+
+        actor_id = await self.get_ap_actor_id_from_account(ap_account)
+        inbox = await self.get_ap_inbox_from_id(actor_id)
+
+        item_elt = items[0]
+        item_id = item_elt["id"]
+
+        if publisher is None:
+            item_pub_s = item_elt.getAttribute("publisher")
+            publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id)
+
+        if publisher.userhost() != item_id:
+            log.warning(
+                "attachments item ID must be publisher's bare JID, ignoring: "
+                f"{item_elt.toXml()}"
+            )
+            return
+
+        if self.is_virtual_jid(publisher):
+            log.debug(f"ignoring item coming from local virtual JID {publisher}")
+            return
+
+        if publisher is not None:
+            item_elt["publisher"] = publisher.userhost()
+
+        item_service, item_node, item_id = self._pa.attachment_node_2_item(node)
+        item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node)
+        if self.is_virtual_jid(item_service):
+            # it's a virtual JID mapping to an external AP actor, we can use the
+            # item_id directly
+            item_url = item_id
+            if not item_url.startswith("https:"):
+                log.warning(
+                    "item ID of external AP actor is not an https link, ignoring: "
+                    f"{item_id!r}"
+                )
+                return
+        else:
+            item_url = self.build_apurl(TYPE_ITEM, item_account, item_id)
+
+        old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({
+            "profiles": [self.client.profile],
+            "services": [service],
+            "nodes": [node],
+            "names": [item_elt["id"]]
+        })
+        if not old_attachment_pubsub_items:
+            old_attachment = {}
+        else:
+            old_attachment_items = [i.data for i in old_attachment_pubsub_items]
+            old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items)
+            try:
+                old_attachment = old_attachments[0]
+            except IndexError:
+                # no known element was present in attachments
+                old_attachment = {}
+        publisher_account = await self.get_ap_account_from_jid_and_node(
+            publisher,
+            None
+        )
+        publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account)
+        try:
+            attachments = self._pa.items_2_attachment_data(client, [item_elt])[0]
+        except IndexError:
+            # no known element was present in attachments
+            attachments = {}
+
+        # noticed
+        if "noticed" in attachments:
+            if not "noticed" in old_attachment:
+                # new "noticed" attachment, we translate to "Like" activity
+                activity_id = self.build_apurl("like", item_account, item_id)
+                activity = self.create_activity(
+                    TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
+                )
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.sign_and_post(inbox, publisher_actor_id, activity)
+        else:
+            if "noticed" in old_attachment:
+                # "noticed" attachment has been removed, we undo the "Like" activity
+                activity_id = self.build_apurl("like", item_account, item_id)
+                activity = self.create_activity(
+                    TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
+                )
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                undo = self.create_activity("Undo", publisher_actor_id, activity)
+                await self.sign_and_post(inbox, publisher_actor_id, undo)
+
+        # reactions
+        new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
+        old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
+        reactions_remove = old_reactions - new_reactions
+        reactions_add = new_reactions - old_reactions
+        for reactions, undo in ((reactions_remove, True), (reactions_add, False)):
+            for reaction in reactions:
+                activity_id = self.build_apurl(
+                    "reaction", item_account, item_id, reaction.encode().hex()
+                )
+                reaction_activity = self.create_activity(
+                    TYPE_REACTION, publisher_actor_id, item_url,
+                    activity_id=activity_id
+                )
+                reaction_activity["content"] = reaction
+                reaction_activity["to"] = [ap_account]
+                reaction_activity["cc"] = [NS_AP_PUBLIC]
+                if undo:
+                    activy = self.create_activity(
+                        "Undo", publisher_actor_id, reaction_activity
+                    )
+                else:
+                    activy = reaction_activity
+                await self.sign_and_post(inbox, publisher_actor_id, activy)
+
+        # RSVP
+        if "rsvp" in attachments:
+            attending = attachments["rsvp"].get("attending", "no")
+            old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+            if attending != old_attending:
+                activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
+                activity_id = self.build_apurl(activity_type.lower(), item_account, item_id)
+                activity = self.create_activity(
+                    activity_type, publisher_actor_id, item_url, activity_id=activity_id
+                )
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.sign_and_post(inbox, publisher_actor_id, activity)
+        else:
+            if "rsvp" in old_attachment:
+                old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+                if old_attending == "yes":
+                    activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id)
+                    activity = self.create_activity(
+                        TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
+                    )
+                    activity["to"] = [ap_account]
+                    activity["cc"] = [NS_AP_PUBLIC]
+                    await self.sign_and_post(inbox, publisher_actor_id, activity)
+
+        if service.user and self.is_virtual_jid(service):
+            # the item is on a virtual service, we need to store it in cache
+            log.debug("storing attachments item in cache")
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node, with_subscriptions=True, create=True
+            )
+            await self.host.memory.storage.cache_pubsub_items(
+                self.client,
+                cached_node,
+                [item_elt],
+                [attachments]
+            )
+
+    async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
+        """Sign a documentent and post it to AP server
+
+        @param url: AP server endpoint
+        @param actor_id: originating actor ID (URL)
+        @param doc: document to send
+        """
+        if self.verbose:
+            __, actor_args = self.parse_apurl(actor_id)
+            actor_account = actor_args[0]
+            to_log = [
+                "",
+                f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
+            ]
+
+        p_url = parse.urlparse(url)
+        body = json.dumps(doc).encode()
+        digest_algo, digest_hash = self.get_digest(body)
+        digest = f"{digest_algo}={digest_hash}"
+
+        headers = {
+            "(request-target)": f"post {p_url.path}",
+            "Host": p_url.hostname,
+            "Date": http.datetimeToString().decode(),
+            "Digest": digest
+        }
+        headers["Content-Type"] = (
+            'application/activity+json'
+        )
+        headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)
+
+        if self.verbose:
+            if self.verbose>=3:
+                h_to_log = "\n".join(f"    {k}: {v}" for k,v in headers.items())
+                to_log.append(f"  headers:\n{h_to_log}")
+            to_log.append("---")
+            log.info("\n".join(to_log))
+
+        resp = await treq.post(
+            url,
+            body,
+            headers=headers,
+        )
+        if resp.code >= 300:
+            text = await resp.text()
+            log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
+        elif self.verbose:
+            log.info(f"==> response code: {resp.code}")
+        return resp
+
+    def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
+        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
+        service = jid.JID(service_s)
+        client = self.host.get_client(profile)
+        return defer.ensureDeferred(self.publish_message(client, mess_data, service))
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def get_ap_actor_id_from_account(self, account: str) -> str:
+        """Retrieve account ID from it's handle using WebFinger
+
+        Don't use this method to get local actor id from a local account derivated for
+        JID: in this case, the actor ID is retrieve with
+        ``self.build_apurl(TYPE_ACTOR, ap_account)``
+
+        @param account: AP handle (user@domain.tld)
+        @return: Actor ID (which is an URL)
+        """
+        if account.count("@") != 1 or "/" in account:
+            raise ValueError(f"Invalid account: {account!r}")
+        host = account.split("@")[1]
+        try:
+            finger_data = await treq.json_content(await treq.get(
+                f"https://{host}/.well-known/webfinger?"
+                f"resource=acct:{parse.quote_plus(account)}",
+            ))
+        except Exception as e:
+            raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
+        for link in finger_data.get("links", []):
+            if (
+                link.get("type") == "application/activity+json"
+                and link.get("rel") == "self"
+            ):
+                href = link.get("href", "").strip()
+                if not href:
+                    raise ValueError(
+                        f"Invalid webfinger data for {account:r}: missing href"
+                    )
+                break
+        else:
+            raise ValueError(
+                f"No ActivityPub link found for {account!r}"
+            )
+        return href
+
+    async def get_ap_actor_data_from_account(self, account: str) -> dict:
+        """Retrieve ActivityPub Actor data
+
+        @param account: ActivityPub Actor identifier
+        """
+        href = await self.get_ap_actor_id_from_account(account)
+        return await self.ap_get(href)
+
+    async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str:
+        """Retrieve inbox of an actor_id
+
+        @param use_shared: if True, and a shared inbox exists, it will be used instead of
+            the user inbox
+        """
+        data = await self.get_actor_data(actor_id)
+        if use_shared:
+            try:
+                return data["endpoints"]["sharedInbox"]
+            except KeyError:
+                pass
+        return data["inbox"]
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def get_ap_account_from_id(self, actor_id: str) -> str:
+        """Retrieve AP account from the ID URL
+
+        Works with external or local actor IDs.
+        @param actor_id: AP ID of the actor (URL to the actor data)
+        @return: AP handle
+        """
+        if self.is_local_url(actor_id):
+            url_type, url_args = self.parse_apurl(actor_id)
+            if url_type != "actor" or not url_args:
+                raise exceptions.DataError(
+                    f"invalid local actor ID: {actor_id}"
+                )
+            account = url_args[0]
+            try:
+                account_user, account_host = account.split('@')
+            except ValueError:
+                raise exceptions.DataError(
+                    f"invalid account from url: {actor_id}"
+                )
+            if not account_user or account_host != self.public_url:
+                raise exceptions.DataError(
+                    f"{account!r} is not a valid local account (from {actor_id})"
+                )
+            return account
+
+        url_parsed = parse.urlparse(actor_id)
+        actor_data = await self.get_actor_data(actor_id)
+        username = actor_data.get("preferredUsername")
+        if not username:
+            raise exceptions.DataError(
+                'No "preferredUsername" field found, can\'t retrieve actor account'
+            )
+        account = f"{username}@{url_parsed.hostname}"
+        # we try to retrieve the actor ID from the account to check it
+        found_id = await self.get_ap_actor_id_from_account(account)
+        if found_id != actor_id:
+            # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
+            msg = (
+                f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
+                f"({actor_id!r}). This AP instance doesn't seems to use "
+                '"preferredUsername" as we expect.'
+            )
+            log.warning(msg)
+            raise exceptions.DataError(msg)
+        return account
+
+    async def get_ap_items(
+        self,
+        collection: dict,
+        max_items: Optional[int] = None,
+        chronological_pagination: bool = True,
+        after_id: Optional[str] = None,
+        start_index: Optional[int] = None,
+        parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
+        only_ids: bool = False,
+    ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
+        """Retrieve AP items and convert them to XMPP items
+
+        @param account: AP account handle to get items from
+        @param max_items: maximum number of items to retrieve
+            retrieve all items by default
+        @param chronological_pagination: get pages in chronological order
+            AP use reversed chronological order for pagination, "first" page returns more
+            recent items. If "chronological_pagination" is True, "last" AP page will be
+            retrieved first.
+        @param after_id: if set, retrieve items starting from given ID
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            if ``after_id`` is not already in cache, we have to retrieve every page until
+            we find it.
+            In most common cases, ``after_id`` should be in cache though (client usually
+            use known ID when in-order pagination is used).
+        @param start_index: start retrieving items from the one with given index
+            Due to ActivityStream Collection Paging limitations, this is inefficient and
+            all pages before the requested index will be retrieved to count items.
+        @param parser: method to use to parse AP items and get XMPP item elements
+            if None, use default generic parser
+        @param only_ids: if True, only retrieve items IDs
+            Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
+            sufficient in some use cases (e.g. when retrieving following/followers
+            collections)
+        @return: XMPP Pubsub items and corresponding RSM Response
+            Items are always returned in chronological order in the result
+        """
+        if parser is None:
+            parser = self.ap_item_2_mb_elt
+
+        rsm_resp: Dict[str, Union[bool, int]] = {}
+        try:
+            count = collection["totalItems"]
+        except KeyError:
+            log.warning(
+                f'"totalItems" not found in collection {collection.get("id")}, '
+                "defaulting to 20"
+            )
+            count = 20
+        else:
+            log.info(f"{collection.get('id')} has {count} item(s)")
+
+            rsm_resp["count"] = count
+
+        if start_index is not None:
+            assert chronological_pagination and after_id is None
+            if start_index >= count:
+                return [], rsm_resp
+            elif start_index == 0:
+                # this is the default behaviour
+                pass
+            elif start_index > 5000:
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text="Maximum limit for previous_index has been reached, this limit"
+                    "is set to avoid DoS"
+                )
+            else:
+                # we'll convert "start_index" to "after_id", thus we need the item just
+                # before "start_index"
+                previous_index = start_index - 1
+                retrieved_items = 0
+                current_page = collection["last"]
+                while retrieved_items < count:
+                    page_data, items = await self.parse_ap_page(
+                        current_page, parser, only_ids
+                    )
+                    if not items:
+                        log.warning(f"found an empty AP page at {current_page}")
+                        return [], rsm_resp
+                    page_start_idx = retrieved_items
+                    retrieved_items += len(items)
+                    if previous_index <= retrieved_items:
+                        after_id = items[previous_index - page_start_idx]["id"]
+                        break
+                    try:
+                        current_page = page_data["prev"]
+                    except KeyError:
+                        log.warning(
+                            f"missing previous page link at {current_page}: {page_data!r}"
+                        )
+                        raise error.StanzaError(
+                            "service-unavailable",
+                            "Error while retrieving previous page from AP service at "
+                            f"{current_page}"
+                        )
+
+        init_page = "last" if chronological_pagination else "first"
+        page = collection.get(init_page)
+        if not page:
+            raise exceptions.DataError(
+                f"Initial page {init_page!r} not found for collection "
+                f"{collection.get('id')})"
+            )
+        items = []
+        page_items = []
+        retrieved_items = 0
+        found_after_id = False
+
+        while retrieved_items < count:
+            __, page_items = await self.parse_ap_page(page, parser, only_ids)
+            if not page_items:
+                break
+            retrieved_items += len(page_items)
+            if after_id is not None and not found_after_id:
+                # if we have an after_id, we ignore all items until the requested one is
+                # found
+                try:
+                    limit_idx = [i["id"] for i in page_items].index(after_id)
+                except ValueError:
+                    # if "after_id" is not found, we don't add any item from this page
+                    page_id = page.get("id") if isinstance(page, dict) else page
+                    log.debug(f"{after_id!r} not found at {page_id}, skipping")
+                else:
+                    found_after_id = True
+                    if chronological_pagination:
+                        start_index = retrieved_items - len(page_items) + limit_idx + 1
+                        page_items = page_items[limit_idx+1:]
+                    else:
+                        start_index = count - (retrieved_items - len(page_items) +
+                                               limit_idx + 1)
+                        page_items = page_items[:limit_idx]
+                    items.extend(page_items)
+            else:
+                items.extend(page_items)
+            if max_items is not None and len(items) >= max_items:
+                if chronological_pagination:
+                    items = items[:max_items]
+                else:
+                    items = items[-max_items:]
+                break
+            page = collection.get("prev" if chronological_pagination else "next")
+            if not page:
+                break
+
+        if after_id is not None and not found_after_id:
+            raise error.StanzaError("item-not-found")
+
+        if items:
+            if after_id is None:
+                rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
+            if start_index is not None:
+                rsm_resp["index"] = start_index
+            elif after_id is not None:
+                log.warning("Can't determine index of first element")
+            elif chronological_pagination:
+                rsm_resp["index"] = 0
+            else:
+                rsm_resp["index"] = count - len(items)
+            rsm_resp.update({
+                "first": items[0]["id"],
+                "last": items[-1]["id"]
+            })
+
+        return items, rsm.RSMResponse(**rsm_resp)
+
+    async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
+        """Convert AP item to parsed microblog data and corresponding item element"""
+        mb_data = await self.ap_item_2_mb_data(ap_item)
+        item_elt = await self._m.mb_data_2_entry_elt(
+            self.client, mb_data, mb_data["id"], None, self._m.namespace
+        )
+        if "repeated" in mb_data["extra"]:
+            item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
+        else:
+            item_elt["publisher"] = mb_data["author_jid"]
+        return mb_data, item_elt
+
+    async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
+        """Convert AP item to XMPP item element"""
+        __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
+        return item_elt
+
+    async def parse_ap_page(
+        self,
+        page: Union[str, dict],
+        parser: Callable[[dict], Awaitable[domish.Element]],
+        only_ids: bool = False
+    ) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
+
+        @param page: Can be either url linking and AP page, or the page data directly
+        @param parser: method to use to parse AP items and get XMPP item elements
+        @param only_ids: if True, only retrieve items IDs
+        @return: page data, pubsub items
+        """
+        page_data = await self.ap_get_object(page)
+        if page_data is None:
+            log.warning('No data found in collection')
+            return {}, []
+        ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids)
+        if ap_items is None:
+            ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids)
+            if not ap_items:
+                log.warning(f'No item field found in collection: {page_data!r}')
+                return page_data, []
+            else:
+                log.warning(
+                    "Items are not ordered, this is not spec compliant"
+                )
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                items.append(await parser(ap_item))
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+
+        return page_data, items
+
+    async def get_comments_nodes(
+        self,
+        item_id: str,
+        parent_id: Optional[str]
+    ) -> Tuple[Optional[str], Optional[str]]:
+        """Get node where this item is and node to use for comments
+
+        if config option "comments_max_depth" is set, a common node will be used below the
+        given depth
+        @param item_id: ID of the reference item
+        @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
+        @return: a tuple with parent_node_id, comments_node_id:
+            - parent_node_id is the ID of the node where reference item must be. None is
+              returned when the root node (i.e. not a comments node) must be used.
+            - comments_node_id: is the ID of the node to use for comments. None is
+              returned when no comment node must be used (happens when we have reached
+              "comments_max_depth")
+        """
+        if parent_id is None or not self.comments_max_depth:
+            return (
+                self._m.get_comments_node(parent_id) if parent_id is not None else None,
+                self._m.get_comments_node(item_id)
+            )
+        parent_url = parent_id
+        parents = []
+        for __ in range(COMMENTS_MAX_PARENTS):
+            parent_item = await self.ap_get(parent_url)
+            parents.insert(0, parent_item)
+            parent_url = parent_item.get("inReplyTo")
+            if parent_url is None:
+                break
+        parent_limit = self.comments_max_depth-1
+        if len(parents) <= parent_limit:
+            return (
+                self._m.get_comments_node(parents[-1]["id"]),
+                self._m.get_comments_node(item_id)
+            )
+        else:
+            last_level_item = parents[parent_limit]
+            return (
+                self._m.get_comments_node(last_level_item["id"]),
+                None
+            )
+
+    async def ap_item_2_mb_data(self, ap_item: dict) -> dict:
+        """Convert AP activity or object to microblog data
+
+        @param ap_item: ActivityPub item to convert
+            Can be either an activity of an object
+        @return: AP Item's Object and microblog data
+        @raise exceptions.DataError: something is invalid in the AP item
+        @raise NotImplementedError: some AP data is not handled yet
+        @raise error.StanzaError: error while contacting the AP server
+        """
+        is_activity = self.is_activity(ap_item)
+        if is_activity:
+            ap_object = await self.ap_get_object(ap_item, "object")
+            if not ap_object:
+                log.warning(f'No "object" found in AP item {ap_item!r}')
+                raise exceptions.DataError
+        else:
+            ap_object = ap_item
+        item_id = ap_object.get("id")
+        if not item_id:
+            log.warning(f'No "id" found in AP item: {ap_object!r}')
+            raise exceptions.DataError
+        mb_data = {"id": item_id, "extra": {}}
+
+        # content
+        try:
+            language, content_xhtml = ap_object["contentMap"].popitem()
+        except (KeyError, AttributeError):
+            try:
+                mb_data["content_xhtml"] = ap_object["content"]
+            except KeyError:
+                log.warning(f"no content found:\n{ap_object!r}")
+                raise exceptions.DataError
+        else:
+            mb_data["language"] = language
+            mb_data["content_xhtml"] = content_xhtml
+
+        mb_data["content"] = await self._t.convert(
+            mb_data["content_xhtml"],
+            self._t.SYNTAX_XHTML,
+            self._t.SYNTAX_TEXT,
+            False,
+        )
+
+        if "attachment" in ap_object:
+            attachments = mb_data["extra"][C.KEY_ATTACHMENTS] = []
+            for ap_attachment in ap_object["attachment"]:
+                try:
+                    url = ap_attachment["url"]
+                except KeyError:
+                    log.warning(
+                        f'"url" missing in AP attachment, ignoring: {ap_attachment}'
+                    )
+                    continue
+
+                if not url.startswith("http"):
+                    log.warning(f"non HTTP URL in attachment, ignoring: {ap_attachment}")
+                    continue
+                attachment = {"url": url}
+                for ap_key, key in (
+                    ("mediaType", "media_type"),
+                    # XXX: as weird as it seems, "name" is actually used for description
+                    #   in AP world
+                    ("name", "desc"),
+                ):
+                    value = ap_attachment.get(ap_key)
+                    if value:
+                        attachment[key] = value
+                attachments.append(attachment)
+
+        # author
+        if is_activity:
+            authors = await self.ap_get_actors(ap_item, "actor")
+        else:
+            authors = await self.ap_get_actors(ap_object, "attributedTo")
+        if len(authors) > 1:
+            # we only keep first item as author
+            # TODO: handle multiple actors
+            log.warning("multiple actors are not managed")
+
+        account = authors[0]
+        author_jid = self.get_local_jid_from_account(account).full()
+
+        mb_data["author"] = account.split("@", 1)[0]
+        mb_data["author_jid"] = author_jid
+
+        # published/updated
+        for field in ("published", "updated"):
+            value = ap_object.get(field)
+            if not value and field == "updated":
+                value = ap_object.get("published")
+            if value:
+                try:
+                    mb_data[field] = calendar.timegm(
+                        dateutil.parser.parse(str(value)).utctimetuple()
+                    )
+                except dateutil.parser.ParserError as e:
+                    log.warning(f"Can't parse {field!r} field: {e}")
+
+        # repeat
+        if "_repeated" in ap_item:
+            mb_data["extra"]["repeated"] = ap_item["_repeated"]
+
+        # comments
+        in_reply_to = ap_object.get("inReplyTo")
+        __, comments_node = await self.get_comments_nodes(item_id, in_reply_to)
+        if comments_node is not None:
+            comments_data = {
+                "service": author_jid,
+                "node": comments_node,
+                "uri": uri.build_xmpp_uri(
+                    "pubsub",
+                    path=author_jid,
+                    node=comments_node
+                )
+            }
+            mb_data["comments"] = [comments_data]
+
+        return mb_data
+
+    async def get_reply_to_id_from_xmpp_node(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        parent_item: str,
+        mb_data: dict
+    ) -> str:
+        """Get URL to use for ``inReplyTo`` field in AP item.
+
+        There is currently no way to know the parent service of a comment with XEP-0277.
+        To work around that, we try to check if we have this item in the cache (we
+        should). If there is more that one item with this ID, we first try to find one
+        with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`.
+
+        @param ap_account: AP account corresponding to the publication author
+        @param parent_item: ID of the node where the publication this item is replying to
+             has been posted
+        @param mb_data: microblog data of the publication
+        @return: URL to use in ``inReplyTo`` field
+        """
+        # FIXME: propose a protoXEP to properly get parent item, node and service
+
+        found_items = await self.host.memory.storage.search_pubsub_items({
+            "profiles": [client.profile],
+            "names": [parent_item]
+        })
+        if not found_items:
+            log.warning(f"parent item {parent_item!r} not found in cache")
+            parent_ap_account = ap_account
+        elif len(found_items) == 1:
+            cached_node = found_items[0].node
+            parent_ap_account = await self.get_ap_account_from_jid_and_node(
+                cached_node.service,
+                cached_node.name
+            )
+        else:
+            # we found several cached item with given ID, we check if there is one
+            # corresponding to this author
+            try:
+                author = jid.JID(mb_data["author_jid"]).userhostJID()
+                cached_item = next(
+                    i for i in found_items
+                    if jid.JID(i.data["publisher"]).userhostJID()
+                    == author
+                )
+            except StopIteration:
+                # no item corresponding to this author, we use ap_account
+                log.warning(
+                    "Can't find a single cached item for parent item "
+                    f"{parent_item!r}"
+                )
+                parent_ap_account = ap_account
+            else:
+                cached_node = cached_item.node
+                parent_ap_account = await self.get_ap_account_from_jid_and_node(
+                    cached_node.service,
+                    cached_node.name
+                )
+
+        return self.build_apurl(
+            TYPE_ITEM, parent_ap_account, parent_item
+        )
+
+    async def repeated_mb_2_ap_item(
+        self,
+        mb_data: dict
+    ) -> dict:
+        """Convert repeated blog item to suitable AP Announce activity
+
+        @param mb_data: microblog metadata of an item repeating an other blog post
+        @return: Announce activity linking to the repeated item
+        """
+        repeated = mb_data["extra"]["repeated"]
+        repeater = jid.JID(repeated["by"])
+        repeater_account = await self.get_ap_account_from_jid_and_node(
+            repeater,
+            None
+        )
+        repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account)
+        repeated_uri = repeated["uri"]
+
+        if not repeated_uri.startswith("xmpp:"):
+            log.warning(
+                "Only xmpp: URL are handled for repeated item at the moment, ignoring "
+                f"item {mb_data}"
+            )
+            raise NotImplementedError
+        parsed_url = uri.parse_xmpp_uri(repeated_uri)
+        if parsed_url["type"] != "pubsub":
+            log.warning(
+                "Only pubsub URL are handled for repeated item at the moment, ignoring "
+                f"item {mb_data}"
+            )
+            raise NotImplementedError
+        rep_service = jid.JID(parsed_url["path"])
+        rep_item = parsed_url["item"]
+        activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"])
+
+        if self.is_virtual_jid(rep_service):
+            # it's an AP actor linked through this gateway
+            # in this case we can simply use the item ID
+            if not rep_item.startswith("https:"):
+                log.warning(
+                    f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n"
+                    f"{mb_data}"
+                )
+            announced_uri = rep_item
+            repeated_account = self._e.unescape(rep_service.user)
+        else:
+            # the repeated item is an XMPP publication, we build the corresponding ID
+            rep_node = parsed_url["node"]
+            repeated_account = await self.get_ap_account_from_jid_and_node(
+                rep_service, rep_node
+            )
+            announced_uri = self.build_apurl("item", repeated_account, rep_item)
+
+        announce = self.create_activity(
+            "Announce", repeater_id, announced_uri, activity_id=activity_id
+        )
+        announce["to"] = [NS_AP_PUBLIC]
+        announce["cc"] = [
+            self.build_apurl(TYPE_FOLLOWERS, repeater_account),
+            await self.get_ap_actor_id_from_account(repeated_account)
+        ]
+        return announce
+
+    async def mb_data_2_ap_item(
+        self,
+        client: SatXMPPEntity,
+        mb_data: dict,
+        public: bool =True,
+        is_new: bool = True,
+    ) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item
+
+        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
+            If ``public`` is True, ``service`` and ``node`` keys must be set.
+            If ``published`` is not set, current datetime will be used
+        @param public: True if the message is not a private/direct one
+            if True, the AP Item will be marked as public, and AP followers of target AP
+            account (which retrieve from ``service``) will be put in ``cc``.
+            ``inReplyTo`` will also be set if suitable
+            if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
+            This is usually used for direct messages.
+        @param is_new: if True, the item is a new one (no instance has been found in
+            cache).
+            If True, a "Create" activity will be generated, otherwise an "Update" one will
+            be.
+        @return: Activity item
+        """
+        extra = mb_data.get("extra", {})
+        if "repeated" in extra:
+            return await self.repeated_mb_2_ap_item(mb_data)
+        if not mb_data.get("id"):
+            mb_data["id"] = shortuuid.uuid()
+        if not mb_data.get("author_jid"):
+            mb_data["author_jid"] = client.jid.userhost()
+        ap_account = await self.get_ap_account_from_jid_and_node(
+            jid.JID(mb_data["author_jid"]),
+            None
+        )
+        url_actor = self.build_apurl(TYPE_ACTOR, ap_account)
+        url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"])
+        ap_object = {
+            "id": url_item,
+            "type": "Note",
+            "published": utils.xmpp_date(mb_data.get("published")),
+            "attributedTo": url_actor,
+            "content": mb_data.get("content_xhtml") or mb_data["content"],
+        }
+
+        language = mb_data.get("language")
+        if language:
+            ap_object["contentMap"] = {language: ap_object["content"]}
+
+        attachments = extra.get(C.KEY_ATTACHMENTS)
+        if attachments:
+            ap_attachments = ap_object["attachment"] = []
+            for attachment in attachments:
+                try:
+                    url = next(
+                        s['url'] for s in attachment["sources"] if 'url' in s
+                    )
+                except (StopIteration, KeyError):
+                    log.warning(
+                        f"Ignoring attachment without URL: {attachment}"
+                    )
+                    continue
+                ap_attachment = {
+                    "url": url
+                }
+                for key, ap_key in (
+                    ("media_type", "mediaType"),
+                    # XXX: yes "name", cf. [ap_item_2_mb_data]
+                    ("desc", "name"),
+                ):
+                    value = attachment.get(key)
+                    if value:
+                        ap_attachment[ap_key] = value
+                ap_attachments.append(ap_attachment)
+
+        if public:
+            ap_object["to"] = [NS_AP_PUBLIC]
+            if self.auto_mentions:
+                for m in RE_MENTION.finditer(ap_object["content"]):
+                    mention = m.group()
+                    mentioned = mention[1:]
+                    __, m_host = mentioned.split("@", 1)
+                    if m_host in (self.public_url, self.client.jid.host):
+                        # we ignore mention of local users, they should be sent as XMPP
+                        # references
+                        continue
+                    try:
+                        mentioned_id = await self.get_ap_actor_id_from_account(mentioned)
+                    except Exception as e:
+                        log.warning(f"Can't add mention to {mentioned!r}: {e}")
+                    else:
+                        ap_object["to"].append(mentioned_id)
+                        ap_object.setdefault("tag", []).append({
+                            "type": TYPE_MENTION,
+                            "href": mentioned_id,
+                            "name": mention,
+                        })
+            try:
+                node = mb_data["node"]
+                service = jid.JID(mb_data["service"])
+            except KeyError:
+                # node and service must always be specified when this method is used
+                raise exceptions.InternalError(
+                    "node or service is missing in mb_data"
+                )
+            target_ap_account = await self.get_ap_account_from_jid_and_node(
+                service, node
+            )
+            if self.is_virtual_jid(service):
+                # service is a proxy JID for AP account
+                actor_data = await self.get_ap_actor_data_from_account(target_ap_account)
+                followers = actor_data.get("followers")
+            else:
+                # service is a real XMPP entity
+                followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
+            if followers:
+                ap_object["cc"] = [followers]
+            if self._m.is_comment_node(node):
+                parent_item = self._m.get_parent_item(node)
+                if self.is_virtual_jid(service):
+                    # the publication is on a virtual node (i.e. an XMPP node managed by
+                    # this gateway and linking to an ActivityPub actor)
+                    ap_object["inReplyTo"] = parent_item
+                else:
+                    # the publication is from a followed real XMPP node
+                    ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node(
+                        client,
+                        ap_account,
+                        parent_item,
+                        mb_data
+                    )
+
+        return self.create_activity(
+            "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
+        )
+
+    async def publish_message(
+        self,
+        client: SatXMPPEntity,
+        mess_data: dict,
+        service: jid.JID
+    ) -> None:
+        """Send an AP message
+
+        .. note::
+
+            This is a temporary method used for development only
+
+        @param mess_data: message data. Following keys must be set:
+
+            ``node``
+              identifier of message which is being replied (this will
+              correspond to pubsub node in the future)
+
+            ``content_xhtml`` or ``content``
+              message body (respectively in XHTML or plain text)
+
+        @param service: JID corresponding to the AP actor.
+        """
+        if not service.user:
+            raise ValueError("service must have a local part")
+        account = self._e.unescape(service.user)
+        ap_actor_data = await self.get_ap_actor_data_from_account(account)
+
+        try:
+            inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
+        except KeyError:
+            raise exceptions.DataError("Can't get ActivityPub actor inbox")
+
+        item_data = await self.mb_data_2_ap_item(client, mess_data)
+        url_actor = item_data["actor"]
+        resp = await self.sign_and_post(inbox_url, url_actor, item_data)
+
+    async def ap_delete_item(
+        self,
+        jid_: jid.JID,
+        node: Optional[str],
+        item_id: str,
+        public: bool = True
+    ) -> Tuple[str, Dict[str, Any]]:
+        """Build activity to delete an AP item
+
+        @param jid_: JID of the entity deleting an item
+        @param node: node where the item is deleted
+            None if it's microblog or a message
+        @param item_id: ID of the item to delete
+            it's the Pubsub ID or message's origin ID
+        @param public: if True, the activity will be addressed to public namespace
+        @return: actor_id of the entity deleting the item, activity to send
+        """
+        if node is None:
+            node = self._m.namespace
+
+        author_account = await self.get_ap_account_from_jid_and_node(jid_, node)
+        author_actor_id = self.build_apurl(TYPE_ACTOR, author_account)
+
+        items = await self.host.memory.storage.search_pubsub_items({
+            "profiles": [self.client.profile],
+            "services": [jid_],
+            "names": [item_id]
+        })
+        if not items:
+            log.warning(
+                f"Deleting an unknown item at service {jid_}, node {node} and id "
+                f"{item_id}"
+            )
+        else:
+            try:
+                mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node)
+                if "repeated" in mb_data["extra"]:
+                    # we are deleting a repeated item, we must translate this to an
+                    # "Undo" of the "Announce" activity instead of a "Delete" one
+                    announce = await self.repeated_mb_2_ap_item(mb_data)
+                    undo = self.create_activity("Undo", author_actor_id, announce)
+                    return author_actor_id, undo
+            except Exception as e:
+                log.debug(
+                    f"Can't parse item, maybe it's not a blog item: {e}\n"
+                    f"{items[0].toXml()}"
+                )
+
+        url_item = self.build_apurl(TYPE_ITEM, author_account, item_id)
+        ap_item = self.create_activity(
+            "Delete",
+            author_actor_id,
+            {
+                "id": url_item,
+                "type": TYPE_TOMBSTONE
+            }
+        )
+        if public:
+            ap_item["to"] = [NS_AP_PUBLIC]
+        return author_actor_id, ap_item
+
+    def _message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """add the gateway workflow on post treatment"""
+        if self.client is None:
+            log.debug(f"no client set, ignoring message: {message_elt.toXml()}")
+            return True
+        post_treat.addCallback(
+            lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data))
+        )
+        return True
+
+    async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
+        """Called once message has been parsed
+
+        this method handle the conversion to AP items and posting
+        """
+        if client != self.client:
+            return mess_data
+        if mess_data["type"] not in ("chat", "normal"):
+            log.warning(f"ignoring message with unexpected type: {mess_data}")
+            return mess_data
+        if not self.is_local(mess_data["from"]):
+            log.warning(f"ignoring non local message: {mess_data}")
+            return mess_data
+        if not mess_data["to"].user:
+            log.warning(
+                f"ignoring message addressed to gateway itself: {mess_data}"
+            )
+            return mess_data
+
+        actor_account = self._e.unescape(mess_data["to"].user)
+        actor_id = await self.get_ap_actor_id_from_account(actor_account)
+        inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
+
+        try:
+            language, message = next(iter(mess_data["message"].items()))
+        except (KeyError, StopIteration):
+            log.warning(f"ignoring empty message: {mess_data}")
+            return mess_data
+
+        mb_data = {
+            "content": message,
+        }
+        if language:
+            mb_data["language"] = language
+        origin_id = mess_data["extra"].get("origin_id")
+        if origin_id:
+            # we need to use origin ID when present to be able to retract the message
+            mb_data["id"] = origin_id
+        attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS)
+        if attachments:
+            mb_data["extra"] = {
+                C.KEY_ATTACHMENTS: attachments
+            }
+
+        client = self.client.get_virtual_client(mess_data["from"])
+        ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
+        ap_object = ap_item["object"]
+        ap_object["to"] = ap_item["to"] = [actor_id]
+        # we add a mention to direct message, otherwise peer is not notified in some AP
+        # implementations (notably Mastodon), and the message may be missed easily.
+        ap_object.setdefault("tag", []).append({
+            "type": TYPE_MENTION,
+            "href": actor_id,
+            "name": f"@{actor_account}",
+        })
+
+        await self.sign_and_post(inbox, ap_item["actor"], ap_item)
+        return mess_data
+
+    async def _on_message_retract(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        retract_elt: domish.Element,
+        fastened_elts
+    ) -> bool:
+        if client != self.client:
+            return True
+        from_jid = jid.JID(message_elt["from"])
+        if not self.is_local(from_jid):
+            log.debug(
+                f"ignoring retract request from non local jid {from_jid}"
+            )
+            return False
+        to_jid = jid.JID(message_elt["to"])
+        if (to_jid.host != self.client.jid.full() or not to_jid.user):
+            # to_jid should be a virtual JID from this gateway
+            raise exceptions.InternalError(
+                f"Invalid destinee's JID: {to_jid.full()}"
+            )
+        ap_account = self._e.unescape(to_jid.user)
+        actor_id = await self.get_ap_actor_id_from_account(ap_account)
+        inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
+        url_actor, ap_item = await self.ap_delete_item(
+            from_jid.userhostJID(), None, fastened_elts.id, public=False
+        )
+        resp = await self.sign_and_post(inbox, url_actor, ap_item)
+        return False
+
+    async def _on_reference_received(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        reference_data: Dict[str, Union[str, int]]
+    ) -> bool:
+        parsed_uri: dict = reference_data.get("parsed_uri")
+        if not parsed_uri:
+            log.warning(f"no parsed URI available in reference {reference_data}")
+            return False
+
+        try:
+            mentioned = jid.JID(parsed_uri["path"])
+        except RuntimeError:
+            log.warning(f"invalid target: {reference_data['uri']}")
+            return False
+
+        if mentioned.host != self.client.jid.full() or not mentioned.user:
+            log.warning(
+                f"ignoring mentioned user {mentioned}, it's not a JID mapping an AP "
+                "account"
+            )
+            return False
+
+        ap_account = self._e.unescape(mentioned.user)
+        actor_id = await self.get_ap_actor_id_from_account(ap_account)
+
+        parsed_anchor: dict = reference_data.get("parsed_anchor")
+        if not parsed_anchor:
+            log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}")
+            return False
+
+        if parsed_anchor["type"] != "pubsub":
+            log.warning(
+                f"ignoring reference with non pubsub anchor, this is not supported: "
+                "{reference_data!r}"
+            )
+            return False
+
+        try:
+            pubsub_service = jid.JID(parsed_anchor["path"])
+        except RuntimeError:
+            log.warning(f"invalid anchor: {reference_data['anchor']}")
+            return False
+        pubsub_node = parsed_anchor.get("node")
+        if not pubsub_node:
+            log.warning(f"missing pubsub node in anchor: {reference_data['anchor']}")
+            return False
+        pubsub_item = parsed_anchor.get("item")
+        if not pubsub_item:
+            log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}")
+            return False
+
+        cached_node = await self.host.memory.storage.get_pubsub_node(
+            client, pubsub_service, pubsub_node
+        )
+        if not cached_node:
+            log.warning(f"Anchored node not found in cache: {reference_data['anchor']}")
+            return False
+
+        cached_items, __ = await self.host.memory.storage.get_items(
+            cached_node, item_ids=[pubsub_item]
+        )
+        if not cached_items:
+            log.warning(
+                f"Anchored pubsub item not found in cache: {reference_data['anchor']}"
+            )
+            return False
+
+        cached_item = cached_items[0]
+
+        mb_data = await self._m.item_2_mb_data(
+            client, cached_item.data, pubsub_service, pubsub_node
+        )
+        ap_item = await self.mb_data_2_ap_item(client, mb_data)
+        ap_object = ap_item["object"]
+        ap_object["to"] = [actor_id]
+        ap_object.setdefault("tag", []).append({
+            "type": TYPE_MENTION,
+            "href": actor_id,
+            "name": ap_account,
+        })
+
+        inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
+
+        resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item)
+
+        return False
+
+    async def new_reply_to_xmpp_item(
+        self,
+        client: SatXMPPEntity,
+        ap_item: dict,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+    ) -> None:
+        """We got an AP item which is a reply to an XMPP item"""
+        in_reply_to = ap_item["inReplyTo"]
+        url_type, url_args = self.parse_apurl(in_reply_to)
+        if url_type != "item":
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with an unexpected URL "
+                f"type({url_type!r}):\n{pformat(ap_item)}"
+            )
+            return
+        try:
+            parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:])
+        except (IndexError, ValueError):
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
+                f"({in_reply_to!r}):\n{pformat(ap_item)}"
+            )
+            return
+        parent_item_service, parent_item_node = await self.get_jid_and_node(
+            parent_item_account
+        )
+        if parent_item_node is None:
+            parent_item_node = self._m.namespace
+        items, __ = await self._p.get_items(
+            client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
+        )
+        try:
+            parent_item_elt = items[0]
+        except IndexError:
+            log.warning(
+                f"Can't find parent item at {parent_item_service} (node "
+                f"{parent_item_node!r})\n{pformat(ap_item)}")
+            return
+        parent_item_parsed = await self._m.item_2_mb_data(
+            client, parent_item_elt, parent_item_service, parent_item_node
+        )
+        try:
+            comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
+            comment_node = parent_item_parsed["comments"][0]["node"]
+        except (KeyError, IndexError):
+            # we don't have a comment node set for this item
+            from libervia.backend.tools.xml_tools import pp_elt
+            log.info(f"{pp_elt(parent_item_elt.toXml())}")
+            raise NotImplementedError()
+        else:
+            __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
+            await self._p.publish(client, comment_service, comment_node, [item_elt])
+            await self.notify_mentions(
+                targets, mentions, comment_service, comment_node, item_elt["id"]
+            )
+
+    def get_ap_item_targets(
+        self,
+        item: Dict[str, Any]
+    ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]:
+        """Retrieve targets of an AP item, and indicate if it's a public one
+
+        @param item: AP object payload
+        @return: Are returned:
+            - is_public flag, indicating if the item is world-readable
+            - a dict mapping target type to targets
+        """
+        targets: Dict[str, Set[str]] = {}
+        is_public = False
+        # TODO: handle "audience"
+        for key in ("to", "bto", "cc", "bcc"):
+            values = item.get(key)
+            if not values:
+                continue
+            if isinstance(values, str):
+                values = [values]
+            for value in values:
+                if value in PUBLIC_TUPLE:
+                    is_public = True
+                    continue
+                if not value:
+                    continue
+                if not self.is_local_url(value):
+                    continue
+                target_type = self.parse_apurl(value)[0]
+                if target_type != TYPE_ACTOR:
+                    log.debug(f"ignoring non actor type as a target: {href}")
+                else:
+                    targets.setdefault(target_type, set()).add(value)
+
+        mentions = []
+        tags = item.get("tag")
+        if tags:
+            for tag in tags:
+                if tag.get("type") != TYPE_MENTION:
+                    continue
+                href = tag.get("href")
+                if not href:
+                    log.warning('Missing "href" field from mention object: {tag!r}')
+                    continue
+                if not self.is_local_url(href):
+                    continue
+                uri_type = self.parse_apurl(href)[0]
+                if uri_type != TYPE_ACTOR:
+                    log.debug(f"ignoring non actor URI as a target: {href}")
+                    continue
+                mention = {"uri": href}
+                mentions.append(mention)
+                name = tag.get("name")
+                if name:
+                    mention["content"] = name
+
+        return is_public, targets, mentions
+
+    async def new_ap_item(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse, cache and send notification for received AP item
+
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        """
+        is_public, targets, mentions = self.get_ap_item_targets(item)
+        if not is_public and targets.keys() == {TYPE_ACTOR}:
+            # this is a direct message
+            await self.handle_message_ap_item(
+                client, targets, mentions, destinee, item
+            )
+        else:
+            await self.handle_pubsub_ap_item(
+                client, targets, mentions, destinee, node, item, is_public
+            )
+
+    async def handle_message_ap_item(
+        self,
+        client: SatXMPPEntity,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+        destinee: Optional[jid.JID],
+        item: dict,
+    ) -> None:
+        """Parse and deliver direct AP items translating to XMPP messages
+
+        @param targets: actors where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param item: AP object payload
+        """
+        targets_jids = {
+            await self.get_jid_from_id(t)
+            for t_set in targets.values()
+            for t in t_set
+        }
+        if destinee is not None:
+            targets_jids.add(destinee)
+        mb_data = await self.ap_item_2_mb_data(item)
+        extra = {
+            "origin_id": mb_data["id"]
+        }
+        attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS)
+        if attachments:
+            extra[C.KEY_ATTACHMENTS] = attachments
+
+        defer_l = []
+        for target_jid in targets_jids:
+            defer_l.append(
+                client.sendMessage(
+                    target_jid,
+                    {'': mb_data.get("content", "")},
+                    mb_data.get("title"),
+                    extra=extra
+                )
+            )
+        await defer.DeferredList(defer_l)
+
+    async def notify_mentions(
+        self,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+        service: jid.JID,
+        node: str,
+        item_id: str,
+    ) -> None:
+        """Send mention notifications to recipients and mentioned entities
+
+        XEP-0372 (References) is used.
+
+        Mentions are also sent to recipients as they are primary audience (see
+        https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes).
+
+        """
+        anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id)
+        seen = set()
+        # we start with explicit mentions because mentions' content will be used in the
+        # future to fill "begin" and "end" reference attributes (we can't do it at the
+        # moment as there is no way to specify the XML element to use in the blog item).
+        for mention in mentions:
+            mentioned_jid = await self.get_jid_from_id(mention["uri"])
+            self._refs.send_reference(
+                self.client,
+                to_jid=mentioned_jid,
+                anchor=anchor
+            )
+            seen.add(mentioned_jid)
+
+        remaining = {
+            await self.get_jid_from_id(t)
+            for t_set in targets.values()
+            for t in t_set
+        } - seen
+        for target in remaining:
+            self._refs.send_reference(
+                self.client,
+                to_jid=target,
+                anchor=anchor
+            )
+
+    async def handle_pubsub_ap_item(
+        self,
+        client: SatXMPPEntity,
+        targets: Dict[str, Set[str]],
+        mentions: List[Dict[str, str]],
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+        public: bool
+    ) -> None:
+        """Analyse, cache and deliver AP items translating to Pubsub
+
+        @param targets: actors/collections where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        @param public: True if the item is public
+        """
+        # XXX: "public" is not used for now
+        service = client.jid
+        in_reply_to = item.get("inReplyTo")
+
+        if in_reply_to and isinstance(in_reply_to, list):
+            in_reply_to = in_reply_to[0]
+        if in_reply_to and isinstance(in_reply_to, str):
+            if self.is_local_url(in_reply_to):
+                # this is a reply to an XMPP item
+                await self.new_reply_to_xmpp_item(client, item, targets, mentions)
+                return
+
+            # this item is a reply to an AP item, we use or create a corresponding node
+            # for comments
+            parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to)
+            node = parent_node or node
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node, with_subscriptions=True, create=True,
+                create_kwargs={"subscribed": True}
+            )
+        else:
+            # it is a root item (i.e. not a reply to an other item)
+            create = node == self._events.namespace
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node, with_subscriptions=True, create=create
+            )
+            if cached_node is None:
+                log.warning(
+                    f"Received item in unknown node {node!r} at {service}. This may be "
+                    f"due to a cache purge. We synchronise the node\n{item}"
+
+                )
+                return
+        if item.get("type") == TYPE_EVENT:
+            data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
+        else:
+            data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
+        await self.host.memory.storage.cache_pubsub_items(
+            client,
+            cached_node,
+            [item_elt],
+            [data]
+        )
+
+        for subscription in cached_node.subscriptions:
+            if subscription.state != SubscriptionState.SUBSCRIBED:
+                continue
+            self.pubsub_service.notifyPublish(
+                service,
+                node,
+                [(subscription.subscriber, None, [item_elt])]
+            )
+
+        await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
+
+    async def new_ap_delete_item(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse, cache and send notification for received AP item
+
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param activity: parent AP activity
+        @param item: AP object payload
+            only the "id" field is used
+        """
+        item_id = item.get("id")
+        if not item_id:
+            raise exceptions.DataError('"id" attribute is missing in item')
+        if not item_id.startswith("http"):
+            raise exceptions.DataError(f"invalid id: {item_id!r}")
+        if self.is_local_url(item_id):
+            raise ValueError("Local IDs should not be used")
+
+        # we have no way to know if a deleted item is a direct one (thus a message) or one
+        # converted to pubsub. We check if the id is in message history to decide what to
+        # do.
+        history = await self.host.memory.storage.get(
+            client,
+            History,
+            History.origin_id,
+            item_id,
+            (History.messages, History.subjects)
+        )
+
+        if history is not None:
+            # it's a direct message
+            if history.source_jid != client.jid:
+                log.warning(
+                    f"retraction received from an entity ''{client.jid}) which is "
+                    f"not the original sender of the message ({history.source_jid}), "
+                    "hack attemps?"
+                )
+                raise exceptions.PermissionError("forbidden")
+
+            await self._r.retract_by_history(client, history)
+        else:
+            # no history in cache with this ID, it's probably a pubsub item
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, client.jid, node, with_subscriptions=True
+            )
+            if cached_node is None:
+                log.warning(
+                    f"Received an item retract for node {node!r} at {client.jid} "
+                    "which is not cached"
+                )
+                raise exceptions.NotFound
+            await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id])
+            # notifyRetract is expecting domish.Element instances
+            item_elt = domish.Element((None, "item"))
+            item_elt["id"] = item_id
+            for subscription in cached_node.subscriptions:
+                if subscription.state != SubscriptionState.SUBSCRIBED:
+                    continue
+                self.pubsub_service.notifyRetract(
+                    client.jid,
+                    node,
+                    [(subscription.subscriber, None, [item_elt])]
+                )