comparison src/plugins/plugin_xep_0065.py @ 1759:81923b3f8b14

plugin XEP-0065: better handling of finished Deferred
author Goffi <goffi@goffi.org>
date Thu, 17 Dec 2015 22:38:00 +0100
parents a66d34353f34
children d17772b0fe22
comparison
equal deleted inserted replaced
1758:a66d34353f34 1759:81923b3f8b14
316 self.enabledCommands = [CMD_CONNECT] 316 self.enabledCommands = [CMD_CONNECT]
317 self.peersock = None 317 self.peersock = None
318 self.addressType = 0 318 self.addressType = 0
319 self.requestType = 0 319 self.requestType = 0
320 self._file_obj = None 320 self._file_obj = None
321 self.active = False # set to True when protocol is actually used for transfer
322 # used by factories to know when the finished Deferred can be triggered
321 323
322 @property 324 @property
323 def file_obj(self): 325 def file_obj(self):
324 if self._file_obj is None: 326 if self._file_obj is None:
325 if self.server_mode: 327 if self.server_mode:
493 def startTransfer(self, chunk_size): 495 def startTransfer(self, chunk_size):
494 """Callback called when the result iq is received 496 """Callback called when the result iq is received
495 497
496 @param chunk_size(None, int): size of the buffer, or None for default 498 @param chunk_size(None, int): size of the buffer, or None for default
497 """ 499 """
500 self.active = True
498 if chunk_size is not None: 501 if chunk_size is not None:
499 self.CHUNK_SIZE = chunk_size 502 self.CHUNK_SIZE = chunk_size
500 log.debug(u"Starting file transfer") 503 log.debug(u"Starting file transfer")
501 d = self.beginFileTransfer(self.file_obj, self.transport) 504 d = self.beginFileTransfer(self.file_obj, self.transport)
502 d.addCallback(self.fileTransfered) 505 d.addCallback(self.fileTransfered)
524 527
525 def dataReceived(self, buf): 528 def dataReceived(self, buf):
526 if self.state == STATE_READY: 529 if self.state == STATE_READY:
527 # Everything is set, we just have to write the incoming data 530 # Everything is set, we just have to write the incoming data
528 self.file_obj.write(buf) 531 self.file_obj.write(buf)
532 if not self.active:
533 self.active = True
529 return 534 return
530 535
531 self.buf = self.buf + buf 536 self.buf = self.buf + buf
532 if self.state == STATE_INITIAL: 537 if self.state == STATE_INITIAL:
533 self._parseNegotiation() 538 self._parseNegotiation()
604 protocols = self.getSession(session_hash)['protocols'] 609 protocols = self.getSession(session_hash)['protocols']
605 protocols.remove(protocol) 610 protocols.remove(protocol)
606 except (KeyError, ValueError): 611 except (KeyError, ValueError):
607 log.error(u"Protocol not found in session while it should be there") 612 log.error(u"Protocol not found in session while it should be there")
608 else: 613 else:
609 if not protocols: 614 if protocol.active:
610 # The last protocol has been removed, session is finished 615 # The active protocol has been removed, session is finished
611 if reason.check(internet_error.ConnectionDone): 616 if reason.check(internet_error.ConnectionDone):
612 self.getSession(session_hash)[DEFER_KEY].callback(None) 617 self.getSession(session_hash)[DEFER_KEY].callback(None)
613 else: 618 else:
614 self.getSession(session_hash)[DEFER_KEY].errback(reason) 619 self.getSession(session_hash)[DEFER_KEY].errback(reason)
615 620
629 self.session_hash = session_hash 634 self.session_hash = session_hash
630 self.profile = profile 635 self.profile = profile
631 self.connection = defer.Deferred() 636 self.connection = defer.Deferred()
632 self._protocol_instance = None 637 self._protocol_instance = None
633 self.connector = None 638 self.connector = None
634 self._discarded = False
635 639
636 def discard(self): 640 def discard(self):
637 """Disconnect the client 641 """Disconnect the client
638 642
639 Also set a discarded flag, which avoid to call the session Deferred 643 Also set a discarded flag, which avoid to call the session Deferred
640 """ 644 """
641 self.connector.disconnect() 645 self.connector.disconnect()
642 self._discarded = True
643 646
644 def getSession(self): 647 def getSession(self):
645 return self.session 648 return self.session
646 649
647 def startTransfer(self, dummy=None, chunk_size=None): 650 def startTransfer(self, dummy=None, chunk_size=None):
652 log.debug(u"Connection failed") 655 log.debug(u"Connection failed")
653 self.connection.errback(reason) 656 self.connection.errback(reason)
654 657
655 def clientConnectionLost(self, connector, reason): 658 def clientConnectionLost(self, connector, reason):
656 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) 659 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value)
657 self._protocol_instance = None 660 if self._protocol_instance.active:
658 if not self._discarded:
659 # This one was used for the transfer, than mean that 661 # This one was used for the transfer, than mean that
660 # the Socks5 session is finished 662 # the Socks5 session is finished
661 if reason.check(internet_error.ConnectionDone): 663 if reason.check(internet_error.ConnectionDone):
662 self.getSession()[DEFER_KEY].callback(None) 664 self.getSession()[DEFER_KEY].callback(None)
663 else: 665 else:
664 self.getSession()[DEFER_KEY].errback(reason) 666 self.getSession()[DEFER_KEY].errback(reason)
667 self._protocol_instance = None
665 668
666 def buildProtocol(self, addr): 669 def buildProtocol(self, addr):
667 log.debug(("Socks 5 client connection started")) 670 log.debug(("Socks 5 client connection started"))
668 p = self.protocol(session_hash=self.session_hash) 671 p = self.protocol(session_hash=self.session_hash)
669 p.factory = self 672 p.factory = self