diff src/plugins/plugin_xep_0065.py @ 223:86d249b6d9b7

Files reorganisation
author Goffi <goffi@goffi.org>
date Wed, 29 Dec 2010 01:06:29 +0100
parents plugins/plugin_xep_0065.py@bd24f2aed80c
children b1794cbb88e5
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/plugin_xep_0065.py	Wed Dec 29 01:06:29 2010 +0100
@@ -0,0 +1,558 @@
+#!/usr/bin/python
+#-*- coding: utf-8 -*-
+"""
+SAT plugin for managing xep-0065
+
+Copyright (C)
+2002-2004   Dave Smith (dizzyd@jabber.org)
+2007-2008   Fabio Forno (xmpp:ff@jabber.bluendo.com)
+2009, 2010  Jérôme Poisson (goffi@goffi.org)
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+--
+
+This program is based on proxy65 (http://code.google.com/p/proxy65),
+originaly written by David Smith and modified by Fabio Forno.
+It is sublicensed under GPL v3 (or any later version) as allowed by the original
+license.
+
+--
+
+Here is a copy of the original license:
+
+Copyright (C) 
+2002-2004   Dave Smith (dizzyd@jabber.org)
+2007-2008   Fabio Forno (xmpp:ff@jabber.bluendo.com)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
+from logging import debug, info, error
+from twisted.internet import protocol, reactor
+from twisted.protocols.basic import FileSender
+from twisted.words.xish import domish
+from twisted.web.client import getPage
+import struct
+import urllib
+import hashlib, pdb
+
+from zope.interface import implements
+
+try:
+    from twisted.words.protocols.xmlstream import XMPPHandler
+except ImportError:
+    from wokkel.subprotocols import XMPPHandler
+
+from wokkel import disco, iwokkel
+
+IQ_SET = '/iq[@type="set"]'
+NS_BS = 'http://jabber.org/protocol/bytestreams'
+BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
+
+
+
+PLUGIN_INFO = {
+"name": "XEP 0065 Plugin",
+"import_name": "XEP_0065",
+"type": "XEP",
+"protocols": ["XEP-0065"],
+"main": "XEP_0065",
+"handler": "yes",
+"description": _("""Implementation of SOCKS5 Bytestreams""")
+}
+
+STATE_INITIAL = 0
+STATE_AUTH    = 1
+STATE_REQUEST = 2
+STATE_READY   = 3
+STATE_AUTH_USERPASS = 4
+STATE_TARGET_INITIAL = 5
+STATE_TARGET_AUTH    = 6
+STATE_TARGET_REQUEST = 7
+STATE_TARGET_READY   = 8
+STATE_LAST    = 9
+
+STATE_CONNECT_PENDING = STATE_LAST + 1
+
+SOCKS5_VER = 0x05
+
+ADDR_IPV4 = 0x01
+ADDR_DOMAINNAME = 0x03
+ADDR_IPV6 = 0x04
+
+CMD_CONNECT = 0x01
+CMD_BIND = 0x02
+CMD_UDPASSOC = 0x03
+
+AUTHMECH_ANON = 0x00
+AUTHMECH_USERPASS = 0x02
+AUTHMECH_INVALID = 0xFF
+
+REPLY_SUCCESS = 0x00
+REPLY_GENERAL_FAILUR = 0x01
+REPLY_CONN_NOT_ALLOWED = 0x02
+REPLY_NETWORK_UNREACHABLE = 0x03
+REPLY_HOST_UNREACHABLE = 0x04
+REPLY_CONN_REFUSED = 0x05
+REPLY_TTL_EXPIRED = 0x06
+REPLY_CMD_NOT_SUPPORTED = 0x07
+REPLY_ADDR_NOT_SUPPORTED = 0x08
+
+
+
+
+
+class SOCKSv5(protocol.Protocol, FileSender):
+    def __init__(self):
+        debug(_("Protocol init"))
+        self.state = STATE_INITIAL
+        self.buf = ""
+        self.supportedAuthMechs = [ AUTHMECH_ANON ]
+        self.supportedAddrs = [ ADDR_DOMAINNAME ]
+        self.enabledCommands = [ CMD_CONNECT ]
+        self.peersock = None
+        self.addressType = 0
+        self.requestType = 0
+        self.activeConns = {}
+        self.pendingConns = {}
+        self.transfered = 0 #nb of bytes already copied
+
+    def _startNegotiation(self):
+        debug("_startNegotiation")
+        self.state = STATE_TARGET_AUTH
+        self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
+
+    def _parseNegotiation(self):
+        debug("_parseNegotiation")
+        try:
+            # Parse out data
+            ver, nmethod = struct.unpack('!BB', self.buf[:2])
+            methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod+2])
+
+            # Ensure version is correct
+            if ver != 5:
+                self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
+                self.transport.loseConnection()
+                return
+
+            # Trim off front of the buffer
+            self.buf = self.buf[nmethod+2:]
+            
+            # Check for supported auth mechs
+            for m in self.supportedAuthMechs:
+                if m in methods:
+                    # Update internal state, according to selected method
+                    if m == AUTHMECH_ANON:
+                        self.state = STATE_REQUEST
+                    elif m == AUTHMECH_USERPASS:
+                        self.state = STATE_AUTH_USERPASS
+                    # Complete negotiation w/ this method
+                    self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
+                    return
+
+            # No supported mechs found, notify client and close the connection
+            self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
+            self.transport.loseConnection()
+        except struct.error:
+            pass
+
+    def _parseUserPass(self):
+        debug("_parseUserPass")
+        try:
+            # Parse out data
+            ver, ulen = struct.unpack('BB', self.buf[:2])
+            uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
+            plen, = struct.unpack('B', self.buf[ulen + 2])
+            password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
+            # Trim off fron of the buffer
+            self.buf = self.buf[3 + ulen + plen:]
+            # Fire event to authenticate user
+            if self.authenticateUserPass(uname, password):
+                # Signal success
+                self.state = STATE_REQUEST
+                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
+            else:
+                # Signal failure
+                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
+                self.transport.loseConnection()
+        except struct.error:
+            pass
+
+    def sendErrorReply(self, errorcode):
+        debug("sendErrorReply")
+        # Any other address types are not supported
+        result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
+        self.transport.write(result)
+        self.transport.loseConnection()
+    
+    def addConnection(self, address, connection):
+        info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection})
+        olist = self.pendingConns.get(address, [])
+        if len(olist) <= 1:
+            olist.append(connection)
+            self.pendingConns[address] = olist
+            return True
+        else:
+            return False
+
+    def removePendingConnection(self, address, connection):
+        olist = self.pendingConns[address]
+        if len(olist) == 1:
+            del self.pendingConns[address]
+        else:
+            olist.remove(connection)
+            self.pendingConns[address] = olist
+
+    def removeActiveConnection(self, address):
+        del self.activeConns[address]
+
+    def _parseRequest(self):
+        debug("_parseRequest")
+        try:
+            # Parse out data and trim buffer accordingly
+            ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
+
+            # Ensure we actually support the requested address type
+            if self.addressType not in self.supportedAddrs:
+                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Deal with addresses
+            if self.addressType == ADDR_IPV4:
+                addr, port = struct.unpack('!IH', self.buf[4:10])
+                self.buf = self.buf[10:]
+            elif self.addressType == ADDR_DOMAINNAME:            
+                nlen = ord(self.buf[4])
+                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr):]
+            else:
+                # Any other address types are not supported
+                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Ensure command is supported
+            if cmd not in self.enabledCommands:
+                # Send a not supported error
+                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
+                return
+
+            # Process the command
+            if cmd == CMD_CONNECT:
+                self.connectRequested(addr, port)
+            elif cmd == CMD_BIND:
+                self.bindRequested(addr, port)
+            else:
+                # Any other command is not supported
+                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
+
+        except struct.error, why:
+            return None
+
+    def _makeRequest(self):
+        debug("_makeRequest")
+        self.state = STATE_TARGET_REQUEST
+        sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest()
+        request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
+        self.transport.write(request)
+
+    def _parseRequestReply(self):
+        debug("_parseRequestReply")
+        try:
+            ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
+            # Ensure we actually support the requested address type
+            if self.addressType not in self.supportedAddrs:
+                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Deal with addresses
+            if self.addressType == ADDR_IPV4:
+                addr, port = struct.unpack('!IH', self.buf[4:10])
+                self.buf = self.buf[10:]
+            elif self.addressType == ADDR_DOMAINNAME:            
+                nlen = ord(self.buf[4])
+                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr):]
+            else:
+                # Any other address types are not supported
+                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                return
+
+            # Ensure reply is OK
+            if rep != REPLY_SUCCESS:
+                self.loseConnection()
+                return
+
+            debug(_("Saving file in %s."), self.data["dest_path"])
+            self.dest_file = open(self.data["dest_path"], 'w')
+            self.state = STATE_TARGET_READY
+            self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream)
+
+
+        except struct.error, why:
+            return None
+
+    def connectionMade(self):
+        debug("connectionMade (mode = %s)" % self.mode)
+        self.host.registerProgressCB(self.transfert_id, self.getProgress)
+
+        if self.mode == "target":
+            self.state = STATE_TARGET_INITIAL
+            self._startNegotiation()
+
+    def connectRequested(self, addr, port):
+        debug("connectRequested")
+        # Check for special connect to the namespace -- this signifies that the client
+        # is just checking to ensure it can connect to the streamhost
+        if addr == "http://jabber.org/protocol/bytestreams":
+            self.connectCompleted(addr, 0)
+            self.transport.loseConnection()
+            return
+            
+        # Save addr, for cleanup
+        self.addr = addr
+        
+        # Check to see if the requested address is already
+        # activated -- send an error if so
+        if addr in self.activeConns:
+            self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED)
+            return
+
+        # Add this address to the pending connections
+        if self.addConnection(addr, self):
+            self.connectCompleted(addr, 0)
+            self.transport.stopReading()
+        else:
+            self.sendErrorReply(socks5.REPLY_CONN_REFUSED)
+
+    def getProgress(self, data):
+        """Fill data with position of current transfert"""
+        try:
+            data["position"] = str(self.dest_file.tell())
+            data["size"] = self.filesize
+        except (ValueError, AttributeError):
+            pass
+
+    def fileTransfered(self, d):
+        info(_("File transfer completed, closing connection"))
+        self.transport.loseConnection()
+        try:
+            self.dest_file.close()
+        except:
+            pass
+
+    def updateTransfered(self, data):
+        self.transfered+=len(data)
+        return data
+
+    def connectCompleted(self, remotehost, remoteport):
+        debug("connectCompleted")
+        if self.addressType == ADDR_IPV4:
+            result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
+        elif self.addressType == ADDR_DOMAINNAME:
+            result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
+                                 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
+        self.transport.write(result)
+        self.state = STATE_READY
+        self.dest_file=open(self.filepath)
+        d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered)
+        d.addCallback(self.fileTransfered)
+    
+    def bindRequested(self, addr, port):
+        pass
+    
+    def authenticateUserPass(self, user, passwd):
+        debug("User/pass: %s/%s", user, passwd)
+        return True
+
+    def dataReceived(self, buf):
+        if self.state == STATE_TARGET_READY:
+            self.dest_file.write(buf)
+            self.transfered+=len(buf)
+            return
+
+        self.buf = self.buf + buf
+        if self.state == STATE_INITIAL:
+            self._parseNegotiation()
+        if self.state == STATE_AUTH_USERPASS:
+            self._parseUserPass()
+        if self.state == STATE_REQUEST:
+            self._parseRequest()
+        if self.state == STATE_TARGET_AUTH:
+            ver, method = struct.unpack('!BB', buf)
+            self.buf = self.buf[2:]
+            if ver!=SOCKS5_VER or method!=AUTHMECH_ANON:
+                self.transport.loseConnection()
+            else:
+                self._makeRequest()
+        if self.state == STATE_TARGET_REQUEST:
+            self._parseRequestReply()
+
+
+    def clientConnectionLost(self, reason):
+        debug("clientConnectionLost")
+        self.transport.loseConnection()
+
+    def connectionLost(self, reason):
+        debug("connectionLost")
+        self.host.removeProgressCB(self.transfert_id)
+        if self.state == STATE_CONNECT_PENDING:
+            self.removePendingConnection(self.addr, self)
+        else:
+            self.transport.unregisterProducer()
+            if self.peersock != None:
+                self.peersock.peersock = None
+                self.peersock.transport.unregisterProducer()
+                self.peersock = None
+                self.removeActiveConnection(self.addr)
+
+class Socks5ServerFactory(protocol.ServerFactory):
+    protocol = SOCKSv5
+    protocol.mode = "initiator"  #FIXME: Q&D way, fix it 
+
+
+    def startedConnecting(self, connector):
+        debug (_("Socks 5 server connection started"))
+
+    def clientConnectionLost(self, connector, reason):
+        debug (_("Socks 5 server connection lost (reason: %s)"), reason)
+
+class Socks5ClientFactory(protocol.ClientFactory):
+    protocol = SOCKSv5
+    protocol.mode = "target"  #FIXME: Q&D way, fix it 
+
+    def startedConnecting(self, connector):
+        debug (_("Socks 5 client connection started"))
+
+    def clientConnectionLost(self, connector, reason):
+        debug (_("Socks 5 client connection lost (reason: %s)"), reason)
+
+
+class XEP_0065():
+    
+    params = """
+    <params>
+    <general>
+    <category name="File Transfert">
+        <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
+        <param name="Port" value="28915" type="string" />
+    </category>
+    </general>
+    </params>
+    """
+
+    def __init__(self, host):
+        info(_("Plugin XEP_0065 initialization"))
+        self.host = host
+        debug(_("registering"))
+        self.server_factory = Socks5ServerFactory()
+        self.server_factory.protocol.host = self.host #needed for progress CB
+        self.client_factory = Socks5ClientFactory()
+
+        #parameters
+        host.memory.importParams(XEP_0065.params)
+        host.memory.setDefault("IP", "File Transfert", self.getExternalIP)
+        
+        port = int(self.host.memory.getParamA("Port", "File Transfert"))
+        info(_("Launching Socks5 Stream server on port %d"), port)
+        reactor.listenTCP(port, self.server_factory)
+    
+    def getHandler(self, profile):
+        return XEP_0065_handler(self)  
+   
+    def getExternalIP(self):
+        """Return IP visible from outside, by asking to a website"""
+        return getPage("http://www.goffi.org/sat_tools/get_ip.php")
+
+    def setData(self, data, id):
+        self.data = data
+        self.transfert_id = id
+        
+    def sendFile(self, id, filepath, size):
+        #lauching socks5 initiator
+        debug(_("Launching socks5 initiator"))
+        self.server_factory.protocol.mode = "initiator"
+        self.server_factory.protocol.filepath = filepath
+        self.server_factory.protocol.filesize = size
+        self.server_factory.protocol.transfert_id = id
+
+    def getFile(self, iq, profile_key='@DEFAULT@'):
+        """Get file using byte stream"""
+        client = self.host.getClient(profile_key)
+        assert(client)
+        iq.handled = True
+        SI_elem = iq.firstChildElement()
+        IQ_id = iq['id']
+        for element in SI_elem.elements():
+            if element.name == "streamhost":
+                info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']})
+                factory = self.client_factory
+                self.server_factory.protocol.mode = "target"
+                factory.protocol.host = self.host #needed for progress CB
+                factory.protocol.xmlstream = client.xmlstream
+                factory.protocol.data = self.data
+                factory.protocol.transfert_id = self.transfert_id
+                factory.protocol.filesize = self.data["size"]
+                factory.protocol.sid = SI_elem['sid']
+                factory.protocol.initiator_jid = element['jid']
+                factory.protocol.target_jid = client.jid.full()
+                factory.protocol.IQ_id = IQ_id
+                factory.protocol.activateCB = self.activateStream
+                reactor.connectTCP(element['host'], int(element['port']), factory)
+                
+    def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream):
+        debug(_("activating stream"))
+        result = domish.Element(('', 'iq'))
+        result['type'] = 'result'
+        result['id'] = IQ_id
+        result['from'] = from_jid
+        result['to'] = to_jid
+        query = result.addElement('query', 'http://jabber.org/protocol/bytestreams')
+        query['sid'] = sid
+        streamhost = query.addElement('streamhost-used')
+        streamhost['jid'] = to_jid  #FIXME: use real streamhost
+        xmlstream.send(result)
+
+class XEP_0065_handler(XMPPHandler):
+    implements(iwokkel.IDisco)
+    
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+        self.host = plugin_parent.host
+    
+    def connectionInitialized(self):
+        self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.getFile)
+
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
+        return [disco.DiscoFeature(NS_BS)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
+        return []