Mercurial > libervia-backend
changeset 1577:d04d7402b8e9
plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation:
/!\ range is not working yet
/!\ pipe plugin is broken for now
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 Nov 2015 18:19:49 +0100 |
parents | d5f59ba166fe |
children | 7fef6cdf5953 |
files | src/plugins/plugin_xep_0020.py src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0095.py src/plugins/plugin_xep_0096.py |
diffstat | 4 files changed, 535 insertions(+), 551 deletions(-) [+] |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0020.py Wed Nov 11 18:19:49 2015 +0100 +++ b/src/plugins/plugin_xep_0020.py Wed Nov 11 18:19:49 2015 +0100 @@ -20,7 +20,7 @@ from sat.core.i18n import _ from sat.core.log import getLogger log = getLogger(__name__) -from twisted.words.protocols.jabber import client, jid +from core import exceptions from twisted.words.xish import domish from zope.interface import implements @@ -55,53 +55,93 @@ def getFeatureElt(self, elt): """Check element's children to find feature elements - @param elt: domish.Element - @return: feature elements""" - return [child for child in elt.elements() if child.name == 'feature'] + + @param elt(domish.Element): parent element of the feature element + @return: feature elements + @raise exceptions.NotFound: no feature element found + """ + try: + feature_elt = elt.elements(NS_FEATURE_NEG, 'feature').next() + except StopIteration: + raise exceptions.NotFound + return feature_elt + + def _getForm(self, elt, namespace): + """Return the first child data form - def getChoosedOptions(self, elt): + @param elt(domish.Element): parent of the data form + @param namespace (None, unicode): form namespace or None to ignore + @return (None, data_form.Form): data form or None is nothing is found + """ + if namespace is None: + try: + form_elt = elt.elements(data_form.NS_X_DATA).next() + except StopIteration: + return None + else: + return data_form.Form.fromElement(form_elt) + else: + return data_form.findForm(elt, namespace) + + def getChoosedOptions(self, feature_elt, namespace): """Return choosed feature for feature element - @param elt: feature domish element - @return: dict with feature name as key, and choosed option as value""" - form = data_form.Form.fromElement(elt.firstChildElement()) + + @param feature_elt(domish.Element): feature domish element + @param namespace (None, unicode): form namespace or None to ignore + @return (dict): feature name as key, and choosed option as value + @raise exceptions.NotFound: not data form is found + """ + form = self._getForm(feature_elt, namespace) + if form is None: + raise exceptions.NotFound result = {} for field in form.fields: values = form.fields[field].values result[field] = values[0] if values else None if len(values) > 1: - log.warning(_(u"More than one value choosed for %s, keeping the first one") % field) + log.warning(_(u"More than one value choosed for {}, keeping the first one").format(field)) return result - def negociate(self, feature_elt, form_type, negociable_values): - """Negociate the feature options - @param feature_elt: feature domish element - @param form_type: the option to negociate - @param negociable_values: acceptable values for this negociation""" - form = data_form.Form.fromElement(feature_elt.firstChildElement()) - options = [option.value for option in form.fields[form_type].options] - for value in negociable_values: + def negotiate(self, feature_elt, name, negotiable_values, namespace): + """Negotiate the feature options + + @param feature_elt(domish.Element): feature element + @param name: the option name (i.e. field's var attribute) to negotiate + @param negotiable_values(iterable): acceptable values for this negotiation + first corresponding value will be returned + @param namespace (None, unicode): form namespace or None to ignore + @raise KeyError: name is not found in data form fields + """ + form = self._getForm(feature_elt, namespace) + options = [option.value for option in form.fields[name].options] + for value in negotiable_values: if value in options: return value return None - def chooseOption(self, options_dict): + def chooseOption(self, options, namespace): """Build a feature element with choosed options - @param options_dict: dict with feature as key and choosed option as value""" + + @param options(dict): dict with feature as key and choosed option as value + @param namespace (None, unicode): form namespace or None to ignore + """ feature_elt = domish.Element((NS_FEATURE_NEG, 'feature')) - x_form = data_form.Form('submit') - x_form.makeFields(options_dict) + x_form = data_form.Form('submit', formNamespace=namespace) + x_form.makeFields(options) feature_elt.addChild(x_form.toElement()) return feature_elt - def proposeFeatures(self, options_dict, namespace=None): + def proposeFeatures(self, options_dict, namespace): """Build a feature element with options to propose - @param options_dict: dict with feature as key and list of acceptable options as value - @param namespace: feature namespace""" + + @param options_dict(dict): dict with feature as key and iterable of acceptable options as value + @param namespace(None, unicode): feature namespace + """ feature_elt = domish.Element((NS_FEATURE_NEG, 'feature')) x_form = data_form.Form('form', formNamespace=namespace) for field in options_dict: x_form.addField(data_form.Field('list-single', field, - options=[data_form.Option(_option) for _option in options_dict[field]])) + options=[data_form.Option(option) for option in options_dict[field]])) feature_elt.addChild(x_form.toElement()) return feature_elt
--- a/src/plugins/plugin_xep_0065.py Wed Nov 11 18:19:49 2015 +0100 +++ b/src/plugins/plugin_xep_0065.py Wed Nov 11 18:19:49 2015 +0100 @@ -63,13 +63,12 @@ from twisted.internet import protocol from twisted.internet import reactor from twisted.internet import error as internet_error -from twisted.words.protocols.jabber import jid, client as jabber_client from twisted.words.protocols.jabber import error as jabber_error +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import xmlstream from twisted.protocols.basic import FileSender -from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure -from sat.core.exceptions import ProfileNotInCacheError from collections import namedtuple import struct import hashlib @@ -432,7 +431,6 @@ return None def _makeRequest(self): - # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) hash_ = self._session_hash request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) self.transport.write(request) @@ -464,13 +462,8 @@ self.loseConnection() return - # if self.factory.proxy: - # self.state = STATE_READY - # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) - # else: self.state = STATE_READY self.connection.callback(None) - # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error: # The buffer is probably not complete, we need to wait more @@ -489,9 +482,6 @@ .format(host=self.transport.getPeer().host)) return self._session_hash = addr - # self.sid, self.profile = self.factory.hash_profiles_map[addr] - # client = self.factory.host.getClient(self.profile) - # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) def startTransfer(self): @@ -546,11 +536,6 @@ def connectionLost(self, reason): log.debug(u"Socks5 connection lost: {}".format(reason.value)) - # self.transport.unregisterProducer() - # if self.peersock is not None: - # self.peersock.peersock = None - # self.peersock.transport.unregisterProducer() - # self.peersock = None if self.state != STATE_READY: self.connection.errback(reason) if self.server_mode : @@ -620,7 +605,6 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): def __init__(self, parent, session_hash, profile): """Init the Client Factory @@ -635,13 +619,6 @@ self._protocol_instance = None self.connector = None self._discarded = False - # self.data = stream_data[sid] - # self.sid = sid - # self.iq_id = iq_id - # self.activateCb = activateCb - # self.finishedCb = finishedCb - # self.proxy = proxy - # self.profile = profile def discard(self): """Disconnect the client @@ -671,7 +648,6 @@ self.getSession()[DEFER_KEY].callback(None) else: self.getSession()[DEFER_KEY].errback(reason) - # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful def buildProtocol(self, addr): log.debug(("Socks 5 client connection started")) @@ -719,7 +695,7 @@ def profileConnected(self, profile): client = self.host.getClient(profile) - client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) + client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) client._s5b_sessions = {} def getSessionHash(self, from_jid, to_jid, sid): @@ -776,7 +752,7 @@ notFound(server) iq_elt = client.IQ('get') iq_elt['to'] = proxy.full() - iq_elt.addElement('query', NS_BS) + iq_elt.addElement((NS_BS, 'query')) try: result_elt = yield iq_elt.send() @@ -914,6 +890,14 @@ return defers_list def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): + """Get best candidate (according to priority) which can connect + + @param candidates(iterable[Candidate]): candidates to test + @param session_hash(unicode): hash of the session + hash is the same as hostname computer in XEP-0065 ยง 5.3.2 #1 + @param profile: %(doc_profile)s + @return (D(None, Candidate)): best candidate or None if none can connect + """ defer_candidates = None def connectionCb(candidate, profile): @@ -947,7 +931,7 @@ def _timeOut(self, sid, client): """Delecte current_stream id, called after timeout - @param id: id of client.xep_0065_current_stream""" + @param id: id of client.xep_0065_sid_session""" log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( sid=sid, profile=client.profile)) self._killSession(sid, client, u"TIMEOUT") @@ -961,7 +945,7 @@ else, will be used to call failure_cb """ try: - session = client.xep_0065_current_stream[sid] + session = client.xep_0065_sid_session[sid] except KeyError: log.warning(_("kill id called on a non existant id")) return @@ -976,7 +960,7 @@ if session['timer'].active(): session['timer'].cancel() - del client.xep_0065_current_stream[sid] + del client.xep_0065_sid_session[sid] # FIXME: to check try: @@ -1004,105 +988,73 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s + @return (D): Deferred fired when session is finished """ client = self.host.getClient(profile) - session_data = self._createSession(file_obj, to_jid, sid, client.profile) - - session_data["to"] = to_jid - session_data["xmlstream"] = client.xmlstream - hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) + session_data = self._createSession(file_obj, to_jid, sid, True, client.profile) - self.hash_profiles_map[hash_] = (sid, profile) - - iq_elt = jabber_client.IQ(client.xmlstream, 'set') - iq_elt["from"] = client.jid.full() - iq_elt["to"] = to_jid.full() - query_elt = iq_elt.addElement('query', NS_BS) - query_elt['mode'] = 'tcp' - query_elt['sid'] = sid + session_data[client] = client - #first streamhost: direct connection - streamhost = query_elt.addElement('streamhost') - streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") - streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") - streamhost['jid'] = client.jid.full() + def gotCandidates(candidates): + session_data['candidates'] = candidates + iq_elt = client.IQ() + iq_elt["from"] = client.jid.full() + iq_elt["to"] = to_jid.full() + query_elt = iq_elt.addElement((NS_BS, 'query')) + query_elt['mode'] = 'tcp' + query_elt['sid'] = sid - #second streamhost: mediated connection, using proxy - streamhost = query_elt.addElement('streamhost') - streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) - streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) - streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + for candidate in candidates: + streamhost = query_elt.addElement('streamhost') + streamhost['host'] = candidate.host + streamhost['port'] = str(candidate.port) + streamhost['jid'] = candidate.jid.full() - iq_elt.addCallback(self._IQOpen, session_data, client) - iq_elt.send() + d = iq_elt.send() + args = [session_data, client] + d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) + + self.getCandidates(profile).addCallback(gotCandidates) return session_data[DEFER_KEY] - def _IQOpen(self, session_data, client, iq_elt): + def _IQNegotiationCb(self, iq_elt, session_data, client): """Called when the result of open iq is received @param session_data(dict): data of the session @param client: %(doc_client)s @param iq_elt(domish.Element): <iq> result """ - sid = session_data['id'] - if iq_elt["type"] == "error": - log.warning(_("Socks5 transfer failed")) - # FIXME: must clean session - return - try: - session_data = client.xep_0065_current_stream[sid] - file_obj = session_data["file_obj"] - timer = session_data["timer"] - except KeyError: - raise exceptions.InternalError - - timer.reset(TIMEOUT) - - query_elt = iq_elt.elements(NS_BS, 'query').next() - streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) - - if not streamhost_elts: - log.warning(_("No streamhost found in stream query")) + query_elt = iq_elt.elements(NS_BS, 'query').next() + streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() + except StopIteration: + log.warning(u"No streamhost found in stream query") # FIXME: must clean session return - # FIXME: must be cleaned ! - - streamhost_jid = streamhost_elts[0]['jid'] - if streamhost_jid != client.jid.full(): - log.debug(_("A proxy server is used")) - proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) - proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) - proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) - if proxy_jid != streamhost_jid: - log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) - return - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) - reactor.connectTCP(proxy_host, int(proxy_port), factory) - else: - session_data["start_transfer_cb"](file_obj) # We now activate the stream + streamhost_jid = jid.JID(streamhost_used_elt['jid']) + try: + candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() + except StopIteration: + log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) + return - def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): - log.debug(_("activating stream")) - client = self.host.getClient(profile) - session_data = client.xep_0065_current_stream[sid] + if candidate.type == XEP_0065.TYPE_PROXY: + log.info(u"A Socks5 proxy is used") + d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) + d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) + d.addErrback(self._activationEb) + else: + d = defer.succeed(None) - iq_elt = client.IQ(client.xmlstream, 'set') - iq_elt["from"] = client.jid.full() - iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) - query_elt = iq_elt.addElement('query', NS_BS) - query_elt['sid'] = sid - query_elt.addElement('activate', content=session_data['to'].full()) - iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) - iq_elt.send() + d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) - def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): - if iq_elt['type'] == 'error': - log.warning(_("Can't activate the proxy stream")) - return - else: - start_transfer_cb(file_obj) + def _activationEb(self, failure): + log.warning(u"Proxy activation error: {}".format(failure.value)) + + def _IQNegotiationEb(self, stanza_err, session_data, client): + log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition)) + # FIXME: must clean session def createSession(self, *args, **kwargs): """like [_createSession] but return the session deferred instead of the whole session @@ -1111,26 +1063,34 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, file_obj, to_jid, sid, profile): + def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): """Called when a bytestream is imminent @param file_obj(file): File object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id + @param initiator(bool): if True, this session is create by initiator @param profile: %(doc_profile)s @return (dict): session data """ client = self.host.getClient(profile) - if sid in client.xep_0065_current_stream: + if sid in client.xep_0065_sid_session: raise exceptions.ConflictError(u'A session with this id already exists !') - session_data = client.xep_0065_current_stream[sid] = \ + if requester: + session_hash = getSessionHash(client.jid, to_jid, sid) + session_data = self._registerHash(session_hash, file_obj, profile) + else: + session_hash = getSessionHash(to_jid, client.jid, sid) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: defer.Deferred(), + } + client.xep_0065_sid_session[sid] = session_data + session_data.update( {'id': sid, - DEFER_KEY: defer.Deferred(), - 'to': to_jid, - 'file_obj': file_obj, - 'seq': -1, # FIXME: to check - 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), - } + 'peer_jid': to_jid, + 'file': file_obj, + 'hash': session_hash, + }) return session_data @@ -1156,7 +1116,7 @@ return client._s5b_sessions[session_hash] def registerHash(self, *args, **kwargs): - """like [_registerHash] but resutrn the session deferred instead of the whole session + """like [_registerHash] but resturn the session deferred instead of the whole session session deferred is fired when transfer is finished """ return self._registerHash(*args, **kwargs)[DEFER_KEY] @@ -1195,91 +1155,62 @@ return session_data def streamQuery(self, iq_elt, profile): - """Get file using byte stream""" - log.debug(_("BS stream query")) + log.debug(u"BS stream query") client = self.host.getClient(profile) - if not client: - raise ProfileNotInCacheError - - xmlstream = client.xmlstream - iq_elt.handled = True - query_elt = iq_elt.firstChildElement() - sid = query_elt.getAttribute("sid") - streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) - if not sid in client.xep_0065_current_stream: - log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid)) - self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) - return + query_elt = iq_elt.elements(NS_BS, 'query').next() + try: + sid = query_elt['sid'] + except KeyError: + log.warning(u"Invalid bystreams request received") + return client.sendError(iq_elt, "bad-request") - client.xep_0065_current_stream[sid]['timer'].cancel() - client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) - client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream - + streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) if not streamhost_elts: - log.warning(_(u"No streamhost found in stream query %s" % sid)) - self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) - return + return client.sendError(iq_elt, "bad-request") - streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case - sh_host = streamhost_elt.getAttribute("host") - sh_port = streamhost_elt.getAttribute("port") - sh_jid = streamhost_elt.getAttribute("jid") - if not sh_host or not sh_port or not sh_jid: - log.warning(_("incomplete streamhost element")) - self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) - return + try: + session_data = client.xep_0065_sid_session[sid] + except KeyError: + log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) + return client.sendError(iq_elt, 'not-acceptable') - client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) - - log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) - reactor.connectTCP(sh_host, int(sh_port), factory) + peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) - def activateStream(self, sid, iq_id, profile): - client = self.host.getClient(profile) - log.debug(_("activating stream")) - result = domish.Element((None, 'iq')) - session_data = client.xep_0065_current_stream[sid] - result['type'] = 'result' - result['id'] = iq_id - result['from'] = session_data["to"].full() - result['to'] = session_data["from"].full() - query = result.addElement('query', NS_BS) - query['sid'] = sid - streamhost = query.addElement('streamhost-used') - streamhost['jid'] = session_data["streamhost"][2] - session_data["xmlstream"].send(result) + candidates = [] + nb_sh = len(streamhost_elts) + for idx, sh_elt in enumerate(streamhost_elts): + try: + host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) + except KeyError: + log.warning(u"malformed streamhost element") + return client.sendError(iq_elt, "bad-request") + priority = nb_sh - idx + if jid_.userhostJID() != peer_jid.userhostJID(): + type_ = XEP_0065.TYPE_PROXY + else: + type_ = XEP_0065.TYPE_DIRECT + candidates.append(Candidate(host, port, type_, priority, jid_)) - def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): - """Not acceptable error used when the stream is not expected or something is going wrong - @param iq_id: IQ id - @param to_jid: addressee - @param xmlstream: XML stream to use to send the error""" - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - error_el = result.addElement('error') - error_el['type'] = 'modify' - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable')) - xmlstream.send(result) + for candidate in candidates: + log.info(u"Candidate proposed: {}".format(candidate)) + + d = self.getBestCandidate(candidates, session_data['hash'], profile) + d.addCallback(self._ackStream, iq_elt, session_data, client) - def sendBadRequestError(self, iq_id, to_jid, xmlstream): - """Not acceptable error used when the stream is not expected or something is going wrong - @param iq_id: IQ id - @param to_jid: addressee - @param xmlstream: XML stream to use to send the error""" - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - error_el = result.addElement('error') - error_el['type'] = 'cancel' - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) - xmlstream.send(result) + def _ackStream(self, candidate, iq_elt, session_data, client): + if candidate is None: + log.info("No streamhost candidate worked, we have to end negotiation") + return client.sendError(iq_elt, 'item-not-found') + log.debug(u"activating stream") + result_elt = xmlstream.toResponse(iq_elt, 'result') + query_elt = result_elt.addElement((NS_BS, 'query')) + query_elt['sid'] = session_data['id'] + streamhost_used_elt = query_elt.addElement('streamhost-used') + streamhost_used_elt['jid'] = candidate.jid.full() + client.xmlstream.send(result_elt) class XEP_0065_handler(XMPPHandler):
--- a/src/plugins/plugin_xep_0095.py Wed Nov 11 18:19:49 2015 +0100 +++ b/src/plugins/plugin_xep_0095.py Wed Nov 11 18:19:49 2015 +0100 @@ -21,23 +21,14 @@ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) -from twisted.words.xish import domish -from twisted.words.protocols.jabber import client +from sat.core import exceptions +from twisted.words.protocols.jabber import xmlstream +from twisted.words.protocols.jabber import error +from zope.interface import implements +from wokkel import disco +from wokkel import iwokkel import uuid -from zope.interface import implements - -try: - from twisted.words.protocols.xmlstream import XMPPHandler -except ImportError: - from wokkel.subprotocols import XMPPHandler - -from wokkel import disco, iwokkel - -IQ_SET = '/iq[@type="set"]' -NS_SI = 'http://jabber.org/protocol/si' -SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' -SI_PROFILE_HEADER = "http://jabber.org/protocol/si/profile/" PLUGIN_INFO = { "name": "XEP 0095 Plugin", @@ -50,6 +41,13 @@ } +IQ_SET = '/iq[@type="set"]' +NS_SI = 'http://jabber.org/protocol/si' +SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' +SI_PROFILE_HEADER = "http://jabber.org/protocol/si/profile/" +SI_ERROR_CONDITIONS = ('bad-profile', 'no-valid-streams') + + class XEP_0095(object): def __init__(self, host): @@ -62,129 +60,105 @@ def registerSIProfile(self, si_profile, callback): """Add a callback for a SI Profile - param si_profile: SI profile name (e.g. file-transfer) - param callback: method to call when the profile name is asked""" + + @param si_profile(unicode): SI profile name (e.g. file-transfer) + @param callback(callable): method to call when the profile name is asked + """ self.si_profiles[si_profile] = callback - def streamInit(self, iq_el, profile): + def unregisterSIProfile(self, si_profile): + try: + del self.si_profiles[si_profile] + except KeyError: + log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile)) + + def streamInit(self, iq_elt, profile): """This method is called on stream initiation (XEP-0095 #3.2) - @param iq_el: IQ element + + @param iq_elt: IQ element @param profile: %(doc_profile)s""" log.info(_("XEP-0095 Stream initiation")) - iq_el.handled = True - si_el = iq_el.firstChildElement() - si_id = si_el.getAttribute('id') - si_mime_type = iq_el.getAttribute('mime-type', 'application/octet-stream') - si_profile = si_el.getAttribute('profile') + iq_elt.handled = True + si_elt = iq_elt.elements(NS_SI, 'si').next() + si_id = si_elt['id'] + si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream') + si_profile = si_elt['profile'] si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile if si_profile_key in self.si_profiles: #We know this SI profile, we call the callback - self.si_profiles[si_profile_key](iq_el['id'], iq_el['from'], si_id, si_mime_type, si_el, profile) + self.si_profiles[si_profile_key](iq_elt, si_id, si_mime_type, si_elt, profile) else: #We don't know this profile, we send an error - self.sendBadProfileError(iq_el['id'], iq_el['from'], profile) + self.sendError(iq_elt, 'bad-profile', profile) - def sendRejectedError(self, iq_id, to_jid, reason='Offer Declined', profile=C.PROF_KEY_NONE): - """Helper method to send when the stream is rejected - @param iq_id: IQ id - @param to_jid: recipient - @param reason: human readable reason (string) - @param profile: %(doc_profile)s""" - self.sendError(iq_id, to_jid, 403, 'cancel', {'text': reason}, profile=profile) - - def sendBadProfileError(self, iq_id, to_jid, profile): - """Helper method to send when we don't know the SI profile - @param iq_id: IQ id - @param to_jid: recipient - @param profile: %(doc_profile)s""" - self.sendError(iq_id, to_jid, 400, 'modify', profile=profile) + def sendError(self, request, condition, profile): + """Send IQ error as a result - def sendBadRequestError(self, iq_id, to_jid, profile): - """Helper method to send when we don't know the SI profile - @param iq_id: IQ id - @param to_jid: recipient - @param profile: %(doc_profile)s""" - self.sendError(iq_id, to_jid, 400, 'cancel', profile=profile) + @param request(domish.Element): original IQ request + @param condition(str): error condition + @param profile: %(doc_profile)s + """ + client = self.host.getClient(profile) + if condition in SI_ERROR_CONDITIONS: + si_condition = condition + condition = 'bad-request' + else: + si_condition = None - def sendFailedError(self, iq_id, to_jid, profile): - """Helper method to send when we transfer failed - @param iq_id: IQ id - @param to_jid: recipient - @param profile: %(doc_profile)s""" - self.sendError(iq_id, to_jid, 500, 'cancel', {'custom': 'failed'}, profile=profile) # as there is no lerror code for failed transfer, we use 500 (undefined-condition) + iq_error_elt = error.StanzaError(condition).toResponse(request) + if si_condition is not None: + iq_error_elt.error.addElement((NS_SI, si_condition)) + + client.xmlstream.send(iq_error_elt) - def sendError(self, iq_id, to_jid, err_code, err_type='cancel', data={}, profile=C.PROF_KEY_NONE): - """Send IQ error as a result - @param iq_id: IQ id - @param to_jid: recipient - @param err_code: error err_code (see XEP-0095 #4.2) - @param err_type: one of cancel, modify - @param data: error specific data (dictionary) + def acceptStream(self, iq_elt, feature_elt, misc_elts=None, profile=C.PROF_KEY_NONE): + """Send the accept stream initiation answer + + @param iq_elt(domish.Element): initial SI request + @param feature_elt(domish.Element): 'feature' element containing stream method to use + @param misc_elts(list[domish.Element]): list of elements to add @param profile: %(doc_profile)s """ - client_ = self.host.getClient(profile) - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - error_el = result.addElement('error') - error_el['err_code'] = str(err_code) - error_el['type'] = err_type - if err_code == 400 and err_type == 'cancel': - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) - error_el.addElement((NS_SI, 'no-valid-streams')) - elif err_code == 400 and err_type == 'modify': - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) - error_el.addElement((NS_SI, 'bad-profile')) - elif err_code == 403 and err_type == 'cancel': - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'forbidden')) - if 'text' in data: - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'text'), content=data['text']) - elif err_code == 500 and err_type == 'cancel': - condition_el = error_el.addElement((NS_SI, 'undefined-condition')) - if 'custom' in data and data['custom'] == 'failed': - condition_el.addContent('Stream failed') - - client_.xmlstream.send(result) + log.info(_("sending stream initiation accept answer")) + if misc_elts is None: + misc_elts = [] + client = self.host.getClient(profile) + result_elt = xmlstream.toResponse(iq_elt, 'result') + si_elt = result_elt.addElement((NS_SI, 'si')) + si_elt.addChild(feature_elt) + for elt in misc_elts: + si_elt.addChild(elt) + client.xmlstream.send(result_elt) - def acceptStream(self, iq_id, to_jid, feature_elt, misc_elts=[], profile=C.PROF_KEY_NONE): - """Send the accept stream initiation answer - @param iq_id: IQ id - @param feature_elt: domish element 'feature' containing stream method to use - @param misc_elts: list of domish element to add - @param profile: %(doc_profile)s""" - _client = self.host.getClient(profile) - assert(_client) - log.info(_("sending stream initiation accept answer")) - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - si = result.addElement('si', NS_SI) - si.addChild(feature_elt) - for elt in misc_elts: - si.addChild(elt) - _client.xmlstream.send(result) + def _parseOfferResult(self, iq_elt): + try: + si_elt = iq_elt.elements(NS_SI, "si").next() + except StopIteration: + log.warning(u"No <si/> element found in result while expected") + raise exceptions.DataError + return (iq_elt, si_elt) + + + def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile=C.PROF_KEY_NONE): + """Propose a stream initiation - def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile_key=C.PROF_KEY_NONE): - """Propose a stream initiation - @param to_jid: recipient (JID) - @param si_profile: Stream initiation profile (XEP-0095) - @param feature_elt: feature domish element, according to XEP-0020 - @param misc_elts: list of domish element to add for this profile - @param mime_type: stream mime type + @param to_jid(jid.JID): recipient + @param si_profile(unicode): Stream initiation profile (XEP-0095) + @param feature_elt(domish.Element): feature element, according to XEP-0020 + @param misc_elts(list[domish.Element]): list of elements to add + @param mime_type(unicode): stream mime type @param profile: %(doc_profile)s - @return: session id, offer""" - current_jid, xmlstream = self.host.getJidNStream(profile_key) - if not xmlstream: - log.error(_('Asking for an non-existant or not connected profile')) - return "" - - offer = client.IQ(xmlstream, 'set') + @return (tuple): tuple with: + - session id (unicode) + - (D(domish_elt, domish_elt): offer deferred which returl a tuple + with iq_elt and si_elt + """ + client = self.host.getClient(profile) + offer = client.IQ() sid = str(uuid.uuid4()) log.debug(_(u"Stream Session ID: %s") % offer["id"]) - offer["from"] = current_jid.full() + offer["from"] = client.jid.full() offer["to"] = to_jid.full() si = offer.addElement('si', NS_SI) si['id'] = sid @@ -194,11 +168,12 @@ si.addChild(elt) si.addChild(feature_elt) - offer.send() - return sid, offer + offer_d = offer.send() + offer_d.addCallback(self._parseOfferResult) + return sid, offer_d -class XEP_0095_handler(XMPPHandler): +class XEP_0095_handler(xmlstream.XMPPHandler): implements(iwokkel.IDisco) def __init__(self, plugin_parent): @@ -209,7 +184,7 @@ self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, profile=self.parent.profile) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): - return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature("http://jabber.org/protocol/si/profile/%s" % profile_name) for profile_name in self.plugin_parent.si_profiles] + return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []
--- a/src/plugins/plugin_xep_0096.py Wed Nov 11 18:19:49 2015 +0100 +++ b/src/plugins/plugin_xep_0096.py Wed Nov 11 18:19:49 2015 +0100 @@ -21,18 +21,17 @@ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) +from sat.core import exceptions from twisted.words.xish import domish from twisted.words.protocols.jabber import jid -from twisted.words.protocols import jabber +from twisted.words.protocols.jabber import error import os -from twisted.internet import reactor -from twisted.python import failure + -from wokkel import data_form - +NS_SI_FT = "http://jabber.org/protocol/si/profile/file-transfer" IQ_SET = '/iq[@type="set"]' -PROFILE_NAME = "file-transfer" -PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME +SI_PROFILE_NAME = "file-transfer" +SI_PROFILE = "http://jabber.org/protocol/si/profile/" + SI_PROFILE_NAME PLUGIN_INFO = { "name": "XEP-0096 Plugin", @@ -53,84 +52,125 @@ self.host = host self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed - self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) - host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile) + self._f = self.host.plugins["FILE"] + self._si = self.host.plugins["XEP-0095"] + self._si.registerSIProfile(SI_PROFILE_NAME, self._transferRequest) + host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile) + + def unload(self): + self._si.unregisterSIProfile(SI_PROFILE_NAME) + + def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE): + """Send a bad-request error - def profileConnected(self, profile): - client = self.host.getClient(profile) - client._xep_0096_waiting_for_approval = {} # key = id, value = [transfer data, IdelayedCall Reactor timeout, - # current stream method, [failed stream methods], profile] + @param iq_elt(domish.Element): initial <IQ> element of the SI request + @param message(None, unicode): informational message to display in the logs + @param profile: %(doc_profile)s + """ + if message is not None: + log.warning(message) + self._si.sendError(iq_elt, 'bad-request', profile) - def _kill_id(self, approval_id, profile): - """Delete a waiting_for_approval id, called after timeout - @param approval_id: id of _xep_0096_waiting_for_approval""" - log.info(_("SI File Transfer: TimeOut reached for id %s") % approval_id) + def _parseRange(self, parent_elt, file_size): + """find and parse <range/> element + + @param parent_elt(domish.Element): direct parent of the <range/> element + @return (tuple[bool, int, int]): a tuple with + - True if range is required + - range_offset + - range_length + """ try: - client = self.host.getClient(profile) - del client._xep_0096_waiting_for_approval[approval_id] - except KeyError: - log.warning(_("kill id called on a non existant approval id")) + range_elt = parent_elt.elements(NS_SI_FT, 'range').next() + except StopIteration: + range_ = False + range_offset = None + range_length = None + else: + range_ = True - def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): + try: + range_offset = int(range_elt['offset']) + except KeyError: + range_offset = 0 + + try: + range_length = int(range_elt['length']) + except KeyError: + range_length = file_size + + if range_offset != 0 or range_length != file_size: + raise NotImplementedError # FIXME + + return range_, range_offset, range_length + + def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile): """Called when a file transfer is requested - @param iq_id: id of the iq request - @param from_jid: jid of the sender - @param si_id: Stream Initiation session id - @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown) - @param si_el: domish.Element of the request - @param profile: %(doc_profile)s""" + + @param iq_elt(domish.Element): initial <IQ> element of the SI request + @param si_id(unicode): Stream Initiation session id + @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) + @param si_elt(domish.Element): request + @param profile: %(doc_profile)s + """ log.info(_("XEP-0096 file transfer requested")) - log.debug(si_el.toXml()) - client = self.host.getClient(profile) - filename = "" - file_size = "" - file_date = None - file_hash = None - file_desc = "" - can_range = False - file_elts = filter(lambda elt: elt.name == 'file', si_el.elements()) - feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) + peer_jid = jid.JID(iq_elt['from']) + + try: + file_elt = si_elt.elements(NS_SI_FT, "file").next() + except StopIteration: + return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile) + + try: + feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) + except exceptions.NotFound: + return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile) + + try: + filename = file_elt["name"] + file_size = int(file_elt["size"]) + except (KeyError, ValueError): + return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) + + file_date = file_elt.getAttribute("date") + file_hash = file_elt.getAttribute("hash") + + log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size)) - if file_elts: - file_el = file_elts[0] - filename = file_el["name"] - file_size = file_el["size"] - file_date = file_el.getAttribute("date", "") - file_hash = file_el.getAttribute("hash", "") - log.info(_(u"File proposed: name=[%(name)s] size=%(size)s") % {'name': filename, 'size': file_size}) - for file_child_el in file_el.elements(): - if file_child_el.name == "desc": - file_desc = unicode(file_child_el) - elif file_child_el.name == "range": - can_range = True - else: - log.warning(_("No file element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) - return + try: + file_desc = unicode(file_elt.elements(NS_SI_FT, 'desc').next()) + except StopIteration: + file_desc = '' + + try: + range_, range_offset, range_length = self._parseRange(file_elt, file_size) + except ValueError: + return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) - if feature_elts: - feature_el = feature_elts[0] - data_form.Form.fromElement(feature_el.firstChildElement()) - try: - stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) - except KeyError: - log.warning(_("No stream method found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) - return - if not stream_method: - log.warning(_("Can't find a valid stream method")) - self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) - return + try: + stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) + except KeyError: + return self._badRequest(iq_elt, "No stream method found", profile) + + if stream_method: + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + plugin = self.host.plugins["XEP-0065"] + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + plugin = self.host.plugins["XEP-0047"] + else: + log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) else: - log.warning(_("No feature element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) + log.warning(_("Can't find a valid stream method")) + self._si.sendError(iq_elt, 'not-acceptable', profile) return #if we are here, the transfer can start, we just need user's agreement - data = {"filename": filename, "id": iq_id, "from": from_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, "can_range": str(can_range)} - client._xep_0096_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id, profile), stream_method, []] + data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, + "range": range_, "range_offset": range_offset, "range_length": range_length, + "si_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} - self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB, profile) + d = self._f.getDestDir(peer_jid, data, data, profile) + d.addCallback(self.confirmationCb, iq_elt, data, profile) def _getFileObject(self, dest_path, can_range=False): """Open file, put file pointer to the end if the file if needed @@ -139,178 +179,176 @@ @return: File Object""" return open(dest_path, "ab" if can_range else "wb") - def confirmationCB(self, sid, accepted, frontend_data, profile): + def confirmationCb(self, accepted, iq_elt, data, profile): """Called on confirmation answer - @param sid: file transfer session id - @param accepted: True if file transfer is accepted - @param frontend_data: data sent by frontend""" - client = self.host.getClient(profile) - data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] - can_range = data['can_range'] == "True" - range_offset = 0 - if accepted: - if timeout.active(): - timeout.cancel() - try: - dest_path = frontend_data['dest_path'] - except KeyError: - log.error(_('dest path not found in frontend_data')) - del client._xep_0096_waiting_for_approval[sid] - return - if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: - plugin = self.host.plugins["XEP-0065"] - elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: - plugin = self.host.plugins["XEP-0047"] - else: - log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) - del client._xep_0096_waiting_for_approval[sid] - return - file_obj = self._getFileObject(dest_path, can_range) - range_offset = file_obj.tell() - d = plugin.createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile) - d.addCallback(self._transferSucceeded, sid, file_obj, stream_method, profile) - d.addErrback(self._transferFailed, sid, file_obj, stream_method, profile) + @param accepted(bool): True if file transfer is accepted + @param iq_elt(domish.Element): initial SI request + @param data(dict): session data + @param profile: %(doc_profile)s + """ + if not accepted: + log.info(u"File transfer declined") + self._si.sendError(iq_elt, 'forbidden', profile) + return + # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] + # can_range = data['can_range'] == "True" + # range_offset = 0 + # if timeout.active(): + # timeout.cancel() + # try: + # dest_path = frontend_data['dest_path'] + # except KeyError: + # log.error(_('dest path not found in frontend_data')) + # del client._xep_0096_waiting_for_approval[sid] + # return + # if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + # plugin = self.host.plugins["XEP-0065"] + # elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + # plugin = self.host.plugins["XEP-0047"] + # else: + # log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) + # del client._xep_0096_waiting_for_approval[sid] + # return - #we can send the iq result - feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) - misc_elts = [] - misc_elts.append(domish.Element((PROFILE, "file"))) - if can_range: - range_elt = domish.Element((None, "range")) - range_elt['offset'] = str(range_offset) - #TODO: manage range length - misc_elts.append(range_elt) - self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) - else: - log.debug(_(u"Transfer [%s] refused") % sid) - self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) - del(client._xep_0096_waiting_for_approval[sid]) + # file_obj = self._getFileObject(dest_path, can_range) + # range_offset = file_obj.tell() + d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile) + d.addCallback(self._transferCb, data, profile) + d.addErrback(self._transferEb, data, profile) - def _transferSucceeded(self, dummy, sid, file_obj, stream_method, profile): - self.transferSucceeded(sid, file_obj, stream_method, profile) + #we can send the iq result + feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) + misc_elts = [] + misc_elts.append(domish.Element((SI_PROFILE, "file"))) + # if can_range: + # range_elt = domish.Element((None, "range")) + # range_elt['offset'] = str(range_offset) + # #TODO: manage range length + # misc_elts.append(range_elt) + self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile) - def transferSucceeded(self, dummy, sid, file_obj, stream_method, profile): + def _transferCb(self, dummy, data, profile): """Called by the stream method when transfer successfuly finished - @param id: stream id""" - client = self.host.getClient(profile) - file_obj.close() - log.info(_('Transfer %s successfuly finished') % sid) - del(client._xep_0096_waiting_for_approval[sid]) - def _transferFailed(self, sid, file_obj, stream_method, reason, profile): - self.transferFailed(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile) + @param data: session data + @param profile: %(doc_profile)s + """ + #TODO: check hash + data['file_obj'].close() + log.info(u'Transfer {si_id} successfuly finished'.format(**data)) - def transferFailed(self, failure, sid, file_obj, stream_method, profile): + def _transferEb(self, failure, data, profile): """Called when something went wrong with the transfer @param id: stream id + @param data: session data + @param profile: %(doc_profile)s + """ + log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.condition), **data)) + data['file_obj'].close() + + def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): + return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile) + + def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE): + """Send a file using XEP-0096 + + @param peer_jid(jid.JID): recipient + @param filepath(str): absolute path to the file to send + @param name(unicode): name of the file to send + name must not contain "/" characters + @param desc: description of the file + @param profile: %(doc_profile)s + @return: an unique id to identify the transfer """ client = self.host.getClient(profile) - data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] - log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % { - 'id': sid, - 's_method': stream_method, - 'reason': unicode(failure)}) - filepath = file_obj.name - file_obj.close() - os.remove(filepath) - #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session - log.warning(_("All stream methods failed, can't transfer the file")) - del(client._xep_0096_waiting_for_approval[sid]) + feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) + + file_transfer_elts = [] + + statinfo = os.stat(filepath) + file_elt = domish.Element((SI_PROFILE, 'file')) + file_elt['name'] = name or os.path.basename(filepath) + assert '/' not in file_elt['name'] + size = statinfo.st_size + file_elt['size'] = str(size) + if desc: + file_elt.addElement('desc', content=desc) + file_transfer_elts.append(file_elt) - def fileCb(self, filepath, sid, size, profile, IQ): - if IQ['type'] == "error": - stanza_err = jabber.error.exceptionFromStanza(IQ) - if stanza_err.code == '403' and stanza_err.condition == 'forbidden': - log.debug(_(u"File transfer refused by %s") % IQ['from']) - self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile) - else: - log.warning(_(u"Error during file transfer with %s") % IQ['from']) - self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile) + file_transfer_elts.append(domish.Element((None, 'range'))) + + sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile) + args = [filepath, sid, size, client] + offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) + return sid + + def _fileCb(self, result_tuple, filepath, sid, size, client): + iq_elt, si_elt = result_tuple + + try: + feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) + except exceptions.NotFound: + log.warning(u"No <feature/> element found in result while expected") return - si_elt = IQ.firstChildElement() - - if IQ['type'] != "result" or not si_elt or si_elt.name != "si": - log.error(_("Protocol error during file transfer")) - return - - feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) - if not feature_elts: - log.warning(_("No feature element")) - return - - choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) + choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elt, namespace=None) try: stream_method = choosed_options["stream-method"] except KeyError: - log.warning(_("No stream method choosed")) + log.warning(u"No stream method choosed") return - range_offset = 0 - # range_length = None - range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements()) - if range_elts: - range_elt = range_elts[0] - range_offset = range_elt.getAttribute("offset", 0) - # range_length = range_elt.getAttribute("length") + try: + file_elt = si_elt.elements(NS_SI_FT, "file").next() + except StopIteration: + pass + else: + range_, range_offset, range_length = self._parseRange(file_elt, size) if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: plugin = self.host.plugins["XEP-0065"] elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: plugin = self.host.plugins["XEP-0047"] else: - log.error(u"Invalid stream method received") + log.warning(u"Invalid stream method received") return - file_obj = open(filepath, 'r') - if range_offset: - file_obj.seek(range_offset) - d = plugin.startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile) - d.addCallback(self.sendSuccessCb, sid, file_obj, stream_method, profile) - d.addErrback(self.sendFailureCb, sid, file_obj, stream_method, profile) + file_obj = self._f.File(self.host, + filepath, + size=size, + profile=client.profile + ) + d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile) + d.addCallback(self._sendCb, sid, file_obj, client.profile) + d.addErrback(self._sendEb, sid, file_obj, client.profile) - def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): - """send a file using XEP-0096 - @to_jid: recipient - @filepath: absolute path to the file to send - @data: dictionnary with the optional following keys: - - "description": description of the file - @param profile_key: %(doc_profile_key)s - @return: an unique id to identify the transfer - """ - profile = self.host.memory.getProfileName(profile_key) - if not profile: - log.warning(_("Trying to send a file from an unknown profile")) - return "" - feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) - - file_transfer_elts = [] + def _fileEb(self, failure, filepath, sid, size, client): + if failure.check(error.StanzaError): + stanza_err = failure.value + if stanza_err.code == '403' and stanza_err.condition == 'forbidden': + from_s = stanza_err.stanza['from'] + log.info(u"File transfer refused by {}".format(from_s)) + self.host.bridge.newAlert(_("The contact {} has refused your file").format(from_s), _("File refused"), "INFO", client.profile) + else: + log.warning(_(u"Error during file transfer")) + self.host.bridge.newAlert(_(u"Something went wrong during the file transfer session intialisation: {reason}").format(reason=unicode(stanza_err.condition)), _("File transfer error"), "ERROR", client.profile) + elif failure.check(exceptions.DataError): + log.warning(u'Invalid stanza received') + else: + log.error(u'Error while proposing stream: {}'.format(failure)) - statinfo = os.stat(filepath) - file_elt = domish.Element((PROFILE, 'file')) - file_elt['name'] = os.path.basename(filepath) - size = file_elt['size'] = str(statinfo.st_size) - file_transfer_elts.append(file_elt) - - file_transfer_elts.append(domish.Element((None, 'range'))) - - sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile) - offer.addCallback(self.fileCb, filepath, sid, size, profile) - return sid - - def _sendSuccessCb(self, sid, file_obj, stream_method, profile): - self.sendSuccessCb(sid, file_obj, stream_method, profile) - - def sendSuccessCb(self, dummy, sid, file_obj, stream_method, profile): - log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]') - % {"sid": sid, "profile": profile}) + def _sendCb(self, dummy, sid, file_obj, profile): + log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( + sid=sid, + profile=profile)) file_obj.close() - def _sendFailureCb(self, sid, file_obj, stream_method, reason, profile): - self.sendFailureCb(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile) - - def sendFailureCb(self, failure, sid, file_obj, stream_method, profile): + def _sendEb(self, failure, sid, file_obj, profile): + log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( + sid=sid, + profile=profile, + reason=unicode(failure.condition), + )) file_obj.close() - log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': unicode(failure), 'profile': profile})