Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0260.py @ 4236:f59e9421a650
test (unit/cli): Add a file send/receive test for WebRTC:
fix 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 15:21:00 +0200 |
parents | e11b13418ba6 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# SAT plugin for Jingle (XEP-0260)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler

log = getLogger(__name__)
from libervia.backend.core import exceptions
from wokkel import disco, iwokkel
from zope.interface import implementer
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
import uuid

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler


NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle SOCKS5 Bytestreams",
    C.PI_IMPORT_NAME: "XEP-0260",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0260"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
    C.PI_RECOMMENDATIONS: ["XEP-0261"],  # needed for fallback
    C.PI_MAIN: "XEP_0260",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""),
}


class ProxyError(Exception):
    """Error used to fail the activation Deferred when peer sends <proxy-error/>"""

    def __str__(self):
        return "an error happened while trying to use the proxy"


class XEP_0260(BaseTransportHandler):
    """Jingle transport handler for SOCKS5 Bytestreams (XEP-0260)

    The transport is registered on the XEP-0166 plugin under NS_JINGLE_S5B, and
    relies on the XEP-0065 plugin for candidates and stream handling.
    If available, the XEP-0261 plugin is used as fallback transport.
    """

    # TODO: udp handling

    def __init__(self, host):
        """Store plugins shortcuts and register the transport

        @param host: main backend instance, its "plugins" mapping must contain
            "XEP-0166" and "XEP-0065" ("XEP-0261" is optional)
        """
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        try:
            self._jingle_ibb = host.plugins["XEP-0261"]
        except KeyError:
            # no IBB plugin: do_fallback will terminate the session instead
            self._jingle_ibb = None
        self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def get_handler(self, client):
        """Return the disco handler advertising NS_JINGLE_S5B"""
        return XEP_0260_handler()

    def _parse_candidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
        @raise exceptions.DataError: a mandatory attribute is missing, or "port" or
            "priority" is not an integer
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"):
            try:
                cid = candidate_elt["cid"]
                host = candidate_elt["host"]
                jid_ = jid.JID(candidate_elt["jid"])
                port = int(candidate_elt.getAttribute("port", 1080))
                priority = int(candidate_elt["priority"])
                type_ = candidate_elt.getAttribute("type", self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError) as err:
                # keep the original error as cause to ease debugging
                raise exceptions.DataError() from err
            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
            candidates.append(candidate)
            # self._s5b.registerCandidate(candidate)
        return candidates

    def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        @param session(dict): jingle session data
        @param candidates(iterable[plugin_xep_0065.Candidate]): candidates to add
            candidates without id get a newly generated one
        @param sid(unicode): transport stream id
        @param session_hash(unicode): hash used as "dstaddr" attribute when one of
            the candidates is a proxy
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
            only "tcp" is written for now, whatever the value is
        @return (domish.Element): <transport> element with its <candidate> children
        """
        # candidates are walked twice (proxy lookup, then serialisation), so a
        # one-shot iterator must be materialised first
        candidates = list(candidates)
        proxy = next(
            (
                candidate
                for candidate in candidates
                if candidate.type == self._s5b.TYPE_PROXY
            ),
            None,
        )
        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt["sid"] = sid
        if proxy is not None:
            transport_elt["dstaddr"] = session_hash
        if mode is not None:
            transport_elt["mode"] = "tcp"  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug("Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = str(uuid.uuid4())
            candidate_elt["cid"] = candidate.id
            candidate_elt["host"] = candidate.host
            candidate_elt["jid"] = candidate.jid.full()
            candidate_elt["port"] = str(candidate.port)
            candidate_elt["priority"] = str(candidate.priority)
            candidate_elt["type"] = candidate.type
        return transport_elt

    async def jingle_session_init(self, client, session, content_name):
        """Prepare transport data and build the <transport> element to send

        A new stream id is generated, session hashes are computed for both
        directions, and local candidates are retrieved from XEP-0065 plugin.

        @param client: %(doc_client)s
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        @return (domish.Element): <transport> element with our candidates
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        sid = transport_data["sid"] = str(uuid.uuid4())
        session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
            session["local_jid"], session["peer_jid"], sid
        )
        transport_data["peer_session_hash"] = self._s5b.get_session_hash(
            session["peer_jid"], session["local_jid"], sid
        )  # requester and target are inversed for peer candidates
        transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None)
        candidates = transport_data["candidates"] = await self._s5b.get_candidates(
            client, session["local_jid"]
        )
        mode = "tcp"  # XXX: we only manage tcp for now
        transport_elt = self._build_candidates(
            session, candidates, sid, session_hash, client, mode
        )

        return transport_elt

    def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name):
        """Called when activation confirmation has been received from proxy

        cf XEP-0260 § 2.4
        @param iq_result_elt: result of the activation request (unused)
        @param client: %(doc_client)s
        @param candidate(plugin_xep_0065.Candidate): activated proxy candidate
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        """
        # now that the proxy is activated, we have to inform other peer
        content_data = session["contents"][content_name]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        activated_elt = transport_elt.addElement("activated")
        activated_elt["cid"] = candidate.id
        iq_elt.send()

    def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name):
        """Called when activation error has been received from proxy

        cf XEP-0260 § 2.4
        @param stanza_error(failure.Failure): failure of the connection to the proxy
            or of its activation
        @param client: %(doc_client)s
        @param candidate(plugin_xep_0065.Candidate): proxy candidate which failed
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        """
        # TODO: fallback to IBB
        # the proxy can't be used, we have to inform other peer
        content_data = session["contents"][content_name]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        transport_elt.addElement("proxy-error")
        iq_elt.send()
        # this errback is also reached when connection to the proxy failed, in
        # which case the failure value may have no "condition" attribute: we must
        # not raise here, otherwise the fallback below would be skipped
        reason = getattr(stanza_error.value, "condition", stanza_error.value)
        log.warning(
            "Can't activate proxy, we need to fallback to IBB: {reason}".format(
                reason=reason
            )
        )
        self.do_fallback(session, content_name, client)

    def _found_peer_candidate(
        self, candidate, session, transport_data, content_name, client
    ):
        """Called when the best candidate from other peer is found

        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict): session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """

        content_data = session["contents"][content_name]
        transport_data["best_candidate"] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data["peer_candidates"]:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data["peer_candidates"]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        if candidate is None:
            log.warning("Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement("candidate-error")
        else:
            log.info("Found best peer candidate: {}".format(str(candidate)))
            candidate_elt = transport_elt.addElement("candidate-used")
            candidate_elt["cid"] = candidate.id
        iq_elt.send()  # TODO: check result stanza
        self._check_candidates(session, content_name, transport_data, client)

    def _check_candidates(self, session, content_name, transport_data, client):
        """Called when a candidate has been choosed

        if we have both candidates, we select one, or fallback to an other transport
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        content_data = session["contents"][content_name]
        try:
            best_candidate = transport_data["best_candidate"]
        except KeyError:
            # we have not our best candidate yet
            return
        try:
            peer_best_candidate = transport_data["peer_best_candidate"]
        except KeyError:
            # we have not peer best candidate yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None or peer_best_candidate is None:
            choosed_candidate = best_candidate or peer_best_candidate
        else:
            if best_candidate.priority == peer_best_candidate.priority:
                # same priority, we choose initiator one according to XEP-0260 §2.4 #4
                log.debug(
                    "Candidates have same priority, we select the one choosed by initiator"
                )
                if session["initiator"] == session["local_jid"]:
                    choosed_candidate = best_candidate
                else:
                    choosed_candidate = peer_best_candidate
            else:
                choosed_candidate = max(
                    best_candidate, peer_best_candidate, key=lambda c: c.priority
                )

        if choosed_candidate is None:
            log.warning("Socks5 negociation failed, we need to fallback to IBB")
            self.do_fallback(session, content_name, client)
        else:
            if choosed_candidate == peer_best_candidate:
                # peer_best_candidate was choosed from the candidates we have sent
                # so our_candidate is true if choosed_candidate is peer_best_candidate
                our_candidate = True
                # than also mean that best_candidate must be discarded !
                try:
                    best_candidate.discard()
                except AttributeError:  # but it can be None
                    pass
            else:
                our_candidate = False

            log.info(
                "Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
                    who="our" if our_candidate else "other peer",
                    candidate=choosed_candidate,
                )
            )
            del transport_data["best_candidate"]
            del transport_data["peer_best_candidate"]

            if choosed_candidate.type == self._s5b.TYPE_PROXY:
                # the stream transfer need to wait for proxy activation
                # (see XEP-0260 § 2.4)
                if our_candidate:
                    d = self._s5b.connect_candidate(
                        client, choosed_candidate, transport_data["session_hash"]
                    )
                    d.addCallback(
                        lambda __: choosed_candidate.activate(
                            transport_data["sid"], session["peer_jid"], client
                        )
                    )
                    args = [client, choosed_candidate, session, content_name]
                    d.addCallbacks(
                        self._proxy_activated_cb, self._proxy_activated_eb, args, None, args
                    )
                else:
                    # this Deferred will be called when we'll receive activation confirmation from other peer
                    d = transport_data["activation_d"] = defer.Deferred()
            else:
                d = defer.succeed(None)

            if content_data["senders"] == session["role"]:
                # we can now start the stream transfer (or start it after proxy activation)
                d.addCallback(
                    lambda __: choosed_candidate.start_transfer(
                        transport_data["session_hash"]
                    )
                )
                d.addErrback(self._start_eb, session, content_name, client)

    def _start_eb(self, fail, session, content_name, client):
        """Called when it's not possible to start the transfer

        Will try to fallback to IBB
        @param fail(failure.Failure): reason of the failure
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """
        try:
            reason = str(fail.value)
        except AttributeError:
            # "fail" is not a Failure, we use its own string representation
            reason = str(fail)
        log.warning("Cant start transfert, we'll try fallback method: {}".format(reason))
        self.do_fallback(session, content_name, client)

    def _candidate_info(
        self, candidate_elt, session, content_name, transport_data, client
    ):
        """Called when best candidate has been received from peer (or if none is working)

        @param candidate_elt(domish.Element): candidate-used or candidate-error element
            (see XEP-0260 §2.3)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        @raise exceptions.DataError: "cid" attribute is missing or doesn't match any
            candidate we have sent
        @raise exceptions.InternalError: our candidates are not in transport_data
        """
        if candidate_elt.name == "candidate-error":
            # candidate-error, no candidate worked
            transport_data["peer_best_candidate"] = None
        else:
            # candidate-used, one candidate was choosed
            try:
                cid = candidate_elt.attributes["cid"]
            except KeyError:
                log.warning("No cid found in <candidate-used>")
                raise exceptions.DataError
            try:
                candidate = next((
                    c for c in transport_data["candidates"] if c.id == cid
                ))
            except StopIteration:
                log.warning("Given cid doesn't correspond to any known candidate !")
                raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
            except KeyError:
                # a transport-info can also be intentionaly sent too early by other peer
                # but there is little probability
                log.error(
                    '"candidates" key doesn\'t exists in transport_data, it should at this point'
                )
                raise exceptions.InternalError
            # at this point we have the candidate choosed by other peer
            transport_data["peer_best_candidate"] = candidate
            log.info("Other peer best candidate: {}".format(candidate))

        del transport_data["candidates"]
        self._check_candidates(session, content_name, transport_data, client)

    def _proxy_activation_info(
        self, proxy_elt, session, content_name, transport_data, client
    ):
        """Called when proxy has been activated (or has sent an error)

        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
            (see XEP-0260 §2.4)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        try:
            activation_d = transport_data.pop("activation_d")
        except KeyError:
            log.warning("Received unexpected transport-info for proxy activation")
            # nothing is waiting for this notification, and there is no Deferred
            # to fire: going further would raise an UnboundLocalError
            return

        if proxy_elt.name == "activated":
            activation_d.callback(None)
        else:
            activation_d.errback(ProxyError())

    async def jingle_handler(self, client, action, session, content_name, transport_elt):
        """Handle a jingle action for this transport

        @param client: %(doc_client)s
        @param action: one of the action constants of XEP-0166 plugin
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        @param transport_elt(domish.Element): <transport> element linked to the action
        @return (domish.Element): transport element: the one received, except for
            session-initiate where a new one is built with our candidates
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]

        if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert "peer_candidates" not in transport_data
            transport_data["peer_candidates"] = self._parse_candidates(transport_elt)

        elif action == self._j.A_START:
            session_hash = transport_data["session_hash"]
            peer_candidates = transport_data["peer_candidates"]
            stream_object = content_data["stream_object"]
            self._s5b.associate_stream_object(client, session_hash, stream_object)
            stream_d = transport_data.pop("stream_d")
            stream_d.chainDeferred(content_data["finished_d"])
            peer_session_hash = transport_data["peer_session_hash"]
            d = self._s5b.get_best_candidate(
                client, peer_candidates, session_hash, peer_session_hash
            )
            d.addCallback(
                self._found_peer_candidate, session, transport_data, content_name, client
            )

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert "peer_candidates" not in transport_data
            sid = transport_data["sid"] = transport_elt["sid"]
            session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
                session["local_jid"], session["peer_jid"], sid
            )
            peer_session_hash = transport_data[
                "peer_session_hash"
            ] = self._s5b.get_session_hash(
                session["peer_jid"], session["local_jid"], sid
            )  # requester and target are inversed for peer candidates
            peer_candidates = transport_data["peer_candidates"] = self._parse_candidates(
                transport_elt
            )
            stream_object = content_data["stream_object"]
            stream_d = self._s5b.register_hash(client, session_hash, stream_object)
            stream_d.chainDeferred(content_data["finished_d"])
            d = self._s5b.get_best_candidate(
                client, peer_candidates, session_hash, peer_session_hash
            )
            d.addCallback(
                self._found_peer_candidate, session, transport_data, content_name, client
            )
            candidates = await self._s5b.get_candidates(client, session["local_jid"])
            # we remove duplicate candidates
            candidates = [
                candidate for candidate in candidates if candidate not in peer_candidates
            ]

            transport_data["candidates"] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._build_candidates(
                session, candidates, sid, session_hash, client
            )

        elif action == self._j.A_TRANSPORT_INFO:
            # transport-info can be about candidate or proxy activation
            candidate_elt = None

            for method, names in (
                (self._candidate_info, ("candidate-used", "candidate-error")),
                (self._proxy_activation_info, ("activated", "proxy-error")),
            ):
                for name in names:
                    try:
                        candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name))
                    except StopIteration:
                        continue
                    else:
                        method(
                            candidate_elt, session, content_name, transport_data, client
                        )
                        # only the first matching element of each group is handled
                        break

            if candidate_elt is None:
                log.warning(
                    "Unexpected transport element: {}".format(transport_elt.toXml())
                )

        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
            # note that sid argument is not necessary for sessions created by this plugin
            self._s5b.kill_session(None, transport_data["session_hash"], None, client)

        else:
            log.warning("FIXME: unmanaged action {}".format(action))

        return transport_elt

    def jingle_terminate(self, client, action, session, content_name, reason_elt):
        """Clean the S5B session when the jingle session has been declined

        @param client: %(doc_client)s
        @param action: jingle action (unused)
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        @param reason_elt(domish.Element): <reason> element of the termination
        """
        if reason_elt.decline:
            log.debug("Session declined, deleting S5B session")
            # we just need to clean the S5B session if it is declined
            content_data = session["contents"][content_name]
            transport_data = content_data["transport_data"]
            self._s5b.kill_session(None, transport_data["session_hash"], None, client)

    def _do_fallback(self, feature_checked, session, content_name, client):
        """Do the fallback, method called once feature is checked

        @param feature_checked(bool): True if other peer can do IBB
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """
        if not feature_checked:
            log.warning(
                "Other peer can't manage jingle IBB, be have to terminate the session"
            )
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            self._j.transport_replace(
                client, self._jingle_ibb.NAMESPACE, session, content_name
            )

    def do_fallback(self, session, content_name, client):
        """Fallback to IBB transport, used in last resort

        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        @return (defer.Deferred, None): result of the feature check when one is
            needed, None otherwise
        """
        if session["role"] != self._j.ROLE_INITIATOR:
            # only initiator must do the fallback, see XEP-0260 §3
            return
        if self._jingle_ibb is None:
            log.warning(
                "Jingle IBB (XEP-0261) plugin is not available, we have to close the session"
            )
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            d = self.host.hasFeature(
                client, self._jingle_ibb.NAMESPACE, session["peer_jid"]
            )
            d.addCallback(self._do_fallback, session, content_name, client)
            return d


@implementer(iwokkel.IDisco)
class XEP_0260_handler(XMPPHandler):
    """Disco handler advertising the Jingle S5B feature"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_S5B)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []