changeset 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management:

- HTTP signatures are now checked for incoming messages
- an AP actor can now be followed using a pubsub subscription; when the follow is accepted, the node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation explaining this will follow)

ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 658ddbabaf36
children fa3dc4ed7906
files sat/core/exceptions.py sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/http_server.py sat/plugins/plugin_comp_ap_gateway/pubsub_service.py
diffstat 5 files changed, 1146 insertions(+), 189 deletions(-)
--- a/sat/core/exceptions.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/core/exceptions.py	Tue Mar 22 17:00:42 2022 +0100
@@ -123,6 +123,11 @@
     pass
 
 
+class EncryptionError(Exception):
+    """Invalid encryption"""
+    pass
+
+
 # Something which need to be done is not available yet
 class NotReady(Exception):
     pass
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue Mar 22 17:00:42 2022 +0100
@@ -17,25 +17,28 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import base64
+import calendar
 import hashlib
 import json
 from pathlib import Path
-from typing import Optional, Dict, Tuple, List, Union
+from pprint import pformat
+import re
+from typing import Any, Dict, List, Optional, Tuple, Union, overload
 from urllib import parse
-import calendar
-import re
 
-import dateutil
+from cryptography.exceptions import InvalidSignature
 from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives.asymmetric import padding
+import dateutil
 import shortuuid
+from sqlalchemy.exc import IntegrityError
 import treq
 from treq.response import _Response as TReqResponse
 from twisted.internet import defer, reactor, threads
 from twisted.web import http
-from twisted.words.protocols.jabber import jid, error
+from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
 from wokkel import rsm
 
@@ -44,12 +47,25 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
 from sat.tools import utils
-from sat.tools.common import data_format, tls
+from sat.tools.common import data_format, tls, uri
 from sat.tools.common.async_utils import async_lru
 
-from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP,
-                       AP_MB_MAP, LRU_MAX_SIZE)
+from .constants import (
+    ACTIVITY_OBJECT_MANDATORY,
+    ACTIVITY_TARGET_MANDATORY,
+    ACTIVITY_TYPES,
+    ACTIVITY_TYPES_LOWER,
+    AP_MB_MAP,
+    COMMENTS_MAX_PARENTS,
+    CONF_SECTION,
+    IMPORT_NAME,
+    LRU_MAX_SIZE,
+    MEDIA_TYPE_AP,
+    TYPE_ACTOR,
+    TYPE_ITEM,
+)
 from .http_server import HTTPServer
 from .pubsub_service import APPubsubService
 
@@ -64,7 +80,7 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
     C.PI_HANDLER: C.BOOL_TRUE,
@@ -86,6 +102,9 @@
         self.initialised = False
         self._m = host.plugins["XEP-0277"]
         self._p = host.plugins["XEP-0060"]
+        self._e = host.plugins["XEP-0106"]
+        self._c = host.plugins["PUBSUB_CACHE"]
+        self.pubsub_service = APPubsubService(self)
 
         host.bridge.addMethod(
             "APSend",
@@ -97,7 +116,7 @@
         )
 
     def getHandler(self, __):
-        return APPubsubService(self)
+        return self.pubsub_service
 
     async def init(self, client):
         if self.initialised:
@@ -165,10 +184,13 @@
                'bad ap-gateway http_connection_type, you must use one of "http" or '
                 '"https"'
             )
-        self.max_items = self.host.memory.getConfig(
+        self.max_items = int(self.host.memory.getConfig(
             CONF_SECTION, 'new_node_max_items', 50
 
-        )
+        ))
+        self.comments_max_depth = int(self.host.memory.getConfig(
+            CONF_SECTION, 'comments_max_depth', 0
+        ))
         self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap')
         self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
         # True (default) if we provide gateway only to entities/services from our server
@@ -191,6 +213,25 @@
         self.client = client
         await self.init(client)
 
+    async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
+        """Get client for this component with a specified jid
+
+        This is needed to perform operations with the virtual JID corresponding to the AP
+        actor instead of the JID of the gateway itself.
+        @param actor_id: ID of the actor
+        @return: virtual client
+        """
+        account = await self.getAPAccountFromId(actor_id)
+        local_jid = self.getLocalJIDFromAccount(account)
+        return self.client.getVirtualClient(local_jid)
+
+    def isActivity(self, data: dict) -> bool:
+        """Return True if the data has an activity type"""
+        try:
+            return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
+        except (KeyError, TypeError):
+            return False
+
     async def apGet(self, url: str) -> dict:
         """Retrieve AP JSON from given URL
 
@@ -514,15 +555,30 @@
 
         return jid_, node
 
-    def parseAPURL(self, url: str) -> Tuple[str, str]:
+    def getLocalJIDFromAccount(self, account: str) -> jid.JID:
+        """Compute JID linking to an AP account
+
+        The local JID is computed by escaping the AP actor handle and using it as the
+        local part of the JID, where the domain part is this gateway's own JID.
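+
+        E.g., assuming a gateway running at ``ap.example.org`` (hypothetical value),
+        the AP handle ``alice@mastodon.example`` maps to the local JID
+        ``alice\40mastodon.example@ap.example.org`` (``@`` escaped per XEP-0106).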
+        """
+        return jid.JID(
+            None,
+            (
+                self._e.escape(account),
+                self.client.jid.host,
+                None
+            )
+        )
+
+    def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
         """Parse an URL leading to an AP endpoint
 
         @param url: URL to parse (schema is not mandatory)
-        @return: endpoint type and AP account
+        @return: endpoint type and extra arguments
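+
+        E.g., with the default ``_ap`` path (illustrative values),
+        ``https://example.org/_ap/actor/alice%40mastodon.example`` is parsed to
+        ``("actor", ["alice@mastodon.example"])``.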
         """
         path = parse.urlparse(url).path.lstrip("/")
-        type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1)
-        return type_, parse.unquote(account)
+        type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/", 1)
+        return type_, [parse.unquote(a) for a in extra_args]
 
     def buildAPURL(self, type_:str , *args: str) -> str:
         """Build an AP endpoint URL
@@ -535,42 +591,199 @@
             str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
         )
 
-    async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse:
+    def buildSignatureHeader(self, values: Dict[str, str]) -> str:
+        """Build key="<value>" signature header from signature data"""
+        fields = []
+        for key, value in values.items():
+            if key not in ("(created)", "(expires)"):
+                if '"' in value:
+                    raise NotImplementedError(
+                        "string escaping is not implemented, double-quote can't be used "
+                        f"in {value!r}"
+                    )
+                value = f'"{value}"'
+            fields.append(f"{key}={value}")
+
+        return ",".join(fields)
+
+    def getDigest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
+        """Get digest data to use in header and signature
+
+        @param body: body of the request
+        @return: hash name and digest
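+
+        E.g. ``getDigest(b"hello")`` returns
+        ``("SHA-256", "LPJNul+wow4m6DsqxbninhsWHlwfp0JecwQzYpOLmCQ=")``.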
+        """
+        if algo != "SHA-256":
+            raise NotImplementedError("only SHA-256 is implemented for now")
+        return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getActorPubKeyData(
+        self,
+        actor_id: str
+    ) -> Tuple[str, str, rsa.RSAPublicKey]:
+        """Retrieve Public Key data from actor ID
+
+        @param actor_id: actor ID (url)
+        @return: key_id, owner and public_key
+        @raise KeyError: publicKey is missing from actor data
+        """
+        actor_data = await self.apGet(actor_id)
+        pub_key_data = actor_data["publicKey"]
+        key_id = pub_key_data["id"]
+        owner = pub_key_data["owner"]
+        pub_key_pem = pub_key_data["publicKeyPem"]
+        pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
+        return key_id, owner, pub_key
+
+    def createActivity(
+        self,
+        activity: str,
+        actor_id: str,
+        object_: Optional[Union[str, dict]] = None,
+        target: Optional[Union[str, dict]] = None,
+        **kwargs,
+    ) -> Dict[str, Any]:
+        """Generate base data for an activity
+
+        @param activity: one of ACTIVITY_TYPES
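+
+        E.g. ``createActivity("Follow", actor_id, object_=target_id)`` returns
+        something like (the ``id`` suffix is illustrative)::
+
+            {
+                "@context": "https://www.w3.org/ns/activitystreams",
+                "actor": <actor_id>,
+                "id": "<actor_id>#follow_<shortuuid>",
+                "type": "Follow",
+                "object": <target_id>
+            }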
+        """
+        if activity not in ACTIVITY_TYPES:
+            raise exceptions.InternalError(f"invalid activity: {activity!r}")
+        if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
+            raise exceptions.InternalError(
+                f'"object_" is mandatory for activity {activity!r}'
+            )
+        if target is None and activity in ACTIVITY_TARGET_MANDATORY:
+            raise exceptions.InternalError(
+                f'"target" is mandatory for activity {activity!r}'
+            )
+        activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
+        data: Dict[str, Any] = {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "actor": actor_id,
+            "id": activity_id,
+            "type": activity,
+        }
+        data.update(kwargs)
+        if object_ is not None:
+            data["object"] = object_
+        if target is not None:
+            data["target"] = target
+
+        return data
+
+    def getKeyId(self, actor_id: str) -> str:
+        """Get local key ID from actor ID"""
+        return f"{actor_id}#main-key"
+
+    async def checkSignature(
+        self,
+        signature: str,
+        key_id: str,
+        headers: Dict[str, str]
+    ) -> str:
+        """Verify that signature matches given headers
+
+        see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
+
+        @param signature: Base64 encoded signature
+        @param key_id: ID of the key used to sign the data
+        @param headers: headers and their values, including pseudo-headers
+        @return: id of the signing actor
+
+        @raise InvalidSignature: signature doesn't match headers
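+
+        The string to verify is rebuilt as lowercased ``name: value`` lines in
+        ``headers`` order, e.g. (illustrative values)::
+
+            (request-target): post /_ap/inbox/alice%40example.org
+            host: example.org
+            date: Tue, 22 Mar 2022 16:00:42 GMT
+            digest: SHA-256=<base64 hash>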
+        """
+        to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
+        if key_id.startswith("acct:"):
+            actor = key_id[5:]
+            actor_id = await self.getAPActorIdFromAccount(actor)
+        else:
+            actor_id = key_id.split("#", 1)[0]
+
+        pub_key_id, pub_key_owner, pub_key = await self.getActorPubKeyData(actor_id)
+        if pub_key_id != key_id or pub_key_owner != actor_id:
+            raise exceptions.EncryptionError("Public Key mismatch")
+
+        try:
+            pub_key.verify(
+                base64.b64decode(signature),
+                to_sign.encode(),
+                # we have to use PKCS1v15 padding to be compatible with Mastodon
+                padding.PKCS1v15(),  # type: ignore
+                hashes.SHA256()  # type: ignore
+            )
+        except InvalidSignature:
+            raise exceptions.EncryptionError(
+                "Invalid signature (using PKCS1 v1.5 and SHA-256)"
+            )
+
+        return actor_id
+
+    def getSignatureData(
+            self,
+            key_id: str,
+            headers: Dict[str, str]
+    ) -> Tuple[Dict[str, str], Dict[str, str]]:
+        """Generate and return signature and corresponding headers
+
+        @param key_id: ID of the key (URL linking to the data with public key)
+        @param headers: headers to sign and their values, including pseudo-headers
+
+        @return: headers and signature data
+            ``headers`` is an updated copy of the ``headers`` argument, with
+            pseudo-headers removed, and ``Signature`` added.
+        """
+        to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
+        signature = base64.b64encode(self.private_key.sign(
+            to_sign.encode(),
+            # we have to use PKCS1v15 padding to be compatible with Mastodon
+            padding.PKCS1v15(),  # type: ignore
+            hashes.SHA256()  # type: ignore
+        )).decode()
+        sign_data = {
+            "keyId": key_id,
+            "Algorithm": "rsa-sha256",
+            "headers": " ".join(headers.keys()),
+            "signature": signature
+        }
+        new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
+        new_headers["Signature"] = self.buildSignatureHeader(sign_data)
+        return new_headers, sign_data
+
+    async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
         """Sign a documentent and post it to AP server
 
         @param url: AP server endpoint
-        @param url_actor: URL generated by this gateway for local actor
+        @param actor_id: originating actor ID (URL)
         @param doc: document to send
         """
         p_url = parse.urlparse(url)
-        date = http.datetimeToString().decode()
         body = json.dumps(doc).encode()
-        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
-        digest = f"sha-256={digest_hash}"
-        to_sign = (
-            f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
-            f"date: {date}\ndigest: {digest}"
+        digest_algo, digest_hash = self.getDigest(body)
+        digest = f"{digest_algo}={digest_hash}"
+
+        headers = {
+            "(request-target)": f"post {p_url.path}",
+            "Host": p_url.hostname,
+            "Date": http.datetimeToString().decode(),
+            "Digest": digest
+        }
+        headers, __ = self.getSignatureData(self.getKeyId(actor_id), headers)
+
+        headers["Content-Type"] = (
+            'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'
         )
-        signature = base64.b64encode(self.private_key.sign(
-            to_sign.encode(),
-            # we have to use PKCS1v15 padding to be compatible with Mastodon
-            padding.PKCS1v15(),
-            hashes.SHA256()
-        )).decode()
-        h_signature = (
-            f'keyId="{url_actor}",headers="(request-target) host date digest",'
-            f'signature="{signature}"'
-        )
-        return await treq.post(
+        resp = await treq.post(
             url,
             body,
-            headers={
-                "Host": [p_url.hostname],
-                "Date": [date],
-                "Digest": [digest],
-                "Signature": [h_signature],
-            }
+            headers=headers,
         )
+        if resp.code >= 400:
+            text = await resp.text()
+            log.warning(f"POST request to {url} failed: {text}")
+        return resp
 
     def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
         mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
@@ -578,6 +791,7 @@
         client = self.host.getClient(profile)
         return defer.ensureDeferred(self.publishMessage(client, mess_data, service))
 
+    @async_lru(maxsize=LRU_MAX_SIZE)
     async def getAPActorIdFromAccount(self, account: str) -> str:
         """Retrieve account ID from it's handle using WebFinger
 
@@ -611,7 +825,7 @@
             )
         return href
 
-    async def getAPActorDataFromId(self, account: str) -> dict:
+    async def getAPActorDataFromAccount(self, account: str) -> dict:
         """Retrieve ActivityPub Actor data
 
         @param account: ActivityPub Actor identifier
@@ -620,7 +834,13 @@
         return await self.apGet(href)
 
     @async_lru(maxsize=LRU_MAX_SIZE)
-    async def getAPAccountFromId(self, actor_id: str):
+    async def getAPInboxFromId(self, actor_id: str) -> str:
+        """Retrieve inbox of an actor_id"""
+        data = await self.apGet(actor_id)
+        return data["inbox"]
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getAPAccountFromId(self, actor_id: str) -> str:
         """Retrieve AP account from the ID URL
 
         @param actor_id: AP ID of the actor (URL to the actor data)
@@ -648,7 +868,7 @@
 
     async def getAPItems(
         self,
-        account: str,
+        collection: dict,
         max_items: Optional[int] = None,
         chronological_pagination: bool = True,
         after_id: Optional[str] = None,
@@ -675,21 +895,18 @@
         @return: XMPP Pubsub items and corresponding RSM Response
             Items are always returned in chronological order in the result
         """
-        actor_data = await self.getAPActorDataFromId(account)
-        outbox = actor_data.get("outbox")
         rsm_resp: Dict[str, Union[bool, int]] = {}
-        if not outbox:
-            raise exceptions.DataError(f"No outbox found for actor {account}")
-        outbox_data = await self.apGet(outbox)
         try:
-            count = outbox_data["totalItems"]
+            count = collection["totalItems"]
         except KeyError:
             log.warning(
-                f'"totalItems" not found in outbox of {account}, defaulting to 20'
+                f'"totalItems" not found in collection {collection.get("id")}, '
+                "defaulting to 20"
             )
             count = 20
         else:
-            log.info(f"{account}'s outbox has {count} item(s)")
+            log.info(f"{collection.get('id')} has {count} item(s)")
+
             rsm_resp["count"] = count
 
         if start_index is not None:
@@ -710,7 +927,7 @@
                 # before "start_index"
                 previous_index = start_index - 1
                 retrieved_items = 0
-                current_page = outbox_data["last"]
+                current_page = collection["last"]
                 while retrieved_items < count:
                     page_data, items = await self.parseAPPage(current_page)
                     if not items:
@@ -734,10 +951,11 @@
                         )
 
         init_page = "last" if chronological_pagination else "first"
-        page = outbox_data.get(init_page)
+        page = collection.get(init_page)
         if not page:
             raise exceptions.DataError(
-                f"Initial page {init_page!r} not found for outbox {outbox}"
+                f"Initial page {init_page!r} not found for collection "
+                f"{collection.get('id')})"
             )
         items = []
         page_items = []
@@ -746,6 +964,8 @@
 
         while retrieved_items < count:
             __, page_items = await self.parseAPPage(page)
+            if not page_items:
+                break
             retrieved_items += len(page_items)
             if after_id is not None and not found_after_id:
                 # if we have an after_id, we ignore all items until the requested one is
@@ -754,7 +974,8 @@
                     limit_idx = [i["id"] for i in page_items].index(after_id)
                 except ValueError:
                     # if "after_id" is not found, we don't add any item from this page
-                    log.debug(f"{after_id!r} not found at {page}, skipping")
+                    page_id = page.get("id") if isinstance(page, dict) else page
+                    log.debug(f"{after_id!r} not found at {page_id}, skipping")
                 else:
                     found_after_id = True
                     if chronological_pagination:
@@ -773,25 +994,24 @@
                 else:
                     items = items[-max_items:]
                 break
-            page = outbox_data.get("prev" if chronological_pagination else "next")
+            page = collection.get("prev" if chronological_pagination else "next")
             if not page:
                 break
 
         if after_id is not None and not found_after_id:
             raise error.StanzaError("item-not-found")
 
-        if after_id is None:
-            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
-
-        if start_index is not None:
-            rsm_resp["index"] = start_index
-        elif after_id is not None:
-            log.warning("Can't determine index of first element")
-        elif chronological_pagination:
-            rsm_resp["index"] = 0
-        else:
-            rsm_resp["index"] = count - len(items)
         if items:
+            if after_id is None:
+                rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
+            if start_index is not None:
+                rsm_resp["index"] = start_index
+            elif after_id is not None:
+                log.warning("Can't determine index of first element")
+            elif chronological_pagination:
+                rsm_resp["index"] = 0
+            else:
+                rsm_resp["index"] = count - len(items)
             rsm_resp.update({
                 "first": items[0]["id"],
                 "last": items[-1]["id"]
@@ -799,55 +1019,119 @@
 
         return items, rsm.RSMResponse(**rsm_resp)
 
-    async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]:
+    async def parseAPPage(
+        self,
+        page: Union[str, dict]
+    ) -> Tuple[dict, List[domish.Element]]:
         """Convert AP objects from an AP page to XMPP items
 
-        @param url: url linking and AP page
+        @param page: either a URL linking to an AP page, or the page data directly
         @return: page data, pubsub items
         """
-        page_data = await self.apGet(url)
-        ap_items = page_data.get("orderedItems")
-        if not ap_items:
-            log.warning('No "orderedItems" collection found')
-            return page_data, []
+        page_data = await self.apGetObject(page)
+        if page_data is None:
+            log.warning('No data found in collection')
+            return {}, []
+        ap_items = await self.apGetList(page_data, "orderedItems")
+        if ap_items is None:
+            ap_items = await self.apGetList(page_data, "items")
+            if not ap_items:
+                log.warning(f'No item field found in collection: {page_data!r}')
+                return page_data, []
+            else:
+                log.warning(
+                    "Items are not ordered, this is not spec compliant"
+                )
         items = []
         # AP Collections are in antichronological order, but we expect chronological in
         # Pubsub, thus we reverse it
         for ap_item in reversed(ap_items):
             try:
-                ap_object, mb_data = await self.apItem2MBdata(ap_item)
+                items.append(await self.apItem2Elt(ap_item))
             except (exceptions.DataError, NotImplementedError, error.StanzaError):
                 continue
 
-            item_elt = await self._m.data2entry(
-                self.client, mb_data, ap_object["id"], None, self._m.namespace
-            )
-            item_elt["publisher"] = mb_data["author_jid"].full()
-            items.append(item_elt)
-
         return page_data, items
 
-    async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]:
-        """Convert AP item to microblog data
+    async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
+        """Convert AP item to parsed microblog data and corresponding item element"""
+        mb_data = await self.apItem2MBdata(ap_item)
+        item_elt = await self._m.data2entry(
+            self.client, mb_data, mb_data["id"], None, self._m.namespace
+        )
+        item_elt["publisher"] = mb_data["author_jid"]
+        return mb_data, item_elt
+
+    async def apItem2Elt(self, ap_item: dict) -> domish.Element:
+        """Convert AP item to XMPP item element"""
+        __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+        return item_elt
+
+    async def getCommentsNodes(
+        self,
+        item_id: str,
+        parent_id: Optional[str]
+    ) -> Tuple[Optional[str], Optional[str]]:
+        """Get node where this item is and node to use for comments
+
+        If the "comments_max_depth" config option is set, a common node is used below
+        the given depth.
+
+        @param item_id: ID of the reference item
+        @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
+        @return: a tuple with parent_node_id, comments_node_id:
+            - parent_node_id is the ID of the node where reference item must be. None is
+              returned when the root node (i.e. not a comments node) must be used.
+            - comments_node_id: is the ID of the node to use for comments. None is
+              returned when no comment node must be used (happens when we have reached
+              "comments_max_depth")
+        """
+        if parent_id is None or not self.comments_max_depth:
+            return (
+                self._m.getCommentsNode(parent_id) if parent_id is not None else None,
+                self._m.getCommentsNode(item_id)
+            )
+        parent_url = parent_id
+        parents = []
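+        # walk up the "inReplyTo" chain to collect the ancestors, root item first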
+        for __ in range(COMMENTS_MAX_PARENTS):
+            parent_item = await self.apGet(parent_url)
+            parents.insert(0, parent_item)
+            parent_url = parent_item.get("inReplyTo")
+            if parent_url is None:
+                break
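+        # depth is counted from the root item: when the thread is deeper than
+        # comments_max_depth, all deeper replies are put in the comments node of
+        # the last allowed level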
+        parent_limit = self.comments_max_depth-1
+        if len(parents) <= parent_limit:
+            return (
+                self._m.getCommentsNode(parents[-1]["id"]),
+                self._m.getCommentsNode(item_id)
+            )
+        else:
+            last_level_item = parents[parent_limit]
+            return (
+                self._m.getCommentsNode(last_level_item["id"]),
+                None
+            )
+
+    async def apItem2MBdata(self, ap_item: dict) -> dict:
+        """Convert AP activity or object to microblog data
 
         @return: microblog data
         @raise exceptions.DataError: something is invalid in the AP item
         @raise NotImplementedError: some AP data is not handled yet
         @raise error.StanzaError: error while contacting the AP server
         """
-        ap_object = ap_item.get("object")
-        if not ap_object:
-            log.warning(f'No "object" found in AP item {ap_item!r}')
+        is_activity = self.isActivity(ap_item)
+        if is_activity:
+            ap_object = await self.apGetObject(ap_item, "object")
+            if not ap_object:
+                log.warning(f'No "object" found in AP item {ap_item!r}')
+                raise exceptions.DataError
+        else:
+            ap_object = ap_item
+        item_id = ap_object.get("id")
+        if not item_id:
+            log.warning(f'No "id" found in AP item: {ap_object!r}')
             raise exceptions.DataError
-        if isinstance(ap_object, str):
-            ap_object = await self.apGet(ap_object)
-        obj_id = ap_object.get("id")
-        if not obj_id:
-            log.warning(f'No "id" found in AP object: {ap_object!r}')
-            raise exceptions.DataError
-        if ap_object.get("inReplyTo") is not None:
-            raise NotImplementedError
-        mb_data = {}
+        mb_data = {"id": item_id}
         for ap_key, mb_key in AP_MB_MAP.items():
             data = ap_object.get(ap_key)
             if data is None:
@@ -868,37 +1152,20 @@
             mb_data["content_xhtml"] = content_xhtml
 
         # author
-        actor = ap_item.get("actor")
-        if not actor:
-            log.warning(f"no actor associated to object id {obj_id!r}")
-            raise exceptions.DataError
-        elif isinstance(actor, list):
-            # we only keep first item of list as author
+        if is_activity:
+            authors = await self.apGetActors(ap_item, "actor")
+        else:
+            authors = await self.apGetActors(ap_object, "attributedTo")
+        if len(authors) > 1:
+            # we only keep first item as author
             # TODO: handle multiple actors
-            if len(actor) > 1:
-                log.warning("multiple actors are not managed")
-            actor = actor[0]
+            log.warning("multiple actors are not managed")
 
-        if isinstance(actor, dict):
-            actor = actor.get("id")
-            if not actor:
-                log.warning(f"no actor id found: {actor!r}")
-                raise exceptions.DataError
+        account = authors[0]
+        author_jid = self.getLocalJIDFromAccount(account).full()
 
-        if isinstance(actor, str):
-            account = await self.getAPAccountFromId(actor)
-            mb_data["author"] = account.split("@", 1)[0]
-            author_jid = mb_data["author_jid"] = jid.JID(
-                None,
-                (
-                    self.host.plugins["XEP-0106"].escape(account),
-                    self.client.jid.host,
-                    None
-                )
-            )
-        else:
-            log.warning(f"unknown actor type found: {actor!r}")
-            raise exceptions.DataError
+        mb_data["author"] = account.split("@", 1)[0]
+        mb_data["author_jid"] = author_jid
 
         # published/updated
         for field in ("published", "updated"):
@@ -912,14 +1179,30 @@
                     )
                 except dateutil.parser.ParserError as e:
                     log.warning(f"Can't parse {field!r} field: {e}")
-        return ap_object, mb_data
+
+        # comments
+        in_reply_to = ap_object.get("inReplyTo")
+        __, comments_node = await self.getCommentsNodes(item_id, in_reply_to)
+        if comments_node is not None:
+            comments_data = {
+                "service": author_jid,
+                "node": comments_node,
+                "uri": uri.buildXMPPUri(
+                    "pubsub",
+                    path=author_jid,
+                    node=comments_node
+                )
+            }
+            mb_data["comments"] = [comments_data]
+
+        return mb_data
 
     async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
         """Convert Libervia Microblog Data to ActivityPub item"""
         if not mb_data.get("id"):
             mb_data["id"] = shortuuid.uuid()
         if not mb_data.get("author_jid"):
-            mb_data["author_jid"] = client.jid
+            mb_data["author_jid"] = client.jid.full()
         ap_account = await self.getAPAccountFromJidAndNode(
             jid.JID(mb_data["author_jid"]),
             None
@@ -967,8 +1250,8 @@
         """
         if not service.user:
             raise ValueError("service must have a local part")
-        account = self.host.plugins["XEP-0106"].unescape(service.user)
-        ap_actor_data = await self.getAPActorDataFromId(account)
+        account = self._e.unescape(service.user)
+        ap_actor_data = await self.getAPActorDataFromAccount(account)
 
         try:
             inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
@@ -980,3 +1263,72 @@
         resp = await self.signAndPost(inbox_url, url_actor, item_data)
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
+
+    async def newAPItem(
+        self,
+        client: SatXMPPEntity,
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+    ) -> None:
+        """Analyse, cache and send notification for received AP item
+
+        @param destinee: jid of the destinee
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        """
+        service = client.jid
+        in_reply_to = item.get("inReplyTo")
+        if in_reply_to and isinstance(in_reply_to, str):
+            # this item is a reply, we use or create a corresponding node for comments
+            parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
+            node = parent_node or node
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node, with_subscriptions=True
+            )
+            if cached_node is None:
+                try:
+                    cached_node = await self.host.memory.storage.setPubsubNode(
+                        client,
+                        service,
+                        node,
+                        subscribed=True
+                    )
+                except IntegrityError as e:
+                    if "unique" in str(e.orig).lower():
+                        # the node may already exist, if it has been created just after
+                        # getPubsubNode above
+                        log.debug("ignoring UNIQUE constraint error")
+                        cached_node = await self.host.memory.storage.getPubsubNode(
+                            client, service, node, with_subscriptions=True
+                        )
+                    else:
+                        raise e
+
+        else:
+            # it is a root item (i.e. not a reply to another item)
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node, with_subscriptions=True
+            )
+            if cached_node is None:
+                log.warning(
+                    f"Received item in unknown node {node!r} at {service}\n{item}"
+
+                )
+                return
+        mb_data, item_elt = await self.apItem2MbDataAndElt(item)
+        await self.host.memory.storage.cachePubsubItems(
+            client,
+            cached_node,
+            [item_elt],
+            [mb_data]
+        )
+
+        for subscription in cached_node.subscriptions:
+            if subscription.state != SubscriptionState.SUBSCRIBED:
+                continue
+            self.pubsub_service.notifyPublish(
+                service,
+                node,
+                [(subscription.subscriber, None, [item_elt])]
+            )
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Tue Mar 22 17:00:42 2022 +0100
@@ -22,6 +22,7 @@
 CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
 TYPE_ACTOR = "actor"
 TYPE_INBOX = "inbox"
+TYPE_SHARED_INBOX = "shared_inbox"
 TYPE_OUTBOX = "outbox"
 TYPE_ITEM = "item"
 MEDIA_TYPE_AP = "application/activity+json"
@@ -30,7 +31,39 @@
     "content": "content_xhtml",
 
 }
-AP_REQUEST_TYPES = {"actor", "outbox"}
+AP_REQUEST_TYPES = {
+    "GET": {"actor", "outbox"},
+    "POST": {"inbox"},
+}
+# headers to check for signature
+SIGN_HEADERS = {
+    # headers needed for all HTTP methods
+    None: [
+        # tuples are equivalent headers/pseudo headers, one of them must be present
+        ("date", "(created)"),
+        ("digest", "(request-target)"),
+    ],
+    b"GET": ["host"],
+    b"POST": ["digest"]
+}
 PAGE_SIZE = 10
+HS2019 = "hs2019"
+# delay after which a signed request is not accepted anymore
+SIGN_EXP = 12*60*60  # 12 hours (same value as for Mastodon)
 
 LRU_MAX_SIZE = 200
+ACTIVITY_TYPES = (
+    "Accept", "Add", "Announce", "Arrive", "Block", "Create", "Delete", "Dislike", "Flag",
+    "Follow", "Ignore", "Invite", "Join", "Leave", "Like", "Listen", "Move", "Offer",
+    "Question", "Reject", "Read", "Remove", "TentativeReject", "TentativeAccept",
+    "Travel", "Undo", "Update", "View"
+)
+ACTIVITY_TYPES_LOWER = [a.lower() for a in ACTIVITY_TYPES]
+ACTIVITY_OBJECT_MANDATORY = (
+    "Create", "Update", "Delete", "Follow", "Add", "Remove", "Like", "Block", "Undo"
+)
+ACTIVITY_TARGET_MANDATORY = ("Add", "Remove")
+# activities which can be used with the Shared Inbox (i.e. with no account specified)
+ACTIVIY_NO_ACCOUNT_ALLOWED = ("create",)
+# maximum number of parents to retrieve when comments_max_depth option is set
+COMMENTS_MAX_PARENTS = 100
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Tue Mar 22 17:00:42 2022 +0100
@@ -16,14 +16,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import time
 from typing import Optional, Dict, List
 import json
 from urllib import parse
-import re
+from collections import deque
 import unicodedata
+from pprint import pformat
 
 from twisted.web import http, resource as web_resource, server
-from twisted.internet import defer
+from twisted.internet import reactor, defer
 from twisted.words.protocols.jabber import jid, error
 from wokkel import pubsub, rsm
 
@@ -31,9 +33,15 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.tools.common import date_utils
+from sat.memory.sqla_mapping import SubscriptionState
 
-from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
-                       AP_REQUEST_TYPES, PAGE_SIZE)
+from .constants import (
+    CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
+    AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
+    SIGN_HEADERS, HS2019, SIGN_EXP
+)
+from .regex import RE_SIG_PARAM
 
 
 log = getLogger(__name__)
@@ -50,8 +58,19 @@
 
     def __init__(self, ap_gateway):
         self.apg = ap_gateway
+        self._seen_digest = deque(maxlen=50)
         super().__init__()
 
+    def responseCode(
+        self,
+        request: "HTTPRequest",
+        http_code: int,
+        msg: Optional[str] = None
+    ) -> None:
+        """Log and set HTTP return code and associated message"""
+        if msg is not None:
+            log.warning(msg)
+        request.setResponseCode(http_code, None if msg is None else msg.encode())
+
     async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
         query = parse.parse_qs(url_parsed.query)
@@ -78,15 +97,127 @@
         request.write(json.dumps(resp).encode())
         request.finish()
 
+    async def handleFollowActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        try:
+            subscription = await self.apg._p.subscribe(
+                client,
+                account_jid,
+                node
+            )
+        except pubsub.SubscriptionPending:
+            log.info(f"subscription to node {node!r} of {account_jid} is pending")
+        # TODO: manage SubscriptionUnconfigured
+        else:
+            if subscription.state != "subscribed":
+                # other states should raise an Exception
+                raise exceptions.InternalError('"subscribed" state was expected')
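+            # the subscription succeeded immediately: send an "Accept" activity
+            # back to the follower's inbox, as the AP follow workflow expects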
+            inbox = await self.apg.getAPInboxFromId(signing_actor)
+            actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
+            accept_data = self.apg.createActivity(
+                "Accept", actor_id, object_=data
+            )
+            await self.apg.signAndPost(inbox, actor_id, accept_data)
+
+    async def handleAcceptActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            if type_ == "Follow":
+                follow_node = await self.apg.host.memory.storage.getPubsubNode(
+                    client, client.jid, node, with_subscriptions=True
+                )
+                if follow_node is None:
+                    log.warning(
+                        f"Received a follow accept on an unknown node: {node!r} at "
+                        f"{client.jid}. Ignoring it"
+                    )
+                    continue
+                try:
+                    sub = next(
+                        s for s in follow_node.subscriptions if s.subscriber==account_jid
+                    )
+                except StopIteration:
+                    log.warning(
+                        "Received a follow accept on a node without subscription: "
+                        f"{node!r} at {client.jid}. Ignoring it"
+                    )
+                else:
+                    if sub.state == SubscriptionState.SUBSCRIBED:
+                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
+                    elif sub.state == SubscriptionState.PENDING:
+                        follow_node.subscribed = True
+                        sub.state = SubscriptionState.SUBSCRIBED
+                        await self.apg.host.memory.storage.add(follow_node)
+                    else:
+                        raise exceptions.InternalError(
+                            f"Unhandled subscription state {sub.state!r}"
+                        )
+            else:
+                log.warning(f"Unmanaged accept type: {type_!r}")
+
+    async def handleCreateActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
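+        # digests of recently handled requests are kept to filter out duplicated
+        # deliveries (e.g. an item addressed both to an actor inbox and to the
+        # shared inbox)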
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            return
+        self._seen_digest.append(digest)
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            sender = await self.apg.apGetSenderActor(obj)
+            if sender != signing_actor:
+                log.warning(
+                    "Ignoring object not attributed to signing actor: {obj}"
+                )
+            else:
+                await self.apg.newAPItem(client, account_jid, node, obj)
+
     async def APActorRequest(
         self,
         request: "HTTPRequest",
         account_jid: jid.JID,
         node: Optional[str],
         ap_account: str,
-        actor_url: str
+        actor_url: str,
+        signing_actor: Optional[str]
     ) -> dict:
         inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
         outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
 
         # we have to use AP account as preferredUsername because it is used to retrieve
@@ -107,7 +238,10 @@
                 "id": f"{actor_url}#main-key",
                 "owner": actor_url,
                 "publicKeyPem": self.apg.public_key_pem
-            }
+            },
+            "endpoints": {
+                "sharedInbox": shared_inbox
+            },
         }
 
     def getCanonicalURL(self, request: "HTTPRequest") -> str:
@@ -206,7 +340,8 @@
         account_jid: jid.JID,
         node: Optional[str],
         ap_account: str,
-        ap_url: str
+        ap_url: str,
+        signing_actor: Optional[str]
     ) -> dict:
         if node is None:
             node = self.apg._m.namespace
@@ -229,7 +364,8 @@
                 service=account_jid,
                 node=node,
                 max_items=0,
-                rsm_request=rsm.RSMRequest(max_=0)
+                rsm_request=rsm.RSMRequest(max_=0),
+                extra={C.KEY_USE_CACHE: False}
             )
         except error.StanzaError as e:
             log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
@@ -255,22 +391,268 @@
             "last": url_last_page,
         }
 
-    async def APRequest(self, request):
+    async def APInboxRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> None:
+        if signing_actor is None:
+            raise exceptions.InternalError("signing_actor must be set for inbox requests")
+        if node is None:
+            node = self.apg._m.namespace
+        try:
+            data = json.load(request.content)
+            if not isinstance(data, dict):
+                raise ValueError("data should be an object")
+        except (json.JSONDecodeError, ValueError) as e:
+            return self.responseCode(
+                request,
+                http.BAD_REQUEST,
+                f"invalid json in inbox request: {e}"
+            )
+        await self.checkSigningActor(data, signing_actor)
+        activity_type = (data.get("type") or "").lower()
+        if activity_type not in ACTIVITY_TYPES_LOWER:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"request is not an activity, ignoring"
+            )
+
+        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()!r} activity must target an account"
+            )
+
+        try:
+            method = getattr(self, f"handle{activity_type.title()}Activity")
+        except AttributeError:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()} activity is not yet supported"
+            )
+        else:
+            await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+
+    async def APRequest(
+        self,
+        request: "HTTPRequest",
+        signing_actor: Optional[str] = None
+    ) -> None:
         path = request.path.decode()
         ap_url = parse.urljoin(
             f"https://{self.apg.public_url}",
             path
         )
-        request_type, ap_account = self.apg.parseAPURL(ap_url)
-        account_jid, node = await self.apg.getJIDAndNode(ap_account)
-        if request_type not in AP_REQUEST_TYPES:
-            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
-        method = getattr(self, f"AP{request_type.title()}Request")
-        ret_data = await method(request, account_jid, node, ap_account, ap_url)
-        request.setHeader("content-type", CONTENT_TYPE_AP)
-        request.write(json.dumps(ret_data).encode())
+        request_type, extra_args = self.apg.parseAPURL(ap_url)
+        if len(extra_args) == 0:
+            if request_type != "shared_inbox":
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            ret_data = await self.APInboxRequest(
+                request, None, None, None, ap_url, signing_actor
+            )
+        else:
+            if len(extra_args) > 1:
+                log.warning(f"unexpected extra arguments: {extra_args!r}")
+            ap_account = extra_args[0]
+            account_jid, node = await self.apg.getJIDAndNode(ap_account)
+            if request_type not in AP_REQUEST_TYPES.get(
+                    request.method.decode().upper(), []
+            ):
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            method = getattr(self, f"AP{request_type.title()}Request")
+            ret_data = await method(
+                request, account_jid, node, ap_account, ap_url, signing_actor
+            )
+        if ret_data is not None:
+            request.setHeader("content-type", CONTENT_TYPE_AP)
+            request.write(json.dumps(ret_data).encode())
         request.finish()
 
+    async def APPostRequest(self, request: "HTTPRequest"):
+        try:
+            signing_actor = await self.checkSignature(request)
+        except exceptions.EncryptionError as e:
+            self.responseCode(
+                request,
+                http.FORBIDDEN,
+                f"invalid signature: {e}"
+            )
+            request.finish()
+            return
+
+        return await self.APRequest(request, signing_actor)
+
+    async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
+        """That that signing actor correspond to actor declared in data
+
+        @param data: request payload
+        @param signing_actor: actor ID of the signing entity, as returned by
+            checkSignature
+        @raise exceptions.NotFound: no actor found in data
+        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
+        """
+        actor = await self.apg.apGetSenderActor(data)
+
+        if signing_actor != actor:
+            raise exceptions.EncryptionError(
+                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
+            )
+
+    async def checkSignature(self, request: "HTTPRequest") -> str:
+        """Check and validate HTTP signature
+
+        @return: id of the signing actor
+
+        @raise exceptions.EncryptionError: signature is not present or doesn't match
+        """
+        signature = request.getHeader("Signature")
+        if signature is None:
+            raise exceptions.EncryptionError("No signature found")
+        sign_data = {
+            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
+            for m in RE_SIG_PARAM.finditer(signature)
+        }
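+        # e.g. 'keyId="<url>",headers="(request-target) host date",signature="<b64>"'
+        # is parsed to {"keyId": "<url>", "headers": "…", "signature": "<b64>"}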
+        try:
+            key_id = sign_data["keyId"]
+        except KeyError:
+            raise exceptions.EncryptionError('"keyId" is missing from signature')
+        algorithm = sign_data.get("algorithm", HS2019)
+        signed_headers = sign_data.get(
+            "headers",
+            "(created)" if algorithm==HS2019 else "date"
+        ).lower().split()
+        try:
+            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
+        except KeyError:
+            raise exceptions.InternalError(
+                f"there should be a list of headers for {request.method} method"
+            )
+        if not headers_to_check:
+            raise exceptions.InternalError("headers_to_check must not be empty")
+
+        for header in headers_to_check:
+            if isinstance(header, tuple):
+                if len(set(header).intersection(signed_headers)) == 0:
+                    raise exceptions.EncryptionError(
+                        f"at least one of following header must be signed: {header}"
+                    )
+            elif header not in signed_headers:
+                raise exceptions.EncryptionError(
+                    f"the {header!r} header must be signed"
+                )
+
+        body = request.content.read()
+        request.content.seek(0)
+        headers = {}
+        for to_sign in signed_headers:
+            if to_sign == "(request-target)":
+                method = request.method.decode().lower()
+                uri = parse.unquote(request.uri.decode())
+                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
+            elif to_sign in ("(created)", "(expires)"):
+                if algorithm != HS2019:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
+                        "algorithm"
+                    )
+                key = to_sign[1:-1]
+                value = sign_data.get(key)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        "{key!r} parameter is missing from signature"
+                    )
+                try:
+                    if float(value) < 0:
+                        raise ValueError
+                except ValueError:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign} must be a Unix timestamp"
+                    )
+                headers[to_sign] = value
+            else:
+                value = request.getHeader(to_sign)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        f"value of header {to_sign!r} is missing!"
+                    )
+                elif to_sign == "host":
+                    # we check Forwarded/X-Forwarded-Host headers
+                    # as we need original host if a proxy has modified the header
+                    forwarded = request.getHeader("forwarded")
+                    if forwarded is not None:
+                        try:
+                            host = [
+                                f[5:] for f in forwarded.split(";")
+                                if f.startswith("host=")
+                            ][0] or None
+                        except IndexError:
+                            host = None
+                    else:
+                        host = None
+                    if host is None:
+                        host = request.getHeader("x-forwarded-host")
+                    if host:
+                        value = host
+                elif to_sign == "digest":
+                    hashes = {
+                        algo.lower(): hash_ for algo, hash_ in (
+                            digest.split("=", 1) for digest in value.split(",")
+                        )
+                    }
+                    try:
+                        given_digest = hashes["sha-256"]
+                    except KeyError:
+                        raise exceptions.EncryptionError(
+                            "Only SHA-256 algorithm is currently supported for digest"
+                        )
+                    __, computed_digest = self.apg.getDigest(body)
+                    if given_digest != computed_digest:
+                        raise exceptions.EncryptionError(
+                            f"SHA-256 given and computed digest differ:\n"
+                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
+                        )
+                headers[to_sign] = value
+
+        # date check
+        if "(created)" in headers:
+            created = float(headers["(created)"])
+        else:
+            created = date_utils.date_parse(headers["date"])
+        # the signature is valid until created + SIGN_EXP, or until the
+        # "(expires)" parameter if it is set and comes sooner
+        limit_ts = created + SIGN_EXP
+
+        try:
+            expires = float(headers["(expires)"])
+        except KeyError:
+            pass
+        else:
+            if expires < created:
+                log.warning(
+                    f"(expires) [{expires}] set in the past of (created) [{created}] "
+                    "ignoring it according to specs"
+                )
+            else:
+                limit_ts = min(limit_ts, expires)
+
+        if time.time() > limit_ts:
+            raise exceptions.EncryptionError("Signature has expired")
+
+        return await self.apg.checkSignature(
+            sign_data["signature"],
+            key_id,
+            headers
+        )
+
     def render(self, request):
         request.setHeader("server", VERSION)
         return super().render(request)
@@ -286,6 +668,13 @@
 
         return web_resource.NoResource().render(request)
 
+    def render_POST(self, request):
+        path = request.path.decode().lstrip("/")
+        if not path.startswith(self.apg.ap_path):
+            return web_resource.NoResource().render(request)
+        defer.ensureDeferred(self.APPostRequest(request))
+        return server.NOT_DONE_YET
+
 
 class HTTPRequest(server.Request):
     pass
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Tue Mar 22 17:00:42 2022 +0100
@@ -16,19 +16,41 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional, List
+from typing import Optional, Tuple, List, Dict, Any
 
+from twisted.internet import defer
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
-from wokkel import rsm
+from wokkel import rsm, pubsub, data_form
 
 from sat.core.i18n import _
+from sat.core import exceptions
 from sat.core.log import getLogger
+from sat.core.constants import Const as C
 from sat.tools.utils import ensure_deferred
+from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
+
+from .constants import (
+    TYPE_ACTOR,
+)
 
 
 log = getLogger(__name__)
 
+# all nodes have the same config
+NODE_CONFIG = [
+    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+    {"var": "pubsub#max_items", "value": "max"},
+    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+]
+
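+# mappings derived from NODE_CONFIG: NODE_CONFIG_VALUES maps each field to its
+# value (used by getConfiguration), while NODE_OPTIONS maps each field to its
+# remaining attributes (used by getConfigurationOptions),
+# e.g. NODE_OPTIONS["pubsub#persist_items"] == {"type": "boolean"}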
+NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
+NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
+for c in NODE_CONFIG:
+    NODE_OPTIONS[c["var"]].update(
+        {k: v for k, v in c.items() if k not in ("var", "value")}
+    )
+
 
 class APPubsubService(rsm.PubSubService):
     """Pubsub service for XMPP requests"""
@@ -43,6 +65,31 @@
             "name": "Libervia ActivityPub Gateway",
         }
 
+    async def getAPActorIdsAndInbox(
+        self,
+        requestor: jid.JID,
+        recipient: jid.JID,
+    ) -> Tuple[str, str, str]:
+        """Get AP actor IDs from requestor and destinee JIDs
+
+        @param requestor: XMPP entity doing a request to an AP actor via the gateway
+        @param recipient: JID mapping an AP actor via the gateway
+        @return: requestor actor ID, recipient actor ID and recipient inbox
+        @raise error.StanzaError: "item-not-found" is raised if no user part is
+            specified in recipient
+        """
+        if not recipient.user:
+            raise error.StanzaError(
+                "item-not-found",
+                text="No user part specified"
+            )
+        requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
+        recipient_account = self.apg._e.unescape(recipient.user)
+        recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account)
+        inbox = await self.apg.getAPInboxFromId(recipient_actor_id)
+        return requestor_actor_id, recipient_actor_id, inbox
+
     @ensure_deferred
     async def publish(self, requestor, service, nodeIdentifier, items):
         raise NotImplementedError
@@ -56,55 +103,193 @@
         maxItems: Optional[int],
         itemIdentifiers: Optional[List[str]],
         rsm_req: Optional[rsm.RSMRequest]
-    ) -> List[domish.Element]:
+    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
         if not service.user:
-            return []
+            return [], None
         ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
         if ap_account.count("@") != 1:
             log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
-            return []
-        if node != self.apg._m.namespace:
+            return [], None
+        if not node.startswith(self.apg._m.namespace):
             raise error.StanzaError(
                 "feature-not-implemented",
-                text=f"{VERSION} only supports {self.apg._m.namespace} "
+                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
                 "node for now"
             )
-        if rsm_req is None:
-            if maxItems is None:
-                maxItems = 20
-            kwargs = {
-                "max_items": maxItems,
-                "chronological_pagination": False,
-            }
+        client = self.apg.client
+        cached_node = await self.host.memory.storage.getPubsubNode(
+            client, service, node
+        )
+        # TODO: check if node is synchronised
+        if cached_node is not None:
+            # the node is cached, we return items from cache
+            log.debug(f"node {node!r} from {service} is in cache")
+            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
+                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+            )
+            try:
+                rsm_resp = rsm.RSMResponse(**metadata["rsm"])
+            except KeyError:
+                rsm_resp = None
+            return [i.data for i in pubsub_items], rsm_resp
+
+        if itemIdentifiers:
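+            # specific items are requested; as item IDs are AP object URLs, we
+            # can retrieve them directly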
+            items = []
+            for item_id in itemIdentifiers:
+                item_data = await self.apg.apGet(item_id)
+                item_elt = await self.apg.apItem2Elt(item_data)
+                items.append(item_elt)
+            return items, None
         else:
-            if len(
-                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
-                 if v is not None]
-            ) > 1:
+            if rsm_req is None:
+                if maxItems is None:
+                    maxItems = 20
+                kwargs = {
+                    "max_items": maxItems,
+                    "chronological_pagination": False,
+                }
+            else:
+                if len(
+                    [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                     if v is not None]
+                ) > 1:
+                    raise error.StanzaError(
+                        "bad-request",
+                        text="You can't use after, before and index at the same time"
+                    )
+                kwargs = {"max_items": rsm_req.max}
+                if rsm_req.after is not None:
+                    kwargs["after_id"] = rsm_req.after
+                elif rsm_req.before is not None:
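+                    # <before/> is handled by paginating in reverse order; an
+                    # empty value means "last page"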
+                    kwargs["chronological_pagination"] = False
+                    if rsm_req.before != "":
+                        kwargs["after_id"] = rsm_req.before
+                elif rsm_req.index is not None:
+                    kwargs["start_index"] = rsm_req.index
+
+            log.info(
+                f"No cache found for node {node} at {service} (AP account {ap_account}), "
+                "using Collection Paging to RSM translation"
+            )
+            if self.apg._m.isCommentsNode(node):
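+                # for comments nodes, items come from the "replies" collection
+                # of the parent item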
+                parent_node = self.apg._m.getParentNode(node)
+                try:
+                    parent_data = await self.apg.apGet(parent_node)
+                    collection = await self.apg.apGetObject(
+                        parent_data.get("object", {}),
+                        "replies"
+                    )
+                except Exception as e:
+                    raise error.StanzaError(
+                        "item-not-found",
+                        text=str(e)
+                    )
+            else:
+                actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
+                collection = await self.apg.apGetObject(actor_data, "outbox")
+            if not collection:
                 raise error.StanzaError(
-                    "bad-request",
-                    text="You can't use after, before and index at the same time"
+                    "item-not-found",
+                    text=f"No collection found for node {node!r} (account: {ap_account})"
                 )
-            kwargs = {"max_items": rsm_req.max}
-            if rsm_req.after is not None:
-                kwargs["after_id"] = rsm_req.after
-            elif rsm_req.before is not None:
-                kwargs["chronological_pagination"] = False
-                if rsm_req.before != "":
-                    kwargs["after_id"] = rsm_req.before
-            elif rsm_req.index is not None:
-                kwargs["start_index"] = rsm_req.index
-
-        log.info(
-            f"No cache found for node {node} at {service} (AP account {ap_account}), "
-            "using Collection Paging to RSM translation"
-        )
-        return await self.apg.getAPItems(ap_account, **kwargs)
+            return await self.apg.getAPItems(collection, **kwargs)
 
     @ensure_deferred
     async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
         raise NotImplementedError
 
+    @ensure_deferred
+    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
+        # TODO: handle comments nodes
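+        # the subscription is stored locally in PENDING state, then a "Follow"
+        # activity is sent to the AP actor; the state should be updated when
+        # the corresponding "Accept" activity is received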
+        client = self.apg.client
+        node = await self.host.memory.storage.getPubsubNode(
+            client, service, nodeIdentifier, with_subscriptions=True
+        )
+        if node is None:
+            node = await self.host.memory.storage.setPubsubNode(
+                client,
+                service,
+                nodeIdentifier,
+            )
+            subscription = None
+        else:
+            try:
+                subscription = next(
+                    s for s in node.subscriptions
+                    if s.subscriber == requestor.userhostJID()
+                )
+            except StopIteration:
+                subscription = None
+
+        if subscription is None:
+            subscription = PubsubSub(
+                subscriber=requestor.userhostJID(),
+                state=SubscriptionState.PENDING
+            )
+            node.subscriptions.append(subscription)
+            await self.host.memory.storage.add(node)
+        else:
+            if subscription.state is None:
+                subscription.state = SubscriptionState.PENDING
+                await self.host.memory.storage.add(node)
+            elif subscription.state == SubscriptionState.SUBSCRIBED:
+                log.info(
+                    f"{requestor.userhostJID()} has already a subscription to {node!r} "
+                    f"at {service}. Doing the request anyway."
+                )
+            elif subscription.state == SubscriptionState.PENDING:
+                log.info(
+                    f"{requestor.userhostJID()} has already a pending subscription to "
+                    f"{node!r} at {service}. Doing the request anyway."
+                )
+            else:
+                raise exceptions.InternalError(
+                    f"unmanaged subscription state: {subscription.state}"
+                )
+
+        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
+            requestor, service
+        )
+
+        data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
+
+        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
+        if resp.code >= 400:
+            text = await resp.text()
+            raise error.StanzaError("service-unavailable", text=text)
+        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
+
+    @ensure_deferred
+    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
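+        # in ActivityPub, a "Follow" is cancelled by sending an "Undo" activity
+        # wrapping the original "Follow"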
+        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
+            requestor, service
+        )
+        data = self.apg.createActivity(
+            "Undo",
+            req_actor_id,
+            self.apg.createActivity(
+                "Follow",
+                req_actor_id,
+                recip_actor_id
+            )
+        )
+
+        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
+        if resp.code >= 400:
+            text = await resp.text()
+            raise error.StanzaError("service-unavailable", text=text)
+
+    def getConfigurationOptions(self):
+        return NODE_OPTIONS
+
+    def getConfiguration(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str
+    ) -> defer.Deferred:
+        return defer.succeed(NODE_CONFIG_VALUES)
+
     def getNodeInfo(
         self,
         requestor: jid.JID,
@@ -117,13 +302,6 @@
             return None
         info = {
             "type": "leaf",
-            "meta-data": [
-                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
-                {"var": "pubsub#max_items", "value": "max"},
-                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
-                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
-
-            ]
-
+            "meta-data": NODE_CONFIG
         }
         return info