view libervia/backend/tools/common/async_process.py @ 4282:8da377040ba6

doc (encryption): update pubsub encryption specifications.
author Goffi <goffi@goffi.org>
date Sat, 13 Jul 2024 17:45:47 +0200
parents 9308b2d15fd2
children
line wrap: on
line source

#!/usr/bin/env python3


# SAT: a jabber client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""tools to launch process in a async way (using Twisted)"""

import os.path
from pathlib import Path
from typing import Any
from twisted.internet import defer, reactor, protocol
from twisted.python.failure import Failure
from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.backend.core.log import getLogger

log = getLogger(__name__)


class CommandProtocol(protocol.ProcessProtocol):
    """handle an external command"""

    # name of the command (unicode)
    name = None
    # full path to the command (bytes)
    command = None
    # True to activate logging of command outputs (bool)
    log = False

    def __init__(self, deferred, stdin=None):
        """
        @param deferred(defer.Deferred): will be called when command is completed
        @param stdin(str, None): if not None, will be push to standard input
        """
        self._stdin = stdin
        self._deferred = deferred
        self.data = []
        self.err_data = []
        self.cmd_args: list[str] | None = None
        self.cmd_kwargs: dict[str, Any] | None = None

    @property
    def command_name(self):
        """returns command name or empty string if it can't be guessed"""
        if self.name is not None:
            return self.name
        elif self.command is not None:
            return os.path.splitext(os.path.basename(self.command))[0].decode(
                "utf-8", "ignore"
            )
        else:
            return ""

    def connectionMade(self):
        if self._stdin is not None:
            self.transport.write(self._stdin)
            self.transport.closeStdin()

    def outReceived(self, data):
        if self.log:
            log.info(data.decode("utf-8", "replace"))
        self.data.append(data)

    def errReceived(self, data):
        if self.log:
            log.warning(data.decode("utf-8", "replace"))
        self.err_data.append(data)

    def processEnded(self, reason):
        data = b"".join(self.data)
        if reason.value.exitCode == 0:
            log.debug(f"{self.command_name!r} command succeed")
            # we don't use "replace" on purpose, we want an exception if decoding
            # is not working properly
            self._deferred.callback(data)
        else:
            err_data = b"".join(self.err_data)

            assert self.cmd_args is not None
            assert self.cmd_kwargs is not None
            msg = _(
                "Can't complete {name} command (error code: {code}):\n"
                "Executed command: {command}\n"
                "Keyword arguments:\n"
                "{command_kw}\n\n"
                "stderr:\n{stderr}\n{stdout}\n"
            ).format(
                name=self.command_name,
                code=reason.value.exitCode,
                command=" ".join(self.cmd_args),
                command_kw="\n".join(
                    f"    - {k} = {v!r}" for k, v in self.cmd_kwargs.items()
                ),
                stderr=err_data.decode(errors="replace"),
                stdout="stdout: " + data.decode(errors="replace") if data else "",
            )
            self._deferred.errback(
                Failure(exceptions.CommandException(msg, data, err_data))
            )

    @classmethod
    def run(cls, *args, **kwargs):
        """Create a new CommandProtocol and execute the given command.

        @param *args(unicode): command arguments
            if cls.command is specified, it will be the path to the command to execute
            otherwise, first argument must be the path
        @param **kwargs: can be:
            - stdin(unicode, None): data to push to standard input
            - verbose(bool): if True stdout and stderr will be logged
            other keyword arguments will be used in reactor.spawnProcess
        @return ((D)bytes): stdout in case of success
        @raise RuntimeError: command returned a non zero status
            stdin and stdout will be given as arguments

        """
        stdin = kwargs.pop("stdin", None)
        if stdin is not None:
            stdin = stdin.encode("utf-8")
        verbose = kwargs.pop("verbose", False)
        args = list(args)
        d = defer.Deferred()
        prot = cls(d, stdin=stdin)
        if verbose:
            prot.log = True
        if cls.command is None:
            if not args:
                raise ValueError(
                    "You must either specify cls.command or use a full path to command "
                    "to execute as first argument"
                )
            command = args.pop(0)
            if isinstance(command, Path):
                command = str(command)
            if prot.name is None:
                name = os.path.splitext(os.path.basename(command))[0]
                prot.name = name
        else:
            command = cls.command
        cmd_args = [command] + args
        prot.cmd_args = cmd_args
        prot.cmd_kwargs = kwargs
        if "env" not in kwargs:
            # we pass parent environment by default
            # FIXME: `None` doesn't seem to work, despite what documentation says, to be
            #    checked and reported upstream if confirmed.
            kwargs["env"] = os.environ
        reactor.spawnProcess(prot, command, cmd_args, **kwargs)
        return d


run = CommandProtocol.run