Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
64 from twisted.internet import reactor | 64 from twisted.internet import reactor |
65 from twisted.internet import error as internet_error | 65 from twisted.internet import error as internet_error |
66 from twisted.words.protocols.jabber import error as jabber_error | 66 from twisted.words.protocols.jabber import error as jabber_error |
67 from twisted.words.protocols.jabber import jid | 67 from twisted.words.protocols.jabber import jid |
68 from twisted.words.protocols.jabber import xmlstream | 68 from twisted.words.protocols.jabber import xmlstream |
69 from twisted.protocols.basic import FileSender | |
70 from twisted.internet import defer | 69 from twisted.internet import defer |
71 from collections import namedtuple | 70 from collections import namedtuple |
72 import struct | 71 import struct |
73 import hashlib | 72 import hashlib |
74 import uuid | 73 import uuid |
292 @return (str): hash | 291 @return (str): hash |
293 """ | 292 """ |
294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() | 293 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() |
295 | 294 |
296 | 295 |
297 class SOCKSv5(protocol.Protocol, FileSender): | 296 class SOCKSv5(protocol.Protocol): |
298 CHUNK_SIZE = 2**16 | 297 CHUNK_SIZE = 2**16 |
299 | 298 |
300 def __init__(self, session_hash=None): | 299 def __init__(self, session_hash=None): |
301 """ | 300 """ |
302 @param session_hash(str): hash of the session | 301 @param session_hash(str): hash of the session |
315 self.supportedAddrs = [ADDR_DOMAINNAME] | 314 self.supportedAddrs = [ADDR_DOMAINNAME] |
316 self.enabledCommands = [CMD_CONNECT] | 315 self.enabledCommands = [CMD_CONNECT] |
317 self.peersock = None | 316 self.peersock = None |
318 self.addressType = 0 | 317 self.addressType = 0 |
319 self.requestType = 0 | 318 self.requestType = 0 |
320 self._file_obj = None | 319 self._stream_object = None |
321 self.active = False # set to True when protocol is actually used for transfer | 320 self.active = False # set to True when protocol is actually used for transfer |
322 # used by factories to know when the finished Deferred can be triggered | 321 # used by factories to know when the finished Deferred can be triggered |
323 | 322 |
324 @property | 323 @property |
325 def file_obj(self): | 324 def stream_object(self): |
326 if self._file_obj is None: | 325 if self._stream_object is None: |
327 self._file_obj = self.getSession()['file'] | 326 self._stream_object = self.getSession()['stream_object'] |
328 return self._file_obj | 327 if self.server_mode: |
328 self._stream_object.registerProducer(self.transport, True) | |
329 return self._stream_object | |
329 | 330 |
330 def getSession(self): | 331 def getSession(self): |
331 """Return session associated with this candidate | 332 """Return session associated with this candidate |
332 | 333 |
333 @return (dict): session data | 334 @return (dict): session data |
506 """ | 507 """ |
507 self.active = True | 508 self.active = True |
508 if chunk_size is not None: | 509 if chunk_size is not None: |
509 self.CHUNK_SIZE = chunk_size | 510 self.CHUNK_SIZE = chunk_size |
510 log.debug(u"Starting file transfer") | 511 log.debug(u"Starting file transfer") |
511 d = self.beginFileTransfer(self.file_obj, self.transport) | 512 d = self.stream_object.startStream(self.transport) |
512 d.addCallback(self.fileTransfered) | 513 d.addCallback(self.streamFinished) |
513 | 514 |
514 def fileTransfered(self, d): | 515 def streamFinished(self, d): |
515 log.info(_("File transfer completed, closing connection")) | 516 log.info(_("File transfer completed, closing connection")) |
516 self.transport.loseConnection() | 517 self.transport.loseConnection() |
517 | 518 |
518 def connectCompleted(self, remotehost, remoteport): | 519 def connectCompleted(self, remotehost, remoteport): |
519 if self.addressType == ADDR_IPV4: | 520 if self.addressType == ADDR_IPV4: |
533 return True | 534 return True |
534 | 535 |
535 def dataReceived(self, buf): | 536 def dataReceived(self, buf): |
536 if self.state == STATE_READY: | 537 if self.state == STATE_READY: |
537 # Everything is set, we just have to write the incoming data | 538 # Everything is set, we just have to write the incoming data |
538 self.file_obj.write(buf) | 539 self.stream_object.write(buf) |
539 if not self.active: | 540 if not self.active: |
540 self.active = True | 541 self.active = True |
541 self.getSession()[TIMER_KEY].cancel() | 542 self.getSession()[TIMER_KEY].cancel() |
542 return | 543 return |
543 | 544 |
574 @param parent(XEP_0065): XEP_0065 parent instance | 575 @param parent(XEP_0065): XEP_0065 parent instance |
575 """ | 576 """ |
576 self.parent = parent | 577 self.parent = parent |
577 | 578 |
578 def getSession(self, session_hash): | 579 def getSession(self, session_hash): |
579 return self.parent.getSession(session_hash, None) | 580 return self.parent.getSession(None, session_hash) |
580 | 581 |
581 def startTransfer(self, session_hash, chunk_size=None): | 582 def startTransfer(self, session_hash, chunk_size=None): |
582 session = self.getSession(session_hash) | 583 session = self.getSession(session_hash) |
583 try: | 584 try: |
584 protocol = session['protocols'][0] | 585 protocol = session['protocols'][0] |
628 | 629 |
629 | 630 |
630 class Socks5ClientFactory(protocol.ClientFactory): | 631 class Socks5ClientFactory(protocol.ClientFactory): |
631 protocol = SOCKSv5 | 632 protocol = SOCKSv5 |
632 | 633 |
633 def __init__(self, parent, session, session_hash, profile): | 634 def __init__(self, client, parent, session, session_hash): |
634 """Init the Client Factory | 635 """Init the Client Factory |
635 | 636 |
636 @param session(dict): session data | 637 @param session(dict): session data |
637 @param session_hash(unicode): hash used for peer_connection | 638 @param session_hash(unicode): hash used for peer_connection |
638 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | 639 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
639 @param profile(unciode): %(doc_profile)s | |
640 """ | 640 """ |
641 self.session = session | 641 self.session = session |
642 self.session_hash = session_hash | 642 self.session_hash = session_hash |
643 self.profile = profile | 643 self.client = client |
644 self.connection = defer.Deferred() | 644 self.connection = defer.Deferred() |
645 self._protocol_instance = None | 645 self._protocol_instance = None |
646 self.connector = None | 646 self.connector = None |
647 | 647 |
648 def discard(self): | 648 def discard(self): |
694 def __init__(self, host): | 694 def __init__(self, host): |
695 log.info(_("Plugin XEP_0065 initialization")) | 695 log.info(_("Plugin XEP_0065 initialization")) |
696 self.host = host | 696 self.host = host |
697 | 697 |
698 # session data | 698 # session data |
699 self.hash_profiles_map = {} # key: hash of the transfer session, value: session data | 699 self.hash_clients_map = {} # key: hash of the transfer session, value: session data |
700 self._cache_proxies = {} # key: server jid, value: proxy data | 700 self._cache_proxies = {} # key: server jid, value: proxy data |
701 | 701 |
702 # misc data | 702 # misc data |
703 self._server_factory = None | 703 self._server_factory = None |
704 self._external_port = None | 704 self._external_port = None |
749 | 749 |
750 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) | 750 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) |
751 return self._server_factory | 751 return self._server_factory |
752 | 752 |
753 @defer.inlineCallbacks | 753 @defer.inlineCallbacks |
754 def getProxy(self, profile): | 754 def getProxy(self, client): |
755 """Return the proxy available for this profile | 755 """Return the proxy available for this profile |
756 | 756 |
757 cache is used between profiles using the same server | 757 cache is used between clients using the same server |
758 @param profile: %(doc_profile)s | |
759 @return ((D)(ProxyInfos, None)): Found proxy infos, | 758 @return ((D)(ProxyInfos, None)): Found proxy infos, |
760 or None if not acceptable proxy is found | 759 or None if not acceptable proxy is found |
761 """ | 760 """ |
762 def notFound(server): | 761 def notFound(server): |
763 log.info(u"No proxy found on this server") | 762 log.info(u"No proxy found on this server") |
764 self._cache_proxies[server] = None | 763 self._cache_proxies[server] = None |
765 defer.returnValue(None) | 764 defer.returnValue(None) |
766 client = self.host.getClient(profile) | |
767 server = client.jid.host | 765 server = client.jid.host |
768 try: | 766 try: |
769 defer.returnValue(self._cache_proxies[server]) | 767 defer.returnValue(self._cache_proxies[server]) |
770 except KeyError: | 768 except KeyError: |
771 pass | 769 pass |
808 @param client: %(doc_client)s | 806 @param client: %(doc_client)s |
809 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data | 807 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data |
810 """ | 808 """ |
811 self.getSocks5ServerFactory() | 809 self.getSocks5ServerFactory() |
812 local_port = self._server_factory_port | 810 local_port = self._server_factory_port |
813 external_ip = yield self._ip.getExternalIP(client.profile) | 811 external_ip = yield self._ip.getExternalIP(client) |
814 local_ips = yield self._ip.getLocalIPs(client.profile) | 812 local_ips = yield self._ip.getLocalIPs(client) |
815 | 813 |
816 if external_ip is not None and self._external_port is None: | 814 if external_ip is not None and self._external_port is None: |
817 if external_ip != local_ips[0]: | 815 if external_ip != local_ips[0]: |
818 log.info(u"We are probably behind a NAT") | 816 log.info(u"We are probably behind a NAT") |
819 if self._np is None: | 817 if self._np is None: |
826 self._external_port = ext_port | 824 self._external_port = ext_port |
827 | 825 |
828 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | 826 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) |
829 | 827 |
830 @defer.inlineCallbacks | 828 @defer.inlineCallbacks |
831 def getCandidates(self, profile): | 829 def getCandidates(self, client): |
832 """Return a list of our stream candidates | 830 """Return a list of our stream candidates |
833 | 831 |
834 @param profile: %(doc_profile)s | |
835 @return (D(list[Candidate])): list of candidates, ordered by priority | 832 @return (D(list[Candidate])): list of candidates, ordered by priority |
836 """ | 833 """ |
837 client = self.host.getClient(profile) | |
838 server_factory = yield self.getSocks5ServerFactory() | 834 server_factory = yield self.getSocks5ServerFactory() |
839 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) | 835 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) |
840 proxy = yield self.getProxy(profile) | 836 proxy = yield self.getProxy(client) |
841 | 837 |
842 # its time to gather the candidates | 838 # its time to gather the candidates |
843 candidates = [] | 839 candidates = [] |
844 | 840 |
845 # first the direct ones | 841 # first the direct ones |
871 @return (D): Deferred fired when factory connection is done or has failed | 867 @return (D): Deferred fired when factory connection is done or has failed |
872 """ | 868 """ |
873 candidate.factory.connector = connector | 869 candidate.factory.connector = connector |
874 return candidate.factory.connection | 870 return candidate.factory.connection |
875 | 871 |
876 def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE): | 872 def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): |
877 """Connect to a candidate | 873 """Connect to a candidate |
878 | 874 |
879 Connection will be done with a Socks5ClientFactory | 875 Connection will be done with a Socks5ClientFactory |
880 @param candidate(Candidate): candidate to connect to | 876 @param candidate(Candidate): candidate to connect to |
881 @param session_hash(unicode): hash of the session | 877 @param session_hash(unicode): hash of the session |
885 None must be used in 2 cases: | 881 None must be used in 2 cases: |
886 - when XEP-0065 is used with XEP-0096 | 882 - when XEP-0065 is used with XEP-0096 |
887 - when a peer connect to a proxy *he proposed himself* | 883 - when a peer connect to a proxy *he proposed himself* |
888 in practice, peer_session_hash is only used by tryCandidates | 884 in practice, peer_session_hash is only used by tryCandidates |
889 @param delay(None, float): optional delay to wait before connection, in seconds | 885 @param delay(None, float): optional delay to wait before connection, in seconds |
890 @param profile: %(doc_profile)s | |
891 @return (D): Deferred launched when TCP connection + Socks5 connection is done | 886 @return (D): Deferred launched when TCP connection + Socks5 connection is done |
892 """ | 887 """ |
893 if peer_session_hash is None: | 888 if peer_session_hash is None: |
894 # for XEP-0065, only one hash is needed | 889 # for XEP-0065, only one hash is needed |
895 peer_session_hash = session_hash | 890 peer_session_hash = session_hash |
896 session = self.getSession(session_hash, profile) | 891 session = self.getSession(client, session_hash) |
897 factory = Socks5ClientFactory(self, session, peer_session_hash, profile) | 892 factory = Socks5ClientFactory(client, self, session, peer_session_hash) |
898 candidate.factory = factory | 893 candidate.factory = factory |
899 if delay is None: | 894 if delay is None: |
900 d = defer.succeed(candidate.host) | 895 d = defer.succeed(candidate.host) |
901 else: | 896 else: |
902 d = sat_defer.DelayedDeferred(delay, candidate.host) | 897 d = sat_defer.DelayedDeferred(delay, candidate.host) |
903 d.addCallback(reactor.connectTCP, candidate.port, factory) | 898 d.addCallback(reactor.connectTCP, candidate.port, factory) |
904 d.addCallback(self._addConnector, candidate) | 899 d.addCallback(self._addConnector, candidate) |
905 return d | 900 return d |
906 | 901 |
907 def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): | 902 def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None): |
908 defers_list = [] | 903 defers_list = [] |
909 | 904 |
910 for candidate in candidates: | 905 for candidate in candidates: |
911 delay = CANDIDATE_DELAY * len(defers_list) | 906 delay = CANDIDATE_DELAY * len(defers_list) |
912 if candidate.type == XEP_0065.TYPE_PROXY: | 907 if candidate.type == XEP_0065.TYPE_PROXY: |
913 delay += CANDIDATE_DELAY_PROXY | 908 delay += CANDIDATE_DELAY_PROXY |
914 d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile) | 909 d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay) |
915 if connection_cb is not None: | 910 if connection_cb is not None: |
916 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) | 911 d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate)) |
917 if connection_eb is not None: | 912 if connection_eb is not None: |
918 d.addErrback(connection_eb, candidate, profile) | 913 d.addErrback(connection_eb, client, candidate) |
919 defers_list.append(d) | 914 defers_list.append(d) |
920 | 915 |
921 return defers_list | 916 return defers_list |
922 | 917 |
923 def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE): | 918 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): |
924 """Get best candidate (according to priority) which can connect | 919 """Get best candidate (according to priority) which can connect |
925 | 920 |
926 @param candidates(iterable[Candidate]): candidates to test | 921 @param candidates(iterable[Candidate]): candidates to test |
927 @param session_hash(unicode): hash of the session | 922 @param session_hash(unicode): hash of the session |
928 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | 923 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
929 @param peer_session_hash(unicode, None): hash of the other peer | 924 @param peer_session_hash(unicode, None): hash of the other peer |
930 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates | 925 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates |
931 @param profile: %(doc_profile)s | |
932 @return (D(None, Candidate)): best candidate or None if none can connect | 926 @return (D(None, Candidate)): best candidate or None if none can connect |
933 """ | 927 """ |
934 defer_candidates = None | 928 defer_candidates = None |
935 | 929 |
936 def connectionCb(candidate, profile): | 930 def connectionCb(client, candidate): |
937 log.info(u"Connection of {} successful".format(unicode(candidate))) | 931 log.info(u"Connection of {} successful".format(unicode(candidate))) |
938 for idx, other_candidate in enumerate(candidates): | 932 for idx, other_candidate in enumerate(candidates): |
939 try: | 933 try: |
940 if other_candidate.priority < candidate.priority: | 934 if other_candidate.priority < candidate.priority: |
941 log.debug(u"Cancelling {}".format(other_candidate)) | 935 log.debug(u"Cancelling {}".format(other_candidate)) |
942 defer_candidates[idx].cancel() | 936 defer_candidates[idx].cancel() |
943 except AttributeError: | 937 except AttributeError: |
944 assert other_candidate is None | 938 assert other_candidate is None |
945 | 939 |
946 def connectionEb(failure, candidate, profile): | 940 def connectionEb(failure, client, candidate): |
947 if failure.check(defer.CancelledError): | 941 if failure.check(defer.CancelledError): |
948 log.debug(u"Connection of {} has been cancelled".format(candidate)) | 942 log.debug(u"Connection of {} has been cancelled".format(candidate)) |
949 else: | 943 else: |
950 log.info(u"Connection of {candidate} Failed: {error}".format( | 944 log.info(u"Connection of {candidate} Failed: {error}".format( |
951 candidate = candidate, | 945 candidate = candidate, |
955 def allTested(self): | 949 def allTested(self): |
956 log.debug(u"All candidates have been tested") | 950 log.debug(u"All candidates have been tested") |
957 good_candidates = [c for c in candidates if c] | 951 good_candidates = [c for c in candidates if c] |
958 return good_candidates[0] if good_candidates else None | 952 return good_candidates[0] if good_candidates else None |
959 | 953 |
960 defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile) | 954 defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) |
961 d_list = defer.DeferredList(defer_candidates) | 955 d_list = defer.DeferredList(defer_candidates) |
962 d_list.addCallback(allTested) | 956 d_list.addCallback(allTested) |
963 return d_list | 957 return d_list |
964 | 958 |
965 def _timeOut(self, session_hash, client): | 959 def _timeOut(self, session_hash, client): |
967 | 961 |
968 @param session_hash(str): hash as returned by getSessionHash | 962 @param session_hash(str): hash as returned by getSessionHash |
969 @param client: %(doc_client)s | 963 @param client: %(doc_client)s |
970 """ | 964 """ |
971 log.info(u"Socks5 Bytestream: TimeOut reached") | 965 log.info(u"Socks5 Bytestream: TimeOut reached") |
972 session = self.getSession(session_hash, client.profile) | 966 session = self.getSession(client, session_hash) |
973 session[DEFER_KEY].errback(exceptions.TimeOutError) | 967 session[DEFER_KEY].errback(exceptions.TimeOutError) |
974 | 968 |
975 def killSession(self, reason, session_hash, sid, client): | 969 def killSession(self, reason, session_hash, sid, client): |
976 """Clean the current session | 970 """Clean the current session |
977 | 971 |
987 reason='' if reason is None else reason.value, | 981 reason='' if reason is None else reason.value, |
988 id='' if sid is None else u' (id: {})'.format(sid), | 982 id='' if sid is None else u' (id: {})'.format(sid), |
989 )) | 983 )) |
990 | 984 |
991 try: | 985 try: |
992 assert self.hash_profiles_map[session_hash] == client.profile | 986 assert self.hash_clients_map[session_hash] == client |
993 del self.hash_profiles_map[session_hash] | 987 del self.hash_clients_map[session_hash] |
994 except KeyError: | 988 except KeyError: |
995 pass | 989 pass |
996 | 990 |
997 if sid is not None: | 991 if sid is not None: |
998 try: | 992 try: |
1013 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): | 1007 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): |
1014 pass | 1008 pass |
1015 | 1009 |
1016 return reason | 1010 return reason |
1017 | 1011 |
1018 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): | 1012 def startStream(self, client, stream_object, to_jid, sid): |
1019 """Launch the stream workflow | 1013 """Launch the stream workflow |
1020 | 1014 |
1021 @param file_obj: file_obj to send | 1015 @param streamProducer: stream_object to use |
1022 @param to_jid: JID of the recipient | 1016 @param to_jid: JID of the recipient |
1023 @param sid: Stream session id | 1017 @param sid: Stream session id |
1024 @param successCb: method to call when stream successfuly finished | 1018 @param successCb: method to call when stream successfuly finished |
1025 @param failureCb: method to call when something goes wrong | 1019 @param failureCb: method to call when something goes wrong |
1026 @param profile: %(doc_profile)s | |
1027 @return (D): Deferred fired when session is finished | 1020 @return (D): Deferred fired when session is finished |
1028 """ | 1021 """ |
1029 client = self.host.getClient(profile) | 1022 session_data = self._createSession(client, stream_object, to_jid, sid, True) |
1030 session_data = self._createSession(file_obj, to_jid, sid, True, client.profile) | |
1031 | 1023 |
1032 session_data[client] = client | 1024 session_data[client] = client |
1033 | 1025 |
1034 def gotCandidates(candidates): | 1026 def gotCandidates(candidates): |
1035 session_data['candidates'] = candidates | 1027 session_data['candidates'] = candidates |
1049 | 1041 |
1050 d = iq_elt.send() | 1042 d = iq_elt.send() |
1051 args = [session_data, client] | 1043 args = [session_data, client] |
1052 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) | 1044 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) |
1053 | 1045 |
1054 self.getCandidates(profile).addCallback(gotCandidates) | 1046 self.getCandidates(client).addCallback(gotCandidates) |
1055 return session_data[DEFER_KEY] | 1047 return session_data[DEFER_KEY] |
1056 | 1048 |
1057 def _IQNegotiationCb(self, iq_elt, session_data, client): | 1049 def _IQNegotiationCb(self, iq_elt, session_data, client): |
1058 """Called when the result of open iq is received | 1050 """Called when the result of open iq is received |
1059 | 1051 |
1078 else: | 1070 else: |
1079 log.info(u"Candidate choosed by target: {}".format(candidate)) | 1071 log.info(u"Candidate choosed by target: {}".format(candidate)) |
1080 | 1072 |
1081 if candidate.type == XEP_0065.TYPE_PROXY: | 1073 if candidate.type == XEP_0065.TYPE_PROXY: |
1082 log.info(u"A Socks5 proxy is used") | 1074 log.info(u"A Socks5 proxy is used") |
1083 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) | 1075 d = self.connectCandidate(client, candidate, session_data['hash']) |
1084 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) | 1076 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) |
1085 d.addErrback(self._activationEb) | 1077 d.addErrback(self._activationEb) |
1086 else: | 1078 else: |
1087 d = defer.succeed(None) | 1079 d = defer.succeed(None) |
1088 | 1080 |
1100 | 1092 |
1101 session deferred is fired when transfer is finished | 1093 session deferred is fired when transfer is finished |
1102 """ | 1094 """ |
1103 return self._createSession(*args, **kwargs)[DEFER_KEY] | 1095 return self._createSession(*args, **kwargs)[DEFER_KEY] |
1104 | 1096 |
1105 def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): | 1097 def _createSession(self, client, stream_object, to_jid, sid, requester=False): |
1106 """Called when a bytestream is imminent | 1098 """Called when a bytestream is imminent |
1107 | 1099 |
1108 @param file_obj(file): File object where data will be written | 1100 @param stream_object(iface.IStreamProducer): File object where data will be written |
1109 @param to_jid(jid.JId): jid of the other peer | 1101 @param to_jid(jid.JId): jid of the other peer |
1110 @param sid(unicode): session id | 1102 @param sid(unicode): session id |
1111 @param initiator(bool): if True, this session is create by initiator | 1103 @param initiator(bool): if True, this session is create by initiator |
1112 @param profile: %(doc_profile)s | |
1113 @return (dict): session data | 1104 @return (dict): session data |
1114 """ | 1105 """ |
1115 client = self.host.getClient(profile) | |
1116 if sid in client.xep_0065_sid_session: | 1106 if sid in client.xep_0065_sid_session: |
1117 raise exceptions.ConflictError(u'A session with this id already exists !') | 1107 raise exceptions.ConflictError(u'A session with this id already exists !') |
1118 if requester: | 1108 if requester: |
1119 session_hash = getSessionHash(client.jid, to_jid, sid) | 1109 session_hash = getSessionHash(client.jid, to_jid, sid) |
1120 session_data = self._registerHash(session_hash, file_obj, profile) | 1110 session_data = self._registerHash(client, session_hash, stream_object) |
1121 else: | 1111 else: |
1122 session_hash = getSessionHash(to_jid, client.jid, sid) | 1112 session_hash = getSessionHash(to_jid, client.jid, sid) |
1123 session_d = defer.Deferred() | 1113 session_d = defer.Deferred() |
1124 session_d.addBoth(self.killSession, session_hash, sid, client) | 1114 session_d.addBoth(self.killSession, session_hash, sid, client) |
1125 session_data = client._s5b_sessions[session_hash] = { | 1115 session_data = client._s5b_sessions[session_hash] = { |
1128 } | 1118 } |
1129 client.xep_0065_sid_session[sid] = session_data | 1119 client.xep_0065_sid_session[sid] = session_data |
1130 session_data.update( | 1120 session_data.update( |
1131 {'id': sid, | 1121 {'id': sid, |
1132 'peer_jid': to_jid, | 1122 'peer_jid': to_jid, |
1133 'file': file_obj, | 1123 'stream_object': stream_object, |
1134 'hash': session_hash, | 1124 'hash': session_hash, |
1135 }) | 1125 }) |
1136 | 1126 |
1137 return session_data | 1127 return session_data |
1138 | 1128 |
1139 def getSession(self, session_hash, profile): | 1129 def getSession(self, client, session_hash): |
1140 """Return session data | 1130 """Return session data |
1141 | 1131 |
1142 @param session_hash(unicode): hash of the session | 1132 @param session_hash(unicode): hash of the session |
1143 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | 1133 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
1144 @param profile(None, unicode): profile of the peer | 1134 @param client(None, SatXMPPClient): client of the peer |
1145 None is used only if profile is unknown (this is only the case | 1135 None is used only if client is unknown (this is only the case |
1146 for incoming request received by Socks5ServerFactory). None must | 1136 for incoming request received by Socks5ServerFactory). None must |
1147 only be used by Socks5ServerFactory. | 1137 only be used by Socks5ServerFactory. |
1148 See comments below for details | 1138 See comments below for details |
1149 @return (dict): session data | 1139 @return (dict): session data |
1150 """ | 1140 """ |
1151 if profile is None: | 1141 if client is None: |
1152 try: | 1142 try: |
1153 profile = self.hash_profiles_map[session_hash] | 1143 client = self.hash_clients_map[session_hash] |
1154 except KeyError as e: | 1144 except KeyError as e: |
1155 log.warning(u"The requested session doesn't exists !") | 1145 log.warning(u"The requested session doesn't exists !") |
1156 raise e | 1146 raise e |
1157 client = self.host.getClient(profile) | |
1158 return client._s5b_sessions[session_hash] | 1147 return client._s5b_sessions[session_hash] |
1159 | 1148 |
1160 def registerHash(self, *args, **kwargs): | 1149 def registerHash(self, *args, **kwargs): |
1161 """like [_registerHash] but return the session deferred instead of the whole session | 1150 """like [_registerHash] but return the session deferred instead of the whole session |
1162 session deferred is fired when transfer is finished | 1151 session deferred is fired when transfer is finished |
1163 """ | 1152 """ |
1164 return self._registerHash(*args, **kwargs)[DEFER_KEY] | 1153 return self._registerHash(*args, **kwargs)[DEFER_KEY] |
1165 | 1154 |
1166 def _registerHash(self, session_hash, file_obj, profile): | 1155 def _registerHash(self, client, session_hash, stream_object): |
1167 """Create a session_data associated to hash | 1156 """Create a session_data associated to hash |
1168 | 1157 |
1169 @param session_hash(str): hash of the session | 1158 @param session_hash(str): hash of the session |
1170 @param file_obj(file, None): file-like object | 1159 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object |
1171 None if it will be filled later | 1160 None if it will be filled later |
1172 @param profile: %(doc_profile)s | |
1173 return (dict): session data | 1161 return (dict): session data |
1174 """ | 1162 """ |
1175 client = self.host.getClient(profile) | |
1176 assert session_hash not in client._s5b_sessions | 1163 assert session_hash not in client._s5b_sessions |
1177 session_d = defer.Deferred() | 1164 session_d = defer.Deferred() |
1178 session_d.addBoth(self.killSession, session_hash, None, client) | 1165 session_d.addBoth(self.killSession, session_hash, None, client) |
1179 session_data = client._s5b_sessions[session_hash] = { | 1166 session_data = client._s5b_sessions[session_hash] = { |
1180 DEFER_KEY: session_d, | 1167 DEFER_KEY: session_d, |
1181 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | 1168 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), |
1182 } | 1169 } |
1183 | 1170 |
1184 if file_obj is not None: | 1171 if stream_object is not None: |
1185 session_data['file'] = file_obj | 1172 session_data['stream_object'] = stream_object |
1186 | 1173 |
1187 assert session_hash not in self.hash_profiles_map | 1174 assert session_hash not in self.hash_clients_map |
1188 self.hash_profiles_map[session_hash] = profile | 1175 self.hash_clients_map[session_hash] = client |
1189 | 1176 |
1190 return session_data | 1177 return session_data |
1191 | 1178 |
1192 def associateFileObj(self, session_hash, file_obj, profile): | 1179 def associateStreamObject(self, client, session_hash, stream_object): |
1193 """Associate a file obj with a session""" | 1180 """Associate a stream object with a session""" |
1194 session_data = self.getSession(session_hash, profile) | 1181 session_data = self.getSession(client, session_hash) |
1195 assert 'file' not in session_data | 1182 assert 'stream_object' not in session_data |
1196 session_data['file'] = file_obj | 1183 session_data['stream_object'] = stream_object |
1197 | 1184 |
1198 def streamQuery(self, iq_elt, profile): | 1185 def streamQuery(self, iq_elt, client): |
1199 log.debug(u"BS stream query") | 1186 log.debug(u"BS stream query") |
1200 client = self.host.getClient(profile) | |
1201 | 1187 |
1202 iq_elt.handled = True | 1188 iq_elt.handled = True |
1203 | 1189 |
1204 query_elt = iq_elt.elements(NS_BS, 'query').next() | 1190 query_elt = iq_elt.elements(NS_BS, 'query').next() |
1205 try: | 1191 try: |
1236 candidates.append(Candidate(host, port, type_, priority, jid_)) | 1222 candidates.append(Candidate(host, port, type_, priority, jid_)) |
1237 | 1223 |
1238 for candidate in candidates: | 1224 for candidate in candidates: |
1239 log.info(u"Candidate proposed: {}".format(candidate)) | 1225 log.info(u"Candidate proposed: {}".format(candidate)) |
1240 | 1226 |
1241 d = self.getBestCandidate(candidates, session_data['hash'], profile=profile) | 1227 d = self.getBestCandidate(client, candidates, session_data['hash']) |
1242 d.addCallback(self._ackStream, iq_elt, session_data, client) | 1228 d.addCallback(self._ackStream, iq_elt, session_data, client) |
1243 | 1229 |
1244 def _ackStream(self, candidate, iq_elt, session_data, client): | 1230 def _ackStream(self, candidate, iq_elt, session_data, client): |
1245 if candidate is None: | 1231 if candidate is None: |
1246 log.info("No streamhost candidate worked, we have to end negotiation") | 1232 log.info("No streamhost candidate worked, we have to end negotiation") |
1260 def __init__(self, plugin_parent): | 1246 def __init__(self, plugin_parent): |
1261 self.plugin_parent = plugin_parent | 1247 self.plugin_parent = plugin_parent |
1262 self.host = plugin_parent.host | 1248 self.host = plugin_parent.host |
1263 | 1249 |
1264 def connectionInitialized(self): | 1250 def connectionInitialized(self): |
1265 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) | 1251 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent) |
1266 | 1252 |
1267 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 1253 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
1268 return [disco.DiscoFeature(NS_BS)] | 1254 return [disco.DiscoFeature(NS_BS)] |
1269 | 1255 |
1270 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 1256 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |