changeset 3803:d5f343939239

component AP gateway: message retractation => AP deletion Convert XEP-0424 message retractation request to suitable AP delete activity. The message workflow and its triggers are now used instead of a direct observer, as it is now possible to do so with component, and this let other plugin to parse and eventually update metadata. rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents 983df907d456
children 36b167ddbfca
files sat/plugins/plugin_comp_ap_gateway/__init__.py
diffstat 1 files changed, 122 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri Jun 17 14:15:23 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Fri Jun 17 14:15:23 2022 +0200
@@ -87,7 +87,8 @@
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
     C.PI_DEPENDENCIES: [
-        "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES"
+        "XEP-0060", "XEP-0106", "XEP-0277", "XEP-0329", "XEP-0424", "XEP-0465",
+        "PUBSUB_CACHE", "TEXT_SYNTAXES"
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
@@ -112,6 +113,7 @@
         self._m = host.plugins["XEP-0277"]
         self._p = host.plugins["XEP-0060"]
         self._e = host.plugins["XEP-0106"]
+        self._r = host.plugins["XEP-0424"]
         self._pps = host.plugins["XEP-0465"]
         self._c = host.plugins["PUBSUB_CACHE"]
         self._p.addManagedNode(
@@ -119,6 +121,8 @@
         )
         self._t = host.plugins["TEXT_SYNTAXES"]
         self.pubsub_service = APPubsubService(self)
+        host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000)
+        host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)
 
         host.bridge.addMethod(
             "APSend",
@@ -227,9 +231,6 @@
         self.client = client
         await self.init(client)
 
-    def profileConnected(self, client):
-        client.xmlstream.addObserver("/message/body", self.onMessage)
-
     async def _itemsReceived(self, client, itemsEvent):
         """Callback called when pubsub items are received
 
@@ -872,19 +873,9 @@
                 ap_item = await self.mbdata2APitem(client, mb_data)
                 url_actor = ap_item["object"]["attributedTo"]
             elif item.name == "retract":
-                author_account = await self.getAPAccountFromJidAndNode(client.jid, node)
-                author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account)
-                url_item = self.buildAPURL(TYPE_ITEM, author_account, item["id"])
-                ap_item = self.createActivity(
-                    "Delete",
-                    author_actor_id,
-                    {
-                        "id": url_item,
-                        "type": TYPE_TOMBSTONE
-                    }
+                url_actor, ap_item = await self.apDeleteItem(
+                    client.jid, node, item["id"]
                 )
-                ap_item["to"] = [NS_AP_PUBLIC]
-                url_actor = author_actor_id
             else:
                 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
             resp = await self.signAndPost(inbox, url_actor, ap_item)
@@ -925,7 +916,7 @@
         )
         if resp.code >= 400:
             text = await resp.text()
-            log.warning(f"POST request to {url} failed: {text}")
+            log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
         return resp
 
     def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
@@ -1553,38 +1544,133 @@
         if resp.code != 202:
             raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
 
-    @utils.ensure_deferred
-    async def onMessage(self, message_elt: domish.Element) -> None:
-        """Called when a XMPP message is received"""
+    async def apDeleteItem(
+        self,
+        jid_: jid.JID,
+        node: Optional[str],
+        item_id: str,
+        public: bool = True
+    ) -> Tuple[str, Dict[str, Any]]:
+        """Build activity to delete an AP item
+
+        @param jid_: JID of the entity deleting an item
+        @param node: node where the item is deleted
+            None if it's microblog or a message
+        @param item_id: ID of the item to delete
+            it's the Pubsub ID or message's origin ID
+        @param public: if True, the activity will be addressed to public namespace
+        @return: actor_id of the entity deleting the item, activity to send
+        """
+        author_account = await self.getAPAccountFromJidAndNode(jid_, node)
+        author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account)
+        url_item = self.buildAPURL(TYPE_ITEM, author_account, item_id)
+        ap_item = self.createActivity(
+            "Delete",
+            author_actor_id,
+            {
+                "id": url_item,
+                "type": TYPE_TOMBSTONE
+            }
+        )
+        if public:
+            ap_item["to"] = [NS_AP_PUBLIC]
+        url_actor = author_actor_id
+        return url_actor, ap_item
+
+    def _messageReceivedTrigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """add the gateway workflow on post treatment"""
         if not self.client:
             log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
-            return
-        mess_type = message_elt.getAttribute("type")
-        if mess_type and mess_type not in ("chat", "normal"):
-            log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}")
-            return
-        from_jid = jid.JID(message_elt["from"])
-        if not self.isLocal(from_jid):
-            log.warning(f"ignoring non local message: {message_elt.toXml()}")
-            return
-        to_jid = jid.JID(message_elt["to"])
-        if not to_jid.user:
+            return True
+        post_treat.addCallback(
+            lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data))
+        )
+        return True
+
+    async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
+        """Called once message has been parsed
+
+        this method handle the conversion to AP items and posting
+        """
+        if client != self.client:
+            return mess_data
+        if mess_data["type"] not in ("chat", "normal"):
+            log.warning(f"ignoring message with unexpected type: {mess_data['xml'].toXml()}")
+            return mess_data
+        if not self.isLocal(mess_data["from"]):
+            log.warning(f"ignoring non local message: {mess_data['xml'].toXml()}")
+            return mess_data
+        if not mess_data["to"].user:
             log.warning(
-                f"ignoring message addressed to gateway itself: {message_elt.toXml()}"
+                f"ignoring message addressed to gateway itself: {mess_data['xml'].toXml()}"
             )
-            return
+            return mess_data
 
-        actor_account = self._e.unescape(to_jid.user)
+        actor_account = self._e.unescape(mess_data["to"].user)
         actor_id = await self.getAPActorIdFromAccount(actor_account)
         inbox = await self.getAPInboxFromId(actor_id)
 
+        try:
+            language, message = next(iter(mess_data["message"].items()))
+        except (KeyError, StopIteration):
+            log.warning(f"ignoring empty message: {mess_data}")
+            return mess_data
+
         mb_data = {
-            "content": str(message_elt.body),
+            "content": message,
         }
-        client = self.client.getVirtualClient(from_jid)
+        if language:
+            mb_data["language"] = language
+        origin_id = mess_data["extra"].get("origin_id")
+        if origin_id:
+            # we need to use origin ID when present to be able to retract the message
+            mb_data["id"] = origin_id
+        client = self.client.getVirtualClient(mess_data["from"])
         ap_item = await self.mbdata2APitem(client, mb_data, public=False)
         ap_item["object"]["to"] = ap_item["to"] = [actor_id]
         await self.signAndPost(inbox, ap_item["actor"], ap_item)
+        return mess_data
+
+    async def _onMessageRetract(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        retract_elt: domish.Element,
+        fastened_elts
+    ) -> bool:
+        if client != self.client:
+            return True
+        from_jid = jid.JID(message_elt["from"])
+        if not self.isLocal(from_jid):
+            log.debug(
+                f"ignoring retract request from non local jid {from_jid}"
+            )
+            return False
+        to_jid = jid.JID(message_elt["to"])
+        if (to_jid.host != self.client.jid.full() or not to_jid.user):
+            # to_jid should be a virtual JID from this gateway
+            raise exceptions.InternalError(
+                f"Invalid destinee's JID: {to_jid.full()}"
+            )
+        ap_account = self._e.unescape(to_jid.user)
+        actor_id = await self.getAPActorIdFromAccount(ap_account)
+        inbox = await self.getAPInboxFromId(actor_id)
+        url_actor, ap_item = await self.apDeleteItem(
+            from_jid.userhostJID(), None, fastened_elts.id, public=False
+        )
+        resp = await self.signAndPost(inbox, url_actor, ap_item)
+        if resp.code >= 300:
+            text = await resp.text()
+            log.warning(
+                f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
+                f"{pformat(ap_item)}"
+            )
+        return False
 
     async def newReplyToXMPPItem(
         self,