Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0260.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0260.py@3900626bc100 |
children | b86912d3fd33 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0260.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,551 @@ +#!/usr/bin/env python3 + + +# SAT plugin for Jingle (XEP-0260) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from libervia.backend.core.i18n import _ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger + +log = getLogger(__name__) +from libervia.backend.core import exceptions +from wokkel import disco, iwokkel +from zope.interface import implementer +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +import uuid + +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler + + +NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1" + +PLUGIN_INFO = { + C.PI_NAME: "Jingle SOCKS5 Bytestreams", + C.PI_IMPORT_NAME: "XEP-0260", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0260"], + C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"], + C.PI_RECOMMENDATIONS: ["XEP-0261"], # needed for fallback + C.PI_MAIN: "XEP_0260", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""), +} + + +class ProxyError(Exception): + def __str__(self): + return "an error happened while trying to use the proxy" + + +class XEP_0260(object): + # TODO: udp handling + + def __init__(self, host): + log.info(_("plugin Jingle SOCKS5 Bytestreams")) + self.host = host + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream + try: + self._jingle_ibb = host.plugins["XEP-0261"] + except KeyError: + self._jingle_ibb = None + self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100) + + def get_handler(self, client): + return XEP_0260_handler() + + def _parse_candidates(self, transport_elt): + """Parse <candidate> elements + + @param transport_elt(domish.Element): parent <transport> element + @return (list[plugin_xep_0065.Candidate): list of parsed candidates + """ + candidates = [] + for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"): + try: + cid = candidate_elt["cid"] + host = candidate_elt["host"] + jid_ = jid.JID(candidate_elt["jid"]) + port = int(candidate_elt.getAttribute("port", 1080)) + priority = int(candidate_elt["priority"]) + type_ = candidate_elt.getAttribute("type", self._s5b.TYPE_DIRECT) + except (KeyError, ValueError): + raise exceptions.DataError() + candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid) + candidates.append(candidate) + # self._s5b.registerCandidate(candidate) + return candidates + + def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None): + """Build <transport> element with candidates + + @param session(dict): jingle session data + @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add + @param sid(unicode): transport stream id + @param client: %(doc_client)s + @param mode(str, None): 'tcp' or 'udp', or None to have no attribute + @return (domish.Element): parent <transport> element where <candidate> elements must be added + """ + proxy = next( + ( + candidate + for candidate in candidates + if candidate.type == self._s5b.TYPE_PROXY + ), + None, + ) + transport_elt = domish.Element((NS_JINGLE_S5B, "transport")) + transport_elt["sid"] = sid + if proxy is not None: + transport_elt["dstaddr"] = session_hash + if mode is not None: + transport_elt["mode"] = "tcp" # XXX: we only manage tcp for now + + for candidate in candidates: + log.debug("Adding candidate: {}".format(candidate)) + candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B) + if candidate.id is None: + candidate.id = str(uuid.uuid4()) + candidate_elt["cid"] = candidate.id + candidate_elt["host"] = candidate.host + candidate_elt["jid"] = candidate.jid.full() + candidate_elt["port"] = str(candidate.port) + candidate_elt["priority"] = str(candidate.priority) + candidate_elt["type"] = candidate.type + return transport_elt + + @defer.inlineCallbacks + def jingle_session_init(self, client, session, content_name): + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + sid = transport_data["sid"] = str(uuid.uuid4()) + session_hash = transport_data["session_hash"] = self._s5b.get_session_hash( + session["local_jid"], session["peer_jid"], sid + ) + transport_data["peer_session_hash"] = self._s5b.get_session_hash( + session["peer_jid"], session["local_jid"], sid + ) # requester and target are inversed for peer candidates + transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None) + candidates = transport_data["candidates"] = yield self._s5b.get_candidates( + client, session["local_jid"]) + mode = "tcp" # XXX: we only manage tcp for now + transport_elt = self._build_candidates( + session, candidates, sid, session_hash, client, mode + ) + + defer.returnValue(transport_elt) + + def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name): + """Called when activation confirmation has been received from proxy + + cf XEP-0260 § 2.4 + """ + # now that the proxy is activated, we have to inform other peer + content_data = session["contents"][content_name] + iq_elt, transport_elt = self._j.build_action( + client, self._j.A_TRANSPORT_INFO, session, content_name + ) + transport_elt["sid"] = content_data["transport_data"]["sid"] + activated_elt = transport_elt.addElement("activated") + activated_elt["cid"] = candidate.id + iq_elt.send() + + def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name): + """Called when activation error has been received from proxy + + cf XEP-0260 § 2.4 + """ + # TODO: fallback to IBB + # now that the proxy is activated, we have to inform other peer + content_data = session["contents"][content_name] + iq_elt, transport_elt = self._j.build_action( + client, self._j.A_TRANSPORT_INFO, session, content_name + ) + transport_elt["sid"] = content_data["transport_data"]["sid"] + transport_elt.addElement("proxy-error") + iq_elt.send() + log.warning( + "Can't activate proxy, we need to fallback to IBB: {reason}".format( + reason=stanza_error.value.condition + ) + ) + self.do_fallback(session, content_name, client) + + def _found_peer_candidate( + self, candidate, session, transport_data, content_name, client + ): + """Called when the best candidate from other peer is found + + @param candidate(XEP_0065.Candidate, None): selected candidate, + or None if no candidate is accessible + @param session(dict): session data + @param transport_data(dict): transport data + @param content_name(unicode): name of the current content + @param client(unicode): %(doc_client)s + """ + + content_data = session["contents"][content_name] + transport_data["best_candidate"] = candidate + # we need to disconnect all non selected candidates before removing them + for c in transport_data["peer_candidates"]: + if c is None or c is candidate: + continue + c.discard() + del transport_data["peer_candidates"] + iq_elt, transport_elt = self._j.build_action( + client, self._j.A_TRANSPORT_INFO, session, content_name + ) + transport_elt["sid"] = content_data["transport_data"]["sid"] + if candidate is None: + log.warning("Can't connect to any peer candidate") + candidate_elt = transport_elt.addElement("candidate-error") + else: + log.info("Found best peer candidate: {}".format(str(candidate))) + candidate_elt = transport_elt.addElement("candidate-used") + candidate_elt["cid"] = candidate.id + iq_elt.send() # TODO: check result stanza + self._check_candidates(session, content_name, transport_data, client) + + def _check_candidates(self, session, content_name, transport_data, client): + """Called when a candidate has been choosed + + if we have both candidates, we select one, or fallback to an other transport + @param session(dict): session data + @param content_name(unicode): name of the current content + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + content_data = session["contents"][content_name] + try: + best_candidate = transport_data["best_candidate"] + except KeyError: + # we have not our best candidate yet + return + try: + peer_best_candidate = transport_data["peer_best_candidate"] + except KeyError: + # we have not peer best candidate yet + return + + # at this point we have both candidates, it's time to choose one + if best_candidate is None or peer_best_candidate is None: + choosed_candidate = best_candidate or peer_best_candidate + else: + if best_candidate.priority == peer_best_candidate.priority: + # same priority, we choose initiator one according to XEP-0260 §2.4 #4 + log.debug( + "Candidates have same priority, we select the one choosed by initiator" + ) + if session["initiator"] == session["local_jid"]: + choosed_candidate = best_candidate + else: + choosed_candidate = peer_best_candidate + else: + choosed_candidate = max( + best_candidate, peer_best_candidate, key=lambda c: c.priority + ) + + if choosed_candidate is None: + log.warning("Socks5 negociation failed, we need to fallback to IBB") + self.do_fallback(session, content_name, client) + else: + if choosed_candidate == peer_best_candidate: + # peer_best_candidate was choosed from the candidates we have sent + # so our_candidate is true if choosed_candidate is peer_best_candidate + our_candidate = True + # than also mean that best_candidate must be discarded ! + try: + best_candidate.discard() + except AttributeError: # but it can be None + pass + else: + our_candidate = False + + log.info( + "Socks5 negociation successful, {who} candidate will be used: {candidate}".format( + who="our" if our_candidate else "other peer", + candidate=choosed_candidate, + ) + ) + del transport_data["best_candidate"] + del transport_data["peer_best_candidate"] + + if choosed_candidate.type == self._s5b.TYPE_PROXY: + # the stream transfer need to wait for proxy activation + # (see XEP-0260 § 2.4) + if our_candidate: + d = self._s5b.connect_candidate( + client, choosed_candidate, transport_data["session_hash"] + ) + d.addCallback( + lambda __: choosed_candidate.activate( + transport_data["sid"], session["peer_jid"], client + ) + ) + args = [client, choosed_candidate, session, content_name] + d.addCallbacks( + self._proxy_activated_cb, self._proxy_activated_eb, args, None, args + ) + else: + # this Deferred will be called when we'll receive activation confirmation from other peer + d = transport_data["activation_d"] = defer.Deferred() + else: + d = defer.succeed(None) + + if content_data["senders"] == session["role"]: + # we can now start the stream transfer (or start it after proxy activation) + d.addCallback( + lambda __: choosed_candidate.start_transfer( + transport_data["session_hash"] + ) + ) + d.addErrback(self._start_eb, session, content_name, client) + + def _start_eb(self, fail, session, content_name, client): + """Called when it's not possible to start the transfer + + Will try to fallback to IBB + """ + try: + reason = str(fail.value) + except AttributeError: + reason = str(fail) + log.warning("Cant start transfert, we'll try fallback method: {}".format(reason)) + self.do_fallback(session, content_name, client) + + def _candidate_info( + self, candidate_elt, session, content_name, transport_data, client + ): + """Called when best candidate has been received from peer (or if none is working) + + @param candidate_elt(domish.Element): candidate-used or candidate-error element + (see XEP-0260 §2.3) + @param session(dict): session data + @param content_name(unicode): name of the current content + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + if candidate_elt.name == "candidate-error": + # candidate-error, no candidate worked + transport_data["peer_best_candidate"] = None + else: + # candidate-used, one candidate was choosed + try: + cid = candidate_elt.attributes["cid"] + except KeyError: + log.warning("No cid found in <candidate-used>") + raise exceptions.DataError + try: + candidate = next(( + c for c in transport_data["candidates"] if c.id == cid + )) + except StopIteration: + log.warning("Given cid doesn't correspond to any known candidate !") + raise exceptions.DataError # TODO: send an error to other peer, and use better exception + except KeyError: + # a transport-info can also be intentionaly sent too early by other peer + # but there is little probability + log.error( + '"candidates" key doesn\'t exists in transport_data, it should at this point' + ) + raise exceptions.InternalError + # at this point we have the candidate choosed by other peer + transport_data["peer_best_candidate"] = candidate + log.info("Other peer best candidate: {}".format(candidate)) + + del transport_data["candidates"] + self._check_candidates(session, content_name, transport_data, client) + + def _proxy_activation_info( + self, proxy_elt, session, content_name, transport_data, client + ): + """Called when proxy has been activated (or has sent an error) + + @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element + (see XEP-0260 §2.4) + @param session(dict): session data + @param content_name(unicode): name of the current content + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + try: + activation_d = transport_data.pop("activation_d") + except KeyError: + log.warning("Received unexpected transport-info for proxy activation") + + if proxy_elt.name == "activated": + activation_d.callback(None) + else: + activation_d.errback(ProxyError()) + + @defer.inlineCallbacks + def jingle_handler(self, client, action, session, content_name, transport_elt): + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + + if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + pass + + elif action == self._j.A_SESSION_ACCEPT: + # initiator side, we select a candidate in the ones sent by responder + assert "peer_candidates" not in transport_data + transport_data["peer_candidates"] = self._parse_candidates(transport_elt) + + elif action == self._j.A_START: + session_hash = transport_data["session_hash"] + peer_candidates = transport_data["peer_candidates"] + stream_object = content_data["stream_object"] + self._s5b.associate_stream_object(client, session_hash, stream_object) + stream_d = transport_data.pop("stream_d") + stream_d.chainDeferred(content_data["finished_d"]) + peer_session_hash = transport_data["peer_session_hash"] + d = self._s5b.get_best_candidate( + client, peer_candidates, session_hash, peer_session_hash + ) + d.addCallback( + self._found_peer_candidate, session, transport_data, content_name, client + ) + + elif action == self._j.A_SESSION_INITIATE: + # responder side, we select a candidate in the ones sent by initiator + # and we give our candidates + assert "peer_candidates" not in transport_data + sid = transport_data["sid"] = transport_elt["sid"] + session_hash = transport_data["session_hash"] = self._s5b.get_session_hash( + session["local_jid"], session["peer_jid"], sid + ) + peer_session_hash = transport_data[ + "peer_session_hash" + ] = self._s5b.get_session_hash( + session["peer_jid"], session["local_jid"], sid + ) # requester and target are inversed for peer candidates + peer_candidates = transport_data["peer_candidates"] = self._parse_candidates( + transport_elt + ) + stream_object = content_data["stream_object"] + stream_d = self._s5b.register_hash(client, session_hash, stream_object) + stream_d.chainDeferred(content_data["finished_d"]) + d = self._s5b.get_best_candidate( + client, peer_candidates, session_hash, peer_session_hash + ) + d.addCallback( + self._found_peer_candidate, session, transport_data, content_name, client + ) + candidates = yield self._s5b.get_candidates(client, session["local_jid"]) + # we remove duplicate candidates + candidates = [ + candidate for candidate in candidates if candidate not in peer_candidates + ] + + transport_data["candidates"] = candidates + # we can now build a new <transport> element with our candidates + transport_elt = self._build_candidates( + session, candidates, sid, session_hash, client + ) + + elif action == self._j.A_TRANSPORT_INFO: + # transport-info can be about candidate or proxy activation + candidate_elt = None + + for method, names in ( + (self._candidate_info, ("candidate-used", "candidate-error")), + (self._proxy_activation_info, ("activated", "proxy-error")), + ): + for name in names: + try: + candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name)) + except StopIteration: + continue + else: + method( + candidate_elt, session, content_name, transport_data, client + ) + break + + if candidate_elt is None: + log.warning( + "Unexpected transport element: {}".format(transport_elt.toXml()) + ) + elif action == self._j.A_DESTROY: + # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session. + # note that sid argument is not necessary for sessions created by this plugin + self._s5b.kill_session(None, transport_data["session_hash"], None, client) + else: + log.warning("FIXME: unmanaged action {}".format(action)) + + defer.returnValue(transport_elt) + + def jingle_terminate(self, client, action, session, content_name, reason_elt): + if reason_elt.decline: + log.debug("Session declined, deleting S5B session") + # we just need to clean the S5B session if it is declined + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + self._s5b.kill_session(None, transport_data["session_hash"], None, client) + + def _do_fallback(self, feature_checked, session, content_name, client): + """Do the fallback, method called once feature is checked + + @param feature_checked(bool): True if other peer can do IBB + """ + if not feature_checked: + log.warning( + "Other peer can't manage jingle IBB, be have to terminate the session" + ) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) + else: + self._j.transport_replace( + client, self._jingle_ibb.NAMESPACE, session, content_name + ) + + def do_fallback(self, session, content_name, client): + """Fallback to IBB transport, used in last resort + + @param session(dict): session data + @param content_name(unicode): name of the current content + @param client(unicode): %(doc_client)s + """ + if session["role"] != self._j.ROLE_INITIATOR: + # only initiator must do the fallback, see XEP-0260 §3 + return + if self._jingle_ibb is None: + log.warning( + "Jingle IBB (XEP-0261) plugin is not available, we have to close the session" + ) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) + else: + d = self.host.hasFeature( + client, self._jingle_ibb.NAMESPACE, session["peer_jid"] + ) + d.addCallback(self._do_fallback, session, content_name, client) + return d + + +@implementer(iwokkel.IDisco) +class XEP_0260_handler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_S5B)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []