view libervia/backend/plugins/plugin_adhoc_dbus.py @ 4306:94e0968987cd

plugin XEP-0033: code modernisation, improve delivery, data validation: - Code has been rewritten using Pydantic models and `async` coroutines for data validation and cleaner element parsing/generation. - Delivery has been completely rewritten. It now works even if server doesn't support multicast, and sends to local multicast service first. Delivering to local multicast service first is due to bad support of XEP-0033 in server (notably Prosody which has an incomplete implementation), and the current impossibility to detect if a sub-domain service handles fully multicast or only for local domains. This is a workaround to have a good balance between backward compatibility and use of bandwidth, and to make it work with the incoming email gateway implementation (the gateway will only deliver to entities of its own domain). - disco feature checking now uses `async` coroutines. `host` implementation still uses Deferred return values for compatibility with legacy code. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for adding D-Bus to Ad-Hoc Commands
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import OrderedDict
import os.path
import uuid

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import jid
from wokkel import data_form

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger

log = getLogger(__name__)


# lxml and txdbus are optional dependencies: when one of them is missing, the
# matching name is set to None and a warning is logged instead of failing at import
# time. The rest of the module checks these names against None before using them.
try:
    from lxml import etree
except ImportError:
    etree = None
    log.warning(
        "Missing module lxml, please download/install it from http://lxml.de/ . "
        "Auto D-Bus discovery will be disabled"
    )

try:
    import txdbus
    from txdbus import client as dbus_client
except ImportError:
    txdbus = None
    log.warning(
        "Missing module txdbus, please download/install it, auto D-Bus discovery will be "
        "disabled"
    )


# namespace used as node of the media player ad-hoc command and as namespace of its
# selection form
NS_MEDIA_PLAYER = "org.libervia.mediaplayer"
# well-known name and object path of the message bus itself
FD_NAME = "org.freedesktop.DBus"
FD_PATH = "/org/freedesktop/DBus"
INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable"
# bus names starting with this prefix are considered as media players
MPRIS_PREFIX = "org.mpris.MediaPlayer2"
# pseudo commands: they are translated to a "Seek" call with +/- SEEK_OFFSET
CMD_GO_BACK = "GoBack"
CMD_GO_FWD = "GoFW"
# 5 seconds, assuming the offset is expressed in microseconds (as MPRIS "Seek" is)
SEEK_OFFSET = 5 * 1000 * 1000
# commands proposed in the remote control form, in display order
MPRIS_COMMANDS = [
    "org.mpris.MediaPlayer2.Player." + cmd
    for cmd in ("Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")
]
MPRIS_PATH = "/org/mpris/MediaPlayer2"
# properties displayed in the remote control form: interface => properties names
MPRIS_PROPERTIES = OrderedDict(
    (
        ("org.mpris.MediaPlayer2", ("Identity",)),
        (
            "org.mpris.MediaPlayer2.Player",
            (
                "Metadata",
                "PlaybackStatus",
                "Volume",
            ),
        ),
    )
)
# this property is a mapping, it is serialised with MPRIS_METADATA_MAP
MPRIS_METADATA_KEY = "Metadata"
# metadata key => name of the form field used to display it
MPRIS_METADATA_MAP = OrderedDict((("xesam:title", "Title"),))

INTROSPECT_METHOD = "Introspect"
IGNORED_IFACES_START = (
    "org.freedesktop",
    "org.qtproject",
    "org.kde.KMainWindow",
)  # commands in interface starting with these values will be ignored
# flag asking to show the command selection again once a command has been sent
FLAG_LOOP = "LOOP"

# Plugin metadata: the main class is "AdHocDBus" and the only declared dependency
# is the "XEP-0050" plugin (retrieved from ``host.plugins`` in ``AdHocDBus.__init__``)
PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - D-Bus",
    C.PI_IMPORT_NAME: "AD_HOC_DBUS",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0050"],
    C.PI_MAIN: "AdHocDBus",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands"""),
}


class AdHocDBus:

    def __init__(self, host):
        """Register the bridge methods and the media player namespace

        @param host: backend instance; its ``bridge``, ``plugins`` and
            ``register_namespace`` are used here
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host

        # (name, input signature, output signature, method) of the bridge methods to
        # register, in registration order
        bridge_methods = []
        if etree is not None:
            # auto discovery is only exposed when lxml has been imported
            bridge_methods.append(
                (
                    "ad_hoc_dbus_add_auto",
                    "sasasasasasass",
                    "(sa(sss))",
                    self._ad_hoc_dbus_add_auto,
                )
            )
        bridge_methods.append(
            ("ad_hoc_remotes_get", "s", "a(sss)", self._ad_hoc_remotes_get)
        )
        for bridge_name, in_sign, out_sign, bridge_method in bridge_methods:
            host.bridge.add_method(
                bridge_name,
                ".plugin",
                in_sign=in_sign,
                out_sign=out_sign,
                method=bridge_method,
                async_=True,
            )

        self._c = host.plugins["XEP-0050"]
        host.register_namespace("mediaplayer", NS_MEDIA_PLAYER)
        # the D-Bus session connection is created when a profile gets connected
        self.session_con = None

    async def profile_connected(self, client):
        """Connect to the session bus if needed and register the media players command

        Nothing is done when txdbus could not be imported.
        @param client: client of the profile which has just been connected
        """
        if txdbus is None:
            return

        if self.session_con is None:
            # the connection and the message bus object are stored on the plugin
            # instance, so they are created only once
            self.session_con = await dbus_client.connect(reactor, "session")
            self.fd_object = await self.session_con.getRemoteObject(FD_NAME, FD_PATH)

        # 6 hours timeout, to avoid breaking remote in the middle of a movie
        session_timeout = 60 * 60 * 6
        self._c.add_ad_hoc_command(
            client,
            self.local_media_cb,
            D_("Media Players"),
            node=NS_MEDIA_PLAYER,
            timeout=session_timeout,
        )

    async def _dbus_async_call(self, proxy, method, *args, **kwargs):
        """Call a DBus method asynchronously and return a deferred

        @param proxy: DBus object proxy, as returner by get_object
        @param method: name of the method to call
        @param args: will be transmitted to the method
        @param kwargs: will be transmetted to the method, except for the following poped
                       values:
                       - interface: name of the interface to use
        @return: a deferred

        """
        return await proxy.callRemote(method, *args, **kwargs)

    async def _dbus_get_property(self, proxy, interface, name):
        return await self._dbus_async_call(
            proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties"
        )

    async def _dbus_list_names(self):
        return await self.fd_object.callRemote("ListNames")

    async def _dbus_introspect(self, proxy):
        """Call the introspection method of a D-Bus object

        @param proxy: D-Bus object proxy to introspect
        @return: result of the introspection call (parsed as XML by ``_introspect``)
        """
        call_kwargs = {"interface": INTROSPECT_IFACE}
        return await self._dbus_async_call(proxy, INTROSPECT_METHOD, **call_kwargs)

    def _accept_method(self, method):
        """Return True if we accept the method for a command
        @param method: etree.Element
        @return: True if the method is acceptable

        """
        if method.xpath(
            "arg[@direction='in']"
        ):  # we don't accept method with argument for the moment
            return False
        return True

    async def _introspect(self, methods, bus_name, proxy):
        """Recursively collect the acceptable methods of a D-Bus object

        @param methods(set): filled in place with (object path, interface name,
            method name) tuples
        @param bus_name: name of the bus owning the introspected objects
        @param proxy: proxy of the object to introspect, its children nodes are
            introspected too
        """
        object_path = proxy.object_path
        log.debug("introspecting path [%s]" % object_path)
        assert etree is not None
        root_elt = etree.fromstring(await self._dbus_introspect(proxy))
        for child_elt in root_elt.iterchildren("node", "interface"):
            child_name = child_elt.get("name")

            if child_elt.tag == "node":
                # sub-object, we introspect it with its own proxy
                child_path = os.path.join(object_path, child_name)
                child_proxy = await self.session_con.getRemoteObject(
                    bus_name, child_path
                )
                await self._introspect(methods, bus_name, child_proxy)
                continue

            if child_elt.tag != "interface":
                continue

            if child_name.startswith(IGNORED_IFACES_START):
                log.debug("interface [%s] is ignored" % child_name)
                continue

            log.debug("introspecting interface [%s]" % child_name)
            for method_elt in child_elt.iterchildren("method"):
                if not self._accept_method(method_elt):
                    continue
                method_name = method_elt.get("name")
                log.debug("method accepted: [%s]" % method_name)
                methods.add((object_path, child_name, method_name))

    def _ad_hoc_dbus_add_auto(
        self,
        prog_name,
        allowed_jids,
        allowed_groups,
        allowed_magics,
        forbidden_jids,
        forbidden_groups,
        flags,
        profile_key,
    ):
        client = self.host.get_client(profile_key)
        return self.ad_hoc_dbus_add_auto(
            client,
            prog_name,
            allowed_jids,
            allowed_groups,
            allowed_magics,
            forbidden_jids,
            forbidden_groups,
            flags,
        )

    async def ad_hoc_dbus_add_auto(
        self,
        client,
        prog_name,
        allowed_jids=None,
        allowed_groups=None,
        allowed_magics=None,
        forbidden_jids=None,
        forbidden_groups=None,
        flags=None,
    ):
        """Find the bus of a program and expose its methods as an ad-hoc command

        The bus names containing ``"." + prog_name`` are listed, and the first one (in
        sorted order) ending with ``prog_name`` is introspected from its root object.
        An ad-hoc command is added only if at least one method has been found.

        @param client: client used to register the ad-hoc command
        @param prog_name: name of the program, searched in the bus names
        @param allowed_jids, allowed_groups, allowed_magics, forbidden_jids,
            forbidden_groups: transmitted as is to ``_add_command``
        @param flags: transmitted as is to ``_add_command``
        @return (tuple[str, list[tuple[str, str, str]]]): name of the bus used and
            list of (object path, interface name, method name) found.
            ``("", [])`` is returned if no suitable bus has been found.
        """
        bus_names = await self._dbus_list_names()
        bus_names = sorted(
            bus_name for bus_name in bus_names if "." + prog_name in bus_name
        )
        if not bus_names:
            log.info("Can't find any bus for [%s]" % prog_name)
            return ("", [])

        bus_name = next(
            (candidate for candidate in bus_names if candidate.endswith(prog_name)),
            None,
        )
        if bus_name is None:
            log.info(f"Can't find any command for {prog_name}")
            return ("", [])
        log.info(f"bus name found: {bus_name}")
        proxy = await self.session_con.getRemoteObject(bus_name, "/")
        methods = set()

        await self._introspect(methods, bus_name, proxy)

        if methods:
            self._add_command(
                client,
                prog_name,
                bus_name,
                methods,
                allowed_jids=allowed_jids,
                allowed_groups=allowed_groups,
                allowed_magics=allowed_magics,
                forbidden_jids=forbidden_jids,
                forbidden_groups=forbidden_groups,
                flags=flags,
            )

        # a list is returned (and not the set) to be consistent with the "not found"
        # return values and with the array declared in the bridge signature
        return (str(bus_name), list(methods))

    def _add_command(
        self,
        client,
        adhoc_name,
        bus_name,
        methods,
        allowed_jids=None,
        allowed_groups=None,
        allowed_magics=None,
        forbidden_jids=None,
        forbidden_groups=None,
        flags=None,
    ):
        """Register an ad-hoc command calling argument-less D-Bus methods

        The command works in 2 steps: a form listing ``methods`` is sent first, then
        the selected method is called on ``bus_name``.

        @param client: client used to register the ad-hoc command
        @param adhoc_name: label of the ad-hoc command
        @param bus_name: name of the bus where the selected method is called
        @param methods: iterable of (object path, interface name, method name)
        @param allowed_jids, allowed_groups, allowed_magics, forbidden_jids,
            forbidden_groups: transmitted as is to ``add_ad_hoc_command``
        @param flags: iterable of flags, only FLAG_LOOP is checked here
        """
        if flags is None:
            flags = set()

        async def d_bus_callback(client, command_elt, session_data, action, node):
            """Handle one step of the ad-hoc session

            @return (tuple): (payload, status, None, note)
            @raise AdHocError: the submitted form is missing or invalid, or the
                session received more requests than expected
            """
            # the number of actions received so far tells which step we are in
            actions = session_data.setdefault("actions", [])
            # random option value => (path, iface, command), so that only the methods
            # proposed in the form can be selected
            names_map = session_data.setdefault("names_map", {})
            actions.append(action)

            if len(actions) == 1:
                # it's our first request, we ask the desired new status
                status = self._c.STATUS.EXECUTING
                form = data_form.Form("form", title=_("Command selection"))
                options = []
                for path, iface, command in methods:
                    label = command.rsplit(".", 1)[-1]
                    name = str(uuid.uuid4())
                    names_map[name] = (path, iface, command)
                    options.append(data_form.Option(name, label))

                field = data_form.Field(
                    "list-single", "command", options=options, required=True
                )
                form.addField(field)

                payload = form.toElement()
                note = None

            elif len(actions) == 2:
                # we should have the answer here
                try:
                    x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                    answer_form = data_form.Form.fromElement(x_elt)
                    command = answer_form["command"]
                except (KeyError, StopIteration):
                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

                if command not in names_map:
                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

                path, iface, command = names_map[command]
                proxy = await self.session_con.getRemoteObject(bus_name, path)

                await self._dbus_async_call(proxy, command, interface=iface)

                # job done, we can end the session, except if we have FLAG_LOOP
                if FLAG_LOOP in flags:
                    # We have a loop, so we clear everything and we execute again the
                    # command as we had a first call (command_elt is not used, so None
                    # is OK)
                    del actions[:]
                    names_map.clear()
                    return await d_bus_callback(
                        client, None, session_data, self._c.ACTION.EXECUTE, node
                    )
                # no payload on completion, the result is only reported with the note
                status = self._c.STATUS.COMPLETED
                payload = None
                note = (self._c.NOTE.INFO, _("Command sent"))
            else:
                raise self._c.AdHocError(self._c.ERROR.INTERNAL)

            return (payload, status, None, note)

        self._c.add_ad_hoc_command(
            client,
            d_bus_callback,
            adhoc_name,
            allowed_jids=allowed_jids,
            allowed_groups=allowed_groups,
            allowed_magics=allowed_magics,
            forbidden_jids=forbidden_jids,
            forbidden_groups=forbidden_groups,
        )

    ## Local media ##

    def _ad_hoc_remotes_get(self, profile):
        return self.ad_hoc_remotes_get(self.host.get_client(profile))

    async def ad_hoc_remotes_get(self, client):
        """Retrieve available remote media controllers in our devices

        Devices advertising the ad-hoc commands feature are searched, and the media
        player command is executed on each device listing it to get its players.
        A device which can't be requested is logged and ignored.

        @param client: client used for discovery and ad-hoc requests
        @return (list[tuple[unicode, unicode, unicode]]): list of devices with:
            - entity full jid
            - device name
            - device label
        """
        found_data = await defer.ensureDeferred(
            self.host.find_by_features(
                client,
                [self.host.ns_map["commands"]],
                service=False,
                roster=False,
                own_jid=True,
                local_device=True,
            )
        )

        remotes = []

        for found in found_data:
            for device_jid_s in found:
                device_jid = jid.JID(device_jid_s)
                cmd_list = await self._c.list_commands(client, device_jid)
                for cmd in cmd_list:
                    if cmd.nodeIdentifier == NS_MEDIA_PLAYER:
                        try:
                            result_elt = await self._c.do(
                                client, device_jid, NS_MEDIA_PLAYER, timeout=5
                            )
                            command_elt = self._c.get_command_elt(result_elt)
                            form = data_form.findForm(command_elt, NS_MEDIA_PLAYER)
                            if form is None:
                                continue
                            mp_options = form.fields["media_player"].options
                            session_id = command_elt.getAttribute("sessionid")
                            if mp_options and session_id:
                                # we just want to discover player, so we cancel the
                                # session
                                # NOTE(review): the result of this call is neither
                                # awaited nor checked — confirm this is intended
                                self._c.do(
                                    client,
                                    device_jid,
                                    NS_MEDIA_PLAYER,
                                    action=self._c.ACTION.CANCEL,
                                    session_id=session_id,
                                )

                            for opt in mp_options:
                                remotes.append(
                                    (device_jid_s, opt.value, opt.label or opt.value)
                                )
                        except Exception as e:
                            # best effort: a failing device must not prevent the
                            # discovery of the other ones.
                            # The template is translated first, then formatted,
                            # otherwise the translation lookup is done on the already
                            # formatted message.
                            log.warning(
                                _(
                                    "Can't retrieve remote controllers on {device_jid}: "
                                    "{reason}"
                                ).format(device_jid=device_jid, reason=e)
                            )
                        break
        return remotes

    async def do_mpris_command(self, proxy, command):
        """Run a media player command on a D-Bus object

        @param proxy: D-Bus object proxy of the media player
        @param command: interface and method name joined with a dot; the go back and
            go forward pseudo commands are converted to a "Seek" call
        @return: result of the D-Bus call
        """
        iface, method_name = command.rsplit(".", 1)
        # pseudo command => offset to use with "Seek"
        seek_offsets = {CMD_GO_BACK: -SEEK_OFFSET, CMD_GO_FWD: SEEK_OFFSET}
        if method_name in seek_offsets:
            call_args = [seek_offsets[method_name]]
            method_name = "Seek"
        else:
            call_args = []
        return await self._dbus_async_call(
            proxy, method_name, *call_args, interface=iface
        )

    def add_mpris_metadata(self, form, metadata):
        """Serialise MPRIS Metadata according to MPRIS_METADATA_MAP

        @param form: data form where a "fixed" field is added for each known key
        @param metadata: mapping of MPRIS metadata, unknown keys are ignored
        """
        for mpris_key, field_name in MPRIS_METADATA_MAP.items():
            if mpris_key not in metadata:
                continue
            field = data_form.Field(
                fieldType="fixed", var=field_name, value=str(metadata[mpris_key])
            )
            form.addField(field)

    async def local_media_cb(self, client, command_elt, session_data, action, node):
        """Ad-hoc command callback to select and control a local media player

        Without submitted form (or with an empty one), a form listing the MPRIS buses
        found on the session bus is returned. Otherwise the submitted command (if any)
        is run on the selected player, and the remote control form is returned.

        @param client: unused here
        @param command_elt: ad-hoc command element, its data form (if any) is parsed
        @param session_data: unused here
        @param action: unused here
        @param node: unused here
        @return (tuple): (payload, status, None, note)
        @raise ValueError: the submitted form has no "media_player" value
        """
        assert txdbus is not None
        try:
            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
            command_form = data_form.Form.fromElement(x_elt)
        except StopIteration:
            command_form = None

        if command_form is None or len(command_form.fields) == 0:
            # root request, we looks for media players
            bus_names = await self._dbus_list_names()
            bus_names = [b for b in bus_names if b.startswith(MPRIS_PREFIX)]
            if not bus_names:
                note = (self._c.NOTE.INFO, D_("No media player found."))
                return (None, self._c.STATUS.COMPLETED, None, note)
            options = []
            status = self._c.STATUS.EXECUTING
            form = data_form.Form(
                "form", title=D_("Media Player Selection"), formNamespace=NS_MEDIA_PLAYER
            )
            for bus in bus_names:
                # the player name is what follows the prefix and its "." separator
                player_name = bus[len(MPRIS_PREFIX) + 1 :]
                if not player_name:
                    log.warning(_("Ignoring MPRIS bus without suffix"))
                    continue
                options.append(data_form.Option(bus, player_name))
            field = data_form.Field(
                "list-single", "media_player", options=options, required=True
            )
            form.addField(field)
            payload = form.toElement()
            return (payload, status, None, None)
        else:
            # player request
            try:
                bus_name = command_form["media_player"]
            except KeyError:
                raise ValueError(_("missing media_player value"))

            # the bus name comes from the requesting entity, we must not let it reach
            # something else than a media player
            if not bus_name.startswith(MPRIS_PREFIX):
                log.warning(
                    _(
                        "Media player ad-hoc command trying to use non MPRIS bus. "
                        "Hack attempt? Refused bus: {bus_name}"
                    ).format(bus_name=bus_name)
                )
                note = (self._c.NOTE.ERROR, D_("Invalid player name."))
                return (None, self._c.STATUS.COMPLETED, None, note)

            try:
                proxy = await self.session_con.getRemoteObject(
                    bus_name, MPRIS_PATH, "org.mpris.MediaPlayer2.Player"
                )
            except Exception as e:
                log.warning(_("Can't get D-Bus proxy: {reason}").format(reason=e))
                note = (self._c.NOTE.ERROR, D_("Media player is not available anymore"))
                return (None, self._c.STATUS.COMPLETED, None, note)
            try:
                command = command_form["command"]
            except KeyError:
                # no command requested, we only send the remote control form
                pass
            else:
                # the command also comes from the requesting entity: only the
                # commands proposed in the form are accepted, otherwise any method of
                # any interface of the player object could be called
                if command not in MPRIS_COMMANDS:
                    log.warning(
                        _(
                            "Media player ad-hoc command trying to use an unknown "
                            "command. Hack attempt? Refused command: {command}"
                        ).format(command=command)
                    )
                    note = (self._c.NOTE.ERROR, D_("Invalid command."))
                    return (None, self._c.STATUS.COMPLETED, None, note)
                await self.do_mpris_command(proxy, command)

            # we construct the remote control form
            form = data_form.Form("form", title=D_("Media Player Selection"))
            # the selected player is kept in the form, so it is sent back with the
            # next command
            form.addField(
                data_form.Field(fieldType="hidden", var="media_player", value=bus_name)
            )
            for iface, properties_names in MPRIS_PROPERTIES.items():
                for name in properties_names:
                    try:
                        value = await self._dbus_get_property(proxy, iface, name)
                    except Exception as e:
                        # a missing property must not prevent to show the other ones
                        log.warning(
                            _("Can't retrieve attribute {name}: {reason}").format(
                                name=name, reason=e
                            )
                        )
                        continue
                    if name == MPRIS_METADATA_KEY:
                        self.add_mpris_metadata(form, value)
                    else:
                        form.addField(
                            data_form.Field(fieldType="fixed", var=name, value=str(value))
                        )

            commands = [data_form.Option(c, c.rsplit(".", 1)[1]) for c in MPRIS_COMMANDS]
            form.addField(
                data_form.Field(
                    fieldType="list-single",
                    var="command",
                    options=commands,
                    required=True,
                )
            )

            payload = form.toElement()
            status = self._c.STATUS.EXECUTING
            return (payload, status, None, None)