comparison libervia/backend/plugins/plugin_blog_import_dotclear.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_blog_import_dotclear.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SàT plugin for import external blogs
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
from collections import OrderedDict
import cgi  # FIXME: removed from the stdlib in Python 3.13; kept only for compatibility
import html
import itertools
import os.path
import time

from twisted.internet import threads

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format

log = getLogger(__name__)
33
34
# plugin metadata, read by the Libervia plugin loader
PLUGIN_INFO = {
    C.PI_NAME: "Dotclear import",
    C.PI_IMPORT_NAME: "IMPORT_DOTCLEAR",
    C.PI_TYPE: C.PLUG_TYPE_BLOG,
    C.PI_DEPENDENCIES: ["BLOG_IMPORT"],
    C.PI_MAIN: "DotclearImport",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Blog importer for Dotclear blog engine."""),
}

# short and long descriptions passed to the generic BLOG_IMPORT plugin when
# this importer registers itself (see DotclearImport.__init__)
SHORT_DESC = D_("import posts from Dotclear blog engine")

LONG_DESC = D_(
    """This importer handle Dotclear blog engine.

To use it, you'll need to export your blog to a flat file.
You must go in your admin interface and select Plugins/Maintenance then Backup.
Export only one blog if you have many, i.e. select "Download database of current blog"
Depending on your configuration, your may need to use import/Export plugin and export as a flat file.

location: you must use the absolute path to your backup for the location parameter
"""
)

# prefix used to build unique, stable post ids (see DotclearParser.get_post_id)
POST_ID_PREFIX = "sat_dc_"

# data section names that may appear in a Dotclear flat backup; only the types
# with a matching handler method on DotclearParser are actually imported
KNOWN_DATA_TYPES = (
    "link",
    "setting",
    "post",
    "meta",
    "media",
    "post_media",
    "comment",
    "captcha",
)

# backslash escape sequences used inside backup field values -> real character
ESCAPE_MAP = {"r": "\r", "n": "\n", '"': '"', "\\": "\\"}
70
71
class DotclearParser(object):
    """Parser for Dotclear flat backup files.

    XXX: we have to parse the whole file to build data,
         this can be resource intensive on huge blogs.
    """

    def __init__(self):
        # post_id -> {"blog": <microblog data>, "comments": [[...]], "url": <path>}
        self.posts_data = OrderedDict()
        # post_id -> set of tag labels, collected from "meta" records
        self.tags = {}

    def get_post_id(self, post):
        """Return a unique and constant post id

        @param post(dict): parsed post data
        @return (unicode): post unique item id
        """
        return "{}_{}_{}_{}:{}".format(
            POST_ID_PREFIX,
            post["blog_id"],
            post["user_id"],
            post["post_id"],
            post["post_url"],
        )

    def get_comment_id(self, comment):
        """Return a unique and constant comment id

        @param comment(dict): parsed comment
        @return (unicode): comment unique comment id
        """
        post_id = comment["post_id"]
        parent_item_id = self.posts_data[post_id]["blog"]["id"]
        return "{}_comment_{}".format(parent_item_id, comment["comment_id"])

    # NOTE: name kept camelCase (pre-refactoring style) so existing callers
    #       are not broken
    def getTime(self, data, key):
        """Parse time as given by dotclear, with timezone handling

        @param data(dict): dotclear data (post or comment)
        @param key(unicode): key to get (e.g. "post_creadt")
        @return (float): Unix time
        """
        return time.mktime(time.strptime(data[key], "%Y-%m-%d %H:%M:%S"))

    def read_fields(self, fields_data):
        """Generate unescaped field values from a backup data line.

        Fields are double-quoted, comma-separated, and use backslash escapes
        (see ESCAPE_MAP).

        @param fields_data(unicode): raw fields part of a backup line
        @raise exceptions.ParsingError: the line is malformed
        """
        buf = []
        idx = 0
        while True:
            if fields_data[idx] != '"':
                raise exceptions.ParsingError
            while True:
                idx += 1
                try:
                    char = fields_data[idx]
                except IndexError:
                    raise exceptions.ParsingError("Data was expected")
                if char == '"':
                    # we have reached the end of this field,
                    # we try to parse a new one
                    yield "".join(buf)
                    buf = []
                    idx += 1
                    try:
                        separator = fields_data[idx]
                    except IndexError:
                        return
                    if separator != ",":
                        # FIX: typo in the original message ("expeceted")
                        raise exceptions.ParsingError("Field separator was expected")
                    idx += 1
                    break  # we have a new field
                elif char == "\\":
                    idx += 1
                    try:
                        char = ESCAPE_MAP[fields_data[idx]]
                    except IndexError:
                        raise exceptions.ParsingError("Escaped char was expected")
                    except KeyError:
                        # unknown escape: keep the char as-is, just warn
                        char = fields_data[idx]
                        log.warning("Unknown key to escape: {}".format(char))
                buf.append(char)

    def parseFields(self, headers, data):
        """Map the field values of a data line to their column headers."""
        return dict(zip(headers, self.read_fields(data)))

    def post_handler(self, headers, data, index):
        """Handle one "post" record: register it in posts_data."""
        post = self.parseFields(headers, data)
        log.debug("({}) post found: {}".format(index, post["post_title"]))
        mb_data = {
            "id": self.get_post_id(post),
            "published": self.getTime(post, "post_creadt"),
            "updated": self.getTime(post, "post_upddt"),
            "author": post["user_id"],  # the user info is not in the archive
            # TODO: option to specify user info
            "content_xhtml": "{}{}".format(
                post["post_content_xhtml"], post["post_excerpt_xhtml"]
            ),
            "title": post["post_title"],
            "allow_comments": C.bool_const(bool(int(post["post_open_comment"]))),
        }
        self.posts_data[post["post_id"]] = {
            "blog": mb_data,
            "comments": [[]],
            "url": "/post/{}".format(post["post_url"]),
        }

    def meta_handler(self, headers, data, index):
        """Handle one "meta" record: collect tags per post."""
        meta = self.parseFields(headers, data)
        if meta["meta_type"] == "tag":
            tags = self.tags.setdefault(meta["post_id"], set())
            tags.add(meta["meta_id"])

    def meta_finished_handler(self):
        """Attach the collected tags to their posts once "meta" data is done."""
        for post_id, tags in self.tags.items():
            data_format.iter2dict("tag", tags, self.posts_data[post_id]["blog"])
        del self.tags

    def comment_handler(self, headers, data, index):
        """Handle one "comment" record: attach it to its parent post."""
        comment = self.parseFields(headers, data)
        if comment["comment_site"]:
            # we don't use atom:uri because it's used for jid in XMPP
            content = '{}\n<hr>\n<a href="{}">author website</a>'.format(
                comment["comment_content"],
                # FIX: cgi.escape was removed in Python 3.8; html.escape with
                # quote=False is the documented replacement
                html.escape(comment["comment_site"], quote=False).replace('"', "%22"),
            )
        else:
            content = comment["comment_content"]
        mb_data = {
            "id": self.get_comment_id(comment),
            "published": self.getTime(comment, "comment_dt"),
            "updated": self.getTime(comment, "comment_upddt"),
            "author": comment["comment_author"],
            # we don't keep email addresses to avoid the author to be spammed
            # (they would be available publicly else)
            # 'author_email': comment['comment_email'],
            "content_xhtml": content,
        }
        self.posts_data[comment["post_id"]]["comments"][0].append(
            {"blog": mb_data, "comments": [[]]}
        )

    def parse(self, db_path):
        """Parse a Dotclear flat backup file.

        @param db_path(unicode): path to the flat backup file
            NOTE(review): file is opened with the locale encoding; Dotclear
            dumps are presumably UTF-8 — confirm before forcing encoding
        @return (tuple): (iterator over posts data, number of posts)
        """
        with open(db_path) as f:
            signature = f.readline()
            try:
                version = signature.split("|")[1]
            except IndexError:
                version = None
            log.debug("Dotclear version: {}".format(version))
            data_type = None
            data_headers = None
            index = None
            while True:
                buf = f.readline()
                if not buf:
                    break
                if buf.startswith("["):
                    # section header, e.g. "[post post_id,user_id,...]"
                    header = buf.split(" ", 1)
                    data_type = header[0][1:]
                    if data_type not in KNOWN_DATA_TYPES:
                        log.warning("unknown data type: {}".format(data_type))
                    index = 0
                    try:
                        data_headers = header[1].split(",")
                        # we need to remove the ']' from the last header
                        last_header = data_headers[-1]
                        data_headers[-1] = last_header[: last_header.rfind("]")]
                    except IndexError:
                        log.warning("Can't read data")
                else:
                    if data_type is None:
                        continue
                    buf = buf.strip()
                    if not buf and data_type in KNOWN_DATA_TYPES:
                        # blank line: end of the current data section
                        try:
                            # FIX: handlers use snake_case names since the
                            # renaming refactor (e.g. meta_finished_handler);
                            # the camelCase "{}FinishedHandler" lookup could
                            # never match and silently skipped everything
                            finished_handler = getattr(
                                self, "{}_finished_handler".format(data_type)
                            )
                        except AttributeError:
                            pass
                        else:
                            finished_handler()
                        log.debug("{} data finished".format(data_type))
                        data_type = None
                        continue
                    assert data_type
                    try:
                        # FIX: snake_case handler names (post_handler,
                        # meta_handler, comment_handler), was "{}Handler"
                        fields_handler = getattr(
                            self, "{}_handler".format(data_type)
                        )
                    except AttributeError:
                        pass
                    else:
                        fields_handler(data_headers, buf, index)
                    index += 1
        return (iter(self.posts_data.values()), len(self.posts_data))
262
263
class DotclearImport(object):
    """Plugin entry point: registers the Dotclear importer with BLOG_IMPORT."""

    def __init__(self, host):
        log.info(_("plugin Dotclear import initialization"))
        self.host = host
        host.plugins["BLOG_IMPORT"].register(
            "dotclear", self.dc_import, SHORT_DESC, LONG_DESC
        )

    def dc_import(self, client, location, options=None):
        """Parse a Dotclear flat backup in a worker thread.

        @param client: client session
        @param location(unicode): absolute path to the flat backup file
        @param options(dict, None): import options (unused here)
        @return (Deferred): fires with DotclearParser.parse result
        @raise exceptions.DataError: location is not an absolute path
        """
        if os.path.isabs(location):
            backup_parser = DotclearParser()
            return threads.deferToThread(backup_parser.parse, location)
        raise exceptions.DataError(
            "An absolute path to backup data need to be given as location"
        )