view libervia/backend/plugins/plugin_adhoc_dbus.py @ 4232:0fbe5c605eb6

tests (unit/webrtc,XEP-0176, XEP-0234): Fix tests and add webrtc file transfer tests: fix 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:59:50 +0200
parents 50c919dfe61b
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for adding D-Bus to Ad-Hoc Commands
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import OrderedDict
import os.path
import uuid

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import jid
from wokkel import data_form

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger

log = getLogger(__name__)


# lxml and txdbus are optional dependencies: when one of them is missing, the
# corresponding name is set to None so the plugin can test for it at runtime.
try:
    from lxml import etree
except ImportError:
    etree = None
    # NOTE: the trailing space after the URL keeps the two sentences separated
    # once the adjacent literals are concatenated.
    log.warning(
        "Missing module lxml, please download/install it from http://lxml.de/ . "
        "Auto D-Bus discovery will be disabled"
    )

try:
    import txdbus
    from txdbus import client as dbus_client
except ImportError:
    txdbus = None
    log.warning(
        "Missing module txdbus, please download/install it, auto D-Bus discovery will be "
        "disabled"
    )


# Namespace used as ad-hoc node (and form namespace) for the media player remote
NS_MEDIA_PLAYER = "org.libervia.mediaplayer"

# Well-known name and object path of the message bus itself
FD_NAME = "org.freedesktop.DBus"
FD_PATH = "/org/freedesktop/DBus"
INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable"
INTROSPECT_METHOD = "Introspect"

# MPRIS (media players remote control)
MPRIS_PREFIX = "org.mpris.MediaPlayer2"
MPRIS_PATH = "/org/mpris/MediaPlayer2"
# pseudo commands, translated to "Seek" calls with -/+ SEEK_OFFSET
CMD_GO_BACK = "GoBack"
CMD_GO_FWD = "GoFW"
# MPRIS expresses the Seek offset in microseconds, this is 5 seconds
SEEK_OFFSET = 5 * 1000 * 1000
# fully qualified commands proposed in the remote control form
MPRIS_COMMANDS = [
    "org.mpris.MediaPlayer2.Player." + cmd
    for cmd in ("Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")
]
# properties shown in the remote control form, by interface
MPRIS_PROPERTIES = OrderedDict((
    ("org.mpris.MediaPlayer2", (
        "Identity",
        )),
    ("org.mpris.MediaPlayer2.Player", (
        "Metadata",
        "PlaybackStatus",
        "Volume",
        )),
    ))
# the "Metadata" property is a mapping, only the keys below are shown
MPRIS_METADATA_KEY = "Metadata"
MPRIS_METADATA_MAP = OrderedDict((
    ("xesam:title", "Title"),
    ))

IGNORED_IFACES_START = (
    "org.freedesktop",
    "org.qtproject",
    "org.kde.KMainWindow",
)  # commands in interface starting with these values will be ignored
FLAG_LOOP = "LOOP"

# Plugin metadata, keyed by constants of ``Const`` (imported as ``C``).
# The XEP-0050 dependency declared here is the plugin retrieved from
# ``host.plugins`` in the ``AdHocDBus`` constructor; C.PI_MAIN names the class
# defined below.
PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - D-Bus",
    C.PI_IMPORT_NAME: "AD_HOC_DBUS",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0050"],
    C.PI_MAIN: "AdHocDBus",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands"""),
}


class AdHocDBus:

    def __init__(self, host):
        """Register the bridge methods and the media player namespace.

        ``ad_hoc_dbus_add_auto`` is only exposed on the bridge when lxml is
        available, as it is needed to parse introspection data.

        @param host: backend instance; its ``bridge``, ``plugins`` and
            ``register_namespace`` attributes are used here
        """
        plugin_name = PLUGIN_INFO[C.PI_NAME]
        log.info(f"plugin {plugin_name!r} initialization")
        self.host = host

        if etree is not None:
            host.bridge.add_method(
                "ad_hoc_dbus_add_auto", ".plugin",
                in_sign="sasasasasasass", out_sign="(sa(sss))",
                method=self._ad_hoc_dbus_add_auto, async_=True,
            )
        host.bridge.add_method(
            "ad_hoc_remotes_get", ".plugin",
            in_sign="s", out_sign="a(sss)",
            method=self._ad_hoc_remotes_get, async_=True,
        )

        # XEP-0050 plugin (declared in C.PI_DEPENDENCIES)
        self._c = host.plugins["XEP-0050"]
        host.register_namespace("mediaplayer", NS_MEDIA_PLAYER)
        # the session bus connection is created when a profile gets connected
        self.session_con = None

    async def profile_connected(self, client):
        """Connect to the session bus if needed and add the media players command.

        Nothing is done when txdbus is not available.

        @param client: client of the profile which has just been connected
        """
        if txdbus is None:
            return

        if self.session_con is None:
            # the connection is shared by all profiles, it is only made once
            self.session_con = await dbus_client.connect(reactor, 'session')
            bus_object = await self.session_con.getRemoteObject(FD_NAME, FD_PATH)
            self.fd_object = bus_object

        # 6 hours timeout, to avoid breaking remote in the middle of a movie
        session_timeout = 60*60*6
        self._c.add_ad_hoc_command(
            client,
            self.local_media_cb,
            D_("Media Players"),
            node=NS_MEDIA_PLAYER,
            timeout=session_timeout,
        )

    async def _dbus_async_call(self, proxy, method, *args, **kwargs):
        """ Call a DBus method asynchronously and return a deferred

        @param proxy: DBus object proxy, as returner by get_object
        @param method: name of the method to call
        @param args: will be transmitted to the method
        @param kwargs: will be transmetted to the method, except for the following poped
                       values:
                       - interface: name of the interface to use
        @return: a deferred

        """
        return await proxy.callRemote(method, *args, **kwargs)

    async def _dbus_get_property(self, proxy, interface, name):
        return await self._dbus_async_call(
            proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties")


    async def _dbus_list_names(self):
        return await self.fd_object.callRemote("ListNames")

    async def _dbus_introspect(self, proxy):
        """Call the introspection method of a remote object

        @param proxy: D-Bus remote object to introspect
        @return: result of the call (parsed as XML by ``_introspect``)
        """
        call_kwargs = {"interface": INTROSPECT_IFACE}
        return await self._dbus_async_call(proxy, INTROSPECT_METHOD, **call_kwargs)

    def _accept_method(self, method):
        """ Return True if we accept the method for a command
        @param method: etree.Element
        @return: True if the method is acceptable

        """
        if method.xpath(
            "arg[@direction='in']"
        ):  # we don't accept method with argument for the moment
            return False
        return True

    async def _introspect(self, methods, bus_name, proxy):
        """Recursively collect the accepted methods of an object and its children

        @param methods: set updated in place with
            ``(object path, interface name, method name)`` tuples
        @param bus_name: bus name used to get proxies of child nodes
        @param proxy: D-Bus remote object to introspect
        """
        log.debug("introspecting path [%s]" % proxy.object_path)
        assert etree is not None
        xml_data = await self._dbus_introspect(proxy)
        root_elt = etree.fromstring(xml_data)

        for child in root_elt.iterchildren("node", "interface"):
            if child.tag == "interface":
                name = child.get("name")
                # interfaces with an ignored prefix are skipped entirely
                if name.startswith(IGNORED_IFACES_START):
                    log.debug("interface [%s] is ignored" % name)
                    continue
                log.debug("introspecting interface [%s]" % name)
                for method_elt in child.iterchildren("method"):
                    if not self._accept_method(method_elt):
                        continue
                    method_name = method_elt.get("name")
                    log.debug("method accepted: [%s]" % method_name)
                    methods.add((proxy.object_path, name, method_name))
            elif child.tag == "node":
                # child object: we introspect it with its own proxy
                child_path = os.path.join(proxy.object_path, child.get("name"))
                child_proxy = await self.session_con.getRemoteObject(
                    bus_name, child_path
                )
                await self._introspect(methods, bus_name, child_proxy)

    def _ad_hoc_dbus_add_auto(self, prog_name, allowed_jids, allowed_groups, allowed_magics,
                          forbidden_jids, forbidden_groups, flags, profile_key):
        client = self.host.get_client(profile_key)
        return self.ad_hoc_dbus_add_auto(
            client, prog_name, allowed_jids, allowed_groups, allowed_magics,
            forbidden_jids, forbidden_groups, flags)

    async def ad_hoc_dbus_add_auto(self, client, prog_name, allowed_jids=None, allowed_groups=None,
                         allowed_magics=None, forbidden_jids=None, forbidden_groups=None,
                         flags=None):
        """Find the bus of a program and add an ad-hoc command with its methods

        The bus is selected among names containing ``"." + prog_name``: the first
        one (in sorted order) ending with ``prog_name`` is used.

        @param client: client which will hold the ad-hoc command
        @param prog_name: name of the program to look for
        @param flags: transmitted to ``_add_command`` (FLAG_LOOP is checked there)
        @return (tuple): bus name and found methods, or ``("", [])`` if no bus
            has been found. When a bus is found, methods is a set of
            ``(object path, interface, method name)`` tuples.

        Other arguments are transmitted to ``_add_command``.
        """
        needle = "." + prog_name
        candidates = sorted(
            name for name in await self._dbus_list_names() if needle in name
        )
        if not candidates:
            log.info("Can't find any bus for [%s]" % prog_name)
            return ("", [])

        bus_name = next(
            (name for name in candidates if name.endswith(prog_name)), None
        )
        if bus_name is None:
            log.info(f"Can't find any command for {prog_name}")
            return ("", [])
        log.info(f"bus name found: {bus_name}")

        # all objects are introspected, starting from the root one
        root_proxy = await self.session_con.getRemoteObject(bus_name, "/")
        methods = set()
        await self._introspect(methods, bus_name, root_proxy)

        if methods:
            access_kwargs = {
                "allowed_jids": allowed_jids,
                "allowed_groups": allowed_groups,
                "allowed_magics": allowed_magics,
                "forbidden_jids": forbidden_jids,
                "forbidden_groups": forbidden_groups,
            }
            self._add_command(
                client, prog_name, bus_name, methods, flags=flags, **access_kwargs
            )

        return (str(bus_name), methods)

    def _add_command(self, client, adhoc_name, bus_name, methods, allowed_jids=None,
                    allowed_groups=None, allowed_magics=None, forbidden_jids=None,
                    forbidden_groups=None, flags=None):
        """Add an ad-hoc command letting a requester call D-Bus methods

        @param client: client which will hold the ad-hoc command
        @param adhoc_name: name of the ad-hoc command
        @param bus_name: bus name of the objects holding the methods
        @param methods: iterable of ``(object path, interface, method name)``
        @param flags: container checked for FLAG_LOOP: if present, the command
            selection form is sent again after each call instead of ending the
            session

        The ``allowed_*`` and ``forbidden_*`` arguments are transmitted unchanged
        to ``add_ad_hoc_command`` of the XEP-0050 plugin.
        """
        if flags is None:
            flags = set()

        async def d_bus_callback(client, command_elt, session_data, action, node):
            """Ad-hoc callback: first send the selection form, then do the call

            @return (tuple): (payload, status, None, note)
            @raise AdHocError: the answer is invalid (BAD_PAYLOAD) or the
                session got more than 2 requests (INTERNAL)
            """
            actions = session_data.setdefault("actions", [])
            names_map = session_data.setdefault("names_map", {})
            actions.append(action)

            if len(actions) == 1:
                # it's our first request, we ask the desired new status
                status = self._c.STATUS.EXECUTING
                form = data_form.Form("form", title=_("Command selection"))
                options = []
                for path, iface, command in methods:
                    label = command.rsplit(".", 1)[-1]
                    # a random name is used as option value, so the requester can
                    # only select a method which is in names_map
                    name = str(uuid.uuid4())
                    names_map[name] = (path, iface, command)
                    options.append(data_form.Option(name, label))

                field = data_form.Field(
                    "list-single", "command", options=options, required=True
                )
                form.addField(field)

                payload = form.toElement()
                note = None

            elif len(actions) == 2:
                # we should have the answer here
                try:
                    x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                    answer_form = data_form.Form.fromElement(x_elt)
                    command = answer_form["command"]
                except (KeyError, StopIteration):
                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

                if command not in names_map:
                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

                path, iface, command = names_map[command]
                proxy = await self.session_con.getRemoteObject(bus_name, path)

                await self._dbus_async_call(proxy, command, interface=iface)

                # job done, we can end the session, except if we have FLAG_LOOP
                if FLAG_LOOP in flags:
                    # We have a loop, so we clear everything and we execute again the
                    # command as we had a first call (command_elt is not used, so None
                    # is OK)
                    del actions[:]
                    names_map.clear()
                    return await d_bus_callback(
                        client, None, session_data, self._c.ACTION.EXECUTE, node
                    )
                # no form is sent back on completion, only a note
                status = self._c.STATUS.COMPLETED
                payload = None
                note = (self._c.NOTE.INFO, _("Command sent"))
            else:
                raise self._c.AdHocError(self._c.ERROR.INTERNAL)

            return (payload, status, None, note)

        self._c.add_ad_hoc_command(
            client,
            d_bus_callback,
            adhoc_name,
            allowed_jids=allowed_jids,
            allowed_groups=allowed_groups,
            allowed_magics=allowed_magics,
            forbidden_jids=forbidden_jids,
            forbidden_groups=forbidden_groups,
        )

    ## Local media ##

    def _ad_hoc_remotes_get(self, profile):
        return self.ad_hoc_remotes_get(self.host.get_client(profile))

    async def ad_hoc_remotes_get(self, client):
        """Retrieve available remote media controllers in our devices

        Each found device advertising the media player ad-hoc node is requested,
        and the media players of its selection form are collected. Devices which
        can't be requested are logged and skipped.

        @return (list[tuple[unicode, unicode, unicode]]): list of devices with:
            - entity full jid
            - device name
            - device label
        """
        found_data = await defer.ensureDeferred(self.host.find_by_features(
            client, [self.host.ns_map['commands']], service=False, roster=False,
            own_jid=True, local_device=True))

        remotes = []

        for found in found_data:
            for device_jid_s in found:
                device_jid = jid.JID(device_jid_s)
                cmd_list = await self._c.list_commands(client, device_jid)
                for cmd in cmd_list:
                    if cmd.nodeIdentifier == NS_MEDIA_PLAYER:
                        try:
                            result_elt = await self._c.do(client, device_jid,
                                                          NS_MEDIA_PLAYER, timeout=5)
                            command_elt = self._c.get_command_elt(result_elt)
                            form = data_form.findForm(command_elt, NS_MEDIA_PLAYER)
                            if form is None:
                                continue
                            mp_options = form.fields['media_player'].options
                            session_id = command_elt.getAttribute('sessionid')
                            if mp_options and session_id:
                                # we just want to discover player, so we cancel the
                                # session (the result of the cancellation is not
                                # waited for)
                                self._c.do(client, device_jid, NS_MEDIA_PLAYER,
                                           action=self._c.ACTION.CANCEL,
                                           session_id=session_id)

                            for opt in mp_options:
                                remotes.append((device_jid_s,
                                                opt.value,
                                                opt.label or opt.value))
                        except Exception as e:
                            # the message must be translated before being
                            # formatted, otherwise the lookup is done on the
                            # formatted text and can never match
                            log.warning(_(
                                "Can't retrieve remote controllers on {device_jid}: "
                                "{reason}").format(device_jid=device_jid, reason=e))
                        break
        return remotes

    async def do_mpris_command(self, proxy, command):
        """Run a command on a media player

        CMD_GO_BACK and CMD_GO_FWD are converted to ``Seek`` calls, with
        respectively ``-SEEK_OFFSET`` and ``SEEK_OFFSET`` as argument. Other
        commands are called as is, without argument.

        @param proxy: D-Bus remote object of the media player
        @param command: fully qualified command (``<interface>.<method>``)
        @return: result of the D-Bus call
        """
        iface, method = command.rsplit(".", 1)
        seek_offsets = {CMD_GO_BACK: -SEEK_OFFSET, CMD_GO_FWD: SEEK_OFFSET}
        if method in seek_offsets:
            call_args = [seek_offsets[method]]
            method = 'Seek'
        else:
            call_args = []
        return await self._dbus_async_call(proxy, method, *call_args, interface=iface)

    def add_mpris_metadata(self, form, metadata):
        """Serialise MPRIS Metadata according to MPRIS_METADATA_MAP

        @param form: form where a "fixed" field is added for each mapped key
            found in metadata
        @param metadata: mapping of MPRIS metadata
        """
        for mpris_key, field_name in MPRIS_METADATA_MAP.items():
            if mpris_key not in metadata:
                continue
            field = data_form.Field(
                fieldType="fixed", var=field_name, value=str(metadata[mpris_key])
            )
            form.addField(field)

    async def local_media_cb(self, client, command_elt, session_data, action, node):
        """Ad-hoc callback of the media players remote control

        Without submitted form (or with an empty one), a form to select one of
        the running MPRIS media players is sent. Otherwise the submitted command
        (if any) is run on the selected player, and the remote control form is
        sent back with the current properties of the player.

        The submitted values come from the requesting entity, thus the bus name
        and the command are both checked before being used with D-Bus.

        @param command_elt: ``command`` element of the request
        @return (tuple): (payload, status, None, note)
        @raise ValueError: a form has been submitted without ``media_player``
        """
        assert txdbus is not None
        try:
            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
        except StopIteration:
            command_form = None
        else:
            command_form = data_form.Form.fromElement(x_elt)

        if command_form is None or not command_form.fields:
            # root request, we looks for media players
            options = []
            for bus in await self._dbus_list_names():
                if not bus.startswith(MPRIS_PREFIX):
                    continue
                player_name = bus[len(MPRIS_PREFIX)+1:]
                if not player_name:
                    log.warning(_("Ignoring MPRIS bus without suffix"))
                    continue
                options.append(data_form.Option(bus, player_name))
            if not options:
                # no usable player: we don't send a form without any option
                note = (self._c.NOTE.INFO, D_("No media player found."))
                return (None, self._c.STATUS.COMPLETED, None, note)
            form = data_form.Form("form", title=D_("Media Player Selection"),
                                  formNamespace=NS_MEDIA_PLAYER)
            form.addField(data_form.Field(
                "list-single", "media_player", options=options, required=True
            ))
            return (form.toElement(), self._c.STATUS.EXECUTING, None, None)

        # player request
        try:
            bus_name = command_form["media_player"]
        except KeyError:
            raise ValueError(_("missing media_player value"))

        if not bus_name.startswith(MPRIS_PREFIX):
            log.warning(_("Media player ad-hoc command trying to use non MPRIS bus. "
                          "Hack attempt? Refused bus: {bus_name}").format(
                          bus_name=bus_name))
            note = (self._c.NOTE.ERROR, D_("Invalid player name."))
            return (None, self._c.STATUS.COMPLETED, None, note)

        try:
            command = command_form["command"]
        except KeyError:
            # no command submitted, we only send the remote control form
            command = None
        else:
            # only the commands proposed in the form are accepted, otherwise the
            # requester could call any method of any interface of the player
            if command not in MPRIS_COMMANDS:
                log.warning(_("Media player ad-hoc command trying to use an unknown "
                              "command. Refused command: {command}").format(
                              command=command))
                note = (self._c.NOTE.ERROR, D_("Invalid command."))
                return (None, self._c.STATUS.COMPLETED, None, note)

        try:
            proxy = await self.session_con.getRemoteObject(
                bus_name, MPRIS_PATH, "org.mpris.MediaPlayer2.Player"
            )
        except Exception as e:
            log.warning(_("Can't get D-Bus proxy: {reason}").format(reason=e))
            note = (self._c.NOTE.ERROR, D_("Media player is not available anymore"))
            return (None, self._c.STATUS.COMPLETED, None, note)

        if command is not None:
            await self.do_mpris_command(proxy, command)

        # we construct the remote control form
        form = data_form.Form("form", title=D_("Media Player Selection"))
        # the selected player is kept in a hidden field, so it is submitted
        # again with the next command
        form.addField(data_form.Field(fieldType="hidden",
                                      var="media_player",
                                      value=bus_name))
        for iface, properties_names in MPRIS_PROPERTIES.items():
            for name in properties_names:
                try:
                    value = await self._dbus_get_property(proxy, iface, name)
                except Exception as e:
                    # a missing property must not prevent to send the form
                    log.warning(_("Can't retrieve attribute {name}: {reason}")
                                .format(name=name, reason=e))
                    continue
                if name == MPRIS_METADATA_KEY:
                    self.add_mpris_metadata(form, value)
                else:
                    form.addField(data_form.Field(fieldType="fixed",
                                                  var=name,
                                                  value=str(value)))

        commands = [data_form.Option(c, c.rsplit(".", 1)[1]) for c in MPRIS_COMMANDS]
        form.addField(data_form.Field(fieldType="list-single",
                                      var="command",
                                      options=commands,
                                      required=True))

        return (form.toElement(), self._c.STATUS.EXECUTING, None, None)