diff libervia/backend/plugins/plugin_xep_0065.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0065.py@524856bd7b19
children b86912d3fd33
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0065.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,1396 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for managing xep-0065
+
+# Copyright (C)
+# 2002, 2003, 2004   Dave Smith (dizzyd@jabber.org)
+# 2007, 2008         Fabio Forno (xmpp:ff@jabber.bluendo.com)
+# 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+# --
+
+# This module is based on proxy65 (http://code.google.com/p/proxy65),
+# originaly written by David Smith and modified by Fabio Forno.
+# It is sublicensed under AGPL v3 (or any later version) as allowed by the original
+# license.
+
+# --
+
+# Here is a copy of the original license:
+
+# Copyright (C)
+# 2002-2004   Dave Smith (dizzyd@jabber.org)
+# 2007-2008   Fabio Forno (xmpp:ff@jabber.bluendo.com)
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import struct
+import hashlib
+import uuid
+from collections import namedtuple
+from zope.interface import implementer
+from twisted.internet import protocol
+from twisted.internet import reactor
+from twisted.internet import error as internet_error
+from twisted.words.protocols.jabber import error as jabber_error
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber import xmlstream
+from twisted.internet import defer
+from wokkel import disco, iwokkel
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core import exceptions
+from libervia.backend.tools import sat_defer
+
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "XEP 0065 Plugin",
+    C.PI_IMPORT_NAME: "XEP-0065",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0065"],
+    C.PI_DEPENDENCIES: ["IP"],
+    C.PI_RECOMMENDATIONS: ["NAT-PORT"],
+    C.PI_MAIN: "XEP_0065",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""),
+}
+
+IQ_SET = '/iq[@type="set"]'
+NS_BS = "http://jabber.org/protocol/bytestreams"
+BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
+TIMER_KEY = "timer"
+DEFER_KEY = "finished"  # key of the deferred used to track session end
+SERVER_STARTING_PORT = (
+    0
+)  # starting number for server port search (0 to ask automatic attribution)
+
+# priorities are candidates local priorities, must be a int between 0 and 65535
+PRIORITY_BEST_DIRECT = 10000
+PRIORITY_DIRECT = 5000
+PRIORITY_ASSISTED = 1000
+PRIORITY_PROXY = 0.2  # proxy is the last option for s5b
+CANDIDATE_DELAY = 0.2  # see XEP-0260 §4
+CANDIDATE_DELAY_PROXY = 0.2  # additional time for proxy types (see XEP-0260 §4 note 3)
+
+TIMEOUT = 300  # maxium time between session creation and stream start
+
+# XXX: by default eveything is automatic
+# TODO: use these params to force use of specific proxy/port/IP
+# PARAMS = """
+#     <params>
+#     <general>
+#     <category name="File Transfer">
+#         <param name="Force IP" type="string" />
+#         <param name="Force Port" type="int" constraint="1;65535" />
+#     </category>
+#     </general>
+#     <individual>
+#     <category name="File Transfer">
+#         <param name="Force Proxy" value="" type="string" />
+#         <param name="Force Proxy host" value="" type="string" />
+#         <param name="Force Proxy port" value="" type="int" constraint="1;65535" />
+#     </category>
+#     </individual>
+#     </params>
+#     """
+
+(
+    STATE_INITIAL,
+    STATE_AUTH,
+    STATE_REQUEST,
+    STATE_READY,
+    STATE_AUTH_USERPASS,
+    STATE_CLIENT_INITIAL,
+    STATE_CLIENT_AUTH,
+    STATE_CLIENT_REQUEST,
+) = range(8)
+
+SOCKS5_VER = 0x05
+
+ADDR_IPV4 = 0x01
+ADDR_DOMAINNAME = 0x03
+ADDR_IPV6 = 0x04
+
+CMD_CONNECT = 0x01
+CMD_BIND = 0x02
+CMD_UDPASSOC = 0x03
+
+AUTHMECH_ANON = 0x00
+AUTHMECH_USERPASS = 0x02
+AUTHMECH_INVALID = 0xFF
+
+REPLY_SUCCESS = 0x00
+REPLY_GENERAL_FAILUR = 0x01
+REPLY_CONN_NOT_ALLOWED = 0x02
+REPLY_NETWORK_UNREACHABLE = 0x03
+REPLY_HOST_UNREACHABLE = 0x04
+REPLY_CONN_REFUSED = 0x05
+REPLY_TTL_EXPIRED = 0x06
+REPLY_CMD_NOT_SUPPORTED = 0x07
+REPLY_ADDR_NOT_SUPPORTED = 0x08
+
+
+ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"])
+
+
+class Candidate(object):
+    def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False,
+                 factory=None,):
+        """
+        @param host(unicode): host IP or domain
+        @param port(int): port
+        @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
+        @param priority(int): priority
+        @param jid_(jid.JID): jid
+        @param id_(None, id_): Candidate ID, or None to generate
+        @param priority_local(bool): if True, priority is used as local priority,
+            else priority is used as global one (and local priority is set to 0)
+        """
+        assert isinstance(jid_, jid.JID)
+        self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
+        self.id = id_ if id_ is not None else str(uuid.uuid4())
+        if priority_local:
+            self._local_priority = int(priority)
+            self._priority = self.calculate_priority()
+        else:
+            self._local_priority = 0
+            self._priority = int(priority)
+        self.factory = factory
+
+    def discard(self):
+        """Disconnect a candidate if it is connected
+
+        Used to disconnect tryed client when they are discarded
+        """
+        log.debug("Discarding {}".format(self))
+        try:
+            self.factory.discard()
+        except AttributeError:
+            pass  # no discard for Socks5ServerFactory
+
+    @property
+    def local_priority(self):
+        return self._local_priority
+
+    @property
+    def priority(self):
+        return self._priority
+
+    def __str__(self):
+        return "Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
+            self, id=" id={}".format(self.id if self.id is not None else "")
+        )
+
+    def __eq__(self, other):
+        # self.id is is not used in __eq__ as the same candidate can have
+        # different ids if proposed by initiator or responder
+        try:
+            return (
+                self.host == other.host
+                and self.port == other.port
+                and self.jid == other.jid
+            )
+        except (AttributeError, TypeError):
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def calculate_priority(self):
+        """Calculate candidate priority according to XEP-0260 §2.2
+
+
+        @return (int): priority
+        """
+        if self.type == XEP_0065.TYPE_DIRECT:
+            multiplier = 126
+        elif self.type == XEP_0065.TYPE_ASSISTED:
+            multiplier = 120
+        elif self.type == XEP_0065.TYPE_TUNEL:
+            multiplier = 110
+        elif self.type == XEP_0065.TYPE_PROXY:
+            multiplier = 10
+        else:
+            raise exceptions.InternalError("Unknown {} type !".format(self.type))
+        return 2 ** 16 * multiplier + self._local_priority
+
+    def activate(self, client, sid, peer_jid, local_jid):
+        """Activate the proxy candidate
+
+        Send activation request as explained in XEP-0065 § 6.3.5
+        Must only be used with proxy candidates
+        @param sid(unicode): session id (same as for get_session_hash)
+        @param peer_jid(jid.JID): jid of the other peer
+        @return (D(domish.Element)): IQ result (or error)
+        """
+        assert self.type == XEP_0065.TYPE_PROXY
+        iq_elt = client.IQ()
+        iq_elt["from"] = local_jid.full()
+        iq_elt["to"] = self.jid.full()
+        query_elt = iq_elt.addElement((NS_BS, "query"))
+        query_elt["sid"] = sid
+        query_elt.addElement("activate", content=peer_jid.full())
+        return iq_elt.send()
+
+    def start_transfer(self, session_hash=None):
+        if self.type == XEP_0065.TYPE_PROXY:
+            chunk_size = 4096  # Prosody's proxy reject bigger chunks by default
+        else:
+            chunk_size = None
+        self.factory.start_transfer(session_hash, chunk_size=chunk_size)
+
+
+def get_session_hash(requester_jid, target_jid, sid):
+    """Calculate SHA1 Hash according to XEP-0065 §5.3.2
+
+    @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
+    @param target_jid(jid.JID): jid of the target
+    @param sid(unicode): session id
+    @return (str): hash
+    """
+    return hashlib.sha1(
+        (sid + requester_jid.full() + target_jid.full()).encode("utf-8")
+    ).hexdigest()
+
+
+class SOCKSv5(protocol.Protocol):
+    CHUNK_SIZE = 2 ** 16
+
+    def __init__(self, session_hash=None):
+        """
+        @param session_hash(str): hash of the session
+            must only be used in client mode
+        """
+        self.connection = defer.Deferred()  # called when connection/auth is done
+        if session_hash is not None:
+            assert isinstance(session_hash, str)
+            self.server_mode = False
+            self._session_hash = session_hash
+            self.state = STATE_CLIENT_INITIAL
+        else:
+            self.server_mode = True
+            self.state = STATE_INITIAL
+        self.buf = b""
+        self.supportedAuthMechs = [AUTHMECH_ANON]
+        self.supportedAddrs = [ADDR_DOMAINNAME]
+        self.enabledCommands = [CMD_CONNECT]
+        self.peersock = None
+        self.addressType = 0
+        self.requestType = 0
+        self._stream_object = None
+        self.active = False  # set to True when protocol is actually used for transfer
+        # used by factories to know when the finished Deferred can be triggered
+
+    @property
+    def stream_object(self):
+        if self._stream_object is None:
+            self._stream_object = self.getSession()["stream_object"]
+            if self.server_mode:
+                self._stream_object.registerProducer(self.transport, True)
+        return self._stream_object
+
+    def getSession(self):
+        """Return session associated with this candidate
+
+        @return (dict): session data
+        """
+        if self.server_mode:
+            return self.factory.getSession(self._session_hash)
+        else:
+            return self.factory.getSession()
+
+    def _start_negotiation(self):
+        log.debug("starting negotiation (client mode)")
+        self.state = STATE_CLIENT_AUTH
+        self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))
+
+    def _parse_negotiation(self):
+        try:
+            # Parse out data
+            ver, nmethod = struct.unpack("!BB", self.buf[:2])
+            methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2])
+
+            # Ensure version is correct
+            if ver != 5:
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
+                self.transport.loseConnection()
+                return
+
+            # Trim off front of the buffer
+            self.buf = self.buf[nmethod + 2 :]
+
+            # Check for supported auth mechs
+            for m in self.supportedAuthMechs:
+                if m in methods:
+                    # Update internal state, according to selected method
+                    if m == AUTHMECH_ANON:
+                        self.state = STATE_REQUEST
+                    elif m == AUTHMECH_USERPASS:
+                        self.state = STATE_AUTH_USERPASS
+                    # Complete negotiation w/ this method
+                    self.transport.write(struct.pack("!BB", SOCKS5_VER, m))
+                    return
+
+            # No supported mechs found, notify client and close the connection
+            log.warning("Unsupported authentication mechanism")
+            self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
+            self.transport.loseConnection()
+        except struct.error:
+            pass
+
+    def _parse_user_pass(self):
+        try:
+            # Parse out data
+            ver, ulen = struct.unpack("BB", self.buf[:2])
+            uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2])
+            plen, = struct.unpack("B", self.buf[ulen + 2])
+            password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen])
+            # Trim off fron of the buffer
+            self.buf = self.buf[3 + ulen + plen :]
+            # Fire event to authenticate user
+            if self.authenticate_user_pass(uname, password):
+                # Signal success
+                self.state = STATE_REQUEST
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
+            else:
+                # Signal failure
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01))
+                self.transport.loseConnection()
+        except struct.error:
+            pass
+
+    def send_error_reply(self, errorcode):
+        # Any other address types are not supported
+        result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
+        self.transport.write(result)
+        self.transport.loseConnection()
+
+    def _parseRequest(self):
+        try:
+            # Parse out data and trim buffer accordingly
+            ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
+
+            # Ensure we actually support the requested address type
+            if self.addressType not in self.supportedAddrs:
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Deal with addresses
+            if self.addressType == ADDR_IPV4:
+                addr, port = struct.unpack("!IH", self.buf[4:10])
+                self.buf = self.buf[10:]
+            elif self.addressType == ADDR_DOMAINNAME:
+                nlen = self.buf[4]
+                addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr) :]
+            else:
+                # Any other address types are not supported
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Ensure command is supported
+            if cmd not in self.enabledCommands:
+                # Send a not supported error
+                self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
+                return
+
+            # Process the command
+            if cmd == CMD_CONNECT:
+                self.connect_requested(addr, port)
+            elif cmd == CMD_BIND:
+                self.bind_requested(addr, port)
+            else:
+                # Any other command is not supported
+                self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
+
+        except struct.error:
+            # The buffer is probably not complete, we need to wait more
+            return None
+
+    def _make_request(self):
+        hash_ = self._session_hash.encode('utf-8')
+        request = struct.pack(
+            "!5B%dsH" % len(hash_),
+            SOCKS5_VER,
+            CMD_CONNECT,
+            0,
+            ADDR_DOMAINNAME,
+            len(hash_),
+            hash_,
+            0,
+        )
+        self.transport.write(request)
+        self.state = STATE_CLIENT_REQUEST
+
+    def _parse_request_reply(self):
+        try:
+            ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
+            # Ensure we actually support the requested address type
+            if self.addressType not in self.supportedAddrs:
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Deal with addresses
+            if self.addressType == ADDR_IPV4:
+                addr, port = struct.unpack("!IH", self.buf[4:10])
+                self.buf = self.buf[10:]
+            elif self.addressType == ADDR_DOMAINNAME:
+                nlen = self.buf[4]
+                addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr) :]
+            else:
+                # Any other address types are not supported
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Ensure reply is OK
+            if rep != REPLY_SUCCESS:
+                self.loseConnection()
+                return
+
+            self.state = STATE_READY
+            self.connection.callback(None)
+
+        except struct.error:
+            # The buffer is probably not complete, we need to wait more
+            return None
+
+    def connectionMade(self):
+        log.debug(
+            "Socks5 connectionMade (mode = {})".format(
+                "server" if self.state == STATE_INITIAL else "client"
+            )
+        )
+        if self.state == STATE_CLIENT_INITIAL:
+            self._start_negotiation()
+
+    def connect_requested(self, addr, port):
+        # Check that this session is expected
+        if not self.factory.add_to_session(addr.decode('utf-8'), self):
+            log.warning(
+                "Unexpected connection request received from {host}".format(
+                    host=self.transport.getPeer().host
+                )
+            )
+            self.send_error_reply(REPLY_CONN_REFUSED)
+            return
+        self._session_hash = addr.decode('utf-8')
+        self.connect_completed(addr, 0)
+
+    def start_transfer(self, chunk_size):
+        """Callback called when the result iq is received
+
+        @param chunk_size(None, int): size of the buffer, or None for default
+        """
+        self.active = True
+        if chunk_size is not None:
+            self.CHUNK_SIZE = chunk_size
+        log.debug("Starting file transfer")
+        d = self.stream_object.start_stream(self.transport)
+        d.addCallback(self.stream_finished)
+
+    def stream_finished(self, d):
+        log.info(_("File transfer completed, closing connection"))
+        self.transport.loseConnection()
+
+    def connect_completed(self, remotehost, remoteport):
+        if self.addressType == ADDR_IPV4:
+            result = struct.pack(
+                "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
+            )
+        elif self.addressType == ADDR_DOMAINNAME:
+            result = struct.pack(
+                "!BBBBB%dsH" % len(remotehost),
+                SOCKS5_VER,
+                REPLY_SUCCESS,
+                0,
+                ADDR_DOMAINNAME,
+                len(remotehost),
+                remotehost,
+                remoteport,
+            )
+        self.transport.write(result)
+        self.state = STATE_READY
+
+    def bind_requested(self, addr, port):
+        pass
+
+    def authenticate_user_pass(self, user, passwd):
+        # FIXME: implement authentication and remove the debug printing a password
+        log.debug("User/pass: %s/%s" % (user, passwd))
+        return True
+
+    def dataReceived(self, buf):
+        if self.state == STATE_READY:
+            # Everything is set, we just have to write the incoming data
+            self.stream_object.write(buf)
+            if not self.active:
+                self.active = True
+                self.getSession()[TIMER_KEY].cancel()
+            return
+
+        self.buf = self.buf + buf
+        if self.state == STATE_INITIAL:
+            self._parse_negotiation()
+        if self.state == STATE_AUTH_USERPASS:
+            self._parse_user_pass()
+        if self.state == STATE_REQUEST:
+            self._parseRequest()
+        if self.state == STATE_CLIENT_REQUEST:
+            self._parse_request_reply()
+        if self.state == STATE_CLIENT_AUTH:
+            ver, method = struct.unpack("!BB", buf)
+            self.buf = self.buf[2:]
+            if ver != SOCKS5_VER or method != AUTHMECH_ANON:
+                self.transport.loseConnection()
+            else:
+                self._make_request()
+
+    def connectionLost(self, reason):
+        log.debug("Socks5 connection lost: {}".format(reason.value))
+        if self.state != STATE_READY:
+            self.connection.errback(reason)
+        if self.server_mode:
+            try:
+                session_hash = self._session_hash
+            except AttributeError:
+                log.debug("no session has been received yet")
+            else:
+                self.factory.remove_from_session(session_hash, self, reason)
+
+
+class Socks5ServerFactory(protocol.ServerFactory):
+    protocol = SOCKSv5
+
+    def __init__(self, parent):
+        """
+        @param parent(XEP_0065): XEP_0065 parent instance
+        """
+        self.parent = parent
+
+    def getSession(self, session_hash):
+        return self.parent.getSession(None, session_hash)
+
+    def start_transfer(self, session_hash, chunk_size=None):
+        session = self.getSession(session_hash)
+        try:
+            protocol = session["protocols"][0]
+        except (KeyError, IndexError):
+            log.error("Can't start file transfer, can't find protocol")
+        else:
+            session[TIMER_KEY].cancel()
+            protocol.start_transfer(chunk_size)
+
+    def add_to_session(self, session_hash, protocol):
+        """Check is session_hash is valid, and associate protocol with it
+
+        the session will be associated to the corresponding candidate
+        @param session_hash(str): hash of the session
+        @param protocol(SOCKSv5): protocol instance
+        @param return(bool): True if hash was valid (i.e. expected), False else
+        """
+        assert isinstance(session_hash, str)
+        try:
+            session_data = self.getSession(session_hash)
+        except KeyError:
+            return False
+        else:
+            session_data.setdefault("protocols", []).append(protocol)
+            return True
+
+    def remove_from_session(self, session_hash, protocol, reason):
+        """Remove a protocol from session_data
+
+        There can be several protocol instances while candidates are tried, they
+        have removed when candidate connection is closed
+        @param session_hash(str): hash of the session
+        @param protocol(SOCKSv5): protocol instance
+        @param reason(failure.Failure): reason of the removal
+        """
+        try:
+            protocols = self.getSession(session_hash)["protocols"]
+            protocols.remove(protocol)
+        except (KeyError, ValueError):
+            log.error("Protocol not found in session while it should be there")
+        else:
+            if protocol.active:
+                # The active protocol has been removed, session is finished
+                if reason.check(internet_error.ConnectionDone):
+                    self.getSession(session_hash)[DEFER_KEY].callback(None)
+                else:
+                    self.getSession(session_hash)[DEFER_KEY].errback(reason)
+
+
+class Socks5ClientFactory(protocol.ClientFactory):
+    protocol = SOCKSv5
+
+    def __init__(self, client, parent, session, session_hash):
+        """Init the Client Factory
+
+        @param session(dict): session data
+        @param session_hash(unicode): hash used for peer_connection
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        """
+        self.session = session
+        self.session_hash = session_hash
+        self.client = client
+        self.connection = defer.Deferred()
+        self._protocol_instance = None
+        self.connector = None
+
+    def discard(self):
+        """Disconnect the client
+
+        Also set a discarded flag, which avoid to call the session Deferred
+        """
+        self.connector.disconnect()
+
+    def getSession(self):
+        return self.session
+
+    def start_transfer(self, __=None, chunk_size=None):
+        self.session[TIMER_KEY].cancel()
+        self._protocol_instance.start_transfer(chunk_size)
+
+    def clientConnectionFailed(self, connector, reason):
+        log.debug("Connection failed")
+        self.connection.errback(reason)
+
+    def clientConnectionLost(self, connector, reason):
+        log.debug(_("Socks 5 client connection lost (reason: %s)") % reason.value)
+        if self._protocol_instance.active:
+            # This one was used for the transfer, than mean that
+            # the Socks5 session is finished
+            if reason.check(internet_error.ConnectionDone):
+                self.getSession()[DEFER_KEY].callback(None)
+            else:
+                self.getSession()[DEFER_KEY].errback(reason)
+        self._protocol_instance = None
+
+    def buildProtocol(self, addr):
+        log.debug(("Socks 5 client connection started"))
+        p = self.protocol(session_hash=self.session_hash)
+        p.factory = self
+        p.connection.chainDeferred(self.connection)
+        self._protocol_instance = p
+        return p
+
+
+class XEP_0065(object):
+    NAMESPACE = NS_BS
+    TYPE_DIRECT = "direct"
+    TYPE_ASSISTED = "assisted"
+    TYPE_TUNEL = "tunel"
+    TYPE_PROXY = "proxy"
+    Candidate = Candidate
+
+    def __init__(self, host):
+        log.info(_("Plugin XEP_0065 initialization"))
+        self.host = host
+
+        # session data
+        self.hash_clients_map = {}  # key: hash of the transfer session, value: session data
+        self._cache_proxies = {}  # key: server jid, value: proxy data
+
+        # misc data
+        self._server_factory = None
+        self._external_port = None
+
+        # plugins shortcuts
+        self._ip = self.host.plugins["IP"]
+        try:
+            self._np = self.host.plugins["NAT-PORT"]
+        except KeyError:
+            log.debug("NAT Port plugin not available")
+            self._np = None
+
+        # parameters
+        # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
+        # host.memory.update_params(PARAMS)
+
+    def get_handler(self, client):
+        return XEP_0065_handler(self)
+
+    def profile_connected(self, client):
+        client.xep_0065_sid_session = {}  # key: stream_id, value: session_data(dict)
+        client._s5b_sessions = {}
+
+    def get_session_hash(self, from_jid, to_jid, sid):
+        return get_session_hash(from_jid, to_jid, sid)
+
+    def get_socks_5_server_factory(self):
+        """Return server factory
+
+        The server is created if it doesn't exists yet
+        self._server_factory_port is set on server creation
+        """
+
+        if self._server_factory is None:
+            self._server_factory = Socks5ServerFactory(self)
+            for port in range(SERVER_STARTING_PORT, 65356):
+                try:
+                    listening_port = reactor.listenTCP(port, self._server_factory)
+                except internet_error.CannotListenError as e:
+                    log.debug(
+                        "Cannot listen on port {port}: {err_msg}{err_num}".format(
+                            port=port,
+                            err_msg=e.socketError.strerror,
+                            err_num=" (error code: {})".format(e.socketError.errno),
+                        )
+                    )
+                else:
+                    self._server_factory_port = listening_port.getHost().port
+                    break
+
+            log.info(
+                _("Socks5 Stream server launched on port {}").format(
+                    self._server_factory_port
+                )
+            )
+        return self._server_factory
+
+    @defer.inlineCallbacks
+    def get_proxy(self, client, local_jid):
+        """Return the proxy available for this profile
+
+        cache is used between clients using the same server
+        @param local_jid(jid.JID): same as for [get_candidates]
+        @return ((D)(ProxyInfos, None)): Found proxy infos,
+            or None if not acceptable proxy is found
+        @raise exceptions.NotFound: no Proxy found
+        """
+
+        def notFound(server):
+            log.info("No proxy found on this server")
+            self._cache_proxies[server] = None
+            raise exceptions.NotFound
+
+        server = client.host if client.is_component else client.jid.host
+        try:
+            defer.returnValue(self._cache_proxies[server])
+        except KeyError:
+            pass
+        try:
+            proxy = (
+                yield self.host.find_service_entities(client, "proxy", "bytestreams")
+            ).pop()
+        except (defer.CancelledError, StopIteration, KeyError):
+            notFound(server)
+        iq_elt = client.IQ("get")
+        iq_elt["from"] = local_jid.full()
+        iq_elt["to"] = proxy.full()
+        iq_elt.addElement((NS_BS, "query"))
+
+        try:
+            result_elt = yield iq_elt.send()
+        except jabber_error.StanzaError as failure:
+            log.warning(
+                "Error while requesting proxy info on {jid}: {error}".format(
+                    jid=proxy.full(), error=failure
+                )
+            )
+            notFound(server)
+
+        try:
+            query_elt = next(result_elt.elements(NS_BS, "query"))
+            streamhost_elt = next(query_elt.elements(NS_BS, "streamhost"))
+            host = streamhost_elt["host"]
+            jid_ = streamhost_elt["jid"]
+            port = streamhost_elt["port"]
+            if not all((host, jid, port)):
+                raise KeyError
+            jid_ = jid.JID(jid_)
+        except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError):
+            log.warning("Invalid proxy data received from {}".format(proxy.full()))
+            notFound(server)
+
+        proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
+        log.info("Proxy found: {}".format(proxy_infos))
+        defer.returnValue(proxy_infos)
+
+    @defer.inlineCallbacks
+    def _get_network_data(self, client):
+        """Retrieve information about network
+
+        @param client: %(doc_client)s
+        @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
+        """
+        self.get_socks_5_server_factory()
+        local_port = self._server_factory_port
+        external_ip = yield self._ip.get_external_ip(client)
+        local_ips = yield self._ip.get_local_i_ps(client)
+
+        if external_ip is not None and self._external_port is None:
+            if external_ip != local_ips[0]:
+                log.info("We are probably behind a NAT")
+                if self._np is None:
+                    log.warning("NAT port plugin not available, we can't map port")
+                else:
+                    ext_port = yield self._np.map_port(
+                        local_port, desc="SaT socks5 stream"
+                    )
+                    if ext_port is None:
+                        log.warning("Can't map NAT port")
+                    else:
+                        self._external_port = ext_port
+
+        defer.returnValue((local_port, self._external_port, local_ips, external_ip))
+
+    @defer.inlineCallbacks
+    def get_candidates(self, client, local_jid):
+        """Return a list of our stream candidates
+
+        @param local_jid(jid.JID): jid to use as local jid
+            This is needed for client which can be addressed with a different jid than
+            client.jid if a local part is used (e.g. piotr@file.example.net where
+            client.jid would be file.example.net)
+        @return (D(list[Candidate])): list of candidates, ordered by priority
+        """
+        server_factory = yield self.get_socks_5_server_factory()
+        local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client)
+        try:
+            proxy = yield self.get_proxy(client, local_jid)
+        except exceptions.NotFound:
+            proxy = None
+
+        # its time to gather the candidates
+        candidates = []
+
+        # first the direct ones
+
+        # the preferred direct connection
+        ip = local_ips.pop(0)
+        candidates.append(
+            Candidate(
+                ip,
+                local_port,
+                XEP_0065.TYPE_DIRECT,
+                PRIORITY_BEST_DIRECT,
+                local_jid,
+                priority_local=True,
+                factory=server_factory,
+            )
+        )
+        for ip in local_ips:
+            candidates.append(
+                Candidate(
+                    ip,
+                    local_port,
+                    XEP_0065.TYPE_DIRECT,
+                    PRIORITY_DIRECT,
+                    local_jid,
+                    priority_local=True,
+                    factory=server_factory,
+                )
+            )
+
+        # then the assisted one
+        if ext_port is not None:
+            candidates.append(
+                Candidate(
+                    external_ip,
+                    ext_port,
+                    XEP_0065.TYPE_ASSISTED,
+                    PRIORITY_ASSISTED,
+                    local_jid,
+                    priority_local=True,
+                    factory=server_factory,
+                )
+            )
+
+        # finally the proxy
+        if proxy:
+            candidates.append(
+                Candidate(
+                    proxy.host,
+                    proxy.port,
+                    XEP_0065.TYPE_PROXY,
+                    PRIORITY_PROXY,
+                    proxy.jid,
+                    priority_local=True,
+                )
+            )
+
+        # should be already sorted, but just in case the priorities get weird
+        candidates.sort(key=lambda c: c.priority, reverse=True)
+        defer.returnValue(candidates)
+
+    def _add_connector(self, connector, candidate):
+        """Add connector used to connect to candidate, and return client factory's connection Deferred
+
+        the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
+        @param connector: a connector implementing IConnector
+        @param candidate(Candidate): candidate linked to the connector
+        @return (D): Deferred fired when factory connection is done or has failed
+        """
+        candidate.factory.connector = connector
+        return candidate.factory.connection
+
+    def connect_candidate(
+        self, client, candidate, session_hash, peer_session_hash=None, delay=None
+    ):
+        """Connect to a candidate
+
+        Connection will be done with a Socks5ClientFactory
+        @param candidate(Candidate): candidate to connect to
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        @param peer_session_hash(unicode, None): hash used with the peer
+            None to use session_hash.
+            None must be used in 2 cases:
+                - when XEP-0065 is used with XEP-0096
+                - when a peer connect to a proxy *he proposed himself*
+            in practice, peer_session_hash is only used by try_candidates
+        @param delay(None, float): optional delay to wait before connection, in seconds
+        @return (D): Deferred launched when TCP connection + Socks5 connection is done
+        """
+        if peer_session_hash is None:
+            # for XEP-0065, only one hash is needed
+            peer_session_hash = session_hash
+        session = self.getSession(client, session_hash)
+        factory = Socks5ClientFactory(client, self, session, peer_session_hash)
+        candidate.factory = factory
+        if delay is None:
+            d = defer.succeed(candidate.host)
+        else:
+            d = sat_defer.DelayedDeferred(delay, candidate.host)
+        d.addCallback(reactor.connectTCP, candidate.port, factory)
+        d.addCallback(self._add_connector, candidate)
+        return d
+
+    def try_candidates(
+        self,
+        client,
+        candidates,
+        session_hash,
+        peer_session_hash,
+        connection_cb=None,
+        connection_eb=None,
+    ):
+        defers_list = []
+
+        for candidate in candidates:
+            delay = CANDIDATE_DELAY * len(defers_list)
+            if candidate.type == XEP_0065.TYPE_PROXY:
+                delay += CANDIDATE_DELAY_PROXY
+            d = self.connect_candidate(
+                client, candidate, session_hash, peer_session_hash, delay
+            )
+            if connection_cb is not None:
+                d.addCallback(
+                    lambda __, candidate=candidate, client=client: connection_cb(
+                        client, candidate
+                    )
+                )
+            if connection_eb is not None:
+                d.addErrback(connection_eb, client, candidate)
+            defers_list.append(d)
+
+        return defers_list
+
+    def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None):
+        """Get best candidate (according to priority) which can connect
+
+        @param candidates(iterable[Candidate]): candidates to test
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        @param peer_session_hash(unicode, None): hash of the other peer
+            only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
+        @return (D(None, Candidate)): best candidate or None if none can connect
+        """
+        defer_candidates = None
+
+        def connection_cb(client, candidate):
+            log.info("Connection of {} successful".format(str(candidate)))
+            for idx, other_candidate in enumerate(candidates):
+                try:
+                    if other_candidate.priority < candidate.priority:
+                        log.debug("Cancelling {}".format(other_candidate))
+                        defer_candidates[idx].cancel()
+                except AttributeError:
+                    assert other_candidate is None
+
+        def connection_eb(failure, client, candidate):
+            if failure.check(defer.CancelledError):
+                log.debug("Connection of {} has been cancelled".format(candidate))
+            else:
+                log.info(
+                    "Connection of {candidate} Failed: {error}".format(
+                        candidate=candidate, error=failure.value
+                    )
+                )
+            candidates[candidates.index(candidate)] = None
+
+        def all_tested(__):
+            log.debug("All candidates have been tested")
+            good_candidates = [c for c in candidates if c]
+            return good_candidates[0] if good_candidates else None
+
+        defer_candidates = self.try_candidates(
+            client,
+            candidates,
+            session_hash,
+            peer_session_hash,
+            connection_cb,
+            connection_eb,
+        )
+        d_list = defer.DeferredList(defer_candidates)
+        d_list.addCallback(all_tested)
+        return d_list
+
+    def _time_out(self, session_hash, client):
+        """Called when stream was not started quickly enough
+
+        @param session_hash(str): hash as returned by get_session_hash
+        @param client: %(doc_client)s
+        """
+        log.info("Socks5 Bytestream: TimeOut reached")
+        session = self.getSession(client, session_hash)
+        session[DEFER_KEY].errback(exceptions.TimeOutError())
+
+    def kill_session(self, failure_, session_hash, sid, client):
+        """Clean the current session
+
+        @param session_hash(str): hash as returned by get_session_hash
+        @param sid(None, unicode): session id
+            or None if self.xep_0065_sid_session was not used
+        @param client: %(doc_client)s
+        @param failure_(None, failure.Failure): None if eveything was fine, a failure else
+        @return (None, failure.Failure): failure_ is returned
+        """
+        log.debug(
+            "Cleaning session with hash {hash}{id}: {reason}".format(
+                hash=session_hash,
+                reason="" if failure_ is None else failure_.value,
+                id="" if sid is None else " (id: {})".format(sid),
+            )
+        )
+
+        try:
+            assert self.hash_clients_map[session_hash] == client
+            del self.hash_clients_map[session_hash]
+        except KeyError:
+            pass
+
+        if sid is not None:
+            try:
+                del client.xep_0065_sid_session[sid]
+            except KeyError:
+                log.warning("Session id {} is unknown".format(sid))
+
+        try:
+            session_data = client._s5b_sessions[session_hash]
+        except KeyError:
+            log.warning("There is no session with this hash")
+            return
+        else:
+            del client._s5b_sessions[session_hash]
+
+        try:
+            session_data["timer"].cancel()
+        except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
+            pass
+
+        return failure_
+
+    def start_stream(self, client, stream_object, local_jid, to_jid, sid):
+        """Launch the stream workflow
+
+        @param streamProducer: stream_object to use
+        @param local_jid(jid.JID): same as for [get_candidates]
+        @param to_jid: JID of the recipient
+        @param sid: Stream session id
+        @param successCb: method to call when stream successfuly finished
+        @param failureCb: method to call when something goes wrong
+        @return (D): Deferred fired when session is finished
+        """
+        session_data = self._create_session(
+            client, stream_object, local_jid, to_jid, sid, True)
+
+        session_data[client] = client
+
+        def got_candidates(candidates):
+            session_data["candidates"] = candidates
+            iq_elt = client.IQ()
+            iq_elt["from"] = local_jid.full()
+            iq_elt["to"] = to_jid.full()
+            query_elt = iq_elt.addElement((NS_BS, "query"))
+            query_elt["mode"] = "tcp"
+            query_elt["sid"] = sid
+
+            for candidate in candidates:
+                streamhost = query_elt.addElement("streamhost")
+                streamhost["host"] = candidate.host
+                streamhost["port"] = str(candidate.port)
+                streamhost["jid"] = candidate.jid.full()
+                log.debug("Candidate proposed: {}".format(candidate))
+
+            d = iq_elt.send()
+            args = [client, session_data, local_jid]
+            d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args)
+
+        self.get_candidates(client, local_jid).addCallback(got_candidates)
+        return session_data[DEFER_KEY]
+
+    def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid):
+        """Called when the result of open iq is received
+
+        @param session_data(dict): data of the session
+        @param client: %(doc_client)s
+        @param iq_elt(domish.Element): <iq> result
+        """
+        try:
+            query_elt = next(iq_elt.elements(NS_BS, "query"))
+            streamhost_used_elt = next(query_elt.elements(NS_BS, "streamhost-used"))
+        except StopIteration:
+            log.warning("No streamhost found in stream query")
+            # FIXME: must clean session
+            return
+
+        streamhost_jid = jid.JID(streamhost_used_elt["jid"])
+        try:
+            candidate = next((
+                c for c in session_data["candidates"] if c.jid == streamhost_jid
+            ))
+        except StopIteration:
+            log.warning(
+                "Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())
+            )
+            return
+        else:
+            log.info("Candidate choosed by target: {}".format(candidate))
+
+        if candidate.type == XEP_0065.TYPE_PROXY:
+            log.info("A Socks5 proxy is used")
+            d = self.connect_candidate(client, candidate, session_data["hash"])
+            d.addCallback(
+                lambda __: candidate.activate(
+                    client, session_data["id"], session_data["peer_jid"], local_jid
+                )
+            )
+            d.addErrback(self._activation_eb)
+        else:
+            d = defer.succeed(None)
+
+        d.addCallback(lambda __: candidate.start_transfer(session_data["hash"]))
+
+    def _activation_eb(self, failure):
+        log.warning("Proxy activation error: {}".format(failure.value))
+
+    def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid):
+        log.warning("Socks5 transfer failed: {}".format(stanza_err.value))
+        # FIXME: must clean session
+
+    def create_session(self, *args, **kwargs):
+        """like [_create_session] but return the session deferred instead of the whole session
+
+        session deferred is fired when transfer is finished
+        """
+        return self._create_session(*args, **kwargs)[DEFER_KEY]
+
+    def _create_session(self, client, stream_object, local_jid, to_jid, sid,
+                       requester=False):
+        """Called when a bytestream is imminent
+
+        @param stream_object(iface.IStreamProducer): File object where data will be
+            written
+        @param to_jid(jid.JId): jid of the other peer
+        @param sid(unicode): session id
+        @param initiator(bool): if True, this session is create by initiator
+        @return (dict): session data
+        """
+        if sid in client.xep_0065_sid_session:
+            raise exceptions.ConflictError("A session with this id already exists !")
+        if requester:
+            session_hash = get_session_hash(local_jid, to_jid, sid)
+            session_data = self._register_hash(client, session_hash, stream_object)
+        else:
+            session_hash = get_session_hash(to_jid, local_jid, sid)
+            session_d = defer.Deferred()
+            session_d.addBoth(self.kill_session, session_hash, sid, client)
+            session_data = client._s5b_sessions[session_hash] = {
+                DEFER_KEY: session_d,
+                TIMER_KEY: reactor.callLater(
+                    TIMEOUT, self._time_out, session_hash, client
+                ),
+            }
+        client.xep_0065_sid_session[sid] = session_data
+        session_data.update(
+            {
+                "id": sid,
+                "local_jid": local_jid,
+                "peer_jid": to_jid,
+                "stream_object": stream_object,
+                "hash": session_hash,
+            }
+        )
+
+        return session_data
+
+    def getSession(self, client, session_hash):
+        """Return session data
+
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        @param client(None, SatXMPPClient): client of the peer
+            None is used only if client is unknown (this is only the case
+            for incoming request received by Socks5ServerFactory). None must
+            only be used by Socks5ServerFactory.
+            See comments below for details
+        @return (dict): session data
+        """
+        assert isinstance(session_hash, str)
+        if client is None:
+            try:
+                client = self.hash_clients_map[session_hash]
+            except KeyError as e:
+                log.warning("The requested session doesn't exists !")
+                raise e
+        return client._s5b_sessions[session_hash]
+
+    def register_hash(self, *args, **kwargs):
+        """like [_register_hash] but return the session deferred instead of the whole session
+        session deferred is fired when transfer is finished
+        """
+        return self._register_hash(*args, **kwargs)[DEFER_KEY]
+
+    def _register_hash(self, client, session_hash, stream_object):
+        """Create a session_data associated to hash
+
+        @param session_hash(str): hash of the session
+        @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
+            None if it will be filled later
+        return (dict): session data
+        """
+        assert session_hash not in client._s5b_sessions
+        session_d = defer.Deferred()
+        session_d.addBoth(self.kill_session, session_hash, None, client)
+        session_data = client._s5b_sessions[session_hash] = {
+            DEFER_KEY: session_d,
+            TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client),
+        }
+
+        if stream_object is not None:
+            session_data["stream_object"] = stream_object
+
+        assert session_hash not in self.hash_clients_map
+        self.hash_clients_map[session_hash] = client
+
+        return session_data
+
+    def associate_stream_object(self, client, session_hash, stream_object):
+        """Associate a stream object with  a session"""
+        session_data = self.getSession(client, session_hash)
+        assert "stream_object" not in session_data
+        session_data["stream_object"] = stream_object
+
+    def stream_query(self, iq_elt, client):
+        log.debug("BS stream query")
+
+        iq_elt.handled = True
+
+        query_elt = next(iq_elt.elements(NS_BS, "query"))
+        try:
+            sid = query_elt["sid"]
+        except KeyError:
+            log.warning("Invalid bystreams request received")
+            return client.sendError(iq_elt, "bad-request")
+
+        streamhost_elts = list(query_elt.elements(NS_BS, "streamhost"))
+        if not streamhost_elts:
+            return client.sendError(iq_elt, "bad-request")
+
+        try:
+            session_data = client.xep_0065_sid_session[sid]
+        except KeyError:
+            log.warning("Ignoring unexpected BS transfer: {}".format(sid))
+            return client.sendError(iq_elt, "not-acceptable")
+
+        peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
+
+        candidates = []
+        nb_sh = len(streamhost_elts)
+        for idx, sh_elt in enumerate(streamhost_elts):
+            try:
+                host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"])
+            except KeyError:
+                log.warning("malformed streamhost element")
+                return client.sendError(iq_elt, "bad-request")
+            priority = nb_sh - idx
+            if jid_.userhostJID() != peer_jid.userhostJID():
+                type_ = XEP_0065.TYPE_PROXY
+            else:
+                type_ = XEP_0065.TYPE_DIRECT
+            candidates.append(Candidate(host, port, type_, priority, jid_))
+
+        for candidate in candidates:
+            log.info("Candidate proposed: {}".format(candidate))
+
+        d = self.get_best_candidate(client, candidates, session_data["hash"])
+        d.addCallback(self._ack_stream, iq_elt, session_data, client)
+
+    def _ack_stream(self, candidate, iq_elt, session_data, client):
+        if candidate is None:
+            log.info("No streamhost candidate worked, we have to end negotiation")
+            return client.sendError(iq_elt, "item-not-found")
+        log.info("We choose: {}".format(candidate))
+        result_elt = xmlstream.toResponse(iq_elt, "result")
+        query_elt = result_elt.addElement((NS_BS, "query"))
+        query_elt["sid"] = session_data["id"]
+        streamhost_used_elt = query_elt.addElement("streamhost-used")
+        streamhost_used_elt["jid"] = candidate.jid.full()
+        client.send(result_elt)
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0065_handler(xmlstream.XMPPHandler):
+
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+        self.host = plugin_parent.host
+
+    def connectionInitialized(self):
+        self.xmlstream.addObserver(
+            BS_REQUEST, self.plugin_parent.stream_query, client=self.parent
+        )
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_BS)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []