comparison libervia/backend/plugins/plugin_import.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200 (19 months ago)
parents sat/plugins/plugin_import.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SàT plugin for generic data import handling
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from libervia.backend.core.i18n import _
21 from libervia.backend.core.constants import Const as C
22 from libervia.backend.core.log import getLogger
23
24 log = getLogger(__name__)
25 from twisted.internet import defer
26 from libervia.backend.core import exceptions
27 from twisted.words.protocols.jabber import jid
28 from functools import partial
29 import collections
30 import uuid
31 import json
32
33
# Registry entry describing one importer of a specialized import handler:
#   callback: called as callback(client, location, options) (cf. ImportPlugin.register)
#   short_desc: one line description of the importer
#   long_desc: long description (options, expected location, etc.)
Importer = collections.namedtuple("Importer", "callback short_desc long_desc")

# Plugin metadata (presumably read by the backend plugin loader)
PLUGIN_INFO = {
    C.PI_NAME: "import",
    C.PI_IMPORT_NAME: "IMPORT",
    C.PI_TYPE: C.PLUG_TYPE_IMPORT,
    C.PI_DEPENDENCIES: [],  # no other plugin is needed
    C.PI_MAIN: "ImportPlugin",  # class instantiated for this plugin
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Generic import plugin, base for specialized importers"""),
}
45
46
class ImportPlugin(object):
    """Generic import machinery shared by specialized import handlers.

    A specialized handler calls [initialize] with itself and a name. This attaches an
    importers registry and register/unregister helpers to the handler, and exposes
    import, importers list and importer description methods on the bridge.
    """

    def __init__(self, host):
        log.info(_("plugin import initialization"))
        self.host = host

    def initialize(self, import_handler, name):
        """Initialize a specialized import handler

        @param import_handler(object): specialized import handler instance
            must have the following methods:
            - import_item: import a single main item (i.e. prepare data for publishing)
            - import_sub_items: import sub items (i.e. items linked to main item,
              e.g. comments).
              Must return a dict with kwargs for recursive_import if items are to be
              imported recursively.
              At least "items_import_data", "service" and "node" keys must be
              provided.
              If None is returned, no recursion will be done to import subitems, but
              import can still be done directly by the method.
            - publish_item: actually publish an item
            - item_filters: modify item according to options
            and the following attributes:
            - BOOL_OPTIONS: names of options to convert to booleans
            - JSON_OPTIONS: names of options to decode from JSON
            - OPT_DEFAULTS: dict of option name => default value
        @param name(unicode): import handler name, must be lowercase and without
            surrounding whitespace
        """
        assert name == name.lower().strip()
        log.info(_("initializing {name} import handler").format(name=name))
        import_handler.name = name
        import_handler.register = partial(self.register, import_handler)
        import_handler.unregister = partial(self.unregister, import_handler)
        import_handler.importers = {}

        def _import(name, location, options, pubsub_service, pubsub_node, profile):
            return self._do_import(
                import_handler,
                name,
                location,
                options,
                pubsub_service,
                pubsub_node,
                profile,
            )

        def _import_list():
            return self.list_importers(import_handler)

        def _import_desc(name):
            return self.getDescription(import_handler, name)

        # NOTE(review): the first method uses "import" while the others use
        # "Import..."; kept unchanged as callers presumably rely on these exact names.
        self.host.bridge.add_method(
            name + "import",
            ".plugin",
            in_sign="ssa{ss}sss",
            out_sign="s",
            method=_import,
            async_=True,
        )
        self.host.bridge.add_method(
            name + "ImportList",
            ".plugin",
            in_sign="",
            out_sign="a(ss)",
            method=_import_list,
        )
        self.host.bridge.add_method(
            name + "ImportDesc",
            ".plugin",
            in_sign="s",
            out_sign="(ss)",
            method=_import_desc,
        )

    def get_progress(self, import_handler, progress_id, profile):
        """Return the progress data of an import

        @param import_handler(object): instance of the import handler
        @param progress_id(unicode): id of the progression
        @param profile(unicode): profile doing the import
        @return (dict): progress data ("position" and, if known, "size")
        """
        client = self.host.get_client(profile)
        return client._import[import_handler.name][progress_id]

    def list_importers(self, import_handler):
        """List importers registered for a handler, sorted by name

        @param import_handler(object): instance of the import handler
        @return (list[tuple[unicode, unicode]]): (name, short description) pairs
        """
        importers = import_handler.importers
        return [(name, importers[name].short_desc) for name in sorted(importers)]

    def getDescription(self, import_handler, name):
        """Return import short and long descriptions

        @param import_handler(object): instance of the import handler
        @param name(unicode): importer name
        @return (tuple[unicode,unicode]): short and long description
        @raise exceptions.NotFound: no importer with this name is registered
        """
        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound(
                "{handler_name} importer not found [{name}]".format(
                    handler_name=import_handler.name, name=name
                )
            )
        else:
            return importer.short_desc, importer.long_desc

    def _do_import(self, import_handler, name, location, options, pubsub_service="",
                   pubsub_node="", profile=C.PROF_KEY_NONE):
        """Bridge entry point: convert raw bridge arguments and call [do_import]

        Option values are converted to strings, then options listed in the handler's
        BOOL_OPTIONS are converted to booleans and those in JSON_OPTIONS are decoded
        from JSON. Empty pubsub_service/pubsub_node are converted to None.
        @raise exceptions.DataError: a JSON option can't be decoded
        """
        client = self.host.get_client(profile)
        options = {key: str(value) for key, value in options.items()}
        for option in import_handler.BOOL_OPTIONS:
            try:
                options[option] = C.bool(options[option])
            except KeyError:
                pass
        for option in import_handler.JSON_OPTIONS:
            try:
                options[option] = json.loads(options[option])
            except KeyError:
                pass
            except ValueError as e:
                raise exceptions.DataError(
                    _("invalid json option: {option}").format(option=option)
                ) from e
        pubsub_service = jid.JID(pubsub_service) if pubsub_service else None
        return self.do_import(
            client,
            import_handler,
            str(name),
            str(location),
            options,
            pubsub_service,
            pubsub_node or None,
        )

    @defer.inlineCallbacks
    def do_import(self, client, import_handler, name, location, options=None,
                  pubsub_service=None, pubsub_node=None):
        """import data

        The import itself runs in background once the importer callback has returned;
        its advancement is reported through the progress mechanism, and a failure is
        reported with a progress error.

        @param import_handler(object): instance of the import handler
        @param name(unicode): name of the importer
        @param location(unicode): location of the data to import
            can be an url, a file path, or anything which make sense
            check importer description for more details
        @param options(dict, None): extra options.
            It is not modified: a copy is completed with the handler's OPT_DEFAULTS
            (missing options with a truthy default are added, options of OPT_DEFAULTS
            with a falsy value are removed).
        @param pubsub_service(jid.JID, None): jid of the PubSub service where data must
            be imported.
            None to use profile's server
        @param pubsub_node(unicode, None): PubSub node to use
            None to use importer's default node
        @return (unicode): progress id
        @raise exceptions.NotFound: no importer with this name is registered
        """
        # we work on a copy to avoid modifying caller's data, and defaults are applied
        # even when no options were given
        options = {} if options is None else dict(options)
        for opt_name, opt_default in import_handler.OPT_DEFAULTS.items():
            # we want a filled options dict, with all empty or False values removed
            try:
                value = options[opt_name]
            except KeyError:
                if opt_default:
                    options[opt_name] = opt_default
            else:
                if not value:
                    del options[opt_name]

        try:
            importer = import_handler.importers[name]
        except KeyError:
            raise exceptions.NotFound("Importer [{}] not found".format(name))
        items_import_data, items_count = yield importer.callback(
            client, location, options
        )
        progress_id = str(uuid.uuid4())
        try:
            _import = client._import
        except AttributeError:
            _import = client._import = {}
        progress_data = _import.setdefault(import_handler.name, {})
        progress_data[progress_id] = {"position": "0"}
        if items_count is not None:
            progress_data[progress_id]["size"] = str(items_count)
        metadata = {
            "name": "{}: {}".format(name, location),
            "direction": "out",
            "type": import_handler.name.upper() + "_IMPORT",
        }
        self.host.register_progress_cb(
            progress_id,
            partial(self.get_progress, import_handler),
            metadata,
            profile=client.profile,
        )
        self.host.bridge.progress_started(progress_id, metadata, client.profile)
        session = {  #  session data, can be used by importers
            "root_service": pubsub_service,
            "root_node": pubsub_node,
        }
        # the import is not waited for: the progress id is returned immediately, so
        # failures must be handled here, otherwise they would be lost and the progress
        # data would never be cleaned
        d = self.recursive_import(
            client,
            import_handler,
            items_import_data,
            progress_id,
            session,
            options,
            None,
            pubsub_service,
            pubsub_node,
        )
        d.addErrback(self._import_failed, client, import_handler, progress_id)
        defer.returnValue(progress_id)

    def _import_failed(self, failure, client, import_handler, progress_id):
        """Report a failed background import and release its progress data

        @param failure(Failure): failure of the import
        @param import_handler(object): instance of the import handler
        @param progress_id(unicode): id of the progression
        """
        log.error(
            _("import {progress_id} failed: {reason}").format(
                progress_id=progress_id, reason=failure
            )
        )
        self.host.bridge.progress_error(
            progress_id, failure.getErrorMessage(), client.profile
        )
        self.host.remove_progress_cb(progress_id, client.profile)
        client._import.get(import_handler.name, {}).pop(progress_id, None)

    @defer.inlineCallbacks
    def recursive_import(
        self,
        client,
        import_handler,
        items_import_data,
        progress_id,
        session,
        options,
        return_data=None,
        service=None,
        node=None,
        depth=0,
    ):
        """Do the import recursively

        Each item is imported, filtered, then published, and its sub items are
        imported recursively when import_sub_items returns recursion kwargs.
        At depth 0, progress position is updated after each item, and progress is
        finished and cleaned once all items are done.

        @param import_handler(object): instance of the import handler
        @param items_import_data(iterable): iterable of data as specified in [register]
        @param progress_id(unicode): id of progression
        @param session(dict): data for this import session
            can be used by importer so store any useful data
            "root_service" and "root_node" are set to the main pubsub service and node
            of the import
        @param options(dict): import options
        @param return_data(dict): data to return on progress_finished
        @param service(jid.JID, None): PubSub service to use
        @param node(unicode, None): PubSub node to use
        @param depth(int): level of recursion
        """
        if return_data is None:
            return_data = {}
        for idx, item_import_data in enumerate(items_import_data):
            item_data = yield import_handler.import_item(
                client, item_import_data, session, options, return_data, service, node
            )
            yield import_handler.item_filters(client, item_data, session, options)
            recurse_kwargs = yield import_handler.import_sub_items(
                client, item_import_data, item_data, session, options
            )
            yield import_handler.publish_item(client, item_data, service, node, session)

            if recurse_kwargs is not None:
                recurse_kwargs["client"] = client
                recurse_kwargs["import_handler"] = import_handler
                recurse_kwargs["progress_id"] = progress_id
                recurse_kwargs["session"] = session
                recurse_kwargs.setdefault("options", options)
                recurse_kwargs["return_data"] = return_data
                recurse_kwargs["depth"] = depth + 1
                log.debug(_("uploading subitems"))
                yield self.recursive_import(**recurse_kwargs)

            if depth == 0:
                client._import[import_handler.name][progress_id]["position"] = str(
                    idx + 1
                )

        if depth == 0:
            self.host.bridge.progress_finished(progress_id, return_data, client.profile)
            self.host.remove_progress_cb(progress_id, client.profile)
            del client._import[import_handler.name][progress_id]

    def register(self, import_handler, name, callback, short_desc="", long_desc=""):
        """Register an Importer method

        @param name(unicode): unique importer name, should indicate the software it can
            import. It is stored lowercased.
        @param callback(callable): method to call:
            the signature must be (client, location, options) (cf. [do_import])
            the importer must return a tuple with (items_import_data, items_count)
            items_import_data(iterable[dict]) data specific to specialized importer
                cf. import_item docstring of specialized importer for details
            items_count (int, None) indicate the total number of items (without
                subitems) useful to display a progress indicator when the iterator is
                a generator
                use None if you can't guess the total number of items
        @param short_desc(unicode): one line description of the importer
        @param long_desc(unicode): long description of the importer, its options, etc.
        @raise exceptions.ConflictError: an importer with this name already exists
        """
        name = name.lower()
        if name in import_handler.importers:
            raise exceptions.ConflictError(
                _(
                    "An {handler_name} importer with the name {name} already exist"
                ).format(handler_name=import_handler.name, name=name)
            )
        import_handler.importers[name] = Importer(callback, short_desc, long_desc)

    def unregister(self, import_handler, name):
        """Unregister an Importer

        @param name(unicode): importer name, lowercased as in [register]
        @raise KeyError: no importer with this name is registered
        """
        del import_handler.importers[name.lower()]