diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers: this patch implements those major features: - `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish to AP using this service - replies to XMPP items are managed - `inReplyTo` is filled when converting XMPP items to AP objects - `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public. - `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions. - new helper methods to check if an URL is local and to get JID from actor ID doc will follow to explain behaviour rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children efc34a89e70b
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri May 13 19:12:33 2022 +0200
@@ -23,7 +23,7 @@
 from pathlib import Path
 from pprint import pformat
 import re
-from typing import Any, Dict, List, Optional, Tuple, Union, overload
+from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload
 from urllib import parse
 
 from cryptography.exceptions import InvalidSignature
@@ -47,7 +47,7 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
+from sat.memory.sqla_mapping import SubscriptionState
 from sat.tools import utils
 from sat.tools.common import data_format, tls, uri
 from sat.tools.common.async_utils import async_lru
@@ -80,7 +80,7 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
     C.PI_HANDLER: C.BOOL_TRUE,
@@ -100,10 +100,15 @@
     def __init__(self, host):
         self.host = host
         self.initialised = False
+        self.client = None
         self._m = host.plugins["XEP-0277"]
         self._p = host.plugins["XEP-0060"]
         self._e = host.plugins["XEP-0106"]
+        self._pps = host.plugins["XEP-0465"]
         self._c = host.plugins["PUBSUB_CACHE"]
+        self._p.addManagedNode(
+            "", items_cb=self._itemsReceived
+        )
         self.pubsub_service = APPubsubService(self)
 
         host.bridge.addMethod(
@@ -213,6 +218,30 @@
         self.client = client
         await self.init(client)
 
+    async def _itemsReceived(self, client, itemsEvent):
+        """Callback called when pubsub items are received
+
+        if the items are adressed to a JID corresponding to an AP actor, they are
+        converted to AP items and sent to the corresponding AP server.
+
+        If comments nodes are linked, they are automatically subscribed to get new items
+        from there too.
+        """
+        if client != self.client:
+            return
+        # we need recipient as JID and not gateway own JID to be able to use methods such
+        # as "subscribe"
+        client = self.client.getVirtualClient(itemsEvent.sender)
+        recipient = itemsEvent.recipient
+        if not recipient.user:
+            log.debug("ignoring items event without local part specified")
+            return
+
+        ap_account = self._e.unescape(recipient.user)
+        await self.convertAndPostItems(
+            client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items
+        )
+
     async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
         """Get client for this component with a specified jid
 
@@ -221,8 +250,7 @@
         @param actor_id: ID of the actor
         @return: virtual client
         """
-        account = await self.getAPAccountFromId(actor_id)
-        local_jid = self.getLocalJIDFromAccount(account)
+        local_jid = await self.getJIDFromId(actor_id)
         return self.client.getVirtualClient(local_jid)
 
     def isActivity(self, data: dict) -> bool:
@@ -288,7 +316,12 @@
                 "was expecting a string or a dict, got {type(value)}: {value!r}}"
             )
 
-    async def apGetList(self, data: dict, key: str) -> Optional[List[dict]]:
+    async def apGetList(
+        self,
+        data: dict,
+        key: str,
+        only_ids: bool = False
+    ) -> Optional[List[Dict[str, Any]]]:
         """Retrieve a list of objects from AP data, dereferencing when necessary
 
         This method is to be used with non functional vocabularies. Use ``apGetObject``
@@ -296,6 +329,7 @@
         If the value is a dictionary, it will be wrapped in a list
         @param data: AP object where a list of objects is looked for
         @param key: key of the list to look for
+        @param only_ids: if Trye, only items IDs are retrieved
         @return: list of objects, or None if the key is not present
         """
         value = data.get(key)
@@ -307,7 +341,13 @@
             return [value]
         if not isinstance(value, list):
             raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
-        return [await self.apGetObject(i) for i in value]
+        if only_ids:
+            return [
+                {"id": v["id"]} if isinstance(v, dict) else {"id": v}
+                for v in value
+            ]
+        else:
+            return [await self.apGetObject(i) for i in value]
 
     async def apGetActors(
         self,
@@ -496,8 +536,8 @@
 
         @param ap_account: ActivityPub account handle (``username@domain.tld``)
         @return: service JID and pubsub node
-            if pubsub is None, default microblog pubsub node (and possibly other nodes
-            that plugins may hanlde) will be used
+            if pubsub node is None, default microblog pubsub node (and possibly other
+            nodes that plugins may hanlde) will be used
         @raise ValueError: invalid account
         @raise PermissionError: non local jid is used when gateway doesn't allow them
         """
@@ -570,6 +610,24 @@
             )
         )
 
+    async def getJIDFromId(self, actor_id: str) -> jid.JID:
+        """Compute JID linking to an AP Actor ID
+
+        The local jid is computer by escaping AP actor handle and using it as local part
+        of JID, where domain part is this gateway own JID
+        If the actor_id comes from local server (checked with self.public_url), it means
+        that we have an XMPP entity, and the original JID is returned
+        """
+        if self.isLocalURL(actor_id):
+            request_type, extra_args = self.parseAPURL(actor_id)
+            if request_type != TYPE_ACTOR or len(extra_args) != 1:
+                raise ValueError(f"invalid actor id: {actor_id!r}")
+            actor_jid, __ = await self.getJIDAndNode(extra_args[0])
+            return actor_jid
+
+        account = await self.getAPAccountFromId(actor_id)
+        return self.getLocalJIDFromAccount(account)
+
     def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
         """Parse an URL leading to an AP endpoint
 
@@ -591,6 +649,13 @@
             str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
         )
 
+    def isLocalURL(self, url: str) -> bool:
+        """Tells if an URL link to this component
+
+        ``public_url`` and ``ap_path`` are used to check the URL
+        """
+        return url.startswith(self.base_ap_url)
+
     def buildSignatureHeader(self, values: Dict[str, str]) -> str:
         """Build key="<value>" signature header from signature data"""
         fields = []
@@ -752,6 +817,43 @@
         new_headers["Signature"] = self.buildSignatureHeader(sign_data)
         return new_headers, sign_data
 
+    async def convertAndPostItems(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        service: jid.JID,
+        node: str,
+        items: List[domish.Element],
+        subscribe_comments_nodes: bool = False,
+    ) -> None:
+        """Convert XMPP items to AP items and post them to actor inbox
+
+        @param ap_account: account of ActivityPub actor
+        @param service: JID of the virtual pubsub service corresponding to the AP actor
+        @param node: virtual node corresponding to the AP actor and items
+        @param subscribe_comments_nodes: if True, comment nodes present in given items,
+            they will be automatically subscribed
+        """
+        actor_id = await self.getAPActorIdFromAccount(ap_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+        for item in items:
+            mb_data = await self._m.item2mbdata(client, item, service, node)
+            if subscribe_comments_nodes:
+                # we subscribe automatically to comment nodes if any
+                for comment_data in mb_data.get("comments", []):
+                    comment_service = jid.JID(comment_data["service"])
+                    comment_node = comment_data["node"]
+                    await self._p.subscribe(client, comment_service, comment_node)
+            ap_item = await self.mbdata2APitem(client, mb_data)
+            url_actor = ap_item["object"]["attributedTo"]
+            resp = await self.signAndPost(inbox, url_actor, ap_item)
+            if resp.code >= 300:
+                text = await resp.text()
+                log.warning(
+                    f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
+                    f"{pformat(ap_item)}"
+                )
+
     async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
         """Sign a documentent and post it to AP server
 
@@ -873,6 +975,8 @@
         chronological_pagination: bool = True,
         after_id: Optional[str] = None,
         start_index: Optional[int] = None,
+        parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
+        only_ids: bool = False,
     ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
         """Retrieve AP items and convert them to XMPP items
 
@@ -892,9 +996,18 @@
         @param start_index: start retrieving items from the one with given index
             Due to ActivityStream Collection Paging limitations, this is inefficient and
             all pages before the requested index will be retrieved to count items.
+        @param parser: method to use to parse AP items and get XMPP item elements
+            if None, use default generic parser
+        @param only_ids: if True, only retrieve items IDs
+            Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
+            sufficient in some use cases (e.g. when retrieving following/followers
+            collections)
         @return: XMPP Pubsub items and corresponding RSM Response
             Items are always returned in chronological order in the result
         """
+        if parser is None:
+            parser = self.apItem2Elt
+
         rsm_resp: Dict[str, Union[bool, int]] = {}
         try:
             count = collection["totalItems"]
@@ -929,7 +1042,9 @@
                 retrieved_items = 0
                 current_page = collection["last"]
                 while retrieved_items < count:
-                    page_data, items = await self.parseAPPage(current_page)
+                    page_data, items = await self.parseAPPage(
+                        current_page, parser, only_ids
+                    )
                     if not items:
                         log.warning(f"found an empty AP page at {current_page}")
                         return [], rsm_resp
@@ -963,7 +1078,7 @@
         found_after_id = False
 
         while retrieved_items < count:
-            __, page_items = await self.parseAPPage(page)
+            __, page_items = await self.parseAPPage(page, parser, only_ids)
             if not page_items:
                 break
             retrieved_items += len(page_items)
@@ -1019,40 +1134,6 @@
 
         return items, rsm.RSMResponse(**rsm_resp)
 
-    async def parseAPPage(
-        self,
-        page: Union[str, dict]
-    ) -> Tuple[dict, List[domish.Element]]:
-        """Convert AP objects from an AP page to XMPP items
-
-        @param page: Can be either url linking and AP page, or the page data directly
-        @return: page data, pubsub items
-        """
-        page_data = await self.apGetObject(page)
-        if page_data is None:
-            log.warning('No data found in collection')
-            return {}, []
-        ap_items = await self.apGetList(page_data, "orderedItems")
-        if ap_items is None:
-            ap_items = await self.apGetList(page_data, "items")
-            if not ap_items:
-                log.warning(f'No item field found in collection: {page_data!r}')
-                return page_data, []
-            else:
-                log.warning(
-                    "Items are not ordered, this is not spec compliant"
-                )
-        items = []
-        # AP Collections are in antichronological order, but we expect chronological in
-        # Pubsub, thus we reverse it
-        for ap_item in reversed(ap_items):
-            try:
-                items.append(await self.apItem2Elt(ap_item))
-            except (exceptions.DataError, NotImplementedError, error.StanzaError):
-                continue
-
-        return page_data, items
-
     async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
         """Convert AP item to parsed microblog data and corresponding item element"""
         mb_data = await self.apItem2MBdata(ap_item)
@@ -1067,6 +1148,44 @@
         __, item_elt = await self.apItem2MbDataAndElt(ap_item)
         return item_elt
 
+    async def parseAPPage(
+        self,
+        page: Union[str, dict],
+        parser: Callable[[dict], Awaitable[domish.Element]],
+        only_ids: bool = False
+    ) -> Tuple[dict, List[domish.Element]]:
+        """Convert AP objects from an AP page to XMPP items
+
+        @param page: Can be either url linking and AP page, or the page data directly
+        @param parser: method to use to parse AP items and get XMPP item elements
+        @param only_ids: if True, only retrieve items IDs
+        @return: page data, pubsub items
+        """
+        page_data = await self.apGetObject(page)
+        if page_data is None:
+            log.warning('No data found in collection')
+            return {}, []
+        ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids)
+        if ap_items is None:
+            ap_items = await self.apGetList(page_data, "items", only_ids=only_ids)
+            if not ap_items:
+                log.warning(f'No item field found in collection: {page_data!r}')
+                return page_data, []
+            else:
+                log.warning(
+                    "Items are not ordered, this is not spec compliant"
+                )
+        items = []
+        # AP Collections are in antichronological order, but we expect chronological in
+        # Pubsub, thus we reverse it
+        for ap_item in reversed(ap_items):
+            try:
+                items.append(await parser(ap_item))
+            except (exceptions.DataError, NotImplementedError, error.StanzaError):
+                continue
+
+        return page_data, items
+
     async def getCommentsNodes(
         self,
         item_id: str,
@@ -1197,8 +1316,79 @@
 
         return mb_data
 
+    async def getReplyToIdFromXMPPNode(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        parent_item: str,
+        mb_data: dict
+    ) -> str:
+        """Get URL to use for ``inReplyTo`` field in AP item.
+
+        There is currently no way to know the parent service of a comment with XEP-0277.
+        To work around that, we try to check if we have this item in the cache (we
+        should). If there is more that one item with this ID, we first try to find one
+        with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`.
+
+        @param ap_account: AP account corresponding to the publication author
+        @param parent_item: ID of the node where the publication this item is replying to
+             has been posted
+        @param mb_data: microblog data of the publication
+        @return: URL to use in ``inReplyTo`` field
+        """
+        # FIXME: propose a protoXEP to properly get parent item, node and service
+
+        found_items = await self.host.memory.storage.searchPubsubItems({
+            "profiles": [client.profile],
+            "names": [parent_item]
+        })
+        if not found_items:
+            log.warning(f"parent item {parent_item!r} not found in cache")
+            parent_ap_account = ap_account
+        elif len(found_items) == 1:
+            cached_node = found_items[0].node
+            parent_ap_account = await self.getAPAccountFromJidAndNode(
+                cached_node.service,
+                cached_node.name
+            )
+        else:
+            # we found several cached item with given ID, we check if there is one
+            # corresponding to this author
+            try:
+                author = jid.JID(mb_data["author_jid"]).userhostJID()
+                cached_item = next(
+                    i for i in found_items
+                    if jid.JID(i.data["publisher"]).userhostJID()
+                    == author
+                )
+            except StopIteration:
+                # no item corresponding to this author, we use ap_account
+                log.warning(
+                    "Can't find a single cached item for parent item "
+                    f"{parent_item!r}"
+                )
+                parent_ap_account = ap_account
+            else:
+                cached_node = cached_item.node
+                parent_ap_account = await self.getAPAccountFromJidAndNode(
+                    cached_node.service,
+                    cached_node.name
+                )
+
+        return self.buildAPURL(
+            TYPE_ITEM, parent_ap_account, parent_item
+        )
+
     async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
         """Convert Libervia Microblog Data to ActivityPub item"""
+        try:
+            node = mb_data["node"]
+            service = jid.JID(mb_data["service"])
+        except KeyError:
+            # node and service must always be specified when this method is used
+            raise exceptions.InternalError(
+                "node or service is missing in mb_data"
+            )
         if not mb_data.get("id"):
             mb_data["id"] = shortuuid.uuid()
         if not mb_data.get("author_jid"):
@@ -1209,21 +1399,48 @@
         )
         url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
         url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
-        return {
+        ap_object = {
+            "id": url_item,
+            "type": "Note",
+            "published": utils.xmpp_date(mb_data["published"]),
+            "attributedTo": url_actor,
+            "content": mb_data.get("content_xhtml") or mb_data["content"],
+            "to": ["https://www.w3.org/ns/activitystreams#Public"],
+        }
+
+        target_ap_account = self._e.unescape(service.user)
+        actor_data = await self.getAPActorDataFromAccount(target_ap_account)
+        followers = actor_data.get("followers")
+        if followers:
+            ap_object["cc"] = [followers]
+
+        ap_item = {
             "@context": "https://www.w3.org/ns/activitystreams",
             "id": url_item,
             "type": "Create",
             "actor": url_actor,
 
-            "object": {
-                "id": url_item,
-                "type": "Note",
-                "published": utils.xmpp_date(mb_data["published"]),
-                "attributedTo": url_actor,
-                "content": mb_data.get("content_xhtml") or mb_data["content"],
-                "to": "https://www.w3.org/ns/activitystreams#Public"
-            }
+            "object": ap_object
         }
+        language = mb_data.get("language")
+        if language:
+            ap_object["contentMap"] = {language: ap_object["content"]}
+        if self._m.isCommentNode(node):
+            parent_item = self._m.getParentItem(node)
+            if service.host == self.client.jid.userhost():
+                # the publication is on a virtual node (i.e. an XMPP node managed by
+                # this gateway and linking to an ActivityPub actor)
+                ap_object["inReplyTo"] = parent_item
+            else:
+                # the publication is from a followed real XMPP node
+                ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
+                    client,
+                    ap_account,
+                    parent_item,
+                    mb_data
+                )
+
+        return ap_item
 
     async def publishMessage(
         self,
@@ -1264,6 +1481,56 @@
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
+    async def newReplyToXMPPItem(
+        self,
+        client: SatXMPPEntity,
+        ap_item: dict,
+    ) -> None:
+        """We got an AP item which is a reply to an XMPP item"""
+        in_reply_to = ap_item["inReplyTo"]
+        url_type, url_args = self.parseAPURL(in_reply_to)
+        if url_type != "item":
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with an unexpected URL "
+                f"type({url_type!r}):\n{pformat(ap_item)}"
+            )
+            return
+        try:
+            parent_item_account, parent_item_id = url_args[0].split("/", 1)
+        except (IndexError, ValueError):
+            log.warning(
+                "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
+                f"({in_reply_to!r}):\n{pformat(ap_item)}"
+            )
+            return
+        parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account)
+        if parent_item_node is None:
+            parent_item_node = self._m.namespace
+        items, __ = await self._p.getItems(
+            client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
+        )
+        try:
+            parent_item_elt = items[0]
+        except IndexError:
+            log.warning(
+                f"Can't find parent item at {parent_item_service} (node "
+                f"{parent_item_node!r})\n{pformat(ap_item)}")
+            return
+        parent_item_parsed = await self._m.item2mbdata(
+            client, parent_item_elt, parent_item_service, parent_item_node
+        )
+        try:
+            comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
+            comment_node = parent_item_parsed["comments"][0]["node"]
+        except (KeyError, IndexError):
+            # we don't have a comment node set for this item
+            from sat.tools.xml_tools import ppElt
+            log.info(f"{ppElt(parent_item_elt.toXml())}")
+            raise NotImplemented()
+        else:
+            __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+            await self._p.publish(client, comment_service, comment_node, [item_elt])
+
     async def newAPItem(
         self,
         client: SatXMPPEntity,
@@ -1279,8 +1546,15 @@
         """
         service = client.jid
         in_reply_to = item.get("inReplyTo")
+        if in_reply_to and isinstance(in_reply_to, list):
+            in_reply_to = in_reply_to[0]
         if in_reply_to and isinstance(in_reply_to, str):
-            # this item is a reply, we use or create a corresponding node for comments
+            if self.isLocalURL(in_reply_to):
+                # this is a reply to an XMPP item
+                return await self.newReplyToXMPPItem(client, item)
+
+            # this item is a reply to an AP item, we use or create a corresponding node
+            # for comments
             parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
             node = parent_node or node
             cached_node = await self.host.memory.storage.getPubsubNode(
@@ -1312,7 +1586,8 @@
             )
             if cached_node is None:
                 log.warning(
-                    f"Received item in unknown node {node!r} at {service}\n{item}"
+                    f"Received item in unknown node {node!r} at {service}. This may be "
+                    f"due to a cache purge. We synchronise the node\n{item}"
 
                 )
                 return