changeset 3784:efc34a89e70b

comp AP gateway: message conversion: Convert direct AP items to XMPP `<message>` stanzas, and vice versa. Documentation will follow soon to explain the behaviour. rel 366
author Goffi <goffi@goffi.org>
date Tue, 24 May 2022 17:57:36 +0200
parents fedbf7aade11
children 0b54be42d0aa
files sat/core/xmpp.py sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py
diffstat 3 files changed, 252 insertions(+), 85 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/xmpp.py	Tue May 24 17:49:14 2022 +0200
+++ b/sat/core/xmpp.py	Tue May 24 17:57:36 2022 +0200
@@ -612,6 +612,43 @@
         """
         raise NotImplementedError
 
+    def send(self, obj):
+        # original send method accept string
+        # but we restrict to domish.Element to make trigger treatments easier
+        assert isinstance(obj, domish.Element)
+        # XXX: this trigger is the last one before sending stanza on wire
+        #      it is intended for things like end 2 end encryption.
+        #      *DO NOT* cancel (i.e. return False) without very good reason
+        #      (out of band transmission for instance).
+        #      e2e should have a priority of 0 here, and out of band transmission
+        #      a lower priority
+        #  FIXME: trigger not used yet, can be uncommented when e2e full stanza
+        #         encryption is implemented
+        #  if not self.host_app.trigger.point("send", self, obj):
+        #      return
+        super().send(obj)
+
+    @defer.inlineCallbacks
+    def sendMessageData(self, mess_data):
+        """Convenient method to send message data to stream
+
+        This method will send mess_data[u'xml'] to stream, but a trigger is there
+        The trigger can't be cancelled, it's a good place for e2e encryption which
+        don't handle full stanza encryption
+        This trigger can return a Deferred (it's an asyncPoint)
+        @param mess_data(dict): message data as constructed by onMessage workflow
+        @return (dict): mess_data (so it can be used in a deferred chain)
+        """
+        # XXX: This is the last trigger before u"send" (last but one globally)
+        #      for sending message.
+        #      This is intented for e2e encryption which doesn't do full stanza
+        #      encryption (e.g. OTR)
+        #      This trigger point can't cancel the method
+        yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data,
+            triggers_no_cancel=True)
+        self.send(mess_data["xml"])
+        defer.returnValue(mess_data)
+
     def sendMessage(
             self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None,
             no_trigger=False):
@@ -659,8 +696,7 @@
             # we try to guess the type
             if data["subject"]:
                 data["type"] = C.MESS_TYPE_NORMAL
-            elif not data["to"].resource:  # if to JID has a resource,
-                                           # the type is not 'groupchat'
+            elif not data["to"].resource:
                 # we may have a groupchat message, we check if the we know this jid
                 try:
                     entity_type = self.host_app.memory.getEntityDatum(
@@ -675,8 +711,7 @@
                 else:
                     data["type"] = C.MESS_TYPE_CHAT
             else:
-                data["type"] == C.MESS_TYPE_CHAT
-            data["type"] == C.MESS_TYPE_CHAT if data["subject"] else C.MESS_TYPE_NORMAL
+                data["type"] = C.MESS_TYPE_CHAT
 
         # FIXME: send_only is used by libervia's OTR plugin to avoid
         #        the triggers from frontend, and no_trigger do the same
@@ -895,43 +930,6 @@
         )
         post_xml_treatments.addCallback(self.messageSendToBridge)
 
-    def send(self, obj):
-        # original send method accept string
-        # but we restrict to domish.Element to make trigger treatments easier
-        assert isinstance(obj, domish.Element)
-        # XXX: this trigger is the last one before sending stanza on wire
-        #      it is intended for things like end 2 end encryption.
-        #      *DO NOT* cancel (i.e. return False) without very good reason
-        #      (out of band transmission for instance).
-        #      e2e should have a priority of 0 here, and out of band transmission
-        #      a lower priority
-        #  FIXME: trigger not used yet, can be uncommented when e2e full stanza
-        #         encryption is implemented
-        #  if not self.host_app.trigger.point("send", self, obj):
-        #      return
-        super(SatXMPPClient, self).send(obj)
-
-    @defer.inlineCallbacks
-    def sendMessageData(self, mess_data):
-        """Convenient method to send message data to stream
-
-        This method will send mess_data[u'xml'] to stream, but a trigger is there
-        The trigger can't be cancelled, it's a good place for e2e encryption which
-        don't handle full stanza encryption
-        This trigger can return a Deferred (it's an asyncPoint)
-        @param mess_data(dict): message data as constructed by onMessage workflow
-        @return (dict): mess_data (so it can be used in a deferred chain)
-        """
-        # XXX: This is the last trigger before u"send" (last but one globally)
-        #      for sending message.
-        #      This is intented for e2e encryption which doesn't do full stanza
-        #      encryption (e.g. OTR)
-        #      This trigger point can't cancel the method
-        yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data,
-            triggers_no_cancel=True)
-        self.send(mess_data["xml"])
-        defer.returnValue(mess_data)
-
     def feedback(self, to_jid, message, extra=None):
         """Send message to frontends
 
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue May 24 17:49:14 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Tue May 24 17:57:36 2022 +0200
@@ -23,7 +23,9 @@
 from pathlib import Path
 from pprint import pformat
 import re
-from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload
+from typing import (
+    Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
+)
 from urllib import parse
 
 from cryptography.exceptions import InvalidSignature
@@ -65,6 +67,9 @@
     MEDIA_TYPE_AP,
     TYPE_ACTOR,
     TYPE_ITEM,
+    TYPE_FOLLOWERS,
+    NS_AP_PUBLIC,
+    PUBLIC_TUPLE
 )
 from .http_server import HTTPServer
 from .pubsub_service import APPubsubService
@@ -80,7 +85,9 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"],
+    C.PI_DEPENDENCIES: [
+        "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES"
+    ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
     C.PI_HANDLER: C.BOOL_TRUE,
@@ -109,6 +116,7 @@
         self._p.addManagedNode(
             "", items_cb=self._itemsReceived
         )
+        self._t = host.plugins["TEXT_SYNTAXES"]
         self.pubsub_service = APPubsubService(self)
 
         host.bridge.addMethod(
@@ -218,6 +226,9 @@
         self.client = client
         await self.init(client)
 
+    def profileConnected(self, client):
+        client.xmlstream.addObserver("/message/body", self.onMessage)
+
     async def _itemsReceived(self, client, itemsEvent):
         """Callback called when pubsub items are received
 
@@ -455,6 +466,13 @@
         if not node or node == self._m.namespace:
             node = None
 
+        if self.client is None:
+            raise exceptions.InternalError("Client is not set yet")
+
+        if jid_.host == self.client.jid.userhost():
+            # this is an proxy JId to an AP Actor
+            return self._e.unescape(jid_.user)
+
         if node and not jid_.user and not self.mustEncode(node):
             is_pubsub = await self.isPubsub(jid_)
             # when we have a pubsub service, the user part can be used to set the node
@@ -682,6 +700,11 @@
         return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
 
     @async_lru(maxsize=LRU_MAX_SIZE)
+    async def getActorData(self, actor_id) -> dict:
+        """Retrieve actor data with LRU cache"""
+        return await self.apGet(actor_id)
+
+    @async_lru(maxsize=LRU_MAX_SIZE)
     async def getActorPubKeyData(
         self,
         actor_id: str
@@ -692,7 +715,7 @@
         @return: key_id, owner and public_key
         @raise KeyError: publicKey is missing from actor data
         """
-        actor_data = await self.apGet(actor_id)
+        actor_data = await self.getActorData(actor_id)
         pub_key_data = actor_data["publicKey"]
         key_id = pub_key_data["id"]
         owner = pub_key_data["owner"]
@@ -901,7 +924,7 @@
         @return: Actor ID (which is an URL)
         """
         if account.count("@") != 1 or "/" in account:
-            raise ValueError("Invalid account: {account!r}")
+            raise ValueError(f"Invalid account: {account!r}")
         host = account.split("@")[1]
         try:
             finger_data = await treq.json_content(await treq.get(
@@ -909,7 +932,7 @@
                 f"resource=acct:{parse.quote_plus(account)}",
             ))
         except Exception as e:
-            raise exceptions.DataError(f"Can't get webfinger data: {e}")
+            raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
         for link in finger_data.get("links", []):
             if (
                 link.get("type") == "application/activity+json"
@@ -935,10 +958,9 @@
         href = await self.getAPActorIdFromAccount(account)
         return await self.apGet(href)
 
-    @async_lru(maxsize=LRU_MAX_SIZE)
     async def getAPInboxFromId(self, actor_id: str) -> str:
         """Retrieve inbox of an actor_id"""
-        data = await self.apGet(actor_id)
+        data = await self.getActorData(actor_id)
         return data["inbox"]
 
     @async_lru(maxsize=LRU_MAX_SIZE)
@@ -948,7 +970,7 @@
         @param actor_id: AP ID of the actor (URL to the actor data)
         """
         url_parsed = parse.urlparse(actor_id)
-        actor_data = await self.apGet(actor_id)
+        actor_data = await self.getActorData(actor_id)
         username = actor_data.get("preferredUsername")
         if not username:
             raise exceptions.DataError(
@@ -1269,6 +1291,13 @@
         else:
             mb_data["language"] = language
             mb_data["content_xhtml"] = content_xhtml
+            if not mb_data.get("content"):
+                mb_data["content"] = await self._t.convert(
+                    content_xhtml,
+                    self._t.SYNTAX_XHTML,
+                    self._t.SYNTAX_TEXT,
+                    False,
+                )
 
         # author
         if is_activity:
@@ -1379,20 +1408,29 @@
             TYPE_ITEM, parent_ap_account, parent_item
         )
 
-    async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
-        """Convert Libervia Microblog Data to ActivityPub item"""
-        try:
-            node = mb_data["node"]
-            service = jid.JID(mb_data["service"])
-        except KeyError:
-            # node and service must always be specified when this method is used
-            raise exceptions.InternalError(
-                "node or service is missing in mb_data"
-            )
+    async def mbdata2APitem(
+        self,
+        client: SatXMPPEntity,
+        mb_data: dict,
+        public=True
+    ) -> dict:
+        """Convert Libervia Microblog Data to ActivityPub item
+
+        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
+            If ``public`` is True, ``service`` and ``node`` keys must be set.
+            If ``published`` is not set, current datetime will be used
+        @param public: True if the message is not a private/direct one
+            if True, the AP Item will be marked as public, and AP followers of target AP
+            account (which retrieve from ``service``) will be put in ``cc``.
+            ``inReplyTo`` will also be set if suitable
+            if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
+            This is usually used for direct messages.
+        @return: AP item
+        """
         if not mb_data.get("id"):
             mb_data["id"] = shortuuid.uuid()
         if not mb_data.get("author_jid"):
-            mb_data["author_jid"] = client.jid.full()
+            mb_data["author_jid"] = client.jid.userhost()
         ap_account = await self.getAPAccountFromJidAndNode(
             jid.JID(mb_data["author_jid"]),
             None
@@ -1402,43 +1440,59 @@
         ap_object = {
             "id": url_item,
             "type": "Note",
-            "published": utils.xmpp_date(mb_data["published"]),
+            "published": utils.xmpp_date(mb_data.get("published")),
             "attributedTo": url_actor,
             "content": mb_data.get("content_xhtml") or mb_data["content"],
-            "to": ["https://www.w3.org/ns/activitystreams#Public"],
         }
 
-        target_ap_account = self._e.unescape(service.user)
-        actor_data = await self.getAPActorDataFromAccount(target_ap_account)
-        followers = actor_data.get("followers")
-        if followers:
-            ap_object["cc"] = [followers]
+        language = mb_data.get("language")
+        if language:
+            ap_object["contentMap"] = {language: ap_object["content"]}
+
+        if public:
+            ap_object["to"] = [NS_AP_PUBLIC]
+            try:
+                node = mb_data["node"]
+                service = jid.JID(mb_data["service"])
+            except KeyError:
+                # node and service must always be specified when this method is used
+                raise exceptions.InternalError(
+                    "node or service is missing in mb_data"
+                )
+            target_ap_account = await self.getAPAccountFromJidAndNode(
+                service, node
+            )
+            if service.host == self.client.jid.userhost:
+                # service is a proxy JID for AP account
+                actor_data = await self.getAPActorDataFromAccount(target_ap_account)
+                followers = actor_data.get("followers")
+            else:
+                # service is a real XMPP entity
+                followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account)
+            if followers:
+                ap_object["cc"] = [followers]
+            if self._m.isCommentNode(node):
+                parent_item = self._m.getParentItem(node)
+                if service.host == self.client.jid.userhost():
+                    # the publication is on a virtual node (i.e. an XMPP node managed by
+                    # this gateway and linking to an ActivityPub actor)
+                    ap_object["inReplyTo"] = parent_item
+                else:
+                    # the publication is from a followed real XMPP node
+                    ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
+                        client,
+                        ap_account,
+                        parent_item,
+                        mb_data
+                    )
 
         ap_item = {
             "@context": "https://www.w3.org/ns/activitystreams",
             "id": url_item,
             "type": "Create",
             "actor": url_actor,
-
             "object": ap_object
         }
-        language = mb_data.get("language")
-        if language:
-            ap_object["contentMap"] = {language: ap_object["content"]}
-        if self._m.isCommentNode(node):
-            parent_item = self._m.getParentItem(node)
-            if service.host == self.client.jid.userhost():
-                # the publication is on a virtual node (i.e. an XMPP node managed by
-                # this gateway and linking to an ActivityPub actor)
-                ap_object["inReplyTo"] = parent_item
-            else:
-                # the publication is from a followed real XMPP node
-                ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
-                    client,
-                    ap_account,
-                    parent_item,
-                    mb_data
-                )
 
         return ap_item
 
@@ -1481,6 +1535,39 @@
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
+    @utils.ensure_deferred
+    async def onMessage(self, message_elt: domish.Element) -> None:
+        """Called when a XMPP message is received"""
+        if not self.client:
+            log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
+            return
+        mess_type = message_elt.getAttribute("type")
+        if mess_type and mess_type not in ("chat", "normal"):
+            log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}")
+            return
+        from_jid = jid.JID(message_elt["from"])
+        if not self.isLocal(from_jid):
+            log.warning(f"ignoring non local message: {message_elt.toXml()}")
+            return
+        to_jid = jid.JID(message_elt["to"])
+        if not to_jid.user:
+            log.warning(
+                f"ignoring message addressed to gateway itself: {message_elt.toXml()}"
+            )
+            return
+
+        actor_account = self._e.unescape(to_jid.user)
+        actor_id = await self.getAPActorIdFromAccount(actor_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+
+        mb_data = {
+            "content": str(message_elt.body),
+        }
+        client = self.client.getVirtualClient(from_jid)
+        ap_item = await self.mbdata2APitem(client, mb_data, public=False)
+        ap_item["object"]["to"] = ap_item["to"] = [actor_id]
+        await self.signAndPost(inbox, ap_item["actor"], ap_item)
+
     async def newReplyToXMPPItem(
         self,
         client: SatXMPPEntity,
@@ -1544,8 +1631,87 @@
         @param node: XMPP pubsub node
         @param item: AP object payload
         """
+        targets: Set[str] = set()
+        is_public = False
+        # TODO: handle "audience"
+        for key in ("to", "bto", "cc", "bcc"):
+            values = item.get(key)
+            if not values:
+                continue
+            if isinstance(values, str):
+                values = [values]
+            for value in values:
+                if value in PUBLIC_TUPLE:
+                    is_public = True
+                    continue
+                if not value:
+                    continue
+                if not self.isLocalURL(value):
+                    continue
+                targets.add(value)
+
+        targets_types = {self.parseAPURL(t)[0] for t in targets}
+        if not is_public and targets_types == {TYPE_ACTOR}:
+            # this is a direct message
+            await self.handleMessageAPItem(
+                client, targets, destinee, item
+            )
+        else:
+            await self.handlePubsubAPItem(
+                client, targets, destinee, node, item, is_public
+            )
+
+    async def handleMessageAPItem(
+        self,
+        client: SatXMPPEntity,
+        targets: Set[str],
+        destinee: Optional[jid.JID],
+        item: dict,
+    ) -> None:
+        """Parse and deliver direct AP items translating to XMPP messages
+
+        @param targets: actors where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param item: AP object payload
+        """
+        targets_jids = {await self.getJIDFromId(t) for t in targets}
+        if destinee is not None:
+            targets_jids.add(destinee)
+        mb_data = await self.apItem2MBdata(item)
+        defer_l = []
+        for target_jid in targets_jids:
+            defer_l.append(
+                client.sendMessage(
+                    target_jid,
+                    {'': mb_data.get("content", "")},
+                    mb_data.get("title"),
+
+                )
+            )
+        await defer.DeferredList(defer_l)
+
+    async def handlePubsubAPItem(
+        self,
+        client: SatXMPPEntity,
+        targets: Set[str],
+        destinee: Optional[jid.JID],
+        node: str,
+        item: dict,
+        public: bool
+    ) -> None:
+        """Analyse, cache and deliver AP items translating to Pubsub
+
+        @param targets: actors/collections where the item must be delivered
+        @param destinee: jid of the destinee,
+        @param node: XMPP pubsub node
+        @param item: AP object payload
+        @param public: True if the item is public
+        """
+        # XXX: "public" is not used for now
+
         service = client.jid
         in_reply_to = item.get("inReplyTo")
+
         if in_reply_to and isinstance(in_reply_to, list):
             in_reply_to = in_reply_to[0]
         if in_reply_to and isinstance(in_reply_to, str):
@@ -1607,3 +1773,4 @@
                 node,
                 [(subscription.subscriber, None, [item_elt])]
             )
+
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Tue May 24 17:49:14 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Tue May 24 17:57:36 2022 +0200
@@ -29,6 +29,8 @@
 TYPE_ITEM = "item"
 MEDIA_TYPE_AP = "application/activity+json"
 NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"
+# 3 values can be used, see https://www.w3.org/TR/activitypub/#public-addressing
+PUBLIC_TUPLE = (NS_AP_PUBLIC, "as:Public", "Public")
 # mapping from AP metadata to microblog data
 AP_MB_MAP = {
     "content": "content_xhtml",