Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_misc_text_commands.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_text_commands.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SàT plugin for managing text commands | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from twisted.words.protocols.jabber import jid | |
21 from twisted.internet import defer | |
22 from twisted.python import failure | |
23 from libervia.backend.core.i18n import _ | |
24 from libervia.backend.core.constants import Const as C | |
25 from libervia.backend.core import exceptions | |
26 from libervia.backend.core.log import getLogger | |
27 from libervia.backend.tools import utils | |
28 | |
29 | |
# Module-level logger, named after this module
log = getLogger(__name__)

# Plugin metadata; every key is a constant of the Const class (imported as C).
# NOTE(review): presumably consumed by the backend's plugin loader, which is not
# visible from this file — confirm before changing any key.
PLUGIN_INFO = {
    C.PI_NAME: "Text commands",
    C.PI_IMPORT_NAME: C.TEXT_CMDS,
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: ["XEP-0245"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "TextCommands",  # name of the class defined in this module
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""IRC like text commands"""),
}
42 | |
43 | |
class InvalidCommandSyntax(Exception):
    """Raised when the @command section of a command docstring is malformed"""
48 | |
49 | |
# Keyword opening the command description section of a command docstring
CMD_KEY = "@command"
# Contexts a command can be restricted to with "@command (type):"
CMD_TYPES = ("group", "one2one", "all")
# Default value stored in mess_data["extra"]["info_type"] by feed_back
FEEDBACK_INFO_TYPE = "TEXT_CMD"
53 | |
54 | |
55 class TextCommands(object): | |
56 # FIXME: doc strings for commands have to be translatable | |
57 # plugins need a dynamic translation system (translation | |
58 # should be downloadable independently) | |
59 | |
60 HELP_SUGGESTION = _( | |
61 "Type '/help' to get a list of the available commands. If you didn't want to " | |
62 "use a command, please start your message with '//' to escape the slash." | |
63 ) | |
64 | |
def __init__(self, host):
    """Hook the plugin on message sending and register its own commands

    @param host: backend instance, its "sendMessage" trigger is used to
        intercept outgoing messages
    """
    log.info(_("Text commands initialization"))
    self.host = host
    # text commands are internal, hence the very high trigger priority
    host.trigger.add(
        "sendMessage",
        self.send_message_trigger,
        priority=1000000,
    )
    # _commands: command name => command data
    # _whois: (priority, callback) tuples used by /whois
    self._commands, self._whois = {}, []
    # the cmd_* methods of this very class are the first registered commands
    self.register_text_commands(self)
73 | |
def _parse_doc_string(self, cmd, cmd_name):
    """Parse a docstring to get text command data

    Parsing errors are not propagated: a warning is logged and the data
    collected so far is returned.

    @param cmd: function or method callback for the command,
        its docstring will be used for self documentation in the following way:
        - first line is the command short documentation, shown with /help
        - @command keyword can be used,
          see http://wiki.goffi.org/wiki/Coding_style/en for documentation
    @param cmd_name(str): name of the command, only used in log messages
    @return (dict): dictionary with parsed data where key can be:
        - "doc_short_help" (default: ""): the untranslated short documentation
        - "type" (default "all"): the command type as specified in documentation
        - "args" (default: ""): the arguments available, using syntax specified in
            documentation.
        - "doc_arg_[name]": the doc of [name] argument
    """
    data = {
        "doc_short_help": "",
        "type": "all",
        "args": "",
    }
    docstring = cmd.__doc__
    if docstring is None:
        log.warning("No docstring found for command {}".format(cmd_name))
        docstring = ""

    doc_data = docstring.split("\n")
    data["doc_short_help"] = doc_data[0]

    try:
        # column where @command starts, None as long as it has not been found
        # (0 is a valid column, so it can't be used as the "not found" marker)
        cmd_indent = None

        for line in doc_data:
            stripped = line.strip()
            # an argument line is indented by 4 spaces relative to @command and
            # starts with a dash: the 5 chars checked here are "    -"
            if cmd_indent is not None and line[cmd_indent : cmd_indent + 5] == "    -":
                colon_idx = line.find(":")
                if colon_idx == -1:
                    raise InvalidCommandSyntax(
                        "No colon found in argument description"
                    )
                # the name starts after the 4 spaces, the dash and one separator
                arg_name = line[cmd_indent + 6 : colon_idx].strip()
                if not arg_name:
                    raise InvalidCommandSyntax(
                        "No name found in argument description"
                    )
                arg_help = line[colon_idx + 1 :].strip()
                data["doc_arg_{}".format(arg_name)] = arg_help
            elif cmd_indent is not None:
                # we are parsing command and indent level is not good, it's finished
                break
            elif stripped.startswith(CMD_KEY):
                cmd_indent = line.find(CMD_KEY)

                # type
                colon_idx = stripped.find(":")
                if colon_idx == -1:
                    raise InvalidCommandSyntax("missing colon")
                type_data = stripped[len(CMD_KEY) : colon_idx].strip()
                if not type_data:
                    type_data = "(all)"
                elif (
                    len(type_data) <= 2 or type_data[0] != "(" or type_data[-1] != ")"
                ):
                    raise InvalidCommandSyntax("Bad type data syntax")
                type_ = type_data[1:-1]
                if type_ not in CMD_TYPES:
                    raise InvalidCommandSyntax("Unknown type {}".format(type_))
                data["type"] = type_

                # args
                data["args"] = stripped[colon_idx + 1 :].strip()
    except InvalidCommandSyntax as e:
        # exceptions have no "message" attribute in Python 3, the exception
        # itself is formatted instead
        log.warning(
            "Invalid command syntax for command {command}: {message}".format(
                command=cmd_name, message=e
            )
        )

    return data
152 | |
def register_text_commands(self, instance):
    """Register every callable ``cmd_*`` attribute of an instance as a text command

    The command name is the attribute name without its "cmd_" prefix. If the name
    is already used, a numeric suffix (2, 3, ...) is appended until a free name
    is found. The docstring of each callback is parsed to build its help data.

    @param instance: instance of a class containing text commands
    """
    for attr in dir(instance):
        if not attr.startswith("cmd_"):
            continue
        cmd = getattr(instance, attr)
        if not callable(cmd):
            log.warning(_("Skipping not callable [%s] attribute") % attr)
            continue
        cmd_name = attr[4:]
        if not cmd_name:
            # a bare "cmd_" attribute has no usable name: really skip it instead
            # of registering it under an empty command name
            log.warning(_("Skipping cmd_ method"))
            continue
        if cmd_name in self._commands:
            suff = 2
            while (cmd_name + str(suff)) in self._commands:
                suff += 1
            new_name = cmd_name + str(suff)
            log.warning(
                _(
                    "Conflict for command [{old_name}], renaming it to [{new_name}]"
                ).format(old_name=cmd_name, new_name=new_name)
            )
            cmd_name = new_name
        self._commands[cmd_name] = cmd_data = {"callback": cmd}
        cmd_data.update(self._parse_doc_string(cmd, cmd_name))
        log.info(_("Registered text command [%s]") % cmd_name)
181 | |
def add_who_is_cb(self, callback, priority=0):
    """Register a callback contributing information to the /whois command

    @param callback: callable invoked by /whois with the following arguments
        - client: client of the profile running the command
        - whois_msg: list of information strings to display, the callback
            appends its own strings to it
        - mess_data: data of the message holding the command
        - target_jid: jid of the entity we want information about
    @param priority: priority of the information to show (callbacks with the
        highest priority come first)
    """
    entry = (priority, callback)
    self._whois.append(entry)
    # sorting is stable: callbacks sharing a priority keep registration order
    self._whois.sort(key=lambda registered: registered[0], reverse=True)
195 | |
def send_message_trigger(
    self, client, mess_data, pre_xml_treatments, post_xml_treatments
):
    """Plug the text command hook on the pre XML treatments of a message

    @return (bool): always True
    """
    hook = self._send_message_cmd_hook
    # client is passed as extra argument, the hook gets the message data first
    pre_xml_treatments.addCallback(hook, client)
    return True
202 | |
203 def _send_message_cmd_hook(self, mess_data, client): | |
204 """ Check text commands in message, and react consequently | |
205 | |
206 msg starting with / are potential command. If a command is found, it is executed, | |
207 else an help message is sent. | |
208 msg starting with // are escaped: they are sent with a single / | |
209 commands can abord message sending (if they return anything evaluating to False), | |
210 or continue it (if they return True), eventually after modifying the message | |
211 an "unparsed" key is added to message, containing part of the message not yet | |
212 parsed. | |
213 Commands can be deferred or not | |
214 @param mess_data(dict): data comming from sendMessage trigger | |
215 @param profile: %(doc_profile)s | |
216 """ | |
217 try: | |
218 msg = mess_data["message"][""] | |
219 msg_lang = "" | |
220 except KeyError: | |
221 try: | |
222 # we have not default message, we try to take the first found | |
223 msg_lang, msg = next(iter(mess_data["message"].items())) | |
224 except StopIteration: | |
225 log.debug("No message found, skipping text commands") | |
226 return mess_data | |
227 | |
228 try: | |
229 if msg[:2] == "//": | |
230 # we have a double '/', it's the escape sequence | |
231 mess_data["message"][msg_lang] = msg[1:] | |
232 return mess_data | |
233 if msg[0] != "/": | |
234 return mess_data | |
235 except IndexError: | |
236 return mess_data | |
237 | |
238 # we have a command | |
239 d = None | |
240 command = msg[1:].partition(" ")[0].lower().strip() | |
241 if not command.isidentifier(): | |
242 self.feed_back( | |
243 client, | |
244 _("Invalid command /%s. ") % command + self.HELP_SUGGESTION, | |
245 mess_data, | |
246 ) | |
247 raise failure.Failure(exceptions.CancelError()) | |
248 | |
249 # looks like an actual command, we try to call the corresponding method | |
250 def ret_handling(ret): | |
251 """ Handle command return value: | |
252 if ret is True, normally send message (possibly modified by command) | |
253 else, abord message sending | |
254 """ | |
255 if ret: | |
256 return mess_data | |
257 else: | |
258 log.debug("text command detected ({})".format(command)) | |
259 raise failure.Failure(exceptions.CancelError()) | |
260 | |
261 def generic_errback(failure): | |
262 try: | |
263 msg = "with condition {}".format(failure.value.condition) | |
264 except AttributeError: | |
265 msg = "with error {}".format(failure.value) | |
266 self.feed_back(client, "Command failed {}".format(msg), mess_data) | |
267 return False | |
268 | |
269 mess_data["unparsed"] = msg[ | |
270 1 + len(command) : | |
271 ] # part not yet parsed of the message | |
272 try: | |
273 cmd_data = self._commands[command] | |
274 except KeyError: | |
275 self.feed_back( | |
276 client, | |
277 _("Unknown command /%s. ") % command + self.HELP_SUGGESTION, | |
278 mess_data, | |
279 ) | |
280 log.debug("text command help message") | |
281 raise failure.Failure(exceptions.CancelError()) | |
282 else: | |
283 if not self._context_valid(mess_data, cmd_data): | |
284 # The command is not launched in the right context, we throw a message with help instructions | |
285 context_txt = ( | |
286 _("group discussions") | |
287 if cmd_data["type"] == "group" | |
288 else _("one to one discussions") | |
289 ) | |
290 feedback = _("/{command} command only applies in {context}.").format( | |
291 command=command, context=context_txt | |
292 ) | |
293 self.feed_back( | |
294 client, "{} {}".format(feedback, self.HELP_SUGGESTION), mess_data | |
295 ) | |
296 log.debug("text command invalid message") | |
297 raise failure.Failure(exceptions.CancelError()) | |
298 else: | |
299 d = utils.as_deferred(cmd_data["callback"], client, mess_data) | |
300 d.addErrback(generic_errback) | |
301 d.addCallback(ret_handling) | |
302 | |
303 return d | |
304 | |
305 def _context_valid(self, mess_data, cmd_data): | |
306 """Tell if a command can be used in the given context | |
307 | |
308 @param mess_data(dict): message data as given in sendMessage trigger | |
309 @param cmd_data(dict): command data as returned by self._parse_doc_string | |
310 @return (bool): True if command can be used in this context | |
311 """ | |
312 if (cmd_data["type"] == "group" and mess_data["type"] != "groupchat") or ( | |
313 cmd_data["type"] == "one2one" and mess_data["type"] == "groupchat" | |
314 ): | |
315 return False | |
316 return True | |
317 | |
def get_room_jid(self, arg, service_jid):
    """Return a room jid with a shortcut

    @param arg: argument: can be a full room jid (e.g.: sat@chat.jabberfr.org)
        or a shortcut (e.g.: sat or sat@ for sat on current service)
    @param service_jid: jid of the current service (e.g.: chat.jabberfr.org),
        anything which formats to the service address is accepted
    @return (jid.JID): jid of the room
    """
    nb_arobas = arg.count("@")
    if nb_arobas == 1:
        if arg[-1] != "@":
            # full room jid
            return jid.JID(arg)
        # "room@" shortcut: formatting is used instead of "+" so that
        # service_jid doesn't have to be a plain string
        return jid.JID(f"{arg}{service_jid}")
    # "room" shortcut
    return jid.JID(f"{arg}@{service_jid}")
331 | |
def feed_back(self, client, message, mess_data, info_type=FEEDBACK_INFO_TYPE):
    """Send an information message back to the user

    mess_data is modified in place and reused as the feedback message.

    @param client: client of the profile to give feedback to
    @param message(str): text of the feedback
    @param mess_data(dict): data of the message which triggered the feedback
    @param info_type(str): value set in mess_data["extra"]["info_type"]
    """
    recipient = (
        mess_data["to"].userhostJID()
        if mess_data["type"] == "groupchat"
        else client.jid
    )

    # the feedback goes the other way round: the original recipient becomes
    # the sender
    mess_data["from"], mess_data["to"] = mess_data["to"], recipient
    mess_data["type"] = C.MESS_TYPE_INFO
    mess_data["message"] = {"": message}
    mess_data["extra"]["info_type"] = info_type
    client.message_send_to_bridge(mess_data)
346 | |
def cmd_whois(self, client, mess_data):
    """show informations on entity

    @command: [JID|ROOM_NICK]
        - JID: entity to request
        - ROOM_NICK: nick of the room to request
    """
    # NOTE: the docstring above is parsed to build the /help output, this is why
    # developer documentation is kept in comments.
    # Every callback registered with add_who_is_cb is called in turn with
    # (client, whois_msg, mess_data, target_jid), then the collected lines are
    # sent back to the user. The returned Deferred fires with False so that the
    # command itself is never sent as a message.
    log.debug("Catched whois command")

    entity = mess_data["unparsed"].strip()

    if mess_data["type"] == "groupchat":
        room = mess_data["to"].userhostJID()
        try:
            # in a room, the argument may be the nick of an occupant
            if self.host.plugins["XEP-0045"].is_nick_in_room(client, room, entity):
                entity = "%s/%s" % (room, entity)
        except KeyError:
            log.warning("plugin XEP-0045 is not present")

    if not entity:
        # no argument: whois on the recipient of the message
        target_jid = mess_data["to"]
    else:
        try:
            target_jid = jid.JID(entity)
            if not target_jid.user or not target_jid.host:
                raise jid.InvalidFormat
        except (RuntimeError, jid.InvalidFormat, AttributeError):
            self.feed_back(client, _("Invalid jid, can't whois"), mess_data)
            return False

    if not target_jid.resource:
        target_jid.resource = self.host.memory.main_resource_get(client, target_jid)

    whois_msg = [_("whois for %(jid)s") % {"jid": target_jid}]

    d = defer.succeed(None)
    for __, callback in self._whois:
        # callback is bound as a default argument: as soon as one callback
        # returns a Deferred which has not fired yet, the next lambdas run after
        # the end of this loop and would otherwise all call the last callback
        d.addCallback(
            lambda __, callback=callback: callback(
                client, whois_msg, mess_data, target_jid
            )
        )

    def feed_back(__):
        self.feed_back(client, "\n".join(whois_msg), mess_data)
        return False

    d.addCallback(feed_back)
    return d
394 | |
def _get_args_help(self, cmd_data):
    """Build the help lines of the arguments documented for a command

    @param cmd_data: command data
    @return (list[str]): one "- name: help" string per "doc_arg_[name]" key,
        the help text going through translation
    """
    prefix = "doc_arg_"
    return [
        "- {name}: {doc_help}".format(name=key[len(prefix) :], doc_help=_(text))
        for key, text in cmd_data.items()
        if key.startswith(prefix)
    ]
410 | |
def cmd_me(self, client, mess_data):
    """display a message at third person

    @command (all): message
        - message: message to show at third person
            e.g.: "/me clenches his fist" will give "[YOUR_NICK] clenches his fist"
    """
    # NOTE: the docstring above is parsed to build the /help output, so developer
    # notes stay in comments.
    # Nothing to do here: the message is left untouched, as the "/me" match is
    # done on reception by clients. A true return value lets the sending continue.
    return True
420 | |
def cmd_whoami(self, client, mess_data):
    """give your own jid"""
    # nothing is returned: the command itself is not sent as a message
    own_jid = client.jid.full()
    self.feed_back(client, own_jid, mess_data)
424 | |
def cmd_help(self, client, mess_data):
    """show help on available commands

    @command: [cmd_name]
        - cmd_name: name of the command for detailed help
    """
    # NOTE: the docstring above is parsed to build the /help output, this is why
    # developer documentation is kept in comments.
    # Without argument (or with an unknown command name) the list of the commands
    # usable in the current context is sent, else the detailed help of the
    # requested command. Nothing is returned, so the command is never sent as a
    # message.
    cmd_name = mess_data["unparsed"].strip()
    if cmd_name and cmd_name[0] == "/":
        cmd_name = cmd_name[1:]
    if cmd_name and cmd_name not in self._commands:
        # the name is inserted after the translation, otherwise the formatted
        # string can never match an entry of the translation catalogue
        # NOTE(review): feed_back rewrites mess_data in place (including its
        # "type"), so the context filtering below then sees the rewritten
        # message — confirm this is intended.
        self.feed_back(
            client, _("Invalid command name [{}]\n").format(cmd_name), mess_data
        )
        cmd_name = ""
    if not cmd_name:
        # we show the global help
        # default=0 keeps this working even if no command is registered
        longuest = max((len(command) for command in self._commands), default=0)
        help_cmds = []

        for command in sorted(self._commands):
            cmd_data = self._commands[command]
            if not self._context_valid(mess_data, cmd_data):
                continue
            # padding used to align the short descriptions
            spaces = (longuest - len(command)) * " "
            help_cmds.append(
                " /{command}: {spaces} {short_help}".format(
                    command=command,
                    spaces=spaces,
                    short_help=cmd_data["doc_short_help"],
                )
            )

        help_mess = _("Text commands available:\n%s") % ("\n".join(help_cmds),)
    else:
        # we show detailed help for a command
        cmd_data = self._commands[cmd_name]
        syntax = cmd_data["args"]
        help_mess = _("/{name}: {short_help}\n{syntax}{args_help}").format(
            name=cmd_name,
            short_help=cmd_data["doc_short_help"],
            syntax=_(" " * 4 + "syntax: {}\n").format(syntax) if syntax else "",
            args_help="\n".join(
                " " * 8 + line for line in self._get_args_help(cmd_data)
            ),
        )

    self.feed_back(client, help_mess, mess_data)