diff libervia/backend/plugins/plugin_adhoc_dbus.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 50c919dfe61b
children
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_adhoc_dbus.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_adhoc_dbus.py	Wed Jun 19 18:44:57 2024 +0200
@@ -37,8 +37,10 @@
     from lxml import etree
 except ImportError:
     etree = None
-    log.warning("Missing module lxml, please download/install it from http://lxml.de/ ."
-                "Auto D-Bus discovery will be disabled")
+    log.warning(
+        "Missing module lxml, please download/install it from http://lxml.de/ ."
+        "Auto D-Bus discovery will be disabled"
+    )
 
 try:
     import txdbus
@@ -59,23 +61,26 @@
 CMD_GO_BACK = "GoBack"
 CMD_GO_FWD = "GoFW"
 SEEK_OFFSET = 5 * 1000 * 1000
-MPRIS_COMMANDS = ["org.mpris.MediaPlayer2.Player." + cmd for cmd in (
-    "Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")]
+MPRIS_COMMANDS = [
+    "org.mpris.MediaPlayer2.Player." + cmd
+    for cmd in ("Previous", CMD_GO_BACK, "PlayPause", CMD_GO_FWD, "Next")
+]
 MPRIS_PATH = "/org/mpris/MediaPlayer2"
-MPRIS_PROPERTIES = OrderedDict((
-    ("org.mpris.MediaPlayer2", (
-        "Identity",
-        )),
-    ("org.mpris.MediaPlayer2.Player", (
-        "Metadata",
-        "PlaybackStatus",
-        "Volume",
-        )),
-    ))
+MPRIS_PROPERTIES = OrderedDict(
+    (
+        ("org.mpris.MediaPlayer2", ("Identity",)),
+        (
+            "org.mpris.MediaPlayer2.Player",
+            (
+                "Metadata",
+                "PlaybackStatus",
+                "Volume",
+            ),
+        ),
+    )
+)
 MPRIS_METADATA_KEY = "Metadata"
-MPRIS_METADATA_MAP = OrderedDict((
-    ("xesam:title", "Title"),
-    ))
+MPRIS_METADATA_MAP = OrderedDict((("xesam:title", "Title"),))
 
 INTROSPECT_METHOD = "Introspect"
 IGNORED_IFACES_START = (
@@ -126,18 +131,20 @@
     async def profile_connected(self, client):
         if txdbus is not None:
             if self.session_con is None:
-                self.session_con = await dbus_client.connect(reactor, 'session')
+                self.session_con = await dbus_client.connect(reactor, "session")
                 self.fd_object = await self.session_con.getRemoteObject(FD_NAME, FD_PATH)
 
             self._c.add_ad_hoc_command(
-                client, self.local_media_cb, D_("Media Players"),
+                client,
+                self.local_media_cb,
+                D_("Media Players"),
                 node=NS_MEDIA_PLAYER,
-                timeout=60*60*6  # 6 hours timeout, to avoid breaking remote
-                                 # in the middle of a movie
+                timeout=60 * 60 * 6,  # 6 hours timeout, to avoid breaking remote
+                # in the middle of a movie
             )
 
     async def _dbus_async_call(self, proxy, method, *args, **kwargs):
-        """ Call a DBus method asynchronously and return a deferred
+        """Call a DBus method asynchronously and return a deferred
 
         @param proxy: DBus object proxy, as returner by get_object
         @param method: name of the method to call
@@ -152,17 +159,19 @@
 
     async def _dbus_get_property(self, proxy, interface, name):
         return await self._dbus_async_call(
-            proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties")
-
+            proxy, "Get", interface, name, interface="org.freedesktop.DBus.Properties"
+        )
 
     async def _dbus_list_names(self):
         return await self.fd_object.callRemote("ListNames")
 
     async def _dbus_introspect(self, proxy):
-        return await self._dbus_async_call(proxy, INTROSPECT_METHOD, interface=INTROSPECT_IFACE)
+        return await self._dbus_async_call(
+            proxy, INTROSPECT_METHOD, interface=INTROSPECT_IFACE
+        )
 
     def _accept_method(self, method):
-        """ Return True if we accept the method for a command
+        """Return True if we accept the method for a command
         @param method: etree.Element
         @return: True if the method is acceptable
 
@@ -181,9 +190,7 @@
         for node in el.iterchildren("node", "interface"):
             if node.tag == "node":
                 new_path = os.path.join(proxy.object_path, node.get("name"))
-                new_proxy = await self.session_con.getRemoteObject(
-                    bus_name, new_path
-                )
+                new_proxy = await self.session_con.getRemoteObject(bus_name, new_path)
                 await self._introspect(methods, bus_name, new_proxy)
             elif node.tag == "interface":
                 name = node.get("name")
@@ -197,16 +204,40 @@
                         log.debug("method accepted: [%s]" % method_name)
                         methods.add((proxy.object_path, name, method_name))
 
-    def _ad_hoc_dbus_add_auto(self, prog_name, allowed_jids, allowed_groups, allowed_magics,
-                          forbidden_jids, forbidden_groups, flags, profile_key):
+    def _ad_hoc_dbus_add_auto(
+        self,
+        prog_name,
+        allowed_jids,
+        allowed_groups,
+        allowed_magics,
+        forbidden_jids,
+        forbidden_groups,
+        flags,
+        profile_key,
+    ):
         client = self.host.get_client(profile_key)
         return self.ad_hoc_dbus_add_auto(
-            client, prog_name, allowed_jids, allowed_groups, allowed_magics,
-            forbidden_jids, forbidden_groups, flags)
+            client,
+            prog_name,
+            allowed_jids,
+            allowed_groups,
+            allowed_magics,
+            forbidden_jids,
+            forbidden_groups,
+            flags,
+        )
 
-    async def ad_hoc_dbus_add_auto(self, client, prog_name, allowed_jids=None, allowed_groups=None,
-                         allowed_magics=None, forbidden_jids=None, forbidden_groups=None,
-                         flags=None):
+    async def ad_hoc_dbus_add_auto(
+        self,
+        client,
+        prog_name,
+        allowed_jids=None,
+        allowed_groups=None,
+        allowed_magics=None,
+        forbidden_jids=None,
+        forbidden_groups=None,
+        flags=None,
+    ):
         bus_names = await self._dbus_list_names()
         bus_names = [bus_name for bus_name in bus_names if "." + prog_name in bus_name]
         if not bus_names:
@@ -241,9 +272,19 @@
 
         return (str(bus_name), methods)
 
-    def _add_command(self, client, adhoc_name, bus_name, methods, allowed_jids=None,
-                    allowed_groups=None, allowed_magics=None, forbidden_jids=None,
-                    forbidden_groups=None, flags=None):
+    def _add_command(
+        self,
+        client,
+        adhoc_name,
+        bus_name,
+        methods,
+        allowed_jids=None,
+        allowed_groups=None,
+        allowed_magics=None,
+        forbidden_jids=None,
+        forbidden_groups=None,
+        flags=None,
+    ):
         if flags is None:
             flags = set()
 
@@ -331,9 +372,16 @@
             - device name
             - device label
         """
-        found_data = await defer.ensureDeferred(self.host.find_by_features(
-            client, [self.host.ns_map['commands']], service=False, roster=False,
-            own_jid=True, local_device=True))
+        found_data = await defer.ensureDeferred(
+            self.host.find_by_features(
+                client,
+                [self.host.ns_map["commands"]],
+                service=False,
+                roster=False,
+                own_jid=True,
+                local_device=True,
+            )
+        )
 
         remotes = []
 
@@ -344,39 +392,47 @@
                 for cmd in cmd_list:
                     if cmd.nodeIdentifier == NS_MEDIA_PLAYER:
                         try:
-                            result_elt = await self._c.do(client, device_jid,
-                                                          NS_MEDIA_PLAYER, timeout=5)
+                            result_elt = await self._c.do(
+                                client, device_jid, NS_MEDIA_PLAYER, timeout=5
+                            )
                             command_elt = self._c.get_command_elt(result_elt)
                             form = data_form.findForm(command_elt, NS_MEDIA_PLAYER)
                             if form is None:
                                 continue
-                            mp_options = form.fields['media_player'].options
-                            session_id = command_elt.getAttribute('sessionid')
+                            mp_options = form.fields["media_player"].options
+                            session_id = command_elt.getAttribute("sessionid")
                             if mp_options and session_id:
                                 # we just want to discover player, so we cancel the
                                 # session
-                                self._c.do(client, device_jid, NS_MEDIA_PLAYER,
-                                           action=self._c.ACTION.CANCEL,
-                                           session_id=session_id)
+                                self._c.do(
+                                    client,
+                                    device_jid,
+                                    NS_MEDIA_PLAYER,
+                                    action=self._c.ACTION.CANCEL,
+                                    session_id=session_id,
+                                )
 
                             for opt in mp_options:
-                                remotes.append((device_jid_s,
-                                                opt.value,
-                                                opt.label or opt.value))
+                                remotes.append(
+                                    (device_jid_s, opt.value, opt.label or opt.value)
+                                )
                         except Exception as e:
-                            log.warning(_(
-                                "Can't retrieve remote controllers on {device_jid}: "
-                                "{reason}".format(device_jid=device_jid, reason=e)))
+                            log.warning(
+                                _(
+                                    "Can't retrieve remote controllers on {device_jid}: "
+                                    "{reason}".format(device_jid=device_jid, reason=e)
+                                )
+                            )
                         break
         return remotes
 
     async def do_mpris_command(self, proxy, command):
         iface, command = command.rsplit(".", 1)
         if command == CMD_GO_BACK:
-            command = 'Seek'
+            command = "Seek"
             args = [-SEEK_OFFSET]
         elif command == CMD_GO_FWD:
-            command = 'Seek'
+            command = "Seek"
             args = [SEEK_OFFSET]
         else:
             args = []
@@ -387,9 +443,7 @@
         for mpris_key, name in MPRIS_METADATA_MAP.items():
             if mpris_key in metadata:
                 value = str(metadata[mpris_key])
-                form.addField(data_form.Field(fieldType="fixed",
-                                              var=name,
-                                              value=value))
+                form.addField(data_form.Field(fieldType="fixed", var=name, value=value))
 
     async def local_media_cb(self, client, command_elt, session_data, action, node):
         assert txdbus is not None
@@ -408,10 +462,11 @@
                 return (None, self._c.STATUS.COMPLETED, None, note)
             options = []
             status = self._c.STATUS.EXECUTING
-            form = data_form.Form("form", title=D_("Media Player Selection"),
-                                  formNamespace=NS_MEDIA_PLAYER)
+            form = data_form.Form(
+                "form", title=D_("Media Player Selection"), formNamespace=NS_MEDIA_PLAYER
+            )
             for bus in bus_names:
-                player_name = bus[len(MPRIS_PREFIX)+1:]
+                player_name = bus[len(MPRIS_PREFIX) + 1 :]
                 if not player_name:
                     log.warning(_("Ignoring MPRIS bus without suffix"))
                     continue
@@ -430,9 +485,12 @@
                 raise ValueError(_("missing media_player value"))
 
             if not bus_name.startswith(MPRIS_PREFIX):
-                log.warning(_("Media player ad-hoc command trying to use non MPRIS bus. "
-                              "Hack attempt? Refused bus: {bus_name}").format(
-                              bus_name=bus_name))
+                log.warning(
+                    _(
+                        "Media player ad-hoc command trying to use non MPRIS bus. "
+                        "Hack attempt? Refused bus: {bus_name}"
+                    ).format(bus_name=bus_name)
+                )
                 note = (self._c.NOTE.ERROR, D_("Invalid player name."))
                 return (None, self._c.STATUS.COMPLETED, None, note)
 
@@ -453,29 +511,36 @@
 
             # we construct the remote control form
             form = data_form.Form("form", title=D_("Media Player Selection"))
-            form.addField(data_form.Field(fieldType="hidden",
-                                          var="media_player",
-                                          value=bus_name))
+            form.addField(
+                data_form.Field(fieldType="hidden", var="media_player", value=bus_name)
+            )
             for iface, properties_names in MPRIS_PROPERTIES.items():
                 for name in properties_names:
                     try:
                         value = await self._dbus_get_property(proxy, iface, name)
                     except Exception as e:
-                        log.warning(_("Can't retrieve attribute {name}: {reason}")
-                                    .format(name=name, reason=e))
+                        log.warning(
+                            _("Can't retrieve attribute {name}: {reason}").format(
+                                name=name, reason=e
+                            )
+                        )
                         continue
                     if name == MPRIS_METADATA_KEY:
                         self.add_mpris_metadata(form, value)
                     else:
-                        form.addField(data_form.Field(fieldType="fixed",
-                                                      var=name,
-                                                      value=str(value)))
+                        form.addField(
+                            data_form.Field(fieldType="fixed", var=name, value=str(value))
+                        )
 
             commands = [data_form.Option(c, c.rsplit(".", 1)[1]) for c in MPRIS_COMMANDS]
-            form.addField(data_form.Field(fieldType="list-single",
-                                          var="command",
-                                          options=commands,
-                                          required=True))
+            form.addField(
+                data_form.Field(
+                    fieldType="list-single",
+                    var="command",
+                    options=commands,
+                    required=True,
+                )
+            )
 
             payload = form.toElement()
             status = self._c.STATUS.EXECUTING