Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_adhoc_dbus.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_adhoc_dbus.py@524856bd7b19 |
children | 319a0e47dc8b |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_adhoc_dbus.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,478 @@ +#!/usr/bin/env python3 + + +# SAT plugin for adding D-Bus to Ad-Hoc Commands +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from libervia.backend.core.i18n import D_, _ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger + +log = getLogger(__name__) +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from wokkel import data_form + +try: + from lxml import etree +except ImportError: + etree = None + log.warning("Missing module lxml, please download/install it from http://lxml.de/ ." + "Auto D-Bus discovery will be disabled") +from collections import OrderedDict +import os.path +import uuid +try: + import dbus + from dbus.mainloop.glib import DBusGMainLoop +except ImportError: + dbus = None + log.warning("Missing module dbus, please download/install it, " + "auto D-Bus discovery will be disabled") + +else: + DBusGMainLoop(set_as_default=True) + +NS_MEDIA_PLAYER = "org.libervia.mediaplayer" +FD_NAME = "org.freedesktop.DBus" +FD_PATH = "/org/freedekstop/DBus" +INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable" +MPRIS_PREFIX = "org.mpris.MediaPlayer2" +CMD_GO_BACK = "GoBack" +CMD_GO_FWD = "GoFW" +SEEK_OFFSET = 5 * 1000 * 1000 +MPRIS_COMMANDS = ["org.mpris.MediaPlayer2.Player." + cmd for cmd in ( + "Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")] +MPRIS_PATH = "/org/mpris/MediaPlayer2" +MPRIS_PROPERTIES = OrderedDict(( + ("org.mpris.MediaPlayer2", ( + "Identity", + )), + ("org.mpris.MediaPlayer2.Player", ( + "Metadata", + "PlaybackStatus", + "Volume", + )), + )) +MPRIS_METADATA_KEY = "Metadata" +MPRIS_METADATA_MAP = OrderedDict(( + ("xesam:title", "Title"), + )) + +INTROSPECT_METHOD = "Introspect" +IGNORED_IFACES_START = ( + "org.freedesktop", + "org.qtproject", + "org.kde.KMainWindow", +) # commands in interface starting with these values will be ignored +FLAG_LOOP = "LOOP" + +PLUGIN_INFO = { + C.PI_NAME: "Ad-Hoc Commands - D-Bus", + C.PI_IMPORT_NAME: "AD_HOC_DBUS", + C.PI_TYPE: "Misc", + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0050"], + C.PI_MAIN: "AdHocDBus", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands"""), +} + + +class AdHocDBus(object): + + def __init__(self, host): + log.info(_("plugin Ad-Hoc D-Bus initialization")) + self.host = host + if etree is not None: + host.bridge.add_method( + "ad_hoc_dbus_add_auto", + ".plugin", + in_sign="sasasasasasass", + out_sign="(sa(sss))", + method=self._ad_hoc_dbus_add_auto, + async_=True, + ) + host.bridge.add_method( + "ad_hoc_remotes_get", + ".plugin", + in_sign="s", + out_sign="a(sss)", + method=self._ad_hoc_remotes_get, + async_=True, + ) + self._c = host.plugins["XEP-0050"] + host.register_namespace("mediaplayer", NS_MEDIA_PLAYER) + if dbus is not None: + self.session_bus = dbus.SessionBus() + self.fd_object = self.session_bus.get_object( + FD_NAME, FD_PATH, introspect=False) + + def profile_connected(self, client): + if dbus is not None: + self._c.add_ad_hoc_command( + client, self.local_media_cb, D_("Media Players"), + node=NS_MEDIA_PLAYER, + timeout=60*60*6 # 6 hours timeout, to avoid breaking remote + # in the middle of a movie + ) + + def _dbus_async_call(self, proxy, method, *args, **kwargs): + """ Call a DBus method asynchronously and return a deferred + + @param proxy: DBus object proxy, as returner by get_object + @param method: name of the method to call + @param args: will be transmitted to the method + @param kwargs: will be transmetted to the method, except for the following poped + values: + - interface: name of the interface to use + @return: a deferred + + """ + d = defer.Deferred() + interface = kwargs.pop("interface", None) + kwargs["reply_handler"] = lambda ret=None: d.callback(ret) + kwargs["error_handler"] = d.errback + proxy.get_dbus_method(method, dbus_interface=interface)(*args, **kwargs) + return d + + def _dbus_get_property(self, proxy, interface, name): + return self._dbus_async_call( + proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties") + + + def _dbus_list_names(self): + return self._dbus_async_call(self.fd_object, "ListNames") + + def _dbus_introspect(self, proxy): + return self._dbus_async_call(proxy, INTROSPECT_METHOD, interface=INTROSPECT_IFACE) + + def _accept_method(self, method): + """ Return True if we accept the method for a command + @param method: etree.Element + @return: True if the method is acceptable + + """ + if method.xpath( + "arg[@direction='in']" + ): # we don't accept method with argument for the moment + return False + return True + + @defer.inlineCallbacks + def _introspect(self, methods, bus_name, proxy): + log.debug("introspecting path [%s]" % proxy.object_path) + introspect_xml = yield self._dbus_introspect(proxy) + el = etree.fromstring(introspect_xml) + for node in el.iterchildren("node", "interface"): + if node.tag == "node": + new_path = os.path.join(proxy.object_path, node.get("name")) + new_proxy = self.session_bus.get_object( + bus_name, new_path, introspect=False + ) + yield self._introspect(methods, bus_name, new_proxy) + elif node.tag == "interface": + name = node.get("name") + if any(name.startswith(ignored) for ignored in IGNORED_IFACES_START): + log.debug("interface [%s] is ignored" % name) + continue + log.debug("introspecting interface [%s]" % name) + for method in node.iterchildren("method"): + if self._accept_method(method): + method_name = method.get("name") + log.debug("method accepted: [%s]" % method_name) + methods.add((proxy.object_path, name, method_name)) + + def _ad_hoc_dbus_add_auto(self, prog_name, allowed_jids, allowed_groups, allowed_magics, + forbidden_jids, forbidden_groups, flags, profile_key): + client = self.host.get_client(profile_key) + return self.ad_hoc_dbus_add_auto( + client, prog_name, allowed_jids, allowed_groups, allowed_magics, + forbidden_jids, forbidden_groups, flags) + + @defer.inlineCallbacks + def ad_hoc_dbus_add_auto(self, client, prog_name, allowed_jids=None, allowed_groups=None, + allowed_magics=None, forbidden_jids=None, forbidden_groups=None, + flags=None): + bus_names = yield self._dbus_list_names() + bus_names = [bus_name for bus_name in bus_names if "." + prog_name in bus_name] + if not bus_names: + log.info("Can't find any bus for [%s]" % prog_name) + defer.returnValue(("", [])) + bus_names.sort() + for bus_name in bus_names: + if bus_name.endswith(prog_name): + break + log.info("bus name found: [%s]" % bus_name) + proxy = self.session_bus.get_object(bus_name, "/", introspect=False) + methods = set() + + yield self._introspect(methods, bus_name, proxy) + + if methods: + self._add_command( + client, + prog_name, + bus_name, + methods, + allowed_jids=allowed_jids, + allowed_groups=allowed_groups, + allowed_magics=allowed_magics, + forbidden_jids=forbidden_jids, + forbidden_groups=forbidden_groups, + flags=flags, + ) + + defer.returnValue((str(bus_name), methods)) + + def _add_command(self, client, adhoc_name, bus_name, methods, allowed_jids=None, + allowed_groups=None, allowed_magics=None, forbidden_jids=None, + forbidden_groups=None, flags=None): + if flags is None: + flags = set() + + def d_bus_callback(client, command_elt, session_data, action, node): + actions = session_data.setdefault("actions", []) + names_map = session_data.setdefault("names_map", {}) + actions.append(action) + + if len(actions) == 1: + # it's our first request, we ask the desired new status + status = self._c.STATUS.EXECUTING + form = data_form.Form("form", title=_("Command selection")) + options = [] + for path, iface, command in methods: + label = command.rsplit(".", 1)[-1] + name = str(uuid.uuid4()) + names_map[name] = (path, iface, command) + options.append(data_form.Option(name, label)) + + field = data_form.Field( + "list-single", "command", options=options, required=True + ) + form.addField(field) + + payload = form.toElement() + note = None + + elif len(actions) == 2: + # we should have the answer here + try: + x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) + answer_form = data_form.Form.fromElement(x_elt) + command = answer_form["command"] + except (KeyError, StopIteration): + raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) + + if command not in names_map: + raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) + + path, iface, command = names_map[command] + proxy = self.session_bus.get_object(bus_name, path) + + self._dbus_async_call(proxy, command, interface=iface) + + # job done, we can end the session, except if we have FLAG_LOOP + if FLAG_LOOP in flags: + # We have a loop, so we clear everything and we execute again the + # command as we had a first call (command_elt is not used, so None + # is OK) + del actions[:] + names_map.clear() + return d_bus_callback( + client, None, session_data, self._c.ACTION.EXECUTE, node + ) + form = data_form.Form("form", title=_("Updated")) + form.addField(data_form.Field("fixed", "Command sent")) + status = self._c.STATUS.COMPLETED + payload = None + note = (self._c.NOTE.INFO, _("Command sent")) + else: + raise self._c.AdHocError(self._c.ERROR.INTERNAL) + + return (payload, status, None, note) + + self._c.add_ad_hoc_command( + client, + d_bus_callback, + adhoc_name, + allowed_jids=allowed_jids, + allowed_groups=allowed_groups, + allowed_magics=allowed_magics, + forbidden_jids=forbidden_jids, + forbidden_groups=forbidden_groups, + ) + + ## Local media ## + + def _ad_hoc_remotes_get(self, profile): + return self.ad_hoc_remotes_get(self.host.get_client(profile)) + + @defer.inlineCallbacks + def ad_hoc_remotes_get(self, client): + """Retrieve available remote media controlers in our devices + @return (list[tuple[unicode, unicode, unicode]]): list of devices with: + - entity full jid + - device name + - device label + """ + found_data = yield defer.ensureDeferred(self.host.find_by_features( + client, [self.host.ns_map['commands']], service=False, roster=False, + own_jid=True, local_device=True)) + + remotes = [] + + for found in found_data: + for device_jid_s in found: + device_jid = jid.JID(device_jid_s) + cmd_list = yield self._c.list(client, device_jid) + for cmd in cmd_list: + if cmd.nodeIdentifier == NS_MEDIA_PLAYER: + try: + result_elt = yield self._c.do(client, device_jid, + NS_MEDIA_PLAYER, timeout=5) + command_elt = self._c.get_command_elt(result_elt) + form = data_form.findForm(command_elt, NS_MEDIA_PLAYER) + if form is None: + continue + mp_options = form.fields['media_player'].options + session_id = command_elt.getAttribute('sessionid') + if mp_options and session_id: + # we just want to discover player, so we cancel the + # session + self._c.do(client, device_jid, NS_MEDIA_PLAYER, + action=self._c.ACTION.CANCEL, + session_id=session_id) + + for opt in mp_options: + remotes.append((device_jid_s, + opt.value, + opt.label or opt.value)) + except Exception as e: + log.warning(_( + "Can't retrieve remote controllers on {device_jid}: " + "{reason}".format(device_jid=device_jid, reason=e))) + break + defer.returnValue(remotes) + + def do_mpris_command(self, proxy, command): + iface, command = command.rsplit(".", 1) + if command == CMD_GO_BACK: + command = 'Seek' + args = [-SEEK_OFFSET] + elif command == CMD_GO_FWD: + command = 'Seek' + args = [SEEK_OFFSET] + else: + args = [] + return self._dbus_async_call(proxy, command, *args, interface=iface) + + def add_mpris_metadata(self, form, metadata): + """Serialise MRPIS Metadata according to MPRIS_METADATA_MAP""" + for mpris_key, name in MPRIS_METADATA_MAP.items(): + if mpris_key in metadata: + value = str(metadata[mpris_key]) + form.addField(data_form.Field(fieldType="fixed", + var=name, + value=value)) + + @defer.inlineCallbacks + def local_media_cb(self, client, command_elt, session_data, action, node): + try: + x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) + command_form = data_form.Form.fromElement(x_elt) + except StopIteration: + command_form = None + + if command_form is None or len(command_form.fields) == 0: + # root request, we looks for media players + bus_names = yield self._dbus_list_names() + bus_names = [b for b in bus_names if b.startswith(MPRIS_PREFIX)] + if len(bus_names) == 0: + note = (self._c.NOTE.INFO, D_("No media player found.")) + defer.returnValue((None, self._c.STATUS.COMPLETED, None, note)) + options = [] + status = self._c.STATUS.EXECUTING + form = data_form.Form("form", title=D_("Media Player Selection"), + formNamespace=NS_MEDIA_PLAYER) + for bus in bus_names: + player_name = bus[len(MPRIS_PREFIX)+1:] + if not player_name: + log.warning(_("Ignoring MPRIS bus without suffix")) + continue + options.append(data_form.Option(bus, player_name)) + field = data_form.Field( + "list-single", "media_player", options=options, required=True + ) + form.addField(field) + payload = form.toElement() + defer.returnValue((payload, status, None, None)) + else: + # player request + try: + bus_name = command_form["media_player"] + except KeyError: + raise ValueError(_("missing media_player value")) + + if not bus_name.startswith(MPRIS_PREFIX): + log.warning(_("Media player ad-hoc command trying to use non MPRIS bus. " + "Hack attempt? Refused bus: {bus_name}").format( + bus_name=bus_name)) + note = (self._c.NOTE.ERROR, D_("Invalid player name.")) + defer.returnValue((None, self._c.STATUS.COMPLETED, None, note)) + + try: + proxy = self.session_bus.get_object(bus_name, MPRIS_PATH) + except dbus.exceptions.DBusException as e: + log.warning(_("Can't get D-Bus proxy: {reason}").format(reason=e)) + note = (self._c.NOTE.ERROR, D_("Media player is not available anymore")) + defer.returnValue((None, self._c.STATUS.COMPLETED, None, note)) + try: + command = command_form["command"] + except KeyError: + pass + else: + yield self.do_mpris_command(proxy, command) + + # we construct the remote control form + form = data_form.Form("form", title=D_("Media Player Selection")) + form.addField(data_form.Field(fieldType="hidden", + var="media_player", + value=bus_name)) + for iface, properties_names in MPRIS_PROPERTIES.items(): + for name in properties_names: + try: + value = yield self._dbus_get_property(proxy, iface, name) + except Exception as e: + log.warning(_("Can't retrieve attribute {name}: {reason}") + .format(name=name, reason=e)) + continue + if name == MPRIS_METADATA_KEY: + self.add_mpris_metadata(form, value) + else: + form.addField(data_form.Field(fieldType="fixed", + var=name, + value=str(value))) + + commands = [data_form.Option(c, c.rsplit(".", 1)[1]) for c in MPRIS_COMMANDS] + form.addField(data_form.Field(fieldType="list-single", + var="command", + options=commands, + required=True)) + + payload = form.toElement() + status = self._c.STATUS.EXECUTING + defer.returnValue((payload, status, None, None))