Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0065.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0065.py@524856bd7b19 |
children | b86912d3fd33 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for managing xep-0065 | |
5 | |
6 # Copyright (C) | |
7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) | |
8 # 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) | |
9 # 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
10 | |
11 # This program is free software: you can redistribute it and/or modify | |
12 # it under the terms of the GNU Affero General Public License as published by | |
13 # the Free Software Foundation, either version 3 of the License, or | |
14 # (at your option) any later version. | |
15 | |
16 # This program is distributed in the hope that it will be useful, | |
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
19 # GNU Affero General Public License for more details. | |
20 | |
21 # You should have received a copy of the GNU Affero General Public License | |
22 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
23 | |
24 # -- | |
25 | |
26 # This module is based on proxy65 (http://code.google.com/p/proxy65), | |
27 # originaly written by David Smith and modified by Fabio Forno. | |
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | |
29 # license. | |
30 | |
31 # -- | |
32 | |
33 # Here is a copy of the original license: | |
34 | |
35 # Copyright (C) | |
36 # 2002-2004 Dave Smith (dizzyd@jabber.org) | |
37 # 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) | |
38 | |
39 # Permission is hereby granted, free of charge, to any person obtaining a copy | |
40 # of this software and associated documentation files (the "Software"), to deal | |
41 # in the Software without restriction, including without limitation the rights | |
42 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
43 # copies of the Software, and to permit persons to whom the Software is | |
44 # furnished to do so, subject to the following conditions: | |
45 | |
46 # The above copyright notice and this permission notice shall be included in | |
47 # all copies or substantial portions of the Software. | |
48 | |
49 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
50 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
51 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
52 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
55 # THE SOFTWARE. | |
56 | |
57 import struct | |
58 import hashlib | |
59 import uuid | |
60 from collections import namedtuple | |
61 from zope.interface import implementer | |
62 from twisted.internet import protocol | |
63 from twisted.internet import reactor | |
64 from twisted.internet import error as internet_error | |
65 from twisted.words.protocols.jabber import error as jabber_error | |
66 from twisted.words.protocols.jabber import jid | |
67 from twisted.words.protocols.jabber import xmlstream | |
68 from twisted.internet import defer | |
69 from wokkel import disco, iwokkel | |
70 from libervia.backend.core.i18n import _ | |
71 from libervia.backend.core.log import getLogger | |
72 from libervia.backend.core.constants import Const as C | |
73 from libervia.backend.core import exceptions | |
74 from libervia.backend.tools import sat_defer | |
75 | |
76 | |
77 log = getLogger(__name__) | |
78 | |
79 | |
80 PLUGIN_INFO = { | |
81 C.PI_NAME: "XEP 0065 Plugin", | |
82 C.PI_IMPORT_NAME: "XEP-0065", | |
83 C.PI_TYPE: "XEP", | |
84 C.PI_MODES: C.PLUG_MODE_BOTH, | |
85 C.PI_PROTOCOLS: ["XEP-0065"], | |
86 C.PI_DEPENDENCIES: ["IP"], | |
87 C.PI_RECOMMENDATIONS: ["NAT-PORT"], | |
88 C.PI_MAIN: "XEP_0065", | |
89 C.PI_HANDLER: "yes", | |
90 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""), | |
91 } | |
92 | |
93 IQ_SET = '/iq[@type="set"]' | |
94 NS_BS = "http://jabber.org/protocol/bytestreams" | |
95 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | |
96 TIMER_KEY = "timer" | |
97 DEFER_KEY = "finished" # key of the deferred used to track session end | |
98 SERVER_STARTING_PORT = ( | |
99 0 | |
100 ) # starting number for server port search (0 to ask automatic attribution) | |
101 | |
102 # priorities are candidates local priorities, must be a int between 0 and 65535 | |
103 PRIORITY_BEST_DIRECT = 10000 | |
104 PRIORITY_DIRECT = 5000 | |
105 PRIORITY_ASSISTED = 1000 | |
106 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b | |
107 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 | |
108 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) | |
109 | |
110 TIMEOUT = 300 # maxium time between session creation and stream start | |
111 | |
112 # XXX: by default eveything is automatic | |
113 # TODO: use these params to force use of specific proxy/port/IP | |
114 # PARAMS = """ | |
115 # <params> | |
116 # <general> | |
117 # <category name="File Transfer"> | |
118 # <param name="Force IP" type="string" /> | |
119 # <param name="Force Port" type="int" constraint="1;65535" /> | |
120 # </category> | |
121 # </general> | |
122 # <individual> | |
123 # <category name="File Transfer"> | |
124 # <param name="Force Proxy" value="" type="string" /> | |
125 # <param name="Force Proxy host" value="" type="string" /> | |
126 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> | |
127 # </category> | |
128 # </individual> | |
129 # </params> | |
130 # """ | |
131 | |
132 ( | |
133 STATE_INITIAL, | |
134 STATE_AUTH, | |
135 STATE_REQUEST, | |
136 STATE_READY, | |
137 STATE_AUTH_USERPASS, | |
138 STATE_CLIENT_INITIAL, | |
139 STATE_CLIENT_AUTH, | |
140 STATE_CLIENT_REQUEST, | |
141 ) = range(8) | |
142 | |
143 SOCKS5_VER = 0x05 | |
144 | |
145 ADDR_IPV4 = 0x01 | |
146 ADDR_DOMAINNAME = 0x03 | |
147 ADDR_IPV6 = 0x04 | |
148 | |
149 CMD_CONNECT = 0x01 | |
150 CMD_BIND = 0x02 | |
151 CMD_UDPASSOC = 0x03 | |
152 | |
153 AUTHMECH_ANON = 0x00 | |
154 AUTHMECH_USERPASS = 0x02 | |
155 AUTHMECH_INVALID = 0xFF | |
156 | |
157 REPLY_SUCCESS = 0x00 | |
158 REPLY_GENERAL_FAILUR = 0x01 | |
159 REPLY_CONN_NOT_ALLOWED = 0x02 | |
160 REPLY_NETWORK_UNREACHABLE = 0x03 | |
161 REPLY_HOST_UNREACHABLE = 0x04 | |
162 REPLY_CONN_REFUSED = 0x05 | |
163 REPLY_TTL_EXPIRED = 0x06 | |
164 REPLY_CMD_NOT_SUPPORTED = 0x07 | |
165 REPLY_ADDR_NOT_SUPPORTED = 0x08 | |
166 | |
167 | |
168 ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"]) | |
169 | |
170 | |
171 class Candidate(object): | |
172 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, | |
173 factory=None,): | |
174 """ | |
175 @param host(unicode): host IP or domain | |
176 @param port(int): port | |
177 @param type_(unicode): stream type (one of XEP_0065.TYPE_*) | |
178 @param priority(int): priority | |
179 @param jid_(jid.JID): jid | |
180 @param id_(None, id_): Candidate ID, or None to generate | |
181 @param priority_local(bool): if True, priority is used as local priority, | |
182 else priority is used as global one (and local priority is set to 0) | |
183 """ | |
184 assert isinstance(jid_, jid.JID) | |
185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) | |
186 self.id = id_ if id_ is not None else str(uuid.uuid4()) | |
187 if priority_local: | |
188 self._local_priority = int(priority) | |
189 self._priority = self.calculate_priority() | |
190 else: | |
191 self._local_priority = 0 | |
192 self._priority = int(priority) | |
193 self.factory = factory | |
194 | |
195 def discard(self): | |
196 """Disconnect a candidate if it is connected | |
197 | |
198 Used to disconnect tryed client when they are discarded | |
199 """ | |
200 log.debug("Discarding {}".format(self)) | |
201 try: | |
202 self.factory.discard() | |
203 except AttributeError: | |
204 pass # no discard for Socks5ServerFactory | |
205 | |
206 @property | |
207 def local_priority(self): | |
208 return self._local_priority | |
209 | |
210 @property | |
211 def priority(self): | |
212 return self._priority | |
213 | |
214 def __str__(self): | |
215 return "Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( | |
216 self, id=" id={}".format(self.id if self.id is not None else "") | |
217 ) | |
218 | |
219 def __eq__(self, other): | |
220 # self.id is is not used in __eq__ as the same candidate can have | |
221 # different ids if proposed by initiator or responder | |
222 try: | |
223 return ( | |
224 self.host == other.host | |
225 and self.port == other.port | |
226 and self.jid == other.jid | |
227 ) | |
228 except (AttributeError, TypeError): | |
229 return False | |
230 | |
231 def __ne__(self, other): | |
232 return not self.__eq__(other) | |
233 | |
234 def calculate_priority(self): | |
235 """Calculate candidate priority according to XEP-0260 §2.2 | |
236 | |
237 | |
238 @return (int): priority | |
239 """ | |
240 if self.type == XEP_0065.TYPE_DIRECT: | |
241 multiplier = 126 | |
242 elif self.type == XEP_0065.TYPE_ASSISTED: | |
243 multiplier = 120 | |
244 elif self.type == XEP_0065.TYPE_TUNEL: | |
245 multiplier = 110 | |
246 elif self.type == XEP_0065.TYPE_PROXY: | |
247 multiplier = 10 | |
248 else: | |
249 raise exceptions.InternalError("Unknown {} type !".format(self.type)) | |
250 return 2 ** 16 * multiplier + self._local_priority | |
251 | |
252 def activate(self, client, sid, peer_jid, local_jid): | |
253 """Activate the proxy candidate | |
254 | |
255 Send activation request as explained in XEP-0065 § 6.3.5 | |
256 Must only be used with proxy candidates | |
257 @param sid(unicode): session id (same as for get_session_hash) | |
258 @param peer_jid(jid.JID): jid of the other peer | |
259 @return (D(domish.Element)): IQ result (or error) | |
260 """ | |
261 assert self.type == XEP_0065.TYPE_PROXY | |
262 iq_elt = client.IQ() | |
263 iq_elt["from"] = local_jid.full() | |
264 iq_elt["to"] = self.jid.full() | |
265 query_elt = iq_elt.addElement((NS_BS, "query")) | |
266 query_elt["sid"] = sid | |
267 query_elt.addElement("activate", content=peer_jid.full()) | |
268 return iq_elt.send() | |
269 | |
270 def start_transfer(self, session_hash=None): | |
271 if self.type == XEP_0065.TYPE_PROXY: | |
272 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default | |
273 else: | |
274 chunk_size = None | |
275 self.factory.start_transfer(session_hash, chunk_size=chunk_size) | |
276 | |
277 | |
278 def get_session_hash(requester_jid, target_jid, sid): | |
279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 | |
280 | |
281 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) | |
282 @param target_jid(jid.JID): jid of the target | |
283 @param sid(unicode): session id | |
284 @return (str): hash | |
285 """ | |
286 return hashlib.sha1( | |
287 (sid + requester_jid.full() + target_jid.full()).encode("utf-8") | |
288 ).hexdigest() | |
289 | |
290 | |
291 class SOCKSv5(protocol.Protocol): | |
292 CHUNK_SIZE = 2 ** 16 | |
293 | |
294 def __init__(self, session_hash=None): | |
295 """ | |
296 @param session_hash(str): hash of the session | |
297 must only be used in client mode | |
298 """ | |
299 self.connection = defer.Deferred() # called when connection/auth is done | |
300 if session_hash is not None: | |
301 assert isinstance(session_hash, str) | |
302 self.server_mode = False | |
303 self._session_hash = session_hash | |
304 self.state = STATE_CLIENT_INITIAL | |
305 else: | |
306 self.server_mode = True | |
307 self.state = STATE_INITIAL | |
308 self.buf = b"" | |
309 self.supportedAuthMechs = [AUTHMECH_ANON] | |
310 self.supportedAddrs = [ADDR_DOMAINNAME] | |
311 self.enabledCommands = [CMD_CONNECT] | |
312 self.peersock = None | |
313 self.addressType = 0 | |
314 self.requestType = 0 | |
315 self._stream_object = None | |
316 self.active = False # set to True when protocol is actually used for transfer | |
317 # used by factories to know when the finished Deferred can be triggered | |
318 | |
319 @property | |
320 def stream_object(self): | |
321 if self._stream_object is None: | |
322 self._stream_object = self.getSession()["stream_object"] | |
323 if self.server_mode: | |
324 self._stream_object.registerProducer(self.transport, True) | |
325 return self._stream_object | |
326 | |
327 def getSession(self): | |
328 """Return session associated with this candidate | |
329 | |
330 @return (dict): session data | |
331 """ | |
332 if self.server_mode: | |
333 return self.factory.getSession(self._session_hash) | |
334 else: | |
335 return self.factory.getSession() | |
336 | |
337 def _start_negotiation(self): | |
338 log.debug("starting negotiation (client mode)") | |
339 self.state = STATE_CLIENT_AUTH | |
340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) | |
341 | |
342 def _parse_negotiation(self): | |
343 try: | |
344 # Parse out data | |
345 ver, nmethod = struct.unpack("!BB", self.buf[:2]) | |
346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) | |
347 | |
348 # Ensure version is correct | |
349 if ver != 5: | |
350 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) | |
351 self.transport.loseConnection() | |
352 return | |
353 | |
354 # Trim off front of the buffer | |
355 self.buf = self.buf[nmethod + 2 :] | |
356 | |
357 # Check for supported auth mechs | |
358 for m in self.supportedAuthMechs: | |
359 if m in methods: | |
360 # Update internal state, according to selected method | |
361 if m == AUTHMECH_ANON: | |
362 self.state = STATE_REQUEST | |
363 elif m == AUTHMECH_USERPASS: | |
364 self.state = STATE_AUTH_USERPASS | |
365 # Complete negotiation w/ this method | |
366 self.transport.write(struct.pack("!BB", SOCKS5_VER, m)) | |
367 return | |
368 | |
369 # No supported mechs found, notify client and close the connection | |
370 log.warning("Unsupported authentication mechanism") | |
371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) | |
372 self.transport.loseConnection() | |
373 except struct.error: | |
374 pass | |
375 | |
376 def _parse_user_pass(self): | |
377 try: | |
378 # Parse out data | |
379 ver, ulen = struct.unpack("BB", self.buf[:2]) | |
380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) | |
381 plen, = struct.unpack("B", self.buf[ulen + 2]) | |
382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) | |
383 # Trim off fron of the buffer | |
384 self.buf = self.buf[3 + ulen + plen :] | |
385 # Fire event to authenticate user | |
386 if self.authenticate_user_pass(uname, password): | |
387 # Signal success | |
388 self.state = STATE_REQUEST | |
389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) | |
390 else: | |
391 # Signal failure | |
392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) | |
393 self.transport.loseConnection() | |
394 except struct.error: | |
395 pass | |
396 | |
397 def send_error_reply(self, errorcode): | |
398 # Any other address types are not supported | |
399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) | |
400 self.transport.write(result) | |
401 self.transport.loseConnection() | |
402 | |
403 def _parseRequest(self): | |
404 try: | |
405 # Parse out data and trim buffer accordingly | |
406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) | |
407 | |
408 # Ensure we actually support the requested address type | |
409 if self.addressType not in self.supportedAddrs: | |
410 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) | |
411 return | |
412 | |
413 # Deal with addresses | |
414 if self.addressType == ADDR_IPV4: | |
415 addr, port = struct.unpack("!IH", self.buf[4:10]) | |
416 self.buf = self.buf[10:] | |
417 elif self.addressType == ADDR_DOMAINNAME: | |
418 nlen = self.buf[4] | |
419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) | |
420 self.buf = self.buf[7 + len(addr) :] | |
421 else: | |
422 # Any other address types are not supported | |
423 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) | |
424 return | |
425 | |
426 # Ensure command is supported | |
427 if cmd not in self.enabledCommands: | |
428 # Send a not supported error | |
429 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) | |
430 return | |
431 | |
432 # Process the command | |
433 if cmd == CMD_CONNECT: | |
434 self.connect_requested(addr, port) | |
435 elif cmd == CMD_BIND: | |
436 self.bind_requested(addr, port) | |
437 else: | |
438 # Any other command is not supported | |
439 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) | |
440 | |
441 except struct.error: | |
442 # The buffer is probably not complete, we need to wait more | |
443 return None | |
444 | |
445 def _make_request(self): | |
446 hash_ = self._session_hash.encode('utf-8') | |
447 request = struct.pack( | |
448 "!5B%dsH" % len(hash_), | |
449 SOCKS5_VER, | |
450 CMD_CONNECT, | |
451 0, | |
452 ADDR_DOMAINNAME, | |
453 len(hash_), | |
454 hash_, | |
455 0, | |
456 ) | |
457 self.transport.write(request) | |
458 self.state = STATE_CLIENT_REQUEST | |
459 | |
460 def _parse_request_reply(self): | |
461 try: | |
462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) | |
463 # Ensure we actually support the requested address type | |
464 if self.addressType not in self.supportedAddrs: | |
465 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) | |
466 return | |
467 | |
468 # Deal with addresses | |
469 if self.addressType == ADDR_IPV4: | |
470 addr, port = struct.unpack("!IH", self.buf[4:10]) | |
471 self.buf = self.buf[10:] | |
472 elif self.addressType == ADDR_DOMAINNAME: | |
473 nlen = self.buf[4] | |
474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) | |
475 self.buf = self.buf[7 + len(addr) :] | |
476 else: | |
477 # Any other address types are not supported | |
478 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) | |
479 return | |
480 | |
481 # Ensure reply is OK | |
482 if rep != REPLY_SUCCESS: | |
483 self.loseConnection() | |
484 return | |
485 | |
486 self.state = STATE_READY | |
487 self.connection.callback(None) | |
488 | |
489 except struct.error: | |
490 # The buffer is probably not complete, we need to wait more | |
491 return None | |
492 | |
493 def connectionMade(self): | |
494 log.debug( | |
495 "Socks5 connectionMade (mode = {})".format( | |
496 "server" if self.state == STATE_INITIAL else "client" | |
497 ) | |
498 ) | |
499 if self.state == STATE_CLIENT_INITIAL: | |
500 self._start_negotiation() | |
501 | |
502 def connect_requested(self, addr, port): | |
503 # Check that this session is expected | |
504 if not self.factory.add_to_session(addr.decode('utf-8'), self): | |
505 log.warning( | |
506 "Unexpected connection request received from {host}".format( | |
507 host=self.transport.getPeer().host | |
508 ) | |
509 ) | |
510 self.send_error_reply(REPLY_CONN_REFUSED) | |
511 return | |
512 self._session_hash = addr.decode('utf-8') | |
513 self.connect_completed(addr, 0) | |
514 | |
515 def start_transfer(self, chunk_size): | |
516 """Callback called when the result iq is received | |
517 | |
518 @param chunk_size(None, int): size of the buffer, or None for default | |
519 """ | |
520 self.active = True | |
521 if chunk_size is not None: | |
522 self.CHUNK_SIZE = chunk_size | |
523 log.debug("Starting file transfer") | |
524 d = self.stream_object.start_stream(self.transport) | |
525 d.addCallback(self.stream_finished) | |
526 | |
527 def stream_finished(self, d): | |
528 log.info(_("File transfer completed, closing connection")) | |
529 self.transport.loseConnection() | |
530 | |
531 def connect_completed(self, remotehost, remoteport): | |
532 if self.addressType == ADDR_IPV4: | |
533 result = struct.pack( | |
534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport | |
535 ) | |
536 elif self.addressType == ADDR_DOMAINNAME: | |
537 result = struct.pack( | |
538 "!BBBBB%dsH" % len(remotehost), | |
539 SOCKS5_VER, | |
540 REPLY_SUCCESS, | |
541 0, | |
542 ADDR_DOMAINNAME, | |
543 len(remotehost), | |
544 remotehost, | |
545 remoteport, | |
546 ) | |
547 self.transport.write(result) | |
548 self.state = STATE_READY | |
549 | |
550 def bind_requested(self, addr, port): | |
551 pass | |
552 | |
553 def authenticate_user_pass(self, user, passwd): | |
554 # FIXME: implement authentication and remove the debug printing a password | |
555 log.debug("User/pass: %s/%s" % (user, passwd)) | |
556 return True | |
557 | |
558 def dataReceived(self, buf): | |
559 if self.state == STATE_READY: | |
560 # Everything is set, we just have to write the incoming data | |
561 self.stream_object.write(buf) | |
562 if not self.active: | |
563 self.active = True | |
564 self.getSession()[TIMER_KEY].cancel() | |
565 return | |
566 | |
567 self.buf = self.buf + buf | |
568 if self.state == STATE_INITIAL: | |
569 self._parse_negotiation() | |
570 if self.state == STATE_AUTH_USERPASS: | |
571 self._parse_user_pass() | |
572 if self.state == STATE_REQUEST: | |
573 self._parseRequest() | |
574 if self.state == STATE_CLIENT_REQUEST: | |
575 self._parse_request_reply() | |
576 if self.state == STATE_CLIENT_AUTH: | |
577 ver, method = struct.unpack("!BB", buf) | |
578 self.buf = self.buf[2:] | |
579 if ver != SOCKS5_VER or method != AUTHMECH_ANON: | |
580 self.transport.loseConnection() | |
581 else: | |
582 self._make_request() | |
583 | |
584 def connectionLost(self, reason): | |
585 log.debug("Socks5 connection lost: {}".format(reason.value)) | |
586 if self.state != STATE_READY: | |
587 self.connection.errback(reason) | |
588 if self.server_mode: | |
589 try: | |
590 session_hash = self._session_hash | |
591 except AttributeError: | |
592 log.debug("no session has been received yet") | |
593 else: | |
594 self.factory.remove_from_session(session_hash, self, reason) | |
595 | |
596 | |
597 class Socks5ServerFactory(protocol.ServerFactory): | |
598 protocol = SOCKSv5 | |
599 | |
600 def __init__(self, parent): | |
601 """ | |
602 @param parent(XEP_0065): XEP_0065 parent instance | |
603 """ | |
604 self.parent = parent | |
605 | |
606 def getSession(self, session_hash): | |
607 return self.parent.getSession(None, session_hash) | |
608 | |
609 def start_transfer(self, session_hash, chunk_size=None): | |
610 session = self.getSession(session_hash) | |
611 try: | |
612 protocol = session["protocols"][0] | |
613 except (KeyError, IndexError): | |
614 log.error("Can't start file transfer, can't find protocol") | |
615 else: | |
616 session[TIMER_KEY].cancel() | |
617 protocol.start_transfer(chunk_size) | |
618 | |
619 def add_to_session(self, session_hash, protocol): | |
620 """Check is session_hash is valid, and associate protocol with it | |
621 | |
622 the session will be associated to the corresponding candidate | |
623 @param session_hash(str): hash of the session | |
624 @param protocol(SOCKSv5): protocol instance | |
625 @param return(bool): True if hash was valid (i.e. expected), False else | |
626 """ | |
627 assert isinstance(session_hash, str) | |
628 try: | |
629 session_data = self.getSession(session_hash) | |
630 except KeyError: | |
631 return False | |
632 else: | |
633 session_data.setdefault("protocols", []).append(protocol) | |
634 return True | |
635 | |
636 def remove_from_session(self, session_hash, protocol, reason): | |
637 """Remove a protocol from session_data | |
638 | |
639 There can be several protocol instances while candidates are tried, they | |
640 have removed when candidate connection is closed | |
641 @param session_hash(str): hash of the session | |
642 @param protocol(SOCKSv5): protocol instance | |
643 @param reason(failure.Failure): reason of the removal | |
644 """ | |
645 try: | |
646 protocols = self.getSession(session_hash)["protocols"] | |
647 protocols.remove(protocol) | |
648 except (KeyError, ValueError): | |
649 log.error("Protocol not found in session while it should be there") | |
650 else: | |
651 if protocol.active: | |
652 # The active protocol has been removed, session is finished | |
653 if reason.check(internet_error.ConnectionDone): | |
654 self.getSession(session_hash)[DEFER_KEY].callback(None) | |
655 else: | |
656 self.getSession(session_hash)[DEFER_KEY].errback(reason) | |
657 | |
658 | |
659 class Socks5ClientFactory(protocol.ClientFactory): | |
660 protocol = SOCKSv5 | |
661 | |
662 def __init__(self, client, parent, session, session_hash): | |
663 """Init the Client Factory | |
664 | |
665 @param session(dict): session data | |
666 @param session_hash(unicode): hash used for peer_connection | |
667 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
668 """ | |
669 self.session = session | |
670 self.session_hash = session_hash | |
671 self.client = client | |
672 self.connection = defer.Deferred() | |
673 self._protocol_instance = None | |
674 self.connector = None | |
675 | |
676 def discard(self): | |
677 """Disconnect the client | |
678 | |
679 Also set a discarded flag, which avoid to call the session Deferred | |
680 """ | |
681 self.connector.disconnect() | |
682 | |
683 def getSession(self): | |
684 return self.session | |
685 | |
686 def start_transfer(self, __=None, chunk_size=None): | |
687 self.session[TIMER_KEY].cancel() | |
688 self._protocol_instance.start_transfer(chunk_size) | |
689 | |
690 def clientConnectionFailed(self, connector, reason): | |
691 log.debug("Connection failed") | |
692 self.connection.errback(reason) | |
693 | |
694 def clientConnectionLost(self, connector, reason): | |
695 log.debug(_("Socks 5 client connection lost (reason: %s)") % reason.value) | |
696 if self._protocol_instance.active: | |
697 # This one was used for the transfer, than mean that | |
698 # the Socks5 session is finished | |
699 if reason.check(internet_error.ConnectionDone): | |
700 self.getSession()[DEFER_KEY].callback(None) | |
701 else: | |
702 self.getSession()[DEFER_KEY].errback(reason) | |
703 self._protocol_instance = None | |
704 | |
705 def buildProtocol(self, addr): | |
706 log.debug(("Socks 5 client connection started")) | |
707 p = self.protocol(session_hash=self.session_hash) | |
708 p.factory = self | |
709 p.connection.chainDeferred(self.connection) | |
710 self._protocol_instance = p | |
711 return p | |
712 | |
713 | |
714 class XEP_0065(object): | |
715 NAMESPACE = NS_BS | |
716 TYPE_DIRECT = "direct" | |
717 TYPE_ASSISTED = "assisted" | |
718 TYPE_TUNEL = "tunel" | |
719 TYPE_PROXY = "proxy" | |
720 Candidate = Candidate | |
721 | |
722 def __init__(self, host): | |
723 log.info(_("Plugin XEP_0065 initialization")) | |
724 self.host = host | |
725 | |
726 # session data | |
727 self.hash_clients_map = {} # key: hash of the transfer session, value: session data | |
728 self._cache_proxies = {} # key: server jid, value: proxy data | |
729 | |
730 # misc data | |
731 self._server_factory = None | |
732 self._external_port = None | |
733 | |
734 # plugins shortcuts | |
735 self._ip = self.host.plugins["IP"] | |
736 try: | |
737 self._np = self.host.plugins["NAT-PORT"] | |
738 except KeyError: | |
739 log.debug("NAT Port plugin not available") | |
740 self._np = None | |
741 | |
742 # parameters | |
743 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP | |
744 # host.memory.update_params(PARAMS) | |
745 | |
746 def get_handler(self, client): | |
747 return XEP_0065_handler(self) | |
748 | |
749 def profile_connected(self, client): | |
750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) | |
751 client._s5b_sessions = {} | |
752 | |
753 def get_session_hash(self, from_jid, to_jid, sid): | |
754 return get_session_hash(from_jid, to_jid, sid) | |
755 | |
756 def get_socks_5_server_factory(self): | |
757 """Return server factory | |
758 | |
759 The server is created if it doesn't exists yet | |
760 self._server_factory_port is set on server creation | |
761 """ | |
762 | |
763 if self._server_factory is None: | |
764 self._server_factory = Socks5ServerFactory(self) | |
765 for port in range(SERVER_STARTING_PORT, 65356): | |
766 try: | |
767 listening_port = reactor.listenTCP(port, self._server_factory) | |
768 except internet_error.CannotListenError as e: | |
769 log.debug( | |
770 "Cannot listen on port {port}: {err_msg}{err_num}".format( | |
771 port=port, | |
772 err_msg=e.socketError.strerror, | |
773 err_num=" (error code: {})".format(e.socketError.errno), | |
774 ) | |
775 ) | |
776 else: | |
777 self._server_factory_port = listening_port.getHost().port | |
778 break | |
779 | |
780 log.info( | |
781 _("Socks5 Stream server launched on port {}").format( | |
782 self._server_factory_port | |
783 ) | |
784 ) | |
785 return self._server_factory | |
786 | |
787 @defer.inlineCallbacks | |
788 def get_proxy(self, client, local_jid): | |
789 """Return the proxy available for this profile | |
790 | |
791 cache is used between clients using the same server | |
792 @param local_jid(jid.JID): same as for [get_candidates] | |
793 @return ((D)(ProxyInfos, None)): Found proxy infos, | |
794 or None if not acceptable proxy is found | |
795 @raise exceptions.NotFound: no Proxy found | |
796 """ | |
797 | |
798 def notFound(server): | |
799 log.info("No proxy found on this server") | |
800 self._cache_proxies[server] = None | |
801 raise exceptions.NotFound | |
802 | |
803 server = client.host if client.is_component else client.jid.host | |
804 try: | |
805 defer.returnValue(self._cache_proxies[server]) | |
806 except KeyError: | |
807 pass | |
808 try: | |
809 proxy = ( | |
810 yield self.host.find_service_entities(client, "proxy", "bytestreams") | |
811 ).pop() | |
812 except (defer.CancelledError, StopIteration, KeyError): | |
813 notFound(server) | |
814 iq_elt = client.IQ("get") | |
815 iq_elt["from"] = local_jid.full() | |
816 iq_elt["to"] = proxy.full() | |
817 iq_elt.addElement((NS_BS, "query")) | |
818 | |
819 try: | |
820 result_elt = yield iq_elt.send() | |
821 except jabber_error.StanzaError as failure: | |
822 log.warning( | |
823 "Error while requesting proxy info on {jid}: {error}".format( | |
824 jid=proxy.full(), error=failure | |
825 ) | |
826 ) | |
827 notFound(server) | |
828 | |
829 try: | |
830 query_elt = next(result_elt.elements(NS_BS, "query")) | |
831 streamhost_elt = next(query_elt.elements(NS_BS, "streamhost")) | |
832 host = streamhost_elt["host"] | |
833 jid_ = streamhost_elt["jid"] | |
834 port = streamhost_elt["port"] | |
835 if not all((host, jid, port)): | |
836 raise KeyError | |
837 jid_ = jid.JID(jid_) | |
838 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): | |
839 log.warning("Invalid proxy data received from {}".format(proxy.full())) | |
840 notFound(server) | |
841 | |
842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) | |
843 log.info("Proxy found: {}".format(proxy_infos)) | |
844 defer.returnValue(proxy_infos) | |
845 | |
846 @defer.inlineCallbacks | |
847 def _get_network_data(self, client): | |
848 """Retrieve information about network | |
849 | |
850 @param client: %(doc_client)s | |
851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data | |
852 """ | |
853 self.get_socks_5_server_factory() | |
854 local_port = self._server_factory_port | |
855 external_ip = yield self._ip.get_external_ip(client) | |
856 local_ips = yield self._ip.get_local_i_ps(client) | |
857 | |
858 if external_ip is not None and self._external_port is None: | |
859 if external_ip != local_ips[0]: | |
860 log.info("We are probably behind a NAT") | |
861 if self._np is None: | |
862 log.warning("NAT port plugin not available, we can't map port") | |
863 else: | |
864 ext_port = yield self._np.map_port( | |
865 local_port, desc="SaT socks5 stream" | |
866 ) | |
867 if ext_port is None: | |
868 log.warning("Can't map NAT port") | |
869 else: | |
870 self._external_port = ext_port | |
871 | |
872 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | |
873 | |
874 @defer.inlineCallbacks | |
875 def get_candidates(self, client, local_jid): | |
876 """Return a list of our stream candidates | |
877 | |
878 @param local_jid(jid.JID): jid to use as local jid | |
879 This is needed for client which can be addressed with a different jid than | |
880 client.jid if a local part is used (e.g. piotr@file.example.net where | |
881 client.jid would be file.example.net) | |
882 @return (D(list[Candidate])): list of candidates, ordered by priority | |
883 """ | |
884 server_factory = yield self.get_socks_5_server_factory() | |
885 local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client) | |
886 try: | |
887 proxy = yield self.get_proxy(client, local_jid) | |
888 except exceptions.NotFound: | |
889 proxy = None | |
890 | |
891 # its time to gather the candidates | |
892 candidates = [] | |
893 | |
894 # first the direct ones | |
895 | |
896 # the preferred direct connection | |
897 ip = local_ips.pop(0) | |
898 candidates.append( | |
899 Candidate( | |
900 ip, | |
901 local_port, | |
902 XEP_0065.TYPE_DIRECT, | |
903 PRIORITY_BEST_DIRECT, | |
904 local_jid, | |
905 priority_local=True, | |
906 factory=server_factory, | |
907 ) | |
908 ) | |
909 for ip in local_ips: | |
910 candidates.append( | |
911 Candidate( | |
912 ip, | |
913 local_port, | |
914 XEP_0065.TYPE_DIRECT, | |
915 PRIORITY_DIRECT, | |
916 local_jid, | |
917 priority_local=True, | |
918 factory=server_factory, | |
919 ) | |
920 ) | |
921 | |
922 # then the assisted one | |
923 if ext_port is not None: | |
924 candidates.append( | |
925 Candidate( | |
926 external_ip, | |
927 ext_port, | |
928 XEP_0065.TYPE_ASSISTED, | |
929 PRIORITY_ASSISTED, | |
930 local_jid, | |
931 priority_local=True, | |
932 factory=server_factory, | |
933 ) | |
934 ) | |
935 | |
936 # finally the proxy | |
937 if proxy: | |
938 candidates.append( | |
939 Candidate( | |
940 proxy.host, | |
941 proxy.port, | |
942 XEP_0065.TYPE_PROXY, | |
943 PRIORITY_PROXY, | |
944 proxy.jid, | |
945 priority_local=True, | |
946 ) | |
947 ) | |
948 | |
949 # should be already sorted, but just in case the priorities get weird | |
950 candidates.sort(key=lambda c: c.priority, reverse=True) | |
951 defer.returnValue(candidates) | |
952 | |
953 def _add_connector(self, connector, candidate): | |
954 """Add connector used to connect to candidate, and return client factory's connection Deferred | |
955 | |
956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion | |
957 @param connector: a connector implementing IConnector | |
958 @param candidate(Candidate): candidate linked to the connector | |
959 @return (D): Deferred fired when factory connection is done or has failed | |
960 """ | |
961 candidate.factory.connector = connector | |
962 return candidate.factory.connection | |
963 | |
964 def connect_candidate( | |
965 self, client, candidate, session_hash, peer_session_hash=None, delay=None | |
966 ): | |
967 """Connect to a candidate | |
968 | |
969 Connection will be done with a Socks5ClientFactory | |
970 @param candidate(Candidate): candidate to connect to | |
971 @param session_hash(unicode): hash of the session | |
972 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
973 @param peer_session_hash(unicode, None): hash used with the peer | |
974 None to use session_hash. | |
975 None must be used in 2 cases: | |
976 - when XEP-0065 is used with XEP-0096 | |
977 - when a peer connect to a proxy *he proposed himself* | |
978 in practice, peer_session_hash is only used by try_candidates | |
979 @param delay(None, float): optional delay to wait before connection, in seconds | |
980 @return (D): Deferred launched when TCP connection + Socks5 connection is done | |
981 """ | |
982 if peer_session_hash is None: | |
983 # for XEP-0065, only one hash is needed | |
984 peer_session_hash = session_hash | |
985 session = self.getSession(client, session_hash) | |
986 factory = Socks5ClientFactory(client, self, session, peer_session_hash) | |
987 candidate.factory = factory | |
988 if delay is None: | |
989 d = defer.succeed(candidate.host) | |
990 else: | |
991 d = sat_defer.DelayedDeferred(delay, candidate.host) | |
992 d.addCallback(reactor.connectTCP, candidate.port, factory) | |
993 d.addCallback(self._add_connector, candidate) | |
994 return d | |
995 | |
996 def try_candidates( | |
997 self, | |
998 client, | |
999 candidates, | |
1000 session_hash, | |
1001 peer_session_hash, | |
1002 connection_cb=None, | |
1003 connection_eb=None, | |
1004 ): | |
1005 defers_list = [] | |
1006 | |
1007 for candidate in candidates: | |
1008 delay = CANDIDATE_DELAY * len(defers_list) | |
1009 if candidate.type == XEP_0065.TYPE_PROXY: | |
1010 delay += CANDIDATE_DELAY_PROXY | |
1011 d = self.connect_candidate( | |
1012 client, candidate, session_hash, peer_session_hash, delay | |
1013 ) | |
1014 if connection_cb is not None: | |
1015 d.addCallback( | |
1016 lambda __, candidate=candidate, client=client: connection_cb( | |
1017 client, candidate | |
1018 ) | |
1019 ) | |
1020 if connection_eb is not None: | |
1021 d.addErrback(connection_eb, client, candidate) | |
1022 defers_list.append(d) | |
1023 | |
1024 return defers_list | |
1025 | |
1026 def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None): | |
1027 """Get best candidate (according to priority) which can connect | |
1028 | |
1029 @param candidates(iterable[Candidate]): candidates to test | |
1030 @param session_hash(unicode): hash of the session | |
1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
1032 @param peer_session_hash(unicode, None): hash of the other peer | |
1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates | |
1034 @return (D(None, Candidate)): best candidate or None if none can connect | |
1035 """ | |
1036 defer_candidates = None | |
1037 | |
1038 def connection_cb(client, candidate): | |
1039 log.info("Connection of {} successful".format(str(candidate))) | |
1040 for idx, other_candidate in enumerate(candidates): | |
1041 try: | |
1042 if other_candidate.priority < candidate.priority: | |
1043 log.debug("Cancelling {}".format(other_candidate)) | |
1044 defer_candidates[idx].cancel() | |
1045 except AttributeError: | |
1046 assert other_candidate is None | |
1047 | |
1048 def connection_eb(failure, client, candidate): | |
1049 if failure.check(defer.CancelledError): | |
1050 log.debug("Connection of {} has been cancelled".format(candidate)) | |
1051 else: | |
1052 log.info( | |
1053 "Connection of {candidate} Failed: {error}".format( | |
1054 candidate=candidate, error=failure.value | |
1055 ) | |
1056 ) | |
1057 candidates[candidates.index(candidate)] = None | |
1058 | |
1059 def all_tested(__): | |
1060 log.debug("All candidates have been tested") | |
1061 good_candidates = [c for c in candidates if c] | |
1062 return good_candidates[0] if good_candidates else None | |
1063 | |
1064 defer_candidates = self.try_candidates( | |
1065 client, | |
1066 candidates, | |
1067 session_hash, | |
1068 peer_session_hash, | |
1069 connection_cb, | |
1070 connection_eb, | |
1071 ) | |
1072 d_list = defer.DeferredList(defer_candidates) | |
1073 d_list.addCallback(all_tested) | |
1074 return d_list | |
1075 | |
1076 def _time_out(self, session_hash, client): | |
1077 """Called when stream was not started quickly enough | |
1078 | |
1079 @param session_hash(str): hash as returned by get_session_hash | |
1080 @param client: %(doc_client)s | |
1081 """ | |
1082 log.info("Socks5 Bytestream: TimeOut reached") | |
1083 session = self.getSession(client, session_hash) | |
1084 session[DEFER_KEY].errback(exceptions.TimeOutError()) | |
1085 | |
1086 def kill_session(self, failure_, session_hash, sid, client): | |
1087 """Clean the current session | |
1088 | |
1089 @param session_hash(str): hash as returned by get_session_hash | |
1090 @param sid(None, unicode): session id | |
1091 or None if self.xep_0065_sid_session was not used | |
1092 @param client: %(doc_client)s | |
1093 @param failure_(None, failure.Failure): None if eveything was fine, a failure else | |
1094 @return (None, failure.Failure): failure_ is returned | |
1095 """ | |
1096 log.debug( | |
1097 "Cleaning session with hash {hash}{id}: {reason}".format( | |
1098 hash=session_hash, | |
1099 reason="" if failure_ is None else failure_.value, | |
1100 id="" if sid is None else " (id: {})".format(sid), | |
1101 ) | |
1102 ) | |
1103 | |
1104 try: | |
1105 assert self.hash_clients_map[session_hash] == client | |
1106 del self.hash_clients_map[session_hash] | |
1107 except KeyError: | |
1108 pass | |
1109 | |
1110 if sid is not None: | |
1111 try: | |
1112 del client.xep_0065_sid_session[sid] | |
1113 except KeyError: | |
1114 log.warning("Session id {} is unknown".format(sid)) | |
1115 | |
1116 try: | |
1117 session_data = client._s5b_sessions[session_hash] | |
1118 except KeyError: | |
1119 log.warning("There is no session with this hash") | |
1120 return | |
1121 else: | |
1122 del client._s5b_sessions[session_hash] | |
1123 | |
1124 try: | |
1125 session_data["timer"].cancel() | |
1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): | |
1127 pass | |
1128 | |
1129 return failure_ | |
1130 | |
1131 def start_stream(self, client, stream_object, local_jid, to_jid, sid): | |
1132 """Launch the stream workflow | |
1133 | |
1134 @param streamProducer: stream_object to use | |
1135 @param local_jid(jid.JID): same as for [get_candidates] | |
1136 @param to_jid: JID of the recipient | |
1137 @param sid: Stream session id | |
1138 @param successCb: method to call when stream successfuly finished | |
1139 @param failureCb: method to call when something goes wrong | |
1140 @return (D): Deferred fired when session is finished | |
1141 """ | |
1142 session_data = self._create_session( | |
1143 client, stream_object, local_jid, to_jid, sid, True) | |
1144 | |
1145 session_data[client] = client | |
1146 | |
1147 def got_candidates(candidates): | |
1148 session_data["candidates"] = candidates | |
1149 iq_elt = client.IQ() | |
1150 iq_elt["from"] = local_jid.full() | |
1151 iq_elt["to"] = to_jid.full() | |
1152 query_elt = iq_elt.addElement((NS_BS, "query")) | |
1153 query_elt["mode"] = "tcp" | |
1154 query_elt["sid"] = sid | |
1155 | |
1156 for candidate in candidates: | |
1157 streamhost = query_elt.addElement("streamhost") | |
1158 streamhost["host"] = candidate.host | |
1159 streamhost["port"] = str(candidate.port) | |
1160 streamhost["jid"] = candidate.jid.full() | |
1161 log.debug("Candidate proposed: {}".format(candidate)) | |
1162 | |
1163 d = iq_elt.send() | |
1164 args = [client, session_data, local_jid] | |
1165 d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args) | |
1166 | |
1167 self.get_candidates(client, local_jid).addCallback(got_candidates) | |
1168 return session_data[DEFER_KEY] | |
1169 | |
1170 def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid): | |
1171 """Called when the result of open iq is received | |
1172 | |
1173 @param session_data(dict): data of the session | |
1174 @param client: %(doc_client)s | |
1175 @param iq_elt(domish.Element): <iq> result | |
1176 """ | |
1177 try: | |
1178 query_elt = next(iq_elt.elements(NS_BS, "query")) | |
1179 streamhost_used_elt = next(query_elt.elements(NS_BS, "streamhost-used")) | |
1180 except StopIteration: | |
1181 log.warning("No streamhost found in stream query") | |
1182 # FIXME: must clean session | |
1183 return | |
1184 | |
1185 streamhost_jid = jid.JID(streamhost_used_elt["jid"]) | |
1186 try: | |
1187 candidate = next(( | |
1188 c for c in session_data["candidates"] if c.jid == streamhost_jid | |
1189 )) | |
1190 except StopIteration: | |
1191 log.warning( | |
1192 "Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()) | |
1193 ) | |
1194 return | |
1195 else: | |
1196 log.info("Candidate choosed by target: {}".format(candidate)) | |
1197 | |
1198 if candidate.type == XEP_0065.TYPE_PROXY: | |
1199 log.info("A Socks5 proxy is used") | |
1200 d = self.connect_candidate(client, candidate, session_data["hash"]) | |
1201 d.addCallback( | |
1202 lambda __: candidate.activate( | |
1203 client, session_data["id"], session_data["peer_jid"], local_jid | |
1204 ) | |
1205 ) | |
1206 d.addErrback(self._activation_eb) | |
1207 else: | |
1208 d = defer.succeed(None) | |
1209 | |
1210 d.addCallback(lambda __: candidate.start_transfer(session_data["hash"])) | |
1211 | |
1212 def _activation_eb(self, failure): | |
1213 log.warning("Proxy activation error: {}".format(failure.value)) | |
1214 | |
1215 def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid): | |
1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value)) | |
1217 # FIXME: must clean session | |
1218 | |
1219 def create_session(self, *args, **kwargs): | |
1220 """like [_create_session] but return the session deferred instead of the whole session | |
1221 | |
1222 session deferred is fired when transfer is finished | |
1223 """ | |
1224 return self._create_session(*args, **kwargs)[DEFER_KEY] | |
1225 | |
1226 def _create_session(self, client, stream_object, local_jid, to_jid, sid, | |
1227 requester=False): | |
1228 """Called when a bytestream is imminent | |
1229 | |
1230 @param stream_object(iface.IStreamProducer): File object where data will be | |
1231 written | |
1232 @param to_jid(jid.JId): jid of the other peer | |
1233 @param sid(unicode): session id | |
1234 @param initiator(bool): if True, this session is create by initiator | |
1235 @return (dict): session data | |
1236 """ | |
1237 if sid in client.xep_0065_sid_session: | |
1238 raise exceptions.ConflictError("A session with this id already exists !") | |
1239 if requester: | |
1240 session_hash = get_session_hash(local_jid, to_jid, sid) | |
1241 session_data = self._register_hash(client, session_hash, stream_object) | |
1242 else: | |
1243 session_hash = get_session_hash(to_jid, local_jid, sid) | |
1244 session_d = defer.Deferred() | |
1245 session_d.addBoth(self.kill_session, session_hash, sid, client) | |
1246 session_data = client._s5b_sessions[session_hash] = { | |
1247 DEFER_KEY: session_d, | |
1248 TIMER_KEY: reactor.callLater( | |
1249 TIMEOUT, self._time_out, session_hash, client | |
1250 ), | |
1251 } | |
1252 client.xep_0065_sid_session[sid] = session_data | |
1253 session_data.update( | |
1254 { | |
1255 "id": sid, | |
1256 "local_jid": local_jid, | |
1257 "peer_jid": to_jid, | |
1258 "stream_object": stream_object, | |
1259 "hash": session_hash, | |
1260 } | |
1261 ) | |
1262 | |
1263 return session_data | |
1264 | |
1265 def getSession(self, client, session_hash): | |
1266 """Return session data | |
1267 | |
1268 @param session_hash(unicode): hash of the session | |
1269 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
1270 @param client(None, SatXMPPClient): client of the peer | |
1271 None is used only if client is unknown (this is only the case | |
1272 for incoming request received by Socks5ServerFactory). None must | |
1273 only be used by Socks5ServerFactory. | |
1274 See comments below for details | |
1275 @return (dict): session data | |
1276 """ | |
1277 assert isinstance(session_hash, str) | |
1278 if client is None: | |
1279 try: | |
1280 client = self.hash_clients_map[session_hash] | |
1281 except KeyError as e: | |
1282 log.warning("The requested session doesn't exists !") | |
1283 raise e | |
1284 return client._s5b_sessions[session_hash] | |
1285 | |
1286 def register_hash(self, *args, **kwargs): | |
1287 """like [_register_hash] but return the session deferred instead of the whole session | |
1288 session deferred is fired when transfer is finished | |
1289 """ | |
1290 return self._register_hash(*args, **kwargs)[DEFER_KEY] | |
1291 | |
1292 def _register_hash(self, client, session_hash, stream_object): | |
1293 """Create a session_data associated to hash | |
1294 | |
1295 @param session_hash(str): hash of the session | |
1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object | |
1297 None if it will be filled later | |
1298 return (dict): session data | |
1299 """ | |
1300 assert session_hash not in client._s5b_sessions | |
1301 session_d = defer.Deferred() | |
1302 session_d.addBoth(self.kill_session, session_hash, None, client) | |
1303 session_data = client._s5b_sessions[session_hash] = { | |
1304 DEFER_KEY: session_d, | |
1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client), | |
1306 } | |
1307 | |
1308 if stream_object is not None: | |
1309 session_data["stream_object"] = stream_object | |
1310 | |
1311 assert session_hash not in self.hash_clients_map | |
1312 self.hash_clients_map[session_hash] = client | |
1313 | |
1314 return session_data | |
1315 | |
1316 def associate_stream_object(self, client, session_hash, stream_object): | |
1317 """Associate a stream object with a session""" | |
1318 session_data = self.getSession(client, session_hash) | |
1319 assert "stream_object" not in session_data | |
1320 session_data["stream_object"] = stream_object | |
1321 | |
1322 def stream_query(self, iq_elt, client): | |
1323 log.debug("BS stream query") | |
1324 | |
1325 iq_elt.handled = True | |
1326 | |
1327 query_elt = next(iq_elt.elements(NS_BS, "query")) | |
1328 try: | |
1329 sid = query_elt["sid"] | |
1330 except KeyError: | |
1331 log.warning("Invalid bystreams request received") | |
1332 return client.sendError(iq_elt, "bad-request") | |
1333 | |
1334 streamhost_elts = list(query_elt.elements(NS_BS, "streamhost")) | |
1335 if not streamhost_elts: | |
1336 return client.sendError(iq_elt, "bad-request") | |
1337 | |
1338 try: | |
1339 session_data = client.xep_0065_sid_session[sid] | |
1340 except KeyError: | |
1341 log.warning("Ignoring unexpected BS transfer: {}".format(sid)) | |
1342 return client.sendError(iq_elt, "not-acceptable") | |
1343 | |
1344 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) | |
1345 | |
1346 candidates = [] | |
1347 nb_sh = len(streamhost_elts) | |
1348 for idx, sh_elt in enumerate(streamhost_elts): | |
1349 try: | |
1350 host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"]) | |
1351 except KeyError: | |
1352 log.warning("malformed streamhost element") | |
1353 return client.sendError(iq_elt, "bad-request") | |
1354 priority = nb_sh - idx | |
1355 if jid_.userhostJID() != peer_jid.userhostJID(): | |
1356 type_ = XEP_0065.TYPE_PROXY | |
1357 else: | |
1358 type_ = XEP_0065.TYPE_DIRECT | |
1359 candidates.append(Candidate(host, port, type_, priority, jid_)) | |
1360 | |
1361 for candidate in candidates: | |
1362 log.info("Candidate proposed: {}".format(candidate)) | |
1363 | |
1364 d = self.get_best_candidate(client, candidates, session_data["hash"]) | |
1365 d.addCallback(self._ack_stream, iq_elt, session_data, client) | |
1366 | |
1367 def _ack_stream(self, candidate, iq_elt, session_data, client): | |
1368 if candidate is None: | |
1369 log.info("No streamhost candidate worked, we have to end negotiation") | |
1370 return client.sendError(iq_elt, "item-not-found") | |
1371 log.info("We choose: {}".format(candidate)) | |
1372 result_elt = xmlstream.toResponse(iq_elt, "result") | |
1373 query_elt = result_elt.addElement((NS_BS, "query")) | |
1374 query_elt["sid"] = session_data["id"] | |
1375 streamhost_used_elt = query_elt.addElement("streamhost-used") | |
1376 streamhost_used_elt["jid"] = candidate.jid.full() | |
1377 client.send(result_elt) | |
1378 | |
1379 | |
1380 @implementer(iwokkel.IDisco) | |
1381 class XEP_0065_handler(xmlstream.XMPPHandler): | |
1382 | |
1383 def __init__(self, plugin_parent): | |
1384 self.plugin_parent = plugin_parent | |
1385 self.host = plugin_parent.host | |
1386 | |
1387 def connectionInitialized(self): | |
1388 self.xmlstream.addObserver( | |
1389 BS_REQUEST, self.plugin_parent.stream_query, client=self.parent | |
1390 ) | |
1391 | |
1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
1393 return [disco.DiscoFeature(NS_BS)] | |
1394 | |
1395 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | |
1396 return [] |