comparison libervia/backend/plugins/plugin_misc_text_commands.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_misc_text_commands.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SàT plugin for managing text commands
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from twisted.words.protocols.jabber import jid
21 from twisted.internet import defer
22 from twisted.python import failure
23 from libervia.backend.core.i18n import _
24 from libervia.backend.core.constants import Const as C
25 from libervia.backend.core import exceptions
26 from libervia.backend.core.log import getLogger
27 from libervia.backend.tools import utils
28
29
30 log = getLogger(__name__)
31
32 PLUGIN_INFO = {
33 C.PI_NAME: "Text commands",
34 C.PI_IMPORT_NAME: C.TEXT_CMDS,
35 C.PI_TYPE: "Misc",
36 C.PI_PROTOCOLS: ["XEP-0245"],
37 C.PI_DEPENDENCIES: [],
38 C.PI_MAIN: "TextCommands",
39 C.PI_HANDLER: "no",
40 C.PI_DESCRIPTION: _("""IRC like text commands"""),
41 }
42
43
44 class InvalidCommandSyntax(Exception):
45 """Throwed while parsing @command in docstring if syntax is invalid"""
46
47 pass
48
49
50 CMD_KEY = "@command"
51 CMD_TYPES = ("group", "one2one", "all")
52 FEEDBACK_INFO_TYPE = "TEXT_CMD"
53
54
55 class TextCommands(object):
56 # FIXME: doc strings for commands have to be translatable
57 # plugins need a dynamic translation system (translation
58 # should be downloadable independently)
59
60 HELP_SUGGESTION = _(
61 "Type '/help' to get a list of the available commands. If you didn't want to "
62 "use a command, please start your message with '//' to escape the slash."
63 )
64
65 def __init__(self, host):
66 log.info(_("Text commands initialization"))
67 self.host = host
68 # this is internal command, so we set high priority
69 host.trigger.add("sendMessage", self.send_message_trigger, priority=1000000)
70 self._commands = {}
71 self._whois = []
72 self.register_text_commands(self)
73
74 def _parse_doc_string(self, cmd, cmd_name):
75 """Parse a docstring to get text command data
76
77 @param cmd: function or method callback for the command,
78 its docstring will be used for self documentation in the following way:
79 - first line is the command short documentation, shown with /help
80 - @command keyword can be used,
81 see http://wiki.goffi.org/wiki/Coding_style/en for documentation
82 @return (dict): dictionary with parsed data where key can be:
83 - "doc_short_help" (default: ""): the untranslated short documentation
84 - "type" (default "all"): the command type as specified in documentation
85 - "args" (default: ""): the arguments available, using syntax specified in
86 documentation.
87 - "doc_arg_[name]": the doc of [name] argument
88 """
89 data = {
90 "doc_short_help": "",
91 "type": "all",
92 "args": "",
93 }
94 docstring = cmd.__doc__
95 if docstring is None:
96 log.warning("No docstring found for command {}".format(cmd_name))
97 docstring = ""
98
99 doc_data = docstring.split("\n")
100 data["doc_short_help"] = doc_data[0]
101
102 try:
103 cmd_indent = 0 # >0 when @command is found are we are parsing it
104
105 for line in doc_data:
106 stripped = line.strip()
107 if cmd_indent and line[cmd_indent : cmd_indent + 5] == " -":
108 colon_idx = line.find(":")
109 if colon_idx == -1:
110 raise InvalidCommandSyntax(
111 "No colon found in argument description"
112 )
113 arg_name = line[cmd_indent + 6 : colon_idx].strip()
114 if not arg_name:
115 raise InvalidCommandSyntax(
116 "No name found in argument description"
117 )
118 arg_help = line[colon_idx + 1 :].strip()
119 data["doc_arg_{}".format(arg_name)] = arg_help
120 elif cmd_indent:
121 # we are parsing command and indent level is not good, it's finished
122 break
123 elif stripped.startswith(CMD_KEY):
124 cmd_indent = line.find(CMD_KEY)
125
126 # type
127 colon_idx = stripped.find(":")
128 if colon_idx == -1:
129 raise InvalidCommandSyntax("missing colon")
130 type_data = stripped[len(CMD_KEY) : colon_idx].strip()
131 if len(type_data) == 0:
132 type_data = "(all)"
133 elif (
134 len(type_data) <= 2 or type_data[0] != "(" or type_data[-1] != ")"
135 ):
136 raise InvalidCommandSyntax("Bad type data syntax")
137 type_ = type_data[1:-1]
138 if type_ not in CMD_TYPES:
139 raise InvalidCommandSyntax("Unknown type {}".format(type_))
140 data["type"] = type_
141
142 # args
143 data["args"] = stripped[colon_idx + 1 :].strip()
144 except InvalidCommandSyntax as e:
145 log.warning(
146 "Invalid command syntax for command {command}: {message}".format(
147 command=cmd_name, message=e.message
148 )
149 )
150
151 return data
152
153 def register_text_commands(self, instance):
154 """ Add a text command
155
156 @param instance: instance of a class containing text commands
157 """
158 for attr in dir(instance):
159 if attr.startswith("cmd_"):
160 cmd = getattr(instance, attr)
161 if not callable(cmd):
162 log.warning(_("Skipping not callable [%s] attribute") % attr)
163 continue
164 cmd_name = attr[4:]
165 if not cmd_name:
166 log.warning(_("Skipping cmd_ method"))
167 if cmd_name in self._commands:
168 suff = 2
169 while (cmd_name + str(suff)) in self._commands:
170 suff += 1
171 new_name = cmd_name + str(suff)
172 log.warning(
173 _(
174 "Conflict for command [{old_name}], renaming it to [{new_name}]"
175 ).format(old_name=cmd_name, new_name=new_name)
176 )
177 cmd_name = new_name
178 self._commands[cmd_name] = cmd_data = {"callback": cmd}
179 cmd_data.update(self._parse_doc_string(cmd, cmd_name))
180 log.info(_("Registered text command [%s]") % cmd_name)
181
182 def add_who_is_cb(self, callback, priority=0):
183 """Add a callback which give information to the /whois command
184
185 @param callback: a callback which will be called with the following arguments
186 - whois_msg: list of information strings to display, callback need to append
187 its own strings to it
188 - target_jid: full jid from whom we want information
189 - profile: %(doc_profile)s
190 @param priority: priority of the information to show (the highest priority will
191 be displayed first)
192 """
193 self._whois.append((priority, callback))
194 self._whois.sort(key=lambda item: item[0], reverse=True)
195
196 def send_message_trigger(
197 self, client, mess_data, pre_xml_treatments, post_xml_treatments
198 ):
199 """Install SendMessage command hook """
200 pre_xml_treatments.addCallback(self._send_message_cmd_hook, client)
201 return True
202
203 def _send_message_cmd_hook(self, mess_data, client):
204 """ Check text commands in message, and react consequently
205
206 msg starting with / are potential command. If a command is found, it is executed,
207 else an help message is sent.
208 msg starting with // are escaped: they are sent with a single /
209 commands can abord message sending (if they return anything evaluating to False),
210 or continue it (if they return True), eventually after modifying the message
211 an "unparsed" key is added to message, containing part of the message not yet
212 parsed.
213 Commands can be deferred or not
214 @param mess_data(dict): data comming from sendMessage trigger
215 @param profile: %(doc_profile)s
216 """
217 try:
218 msg = mess_data["message"][""]
219 msg_lang = ""
220 except KeyError:
221 try:
222 # we have not default message, we try to take the first found
223 msg_lang, msg = next(iter(mess_data["message"].items()))
224 except StopIteration:
225 log.debug("No message found, skipping text commands")
226 return mess_data
227
228 try:
229 if msg[:2] == "//":
230 # we have a double '/', it's the escape sequence
231 mess_data["message"][msg_lang] = msg[1:]
232 return mess_data
233 if msg[0] != "/":
234 return mess_data
235 except IndexError:
236 return mess_data
237
238 # we have a command
239 d = None
240 command = msg[1:].partition(" ")[0].lower().strip()
241 if not command.isidentifier():
242 self.feed_back(
243 client,
244 _("Invalid command /%s. ") % command + self.HELP_SUGGESTION,
245 mess_data,
246 )
247 raise failure.Failure(exceptions.CancelError())
248
249 # looks like an actual command, we try to call the corresponding method
250 def ret_handling(ret):
251 """ Handle command return value:
252 if ret is True, normally send message (possibly modified by command)
253 else, abord message sending
254 """
255 if ret:
256 return mess_data
257 else:
258 log.debug("text command detected ({})".format(command))
259 raise failure.Failure(exceptions.CancelError())
260
261 def generic_errback(failure):
262 try:
263 msg = "with condition {}".format(failure.value.condition)
264 except AttributeError:
265 msg = "with error {}".format(failure.value)
266 self.feed_back(client, "Command failed {}".format(msg), mess_data)
267 return False
268
269 mess_data["unparsed"] = msg[
270 1 + len(command) :
271 ] # part not yet parsed of the message
272 try:
273 cmd_data = self._commands[command]
274 except KeyError:
275 self.feed_back(
276 client,
277 _("Unknown command /%s. ") % command + self.HELP_SUGGESTION,
278 mess_data,
279 )
280 log.debug("text command help message")
281 raise failure.Failure(exceptions.CancelError())
282 else:
283 if not self._context_valid(mess_data, cmd_data):
284 # The command is not launched in the right context, we throw a message with help instructions
285 context_txt = (
286 _("group discussions")
287 if cmd_data["type"] == "group"
288 else _("one to one discussions")
289 )
290 feedback = _("/{command} command only applies in {context}.").format(
291 command=command, context=context_txt
292 )
293 self.feed_back(
294 client, "{} {}".format(feedback, self.HELP_SUGGESTION), mess_data
295 )
296 log.debug("text command invalid message")
297 raise failure.Failure(exceptions.CancelError())
298 else:
299 d = utils.as_deferred(cmd_data["callback"], client, mess_data)
300 d.addErrback(generic_errback)
301 d.addCallback(ret_handling)
302
303 return d
304
305 def _context_valid(self, mess_data, cmd_data):
306 """Tell if a command can be used in the given context
307
308 @param mess_data(dict): message data as given in sendMessage trigger
309 @param cmd_data(dict): command data as returned by self._parse_doc_string
310 @return (bool): True if command can be used in this context
311 """
312 if (cmd_data["type"] == "group" and mess_data["type"] != "groupchat") or (
313 cmd_data["type"] == "one2one" and mess_data["type"] == "groupchat"
314 ):
315 return False
316 return True
317
318 def get_room_jid(self, arg, service_jid):
319 """Return a room jid with a shortcut
320
321 @param arg: argument: can be a full room jid (e.g.: sat@chat.jabberfr.org)
322 or a shortcut (e.g.: sat or sat@ for sat on current service)
323 @param service_jid: jid of the current service (e.g.: chat.jabberfr.org)
324 """
325 nb_arobas = arg.count("@")
326 if nb_arobas == 1:
327 if arg[-1] != "@":
328 return jid.JID(arg)
329 return jid.JID(arg + service_jid)
330 return jid.JID(f"{arg}@{service_jid}")
331
332 def feed_back(self, client, message, mess_data, info_type=FEEDBACK_INFO_TYPE):
333 """Give a message back to the user"""
334 if mess_data["type"] == "groupchat":
335 to_ = mess_data["to"].userhostJID()
336 else:
337 to_ = client.jid
338
339 # we need to invert send message back, so sender need to original destinee
340 mess_data["from"] = mess_data["to"]
341 mess_data["to"] = to_
342 mess_data["type"] = C.MESS_TYPE_INFO
343 mess_data["message"] = {"": message}
344 mess_data["extra"]["info_type"] = info_type
345 client.message_send_to_bridge(mess_data)
346
347 def cmd_whois(self, client, mess_data):
348 """show informations on entity
349
350 @command: [JID|ROOM_NICK]
351 - JID: entity to request
352 - ROOM_NICK: nick of the room to request
353 """
354 log.debug("Catched whois command")
355
356 entity = mess_data["unparsed"].strip()
357
358 if mess_data["type"] == "groupchat":
359 room = mess_data["to"].userhostJID()
360 try:
361 if self.host.plugins["XEP-0045"].is_nick_in_room(client, room, entity):
362 entity = "%s/%s" % (room, entity)
363 except KeyError:
364 log.warning("plugin XEP-0045 is not present")
365
366 if not entity:
367 target_jid = mess_data["to"]
368 else:
369 try:
370 target_jid = jid.JID(entity)
371 if not target_jid.user or not target_jid.host:
372 raise jid.InvalidFormat
373 except (RuntimeError, jid.InvalidFormat, AttributeError):
374 self.feed_back(client, _("Invalid jid, can't whois"), mess_data)
375 return False
376
377 if not target_jid.resource:
378 target_jid.resource = self.host.memory.main_resource_get(client, target_jid)
379
380 whois_msg = [_("whois for %(jid)s") % {"jid": target_jid}]
381
382 d = defer.succeed(None)
383 for __, callback in self._whois:
384 d.addCallback(
385 lambda __: callback(client, whois_msg, mess_data, target_jid)
386 )
387
388 def feed_back(__):
389 self.feed_back(client, "\n".join(whois_msg), mess_data)
390 return False
391
392 d.addCallback(feed_back)
393 return d
394
395 def _get_args_help(self, cmd_data):
396 """Return help string for args of cmd_name, according to docstring data
397
398 @param cmd_data: command data
399 @return (list[unicode]): help strings
400 """
401 strings = []
402 for doc_name, doc_help in cmd_data.items():
403 if doc_name.startswith("doc_arg_"):
404 arg_name = doc_name[8:]
405 strings.append(
406 "- {name}: {doc_help}".format(name=arg_name, doc_help=_(doc_help))
407 )
408
409 return strings
410
411 def cmd_me(self, client, mess_data):
412 """display a message at third person
413
414 @command (all): message
415 - message: message to show at third person
416 e.g.: "/me clenches his fist" will give "[YOUR_NICK] clenches his fist"
417 """
418 # We just ignore the command as the match is done on receiption by clients
419 return True
420
421 def cmd_whoami(self, client, mess_data):
422 """give your own jid"""
423 self.feed_back(client, client.jid.full(), mess_data)
424
425 def cmd_help(self, client, mess_data):
426 """show help on available commands
427
428 @command: [cmd_name]
429 - cmd_name: name of the command for detailed help
430 """
431 cmd_name = mess_data["unparsed"].strip()
432 if cmd_name and cmd_name[0] == "/":
433 cmd_name = cmd_name[1:]
434 if cmd_name and cmd_name not in self._commands:
435 self.feed_back(
436 client, _("Invalid command name [{}]\n".format(cmd_name)), mess_data
437 )
438 cmd_name = ""
439 if not cmd_name:
440 # we show the global help
441 longuest = max([len(command) for command in self._commands])
442 help_cmds = []
443
444 for command in sorted(self._commands):
445 cmd_data = self._commands[command]
446 if not self._context_valid(mess_data, cmd_data):
447 continue
448 spaces = (longuest - len(command)) * " "
449 help_cmds.append(
450 " /{command}: {spaces} {short_help}".format(
451 command=command,
452 spaces=spaces,
453 short_help=cmd_data["doc_short_help"],
454 )
455 )
456
457 help_mess = _("Text commands available:\n%s") % ("\n".join(help_cmds),)
458 else:
459 # we show detailled help for a command
460 cmd_data = self._commands[cmd_name]
461 syntax = cmd_data["args"]
462 help_mess = _("/{name}: {short_help}\n{syntax}{args_help}").format(
463 name=cmd_name,
464 short_help=cmd_data["doc_short_help"],
465 syntax=_(" " * 4 + "syntax: {}\n").format(syntax) if syntax else "",
466 args_help="\n".join(
467 [" " * 8 + "{}".format(line) for line in self._get_args_help(cmd_data)]
468 ),
469 )
470
471 self.feed_back(client, help_mess, mess_data)