Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 1577:d04d7402b8e9
plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation:
/!\ range is not working yet
/!\ pipe plugin is broken for now
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 Nov 2015 18:19:49 +0100 |
parents | d5f59ba166fe |
children | 8cc7d83141a4 |
comparison
equal
deleted
inserted
replaced
1576:d5f59ba166fe | 1577:d04d7402b8e9 |
---|---|
61 from sat.core import exceptions | 61 from sat.core import exceptions |
62 from sat.tools import sat_defer | 62 from sat.tools import sat_defer |
63 from twisted.internet import protocol | 63 from twisted.internet import protocol |
64 from twisted.internet import reactor | 64 from twisted.internet import reactor |
65 from twisted.internet import error as internet_error | 65 from twisted.internet import error as internet_error |
66 from twisted.words.protocols.jabber import jid, client as jabber_client | |
67 from twisted.words.protocols.jabber import error as jabber_error | 66 from twisted.words.protocols.jabber import error as jabber_error |
67 from twisted.words.protocols.jabber import jid | |
68 from twisted.words.protocols.jabber import xmlstream | |
68 from twisted.protocols.basic import FileSender | 69 from twisted.protocols.basic import FileSender |
69 from twisted.words.xish import domish | |
70 from twisted.internet import defer | 70 from twisted.internet import defer |
71 from twisted.python import failure | 71 from twisted.python import failure |
72 from sat.core.exceptions import ProfileNotInCacheError | |
73 from collections import namedtuple | 72 from collections import namedtuple |
74 import struct | 73 import struct |
75 import hashlib | 74 import hashlib |
76 import uuid | 75 import uuid |
77 | 76 |
430 except struct.error: | 429 except struct.error: |
431 # The buffer is probably not complete, we need to wait more | 430 # The buffer is probably not complete, we need to wait more |
432 return None | 431 return None |
433 | 432 |
434 def _makeRequest(self): | 433 def _makeRequest(self): |
435 # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) | |
436 hash_ = self._session_hash | 434 hash_ = self._session_hash |
437 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) | 435 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) |
438 self.transport.write(request) | 436 self.transport.write(request) |
439 self.state = STATE_CLIENT_REQUEST | 437 self.state = STATE_CLIENT_REQUEST |
440 | 438 |
462 # Ensure reply is OK | 460 # Ensure reply is OK |
463 if rep != REPLY_SUCCESS: | 461 if rep != REPLY_SUCCESS: |
464 self.loseConnection() | 462 self.loseConnection() |
465 return | 463 return |
466 | 464 |
467 # if self.factory.proxy: | |
468 # self.state = STATE_READY | |
469 # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) | |
470 # else: | |
471 self.state = STATE_READY | 465 self.state = STATE_READY |
472 self.connection.callback(None) | 466 self.connection.callback(None) |
473 # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) | |
474 | 467 |
475 except struct.error: | 468 except struct.error: |
476 # The buffer is probably not complete, we need to wait more | 469 # The buffer is probably not complete, we need to wait more |
477 return None | 470 return None |
478 | 471 |
487 self.sendErrorReply(REPLY_CONN_REFUSED) | 480 self.sendErrorReply(REPLY_CONN_REFUSED) |
488 log.warning(u"Unexpected connection request received from {host}" | 481 log.warning(u"Unexpected connection request received from {host}" |
489 .format(host=self.transport.getPeer().host)) | 482 .format(host=self.transport.getPeer().host)) |
490 return | 483 return |
491 self._session_hash = addr | 484 self._session_hash = addr |
492 # self.sid, self.profile = self.factory.hash_profiles_map[addr] | |
493 # client = self.factory.host.getClient(self.profile) | |
494 # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer | |
495 self.connectCompleted(addr, 0) | 485 self.connectCompleted(addr, 0) |
496 | 486 |
497 def startTransfer(self): | 487 def startTransfer(self): |
498 """Callback called when the result iq is received""" | 488 """Callback called when the result iq is received""" |
499 log.debug(u"Starting file transfer") | 489 log.debug(u"Starting file transfer") |
544 else: | 534 else: |
545 self._makeRequest() | 535 self._makeRequest() |
546 | 536 |
547 def connectionLost(self, reason): | 537 def connectionLost(self, reason): |
548 log.debug(u"Socks5 connection lost: {}".format(reason.value)) | 538 log.debug(u"Socks5 connection lost: {}".format(reason.value)) |
549 # self.transport.unregisterProducer() | |
550 # if self.peersock is not None: | |
551 # self.peersock.peersock = None | |
552 # self.peersock.transport.unregisterProducer() | |
553 # self.peersock = None | |
554 if self.state != STATE_READY: | 539 if self.state != STATE_READY: |
555 self.connection.errback(reason) | 540 self.connection.errback(reason) |
556 if self.server_mode : | 541 if self.server_mode : |
557 self.factory.removeFromSession(self._session_hash, self, reason) | 542 self.factory.removeFromSession(self._session_hash, self, reason) |
558 | 543 |
618 | 603 |
619 | 604 |
620 class Socks5ClientFactory(protocol.ClientFactory): | 605 class Socks5ClientFactory(protocol.ClientFactory): |
621 protocol = SOCKSv5 | 606 protocol = SOCKSv5 |
622 | 607 |
623 # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): | |
624 def __init__(self, parent, session_hash, profile): | 608 def __init__(self, parent, session_hash, profile): |
625 """Init the Client Factory | 609 """Init the Client Factory |
626 | 610 |
627 @param session_hash(unicode): hash of the session | 611 @param session_hash(unicode): hash of the session |
628 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | 612 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 |
633 self.profile = profile | 617 self.profile = profile |
634 self.connection = defer.Deferred() | 618 self.connection = defer.Deferred() |
635 self._protocol_instance = None | 619 self._protocol_instance = None |
636 self.connector = None | 620 self.connector = None |
637 self._discarded = False | 621 self._discarded = False |
638 # self.data = stream_data[sid] | |
639 # self.sid = sid | |
640 # self.iq_id = iq_id | |
641 # self.activateCb = activateCb | |
642 # self.finishedCb = finishedCb | |
643 # self.proxy = proxy | |
644 # self.profile = profile | |
645 | 622 |
646 def discard(self): | 623 def discard(self): |
647 """Disconnect the client | 624 """Disconnect the client |
648 | 625 |
649 Also set a discarded flag, which avoid to call the session Deferred | 626 Also set a discarded flag, which avoid to call the session Deferred |
669 # the Socks5 session is finished | 646 # the Socks5 session is finished |
670 if reason.check(internet_error.ConnectionDone): | 647 if reason.check(internet_error.ConnectionDone): |
671 self.getSession()[DEFER_KEY].callback(None) | 648 self.getSession()[DEFER_KEY].callback(None) |
672 else: | 649 else: |
673 self.getSession()[DEFER_KEY].errback(reason) | 650 self.getSession()[DEFER_KEY].errback(reason) |
674 # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful | |
675 | 651 |
676 def buildProtocol(self, addr): | 652 def buildProtocol(self, addr): |
677 log.debug(("Socks 5 client connection started")) | 653 log.debug(("Socks 5 client connection started")) |
678 p = self.protocol(session_hash=self.session_hash) | 654 p = self.protocol(session_hash=self.session_hash) |
679 p.factory = self | 655 p.factory = self |
717 def getHandler(self, profile): | 693 def getHandler(self, profile): |
718 return XEP_0065_handler(self) | 694 return XEP_0065_handler(self) |
719 | 695 |
720 def profileConnected(self, profile): | 696 def profileConnected(self, profile): |
721 client = self.host.getClient(profile) | 697 client = self.host.getClient(profile) |
722 client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) | 698 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) |
723 client._s5b_sessions = {} | 699 client._s5b_sessions = {} |
724 | 700 |
725 def getSessionHash(self, from_jid, to_jid, sid): | 701 def getSessionHash(self, from_jid, to_jid, sid): |
726 return getSessionHash(from_jid, to_jid, sid) | 702 return getSessionHash(from_jid, to_jid, sid) |
727 | 703 |
774 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() | 750 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() |
775 except (exceptions.CancelError, StopIteration): | 751 except (exceptions.CancelError, StopIteration): |
776 notFound(server) | 752 notFound(server) |
777 iq_elt = client.IQ('get') | 753 iq_elt = client.IQ('get') |
778 iq_elt['to'] = proxy.full() | 754 iq_elt['to'] = proxy.full() |
779 iq_elt.addElement('query', NS_BS) | 755 iq_elt.addElement((NS_BS, 'query')) |
780 | 756 |
781 try: | 757 try: |
782 result_elt = yield iq_elt.send() | 758 result_elt = yield iq_elt.send() |
783 except jabber_error.StanzaError as failure: | 759 except jabber_error.StanzaError as failure: |
784 log.warning(u"Error while requesting proxy info on {jid}: {error}" | 760 log.warning(u"Error while requesting proxy info on {jid}: {error}" |
912 defers_list.append(d) | 888 defers_list.append(d) |
913 | 889 |
914 return defers_list | 890 return defers_list |
915 | 891 |
916 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): | 892 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): |
893 """Get best candidate (according to priority) which can connect | |
894 | |
895 @param candidates(iterable[Candidate]): candidates to test | |
896 @param session_hash(unicode): hash of the session | |
897 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | |
898 @param profile: %(doc_profile)s | |
899 @return (D(None, Candidate)): best candidate or None if none can connect | |
900 """ | |
917 defer_candidates = None | 901 defer_candidates = None |
918 | 902 |
919 def connectionCb(candidate, profile): | 903 def connectionCb(candidate, profile): |
920 log.info(u"Connection of {} successful".format(unicode(candidate))) | 904 log.info(u"Connection of {} successful".format(unicode(candidate))) |
921 for idx, other_candidate in enumerate(candidates): | 905 for idx, other_candidate in enumerate(candidates): |
945 d_list.addCallback(allTested) | 929 d_list.addCallback(allTested) |
946 return d_list | 930 return d_list |
947 | 931 |
948 def _timeOut(self, sid, client): | 932 def _timeOut(self, sid, client): |
949 """Delecte current_stream id, called after timeout | 933 """Delecte current_stream id, called after timeout |
950 @param id: id of client.xep_0065_current_stream""" | 934 @param id: id of client.xep_0065_sid_session""" |
951 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( | 935 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( |
952 sid=sid, profile=client.profile)) | 936 sid=sid, profile=client.profile)) |
953 self._killSession(sid, client, u"TIMEOUT") | 937 self._killSession(sid, client, u"TIMEOUT") |
954 | 938 |
955 def _killSession(self, sid, client, failure_reason=None): | 939 def _killSession(self, sid, client, failure_reason=None): |
959 @param client: %(doc_client)s | 943 @param client: %(doc_client)s |
960 @param failure_reason(None, unicode): if None the session is successful | 944 @param failure_reason(None, unicode): if None the session is successful |
961 else, will be used to call failure_cb | 945 else, will be used to call failure_cb |
962 """ | 946 """ |
963 try: | 947 try: |
964 session = client.xep_0065_current_stream[sid] | 948 session = client.xep_0065_sid_session[sid] |
965 except KeyError: | 949 except KeyError: |
966 log.warning(_("kill id called on a non existant id")) | 950 log.warning(_("kill id called on a non existant id")) |
967 return | 951 return |
968 | 952 |
969 try: | 953 try: |
974 client.xmlstream.removeObserver(session["event_data"], observer_cb) | 958 client.xmlstream.removeObserver(session["event_data"], observer_cb) |
975 | 959 |
976 if session['timer'].active(): | 960 if session['timer'].active(): |
977 session['timer'].cancel() | 961 session['timer'].cancel() |
978 | 962 |
979 del client.xep_0065_current_stream[sid] | 963 del client.xep_0065_sid_session[sid] |
980 | 964 |
981 # FIXME: to check | 965 # FIXME: to check |
982 try: | 966 try: |
983 session_hash = session.get['hash'] | 967 session_hash = session.get['hash'] |
984 del self.hash_profiles_map[session_hash] | 968 del self.hash_profiles_map[session_hash] |
1002 @param to_jid: JID of the recipient | 986 @param to_jid: JID of the recipient |
1003 @param sid: Stream session id | 987 @param sid: Stream session id |
1004 @param successCb: method to call when stream successfuly finished | 988 @param successCb: method to call when stream successfuly finished |
1005 @param failureCb: method to call when something goes wrong | 989 @param failureCb: method to call when something goes wrong |
1006 @param profile: %(doc_profile)s | 990 @param profile: %(doc_profile)s |
991 @return (D): Deferred fired when session is finished | |
1007 """ | 992 """ |
1008 client = self.host.getClient(profile) | 993 client = self.host.getClient(profile) |
1009 session_data = self._createSession(file_obj, to_jid, sid, client.profile) | 994 session_data = self._createSession(file_obj, to_jid, sid, True, client.profile) |
1010 | 995 |
1011 session_data["to"] = to_jid | 996 session_data[client] = client |
1012 session_data["xmlstream"] = client.xmlstream | 997 |
1013 hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) | 998 def gotCandidates(candidates): |
1014 | 999 session_data['candidates'] = candidates |
1015 self.hash_profiles_map[hash_] = (sid, profile) | 1000 iq_elt = client.IQ() |
1016 | 1001 iq_elt["from"] = client.jid.full() |
1017 iq_elt = jabber_client.IQ(client.xmlstream, 'set') | 1002 iq_elt["to"] = to_jid.full() |
1018 iq_elt["from"] = client.jid.full() | 1003 query_elt = iq_elt.addElement((NS_BS, 'query')) |
1019 iq_elt["to"] = to_jid.full() | 1004 query_elt['mode'] = 'tcp' |
1020 query_elt = iq_elt.addElement('query', NS_BS) | 1005 query_elt['sid'] = sid |
1021 query_elt['mode'] = 'tcp' | 1006 |
1022 query_elt['sid'] = sid | 1007 for candidate in candidates: |
1023 | 1008 streamhost = query_elt.addElement('streamhost') |
1024 #first streamhost: direct connection | 1009 streamhost['host'] = candidate.host |
1025 streamhost = query_elt.addElement('streamhost') | 1010 streamhost['port'] = str(candidate.port) |
1026 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") | 1011 streamhost['jid'] = candidate.jid.full() |
1027 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") | 1012 |
1028 streamhost['jid'] = client.jid.full() | 1013 d = iq_elt.send() |
1029 | 1014 args = [session_data, client] |
1030 #second streamhost: mediated connection, using proxy | 1015 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) |
1031 streamhost = query_elt.addElement('streamhost') | 1016 |
1032 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | 1017 self.getCandidates(profile).addCallback(gotCandidates) |
1033 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | |
1034 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | |
1035 | |
1036 iq_elt.addCallback(self._IQOpen, session_data, client) | |
1037 iq_elt.send() | |
1038 return session_data[DEFER_KEY] | 1018 return session_data[DEFER_KEY] |
1039 | 1019 |
1040 def _IQOpen(self, session_data, client, iq_elt): | 1020 def _IQNegotiationCb(self, iq_elt, session_data, client): |
1041 """Called when the result of open iq is received | 1021 """Called when the result of open iq is received |
1042 | 1022 |
1043 @param session_data(dict): data of the session | 1023 @param session_data(dict): data of the session |
1044 @param client: %(doc_client)s | 1024 @param client: %(doc_client)s |
1045 @param iq_elt(domish.Element): <iq> result | 1025 @param iq_elt(domish.Element): <iq> result |
1046 """ | 1026 """ |
1047 sid = session_data['id'] | 1027 try: |
1048 if iq_elt["type"] == "error": | 1028 query_elt = iq_elt.elements(NS_BS, 'query').next() |
1049 log.warning(_("Socks5 transfer failed")) | 1029 streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() |
1030 except StopIteration: | |
1031 log.warning(u"No streamhost found in stream query") | |
1050 # FIXME: must clean session | 1032 # FIXME: must clean session |
1051 return | 1033 return |
1052 | 1034 |
1053 try: | 1035 streamhost_jid = jid.JID(streamhost_used_elt['jid']) |
1054 session_data = client.xep_0065_current_stream[sid] | 1036 try: |
1055 file_obj = session_data["file_obj"] | 1037 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() |
1056 timer = session_data["timer"] | 1038 except StopIteration: |
1057 except KeyError: | 1039 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) |
1058 raise exceptions.InternalError | |
1059 | |
1060 timer.reset(TIMEOUT) | |
1061 | |
1062 query_elt = iq_elt.elements(NS_BS, 'query').next() | |
1063 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) | |
1064 | |
1065 if not streamhost_elts: | |
1066 log.warning(_("No streamhost found in stream query")) | |
1067 # FIXME: must clean session | |
1068 return | 1040 return |
1069 | 1041 |
1070 # FIXME: must be cleaned ! | 1042 if candidate.type == XEP_0065.TYPE_PROXY: |
1071 | 1043 log.info(u"A Socks5 proxy is used") |
1072 streamhost_jid = streamhost_elts[0]['jid'] | 1044 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) |
1073 if streamhost_jid != client.jid.full(): | 1045 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) |
1074 log.debug(_("A proxy server is used")) | 1046 d.addErrback(self._activationEb) |
1075 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) | |
1076 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) | |
1077 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) | |
1078 if proxy_jid != streamhost_jid: | |
1079 log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) | |
1080 return | |
1081 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) | |
1082 reactor.connectTCP(proxy_host, int(proxy_port), factory) | |
1083 else: | 1047 else: |
1084 session_data["start_transfer_cb"](file_obj) # We now activate the stream | 1048 d = defer.succeed(None) |
1085 | 1049 |
1086 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): | 1050 d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) |
1087 log.debug(_("activating stream")) | 1051 |
1088 client = self.host.getClient(profile) | 1052 def _activationEb(self, failure): |
1089 session_data = client.xep_0065_current_stream[sid] | 1053 log.warning(u"Proxy activation error: {}".format(failure.value)) |
1090 | 1054 |
1091 iq_elt = client.IQ(client.xmlstream, 'set') | 1055 def _IQNegotiationEb(self, stanza_err, session_data, client): |
1092 iq_elt["from"] = client.jid.full() | 1056 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition)) |
1093 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 1057 # FIXME: must clean session |
1094 query_elt = iq_elt.addElement('query', NS_BS) | |
1095 query_elt['sid'] = sid | |
1096 query_elt.addElement('activate', content=session_data['to'].full()) | |
1097 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) | |
1098 iq_elt.send() | |
1099 | |
1100 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): | |
1101 if iq_elt['type'] == 'error': | |
1102 log.warning(_("Can't activate the proxy stream")) | |
1103 return | |
1104 else: | |
1105 start_transfer_cb(file_obj) | |
1106 | 1058 |
1107 def createSession(self, *args, **kwargs): | 1059 def createSession(self, *args, **kwargs): |
1108 """like [_createSession] but return the session deferred instead of the whole session | 1060 """like [_createSession] but return the session deferred instead of the whole session |
1109 | 1061 |
1110 session deferred is fired when transfer is finished | 1062 session deferred is fired when transfer is finished |
1111 """ | 1063 """ |
1112 return self._createSession(*args, **kwargs)[DEFER_KEY] | 1064 return self._createSession(*args, **kwargs)[DEFER_KEY] |
1113 | 1065 |
1114 def _createSession(self, file_obj, to_jid, sid, profile): | 1066 def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): |
1115 """Called when a bytestream is imminent | 1067 """Called when a bytestream is imminent |
1116 | 1068 |
1117 @param file_obj(file): File object where data will be written | 1069 @param file_obj(file): File object where data will be written |
1118 @param to_jid(jid.JId): jid of the other peer | 1070 @param to_jid(jid.JId): jid of the other peer |
1119 @param sid(unicode): session id | 1071 @param sid(unicode): session id |
1072 @param initiator(bool): if True, this session is create by initiator | |
1120 @param profile: %(doc_profile)s | 1073 @param profile: %(doc_profile)s |
1121 @return (dict): session data | 1074 @return (dict): session data |
1122 """ | 1075 """ |
1123 client = self.host.getClient(profile) | 1076 client = self.host.getClient(profile) |
1124 if sid in client.xep_0065_current_stream: | 1077 if sid in client.xep_0065_sid_session: |
1125 raise exceptions.ConflictError(u'A session with this id already exists !') | 1078 raise exceptions.ConflictError(u'A session with this id already exists !') |
1126 session_data = client.xep_0065_current_stream[sid] = \ | 1079 if requester: |
1080 session_hash = getSessionHash(client.jid, to_jid, sid) | |
1081 session_data = self._registerHash(session_hash, file_obj, profile) | |
1082 else: | |
1083 session_hash = getSessionHash(to_jid, client.jid, sid) | |
1084 session_data = client._s5b_sessions[session_hash] = { | |
1085 DEFER_KEY: defer.Deferred(), | |
1086 } | |
1087 client.xep_0065_sid_session[sid] = session_data | |
1088 session_data.update( | |
1127 {'id': sid, | 1089 {'id': sid, |
1128 DEFER_KEY: defer.Deferred(), | 1090 'peer_jid': to_jid, |
1129 'to': to_jid, | 1091 'file': file_obj, |
1130 'file_obj': file_obj, | 1092 'hash': session_hash, |
1131 'seq': -1, # FIXME: to check | 1093 }) |
1132 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), | |
1133 } | |
1134 | 1094 |
1135 return session_data | 1095 return session_data |
1136 | 1096 |
1137 def getSession(self, session_hash, profile): | 1097 def getSession(self, session_hash, profile): |
1138 """Return session data | 1098 """Return session data |
1154 raise e | 1114 raise e |
1155 client = self.host.getClient(profile) | 1115 client = self.host.getClient(profile) |
1156 return client._s5b_sessions[session_hash] | 1116 return client._s5b_sessions[session_hash] |
1157 | 1117 |
1158 def registerHash(self, *args, **kwargs): | 1118 def registerHash(self, *args, **kwargs): |
1159 """like [_registerHash] but resutrn the session deferred instead of the whole session | 1119 """like [_registerHash] but resturn the session deferred instead of the whole session |
1160 session deferred is fired when transfer is finished | 1120 session deferred is fired when transfer is finished |
1161 """ | 1121 """ |
1162 return self._registerHash(*args, **kwargs)[DEFER_KEY] | 1122 return self._registerHash(*args, **kwargs)[DEFER_KEY] |
1163 | 1123 |
1164 def _registerHash(self, session_hash, file_obj, profile): | 1124 def _registerHash(self, session_hash, file_obj, profile): |
1193 self.hash_profiles_map[session_hash] = profile | 1153 self.hash_profiles_map[session_hash] = profile |
1194 | 1154 |
1195 return session_data | 1155 return session_data |
1196 | 1156 |
1197 def streamQuery(self, iq_elt, profile): | 1157 def streamQuery(self, iq_elt, profile): |
1198 """Get file using byte stream""" | 1158 log.debug(u"BS stream query") |
1199 log.debug(_("BS stream query")) | |
1200 client = self.host.getClient(profile) | 1159 client = self.host.getClient(profile) |
1201 | 1160 |
1202 if not client: | |
1203 raise ProfileNotInCacheError | |
1204 | |
1205 xmlstream = client.xmlstream | |
1206 | |
1207 iq_elt.handled = True | 1161 iq_elt.handled = True |
1208 query_elt = iq_elt.firstChildElement() | 1162 |
1209 sid = query_elt.getAttribute("sid") | 1163 query_elt = iq_elt.elements(NS_BS, 'query').next() |
1210 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | 1164 try: |
1211 | 1165 sid = query_elt['sid'] |
1212 if not sid in client.xep_0065_current_stream: | 1166 except KeyError: |
1213 log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid)) | 1167 log.warning(u"Invalid bystreams request received") |
1214 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) | 1168 return client.sendError(iq_elt, "bad-request") |
1215 return | 1169 |
1216 | 1170 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) |
1217 client.xep_0065_current_stream[sid]['timer'].cancel() | |
1218 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) | |
1219 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream | |
1220 | |
1221 if not streamhost_elts: | 1171 if not streamhost_elts: |
1222 log.warning(_(u"No streamhost found in stream query %s" % sid)) | 1172 return client.sendError(iq_elt, "bad-request") |
1223 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 1173 |
1224 return | 1174 try: |
1225 | 1175 session_data = client.xep_0065_sid_session[sid] |
1226 streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case | 1176 except KeyError: |
1227 sh_host = streamhost_elt.getAttribute("host") | 1177 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) |
1228 sh_port = streamhost_elt.getAttribute("port") | 1178 return client.sendError(iq_elt, 'not-acceptable') |
1229 sh_jid = streamhost_elt.getAttribute("jid") | 1179 |
1230 if not sh_host or not sh_port or not sh_jid: | 1180 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) |
1231 log.warning(_("incomplete streamhost element")) | 1181 |
1232 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 1182 candidates = [] |
1233 return | 1183 nb_sh = len(streamhost_elts) |
1234 | 1184 for idx, sh_elt in enumerate(streamhost_elts): |
1235 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) | 1185 try: |
1236 | 1186 host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) |
1237 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) | 1187 except KeyError: |
1238 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) | 1188 log.warning(u"malformed streamhost element") |
1239 reactor.connectTCP(sh_host, int(sh_port), factory) | 1189 return client.sendError(iq_elt, "bad-request") |
1240 | 1190 priority = nb_sh - idx |
1241 def activateStream(self, sid, iq_id, profile): | 1191 if jid_.userhostJID() != peer_jid.userhostJID(): |
1242 client = self.host.getClient(profile) | 1192 type_ = XEP_0065.TYPE_PROXY |
1243 log.debug(_("activating stream")) | 1193 else: |
1244 result = domish.Element((None, 'iq')) | 1194 type_ = XEP_0065.TYPE_DIRECT |
1245 session_data = client.xep_0065_current_stream[sid] | 1195 candidates.append(Candidate(host, port, type_, priority, jid_)) |
1246 result['type'] = 'result' | 1196 |
1247 result['id'] = iq_id | 1197 for candidate in candidates: |
1248 result['from'] = session_data["to"].full() | 1198 log.info(u"Candidate proposed: {}".format(candidate)) |
1249 result['to'] = session_data["from"].full() | 1199 |
1250 query = result.addElement('query', NS_BS) | 1200 d = self.getBestCandidate(candidates, session_data['hash'], profile) |
1251 query['sid'] = sid | 1201 d.addCallback(self._ackStream, iq_elt, session_data, client) |
1252 streamhost = query.addElement('streamhost-used') | 1202 |
1253 streamhost['jid'] = session_data["streamhost"][2] | 1203 def _ackStream(self, candidate, iq_elt, session_data, client): |
1254 session_data["xmlstream"].send(result) | 1204 if candidate is None: |
1255 | 1205 log.info("No streamhost candidate worked, we have to end negotiation") |
1256 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): | 1206 return client.sendError(iq_elt, 'item-not-found') |
1257 """Not acceptable error used when the stream is not expected or something is going wrong | 1207 log.debug(u"activating stream") |
1258 @param iq_id: IQ id | 1208 result_elt = xmlstream.toResponse(iq_elt, 'result') |
1259 @param to_jid: addressee | 1209 query_elt = result_elt.addElement((NS_BS, 'query')) |
1260 @param xmlstream: XML stream to use to send the error""" | 1210 query_elt['sid'] = session_data['id'] |
1261 result = domish.Element((None, 'iq')) | 1211 streamhost_used_elt = query_elt.addElement('streamhost-used') |
1262 result['type'] = 'result' | 1212 streamhost_used_elt['jid'] = candidate.jid.full() |
1263 result['id'] = iq_id | 1213 client.xmlstream.send(result_elt) |
1264 result['to'] = to_jid | |
1265 error_el = result.addElement('error') | |
1266 error_el['type'] = 'modify' | |
1267 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable')) | |
1268 xmlstream.send(result) | |
1269 | |
1270 def sendBadRequestError(self, iq_id, to_jid, xmlstream): | |
1271 """Not acceptable error used when the stream is not expected or something is going wrong | |
1272 @param iq_id: IQ id | |
1273 @param to_jid: addressee | |
1274 @param xmlstream: XML stream to use to send the error""" | |
1275 result = domish.Element((None, 'iq')) | |
1276 result['type'] = 'result' | |
1277 result['id'] = iq_id | |
1278 result['to'] = to_jid | |
1279 error_el = result.addElement('error') | |
1280 error_el['type'] = 'cancel' | |
1281 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) | |
1282 xmlstream.send(result) | |
1283 | 1214 |
1284 | 1215 |
1285 class XEP_0065_handler(XMPPHandler): | 1216 class XEP_0065_handler(XMPPHandler): |
1286 implements(iwokkel.IDisco) | 1217 implements(iwokkel.IDisco) |
1287 | 1218 |