diff frontends/src/jp/cmd_pipe.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children
line wrap: on
line diff
--- a/frontends/src/jp/cmd_pipe.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/frontends/src/jp/cmd_pipe.py	Thu Feb 08 00:37:42 2018 +0100
@@ -19,18 +19,19 @@
 
 from sat_frontends.jp import base
 
-import tempfile
+from sat_frontends.jp.constants import Const as C
 import sys
-import os
-import os.path
-import shutil
 from sat.core.i18n import _
 from sat_frontends.tools import jid
 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI
-
+from functools import partial
+import socket
+import SocketServer
+import errno
 
 __commands__ = ["Pipe"]
 
+START_PORT = 9999
 
 class PipeOut(base.CommandBase):
 
@@ -41,31 +42,64 @@
     def add_parser_options(self):
         self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid"))
 
+    def streamOutCb(self, port):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect(('127.0.0.1', int(port)))
+        while True:
+            buf = sys.stdin.read(4096)
+            if not buf:
+                break
+            try:
+                s.sendall(buf)
+            except socket.error as e:
+                if e.errno == errno.EPIPE:
+                    sys.stderr.write(str(e) + '\n')
+                    self.host.quit(1)
+                else:
+                    raise e
+        self.host.quit()
+
     def start(self):
         """ Create named pipe, and send stdin to it """
-        # TODO: check_jids
-        tmp_dir = tempfile.mkdtemp()
-        fifopath = os.path.join(tmp_dir,"pipe_out")
-        os.mkfifo(fifopath)
-        self.host.bridge.pipeOut(self.host.get_full_jid(self.args.jid), fifopath, self.profile)
-        with open(fifopath, 'w') as f:
-            shutil.copyfileobj(sys.stdin, f)
-        shutil.rmtree(tmp_dir)
-        self.host.quit()
+        self.host.bridge.streamOut(
+            self.host.get_full_jid(self.args.jid),
+            self.profile,
+            callback=self.streamOutCb,
+            errback=partial(self.errback,
+                            msg=_(u"can't start stream: {}"),
+                            exit_code=C.EXIT_BRIDGE_ERRBACK))
+
+
+class StreamServer(SocketServer.BaseRequestHandler):
+
+    def handle(self):
+        while True:
+            data = self.request.recv(4096)
+            if not data:
+                break
+            sys.stdout.write(data)
+            try:
+                sys.stdout.flush()
+            except IOError as e:
+                sys.stderr.write(str(e) + '\n')
+                break
+        # calling shutdown will do a deadlock as we don't use separate thread
+        # this is a workaround (cf. https://stackoverflow.com/a/36017741)
+        self.server._BaseServer__shutdown_request = True
 
 
 class PipeIn(base.CommandAnswering):
 
     def __init__(self, host):
         super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream'))
-        self.action_callbacks = {"PIPE": self.onPipeAction}
+        self.action_callbacks = {"STREAM": self.onStreamAction}
 
     def add_parser_options(self):
         self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")'))
 
     def getXmluiId(self, action_data):
         # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
-        #        should be available in the futur
+        #        should be available in the future
         # TODO: XMLUI module
         try:
             xml_ui = action_data['xmlui']
@@ -78,7 +112,7 @@
                 self.disp(_(u"Invalid XMLUI received"), error=True)
             return xmlui_id
 
-    def onPipeAction(self, action_data, action_id, security_limit, profile):
+    def onStreamAction(self, action_data, action_id, security_limit, profile):
         xmlui_id = self.getXmluiId(action_data)
         if xmlui_id is None:
             return self.host.quitFromSignal(1)
@@ -89,16 +123,22 @@
             return
 
         if not self.bare_jids or from_jid.bare in self.bare_jids:
-            tmp_dir = tempfile.mkdtemp()
-            fifopath = os.path.join(tmp_dir,"pipe_in")
-            os.mkfifo(fifopath)
-            xmlui_data = {'path': fifopath}
+            host, port = "localhost", START_PORT
+            while True:
+                try:
+                    server = SocketServer.TCPServer((host, port), StreamServer)
+                except socket.error as e:
+                    if e.errno == errno.EADDRINUSE:
+                        port += 1
+                    else:
+                        raise e
+                else:
+                    break
+            xmlui_data = {'answer': C.BOOL_TRUE,
+                          'port': unicode(port)}
             self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile)
-
-            with open(fifopath, 'r') as f:
-                shutil.copyfileobj(f, sys.stdout)
-            shutil.rmtree(tmp_dir)
-            self.host.quit()
+            server.serve_forever()
+            self.host.quitFromSignal()
 
     def start(self):
         self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]