diff src/plugins/plugin_exp_pipe.py @ 1669:697effba0310

plugin pipe: rewritten plugin as a jingle application. The current implentation can, in some cases, block the backend, and is experimental only. Improvments are needed in the future.
author Goffi <goffi@goffi.org>
date Wed, 25 Nov 2015 02:04:43 +0100
parents 3265a2639182
children dbd7c79aab2b
line wrap: on
line diff
--- a/src/plugins/plugin_exp_pipe.py	Wed Nov 25 00:22:23 2015 +0100
+++ b/src/plugins/plugin_exp_pipe.py	Wed Nov 25 02:04:43 2015 +0100
@@ -17,230 +17,126 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from sat.core.i18n import _
+from sat.core.i18n import _, D_
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 log = getLogger(__name__)
+from sat.tools import xml_tools
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
-from twisted.words.protocols import jabber
-from twisted.internet import reactor
+from twisted.internet import defer
 
-from wokkel import data_form
-
-IQ_SET = '/iq[@type="set"]'
-PROFILE_NAME = "pipe-transfer"
-PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
+NS_PIPE = 'org.salut-a-toi/pipe'
+SECURITY_LIMIT=30
 
 PLUGIN_INFO = {
     "name": "Pipe Plugin",
     "import_name": "EXP-PIPE",
     "type": "EXP",
     "protocols": ["EXP-PIPE"],
-    "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"],
+    "dependencies": ["XEP-0166"],
     "main": "Exp_Pipe",
     "handler": "no",
-    "description": _("""Implementation of SI Pipe Transfer""")
+    "description": _("""Jingle Pipe Transfer experimental plugin""")
 }
 
+CONFIRM = D_(u"{peer} wants to send you a pipe stream, do you accept ?")
+CONFIRM_TITLE = D_(u"Pipe stream")
 
 class Exp_Pipe(object):
-    """This is a modified version of XEP-0096 to work with named pipes instead of files"""
+    """This non standard jingle application works with named pipes"""
 
     def __init__(self, host):
         log.info(_("Plugin Pipe initialization"))
         self.host = host
-        self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
-                                 self.host.plugins["XEP-0047"].NAMESPACE]  # Stream methods managed
-        self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
-        host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)
-
-    def profileConnected(self, profile):
-        client = self.host.getClient(profile)
-        client._pipe_waiting_for_approval = {}  # key = id, value = [transfer data, IdelayedCall Reactor timeout,
-                                        # current stream method, [failed stream methods], profile]
-
-    def _kill_id(self, approval_id, profile):
-        """Delete a waiting_for_approval id, called after timeout
-        @param approval_id: id of _pipe_waiting_for_approval"""
-        log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id)
-        try:
-            client = self.host.getClient(profile)
-            del client._pipe_waiting_for_approval[approval_id]
-        except KeyError:
-            log.warning(_(u"kill id called on a non existant approval id"))
+        self._j = host.plugins["XEP-0166"] # shortcut to access jingle
+        self._j.registerApplication(NS_PIPE, self)
+        host.bridge.addMethod("pipeOut", ".plugin", in_sign='sss', out_sign='', method=self._pipeOut)
 
-    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
-        """Called when a pipe transfer is requested
-        @param iq_id: id of the iq request
-        @param from_jid: jid of the sender
-        @param si_id: Stream Initiation session id
-        @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
-        @param si_el: domish.Element of the request
-        @param profile: %(doc_profile)s"""
-        log.info(_("EXP-PIPE file transfer requested"))
-        log.debug(si_el.toXml())
-        client = self.host.getClient(profile)
-        pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements())
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
-
-        if not pipe_elts:
-            log.warning(_(u"No pipe element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-            return
-
-        if feature_elts:
-            feature_el = feature_elts[0]
-            data_form.Form.fromElement(feature_el.firstChildElement())
-            try:
-                stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
-            except KeyError:
-                log.warning(_(u"No stream method found"))
-                self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-                return
-            if not stream_method:
-                log.warning(_(u"Can't find a valid stream method"))
-                self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
-                return
-        else:
-            log.warning(_(u"No feature element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-            return
-
-        #if we are here, the transfer can start, we just need user's agreement
-        data = {"id": iq_id, "from": from_jid}
-        client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
-
-        self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile)
+    # jingle callbacks
 
-    def confirmationCB(self, sid, accepted, frontend_data, profile):
-        """Called on confirmation answer
-        @param sid: file transfer session id
-        @param accepted: True if file transfer is accepted
-        @param frontend_data: data sent by frontend"""
-        client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
-        if accepted:
-            if timeout.active():
-                timeout.cancel()
-            try:
-                dest_path = frontend_data['dest_path']
-            except KeyError:
-                log.error(_(u'dest path not found in frontend_data'))
-                del(client._pipe_waiting_for_approval[sid])
-                return
-            if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-                file_obj = open(dest_path, 'w+')
-                self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
-            elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-                file_obj = open(dest_path, 'w+')
-                self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
-            else:
-                log.error(_(u"Unknown stream method, this should not happen at this stage, cancelling transfer"))
-                del(client._pipe_waiting_for_approval[sid])
-                return
-
-            #we can send the iq result
-            feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
-            misc_elts = []
-            misc_elts.append(domish.Element((PROFILE, "file")))
-            self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
-        else:
-            log.debug(_(u"Transfer [%s] refused"), sid)
-            self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
-            del(client._pipe_waiting_for_approval[sid])
-
-    def _transferSucceeded(self, sid, file_obj, stream_method, profile):
-        """Called by the stream method when transfer successfuly finished
-        @param id: stream id"""
-        client = self.host.getClient(profile)
-        file_obj.close()
-        log.info(_('Transfer %s successfuly finished') % sid)
-        del(client._pipe_waiting_for_approval[sid])
+    def _pipeOut(self, peer_jid_s, filepath, profile_key=C.PROF_KEY_NONE):
+        profile = self.host.memory.getProfileName(profile_key)
+        self.pipeOut(jid.JID(peer_jid_s), filepath, profile)
 
-    def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
-        """Called when something went wrong with the transfer
-        @param id: stream id
-        @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR"""
-        client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid]
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid,
-                                                                               's_method': stream_method})
-        # filepath = file_obj.name
-        file_obj.close()
-        #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
-        log.warning(_(u"All stream methods failed, can't transfer the file"))
-        del(client._pipe_waiting_for_approval[sid])
-
-    def pipeCb(self, filepath, sid, profile, IQ):
-        if IQ['type'] == "error":
-            stanza_err = jabber.error.exceptionFromStanza(IQ)
-            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
-                log.debug(_(u"Pipe transfer refused by %s") % IQ['from'])
-                self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile)
-            else:
-                log.warning(_(u"Error during pipe stream transfer with %s") % IQ['from'])
-                self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile)
-            return
-
-        si_elt = IQ.firstChildElement()
-
-        if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
-            log.error(_(u"Protocol error during file transfer"))
-            return
+    def pipeOut(self, peer_jid, filepath, profile):
+        """send a file using EXP-PIPE
 
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
-        if not feature_elts:
-            log.warning(_(u"No feature element"))
-            return
-
-        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
-        try:
-            stream_method = choosed_options["stream-method"]
-        except KeyError:
-            log.warning(_(u"No stream method choosed"))
-            return
-
-        if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-            #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
-            #file_obj = os.fdopen(fd, 'r')
-            file_obj = open(filepath, 'r')  # XXX: we have to be sure that filepath is well opened, as reading can block it
-            self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
-        elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-            #fd = os.open(filepath, os.O_RDONLY | os.O_NONBLOCK) #XXX: non blocking openingl cause issues with XEP-0065's FileSender
-            #file_obj = os.fdopen(fd, 'r')
-            file_obj = open(filepath, 'r')  # XXX: we have to be sure that filepath is well opened, as reading can block it
-            self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
-        else:
-            log.warning(_(u"Invalid stream method received"))
-
-    def pipeOut(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
-        """send a file using EXP-PIPE
-        @to_jid: recipient
-        @filepath: absolute path to the named pipe to send
-        @data: dictionnary with the optional data
+        @param peer_jid(jid.JID): recipient
+        @param filepath(unicode): absolute path to the named pipe to send
         @param profile_key: %(doc_profile_key)s
         @return: an unique id to identify the transfer
         """
-        profile = self.host.memory.getProfileName(profile_key)
-        if not profile:
-            log.warning(_(u"Trying to send a file from an unknown profile"))
-            return ""
-        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})
+        self._j.initiate(peer_jid,
+                         [{'app_ns': NS_PIPE,
+                           'senders': self._j.ROLE_INITIATOR,
+                           'app_kwargs': {'filepath': filepath,
+                                         },
+                         }],
+                         profile=profile)
+
+    def jingleSessionInit(self, session, content_name, filepath, profile=C.PROF_KEY_NONE):
+        content_data = session['contents'][content_name]
+        application_data = content_data['application_data']
+        assert 'file_path' not in application_data
+        application_data['file_path'] = filepath
+        desc_elt = domish.Element((NS_PIPE, 'description'))
+        return desc_elt
 
-        pipe_transfer_elts = []
+    def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile):
+        """This method request confirmation for a jingle session"""
+        content_data = session['contents'][content_name]
+        if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER):
+            log.warning(u"Bad sender, assuming initiator")
+            content_data['senders'] = self._j.ROLE_INITIATOR
 
-        pipe_elt = domish.Element((PROFILE, 'pipe'))
-        pipe_transfer_elts.append(pipe_elt)
+        def gotConfirmation(data):
+            if data.get('cancelled', False):
+                return False
+            application_data = content_data['application_data']
+            dest_path = application_data['file_path'] = data['path']
+            content_data['file_obj'] = open(dest_path, 'w+')
+            finished_d = content_data['finished_d'] = defer.Deferred()
+            args = [session, content_name, content_data, profile]
+            finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
+            return True
 
-        sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile)
-        offer.addCallback(self.pipeCb, filepath, sid, profile)
-        return sid
+        d = xml_tools.deferDialog(self.host,
+            _(CONFIRM).format(peer=session['peer_jid'].full()),
+            _(CONFIRM_TITLE),
+            type_=C.XMLUI_DIALOG_FILE,
+            options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR},
+            action_extra={'meta_from_jid': session['peer_jid'].full(),
+                          'meta_type': "PIPE",
+                         },
+            security_limit=SECURITY_LIMIT,
+            profile=profile)
+
+        d.addCallback(gotConfirmation)
+        return d
 
-    def sendSuccessCb(self, sid, file_obj, stream_method, profile):
-        log.info(_('Transfer %s successfuly finished') % sid)
-        file_obj.close()
+    def jingleHandler(self, action, session, content_name, desc_elt, profile):
+        content_data = session['contents'][content_name]
+        application_data = content_data['application_data']
+        if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
+            pass
+        elif action == self._j.A_SESSION_ACCEPT:
+            assert not 'file_obj' in content_data
+            filepath = application_data['file_path']
+            content_data['file_obj'] = open(filepath, 'r')  # XXX: we have to be sure that filepath is well opened, as reading can block it
+            finished_d = content_data['finished_d'] = defer.Deferred()
+            args = [session, content_name, content_data, profile]
+            finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
+        else:
+            log.warning(u"FIXME: unmanaged action {}".format(action))
+        return desc_elt
 
-    def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
-        file_obj.close()
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile})
+    def _finishedCb(self, dummy, session, content_name, content_data, profile):
+        log.info(u"Pipe transfer completed")
+        self._j.contentTerminate(session, content_name, profile=profile)
+        content_data['file_obj'].close()
+
+    def _finishedEb(self, failure, session, content_name, content_data, profile):
+        log.warning(u"Error while streaming pipe: {}".format(failure))
+        content_data['file_obj'].close()
+        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)