view libervia/backend/plugins/plugin_xep_0260.py @ 4202:b26339343076

core: use a user specific directory for PID file: default location of pid file is now specific to logged user, this allow to run several instances of Libervia by different users on the same machine without PID conflicts.
author Goffi <goffi@goffi.org>
date Sun, 14 Jan 2024 17:48:02 +0100
parents b86912d3fd33
children e11b13418ba6
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Jingle (XEP-0260)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger

log = getLogger(__name__)
from libervia.backend.core import exceptions
from wokkel import disco, iwokkel
from zope.interface import implementer
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
import uuid

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler


NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle SOCKS5 Bytestreams",
    C.PI_IMPORT_NAME: "XEP-0260",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0260"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
    C.PI_RECOMMENDATIONS: ["XEP-0261"],  # needed for fallback
    C.PI_MAIN: "XEP_0260",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""),
}


class ProxyError(Exception):
    def __str__(self):
        return "an error happened while trying to use the proxy"


class XEP_0260(object):
    # TODO: udp handling

    def __init__(self, host):
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        try:
            self._jingle_ibb = host.plugins["XEP-0261"]
        except KeyError:
            self._jingle_ibb = None
        self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def get_handler(self, client):
        return XEP_0260_handler()

    def _parse_candidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate): list of parsed candidates
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"):
            try:
                cid = candidate_elt["cid"]
                host = candidate_elt["host"]
                jid_ = jid.JID(candidate_elt["jid"])
                port = int(candidate_elt.getAttribute("port", 1080))
                priority = int(candidate_elt["priority"])
                type_ = candidate_elt.getAttribute("type", self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError):
                raise exceptions.DataError()
            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
            candidates.append(candidate)
            # self._s5b.registerCandidate(candidate)
        return candidates

    def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        @param session(dict): jingle session data
        @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
        @param sid(unicode): transport stream id
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
        @return (domish.Element): parent <transport> element where <candidate> elements must be added
        """
        proxy = next(
            (
                candidate
                for candidate in candidates
                if candidate.type == self._s5b.TYPE_PROXY
            ),
            None,
        )
        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt["sid"] = sid
        if proxy is not None:
            transport_elt["dstaddr"] = session_hash
        if mode is not None:
            transport_elt["mode"] = "tcp"  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug("Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = str(uuid.uuid4())
            candidate_elt["cid"] = candidate.id
            candidate_elt["host"] = candidate.host
            candidate_elt["jid"] = candidate.jid.full()
            candidate_elt["port"] = str(candidate.port)
            candidate_elt["priority"] = str(candidate.priority)
            candidate_elt["type"] = candidate.type
        return transport_elt

    async def jingle_session_init(self, client, session, content_name):
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        sid = transport_data["sid"] = str(uuid.uuid4())
        session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
            session["local_jid"], session["peer_jid"], sid
        )
        transport_data["peer_session_hash"] = self._s5b.get_session_hash(
            session["peer_jid"], session["local_jid"], sid
        )  # requester and target are inversed for peer candidates
        transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None)
        candidates = transport_data["candidates"] = await self._s5b.get_candidates(
            client, session["local_jid"])
        mode = "tcp"  # XXX: we only manage tcp for now
        transport_elt = self._build_candidates(
            session, candidates, sid, session_hash, client, mode
        )

        return transport_elt

    def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name):
        """Called when activation confirmation has been received from proxy

        cf XEP-0260 § 2.4
        """
        # now that the proxy is activated, we have to inform other peer
        content_data = session["contents"][content_name]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        activated_elt = transport_elt.addElement("activated")
        activated_elt["cid"] = candidate.id
        iq_elt.send()

    def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name):
        """Called when activation error has been received from proxy

        cf XEP-0260 § 2.4
        """
        # TODO: fallback to IBB
        # now that the proxy is activated, we have to inform other peer
        content_data = session["contents"][content_name]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        transport_elt.addElement("proxy-error")
        iq_elt.send()
        log.warning(
            "Can't activate proxy, we need to fallback to IBB: {reason}".format(
                reason=stanza_error.value.condition
            )
        )
        self.do_fallback(session, content_name, client)

    def _found_peer_candidate(
        self, candidate, session, transport_data, content_name, client
    ):
        """Called when the best candidate from other peer is found

        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict):  session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """

        content_data = session["contents"][content_name]
        transport_data["best_candidate"] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data["peer_candidates"]:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data["peer_candidates"]
        iq_elt, transport_elt = self._j.build_action(
            client, self._j.A_TRANSPORT_INFO, session, content_name
        )
        transport_elt["sid"] = content_data["transport_data"]["sid"]
        if candidate is None:
            log.warning("Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement("candidate-error")
        else:
            log.info("Found best peer candidate: {}".format(str(candidate)))
            candidate_elt = transport_elt.addElement("candidate-used")
            candidate_elt["cid"] = candidate.id
        iq_elt.send()  # TODO: check result stanza
        self._check_candidates(session, content_name, transport_data, client)

    def _check_candidates(self, session, content_name, transport_data, client):
        """Called when a candidate has been choosed

        if we have both candidates, we select one, or fallback to an other transport
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        content_data = session["contents"][content_name]
        try:
            best_candidate = transport_data["best_candidate"]
        except KeyError:
            # we have not our best candidate yet
            return
        try:
            peer_best_candidate = transport_data["peer_best_candidate"]
        except KeyError:
            # we have not peer best candidate yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None or peer_best_candidate is None:
            choosed_candidate = best_candidate or peer_best_candidate
        else:
            if best_candidate.priority == peer_best_candidate.priority:
                # same priority, we choose initiator one according to XEP-0260 §2.4 #4
                log.debug(
                    "Candidates have same priority, we select the one choosed by initiator"
                )
                if session["initiator"] == session["local_jid"]:
                    choosed_candidate = best_candidate
                else:
                    choosed_candidate = peer_best_candidate
            else:
                choosed_candidate = max(
                    best_candidate, peer_best_candidate, key=lambda c: c.priority
                )

        if choosed_candidate is None:
            log.warning("Socks5 negociation failed, we need to fallback to IBB")
            self.do_fallback(session, content_name, client)
        else:
            if choosed_candidate == peer_best_candidate:
                # peer_best_candidate was choosed from the candidates we have sent
                # so our_candidate is true if choosed_candidate is peer_best_candidate
                our_candidate = True
                # than also mean that best_candidate must be discarded !
                try:
                    best_candidate.discard()
                except AttributeError:  # but it can be None
                    pass
            else:
                our_candidate = False

            log.info(
                "Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
                    who="our" if our_candidate else "other peer",
                    candidate=choosed_candidate,
                )
            )
            del transport_data["best_candidate"]
            del transport_data["peer_best_candidate"]

            if choosed_candidate.type == self._s5b.TYPE_PROXY:
                # the stream transfer need to wait for proxy activation
                # (see XEP-0260 § 2.4)
                if our_candidate:
                    d = self._s5b.connect_candidate(
                        client, choosed_candidate, transport_data["session_hash"]
                    )
                    d.addCallback(
                        lambda __: choosed_candidate.activate(
                            transport_data["sid"], session["peer_jid"], client
                        )
                    )
                    args = [client, choosed_candidate, session, content_name]
                    d.addCallbacks(
                        self._proxy_activated_cb, self._proxy_activated_eb, args, None, args
                    )
                else:
                    # this Deferred will be called when we'll receive activation confirmation from other peer
                    d = transport_data["activation_d"] = defer.Deferred()
            else:
                d = defer.succeed(None)

            if content_data["senders"] == session["role"]:
                # we can now start the stream transfer (or start it after proxy activation)
                d.addCallback(
                    lambda __: choosed_candidate.start_transfer(
                        transport_data["session_hash"]
                    )
                )
                d.addErrback(self._start_eb, session, content_name, client)

    def _start_eb(self, fail, session, content_name, client):
        """Called when it's not possible to start the transfer

        Will try to fallback to IBB
        """
        try:
            reason = str(fail.value)
        except AttributeError:
            reason = str(fail)
        log.warning("Cant start transfert, we'll try fallback method: {}".format(reason))
        self.do_fallback(session, content_name, client)

    def _candidate_info(
        self, candidate_elt, session, content_name, transport_data, client
    ):
        """Called when best candidate has been received from peer (or if none is working)

        @param candidate_elt(domish.Element): candidate-used or candidate-error element
            (see XEP-0260 §2.3)
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        if candidate_elt.name == "candidate-error":
            # candidate-error, no candidate worked
            transport_data["peer_best_candidate"] = None
        else:
            # candidate-used, one candidate was choosed
            try:
                cid = candidate_elt.attributes["cid"]
            except KeyError:
                log.warning("No cid found in <candidate-used>")
                raise exceptions.DataError
            try:
                candidate = next((
                    c for c in transport_data["candidates"] if c.id == cid
                ))
            except StopIteration:
                log.warning("Given cid doesn't correspond to any known candidate !")
                raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
            except KeyError:
                # a transport-info can also be intentionaly sent too early by other peer
                # but there is little probability
                log.error(
                    '"candidates" key doesn\'t exists in transport_data, it should at this point'
                )
                raise exceptions.InternalError
            # at this point we have the candidate choosed by other peer
            transport_data["peer_best_candidate"] = candidate
            log.info("Other peer best candidate: {}".format(candidate))

        del transport_data["candidates"]
        self._check_candidates(session, content_name, transport_data, client)

    def _proxy_activation_info(
        self, proxy_elt, session, content_name, transport_data, client
    ):
        """Called when proxy has been activated (or has sent an error)

        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
            (see XEP-0260 §2.4)
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        try:
            activation_d = transport_data.pop("activation_d")
        except KeyError:
            log.warning("Received unexpected transport-info for proxy activation")

        if proxy_elt.name == "activated":
            activation_d.callback(None)
        else:
            activation_d.errback(ProxyError())

    async def jingle_handler(self, client, action, session, content_name, transport_elt):
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]

        if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert "peer_candidates" not in transport_data
            transport_data["peer_candidates"] = self._parse_candidates(transport_elt)

        elif action == self._j.A_START:
            session_hash = transport_data["session_hash"]
            peer_candidates = transport_data["peer_candidates"]
            stream_object = content_data["stream_object"]
            self._s5b.associate_stream_object(client, session_hash, stream_object)
            stream_d = transport_data.pop("stream_d")
            stream_d.chainDeferred(content_data["finished_d"])
            peer_session_hash = transport_data["peer_session_hash"]
            d = self._s5b.get_best_candidate(
                client, peer_candidates, session_hash, peer_session_hash
            )
            d.addCallback(
                self._found_peer_candidate, session, transport_data, content_name, client
            )

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert "peer_candidates" not in transport_data
            sid = transport_data["sid"] = transport_elt["sid"]
            session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
                session["local_jid"], session["peer_jid"], sid
            )
            peer_session_hash = transport_data[
                "peer_session_hash"
            ] = self._s5b.get_session_hash(
                session["peer_jid"], session["local_jid"], sid
            )  # requester and target are inversed for peer candidates
            peer_candidates = transport_data["peer_candidates"] = self._parse_candidates(
                transport_elt
            )
            stream_object = content_data["stream_object"]
            stream_d = self._s5b.register_hash(client, session_hash, stream_object)
            stream_d.chainDeferred(content_data["finished_d"])
            d = self._s5b.get_best_candidate(
                client, peer_candidates, session_hash, peer_session_hash
            )
            d.addCallback(
                self._found_peer_candidate, session, transport_data, content_name, client
            )
            candidates = await self._s5b.get_candidates(client, session["local_jid"])
            # we remove duplicate candidates
            candidates = [
                candidate for candidate in candidates if candidate not in peer_candidates
            ]

            transport_data["candidates"] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._build_candidates(
                session, candidates, sid, session_hash, client
            )

        elif action == self._j.A_TRANSPORT_INFO:
            # transport-info can be about candidate or proxy activation
            candidate_elt = None

            for method, names in (
                (self._candidate_info, ("candidate-used", "candidate-error")),
                (self._proxy_activation_info, ("activated", "proxy-error")),
            ):
                for name in names:
                    try:
                        candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name))
                    except StopIteration:
                        continue
                    else:
                        method(
                            candidate_elt, session, content_name, transport_data, client
                        )
                        break

            if candidate_elt is None:
                log.warning(
                    "Unexpected transport element: {}".format(transport_elt.toXml())
                )
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
            # note that sid argument is not necessary for sessions created by this plugin
            self._s5b.kill_session(None, transport_data["session_hash"], None, client)
        else:
            log.warning("FIXME: unmanaged action {}".format(action))

        return transport_elt

    def jingle_terminate(self, client, action, session, content_name, reason_elt):
        if reason_elt.decline:
            log.debug("Session declined, deleting S5B session")
            # we just need to clean the S5B session if it is declined
            content_data = session["contents"][content_name]
            transport_data = content_data["transport_data"]
            self._s5b.kill_session(None, transport_data["session_hash"], None, client)

    def _do_fallback(self, feature_checked, session, content_name, client):
        """Do the fallback, method called once feature is checked

         @param feature_checked(bool): True if other peer can do IBB
         """
        if not feature_checked:
            log.warning(
                "Other peer can't manage jingle IBB, be have to terminate the session"
            )
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            self._j.transport_replace(
                client, self._jingle_ibb.NAMESPACE, session, content_name
            )

    def do_fallback(self, session, content_name, client):
        """Fallback to IBB transport, used in last resort

        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """
        if session["role"] != self._j.ROLE_INITIATOR:
            # only initiator must do the fallback, see XEP-0260 §3
            return
        if self._jingle_ibb is None:
            log.warning(
                "Jingle IBB (XEP-0261) plugin is not available, we have to close the session"
            )
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            d = self.host.hasFeature(
                client, self._jingle_ibb.NAMESPACE, session["peer_jid"]
            )
            d.addCallback(self._do_fallback, session, content_name, client)
        return d


@implementer(iwokkel.IDisco)
class XEP_0260_handler(XMPPHandler):

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_S5B)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []