diff src/plugins/plugin_exp_jingle_stream.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents src/plugins/plugin_exp_pipe.py@0046283a285d
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/plugin_exp_jingle_stream.py	Thu Feb 08 00:37:42 2018 +0100
@@ -0,0 +1,281 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+# SAT plugin for managing pipes (experimental)
+# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+from sat.core.i18n import _, D_
+from sat.core.constants import Const as C
+from sat.core import exceptions
+from sat.core.log import getLogger
+log = getLogger(__name__)
+from sat.tools import xml_tools
+from sat.tools import stream
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid
+from twisted.internet import defer
+from twisted.internet import protocol
+from twisted.internet import endpoints
+from twisted.internet import reactor
+from twisted.internet import error
+from twisted.internet import interfaces
+from zope import interface
+import errno
+NS_STREAM = 'http://salut-a-toi.org/protocol/stream'
+START_PORT = 8888
+    C.PI_NAME: "Jingle Stream Plugin",
+    C.PI_TYPE: "EXP",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0166"],
+    C.PI_MAIN: "JingleStream",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Jingle Stream plugin""")
+CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?")
+CONFIRM_TITLE = D_(u"Stream Request")
+class StreamProtocol(protocol.Protocol):
+    def __init__(self):
+        self.pause = False
+    def setPause(self, paused):
+        # in Python 2.x, Twisted classes are old style
+        # so we can use property and setter
+        if paused:
+            if not self.pause:
+                self.transport.pauseProducing()
+                self.pause = True
+        else:
+            if self.pause:
+                self.transport.resumeProducing()
+                self.pause = False
+    def disconnect(self):
+        self.transport.loseConnection()
+    def connectionMade(self):
+        if self.factory.client_conn is not None:
+            self.transport.loseConnection()
+        self.factory.setClientConn(self)
+    def dataReceived(self, data):
+        self.factory.writeToConsumer(data)
+    def sendData(self, data):
+        self.transport.write(data)
+    def connectionLost(self, reason):
+        if self.factory.client_conn != self:
+            # only the first connected client_conn is relevant
+            return
+        if reason.type == error.ConnectionDone:
+            self.factory.streamFinished()
+        else:
+            self.factory.streamFailed(reason)
+class StreamFactory(protocol.Factory):
+    protocol = StreamProtocol
+    consumer = None
+    producer = None
+    deferred = None
+    def __init__(self):
+        self.client_conn = None
+    def setClientConn(self, stream_protocol):
+        # in Python 2.x, Twisted classes are old style
+        # so we can use property and setter
+        assert self.client_conn is None
+        self.client_conn = stream_protocol
+        if self.consumer is None:
+            self.client_conn.setPause(True)
+    def startStream(self, consumer):
+        if self.consumer is not None:
+            raise exceptions.InternalError(_(u"stream can't be used with multiple consumers"))
+        assert self.deferred is None
+        self.consumer = consumer
+        consumer.registerProducer(self, True)
+        self.deferred = defer.Deferred()
+        if self.client_conn is not None:
+            self.client_conn.setPause(False)
+        return self.deferred
+    def streamFinished(self):
+        self.client_conn = None
+        if self.consumer:
+            self.consumer.unregisterProducer()
+            self.port_listening.stopListening()
+        self.deferred.callback(None)
+    def streamFailed(self, failure_):
+        self.client_conn = None
+        if self.consumer:
+            self.consumer.unregisterProducer()
+            self.port_listening.stopListening()
+            self.deferred.errback(failure_)
+        elif self.producer:
+            self.producer.stopProducing()
+    def stopStream(self):
+        if self.client_conn is not None:
+            self.client_conn.disconnect()
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+    def pauseProducing(self):
+        self.client_conn.setPause(True)
+    def resumeProducing(self):
+        self.client_conn.setPause(False)
+    def stopProducing(self):
+        if self.client_conn:
+            self.client_conn.disconnect()
+    def write(self, data):
+        try:
+            self.client_conn.sendData(data)
+        except AttributeError:
+            log.warning(_(u"No client connected, can't send data"))
+    def writeToConsumer(self, data):
+        self.consumer.write(data)
+class JingleStream(object):
+    """This non standard jingle application send byte stream"""
+    def __init__(self, host):
+        log.info(_("Plugin Stream initialization"))
+        self.host = host
+        self._j = host.plugins["XEP-0166"] # shortcut to access jingle
+        self._j.registerApplication(NS_STREAM, self)
+        host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s', method=self._streamOut, async=True)
+    # jingle callbacks
+    def _streamOut(self, to_jid_s, profile_key):
+        client = self.host.getClient(profile_key)
+        return self.streamOut(client, jid.JID(to_jid_s))
+    @defer.inlineCallbacks
+    def streamOut(self, client, to_jid):
+        """send a stream
+        @param peer_jid(jid.JID): recipient
+        @return: an unique id to identify the transfer
+        """
+        port = START_PORT
+        factory = StreamFactory()
+        while True:
+            endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
+            try:
+                port_listening = yield endpoint.listen(factory)
+            except error.CannotListenError as e:
+                if e.socketError.errno == errno.EADDRINUSE:
+                    port += 1
+                else:
+                    raise e
+            else:
+                factory.port_listening = port_listening
+                break
+        self._j.initiate(client,
+                         to_jid,
+                         [{'app_ns': NS_STREAM,
+                           'senders': self._j.ROLE_INITIATOR,
+                           'app_kwargs': {'stream_object': factory},
+                         }])
+        defer.returnValue(unicode(port))
+    def jingleSessionInit(self, client, session, content_name, stream_object):
+        content_data = session['contents'][content_name]
+        application_data = content_data['application_data']
+        assert 'stream_object' not in application_data
+        application_data['stream_object'] = stream_object
+        desc_elt = domish.Element((NS_STREAM, 'description'))
+        return desc_elt
+    @defer.inlineCallbacks
+    def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt):
+        """This method request confirmation for a jingle session"""
+        content_data = session['contents'][content_name]
+        if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER):
+            log.warning(u"Bad sender, assuming initiator")
+            content_data['senders'] = self._j.ROLE_INITIATOR
+        confirm_data = yield xml_tools.deferDialog(self.host,
+            _(CONFIRM).format(peer=session['peer_jid'].full()),
+            _(CONFIRM_TITLE),
+            type_=C.XMLUI_DIALOG_CONFIRM,
+            action_extra={'meta_from_jid': session['peer_jid'].full(),
+                          'meta_type': "STREAM",
+                         },
+            security_limit=SECURITY_LIMIT,
+            profile=client.profile)
+        if not C.bool(confirm_data['answer']):
+            defer.returnValue(False)
+        try:
+            port = int(confirm_data['port'])
+        except (ValueError, KeyError):
+            raise exceptions.DataError(_(u'given port is invalid'))
+        endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port)
+        factory = StreamFactory()
+        yield endpoint.connect(factory)
+        content_data['stream_object'] = factory
+        finished_d = content_data['finished_d'] = defer.Deferred()
+        args = [client, session, content_name, content_data]
+        finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
+        defer.returnValue(True)
+    def jingleHandler(self, client, action, session, content_name, desc_elt):
+        content_data = session['contents'][content_name]
+        application_data = content_data['application_data']
+        if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
+            pass
+        elif action == self._j.A_SESSION_ACCEPT:
+            assert not 'stream_object' in content_data
+            content_data['stream_object'] = application_data['stream_object']
+            finished_d = content_data['finished_d'] = defer.Deferred()
+            args = [client, session, content_name, content_data]
+            finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
+        else:
+            log.warning(u"FIXME: unmanaged action {}".format(action))
+        return desc_elt
+    def _finishedCb(self, dummy, client, session, content_name, content_data):
+        log.info(u"Pipe transfer completed")
+        self._j.contentTerminate(client, session, content_name)
+        content_data['stream_object'].stopStream()
+    def _finishedEb(self, failure, client, session, content_name, content_data):
+        log.warning(u"Error while streaming pipe: {}".format(failure))
+        self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT)
+        content_data['stream_object'].stopStream()