comparison libervia/cli/cmd_pipe.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_pipe.py@26b7ed2817da
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4074:26b7ed2817da 4075:47401850dec6
1 #!/usr/bin/env python3
2
3
4 # Libervia CLI
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 import asyncio
21 import errno
22 from functools import partial
23 import socket
24 import sys
25
26 from libervia.backend.core.i18n import _
27 from libervia.backend.tools.common import data_format
28 from libervia.cli import base
29 from libervia.cli import xmlui_manager
30 from libervia.cli.constants import Const as C
31 from libervia.frontends.tools import jid
32
# Names of the command classes exported by this module (presumably read by the
# CLI's command loader -- TODO confirm against libervia.cli.base).
33 __commands__ = ["Pipe"]
34
# First TCP port PipeIn.on_stream_action tries to listen on; the port number is
# incremented for as long as the address is reported as already in use.
35 START_PORT = 9999
36
37
class PipeOut(base.CommandBase):
    """``out`` sub-command of ``pipe``: send stdin to a stream opened toward a jid."""

    def __init__(self, host):
        super().__init__(host, "out", help=_("send a pipe a stream"))

    def add_parser_options(self):
        """Declare the positional ``jid`` argument (destination of the stream)."""
        self.parser.add_argument("jid", help=_("the destination jid"))

    async def start(self):
        """Ask the bridge for an outgoing stream and copy stdin into it.

        ``bridge.stream_out`` returns a port; stdin is read in 4096-byte chunks
        and sent over a TCP connection to ``127.0.0.1:<port>`` until EOF, then
        the host is asked to quit.

        A failure to start the stream is displayed and ends the command with
        ``C.EXIT_BRIDGE_ERRBACK``. If the connection is closed by the other side
        (EPIPE) the error is written to stderr and the command quits with
        exit code 1; any other socket error is re-raised.
        """
        try:
            port = await self.host.bridge.stream_out(
                await self.host.get_full_jid(self.args.jid),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't start stream: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            # nothing to send without a port, even if quit() returns
            return

        # FIXME: blocking code is used here temporarily, as it simplifies the
        #        asyncio port: "loop.connect_read_pipe(lambda: reader_protocol,
        #        sys.stdin.buffer)" doesn't work properly when a file is piped in
        #        (we get a "ValueError: Pipe transport is for pipes/sockets only.")
        #        while it works well for simple text sending.

        # the context manager guarantees the socket is closed on every exit path
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(("127.0.0.1", int(port)))

            while True:
                buf = sys.stdin.buffer.read(4096)
                if not buf:
                    break
                try:
                    s.sendall(buf)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # report the actual error, not the literal letter "e"
                    sys.stderr.write(f"{e}\n")
                    self.host.quit(1)
                    # the peer is gone: stop sending even if quit() returns
                    return
        self.host.quit()
80
81
async def handle_stream_in(reader, writer, host):
    """Write all data received on ``reader`` to stdout, then ask ``host`` to quit.

    @param reader: object with an awaitable ``read(size)`` returning bytes; an
        empty result marks the end of the stream
    @param writer: write side of the connection, unused here
    @param host: object whose ``quit_from_signal()`` is called once the copy is
        over (end of stream or stdout error)
    """
    while True:
        data = await reader.read(4096)
        if not data:
            break
        try:
            # the write is guarded together with the flush: a closed stdout can
            # already raise while writing, and must not skip the final quit
            sys.stdout.buffer.write(data)
            sys.stdout.flush()
        except OSError as e:
            sys.stderr.write(f"{e}\n")
            break
    host.quit_from_signal()
95
96
class PipeIn(base.CommandAnswering):
    """``in`` sub-command of ``pipe``: accept incoming streams, write them to stdout."""

    def __init__(self, host):
        super().__init__(host, "in", help=_("receive a pipe stream"))
        # actions of type "STREAM" are handled by on_stream_action
        self.action_callbacks = {"STREAM": self.on_stream_action}

    def add_parser_options(self):
        """Declare the optional list of jids whose streams are accepted."""
        self.parser.add_argument(
            "jids",
            nargs="*",
            help=_('Jids accepted (none means "accept everything")'),
        )

    def get_xmlui_id(self, action_data):
        """Return the submit id of the XMLUI found in ``action_data``.

        @param action_data: mapping expected to hold the XMLUI under "xmlui"
        @return: the XMLUI submit id, or None when ``action_data`` has no
            "xmlui" key. When the XMLUI has no usable submit id, an error is
            displayed, a quit is requested and the (falsy) id is returned as is.
        """
        try:
            xml_ui = action_data["xmlui"]
        except KeyError:
            self.disp(_("Action has no XMLUI"), 1)
            return None

        ui = xmlui_manager.create(self.host, xml_ui)
        if not ui.submit_id:
            self.disp(_("Invalid XMLUI received"), error=True)
            # quit_from_signal is called on the host, as in on_stream_action
            self.host.quit_from_signal(C.EXIT_INTERNAL_ERROR)
        return ui.submit_id

    async def on_stream_action(self, action_data, action_id, security_limit, profile):
        """Accept an incoming stream and serve it on a local TCP port.

        When the sender is accepted (no jid was given on the command line, or
        its bare jid is one of them), a server is started on the first free port
        from ``START_PORT`` upward, the action is answered positively with that
        port, and received data is handled by ``handle_stream_in``.

        @param action_data: mapping holding the "xmlui" and "from_jid" of the action
        @param action_id: unused here
        @param security_limit: unused here
        @param profile: forwarded as ``profile_key`` when answering the action
        """
        xmlui_id = self.get_xmlui_id(action_data)
        if xmlui_id is None:
            self.host.quit_from_signal(C.EXIT_ERROR)
            # the action can't be answered without a submit id
            return
        try:
            from_jid = jid.JID(action_data["from_jid"])
        except KeyError:
            self.disp(_("Ignoring action without from_jid data"), error=True)
            return

        if self.bare_jids and from_jid.bare not in self.bare_jids:
            # sender is not in the accepted list: the action is left unanswered
            return

        bind_host, port = "localhost", START_PORT
        while True:
            try:
                server = await asyncio.start_server(
                    partial(handle_stream_in, host=self.host), bind_host, port
                )
            except OSError as e:
                if e.errno != errno.EADDRINUSE:
                    raise
                # port already taken, try the next one
                port += 1
            else:
                break

        xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)}
        await self.host.bridge.action_launch(
            xmlui_id, data_format.serialise(xmlui_data), profile_key=profile
        )
        async with server:
            await server.serve_forever()
        self.host.quit_from_signal()

    async def start(self):
        """Store the bare form of the accepted jids, then call ``start_answering``."""
        self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
        await self.start_answering()
155
156
class Pipe(base.CommandBase):
    """``pipe`` command, parent of the ``out`` and ``in`` sub-commands."""

    subcommands = (PipeOut, PipeIn)

    def __init__(self, host):
        options = {
            "use_profile": False,
            "help": _("stream piping through XMPP"),
        }
        super().__init__(host, "pipe", **options)