view sat_frontends/primitivus/chat.py @ 3911:8289ac1b34f4

plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo: - support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace - maintains one identity across both versions of OMEMO - migrates data from the old plugin - includes more features for protocol stability - uses SCE for modern OMEMO - fully type-checked, linted and format-checked - added type hints to various pieces of backend code used by the plugin - added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy) - core (xmpp): enabled `send` trigger and made it an asyncPoint fix 375
author Syndace <me@syndace.dev>
date Tue, 23 Aug 2022 21:06:24 +0200
parents be6d91572633
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# Primitivus: a SAT frontend
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from functools import total_ordering
from pathlib import Path
import bisect
import urwid
from urwid_satext import sat_widgets
from sat.core.i18n import _
from sat.core import log as logging
from sat_frontends.quick_frontend import quick_widgets
from sat_frontends.quick_frontend import quick_chat
from sat_frontends.quick_frontend import quick_games
from sat_frontends.primitivus import game_tarot
from sat_frontends.primitivus.constants import Const as C
from sat_frontends.primitivus.keys import action_key_map as a_key
from sat_frontends.primitivus.widget import PrimitivusWidget
from sat_frontends.primitivus.contact_list import ContactList


log = logging.getLogger(__name__)


OCCUPANTS_FOOTER = _("{} occupants")


class MessageWidget(urwid.WidgetWrap, quick_chat.MessageWidget):
    def __init__(self, mess_data):
        """
        @param mess_data(quick_chat.Message, None): message data
            None: used only for non text widgets (e.g.: focus separator)
        """
        self.mess_data = mess_data
        mess_data.widgets.add(self)
        super(MessageWidget, self).__init__(urwid.Text(self.markup))

    @property
    def markup(self):
        return (
            self._generateInfoMarkup()
            if self.mess_data.type == C.MESS_TYPE_INFO
            else self._generateMarkup()
        )

    @property
    def info_type(self):
        return self.mess_data.info_type

    @property
    def parent(self):
        return self.mess_data.parent

    @property
    def message(self):
        """Return currently displayed message"""
        return self.mess_data.main_message

    @message.setter
    def message(self, value):
        self.mess_data.message = {"": value}
        self.redraw()

    @property
    def type(self):
        try:
            return self.mess_data.type
        except AttributeError:
            return C.MESS_TYPE_INFO

    def redraw(self):
        self._w.set_text(self.markup)
        self.mess_data.parent.host.redraw()  # FIXME: should not be necessary

    def selectable(self):
        return True

    def keypress(self, size, key):
        return key

    def get_cursor_coords(self, size):
        return 0, 0

    def render(self, size, focus=False):
        # Text widget doesn't render cursor, but we want one
        # so we add it here
        canvas = urwid.CompositeCanvas(self._w.render(size, focus))
        if focus:
            canvas.set_cursor(self.get_cursor_coords(size))
        return canvas

    def _generateInfoMarkup(self):
        return ("info_msg", self.message)

    def _generateMarkup(self):
        """Generate text markup according to message data and Widget options"""
        markup = []
        d = self.mess_data
        mention = d.mention

        # message status
        if d.status is None:
            markup.append(" ")
        elif d.status == "delivered":
            markup.append(("msg_status_received", "✔"))
        else:
            log.warning("Unknown status: {}".format(d.status))

        # timestamp
        if self.parent.show_timestamp:
            attr = "msg_mention" if mention else "date"
            markup.append((attr, "[{}]".format(d.time_text)))
        else:
            if mention:
                markup.append(("msg_mention", "[*]"))

        # nickname
        if self.parent.show_short_nick:
            markup.append(
                ("my_nick" if d.own_mess else "other_nick", "**" if d.own_mess else "*")
            )
        else:
            markup.append(
                ("my_nick" if d.own_mess else "other_nick", "[{}] ".format(d.nick or ""))
            )

        msg = self.message  # needed to generate self.selected_lang

        if d.selected_lang:
            markup.append(("msg_lang", "[{}] ".format(d.selected_lang)))

        # message body
        markup.append(msg)

        return markup

    # events
    def update(self, update_dict=None):
        """update all the linked message widgets

        @param update_dict(dict, None): key=attribute updated value=new_value
        """
        self.redraw()


@total_ordering
class OccupantWidget(urwid.WidgetWrap):
    def __init__(self, occupant_data):
        self.occupant_data = occupant_data
        occupant_data.widgets.add(self)
        markup = self._generateMarkup()
        text = sat_widgets.ClickableText(markup)
        urwid.connect_signal(
            text,
            "click",
            self.occupant_data.parent._occupantsClicked,
            user_args=[self.occupant_data],
        )
        super(OccupantWidget, self).__init__(text)

    def __hash__(self):
        return id(self)

    def __eq__(self, other):
        if other is None:
            return False
        return self.occupant_data.nick == other.occupant_data.nick

    def __lt__(self, other):
        return self.occupant_data.nick.lower() < other.occupant_data.nick.lower()

    @property
    def markup(self):
        return self._generateMarkup()

    @property
    def parent(self):
        return self.mess_data.parent

    @property
    def nick(self):
        return self.occupant_data.nick

    def redraw(self):
        self._w.set_text(self.markup)
        self.occupant_data.parent.host.redraw()  # FIXME: should not be necessary

    def selectable(self):
        return True

    def keypress(self, size, key):
        return key

    def get_cursor_coords(self, size):
        return 0, 0

    def render(self, size, focus=False):
        # Text widget doesn't render cursor, but we want one
        # so we add it here
        canvas = urwid.CompositeCanvas(self._w.render(size, focus))
        if focus:
            canvas.set_cursor(self.get_cursor_coords(size))
        return canvas

    def _generateMarkup(self):
        # TODO: role and affiliation are shown in a Q&D way
        #       should be more intuitive and themable
        o = self.occupant_data
        markup = []
        markup.append(
            ("info_msg", "{}{} ".format(o.role[0].upper(), o.affiliation[0].upper()))
        )
        markup.append(o.nick)
        if o.state is not None:
            markup.append(" {}".format(C.CHAT_STATE_ICON[o.state]))
        return markup

    # events
    def update(self, update_dict=None):
        self.redraw()


class OccupantsWidget(urwid.WidgetWrap):
    def __init__(self, parent):
        self.parent = parent
        self.occupants_walker = urwid.SimpleListWalker([])
        self.occupants_footer = urwid.Text("", align="center")
        self.updateFooter()
        occupants_widget = urwid.Frame(
            urwid.ListBox(self.occupants_walker), footer=self.occupants_footer
        )
        super(OccupantsWidget, self).__init__(occupants_widget)
        occupants_list = sorted(list(self.parent.occupants.keys()), key=lambda o: o.lower())
        for occupant in occupants_list:
            occupant_data = self.parent.occupants[occupant]
            self.occupants_walker.append(OccupantWidget(occupant_data))

    def clear(self):
        del self.occupants_walker[:]

    def updateFooter(self):
        """update footer widget"""
        txt = OCCUPANTS_FOOTER.format(len(self.parent.occupants))
        self.occupants_footer.set_text(txt)

    def getNicks(self, start=""):
        """Return nicks of all occupants

        @param start(unicode): only return nicknames which start with this text
        """
        return [
            w.nick
            for w in self.occupants_walker
            if isinstance(w, OccupantWidget) and w.nick.startswith(start)
        ]

    def addUser(self, occupant_data):
        """add a user to the list"""
        bisect.insort(self.occupants_walker, OccupantWidget(occupant_data))
        self.updateFooter()
        self.parent.host.redraw()  # FIXME: should not be necessary

    def removeUser(self, occupant_data):
        """remove a user from the list"""
        for widget in occupant_data.widgets:
            self.occupants_walker.remove(widget)
        self.updateFooter()
        self.parent.host.redraw()  # FIXME: should not be necessary


class Chat(PrimitivusWidget, quick_chat.QuickChat):
    def __init__(self, host, target, type_=C.CHAT_ONE2ONE, nick=None, occupants=None,
                 subject=None, statuses=None, profiles=None):
        self.filters = []  # list of filter callbacks to apply
        self.mess_walker = urwid.SimpleListWalker([])
        self.mess_widgets = urwid.ListBox(self.mess_walker)
        self.chat_widget = urwid.Frame(self.mess_widgets)
        self.chat_colums = urwid.Columns([("weight", 8, self.chat_widget)])
        self.pile = urwid.Pile([self.chat_colums])
        PrimitivusWidget.__init__(self, self.pile, target)
        quick_chat.QuickChat.__init__(
            self, host, target, type_, nick, occupants, subject, statuses,
            profiles=profiles
        )

        # we must adapt the behaviour with the type
        if type_ == C.CHAT_GROUP:
            if len(self.chat_colums.contents) == 1:
                self.occupants_widget = OccupantsWidget(self)
                self.occupants_panel = sat_widgets.VerticalSeparator(
                    self.occupants_widget
                )
                self._appendOccupantsPanel()
                self.host.addListener("presence", self.presenceListener, [profiles])

        # focus marker is a separator indicated last visible message before focus was lost
        self.focus_marker = None  # link to current marker
        self.focus_marker_set = None  # True if a new marker has been inserted
        self.show_timestamp = True
        self.show_short_nick = False
        self.show_title = 1  # 0: clip title; 1: full title; 2: no title
        self.postInit()

    @property
    def message_widgets_rev(self):
        return reversed(self.mess_walker)

    def keypress(self, size, key):
        if key == a_key["OCCUPANTS_HIDE"]:  # user wants to (un)hide the occupants panel
            if self.type == C.CHAT_GROUP:
                widgets = [widget for (widget, options) in self.chat_colums.contents]
                if self.occupants_panel in widgets:
                    self._removeOccupantsPanel()
                else:
                    self._appendOccupantsPanel()
        elif key == a_key["TIMESTAMP_HIDE"]:  # user wants to (un)hide timestamp
            self.show_timestamp = not self.show_timestamp
            self.redraw()
        elif key == a_key["SHORT_NICKNAME"]:  # user wants to (not) use short nick
            self.show_short_nick = not self.show_short_nick
            self.redraw()
        elif (key == a_key["SUBJECT_SWITCH"]):
            # user wants to (un)hide group's subject or change its apperance
            if self.subject:
                self.show_title = (self.show_title + 1) % 3
                if self.show_title == 0:
                    self.setSubject(self.subject, "clip")
                elif self.show_title == 1:
                    self.setSubject(self.subject, "space")
                elif self.show_title == 2:
                    self.chat_widget.header = None
                self._invalidate()
        elif key == a_key["GOTO_BOTTOM"]:  # user wants to focus last message
            self.mess_widgets.focus_position = len(self.mess_walker) - 1

        return super(Chat, self).keypress(size, key)

    def completion(self, text, completion_data):
        """Completion method which complete nicknames in group chat

        for params, see [sat_widgets.AdvancedEdit]
        """
        if self.type != C.CHAT_GROUP:
            return text

        space = text.rfind(" ")
        start = text[space + 1 :]
        words = self.occupants_widget.getNicks(start)
        if not words:
            return text
        try:
            word_idx = words.index(completion_data["last_word"]) + 1
        except (KeyError, ValueError):
            word_idx = 0
        else:
            if word_idx == len(words):
                word_idx = 0
        word = completion_data["last_word"] = words[word_idx]
        return "{}{}{}".format(text[: space + 1], word, ": " if space < 0 else "")

    def getMenu(self):
        """Return Menu bar"""
        menu = sat_widgets.Menu(self.host.loop)
        if self.type == C.CHAT_GROUP:
            self.host.addMenus(menu, C.MENU_ROOM, {"room_jid": self.target.bare})
            game = _("Game")
            menu.addMenu(game, "Tarot", self.onTarotRequest)
        elif self.type == C.CHAT_ONE2ONE:
            # FIXME: self.target is a bare jid, we need to check that
            contact_list = self.host.contact_lists[self.profile]
            if not self.target.resource:
                full_jid = contact_list.getFullJid(self.target)
            else:
                full_jid = self.target
            self.host.addMenus(menu, C.MENU_SINGLE, {"jid": full_jid})
        return menu

    def setFilter(self, args):
        """set filtering of messages

        @param args(list[unicode]): filters following syntax "[filter]=[value]"
            empty list to clear all filters
            only lang=XX is handled for now
        """
        del self.filters[:]
        if args:
            if args[0].startswith("lang="):
                lang = args[0][5:].strip()
                self.filters.append(lambda mess_data: lang in mess_data.message)

        self.printMessages()

    def presenceListener(self, entity, show, priority, statuses, profile):
        """Update entity's presence status

        @param entity (jid.JID): entity updated
        @param show: availability
        @param priority: resource's priority
        @param statuses: dict of statuses
        @param profile: %(doc_profile)s
        """
        # FIXME: disable for refactoring, need to be checked and re-enabled
        return
        # assert self.type == C.CHAT_GROUP
        # if entity.bare != self.target:
        #     return
        # self.update(entity)

    def createMessage(self, message):
        self.appendMessage(message)

    def _scrollDown(self):
        """scroll down message only if we are already at the bottom (minus 1)"""
        current_focus = self.mess_widgets.focus_position
        bottom = len(self.mess_walker) - 1
        if current_focus == bottom - 1:
            self.mess_widgets.focus_position = bottom  # scroll down
        self.host.redraw()  # FIXME: should not be necessary

    def appendMessage(self, message, minor_notifs=True):
        """Create a MessageWidget and append it

        Can merge info messages together if desirable (e.g.: multiple joined/leave)
        @param message(quick_chat.Message): message to add
        @param minor_notifs(boolean): if True, basic notifications are allowed
            If False, notification are not shown except if we have an important one
            (like a mention).
            False is generally used when printing history, when we don't want every
            message to be notified.
        """
        if message.attachments:
            # FIXME: Q&D way to see attachments in Primitivus
            #   it should be done in a more user friendly way
            for lang, body in message.message.items():
                for attachment in message.attachments:
                    if 'url' in attachment:
                        body+=f"\n{attachment['url']}"
                    elif 'path' in attachment:
                        path = Path(attachment['path'])
                        body+=f"\n{path.as_uri()}"
                    else:
                        log.warning(f'No "url" nor "path" in attachment: {attachment}')
                    message.message[lang] = body

        if self.filters:
            if not all([f(message) for f in self.filters]):
                return

        if self.handleUserMoved(message):
            return

        if ((self.host.selected_widget != self or not self.host.x_notify.hasFocus())
            and self.focus_marker_set is not None):
            if not self.focus_marker_set and not self._locked and self.mess_walker:
                if self.focus_marker is not None:
                    try:
                        self.mess_walker.remove(self.focus_marker)
                    except ValueError:
                        # self.focus_marker may not be in mess_walker anymore if
                        # mess_walker has been cleared, e.g. when showing search
                        # result or using :history command
                        pass
                self.focus_marker = urwid.Divider("—")
                self.mess_walker.append(self.focus_marker)
                self.focus_marker_set = True
                self._scrollDown()
        else:
            if self.focus_marker_set:
                self.focus_marker_set = False

        wid = MessageWidget(message)
        self.mess_walker.append(wid)
        self._scrollDown()
        if self.isUserMoved(message):
            return  # no notification for moved messages

        # notifications

        if self._locked:
            # we don't want notifications when locked
            # because that's history messages
            return

        if wid.mess_data.mention:
            from_jid = wid.mess_data.from_jid
            msg = _(
                "You have been mentioned by {nick} in {room}".format(
                    nick=wid.mess_data.nick, room=self.target
                )
            )
            self.host.notify(
                C.NOTIFY_MENTION, from_jid, msg, widget=self, profile=self.profile
            )
        elif not minor_notifs:
            return
        elif self.type == C.CHAT_ONE2ONE:
            from_jid = wid.mess_data.from_jid
            msg = _("{entity} is talking to you".format(entity=from_jid))
            self.host.notify(
                C.NOTIFY_MESSAGE, from_jid, msg, widget=self, profile=self.profile
            )
        else:
            self.host.notify(
                C.NOTIFY_MESSAGE, self.target, widget=self, profile=self.profile
            )

    def addUser(self, nick):
        occupant = super(Chat, self).addUser(nick)
        self.occupants_widget.addUser(occupant)

    def removeUser(self, occupant_data):
        occupant = super(Chat, self).removeUser(occupant_data)
        if occupant is not None:
            self.occupants_widget.removeUser(occupant)

    def occupantsClear(self):
        super(Chat, self).occupantsClear()
        self.occupants_widget.clear()

    def _occupantsClicked(self, occupant, clicked_wid):
        assert self.type == C.CHAT_GROUP
        contact_list = self.host.contact_lists[self.profile]

        # we have a click on a nick, we need to create the widget if it doesn't exists
        self.getOrCreatePrivateWidget(occupant.jid)

        # now we select the new window
        for contact_list in self.host.widgets.getWidgets(
            ContactList, profiles=(self.profile,)
        ):
            contact_list.setFocus(occupant.jid, True)

    def _appendOccupantsPanel(self):
        self.chat_colums.contents.append((self.occupants_panel, ("weight", 2, False)))

    def _removeOccupantsPanel(self):
        for widget, options in self.chat_colums.contents:
            if widget is self.occupants_panel:
                self.chat_colums.contents.remove((widget, options))
                break

    def addGamePanel(self, widget):
        """Insert a game panel to this Chat dialog.

        @param widget (Widget): the game panel
        """
        assert len(self.pile.contents) == 1
        self.pile.contents.insert(0, (widget, ("weight", 1)))
        self.pile.contents.insert(1, (urwid.Filler(urwid.Divider("-"), ("fixed", 1))))
        self.host.redraw()

    def removeGamePanel(self, widget):
        """Remove the game panel from this Chat dialog.

        @param widget (Widget): the game panel
        """
        assert len(self.pile.contents) == 3
        del self.pile.contents[0]
        self.host.redraw()

    def setSubject(self, subject, wrap="space"):
        """Set title for a group chat"""
        quick_chat.QuickChat.setSubject(self, subject)
        self.subj_wid = urwid.Text(
            str(subject.replace("\n", "|") if wrap == "clip" else subject),
            align="left" if wrap == "clip" else "center",
            wrap=wrap,
        )
        self.chat_widget.header = urwid.AttrMap(self.subj_wid, "title")
        self.host.redraw()

    ## Messages

    def printMessages(self, clear=True):
        """generate message widgets

        @param clear(bool): clear message before printing if true
        """
        if clear:
            del self.mess_walker[:]
        for message in self.messages.values():
            self.appendMessage(message, minor_notifs=False)

    def redraw(self):
        """redraw all messages"""
        for w in self.mess_walker:
            try:
                w.redraw()
            except AttributeError:
                pass

    def updateHistory(self, size=C.HISTORY_LIMIT_DEFAULT, filters=None, profile="@NONE@"):
        del self.mess_walker[:]
        if filters and "search" in filters:
            self.mess_walker.append(
                urwid.Text(
                    _("Results for searching the globbing pattern: {}").format(
                        filters["search"]
                    )
                )
            )
            self.mess_walker.append(
                urwid.Text(_("Type ':history <lines>' to reset the chat history"))
            )
        super(Chat, self).updateHistory(size, filters, profile)

    def _onHistoryPrinted(self):
        """Refresh or scroll down the focus after the history is printed"""
        self.printMessages(clear=False)
        super(Chat, self)._onHistoryPrinted()

    def onPrivateCreated(self, widget):
        self.host.contact_lists[widget.profile].setSpecial(
            widget.target, C.CONTACT_SPECIAL_GROUP
        )

    def onSelected(self):
        self.focus_marker_set = False

    def notify(self, contact="somebody", msg=""):
        """Notify the user of a new message if primitivus doesn't have the focus.

        @param contact (unicode): contact who wrote to the users
        @param msg (unicode): the message that has been received
        """
        # FIXME: not called anymore after refactoring
        if msg == "":
            return
        if self.mess_widgets.get_focus()[1] == len(self.mess_walker) - 2:
            # we don't change focus if user is not at the bottom
            # as that mean that he is probably watching discussion history
            self.mess_widgets.focus_position = len(self.mess_walker) - 1
        self.host.redraw()
        if not self.host.x_notify.hasFocus():
            if self.type == C.CHAT_ONE2ONE:
                self.host.x_notify.sendNotification(
                    _("Primitivus: %s is talking to you") % contact
                )
            elif self.nick is not None and self.nick.lower() in msg.lower():
                self.host.x_notify.sendNotification(
                    _("Primitivus: %(user)s mentioned you in room '%(room)s'")
                    % {"user": contact, "room": self.target}
                )

    # MENU EVENTS #
    def onTarotRequest(self, menu):
        # TODO: move this to plugin_misc_tarot with dynamic menu
        if len(self.occupants) != 4:
            self.host.showPopUp(
                sat_widgets.Alert(
                    _("Can't start game"),
                    _(
                        "You need to be exactly 4 peoples in the room to start a Tarot game"
                    ),
                    ok_cb=self.host.removePopUp,
                )
            )
        else:
            self.host.bridge.tarotGameCreate(
                self.target, list(self.occupants), self.profile
            )

    # MISC EVENTS #

    def onDelete(self):
        # FIXME: to be checked after refactoring
        super(Chat, self).onDelete()
        if self.type == C.CHAT_GROUP:
            self.host.removeListener("presence", self.presenceListener)

    def onChatState(self, from_jid, state, profile):
        super(Chat, self).onChatState(from_jid, state, profile)
        if self.type == C.CHAT_ONE2ONE:
            self.title_dynamic = C.CHAT_STATE_ICON[state]
            self.host.redraw()  # FIXME: should not be necessary

    def _onSubjectDialogCb(self, button, dialog):
        self.changeSubject(dialog.text)
        self.host.removePopUp(dialog)

    def onSubjectDialog(self, new_subject=None):
        dialog = sat_widgets.InputDialog(
            _("Change title"),
            _("Enter the new title"),
            default_txt=new_subject if new_subject is not None else self.subject,
        )
        dialog.setCallback("ok", self._onSubjectDialogCb, dialog)
        dialog.setCallback("cancel", lambda __: self.host.removePopUp(dialog))
        self.host.showPopUp(dialog)


quick_widgets.register(quick_chat.QuickChat, Chat)
quick_widgets.register(quick_games.Tarot, game_tarot.TarotGame)