Mercurial > libervia-backend
diff sat/plugins/plugin_blog_import_dotclear.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 003b8b4b56a7 |
line wrap: on
line diff
--- a/sat/plugins/plugin_blog_import_dotclear.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_blog_import_dotclear.py Wed Jun 27 20:14:46 2018 +0200 @@ -20,6 +20,7 @@ from sat.core.i18n import _, D_ from sat.core.constants import Const as C from sat.core.log import getLogger + log = getLogger(__name__) from sat.core import exceptions from sat.tools.common import data_format @@ -38,12 +39,13 @@ C.PI_DEPENDENCIES: ["BLOG_IMPORT"], C.PI_MAIN: "DotclearImport", C.PI_HANDLER: "no", - C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine.""") + C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine."""), } SHORT_DESC = D_(u"import posts from Dotclear blog engine") -LONG_DESC = D_(u"""This importer handle Dotclear blog engine. +LONG_DESC = D_( + u"""This importer handle Dotclear blog engine. To use it, you'll need to export your blog to a flat file. You must go in your admin interface and select Plugins/Maintenance then Backup. @@ -51,15 +53,20 @@ Depending on your configuration, your may need to use Import/Export plugin and export as a flat file. location: you must use the absolute path to your backup for the location parameter -""") +""" +) POST_ID_PREFIX = u"sat_dc_" -KNOWN_DATA_TYPES = ('link', 'setting', 'post', 'meta', 'media', 'post_media', 'comment', 'captcha') -ESCAPE_MAP = { - 'r': u'\r', - 'n': u'\n', - '"': u'"', - '\\': u'\\', - } +KNOWN_DATA_TYPES = ( + "link", + "setting", + "post", + "meta", + "media", + "post_media", + "comment", + "captcha", +) +ESCAPE_MAP = {"r": u"\r", "n": u"\n", '"': u'"', "\\": u"\\"} class DotclearParser(object): @@ -76,7 +83,13 @@ @param post(dict): parsed post data @return (unicode): post unique item id """ - return u"{}_{}_{}_{}:{}".format(POST_ID_PREFIX, post['blog_id'], post['user_id'], post['post_id'], post['post_url']) + return u"{}_{}_{}_{}:{}".format( + POST_ID_PREFIX, + post["blog_id"], + post["user_id"], + post["post_id"], + post["post_url"], + ) def getCommentId(self, comment): """Return a unique and constant comment id @@ -84,9 +97,9 @@ @param comment(dict): parsed comment @return (unicode): comment unique comment id """ - post_id = comment['post_id'] - parent_item_id = self.posts_data[post_id]['blog']['id'] - return u"{}_comment_{}".format(parent_item_id, comment['comment_id']) + post_id = comment["post_id"] + parent_item_id = self.posts_data[post_id]["blog"]["id"] + return u"{}_comment_{}".format(parent_item_id, comment["comment_id"]) def getTime(self, data, key): """Parse time as given by dotclear, with timezone handling @@ -112,18 +125,18 @@ if char == '"': # we have reached the end of this field, # we try to parse a new one - yield u''.join(buf) + yield u"".join(buf) buf = [] idx += 1 try: separator = fields_data[idx] except IndexError: return - if separator != u',': + if separator != u",": raise exceptions.ParsingError("Field separator was expeceted") idx += 1 - break # we have a new field - elif char == u'\\': + break # we have a new field + elif char == u"\\": idx += 1 try: char = ESCAPE_MAP[fields_data[idx]] @@ -139,55 +152,65 @@ def postHandler(self, headers, data, index): post = self.parseFields(headers, data) - log.debug(u'({}) post found: {}'.format(index, post['post_title'])) - mb_data = {'id': self.getPostId(post), - 'published': self.getTime(post, 'post_creadt'), - 'updated': self.getTime(post, 'post_upddt'), - 'author': post['user_id'], # there use info are not in the archive - # TODO: option to specify user info - 'content_xhtml': u"{}{}".format(post['post_content_xhtml'], post['post_excerpt_xhtml']), - 'title': post['post_title'], - 'allow_comments': C.boolConst(bool(int(post['post_open_comment']))), - } - self.posts_data[post['post_id']] = {'blog': mb_data, 'comments':[[]], 'url': u'/post/{}'.format(post['post_url'])} + log.debug(u"({}) post found: {}".format(index, post["post_title"])) + mb_data = { + "id": self.getPostId(post), + "published": self.getTime(post, "post_creadt"), + "updated": self.getTime(post, "post_upddt"), + "author": post["user_id"], # there use info are not in the archive + # TODO: option to specify user info + "content_xhtml": u"{}{}".format( + post["post_content_xhtml"], post["post_excerpt_xhtml"] + ), + "title": post["post_title"], + "allow_comments": C.boolConst(bool(int(post["post_open_comment"]))), + } + self.posts_data[post["post_id"]] = { + "blog": mb_data, + "comments": [[]], + "url": u"/post/{}".format(post["post_url"]), + } def metaHandler(self, headers, data, index): meta = self.parseFields(headers, data) - if meta['meta_type'] == 'tag': - tags = self.tags.setdefault(meta['post_id'], set()) - tags.add(meta['meta_id']) + if meta["meta_type"] == "tag": + tags = self.tags.setdefault(meta["post_id"], set()) + tags.add(meta["meta_id"]) def metaFinishedHandler(self): for post_id, tags in self.tags.iteritems(): - data_format.iter2dict('tag', tags, self.posts_data[post_id]['blog']) + data_format.iter2dict("tag", tags, self.posts_data[post_id]["blog"]) del self.tags def commentHandler(self, headers, data, index): comment = self.parseFields(headers, data) - if comment['comment_site']: + if comment["comment_site"]: # we don't use atom:uri because it's used for jid in XMPP content = u'{}\n<hr>\n<a href="{}">author website</a>'.format( - comment['comment_content'], - cgi.escape(comment['comment_site']).replace('"', u'%22')) + comment["comment_content"], + cgi.escape(comment["comment_site"]).replace('"', u"%22"), + ) else: - content = comment['comment_content'] - mb_data = {'id': self.getCommentId(comment), - 'published': self.getTime(comment, 'comment_dt'), - 'updated': self.getTime(comment, 'comment_upddt'), - 'author': comment['comment_author'], - # we don't keep email addresses to avoid the author to be spammed - # (they would be available publicly else) - # 'author_email': comment['comment_email'], - 'content_xhtml': content, - } - self.posts_data[comment['post_id']]['comments'][0].append( - {'blog': mb_data, 'comments': [[]]}) + content = comment["comment_content"] + mb_data = { + "id": self.getCommentId(comment), + "published": self.getTime(comment, "comment_dt"), + "updated": self.getTime(comment, "comment_upddt"), + "author": comment["comment_author"], + # we don't keep email addresses to avoid the author to be spammed + # (they would be available publicly else) + # 'author_email': comment['comment_email'], + "content_xhtml": content, + } + self.posts_data[comment["post_id"]]["comments"][0].append( + {"blog": mb_data, "comments": [[]]} + ) def parse(self, db_path): with open(db_path) as f: - signature = f.readline().decode('utf-8') + signature = f.readline().decode("utf-8") try: - version = signature.split('|')[1] + version = signature.split("|")[1] except IndexError: version = None log.debug(u"Dotclear version: {}".format(version)) @@ -195,20 +218,20 @@ data_headers = None index = None while True: - buf = f.readline().decode('utf-8') + buf = f.readline().decode("utf-8") if not buf: break - if buf.startswith('['): - header = buf.split(' ', 1) + if buf.startswith("["): + header = buf.split(" ", 1) data_type = header[0][1:] if data_type not in KNOWN_DATA_TYPES: log.warning(u"unkown data type: {}".format(data_type)) index = 0 try: - data_headers = header[1].split(',') + data_headers = header[1].split(",") # we need to remove the ']' from the last header last_header = data_headers[-1] - data_headers[-1] = last_header[:last_header.rfind(']')] + data_headers[-1] = last_header[: last_header.rfind("]")] except IndexError: log.warning(u"Can't read data)") else: @@ -217,7 +240,9 @@ buf = buf.strip() if not buf and data_type in KNOWN_DATA_TYPES: try: - finished_handler = getattr(self, '{}FinishedHandler'.format(data_type)) + finished_handler = getattr( + self, "{}FinishedHandler".format(data_type) + ) except AttributeError: pass else: @@ -227,7 +252,7 @@ continue assert data_type try: - fields_handler = getattr(self, '{}Handler'.format(data_type)) + fields_handler = getattr(self, "{}Handler".format(data_type)) except AttributeError: pass else: @@ -237,15 +262,18 @@ class DotclearImport(object): - def __init__(self, host): log.info(_("plugin Dotclear Import initialization")) self.host = host - host.plugins['BLOG_IMPORT'].register('dotclear', self.DcImport, SHORT_DESC, LONG_DESC) + host.plugins["BLOG_IMPORT"].register( + "dotclear", self.DcImport, SHORT_DESC, LONG_DESC + ) def DcImport(self, client, location, options=None): if not os.path.isabs(location): - raise exceptions.DataError(u"An absolute path to backup data need to be given as location") + raise exceptions.DataError( + u"An absolute path to backup data need to be given as location" + ) dc_parser = DotclearParser() d = threads.deferToThread(dc_parser.parse, location) return d