Mercurial > libervia-backend
diff src/plugins/plugin_exp_pipe.py @ 1669:697effba0310
plugin pipe: rewritten plugin as a jingle application. The current implentation can, in some cases, block the backend, and is experimental only. Improvments are needed in the future.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Nov 2015 02:04:43 +0100 |
parents | 3265a2639182 |
children | dbd7c79aab2b |
line wrap: on
line diff
--- a/src/plugins/plugin_exp_pipe.py Wed Nov 25 00:22:23 2015 +0100 +++ b/src/plugins/plugin_exp_pipe.py Wed Nov 25 02:04:43 2015 +0100 @@ -17,230 +17,126 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from sat.core.i18n import _ +from sat.core.i18n import _, D_ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) +from sat.tools import xml_tools from twisted.words.xish import domish from twisted.words.protocols.jabber import jid -from twisted.words.protocols import jabber -from twisted.internet import reactor +from twisted.internet import defer -from wokkel import data_form - -IQ_SET = '/iq[@type="set"]' -PROFILE_NAME = "pipe-transfer" -PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME +NS_PIPE = 'org.salut-a-toi/pipe' +SECURITY_LIMIT=30 PLUGIN_INFO = { "name": "Pipe Plugin", "import_name": "EXP-PIPE", "type": "EXP", "protocols": ["EXP-PIPE"], - "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"], + "dependencies": ["XEP-0166"], "main": "Exp_Pipe", "handler": "no", - "description": _("""Implementation of SI Pipe Transfer""") + "description": _("""Jingle Pipe Transfer experimental plugin""") } +CONFIRM = D_(u"{peer} wants to send you a pipe stream, do you accept ?") +CONFIRM_TITLE = D_(u"Pipe stream") class Exp_Pipe(object): - """This is a modified version of XEP-0096 to work with named pipes instead of files""" + """This non standard jingle application works with named pipes""" def __init__(self, host): log.info(_("Plugin Pipe initialization")) self.host = host - self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, - self.host.plugins["XEP-0047"].NAMESPACE] # Stream methods managed - self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) - host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) - - def profileConnected(self, profile): - client = self.host.getClient(profile) - client._pipe_waiting_for_approval = {} # key = id, value = [transfer data, IdelayedCall Reactor timeout, - # current stream method, [failed stream methods], profile] - - def _kill_id(self, approval_id, profile): - """Delete a waiting_for_approval id, called after timeout - @param approval_id: id of _pipe_waiting_for_approval""" - log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id) - try: - client = self.host.getClient(profile) - del client._pipe_waiting_for_approval[approval_id] - except KeyError: - log.warning(_(u"kill id called on a non existant approval id")) + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._j.registerApplication(NS_PIPE, self) + host.bridge.addMethod("pipeOut", ".plugin", in_sign='sss', out_sign='', method=self._pipeOut) - def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): - """Called when a pipe transfer is requested - @param iq_id: id of the iq request - @param from_jid: jid of the sender - @param si_id: Stream Initiation session id - @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown) - @param si_el: domish.Element of the request - @param profile: %(doc_profile)s""" - log.info(_("EXP-PIPE file transfer requested")) - log.debug(si_el.toXml()) - client = self.host.getClient(profile) - pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) - feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) - - if not pipe_elts: - log.warning(_(u"No pipe element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) - return - - if feature_elts: - feature_el = feature_elts[0] - data_form.Form.fromElement(feature_el.firstChildElement()) - try: - stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m) - except KeyError: - log.warning(_(u"No stream method found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) - return - if not stream_method: - log.warning(_(u"Can't find a valid stream method")) - self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) - return - else: - log.warning(_(u"No feature element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) - return - - #if we are here, the transfer can start, we just need user's agreement - data = {"id": iq_id, "from": from_jid} - client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] - - self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) + # jingle callbacks - def confirmationCB(self, sid, accepted, frontend_data, profile): - """Called on confirmation answer - @param sid: file transfer session id - @param accepted: True if file transfer is accepted - @param frontend_data: data sent by frontend""" - client = self.host.getClient(profile) - data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] - if accepted: - if timeout.active(): - timeout.cancel() - try: - dest_path = frontend_data['dest_path'] - except KeyError: - log.error(_(u'dest path not found in frontend_data')) - del(client._pipe_waiting_for_approval[sid]) - return - if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: - file_obj = open(dest_path, 'w+') - self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) - elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: - file_obj = open(dest_path, 'w+') - self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) - else: - log.error(_(u"Unknown stream method, this should not happen at this stage, cancelling transfer")) - del(client._pipe_waiting_for_approval[sid]) - return - - #we can send the iq result - feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method}) - misc_elts = [] - misc_elts.append(domish.Element((PROFILE, "file"))) - self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) - else: - log.debug(_(u"Transfer [%s] refused"), sid) - self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) - del(client._pipe_waiting_for_approval[sid]) - - def _transferSucceeded(self, sid, file_obj, stream_method, profile): - """Called by the stream method when transfer successfuly finished - @param id: stream id""" - client = self.host.getClient(profile) - file_obj.close() - log.info(_('Transfer %s successfuly finished') % sid) - del(client._pipe_waiting_for_approval[sid]) + def _pipeOut(self, peer_jid_s, filepath, profile_key=C.PROF_KEY_NONE): + profile = self.host.memory.getProfileName(profile_key) + self.pipeOut(jid.JID(peer_jid_s), filepath, profile) - def _transferFailed(self, sid, file_obj, stream_method, reason, profile): - """Called when something went wrong with the transfer - @param id: stream id - @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" - client = self.host.getClient(profile) - data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] - log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid, - 's_method': stream_method}) - # filepath = file_obj.name - file_obj.close() - #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session - log.warning(_(u"All stream methods failed, can't transfer the file")) - del(client._pipe_waiting_for_approval[sid]) - - def pipeCb(self, filepath, sid, profile, IQ): - if IQ['type'] == "error": - stanza_err = jabber.error.exceptionFromStanza(IQ) - if stanza_err.code == '403' and stanza_err.condition == 'forbidden': - log.debug(_(u"Pipe transfer refused by %s") % IQ['from']) - self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile) - else: - log.warning(_(u"Error during pipe stream transfer with %s") % IQ['from']) - self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile) - return - - si_elt = IQ.firstChildElement() - - if IQ['type'] != "result" or not si_elt or si_elt.name != "si": - log.error(_(u"Protocol error during file transfer")) - return + def pipeOut(self, peer_jid, filepath, profile): + """send a file using EXP-PIPE - feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) - if not feature_elts: - log.warning(_(u"No feature element")) - return - - choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0]) - try: - stream_method = choosed_options["stream-method"] - except KeyError: - log.warning(_(u"No stream method choosed")) - return - - if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: - #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender - #file_obj = os.fdopen(fd, 'r') - file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it - self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) - elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: - #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender - #file_obj = os.fdopen(fd, 'r') - file_obj = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it - self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile) - else: - log.warning(_(u"Invalid stream method received")) - - def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): - """send a file using EXP-PIPE - @to_jid: recipient - @filepath: absolute path to the named pipe to send - @data: dictionnary with the optional data + @param peer_jid(jid.JID): recipient + @param filepath(unicode): absolute path to the named pipe to send @param profile_key: %(doc_profile_key)s @return: an unique id to identify the transfer """ - profile = self.host.memory.getProfileName(profile_key) - if not profile: - log.warning(_(u"Trying to send a file from an unknown profile")) - return "" - feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}) + self._j.initiate(peer_jid, + [{'app_ns': NS_PIPE, + 'senders': self._j.ROLE_INITIATOR, + 'app_kwargs': {'filepath': filepath, + }, + }], + profile=profile) + + def jingleSessionInit(self, session, content_name, filepath, profile=C.PROF_KEY_NONE): + content_data = session['contents'][content_name] + application_data = content_data['application_data'] + assert 'file_path' not in application_data + application_data['file_path'] = filepath + desc_elt = domish.Element((NS_PIPE, 'description')) + return desc_elt - pipe_transfer_elts = [] + def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): + """This method request confirmation for a jingle session""" + content_data = session['contents'][content_name] + if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): + log.warning(u"Bad sender, assuming initiator") + content_data['senders'] = self._j.ROLE_INITIATOR - pipe_elt = domish.Element((PROFILE, 'pipe')) - pipe_transfer_elts.append(pipe_elt) + def gotConfirmation(data): + if data.get('cancelled', False): + return False + application_data = content_data['application_data'] + dest_path = application_data['file_path'] = data['path'] + content_data['file_obj'] = open(dest_path, 'w+') + finished_d = content_data['finished_d'] = defer.Deferred() + args = [session, content_name, content_data, profile] + finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) + return True - sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile) - offer.addCallback(self.pipeCb, filepath, sid, profile) - return sid + d = xml_tools.deferDialog(self.host, + _(CONFIRM).format(peer=session['peer_jid'].full()), + _(CONFIRM_TITLE), + type_=C.XMLUI_DIALOG_FILE, + options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR}, + action_extra={'meta_from_jid': session['peer_jid'].full(), + 'meta_type': "PIPE", + }, + security_limit=SECURITY_LIMIT, + profile=profile) + + d.addCallback(gotConfirmation) + return d - def sendSuccessCb(self, sid, file_obj, stream_method, profile): - log.info(_('Transfer %s successfuly finished') % sid) - file_obj.close() + def jingleHandler(self, action, session, content_name, desc_elt, profile): + content_data = session['contents'][content_name] + application_data = content_data['application_data'] + if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): + pass + elif action == self._j.A_SESSION_ACCEPT: + assert not 'file_obj' in content_data + filepath = application_data['file_path'] + content_data['file_obj'] = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it + finished_d = content_data['finished_d'] = defer.Deferred() + args = [session, content_name, content_data, profile] + finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) + else: + log.warning(u"FIXME: unmanaged action {}".format(action)) + return desc_elt - def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): - file_obj.close() - log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile}) + def _finishedCb(self, dummy, session, content_name, content_data, profile): + log.info(u"Pipe transfer completed") + self._j.contentTerminate(session, content_name, profile=profile) + content_data['file_obj'].close() + + def _finishedEb(self, failure, session, content_name, content_data, profile): + log.warning(u"Error while streaming pipe: {}".format(failure)) + content_data['file_obj'].close() + self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)