comparison sat/plugins/plugin_xep_0065.py @ 2927:69e4716d6268

plugins (jingle) file transfer: use the initial "from" attribute as local jid instead of client.jid. While client.jid is fine in a client context, it is not the right jid to use for components: it is the jid of the component itself, whereas the file transfer/jingle session may be established with this jid plus a local part (e.g. if the client is files.example.net, the session may be established with louise@files.example.net, in which case "from" is louise@files.example.net while client.jid is files.example.net). As a consequence, using client.jid was causing trouble with components. This patch fixes it for jingle and for the plugins linked to file transfer by keeping a "local_jid" variable in the session, set to the jid taken from the original "from" attribute.
author Goffi <goffi@goffi.org>
date Sun, 28 Apr 2019 08:55:13 +0200
parents 003b8b4b56a7
children ab2696e34d29
comparison
equal deleted inserted replaced
2926:4cd7545c4ebb 2927:69e4716d6268
271 multiplier = 10 271 multiplier = 10
272 else: 272 else:
273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) 273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
274 return 2 ** 16 * multiplier + self._local_priority 274 return 2 ** 16 * multiplier + self._local_priority
275 275
276 def activate(self, sid, peer_jid, client): 276 def activate(self, client, sid, peer_jid, local_jid):
277 """Activate the proxy candidate 277 """Activate the proxy candidate
278 278
279 Send activation request as explained in XEP-0065 § 6.3.5 279 Send activation request as explained in XEP-0065 § 6.3.5
280 Must only be used with proxy candidates 280 Must only be used with proxy candidates
281 @param sid(unicode): session id (same as for getSessionHash) 281 @param sid(unicode): session id (same as for getSessionHash)
282 @param peer_jid(jid.JID): jid of the other peer 282 @param peer_jid(jid.JID): jid of the other peer
283 @return (D(domish.Element)): IQ result (or error) 283 @return (D(domish.Element)): IQ result (or error)
284 """ 284 """
285 assert self.type == XEP_0065.TYPE_PROXY 285 assert self.type == XEP_0065.TYPE_PROXY
286 iq_elt = client.IQ() 286 iq_elt = client.IQ()
287 iq_elt["from"] = local_jid.full()
287 iq_elt["to"] = self.jid.full() 288 iq_elt["to"] = self.jid.full()
288 query_elt = iq_elt.addElement((NS_BS, "query")) 289 query_elt = iq_elt.addElement((NS_BS, "query"))
289 query_elt["sid"] = sid 290 query_elt["sid"] = sid
290 query_elt.addElement("activate", content=peer_jid.full()) 291 query_elt.addElement("activate", content=peer_jid.full())
291 return iq_elt.send() 292 return iq_elt.send()
799 ) 800 )
800 ) 801 )
801 return self._server_factory 802 return self._server_factory
802 803
803 @defer.inlineCallbacks 804 @defer.inlineCallbacks
804 def getProxy(self, client): 805 def getProxy(self, client, local_jid):
805 """Return the proxy available for this profile 806 """Return the proxy available for this profile
806 807
807 cache is used between clients using the same server 808 cache is used between clients using the same server
809 @param local_jid(jid.JID): same as for [getCandidates]
808 @return ((D)(ProxyInfos, None)): Found proxy infos, 810 @return ((D)(ProxyInfos, None)): Found proxy infos,
809 or None if not acceptable proxy is found 811 or None if not acceptable proxy is found
812 @raise exceptions.NotFound: no Proxy found
810 """ 813 """
811 814
812 def notFound(server): 815 def notFound(server):
813 log.info(u"No proxy found on this server") 816 log.info(u"No proxy found on this server")
814 self._cache_proxies[server] = None 817 self._cache_proxies[server] = None
815 defer.returnValue(None) 818 raise exceptions.NotFound
816 819
817 server = client.jid.host 820 server = client.host if client.is_component else client.jid.host
818 try: 821 try:
819 defer.returnValue(self._cache_proxies[server]) 822 defer.returnValue(self._cache_proxies[server])
820 except KeyError: 823 except KeyError:
821 pass 824 pass
822 try: 825 try:
824 yield self.host.findServiceEntities(client, "proxy", "bytestreams") 827 yield self.host.findServiceEntities(client, "proxy", "bytestreams")
825 ).pop() 828 ).pop()
826 except (defer.CancelledError, StopIteration, KeyError): 829 except (defer.CancelledError, StopIteration, KeyError):
827 notFound(server) 830 notFound(server)
828 iq_elt = client.IQ("get") 831 iq_elt = client.IQ("get")
832 iq_elt["from"] = local_jid.full()
829 iq_elt["to"] = proxy.full() 833 iq_elt["to"] = proxy.full()
830 iq_elt.addElement((NS_BS, "query")) 834 iq_elt.addElement((NS_BS, "query"))
831 835
832 try: 836 try:
833 result_elt = yield iq_elt.send() 837 result_elt = yield iq_elt.send()
883 self._external_port = ext_port 887 self._external_port = ext_port
884 888
885 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) 889 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
886 890
887 @defer.inlineCallbacks 891 @defer.inlineCallbacks
888 def getCandidates(self, client): 892 def getCandidates(self, client, local_jid):
889 """Return a list of our stream candidates 893 """Return a list of our stream candidates
890 894
895 @param local_jid(jid.JID): jid to use as local jid
896 This is needed for client which can be addressed with a different jid than
897 client.jid if a local part is used (e.g. piotr@file.example.net where
898 client.jid would be file.example.net)
891 @return (D(list[Candidate])): list of candidates, ordered by priority 899 @return (D(list[Candidate])): list of candidates, ordered by priority
892 """ 900 """
893 server_factory = yield self.getSocks5ServerFactory() 901 server_factory = yield self.getSocks5ServerFactory()
894 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) 902 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
895 proxy = yield self.getProxy(client) 903 try:
904 proxy = yield self.getProxy(client, local_jid)
905 except exceptions.NotFound:
906 proxy = None
896 907
897 # its time to gather the candidates 908 # its time to gather the candidates
898 candidates = [] 909 candidates = []
899 910
900 # first the direct ones 911 # first the direct ones
905 Candidate( 916 Candidate(
906 ip, 917 ip,
907 local_port, 918 local_port,
908 XEP_0065.TYPE_DIRECT, 919 XEP_0065.TYPE_DIRECT,
909 PRIORITY_BEST_DIRECT, 920 PRIORITY_BEST_DIRECT,
910 client.jid, 921 local_jid,
911 priority_local=True, 922 priority_local=True,
912 factory=server_factory, 923 factory=server_factory,
913 ) 924 )
914 ) 925 )
915 for ip in local_ips: 926 for ip in local_ips:
917 Candidate( 928 Candidate(
918 ip, 929 ip,
919 local_port, 930 local_port,
920 XEP_0065.TYPE_DIRECT, 931 XEP_0065.TYPE_DIRECT,
921 PRIORITY_DIRECT, 932 PRIORITY_DIRECT,
922 client.jid, 933 local_jid,
923 priority_local=True, 934 priority_local=True,
924 factory=server_factory, 935 factory=server_factory,
925 ) 936 )
926 ) 937 )
927 938
931 Candidate( 942 Candidate(
932 external_ip, 943 external_ip,
933 ext_port, 944 ext_port,
934 XEP_0065.TYPE_ASSISTED, 945 XEP_0065.TYPE_ASSISTED,
935 PRIORITY_ASSISTED, 946 PRIORITY_ASSISTED,
936 client.jid, 947 local_jid,
937 priority_local=True, 948 priority_local=True,
938 factory=server_factory, 949 factory=server_factory,
939 ) 950 )
940 ) 951 )
941 952
1132 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): 1143 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1133 pass 1144 pass
1134 1145
1135 return failure_ 1146 return failure_
1136 1147
1137 def startStream(self, client, stream_object, to_jid, sid): 1148 def startStream(self, client, stream_object, local_jid, to_jid, sid):
1138 """Launch the stream workflow 1149 """Launch the stream workflow
1139 1150
1140 @param streamProducer: stream_object to use 1151 @param streamProducer: stream_object to use
1152 @param local_jid(jid.JID): same as for [getCandidates]
1141 @param to_jid: JID of the recipient 1153 @param to_jid: JID of the recipient
1142 @param sid: Stream session id 1154 @param sid: Stream session id
1143 @param successCb: method to call when stream successfuly finished 1155 @param successCb: method to call when stream successfuly finished
1144 @param failureCb: method to call when something goes wrong 1156 @param failureCb: method to call when something goes wrong
1145 @return (D): Deferred fired when session is finished 1157 @return (D): Deferred fired when session is finished
1146 """ 1158 """
1147 session_data = self._createSession(client, stream_object, to_jid, sid, True) 1159 session_data = self._createSession(
1160 client, stream_object, local_jid, to_jid, sid, True)
1148 1161
1149 session_data[client] = client 1162 session_data[client] = client
1150 1163
1151 def gotCandidates(candidates): 1164 def gotCandidates(candidates):
1152 session_data["candidates"] = candidates 1165 session_data["candidates"] = candidates
1153 iq_elt = client.IQ() 1166 iq_elt = client.IQ()
1154 iq_elt["from"] = client.jid.full() 1167 iq_elt["from"] = local_jid.full()
1155 iq_elt["to"] = to_jid.full() 1168 iq_elt["to"] = to_jid.full()
1156 query_elt = iq_elt.addElement((NS_BS, "query")) 1169 query_elt = iq_elt.addElement((NS_BS, "query"))
1157 query_elt["mode"] = "tcp" 1170 query_elt["mode"] = "tcp"
1158 query_elt["sid"] = sid 1171 query_elt["sid"] = sid
1159 1172
1163 streamhost["port"] = str(candidate.port) 1176 streamhost["port"] = str(candidate.port)
1164 streamhost["jid"] = candidate.jid.full() 1177 streamhost["jid"] = candidate.jid.full()
1165 log.debug(u"Candidate proposed: {}".format(candidate)) 1178 log.debug(u"Candidate proposed: {}".format(candidate))
1166 1179
1167 d = iq_elt.send() 1180 d = iq_elt.send()
1168 args = [session_data, client] 1181 args = [client, session_data, local_jid]
1169 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1182 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1170 1183
1171 self.getCandidates(client).addCallback(gotCandidates) 1184 self.getCandidates(client, local_jid).addCallback(gotCandidates)
1172 return session_data[DEFER_KEY] 1185 return session_data[DEFER_KEY]
1173 1186
1174 def _IQNegotiationCb(self, iq_elt, session_data, client): 1187 def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid):
1175 """Called when the result of open iq is received 1188 """Called when the result of open iq is received
1176 1189
1177 @param session_data(dict): data of the session 1190 @param session_data(dict): data of the session
1178 @param client: %(doc_client)s 1191 @param client: %(doc_client)s
1179 @param iq_elt(domish.Element): <iq> result 1192 @param iq_elt(domish.Element): <iq> result
1202 if candidate.type == XEP_0065.TYPE_PROXY: 1215 if candidate.type == XEP_0065.TYPE_PROXY:
1203 log.info(u"A Socks5 proxy is used") 1216 log.info(u"A Socks5 proxy is used")
1204 d = self.connectCandidate(client, candidate, session_data["hash"]) 1217 d = self.connectCandidate(client, candidate, session_data["hash"])
1205 d.addCallback( 1218 d.addCallback(
1206 lambda __: candidate.activate( 1219 lambda __: candidate.activate(
1207 session_data["id"], session_data["peer_jid"], client 1220 client, session_data["id"], session_data["peer_jid"], local_jid
1208 ) 1221 )
1209 ) 1222 )
1210 d.addErrback(self._activationEb) 1223 d.addErrback(self._activationEb)
1211 else: 1224 else:
1212 d = defer.succeed(None) 1225 d = defer.succeed(None)
1214 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) 1227 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"]))
1215 1228
1216 def _activationEb(self, failure): 1229 def _activationEb(self, failure):
1217 log.warning(u"Proxy activation error: {}".format(failure.value)) 1230 log.warning(u"Proxy activation error: {}".format(failure.value))
1218 1231
1219 def _IQNegotiationEb(self, stanza_err, session_data, client): 1232 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid):
1220 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) 1233 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value))
1221 # FIXME: must clean session 1234 # FIXME: must clean session
1222 1235
1223 def createSession(self, *args, **kwargs): 1236 def createSession(self, *args, **kwargs):
1224 """like [_createSession] but return the session deferred instead of the whole session 1237 """like [_createSession] but return the session deferred instead of the whole session
1225 1238
1226 session deferred is fired when transfer is finished 1239 session deferred is fired when transfer is finished
1227 """ 1240 """
1228 return self._createSession(*args, **kwargs)[DEFER_KEY] 1241 return self._createSession(*args, **kwargs)[DEFER_KEY]
1229 1242
1230 def _createSession(self, client, stream_object, to_jid, sid, requester=False): 1243 def _createSession(self, client, stream_object, local_jid, to_jid, sid,
1244 requester=False):
1231 """Called when a bytestream is imminent 1245 """Called when a bytestream is imminent
1232 1246
1233 @param stream_object(iface.IStreamProducer): File object where data will be written 1247 @param stream_object(iface.IStreamProducer): File object where data will be
1248 written
1234 @param to_jid(jid.JId): jid of the other peer 1249 @param to_jid(jid.JId): jid of the other peer
1235 @param sid(unicode): session id 1250 @param sid(unicode): session id
1236 @param initiator(bool): if True, this session is create by initiator 1251 @param initiator(bool): if True, this session is create by initiator
1237 @return (dict): session data 1252 @return (dict): session data
1238 """ 1253 """
1239 if sid in client.xep_0065_sid_session: 1254 if sid in client.xep_0065_sid_session:
1240 raise exceptions.ConflictError(u"A session with this id already exists !") 1255 raise exceptions.ConflictError(u"A session with this id already exists !")
1241 if requester: 1256 if requester:
1242 session_hash = getSessionHash(client.jid, to_jid, sid) 1257 session_hash = getSessionHash(local_jid, to_jid, sid)
1243 session_data = self._registerHash(client, session_hash, stream_object) 1258 session_data = self._registerHash(client, session_hash, stream_object)
1244 else: 1259 else:
1245 session_hash = getSessionHash(to_jid, client.jid, sid) 1260 session_hash = getSessionHash(to_jid, local_jid, sid)
1246 session_d = defer.Deferred() 1261 session_d = defer.Deferred()
1247 session_d.addBoth(self.killSession, session_hash, sid, client) 1262 session_d.addBoth(self.killSession, session_hash, sid, client)
1248 session_data = client._s5b_sessions[session_hash] = { 1263 session_data = client._s5b_sessions[session_hash] = {
1249 DEFER_KEY: session_d, 1264 DEFER_KEY: session_d,
1250 TIMER_KEY: reactor.callLater( 1265 TIMER_KEY: reactor.callLater(
1253 } 1268 }
1254 client.xep_0065_sid_session[sid] = session_data 1269 client.xep_0065_sid_session[sid] = session_data
1255 session_data.update( 1270 session_data.update(
1256 { 1271 {
1257 "id": sid, 1272 "id": sid,
1273 "local_jid": local_jid,
1258 "peer_jid": to_jid, 1274 "peer_jid": to_jid,
1259 "stream_object": stream_object, 1275 "stream_object": stream_object,
1260 "hash": session_hash, 1276 "hash": session_hash,
1261 } 1277 }
1262 ) 1278 )