Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 1759:81923b3f8b14
plugin XEP-0065: better handling of finished Deferred
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 Dec 2015 22:38:00 +0100 |
parents | a66d34353f34 |
children | d17772b0fe22 |
comparison
equal
deleted
inserted
replaced
1758:a66d34353f34 | 1759:81923b3f8b14 |
---|---|
316 self.enabledCommands = [CMD_CONNECT] | 316 self.enabledCommands = [CMD_CONNECT] |
317 self.peersock = None | 317 self.peersock = None |
318 self.addressType = 0 | 318 self.addressType = 0 |
319 self.requestType = 0 | 319 self.requestType = 0 |
320 self._file_obj = None | 320 self._file_obj = None |
| 321 self.active = False # set to True when protocol is actually used for transfer |
| 322 # used by factories to know when the finished Deferred can be triggered |
321 | 323 |
322 @property | 324 @property |
323 def file_obj(self): | 325 def file_obj(self): |
324 if self._file_obj is None: | 326 if self._file_obj is None: |
325 if self.server_mode: | 327 if self.server_mode: |
493 def startTransfer(self, chunk_size): | 495 def startTransfer(self, chunk_size): |
494 """Callback called when the result iq is received | 496 """Callback called when the result iq is received |
495 | 497 |
496 @param chunk_size(None, int): size of the buffer, or None for default | 498 @param chunk_size(None, int): size of the buffer, or None for default |
497 """ | 499 """ |
| 500 self.active = True |
498 if chunk_size is not None: | 501 if chunk_size is not None: |
499 self.CHUNK_SIZE = chunk_size | 502 self.CHUNK_SIZE = chunk_size |
500 log.debug(u"Starting file transfer") | 503 log.debug(u"Starting file transfer") |
501 d = self.beginFileTransfer(self.file_obj, self.transport) | 504 d = self.beginFileTransfer(self.file_obj, self.transport) |
502 d.addCallback(self.fileTransfered) | 505 d.addCallback(self.fileTransfered) |
524 | 527 |
525 def dataReceived(self, buf): | 528 def dataReceived(self, buf): |
526 if self.state == STATE_READY: | 529 if self.state == STATE_READY: |
527 # Everything is set, we just have to write the incoming data | 530 # Everything is set, we just have to write the incoming data |
528 self.file_obj.write(buf) | 531 self.file_obj.write(buf) |
| 532 if not self.active: |
| 533 self.active = True |
529 return | 534 return |
530 | 535 |
531 self.buf = self.buf + buf | 536 self.buf = self.buf + buf |
532 if self.state == STATE_INITIAL: | 537 if self.state == STATE_INITIAL: |
533 self._parseNegotiation() | 538 self._parseNegotiation() |
604 protocols = self.getSession(session_hash)['protocols'] | 609 protocols = self.getSession(session_hash)['protocols'] |
605 protocols.remove(protocol) | 610 protocols.remove(protocol) |
606 except (KeyError, ValueError): | 611 except (KeyError, ValueError): |
607 log.error(u"Protocol not found in session while it should be there") | 612 log.error(u"Protocol not found in session while it should be there") |
608 else: | 613 else: |
609 if not protocols: | 614 if protocol.active: |
610 # The last protocol has been removed, session is finished | 615 # The active protocol has been removed, session is finished |
611 if reason.check(internet_error.ConnectionDone): | 616 if reason.check(internet_error.ConnectionDone): |
612 self.getSession(session_hash)[DEFER_KEY].callback(None) | 617 self.getSession(session_hash)[DEFER_KEY].callback(None) |
613 else: | 618 else: |
614 self.getSession(session_hash)[DEFER_KEY].errback(reason) | 619 self.getSession(session_hash)[DEFER_KEY].errback(reason) |
615 | 620 |
629 self.session_hash = session_hash | 634 self.session_hash = session_hash |
630 self.profile = profile | 635 self.profile = profile |
631 self.connection = defer.Deferred() | 636 self.connection = defer.Deferred() |
632 self._protocol_instance = None | 637 self._protocol_instance = None |
633 self.connector = None | 638 self.connector = None |
634 self._discarded = False | |
635 | 639 |
636 def discard(self): | 640 def discard(self): |
637 """Disconnect the client | 641 """Disconnect the client |
638 | 642 |
639 Also set a discarded flag, which avoid to call the session Deferred | 643 Also set a discarded flag, which avoid to call the session Deferred |
640 """ | 644 """ |
641 self.connector.disconnect() | 645 self.connector.disconnect() |
642 self._discarded = True | |
643 | 646 |
644 def getSession(self): | 647 def getSession(self): |
645 return self.session | 648 return self.session |
646 | 649 |
647 def startTransfer(self, dummy=None, chunk_size=None): | 650 def startTransfer(self, dummy=None, chunk_size=None): |
652 log.debug(u"Connection failed") | 655 log.debug(u"Connection failed") |
653 self.connection.errback(reason) | 656 self.connection.errback(reason) |
654 | 657 |
655 def clientConnectionLost(self, connector, reason): | 658 def clientConnectionLost(self, connector, reason): |
656 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) | 659 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) |
657 self._protocol_instance = None | 660 if self._protocol_instance.active: |
658 if not self._discarded: | |
659 # This one was used for the transfer, than mean that | 661 # This one was used for the transfer, than mean that |
660 # the Socks5 session is finished | 662 # the Socks5 session is finished |
661 if reason.check(internet_error.ConnectionDone): | 663 if reason.check(internet_error.ConnectionDone): |
662 self.getSession()[DEFER_KEY].callback(None) | 664 self.getSession()[DEFER_KEY].callback(None) |
663 else: | 665 else: |
664 self.getSession()[DEFER_KEY].errback(reason) | 666 self.getSession()[DEFER_KEY].errback(reason) |
| 667 self._protocol_instance = None |
665 | 668 |
666 def buildProtocol(self, addr): | 669 def buildProtocol(self, addr): |
667 log.debug(("Socks 5 client connection started")) | 670 log.debug(("Socks 5 client connection started")) |
668 p = self.protocol(session_hash=self.session_hash) | 671 p = self.protocol(session_hash=self.session_hash) |
669 p.factory = self | 672 p.factory = self |