diff sat/plugins/plugin_blog_import_dotclear.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 003b8b4b56a7
line wrap: on
line diff
--- a/sat/plugins/plugin_blog_import_dotclear.py	Wed Jun 27 07:51:29 2018 +0200
+++ b/sat/plugins/plugin_blog_import_dotclear.py	Wed Jun 27 20:14:46 2018 +0200
@@ -20,6 +20,7 @@
 from sat.core.i18n import _, D_
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
+
 log = getLogger(__name__)
 from sat.core import exceptions
 from sat.tools.common import data_format
@@ -38,12 +39,13 @@
     C.PI_DEPENDENCIES: ["BLOG_IMPORT"],
     C.PI_MAIN: "DotclearImport",
     C.PI_HANDLER: "no",
-    C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine.""")
+    C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine."""),
 }
 
 SHORT_DESC = D_(u"import posts from Dotclear blog engine")
 
-LONG_DESC = D_(u"""This importer handle Dotclear blog engine.
+LONG_DESC = D_(
+    u"""This importer handle Dotclear blog engine.
 
 To use it, you'll need to export your blog to a flat file.
 You must go in your admin interface and select Plugins/Maintenance then Backup.
@@ -51,15 +53,20 @@
 Depending on your configuration, your may need to use Import/Export plugin and export as a flat file.
 
 location: you must use the absolute path to your backup for the location parameter
-""")
+"""
+)
 POST_ID_PREFIX = u"sat_dc_"
-KNOWN_DATA_TYPES = ('link', 'setting', 'post', 'meta', 'media', 'post_media', 'comment', 'captcha')
-ESCAPE_MAP = {
-    'r': u'\r',
-    'n': u'\n',
-    '"': u'"',
-    '\\': u'\\',
-    }
+KNOWN_DATA_TYPES = (
+    "link",
+    "setting",
+    "post",
+    "meta",
+    "media",
+    "post_media",
+    "comment",
+    "captcha",
+)
+ESCAPE_MAP = {"r": u"\r", "n": u"\n", '"': u'"', "\\": u"\\"}
 
 
 class DotclearParser(object):
@@ -76,7 +83,13 @@
         @param post(dict): parsed post data
         @return (unicode): post unique item id
         """
-        return u"{}_{}_{}_{}:{}".format(POST_ID_PREFIX, post['blog_id'], post['user_id'], post['post_id'], post['post_url'])
+        return u"{}_{}_{}_{}:{}".format(
+            POST_ID_PREFIX,
+            post["blog_id"],
+            post["user_id"],
+            post["post_id"],
+            post["post_url"],
+        )
 
     def getCommentId(self, comment):
         """Return a unique and constant comment id
@@ -84,9 +97,9 @@
         @param comment(dict): parsed comment
         @return (unicode): comment unique comment id
         """
-        post_id = comment['post_id']
-        parent_item_id = self.posts_data[post_id]['blog']['id']
-        return u"{}_comment_{}".format(parent_item_id, comment['comment_id'])
+        post_id = comment["post_id"]
+        parent_item_id = self.posts_data[post_id]["blog"]["id"]
+        return u"{}_comment_{}".format(parent_item_id, comment["comment_id"])
 
     def getTime(self, data, key):
         """Parse time as given by dotclear, with timezone handling
@@ -112,18 +125,18 @@
                 if char == '"':
                     # we have reached the end of this field,
                     # we try to parse a new one
-                    yield u''.join(buf)
+                    yield u"".join(buf)
                     buf = []
                     idx += 1
                     try:
                         separator = fields_data[idx]
                     except IndexError:
                         return
-                    if separator != u',':
+                    if separator != u",":
                         raise exceptions.ParsingError("Field separator was expeceted")
                     idx += 1
-                    break # we have a new field
-                elif char == u'\\':
+                    break  # we have a new field
+                elif char == u"\\":
                     idx += 1
                     try:
                         char = ESCAPE_MAP[fields_data[idx]]
@@ -139,55 +152,65 @@
 
     def postHandler(self, headers, data, index):
         post = self.parseFields(headers, data)
-        log.debug(u'({}) post found: {}'.format(index, post['post_title']))
-        mb_data = {'id': self.getPostId(post),
-                   'published': self.getTime(post, 'post_creadt'),
-                   'updated': self.getTime(post, 'post_upddt'),
-                   'author': post['user_id'], # there use info are not in the archive
-                                              # TODO: option to specify user info
-                   'content_xhtml': u"{}{}".format(post['post_content_xhtml'], post['post_excerpt_xhtml']),
-                   'title': post['post_title'],
-                   'allow_comments': C.boolConst(bool(int(post['post_open_comment']))),
-                    }
-        self.posts_data[post['post_id']] = {'blog': mb_data, 'comments':[[]], 'url': u'/post/{}'.format(post['post_url'])}
+        log.debug(u"({}) post found: {}".format(index, post["post_title"]))
+        mb_data = {
+            "id": self.getPostId(post),
+            "published": self.getTime(post, "post_creadt"),
+            "updated": self.getTime(post, "post_upddt"),
+            "author": post["user_id"],  # there use info are not in the archive
+            # TODO: option to specify user info
+            "content_xhtml": u"{}{}".format(
+                post["post_content_xhtml"], post["post_excerpt_xhtml"]
+            ),
+            "title": post["post_title"],
+            "allow_comments": C.boolConst(bool(int(post["post_open_comment"]))),
+        }
+        self.posts_data[post["post_id"]] = {
+            "blog": mb_data,
+            "comments": [[]],
+            "url": u"/post/{}".format(post["post_url"]),
+        }
 
     def metaHandler(self, headers, data, index):
         meta = self.parseFields(headers, data)
-        if meta['meta_type'] == 'tag':
-            tags = self.tags.setdefault(meta['post_id'], set())
-            tags.add(meta['meta_id'])
+        if meta["meta_type"] == "tag":
+            tags = self.tags.setdefault(meta["post_id"], set())
+            tags.add(meta["meta_id"])
 
     def metaFinishedHandler(self):
         for post_id, tags in self.tags.iteritems():
-            data_format.iter2dict('tag', tags, self.posts_data[post_id]['blog'])
+            data_format.iter2dict("tag", tags, self.posts_data[post_id]["blog"])
         del self.tags
 
     def commentHandler(self, headers, data, index):
         comment = self.parseFields(headers, data)
-        if comment['comment_site']:
+        if comment["comment_site"]:
             # we don't use atom:uri because it's used for jid in XMPP
             content = u'{}\n<hr>\n<a href="{}">author website</a>'.format(
-                comment['comment_content'],
-                cgi.escape(comment['comment_site']).replace('"', u'%22'))
+                comment["comment_content"],
+                cgi.escape(comment["comment_site"]).replace('"', u"%22"),
+            )
         else:
-            content = comment['comment_content']
-        mb_data = {'id': self.getCommentId(comment),
-                   'published': self.getTime(comment, 'comment_dt'),
-                   'updated': self.getTime(comment, 'comment_upddt'),
-                   'author': comment['comment_author'],
-                   # we don't keep email addresses to avoid the author to be spammed
-                   # (they would be available publicly else)
-                   # 'author_email': comment['comment_email'],
-                   'content_xhtml': content,
-                  }
-        self.posts_data[comment['post_id']]['comments'][0].append(
-            {'blog': mb_data, 'comments': [[]]})
+            content = comment["comment_content"]
+        mb_data = {
+            "id": self.getCommentId(comment),
+            "published": self.getTime(comment, "comment_dt"),
+            "updated": self.getTime(comment, "comment_upddt"),
+            "author": comment["comment_author"],
+            # we don't keep email addresses to avoid the author to be spammed
+            # (they would be available publicly else)
+            # 'author_email': comment['comment_email'],
+            "content_xhtml": content,
+        }
+        self.posts_data[comment["post_id"]]["comments"][0].append(
+            {"blog": mb_data, "comments": [[]]}
+        )
 
     def parse(self, db_path):
         with open(db_path) as f:
-            signature = f.readline().decode('utf-8')
+            signature = f.readline().decode("utf-8")
             try:
-                version = signature.split('|')[1]
+                version = signature.split("|")[1]
             except IndexError:
                 version = None
             log.debug(u"Dotclear version: {}".format(version))
@@ -195,20 +218,20 @@
             data_headers = None
             index = None
             while True:
-                buf = f.readline().decode('utf-8')
+                buf = f.readline().decode("utf-8")
                 if not buf:
                     break
-                if buf.startswith('['):
-                    header = buf.split(' ', 1)
+                if buf.startswith("["):
+                    header = buf.split(" ", 1)
                     data_type = header[0][1:]
                     if data_type not in KNOWN_DATA_TYPES:
                         log.warning(u"unkown data type: {}".format(data_type))
                     index = 0
                     try:
-                        data_headers = header[1].split(',')
+                        data_headers = header[1].split(",")
                         # we need to remove the ']' from the last header
                         last_header = data_headers[-1]
-                        data_headers[-1] = last_header[:last_header.rfind(']')]
+                        data_headers[-1] = last_header[: last_header.rfind("]")]
                     except IndexError:
                         log.warning(u"Can't read data)")
                 else:
@@ -217,7 +240,9 @@
                     buf = buf.strip()
                     if not buf and data_type in KNOWN_DATA_TYPES:
                         try:
-                            finished_handler = getattr(self, '{}FinishedHandler'.format(data_type))
+                            finished_handler = getattr(
+                                self, "{}FinishedHandler".format(data_type)
+                            )
                         except AttributeError:
                             pass
                         else:
@@ -227,7 +252,7 @@
                         continue
                     assert data_type
                     try:
-                        fields_handler = getattr(self, '{}Handler'.format(data_type))
+                        fields_handler = getattr(self, "{}Handler".format(data_type))
                     except AttributeError:
                         pass
                     else:
@@ -237,15 +262,18 @@
 
 
 class DotclearImport(object):
-
     def __init__(self, host):
         log.info(_("plugin Dotclear Import initialization"))
         self.host = host
-        host.plugins['BLOG_IMPORT'].register('dotclear', self.DcImport, SHORT_DESC, LONG_DESC)
+        host.plugins["BLOG_IMPORT"].register(
+            "dotclear", self.DcImport, SHORT_DESC, LONG_DESC
+        )
 
     def DcImport(self, client, location, options=None):
         if not os.path.isabs(location):
-            raise exceptions.DataError(u"An absolute path to backup data need to be given as location")
+            raise exceptions.DataError(
+                u"An absolute path to backup data need to be given as location"
+            )
         dc_parser = DotclearParser()
         d = threads.deferToThread(dc_parser.parse, location)
         return d