comparison sat_frontends/jp/cmd_pipe.py @ 3040:fee60f17ebac

jp: jp asyncio port: /!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\ This patch implements the port of jp to asyncio, so it is now correctly using the bridge asynchronously, and it can be used with bridges like `pb`. This also simplifies the code, notably for things which were previously implemented with many callbacks (like pagination with RSM). During the process, some behaviours have been modified/fixed in jp and backends; check the diff for details.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:56:41 +0200
parents ab2696e34d29
children 9d0df638c8b4
comparison
equal deleted inserted replaced
3039:a1bc34f90fa5 3040:fee60f17ebac
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 import socket
21 import asyncio
22 import errno
23 from functools import partial
20 from sat_frontends.jp import base 24 from sat_frontends.jp import base
21
22 from sat_frontends.jp.constants import Const as C 25 from sat_frontends.jp.constants import Const as C
26 from sat_frontends.jp import xmlui_manager
23 import sys 27 import sys
24 from sat.core.i18n import _ 28 from sat.core.i18n import _
25 from sat_frontends.tools import jid 29 from sat_frontends.tools import jid
26 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI
27 from functools import partial
28 import socket
29 import socketserver
30 import errno
31 30
32 __commands__ = ["Pipe"] 31 __commands__ = ["Pipe"]
33 32
34 START_PORT = 9999 33 START_PORT = 9999
35 34
36 35
37 class PipeOut(base.CommandBase): 36 class PipeOut(base.CommandBase):
38 def __init__(self, host): 37 def __init__(self, host):
39 super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream")) 38 super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream"))
40 self.need_loop = True
41 39
42 def add_parser_options(self): 40 def add_parser_options(self):
43 self.parser.add_argument( 41 self.parser.add_argument(
44 "jid", help=_("the destination jid") 42 "jid", help=_("the destination jid")
45 ) 43 )
46 44
47 def streamOutCb(self, port): 45 async def start(self):
48 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 46 """ Create named pipe, and send stdin to it """
49 s.connect(("127.0.0.1", int(port))) 47 try:
50 while True: 48 port = await self.host.bridge.streamOut(
51 buf = sys.stdin.read(4096) 49 await self.host.get_full_jid(self.args.jid),
52 if not buf: 50 self.profile,
53 break 51 )
54 try: 52 except Exception as e:
55 s.sendall(buf) 53 self.disp(f"can't start stream: {e}", error=True)
56 except socket.error as e: 54 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
57 if e.errno == errno.EPIPE: 55 else:
58 sys.stderr.write(str(e) + "\n") 56 # FIXME: we use temporarily blocking code here, as it simplify
59 self.host.quit(1) 57 # asyncio port: "loop.connect_read_pipe(lambda: reader_protocol,
60 else: 58 # sys.stdin.buffer)" doesn't work properly when a file is piped in
61 raise e 59 # (we get a "ValueError: Pipe transport is for pipes/sockets only.")
62 self.host.quit() 60 # while it's working well for simple text sending.
63 61
64 def start(self): 62 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
65 """ Create named pipe, and send stdin to it """ 63 s.connect(("127.0.0.1", int(port)))
66 self.host.bridge.streamOut( 64
67 self.host.get_full_jid(self.args.jid), 65 while True:
68 self.profile, 66 buf = sys.stdin.buffer.read(4096)
69 callback=self.streamOutCb, 67 if not buf:
70 errback=partial( 68 break
71 self.errback, 69 try:
72 msg=_("can't start stream: {}"), 70 s.sendall(buf)
73 exit_code=C.EXIT_BRIDGE_ERRBACK, 71 except socket.error as e:
74 ), 72 if e.errno == errno.EPIPE:
75 ) 73 sys.stderr.write(f"e\n")
74 self.host.quit(1)
75 else:
76 raise e
77 self.host.quit()
76 78
77 79
78 class StreamServer(socketserver.BaseRequestHandler): 80 async def handle_stream_in(reader, writer, host):
79 def handle(self): 81 """Write all received data to stdout"""
80 while True: 82 while True:
81 data = self.request.recv(4096) 83 data = await reader.read(4096)
82 if not data: 84 if not data:
83 break 85 break
84 sys.stdout.write(data) 86 sys.stdout.buffer.write(data)
85 try: 87 try:
86 sys.stdout.flush() 88 sys.stdout.flush()
87 except IOError as e: 89 except IOError as e:
88 sys.stderr.write(str(e) + "\n") 90 sys.stderr.write(f"{e}\n")
89 break 91 break
90 #  calling shutdown will do a deadlock as we don't use separate thread 92 host.quitFromSignal()
91 # this is a workaround (cf. https://stackoverflow.com/a/36017741)
92 self.server._BaseServer__shutdown_request = True
93 93
94 94
95 class PipeIn(base.CommandAnswering): 95 class PipeIn(base.CommandAnswering):
96 def __init__(self, host): 96 def __init__(self, host):
97 super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream")) 97 super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream"))
103 nargs="*", 103 nargs="*",
104 help=_('Jids accepted (none means "accept everything")'), 104 help=_('Jids accepted (none means "accept everything")'),
105 ) 105 )
106 106
107 def getXmluiId(self, action_data): 107 def getXmluiId(self, action_data):
108 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
109 # should be available in the future
110 # TODO: XMLUI module
111 try: 108 try:
112 xml_ui = action_data["xmlui"] 109 xml_ui = action_data["xmlui"]
113 except KeyError: 110 except KeyError:
114 self.disp(_("Action has no XMLUI"), 1) 111 self.disp(_("Action has no XMLUI"), 1)
115 else: 112 else:
116 ui = ET.fromstring(xml_ui.encode("utf-8")) 113 ui = xmlui_manager.create(self.host, xml_ui)
117 xmlui_id = ui.get("submit") 114 if not ui.submit_id:
118 if not xmlui_id:
119 self.disp(_("Invalid XMLUI received"), error=True) 115 self.disp(_("Invalid XMLUI received"), error=True)
120 return xmlui_id 116 self.quitFromSignal(C.EXIT_INTERNAL_ERROR)
117 return ui.submit_id
121 118
122 def onStreamAction(self, action_data, action_id, security_limit, profile): 119 async def onStreamAction(self, action_data, action_id, security_limit, profile):
123 xmlui_id = self.getXmluiId(action_data) 120 xmlui_id = self.getXmluiId(action_data)
124 if xmlui_id is None: 121 if xmlui_id is None:
125 return self.host.quitFromSignal(1) 122 self.host.quitFromSignal(C.EXIT_ERROR)
126 try: 123 try:
127 from_jid = jid.JID(action_data["meta_from_jid"]) 124 from_jid = jid.JID(action_data["meta_from_jid"])
128 except KeyError: 125 except KeyError:
129 self.disp(_("Ignoring action without from_jid data"), 1) 126 self.disp(_("Ignoring action without from_jid data"), error=True)
130 return 127 return
131 128
132 if not self.bare_jids or from_jid.bare in self.bare_jids: 129 if not self.bare_jids or from_jid.bare in self.bare_jids:
133 host, port = "localhost", START_PORT 130 host, port = "localhost", START_PORT
134 while True: 131 while True:
135 try: 132 try:
136 server = socketserver.TCPServer((host, port), StreamServer) 133 server = await asyncio.start_server(
134 partial(handle_stream_in, host=self.host), host, port)
137 except socket.error as e: 135 except socket.error as e:
138 if e.errno == errno.EADDRINUSE: 136 if e.errno == errno.EADDRINUSE:
139 port += 1 137 port += 1
140 else: 138 else:
141 raise e 139 raise e
142 else: 140 else:
143 break 141 break
144 xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)} 142 xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)}
145 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) 143 await self.host.bridge.launchAction(
146 server.serve_forever() 144 xmlui_id, xmlui_data, profile_key=profile)
145 async with server:
146 await server.serve_forever()
147 self.host.quitFromSignal() 147 self.host.quitFromSignal()
148 148
149 def start(self): 149 async def start(self):
150 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] 150 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
151 await self.start_answering()
151 152
152 153
153 class Pipe(base.CommandBase): 154 class Pipe(base.CommandBase):
154 subcommands = (PipeOut, PipeIn) 155 subcommands = (PipeOut, PipeIn)
155 156