changeset 2927:69e4716d6268

plugins (jingle) file transfer: use initial "from" attribute as local jid instead of client.jid: while client.jid is fine in a client context, for components it's not the right jid to use: it is the jid of the component itself while the file transfer/jingle session entity may be established with this jid + a local part (e.g. if client is files.example.net, session may be established with louise@files.example.net, in which case "from" is louise@files.example.net, while client.jid will be files.example.net). As a consequence, using client.jid was causing trouble with components. This patch fixes it for jingle and plugins linked to file transfer by keeping a "local_jid" variable in the session, where the jid from the original "from" attribute is used.
author Goffi <goffi@goffi.org>
date Sun, 28 Apr 2019 08:55:13 +0200 (2019-04-28)
parents 4cd7545c4ebb
children c0f6fd75af5f
files sat/plugins/plugin_xep_0047.py sat/plugins/plugin_xep_0065.py sat/plugins/plugin_xep_0096.py sat/plugins/plugin_xep_0166.py sat/plugins/plugin_xep_0234.py sat/plugins/plugin_xep_0260.py sat/plugins/plugin_xep_0261.py
diffstat 7 files changed, 75 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0047.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0047.py	Sun Apr 28 08:55:13 2019 +0200
@@ -131,10 +131,11 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, client, stream_object, to_jid, sid):
+    def _createSession(self, client, stream_object, local_jid, to_jid, sid):
         """Called when a bytestream is imminent
 
         @param stream_object(IConsumer): stream object where data will be written
+        @param local_jid(jid.JID): same as [startStream]
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
         @return (dict): session data
@@ -144,6 +145,7 @@
         session_data = client.xep_0047_current_stream[sid] = {
             "id": sid,
             DEFER_KEY: defer.Deferred(),
+            "local_jid": local_jid,
             "to": to_jid,
             "stream_object": stream_object,
             "seq": -1,
@@ -285,15 +287,19 @@
             self._killSession(sid, client, error_condition)
         client.send(iq_elt)
 
-    def startStream(self, client, stream_object, to_jid, sid, block_size=None):
+    def startStream(self, client, stream_object, local_jid, to_jid, sid, block_size=None):
         """Launch the stream workflow
 
         @param stream_object(ifaces.IStreamProducer): stream object to send
+        @param local_jid(jid.JID): jid to use as local jid
+            This is needed for client which can be addressed with a different jid than
+            client.jid if a local part is used (e.g. piotr@file.example.net where
+            client.jid would be file.example.net)
         @param to_jid(jid.JID): JID of the recipient
         @param sid(unicode): Stream session id
         @param block_size(int, None): size of the block (or None for default)
         """
-        session_data = self._createSession(client, stream_object, to_jid, sid)
+        session_data = self._createSession(client, stream_object, local_jid, to_jid, sid)
 
         if block_size is None:
             block_size = XEP_0047.BLOCK_SIZE
@@ -301,6 +307,7 @@
         session_data["block_size"] = block_size
 
         iq_elt = client.IQ()
+        iq_elt["from"] = local_jid.full()
         iq_elt["to"] = to_jid.full()
         open_elt = iq_elt.addElement((NS_IBB, "open"))
         open_elt["block-size"] = str(block_size)
@@ -323,6 +330,7 @@
         buffer_ = session_data["stream_object"].read(session_data["block_size"])
         if buffer_:
             next_iq_elt = client.IQ()
+            next_iq_elt["from"] = session_data["local_jid"].full()
             next_iq_elt["to"] = session_data["to"].full()
             data_elt = next_iq_elt.addElement((NS_IBB, "data"))
             seq = session_data["seq"] = (session_data["seq"] + 1) % 65535
@@ -350,6 +358,7 @@
         @param failure_reason(unicode, None): reason of the failure, or None if steam was successful
         """
         iq_elt = client.IQ()
+        iq_elt["from"] = session_data["local_jid"].full()
         iq_elt["to"] = session_data["to"].full()
         close_elt = iq_elt.addElement((NS_IBB, "close"))
         close_elt["sid"] = session_data["id"]
--- a/sat/plugins/plugin_xep_0065.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0065.py	Sun Apr 28 08:55:13 2019 +0200
@@ -273,7 +273,7 @@
             raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
         return 2 ** 16 * multiplier + self._local_priority
 
-    def activate(self, sid, peer_jid, client):
+    def activate(self, client, sid, peer_jid, local_jid):
         """Activate the proxy candidate
 
         Send activation request as explained in XEP-0065 ยง 6.3.5
@@ -284,6 +284,7 @@
         """
         assert self.type == XEP_0065.TYPE_PROXY
         iq_elt = client.IQ()
+        iq_elt["from"] = local_jid.full()
         iq_elt["to"] = self.jid.full()
         query_elt = iq_elt.addElement((NS_BS, "query"))
         query_elt["sid"] = sid
@@ -801,20 +802,22 @@
         return self._server_factory
 
     @defer.inlineCallbacks
-    def getProxy(self, client):
+    def getProxy(self, client, local_jid):
         """Return the proxy available for this profile
 
         cache is used between clients using the same server
+        @param local_jid(jid.JID): same as for [getCandidates]
         @return ((D)(ProxyInfos, None)): Found proxy infos,
             or None if not acceptable proxy is found
+        @raise exceptions.NotFound: no Proxy found
         """
 
         def notFound(server):
             log.info(u"No proxy found on this server")
             self._cache_proxies[server] = None
-            defer.returnValue(None)
+            raise exceptions.NotFound
 
-        server = client.jid.host
+        server = client.host if client.is_component else client.jid.host
         try:
             defer.returnValue(self._cache_proxies[server])
         except KeyError:
@@ -826,6 +829,7 @@
         except (defer.CancelledError, StopIteration, KeyError):
             notFound(server)
         iq_elt = client.IQ("get")
+        iq_elt["from"] = local_jid.full()
         iq_elt["to"] = proxy.full()
         iq_elt.addElement((NS_BS, "query"))
 
@@ -885,14 +889,21 @@
         defer.returnValue((local_port, self._external_port, local_ips, external_ip))
 
     @defer.inlineCallbacks
-    def getCandidates(self, client):
+    def getCandidates(self, client, local_jid):
         """Return a list of our stream candidates
 
+        @param local_jid(jid.JID): jid to use as local jid
+            This is needed for client which can be addressed with a different jid than
+            client.jid if a local part is used (e.g. piotr@file.example.net where
+            client.jid would be file.example.net)
         @return (D(list[Candidate])): list of candidates, ordered by priority
         """
         server_factory = yield self.getSocks5ServerFactory()
         local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
-        proxy = yield self.getProxy(client)
+        try:
+            proxy = yield self.getProxy(client, local_jid)
+        except exceptions.NotFound:
+            proxy = None
 
         # its time to gather the candidates
         candidates = []
@@ -907,7 +918,7 @@
                 local_port,
                 XEP_0065.TYPE_DIRECT,
                 PRIORITY_BEST_DIRECT,
-                client.jid,
+                local_jid,
                 priority_local=True,
                 factory=server_factory,
             )
@@ -919,7 +930,7 @@
                     local_port,
                     XEP_0065.TYPE_DIRECT,
                     PRIORITY_DIRECT,
-                    client.jid,
+                    local_jid,
                     priority_local=True,
                     factory=server_factory,
                 )
@@ -933,7 +944,7 @@
                     ext_port,
                     XEP_0065.TYPE_ASSISTED,
                     PRIORITY_ASSISTED,
-                    client.jid,
+                    local_jid,
                     priority_local=True,
                     factory=server_factory,
                 )
@@ -1134,24 +1145,26 @@
 
         return failure_
 
-    def startStream(self, client, stream_object, to_jid, sid):
+    def startStream(self, client, stream_object, local_jid, to_jid, sid):
         """Launch the stream workflow
 
         @param streamProducer: stream_object to use
+        @param local_jid(jid.JID): same as for [getCandidates]
         @param to_jid: JID of the recipient
         @param sid: Stream session id
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
         @return (D): Deferred fired when session is finished
         """
-        session_data = self._createSession(client, stream_object, to_jid, sid, True)
+        session_data = self._createSession(
+            client, stream_object, local_jid, to_jid, sid, True)
 
         session_data[client] = client
 
         def gotCandidates(candidates):
             session_data["candidates"] = candidates
             iq_elt = client.IQ()
-            iq_elt["from"] = client.jid.full()
+            iq_elt["from"] = local_jid.full()
             iq_elt["to"] = to_jid.full()
             query_elt = iq_elt.addElement((NS_BS, "query"))
             query_elt["mode"] = "tcp"
@@ -1165,13 +1178,13 @@
                 log.debug(u"Candidate proposed: {}".format(candidate))
 
             d = iq_elt.send()
-            args = [session_data, client]
+            args = [client, session_data, local_jid]
             d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
 
-        self.getCandidates(client).addCallback(gotCandidates)
+        self.getCandidates(client, local_jid).addCallback(gotCandidates)
         return session_data[DEFER_KEY]
 
-    def _IQNegotiationCb(self, iq_elt, session_data, client):
+    def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid):
         """Called when the result of open iq is received
 
         @param session_data(dict): data of the session
@@ -1204,7 +1217,7 @@
             d = self.connectCandidate(client, candidate, session_data["hash"])
             d.addCallback(
                 lambda __: candidate.activate(
-                    session_data["id"], session_data["peer_jid"], client
+                    client, session_data["id"], session_data["peer_jid"], local_jid
                 )
             )
             d.addErrback(self._activationEb)
@@ -1216,7 +1229,7 @@
     def _activationEb(self, failure):
         log.warning(u"Proxy activation error: {}".format(failure.value))
 
-    def _IQNegotiationEb(self, stanza_err, session_data, client):
+    def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid):
         log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value))
         # FIXME: must clean session
 
@@ -1227,10 +1240,12 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, client, stream_object, to_jid, sid, requester=False):
+    def _createSession(self, client, stream_object, local_jid, to_jid, sid,
+                       requester=False):
         """Called when a bytestream is imminent
 
-        @param stream_object(iface.IStreamProducer): File object where data will be written
+        @param stream_object(iface.IStreamProducer): File object where data will be
+            written
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
         @param initiator(bool): if True, this session is create by initiator
@@ -1239,10 +1254,10 @@
         if sid in client.xep_0065_sid_session:
             raise exceptions.ConflictError(u"A session with this id already exists !")
         if requester:
-            session_hash = getSessionHash(client.jid, to_jid, sid)
+            session_hash = getSessionHash(local_jid, to_jid, sid)
             session_data = self._registerHash(client, session_hash, stream_object)
         else:
-            session_hash = getSessionHash(to_jid, client.jid, sid)
+            session_hash = getSessionHash(to_jid, local_jid, sid)
             session_d = defer.Deferred()
             session_d.addBoth(self.killSession, session_hash, sid, client)
             session_data = client._s5b_sessions[session_hash] = {
@@ -1255,6 +1270,7 @@
         session_data.update(
             {
                 "id": sid,
+                "local_jid": local_jid,
                 "peer_jid": to_jid,
                 "stream_object": stream_object,
                 "hash": session_hash,
--- a/sat/plugins/plugin_xep_0096.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0096.py	Sun Apr 28 08:55:13 2019 +0200
@@ -239,7 +239,7 @@
         # file_obj = self._getFileObject(dest_path, can_range)
         # range_offset = file_obj.tell()
         d = data["stream_plugin"].createSession(
-            client, data["stream_object"], data["peer_jid"], data["si_id"]
+            client, data["stream_object"], client.jid, data["peer_jid"], data["si_id"]
         )
         d.addCallback(self._transferCb, client, data)
         d.addErrback(self._transferEb, client, data)
@@ -357,7 +357,8 @@
         stream_object = stream.FileStreamObject(
             self.host, client, filepath, uid=sid, size=size
         )
-        d = plugin.startStream(client, stream_object, jid.JID(iq_elt["from"]), sid)
+        d = plugin.startStream(client, stream_object, client.jid,
+                               jid.JID(iq_elt["from"]), sid)
         d.addCallback(self._sendCb, client, sid, stream_object)
         d.addErrback(self._sendEb, client, sid, stream_object)
 
--- a/sat/plugins/plugin_xep_0166.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0166.py	Sun Apr 28 08:55:13 2019 +0200
@@ -121,7 +121,7 @@
 
     def _buildJingleElt(self, client, session, action):
         iq_elt = client.IQ("set")
-        iq_elt["from"] = client.jid.full()
+        iq_elt["from"] = session['local_jid'].full()
         iq_elt["to"] = session["peer_jid"].full()
         jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
         jingle_elt["sid"] = session["id"]
@@ -353,7 +353,8 @@
         @return D(unicode): jingle session id
         """
         assert contents  # there must be at least one content
-        if peer_jid == client.jid:
+        if (peer_jid == client.jid
+            or client.is_component and peer_jid.host == client.jid.host):
             raise ValueError(_(u"You can't do a jingle session with yourself"))
         initiator = client.jid
         sid = unicode(uuid.uuid4())
@@ -363,6 +364,7 @@
             "state": STATE_PENDING,
             "initiator": initiator,
             "role": XEP_0166.ROLE_INITIATOR,
+            "local_jid": client.jid,
             "peer_jid": peer_jid,
             "started": time.time(),
             "contents": {},
@@ -538,6 +540,9 @@
                 "state": STATE_PENDING,
                 "initiator": peer_jid,
                 "role": XEP_0166.ROLE_RESPONDER,
+                # we store local_jid using request['to'] because for a component the jid
+                # used may not be client.jid (if a local part is used).
+                "local_jid": jid.JID(request['to']),
                 "peer_jid": peer_jid,
                 "started": time.time(),
             }
@@ -832,7 +837,7 @@
         iq_elt, jingle_elt = self._buildJingleElt(
             client, session, XEP_0166.A_SESSION_ACCEPT
         )
-        jingle_elt["responder"] = client.jid.full()
+        jingle_elt["responder"] = session['local_jid'].full()
 
         # contents
 
--- a/sat/plugins/plugin_xep_0234.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0234.py	Sun Apr 28 08:55:13 2019 +0200
@@ -106,21 +106,9 @@
 
     # generic methods
 
-    def buildFileElement(
-        self,
-        name,
-        file_hash=None,
-        hash_algo=None,
-        size=None,
-        mime_type=None,
-        desc=None,
-        modified=None,
-        transfer_range=None,
-        path=None,
-        namespace=None,
-        file_elt=None,
-        **kwargs
-    ):
+    def buildFileElement(self, name, file_hash=None, hash_algo=None, size=None,
+        mime_type=None, desc=None, modified=None, transfer_range=None, path=None,
+        namespace=None, file_elt=None, **kwargs):
         """Generate a <file> element with available metadata
 
         @param file_hash(unicode, None): hash of the file
--- a/sat/plugins/plugin_xep_0260.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0260.py	Sun Apr 28 08:55:13 2019 +0200
@@ -140,13 +140,14 @@
         transport_data = content_data["transport_data"]
         sid = transport_data["sid"] = unicode(uuid.uuid4())
         session_hash = transport_data["session_hash"] = self._s5b.getSessionHash(
-            client.jid, session["peer_jid"], sid
+            session[u"local_jid"], session["peer_jid"], sid
         )
         transport_data["peer_session_hash"] = self._s5b.getSessionHash(
-            session["peer_jid"], client.jid, sid
+            session["peer_jid"], session[u"local_jid"], sid
         )  # requester and target are inversed for peer candidates
         transport_data["stream_d"] = self._s5b.registerHash(client, session_hash, None)
-        candidates = transport_data["candidates"] = yield self._s5b.getCandidates(client)
+        candidates = transport_data["candidates"] = yield self._s5b.getCandidates(
+            client, session["local_jid"])
         mode = "tcp"  # XXX: we only manage tcp for now
         transport_elt = self._buildCandidates(
             session, candidates, sid, session_hash, client, mode
@@ -249,7 +250,7 @@
                 log.debug(
                     u"Candidates have same priority, we select the one choosed by initiator"
                 )
-                if session["initiator"] == client.jid:
+                if session["initiator"] == session[u"local_jid"]:
                     choosed_candidate = best_candidate
                 else:
                     choosed_candidate = peer_best_candidate
@@ -425,12 +426,12 @@
             assert "peer_candidates" not in transport_data
             sid = transport_data["sid"] = transport_elt["sid"]
             session_hash = transport_data["session_hash"] = self._s5b.getSessionHash(
-                client.jid, session["peer_jid"], sid
+                session["local_jid"], session["peer_jid"], sid
             )
             peer_session_hash = transport_data[
                 "peer_session_hash"
             ] = self._s5b.getSessionHash(
-                session["peer_jid"], client.jid, sid
+                session["peer_jid"], session["local_jid"], sid
             )  # requester and target are inversed for peer candidates
             peer_candidates = transport_data["peer_candidates"] = self._parseCandidates(
                 transport_elt
@@ -444,7 +445,7 @@
             d.addCallback(
                 self._foundPeerCandidate, session, transport_data, content_name, client
             )
-            candidates = yield self._s5b.getCandidates(client)
+            candidates = yield self._s5b.getCandidates(client, session["local_jid"])
             # we remove duplicate candidates
             candidates = [
                 candidate for candidate in candidates if candidate not in peer_candidates
--- a/sat/plugins/plugin_xep_0261.py	Sun Apr 28 08:55:13 2019 +0200
+++ b/sat/plugins/plugin_xep_0261.py	Sun Apr 28 08:55:13 2019 +0200
@@ -84,17 +84,19 @@
         elif action in (self._j.A_SESSION_INITIATE, self._j.A_TRANSPORT_REPLACE):
             transport_data["sid"] = transport_elt["sid"]
         elif action in (self._j.A_START, self._j.A_PREPARE_RESPONDER):
+            local_jid = session["local_jid"]
             peer_jid = session["peer_jid"]
             sid = transport_data["sid"]
             stream_object = content_data["stream_object"]
             if action == self._j.A_START:
                 block_size = transport_data["block_size"]
                 d = self._ibb.startStream(
-                    client, stream_object, peer_jid, sid, block_size
+                    client, stream_object, local_jid, peer_jid, sid, block_size
                 )
                 d.chainDeferred(content_data["finished_d"])
             else:
-                d = self._ibb.createSession(client, stream_object, peer_jid, sid)
+                d = self._ibb.createSession(
+                    client, stream_object, local_jid, peer_jid, sid)
                 d.chainDeferred(content_data["finished_d"])
         else:
             log.warning(u"FIXME: unmanaged action {}".format(action))