view src/plugins/plugin_xep_0065.py @ 388:a617af506154

core: progress callback now use id as first parameter
author Goffi <goffi@goffi.org>
date Thu, 29 Sep 2011 12:14:19 +0200
parents 7c79d4a8c9e6
children 8f3551ceee17
line wrap: on
line source

#!/usr/bin/python
#-*- coding: utf-8 -*-
"""
SAT plugin for managing xep-0065

Copyright (C)
2002, 2003, 2004   Dave Smith (dizzyd@jabber.org)
2007, 2008         Fabio Forno (xmpp:ff@jabber.bluendo.com)
2009, 2010, 2011   Jérôme Poisson (goffi@goffi.org)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

--

This program is based on proxy65 (http://code.google.com/p/proxy65),
originally written by David Smith and modified by Fabio Forno.
It is sublicensed under GPL v3 (or any later version) as allowed by the original
license.

--

Here is a copy of the original license:

Copyright (C) 
2002-2004   Dave Smith (dizzyd@jabber.org)
2007-2008   Fabio Forno (xmpp:ff@jabber.bluendo.com)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from logging import debug, info, error
from twisted.internet import protocol, reactor
from twisted.protocols.basic import FileSender
from twisted.words.xish import domish
from twisted.web.client import getPage
import struct
import urllib
import hashlib, pdb

from zope.interface import implements

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler

from wokkel import disco, iwokkel

# Query matching any IQ stanza of type "set"
IQ_SET = '/iq[@type="set"]'
# Namespace of the bytestreams protocol
NS_BS = 'http://jabber.org/protocol/bytestreams'
# Query matching an IQ "set" whose <query/> child is in the bytestreams namespace
BS_REQUEST = '%s/query[@xmlns="%s"]' % (IQ_SET, NS_BS)



# Plugin metadata: "main" names the plugin class defined in this module,
# "handler" declares that the plugin provides a handler (see getHandler).
PLUGIN_INFO = {
    "name": "XEP 0065 Plugin",
    "import_name": "XEP-0065",
    "type": "XEP",
    "protocols": ["XEP-0065"],
    "main": "XEP_0065",
    "handler": "yes",
    "description": _("Implementation of SOCKS5 Bytestreams"),
}

# --- Protocol state machine -------------------------------------------------
# States used when the peer drives the negotiation (we answer its requests)
STATE_INITIAL = 0
STATE_AUTH    = 1
STATE_REQUEST = 2
STATE_READY   = 3
STATE_AUTH_USERPASS = 4
# States used in "target" mode (we drive the negotiation as a SOCKS5 client)
STATE_TARGET_INITIAL = 5
STATE_TARGET_AUTH    = 6
STATE_TARGET_REQUEST = 7
STATE_TARGET_READY   = 8
STATE_LAST    = 9

STATE_CONNECT_PENDING = STATE_LAST + 1

# --- SOCKS5 wire values -----------------------------------------------------
SOCKS5_VER = 0x05

# Address types
ADDR_IPV4 = 0x01
ADDR_DOMAINNAME = 0x03
ADDR_IPV6 = 0x04

# Commands
CMD_CONNECT = 0x01
CMD_BIND = 0x02
CMD_UDPASSOC = 0x03

# Authentication methods
AUTHMECH_ANON = 0x00
AUTHMECH_USERPASS = 0x02
AUTHMECH_INVALID = 0xFF

# Reply codes
REPLY_SUCCESS = 0x00
REPLY_GENERAL_FAILURE = 0x01
# Historical misspelling, kept so that existing references keep working
REPLY_GENERAL_FAILUR = REPLY_GENERAL_FAILURE
REPLY_CONN_NOT_ALLOWED = 0x02
REPLY_NETWORK_UNREACHABLE = 0x03
REPLY_HOST_UNREACHABLE = 0x04
REPLY_CONN_REFUSED = 0x05
REPLY_TTL_EXPIRED = 0x06
REPLY_CMD_NOT_SUPPORTED = 0x07
REPLY_ADDR_NOT_SUPPORTED = 0x08





class SOCKSv5(protocol.Protocol, FileSender):
    """SOCKS5 protocol used for both ends of a bytestream file transfer.

    The role is selected by the ``mode`` attribute, read in connectionMade:
        - "target": the instance negotiates as a SOCKS5 client
          (STATE_TARGET_* states) and then writes every received byte into
          the file named by ``self.data["dest_path"]``;
        - any other value: the instance answers the peer negotiation
          (STATE_INITIAL, STATE_REQUEST, ...) and, once a CONNECT request is
          accepted, streams ``self.filepath`` to the peer.

    The attributes host, mode, transfert_id, filepath, filesize, data, sid,
    initiator_jid, target_jid, IQ_id, xmlstream and activateCB are not set
    by this class: they must be provided from outside before use.
    """

    def __init__(self):
        debug(_("Protocol init"))
        self.state = STATE_INITIAL
        self.buf = ""  # bytes received but not parsed yet
        self.supportedAuthMechs = [ AUTHMECH_ANON ]
        self.supportedAddrs = [ ADDR_DOMAINNAME ]
        self.enabledCommands = [ CMD_CONNECT ]
        self.peersock = None
        self.addressType = 0
        self.requestType = 0
        self.activeConns = {}
        self.pendingConns = {}
        self.transfered = 0 #nb of bytes already copied

    def _startNegotiation(self):
        """Send the client greeting, offering anonymous authentication only"""
        debug("_startNegotiation")
        self.state = STATE_TARGET_AUTH
        self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))

    def _parseNegotiation(self):
        """Parse the peer greeting and answer with the selected auth method

        If the buffer does not hold a complete greeting yet, nothing is done
        and parsing is retried when more data arrives.
        """
        debug("_parseNegotiation")
        try:
            # Parse out data
            ver, nmethod = struct.unpack('!BB', self.buf[:2])
            methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod+2])

            # Ensure version is correct
            if ver != 5:
                self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
                self.transport.loseConnection()
                return

            # Trim off front of the buffer
            self.buf = self.buf[nmethod+2:]

            # Check for supported auth mechs
            for m in self.supportedAuthMechs:
                if m in methods:
                    # Update internal state, according to selected method
                    if m == AUTHMECH_ANON:
                        self.state = STATE_REQUEST
                    elif m == AUTHMECH_USERPASS:
                        self.state = STATE_AUTH_USERPASS
                    # Complete negotiation w/ this method
                    self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
                    return

            # No supported mechs found, notify client and close the connection
            self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
            self.transport.loseConnection()
        except struct.error:
            # Incomplete data: wait for the next chunk
            pass

    def _parseUserPass(self):
        """Parse a username/password sub-negotiation and answer it

        If the buffer does not hold the complete message yet, nothing is done
        and parsing is retried when more data arrives.
        """
        debug("_parseUserPass")
        try:
            # Parse out data
            ver, ulen = struct.unpack('BB', self.buf[:2])
            uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
            # A slice (not an index) is used so that a short buffer raises
            # struct.error like the other fields, instead of IndexError
            plen, = struct.unpack('B', self.buf[ulen + 2:ulen + 3])
            password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
            # Trim off front of the buffer
            self.buf = self.buf[3 + ulen + plen:]
            # Fire event to authenticate user
            if self.authenticateUserPass(uname, password):
                # Signal success
                self.state = STATE_REQUEST
                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
            else:
                # Signal failure
                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
                self.transport.loseConnection()
        except struct.error:
            # Incomplete data: wait for the next chunk
            pass

    def sendErrorReply(self, errorcode):
        """Send a reply carrying errorcode, then close the connection

        @param errorcode: one of the REPLY_* values
        """
        debug("sendErrorReply")
        # The reply is always built with an IPv4 address type and a null address/port
        result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
        self.transport.write(result)
        self.transport.loseConnection()

    def addConnection(self, address, connection):
        """Register connection as pending for address

        At most two connections are accepted for a given address.
        @return: True if the connection has been registered, False otherwise
        """
        info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection})
        olist = self.pendingConns.get(address, [])
        if len(olist) <= 1:
            olist.append(connection)
            self.pendingConns[address] = olist
            return True
        else:
            return False

    def removePendingConnection(self, address, connection):
        """Unregister a pending connection, dropping the address entry when it becomes empty"""
        olist = self.pendingConns[address]
        if len(olist) == 1:
            del self.pendingConns[address]
        else:
            olist.remove(connection)
            self.pendingConns[address] = olist

    def removeActiveConnection(self, address):
        """Forget the active connection registered for address"""
        del self.activeConns[address]

    def _parseAddress(self):
        """Extract the address and port following the 4 bytes header in self.buf

        The address layout depends on self.addressType. On success the parsed
        message is removed from the buffer.
        @return: (addr, port) tuple, or None if the address type is not handled
        @raise struct.error: the buffer does not hold the whole message yet
        """
        if self.addressType == ADDR_IPV4:
            addr, port = struct.unpack('!IH', self.buf[4:10])
            self.buf = self.buf[10:]
        elif self.addressType == ADDR_DOMAINNAME:
            nlen, = struct.unpack('!B', self.buf[4:5])
            # Only the bytes of this message are unpacked, so that data
            # following it in the buffer does not make the parsing fail
            addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:7 + nlen])
            self.buf = self.buf[7 + nlen:]
        else:
            return None
        return addr, port

    def _parseRequest(self):
        """Parse a request sent by the peer and dispatch the command

        If the buffer does not hold a complete request yet, nothing is done
        and parsing is retried when more data arrives.
        """
        debug("_parseRequest")
        try:
            # Parse out data and trim buffer accordingly
            ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])

            # Ensure we actually support the requested address type
            if self.addressType not in self.supportedAddrs:
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Deal with addresses
            parsed = self._parseAddress()
            if parsed is None:
                # Any other address types are not supported
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return
            addr, port = parsed

            # Ensure command is supported
            if cmd not in self.enabledCommands:
                # Send a not supported error
                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
                return

            # Process the command
            if cmd == CMD_CONNECT:
                self.connectRequested(addr, port)
            elif cmd == CMD_BIND:
                self.bindRequested(addr, port)
            else:
                # Any other command is not supported
                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)

        except struct.error:
            # Incomplete data: wait for the next chunk
            return None

    def _makeRequest(self):
        """Send a CONNECT request whose domain name is the SHA-1 hex digest of sid + initiator_jid + target_jid"""
        debug("_makeRequest")
        self.state = STATE_TARGET_REQUEST
        sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest()
        request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
        self.transport.write(request)

    def _parseRequestReply(self):
        """Parse the reply to our CONNECT request and get ready to receive the file

        On success the destination file is opened, the state becomes
        STATE_TARGET_READY and activateCB is called.
        If the buffer does not hold a complete reply yet, nothing is done
        and parsing is retried when more data arrives.
        """
        debug("_parseRequestReply")
        try:
            ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
            # Ensure we actually support the requested address type
            if self.addressType not in self.supportedAddrs:
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Deal with addresses
            parsed = self._parseAddress()
            if parsed is None:
                # Any other address types are not supported
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return
            addr, port = parsed

            # Ensure reply is OK
            if rep != REPLY_SUCCESS:
                # loseConnection is a method of the transport, not of the protocol
                self.transport.loseConnection()
                return

            debug(_("Saving file in %s."), self.data["dest_path"])
            # Binary mode: received bytes must be stored untouched
            self.dest_file = open(self.data["dest_path"], 'wb')
            self.state = STATE_TARGET_READY
            if self.buf:
                # Bytes received in the same chunk as the reply already
                # belong to the file (the sending side of this class starts
                # the transfer right after its reply)
                self.dest_file.write(self.buf)
                self.transfered += len(self.buf)
                self.buf = ""
            self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream)

        except struct.error:
            # Incomplete data: wait for the next chunk
            return None

    def connectionMade(self):
        """Register the progress callback and, in "target" mode, start the negotiation"""
        debug("connectionMade (mode = %s)" % self.mode)
        self.host.registerProgressCB(self.transfert_id, self.getProgress)

        if self.mode == "target":
            self.state = STATE_TARGET_INITIAL
            self._startNegotiation()

    def connectRequested(self, addr, port):
        """Handle a CONNECT request received from the peer

        @param addr: requested address
        @param port: requested port (not used)
        """
        debug("connectRequested")
        # Check for special connect to the namespace -- this signifies that the client
        # is just checking to ensure it can connect to the streamhost
        if addr == "http://jabber.org/protocol/bytestreams":
            self.connectCompleted(addr, 0)
            self.transport.loseConnection()
            return

        # Save addr, for cleanup
        self.addr = addr

        # Check to see if the requested address is already
        # activated -- send an error if so
        if addr in self.activeConns:
            self.sendErrorReply(REPLY_CONN_NOT_ALLOWED)
            return

        # Add this address to the pending connections
        if self.addConnection(addr, self):
            self.connectCompleted(addr, 0)
            self.transport.stopReading()
        else:
            self.sendErrorReply(REPLY_CONN_REFUSED)

    def getProgress(self, data):
        """Fill data with position of current transfert"""
        try:
            data["position"] = str(self.dest_file.tell())
            data["size"] = self.filesize
        except (ValueError, AttributeError):
            # File already closed or not opened yet: nothing to report
            pass

    def fileTransfered(self, d):
        """Close the connection and the file once the whole file has been sent"""
        info(_("File transfer completed, closing connection"))
        self.transport.loseConnection()
        try:
            self.dest_file.close()
        except (AttributeError, IOError, OSError):
            # Best effort: the file may be missing or already unusable
            pass

    def updateTransfered(self, data):
        """Count the bytes of a chunk about to be sent and return the chunk unchanged"""
        self.transfered+=len(data)
        return data

    def connectCompleted(self, remotehost, remoteport):
        """Send a success reply, then start sending self.filepath to the peer"""
        debug("connectCompleted")
        if self.addressType == ADDR_IPV4:
            result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
        elif self.addressType == ADDR_DOMAINNAME:
            result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
                                 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
        else:
            # No reply layout is known for other address types
            self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
            return
        self.transport.write(result)
        self.state = STATE_READY
        # Binary mode: the file must be sent untouched
        self.dest_file=open(self.filepath, 'rb')
        d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered)
        d.addCallback(self.fileTransfered)

    def bindRequested(self, addr, port):
        """BIND requests are ignored"""
        pass

    def authenticateUserPass(self, user, passwd):
        """Accept any username/password pair

        @return: always True
        """
        # The password is deliberately kept out of the logs
        debug("User/pass authentication requested for user: %s", user)
        return True

    def dataReceived(self, buf):
        """Store received file data, or feed the negotiation state machine"""
        if self.state == STATE_TARGET_READY:
            self.dest_file.write(buf)
            self.transfered+=len(buf)
            return

        self.buf = self.buf + buf
        if self.state == STATE_INITIAL:
            self._parseNegotiation()
        if self.state == STATE_AUTH_USERPASS:
            self._parseUserPass()
        if self.state == STATE_REQUEST:
            self._parseRequest()
        if self.state == STATE_TARGET_AUTH:
            if len(self.buf) < 2:
                # Method selection not fully received yet
                return
            # The accumulated buffer is parsed (not the last chunk), and only
            # its first 2 bytes: the chunk may be split or carry more data
            ver, method = struct.unpack('!BB', self.buf[:2])
            self.buf = self.buf[2:]
            if ver!=SOCKS5_VER or method!=AUTHMECH_ANON:
                self.transport.loseConnection()
            else:
                self._makeRequest()
        if self.state == STATE_TARGET_REQUEST:
            self._parseRequestReply()


    def clientConnectionLost(self, reason):
        debug("clientConnectionLost")
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Unregister the progress callback and release what is bound to the connection"""
        debug("connectionLost")
        self.host.removeProgressCB(self.transfert_id)
        # Make sure the transferred file is closed (and so flushed on the
        # receiving side) whatever the way the connection ended
        dest_file = getattr(self, "dest_file", None)
        if dest_file is not None and not dest_file.closed:
            dest_file.close()
        if self.state == STATE_CONNECT_PENDING:
            self.removePendingConnection(self.addr, self)
        else:
            self.transport.unregisterProducer()
            if self.peersock is not None:
                self.peersock.peersock = None
                self.peersock.transport.unregisterProducer()
                self.peersock = None
                self.removeActiveConnection(self.addr)

class Socks5ServerFactory(protocol.ServerFactory):
    """Factory of the listening side: its connections send the file ("initiator" mode)"""
    protocol = SOCKSv5
    # Class level default kept for backward compatibility. SOCKSv5 is shared
    # with the client factory, so this value can be overwritten at any time:
    # buildProtocol sets the mode on each instance to stay independent of it.
    protocol.mode = "initiator"  #FIXME: Q&D way, fix it 

    def buildProtocol(self, addr):
        """Build a protocol instance whose mode is always "initiator"

        @param addr: address of the peer, passed to the base implementation
        @return: the protocol instance
        """
        # NB: in a method body, "protocol" is the twisted module, not the class attribute
        instance = protocol.ServerFactory.buildProtocol(self, addr)
        instance.mode = "initiator"
        return instance

    def startedConnecting(self, connector):
        debug (_("Socks 5 server connection started"))

    def clientConnectionLost(self, connector, reason):
        debug (_("Socks 5 server connection lost (reason: %s)"), reason)

class Socks5ClientFactory(protocol.ClientFactory):
    """Factory of the connecting side: its connections receive the file ("target" mode)"""
    protocol = SOCKSv5
    # Class level default kept for backward compatibility. SOCKSv5 is shared
    # with the server factory, so this value can be overwritten at any time:
    # buildProtocol sets the mode on each instance to stay independent of it.
    protocol.mode = "target"  #FIXME: Q&D way, fix it 

    def buildProtocol(self, addr):
        """Build a protocol instance whose mode is always "target"

        @param addr: address of the peer, passed to the base implementation
        @return: the protocol instance
        """
        # NB: in a method body, "protocol" is the twisted module, not the class attribute
        instance = protocol.ClientFactory.buildProtocol(self, addr)
        instance.mode = "target"
        return instance

    def startedConnecting(self, connector):
        debug (_("Socks 5 client connection started"))

    def clientConnectionLost(self, connector, reason):
        debug (_("Socks 5 client connection lost (reason: %s)"), reason)


class XEP_0065():
    """Plugin entry point: owns the SOCKS5 factories and drives the transfers

    Transfer settings are stored as attributes of the protocol class used by
    the factories, so they are shared by every connection built afterwards.
    """

    # Parameters imported in the host memory at initialization
    params = """
    <params>
    <general>
    <category name="File Transfert">
        <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
        <param name="Port" value="28915" type="string" />
    </category>
    </general>
    </params>
    """

    def __init__(self, host):
        """Create the factories, import the parameters and start listening

        @param host: main object, giving access to memory, clients and
                     progress callbacks
        """
        info(_("Plugin XEP_0065 initialization"))
        self.host = host
        debug(_("registering"))
        self.server_factory = Socks5ServerFactory()
        #needed for progress CB
        self.server_factory.protocol.host = self.host
        self.client_factory = Socks5ClientFactory()

        # Parameters: the default IP is computed by getExternalIP
        host.memory.importParams(XEP_0065.params)
        host.memory.setDefault("IP", "File Transfert", self.getExternalIP)

        # The parameter is stored as a string, the reactor needs an integer
        listen_port = int(self.host.memory.getParamA("Port", "File Transfert"))
        info(_("Launching Socks5 Stream server on port %d"), listen_port)
        reactor.listenTCP(listen_port, self.server_factory)

    def getHandler(self, profile):
        """Return a new handler bound to this plugin

        @param profile: not used
        """
        return XEP_0065_handler(self)

    def getExternalIP(self):
        """Ask a website for the IP visible from outside

        @return: result of getPage (presumably a Deferred firing with the
                 page content -- check twisted documentation)
        """
        return getPage("http://www.goffi.org/sat_tools/get_ip.php")

    def setData(self, data, id):
        """Store the transfer data and the transfer id used later by getFile

        @param data: dictionary; getFile reads its "size" key and the
                     protocol reads its "dest_path" key
        @param id: id of the transfer
        """
        self.data = data
        self.transfert_id = id

    def sendFile(self, id, filepath, size):
        """Prepare the listening side to send a file

        @param id: id of the transfer
        @param filepath: path of the file to send
        @param size: size of the file
        """
        debug(_("Launching socks5 initiator"))
        # Settings are written on the protocol class of the server factory
        sender = self.server_factory.protocol
        sender.mode = "initiator"
        sender.filepath = filepath
        sender.filesize = size
        sender.transfert_id = id

    def getFile(self, iq, profile_key='@DEFAULT@'):
        """Get file using byte stream

        A connection is attempted to every <streamhost/> child of the first
        child element of the request.
        @param iq: bytestreams request
        @param profile_key: key of the profile whose client is used
        """
        client = self.host.getClient(profile_key)
        assert(client)
        iq.handled = True
        SI_elem = iq.firstChildElement()
        IQ_id = iq['id']
        for element in SI_elem.elements():
            if element.name != "streamhost":
                continue
            info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']})
            factory = self.client_factory
            self.server_factory.protocol.mode = "target"
            # Everything the receiving protocol needs is set on its class
            receiver = factory.protocol
            receiver.host = self.host #needed for progress CB
            receiver.xmlstream = client.xmlstream
            receiver.data = self.data
            receiver.transfert_id = self.transfert_id
            receiver.filesize = self.data["size"]
            receiver.sid = SI_elem['sid']
            receiver.initiator_jid = element['jid']
            receiver.target_jid = client.jid.full()
            receiver.IQ_id = IQ_id
            receiver.activateCB = self.activateStream
            reactor.connectTCP(element['host'], int(element['port']), factory)

    def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream):
        """Send the IQ result telling which streamhost is used

        @param from_jid: value of the "from" attribute of the result
        @param to_jid: value of the "to" attribute of the result, also used
                       as jid of the streamhost
        @param sid: id of the stream
        @param IQ_id: id of the request being answered
        @param xmlstream: stream used to send the result
        """
        debug(_("activating stream"))
        result = domish.Element(('', 'iq'))
        for attribute, value in (('type', 'result'), ('id', IQ_id), ('from', from_jid), ('to', to_jid)):
            result[attribute] = value
        query = result.addElement('query', 'http://jabber.org/protocol/bytestreams')
        query['sid'] = sid
        used_host = query.addElement('streamhost-used')
        used_host['jid'] = to_jid  #FIXME: use real streamhost
        xmlstream.send(result)

class XEP_0065_handler(XMPPHandler):
    """Handler routing bytestreams requests to the plugin and advertising the feature"""
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: XEP_0065 instance owning this handler
        """
        self.host = plugin_parent.host
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Have bytestreams requests handled by the getFile method of the plugin"""
        request_cb = self.plugin_parent.getFile
        self.xmlstream.addObserver(BS_REQUEST, request_cb)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Advertise the bytestreams namespace as a supported feature"""
        bytestreams_feature = disco.DiscoFeature(NS_BS)
        return [bytestreams_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """No item is advertised"""
        return list()