diff libervia/tui/chat.py @ 4076:b620a8e882e1

refactoring: rename `libervia.frontends.primitivus` to `libervia.tui`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 16:25:25 +0200
parents libervia/frontends/primitivus/chat.py@26b7ed2817da
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/tui/chat.py	Fri Jun 02 16:25:25 2023 +0200
@@ -0,0 +1,708 @@
+#!/usr/bin/env python3
+
+
+# Libervia TUI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from functools import total_ordering
+from pathlib import Path
+import bisect
+import urwid
+from urwid_satext import sat_widgets
+from libervia.backend.core.i18n import _
+from libervia.backend.core import log as logging
+from libervia.frontends.quick_frontend import quick_widgets
+from libervia.frontends.quick_frontend import quick_chat
+from libervia.frontends.quick_frontend import quick_games
+from libervia.tui import game_tarot
+from libervia.tui.constants import Const as C
+from libervia.tui.keys import action_key_map as a_key
+from libervia.tui.widget import LiberviaTUIWidget
+from libervia.tui.contact_list import ContactList
+
+
+log = logging.getLogger(__name__)
+
+
+OCCUPANTS_FOOTER = _("{} occupants")
+
+
+class MessageWidget(urwid.WidgetWrap, quick_chat.MessageWidget):
+    def __init__(self, mess_data):
+        """
+        @param mess_data(quick_chat.Message, None): message data
+            None: used only for non text widgets (e.g.: focus separator)
+        """
+        self.mess_data = mess_data
+        mess_data.widgets.add(self)
+        super(MessageWidget, self).__init__(urwid.Text(self.markup))
+
+    @property
+    def markup(self):
+        return (
+            self._generate_info_markup()
+            if self.mess_data.type == C.MESS_TYPE_INFO
+            else self._generate_markup()
+        )
+
+    @property
+    def info_type(self):
+        return self.mess_data.info_type
+
+    @property
+    def parent(self):
+        return self.mess_data.parent
+
+    @property
+    def message(self):
+        """Return currently displayed message"""
+        return self.mess_data.main_message
+
+    @message.setter
+    def message(self, value):
+        self.mess_data.message = {"": value}
+        self.redraw()
+
+    @property
+    def type(self):
+        try:
+            return self.mess_data.type
+        except AttributeError:
+            return C.MESS_TYPE_INFO
+
+    def redraw(self):
+        self._w.set_text(self.markup)
+        self.mess_data.parent.host.redraw()  # FIXME: should not be necessary
+
+    def selectable(self):
+        return True
+
+    def keypress(self, size, key):
+        return key
+
+    def get_cursor_coords(self, size):
+        return 0, 0
+
+    def render(self, size, focus=False):
+        # Text widget doesn't render cursor, but we want one
+        # so we add it here
+        canvas = urwid.CompositeCanvas(self._w.render(size, focus))
+        if focus:
+            canvas.set_cursor(self.get_cursor_coords(size))
+        return canvas
+
+    def _generate_info_markup(self):
+        return ("info_msg", self.message)
+
+    def _generate_markup(self):
+        """Generate text markup according to message data and Widget options"""
+        markup = []
+        d = self.mess_data
+        mention = d.mention
+
+        # message status
+        if d.status is None:
+            markup.append(" ")
+        elif d.status == "delivered":
+            markup.append(("msg_status_received", "✔"))
+        else:
+            log.warning("Unknown status: {}".format(d.status))
+
+        # timestamp
+        if self.parent.show_timestamp:
+            attr = "msg_mention" if mention else "date"
+            markup.append((attr, "[{}]".format(d.time_text)))
+        else:
+            if mention:
+                markup.append(("msg_mention", "[*]"))
+
+        # nickname
+        if self.parent.show_short_nick:
+            markup.append(
+                ("my_nick" if d.own_mess else "other_nick", "**" if d.own_mess else "*")
+            )
+        else:
+            markup.append(
+                ("my_nick" if d.own_mess else "other_nick", "[{}] ".format(d.nick or ""))
+            )
+
+        msg = self.message  # needed to generate self.selected_lang
+
+        if d.selected_lang:
+            markup.append(("msg_lang", "[{}] ".format(d.selected_lang)))
+
+        # message body
+        markup.append(msg)
+
+        return markup
+
+    # events
+    def update(self, update_dict=None):
+        """update all the linked message widgets
+
+        @param update_dict(dict, None): key=attribute updated value=new_value
+        """
+        self.redraw()
+
+
+@total_ordering
+class OccupantWidget(urwid.WidgetWrap):
+    def __init__(self, occupant_data):
+        self.occupant_data = occupant_data
+        occupant_data.widgets.add(self)
+        markup = self._generate_markup()
+        text = sat_widgets.ClickableText(markup)
+        urwid.connect_signal(
+            text,
+            "click",
+            self.occupant_data.parent._occupants_clicked,
+            user_args=[self.occupant_data],
+        )
+        super(OccupantWidget, self).__init__(text)
+
+    def __hash__(self):
+        return id(self)
+
+    def __eq__(self, other):
+        if other is None:
+            return False
+        return self.occupant_data.nick == other.occupant_data.nick
+
+    def __lt__(self, other):
+        return self.occupant_data.nick.lower() < other.occupant_data.nick.lower()
+
+    @property
+    def markup(self):
+        return self._generate_markup()
+
+    @property
+    def parent(self):
+        return self.mess_data.parent
+
+    @property
+    def nick(self):
+        return self.occupant_data.nick
+
+    def redraw(self):
+        self._w.set_text(self.markup)
+        self.occupant_data.parent.host.redraw()  # FIXME: should not be necessary
+
+    def selectable(self):
+        return True
+
+    def keypress(self, size, key):
+        return key
+
+    def get_cursor_coords(self, size):
+        return 0, 0
+
+    def render(self, size, focus=False):
+        # Text widget doesn't render cursor, but we want one
+        # so we add it here
+        canvas = urwid.CompositeCanvas(self._w.render(size, focus))
+        if focus:
+            canvas.set_cursor(self.get_cursor_coords(size))
+        return canvas
+
+    def _generate_markup(self):
+        # TODO: role and affiliation are shown in a Q&D way
+        #       should be more intuitive and themable
+        o = self.occupant_data
+        markup = []
+        markup.append(
+            ("info_msg", "{}{} ".format(o.role[0].upper(), o.affiliation[0].upper()))
+        )
+        markup.append(o.nick)
+        if o.state is not None:
+            markup.append(" {}".format(C.CHAT_STATE_ICON[o.state]))
+        return markup
+
+    # events
+    def update(self, update_dict=None):
+        self.redraw()
+
+
+class OccupantsWidget(urwid.WidgetWrap):
+    def __init__(self, parent):
+        self.parent = parent
+        self.occupants_walker = urwid.SimpleListWalker([])
+        self.occupants_footer = urwid.Text("", align="center")
+        self.update_footer()
+        occupants_widget = urwid.Frame(
+            urwid.ListBox(self.occupants_walker), footer=self.occupants_footer
+        )
+        super(OccupantsWidget, self).__init__(occupants_widget)
+        occupants_list = sorted(list(self.parent.occupants.keys()), key=lambda o: o.lower())
+        for occupant in occupants_list:
+            occupant_data = self.parent.occupants[occupant]
+            self.occupants_walker.append(OccupantWidget(occupant_data))
+
+    def clear(self):
+        del self.occupants_walker[:]
+
+    def update_footer(self):
+        """update footer widget"""
+        txt = OCCUPANTS_FOOTER.format(len(self.parent.occupants))
+        self.occupants_footer.set_text(txt)
+
+    def get_nicks(self, start=""):
+        """Return nicks of all occupants
+
+        @param start(unicode): only return nicknames which start with this text
+        """
+        return [
+            w.nick
+            for w in self.occupants_walker
+            if isinstance(w, OccupantWidget) and w.nick.startswith(start)
+        ]
+
+    def addUser(self, occupant_data):
+        """add a user to the list"""
+        bisect.insort(self.occupants_walker, OccupantWidget(occupant_data))
+        self.update_footer()
+        self.parent.host.redraw()  # FIXME: should not be necessary
+
+    def removeUser(self, occupant_data):
+        """remove a user from the list"""
+        for widget in occupant_data.widgets:
+            self.occupants_walker.remove(widget)
+        self.update_footer()
+        self.parent.host.redraw()  # FIXME: should not be necessary
+
+
+class Chat(LiberviaTUIWidget, quick_chat.QuickChat):
+    def __init__(self, host, target, type_=C.CHAT_ONE2ONE, nick=None, occupants=None,
+                 subject=None, statuses=None, profiles=None):
+        self.filters = []  # list of filter callbacks to apply
+        self.mess_walker = urwid.SimpleListWalker([])
+        self.mess_widgets = urwid.ListBox(self.mess_walker)
+        self.chat_widget = urwid.Frame(self.mess_widgets)
+        self.chat_colums = urwid.Columns([("weight", 8, self.chat_widget)])
+        self.pile = urwid.Pile([self.chat_colums])
+        LiberviaTUIWidget.__init__(self, self.pile, target)
+        quick_chat.QuickChat.__init__(
+            self, host, target, type_, nick, occupants, subject, statuses,
+            profiles=profiles
+        )
+
+        # we must adapt the behaviour with the type
+        if type_ == C.CHAT_GROUP:
+            if len(self.chat_colums.contents) == 1:
+                self.occupants_widget = OccupantsWidget(self)
+                self.occupants_panel = sat_widgets.VerticalSeparator(
+                    self.occupants_widget
+                )
+                self._append_occupants_panel()
+                self.host.addListener("presence", self.presence_listener, [profiles])
+
+        # focus marker is a separator indicated last visible message before focus was lost
+        self.focus_marker = None  # link to current marker
+        self.focus_marker_set = None  # True if a new marker has been inserted
+        self.show_timestamp = True
+        self.show_short_nick = False
+        self.show_title = 1  # 0: clip title; 1: full title; 2: no title
+        self.post_init()
+
+    @property
+    def message_widgets_rev(self):
+        return reversed(self.mess_walker)
+
+    def keypress(self, size, key):
+        if key == a_key["OCCUPANTS_HIDE"]:  # user wants to (un)hide the occupants panel
+            if self.type == C.CHAT_GROUP:
+                widgets = [widget for (widget, options) in self.chat_colums.contents]
+                if self.occupants_panel in widgets:
+                    self._remove_occupants_panel()
+                else:
+                    self._append_occupants_panel()
+        elif key == a_key["TIMESTAMP_HIDE"]:  # user wants to (un)hide timestamp
+            self.show_timestamp = not self.show_timestamp
+            self.redraw()
+        elif key == a_key["SHORT_NICKNAME"]:  # user wants to (not) use short nick
+            self.show_short_nick = not self.show_short_nick
+            self.redraw()
+        elif (key == a_key["SUBJECT_SWITCH"]):
+            # user wants to (un)hide group's subject or change its apperance
+            if self.subject:
+                self.show_title = (self.show_title + 1) % 3
+                if self.show_title == 0:
+                    self.set_subject(self.subject, "clip")
+                elif self.show_title == 1:
+                    self.set_subject(self.subject, "space")
+                elif self.show_title == 2:
+                    self.chat_widget.header = None
+                self._invalidate()
+        elif key == a_key["GOTO_BOTTOM"]:  # user wants to focus last message
+            self.mess_widgets.focus_position = len(self.mess_walker) - 1
+
+        return super(Chat, self).keypress(size, key)
+
+    def completion(self, text, completion_data):
+        """Completion method which complete nicknames in group chat
+
+        for params, see [sat_widgets.AdvancedEdit]
+        """
+        if self.type != C.CHAT_GROUP:
+            return text
+
+        space = text.rfind(" ")
+        start = text[space + 1 :]
+        words = self.occupants_widget.get_nicks(start)
+        if not words:
+            return text
+        try:
+            word_idx = words.index(completion_data["last_word"]) + 1
+        except (KeyError, ValueError):
+            word_idx = 0
+        else:
+            if word_idx == len(words):
+                word_idx = 0
+        word = completion_data["last_word"] = words[word_idx]
+        return "{}{}{}".format(text[: space + 1], word, ": " if space < 0 else "")
+
+    def get_menu(self):
+        """Return Menu bar"""
+        menu = sat_widgets.Menu(self.host.loop)
+        if self.type == C.CHAT_GROUP:
+            self.host.add_menus(menu, C.MENU_ROOM, {"room_jid": self.target.bare})
+            game = _("Game")
+            menu.add_menu(game, "Tarot", self.on_tarot_request)
+        elif self.type == C.CHAT_ONE2ONE:
+            # FIXME: self.target is a bare jid, we need to check that
+            contact_list = self.host.contact_lists[self.profile]
+            if not self.target.resource:
+                full_jid = contact_list.get_full_jid(self.target)
+            else:
+                full_jid = self.target
+            self.host.add_menus(menu, C.MENU_SINGLE, {"jid": full_jid})
+        return menu
+
+    def set_filter(self, args):
+        """set filtering of messages
+
+        @param args(list[unicode]): filters following syntax "[filter]=[value]"
+            empty list to clear all filters
+            only lang=XX is handled for now
+        """
+        del self.filters[:]
+        if args:
+            if args[0].startswith("lang="):
+                lang = args[0][5:].strip()
+                self.filters.append(lambda mess_data: lang in mess_data.message)
+
+        self.print_messages()
+
+    def presence_listener(self, entity, show, priority, statuses, profile):
+        """Update entity's presence status
+
+        @param entity (jid.JID): entity updated
+        @param show: availability
+        @param priority: resource's priority
+        @param statuses: dict of statuses
+        @param profile: %(doc_profile)s
+        """
+        # FIXME: disable for refactoring, need to be checked and re-enabled
+        return
+        # assert self.type == C.CHAT_GROUP
+        # if entity.bare != self.target:
+        #     return
+        # self.update(entity)
+
+    def create_message(self, message):
+        self.appendMessage(message)
+
+    def _scrollDown(self):
+        """scroll down message only if we are already at the bottom (minus 1)"""
+        current_focus = self.mess_widgets.focus_position
+        bottom = len(self.mess_walker) - 1
+        if current_focus == bottom - 1:
+            self.mess_widgets.focus_position = bottom  # scroll down
+        self.host.redraw()  # FIXME: should not be necessary
+
+    def appendMessage(self, message, minor_notifs=True):
+        """Create a MessageWidget and append it
+
+        Can merge info messages together if desirable (e.g.: multiple joined/leave)
+        @param message(quick_chat.Message): message to add
+        @param minor_notifs(boolean): if True, basic notifications are allowed
+            If False, notification are not shown except if we have an important one
+            (like a mention).
+            False is generally used when printing history, when we don't want every
+            message to be notified.
+        """
+        if message.attachments:
+            # FIXME: Q&D way to see attachments in LiberviaTUI
+            #   it should be done in a more user friendly way
+            for lang, body in message.message.items():
+                for attachment in message.attachments:
+                    if 'url' in attachment:
+                        body+=f"\n{attachment['url']}"
+                    elif 'path' in attachment:
+                        path = Path(attachment['path'])
+                        body+=f"\n{path.as_uri()}"
+                    else:
+                        log.warning(f'No "url" nor "path" in attachment: {attachment}')
+                    message.message[lang] = body
+
+        if self.filters:
+            if not all([f(message) for f in self.filters]):
+                return
+
+        if self.handle_user_moved(message):
+            return
+
+        if ((self.host.selected_widget != self or not self.host.x_notify.has_focus())
+            and self.focus_marker_set is not None):
+            if not self.focus_marker_set and not self._locked and self.mess_walker:
+                if self.focus_marker is not None:
+                    try:
+                        self.mess_walker.remove(self.focus_marker)
+                    except ValueError:
+                        # self.focus_marker may not be in mess_walker anymore if
+                        # mess_walker has been cleared, e.g. when showing search
+                        # result or using :history command
+                        pass
+                self.focus_marker = urwid.Divider("—")
+                self.mess_walker.append(self.focus_marker)
+                self.focus_marker_set = True
+                self._scrollDown()
+        else:
+            if self.focus_marker_set:
+                self.focus_marker_set = False
+
+        wid = MessageWidget(message)
+        self.mess_walker.append(wid)
+        self._scrollDown()
+        if self.is_user_moved(message):
+            return  # no notification for moved messages
+
+        # notifications
+
+        if self._locked:
+            # we don't want notifications when locked
+            # because that's history messages
+            return
+
+        if wid.mess_data.mention:
+            from_jid = wid.mess_data.from_jid
+            msg = _(
+                "You have been mentioned by {nick} in {room}".format(
+                    nick=wid.mess_data.nick, room=self.target
+                )
+            )
+            self.host.notify(
+                C.NOTIFY_MENTION, from_jid, msg, widget=self, profile=self.profile
+            )
+        elif not minor_notifs:
+            return
+        elif self.type == C.CHAT_ONE2ONE:
+            from_jid = wid.mess_data.from_jid
+            msg = _("{entity} is talking to you".format(entity=from_jid))
+            self.host.notify(
+                C.NOTIFY_MESSAGE, from_jid, msg, widget=self, profile=self.profile
+            )
+        else:
+            self.host.notify(
+                C.NOTIFY_MESSAGE, self.target, widget=self, profile=self.profile
+            )
+
+    def addUser(self, nick):
+        occupant = super(Chat, self).addUser(nick)
+        self.occupants_widget.addUser(occupant)
+
+    def removeUser(self, occupant_data):
+        occupant = super(Chat, self).removeUser(occupant_data)
+        if occupant is not None:
+            self.occupants_widget.removeUser(occupant)
+
+    def occupants_clear(self):
+        super(Chat, self).occupants_clear()
+        self.occupants_widget.clear()
+
+    def _occupants_clicked(self, occupant, clicked_wid):
+        assert self.type == C.CHAT_GROUP
+        contact_list = self.host.contact_lists[self.profile]
+
+        # we have a click on a nick, we need to create the widget if it doesn't exists
+        self.get_or_create_private_widget(occupant.jid)
+
+        # now we select the new window
+        for contact_list in self.host.widgets.get_widgets(
+            ContactList, profiles=(self.profile,)
+        ):
+            contact_list.set_focus(occupant.jid, True)
+
+    def _append_occupants_panel(self):
+        self.chat_colums.contents.append((self.occupants_panel, ("weight", 2, False)))
+
+    def _remove_occupants_panel(self):
+        for widget, options in self.chat_colums.contents:
+            if widget is self.occupants_panel:
+                self.chat_colums.contents.remove((widget, options))
+                break
+
+    def add_game_panel(self, widget):
+        """Insert a game panel to this Chat dialog.
+
+        @param widget (Widget): the game panel
+        """
+        assert len(self.pile.contents) == 1
+        self.pile.contents.insert(0, (widget, ("weight", 1)))
+        self.pile.contents.insert(1, (urwid.Filler(urwid.Divider("-"), ("fixed", 1))))
+        self.host.redraw()
+
+    def remove_game_panel(self, widget):
+        """Remove the game panel from this Chat dialog.
+
+        @param widget (Widget): the game panel
+        """
+        assert len(self.pile.contents) == 3
+        del self.pile.contents[0]
+        self.host.redraw()
+
+    def set_subject(self, subject, wrap="space"):
+        """Set title for a group chat"""
+        quick_chat.QuickChat.set_subject(self, subject)
+        self.subj_wid = urwid.Text(
+            str(subject.replace("\n", "|") if wrap == "clip" else subject),
+            align="left" if wrap == "clip" else "center",
+            wrap=wrap,
+        )
+        self.chat_widget.header = urwid.AttrMap(self.subj_wid, "title")
+        self.host.redraw()
+
+    ## Messages
+
+    def print_messages(self, clear=True):
+        """generate message widgets
+
+        @param clear(bool): clear message before printing if true
+        """
+        if clear:
+            del self.mess_walker[:]
+        for message in self.messages.values():
+            self.appendMessage(message, minor_notifs=False)
+
+    def redraw(self):
+        """redraw all messages"""
+        for w in self.mess_walker:
+            try:
+                w.redraw()
+            except AttributeError:
+                pass
+
+    def update_history(self, size=C.HISTORY_LIMIT_DEFAULT, filters=None, profile="@NONE@"):
+        del self.mess_walker[:]
+        if filters and "search" in filters:
+            self.mess_walker.append(
+                urwid.Text(
+                    _("Results for searching the globbing pattern: {}").format(
+                        filters["search"]
+                    )
+                )
+            )
+            self.mess_walker.append(
+                urwid.Text(_("Type ':history <lines>' to reset the chat history"))
+            )
+        super(Chat, self).update_history(size, filters, profile)
+
+    def _on_history_printed(self):
+        """Refresh or scroll down the focus after the history is printed"""
+        self.print_messages(clear=False)
+        super(Chat, self)._on_history_printed()
+
+    def on_private_created(self, widget):
+        self.host.contact_lists[widget.profile].set_special(
+            widget.target, C.CONTACT_SPECIAL_GROUP
+        )
+
+    def on_selected(self):
+        self.focus_marker_set = False
+
+    def notify(self, contact="somebody", msg=""):
+        """Notify the user of a new message if Libervia TUI doesn't have the focus.
+
+        @param contact (unicode): contact who wrote to the users
+        @param msg (unicode): the message that has been received
+        """
+        # FIXME: not called anymore after refactoring
+        if msg == "":
+            return
+        if self.mess_widgets.get_focus()[1] == len(self.mess_walker) - 2:
+            # we don't change focus if user is not at the bottom
+            # as that mean that he is probably watching discussion history
+            self.mess_widgets.focus_position = len(self.mess_walker) - 1
+        self.host.redraw()
+        if not self.host.x_notify.has_focus():
+            if self.type == C.CHAT_ONE2ONE:
+                self.host.x_notify.send_notification(
+                    _("LiberviaTUI: %s is talking to you") % contact
+                )
+            elif self.nick is not None and self.nick.lower() in msg.lower():
+                self.host.x_notify.send_notification(
+                    _("LiberviaTUI: %(user)s mentioned you in room '%(room)s'")
+                    % {"user": contact, "room": self.target}
+                )
+
+    # MENU EVENTS #
+    def on_tarot_request(self, menu):
+        # TODO: move this to plugin_misc_tarot with dynamic menu
+        if len(self.occupants) != 4:
+            self.host.show_pop_up(
+                sat_widgets.Alert(
+                    _("Can't start game"),
+                    _(
+                        "You need to be exactly 4 peoples in the room to start a Tarot game"
+                    ),
+                    ok_cb=self.host.remove_pop_up,
+                )
+            )
+        else:
+            self.host.bridge.tarot_game_create(
+                self.target, list(self.occupants), self.profile
+            )
+
+    # MISC EVENTS #
+
+    def on_delete(self):
+        # FIXME: to be checked after refactoring
+        super(Chat, self).on_delete()
+        if self.type == C.CHAT_GROUP:
+            self.host.removeListener("presence", self.presence_listener)
+
+    def on_chat_state(self, from_jid, state, profile):
+        super(Chat, self).on_chat_state(from_jid, state, profile)
+        if self.type == C.CHAT_ONE2ONE:
+            self.title_dynamic = C.CHAT_STATE_ICON[state]
+            self.host.redraw()  # FIXME: should not be necessary
+
+    def _on_subject_dialog_cb(self, button, dialog):
+        self.change_subject(dialog.text)
+        self.host.remove_pop_up(dialog)
+
+    def on_subject_dialog(self, new_subject=None):
+        dialog = sat_widgets.InputDialog(
+            _("Change title"),
+            _("Enter the new title"),
+            default_txt=new_subject if new_subject is not None else self.subject,
+        )
+        dialog.set_callback("ok", self._on_subject_dialog_cb, dialog)
+        dialog.set_callback("cancel", lambda __: self.host.remove_pop_up(dialog))
+        self.host.show_pop_up(dialog)
+
+
+quick_widgets.register(quick_chat.QuickChat, Chat)
+quick_widgets.register(quick_games.Tarot, game_tarot.TarotGame)