Mercurial > libervia-backend
diff sat/plugins/plugin_misc_text_commands.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/plugins/plugin_misc_text_commands.py@0046283a285d |
children | 56f94936df1e |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_misc_text_commands.py Mon Apr 02 19:44:50 2018 +0200 @@ -0,0 +1,408 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SàT plugin for managing text commands +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sat.core.i18n import _ +from sat.core.constants import Const as C +from sat.core import exceptions +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +from sat.core.log import getLogger +log = getLogger(__name__) +from twisted.python import failure +from collections import OrderedDict + +PLUGIN_INFO = { + C.PI_NAME: "Text commands", + C.PI_IMPORT_NAME: C.TEXT_CMDS, + C.PI_TYPE: "Misc", + C.PI_PROTOCOLS: ["XEP-0245"], + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "TextCommands", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""IRC like text commands""") +} + + +class InvalidCommandSyntax(Exception): + """Throwed while parsing @command in docstring if syntax is invalid""" + pass + + +CMD_KEY = "@command" +CMD_TYPES = ('group', 'one2one', 'all') +FEEDBACK_INFO_TYPE = "TEXT_CMD" + + +class TextCommands(object): + #FIXME: doc strings for commands have to be translatable + # plugins need a dynamic translation system (translation + # should be downloadable independently) + + HELP_SUGGESTION = _("Type '/help' to get a list of the available commands. If you didn't want to use a command, please start your message with '//' to escape the slash.") + + def __init__(self, host): + log.info(_("Text commands initialization")) + self.host = host + # this is internal command, so we set high priority + host.trigger.add("sendMessage", self.sendMessageTrigger, priority=1000000) + self._commands = {} + self._whois = [] + self.registerTextCommands(self) + + def _parseDocString(self, cmd, cmd_name): + """Parse a docstring to get text command data + + @param cmd: function or method callback for the command, + its docstring will be used for self documentation in the following way: + - first line is the command short documentation, shown with /help + - @command keyword can be used, see http://wiki.goffi.org/wiki/Coding_style/en for documentation + @return (dict): dictionary with parsed data where key can be: + - "doc_short_help" (default: ""): the untranslated short documentation + - "type" (default "all"): the command type as specified in documentation + - "args" (default: ""): the arguments available, using syntax specified in documentation. + - "doc_arg_[name]": the doc of [name] argument + """ + data = OrderedDict([('doc_short_help', ""), + ('type', 'all'), + ('args', '')]) + docstring = cmd.__doc__ + if docstring is None: + log.warning(u"Not docstring found for command {}".format(cmd_name)) + docstring = "" + + doc_data = docstring.split("\n") + data["doc_short_help"] = doc_data[0] + + try: + cmd_indent = 0 # >0 when @command is found are we are parsing it + + for line in doc_data: + stripped = line.strip() + if cmd_indent and line[cmd_indent:cmd_indent+5] == " -": + colon_idx = line.find(":") + if colon_idx == -1: + raise InvalidCommandSyntax("No colon found in argument description") + arg_name = line[cmd_indent+6:colon_idx].strip() + if not arg_name: + raise InvalidCommandSyntax("No name found in argument description") + arg_help = line[colon_idx+1:].strip() + data["doc_arg_{}".format(arg_name)] = arg_help + elif cmd_indent: + # we are parsing command and indent level is not good, it's finished + break + elif stripped.startswith(CMD_KEY): + cmd_indent = line.find(CMD_KEY) + + # type + colon_idx = stripped.find(":") + if colon_idx == -1: + raise InvalidCommandSyntax("missing colon") + type_data = stripped[len(CMD_KEY):colon_idx].strip() + if len(type_data) == 0: + type_data="(all)" + elif len(type_data) <= 2 or type_data[0] != '(' or type_data[-1] != ')': + raise InvalidCommandSyntax("Bad type data syntax") + type_ = type_data[1:-1] + if type_ not in CMD_TYPES: + raise InvalidCommandSyntax("Unknown type {}".format(type_)) + data["type"] = type_ + + #args + data["args"] = stripped[colon_idx+1:].strip() + except InvalidCommandSyntax as e: + log.warning(u"Invalid command syntax for command {command}: {message}".format(command=cmd_name, message=e.message)) + + return data + + + def registerTextCommands(self, instance): + """ Add a text command + + @param instance: instance of a class containing text commands + """ + for attr in dir(instance): + if attr.startswith('cmd_'): + cmd = getattr(instance, attr) + if not callable(cmd): + log.warning(_(u"Skipping not callable [%s] attribute") % attr) + continue + cmd_name = attr[4:] + if not cmd_name: + log.warning(_("Skipping cmd_ method")) + if cmd_name in self._commands: + suff=2 + while (cmd_name + str(suff)) in self._commands: + suff+=1 + new_name = cmd_name + str(suff) + log.warning(_(u"Conflict for command [{old_name}], renaming it to [{new_name}]").format(old_name=cmd_name, new_name=new_name)) + cmd_name = new_name + self._commands[cmd_name] = cmd_data = OrderedDict({'callback':cmd}) # We use an Ordered dict to keep documenation order + cmd_data.update(self._parseDocString(cmd, cmd_name)) + log.info(_("Registered text command [%s]") % cmd_name) + + def addWhoIsCb(self, callback, priority=0): + """Add a callback which give information to the /whois command + + @param callback: a callback which will be called with the following arguments + - whois_msg: list of information strings to display, callback need to append its own strings to it + - target_jid: full jid from whom we want information + - profile: %(doc_profile)s + @param priority: priority of the information to show (the highest priority will be displayed first) + """ + self._whois.append((priority, callback)) + self._whois.sort(key=lambda item: item[0], reverse=True) + + def sendMessageTrigger(self, client, mess_data, pre_xml_treatments, post_xml_treatments): + """Install SendMessage command hook """ + pre_xml_treatments.addCallback(self._sendMessageCmdHook, client) + return True + + def _sendMessageCmdHook(self, mess_data, client): + """ Check text commands in message, and react consequently + + msg starting with / are potential command. If a command is found, it is executed, else and help message is sent + msg starting with // are escaped: they are sent with a single / + commands can abord message sending (if they return anything evaluating to False), or continue it (if they return True), eventually after modifying the message + an "unparsed" key is added to message, containing part of the message not yet parsed + commands can be deferred or not + @param mess_data(dict): data comming from sendMessage trigger + @param profile: %(doc_profile)s + """ + try: + msg = mess_data["message"][''] + msg_lang = '' + except KeyError: + try: + # we have not default message, we try to take the first found + msg_lang, msg = mess_data["message"].iteritems().next() + except StopIteration: + log.debug(u"No message found, skipping text commands") + return mess_data + + try: + if msg[:2] == '//': + # we have a double '/', it's the escape sequence + mess_data["message"][msg_lang] = msg[1:] + return mess_data + if msg[0] != '/': + return mess_data + except IndexError: + return mess_data + + # we have a command + d = None + command = msg[1:].partition(' ')[0].lower() + if command.isalpha(): + # looks like an actual command, we try to call the corresponding method + def retHandling(ret): + """ Handle command return value: + if ret is True, normally send message (possibly modified by command) + else, abord message sending + """ + if ret: + return mess_data + else: + log.debug(u"text command detected ({})".format(command)) + raise failure.Failure(exceptions.CancelError()) + + def genericErrback(failure): + try: + msg = u"with condition {}".format(failure.value.condition) + except AttributeError: + msg = u"with error {}".format(failure.value) + self.feedBack(client, u"Command failed {}".format(msg), mess_data) + return False + + mess_data["unparsed"] = msg[1 + len(command):] # part not yet parsed of the message + try: + cmd_data = self._commands[command] + except KeyError: + self.feedBack(client, _("Unknown command /%s. ") % command + self.HELP_SUGGESTION, mess_data) + log.debug("text command help message") + raise failure.Failure(exceptions.CancelError()) + else: + if not self._contextValid(mess_data, cmd_data): + # The command is not launched in the right context, we throw a message with help instructions + context_txt = _("group discussions") if cmd_data["type"] == "group" else _("one to one discussions") + feedback = _("/{command} command only applies in {context}.").format(command=command, context=context_txt) + self.feedBack(client, u"{} {}".format(feedback, self.HELP_SUGGESTION), mess_data) + log.debug("text command invalid message") + raise failure.Failure(exceptions.CancelError()) + else: + d = defer.maybeDeferred(cmd_data["callback"], client, mess_data) + d.addErrback(genericErrback) + d.addCallback(retHandling) + + return d or mess_data # if a command is detected, we should have a deferred, else we send the message normally + + def _contextValid(self, mess_data, cmd_data): + """Tell if a command can be used in the given context + + @param mess_data(dict): message data as given in sendMessage trigger + @param cmd_data(dict): command data as returned by self._parseDocString + @return (bool): True if command can be used in this context + """ + if ((cmd_data['type'] == "group" and mess_data["type"] != "groupchat") or + (cmd_data['type'] == 'one2one' and mess_data["type"] == "groupchat")): + return False + return True + + def getRoomJID(self, arg, service_jid): + """Return a room jid with a shortcut + + @param arg: argument: can be a full room jid (e.g.: sat@chat.jabberfr.org) + or a shortcut (e.g.: sat or sat@ for sat on current service) + @param service_jid: jid of the current service (e.g.: chat.jabberfr.org) + """ + nb_arobas = arg.count('@') + if nb_arobas == 1: + if arg[-1] != '@': + return jid.JID(arg) + return jid.JID(arg + service_jid) + return jid.JID(u"%s@%s" % (arg, service_jid)) + + def feedBack(self, client, message, mess_data, info_type=FEEDBACK_INFO_TYPE): + """Give a message back to the user""" + if mess_data["type"] == 'groupchat': + to_ = mess_data["to"].userhostJID() + else: + to_ = client.jid + + # we need to invert send message back, so sender need to original destinee + mess_data["from"] = mess_data["to"] + mess_data["to"] = to_ + mess_data["type"] = C.MESS_TYPE_INFO + mess_data["message"] = {'': message} + mess_data["extra"]["info_type"] = info_type + client.messageSendToBridge(mess_data) + + def cmd_whois(self, client, mess_data): + """show informations on entity + + @command: [JID|ROOM_NICK] + - JID: entity to request + - ROOM_NICK: nick of the room to request + """ + log.debug("Catched whois command") + + entity = mess_data["unparsed"].strip() + + if mess_data['type'] == "groupchat": + room = mess_data["to"].userhostJID() + try: + if self.host.plugins["XEP-0045"].isNickInRoom(client, room, entity): + entity = u"%s/%s" % (room, entity) + except KeyError: + log.warning("plugin XEP-0045 is not present") + + if not entity: + target_jid = mess_data["to"] + else: + try: + target_jid = jid.JID(entity) + if not target_jid.user or not target_jid.host: + raise jid.InvalidFormat + except (RuntimeError, jid.InvalidFormat, AttributeError): + self.feedBack(client, _("Invalid jid, can't whois"), mess_data) + return False + + if not target_jid.resource: + target_jid.resource = self.host.memory.getMainResource(client, target_jid) + + whois_msg = [_(u"whois for %(jid)s") % {'jid': target_jid}] + + d = defer.succeed(None) + for ignore, callback in self._whois: + d.addCallback(lambda ignore: callback(client, whois_msg, mess_data, target_jid)) + + def feedBack(ignore): + self.feedBack(client, u"\n".join(whois_msg), mess_data) + return False + + d.addCallback(feedBack) + return d + + def _getArgsHelp(self, cmd_data): + """Return help string for args of cmd_name, according to docstring data + + @param cmd_data: command data + @return (list[unicode]): help strings + """ + strings = [] + for doc_name, doc_help in cmd_data.iteritems(): + if doc_name.startswith("doc_arg_"): + arg_name = doc_name[8:] + strings.append(u"- {name}: {doc_help}".format(name=arg_name, doc_help=_(doc_help))) + + return strings + + def cmd_me(self, client, mess_data): + """display a message at third person + + @command (all): message + - message: message to show at third person + e.g.: "/me clenches his fist" will give "[YOUR_NICK] clenches his fist" + """ + # We just ignore the command as the match is done on receiption by clients + return True + + def cmd_whoami(self, client, mess_data): + """give your own jid""" + self.feedBack(client, client.jid.full(), mess_data) + + def cmd_help(self, client, mess_data): + """show help on available commands + + @command: [cmd_name] + - cmd_name: name of the command for detailed help + """ + cmd_name = mess_data["unparsed"].strip() + if cmd_name and cmd_name[0] == "/": + cmd_name = cmd_name[1:] + if cmd_name and cmd_name not in self._commands: + self.feedBack(client, _(u"Invalid command name [{}]\n".format(cmd_name)), mess_data) + cmd_name = "" + if not cmd_name: + # we show the global help + longuest = max([len(command) for command in self._commands]) + help_cmds = [] + + for command in sorted(self._commands): + cmd_data = self._commands[command] + if not self._contextValid(mess_data, cmd_data): + continue + spaces = (longuest - len(command)) * ' ' + help_cmds.append(" /{command}: {spaces} {short_help}".format( + command=command, + spaces=spaces, + short_help=cmd_data["doc_short_help"] + )) + + help_mess = _(u"Text commands available:\n%s") % (u'\n'.join(help_cmds), ) + else: + # we show detailled help for a command + cmd_data = self._commands[cmd_name] + syntax = cmd_data["args"] + help_mess = _(u"/{name}: {short_help}\n{syntax}{args_help}").format( + name=cmd_name, + short_help=cmd_data['doc_short_help'], + syntax=_(" "*4+"syntax: {}\n").format(syntax) if syntax else "", + args_help=u'\n'.join([u" "*8+"{}".format(line) for line in self._getArgsHelp(cmd_data)])) + + self.feedBack(client, help_mess, mess_data)