Mercurial > libervia-backend
diff src/plugins/plugin_xep_0096.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0096.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0096.py Thu Feb 08 00:37:42 2018 +0100 @@ -23,6 +23,7 @@ log = getLogger(__name__) from sat.core import exceptions from sat.tools import xml_tools +from sat.tools import stream from twisted.words.xish import domish from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error @@ -63,16 +64,15 @@ def unload(self): self._si.unregisterSIProfile(SI_PROFILE_NAME) - def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE): + def _badRequest(self, client, iq_elt, message=None): """Send a bad-request error @param iq_elt(domish.Element): initial <IQ> element of the SI request @param message(None, unicode): informational message to display in the logs - @param profile: %(doc_profile)s """ if message is not None: log.warning(message) - self._si.sendError(iq_elt, 'bad-request', profile) + self._si.sendError(client, iq_elt, 'bad-request') def _parseRange(self, parent_elt, file_size): """find and parse <range/> element @@ -107,14 +107,13 @@ return range_, range_offset, range_length - def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile): + def _transferRequest(self, client, iq_elt, si_id, si_mime_type, si_elt): """Called when a file transfer is requested @param iq_elt(domish.Element): initial <IQ> element of the SI request @param si_id(unicode): Stream Initiation session id @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) @param si_elt(domish.Element): request - @param profile: %(doc_profile)s """ log.info(_("XEP-0096 file transfer requested")) peer_jid = jid.JID(iq_elt['from']) @@ -122,18 +121,18 @@ try: file_elt = si_elt.elements(NS_SI_FT, "file").next() except StopIteration: - return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "No <file/> element found in SI File Transfer request") try: feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) except exceptions.NotFound: - return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "No <feature/> element found in SI File Transfer request") try: filename = file_elt["name"] file_size = int(file_elt["size"]) except (KeyError, ValueError): - return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") file_date = file_elt.getAttribute("date") file_hash = file_elt.getAttribute("hash") @@ -148,12 +147,12 @@ try: range_, range_offset, range_length = self._parseRange(file_elt, file_size) except ValueError: - return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") try: stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) except KeyError: - return self._badRequest(iq_elt, "No stream method found", profile) + return self._badRequest(client, iq_elt, "No stream method found") if stream_method: if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: @@ -164,7 +163,7 @@ log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer") else: log.warning(u"Can't find a valid stream method") - self._si.sendError(iq_elt, 'not-acceptable', profile) + self._si.sendError(client, iq_elt, 'not-acceptable') return #if we are here, the transfer can start, we just need user's agreement @@ -172,27 +171,19 @@ "range": range_, "range_offset": range_offset, "range_length": range_length, "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} - d = self._f.getDestDir(peer_jid, data, data, profile) - d.addCallback(self.confirmationCb, iq_elt, data, profile) + d = self._f.getDestDir(client, peer_jid, data, data, stream_object=True) + d.addCallback(self.confirmationCb, client, iq_elt, data) - def _getFileObject(self, dest_path, can_range=False): - """Open file, put file pointer to the end if the file if needed - @param dest_path: path of the destination file - @param can_range: True if the file pointer can be moved - @return: File Object""" - return open(dest_path, "ab" if can_range else "wb") - - def confirmationCb(self, accepted, iq_elt, data, profile): + def confirmationCb(self, accepted, client, iq_elt, data): """Called on confirmation answer @param accepted(bool): True if file transfer is accepted @param iq_elt(domish.Element): initial SI request @param data(dict): session data - @param profile: %(doc_profile)s """ if not accepted: log.info(u"File transfer declined") - self._si.sendError(iq_elt, 'forbidden', profile) + self._si.sendError(client, iq_elt, 'forbidden') return # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] # can_range = data['can_range'] == "True" @@ -216,9 +207,9 @@ # file_obj = self._getFileObject(dest_path, can_range) # range_offset = file_obj.tell() - d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile) - d.addCallback(self._transferCb, data, profile) - d.addErrback(self._transferEb, data, profile) + d = data['stream_plugin'].createSession(client, data['stream_object'], data['peer_jid'], data['si_id']) + d.addCallback(self._transferCb, client, data) + d.addErrback(self._transferEb, client, data) #we can send the iq result feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) @@ -229,32 +220,31 @@ # range_elt['offset'] = str(range_offset) # #TODO: manage range length # misc_elts.append(range_elt) - self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile) + self._si.acceptStream(client, iq_elt, feature_elt, misc_elts) - def _transferCb(self, dummy, data, profile): + def _transferCb(self, dummy, client, data): """Called by the stream method when transfer successfuly finished @param data: session data - @param profile: %(doc_profile)s """ #TODO: check hash - data['file_obj'].close() + data['stream_object'].close() log.info(u'Transfer {si_id} successfuly finished'.format(**data)) - def _transferEb(self, failure, data, profile): + def _transferEb(self, failure, client, data): """Called when something went wrong with the transfer @param id: stream id @param data: session data - @param profile: %(doc_profile)s """ log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data)) - data['file_obj'].close() + data['stream_object'].close() def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): - return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile) + client = self.host.getClient(profile) + return self.sendFile(client, jid.JID(peer_jid_s), filepath, name or None, desc or None) - def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE): + def sendFile(self, client, peer_jid, filepath, name=None, desc=None): """Send a file using XEP-0096 @param peer_jid(jid.JID): recipient @@ -265,7 +255,6 @@ @param profile: %(doc_profile)s @return: an unique id to identify the transfer """ - client = self.host.getClient(profile) feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) file_transfer_elts = [] @@ -282,7 +271,7 @@ file_transfer_elts.append(domish.Element((None, 'range'))) - sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile) + sid, offer_d = self._si.proposeStream(client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts) args = [filepath, sid, size, client] offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) return sid @@ -318,15 +307,15 @@ log.warning(u"Invalid stream method received") return - file_obj = self._f.File(self.host, - filepath, - uid=sid, - size=size, - profile=client.profile - ) - d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile) - d.addCallback(self._sendCb, sid, file_obj, client.profile) - d.addErrback(self._sendEb, sid, file_obj, client.profile) + stream_object = stream.FileStreamObject(self.host, + client, + filepath, + uid=sid, + size=size, + ) + d = plugin.startStream(client, stream_object, jid.JID(iq_elt['from']), sid) + d.addCallback(self._sendCb, client, sid, stream_object) + d.addErrback(self._sendEb, client, sid, stream_object) def _fileEb(self, failure, filepath, sid, size, client): if failure.check(error.StanzaError): @@ -347,16 +336,16 @@ else: log.error(u'Error while proposing stream: {}'.format(failure)) - def _sendCb(self, dummy, sid, file_obj, profile): + def _sendCb(self, dummy, client, sid, stream_object): log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( sid=sid, - profile=profile)) - file_obj.close() + profile=client.profile)) + stream_object.close() - def _sendEb(self, failure, sid, file_obj, profile): + def _sendEb(self, failure, client, sid, stream_object): log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( sid=sid, - profile=profile, + profile=client.profile, reason=unicode(failure.value), )) - file_obj.close() + stream_object.close()