diff libervia/backend/plugins/plugin_blog_import_dotclear.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_blog_import_dotclear.py@524856bd7b19
children 73d83cb53673
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_blog_import_dotclear.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+
+
+# SàT plugin for import external blogs
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
from collections import OrderedDict
import cgi  # legacy; cgi.escape was removed in Python 3.8, html.escape is used instead
import html
import itertools
import os.path
import time

from twisted.internet import threads

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format

log = getLogger(__name__)
+
+
# metadata used by the plugin loader to register this importer
PLUGIN_INFO = {
    C.PI_NAME: "Dotclear import",
    C.PI_IMPORT_NAME: "IMPORT_DOTCLEAR",
    C.PI_TYPE: C.PLUG_TYPE_BLOG,
    C.PI_DEPENDENCIES: ["BLOG_IMPORT"],
    C.PI_MAIN: "DotclearImport",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine."""),
}

# one-line description shown when listing available importers
SHORT_DESC = D_("import posts from Dotclear blog engine")

# end-user help text (how to produce the flat backup this plugin reads)
LONG_DESC = D_(
    """This importer handle Dotclear blog engine.

To use it, you'll need to export your blog to a flat file.
You must go in your admin interface and select Plugins/Maintenance then Backup.
Export only one blog if you have many, i.e. select "Download database of current blog"
Depending on your configuration, your may need to use import/Export plugin and export as a flat file.

location: you must use the absolute path to your backup for the location parameter
"""
)
# prefix of the unique, stable item ids built for imported posts
# ("sat" is the project's former name, kept so ids stay constant across imports)
POST_ID_PREFIX = "sat_dc_"
# section names which may appear in a Dotclear flat backup;
# only the types with a matching *_handler method are actually imported
KNOWN_DATA_TYPES = (
    "link",
    "setting",
    "post",
    "meta",
    "media",
    "post_media",
    "comment",
    "captcha",
)
# char following a backslash => unescaped value
# ('"' and '\\' map to themselves, i.e. the backslash is simply dropped)
ESCAPE_MAP = {"r": "\r", "n": "\n", '"': '"', "\\": "\\"}
+
+
class DotclearParser(object):
    # XXX: we have to parse the whole file to build data,
    #      this can be resource intensive on huge blogs

    def __init__(self):
        # post_id => {"blog": mb_data, "comments": [[...]], "url": relative url}
        self.posts_data = OrderedDict()
        # post_id => set of tags; merged into posts_data once the "meta"
        # section is fully parsed (cf. meta_finished_handler)
        self.tags = {}

    def get_post_id(self, post):
        """Return a unique and constant post id

        @param post(dict): parsed post data
        @return (unicode): post unique item id
        """
        return "{}_{}_{}_{}:{}".format(
            POST_ID_PREFIX,
            post["blog_id"],
            post["user_id"],
            post["post_id"],
            post["post_url"],
        )

    def get_comment_id(self, comment):
        """Return a unique and constant comment id

        @param comment(dict): parsed comment
        @return (unicode): comment unique comment id
        """
        post_id = comment["post_id"]
        parent_item_id = self.posts_data[post_id]["blog"]["id"]
        return "{}_comment_{}".format(parent_item_id, comment["comment_id"])

    def getTime(self, data, key):
        """Parse time as given by Dotclear

        NOTE(review): the value is interpreted in the local timezone
        (time.mktime), there is no explicit timezone handling here.

        @param data(dict): dotclear data (post or comment)
        @param key(unicode): key to get (e.g. "post_creadt")
        @return (float): Unix time
        """
        return time.mktime(time.strptime(data[key], "%Y-%m-%d %H:%M:%S"))

    def read_fields(self, fields_data):
        """Generate the unescaped values of one data line

        A line is a comma separated list of double-quoted, backslash-escaped
        strings (e.g. "foo","bar \\"baz\\"").

        @param fields_data(unicode): raw data line
        @return (iterator[unicode]): one unescaped string per field
        @raise exceptions.ParsingError: the line is malformed
        """
        buf = []
        idx = 0
        while True:
            try:
                if fields_data[idx] != '"':
                    raise exceptions.ParsingError
            except IndexError:
                # a separator announced a new field but the line ends here
                raise exceptions.ParsingError("Opening quote was expected")
            while True:
                idx += 1
                try:
                    char = fields_data[idx]
                except IndexError:
                    raise exceptions.ParsingError("Data was expected")
                if char == '"':
                    # we have reached the end of this field,
                    # we try to parse a new one
                    yield "".join(buf)
                    buf = []
                    idx += 1
                    try:
                        separator = fields_data[idx]
                    except IndexError:
                        return
                    if separator != ",":
                        raise exceptions.ParsingError("Field separator was expected")
                    idx += 1
                    break  # we have a new field
                elif char == "\\":
                    idx += 1
                    try:
                        char = ESCAPE_MAP[fields_data[idx]]
                    except IndexError:
                        raise exceptions.ParsingError("Escaped char was expected")
                    except KeyError:
                        # unknown escape sequence: keep the char as-is
                        char = fields_data[idx]
                        log.warning("Unknown key to escape: {}".format(char))
                buf.append(char)

    def parseFields(self, headers, data):
        """Map one data line onto the current section's column names

        @param headers(list[unicode]): column names of the current section
        @param data(unicode): raw data line
        @return (dict): column name => unescaped value
        """
        return dict(zip(headers, self.read_fields(data)))

    def post_handler(self, headers, data, index):
        """Parse one "post" line and register the corresponding blog item

        @param headers(list[unicode]): post section column names
        @param data(unicode): raw post line
        @param index(int): row index inside the current section
        """
        post = self.parseFields(headers, data)
        log.debug("({}) post found: {}".format(index, post["post_title"]))
        mb_data = {
            "id": self.get_post_id(post),
            "published": self.getTime(post, "post_creadt"),
            "updated": self.getTime(post, "post_upddt"),
            "author": post["user_id"],  # the full user info is not in the archive
            # TODO: option to specify user info
            "content_xhtml": "{}{}".format(
                post["post_content_xhtml"], post["post_excerpt_xhtml"]
            ),
            "title": post["post_title"],
            "allow_comments": C.bool_const(bool(int(post["post_open_comment"]))),
        }
        self.posts_data[post["post_id"]] = {
            "blog": mb_data,
            "comments": [[]],
            "url": "/post/{}".format(post["post_url"]),
        }

    def meta_handler(self, headers, data, index):
        """Parse one "meta" line, keeping only tag metadata"""
        meta = self.parseFields(headers, data)
        if meta["meta_type"] == "tag":
            tags = self.tags.setdefault(meta["post_id"], set())
            tags.add(meta["meta_id"])

    def meta_finished_handler(self):
        """Attach the collected tags to their posts once "meta" is done"""
        for post_id, tags in self.tags.items():
            data_format.iter2dict("tag", tags, self.posts_data[post_id]["blog"])
        del self.tags

    def comment_handler(self, headers, data, index):
        """Parse one "comment" line and attach it to its parent post

        @param headers(list[unicode]): comment section column names
        @param data(unicode): raw comment line
        @param index(int): row index inside the current section
        """
        comment = self.parseFields(headers, data)
        if comment["comment_site"]:
            # we don't use atom:uri because it's used for jid in XMPP
            # html.escape replaces cgi.escape (removed in Python 3.8);
            # quote=False keeps the historical behaviour, '"' being handled
            # by the explicit %22 replacement below
            content = '{}\n<hr>\n<a href="{}">author website</a>'.format(
                comment["comment_content"],
                html.escape(comment["comment_site"], quote=False).replace('"', "%22"),
            )
        else:
            content = comment["comment_content"]
        mb_data = {
            "id": self.get_comment_id(comment),
            "published": self.getTime(comment, "comment_dt"),
            "updated": self.getTime(comment, "comment_upddt"),
            "author": comment["comment_author"],
            # we don't keep email addresses to avoid the author to be spammed
            # (they would be available publicly else)
            # 'author_email': comment['comment_email'],
            "content_xhtml": content,
        }
        self.posts_data[comment["post_id"]]["comments"][0].append(
            {"blog": mb_data, "comments": [[]]}
        )

    def parse(self, db_path):
        """Parse a whole Dotclear flat backup file

        @param db_path(unicode): path to the flat backup file
        @return (tuple): (iterator over posts data, number of posts)
        """
        with open(db_path) as f:
            signature = f.readline()
            try:
                version = signature.split("|")[1]
            except IndexError:
                version = None
            log.debug("Dotclear version: {}".format(version))
            data_type = None
            data_headers = None
            index = None
            while True:
                buf = f.readline()
                if not buf:
                    break
                if buf.startswith("["):
                    # new section, e.g. [post post_id,blog_id,...]
                    header = buf.split(" ", 1)
                    data_type = header[0][1:]
                    if data_type not in KNOWN_DATA_TYPES:
                        log.warning("unknown data type: {}".format(data_type))
                    index = 0
                    try:
                        data_headers = header[1].split(",")
                        # we need to remove the ']' from the last header
                        last_header = data_headers[-1]
                        data_headers[-1] = last_header[: last_header.rfind("]")]
                    except IndexError:
                        log.warning("Can't read data")
                else:
                    if data_type is None:
                        continue
                    buf = buf.strip()
                    if not buf and data_type in KNOWN_DATA_TYPES:
                        # an empty line ends the current section
                        # FIX: handlers use snake_case names; the former
                        # camelCase lookup ("{}FinishedHandler") never matched
                        # and every section was silently skipped
                        try:
                            finished_handler = getattr(
                                self, "{}_finished_handler".format(data_type)
                            )
                        except AttributeError:
                            pass
                        else:
                            finished_handler()
                        log.debug("{} data finished".format(data_type))
                        data_type = None
                        continue
                    assert data_type
                    try:
                        fields_handler = getattr(self, "{}_handler".format(data_type))
                    except AttributeError:
                        # no handler for this data type: ignore the line
                        pass
                    else:
                        fields_handler(data_headers, buf, index)
                    index += 1
        return (iter(self.posts_data.values()), len(self.posts_data))
+
+
class DotclearImport(object):
    """Entry point registering the Dotclear importer into BLOG_IMPORT."""

    def __init__(self, host):
        log.info(_("plugin Dotclear import initialization"))
        self.host = host
        host.plugins["BLOG_IMPORT"].register(
            "dotclear", self.dc_import, SHORT_DESC, LONG_DESC
        )

    def dc_import(self, client, location, options=None):
        """Import a Dotclear flat backup in a background thread

        @param client: client session requesting the import
        @param location(unicode): absolute path to the flat backup file
        @param options(dict, None): import options (unused here)
        @return (Deferred): fires with (posts iterator, posts count)
        @raise exceptions.DataError: location is not an absolute path
        """
        if os.path.isabs(location):
            dc_parser = DotclearParser()
            return threads.deferToThread(dc_parser.parse, location)
        raise exceptions.DataError(
            "An absolute path to backup data need to be given as location"
        )