Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_exp_jingle_stream.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_exp_jingle_stream.py@877145b4ba01 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_exp_jingle_stream.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 + + +# SAT plugin for managing pipes (experimental) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import errno +from zope import interface +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +from twisted.internet import protocol +from twisted.internet import endpoints +from twisted.internet import reactor +from twisted.internet import error +from twisted.internet import interfaces +from libervia.backend.core.i18n import _, D_ +from libervia.backend.core.constants import Const as C +from libervia.backend.core import exceptions +from libervia.backend.core.log import getLogger +from libervia.backend.tools import xml_tools +from libervia.backend.tools import stream + + +log = getLogger(__name__) + +NS_STREAM = "http://salut-a-toi.org/protocol/stream" +SECURITY_LIMIT = 30 +START_PORT = 8888 + +PLUGIN_INFO = { + C.PI_NAME: "Jingle Stream Plugin", + C.PI_IMPORT_NAME: "STREAM", + C.PI_TYPE: "EXP", + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0166"], + C.PI_MAIN: "JingleStream", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Jingle Stream plugin"""), +} + +CONFIRM = D_("{peer} wants to send you a stream, do you accept ?") +CONFIRM_TITLE = D_("Stream Request") + + +class StreamProtocol(protocol.Protocol): + def __init__(self): + self.pause = False + + def set_pause(self, paused): + # in Python 2.x, Twisted classes are old style + # so we can use property and setter + if paused: + if not self.pause: + self.transport.pauseProducing() + self.pause = True + else: + if self.pause: + self.transport.resumeProducing() + self.pause = False + + def disconnect(self): + self.transport.loseConnection() + + def connectionMade(self): + if self.factory.client_conn is not None: + self.transport.loseConnection() + self.factory.set_client_conn(self) + + def dataReceived(self, data): + self.factory.write_to_consumer(data) + + def sendData(self, data): + self.transport.write(data) + + def connectionLost(self, reason): + if self.factory.client_conn != self: + # only the first connected client_conn is relevant + return + + if reason.type == error.ConnectionDone: + self.factory.stream_finished() + else: + self.factory.stream_failed(reason) + + +@interface.implementer(stream.IStreamProducer) +@interface.implementer(interfaces.IPushProducer) +@interface.implementer(interfaces.IConsumer) +class StreamFactory(protocol.Factory): + protocol = StreamProtocol + consumer = None + producer = None + deferred = None + + def __init__(self): + self.client_conn = None + + def set_client_conn(self, stream_protocol): + # in Python 2.x, Twisted classes are old style + # so we can use property and setter + assert self.client_conn is None + self.client_conn = stream_protocol + if self.consumer is None: + self.client_conn.set_pause(True) + + def start_stream(self, consumer): + if self.consumer is not None: + raise exceptions.InternalError( + _("stream can't be used with multiple consumers") + ) + assert self.deferred is None + self.consumer = consumer + consumer.registerProducer(self, True) + self.deferred = defer.Deferred() + if self.client_conn is not None: + self.client_conn.set_pause(False) + return self.deferred + + def stream_finished(self): + self.client_conn = None + if self.consumer: + self.consumer.unregisterProducer() + self.port_listening.stopListening() + self.deferred.callback(None) + + def stream_failed(self, failure_): + self.client_conn = None + if self.consumer: + self.consumer.unregisterProducer() + self.port_listening.stopListening() + self.deferred.errback(failure_) + elif self.producer: + self.producer.stopProducing() + + def stop_stream(self): + if self.client_conn is not None: + self.client_conn.disconnect() + + def registerProducer(self, producer, streaming): + self.producer = producer + + def pauseProducing(self): + self.client_conn.set_pause(True) + + def resumeProducing(self): + self.client_conn.set_pause(False) + + def stopProducing(self): + if self.client_conn: + self.client_conn.disconnect() + + def write(self, data): + try: + self.client_conn.sendData(data) + except AttributeError: + log.warning(_("No client connected, can't send data")) + + def write_to_consumer(self, data): + self.consumer.write(data) + + +class JingleStream(object): + """This non standard jingle application send byte stream""" + + def __init__(self, host): + log.info(_("Plugin Stream initialization")) + self.host = host + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._j.register_application(NS_STREAM, self) + host.bridge.add_method( + "stream_out", + ".plugin", + in_sign="ss", + out_sign="s", + method=self._stream_out, + async_=True, + ) + + # jingle callbacks + + def _stream_out(self, to_jid_s, profile_key): + client = self.host.get_client(profile_key) + return defer.ensureDeferred(self.stream_out(client, jid.JID(to_jid_s))) + + async def stream_out(self, client, to_jid): + """send a stream + + @param peer_jid(jid.JID): recipient + @return: an unique id to identify the transfer + """ + port = START_PORT + factory = StreamFactory() + while True: + endpoint = endpoints.TCP4ServerEndpoint(reactor, port) + try: + port_listening = await endpoint.listen(factory) + except error.CannotListenError as e: + if e.socketError.errno == errno.EADDRINUSE: + port += 1 + else: + raise e + else: + factory.port_listening = port_listening + break + # we don't want to wait for IQ result of initiate + defer.ensureDeferred(self._j.initiate( + client, + to_jid, + [ + { + "app_ns": NS_STREAM, + "senders": self._j.ROLE_INITIATOR, + "app_kwargs": {"stream_object": factory}, + } + ], + )) + return str(port) + + def jingle_session_init(self, client, session, content_name, stream_object): + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + assert "stream_object" not in application_data + application_data["stream_object"] = stream_object + desc_elt = domish.Element((NS_STREAM, "description")) + return desc_elt + + @defer.inlineCallbacks + def jingle_request_confirmation(self, client, action, session, content_name, desc_elt): + """This method request confirmation for a jingle session""" + content_data = session["contents"][content_name] + if content_data["senders"] not in ( + self._j.ROLE_INITIATOR, + self._j.ROLE_RESPONDER, + ): + log.warning("Bad sender, assuming initiator") + content_data["senders"] = self._j.ROLE_INITIATOR + + confirm_data = yield xml_tools.defer_dialog( + self.host, + _(CONFIRM).format(peer=session["peer_jid"].full()), + _(CONFIRM_TITLE), + type_=C.XMLUI_DIALOG_CONFIRM, + action_extra={ + "from_jid": session["peer_jid"].full(), + "type": "STREAM", + }, + security_limit=SECURITY_LIMIT, + profile=client.profile, + ) + + if not C.bool(confirm_data["answer"]): + defer.returnValue(False) + try: + port = int(confirm_data["port"]) + except (ValueError, KeyError): + raise exceptions.DataError(_("given port is invalid")) + endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port) + factory = StreamFactory() + yield endpoint.connect(factory) + content_data["stream_object"] = factory + finished_d = content_data["finished_d"] = defer.Deferred() + args = [client, session, content_name, content_data] + finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) + defer.returnValue(True) + + def jingle_handler(self, client, action, session, content_name, desc_elt): + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): + pass + elif action == self._j.A_SESSION_ACCEPT: + assert not "stream_object" in content_data + content_data["stream_object"] = application_data["stream_object"] + finished_d = content_data["finished_d"] = defer.Deferred() + args = [client, session, content_name, content_data] + finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) + else: + log.warning("FIXME: unmanaged action {}".format(action)) + return desc_elt + + def _finished_cb(self, __, client, session, content_name, content_data): + log.info("Pipe transfer completed") + self._j.content_terminate(client, session, content_name) + content_data["stream_object"].stop_stream() + + def _finished_eb(self, failure, client, session, content_name, content_data): + log.warning("Error while streaming pipe: {}".format(failure)) + self._j.content_terminate( + client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT + ) + content_data["stream_object"].stop_stream()