Mercurial > libervia-backend
changeset 401:b2caa2615c4c
jp roster name manegement + Pipe transfer
- added experimental pipe transfer protocol (based on xep-0096)
- jp now manage roster name to jid conversion (if an argument correspond to a roster name, it replace it with the corresponding jid)
- jp now add last known resource of when a resource is needed but we have a bare jid (e.g. for file or pipe transfer)
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 07 Oct 2011 00:25:15 +0200 |
parents | 22788653ae8d |
children | f03688bdb858 |
files | frontends/src/bridge/DBus.py frontends/src/jp/jp src/bridge/bridge_constructor/dbus_frontend_template.py src/plugins/plugin_exp_pipe.py |
diffstat | 4 files changed, 344 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/frontends/src/bridge/DBus.py Thu Oct 06 21:21:53 2011 +0200 +++ b/frontends/src/bridge/DBus.py Fri Oct 07 00:25:15 2011 +0200 @@ -199,6 +199,9 @@ def sendFile(self, to, path, data, profile_key='@DEFAULT@'): return self.db_plugin_iface.sendFile(to, path, data, profile_key) + def pipeOut(self, to, path, data, profile_key='@DEFAULT@'): + return self.db_plugin_iface.pipeOut(to, path, data, profile_key) + def findGateways(self, target, profile_key='@DEFAULT@'): return self.db_plugin_iface.findGateways(target, profile_key)
--- a/frontends/src/jp/jp Thu Oct 06 21:21:53 2011 +0200 +++ b/frontends/src/jp/jp Fri Oct 07 00:25:15 2011 +0200 @@ -56,6 +56,8 @@ import gobject from sat_frontends.bridge.DBus import DBusBridgeFrontend,BridgeExceptionNoService import tarfile +import tempfile +import shutil try: from progressbar import ProgressBar, Percentage, Bar, ETA, FileTransferSpeed except ImportError, e: @@ -105,28 +107,29 @@ help=_("Add a new line at the beginning of the input (usefull for ascii art ;))")) parser.add_option("--connect", action="store_true", default=False, help=_("Connect the profile before doing anything else")) + parser.add_option("--pipe-in", action="store_true", default=False, + help=_("Wait for the reception of a pipe stream")) + parser.add_option("--pipe-out", action="store_true", default=False, + help=_("Pipe a stream out ")) (self.options, args) = parser.parse_args() if len(args) < 1 and not self.options.wait_file: parser.error(_("You must specify the destination JID (Jabber ID)").encode('utf-8')) - if self.options.wait_file: + if self.options.wait_file or self.options.pipe_in: #several jid - self.dest_jids = args + self.dest_jids = [arg.decode('utf-8') for arg in args] else: #one dest_jid, other args are files - self.dest_jid = JID(args[-1]) - if not self.dest_jid.is_valid: - error (_("%s is not a valid JID !"), self.dest_jid) - exit(1) + self.dest_jid = JID(args[-1].decode('utf-8')) self.files = args[:-1] if not pbar_available and self.options.progress: self.options.progress = False error (_("Option progress is not available, deactivated.")) - if self.options.progress or self.options.wait_file or self.options.connect: + if self.options.progress or self.options.wait_file or self.options.connect or self.options.pipe_in: self.start_loop = True #We have to use loop for these options else: self.start_loop = False @@ -155,7 +158,35 @@ self.connected() + def check_jids(self): + """Check jids validity, transform roster name to corresponding jids""" + names2jid = {} + for contact in self.bridge.getContacts(self.options.profile): + _jid, attr, groups = contact + if attr.has_key("name"): + names2jid[attr["name"].lower()] = _jid + + def expandJid(jid): + _jid = jid.lower() + return unicode(names2jid[_jid] if _jid in names2jid else jid) + + def check(jid): + if not jid.is_valid: + error (_("%s is not a valid JID !"), self.dest_jid) + exit(1) + + try: + self.dest_jid = expandJid(self.dest_jid) + check(self.dest_jid) + except AttributeError: + pass + try: + for i in range(len(self.dest_jids)): + self.dest_jids[i] = expandJid(self.dest_jids[i]) + check(self.dest_jids[i]) + except AttributeError: + pass def send_stdin(self): """Send incomming data on stdin to jabber contact""" @@ -172,6 +203,19 @@ else: self.bridge.sendMessage(self.dest_jid, header + "".join(sys.stdin.readlines()), profile_key=self.profile) + + def pipe_out(self): + """Create named pipe, and send stdin to it""" + tmp_dir = tempfile.mkdtemp() + fifopath = os.path.join(tmp_dir,"pipe_out") + os.mkfifo(fifopath) + self.bridge.pipeOut(self._getFullJid(self.dest_jid), fifopath, {}, profile_key=self.profile) + f = open(fifopath, 'w+') + shutil.copyfileobj(sys.stdin, f) + f.close() + shutil.rmtree(tmp_dir) + + def send_files(self): """Send files to jabber contact""" @@ -183,6 +227,7 @@ error (_("[%s] is a dir ! Please send files inside or use compression") % file) exit(1) + full_dest_jid = self._getFullJid(self.dest_jid) if self.options.bz2: tmpfile = (basename(self.files[0]) or basename(dirname(self.files[0])) ) + '.tar.bz2' #FIXME: tmp, need an algorithm to find a good name/path if os.path.exists(tmpfile): @@ -199,19 +244,31 @@ bz2.close() info(_("OK !")) path = abspath(tmpfile) - self.transfer_id = self.bridge.sendFile(self.dest_jid, path, {}, profile_key=self.profile) + self.transfer_id = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) else: for file in self.files: path = abspath(file) - self.transfer_id = self.bridge.sendFile(self.dest_jid, path, {}, profile_key=self.profile) #FIXME: show progress only for last transfer_id + self.transfer_id = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) #FIXME: show progress only for last transfer_id + - #TODO: manage ProgressBar + def _getFullJid(self, param_jid): + """Return the full jid if possible (add last resource when find a bare jid""" + _jid = JID(param_jid) + if not _jid.resource: + #if the resource is not given, we try to add the last known resource + last_resource = self.bridge.getLastResource(param_jid, self.options.profile) + if last_resource: + return "%s/%s" % (_jid.short, last_resource) + return param_jid + def askConfirmation(self, type, id, data): """CB used for file transfer, accept files depending on parameters""" answer_data={} if type == "FILE_TRANSFER": - if self.dest_jids and not data['from'] in self.dest_jids: + if not self.options.wait_file: + return + if self.dest_jids and not JID(data['from']).short in [JID(_jid).short for _jid in self.dest_jids]: return #file is not sent by a filtered jid answer_data["dest_path"] = os.getcwd()+'/'+data['filename'] @@ -228,13 +285,30 @@ if not self.options.multiple and not self.options.progress: #we just accept one file self.loop.quit() + elif type == "PIPE_TRANSFER": + if not self.options.pipe_in: + return + if self.dest_jids and not JID(data['from']).short in [JID(_jid).short for _jid in self.dest_jids]: + return #pipe stream is not sent by a filtered jid + + tmp_dir = tempfile.mkdtemp() + fifopath = os.path.join(tmp_dir,"pipe_in") + answer_data["dest_path"] = fifopath + os.mkfifo(fifopath) + self.bridge.confirmationAnswer(id, True, answer_data) + f = open(fifopath, 'r') + shutil.copyfileobj(f, sys.stdout) + f.close() + shutil.rmtree(tmp_dir) + self.loop.quit() + def actionResult(self, type, id, data): #FIXME info (_("FIXME: actionResult not implemented")) - def wait_file(self): - """Wait for a file and write it on local dir""" + def confirmation_reply(self): + """Auto reply to confirmations requests""" self.bridge.register("askConfirmation", self.askConfirmation) def progressCB(self): @@ -269,19 +343,22 @@ def connected(self): """This is called when the profile is connected""" - if self.options.wait_file: - self.wait_file() + self.check_jids() + if self.options.wait_file or self.options.pipe_in: + self.confirmation_reply() else: - if not self.files: #we send message only if there are no files to send + if self.files: + self.send_files() + elif self.options.pipe_out: + self.pipe_out() + else: self.send_stdin() - else: - self.send_files() if self.options.progress: self.pbar = None gobject.timeout_add(10, self.progressCB) - if not self.options.progress and not self.options.wait_file: + if self.start_loop and not self.options.progress and not self.options.wait_file and not self.options.pipe_in: self.loop.quit()
--- a/src/bridge/bridge_constructor/dbus_frontend_template.py Thu Oct 06 21:21:53 2011 +0200 +++ b/src/bridge/bridge_constructor/dbus_frontend_template.py Fri Oct 07 00:25:15 2011 +0200 @@ -98,6 +98,9 @@ def sendFile(self, to, path, data, profile_key='@DEFAULT@'): return self.db_plugin_iface.sendFile(to, path, data, profile_key) + def pipeOut(self, to, path, data, profile_key='@DEFAULT@'): + return self.db_plugin_iface.pipeOut(to, path, data, profile_key) + def findGateways(self, target, profile_key='@DEFAULT@'): return self.db_plugin_iface.findGateways(target, profile_key)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_exp_pipe.py Fri Oct 07 00:25:15 2011 +0200 @@ -0,0 +1,242 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +""" +SAT plugin for managing pipes (experimental) +Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org) + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +""" + +from logging import debug, info, warning, error +from twisted.words.xish import domish +from twisted.internet import protocol +from twisted.words.protocols.jabber import client, jid +from twisted.words.protocols.jabber import error as jab_error +import os, os.path +from twisted.internet import reactor +import pdb + +from zope.interface import implements + +from wokkel import disco, iwokkel, data_form + +IQ_SET = '/iq[@type="set"]' +PROFILE_NAME = "pipe-transfer" +PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME + +PLUGIN_INFO = { +"name": "Pipe Plugin", +"import_name": "EXP-PIPE", +"type": "EXP", +"protocols": ["EXP-PIPE"], +"dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], +"main": "Exp_Pipe", +"handler": "no", +"description": _("""Implementation of SI Pipe Transfer""") +} + +class Exp_Pipe(): + """This is a modified version of XEP-0096 to work with named pipes instead of files""" + + def __init__(self, host): + info(_("Plugin Pipe initialization")) + self.host = host + self._waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, + # current stream method, [failed stream methods], profile] + self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, + self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed + self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) + host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) + + def _kill_id(self, approval_id): + """Delete a waiting_for_approval id, called after timeout + @param approval_id: id of _waiting_for_approval""" + info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id); + try: + del self._waiting_for_approval[approval_id] + except KeyError: + warning(_("kill id called on a non existant approval id")) + + def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): + """Called when a pipe transfer is requested + @param iq_id: id of the iq request + @param from_jid: jid of the sender + @param si_id: Stream Initiation session id + @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) + @param si_el: domish.Element of the request + @param profile: %(doc_profile)s""" + info (_("EXP-PIPE file transfer requested")) + debug(si_el.toXml()) + pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) + feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) + + if not pipe_elts: + warning(_("No pipe element found")) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) + return + + if feature_elts: + feature_el = feature_elts[0] + form = data_form.Form.fromElement(feature_el.firstChildElement()) + try: + stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) + except KeyError: + warning(_("No stream method found")) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) + return + if not stream_method: + warning(_("Can't find a valid stream method")) + self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) + return + else: + warning(_("No feature element found")) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) + return + + #if we are here, the transfer can start, we just need user's agreement + data={ "id": iq_id, "from":from_jid } + self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] + + self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB) + + + def confirmationCB(self, sid, accepted, frontend_data): + """Called on confirmation answer + @param sid: file transfer session id + @param accepted: True if file transfer is accepted + @param frontend_data: data sent by frontend""" + data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + if accepted: + if timeout.active(): + timeout.cancel() + try: + dest_path = frontend_data['dest_path'] + except KeyError: + error(_('dest path not found in frontend_data')) + del(self._waiting_for_approval[sid]) + return + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + file_obj = open(dest_path, 'w+') + self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + file_obj = open(dest_path, 'w+') + self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) + else: + error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) + del(self._waiting_for_approval[sid]) + return + + #we can send the iq result + feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method':stream_method}) + misc_elts = [] + misc_elts.append(domish.Element((PROFILE, "file"))) + self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) + else: + debug (_("Transfer [%s] refused"), sid) + self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) + del(self._waiting_for_approval[sid]) + + def _transferSucceeded(self, sid, file_obj, stream_method): + """Called by the stream method when transfer successfuly finished + @param id: stream id""" + file_obj.close() + info(_('Transfer %s successfuly finished') % sid) + del(self._waiting_for_approval[sid]) + + def _transferFailed(self, sid, file_obj, stream_method, reason): + """Called when something went wrong with the transfer + @param id: stream id + @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" + data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, + 's_method': stream_method }) + filepath = file_obj.name + file_obj.close() + #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session + warning(_("All stream methods failed, can't transfer the file")) + del(self._waiting_for_approval[sid]) + + def pipeCb(self, profile, filepath, sid, IQ): + if IQ['type'] == "error": + stanza_err = jab_error.exceptionFromStanza(IQ) + if stanza_err.code == '403' and stanza_err.condition == 'forbidden': + debug(_("Pipe transfer refused by %s") % IQ['from']) + self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) + else: + warning(_("Error during pipe stream transfer with %s") % IQ['from']) + self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) + return + + si_elt = IQ.firstChildElement() + + if IQ['type'] != "result" or not si_elt or si_elt.name != "si": + error(_("Protocol error during file transfer")) + return + + feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) + if not feature_elts: + warning(_("No feature element")) + return + + choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) + try: + stream_method = choosed_options["stream-method"] + except KeyError: + warning(_("No stream method choosed")) + return + + if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: + #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender + #file_obj = os.fdopen(fd, 'r') + file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it + self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) + elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: + #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender + #file_obj = os.fdopen(fd, 'r') + file_obj = open(filepath, 'r') #XXX: we have to be sure that filepath is well opened, as reading can block it + self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) + else: + warning(_("Invalid stream method received")) + + def pipeOut(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): + """send a file using EXP-PIPE + @to_jid: recipient + @filepath: absolute path to the named pipe to send + @data: dictionnary with the optional data + @param profile_key: %(doc_profile_key)s + @return: an unique id to identify the transfer + """ + profile = self.host.memory.getProfileName(profile_key) + if not profile: + warning(_("Trying to send a file from an unknown profile")) + return "" + feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) + + pipe_transfer_elts = [] + + pipe_elt = domish.Element((PROFILE, 'pipe')) + pipe_transfer_elts.append(pipe_elt) + + sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) + offer.addCallback(self.pipeCb, profile, filepath, sid) + return sid + + def sendSuccessCb(self, sid, file_obj, stream_method): + info(_('Transfer %s successfuly finished') % sid) + file_obj.close() + + def sendFailureCb(self, sid, file_obj, stream_method, reason): + file_obj.close() + warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method })