comparison sat/plugins/plugin_xep_0065.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention - move twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor which use deprecated code by gtk3reactor sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/plugins/plugin_xep_0065.py@7ad5f2c4e34a
children 56f94936df1e
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 #-*- coding: utf-8 -*-
3
4 # SAT plugin for managing xep-0065
5
6 # Copyright (C)
7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org)
8 # 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
9 # 2009-2018 Jérôme Poisson (goffi@goffi.org)
10
11 # This program is free software: you can redistribute it and/or modify
12 # it under the terms of the GNU Affero General Public License as published by
13 # the Free Software Foundation, either version 3 of the License, or
14 # (at your option) any later version.
15
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU Affero General Public License for more details.
20
21 # You should have received a copy of the GNU Affero General Public License
22 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23
24 # --
25
26 # This module is based on proxy65 (http://code.google.com/p/proxy65),
27 # originaly written by David Smith and modified by Fabio Forno.
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
29 # license.
30
31 # --
32
33 # Here is a copy of the original license:
34
35 # Copyright (C)
36 # 2002-2004 Dave Smith (dizzyd@jabber.org)
37 # 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
38
39 # Permission is hereby granted, free of charge, to any person obtaining a copy
40 # of this software and associated documentation files (the "Software"), to deal
41 # in the Software without restriction, including without limitation the rights
42 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
43 # copies of the Software, and to permit persons to whom the Software is
44 # furnished to do so, subject to the following conditions:
45
46 # The above copyright notice and this permission notice shall be included in
47 # all copies or substantial portions of the Software.
48
49 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
50 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
51 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
52 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
55 # THE SOFTWARE.
56
57 from sat.core.i18n import _
58 from sat.core.log import getLogger
59 log = getLogger(__name__)
60 from sat.core.constants import Const as C
61 from sat.core import exceptions
62 from sat.tools import sat_defer
63 from twisted.internet import protocol
64 from twisted.internet import reactor
65 from twisted.internet import error as internet_error
66 from twisted.words.protocols.jabber import error as jabber_error
67 from twisted.words.protocols.jabber import jid
68 from twisted.words.protocols.jabber import xmlstream
69 from twisted.internet import defer
70 from collections import namedtuple
71 import struct
72 import hashlib
73 import uuid
74
75 from zope.interface import implements
76
77 try:
78 from twisted.words.protocols.xmlstream import XMPPHandler
79 except ImportError:
80 from wokkel.subprotocols import XMPPHandler
81
82 from wokkel import disco, iwokkel
83
84
85 PLUGIN_INFO = {
86 C.PI_NAME: "XEP 0065 Plugin",
87 C.PI_IMPORT_NAME: "XEP-0065",
88 C.PI_TYPE: "XEP",
89 C.PI_MODES: C.PLUG_MODE_BOTH,
90 C.PI_PROTOCOLS: ["XEP-0065"],
91 C.PI_DEPENDENCIES: ["IP"],
92 C.PI_RECOMMENDATIONS: ["NAT-PORT"],
93 C.PI_MAIN: "XEP_0065",
94 C.PI_HANDLER: "yes",
95 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""")
96 }
97
98 IQ_SET = '/iq[@type="set"]'
99 NS_BS = 'http://jabber.org/protocol/bytestreams'
100 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
101 TIMER_KEY = 'timer'
102 DEFER_KEY = 'finished' # key of the deferred used to track session end
103 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
104
105 # priorities are candidates local priorities, must be a int between 0 and 65535
106 PRIORITY_BEST_DIRECT = 10000
107 PRIORITY_DIRECT = 5000
108 PRIORITY_ASSISTED = 1000
109 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
110 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
111 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
112
113 TIMEOUT = 300 # maxium time between session creation and stream start
114
115 # XXX: by default eveything is automatic
116 # TODO: use these params to force use of specific proxy/port/IP
117 # PARAMS = """
118 # <params>
119 # <general>
120 # <category name="File Transfer">
121 # <param name="Force IP" type="string" />
122 # <param name="Force Port" type="int" constraint="1;65535" />
123 # </category>
124 # </general>
125 # <individual>
126 # <category name="File Transfer">
127 # <param name="Force Proxy" value="" type="string" />
128 # <param name="Force Proxy host" value="" type="string" />
129 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" />
130 # </category>
131 # </individual>
132 # </params>
133 # """
134
135 (STATE_INITIAL,
136 STATE_AUTH,
137 STATE_REQUEST,
138 STATE_READY,
139 STATE_AUTH_USERPASS,
140 STATE_CLIENT_INITIAL,
141 STATE_CLIENT_AUTH,
142 STATE_CLIENT_REQUEST,
143 ) = xrange(8)
144
145 SOCKS5_VER = 0x05
146
147 ADDR_IPV4 = 0x01
148 ADDR_DOMAINNAME = 0x03
149 ADDR_IPV6 = 0x04
150
151 CMD_CONNECT = 0x01
152 CMD_BIND = 0x02
153 CMD_UDPASSOC = 0x03
154
155 AUTHMECH_ANON = 0x00
156 AUTHMECH_USERPASS = 0x02
157 AUTHMECH_INVALID = 0xFF
158
159 REPLY_SUCCESS = 0x00
160 REPLY_GENERAL_FAILUR = 0x01
161 REPLY_CONN_NOT_ALLOWED = 0x02
162 REPLY_NETWORK_UNREACHABLE = 0x03
163 REPLY_HOST_UNREACHABLE = 0x04
164 REPLY_CONN_REFUSED = 0x05
165 REPLY_TTL_EXPIRED = 0x06
166 REPLY_CMD_NOT_SUPPORTED = 0x07
167 REPLY_ADDR_NOT_SUPPORTED = 0x08
168
169
170 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port'])
171
172
173 class Candidate(object):
174
175 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None):
176 """
177 @param host(unicode): host IP or domain
178 @param port(int): port
179 @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
180 @param priority(int): priority
181 @param jid_(jid.JID): jid
182 @param id_(None, id_): Candidate ID, or None to generate
183 @param priority_local(bool): if True, priority is used as local priority,
184 else priority is used as global one (and local priority is set to 0)
185 """
186 assert isinstance(jid_, jid.JID)
187 self.host, self.port, self.type, self.jid = (
188 host, int(port), type_, jid_)
189 self.id = id_ if id_ is not None else unicode(uuid.uuid4())
190 if priority_local:
191 self._local_priority = int(priority)
192 self._priority = self.calculatePriority()
193 else:
194 self._local_priority = 0
195 self._priority = int(priority)
196 self.factory = factory
197
198 def discard(self):
199 """Disconnect a candidate if it is connected
200
201 Used to disconnect tryed client when they are discarded
202 """
203 log.debug(u"Discarding {}".format(self))
204 try:
205 self.factory.discard()
206 except AttributeError:
207 pass # no discard for Socks5ServerFactory
208
209 @property
210 def local_priority(self):
211 return self._local_priority
212
213 @property
214 def priority(self):
215 return self._priority
216
217 def __str__(self):
218 # similar to __unicode__ but we don't show jid and we encode id
219 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
220 self,
221 id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'),
222 )
223
224 def __unicode__(self):
225 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
226 self,
227 id=u" id={}".format(self.id if self.id is not None else u''),
228 )
229
230 def __eq__(self, other):
231 # self.id is is not used in __eq__ as the same candidate can have
232 # different ids if proposed by initiator or responder
233 try:
234 return (self.host == other.host and
235 self.port == other.port and
236 self.jid == other.jid)
237 except (AttributeError, TypeError):
238 return False
239
240 def __ne__(self, other):
241 return not self.__eq__(other)
242
243 def calculatePriority(self):
244 """Calculate candidate priority according to XEP-0260 §2.2
245
246
247 @return (int): priority
248 """
249 if self.type == XEP_0065.TYPE_DIRECT:
250 multiplier = 126
251 elif self.type == XEP_0065.TYPE_ASSISTED:
252 multiplier = 120
253 elif self.type == XEP_0065.TYPE_TUNEL:
254 multiplier = 110
255 elif self.type == XEP_0065.TYPE_PROXY:
256 multiplier = 10
257 else:
258 raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
259 return 2**16 * multiplier + self._local_priority
260
261 def activate(self, sid, peer_jid, client):
262 """Activate the proxy candidate
263
264 Send activation request as explained in XEP-0065 § 6.3.5
265 Must only be used with proxy candidates
266 @param sid(unicode): session id (same as for getSessionHash)
267 @param peer_jid(jid.JID): jid of the other peer
268 @return (D(domish.Element)): IQ result (or error)
269 """
270 assert self.type == XEP_0065.TYPE_PROXY
271 iq_elt = client.IQ()
272 iq_elt['to'] = self.jid.full()
273 query_elt = iq_elt.addElement((NS_BS, 'query'))
274 query_elt['sid'] = sid
275 query_elt.addElement('activate', content=peer_jid.full())
276 return iq_elt.send()
277
278 def startTransfer(self, session_hash=None):
279 if self.type == XEP_0065.TYPE_PROXY:
280 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default
281 else:
282 chunk_size = None
283 self.factory.startTransfer(session_hash, chunk_size=chunk_size)
284
285
286 def getSessionHash(requester_jid, target_jid, sid):
287 """Calculate SHA1 Hash according to XEP-0065 §5.3.2
288
289 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
290 @param target_jid(jid.JID): jid of the target
291 @param sid(unicode): session id
292 @return (str): hash
293 """
294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest()
295
296
297 class SOCKSv5(protocol.Protocol):
298 CHUNK_SIZE = 2**16
299
300 def __init__(self, session_hash=None):
301 """
302 @param session_hash(str): hash of the session
303 must only be used in client mode
304 """
305 self.connection = defer.Deferred() # called when connection/auth is done
306 if session_hash is not None:
307 self.server_mode = False
308 self._session_hash = session_hash
309 self.state = STATE_CLIENT_INITIAL
310 else:
311 self.server_mode = True
312 self.state = STATE_INITIAL
313 self.buf = ""
314 self.supportedAuthMechs = [AUTHMECH_ANON]
315 self.supportedAddrs = [ADDR_DOMAINNAME]
316 self.enabledCommands = [CMD_CONNECT]
317 self.peersock = None
318 self.addressType = 0
319 self.requestType = 0
320 self._stream_object = None
321 self.active = False # set to True when protocol is actually used for transfer
322 # used by factories to know when the finished Deferred can be triggered
323
324 @property
325 def stream_object(self):
326 if self._stream_object is None:
327 self._stream_object = self.getSession()['stream_object']
328 if self.server_mode:
329 self._stream_object.registerProducer(self.transport, True)
330 return self._stream_object
331
332 def getSession(self):
333 """Return session associated with this candidate
334
335 @return (dict): session data
336 """
337 if self.server_mode:
338 return self.factory.getSession(self._session_hash)
339 else:
340 return self.factory.getSession()
341
342 def _startNegotiation(self):
343 log.debug("starting negotiation (client mode)")
344 self.state = STATE_CLIENT_AUTH
345 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
346
347 def _parseNegotiation(self):
348 try:
349 # Parse out data
350 ver, nmethod = struct.unpack('!BB', self.buf[:2])
351 methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2])
352
353 # Ensure version is correct
354 if ver != 5:
355 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
356 self.transport.loseConnection()
357 return
358
359 # Trim off front of the buffer
360 self.buf = self.buf[nmethod + 2:]
361
362 # Check for supported auth mechs
363 for m in self.supportedAuthMechs:
364 if m in methods:
365 # Update internal state, according to selected method
366 if m == AUTHMECH_ANON:
367 self.state = STATE_REQUEST
368 elif m == AUTHMECH_USERPASS:
369 self.state = STATE_AUTH_USERPASS
370 # Complete negotiation w/ this method
371 self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
372 return
373
374 # No supported mechs found, notify client and close the connection
375 log.warning(u"Unsupported authentication mechanism")
376 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
377 self.transport.loseConnection()
378 except struct.error:
379 pass
380
381 def _parseUserPass(self):
382 try:
383 # Parse out data
384 ver, ulen = struct.unpack('BB', self.buf[:2])
385 uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
386 plen, = struct.unpack('B', self.buf[ulen + 2])
387 password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
388 # Trim off fron of the buffer
389 self.buf = self.buf[3 + ulen + plen:]
390 # Fire event to authenticate user
391 if self.authenticateUserPass(uname, password):
392 # Signal success
393 self.state = STATE_REQUEST
394 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
395 else:
396 # Signal failure
397 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
398 self.transport.loseConnection()
399 except struct.error:
400 pass
401
402 def sendErrorReply(self, errorcode):
403 # Any other address types are not supported
404 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
405 self.transport.write(result)
406 self.transport.loseConnection()
407
408 def _parseRequest(self):
409 try:
410 # Parse out data and trim buffer accordingly
411 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
412
413 # Ensure we actually support the requested address type
414 if self.addressType not in self.supportedAddrs:
415 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
416 return
417
418 # Deal with addresses
419 if self.addressType == ADDR_IPV4:
420 addr, port = struct.unpack('!IH', self.buf[4:10])
421 self.buf = self.buf[10:]
422 elif self.addressType == ADDR_DOMAINNAME:
423 nlen = ord(self.buf[4])
424 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
425 self.buf = self.buf[7 + len(addr):]
426 else:
427 # Any other address types are not supported
428 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
429 return
430
431 # Ensure command is supported
432 if cmd not in self.enabledCommands:
433 # Send a not supported error
434 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
435 return
436
437 # Process the command
438 if cmd == CMD_CONNECT:
439 self.connectRequested(addr, port)
440 elif cmd == CMD_BIND:
441 self.bindRequested(addr, port)
442 else:
443 # Any other command is not supported
444 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
445
446 except struct.error:
447 # The buffer is probably not complete, we need to wait more
448 return None
449
450 def _makeRequest(self):
451 hash_ = self._session_hash
452 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
453 self.transport.write(request)
454 self.state = STATE_CLIENT_REQUEST
455
456 def _parseRequestReply(self):
457 try:
458 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
459 # Ensure we actually support the requested address type
460 if self.addressType not in self.supportedAddrs:
461 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
462 return
463
464 # Deal with addresses
465 if self.addressType == ADDR_IPV4:
466 addr, port = struct.unpack('!IH', self.buf[4:10])
467 self.buf = self.buf[10:]
468 elif self.addressType == ADDR_DOMAINNAME:
469 nlen = ord(self.buf[4])
470 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
471 self.buf = self.buf[7 + len(addr):]
472 else:
473 # Any other address types are not supported
474 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
475 return
476
477 # Ensure reply is OK
478 if rep != REPLY_SUCCESS:
479 self.loseConnection()
480 return
481
482 self.state = STATE_READY
483 self.connection.callback(None)
484
485 except struct.error:
486 # The buffer is probably not complete, we need to wait more
487 return None
488
489 def connectionMade(self):
490 log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client"))
491 if self.state == STATE_CLIENT_INITIAL:
492 self._startNegotiation()
493
494 def connectRequested(self, addr, port):
495 # Check that this session is expected
496 if not self.factory.addToSession(addr, self):
497 self.sendErrorReply(REPLY_CONN_REFUSED)
498 log.warning(u"Unexpected connection request received from {host}"
499 .format(host=self.transport.getPeer().host))
500 return
501 self._session_hash = addr
502 self.connectCompleted(addr, 0)
503
504 def startTransfer(self, chunk_size):
505 """Callback called when the result iq is received
506
507 @param chunk_size(None, int): size of the buffer, or None for default
508 """
509 self.active = True
510 if chunk_size is not None:
511 self.CHUNK_SIZE = chunk_size
512 log.debug(u"Starting file transfer")
513 d = self.stream_object.startStream(self.transport)
514 d.addCallback(self.streamFinished)
515
516 def streamFinished(self, d):
517 log.info(_("File transfer completed, closing connection"))
518 self.transport.loseConnection()
519
520 def connectCompleted(self, remotehost, remoteport):
521 if self.addressType == ADDR_IPV4:
522 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
523 elif self.addressType == ADDR_DOMAINNAME:
524 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
525 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
526 self.transport.write(result)
527 self.state = STATE_READY
528
529 def bindRequested(self, addr, port):
530 pass
531
532 def authenticateUserPass(self, user, passwd):
533 # FIXME: implement authentication and remove the debug printing a password
534 log.debug(u"User/pass: %s/%s" % (user, passwd))
535 return True
536
537 def dataReceived(self, buf):
538 if self.state == STATE_READY:
539 # Everything is set, we just have to write the incoming data
540 self.stream_object.write(buf)
541 if not self.active:
542 self.active = True
543 self.getSession()[TIMER_KEY].cancel()
544 return
545
546 self.buf = self.buf + buf
547 if self.state == STATE_INITIAL:
548 self._parseNegotiation()
549 if self.state == STATE_AUTH_USERPASS:
550 self._parseUserPass()
551 if self.state == STATE_REQUEST:
552 self._parseRequest()
553 if self.state == STATE_CLIENT_REQUEST:
554 self._parseRequestReply()
555 if self.state == STATE_CLIENT_AUTH:
556 ver, method = struct.unpack('!BB', buf)
557 self.buf = self.buf[2:]
558 if ver != SOCKS5_VER or method != AUTHMECH_ANON:
559 self.transport.loseConnection()
560 else:
561 self._makeRequest()
562
563 def connectionLost(self, reason):
564 log.debug(u"Socks5 connection lost: {}".format(reason.value))
565 if self.state != STATE_READY:
566 self.connection.errback(reason)
567 if self.server_mode :
568 self.factory.removeFromSession(self._session_hash, self, reason)
569
570
571 class Socks5ServerFactory(protocol.ServerFactory):
572 protocol = SOCKSv5
573
574 def __init__(self, parent):
575 """
576 @param parent(XEP_0065): XEP_0065 parent instance
577 """
578 self.parent = parent
579
580 def getSession(self, session_hash):
581 return self.parent.getSession(None, session_hash)
582
583 def startTransfer(self, session_hash, chunk_size=None):
584 session = self.getSession(session_hash)
585 try:
586 protocol = session['protocols'][0]
587 except (KeyError, IndexError):
588 log.error(u"Can't start file transfer, can't find protocol")
589 else:
590 session[TIMER_KEY].cancel()
591 protocol.startTransfer(chunk_size)
592
593 def addToSession(self, session_hash, protocol):
594 """Check is session_hash is valid, and associate protocol with it
595
596 the session will be associated to the corresponding candidate
597 @param session_hash(str): hash of the session
598 @param protocol(SOCKSv5): protocol instance
599 @param return(bool): True if hash was valid (i.e. expected), False else
600 """
601 try:
602 session_data = self.getSession(session_hash)
603 except KeyError:
604 return False
605 else:
606 session_data.setdefault('protocols', []).append(protocol)
607 return True
608
609 def removeFromSession(self, session_hash, protocol, reason):
610 """Remove a protocol from session_data
611
612 There can be several protocol instances while candidates are tried, they
613 have removed when candidate connection is closed
614 @param session_hash(str): hash of the session
615 @param protocol(SOCKSv5): protocol instance
616 @param reason(failure.Failure): reason of the removal
617 """
618 try:
619 protocols = self.getSession(session_hash)['protocols']
620 protocols.remove(protocol)
621 except (KeyError, ValueError):
622 log.error(u"Protocol not found in session while it should be there")
623 else:
624 if protocol.active:
625 # The active protocol has been removed, session is finished
626 if reason.check(internet_error.ConnectionDone):
627 self.getSession(session_hash)[DEFER_KEY].callback(None)
628 else:
629 self.getSession(session_hash)[DEFER_KEY].errback(reason)
630
631
632 class Socks5ClientFactory(protocol.ClientFactory):
633 protocol = SOCKSv5
634
635 def __init__(self, client, parent, session, session_hash):
636 """Init the Client Factory
637
638 @param session(dict): session data
639 @param session_hash(unicode): hash used for peer_connection
640 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
641 """
642 self.session = session
643 self.session_hash = session_hash
644 self.client = client
645 self.connection = defer.Deferred()
646 self._protocol_instance = None
647 self.connector = None
648
649 def discard(self):
650 """Disconnect the client
651
652 Also set a discarded flag, which avoid to call the session Deferred
653 """
654 self.connector.disconnect()
655
656 def getSession(self):
657 return self.session
658
659 def startTransfer(self, dummy=None, chunk_size=None):
660 self.session[TIMER_KEY].cancel()
661 self._protocol_instance.startTransfer(chunk_size)
662
663 def clientConnectionFailed(self, connector, reason):
664 log.debug(u"Connection failed")
665 self.connection.errback(reason)
666
667 def clientConnectionLost(self, connector, reason):
668 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value)
669 if self._protocol_instance.active:
670 # This one was used for the transfer, than mean that
671 # the Socks5 session is finished
672 if reason.check(internet_error.ConnectionDone):
673 self.getSession()[DEFER_KEY].callback(None)
674 else:
675 self.getSession()[DEFER_KEY].errback(reason)
676 self._protocol_instance = None
677
678 def buildProtocol(self, addr):
679 log.debug(("Socks 5 client connection started"))
680 p = self.protocol(session_hash=self.session_hash)
681 p.factory = self
682 p.connection.chainDeferred(self.connection)
683 self._protocol_instance = p
684 return p
685
686
687 class XEP_0065(object):
688 NAMESPACE = NS_BS
689 TYPE_DIRECT = 'direct'
690 TYPE_ASSISTED = 'assisted'
691 TYPE_TUNEL = 'tunel'
692 TYPE_PROXY = 'proxy'
693 Candidate = Candidate
694
695 def __init__(self, host):
696 log.info(_("Plugin XEP_0065 initialization"))
697 self.host = host
698
699 # session data
700 self.hash_clients_map = {} # key: hash of the transfer session, value: session data
701 self._cache_proxies = {} # key: server jid, value: proxy data
702
703 # misc data
704 self._server_factory = None
705 self._external_port = None
706
707 # plugins shortcuts
708 self._ip = self.host.plugins['IP']
709 try:
710 self._np = self.host.plugins['NAT-PORT']
711 except KeyError:
712 log.debug(u"NAT Port plugin not available")
713 self._np = None
714
715 # parameters
716 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
717 # host.memory.updateParams(PARAMS)
718
719 def getHandler(self, client):
720 return XEP_0065_handler(self)
721
722 def profileConnected(self, client):
723 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict)
724 client._s5b_sessions = {}
725
726 def getSessionHash(self, from_jid, to_jid, sid):
727 return getSessionHash(from_jid, to_jid, sid)
728
729 def getSocks5ServerFactory(self):
730 """Return server factory
731
732 The server is created if it doesn't exists yet
733 self._server_factory_port is set on server creation
734 """
735
736 if self._server_factory is None:
737 self._server_factory = Socks5ServerFactory(self)
738 for port in xrange(SERVER_STARTING_PORT, 65356):
739 try:
740 listening_port = reactor.listenTCP(port, self._server_factory)
741 except internet_error.CannotListenError as e:
742 log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format(
743 port=port,
744 err_msg=e.socketError.strerror,
745 err_num=u' (error code: {})'.format(e.socketError.errno),
746 ))
747 else:
748 self._server_factory_port = listening_port.getHost().port
749 break
750
751 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port))
752 return self._server_factory
753
754 @defer.inlineCallbacks
755 def getProxy(self, client):
756 """Return the proxy available for this profile
757
758 cache is used between clients using the same server
759 @return ((D)(ProxyInfos, None)): Found proxy infos,
760 or None if not acceptable proxy is found
761 """
762 def notFound(server):
763 log.info(u"No proxy found on this server")
764 self._cache_proxies[server] = None
765 defer.returnValue(None)
766 server = client.jid.host
767 try:
768 defer.returnValue(self._cache_proxies[server])
769 except KeyError:
770 pass
771 try:
772 proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop()
773 except (defer.CancelledError, StopIteration, KeyError):
774 notFound(server)
775 iq_elt = client.IQ('get')
776 iq_elt['to'] = proxy.full()
777 iq_elt.addElement((NS_BS, 'query'))
778
779 try:
780 result_elt = yield iq_elt.send()
781 except jabber_error.StanzaError as failure:
782 log.warning(u"Error while requesting proxy info on {jid}: {error}"
783 .format(proxy.full(), failure))
784 notFound(server)
785
786 try:
787 query_elt = result_elt.elements(NS_BS, 'query').next()
788 streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next()
789 host = streamhost_elt['host']
790 jid_ = streamhost_elt['jid']
791 port = streamhost_elt['port']
792 if not all((host, jid, port)):
793 raise KeyError
794 jid_ = jid.JID(jid_)
795 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError):
796 log.warning(u"Invalid proxy data received from {}".format(proxy.full()))
797 notFound(server)
798
799 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
800 log.info(u"Proxy found: {}".format(proxy_infos))
801 defer.returnValue(proxy_infos)
802
803 @defer.inlineCallbacks
804 def _getNetworkData(self, client):
805 """Retrieve information about network
806
807 @param client: %(doc_client)s
808 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
809 """
810 self.getSocks5ServerFactory()
811 local_port = self._server_factory_port
812 external_ip = yield self._ip.getExternalIP(client)
813 local_ips = yield self._ip.getLocalIPs(client)
814
815 if external_ip is not None and self._external_port is None:
816 if external_ip != local_ips[0]:
817 log.info(u"We are probably behind a NAT")
818 if self._np is None:
819 log.warning(u"NAT port plugin not available, we can't map port")
820 else:
821 ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream")
822 if ext_port is None:
823 log.warning(u"Can't map NAT port")
824 else:
825 self._external_port = ext_port
826
827 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
828
829 @defer.inlineCallbacks
830 def getCandidates(self, client):
831 """Return a list of our stream candidates
832
833 @return (D(list[Candidate])): list of candidates, ordered by priority
834 """
835 server_factory = yield self.getSocks5ServerFactory()
836 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
837 proxy = yield self.getProxy(client)
838
839 # its time to gather the candidates
840 candidates = []
841
842 # first the direct ones
843
844 # the preferred direct connection
845 ip = local_ips.pop(0)
846 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory))
847 for ip in local_ips:
848 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory))
849
850 # then the assisted one
851 if ext_port is not None:
852 candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory))
853
854 # finally the proxy
855 if proxy:
856 candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True))
857
858 # should be already sorted, but just in case the priorities get weird
859 candidates.sort(key=lambda c: c.priority, reverse=True)
860 defer.returnValue(candidates)
861
862 def _addConnector(self, connector, candidate):
863 """Add connector used to connect to candidate, and return client factory's connection Deferred
864
865 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
866 @param connector: a connector implementing IConnector
867 @param candidate(Candidate): candidate linked to the connector
868 @return (D): Deferred fired when factory connection is done or has failed
869 """
870 candidate.factory.connector = connector
871 return candidate.factory.connection
872
873 def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None):
874 """Connect to a candidate
875
876 Connection will be done with a Socks5ClientFactory
877 @param candidate(Candidate): candidate to connect to
878 @param session_hash(unicode): hash of the session
879 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
880 @param peer_session_hash(unicode, None): hash used with the peer
881 None to use session_hash.
882 None must be used in 2 cases:
883 - when XEP-0065 is used with XEP-0096
884 - when a peer connect to a proxy *he proposed himself*
885 in practice, peer_session_hash is only used by tryCandidates
886 @param delay(None, float): optional delay to wait before connection, in seconds
887 @return (D): Deferred launched when TCP connection + Socks5 connection is done
888 """
889 if peer_session_hash is None:
890 # for XEP-0065, only one hash is needed
891 peer_session_hash = session_hash
892 session = self.getSession(client, session_hash)
893 factory = Socks5ClientFactory(client, self, session, peer_session_hash)
894 candidate.factory = factory
895 if delay is None:
896 d = defer.succeed(candidate.host)
897 else:
898 d = sat_defer.DelayedDeferred(delay, candidate.host)
899 d.addCallback(reactor.connectTCP, candidate.port, factory)
900 d.addCallback(self._addConnector, candidate)
901 return d
902
903 def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None):
904 defers_list = []
905
906 for candidate in candidates:
907 delay = CANDIDATE_DELAY * len(defers_list)
908 if candidate.type == XEP_0065.TYPE_PROXY:
909 delay += CANDIDATE_DELAY_PROXY
910 d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay)
911 if connection_cb is not None:
912 d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate))
913 if connection_eb is not None:
914 d.addErrback(connection_eb, client, candidate)
915 defers_list.append(d)
916
917 return defers_list
918
919 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None):
920 """Get best candidate (according to priority) which can connect
921
922 @param candidates(iterable[Candidate]): candidates to test
923 @param session_hash(unicode): hash of the session
924 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
925 @param peer_session_hash(unicode, None): hash of the other peer
926 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
927 @return (D(None, Candidate)): best candidate or None if none can connect
928 """
929 defer_candidates = None
930
931 def connectionCb(client, candidate):
932 log.info(u"Connection of {} successful".format(unicode(candidate)))
933 for idx, other_candidate in enumerate(candidates):
934 try:
935 if other_candidate.priority < candidate.priority:
936 log.debug(u"Cancelling {}".format(other_candidate))
937 defer_candidates[idx].cancel()
938 except AttributeError:
939 assert other_candidate is None
940
941 def connectionEb(failure, client, candidate):
942 if failure.check(defer.CancelledError):
943 log.debug(u"Connection of {} has been cancelled".format(candidate))
944 else:
945 log.info(u"Connection of {candidate} Failed: {error}".format(
946 candidate = candidate,
947 error = failure.value))
948 candidates[candidates.index(candidate)] = None
949
950 def allTested(self):
951 log.debug(u"All candidates have been tested")
952 good_candidates = [c for c in candidates if c]
953 return good_candidates[0] if good_candidates else None
954
955 defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb)
956 d_list = defer.DeferredList(defer_candidates)
957 d_list.addCallback(allTested)
958 return d_list
959
960 def _timeOut(self, session_hash, client):
961 """Called when stream was not started quickly enough
962
963 @param session_hash(str): hash as returned by getSessionHash
964 @param client: %(doc_client)s
965 """
966 log.info(u"Socks5 Bytestream: TimeOut reached")
967 session = self.getSession(client, session_hash)
968 session[DEFER_KEY].errback(exceptions.TimeOutError)
969
970 def killSession(self, failure_, session_hash, sid, client):
971 """Clean the current session
972
973 @param session_hash(str): hash as returned by getSessionHash
974 @param sid(None, unicode): session id
975 or None if self.xep_0065_sid_session was not used
976 @param client: %(doc_client)s
977 @param failure_(None, failure.Failure): None if eveything was fine, a failure else
978 @return (None, failure.Failure): failure_ is returned
979 """
980 log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format(
981 hash=session_hash,
982 reason='' if failure_ is None else failure_.value,
983 id='' if sid is None else u' (id: {})'.format(sid),
984 ))
985
986 try:
987 assert self.hash_clients_map[session_hash] == client
988 del self.hash_clients_map[session_hash]
989 except KeyError:
990 pass
991
992 if sid is not None:
993 try:
994 del client.xep_0065_sid_session[sid]
995 except KeyError:
996 log.warning(u"Session id {} is unknown".format(sid))
997
998 try:
999 session_data = client._s5b_sessions[session_hash]
1000 except KeyError:
1001 log.warning(u"There is no session with this hash")
1002 return
1003 else:
1004 del client._s5b_sessions[session_hash]
1005
1006 try:
1007 session_data['timer'].cancel()
1008 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1009 pass
1010
1011 return failure_
1012
1013 def startStream(self, client, stream_object, to_jid, sid):
1014 """Launch the stream workflow
1015
1016 @param streamProducer: stream_object to use
1017 @param to_jid: JID of the recipient
1018 @param sid: Stream session id
1019 @param successCb: method to call when stream successfuly finished
1020 @param failureCb: method to call when something goes wrong
1021 @return (D): Deferred fired when session is finished
1022 """
1023 session_data = self._createSession(client, stream_object, to_jid, sid, True)
1024
1025 session_data[client] = client
1026
1027 def gotCandidates(candidates):
1028 session_data['candidates'] = candidates
1029 iq_elt = client.IQ()
1030 iq_elt["from"] = client.jid.full()
1031 iq_elt["to"] = to_jid.full()
1032 query_elt = iq_elt.addElement((NS_BS, 'query'))
1033 query_elt['mode'] = 'tcp'
1034 query_elt['sid'] = sid
1035
1036 for candidate in candidates:
1037 streamhost = query_elt.addElement('streamhost')
1038 streamhost['host'] = candidate.host
1039 streamhost['port'] = str(candidate.port)
1040 streamhost['jid'] = candidate.jid.full()
1041 log.debug(u"Candidate proposed: {}".format(candidate))
1042
1043 d = iq_elt.send()
1044 args = [session_data, client]
1045 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1046
1047 self.getCandidates(client).addCallback(gotCandidates)
1048 return session_data[DEFER_KEY]
1049
1050 def _IQNegotiationCb(self, iq_elt, session_data, client):
1051 """Called when the result of open iq is received
1052
1053 @param session_data(dict): data of the session
1054 @param client: %(doc_client)s
1055 @param iq_elt(domish.Element): <iq> result
1056 """
1057 try:
1058 query_elt = iq_elt.elements(NS_BS, 'query').next()
1059 streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next()
1060 except StopIteration:
1061 log.warning(u"No streamhost found in stream query")
1062 # FIXME: must clean session
1063 return
1064
1065 streamhost_jid = jid.JID(streamhost_used_elt['jid'])
1066 try:
1067 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
1068 except StopIteration:
1069 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
1070 return
1071 else:
1072 log.info(u"Candidate choosed by target: {}".format(candidate))
1073
1074 if candidate.type == XEP_0065.TYPE_PROXY:
1075 log.info(u"A Socks5 proxy is used")
1076 d = self.connectCandidate(client, candidate, session_data['hash'])
1077 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
1078 d.addErrback(self._activationEb)
1079 else:
1080 d = defer.succeed(None)
1081
1082 d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash']))
1083
1084 def _activationEb(self, failure):
1085 log.warning(u"Proxy activation error: {}".format(failure.value))
1086
1087 def _IQNegotiationEb(self, stanza_err, session_data, client):
1088 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value))
1089 # FIXME: must clean session
1090
1091 def createSession(self, *args, **kwargs):
1092 """like [_createSession] but return the session deferred instead of the whole session
1093
1094 session deferred is fired when transfer is finished
1095 """
1096 return self._createSession(*args, **kwargs)[DEFER_KEY]
1097
1098 def _createSession(self, client, stream_object, to_jid, sid, requester=False):
1099 """Called when a bytestream is imminent
1100
1101 @param stream_object(iface.IStreamProducer): File object where data will be written
1102 @param to_jid(jid.JId): jid of the other peer
1103 @param sid(unicode): session id
1104 @param initiator(bool): if True, this session is create by initiator
1105 @return (dict): session data
1106 """
1107 if sid in client.xep_0065_sid_session:
1108 raise exceptions.ConflictError(u'A session with this id already exists !')
1109 if requester:
1110 session_hash = getSessionHash(client.jid, to_jid, sid)
1111 session_data = self._registerHash(client, session_hash, stream_object)
1112 else:
1113 session_hash = getSessionHash(to_jid, client.jid, sid)
1114 session_d = defer.Deferred()
1115 session_d.addBoth(self.killSession, session_hash, sid, client)
1116 session_data = client._s5b_sessions[session_hash] = {
1117 DEFER_KEY: session_d,
1118 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1119 }
1120 client.xep_0065_sid_session[sid] = session_data
1121 session_data.update(
1122 {'id': sid,
1123 'peer_jid': to_jid,
1124 'stream_object': stream_object,
1125 'hash': session_hash,
1126 })
1127
1128 return session_data
1129
1130 def getSession(self, client, session_hash):
1131 """Return session data
1132
1133 @param session_hash(unicode): hash of the session
1134 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
1135 @param client(None, SatXMPPClient): client of the peer
1136 None is used only if client is unknown (this is only the case
1137 for incoming request received by Socks5ServerFactory). None must
1138 only be used by Socks5ServerFactory.
1139 See comments below for details
1140 @return (dict): session data
1141 """
1142 if client is None:
1143 try:
1144 client = self.hash_clients_map[session_hash]
1145 except KeyError as e:
1146 log.warning(u"The requested session doesn't exists !")
1147 raise e
1148 return client._s5b_sessions[session_hash]
1149
1150 def registerHash(self, *args, **kwargs):
1151 """like [_registerHash] but return the session deferred instead of the whole session
1152 session deferred is fired when transfer is finished
1153 """
1154 return self._registerHash(*args, **kwargs)[DEFER_KEY]
1155
1156 def _registerHash(self, client, session_hash, stream_object):
1157 """Create a session_data associated to hash
1158
1159 @param session_hash(str): hash of the session
1160 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
1161 None if it will be filled later
1162 return (dict): session data
1163 """
1164 assert session_hash not in client._s5b_sessions
1165 session_d = defer.Deferred()
1166 session_d.addBoth(self.killSession, session_hash, None, client)
1167 session_data = client._s5b_sessions[session_hash] = {
1168 DEFER_KEY: session_d,
1169 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1170 }
1171
1172 if stream_object is not None:
1173 session_data['stream_object'] = stream_object
1174
1175 assert session_hash not in self.hash_clients_map
1176 self.hash_clients_map[session_hash] = client
1177
1178 return session_data
1179
1180 def associateStreamObject(self, client, session_hash, stream_object):
1181 """Associate a stream object with a session"""
1182 session_data = self.getSession(client, session_hash)
1183 assert 'stream_object' not in session_data
1184 session_data['stream_object'] = stream_object
1185
1186 def streamQuery(self, iq_elt, client):
1187 log.debug(u"BS stream query")
1188
1189 iq_elt.handled = True
1190
1191 query_elt = iq_elt.elements(NS_BS, 'query').next()
1192 try:
1193 sid = query_elt['sid']
1194 except KeyError:
1195 log.warning(u"Invalid bystreams request received")
1196 return client.sendError(iq_elt, "bad-request")
1197
1198 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost'))
1199 if not streamhost_elts:
1200 return client.sendError(iq_elt, "bad-request")
1201
1202 try:
1203 session_data = client.xep_0065_sid_session[sid]
1204 except KeyError:
1205 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
1206 return client.sendError(iq_elt, 'not-acceptable')
1207
1208 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
1209
1210 candidates = []
1211 nb_sh = len(streamhost_elts)
1212 for idx, sh_elt in enumerate(streamhost_elts):
1213 try:
1214 host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid'])
1215 except KeyError:
1216 log.warning(u"malformed streamhost element")
1217 return client.sendError(iq_elt, "bad-request")
1218 priority = nb_sh - idx
1219 if jid_.userhostJID() != peer_jid.userhostJID():
1220 type_ = XEP_0065.TYPE_PROXY
1221 else:
1222 type_ = XEP_0065.TYPE_DIRECT
1223 candidates.append(Candidate(host, port, type_, priority, jid_))
1224
1225 for candidate in candidates:
1226 log.info(u"Candidate proposed: {}".format(candidate))
1227
1228 d = self.getBestCandidate(client, candidates, session_data['hash'])
1229 d.addCallback(self._ackStream, iq_elt, session_data, client)
1230
1231 def _ackStream(self, candidate, iq_elt, session_data, client):
1232 if candidate is None:
1233 log.info("No streamhost candidate worked, we have to end negotiation")
1234 return client.sendError(iq_elt, 'item-not-found')
1235 log.info(u"We choose: {}".format(candidate))
1236 result_elt = xmlstream.toResponse(iq_elt, 'result')
1237 query_elt = result_elt.addElement((NS_BS, 'query'))
1238 query_elt['sid'] = session_data['id']
1239 streamhost_used_elt = query_elt.addElement('streamhost-used')
1240 streamhost_used_elt['jid'] = candidate.jid.full()
1241 client.send(result_elt)
1242
1243
1244 class XEP_0065_handler(XMPPHandler):
1245 implements(iwokkel.IDisco)
1246
1247 def __init__(self, plugin_parent):
1248 self.plugin_parent = plugin_parent
1249 self.host = plugin_parent.host
1250
1251 def connectionInitialized(self):
1252 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent)
1253
1254 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1255 return [disco.DiscoFeature(NS_BS)]
1256
1257 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
1258 return []