view libervia/web/pages/_browser/jid_search.py @ 1577:9ba532041a8e

browser (chat): implement message reactions.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 16:31:36 +0100
parents 02432346e9b2
children f1d09a4d38dc
line wrap: on
line source

import json
from urllib.parse import quote, urljoin

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, window
from cache import cache
from template import Template
import jid

log.warning = log.warn
profile = window.profile or ""
bridge = Bridge()

class JidSearch:

    def __init__(
        self,
        search_elt,
        container_elt=None,
        filter_cb=None,
        empty_cb=None,
        get_url=None,
        click_cb=None,
        options: dict|None = None,
        submit_filter: bool = True,
        template: str = "components/search_item.html"
    ) -> None:
        """Initialize the JidSearch instance

        @param search_elt: The HTML <input> element for search
        @param container_elt: The HTML container to display the search results
        @param filter_cb: The callback to filter the search results
        @param empty_cb: The callback when the search box is empty
        @param get_url: The function to get URL for each entity in the search result
        @param click_cb: The function to handle the click event on each entity in the
            search result
        @param options: extra options. Key can be:
            no_group(bool)
                True if groups should not be visible
            extra_cb(dict)
                a map from CSS selector to callback, the callback will be binded to the
                "click" event, and will be called with the ``item`` as argument
        @param submit_filter: if True, only submit when a seemingly valid JID is entered
        @param template: template to use
        """
        self.search_item_tpl = Template(template)
        self.search_elt = search_elt
        self.search_elt.bind("input", self.on_search_input)
        if submit_filter:
            form_elt = self.search_elt.closest("form")
            form_elt.bind("submit", self.on_form_submit)
        self.last_query = None
        self.current_query = None
        self.container_elt = container_elt
        if options is None:
            options = {}
        self.options = options

        if click_cb is not None and get_url is None:
            self.get_url = lambda _: "#"
        else:
            self.get_url = get_url if get_url is not None else self.default_get_url
        self.click_cb = click_cb

        if filter_cb is None:
            if container_elt is None:
                raise ValueError("container_elt must be set if filter_cb is not set")
            filter_cb = self.show_items
        self.filter_cb = filter_cb

        self.empty_cb = empty_cb or self.on_empty_search

        current_search = search_elt.value.strip() or None
        if not current_search:
            self.empty_cb()
        else:
            aio.run(self.perform_search(current_search))

    def default_get_url(self, item):
        """Default method to get the URL for a given entity

        @param item: The item (entity) for which the URL is required
        """
        return urljoin(f"{window.location.href}/", quote(item["entity"]))

    def show_items(self, items):
        """Display the search items in the specified container

        @param items: The list of search items to be displayed
        """
        assert self.container_elt is not None
        self.container_elt.clear()
        for item in items:
            search_item_elt = self.search_item_tpl.get_elt({
                "url": self.get_url(item),
                "item": item,
                "identities": cache.identities,
                "options": self.options,
            })
            if self.click_cb is not None:
                search_item_elt.bind('click', lambda evt, item=item: self.click_cb(item))
            extra_cb = self.options.get("extra_cb")
            if extra_cb:
                for selector, cb in extra_cb.items():
                    for elt in search_item_elt.select(selector):
                        log.debug(f"binding {selector=} {elt=} {cb=} {id(cb)=}")
                        elt.bind(
                            "click",
                            lambda evt, item=item, cb=cb: cb(evt, item)
                        )
            self.container_elt <= search_item_elt

    def on_search_input(self, evt):
        """Handle the 'input' event for the search element

        @param evt: The event object
        """
        evt.stopPropagation()
        evt.preventDefault()
        search_text = evt.target.value.strip()
        if not search_text:
            self.empty_cb()
        elif len(search_text) > 2:
            aio.run(self.perform_search(search_text))

    def on_form_submit(self, evt):
        search_text = self.search_elt.value.strip()
        search_jid = jid.JID(search_text)
        if not search_jid.is_valid():
            evt.stopPropagation()
            evt.preventDefault()

    async def perform_search(self, query):
        """Perform the search operation for a given query

        @param query: The search query
        """
        if self.current_query is None:
            log.debug(f"performing search: {query=}")
            self.current_query = query
            jid_items = json.loads(await bridge.jid_search(query, ""))
            await cache.fill_identities(i["entity"] for i in jid_items)

            self.last_query = query
            self.current_query = None
            current_query = self.search_elt.value.strip()
            if current_query != query:
                await self.perform_search(current_query)
                return
            self.filter_cb(jid_items)

    def on_empty_search(self):
        """Handle the situation when the search box is empty"""
        assert self.container_elt is not None
        items = [
            {
                "entity": jid_,
                "groups": data["groups"]
            }
            for jid_, data in cache.roster.items()
        ]
        self.show_items(items)