Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_blog_import_dotclear.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_blog_import_dotclear.py@524856bd7b19 |
children |
line wrap: on
line diff
#!/usr/bin/env python3


# SàT plugin for import external blogs
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _, D_
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger

log = getLogger(__name__)
from libervia.backend.core import exceptions
from libervia.backend.tools.common import data_format
from twisted.internet import threads
from collections import OrderedDict
import itertools
import time
# NOTE(fix): `cgi.escape` was removed in Python 3.8 (and the whole `cgi`
# module by PEP 594); `html.escape` is the documented replacement.
import html
import os.path


PLUGIN_INFO = {
    C.PI_NAME: "Dotclear import",
    C.PI_IMPORT_NAME: "IMPORT_DOTCLEAR",
    C.PI_TYPE: C.PLUG_TYPE_BLOG,
    C.PI_DEPENDENCIES: ["BLOG_IMPORT"],
    C.PI_MAIN: "DotclearImport",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine."""),
}

SHORT_DESC = D_("import posts from Dotclear blog engine")

LONG_DESC = D_(
    """This importer handle Dotclear blog engine.

To use it, you'll need to export your blog to a flat file.
You must go in your admin interface and select Plugins/Maintenance then Backup.
Export only one blog if you have many, i.e. select "Download database of current blog"
Depending on your configuration, your may need to use import/Export plugin and export as a flat file.

location: you must use the absolute path to your backup for the location parameter
"""
)
# prefix used to build a stable, unique item id for imported posts
POST_ID_PREFIX = "sat_dc_"
# section names that can appear in a Dotclear flat backup file
KNOWN_DATA_TYPES = (
    "link",
    "setting",
    "post",
    "meta",
    "media",
    "post_media",
    "comment",
    "captcha",
)
# backslash-escape sequences used inside quoted fields of the flat file
ESCAPE_MAP = {"r": "\r", "n": "\n", '"': '"', "\\": "\\"}


class DotclearParser(object):
    """Parser for Dotclear flat backup files.

    Builds all posts/comments data in memory, then returns an iterator
    suitable for the BLOG_IMPORT plugin.
    """
    # XXX: we have to parse all file to build data
    #      this can be ressource intensive on huge blogs

    def __init__(self):
        # post_id (raw Dotclear id) -> {"blog": mb_data, "comments": [[...]], "url": ...}
        self.posts_data = OrderedDict()
        # post_id -> set of tags, filled by meta_handler, flushed by meta_finished_handler
        self.tags = {}

    def get_post_id(self, post):
        """Return a unique and constant post id

        @param post(dict): parsed post data
        @return (unicode): post unique item id
        """
        return "{}_{}_{}_{}:{}".format(
            POST_ID_PREFIX,
            post["blog_id"],
            post["user_id"],
            post["post_id"],
            post["post_url"],
        )

    def get_comment_id(self, comment):
        """Return a unique and constant comment id

        @param comment(dict): parsed comment
        @return (unicode): comment unique comment id
        """
        post_id = comment["post_id"]
        parent_item_id = self.posts_data[post_id]["blog"]["id"]
        return "{}_comment_{}".format(parent_item_id, comment["comment_id"])

    def getTime(self, data, key):
        """Parse time as given by dotclear, with timezone handling

        @param data(dict): dotclear data (post or comment)
        @param key(unicode): key to get (e.g. "post_creadt")
        @return (float): Unix time
        """
        return time.mktime(time.strptime(data[key], "%Y-%m-%d %H:%M:%S"))

    def read_fields(self, fields_data):
        """Generate the values of a comma-separated list of quoted fields.

        Fields look like: "value 1","value 2",... with backslash escapes
        (see ESCAPE_MAP).

        @param fields_data(unicode): raw data line (after the headers)
        @return (iterator[unicode]): one unescaped value per field
        @raise exceptions.ParsingError: malformed data
        """
        buf = []
        idx = 0
        while True:
            if fields_data[idx] != '"':
                raise exceptions.ParsingError
            while True:
                idx += 1
                try:
                    char = fields_data[idx]
                except IndexError:
                    raise exceptions.ParsingError("Data was expected")
                if char == '"':
                    # we have reached the end of this field,
                    # we try to parse a new one
                    yield "".join(buf)
                    buf = []
                    idx += 1
                    try:
                        separator = fields_data[idx]
                    except IndexError:
                        return
                    if separator != ",":
                        # FIX: typo "expeceted" in the original message
                        raise exceptions.ParsingError("Field separator was expected")
                    idx += 1
                    break  # we have a new field
                elif char == "\\":
                    idx += 1
                    try:
                        char = ESCAPE_MAP[fields_data[idx]]
                    except IndexError:
                        raise exceptions.ParsingError("Escaped char was expected")
                    except KeyError:
                        # unknown escape: keep the raw character
                        char = fields_data[idx]
                        log.warning("Unknown key to escape: {}".format(char))
                buf.append(char)

    def parseFields(self, headers, data):
        """Map a data line onto the section headers.

        @param headers(list[unicode]): field names of the current section
        @param data(unicode): raw data line
        @return (dict): header name -> field value
        """
        return dict(zip(headers, self.read_fields(data)))

    def post_handler(self, headers, data, index):
        """Parse one "post" line and register it in posts_data.

        @param headers(list[unicode]): field names of the [post ...] section
        @param data(unicode): raw data line
        @param index(int): line index inside the section (logging only)
        """
        post = self.parseFields(headers, data)
        log.debug("({}) post found: {}".format(index, post["post_title"]))
        mb_data = {
            "id": self.get_post_id(post),
            "published": self.getTime(post, "post_creadt"),
            "updated": self.getTime(post, "post_upddt"),
            "author": post["user_id"],  # there use info are not in the archive
            # TODO: option to specify user info
            "content_xhtml": "{}{}".format(
                post["post_content_xhtml"], post["post_excerpt_xhtml"]
            ),
            "title": post["post_title"],
            "allow_comments": C.bool_const(bool(int(post["post_open_comment"]))),
        }
        self.posts_data[post["post_id"]] = {
            "blog": mb_data,
            "comments": [[]],
            "url": "/post/{}".format(post["post_url"]),
        }

    def meta_handler(self, headers, data, index):
        """Parse one "meta" line; only "tag" metadata is kept.

        @param headers(list[unicode]): field names of the [meta ...] section
        @param data(unicode): raw data line
        @param index(int): line index inside the section (unused)
        """
        meta = self.parseFields(headers, data)
        if meta["meta_type"] == "tag":
            tags = self.tags.setdefault(meta["post_id"], set())
            tags.add(meta["meta_id"])

    def meta_finished_handler(self):
        """Flush collected tags into the corresponding posts' blog data."""
        for post_id, tags in self.tags.items():
            data_format.iter2dict("tag", tags, self.posts_data[post_id]["blog"])
        # tags are flushed once; a second meta section would be a parsing error
        del self.tags

    def comment_handler(self, headers, data, index):
        """Parse one "comment" line and attach it to its parent post.

        @param headers(list[unicode]): field names of the [comment ...] section
        @param data(unicode): raw data line
        @param index(int): line index inside the section (unused)
        """
        comment = self.parseFields(headers, data)
        if comment["comment_site"]:
            # we don't use atom:uri because it's used for jid in XMPP
            # FIX: cgi.escape is gone in Python 3.8+; html.escape(quote=False)
            # has the same behavior (no quote escaping, so the %22 replacement
            # below still applies to raw double quotes)
            content = '{}\n<hr>\n<a href="{}">author website</a>'.format(
                comment["comment_content"],
                html.escape(comment["comment_site"], quote=False).replace('"', "%22"),
            )
        else:
            content = comment["comment_content"]
        mb_data = {
            "id": self.get_comment_id(comment),
            "published": self.getTime(comment, "comment_dt"),
            "updated": self.getTime(comment, "comment_upddt"),
            "author": comment["comment_author"],
            # we don't keep email addresses to avoid the author to be spammed
            # (they would be available publicly else)
            # 'author_email': comment['comment_email'],
            "content_xhtml": content,
        }
        self.posts_data[comment["post_id"]]["comments"][0].append(
            {"blog": mb_data, "comments": [[]]}
        )

    def parse(self, db_path):
        """Parse a whole Dotclear flat backup file.

        @param db_path(unicode): path to the flat backup file
        @return (tuple[iterator, int]): posts data iterator and posts count
        """
        with open(db_path) as f:
            signature = f.readline()
            try:
                version = signature.split("|")[1]
            except IndexError:
                version = None
            log.debug("Dotclear version: {}".format(version))
            data_type = None
            data_headers = None
            index = None
            while True:
                buf = f.readline()
                if not buf:
                    break
                if buf.startswith("["):
                    # section header: "[type header1,header2,...]"
                    header = buf.split(" ", 1)
                    data_type = header[0][1:]
                    if data_type not in KNOWN_DATA_TYPES:
                        # FIX: typo "unkown" in the original message
                        log.warning("unknown data type: {}".format(data_type))
                    index = 0
                    try:
                        data_headers = header[1].split(",")
                        # we need to remove the ']' from the last header
                        last_header = data_headers[-1]
                        data_headers[-1] = last_header[: last_header.rfind("]")]
                    except IndexError:
                        log.warning("Can't read data")
                else:
                    if data_type is None:
                        continue
                    buf = buf.strip()
                    if not buf and data_type in KNOWN_DATA_TYPES:
                        # empty line: current section is finished
                        # FIX: handlers were renamed to snake_case but the
                        # getattr string still used "{}FinishedHandler", so
                        # the lookup always failed silently
                        try:
                            finished_handler = getattr(
                                self, "{}_finished_handler".format(data_type)
                            )
                        except AttributeError:
                            pass
                        else:
                            finished_handler()
                        log.debug("{} data finished".format(data_type))
                        data_type = None
                        continue
                    assert data_type
                    # FIX: same snake_case rename issue as above
                    # ("{}Handler" never matched post_handler & co.)
                    try:
                        fields_handler = getattr(self, "{}_handler".format(data_type))
                    except AttributeError:
                        pass
                    else:
                        fields_handler(data_headers, buf, index)
                    index += 1
        return (iter(self.posts_data.values()), len(self.posts_data))


class DotclearImport(object):
    """Plugin entry point: registers the dotclear importer on BLOG_IMPORT."""

    def __init__(self, host):
        log.info(_("plugin Dotclear import initialization"))
        self.host = host
        host.plugins["BLOG_IMPORT"].register(
            "dotclear", self.dc_import, SHORT_DESC, LONG_DESC
        )

    def dc_import(self, client, location, options=None):
        """Import a Dotclear blog from a flat backup file.

        @param client: client session
        @param location(unicode): absolute path to the backup file
        @param options(dict, None): import options (unused here)
        @return (Deferred): fires with (posts iterator, posts count)
        @raise exceptions.DataError: location is not an absolute path
        """
        if not os.path.isabs(location):
            raise exceptions.DataError(
                "An absolute path to backup data need to be given as location"
            )
        dc_parser = DotclearParser()
        # parsing is blocking and potentially heavy, keep it off the reactor
        d = threads.deferToThread(dc_parser.parse, location)
        return d