diff sat/plugins/plugin_adhoc_dbus.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 8dd9db785ac8
line wrap: on
line diff
--- a/sat/plugins/plugin_adhoc_dbus.py	Wed Jun 27 07:51:29 2018 +0200
+++ b/sat/plugins/plugin_adhoc_dbus.py	Wed Jun 27 20:14:46 2018 +0200
@@ -20,18 +20,23 @@
 from sat.core.i18n import _
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
+
 log = getLogger(__name__)
 from sat.core import exceptions
 from twisted.internet import defer
 from wokkel import data_form
+
 try:
     from lxml import etree
 except ImportError:
-    raise exceptions.MissingModule(u"Missing module lxml, please download/install it from http://lxml.de/")
+    raise exceptions.MissingModule(
+        u"Missing module lxml, please download/install it from http://lxml.de/"
+    )
 import os.path
 import uuid
 import dbus
 from dbus.mainloop.glib import DBusGMainLoop
+
 DBusGMainLoop(set_as_default=True)
 
 FD_NAME = "org.freedesktop.DBus"
@@ -39,8 +44,12 @@
 INTROSPECT_IFACE = "org.freedesktop.DBus.Introspectable"
 
 INTROSPECT_METHOD = "Introspect"
-IGNORED_IFACES_START = ('org.freedesktop', 'org.qtproject', 'org.kde.KMainWindow') # commands in interface starting with these values will be ignored
-FLAG_LOOP = 'LOOP'
+IGNORED_IFACES_START = (
+    "org.freedesktop",
+    "org.qtproject",
+    "org.kde.KMainWindow",
+)  # commands in interface starting with these values will be ignored
+FLAG_LOOP = "LOOP"
 
 PLUGIN_INFO = {
     C.PI_NAME: "Ad-Hoc Commands - D-Bus",
@@ -50,21 +59,25 @@
     C.PI_DEPENDENCIES: ["XEP-0050"],
     C.PI_MAIN: "AdHocDBus",
     C.PI_HANDLER: "no",
-    C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands""")
+    C.PI_DESCRIPTION: _("""Add D-Bus management to Ad-Hoc commands"""),
 }
 
 
 class AdHocDBus(object):
-
     def __init__(self, host):
         log.info(_("plugin Ad-Hoc D-Bus initialization"))
         self.host = host
-        host.bridge.addMethod("adHocDBusAddAuto", ".plugin", in_sign='sasasasasasass', out_sign='(sa(sss))',
-                              method=self._adHocDBusAddAuto,
-                              async=True)
+        host.bridge.addMethod(
+            "adHocDBusAddAuto",
+            ".plugin",
+            in_sign="sasasasasasass",
+            out_sign="(sa(sss))",
+            method=self._adHocDBusAddAuto,
+            async=True,
+        )
         self.session_bus = dbus.SessionBus()
         self.fd_object = self.session_bus.get_object(FD_NAME, FD_PATH, introspect=False)
-        self.XEP_0050 = host.plugins['XEP-0050']
+        self.XEP_0050 = host.plugins["XEP-0050"]
 
     def _DBusAsyncCall(self, proxy, method, *args, **kwargs):
         """ Call a DBus method asynchronously and return a deferred
@@ -77,9 +90,9 @@
 
         """
         d = defer.Deferred()
-        interface = kwargs.pop('interface', None)
-        kwargs['reply_handler'] = lambda ret=None: d.callback(ret)
-        kwargs['error_handler'] = d.errback
+        interface = kwargs.pop("interface", None)
+        kwargs["reply_handler"] = lambda ret=None: d.callback(ret)
+        kwargs["error_handler"] = d.errback
         proxy.get_dbus_method(method, dbus_interface=interface)(*args, **kwargs)
         return d
 
@@ -95,7 +108,9 @@
         @return: True if the method is acceptable
 
         """
-        if method.xpath("arg[@direction='in']"): # we don't accept method with argument for the moment
+        if method.xpath(
+            "arg[@direction='in']"
+        ):  # we don't accept method with argument for the moment
             return False
         return True
 
@@ -104,30 +119,61 @@
         log.debug("introspecting path [%s]" % proxy.object_path)
         introspect_xml = yield self._DBusIntrospect(proxy)
         el = etree.fromstring(introspect_xml)
-        for node in el.iterchildren('node', 'interface'):
-            if node.tag == 'node':
-                new_path = os.path.join(proxy.object_path, node.get('name'))
-                new_proxy = self.session_bus.get_object(bus_name, new_path, introspect=False)
+        for node in el.iterchildren("node", "interface"):
+            if node.tag == "node":
+                new_path = os.path.join(proxy.object_path, node.get("name"))
+                new_proxy = self.session_bus.get_object(
+                    bus_name, new_path, introspect=False
+                )
                 yield self._introspect(methods, bus_name, new_proxy)
-            elif node.tag == 'interface':
-                name = node.get('name')
+            elif node.tag == "interface":
+                name = node.get("name")
                 if any(name.startswith(ignored) for ignored in IGNORED_IFACES_START):
-                    log.debug('interface [%s] is ignored' % name)
+                    log.debug("interface [%s] is ignored" % name)
                     continue
                 log.debug("introspecting interface [%s]" % name)
-                for method in node.iterchildren('method'):
+                for method in node.iterchildren("method"):
                     if self._acceptMethod(method):
-                        method_name = method.get('name')
+                        method_name = method.get("name")
                         log.debug("method accepted: [%s]" % method_name)
                         methods.add((proxy.object_path, name, method_name))
 
-    def _adHocDBusAddAuto(self, prog_name, allowed_jids, allowed_groups, allowed_magics, forbidden_jids, forbidden_groups, flags, profile_key):
-        return self.adHocDBusAddAuto(prog_name, allowed_jids, allowed_groups, allowed_magics, forbidden_jids, forbidden_groups, flags, profile_key)
+    def _adHocDBusAddAuto(
+        self,
+        prog_name,
+        allowed_jids,
+        allowed_groups,
+        allowed_magics,
+        forbidden_jids,
+        forbidden_groups,
+        flags,
+        profile_key,
+    ):
+        return self.adHocDBusAddAuto(
+            prog_name,
+            allowed_jids,
+            allowed_groups,
+            allowed_magics,
+            forbidden_jids,
+            forbidden_groups,
+            flags,
+            profile_key,
+        )
 
     @defer.inlineCallbacks
-    def adHocDBusAddAuto(self, prog_name, allowed_jids=None, allowed_groups=None, allowed_magics=None, forbidden_jids=None, forbidden_groups=None, flags=None, profile_key=C.PROF_KEY_NONE):
+    def adHocDBusAddAuto(
+        self,
+        prog_name,
+        allowed_jids=None,
+        allowed_groups=None,
+        allowed_magics=None,
+        forbidden_jids=None,
+        forbidden_groups=None,
+        flags=None,
+        profile_key=C.PROF_KEY_NONE,
+    ):
         bus_names = yield self._DBusListNames()
-        bus_names = [bus_name for bus_name in bus_names if '.' + prog_name in bus_name]
+        bus_names = [bus_name for bus_name in bus_names if "." + prog_name in bus_name]
         if not bus_names:
             log.info("Can't find any bus for [%s]" % prog_name)
             defer.returnValue(("", []))
@@ -136,45 +182,62 @@
             if bus_name.endswith(prog_name):
                 break
         log.info("bus name found: [%s]" % bus_name)
-        proxy = self.session_bus.get_object(bus_name, '/', introspect=False)
+        proxy = self.session_bus.get_object(bus_name, "/", introspect=False)
         methods = set()
 
         yield self._introspect(methods, bus_name, proxy)
 
         if methods:
-            self._addCommand(prog_name, bus_name, methods,
-                             allowed_jids = allowed_jids,
-                             allowed_groups = allowed_groups,
-                             allowed_magics = allowed_magics,
-                             forbidden_jids = forbidden_jids,
-                             forbidden_groups = forbidden_groups,
-                             flags = flags,
-                             profile_key = profile_key)
+            self._addCommand(
+                prog_name,
+                bus_name,
+                methods,
+                allowed_jids=allowed_jids,
+                allowed_groups=allowed_groups,
+                allowed_magics=allowed_magics,
+                forbidden_jids=forbidden_jids,
+                forbidden_groups=forbidden_groups,
+                flags=flags,
+                profile_key=profile_key,
+            )
 
         defer.returnValue((bus_name, methods))
 
-
-    def _addCommand(self, adhoc_name, bus_name, methods, allowed_jids=None, allowed_groups=None, allowed_magics=None, forbidden_jids=None, forbidden_groups=None, flags=None, profile_key=C.PROF_KEY_NONE):
+    def _addCommand(
+        self,
+        adhoc_name,
+        bus_name,
+        methods,
+        allowed_jids=None,
+        allowed_groups=None,
+        allowed_magics=None,
+        forbidden_jids=None,
+        forbidden_groups=None,
+        flags=None,
+        profile_key=C.PROF_KEY_NONE,
+    ):
         if flags is None:
             flags = set()
 
         def DBusCallback(command_elt, session_data, action, node, profile):
-            actions = session_data.setdefault('actions',[])
-            names_map = session_data.setdefault('names_map', {})
+            actions = session_data.setdefault("actions", [])
+            names_map = session_data.setdefault("names_map", {})
             actions.append(action)
 
             if len(actions) == 1:
                 # it's our first request, we ask the desired new status
                 status = self.XEP_0050.STATUS.EXECUTING
-                form = data_form.Form('form', title=_('Command selection'))
+                form = data_form.Form("form", title=_("Command selection"))
                 options = []
                 for path, iface, command in methods:
-                    label = command.rsplit('.',1)[-1]
+                    label = command.rsplit(".", 1)[-1]
                     name = str(uuid.uuid4())
                     names_map[name] = (path, iface, command)
                     options.append(data_form.Option(name, label))
 
-                field = data_form.Field('list-single', 'command', options=options, required=True)
+                field = data_form.Field(
+                    "list-single", "command", options=options, required=True
+                )
                 form.addField(field)
 
                 payload = form.toElement()
@@ -183,9 +246,9 @@
             elif len(actions) == 2:
                 # we should have the answer here
                 try:
-                    x_elt = command_elt.elements(data_form.NS_X_DATA,'x').next()
+                    x_elt = command_elt.elements(data_form.NS_X_DATA, "x").next()
                     answer_form = data_form.Form.fromElement(x_elt)
-                    command = answer_form['command']
+                    command = answer_form["command"]
                 except (KeyError, StopIteration):
                     raise self.XEP_0050.AdHocError(self.XEP_0050.ERROR.BAD_PAYLOAD)
 
@@ -202,9 +265,11 @@
                     # We have a loop, so we clear everything and we execute again the command as we had a first call (command_elt is not used, so None is OK)
                     del actions[:]
                     names_map.clear()
-                    return DBusCallback(None, session_data, self.XEP_0050.ACTION.EXECUTE, node, profile)
-                form = data_form.Form('form', title=_(u'Updated'))
-                form.addField(data_form.Field('fixed', u'Command sent'))
+                    return DBusCallback(
+                        None, session_data, self.XEP_0050.ACTION.EXECUTE, node, profile
+                    )
+                form = data_form.Form("form", title=_(u"Updated"))
+                form.addField(data_form.Field("fixed", u"Command sent"))
                 status = self.XEP_0050.STATUS.COMPLETED
                 payload = None
                 note = (self.XEP_0050.NOTE.INFO, _(u"Command sent"))
@@ -213,10 +278,13 @@
 
             return (payload, status, None, note)
 
-        self.XEP_0050.addAdHocCommand(DBusCallback, adhoc_name,
-                                      allowed_jids = allowed_jids,
-                                      allowed_groups = allowed_groups,
-                                      allowed_magics = allowed_magics,
-                                      forbidden_jids = forbidden_jids,
-                                      forbidden_groups = forbidden_groups,
-                                      profile_key = profile_key)
+        self.XEP_0050.addAdHocCommand(
+            DBusCallback,
+            adhoc_name,
+            allowed_jids=allowed_jids,
+            allowed_groups=allowed_groups,
+            allowed_magics=allowed_magics,
+            forbidden_jids=forbidden_jids,
+            forbidden_groups=forbidden_groups,
+            profile_key=profile_key,
+        )