Mercurial > libervia-backend
diff src/plugins/plugin_xep_0047.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 67cc54b01a12 |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0047.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0047.py Thu Feb 08 00:37:42 2018 +0100 @@ -126,38 +126,34 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, file_obj, to_jid, sid, profile): + def _createSession(self, client, stream_object, to_jid, sid): """Called when a bytestream is imminent - @param file_obj(file): File object where data will be written + @param stream_object(IConsumer): stream object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id - @param profile: %(doc_profile)s @return (dict): session data """ - client = self.host.getClient(profile) if sid in client.xep_0047_current_stream: raise exceptions.ConflictError(u'A session with this id already exists !') session_data = client.xep_0047_current_stream[sid] = \ {'id': sid, DEFER_KEY: defer.Deferred(), 'to': to_jid, - 'file_obj': file_obj, + 'stream_object': stream_object, 'seq': -1, 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), } return session_data - def _onIBBOpen(self, iq_elt, profile): + def _onIBBOpen(self, iq_elt, client): """"Called when an IBB <open> element is received @param iq_elt(domish.Element): the whole <iq> stanza - @param profile: %(doc_profile)s """ log.debug(_(u"IBB stream opening")) iq_elt.handled = True - client = self.host.getClient(profile) open_elt = iq_elt.elements(NS_IBB, 'open').next() block_size = open_elt.getAttribute('block-size') sid = open_elt.getAttribute('sid') @@ -184,20 +180,18 @@ # we now set the stream observer to look after data packet # FIXME: if we never get the events, the observers stay. # would be better to have generic observer and check id once triggered - client.xmlstream.addObserver(event_data, observer_cb, profile=profile) - client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile) + client.xmlstream.addObserver(event_data, observer_cb, client=client) + client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) # finally, we send the accept stanza iq_result_elt = xmlstream.toResponse(iq_elt, 'result') client.send(iq_result_elt) - def _onIBBClose(self, iq_elt, profile): + def _onIBBClose(self, iq_elt, client): """"Called when an IBB <close> element is received @param iq_elt(domish.Element): the whole <iq> stanza - @param profile: %(doc_profile)s """ iq_elt.handled = True - client = self.host.getClient(profile) log.debug(_("IBB stream closing")) close_elt = iq_elt.elements(NS_IBB, 'close').next() # XXX: this observer is only triggered on valid sid, so we don't need to check it @@ -207,15 +201,13 @@ client.send(iq_result_elt) self._killSession(sid, client) - def _onIBBData(self, element, profile): + def _onIBBData(self, element, client): """Observer called on <iq> or <message> stanzas with data element - Manage the data elelement (check validity and write to the file_obj) + Manage the data elelement (check validity and write to the stream_object) @param element(domish.Element): <iq> or <message> stanza - @param profile: %(doc_profile)s """ element.handled = True - client = self.host.getClient(profile) data_elt = element.elements(NS_IBB, 'data').next() sid = data_elt['sid'] @@ -226,7 +218,7 @@ return self._sendError('item-not-found', None, element, client) from_jid = session_data["to"] - file_obj = session_data["file_obj"] + stream_object = session_data["stream_object"] if from_jid.full() != element['from']: log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) @@ -248,7 +240,7 @@ # we can now decode the data try: - file_obj.write(base64.b64decode(str(data_elt))) + stream_object.write(base64.b64decode(str(data_elt))) except TypeError: # The base64 data is invalid log.warning(_(u"Invalid base64 data")) @@ -276,17 +268,15 @@ self._killSession(sid, client, error_condition) client.send(iq_elt) - def startStream(self, file_obj, to_jid, sid, block_size=None, profile=C.PROF_KEY_NONE): + def startStream(self, client, stream_object, to_jid, sid, block_size=None): """Launch the stream workflow - @param file_obj(file): file_obj to send + @param stream_object(ifaces.IStreamProducer): stream object to send @param to_jid(jid.JID): JID of the recipient @param sid(unicode): Stream session id @param block_size(int, None): size of the block (or None for default) - @param profile: %(doc_profile)s """ - session_data = self._createSession(file_obj, to_jid, sid, profile) - client = self.host.getClient(profile) + session_data = self._createSession(client, stream_object, to_jid, sid) if block_size is None: block_size = XEP_0047.BLOCK_SIZE @@ -313,7 +303,7 @@ """ session_data["timer"].reset(TIMEOUT) - buffer_ = session_data["file_obj"].read(session_data["block_size"]) + buffer_ = session_data["stream_object"].read(session_data["block_size"]) if buffer_: next_iq_elt = client.IQ() next_iq_elt['to'] = session_data["to"].full() @@ -357,7 +347,7 @@ self.plugin_parent = parent def connectionInitialized(self): - self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile) + self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_IBB)]