diff libervia/backend/plugins/plugin_adhoc_dbus.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_adhoc_dbus.py@524856bd7b19
children 319a0e47dc8b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_adhoc_dbus.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,478 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for adding D-Bus to Ad-Hoc Commands
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from libervia.backend.core.i18n import D_, _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from wokkel import data_form
+
+try:
+    from lxml import etree
+except ImportError:
+    etree = None
+    log.warning("Missing module lxml, please download/install it from http://lxml.de/ ."
+                "Auto D-Bus discovery will be disabled")
+from collections import OrderedDict
+import os.path
+import uuid
+try:
+    import dbus
+    from dbus.mainloop.glib import DBusGMainLoop
+except ImportError:
+    dbus = None
+    log.warning("Missing module dbus, please download/install it, "
+                "auto D-Bus discovery will be disabled")
+
+else:
+    DBusGMainLoop(set_as_default=True)
+
+NS_MEDIA_PLAYER = "org.libervia.mediaplayer"
+FD_NAME = "org.freedesktop.DBus"
+FD_PATH = "/org/freedekstop/DBus"
+INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable"
+MPRIS_PREFIX = "org.mpris.MediaPlayer2"
+CMD_GO_BACK = "GoBack"
+CMD_GO_FWD = "GoFW"
+SEEK_OFFSET = 5 * 1000 * 1000
+MPRIS_COMMANDS = ["org.mpris.MediaPlayer2.Player." + cmd for cmd in (
+    "Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")]
+MPRIS_PATH = "/org/mpris/MediaPlayer2"
+MPRIS_PROPERTIES = OrderedDict((
+    ("org.mpris.MediaPlayer2", (
+        "Identity",
+        )),
+    ("org.mpris.MediaPlayer2.Player", (
+        "Metadata",
+        "PlaybackStatus",
+        "Volume",
+        )),
+    ))
+MPRIS_METADATA_KEY = "Metadata"
+MPRIS_METADATA_MAP = OrderedDict((
+    ("xesam:title", "Title"),
+    ))
+
+INTROSPECT_METHOD = "Introspect"
+IGNORED_IFACES_START = (
+    "org.freedesktop",
+    "org.qtproject",
+    "org.kde.KMainWindow",
+)  # commands in interface starting with these values will be ignored
+FLAG_LOOP = "LOOP"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Ad-Hoc Commands - D-Bus",
+    C.PI_IMPORT_NAME: "AD_HOC_DBUS",
+    C.PI_TYPE: "Misc",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0050"],
+    C.PI_MAIN: "AdHocDBus",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands"""),
+}
+
+
+class AdHocDBus(object):
+
+    def __init__(self, host):
+        log.info(_("plugin Ad-Hoc D-Bus initialization"))
+        self.host = host
+        if etree is not None:
+            host.bridge.add_method(
+                "ad_hoc_dbus_add_auto",
+                ".plugin",
+                in_sign="sasasasasasass",
+                out_sign="(sa(sss))",
+                method=self._ad_hoc_dbus_add_auto,
+                async_=True,
+            )
+        host.bridge.add_method(
+            "ad_hoc_remotes_get",
+            ".plugin",
+            in_sign="s",
+            out_sign="a(sss)",
+            method=self._ad_hoc_remotes_get,
+            async_=True,
+        )
+        self._c = host.plugins["XEP-0050"]
+        host.register_namespace("mediaplayer", NS_MEDIA_PLAYER)
+        if dbus is not None:
+            self.session_bus = dbus.SessionBus()
+            self.fd_object = self.session_bus.get_object(
+                FD_NAME, FD_PATH, introspect=False)
+
+    def profile_connected(self, client):
+        if dbus is not None:
+            self._c.add_ad_hoc_command(
+                client, self.local_media_cb, D_("Media Players"),
+                node=NS_MEDIA_PLAYER,
+                timeout=60*60*6  # 6 hours timeout, to avoid breaking remote
+                                 # in the middle of a movie
+            )
+
+    def _dbus_async_call(self, proxy, method, *args, **kwargs):
+        """ Call a DBus method asynchronously and return a deferred
+
+        @param proxy: DBus object proxy, as returner by get_object
+        @param method: name of the method to call
+        @param args: will be transmitted to the method
+        @param kwargs: will be transmetted to the method, except for the following poped
+                       values:
+                       - interface: name of the interface to use
+        @return: a deferred
+
+        """
+        d = defer.Deferred()
+        interface = kwargs.pop("interface", None)
+        kwargs["reply_handler"] = lambda ret=None: d.callback(ret)
+        kwargs["error_handler"] = d.errback
+        proxy.get_dbus_method(method, dbus_interface=interface)(*args, **kwargs)
+        return d
+
+    def _dbus_get_property(self, proxy, interface, name):
+        return self._dbus_async_call(
+            proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties")
+
+
+    def _dbus_list_names(self):
+        return self._dbus_async_call(self.fd_object, "ListNames")
+
+    def _dbus_introspect(self, proxy):
+        return self._dbus_async_call(proxy, INTROSPECT_METHOD, interface=INTROSPECT_IFACE)
+
+    def _accept_method(self, method):
+        """ Return True if we accept the method for a command
+        @param method: etree.Element
+        @return: True if the method is acceptable
+
+        """
+        if method.xpath(
+            "arg[@direction='in']"
+        ):  # we don't accept method with argument for the moment
+            return False
+        return True
+
+    @defer.inlineCallbacks
+    def _introspect(self, methods, bus_name, proxy):
+        log.debug("introspecting path [%s]" % proxy.object_path)
+        introspect_xml = yield self._dbus_introspect(proxy)
+        el = etree.fromstring(introspect_xml)
+        for node in el.iterchildren("node", "interface"):
+            if node.tag == "node":
+                new_path = os.path.join(proxy.object_path, node.get("name"))
+                new_proxy = self.session_bus.get_object(
+                    bus_name, new_path, introspect=False
+                )
+                yield self._introspect(methods, bus_name, new_proxy)
+            elif node.tag == "interface":
+                name = node.get("name")
+                if any(name.startswith(ignored) for ignored in IGNORED_IFACES_START):
+                    log.debug("interface [%s] is ignored" % name)
+                    continue
+                log.debug("introspecting interface [%s]" % name)
+                for method in node.iterchildren("method"):
+                    if self._accept_method(method):
+                        method_name = method.get("name")
+                        log.debug("method accepted: [%s]" % method_name)
+                        methods.add((proxy.object_path, name, method_name))
+
+    def _ad_hoc_dbus_add_auto(self, prog_name, allowed_jids, allowed_groups, allowed_magics,
+                          forbidden_jids, forbidden_groups, flags, profile_key):
+        client = self.host.get_client(profile_key)
+        return self.ad_hoc_dbus_add_auto(
+            client, prog_name, allowed_jids, allowed_groups, allowed_magics,
+            forbidden_jids, forbidden_groups, flags)
+
+    @defer.inlineCallbacks
+    def ad_hoc_dbus_add_auto(self, client, prog_name, allowed_jids=None, allowed_groups=None,
+                         allowed_magics=None, forbidden_jids=None, forbidden_groups=None,
+                         flags=None):
+        bus_names = yield self._dbus_list_names()
+        bus_names = [bus_name for bus_name in bus_names if "." + prog_name in bus_name]
+        if not bus_names:
+            log.info("Can't find any bus for [%s]" % prog_name)
+            defer.returnValue(("", []))
+        bus_names.sort()
+        for bus_name in bus_names:
+            if bus_name.endswith(prog_name):
+                break
+        log.info("bus name found: [%s]" % bus_name)
+        proxy = self.session_bus.get_object(bus_name, "/", introspect=False)
+        methods = set()
+
+        yield self._introspect(methods, bus_name, proxy)
+
+        if methods:
+            self._add_command(
+                client,
+                prog_name,
+                bus_name,
+                methods,
+                allowed_jids=allowed_jids,
+                allowed_groups=allowed_groups,
+                allowed_magics=allowed_magics,
+                forbidden_jids=forbidden_jids,
+                forbidden_groups=forbidden_groups,
+                flags=flags,
+            )
+
+        defer.returnValue((str(bus_name), methods))
+
+    def _add_command(self, client, adhoc_name, bus_name, methods, allowed_jids=None,
+                    allowed_groups=None, allowed_magics=None, forbidden_jids=None,
+                    forbidden_groups=None, flags=None):
+        if flags is None:
+            flags = set()
+
+        def d_bus_callback(client, command_elt, session_data, action, node):
+            actions = session_data.setdefault("actions", [])
+            names_map = session_data.setdefault("names_map", {})
+            actions.append(action)
+
+            if len(actions) == 1:
+                # it's our first request, we ask the desired new status
+                status = self._c.STATUS.EXECUTING
+                form = data_form.Form("form", title=_("Command selection"))
+                options = []
+                for path, iface, command in methods:
+                    label = command.rsplit(".", 1)[-1]
+                    name = str(uuid.uuid4())
+                    names_map[name] = (path, iface, command)
+                    options.append(data_form.Option(name, label))
+
+                field = data_form.Field(
+                    "list-single", "command", options=options, required=True
+                )
+                form.addField(field)
+
+                payload = form.toElement()
+                note = None
+
+            elif len(actions) == 2:
+                # we should have the answer here
+                try:
+                    x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
+                    answer_form = data_form.Form.fromElement(x_elt)
+                    command = answer_form["command"]
+                except (KeyError, StopIteration):
+                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
+
+                if command not in names_map:
+                    raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
+
+                path, iface, command = names_map[command]
+                proxy = self.session_bus.get_object(bus_name, path)
+
+                self._dbus_async_call(proxy, command, interface=iface)
+
+                # job done, we can end the session, except if we have FLAG_LOOP
+                if FLAG_LOOP in flags:
+                    # We have a loop, so we clear everything and we execute again the
+                    # command as we had a first call (command_elt is not used, so None
+                    # is OK)
+                    del actions[:]
+                    names_map.clear()
+                    return d_bus_callback(
+                        client, None, session_data, self._c.ACTION.EXECUTE, node
+                    )
+                form = data_form.Form("form", title=_("Updated"))
+                form.addField(data_form.Field("fixed", "Command sent"))
+                status = self._c.STATUS.COMPLETED
+                payload = None
+                note = (self._c.NOTE.INFO, _("Command sent"))
+            else:
+                raise self._c.AdHocError(self._c.ERROR.INTERNAL)
+
+            return (payload, status, None, note)
+
+        self._c.add_ad_hoc_command(
+            client,
+            d_bus_callback,
+            adhoc_name,
+            allowed_jids=allowed_jids,
+            allowed_groups=allowed_groups,
+            allowed_magics=allowed_magics,
+            forbidden_jids=forbidden_jids,
+            forbidden_groups=forbidden_groups,
+        )
+
+    ## Local media ##
+
+    def _ad_hoc_remotes_get(self, profile):
+        return self.ad_hoc_remotes_get(self.host.get_client(profile))
+
+    @defer.inlineCallbacks
+    def ad_hoc_remotes_get(self, client):
+        """Retrieve available remote media controlers in our devices
+        @return (list[tuple[unicode, unicode, unicode]]): list of devices with:
+            - entity full jid
+            - device name
+            - device label
+        """
+        found_data = yield defer.ensureDeferred(self.host.find_by_features(
+            client, [self.host.ns_map['commands']], service=False, roster=False,
+            own_jid=True, local_device=True))
+
+        remotes = []
+
+        for found in found_data:
+            for device_jid_s in found:
+                device_jid = jid.JID(device_jid_s)
+                cmd_list = yield self._c.list(client, device_jid)
+                for cmd in cmd_list:
+                    if cmd.nodeIdentifier == NS_MEDIA_PLAYER:
+                        try:
+                            result_elt = yield self._c.do(client, device_jid,
+                                                          NS_MEDIA_PLAYER, timeout=5)
+                            command_elt = self._c.get_command_elt(result_elt)
+                            form = data_form.findForm(command_elt, NS_MEDIA_PLAYER)
+                            if form is None:
+                                continue
+                            mp_options = form.fields['media_player'].options
+                            session_id = command_elt.getAttribute('sessionid')
+                            if mp_options and session_id:
+                                # we just want to discover player, so we cancel the
+                                # session
+                                self._c.do(client, device_jid, NS_MEDIA_PLAYER,
+                                           action=self._c.ACTION.CANCEL,
+                                           session_id=session_id)
+
+                            for opt in mp_options:
+                                remotes.append((device_jid_s,
+                                                opt.value,
+                                                opt.label or opt.value))
+                        except Exception as e:
+                            log.warning(_(
+                                "Can't retrieve remote controllers on {device_jid}: "
+                                "{reason}".format(device_jid=device_jid, reason=e)))
+                        break
+        defer.returnValue(remotes)
+
+    def do_mpris_command(self, proxy, command):
+        iface, command = command.rsplit(".", 1)
+        if command == CMD_GO_BACK:
+            command = 'Seek'
+            args = [-SEEK_OFFSET]
+        elif command == CMD_GO_FWD:
+            command = 'Seek'
+            args = [SEEK_OFFSET]
+        else:
+            args = []
+        return self._dbus_async_call(proxy, command, *args, interface=iface)
+
+    def add_mpris_metadata(self, form, metadata):
+        """Serialise MRPIS Metadata according to MPRIS_METADATA_MAP"""
+        for mpris_key, name in MPRIS_METADATA_MAP.items():
+            if mpris_key in metadata:
+                value = str(metadata[mpris_key])
+                form.addField(data_form.Field(fieldType="fixed",
+                                              var=name,
+                                              value=value))
+
+    @defer.inlineCallbacks
+    def local_media_cb(self, client, command_elt, session_data, action, node):
+        try:
+            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
+            command_form = data_form.Form.fromElement(x_elt)
+        except StopIteration:
+            command_form = None
+
+        if command_form is None or len(command_form.fields) == 0:
+            # root request, we looks for media players
+            bus_names = yield self._dbus_list_names()
+            bus_names = [b for b in bus_names if b.startswith(MPRIS_PREFIX)]
+            if len(bus_names) == 0:
+                note = (self._c.NOTE.INFO, D_("No media player found."))
+                defer.returnValue((None, self._c.STATUS.COMPLETED, None, note))
+            options = []
+            status = self._c.STATUS.EXECUTING
+            form = data_form.Form("form", title=D_("Media Player Selection"),
+                                  formNamespace=NS_MEDIA_PLAYER)
+            for bus in bus_names:
+                player_name = bus[len(MPRIS_PREFIX)+1:]
+                if not player_name:
+                    log.warning(_("Ignoring MPRIS bus without suffix"))
+                    continue
+                options.append(data_form.Option(bus, player_name))
+            field = data_form.Field(
+                "list-single", "media_player", options=options, required=True
+            )
+            form.addField(field)
+            payload = form.toElement()
+            defer.returnValue((payload, status, None, None))
+        else:
+            # player request
+            try:
+                bus_name = command_form["media_player"]
+            except KeyError:
+                raise ValueError(_("missing media_player value"))
+
+            if not bus_name.startswith(MPRIS_PREFIX):
+                log.warning(_("Media player ad-hoc command trying to use non MPRIS bus. "
+                              "Hack attempt? Refused bus: {bus_name}").format(
+                              bus_name=bus_name))
+                note = (self._c.NOTE.ERROR, D_("Invalid player name."))
+                defer.returnValue((None, self._c.STATUS.COMPLETED, None, note))
+
+            try:
+                proxy = self.session_bus.get_object(bus_name, MPRIS_PATH)
+            except dbus.exceptions.DBusException as e:
+                log.warning(_("Can't get D-Bus proxy: {reason}").format(reason=e))
+                note = (self._c.NOTE.ERROR, D_("Media player is not available anymore"))
+                defer.returnValue((None, self._c.STATUS.COMPLETED, None, note))
+            try:
+                command = command_form["command"]
+            except KeyError:
+                pass
+            else:
+                yield self.do_mpris_command(proxy, command)
+
+            # we construct the remote control form
+            form = data_form.Form("form", title=D_("Media Player Selection"))
+            form.addField(data_form.Field(fieldType="hidden",
+                                          var="media_player",
+                                          value=bus_name))
+            for iface, properties_names in MPRIS_PROPERTIES.items():
+                for name in properties_names:
+                    try:
+                        value = yield self._dbus_get_property(proxy, iface, name)
+                    except Exception as e:
+                        log.warning(_("Can't retrieve attribute {name}: {reason}")
+                                    .format(name=name, reason=e))
+                        continue
+                    if name == MPRIS_METADATA_KEY:
+                        self.add_mpris_metadata(form, value)
+                    else:
+                        form.addField(data_form.Field(fieldType="fixed",
+                                                      var=name,
+                                                      value=str(value)))
+
+            commands = [data_form.Option(c, c.rsplit(".", 1)[1]) for c in MPRIS_COMMANDS]
+            form.addField(data_form.Field(fieldType="list-single",
+                                          var="command",
+                                          options=commands,
+                                          required=True))
+
+            payload = form.toElement()
+            status = self._c.STATUS.EXECUTING
+            defer.returnValue((payload, status, None, None))