Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0065.py @ 2927:69e4716d6268
plugins (jingle) file transfer: use initial "from" attribute as local jid instead of client.jid:
While client.jid is fine in a client context, it is not the right jid to use for components: it is the jid of the component itself, whereas the file transfer/jingle session may be established with this jid plus a local part (e.g. if the client is files.example.net, the session may be established with louise@files.example.net, in which case "from" is louise@files.example.net while client.jid is files.example.net).
As a consequence, using client.jid was causing trouble with components.
This patch fixes it for jingle and for the plugins linked to file transfer by keeping a "local_jid" value in the session, set to the jid taken from the original "from" attribute.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 28 Apr 2019 08:55:13 +0200 |
parents | 003b8b4b56a7 |
children | ab2696e34d29 |
comparison
equal
deleted
inserted
replaced
2926:4cd7545c4ebb | 2927:69e4716d6268 |
---|---|
271 multiplier = 10 | 271 multiplier = 10 |
272 else: | 272 else: |
273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) | 273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) |
274 return 2 ** 16 * multiplier + self._local_priority | 274 return 2 ** 16 * multiplier + self._local_priority |
275 | 275 |
276 def activate(self, sid, peer_jid, client): | 276 def activate(self, client, sid, peer_jid, local_jid): |
277 """Activate the proxy candidate | 277 """Activate the proxy candidate |
278 | 278 |
279 Send activation request as explained in XEP-0065 § 6.3.5 | 279 Send activation request as explained in XEP-0065 § 6.3.5 |
280 Must only be used with proxy candidates | 280 Must only be used with proxy candidates |
281 @param sid(unicode): session id (same as for getSessionHash) | 281 @param sid(unicode): session id (same as for getSessionHash) |
282 @param peer_jid(jid.JID): jid of the other peer | 282 @param peer_jid(jid.JID): jid of the other peer |
283 @return (D(domish.Element)): IQ result (or error) | 283 @return (D(domish.Element)): IQ result (or error) |
284 """ | 284 """ |
285 assert self.type == XEP_0065.TYPE_PROXY | 285 assert self.type == XEP_0065.TYPE_PROXY |
286 iq_elt = client.IQ() | 286 iq_elt = client.IQ() |
287 iq_elt["from"] = local_jid.full() | |
287 iq_elt["to"] = self.jid.full() | 288 iq_elt["to"] = self.jid.full() |
288 query_elt = iq_elt.addElement((NS_BS, "query")) | 289 query_elt = iq_elt.addElement((NS_BS, "query")) |
289 query_elt["sid"] = sid | 290 query_elt["sid"] = sid |
290 query_elt.addElement("activate", content=peer_jid.full()) | 291 query_elt.addElement("activate", content=peer_jid.full()) |
291 return iq_elt.send() | 292 return iq_elt.send() |
799 ) | 800 ) |
800 ) | 801 ) |
801 return self._server_factory | 802 return self._server_factory |
802 | 803 |
803 @defer.inlineCallbacks | 804 @defer.inlineCallbacks |
804 def getProxy(self, client): | 805 def getProxy(self, client, local_jid): |
805 """Return the proxy available for this profile | 806 """Return the proxy available for this profile |
806 | 807 |
807 cache is used between clients using the same server | 808 cache is used between clients using the same server |
809 @param local_jid(jid.JID): same as for [getCandidates] | |
808 @return ((D)(ProxyInfos, None)): Found proxy infos, | 810 @return ((D)(ProxyInfos, None)): Found proxy infos, |
809 or None if not acceptable proxy is found | 811 or None if not acceptable proxy is found |
812 @raise exceptions.NotFound: no Proxy found | |
810 """ | 813 """ |
811 | 814 |
812 def notFound(server): | 815 def notFound(server): |
813 log.info(u"No proxy found on this server") | 816 log.info(u"No proxy found on this server") |
814 self._cache_proxies[server] = None | 817 self._cache_proxies[server] = None |
815 defer.returnValue(None) | 818 raise exceptions.NotFound |
816 | 819 |
817 server = client.jid.host | 820 server = client.host if client.is_component else client.jid.host |
818 try: | 821 try: |
819 defer.returnValue(self._cache_proxies[server]) | 822 defer.returnValue(self._cache_proxies[server]) |
820 except KeyError: | 823 except KeyError: |
821 pass | 824 pass |
822 try: | 825 try: |
824 yield self.host.findServiceEntities(client, "proxy", "bytestreams") | 827 yield self.host.findServiceEntities(client, "proxy", "bytestreams") |
825 ).pop() | 828 ).pop() |
826 except (defer.CancelledError, StopIteration, KeyError): | 829 except (defer.CancelledError, StopIteration, KeyError): |
827 notFound(server) | 830 notFound(server) |
828 iq_elt = client.IQ("get") | 831 iq_elt = client.IQ("get") |
832 iq_elt["from"] = local_jid.full() | |
829 iq_elt["to"] = proxy.full() | 833 iq_elt["to"] = proxy.full() |
830 iq_elt.addElement((NS_BS, "query")) | 834 iq_elt.addElement((NS_BS, "query")) |
831 | 835 |
832 try: | 836 try: |
833 result_elt = yield iq_elt.send() | 837 result_elt = yield iq_elt.send() |
883 self._external_port = ext_port | 887 self._external_port = ext_port |
884 | 888 |
885 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | 889 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) |
886 | 890 |
887 @defer.inlineCallbacks | 891 @defer.inlineCallbacks |
888 def getCandidates(self, client): | 892 def getCandidates(self, client, local_jid): |
889 """Return a list of our stream candidates | 893 """Return a list of our stream candidates |
890 | 894 |
895 @param local_jid(jid.JID): jid to use as local jid | |
896 This is needed for client which can be addressed with a different jid than | |
897 client.jid if a local part is used (e.g. piotr@file.example.net where | |
898 client.jid would be file.example.net) | |
891 @return (D(list[Candidate])): list of candidates, ordered by priority | 899 @return (D(list[Candidate])): list of candidates, ordered by priority |
892 """ | 900 """ |
893 server_factory = yield self.getSocks5ServerFactory() | 901 server_factory = yield self.getSocks5ServerFactory() |
894 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) | 902 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) |
895 proxy = yield self.getProxy(client) | 903 try: |
904 proxy = yield self.getProxy(client, local_jid) | |
905 except exceptions.NotFound: | |
906 proxy = None | |
896 | 907 |
897 # its time to gather the candidates | 908 # its time to gather the candidates |
898 candidates = [] | 909 candidates = [] |
899 | 910 |
900 # first the direct ones | 911 # first the direct ones |
905 Candidate( | 916 Candidate( |
906 ip, | 917 ip, |
907 local_port, | 918 local_port, |
908 XEP_0065.TYPE_DIRECT, | 919 XEP_0065.TYPE_DIRECT, |
909 PRIORITY_BEST_DIRECT, | 920 PRIORITY_BEST_DIRECT, |
910 client.jid, | 921 local_jid, |
911 priority_local=True, | 922 priority_local=True, |
912 factory=server_factory, | 923 factory=server_factory, |
913 ) | 924 ) |
914 ) | 925 ) |
915 for ip in local_ips: | 926 for ip in local_ips: |
917 Candidate( | 928 Candidate( |
918 ip, | 929 ip, |
919 local_port, | 930 local_port, |
920 XEP_0065.TYPE_DIRECT, | 931 XEP_0065.TYPE_DIRECT, |
921 PRIORITY_DIRECT, | 932 PRIORITY_DIRECT, |
922 client.jid, | 933 local_jid, |
923 priority_local=True, | 934 priority_local=True, |
924 factory=server_factory, | 935 factory=server_factory, |
925 ) | 936 ) |
926 ) | 937 ) |
927 | 938 |
931 Candidate( | 942 Candidate( |
932 external_ip, | 943 external_ip, |
933 ext_port, | 944 ext_port, |
934 XEP_0065.TYPE_ASSISTED, | 945 XEP_0065.TYPE_ASSISTED, |
935 PRIORITY_ASSISTED, | 946 PRIORITY_ASSISTED, |
936 client.jid, | 947 local_jid, |
937 priority_local=True, | 948 priority_local=True, |
938 factory=server_factory, | 949 factory=server_factory, |
939 ) | 950 ) |
940 ) | 951 ) |
941 | 952 |
1132 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): | 1143 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): |
1133 pass | 1144 pass |
1134 | 1145 |
1135 return failure_ | 1146 return failure_ |
1136 | 1147 |
1137 def startStream(self, client, stream_object, to_jid, sid): | 1148 def startStream(self, client, stream_object, local_jid, to_jid, sid): |
1138 """Launch the stream workflow | 1149 """Launch the stream workflow |
1139 | 1150 |
1140 @param streamProducer: stream_object to use | 1151 @param streamProducer: stream_object to use |
1152 @param local_jid(jid.JID): same as for [getCandidates] | |
1141 @param to_jid: JID of the recipient | 1153 @param to_jid: JID of the recipient |
1142 @param sid: Stream session id | 1154 @param sid: Stream session id |
1143 @param successCb: method to call when stream successfuly finished | 1155 @param successCb: method to call when stream successfuly finished |
1144 @param failureCb: method to call when something goes wrong | 1156 @param failureCb: method to call when something goes wrong |
1145 @return (D): Deferred fired when session is finished | 1157 @return (D): Deferred fired when session is finished |
1146 """ | 1158 """ |
1147 session_data = self._createSession(client, stream_object, to_jid, sid, True) | 1159 session_data = self._createSession( |
1160 client, stream_object, local_jid, to_jid, sid, True) | |
1148 | 1161 |
1149 session_data[client] = client | 1162 session_data[client] = client |
1150 | 1163 |
1151 def gotCandidates(candidates): | 1164 def gotCandidates(candidates): |
1152 session_data["candidates"] = candidates | 1165 session_data["candidates"] = candidates |
1153 iq_elt = client.IQ() | 1166 iq_elt = client.IQ() |
1154 iq_elt["from"] = client.jid.full() | 1167 iq_elt["from"] = local_jid.full() |
1155 iq_elt["to"] = to_jid.full() | 1168 iq_elt["to"] = to_jid.full() |
1156 query_elt = iq_elt.addElement((NS_BS, "query")) | 1169 query_elt = iq_elt.addElement((NS_BS, "query")) |
1157 query_elt["mode"] = "tcp" | 1170 query_elt["mode"] = "tcp" |
1158 query_elt["sid"] = sid | 1171 query_elt["sid"] = sid |
1159 | 1172 |
1163 streamhost["port"] = str(candidate.port) | 1176 streamhost["port"] = str(candidate.port) |
1164 streamhost["jid"] = candidate.jid.full() | 1177 streamhost["jid"] = candidate.jid.full() |
1165 log.debug(u"Candidate proposed: {}".format(candidate)) | 1178 log.debug(u"Candidate proposed: {}".format(candidate)) |
1166 | 1179 |
1167 d = iq_elt.send() | 1180 d = iq_elt.send() |
1168 args = [session_data, client] | 1181 args = [client, session_data, local_jid] |
1169 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) | 1182 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) |
1170 | 1183 |
1171 self.getCandidates(client).addCallback(gotCandidates) | 1184 self.getCandidates(client, local_jid).addCallback(gotCandidates) |
1172 return session_data[DEFER_KEY] | 1185 return session_data[DEFER_KEY] |
1173 | 1186 |
1174 def _IQNegotiationCb(self, iq_elt, session_data, client): | 1187 def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid): |
1175 """Called when the result of open iq is received | 1188 """Called when the result of open iq is received |
1176 | 1189 |
1177 @param session_data(dict): data of the session | 1190 @param session_data(dict): data of the session |
1178 @param client: %(doc_client)s | 1191 @param client: %(doc_client)s |
1179 @param iq_elt(domish.Element): <iq> result | 1192 @param iq_elt(domish.Element): <iq> result |
1202 if candidate.type == XEP_0065.TYPE_PROXY: | 1215 if candidate.type == XEP_0065.TYPE_PROXY: |
1203 log.info(u"A Socks5 proxy is used") | 1216 log.info(u"A Socks5 proxy is used") |
1204 d = self.connectCandidate(client, candidate, session_data["hash"]) | 1217 d = self.connectCandidate(client, candidate, session_data["hash"]) |
1205 d.addCallback( | 1218 d.addCallback( |
1206 lambda __: candidate.activate( | 1219 lambda __: candidate.activate( |
1207 session_data["id"], session_data["peer_jid"], client | 1220 client, session_data["id"], session_data["peer_jid"], local_jid |
1208 ) | 1221 ) |
1209 ) | 1222 ) |
1210 d.addErrback(self._activationEb) | 1223 d.addErrback(self._activationEb) |
1211 else: | 1224 else: |
1212 d = defer.succeed(None) | 1225 d = defer.succeed(None) |
1214 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) | 1227 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) |
1215 | 1228 |
1216 def _activationEb(self, failure): | 1229 def _activationEb(self, failure): |
1217 log.warning(u"Proxy activation error: {}".format(failure.value)) | 1230 log.warning(u"Proxy activation error: {}".format(failure.value)) |
1218 | 1231 |
1219 def _IQNegotiationEb(self, stanza_err, session_data, client): | 1232 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid): |
1220 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) | 1233 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) |
1221 # FIXME: must clean session | 1234 # FIXME: must clean session |
1222 | 1235 |
1223 def createSession(self, *args, **kwargs): | 1236 def createSession(self, *args, **kwargs): |
1224 """like [_createSession] but return the session deferred instead of the whole session | 1237 """like [_createSession] but return the session deferred instead of the whole session |
1225 | 1238 |
1226 session deferred is fired when transfer is finished | 1239 session deferred is fired when transfer is finished |
1227 """ | 1240 """ |
1228 return self._createSession(*args, **kwargs)[DEFER_KEY] | 1241 return self._createSession(*args, **kwargs)[DEFER_KEY] |
1229 | 1242 |
1230 def _createSession(self, client, stream_object, to_jid, sid, requester=False): | 1243 def _createSession(self, client, stream_object, local_jid, to_jid, sid, |
1244 requester=False): | |
1231 """Called when a bytestream is imminent | 1245 """Called when a bytestream is imminent |
1232 | 1246 |
1233 @param stream_object(iface.IStreamProducer): File object where data will be written | 1247 @param stream_object(iface.IStreamProducer): File object where data will be |
1248 written | |
1234 @param to_jid(jid.JId): jid of the other peer | 1249 @param to_jid(jid.JId): jid of the other peer |
1235 @param sid(unicode): session id | 1250 @param sid(unicode): session id |
1236 @param initiator(bool): if True, this session is create by initiator | 1251 @param initiator(bool): if True, this session is create by initiator |
1237 @return (dict): session data | 1252 @return (dict): session data |
1238 """ | 1253 """ |
1239 if sid in client.xep_0065_sid_session: | 1254 if sid in client.xep_0065_sid_session: |
1240 raise exceptions.ConflictError(u"A session with this id already exists !") | 1255 raise exceptions.ConflictError(u"A session with this id already exists !") |
1241 if requester: | 1256 if requester: |
1242 session_hash = getSessionHash(client.jid, to_jid, sid) | 1257 session_hash = getSessionHash(local_jid, to_jid, sid) |
1243 session_data = self._registerHash(client, session_hash, stream_object) | 1258 session_data = self._registerHash(client, session_hash, stream_object) |
1244 else: | 1259 else: |
1245 session_hash = getSessionHash(to_jid, client.jid, sid) | 1260 session_hash = getSessionHash(to_jid, local_jid, sid) |
1246 session_d = defer.Deferred() | 1261 session_d = defer.Deferred() |
1247 session_d.addBoth(self.killSession, session_hash, sid, client) | 1262 session_d.addBoth(self.killSession, session_hash, sid, client) |
1248 session_data = client._s5b_sessions[session_hash] = { | 1263 session_data = client._s5b_sessions[session_hash] = { |
1249 DEFER_KEY: session_d, | 1264 DEFER_KEY: session_d, |
1250 TIMER_KEY: reactor.callLater( | 1265 TIMER_KEY: reactor.callLater( |
1253 } | 1268 } |
1254 client.xep_0065_sid_session[sid] = session_data | 1269 client.xep_0065_sid_session[sid] = session_data |
1255 session_data.update( | 1270 session_data.update( |
1256 { | 1271 { |
1257 "id": sid, | 1272 "id": sid, |
1273 "local_jid": local_jid, | |
1258 "peer_jid": to_jid, | 1274 "peer_jid": to_jid, |
1259 "stream_object": stream_object, | 1275 "stream_object": stream_object, |
1260 "hash": session_hash, | 1276 "hash": session_hash, |
1261 } | 1277 } |
1262 ) | 1278 ) |