diff sat_frontends/jp/cmd_pipe.py @ 3040:fee60f17ebac

jp: jp asyncio port: /!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\ This patch implements the port of jp to asyncio, so it is now correctly using the bridge asynchronously, and it can be used with bridges like `pb`. This also simplify the code, notably for things which were previously implemented with many callbacks (like pagination with RSM). During the process, some behaviours have been modified/fixed, in jp and backends, check diff for details.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:56:41 +0200
parents ab2696e34d29
children 9d0df638c8b4
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pipe.py	Wed Sep 25 08:53:38 2019 +0200
+++ b/sat_frontends/jp/cmd_pipe.py	Wed Sep 25 08:56:41 2019 +0200
@@ -17,17 +17,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import socket
+import asyncio
+import errno
+from functools import partial
 from sat_frontends.jp import base
-
 from sat_frontends.jp.constants import Const as C
+from sat_frontends.jp import xmlui_manager
 import sys
 from sat.core.i18n import _
 from sat_frontends.tools import jid
-import xml.etree.ElementTree as ET  # FIXME: used temporarily to manage XMLUI
-from functools import partial
-import socket
-import socketserver
-import errno
 
 __commands__ = ["Pipe"]
 
@@ -37,59 +36,60 @@
 class PipeOut(base.CommandBase):
     def __init__(self, host):
         super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream"))
-        self.need_loop = True
 
     def add_parser_options(self):
         self.parser.add_argument(
             "jid", help=_("the destination jid")
         )
 
-    def streamOutCb(self, port):
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        s.connect(("127.0.0.1", int(port)))
-        while True:
-            buf = sys.stdin.read(4096)
-            if not buf:
-                break
-            try:
-                s.sendall(buf)
-            except socket.error as e:
-                if e.errno == errno.EPIPE:
-                    sys.stderr.write(str(e) + "\n")
-                    self.host.quit(1)
-                else:
-                    raise e
-        self.host.quit()
+    async def start(self):
+        """ Create named pipe, and send stdin to it """
+        try:
+            port = await self.host.bridge.streamOut(
+                await self.host.get_full_jid(self.args.jid),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't start stream: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            # FIXME: we use temporarily blocking code here, as it simplify
+            #        asyncio port: "loop.connect_read_pipe(lambda: reader_protocol,
+            #        sys.stdin.buffer)" doesn't work properly when a file is piped in
+            #        (we get a "ValueError: Pipe transport is for pipes/sockets only.")
+            #        while it's working well for simple text sending.
 
-    def start(self):
-        """ Create named pipe, and send stdin to it """
-        self.host.bridge.streamOut(
-            self.host.get_full_jid(self.args.jid),
-            self.profile,
-            callback=self.streamOutCb,
-            errback=partial(
-                self.errback,
-                msg=_("can't start stream: {}"),
-                exit_code=C.EXIT_BRIDGE_ERRBACK,
-            ),
-        )
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect(("127.0.0.1", int(port)))
+
+            while True:
+                buf = sys.stdin.buffer.read(4096)
+                if not buf:
+                    break
+                try:
+                    s.sendall(buf)
+                except socket.error as e:
+                    if e.errno == errno.EPIPE:
+                        sys.stderr.write(f"e\n")
+                        self.host.quit(1)
+                    else:
+                        raise e
+            self.host.quit()
 
 
-class StreamServer(socketserver.BaseRequestHandler):
-    def handle(self):
-        while True:
-            data = self.request.recv(4096)
-            if not data:
-                break
-            sys.stdout.write(data)
-            try:
-                sys.stdout.flush()
-            except IOError as e:
-                sys.stderr.write(str(e) + "\n")
-                break
-        #  calling shutdown will do a deadlock as we don't use separate thread
-        # this is a workaround (cf. https://stackoverflow.com/a/36017741)
-        self.server._BaseServer__shutdown_request = True
+async def handle_stream_in(reader, writer, host):
+    """Write all received data to stdout"""
+    while True:
+        data = await reader.read(4096)
+        if not data:
+            break
+        sys.stdout.buffer.write(data)
+        try:
+            sys.stdout.flush()
+        except IOError as e:
+            sys.stderr.write(f"{e}\n")
+            break
+    host.quitFromSignal()
 
 
 class PipeIn(base.CommandAnswering):
@@ -105,35 +105,33 @@
         )
 
     def getXmluiId(self, action_data):
-        # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
-        #        should be available in the future
-        # TODO: XMLUI module
         try:
             xml_ui = action_data["xmlui"]
         except KeyError:
             self.disp(_("Action has no XMLUI"), 1)
         else:
-            ui = ET.fromstring(xml_ui.encode("utf-8"))
-            xmlui_id = ui.get("submit")
-            if not xmlui_id:
+            ui = xmlui_manager.create(self.host, xml_ui)
+            if not ui.submit_id:
                 self.disp(_("Invalid XMLUI received"), error=True)
-            return xmlui_id
+                self.quitFromSignal(C.EXIT_INTERNAL_ERROR)
+            return ui.submit_id
 
-    def onStreamAction(self, action_data, action_id, security_limit, profile):
+    async def onStreamAction(self, action_data, action_id, security_limit, profile):
         xmlui_id = self.getXmluiId(action_data)
         if xmlui_id is None:
-            return self.host.quitFromSignal(1)
+            self.host.quitFromSignal(C.EXIT_ERROR)
         try:
             from_jid = jid.JID(action_data["meta_from_jid"])
         except KeyError:
-            self.disp(_("Ignoring action without from_jid data"), 1)
+            self.disp(_("Ignoring action without from_jid data"), error=True)
             return
 
         if not self.bare_jids or from_jid.bare in self.bare_jids:
             host, port = "localhost", START_PORT
             while True:
                 try:
-                    server = socketserver.TCPServer((host, port), StreamServer)
+                    server = await asyncio.start_server(
+                        partial(handle_stream_in, host=self.host), host, port)
                 except socket.error as e:
                     if e.errno == errno.EADDRINUSE:
                         port += 1
@@ -142,12 +140,15 @@
                 else:
                     break
             xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)}
-            self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile)
-            server.serve_forever()
+            await self.host.bridge.launchAction(
+                xmlui_id, xmlui_data, profile_key=profile)
+            async with server:
+                await server.serve_forever()
             self.host.quitFromSignal()
 
-    def start(self):
+    async def start(self):
         self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
+        await self.start_answering()
 
 
 class Pipe(base.CommandBase):