Mercurial > libervia-backend
changeset 2927:69e4716d6268
plugins (jingle) file transfer: use initial "from" attribute as local jid instead of client.jid:
while client.jid is fine in a client context, for components it's not the right jid to use: it is the jid of the component itself while the file transfer/jingle session entity may be established with this jid + a local part (e.g. if client is files.example.net, session may be established with louise@files.example.net, in which case "from" is louise@files.example.net, while client.jid will be files.example.net).
As a consequence, using client.jid was causing trouble with components.
This patch fixes it for jingle and plugins linked to file transfer by keeping a "local_jid" variable in the session, where the jid from the original "from" attribute is used.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 28 Apr 2019 08:55:13 +0200 |
parents | 4cd7545c4ebb |
children | c0f6fd75af5f |
files | sat/plugins/plugin_xep_0047.py sat/plugins/plugin_xep_0065.py sat/plugins/plugin_xep_0096.py sat/plugins/plugin_xep_0166.py sat/plugins/plugin_xep_0234.py sat/plugins/plugin_xep_0260.py sat/plugins/plugin_xep_0261.py |
diffstat | 7 files changed, 75 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0047.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0047.py Sun Apr 28 08:55:13 2019 +0200 @@ -131,10 +131,11 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, client, stream_object, to_jid, sid): + def _createSession(self, client, stream_object, local_jid, to_jid, sid): """Called when a bytestream is imminent @param stream_object(IConsumer): stream object where data will be written + @param local_jid(jid.JID): same as [startStream] @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id @return (dict): session data @@ -144,6 +145,7 @@ session_data = client.xep_0047_current_stream[sid] = { "id": sid, DEFER_KEY: defer.Deferred(), + "local_jid": local_jid, "to": to_jid, "stream_object": stream_object, "seq": -1, @@ -285,15 +287,19 @@ self._killSession(sid, client, error_condition) client.send(iq_elt) - def startStream(self, client, stream_object, to_jid, sid, block_size=None): + def startStream(self, client, stream_object, local_jid, to_jid, sid, block_size=None): """Launch the stream workflow @param stream_object(ifaces.IStreamProducer): stream object to send + @param local_jid(jid.JID): jid to use as local jid + This is needed for client which can be addressed with a different jid than + client.jid if a local part is used (e.g. piotr@file.example.net where + client.jid would be file.example.net) @param to_jid(jid.JID): JID of the recipient @param sid(unicode): Stream session id @param block_size(int, None): size of the block (or None for default) """ - session_data = self._createSession(client, stream_object, to_jid, sid) + session_data = self._createSession(client, stream_object, local_jid, to_jid, sid) if block_size is None: block_size = XEP_0047.BLOCK_SIZE @@ -301,6 +307,7 @@ session_data["block_size"] = block_size iq_elt = client.IQ() + iq_elt["from"] = local_jid.full() iq_elt["to"] = to_jid.full() open_elt = iq_elt.addElement((NS_IBB, "open")) open_elt["block-size"] = str(block_size) @@ -323,6 +330,7 @@ buffer_ = session_data["stream_object"].read(session_data["block_size"]) if buffer_: next_iq_elt = client.IQ() + next_iq_elt["from"] = session_data["local_jid"].full() next_iq_elt["to"] = session_data["to"].full() data_elt = next_iq_elt.addElement((NS_IBB, "data")) seq = session_data["seq"] = (session_data["seq"] + 1) % 65535 @@ -350,6 +358,7 @@ @param failure_reason(unicode, None): reason of the failure, or None if steam was successful """ iq_elt = client.IQ() + iq_elt["from"] = session_data["local_jid"].full() iq_elt["to"] = session_data["to"].full() close_elt = iq_elt.addElement((NS_IBB, "close")) close_elt["sid"] = session_data["id"]
--- a/sat/plugins/plugin_xep_0065.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0065.py Sun Apr 28 08:55:13 2019 +0200 @@ -273,7 +273,7 @@ raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) return 2 ** 16 * multiplier + self._local_priority - def activate(self, sid, peer_jid, client): + def activate(self, client, sid, peer_jid, local_jid): """Activate the proxy candidate Send activation request as explained in XEP-0065 ยง 6.3.5 @@ -284,6 +284,7 @@ """ assert self.type == XEP_0065.TYPE_PROXY iq_elt = client.IQ() + iq_elt["from"] = local_jid.full() iq_elt["to"] = self.jid.full() query_elt = iq_elt.addElement((NS_BS, "query")) query_elt["sid"] = sid @@ -801,20 +802,22 @@ return self._server_factory @defer.inlineCallbacks - def getProxy(self, client): + def getProxy(self, client, local_jid): """Return the proxy available for this profile cache is used between clients using the same server + @param local_jid(jid.JID): same as for [getCandidates] @return ((D)(ProxyInfos, None)): Found proxy infos, or None if not acceptable proxy is found + @raise exceptions.NotFound: no Proxy found """ def notFound(server): log.info(u"No proxy found on this server") self._cache_proxies[server] = None - defer.returnValue(None) + raise exceptions.NotFound - server = client.jid.host + server = client.host if client.is_component else client.jid.host try: defer.returnValue(self._cache_proxies[server]) except KeyError: @@ -826,6 +829,7 @@ except (defer.CancelledError, StopIteration, KeyError): notFound(server) iq_elt = client.IQ("get") + iq_elt["from"] = local_jid.full() iq_elt["to"] = proxy.full() iq_elt.addElement((NS_BS, "query")) @@ -885,14 +889,21 @@ defer.returnValue((local_port, self._external_port, local_ips, external_ip)) @defer.inlineCallbacks - def getCandidates(self, client): + def getCandidates(self, client, local_jid): """Return a list of our stream candidates + @param local_jid(jid.JID): jid to use as local jid + This is needed for client which can be addressed with a different jid than + client.jid if a local part is used (e.g. piotr@file.example.net where + client.jid would be file.example.net) @return (D(list[Candidate])): list of candidates, ordered by priority """ server_factory = yield self.getSocks5ServerFactory() local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) - proxy = yield self.getProxy(client) + try: + proxy = yield self.getProxy(client, local_jid) + except exceptions.NotFound: + proxy = None # its time to gather the candidates candidates = [] @@ -907,7 +918,7 @@ local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, - client.jid, + local_jid, priority_local=True, factory=server_factory, ) @@ -919,7 +930,7 @@ local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, - client.jid, + local_jid, priority_local=True, factory=server_factory, ) @@ -933,7 +944,7 @@ ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, - client.jid, + local_jid, priority_local=True, factory=server_factory, ) @@ -1134,24 +1145,26 @@ return failure_ - def startStream(self, client, stream_object, to_jid, sid): + def startStream(self, client, stream_object, local_jid, to_jid, sid): """Launch the stream workflow @param streamProducer: stream_object to use + @param local_jid(jid.JID): same as for [getCandidates] @param to_jid: JID of the recipient @param sid: Stream session id @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @return (D): Deferred fired when session is finished """ - session_data = self._createSession(client, stream_object, to_jid, sid, True) + session_data = self._createSession( + client, stream_object, local_jid, to_jid, sid, True) session_data[client] = client def gotCandidates(candidates): session_data["candidates"] = candidates iq_elt = client.IQ() - iq_elt["from"] = client.jid.full() + iq_elt["from"] = local_jid.full() iq_elt["to"] = to_jid.full() query_elt = iq_elt.addElement((NS_BS, "query")) query_elt["mode"] = "tcp" @@ -1165,13 +1178,13 @@ log.debug(u"Candidate proposed: {}".format(candidate)) d = iq_elt.send() - args = [session_data, client] + args = [client, session_data, local_jid] d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) - self.getCandidates(client).addCallback(gotCandidates) + self.getCandidates(client, local_jid).addCallback(gotCandidates) return session_data[DEFER_KEY] - def _IQNegotiationCb(self, iq_elt, session_data, client): + def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid): """Called when the result of open iq is received @param session_data(dict): data of the session @@ -1204,7 +1217,7 @@ d = self.connectCandidate(client, candidate, session_data["hash"]) d.addCallback( lambda __: candidate.activate( - session_data["id"], session_data["peer_jid"], client + client, session_data["id"], session_data["peer_jid"], local_jid ) ) d.addErrback(self._activationEb) @@ -1216,7 +1229,7 @@ def _activationEb(self, failure): log.warning(u"Proxy activation error: {}".format(failure.value)) - def _IQNegotiationEb(self, stanza_err, session_data, client): + def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid): log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) # FIXME: must clean session @@ -1227,10 +1240,12 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, client, stream_object, to_jid, sid, requester=False): + def _createSession(self, client, stream_object, local_jid, to_jid, sid, + requester=False): """Called when a bytestream is imminent - @param stream_object(iface.IStreamProducer): File object where data will be written + @param stream_object(iface.IStreamProducer): File object where data will be + written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id @param initiator(bool): if True, this session is create by initiator @@ -1239,10 +1254,10 @@ if sid in client.xep_0065_sid_session: raise exceptions.ConflictError(u"A session with this id already exists !") if requester: - session_hash = getSessionHash(client.jid, to_jid, sid) + session_hash = getSessionHash(local_jid, to_jid, sid) session_data = self._registerHash(client, session_hash, stream_object) else: - session_hash = getSessionHash(to_jid, client.jid, sid) + session_hash = getSessionHash(to_jid, local_jid, sid) session_d = defer.Deferred() session_d.addBoth(self.killSession, session_hash, sid, client) session_data = client._s5b_sessions[session_hash] = { @@ -1255,6 +1270,7 @@ session_data.update( { "id": sid, + "local_jid": local_jid, "peer_jid": to_jid, "stream_object": stream_object, "hash": session_hash,
--- a/sat/plugins/plugin_xep_0096.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0096.py Sun Apr 28 08:55:13 2019 +0200 @@ -239,7 +239,7 @@ # file_obj = self._getFileObject(dest_path, can_range) # range_offset = file_obj.tell() d = data["stream_plugin"].createSession( - client, data["stream_object"], data["peer_jid"], data["si_id"] + client, data["stream_object"], client.jid, data["peer_jid"], data["si_id"] ) d.addCallback(self._transferCb, client, data) d.addErrback(self._transferEb, client, data) @@ -357,7 +357,8 @@ stream_object = stream.FileStreamObject( self.host, client, filepath, uid=sid, size=size ) - d = plugin.startStream(client, stream_object, jid.JID(iq_elt["from"]), sid) + d = plugin.startStream(client, stream_object, client.jid, + jid.JID(iq_elt["from"]), sid) d.addCallback(self._sendCb, client, sid, stream_object) d.addErrback(self._sendEb, client, sid, stream_object)
--- a/sat/plugins/plugin_xep_0166.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0166.py Sun Apr 28 08:55:13 2019 +0200 @@ -121,7 +121,7 @@ def _buildJingleElt(self, client, session, action): iq_elt = client.IQ("set") - iq_elt["from"] = client.jid.full() + iq_elt["from"] = session['local_jid'].full() iq_elt["to"] = session["peer_jid"].full() jingle_elt = iq_elt.addElement("jingle", NS_JINGLE) jingle_elt["sid"] = session["id"] @@ -353,7 +353,8 @@ @return D(unicode): jingle session id """ assert contents # there must be at least one content - if peer_jid == client.jid: + if (peer_jid == client.jid + or client.is_component and peer_jid.host == client.jid.host): raise ValueError(_(u"You can't do a jingle session with yourself")) initiator = client.jid sid = unicode(uuid.uuid4()) @@ -363,6 +364,7 @@ "state": STATE_PENDING, "initiator": initiator, "role": XEP_0166.ROLE_INITIATOR, + "local_jid": client.jid, "peer_jid": peer_jid, "started": time.time(), "contents": {}, @@ -538,6 +540,9 @@ "state": STATE_PENDING, "initiator": peer_jid, "role": XEP_0166.ROLE_RESPONDER, + # we store local_jid using request['to'] because for a component the jid + # used may not be client.jid (if a local part is used). + "local_jid": jid.JID(request['to']), "peer_jid": peer_jid, "started": time.time(), } @@ -832,7 +837,7 @@ iq_elt, jingle_elt = self._buildJingleElt( client, session, XEP_0166.A_SESSION_ACCEPT ) - jingle_elt["responder"] = client.jid.full() + jingle_elt["responder"] = session['local_jid'].full() # contents
--- a/sat/plugins/plugin_xep_0234.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0234.py Sun Apr 28 08:55:13 2019 +0200 @@ -106,21 +106,9 @@ # generic methods - def buildFileElement( - self, - name, - file_hash=None, - hash_algo=None, - size=None, - mime_type=None, - desc=None, - modified=None, - transfer_range=None, - path=None, - namespace=None, - file_elt=None, - **kwargs - ): + def buildFileElement(self, name, file_hash=None, hash_algo=None, size=None, + mime_type=None, desc=None, modified=None, transfer_range=None, path=None, + namespace=None, file_elt=None, **kwargs): """Generate a <file> element with available metadata @param file_hash(unicode, None): hash of the file
--- a/sat/plugins/plugin_xep_0260.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0260.py Sun Apr 28 08:55:13 2019 +0200 @@ -140,13 +140,14 @@ transport_data = content_data["transport_data"] sid = transport_data["sid"] = unicode(uuid.uuid4()) session_hash = transport_data["session_hash"] = self._s5b.getSessionHash( - client.jid, session["peer_jid"], sid + session[u"local_jid"], session["peer_jid"], sid ) transport_data["peer_session_hash"] = self._s5b.getSessionHash( - session["peer_jid"], client.jid, sid + session["peer_jid"], session[u"local_jid"], sid ) # requester and target are inversed for peer candidates transport_data["stream_d"] = self._s5b.registerHash(client, session_hash, None) - candidates = transport_data["candidates"] = yield self._s5b.getCandidates(client) + candidates = transport_data["candidates"] = yield self._s5b.getCandidates( + client, session["local_jid"]) mode = "tcp" # XXX: we only manage tcp for now transport_elt = self._buildCandidates( session, candidates, sid, session_hash, client, mode @@ -249,7 +250,7 @@ log.debug( u"Candidates have same priority, we select the one choosed by initiator" ) - if session["initiator"] == client.jid: + if session["initiator"] == session[u"local_jid"]: choosed_candidate = best_candidate else: choosed_candidate = peer_best_candidate @@ -425,12 +426,12 @@ assert "peer_candidates" not in transport_data sid = transport_data["sid"] = transport_elt["sid"] session_hash = transport_data["session_hash"] = self._s5b.getSessionHash( - client.jid, session["peer_jid"], sid + session["local_jid"], session["peer_jid"], sid ) peer_session_hash = transport_data[ "peer_session_hash" ] = self._s5b.getSessionHash( - session["peer_jid"], client.jid, sid + session["peer_jid"], session["local_jid"], sid ) # requester and target are inversed for peer candidates peer_candidates = transport_data["peer_candidates"] = self._parseCandidates( transport_elt @@ -444,7 +445,7 @@ d.addCallback( self._foundPeerCandidate, session, transport_data, content_name, client ) - candidates = yield self._s5b.getCandidates(client) + candidates = yield self._s5b.getCandidates(client, session["local_jid"]) # we remove duplicate candidates candidates = [ candidate for candidate in candidates if candidate not in peer_candidates
--- a/sat/plugins/plugin_xep_0261.py Sun Apr 28 08:55:13 2019 +0200 +++ b/sat/plugins/plugin_xep_0261.py Sun Apr 28 08:55:13 2019 +0200 @@ -84,17 +84,19 @@ elif action in (self._j.A_SESSION_INITIATE, self._j.A_TRANSPORT_REPLACE): transport_data["sid"] = transport_elt["sid"] elif action in (self._j.A_START, self._j.A_PREPARE_RESPONDER): + local_jid = session["local_jid"] peer_jid = session["peer_jid"] sid = transport_data["sid"] stream_object = content_data["stream_object"] if action == self._j.A_START: block_size = transport_data["block_size"] d = self._ibb.startStream( - client, stream_object, peer_jid, sid, block_size + client, stream_object, local_jid, peer_jid, sid, block_size ) d.chainDeferred(content_data["finished_d"]) else: - d = self._ibb.createSession(client, stream_object, peer_jid, sid) + d = self._ibb.createSession( + client, stream_object, local_jid, peer_jid, sid) d.chainDeferred(content_data["finished_d"]) else: log.warning(u"FIXME: unmanaged action {}".format(action))