Mercurial > libervia-web
view libervia/web/pages/chat/_browser/__init__.py @ 1600:0a4433a343a3
browser (calls): implement WebRTC file sharing:
- Send file through WebRTC when the new `file` button is used during a call.
- Show a confirmation dialog and download file sent by WebRTC.
rel 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 13:06:17 +0200 |
parents | 52098b5bab8d |
children | 19c83dd943df |
line wrap: on
line source
import json
import re

from bridge import AsyncBridge as Bridge
from browser import DOMNode, aio, console as log, document, window
from cache import cache, identities
import dialog
from file_uploader import FileUploader
import jid
# NOTE(review): ``emoji_picker_element`` is not referenced by name below; presumably it
# is imported for its side effect (making the <emoji-picker> element available).
from js_modules import emoji_picker_element
from js_modules.tippy_js import tippy
import popup
from template import Template, safe
from tools import is_touch_device

# the browser console has no ``warning`` method, so we alias it to ``warn``
log.warning = log.warn
profile = window.profile or ""
# JID used in the local chat (real JID for one2one, room JID otherwise)
own_local_jid = jid.JID(window.own_local_jid)
target_jid = jid.JID(window.target_jid)
chat_type = window.chat_type
bridge = Bridge()

# Sensible value to consider that user is at the bottom
SCROLL_SENSITIVITY = 200

# Allowed values for LiberviaWebChat.input_mode
INPUT_MODES = {"normal", "edit", "quote"}
# CSS class set on the message input for each input mode
MODE_CLASS = "mode_{}"


class LiberviaWebChat:
    """Controller of the chat page

    It binds the DOM elements of the page (messages list, message input, attachments
    area, image modal) to the bridge methods used to send, edit, retract and react to
    messages, and it renders incoming messages and updates with the chat templates.
    """

    def __init__(self):
        # current input mode, see [input_mode]
        self._input_mode = "normal"
        # data associated with the current input mode (e.g. "id" of the edited
        # message), it is cleared each time the mode changes
        self.input_data = {}
        self.message_tpl = Template("chat/message.html")
        self.extra_menu_tpl = Template("chat/extra_menu.html")
        self.reactions_tpl = Template("chat/reactions.html")
        self.reactions_details_tpl = Template("chat/reactions_details.html")
        self.url_preview_control_tpl = Template("components/url_preview_control.html")
        self.url_preview_tpl = Template("components/url_preview.html")
        self.new_messages_marker_elt = Template("chat/new_messages_marker.html").get_elt()
        self.editions_tpl = Template("chat/editions.html")

        self.messages_elt = document["messages"]

        # attachments
        self.file_uploader = FileUploader(
            "", "chat/attachment_preview.html", on_delete_cb=self.on_attachment_delete
        )
        self.attachments_elt = document["attachments"]
        self.message_input = document["message_input"]

        close_button = document.select_one(".modal-close")
        close_button.bind("click", self.close_modal)

        # hide/show attachments
        MutationObserver = window.MutationObserver
        observer = MutationObserver.new(lambda *__: self.update_attachments_visibility())
        observer.observe(self.attachments_elt, {"childList": True})

        # we want the message scroll to be initially at the bottom
        self.messages_elt.scrollTop = self.messages_elt.scrollHeight

        # listeners/dynamic updates
        self.add_message_event_listeners()
        self.handle_url_previews()
        self.add_reactions_listeners()

        # input
        self.auto_resize_message_input()
        self.message_input.focus()

    @property
    def input_mode(self) -> str:
        """Current input mode, one of [INPUT_MODES]"""
        return self._input_mode

    @input_mode.setter
    def input_mode(self, new_mode: str) -> None:
        """Change the input mode

        The CSS class of the message input is updated, and [input_data] is cleared.
        Nothing is done if the mode is unchanged.

        @param new_mode: one of [INPUT_MODES]
        @raise ValueError: new_mode is not a known mode
        """
        if new_mode == self.input_mode:
            return
        if new_mode not in INPUT_MODES:
            raise ValueError(f"Invalid input mode: {new_mode!r}")
        target_elt = self.message_input
        target_elt.classList.remove(MODE_CLASS.format(self._input_mode))
        self._input_mode = new_mode
        target_elt.classList.add(MODE_CLASS.format(new_mode))
        self.input_data.clear()

    @property
    def is_at_bottom(self):
        """True if the messages area is scrolled at the bottom

        A distance of at most [SCROLL_SENSITIVITY] pixels from the bottom is accepted.
        """
        return (
            self.messages_elt.scrollHeight
            - self.messages_elt.scrollTop
            - self.messages_elt.clientHeight
            <= SCROLL_SENSITIVITY
        )

    async def send_message(self):
        """Send message currently in input area

        The message and corresponding attachment will be sent.
        In "edit" mode, the message whose ID is in [input_data] is edited instead of
        sending a new message.
        On success, the input area and the attachments are reset and the input mode
        goes back to "normal". On failure, an error notification is shown and the
        input is left untouched.
        """
        message = self.message_input.value.rstrip()
        log.info(f"{message=}")

        # attachments
        attachments = []
        for attachment_elt in self.attachments_elt.children:
            file_data = json.loads(attachment_elt.getAttribute("data-file"))
            attachments.append(file_data)

        if message or attachments:
            extra = {}

            if attachments:
                extra["attachments"] = attachments

            # now we send the message
            try:
                if self.input_mode == "edit":
                    message_id = self.input_data["id"]
                    edit_data = {
                        "message": {"": message},
                        "extra": extra
                    }
                    await bridge.message_edit(
                        message_id, json.dumps(edit_data, ensure_ascii=False)
                    )
                else:
                    await bridge.message_send(
                        str(target_jid),
                        {"": message}, {}, "auto", json.dumps(extra, ensure_ascii=False)
                    )
            except Exception as e:
                dialog.notification.show(f"Can't send message: {e}", "error")
            else:
                self.message_input.value = ""
                self.attachments_elt.clear()
                self.input_mode = "normal"
                # Changing ``value`` from code doesn't trigger the "input" event, so
                # we must shrink the input ourselves (as done for the <ESC> key).
                self.auto_resize_message_input()

    def _on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid_s: str,
        to_jid_s: str,
        message: dict,
        subject: dict,
        mess_type: str,
        extra_s: str,
        profile: str,
    ) -> None:
        """Handler of the "message_new" bridge signal

        JIDs are parsed and ``extra_s`` is deserialised, then [on_message_new] is run
        if the message is from or to the entity of this chat. Other messages are
        ignored.
        """
        from_jid = jid.JID(from_jid_s)
        to_jid = jid.JID(to_jid_s)
        if (
            from_jid.bare == window.target_jid
            or to_jid.bare == window.target_jid
        ):
            aio.run(
                self.on_message_new(
                    uid,
                    timestamp,
                    from_jid,
                    to_jid,
                    message,
                    subject,
                    mess_type,
                    json.loads(extra_s),
                    profile,
                )
            )

    def _on_message_update(
        self,
        uid: str,
        type_: str,
        update_data_s: str,
        profile: str
    ) -> None:
        """Handler of the "message_update" bridge signal, run [on_message_update]"""
        aio.run(self.on_message_update(uid, type_, update_data_s, profile))

    async def on_message_update(
        self,
        uid: str,
        type_: str,
        update_data_s: str,
        profile: str
    ) -> None:
        """Apply an update to an already displayed message

        @param uid: ID of the updated message
        @param type_: type of update. "REACTION", "EDIT" and "RETRACT" are handled,
            anything else is only logged.
        @param update_data_s: JSON serialised update data
        @param profile: unused here
        """
        update_data = json.loads(update_data_s)
        # must be checked before the DOM is modified
        is_at_bottom = self.is_at_bottom
        if type_ == "REACTION":
            reactions = update_data["reactions"]
            log.debug(f"new reactions: {reactions}")
            try:
                reactions_wrapper_elt = document[f"msg_reactions_{uid}"]
            except KeyError:
                log.debug(f"Message {uid} not found, no reactions to update")
            else:
                log.debug(f"Message {uid} found, new reactions: {reactions}")
                reactions_elt = self.reactions_tpl.get_elt(
                    {
                        "chat_type": chat_type,
                        "own_local_jid": str(own_local_jid),
                        "reactions": reactions
                    })
                reactions_wrapper_elt.clear()
                reactions_wrapper_elt <= reactions_elt
                self.add_reactions_listeners(reactions_elt)
        elif type_ in ("EDIT", "RETRACT"):
            try:
                old_message_elt = document[uid]
            except KeyError:
                log.debug(f"Message {uid} not found, no {type_.lower()}ion to apply")
            else:
                # the message is fully rendered again and replaces the old element
                template_data = await self.message_to_template_data(
                    uid,
                    update_data["timestamp"],
                    jid.JID(update_data["from"]),
                    jid.JID(update_data["to"]),
                    update_data["message"],
                    update_data["subject"],
                    update_data["type"],
                    update_data["extra"]
                )

                new_message_elt = self.message_tpl.get_elt(
                    template_data
                )
                old_message_elt.replaceWith(new_message_elt)
                self.add_message_event_listeners(new_message_elt)
                self.handle_url_previews(new_message_elt)
        else:
            log.warning(f"Unsupported update type: {type_!r}")

        # If user was at the bottom, keep the scroll at the bottom
        if is_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    async def message_to_template_data(
        self,
        uid: str,
        timestamp: float,
        from_jid: jid.JID,
        to_jid: jid.JID,
        message_data: dict,
        subject_data: dict,
        mess_type: str,
        extra: dict,
    ) -> dict:
        """Generate template data to use with [message_tpl]

        Identities of the sender are cached before returning. For non group chats,
        the bare JID of the sender is used.

        @param message_data: body of the message, by language ("" is used if present,
            otherwise the first value)
        @param subject_data: subject of the message, by language (same rule)
        @param extra: extra data of the message
        @return: template data
        """
        xhtml_data = extra.get("xhtml")
        if not xhtml_data:
            xhtml = None
        else:
            try:
                xhtml = xhtml_data[""]
            except KeyError:
                xhtml = next(iter(xhtml_data.values()))

        if chat_type == "group":
            await cache.fill_identities([str(from_jid)])
        else:
            await cache.fill_identities([str(jid.JID(from_jid).bare)])
            from_jid = from_jid.bare

        return {
            "own_local_jid": str(own_local_jid),
            "chat_type": chat_type,
            "msg": {
                "id": uid,
                "timestamp": extra.get("updated", timestamp),
                "type": mess_type,
                "from_": str(from_jid),
                "text": message_data.get("") or next(iter(message_data.values()), ""),
                "subject": subject_data.get("") or next(iter(subject_data.values()), ""),
                "thread": extra.get("thread"),
                "thread_parent": extra.get("thread_parent"),
                # NOTE(review): this key looks like a typo, but "received" is already
                # used below for another value, so it is left untouched. To be checked
                # against the message template.
                "reeceived": extra.get("received_timestamp") or timestamp,
                "delay_sender": extra.get("delay_sender"),
                "info_type": extra.get("info_type"),
                "html": safe(xhtml) if xhtml else None,
                "encrypted": extra.get("encrypted", False),
                "received": extra.get("received", False),
                "attachments": extra.get("attachments", []),
                "extra": extra
            },
            "identities": identities,
        }

    async def on_message_new(
        self,
        uid: str,
        timestamp: float,
        from_jid: jid.JID,
        to_jid: jid.JID,
        message_data: dict,
        subject_data: dict,
        mess_type: str,
        extra: dict,
        profile: str,
    ) -> None:
        """Render a new message and append it to the messages area

        If the page is hidden and no "new messages" marker is displayed yet, the
        marker is added before the message.
        The scroll is kept at the bottom if it was there before the message was added.
        """
        # FIXME: visibilityState doesn't detect OS events such as `Alt + Tab`, using focus
        #   event may help to get those use cases, but it gives false positives.
        if (
            document.visibilityState == "hidden"
            and self.new_messages_marker_elt.parent is None
        ):
            # the page is not visible, and we have no new messages marker yet, so we add
            # it
            self.messages_elt <= self.new_messages_marker_elt
        template_data = await self.message_to_template_data(
            uid,
            timestamp,
            from_jid,
            to_jid,
            message_data,
            subject_data,
            mess_type,
            extra
        )

        message_elt = self.message_tpl.get_elt(
            template_data
        )

        # Check if user is viewing older messages or is at the bottom
        is_at_bottom = self.is_at_bottom

        self.messages_elt <= message_elt
        self.add_message_event_listeners(message_elt)
        # we add preview in parallel on purpose, as they can be slow to get
        self.handle_url_previews(message_elt)

        # If user was at the bottom, keep the scroll at the bottom
        if is_at_bottom:
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def auto_resize_message_input(self):
        """Resize the message input field according to content."""

        is_at_bottom = self.is_at_bottom

        # The textarea's height is first reset to 'auto' to ensure it's not influenced by
        # the previous content.
        self.message_input.style.height = "auto"

        # Then the height is set to the scrollHeight of the textarea (which is the height
        # of the content), plus the vertical border, resulting in a textarea that grows as
        # more lines of text are added.
        self.message_input.style.height = f"{self.message_input.scrollHeight + 2}px"

        if is_at_bottom:
            # we want the message area to still display the last message
            self.messages_elt.scrollTop = self.messages_elt.scrollHeight

    def on_message_keydown(self, evt):
        """Handle the 'keydown' event of the message input field

        - <Enter> sends the message when ``navigator.maxTouchPoints`` is falsy and
          <Shift> is not pressed
        - <ESC> empties the input and goes back to "normal" mode
        - <Up> starts the edition of the last own message, if the input is empty and
          the mode is "normal"

        @param evt: The event object. 'target' refers to the textarea element.
        """
        if evt.keyCode == 13:  # <Enter> key
            if not window.navigator.maxTouchPoints:
                # we have a non touch device, we send message on <Enter>
                if not evt.shiftKey:
                    evt.preventDefault()  # Prevents line break
                    aio.run(self.send_message())
        elif evt.keyCode == 27:  # <ESC> key
            evt.preventDefault()
            self.message_input.value = ''
            self.input_mode = 'normal'
            self.auto_resize_message_input()
        elif evt.keyCode == 38:  # <Up> arrow key
            if self.input_mode == "normal" and self.message_input.value.strip() == "":
                evt.preventDefault()
                own_msgs = document.getElementsByClassName('own_msg')
                if own_msgs.length > 0:
                    last_msg = own_msgs[own_msgs.length - 1]
                    aio.run(self.on_action_edit(None, last_msg))

    def update_attachments_visibility(self):
        """Contract the attachments area when it's empty, expand it otherwise"""
        if len(self.attachments_elt.children):
            self.attachments_elt.classList.remove("is-contracted")
        else:
            self.attachments_elt.classList.add("is-contracted")

    def on_file_selected(self, evt):
        """Handle file selection"""
        log.info("file selected")
        files = evt.currentTarget.files
        self.file_uploader.upload_files(files, self.attachments_elt)
        self.message_input.focus()

    def on_attachment_delete(self, evt):
        """Remove the attachment preview containing the clicked element"""
        evt.stopPropagation()
        target = evt.currentTarget
        item_elt = DOMNode(target.closest(".attachment-preview"))
        item_elt.remove()

    def on_attach_button_click(self, evt):
        """Open the file selection dialog"""
        document["file_input"].click()

    def on_extra_btn_click(self, evt):
        """Show the extra menu of a message

        "retract" is proposed for own messages, "edit" only for the last own message.
        """
        message_elt = evt.target.closest("div.is-chat-message")
        is_own = message_elt.classList.contains("own_msg")
        if is_own:
            own_messages = document.select('.own_msg')
            # with XMPP, we can currently only edit our last message
            can_edit = own_messages and message_elt is own_messages[-1]
        else:
            can_edit = False

        content_elt = self.extra_menu_tpl.get_elt({
            "edit": can_edit,
            "retract": is_own,
        })
        extra_popup = popup.create_popup(evt.target, content_elt, focus_elt=message_elt)

        def on_action_click(evt, callback):
            extra_popup.hide()
            aio.run(
                callback(evt, message_elt)
            )

        for cls_name, callback in (
            ("action_quote", self.on_action_quote),
            ("action_edit", self.on_action_edit),
            ("action_retract", self.on_action_retract),
        ):
            for elt in content_elt.select(f".{cls_name}"):
                # ``callback`` is bound as a default value to avoid late binding
                elt.bind("click", lambda evt, callback=callback: on_action_click(
                    evt, callback
                ))

    def on_reaction_click(self, evt, message_elt):
        """Toggle the reaction selected in the emoji picker on a message

        @param evt: "emoji-click" event, the emoji is read from ``evt.detail``
        @param message_elt: element of the message to react to
        """
        # NOTE(review): this looks like a debugging leftover, it is kept in case
        #   something relies on it. To be removed once confirmed.
        window.evt = evt
        aio.run(
            bridge.message_reactions_set(
                message_elt["id"], [evt.detail["unicode"]], "toggle"
            )
        )

        # NOTE(review): the code below scrolls the attachments area horizontally as a
        #   "wheel" handler would do, it looks misplaced here. The event received here
        #   is an "emoji-click" one, which has no ``deltaY``, so the attribute access
        #   is guarded to avoid an error on each reaction.
        delta_y = getattr(evt, "deltaY", None)
        if isinstance(delta_y, (int, float)) and delta_y != 0:
            document["attachments"].scrollLeft += delta_y * 0.8
            evt.preventDefault()

    async def get_message_tuple(self, message_elt) -> tuple|None:
        """Retrieve message tuple as sent by [message_new]

        If no corresponding message data is found, an error will be shown, and None
        is returned.
        @param message_elt: message element, its "id" attribute will be used to
            retrieve message data
        @return: message data as a tuple, or None if no message with this ID is found.
        """
        message_id = message_elt['id']
        history_data = await bridge.history_get(
            "", "", -2, True, {"id": message_id}
        )
        if not history_data:
            dialog.notification.show(f"Can't find message {message_id}", "error")
            return None
        return history_data[0]

    async def on_action_quote(self, __, message_elt) -> None:
        """Prepend a quote of the message to the input, and switch to "quote" mode

        @param message_elt: element of the message to quote
        """
        message_data = await self.get_message_tuple(message_elt)
        if message_data is not None:
            messages = message_data[4]
            body = next(iter(messages.values()), "")
            quote = "\n".join(f"> {line}" for line in body.split("\n"))
            self.message_input.value = f"{quote}\n{self.message_input.value}"
            self.input_mode = "quote"
            # must be set after the mode, as changing the mode clears input_data
            self.input_data["id"] = message_elt["id"]
            self.auto_resize_message_input()
            self.message_input.focus()

    async def on_action_edit(self, __, message_elt) -> None:
        """Put the body of the message in the input, and switch to "edit" mode

        A notification is shown and nothing else is done if the message has no body.
        @param message_elt: element of the message to edit
        """
        message_data = await self.get_message_tuple(message_elt)
        if message_data is not None:
            messages = message_data[4]
            body = next(iter(messages.values()), "")
            if not body:
                dialog.notification.show("No content found in message, nothing to edit")
                return

            self.message_input.value = body
            self.input_mode = "edit"
            # must be set after the mode, as changing the mode clears input_data
            self.input_data["id"] = message_elt["id"]
            self.auto_resize_message_input()
            self.message_input.focus()

    async def on_action_retract(self, __, message_elt) -> None:
        """Retract the message after asking a confirmation to the user

        @param message_elt: element of the message to retract
        """
        confirmed = await dialog.Confirm(safe(
            "This message will be permanently removed. Are you sure?<br><br>"
            "WARNING: It is impossible to guarantee that other participants in the "
            "discussion will delete this message as well. You must assume it has been "
            "seen. If a password or other sensitive information has been accidentally "
            "shared, please ensure to take appropriate measures to change it and "
            "mitigate the risks."
        )).ashow()

        if confirmed:
            await bridge.message_retract(message_elt["id"])
        else:
            log.info(f"Retraction of message {message_elt['id']} cancelled by user.")

    def get_reaction_panel(self, source_elt):
        """Create an emoji picker bound to the message containing source_elt

        @param source_elt: element inside the message to react to
        @return: the emoji picker element
        """
        emoji_picker_elt = document.createElement("emoji-picker")
        message_elt = source_elt.closest("div.is-chat-message")
        emoji_picker_elt.bind(
            "emoji-click", lambda evt: self.on_reaction_click(evt, message_elt)
        )

        return emoji_picker_elt

    def add_message_event_listeners(self, parent_elt=None):
        """Prepare a message to be dynamic

        - make attachments dynamically clickable
        - show the emoji picker when the reaction button is clicked
        - make the extra button clickable
        - show previous editions on the editions icon

        @param parent_elt: element to look into, the whole document is used if None
        """

        ## attachments
        # FIXME: only handle images for now, and display them in a modal
        if parent_elt is None:
            parent_elt = document
        img_elts = parent_elt.select(".message-attachment img")
        for img_elt in img_elts:
            img_elt.bind("click", self.open_modal)
            img_elt.style.cursor = "pointer"

        ## reaction button
        for reaction_btn in parent_elt.select(".reaction-button"):
            message_elt = reaction_btn.closest("div.is-chat-message")
            tippy(
                reaction_btn,
                {
                    "trigger": "click",
                    "content": self.get_reaction_panel,
                    "appendTo": document.body,
                    "placement": "bottom",
                    "interactive": True,
                    "theme": "light",
                    # ``message_elt`` is bound as a default value to avoid late binding
                    "onShow": lambda __, message_elt=message_elt: (
                        message_elt.classList.add("has-popup-focus")
                    ),
                    "onHide": lambda __, message_elt=message_elt: (
                        message_elt.classList.remove("has-popup-focus")
                    ),
                },
            )

        ## extra button
        for extra_btn in parent_elt.select(".extra-button"):
            extra_btn.bind("click", self.on_extra_btn_click)

        ## editions
        for edition_icon_elt in parent_elt.select(".message-editions"):
            message_elt = edition_icon_elt.closest("div.is-chat-message")
            dataset = message_elt.dataset.to_dict()
            try:
                editions = json.loads(dataset["editions"])
            except (ValueError, KeyError):
                log.error(
                    f"Internal Error: invalid or missing editions data: {message_elt['id']}"
                )
            else:
                for edition in editions:
                    edition["text"] = (
                        edition["message"].get("")
                        or next(iter(edition["message"].values()), "")
                    )
                editions_elt = self.editions_tpl.get_elt({"editions": editions})
                tippy(
                    edition_icon_elt,
                    {
                        "content": editions_elt,
                        "theme": "light",
                        "appendTo": document.body
                    }
                )

    def add_reactions_listeners(self, parent_elt=None) -> None:
        """Add listener on reactions to handle details and reaction toggle

        @param parent_elt: element to look into, the whole document is used if None
        """
        if parent_elt is None:
            parent_elt = document

        is_touch = is_touch_device()

        for reaction_elt in parent_elt.select(".reaction"):
            # Reaction details
            dataset = reaction_elt.dataset.to_dict()
            reacting_jids = sorted(json.loads(dataset.get("jids", "[]")))
            reaction_details_elt = self.reactions_details_tpl.get_elt(
                {"reacting_jids": reacting_jids, "identities": identities}
            )

            # Configure tippy based on device type
            tippy_config = {
                "content": reaction_details_elt,
                "placement": "bottom",
                "theme": "light",
                "touch": ["hold", 500] if is_touch else True,
                "trigger": "click" if is_touch else "mouseenter focus",
                "delay": [0, 800] if is_touch else 0,
            }
            tippy(reaction_elt, tippy_config)

            # Toggle reaction when clicked/touched
            emoji_elt = reaction_elt.select_one(".emoji")
            emoji = emoji_elt.html.strip()
            message_elt = reaction_elt.closest("div.is-chat-message")
            msg_id = message_elt["id"]

            # ``msg_id`` and ``emoji`` are bound as default values to avoid late binding
            def toggle_reaction(event, msg_id=msg_id, emoji=emoji):
                # Prevent default if it's a touch device to distinguish from long press
                if is_touch:
                    event.preventDefault()
                aio.run(bridge.message_reactions_set(msg_id, [emoji], "toggle"))

            reaction_elt.bind("click", toggle_reaction)

    def find_links(self, message_elt):
        """Find all http and https links within the body of a message.

        Links are looked for in the text of the body and in the ``href`` of its <a>
        elements.

        @param message_elt: element of the message
        @return: URLs found, without duplicates and in order of appearance.
            The list is empty if the message has no body element.
        """
        msg_body_elt = message_elt.select_one(".msg_body")
        if not msg_body_elt:
            return []

        # Extracting links from text content
        text = msg_body_elt.text
        raw_urls = re.findall(
            r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F])|#)+",
            text,
        )

        # Extracting links from <a> elements
        a_elements = msg_body_elt.select("a")
        for a_elt in a_elements:
            href = a_elt.attrs.get("href", "")
            if href.startswith("http://") or href.startswith("https://"):
                raw_urls.append(href)

        # we remove duplicates
        urls = list(dict.fromkeys(raw_urls))

        return urls

    async def add_url_previews(self, url_previews_elt, urls) -> None:
        """Add URL previews to the .url-previews element of a message.

        The element is emptied first. URLs whose preview can't be retrieved are
        skipped with a warning.
        """
        url_previews_elt.clear()
        for url in urls:
            try:
                url_preview_data_s = await bridge.url_preview_get(url, "")
            except Exception as e:
                log.warning(f"Couldn't get URL preview for {url}: {e}")
                continue

            if not url_preview_data_s:
                log.warning(f"No preview could be found for URL: {url}")
                continue

            url_preview_data = json.loads(url_preview_data_s)

            url_preview_elt = self.url_preview_tpl.get_elt(
                {"url_preview": url_preview_data}
            )
            url_previews_elt <= url_preview_elt

    def handle_url_previews(self, parent_elt=None):
        """Check if URL are presents in a message and show appropriate element

        For each message with at least one URL, a preview control panel is shown. The
        previews are only retrieved when the user clicks on its button.

        @param parent_elt: element of the message to handle.
            If None, all the messages of the document are handled.
        """
        if parent_elt is None:
            parent_elt = document
            chat_message_elts = parent_elt.select(".is-chat-message")
        else:
            chat_message_elts = [parent_elt]
        for message_elt in chat_message_elts:
            urls = self.find_links(message_elt)
            if urls:
                url_previews_elt = message_elt.select_one(".url-previews")
                if url_previews_elt is None:
                    # one message without the element must not prevent the others to
                    # be handled
                    log.warning("No .url-previews element found in message, skipping")
                    continue
                url_previews_elt.classList.remove("is-hidden")
                preview_control_elt = self.url_preview_control_tpl.get_elt()
                fetch_preview_btn = preview_control_elt.select_one(
                    ".action_fetch_preview"
                )
                # element and URLs are bound as default values to avoid late binding
                fetch_preview_btn.bind(
                    "click",
                    lambda __, previews_elt=url_previews_elt, preview_urls=urls: aio.run(
                        self.add_url_previews(previews_elt, preview_urls)
                    ),
                )
                url_previews_elt <= preview_control_elt

    def open_modal(self, evt):
        """Show the clicked image in the modal"""
        modal_image = document.select_one("#modal-image")
        modal_image.src = evt.target.src
        modal_image.alt = evt.target.alt
        modal = document.select_one("#modal")
        modal.classList.add("is-active")

    def close_modal(self, evt):
        """Hide the image modal"""
        modal = document.select_one("#modal")
        modal.classList.remove("is-active")

    def handle_visibility_change(self, evt):
        """Remove the "new messages" marker when the page gets hidden

        The marker may then be added again by [on_message_new] before the first
        message received while the page is hidden.
        """
        if (
            document.visibilityState == "hidden"
            and self.new_messages_marker_elt.parent is not None
        ):
            # if there is a new messages marker, we remove it
            self.new_messages_marker_elt.remove()


libervia_web_chat = LiberviaWebChat()

# DOM events
document["message_input"].bind(
    "input", lambda __: libervia_web_chat.auto_resize_message_input()
)
document["message_input"].bind("keydown", libervia_web_chat.on_message_keydown)
document["send_button"].bind(
    "click",
    lambda __: aio.run(libervia_web_chat.send_message())
)
document["attach_button"].bind("click", libervia_web_chat.on_attach_button_click)
document["file_input"].bind("change", libervia_web_chat.on_file_selected)

document.bind("visibilitychange", libervia_web_chat.handle_visibility_change)

# bridge signals
bridge.register_signal("message_new", libervia_web_chat._on_message_new)
bridge.register_signal("message_update", libervia_web_chat._on_message_update)