view libervia/web/pages/chat/_browser/__init__.py @ 1577:9ba532041a8e

browser (chat): implement message reactions.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 16:31:36 +0100
parents fb31d3dba0c3
children eab815e48795
line wrap: on
line source

import json
import re

from bridge import AsyncBridge as Bridge
from browser import DOMNode, aio, bind, console as log, document, window
from cache import cache, identities
import dialog
from file_uploader import FileUploader
import jid
from js_modules import emoji_picker_element
from js_modules.tippy_js import tippy
from template import Template, safe
from tools import is_touch_device

# `log` is the browser console object (imported above as `console as log`), which
# provides `warn`; alias it so that `log.warning(...)` calls used in this module work.
log.warning = log.warn
# Values below are read from attributes of the global `window` object (presumably set
# by the hosting page — TODO confirm).
profile = window.profile or ""
own_jid = jid.JID(window.own_jid)
target_jid = jid.JID(window.target_jid)
chat_type = window.chat_type
# Bridge instance shared by the whole module to call backend methods and register
# signal handlers.
bridge = Bridge()

# Sensible value to consider that user is at the bottom: maximum remaining scroll
# distance (scrollHeight - scrollTop - clientHeight) still treated as "at the bottom".
SCROLL_SENSITIVITY = 200


class LiberviaWebChat:
    """Browser-side controller of the chat page.

    It renders new messages and reaction updates from templates, sends the content of
    the input area through the bridge, and wires the DOM listeners of the page
    (attachments, reactions, URL previews, image modal, new messages marker).
    """

    def __init__(self):
        # templates used to render the dynamic parts of the page
        self.message_tpl = Template("chat/message.html")
        self.reactions_tpl = Template("chat/reactions.html")
        self.reactions_details_tpl = Template("chat/reactions_details.html")
        self.url_preview_control_tpl = Template("components/url_preview_control.html")
        self.url_preview_tpl = Template("components/url_preview.html")
        # a single marker element is created once, then attached/detached when needed
        self.new_messages_marker_elt = Template("chat/new_messages_marker.html").get_elt()

        self.messages_elt = document["messages"]

        # attachments
        self.file_uploader = FileUploader(
            "", "chat/attachment_preview.html", on_delete_cb=self.on_attachment_delete
        )
        self.attachments_elt = document["attachments"]
        self.message_input = document["message_input"]

        close_button = document.select_one(".modal-close")
        close_button.bind("click", self.close_modal)

        # hide/show attachments: the attachments area is watched for added/removed
        # children, and its visibility is updated on each change
        MutationObserver = window.MutationObserver
        observer = MutationObserver.new(lambda *__: self.update_attachments_visibility())
        observer.observe(self.attachments_elt, {"childList": True})

        # we want the message scroll to be initially at the bottom
        self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    @property
    def is_at_bottom(self):
        """True if the messages area is scrolled at (or close to) its bottom.

        "Close to" means that the remaining scroll distance is at most
        SCROLL_SENSITIVITY.
        """
        messages_elt = self.messages_elt
        distance_to_bottom = (
            messages_elt.scrollHeight
            - messages_elt.scrollTop
            - messages_elt.clientHeight
        )
        return distance_to_bottom <= SCROLL_SENSITIVITY

    def send_message(self):
        """Send message currently in input area

        The message and corresponding attachment will be sent. Nothing is sent if
        there is neither text nor attachment.
        The sending itself is done asynchronously: this method returns immediately,
        and the input area is only cleared once the message has been sent
        successfully.
        """
        message = self.message_input.value.rstrip()
        log.info(f"{message=}")

        # attachments: each preview element carries its file data as JSON in the
        # "data-file" attribute
        attachments = [
            json.loads(attachment_elt.getAttribute("data-file"))
            for attachment_elt in self.attachments_elt.children
        ]

        if not (message or attachments):
            return

        extra = {}
        if attachments:
            extra["attachments"] = attachments

        # now we send the message
        aio.run(self._send_message(message, extra))

    async def _send_message(self, message: str, extra: dict) -> None:
        """Send a message through the bridge, and clear the input area on success.

        The bridge call is awaited here so that a failure is actually caught:
        ``aio.run`` only schedules a coroutine, thus a ``try/except`` around it can't
        see errors happening during the call.

        @param message: text of the message
        @param extra: extra data of the message (e.g. attachments)
        """
        try:
            await bridge.message_send(
                str(target_jid),
                {"": message},
                {},
                "auto",
                json.dumps(extra, ensure_ascii=False),
            )
        except Exception as e:
            dialog.notification.show(f"Can't send message: {e}", "error")
        else:
            self.message_input.value = ""
            self.attachments_elt.clear()

    def _on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid_s: str,
        to_jid_s: str,
        message: dict,
        subject: dict,
        mess_type: str,
        extra_s: str,
        profile: str,
    ) -> None:
        """Handler of the "message_new" bridge signal

        JIDs are parsed and extra data is deserialised, then the message is displayed
        with [on_message_new] if it has been sent from or to the current chat target.
        Other messages are ignored.
        """
        from_jid = jid.JID(from_jid_s)
        to_jid = jid.JID(to_jid_s)
        if (
            from_jid.bare == window.target_jid
            or to_jid.bare == window.target_jid
        ):
            aio.run(
                self.on_message_new(
                    uid,
                    timestamp,
                    from_jid,
                    to_jid,
                    message,
                    subject,
                    mess_type,
                    json.loads(extra_s),
                    profile,
                )
            )

    def _on_message_update(
        self, uid: str, type_: str, update_data_s: str, profile: str
    ) -> None:
        """Handler of the "message_update" bridge signal

        The update is handled asynchronously by [on_message_update].
        """
        aio.run(self.on_message_update(uid, type_, update_data_s, profile))

    async def on_message_update(
        self, uid: str, type_: str, update_data_s: str, profile: str
    ) -> None:
        """Apply an update to an already displayed message

        Only "REACTION" updates are supported, other types are logged and ignored.

        @param uid: ID of the updated message
        @param type_: type of the update
        @param update_data_s: JSON serialised data of the update
        @param profile: profile receiving the update (not used here)
        """
        update_data = json.loads(update_data_s)
        # must be checked before modifying the DOM, as the update may change the
        # height of the messages area
        is_at_bottom = self.is_at_bottom
        if type_ == "REACTION":
            reactions = update_data["reactions"]
            log.debug(f"new reactions: {reactions}")
            try:
                reactions_wrapper_elt = document[f"msg_reactions_{uid}"]
            except KeyError:
                log.debug(f"Message {uid} not found, no reactions to update")
            else:
                log.debug(f"Message {uid} found, new reactions: {reactions}")
                # reactions are fully re-rendered, and replace the previous ones
                reactions_elt = self.reactions_tpl.get_elt({"reactions": reactions})
                reactions_wrapper_elt.clear()
                reactions_wrapper_elt <= reactions_elt
                self.add_reactions_listeners(reactions_elt)
        else:
            log.warning(f"Unsupported update type: {type_!r}")

        # If user was at the bottom, keep the scroll at the bottom
        if is_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    async def on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid: jid.JID,
        to_jid: jid.JID,
        message_data: dict,
        subject_data: dict,
        mess_type: str,
        extra: dict,
        profile: str,
    ) -> None:
        """Render a new message and append it to the messages area

        @param uid: ID of the message
        @param timestamp: timestamp of the message
        @param from_jid: sender of the message
        @param to_jid: recipient of the message (not used here)
        @param message_data: message bodies, the "" key is used in priority
        @param subject_data: message subjects, the "" key is used in priority
        @param mess_type: type of the message
        @param extra: extra data of the message
        @param profile: profile receiving the message (not used here)
        """
        # FIXME: visibilityState doesn't detect OS events such as `Alt + Tab`, using focus
        #   event may help to get those use cases, but it gives false positives.
        if (
            document.visibilityState == "hidden"
            and self.new_messages_marker_elt.parent is None
        ):
            # the page is not visible, and we have no new messages marker yet, so we add
            # it
            self.messages_elt <= self.new_messages_marker_elt

        # XHTML version of the message: the "" key is used if it exists, otherwise we
        # use the first value available
        xhtml_data = extra.get("xhtml")
        if not xhtml_data:
            xhtml = None
        else:
            try:
                xhtml = xhtml_data[""]
            except KeyError:
                xhtml = next(iter(xhtml_data.values()))

        # identities are retrieved with the full JID for group chats, and with the bare
        # JID otherwise
        if chat_type == "group":
            identity_jid_s = str(from_jid)
        else:
            identity_jid_s = str(from_jid.bare)
        await cache.fill_identities([identity_jid_s])

        msg_data = {
            "id": uid,
            "timestamp": timestamp,
            "type": mess_type,
            "from_": str(from_jid),
            "text": message_data.get("") or next(iter(message_data.values()), ""),
            "subject": subject_data.get("") or next(iter(subject_data.values()), ""),
            "thread": extra.get("thread"),
            "thread_parent": extra.get("thread_parent"),
            # NOTE(review): "reeceived" looks like a typo, but the key is kept as is
            #   because the template using this data is not visible from here. To be
            #   checked against "chat/message.html" before renaming.
            "reeceived": extra.get("received_timestamp") or timestamp,
            "delay_sender": extra.get("delay_sender"),
            "info_type": extra.get("info_type"),
            "html": safe(xhtml) if xhtml else None,
            "encrypted": extra.get("encrypted", False),
            "received": extra.get("received", False),
            "edited": extra.get("edited", False),
            "attachments": extra.get("attachments", []),
        }
        message_elt = self.message_tpl.get_elt(
            {
                "own_jid": own_jid,
                "msg": msg_data,
                "identities": identities,
            }
        )

        # Check if user is viewing older messages or is at the bottom
        is_at_bottom = self.is_at_bottom

        self.messages_elt <= message_elt
        self.add_message_event_listeners(message_elt)
        # we add preview in parallel on purpose, as they can be slow to get
        self.handle_url_previews(message_elt)

        # If user was at the bottom, keep the scroll at the bottom
        if is_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def auto_resize_message_input(self):
        """Resize the message input field according to content."""

        is_at_bottom = self.is_at_bottom

        # The textarea's height is first reset to 'auto' to ensure it's not influenced by
        # the previous content.
        self.message_input.style.height = "auto"

        # Then the height is set to the scrollHeight of the textarea (which is the height
        # of the content), plus the vertical border, resulting in a textarea that grows as
        # more lines of text are added.
        self.message_input.style.height = f"{self.message_input.scrollHeight + 2}px"

        if is_at_bottom:
            # we want the message area to still display the last message
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def on_message_keydown(self, evt):
        """Handle the 'keydown' event of the message input field

        On a non touch device, <Enter> sends the message, while <Shift> + <Enter>
        keeps its default behaviour.

        @param evt: The event object. 'target' refers to the textarea element.
        """
        is_enter = evt.keyCode == 13  # <Enter> key
        if (
            is_enter
            # we only send message on <Enter> with non touch devices
            and not window.navigator.maxTouchPoints
            and not evt.shiftKey
        ):
            evt.preventDefault()  # Prevents line break
            self.send_message()

    def update_attachments_visibility(self):
        """Contract the attachments area when it is empty, expand it otherwise"""
        if len(self.attachments_elt.children):
            self.attachments_elt.classList.remove("is-contracted")
        else:
            self.attachments_elt.classList.add("is-contracted")

    def on_file_selected(self, evt):
        """Handle file selection

        Selected files are given to the file uploader with the attachments area, then
        the focus is put back on the message input.
        """
        log.info("file selected")
        files = evt.currentTarget.files
        self.file_uploader.upload_files(files, self.attachments_elt)
        self.message_input.focus()

    def on_attachment_delete(self, evt):
        """Remove the attachment preview containing the element triggering the event"""
        evt.stopPropagation()
        target = evt.currentTarget
        item_elt = DOMNode(target.closest(".attachment-preview"))
        item_elt.remove()

    def on_attach_button_click(self, evt):
        """Open the file selection by forwarding the click to the file input"""
        document["file_input"].click()

    def on_extra_btn_click(self, evt):
        """Placeholder for the extra button, only print a message for now"""
        print("extra bouton clicked!")

    def on_reaction_click(self, evt, message_elt):
        """Toggle the reaction selected in the emoji picker

        @param evt: "emoji-click" event, the emoji is found in ``evt.detail["unicode"]``
        @param message_elt: element of the message to react to, its ID is the ID of
            the message
        """
        aio.run(
            bridge.message_reactions_set(
                message_elt["id"], [evt.detail["unicode"]], "toggle"
            )
        )

    # NOTE: this is not an instance method (there is no ``self``): the callback is
    #   bound to the attachments element by the decorator when the class is created.
    @bind(document["attachments"], "wheel")
    def wheel_event(evt):
        """Make the mouse wheel to act on horizontal scrolling for attachments

        Attachments don't have vertical scrolling, thus it makes sense to use the wheel
        for horizontal scrolling
        """
        if evt.deltaY != 0:
            document["attachments"].scrollLeft += evt.deltaY * 0.8
            evt.preventDefault()

    def get_reaction_panel(self, source_elt):
        """Create the emoji picker used to react to a message

        This method is used as tippy "content" for reaction buttons.

        @param source_elt: element for which the panel is created, it must be inside
            the message element
        @return: emoji picker element, toggling a reaction on the message when an emoji
            is clicked
        """
        emoji_picker_elt = document.createElement("emoji-picker")
        message_elt = source_elt.closest("div.is-chat-message")
        emoji_picker_elt.bind(
            "emoji-click", lambda evt: self.on_reaction_click(evt, message_elt)
        )

        return emoji_picker_elt

    def add_message_event_listeners(self, parent_elt=None):
        """Prepare a message to be dynamic

        - make attachments dynamically clickable
        - make the reaction button open the emoji picker
        - make the extra button clickable

        @param parent_elt: element where listeners must be added
            None to use the whole document
        """

        ## attachments
        # FIXME: only handle images for now, and display them in a modal
        if parent_elt is None:
            parent_elt = document
        img_elts = parent_elt.select(".message-attachment img")
        for img_elt in img_elts:
            img_elt.bind("click", self.open_modal)
            img_elt.style.cursor = "pointer"

        ## reaction button
        for reaction_btn in parent_elt.select(".reaction-button"):
            message_elt = reaction_btn.closest("div.is-chat-message")
            tippy(
                reaction_btn,
                {
                    "trigger": "click",
                    "content": self.get_reaction_panel,
                    "appendTo": document.body,
                    "placement": "bottom",
                    "interactive": True,
                    "theme": "light",
                    # message_elt is bound as default argument, so each callback keeps
                    # the message of its own iteration
                    "onShow": lambda __, message_elt=message_elt: (
                        message_elt.classList.add("chat-message-highlight")
                    ),
                    "onHide": lambda __, message_elt=message_elt: (
                        message_elt.classList.remove("chat-message-highlight")
                    ),
                },
            )

        ## extra button
        for extra_btn in parent_elt.select(".extra-button"):
            extra_btn.bind("click", self.on_extra_btn_click)

    def add_reactions_listeners(self, parent_elt=None) -> None:
        """Add listener on reactions to handle details and reaction toggle

        @param parent_elt: element where listeners must be added
            None to use the whole document
        """
        if parent_elt is None:
            parent_elt = document

        is_touch = is_touch_device()

        for reaction_elt in parent_elt.select(".reaction"):
            # Reaction details: reacting JIDs are read from the JSON found in the
            # "jids" data attribute of the reaction
            dataset = reaction_elt.dataset.to_dict()
            reacting_jids = sorted(json.loads(dataset.get("jids", "[]")))
            reaction_details_elt = self.reactions_details_tpl.get_elt(
                {"reacting_jids": reacting_jids, "identities": identities}
            )

            # Configure tippy based on device type
            tippy_config = {
                "content": reaction_details_elt,
                "placement": "bottom",
                "theme": "light",
                "touch": ["hold", 500] if is_touch else True,
                "trigger": "click" if is_touch else "mouseenter focus",
                "delay": [0, 800] if is_touch else 0,
            }
            tippy(reaction_elt, tippy_config)

            # Toggle reaction when clicked/touched
            emoji_elt = reaction_elt.select_one(".emoji")
            emoji = emoji_elt.html.strip()
            message_elt = reaction_elt.closest("div.is-chat-message")
            msg_id = message_elt["id"]

            # msg_id and emoji are bound as default arguments, so each callback keeps
            # the values of its own iteration
            def toggle_reaction(event, msg_id=msg_id, emoji=emoji):
                # Prevent default if it's a touch device to distinguish from long press
                if is_touch:
                    event.preventDefault()
                aio.run(bridge.message_reactions_set(msg_id, [emoji], "toggle"))

            reaction_elt.bind("click", toggle_reaction)

    def find_links(self, message_elt) -> list:
        """Find all http and https links within the body of a message.

        Links are looked for in the text of the body, and in the "href" attribute of
        its <a> elements.

        @param message_elt: element of the message
        @return: URLs found without duplicates, in order of first appearance
            An empty list is returned if the message has no ".msg_body" element
        """
        msg_body_elt = message_elt.select_one(".msg_body")
        if not msg_body_elt:
            return []

        # Extracting links from text content
        text = msg_body_elt.text
        raw_urls = re.findall(
            r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F])|#)+",
            text,
        )

        # Extracting links from <a> elements
        for a_elt in msg_body_elt.select("a"):
            href = a_elt.attrs.get("href", "")
            if href.startswith(("http://", "https://")):
                raw_urls.append(href)

        # we remove duplicates, dict keys keep the insertion order
        return list(dict.fromkeys(raw_urls))

    async def add_url_previews(self, url_previews_elt, urls) -> None:
        """Add URL previews to the .url-previews element of a message.

        The element is emptied first. URLs whose preview can't be retrieved are logged
        and skipped.

        @param url_previews_elt: element where the previews must be added
        @param urls: URLs to preview
        """
        url_previews_elt.clear()
        for url in urls:
            try:
                url_preview_data_s = await bridge.url_preview_get(url, "")
            except Exception as e:
                log.warning(f"Couldn't get URL preview for {url}: {e}")
                continue

            if not url_preview_data_s:
                log.warning(f"No preview could be found for URL: {url}")
                continue

            url_preview_data = json.loads(url_preview_data_s)

            url_preview_elt = self.url_preview_tpl.get_elt(
                {"url_preview": url_preview_data}
            )
            url_previews_elt <= url_preview_elt

    def handle_url_previews(self, parent_elt=None):
        """Check if URL are presents in a message and show appropriate element

        For each message with at least one URL, a preview control panel is shown, and
        the previews are retrieved when its fetch button is clicked.

        @param parent_elt: element of the message to check
            None to check all messages of the document
        """

        if parent_elt is None:
            chat_message_elts = document.select(".is-chat-message")
        else:
            chat_message_elts = [parent_elt]

        for message_elt in chat_message_elts:
            urls = self.find_links(message_elt)
            if not urls:
                continue
            url_previews_elt = message_elt.select_one(".url-previews")
            if url_previews_elt is None:
                # there is nowhere to show the previews for this message
                continue
            url_previews_elt.classList.remove("is-hidden")
            preview_control_elt = self.url_preview_control_tpl.get_elt()
            fetch_preview_btn = preview_control_elt.select_one(
                ".action_fetch_preview"
            )
            # element and URLs are bound as default arguments, so each callback keeps
            # the values of its own iteration
            fetch_preview_btn.bind(
                "click",
                lambda __, previews_elt=url_previews_elt, preview_urls=urls: aio.run(
                    self.add_url_previews(previews_elt, preview_urls)
                ),
            )
            url_previews_elt <= preview_control_elt

    def open_modal(self, evt):
        """Show the image which has triggered the event in the modal"""
        modal_image = document.select_one("#modal-image")
        modal_image.src = evt.target.src
        modal_image.alt = evt.target.alt
        modal = document.select_one("#modal")
        modal.classList.add("is-active")

    def close_modal(self, evt):
        """Hide the image modal"""
        modal = document.select_one("#modal")
        modal.classList.remove("is-active")

    def handle_visibility_change(self, evt):
        """Remove the new messages marker when the page gets hidden

        The marker is then free to be added again by [on_message_new], in front of the
        next message received while the page is hidden.
        """
        if (
            document.visibilityState == "hidden"
            and self.new_messages_marker_elt.parent is not None
        ):
            # if there is a new messages marker, we remove it
            self.new_messages_marker_elt.remove()


# Single controller instance used by all the bindings below.
libervia_web_chat = LiberviaWebChat()

# Message input area: resize on content change, handle keys, send and attach buttons.
document["message_input"].bind(
    "input", lambda __: libervia_web_chat.auto_resize_message_input()
)
document["message_input"].bind("keydown", libervia_web_chat.on_message_keydown)
document["send_button"].bind("click", lambda __: libervia_web_chat.send_message())
document["attach_button"].bind("click", libervia_web_chat.on_attach_button_click)
document["file_input"].bind("change", libervia_web_chat.on_file_selected)

# Used to manage the new messages marker when the page visibility changes.
document.bind("visibilitychange", libervia_web_chat.handle_visibility_change)

# Bridge signals: new messages and updates of existing messages (e.g. reactions).
bridge.register_signal("message_new", libervia_web_chat._on_message_new)
bridge.register_signal("message_update", libervia_web_chat._on_message_update)

# Make the messages already present in the page dynamic (no argument: the whole
# document is processed).
libervia_web_chat.add_message_event_listeners()
libervia_web_chat.handle_url_previews()
libervia_web_chat.add_reactions_listeners()