diff sat/plugins/plugin_blog_import.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 378188abe941
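This changeset is a pure formatting pass: the whole file is run through Black, the uncompromising Python code formatter, with no behavioral change intended. For reference, here is a sketch of reproducing such a pass through Black's Python API; black.format_str and black.FileMode are real Black APIs, but their exact signatures have varied across releases, so treat this as illustrative rather than the exact invocation used for this commit:

# Illustrative sketch: reformat one file via Black's API. Black's default
# line length is 88 columns, which matches the wrapping throughout this diff.
import black

with open("sat/plugins/plugin_blog_import.py") as f:
    source = f.read()

formatted = black.format_str(source, mode=black.FileMode())

with open("sat/plugins/plugin_blog_import.py", "w") as f:
    f.write(formatted)

The command-line equivalent, black sat/plugins/plugin_blog_import.py, rewrites the file in place.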
--- a/sat/plugins/plugin_blog_import.py	Wed Jun 27 07:51:29 2018 +0200
+++ b/sat/plugins/plugin_blog_import.py	Wed Jun 27 20:14:46 2018 +0200
@@ -21,6 +21,7 @@
 from sat.core.i18n import _
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
+
 log = getLogger(__name__)
 from twisted.internet import defer
 from twisted.web import client as web_client
@@ -41,33 +42,36 @@
     C.PI_DEPENDENCIES: ["IMPORT", "XEP-0060", "XEP-0277", "TEXT-SYNTAXES", "UPLOAD"],
     C.PI_MAIN: "BlogImportPlugin",
     C.PI_HANDLER: "no",
-    C.PI_DESCRIPTION: _(u"""Blog import management:
-This plugin manage the different blog importers which can register to it, and handle generic importing tasks.""")
+    C.PI_DESCRIPTION: _(
+        u"""Blog import management:
+This plugin manage the different blog importers which can register to it, and handle generic importing tasks."""
+    ),
 }
 
-OPT_HOST = 'host'
-OPT_UPLOAD_IMAGES = 'upload_images'
-OPT_UPLOAD_IGNORE_HOST = 'upload_ignore_host'
-OPT_IGNORE_TLS = 'ignore_tls_errors'
-URL_REDIRECT_PREFIX = 'url_redirect_'
+OPT_HOST = "host"
+OPT_UPLOAD_IMAGES = "upload_images"
+OPT_UPLOAD_IGNORE_HOST = "upload_ignore_host"
+OPT_IGNORE_TLS = "ignore_tls_errors"
+URL_REDIRECT_PREFIX = "url_redirect_"
 
 
 class BlogImportPlugin(object):
     BOOL_OPTIONS = (OPT_UPLOAD_IMAGES, OPT_IGNORE_TLS)
     JSON_OPTIONS = ()
-    OPT_DEFAULTS = {OPT_UPLOAD_IMAGES: True,
-                    OPT_IGNORE_TLS: False}
+    OPT_DEFAULTS = {OPT_UPLOAD_IMAGES: True, OPT_IGNORE_TLS: False}
 
     def __init__(self, host):
         log.info(_("plugin Blog Import initialization"))
         self.host = host
-        self._u = host.plugins['UPLOAD']
-        self._p = host.plugins['XEP-0060']
-        self._m = host.plugins['XEP-0277']
-        self._s = self.host.plugins['TEXT-SYNTAXES']
-        host.plugins['IMPORT'].initialize(self, u'blog')
+        self._u = host.plugins["UPLOAD"]
+        self._p = host.plugins["XEP-0060"]
+        self._m = host.plugins["XEP-0277"]
+        self._s = self.host.plugins["TEXT-SYNTAXES"]
+        host.plugins["IMPORT"].initialize(self, u"blog")
 
-    def importItem(self, client, item_import_data, session, options, return_data, service, node):
+    def importItem(
+        self, client, item_import_data, session, options, return_data, service, node
+    ):
         """importItem specialized for blog import
 
         @param item_import_data(dict):
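Two Black rules account for everything in the hunk above: string literals are normalized to double quotes whenever that requires no extra escaping, and any statement longer than the 88-column limit is split one element per line with a trailing ("magic") comma, while statements that fit, like the collapsed OPT_DEFAULTS dict, are joined back onto a single line. Distilled from the lines above into a standalone snippet:

# Quote normalization: 'host' becomes "host".
OPT_HOST = "host"

# Line splitting: the one-line eight-parameter signature from this hunk
# exceeds 88 columns, so Black explodes it; the trailing comma means that
# adding a parameter later is a one-line diff.
def importItem(
    self, client, item_import_data, session, options, return_data, service, node
):
    ...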
@@ -99,51 +103,58 @@
         @param return_data(dict): will contain link between former posts and new items
 
         """
-        mb_data = item_import_data['blog']
+        mb_data = item_import_data["blog"]
         try:
-            item_id = mb_data['id']
+            item_id = mb_data["id"]
         except KeyError:
-            item_id = mb_data['id'] = unicode(shortuuid.uuid())
+            item_id = mb_data["id"] = unicode(shortuuid.uuid())
 
         try:
             # we keep the link between old url and new blog item
             # so the user can redirect its former blog urls
-            old_uri = item_import_data['url']
+            old_uri = item_import_data["url"]
         except KeyError:
             pass
         else:
             new_uri = return_data[URL_REDIRECT_PREFIX + old_uri] = self._p.getNodeURI(
                 service if service is not None else client.jid.userhostJID(),
                 node or self._m.namespace,
-                item_id)
-            log.info(u"url link from {old} to {new}".format(
-                old=old_uri, new=new_uri))
+                item_id,
+            )
+            log.info(u"url link from {old} to {new}".format(old=old_uri, new=new_uri))
 
         return mb_data
 
     @defer.inlineCallbacks
     def importSubItems(self, client, item_import_data, mb_data, session, options):
         # comments data
-        if len(item_import_data['comments']) != 1:
+        if len(item_import_data["comments"]) != 1:
             raise NotImplementedError(u"can't manage multiple comment links")
-        allow_comments = C.bool(mb_data.get('allow_comments', C.BOOL_FALSE))
+        allow_comments = C.bool(mb_data.get("allow_comments", C.BOOL_FALSE))
         if allow_comments:
             comments_service = yield self._m.getCommentsService(client)
-            comments_node = self._m.getCommentsNode(mb_data['id'])
-            mb_data['comments_service'] = comments_service.full()
-            mb_data['comments_node'] = comments_node
+            comments_node = self._m.getCommentsNode(mb_data["id"])
+            mb_data["comments_service"] = comments_service.full()
+            mb_data["comments_node"] = comments_node
             recurse_kwargs = {
-                'items_import_data':item_import_data['comments'][0],
-                'service':comments_service,
-                'node':comments_node}
+                "items_import_data": item_import_data["comments"][0],
+                "service": comments_service,
+                "node": comments_node,
+            }
             defer.returnValue(recurse_kwargs)
         else:
-            if item_import_data['comments'][0]:
-                raise exceptions.DataError(u"allow_comments set to False, but comments are there")
+            if item_import_data["comments"][0]:
+                raise exceptions.DataError(
+                    u"allow_comments set to False, but comments are there"
+                )
             defer.returnValue(None)
 
     def publishItem(self, client, mb_data, service, node, session):
-        log.debug(u"uploading item [{id}]: {title}".format(id=mb_data['id'], title=mb_data.get('title','')))
+        log.debug(
+            u"uploading item [{id}]: {title}".format(
+                id=mb_data["id"], title=mb_data.get("title", "")
+            )
+        )
         return self._m.send(client, mb_data, service, node)
 
     @defer.inlineCallbacks
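The dictionary rewrite in the hunk above follows the same length rule: once a literal no longer fits on one line, Black puts one key per line, normalizes the spacing after each colon (the original had none), and appends a trailing comma. The recurse_kwargs literal from this hunk, with stub values (invented here) so it stands alone:

# Stub values standing in for the plugin objects used in the real code:
item_import_data = {"comments": [{}]}
comments_service = "pubsub.example.org"
comments_node = "urn:example:comments"

# Black's output: one key per line, ": " spacing, trailing comma.
recurse_kwargs = {
    "items_import_data": item_import_data["comments"][0],
    "service": comments_service,
    "node": comments_node,
}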
@@ -161,54 +172,80 @@
             return
 
         # we want only XHTML content
-        for prefix in ('content',): # a tuple is use, if title need to be added in the future
+        for prefix in (
+            "content",
+        ):  # a tuple is use, if title need to be added in the future
             try:
-                rich = mb_data['{}_rich'.format(prefix)]
+                rich = mb_data["{}_rich".format(prefix)]
             except KeyError:
                 pass
             else:
-                if '{}_xhtml'.format(prefix) in mb_data:
-                    raise exceptions.DataError(u"importer gave {prefix}_rich and {prefix}_xhtml at the same time, this is not allowed".format(prefix=prefix))
+                if "{}_xhtml".format(prefix) in mb_data:
+                    raise exceptions.DataError(
+                        u"importer gave {prefix}_rich and {prefix}_xhtml at the same time, this is not allowed".format(
+                            prefix=prefix
+                        )
+                    )
                 # we convert rich syntax to XHTML here, so we can handle filters easily
-                converted = yield self._s.convert(rich, self._s.getCurrentSyntax(client.profile), safe=False)
-                mb_data['{}_xhtml'.format(prefix)] = converted
-                del mb_data['{}_rich'.format(prefix)]
+                converted = yield self._s.convert(
+                    rich, self._s.getCurrentSyntax(client.profile), safe=False
+                )
+                mb_data["{}_xhtml".format(prefix)] = converted
+                del mb_data["{}_rich".format(prefix)]
 
             try:
-                mb_data['txt']
+                mb_data["txt"]
             except KeyError:
                 pass
             else:
-                if '{}_xhtml'.format(prefix) in mb_data:
-                    log.warning(u"{prefix}_text will be replaced by converted {prefix}_xhtml, so filters can be handled".format(prefix=prefix))
-                    del mb_data['{}_text'.format(prefix)]
+                if "{}_xhtml".format(prefix) in mb_data:
+                    log.warning(
+                        u"{prefix}_text will be replaced by converted {prefix}_xhtml, so filters can be handled".format(
+                            prefix=prefix
+                        )
+                    )
+                    del mb_data["{}_text".format(prefix)]
                 else:
-                    log.warning(u"importer gave a text {prefix}, blog filters don't work on text {prefix}".format(prefix=prefix))
+                    log.warning(
+                        u"importer gave a text {prefix}, blog filters don't work on text {prefix}".format(
+                            prefix=prefix
+                        )
+                    )
                     return
 
         # at this point, we have only XHTML version of content
         try:
-            top_elt = xml_tools.ElementParser()(mb_data['content_xhtml'], namespace=C.NS_XHTML)
+            top_elt = xml_tools.ElementParser()(
+                mb_data["content_xhtml"], namespace=C.NS_XHTML
+            )
         except domish.ParserError:
             # we clean the xml and try again our luck
-            cleaned = yield self._s.cleanXHTML(mb_data['content_xhtml'])
+            cleaned = yield self._s.cleanXHTML(mb_data["content_xhtml"])
             top_elt = xml_tools.ElementParser()(cleaned, namespace=C.NS_XHTML)
         opt_host = options.get(OPT_HOST)
         if opt_host:
             # we normalise the domain
             parsed_host = urlparse.urlsplit(opt_host)
-            opt_host = urlparse.urlunsplit((parsed_host.scheme or 'http', parsed_host.netloc or parsed_host.path, '', '', ''))
+            opt_host = urlparse.urlunsplit(
+                (
+                    parsed_host.scheme or "http",
+                    parsed_host.netloc or parsed_host.path,
+                    "",
+                    "",
+                    "",
+                )
+            )
 
         tmp_dir = tempfile.mkdtemp()
         try:
             # TODO: would be nice to also update the hyperlinks to these images, e.g. when you have <a href="{url}"><img src="{url}"></a>
-            for img_elt in xml_tools.findAll(top_elt, names=[u'img']):
+            for img_elt in xml_tools.findAll(top_elt, names=[u"img"]):
                 yield self.imgFilters(client, img_elt, options, opt_host, tmp_dir)
         finally:
-            os.rmdir(tmp_dir) # XXX: tmp_dir should be empty, or something went wrong
+            os.rmdir(tmp_dir)  # XXX: tmp_dir should be empty, or something went wrong
 
         # we now replace the content with filtered one
-        mb_data['content_xhtml'] = top_elt.toXml()
+        mb_data["content_xhtml"] = top_elt.toXml()
 
     @defer.inlineCallbacks
     def imgFilters(self, client, img_elt, options, opt_host, tmp_dir):
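Two details in the hunk above are easy to misread. The three-line for prefix in ("content",): is still a one-element tuple: the comma is what makes it a tuple, and Black keeps it when the trailing comment pushes the line past the limit and forces a split. Also, Black never rewraps the contents of string literals or comments, which is why the long .format() templates stay intact on over-long lines and only the calls around them (like the exploded urlunsplit tuple) are broken up. The tuple case as a standalone snippet:

# The comma makes the parenthesized expression a one-element tuple; Black
# splits the over-long line but preserves the comma, and the tuple-ness:
for prefix in (
    "content",
):  # a tuple, so a second prefix such as "title" can be added later
    print(prefix)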
@@ -222,15 +259,18 @@
         @param tmp_dir(str): path to temp directory
         """
         try:
-            url = img_elt['src']
-            if url[0] == u'/':
+            url = img_elt["src"]
+            if url[0] == u"/":
                 if not opt_host:
-                    log.warning(u"host was not specified, we can't deal with src without host ({url}) and have to ignore the following <img/>:\n{xml}"
-                        .format(url=url, xml=img_elt.toXml()))
+                    log.warning(
+                        u"host was not specified, we can't deal with src without host ({url}) and have to ignore the following <img/>:\n{xml}".format(
+                            url=url, xml=img_elt.toXml()
+                        )
+                    )
                     return
                 else:
                     url = urlparse.urljoin(opt_host, url)
-            filename = url.rsplit('/',1)[-1].strip()
+            filename = url.rsplit("/", 1)[-1].strip()
             if not filename:
                 raise KeyError
         except (KeyError, IndexError):
@@ -238,7 +278,7 @@
             return
 
         # we change the url for the normalized one
-        img_elt['src'] = url
+        img_elt["src"] = url
 
         if options.get(OPT_UPLOAD_IMAGES, False):
             # upload is requested
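This near-trivial hunk, a quote change on img_elt['src'], is a good place to note why a commit like this is safe to take on trust: by default Black re-parses its output and refuses to write anything whose AST differs from the input (only --fast skips that check). A simplified, standard-library-only approximation of the idea:

# Both spellings parse to the same AST, so the reformat cannot change
# behavior; this mirrors, in miniature, Black's built-in safety check.
import ast

before = "img_elt['src'] = url"
after = 'img_elt["src"] = url'
assert ast.dump(ast.parse(before)) == ast.dump(ast.parse(after))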
@@ -250,23 +290,32 @@
                 # host is the ignored one, we skip
                 parsed_url = urlparse.urlsplit(url)
                 if ignore_host in parsed_url.hostname:
-                    log.info(u"Don't upload image at {url} because of {opt} option".format(
-                        url=url, opt=OPT_UPLOAD_IGNORE_HOST))
+                    log.info(
+                        u"Don't upload image at {url} because of {opt} option".format(
+                            url=url, opt=OPT_UPLOAD_IGNORE_HOST
+                        )
+                    )
                     return
 
             # we download images and re-upload them via XMPP
-            tmp_file = os.path.join(tmp_dir, filename).encode('utf-8')
-            upload_options = {'ignore_tls_errors': options.get(OPT_IGNORE_TLS, False)}
+            tmp_file = os.path.join(tmp_dir, filename).encode("utf-8")
+            upload_options = {"ignore_tls_errors": options.get(OPT_IGNORE_TLS, False)}
 
             try:
-                yield web_client.downloadPage(url.encode('utf-8'), tmp_file)
-                filename = filename.replace(u'%', u'_') # FIXME: tmp workaround for a bug in prosody http upload
-                dummy, download_d = yield self._u.upload(client, tmp_file, filename, options=upload_options)
+                yield web_client.downloadPage(url.encode("utf-8"), tmp_file)
+                filename = filename.replace(
+                    u"%", u"_"
+                )  # FIXME: tmp workaround for a bug in prosody http upload
+                dummy, download_d = yield self._u.upload(
+                    client, tmp_file, filename, options=upload_options
+                )
                 download_url = yield download_d
             except Exception as e:
-                log.warning(u"can't download image at {url}: {reason}".format(url=url, reason=e))
+                log.warning(
+                    u"can't download image at {url}: {reason}".format(url=url, reason=e)
+                )
             else:
-                img_elt['src'] = download_url
+                img_elt["src"] = download_url
 
             try:
                 os.unlink(tmp_file)
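The final hunk repeats the earlier patterns: quote normalization on encode("utf-8"), the tuple-unpacked yield from upload() split across lines, and the prosody FIXME preserved verbatim. When a trailing comment makes a statement too long, Black splits the statement and leaves the comment attached to the closing line, with its text untouched:

# The comment is never rewrapped; it rides on the closing parenthesis:
filename = u"photo%20name.png"
filename = filename.replace(
    u"%", u"_"
)  # FIXME: tmp workaround for a bug in prosody http upload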