Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_misc_text_commands.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_text_commands.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_misc_text_commands.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,471 @@ +#!/usr/bin/env python3 + + +# SàT plugin for managing text commands +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +from twisted.python import failure +from libervia.backend.core.i18n import _ +from libervia.backend.core.constants import Const as C +from libervia.backend.core import exceptions +from libervia.backend.core.log import getLogger +from libervia.backend.tools import utils + + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "Text commands", + C.PI_IMPORT_NAME: C.TEXT_CMDS, + C.PI_TYPE: "Misc", + C.PI_PROTOCOLS: ["XEP-0245"], + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "TextCommands", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""IRC like text commands"""), +} + + +class InvalidCommandSyntax(Exception): + """Throwed while parsing @command in docstring if syntax is invalid""" + + pass + + +CMD_KEY = "@command" +CMD_TYPES = ("group", "one2one", "all") +FEEDBACK_INFO_TYPE = "TEXT_CMD" + + +class TextCommands(object): + # FIXME: doc strings for commands have to be translatable + # plugins need a dynamic translation system (translation + # should be downloadable independently) + + HELP_SUGGESTION = _( + "Type '/help' to get a list of the available commands. If you didn't want to " + "use a command, please start your message with '//' to escape the slash." + ) + + def __init__(self, host): + log.info(_("Text commands initialization")) + self.host = host + # this is internal command, so we set high priority + host.trigger.add("sendMessage", self.send_message_trigger, priority=1000000) + self._commands = {} + self._whois = [] + self.register_text_commands(self) + + def _parse_doc_string(self, cmd, cmd_name): + """Parse a docstring to get text command data + + @param cmd: function or method callback for the command, + its docstring will be used for self documentation in the following way: + - first line is the command short documentation, shown with /help + - @command keyword can be used, + see http://wiki.goffi.org/wiki/Coding_style/en for documentation + @return (dict): dictionary with parsed data where key can be: + - "doc_short_help" (default: ""): the untranslated short documentation + - "type" (default "all"): the command type as specified in documentation + - "args" (default: ""): the arguments available, using syntax specified in + documentation. + - "doc_arg_[name]": the doc of [name] argument + """ + data = { + "doc_short_help": "", + "type": "all", + "args": "", + } + docstring = cmd.__doc__ + if docstring is None: + log.warning("No docstring found for command {}".format(cmd_name)) + docstring = "" + + doc_data = docstring.split("\n") + data["doc_short_help"] = doc_data[0] + + try: + cmd_indent = 0 # >0 when @command is found are we are parsing it + + for line in doc_data: + stripped = line.strip() + if cmd_indent and line[cmd_indent : cmd_indent + 5] == " -": + colon_idx = line.find(":") + if colon_idx == -1: + raise InvalidCommandSyntax( + "No colon found in argument description" + ) + arg_name = line[cmd_indent + 6 : colon_idx].strip() + if not arg_name: + raise InvalidCommandSyntax( + "No name found in argument description" + ) + arg_help = line[colon_idx + 1 :].strip() + data["doc_arg_{}".format(arg_name)] = arg_help + elif cmd_indent: + # we are parsing command and indent level is not good, it's finished + break + elif stripped.startswith(CMD_KEY): + cmd_indent = line.find(CMD_KEY) + + # type + colon_idx = stripped.find(":") + if colon_idx == -1: + raise InvalidCommandSyntax("missing colon") + type_data = stripped[len(CMD_KEY) : colon_idx].strip() + if len(type_data) == 0: + type_data = "(all)" + elif ( + len(type_data) <= 2 or type_data[0] != "(" or type_data[-1] != ")" + ): + raise InvalidCommandSyntax("Bad type data syntax") + type_ = type_data[1:-1] + if type_ not in CMD_TYPES: + raise InvalidCommandSyntax("Unknown type {}".format(type_)) + data["type"] = type_ + + # args + data["args"] = stripped[colon_idx + 1 :].strip() + except InvalidCommandSyntax as e: + log.warning( + "Invalid command syntax for command {command}: {message}".format( + command=cmd_name, message=e.message + ) + ) + + return data + + def register_text_commands(self, instance): + """ Add a text command + + @param instance: instance of a class containing text commands + """ + for attr in dir(instance): + if attr.startswith("cmd_"): + cmd = getattr(instance, attr) + if not callable(cmd): + log.warning(_("Skipping not callable [%s] attribute") % attr) + continue + cmd_name = attr[4:] + if not cmd_name: + log.warning(_("Skipping cmd_ method")) + if cmd_name in self._commands: + suff = 2 + while (cmd_name + str(suff)) in self._commands: + suff += 1 + new_name = cmd_name + str(suff) + log.warning( + _( + "Conflict for command [{old_name}], renaming it to [{new_name}]" + ).format(old_name=cmd_name, new_name=new_name) + ) + cmd_name = new_name + self._commands[cmd_name] = cmd_data = {"callback": cmd} + cmd_data.update(self._parse_doc_string(cmd, cmd_name)) + log.info(_("Registered text command [%s]") % cmd_name) + + def add_who_is_cb(self, callback, priority=0): + """Add a callback which give information to the /whois command + + @param callback: a callback which will be called with the following arguments + - whois_msg: list of information strings to display, callback need to append + its own strings to it + - target_jid: full jid from whom we want information + - profile: %(doc_profile)s + @param priority: priority of the information to show (the highest priority will + be displayed first) + """ + self._whois.append((priority, callback)) + self._whois.sort(key=lambda item: item[0], reverse=True) + + def send_message_trigger( + self, client, mess_data, pre_xml_treatments, post_xml_treatments + ): + """Install SendMessage command hook """ + pre_xml_treatments.addCallback(self._send_message_cmd_hook, client) + return True + + def _send_message_cmd_hook(self, mess_data, client): + """ Check text commands in message, and react consequently + + msg starting with / are potential command. If a command is found, it is executed, + else an help message is sent. + msg starting with // are escaped: they are sent with a single / + commands can abord message sending (if they return anything evaluating to False), + or continue it (if they return True), eventually after modifying the message + an "unparsed" key is added to message, containing part of the message not yet + parsed. + Commands can be deferred or not + @param mess_data(dict): data comming from sendMessage trigger + @param profile: %(doc_profile)s + """ + try: + msg = mess_data["message"][""] + msg_lang = "" + except KeyError: + try: + # we have not default message, we try to take the first found + msg_lang, msg = next(iter(mess_data["message"].items())) + except StopIteration: + log.debug("No message found, skipping text commands") + return mess_data + + try: + if msg[:2] == "//": + # we have a double '/', it's the escape sequence + mess_data["message"][msg_lang] = msg[1:] + return mess_data + if msg[0] != "/": + return mess_data + except IndexError: + return mess_data + + # we have a command + d = None + command = msg[1:].partition(" ")[0].lower().strip() + if not command.isidentifier(): + self.feed_back( + client, + _("Invalid command /%s. ") % command + self.HELP_SUGGESTION, + mess_data, + ) + raise failure.Failure(exceptions.CancelError()) + + # looks like an actual command, we try to call the corresponding method + def ret_handling(ret): + """ Handle command return value: + if ret is True, normally send message (possibly modified by command) + else, abord message sending + """ + if ret: + return mess_data + else: + log.debug("text command detected ({})".format(command)) + raise failure.Failure(exceptions.CancelError()) + + def generic_errback(failure): + try: + msg = "with condition {}".format(failure.value.condition) + except AttributeError: + msg = "with error {}".format(failure.value) + self.feed_back(client, "Command failed {}".format(msg), mess_data) + return False + + mess_data["unparsed"] = msg[ + 1 + len(command) : + ] # part not yet parsed of the message + try: + cmd_data = self._commands[command] + except KeyError: + self.feed_back( + client, + _("Unknown command /%s. ") % command + self.HELP_SUGGESTION, + mess_data, + ) + log.debug("text command help message") + raise failure.Failure(exceptions.CancelError()) + else: + if not self._context_valid(mess_data, cmd_data): + # The command is not launched in the right context, we throw a message with help instructions + context_txt = ( + _("group discussions") + if cmd_data["type"] == "group" + else _("one to one discussions") + ) + feedback = _("/{command} command only applies in {context}.").format( + command=command, context=context_txt + ) + self.feed_back( + client, "{} {}".format(feedback, self.HELP_SUGGESTION), mess_data + ) + log.debug("text command invalid message") + raise failure.Failure(exceptions.CancelError()) + else: + d = utils.as_deferred(cmd_data["callback"], client, mess_data) + d.addErrback(generic_errback) + d.addCallback(ret_handling) + + return d + + def _context_valid(self, mess_data, cmd_data): + """Tell if a command can be used in the given context + + @param mess_data(dict): message data as given in sendMessage trigger + @param cmd_data(dict): command data as returned by self._parse_doc_string + @return (bool): True if command can be used in this context + """ + if (cmd_data["type"] == "group" and mess_data["type"] != "groupchat") or ( + cmd_data["type"] == "one2one" and mess_data["type"] == "groupchat" + ): + return False + return True + + def get_room_jid(self, arg, service_jid): + """Return a room jid with a shortcut + + @param arg: argument: can be a full room jid (e.g.: sat@chat.jabberfr.org) + or a shortcut (e.g.: sat or sat@ for sat on current service) + @param service_jid: jid of the current service (e.g.: chat.jabberfr.org) + """ + nb_arobas = arg.count("@") + if nb_arobas == 1: + if arg[-1] != "@": + return jid.JID(arg) + return jid.JID(arg + service_jid) + return jid.JID(f"{arg}@{service_jid}") + + def feed_back(self, client, message, mess_data, info_type=FEEDBACK_INFO_TYPE): + """Give a message back to the user""" + if mess_data["type"] == "groupchat": + to_ = mess_data["to"].userhostJID() + else: + to_ = client.jid + + # we need to invert send message back, so sender need to original destinee + mess_data["from"] = mess_data["to"] + mess_data["to"] = to_ + mess_data["type"] = C.MESS_TYPE_INFO + mess_data["message"] = {"": message} + mess_data["extra"]["info_type"] = info_type + client.message_send_to_bridge(mess_data) + + def cmd_whois(self, client, mess_data): + """show informations on entity + + @command: [JID|ROOM_NICK] + - JID: entity to request + - ROOM_NICK: nick of the room to request + """ + log.debug("Catched whois command") + + entity = mess_data["unparsed"].strip() + + if mess_data["type"] == "groupchat": + room = mess_data["to"].userhostJID() + try: + if self.host.plugins["XEP-0045"].is_nick_in_room(client, room, entity): + entity = "%s/%s" % (room, entity) + except KeyError: + log.warning("plugin XEP-0045 is not present") + + if not entity: + target_jid = mess_data["to"] + else: + try: + target_jid = jid.JID(entity) + if not target_jid.user or not target_jid.host: + raise jid.InvalidFormat + except (RuntimeError, jid.InvalidFormat, AttributeError): + self.feed_back(client, _("Invalid jid, can't whois"), mess_data) + return False + + if not target_jid.resource: + target_jid.resource = self.host.memory.main_resource_get(client, target_jid) + + whois_msg = [_("whois for %(jid)s") % {"jid": target_jid}] + + d = defer.succeed(None) + for __, callback in self._whois: + d.addCallback( + lambda __: callback(client, whois_msg, mess_data, target_jid) + ) + + def feed_back(__): + self.feed_back(client, "\n".join(whois_msg), mess_data) + return False + + d.addCallback(feed_back) + return d + + def _get_args_help(self, cmd_data): + """Return help string for args of cmd_name, according to docstring data + + @param cmd_data: command data + @return (list[unicode]): help strings + """ + strings = [] + for doc_name, doc_help in cmd_data.items(): + if doc_name.startswith("doc_arg_"): + arg_name = doc_name[8:] + strings.append( + "- {name}: {doc_help}".format(name=arg_name, doc_help=_(doc_help)) + ) + + return strings + + def cmd_me(self, client, mess_data): + """display a message at third person + + @command (all): message + - message: message to show at third person + e.g.: "/me clenches his fist" will give "[YOUR_NICK] clenches his fist" + """ + # We just ignore the command as the match is done on receiption by clients + return True + + def cmd_whoami(self, client, mess_data): + """give your own jid""" + self.feed_back(client, client.jid.full(), mess_data) + + def cmd_help(self, client, mess_data): + """show help on available commands + + @command: [cmd_name] + - cmd_name: name of the command for detailed help + """ + cmd_name = mess_data["unparsed"].strip() + if cmd_name and cmd_name[0] == "/": + cmd_name = cmd_name[1:] + if cmd_name and cmd_name not in self._commands: + self.feed_back( + client, _("Invalid command name [{}]\n".format(cmd_name)), mess_data + ) + cmd_name = "" + if not cmd_name: + # we show the global help + longuest = max([len(command) for command in self._commands]) + help_cmds = [] + + for command in sorted(self._commands): + cmd_data = self._commands[command] + if not self._context_valid(mess_data, cmd_data): + continue + spaces = (longuest - len(command)) * " " + help_cmds.append( + " /{command}: {spaces} {short_help}".format( + command=command, + spaces=spaces, + short_help=cmd_data["doc_short_help"], + ) + ) + + help_mess = _("Text commands available:\n%s") % ("\n".join(help_cmds),) + else: + # we show detailled help for a command + cmd_data = self._commands[cmd_name] + syntax = cmd_data["args"] + help_mess = _("/{name}: {short_help}\n{syntax}{args_help}").format( + name=cmd_name, + short_help=cmd_data["doc_short_help"], + syntax=_(" " * 4 + "syntax: {}\n").format(syntax) if syntax else "", + args_help="\n".join( + [" " * 8 + "{}".format(line) for line in self._get_args_help(cmd_data)] + ), + ) + + self.feed_back(client, help_mess, mess_data)