Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pipe.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplifies the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed in jp and the backends; check
the diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | ab2696e34d29 |
children | 9d0df638c8b4 |
comparison
equal
deleted
inserted
replaced
3039:a1bc34f90fa5 | 3040:fee60f17ebac |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 import socket | |
21 import asyncio | |
22 import errno | |
23 from functools import partial | |
20 from sat_frontends.jp import base | 24 from sat_frontends.jp import base |
21 | |
22 from sat_frontends.jp.constants import Const as C | 25 from sat_frontends.jp.constants import Const as C |
26 from sat_frontends.jp import xmlui_manager | |
23 import sys | 27 import sys |
24 from sat.core.i18n import _ | 28 from sat.core.i18n import _ |
25 from sat_frontends.tools import jid | 29 from sat_frontends.tools import jid |
26 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI | |
27 from functools import partial | |
28 import socket | |
29 import socketserver | |
30 import errno | |
31 | 30 |
32 __commands__ = ["Pipe"] | 31 __commands__ = ["Pipe"] |
33 | 32 |
34 START_PORT = 9999 | 33 START_PORT = 9999 |
35 | 34 |
36 | 35 |
37 class PipeOut(base.CommandBase): | 36 class PipeOut(base.CommandBase): |
38 def __init__(self, host): | 37 def __init__(self, host): |
39 super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream")) | 38 super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream")) |
40 self.need_loop = True | |
41 | 39 |
42 def add_parser_options(self): | 40 def add_parser_options(self): |
43 self.parser.add_argument( | 41 self.parser.add_argument( |
44 "jid", help=_("the destination jid") | 42 "jid", help=_("the destination jid") |
45 ) | 43 ) |
46 | 44 |
47 def streamOutCb(self, port): | 45 async def start(self): |
48 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 46 """ Create named pipe, and send stdin to it """ |
49 s.connect(("127.0.0.1", int(port))) | 47 try: |
50 while True: | 48 port = await self.host.bridge.streamOut( |
51 buf = sys.stdin.read(4096) | 49 await self.host.get_full_jid(self.args.jid), |
52 if not buf: | 50 self.profile, |
53 break | 51 ) |
54 try: | 52 except Exception as e: |
55 s.sendall(buf) | 53 self.disp(f"can't start stream: {e}", error=True) |
56 except socket.error as e: | 54 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
57 if e.errno == errno.EPIPE: | 55 else: |
58 sys.stderr.write(str(e) + "\n") | 56 # FIXME: we use temporarily blocking code here, as it simplify |
59 self.host.quit(1) | 57 # asyncio port: "loop.connect_read_pipe(lambda: reader_protocol, |
60 else: | 58 # sys.stdin.buffer)" doesn't work properly when a file is piped in |
61 raise e | 59 # (we get a "ValueError: Pipe transport is for pipes/sockets only.") |
62 self.host.quit() | 60 # while it's working well for simple text sending. |
63 | 61 |
64 def start(self): | 62 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
65 """ Create named pipe, and send stdin to it """ | 63 s.connect(("127.0.0.1", int(port))) |
66 self.host.bridge.streamOut( | 64 |
67 self.host.get_full_jid(self.args.jid), | 65 while True: |
68 self.profile, | 66 buf = sys.stdin.buffer.read(4096) |
69 callback=self.streamOutCb, | 67 if not buf: |
70 errback=partial( | 68 break |
71 self.errback, | 69 try: |
72 msg=_("can't start stream: {}"), | 70 s.sendall(buf) |
73 exit_code=C.EXIT_BRIDGE_ERRBACK, | 71 except socket.error as e: |
74 ), | 72 if e.errno == errno.EPIPE: |
75 ) | 73 sys.stderr.write(f"e\n") |
74 self.host.quit(1) | |
75 else: | |
76 raise e | |
77 self.host.quit() | |
76 | 78 |
77 | 79 |
78 class StreamServer(socketserver.BaseRequestHandler): | 80 async def handle_stream_in(reader, writer, host): |
79 def handle(self): | 81 """Write all received data to stdout""" |
80 while True: | 82 while True: |
81 data = self.request.recv(4096) | 83 data = await reader.read(4096) |
82 if not data: | 84 if not data: |
83 break | 85 break |
84 sys.stdout.write(data) | 86 sys.stdout.buffer.write(data) |
85 try: | 87 try: |
86 sys.stdout.flush() | 88 sys.stdout.flush() |
87 except IOError as e: | 89 except IOError as e: |
88 sys.stderr.write(str(e) + "\n") | 90 sys.stderr.write(f"{e}\n") |
89 break | 91 break |
90 # calling shutdown will do a deadlock as we don't use separate thread | 92 host.quitFromSignal() |
91 # this is a workaround (cf. https://stackoverflow.com/a/36017741) | |
92 self.server._BaseServer__shutdown_request = True | |
93 | 93 |
94 | 94 |
95 class PipeIn(base.CommandAnswering): | 95 class PipeIn(base.CommandAnswering): |
96 def __init__(self, host): | 96 def __init__(self, host): |
97 super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream")) | 97 super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream")) |
103 nargs="*", | 103 nargs="*", |
104 help=_('Jids accepted (none means "accept everything")'), | 104 help=_('Jids accepted (none means "accept everything")'), |
105 ) | 105 ) |
106 | 106 |
107 def getXmluiId(self, action_data): | 107 def getXmluiId(self, action_data): |
108 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module | |
109 # should be available in the future | |
110 # TODO: XMLUI module | |
111 try: | 108 try: |
112 xml_ui = action_data["xmlui"] | 109 xml_ui = action_data["xmlui"] |
113 except KeyError: | 110 except KeyError: |
114 self.disp(_("Action has no XMLUI"), 1) | 111 self.disp(_("Action has no XMLUI"), 1) |
115 else: | 112 else: |
116 ui = ET.fromstring(xml_ui.encode("utf-8")) | 113 ui = xmlui_manager.create(self.host, xml_ui) |
117 xmlui_id = ui.get("submit") | 114 if not ui.submit_id: |
118 if not xmlui_id: | |
119 self.disp(_("Invalid XMLUI received"), error=True) | 115 self.disp(_("Invalid XMLUI received"), error=True) |
120 return xmlui_id | 116 self.quitFromSignal(C.EXIT_INTERNAL_ERROR) |
117 return ui.submit_id | |
121 | 118 |
122 def onStreamAction(self, action_data, action_id, security_limit, profile): | 119 async def onStreamAction(self, action_data, action_id, security_limit, profile): |
123 xmlui_id = self.getXmluiId(action_data) | 120 xmlui_id = self.getXmluiId(action_data) |
124 if xmlui_id is None: | 121 if xmlui_id is None: |
125 return self.host.quitFromSignal(1) | 122 self.host.quitFromSignal(C.EXIT_ERROR) |
126 try: | 123 try: |
127 from_jid = jid.JID(action_data["meta_from_jid"]) | 124 from_jid = jid.JID(action_data["meta_from_jid"]) |
128 except KeyError: | 125 except KeyError: |
129 self.disp(_("Ignoring action without from_jid data"), 1) | 126 self.disp(_("Ignoring action without from_jid data"), error=True) |
130 return | 127 return |
131 | 128 |
132 if not self.bare_jids or from_jid.bare in self.bare_jids: | 129 if not self.bare_jids or from_jid.bare in self.bare_jids: |
133 host, port = "localhost", START_PORT | 130 host, port = "localhost", START_PORT |
134 while True: | 131 while True: |
135 try: | 132 try: |
136 server = socketserver.TCPServer((host, port), StreamServer) | 133 server = await asyncio.start_server( |
134 partial(handle_stream_in, host=self.host), host, port) | |
137 except socket.error as e: | 135 except socket.error as e: |
138 if e.errno == errno.EADDRINUSE: | 136 if e.errno == errno.EADDRINUSE: |
139 port += 1 | 137 port += 1 |
140 else: | 138 else: |
141 raise e | 139 raise e |
142 else: | 140 else: |
143 break | 141 break |
144 xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)} | 142 xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)} |
145 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) | 143 await self.host.bridge.launchAction( |
146 server.serve_forever() | 144 xmlui_id, xmlui_data, profile_key=profile) |
145 async with server: | |
146 await server.serve_forever() | |
147 self.host.quitFromSignal() | 147 self.host.quitFromSignal() |
148 | 148 |
149 def start(self): | 149 async def start(self): |
150 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] | 150 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] |
151 await self.start_answering() | |
151 | 152 |
152 | 153 |
153 class Pipe(base.CommandBase): | 154 class Pipe(base.CommandBase): |
154 subcommands = (PipeOut, PipeIn) | 155 subcommands = (PipeOut, PipeIn) |
155 | 156 |