Mercurial > libervia-backend
view libervia/cli/cmd_pipe.py @ 4340:ea72364131d5 default tip @
doc (components): Update Email Gateway documentation:
A section has been added to explain how attachments are handled.
fix 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:53:18 +0100 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia CLI # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio import errno from functools import partial import socket import sys from libervia.backend.core.i18n import _ from libervia.backend.tools.common import data_format from libervia.cli import base from libervia.cli import xmlui_manager from libervia.cli.constants import Const as C from libervia.frontends.tools import jid __commands__ = ["Pipe"] START_PORT = 9999 class PipeOut(base.CommandBase): def __init__(self, host): super(PipeOut, self).__init__(host, "out", help=_("send a pipe a stream")) def add_parser_options(self): self.parser.add_argument("jid", help=_("the destination jid")) async def start(self): """Create named pipe, and send stdin to it""" try: port = await self.host.bridge.stream_out( await self.host.get_full_jid(self.args.jid), self.profile, ) except Exception as e: self.disp(f"can't start stream: {e}", error=True) self.host.quit(C.EXIT_BRIDGE_ERRBACK) else: # FIXME: we use temporarily blocking code here, as it simplify # asyncio port: "loop.connect_read_pipe(lambda: reader_protocol, # sys.stdin.buffer)" doesn't work properly when a file is piped in # (we get a "ValueError: Pipe transport is for pipes/sockets only.") # while it's working well for simple text sending. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(("127.0.0.1", int(port))) while True: buf = sys.stdin.buffer.read(4096) if not buf: break try: s.sendall(buf) except socket.error as e: if e.errno == errno.EPIPE: sys.stderr.write(f"e\n") self.host.quit(1) else: raise e self.host.quit() async def handle_stream_in(reader, writer, host): """Write all received data to stdout""" while True: data = await reader.read(4096) if not data: break sys.stdout.buffer.write(data) try: sys.stdout.flush() except IOError as e: sys.stderr.write(f"{e}\n") break host.quit_from_signal() class PipeIn(base.CommandAnswering): def __init__(self, host): super(PipeIn, self).__init__(host, "in", help=_("receive a pipe stream")) self.action_callbacks = {"STREAM": self.on_stream_action} def add_parser_options(self): self.parser.add_argument( "jids", nargs="*", help=_('Jids accepted (none means "accept everything")'), ) def get_xmlui_id(self, action_data): try: xml_ui = action_data["xmlui"] except KeyError: self.disp(_("Action has no XMLUI"), 1) else: ui = xmlui_manager.create(self.host, xml_ui) if not ui.submit_id: self.disp(_("Invalid XMLUI received"), error=True) self.quit_from_signal(C.EXIT_INTERNAL_ERROR) return ui.submit_id async def on_stream_action(self, action_data, action_id, security_limit, profile): xmlui_id = self.get_xmlui_id(action_data) if xmlui_id is None: self.host.quit_from_signal(C.EXIT_ERROR) try: from_jid = jid.JID(action_data["from_jid"]) except KeyError: self.disp(_("Ignoring action without from_jid data"), error=True) return if not self.bare_jids or from_jid.bare in self.bare_jids: host, port = "localhost", START_PORT while True: try: server = await asyncio.start_server( partial(handle_stream_in, host=self.host), host, port ) except socket.error as e: if e.errno == errno.EADDRINUSE: port += 1 else: raise e else: break xmlui_data = {"answer": C.BOOL_TRUE, "port": str(port)} await self.host.bridge.action_launch( xmlui_id, data_format.serialise(xmlui_data), profile_key=profile ) async with server: await server.serve_forever() self.host.quit_from_signal() async def start(self): self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] await self.start_answering() class Pipe(base.CommandBase): subcommands = (PipeOut, PipeIn) def __init__(self, host): super(Pipe, self).__init__( host, "pipe", use_profile=False, help=_("stream piping through XMPP") )