diff libervia/backend/plugins/plugin_import.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_import.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_import.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,334 @@
+#!/usr/bin/env python3
+
+
+# SàT plugin for generic data import handling
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from libervia.backend.core.i18n import _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+from twisted.internet import defer
+from libervia.backend.core import exceptions
+from twisted.words.protocols.jabber import jid
+from functools import partial
+import collections
+import uuid
+import json
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "import",
+    C.PI_IMPORT_NAME: "IMPORT",
+    C.PI_TYPE: C.PLUG_TYPE_IMPORT,
+    C.PI_DEPENDENCIES: [],
+    C.PI_MAIN: "ImportPlugin",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Generic import plugin, base for specialized importers"""),
+}
+
+Importer = collections.namedtuple("Importer", ("callback", "short_desc", "long_desc"))
+
+
+class ImportPlugin(object):
+    def __init__(self, host):
+        log.info(_("plugin import initialization"))
+        self.host = host
+
+    def initialize(self, import_handler, name):
+        """Initialize a specialized import handler
+
+        @param import_handler(object): specialized import handler instance
+            must have the following methods:
+                - import_item: import a single main item (i.e. prepare data for publishing)
+                - importSubitems: import sub items (i.e. items linked to main item, e.g. comments).
+                    Must return a dict with kwargs for recursive_import if items are to be imported recursively.
+                    At least "items_import_data", "service" and "node" keys must be provided.
+                    if None is returned, no recursion will be done to import subitems, but import can still be done directly by the method.
+                - publish_item: actualy publish an item
+                - item_filters: modify item according to options
+        @param name(unicode): import handler name
+        """
+        assert name == name.lower().strip()
+        log.info(_("initializing {name} import handler").format(name=name))
+        import_handler.name = name
+        import_handler.register = partial(self.register, import_handler)
+        import_handler.unregister = partial(self.unregister, import_handler)
+        import_handler.importers = {}
+
+        def _import(name, location, options, pubsub_service, pubsub_node, profile):
+            return self._do_import(
+                import_handler,
+                name,
+                location,
+                options,
+                pubsub_service,
+                pubsub_node,
+                profile,
+            )
+
+        def _import_list():
+            return self.list_importers(import_handler)
+
+        def _import_desc(name):
+            return self.getDescription(import_handler, name)
+
+        self.host.bridge.add_method(
+            name + "import",
+            ".plugin",
+            in_sign="ssa{ss}sss",
+            out_sign="s",
+            method=_import,
+            async_=True,
+        )
+        self.host.bridge.add_method(
+            name + "ImportList",
+            ".plugin",
+            in_sign="",
+            out_sign="a(ss)",
+            method=_import_list,
+        )
+        self.host.bridge.add_method(
+            name + "ImportDesc",
+            ".plugin",
+            in_sign="s",
+            out_sign="(ss)",
+            method=_import_desc,
+        )
+
+    def get_progress(self, import_handler, progress_id, profile):
+        client = self.host.get_client(profile)
+        return client._import[import_handler.name][progress_id]
+
+    def list_importers(self, import_handler):
+        importers = list(import_handler.importers.keys())
+        importers.sort()
+        return [
+            (name, import_handler.importers[name].short_desc)
+            for name in import_handler.importers
+        ]
+
+    def getDescription(self, import_handler, name):
+        """Return import short and long descriptions
+
+        @param name(unicode): importer name
+        @return (tuple[unicode,unicode]): short and long description
+        """
+        try:
+            importer = import_handler.importers[name]
+        except KeyError:
+            raise exceptions.NotFound(
+                "{handler_name} importer not found [{name}]".format(
+                    handler_name=import_handler.name, name=name
+                )
+            )
+        else:
+            return importer.short_desc, importer.long_desc
+
+    def _do_import(self, import_handler, name, location, options, pubsub_service="",
+                  pubsub_node="", profile=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile)
+        options = {key: str(value) for key, value in options.items()}
+        for option in import_handler.BOOL_OPTIONS:
+            try:
+                options[option] = C.bool(options[option])
+            except KeyError:
+                pass
+        for option in import_handler.JSON_OPTIONS:
+            try:
+                options[option] = json.loads(options[option])
+            except KeyError:
+                pass
+            except ValueError:
+                raise exceptions.DataError(
+                    _("invalid json option: {option}").format(option=option)
+                )
+        pubsub_service = jid.JID(pubsub_service) if pubsub_service else None
+        return self.do_import(
+            client,
+            import_handler,
+            str(name),
+            str(location),
+            options,
+            pubsub_service,
+            pubsub_node or None,
+        )
+
+    @defer.inlineCallbacks
+    def do_import(self, client, import_handler, name, location, options=None,
+                 pubsub_service=None, pubsub_node=None,):
+        """import data
+
+        @param import_handler(object): instance of the import handler
+        @param name(unicode): name of the importer
+        @param location(unicode): location of the data to import
+            can be an url, a file path, or anything which make sense
+            check importer description for more details
+        @param options(dict, None): extra options.
+        @param pubsub_service(jid.JID, None): jid of the PubSub service where data must be
+            imported.
+            None to use profile's server
+        @param pubsub_node(unicode, None): PubSub node to use
+            None to use importer's default node
+        @return (unicode): progress id
+        """
+        if options is None:
+            options = {}
+        else:
+            for opt_name, opt_default in import_handler.OPT_DEFAULTS.items():
+                # we want a filled options dict, with all empty or False values removed
+                try:
+                    value = options[opt_name]
+                except KeyError:
+                    if opt_default:
+                        options[opt_name] = opt_default
+                else:
+                    if not value:
+                        del options[opt_name]
+
+        try:
+            importer = import_handler.importers[name]
+        except KeyError:
+            raise exceptions.NotFound("Importer [{}] not found".format(name))
+        items_import_data, items_count = yield importer.callback(
+            client, location, options
+        )
+        progress_id = str(uuid.uuid4())
+        try:
+            _import = client._import
+        except AttributeError:
+            _import = client._import = {}
+        progress_data = _import.setdefault(import_handler.name, {})
+        progress_data[progress_id] = {"position": "0"}
+        if items_count is not None:
+            progress_data[progress_id]["size"] = str(items_count)
+        metadata = {
+            "name": "{}: {}".format(name, location),
+            "direction": "out",
+            "type": import_handler.name.upper() + "_IMPORT",
+        }
+        self.host.register_progress_cb(
+            progress_id,
+            partial(self.get_progress, import_handler),
+            metadata,
+            profile=client.profile,
+        )
+        self.host.bridge.progress_started(progress_id, metadata, client.profile)
+        session = {  #  session data, can be used by importers
+            "root_service": pubsub_service,
+            "root_node": pubsub_node,
+        }
+        self.recursive_import(
+            client,
+            import_handler,
+            items_import_data,
+            progress_id,
+            session,
+            options,
+            None,
+            pubsub_service,
+            pubsub_node,
+        )
+        defer.returnValue(progress_id)
+
+    @defer.inlineCallbacks
+    def recursive_import(
+        self,
+        client,
+        import_handler,
+        items_import_data,
+        progress_id,
+        session,
+        options,
+        return_data=None,
+        service=None,
+        node=None,
+        depth=0,
+    ):
+        """Do the import recursively
+
+        @param import_handler(object): instance of the import handler
+        @param items_import_data(iterable): iterable of data as specified in [register]
+        @param progress_id(unicode): id of progression
+        @param session(dict): data for this import session
+            can be used by importer so store any useful data
+            "root_service" and "root_node" are set to the main pubsub service and node of the import
+        @param options(dict): import options
+        @param return_data(dict): data to return on progress_finished
+        @param service(jid.JID, None): PubSub service to use
+        @param node(unicode, None): PubSub node to use
+        @param depth(int): level of recursion
+        """
+        if return_data is None:
+            return_data = {}
+        for idx, item_import_data in enumerate(items_import_data):
+            item_data = yield import_handler.import_item(
+                client, item_import_data, session, options, return_data, service, node
+            )
+            yield import_handler.item_filters(client, item_data, session, options)
+            recurse_kwargs = yield import_handler.import_sub_items(
+                client, item_import_data, item_data, session, options
+            )
+            yield import_handler.publish_item(client, item_data, service, node, session)
+
+            if recurse_kwargs is not None:
+                recurse_kwargs["client"] = client
+                recurse_kwargs["import_handler"] = import_handler
+                recurse_kwargs["progress_id"] = progress_id
+                recurse_kwargs["session"] = session
+                recurse_kwargs.setdefault("options", options)
+                recurse_kwargs["return_data"] = return_data
+                recurse_kwargs["depth"] = depth + 1
+                log.debug(_("uploading subitems"))
+                yield self.recursive_import(**recurse_kwargs)
+
+            if depth == 0:
+                client._import[import_handler.name][progress_id]["position"] = str(
+                    idx + 1
+                )
+
+        if depth == 0:
+            self.host.bridge.progress_finished(progress_id, return_data, client.profile)
+            self.host.remove_progress_cb(progress_id, client.profile)
+            del client._import[import_handler.name][progress_id]
+
+    def register(self, import_handler, name, callback, short_desc="", long_desc=""):
+        """Register an Importer method
+
+        @param name(unicode): unique importer name, should indicate the software it can import and always lowercase
+        @param callback(callable): method to call:
+            the signature must be (client, location, options) (cf. [do_import])
+            the importer must return a tuple with (items_import_data, items_count)
+            items_import_data(iterable[dict]) data specific to specialized importer
+                cf. import_item docstring of specialized importer for details
+            items_count (int, None) indicate the total number of items (without subitems)
+                useful to display a progress indicator when the iterator is a generator
+                use None if you can't guess the total number of items
+        @param short_desc(unicode): one line description of the importer
+        @param long_desc(unicode): long description of the importer, its options, etc.
+        """
+        name = name.lower()
+        if name in import_handler.importers:
+            raise exceptions.ConflictError(
+                _(
+                    "An {handler_name} importer with the name {name} already exist"
+                ).format(handler_name=import_handler.name, name=name)
+            )
+        import_handler.importers[name] = Importer(callback, short_desc, long_desc)
+
+    def unregister(self, import_handler, name):
+        del import_handler.importers[name]