Mercurial > libervia-backend
diff sat_frontends/jp/cmd_pipe.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplify the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed, in jp and backends, check
diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | ab2696e34d29 |
children | 9d0df638c8b4 |
line wrap: on
line diff
--- a/sat_frontends/jp/cmd_pipe.py Wed Sep 25 08:53:38 2019 +0200 +++ b/sat_frontends/jp/cmd_pipe.py Wed Sep 25 08:56:41 2019 +0200 @@ -17,17 +17,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import socket +import asyncio +import errno +from functools import partial from sat_frontends.jp import base - from sat_frontends.jp.constants import Const as C +from sat_frontends.jp import xmlui_manager import sys from sat.core.i18n import _ from sat_frontends.tools import jid -import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI -from functools import partial -import socket -import socketserver -import errno __commands__ = ["Pipe"] @@ -37,59 +36,60 @@ class PipeOut(base.CommandBase): def __init__(self, host): super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream")) - self.need_loop = True def add_parser_options(self): self.parser.add_argument( "jid", help=_("the destination jid") ) - def streamOutCb(self, port): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(("127.0.0.1", int(port))) - while True: - buf = sys.stdin.read(4096) - if not buf: - break - try: - s.sendall(buf) - except socket.error as e: - if e.errno == errno.EPIPE: - sys.stderr.write(str(e) + "\n") - self.host.quit(1) - else: - raise e - self.host.quit() + async def start(self): + """ Create named pipe, and send stdin to it """ + try: + port = await self.host.bridge.streamOut( + await self.host.get_full_jid(self.args.jid), + self.profile, + ) + except Exception as e: + self.disp(f"can't start stream: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + # FIXME: we use temporarily blocking code here, as it simplify + # asyncio port: "loop.connect_read_pipe(lambda: reader_protocol, + # sys.stdin.buffer)" doesn't work properly when a file is piped in + # (we get a "ValueError: Pipe transport is for pipes/sockets only.") + # while it's working well for simple text sending. - def start(self): - """ Create named pipe, and send stdin to it """ - self.host.bridge.streamOut( - self.host.get_full_jid(self.args.jid), - self.profile, - callback=self.streamOutCb, - errback=partial( - self.errback, - msg=_("can't start stream: {}"), - exit_code=C.EXIT_BRIDGE_ERRBACK, - ), - ) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(("127.0.0.1", int(port))) + + while True: + buf = sys.stdin.buffer.read(4096) + if not buf: + break + try: + s.sendall(buf) + except socket.error as e: + if e.errno == errno.EPIPE: + sys.stderr.write(f"e\n") + self.host.quit(1) + else: + raise e + self.host.quit() -class StreamServer(socketserver.BaseRequestHandler): - def handle(self): - while True: - data = self.request.recv(4096) - if not data: - break - sys.stdout.write(data) - try: - sys.stdout.flush() - except IOError as e: - sys.stderr.write(str(e) + "\n") - break - # calling shutdown will do a deadlock as we don't use separate thread - # this is a workaround (cf. https://stackoverflow.com/a/36017741) - self.server._BaseServer__shutdown_request = True +async def handle_stream_in(reader, writer, host): + """Write all received data to stdout""" + while True: + data = await reader.read(4096) + if not data: + break + sys.stdout.buffer.write(data) + try: + sys.stdout.flush() + except IOError as e: + sys.stderr.write(f"{e}\n") + break + host.quitFromSignal() class PipeIn(base.CommandAnswering): @@ -105,35 +105,33 @@ ) def getXmluiId(self, action_data): - # FIXME: we temporarily use ElementTree, but a real XMLUI managing module - # should be available in the future - # TODO: XMLUI module try: xml_ui = action_data["xmlui"] except KeyError: self.disp(_("Action has no XMLUI"), 1) else: - ui = ET.fromstring(xml_ui.encode("utf-8")) - xmlui_id = ui.get("submit") - if not xmlui_id: + ui = xmlui_manager.create(self.host, xml_ui) + if not ui.submit_id: self.disp(_("Invalid XMLUI received"), error=True) - return xmlui_id + self.quitFromSignal(C.EXIT_INTERNAL_ERROR) + return ui.submit_id - def onStreamAction(self, action_data, action_id, security_limit, profile): + async def onStreamAction(self, action_data, action_id, security_limit, profile): xmlui_id = self.getXmluiId(action_data) if xmlui_id is None: - return self.host.quitFromSignal(1) + self.host.quitFromSignal(C.EXIT_ERROR) try: from_jid = jid.JID(action_data["meta_from_jid"]) except KeyError: - self.disp(_("Ignoring action without from_jid data"), 1) + self.disp(_("Ignoring action without from_jid data"), error=True) return if not self.bare_jids or from_jid.bare in self.bare_jids: host, port = "localhost", START_PORT while True: try: - server = socketserver.TCPServer((host, port), StreamServer) + server = await asyncio.start_server( + partial(handle_stream_in, host=self.host), host, port) except socket.error as e: if e.errno == errno.EADDRINUSE: port += 1 @@ -142,12 +140,15 @@ else: break xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)} - self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) - server.serve_forever() + await self.host.bridge.launchAction( + xmlui_id, xmlui_data, profile_key=profile) + async with server: + await server.serve_forever() self.host.quitFromSignal() - def start(self): + async def start(self): self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] + await self.start_answering() class Pipe(base.CommandBase):