comparison libervia/backend/plugins/plugin_xep_0433.py @ 4360:5ea4f5f28082

plugin XEP-0433: Extended Channel Search implementation: Implements client part of XEP-0433, and add its results to plugin JID Search.
author Goffi <goffi@goffi.org>
date Fri, 11 Apr 2025 18:19:28 +0200
parents
children
comparison
equal deleted inserted replaced
4359:a987a8ce34b9 4360:5ea4f5f28082
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Extended Channel Search (XEP-0433)
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import difflib
20 from typing import Any, Final, Iterator, Self, cast
21 from pydantic import BaseModel, Field, ConfigDict, RootModel, model_validator
22 from twisted.internet import defer
23 from twisted.words.protocols.jabber import jid
24 from twisted.words.xish import domish
25 from wokkel import data_form
26 from libervia.backend.core import exceptions
27 from libervia.backend.core.constants import Const as C
28 from libervia.backend.core.core_types import SatXMPPEntity
29 from libervia.backend.core.i18n import _
30 from libervia.backend.core.log import getLogger
31 from libervia.backend.models.types import JIDType
32 from libervia.backend.plugins import plugin_misc_jid_search
33 from libervia.backend.plugins.plugin_xep_0059 import RSMRequest
34
35 log = getLogger(__name__)
36
37 # Namespaces
38 NS_CHANNEL_SEARCH: Final[str] = "urn:xmpp:channel-search:0"
39 NS_SEARCH: Final[str] = f"{NS_CHANNEL_SEARCH}:search"
40 NS_SEARCH_PARAMS: Final[str] = f"{NS_CHANNEL_SEARCH}:search-params"
41 NS_ORDER: Final[str] = f"{NS_CHANNEL_SEARCH}:order"
42 NS_ERROR: Final[str] = f"{NS_CHANNEL_SEARCH}:error"
43
44 # Common sort keys
45 SORT_ADDRESS: Final[str] = f"{{{NS_ORDER}}}address"
46 SORT_NUSERS: Final[str] = f"{{{NS_ORDER}}}nusers"
47
48 PLUGIN_INFO = {
49 C.PI_NAME: "Extended Channel Search",
50 C.PI_IMPORT_NAME: "XEP-0433",
51 C.PI_TYPE: "XEP",
52 C.PI_MODES: C.PLUG_MODE_BOTH,
53 C.PI_DEPENDENCIES: ["XEP-0059", "JID_SEARCH"],
54 C.PI_RECOMMENDATIONS: [],
55 C.PI_MAIN: "XEP_0433",
56 C.PI_HANDLER: "no",
57 C.PI_DESCRIPTION: _("Cross-domain search for public group chats"),
58 }
59
60
61 class SearchRequest(BaseModel):
62 """Parameters for channel search request."""
63
64 model_config = ConfigDict(extra="forbid")
65
66 query: str | None = Field(None, alias="q")
67 all: bool = False
68 sinname: bool = True
69 sindescription: bool = True
70 sinaddr: bool = True
71 min_users: int | None = Field(default=None, ge=0)
72 types: list[str] = []
73 key: str = SORT_ADDRESS
74 rsm: RSMRequest | None = None
75
76 @model_validator(mode="after")
77 def check_conflicts(self) -> Self:
78 if self.all and self.query:
79 raise ValueError('Cannot combine "all" with search query')
80 return self
81
82 @classmethod
83 def from_element(cls, element: domish.Element) -> Self:
84 """Parse from XMPP data form element."""
85 form = data_form.Form.fromElement(element)
86 if form.formNamespace != NS_SEARCH_PARAMS:
87 raise ValueError("Invalid FORM_TYPE")
88
89 kwargs = {}
90
91 if "q" in form:
92 kwargs["query"] = form["q"]
93
94 if "all" in form:
95 kwargs["all"] = form["all"]
96
97 if "min_users" in form:
98 try:
99 kwargs["min_users"] = int(form["min_users"])
100 except ValueError:
101 raise ValueError("Invalid min_users value")
102
103 for field in ["sinname", "sindescription", "sinaddr", "types", "key"]:
104 if field in form:
105 kwargs[field] = form[field]
106
107 return cls(**kwargs)
108
109 def to_form(self) -> data_form.Form:
110 """Convert to "submit" data form"""
111 form = data_form.Form("submit", formNamespace=NS_SEARCH_PARAMS)
112
113 # Add fields with original XML field names
114 if self.query is not None:
115 form.addField(data_form.Field(var="q", value=self.query))
116
117 if self.all:
118 form.addField(data_form.Field("boolean", "all", value=True))
119
120 if not self.sinname:
121 form.addField(data_form.Field("boolean", "sinname", value=False))
122
123 if not self.sindescription:
124 form.addField(data_form.Field("boolean", "sindescription", value=False))
125
126 if not self.sinaddr:
127 form.addField(data_form.Field("boolean", "sinaddr", value=False))
128
129 if self.min_users is not None:
130 form.addField(data_form.Field(var="min_users", value=str(self.min_users)))
131
132 if self.types:
133 form.addField(data_form.Field("list-multi", "types", values=self.types))
134
135 if self.key != SORT_ADDRESS:
136 form.addField(data_form.Field("list-single", "key", value=self.key))
137
138 return form
139
140 def to_element(self) -> domish.Element:
141 """Convert to XMPP data form submission."""
142 form = self.to_form()
143 search_elt = domish.Element((NS_SEARCH, "search"))
144 search_elt.addChild(form.toElement())
145 if self.rsm is not None:
146 search_elt.addChild(self.rsm.to_element())
147
148 return search_elt
149
150
151 class SearchItem(BaseModel):
152 """Represents a single channel search result."""
153
154 address: JIDType
155 name: str | None = None
156 description: str | None = None
157 language: str | None = None
158 nusers: int | None = Field(default=None, ge=0)
159 service_type: str | None = None
160 is_open: bool | None = None
161 anonymity_mode: str | None = None
162
163 @classmethod
164 def from_element(cls, element: domish.Element) -> Self:
165 """Parse from <item> element."""
166 if not (element.name == "item" and element.uri == NS_SEARCH):
167 raise ValueError("Invalid channel item element")
168
169 address = element.getAttribute("address")
170 if not address:
171 raise ValueError("Missing required address attribute")
172
173 data: dict[str, Any] = {"address": jid.JID(address)}
174
175 for child in element.elements():
176 if child.uri != NS_SEARCH:
177 continue
178
179 content = str(child)
180 match (name := child.name.replace("-", "_")):
181 case "nusers":
182 data[name] = int(content)
183 case "is_open":
184 data[name] = content.lower() == "true"
185 case "service_type" | "anonymity_mode" if content:
186 data[name] = content
187 case _:
188 data[name] = content
189
190 return cls(**data)
191
192 def to_element(self) -> domish.Element:
193 """Convert to <item> element."""
194 item = domish.Element((NS_SEARCH, "item"))
195 item["address"] = str(self.address)
196
197 field_mappings = {
198 "name": "name",
199 "description": "description",
200 "language": "language",
201 "nusers": "nusers",
202 "service_type": "service-type",
203 "anonymity_mode": "anonymity-mode",
204 }
205
206 for field, element_name in field_mappings.items():
207 value = getattr(self, field)
208 if value is not None:
209 elem = item.addElement(element_name, NS_SEARCH)
210 elem.addContent(
211 str(value).lower() if isinstance(value, bool) else str(value)
212 )
213
214 if self.is_open is not None:
215 item.addElement(
216 ("is-open", NS_SEARCH), content="true" if self.is_open else "false"
217 )
218
219 return item
220
221
222 class SearchItems(RootModel):
223 root: list[SearchItem]
224
225 def __iter__(self) -> Iterator[SearchItem]: # type: ignore
226 return iter(self.root)
227
228 def __getitem__(self, item) -> str:
229 return self.root[item]
230
231 def __len__(self) -> int:
232 return len(self.root)
233
234 def append(self, item: SearchItem) -> None:
235 self.root.append(item)
236
237 def sort(self, key=None, reverse=False) -> None:
238 self.root.sort(key=key, reverse=reverse) # type: ignore
239
240 @classmethod
241 def from_element(cls, element: domish.Element) -> Self:
242 if element.name == "result" and element.uri == NS_SEARCH:
243 result_elt = element
244 else:
245 try:
246 result_elt = next(element.elements(NS_SEARCH, "result"))
247 except StopIteration:
248 raise exceptions.NotFound("No <result> element found.")
249 items = []
250 for item_elt in result_elt.elements(NS_SEARCH, "item"):
251 items.append(SearchItem.from_element(item_elt))
252 return cls(items)
253
254 def to_element(self) -> domish.Element:
255 result_elt = domish.Element((NS_SEARCH, "result"))
256 for search_item in self.root:
257 result_elt.addChild(search_item.to_element())
258 return result_elt
259
260
261 class XEP_0433:
262 """Implementation of XEP-0433 Extended Channel Search."""
263
264 namespace: Final[str] = NS_CHANNEL_SEARCH
265
266 def __init__(self, host: Any):
267 log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")
268 self.host = host
269 host.trigger.add(
270 "JID_SEARCH_perform_search", self.jid_search_perform_search_trigger
271 )
272 self.allow_external = C.bool(
273 host.memory.config_get(None, "allow_external_search", C.BOOL_FALSE)
274 )
275 self.group_chat_search_default_jid = jid.JID(
276 host.memory.config_get(
277 None, "group_chat_search_default_jid", "api@search.jabber.network"
278 )
279 )
280
281 host.bridge.add_method(
282 "extended_search_request",
283 ".plugin",
284 in_sign="sss",
285 out_sign="s",
286 method=self._search,
287 async_=True,
288 )
289
290 async def jid_search_perform_search_trigger(
291 self,
292 client: SatXMPPEntity,
293 search_term: str,
294 options: plugin_misc_jid_search.Options,
295 sequence_matcher: difflib.SequenceMatcher,
296 matches: plugin_misc_jid_search.SearchItems,
297 ) -> bool:
298 if options.groupchat and self.allow_external:
299 log.debug(f"Search {search_term!r} at {self.group_chat_search_default_jid}.")
300 try:
301 external_items = await self.search(
302 client,
303 self.group_chat_search_default_jid,
304 SearchRequest(q=search_term),
305 )
306 except Exception as e:
307 log.warning(f"Can't do external search: {e}.")
308 return True
309 for search_item in external_items:
310 room_search_item = plugin_misc_jid_search.RoomSearchItem(
311 entity=search_item.address,
312 name=(
313 search_item.name
314 or search_item.address.user
315 or search_item.address.full()
316 ),
317 local=False,
318 service_type=search_item.service_type,
319 is_open=search_item.is_open,
320 anonymity_mode=search_item.anonymity_mode,
321 description=search_item.description,
322 language=search_item.language,
323 nusers=search_item.nusers,
324 )
325 matches.append(room_search_item)
326 return True
327
328 def _search(
329 self, target: str, search_request: str, profile: str
330 ) -> defer.Deferred[str]:
331 client = self.host.get_client(profile)
332 d = defer.ensureDeferred(
333 self.search(
334 client, jid.JID(target), SearchRequest.model_validate_json(search_request)
335 )
336 )
337 d.addCallback(
338 lambda search_items: search_items.model_dump_json(exclude_none=True)
339 )
340 d = cast(defer.Deferred[str], d)
341 return d
342
343 async def search(
344 self, client: SatXMPPEntity, target: jid.JID, search_request: SearchRequest
345 ) -> SearchItems:
346 """Do a Search"""
347 iq_elt = client.IQ("get")
348 iq_elt.addChild(search_request.to_element())
349 iq_result_elt = await iq_elt.send(target.full())
350 return SearchItems.from_element(iq_result_elt)