changeset 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers: this patch implements those major features: - `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish to AP using this service - replies to XMPP items are managed - `inReplyTo` is filled when converting XMPP items to AP objects - `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public. - `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions. - new helper methods to check if an URL is local and to get JID from actor ID doc will follow to explain behaviour rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents b2ade5ecdbab
children ea204216a505
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/http_server.py sat/plugins/plugin_comp_ap_gateway/pubsub_service.py
diffstat 4 files changed, 574 insertions(+), 97 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri May 13 19:12:33 2022 +0200
@@ -23,7 +23,7 @@
 from pathlib import Path
 from pprint import pformat
 import re
-from typing import Any, Dict, List, Optional, Tuple, Union, overload
+from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload
 from urllib import parse
 
 from cryptography.exceptions import InvalidSignature
@@ -47,7 +47,7 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
+from sat.memory.sqla_mapping import SubscriptionState
 from sat.tools import utils
 from sat.tools.common import data_format, tls, uri
 from sat.tools.common.async_utils import async_lru
@@ -80,7 +80,7 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
     C.PI_HANDLER: C.BOOL_TRUE,
@@ -100,10 +100,15 @@
     def __init__(self, host):
         self.host = host
         self.initialised = False
+        self.client = None
         self._m = host.plugins["XEP-0277"]
         self._p = host.plugins["XEP-0060"]
         self._e = host.plugins["XEP-0106"]
+        self._pps = host.plugins["XEP-0465"]
         self._c = host.plugins["PUBSUB_CACHE"]
+        self._p.addManagedNode(
+            "", items_cb=self._itemsReceived
+        )
         self.pubsub_service = APPubsubService(self)
 
         host.bridge.addMethod(
@@ -213,6 +218,30 @@
         self.client = client
         await self.init(client)
 
+    async def _itemsReceived(self, client, itemsEvent):
+        """Callback called when pubsub items are received
+
+        if the items are adressed to a JID corresponding to an AP actor, they are
+        converted to AP items and sent to the corresponding AP server.
+
+        If comments nodes are linked, they are automatically subscribed to get new items
+        from there too.
+        """
+        if client != self.client:
+            return
+        # we need recipient as JID and not gateway own JID to be able to use methods such
+        # as "subscribe"
+        client = self.client.getVirtualClient(itemsEvent.sender)
+        recipient = itemsEvent.recipient
+        if not recipient.user:
+            log.debug("ignoring items event without local part specified")
+            return
+
+        ap_account = self._e.unescape(recipient.user)
+        await self.convertAndPostItems(
+            client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items
+        )
+
     async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
         """Get client for this component with a specified jid
 
@@ -221,8 +250,7 @@
         @param actor_id: ID of the actor
         @return: virtual client
         """
-        account = await self.getAPAccountFromId(actor_id)
-        local_jid = self.getLocalJIDFromAccount(account)
+        local_jid = await self.getJIDFromId(actor_id)
         return self.client.getVirtualClient(local_jid)
 
     def isActivity(self, data: dict) -> bool:
@@ -288,7 +316,12 @@
                 "was expecting a string or a dict, got {type(value)}: {value!r}}"
             )
 
-    async def apGetList(self, data: dict, key: str) -> Optional[List[dict]]:
+    async def apGetList(
+        self,
+        data: dict,
+        key: str,
+        only_ids: bool = False
+    ) -> Optional[List[Dict[str, Any]]]:
         """Retrieve a list of objects from AP data, dereferencing when necessary
 
         This method is to be used with non functional vocabularies. Use ``apGetObject``
@@ -296,6 +329,7 @@
         If the value is a dictionary, it will be wrapped in a list
         @param data: AP object where a list of objects is looked for
         @param key: key of the list to look for
+        @param only_ids: if Trye, only items IDs are retrieved
         @return: list of objects, or None if the key is not present
         """
         value = data.get(key)
@@ -307,7 +341,13 @@
             return [value]
         if not isinstance(value, list):
             raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
-        return [await self.apGetObject(i) for i in value]
+        if only_ids:
+            return [
+                {"id": v["id"]} if isinstance(v, dict) else {"id": v}
+                for v in value
+            ]
+        else:
+            return [await self.apGetObject(i) for i in value]
 
     async def apGetActors(
         self,
@@ -496,8 +536,8 @@
 
         @param ap_account: ActivityPub account handle (``username@domain.tld``)
         @return: service JID and pubsub node
-            if pubsub is None, default microblog pubsub node (and possibly other nodes
-            that plugins may hanlde) will be used
+            if pubsub node is None, default microblog pubsub node (and possibly other
+            nodes that plugins may hanlde) will be used
         @raise ValueError: invalid account
         @raise PermissionError: non local jid is used when gateway doesn't allow them
         """
@@ -570,6 +610,24 @@
             )
         )
 
+    async def getJIDFromId(self, actor_id: str) -> jid.JID:
+        """Compute JID linking to an AP Actor ID
+
+        The local jid is computer by escaping AP actor handle and using it as local part
+        of JID, where domain part is this gateway own JID
+        If the actor_id comes from local server (checked with self.public_url), it means
+        that we have an XMPP entity, and the original JID is returned
+        """
+        if self.isLocalURL(actor_id):
+            request_type, extra_args = self.parseAPURL(actor_id)
+            if request_type != TYPE_ACTOR or len(extra_args) != 1:
+                raise ValueError(f"invalid actor id: {actor_id!r}")
+            actor_jid, __ = await self.getJIDAndNode(extra_args[0])
+            return actor_jid
+
+        account = await self.getAPAccountFromId(actor_id)
+        return self.getLocalJIDFromAccount(account)
+
     def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
         """Parse an URL leading to an AP endpoint
 
@@ -591,6 +649,13 @@
             str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
         )
 
+    def isLocalURL(self, url: str) -> bool:
+        """Tells if an URL link to this component
+
+        ``public_url`` and ``ap_path`` are used to check the URL
+        """
+        return url.startswith(self.base_ap_url)
+
     def buildSignatureHeader(self, values: Dict[str, str]) -> str:
         """Build key="<value>" signature header from signature data"""
         fields = []
@@ -752,6 +817,43 @@
         new_headers["Signature"] = self.buildSignatureHeader(sign_data)
         return new_headers, sign_data
 
+    async def convertAndPostItems(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        service: jid.JID,
+        node: str,
+        items: List[domish.Element],
+        subscribe_comments_nodes: bool = False,
+    ) -> None:
+        """Convert XMPP items to AP items and post them to actor inbox
+
+        @param ap_account: account of ActivityPub actor
+        @param service: JID of the virtual pubsub service corresponding to the AP actor
+        @param node: virtual node corresponding to the AP actor and items
+        @param subscribe_comments_nodes: if True, comment nodes present in given items,
+            they will be automatically subscribed
+        """
+        actor_id = await self.getAPActorIdFromAccount(ap_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+        for item in items:
+            mb_data = await self._m.item2mbdata(client, item, service, node)
+            if subscribe_comments_nodes:
+                # we subscribe automatically to comment nodes if any
+                for comment_data in mb_data.get("comments", []):
+                    comment_service = jid.JID(comment_data["service"])
+                    comment_node = comment_data["node"]
+                    await self._p.subscribe(client, comment_service, comment_node)
+            ap_item = await self.mbdata2APitem(client, mb_data)
+            url_actor = ap_item["object"]["attributedTo"]
+            resp = await self.signAndPost(inbox, url_actor, ap_item)
+            if resp.code >= 300:
+                text = await resp.text()
+                log.warning(
+                    f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
+                    f"{pformat(ap_item)}"
+                )
+
     async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
         """Sign a documentent and post it to AP server
 
@@ -873,6 +975,8 @@
         chronological_pagination: bool = True,
         after_id: Optional[str] = None,
         start_index: Optional[int] = None,
+        parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
+        only_ids: bool = False,
     ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
         """Retrieve AP items and convert them to XMPP items
 
@@ -892,9 +996,18 @@
         @param start_index: start retrieving items from the one with given index
             Due to ActivityStream Collection Paging limitations, this is inefficient and
             all pages before the requested index will be retrieved to count items.
+        @param parser: method to use to parse AP items and get XMPP item elements
+            if None, use default generic parser
+        @param only_ids: if True, only retrieve items IDs
+            Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
+            sufficient in some use cases (e.g. when retrieving following/followers
+            collections)
         @return: XMPP Pubsub items and corresponding RSM Response
             Items are always returned in chronological order in the result
         """
+        if parser is None:
+            parser = self.apItem2Elt
+
         rsm_resp: Dict[str, Union[bool, int]] = {}
         try:
             count = collection["totalItems"]
@@ -929,7 +1042,9 @@
                 retrieved_items = 0
                 current_page = collection["last"]
                 while retrieved_items < count:
-                    page_data, items = await self.parseAPPage(current_page)
+                    page_data, items = await self.parseAPPage(
+                        current_page, parser, only_ids
+                    )
                     if not items:
                         log.warning(f"found an empty AP page at {current_page}")
                         return [], rsm_resp
@@ -963,7 +1078,7 @@
         found_after_id = False
 
         while retrieved_items < count:
-            __, page_items = await self.parseAPPage(page)
+            __, page_items = await self.parseAPPage(page, parser, only_ids)
             if not page_items:
                 break
             retrieved_items += len(page_items)
@@ -1019,40 +1134,6 @@
 
         return items, rsm.RSMResponse(**rsm_resp)
 
-    async def parseAPPage(
-        self,
-        page: Union[str, dict]
-    ) -> Tuple[dict, List[domish.Element]]:
-        """Convert AP objects from an AP page to XMPP items
-
-        @param page: Can be either url linking and AP page, or the page data directly
-        @return: page data, pubsub items
-        """
-        page_data = await self.apGetObject(page)
-        if page_data is None:
-            log.warning('No data found in collection')
-            return {}, []
-        ap_items = await self.apGetList(page_data, "orderedItems")
-        if ap_items is None:
-            ap_items = await self.apGetList(page_data, "items")
-            if not ap_items:
-                log.warning(f'No item field found in collection: {page_data!r}')
-                return page_data, []
-            else:
-                log.warning(
-                    "Items are not ordered, this is not spec compliant"
-                )
-        items = []
-        # AP Collections are in antichronological order, but we expect chronological in
-        # Pubsub, thus we reverse it
-        for ap_item in reversed(ap_items):
-            try:
-                items.append(await self.apItem2Elt(ap_item))
-            except (exceptions.DataError, NotImplementedError, error.StanzaError):
-                continue
-
-        return page_data, items
-
     async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
         """Convert AP item to parsed microblog data and corresponding item element"""
         mb_data = await self.apItem2MBdata(ap_item)
@@ -1067,6 +1148,44 @@
         __, item_elt = await self.apItem2MbDataAndElt(ap_item)
         return item_elt
 
+    async def parseAPPage(
+        self,
+        page: Union[str, dict],
+        parser: Callable[[dict], Awaitable[domish.Element]],
+        only_ids: bool = False
+    ) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
+
+        @param page: Can be either url linking and AP page, or the page data directly
+        @param parser: method to use to parse AP items and get XMPP item elements
+        @param only_ids: if True, only retrieve items IDs
+        @return: page data, pubsub items
+        """
+        page_data = await self.apGetObject(page)
+        if page_data is None:
+            log.warning('No data found in collection')
+            return {}, []
+        ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids)
+        if ap_items is None:
+            ap_items = await self.apGetList(page_data, "items", only_ids=only_ids)
+            if not ap_items:
+                log.warning(f'No item field found in collection: {page_data!r}')
+                return page_data, []
+            else:
+                log.warning(
+                    "Items are not ordered, this is not spec compliant"
+                )
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                items.append(await parser(ap_item))
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+
+        return page_data, items
+
     async def getCommentsNodes(
         self,
         item_id: str,
@@ -1197,8 +1316,79 @@
 
         return mb_data
 
+    async def getReplyToIdFromXMPPNode(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        parent_item: str,
+        mb_data: dict
+    ) -> str:
+        """Get URL to use for ``inReplyTo`` field in AP item.
+
+        There is currently no way to know the parent service of a comment with XEP-0277.
+        To work around that, we try to check if we have this item in the cache (we
+        should). If there is more that one item with this ID, we first try to find one
+        with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`.
+
+        @param ap_account: AP account corresponding to the publication author
+        @param parent_item: ID of the node where the publication this item is replying to
+             has been posted
+        @param mb_data: microblog data of the publication
+        @return: URL to use in ``inReplyTo`` field
+        """
+        # FIXME: propose a protoXEP to properly get parent item, node and service
+
+        found_items = await self.host.memory.storage.searchPubsubItems({
+            "profiles": [client.profile],
+            "names": [parent_item]
+        })
+        if not found_items:
+            log.warning(f"parent item {parent_item!r} not found in cache")
+            parent_ap_account = ap_account
+        elif len(found_items) == 1:
+            cached_node = found_items[0].node
+            parent_ap_account = await self.getAPAccountFromJidAndNode(
+                cached_node.service,
+                cached_node.name
+            )
+        else:
+            # we found several cached item with given ID, we check if there is one
+            # corresponding to this author
+            try:
+                author = jid.JID(mb_data["author_jid"]).userhostJID()
+                cached_item = next(
+                    i for i in found_items
+                    if jid.JID(i.data["publisher"]).userhostJID()
+                    == author
+                )
+            except StopIteration:
+                # no item corresponding to this author, we use ap_account
+                log.warning(
+                    "Can't find a single cached item for parent item "
+                    f"{parent_item!r}"
+                )
+                parent_ap_account = ap_account
+            else:
+                cached_node = cached_item.node
+                parent_ap_account = await self.getAPAccountFromJidAndNode(
+                    cached_node.service,
+                    cached_node.name
+                )
+
+        return self.buildAPURL(
+            TYPE_ITEM, parent_ap_account, parent_item
+        )
+
     async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
         """Convert Libervia Microblog Data to ActivityPub item"""
+        try:
+            node = mb_data["node"]
+            service = jid.JID(mb_data["service"])
+        except KeyError:
+            # node and service must always be specified when this method is used
+            raise exceptions.InternalError(
+                "node or service is missing in mb_data"
+            )
         if not mb_data.get("id"):
             mb_data["id"] = shortuuid.uuid()
         if not mb_data.get("author_jid"):
@@ -1209,21 +1399,48 @@
         )
         url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
         url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
-        return {
+        ap_object = {
+            "id": url_item,
+            "type": "Note",
+            "published": utils.xmpp_date(mb_data["published"]),
+            "attributedTo": url_actor,
+            "content": mb_data.get("content_xhtml") or mb_data["content"],
+            "to": ["https://www.w3.org/ns/activitystreams#Public"],
+        }
+
+        target_ap_account = self._e.unescape(service.user)
+        actor_data = await self.getAPActorDataFromAccount(target_ap_account)
+        followers = actor_data.get("followers")
+        if followers:
+            ap_object["cc"] = [followers]
+
+        ap_item = {
             "@context": "https://www.w3.org/ns/activitystreams",
             "id": url_item,
             "type": "Create",
             "actor": url_actor,
 
-            "object": {
-                "id": url_item,
-                "type": "Note",
-                "published": utils.xmpp_date(mb_data["published"]),
-                "attributedTo": url_actor,
-                "content": mb_data.get("content_xhtml") or mb_data["content"],
-                "to": "https://www.w3.org/ns/activitystreams#Public"
-            }
+            "object": ap_object
         }
+        language = mb_data.get("language")
+        if language:
+            ap_object["contentMap"] = {language: ap_object["content"]}
+        if self._m.isCommentNode(node):
+            parent_item = self._m.getParentItem(node)
+            if service.host == self.client.jid.userhost():
+                # the publication is on a virtual node (i.e. an XMPP node managed by
+                # this gateway and linking to an ActivityPub actor)
+                ap_object["inReplyTo"] = parent_item
+            else:
+                # the publication is from a followed real XMPP node
+                ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
+                    client,
+                    ap_account,
+                    parent_item,
+                    mb_data
+                )
+
+        return ap_item
 
     async def publishMessage(
         self,
@@ -1264,6 +1481,56 @@
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
+    async def newReplyToXMPPItem(
+        self,
+        client: SatXMPPEntity,
+        ap_item: dict,
+    ) -> None:
+        """We got an AP item which is a reply to an XMPP item"""
+        in_reply_to = ap_item["inReplyTo"]
+        url_type, url_args = self.parseAPURL(in_reply_to)
+        if url_type != "item":
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with an unexpected URL "
+                f"type({url_type!r}):\n{pformat(ap_item)}"
+            )
+            return
+        try:
+            parent_item_account, parent_item_id = url_args[0].split("/", 1)
+        except (IndexError, ValueError):
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
+                f"({in_reply_to!r}):\n{pformat(ap_item)}"
+            )
+            return
+        parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account)
+        if parent_item_node is None:
+            parent_item_node = self._m.namespace
+        items, __ = await self._p.getItems(
+            client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
+        )
+        try:
+            parent_item_elt = items[0]
+        except IndexError:
+            log.warning(
+                f"Can't find parent item at {parent_item_service} (node "
+                f"{parent_item_node!r})\n{pformat(ap_item)}")
+            return
+        parent_item_parsed = await self._m.item2mbdata(
+            client, parent_item_elt, parent_item_service, parent_item_node
+        )
+        try:
+            comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
+            comment_node = parent_item_parsed["comments"][0]["node"]
+        except (KeyError, IndexError):
+            # we don't have a comment node set for this item
+            from sat.tools.xml_tools import ppElt
+            log.info(f"{ppElt(parent_item_elt.toXml())}")
+            raise NotImplemented()
+        else:
+            __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+            await self._p.publish(client, comment_service, comment_node, [item_elt])
+
     async def newAPItem(
         self,
         client: SatXMPPEntity,
@@ -1279,8 +1546,15 @@
         """
         service = client.jid
         in_reply_to = item.get("inReplyTo")
+        if in_reply_to and isinstance(in_reply_to, list):
+            in_reply_to = in_reply_to[0]
         if in_reply_to and isinstance(in_reply_to, str):
-            # this item is a reply, we use or create a corresponding node for comments
+            if self.isLocalURL(in_reply_to):
+                # this is a reply to an XMPP item
+                return await self.newReplyToXMPPItem(client, item)
+
+            # this item is a reply to an AP item, we use or create a corresponding node
+            # for comments
             parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
             node = parent_node or node
             cached_node = await self.host.memory.storage.getPubsubNode(
@@ -1312,7 +1586,8 @@
             )
             if cached_node is None:
                 log.warning(
-                    f"Received item in unknown node {node!r} at {service}\n{item}"
+                    f"Received item in unknown node {node!r} at {service}. This may be "
+                    f"due to a cache purge. We synchronise the node\n{item}"
 
                 )
                 return
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Fri May 13 19:12:33 2022 +0200
@@ -24,15 +24,18 @@
 TYPE_INBOX = "inbox"
 TYPE_SHARED_INBOX = "shared_inbox"
 TYPE_OUTBOX = "outbox"
+TYPE_FOLLOWERS = "followers"
+TYPE_FOLLOWING = "following"
 TYPE_ITEM = "item"
 MEDIA_TYPE_AP = "application/activity+json"
+NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"
 # mapping from AP metadata to microblog data
 AP_MB_MAP = {
     "content": "content_xhtml",
 
 }
 AP_REQUEST_TYPES = {
-    "GET": {"actor", "outbox"},
+    "GET": {TYPE_ACTOR, TYPE_OUTBOX, TYPE_FOLLOWERS, TYPE_FOLLOWING},
     "POST": {"inbox"},
 }
 # headers to check for signature
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri May 13 19:12:33 2022 +0200
@@ -17,7 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import time
-from typing import Optional, Dict, List
+from typing import Optional, Dict, List, Any
 import json
 from urllib import parse
 from collections import deque
@@ -25,6 +25,7 @@
 from pprint import pformat
 
 from twisted.web import http, resource as web_resource, server
+from twisted.python import failure
 from twisted.internet import reactor, defer
 from twisted.words.protocols.jabber import jid, error
 from wokkel import pubsub, rsm
@@ -33,13 +34,14 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.tools import utils
 from sat.tools.common import date_utils
 from sat.memory.sqla_mapping import SubscriptionState
 
 from .constants import (
     CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
     AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
-    SIGN_HEADERS, HS2019, SIGN_EXP
+    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING
 )
 from .regex import RE_SIG_PARAM
 
@@ -71,6 +73,15 @@
         log.warning(msg)
         request.setResponseCode(http_code, None if msg is None else msg.encode())
 
+    def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
+        log.error(f"Internal error: {failure_.value}")
+        self.responseCode(
+            request,
+            http.INTERNAL_SERVER_ERROR,
+            f"internal error: {failure_.value}"
+        )
+        request.finish()
+
     async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
         query = parse.parse_qs(url_parsed.query)
@@ -97,6 +108,45 @@
         request.write(json.dumps(resp).encode())
         request.finish()
 
+    async def handleUndoActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            actor = await self.apg.apGetSenderActor(obj)
+            if actor != signing_actor:
+                log.warning(f"ignoring object not attributed to signing actor: {data}")
+                continue
+            try:
+                target_account = obj["object"]
+            except KeyError:
+                log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                continue
+            if not self.apg.isLocalURL(target_account):
+                log.warning(f"ignoring unfollow request to non local actor: {data}")
+                continue
+
+            if type_ == "Follow":
+                await self.apg._p.unsubscribe(
+                    client,
+                    account_jid,
+                    node,
+                    sender=client.jid,
+                )
+            else:
+                log.warning(f"Unmanaged undo type: {type_!r}")
+
     async def handleFollowActivity(
         self,
         request: "HTTPRequest",
@@ -114,7 +164,9 @@
             subscription = await self.apg._p.subscribe(
                 client,
                 account_jid,
-                node
+                node,
+                # subscriptions from AP are always public
+                options=self.apg._pps.setPublicOpt()
             )
         except pubsub.SubscriptionPending:
             log.info(f"subscription to node {node!r} of {account_jid} is pending")
@@ -129,6 +181,7 @@
                 "Accept", actor_id, object_=data
             )
             await self.apg.signAndPost(inbox, actor_id, accept_data)
+        await self.apg._c.synchronise(client, account_jid, node, resync=False)
 
     async def handleAcceptActivity(
         self,
@@ -216,9 +269,11 @@
         actor_url: str,
         signing_actor: Optional[str]
     ) -> dict:
-        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account)
         shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
-        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+        outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+        followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account)
+        following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account)
 
         # we have to use AP account as preferredUsername because it is used to retrieve
         # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
@@ -232,8 +287,10 @@
             "id": actor_url,
             "type": "Person",
             "preferredUsername": preferred_username,
-            "inbox": inbox_url,
-            "outbox": outbox_url,
+            "inbox": inbox,
+            "outbox": outbox,
+            "followers": followers,
+            "following": following,
             "publicKey": {
                 "id": f"{actor_url}#main-key",
                 "owner": actor_url,
@@ -280,6 +337,8 @@
         ap_url: str,
         query_data: Dict[str, List[str]]
     ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
         # we only keep useful keys, and sort to have consistent URL which can
         # be used as ID
         url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
@@ -443,6 +502,83 @@
                 request, data, account_jid, node, ap_account, ap_url, signing_actor
             )
 
+    async def APFollowersRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        client = self.apg.client
+        subscribers = await self.apg._pps.getPublicNodeSubscriptions(
+            client, account_jid, node
+        )
+        followers = []
+        for subscriber in subscribers.keys():
+            if subscriber.host == self.apg.client.jid.userhost():
+                # the subscriber is an AP user subscribed with this gateway
+                ap_account = self.apg._e.unescape(subscriber.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node)
+            followers.append(ap_account)
+
+        url = self.getCanonicalURL(request)
+        return {
+          "@context": "https://www.w3.org/ns/activitystreams",
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscribers),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": followers
+          }
+        }
+
+    async def APFollowingRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict[str, Any]:
+        client = self.apg.client
+        subscriptions = await self.apg._pps.subscriptions(
+            client, account_jid, node
+        )
+        following = []
+        for sub_dict in subscriptions:
+            service = jid.JID(sub_dict["service"])
+            if service.host == self.apg.client.jid.userhost():
+                # the subscription is to an AP actor with this gateway
+                ap_account = self.apg._e.unescape(service.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.getAPAccountFromJidAndNode(
+                    service, sub_dict["node"]
+                )
+            following.append(ap_account)
+
+        url = self.getCanonicalURL(request)
+        return {
+          "@context": "https://www.w3.org/ns/activitystreams",
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscriptions),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": following
+          }
+        }
+
     async def APRequest(
         self,
         request: "HTTPRequest",
@@ -490,7 +626,10 @@
             request.finish()
             return
 
-        return await self.APRequest(request, signing_actor)
+        try:
+            return await self.APRequest(request, signing_actor)
+        except Exception as e:
+            self._onRequestError(failure.Failure(e), request)
 
     async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
         """That that signing actor correspond to actor declared in data
@@ -663,7 +802,8 @@
             defer.ensureDeferred(self.webfinger(request))
             return server.NOT_DONE_YET
         elif path.startswith(self.apg.ap_path):
-            defer.ensureDeferred(self.APRequest(request))
+            d = defer.ensureDeferred(self.APRequest(request))
+            d.addErrback(self._onRequestError, request)
             return server.NOT_DONE_YET
 
         return web_resource.NoResource().render(request)
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Fri May 13 19:12:33 2022 +0200
@@ -16,12 +16,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional, Tuple, List, Dict, Any
+from typing import Optional, Tuple, List, Union
 
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
-from wokkel import rsm, pubsub, data_form
+from wokkel import rsm, pubsub, disco
 
 from sat.core.i18n import _
 from sat.core import exceptions
@@ -92,7 +92,45 @@
 
     @ensure_deferred
     async def publish(self, requestor, service, nodeIdentifier, items):
-        raise NotImplementedError
+        if self.apg.local_only and not self.apg.isLocal(requestor):
+            raise error.StanzaError(
+                "forbidden",
+                "Only local users can publish on this gateway."
+            )
+        if not service.user:
+            raise error.StanzaError(
+                "bad-request",
+                "You must specify an ActivityPub actor account in JID user part."
+            )
+        ap_account = self.apg._e.unescape(service.user)
+        if ap_account.count("@") != 1:
+            raise error.StanzaError(
+                "bad-request",
+                f"{ap_account!r} is not a valid ActivityPub actor account."
+            )
+
+        client = self.apg.client.getVirtualClient(requestor)
+        await self.apg.convertAndPostItems(
+            client, ap_account, service, nodeIdentifier, items
+        )
+
+    async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from following collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.getJIDFromId(actor_id)
+        subscription_elt = self.apg._pps.buildSubscriptionElt(
+            self.apg._m.namespace, actor_jid
+        )
+        item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
+        return item_elt
+
+    async def apFollower2Elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from followers collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.getJIDFromId(actor_id)
+        subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid)
+        item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
+        return item_elt
 
     @ensure_deferred
     async def items(
@@ -110,44 +148,63 @@
         if ap_account.count("@") != 1:
             log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
             return [], None
-        if not node.startswith(self.apg._m.namespace):
-            raise error.StanzaError(
-                "feature-not-implemented",
-                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
-                "node for now"
-            )
+
+        kwargs = {}
+
+        if node == self.apg._pps.subscriptions_node:
+            collection_name = "following"
+            parser = self.apFollowing2Elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        elif node.startswith(self.apg._pps.subscribers_node_prefix):
+            collection_name = "followers"
+            parser = self.apFollower2Elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        else:
+            if not node.startswith(self.apg._m.namespace):
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text=f"AP Gateway {C.APP_VERSION} only supports "
+                    f"{self.apg._m.namespace} node for now"
+                )
+            collection_name = "outbox"
+            parser = self.apg.apItem2Elt
+            use_cache = True
+
         client = self.apg.client
-        cached_node = await self.host.memory.storage.getPubsubNode(
-            client, service, node
-        )
-        # TODO: check if node is synchronised
-        if cached_node is not None:
-            # the node is cached, we return items from cache
-            log.debug(f"node {node!r} from {service} is in cache")
-            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
-                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+        if use_cache:
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node
             )
-            try:
-                rsm_resp = rsm.RSMResponse(**metadata["rsm"])
-            except KeyError:
-                rsm_resp = None
-            return [i.data for i in pubsub_items], rsm_resp
+            # TODO: check if node is synchronised
+            if cached_node is not None:
+                # the node is cached, we return items from cache
+                log.debug(f"node {node!r} from {service} is in cache")
+                pubsub_items, metadata = await self.apg._c.getItemsFromCache(
+                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+                )
+                try:
+                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
+                except KeyError:
+                    rsm_resp = None
+                return [i.data for i in pubsub_items], rsm_resp
 
         if itemIdentifiers:
             items = []
             for item_id in itemIdentifiers:
                 item_data = await self.apg.apGet(item_id)
-                item_elt = await self.apg.apItem2Elt(item_data)
+                item_elt = await parser(item_data)
                 items.append(item_elt)
             return items, None
         else:
             if rsm_req is None:
                 if maxItems is None:
                     maxItems = 20
-                kwargs = {
+                kwargs.update({
                     "max_items": maxItems,
                     "chronological_pagination": False,
-                }
+                })
             else:
                 if len(
                     [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
@@ -157,7 +214,7 @@
                         "bad-request",
                         text="You can't use after, before and index at the same time"
                     )
-                kwargs = {"max_items": rsm_req.max}
+                kwargs.update({"max_items": rsm_req.max})
                 if rsm_req.after is not None:
                     kwargs["after_id"] = rsm_req.after
                 elif rsm_req.before is not None:
@@ -171,10 +228,10 @@
                 f"No cache found for node {node} at {service} (AP account {ap_account}), "
                 "using Collection Paging to RSM translation"
             )
-            if self.apg._m.isCommentsNode(node):
-                parent_node = self.apg._m.getParentNode(node)
+            if self.apg._m.isCommentNode(node):
+                parent_item = self.apg._m.getParentItem(node)
                 try:
-                    parent_data = await self.apg.apGet(parent_node)
+                    parent_data = await self.apg.apGet(parent_item)
                     collection = await self.apg.apGetObject(
                         parent_data.get("object", {}),
                         "replies"
@@ -186,12 +243,14 @@
                     )
             else:
                 actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
-                collection = await self.apg.apGetObject(actor_data, "outbox")
+                collection = await self.apg.apGetObject(actor_data, collection_name)
             if not collection:
                 raise error.StanzaError(
                     "item-not-found",
                     text=f"No collection found for node {node!r} (account: {ap_account})"
                 )
+
+            kwargs["parser"] = parser
             return await self.apg.getAPItems(collection, **kwargs)
 
     @ensure_deferred