# HG changeset patch # User Goffi # Date 1317290971 -7200 # Node ID deeebf697d9a40c7eff4f78a56b900ef9a3da418 # Parent 41fdaeb005bca7e26b07f568074ccc2d3b06d9b8 plugins: plugin XEP-0096 update, use of XEP-0047 (IBB) diff -r 41fdaeb005bc -r deeebf697d9a src/plugins/plugin_xep_0096.py --- a/src/plugins/plugin_xep_0096.py Thu Sep 29 12:07:11 2011 +0200 +++ b/src/plugins/plugin_xep_0096.py Thu Sep 29 12:09:31 2011 +0200 @@ -19,36 +19,33 @@ along with this program. If not, see . """ -from logging import debug, info, error +from logging import debug, info, warning, error from twisted.words.xish import domish from twisted.internet import protocol from twisted.words.protocols.jabber import client, jid from twisted.words.protocols.jabber import error as jab_error -import os.path -from twisted.internet import reactor #FIXME best way ??? +import os, os.path +from twisted.internet import reactor import pdb from zope.interface import implements -try: - from twisted.words.protocols.xmlstream import XMPPHandler -except ImportError: - from wokkel.subprotocols import XMPPHandler - -from wokkel import disco, iwokkel +from wokkel import disco, iwokkel, data_form IQ_SET = '/iq[@type="set"]' NS_SI = 'http://jabber.org/protocol/si' SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]' +PROFILE_NAME = "file-transfer" +PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME PLUGIN_INFO = { "name": "XEP 0096 Plugin", "import_name": "XEP-0096", "type": "XEP", "protocols": ["XEP-0096"], -"dependencies": ["XEP-0065"], +"dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], "main": "XEP_0096", -"handler": "yes", +"handler": "no", "description": _("""Implementation of SI File Transfert""") } @@ -57,152 +54,248 @@ def __init__(self, host): info(_("Plugin XEP_0096 initialization")) self.host = host - self._waiting_for_approval = {} - host.bridge.addMethod("sendFile", ".plugin", in_sign='sss', out_sign='s', method=self.sendFile) - - def getHandler(self, profile): - return XEP_0096_handler(self) + self._waiting_for_approval = {} #key = id, value = [transfert data, IdelayedCall Reactor timeout, + # current stream method, [failed stream methods], profile] + self.managed_stream_m = [#self.host.plugins["XEP-0065"].NS_BS, + self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed + self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transfertRequest) + host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile) - def xep_96(self, IQ, profile): - info (_("XEP-0096 management")) - IQ.handled=True - SI_elem = IQ.firstChildElement() - debug(SI_elem.toXml()) + def _kill_id(self, approval_id): + """Delete a waiting_for_approval id, called after timeout + @param approval_id: id of _waiting_for_approval""" + info(_("SI File Transfert: TimeOut reached for id %s") % approval_id); + try: + del self._waiting_for_approval[approval_id] + except KeyError: + warning(_("kill id called on a non existant approval id")) + + def transfertRequest(self, from_jid, si_id, si_mime_type, si_el, profile): + """Called when a file transfert is requested + @param from_jid: jid of the sender + @param si_id: Stream Initiation session id + @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown) + @param si_el: domish.Element of the request + @param profile: %(doc_profile)s""" + info (_("XEP-0096 file transfert requested")) + debug(si_el.toXml()) filename = "" file_size = "" - for element in SI_elem.elements(): - if element.name == "file": - info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':element['name'], 'size':element['size']}) - filename = element["name"] - file_size = element["size"] - elif element.name == "feature": - from_jid = IQ["from"] - self._waiting_for_approval[IQ["id"]] = (element, from_jid, file_size, profile) - data={ "filename":filename, "from":from_jid, "size":file_size } - self.host.askConfirmation(IQ["id"], "FILE_TRANSFERT", data, self.confirmationCB) + file_date = None + file_hash = None + file_desc = "" + can_range = False + file_elts = filter(lambda elt: elt.name == 'file', si_el.elements()) + feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) + + if file_elts: + file_el = file_elts[0] + filename = file_el["name"] + file_size = file_el["size"] + file_date = file_el.getAttribute("date", "") + file_hash = file_el.getAttribute("hash", "") + info (_("File proposed: name=[%(name)s] size=%(size)s") % {'name':filename, 'size':file_size}) + for file_child_el in file_el.elements(): + if file_child_el.name == "desc": + file_desc = unicode(file_child_el) + elif file_child_el.name == "range": + can_range = True + else: + warning(_("No file element found")) + self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + return + + if feature_elts: + feature_el = feature_elts[0] + form = data_form.Form.fromElement(feature_el.firstChildElement()) + try: + stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) + except KeyError: + warning(_("No stream method found")) + self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + return + if not stream_method: + warning(_("Can't find a valid stream method")) + self.host.plugins["XEP-0095"].sendFailedError(si_id, from_jid, profile) + return + else: + warning(_("No feature element found")) + self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + return + + #if we are here, the transfert can start, we just need user's agreement + data={ "filename":filename, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) } + self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] - def confirmationCB(self, id, accepted, data): - """Called on confirmation answer""" + self.host.askConfirmation(si_id, "FILE_TRANSFERT", data, self.confirmationCB) + + + def _getFileObject(self, dest_path, can_range = False): + """Open file, put file pointer to the end if the file if needed + @param dest_path: path of the destination file + @param can_range: True if the file pointer can be moved + @return: File Object""" + return open(dest_path, "ab" if can_range else "wb") + + def confirmationCB(self, id, accepted, frontend_data): + """Called on confirmation answer + @param id: file transfert session id + @param accepted: True if file transfert is accepted + @param frontend_data: data sent by frontend""" + data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[id] + can_range = data['can_range'] == "True" + range_offset = 0 if accepted: - data['size'] = self._waiting_for_approval[id][2] - self.host.plugins["XEP-0065"].setData(data, id) - self.approved(id) + if timeout.active(): + timeout.cancel() + try: + dest_path = frontend_data['dest_path'] + except KeyError: + error(_('dest path not found in frontend_data')) + del(self._waiting_for_approval[id]) + return + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + self.host.plugins["XEP-0065"].setData(data, id) + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + file_obj = self._getFileObject(dest_path, can_range) + range_offset = file_obj.tell() + self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) + else: + error(_("Unknown stream method, this should not happen at this stage, cancelling transfert")) + del(self._waiting_for_approval[id]) + return + + #we can send the iq result + feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) + misc_elts = [] + misc_elts.append(domish.Element((PROFILE, "file"))) + if can_range: + range_elt = domish.Element(('', "range")) + range_elt['offset'] = str(range_offset) + #TODO: manage range length + misc_elts.append(range_elt) + self.host.plugins["XEP-0095"].acceptStream(id, data['from'], feature_elt, misc_elts, profile) else: debug (_("Transfert [%s] refused"), id) + self.host.plugins["XEP-0095"].sendRejectedError (id, data['from'], profile=profile) del(self._waiting_for_approval[id]) - def approved(self, id): - """must be called when a file transfert has be accepted by client""" - debug (_("Transfert [%s] accepted"), id) + def _transferSucceeded(self, sid, file_obj, stream_method): + """Called by the stream method when transfert successfuly finished + @param id: stream id""" + file_obj.close() + info(_('Transfert %s successfuly finished') % sid) + del(self._waiting_for_approval[sid]) - if ( not self._waiting_for_approval.has_key(id) ): - error (_("Approved unknow id !")) - #TODO: manage this (maybe approved by several frontends) - else: - element, from_id, size, profile = self._waiting_for_approval[id] - del(self._waiting_for_approval[id]) - self.negociate(element, id, from_id, profile) + def _transferFailed(self, sid, file_obj, stream_method, reason): + """Called when something went wrong with the transfert + @param id: stream id + @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" + data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid, + 's_method': stream_method }) + filepath = file_obj.name + file_obj.close() + os.remove(filepath) + #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session + warning(_("All stream methods failed, can't transfert the file")) + self.host.plugins["XEP-0095"].sendFailedError(id, data['from'], profile) + del(self._waiting_for_approval[id]) - def negociate(self, feat_elem, id, to_jid, profile): - #TODO: put this in a plugin - #FIXME: over ultra mega ugly, need to be generic - client = self.host.getClient(profile) - assert(client) - info (_("Feature negociation")) - data = feat_elem.firstChildElement() - field = data.firstChildElement() - #FIXME: several options ! Q&D code for test only - option = field.firstChildElement() - value = option.firstChildElement() - if unicode(value) == "http://jabber.org/protocol/bytestreams": - #ugly, as usual, need to be entirely rewritten (just for test !) - result = domish.Element(('', 'iq')) - result['type'] = 'result' - result['id'] = id - result['to'] = to_jid - si = result.addElement('si', 'http://jabber.org/protocol/si') - file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') - feature = si.addElement('feature', 'http://jabber.org/protocol/feature-neg') - x = feature.addElement('x', 'jabber:x:data') - x['type'] = 'submit' - field = x.addElement('field') - field['var'] = 'stream-method' - value = field.addElement('value') - value.addContent('http://jabber.org/protocol/bytestreams') - client.xmlstream.send(result) - - def fileCB(self, answer, xmlstream, current_jid): - if answer['type']=="result": #FIXME FIXME FIXME ugly ugly ugly ! and temp FIXME FIXME FIXME + def fileCb(self, profile, filepath, sid, size, IQ): + if IQ['type'] == "error": + stanza_err = jab_error.exceptionFromStanza(IQ) + if stanza_err.code == '403' and stanza_err.condition == 'forbidden': + debug(_("File transfert refused by %s") % IQ['from']) + self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile) + else: + warning(_("Error during file transfert with %s") % IQ['from']) + self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile) + return + + si_elt = IQ.firstChildElement() + + if IQ['type'] != "result" or not si_elt or si_elt.name != "si": + error(_("Protocol error during file transfer")) + return + + feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) + if not feature_elts: + warning(_("No feature element")) + return + + choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) + try: + stream_method = choosed_options["stream-method"] + except KeyError: + warning(_("No stream method choosed")) + return + + range_offset = 0 + range_length = None + range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements()) + if range_elts: + range_elt = range_elts[0] + range_offset = range_elt.getAttribute("offset", 0) + range_length = range_elt.getAttribute("length") + + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: info("SENDING UGLY ANSWER") - offer=client.IQ(xmlstream,'set') + """offer=client.IQ(xmlstream,'set') offer["from"]=current_jid.full() offer["to"]=answer['from'] - query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams') + query=offer.addElement('query', 'http://jabber.org/protocol/ibb') + #query=offer.addElement('query', 'http://jabber.org/protocol/bytestreams') query['mode']='tcp' streamhost=query.addElement('streamhost') streamhost['host']=self.host.memory.getParamA("IP", "File Transfert") streamhost['port']=self.host.memory.getParamA("Port", "File Transfert") streamhost['jid']=current_jid.full() - offer.send() + offer.send()""" + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + file_obj = open(filepath, 'r') + if range_offset: + file_obj.seek(range_offset) + self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) + else: + warning(_("Invalid stream method received")) - def sendFile(self, to, filepath, profile_key='@DEFAULT@'): + def sendFile(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): """send a file using XEP-0096 - Return an unique id to identify the transfert + @to_jid: recipient + @filepath: absolute path to the file to send + @data: dictionnary with the optional following keys: + - "description": description of the file + @param profile_key: %(doc_profile_key)s + @return: an unique id to identify the transfert """ - current_jid, xmlstream = self.host.getJidNStream(profile_key) - if not xmlstream: - error (_('Asking for an non-existant or not connected profile')) + profile = self.host.memory.getProfileName(profile_key) + if not profile: + warning(_("Trying to send a file from an unknown profile")) return "" - debug ("sendfile (%s) to %s", filepath, to ) - print type(filepath), type(to) + feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) + + #self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size)) + + file_transfer_elts = [] statinfo = os.stat(filepath) - - offer=client.IQ(xmlstream,'set') - debug ("Transfert ID: %s", offer["id"]) - - self.host.plugins["XEP-0065"].sendFile(offer["id"], filepath, str(statinfo.st_size)) + file_elt = domish.Element((PROFILE, 'file')) + file_elt['name']=os.path.basename(filepath) + size = file_elt['size']=str(statinfo.st_size) + file_transfer_elts.append(file_elt) - offer["from"]=current_jid.full() - offer["to"]=jid.JID(to).full() - si=offer.addElement('si','http://jabber.org/protocol/si') - si["mime-type"]='text/plain' - si["profile"]='http://jabber.org/protocol/si/profile/file-transfer' - file = si.addElement('file', 'http://jabber.org/protocol/si/profile/file-transfer') - file['name']=os.path.basename(filepath) - file['size']=str(statinfo.st_size) - - ### - # FIXME: Ugly temporary hard coded implementation of XEP-0020 & XEP-0004, - # Need to be recoded elsewhere in a more generic way - ### + file_transfer_elts.append(domish.Element((None,'range'))) - feature=si.addElement('feature', "http://jabber.org/protocol/feature-neg") - x=feature.addElement('x', "jabber:x:data") - x['type']='form' - field=x.addElement('field') - field['type']='list-single' - field['var']='stream-method' - option = field.addElement('option') - value = option.addElement('value', content='http://jabber.org/protocol/bytestreams') + sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key = profile) + offer.addCallback(self.fileCb, profile, filepath, sid, size) + return sid - offer.addCallback(self.fileCB, current_jid = current_jid, xmlstream = xmlstream) - offer.send() - return offer["id"] #XXX: using IQ id as file transfert id seems OK as IQ id are required -class XEP_0096_handler(XMPPHandler): - implements(iwokkel.IDisco) - - def __init__(self, plugin_parent): - self.plugin_parent = plugin_parent - self.host = plugin_parent.host + def sendSuccessCb(self, sid, file_obj, stream_method): + info(_('Transfer %s successfuly finished') % sid) + file_obj.close() - def connectionInitialized(self): - self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.xep_96, profile = self.parent.profile) - - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): - return [disco.DiscoFeature(NS_SI)] - - def getDiscoItems(self, requestor, target, nodeIdentifier=''): - return [] - + def sendFailureCb(self, sid, file_obj, stream_method, reason): + file_obj.close() + warning(_('Transfert %(id)s failed with stream method %(s_method)s') % { 'id': sid, s_method: stream_method })