view libervia/cli/cmd_blog.py @ 4295:7d98d894933c

frontends (tools/aio): Fix excessive use of CPU: Use a short delay instead of `loop.call_soon` when work has been done, as this was resulting in an excessive use of CPU.
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 17:38:31 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import argparse
import asyncio
from asyncio.subprocess import DEVNULL
from configparser import NoOptionError, NoSectionError
import json
import os
import os.path
from pathlib import Path
import re
import subprocess
import sys
import tempfile
from urllib.parse import urlparse

from libervia.backend.core.i18n import _
from libervia.backend.tools import config
from libervia.backend.tools.common import uri
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common.ansi import ANSI as A
from libervia.cli import common
from libervia.cli.constants import Const as C

from . import base, cmd_pubsub

# presumably the command class(es) picked up by the CLI loader; the `Blog` class itself
# is not defined in this part of the file
__commands__ = ["Blog"]

# name of the XHTML syntax, used as source/target of syntax conversions
SYNTAX_XHTML = "xhtml"
# extensions to use with known syntaxes
# NOTE: Edit.start() merges user-configured extensions into this dict at runtime
SYNTAX_EXT = {
    # FIXME: default syntax doesn't sounds needed, there should always be a syntax set
    #        by the plugin.
    "": "txt",  # used when the syntax is not found
    SYNTAX_XHTML: "xhtml",
    "markdown": "md",
}


# name of the configuration option read in Edit.start() to extend SYNTAX_EXT
CONF_SYNTAX_EXT = "syntax_ext_dict"
# passed to common.BaseEdit.__init__ by the Edit and Preview commands
BLOG_TMP_DIR = "blog"
# keys removed from the metadata temporary file if they exist
# (see Edit.build_metadata_file)
KEY_TO_REMOVE_METADATA = (
    "id",
    "content",
    "content_xhtml",
    "comments_node",
    "comments_service",
    "updated",
)

# the two constants below are not used in this part of the file
# NOTE(review): presumably used by Preview and/or Import commands, to be confirmed
URL_REDIRECT_PREFIX = "url_redirect_"
AIONOTIFY_INSTALL = '"pip install aionotify"'
# keys accepted by the --key option of Get, all of them are displayed when verbosity
# is greater than 1 (see Get.get_keys)
MB_KEYS = (
    "id",
    "url",
    "atom_id",
    "updated",
    "published",
    "language",
    "comments",  # this key is used for all comments* keys
    "tags",  # this key is used for all tag* keys
    "author",
    "author_jid",
    "author_email",
    "author_jid_verified",
    "content",
    "content_xhtml",
    "title",
    "title_xhtml",
    "extra",
)
# output option which disables the "key:" headers in Get's default output
OUTPUT_OPT_NO_HEADER = "no-header"
# matches the "key=value" metadata arguments of --attachment
RE_ATTACHMENT_METADATA = re.compile(r"^(?P<key>[a-z_]+)=(?P<value>.*)")
# metadata keys accepted in --attachment (the name is misspelled, but it is referenced
# as is in BlogPublishCommon.handle_attachments)
ALLOWER_ATTACH_MD_KEY = ("desc", "media_type", "external")


async def guess_syntax_from_path(host, sat_conf, path):
    """Guess the syntax of a content file from its filename extension

    The extension is compared to the values of SYNTAX_EXT (the empty "not found"
    syntax is ignored). If there is no extension or if it is not known, the syntax
    currently set in the profile parameters is used.

    @param host: CLI host, used to retrieve the syntax set in the profile
    @param sat_conf(ConfigParser.ConfigParser): instance opened on sat configuration
        (not used by this function)
    @param path(str): path to the content file
    @return(str): syntax to use
    """
    # extension without the leading "."
    extension = os.path.splitext(path)[1][1:]
    if extension:
        known_syntax = next(
            (
                syntax
                for syntax, syntax_ext in SYNTAX_EXT.items()
                if syntax and extension == syntax_ext
            ),
            None,
        )
        if known_syntax is not None:
            return known_syntax

    # nothing could be guessed from the path, we use current syntax
    return await host.bridge.param_get_a("Syntax", "Composition", "value", host.profile)


class AltLinkAction(argparse.Action):
    """argparse action collecting alternative links given as ``URL [MEDIA_TYPE]``

    Each use of the option appends a dict to the list stored at ``dest`` in the
    namespace (the list is created on first use). The dict always has an "url" key,
    and a "media_type" key when a second value has been given.
    """

    def __init__(self, option_strings, dest, nargs="+", **kwargs):
        # only "+" is accepted: the number of values is checked in __call__
        if nargs != "+":
            raise ValueError('nargs must be "+"')

        super().__init__(option_strings, dest, nargs=nargs, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        assert values
        link = {"url": values[0]}
        nb_values = len(values)
        if nb_values > 2:
            # parser.error prints the message and exits
            parser.error(
                f"invalid number of argument for {', '.join(self.option_strings)}, it "
                "must have at most 2 arguments."
            )
        if nb_values == 2 and values[1] is not None:
            link["media_type"] = values[1]

        collected = getattr(namespace, self.dest)
        if collected is None:
            # first use of the option, we create the list
            collected = []
            setattr(namespace, self.dest, collected)
        collected.append(link)


class BlogPublishCommon:
    """Handle options common to the publishing commands (Set and Edit)

    This mixin reads ``self.args``, ``self.parser``, ``self.host`` and
    ``self.profile``, which must be provided by the class it is mixed with.
    ``self.current_syntax`` must be set by the command before calling
    ``set_mb_data_content`` (Set and Edit do it with the value returned by
    ``get_current_syntax``).
    """

    async def get_current_syntax(self):
        """Retrieve current_syntax

        Use default syntax if --syntax has not been used, else check given syntax.
        Will set self.default_syntax_used to True if default syntax has been used

        @return: syntax set in profile parameters if --syntax is not used, else the
            syntax name returned by the backend for the requested one
        @raise Exception: any error raised by the bridge is re-raised as is, except
            when it is a "NotFound" one (a parser error is then shown instead)
        """
        if self.args.syntax is None:
            self.default_syntax_used = True
            return await self.host.bridge.param_get_a(
                "Syntax", "Composition", "value", self.profile
            )
        else:
            self.default_syntax_used = False
            try:
                syntax = await self.host.bridge.syntax_get(self.args.syntax)
                self.current_syntax = self.args.syntax = syntax
            except Exception as e:
                # "classname" is only available on some exceptions, we must not
                # hide the original error with an AttributeError when it is missing
                if getattr(e, "classname", None) == "NotFound":
                    self.parser.error(
                        _("unknown syntax requested ({syntax})").format(
                            syntax=self.args.syntax
                        )
                    )
                else:
                    raise
        return self.args.syntax

    def add_parser_options(self):
        """Add the options shared by all publishing commands to self.parser"""
        self.parser.add_argument("-T", "--title", help=_("title of the item"))
        self.parser.add_argument(
            "-t",
            "--tag",
            action="append",
            help=_("tag (category) of your item"),
        )
        self.parser.add_argument(
            "-l",
            "--language",
            help=_("language of the item (ISO 639 code)"),
        )

        self.parser.add_argument(
            "-a",
            "--attachment",
            dest="attachments",
            nargs="+",
            help=_("attachment in the form URL [metadata_name=value]"),
        )

        self.parser.add_argument(
            "--alt-link",
            action=AltLinkAction,
            dest="alt_links",
            metavar=("URL", "MEDIA_TYPE"),
            help=_(
                "add an alternative link, you can use {service}, {node} and {item} "
                "template values in URL"
            ),
        )

        # --comments and --no-comments share the same destination, which stays None
        # when none of them is used
        comments_group = self.parser.add_mutually_exclusive_group()
        comments_group.add_argument(
            "-C",
            "--comments",
            action="store_const",
            const=True,
            dest="comments",
            help=_(
                "enable comments (default: comments not enabled except if they "
                "already exist)"
            ),
        )
        comments_group.add_argument(
            "--no-comments",
            action="store_const",
            const=False,
            dest="comments",
            help=_("disable comments (will remove comments node if it exist)"),
        )

        self.parser.add_argument(
            "--no-id-suffix",
            action="store_true",
            help=_("do no add random suffix to friendly ID"),
        )

        self.parser.add_argument(
            "-S",
            "--syntax",
            help=_("syntax to use (default: get profile's default syntax)"),
        )
        self.parser.add_argument(
            "-e",
            "--encrypt",
            action="store_true",
            help=_("end-to-end encrypt the blog post"),
        )
        self.parser.add_argument(
            "--encrypt-for",
            metavar="JID",
            action="append",
            help=_("encrypt a single item for"),
        )
        self.parser.add_argument(
            "-X",
            "--sign",
            action="store_true",
            help=_("cryptographically sign the blog post"),
        )

    async def set_mb_data_content(self, content, mb_data):
        """Store the content in mb_data, under the key matching the syntax in use

        @param content(str): content of the item, in current syntax
        @param mb_data(dict): microblog data, modified in place:
            - "content_rich" is set if the default syntax is used
            - "content_xhtml" is set otherwise, after a conversion through the
              bridge if current syntax is not XHTML
        """
        if self.default_syntax_used:
            # default syntax has been used
            mb_data["content_rich"] = content
        elif self.current_syntax == SYNTAX_XHTML:
            mb_data["content_xhtml"] = content
        else:
            mb_data["content_xhtml"] = await self.host.bridge.syntax_convert(
                content, self.current_syntax, SYNTAX_XHTML, False, self.profile
            )

    def handle_attachments(self, mb_data: dict) -> None:
        """Check, validate and add attachments to mb_data

        --attachment values are a flat list of URLs, each one being optionally
        followed by "key=value" metadata applying to this URL. Invalid values
        result in a parser error.

        @param mb_data: microblog data, attachments are added to its "extra" dict
            (created if necessary) under the "attachments" key
        """
        if not self.args.attachments:
            return
        attachments = []
        attachment = {}
        for arg in self.args.attachments:
            m = RE_ATTACHMENT_METADATA.match(arg)
            if m is None:
                # we should have an URL
                url_parsed = urlparse(arg)
                if url_parsed.scheme not in ("http", "https"):
                    self.parser.error(
                        "invalid URL in --attachment (only http(s) scheme is "
                        f" accepted): {arg}"
                    )
                if attachment:
                    # if we have a new URL, we have a new attachment
                    attachments.append(attachment)
                    attachment = {}
                attachment["url"] = arg
            else:
                # we should have a metadata
                if "url" not in attachment:
                    self.parser.error(
                        "you must to specify an URL before any metadata in "
                        "--attachment"
                    )
                key = m.group("key")
                if key not in ALLOWER_ATTACH_MD_KEY:
                    self.parser.error(
                        f"invalid metadata key in --attachment: {key!r}"
                    )
                value = m.group("value").strip()
                if key == "external":
                    # "external=" without value is a way to set the flag
                    if not value:
                        value = True
                    else:
                        value = C.bool(value)
                attachment[key] = value
        # the last attachment is still pending at the end of the loop
        if attachment:
            attachments.append(attachment)
        if attachments:
            mb_data.setdefault("extra", {})["attachments"] = attachments

    def set_mb_data_from_args(self, mb_data):
        """set microblog metadata according to command line options

        if metadata already exist, it will be overwritten

        @param mb_data(dict): microblog data, modified in place
        """
        # self.args.comments is None when neither --comments nor --no-comments is used
        if self.args.comments is not None:
            mb_data["allow_comments"] = self.args.comments
        if self.args.tag:
            mb_data["tags"] = self.args.tag
        if self.args.title is not None:
            mb_data["title"] = self.args.title
        if self.args.language is not None:
            mb_data["language"] = self.args.language
        if self.args.no_id_suffix:
            mb_data["user_friendly_id_suffix"] = False
        if self.args.alt_links:
            mb_data.setdefault("extra", {})["alt_links"] = self.args.alt_links
        if self.args.encrypt:
            mb_data["encrypted"] = True
        if self.args.sign:
            mb_data["signed"] = True
        if self.args.encrypt_for:
            mb_data["encrypted_for"] = {"targets": self.args.encrypt_for}
        self.handle_attachments(mb_data)


class Set(base.CommandBase, BlogPublishCommon):
    """``blog set`` command: publish the content read on stdin as a blog item"""

    def __init__(self, host):
        command_options = {
            "use_pubsub": True,
            "pubsub_flags": {C.SINGLE_ITEM},
            "help": _("publish a new blog item or update an existing one"),
        }
        base.CommandBase.__init__(self, host, "set", **command_options)
        BlogPublishCommon.__init__(self)

    def add_parser_options(self):
        # this command only has the options common to publishing commands
        BlogPublishCommon.add_parser_options(self)

    async def start(self):
        """Build microblog data from arguments and stdin, then send it"""
        self.current_syntax = await self.get_current_syntax()
        self.pubsub_item = self.args.item

        mb_data = {}
        self.set_mb_data_from_args(mb_data)
        if self.pubsub_item:
            # an item ID has been specified
            mb_data["id"] = self.pubsub_item
        # the whole content is read from standard input
        await self.set_mb_data_content(sys.stdin.read(), mb_data)

        try:
            item_id = await self.host.bridge.mb_send(
                self.args.service,
                self.args.node,
                data_format.serialise(mb_data),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't send item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp(f"Item published with ID {item_id}")
        self.host.quit(C.EXIT_OK)


class Get(base.CommandBase):
    """``blog get`` command: retrieve blog item(s) and display them

    Two extra outputs are registered in addition to the ones handled by the base
    command: "default" (key/value output) and "fancy" (coloured output).
    """

    TEMPLATE = "blog/articles.html"

    def __init__(self, host):
        extra_outputs = {"default": self.default_output, "fancy": self.fancy_output}
        base.CommandBase.__init__(
            self,
            host,
            "get",
            use_verbose=True,
            use_pubsub=True,
            pubsub_flags={C.MULTI_ITEMS, C.CACHE},
            use_output=C.OUTPUT_COMPLEX,
            extra_outputs=extra_outputs,
            help=_("get blog item(s)"),
        )

    def add_parser_options(self):
        #  TODO: a key(s) argument to select keys to display
        self.parser.add_argument(
            "-k",
            "--key",
            action="append",
            dest="keys",
            help=_("microblog data key(s) to display (default: depend of verbosity)"),
        )
        # TODO: add MAM filters

    def template_data_mapping(self, data):
        """Map output data to the data used by the template

        @param data(tuple): (items, blog metadata) as given to self.output
        @return(dict): blog metadata with items added under "items" key, in a
            "blog_items" key
        """
        items, blog_items = data
        blog_items["items"] = items
        return {"blog_items": blog_items}

    def format_comments(self, item, keys):
        """Format comments data of an item, one value per line

        For each entry of item["comments"], the URI is output, followed by the node
        and the service (each prefixed by a header, unless the "no-header" output
        option is used).
        """
        no_header = OUTPUT_OPT_NO_HEADER in self.args.output_opts
        lines = []
        for data in item.get("comments", []):
            lines.append(data["uri"])
            for k in ("node", "service"):
                if no_header:
                    header = ""
                else:
                    header = f"{C.A_HEADER}comments_{k}: {A.RESET}"
                lines.append(header + data[k])
        return "\n".join(lines)

    def format_tags(self, item, keys):
        """Return tags of the item as a comma separated string"""
        # the item is not modified, so the key can be formatted several times
        return ", ".join(item.get("tags", []))

    def format_updated(self, item, keys):
        """Return human readable version of "updated" time"""
        return common.format_time(item["updated"])

    def format_published(self, item, keys):
        """Return human readable version of "published" time"""
        return common.format_time(item["published"])

    def format_url(self, item, keys):
        """Build the XMPP URI of the item

        service and node come from self.metadata, which is set by default_output
        """
        return uri.build_xmpp_uri(
            "pubsub",
            subtype="microblog",
            path=self.metadata["service"],
            node=self.metadata["node"],
            item=item["id"],
        )

    def get_keys(self):
        """return keys to display according to verbosity or explicit key request"""
        verbosity = self.args.verbose
        if self.args.keys:
            if not set(MB_KEYS).issuperset(self.args.keys):
                # invalid keys are sorted to have a stable error message
                self.disp(
                    "following keys are invalid: {invalid}.\n"
                    "Valid keys are: {valid}.".format(
                        invalid=", ".join(
                            sorted(set(self.args.keys).difference(MB_KEYS))
                        ),
                        valid=", ".join(sorted(MB_KEYS)),
                    ),
                    error=True,
                )
                self.host.quit(C.EXIT_BAD_ARG)
            return self.args.keys
        else:
            if verbosity == 0:
                return ("title", "content")
            elif verbosity == 1:
                return (
                    "title",
                    "tags",
                    "author",
                    "author_jid",
                    "author_email",
                    "author_jid_verified",
                    "published",
                    "updated",
                    "content",
                )
            else:
                return MB_KEYS

    def default_output(self, data):
        """simple key/value output

        @param data(tuple): (items, blog metadata), metadata are kept in
            self.metadata as they are needed by format_url
        """
        items, self.metadata = data
        keys = self.get_keys()
        no_header = OUTPUT_OPT_NO_HEADER in self.args.output_opts

        #  k_cb use format_[key] methods for complex formattings
        k_cb = {}
        for k in keys:
            try:
                callback = getattr(self, "format_" + k)
            except AttributeError:
                pass
            else:
                k_cb[k] = callback
        for idx, item in enumerate(items):
            for k in keys:
                if k not in item and k not in k_cb:
                    continue
                if no_header:
                    header = ""
                else:
                    header = "{k_fmt}{key}:{k_fmt_e} {sep}".format(
                        k_fmt=C.A_HEADER,
                        key=k,
                        k_fmt_e=A.RESET,
                        # contents are displayed on their own lines
                        sep="\n" if "content" in k else "",
                    )
                value = k_cb[k](item, keys) if k in k_cb else item[k]
                if isinstance(value, bool):
                    value = str(value).lower()
                elif isinstance(value, dict):
                    value = repr(value)
                self.disp(header + (value or ""))
            # we want a separation line after each item but the last one
            if idx < len(items) - 1:
                print("")

    def fancy_output(self, data):
        """display blog is a nice to read way

        this output doesn't use keys filter

        @param data(tuple): (items, blog metadata), metadata are not used here
        """
        # local import: shutil is only needed by this output
        import shutil

        items, _metadata = data
        verbosity = self.args.verbose
        # get_terminal_size uses a fallback size when there is no terminal attached
        # (e.g. when output is piped), where parsing "stty size" output would fail
        columns = shutil.get_terminal_size().columns
        sep = A.color(A.FG_BLUE, columns * "▬")
        if items:
            print(("\n" + sep + "\n"))

        for item in items:
            title = item.get("title")
            if verbosity > 0:
                author = item.get("author")
                published, updated = item["published"], item.get("updated")
            else:
                author = published = updated = None
            if verbosity > 1:
                tags = item.get("tags", [])
            else:
                tags = None
            content = item.get("content")

            if title:
                print((A.color(A.BOLD, A.FG_CYAN, item["title"])))
            meta = []
            if author:
                meta.append(A.color(A.FG_YELLOW, author))
            if published:
                meta.append(A.color(A.FG_YELLOW, "on ", common.format_time(published)))
            # "updated" may be missing, there is nothing to display in this case
            if updated and updated != published:
                meta.append(
                    A.color(A.FG_YELLOW, "(updated on ", common.format_time(updated), ")")
                )
            print((" ".join(meta)))
            if tags:
                print((A.color(A.FG_MAGENTA, ", ".join(tags))))
            if (title or tags) and content:
                print("")
            if content:
                self.disp(content)

            print(("\n" + sep + "\n"))

    async def start(self):
        """Retrieve blog items through the bridge, and output them"""
        try:
            mb_data = data_format.deserialise(
                await self.host.bridge.mb_get(
                    self.args.service,
                    self.args.node,
                    self.args.max,
                    self.args.items,
                    self.get_pubsub_extra(),
                    self.profile,
                )
            )
        except Exception as e:
            self.disp(f"can't get blog items: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # items are separated from the other data, which are blog metadata
            items = mb_data.pop("items")
            await self.output((items, mb_data))
            self.host.quit(C.EXIT_OK)


class Edit(base.CommandBase, BlogPublishCommon, common.BaseEdit):
    """``blog edit`` command: edit a blog item with an editor, then publish it

    The content is edited in a temporary file, and the metadata in a JSON file
    located next to it.
    """

    def __init__(self, host):
        command_options = {
            "use_pubsub": True,
            "pubsub_flags": {C.SINGLE_ITEM},
            "use_draft": True,
            "use_verbose": True,
            "help": _("edit an existing or new blog post"),
        }
        base.CommandBase.__init__(self, host, "edit", **command_options)
        BlogPublishCommon.__init__(self)
        common.BaseEdit.__init__(self, self.host, BLOG_TMP_DIR, use_metadata=True)

    def add_parser_options(self):
        """Add options common to publishing commands, and the ones specific to edit"""
        BlogPublishCommon.add_parser_options(self)
        self.parser.add_argument(
            "-P",
            "--preview",
            help=_("launch a blog preview in parallel"),
            action="store_true",
        )
        self.parser.add_argument(
            "--no-publish",
            help=_('add "publish: False" to metadata'),
            action="store_true",
        )

    def build_metadata_file(self, content_file_path, mb_data=None):
        """Create the JSON file holding the metadata of the edited item

        The file is located next to content_file_path, its name is the stem of the
        content file followed by common.METADATA_SUFF.
        If the file already exists, its data are used instead of mb_data. In all
        cases, keys of KEY_TO_REMOVE_METADATA are removed, and command line arguments
        override existing values.
        @param content_file_path(Path): path to the temporary file which will contain
            the body
        @param mb_data(dict, None): microblog metadata (for existing items)
        @return (tuple[dict, Path]): merged metadata put originaly in metadata file
            and path to temporary metadata file
        """
        meta_file_name = content_file_path.stem + common.METADATA_SUFF
        meta_file_path = content_file_path.with_name(meta_file_name)

        if not meta_file_path.exists():
            # no existing file, we start from the metadata of the edited item, if any
            mb_data = {} if mb_data is None else mb_data.copy()
        else:
            self.disp("Metadata file already exists, we re-use it")
            try:
                with meta_file_path.open("rb") as existing_file:
                    mb_data = json.load(existing_file)
            except (OSError, IOError, ValueError) as e:
                self.disp(
                    f"Can't read existing metadata file at {meta_file_path}, "
                    f"aborting: {e}",
                    error=True,
                )
                self.host.quit(1)

        # in all cases, we want to remove unwanted keys
        for unwanted_key in KEY_TO_REMOVE_METADATA:
            try:
                del mb_data[unwanted_key]
            except KeyError:
                pass

        # and override metadata with command-line arguments
        self.set_mb_data_from_args(mb_data)
        if self.args.no_publish:
            mb_data["publish"] = False

        # then we create the file and write metadata there, as JSON dict
        # the file is created with 0o600 mode
        # XXX: if we port libervia-cli one day on Windows, O_BINARY may need to be
        #   added here
        open_flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
        with os.fdopen(os.open(meta_file_path, open_flags, 0o600), "w+b") as meta_file:
            # we need to use an intermediate unicode buffer to write to the file
            # unicode without escaping characters
            json_text = json.dumps(
                mb_data,
                ensure_ascii=False,
                indent=4,
                separators=(",", ": "),
                sort_keys=True,
            )
            meta_file.write(json_text.encode("utf-8"))

        return mb_data, meta_file_path

    async def edit(self, content_file_path, content_file_obj, mb_data=None):
        """Edit the file containing the content using editor, and publish it

        If --preview is used, a "blog preview" subprocess is launched at the same
        time as the editor.
        """
        # we first create metadata file
        meta_ori, meta_file_path = self.build_metadata_file(content_file_path, mb_data)

        pending = []

        # do we need a preview ?
        if self.args.preview:
            self.disp("Preview requested, launching it", 1)
            preview_cmd = (
                sys.argv[0],
                "blog",
                "preview",
                "--inotify",
                "true",
                "-p",
                self.profile,
                str(content_file_path),
            )
            # we redirect outputs to /dev/null to avoid console pollution in editor
            # if user wants to see messages, (s)he can call "blog preview" directly
            pending.append(
                asyncio.create_subprocess_exec(
                    *preview_cmd, stdout=DEVNULL, stderr=DEVNULL
                )
            )

        # we launch editor
        editor_coro = self.run_editor(
            "blog_editor_args",
            content_file_path,
            content_file_obj,
            meta_file_path=meta_file_path,
            meta_ori=meta_ori,
        )
        pending.append(editor_coro)

        await asyncio.gather(*pending)

    async def publish(self, content, mb_data):
        """Send the edited content and its metadata through the bridge"""
        await self.set_mb_data_content(content, mb_data)

        if self.pubsub_item:
            mb_data["id"] = self.pubsub_item

        serialised = data_format.serialise(mb_data)
        await self.host.bridge.mb_send(
            self.pubsub_service, self.pubsub_node, serialised, self.profile
        )
        self.disp("Blog item published")

    def get_tmp_suff(self):
        """Return the extension of the temporary file, according to current syntax"""
        # the "" entry is the extension to use when the syntax is not found
        unknown_syntax_ext = SYNTAX_EXT[""]
        return SYNTAX_EXT.get(self.current_syntax, unknown_syntax_ext)

    async def get_item_data(self, service, node, item):
        """Retrieve an item and convert its content to current syntax

        @param item: ID of the item to get, a single item is requested without
            specifying an ID if it is empty
        @return (tuple): content converted to current syntax, item data, and item ID
        """
        requested_items = [item] if item else []

        serialised = await self.host.bridge.mb_get(
            service, node, 1, requested_items, data_format.serialise({}), self.profile
        )
        item = data_format.deserialise(serialised)["items"][0]

        if "content_xhtml" in item:
            content = item["content_xhtml"]
        else:
            # only text content is available, we convert it to XHTML
            content = item["content"]
            if content:
                content = await self.host.bridge.syntax_convert(
                    content, "text", SYNTAX_XHTML, False, self.profile
                )

        if content:
            if self.current_syntax != SYNTAX_XHTML:
                content = await self.host.bridge.syntax_convert(
                    content, SYNTAX_XHTML, self.current_syntax, False, self.profile
                )
            else:
                # XHTML is edited directly: we wrap it in a <div> if needed, and we
                # pretty print it when lxml is available
                content = content.strip()
                if not content.startswith("<div>"):
                    content = "<div>" + content + "</div>"
                try:
                    from lxml import etree
                except ImportError:
                    self.disp(_("You need lxml to edit pretty XHTML"))
                else:
                    xml_parser = etree.XMLParser(remove_blank_text=True)
                    root_elt = etree.fromstring(content, xml_parser)
                    content = etree.tostring(root_elt, encoding=str, pretty_print=True)

        return content, item, item["id"]

    async def start(self):
        """Prepare temporary files, then launch the edition"""
        # if there are user defined extension, we use them
        user_extensions = config.config_get(
            self.sat_conf, C.CONFIG_SECTION, CONF_SYNTAX_EXT, {}
        )
        SYNTAX_EXT.update(user_extensions)
        self.current_syntax = await self.get_current_syntax()

        item_path_data = await self.get_item_path()
        (
            self.pubsub_service,
            self.pubsub_node,
            self.pubsub_item,
            content_file_path,
            content_file_obj,
            mb_data,
        ) = item_path_data

        await self.edit(content_file_path, content_file_obj, mb_data=mb_data)
        self.host.quit()


class Rename(base.CommandBase):
    """``blog rename`` command: change the ID of a blog item"""

    def __init__(self, host):
        command_options = {
            "use_pubsub": True,
            "pubsub_flags": {C.SINGLE_ITEM},
            "help": _("rename an blog item"),
        }
        base.CommandBase.__init__(self, host, "rename", **command_options)

    def add_parser_options(self):
        self.parser.add_argument("new_id", help=_("new item id to use"))

    async def start(self):
        """Ask the backend to rename the item, and report the result"""
        args = self.args
        try:
            await self.host.bridge.mb_rename(
                args.service, args.node, args.item, args.new_id, self.profile
            )
        except Exception as e:
            self.disp(f"can't rename item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp("Item renamed")
        self.host.quit(C.EXIT_OK)


class Repeat(base.CommandBase):
    """``blog repeat`` command: re-publish a blog item"""

    def __init__(self, host):
        command_options = {
            "use_pubsub": True,
            "pubsub_flags": {C.SINGLE_ITEM},
            "help": _("repeat (re-publish) a blog item"),
        }
        super().__init__(host, "repeat", **command_options)

    def add_parser_options(self):
        # no option specific to this command
        pass

    async def start(self):
        """Ask the backend to repeat the item, and display the new ID if any"""
        args = self.args
        try:
            repeat_id = await self.host.bridge.mb_repeat(
                args.service, args.node, args.item, "", self.profile
            )
        except Exception as e:
            self.disp(f"can't repeat item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        if not repeat_id:
            # no ID has been returned
            self.disp("Item repeated")
        else:
            self.disp(f"Item repeated at ID {str(repeat_id)!r}")
        self.host.quit(C.EXIT_OK)


class Preview(base.CommandBase, common.BaseEdit):
    """Generate an XHTML preview of a blog content file and display it.

    The content file is converted to XHTML (through the bridge when its syntax is
    not already XHTML), written to a temporary file, then opened either with the
    ``webbrowser`` module or with the commands found in the
    ``blog_preview_open_cmd`` / ``blog_preview_update_cmd`` configuration options.
    When ``aionotify`` is usable, the content file is watched and the preview is
    regenerated after each modification.
    """

    # TODO: need to be rewritten with template output

    def __init__(self, host):
        # both parents are initialised explicitly as they take different arguments
        base.CommandBase.__init__(
            self, host, "preview", use_verbose=True, help=_("preview a blog content")
        )
        common.BaseEdit.__init__(self, self.host, BLOG_TMP_DIR, use_metadata=True)

    def add_parser_options(self):
        """Declare the ``--inotify`` option and the optional ``file`` argument."""
        self.parser.add_argument(
            "--inotify",
            type=str,
            choices=("auto", "true", "false"),
            default="auto",
            help=_("use inotify to handle preview"),
        )
        self.parser.add_argument(
            "file",
            nargs="?",
            default="current",
            help=_("path to the content file"),
        )

    async def show_preview(self):
        """Open the preview file in a new browser tab with the webbrowser module."""
        # we implement show_preview here so we don't have to import webbrowser and urllib
        # when preview is not used (they are stored on the instance by start())
        url = "file:{}".format(self.urllib.parse.quote(self.preview_file_path))
        self.webbrowser.open_new_tab(url)

    async def _launch_preview_ext(self, cmd_line, opt_name):
        """Start an external command to display the preview file.

        cmd_line: command line coming from the configuration; it is handed to
            ``common.parse_args`` together with the ``url`` and ``preview_file``
            values.
        opt_name: name of the configuration option holding ``cmd_line``, only used
            in the error message.
        """
        url = "file:{}".format(self.urllib.parse.quote(self.preview_file_path))
        args = common.parse_args(
            self.host, cmd_line, url=url, preview_file=self.preview_file_path
        )
        if not args:
            self.disp(
                'Couldn\'t find command in "{name}", aborting'.format(name=opt_name),
                error=True,
            )
            self.host.quit(1)
        # the command is started without waiting for its termination
        subprocess.Popen(args)

    async def open_preview_ext(self):
        """Display the preview with the ``blog_preview_open_cmd`` command."""
        await self._launch_preview_ext(self.open_cb_cmd, "blog_preview_open_cmd")

    async def update_preview_ext(self):
        """Refresh the preview with the ``blog_preview_update_cmd`` command."""
        await self._launch_preview_ext(self.update_cb_cmd, "blog_preview_update_cmd")

    async def update_content(self):
        """Regenerate the preview file from the current state of the content file."""
        # the file is fully read and closed before the bridge is called, so it is not
        # kept open while waiting for the conversion
        with self.content_file_path.open("rb") as f:
            # "utf-8-sig" removes a leading BOM if there is one
            content = f.read().decode("utf-8-sig")

        if content and self.syntax != SYNTAX_XHTML:
            # we use safe=True because we want to have a preview as close as possible
            # to what the people will see
            content = await self.host.bridge.syntax_convert(
                content, self.syntax, SYNTAX_XHTML, True, self.profile
            )

        xhtml = (
            '<html xmlns="http://www.w3.org/1999/xhtml">'
            '<head><meta http-equiv="Content-Type" content="text/html;charset=utf-8" />'
            "</head>"
            f"<body>{content}</body>"
            "</html>"
        )

        with open(self.preview_file_path, "wb") as f:
            f.write(xhtml.encode("utf-8"))

    async def start(self):
        """Build the preview, display it, and keep it updated when inotify is used.

        Without ``aionotify`` the preview is generated and opened once, and the
        temporary file is left on disk. With ``aionotify`` the content file is
        watched, and the preview is regenerated then re-displayed for each event
        received; the temporary file is removed when the watching loop stops.
        """
        # these modules are only needed for the preview, so they are imported here
        # and kept on the instance for the display callbacks
        import webbrowser
        import urllib.request
        import urllib.parse
        import urllib.error

        self.webbrowser, self.urllib = webbrowser, urllib

        # aionotify stays None when the feature is deactivated or not available
        aionotify = None
        if self.args.inotify != "false":
            try:
                import aionotify

            except ImportError:
                if self.args.inotify == "auto":
                    self.disp(
                        f"aionotify module not found, deactivating feature. You can "
                        f"install it with {AIONOTIFY_INSTALL}"
                    )
                else:
                    # the feature has been explicitly requested, we can't continue
                    self.disp(
                        f"aionotify not found, can't activate the feature! Please "
                        f"install it with {AIONOTIFY_INSTALL}",
                        error=True,
                    )
                    self.host.quit(1)

        sat_conf = self.sat_conf
        # extensions set in the configuration complete the known ones
        SYNTAX_EXT.update(
            config.config_get(sat_conf, C.CONFIG_SECTION, CONF_SYNTAX_EXT, {})
        )

        # without an open command in the configuration, webbrowser is used
        try:
            self.open_cb_cmd = config.config_get(
                sat_conf, C.CONFIG_SECTION, "blog_preview_open_cmd", Exception
            )
        except (NoOptionError, NoSectionError):
            self.open_cb_cmd = None
            open_cb = self.show_preview
        else:
            open_cb = self.open_preview_ext

        # the update command defaults to the open command
        self.update_cb_cmd = config.config_get(
            sat_conf, C.CONFIG_SECTION, "blog_preview_update_cmd", self.open_cb_cmd
        )
        if self.update_cb_cmd is None:
            update_cb = self.show_preview
        else:
            update_cb = self.update_preview_ext

        # which file do we need to edit?
        if self.args.file == "current":
            self.content_file_path = self.get_current_file(self.profile)
        else:
            try:
                self.content_file_path = Path(self.args.file).resolve(strict=True)
            except FileNotFoundError:
                self.disp(
                    _('File "{file}" doesn\'t exist!').format(file=self.args.file),
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)

        self.syntax = await guess_syntax_from_path(
            self.host, sat_conf, self.content_file_path
        )

        # the preview is written in a temporary file which must survive its closing
        # (delete=False), because it is read later by the browser
        with tempfile.NamedTemporaryFile(suffix=".xhtml", delete=False) as preview_file:
            self.preview_file_path = preview_file.name
        await self.update_content()

        if aionotify is None:
            # XXX: we don't delete file automatically because browser needs it
            #      (and webbrowser.open can return before it is read)
            self.disp(
                f"temporary file created at {self.preview_file_path}\nthis file will NOT "
                f"BE DELETED AUTOMATICALLY, please delete it yourself when you have "
                f"finished"
            )
            await open_cb()
        else:
            await open_cb()
            watcher = aionotify.Watcher()
            watcher_kwargs = {
                # Watcher don't accept Path so we convert to string
                "path": str(self.content_file_path),
                "alias": "content_file",
                "flags": aionotify.Flags.CLOSE_WRITE
                | aionotify.Flags.DELETE_SELF
                | aionotify.Flags.MOVE_SELF,
            }
            watcher.watch(**watcher_kwargs)

            loop = asyncio.get_event_loop()
            await watcher.setup(loop)

            try:
                while True:
                    event = await watcher.get_event()
                    self.disp("Content updated", 1)
                    if event.flags & (
                        aionotify.Flags.DELETE_SELF | aionotify.Flags.MOVE_SELF
                    ):
                        # the watched file has been replaced (deleted or moved), the
                        # watch must be set again on the same path
                        self.disp(
                            "DELETE/MOVE event caught, changing the watch",
                            2,
                        )
                        try:
                            watcher.unwatch("content_file")
                        except IOError as e:
                            self.disp(
                                f"Can't remove the watch: {e}",
                                2,
                            )
                        # NOTE(review): the replaced watcher is only unwatched, not
                        #   closed; confirm that aionotify releases its resources
                        watcher = aionotify.Watcher()
                        watcher.watch(**watcher_kwargs)
                        try:
                            await watcher.setup(loop)
                        except OSError:
                            # if the new file is not here yet we can have an error
                            # as a workaround, we do a little rest and try again
                            await asyncio.sleep(1)
                            await watcher.setup(loop)
                    await self.update_content()
                    await update_cb()
            except FileNotFoundError:
                self.disp("The file seems to have been deleted.", error=True)
                self.host.quit(C.EXIT_NOT_FOUND)
            finally:
                # a failure to remove the preview file must neither prevent the
                # removal of the watch nor replace the exception being propagated
                try:
                    os.unlink(self.preview_file_path)
                except OSError as e:
                    self.disp(
                        f"Can't remove the preview file: {e}",
                        error=True,
                    )
                try:
                    watcher.unwatch("content_file")
                except IOError as e:
                    self.disp(
                        f"Can't remove the watch: {e}",
                        2,
                    )


class Import(base.CommandBase):
    """Import an external blog, or display information on the available importers.

    Without ``importer`` the list of importers is displayed; with an importer but
    no ``location`` the description of this importer is displayed; with both, the
    import is requested to the bridge and its progress is followed.
    """

    def __init__(self, host):
        super().__init__(
            host,
            "import",
            use_pubsub=True,
            use_progress=True,
            help=_("import an external blog"),
        )

    def add_parser_options(self):
        """Declare the importer/location arguments and the upload related options."""
        self.parser.add_argument(
            "importer",
            nargs="?",
            help=_("importer name, nothing to display importers list"),
        )
        self.parser.add_argument("--host", help=_("original blog host"))
        self.parser.add_argument(
            "--no-images-upload",
            action="store_true",
            help=_("do *NOT* upload images (default: do upload images)"),
        )
        self.parser.add_argument(
            "--upload-ignore-host",
            help=_("do not upload images from this host (default: upload all images)"),
        )
        self.parser.add_argument(
            "--ignore-tls-errors",
            action="store_true",
            help=_("ignore invalide TLS certificate for uploads"),
        )
        self.parser.add_argument(
            "-o",
            "--option",
            action="append",
            nargs=2,
            default=[],
            metavar=("NAME", "VALUE"),
            help=_("importer specific options (see importer description)"),
        )
        self.parser.add_argument(
            "location",
            nargs="?",
            help=_(
                "importer data location (see importer description), nothing to show "
                "importer description"
            ),
        )

    async def on_progress_started(self, metadata):
        """Tell (in verbose mode) that the upload has started."""
        self.disp(_("Blog upload started"), 2)

    async def on_progress_finished(self, metadata):
        """Tell that the upload is done and display the URL redirections, if any.

        metadata: mapping whose keys starting with ``URL_REDIRECT_PREFIX`` describe
            redirections; the prefix is removed from the key, and the resulting
            mapping is displayed as a configuration snippet.
        """
        self.disp(_("Blog uploaded successfully"), 2)
        prefix_len = len(URL_REDIRECT_PREFIX)
        redirections = {
            k[prefix_len:]: v
            for k, v in metadata.items()
            if k.startswith(URL_REDIRECT_PREFIX)
        }
        if redirections:
            # we need to add ' ' before each new line
            # and to double each '%' for ConfigParser
            redirections_json = (
                json.dumps(redirections, indent=1, separators=(",", ": "))
                .replace("%", "%%")
                .replace("\n", "\n ")
            )
            conf = "url_redirections_dict = {}".format(redirections_json)
            self.disp(
                _(
                    "\nTo redirect old URLs to new ones, put the following lines in your"
                    " sat.conf file, in [libervia] section:\n\n{conf}"
                ).format(conf=conf)
            )

    async def on_progress_error(self, error_msg):
        """Display the error which has interrupted the upload."""
        self.disp(
            _("Error while uploading blog: {error_msg}").format(error_msg=error_msg),
            error=True,
        )

    async def start(self):
        """Display importers information, or request the import to the bridge."""
        if self.args.location is None:
            # no import is requested, arguments only useful for an import are refused
            for name in ("option", "service", "no_images_upload"):
                if getattr(self.args, name):
                    self.parser.error(
                        _(
                            "{name} argument can't be used without location argument"
                        ).format(name=name)
                    )
            if self.args.importer is None:
                importers = await self.host.bridge.blogImportList()
                self.disp("\n".join(f"{name}: {desc}" for name, desc in importers))
            else:
                try:
                    short_desc, long_desc = await self.host.bridge.blogImportDesc(
                        self.args.importer
                    )
                except Exception as e:
                    # we only keep the last non-empty line; an exception without any
                    # message is displayed with its repr instead of failing here
                    lines = [line for line in str(e).split("\n") if line]
                    msg = lines[-1] if lines else repr(e)
                    self.disp(msg, error=True)
                    self.host.quit(1)
                else:
                    self.disp(f"{self.args.importer}: {short_desc}\n\n{long_desc}")
            self.host.quit()
        else:
            # we have a location, an import is requested
            # each --option is a (NAME, VALUE) pair
            options = dict(self.args.option)
            if self.args.host:
                options["host"] = self.args.host
            if self.args.ignore_tls_errors:
                options["ignore_tls_errors"] = C.BOOL_TRUE
            if self.args.no_images_upload:
                options["upload_images"] = C.BOOL_FALSE
                if self.args.upload_ignore_host:
                    self.parser.error(
                        "upload-ignore-host option can't be used when no-images-upload "
                        "is set"
                    )
            elif self.args.upload_ignore_host:
                options["upload_ignore_host"] = self.args.upload_ignore_host

            try:
                progress_id = await self.host.bridge.blogImport(
                    self.args.importer,
                    self.args.location,
                    options,
                    self.args.service,
                    self.args.node,
                    self.profile,
                )
            except Exception as e:
                self.disp(
                    _("Error while trying to import a blog: {e}").format(e=e),
                    error=True,
                )
                self.host.quit(1)
            else:
                # the progress callbacks above are used from this point
                await self.set_progress_id(progress_id)


class AttachmentGet(cmd_pubsub.AttachmentGet):
    """Pubsub attachments retrieval, using the microblog node when none is given."""

    def __init__(self, host):
        super().__init__(host)
        self.override_pubsub_flags({C.SERVICE, C.SINGLE_ITEM})

    async def start(self):
        """Fill the node with the microblog namespace if needed, then run the parent."""
        if not self.args.node:
            # no node specified: the namespace registered as "microblog" is used
            known_namespaces = await self.host.bridge.namespaces_get()
            try:
                self.args.node = known_namespaces["microblog"]
            except KeyError:
                self.disp("XEP-0277 plugin is not loaded", error=True)
                self.host.quit(C.EXIT_MISSING_FEATURE)
        return await super().start()


class AttachmentSet(cmd_pubsub.AttachmentSet):
    """Pubsub attachments update, using the microblog node when none is given."""

    def __init__(self, host):
        super().__init__(host)
        self.override_pubsub_flags({C.SERVICE, C.SINGLE_ITEM})

    async def start(self):
        """Fill the node with the microblog namespace if needed, then run the parent."""
        if not self.args.node:
            # no node specified: the namespace registered as "microblog" is used
            known_namespaces = await self.host.bridge.namespaces_get()
            try:
                self.args.node = known_namespaces["microblog"]
            except KeyError:
                self.disp("XEP-0277 plugin is not loaded", error=True)
                self.host.quit(C.EXIT_MISSING_FEATURE)
        return await super().start()


class Attachments(base.CommandBase):
    """Parent command grouping the blog attachments sub-commands."""

    subcommands = (AttachmentGet, AttachmentSet)

    def __init__(self, host):
        super().__init__(
            host,
            "attachments",
            help=_("set or retrieve blog attachments"),
            use_profile=False,
        )


class Blog(base.CommandBase):
    """Top-level "blog" command, giving access to all the blog sub-commands."""

    subcommands = (Set, Get, Edit, Rename, Repeat, Preview, Import, Attachments)

    def __init__(self, host):
        super().__init__(
            host,
            "blog",
            use_profile=False,
            help=_("blog/microblog management"),
        )