Mercurial > libervia-backend
view libervia/tui/base.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if server doesn't support
multicast, and send to local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatilibity and use of bandwith, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` corountines. `host` implementation still use
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia TUI # Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from libervia.backend.core.i18n import _, D_ from libervia.tui.constants import Const as C from libervia.backend.core import log_config log_config.libervia_configure(C.LOG_BACKEND_STANDARD, C) from libervia.backend.core import log as logging log = logging.getLogger(__name__) from libervia.backend.tools import config as sat_config import urwid from urwid.util import is_wide_char from urwid_satext import sat_widgets from libervia.frontends.quick_frontend.quick_app import QuickApp from libervia.frontends.quick_frontend import quick_utils from libervia.frontends.quick_frontend import quick_chat from libervia.tui.profile_manager import ProfileManager from libervia.tui.contact_list import ContactList from libervia.tui.chat import Chat from libervia.tui import xmlui from libervia.tui.progress import Progress from libervia.tui.notify import Notify from libervia.tui.keys import action_key_map as a_key from libervia.tui import config from libervia.frontends.tools.misc import InputHistory from libervia.backend.tools.common import dynamic_import from libervia.frontends.tools import jid import signal import sys ## bridge handling # we get bridge name from conf and initialise the right class accordingly main_config = sat_config.parse_main_conf() bridge_name = sat_config.config_get(main_config, "", "bridge", "dbus") if "dbus" not in bridge_name: print("only D-Bus bridge is currently supported") sys.exit(3) class EditBar(sat_widgets.ModalEdit): """ The modal edit bar where you would enter messages and commands. """ def __init__(self, host): modes = { None: (C.MODE_NORMAL, ""), a_key["MODE_INSERTION"]: (C.MODE_INSERTION, "> "), a_key["MODE_COMMAND"]: (C.MODE_COMMAND, ":"), } # XXX: captions *MUST* be unicode super(EditBar, self).__init__(modes) self.host = host self.set_completion_method(self._text_completion) urwid.connect_signal(self, "click", self.on_text_entered) def _text_completion(self, text, completion_data, mode): if mode == C.MODE_INSERTION: if self.host.selected_widget is not None: try: completion = self.host.selected_widget.completion except AttributeError: return text else: return completion(text, completion_data) else: return text def on_text_entered(self, editBar): """Called when text is entered in the main edit bar""" if self.mode == C.MODE_INSERTION: if isinstance(self.host.selected_widget, quick_chat.QuickChat): chat_widget = self.host.selected_widget self.host.message_send( chat_widget.target, {"": editBar.get_edit_text()}, # TODO: handle language mess_type=( C.MESS_TYPE_GROUPCHAT if chat_widget.type == C.CHAT_GROUP else C.MESS_TYPE_CHAT ), # TODO: put this in QuickChat errback=lambda failure: self.host.show_dialog( _("Error while sending message ({})").format(failure), type="error", ), profile_key=chat_widget.profile, ) editBar.set_edit_text("") elif self.mode == C.MODE_COMMAND: self.command_handler() def command_handler(self): # TODO: separate class with auto documentation (with introspection) # and completion method tokens = self.get_edit_text().split(" ") command, args = tokens[0], tokens[1:] if command == "quit": self.host.on_exit() raise urwid.ExitMainLoop() elif command == "messages": wid = sat_widgets.GenericList(logging.memory_get()) self.host.select_widget(wid) # FIXME: reactivate the command # elif command == 'presence': # values = [value for value in commonConst.PRESENCE.keys()] # values = [value if value else 'online' for value in values] # the empty value actually means 'online' # if args and args[0] in values: # presence = '' if args[0] == 'online' else args[0] # self.host.status_bar.on_change(user_data=sat_widgets.ClickableText(commonConst.PRESENCE[presence])) # else: # self.host.status_bar.on_presence_click() # elif command == 'status': # if args: # self.host.status_bar.on_change(user_data=sat_widgets.AdvancedEdit(args[0])) # else: # self.host.status_bar.on_status_click() elif command == "history": widget = self.host.selected_widget if isinstance(widget, quick_chat.QuickChat): try: limit = int(args[0]) except (IndexError, ValueError): limit = 50 widget.update_history(size=limit, profile=widget.profile) elif command == "search": widget = self.host.selected_widget if isinstance(widget, quick_chat.QuickChat): pattern = " ".join(args) if not pattern: self.host.notif_bar.add_message( D_("Please specify the globbing pattern to search for") ) else: widget.update_history( size=C.HISTORY_LIMIT_NONE, filters={"search": pattern}, profile=widget.profile, ) elif command == "filter": # FIXME: filter is now only for current widget, # need to be able to set it globally or per widget widget = self.host.selected_widget # FIXME: Q&D way, need to be more generic if isinstance(widget, quick_chat.QuickChat): widget.set_filter(args) elif command in ("topic", "suject", "title"): try: new_title = args[0].strip() except IndexError: new_title = None widget = self.host.selected_widget if isinstance(widget, quick_chat.QuickChat) and widget.type == C.CHAT_GROUP: widget.on_subject_dialog(new_title) else: return self.set_edit_text("") def _history_cb(self, text): self.set_edit_text(text) self.set_edit_pos(len(text)) def keypress(self, size, key): """Callback when a key is pressed. Send "composing" states and move the index of the temporary history stack.""" if key == a_key["MODAL_ESCAPE"]: # first save the text to the current mode, then change to NORMAL self.host._update_input_history(self.get_edit_text(), mode=self.mode) self.host._update_input_history(mode=C.MODE_NORMAL) if self._mode == C.MODE_NORMAL and key in self._modes: self.host._update_input_history(mode=self._modes[key][0]) if key == a_key["HISTORY_PREV"]: self.host._update_input_history( self.get_edit_text(), -1, self._history_cb, self.mode ) return elif key == a_key["HISTORY_NEXT"]: self.host._update_input_history( self.get_edit_text(), +1, self._history_cb, self.mode ) return elif key == a_key["EDIT_ENTER"]: self.host._update_input_history(self.get_edit_text(), mode=self.mode) else: if ( self._mode == C.MODE_INSERTION and isinstance(self.host.selected_widget, quick_chat.QuickChat) and key not in sat_widgets.FOCUS_KEYS and key not in (a_key["HISTORY_PREV"], a_key["HISTORY_NEXT"]) and self.host.sync ): self.host.bridge.chat_state_composing( self.host.selected_widget.target, self.host.selected_widget.profile ) return super(EditBar, self).keypress(size, key) class LiberviaTUITopWidget(sat_widgets.FocusPile): """Top most widget used in LiberviaTUI""" _focus_inversed = True positions = ("menu", "body", "notif_bar", "edit_bar") can_hide = ("menu", "notif_bar") def __init__(self, body, menu, notif_bar, edit_bar): self._body = body self._menu = menu self._notif_bar = notif_bar self._edit_bar = edit_bar self._hidden = {"notif_bar"} self._focus_extra = False super(LiberviaTUITopWidget, self).__init__( [("pack", self._menu), self._body, ("pack", self._edit_bar)] ) for position in self.positions: setattr( self, position, property( lambda: self, self.widget_get(position=position), lambda pos, new_wid: self.widget_set(new_wid, position=pos), ), ) self.focus_position = len(self.contents) - 1 def get_visible_positions(self, keep=None): """Return positions that are not hidden in the right order @param keep: if not None, this position will be keep in the right order, even if it's hidden (can be useful to find its index) @return (list): list of visible positions """ return [ pos for pos in self.positions if (keep and pos == keep) or pos not in self._hidden ] def keypress(self, size, key): """Manage FOCUS keys that focus directly a main part (one of self.positions) To avoid key conflicts, a combinaison must be made with FOCUS_EXTRA then an other key """ if key == a_key["FOCUS_EXTRA"]: self._focus_extra = True return if self._focus_extra: self._focus_extra = False if key in ("m", "1"): focus = "menu" elif key in ("b", "2"): focus = "body" elif key in ("n", "3"): focus = "notif_bar" elif key in ("e", "4"): focus = "edit_bar" else: return super(LiberviaTUITopWidget, self).keypress(size, key) if focus in self._hidden: return self.focus_position = self.get_visible_positions().index(focus) return return super(LiberviaTUITopWidget, self).keypress(size, key) def widget_get(self, position): if not position in self.positions: raise ValueError("Unknown position {}".format(position)) return getattr(self, "_{}".format(position)) def widget_set(self, widget, position): if not position in self.positions: raise ValueError("Unknown position {}".format(position)) return setattr(self, "_{}".format(position), widget) def hide_switch(self, position): if not position in self.can_hide: raise ValueError("Can't switch position {}".format(position)) hide = not position in self._hidden widget = self.widget_get(position) idx = self.get_visible_positions(position).index(position) if hide: del self.contents[idx] self._hidden.add(position) else: self.contents.insert(idx, (widget, ("pack", None))) self._hidden.remove(position) def show(self, position): if position in self._hidden: self.hide_switch(position) def hide(self, position): if not position in self._hidden: self.hide_switch(position) class LiberviaTUIApp(QuickApp, InputHistory): MB_HANDLER = False AVATARS_HANDLER = False def __init__(self): bridge_module = dynamic_import.bridge(bridge_name, "libervia.frontends.bridge") if bridge_module is None: log.error("Can't import {} bridge".format(bridge_name)) sys.exit(3) else: log.debug("Loading {} bridge".format(bridge_name)) QuickApp.__init__( self, bridge_factory=bridge_module.bridge, xmlui=xmlui, check_options=quick_utils.check_options, connect_bridge=False, ) ## main loop setup ## event_loop = ( urwid.GLibEventLoop if "dbus" in bridge_name else urwid.TwistedEventLoop ) self.loop = urwid.MainLoop( urwid.SolidFill(), C.PALETTE, event_loop=event_loop(), input_filter=self.input_filter, unhandled_input=self.key_handler, ) @classmethod def run(cls): cls().start() def on_bridge_connected(self): ##misc setup## self._visible_widgets = set() self.notif_bar = sat_widgets.NotificationBar() urwid.connect_signal(self.notif_bar, "change", self.on_notification) self.progress_wid = self.widgets.get_or_create_widget( Progress, None, on_new_widget=None ) urwid.connect_signal( self.notif_bar.progress, "click", lambda x: self.select_widget(self.progress_wid), ) self.__saved_overlay = None self.x_notify = Notify() # we already manage exit with a_key['APP_QUIT'], so we don't want C-c signal.signal(signal.SIGINT, signal.SIG_IGN) sat_conf = sat_config.parse_main_conf() self._bracketed_paste = C.bool( sat_config.config_get(sat_conf, C.CONFIG_SECTION, "bracketed_paste", "false") ) if self._bracketed_paste: log.debug("setting bracketed paste mode as requested") sys.stdout.write("\033[?2004h") self._bracketed_mode_set = True self.loop.widget = self.main_widget = ProfileManager(self) self.post_init() @property def visible_widgets(self): return self._visible_widgets @property def mode(self): return self.editBar.mode @mode.setter def mode(self, value): self.editBar.mode = value def mode_hint(self, value): """Change mode if make sens (i.e.: if there is nothing in the editBar)""" if not self.editBar.get_edit_text(): self.mode = value def debug(self): """convenient method to reset screen and launch (i)p(u)db""" log.info("Entered debug mode") try: import pudb pudb.set_trace() except ImportError: import os os.system("reset") try: import ipdb ipdb.set_trace() except ImportError: import pdb pdb.set_trace() def redraw(self): """redraw the screen""" try: self.loop.draw_screen() except AttributeError: pass def start(self): self.connect_bridge() self.loop.run() def post_init(self): try: config.apply_config(self) except Exception as e: log.error("configuration error: {}".format(e)) popup = self.alert( _("Configuration Error"), _( "Something went wrong while reading the configuration, please check :messages" ), ) if self.options.profile: self._early_popup = popup else: self.show_pop_up(popup) super(LiberviaTUIApp, self).post_init(self.main_widget) def keys_to_text(self, keys): """Generator return normal text from urwid keys""" for k in keys: if k == "tab": yield "\t" elif k == "enter": yield "\n" elif is_wide_char(k, 0) or (len(k) == 1 and ord(k) >= 32): yield k def input_filter(self, input_, raw): if self.__saved_overlay and input_ != a_key["OVERLAY_HIDE"]: return ## paste detection/handling if ( len(input_) > 1 # XXX: it may be needed to increase this value if buffer and not isinstance( input_[0], tuple ) # or other things result in several chars at once and not "window resize" in input_ ): # (e.g. using LiberviaTUI through ssh). Need some testing # and experience to adjust value. if input_[0] == "begin paste" and not self._bracketed_paste: log.info("Bracketed paste mode detected") self._bracketed_paste = True if self._bracketed_paste: # after this block, extra will contain non pasted keys # and input_ will contain pasted keys try: begin_idx = input_.index("begin paste") except ValueError: # this is not a paste, maybe we have something buffering # or bracketed mode is set in conf but not enabled in term extra = input_ input_ = [] else: try: end_idx = input_.index("end paste") except ValueError: log.warning("missing end paste sequence, discarding paste") extra = input_[:begin_idx] del input_[begin_idx:] else: extra = input_[:begin_idx] + input_[end_idx + 1 :] input_ = input_[begin_idx + 1 : end_idx] else: extra = None log.debug("Paste detected (len {})".format(len(input_))) try: edit_bar = self.editBar except AttributeError: log.warning("Paste treated as normal text: there is no edit bar yet") if extra is None: extra = [] extra.extend(input_) else: if self.main_widget.focus == edit_bar: # XXX: if a paste is detected, we append it directly to the edit bar text # so the user can check it and press [enter] if it's OK buf_paste = "".join(self.keys_to_text(input_)) pos = edit_bar.edit_pos edit_bar.set_edit_text( "{}{}{}".format( edit_bar.edit_text[:pos], buf_paste, edit_bar.edit_text[pos:] ) ) edit_bar.edit_pos += len(buf_paste) else: # we are not on the edit_bar, # so we treat pasted text as normal text if extra is None: extra = [] extra.extend(input_) if not extra: return input_ = extra ## end of paste detection/handling for i in input_: if isinstance(i, tuple): if i[0] == "mouse press": if i[1] == 4: # Mouse wheel up input_[input_.index(i)] = a_key["HISTORY_PREV"] if i[1] == 5: # Mouse wheel down input_[input_.index(i)] = a_key["HISTORY_NEXT"] return input_ def key_handler(self, input_): if input_ == a_key["MENU_HIDE"]: """User want to (un)hide the menu roller""" try: self.main_widget.hide_switch("menu") except AttributeError: pass elif input_ == a_key["NOTIFICATION_NEXT"]: """User wants to see next notification""" self.notif_bar.show_next() elif input_ == a_key["OVERLAY_HIDE"]: """User wants to (un)hide overlay window""" if isinstance(self.loop.widget, urwid.Overlay): self.__saved_overlay = self.loop.widget self.loop.widget = self.main_widget else: if self.__saved_overlay: self.loop.widget = self.__saved_overlay self.__saved_overlay = None elif ( input_ == a_key["DEBUG"] and ".dev0" in self.bridge.version_get() ): # Debug only for dev versions self.debug() elif input_ == a_key["CONTACTS_HIDE"]: # user wants to (un)hide the contact lists try: for wid, options in self.center_part.contents: if self.contact_lists_pile is wid: self.center_part.contents.remove((wid, options)) break else: self.center_part.contents.insert( 0, (self.contact_lists_pile, ("weight", 2, False)) ) except AttributeError: # The main widget is not built (probably in Profile Manager) pass elif input_ == "window resize": width, height = self.loop.screen_size if height <= 5 and width <= 35: if not "save_main_widget" in dir(self): self.save_main_widget = self.loop.widget self.loop.widget = urwid.Filler( urwid.Text(_("Pleeeeasse, I can't even breathe !")) ) else: if "save_main_widget" in dir(self): self.loop.widget = self.save_main_widget del self.save_main_widget try: return self.menu_roller.check_shortcuts(input_) except AttributeError: return input_ def add_menus(self, menu, type_filter, menu_data=None): """Add cached menus to instance @param menu: sat_widgets.Menu instance @param type_filter: menu type like is sat.core.sat_main.import_menu @param menu_data: data to send with these menus """ def add_menu_cb(callback_id): self.action_launch(callback_id, menu_data, profile=self.current_profile) for id_, type_, path, path_i18n, extra in self.bridge.menus_get( "", C.NO_SECURITY_LIMIT ): # TODO: manage extra if type_ != type_filter: continue if len(path) != 2: raise NotImplementedError("Menu with a path != 2 are not implemented yet") menu.add_menu( path_i18n[0], path_i18n[1], lambda dummy, id_=id_: add_menu_cb(id_) ) def _build_menu_roller(self): menu = sat_widgets.Menu(self.loop) general = _("General") menu.add_menu(general, _("Connect"), self.on_connect_request) menu.add_menu(general, _("Disconnect"), self.on_disconnect_request) menu.add_menu(general, _("Parameters"), self.on_param) menu.add_menu(general, _("About"), self.on_about_request) menu.add_menu(general, _("Exit"), self.on_exit_request, a_key["APP_QUIT"]) menu.add_menu(_("Contacts")) # add empty menu to save the place in the menu order groups = _("Groups") menu.add_menu(groups) menu.add_menu( groups, _("Join room"), self.on_join_room_request, a_key["ROOM_JOIN"] ) # additionals menus # FIXME: do this in a more generic way (in quickapp) self.add_menus(menu, C.MENU_GLOBAL) menu_roller = sat_widgets.MenuRoller([(_("Main menu"), menu, C.MENU_ID_MAIN)]) return menu_roller def _build_main_widget(self): self.contact_lists_pile = urwid.Pile([]) # self.center_part = urwid.Columns([('weight',2,self.contact_lists[profile]),('weight',8,Chat('',self))]) self.center_part = urwid.Columns( [ ("weight", 2, self.contact_lists_pile), ("weight", 8, urwid.Filler(urwid.Text(""))), ] ) self.editBar = EditBar(self) self.menu_roller = self._build_menu_roller() self.main_widget = LiberviaTUITopWidget( self.center_part, self.menu_roller, self.notif_bar, self.editBar ) return self.main_widget def plugging_profiles(self): self.loop.widget = self._build_main_widget() self.redraw() try: # if a popup arrived before main widget is build, we need to show it now self.show_pop_up(self._early_popup) except AttributeError: pass else: del self._early_popup def profile_plugged(self, profile): QuickApp.profile_plugged(self, profile) contact_list = self.widgets.get_or_create_widget( ContactList, None, on_new_widget=None, on_click=self.contact_selected, on_change=lambda w: self.redraw(), profile=profile, ) self.contact_lists_pile.contents.append((contact_list, ("weight", 1))) return contact_list def is_hidden(self): """Tells if the frontend window is hidden. @return bool """ return False # FIXME: implement when necessary def alert(self, title, message): """Shortcut method to create an alert message Alert will have an "OK" button, which remove it if pressed @param title(unicode): title of the dialog @param message(unicode): body of the dialog @return (urwid_satext.Alert): the created Alert instance """ popup = sat_widgets.Alert(title, message) popup.set_callback("ok", lambda dummy: self.remove_pop_up(popup)) self.show_pop_up(popup, width=75, height=20) return popup def remove_pop_up(self, widget=None): """Remove current pop-up, and if there is other in queue, show it @param widget(None, urwid.Widget): if not None remove this popup from front or queue """ # TODO: refactor popup management in a cleaner way # buttons' callback use themselve as first argument, and we never use # a Button directly in a popup, so we consider urwid.Button as None if widget is not None and not isinstance(widget, urwid.Button): if isinstance(self.loop.widget, urwid.Overlay): current_popup = self.loop.widget.top_w if not current_popup == widget: try: self.notif_bar.remove_pop_up(widget) except ValueError: log.warning( "Trying to remove an unknown widget {}".format(widget) ) return self.loop.widget = self.main_widget next_popup = self.notif_bar.get_next_popup() if next_popup: # we still have popup to show, we display it self.show_pop_up(next_popup) else: self.redraw() def show_pop_up( self, pop_up_widget, width=None, height=None, align="center", valign="middle" ): """Show a pop-up window if possible, else put it in queue @param pop_up_widget: pop up to show @param width(int, None): width of the popup None to use default @param height(int, None): height of the popup None to use default @param align: same as for [urwid.Overlay] """ if width == None: width = 75 if isinstance(pop_up_widget, xmlui.LiberviaTUINoteDialog) else 135 if height == None: height = 20 if isinstance(pop_up_widget, xmlui.LiberviaTUINoteDialog) else 40 if not isinstance(self.loop.widget, urwid.Overlay): display_widget = urwid.Overlay( pop_up_widget, self.main_widget, align, width, valign, height ) self.loop.widget = display_widget self.redraw() else: self.notif_bar.add_pop_up(pop_up_widget) def bar_notify(self, message): """ "Notify message to user via notification bar""" self.notif_bar.add_message(message) self.redraw() def notify( self, type_, entity=None, message=None, subject=None, callback=None, cb_args=None, widget=None, profile=C.PROF_KEY_NONE, ): if widget is None or widget is not None and widget != self.selected_widget: # we ignore notification if the widget is selected but we can # still do a desktop notification is the X window has not the focus super(LiberviaTUIApp, self).notify( type_, entity, message, subject, callback, cb_args, widget, profile ) # we don't want notifications without message on desktop if message is not None and not self.x_notify.has_focus(): if message is None: message = _("{app}: a new event has just happened{entity}").format( app=C.APP_NAME, entity=" ({})".format(entity) if entity else "" ) self.x_notify.send_notification(message) def new_widget(self, widget, user_action=False): """Method called when a new widget is created if suitable, the widget will be displayed @param widget(widget.LiberviaTUIWidget): created widget @param user_action(bool): if True, the widget has been created following an explicit user action. In this case, the widget may get focus immediately """ # FIXME: when several widgets are possible (e.g. with :split) # do not replace current widget when self.selected_widget != None if user_action or self.selected_widget is None: self.select_widget(widget) def select_widget(self, widget): """Display a widget if possible, else add it in the notification bar queue @param widget: BoxWidget """ assert len(self.center_part.widget_list) <= 2 wid_idx = len(self.center_part.widget_list) - 1 self.center_part.widget_list[wid_idx] = widget try: self.menu_roller.remove_menu(C.MENU_ID_WIDGET) except KeyError: log.debug("No menu to delete") self.selected_widget = widget try: on_selected = self.selected_widget.on_selected except AttributeError: pass else: on_selected() self._visible_widgets = set( [widget] ) # XXX: we can only have one widget visible at the time for now self.contact_lists.select(None) for ( wid ) in ( self.visible_widgets ): # FIXME: check if widgets.get_widgets is not more appropriate if isinstance(wid, Chat): contact_list = self.contact_lists[wid.profile] contact_list.select(wid.target) self.redraw() def remove_window(self): """Remove window showed on the right column""" # TODO: better Window management than this hack assert len(self.center_part.widget_list) <= 2 wid_idx = len(self.center_part.widget_list) - 1 self.center_part.widget_list[wid_idx] = urwid.Filler(urwid.Text("")) self.center_part.focus_position = 0 self.redraw() def add_progress(self, pid, message, profile): """Follow a SàT progression @param pid: progression id @param message: message to show to identify the progression """ self.progress_wid.add(pid, message, profile) def set_progress(self, percentage): """Set the progression shown in notification bar""" self.notif_bar.set_progress(percentage) def contact_selected(self, contact_list, entity): self.clear_notifs(entity, profile=contact_list.profile) if entity.resource: # we have clicked on a private MUC conversation chat_widget = self.widgets.get_or_create_widget( Chat, entity, on_new_widget=None, force_hash=Chat.get_private_hash(contact_list.profile, entity), profile=contact_list.profile, ) else: chat_widget = self.widgets.get_or_create_widget( Chat, entity, on_new_widget=None, profile=contact_list.profile ) self.select_widget(chat_widget) self.menu_roller.add_menu( _("Chat menu"), chat_widget.get_menu(), C.MENU_ID_WIDGET ) def _dialog_ok_cb(self, widget, data): popup, answer_cb, answer_data = data self.remove_pop_up(popup) if answer_cb is not None: answer_cb(True, answer_data) def _dialog_cancel_cb(self, widget, data): popup, answer_cb, answer_data = data self.remove_pop_up(popup) if answer_cb is not None: answer_cb(False, answer_data) def show_dialog( self, message, title="", type="info", answer_cb=None, answer_data=None ): if type == "info": popup = sat_widgets.Alert(title, message, ok_cb=answer_cb) if answer_cb is None: popup.set_callback("ok", lambda dummy: self.remove_pop_up(popup)) elif type == "error": popup = sat_widgets.Alert(title, message, ok_cb=answer_cb) if answer_cb is None: popup.set_callback("ok", lambda dummy: self.remove_pop_up(popup)) elif type == "yes/no": popup = sat_widgets.ConfirmDialog(message) popup.set_callback("yes", self._dialog_ok_cb, (popup, answer_cb, answer_data)) popup.set_callback( "no", self._dialog_cancel_cb, (popup, answer_cb, answer_data) ) else: popup = sat_widgets.Alert(title, message, ok_cb=answer_cb) if answer_cb is None: popup.set_callback("ok", lambda dummy: self.remove_pop_up(popup)) log.error("unmanaged dialog type: {}".format(type)) self.show_pop_up(popup) def dialog_failure(self, failure): """Show a failure that has been returned by an asynchronous bridge method. @param failure (defer.Failure): Failure instance """ self.alert(failure.classname, failure.message) def on_notification(self, notif_bar): """Called when a new notification has been received""" if not isinstance(self.main_widget, LiberviaTUITopWidget): # if we are not in the main configuration, we ignore the notifications bar return if self.notif_bar.can_hide(): # No notification left, we can hide the bar self.main_widget.hide("notif_bar") else: self.main_widget.show("notif_bar") self.redraw() # FIXME: invalidate cache in a more efficient way def _action_manager_unknown_error(self): self.alert(_("Error"), _("Unmanaged action")) def room_joined_handler(self, room_jid_s, room_nicks, user_nick, subject, profile): super(LiberviaTUIApp, self).room_joined_handler( room_jid_s, room_nicks, user_nick, subject, profile ) # if self.selected_widget is None: # for contact_list in self.widgets.get_widgets(ContactList): # if profile in contact_list.profiles: # contact_list.set_focus(jid.JID(room_jid_s), True) def progress_started_handler(self, pid, metadata, profile): super(LiberviaTUIApp, self).progress_started_handler(pid, metadata, profile) self.add_progress(pid, metadata.get("name", _("unkown")), profile) def progress_finished_handler(self, pid, metadata, profile): log.info("Progress {} finished".format(pid)) super(LiberviaTUIApp, self).progress_finished_handler(pid, metadata, profile) def progress_error_handler(self, pid, err_msg, profile): log.warning("Progress {pid} error: {err_msg}".format(pid=pid, err_msg=err_msg)) super(LiberviaTUIApp, self).progress_error_handler(pid, err_msg, profile) ##DIALOGS CALLBACKS## def on_join_room(self, button, edit): self.remove_pop_up() room_jid = jid.JID(edit.get_edit_text()) self.bridge.muc_join( room_jid, self.profiles[self.current_profile].whoami.node, {}, self.current_profile, callback=lambda dummy: None, errback=self.dialog_failure, ) # MENU EVENTS# def on_connect_request(self, menu): QuickApp.connect(self, self.current_profile) def on_disconnect_request(self, menu): self.disconnect(self.current_profile) def on_param(self, menu): def success(params): ui = xmlui.create(self, xml_data=params, profile=self.current_profile) ui.show() def failure(error): self.alert(_("Error"), _("Can't get parameters (%s)") % error) self.bridge.param_ui_get( app=C.APP_NAME, profile_key=self.current_profile, callback=success, errback=failure, ) def on_exit_request(self, menu): QuickApp.on_exit(self) try: if ( self._bracketed_mode_set ): # we don't unset if bracketed paste mode was detected automatically (i.e. not in conf) log.debug("unsetting bracketed paste mode") sys.stdout.write("\033[?2004l") except AttributeError: pass raise urwid.ExitMainLoop() def on_join_room_request(self, menu): """User wants to join a MUC room""" pop_up_widget = sat_widgets.InputDialog( _("Entering a MUC room"), _("Please enter MUC's JID"), default_txt=self.bridge.muc_get_default_service(), ok_cb=self.on_join_room, ) pop_up_widget.set_callback( "cancel", lambda dummy: self.remove_pop_up(pop_up_widget) ) self.show_pop_up(pop_up_widget) def on_about_request(self, menu): self.alert(_("About"), C.APP_NAME + " v" + self.bridge.version_get()) # MISC CALLBACKS# def set_presence_status(self, show="", status=None, profile=C.PROF_KEY_NONE): contact_list_wid = self.widgets.get_widget(ContactList, profiles=profile) if contact_list_wid is not None: contact_list_wid.status_bar.set_presence_status(show, status) else: log.warning("No ContactList widget found for profile {}".format(profile)) if __name__ == "__main__": LiberviaTUIApp().start()