Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0433.py @ 4371:ed683d56b64c default tip
test (XEP-0461): some tests for XEP-0461:
rel 457
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 May 2025 00:34:01 +0200 |
parents | 5ea4f5f28082 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for Extended Channel Search (XEP-0433) # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import difflib from typing import Any, Final, Iterator, Self, cast from pydantic import BaseModel, Field, ConfigDict, RootModel, model_validator from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.words.xish import domish from wokkel import data_form from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.models.types import JIDType from libervia.backend.plugins import plugin_misc_jid_search from libervia.backend.plugins.plugin_xep_0059 import RSMRequest log = getLogger(__name__) # Namespaces NS_CHANNEL_SEARCH: Final[str] = "urn:xmpp:channel-search:0" NS_SEARCH: Final[str] = f"{NS_CHANNEL_SEARCH}:search" NS_SEARCH_PARAMS: Final[str] = f"{NS_CHANNEL_SEARCH}:search-params" NS_ORDER: Final[str] = f"{NS_CHANNEL_SEARCH}:order" NS_ERROR: Final[str] = f"{NS_CHANNEL_SEARCH}:error" # Common sort keys SORT_ADDRESS: Final[str] = f"{{{NS_ORDER}}}address" SORT_NUSERS: Final[str] = f"{{{NS_ORDER}}}nusers" PLUGIN_INFO = { C.PI_NAME: "Extended Channel Search", C.PI_IMPORT_NAME: "XEP-0433", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_DEPENDENCIES: ["XEP-0059", "JID_SEARCH"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0433", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("Cross-domain search for public group chats"), } class SearchRequest(BaseModel): """Parameters for channel search request.""" model_config = ConfigDict(extra="forbid") query: str | None = Field(None, alias="q") all: bool = False sinname: bool = True sindescription: bool = True sinaddr: bool = True min_users: int | None = Field(default=None, ge=0) types: list[str] = [] key: str = SORT_ADDRESS rsm: RSMRequest | None = None @model_validator(mode="after") def check_conflicts(self) -> Self: if self.all and self.query: raise ValueError('Cannot combine "all" with search query') return self @classmethod def from_element(cls, element: domish.Element) -> Self: """Parse from XMPP data form element.""" form = data_form.Form.fromElement(element) if form.formNamespace != NS_SEARCH_PARAMS: raise ValueError("Invalid FORM_TYPE") kwargs = {} if "q" in form: kwargs["query"] = form["q"] if "all" in form: kwargs["all"] = form["all"] if "min_users" in form: try: kwargs["min_users"] = int(form["min_users"]) except ValueError: raise ValueError("Invalid min_users value") for field in ["sinname", "sindescription", "sinaddr", "types", "key"]: if field in form: kwargs[field] = form[field] return cls(**kwargs) def to_form(self) -> data_form.Form: """Convert to "submit" data form""" form = data_form.Form("submit", formNamespace=NS_SEARCH_PARAMS) # Add fields with original XML field names if self.query is not None: form.addField(data_form.Field(var="q", value=self.query)) if self.all: form.addField(data_form.Field("boolean", "all", value=True)) if not self.sinname: form.addField(data_form.Field("boolean", "sinname", value=False)) if not self.sindescription: form.addField(data_form.Field("boolean", "sindescription", value=False)) if not self.sinaddr: form.addField(data_form.Field("boolean", "sinaddr", value=False)) if self.min_users is not None: form.addField(data_form.Field(var="min_users", value=str(self.min_users))) if self.types: form.addField(data_form.Field("list-multi", "types", values=self.types)) if self.key != SORT_ADDRESS: form.addField(data_form.Field("list-single", "key", value=self.key)) return form def to_element(self) -> domish.Element: """Convert to XMPP data form submission.""" form = self.to_form() search_elt = domish.Element((NS_SEARCH, "search")) search_elt.addChild(form.toElement()) if self.rsm is not None: search_elt.addChild(self.rsm.to_element()) return search_elt class SearchItem(BaseModel): """Represents a single channel search result.""" address: JIDType name: str | None = None description: str | None = None language: str | None = None nusers: int | None = Field(default=None, ge=0) service_type: str | None = None is_open: bool | None = None anonymity_mode: str | None = None @classmethod def from_element(cls, element: domish.Element) -> Self: """Parse from <item> element.""" if not (element.name == "item" and element.uri == NS_SEARCH): raise ValueError("Invalid channel item element") address = element.getAttribute("address") if not address: raise ValueError("Missing required address attribute") data: dict[str, Any] = {"address": jid.JID(address)} for child in element.elements(): if child.uri != NS_SEARCH: continue content = str(child) match (name := child.name.replace("-", "_")): case "nusers": data[name] = int(content) case "is_open": data[name] = content.lower() == "true" case "service_type" | "anonymity_mode" if content: data[name] = content case _: data[name] = content return cls(**data) def to_element(self) -> domish.Element: """Convert to <item> element.""" item = domish.Element((NS_SEARCH, "item")) item["address"] = str(self.address) field_mappings = { "name": "name", "description": "description", "language": "language", "nusers": "nusers", "service_type": "service-type", "anonymity_mode": "anonymity-mode", } for field, element_name in field_mappings.items(): value = getattr(self, field) if value is not None: elem = item.addElement(element_name, NS_SEARCH) elem.addContent( str(value).lower() if isinstance(value, bool) else str(value) ) if self.is_open is not None: item.addElement( ("is-open", NS_SEARCH), content="true" if self.is_open else "false" ) return item class SearchItems(RootModel): root: list[SearchItem] def __iter__(self) -> Iterator[SearchItem]: # type: ignore return iter(self.root) def __getitem__(self, item) -> str: return self.root[item] def __len__(self) -> int: return len(self.root) def append(self, item: SearchItem) -> None: self.root.append(item) def sort(self, key=None, reverse=False) -> None: self.root.sort(key=key, reverse=reverse) # type: ignore @classmethod def from_element(cls, element: domish.Element) -> Self: if element.name == "result" and element.uri == NS_SEARCH: result_elt = element else: try: result_elt = next(element.elements(NS_SEARCH, "result")) except StopIteration: raise exceptions.NotFound("No <result> element found.") items = [] for item_elt in result_elt.elements(NS_SEARCH, "item"): items.append(SearchItem.from_element(item_elt)) return cls(items) def to_element(self) -> domish.Element: result_elt = domish.Element((NS_SEARCH, "result")) for search_item in self.root: result_elt.addChild(search_item.to_element()) return result_elt class XEP_0433: """Implementation of XEP-0433 Extended Channel Search.""" namespace: Final[str] = NS_CHANNEL_SEARCH def __init__(self, host: Any): log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.") self.host = host host.trigger.add( "JID_SEARCH_perform_search", self.jid_search_perform_search_trigger ) self.allow_external = C.bool( host.memory.config_get(None, "allow_external_search", C.BOOL_FALSE) ) self.group_chat_search_default_jid = jid.JID( host.memory.config_get( None, "group_chat_search_default_jid", "api@search.jabber.network" ) ) host.bridge.add_method( "extended_search_request", ".plugin", in_sign="sss", out_sign="s", method=self._search, async_=True, ) async def jid_search_perform_search_trigger( self, client: SatXMPPEntity, search_term: str, options: plugin_misc_jid_search.Options, sequence_matcher: difflib.SequenceMatcher, matches: plugin_misc_jid_search.SearchItems, ) -> bool: if options.groupchat and self.allow_external: log.debug(f"Search {search_term!r} at {self.group_chat_search_default_jid}.") try: external_items = await self.search( client, self.group_chat_search_default_jid, SearchRequest(q=search_term), ) except Exception as e: log.warning(f"Can't do external search: {e}.") return True for search_item in external_items: room_search_item = plugin_misc_jid_search.RoomSearchItem( entity=search_item.address, name=( search_item.name or search_item.address.user or search_item.address.full() ), local=False, service_type=search_item.service_type, is_open=search_item.is_open, anonymity_mode=search_item.anonymity_mode, description=search_item.description, language=search_item.language, nusers=search_item.nusers, ) matches.append(room_search_item) return True def _search( self, target: str, search_request: str, profile: str ) -> defer.Deferred[str]: client = self.host.get_client(profile) d = defer.ensureDeferred( self.search( client, jid.JID(target), SearchRequest.model_validate_json(search_request) ) ) d.addCallback( lambda search_items: search_items.model_dump_json(exclude_none=True) ) d = cast(defer.Deferred[str], d) return d async def search( self, client: SatXMPPEntity, target: jid.JID, search_request: SearchRequest ) -> SearchItems: """Do a Search""" iq_elt = client.IQ("get") iq_elt.addChild(search_request.to_element()) iq_result_elt = await iq_elt.send(target.full()) return SearchItems.from_element(iq_result_elt)