Mercurial > libervia-backend
comparison libervia/cli/cmd_pipe.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_pipe.py@26b7ed2817da |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4074:26b7ed2817da | 4075:47401850dec6 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # Libervia CLI | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 import asyncio | |
21 import errno | |
22 from functools import partial | |
23 import socket | |
24 import sys | |
25 | |
26 from libervia.backend.core.i18n import _ | |
27 from libervia.backend.tools.common import data_format | |
28 from libervia.cli import base | |
29 from libervia.cli import xmlui_manager | |
30 from libervia.cli.constants import Const as C | |
31 from libervia.frontends.tools import jid | |
32 | |
# Names of the command classes this module exposes.
# NOTE(review): presumably read by the CLI's command loader — confirm.
__commands__ = ["Pipe"]

# First TCP port PipeIn tries for its local server; it is incremented by one
# each time binding fails because the address is already in use.
START_PORT = 9999
36 | |
37 | |
class PipeOut(base.CommandBase):
    """``pipe out`` subcommand: send standard input to a destination jid."""

    def __init__(self, host):
        super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream"))

    def add_parser_options(self):
        """Declare the single positional argument: the destination jid."""
        self.parser.add_argument(
            "jid", help=_("the destination jid")
        )

    async def start(self):
        """Request an outgoing stream and copy stdin into it.

        The bridge's ``stream_out`` is called with the full jid resolved from
        ``self.args.jid``; its result is used as a TCP port on 127.0.0.1.
        Standard input is then copied to that socket in 4096-byte chunks until
        EOF, after which the host is asked to quit.

        If the stream cannot be started, the error is displayed and the host
        quits with ``C.EXIT_BRIDGE_ERRBACK``. A broken pipe while sending is
        reported on stderr and the host quits with exit code 1; any other
        socket error is re-raised.
        """
        try:
            port = await self.host.bridge.stream_out(
                await self.host.get_full_jid(self.args.jid),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't start stream: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # FIXME: we use temporarily blocking code here, as it simplify
            #        asyncio port: "loop.connect_read_pipe(lambda: reader_protocol,
            #        sys.stdin.buffer)" doesn't work properly when a file is piped in
            #        (we get a "ValueError: Pipe transport is for pipes/sockets only.")
            #        while it's working well for simple text sending.

            # The context manager guarantees the socket is closed on every
            # exit path (EOF, error, or quit).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect(("127.0.0.1", int(port)))

                while True:
                    buf = sys.stdin.buffer.read(4096)
                    if not buf:
                        # EOF on stdin: nothing more to send
                        break
                    try:
                        s.sendall(buf)
                    except socket.error as e:
                        if e.errno == errno.EPIPE:
                            # report the actual error, not a literal "e"
                            sys.stderr.write(f"{e}\n")
                            self.host.quit(1)
                        else:
                            raise
            self.host.quit()
80 | |
81 | |
async def handle_stream_in(reader, writer, host):
    """Write all received data to stdout, then ask the host to quit.

    @param reader: object whose awaitable ``read(n)`` returns the next chunk
        of bytes, or an empty value at end of stream
    @param writer: unused; present because this is used as an
        ``asyncio.start_server`` connection callback
    @param host: object providing ``quit_from_signal()``, called once the
        stream is finished or stdout can no longer be written
    """
    while True:
        data = await reader.read(4096)
        if not data:
            break
        try:
            # Both the write and the flush can fail (e.g. when the consumer of
            # our stdout has gone away), so both are guarded: otherwise the
            # error would escape this callback and the quit below would never
            # be requested.
            sys.stdout.buffer.write(data)
            sys.stdout.flush()
        except IOError as e:
            sys.stderr.write(f"{e}\n")
            break
    host.quit_from_signal()
95 | |
96 | |
class PipeIn(base.CommandAnswering):
    """``pipe in`` subcommand: accept an incoming stream and write it to stdout."""

    def __init__(self, host):
        super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream"))
        # action type -> coroutine handling it
        self.action_callbacks = {"STREAM": self.on_stream_action}

    def add_parser_options(self):
        """Declare the optional list of jids allowed to send a stream."""
        self.parser.add_argument(
            "jids",
            nargs="*",
            help=_('Jids accepted (none means "accept everything")'),
        )

    def get_xmlui_id(self, action_data):
        """Extract the XMLUI submit id from an action.

        @param action_data: mapping describing the action; its ``"xmlui"`` key
            is parsed with ``xmlui_manager.create``
        @return: the ``submit_id`` of the parsed XMLUI, or None when
            ``action_data`` has no ``"xmlui"`` key. When the parsed XMLUI has
            an empty ``submit_id``, an error is displayed, quit is requested
            with ``C.EXIT_INTERNAL_ERROR`` and that empty value is returned.
        """
        try:
            xml_ui = action_data["xmlui"]
        except KeyError:
            self.disp(_("Action has no XMLUI"), 1)
        else:
            ui = xmlui_manager.create(self.host, xml_ui)
            if not ui.submit_id:
                self.disp(_("Invalid XMLUI received"), error=True)
                # go through the host, like every other quit request in this
                # module
                self.host.quit_from_signal(C.EXIT_INTERNAL_ERROR)
            return ui.submit_id

    async def on_stream_action(self, action_data, action_id, security_limit, profile):
        """Accept a stream request and serve it on a local TCP port.

        Requests without a usable XMLUI id make the command quit with
        ``C.EXIT_ERROR``; requests without ``"from_jid"`` are ignored. When the
        sender is accepted (no filter given, or its bare jid is in
        ``self.bare_jids``), a server is started on "localhost" at the first
        free port from ``START_PORT`` upward, the action is answered with that
        port, and the server runs until it is stopped.
        """
        xmlui_id = self.get_xmlui_id(action_data)
        if xmlui_id is None:
            self.host.quit_from_signal(C.EXIT_ERROR)
            # nothing can be answered without an id: stop here instead of
            # starting a server and launching an action with a None id
            return
        try:
            from_jid = jid.JID(action_data["from_jid"])
        except KeyError:
            self.disp(_("Ignoring action without from_jid data"), error=True)
            return

        if not self.bare_jids or from_jid.bare in self.bare_jids:
            host, port = "localhost", START_PORT
            while True:
                try:
                    server = await asyncio.start_server(
                        partial(handle_stream_in, host=self.host), host, port
                    )
                except socket.error as e:
                    if e.errno == errno.EADDRINUSE:
                        # port already taken: try the next one
                        port += 1
                    else:
                        raise
                else:
                    break
            xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)}
            await self.host.bridge.action_launch(
                xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
            )
            async with server:
                await server.serve_forever()
            self.host.quit_from_signal()

    async def start(self):
        """Normalise the accepted jids to bare jids and wait for actions."""
        self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
        await self.start_answering()
155 | |
156 | |
class Pipe(base.CommandBase):
    """Top-level ``pipe`` command, grouping the ``out`` and ``in`` subcommands."""

    subcommands = (PipeOut, PipeIn)

    def __init__(self, host):
        super().__init__(
            host,
            "pipe",
            use_profile=False,
            help=_("stream piping through XMPP"),
        )