view libervia/web/pages/chat/_browser/__init__.py @ 1620:3a60bf3762ef

browser: threads and replies implementation: rel 457
author Goffi <goffi@goffi.org>
date Tue, 06 May 2025 00:40:07 +0200
parents a2cd4222c702
children
line wrap: on
line source

import json
import re
from typing import Callable
import errors

from bridge import AsyncBridge as Bridge
from browser import DOMNode, aio, console as log, document, window
from cache import cache, identities, roster
import dialog
from file_uploader import FileUploader
import jid
from javascript import pyobj2jsobj
from js_modules import emoji_picker_element
from js_modules.tippy_js import tippy as tippy_ori
import popup
from template import Template, safe
from tools import is_touch_device, remove_ids
from loading import remove_loading_screen
from jid_search import JidSearch
from interpreter import Inspector
from components import init_collapsible_cards

# ``log`` is the browser console, which exposes ``warn``: alias it so that
# ``log.warning(...)`` can be used in this module.
log.warning = log.warn
# Profile name read from the page, or an empty string when the page doesn't set it.
profile = window.profile or ""
# JID used in the local chat (real JID for one2one, room JID otherwise)
own_local_jid = jid.JID(window.own_local_jid)
# JID of the entity (or room) this chat page is about.
target_jid = jid.JID(window.target_jid)
# Kind of chat; "group" is the value checked in this module for chat rooms.
chat_type = window.chat_type
# Base URL of the chat page, a chat is opened at "<chat_url>/<entity JID>".
chat_url = window.chat_url
bridge = Bridge()

# Sensible value to consider that user is at the bottom: maximum distance between
# the current scroll position and the end of the messages area.
SCROLL_SENSITIVITY = 200

# Values accepted by the ``LiberviaWebChat.input_mode`` setter.
INPUT_MODES = {"normal", "edit", "quote"}
# Pattern of the CSS class set on the message input for the current input mode.
MODE_CLASS = "mode_{}"
# CSS class added to the ".message-core" element of messages of the shown thread.
SELECTED_THREAD_CLS = "has-background-info-light"


# FIXME: workaround for https://github.com/brython-dev/brython/issues/2542
def tippy(target, data):
    """Call the original ``tippy`` with ``data`` converted to a JavaScript object.

    @param target: passed unchanged as first argument of the wrapped ``tippy``
    @param data: Python object, converted with ``pyobj2jsobj`` before the call
    @return: result of the wrapped ``tippy`` call
    """
    js_data = pyobj2jsobj(data)
    return tippy_ori(target, js_data)


class NewChatDialog:

    def __init__(self, on_select: Callable[[str], None]|None = None) -> None:
        self.on_select = on_select
        self.new_chat_dialog_tpl = Template("chat/new_chat_dialog.html")
        self.dialog_elt = self.new_chat_dialog_tpl.get_elt()
        self.modal = dialog.Modal(self.dialog_elt, is_card=True)

        # direct chat
        self.direct_search_input_elt = self.dialog_elt.select_one(
            "div.direct-content input.search-input"
        )
        self.direct_items_container = self.dialog_elt.select_one(".direct-items")
        self.direct_count_elt = self.dialog_elt.select_one(".direct-count")
        self.start_chat_btn = self.dialog_elt.select_one(".action_ok")
        assert self.start_chat_btn is not None
        self.start_chat_btn.bind("click", self.on_start_chat_btn)
        if not self.direct_count_elt or not self.start_chat_btn:
            log.error('"direct-count" or "action_ok" element is missing.')
        self.selected_entities = set()
        self.jid_search = JidSearch(
            self.direct_search_input_elt,
            self.direct_items_container,
            click_cb = self.on_search_item_click,
            template = "chat/search_item.html",
        )
        for elt in self.dialog_elt.select(".action_close"):
            elt.bind("click", lambda __: self.close())
        for elt in self.dialog_elt.select("div.direct-content .action_clear_search"):
            elt.bind("click", lambda __: self.clear_search_input())

        # groups
        self.groups_search_input_elt = self.dialog_elt.select_one(
            "div.groups-content input.search-input"
        )
        self.groups_items_container = self.dialog_elt.select_one(".groups-items")

        self.selected_entities = set()

        self.groups_jid_search = JidSearch(
            self.groups_search_input_elt,
            self.groups_items_container,
            click_cb = self.on_group_search_item_click,
            options={"type": "group"},
            template = "chat/groups_search_item.html",
        )
        for elt in self.dialog_elt.select(".action_close"):
            elt.bind("click", lambda __: self.close())
        for elt in self.dialog_elt.select("div.groups-content .action_clear_search"):
            elt.bind("click", lambda __: self.clear_groups_search_input())

        self.new_room_btn = self.dialog_elt.select_one(".action_new_room")
        assert self.new_room_btn is not None
        self.new_room_btn.bind("click", self.on_new_room_btn)
        self.panel_new_room = self.dialog_elt.select_one(".panel_new_room")
        assert self.panel_new_room is not None
        self.create_room_btn = self.dialog_elt.select_one(".action_create_room")
        assert self.create_room_btn is not None
        self.create_room_btn.bind("click", self._on_create_room_btn)

        self.error_message_elt = self.dialog_elt.select_one("div.error-message")
        assert self.error_message_elt is not None
        self.error_message_elt.select_one("button.delete").bind(
            "click",
            lambda __: self.error_message_elt.classList.add("is-hidden")
        )

        # tabs
        self.tabs = {}
        self.selected_tab_elt = None
        for tab_elt in self.dialog_elt.select('div.tabs>ul>li'):
            if tab_elt.classList.contains("is-active"):
                self.selected_tab_elt = tab_elt
            tab_name = tab_elt.dataset.tab
            tab_content_elt = self.dialog_elt.select_one(f".{tab_name}-content")
            assert tab_content_elt is not None
            self.tabs[tab_elt] = tab_content_elt
            tab_elt.bind(
                'click',
                lambda __, tab_elt=tab_elt: self.set_active_tab(tab_elt)
            )

    def set_active_tab(self, selected_tab_elt) -> None:
        """Display a tab."""
        self.selected_tab_elt = selected_tab_elt
        for tab_elt, tab_content_elt in self.tabs.items():
            if tab_elt == selected_tab_elt:
                tab_elt.classList.add("is-active")
                tab_content_elt.classList.remove("is-hidden")
            else:
                tab_elt.classList.remove("is-active")
                tab_content_elt.classList.add("is-hidden")
        self.update()

    def clear_search_input(self) -> None:
        """Clear search input, and update dialog."""
        self.direct_search_input_elt.value = ""
        self.direct_search_input_elt.dispatchEvent(window.Event.new("input"))
        self.update()

    def clear_groups_search_input(self) -> None:
        """Clear search input, and update dialog."""
        self.groups_search_input_elt.value = ""
        self.groups_search_input_elt.dispatchEvent(window.Event.new("input"))
        self.groups_update()

    def on_search_item_click(self, event, item) -> None:
        """A search item has been clicked"""
        search_item_elt = event.currentTarget
        search_item_elt.classList.toggle("is-selected")
        self.update()

    def on_group_search_item_click(self, event, item) -> None:
        """A search item has been clicked"""
        for item_elt in self.groups_items_container.select(".search-item"):
            if item_elt == event.currentTarget:
                item_elt.classList.add("is-selected")
            else:
                item_elt.classList.remove("is-selected")
        self.update()

    def update(self) -> None:
        """Update dialog elements (counter, button) when search items change."""
        assert self.selected_tab_elt is not None
        current_tab = self.selected_tab_elt.dataset.tab
        match current_tab:
            case "direct":
                self.selected_entities = {
                    item_elt.dataset.entity for item_elt in
                    self.direct_items_container.select(".search-item.is-selected")
                }
                self.direct_count_elt.text = str(len(self.selected_entities))
            case "groups":
                self.selected_entities = {
                    item_elt.dataset.entity for item_elt in
                    self.groups_items_container.select(".search-item.is-selected")
                }
            case _:
                raise ValueError(f"Unknown tab: {current_tab!r}.")

        self.start_chat_btn.disabled = not bool(self.selected_entities)

    def groups_update(self) -> None:
        """Update dialog elements when groups search items change."""
        self.selected_entities = {
            item_elt.dataset.entity for item_elt in
            self.direct_items_container.select(".search-item.is-selected")
        }
        self.start_chat_btn.disabled = not bool(self.selected_entities)

    def on_new_room_btn(self, evt) -> None:
        self.panel_new_room.classList.toggle("is-hidden")

    def _on_create_room_btn(self, evt) -> None:
        aio.run(self.on_create_room_btn())

    async def on_create_room_btn(self) -> None:
        assert self.on_select is not None
        input_elt = self.dialog_elt.select_one(".input-room-name")
        assert input_elt is not None
        try:
            joined_data = await bridge.muc_join(input_elt.value.strip(), "", {})
        except Exception as e:
            msg = f"Can't create room: {e}"
            log.error(msg)
            self.error_message_elt.select_one("p").text = msg
            self.error_message_elt.classList.remove("is-hidden")
            return

        joined, room_jid_s, occupants, user_nick, subject, statuses, profile = joined_data
        self.on_select(room_jid_s)

    def on_start_chat_btn(self, evt) -> None:
        evt.stopPropagation()
        if self.on_select is None:
            return
        if not self.selected_entities:
            raise errors.InternalError(
                "Start button should never be called when no entity is selected."
            )
        if len(self.selected_entities) == 1:
            selected_entity = next(iter(self.selected_entities))
            self.on_select(selected_entity)
        else:
            aio.run(self.create_room_selected_jids())

    async def create_room_selected_jids(self) -> None:
        assert self.on_select is not None
        joined_data = await bridge.muc_join("", "", {})
        joined, room_jid_s, occupants, user_nick, subject, statuses, profile = joined_data
        if not self.selected_entities:
            Inspector()
        for entity_jid in self.selected_entities:
            print(f"inviting {entity_jid=}")
            await bridge.muc_invite(entity_jid, room_jid_s, {})
        self.on_select(room_jid_s)


    def show(self) -> None:
        """Show the dialog."""
        # We want ot be sure to have the elements correctly set when dialog is shown
        self.update()
        self.modal.show()

    def close(self) -> None:
        """Close the dialog."""
        self.modal.close()


class LiberviaWebChat:
    def __init__(self):
        """Load templates, bind panels and set the initial state of the chat page.

        Elements are retrieved from the current document by their IDs, which must
        exist, and the messages area is scrolled to its bottom.
        """
        # Current input mode, use ``input_mode`` property to modify.
        self._input_mode = "normal"
        # Data associated to the current input mode (e.g. "id" of the edited message),
        # cleared each time the input mode changes.
        self.input_data = {}
        self.direct_messages_tpl = Template("chat/direct_messages.html")
        self.message_tpl = Template("chat/message.html")
        self.extra_menu_tpl = Template("chat/extra_menu.html")
        self.reactions_tpl = Template("chat/reactions.html")
        self.reactions_details_tpl = Template("chat/reactions_details.html")
        self.url_preview_control_tpl = Template("components/url_preview_control.html")
        self.url_preview_tpl = Template("components/url_preview.html")
        # Single marker element, added to the messages area when a message arrives
        # while the page is hidden.
        self.new_messages_marker_elt = Template("chat/new_messages_marker.html").get_elt()
        self.editions_tpl = Template("chat/editions.html")
        self.occupant_item_tpl = Template("chat/occupant_item.html")

        # panels and their toggle buttons

        self.left_panel = document["left_panel"]
        self.left_toggle = document["left_panel-toggle"]
        self.left_toggle.bind("click", self.on_left_panel_toggle_click)
        self.main_panel = document["main_panel"]
        self.right_panel = document["right_panel"]
        self.right_toggle = document["right_panel-toggle"]
        self.right_toggle.bind("click", self.on_right_panel_toggle_click)

        self.messages_elt = document["messages"]

        # right-panel internal buttons
        init_collapsible_cards(self.right_panel)

        # attachments
        # NOTE(review): the meaning of the empty first argument depends on
        #   ``FileUploader``, to be confirmed.
        self.file_uploader = FileUploader(
            "", "chat/attachment_preview.html", on_delete_cb=self.on_attachment_delete
        )
        self.attachments_elt = document["attachments"]
        self.message_input = document["message_input_area"]

        # reply to/thread
        self.thread_panel_tpl = Template("chat/thread_panel.html")
        # current thread panel, if any.
        self.thread_panel = None
        # Data of the message being replied to, use ``reply_to`` property to modify.
        self._reply_to = None
        # ID of the thread of the current thread panel, sent with new messages.
        self.thread_id = None
        # ``setattr`` is used because an assignment can't be done in a lambda; it goes
        # through the ``reply_to`` property setter.
        document["cancel_reply_btn"].bind(
            "click",
            lambda __: setattr(self, "reply_to", None)
        )
        # use `show_thread` property to modify.
        self._show_thread = None

        # close_button = document.select_one(".modal-close")
        # close_button.bind("click", self.close_modal)

        # hide/show attachments
        # The attachments area visibility is updated each time a child is added to or
        # removed from it.
        MutationObserver = window.MutationObserver
        observer = MutationObserver.new(lambda *__: self.update_attachments_visibility())
        observer.observe(self.attachments_elt, {"childList": True})

        # we want the message scroll to be initially at the bottom
        self.messages_elt.scrollTop = self.messages_elt.scrollHeight

        # listeners/dynamic updates
        self.add_message_event_listeners()
        self.handle_url_previews()
        self.add_reactions_listeners()

        # input
        self.auto_resize_message_input()
        self.message_input.focus()

        # direct messages
        direct_messages_elt = self.direct_messages_tpl.get_elt(
            {
                "roster": roster,
                "identities": identities,
                "chat_url": chat_url
            }
        )
        # ``<=`` appends the element to the "direct-messages" element.
        document["direct-messages"] <= direct_messages_elt

    async def post_init(self) -> None:
        """Fill the occupants counter and list when the chat is a group chat.

        Nothing is done for the other chat types.
        """
        if chat_type != "group":
            return

        room_occupants = await bridge.muc_occupants_get(str(target_jid))
        document["occupants-count"].text = str(len(room_occupants))
        for nick, nick_data in room_occupants.items():
            template_data = {
                "nick": nick,
                "item": nick_data,
                "identities": identities,
            }
            item_elt = self.occupant_item_tpl.get_elt(template_data)
            document["group-occupants"] <= item_elt

    @property
    def input_mode(self) -> str:
        return self._input_mode

    @input_mode.setter
    def input_mode(self, new_mode: str) -> None:
        if new_mode == self.input_mode:
            return
        if new_mode not in INPUT_MODES:
            raise ValueError(f"Invalid input mode: {new_mode!r}")
        target_elt = self.message_input
        target_elt.classList.remove(MODE_CLASS.format(self._input_mode))
        self._input_mode = new_mode
        target_elt.classList.add(MODE_CLASS.format(new_mode))
        self.input_data.clear()

    @property
    def is_at_bottom(self):
        """Tell if the messages area is scrolled to its bottom, or close to it.

        The scroll is considered at the bottom when the remaining distance is at most
        ``SCROLL_SENSITIVITY``.
        """
        messages_elt = self.messages_elt
        remaining = (
            messages_elt.scrollHeight - messages_elt.scrollTop - messages_elt.clientHeight
        )
        return remaining <= SCROLL_SENSITIVITY

    @property
    def reply_to(self) -> dict|None:
        """Data of the message being replied to, None if there is none."""
        return self._reply_to

    @reply_to.setter
    def reply_to(self, reply_to: dict|None) -> None:
        """Set the message being replied to, and update the reply preview.

        @param reply_to: data of the parent message, its "id" must be the ID of a
            message element of the document. Use None to cancel the reply and hide the
            preview.
        """
        if self._reply_to == reply_to:
            return
        self._reply_to = reply_to

        preview_container_elt = document["reply-to_message"]
        preview_container_elt.clear()
        reply_classes = document["reply-to"].classList
        if reply_to is None:
            reply_classes.add("is-hidden")
            return

        reply_classes.remove("is-hidden")
        # The parent message is cloned without its IDs (to avoid duplicates in the
        # document) and without its action buttons.
        preview_elt = document[reply_to["id"]].cloneNode(True)
        remove_ids(preview_elt)
        actions_elt = preview_elt.select_one(".message-actions")
        if actions_elt is not None:
            actions_elt.remove()
        preview_container_elt <= preview_elt

    @property
    def show_thread(self) -> str|None:
        """ID of the highlighted thread, None if no thread is highlighted."""
        return self._show_thread

    @show_thread.setter
    def show_thread(self, thread_id: str|None) -> None:
        """Highlight the messages of a thread.

        @param thread_id: ID of the thread to highlight, or None to clear the view.
        """
        previous_thread_id = self._show_thread
        if previous_thread_id == thread_id:
            return

        if previous_thread_id is not None:
            # A thread was highlighted, the view is cleaned first.
            for core_elt in self.messages_elt.select(".message-core"):
                core_elt.classList.remove(SELECTED_THREAD_CLS)

        self._show_thread = thread_id
        if thread_id is None:
            return

        for chat_message_elt in self.messages_elt.select(".chat-message"):
            try:
                current_thread_id = chat_message_elt.dataset["thread"]
            except KeyError:
                # This message is not part of any thread.
                continue
            if current_thread_id != thread_id:
                continue
            core_elt = chat_message_elt.select_one(".message-core")
            if core_elt:
                core_elt.classList.add(SELECTED_THREAD_CLS)
            else:
                log.debug(
                    f"Not message core found for message {chat_message_elt['id']!r}."
                )

    def open_chat(self, entity_jid: str) -> None:
        """Replace the current chat by the chat with the given entity.

        @param entity_jid: JID of the entity or room to chat with
        """
        # For now we keep it simple and just load the new location.
        new_location = f"{chat_url}/{entity_jid}"
        window.location = new_location

    async def on_new_chat(self) -> None:
        """Show the dialog to start a new chat; the selected chat is then opened."""
        NewChatDialog(on_select=self.open_chat).show()

    async def send_message(self):
        """Send message currently in input area

        The message and corresponding attachments will be sent. If the input mode is
        "edit", the edited message is modified instead of sending a new one. Nothing
        is sent when there is neither text nor attachment.
        In all cases, the input mode is set back to "normal".
        """
        message = self.message_input.value.rstrip()
        log.info(f"{message=}")

        # Data of each attachment are stored as JSON in its "data-file" attribute.
        attachments = [
            json.loads(attachment_elt.getAttribute("data-file"))
            for attachment_elt in self.attachments_elt.children
        ]

        if not message and not attachments:
            self.input_mode = "normal"
            return

        # Only the values which are set are put in extra data.
        optional_extra = (
            ("attachments", attachments),
            ("reply", self.reply_to),
            ("thread", self.thread_id),
        )
        extra = {key: value for key, value in optional_extra if value}

        # now we send the message
        try:
            if self.input_mode == "edit":
                message_id = self.input_data["id"]
                edit_data_s = json.dumps(
                    {"message": {"": message}, "extra": extra}, ensure_ascii=False
                )
                await bridge.message_edit(message_id, edit_data_s)
            else:
                await bridge.message_send(
                    str(target_jid),
                    {"": message},
                    {},
                    "auto",
                    json.dumps(extra, ensure_ascii=False),
                )
        except Exception as e:
            dialog.notification.show(f"Can't send message: {e}", "error")
        else:
            self.message_input.value = ""
            # We must not reset self.thread_id here, it is when the thread panel is
            #   closed.
            # FIXME: Another mechanism must be used if the input panel is cloned at
            #   some point, with a slide-in panel for thread, as thread_id would then
            #   be used for the thread panel's message input, but not if the main area
            #   message input is used.
            self.reply_to = None
            self.attachments_elt.clear()
            self.auto_resize_message_input()

        self.input_mode = "normal"

    def _on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid_s: str,
        to_jid_s: str,
        message: dict,
        subject: dict,
        mess_type: str,
        extra_s: str,
        profile: str,
    ) -> None:
        """Handle a new message when it is part of the current chat.

        The message is ignored if neither its sender nor its recipient is the target
        of this chat, otherwise ``on_message_new`` is run with parsed JIDs and extra
        data.
        @param extra_s: JSON serialised extra data of the message
        """
        sender_jid = jid.JID(from_jid_s)
        recipient_jid = jid.JID(to_jid_s)
        is_for_this_chat = (
            sender_jid.bare == window.target_jid
            or recipient_jid.bare == window.target_jid
        )
        if not is_for_this_chat:
            return

        new_message_coro = self.on_message_new(
            uid,
            timestamp,
            sender_jid,
            recipient_jid,
            message,
            subject,
            mess_type,
            json.loads(extra_s),
            profile,
        )
        aio.run(new_message_coro)

    def _on_message_update(
        self, uid: str, type_: str, update_data_s: str, profile: str
    ) -> None:
        """Run ``on_message_update`` coroutine with the same arguments."""
        update_coro = self.on_message_update(uid, type_, update_data_s, profile)
        aio.run(update_coro)

    async def on_message_update(
        self, uid: str, type_: str, update_data_s: str, profile: str
    ) -> None:
        update_data = json.loads(update_data_s)
        is_at_bottom = self.is_at_bottom
        if type_ == "REACTION":
            reactions = update_data["reactions"]
            log.debug(f"new reactions: {reactions}")
            try:
                reactions_wrapper_elt = document[f"msg_reactions_{uid}"]
            except KeyError:
                log.debug(f"Message {uid} not found, no reactions to update")
            else:
                log.debug(f"Message {uid} found, new reactions: {reactions}")
                reactions_elt = self.reactions_tpl.get_elt(
                    {
                        "chat_type": chat_type,
                        "own_local_jid": str(own_local_jid),
                        "reactions": reactions
                    })
                reactions_wrapper_elt.clear()
                reactions_wrapper_elt <= reactions_elt
                self.add_reactions_listeners(reactions_elt)
        elif type_ in ("EDIT", "RETRACT"):
            try:
                old_message_elt = document[uid]
            except KeyError:
                log.debug(f"Message {uid} not found, no {type_.lower()}ion to apply")
            else:
                template_data = await self.message_to_template_data(
                    uid,
                    update_data["timestamp"],
                    jid.JID(update_data["from"]),
                    jid.JID(update_data["to"]),
                    update_data["message"],
                    update_data["subject"],
                    update_data["type"],
                    update_data["extra"]
                )

                new_message_elt = self.message_tpl.get_elt(
                    template_data
                )
                old_message_elt.replaceWith(new_message_elt)
                self.add_message_event_listeners(new_message_elt)
                self.handle_url_previews(new_message_elt)
        else:
            log.warning(f"Unsupported update type: {type_!r}")

        # If user was at the bottom, keep the scroll at the bottom
        if is_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    async def message_to_template_data(
        self,
        uid: str,
        timestamp: float,
        from_jid: jid.JID,
        to_jid: jid.JID,
        message_data: dict,
        subject_data: dict,
        mess_type: str,
        extra: dict,
    ) -> dict:
        """Generate template data to use with [message_tpl]

        The identity of the sender is loaded in cache if necessary, and a "url" is set
        (in place, in ``extra``) on attachments which only have it in their sources.

        @param uid: ID of the message
        @param timestamp: timestamp of the message, ``extra["updated"]`` is used
            instead when it is set
        @param from_jid: sender of the message. The full JID is used for group chats,
            the bare JID otherwise
        @param to_jid: recipient of the message (currently unused)
        @param message_data: message bodies by language, the one with the empty
            language is preferred
        @param subject_data: subjects by language, the one with the empty language is
            preferred
        @param mess_type: type of the message
        @param extra: extra data of the message
        @return: template data
        """
        xhtml_data = extra.get("xhtml")
        if not xhtml_data:
            xhtml = None
        else:
            try:
                xhtml = xhtml_data[""]
            except KeyError:
                # No default language: the first available one is used.
                xhtml = next(iter(xhtml_data.values()))

        if chat_type == "group":
            await cache.fill_identities([str(from_jid)])
        else:
            await cache.fill_identities([str(jid.JID(from_jid).bare)])
            from_jid = from_jid.bare
        attachments = extra.get("attachments", [])
        for attachment in attachments:
            if "url" not in attachment:
                try:
                    attachment["url"] = next(
                        s['url'] for s in attachment["sources"] if 'url' in s
                    )
                except (StopIteration, KeyError):
                    log.warning(
                        f"An attachment has no URL: {attachment}"
                    )
        msg_data = {
            "id": uid,
            "timestamp": extra.get("updated", timestamp),
            "type": mess_type,
            "from_": str(from_jid),
            "text": message_data.get("") or next(iter(message_data.values()), ""),
            "subject": subject_data.get("") or next(iter(subject_data.values()), ""),
            # NOTE(review): "reeceived" looks like a typo, but the key is kept as is
            #   because the name expected by the templates is not known here.
            "reeceived": extra.get("received_timestamp") or timestamp,
            "encrypted": extra.get("encrypted", False),
            "received": extra.get("received", False),
            "attachments": attachments,
            "extra": extra
        }
        # Optional values are only set when they are available.
        for key in ("thread", "thread_parent", "delay_sender", "info_type"):
            value = extra.get(key)
            if value is not None:
                msg_data[key] = value

        if xhtml:
            # XHTML content is marked as safe to be rendered without escaping.
            msg_data["html"] = safe(xhtml)

        return {
            "own_local_jid": str(own_local_jid),
            "chat_type": chat_type,
            "msg": msg_data,
            "identities": identities,
        }

    async def on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid: jid.JID,
        to_jid: jid.JID,
        message_data: dict,
        subject_data: dict,
        mess_type: str,
        extra: dict,
        profile: str,
    ) -> None:
        """Render a new message and add it to the messages area.

        The message is also added to the thread panel if it belongs to the thread
        currently shown there. The scroll is kept at the bottom if it was there before
        the message was added.
        """
        # FIXME: visibilityState doesn't detect OS events such as `Alt + Tab`, using focus
        #   event may help to get those use cases, but it gives false positives.
        page_is_hidden = document.visibilityState == "hidden"
        if page_is_hidden and self.new_messages_marker_elt.parent is None:
            # the page is not visible, and we have no new messages marker yet, so we add
            # it
            self.messages_elt <= self.new_messages_marker_elt

        template_data = await self.message_to_template_data(
            uid, timestamp, from_jid, to_jid, message_data, subject_data, mess_type, extra
        )
        message_elt = self.message_tpl.get_elt(template_data)

        if "reply" in extra:
            parent_id = extra["reply"]["id"]
            try:
                parent_elt = document[parent_id]
            except KeyError:
                log.info(
                    "Parent message of reply not found in current history: "
                    f"{parent_id!r}"
                )
            else:
                if "thread" not in parent_elt.dataset:
                    # Without explicit thread, the parent ID is used as thread ID.
                    parent_elt.dataset["thread"] = extra.get("thread", parent_id)
                    # TODO: Regenerate parent message, so thread icon will appear.

        is_in_shown_thread = (
            "thread" in extra
            and self.thread_panel
            and self.thread_id
            and extra["thread"] == self.thread_id
        )
        if is_in_shown_thread:
            # FIXME: This is quite fragile, IDs are removed, meaning that some listener
            #   may not be working correctly, and it's using #input-panel as reference,
            #   but in the future, it may not moved from main area the thread panel like
            #   that anymore.
            thread_message_elt = message_elt.cloneNode(True)
            remove_ids(thread_message_elt)
            thread_messages_elt = document["thread-messages"]
            thread_messages_elt.insertBefore(thread_message_elt, document["input-panel"])

        # Check if user is viewing older messages or is at the bottom
        was_at_bottom = self.is_at_bottom

        self.messages_elt <= message_elt
        self.add_message_event_listeners(message_elt)
        # we add preview in parallel on purpose, as they can be slow to get
        self.handle_url_previews(message_elt)

        # If user was at the bottom, keep the scroll at the bottom
        if was_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def auto_resize_message_input(self):
        """Adapt the height of the message input field to its content."""
        was_at_bottom = self.is_at_bottom
        input_style = self.message_input.style

        # The height is reset to 'auto' first, so the previous content has no influence
        # on the measure.
        input_style.height = "auto"

        # The new height is the height of the content (scrollHeight) plus the vertical
        # border, so the textarea grows when lines are added.
        input_style.height = f"{self.message_input.scrollHeight + 2}px"

        if was_at_bottom:
            # the messages area must still display the last message
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def on_left_panel_toggle_click(self, evt) -> None:
        """Show/Hide side bar."""
        self.left_panel.classList.toggle("is-collapsed")
        self.main_panel.classList.toggle("is-expanded-left")

    def on_right_panel_toggle_click(self, evt) -> None:
        """Show/Hide side bar."""
        self.right_panel.classList.toggle("is-collapsed")
        self.main_panel.classList.toggle("is-expanded-right")

    def on_message_keydown(self, evt):
        """Handle the 'keydown' event of the message input field

        <Enter> sends the message on non touch devices (unless <Shift> is pressed),
        <ESC> clears the input and resets the input mode, and <Up> starts the edition
        of the last own message when the input is empty.

        @param evt: The event object. 'target' refers to the textarea element.
        """
        key_code = evt.keyCode

        if key_code == 13:  # <Enter> key
            # On touch devices, <Enter> always inserts a line break.
            if not window.navigator.maxTouchPoints and not evt.shiftKey:
                evt.preventDefault()  # Prevents line break
                aio.run(self.send_message())
            return

        if key_code == 27:  # <ESC> key
            evt.preventDefault()
            self.message_input.value = ''
            self.input_mode = 'normal'
            self.auto_resize_message_input()
            return

        if key_code == 38:  # <Up> arrow key
            if self.input_mode != "normal" or self.message_input.value.strip() != "":
                return
            evt.preventDefault()
            own_msgs = document.getElementsByClassName('own_msg')
            if own_msgs.length > 0:
                aio.run(self.on_action_edit(None, own_msgs[own_msgs.length - 1]))

    def update_attachments_visibility(self):
        if len(self.attachments_elt.children):
            self.attachments_elt.classList.remove("is-contracted")
        else:
            self.attachments_elt.classList.add("is-contracted")

    def on_file_selected(self, evt):
        """Upload the selected files, then give the focus back to the message input.

        @param evt: event whose ``currentTarget`` holds the selected ``files``
        """
        log.info("file selected")
        self.file_uploader.upload_files(evt.currentTarget.files, self.attachments_elt)
        self.message_input.focus()

    def on_attachment_delete(self, evt):
        """Remove the attachment preview containing the element which got the event."""
        evt.stopPropagation()
        preview_elt = evt.currentTarget.closest(".attachment-preview")
        DOMNode(preview_elt).remove()

    def on_attach_btn_click(self, evt):
        """Forward the click to the "file-input" element."""
        file_input_elt = document["file-input"]
        file_input_elt.click()

    def on_reply_btn_click(self, evt) -> None:
        """Set the message containing the clicked button as the message to reply to."""
        message_elt = evt.currentTarget.closest("div.chat-message")
        reply_data = {"id": message_elt["id"], "to": message_elt.dataset["from"]}
        self.reply_to = reply_data
        log.debug(f'"reply_to" set to {self.reply_to!r}')


    def on_extra_btn_click(self, evt):
        """Show the popup menu with the extra actions of a message.

        "retract" is proposed for own messages, and "edit" only for the last own
        message.
        """
        clicked_elt = evt.currentTarget
        message_elt = clicked_elt.closest("div.chat-message")
        message_core_elt = clicked_elt.closest("div.message-core")
        is_own = message_elt.classList.contains("own_msg")
        can_edit = False
        if is_own:
            own_messages = document.select('.own_msg')
            # with XMPP, we can currently only edit our last message
            can_edit = own_messages and message_elt is own_messages[-1]

        menu_elt = self.extra_menu_tpl.get_elt({"edit": can_edit, "retract": is_own})
        extra_popup = popup.create_popup(evt.target, menu_elt, focus_elt=message_core_elt)

        def run_action(click_evt, action):
            """Hide the popup, then run the action on the message."""
            extra_popup.hide()
            aio.run(action(click_evt, message_elt))

        actions = {
            "action_quote": self.on_action_quote,
            "action_edit": self.on_action_edit,
            "action_retract": self.on_action_retract,
        }
        for cls_name, action in actions.items():
            for action_elt in menu_elt.select(f".{cls_name}"):
                # ``action`` is bound as default value to avoid late binding.
                action_elt.bind(
                    "click",
                    lambda click_evt, action=action: run_action(click_evt, action)
                )

    def on_reaction_click(self, evt, message_elt):
        """Toggle the selected emoji as a reaction on a message

        @param evt: "emoji-click" event, the emoji is read from
            ``evt.detail["unicode"]``
        @param message_elt: message element, its "id" is used as ID of the message to
            react to
        """
        aio.run(
            bridge.message_reactions_set(
                message_elt["id"], [evt.detail["unicode"]], "toggle"
            )
        )

    async def on_show_thread(self, thread_id: str) -> None:
        """Show the messages of a thread in a modal

        The messages are retrieved from history with a "thread_id" filter and rendered
        with the message template. ``self.thread_id`` and ``self.thread_panel`` are set
        here, and reset to None by the ``close_cb`` given to the modal.
        @param thread_id: ID of the thread to show, must not be empty
        """
        assert thread_id
        self.thread_id = thread_id

        panel_elt = self.thread_panel_tpl.get_elt({"messages": []})
        self.thread_panel = panel_elt
        messages_elt = panel_elt.select_one("#thread-messages")
        assert messages_elt is not None

        thread_history = await bridge.history_get(
            "", "", -2, True, {"thread_id": thread_id}
        )
        for (
            uid, timestamp, from_jid, to_jid, message, subject, mess_type, extra
        ) in thread_history:
            template_data = await self.message_to_template_data(
                uid,
                timestamp=timestamp,
                from_jid=from_jid,
                to_jid=to_jid,
                message_data=message,
                subject_data=subject,
                mess_type=mess_type,
                extra=json.loads(extra)
            )
            messages_elt <= self.message_tpl.get_elt(template_data)

        # FIXME: The whole input-panel is currently moved to the thread panel so listeners
        #   don't have to be moved. At some point, it may be better to make a clone
        #   (without the IDs) and to clone listeners too, this way a slide-in panel could
        #   be used instead of a modal.
        input_panel_elt = document["input-panel"]
        messages_elt <= input_panel_elt

        def restore_main_chat():
            # put the input panel back at its original place and leave thread mode
            document["input-panel-area"].appendChild(input_panel_elt)
            self.thread_id = None
            self.thread_panel = None

        dialog.Modal(
            panel_elt,
            closable=True,
            close_cb=restore_main_chat
        ).show()


    async def get_message_tuple(self, message_elt) -> tuple|None:
        """Retrieve the message tuple of a message, as sent by [message_new]

        The message is looked up in history using the ID of the element.
        If no corresponding message data is found, an error notification is shown, and
        None is returned.
        @param message_elt: message element, its "id" attribute will be used to retrieve
            message data
        @return: message data as a tuple, or None if no message with this ID is found.
        """
        message_id = message_elt["id"]
        history_data = await bridge.history_get(
            "", "", -2, True, {"id": message_id}
        )
        if not history_data:
            dialog.notification.show(f"Can't find message {message_id}", "error")
            return None
        return history_data[0]

    async def on_action_quote(self, __, message_elt) -> None:
        """Prepend a quote of a message to the message input and switch to "quote" mode

        The first body found in the message data is quoted, each line being prefixed
        with "> ". Nothing is done if the message data can't be retrieved.
        @param __: unused (event of the triggering action)
        @param message_elt: element of the message to quote
        """
        message_data = await self.get_message_tuple(message_elt)
        if message_data is None:
            return

        # index 4 of the message tuple holds the bodies
        bodies = message_data[4]
        body = next(iter(bodies.values()), "")
        quoted_lines = ["> " + line for line in body.split("\n")]
        quote = "\n".join(quoted_lines)
        input_elt = self.message_input
        input_elt.value = f"{quote}\n{input_elt.value}"
        # the mode is set before the input data, as in the other input actions
        self.input_mode = "quote"
        self.input_data["id"] = message_elt["id"]
        self.auto_resize_message_input()
        self.message_input.focus()

    async def on_action_edit(self, __, message_elt) -> None:
        """Load the body of a message in the message input and switch to "edit" mode

        The first body found in the message data is used. Nothing is done if the
        message data can't be retrieved; a notification is shown if the body is empty.
        @param __: unused (event of the triggering action)
        @param message_elt: element of the message to edit
        """
        message_data = await self.get_message_tuple(message_elt)
        if message_data is None:
            return

        # index 4 of the message tuple holds the bodies
        body = next(iter(message_data[4].values()), "")
        if body:
            self.message_input.value = body
            # the mode is set before the input data, as in the other input actions
            self.input_mode = "edit"
            self.input_data["id"] = message_elt["id"]
            self.auto_resize_message_input()
            self.message_input.focus()
        else:
            dialog.notification.show("No content found in message, nothing to edit")

    async def on_action_retract(self, __, message_elt) -> None:
        """Retract a message after confirmation from the user

        @param __: unused (event of the triggering action)
        @param message_elt: element of the message to retract, its "id" is used as
            message ID
        """
        confirm_message = safe(
            "This message will be permanently removed. Are you sure?<br><br>"
            "WARNING: It is impossible to guarantee that other participants in the "
            "discussion will delete this message as well. You must assume it has been "
            "seen. If a password or other sensitive information has been accidentally "
            "shared, please ensure to take appropriate measures to change it and "
            "mitigate the risks."
        )
        if await dialog.Confirm(confirm_message).ashow():
            await bridge.message_retract(message_elt["id"])
            return
        log.info(f"Retraction of message {message_elt['id']} cancelled by user.")

    def get_reaction_panel(self, source_elt):
        """Create an emoji picker which reacts to the message containing ``source_elt``

        @param source_elt: element located inside a "div.chat-message" element
        @return: the new "emoji-picker" element
        """
        picker_elt = document.createElement("emoji-picker")
        target_message_elt = source_elt.closest("div.chat-message")

        def on_emoji_click(evt):
            return self.on_reaction_click(evt, target_message_elt)

        picker_elt.bind("emoji-click", on_emoji_click)
        return picker_elt

    def add_message_event_listeners(self, parent_elt=None):
        """Prepare messages to be dynamic

        For every matching element found under ``parent_elt``:

        - make image attachments open the image modal on click
        - make the reply button clickable
        - attach the emoji picker popup to the reaction button
        - make the extra button clickable
        - show the editions of a message in a tooltip on the editions icon
        - make the thread icon set ``self.show_thread`` while hovered, and open the
          thread on click

        @param parent_elt: element under which elements are looked for, the whole
            document is used if None
        """
        if parent_elt is None:
            parent_elt = document

        ## attachments
        # FIXME: only handle images for now, and display them in a modal
        for img_elt in parent_elt.select(".message-attachment img"):
            img_elt.bind("click", self.open_modal)
            img_elt.style.cursor = "pointer"

        ## reply button
        for reply_btn in parent_elt.select(".reply-button"):
            reply_btn.bind("click", self.on_reply_btn_click)

        ## reaction button
        for reaction_btn in parent_elt.select(".reaction-button"):
            # the popup focus class is toggled on the core of the message
            message_core_elt = reaction_btn.closest("div.message-core")
            tippy(
                reaction_btn,
                {
                    "trigger": "click",
                    "content": self.get_reaction_panel,
                    "appendTo": document.body,
                    "placement": "bottom",
                    "interactive": True,
                    "theme": "light",
                    # the element is bound as default value to avoid late binding
                    "onShow": lambda __, message_core_elt=message_core_elt: (
                        message_core_elt.classList.add("has-popup-focus")
                    ),
                    "onHide": lambda __, message_core_elt=message_core_elt: (
                        message_core_elt.classList.remove("has-popup-focus")
                    ),
                }
            )

        ## extra button
        for extra_btn in parent_elt.select(".extra-button"):
            extra_btn.bind("click", self.on_extra_btn_click)

        ## editions
        for edition_icon_elt in parent_elt.select(".message-editions"):
            message_elt = edition_icon_elt.closest("div.chat-message")
            dataset = message_elt.dataset.to_dict()
            try:
                editions = json.loads(dataset["editions"])
            except (ValueError, KeyError):
                log.error(
                    f"Internal Error: invalid or missing editions data: {message_elt['id']}"
                )
            else:
                for edition in editions:
                    # use the body with the empty key if any, the first body otherwise
                    edition["text"] = (
                        edition["message"].get("")
                        or next(iter(edition["message"].values()), "")
                    )
                editions_elt = self.editions_tpl.get_elt({"editions": editions})
                tippy(
                    edition_icon_elt,
                    {
                        "content": editions_elt,
                        "theme": "light",
                        "appendTo": document.body
                    }
                )

        ## thread
        for thread_icon_elt in parent_elt.select(".message-thread"):
            message_elt = thread_icon_elt.closest("div.chat-message")
            thread_id = message_elt.dataset["thread"]
            thread_icon_elt.bind(
                "mouseenter",
                lambda __, thread_id=thread_id: setattr(self, "show_thread", thread_id)
            )
            thread_icon_elt.bind(
                "mouseleave",
                lambda __: setattr(self, "show_thread", None)
            )
            thread_icon_elt.bind(
                "click",
                lambda __, thread_id=thread_id: aio.run(self.on_show_thread(thread_id))
            )

    def add_reactions_listeners(self, parent_elt=None) -> None:
        """Add the details tooltip and the toggle listener to reactions

        @param parent_elt: element under which ".reaction" elements are looked for, the
            whole document is used if None
        """
        root_elt = document if parent_elt is None else parent_elt
        touch_device = is_touch_device()

        def make_toggle_handler(msg_id, emoji):
            """Build the click handler toggling ``emoji`` on message ``msg_id``"""
            def toggle_reaction(event):
                # Prevent default if it's a touch device to distinguish from long press
                if touch_device:
                    event.preventDefault()
                aio.run(bridge.message_reactions_set(msg_id, [emoji], "toggle"))

            return toggle_reaction

        for reaction_elt in root_elt.select(".reaction"):
            # Tooltip listing the entities which have reacted
            jids_json = reaction_elt.dataset.to_dict().get("jids", "[]")
            details_elt = self.reactions_details_tpl.get_elt({
                "reacting_jids": sorted(json.loads(jids_json)),
                "identities": identities,
            })

            # The tooltip is triggered differently according to the device type
            if touch_device:
                touch, trigger, delay = ["hold", 500], "click", [0, 800]
            else:
                touch, trigger, delay = True, "mouseenter focus", 0
            tippy(
                reaction_elt,
                {
                    "content": details_elt,
                    "placement": "bottom",
                    "theme": "light",
                    "touch": touch,
                    "trigger": trigger,
                    "delay": delay,
                },
            )

            # Toggle reaction when clicked/touched
            emoji = reaction_elt.select_one(".emoji").html.strip()
            msg_id = reaction_elt.closest("div.chat-message")["id"]
            reaction_elt.bind("click", make_toggle_handler(msg_id, emoji))

    def find_links(self, message_elt):
        """Collect the http and https links found in the body of a message

        Links are taken from the text of the ".msg_body" element first, then from the
        "href" attribute of its <a> elements. Duplicates are removed, the order of first
        appearance is kept.
        @param message_elt: message element to inspect
        @return: list of unique URLs, or None if there is no ".msg_body" element
        """
        body_elt = message_elt.select_one(".msg_body")
        if not body_elt:
            return None

        # links written in the text content
        url_pattern = (
            r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F])|#)+"
        )
        found_urls = re.findall(url_pattern, body_elt.text)

        # links targeted by <a> elements
        hrefs = [a_elt.attrs.get("href", "") for a_elt in body_elt.select("a")]
        found_urls.extend(
            href for href in hrefs if href.startswith(("http://", "https://"))
        )

        # a dict keeps insertion order, duplicates are removed without reordering
        return list(dict.fromkeys(found_urls))

    async def add_url_previews(self, url_previews_elt, urls) -> None:
        """Replace the content of a ".url-previews" element with previews of URLs

        A URL whose preview can't be retrieved, or is empty, is skipped with a warning.
        @param url_previews_elt: element to fill, it is cleared first
        @param urls: URLs to get a preview for
        """
        url_previews_elt.clear()
        for url in urls:
            try:
                preview_json = await bridge.url_preview_get(url, "")
            except Exception as e:
                log.warning(f"Couldn't get URL preview for {url}: {e}")
                continue

            if preview_json:
                preview_elt = self.url_preview_tpl.get_elt(
                    {"url_preview": json.loads(preview_json)}
                )
                url_previews_elt <= preview_elt
            else:
                log.warning(f"No preview could be found for URL: {url}")

    def handle_url_previews(self, parent_elt=None):
        """Show a preview control for messages containing URLs

        For each message with at least one link, the ".url-previews" element is made
        visible and a control is added to it; previews are fetched when the
        ".action_fetch_preview" element of the control is clicked.
        NOTE: no setting is checked here, the control is always the element shown.
        @param parent_elt: message element to handle, all ".chat-message" elements of
            the document are handled if None
        """
        if parent_elt is None:
            message_elts = document.select(".chat-message")
        else:
            message_elts = [parent_elt]

        for message_elt in message_elts:
            urls = self.find_links(message_elt)
            if not urls:
                continue

            previews_elt = message_elt.select_one(".url-previews")
            previews_elt.classList.remove("is-hidden")
            control_elt = self.url_preview_control_tpl.get_elt()

            # element and URLs are bound as default values to avoid late binding
            def on_fetch_click(__, previews_elt=previews_elt, preview_urls=urls):
                return aio.run(self.add_url_previews(previews_elt, preview_urls))

            fetch_btn = control_elt.select_one(".action_fetch_preview")
            fetch_btn.bind("click", on_fetch_click)
            previews_elt <= control_elt

    def open_modal(self, evt):
        """Show the clicked image in the "#modal" element

        @param evt: click event, ``src`` and ``alt`` of its ``currentTarget`` are copied
            to the "#modal-image" element
        """
        modal_image = document.select_one("#modal-image")
        clicked_img = evt.currentTarget
        modal_image.src = clicked_img.src
        modal_image.alt = clicked_img.alt
        document.select_one("#modal").classList.add("is-active")

    def close_modal(self, evt):
        """Hide the "#modal" element"""
        modal_classes = document.select_one("#modal").classList
        modal_classes.remove("is-active")

    def handle_visibility_change(self, evt):
        """Remove the new messages marker when the document becomes hidden"""
        if document.visibilityState != "hidden":
            return
        marker_elt = self.new_messages_marker_elt
        # if there is a new messages marker, we remove it
        if marker_elt.parent is not None:
            marker_elt.remove()


# Chat instance used by all the listeners and signal handlers bound below.
libervia_web_chat = LiberviaWebChat()

# "new chat" button
document["new_chat_btn"].bind(
    "click", lambda __: aio.run(libervia_web_chat.on_new_chat())
)

# message input: resized on each "input" event, key presses have a dedicated handler
document["message_input"].bind(
    "input", lambda __: libervia_web_chat.auto_resize_message_input()
)
document["message_input"].bind("keydown", libervia_web_chat.on_message_keydown)
# sending is a coroutine, it is run with aio.run from the click handler
document["send_button"].bind(
    "click",
    lambda __: aio.run(libervia_web_chat.send_message())
)
# attachments: the attach button and the file input have their own handlers
document["attach-button"].bind("click", libervia_web_chat.on_attach_btn_click)
document["file-input"].bind("change", libervia_web_chat.on_file_selected)

# visibility changes of the document
document.bind("visibilitychange", libervia_web_chat.handle_visibility_change)

# bridge signals are registered before the post_init coroutine is launched
bridge.register_signal("message_new", libervia_web_chat._on_message_new)
bridge.register_signal("message_update", libervia_web_chat._on_message_update)
aio.run(libervia_web_chat.post_init())
remove_loading_screen()