comparison src/plugins/plugin_xep_0065.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses unix named pipes, as they complicate things: special care needs to be taken not to block, and they are generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in the jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
64 from twisted.internet import reactor 64 from twisted.internet import reactor
65 from twisted.internet import error as internet_error 65 from twisted.internet import error as internet_error
66 from twisted.words.protocols.jabber import error as jabber_error 66 from twisted.words.protocols.jabber import error as jabber_error
67 from twisted.words.protocols.jabber import jid 67 from twisted.words.protocols.jabber import jid
68 from twisted.words.protocols.jabber import xmlstream 68 from twisted.words.protocols.jabber import xmlstream
69 from twisted.protocols.basic import FileSender
70 from twisted.internet import defer 69 from twisted.internet import defer
71 from collections import namedtuple 70 from collections import namedtuple
72 import struct 71 import struct
73 import hashlib 72 import hashlib
74 import uuid 73 import uuid
292 @return (str): hash 291 @return (str): hash
293 """ 292 """
294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() 293 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest()
295 294
296 295
297 class SOCKSv5(protocol.Protocol, FileSender): 296 class SOCKSv5(protocol.Protocol):
298 CHUNK_SIZE = 2**16 297 CHUNK_SIZE = 2**16
299 298
300 def __init__(self, session_hash=None): 299 def __init__(self, session_hash=None):
301 """ 300 """
302 @param session_hash(str): hash of the session 301 @param session_hash(str): hash of the session
315 self.supportedAddrs = [ADDR_DOMAINNAME] 314 self.supportedAddrs = [ADDR_DOMAINNAME]
316 self.enabledCommands = [CMD_CONNECT] 315 self.enabledCommands = [CMD_CONNECT]
317 self.peersock = None 316 self.peersock = None
318 self.addressType = 0 317 self.addressType = 0
319 self.requestType = 0 318 self.requestType = 0
320 self._file_obj = None 319 self._stream_object = None
321 self.active = False # set to True when protocol is actually used for transfer 320 self.active = False # set to True when protocol is actually used for transfer
322 # used by factories to know when the finished Deferred can be triggered 321 # used by factories to know when the finished Deferred can be triggered
323 322
324 @property 323 @property
325 def file_obj(self): 324 def stream_object(self):
326 if self._file_obj is None: 325 if self._stream_object is None:
327 self._file_obj = self.getSession()['file'] 326 self._stream_object = self.getSession()['stream_object']
328 return self._file_obj 327 if self.server_mode:
328 self._stream_object.registerProducer(self.transport, True)
329 return self._stream_object
329 330
330 def getSession(self): 331 def getSession(self):
331 """Return session associated with this candidate 332 """Return session associated with this candidate
332 333
333 @return (dict): session data 334 @return (dict): session data
506 """ 507 """
507 self.active = True 508 self.active = True
508 if chunk_size is not None: 509 if chunk_size is not None:
509 self.CHUNK_SIZE = chunk_size 510 self.CHUNK_SIZE = chunk_size
510 log.debug(u"Starting file transfer") 511 log.debug(u"Starting file transfer")
511 d = self.beginFileTransfer(self.file_obj, self.transport) 512 d = self.stream_object.startStream(self.transport)
512 d.addCallback(self.fileTransfered) 513 d.addCallback(self.streamFinished)
513 514
514 def fileTransfered(self, d): 515 def streamFinished(self, d):
515 log.info(_("File transfer completed, closing connection")) 516 log.info(_("File transfer completed, closing connection"))
516 self.transport.loseConnection() 517 self.transport.loseConnection()
517 518
518 def connectCompleted(self, remotehost, remoteport): 519 def connectCompleted(self, remotehost, remoteport):
519 if self.addressType == ADDR_IPV4: 520 if self.addressType == ADDR_IPV4:
533 return True 534 return True
534 535
535 def dataReceived(self, buf): 536 def dataReceived(self, buf):
536 if self.state == STATE_READY: 537 if self.state == STATE_READY:
537 # Everything is set, we just have to write the incoming data 538 # Everything is set, we just have to write the incoming data
538 self.file_obj.write(buf) 539 self.stream_object.write(buf)
539 if not self.active: 540 if not self.active:
540 self.active = True 541 self.active = True
541 self.getSession()[TIMER_KEY].cancel() 542 self.getSession()[TIMER_KEY].cancel()
542 return 543 return
543 544
574 @param parent(XEP_0065): XEP_0065 parent instance 575 @param parent(XEP_0065): XEP_0065 parent instance
575 """ 576 """
576 self.parent = parent 577 self.parent = parent
577 578
578 def getSession(self, session_hash): 579 def getSession(self, session_hash):
579 return self.parent.getSession(session_hash, None) 580 return self.parent.getSession(None, session_hash)
580 581
581 def startTransfer(self, session_hash, chunk_size=None): 582 def startTransfer(self, session_hash, chunk_size=None):
582 session = self.getSession(session_hash) 583 session = self.getSession(session_hash)
583 try: 584 try:
584 protocol = session['protocols'][0] 585 protocol = session['protocols'][0]
628 629
629 630
630 class Socks5ClientFactory(protocol.ClientFactory): 631 class Socks5ClientFactory(protocol.ClientFactory):
631 protocol = SOCKSv5 632 protocol = SOCKSv5
632 633
633 def __init__(self, parent, session, session_hash, profile): 634 def __init__(self, client, parent, session, session_hash):
634 """Init the Client Factory 635 """Init the Client Factory
635 636
636 @param session(dict): session data 637 @param session(dict): session data
637 @param session_hash(unicode): hash used for peer_connection 638 @param session_hash(unicode): hash used for peer_connection
638 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 639 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
639 @param profile(unciode): %(doc_profile)s
640 """ 640 """
641 self.session = session 641 self.session = session
642 self.session_hash = session_hash 642 self.session_hash = session_hash
643 self.profile = profile 643 self.client = client
644 self.connection = defer.Deferred() 644 self.connection = defer.Deferred()
645 self._protocol_instance = None 645 self._protocol_instance = None
646 self.connector = None 646 self.connector = None
647 647
648 def discard(self): 648 def discard(self):
694 def __init__(self, host): 694 def __init__(self, host):
695 log.info(_("Plugin XEP_0065 initialization")) 695 log.info(_("Plugin XEP_0065 initialization"))
696 self.host = host 696 self.host = host
697 697
698 # session data 698 # session data
699 self.hash_profiles_map = {} # key: hash of the transfer session, value: session data 699 self.hash_clients_map = {} # key: hash of the transfer session, value: session data
700 self._cache_proxies = {} # key: server jid, value: proxy data 700 self._cache_proxies = {} # key: server jid, value: proxy data
701 701
702 # misc data 702 # misc data
703 self._server_factory = None 703 self._server_factory = None
704 self._external_port = None 704 self._external_port = None
749 749
750 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) 750 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port))
751 return self._server_factory 751 return self._server_factory
752 752
753 @defer.inlineCallbacks 753 @defer.inlineCallbacks
754 def getProxy(self, profile): 754 def getProxy(self, client):
755 """Return the proxy available for this profile 755 """Return the proxy available for this profile
756 756
757 cache is used between profiles using the same server 757 cache is used between clients using the same server
758 @param profile: %(doc_profile)s
759 @return ((D)(ProxyInfos, None)): Found proxy infos, 758 @return ((D)(ProxyInfos, None)): Found proxy infos,
760 or None if not acceptable proxy is found 759 or None if not acceptable proxy is found
761 """ 760 """
762 def notFound(server): 761 def notFound(server):
763 log.info(u"No proxy found on this server") 762 log.info(u"No proxy found on this server")
764 self._cache_proxies[server] = None 763 self._cache_proxies[server] = None
765 defer.returnValue(None) 764 defer.returnValue(None)
766 client = self.host.getClient(profile)
767 server = client.jid.host 765 server = client.jid.host
768 try: 766 try:
769 defer.returnValue(self._cache_proxies[server]) 767 defer.returnValue(self._cache_proxies[server])
770 except KeyError: 768 except KeyError:
771 pass 769 pass
808 @param client: %(doc_client)s 806 @param client: %(doc_client)s
809 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data 807 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
810 """ 808 """
811 self.getSocks5ServerFactory() 809 self.getSocks5ServerFactory()
812 local_port = self._server_factory_port 810 local_port = self._server_factory_port
813 external_ip = yield self._ip.getExternalIP(client.profile) 811 external_ip = yield self._ip.getExternalIP(client)
814 local_ips = yield self._ip.getLocalIPs(client.profile) 812 local_ips = yield self._ip.getLocalIPs(client)
815 813
816 if external_ip is not None and self._external_port is None: 814 if external_ip is not None and self._external_port is None:
817 if external_ip != local_ips[0]: 815 if external_ip != local_ips[0]:
818 log.info(u"We are probably behind a NAT") 816 log.info(u"We are probably behind a NAT")
819 if self._np is None: 817 if self._np is None:
826 self._external_port = ext_port 824 self._external_port = ext_port
827 825
828 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) 826 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
829 827
830 @defer.inlineCallbacks 828 @defer.inlineCallbacks
831 def getCandidates(self, profile): 829 def getCandidates(self, client):
832 """Return a list of our stream candidates 830 """Return a list of our stream candidates
833 831
834 @param profile: %(doc_profile)s
835 @return (D(list[Candidate])): list of candidates, ordered by priority 832 @return (D(list[Candidate])): list of candidates, ordered by priority
836 """ 833 """
837 client = self.host.getClient(profile)
838 server_factory = yield self.getSocks5ServerFactory() 834 server_factory = yield self.getSocks5ServerFactory()
839 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) 835 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
840 proxy = yield self.getProxy(profile) 836 proxy = yield self.getProxy(client)
841 837
842 # its time to gather the candidates 838 # its time to gather the candidates
843 candidates = [] 839 candidates = []
844 840
845 # first the direct ones 841 # first the direct ones
871 @return (D): Deferred fired when factory connection is done or has failed 867 @return (D): Deferred fired when factory connection is done or has failed
872 """ 868 """
873 candidate.factory.connector = connector 869 candidate.factory.connector = connector
874 return candidate.factory.connection 870 return candidate.factory.connection
875 871
876 def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE): 872 def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None):
877 """Connect to a candidate 873 """Connect to a candidate
878 874
879 Connection will be done with a Socks5ClientFactory 875 Connection will be done with a Socks5ClientFactory
880 @param candidate(Candidate): candidate to connect to 876 @param candidate(Candidate): candidate to connect to
881 @param session_hash(unicode): hash of the session 877 @param session_hash(unicode): hash of the session
885 None must be used in 2 cases: 881 None must be used in 2 cases:
886 - when XEP-0065 is used with XEP-0096 882 - when XEP-0065 is used with XEP-0096
887 - when a peer connect to a proxy *he proposed himself* 883 - when a peer connect to a proxy *he proposed himself*
888 in practice, peer_session_hash is only used by tryCandidates 884 in practice, peer_session_hash is only used by tryCandidates
889 @param delay(None, float): optional delay to wait before connection, in seconds 885 @param delay(None, float): optional delay to wait before connection, in seconds
890 @param profile: %(doc_profile)s
891 @return (D): Deferred launched when TCP connection + Socks5 connection is done 886 @return (D): Deferred launched when TCP connection + Socks5 connection is done
892 """ 887 """
893 if peer_session_hash is None: 888 if peer_session_hash is None:
894 # for XEP-0065, only one hash is needed 889 # for XEP-0065, only one hash is needed
895 peer_session_hash = session_hash 890 peer_session_hash = session_hash
896 session = self.getSession(session_hash, profile) 891 session = self.getSession(client, session_hash)
897 factory = Socks5ClientFactory(self, session, peer_session_hash, profile) 892 factory = Socks5ClientFactory(client, self, session, peer_session_hash)
898 candidate.factory = factory 893 candidate.factory = factory
899 if delay is None: 894 if delay is None:
900 d = defer.succeed(candidate.host) 895 d = defer.succeed(candidate.host)
901 else: 896 else:
902 d = sat_defer.DelayedDeferred(delay, candidate.host) 897 d = sat_defer.DelayedDeferred(delay, candidate.host)
903 d.addCallback(reactor.connectTCP, candidate.port, factory) 898 d.addCallback(reactor.connectTCP, candidate.port, factory)
904 d.addCallback(self._addConnector, candidate) 899 d.addCallback(self._addConnector, candidate)
905 return d 900 return d
906 901
907 def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): 902 def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None):
908 defers_list = [] 903 defers_list = []
909 904
910 for candidate in candidates: 905 for candidate in candidates:
911 delay = CANDIDATE_DELAY * len(defers_list) 906 delay = CANDIDATE_DELAY * len(defers_list)
912 if candidate.type == XEP_0065.TYPE_PROXY: 907 if candidate.type == XEP_0065.TYPE_PROXY:
913 delay += CANDIDATE_DELAY_PROXY 908 delay += CANDIDATE_DELAY_PROXY
914 d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile) 909 d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay)
915 if connection_cb is not None: 910 if connection_cb is not None:
916 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) 911 d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate))
917 if connection_eb is not None: 912 if connection_eb is not None:
918 d.addErrback(connection_eb, candidate, profile) 913 d.addErrback(connection_eb, client, candidate)
919 defers_list.append(d) 914 defers_list.append(d)
920 915
921 return defers_list 916 return defers_list
922 917
923 def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE): 918 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None):
924 """Get best candidate (according to priority) which can connect 919 """Get best candidate (according to priority) which can connect
925 920
926 @param candidates(iterable[Candidate]): candidates to test 921 @param candidates(iterable[Candidate]): candidates to test
927 @param session_hash(unicode): hash of the session 922 @param session_hash(unicode): hash of the session
928 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 923 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
929 @param peer_session_hash(unicode, None): hash of the other peer 924 @param peer_session_hash(unicode, None): hash of the other peer
930 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates 925 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
931 @param profile: %(doc_profile)s
932 @return (D(None, Candidate)): best candidate or None if none can connect 926 @return (D(None, Candidate)): best candidate or None if none can connect
933 """ 927 """
934 defer_candidates = None 928 defer_candidates = None
935 929
936 def connectionCb(candidate, profile): 930 def connectionCb(client, candidate):
937 log.info(u"Connection of {} successful".format(unicode(candidate))) 931 log.info(u"Connection of {} successful".format(unicode(candidate)))
938 for idx, other_candidate in enumerate(candidates): 932 for idx, other_candidate in enumerate(candidates):
939 try: 933 try:
940 if other_candidate.priority < candidate.priority: 934 if other_candidate.priority < candidate.priority:
941 log.debug(u"Cancelling {}".format(other_candidate)) 935 log.debug(u"Cancelling {}".format(other_candidate))
942 defer_candidates[idx].cancel() 936 defer_candidates[idx].cancel()
943 except AttributeError: 937 except AttributeError:
944 assert other_candidate is None 938 assert other_candidate is None
945 939
946 def connectionEb(failure, candidate, profile): 940 def connectionEb(failure, client, candidate):
947 if failure.check(defer.CancelledError): 941 if failure.check(defer.CancelledError):
948 log.debug(u"Connection of {} has been cancelled".format(candidate)) 942 log.debug(u"Connection of {} has been cancelled".format(candidate))
949 else: 943 else:
950 log.info(u"Connection of {candidate} Failed: {error}".format( 944 log.info(u"Connection of {candidate} Failed: {error}".format(
951 candidate = candidate, 945 candidate = candidate,
955 def allTested(self): 949 def allTested(self):
956 log.debug(u"All candidates have been tested") 950 log.debug(u"All candidates have been tested")
957 good_candidates = [c for c in candidates if c] 951 good_candidates = [c for c in candidates if c]
958 return good_candidates[0] if good_candidates else None 952 return good_candidates[0] if good_candidates else None
959 953
960 defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile) 954 defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb)
961 d_list = defer.DeferredList(defer_candidates) 955 d_list = defer.DeferredList(defer_candidates)
962 d_list.addCallback(allTested) 956 d_list.addCallback(allTested)
963 return d_list 957 return d_list
964 958
965 def _timeOut(self, session_hash, client): 959 def _timeOut(self, session_hash, client):
967 961
968 @param session_hash(str): hash as returned by getSessionHash 962 @param session_hash(str): hash as returned by getSessionHash
969 @param client: %(doc_client)s 963 @param client: %(doc_client)s
970 """ 964 """
971 log.info(u"Socks5 Bytestream: TimeOut reached") 965 log.info(u"Socks5 Bytestream: TimeOut reached")
972 session = self.getSession(session_hash, client.profile) 966 session = self.getSession(client, session_hash)
973 session[DEFER_KEY].errback(exceptions.TimeOutError) 967 session[DEFER_KEY].errback(exceptions.TimeOutError)
974 968
975 def killSession(self, reason, session_hash, sid, client): 969 def killSession(self, reason, session_hash, sid, client):
976 """Clean the current session 970 """Clean the current session
977 971
987 reason='' if reason is None else reason.value, 981 reason='' if reason is None else reason.value,
988 id='' if sid is None else u' (id: {})'.format(sid), 982 id='' if sid is None else u' (id: {})'.format(sid),
989 )) 983 ))
990 984
991 try: 985 try:
992 assert self.hash_profiles_map[session_hash] == client.profile 986 assert self.hash_clients_map[session_hash] == client
993 del self.hash_profiles_map[session_hash] 987 del self.hash_clients_map[session_hash]
994 except KeyError: 988 except KeyError:
995 pass 989 pass
996 990
997 if sid is not None: 991 if sid is not None:
998 try: 992 try:
1013 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): 1007 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1014 pass 1008 pass
1015 1009
1016 return reason 1010 return reason
1017 1011
1018 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): 1012 def startStream(self, client, stream_object, to_jid, sid):
1019 """Launch the stream workflow 1013 """Launch the stream workflow
1020 1014
1021 @param file_obj: file_obj to send 1015 @param streamProducer: stream_object to use
1022 @param to_jid: JID of the recipient 1016 @param to_jid: JID of the recipient
1023 @param sid: Stream session id 1017 @param sid: Stream session id
1024 @param successCb: method to call when stream successfuly finished 1018 @param successCb: method to call when stream successfuly finished
1025 @param failureCb: method to call when something goes wrong 1019 @param failureCb: method to call when something goes wrong
1026 @param profile: %(doc_profile)s
1027 @return (D): Deferred fired when session is finished 1020 @return (D): Deferred fired when session is finished
1028 """ 1021 """
1029 client = self.host.getClient(profile) 1022 session_data = self._createSession(client, stream_object, to_jid, sid, True)
1030 session_data = self._createSession(file_obj, to_jid, sid, True, client.profile)
1031 1023
1032 session_data[client] = client 1024 session_data[client] = client
1033 1025
1034 def gotCandidates(candidates): 1026 def gotCandidates(candidates):
1035 session_data['candidates'] = candidates 1027 session_data['candidates'] = candidates
1049 1041
1050 d = iq_elt.send() 1042 d = iq_elt.send()
1051 args = [session_data, client] 1043 args = [session_data, client]
1052 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1044 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1053 1045
1054 self.getCandidates(profile).addCallback(gotCandidates) 1046 self.getCandidates(client).addCallback(gotCandidates)
1055 return session_data[DEFER_KEY] 1047 return session_data[DEFER_KEY]
1056 1048
1057 def _IQNegotiationCb(self, iq_elt, session_data, client): 1049 def _IQNegotiationCb(self, iq_elt, session_data, client):
1058 """Called when the result of open iq is received 1050 """Called when the result of open iq is received
1059 1051
1078 else: 1070 else:
1079 log.info(u"Candidate choosed by target: {}".format(candidate)) 1071 log.info(u"Candidate choosed by target: {}".format(candidate))
1080 1072
1081 if candidate.type == XEP_0065.TYPE_PROXY: 1073 if candidate.type == XEP_0065.TYPE_PROXY:
1082 log.info(u"A Socks5 proxy is used") 1074 log.info(u"A Socks5 proxy is used")
1083 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) 1075 d = self.connectCandidate(client, candidate, session_data['hash'])
1084 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) 1076 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
1085 d.addErrback(self._activationEb) 1077 d.addErrback(self._activationEb)
1086 else: 1078 else:
1087 d = defer.succeed(None) 1079 d = defer.succeed(None)
1088 1080
1100 1092
1101 session deferred is fired when transfer is finished 1093 session deferred is fired when transfer is finished
1102 """ 1094 """
1103 return self._createSession(*args, **kwargs)[DEFER_KEY] 1095 return self._createSession(*args, **kwargs)[DEFER_KEY]
1104 1096
1105 def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): 1097 def _createSession(self, client, stream_object, to_jid, sid, requester=False):
1106 """Called when a bytestream is imminent 1098 """Called when a bytestream is imminent
1107 1099
1108 @param file_obj(file): File object where data will be written 1100 @param stream_object(iface.IStreamProducer): File object where data will be written
1109 @param to_jid(jid.JId): jid of the other peer 1101 @param to_jid(jid.JId): jid of the other peer
1110 @param sid(unicode): session id 1102 @param sid(unicode): session id
1111 @param initiator(bool): if True, this session is create by initiator 1103 @param initiator(bool): if True, this session is create by initiator
1112 @param profile: %(doc_profile)s
1113 @return (dict): session data 1104 @return (dict): session data
1114 """ 1105 """
1115 client = self.host.getClient(profile)
1116 if sid in client.xep_0065_sid_session: 1106 if sid in client.xep_0065_sid_session:
1117 raise exceptions.ConflictError(u'A session with this id already exists !') 1107 raise exceptions.ConflictError(u'A session with this id already exists !')
1118 if requester: 1108 if requester:
1119 session_hash = getSessionHash(client.jid, to_jid, sid) 1109 session_hash = getSessionHash(client.jid, to_jid, sid)
1120 session_data = self._registerHash(session_hash, file_obj, profile) 1110 session_data = self._registerHash(client, session_hash, stream_object)
1121 else: 1111 else:
1122 session_hash = getSessionHash(to_jid, client.jid, sid) 1112 session_hash = getSessionHash(to_jid, client.jid, sid)
1123 session_d = defer.Deferred() 1113 session_d = defer.Deferred()
1124 session_d.addBoth(self.killSession, session_hash, sid, client) 1114 session_d.addBoth(self.killSession, session_hash, sid, client)
1125 session_data = client._s5b_sessions[session_hash] = { 1115 session_data = client._s5b_sessions[session_hash] = {
1128 } 1118 }
1129 client.xep_0065_sid_session[sid] = session_data 1119 client.xep_0065_sid_session[sid] = session_data
1130 session_data.update( 1120 session_data.update(
1131 {'id': sid, 1121 {'id': sid,
1132 'peer_jid': to_jid, 1122 'peer_jid': to_jid,
1133 'file': file_obj, 1123 'stream_object': stream_object,
1134 'hash': session_hash, 1124 'hash': session_hash,
1135 }) 1125 })
1136 1126
1137 return session_data 1127 return session_data
1138 1128
1139 def getSession(self, session_hash, profile): 1129 def getSession(self, client, session_hash):
1140 """Return session data 1130 """Return session data
1141 1131
1142 @param session_hash(unicode): hash of the session 1132 @param session_hash(unicode): hash of the session
1143 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 1133 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
1144 @param profile(None, unicode): profile of the peer 1134 @param client(None, SatXMPPClient): client of the peer
1145 None is used only if profile is unknown (this is only the case 1135 None is used only if client is unknown (this is only the case
1146 for incoming request received by Socks5ServerFactory). None must 1136 for incoming request received by Socks5ServerFactory). None must
1147 only be used by Socks5ServerFactory. 1137 only be used by Socks5ServerFactory.
1148 See comments below for details 1138 See comments below for details
1149 @return (dict): session data 1139 @return (dict): session data
1150 """ 1140 """
1151 if profile is None: 1141 if client is None:
1152 try: 1142 try:
1153 profile = self.hash_profiles_map[session_hash] 1143 client = self.hash_clients_map[session_hash]
1154 except KeyError as e: 1144 except KeyError as e:
1155 log.warning(u"The requested session doesn't exists !") 1145 log.warning(u"The requested session doesn't exists !")
1156 raise e 1146 raise e
1157 client = self.host.getClient(profile)
1158 return client._s5b_sessions[session_hash] 1147 return client._s5b_sessions[session_hash]
1159 1148
1160 def registerHash(self, *args, **kwargs): 1149 def registerHash(self, *args, **kwargs):
1161 """like [_registerHash] but return the session deferred instead of the whole session 1150 """like [_registerHash] but return the session deferred instead of the whole session
1162 session deferred is fired when transfer is finished 1151 session deferred is fired when transfer is finished
1163 """ 1152 """
1164 return self._registerHash(*args, **kwargs)[DEFER_KEY] 1153 return self._registerHash(*args, **kwargs)[DEFER_KEY]
1165 1154
1166 def _registerHash(self, session_hash, file_obj, profile): 1155 def _registerHash(self, client, session_hash, stream_object):
1167 """Create a session_data associated to hash 1156 """Create a session_data associated to hash
1168 1157
1169 @param session_hash(str): hash of the session 1158 @param session_hash(str): hash of the session
1170 @param file_obj(file, None): file-like object 1159 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
1171 None if it will be filled later 1160 None if it will be filled later
1172 @param profile: %(doc_profile)s
1173 return (dict): session data 1161 return (dict): session data
1174 """ 1162 """
1175 client = self.host.getClient(profile)
1176 assert session_hash not in client._s5b_sessions 1163 assert session_hash not in client._s5b_sessions
1177 session_d = defer.Deferred() 1164 session_d = defer.Deferred()
1178 session_d.addBoth(self.killSession, session_hash, None, client) 1165 session_d.addBoth(self.killSession, session_hash, None, client)
1179 session_data = client._s5b_sessions[session_hash] = { 1166 session_data = client._s5b_sessions[session_hash] = {
1180 DEFER_KEY: session_d, 1167 DEFER_KEY: session_d,
1181 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), 1168 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1182 } 1169 }
1183 1170
1184 if file_obj is not None: 1171 if stream_object is not None:
1185 session_data['file'] = file_obj 1172 session_data['stream_object'] = stream_object
1186 1173
1187 assert session_hash not in self.hash_profiles_map 1174 assert session_hash not in self.hash_clients_map
1188 self.hash_profiles_map[session_hash] = profile 1175 self.hash_clients_map[session_hash] = client
1189 1176
1190 return session_data 1177 return session_data
1191 1178
1192 def associateFileObj(self, session_hash, file_obj, profile): 1179 def associateStreamObject(self, client, session_hash, stream_object):
1193 """Associate a file obj with a session""" 1180 """Associate a stream object with a session"""
1194 session_data = self.getSession(session_hash, profile) 1181 session_data = self.getSession(client, session_hash)
1195 assert 'file' not in session_data 1182 assert 'stream_object' not in session_data
1196 session_data['file'] = file_obj 1183 session_data['stream_object'] = stream_object
1197 1184
1198 def streamQuery(self, iq_elt, profile): 1185 def streamQuery(self, iq_elt, client):
1199 log.debug(u"BS stream query") 1186 log.debug(u"BS stream query")
1200 client = self.host.getClient(profile)
1201 1187
1202 iq_elt.handled = True 1188 iq_elt.handled = True
1203 1189
1204 query_elt = iq_elt.elements(NS_BS, 'query').next() 1190 query_elt = iq_elt.elements(NS_BS, 'query').next()
1205 try: 1191 try:
1236 candidates.append(Candidate(host, port, type_, priority, jid_)) 1222 candidates.append(Candidate(host, port, type_, priority, jid_))
1237 1223
1238 for candidate in candidates: 1224 for candidate in candidates:
1239 log.info(u"Candidate proposed: {}".format(candidate)) 1225 log.info(u"Candidate proposed: {}".format(candidate))
1240 1226
1241 d = self.getBestCandidate(candidates, session_data['hash'], profile=profile) 1227 d = self.getBestCandidate(client, candidates, session_data['hash'])
1242 d.addCallback(self._ackStream, iq_elt, session_data, client) 1228 d.addCallback(self._ackStream, iq_elt, session_data, client)
1243 1229
1244 def _ackStream(self, candidate, iq_elt, session_data, client): 1230 def _ackStream(self, candidate, iq_elt, session_data, client):
1245 if candidate is None: 1231 if candidate is None:
1246 log.info("No streamhost candidate worked, we have to end negotiation") 1232 log.info("No streamhost candidate worked, we have to end negotiation")
1260 def __init__(self, plugin_parent): 1246 def __init__(self, plugin_parent):
1261 self.plugin_parent = plugin_parent 1247 self.plugin_parent = plugin_parent
1262 self.host = plugin_parent.host 1248 self.host = plugin_parent.host
1263 1249
1264 def connectionInitialized(self): 1250 def connectionInitialized(self):
1265 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) 1251 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent)
1266 1252
1267 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 1253 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1268 return [disco.DiscoFeature(NS_BS)] 1254 return [disco.DiscoFeature(NS_BS)]
1269 1255
1270 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 1256 def getDiscoItems(self, requestor, target, nodeIdentifier=''):