Mercurial > libervia-web
view libervia/web/pages/_browser/jid_search.py @ 1592:291a7026cb2b
server: handle new registration link feature, following backend implementation
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 10 Dec 2023 18:33:00 +0100 |
parents | f1d09a4d38dc |
children | c6976c5b85a1 |
line wrap: on
line source
import json from urllib.parse import quote, urljoin from bridge import AsyncBridge as Bridge from browser import aio, console as log, window from cache import cache from template import Template import jid log.warning = log.warn profile = window.profile or "" bridge = Bridge() class JidSearch: def __init__( self, search_elt, container_elt=None, filter_cb=None, empty_cb=None, get_url=None, click_cb=None, options: dict|None = None, submit_filter: bool = True, template: str = "components/search_item.html" ) -> None: """Initialize the JidSearch instance @param search_elt: The HTML <input> element for search @param container_elt: The HTML container to display the search results @param filter_cb: The callback to filter the search results @param empty_cb: The callback when the search box is empty @param get_url: The function to get URL for each entity in the search result @param click_cb: The function to handle the click event on each entity in the search result @param options: extra options. Key can be: no_group(bool) True if groups should not be visible extra_cb(dict) a map from CSS selector to callback, the callback will be binded to the "click" event, and will be called with the ``item`` as argument @param submit_filter: if True, only submit when a seemingly valid JID is entered @param template: template to use """ self.search_item_tpl = Template(template) self.search_elt = search_elt self.search_elt.bind("input", self.on_search_input) if submit_filter: try: form_elt = self.search_elt.closest("form") except KeyError: log.debug("No parent <form> found, can't apply submit filter.") else: form_elt.bind("submit", self.on_form_submit) self.last_query = None self.current_query = None self.container_elt = container_elt if options is None: options = {} self.options = options if click_cb is not None and get_url is None: self.get_url = lambda _: "#" else: self.get_url = get_url if get_url is not None else self.default_get_url self.click_cb = click_cb if filter_cb is None: if container_elt is None: raise ValueError("container_elt must be set if filter_cb is not set") filter_cb = self.show_items self.filter_cb = filter_cb self.empty_cb = empty_cb or self.on_empty_search current_search = search_elt.value.strip() or None if not current_search: self.empty_cb() else: aio.run(self.perform_search(current_search)) def default_get_url(self, item): """Default method to get the URL for a given entity @param item: The item (entity) for which the URL is required """ return urljoin(f"{window.location.href}/", quote(item["entity"])) def show_items(self, items): """Display the search items in the specified container @param items: The list of search items to be displayed """ assert self.container_elt is not None self.container_elt.clear() for item in items: search_item_elt = self.search_item_tpl.get_elt({ "url": self.get_url(item), "item": item, "identities": cache.identities, "options": self.options, }) if self.click_cb is not None: search_item_elt.bind('click', lambda evt, item=item: self.click_cb(item)) extra_cb = self.options.get("extra_cb") if extra_cb: for selector, cb in extra_cb.items(): for elt in search_item_elt.select(selector): log.debug(f"binding {selector=} {elt=} {cb=} {id(cb)=}") elt.bind( "click", lambda evt, item=item, cb=cb: cb(evt, item) ) self.container_elt <= search_item_elt def on_search_input(self, evt): """Handle the 'input' event for the search element @param evt: The event object """ evt.stopPropagation() evt.preventDefault() search_text = evt.target.value.strip() if not search_text: self.empty_cb() elif len(search_text) > 2: aio.run(self.perform_search(search_text)) def on_form_submit(self, evt): search_text = self.search_elt.value.strip() search_jid = jid.JID(search_text) if not search_jid.is_valid(): evt.stopPropagation() evt.preventDefault() async def perform_search(self, query): """Perform the search operation for a given query @param query: The search query """ if self.current_query is None: log.debug(f"performing search: {query=}") self.current_query = query jid_items = json.loads(await bridge.jid_search(query, "")) await cache.fill_identities(i["entity"] for i in jid_items) self.last_query = query self.current_query = None current_query = self.search_elt.value.strip() if current_query != query: await self.perform_search(current_query) return self.filter_cb(jid_items) def on_empty_search(self): """Handle the situation when the search box is empty""" assert self.container_elt is not None items = [ { "entity": jid_, "groups": data["groups"] } for jid_, data in cache.roster.items() ] self.show_items(items)