Mercurial > libervia-backend
diff src/plugins/plugin_xep_0065.py @ 223:86d249b6d9b7
Files reorganisation
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 29 Dec 2010 01:06:29 +0100 |
parents | plugins/plugin_xep_0065.py@bd24f2aed80c |
children | b1794cbb88e5 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0065.py Wed Dec 29 01:06:29 2010 +0100 @@ -0,0 +1,558 @@ +#!/usr/bin/python +#-*- coding: utf-8 -*- +""" +SAT plugin for managing xep-0065 + +Copyright (C) +2002-2004 Dave Smith (dizzyd@jabber.org) +2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) +2009, 2010 Jérôme Poisson (goffi@goffi.org) + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + +-- + +This program is based on proxy65 (http://code.google.com/p/proxy65), +originaly written by David Smith and modified by Fabio Forno. +It is sublicensed under GPL v3 (or any later version) as allowed by the original +license. + +-- + +Here is a copy of the original license: + +Copyright (C) +2002-2004 Dave Smith (dizzyd@jabber.org) +2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +from logging import debug, info, error +from twisted.internet import protocol, reactor +from twisted.protocols.basic import FileSender +from twisted.words.xish import domish +from twisted.web.client import getPage +import struct +import urllib +import hashlib, pdb + +from zope.interface import implements + +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler + +from wokkel import disco, iwokkel + +IQ_SET = '/iq[@type="set"]' +NS_BS = 'http://jabber.org/protocol/bytestreams' +BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' + + + +PLUGIN_INFO = { +"name": "XEP 0065 Plugin", +"import_name": "XEP_0065", +"type": "XEP", +"protocols": ["XEP-0065"], +"main": "XEP_0065", +"handler": "yes", +"description": _("""Implementation of SOCKS5 Bytestreams""") +} + +STATE_INITIAL = 0 +STATE_AUTH = 1 +STATE_REQUEST = 2 +STATE_READY = 3 +STATE_AUTH_USERPASS = 4 +STATE_TARGET_INITIAL = 5 +STATE_TARGET_AUTH = 6 +STATE_TARGET_REQUEST = 7 +STATE_TARGET_READY = 8 +STATE_LAST = 9 + +STATE_CONNECT_PENDING = STATE_LAST + 1 + +SOCKS5_VER = 0x05 + +ADDR_IPV4 = 0x01 +ADDR_DOMAINNAME = 0x03 +ADDR_IPV6 = 0x04 + +CMD_CONNECT = 0x01 +CMD_BIND = 0x02 +CMD_UDPASSOC = 0x03 + +AUTHMECH_ANON = 0x00 +AUTHMECH_USERPASS = 0x02 +AUTHMECH_INVALID = 0xFF + +REPLY_SUCCESS = 0x00 +REPLY_GENERAL_FAILUR = 0x01 +REPLY_CONN_NOT_ALLOWED = 0x02 +REPLY_NETWORK_UNREACHABLE = 0x03 +REPLY_HOST_UNREACHABLE = 0x04 +REPLY_CONN_REFUSED = 0x05 +REPLY_TTL_EXPIRED = 0x06 +REPLY_CMD_NOT_SUPPORTED = 0x07 +REPLY_ADDR_NOT_SUPPORTED = 0x08 + + + + + +class SOCKSv5(protocol.Protocol, FileSender): + def __init__(self): + debug(_("Protocol init")) + self.state = STATE_INITIAL + self.buf = "" + self.supportedAuthMechs = [ AUTHMECH_ANON ] + self.supportedAddrs = [ ADDR_DOMAINNAME ] + self.enabledCommands = [ CMD_CONNECT ] + self.peersock = None + self.addressType = 0 + self.requestType = 0 + self.activeConns = {} + self.pendingConns = {} + self.transfered = 0 #nb of bytes already copied + + def _startNegotiation(self): + debug("_startNegotiation") + self.state = STATE_TARGET_AUTH + self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) + + def _parseNegotiation(self): + debug("_parseNegotiation") + try: + # Parse out data + ver, nmethod = struct.unpack('!BB', self.buf[:2]) + methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod+2]) + + # Ensure version is correct + if ver != 5: + self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + return + + # Trim off front of the buffer + self.buf = self.buf[nmethod+2:] + + # Check for supported auth mechs + for m in self.supportedAuthMechs: + if m in methods: + # Update internal state, according to selected method + if m == AUTHMECH_ANON: + self.state = STATE_REQUEST + elif m == AUTHMECH_USERPASS: + self.state = STATE_AUTH_USERPASS + # Complete negotiation w/ this method + self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) + return + + # No supported mechs found, notify client and close the connection + self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.loseConnection() + except struct.error: + pass + + def _parseUserPass(self): + debug("_parseUserPass") + try: + # Parse out data + ver, ulen = struct.unpack('BB', self.buf[:2]) + uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) + plen, = struct.unpack('B', self.buf[ulen + 2]) + password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) + # Trim off fron of the buffer + self.buf = self.buf[3 + ulen + plen:] + # Fire event to authenticate user + if self.authenticateUserPass(uname, password): + # Signal success + self.state = STATE_REQUEST + self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) + else: + # Signal failure + self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) + self.transport.loseConnection() + except struct.error: + pass + + def sendErrorReply(self, errorcode): + debug("sendErrorReply") + # Any other address types are not supported + result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) + self.transport.write(result) + self.transport.loseConnection() + + def addConnection(self, address, connection): + info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection}) + olist = self.pendingConns.get(address, []) + if len(olist) <= 1: + olist.append(connection) + self.pendingConns[address] = olist + return True + else: + return False + + def removePendingConnection(self, address, connection): + olist = self.pendingConns[address] + if len(olist) == 1: + del self.pendingConns[address] + else: + olist.remove(connection) + self.pendingConns[address] = olist + + def removeActiveConnection(self, address): + del self.activeConns[address] + + def _parseRequest(self): + debug("_parseRequest") + try: + # Parse out data and trim buffer accordingly + ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack('!IH', self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = ord(self.buf[4]) + addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr):] + else: + # Any other address types are not supported + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure command is supported + if cmd not in self.enabledCommands: + # Send a not supported error + self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) + return + + # Process the command + if cmd == CMD_CONNECT: + self.connectRequested(addr, port) + elif cmd == CMD_BIND: + self.bindRequested(addr, port) + else: + # Any other command is not supported + self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) + + except struct.error, why: + return None + + def _makeRequest(self): + debug("_makeRequest") + self.state = STATE_TARGET_REQUEST + sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest() + request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) + self.transport.write(request) + + def _parseRequestReply(self): + debug("_parseRequestReply") + try: + ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + # Ensure we actually support the requested address type + if self.addressType not in self.supportedAddrs: + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Deal with addresses + if self.addressType == ADDR_IPV4: + addr, port = struct.unpack('!IH', self.buf[4:10]) + self.buf = self.buf[10:] + elif self.addressType == ADDR_DOMAINNAME: + nlen = ord(self.buf[4]) + addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr):] + else: + # Any other address types are not supported + self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) + return + + # Ensure reply is OK + if rep != REPLY_SUCCESS: + self.loseConnection() + return + + debug(_("Saving file in %s."), self.data["dest_path"]) + self.dest_file = open(self.data["dest_path"], 'w') + self.state = STATE_TARGET_READY + self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream) + + + except struct.error, why: + return None + + def connectionMade(self): + debug("connectionMade (mode = %s)" % self.mode) + self.host.registerProgressCB(self.transfert_id, self.getProgress) + + if self.mode == "target": + self.state = STATE_TARGET_INITIAL + self._startNegotiation() + + def connectRequested(self, addr, port): + debug("connectRequested") + # Check for special connect to the namespace -- this signifies that the client + # is just checking to ensure it can connect to the streamhost + if addr == "http://jabber.org/protocol/bytestreams": + self.connectCompleted(addr, 0) + self.transport.loseConnection() + return + + # Save addr, for cleanup + self.addr = addr + + # Check to see if the requested address is already + # activated -- send an error if so + if addr in self.activeConns: + self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED) + return + + # Add this address to the pending connections + if self.addConnection(addr, self): + self.connectCompleted(addr, 0) + self.transport.stopReading() + else: + self.sendErrorReply(socks5.REPLY_CONN_REFUSED) + + def getProgress(self, data): + """Fill data with position of current transfert""" + try: + data["position"] = str(self.dest_file.tell()) + data["size"] = self.filesize + except (ValueError, AttributeError): + pass + + def fileTransfered(self, d): + info(_("File transfer completed, closing connection")) + self.transport.loseConnection() + try: + self.dest_file.close() + except: + pass + + def updateTransfered(self, data): + self.transfered+=len(data) + return data + + def connectCompleted(self, remotehost, remoteport): + debug("connectCompleted") + if self.addressType == ADDR_IPV4: + result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) + elif self.addressType == ADDR_DOMAINNAME: + result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, + ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) + self.transport.write(result) + self.state = STATE_READY + self.dest_file=open(self.filepath) + d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered) + d.addCallback(self.fileTransfered) + + def bindRequested(self, addr, port): + pass + + def authenticateUserPass(self, user, passwd): + debug("User/pass: %s/%s", user, passwd) + return True + + def dataReceived(self, buf): + if self.state == STATE_TARGET_READY: + self.dest_file.write(buf) + self.transfered+=len(buf) + return + + self.buf = self.buf + buf + if self.state == STATE_INITIAL: + self._parseNegotiation() + if self.state == STATE_AUTH_USERPASS: + self._parseUserPass() + if self.state == STATE_REQUEST: + self._parseRequest() + if self.state == STATE_TARGET_AUTH: + ver, method = struct.unpack('!BB', buf) + self.buf = self.buf[2:] + if ver!=SOCKS5_VER or method!=AUTHMECH_ANON: + self.transport.loseConnection() + else: + self._makeRequest() + if self.state == STATE_TARGET_REQUEST: + self._parseRequestReply() + + + def clientConnectionLost(self, reason): + debug("clientConnectionLost") + self.transport.loseConnection() + + def connectionLost(self, reason): + debug("connectionLost") + self.host.removeProgressCB(self.transfert_id) + if self.state == STATE_CONNECT_PENDING: + self.removePendingConnection(self.addr, self) + else: + self.transport.unregisterProducer() + if self.peersock != None: + self.peersock.peersock = None + self.peersock.transport.unregisterProducer() + self.peersock = None + self.removeActiveConnection(self.addr) + +class Socks5ServerFactory(protocol.ServerFactory): + protocol = SOCKSv5 + protocol.mode = "initiator" #FIXME: Q&D way, fix it + + + def startedConnecting(self, connector): + debug (_("Socks 5 server connection started")) + + def clientConnectionLost(self, connector, reason): + debug (_("Socks 5 server connection lost (reason: %s)"), reason) + +class Socks5ClientFactory(protocol.ClientFactory): + protocol = SOCKSv5 + protocol.mode = "target" #FIXME: Q&D way, fix it + + def startedConnecting(self, connector): + debug (_("Socks 5 client connection started")) + + def clientConnectionLost(self, connector, reason): + debug (_("Socks 5 client connection lost (reason: %s)"), reason) + + +class XEP_0065(): + + params = """ + <params> + <general> + <category name="File Transfert"> + <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> + <param name="Port" value="28915" type="string" /> + </category> + </general> + </params> + """ + + def __init__(self, host): + info(_("Plugin XEP_0065 initialization")) + self.host = host + debug(_("registering")) + self.server_factory = Socks5ServerFactory() + self.server_factory.protocol.host = self.host #needed for progress CB + self.client_factory = Socks5ClientFactory() + + #parameters + host.memory.importParams(XEP_0065.params) + host.memory.setDefault("IP", "File Transfert", self.getExternalIP) + + port = int(self.host.memory.getParamA("Port", "File Transfert")) + info(_("Launching Socks5 Stream server on port %d"), port) + reactor.listenTCP(port, self.server_factory) + + def getHandler(self, profile): + return XEP_0065_handler(self) + + def getExternalIP(self): + """Return IP visible from outside, by asking to a website""" + return getPage("http://www.goffi.org/sat_tools/get_ip.php") + + def setData(self, data, id): + self.data = data + self.transfert_id = id + + def sendFile(self, id, filepath, size): + #lauching socks5 initiator + debug(_("Launching socks5 initiator")) + self.server_factory.protocol.mode = "initiator" + self.server_factory.protocol.filepath = filepath + self.server_factory.protocol.filesize = size + self.server_factory.protocol.transfert_id = id + + def getFile(self, iq, profile_key='@DEFAULT@'): + """Get file using byte stream""" + client = self.host.getClient(profile_key) + assert(client) + iq.handled = True + SI_elem = iq.firstChildElement() + IQ_id = iq['id'] + for element in SI_elem.elements(): + if element.name == "streamhost": + info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']}) + factory = self.client_factory + self.server_factory.protocol.mode = "target" + factory.protocol.host = self.host #needed for progress CB + factory.protocol.xmlstream = client.xmlstream + factory.protocol.data = self.data + factory.protocol.transfert_id = self.transfert_id + factory.protocol.filesize = self.data["size"] + factory.protocol.sid = SI_elem['sid'] + factory.protocol.initiator_jid = element['jid'] + factory.protocol.target_jid = client.jid.full() + factory.protocol.IQ_id = IQ_id + factory.protocol.activateCB = self.activateStream + reactor.connectTCP(element['host'], int(element['port']), factory) + + def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream): + debug(_("activating stream")) + result = domish.Element(('', 'iq')) + result['type'] = 'result' + result['id'] = IQ_id + result['from'] = from_jid + result['to'] = to_jid + query = result.addElement('query', 'http://jabber.org/protocol/bytestreams') + query['sid'] = sid + streamhost = query.addElement('streamhost-used') + streamhost['jid'] = to_jid #FIXME: use real streamhost + xmlstream.send(result) + +class XEP_0065_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + + def connectionInitialized(self): + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.getFile) + + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_BS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return []