diff libervia/backend/plugins/plugin_exp_jingle_stream.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_exp_jingle_stream.py@877145b4ba01
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_exp_jingle_stream.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,305 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for managing pipes (experimental)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import errno
+from zope import interface
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid
+from twisted.internet import defer
+from twisted.internet import protocol
+from twisted.internet import endpoints
+from twisted.internet import reactor
+from twisted.internet import error
+from twisted.internet import interfaces
+from libervia.backend.core.i18n import _, D_
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core import exceptions
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools import xml_tools
+from libervia.backend.tools import stream
+
+
+log = getLogger(__name__)
+
+NS_STREAM = "http://salut-a-toi.org/protocol/stream"
+SECURITY_LIMIT = 30
+START_PORT = 8888
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle Stream Plugin",
+    C.PI_IMPORT_NAME: "STREAM",
+    C.PI_TYPE: "EXP",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0166"],
+    C.PI_MAIN: "JingleStream",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Jingle Stream plugin"""),
+}
+
+CONFIRM = D_("{peer} wants to send you a stream, do you accept ?")
+CONFIRM_TITLE = D_("Stream Request")
+
+
+class StreamProtocol(protocol.Protocol):
+    def __init__(self):
+        self.pause = False
+
+    def set_pause(self, paused):
+        # in Python 2.x, Twisted classes are old style
+        # so we can use property and setter
+        if paused:
+            if not self.pause:
+                self.transport.pauseProducing()
+                self.pause = True
+        else:
+            if self.pause:
+                self.transport.resumeProducing()
+                self.pause = False
+
+    def disconnect(self):
+        self.transport.loseConnection()
+
+    def connectionMade(self):
+        if self.factory.client_conn is not None:
+            self.transport.loseConnection()
+        self.factory.set_client_conn(self)
+
+    def dataReceived(self, data):
+        self.factory.write_to_consumer(data)
+
+    def sendData(self, data):
+        self.transport.write(data)
+
+    def connectionLost(self, reason):
+        if self.factory.client_conn != self:
+            # only the first connected client_conn is relevant
+            return
+
+        if reason.type == error.ConnectionDone:
+            self.factory.stream_finished()
+        else:
+            self.factory.stream_failed(reason)
+
+
+@interface.implementer(stream.IStreamProducer)
+@interface.implementer(interfaces.IPushProducer)
+@interface.implementer(interfaces.IConsumer)
+class StreamFactory(protocol.Factory):
+    protocol = StreamProtocol
+    consumer = None
+    producer = None
+    deferred = None
+
+    def __init__(self):
+        self.client_conn = None
+
+    def set_client_conn(self, stream_protocol):
+        # in Python 2.x, Twisted classes are old style
+        # so we can use property and setter
+        assert self.client_conn is None
+        self.client_conn = stream_protocol
+        if self.consumer is None:
+            self.client_conn.set_pause(True)
+
+    def start_stream(self, consumer):
+        if self.consumer is not None:
+            raise exceptions.InternalError(
+                _("stream can't be used with multiple consumers")
+            )
+        assert self.deferred is None
+        self.consumer = consumer
+        consumer.registerProducer(self, True)
+        self.deferred = defer.Deferred()
+        if self.client_conn is not None:
+            self.client_conn.set_pause(False)
+        return self.deferred
+
+    def stream_finished(self):
+        self.client_conn = None
+        if self.consumer:
+            self.consumer.unregisterProducer()
+            self.port_listening.stopListening()
+        self.deferred.callback(None)
+
+    def stream_failed(self, failure_):
+        self.client_conn = None
+        if self.consumer:
+            self.consumer.unregisterProducer()
+            self.port_listening.stopListening()
+            self.deferred.errback(failure_)
+        elif self.producer:
+            self.producer.stopProducing()
+
+    def stop_stream(self):
+        if self.client_conn is not None:
+            self.client_conn.disconnect()
+
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+
+    def pauseProducing(self):
+        self.client_conn.set_pause(True)
+
+    def resumeProducing(self):
+        self.client_conn.set_pause(False)
+
+    def stopProducing(self):
+        if self.client_conn:
+            self.client_conn.disconnect()
+
+    def write(self, data):
+        try:
+            self.client_conn.sendData(data)
+        except AttributeError:
+            log.warning(_("No client connected, can't send data"))
+
+    def write_to_consumer(self, data):
+        self.consumer.write(data)
+
+
+class JingleStream(object):
+    """This non standard jingle application send byte stream"""
+
+    def __init__(self, host):
+        log.info(_("Plugin Stream initialization"))
+        self.host = host
+        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
+        self._j.register_application(NS_STREAM, self)
+        host.bridge.add_method(
+            "stream_out",
+            ".plugin",
+            in_sign="ss",
+            out_sign="s",
+            method=self._stream_out,
+            async_=True,
+        )
+
+    # jingle callbacks
+
+    def _stream_out(self, to_jid_s, profile_key):
+        client = self.host.get_client(profile_key)
+        return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s)))
+
+    async def stream_out(self, client, to_jid):
+        """send a stream
+
+        @param peer_jid(jid.JID): recipient
+        @return: an unique id to identify the transfer
+        """
+        port = START_PORT
+        factory = StreamFactory()
+        while True:
+            endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
+            try:
+                port_listening = await endpoint.listen(factory)
+            except error.CannotListenError as e:
+                if e.socketError.errno == errno.EADDRINUSE:
+                    port += 1
+                else:
+                    raise e
+            else:
+                factory.port_listening = port_listening
+                break
+        # we don't want to wait for IQ result of initiate
+        defer.ensureDeferred(self._j.initiate(
+            client,
+            to_jid,
+            [
+                {
+                    "app_ns": NS_STREAM,
+                    "senders": self._j.ROLE_INITIATOR,
+                    "app_kwargs": {"stream_object": factory},
+                }
+            ],
+        ))
+        return str(port)
+
+    def jingle_session_init(self, client, session, content_name, stream_object):
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        assert "stream_object" not in application_data
+        application_data["stream_object"] = stream_object
+        desc_elt = domish.Element((NS_STREAM, "description"))
+        return desc_elt
+
+    @defer.inlineCallbacks
+    def jingle_request_confirmation(self, client, action, session, content_name, desc_elt):
+        """This method request confirmation for a jingle session"""
+        content_data = session["contents"][content_name]
+        if content_data["senders"] not in (
+            self._j.ROLE_INITIATOR,
+            self._j.ROLE_RESPONDER,
+        ):
+            log.warning("Bad sender, assuming initiator")
+            content_data["senders"] = self._j.ROLE_INITIATOR
+
+        confirm_data = yield xml_tools.defer_dialog(
+            self.host,
+            _(CONFIRM).format(peer=session["peer_jid"].full()),
+            _(CONFIRM_TITLE),
+            type_=C.XMLUI_DIALOG_CONFIRM,
+            action_extra={
+                "from_jid": session["peer_jid"].full(),
+                "type": "STREAM",
+            },
+            security_limit=SECURITY_LIMIT,
+            profile=client.profile,
+        )
+
+        if not C.bool(confirm_data["answer"]):
+            defer.returnValue(False)
+        try:
+            port = int(confirm_data["port"])
+        except (ValueError, KeyError):
+            raise exceptions.DataError(_("given port is invalid"))
+        endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port)
+        factory = StreamFactory()
+        yield endpoint.connect(factory)
+        content_data["stream_object"] = factory
+        finished_d = content_data["finished_d"] = defer.Deferred()
+        args = [client, session, content_name, content_data]
+        finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
+        defer.returnValue(True)
+
+    def jingle_handler(self, client, action, session, content_name, desc_elt):
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
+            pass
+        elif action == self._j.A_SESSION_ACCEPT:
+            assert not "stream_object" in content_data
+            content_data["stream_object"] = application_data["stream_object"]
+            finished_d = content_data["finished_d"] = defer.Deferred()
+            args = [client, session, content_name, content_data]
+            finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args)
+        else:
+            log.warning("FIXME: unmanaged action {}".format(action))
+        return desc_elt
+
+    def _finished_cb(self, __, client, session, content_name, content_data):
+        log.info("Pipe transfer completed")
+        self._j.content_terminate(client, session, content_name)
+        content_data["stream_object"].stop_stream()
+
+    def _finished_eb(self, failure, client, session, content_name, content_data):
+        log.warning("Error while streaming pipe: {}".format(failure))
+        self._j.content_terminate(
+            client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT
+        )
+        content_data["stream_object"].stop_stream()