Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0433.py @ 4360:5ea4f5f28082
plugin XEP-0433: Extended Channel Search implementation:
Implements client part of XEP-0433, and add its results to plugin JID Search.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Apr 2025 18:19:28 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4359:a987a8ce34b9 | 4360:5ea4f5f28082 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Extended Channel Search (XEP-0433) | |
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import difflib | |
20 from typing import Any, Final, Iterator, Self, cast | |
21 from pydantic import BaseModel, Field, ConfigDict, RootModel, model_validator | |
22 from twisted.internet import defer | |
23 from twisted.words.protocols.jabber import jid | |
24 from twisted.words.xish import domish | |
25 from wokkel import data_form | |
26 from libervia.backend.core import exceptions | |
27 from libervia.backend.core.constants import Const as C | |
28 from libervia.backend.core.core_types import SatXMPPEntity | |
29 from libervia.backend.core.i18n import _ | |
30 from libervia.backend.core.log import getLogger | |
31 from libervia.backend.models.types import JIDType | |
32 from libervia.backend.plugins import plugin_misc_jid_search | |
33 from libervia.backend.plugins.plugin_xep_0059 import RSMRequest | |
34 | |
35 log = getLogger(__name__) | |
36 | |
37 # Namespaces | |
38 NS_CHANNEL_SEARCH: Final[str] = "urn:xmpp:channel-search:0" | |
39 NS_SEARCH: Final[str] = f"{NS_CHANNEL_SEARCH}:search" | |
40 NS_SEARCH_PARAMS: Final[str] = f"{NS_CHANNEL_SEARCH}:search-params" | |
41 NS_ORDER: Final[str] = f"{NS_CHANNEL_SEARCH}:order" | |
42 NS_ERROR: Final[str] = f"{NS_CHANNEL_SEARCH}:error" | |
43 | |
44 # Common sort keys | |
45 SORT_ADDRESS: Final[str] = f"{{{NS_ORDER}}}address" | |
46 SORT_NUSERS: Final[str] = f"{{{NS_ORDER}}}nusers" | |
47 | |
48 PLUGIN_INFO = { | |
49 C.PI_NAME: "Extended Channel Search", | |
50 C.PI_IMPORT_NAME: "XEP-0433", | |
51 C.PI_TYPE: "XEP", | |
52 C.PI_MODES: C.PLUG_MODE_BOTH, | |
53 C.PI_DEPENDENCIES: ["XEP-0059", "JID_SEARCH"], | |
54 C.PI_RECOMMENDATIONS: [], | |
55 C.PI_MAIN: "XEP_0433", | |
56 C.PI_HANDLER: "no", | |
57 C.PI_DESCRIPTION: _("Cross-domain search for public group chats"), | |
58 } | |
59 | |
60 | |
61 class SearchRequest(BaseModel): | |
62 """Parameters for channel search request.""" | |
63 | |
64 model_config = ConfigDict(extra="forbid") | |
65 | |
66 query: str | None = Field(None, alias="q") | |
67 all: bool = False | |
68 sinname: bool = True | |
69 sindescription: bool = True | |
70 sinaddr: bool = True | |
71 min_users: int | None = Field(default=None, ge=0) | |
72 types: list[str] = [] | |
73 key: str = SORT_ADDRESS | |
74 rsm: RSMRequest | None = None | |
75 | |
76 @model_validator(mode="after") | |
77 def check_conflicts(self) -> Self: | |
78 if self.all and self.query: | |
79 raise ValueError('Cannot combine "all" with search query') | |
80 return self | |
81 | |
82 @classmethod | |
83 def from_element(cls, element: domish.Element) -> Self: | |
84 """Parse from XMPP data form element.""" | |
85 form = data_form.Form.fromElement(element) | |
86 if form.formNamespace != NS_SEARCH_PARAMS: | |
87 raise ValueError("Invalid FORM_TYPE") | |
88 | |
89 kwargs = {} | |
90 | |
91 if "q" in form: | |
92 kwargs["query"] = form["q"] | |
93 | |
94 if "all" in form: | |
95 kwargs["all"] = form["all"] | |
96 | |
97 if "min_users" in form: | |
98 try: | |
99 kwargs["min_users"] = int(form["min_users"]) | |
100 except ValueError: | |
101 raise ValueError("Invalid min_users value") | |
102 | |
103 for field in ["sinname", "sindescription", "sinaddr", "types", "key"]: | |
104 if field in form: | |
105 kwargs[field] = form[field] | |
106 | |
107 return cls(**kwargs) | |
108 | |
109 def to_form(self) -> data_form.Form: | |
110 """Convert to "submit" data form""" | |
111 form = data_form.Form("submit", formNamespace=NS_SEARCH_PARAMS) | |
112 | |
113 # Add fields with original XML field names | |
114 if self.query is not None: | |
115 form.addField(data_form.Field(var="q", value=self.query)) | |
116 | |
117 if self.all: | |
118 form.addField(data_form.Field("boolean", "all", value=True)) | |
119 | |
120 if not self.sinname: | |
121 form.addField(data_form.Field("boolean", "sinname", value=False)) | |
122 | |
123 if not self.sindescription: | |
124 form.addField(data_form.Field("boolean", "sindescription", value=False)) | |
125 | |
126 if not self.sinaddr: | |
127 form.addField(data_form.Field("boolean", "sinaddr", value=False)) | |
128 | |
129 if self.min_users is not None: | |
130 form.addField(data_form.Field(var="min_users", value=str(self.min_users))) | |
131 | |
132 if self.types: | |
133 form.addField(data_form.Field("list-multi", "types", values=self.types)) | |
134 | |
135 if self.key != SORT_ADDRESS: | |
136 form.addField(data_form.Field("list-single", "key", value=self.key)) | |
137 | |
138 return form | |
139 | |
140 def to_element(self) -> domish.Element: | |
141 """Convert to XMPP data form submission.""" | |
142 form = self.to_form() | |
143 search_elt = domish.Element((NS_SEARCH, "search")) | |
144 search_elt.addChild(form.toElement()) | |
145 if self.rsm is not None: | |
146 search_elt.addChild(self.rsm.to_element()) | |
147 | |
148 return search_elt | |
149 | |
150 | |
151 class SearchItem(BaseModel): | |
152 """Represents a single channel search result.""" | |
153 | |
154 address: JIDType | |
155 name: str | None = None | |
156 description: str | None = None | |
157 language: str | None = None | |
158 nusers: int | None = Field(default=None, ge=0) | |
159 service_type: str | None = None | |
160 is_open: bool | None = None | |
161 anonymity_mode: str | None = None | |
162 | |
163 @classmethod | |
164 def from_element(cls, element: domish.Element) -> Self: | |
165 """Parse from <item> element.""" | |
166 if not (element.name == "item" and element.uri == NS_SEARCH): | |
167 raise ValueError("Invalid channel item element") | |
168 | |
169 address = element.getAttribute("address") | |
170 if not address: | |
171 raise ValueError("Missing required address attribute") | |
172 | |
173 data: dict[str, Any] = {"address": jid.JID(address)} | |
174 | |
175 for child in element.elements(): | |
176 if child.uri != NS_SEARCH: | |
177 continue | |
178 | |
179 content = str(child) | |
180 match (name := child.name.replace("-", "_")): | |
181 case "nusers": | |
182 data[name] = int(content) | |
183 case "is_open": | |
184 data[name] = content.lower() == "true" | |
185 case "service_type" | "anonymity_mode" if content: | |
186 data[name] = content | |
187 case _: | |
188 data[name] = content | |
189 | |
190 return cls(**data) | |
191 | |
192 def to_element(self) -> domish.Element: | |
193 """Convert to <item> element.""" | |
194 item = domish.Element((NS_SEARCH, "item")) | |
195 item["address"] = str(self.address) | |
196 | |
197 field_mappings = { | |
198 "name": "name", | |
199 "description": "description", | |
200 "language": "language", | |
201 "nusers": "nusers", | |
202 "service_type": "service-type", | |
203 "anonymity_mode": "anonymity-mode", | |
204 } | |
205 | |
206 for field, element_name in field_mappings.items(): | |
207 value = getattr(self, field) | |
208 if value is not None: | |
209 elem = item.addElement(element_name, NS_SEARCH) | |
210 elem.addContent( | |
211 str(value).lower() if isinstance(value, bool) else str(value) | |
212 ) | |
213 | |
214 if self.is_open is not None: | |
215 item.addElement( | |
216 ("is-open", NS_SEARCH), content="true" if self.is_open else "false" | |
217 ) | |
218 | |
219 return item | |
220 | |
221 | |
222 class SearchItems(RootModel): | |
223 root: list[SearchItem] | |
224 | |
225 def __iter__(self) -> Iterator[SearchItem]: # type: ignore | |
226 return iter(self.root) | |
227 | |
228 def __getitem__(self, item) -> str: | |
229 return self.root[item] | |
230 | |
231 def __len__(self) -> int: | |
232 return len(self.root) | |
233 | |
234 def append(self, item: SearchItem) -> None: | |
235 self.root.append(item) | |
236 | |
237 def sort(self, key=None, reverse=False) -> None: | |
238 self.root.sort(key=key, reverse=reverse) # type: ignore | |
239 | |
240 @classmethod | |
241 def from_element(cls, element: domish.Element) -> Self: | |
242 if element.name == "result" and element.uri == NS_SEARCH: | |
243 result_elt = element | |
244 else: | |
245 try: | |
246 result_elt = next(element.elements(NS_SEARCH, "result")) | |
247 except StopIteration: | |
248 raise exceptions.NotFound("No <result> element found.") | |
249 items = [] | |
250 for item_elt in result_elt.elements(NS_SEARCH, "item"): | |
251 items.append(SearchItem.from_element(item_elt)) | |
252 return cls(items) | |
253 | |
254 def to_element(self) -> domish.Element: | |
255 result_elt = domish.Element((NS_SEARCH, "result")) | |
256 for search_item in self.root: | |
257 result_elt.addChild(search_item.to_element()) | |
258 return result_elt | |
259 | |
260 | |
261 class XEP_0433: | |
262 """Implementation of XEP-0433 Extended Channel Search.""" | |
263 | |
264 namespace: Final[str] = NS_CHANNEL_SEARCH | |
265 | |
266 def __init__(self, host: Any): | |
267 log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.") | |
268 self.host = host | |
269 host.trigger.add( | |
270 "JID_SEARCH_perform_search", self.jid_search_perform_search_trigger | |
271 ) | |
272 self.allow_external = C.bool( | |
273 host.memory.config_get(None, "allow_external_search", C.BOOL_FALSE) | |
274 ) | |
275 self.group_chat_search_default_jid = jid.JID( | |
276 host.memory.config_get( | |
277 None, "group_chat_search_default_jid", "api@search.jabber.network" | |
278 ) | |
279 ) | |
280 | |
281 host.bridge.add_method( | |
282 "extended_search_request", | |
283 ".plugin", | |
284 in_sign="sss", | |
285 out_sign="s", | |
286 method=self._search, | |
287 async_=True, | |
288 ) | |
289 | |
290 async def jid_search_perform_search_trigger( | |
291 self, | |
292 client: SatXMPPEntity, | |
293 search_term: str, | |
294 options: plugin_misc_jid_search.Options, | |
295 sequence_matcher: difflib.SequenceMatcher, | |
296 matches: plugin_misc_jid_search.SearchItems, | |
297 ) -> bool: | |
298 if options.groupchat and self.allow_external: | |
299 log.debug(f"Search {search_term!r} at {self.group_chat_search_default_jid}.") | |
300 try: | |
301 external_items = await self.search( | |
302 client, | |
303 self.group_chat_search_default_jid, | |
304 SearchRequest(q=search_term), | |
305 ) | |
306 except Exception as e: | |
307 log.warning(f"Can't do external search: {e}.") | |
308 return True | |
309 for search_item in external_items: | |
310 room_search_item = plugin_misc_jid_search.RoomSearchItem( | |
311 entity=search_item.address, | |
312 name=( | |
313 search_item.name | |
314 or search_item.address.user | |
315 or search_item.address.full() | |
316 ), | |
317 local=False, | |
318 service_type=search_item.service_type, | |
319 is_open=search_item.is_open, | |
320 anonymity_mode=search_item.anonymity_mode, | |
321 description=search_item.description, | |
322 language=search_item.language, | |
323 nusers=search_item.nusers, | |
324 ) | |
325 matches.append(room_search_item) | |
326 return True | |
327 | |
328 def _search( | |
329 self, target: str, search_request: str, profile: str | |
330 ) -> defer.Deferred[str]: | |
331 client = self.host.get_client(profile) | |
332 d = defer.ensureDeferred( | |
333 self.search( | |
334 client, jid.JID(target), SearchRequest.model_validate_json(search_request) | |
335 ) | |
336 ) | |
337 d.addCallback( | |
338 lambda search_items: search_items.model_dump_json(exclude_none=True) | |
339 ) | |
340 d = cast(defer.Deferred[str], d) | |
341 return d | |
342 | |
343 async def search( | |
344 self, client: SatXMPPEntity, target: jid.JID, search_request: SearchRequest | |
345 ) -> SearchItems: | |
346 """Do a Search""" | |
347 iq_elt = client.IQ("get") | |
348 iq_elt.addChild(search_request.to_element()) | |
349 iq_result_elt = await iq_elt.send(target.full()) | |
350 return SearchItems.from_element(iq_result_elt) |