diff libervia/backend/plugins/plugin_xep_0260.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0260.py@3900626bc100
children b86912d3fd33
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0260.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,551 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for Jingle (XEP-0260)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from libervia.backend.core.i18n import _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+from libervia.backend.core import exceptions
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid
+from twisted.internet import defer
+import uuid
+
+try:
+    from twisted.words.protocols.xmlstream import XMPPHandler
+except ImportError:
+    from wokkel.subprotocols import XMPPHandler
+
+
+NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle SOCKS5 Bytestreams",
+    C.PI_IMPORT_NAME: "XEP-0260",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0260"],
+    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
+    C.PI_RECOMMENDATIONS: ["XEP-0261"],  # needed for fallback
+    C.PI_MAIN: "XEP_0260",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""),
+}
+
+
+class ProxyError(Exception):
+    def __str__(self):
+        return "an error happened while trying to use the proxy"
+
+
+class XEP_0260(object):
+    # TODO: udp handling
+
+    def __init__(self, host):
+        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
+        self.host = host
+        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
+        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
+        try:
+            self._jingle_ibb = host.plugins["XEP-0261"]
+        except KeyError:
+            self._jingle_ibb = None
+        self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)
+
+    def get_handler(self, client):
+        return XEP_0260_handler()
+
+    def _parse_candidates(self, transport_elt):
+        """Parse <candidate> elements
+
+        @param transport_elt(domish.Element): parent <transport> element
+        @return (list[plugin_xep_0065.Candidate): list of parsed candidates
+        """
+        candidates = []
+        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"):
+            try:
+                cid = candidate_elt["cid"]
+                host = candidate_elt["host"]
+                jid_ = jid.JID(candidate_elt["jid"])
+                port = int(candidate_elt.getAttribute("port", 1080))
+                priority = int(candidate_elt["priority"])
+                type_ = candidate_elt.getAttribute("type", self._s5b.TYPE_DIRECT)
+            except (KeyError, ValueError):
+                raise exceptions.DataError()
+            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
+            candidates.append(candidate)
+            # self._s5b.registerCandidate(candidate)
+        return candidates
+
+    def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None):
+        """Build <transport> element with candidates
+
+        @param session(dict): jingle session data
+        @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
+        @param sid(unicode): transport stream id
+        @param client: %(doc_client)s
+        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
+        @return (domish.Element): parent <transport> element where <candidate> elements must be added
+        """
+        proxy = next(
+            (
+                candidate
+                for candidate in candidates
+                if candidate.type == self._s5b.TYPE_PROXY
+            ),
+            None,
+        )
+        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
+        transport_elt["sid"] = sid
+        if proxy is not None:
+            transport_elt["dstaddr"] = session_hash
+        if mode is not None:
+            transport_elt["mode"] = "tcp"  # XXX: we only manage tcp for now
+
+        for candidate in candidates:
+            log.debug("Adding candidate: {}".format(candidate))
+            candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B)
+            if candidate.id is None:
+                candidate.id = str(uuid.uuid4())
+            candidate_elt["cid"] = candidate.id
+            candidate_elt["host"] = candidate.host
+            candidate_elt["jid"] = candidate.jid.full()
+            candidate_elt["port"] = str(candidate.port)
+            candidate_elt["priority"] = str(candidate.priority)
+            candidate_elt["type"] = candidate.type
+        return transport_elt
+
+    @defer.inlineCallbacks
+    def jingle_session_init(self, client, session, content_name):
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        sid = transport_data["sid"] = str(uuid.uuid4())
+        session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
+            session["local_jid"], session["peer_jid"], sid
+        )
+        transport_data["peer_session_hash"] = self._s5b.get_session_hash(
+            session["peer_jid"], session["local_jid"], sid
+        )  # requester and target are inversed for peer candidates
+        transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None)
+        candidates = transport_data["candidates"] = yield self._s5b.get_candidates(
+            client, session["local_jid"])
+        mode = "tcp"  # XXX: we only manage tcp for now
+        transport_elt = self._build_candidates(
+            session, candidates, sid, session_hash, client, mode
+        )
+
+        defer.returnValue(transport_elt)
+
+    def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name):
+        """Called when activation confirmation has been received from proxy
+
+        cf XEP-0260 § 2.4
+        """
+        # now that the proxy is activated, we have to inform other peer
+        content_data = session["contents"][content_name]
+        iq_elt, transport_elt = self._j.build_action(
+            client, self._j.A_TRANSPORT_INFO, session, content_name
+        )
+        transport_elt["sid"] = content_data["transport_data"]["sid"]
+        activated_elt = transport_elt.addElement("activated")
+        activated_elt["cid"] = candidate.id
+        iq_elt.send()
+
+    def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name):
+        """Called when activation error has been received from proxy
+
+        cf XEP-0260 § 2.4
+        """
+        # TODO: fallback to IBB
+        # now that the proxy is activated, we have to inform other peer
+        content_data = session["contents"][content_name]
+        iq_elt, transport_elt = self._j.build_action(
+            client, self._j.A_TRANSPORT_INFO, session, content_name
+        )
+        transport_elt["sid"] = content_data["transport_data"]["sid"]
+        transport_elt.addElement("proxy-error")
+        iq_elt.send()
+        log.warning(
+            "Can't activate proxy, we need to fallback to IBB: {reason}".format(
+                reason=stanza_error.value.condition
+            )
+        )
+        self.do_fallback(session, content_name, client)
+
+    def _found_peer_candidate(
+        self, candidate, session, transport_data, content_name, client
+    ):
+        """Called when the best candidate from other peer is found
+
+        @param candidate(XEP_0065.Candidate, None): selected candidate,
+            or None if no candidate is accessible
+        @param session(dict):  session data
+        @param transport_data(dict): transport data
+        @param content_name(unicode): name of the current content
+        @param client(unicode): %(doc_client)s
+        """
+
+        content_data = session["contents"][content_name]
+        transport_data["best_candidate"] = candidate
+        # we need to disconnect all non selected candidates before removing them
+        for c in transport_data["peer_candidates"]:
+            if c is None or c is candidate:
+                continue
+            c.discard()
+        del transport_data["peer_candidates"]
+        iq_elt, transport_elt = self._j.build_action(
+            client, self._j.A_TRANSPORT_INFO, session, content_name
+        )
+        transport_elt["sid"] = content_data["transport_data"]["sid"]
+        if candidate is None:
+            log.warning("Can't connect to any peer candidate")
+            candidate_elt = transport_elt.addElement("candidate-error")
+        else:
+            log.info("Found best peer candidate: {}".format(str(candidate)))
+            candidate_elt = transport_elt.addElement("candidate-used")
+            candidate_elt["cid"] = candidate.id
+        iq_elt.send()  # TODO: check result stanza
+        self._check_candidates(session, content_name, transport_data, client)
+
+    def _check_candidates(self, session, content_name, transport_data, client):
+        """Called when a candidate has been choosed
+
+        if we have both candidates, we select one, or fallback to an other transport
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        content_data = session["contents"][content_name]
+        try:
+            best_candidate = transport_data["best_candidate"]
+        except KeyError:
+            # we have not our best candidate yet
+            return
+        try:
+            peer_best_candidate = transport_data["peer_best_candidate"]
+        except KeyError:
+            # we have not peer best candidate yet
+            return
+
+        # at this point we have both candidates, it's time to choose one
+        if best_candidate is None or peer_best_candidate is None:
+            choosed_candidate = best_candidate or peer_best_candidate
+        else:
+            if best_candidate.priority == peer_best_candidate.priority:
+                # same priority, we choose initiator one according to XEP-0260 §2.4 #4
+                log.debug(
+                    "Candidates have same priority, we select the one choosed by initiator"
+                )
+                if session["initiator"] == session["local_jid"]:
+                    choosed_candidate = best_candidate
+                else:
+                    choosed_candidate = peer_best_candidate
+            else:
+                choosed_candidate = max(
+                    best_candidate, peer_best_candidate, key=lambda c: c.priority
+                )
+
+        if choosed_candidate is None:
+            log.warning("Socks5 negociation failed, we need to fallback to IBB")
+            self.do_fallback(session, content_name, client)
+        else:
+            if choosed_candidate == peer_best_candidate:
+                # peer_best_candidate was choosed from the candidates we have sent
+                # so our_candidate is true if choosed_candidate is peer_best_candidate
+                our_candidate = True
+                # than also mean that best_candidate must be discarded !
+                try:
+                    best_candidate.discard()
+                except AttributeError:  # but it can be None
+                    pass
+            else:
+                our_candidate = False
+
+            log.info(
+                "Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
+                    who="our" if our_candidate else "other peer",
+                    candidate=choosed_candidate,
+                )
+            )
+            del transport_data["best_candidate"]
+            del transport_data["peer_best_candidate"]
+
+            if choosed_candidate.type == self._s5b.TYPE_PROXY:
+                # the stream transfer need to wait for proxy activation
+                # (see XEP-0260 § 2.4)
+                if our_candidate:
+                    d = self._s5b.connect_candidate(
+                        client, choosed_candidate, transport_data["session_hash"]
+                    )
+                    d.addCallback(
+                        lambda __: choosed_candidate.activate(
+                            transport_data["sid"], session["peer_jid"], client
+                        )
+                    )
+                    args = [client, choosed_candidate, session, content_name]
+                    d.addCallbacks(
+                        self._proxy_activated_cb, self._proxy_activated_eb, args, None, args
+                    )
+                else:
+                    # this Deferred will be called when we'll receive activation confirmation from other peer
+                    d = transport_data["activation_d"] = defer.Deferred()
+            else:
+                d = defer.succeed(None)
+
+            if content_data["senders"] == session["role"]:
+                # we can now start the stream transfer (or start it after proxy activation)
+                d.addCallback(
+                    lambda __: choosed_candidate.start_transfer(
+                        transport_data["session_hash"]
+                    )
+                )
+                d.addErrback(self._start_eb, session, content_name, client)
+
+    def _start_eb(self, fail, session, content_name, client):
+        """Called when it's not possible to start the transfer
+
+        Will try to fallback to IBB
+        """
+        try:
+            reason = str(fail.value)
+        except AttributeError:
+            reason = str(fail)
+        log.warning("Cant start transfert, we'll try fallback method: {}".format(reason))
+        self.do_fallback(session, content_name, client)
+
+    def _candidate_info(
+        self, candidate_elt, session, content_name, transport_data, client
+    ):
+        """Called when best candidate has been received from peer (or if none is working)
+
+        @param candidate_elt(domish.Element): candidate-used or candidate-error element
+            (see XEP-0260 §2.3)
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        if candidate_elt.name == "candidate-error":
+            # candidate-error, no candidate worked
+            transport_data["peer_best_candidate"] = None
+        else:
+            # candidate-used, one candidate was choosed
+            try:
+                cid = candidate_elt.attributes["cid"]
+            except KeyError:
+                log.warning("No cid found in <candidate-used>")
+                raise exceptions.DataError
+            try:
+                candidate = next((
+                    c for c in transport_data["candidates"] if c.id == cid
+                ))
+            except StopIteration:
+                log.warning("Given cid doesn't correspond to any known candidate !")
+                raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
+            except KeyError:
+                # a transport-info can also be intentionaly sent too early by other peer
+                # but there is little probability
+                log.error(
+                    '"candidates" key doesn\'t exists in transport_data, it should at this point'
+                )
+                raise exceptions.InternalError
+            # at this point we have the candidate choosed by other peer
+            transport_data["peer_best_candidate"] = candidate
+            log.info("Other peer best candidate: {}".format(candidate))
+
+        del transport_data["candidates"]
+        self._check_candidates(session, content_name, transport_data, client)
+
+    def _proxy_activation_info(
+        self, proxy_elt, session, content_name, transport_data, client
+    ):
+        """Called when proxy has been activated (or has sent an error)
+
+        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
+            (see XEP-0260 §2.4)
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        try:
+            activation_d = transport_data.pop("activation_d")
+        except KeyError:
+            log.warning("Received unexpected transport-info for proxy activation")
+
+        if proxy_elt.name == "activated":
+            activation_d.callback(None)
+        else:
+            activation_d.errback(ProxyError())
+
+    @defer.inlineCallbacks
+    def jingle_handler(self, client, action, session, content_name, transport_elt):
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+
+        if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
+            pass
+
+        elif action == self._j.A_SESSION_ACCEPT:
+            # initiator side, we select a candidate in the ones sent by responder
+            assert "peer_candidates" not in transport_data
+            transport_data["peer_candidates"] = self._parse_candidates(transport_elt)
+
+        elif action == self._j.A_START:
+            session_hash = transport_data["session_hash"]
+            peer_candidates = transport_data["peer_candidates"]
+            stream_object = content_data["stream_object"]
+            self._s5b.associate_stream_object(client, session_hash, stream_object)
+            stream_d = transport_data.pop("stream_d")
+            stream_d.chainDeferred(content_data["finished_d"])
+            peer_session_hash = transport_data["peer_session_hash"]
+            d = self._s5b.get_best_candidate(
+                client, peer_candidates, session_hash, peer_session_hash
+            )
+            d.addCallback(
+                self._found_peer_candidate, session, transport_data, content_name, client
+            )
+
+        elif action == self._j.A_SESSION_INITIATE:
+            # responder side, we select a candidate in the ones sent by initiator
+            # and we give our candidates
+            assert "peer_candidates" not in transport_data
+            sid = transport_data["sid"] = transport_elt["sid"]
+            session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
+                session["local_jid"], session["peer_jid"], sid
+            )
+            peer_session_hash = transport_data[
+                "peer_session_hash"
+            ] = self._s5b.get_session_hash(
+                session["peer_jid"], session["local_jid"], sid
+            )  # requester and target are inversed for peer candidates
+            peer_candidates = transport_data["peer_candidates"] = self._parse_candidates(
+                transport_elt
+            )
+            stream_object = content_data["stream_object"]
+            stream_d = self._s5b.register_hash(client, session_hash, stream_object)
+            stream_d.chainDeferred(content_data["finished_d"])
+            d = self._s5b.get_best_candidate(
+                client, peer_candidates, session_hash, peer_session_hash
+            )
+            d.addCallback(
+                self._found_peer_candidate, session, transport_data, content_name, client
+            )
+            candidates = yield self._s5b.get_candidates(client, session["local_jid"])
+            # we remove duplicate candidates
+            candidates = [
+                candidate for candidate in candidates if candidate not in peer_candidates
+            ]
+
+            transport_data["candidates"] = candidates
+            # we can now build a new <transport> element with our candidates
+            transport_elt = self._build_candidates(
+                session, candidates, sid, session_hash, client
+            )
+
+        elif action == self._j.A_TRANSPORT_INFO:
+            # transport-info can be about candidate or proxy activation
+            candidate_elt = None
+
+            for method, names in (
+                (self._candidate_info, ("candidate-used", "candidate-error")),
+                (self._proxy_activation_info, ("activated", "proxy-error")),
+            ):
+                for name in names:
+                    try:
+                        candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name))
+                    except StopIteration:
+                        continue
+                    else:
+                        method(
+                            candidate_elt, session, content_name, transport_data, client
+                        )
+                        break
+
+            if candidate_elt is None:
+                log.warning(
+                    "Unexpected transport element: {}".format(transport_elt.toXml())
+                )
+        elif action == self._j.A_DESTROY:
+            # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
+            # note that sid argument is not necessary for sessions created by this plugin
+            self._s5b.kill_session(None, transport_data["session_hash"], None, client)
+        else:
+            log.warning("FIXME: unmanaged action {}".format(action))
+
+        defer.returnValue(transport_elt)
+
+    def jingle_terminate(self, client, action, session, content_name, reason_elt):
+        if reason_elt.decline:
+            log.debug("Session declined, deleting S5B session")
+            # we just need to clean the S5B session if it is declined
+            content_data = session["contents"][content_name]
+            transport_data = content_data["transport_data"]
+            self._s5b.kill_session(None, transport_data["session_hash"], None, client)
+
+    def _do_fallback(self, feature_checked, session, content_name, client):
+        """Do the fallback, method called once feature is checked
+
+         @param feature_checked(bool): True if other peer can do IBB
+         """
+        if not feature_checked:
+            log.warning(
+                "Other peer can't manage jingle IBB, be have to terminate the session"
+            )
+            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
+        else:
+            self._j.transport_replace(
+                client, self._jingle_ibb.NAMESPACE, session, content_name
+            )
+
+    def do_fallback(self, session, content_name, client):
+        """Fallback to IBB transport, used in last resort
+
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param client(unicode): %(doc_client)s
+        """
+        if session["role"] != self._j.ROLE_INITIATOR:
+            # only initiator must do the fallback, see XEP-0260 §3
+            return
+        if self._jingle_ibb is None:
+            log.warning(
+                "Jingle IBB (XEP-0261) plugin is not available, we have to close the session"
+            )
+            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
+        else:
+            d = self.host.hasFeature(
+                client, self._jingle_ibb.NAMESPACE, session["peer_jid"]
+            )
+            d.addCallback(self._do_fallback, session, content_name, client)
+        return d
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0260_handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_JINGLE_S5B)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []