Mercurial > libervia-backend
diff src/plugins/plugin_xep_0065.py @ 1577:d04d7402b8e9
plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation:
/!\ range is not working yet
/!\ pipe plugin is broken for now
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 Nov 2015 18:19:49 +0100 |
parents | d5f59ba166fe |
children | 8cc7d83141a4 |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py Wed Nov 11 18:19:49 2015 +0100 +++ b/src/plugins/plugin_xep_0065.py Wed Nov 11 18:19:49 2015 +0100 @@ -63,13 +63,12 @@ from twisted.internet import protocol from twisted.internet import reactor from twisted.internet import error as internet_error -from twisted.words.protocols.jabber import jid, client as jabber_client from twisted.words.protocols.jabber import error as jabber_error +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import xmlstream from twisted.protocols.basic import FileSender -from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure -from sat.core.exceptions import ProfileNotInCacheError from collections import namedtuple import struct import hashlib @@ -432,7 +431,6 @@ return None def _makeRequest(self): - # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) hash_ = self._session_hash request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) self.transport.write(request) @@ -464,13 +462,8 @@ self.loseConnection() return - # if self.factory.proxy: - # self.state = STATE_READY - # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) - # else: self.state = STATE_READY self.connection.callback(None) - # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error: # The buffer is probably not complete, we need to wait more @@ -489,9 +482,6 @@ .format(host=self.transport.getPeer().host)) return self._session_hash = addr - # self.sid, self.profile = self.factory.hash_profiles_map[addr] - # client = self.factory.host.getClient(self.profile) - # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) def startTransfer(self): @@ -546,11 +536,6 @@ def connectionLost(self, reason): log.debug(u"Socks5 connection lost: {}".format(reason.value)) - # self.transport.unregisterProducer() - # if self.peersock is not None: - # self.peersock.peersock = None - # self.peersock.transport.unregisterProducer() - # self.peersock = None if self.state != STATE_READY: self.connection.errback(reason) if self.server_mode : @@ -620,7 +605,6 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): def __init__(self, parent, session_hash, profile): """Init the Client Factory @@ -635,13 +619,6 @@ self._protocol_instance = None self.connector = None self._discarded = False - # self.data = stream_data[sid] - # self.sid = sid - # self.iq_id = iq_id - # self.activateCb = activateCb - # self.finishedCb = finishedCb - # self.proxy = proxy - # self.profile = profile def discard(self): """Disconnect the client @@ -671,7 +648,6 @@ self.getSession()[DEFER_KEY].callback(None) else: self.getSession()[DEFER_KEY].errback(reason) - # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful def buildProtocol(self, addr): log.debug(("Socks 5 client connection started")) @@ -719,7 +695,7 @@ def profileConnected(self, profile): client = self.host.getClient(profile) - client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) + client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) client._s5b_sessions = {} def getSessionHash(self, from_jid, to_jid, sid): @@ -776,7 +752,7 @@ notFound(server) iq_elt = client.IQ('get') iq_elt['to'] = proxy.full() - iq_elt.addElement('query', NS_BS) + iq_elt.addElement((NS_BS, 'query')) try: result_elt = yield iq_elt.send() @@ -914,6 +890,14 @@ return defers_list def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): + """Get best candidate (according to priority) which can connect + + @param candidates(iterable[Candidate]): candidates to test + @param session_hash(unicode): hash of the session + hash is the same as hostname computer in XEP-0065 ยง 5.3.2 #1 + @param profile: %(doc_profile)s + @return (D(None, Candidate)): best candidate or None if none can connect + """ defer_candidates = None def connectionCb(candidate, profile): @@ -947,7 +931,7 @@ def _timeOut(self, sid, client): """Delecte current_stream id, called after timeout - @param id: id of client.xep_0065_current_stream""" + @param id: id of client.xep_0065_sid_session""" log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( sid=sid, profile=client.profile)) self._killSession(sid, client, u"TIMEOUT") @@ -961,7 +945,7 @@ else, will be used to call failure_cb """ try: - session = client.xep_0065_current_stream[sid] + session = client.xep_0065_sid_session[sid] except KeyError: log.warning(_("kill id called on a non existant id")) return @@ -976,7 +960,7 @@ if session['timer'].active(): session['timer'].cancel() - del client.xep_0065_current_stream[sid] + del client.xep_0065_sid_session[sid] # FIXME: to check try: @@ -1004,105 +988,73 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s + @return (D): Deferred fired when session is finished """ client = self.host.getClient(profile) - session_data = self._createSession(file_obj, to_jid, sid, client.profile) - - session_data["to"] = to_jid - session_data["xmlstream"] = client.xmlstream - hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) + session_data = self._createSession(file_obj, to_jid, sid, True, client.profile) - self.hash_profiles_map[hash_] = (sid, profile) - - iq_elt = jabber_client.IQ(client.xmlstream, 'set') - iq_elt["from"] = client.jid.full() - iq_elt["to"] = to_jid.full() - query_elt = iq_elt.addElement('query', NS_BS) - query_elt['mode'] = 'tcp' - query_elt['sid'] = sid + session_data[client] = client - #first streamhost: direct connection - streamhost = query_elt.addElement('streamhost') - streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") - streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") - streamhost['jid'] = client.jid.full() + def gotCandidates(candidates): + session_data['candidates'] = candidates + iq_elt = client.IQ() + iq_elt["from"] = client.jid.full() + iq_elt["to"] = to_jid.full() + query_elt = iq_elt.addElement((NS_BS, 'query')) + query_elt['mode'] = 'tcp' + query_elt['sid'] = sid - #second streamhost: mediated connection, using proxy - streamhost = query_elt.addElement('streamhost') - streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) - streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) - streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + for candidate in candidates: + streamhost = query_elt.addElement('streamhost') + streamhost['host'] = candidate.host + streamhost['port'] = str(candidate.port) + streamhost['jid'] = candidate.jid.full() - iq_elt.addCallback(self._IQOpen, session_data, client) - iq_elt.send() + d = iq_elt.send() + args = [session_data, client] + d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) + + self.getCandidates(profile).addCallback(gotCandidates) return session_data[DEFER_KEY] - def _IQOpen(self, session_data, client, iq_elt): + def _IQNegotiationCb(self, iq_elt, session_data, client): """Called when the result of open iq is received @param session_data(dict): data of the session @param client: %(doc_client)s @param iq_elt(domish.Element): <iq> result """ - sid = session_data['id'] - if iq_elt["type"] == "error": - log.warning(_("Socks5 transfer failed")) - # FIXME: must clean session - return - try: - session_data = client.xep_0065_current_stream[sid] - file_obj = session_data["file_obj"] - timer = session_data["timer"] - except KeyError: - raise exceptions.InternalError - - timer.reset(TIMEOUT) - - query_elt = iq_elt.elements(NS_BS, 'query').next() - streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) - - if not streamhost_elts: - log.warning(_("No streamhost found in stream query")) + query_elt = iq_elt.elements(NS_BS, 'query').next() + streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() + except StopIteration: + log.warning(u"No streamhost found in stream query") # FIXME: must clean session return - # FIXME: must be cleaned ! - - streamhost_jid = streamhost_elts[0]['jid'] - if streamhost_jid != client.jid.full(): - log.debug(_("A proxy server is used")) - proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) - proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) - proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) - if proxy_jid != streamhost_jid: - log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) - return - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) - reactor.connectTCP(proxy_host, int(proxy_port), factory) - else: - session_data["start_transfer_cb"](file_obj) # We now activate the stream + streamhost_jid = jid.JID(streamhost_used_elt['jid']) + try: + candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() + except StopIteration: + log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) + return - def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): - log.debug(_("activating stream")) - client = self.host.getClient(profile) - session_data = client.xep_0065_current_stream[sid] + if candidate.type == XEP_0065.TYPE_PROXY: + log.info(u"A Socks5 proxy is used") + d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) + d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) + d.addErrback(self._activationEb) + else: + d = defer.succeed(None) - iq_elt = client.IQ(client.xmlstream, 'set') - iq_elt["from"] = client.jid.full() - iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) - query_elt = iq_elt.addElement('query', NS_BS) - query_elt['sid'] = sid - query_elt.addElement('activate', content=session_data['to'].full()) - iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) - iq_elt.send() + d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) - def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): - if iq_elt['type'] == 'error': - log.warning(_("Can't activate the proxy stream")) - return - else: - start_transfer_cb(file_obj) + def _activationEb(self, failure): + log.warning(u"Proxy activation error: {}".format(failure.value)) + + def _IQNegotiationEb(self, stanza_err, session_data, client): + log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition)) + # FIXME: must clean session def createSession(self, *args, **kwargs): """like [_createSession] but return the session deferred instead of the whole session @@ -1111,26 +1063,34 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, file_obj, to_jid, sid, profile): + def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): """Called when a bytestream is imminent @param file_obj(file): File object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id + @param initiator(bool): if True, this session is create by initiator @param profile: %(doc_profile)s @return (dict): session data """ client = self.host.getClient(profile) - if sid in client.xep_0065_current_stream: + if sid in client.xep_0065_sid_session: raise exceptions.ConflictError(u'A session with this id already exists !') - session_data = client.xep_0065_current_stream[sid] = \ + if requester: + session_hash = getSessionHash(client.jid, to_jid, sid) + session_data = self._registerHash(session_hash, file_obj, profile) + else: + session_hash = getSessionHash(to_jid, client.jid, sid) + session_data = client._s5b_sessions[session_hash] = { + DEFER_KEY: defer.Deferred(), + } + client.xep_0065_sid_session[sid] = session_data + session_data.update( {'id': sid, - DEFER_KEY: defer.Deferred(), - 'to': to_jid, - 'file_obj': file_obj, - 'seq': -1, # FIXME: to check - 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), - } + 'peer_jid': to_jid, + 'file': file_obj, + 'hash': session_hash, + }) return session_data @@ -1156,7 +1116,7 @@ return client._s5b_sessions[session_hash] def registerHash(self, *args, **kwargs): - """like [_registerHash] but resutrn the session deferred instead of the whole session + """like [_registerHash] but resturn the session deferred instead of the whole session session deferred is fired when transfer is finished """ return self._registerHash(*args, **kwargs)[DEFER_KEY] @@ -1195,91 +1155,62 @@ return session_data def streamQuery(self, iq_elt, profile): - """Get file using byte stream""" - log.debug(_("BS stream query")) + log.debug(u"BS stream query") client = self.host.getClient(profile) - if not client: - raise ProfileNotInCacheError - - xmlstream = client.xmlstream - iq_elt.handled = True - query_elt = iq_elt.firstChildElement() - sid = query_elt.getAttribute("sid") - streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) - if not sid in client.xep_0065_current_stream: - log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid)) - self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) - return + query_elt = iq_elt.elements(NS_BS, 'query').next() + try: + sid = query_elt['sid'] + except KeyError: + log.warning(u"Invalid bystreams request received") + return client.sendError(iq_elt, "bad-request") - client.xep_0065_current_stream[sid]['timer'].cancel() - client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) - client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream - + streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) if not streamhost_elts: - log.warning(_(u"No streamhost found in stream query %s" % sid)) - self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) - return + return client.sendError(iq_elt, "bad-request") - streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case - sh_host = streamhost_elt.getAttribute("host") - sh_port = streamhost_elt.getAttribute("port") - sh_jid = streamhost_elt.getAttribute("jid") - if not sh_host or not sh_port or not sh_jid: - log.warning(_("incomplete streamhost element")) - self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) - return + try: + session_data = client.xep_0065_sid_session[sid] + except KeyError: + log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) + return client.sendError(iq_elt, 'not-acceptable') - client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) - - log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) - reactor.connectTCP(sh_host, int(sh_port), factory) + peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) - def activateStream(self, sid, iq_id, profile): - client = self.host.getClient(profile) - log.debug(_("activating stream")) - result = domish.Element((None, 'iq')) - session_data = client.xep_0065_current_stream[sid] - result['type'] = 'result' - result['id'] = iq_id - result['from'] = session_data["to"].full() - result['to'] = session_data["from"].full() - query = result.addElement('query', NS_BS) - query['sid'] = sid - streamhost = query.addElement('streamhost-used') - streamhost['jid'] = session_data["streamhost"][2] - session_data["xmlstream"].send(result) + candidates = [] + nb_sh = len(streamhost_elts) + for idx, sh_elt in enumerate(streamhost_elts): + try: + host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) + except KeyError: + log.warning(u"malformed streamhost element") + return client.sendError(iq_elt, "bad-request") + priority = nb_sh - idx + if jid_.userhostJID() != peer_jid.userhostJID(): + type_ = XEP_0065.TYPE_PROXY + else: + type_ = XEP_0065.TYPE_DIRECT + candidates.append(Candidate(host, port, type_, priority, jid_)) - def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): - """Not acceptable error used when the stream is not expected or something is going wrong - @param iq_id: IQ id - @param to_jid: addressee - @param xmlstream: XML stream to use to send the error""" - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - error_el = result.addElement('error') - error_el['type'] = 'modify' - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable')) - xmlstream.send(result) + for candidate in candidates: + log.info(u"Candidate proposed: {}".format(candidate)) + + d = self.getBestCandidate(candidates, session_data['hash'], profile) + d.addCallback(self._ackStream, iq_elt, session_data, client) - def sendBadRequestError(self, iq_id, to_jid, xmlstream): - """Not acceptable error used when the stream is not expected or something is going wrong - @param iq_id: IQ id - @param to_jid: addressee - @param xmlstream: XML stream to use to send the error""" - result = domish.Element((None, 'iq')) - result['type'] = 'result' - result['id'] = iq_id - result['to'] = to_jid - error_el = result.addElement('error') - error_el['type'] = 'cancel' - error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request')) - xmlstream.send(result) + def _ackStream(self, candidate, iq_elt, session_data, client): + if candidate is None: + log.info("No streamhost candidate worked, we have to end negotiation") + return client.sendError(iq_elt, 'item-not-found') + log.debug(u"activating stream") + result_elt = xmlstream.toResponse(iq_elt, 'result') + query_elt = result_elt.addElement((NS_BS, 'query')) + query_elt['sid'] = session_data['id'] + streamhost_used_elt = query_elt.addElement('streamhost-used') + streamhost_used_elt['jid'] = candidate.jid.full() + client.xmlstream.send(result_elt) class XEP_0065_handler(XMPPHandler):