Mercurial > libervia-backend
diff sat/plugins/plugin_blog_import.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 378188abe941 |
line wrap: on
line diff
--- a/sat/plugins/plugin_blog_import.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_blog_import.py Wed Jun 27 20:14:46 2018 +0200 @@ -21,6 +21,7 @@ from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger + log = getLogger(__name__) from twisted.internet import defer from twisted.web import client as web_client @@ -41,33 +42,36 @@ C.PI_DEPENDENCIES: ["IMPORT", "XEP-0060", "XEP-0277", "TEXT-SYNTAXES", "UPLOAD"], C.PI_MAIN: "BlogImportPlugin", C.PI_HANDLER: "no", - C.PI_DESCRIPTION: _(u"""Blog import management: -This plugin manage the different blog importers which can register to it, and handle generic importing tasks.""") + C.PI_DESCRIPTION: _( + u"""Blog import management: +This plugin manage the different blog importers which can register to it, and handle generic importing tasks.""" + ), } -OPT_HOST = 'host' -OPT_UPLOAD_IMAGES = 'upload_images' -OPT_UPLOAD_IGNORE_HOST = 'upload_ignore_host' -OPT_IGNORE_TLS = 'ignore_tls_errors' -URL_REDIRECT_PREFIX = 'url_redirect_' +OPT_HOST = "host" +OPT_UPLOAD_IMAGES = "upload_images" +OPT_UPLOAD_IGNORE_HOST = "upload_ignore_host" +OPT_IGNORE_TLS = "ignore_tls_errors" +URL_REDIRECT_PREFIX = "url_redirect_" class BlogImportPlugin(object): BOOL_OPTIONS = (OPT_UPLOAD_IMAGES, OPT_IGNORE_TLS) JSON_OPTIONS = () - OPT_DEFAULTS = {OPT_UPLOAD_IMAGES: True, - OPT_IGNORE_TLS: False} + OPT_DEFAULTS = {OPT_UPLOAD_IMAGES: True, OPT_IGNORE_TLS: False} def __init__(self, host): log.info(_("plugin Blog Import initialization")) self.host = host - self._u = host.plugins['UPLOAD'] - self._p = host.plugins['XEP-0060'] - self._m = host.plugins['XEP-0277'] - self._s = self.host.plugins['TEXT-SYNTAXES'] - host.plugins['IMPORT'].initialize(self, u'blog') + self._u = host.plugins["UPLOAD"] + self._p = host.plugins["XEP-0060"] + self._m = host.plugins["XEP-0277"] + self._s = self.host.plugins["TEXT-SYNTAXES"] + host.plugins["IMPORT"].initialize(self, u"blog") - def importItem(self, client, item_import_data, session, options, return_data, service, node): + def importItem( + self, client, item_import_data, session, options, return_data, service, node + ): """importItem specialized for blog import @param item_import_data(dict): @@ -99,51 +103,58 @@ @param return_data(dict): will contain link between former posts and new items """ - mb_data = item_import_data['blog'] + mb_data = item_import_data["blog"] try: - item_id = mb_data['id'] + item_id = mb_data["id"] except KeyError: - item_id = mb_data['id'] = unicode(shortuuid.uuid()) + item_id = mb_data["id"] = unicode(shortuuid.uuid()) try: # we keep the link between old url and new blog item # so the user can redirect its former blog urls - old_uri = item_import_data['url'] + old_uri = item_import_data["url"] except KeyError: pass else: new_uri = return_data[URL_REDIRECT_PREFIX + old_uri] = self._p.getNodeURI( service if service is not None else client.jid.userhostJID(), node or self._m.namespace, - item_id) - log.info(u"url link from {old} to {new}".format( - old=old_uri, new=new_uri)) + item_id, + ) + log.info(u"url link from {old} to {new}".format(old=old_uri, new=new_uri)) return mb_data @defer.inlineCallbacks def importSubItems(self, client, item_import_data, mb_data, session, options): # comments data - if len(item_import_data['comments']) != 1: + if len(item_import_data["comments"]) != 1: raise NotImplementedError(u"can't manage multiple comment links") - allow_comments = C.bool(mb_data.get('allow_comments', C.BOOL_FALSE)) + allow_comments = C.bool(mb_data.get("allow_comments", C.BOOL_FALSE)) if allow_comments: comments_service = yield self._m.getCommentsService(client) - comments_node = self._m.getCommentsNode(mb_data['id']) - mb_data['comments_service'] = comments_service.full() - mb_data['comments_node'] = comments_node + comments_node = self._m.getCommentsNode(mb_data["id"]) + mb_data["comments_service"] = comments_service.full() + mb_data["comments_node"] = comments_node recurse_kwargs = { - 'items_import_data':item_import_data['comments'][0], - 'service':comments_service, - 'node':comments_node} + "items_import_data": item_import_data["comments"][0], + "service": comments_service, + "node": comments_node, + } defer.returnValue(recurse_kwargs) else: - if item_import_data['comments'][0]: - raise exceptions.DataError(u"allow_comments set to False, but comments are there") + if item_import_data["comments"][0]: + raise exceptions.DataError( + u"allow_comments set to False, but comments are there" + ) defer.returnValue(None) def publishItem(self, client, mb_data, service, node, session): - log.debug(u"uploading item [{id}]: {title}".format(id=mb_data['id'], title=mb_data.get('title',''))) + log.debug( + u"uploading item [{id}]: {title}".format( + id=mb_data["id"], title=mb_data.get("title", "") + ) + ) return self._m.send(client, mb_data, service, node) @defer.inlineCallbacks @@ -161,54 +172,80 @@ return # we want only XHTML content - for prefix in ('content',): # a tuple is use, if title need to be added in the future + for prefix in ( + "content", + ): # a tuple is use, if title need to be added in the future try: - rich = mb_data['{}_rich'.format(prefix)] + rich = mb_data["{}_rich".format(prefix)] except KeyError: pass else: - if '{}_xhtml'.format(prefix) in mb_data: - raise exceptions.DataError(u"importer gave {prefix}_rich and {prefix}_xhtml at the same time, this is not allowed".format(prefix=prefix)) + if "{}_xhtml".format(prefix) in mb_data: + raise exceptions.DataError( + u"importer gave {prefix}_rich and {prefix}_xhtml at the same time, this is not allowed".format( + prefix=prefix + ) + ) # we convert rich syntax to XHTML here, so we can handle filters easily - converted = yield self._s.convert(rich, self._s.getCurrentSyntax(client.profile), safe=False) - mb_data['{}_xhtml'.format(prefix)] = converted - del mb_data['{}_rich'.format(prefix)] + converted = yield self._s.convert( + rich, self._s.getCurrentSyntax(client.profile), safe=False + ) + mb_data["{}_xhtml".format(prefix)] = converted + del mb_data["{}_rich".format(prefix)] try: - mb_data['txt'] + mb_data["txt"] except KeyError: pass else: - if '{}_xhtml'.format(prefix) in mb_data: - log.warning(u"{prefix}_text will be replaced by converted {prefix}_xhtml, so filters can be handled".format(prefix=prefix)) - del mb_data['{}_text'.format(prefix)] + if "{}_xhtml".format(prefix) in mb_data: + log.warning( + u"{prefix}_text will be replaced by converted {prefix}_xhtml, so filters can be handled".format( + prefix=prefix + ) + ) + del mb_data["{}_text".format(prefix)] else: - log.warning(u"importer gave a text {prefix}, blog filters don't work on text {prefix}".format(prefix=prefix)) + log.warning( + u"importer gave a text {prefix}, blog filters don't work on text {prefix}".format( + prefix=prefix + ) + ) return # at this point, we have only XHTML version of content try: - top_elt = xml_tools.ElementParser()(mb_data['content_xhtml'], namespace=C.NS_XHTML) + top_elt = xml_tools.ElementParser()( + mb_data["content_xhtml"], namespace=C.NS_XHTML + ) except domish.ParserError: # we clean the xml and try again our luck - cleaned = yield self._s.cleanXHTML(mb_data['content_xhtml']) + cleaned = yield self._s.cleanXHTML(mb_data["content_xhtml"]) top_elt = xml_tools.ElementParser()(cleaned, namespace=C.NS_XHTML) opt_host = options.get(OPT_HOST) if opt_host: # we normalise the domain parsed_host = urlparse.urlsplit(opt_host) - opt_host = urlparse.urlunsplit((parsed_host.scheme or 'http', parsed_host.netloc or parsed_host.path, '', '', '')) + opt_host = urlparse.urlunsplit( + ( + parsed_host.scheme or "http", + parsed_host.netloc or parsed_host.path, + "", + "", + "", + ) + ) tmp_dir = tempfile.mkdtemp() try: # TODO: would be nice to also update the hyperlinks to these images, e.g. when you have <a href="{url}"><img src="{url}"></a> - for img_elt in xml_tools.findAll(top_elt, names=[u'img']): + for img_elt in xml_tools.findAll(top_elt, names=[u"img"]): yield self.imgFilters(client, img_elt, options, opt_host, tmp_dir) finally: - os.rmdir(tmp_dir) # XXX: tmp_dir should be empty, or something went wrong + os.rmdir(tmp_dir) # XXX: tmp_dir should be empty, or something went wrong # we now replace the content with filtered one - mb_data['content_xhtml'] = top_elt.toXml() + mb_data["content_xhtml"] = top_elt.toXml() @defer.inlineCallbacks def imgFilters(self, client, img_elt, options, opt_host, tmp_dir): @@ -222,15 +259,18 @@ @param tmp_dir(str): path to temp directory """ try: - url = img_elt['src'] - if url[0] == u'/': + url = img_elt["src"] + if url[0] == u"/": if not opt_host: - log.warning(u"host was not specified, we can't deal with src without host ({url}) and have to ignore the following <img/>:\n{xml}" - .format(url=url, xml=img_elt.toXml())) + log.warning( + u"host was not specified, we can't deal with src without host ({url}) and have to ignore the following <img/>:\n{xml}".format( + url=url, xml=img_elt.toXml() + ) + ) return else: url = urlparse.urljoin(opt_host, url) - filename = url.rsplit('/',1)[-1].strip() + filename = url.rsplit("/", 1)[-1].strip() if not filename: raise KeyError except (KeyError, IndexError): @@ -238,7 +278,7 @@ return # we change the url for the normalized one - img_elt['src'] = url + img_elt["src"] = url if options.get(OPT_UPLOAD_IMAGES, False): # upload is requested @@ -250,23 +290,32 @@ # host is the ignored one, we skip parsed_url = urlparse.urlsplit(url) if ignore_host in parsed_url.hostname: - log.info(u"Don't upload image at {url} because of {opt} option".format( - url=url, opt=OPT_UPLOAD_IGNORE_HOST)) + log.info( + u"Don't upload image at {url} because of {opt} option".format( + url=url, opt=OPT_UPLOAD_IGNORE_HOST + ) + ) return # we download images and re-upload them via XMPP - tmp_file = os.path.join(tmp_dir, filename).encode('utf-8') - upload_options = {'ignore_tls_errors': options.get(OPT_IGNORE_TLS, False)} + tmp_file = os.path.join(tmp_dir, filename).encode("utf-8") + upload_options = {"ignore_tls_errors": options.get(OPT_IGNORE_TLS, False)} try: - yield web_client.downloadPage(url.encode('utf-8'), tmp_file) - filename = filename.replace(u'%', u'_') # FIXME: tmp workaround for a bug in prosody http upload - dummy, download_d = yield self._u.upload(client, tmp_file, filename, options=upload_options) + yield web_client.downloadPage(url.encode("utf-8"), tmp_file) + filename = filename.replace( + u"%", u"_" + ) # FIXME: tmp workaround for a bug in prosody http upload + dummy, download_d = yield self._u.upload( + client, tmp_file, filename, options=upload_options + ) download_url = yield download_d except Exception as e: - log.warning(u"can't download image at {url}: {reason}".format(url=url, reason=e)) + log.warning( + u"can't download image at {url}: {reason}".format(url=url, reason=e) + ) else: - img_elt['src'] = download_url + img_elt["src"] = download_url try: os.unlink(tmp_file)