#!/usr/bin/python
#-*- coding: utf-8 -*-
"""
SAT plugin for managing xep-0065
Copyright (C)
2002-2004 Dave Smith (dizzyd@jabber.org)
2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
2009, 2010 Jérôme Poisson (goffi@goffi.org)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
--
This program is based on proxy65 (http://code.google.com/p/proxy65),
originally written by David Smith and modified by Fabio Forno.
It is sublicensed under GPL v3 (or any later version) as allowed by the original
license.
--
Here is a copy of the original license:
Copyright (C)
2002-2004 Dave Smith (dizzyd@jabber.org)
2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from logging import debug, info, error
from twisted.internet import protocol, reactor
from twisted.protocols.basic import FileSender
from twisted.words.xish import domish
from twisted.web.client import getPage
import struct
import urllib
import hashlib, pdb
from zope.interface import implements
try:
from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
from wokkel.subprotocols import XMPPHandler
from wokkel import disco, iwokkel
# --- XMPP side -------------------------------------------------------------
# Observer pattern matching <iq type="set"> stanzas that carry a <query/>
# child in the bytestreams namespace.
IQ_SET = '/iq[@type="set"]'
NS_BS = 'http://jabber.org/protocol/bytestreams'
BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
# Plugin metadata.
# NOTE(review): presumably read by the plugin loader of the host application;
# `_` is not defined in this file and is assumed to be the gettext builtin.
PLUGIN_INFO = {
"name": "XEP 0065 Plugin",
"import_name": "XEP_0065",
"type": "XEP",
"protocols": ["XEP-0065"],
"main": "XEP_0065",
"handler": "yes",
"description": _("""Implementation of SOCKS5 Bytestreams""")
}
# --- Protocol state machine ------------------------------------------------
# States 0-4 are used when this end answers a SOCKS5 negotiation; the
# STATE_TARGET_* states are used when this end opens the connection and drives
# the negotiation itself (mode == "target").
STATE_INITIAL = 0
STATE_AUTH = 1
STATE_REQUEST = 2
STATE_READY = 3
STATE_AUTH_USERPASS = 4
STATE_TARGET_INITIAL = 5
STATE_TARGET_AUTH = 6
STATE_TARGET_REQUEST = 7
STATE_TARGET_READY = 8
STATE_LAST = 9
STATE_CONNECT_PENDING = STATE_LAST + 1
# --- SOCKS5 wire constants -------------------------------------------------
# Protocol version byte.
SOCKS5_VER = 0x05
# Address type field values.
ADDR_IPV4 = 0x01
ADDR_DOMAINNAME = 0x03
ADDR_IPV6 = 0x04
# Request command codes.
CMD_CONNECT = 0x01
CMD_BIND = 0x02
CMD_UDPASSOC = 0x03
# Authentication method identifiers (0xFF = no acceptable method).
AUTHMECH_ANON = 0x00
AUTHMECH_USERPASS = 0x02
AUTHMECH_INVALID = 0xFF
# Reply codes.
REPLY_SUCCESS = 0x00
# NOTE(review): name is spelled without the trailing "E"; kept as-is because
# other modules may import it under this name.
REPLY_GENERAL_FAILUR = 0x01
REPLY_CONN_NOT_ALLOWED = 0x02
REPLY_NETWORK_UNREACHABLE = 0x03
REPLY_HOST_UNREACHABLE = 0x04
REPLY_CONN_REFUSED = 0x05
REPLY_TTL_EXPIRED = 0x06
REPLY_CMD_NOT_SUPPORTED = 0x07
REPLY_ADDR_NOT_SUPPORTED = 0x08
class SOCKSv5(protocol.Protocol, FileSender):
    """SOCKS5 endpoint used to move a file over an XEP-0065 bytestream.

    The same class serves both ends of the transfer, selected by the ``mode``
    attribute:

    * any mode other than ``"target"``: the instance waits for the peer to
      negotiate (STATE_INITIAL -> STATE_REQUEST -> STATE_READY) and, once the
      CONNECT request is accepted, streams ``self.filepath`` to the peer;
    * ``"target"``: the instance starts the negotiation itself
      (STATE_TARGET_*), then writes everything it receives to
      ``self.data["dest_path"]``.

    Several attributes are not set in ``__init__`` but are expected to be
    present before the connection is made: ``mode``, ``host``,
    ``transfert_id``, ``filesize`` and, depending on the mode, ``filepath`` or
    ``data``, ``sid``, ``initiator_jid``, ``target_jid``, ``IQ_id``,
    ``xmlstream`` and ``activateCB``.
    """

    def __init__(self):
        debug(_("Protocol init"))
        self.state = STATE_INITIAL
        self.buf = ""  # bytes received but not yet parsed
        self.supportedAuthMechs = [AUTHMECH_ANON]
        self.supportedAddrs = [ADDR_DOMAINNAME]
        self.enabledCommands = [CMD_CONNECT]
        self.peersock = None
        self.addressType = 0
        self.requestType = 0
        self.activeConns = {}
        self.pendingConns = {}
        self.transfered = 0  # nb of bytes already copied

    def _startNegotiation(self):
        """Send the method-selection message offering anonymous auth only."""
        debug("_startNegotiation")
        self.state = STATE_TARGET_AUTH
        self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))

    def _parseNegotiation(self):
        """Answer the peer's method-selection message.

        Picks the first supported mechanism offered by the peer and moves to
        the matching state; otherwise replies AUTHMECH_INVALID and closes.
        An incomplete buffer is left untouched until more data arrives.
        """
        debug("_parseNegotiation")
        try:
            # Parse out data
            ver, nmethod = struct.unpack('!BB', self.buf[:2])
            methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2])
        except struct.error:
            # Not enough bytes yet: wait for the next dataReceived
            return

        # Ensure version is correct
        if ver != SOCKS5_VER:
            self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
            self.transport.loseConnection()
            return

        # Trim off front of the buffer
        self.buf = self.buf[nmethod + 2:]

        # Check for supported auth mechs
        for m in self.supportedAuthMechs:
            if m in methods:
                # Update internal state, according to selected method
                if m == AUTHMECH_ANON:
                    self.state = STATE_REQUEST
                elif m == AUTHMECH_USERPASS:
                    self.state = STATE_AUTH_USERPASS
                # Complete negotiation w/ this method
                self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
                return

        # No supported mechs found, notify client and close the connection
        self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
        self.transport.loseConnection()

    def _parseUserPass(self):
        """Parse a username/password sub-negotiation and reply to it.

        An incomplete buffer is left untouched until more data arrives.
        """
        debug("_parseUserPass")
        try:
            # Parse out data
            ver, ulen = struct.unpack('BB', self.buf[:2])
            uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
            # Slice (not index) so that a short buffer raises struct.error,
            # which is handled below, rather than an uncaught IndexError
            plen, = struct.unpack('B', self.buf[ulen + 2:ulen + 3])
            password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
        except struct.error:
            return

        # Trim off front of the buffer
        self.buf = self.buf[3 + ulen + plen:]

        # Fire event to authenticate user
        if self.authenticateUserPass(uname, password):
            # Signal success
            self.state = STATE_REQUEST
            self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
        else:
            # Signal failure
            self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
            self.transport.loseConnection()

    def sendErrorReply(self, errorcode):
        """Send a reply carrying ``errorcode`` and close the connection."""
        debug("sendErrorReply")
        # The reply always carries a zeroed IPv4 address and port
        result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
        self.transport.write(result)
        self.transport.loseConnection()

    def addConnection(self, address, connection):
        """Register ``connection`` as pending for ``address``.

        Returns True when the connection was stored, False when two
        connections are already pending for that address.
        """
        info(_("Adding connection: %(address)s, %(connection)s") % {'address': address, 'connection': connection})
        olist = self.pendingConns.get(address, [])
        if len(olist) <= 1:
            olist.append(connection)
            self.pendingConns[address] = olist
            return True
        return False

    def removePendingConnection(self, address, connection):
        """Forget ``connection`` from the pending list of ``address``."""
        olist = self.pendingConns[address]
        if len(olist) == 1:
            del self.pendingConns[address]
        else:
            olist.remove(connection)
            self.pendingConns[address] = olist

    def removeActiveConnection(self, address):
        """Forget the active connection registered for ``address``."""
        del self.activeConns[address]

    def _readAddress(self):
        """Decode the address and port following the 4-byte SOCKS5 header.

        ``self.addressType`` must already be set. On success the header and
        the address are consumed from ``self.buf`` and ``(addr, port)`` is
        returned. When the address type is not handled an error reply is sent
        and None is returned.

        Raises struct.error when the buffer does not yet hold the whole
        address; the buffer is left untouched in that case.
        """
        # Ensure we actually support the requested address type
        if self.addressType not in self.supportedAddrs:
            self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
            return None

        if self.addressType == ADDR_IPV4:
            addr, port = struct.unpack('!IH', self.buf[4:10])
            self.buf = self.buf[10:]
        elif self.addressType == ADDR_DOMAINNAME:
            nlen, = struct.unpack('!B', self.buf[4:5])
            # Unpack exactly name + port: bytes after them belong to the next
            # message (or to the file stream) and must not break the parsing
            addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:7 + nlen])
            self.buf = self.buf[7 + nlen:]
        else:
            # Any other address types are not supported
            self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
            return None
        return addr, port

    def _parseRequest(self):
        """Parse the peer's SOCKS5 request and dispatch the command."""
        debug("_parseRequest")
        try:
            ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
            parsed = self._readAddress()
        except struct.error:
            # Incomplete request: wait for more data
            return None
        if parsed is None:
            return None
        addr, port = parsed

        # Ensure command is supported
        if cmd not in self.enabledCommands:
            # Send a not supported error
            self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
            return None

        # Process the command
        if cmd == CMD_CONNECT:
            self.connectRequested(addr, port)
        elif cmd == CMD_BIND:
            self.bindRequested(addr, port)
        else:
            # Any other command is not supported
            self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)

    def _makeRequest(self):
        """Send the CONNECT request.

        The destination "domain name" is the hex SHA-1 digest of
        sid + initiator JID + target JID, with port 0.
        """
        debug("_makeRequest")
        self.state = STATE_TARGET_REQUEST
        sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest()
        request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
        self.transport.write(request)

    def _parseRequestReply(self):
        """Parse the reply to our CONNECT request and start receiving.

        On success the destination file is opened, any file data that arrived
        together with the reply is written to it, and ``activateCB`` is
        called.
        """
        debug("_parseRequestReply")
        try:
            ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
            parsed = self._readAddress()
        except struct.error:
            # Incomplete reply: wait for more data
            return None
        if parsed is None:
            return None

        # Ensure reply is OK
        if rep != REPLY_SUCCESS:
            self.transport.loseConnection()
            return None

        debug(_("Saving file in %s."), self.data["dest_path"])
        # Binary mode: the stream is raw file content
        self.dest_file = open(self.data["dest_path"], 'wb')
        self.state = STATE_TARGET_READY
        if self.buf:
            # The peer may start streaming right after its reply; keep
            # whatever was received in the same chunk
            self.dest_file.write(self.buf)
            self.transfered += len(self.buf)
            self.buf = ""
        self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream)

    def connectionMade(self):
        """Register the progress callback; in target mode, start negotiating."""
        debug("connectionMade (mode = %s)" % self.mode)
        self.host.registerProgressCB(self.transfert_id, self.getProgress)
        if self.mode == "target":
            self.state = STATE_TARGET_INITIAL
            self._startNegotiation()

    def connectRequested(self, addr, port):
        """Handle a CONNECT request for ``addr``."""
        debug("connectRequested")
        # Check for special connect to the namespace -- this signifies that the client
        # is just checking to ensure it can connect to the streamhost
        if addr == "http://jabber.org/protocol/bytestreams":
            self.connectCompleted(addr, 0)
            self.transport.loseConnection()
            return

        # Save addr, for cleanup
        self.addr = addr

        # Check to see if the requested address is already
        # activated -- send an error if so
        if addr in self.activeConns:
            self.sendErrorReply(REPLY_CONN_NOT_ALLOWED)
            return

        # Add this address to the pending connections
        if self.addConnection(addr, self):
            self.connectCompleted(addr, 0)
            self.transport.stopReading()
        else:
            self.sendErrorReply(REPLY_CONN_REFUSED)

    def getProgress(self, data):
        """Fill data with position of current transfert"""
        data["size"] = self.filesize
        try:
            data["position"] = str(self.dest_file.tell())
        except (ValueError, AttributeError):
            # File closed or not opened yet
            data["position"] = ""

    def fileTransfered(self, d):
        """Callback fired when the whole file has been sent."""
        info(_("File transfer completed, closing connection"))
        self.transport.loseConnection()

    def updateTransfered(self, data):
        """FileSender transform hook: count the bytes, pass them unchanged."""
        self.transfered += len(data)
        return data

    def connectCompleted(self, remotehost, remoteport):
        """Send the success reply, then start streaming ``self.filepath``."""
        debug("connectCompleted")
        if self.addressType == ADDR_IPV4:
            result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
        elif self.addressType == ADDR_DOMAINNAME:
            result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
                                 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
        self.transport.write(result)
        self.state = STATE_READY
        # Binary mode: the file is sent as raw bytes
        self.dest_file = open(self.filepath, 'rb')
        d = self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered)
        d.addCallback(self.fileTransfered)

    def bindRequested(self, addr, port):
        """BIND is not implemented."""
        pass

    def authenticateUserPass(self, user, passwd):
        """Accept any credentials.

        NOTE(review): the password is written to the debug log in clear.
        """
        debug("User/pass: %s/%s", user, passwd)
        return True

    def dataReceived(self, buf):
        """Feed incoming bytes to the state machine, or to the file once ready."""
        if self.state == STATE_TARGET_READY:
            self.dest_file.write(buf)
            self.transfered += len(buf)
            return

        self.buf = self.buf + buf
        if self.state == STATE_INITIAL:
            self._parseNegotiation()
        if self.state == STATE_AUTH_USERPASS:
            self._parseUserPass()
        if self.state == STATE_REQUEST:
            self._parseRequest()
        if self.state == STATE_TARGET_AUTH and len(self.buf) >= 2:
            # Read from the accumulated buffer: the chunk just received may
            # hold fewer or more bytes than the 2-byte method reply
            ver, method = struct.unpack('!BB', self.buf[:2])
            self.buf = self.buf[2:]
            if ver != SOCKS5_VER or method != AUTHMECH_ANON:
                self.transport.loseConnection()
            else:
                self._makeRequest()
        if self.state == STATE_TARGET_REQUEST:
            self._parseRequestReply()

    def clientConnectionLost(self, reason):
        debug("clientConnectionLost")
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Unregister the progress callback and release connection resources."""
        debug("connectionLost")
        self.host.removeProgressCB(self.transfert_id)
        if self.state == STATE_CONNECT_PENDING:
            self.removePendingConnection(self.addr, self)
        else:
            self.transport.unregisterProducer()
            if self.peersock is not None:
                self.peersock.peersock = None
                self.peersock.transport.unregisterProducer()
                self.peersock = None
                self.removeActiveConnection(self.addr)
        # Flush and release the file opened for the transfer, if any
        opened = getattr(self, "dest_file", None)
        if opened is not None:
            opened.close()
class Socks5ServerFactory(protocol.ServerFactory):
    """Factory building SOCKSv5 protocols for the listening (server) side.

    Defining this class also sets ``mode`` to "initiator" on the SOCKSv5
    class itself, so the value is shared by every user of that class.
    """
    # FIXME: Q&D way, fix it
    SOCKSv5.mode = "initiator"
    protocol = SOCKSv5

    def startedConnecting(self, connector):
        """Log that a connection attempt has started."""
        debug(_("Socks 5 server connection started"))

    def clientConnectionLost(self, connector, reason):
        """Log the reason why the connection was lost."""
        debug(_("Socks 5 server connection lost (reason: %s)"), reason)
class Socks5ClientFactory(protocol.ClientFactory):
    """Factory building SOCKSv5 protocols for outgoing (client) connections.

    Defining this class also sets ``mode`` to "target" on the SOCKSv5 class
    itself, so the value is shared by every user of that class.
    """
    # FIXME: Q&D way, fix it
    SOCKSv5.mode = "target"
    protocol = SOCKSv5

    def startedConnecting(self, connector):
        """Log that a connection attempt has started."""
        debug(_("Socks 5 client connection started"))

    def clientConnectionLost(self, connector, reason):
        """Log the reason why the connection was lost."""
        debug(_("Socks 5 client connection lost (reason: %s)"), reason)
class XEP_0065():
    """Plugin entry point for SOCKS5 bytestreams (XEP-0065).

    Starts a SOCKS5 server used to send files, and connects to the
    streamhosts proposed by a peer in order to receive files.
    """

    # NOTE(review): the string holds only a newline here; presumably it is
    # meant to carry the parameter definitions passed to importParams --
    # verify against the upstream file.
    params = """
"""

    def __init__(self, host):
        """Register parameters and start listening on the configured port.

        host: application object giving access to ``memory``, ``getClient``
        and the progress callbacks registry.
        """
        info(_("Plugin XEP_0065 initialization"))
        self.host = host
        debug(_("registering"))
        self.server_factory = Socks5ServerFactory()
        self.server_factory.protocol.host = self.host  # needed for progress CB
        self.client_factory = Socks5ClientFactory()

        # parameters
        host.memory.importParams(XEP_0065.params)
        host.memory.setDefault("IP", "File Transfert", self.getExternalIP)
        port = int(self.host.memory.getParamA("Port", "File Transfert"))

        info(_("Launching Socks5 Stream server on port %d"), port)
        reactor.listenTCP(port, self.server_factory)

    def getHandler(self):
        """Return the XMPP handler bound to this plugin instance."""
        return XEP_0065_handler(self)

    def getExternalIP(self):
        """Return IP visible from outside, by asking to a website"""
        return getPage("http://www.goffi.org/sat_tools/get_ip.php")

    def setData(self, data, id):
        """Store the transfer description and id used by the next getFile."""
        self.data = data
        self.transfert_id = id

    def sendFile(self, id, filepath, size):
        """Configure the SOCKS5 server protocol to send ``filepath``.

        The values are stored on the protocol class, so they apply to the
        connections accepted afterwards.
        """
        # lauching socks5 initiator
        debug(_("Launching socks5 initiator"))
        self.server_factory.protocol.mode = "initiator"
        self.server_factory.protocol.filepath = filepath
        self.server_factory.protocol.filesize = size
        self.server_factory.protocol.transfert_id = id

    def getFile(self, iq, profile_key='@DEFAULT@'):
        """Get file using byte stream

        Marks the IQ as handled, then opens a SOCKS5 connection to every
        <streamhost/> listed in it. ``setData`` must have been called before,
        as ``self.data`` and ``self.transfert_id`` are read here.
        """
        client = self.host.getClient(profile_key)
        assert(client)
        iq.handled = True
        SI_elem = iq.firstChildElement()
        IQ_id = iq['id']

        for element in SI_elem.elements():
            if element.name != "streamhost":
                continue
            # The placeholder must be named like the mapping key ('port'),
            # otherwise the formatting raises KeyError
            info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': element['host'], 'port': element['port']})
            factory = self.client_factory
            self.server_factory.protocol.mode = "target"
            factory.protocol.host = self.host  # needed for progress CB
            factory.protocol.xmlstream = client.xmlstream
            factory.protocol.data = self.data
            factory.protocol.transfert_id = self.transfert_id
            factory.protocol.filesize = self.data["size"]
            factory.protocol.sid = SI_elem['sid']
            factory.protocol.initiator_jid = element['jid']
            factory.protocol.target_jid = client.jid.full()
            factory.protocol.IQ_id = IQ_id
            factory.protocol.activateCB = self.activateStream
            reactor.connectTCP(element['host'], int(element['port']), factory)

    def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream):
        """Send the IQ result naming the streamhost in use.

        The <streamhost-used/> jid is currently set to ``to_jid``.
        """
        debug(_("activating stream"))
        result = domish.Element(('', 'iq'))
        result['type'] = 'result'
        result['id'] = IQ_id
        result['from'] = from_jid
        result['to'] = to_jid
        query = result.addElement('query', 'http://jabber.org/protocol/bytestreams')
        query['sid'] = sid
        streamhost = query.addElement('streamhost-used')
        streamhost['jid'] = to_jid  # FIXME: use real streamhost
        xmlstream.send(result)
class XEP_0065_handler(XMPPHandler):
    """XMPP handler routing bytestream requests to the plugin.

    It also advertises the bytestreams namespace through service discovery.
    """
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        """Keep a reference to the plugin and to its host."""
        self.host = plugin_parent.host
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe incoming bytestream IQ sets and hand them to getFile."""
        on_request = self.plugin_parent.getFile
        self.xmlstream.addObserver(BS_REQUEST, on_request)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Return the single disco feature: the bytestreams namespace."""
        feature = disco.DiscoFeature(NS_BS)
        return [feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """No disco items are exposed."""
        return []