Mercurial > libervia-backend
diff libervia/cli/cmd_blog.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_blog.py@26b7ed2817da |
children | 7df6ba11bdae |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_blog.py Fri Jun 02 14:54:26 2023 +0200 @@ -0,0 +1,1220 @@ +#!/usr/bin/env python3 + + +# Libervia CLI +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import asyncio +from asyncio.subprocess import DEVNULL +from configparser import NoOptionError, NoSectionError +import json +import os +import os.path +from pathlib import Path +import re +import subprocess +import sys +import tempfile +from urllib.parse import urlparse + +from libervia.backend.core.i18n import _ +from libervia.backend.tools import config +from libervia.backend.tools.common import uri +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common.ansi import ANSI as A +from libervia.cli import common +from libervia.cli.constants import Const as C + +from . import base, cmd_pubsub + +__commands__ = ["Blog"] + +SYNTAX_XHTML = "xhtml" +# extensions to use with known syntaxes +SYNTAX_EXT = { + # FIXME: default syntax doesn't sounds needed, there should always be a syntax set + # by the plugin. + "": "txt", # used when the syntax is not found + SYNTAX_XHTML: "xhtml", + "markdown": "md", +} + + +CONF_SYNTAX_EXT = "syntax_ext_dict" +BLOG_TMP_DIR = "blog" +# key to remove from metadata tmp file if they exist +KEY_TO_REMOVE_METADATA = ( + "id", + "content", + "content_xhtml", + "comments_node", + "comments_service", + "updated", +) + +URL_REDIRECT_PREFIX = "url_redirect_" +AIONOTIFY_INSTALL = '"pip install aionotify"' +MB_KEYS = ( + "id", + "url", + "atom_id", + "updated", + "published", + "language", + "comments", # this key is used for all comments* keys + "tags", # this key is used for all tag* keys + "author", + "author_jid", + "author_email", + "author_jid_verified", + "content", + "content_xhtml", + "title", + "title_xhtml", + "extra" +) +OUTPUT_OPT_NO_HEADER = "no-header" +RE_ATTACHMENT_METADATA = re.compile(r"^(?P<key>[a-z_]+)=(?P<value>.*)") +ALLOWER_ATTACH_MD_KEY = ("desc", "media_type", "external") + + +async def guess_syntax_from_path(host, sat_conf, path): + """Return syntax guessed according to filename extension + + @param sat_conf(ConfigParser.ConfigParser): instance opened on sat configuration + @param path(str): path to the content file + @return(unicode): syntax to use + """ + # we first try to guess syntax with extension + ext = os.path.splitext(path)[1][1:] # we get extension without the '.' + if ext: + for k, v in SYNTAX_EXT.items(): + if k and ext == v: + return k + + # if not found, we use current syntax + return await host.bridge.param_get_a("Syntax", "Composition", "value", host.profile) + + +class BlogPublishCommon: + """handle common option for publising commands (Set and Edit)""" + + async def get_current_syntax(self): + """Retrieve current_syntax + + Use default syntax if --syntax has not been used, else check given syntax. + Will set self.default_syntax_used to True if default syntax has been used + """ + if self.args.syntax is None: + self.default_syntax_used = True + return await self.host.bridge.param_get_a( + "Syntax", "Composition", "value", self.profile + ) + else: + self.default_syntax_used = False + try: + syntax = await self.host.bridge.syntax_get(self.args.syntax) + self.current_syntax = self.args.syntax = syntax + except Exception as e: + if e.classname == "NotFound": + self.parser.error( + _("unknown syntax requested ({syntax})").format( + syntax=self.args.syntax + ) + ) + else: + raise e + return self.args.syntax + + def add_parser_options(self): + self.parser.add_argument("-T", "--title", help=_("title of the item")) + self.parser.add_argument( + "-t", + "--tag", + action="append", + help=_("tag (category) of your item"), + ) + self.parser.add_argument( + "-l", + "--language", + help=_("language of the item (ISO 639 code)"), + ) + + self.parser.add_argument( + "-a", + "--attachment", + dest="attachments", + nargs="+", + help=_( + "attachment in the form URL [metadata_name=value]" + ) + ) + + comments_group = self.parser.add_mutually_exclusive_group() + comments_group.add_argument( + "-C", + "--comments", + action="store_const", + const=True, + dest="comments", + help=_( + "enable comments (default: comments not enabled except if they " + "already exist)" + ), + ) + comments_group.add_argument( + "--no-comments", + action="store_const", + const=False, + dest="comments", + help=_("disable comments (will remove comments node if it exist)"), + ) + + self.parser.add_argument( + "-S", + "--syntax", + help=_("syntax to use (default: get profile's default syntax)"), + ) + self.parser.add_argument( + "-e", + "--encrypt", + action="store_true", + help=_("end-to-end encrypt the blog post") + ) + self.parser.add_argument( + "--encrypt-for", + metavar="JID", + action="append", + help=_("encrypt a single item for") + ) + self.parser.add_argument( + "-X", + "--sign", + action="store_true", + help=_("cryptographically sign the blog post") + ) + + async def set_mb_data_content(self, content, mb_data): + if self.default_syntax_used: + # default syntax has been used + mb_data["content_rich"] = content + elif self.current_syntax == SYNTAX_XHTML: + mb_data["content_xhtml"] = content + else: + mb_data["content_xhtml"] = await self.host.bridge.syntax_convert( + content, self.current_syntax, SYNTAX_XHTML, False, self.profile + ) + + def handle_attachments(self, mb_data: dict) -> None: + """Check, validate and add attachments to mb_data""" + if self.args.attachments: + attachments = [] + attachment = {} + for arg in self.args.attachments: + m = RE_ATTACHMENT_METADATA.match(arg) + if m is None: + # we should have an URL + url_parsed = urlparse(arg) + if url_parsed.scheme not in ("http", "https"): + self.parser.error( + "invalid URL in --attachment (only http(s) scheme is " + f" accepted): {arg}" + ) + if attachment: + # if we hae a new URL, we have a new attachment + attachments.append(attachment) + attachment = {} + attachment["url"] = arg + else: + # we should have a metadata + if "url" not in attachment: + self.parser.error( + "you must to specify an URL before any metadata in " + "--attachment" + ) + key = m.group("key") + if key not in ALLOWER_ATTACH_MD_KEY: + self.parser.error( + f"invalid metadata key in --attachment: {key!r}" + ) + value = m.group("value").strip() + if key == "external": + if not value: + value=True + else: + value = C.bool(value) + attachment[key] = value + if attachment: + attachments.append(attachment) + if attachments: + mb_data.setdefault("extra", {})["attachments"] = attachments + + def set_mb_data_from_args(self, mb_data): + """set microblog metadata according to command line options + + if metadata already exist, it will be overwritten + """ + if self.args.comments is not None: + mb_data["allow_comments"] = self.args.comments + if self.args.tag: + mb_data["tags"] = self.args.tag + if self.args.title is not None: + mb_data["title"] = self.args.title + if self.args.language is not None: + mb_data["language"] = self.args.language + if self.args.encrypt: + mb_data["encrypted"] = True + if self.args.sign: + mb_data["signed"] = True + if self.args.encrypt_for: + mb_data["encrypted_for"] = {"targets": self.args.encrypt_for} + self.handle_attachments(mb_data) + + +class Set(base.CommandBase, BlogPublishCommon): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + help=_("publish a new blog item or update an existing one"), + ) + BlogPublishCommon.__init__(self) + + def add_parser_options(self): + BlogPublishCommon.add_parser_options(self) + + async def start(self): + self.current_syntax = await self.get_current_syntax() + self.pubsub_item = self.args.item + mb_data = {} + self.set_mb_data_from_args(mb_data) + if self.pubsub_item: + mb_data["id"] = self.pubsub_item + content = sys.stdin.read() + await self.set_mb_data_content(content, mb_data) + + try: + item_id = await self.host.bridge.mb_send( + self.args.service, + self.args.node, + data_format.serialise(mb_data), + self.profile, + ) + except Exception as e: + self.disp(f"can't send item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(f"Item published with ID {item_id}") + self.host.quit(C.EXIT_OK) + + +class Get(base.CommandBase): + TEMPLATE = "blog/articles.html" + + def __init__(self, host): + extra_outputs = {"default": self.default_output, "fancy": self.fancy_output} + base.CommandBase.__init__( + self, + host, + "get", + use_verbose=True, + use_pubsub=True, + pubsub_flags={C.MULTI_ITEMS, C.CACHE}, + use_output=C.OUTPUT_COMPLEX, + extra_outputs=extra_outputs, + help=_("get blog item(s)"), + ) + + def add_parser_options(self): + # TODO: a key(s) argument to select keys to display + self.parser.add_argument( + "-k", + "--key", + action="append", + dest="keys", + help=_("microblog data key(s) to display (default: depend of verbosity)"), + ) + # TODO: add MAM filters + + def template_data_mapping(self, data): + items, blog_items = data + blog_items["items"] = items + return {"blog_items": blog_items} + + def format_comments(self, item, keys): + lines = [] + for data in item.get("comments", []): + lines.append(data["uri"]) + for k in ("node", "service"): + if OUTPUT_OPT_NO_HEADER in self.args.output_opts: + header = "" + else: + header = f"{C.A_HEADER}comments_{k}: {A.RESET}" + lines.append(header + data[k]) + return "\n".join(lines) + + def format_tags(self, item, keys): + tags = item.pop("tags", []) + return ", ".join(tags) + + def format_updated(self, item, keys): + return common.format_time(item["updated"]) + + def format_published(self, item, keys): + return common.format_time(item["published"]) + + def format_url(self, item, keys): + return uri.build_xmpp_uri( + "pubsub", + subtype="microblog", + path=self.metadata["service"], + node=self.metadata["node"], + item=item["id"], + ) + + def get_keys(self): + """return keys to display according to verbosity or explicit key request""" + verbosity = self.args.verbose + if self.args.keys: + if not set(MB_KEYS).issuperset(self.args.keys): + self.disp( + "following keys are invalid: {invalid}.\n" + "Valid keys are: {valid}.".format( + invalid=", ".join(set(self.args.keys).difference(MB_KEYS)), + valid=", ".join(sorted(MB_KEYS)), + ), + error=True, + ) + self.host.quit(C.EXIT_BAD_ARG) + return self.args.keys + else: + if verbosity == 0: + return ("title", "content") + elif verbosity == 1: + return ( + "title", + "tags", + "author", + "author_jid", + "author_email", + "author_jid_verified", + "published", + "updated", + "content", + ) + else: + return MB_KEYS + + def default_output(self, data): + """simple key/value output""" + items, self.metadata = data + keys = self.get_keys() + + # k_cb use format_[key] methods for complex formattings + k_cb = {} + for k in keys: + try: + callback = getattr(self, "format_" + k) + except AttributeError: + pass + else: + k_cb[k] = callback + for idx, item in enumerate(items): + for k in keys: + if k not in item and k not in k_cb: + continue + if OUTPUT_OPT_NO_HEADER in self.args.output_opts: + header = "" + else: + header = "{k_fmt}{key}:{k_fmt_e} {sep}".format( + k_fmt=C.A_HEADER, + key=k, + k_fmt_e=A.RESET, + sep="\n" if "content" in k else "", + ) + value = k_cb[k](item, keys) if k in k_cb else item[k] + if isinstance(value, bool): + value = str(value).lower() + elif isinstance(value, dict): + value = repr(value) + self.disp(header + (value or "")) + # we want a separation line after each item but the last one + if idx < len(items) - 1: + print("") + + def fancy_output(self, data): + """display blog is a nice to read way + + this output doesn't use keys filter + """ + # thanks to http://stackoverflow.com/a/943921 + rows, columns = list(map(int, os.popen("stty size", "r").read().split())) + items, metadata = data + verbosity = self.args.verbose + sep = A.color(A.FG_BLUE, columns * "▬") + if items: + print(("\n" + sep + "\n")) + + for idx, item in enumerate(items): + title = item.get("title") + if verbosity > 0: + author = item["author"] + published, updated = item["published"], item.get("updated") + else: + author = published = updated = None + if verbosity > 1: + tags = item.pop("tags", []) + else: + tags = None + content = item.get("content") + + if title: + print((A.color(A.BOLD, A.FG_CYAN, item["title"]))) + meta = [] + if author: + meta.append(A.color(A.FG_YELLOW, author)) + if published: + meta.append(A.color(A.FG_YELLOW, "on ", common.format_time(published))) + if updated != published: + meta.append( + A.color(A.FG_YELLOW, "(updated on ", common.format_time(updated), ")") + ) + print((" ".join(meta))) + if tags: + print((A.color(A.FG_MAGENTA, ", ".join(tags)))) + if (title or tags) and content: + print("") + if content: + self.disp(content) + + print(("\n" + sep + "\n")) + + async def start(self): + try: + mb_data = data_format.deserialise( + await self.host.bridge.mb_get( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + self.get_pubsub_extra(), + self.profile, + ) + ) + except Exception as e: + self.disp(f"can't get blog items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + items = mb_data.pop("items") + await self.output((items, mb_data)) + self.host.quit(C.EXIT_OK) + + +class Edit(base.CommandBase, BlogPublishCommon, common.BaseEdit): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "edit", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + use_draft=True, + use_verbose=True, + help=_("edit an existing or new blog post"), + ) + BlogPublishCommon.__init__(self) + common.BaseEdit.__init__(self, self.host, BLOG_TMP_DIR, use_metadata=True) + + def add_parser_options(self): + BlogPublishCommon.add_parser_options(self) + self.parser.add_argument( + "-P", + "--preview", + action="store_true", + help=_("launch a blog preview in parallel"), + ) + self.parser.add_argument( + "--no-publish", + action="store_true", + help=_('add "publish: False" to metadata'), + ) + + def build_metadata_file(self, content_file_path, mb_data=None): + """Build a metadata file using json + + The file is named after content_file_path, with extension replaced by + _metadata.json + @param content_file_path(str): path to the temporary file which will contain the + body + @param mb_data(dict, None): microblog metadata (for existing items) + @return (tuple[dict, Path]): merged metadata put originaly in metadata file + and path to temporary metadata file + """ + # we first construct metadata from edited item ones and CLI argumments + # or re-use the existing one if it exists + meta_file_path = content_file_path.with_name( + content_file_path.stem + common.METADATA_SUFF + ) + if meta_file_path.exists(): + self.disp("Metadata file already exists, we re-use it") + try: + with meta_file_path.open("rb") as f: + mb_data = json.load(f) + except (OSError, IOError, ValueError) as e: + self.disp( + f"Can't read existing metadata file at {meta_file_path}, " + f"aborting: {e}", + error=True, + ) + self.host.quit(1) + else: + mb_data = {} if mb_data is None else mb_data.copy() + + # in all cases, we want to remove unwanted keys + for key in KEY_TO_REMOVE_METADATA: + try: + del mb_data[key] + except KeyError: + pass + # and override metadata with command-line arguments + self.set_mb_data_from_args(mb_data) + + if self.args.no_publish: + mb_data["publish"] = False + + # then we create the file and write metadata there, as JSON dict + # XXX: if we port libervia-cli one day on Windows, O_BINARY may need to be + # added here + with os.fdopen( + os.open(meta_file_path, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o600), "w+b" + ) as f: + # we need to use an intermediate unicode buffer to write to the file + # unicode without escaping characters + unicode_dump = json.dumps( + mb_data, + ensure_ascii=False, + indent=4, + separators=(",", ": "), + sort_keys=True, + ) + f.write(unicode_dump.encode("utf-8")) + + return mb_data, meta_file_path + + async def edit(self, content_file_path, content_file_obj, mb_data=None): + """Edit the file contening the content using editor, and publish it""" + # we first create metadata file + meta_ori, meta_file_path = self.build_metadata_file(content_file_path, mb_data) + + coroutines = [] + + # do we need a preview ? + if self.args.preview: + self.disp("Preview requested, launching it", 1) + # we redirect outputs to /dev/null to avoid console pollution in editor + # if user wants to see messages, (s)he can call "blog preview" directly + coroutines.append( + asyncio.create_subprocess_exec( + sys.argv[0], + "blog", + "preview", + "--inotify", + "true", + "-p", + self.profile, + str(content_file_path), + stdout=DEVNULL, + stderr=DEVNULL, + ) + ) + + # we launch editor + coroutines.append( + self.run_editor( + "blog_editor_args", + content_file_path, + content_file_obj, + meta_file_path=meta_file_path, + meta_ori=meta_ori, + ) + ) + + await asyncio.gather(*coroutines) + + async def publish(self, content, mb_data): + await self.set_mb_data_content(content, mb_data) + + if self.pubsub_item: + mb_data["id"] = self.pubsub_item + + mb_data = data_format.serialise(mb_data) + + await self.host.bridge.mb_send( + self.pubsub_service, self.pubsub_node, mb_data, self.profile + ) + self.disp("Blog item published") + + def get_tmp_suff(self): + # we get current syntax to determine file extension + return SYNTAX_EXT.get(self.current_syntax, SYNTAX_EXT[""]) + + async def get_item_data(self, service, node, item): + items = [item] if item else [] + + mb_data = data_format.deserialise( + await self.host.bridge.mb_get( + service, node, 1, items, data_format.serialise({}), self.profile + ) + ) + item = mb_data["items"][0] + + try: + content = item["content_xhtml"] + except KeyError: + content = item["content"] + if content: + content = await self.host.bridge.syntax_convert( + content, "text", SYNTAX_XHTML, False, self.profile + ) + + if content and self.current_syntax != SYNTAX_XHTML: + content = await self.host.bridge.syntax_convert( + content, SYNTAX_XHTML, self.current_syntax, False, self.profile + ) + + if content and self.current_syntax == SYNTAX_XHTML: + content = content.strip() + if not content.startswith("<div>"): + content = "<div>" + content + "</div>" + try: + from lxml import etree + except ImportError: + self.disp(_("You need lxml to edit pretty XHTML")) + else: + parser = etree.XMLParser(remove_blank_text=True) + root = etree.fromstring(content, parser) + content = etree.tostring(root, encoding=str, pretty_print=True) + + return content, item, item["id"] + + async def start(self): + # if there are user defined extension, we use them + SYNTAX_EXT.update( + config.config_get(self.sat_conf, C.CONFIG_SECTION, CONF_SYNTAX_EXT, {}) + ) + self.current_syntax = await self.get_current_syntax() + + ( + self.pubsub_service, + self.pubsub_node, + self.pubsub_item, + content_file_path, + content_file_obj, + mb_data, + ) = await self.get_item_path() + + await self.edit(content_file_path, content_file_obj, mb_data=mb_data) + self.host.quit() + + +class Rename(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "rename", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + help=_("rename an blog item"), + ) + + def add_parser_options(self): + self.parser.add_argument("new_id", help=_("new item id to use")) + + async def start(self): + try: + await self.host.bridge.mb_rename( + self.args.service, + self.args.node, + self.args.item, + self.args.new_id, + self.profile, + ) + except Exception as e: + self.disp(f"can't rename item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("Item renamed") + self.host.quit(C.EXIT_OK) + + +class Repeat(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "repeat", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + help=_("repeat (re-publish) a blog item"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + repeat_id = await self.host.bridge.mb_repeat( + self.args.service, + self.args.node, + self.args.item, + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't repeat item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if repeat_id: + self.disp(f"Item repeated at ID {str(repeat_id)!r}") + else: + self.disp("Item repeated") + self.host.quit(C.EXIT_OK) + + +class Preview(base.CommandBase, common.BaseEdit): + # TODO: need to be rewritten with template output + + def __init__(self, host): + base.CommandBase.__init__( + self, host, "preview", use_verbose=True, help=_("preview a blog content") + ) + common.BaseEdit.__init__(self, self.host, BLOG_TMP_DIR, use_metadata=True) + + def add_parser_options(self): + self.parser.add_argument( + "--inotify", + type=str, + choices=("auto", "true", "false"), + default="auto", + help=_("use inotify to handle preview"), + ) + self.parser.add_argument( + "file", + nargs="?", + default="current", + help=_("path to the content file"), + ) + + async def show_preview(self): + # we implement show_preview here so we don't have to import webbrowser and urllib + # when preview is not used + url = "file:{}".format(self.urllib.parse.quote(self.preview_file_path)) + self.webbrowser.open_new_tab(url) + + async def _launch_preview_ext(self, cmd_line, opt_name): + url = "file:{}".format(self.urllib.parse.quote(self.preview_file_path)) + args = common.parse_args( + self.host, cmd_line, url=url, preview_file=self.preview_file_path + ) + if not args: + self.disp( + 'Couln\'t find command in "{name}", abording'.format(name=opt_name), + error=True, + ) + self.host.quit(1) + subprocess.Popen(args) + + async def open_preview_ext(self): + await self._launch_preview_ext(self.open_cb_cmd, "blog_preview_open_cmd") + + async def update_preview_ext(self): + await self._launch_preview_ext(self.update_cb_cmd, "blog_preview_update_cmd") + + async def update_content(self): + with self.content_file_path.open("rb") as f: + content = f.read().decode("utf-8-sig") + if content and self.syntax != SYNTAX_XHTML: + # we use safe=True because we want to have a preview as close as possible + # to what the people will see + content = await self.host.bridge.syntax_convert( + content, self.syntax, SYNTAX_XHTML, True, self.profile + ) + + xhtml = ( + f'<html xmlns="http://www.w3.org/1999/xhtml">' + f'<head><meta http-equiv="Content-Type" content="text/html;charset=utf-8" />' + f"</head>" + f"<body>{content}</body>" + f"</html>" + ) + + with open(self.preview_file_path, "wb") as f: + f.write(xhtml.encode("utf-8")) + + async def start(self): + import webbrowser + import urllib.request, urllib.parse, urllib.error + + self.webbrowser, self.urllib = webbrowser, urllib + + if self.args.inotify != "false": + try: + import aionotify + + except ImportError: + if self.args.inotify == "auto": + aionotify = None + self.disp( + f"aionotify module not found, deactivating feature. You can " + f"install it with {AIONOTIFY_INSTALL}" + ) + else: + self.disp( + f"aioinotify not found, can't activate the feature! Please " + f"install it with {AIONOTIFY_INSTALL}", + error=True, + ) + self.host.quit(1) + else: + aionotify = None + + sat_conf = self.sat_conf + SYNTAX_EXT.update( + config.config_get(sat_conf, C.CONFIG_SECTION, CONF_SYNTAX_EXT, {}) + ) + + try: + self.open_cb_cmd = config.config_get( + sat_conf, C.CONFIG_SECTION, "blog_preview_open_cmd", Exception + ) + except (NoOptionError, NoSectionError): + self.open_cb_cmd = None + open_cb = self.show_preview + else: + open_cb = self.open_preview_ext + + self.update_cb_cmd = config.config_get( + sat_conf, C.CONFIG_SECTION, "blog_preview_update_cmd", self.open_cb_cmd + ) + if self.update_cb_cmd is None: + update_cb = self.show_preview + else: + update_cb = self.update_preview_ext + + # which file do we need to edit? + if self.args.file == "current": + self.content_file_path = self.get_current_file(self.profile) + else: + try: + self.content_file_path = Path(self.args.file).resolve(strict=True) + except FileNotFoundError: + self.disp(_('File "{file}" doesn\'t exist!').format(file=self.args.file)) + self.host.quit(C.EXIT_NOT_FOUND) + + self.syntax = await guess_syntax_from_path( + self.host, sat_conf, self.content_file_path + ) + + # at this point the syntax is converted, we can display the preview + preview_file = tempfile.NamedTemporaryFile(suffix=".xhtml", delete=False) + self.preview_file_path = preview_file.name + preview_file.close() + await self.update_content() + + if aionotify is None: + # XXX: we don't delete file automatically because browser needs it + # (and webbrowser.open can return before it is read) + self.disp( + f"temporary file created at {self.preview_file_path}\nthis file will NOT " + f"BE DELETED AUTOMATICALLY, please delete it yourself when you have " + f"finished" + ) + await open_cb() + else: + await open_cb() + watcher = aionotify.Watcher() + watcher_kwargs = { + # Watcher don't accept Path so we convert to string + "path": str(self.content_file_path), + "alias": "content_file", + "flags": aionotify.Flags.CLOSE_WRITE + | aionotify.Flags.DELETE_SELF + | aionotify.Flags.MOVE_SELF, + } + watcher.watch(**watcher_kwargs) + + loop = asyncio.get_event_loop() + await watcher.setup(loop) + + try: + while True: + event = await watcher.get_event() + self.disp("Content updated", 1) + if event.flags & ( + aionotify.Flags.DELETE_SELF | aionotify.Flags.MOVE_SELF + ): + self.disp( + "DELETE/MOVE event catched, changing the watch", + 2, + ) + try: + watcher.unwatch("content_file") + except IOError as e: + self.disp( + f"Can't remove the watch: {e}", + 2, + ) + watcher = aionotify.Watcher() + watcher.watch(**watcher_kwargs) + try: + await watcher.setup(loop) + except OSError: + # if the new file is not here yet we can have an error + # as a workaround, we do a little rest and try again + await asyncio.sleep(1) + await watcher.setup(loop) + await self.update_content() + await update_cb() + except FileNotFoundError: + self.disp("The file seems to have been deleted.", error=True) + self.host.quit(C.EXIT_NOT_FOUND) + finally: + os.unlink(self.preview_file_path) + try: + watcher.unwatch("content_file") + except IOError as e: + self.disp( + f"Can't remove the watch: {e}", + 2, + ) + + +class Import(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "import", + use_pubsub=True, + use_progress=True, + help=_("import an external blog"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "importer", + nargs="?", + help=_("importer name, nothing to display importers list"), + ) + self.parser.add_argument("--host", help=_("original blog host")) + self.parser.add_argument( + "--no-images-upload", + action="store_true", + help=_("do *NOT* upload images (default: do upload images)"), + ) + self.parser.add_argument( + "--upload-ignore-host", + help=_("do not upload images from this host (default: upload all images)"), + ) + self.parser.add_argument( + "--ignore-tls-errors", + action="store_true", + help=_("ignore invalide TLS certificate for uploads"), + ) + self.parser.add_argument( + "-o", + "--option", + action="append", + nargs=2, + default=[], + metavar=("NAME", "VALUE"), + help=_("importer specific options (see importer description)"), + ) + self.parser.add_argument( + "location", + nargs="?", + help=_( + "importer data location (see importer description), nothing to show " + "importer description" + ), + ) + + async def on_progress_started(self, metadata): + self.disp(_("Blog upload started"), 2) + + async def on_progress_finished(self, metadata): + self.disp(_("Blog uploaded successfully"), 2) + redirections = { + k[len(URL_REDIRECT_PREFIX) :]: v + for k, v in metadata.items() + if k.startswith(URL_REDIRECT_PREFIX) + } + if redirections: + conf = "\n".join( + [ + "url_redirections_dict = {}".format( + # we need to add ' ' before each new line + # and to double each '%' for ConfigParser + "\n ".join( + json.dumps(redirections, indent=1, separators=(",", ": ")) + .replace("%", "%%") + .split("\n") + ) + ), + ] + ) + self.disp( + _( + "\nTo redirect old URLs to new ones, put the following lines in your" + " sat.conf file, in [libervia] section:\n\n{conf}" + ).format(conf=conf) + ) + + async def on_progress_error(self, error_msg): + self.disp( + _("Error while uploading blog: {error_msg}").format(error_msg=error_msg), + error=True, + ) + + async def start(self): + if self.args.location is None: + for name in ("option", "service", "no_images_upload"): + if getattr(self.args, name): + self.parser.error( + _( + "{name} argument can't be used without location argument" + ).format(name=name) + ) + if self.args.importer is None: + self.disp( + "\n".join( + [ + f"{name}: {desc}" + for name, desc in await self.host.bridge.blogImportList() + ] + ) + ) + else: + try: + short_desc, long_desc = await self.host.bridge.blogImportDesc( + self.args.importer + ) + except Exception as e: + msg = [l for l in str(e).split("\n") if l][ + -1 + ] # we only keep the last line + self.disp(msg) + self.host.quit(1) + else: + self.disp(f"{self.args.importer}: {short_desc}\n\n{long_desc}") + self.host.quit() + else: + # we have a location, an import is requested + options = {key: value for key, value in self.args.option} + if self.args.host: + options["host"] = self.args.host + if self.args.ignore_tls_errors: + options["ignore_tls_errors"] = C.BOOL_TRUE + if self.args.no_images_upload: + options["upload_images"] = C.BOOL_FALSE + if self.args.upload_ignore_host: + self.parser.error( + "upload-ignore-host option can't be used when no-images-upload " + "is set" + ) + elif self.args.upload_ignore_host: + options["upload_ignore_host"] = self.args.upload_ignore_host + + try: + progress_id = await self.host.bridge.blogImport( + self.args.importer, + self.args.location, + options, + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp( + _("Error while trying to import a blog: {e}").format(e=e), + error=True, + ) + self.host.quit(1) + else: + await self.set_progress_id(progress_id) + + +class AttachmentGet(cmd_pubsub.AttachmentGet): + + def __init__(self, host): + super().__init__(host) + self.override_pubsub_flags({C.SERVICE, C.SINGLE_ITEM}) + + + async def start(self): + if not self.args.node: + namespaces = await self.host.bridge.namespaces_get() + try: + ns_microblog = namespaces["microblog"] + except KeyError: + self.disp("XEP-0277 plugin is not loaded", error=True) + self.host.quit(C.EXIT_MISSING_FEATURE) + else: + self.args.node = ns_microblog + return await super().start() + + +class AttachmentSet(cmd_pubsub.AttachmentSet): + + def __init__(self, host): + super().__init__(host) + self.override_pubsub_flags({C.SERVICE, C.SINGLE_ITEM}) + + async def start(self): + if not self.args.node: + namespaces = await self.host.bridge.namespaces_get() + try: + ns_microblog = namespaces["microblog"] + except KeyError: + self.disp("XEP-0277 plugin is not loaded", error=True) + self.host.quit(C.EXIT_MISSING_FEATURE) + else: + self.args.node = ns_microblog + return await super().start() + + +class Attachments(base.CommandBase): + subcommands = (AttachmentGet, AttachmentSet) + + def __init__(self, host): + super().__init__( + host, + "attachments", + use_profile=False, + help=_("set or retrieve blog attachments"), + ) + + +class Blog(base.CommandBase): + subcommands = (Set, Get, Edit, Rename, Repeat, Preview, Import, Attachments) + + def __init__(self, host): + super(Blog, self).__init__( + host, "blog", use_profile=False, help=_("blog/microblog management") + )