Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0065.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/plugins/plugin_xep_0065.py@7ad5f2c4e34a |
children | 56f94936df1e |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0065.py Mon Apr 02 19:44:50 2018 +0200 @@ -0,0 +1,1258 @@ +#!/usr/bin/env python2 +#-*- coding: utf-8 -*- + +# SAT plugin for managing xep-0065 + +# Copyright (C) +# 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) +# 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) +# 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# -- + +# This module is based on proxy65 (http://code.google.com/p/proxy65), +# originaly written by David Smith and modified by Fabio Forno. +# It is sublicensed under AGPL v3 (or any later version) as allowed by the original +# license. + +# -- + +# Here is a copy of the original license: + +# Copyright (C) +# 2002-2004 Dave Smith (dizzyd@jabber.org) +# 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.tools import sat_defer +from twisted.internet import protocol +from twisted.internet import reactor +from twisted.internet import error as internet_error +from twisted.words.protocols.jabber import error as jabber_error +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import xmlstream +from twisted.internet import defer +from collections import namedtuple +import struct +import hashlib +import uuid + +from zope.interface import implements + +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler + +from wokkel import disco, iwokkel + + +PLUGIN_INFO = { + C.PI_NAME: "XEP 0065 Plugin", + C.PI_IMPORT_NAME: "XEP-0065", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0065"], + C.PI_DEPENDENCIES: ["IP"], + C.PI_RECOMMENDATIONS: ["NAT-PORT"], + C.PI_MAIN: "XEP_0065", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""") +} + +IQ_SET = '/iq[@type="set"]' +NS_BS = 'http://jabber.org/protocol/bytestreams' +BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' +TIMER_KEY = 'timer' +DEFER_KEY = 'finished' # key of the deferred used to track session end +SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) + +# priorities are candidates local priorities, must be a int between 0 and 65535 +PRIORITY_BEST_DIRECT = 10000 +PRIORITY_DIRECT = 5000 +PRIORITY_ASSISTED = 1000 +PRIORITY_PROXY = 0.2 # proxy is the last option for s5b +CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 +CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) + +TIMEOUT = 300 # maxium time between session creation and stream start + +# XXX: by default eveything is automatic +# TODO: use these params to force use of specific proxy/port/IP +# PARAMS = """ +# <params> +# <general> +# <category name="File Transfer"> +# <param name="Force IP" type="string" /> +# <param name="Force Port" type="int" constraint="1;65535" /> +# </category> +# </general> +# <individual> +# <category name="File Transfer"> +# <param name="Force Proxy" value="" type="string" /> +# <param name="Force Proxy host" value="" type="string" /> +# <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> +# </category> +# </individual> +# </params> +# """ + +(STATE_INITIAL, +STATE_AUTH, +STATE_REQUEST, +STATE_READY, +STATE_AUTH_USERPASS, +STATE_CLIENT_INITIAL, +STATE_CLIENT_AUTH, +STATE_CLIENT_REQUEST, +) = xrange(8) + +SOCKS5_VER = 0x05 + +ADDR_IPV4 = 0x01 +ADDR_DOMAINNAME = 0x03 +ADDR_IPV6 = 0x04 + +CMD_CONNECT = 0x01 +CMD_BIND = 0x02 +CMD_UDPASSOC = 0x03 + +AUTHMECH_ANON = 0x00 +AUTHMECH_USERPASS = 0x02 +AUTHMECH_INVALID = 0xFF + +REPLY_SUCCESS = 0x00 +REPLY_GENERAL_FAILUR = 0x01 +REPLY_CONN_NOT_ALLOWED = 0x02 +REPLY_NETWORK_UNREACHABLE = 0x03 +REPLY_HOST_UNREACHABLE = 0x04 +REPLY_CONN_REFUSED = 0x05 +REPLY_TTL_EXPIRED = 0x06 +REPLY_CMD_NOT_SUPPORTED = 0x07 +REPLY_ADDR_NOT_SUPPORTED = 0x08 + + +ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) + + +class Candidate(object): + + def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): + """ + @param host(unicode): host IP or domain + @param port(int): port + @param type_(unicode): stream type (one of XEP_0065.TYPE_*) + @param priority(int): priority + @param jid_(jid.JID): jid + @param id_(None, id_): Candidate ID, or None to generate + @param priority_local(bool): if True, priority is used as local priority, + else priority is used as global one (and local priority is set to 0) + """ + assert isinstance(jid_, jid.JID) + self.host, self.port, self.type, self.jid = ( + host, int(port), type_, jid_) + self.id = id_ if id_ is not None else unicode(uuid.uuid4()) + if priority_local: + self._local_priority = int(priority) + self._priority = self.calculatePriority() + else: + self._local_priority = 0 + self._priority = int(priority) + self.factory = factory + + def discard(self): + """Disconnect a candidate if it is connected + + Used to disconnect tryed client when they are discarded + """ + log.debug(u"Discarding {}".format(self)) + try: + self.factory.discard() + except AttributeError: + pass # no discard for Socks5ServerFactory + + @property + def local_priority(self): + return self._local_priority + + @property + def priority(self): + return self._priority + + def __str__(self): + # similar to __unicode__ but we don't show jid and we encode id + return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( + self, + id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), + ) + + def __unicode__(self): + return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( + self, + id=u" id={}".format(self.id if self.id is not None else u''), + ) + + def __eq__(self, other): + # self.id is is not used in __eq__ as the same candidate can have + # different ids if proposed by initiator or responder + try: + return (self.host == other.host and + self.port == other.port and + self.jid == other.jid) + except (AttributeError, TypeError): + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def calculatePriority(self): + """Calculate candidate priority according to XEP-0260 §2.2 + + + @return (int): priority + """ + if self.type == XEP_0065.TYPE_DIRECT: + multiplier = 126 + elif self.type == XEP_0065.TYPE_ASSISTED: + multiplier = 120 + elif self.type == XEP_0065.TYPE_TUNEL: + multiplier = 110 + elif self.type == XEP_0065.TYPE_PROXY: + multiplier = 10 + else: + raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) + return 2**16 * multiplier + self._local_priority + + def activate(self, sid, peer_jid, client): + """Activate the proxy candidate + + Send activation request as explained in XEP-0065 § 6.3.5 + Must only be used with proxy candidates + @param sid(unicode): session id (same as for getSessionHash) + @param peer_jid(jid.JID): jid of the other peer + @return (D(domish.Element)): IQ result (or error) + """ + assert self.type == XEP_0065.TYPE_PROXY + iq_elt = client.IQ() + iq_elt['to'] = self.jid.full() + query_elt = iq_elt.addElement((NS_BS, 'query')) + query_elt['sid'] = sid + query_elt.addElement('activate', content=peer_jid.full()) + return iq_elt.send() + + def startTransfer(self, session_hash=None): + if self.type == XEP_0065.TYPE_PROXY: + chunk_size = 4096 # Prosody's proxy reject bigger chunks by default + else: + chunk_size = None + self.factory.startTransfer(session_hash, chunk_size=chunk_size) + + +def getSessionHash(requester_jid, target_jid, sid): + """Calculate SHA1 Hash according to XEP-0065 §5.3.2 + + @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) + @param target_jid(jid.JID): jid of the target + @param sid(unicode): session id + @return (str): hash + """ + return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() + + +class SOCKSv5(protocol.Protocol): + CHUNK_SIZE = 2**16 + + def __init__(self, session_hash=None): + """ + @param session_hash(str): hash of the session + must only be used in client mode + """ + self.connection = defer.Deferred() # called when connection/auth is done + if session_hash is not None: + self.server_mode = False + self._session_hash = session_hash + self.state = STATE_CLIENT_INITIAL + else: + self.server_mode = True + self.state = STATE_INITIAL + self.buf = "" + self.supportedAuthMechs = [AUTHMECH_ANON] + self.supportedAddrs = [ADDR_DOMAINNAME] + self.enabledCommands = [CMD_CONNECT] + self.peersock = None + self.addressType = 0 + self.requestType = 0 + self._stream_object = None + self.active = False # set to True when protocol is actually used for transfer + # used by factories to know when the finished Deferred can be triggered + + @property + def stream_object(self): + if self._stream_object is None: + self._stream_object = self.getSession()['stream_object'] + if self.server_mode: + self._stream_object.registerProducer(self.transport, True) + return self._stream_object + + def getSession(self): + """Return session associated with this candidate + + @return (dict): session data + """ + if self.server_mode: + return self.factory.getSession(self._session_hash) + else: + return self.factory.getSession() + + def _startNegotiation(self): + log.debug("starting negotiation (client mode)") + self.state = STATE_CLIENT_AUTH + self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) + + def _parseNegotiation(self): + try: + # Parse out data + ver, nmethod = struct.unpack('!BB', self.buf[:2]) + methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) + + # Ensure version is correct + if ver != 5: + self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + return + + # Trim off front of the buffer + self.buf = self.buf[nmethod + 2:] + + # Check for supported auth mechs + for m in self.supportedAuthMechs: + if m in methods: + # Update internal state, according to selected method + if m == AUTHMECH_ANON: + self.state = STATE_REQUEST + elif m == AUTHMECH_USERPASS: + self.state = STATE_AUTH_USERPASS + # Complete negotiation w/ this method + self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) + return + + # No supported mechs found, notify client and close the connection + log.warning(u"Unsupported authentication mechanism") + self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + except struct.error: + pass + + def _parseUserPass(self): + try: + # Parse out data + ver, ulen = struct.unpack('BB', self.buf[:2]) + uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) + plen, = struct.unpack('B', self.buf[ulen + 2]) + password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) + # Trim off fron of the buffer + self.buf = self.buf[3 + ulen + plen:] + # Fire event to authenticate user + if self.authenticateUserPass(uname, password): + # Signal success + self.state = STATE_REQUEST + self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) + else: + # Signal failure + self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) + self.transport.loseConnection() + except struct.error: + pass + + def sendErrorReply(self, errorcode): + # Any other address types are not supported + result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) + self.transport.write(result) + self.transport.loseConnection() + + def _parseRequest(self): + try: + # Parse out data and trim buffer accordingly + ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack('!IH', self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = ord(self.buf[4]) + addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr):] + else: + # Any other address types are not supported + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure command is supported + if cmd not in self.enabledCommands: + # Send a not supported error + self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) + return + + # Process the command + if cmd == CMD_CONNECT: + self.connectRequested(addr, port) + elif cmd == CMD_BIND: + self.bindRequested(addr, port) + else: + # Any other command is not supported + self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) + + except struct.error: + # The buffer is probably not complete, we need to wait more + return None + + def _makeRequest(self): + hash_ = self._session_hash + request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) + self.transport.write(request) + self.state = STATE_CLIENT_REQUEST + + def _parseRequestReply(self): + try: + ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack('!IH', self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = ord(self.buf[4]) + addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr):] + else: + # Any other address types are not supported + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure reply is OK + if rep != REPLY_SUCCESS: + self.loseConnection() + return + + self.state = STATE_READY + self.connection.callback(None) + + except struct.error: + # The buffer is probably not complete, we need to wait more + return None + + def connectionMade(self): + log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) + if self.state == STATE_CLIENT_INITIAL: + self._startNegotiation() + + def connectRequested(self, addr, port): + # Check that this session is expected + if not self.factory.addToSession(addr, self): + self.sendErrorReply(REPLY_CONN_REFUSED) + log.warning(u"Unexpected connection request received from {host}" + .format(host=self.transport.getPeer().host)) + return + self._session_hash = addr + self.connectCompleted(addr, 0) + + def startTransfer(self, chunk_size): + """Callback called when the result iq is received + + @param chunk_size(None, int): size of the buffer, or None for default + """ + self.active = True + if chunk_size is not None: + self.CHUNK_SIZE = chunk_size + log.debug(u"Starting file transfer") + d = self.stream_object.startStream(self.transport) + d.addCallback(self.streamFinished) + + def streamFinished(self, d): + log.info(_("File transfer completed, closing connection")) + self.transport.loseConnection() + + def connectCompleted(self, remotehost, remoteport): + if self.addressType == ADDR_IPV4: + result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) + elif self.addressType == ADDR_DOMAINNAME: + result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, + ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) + self.transport.write(result) + self.state = STATE_READY + + def bindRequested(self, addr, port): + pass + + def authenticateUserPass(self, user, passwd): + # FIXME: implement authentication and remove the debug printing a password + log.debug(u"User/pass: %s/%s" % (user, passwd)) + return True + + def dataReceived(self, buf): + if self.state == STATE_READY: + # Everything is set, we just have to write the incoming data + self.stream_object.write(buf) + if not self.active: + self.active = True + self.getSession()[TIMER_KEY].cancel() + return + + self.buf = self.buf + buf + if self.state == STATE_INITIAL: + self._parseNegotiation() + if self.state == STATE_AUTH_USERPASS: + self._parseUserPass() + if self.state == STATE_REQUEST: + self._parseRequest() + if self.state == STATE_CLIENT_REQUEST: + self._parseRequestReply() + if self.state == STATE_CLIENT_AUTH: + ver, method = struct.unpack('!BB', buf) + self.buf = self.buf[2:] + if ver != SOCKS5_VER or method != AUTHMECH_ANON: + self.transport.loseConnection() + else: + self._makeRequest() + + def connectionLost(self, reason): + log.debug(u"Socks5 connection lost: {}".format(reason.value)) + if self.state != STATE_READY: + self.connection.errback(reason) + if self.server_mode : + self.factory.removeFromSession(self._session_hash, self, reason) + + +class Socks5ServerFactory(protocol.ServerFactory): + protocol = SOCKSv5 + + def __init__(self, parent): + """ + @param parent(XEP_0065): XEP_0065 parent instance + """ + self.parent = parent + + def getSession(self, session_hash): + return self.parent.getSession(None, session_hash) + + def startTransfer(self, session_hash, chunk_size=None): + session = self.getSession(session_hash) + try: + protocol = session['protocols'][0] + except (KeyError, IndexError): + log.error(u"Can't start file transfer, can't find protocol") + else: + session[TIMER_KEY].cancel() + protocol.startTransfer(chunk_size) + + def addToSession(self, session_hash, protocol): + """Check is session_hash is valid, and associate protocol with it + + the session will be associated to the corresponding candidate + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param return(bool): True if hash was valid (i.e. expected), False else + """ + try: + session_data = self.getSession(session_hash) + except KeyError: + return False + else: + session_data.setdefault('protocols', []).append(protocol) + return True + + def removeFromSession(self, session_hash, protocol, reason): + """Remove a protocol from session_data + + There can be several protocol instances while candidates are tried, they + have removed when candidate connection is closed + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param reason(failure.Failure): reason of the removal + """ + try: + protocols = self.getSession(session_hash)['protocols'] + protocols.remove(protocol) + except (KeyError, ValueError): + log.error(u"Protocol not found in session while it should be there") + else: + if protocol.active: + # The active protocol has been removed, session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession(session_hash)[DEFER_KEY].callback(None) + else: + self.getSession(session_hash)[DEFER_KEY].errback(reason) + + +class Socks5ClientFactory(protocol.ClientFactory): + protocol = SOCKSv5 + + def __init__(self, client, parent, session, session_hash): + """Init the Client Factory + + @param session(dict): session data + @param session_hash(unicode): hash used for peer_connection + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + """ + self.session = session + self.session_hash = session_hash + self.client = client + self.connection = defer.Deferred() + self._protocol_instance = None + self.connector = None + + def discard(self): + """Disconnect the client + + Also set a discarded flag, which avoid to call the session Deferred + """ + self.connector.disconnect() + + def getSession(self): + return self.session + + def startTransfer(self, dummy=None, chunk_size=None): + self.session[TIMER_KEY].cancel() + self._protocol_instance.startTransfer(chunk_size) + + def clientConnectionFailed(self, connector, reason): + log.debug(u"Connection failed") + self.connection.errback(reason) + + def clientConnectionLost(self, connector, reason): + log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) + if self._protocol_instance.active: + # This one was used for the transfer, than mean that + # the Socks5 session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession()[DEFER_KEY].callback(None) + else: + self.getSession()[DEFER_KEY].errback(reason) + self._protocol_instance = None + + def buildProtocol(self, addr): + log.debug(("Socks 5 client connection started")) + p = self.protocol(session_hash=self.session_hash) + p.factory = self + p.connection.chainDeferred(self.connection) + self._protocol_instance = p + return p + + +class XEP_0065(object): + NAMESPACE = NS_BS + TYPE_DIRECT = 'direct' + TYPE_ASSISTED = 'assisted' + TYPE_TUNEL = 'tunel' + TYPE_PROXY = 'proxy' + Candidate = Candidate + + def __init__(self, host): + log.info(_("Plugin XEP_0065 initialization")) + self.host = host + + # session data + self.hash_clients_map = {} # key: hash of the transfer session, value: session data + self._cache_proxies = {} # key: server jid, value: proxy data + + # misc data + self._server_factory = None + self._external_port = None + + # plugins shortcuts + self._ip = self.host.plugins['IP'] + try: + self._np = self.host.plugins['NAT-PORT'] + except KeyError: + log.debug(u"NAT Port plugin not available") + self._np = None + + # parameters + # XXX: params are not used for now, but they may be used in the futur to force proxy/IP + # host.memory.updateParams(PARAMS) + + def getHandler(self, client): + return XEP_0065_handler(self) + + def profileConnected(self, client): + client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) + client._s5b_sessions = {} + + def getSessionHash(self, from_jid, to_jid, sid): + return getSessionHash(from_jid, to_jid, sid) + + def getSocks5ServerFactory(self): + """Return server factory + + The server is created if it doesn't exists yet + self._server_factory_port is set on server creation + """ + + if self._server_factory is None: + self._server_factory = Socks5ServerFactory(self) + for port in xrange(SERVER_STARTING_PORT, 65356): + try: + listening_port = reactor.listenTCP(port, self._server_factory) + except internet_error.CannotListenError as e: + log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( + port=port, + err_msg=e.socketError.strerror, + err_num=u' (error code: {})'.format(e.socketError.errno), + )) + else: + self._server_factory_port = listening_port.getHost().port + break + + log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) + return self._server_factory + + @defer.inlineCallbacks + def getProxy(self, client): + """Return the proxy available for this profile + + cache is used between clients using the same server + @return ((D)(ProxyInfos, None)): Found proxy infos, + or None if not acceptable proxy is found + """ + def notFound(server): + log.info(u"No proxy found on this server") + self._cache_proxies[server] = None + defer.returnValue(None) + server = client.jid.host + try: + defer.returnValue(self._cache_proxies[server]) + except KeyError: + pass + try: + proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop() + except (defer.CancelledError, StopIteration, KeyError): + notFound(server) + iq_elt = client.IQ('get') + iq_elt['to'] = proxy.full() + iq_elt.addElement((NS_BS, 'query')) + + try: + result_elt = yield iq_elt.send() + except jabber_error.StanzaError as failure: + log.warning(u"Error while requesting proxy info on {jid}: {error}" + .format(proxy.full(), failure)) + notFound(server) + + try: + query_elt = result_elt.elements(NS_BS, 'query').next() + streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() + host = streamhost_elt['host'] + jid_ = streamhost_elt['jid'] + port = streamhost_elt['port'] + if not all((host, jid, port)): + raise KeyError + jid_ = jid.JID(jid_) + except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): + log.warning(u"Invalid proxy data received from {}".format(proxy.full())) + notFound(server) + + proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) + log.info(u"Proxy found: {}".format(proxy_infos)) + defer.returnValue(proxy_infos) + + @defer.inlineCallbacks + def _getNetworkData(self, client): + """Retrieve information about network + + @param client: %(doc_client)s + @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data + """ + self.getSocks5ServerFactory() + local_port = self._server_factory_port + external_ip = yield self._ip.getExternalIP(client) + local_ips = yield self._ip.getLocalIPs(client) + + if external_ip is not None and self._external_port is None: + if external_ip != local_ips[0]: + log.info(u"We are probably behind a NAT") + if self._np is None: + log.warning(u"NAT port plugin not available, we can't map port") + else: + ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") + if ext_port is None: + log.warning(u"Can't map NAT port") + else: + self._external_port = ext_port + + defer.returnValue((local_port, self._external_port, local_ips, external_ip)) + + @defer.inlineCallbacks + def getCandidates(self, client): + """Return a list of our stream candidates + + @return (D(list[Candidate])): list of candidates, ordered by priority + """ + server_factory = yield self.getSocks5ServerFactory() + local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) + proxy = yield self.getProxy(client) + + # its time to gather the candidates + candidates = [] + + # first the direct ones + + # the preferred direct connection + ip = local_ips.pop(0) + candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) + for ip in local_ips: + candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) + + # then the assisted one + if ext_port is not None: + candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) + + # finally the proxy + if proxy: + candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) + + # should be already sorted, but just in case the priorities get weird + candidates.sort(key=lambda c: c.priority, reverse=True) + defer.returnValue(candidates) + + def _addConnector(self, connector, candidate): + """Add connector used to connect to candidate, and return client factory's connection Deferred + + the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion + @param connector: a connector implementing IConnector + @param candidate(Candidate): candidate linked to the connector + @return (D): Deferred fired when factory connection is done or has failed + """ + candidate.factory.connector = connector + return candidate.factory.connection + + def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): + """Connect to a candidate + + Connection will be done with a Socks5ClientFactory + @param candidate(Candidate): candidate to connect to + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param peer_session_hash(unicode, None): hash used with the peer + None to use session_hash. + None must be used in 2 cases: + - when XEP-0065 is used with XEP-0096 + - when a peer connect to a proxy *he proposed himself* + in practice, peer_session_hash is only used by tryCandidates + @param delay(None, float): optional delay to wait before connection, in seconds + @return (D): Deferred launched when TCP connection + Socks5 connection is done + """ + if peer_session_hash is None: + # for XEP-0065, only one hash is needed + peer_session_hash = session_hash + session = self.getSession(client, session_hash) + factory = Socks5ClientFactory(client, self, session, peer_session_hash) + candidate.factory = factory + if delay is None: + d = defer.succeed(candidate.host) + else: + d = sat_defer.DelayedDeferred(delay, candidate.host) + d.addCallback(reactor.connectTCP, candidate.port, factory) + d.addCallback(self._addConnector, candidate) + return d + + def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None): + defers_list = [] + + for candidate in candidates: + delay = CANDIDATE_DELAY * len(defers_list) + if candidate.type == XEP_0065.TYPE_PROXY: + delay += CANDIDATE_DELAY_PROXY + d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay) + if connection_cb is not None: + d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate)) + if connection_eb is not None: + d.addErrback(connection_eb, client, candidate) + defers_list.append(d) + + return defers_list + + def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): + """Get best candidate (according to priority) which can connect + + @param candidates(iterable[Candidate]): candidates to test + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param peer_session_hash(unicode, None): hash of the other peer + only useful for XEP-0260, must be None for XEP-0065 streamhost candidates + @return (D(None, Candidate)): best candidate or None if none can connect + """ + defer_candidates = None + + def connectionCb(client, candidate): + log.info(u"Connection of {} successful".format(unicode(candidate))) + for idx, other_candidate in enumerate(candidates): + try: + if other_candidate.priority < candidate.priority: + log.debug(u"Cancelling {}".format(other_candidate)) + defer_candidates[idx].cancel() + except AttributeError: + assert other_candidate is None + + def connectionEb(failure, client, candidate): + if failure.check(defer.CancelledError): + log.debug(u"Connection of {} has been cancelled".format(candidate)) + else: + log.info(u"Connection of {candidate} Failed: {error}".format( + candidate = candidate, + error = failure.value)) + candidates[candidates.index(candidate)] = None + + def allTested(self): + log.debug(u"All candidates have been tested") + good_candidates = [c for c in candidates if c] + return good_candidates[0] if good_candidates else None + + defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) + d_list = defer.DeferredList(defer_candidates) + d_list.addCallback(allTested) + return d_list + + def _timeOut(self, session_hash, client): + """Called when stream was not started quickly enough + + @param session_hash(str): hash as returned by getSessionHash + @param client: %(doc_client)s + """ + log.info(u"Socks5 Bytestream: TimeOut reached") + session = self.getSession(client, session_hash) + session[DEFER_KEY].errback(exceptions.TimeOutError) + + def killSession(self, failure_, session_hash, sid, client): + """Clean the current session + + @param session_hash(str): hash as returned by getSessionHash + @param sid(None, unicode): session id + or None if self.xep_0065_sid_session was not used + @param client: %(doc_client)s + @param failure_(None, failure.Failure): None if eveything was fine, a failure else + @return (None, failure.Failure): failure_ is returned + """ + log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( + hash=session_hash, + reason='' if failure_ is None else failure_.value, + id='' if sid is None else u' (id: {})'.format(sid), + )) + + try: + assert self.hash_clients_map[session_hash] == client + del self.hash_clients_map[session_hash] + except KeyError: + pass + + if sid is not None: + try: + del client.xep_0065_sid_session[sid] + except KeyError: + log.warning(u"Session id {} is unknown".format(sid)) + + try: + session_data = client._s5b_sessions[session_hash] + except KeyError: + log.warning(u"There is no session with this hash") + return + else: + del client._s5b_sessions[session_hash] + + try: + session_data['timer'].cancel() + except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): + pass + + return failure_ + + def startStream(self, client, stream_object, to_jid, sid): + """Launch the stream workflow + + @param streamProducer: stream_object to use + @param to_jid: JID of the recipient + @param sid: Stream session id + @param successCb: method to call when stream successfuly finished + @param failureCb: method to call when something goes wrong + @return (D): Deferred fired when session is finished + """ + session_data = self._createSession(client, stream_object, to_jid, sid, True) + + session_data[client] = client + + def gotCandidates(candidates): + session_data['candidates'] = candidates + iq_elt = client.IQ() + iq_elt["from"] = client.jid.full() + iq_elt["to"] = to_jid.full() + query_elt = iq_elt.addElement((NS_BS, 'query')) + query_elt['mode'] = 'tcp' + query_elt['sid'] = sid + + for candidate in candidates: + streamhost = query_elt.addElement('streamhost') + streamhost['host'] = candidate.host + streamhost['port'] = str(candidate.port) + streamhost['jid'] = candidate.jid.full() + log.debug(u"Candidate proposed: {}".format(candidate)) + + d = iq_elt.send() + args = [session_data, client] + d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) + + self.getCandidates(client).addCallback(gotCandidates) + return session_data[DEFER_KEY] + + def _IQNegotiationCb(self, iq_elt, session_data, client): + """Called when the result of open iq is received + + @param session_data(dict): data of the session + @param client: %(doc_client)s + @param iq_elt(domish.Element): <iq> result + """ + try: + query_elt = iq_elt.elements(NS_BS, 'query').next() + streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() + except StopIteration: + log.warning(u"No streamhost found in stream query") + # FIXME: must clean session + return + + streamhost_jid = jid.JID(streamhost_used_elt['jid']) + try: + candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() + except StopIteration: + log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) + return + else: + log.info(u"Candidate choosed by target: {}".format(candidate)) + + if candidate.type == XEP_0065.TYPE_PROXY: + log.info(u"A Socks5 proxy is used") + d = self.connectCandidate(client, candidate, session_data['hash']) + d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) + d.addErrback(self._activationEb) + else: + d = defer.succeed(None) + + d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) + + def _activationEb(self, failure): + log.warning(u"Proxy activation error: {}".format(failure.value)) + + def _IQNegotiationEb(self, stanza_err, session_data, client): + log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) + # FIXME: must clean session + + def createSession(self, *args, **kwargs): + """like [_createSession] but return the session deferred instead of the whole session + + session deferred is fired when transfer is finished + """ + return self._createSession(*args, **kwargs)[DEFER_KEY] + + def _createSession(self, client, stream_object, to_jid, sid, requester=False): + """Called when a bytestream is imminent + + @param stream_object(iface.IStreamProducer): File object where data will be written + @param to_jid(jid.JId): jid of the other peer + @param sid(unicode): session id + @param initiator(bool): if True, this session is create by initiator + @return (dict): session data + """ + if sid in client.xep_0065_sid_session: + raise exceptions.ConflictError(u'A session with this id already exists !') + if requester: + session_hash = getSessionHash(client.jid, to_jid, sid) + session_data = self._registerHash(client, session_hash, stream_object) + else: + session_hash = getSessionHash(to_jid, client.jid, sid) + session_d = defer.Deferred() + session_d.addBoth(self.killSession, session_hash, sid, client) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), + } + client.xep_0065_sid_session[sid] = session_data + session_data.update( + {'id': sid, + 'peer_jid': to_jid, + 'stream_object': stream_object, + 'hash': session_hash, + }) + + return session_data + + def getSession(self, client, session_hash): + """Return session data + + @param session_hash(unicode): hash of the session + hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 + @param client(None, SatXMPPClient): client of the peer + None is used only if client is unknown (this is only the case + for incoming request received by Socks5ServerFactory). None must + only be used by Socks5ServerFactory. + See comments below for details + @return (dict): session data + """ + if client is None: + try: + client = self.hash_clients_map[session_hash] + except KeyError as e: + log.warning(u"The requested session doesn't exists !") + raise e + return client._s5b_sessions[session_hash] + + def registerHash(self, *args, **kwargs): + """like [_registerHash] but return the session deferred instead of the whole session + session deferred is fired when transfer is finished + """ + return self._registerHash(*args, **kwargs)[DEFER_KEY] + + def _registerHash(self, client, session_hash, stream_object): + """Create a session_data associated to hash + + @param session_hash(str): hash of the session + @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object + None if it will be filled later + return (dict): session data + """ + assert session_hash not in client._s5b_sessions + session_d = defer.Deferred() + session_d.addBoth(self.killSession, session_hash, None, client) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), + } + + if stream_object is not None: + session_data['stream_object'] = stream_object + + assert session_hash not in self.hash_clients_map + self.hash_clients_map[session_hash] = client + + return session_data + + def associateStreamObject(self, client, session_hash, stream_object): + """Associate a stream object with a session""" + session_data = self.getSession(client, session_hash) + assert 'stream_object' not in session_data + session_data['stream_object'] = stream_object + + def streamQuery(self, iq_elt, client): + log.debug(u"BS stream query") + + iq_elt.handled = True + + query_elt = iq_elt.elements(NS_BS, 'query').next() + try: + sid = query_elt['sid'] + except KeyError: + log.warning(u"Invalid bystreams request received") + return client.sendError(iq_elt, "bad-request") + + streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) + if not streamhost_elts: + return client.sendError(iq_elt, "bad-request") + + try: + session_data = client.xep_0065_sid_session[sid] + except KeyError: + log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) + return client.sendError(iq_elt, 'not-acceptable') + + peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) + + candidates = [] + nb_sh = len(streamhost_elts) + for idx, sh_elt in enumerate(streamhost_elts): + try: + host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) + except KeyError: + log.warning(u"malformed streamhost element") + return client.sendError(iq_elt, "bad-request") + priority = nb_sh - idx + if jid_.userhostJID() != peer_jid.userhostJID(): + type_ = XEP_0065.TYPE_PROXY + else: + type_ = XEP_0065.TYPE_DIRECT + candidates.append(Candidate(host, port, type_, priority, jid_)) + + for candidate in candidates: + log.info(u"Candidate proposed: {}".format(candidate)) + + d = self.getBestCandidate(client, candidates, session_data['hash']) + d.addCallback(self._ackStream, iq_elt, session_data, client) + + def _ackStream(self, candidate, iq_elt, session_data, client): + if candidate is None: + log.info("No streamhost candidate worked, we have to end negotiation") + return client.sendError(iq_elt, 'item-not-found') + log.info(u"We choose: {}".format(candidate)) + result_elt = xmlstream.toResponse(iq_elt, 'result') + query_elt = result_elt.addElement((NS_BS, 'query')) + query_elt['sid'] = session_data['id'] + streamhost_used_elt = query_elt.addElement('streamhost-used') + streamhost_used_elt['jid'] = candidate.jid.full() + client.send(result_elt) + + +class XEP_0065_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + + def connectionInitialized(self): + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_BS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return []