comparison sat/plugins/plugin_import.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention - move twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor which uses deprecated code by gtk3reactor; sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/plugins/plugin_import.py@0046283a285d
children 56f94936df1e
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SàT plugin for generic data import handling
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger
23 log = getLogger(__name__)
24 from twisted.internet import defer
25 from sat.core import exceptions
26 from twisted.words.protocols.jabber import jid
27 from functools import partial
28 import collections
29 import uuid
30 import json
31
32
# Plugin metadata, keyed by the C.PI_* constants.
# C.PI_MAIN holds the name of the plugin class defined in this module.
PLUGIN_INFO = {
    C.PI_NAME: "import",
    C.PI_IMPORT_NAME: "IMPORT",
    C.PI_TYPE: C.PLUG_TYPE_IMPORT,
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "ImportPlugin",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        u"""Generic import plugin, base for specialized importers"""
    ),
}
42
# Record stored for each registered importer: the callable doing the import,
# followed by its one-line and its detailed descriptions.
Importer = collections.namedtuple(
    'Importer',
    ['callback', 'short_desc', 'long_desc'],
)
44
45
class ImportPlugin(object):
    """Generic import machinery, base for specialized import handlers

    A specialized handler calls [initialize] once. It then gets ``register``
    and ``unregister`` helpers plus an ``importers`` registry, and three bridge
    methods are exposed to start an import, to list the importers and to get
    the descriptions of one of them.
    """

    def __init__(self, host):
        """
        @param host: host object; its ``bridge``, ``getClient``,
            ``registerProgressCb`` and ``removeProgressCb`` are used by this plugin
        """
        log.info(_("plugin Import initialization"))
        self.host = host

    def initialize(self, import_handler, name):
        """Initialize a specialized import handler

        The handler receives ``name``, ``register``, ``unregister`` and an empty
        ``importers`` dict, then "<name>Import", "<name>ImportList" and
        "<name>ImportDesc" are added to the bridge.

        @param import_handler(object): specialized import handler instance
            must have the following methods:
                - importItem: import a single main item (i.e. prepare data for publishing)
                - importSubItems: import sub items (i.e. items linked to main item, e.g. comments).
                    Must return a dict with kwargs for recursiveImport if items are to be imported recursively.
                    At least "items_import_data", "service" and "node" keys must be provided.
                    if None is returned, no recursion will be done to import subitems, but import can still be done directly by the method.
                - publishItem: actually publish an item
                - itemFilters: modify item according to options
            the following attributes are read when an import is requested:
                - BOOL_OPTIONS: names of the options converted with C.bool
                - JSON_OPTIONS: names of the options decoded from JSON
                - OPT_DEFAULTS: dict mapping option names to their default value
        @param name(unicode): import handler name
            must be lowercase, without leading or trailing whitespace
        """
        assert name == name.lower().strip()
        log.info(_(u'initializing {name} import handler').format(name=name))
        import_handler.name = name
        import_handler.register = partial(self.register, import_handler)
        import_handler.unregister = partial(self.unregister, import_handler)
        import_handler.importers = {}

        # bridge entry points: thin closures binding this import handler
        def _import(name, location, options, pubsub_service, pubsub_node, profile):
            return self._doImport(import_handler, name, location, options, pubsub_service, pubsub_node, profile)

        def _importList():
            return self.listImporters(import_handler)

        def _importDesc(name):
            return self.getDescription(import_handler, name)

        # "async" is a reserved word in recent Python versions, so the keyword
        # is passed through a dict: the call is strictly equivalent to async=True
        self.host.bridge.addMethod(name + "Import", ".plugin", in_sign='ssa{ss}sss', out_sign='s', method=_import, **{'async': True})
        self.host.bridge.addMethod(name + "ImportList", ".plugin", in_sign='', out_sign='a(ss)', method=_importList)
        self.host.bridge.addMethod(name + "ImportDesc", ".plugin", in_sign='s', out_sign='(ss)', method=_importDesc)

    def getProgress(self, import_handler, progress_id, profile):
        """Return the progress data of an import

        Registered as progress callback by [doImport].

        @param import_handler(object): instance of the import handler
        @param progress_id(unicode): id returned by [doImport]
        @param profile: profile of the client doing the import
        @return (dict): progress data, with a "position" key,
            and a "size" key when the total number of items is known
        """
        client = self.host.getClient(profile)
        return client._import[import_handler.name][progress_id]

    def listImporters(self, import_handler):
        """List the importers registered on a handler

        @param import_handler(object): instance of the import handler
        @return (list[tuple[unicode,unicode]]): (name, short description)
            of each importer, sorted by name
        """
        importers = import_handler.importers
        # the names are iterated in sorted order so the result is really sorted
        return [(name, importers[name].short_desc) for name in sorted(importers)]

    def getDescription(self, import_handler, name):
        """Return import short and long descriptions

        @param import_handler(object): instance of the import handler
        @param name(unicode): importer name
        @return (tuple[unicode,unicode]): short and long description
        @raise exceptions.NotFound: no importer is registered with this name
        """
        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound(u"{handler_name} importer not found [{name}]".format(
                handler_name = import_handler.name,
                name = name))
        else:
            return importer.short_desc, importer.long_desc

    def _doImport(self, import_handler, name, location, options, pubsub_service='', pubsub_node='', profile=C.PROF_KEY_NONE):
        """Bridge front-end of [doImport]

        Convert the serialised arguments to the types expected by [doImport].

        @param import_handler(object): instance of the import handler
        @param name(unicode): name of the importer
        @param location(unicode): location of the data to import
        @param options(dict): options, every value is converted to unicode first,
            then BOOL_OPTIONS are converted with C.bool and JSON_OPTIONS are decoded
        @param pubsub_service(unicode): jid of the PubSub service, empty to use None
        @param pubsub_node(unicode): PubSub node, empty to use None
        @param profile: profile of the client doing the import
        @return: result of [doImport] (deferred firing with the progress id)
        @raise exceptions.DataError: a JSON option can't be decoded
        """
        client = self.host.getClient(profile)
        options = {key: unicode(value) for key, value in options.iteritems()}
        for option in import_handler.BOOL_OPTIONS:
            try:
                options[option] = C.bool(options[option])
            except KeyError:
                pass
        for option in import_handler.JSON_OPTIONS:
            try:
                options[option] = json.loads(options[option])
            except KeyError:
                # option not given: skipped, like a missing boolean option
                pass
            except ValueError:
                raise exceptions.DataError(_(u'invalid json option: {name}').format(name=option))
        pubsub_service = jid.JID(pubsub_service) if pubsub_service else None
        return self.doImport(client, import_handler, unicode(name), unicode(location), options, pubsub_service, pubsub_node or None)

    @defer.inlineCallbacks
    def doImport(self, client, import_handler, name, location, options=None, pubsub_service=None, pubsub_node=None):
        """Import data

        @param client: client doing the import
        @param import_handler(object): instance of the import handler
        @param name(unicode): name of the importer
        @param location(unicode): location of the data to import
            can be an url, a file path, or anything which make sense
            check importer description for more details
        @param options(dict, None): extra options.
            when a dict is given it is modified in place: missing options get
            their non empty default from OPT_DEFAULTS, empty or False values are removed
        @param pubsub_service(jid.JID, None): jid of the PubSub service where data must be imported
            None to use profile's server
        @param pubsub_node(unicode, None): PubSub node to use
            None to use importer's default node
        @return (unicode): progress id
        @raise exceptions.NotFound: no importer is registered with this name
        """
        if options is None:
            # NOTE(review): OPT_DEFAULTS are not applied in this case, confirm it is intended
            options = {}
        else:
            for opt_name, opt_default in import_handler.OPT_DEFAULTS.iteritems():
                # we want a filled options dict, with all empty or False values removed
                try:
                    value = options[opt_name]
                except KeyError:
                    if opt_default:
                        options[opt_name] = opt_default
                else:
                    if not value:
                        del options[opt_name]

        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound(u"Importer [{}] not found".format(name))
        items_import_data, items_count = yield importer.callback(client, location, options)
        progress_id = unicode(uuid.uuid4())
        # progress data are stored on the client, by handler name then progress id
        try:
            _import = client._import
        except AttributeError:
            _import = client._import = {}
        progress_data = _import.setdefault(import_handler.name, {})
        progress_data[progress_id] = {u'position': '0'}
        if items_count is not None:
            progress_data[progress_id]['size'] = unicode(items_count)
        metadata = {'name': u'{}: {}'.format(name, location),
                    'direction': 'out',
                    'type': import_handler.name.upper() + '_IMPORT'
                   }
        self.host.registerProgressCb(progress_id, partial(self.getProgress, import_handler), metadata, profile=client.profile)
        self.host.bridge.progressStarted(progress_id, metadata, client.profile)
        session = {  # session data, can be used by importers
            u'root_service': pubsub_service,
            u'root_node': pubsub_node
            }
        # the result is not yielded: the progress id is returned without
        # waiting for the end of the import
        self.recursiveImport(client, import_handler, items_import_data, progress_id, session, options, None, pubsub_service, pubsub_node)
        defer.returnValue(progress_id)

    @defer.inlineCallbacks
    def recursiveImport(self, client, import_handler, items_import_data, progress_id, session, options, return_data=None, service=None, node=None, depth=0):
        """Do the import recursively

        @param client: client doing the import
        @param import_handler(object): instance of the import handler
        @param items_import_data(iterable): iterable of data as specified in [register]
        @param progress_id(unicode): id of progression
        @param session(dict): data for this import session
            can be used by importer so store any useful data
            "root_service" and "root_node" are set to the main pubsub service and node of the import
        @param options(dict): import options
        @param return_data(dict): data to return on progressFinished
        @param service(jid.JID, None): PubSub service to use
        @param node(unicode, None): PubSub node to use
        @param depth(int): level of recursion
        """
        if return_data is None:
            return_data = {}
        for idx, item_import_data in enumerate(items_import_data):
            item_data = yield import_handler.importItem(client, item_import_data, session, options, return_data, service, node)
            yield import_handler.itemFilters(client, item_data, session, options)
            recurse_kwargs = yield import_handler.importSubItems(client, item_import_data, item_data, session, options)
            yield import_handler.publishItem(client, item_data, service, node, session)

            if recurse_kwargs is not None:
                # the handler gives items_import_data, service and node,
                # everything else is inherited from the current level
                recurse_kwargs['client'] = client
                recurse_kwargs['import_handler'] = import_handler
                recurse_kwargs['progress_id'] = progress_id
                recurse_kwargs['session'] = session
                recurse_kwargs.setdefault('options', options)
                recurse_kwargs['return_data'] = return_data
                recurse_kwargs['depth'] = depth + 1
                log.debug(_(u"uploading subitems"))
                yield self.recursiveImport(**recurse_kwargs)

            if depth == 0:
                # only main items are counted in the progression
                client._import[import_handler.name][progress_id]['position'] = unicode(idx+1)

        if depth == 0:
            # NOTE(review): nothing is signalled on the bridge if a handler
            # method fails, the progress data then stay on the client
            self.host.bridge.progressFinished(progress_id,
                                              return_data,
                                              client.profile)
            self.host.removeProgressCb(progress_id, client.profile)
            del client._import[import_handler.name][progress_id]

    def register(self, import_handler, name, callback, short_desc='', long_desc=''):
        """Register an Importer method

        @param import_handler(object): instance of the import handler
        @param name(unicode): unique importer name, should indicate the software it can import and always lowercase
        @param callback(callable): method to call:
            the signature must be (client, location, options) (cf. [doImport])
            the importer must return a tuple with (items_import_data, items_count)
            items_import_data(iterable[dict]) data specific to specialized importer
                cf. importItem docstring of specialized importer for details
            items_count (int, None) indicate the total number of items (without subitems)
                useful to display a progress indicator when the iterator is a generator
                use None if you can't guess the total number of items
        @param short_desc(unicode): one line description of the importer
        @param long_desc(unicode): long description of the importer, its options, etc.
        @raise exceptions.ConflictError: an importer is already registered with this name
        """
        name = name.lower()
        if name in import_handler.importers:
            raise exceptions.ConflictError(_(u"An {handler_name} importer with the name {name} already exist").format(
                handler_name = import_handler.name,
                name = name))
        import_handler.importers[name] = Importer(callback, short_desc, long_desc)

    def unregister(self, import_handler, name):
        """Remove an importer previously added with [register]

        @param import_handler(object): instance of the import handler
        @param name(unicode): importer name, lowercased like in [register]
        @raise KeyError: no importer is registered with this name
        """
        del import_handler.importers[name.lower()]