Mercurial > libervia-backend
view src/plugins/plugin_xep_0065.py @ 1525:49d33cb48207
plugin file: first draft:
This plugin will be used to manage file transfers, and choose the best available method.
It embeds a class dedicated to files, SatFile, which acts as a file object but also manages some high-level features (such as progress reporting).
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 25 Sep 2015 19:19:12 +0200 |
parents | 3265a2639182 |
children | 7cc29634b6ef |
line wrap: on
line source
#!/usr/bin/python
#-*- coding: utf-8 -*-

# SAT plugin for managing xep-0065

# Copyright (C)
# 2002, 2003, 2004   Dave Smith (dizzyd@jabber.org)
# 2007, 2008         Fabio Forno (xmpp:ff@jabber.bluendo.com)
# 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# --

# This program is based on proxy65 (http://code.google.com/p/proxy65),
# originaly written by David Smith and modified by Fabio Forno.
# It is sublicensed under AGPL v3 (or any later version) as allowed by the original
# license.

# --

# Here is a copy of the original license:

# Copyright (C)
# 2002-2004   Dave Smith (dizzyd@jabber.org)
# 2007-2008   Fabio Forno (xmpp:ff@jabber.bluendo.com)

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from sat.core.i18n import _
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.internet import protocol, reactor, error
from twisted.words.protocols.jabber import jid, client as jabber_client
from twisted.protocols.basic import FileSender
from twisted.words.xish import domish
from twisted.web.client import getPage
from sat.core.exceptions import ProfileNotInCacheError
import struct
import hashlib

from zope.interface import implements

# XMPPHandler is taken from Twisted when available, from wokkel otherwise
try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler

from wokkel import disco, iwokkel

# --- XMPP side ---
IQ_SET = '/iq[@type="set"]'
NS_BS = 'http://jabber.org/protocol/bytestreams'
BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
TIMEOUT = 60  # timeout for workflow

PLUGIN_INFO = {
    "name": "XEP 0065 Plugin",
    "import_name": "XEP-0065",
    "type": "XEP",
    "protocols": ["XEP-0065"],
    "main": "XEP_0065",
    "handler": "yes",
    "description": _("""Implementation of SOCKS5 Bytestreams"""),
}

# --- states of the SOCKSv5 protocol instance ---
STATE_INITIAL = 0
STATE_AUTH = 1
STATE_REQUEST = 2
STATE_READY = 3
STATE_AUTH_USERPASS = 4
STATE_TARGET_INITIAL = 5
STATE_TARGET_AUTH = 6
STATE_TARGET_REQUEST = 7
STATE_TARGET_READY = 8
STATE_LAST = 9

STATE_CONNECT_PENDING = STATE_LAST + 1

# --- SOCKS5 wire constants ---
SOCKS5_VER = 0x05

# address types
ADDR_IPV4 = 0x01
ADDR_DOMAINNAME = 0x03
ADDR_IPV6 = 0x04

# commands
CMD_CONNECT = 0x01
CMD_BIND = 0x02
CMD_UDPASSOC = 0x03

# authentication mechanisms
AUTHMECH_ANON = 0x00
AUTHMECH_USERPASS = 0x02
AUTHMECH_INVALID = 0xFF

# reply codes (continued right after this section)
REPLY_SUCCESS = 0x00
REPLY_GENERAL_FAILUR = 0x01
REPLY_CONN_NOT_ALLOWED = 0x02
REPLY_NETWORK_UNREACHABLE = 0x03
REPLY_HOST_UNREACHABLE = 0x04
REPLY_CONN_REFUSED = 0x05
REPLY_TTL_EXPIRED = 0x06
REPLY_CMD_NOT_SUPPORTED = 0x07
REPLY_ADDR_NOT_SUPPORTED = 0x08


def calculateHash(from_jid, to_jid, sid):
    """Calculate SHA1 Hash according to XEP-0065

    The hash is the hex digest of SHA1(sid + requester full jid + target full jid).

    @param from_jid: jid of the requester
    @param to_jid: jid of the target
    @param sid: session id
    @return: hash (string)"""
    return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest()


class SOCKSv5(protocol.Protocol, FileSender):
    """SOCKS5 protocol used to carry the bytestream

    The same class is instantiated by Socks5ServerFactory (incoming
    connections) and by Socks5ClientFactory (outgoing connections);
    self.state tracks the progress of the negotiation.
    """

    def __init__(self):
        log.debug(_("Protocol init"))
        self.state = STATE_INITIAL
        self.buf = ""  # data received and not parsed yet
        self.supportedAuthMechs = [AUTHMECH_ANON]
        self.supportedAddrs = [ADDR_DOMAINNAME]
        self.enabledCommands = [CMD_CONNECT]
        self.peersock = None
        self.addressType = 0
        self.requestType = 0

    def _startNegotiation(self):
        """Send the method selection message, offering anonymous authentication only"""
        log.debug("_startNegotiation")
        self.state = STATE_TARGET_AUTH
        self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))

    def _parseNegotiation(self):
        """Parse the method selection message and answer with the selected method

        If the buffer doesn't hold a complete message yet, nothing is done
        and the buffer is left untouched.
        """
        log.debug("_parseNegotiation")
        try:
            # Parse out data
            ver, nmethod = struct.unpack('!BB', self.buf[:2])
            methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2])

            # Ensure version is correct
            if ver != 5:
                self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
                self.transport.loseConnection()
                return

            # Trim off front of the buffer
            self.buf = self.buf[nmethod + 2:]

            # Check for supported auth mechs
            for m in self.supportedAuthMechs:
                if m in methods:
                    # Update internal state, according to selected method
                    if m == AUTHMECH_ANON:
                        self.state = STATE_REQUEST
                    elif m == AUTHMECH_USERPASS:
                        self.state = STATE_AUTH_USERPASS
                    # Complete negotiation w/ this method
                    self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
                    return

            # No supported mechs found, notify client and close the connection
            self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
            self.transport.loseConnection()
        except struct.error:
            # incomplete message: wait for more data
            pass

    def _parseUserPass(self):
        """Parse a username/password authentication message and answer it

        If the buffer doesn't hold a complete message yet, nothing is done.
        """
        log.debug("_parseUserPass")
        try:
            # Parse out data
            ver, ulen = struct.unpack('BB', self.buf[:2])
            uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
            # a slice (and not an index) is used so a short buffer raises
            # struct.error, which is handled below, instead of IndexError
            plen, = struct.unpack('B', self.buf[ulen + 2:ulen + 3])
            password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
            # Trim off front of the buffer
            self.buf = self.buf[3 + ulen + plen:]
            # Fire event to authenticate user
            if self.authenticateUserPass(uname, password):
                # Signal success
                self.state = STATE_REQUEST
                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
            else:
                # Signal failure
                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
                self.transport.loseConnection()
        except struct.error:
            # incomplete message: wait for more data
            pass

    def sendErrorReply(self, errorcode):
        """Send a reply carrying errorcode, then close the connection

        @param errorcode: one of the REPLY_* constants
        """
        log.debug("sendErrorReply")
        # the bound address is not relevant here: address type 1 with
        # a zeroed address and port is sent
        result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
        self.transport.write(result)
        self.transport.loseConnection()

    def _parseRequest(self):
        """Parse a request and dispatch it to the command handler

        If the buffer doesn't hold a complete request yet, nothing is done.
        """
        log.debug("_parseRequest")
        try:
            # Parse out data and trim buffer accordingly
            ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])

            # Ensure we actually support the requested address type
            if self.addressType not in self.supportedAddrs:
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Deal with addresses
            if self.addressType == ADDR_IPV4:
                addr, port = struct.unpack('!IH', self.buf[4:10])
                self.buf = self.buf[10:]
            elif self.addressType == ADDR_DOMAINNAME:
                # exact slices are used so that a short buffer raises
                # struct.error (handled below) and extra trailing bytes
                # don't prevent the request from being parsed
                nlen, = struct.unpack('!B', self.buf[4:5])
                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:7 + nlen])
                self.buf = self.buf[7 + nlen:]
            else:
                # Any other address types are not supported
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Ensure command is supported
            if cmd not in self.enabledCommands:
                # Send a not supported error
                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
                return

            # Process the command
            if cmd == CMD_CONNECT:
                self.connectRequested(addr, port)
            elif cmd == CMD_BIND:
                self.bindRequested(addr, port)
            else:
                # Any other command is not supported
                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)

        except struct.error:
            # incomplete request: wait for more data
            return None

    def _makeRequest(self):
        """Send the CONNECT request, using the session hash as domain name and 0 as port"""
        log.debug("_makeRequest")
        self.state = STATE_TARGET_REQUEST
        sha1 = calculateHash(self.data["from"], self.data["to"], self.sid)
        request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
        self.transport.write(request)

    def _parseRequestReply(self):
        """Parse the reply to our CONNECT request and activate the stream on success

        If the buffer doesn't hold a complete reply yet, nothing is done.
        """
        log.debug("_parseRequestReply")
        try:
            ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
            # Ensure we actually support the requested address type
            if self.addressType not in self.supportedAddrs:
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Deal with addresses
            if self.addressType == ADDR_IPV4:
                addr, port = struct.unpack('!IH', self.buf[4:10])
                self.buf = self.buf[10:]
            elif self.addressType == ADDR_DOMAINNAME:
                # see _parseRequest for the reason of the exact slices
                nlen, = struct.unpack('!B', self.buf[4:5])
                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:7 + nlen])
                self.buf = self.buf[7 + nlen:]
            else:
                # Any other address types are not supported
                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
                return

            # Ensure reply is OK
            if rep != REPLY_SUCCESS:
                # the connection is owned by the transport,
                # the protocol itself has no loseConnection method
                self.transport.loseConnection()
                return

            if self.factory.proxy:
                self.state = STATE_READY
                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
            else:
                self.state = STATE_TARGET_READY
                self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)

        except struct.error:
            # incomplete reply: wait for more data
            return None

    def connectionMade(self):
        """Start the negotiation when we are the connecting side"""
        # the mode is computed first: "%" binds tighter than the
        # conditional expression
        mode = "requester" if isinstance(self.factory, Socks5ServerFactory) else "target"
        log.debug(u"connectionMade (mode = %s)" % mode)

        if isinstance(self.factory, Socks5ClientFactory):
            self.sid = self.factory.sid
            self.profile = self.factory.profile
            self.data = self.factory.data
            self.state = STATE_TARGET_INITIAL
            self._startNegotiation()

    def connectRequested(self, addr, port):
        """Handle a CONNECT request

        @param addr: requested address, expected to be a session hash
            known by the factory
        @param port: requested port (not used)
        """
        log.debug("connectRequested")

        # Check that this session is expected
        if addr not in self.factory.hash_sid_map:
            #no: we refuse it
            self.sendErrorReply(REPLY_CONN_REFUSED)
            return
        self.sid, self.profile = self.factory.hash_sid_map[addr]
        client = self.factory.host.getClient(self.profile)
        # the transfer is started later, when the result iq is received
        client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
        self.connectCompleted(addr, 0)
        self.transport.stopReading()

    def startTransfer(self, file_obj):
        """Callback called when the result iq is received

        @param file_obj: file object to send through the transport
        """
        d = self.beginFileTransfer(file_obj, self.transport)
        d.addCallback(self.fileTransfered)

    def fileTransfered(self, d):
        """Close the connection and notify the factory once the file is sent"""
        log.info(_("File transfer completed, closing connection"))
        self.transport.loseConnection()
        self.factory.finishedCb(self.sid, True, self.profile)

    def connectCompleted(self, remotehost, remoteport):
        """Send the success reply to a CONNECT request

        @param remotehost: address to put in the reply
        @param remoteport: port to put in the reply
        """
        log.debug("connectCompleted")
        if self.addressType == ADDR_IPV4:
            result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
        elif self.addressType == ADDR_DOMAINNAME:
            result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
        self.transport.write(result)
        self.state = STATE_READY

    def bindRequested(self, addr, port):
        """BIND command is not implemented"""
        pass

    def authenticateUserPass(self, user, passwd):
        """Authenticate a user (every user is currently accepted)

        @param user: user name
        @param passwd: password (not checked)
        @return: True
        """
        # FIXME: implement authentication
        # the password is deliberately not logged
        log.debug(u"User/pass authentication requested for user: %s" % user)
        return True

    def dataReceived(self, buf):
        """Feed the state machine with received data

        Once in STATE_TARGET_READY, data are written directly to the
        destination file object.
        """
        if self.state == STATE_TARGET_READY:
            self.data["file_obj"].write(buf)
            return

        self.buf = self.buf + buf
        if self.state == STATE_INITIAL:
            self._parseNegotiation()
        if self.state == STATE_AUTH_USERPASS:
            self._parseUserPass()
        if self.state == STATE_REQUEST:
            self._parseRequest()
        if self.state == STATE_TARGET_AUTH:
            if len(self.buf) < 2:
                # method selection reply is not complete yet, wait for more data
                return
            # the accumulated buffer is parsed (not the last chunk), so a
            # reply split or merged with other data is handled
            ver, method = struct.unpack('!BB', self.buf[:2])
            self.buf = self.buf[2:]
            if ver != SOCKS5_VER or method != AUTHMECH_ANON:
                self.transport.loseConnection()
            else:
                self._makeRequest()
        if self.state == STATE_TARGET_REQUEST:
            self._parseRequestReply()

    def clientConnectionLost(self, reason):
        log.debug("clientConnectionLost")
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Unregister producers when the connection is lost"""
        log.debug("connectionLost")
        if self.state != STATE_CONNECT_PENDING:
            self.transport.unregisterProducer()
            if self.peersock is not None:
                self.peersock.peersock = None
                self.peersock.transport.unregisterProducer()
                self.peersock = None


class Socks5ServerFactory(protocol.ServerFactory):
    """Factory building SOCKSv5 protocols for incoming connections"""
    protocol = SOCKSv5

    def __init__(self, host, hash_sid_map, finishedCb):
        """
        @param host: SàT host instance
        @param hash_sid_map: map from session hash to (session id, profile)
        @param finishedCb: method to call when the stream session is finished
        """
        self.host = host
        self.hash_sid_map = hash_sid_map
        self.finishedCb = finishedCb

    def startedConnecting(self, connector):
        log.debug(_("Socks 5 server connection started"))

    def clientConnectionLost(self, connector, reason):
        log.debug(_(u"Socks 5 server connection lost (reason: %s)") % reason)


class Socks5ClientFactory(protocol.ClientFactory):
    """Factory building SOCKSv5 protocols for outgoing connections"""
    protocol = SOCKSv5

    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None):
        """Init the Client Factory

        @param current_stream: current streams data
        @param sid: Session ID
        @param iq_id: iq id used to initiate the stream
        @param activateCb: method to call to activate the stream
        @param finishedCb: method to call when the stream session is finished
        @param proxy: True if we are connecting through a proxy (and we are a requester)
        @param profile: %(doc_profile)s"""
        assert(profile)
        self.data = current_stream[sid]
        self.sid = sid
        self.iq_id = iq_id
        self.activateCb = activateCb
        self.finishedCb = finishedCb
        self.proxy = proxy
        self.profile = profile

    def startedConnecting(self, connector):
        log.debug(_("Socks 5 client connection started"))

    def clientConnectionLost(self, connector, reason):
        log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason)
        self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile)  # TODO: really check if the state is actually successful


class XEP_0065(object):
    """SOCKS5 Bytestreams (XEP-0065) plugin"""

    NAMESPACE = NS_BS

    params = """
    <params>
    <general>
    <category name="File Transfer">
        <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
        <param name="Port" value="28915" type="int" constraint="1;65535" />
    </category>
    </general>
    <individual>
    <category name="File Transfer">
        <param name="Proxy" value="" type="string" />
        <param name="Proxy host" value="" type="string" />
        <param name="Proxy port" value="" type="int" constraint="1;65535" />
    </category>
    </individual>
    </params>
    """

    def __init__(self, host):
        log.info(_("Plugin XEP_0065 initialization"))

        #session data
        self.hash_sid_map = {}  # key: hash of the transfer session, value: (session id, profile)

        self.host = host
        log.debug(_("registering"))
        self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile))

        #parameters
        host.memory.updateParams(XEP_0065.params)
        host.memory.setDefault("IP", "File Transfer", self.getExternalIP)
        port = int(self.host.memory.getParamA("Port", "File Transfer"))

        log.info(_("Launching Socks5 Stream server on port %d") % port)
        reactor.listenTCP(port, self.server_factory)

    def getHandler(self, profile):
        return XEP_0065_handler(self)

    def profileConnected(self, profile):
        client = self.host.getClient(profile)
        client.xep_0065_current_stream = {}  # key: stream_id, value: data(dict)

    def getExternalIP(self):
        """Return IP visible from outside, by asking to a website"""
        return getPage("http://www.goffi.org/sat_tools/get_ip.php")

    def getProgress(self, sid, data, profile):
        """Fill data with position of current transfer

        @param sid: stream session id
        @param data: dict filled with "position" and "size" (as strings)
        @param profile: %(doc_profile)s"""
        client = self.host.getClient(profile)
        try:
            file_obj = client.xep_0065_current_stream[sid]["file_obj"]
            data["position"] = str(file_obj.tell())
            data["size"] = str(client.xep_0065_current_stream[sid]["size"])
        except Exception:
            # best effort: data is left as is if the stream is unknown
            # or if the position/size can't be retrieved
            pass

    def _timeOut(self, sid, profile):
        """Delete current_stream id, called after timeout

        @param sid: id of client.xep_0065_current_stream
        @param profile: %(doc_profile)s"""
        log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") % {"sid": sid, "profile": profile})
        self._killId(sid, False, "TIMEOUT", profile)

    def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
        """Delete a current_stream id, clean up associated observers

        The success or failure callback of the session is called once
        the session data have been removed.

        @param sid: id of client.xep_0065_current_stream
        @param success: True if the transfer was successful
        @param failure_reason: reason given to the failure callback
        @param profile: %(doc_profile)s"""
        assert(profile)
        client = self.host.getClient(profile)
        if sid not in client.xep_0065_current_stream:
            log.warning(_("kill id called on a non existant id"))
            return
        stream_data = client.xep_0065_current_stream[sid]
        if "observer_cb" in stream_data:
            xmlstream = stream_data["xmlstream"]
            xmlstream.removeObserver(stream_data["event_data"], stream_data["observer_cb"])
        if stream_data['timer'].active():
            stream_data['timer'].cancel()
        if "size" in stream_data:
            self.host.removeProgressCB(sid, profile)

        file_obj = stream_data['file_obj']
        success_cb = stream_data['success_cb']
        failure_cb = stream_data['failure_cb']

        session_hash = stream_data.get('hash')
        del client.xep_0065_current_stream[sid]
        if session_hash in self.hash_sid_map:
            #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc).
            del self.hash_sid_map[session_hash]

        if success:
            success_cb(sid, file_obj, NS_BS, profile)
        else:
            failure_cb(sid, file_obj, NS_BS, failure_reason, profile)

    def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None):
        """Launch the stream workflow

        @param file_obj: file_obj to send
        @param to_jid: JID of the recipient
        @param sid: Stream session id
        @param length: number of byte to send, or None to send until the end
        @param successCb: method to call when stream successfuly finished
        @param failureCb: method to call when something goes wrong
        @param size: full size of the data, or None if unknown
        @param profile: %(doc_profile)s"""
        assert(profile)
        client = self.host.getClient(profile)

        if length is not None:
            log.error(_('stream length not managed yet'))
            return

        profile_jid = client.jid
        xmlstream = client.xmlstream

        data = client.xep_0065_current_stream[sid] = {}
        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
        data["file_obj"] = file_obj
        data["from"] = profile_jid
        data["to"] = to_jid
        data["success_cb"] = successCb
        data["failure_cb"] = failureCb
        data["xmlstream"] = xmlstream
        data["hash"] = calculateHash(profile_jid, to_jid, sid)
        self.hash_sid_map[data["hash"]] = (sid, profile)
        if size:
            data["size"] = size
            self.host.registerProgressCB(sid, self.getProgress, profile)
        iq_elt = jabber_client.IQ(xmlstream, 'set')
        iq_elt["from"] = profile_jid.full()
        iq_elt["to"] = to_jid.full()
        query_elt = iq_elt.addElement('query', NS_BS)
        query_elt['mode'] = 'tcp'
        query_elt['sid'] = sid

        #first streamhost: direct connection
        streamhost = query_elt.addElement('streamhost')
        streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
        streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
        streamhost['jid'] = profile_jid.full()

        #second streamhost: mediated connection, using proxy
        proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
        proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
        proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
        if proxy_host and proxy_port and proxy_jid:
            # the proxy is only proposed when it is fully configured, a
            # streamhost with empty attributes would be invalid
            streamhost = query_elt.addElement('streamhost')
            streamhost['host'] = proxy_host
            streamhost['port'] = proxy_port
            streamhost['jid'] = proxy_jid

        iq_elt.addCallback(self.iqResult, sid, profile)
        iq_elt.send()

    def iqResult(self, sid, profile, iq_elt):
        """Called when the result of open iq is received

        @param sid: stream session id
        @param profile: %(doc_profile)s
        @param iq_elt: answer to the iq sent by startStream"""
        if iq_elt["type"] == "error":
            log.warning(_("Transfer failed"))
            return
        client = self.host.getClient(profile)
        try:
            data = client.xep_0065_current_stream[sid]
            file_obj = data["file_obj"]
            timer = data["timer"]
        except KeyError:
            log.error(_("Internal error, can't do transfer"))
            return

        if timer.active():
            timer.cancel()

        profile_jid, xmlstream = self.host.getJidNStream(profile)
        query_elt = iq_elt.firstChildElement()
        streamhost_elts = [elt for elt in query_elt.elements() if elt.name == 'streamhost-used']
        if not streamhost_elts:
            log.warning(_("No streamhost found in stream query"))
            return

        streamhost_jid = streamhost_elts[0]['jid']
        if streamhost_jid != profile_jid.full():
            log.debug(_("A proxy server is used"))
            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
            if proxy_jid != streamhost_jid:
                log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
                return
            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile)
            reactor.connectTCP(proxy_host, int(proxy_port), factory)
        else:
            data["start_transfer_cb"](file_obj)  # We now activate the stream

    def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
        """Ask the proxy to activate the stream

        @param sid: stream session id
        @param iq_id: not used here
        @param start_transfer_cb: method to call to start the transfer
            once the stream is activated
        @param profile: %(doc_profile)s"""
        log.debug(_("activating stream"))
        client = self.host.getClient(profile)
        data = client.xep_0065_current_stream[sid]
        profile_jid, xmlstream = self.host.getJidNStream(profile)

        # jabber_client is the Twisted module (as in startStream),
        # "client" is the profile's client here
        iq_elt = jabber_client.IQ(xmlstream, 'set')
        iq_elt["from"] = profile_jid.full()
        iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
        query_elt = iq_elt.addElement('query', NS_BS)
        query_elt['sid'] = sid
        query_elt.addElement('activate', content=data['to'].full())
        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj'])
        iq_elt.send()

    def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
        """Start the transfer if the proxy has activated the stream"""
        if iq_elt['type'] == 'error':
            log.warning(_("Can't activate the proxy stream"))
            return
        else:
            start_transfer_cb(file_obj)

    def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
        """Called when a bytestream is imminent

        @param from_jid: jid of the sender
        @param sid: Stream id
        @param file_obj: File object where data will be written
        @param size: full size of the data, or None if unknown
        @param success_cb: method to call when successfuly finished
        @param failure_cb: method to call when something goes wrong
        @param profile: %(doc_profile)s"""
        client = self.host.getClient(profile)
        data = client.xep_0065_current_stream[sid] = {}
        data["from"] = from_jid
        data["file_obj"] = file_obj
        data["seq"] = -1
        if size:
            data["size"] = size
            self.host.registerProgressCB(sid, self.getProgress, profile)
        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
        data["success_cb"] = success_cb
        data["failure_cb"] = failure_cb

    def streamQuery(self, iq_elt, profile):
        """Get file using byte stream

        @param iq_elt: bytestream request received
        @param profile: %(doc_profile)s"""
        log.debug(_("BS stream query"))
        client = self.host.getClient(profile)

        if not client:
            raise ProfileNotInCacheError

        xmlstream = client.xmlstream

        iq_elt.handled = True
        query_elt = iq_elt.firstChildElement()
        sid = query_elt.getAttribute("sid")
        streamhost_elts = [elt for elt in query_elt.elements() if elt.name == 'streamhost']

        if sid not in client.xep_0065_current_stream:
            # formatting is done after translation, so the lookup is
            # made on the constant string
            log.warning(_(u"Ignoring unexpected BS transfer: %s") % sid)
            self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
            return

        stream_data = client.xep_0065_current_stream[sid]
        if stream_data['timer'].active():
            stream_data['timer'].cancel()
        stream_data["to"] = jid.JID(iq_elt["to"])
        stream_data["xmlstream"] = xmlstream

        if not streamhost_elts:
            log.warning(_(u"No streamhost found in stream query %s") % sid)
            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
            return

        streamhost_elt = streamhost_elts[0]  # TODO: manage several streamhost elements case
        sh_host = streamhost_elt.getAttribute("host")
        sh_port = streamhost_elt.getAttribute("port")
        sh_jid = streamhost_elt.getAttribute("jid")
        if not sh_host or not sh_port or not sh_jid:
            log.warning(_("incomplete streamhost element"))
            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
            return

        stream_data["streamhost"] = (sh_host, sh_port, sh_jid)

        log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port})
        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile)
        reactor.connectTCP(sh_host, int(sh_port), factory)

    def activateStream(self, sid, iq_id, profile):
        """Answer the bytestream request with the streamhost used

        @param sid: stream session id
        @param iq_id: id of the iq request to answer
        @param profile: %(doc_profile)s"""
        client = self.host.getClient(profile)
        log.debug(_("activating stream"))
        result = domish.Element((None, 'iq'))
        data = client.xep_0065_current_stream[sid]
        result['type'] = 'result'
        result['id'] = iq_id
        result['from'] = data["to"].full()
        result['to'] = data["from"].full()
        query = result.addElement('query', NS_BS)
        query['sid'] = sid
        streamhost = query.addElement('streamhost-used')
        streamhost['jid'] = data["streamhost"][2]
        data["xmlstream"].send(result)

    def _sendIqError(self, iq_id, to_jid, xmlstream, error_type, condition):
        """Build and send an iq stanza of type "error"

        @param iq_id: IQ id
        @param to_jid: addressee
        @param xmlstream: XML stream to use to send the error
        @param error_type: value of the "type" attribute of the error element
        @param condition: name of the stanza error condition element"""
        result = domish.Element((None, 'iq'))
        # an error answer must have the "error" type, not "result"
        result['type'] = 'error'
        result['id'] = iq_id
        result['to'] = to_jid
        error_el = result.addElement('error')
        error_el['type'] = error_type
        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', condition))
        xmlstream.send(result)

    def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
        """Not acceptable error used when the stream is not expected or something is going wrong

        @param iq_id: IQ id
        @param to_jid: addressee
        @param xmlstream: XML stream to use to send the error"""
        self._sendIqError(iq_id, to_jid, xmlstream, 'modify', 'not-acceptable')

    def sendBadRequestError(self, iq_id, to_jid, xmlstream):
        """Bad request error used when the stream query is malformed or incomplete

        @param iq_id: IQ id
        @param to_jid: addressee
        @param xmlstream: XML stream to use to send the error"""
        self._sendIqError(iq_id, to_jid, xmlstream, 'cancel', 'bad-request')


class XEP_0065_handler(XMPPHandler):
    """Handler registering the bytestream observer and the disco feature"""
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def _proxyDataResult(self, iq_elt):
        """Called with the information about proxy according to XEP-0065 #4

        Params should be filled with these infos"""
        if iq_elt["type"] == "error":
            log.warning(_("Can't determine proxy information"))
            return
        query_elt = iq_elt.firstChildElement()
        if query_elt.name != "query":
            log.warning(_("Bad answer received from proxy"))
            return
        streamhost_elts = [elt for elt in query_elt.elements() if elt.name == 'streamhost']
        if not streamhost_elts:
            log.warning(_("No streamhost found in stream query"))
            return
        if len(streamhost_elts) != 1:
            log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
        streamhost_elt = streamhost_elts[0]
        self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""), "File Transfer", profile_key=self.parent.profile)
        self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""), "File Transfer", profile_key=self.parent.profile)
        self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port", ""), "File Transfer", profile_key=self.parent.profile)

    def connectionInitialized(self):
        """Register the bytestream observer, and look for a proxy if none is set"""
        def connection_ok(dummy):
            self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
            proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=self.parent.profile)
            if not proxy:
                def proxiesFound(entities):
                    try:
                        proxy_ent = entities.pop()
                    except KeyError:
                        log.info(_("No proxy found on this server"))
                        return
                    iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get')
                    iq_elt["to"] = proxy_ent.full()
                    iq_elt.addElement('query', NS_BS)
                    iq_elt.addCallback(self._proxyDataResult)
                    iq_elt.send()
                d = self.host.findServiceEntities("proxy", "bytestreams", profile_key=self.parent.profile)
                d.addCallback(proxiesFound)

        self.parent.getConnectionDeferred().addCallback(connection_ok)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        return [disco.DiscoFeature(NS_BS)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        return []