view sat/plugins/plugin_import.py @ 3798:b5013bada4b6

core (memory/sqla): generic `get` + `session_add` in `delete`: new generic `get` allow to retrieve an ORM object with given column and value, and optionally join load attributes. In `delete`, it is now possible to specify instances to add to session, this allow, to update them in the same transaction. It is also now possible to specify a list of object to delete them all in the same transaction. rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents 04283582966f
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# SàT plugin for generic data import handling
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger

log = getLogger(__name__)
from twisted.internet import defer
from sat.core import exceptions
from twisted.words.protocols.jabber import jid
from functools import partial
import collections
import uuid
import json


PLUGIN_INFO = {
    C.PI_NAME: "import",
    C.PI_IMPORT_NAME: "IMPORT",
    C.PI_TYPE: C.PLUG_TYPE_IMPORT,
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "ImportPlugin",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Generic import plugin, base for specialized importers"""),
}

Importer = collections.namedtuple("Importer", ("callback", "short_desc", "long_desc"))


class ImportPlugin(object):
    def __init__(self, host):
        log.info(_("plugin Import initialization"))
        self.host = host

    def initialize(self, import_handler, name):
        """Initialize a specialized import handler

        @param import_handler(object): specialized import handler instance
            must have the following methods:
                - importItem: import a single main item (i.e. prepare data for publishing)
                - importSubitems: import sub items (i.e. items linked to main item, e.g. comments).
                    Must return a dict with kwargs for recursiveImport if items are to be imported recursively.
                    At least "items_import_data", "service" and "node" keys must be provided.
                    if None is returned, no recursion will be done to import subitems, but import can still be done directly by the method.
                - publishItem: actualy publish an item
                - itemFilters: modify item according to options
        @param name(unicode): import handler name
        """
        assert name == name.lower().strip()
        log.info(_("initializing {name} import handler").format(name=name))
        import_handler.name = name
        import_handler.register = partial(self.register, import_handler)
        import_handler.unregister = partial(self.unregister, import_handler)
        import_handler.importers = {}

        def _import(name, location, options, pubsub_service, pubsub_node, profile):
            return self._doImport(
                import_handler,
                name,
                location,
                options,
                pubsub_service,
                pubsub_node,
                profile,
            )

        def _importList():
            return self.listImporters(import_handler)

        def _importDesc(name):
            return self.getDescription(import_handler, name)

        self.host.bridge.addMethod(
            name + "Import",
            ".plugin",
            in_sign="ssa{ss}sss",
            out_sign="s",
            method=_import,
            async_=True,
        )
        self.host.bridge.addMethod(
            name + "ImportList",
            ".plugin",
            in_sign="",
            out_sign="a(ss)",
            method=_importList,
        )
        self.host.bridge.addMethod(
            name + "ImportDesc",
            ".plugin",
            in_sign="s",
            out_sign="(ss)",
            method=_importDesc,
        )

    def getProgress(self, import_handler, progress_id, profile):
        client = self.host.getClient(profile)
        return client._import[import_handler.name][progress_id]

    def listImporters(self, import_handler):
        importers = list(import_handler.importers.keys())
        importers.sort()
        return [
            (name, import_handler.importers[name].short_desc)
            for name in import_handler.importers
        ]

    def getDescription(self, import_handler, name):
        """Return import short and long descriptions

        @param name(unicode): importer name
        @return (tuple[unicode,unicode]): short and long description
        """
        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound(
                "{handler_name} importer not found [{name}]".format(
                    handler_name=import_handler.name, name=name
                )
            )
        else:
            return importer.short_desc, importer.long_desc

    def _doImport(self, import_handler, name, location, options, pubsub_service="",
                  pubsub_node="", profile=C.PROF_KEY_NONE):
        client = self.host.getClient(profile)
        options = {key: str(value) for key, value in options.items()}
        for option in import_handler.BOOL_OPTIONS:
            try:
                options[option] = C.bool(options[option])
            except KeyError:
                pass
        for option in import_handler.JSON_OPTIONS:
            try:
                options[option] = json.loads(options[option])
            except KeyError:
                pass
            except ValueError:
                raise exceptions.DataError(
                    _("invalid json option: {option}").format(option=option)
                )
        pubsub_service = jid.JID(pubsub_service) if pubsub_service else None
        return self.doImport(
            client,
            import_handler,
            str(name),
            str(location),
            options,
            pubsub_service,
            pubsub_node or None,
        )

    @defer.inlineCallbacks
    def doImport(self, client, import_handler, name, location, options=None,
                 pubsub_service=None, pubsub_node=None,):
        """Import data

        @param import_handler(object): instance of the import handler
        @param name(unicode): name of the importer
        @param location(unicode): location of the data to import
            can be an url, a file path, or anything which make sense
            check importer description for more details
        @param options(dict, None): extra options.
        @param pubsub_service(jid.JID, None): jid of the PubSub service where data must be
            imported.
            None to use profile's server
        @param pubsub_node(unicode, None): PubSub node to use
            None to use importer's default node
        @return (unicode): progress id
        """
        if options is None:
            options = {}
        else:
            for opt_name, opt_default in import_handler.OPT_DEFAULTS.items():
                # we want a filled options dict, with all empty or False values removed
                try:
                    value = options[opt_name]
                except KeyError:
                    if opt_default:
                        options[opt_name] = opt_default
                else:
                    if not value:
                        del options[opt_name]

        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound("Importer [{}] not found".format(name))
        items_import_data, items_count = yield importer.callback(
            client, location, options
        )
        progress_id = str(uuid.uuid4())
        try:
            _import = client._import
        except AttributeError:
            _import = client._import = {}
        progress_data = _import.setdefault(import_handler.name, {})
        progress_data[progress_id] = {"position": "0"}
        if items_count is not None:
            progress_data[progress_id]["size"] = str(items_count)
        metadata = {
            "name": "{}: {}".format(name, location),
            "direction": "out",
            "type": import_handler.name.upper() + "_IMPORT",
        }
        self.host.registerProgressCb(
            progress_id,
            partial(self.getProgress, import_handler),
            metadata,
            profile=client.profile,
        )
        self.host.bridge.progressStarted(progress_id, metadata, client.profile)
        session = {  #  session data, can be used by importers
            "root_service": pubsub_service,
            "root_node": pubsub_node,
        }
        self.recursiveImport(
            client,
            import_handler,
            items_import_data,
            progress_id,
            session,
            options,
            None,
            pubsub_service,
            pubsub_node,
        )
        defer.returnValue(progress_id)

    @defer.inlineCallbacks
    def recursiveImport(
        self,
        client,
        import_handler,
        items_import_data,
        progress_id,
        session,
        options,
        return_data=None,
        service=None,
        node=None,
        depth=0,
    ):
        """Do the import recursively

        @param import_handler(object): instance of the import handler
        @param items_import_data(iterable): iterable of data as specified in [register]
        @param progress_id(unicode): id of progression
        @param session(dict): data for this import session
            can be used by importer so store any useful data
            "root_service" and "root_node" are set to the main pubsub service and node of the import
        @param options(dict): import options
        @param return_data(dict): data to return on progressFinished
        @param service(jid.JID, None): PubSub service to use
        @param node(unicode, None): PubSub node to use
        @param depth(int): level of recursion
        """
        if return_data is None:
            return_data = {}
        for idx, item_import_data in enumerate(items_import_data):
            item_data = yield import_handler.importItem(
                client, item_import_data, session, options, return_data, service, node
            )
            yield import_handler.itemFilters(client, item_data, session, options)
            recurse_kwargs = yield import_handler.importSubItems(
                client, item_import_data, item_data, session, options
            )
            yield import_handler.publishItem(client, item_data, service, node, session)

            if recurse_kwargs is not None:
                recurse_kwargs["client"] = client
                recurse_kwargs["import_handler"] = import_handler
                recurse_kwargs["progress_id"] = progress_id
                recurse_kwargs["session"] = session
                recurse_kwargs.setdefault("options", options)
                recurse_kwargs["return_data"] = return_data
                recurse_kwargs["depth"] = depth + 1
                log.debug(_("uploading subitems"))
                yield self.recursiveImport(**recurse_kwargs)

            if depth == 0:
                client._import[import_handler.name][progress_id]["position"] = str(
                    idx + 1
                )

        if depth == 0:
            self.host.bridge.progressFinished(progress_id, return_data, client.profile)
            self.host.removeProgressCb(progress_id, client.profile)
            del client._import[import_handler.name][progress_id]

    def register(self, import_handler, name, callback, short_desc="", long_desc=""):
        """Register an Importer method

        @param name(unicode): unique importer name, should indicate the software it can import and always lowercase
        @param callback(callable): method to call:
            the signature must be (client, location, options) (cf. [doImport])
            the importer must return a tuple with (items_import_data, items_count)
            items_import_data(iterable[dict]) data specific to specialized importer
                cf. importItem docstring of specialized importer for details
            items_count (int, None) indicate the total number of items (without subitems)
                useful to display a progress indicator when the iterator is a generator
                use None if you can't guess the total number of items
        @param short_desc(unicode): one line description of the importer
        @param long_desc(unicode): long description of the importer, its options, etc.
        """
        name = name.lower()
        if name in import_handler.importers:
            raise exceptions.ConflictError(
                _(
                    "An {handler_name} importer with the name {name} already exist"
                ).format(handler_name=import_handler.name, name=name)
            )
        import_handler.importers[name] = Importer(callback, short_desc, long_desc)

    def unregister(self, import_handler, name):
        del import_handler.importers[name]