Mercurial > libervia-backend
view src/plugins/plugin_xep_0065.py @ 1567:268fda4236ca
plugins XE0166, XEP-0234, XEP-0260, XEP-0261: renamed session key managing other peer's jid to "peer_jid" instead of "to_jid"
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 08 Nov 2015 14:44:33 +0100 |
parents | 7cc29634b6ef |
children | 44854fb5d3b2 |
line wrap: on
line source
#!/usr/bin/python #-*- coding: utf-8 -*- # SAT plugin for managing xep-0065 # Copyright (C) # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) # 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) # 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # -- # This module is based on proxy65 (http://code.google.com/p/proxy65), # originaly written by David Smith and modified by Fabio Forno. # It is sublicensed under AGPL v3 (or any later version) as allowed by the original # license. # -- # Here is a copy of the original license: # Copyright (C) # 2002-2004 Dave Smith (dizzyd@jabber.org) # 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. from sat.core.i18n import _ from sat.core.log import getLogger log = getLogger(__name__) from sat.core.constants import Const as C from sat.core import exceptions from sat.tools import sat_defer from twisted.internet import protocol from twisted.internet import reactor from twisted.internet import error as internet_error from twisted.words.protocols.jabber import jid, client as jabber_client from twisted.words.protocols.jabber import error as jabber_error from twisted.protocols.basic import FileSender from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure from sat.core.exceptions import ProfileNotInCacheError from collections import namedtuple import struct import hashlib import uuid from zope.interface import implements try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler from wokkel import disco, iwokkel IQ_SET = '/iq[@type="set"]' NS_BS = 'http://jabber.org/protocol/bytestreams' BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' TIMEOUT = 60 # timeout for workflow DEFER_KEY = 'finished' # key of the deferred used to track session end SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) # priorities are candidates local priorities, must be a int between 0 and 65535 PRIORITY_BEST_DIRECT = 10000 PRIORITY_DIRECT = 5000 PRIORITY_ASSISTED = 1000 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) PLUGIN_INFO = { "name": "XEP 0065 Plugin", "import_name": "XEP-0065", "type": "XEP", "protocols": ["XEP-0065"], "dependencies": ["IP"], "recommendations": ["NAT-PORT"], "main": "XEP_0065", "handler": "yes", "description": _("""Implementation of SOCKS5 Bytestreams""") } # XXX: by default eveything is automatic # TODO: use these params to force use of specific proxy/port/IP # PARAMS = """ # <params> # <general> # <category name="File Transfer"> # <param name="Force IP" type="string" /> # <param name="Force Port" type="int" constraint="1;65535" /> # </category> # </general> # <individual> # <category name="File Transfer"> # <param name="Force Proxy" value="" type="string" /> # <param name="Force Proxy host" value="" type="string" /> # <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> # </category> # </individual> # </params> # """ (STATE_INITIAL, STATE_AUTH, STATE_REQUEST, STATE_READY, STATE_AUTH_USERPASS, STATE_CLIENT_INITIAL, STATE_CLIENT_AUTH, STATE_CLIENT_REQUEST, ) = xrange(8) SOCKS5_VER = 0x05 ADDR_IPV4 = 0x01 ADDR_DOMAINNAME = 0x03 ADDR_IPV6 = 0x04 CMD_CONNECT = 0x01 CMD_BIND = 0x02 CMD_UDPASSOC = 0x03 AUTHMECH_ANON = 0x00 AUTHMECH_USERPASS = 0x02 AUTHMECH_INVALID = 0xFF REPLY_SUCCESS = 0x00 REPLY_GENERAL_FAILUR = 0x01 REPLY_CONN_NOT_ALLOWED = 0x02 REPLY_NETWORK_UNREACHABLE = 0x03 REPLY_HOST_UNREACHABLE = 0x04 REPLY_CONN_REFUSED = 0x05 REPLY_TTL_EXPIRED = 0x06 REPLY_CMD_NOT_SUPPORTED = 0x07 REPLY_ADDR_NOT_SUPPORTED = 0x08 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) class Candidate(object): def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): """ @param host(unicode): host IP or domain @param port(int): port @param type_(unicode): stream type (one of XEP_0065.TYPE_*) @param priority(int): priority @param jid_(jid.JID): jid @param id_(None, id_): Candidate ID, or None to generate @param priority_local(bool): if True, priority is used as local priority, else priority is used as global one (and local priority is set to 0) """ assert isinstance(jid_, jid.JID) self.host, self.port, self.type, self.jid = ( host, int(port), type_, jid_) self.id = id_ if id_ is not None else unicode(uuid.uuid4()) if priority_local: self._local_priority = int(priority) self._priority = self.calculatePriority() else: self._local_priority = 0 self._priority = int(priority) self.factory = factory def discard(self): """Disconnect a candidate if it is connected Used to disconnect tryed client when they are discarded """ log.debug(u"Discarding {}".format(self)) try: self.factory.discard() except AttributeError: pass # no discard for Socks5ServerFactory @property def local_priority(self): return self._local_priority @property def priority(self): return self._priority def __str__(self): # similar to __unicode__ but we don't show jid and we encode id return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( self, id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), ) def __unicode__(self): return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( self, id=u" id={}".format(self.id if self.id is not None else u''), ) def __eq__(self, other): # self.id is is not used in __eq__ as the same candidate can have # different ids if proposed by initiator or responder try: return (self.host == other.host and self.port == other.port and self.jid == other.jid) except (AttributeError, TypeError): return False def __ne__(self, other): return not self.__eq__(other) def calculatePriority(self): """Calculate candidate priority according to XEP-0260 §2.2 @return (int): priority """ if self.type == XEP_0065.TYPE_DIRECT: multiplier = 126 elif self.type == XEP_0065.TYPE_ASSISTED: multiplier = 120 elif self.type == XEP_0065.TYPE_TUNEL: multiplier = 110 elif self.type == XEP_0065.TYPE_PROXY: multiplier = 10 else: raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) return 2**16 * multiplier + self._local_priority def startTransfer(self, session_hash=None): self.factory.startTransfer(session_hash) def getSessionHash(from_jid, to_jid, sid): """Calculate SHA1 Hash according to XEP-0065 §5.3.2 @param from_jid(jid.JID): jid of the requester @param to_jid(jid.JID): jid of the target @param sid(unicode): session id @return (str): hash """ return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() class SOCKSv5(protocol.Protocol, FileSender): def __init__(self, session_hash=None): """ @param session_hash(str): hash of the session must only be used in client mode """ log.debug(_("Protocol init")) self.connection = defer.Deferred() # called when connection/auth is done if session_hash is not None: self.server_mode = False self._session_hash = session_hash self.state = STATE_CLIENT_INITIAL else: self.server_mode = True self.state = STATE_INITIAL self.buf = "" self.supportedAuthMechs = [AUTHMECH_ANON] self.supportedAddrs = [ADDR_DOMAINNAME] self.enabledCommands = [CMD_CONNECT] self.peersock = None self.addressType = 0 self.requestType = 0 self._file_obj = None @property def file_obj(self): if self._file_obj is None: if self.server_mode: self._file_obj = self.factory.getSession(self._session_hash)["file"] else: self._file_obj = self.factory.getSession()['file'] return self._file_obj def _startNegotiation(self): log.debug("starting negotiation (client mode)") self.state = STATE_CLIENT_AUTH self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) def _parseNegotiation(self): log.debug("_parseNegotiation") try: # Parse out data ver, nmethod = struct.unpack('!BB', self.buf[:2]) methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) # Ensure version is correct if ver != 5: self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() return # Trim off front of the buffer self.buf = self.buf[nmethod + 2:] # Check for supported auth mechs for m in self.supportedAuthMechs: if m in methods: # Update internal state, according to selected method if m == AUTHMECH_ANON: self.state = STATE_REQUEST elif m == AUTHMECH_USERPASS: self.state = STATE_AUTH_USERPASS # Complete negotiation w/ this method self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) return # No supported mechs found, notify client and close the connection log.warning(u"Unsupported authentication mechanism") self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() except struct.error: pass def _parseUserPass(self): log.debug("_parseUserPass") try: # Parse out data ver, ulen = struct.unpack('BB', self.buf[:2]) uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) plen, = struct.unpack('B', self.buf[ulen + 2]) password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) # Trim off fron of the buffer self.buf = self.buf[3 + ulen + plen:] # Fire event to authenticate user if self.authenticateUserPass(uname, password): # Signal success self.state = STATE_REQUEST self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) else: # Signal failure self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) self.transport.loseConnection() except struct.error: pass def sendErrorReply(self, errorcode): log.debug("sendErrorReply") # Any other address types are not supported result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) self.transport.write(result) self.transport.loseConnection() def _parseRequest(self): log.debug("_parseRequest") try: # Parse out data and trim buffer accordingly ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Deal with addresses if self.addressType == ADDR_IPV4: addr, port = struct.unpack('!IH', self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) self.buf = self.buf[7 + len(addr):] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Ensure command is supported if cmd not in self.enabledCommands: # Send a not supported error self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) return # Process the command if cmd == CMD_CONNECT: self.connectRequested(addr, port) elif cmd == CMD_BIND: self.bindRequested(addr, port) else: # Any other command is not supported self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) except struct.error: # The buffer is probably not complete, we need to wait more return None def _makeRequest(self): log.debug("_makeRequest") # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) hash_ = self._session_hash request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) self.transport.write(request) self.state = STATE_CLIENT_REQUEST def _parseRequestReply(self): log.debug("_parseRequestReply") try: ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Deal with addresses if self.addressType == ADDR_IPV4: addr, port = struct.unpack('!IH', self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) self.buf = self.buf[7 + len(addr):] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Ensure reply is OK if rep != REPLY_SUCCESS: self.loseConnection() return # if self.factory.proxy: # self.state = STATE_READY # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) # else: self.state = STATE_READY self.connection.callback(None) # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error: # The buffer is probably not complete, we need to wait more return None def connectionMade(self): log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) if self.state == STATE_CLIENT_INITIAL: self._startNegotiation() def connectRequested(self, addr, port): log.debug("connectRequested") # Check that this session is expected if not self.factory.addToSession(addr, self): self.sendErrorReply(REPLY_CONN_REFUSED) log.warning(u"Unexpected connection request received from {host}" .format(host=self.transport.getPeer().host)) return self._session_hash = addr # self.sid, self.profile = self.factory.hash_profiles_map[addr] # client = self.factory.host.getClient(self.profile) # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) def startTransfer(self): """Callback called when the result iq is received""" log.debug(u"Starting file transfer") d = self.beginFileTransfer(self.file_obj, self.transport) d.addCallback(self.fileTransfered) def fileTransfered(self, d): log.info(_("File transfer completed, closing connection")) self.transport.loseConnection() def connectCompleted(self, remotehost, remoteport): log.debug("connectCompleted") if self.addressType == ADDR_IPV4: result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) elif self.addressType == ADDR_DOMAINNAME: result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) self.transport.write(result) self.state = STATE_READY def bindRequested(self, addr, port): pass def authenticateUserPass(self, user, passwd): # FIXME: implement authentication and remove the debug printing a password log.debug(u"User/pass: %s/%s" % (user, passwd)) return True def dataReceived(self, buf): if self.state == STATE_READY: # Everything is set, we just have to write the incoming data self.file_obj.write(buf) return self.buf = self.buf + buf if self.state == STATE_INITIAL: self._parseNegotiation() if self.state == STATE_AUTH_USERPASS: self._parseUserPass() if self.state == STATE_REQUEST: self._parseRequest() if self.state == STATE_CLIENT_REQUEST: self._parseRequestReply() if self.state == STATE_CLIENT_AUTH: ver, method = struct.unpack('!BB', buf) self.buf = self.buf[2:] if ver != SOCKS5_VER or method != AUTHMECH_ANON: self.transport.loseConnection() else: self._makeRequest() def connectionLost(self, reason): log.debug(u"Socks5 connection lost: {}".format(reason.value)) # self.transport.unregisterProducer() # if self.peersock is not None: # self.peersock.peersock = None # self.peersock.transport.unregisterProducer() # self.peersock = None if self.state != STATE_READY: self.connection.errback(reason) if self.server_mode : self.factory.removeFromSession(self._session_hash, self, reason) class Socks5ServerFactory(protocol.ServerFactory): protocol = SOCKSv5 def __init__(self, parent): """ @param parent(XEP_0065): XEP_0065 parent instance """ self.parent = parent def getSession(self, session_hash): return self.parent.getSession(session_hash, None) def startTransfer(self, session_hash): session = self.getSession(session_hash) try: protocol = session['protocols'][0] except (KeyError, IndexError): log.error(u"Can't start file transfer, can't find protocol") else: protocol.startTransfer() def addToSession(self, session_hash, protocol): """Check is session_hash is valid, and associate protocol with it the session will be associated to the corresponding candidate @param session_hash(str): hash of the session @param protocol(SOCKSv5): protocol instance @param return(bool): True if hash was valid (i.e. expected), False else """ try: session_data = self.getSession(session_hash) except KeyError: return False else: session_data.setdefault('protocols', []).append(protocol) return True def removeFromSession(self, session_hash, protocol, reason): """Remove a protocol from session_data There can be several protocol instances while candidates are tried, they have removed when candidate connection is closed @param session_hash(str): hash of the session @param protocol(SOCKSv5): protocol instance @param reason(failure.Failure): reason of the removal """ try: protocols = self.getSession(session_hash)['protocols'] protocols.remove(protocol) except (KeyError, ValueError): log.error(u"Protocol not found in session while it should be there") else: if not protocols: # The last protocol has been removed, session is finished if reason.check(internet_error.ConnectionDone): self.getSession(session_hash)[DEFER_KEY].callback(None) else: self.getSession(session_hash)[DEFER_KEY].errback(reason) class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): def __init__(self, parent, session_hash, profile): """Init the Client Factory @param session_hash(unicode): hash of the session hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 @param profile(unciode): %(doc_profile)s """ self.session = parent.getSession(session_hash, profile) self.session_hash = session_hash self.profile = profile self.connection = defer.Deferred() self._protocol_instance = None self.connector = None self._discarded = False # self.data = stream_data[sid] # self.sid = sid # self.iq_id = iq_id # self.activateCb = activateCb # self.finishedCb = finishedCb # self.proxy = proxy # self.profile = profile def discard(self): """Disconnect the client Also set a discarded flag, which avoid to call the session Deferred """ self.connector.disconnect() self._discarded = True def getSession(self): return self.session def startTransfer(self, dummy=None): self._protocol_instance.startTransfer() def clientConnectionFailed(self, connector, reason): log.debug(u"Connection failed") self.connection.errback(reason) def clientConnectionLost(self, connector, reason): log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) self._protocol_instance = None if not self._discarded: # This one was used for the transfer, than mean that # the Socks5 session is finished if reason.check(internet_error.ConnectionDone): self.getSession()[DEFER_KEY].callback(None) else: self.getSession()[DEFER_KEY].errback(reason) # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful def buildProtocol(self, addr): log.debug(("Socks 5 client connection started")) p = self.protocol(session_hash=self.session_hash) p.factory = self p.connection.chainDeferred(self.connection) self._protocol_instance = p return p class XEP_0065(object): NAMESPACE = NS_BS TYPE_DIRECT = 'direct' TYPE_ASSISTED = 'assisted' TYPE_TUNEL = 'tunel' TYPE_PROXY = 'proxy' Candidate = Candidate def __init__(self, host): log.info(_("Plugin XEP_0065 initialization")) self.host = host # session data self.hash_profiles_map = {} # key: hash of the transfer session, value: session data self._cache_proxies = {} # key: server jid, value: proxy data # misc data self._server_factory = None self._external_port = None # plugins shortcuts self._ip = self.host.plugins['IP'] try: self._np = self.host.plugins['NAT-PORT'] except KeyError: log.debug(u"NAT Port plugin not available") self._np = None # parameters # XXX: params are not used for now, but they may be used in the futur to force proxy/IP # host.memory.updateParams(PARAMS) def getHandler(self, profile): return XEP_0065_handler(self) def profileConnected(self, profile): client = self.host.getClient(profile) client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) client._s5b_sessions = {} def getSessionHash(self, from_jid, to_jid, sid): return getSessionHash(from_jid, to_jid, sid) def getSocks5ServerFactory(self): """Return server factory The server is created if it doesn't exists yet self._server_factory_port is set on server creation """ if self._server_factory is None: # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client)) self._server_factory = Socks5ServerFactory(self) for port in xrange(SERVER_STARTING_PORT, 65356): try: listening_port = reactor.listenTCP(port, self._server_factory) except internet_error.CannotListenError as e: log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( port=port, err_msg=e.socketError.strerror, err_num=u' (error code: {})'.format(e.socketError.errno), )) else: self._server_factory_port = listening_port.getHost().port break log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) return self._server_factory @defer.inlineCallbacks def getProxy(self, profile): """Return the proxy available for this profile cache is used between profiles using the same server @param profile: %(doc_profile)s @return ((D)(ProxyInfos, None)): Found proxy infos, or None if not acceptable proxy is found """ def notFound(server): log.info(u"No proxy found on this server") self._cache_proxies[server] = None defer.returnValue(None) client = self.host.getClient(profile) server = client.jid.host try: defer.returnValue(self._cache_proxies[server]) except KeyError: pass try: proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() except (exceptions.CancelError, StopIteration): notFound(server) iq_elt = client.IQ('get') iq_elt['to'] = proxy.full() iq_elt.addElement('query', NS_BS) try: result_elt = yield iq_elt.send() except jabber_error.StanzaError as failure: log.warning(u"Error while requesting proxy info on {jid}: {error}" .format(proxy.full(), failure)) notFound(server) try: query_elt = result_elt.elements(NS_BS, 'query').next() streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() host = streamhost_elt['host'] jid_ = streamhost_elt['jid'] port = streamhost_elt['port'] if not all((host, jid, port)): raise KeyError jid_ = jid.JID(jid_) except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat): log.warning(u"Invalid proxy data received from {}".format(proxy.full())) notFound(server) proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) log.info(u"Proxy found: {}".format(proxy_infos)) defer.returnValue(proxy_infos) @defer.inlineCallbacks def _getNetworkData(self, client): """Retrieve information about network @param client: %(doc_client)s @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data """ self.getSocks5ServerFactory() local_port = self._server_factory_port external_ip = yield self._ip.getExternalIP(client.profile) local_ips = yield self._ip.getLocalIPs(client.profile) if not local_ips: log.warning(u"Can't find local IPs, we can't do direct connection") else: if external_ip is not None and self._external_port is None: if external_ip != local_ips[0]: log.info(u"We are probably behind a NAT") if self._np is None: log.warning(u"NAT port plugin not available, we can't map port") else: ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") if ext_port is None: log.warning(u"Can't map NAT port") else: self._external_port = ext_port defer.returnValue((local_port, self._external_port, local_ips, external_ip)) @defer.inlineCallbacks def getCandidates(self, profile): """Return a list of our stream candidates @param profile: %(doc_profile)s @return (D(list[Candidate])): list of candidates, ordered by priority """ client = self.host.getClient(profile) server_factory = yield self.getSocks5ServerFactory() local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) proxy = yield self.getProxy(profile) # its time to gather the candidates candidates = [] # first the direct ones if local_ips: # the preferred direct connection ip = local_ips.pop(0) candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) for ip in local_ips: candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) # then the assisted one if ext_port is not None: candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) # finally the proxy if proxy: candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) # should be already sorted, but just in case the priorities get weird candidates.sort(key=lambda c: c.priority, reverse=True) defer.returnValue(candidates) def _addConnector(self, connector, candidate): """Add connector used to connect to candidate, and return client factory's connection Deferred the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion @param connector: a connector implementing IConnector @param candidate(Candidate): candidate linked to the connector @return (D): Deferred fired when factory connection is done or has failed """ candidate.factory.connector = connector return candidate.factory.connection def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): defers_list = [] for candidate in candidates: factory = Socks5ClientFactory(self, session_hash, profile) candidate.factory = factory delay = CANDIDATE_DELAY * len(defers_list) if candidate.type == XEP_0065.TYPE_PROXY: delay += CANDIDATE_DELAY_PROXY d = sat_defer.DelayedDeferred(delay, candidate.host) d.addCallback(reactor.connectTCP, candidate.port, factory) d.addCallback(self._addConnector, candidate) if connection_cb is not None: d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) if connection_eb is not None: d.addErrback(connection_eb, candidate, profile) defers_list.append(d) return defers_list def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): defer_candidates = None def connectionCb(candidate, profile): log.info(u"Connection of {} successful".format(unicode(candidate))) for idx, other_candidate in enumerate(candidates): try: if other_candidate.priority < candidate.priority: log.debug(u"Cancelling {}".format(other_candidate)) defer_candidates[idx].cancel() except AttributeError: assert other_candidate is None def connectionEb(failure, candidate, profile): if failure.check(defer.CancelledError): log.debug(u"Connection of {} has been cancelled".format(candidate)) else: log.info(u"Connection of {candidate} Failed: {error}".format( candidate = candidate, error = failure.value)) candidates[candidates.index(candidate)] = None def allTested(self): log.debug(u"All candidates have been tested") good_candidates = [c for c in candidates if c] return good_candidates[0] if good_candidates else None defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) d_list = defer.DeferredList(defer_candidates) d_list.addCallback(allTested) return d_list def _timeOut(self, sid, client): """Delecte current_stream id, called after timeout @param id: id of client.xep_0065_current_stream""" log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( sid=sid, profile=client.profile)) self._killSession(sid, client, u"TIMEOUT") def _killSession(self, sid, client, failure_reason=None): """Delete a current_stream id, clean up associated observers @param sid(unicode): session id @param client: %(doc_client)s @param failure_reason(None, unicode): if None the session is successful else, will be used to call failure_cb """ try: session = client.xep_0065_current_stream[sid] except KeyError: log.warning(_("kill id called on a non existant id")) return try: observer_cb = session['observer_cb'] except KeyError: pass else: client.xmlstream.removeObserver(session["event_data"], observer_cb) if session['timer'].active(): session['timer'].cancel() del client.xep_0065_current_stream[sid] # FIXME: to check try: session_hash = session.get['hash'] del self.hash_profiles_map[session_hash] # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc). except KeyError: log.debug(u"Not hash found for this session") pass success = failure_reason is None stream_d = session[DEFER_KEY] if success: stream_d.callback(None) else: stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): """Launch the stream workflow @param file_obj: file_obj to send @param to_jid: JID of the recipient @param sid: Stream session id @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s """ client = self.host.getClient(profile) session_data = self._createSession(file_obj, to_jid, sid, client.profile) session_data["to"] = to_jid session_data["xmlstream"] = client.xmlstream hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) self.hash_profiles_map[hash_] = (sid, profile) iq_elt = jabber_client.IQ(client.xmlstream, 'set') iq_elt["from"] = client.jid.full() iq_elt["to"] = to_jid.full() query_elt = iq_elt.addElement('query', NS_BS) query_elt['mode'] = 'tcp' query_elt['sid'] = sid #first streamhost: direct connection streamhost = query_elt.addElement('streamhost') streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") streamhost['jid'] = client.jid.full() #second streamhost: mediated connection, using proxy streamhost = query_elt.addElement('streamhost') streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) iq_elt.addCallback(self._IQOpen, session_data, client) iq_elt.send() return session_data[DEFER_KEY] def _IQOpen(self, session_data, client, iq_elt): """Called when the result of open iq is received @param session_data(dict): data of the session @param client: %(doc_client)s @param iq_elt(domish.Element): <iq> result """ sid = session_data['id'] if iq_elt["type"] == "error": log.warning(_("Socks5 transfer failed")) # FIXME: must clean session return try: session_data = client.xep_0065_current_stream[sid] file_obj = session_data["file_obj"] timer = session_data["timer"] except KeyError: raise exceptions.InternalError timer.reset(TIMEOUT) query_elt = iq_elt.elements(NS_BS, 'query').next() streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) if not streamhost_elts: log.warning(_("No streamhost found in stream query")) # FIXME: must clean session return # FIXME: must be cleaned ! streamhost_jid = streamhost_elts[0]['jid'] if streamhost_jid != client.jid.full(): log.debug(_("A proxy server is used")) proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) if proxy_jid != streamhost_jid: log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) return factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) reactor.connectTCP(proxy_host, int(proxy_port), factory) else: session_data["start_transfer_cb"](file_obj) # We now activate the stream def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): log.debug(_("activating stream")) client = self.host.getClient(profile) session_data = client.xep_0065_current_stream[sid] iq_elt = client.IQ(client.xmlstream, 'set') iq_elt["from"] = client.jid.full() iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) query_elt = iq_elt.addElement('query', NS_BS) query_elt['sid'] = sid query_elt.addElement('activate', content=session_data['to'].full()) iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) iq_elt.send() def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): if iq_elt['type'] == 'error': log.warning(_("Can't activate the proxy stream")) return else: start_transfer_cb(file_obj) def createSession(self, *args, **kwargs): """like [_createSession] but return the session deferred instead of the whole session session deferred is fired when transfer is finished """ return self._createSession(*args, **kwargs)[DEFER_KEY] def _createSession(self, file_obj, to_jid, sid, profile): """Called when a bytestream is imminent @param file_obj(file): File object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id @param profile: %(doc_profile)s @return (dict): session data """ client = self.host.getClient(profile) if sid in client.xep_0065_current_stream: raise exceptions.ConflictError(u'A session with this id already exists !') session_data = client.xep_0065_current_stream[sid] = \ {'id': sid, DEFER_KEY: defer.Deferred(), 'to': to_jid, 'file_obj': file_obj, 'seq': -1, # FIXME: to check 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), } return session_data def getSession(self, session_hash, profile): """Return session data @param session_hash(unicode): hash of the session hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 @param profile(None, unicode): profile of the peer None is used only if profile is unknown (this is only the case for incoming request received by Socks5ServerFactory). None must only be used by Socks5ServerFactory. See comments below for details @return (dict): session data """ if profile is None: try: profile = self.hash_profiles_map[session_hash] except KeyError as e: log.warning(u"The requested session doesn't exists !") raise e client = self.host.getClient(profile) return client._s5b_sessions[session_hash] def registerHash(self, *args, **kwargs): """like [_registerHash] but resutrn the session deferred instead of the whole session session deferred is fired when transfer is finished """ return self._registerHash(*args, **kwargs)[DEFER_KEY] def _registerHash(self, session_hash, file_obj, profile): """Create a session_data associated to hash @param session_hash(str): hash of the session @param file_obj(file): file-like object @param profile: %(doc_profile)s return (dict): session data """ client = self.host.getClient(profile) assert session_hash not in client._s5b_sessions session_data = client._s5b_sessions[session_hash] = { "file": file_obj, DEFER_KEY: defer.Deferred(), } if session_hash in self.hash_profiles_map: # The only case when 2 profiles want to register the same hash # is when they are on the same instance log.info(u"Both Socks5 peers are on the same instance") # XXX:If both peers are on the same instance, they'll register the same # session_hash, so we'll have 2 profiles for the same hash. The first # one will be the responder (and so the second one the initiator). # As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4), # responder will handle the Socks5 server. Only the server will use # self.hash_profiles_map to get the profile, so we can ignore the second # one (the initiator profile). # There is no easy way to known if the incoming connection # to the Socks5Server is from initiator or responder, so this seams a # reasonable workaround. else: self.hash_profiles_map[session_hash] = profile return session_data def streamQuery(self, iq_elt, profile): """Get file using byte stream""" log.debug(_("BS stream query")) client = self.host.getClient(profile) if not client: raise ProfileNotInCacheError xmlstream = client.xmlstream iq_elt.handled = True query_elt = iq_elt.firstChildElement() sid = query_elt.getAttribute("sid") streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) if not sid in client.xep_0065_current_stream: log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid)) self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) return client.xep_0065_current_stream[sid]['timer'].cancel() client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream if not streamhost_elts: log.warning(_(u"No streamhost found in stream query %s" % sid)) self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) return streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case sh_host = streamhost_elt.getAttribute("host") sh_port = streamhost_elt.getAttribute("port") sh_jid = streamhost_elt.getAttribute("jid") if not sh_host or not sh_port or not sh_jid: log.warning(_("incomplete streamhost element")) self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) return client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) reactor.connectTCP(sh_host, int(sh_port), factory) def activateStream(self, sid, iq_id, profile): client = self.host.getClient(profile) log.debug(_("activating stream")) result = domish.Element((None, 'iq')) session_data = client.xep_0065_current_stream[sid] result['type'] = 'result' result['id'] = iq_id result['from'] = session_data["to"].full() result['to'] = session_data["from"].full() query = result.addElement('query', NS_BS) query['sid'] = sid streamhost = query.addElement('streamhost-used') streamhost['jid'] = session_data["streamhost"][2] session_data["xmlstream"].send(result) def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): """Not acceptable error used when the stream is not expected or something is going wrong @param iq_id: IQ id @param to_jid: addressee @param xmlstream: XML stream to use to send the error""" result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = iq_id result['to'] = to_jid error_el = result.addElement('error') error_el['type'] = 'modify' error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable')) xmlstream.send(result) def sendBadRequestError(self, iq_id, to_jid, xmlstream): """Not acceptable error used when the stream is not expected or something is going wrong @param iq_id: IQ id @param to_jid: addressee @param xmlstream: XML stream to use to send the error""" result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = iq_id result['to'] = to_jid error_el = result.addElement('error') error_el['type'] = 'cancel' error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) xmlstream.send(result) class XEP_0065_handler(XMPPHandler): implements(iwokkel.IDisco) def __init__(self, plugin_parent): self.plugin_parent = plugin_parent self.host = plugin_parent.host def connectionInitialized(self): self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_BS)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []