Mercurial > libervia-backend
diff src/plugins/plugin_xep_0234.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0234.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0234.py Thu Feb 08 00:37:42 2018 +0100 @@ -25,6 +25,7 @@ from wokkel import disco, iwokkel from zope.interface import implements from sat.tools import utils +from sat.tools import stream import os.path from twisted.words.xish import domish from twisted.words.protocols.jabber import jid @@ -76,33 +77,33 @@ return u'{}_{}'.format(session['id'], content_name) def _fileJingleSend(self, peer_jid, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): - return self.fileJingleSend(jid.JID(peer_jid), filepath, name or None, file_desc or None, profile) + client = self.host.getClient(profile) + return self.fileJingleSend(client, jid.JID(peer_jid), filepath, name or None, file_desc or None) - def fileJingleSend(self, peer_jid, filepath, name, file_desc=None, profile=C.PROF_KEY_NONE): + def fileJingleSend(self, client, peer_jid, filepath, name, file_desc=None): """Send a file using jingle file transfer @param peer_jid(jid.JID): destinee jid @param filepath(str): absolute path of the file @param name(unicode, None): name of the file @param file_desc(unicode, None): description of the file - @param profile: %(doc_profile)s @return (D(unicode)): progress id """ progress_id_d = defer.Deferred() - self._j.initiate(peer_jid, + self._j.initiate(client, + peer_jid, [{'app_ns': NS_JINGLE_FT, 'senders': self._j.ROLE_INITIATOR, 'app_kwargs': {'filepath': filepath, 'name': name, 'file_desc': file_desc, 'progress_id_d': progress_id_d}, - }], - profile=profile) + }]) return progress_id_d # jingle callbacks - def jingleSessionInit(self, session, content_name, filepath, name, file_desc, progress_id_d, profile=C.PROF_KEY_NONE): + def jingleSessionInit(self, client, session, content_name, filepath, name, file_desc, progress_id_d): progress_id_d.callback(self._getProgressId(session, content_name)) content_data = session['contents'][content_name] application_data = content_data['application_data'] @@ -122,7 +123,7 @@ file_elt.addChild(self._hash.buildHashElt()) return desc_elt - def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): + def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): """This method request confirmation for a jingle session""" content_data = session['contents'][content_name] if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): @@ -166,15 +167,15 @@ def gotConfirmation(confirmed): if confirmed: finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] + args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) return confirmed - d = self._f.getDestDir(session['peer_jid'], content_data, file_data, profile) + d = self._f.getDestDir(client, session['peer_jid'], content_data, file_data, stream_object=True) d.addCallback(gotConfirmation) return d - def jingleHandler(self, action, session, content_name, desc_elt, profile): + def jingleHandler(self, client, action, session, content_name, desc_elt): content_data = session['contents'][content_name] application_data = content_data['application_data'] if action in (self._j.A_ACCEPTED_ACK,): @@ -188,27 +189,28 @@ log.debug("adding <range> element") file_elt.addElement('range') elif action == self._j.A_SESSION_ACCEPT: - assert not 'file_obj' in content_data + assert not 'stream_object' in content_data file_data = application_data['file_data'] file_path = application_data['file_path'] size = file_data['size'] # XXX: hash security is not critical here, so we just take the higher mandatory one hasher = file_data['hash_hasher'] = self._hash.getHasher('sha-256') - content_data['file_obj'] = self._f.File(self.host, - file_path, - uid=self._getProgressId(session, content_name), - size=size, - data_cb=lambda data: hasher.update(data), - profile=profile - ) + content_data['stream_object'] = stream.FileStreamObject( + self.host, + client, + file_path, + uid=self._getProgressId(session, content_name), + size=size, + data_cb=lambda data: hasher.update(data), + ) finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] + args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) else: log.warning(u"FIXME: unmanaged action {}".format(action)) return desc_elt - def jingleSessionInfo(self, action, session, content_name, jingle_elt, profile): + def jingleSessionInfo(self, client, action, session, content_name, jingle_elt): """Called on session-info action manage checksum, and ignore <received/> element @@ -240,17 +242,17 @@ log.warning(u"Hash algorithm used in given hash ({peer_algo}) doesn't correspond to the one we have used ({our_algo})" .format(peer_algo=algo, our_algo=file_data.get('hash_algo'))) else: - self._receiverTryTerminate(session, content_name, content_data, profile=profile) + self._receiverTryTerminate(client, session, content_name, content_data) else: raise NotImplementedError - def _sendCheckSum(self, session, content_name, content_data, profile): + def _sendCheckSum(self, client, session, content_name, content_data): """Send the session-info with the hash checksum""" file_data = content_data['application_data']['file_data'] hasher = file_data['hash_hasher'] hash_ = hasher.hexdigest() log.debug(u"Calculated hash: {}".format(hash_)) - iq_elt, jingle_elt = self._j.buildSessionInfo(session, profile) + iq_elt, jingle_elt = self._j.buildSessionInfo(client, session) checksum_elt = jingle_elt.addElement((NS_JINGLE_FT, 'checksum')) checksum_elt['creator'] = content_data['creator'] checksum_elt['name'] = content_name @@ -258,7 +260,7 @@ file_elt.addChild(self._hash.buildHashElt(hash_)) iq_elt.send() - def _receiverTryTerminate(self, session, content_name, content_data, last_try=False, profile=C.PROF_KEY_NONE): + def _receiverTryTerminate(self, client, session, content_name, content_data, last_try=False): """Try to terminate the session This method must only be used by the receiver. @@ -274,8 +276,8 @@ if hash_given is None: if last_try: log.warning(u"sender didn't sent hash checksum, we can't check the file") - self._j.delayedContentTerminate(session, content_name, profile=profile) - content_data['file_obj'].close() + self._j.delayedContentTerminate(client, session, content_name) + content_data['stream_object'].close() return True return False hasher = file_data['hash_hasher'] @@ -296,8 +298,8 @@ given = hash_given, our = hash_) - self._j.delayedContentTerminate(session, content_name, profile=profile) - content_data['file_obj'].close(progress_metadata, error) + self._j.delayedContentTerminate(client, session, content_name) + content_data['stream_object'].close(progress_metadata, error) # we may have the last_try timer still active, so we try to cancel it try: content_data['last_try_timer'].cancel() @@ -305,25 +307,25 @@ pass return True - def _finishedCb(self, dummy, session, content_name, content_data, profile): + def _finishedCb(self, dummy, client, session, content_name, content_data): log.info(u"File transfer terminated") if content_data['senders'] != session['role']: # we terminate the session only if we are the receiver, # as recommanded in XEP-0234 ยง2 (after example 6) content_data['transfer_finished'] = True - if not self._receiverTryTerminate(session, content_name, content_data, profile=profile): + if not self._receiverTryTerminate(client, session, content_name, content_data): # we have not received the hash yet, we wait 5 more seconds content_data['last_try_timer'] = reactor.callLater( - 5, self._receiverTryTerminate, session, content_name, content_data, last_try=True, profile=profile) + 5, self._receiverTryTerminate, client, session, content_name, content_data, last_try=True) else: # we are the sender, we send the checksum - self._sendCheckSum(session, content_name, content_data, profile) - content_data['file_obj'].close() + self._sendCheckSum(client, session, content_name, content_data) + content_data['stream_object'].close() - def _finishedEb(self, failure, session, content_name, content_data, profile): + def _finishedEb(self, failure, client, session, content_name, content_data): log.warning(u"Error while streaming file: {}".format(failure)) - content_data['file_obj'].close() - self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile) + content_data['stream_object'].close() + self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) class XEP_0234_handler(XMPPHandler):