Mercurial > libervia-backend
changeset 2369:cdaa58e14553
plugin import: generic data import plugin:
this plugin handles common tasks for importers. Specialized importers (e.g. blog import) use it as a base, and specific importers (e.g. Dotclear) register to the specialized one.
The blog importer's generic methods have been moved to it.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 01 Oct 2017 12:21:23 +0200 |
parents | 3865a772c360 |
children | 2c2b826b0bb3 |
files | src/core/constants.py src/plugins/plugin_blog_import.py src/plugins/plugin_import.py |
diffstat | 3 files changed, 302 insertions(+), 190 deletions(-) |
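As the commit message says, this changeset splits importing into three layers: the new generic IMPORT plugin (registration, bridge methods, progress tracking and recursion), specialized handlers such as BLOG_IMPORT which call host.plugins['IMPORT'].initialize(self, u'blog') and implement the item-level hooks, and concrete importers which register against the specialized handler. A rough sketch of how a concrete importer would plug in after this changeset; the Dotclear-style callback and the parseDotclearExport helper are hypothetical, only the register() signature comes from the diff below:

```python
# sketch only: how a concrete importer registers with the specialized blog
# handler after this changeset; importDotclear/parseDotclearExport are invented
from twisted.internet import defer

@defer.inlineCallbacks
def importDotclear(client, location, options):
    """Callback with the (client, location, options) signature expected by register().

    Must return (items_import_data, items_count); each item is a dict understood
    by the specialized handler (for blogs: 'blog', 'comments', optional 'url').
    """
    items = yield parseDotclearExport(location, options)  # hypothetical helper
    defer.returnValue((items, len(items)))

def setupImporter(host):
    # BLOG_IMPORT got its register()/unregister() methods bound by
    # host.plugins['IMPORT'].initialize(blog_import_plugin, u'blog')
    blog_import = host.plugins['BLOG_IMPORT']
    blog_import.register(u'dotclear', importDotclear,
                         short_desc=u'import from a Dotclear export',
                         long_desc=u'expects the path of a Dotclear flat export file')
```

initialize() also creates the blogImport / blogImportList / blogImportDesc bridge methods, replacing the ones the blog import plugin previously added itself.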
--- a/src/core/constants.py	Sun Oct 01 12:21:23 2017 +0200
+++ b/src/core/constants.py	Sun Oct 01 12:21:23 2017 +0200
@@ -185,7 +185,7 @@
     PI_IMPORT_NAME = u'import_name'
     PI_MAIN = u'main'
     PI_HANDLER = u'handler'
-    PI_TYPE = u'type'
+    PI_TYPE = u'type' # FIXME: should be types, and should handle single unicode type or tuple of types (e.g. "blog" and "import")
     PI_MODES = u'modes'
     PI_PROTOCOLS = u'protocols'
     PI_DEPENDENCIES = u'dependencies'
@@ -200,6 +200,7 @@
     PLUG_TYPE_SEC = "SEC"
     PLUG_TYPE_SYNTAXE = "SYNTAXE"
     PLUG_TYPE_BLOG = "BLOG"
+    PLUG_TYPE_IMPORT = "IMPORT"
     PLUG_TYPE_ENTRY_POINT = "ENTRY_POINT"

     # Modes
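The FIXME above is prompted by the change in the next file, where BLOG_IMPORT declares C.PI_TYPE as a tuple of types instead of a single string. Until the core handles that uniformly, code that filters plugins by type has to accept both forms; a minimal sketch of such a check (the helper name is illustrative, not part of this changeset):

```python
# sketch only: normalize PI_TYPE until the FIXME above is addressed
from sat.core.constants import Const as C

def plugin_has_type(plugin_info, wanted_type):
    """Check a PLUGIN_INFO dict against a type, accepting a single value or a tuple in PI_TYPE."""
    pi_type = plugin_info[C.PI_TYPE]
    if isinstance(pi_type, tuple):
        return wanted_type in pi_type
    return pi_type == wanted_type

# e.g. plugin_has_type(PLUGIN_INFO, C.PLUG_TYPE_IMPORT) would be True both for
# the generic IMPORT plugin and for the blog importer after this changeset
```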
--- a/src/plugins/plugin_blog_import.py Sun Oct 01 12:21:23 2017 +0200 +++ b/src/plugins/plugin_blog_import.py Sun Oct 01 12:21:23 2017 +0200 @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger @@ -31,18 +32,18 @@ import os.path import tempfile import urlparse -import uuid +import shortuuid PLUGIN_INFO = { C.PI_NAME: "blog import", C.PI_IMPORT_NAME: "BLOG_IMPORT", - C.PI_TYPE: C.PLUG_TYPE_BLOG, - C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0277", "TEXT-SYNTAXES", "UPLOAD"], + C.PI_TYPE: (C.PLUG_TYPE_BLOG, C.PLUG_TYPE_IMPORT), + C.PI_DEPENDENCIES: ["IMPORT", "XEP-0060", "XEP-0277", "TEXT-SYNTAXES", "UPLOAD"], C.PI_MAIN: "BlogImportPlugin", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _(u"""Blog import management: -This plugin manage the different blog importers which can register to it, and handler generic importing tasks.""") +This plugin manage the different blog importers which can register to it, and handle generic importing tasks.""") } OPT_HOST = 'host' @@ -50,67 +51,45 @@ OPT_UPLOAD_IGNORE_HOST = 'upload_ignore_host' OPT_IGNORE_TLS = 'ignore_tls_errors' URL_REDIRECT_PREFIX = 'url_redirect_' -BOOL_OPTIONS = (OPT_UPLOAD_IMAGES, OPT_IGNORE_TLS) BlogImporter = collections.namedtuple('BlogImporter', ('callback', 'short_desc', 'long_desc')) class BlogImportPlugin(object): + BOOL_OPTIONS = (OPT_UPLOAD_IMAGES, OPT_IGNORE_TLS) + OPT_DEFAULTS = {OPT_UPLOAD_IMAGES: True, + OPT_IGNORE_TLS: False} def __init__(self, host): log.info(_("plugin Blog Import initialization")) self.host = host - self._importers = {} self._u = host.plugins['UPLOAD'] self._p = host.plugins['XEP-0060'] self._m = host.plugins['XEP-0277'] self._s = self.host.plugins['TEXT-SYNTAXES'] - host.bridge.addMethod("blogImport", ".plugin", in_sign='ssa{ss}ss', out_sign='s', method=self._blogImport, async=True) - host.bridge.addMethod("blogImportList", ".plugin", in_sign='', out_sign='a(ss)', method=self.listImporters) - host.bridge.addMethod("blogImportDesc", ".plugin", in_sign='s', out_sign='(ss)', method=self.getDescription) - - def getProgress(self, progress_id, profile): - client = self.host.getClient(profile) - return client._blogImport_progress[progress_id] + host.plugins['IMPORT'].initialize(self, u'blog') - def listImporters(self): - importers = self._importers.keys() - importers.sort() - return [(name, self._importers[name].short_desc) for name in self._importers] - - def getDescription(self, name): - """Return import short and long descriptions + def importItem(self, client, item_import_data, options, return_data, service, node): + """importItem specialized for blog import - @param name(unicode): blog importer name - @return (tuple[unicode,unicode]): short and long description - """ - try: - importer = self._importers[name] - except KeyError: - raise exceptions.NotFound(u"Blog importer not found [{}]".format(name)) - else: - return importer.short_desc, importer.long_desc + @param items_import_data(iterable[dict]): + * mandatory keys: + 'blog' (dict): microblog data of the blog post (cf. 
http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en) + the importer MUST NOT create node or call XEP-0277 plugin itself + 'comments*' key MUST NOT be used in this microblog_data, see bellow for comments + It is recommanded to use a unique id in the "id" key which is constant per blog item, + so if the import fail, a new import will overwrite the failed items and avoid duplicates. - def _blogImport(self, name, location, options, pubsub_service='', profile=C.PROF_KEY_DEFAULT): - client = self.host.getClient(profile) - options = {key: unicode(value) for key, value in options.iteritems()} - for option in BOOL_OPTIONS: - try: - options[option] = C.bool(options[option]) - except KeyError: - pass - return self.blogImport(client, unicode(name), unicode(location), options) - - @defer.inlineCallbacks - def blogImport(self, client, name, location, options=None, pubsub_service=None): - """Import a blog - - @param name(unicode): name of the blog importer - @param location(unicode): location of the blog data to import - can be an url, a file path, or anything which make sense - check importer description for more details - @param options(dict, None): extra options. Below are the generic options, + 'comments' (list[list[dict]],None): Dictionaries must have the same keys as main item (i.e. 'blog' and 'comments') + a list of list is used because XEP-0277 can handler several comments nodes, + but in most cases, there will we only one item it the first list (something like [[{comment1_data},{comment2_data}, ...]]) + blog['allow_comments'] must be True if there is any comment, and False (or not present) if comments are not allowed. + If allow_comments is False and some comments are present, an exceptions.DataError will be raised + * optional keys: + 'url' (unicode): former url of the post (only the path, without host part) + if present the association to the new path will be displayed to user, so it can make redirections if necessary + @param options(dict, None): Below are the generic options, blog importer can have specific ones. All options have unicode values generic options: - OPT_HOST (unicode): original host @@ -120,114 +99,56 @@ - OPT_UPLOAD_IGNORE_HOST (unicode): don't upload images from this host - OPT_IGNORE_TLS (bool): ignore TLS error for image upload. 
Default: False - @param pubsub_service(jid.JID, None): jid of the PubSub service where blog must be imported - None to use profile's server - @return (unicode): progress id + @param return_data(dict): will contain link between former posts and new items + """ - if options is None: - options = {} - else: - for opt_name, opt_default in ((OPT_UPLOAD_IMAGES, True), - (OPT_IGNORE_TLS, False)): - # we want an filled options dict, with all empty or False values removed - try: - value =options[opt_name] - except KeyError: - if opt_default: - options[opt_name] = opt_default - else: - if not value: - del options[opt_name] + mb_data = item_import_data['blog'] + try: + item_id = mb_data['id'] + except KeyError: + item_id = mb_data['id'] = unicode(shortuuid.uuid()) + try: - importer = self._importers[name] + # we keep the link between old url and new blog item + # so the user can redirect its former blog urls + old_uri = item_import_data['url'] except KeyError: - raise exceptions.NotFound(u"Importer [{}] not found".format(name)) - posts_data, posts_count = yield importer.callback(client, location, options) - url_redirect = {} - progress_id = unicode(uuid.uuid4()) - try: - progress_data = client._blogImport_progress - except AttributeError: - progress_data = client._blogImport_progress = {} - progress_data[progress_id] = {u'position': '0'} - if posts_count is not None: - progress_data[progress_id]['size'] = unicode(posts_count) - metadata = {'name': u'{}: {}'.format(name, location), - 'direction': 'out', - 'type': 'BLOG_IMPORT' - } - self.host.registerProgressCb(progress_id, self.getProgress, metadata, profile=client.profile) - self.host.bridge.progressStarted(progress_id, metadata, client.profile) - self._recursiveImport(client, posts_data, progress_id, options, url_redirect) - defer.returnValue(progress_id) + pass + else: + new_uri = return_data[URL_REDIRECT_PREFIX + old_uri] = self._p.getNodeURI( + service if service is not None else client.jid.userhostJID(), + node or self._m.namespace, + item_id) + log.info(u"url link from {old} to {new}".format( + old=old_uri, new=new_uri)) + + return mb_data + + def importSubItems(self, client, item_import_data, mb_data, options): + # comments data + if len(item_import_data['comments']) != 1: + raise NotImplementedError(u"can't manage multiple comment links") + allow_comments = C.bool(mb_data.get('allow_comments', C.BOOL_FALSE)) + if allow_comments: + comments_service, comments_node = self._m.getCommentsService(client), self._m.getCommentsNode(mb_data['id']) + mb_data['comments_service'] = comments_service.full() + mb_data['comments_node'] = comments_node + recurse_kwargs = { + 'items_import_data':item_import_data['comments'][0], + 'service':comments_service, + 'node':comments_node} + return recurse_kwargs + else: + if item_import_data['comments'][0]: + raise exceptions.DataError(u"allow_comments set to False, but comments are there") + return None + + def publishItem(self, client, mb_data, service, node): + log.debug(u"uploading item [{id}]: {title}".format(id=mb_data['id'], title=mb_data.get('title',''))) + return self._m.send(client, mb_data, service, node) @defer.inlineCallbacks - def _recursiveImport(self, client, posts_data, progress_id, options, url_redirect, service=None, node=None, depth=0): - """Do the upload recursively - - @param posts_data(list): list of data as specified in [register] - @param options(dict): import options - @param url_redirect(dict): link between former posts and new items - @param service(jid.JID, None): PubSub service to use - 
@param node(unicode, None): PubSub node to use - @param depth(int): level of recursion - """ - for idx, data in enumerate(posts_data): - # data checks/filters - mb_data = data['blog'] - try: - item_id = mb_data['id'] - except KeyError: - item_id = mb_data['id'] = unicode(uuid.uuid4()) - - try: - # we keep the link between old url and new blog item - # so the user can redirect its former blog urls - old_uri = data['url'] - except KeyError: - pass - else: - new_uri = url_redirect[old_uri] = self._p.getNodeURI( - service if service is not None else client.jid.userhostJID(), - node or self._m.namespace, - item_id) - log.info(u"url link from {old} to {new}".format( - old=old_uri, new=new_uri)) - - yield self.blogFilters(client, mb_data, options) - - # comments data - if len(data['comments']) != 1: - raise NotImplementedError(u"can't manage multiple comment links") - allow_comments = C.bool(mb_data.get('allow_comments', C.BOOL_FALSE)) - if allow_comments: - comments_service, comments_node = self._m.getCommentsService(client), self._m.getCommentsNode(item_id) - mb_data['comments_service'] = comments_service.full() - mb_data['comments_node'] = comments_node - else: - if data['comments'][0]: - raise exceptions.DataError(u"allow_comments set to False, but comments are there") - - # post upload - depth or log.debug(u"uploading item [{id}]: {title}".format(id=mb_data['id'], title=mb_data.get('title',''))) - yield self._m.send(mb_data, service, node, profile=client.profile) - - # comments upload - depth or log.debug(u"uploading comments") - if allow_comments: - yield self._recursiveImport(client, data['comments'][0], progress_id, options, url_redirect, service=comments_service, node=comments_node, depth=depth+1) - if depth == 0: - client._blogImport_progress[progress_id]['position'] = unicode(idx+1) - - if depth == 0: - self.host.bridge.progressFinished(progress_id, - {u'{}{}'.format(URL_REDIRECT_PREFIX, old): new for old, new in url_redirect.iteritems()}, - client.profile) - self.host.removeProgressCb(progress_id, client.profile) - del client._blogImport_progress[progress_id] - - @defer.inlineCallbacks - def blogFilters(self, client, mb_data, options): + def itemFilters(self, client, mb_data, options): """Apply filters according to options modify mb_data in place @@ -352,41 +273,3 @@ os.unlink(tmp_file) except OSError: pass - - def register(self, name, callback, short_desc='', long_desc=''): - """Register a blogImport method - - @param name(unicode): unique importer name, should indicate the blogging software it handler and always lowercase - @param callback(callable): method to call: - the signature must be (client, location, options) (cf. [blogImport]) - the importer must return a tuple with (posts_data, posts_count) - - posts_data is an iterable of dict which must have the following keys: - 'blog' (dict): microblog data of the blog post (cf. http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en) - the importer MUST NOT create node or call XEP-0277 plugin itself - 'comments*' key MUST NOT be used in this microblog_data, see bellow for comments - It is recommanded to use a unique id in the "id" key which is constant per blog item, - so if the import fail, a new import will overwrite the failed items and avoid duplicates. - - 'comments' (list[list[dict]],None): Dictionaries must have the same keys as main item (i.e. 
'blog' and 'comments') - a list of list is used because XEP-0277 can handler several comments nodes, - but in most cases, there will we only one item it the first list (something like [[{comment1_data},{comment2_data}, ...]]) - blog['allow_comments'] must be True if there is any comment, and False (or not present) if comments are not allowed. - If allow_comments is False and some comments are present, a exceptions.DataError will be raised - the import MAY optionally have the following keys: - 'url' (unicode): former url of the post (only the path, without host part) - if present the association to the new path will be displayed to user, so it can make redirections if necessary - - posts_count (int, None) indicate the total number of posts (without comments) - useful to display a progress indicator when the iterator is a generator - use None if you can't guess the total number of blog posts - @param short_desc(unicode): one line description of the importer - @param long_desc(unicode): long description of the importer, its options, etc. - """ - name = name.lower() - if name in self._importers: - raise exceptions.ConflictError(u"A blog importer with the name {} already exsit".format(name)) - self._importers[name] = BlogImporter(callback, short_desc, long_desc) - - def unregister(self, name): - del self._importers[name]
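To make the contract described in the docstrings above concrete, a single entry of items_import_data for the blog handler could look like the literal below (the field values are invented; only the key layout — 'blog', 'comments' as a list of lists, optional 'url' — follows the docstrings):

```python
# illustrative item as yielded by a blog importer callback; values are invented,
# only the structure expected by importItem()/importSubItems() is shown
from sat.core.constants import Const as C

item_import_data = {
    'blog': {                      # microblog data, published via XEP-0277
        'id': u'my-first-post',    # stable id recommended to avoid duplicates on re-import
        'title': u'My first post',
        'content_xhtml': u'<div><p>Hello world</p></div>',
        'allow_comments': C.BOOL_TRUE,  # must evaluate to True if comments are provided
    },
    'comments': [[                 # list of lists: one inner list per comments node
        {'blog': {'content': u'Nice post!'}, 'comments': [[]]},
    ]],
    'url': u'/post/2009/10/01/my-first-post',  # former path, used for url_redirect_*
}
```

The generic plugin then drives each entry through importItem() (id and former-url handling), itemFilters() (image upload, syntax conversion), importSubItems() (comments service/node setup) and publishItem() (XEP-0277 send), recursing into the comments with the returned kwargs.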
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_import.py Sun Oct 01 12:21:23 2017 +0200 @@ -0,0 +1,228 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SàT plugin for generic data import handling +# Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sat.core.i18n import _ +from sat.core.constants import Const as C +from sat.core.log import getLogger +log = getLogger(__name__) +from twisted.internet import defer +from sat.core import exceptions +from functools import partial +import collections +import uuid + + +PLUGIN_INFO = { + C.PI_NAME: "import", + C.PI_IMPORT_NAME: "IMPORT", + C.PI_TYPE: C.PLUG_TYPE_IMPORT, + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "ImportPlugin", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _(u"""Generic import plugin, base for specialized importers""") +} + +Importer = collections.namedtuple('Importer', ('callback', 'short_desc', 'long_desc')) + + +class ImportPlugin(object): + + def __init__(self, host): + log.info(_("plugin Import initialization")) + self.host = host + + def initialize(self, import_handler, name): + """Initialize a specialized import handler + + @param import_handler(object): specialized import handler instance + must have the following methods: + - importItem: import a single main item (i.e. prepare data for publishing) + - importSubitems: import sub items (i.e. items linked to main item, e.g. comments). Must return a dict with kwargs for recursiveImport if items are to be imported. At east "items_import_data", "service" and "node" keys must be provided. 
+ if None is returned, no subitems will be imported + - publishItem: actualy publish an item + - itemFilters: modify item according to options + @param name(unicode): import handler name + """ + assert name == name.lower().strip() + log.info(_(u'initializing {name} import handler').format(name=name)) + import_handler.name = name + import_handler.register = partial(self.register, import_handler) + import_handler.unregister = partial(self.unregister, import_handler) + import_handler.importers = {} + def _import(name, location, options, pubsub_service, profile): + return self._doImport(import_handler, name, location, options, pubsub_service, profile) + def _importList(): + return self.listImporters(import_handler) + def _importDesc(name): + return self.getDescription(import_handler, name) + + self.host.bridge.addMethod(name + "Import", ".plugin", in_sign='ssa{ss}ss', out_sign='s', method=_import, async=True) + self.host.bridge.addMethod(name + "ImportList", ".plugin", in_sign='', out_sign='a(ss)', method=_importList) + self.host.bridge.addMethod(name + "ImportDesc", ".plugin", in_sign='s', out_sign='(ss)', method=_importDesc) + + def getProgress(self, import_handler, progress_id, profile): + client = self.host.getClient(profile) + return client._import[import_handler.name][progress_id] + + def listImporters(self, import_handler): + importers = import_handler.importers.keys() + importers.sort() + return [(name, import_handler.importers[name].short_desc) for name in import_handler.importers] + + def getDescription(self, import_handler, name): + """Return import short and long descriptions + + @param name(unicode): importer name + @return (tuple[unicode,unicode]): short and long description + """ + try: + importer = import_handler.importers[name] + except KeyError: + raise exceptions.NotFound(u"{handler_name} importer not found [{name}]".format( + handler_name = import_handler.name, + name = name)) + else: + return importer.short_desc, importer.long_desc + + def _doImport(self, import_handler, name, location, options, pubsub_service='', profile=C.PROF_KEY_NONE): + client = self.host.getClient(profile) + options = {key: unicode(value) for key, value in options.iteritems()} + for option in import_handler.BOOL_OPTIONS: + try: + options[option] = C.bool(options[option]) + except KeyError: + pass + return self.doImport(client, import_handler, unicode(name), unicode(location), options) + + @defer.inlineCallbacks + def doImport(self, client, import_handler, name, location, options=None, pubsub_service=None): + """Import data + + @param import_handler(object): instance of the import handler + @param name(unicode): name of the importer + @param location(unicode): location of the data to import + can be an url, a file path, or anything which make sense + check importer description for more details + @param options(dict, None): extra options. 
+ @param pubsub_service(jid.JID, None): jid of the PubSub service where data must be imported + None to use profile's server + @return (unicode): progress id + """ + if options is None: + options = {} + else: + for opt_name, opt_default in import_handler.OPT_DEFAULTS.iteritems(): + # we want a filled options dict, with all empty or False values removed + try: + value =options[opt_name] + except KeyError: + if opt_default: + options[opt_name] = opt_default + else: + if not value: + del options[opt_name] + try: + importer = import_handler.importers[name] + except KeyError: + raise exceptions.NotFound(u"Importer [{}] not found".format(name)) + items_import_data, items_count = yield importer.callback(client, location, options) + progress_id = unicode(uuid.uuid4()) + try: + _import = client._import + except AttributeError: + _import = client._import = {} + progress_data = _import.setdefault(import_handler.name, {}) + progress_data[progress_id] = {u'position': '0'} + if items_count is not None: + progress_data[progress_id]['size'] = unicode(items_count) + metadata = {'name': u'{}: {}'.format(name, location), + 'direction': 'out', + 'type': import_handler.name.upper() + '_IMPORT' + } + self.host.registerProgressCb(progress_id, partial(self.getProgress, import_handler), metadata, profile=client.profile) + self.host.bridge.progressStarted(progress_id, metadata, client.profile) + url_redirect = {} + self.recursiveImport(client, import_handler, items_import_data, progress_id, options, url_redirect) + defer.returnValue(progress_id) + + @defer.inlineCallbacks + def recursiveImport(self, client, import_handler, items_import_data, progress_id, options, return_data=None, service=None, node=None, depth=0): + """Do the import recursively + + @param import_handler(object): instance of the import handler + @param items_import_data(iterable): iterable of data as specified in [register] + @param progress_id(unicode): id of progression + @param options(dict): import options + @param return_data(dict): data to return on progressFinished + @param service(jid.JID, None): PubSub service to use + @param node(unicode, None): PubSub node to use + @param depth(int): level of recursion + """ + if return_data is None: + return_data = {} + for idx, item_import_data in enumerate(items_import_data): + item_data = yield import_handler.importItem(client, item_import_data, options, return_data, service, node) + yield import_handler.itemFilters(client, item_data, options) + recurse_kwargs = yield import_handler.importSubItems(client, item_import_data, item_data, options) + yield import_handler.publishItem(client, item_data, service, node) + + if recurse_kwargs is not None: + recurse_kwargs['client'] = client + recurse_kwargs['import_handler'] = import_handler + recurse_kwargs['progress_id'] = progress_id + recurse_kwargs.setdefault('options', options) + recurse_kwargs['return_data'] = return_data + recurse_kwargs['depth'] = depth + 1 + log.debug(_(u"uploading subitems")) + yield self.recursiveImport(**recurse_kwargs) + + if depth == 0: + client._import[import_handler.name][progress_id]['position'] = unicode(idx+1) + + if depth == 0: + self.host.bridge.progressFinished(progress_id, + return_data, + client.profile) + self.host.removeProgressCb(progress_id, client.profile) + del client._import[import_handler.name][progress_id] + + def register(self, import_handler, name, callback, short_desc='', long_desc=''): + """Register an Importer method + + @param name(unicode): unique importer name, should indicate the software it can import 
and always lowercase + @param callback(callable): method to call: + the signature must be (client, location, options) (cf. [doImport]) + the importer must return a tuple with (items_import_data, items_count) + items_import_data(iterable[dict]) data specific to specialized importer + cf. importItem docstring of specialized importer for details + items_count (int, None) indicate the total number of items (without subitems) + useful to display a progress indicator when the iterator is a generator + use None if you can't guess the total number of items + @param short_desc(unicode): one line description of the importer + @param long_desc(unicode): long description of the importer, its options, etc. + """ + name = name.lower() + if name in import_handler.importers: + raise exceptions.ConflictError(_(u"An {handler_name} importer with the name {name} already exist").format( + handler_name = import_handler.name, + name = name)) + import_handler.importers[name] = Importer(callback, short_desc, long_desc) + + def unregister(self, import_handler, name): + del import_handler.importers[name]
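For reference, the handler protocol that initialize() documents above can be summarized as a skeleton (a minimal sketch, not part of the changeset; everything except the four hook methods and the BOOL_OPTIONS / OPT_DEFAULTS class attributes is illustrative):

```python
# minimal sketch of a specialized import handler built on the generic IMPORT plugin
from twisted.internet import defer

class DummyImportPlugin(object):
    BOOL_OPTIONS = ()      # option names coerced with C.bool() in _doImport()
    OPT_DEFAULTS = {}      # defaults filled in by doImport()

    def __init__(self, host):
        self.host = host
        host.plugins['IMPORT'].initialize(self, u'dummy')
        # initialize() binds self.register/self.unregister and adds the
        # dummyImport / dummyImportList / dummyImportDesc bridge methods

    def importItem(self, client, item_import_data, options, return_data, service, node):
        # turn one raw imported item into publishable data
        return item_import_data

    def itemFilters(self, client, item_data, options):
        # modify item_data in place according to options; may return a Deferred
        return defer.succeed(None)

    def importSubItems(self, client, item_import_data, item_data, options):
        # return kwargs for recursiveImport() (at least items_import_data,
        # service and node), or None if there is nothing below this item
        return None

    def publishItem(self, client, item_data, service, node):
        # actually publish the item (e.g. through a PubSub plugin)
        return defer.succeed(None)
```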