Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0065.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0065.py@524856bd7b19 |
children | b86912d3fd33 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0065.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,1396 @@ +#!/usr/bin/env python3 + + +# SAT plugin for managing xep-0065 + +# Copyright (C) +# 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) +# 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) +# 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# -- + +# This module is based on proxy65 (http://code.google.com/p/proxy65), +# originaly written by David Smith and modified by Fabio Forno. +# It is sublicensed under AGPL v3 (or any later version) as allowed by the original +# license. + +# -- + +# Here is a copy of the original license: + +# Copyright (C) +# 2002-2004 Dave Smith (dizzyd@jabber.org) +# 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import struct +import hashlib +import uuid +from collections import namedtuple +from zope.interface import implementer +from twisted.internet import protocol +from twisted.internet import reactor +from twisted.internet import error as internet_error +from twisted.words.protocols.jabber import error as jabber_error +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import xmlstream +from twisted.internet import defer +from wokkel import disco, iwokkel +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.core.constants import Const as C +from libervia.backend.core import exceptions +from libervia.backend.tools import sat_defer + + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "XEP 0065 Plugin", + C.PI_IMPORT_NAME: "XEP-0065", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0065"], + C.PI_DEPENDENCIES: ["IP"], + C.PI_RECOMMENDATIONS: ["NAT-PORT"], + C.PI_MAIN: "XEP_0065", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""), +} + +IQ_SET = '/iq[@type="set"]' +NS_BS = "http://jabber.org/protocol/bytestreams" +BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' +TIMER_KEY = "timer" +DEFER_KEY = "finished" # key of the deferred used to track session end +SERVER_STARTING_PORT = ( + 0 +) # starting number for server port search (0 to ask automatic attribution) + +# priorities are candidates local priorities, must be a int between 0 and 65535 +PRIORITY_BEST_DIRECT = 10000 +PRIORITY_DIRECT = 5000 +PRIORITY_ASSISTED = 1000 +PRIORITY_PROXY = 0.2 # proxy is the last option for s5b +CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 +CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) + +TIMEOUT = 300 # maxium time between session creation and stream start + +# XXX: by default eveything is automatic +# TODO: use these params to force use of specific proxy/port/IP +# PARAMS = """ +# <params> +# <general> +# <category name="File Transfer"> +# <param name="Force IP" type="string" /> +# <param name="Force Port" type="int" constraint="1;65535" /> +# </category> +# </general> +# <individual> +# <category name="File Transfer"> +# <param name="Force Proxy" value="" type="string" /> +# <param name="Force Proxy host" value="" type="string" /> +# <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> +# </category> +# </individual> +# </params> +# """ + +( + STATE_INITIAL, + STATE_AUTH, + STATE_REQUEST, + STATE_READY, + STATE_AUTH_USERPASS, + STATE_CLIENT_INITIAL, + STATE_CLIENT_AUTH, + STATE_CLIENT_REQUEST, +) = range(8) + +SOCKS5_VER = 0x05 + +ADDR_IPV4 = 0x01 +ADDR_DOMAINNAME = 0x03 +ADDR_IPV6 = 0x04 + +CMD_CONNECT = 0x01 +CMD_BIND = 0x02 +CMD_UDPASSOC = 0x03 + +AUTHMECH_ANON = 0x00 +AUTHMECH_USERPASS = 0x02 +AUTHMECH_INVALID = 0xFF + +REPLY_SUCCESS = 0x00 +REPLY_GENERAL_FAILUR = 0x01 +REPLY_CONN_NOT_ALLOWED = 0x02 +REPLY_NETWORK_UNREACHABLE = 0x03 +REPLY_HOST_UNREACHABLE = 0x04 +REPLY_CONN_REFUSED = 0x05 +REPLY_TTL_EXPIRED = 0x06 +REPLY_CMD_NOT_SUPPORTED = 0x07 +REPLY_ADDR_NOT_SUPPORTED = 0x08 + + +ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"]) + + +class Candidate(object): + def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, + factory=None,): + """ + @param host(unicode): host IP or domain + @param port(int): port + @param type_(unicode): stream type (one of XEP_0065.TYPE_*) + @param priority(int): priority + @param jid_(jid.JID): jid + @param id_(None, id_): Candidate ID, or None to generate + @param priority_local(bool): if True, priority is used as local priority, + else priority is used as global one (and local priority is set to 0) + """ + assert isinstance(jid_, jid.JID) + self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) + self.id = id_ if id_ is not None else str(uuid.uuid4()) + if priority_local: + self._local_priority = int(priority) + self._priority = self.calculate_priority() + else: + self._local_priority = 0 + self._priority = int(priority) + self.factory = factory + + def discard(self): + """Disconnect a candidate if it is connected + + Used to disconnect tryed client when they are discarded + """ + log.debug("Discarding {}".format(self)) + try: + self.factory.discard() + except AttributeError: + pass # no discard for Socks5ServerFactory + + @property + def local_priority(self): + return self._local_priority + + @property + def priority(self): + return self._priority + + def __str__(self): + return "Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( + self, id=" id={}".format(self.id if self.id is not None else "") + ) + + def __eq__(self, other): + # self.id is is not used in __eq__ as the same candidate can have + # different ids if proposed by initiator or responder + try: + return ( + self.host == other.host + and self.port == other.port + and self.jid == other.jid + ) + except (AttributeError, TypeError): + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def calculate_priority(self): + """Calculate candidate priority according to XEP-0260 §2.2 + + + @return (int): priority + """ + if self.type == XEP_0065.TYPE_DIRECT: + multiplier = 126 + elif self.type == XEP_0065.TYPE_ASSISTED: + multiplier = 120 + elif self.type == XEP_0065.TYPE_TUNEL: + multiplier = 110 + elif self.type == XEP_0065.TYPE_PROXY: + multiplier = 10 + else: + raise exceptions.InternalError("Unknown {} type !".format(self.type)) + return 2 ** 16 * multiplier + self._local_priority + + def activate(self, client, sid, peer_jid, local_jid): + """Activate the proxy candidate + + Send activation request as explained in XEP-0065 § 6.3.5 + Must only be used with proxy candidates + @param sid(unicode): session id (same as for get_session_hash) + @param peer_jid(jid.JID): jid of the other peer + @return (D(domish.Element)): IQ result (or error) + """ + assert self.type == XEP_0065.TYPE_PROXY + iq_elt = client.IQ() + iq_elt["from"] = local_jid.full() + iq_elt["to"] = self.jid.full() + query_elt = iq_elt.addElement((NS_BS, "query")) + query_elt["sid"] = sid + query_elt.addElement("activate", content=peer_jid.full()) + return iq_elt.send() + + def start_transfer(self, session_hash=None): + if self.type == XEP_0065.TYPE_PROXY: + chunk_size = 4096 # Prosody's proxy reject bigger chunks by default + else: + chunk_size = None + self.factory.start_transfer(session_hash, chunk_size=chunk_size) + + +def get_session_hash(requester_jid, target_jid, sid): + """Calculate SHA1 Hash according to XEP-0065 §5.3.2 + + @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) + @param target_jid(jid.JID): jid of the target + @param sid(unicode): session id + @return (str): hash + """ + return hashlib.sha1( + (sid + requester_jid.full() + target_jid.full()).encode("utf-8") + ).hexdigest() + + +class SOCKSv5(protocol.Protocol): + CHUNK_SIZE = 2 ** 16 + + def __init__(self, session_hash=None): + """ + @param session_hash(str): hash of the session + must only be used in client mode + """ + self.connection = defer.Deferred() # called when connection/auth is done + if session_hash is not None: + assert isinstance(session_hash, str) + self.server_mode = False + self._session_hash = session_hash + self.state = STATE_CLIENT_INITIAL + else: + self.server_mode = True + self.state = STATE_INITIAL + self.buf = b"" + self.supportedAuthMechs = [AUTHMECH_ANON] + self.supportedAddrs = [ADDR_DOMAINNAME] + self.enabledCommands = [CMD_CONNECT] + self.peersock = None + self.addressType = 0 + self.requestType = 0 + self._stream_object = None + self.active = False # set to True when protocol is actually used for transfer + # used by factories to know when the finished Deferred can be triggered + + @property + def stream_object(self): + if self._stream_object is None: + self._stream_object = self.getSession()["stream_object"] + if self.server_mode: + self._stream_object.registerProducer(self.transport, True) + return self._stream_object + + def getSession(self): + """Return session associated with this candidate + + @return (dict): session data + """ + if self.server_mode: + return self.factory.getSession(self._session_hash) + else: + return self.factory.getSession() + + def _start_negotiation(self): + log.debug("starting negotiation (client mode)") + self.state = STATE_CLIENT_AUTH + self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) + + def _parse_negotiation(self): + try: + # Parse out data + ver, nmethod = struct.unpack("!BB", self.buf[:2]) + methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) + + # Ensure version is correct + if ver != 5: + self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + return + + # Trim off front of the buffer + self.buf = self.buf[nmethod + 2 :] + + # Check for supported auth mechs + for m in self.supportedAuthMechs: + if m in methods: + # Update internal state, according to selected method + if m == AUTHMECH_ANON: + self.state = STATE_REQUEST + elif m == AUTHMECH_USERPASS: + self.state = STATE_AUTH_USERPASS + # Complete negotiation w/ this method + self.transport.write(struct.pack("!BB", SOCKS5_VER, m)) + return + + # No supported mechs found, notify client and close the connection + log.warning("Unsupported authentication mechanism") + self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + except struct.error: + pass + + def _parse_user_pass(self): + try: + # Parse out data + ver, ulen = struct.unpack("BB", self.buf[:2]) + uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) + plen, = struct.unpack("B", self.buf[ulen + 2]) + password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) + # Trim off fron of the buffer + self.buf = self.buf[3 + ulen + plen :] + # Fire event to authenticate user + if self.authenticate_user_pass(uname, password): + # Signal success + self.state = STATE_REQUEST + self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) + else: + # Signal failure + self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) + self.transport.loseConnection() + except struct.error: + pass + + def send_error_reply(self, errorcode): + # Any other address types are not supported + result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) + self.transport.write(result) + self.transport.loseConnection() + + def _parseRequest(self): + try: + # Parse out data and trim buffer accordingly + ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) + + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack("!IH", self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = self.buf[4] + addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr) :] + else: + # Any other address types are not supported + self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure command is supported + if cmd not in self.enabledCommands: + # Send a not supported error + self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) + return + + # Process the command + if cmd == CMD_CONNECT: + self.connect_requested(addr, port) + elif cmd == CMD_BIND: + self.bind_requested(addr, port) + else: + # Any other command is not supported + self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) + + except struct.error: + # The buffer is probably not complete, we need to wait more + return None + + def _make_request(self): + hash_ = self._session_hash.encode('utf-8') + request = struct.pack( + "!5B%dsH" % len(hash_), + SOCKS5_VER, + CMD_CONNECT, + 0, + ADDR_DOMAINNAME, + len(hash_), + hash_, + 0, + ) + self.transport.write(request) + self.state = STATE_CLIENT_REQUEST + + def _parse_request_reply(self): + try: + ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack("!IH", self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = self.buf[4] + addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr) :] + else: + # Any other address types are not supported + self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure reply is OK + if rep != REPLY_SUCCESS: + self.loseConnection() + return + + self.state = STATE_READY + self.connection.callback(None) + + except struct.error: + # The buffer is probably not complete, we need to wait more + return None + + def connectionMade(self): + log.debug( + "Socks5 connectionMade (mode = {})".format( + "server" if self.state == STATE_INITIAL else "client" + ) + ) + if self.state == STATE_CLIENT_INITIAL: + self._start_negotiation() + + def connect_requested(self, addr, port): + # Check that this session is expected + if not self.factory.add_to_session(addr.decode('utf-8'), self): + log.warning( + "Unexpected connection request received from {host}".format( + host=self.transport.getPeer().host + ) + ) + self.send_error_reply(REPLY_CONN_REFUSED) + return + self._session_hash = addr.decode('utf-8') + self.connect_completed(addr, 0) + + def start_transfer(self, chunk_size): + """Callback called when the result iq is received + + @param chunk_size(None, int): size of the buffer, or None for default + """ + self.active = True + if chunk_size is not None: + self.CHUNK_SIZE = chunk_size + log.debug("Starting file transfer") + d = self.stream_object.start_stream(self.transport) + d.addCallback(self.stream_finished) + + def stream_finished(self, d): + log.info(_("File transfer completed, closing connection")) + self.transport.loseConnection() + + def connect_completed(self, remotehost, remoteport): + if self.addressType == ADDR_IPV4: + result = struct.pack( + "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport + ) + elif self.addressType == ADDR_DOMAINNAME: + result = struct.pack( + "!BBBBB%dsH" % len(remotehost), + SOCKS5_VER, + REPLY_SUCCESS, + 0, + ADDR_DOMAINNAME, + len(remotehost), + remotehost, + remoteport, + ) + self.transport.write(result) + self.state = STATE_READY + + def bind_requested(self, addr, port): + pass + + def authenticate_user_pass(self, user, passwd): + # FIXME: implement authentication and remove the debug printing a password + log.debug("User/pass: %s/%s" % (user, passwd)) + return True + + def dataReceived(self, buf): + if self.state == STATE_READY: + # Everything is set, we just have to write the incoming data + self.stream_object.write(buf) + if not self.active: + self.active = True + self.getSession()[TIMER_KEY].cancel() + return + + self.buf = self.buf + buf + if self.state == STATE_INITIAL: + self._parse_negotiation() + if self.state == STATE_AUTH_USERPASS: + self._parse_user_pass() + if self.state == STATE_REQUEST: + self._parseRequest() + if self.state == STATE_CLIENT_REQUEST: + self._parse_request_reply() + if self.state == STATE_CLIENT_AUTH: + ver, method = struct.unpack("!BB", buf) + self.buf = self.buf[2:] + if ver != SOCKS5_VER or method != AUTHMECH_ANON: + self.transport.loseConnection() + else: + self._make_request() + + def connectionLost(self, reason): + log.debug("Socks5 connection lost: {}".format(reason.value)) + if self.state != STATE_READY: + self.connection.errback(reason) + if self.server_mode: + try: + session_hash = self._session_hash + except AttributeError: + log.debug("no session has been received yet") + else: + self.factory.remove_from_session(session_hash, self, reason) + + +class Socks5ServerFactory(protocol.ServerFactory): + protocol = SOCKSv5 + + def __init__(self, parent): + """ + @param parent(XEP_0065): XEP_0065 parent instance + """ + self.parent = parent + + def getSession(self, session_hash): + return self.parent.getSession(None, session_hash) + + def start_transfer(self, session_hash, chunk_size=None): + session = self.getSession(session_hash) + try: + protocol = session["protocols"][0] + except (KeyError, IndexError): + log.error("Can't start file transfer, can't find protocol") + else: + session[TIMER_KEY].cancel() + protocol.start_transfer(chunk_size) + + def add_to_session(self, session_hash, protocol): + """Check is session_hash is valid, and associate protocol with it + + the session will be associated to the corresponding candidate + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param return(bool): True if hash was valid (i.e. expected), False else + """ + assert isinstance(session_hash, str) + try: + session_data = self.getSession(session_hash) + except KeyError: + return False + else: + session_data.setdefault("protocols", []).append(protocol) + return True + + def remove_from_session(self, session_hash, protocol, reason): + """Remove a protocol from session_data + + There can be several protocol instances while candidates are tried, they + have removed when candidate connection is closed + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param reason(failure.Failure): reason of the removal + """ + try: + protocols = self.getSession(session_hash)["protocols"] + protocols.remove(protocol) + except (KeyError, ValueError): + log.error("Protocol not found in session while it should be there") + else: + if protocol.active: + # The active protocol has been removed, session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession(session_hash)[DEFER_KEY].callback(None) + else: + self.getSession(session_hash)[DEFER_KEY].errback(reason) + + +class Socks5ClientFactory(protocol.ClientFactory): + protocol = SOCKSv5 + + def __init__(self, client, parent, session, session_hash): + """Init the Client Factory + + @param session(dict): session data + @param session_hash(unicode): hash used for peer_connection + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + """ + self.session = session + self.session_hash = session_hash + self.client = client + self.connection = defer.Deferred() + self._protocol_instance = None + self.connector = None + + def discard(self): + """Disconnect the client + + Also set a discarded flag, which avoid to call the session Deferred + """ + self.connector.disconnect() + + def getSession(self): + return self.session + + def start_transfer(self, __=None, chunk_size=None): + self.session[TIMER_KEY].cancel() + self._protocol_instance.start_transfer(chunk_size) + + def clientConnectionFailed(self, connector, reason): + log.debug("Connection failed") + self.connection.errback(reason) + + def clientConnectionLost(self, connector, reason): + log.debug(_("Socks 5 client connection lost (reason: %s)") % reason.value) + if self._protocol_instance.active: + # This one was used for the transfer, than mean that + # the Socks5 session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession()[DEFER_KEY].callback(None) + else: + self.getSession()[DEFER_KEY].errback(reason) + self._protocol_instance = None + + def buildProtocol(self, addr): + log.debug(("Socks 5 client connection started")) + p = self.protocol(session_hash=self.session_hash) + p.factory = self + p.connection.chainDeferred(self.connection) + self._protocol_instance = p + return p + + +class XEP_0065(object): + NAMESPACE = NS_BS + TYPE_DIRECT = "direct" + TYPE_ASSISTED = "assisted" + TYPE_TUNEL = "tunel" + TYPE_PROXY = "proxy" + Candidate = Candidate + + def __init__(self, host): + log.info(_("Plugin XEP_0065 initialization")) + self.host = host + + # session data + self.hash_clients_map = {} # key: hash of the transfer session, value: session data + self._cache_proxies = {} # key: server jid, value: proxy data + + # misc data + self._server_factory = None + self._external_port = None + + # plugins shortcuts + self._ip = self.host.plugins["IP"] + try: + self._np = self.host.plugins["NAT-PORT"] + except KeyError: + log.debug("NAT Port plugin not available") + self._np = None + + # parameters + # XXX: params are not used for now, but they may be used in the futur to force proxy/IP + # host.memory.update_params(PARAMS) + + def get_handler(self, client): + return XEP_0065_handler(self) + + def profile_connected(self, client): + client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) + client._s5b_sessions = {} + + def get_session_hash(self, from_jid, to_jid, sid): + return get_session_hash(from_jid, to_jid, sid) + + def get_socks_5_server_factory(self): + """Return server factory + + The server is created if it doesn't exists yet + self._server_factory_port is set on server creation + """ + + if self._server_factory is None: + self._server_factory = Socks5ServerFactory(self) + for port in range(SERVER_STARTING_PORT, 65356): + try: + listening_port = reactor.listenTCP(port, self._server_factory) + except internet_error.CannotListenError as e: + log.debug( + "Cannot listen on port {port}: {err_msg}{err_num}".format( + port=port, + err_msg=e.socketError.strerror, + err_num=" (error code: {})".format(e.socketError.errno), + ) + ) + else: + self._server_factory_port = listening_port.getHost().port + break + + log.info( + _("Socks5 Stream server launched on port {}").format( + self._server_factory_port + ) + ) + return self._server_factory + + @defer.inlineCallbacks + def get_proxy(self, client, local_jid): + """Return the proxy available for this profile + + cache is used between clients using the same server + @param local_jid(jid.JID): same as for [get_candidates] + @return ((D)(ProxyInfos, None)): Found proxy infos, + or None if not acceptable proxy is found + @raise exceptions.NotFound: no Proxy found + """ + + def notFound(server): + log.info("No proxy found on this server") + self._cache_proxies[server] = None + raise exceptions.NotFound + + server = client.host if client.is_component else client.jid.host + try: + defer.returnValue(self._cache_proxies[server]) + except KeyError: + pass + try: + proxy = ( + yield self.host.find_service_entities(client, "proxy", "bytestreams") + ).pop() + except (defer.CancelledError, StopIteration, KeyError): + notFound(server) + iq_elt = client.IQ("get") + iq_elt["from"] = local_jid.full() + iq_elt["to"] = proxy.full() + iq_elt.addElement((NS_BS, "query")) + + try: + result_elt = yield iq_elt.send() + except jabber_error.StanzaError as failure: + log.warning( + "Error while requesting proxy info on {jid}: {error}".format( + jid=proxy.full(), error=failure + ) + ) + notFound(server) + + try: + query_elt = next(result_elt.elements(NS_BS, "query")) + streamhost_elt = next(query_elt.elements(NS_BS, "streamhost")) + host = streamhost_elt["host"] + jid_ = streamhost_elt["jid"] + port = streamhost_elt["port"] + if not all((host, jid, port)): + raise KeyError + jid_ = jid.JID(jid_) + except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): + log.warning("Invalid proxy data received from {}".format(proxy.full())) + notFound(server) + + proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) + log.info("Proxy found: {}".format(proxy_infos)) + defer.returnValue(proxy_infos) + + @defer.inlineCallbacks + def _get_network_data(self, client): + """Retrieve information about network + + @param client: %(doc_client)s + @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data + """ + self.get_socks_5_server_factory() + local_port = self._server_factory_port + external_ip = yield self._ip.get_external_ip(client) + local_ips = yield self._ip.get_local_i_ps(client) + + if external_ip is not None and self._external_port is None: + if external_ip != local_ips[0]: + log.info("We are probably behind a NAT") + if self._np is None: + log.warning("NAT port plugin not available, we can't map port") + else: + ext_port = yield self._np.map_port( + local_port, desc="SaT socks5 stream" + ) + if ext_port is None: + log.warning("Can't map NAT port") + else: + self._external_port = ext_port + + defer.returnValue((local_port, self._external_port, local_ips, external_ip)) + + @defer.inlineCallbacks + def get_candidates(self, client, local_jid): + """Return a list of our stream candidates + + @param local_jid(jid.JID): jid to use as local jid + This is needed for client which can be addressed with a different jid than + client.jid if a local part is used (e.g. piotr@file.example.net where + client.jid would be file.example.net) + @return (D(list[Candidate])): list of candidates, ordered by priority + """ + server_factory = yield self.get_socks_5_server_factory() + local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client) + try: + proxy = yield self.get_proxy(client, local_jid) + except exceptions.NotFound: + proxy = None + + # its time to gather the candidates + candidates = [] + + # first the direct ones + + # the preferred direct connection + ip = local_ips.pop(0) + candidates.append( + Candidate( + ip, + local_port, + XEP_0065.TYPE_DIRECT, + PRIORITY_BEST_DIRECT, + local_jid, + priority_local=True, + factory=server_factory, + ) + ) + for ip in local_ips: + candidates.append( + Candidate( + ip, + local_port, + XEP_0065.TYPE_DIRECT, + PRIORITY_DIRECT, + local_jid, + priority_local=True, + factory=server_factory, + ) + ) + + # then the assisted one + if ext_port is not None: + candidates.append( + Candidate( + external_ip, + ext_port, + XEP_0065.TYPE_ASSISTED, + PRIORITY_ASSISTED, + local_jid, + priority_local=True, + factory=server_factory, + ) + ) + + # finally the proxy + if proxy: + candidates.append( + Candidate( + proxy.host, + proxy.port, + XEP_0065.TYPE_PROXY, + PRIORITY_PROXY, + proxy.jid, + priority_local=True, + ) + ) + + # should be already sorted, but just in case the priorities get weird + candidates.sort(key=lambda c: c.priority, reverse=True) + defer.returnValue(candidates) + + def _add_connector(self, connector, candidate): + """Add connector used to connect to candidate, and return client factory's connection Deferred + + the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion + @param connector: a connector implementing IConnector + @param candidate(Candidate): candidate linked to the connector + @return (D): Deferred fired when factory connection is done or has failed + """ + candidate.factory.connector = connector + return candidate.factory.connection + + def connect_candidate( + self, client, candidate, session_hash, peer_session_hash=None, delay=None + ): + """Connect to a candidate + + Connection will be done with a Socks5ClientFactory + @param candidate(Candidate): candidate to connect to + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param peer_session_hash(unicode, None): hash used with the peer + None to use session_hash. + None must be used in 2 cases: + - when XEP-0065 is used with XEP-0096 + - when a peer connect to a proxy *he proposed himself* + in practice, peer_session_hash is only used by try_candidates + @param delay(None, float): optional delay to wait before connection, in seconds + @return (D): Deferred launched when TCP connection + Socks5 connection is done + """ + if peer_session_hash is None: + # for XEP-0065, only one hash is needed + peer_session_hash = session_hash + session = self.getSession(client, session_hash) + factory = Socks5ClientFactory(client, self, session, peer_session_hash) + candidate.factory = factory + if delay is None: + d = defer.succeed(candidate.host) + else: + d = sat_defer.DelayedDeferred(delay, candidate.host) + d.addCallback(reactor.connectTCP, candidate.port, factory) + d.addCallback(self._add_connector, candidate) + return d + + def try_candidates( + self, + client, + candidates, + session_hash, + peer_session_hash, + connection_cb=None, + connection_eb=None, + ): + defers_list = [] + + for candidate in candidates: + delay = CANDIDATE_DELAY * len(defers_list) + if candidate.type == XEP_0065.TYPE_PROXY: + delay += CANDIDATE_DELAY_PROXY + d = self.connect_candidate( + client, candidate, session_hash, peer_session_hash, delay + ) + if connection_cb is not None: + d.addCallback( + lambda __, candidate=candidate, client=client: connection_cb( + client, candidate + ) + ) + if connection_eb is not None: + d.addErrback(connection_eb, client, candidate) + defers_list.append(d) + + return defers_list + + def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None): + """Get best candidate (according to priority) which can connect + + @param candidates(iterable[Candidate]): candidates to test + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param peer_session_hash(unicode, None): hash of the other peer + only useful for XEP-0260, must be None for XEP-0065 streamhost candidates + @return (D(None, Candidate)): best candidate or None if none can connect + """ + defer_candidates = None + + def connection_cb(client, candidate): + log.info("Connection of {} successful".format(str(candidate))) + for idx, other_candidate in enumerate(candidates): + try: + if other_candidate.priority < candidate.priority: + log.debug("Cancelling {}".format(other_candidate)) + defer_candidates[idx].cancel() + except AttributeError: + assert other_candidate is None + + def connection_eb(failure, client, candidate): + if failure.check(defer.CancelledError): + log.debug("Connection of {} has been cancelled".format(candidate)) + else: + log.info( + "Connection of {candidate} Failed: {error}".format( + candidate=candidate, error=failure.value + ) + ) + candidates[candidates.index(candidate)] = None + + def all_tested(__): + log.debug("All candidates have been tested") + good_candidates = [c for c in candidates if c] + return good_candidates[0] if good_candidates else None + + defer_candidates = self.try_candidates( + client, + candidates, + session_hash, + peer_session_hash, + connection_cb, + connection_eb, + ) + d_list = defer.DeferredList(defer_candidates) + d_list.addCallback(all_tested) + return d_list + + def _time_out(self, session_hash, client): + """Called when stream was not started quickly enough + + @param session_hash(str): hash as returned by get_session_hash + @param client: %(doc_client)s + """ + log.info("Socks5 Bytestream: TimeOut reached") + session = self.getSession(client, session_hash) + session[DEFER_KEY].errback(exceptions.TimeOutError()) + + def kill_session(self, failure_, session_hash, sid, client): + """Clean the current session + + @param session_hash(str): hash as returned by get_session_hash + @param sid(None, unicode): session id + or None if self.xep_0065_sid_session was not used + @param client: %(doc_client)s + @param failure_(None, failure.Failure): None if eveything was fine, a failure else + @return (None, failure.Failure): failure_ is returned + """ + log.debug( + "Cleaning session with hash {hash}{id}: {reason}".format( + hash=session_hash, + reason="" if failure_ is None else failure_.value, + id="" if sid is None else " (id: {})".format(sid), + ) + ) + + try: + assert self.hash_clients_map[session_hash] == client + del self.hash_clients_map[session_hash] + except KeyError: + pass + + if sid is not None: + try: + del client.xep_0065_sid_session[sid] + except KeyError: + log.warning("Session id {} is unknown".format(sid)) + + try: + session_data = client._s5b_sessions[session_hash] + except KeyError: + log.warning("There is no session with this hash") + return + else: + del client._s5b_sessions[session_hash] + + try: + session_data["timer"].cancel() + except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): + pass + + return failure_ + + def start_stream(self, client, stream_object, local_jid, to_jid, sid): + """Launch the stream workflow + + @param streamProducer: stream_object to use + @param local_jid(jid.JID): same as for [get_candidates] + @param to_jid: JID of the recipient + @param sid: Stream session id + @param successCb: method to call when stream successfuly finished + @param failureCb: method to call when something goes wrong + @return (D): Deferred fired when session is finished + """ + session_data = self._create_session( + client, stream_object, local_jid, to_jid, sid, True) + + session_data[client] = client + + def got_candidates(candidates): + session_data["candidates"] = candidates + iq_elt = client.IQ() + iq_elt["from"] = local_jid.full() + iq_elt["to"] = to_jid.full() + query_elt = iq_elt.addElement((NS_BS, "query")) + query_elt["mode"] = "tcp" + query_elt["sid"] = sid + + for candidate in candidates: + streamhost = query_elt.addElement("streamhost") + streamhost["host"] = candidate.host + streamhost["port"] = str(candidate.port) + streamhost["jid"] = candidate.jid.full() + log.debug("Candidate proposed: {}".format(candidate)) + + d = iq_elt.send() + args = [client, session_data, local_jid] + d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args) + + self.get_candidates(client, local_jid).addCallback(got_candidates) + return session_data[DEFER_KEY] + + def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid): + """Called when the result of open iq is received + + @param session_data(dict): data of the session + @param client: %(doc_client)s + @param iq_elt(domish.Element): <iq> result + """ + try: + query_elt = next(iq_elt.elements(NS_BS, "query")) + streamhost_used_elt = next(query_elt.elements(NS_BS, "streamhost-used")) + except StopIteration: + log.warning("No streamhost found in stream query") + # FIXME: must clean session + return + + streamhost_jid = jid.JID(streamhost_used_elt["jid"]) + try: + candidate = next(( + c for c in session_data["candidates"] if c.jid == streamhost_jid + )) + except StopIteration: + log.warning( + "Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()) + ) + return + else: + log.info("Candidate choosed by target: {}".format(candidate)) + + if candidate.type == XEP_0065.TYPE_PROXY: + log.info("A Socks5 proxy is used") + d = self.connect_candidate(client, candidate, session_data["hash"]) + d.addCallback( + lambda __: candidate.activate( + client, session_data["id"], session_data["peer_jid"], local_jid + ) + ) + d.addErrback(self._activation_eb) + else: + d = defer.succeed(None) + + d.addCallback(lambda __: candidate.start_transfer(session_data["hash"])) + + def _activation_eb(self, failure): + log.warning("Proxy activation error: {}".format(failure.value)) + + def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid): + log.warning("Socks5 transfer failed: {}".format(stanza_err.value)) + # FIXME: must clean session + + def create_session(self, *args, **kwargs): + """like [_create_session] but return the session deferred instead of the whole session + + session deferred is fired when transfer is finished + """ + return self._create_session(*args, **kwargs)[DEFER_KEY] + + def _create_session(self, client, stream_object, local_jid, to_jid, sid, + requester=False): + """Called when a bytestream is imminent + + @param stream_object(iface.IStreamProducer): File object where data will be + written + @param to_jid(jid.JId): jid of the other peer + @param sid(unicode): session id + @param initiator(bool): if True, this session is create by initiator + @return (dict): session data + """ + if sid in client.xep_0065_sid_session: + raise exceptions.ConflictError("A session with this id already exists !") + if requester: + session_hash = get_session_hash(local_jid, to_jid, sid) + session_data = self._register_hash(client, session_hash, stream_object) + else: + session_hash = get_session_hash(to_jid, local_jid, sid) + session_d = defer.Deferred() + session_d.addBoth(self.kill_session, session_hash, sid, client) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater( + TIMEOUT, self._time_out, session_hash, client + ), + } + client.xep_0065_sid_session[sid] = session_data + session_data.update( + { + "id": sid, + "local_jid": local_jid, + "peer_jid": to_jid, + "stream_object": stream_object, + "hash": session_hash, + } + ) + + return session_data + + def getSession(self, client, session_hash): + """Return session data + + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param client(None, SatXMPPClient): client of the peer + None is used only if client is unknown (this is only the case + for incoming request received by Socks5ServerFactory). None must + only be used by Socks5ServerFactory. + See comments below for details + @return (dict): session data + """ + assert isinstance(session_hash, str) + if client is None: + try: + client = self.hash_clients_map[session_hash] + except KeyError as e: + log.warning("The requested session doesn't exists !") + raise e + return client._s5b_sessions[session_hash] + + def register_hash(self, *args, **kwargs): + """like [_register_hash] but return the session deferred instead of the whole session + session deferred is fired when transfer is finished + """ + return self._register_hash(*args, **kwargs)[DEFER_KEY] + + def _register_hash(self, client, session_hash, stream_object): + """Create a session_data associated to hash + + @param session_hash(str): hash of the session + @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object + None if it will be filled later + return (dict): session data + """ + assert session_hash not in client._s5b_sessions + session_d = defer.Deferred() + session_d.addBoth(self.kill_session, session_hash, None, client) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client), + } + + if stream_object is not None: + session_data["stream_object"] = stream_object + + assert session_hash not in self.hash_clients_map + self.hash_clients_map[session_hash] = client + + return session_data + + def associate_stream_object(self, client, session_hash, stream_object): + """Associate a stream object with a session""" + session_data = self.getSession(client, session_hash) + assert "stream_object" not in session_data + session_data["stream_object"] = stream_object + + def stream_query(self, iq_elt, client): + log.debug("BS stream query") + + iq_elt.handled = True + + query_elt = next(iq_elt.elements(NS_BS, "query")) + try: + sid = query_elt["sid"] + except KeyError: + log.warning("Invalid bystreams request received") + return client.sendError(iq_elt, "bad-request") + + streamhost_elts = list(query_elt.elements(NS_BS, "streamhost")) + if not streamhost_elts: + return client.sendError(iq_elt, "bad-request") + + try: + session_data = client.xep_0065_sid_session[sid] + except KeyError: + log.warning("Ignoring unexpected BS transfer: {}".format(sid)) + return client.sendError(iq_elt, "not-acceptable") + + peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) + + candidates = [] + nb_sh = len(streamhost_elts) + for idx, sh_elt in enumerate(streamhost_elts): + try: + host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"]) + except KeyError: + log.warning("malformed streamhost element") + return client.sendError(iq_elt, "bad-request") + priority = nb_sh - idx + if jid_.userhostJID() != peer_jid.userhostJID(): + type_ = XEP_0065.TYPE_PROXY + else: + type_ = XEP_0065.TYPE_DIRECT + candidates.append(Candidate(host, port, type_, priority, jid_)) + + for candidate in candidates: + log.info("Candidate proposed: {}".format(candidate)) + + d = self.get_best_candidate(client, candidates, session_data["hash"]) + d.addCallback(self._ack_stream, iq_elt, session_data, client) + + def _ack_stream(self, candidate, iq_elt, session_data, client): + if candidate is None: + log.info("No streamhost candidate worked, we have to end negotiation") + return client.sendError(iq_elt, "item-not-found") + log.info("We choose: {}".format(candidate)) + result_elt = xmlstream.toResponse(iq_elt, "result") + query_elt = result_elt.addElement((NS_BS, "query")) + query_elt["sid"] = session_data["id"] + streamhost_used_elt = query_elt.addElement("streamhost-used") + streamhost_used_elt["jid"] = candidate.jid.full() + client.send(result_elt) + + +@implementer(iwokkel.IDisco) +class XEP_0065_handler(xmlstream.XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + + def connectionInitialized(self): + self.xmlstream.addObserver( + BS_REQUEST, self.plugin_parent.stream_query, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_BS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []