changeset 3366:e09cb08166a3

plugin XEP-0329, core(xmpp): moved `_compParseJids` to `SatXMPPComponent`: This method to retrieve owner and peer JID from an element is generally useful for components, thus it has been moved. The part to retrieve owner JID from local JID has been splitted in its own `getOwnerFromJid` method.
author Goffi <goffi@goffi.org>
date Sun, 20 Sep 2020 14:04:11 +0200
parents 626046010a2d
children 90aee1f2d07c
files sat/core/xmpp.py sat/plugins/plugin_xep_0329.py
diffstat 2 files changed, 46 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/xmpp.py	Sun Sep 20 11:03:24 2020 +0200
+++ b/sat/core/xmpp.py	Sun Sep 20 14:04:11 2020 +0200
@@ -21,6 +21,7 @@
 import calendar
 import uuid
 import mimetypes
+from typing import Tuple
 from urllib.parse import urlparse, unquote
 from functools import partial
 from pathlib import Path
@@ -1059,6 +1060,46 @@
         if self.sendHistory:
             post_xml_treatments.addCallback(self.messageAddToHistory)
 
+    def getOwnerFromJid(self, to_jid: jid.JID) -> jid.JID:
+        """Retrieve "owner" of a component resource from the destination jid of the request
+
+        This method needs plugin XEP-0106 for unescaping, if you use it you must add the
+        plugin to your dependencies.
+        A "user" part must be present in "to_jid" (otherwise, the component itself is addressed)
+        @param to_jid: destination JID of the request
+        """
+        try:
+            unescape = self.host_app.plugins['XEP-0106'].unescape
+        except KeyError:
+            raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner")
+        else:
+            user = unescape(to_jid.user)
+        if '@' in user:
+            # a full jid is specified
+            return jid.JID(user)
+        else:
+            # only user part is specified, we use our own host to build the full jid
+            return jid.JID(None, (user, self.host, None))
+
+    def getOwnerAndPeer(self, iq_elt: domish.Element) -> Tuple[jid.JID, jid.JID]:
+        """Retrieve owner of a component jid, and the jid of the requesting peer
+
+        "owner" is found by either unescaping full jid from node, or by combining node
+        with our host.
+        Peer jid is the requesting jid from the IQ element
+        @param iq_elt: IQ stanza sent from the requested
+        @return: owner and peer JIDs
+        """
+        to_jid = jid.JID(iq_elt['to'])
+        if to_jid.user:
+            owner = self.getOwnerFromJid(to_jid)
+        else:
+            owner = jid.JID(iq_elt["from"]).userhostJID()
+
+        peer_jid = jid.JID(iq_elt["from"])
+        return peer_jid, owner
+
+
 
 class SatMessageProtocol(xmppim.MessageProtocol):
 
--- a/sat/plugins/plugin_xep_0329.py	Sun Sep 20 11:03:24 2020 +0200
+++ b/sat/plugins/plugin_xep_0329.py	Sun Sep 20 14:04:11 2020 +0200
@@ -590,24 +590,10 @@
         @param iq_elt(domish.Element): IQ stanza of the FIS request
         @return (tuple[jid.JID, jid.JID]): peer_jid and owner
         """
-        to_jid = jid.JID(iq_elt['to'])
-        if to_jid.user:
-            user = self.host.plugins['XEP-0106'].unescape(to_jid.user)
-            if '@' in user:
-                # a full jid is specified
-                owner = jid.JID(user)
-            else:
-                # only user part is specified, we use our own host to build the full jid
-                owner = jid.JID(None, (user, client.host, None))
-        else:
-            owner = jid.JID(iq_elt["from"]).userhostJID()
-
-        peer_jid = jid.JID(iq_elt["from"])
-        return peer_jid, owner
 
     @defer.inlineCallbacks
     def _compGetRootNodesCb(self, client, iq_elt):
-        peer_jid, owner = self._compParseJids(client, iq_elt)
+        peer_jid, owner = client.getOwnerAndPeer(iq_elt)
         files_data = yield self.host.memory.getFiles(
             client,
             peer_jid=peer_jid,
@@ -632,7 +618,7 @@
                                                files_data):
             can be used to add data/elements
         """
-        peer_jid, owner = self._compParseJids(client, iq_elt)
+        peer_jid, owner = client.getOwnerAndPeer(iq_elt)
         try:
             files_data = yield self.host.memory.getFiles(
                 client, peer_jid=peer_jid, path=node_path, owner=owner
@@ -733,7 +719,7 @@
     # affiliations #
 
     async def _parseElement(self, client, iq_elt, element, namespace):
-        peer_jid, owner = self._compParseJids(client, iq_elt)
+        peer_jid, owner = client.getOwnerAndPeer(iq_elt)
         elt = next(iq_elt.elements(namespace, element))
         path = Path("/", elt['path'])
         if len(path.parts) < 2:
@@ -838,7 +824,7 @@
             return
         except RootPathException:
             # if root path is requested, we only get owner affiliation
-            peer_jid, owner = self._compParseJids(client, iq_elt)
+            peer_jid, owner = client.getOwnerAndPeer(iq_elt)
             is_owner = peer_jid.userhostJID() == owner
             affiliations = {owner: 'owner'}
         except exceptions.NotFound:
@@ -1080,7 +1066,7 @@
         defer.ensureDeferred(self.onComponentCreateDir(client, iq_elt))
 
     async def onComponentCreateDir(self, client, iq_elt):
-        peer_jid, owner = self._compParseJids(client, iq_elt)
+        peer_jid, owner = client.getOwnerAndPeer(iq_elt)
         if peer_jid.host not in client._file_sharing_allowed_hosts:
             client.sendError(iq_elt, 'forbidden')
             return