comparison src/plugins/plugin_xep_0065.py @ 1577:d04d7402b8e9

plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation: /!\ range is not working yet /!\ pipe plugin is broken for now
author Goffi <goffi@goffi.org>
date Wed, 11 Nov 2015 18:19:49 +0100
parents d5f59ba166fe
children 8cc7d83141a4
comparison
equal deleted inserted replaced
1576:d5f59ba166fe 1577:d04d7402b8e9
61 from sat.core import exceptions 61 from sat.core import exceptions
62 from sat.tools import sat_defer 62 from sat.tools import sat_defer
63 from twisted.internet import protocol 63 from twisted.internet import protocol
64 from twisted.internet import reactor 64 from twisted.internet import reactor
65 from twisted.internet import error as internet_error 65 from twisted.internet import error as internet_error
66 from twisted.words.protocols.jabber import jid, client as jabber_client
67 from twisted.words.protocols.jabber import error as jabber_error 66 from twisted.words.protocols.jabber import error as jabber_error
67 from twisted.words.protocols.jabber import jid
68 from twisted.words.protocols.jabber import xmlstream
68 from twisted.protocols.basic import FileSender 69 from twisted.protocols.basic import FileSender
69 from twisted.words.xish import domish
70 from twisted.internet import defer 70 from twisted.internet import defer
71 from twisted.python import failure 71 from twisted.python import failure
72 from sat.core.exceptions import ProfileNotInCacheError
73 from collections import namedtuple 72 from collections import namedtuple
74 import struct 73 import struct
75 import hashlib 74 import hashlib
76 import uuid 75 import uuid
77 76
430 except struct.error: 429 except struct.error:
431 # The buffer is probably not complete, we need to wait more 430 # The buffer is probably not complete, we need to wait more
432 return None 431 return None
433 432
434 def _makeRequest(self): 433 def _makeRequest(self):
435 # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
436 hash_ = self._session_hash 434 hash_ = self._session_hash
437 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) 435 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
438 self.transport.write(request) 436 self.transport.write(request)
439 self.state = STATE_CLIENT_REQUEST 437 self.state = STATE_CLIENT_REQUEST
440 438
462 # Ensure reply is OK 460 # Ensure reply is OK
463 if rep != REPLY_SUCCESS: 461 if rep != REPLY_SUCCESS:
464 self.loseConnection() 462 self.loseConnection()
465 return 463 return
466 464
467 # if self.factory.proxy:
468 # self.state = STATE_READY
469 # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
470 # else:
471 self.state = STATE_READY 465 self.state = STATE_READY
472 self.connection.callback(None) 466 self.connection.callback(None)
473 # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
474 467
475 except struct.error: 468 except struct.error:
476 # The buffer is probably not complete, we need to wait more 469 # The buffer is probably not complete, we need to wait more
477 return None 470 return None
478 471
487 self.sendErrorReply(REPLY_CONN_REFUSED) 480 self.sendErrorReply(REPLY_CONN_REFUSED)
488 log.warning(u"Unexpected connection request received from {host}" 481 log.warning(u"Unexpected connection request received from {host}"
489 .format(host=self.transport.getPeer().host)) 482 .format(host=self.transport.getPeer().host))
490 return 483 return
491 self._session_hash = addr 484 self._session_hash = addr
492 # self.sid, self.profile = self.factory.hash_profiles_map[addr]
493 # client = self.factory.host.getClient(self.profile)
494 # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
495 self.connectCompleted(addr, 0) 485 self.connectCompleted(addr, 0)
496 486
497 def startTransfer(self): 487 def startTransfer(self):
498 """Callback called when the result iq is received""" 488 """Callback called when the result iq is received"""
499 log.debug(u"Starting file transfer") 489 log.debug(u"Starting file transfer")
544 else: 534 else:
545 self._makeRequest() 535 self._makeRequest()
546 536
547 def connectionLost(self, reason): 537 def connectionLost(self, reason):
548 log.debug(u"Socks5 connection lost: {}".format(reason.value)) 538 log.debug(u"Socks5 connection lost: {}".format(reason.value))
549 # self.transport.unregisterProducer()
550 # if self.peersock is not None:
551 # self.peersock.peersock = None
552 # self.peersock.transport.unregisterProducer()
553 # self.peersock = None
554 if self.state != STATE_READY: 539 if self.state != STATE_READY:
555 self.connection.errback(reason) 540 self.connection.errback(reason)
556 if self.server_mode : 541 if self.server_mode :
557 self.factory.removeFromSession(self._session_hash, self, reason) 542 self.factory.removeFromSession(self._session_hash, self, reason)
558 543
618 603
619 604
620 class Socks5ClientFactory(protocol.ClientFactory): 605 class Socks5ClientFactory(protocol.ClientFactory):
621 protocol = SOCKSv5 606 protocol = SOCKSv5
622 607
623 # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE):
624 def __init__(self, parent, session_hash, profile): 608 def __init__(self, parent, session_hash, profile):
625 """Init the Client Factory 609 """Init the Client Factory
626 610
627 @param session_hash(unicode): hash of the session 611 @param session_hash(unicode): hash of the session
628 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 612 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
633 self.profile = profile 617 self.profile = profile
634 self.connection = defer.Deferred() 618 self.connection = defer.Deferred()
635 self._protocol_instance = None 619 self._protocol_instance = None
636 self.connector = None 620 self.connector = None
637 self._discarded = False 621 self._discarded = False
638 # self.data = stream_data[sid]
639 # self.sid = sid
640 # self.iq_id = iq_id
641 # self.activateCb = activateCb
642 # self.finishedCb = finishedCb
643 # self.proxy = proxy
644 # self.profile = profile
645 622
646 def discard(self): 623 def discard(self):
647 """Disconnect the client 624 """Disconnect the client
648 625
649 Also set a discarded flag, which avoid to call the session Deferred 626 Also set a discarded flag, which avoid to call the session Deferred
669 # the Socks5 session is finished 646 # the Socks5 session is finished
670 if reason.check(internet_error.ConnectionDone): 647 if reason.check(internet_error.ConnectionDone):
671 self.getSession()[DEFER_KEY].callback(None) 648 self.getSession()[DEFER_KEY].callback(None)
672 else: 649 else:
673 self.getSession()[DEFER_KEY].errback(reason) 650 self.getSession()[DEFER_KEY].errback(reason)
674 # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful
675 651
676 def buildProtocol(self, addr): 652 def buildProtocol(self, addr):
677 log.debug(("Socks 5 client connection started")) 653 log.debug(("Socks 5 client connection started"))
678 p = self.protocol(session_hash=self.session_hash) 654 p = self.protocol(session_hash=self.session_hash)
679 p.factory = self 655 p.factory = self
717 def getHandler(self, profile): 693 def getHandler(self, profile):
718 return XEP_0065_handler(self) 694 return XEP_0065_handler(self)
719 695
720 def profileConnected(self, profile): 696 def profileConnected(self, profile):
721 client = self.host.getClient(profile) 697 client = self.host.getClient(profile)
722 client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) 698 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict)
723 client._s5b_sessions = {} 699 client._s5b_sessions = {}
724 700
725 def getSessionHash(self, from_jid, to_jid, sid): 701 def getSessionHash(self, from_jid, to_jid, sid):
726 return getSessionHash(from_jid, to_jid, sid) 702 return getSessionHash(from_jid, to_jid, sid)
727 703
774 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() 750 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop()
775 except (exceptions.CancelError, StopIteration): 751 except (exceptions.CancelError, StopIteration):
776 notFound(server) 752 notFound(server)
777 iq_elt = client.IQ('get') 753 iq_elt = client.IQ('get')
778 iq_elt['to'] = proxy.full() 754 iq_elt['to'] = proxy.full()
779 iq_elt.addElement('query', NS_BS) 755 iq_elt.addElement((NS_BS, 'query'))
780 756
781 try: 757 try:
782 result_elt = yield iq_elt.send() 758 result_elt = yield iq_elt.send()
783 except jabber_error.StanzaError as failure: 759 except jabber_error.StanzaError as failure:
784 log.warning(u"Error while requesting proxy info on {jid}: {error}" 760 log.warning(u"Error while requesting proxy info on {jid}: {error}"
912 defers_list.append(d) 888 defers_list.append(d)
913 889
914 return defers_list 890 return defers_list
915 891
916 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): 892 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
893 """Get best candidate (according to priority) which can connect
894
895 @param candidates(iterable[Candidate]): candidates to test
896 @param session_hash(unicode): hash of the session
897 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
898 @param profile: %(doc_profile)s
899 @return (D(None, Candidate)): best candidate or None if none can connect
900 """
917 defer_candidates = None 901 defer_candidates = None
918 902
919 def connectionCb(candidate, profile): 903 def connectionCb(candidate, profile):
920 log.info(u"Connection of {} successful".format(unicode(candidate))) 904 log.info(u"Connection of {} successful".format(unicode(candidate)))
921 for idx, other_candidate in enumerate(candidates): 905 for idx, other_candidate in enumerate(candidates):
945 d_list.addCallback(allTested) 929 d_list.addCallback(allTested)
946 return d_list 930 return d_list
947 931
948 def _timeOut(self, sid, client): 932 def _timeOut(self, sid, client):
949 """Delecte current_stream id, called after timeout 933 """Delecte current_stream id, called after timeout
950 @param id: id of client.xep_0065_current_stream""" 934 @param id: id of client.xep_0065_sid_session"""
951 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( 935 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
952 sid=sid, profile=client.profile)) 936 sid=sid, profile=client.profile))
953 self._killSession(sid, client, u"TIMEOUT") 937 self._killSession(sid, client, u"TIMEOUT")
954 938
955 def _killSession(self, sid, client, failure_reason=None): 939 def _killSession(self, sid, client, failure_reason=None):
959 @param client: %(doc_client)s 943 @param client: %(doc_client)s
960 @param failure_reason(None, unicode): if None the session is successful 944 @param failure_reason(None, unicode): if None the session is successful
961 else, will be used to call failure_cb 945 else, will be used to call failure_cb
962 """ 946 """
963 try: 947 try:
964 session = client.xep_0065_current_stream[sid] 948 session = client.xep_0065_sid_session[sid]
965 except KeyError: 949 except KeyError:
966 log.warning(_("kill id called on a non existant id")) 950 log.warning(_("kill id called on a non existant id"))
967 return 951 return
968 952
969 try: 953 try:
974 client.xmlstream.removeObserver(session["event_data"], observer_cb) 958 client.xmlstream.removeObserver(session["event_data"], observer_cb)
975 959
976 if session['timer'].active(): 960 if session['timer'].active():
977 session['timer'].cancel() 961 session['timer'].cancel()
978 962
979 del client.xep_0065_current_stream[sid] 963 del client.xep_0065_sid_session[sid]
980 964
981 # FIXME: to check 965 # FIXME: to check
982 try: 966 try:
983 session_hash = session.get['hash'] 967 session_hash = session.get['hash']
984 del self.hash_profiles_map[session_hash] 968 del self.hash_profiles_map[session_hash]
1002 @param to_jid: JID of the recipient 986 @param to_jid: JID of the recipient
1003 @param sid: Stream session id 987 @param sid: Stream session id
1004 @param successCb: method to call when stream successfuly finished 988 @param successCb: method to call when stream successfuly finished
1005 @param failureCb: method to call when something goes wrong 989 @param failureCb: method to call when something goes wrong
1006 @param profile: %(doc_profile)s 990 @param profile: %(doc_profile)s
991 @return (D): Deferred fired when session is finished
1007 """ 992 """
1008 client = self.host.getClient(profile) 993 client = self.host.getClient(profile)
1009 session_data = self._createSession(file_obj, to_jid, sid, client.profile) 994 session_data = self._createSession(file_obj, to_jid, sid, True, client.profile)
1010 995
1011 session_data["to"] = to_jid 996 session_data[client] = client
1012 session_data["xmlstream"] = client.xmlstream 997
1013 hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) 998 def gotCandidates(candidates):
1014 999 session_data['candidates'] = candidates
1015 self.hash_profiles_map[hash_] = (sid, profile) 1000 iq_elt = client.IQ()
1016 1001 iq_elt["from"] = client.jid.full()
1017 iq_elt = jabber_client.IQ(client.xmlstream, 'set') 1002 iq_elt["to"] = to_jid.full()
1018 iq_elt["from"] = client.jid.full() 1003 query_elt = iq_elt.addElement((NS_BS, 'query'))
1019 iq_elt["to"] = to_jid.full() 1004 query_elt['mode'] = 'tcp'
1020 query_elt = iq_elt.addElement('query', NS_BS) 1005 query_elt['sid'] = sid
1021 query_elt['mode'] = 'tcp' 1006
1022 query_elt['sid'] = sid 1007 for candidate in candidates:
1023 1008 streamhost = query_elt.addElement('streamhost')
1024 #first streamhost: direct connection 1009 streamhost['host'] = candidate.host
1025 streamhost = query_elt.addElement('streamhost') 1010 streamhost['port'] = str(candidate.port)
1026 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") 1011 streamhost['jid'] = candidate.jid.full()
1027 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") 1012
1028 streamhost['jid'] = client.jid.full() 1013 d = iq_elt.send()
1029 1014 args = [session_data, client]
1030 #second streamhost: mediated connection, using proxy 1015 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1031 streamhost = query_elt.addElement('streamhost') 1016
1032 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) 1017 self.getCandidates(profile).addCallback(gotCandidates)
1033 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
1034 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
1035
1036 iq_elt.addCallback(self._IQOpen, session_data, client)
1037 iq_elt.send()
1038 return session_data[DEFER_KEY] 1018 return session_data[DEFER_KEY]
1039 1019
1040 def _IQOpen(self, session_data, client, iq_elt): 1020 def _IQNegotiationCb(self, iq_elt, session_data, client):
1041 """Called when the result of open iq is received 1021 """Called when the result of open iq is received
1042 1022
1043 @param session_data(dict): data of the session 1023 @param session_data(dict): data of the session
1044 @param client: %(doc_client)s 1024 @param client: %(doc_client)s
1045 @param iq_elt(domish.Element): <iq> result 1025 @param iq_elt(domish.Element): <iq> result
1046 """ 1026 """
1047 sid = session_data['id'] 1027 try:
1048 if iq_elt["type"] == "error": 1028 query_elt = iq_elt.elements(NS_BS, 'query').next()
1049 log.warning(_("Socks5 transfer failed")) 1029 streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next()
1030 except StopIteration:
1031 log.warning(u"No streamhost found in stream query")
1050 # FIXME: must clean session 1032 # FIXME: must clean session
1051 return 1033 return
1052 1034
1053 try: 1035 streamhost_jid = jid.JID(streamhost_used_elt['jid'])
1054 session_data = client.xep_0065_current_stream[sid] 1036 try:
1055 file_obj = session_data["file_obj"] 1037 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
1056 timer = session_data["timer"] 1038 except StopIteration:
1057 except KeyError: 1039 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
1058 raise exceptions.InternalError
1059
1060 timer.reset(TIMEOUT)
1061
1062 query_elt = iq_elt.elements(NS_BS, 'query').next()
1063 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used'))
1064
1065 if not streamhost_elts:
1066 log.warning(_("No streamhost found in stream query"))
1067 # FIXME: must clean session
1068 return 1040 return
1069 1041
1070 # FIXME: must be cleaned ! 1042 if candidate.type == XEP_0065.TYPE_PROXY:
1071 1043 log.info(u"A Socks5 proxy is used")
1072 streamhost_jid = streamhost_elts[0]['jid'] 1044 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile)
1073 if streamhost_jid != client.jid.full(): 1045 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
1074 log.debug(_("A proxy server is used")) 1046 d.addErrback(self._activationEb)
1075 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile)
1076 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile)
1077 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile)
1078 if proxy_jid != streamhost_jid:
1079 log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
1080 return
1081 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile)
1082 reactor.connectTCP(proxy_host, int(proxy_port), factory)
1083 else: 1047 else:
1084 session_data["start_transfer_cb"](file_obj) # We now activate the stream 1048 d = defer.succeed(None)
1085 1049
1086 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): 1050 d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash']))
1087 log.debug(_("activating stream")) 1051
1088 client = self.host.getClient(profile) 1052 def _activationEb(self, failure):
1089 session_data = client.xep_0065_current_stream[sid] 1053 log.warning(u"Proxy activation error: {}".format(failure.value))
1090 1054
1091 iq_elt = client.IQ(client.xmlstream, 'set') 1055 def _IQNegotiationEb(self, stanza_err, session_data, client):
1092 iq_elt["from"] = client.jid.full() 1056 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition))
1093 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 1057 # FIXME: must clean session
1094 query_elt = iq_elt.addElement('query', NS_BS)
1095 query_elt['sid'] = sid
1096 query_elt.addElement('activate', content=session_data['to'].full())
1097 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj'])
1098 iq_elt.send()
1099
1100 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
1101 if iq_elt['type'] == 'error':
1102 log.warning(_("Can't activate the proxy stream"))
1103 return
1104 else:
1105 start_transfer_cb(file_obj)
1106 1058
1107 def createSession(self, *args, **kwargs): 1059 def createSession(self, *args, **kwargs):
1108 """like [_createSession] but return the session deferred instead of the whole session 1060 """like [_createSession] but return the session deferred instead of the whole session
1109 1061
1110 session deferred is fired when transfer is finished 1062 session deferred is fired when transfer is finished
1111 """ 1063 """
1112 return self._createSession(*args, **kwargs)[DEFER_KEY] 1064 return self._createSession(*args, **kwargs)[DEFER_KEY]
1113 1065
1114 def _createSession(self, file_obj, to_jid, sid, profile): 1066 def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE):
1115 """Called when a bytestream is imminent 1067 """Called when a bytestream is imminent
1116 1068
1117 @param file_obj(file): File object where data will be written 1069 @param file_obj(file): File object where data will be written
1118 @param to_jid(jid.JId): jid of the other peer 1070 @param to_jid(jid.JId): jid of the other peer
1119 @param sid(unicode): session id 1071 @param sid(unicode): session id
1072 @param initiator(bool): if True, this session is create by initiator
1120 @param profile: %(doc_profile)s 1073 @param profile: %(doc_profile)s
1121 @return (dict): session data 1074 @return (dict): session data
1122 """ 1075 """
1123 client = self.host.getClient(profile) 1076 client = self.host.getClient(profile)
1124 if sid in client.xep_0065_current_stream: 1077 if sid in client.xep_0065_sid_session:
1125 raise exceptions.ConflictError(u'A session with this id already exists !') 1078 raise exceptions.ConflictError(u'A session with this id already exists !')
1126 session_data = client.xep_0065_current_stream[sid] = \ 1079 if requester:
1080 session_hash = getSessionHash(client.jid, to_jid, sid)
1081 session_data = self._registerHash(session_hash, file_obj, profile)
1082 else:
1083 session_hash = getSessionHash(to_jid, client.jid, sid)
1084 session_data = client._s5b_sessions[session_hash] = {
1085 DEFER_KEY: defer.Deferred(),
1086 }
1087 client.xep_0065_sid_session[sid] = session_data
1088 session_data.update(
1127 {'id': sid, 1089 {'id': sid,
1128 DEFER_KEY: defer.Deferred(), 1090 'peer_jid': to_jid,
1129 'to': to_jid, 1091 'file': file_obj,
1130 'file_obj': file_obj, 1092 'hash': session_hash,
1131 'seq': -1, # FIXME: to check 1093 })
1132 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
1133 }
1134 1094
1135 return session_data 1095 return session_data
1136 1096
1137 def getSession(self, session_hash, profile): 1097 def getSession(self, session_hash, profile):
1138 """Return session data 1098 """Return session data
1154 raise e 1114 raise e
1155 client = self.host.getClient(profile) 1115 client = self.host.getClient(profile)
1156 return client._s5b_sessions[session_hash] 1116 return client._s5b_sessions[session_hash]
1157 1117
1158 def registerHash(self, *args, **kwargs): 1118 def registerHash(self, *args, **kwargs):
1159 """like [_registerHash] but resutrn the session deferred instead of the whole session 1119 """like [_registerHash] but resturn the session deferred instead of the whole session
1160 session deferred is fired when transfer is finished 1120 session deferred is fired when transfer is finished
1161 """ 1121 """
1162 return self._registerHash(*args, **kwargs)[DEFER_KEY] 1122 return self._registerHash(*args, **kwargs)[DEFER_KEY]
1163 1123
1164 def _registerHash(self, session_hash, file_obj, profile): 1124 def _registerHash(self, session_hash, file_obj, profile):
1193 self.hash_profiles_map[session_hash] = profile 1153 self.hash_profiles_map[session_hash] = profile
1194 1154
1195 return session_data 1155 return session_data
1196 1156
1197 def streamQuery(self, iq_elt, profile): 1157 def streamQuery(self, iq_elt, profile):
1198 """Get file using byte stream""" 1158 log.debug(u"BS stream query")
1199 log.debug(_("BS stream query"))
1200 client = self.host.getClient(profile) 1159 client = self.host.getClient(profile)
1201 1160
1202 if not client:
1203 raise ProfileNotInCacheError
1204
1205 xmlstream = client.xmlstream
1206
1207 iq_elt.handled = True 1161 iq_elt.handled = True
1208 query_elt = iq_elt.firstChildElement() 1162
1209 sid = query_elt.getAttribute("sid") 1163 query_elt = iq_elt.elements(NS_BS, 'query').next()
1210 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) 1164 try:
1211 1165 sid = query_elt['sid']
1212 if not sid in client.xep_0065_current_stream: 1166 except KeyError:
1213 log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid)) 1167 log.warning(u"Invalid bystreams request received")
1214 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) 1168 return client.sendError(iq_elt, "bad-request")
1215 return 1169
1216 1170 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost'))
1217 client.xep_0065_current_stream[sid]['timer'].cancel()
1218 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"])
1219 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream
1220
1221 if not streamhost_elts: 1171 if not streamhost_elts:
1222 log.warning(_(u"No streamhost found in stream query %s" % sid)) 1172 return client.sendError(iq_elt, "bad-request")
1223 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) 1173
1224 return 1174 try:
1225 1175 session_data = client.xep_0065_sid_session[sid]
1226 streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case 1176 except KeyError:
1227 sh_host = streamhost_elt.getAttribute("host") 1177 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
1228 sh_port = streamhost_elt.getAttribute("port") 1178 return client.sendError(iq_elt, 'not-acceptable')
1229 sh_jid = streamhost_elt.getAttribute("jid") 1179
1230 if not sh_host or not sh_port or not sh_jid: 1180 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
1231 log.warning(_("incomplete streamhost element")) 1181
1232 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) 1182 candidates = []
1233 return 1183 nb_sh = len(streamhost_elts)
1234 1184 for idx, sh_elt in enumerate(streamhost_elts):
1235 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) 1185 try:
1236 1186 host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid'])
1237 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) 1187 except KeyError:
1238 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) 1188 log.warning(u"malformed streamhost element")
1239 reactor.connectTCP(sh_host, int(sh_port), factory) 1189 return client.sendError(iq_elt, "bad-request")
1240 1190 priority = nb_sh - idx
1241 def activateStream(self, sid, iq_id, profile): 1191 if jid_.userhostJID() != peer_jid.userhostJID():
1242 client = self.host.getClient(profile) 1192 type_ = XEP_0065.TYPE_PROXY
1243 log.debug(_("activating stream")) 1193 else:
1244 result = domish.Element((None, 'iq')) 1194 type_ = XEP_0065.TYPE_DIRECT
1245 session_data = client.xep_0065_current_stream[sid] 1195 candidates.append(Candidate(host, port, type_, priority, jid_))
1246 result['type'] = 'result' 1196
1247 result['id'] = iq_id 1197 for candidate in candidates:
1248 result['from'] = session_data["to"].full() 1198 log.info(u"Candidate proposed: {}".format(candidate))
1249 result['to'] = session_data["from"].full() 1199
1250 query = result.addElement('query', NS_BS) 1200 d = self.getBestCandidate(candidates, session_data['hash'], profile)
1251 query['sid'] = sid 1201 d.addCallback(self._ackStream, iq_elt, session_data, client)
1252 streamhost = query.addElement('streamhost-used') 1202
1253 streamhost['jid'] = session_data["streamhost"][2] 1203 def _ackStream(self, candidate, iq_elt, session_data, client):
1254 session_data["xmlstream"].send(result) 1204 if candidate is None:
1255 1205 log.info("No streamhost candidate worked, we have to end negotiation")
1256 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): 1206 return client.sendError(iq_elt, 'item-not-found')
1257 """Not acceptable error used when the stream is not expected or something is going wrong 1207 log.debug(u"activating stream")
1258 @param iq_id: IQ id 1208 result_elt = xmlstream.toResponse(iq_elt, 'result')
1259 @param to_jid: addressee 1209 query_elt = result_elt.addElement((NS_BS, 'query'))
1260 @param xmlstream: XML stream to use to send the error""" 1210 query_elt['sid'] = session_data['id']
1261 result = domish.Element((None, 'iq')) 1211 streamhost_used_elt = query_elt.addElement('streamhost-used')
1262 result['type'] = 'result' 1212 streamhost_used_elt['jid'] = candidate.jid.full()
1263 result['id'] = iq_id 1213 client.xmlstream.send(result_elt)
1264 result['to'] = to_jid
1265 error_el = result.addElement('error')
1266 error_el['type'] = 'modify'
1267 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable'))
1268 xmlstream.send(result)
1269
1270 def sendBadRequestError(self, iq_id, to_jid, xmlstream):
1271 """Not acceptable error used when the stream is not expected or something is going wrong
1272 @param iq_id: IQ id
1273 @param to_jid: addressee
1274 @param xmlstream: XML stream to use to send the error"""
1275 result = domish.Element((None, 'iq'))
1276 result['type'] = 'result'
1277 result['id'] = iq_id
1278 result['to'] = to_jid
1279 error_el = result.addElement('error')
1280 error_el['type'] = 'cancel'
1281 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request'))
1282 xmlstream.send(result)
1283 1214
1284 1215
1285 class XEP_0065_handler(XMPPHandler): 1216 class XEP_0065_handler(XMPPHandler):
1286 implements(iwokkel.IDisco) 1217 implements(iwokkel.IDisco)
1287 1218