diff src/plugins/plugin_xep_0047.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements. This is a big patch, as several things had to be changed at the same time:
- changed methods to take a client argument instead of a profile argument
- moved SatFile to a new tools.stream module, as it should be part of core, not of a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject, which creates a stream producer/consumer from a SatFile
- the pipe plugin no longer uses a unix named pipe: it complicates things, special care is needed to avoid blocking, and it is generally not necessary. A socket is used instead, so the plugin has been renamed to "jingle stream"
- bad connections/errors should be handled better in the jingle stream plugin, and the code should no longer block
- jp pipe commands have been updated accordingly
fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 67cc54b01a12
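The refactored plugin no longer stores a raw file object in the session: it stores a generic stream_object, to which the receiving side write()s the base64-decoded <data> payloads, and from which the sending side read()s block_size bytes. A minimal, purely illustrative sketch of such an object (this is not the real tools.stream.FileStreamObject; the class name and constructor are assumptions):

    class IllustrativeStreamObject(object):
        """File-backed object exposing the calls this plugin relies on."""

        def __init__(self, path, mode='rb'):
            self._file = open(path, mode)

        def write(self, data):
            # receiving side: the <data> observer writes base64-decoded chunks here
            self._file.write(data)

        def read(self, size=-1):
            # sending side: the stream workflow reads block_size bytes here
            return self._file.read(size)

        def close(self):
            self._file.close()
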
--- a/src/plugins/plugin_xep_0047.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_xep_0047.py	Thu Feb 08 00:37:42 2018 +0100
@@ -126,38 +126,34 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, file_obj, to_jid, sid, profile):
+    def _createSession(self, client, stream_object, to_jid, sid):
         """Called when a bytestream is imminent
 
-        @param file_obj(file): File object where data will be written
+        @param stream_object(IConsumer): stream object where data will be written
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
-        @param profile: %(doc_profile)s
         @return (dict): session data
         """
-        client = self.host.getClient(profile)
         if sid in client.xep_0047_current_stream:
             raise exceptions.ConflictError(u'A session with this id already exists !')
         session_data = client.xep_0047_current_stream[sid] = \
             {'id': sid,
              DEFER_KEY: defer.Deferred(),
              'to': to_jid,
-             'file_obj': file_obj,
+             'stream_object': stream_object,
              'seq': -1,
              'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
             }
 
         return session_data
 
-    def _onIBBOpen(self, iq_elt, profile):
+    def _onIBBOpen(self, iq_elt, client):
         """"Called when an IBB <open> element is received
 
         @param iq_elt(domish.Element): the whole <iq> stanza
-        @param profile: %(doc_profile)s
         """
         log.debug(_(u"IBB stream opening"))
         iq_elt.handled = True
-        client = self.host.getClient(profile)
         open_elt = iq_elt.elements(NS_IBB, 'open').next()
         block_size = open_elt.getAttribute('block-size')
         sid = open_elt.getAttribute('sid')
@@ -184,20 +180,18 @@
         # we now set the stream observer to look after data packet
         # FIXME: if we never get the events, the observers stay.
         #        would be better to have generic observer and check id once triggered
-        client.xmlstream.addObserver(event_data, observer_cb, profile=profile)
-        client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile)
+        client.xmlstream.addObserver(event_data, observer_cb, client=client)
+        client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client)
         # finally, we send the accept stanza
         iq_result_elt = xmlstream.toResponse(iq_elt, 'result')
         client.send(iq_result_elt)
 
-    def _onIBBClose(self, iq_elt, profile):
+    def _onIBBClose(self, iq_elt, client):
         """"Called when an IBB <close> element is received
 
         @param iq_elt(domish.Element): the whole <iq> stanza
-        @param profile: %(doc_profile)s
         """
         iq_elt.handled = True
-        client = self.host.getClient(profile)
         log.debug(_("IBB stream closing"))
         close_elt = iq_elt.elements(NS_IBB, 'close').next()
         # XXX: this observer is only triggered on valid sid, so we don't need to check it
@@ -207,15 +201,13 @@
         client.send(iq_result_elt)
         self._killSession(sid, client)
 
-    def _onIBBData(self, element, profile):
+    def _onIBBData(self, element, client):
         """Observer called on <iq> or <message> stanzas with data element
 
-        Manage the data elelement (check validity and write to the file_obj)
+        Manage the data element (check validity and write to the stream_object)
         @param element(domish.Element): <iq> or <message> stanza
-        @param profile: %(doc_profile)s
         """
         element.handled = True
-        client = self.host.getClient(profile)
         data_elt = element.elements(NS_IBB, 'data').next()
         sid = data_elt['sid']
 
@@ -226,7 +218,7 @@
             return self._sendError('item-not-found', None, element, client)
 
         from_jid = session_data["to"]
-        file_obj = session_data["file_obj"]
+        stream_object = session_data["stream_object"]
 
         if from_jid.full() != element['from']:
             log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from']))
@@ -248,7 +240,7 @@
 
         # we can now decode the data
         try:
-            file_obj.write(base64.b64decode(str(data_elt)))
+            stream_object.write(base64.b64decode(str(data_elt)))
         except TypeError:
             # The base64 data is invalid
             log.warning(_(u"Invalid base64 data"))
@@ -276,17 +268,15 @@
             self._killSession(sid, client, error_condition)
         client.send(iq_elt)
 
-    def startStream(self, file_obj, to_jid, sid, block_size=None, profile=C.PROF_KEY_NONE):
+    def startStream(self, client, stream_object, to_jid, sid, block_size=None):
         """Launch the stream workflow
 
-        @param file_obj(file): file_obj to send
+        @param stream_object(ifaces.IStreamProducer): stream object to send
         @param to_jid(jid.JID): JID of the recipient
         @param sid(unicode): Stream session id
         @param block_size(int, None): size of the block (or None for default)
-        @param profile: %(doc_profile)s
         """
-        session_data = self._createSession(file_obj, to_jid, sid, profile)
-        client = self.host.getClient(profile)
+        session_data = self._createSession(client, stream_object, to_jid, sid)
 
         if block_size is None:
             block_size = XEP_0047.BLOCK_SIZE
@@ -313,7 +303,7 @@
         """
         session_data["timer"].reset(TIMEOUT)
 
-        buffer_ = session_data["file_obj"].read(session_data["block_size"])
+        buffer_ = session_data["stream_object"].read(session_data["block_size"])
         if buffer_:
             next_iq_elt = client.IQ()
             next_iq_elt['to'] = session_data["to"].full()
@@ -357,7 +347,7 @@
         self.plugin_parent = parent
 
     def connectionInitialized(self):
-        self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile)
+        self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent)
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
         return [disco.DiscoFeature(NS_IBB)]
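
Since the profile argument has been removed throughout, callers of this plugin now resolve the profile to a client once and pass it explicitly, together with a stream object. A hedged usage sketch (the plugin lookup key and the wrapper function below are illustrative assumptions, not code from this changeset):

    def send_over_ibb(host, profile, path, to_jid, sid):
        # resolve the profile once, at the boundary, instead of inside each method
        client = host.getClient(profile)
        # any object with a read(size) method can act as the producer side;
        # here we reuse the illustrative stream object sketched above the diff
        stream_object = IllustrativeStreamObject(path)
        # new signature: client is the first argument, no profile keyword
        host.plugins["XEP-0047"].startStream(client, stream_object, to_jid, sid)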