Mercurial > libervia-backend
changeset 4360:5ea4f5f28082
plugin XEP-0433: Extended Channel Search implementation:
Implements client part of XEP-0433, and add its results to plugin JID Search.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Apr 2025 18:19:28 +0200 |
parents | a987a8ce34b9 |
children | 676a320415b9 |
files | libervia/backend/plugins/plugin_xep_0433.py |
diffstat | 1 files changed, 350 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0433.py Fri Apr 11 18:19:28 2025 +0200 @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Extended Channel Search (XEP-0433) +# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import difflib +from typing import Any, Final, Iterator, Self, cast +from pydantic import BaseModel, Field, ConfigDict, RootModel, model_validator +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish +from wokkel import data_form +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.models.types import JIDType +from libervia.backend.plugins import plugin_misc_jid_search +from libervia.backend.plugins.plugin_xep_0059 import RSMRequest + +log = getLogger(__name__) + +# Namespaces +NS_CHANNEL_SEARCH: Final[str] = "urn:xmpp:channel-search:0" +NS_SEARCH: Final[str] = f"{NS_CHANNEL_SEARCH}:search" +NS_SEARCH_PARAMS: Final[str] = f"{NS_CHANNEL_SEARCH}:search-params" +NS_ORDER: Final[str] = f"{NS_CHANNEL_SEARCH}:order" +NS_ERROR: Final[str] = f"{NS_CHANNEL_SEARCH}:error" + +# Common sort keys +SORT_ADDRESS: Final[str] = f"{{{NS_ORDER}}}address" +SORT_NUSERS: Final[str] = f"{{{NS_ORDER}}}nusers" + +PLUGIN_INFO = { + C.PI_NAME: "Extended Channel Search", + C.PI_IMPORT_NAME: "XEP-0433", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_DEPENDENCIES: ["XEP-0059", "JID_SEARCH"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0433", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("Cross-domain search for public group chats"), +} + + +class SearchRequest(BaseModel): + """Parameters for channel search request.""" + + model_config = ConfigDict(extra="forbid") + + query: str | None = Field(None, alias="q") + all: bool = False + sinname: bool = True + sindescription: bool = True + sinaddr: bool = True + min_users: int | None = Field(default=None, ge=0) + types: list[str] = [] + key: str = SORT_ADDRESS + rsm: RSMRequest | None = None + + @model_validator(mode="after") + def check_conflicts(self) -> Self: + if self.all and self.query: + raise ValueError('Cannot combine "all" with search query') + return self + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Parse from XMPP data form element.""" + form = data_form.Form.fromElement(element) + if form.formNamespace != NS_SEARCH_PARAMS: + raise ValueError("Invalid FORM_TYPE") + + kwargs = {} + + if "q" in form: + kwargs["query"] = form["q"] + + if "all" in form: + kwargs["all"] = form["all"] + + if "min_users" in form: + try: + kwargs["min_users"] = int(form["min_users"]) + except ValueError: + raise ValueError("Invalid min_users value") + + for field in ["sinname", "sindescription", "sinaddr", "types", "key"]: + if field in form: + kwargs[field] = form[field] + + return cls(**kwargs) + + def to_form(self) -> data_form.Form: + """Convert to "submit" data form""" + form = data_form.Form("submit", formNamespace=NS_SEARCH_PARAMS) + + # Add fields with original XML field names + if self.query is not None: + form.addField(data_form.Field(var="q", value=self.query)) + + if self.all: + form.addField(data_form.Field("boolean", "all", value=True)) + + if not self.sinname: + form.addField(data_form.Field("boolean", "sinname", value=False)) + + if not self.sindescription: + form.addField(data_form.Field("boolean", "sindescription", value=False)) + + if not self.sinaddr: + form.addField(data_form.Field("boolean", "sinaddr", value=False)) + + if self.min_users is not None: + form.addField(data_form.Field(var="min_users", value=str(self.min_users))) + + if self.types: + form.addField(data_form.Field("list-multi", "types", values=self.types)) + + if self.key != SORT_ADDRESS: + form.addField(data_form.Field("list-single", "key", value=self.key)) + + return form + + def to_element(self) -> domish.Element: + """Convert to XMPP data form submission.""" + form = self.to_form() + search_elt = domish.Element((NS_SEARCH, "search")) + search_elt.addChild(form.toElement()) + if self.rsm is not None: + search_elt.addChild(self.rsm.to_element()) + + return search_elt + + +class SearchItem(BaseModel): + """Represents a single channel search result.""" + + address: JIDType + name: str | None = None + description: str | None = None + language: str | None = None + nusers: int | None = Field(default=None, ge=0) + service_type: str | None = None + is_open: bool | None = None + anonymity_mode: str | None = None + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Parse from <item> element.""" + if not (element.name == "item" and element.uri == NS_SEARCH): + raise ValueError("Invalid channel item element") + + address = element.getAttribute("address") + if not address: + raise ValueError("Missing required address attribute") + + data: dict[str, Any] = {"address": jid.JID(address)} + + for child in element.elements(): + if child.uri != NS_SEARCH: + continue + + content = str(child) + match (name := child.name.replace("-", "_")): + case "nusers": + data[name] = int(content) + case "is_open": + data[name] = content.lower() == "true" + case "service_type" | "anonymity_mode" if content: + data[name] = content + case _: + data[name] = content + + return cls(**data) + + def to_element(self) -> domish.Element: + """Convert to <item> element.""" + item = domish.Element((NS_SEARCH, "item")) + item["address"] = str(self.address) + + field_mappings = { + "name": "name", + "description": "description", + "language": "language", + "nusers": "nusers", + "service_type": "service-type", + "anonymity_mode": "anonymity-mode", + } + + for field, element_name in field_mappings.items(): + value = getattr(self, field) + if value is not None: + elem = item.addElement(element_name, NS_SEARCH) + elem.addContent( + str(value).lower() if isinstance(value, bool) else str(value) + ) + + if self.is_open is not None: + item.addElement( + ("is-open", NS_SEARCH), content="true" if self.is_open else "false" + ) + + return item + + +class SearchItems(RootModel): + root: list[SearchItem] + + def __iter__(self) -> Iterator[SearchItem]: # type: ignore + return iter(self.root) + + def __getitem__(self, item) -> str: + return self.root[item] + + def __len__(self) -> int: + return len(self.root) + + def append(self, item: SearchItem) -> None: + self.root.append(item) + + def sort(self, key=None, reverse=False) -> None: + self.root.sort(key=key, reverse=reverse) # type: ignore + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + if element.name == "result" and element.uri == NS_SEARCH: + result_elt = element + else: + try: + result_elt = next(element.elements(NS_SEARCH, "result")) + except StopIteration: + raise exceptions.NotFound("No <result> element found.") + items = [] + for item_elt in result_elt.elements(NS_SEARCH, "item"): + items.append(SearchItem.from_element(item_elt)) + return cls(items) + + def to_element(self) -> domish.Element: + result_elt = domish.Element((NS_SEARCH, "result")) + for search_item in self.root: + result_elt.addChild(search_item.to_element()) + return result_elt + + +class XEP_0433: + """Implementation of XEP-0433 Extended Channel Search.""" + + namespace: Final[str] = NS_CHANNEL_SEARCH + + def __init__(self, host: Any): + log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.") + self.host = host + host.trigger.add( + "JID_SEARCH_perform_search", self.jid_search_perform_search_trigger + ) + self.allow_external = C.bool( + host.memory.config_get(None, "allow_external_search", C.BOOL_FALSE) + ) + self.group_chat_search_default_jid = jid.JID( + host.memory.config_get( + None, "group_chat_search_default_jid", "api@search.jabber.network" + ) + ) + + host.bridge.add_method( + "extended_search_request", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._search, + async_=True, + ) + + async def jid_search_perform_search_trigger( + self, + client: SatXMPPEntity, + search_term: str, + options: plugin_misc_jid_search.Options, + sequence_matcher: difflib.SequenceMatcher, + matches: plugin_misc_jid_search.SearchItems, + ) -> bool: + if options.groupchat and self.allow_external: + log.debug(f"Search {search_term!r} at {self.group_chat_search_default_jid}.") + try: + external_items = await self.search( + client, + self.group_chat_search_default_jid, + SearchRequest(q=search_term), + ) + except Exception as e: + log.warning(f"Can't do external search: {e}.") + return True + for search_item in external_items: + room_search_item = plugin_misc_jid_search.RoomSearchItem( + entity=search_item.address, + name=( + search_item.name + or search_item.address.user + or search_item.address.full() + ), + local=False, + service_type=search_item.service_type, + is_open=search_item.is_open, + anonymity_mode=search_item.anonymity_mode, + description=search_item.description, + language=search_item.language, + nusers=search_item.nusers, + ) + matches.append(room_search_item) + return True + + def _search( + self, target: str, search_request: str, profile: str + ) -> defer.Deferred[str]: + client = self.host.get_client(profile) + d = defer.ensureDeferred( + self.search( + client, jid.JID(target), SearchRequest.model_validate_json(search_request) + ) + ) + d.addCallback( + lambda search_items: search_items.model_dump_json(exclude_none=True) + ) + d = cast(defer.Deferred[str], d) + return d + + async def search( + self, client: SatXMPPEntity, target: jid.JID, search_request: SearchRequest + ) -> SearchItems: + """Do a Search""" + iq_elt = client.IQ("get") + iq_elt.addChild(search_request.to_element()) + iq_result_elt = await iq_elt.send(target.full()) + return SearchItems.from_element(iq_result_elt)