Mercurial > libervia-backend
view src/plugins/plugin_xep_0065.py @ 324:b069055320b1
core: fixed bad profile check in connect method
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 10 May 2011 15:46:20 +0200 |
parents | 7c79d4a8c9e6 |
children | 8f3551ceee17 |
line wrap: on
line source
#!/usr/bin/python #-*- coding: utf-8 -*- """ SAT plugin for managing xep-0065 Copyright (C) 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org) This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. -- This program is based on proxy65 (http://code.google.com/p/proxy65), originaly written by David Smith and modified by Fabio Forno. It is sublicensed under GPL v3 (or any later version) as allowed by the original license. -- Here is a copy of the original license: Copyright (C) 2002-2004 Dave Smith (dizzyd@jabber.org) 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from logging import debug, info, error from twisted.internet import protocol, reactor from twisted.protocols.basic import FileSender from twisted.words.xish import domish from twisted.web.client import getPage import struct import urllib import hashlib, pdb from zope.interface import implements try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler from wokkel import disco, iwokkel IQ_SET = '/iq[@type="set"]' NS_BS = 'http://jabber.org/protocol/bytestreams' BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' PLUGIN_INFO = { "name": "XEP 0065 Plugin", "import_name": "XEP-0065", "type": "XEP", "protocols": ["XEP-0065"], "main": "XEP_0065", "handler": "yes", "description": _("""Implementation of SOCKS5 Bytestreams""") } STATE_INITIAL = 0 STATE_AUTH = 1 STATE_REQUEST = 2 STATE_READY = 3 STATE_AUTH_USERPASS = 4 STATE_TARGET_INITIAL = 5 STATE_TARGET_AUTH = 6 STATE_TARGET_REQUEST = 7 STATE_TARGET_READY = 8 STATE_LAST = 9 STATE_CONNECT_PENDING = STATE_LAST + 1 SOCKS5_VER = 0x05 ADDR_IPV4 = 0x01 ADDR_DOMAINNAME = 0x03 ADDR_IPV6 = 0x04 CMD_CONNECT = 0x01 CMD_BIND = 0x02 CMD_UDPASSOC = 0x03 AUTHMECH_ANON = 0x00 AUTHMECH_USERPASS = 0x02 AUTHMECH_INVALID = 0xFF REPLY_SUCCESS = 0x00 REPLY_GENERAL_FAILUR = 0x01 REPLY_CONN_NOT_ALLOWED = 0x02 REPLY_NETWORK_UNREACHABLE = 0x03 REPLY_HOST_UNREACHABLE = 0x04 REPLY_CONN_REFUSED = 0x05 REPLY_TTL_EXPIRED = 0x06 REPLY_CMD_NOT_SUPPORTED = 0x07 REPLY_ADDR_NOT_SUPPORTED = 0x08 class SOCKSv5(protocol.Protocol, FileSender): def __init__(self): debug(_("Protocol init")) self.state = STATE_INITIAL self.buf = "" self.supportedAuthMechs = [ AUTHMECH_ANON ] self.supportedAddrs = [ ADDR_DOMAINNAME ] self.enabledCommands = [ CMD_CONNECT ] self.peersock = None self.addressType = 0 self.requestType = 0 self.activeConns = {} self.pendingConns = {} self.transfered = 0 #nb of bytes already copied def _startNegotiation(self): debug("_startNegotiation") self.state = STATE_TARGET_AUTH self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) def _parseNegotiation(self): debug("_parseNegotiation") try: # Parse out data ver, nmethod = struct.unpack('!BB', self.buf[:2]) methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod+2]) # Ensure version is correct if ver != 5: self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() return # Trim off front of the buffer self.buf = self.buf[nmethod+2:] # Check for supported auth mechs for m in self.supportedAuthMechs: if m in methods: # Update internal state, according to selected method if m == AUTHMECH_ANON: self.state = STATE_REQUEST elif m == AUTHMECH_USERPASS: self.state = STATE_AUTH_USERPASS # Complete negotiation w/ this method self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) return # No supported mechs found, notify client and close the connection self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() except struct.error: pass def _parseUserPass(self): debug("_parseUserPass") try: # Parse out data ver, ulen = struct.unpack('BB', self.buf[:2]) uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) plen, = struct.unpack('B', self.buf[ulen + 2]) password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) # Trim off fron of the buffer self.buf = self.buf[3 + ulen + plen:] # Fire event to authenticate user if self.authenticateUserPass(uname, password): # Signal success self.state = STATE_REQUEST self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) else: # Signal failure self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) self.transport.loseConnection() except struct.error: pass def sendErrorReply(self, errorcode): debug("sendErrorReply") # Any other address types are not supported result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) self.transport.write(result) self.transport.loseConnection() def addConnection(self, address, connection): info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection}) olist = self.pendingConns.get(address, []) if len(olist) <= 1: olist.append(connection) self.pendingConns[address] = olist return True else: return False def removePendingConnection(self, address, connection): olist = self.pendingConns[address] if len(olist) == 1: del self.pendingConns[address] else: olist.remove(connection) self.pendingConns[address] = olist def removeActiveConnection(self, address): del self.activeConns[address] def _parseRequest(self): debug("_parseRequest") try: # Parse out data and trim buffer accordingly ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Deal with addresses if self.addressType == ADDR_IPV4: addr, port = struct.unpack('!IH', self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) self.buf = self.buf[7 + len(addr):] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Ensure command is supported if cmd not in self.enabledCommands: # Send a not supported error self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) return # Process the command if cmd == CMD_CONNECT: self.connectRequested(addr, port) elif cmd == CMD_BIND: self.bindRequested(addr, port) else: # Any other command is not supported self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) except struct.error, why: return None def _makeRequest(self): debug("_makeRequest") self.state = STATE_TARGET_REQUEST sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest() request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) self.transport.write(request) def _parseRequestReply(self): debug("_parseRequestReply") try: ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Deal with addresses if self.addressType == ADDR_IPV4: addr, port = struct.unpack('!IH', self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) self.buf = self.buf[7 + len(addr):] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) return # Ensure reply is OK if rep != REPLY_SUCCESS: self.loseConnection() return debug(_("Saving file in %s."), self.data["dest_path"]) self.dest_file = open(self.data["dest_path"], 'w') self.state = STATE_TARGET_READY self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream) except struct.error, why: return None def connectionMade(self): debug("connectionMade (mode = %s)" % self.mode) self.host.registerProgressCB(self.transfert_id, self.getProgress) if self.mode == "target": self.state = STATE_TARGET_INITIAL self._startNegotiation() def connectRequested(self, addr, port): debug("connectRequested") # Check for special connect to the namespace -- this signifies that the client # is just checking to ensure it can connect to the streamhost if addr == "http://jabber.org/protocol/bytestreams": self.connectCompleted(addr, 0) self.transport.loseConnection() return # Save addr, for cleanup self.addr = addr # Check to see if the requested address is already # activated -- send an error if so if addr in self.activeConns: self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED) return # Add this address to the pending connections if self.addConnection(addr, self): self.connectCompleted(addr, 0) self.transport.stopReading() else: self.sendErrorReply(socks5.REPLY_CONN_REFUSED) def getProgress(self, data): """Fill data with position of current transfert""" try: data["position"] = str(self.dest_file.tell()) data["size"] = self.filesize except (ValueError, AttributeError): pass def fileTransfered(self, d): info(_("File transfer completed, closing connection")) self.transport.loseConnection() try: self.dest_file.close() except: pass def updateTransfered(self, data): self.transfered+=len(data) return data def connectCompleted(self, remotehost, remoteport): debug("connectCompleted") if self.addressType == ADDR_IPV4: result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) elif self.addressType == ADDR_DOMAINNAME: result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) self.transport.write(result) self.state = STATE_READY self.dest_file=open(self.filepath) d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered) d.addCallback(self.fileTransfered) def bindRequested(self, addr, port): pass def authenticateUserPass(self, user, passwd): debug("User/pass: %s/%s", user, passwd) return True def dataReceived(self, buf): if self.state == STATE_TARGET_READY: self.dest_file.write(buf) self.transfered+=len(buf) return self.buf = self.buf + buf if self.state == STATE_INITIAL: self._parseNegotiation() if self.state == STATE_AUTH_USERPASS: self._parseUserPass() if self.state == STATE_REQUEST: self._parseRequest() if self.state == STATE_TARGET_AUTH: ver, method = struct.unpack('!BB', buf) self.buf = self.buf[2:] if ver!=SOCKS5_VER or method!=AUTHMECH_ANON: self.transport.loseConnection() else: self._makeRequest() if self.state == STATE_TARGET_REQUEST: self._parseRequestReply() def clientConnectionLost(self, reason): debug("clientConnectionLost") self.transport.loseConnection() def connectionLost(self, reason): debug("connectionLost") self.host.removeProgressCB(self.transfert_id) if self.state == STATE_CONNECT_PENDING: self.removePendingConnection(self.addr, self) else: self.transport.unregisterProducer() if self.peersock != None: self.peersock.peersock = None self.peersock.transport.unregisterProducer() self.peersock = None self.removeActiveConnection(self.addr) class Socks5ServerFactory(protocol.ServerFactory): protocol = SOCKSv5 protocol.mode = "initiator" #FIXME: Q&D way, fix it def startedConnecting(self, connector): debug (_("Socks 5 server connection started")) def clientConnectionLost(self, connector, reason): debug (_("Socks 5 server connection lost (reason: %s)"), reason) class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 protocol.mode = "target" #FIXME: Q&D way, fix it def startedConnecting(self, connector): debug (_("Socks 5 client connection started")) def clientConnectionLost(self, connector, reason): debug (_("Socks 5 client connection lost (reason: %s)"), reason) class XEP_0065(): params = """ <params> <general> <category name="File Transfert"> <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> <param name="Port" value="28915" type="string" /> </category> </general> </params> """ def __init__(self, host): info(_("Plugin XEP_0065 initialization")) self.host = host debug(_("registering")) self.server_factory = Socks5ServerFactory() self.server_factory.protocol.host = self.host #needed for progress CB self.client_factory = Socks5ClientFactory() #parameters host.memory.importParams(XEP_0065.params) host.memory.setDefault("IP", "File Transfert", self.getExternalIP) port = int(self.host.memory.getParamA("Port", "File Transfert")) info(_("Launching Socks5 Stream server on port %d"), port) reactor.listenTCP(port, self.server_factory) def getHandler(self, profile): return XEP_0065_handler(self) def getExternalIP(self): """Return IP visible from outside, by asking to a website""" return getPage("http://www.goffi.org/sat_tools/get_ip.php") def setData(self, data, id): self.data = data self.transfert_id = id def sendFile(self, id, filepath, size): #lauching socks5 initiator debug(_("Launching socks5 initiator")) self.server_factory.protocol.mode = "initiator" self.server_factory.protocol.filepath = filepath self.server_factory.protocol.filesize = size self.server_factory.protocol.transfert_id = id def getFile(self, iq, profile_key='@DEFAULT@'): """Get file using byte stream""" client = self.host.getClient(profile_key) assert(client) iq.handled = True SI_elem = iq.firstChildElement() IQ_id = iq['id'] for element in SI_elem.elements(): if element.name == "streamhost": info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']}) factory = self.client_factory self.server_factory.protocol.mode = "target" factory.protocol.host = self.host #needed for progress CB factory.protocol.xmlstream = client.xmlstream factory.protocol.data = self.data factory.protocol.transfert_id = self.transfert_id factory.protocol.filesize = self.data["size"] factory.protocol.sid = SI_elem['sid'] factory.protocol.initiator_jid = element['jid'] factory.protocol.target_jid = client.jid.full() factory.protocol.IQ_id = IQ_id factory.protocol.activateCB = self.activateStream reactor.connectTCP(element['host'], int(element['port']), factory) def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream): debug(_("activating stream")) result = domish.Element(('', 'iq')) result['type'] = 'result' result['id'] = IQ_id result['from'] = from_jid result['to'] = to_jid query = result.addElement('query', 'http://jabber.org/protocol/bytestreams') query['sid'] = sid streamhost = query.addElement('streamhost-used') streamhost['jid'] = to_jid #FIXME: use real streamhost xmlstream.send(result) class XEP_0065_handler(XMPPHandler): implements(iwokkel.IDisco) def __init__(self, plugin_parent): self.plugin_parent = plugin_parent self.host = plugin_parent.host def connectionInitialized(self): self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.getFile) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_BS)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []