comparison libervia/backend/plugins/plugin_xep_0065.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0065.py@524856bd7b19
children b86912d3fd33
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for managing xep-0065
5
6 # Copyright (C)
7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org)
8 # 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
9 # 2009-2021 Jérôme Poisson (goffi@goffi.org)
10
11 # This program is free software: you can redistribute it and/or modify
12 # it under the terms of the GNU Affero General Public License as published by
13 # the Free Software Foundation, either version 3 of the License, or
14 # (at your option) any later version.
15
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU Affero General Public License for more details.
20
21 # You should have received a copy of the GNU Affero General Public License
22 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23
24 # --
25
26 # This module is based on proxy65 (http://code.google.com/p/proxy65),
27 # originaly written by David Smith and modified by Fabio Forno.
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
29 # license.
30
31 # --
32
33 # Here is a copy of the original license:
34
35 # Copyright (C)
36 # 2002-2004 Dave Smith (dizzyd@jabber.org)
37 # 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
38
39 # Permission is hereby granted, free of charge, to any person obtaining a copy
40 # of this software and associated documentation files (the "Software"), to deal
41 # in the Software without restriction, including without limitation the rights
42 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
43 # copies of the Software, and to permit persons to whom the Software is
44 # furnished to do so, subject to the following conditions:
45
46 # The above copyright notice and this permission notice shall be included in
47 # all copies or substantial portions of the Software.
48
49 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
52 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
55 # THE SOFTWARE.
56
57 import struct
58 import hashlib
59 import uuid
60 from collections import namedtuple
61 from zope.interface import implementer
62 from twisted.internet import protocol
63 from twisted.internet import reactor
64 from twisted.internet import error as internet_error
65 from twisted.words.protocols.jabber import error as jabber_error
66 from twisted.words.protocols.jabber import jid
67 from twisted.words.protocols.jabber import xmlstream
68 from twisted.internet import defer
69 from wokkel import disco, iwokkel
70 from libervia.backend.core.i18n import _
71 from libervia.backend.core.log import getLogger
72 from libervia.backend.core.constants import Const as C
73 from libervia.backend.core import exceptions
74 from libervia.backend.tools import sat_defer
75
76
77 log = getLogger(__name__)
78
79
80 PLUGIN_INFO = {
81 C.PI_NAME: "XEP 0065 Plugin",
82 C.PI_IMPORT_NAME: "XEP-0065",
83 C.PI_TYPE: "XEP",
84 C.PI_MODES: C.PLUG_MODE_BOTH,
85 C.PI_PROTOCOLS: ["XEP-0065"],
86 C.PI_DEPENDENCIES: ["IP"],
87 C.PI_RECOMMENDATIONS: ["NAT-PORT"],
88 C.PI_MAIN: "XEP_0065",
89 C.PI_HANDLER: "yes",
90 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""),
91 }
92
93 IQ_SET = '/iq[@type="set"]'
94 NS_BS = "http://jabber.org/protocol/bytestreams"
95 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
96 TIMER_KEY = "timer"
97 DEFER_KEY = "finished" # key of the deferred used to track session end
98 SERVER_STARTING_PORT = (
99 0
100 ) # starting number for server port search (0 to ask automatic attribution)
101
102 # priorities are candidates local priorities, must be a int between 0 and 65535
103 PRIORITY_BEST_DIRECT = 10000
104 PRIORITY_DIRECT = 5000
105 PRIORITY_ASSISTED = 1000
106 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
107 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
108 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
109
110 TIMEOUT = 300 # maxium time between session creation and stream start
111
112 # XXX: by default eveything is automatic
113 # TODO: use these params to force use of specific proxy/port/IP
114 # PARAMS = """
115 # <params>
116 # <general>
117 # <category name="File Transfer">
118 # <param name="Force IP" type="string" />
119 # <param name="Force Port" type="int" constraint="1;65535" />
120 # </category>
121 # </general>
122 # <individual>
123 # <category name="File Transfer">
124 # <param name="Force Proxy" value="" type="string" />
125 # <param name="Force Proxy host" value="" type="string" />
126 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" />
127 # </category>
128 # </individual>
129 # </params>
130 # """
131
132 (
133 STATE_INITIAL,
134 STATE_AUTH,
135 STATE_REQUEST,
136 STATE_READY,
137 STATE_AUTH_USERPASS,
138 STATE_CLIENT_INITIAL,
139 STATE_CLIENT_AUTH,
140 STATE_CLIENT_REQUEST,
141 ) = range(8)
142
143 SOCKS5_VER = 0x05
144
145 ADDR_IPV4 = 0x01
146 ADDR_DOMAINNAME = 0x03
147 ADDR_IPV6 = 0x04
148
149 CMD_CONNECT = 0x01
150 CMD_BIND = 0x02
151 CMD_UDPASSOC = 0x03
152
153 AUTHMECH_ANON = 0x00
154 AUTHMECH_USERPASS = 0x02
155 AUTHMECH_INVALID = 0xFF
156
157 REPLY_SUCCESS = 0x00
158 REPLY_GENERAL_FAILUR = 0x01
159 REPLY_CONN_NOT_ALLOWED = 0x02
160 REPLY_NETWORK_UNREACHABLE = 0x03
161 REPLY_HOST_UNREACHABLE = 0x04
162 REPLY_CONN_REFUSED = 0x05
163 REPLY_TTL_EXPIRED = 0x06
164 REPLY_CMD_NOT_SUPPORTED = 0x07
165 REPLY_ADDR_NOT_SUPPORTED = 0x08
166
167
168 ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"])
169
170
171 class Candidate(object):
172 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False,
173 factory=None,):
174 """
175 @param host(unicode): host IP or domain
176 @param port(int): port
177 @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
178 @param priority(int): priority
179 @param jid_(jid.JID): jid
180 @param id_(None, id_): Candidate ID, or None to generate
181 @param priority_local(bool): if True, priority is used as local priority,
182 else priority is used as global one (and local priority is set to 0)
183 """
184 assert isinstance(jid_, jid.JID)
185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
186 self.id = id_ if id_ is not None else str(uuid.uuid4())
187 if priority_local:
188 self._local_priority = int(priority)
189 self._priority = self.calculate_priority()
190 else:
191 self._local_priority = 0
192 self._priority = int(priority)
193 self.factory = factory
194
195 def discard(self):
196 """Disconnect a candidate if it is connected
197
198 Used to disconnect tryed client when they are discarded
199 """
200 log.debug("Discarding {}".format(self))
201 try:
202 self.factory.discard()
203 except AttributeError:
204 pass # no discard for Socks5ServerFactory
205
206 @property
207 def local_priority(self):
208 return self._local_priority
209
210 @property
211 def priority(self):
212 return self._priority
213
214 def __str__(self):
215 return "Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
216 self, id=" id={}".format(self.id if self.id is not None else "")
217 )
218
219 def __eq__(self, other):
220 # self.id is is not used in __eq__ as the same candidate can have
221 # different ids if proposed by initiator or responder
222 try:
223 return (
224 self.host == other.host
225 and self.port == other.port
226 and self.jid == other.jid
227 )
228 except (AttributeError, TypeError):
229 return False
230
231 def __ne__(self, other):
232 return not self.__eq__(other)
233
234 def calculate_priority(self):
235 """Calculate candidate priority according to XEP-0260 §2.2
236
237
238 @return (int): priority
239 """
240 if self.type == XEP_0065.TYPE_DIRECT:
241 multiplier = 126
242 elif self.type == XEP_0065.TYPE_ASSISTED:
243 multiplier = 120
244 elif self.type == XEP_0065.TYPE_TUNEL:
245 multiplier = 110
246 elif self.type == XEP_0065.TYPE_PROXY:
247 multiplier = 10
248 else:
249 raise exceptions.InternalError("Unknown {} type !".format(self.type))
250 return 2 ** 16 * multiplier + self._local_priority
251
252 def activate(self, client, sid, peer_jid, local_jid):
253 """Activate the proxy candidate
254
255 Send activation request as explained in XEP-0065 § 6.3.5
256 Must only be used with proxy candidates
257 @param sid(unicode): session id (same as for get_session_hash)
258 @param peer_jid(jid.JID): jid of the other peer
259 @return (D(domish.Element)): IQ result (or error)
260 """
261 assert self.type == XEP_0065.TYPE_PROXY
262 iq_elt = client.IQ()
263 iq_elt["from"] = local_jid.full()
264 iq_elt["to"] = self.jid.full()
265 query_elt = iq_elt.addElement((NS_BS, "query"))
266 query_elt["sid"] = sid
267 query_elt.addElement("activate", content=peer_jid.full())
268 return iq_elt.send()
269
270 def start_transfer(self, session_hash=None):
271 if self.type == XEP_0065.TYPE_PROXY:
272 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default
273 else:
274 chunk_size = None
275 self.factory.start_transfer(session_hash, chunk_size=chunk_size)
276
277
278 def get_session_hash(requester_jid, target_jid, sid):
279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2
280
281 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
282 @param target_jid(jid.JID): jid of the target
283 @param sid(unicode): session id
284 @return (str): hash
285 """
286 return hashlib.sha1(
287 (sid + requester_jid.full() + target_jid.full()).encode("utf-8")
288 ).hexdigest()
289
290
291 class SOCKSv5(protocol.Protocol):
292 CHUNK_SIZE = 2 ** 16
293
294 def __init__(self, session_hash=None):
295 """
296 @param session_hash(str): hash of the session
297 must only be used in client mode
298 """
299 self.connection = defer.Deferred() # called when connection/auth is done
300 if session_hash is not None:
301 assert isinstance(session_hash, str)
302 self.server_mode = False
303 self._session_hash = session_hash
304 self.state = STATE_CLIENT_INITIAL
305 else:
306 self.server_mode = True
307 self.state = STATE_INITIAL
308 self.buf = b""
309 self.supportedAuthMechs = [AUTHMECH_ANON]
310 self.supportedAddrs = [ADDR_DOMAINNAME]
311 self.enabledCommands = [CMD_CONNECT]
312 self.peersock = None
313 self.addressType = 0
314 self.requestType = 0
315 self._stream_object = None
316 self.active = False # set to True when protocol is actually used for transfer
317 # used by factories to know when the finished Deferred can be triggered
318
319 @property
320 def stream_object(self):
321 if self._stream_object is None:
322 self._stream_object = self.getSession()["stream_object"]
323 if self.server_mode:
324 self._stream_object.registerProducer(self.transport, True)
325 return self._stream_object
326
327 def getSession(self):
328 """Return session associated with this candidate
329
330 @return (dict): session data
331 """
332 if self.server_mode:
333 return self.factory.getSession(self._session_hash)
334 else:
335 return self.factory.getSession()
336
337 def _start_negotiation(self):
338 log.debug("starting negotiation (client mode)")
339 self.state = STATE_CLIENT_AUTH
340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))
341
342 def _parse_negotiation(self):
343 try:
344 # Parse out data
345 ver, nmethod = struct.unpack("!BB", self.buf[:2])
346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2])
347
348 # Ensure version is correct
349 if ver != 5:
350 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
351 self.transport.loseConnection()
352 return
353
354 # Trim off front of the buffer
355 self.buf = self.buf[nmethod + 2 :]
356
357 # Check for supported auth mechs
358 for m in self.supportedAuthMechs:
359 if m in methods:
360 # Update internal state, according to selected method
361 if m == AUTHMECH_ANON:
362 self.state = STATE_REQUEST
363 elif m == AUTHMECH_USERPASS:
364 self.state = STATE_AUTH_USERPASS
365 # Complete negotiation w/ this method
366 self.transport.write(struct.pack("!BB", SOCKS5_VER, m))
367 return
368
369 # No supported mechs found, notify client and close the connection
370 log.warning("Unsupported authentication mechanism")
371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
372 self.transport.loseConnection()
373 except struct.error:
374 pass
375
376 def _parse_user_pass(self):
377 try:
378 # Parse out data
379 ver, ulen = struct.unpack("BB", self.buf[:2])
380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2])
381 plen, = struct.unpack("B", self.buf[ulen + 2])
382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen])
383 # Trim off fron of the buffer
384 self.buf = self.buf[3 + ulen + plen :]
385 # Fire event to authenticate user
386 if self.authenticate_user_pass(uname, password):
387 # Signal success
388 self.state = STATE_REQUEST
389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
390 else:
391 # Signal failure
392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01))
393 self.transport.loseConnection()
394 except struct.error:
395 pass
396
397 def send_error_reply(self, errorcode):
398 # Any other address types are not supported
399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
400 self.transport.write(result)
401 self.transport.loseConnection()
402
403 def _parseRequest(self):
404 try:
405 # Parse out data and trim buffer accordingly
406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
407
408 # Ensure we actually support the requested address type
409 if self.addressType not in self.supportedAddrs:
410 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
411 return
412
413 # Deal with addresses
414 if self.addressType == ADDR_IPV4:
415 addr, port = struct.unpack("!IH", self.buf[4:10])
416 self.buf = self.buf[10:]
417 elif self.addressType == ADDR_DOMAINNAME:
418 nlen = self.buf[4]
419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
420 self.buf = self.buf[7 + len(addr) :]
421 else:
422 # Any other address types are not supported
423 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
424 return
425
426 # Ensure command is supported
427 if cmd not in self.enabledCommands:
428 # Send a not supported error
429 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
430 return
431
432 # Process the command
433 if cmd == CMD_CONNECT:
434 self.connect_requested(addr, port)
435 elif cmd == CMD_BIND:
436 self.bind_requested(addr, port)
437 else:
438 # Any other command is not supported
439 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
440
441 except struct.error:
442 # The buffer is probably not complete, we need to wait more
443 return None
444
445 def _make_request(self):
446 hash_ = self._session_hash.encode('utf-8')
447 request = struct.pack(
448 "!5B%dsH" % len(hash_),
449 SOCKS5_VER,
450 CMD_CONNECT,
451 0,
452 ADDR_DOMAINNAME,
453 len(hash_),
454 hash_,
455 0,
456 )
457 self.transport.write(request)
458 self.state = STATE_CLIENT_REQUEST
459
460 def _parse_request_reply(self):
461 try:
462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
463 # Ensure we actually support the requested address type
464 if self.addressType not in self.supportedAddrs:
465 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
466 return
467
468 # Deal with addresses
469 if self.addressType == ADDR_IPV4:
470 addr, port = struct.unpack("!IH", self.buf[4:10])
471 self.buf = self.buf[10:]
472 elif self.addressType == ADDR_DOMAINNAME:
473 nlen = self.buf[4]
474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
475 self.buf = self.buf[7 + len(addr) :]
476 else:
477 # Any other address types are not supported
478 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
479 return
480
481 # Ensure reply is OK
482 if rep != REPLY_SUCCESS:
483 self.loseConnection()
484 return
485
486 self.state = STATE_READY
487 self.connection.callback(None)
488
489 except struct.error:
490 # The buffer is probably not complete, we need to wait more
491 return None
492
493 def connectionMade(self):
494 log.debug(
495 "Socks5 connectionMade (mode = {})".format(
496 "server" if self.state == STATE_INITIAL else "client"
497 )
498 )
499 if self.state == STATE_CLIENT_INITIAL:
500 self._start_negotiation()
501
502 def connect_requested(self, addr, port):
503 # Check that this session is expected
504 if not self.factory.add_to_session(addr.decode('utf-8'), self):
505 log.warning(
506 "Unexpected connection request received from {host}".format(
507 host=self.transport.getPeer().host
508 )
509 )
510 self.send_error_reply(REPLY_CONN_REFUSED)
511 return
512 self._session_hash = addr.decode('utf-8')
513 self.connect_completed(addr, 0)
514
515 def start_transfer(self, chunk_size):
516 """Callback called when the result iq is received
517
518 @param chunk_size(None, int): size of the buffer, or None for default
519 """
520 self.active = True
521 if chunk_size is not None:
522 self.CHUNK_SIZE = chunk_size
523 log.debug("Starting file transfer")
524 d = self.stream_object.start_stream(self.transport)
525 d.addCallback(self.stream_finished)
526
527 def stream_finished(self, d):
528 log.info(_("File transfer completed, closing connection"))
529 self.transport.loseConnection()
530
531 def connect_completed(self, remotehost, remoteport):
532 if self.addressType == ADDR_IPV4:
533 result = struct.pack(
534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
535 )
536 elif self.addressType == ADDR_DOMAINNAME:
537 result = struct.pack(
538 "!BBBBB%dsH" % len(remotehost),
539 SOCKS5_VER,
540 REPLY_SUCCESS,
541 0,
542 ADDR_DOMAINNAME,
543 len(remotehost),
544 remotehost,
545 remoteport,
546 )
547 self.transport.write(result)
548 self.state = STATE_READY
549
550 def bind_requested(self, addr, port):
551 pass
552
553 def authenticate_user_pass(self, user, passwd):
554 # FIXME: implement authentication and remove the debug printing a password
555 log.debug("User/pass: %s/%s" % (user, passwd))
556 return True
557
558 def dataReceived(self, buf):
559 if self.state == STATE_READY:
560 # Everything is set, we just have to write the incoming data
561 self.stream_object.write(buf)
562 if not self.active:
563 self.active = True
564 self.getSession()[TIMER_KEY].cancel()
565 return
566
567 self.buf = self.buf + buf
568 if self.state == STATE_INITIAL:
569 self._parse_negotiation()
570 if self.state == STATE_AUTH_USERPASS:
571 self._parse_user_pass()
572 if self.state == STATE_REQUEST:
573 self._parseRequest()
574 if self.state == STATE_CLIENT_REQUEST:
575 self._parse_request_reply()
576 if self.state == STATE_CLIENT_AUTH:
577 ver, method = struct.unpack("!BB", buf)
578 self.buf = self.buf[2:]
579 if ver != SOCKS5_VER or method != AUTHMECH_ANON:
580 self.transport.loseConnection()
581 else:
582 self._make_request()
583
584 def connectionLost(self, reason):
585 log.debug("Socks5 connection lost: {}".format(reason.value))
586 if self.state != STATE_READY:
587 self.connection.errback(reason)
588 if self.server_mode:
589 try:
590 session_hash = self._session_hash
591 except AttributeError:
592 log.debug("no session has been received yet")
593 else:
594 self.factory.remove_from_session(session_hash, self, reason)
595
596
597 class Socks5ServerFactory(protocol.ServerFactory):
598 protocol = SOCKSv5
599
600 def __init__(self, parent):
601 """
602 @param parent(XEP_0065): XEP_0065 parent instance
603 """
604 self.parent = parent
605
606 def getSession(self, session_hash):
607 return self.parent.getSession(None, session_hash)
608
609 def start_transfer(self, session_hash, chunk_size=None):
610 session = self.getSession(session_hash)
611 try:
612 protocol = session["protocols"][0]
613 except (KeyError, IndexError):
614 log.error("Can't start file transfer, can't find protocol")
615 else:
616 session[TIMER_KEY].cancel()
617 protocol.start_transfer(chunk_size)
618
619 def add_to_session(self, session_hash, protocol):
620 """Check is session_hash is valid, and associate protocol with it
621
622 the session will be associated to the corresponding candidate
623 @param session_hash(str): hash of the session
624 @param protocol(SOCKSv5): protocol instance
625 @param return(bool): True if hash was valid (i.e. expected), False else
626 """
627 assert isinstance(session_hash, str)
628 try:
629 session_data = self.getSession(session_hash)
630 except KeyError:
631 return False
632 else:
633 session_data.setdefault("protocols", []).append(protocol)
634 return True
635
636 def remove_from_session(self, session_hash, protocol, reason):
637 """Remove a protocol from session_data
638
639 There can be several protocol instances while candidates are tried, they
640 have removed when candidate connection is closed
641 @param session_hash(str): hash of the session
642 @param protocol(SOCKSv5): protocol instance
643 @param reason(failure.Failure): reason of the removal
644 """
645 try:
646 protocols = self.getSession(session_hash)["protocols"]
647 protocols.remove(protocol)
648 except (KeyError, ValueError):
649 log.error("Protocol not found in session while it should be there")
650 else:
651 if protocol.active:
652 # The active protocol has been removed, session is finished
653 if reason.check(internet_error.ConnectionDone):
654 self.getSession(session_hash)[DEFER_KEY].callback(None)
655 else:
656 self.getSession(session_hash)[DEFER_KEY].errback(reason)
657
658
659 class Socks5ClientFactory(protocol.ClientFactory):
660 protocol = SOCKSv5
661
662 def __init__(self, client, parent, session, session_hash):
663 """Init the Client Factory
664
665 @param session(dict): session data
666 @param session_hash(unicode): hash used for peer_connection
667 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
668 """
669 self.session = session
670 self.session_hash = session_hash
671 self.client = client
672 self.connection = defer.Deferred()
673 self._protocol_instance = None
674 self.connector = None
675
676 def discard(self):
677 """Disconnect the client
678
679 Also set a discarded flag, which avoid to call the session Deferred
680 """
681 self.connector.disconnect()
682
683 def getSession(self):
684 return self.session
685
686 def start_transfer(self, __=None, chunk_size=None):
687 self.session[TIMER_KEY].cancel()
688 self._protocol_instance.start_transfer(chunk_size)
689
690 def clientConnectionFailed(self, connector, reason):
691 log.debug("Connection failed")
692 self.connection.errback(reason)
693
694 def clientConnectionLost(self, connector, reason):
695 log.debug(_("Socks 5 client connection lost (reason: %s)") % reason.value)
696 if self._protocol_instance.active:
697 # This one was used for the transfer, than mean that
698 # the Socks5 session is finished
699 if reason.check(internet_error.ConnectionDone):
700 self.getSession()[DEFER_KEY].callback(None)
701 else:
702 self.getSession()[DEFER_KEY].errback(reason)
703 self._protocol_instance = None
704
705 def buildProtocol(self, addr):
706 log.debug(("Socks 5 client connection started"))
707 p = self.protocol(session_hash=self.session_hash)
708 p.factory = self
709 p.connection.chainDeferred(self.connection)
710 self._protocol_instance = p
711 return p
712
713
714 class XEP_0065(object):
715 NAMESPACE = NS_BS
716 TYPE_DIRECT = "direct"
717 TYPE_ASSISTED = "assisted"
718 TYPE_TUNEL = "tunel"
719 TYPE_PROXY = "proxy"
720 Candidate = Candidate
721
722 def __init__(self, host):
723 log.info(_("Plugin XEP_0065 initialization"))
724 self.host = host
725
726 # session data
727 self.hash_clients_map = {} # key: hash of the transfer session, value: session data
728 self._cache_proxies = {} # key: server jid, value: proxy data
729
730 # misc data
731 self._server_factory = None
732 self._external_port = None
733
734 # plugins shortcuts
735 self._ip = self.host.plugins["IP"]
736 try:
737 self._np = self.host.plugins["NAT-PORT"]
738 except KeyError:
739 log.debug("NAT Port plugin not available")
740 self._np = None
741
742 # parameters
743 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
744 # host.memory.update_params(PARAMS)
745
746 def get_handler(self, client):
747 return XEP_0065_handler(self)
748
749 def profile_connected(self, client):
750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict)
751 client._s5b_sessions = {}
752
753 def get_session_hash(self, from_jid, to_jid, sid):
754 return get_session_hash(from_jid, to_jid, sid)
755
756 def get_socks_5_server_factory(self):
757 """Return server factory
758
759 The server is created if it doesn't exists yet
760 self._server_factory_port is set on server creation
761 """
762
763 if self._server_factory is None:
764 self._server_factory = Socks5ServerFactory(self)
765 for port in range(SERVER_STARTING_PORT, 65356):
766 try:
767 listening_port = reactor.listenTCP(port, self._server_factory)
768 except internet_error.CannotListenError as e:
769 log.debug(
770 "Cannot listen on port {port}: {err_msg}{err_num}".format(
771 port=port,
772 err_msg=e.socketError.strerror,
773 err_num=" (error code: {})".format(e.socketError.errno),
774 )
775 )
776 else:
777 self._server_factory_port = listening_port.getHost().port
778 break
779
780 log.info(
781 _("Socks5 Stream server launched on port {}").format(
782 self._server_factory_port
783 )
784 )
785 return self._server_factory
786
787 @defer.inlineCallbacks
788 def get_proxy(self, client, local_jid):
789 """Return the proxy available for this profile
790
791 cache is used between clients using the same server
792 @param local_jid(jid.JID): same as for [get_candidates]
793 @return ((D)(ProxyInfos, None)): Found proxy infos,
794 or None if not acceptable proxy is found
795 @raise exceptions.NotFound: no Proxy found
796 """
797
798 def notFound(server):
799 log.info("No proxy found on this server")
800 self._cache_proxies[server] = None
801 raise exceptions.NotFound
802
803 server = client.host if client.is_component else client.jid.host
804 try:
805 defer.returnValue(self._cache_proxies[server])
806 except KeyError:
807 pass
808 try:
809 proxy = (
810 yield self.host.find_service_entities(client, "proxy", "bytestreams")
811 ).pop()
812 except (defer.CancelledError, StopIteration, KeyError):
813 notFound(server)
814 iq_elt = client.IQ("get")
815 iq_elt["from"] = local_jid.full()
816 iq_elt["to"] = proxy.full()
817 iq_elt.addElement((NS_BS, "query"))
818
819 try:
820 result_elt = yield iq_elt.send()
821 except jabber_error.StanzaError as failure:
822 log.warning(
823 "Error while requesting proxy info on {jid}: {error}".format(
824 jid=proxy.full(), error=failure
825 )
826 )
827 notFound(server)
828
829 try:
830 query_elt = next(result_elt.elements(NS_BS, "query"))
831 streamhost_elt = next(query_elt.elements(NS_BS, "streamhost"))
832 host = streamhost_elt["host"]
833 jid_ = streamhost_elt["jid"]
834 port = streamhost_elt["port"]
835 if not all((host, jid, port)):
836 raise KeyError
837 jid_ = jid.JID(jid_)
838 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError):
839 log.warning("Invalid proxy data received from {}".format(proxy.full()))
840 notFound(server)
841
842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
843 log.info("Proxy found: {}".format(proxy_infos))
844 defer.returnValue(proxy_infos)
845
846 @defer.inlineCallbacks
847 def _get_network_data(self, client):
848 """Retrieve information about network
849
850 @param client: %(doc_client)s
851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
852 """
853 self.get_socks_5_server_factory()
854 local_port = self._server_factory_port
855 external_ip = yield self._ip.get_external_ip(client)
856 local_ips = yield self._ip.get_local_i_ps(client)
857
858 if external_ip is not None and self._external_port is None:
859 if external_ip != local_ips[0]:
860 log.info("We are probably behind a NAT")
861 if self._np is None:
862 log.warning("NAT port plugin not available, we can't map port")
863 else:
864 ext_port = yield self._np.map_port(
865 local_port, desc="SaT socks5 stream"
866 )
867 if ext_port is None:
868 log.warning("Can't map NAT port")
869 else:
870 self._external_port = ext_port
871
872 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
873
874 @defer.inlineCallbacks
875 def get_candidates(self, client, local_jid):
876 """Return a list of our stream candidates
877
878 @param local_jid(jid.JID): jid to use as local jid
879 This is needed for client which can be addressed with a different jid than
880 client.jid if a local part is used (e.g. piotr@file.example.net where
881 client.jid would be file.example.net)
882 @return (D(list[Candidate])): list of candidates, ordered by priority
883 """
884 server_factory = yield self.get_socks_5_server_factory()
885 local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client)
886 try:
887 proxy = yield self.get_proxy(client, local_jid)
888 except exceptions.NotFound:
889 proxy = None
890
891 # its time to gather the candidates
892 candidates = []
893
894 # first the direct ones
895
896 # the preferred direct connection
897 ip = local_ips.pop(0)
898 candidates.append(
899 Candidate(
900 ip,
901 local_port,
902 XEP_0065.TYPE_DIRECT,
903 PRIORITY_BEST_DIRECT,
904 local_jid,
905 priority_local=True,
906 factory=server_factory,
907 )
908 )
909 for ip in local_ips:
910 candidates.append(
911 Candidate(
912 ip,
913 local_port,
914 XEP_0065.TYPE_DIRECT,
915 PRIORITY_DIRECT,
916 local_jid,
917 priority_local=True,
918 factory=server_factory,
919 )
920 )
921
922 # then the assisted one
923 if ext_port is not None:
924 candidates.append(
925 Candidate(
926 external_ip,
927 ext_port,
928 XEP_0065.TYPE_ASSISTED,
929 PRIORITY_ASSISTED,
930 local_jid,
931 priority_local=True,
932 factory=server_factory,
933 )
934 )
935
936 # finally the proxy
937 if proxy:
938 candidates.append(
939 Candidate(
940 proxy.host,
941 proxy.port,
942 XEP_0065.TYPE_PROXY,
943 PRIORITY_PROXY,
944 proxy.jid,
945 priority_local=True,
946 )
947 )
948
949 # should be already sorted, but just in case the priorities get weird
950 candidates.sort(key=lambda c: c.priority, reverse=True)
951 defer.returnValue(candidates)
952
953 def _add_connector(self, connector, candidate):
954 """Add connector used to connect to candidate, and return client factory's connection Deferred
955
956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
957 @param connector: a connector implementing IConnector
958 @param candidate(Candidate): candidate linked to the connector
959 @return (D): Deferred fired when factory connection is done or has failed
960 """
961 candidate.factory.connector = connector
962 return candidate.factory.connection
963
964 def connect_candidate(
965 self, client, candidate, session_hash, peer_session_hash=None, delay=None
966 ):
967 """Connect to a candidate
968
969 Connection will be done with a Socks5ClientFactory
970 @param candidate(Candidate): candidate to connect to
971 @param session_hash(unicode): hash of the session
972 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
973 @param peer_session_hash(unicode, None): hash used with the peer
974 None to use session_hash.
975 None must be used in 2 cases:
976 - when XEP-0065 is used with XEP-0096
977 - when a peer connect to a proxy *he proposed himself*
978 in practice, peer_session_hash is only used by try_candidates
979 @param delay(None, float): optional delay to wait before connection, in seconds
980 @return (D): Deferred launched when TCP connection + Socks5 connection is done
981 """
982 if peer_session_hash is None:
983 # for XEP-0065, only one hash is needed
984 peer_session_hash = session_hash
985 session = self.getSession(client, session_hash)
986 factory = Socks5ClientFactory(client, self, session, peer_session_hash)
987 candidate.factory = factory
988 if delay is None:
989 d = defer.succeed(candidate.host)
990 else:
991 d = sat_defer.DelayedDeferred(delay, candidate.host)
992 d.addCallback(reactor.connectTCP, candidate.port, factory)
993 d.addCallback(self._add_connector, candidate)
994 return d
995
996 def try_candidates(
997 self,
998 client,
999 candidates,
1000 session_hash,
1001 peer_session_hash,
1002 connection_cb=None,
1003 connection_eb=None,
1004 ):
1005 defers_list = []
1006
1007 for candidate in candidates:
1008 delay = CANDIDATE_DELAY * len(defers_list)
1009 if candidate.type == XEP_0065.TYPE_PROXY:
1010 delay += CANDIDATE_DELAY_PROXY
1011 d = self.connect_candidate(
1012 client, candidate, session_hash, peer_session_hash, delay
1013 )
1014 if connection_cb is not None:
1015 d.addCallback(
1016 lambda __, candidate=candidate, client=client: connection_cb(
1017 client, candidate
1018 )
1019 )
1020 if connection_eb is not None:
1021 d.addErrback(connection_eb, client, candidate)
1022 defers_list.append(d)
1023
1024 return defers_list
1025
1026 def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None):
1027 """Get best candidate (according to priority) which can connect
1028
1029 @param candidates(iterable[Candidate]): candidates to test
1030 @param session_hash(unicode): hash of the session
1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
1032 @param peer_session_hash(unicode, None): hash of the other peer
1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
1034 @return (D(None, Candidate)): best candidate or None if none can connect
1035 """
1036 defer_candidates = None
1037
1038 def connection_cb(client, candidate):
1039 log.info("Connection of {} successful".format(str(candidate)))
1040 for idx, other_candidate in enumerate(candidates):
1041 try:
1042 if other_candidate.priority < candidate.priority:
1043 log.debug("Cancelling {}".format(other_candidate))
1044 defer_candidates[idx].cancel()
1045 except AttributeError:
1046 assert other_candidate is None
1047
1048 def connection_eb(failure, client, candidate):
1049 if failure.check(defer.CancelledError):
1050 log.debug("Connection of {} has been cancelled".format(candidate))
1051 else:
1052 log.info(
1053 "Connection of {candidate} Failed: {error}".format(
1054 candidate=candidate, error=failure.value
1055 )
1056 )
1057 candidates[candidates.index(candidate)] = None
1058
1059 def all_tested(__):
1060 log.debug("All candidates have been tested")
1061 good_candidates = [c for c in candidates if c]
1062 return good_candidates[0] if good_candidates else None
1063
1064 defer_candidates = self.try_candidates(
1065 client,
1066 candidates,
1067 session_hash,
1068 peer_session_hash,
1069 connection_cb,
1070 connection_eb,
1071 )
1072 d_list = defer.DeferredList(defer_candidates)
1073 d_list.addCallback(all_tested)
1074 return d_list
1075
1076 def _time_out(self, session_hash, client):
1077 """Called when stream was not started quickly enough
1078
1079 @param session_hash(str): hash as returned by get_session_hash
1080 @param client: %(doc_client)s
1081 """
1082 log.info("Socks5 Bytestream: TimeOut reached")
1083 session = self.getSession(client, session_hash)
1084 session[DEFER_KEY].errback(exceptions.TimeOutError())
1085
1086 def kill_session(self, failure_, session_hash, sid, client):
1087 """Clean the current session
1088
1089 @param session_hash(str): hash as returned by get_session_hash
1090 @param sid(None, unicode): session id
1091 or None if self.xep_0065_sid_session was not used
1092 @param client: %(doc_client)s
1093 @param failure_(None, failure.Failure): None if eveything was fine, a failure else
1094 @return (None, failure.Failure): failure_ is returned
1095 """
1096 log.debug(
1097 "Cleaning session with hash {hash}{id}: {reason}".format(
1098 hash=session_hash,
1099 reason="" if failure_ is None else failure_.value,
1100 id="" if sid is None else " (id: {})".format(sid),
1101 )
1102 )
1103
1104 try:
1105 assert self.hash_clients_map[session_hash] == client
1106 del self.hash_clients_map[session_hash]
1107 except KeyError:
1108 pass
1109
1110 if sid is not None:
1111 try:
1112 del client.xep_0065_sid_session[sid]
1113 except KeyError:
1114 log.warning("Session id {} is unknown".format(sid))
1115
1116 try:
1117 session_data = client._s5b_sessions[session_hash]
1118 except KeyError:
1119 log.warning("There is no session with this hash")
1120 return
1121 else:
1122 del client._s5b_sessions[session_hash]
1123
1124 try:
1125 session_data["timer"].cancel()
1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1127 pass
1128
1129 return failure_
1130
1131 def start_stream(self, client, stream_object, local_jid, to_jid, sid):
1132 """Launch the stream workflow
1133
1134 @param streamProducer: stream_object to use
1135 @param local_jid(jid.JID): same as for [get_candidates]
1136 @param to_jid: JID of the recipient
1137 @param sid: Stream session id
1138 @param successCb: method to call when stream successfuly finished
1139 @param failureCb: method to call when something goes wrong
1140 @return (D): Deferred fired when session is finished
1141 """
1142 session_data = self._create_session(
1143 client, stream_object, local_jid, to_jid, sid, True)
1144
1145 session_data[client] = client
1146
1147 def got_candidates(candidates):
1148 session_data["candidates"] = candidates
1149 iq_elt = client.IQ()
1150 iq_elt["from"] = local_jid.full()
1151 iq_elt["to"] = to_jid.full()
1152 query_elt = iq_elt.addElement((NS_BS, "query"))
1153 query_elt["mode"] = "tcp"
1154 query_elt["sid"] = sid
1155
1156 for candidate in candidates:
1157 streamhost = query_elt.addElement("streamhost")
1158 streamhost["host"] = candidate.host
1159 streamhost["port"] = str(candidate.port)
1160 streamhost["jid"] = candidate.jid.full()
1161 log.debug("Candidate proposed: {}".format(candidate))
1162
1163 d = iq_elt.send()
1164 args = [client, session_data, local_jid]
1165 d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args)
1166
1167 self.get_candidates(client, local_jid).addCallback(got_candidates)
1168 return session_data[DEFER_KEY]
1169
1170 def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid):
1171 """Called when the result of open iq is received
1172
1173 @param session_data(dict): data of the session
1174 @param client: %(doc_client)s
1175 @param iq_elt(domish.Element): <iq> result
1176 """
1177 try:
1178 query_elt = next(iq_elt.elements(NS_BS, "query"))
1179 streamhost_used_elt = next(query_elt.elements(NS_BS, "streamhost-used"))
1180 except StopIteration:
1181 log.warning("No streamhost found in stream query")
1182 # FIXME: must clean session
1183 return
1184
1185 streamhost_jid = jid.JID(streamhost_used_elt["jid"])
1186 try:
1187 candidate = next((
1188 c for c in session_data["candidates"] if c.jid == streamhost_jid
1189 ))
1190 except StopIteration:
1191 log.warning(
1192 "Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())
1193 )
1194 return
1195 else:
1196 log.info("Candidate choosed by target: {}".format(candidate))
1197
1198 if candidate.type == XEP_0065.TYPE_PROXY:
1199 log.info("A Socks5 proxy is used")
1200 d = self.connect_candidate(client, candidate, session_data["hash"])
1201 d.addCallback(
1202 lambda __: candidate.activate(
1203 client, session_data["id"], session_data["peer_jid"], local_jid
1204 )
1205 )
1206 d.addErrback(self._activation_eb)
1207 else:
1208 d = defer.succeed(None)
1209
1210 d.addCallback(lambda __: candidate.start_transfer(session_data["hash"]))
1211
1212 def _activation_eb(self, failure):
1213 log.warning("Proxy activation error: {}".format(failure.value))
1214
1215 def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid):
1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value))
1217 # FIXME: must clean session
1218
1219 def create_session(self, *args, **kwargs):
1220 """like [_create_session] but return the session deferred instead of the whole session
1221
1222 session deferred is fired when transfer is finished
1223 """
1224 return self._create_session(*args, **kwargs)[DEFER_KEY]
1225
1226 def _create_session(self, client, stream_object, local_jid, to_jid, sid,
1227 requester=False):
1228 """Called when a bytestream is imminent
1229
1230 @param stream_object(iface.IStreamProducer): File object where data will be
1231 written
1232 @param to_jid(jid.JId): jid of the other peer
1233 @param sid(unicode): session id
1234 @param initiator(bool): if True, this session is create by initiator
1235 @return (dict): session data
1236 """
1237 if sid in client.xep_0065_sid_session:
1238 raise exceptions.ConflictError("A session with this id already exists !")
1239 if requester:
1240 session_hash = get_session_hash(local_jid, to_jid, sid)
1241 session_data = self._register_hash(client, session_hash, stream_object)
1242 else:
1243 session_hash = get_session_hash(to_jid, local_jid, sid)
1244 session_d = defer.Deferred()
1245 session_d.addBoth(self.kill_session, session_hash, sid, client)
1246 session_data = client._s5b_sessions[session_hash] = {
1247 DEFER_KEY: session_d,
1248 TIMER_KEY: reactor.callLater(
1249 TIMEOUT, self._time_out, session_hash, client
1250 ),
1251 }
1252 client.xep_0065_sid_session[sid] = session_data
1253 session_data.update(
1254 {
1255 "id": sid,
1256 "local_jid": local_jid,
1257 "peer_jid": to_jid,
1258 "stream_object": stream_object,
1259 "hash": session_hash,
1260 }
1261 )
1262
1263 return session_data
1264
1265 def getSession(self, client, session_hash):
1266 """Return session data
1267
1268 @param session_hash(unicode): hash of the session
1269 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
1270 @param client(None, SatXMPPClient): client of the peer
1271 None is used only if client is unknown (this is only the case
1272 for incoming request received by Socks5ServerFactory). None must
1273 only be used by Socks5ServerFactory.
1274 See comments below for details
1275 @return (dict): session data
1276 """
1277 assert isinstance(session_hash, str)
1278 if client is None:
1279 try:
1280 client = self.hash_clients_map[session_hash]
1281 except KeyError as e:
1282 log.warning("The requested session doesn't exists !")
1283 raise e
1284 return client._s5b_sessions[session_hash]
1285
1286 def register_hash(self, *args, **kwargs):
1287 """like [_register_hash] but return the session deferred instead of the whole session
1288 session deferred is fired when transfer is finished
1289 """
1290 return self._register_hash(*args, **kwargs)[DEFER_KEY]
1291
1292 def _register_hash(self, client, session_hash, stream_object):
1293 """Create a session_data associated to hash
1294
1295 @param session_hash(str): hash of the session
1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
1297 None if it will be filled later
1298 return (dict): session data
1299 """
1300 assert session_hash not in client._s5b_sessions
1301 session_d = defer.Deferred()
1302 session_d.addBoth(self.kill_session, session_hash, None, client)
1303 session_data = client._s5b_sessions[session_hash] = {
1304 DEFER_KEY: session_d,
1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client),
1306 }
1307
1308 if stream_object is not None:
1309 session_data["stream_object"] = stream_object
1310
1311 assert session_hash not in self.hash_clients_map
1312 self.hash_clients_map[session_hash] = client
1313
1314 return session_data
1315
1316 def associate_stream_object(self, client, session_hash, stream_object):
1317 """Associate a stream object with a session"""
1318 session_data = self.getSession(client, session_hash)
1319 assert "stream_object" not in session_data
1320 session_data["stream_object"] = stream_object
1321
1322 def stream_query(self, iq_elt, client):
1323 log.debug("BS stream query")
1324
1325 iq_elt.handled = True
1326
1327 query_elt = next(iq_elt.elements(NS_BS, "query"))
1328 try:
1329 sid = query_elt["sid"]
1330 except KeyError:
1331 log.warning("Invalid bystreams request received")
1332 return client.sendError(iq_elt, "bad-request")
1333
1334 streamhost_elts = list(query_elt.elements(NS_BS, "streamhost"))
1335 if not streamhost_elts:
1336 return client.sendError(iq_elt, "bad-request")
1337
1338 try:
1339 session_data = client.xep_0065_sid_session[sid]
1340 except KeyError:
1341 log.warning("Ignoring unexpected BS transfer: {}".format(sid))
1342 return client.sendError(iq_elt, "not-acceptable")
1343
1344 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
1345
1346 candidates = []
1347 nb_sh = len(streamhost_elts)
1348 for idx, sh_elt in enumerate(streamhost_elts):
1349 try:
1350 host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"])
1351 except KeyError:
1352 log.warning("malformed streamhost element")
1353 return client.sendError(iq_elt, "bad-request")
1354 priority = nb_sh - idx
1355 if jid_.userhostJID() != peer_jid.userhostJID():
1356 type_ = XEP_0065.TYPE_PROXY
1357 else:
1358 type_ = XEP_0065.TYPE_DIRECT
1359 candidates.append(Candidate(host, port, type_, priority, jid_))
1360
1361 for candidate in candidates:
1362 log.info("Candidate proposed: {}".format(candidate))
1363
1364 d = self.get_best_candidate(client, candidates, session_data["hash"])
1365 d.addCallback(self._ack_stream, iq_elt, session_data, client)
1366
1367 def _ack_stream(self, candidate, iq_elt, session_data, client):
1368 if candidate is None:
1369 log.info("No streamhost candidate worked, we have to end negotiation")
1370 return client.sendError(iq_elt, "item-not-found")
1371 log.info("We choose: {}".format(candidate))
1372 result_elt = xmlstream.toResponse(iq_elt, "result")
1373 query_elt = result_elt.addElement((NS_BS, "query"))
1374 query_elt["sid"] = session_data["id"]
1375 streamhost_used_elt = query_elt.addElement("streamhost-used")
1376 streamhost_used_elt["jid"] = candidate.jid.full()
1377 client.send(result_elt)
1378
1379
1380 @implementer(iwokkel.IDisco)
1381 class XEP_0065_handler(xmlstream.XMPPHandler):
1382
1383 def __init__(self, plugin_parent):
1384 self.plugin_parent = plugin_parent
1385 self.host = plugin_parent.host
1386
1387 def connectionInitialized(self):
1388 self.xmlstream.addObserver(
1389 BS_REQUEST, self.plugin_parent.stream_query, client=self.parent
1390 )
1391
1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
1393 return [disco.DiscoFeature(NS_BS)]
1394
1395 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
1396 return []