Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_import.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_import.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SàT plugin for generic data import handling | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from libervia.backend.core.i18n import _ | |
21 from libervia.backend.core.constants import Const as C | |
22 from libervia.backend.core.log import getLogger | |
23 | |
24 log = getLogger(__name__) | |
25 from twisted.internet import defer | |
26 from libervia.backend.core import exceptions | |
27 from twisted.words.protocols.jabber import jid | |
28 from functools import partial | |
29 import collections | |
30 import uuid | |
31 import json | |
32 | |
33 | |
# Plugin metadata, keyed by the C.PI_* constants.
PLUGIN_INFO = {
    C.PI_NAME: "import",
    C.PI_IMPORT_NAME: "IMPORT",
    C.PI_TYPE: C.PLUG_TYPE_IMPORT,
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "ImportPlugin",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("Generic import plugin, base for specialized importers"),
}

# Record kept for every registered importer: the callable doing the import
# and its short and long descriptions.
Importer = collections.namedtuple("Importer", "callback short_desc long_desc")
45 | |
46 | |
47 class ImportPlugin(object): | |
def __init__(self, host):
    """Keep a reference to the host and log the plugin start

    @param host: backend instance; its ``bridge`` and client helpers are used
        by the other methods of this plugin
    """
    self.host = host
    log.info(_("plugin import initialization"))
51 | |
def initialize(self, import_handler, name):
    """Set up a specialized import handler and expose it on the bridge

    The handler receives ``name``, ``importers``, ``register`` and
    ``unregister`` attributes, then three bridge methods are added for it.

    @param import_handler(object): specialized import handler instance
        must have the following methods:
            - import_item: import a single main item (i.e. prepare data for publishing)
            - import_sub_items: import sub items (i.e. items linked to main item, e.g. comments).
                Must return a dict with kwargs for recursive_import if items are to be
                imported recursively.
                At least "items_import_data", "service" and "node" keys must be provided.
                If None is returned, no recursion will be done to import subitems, but
                import can still be done directly by the method.
            - publish_item: actually publish an item
            - item_filters: modify item according to options
        BOOL_OPTIONS, JSON_OPTIONS and OPT_DEFAULTS attributes are also read from
        the handler when an import is launched.
    @param name(unicode): import handler name
        must be lower case, without leading or trailing whitespace
    """
    assert name == name.lower().strip()
    log.info(_("initializing {name} import handler").format(name=name))

    # attributes attached to the handler so that importers can plug into it
    import_handler.name = name
    import_handler.importers = {}
    import_handler.register = partial(self.register, import_handler)
    import_handler.unregister = partial(self.unregister, import_handler)

    # plain functions (rather than partials) are handed to the bridge, each
    # one bound to this handler through the closure
    def _import(name, location, options, pubsub_service, pubsub_node, profile):
        return self._do_import(
            import_handler,
            name,
            location,
            options,
            pubsub_service,
            pubsub_node,
            profile,
        )

    def _import_list():
        return self.list_importers(import_handler)

    def _import_desc(name):
        return self.getDescription(import_handler, name)

    bridge = self.host.bridge
    # NOTE(review): this suffix is lower case ("import") while the two other
    # methods use "ImportList"/"ImportDesc" -- confirm that frontends expect
    # this exact spelling before changing anything here.
    bridge.add_method(
        name + "import",
        ".plugin",
        in_sign="ssa{ss}sss",
        out_sign="s",
        method=_import,
        async_=True,
    )
    bridge.add_method(
        name + "ImportList",
        ".plugin",
        in_sign="",
        out_sign="a(ss)",
        method=_import_list,
    )
    bridge.add_method(
        name + "ImportDesc",
        ".plugin",
        in_sign="s",
        out_sign="(ss)",
        method=_import_desc,
    )
112 | |
def get_progress(self, import_handler, progress_id, profile):
    """Return the progress data stored for an import

    @param import_handler(object): instance of the import handler
    @param progress_id(unicode): id of the progression
    @param profile: profile used to retrieve the client
    @return (dict): data stored on the client for this handler and progress id
    """
    handler_progress = self.host.get_client(profile)._import[import_handler.name]
    return handler_progress[progress_id]
116 | |
def list_importers(self, import_handler):
    """List the importers registered on a handler, sorted by name

    @param import_handler(object): instance of the import handler
    @return (list[tuple[unicode, unicode]]): (name, short description) of
        each registered importer, ordered by importer name
    """
    # iterate over the sorted names: the previous code sorted a copy of the
    # keys but then iterated over the unsorted dict
    return [
        (name, import_handler.importers[name].short_desc)
        for name in sorted(import_handler.importers)
    ]
124 | |
def getDescription(self, import_handler, name):
    """Return short and long descriptions of an importer

    @param import_handler(object): instance of the import handler
    @param name(unicode): importer name
    @return (tuple[unicode,unicode]): short and long description
    @raise exceptions.NotFound: no importer is registered under this name
    """
    try:
        importer = import_handler.importers[name]
    except KeyError:
        message = "{handler_name} importer not found [{name}]".format(
            handler_name=import_handler.name, name=name
        )
        raise exceptions.NotFound(message)
    return importer.short_desc, importer.long_desc
141 | |
def _do_import(self, import_handler, name, location, options, pubsub_service="",
               pubsub_node="", profile=C.PROF_KEY_NONE):
    """Convert the arguments received from the bridge and launch the import

    Option values are converted to strings, then options listed in the
    handler's BOOL_OPTIONS are converted with C.bool and those listed in
    JSON_OPTIONS are parsed as JSON. Empty service and node become None.

    @param import_handler(object): instance of the import handler
    @param name(unicode): name of the importer
    @param location(unicode): location of the data to import
    @param options(dict): extra options
    @param pubsub_service(unicode): jid of the PubSub service, empty for None
    @param pubsub_node(unicode): PubSub node, empty for None
    @param profile: profile used to retrieve the client
    @return: result of do_import
    @raise exceptions.DataError: an option listed in JSON_OPTIONS is not valid JSON
    """
    client = self.host.get_client(profile)
    options = {key: str(value) for key, value in options.items()}

    for option in import_handler.BOOL_OPTIONS:
        try:
            raw_value = options[option]
        except KeyError:
            # option not set, nothing to convert
            continue
        options[option] = C.bool(raw_value)

    for option in import_handler.JSON_OPTIONS:
        try:
            raw_value = options[option]
        except KeyError:
            continue
        try:
            options[option] = json.loads(raw_value)
        except ValueError:
            raise exceptions.DataError(
                _("invalid json option: {option}").format(option=option)
            )

    if pubsub_service:
        pubsub_service = jid.JID(pubsub_service)
    else:
        pubsub_service = None

    return self.do_import(
        client,
        import_handler,
        str(name),
        str(location),
        options,
        pubsub_service,
        pubsub_node or None,
    )
170 | |
@defer.inlineCallbacks
def do_import(self, client, import_handler, name, location, options=None,
              pubsub_service=None, pubsub_node=None,):
    """Import data

    The importer callback is called to get the items to import, a progress
    entry is registered, then the recursive import is started.

    @param client: client of the profile doing the import
    @param import_handler(object): instance of the import handler
    @param name(unicode): name of the importer
    @param location(unicode): location of the data to import
        can be an url, a file path, or anything which makes sense
        check importer description for more details
    @param options(dict, None): extra options.
        The handler's OPT_DEFAULTS are applied to it, and empty or False
        values are removed. The dict is modified in place.
    @param pubsub_service(jid.JID, None): jid of the PubSub service where data must be
        imported.
        None to use profile's server
    @param pubsub_node(unicode, None): PubSub node to use
        None to use importer's default node
    @return (unicode): progress id
    @raise exceptions.NotFound: no importer is registered under this name
    """
    if options is None:
        options = {}
    # Defaults are applied whether or not options were given, so that
    # ``options=None`` and ``options={}`` behave the same way (defaults used
    # to be skipped when options was None).
    for opt_name, opt_default in import_handler.OPT_DEFAULTS.items():
        # we want a filled options dict, with all empty or False values removed
        try:
            value = options[opt_name]
        except KeyError:
            if opt_default:
                options[opt_name] = opt_default
        else:
            if not value:
                del options[opt_name]

    try:
        importer = import_handler.importers[name]
    except KeyError:
        raise exceptions.NotFound("Importer [{}] not found".format(name))
    items_import_data, items_count = yield importer.callback(
        client, location, options
    )

    # progress data are stored on the client, per handler name then progress id
    progress_id = str(uuid.uuid4())
    try:
        _import = client._import
    except AttributeError:
        _import = client._import = {}
    progress_data = _import.setdefault(import_handler.name, {})
    progress_data[progress_id] = {"position": "0"}
    if items_count is not None:
        # the size is only known when the importer could count its items
        progress_data[progress_id]["size"] = str(items_count)
    metadata = {
        "name": "{}: {}".format(name, location),
        "direction": "out",
        "type": import_handler.name.upper() + "_IMPORT",
    }
    self.host.register_progress_cb(
        progress_id,
        partial(self.get_progress, import_handler),
        metadata,
        profile=client.profile,
    )
    self.host.bridge.progress_started(progress_id, metadata, client.profile)
    session = {  # session data, can be used by importers
        "root_service": pubsub_service,
        "root_node": pubsub_node,
    }
    # NOTE(review): the returned Deferred is not yielded, presumably so that
    # the progress id is returned while the import is still running; a
    # failure of the recursive import is not handled here.
    self.recursive_import(
        client,
        import_handler,
        items_import_data,
        progress_id,
        session,
        options,
        None,
        pubsub_service,
        pubsub_node,
    )
    defer.returnValue(progress_id)
247 | |
@defer.inlineCallbacks
def recursive_import(
    self,
    client,
    import_handler,
    items_import_data,
    progress_id,
    session,
    options,
    return_data=None,
    service=None,
    node=None,
    depth=0,
):
    """Import items one by one, recursing on their sub items

    @param client: client of the profile doing the import
    @param import_handler(object): instance of the import handler
    @param items_import_data(iterable): iterable of data as specified in [register]
    @param progress_id(unicode): id of progression
    @param session(dict): data for this import session
        can be used by importer to store any useful data
        "root_service" and "root_node" are set to the main pubsub service and node
        of the import
    @param options(dict): import options
    @param return_data(dict): data to return on progress_finished
    @param service(jid.JID, None): PubSub service to use
    @param node(unicode, None): PubSub node to use
    @param depth(int): level of recursion
    """
    if return_data is None:
        return_data = {}
    # only the top level call updates and closes the progress entry
    is_root = depth == 0

    for position, item_import_data in enumerate(items_import_data, start=1):
        item_data = yield import_handler.import_item(
            client, item_import_data, session, options, return_data, service, node
        )
        yield import_handler.item_filters(client, item_data, session, options)
        recurse_kwargs = yield import_handler.import_sub_items(
            client, item_import_data, item_data, session, options
        )
        yield import_handler.publish_item(client, item_data, service, node, session)

        if recurse_kwargs is not None:
            # the handler provides the sub items data with their service and
            # node, everything else is filled here
            recurse_kwargs.update(
                client=client,
                import_handler=import_handler,
                progress_id=progress_id,
                session=session,
                return_data=return_data,
                depth=depth + 1,
            )
            # options returned by the handler take precedence over ours
            recurse_kwargs.setdefault("options", options)
            log.debug(_("uploading subitems"))
            yield self.recursive_import(**recurse_kwargs)

        if is_root:
            handler_progress = client._import[import_handler.name]
            handler_progress[progress_id]["position"] = str(position)

    if is_root:
        self.host.bridge.progress_finished(progress_id, return_data, client.profile)
        self.host.remove_progress_cb(progress_id, client.profile)
        del client._import[import_handler.name][progress_id]
308 | |
def register(self, import_handler, name, callback, short_desc="", long_desc=""):
    """Register an Importer method

    @param import_handler(object): instance of the import handler
    @param name(unicode): unique importer name, should indicate the software it can
        import; it is stored lower-cased
    @param callback(callable): method to call:
        the signature must be (client, location, options) (cf. [do_import])
        the importer must return a tuple with (items_import_data, items_count)
        items_import_data(iterable[dict]) data specific to specialized importer
            cf. import_item docstring of specialized importer for details
        items_count (int, None) indicate the total number of items (without subitems)
            useful to display a progress indicator when the iterator is a generator
            use None if you can't guess the total number of items
    @param short_desc(unicode): one line description of the importer
    @param long_desc(unicode): long description of the importer, its options, etc.
    @raise exceptions.ConflictError: an importer is already registered with this name
    """
    name = name.lower()
    importers = import_handler.importers
    if name not in importers:
        importers[name] = Importer(callback, short_desc, long_desc)
        return
    raise exceptions.ConflictError(
        _(
            "An {handler_name} importer with the name {name} already exist"
        ).format(handler_name=import_handler.name, name=name)
    )
332 | |
def unregister(self, import_handler, name):
    """Remove a registered importer from a handler

    @param import_handler(object): instance of the import handler
    @param name(unicode): importer name, as given to [register]
    @raise KeyError: no importer is registered under this name
    """
    # register stores names lower-cased, so the same normalisation is needed
    # here to find the importer whatever the case used by the caller
    del import_handler.importers[name.lower()]