diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3784:efc34a89e70b

comp AP gateway: message conversion: Convert direct AP items to XMPP `<message>` stanzas, and vice versa. Documentation will follow soon to explain the behaviour. rel 366
author Goffi <goffi@goffi.org>
date Tue, 24 May 2022 17:57:36 +0200
parents 125c7043b277
children 865167c34b82
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue May 24 17:49:14 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue May 24 17:57:36 2022 +0200
@@ -23,7 +23,9 @@
 from pathlib import Path
 from pprint import pformat
 import re
-from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload
+from typing import (
+    Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
+)
 from urllib import parse
 
 from cryptography.exceptions import InvalidSignature
@@ -65,6 +67,9 @@
     MEDIA_TYPE_AP,
     TYPE_ACTOR,
     TYPE_ITEM,
+    TYPE_FOLLOWERS,
+    NS_AP_PUBLIC,
+    PUBLIC_TUPLE
 )
 from .http_server import HTTPServer
 from .pubsub_service import APPubsubService
@@ -80,7 +85,9 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"],
+    C.PI_DEPENDENCIES: [
+        "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES"
+    ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
     C.PI_HANDLER: C.BOOL_TRUE,
@@ -109,6 +116,7 @@
         self._p.addManagedNode(
             "", items_cb=self._itemsReceived
         )
+        self._t = host.plugins["TEXT_SYNTAXES"]
         self.pubsub_service = APPubsubService(self)
 
         host.bridge.addMethod(
@@ -218,6 +226,9 @@
         self.client = client
         await self.init(client)
 
+    def profileConnected(self, client):
+        client.xmlstream.addObserver("/message/body", self.onMessage)
+
     async def _itemsReceived(self, client, itemsEvent):
         """Callback called when pubsub items are received
 
@@ -455,6 +466,13 @@
         if not node or node == self._m.namespace:
             node = None
 
+        if self.client is None:
+            raise exceptions.InternalError("Client is not set yet")
+
+        if jid_.host == self.client.jid.userhost():
+            # this is a proxy JID to an AP Actor
+            return self._e.unescape(jid_.user)
+
         if node and not jid_.user and not self.mustEncode(node):
             is_pubsub = await self.isPubsub(jid_)
             # when we have a pubsub service, the user part can be used to set the node
@@ -682,6 +700,11 @@
         return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
 
     @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getActorData(self, actor_id) -> dict:
+        """Retrieve actor data with LRU cache"""
+        return await self.apGet(actor_id)
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
     async def getActorPubKeyData(
         self,
         actor_id: str
@@ -692,7 +715,7 @@
         @return: key_id, owner and public_key
         @raise KeyError: publicKey is missing from actor data
         """
-        actor_data = await self.apGet(actor_id)
+        actor_data = await self.getActorData(actor_id)
         pub_key_data = actor_data["publicKey"]
         key_id = pub_key_data["id"]
         owner = pub_key_data["owner"]
@@ -901,7 +924,7 @@
         @return: Actor ID (which is an URL)
         """
         if account.count("@") != 1 or "/" in account:
-            raise ValueError("Invalid account: {account!r}")
+            raise ValueError(f"Invalid account: {account!r}")
         host = account.split("@")[1]
         try:
             finger_data = await treq.json_content(await treq.get(
@@ -909,7 +932,7 @@
                 f"resource=acct:{parse.quote_plus(account)}",
             ))
         except Exception as e:
-            raise exceptions.DataError(f"Can't get webfinger data: {e}")
+            raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
         for link in finger_data.get("links", []):
             if (
                 link.get("type") == "application/activity+json"
@@ -935,10 +958,9 @@
         href = await self.getAPActorIdFromAccount(account)
         return await self.apGet(href)
 
-    @async_lru(maxsize=LRU_MAX_SIZE)
     async def getAPInboxFromId(self, actor_id: str) -> str:
         """Retrieve inbox of an actor_id"""
-        data = await self.apGet(actor_id)
+        data = await self.getActorData(actor_id)
         return data["inbox"]
 
     @async_lru(maxsize=LRU_MAX_SIZE)
@@ -948,7 +970,7 @@
         @param actor_id: AP ID of the actor (URL to the actor data)
         """
         url_parsed = parse.urlparse(actor_id)
-        actor_data = await self.apGet(actor_id)
+        actor_data = await self.getActorData(actor_id)
         username = actor_data.get("preferredUsername")
         if not username:
             raise exceptions.DataError(
@@ -1269,6 +1291,13 @@
         else:
             mb_data["language"] = language
             mb_data["content_xhtml"] = content_xhtml
+            if not mb_data.get("content"):
+                mb_data["content"] = await self._t.convert(
+                    content_xhtml,
+                    self._t.SYNTAX_XHTML,
+                    self._t.SYNTAX_TEXT,
+                    False,
+                )
 
         # author
         if is_activity:
@@ -1379,20 +1408,29 @@
             TYPE_ITEM, parent_ap_account, parent_item
         )
 
-    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
-        """Convert Libervia Microblog Data to ActivityPub item"""
-        try:
-            node = mb_data["node"]
-            service = jid.JID(mb_data["service"])
-        except KeyError:
-            # node and service must always be specified when this method is used
-            raise exceptions.InternalError(
-                "node or service is missing in mb_data"
-            )
+    async def mbdata2APitem(
+        self,
+        client: SatXMPPEntity,
+        mb_data: dict,
+        public=True
+    ) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item
+
+        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
+            If ``public`` is True, ``service`` and ``node`` keys must be set.
+            If ``published`` is not set, current datetime will be used
+        @param public: True if the message is not a private/direct one
+            if True, the AP Item will be marked as public, and AP followers of target AP
+            account (which is retrieved from ``service``) will be put in ``cc``.
+            ``inReplyTo`` will also be set if suitable
+            if False, no recipient will be set (i.e., no ``to`` or ``cc`` or public flag).
+            This is usually used for direct messages.
+        @return: AP item
+        """
         if not mb_data.get("id"):
             mb_data["id"] = shortuuid.uuid()
         if not mb_data.get("author_jid"):
-            mb_data["author_jid"] = client.jid.full()
+            mb_data["author_jid"] = client.jid.userhost()
         ap_account = await self.getAPAccountFromJidAndNode(
             jid.JID(mb_data["author_jid"]),
             None
@@ -1402,43 +1440,59 @@
         ap_object = {
             "id": url_item,
             "type": "Note",
-            "published": utils.xmpp_date(mb_data["published"]),
+            "published": utils.xmpp_date(mb_data.get("published")),
             "attributedTo": url_actor,
             "content": mb_data.get("content_xhtml") or mb_data["content"],
-            "to": ["https://www.w3.org/ns/activitystreams#Public"],
         }
 
-        target_ap_account = self._e.unescape(service.user)
-        actor_data = await self.getAPActorDataFromAccount(target_ap_account)
-        followers = actor_data.get("followers")
-        if followers:
-            ap_object["cc"] = [followers]
+        language = mb_data.get("language")
+        if language:
+            ap_object["contentMap"] = {language: ap_object["content"]}
+
+        if public:
+            ap_object["to"] = [NS_AP_PUBLIC]
+            try:
+                node = mb_data["node"]
+                service = jid.JID(mb_data["service"])
+            except KeyError:
+                # node and service must always be specified when ``public`` is True
+                raise exceptions.InternalError(
+                    "node or service is missing in mb_data"
+                )
+            target_ap_account = await self.getAPAccountFromJidAndNode(
+                service, node
+            )
+            if service.host == self.client.jid.userhost():
+                # service is a proxy JID for AP account
+                actor_data = await self.getAPActorDataFromAccount(target_ap_account)
+                followers = actor_data.get("followers")
+            else:
+                # service is a real XMPP entity
+                followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account)
+            if followers:
+                ap_object["cc"] = [followers]
+            if self._m.isCommentNode(node):
+                parent_item = self._m.getParentItem(node)
+                if service.host == self.client.jid.userhost():
+                    # the publication is on a virtual node (i.e. an XMPP node managed by
+                    # this gateway and linking to an ActivityPub actor)
+                    ap_object["inReplyTo"] = parent_item
+                else:
+                    # the publication is from a followed real XMPP node
+                    ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
+                        client,
+                        ap_account,
+                        parent_item,
+                        mb_data
+                    )
 
         ap_item = {
             "@context": "https://www.w3.org/ns/activitystreams",
             "id": url_item,
             "type": "Create",
             "actor": url_actor,
-
             "object": ap_object
         }
-        language = mb_data.get("language")
-        if language:
-            ap_object["contentMap"] = {language: ap_object["content"]}
-        if self._m.isCommentNode(node):
-            parent_item = self._m.getParentItem(node)
-            if service.host == self.client.jid.userhost():
-                # the publication is on a virtual node (i.e. an XMPP node managed by
-                # this gateway and linking to an ActivityPub actor)
-                ap_object["inReplyTo"] = parent_item
-            else:
-                # the publication is from a followed real XMPP node
-                ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
-                    client,
-                    ap_account,
-                    parent_item,
-                    mb_data
-                )
 
         return ap_item
 
@@ -1481,6 +1535,39 @@
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
+    @utils.ensure_deferred
+    async def onMessage(self, message_elt: domish.Element) -> None:
+        """Called when an XMPP message is received"""
+        if not self.client:
+            log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
+            return
+        mess_type = message_elt.getAttribute("type")
+        if mess_type and mess_type not in ("chat", "normal"):
+            log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}")
+            return
+        from_jid = jid.JID(message_elt["from"])
+        if not self.isLocal(from_jid):
+            log.warning(f"ignoring non local message: {message_elt.toXml()}")
+            return
+        to_jid = jid.JID(message_elt["to"])
+        if not to_jid.user:
+            log.warning(
+                f"ignoring message addressed to gateway itself: {message_elt.toXml()}"
+            )
+            return
+
+        actor_account = self._e.unescape(to_jid.user)
+        actor_id = await self.getAPActorIdFromAccount(actor_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+
+        mb_data = {
+            "content": str(message_elt.body),
+        }
+        client = self.client.getVirtualClient(from_jid)
+        ap_item = await self.mbdata2APitem(client, mb_data, public=False)
+        ap_item["object"]["to"] = ap_item["to"] = [actor_id]
+        await self.signAndPost(inbox, ap_item["actor"], ap_item)
+
     async def newReplyToXMPPItem(
         self,
         client: SatXMPPEntity,
@@ -1544,8 +1631,87 @@
         @param node: XMPP pubsub node
         @param item: AP object payload
         """
+        targets: Set[str] = set()
+        is_public = False
+        # TODO: handle "audience"
+        for key in ("to", "bto", "cc", "bcc"):
+            values = item.get(key)
+            if not values:
+                continue
+            if isinstance(values, str):
+                values = [values]
+            for value in values:
+                if value in PUBLIC_TUPLE:
+                    is_public = True
+                    continue
+                if not value:
+                    continue
+                if not self.isLocalURL(value):
+                    continue
+                targets.add(value)
+
+        targets_types = {self.parseAPURL(t)[0] for t in targets}
+        if not is_public and targets_types == {TYPE_ACTOR}:
+            # this is a direct message
+            await self.handleMessageAPItem(
+                client, targets, destinee, item
+            )
+        else:
+            await self.handlePubsubAPItem(
+                client, targets, destinee, node, item, is_public
+            )
+
+    async def handleMessageAPItem(
+        self,
+        client: SatXMPPEntity,
+        targets: Set[str],
+        destinee: Optional[jid.JID],
+        item: dict,
+    ) -> None:
+        """Parse and deliver direct AP items translating to XMPP messages
+
+        @param targets: actors where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param item: AP object payload
+        """
+        targets_jids = {await self.getJIDFromId(t) for t in targets}
+        if destinee is not None:
+            targets_jids.add(destinee)
+        mb_data = await self.apItem2MBdata(item)
+        defer_l = []
+        for target_jid in targets_jids:
+            defer_l.append(
+                client.sendMessage(
+                    target_jid,
+                    {'': mb_data.get("content", "")},
+                    mb_data.get("title"),
+
+                )
+            )
+        await defer.DeferredList(defer_l)
+
+    async def handlePubsubAPItem(
+        self,
+        client: SatXMPPEntity,
+        targets: Set[str],
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+        public: bool
+    ) -> None:
+        """Analyse, cache and deliver AP items translating to Pubsub
+
+        @param targets: actors/collections where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        @param public: True if the item is public
+        """
+        # XXX: "public" is not used for now
+
         service = client.jid
         in_reply_to = item.get("inReplyTo")
+
         if in_reply_to and isinstance(in_reply_to, list):
             in_reply_to = in_reply_to[0]
         if in_reply_to and isinstance(in_reply_to, str):
@@ -1607,3 +1773,4 @@
                 node,
                 [(subscription.subscriber, None, [item_elt])]
             )
+