Mercurial > libervia-backend
view libervia/backend/plugins/plugin_misc_jid_search.py @ 4309:b56b1eae7994
component email gateway: add multicasting:
XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033
metadata are converted to suitable Email headers and vice versa.
Email address and JID are both supported, and delivery is done by the gateway when
suitable on incoming messages.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin to handle XMPP entities search # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from collections import OrderedDict from dataclasses import dataclass, asdict import difflib from typing import List, Optional from twisted.internet import defer from twisted.words.protocols.jabber import jid from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.tools.common import data_format log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "JID Search", C.PI_IMPORT_NAME: "JID_SEARCH", C.PI_TYPE: C.PLUG_TYPE_MISC, C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "JidSearch", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""Search for XMPP entities"""), } RATIO_CUTOFF = 0.6 MAX_CACHE_SIZE = 10 @dataclass class JidSearchItem: entity: jid.JID name: str = "" in_roster: bool = False groups: list[str] | None = None exact_match: bool = False relevance: float | None = None JidSearchCache = OrderedDict[str, list[JidSearchItem]] class JidSearch: def __init__(self, host) -> None: log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host host.bridge.add_method( "jid_search", ".plugin", in_sign="sss", out_sign="s", method=self._search, async_=True, ) def profile_connecting(self, client: SatXMPPEntity) -> None: client._jid_search_cache = JidSearchCache() def _search(self, search_term: str, options_s: str, profile: str) -> defer.Deferred: client = self.host.get_client(profile) d = defer.ensureDeferred( self.search(client, search_term, data_format.deserialise(options_s)) ) d.addCallback( lambda search_items: data_format.serialise([asdict(i) for i in search_items]) ) return d async def search( self, client: SatXMPPEntity, search_term: str, options: Optional[dict] = None ) -> List[JidSearchItem]: """Searches for entities in various locations. @param client: The SatXMPPEntity client where the search is to be performed. @param search_term: The query to be searched. @param options: Additional search options. @return: A list of matches found. """ search_term = search_term.strip().lower() sequence_matcher = difflib.SequenceMatcher() sequence_matcher.set_seq1(search_term) # FIXME: cache can give different results due to the filtering mechanism (if a # cached search term match the beginning of current search term, its results a # re-used and filtered, and sometimes items can be missing in compraison to the # results without caching). This may need to be fixed. cache: JidSearchCache = client._jid_search_cache # Look for a match in the cache for cache_key in cache: if search_term.startswith(cache_key): log.debug( f"Match found in cache for {search_term!r} in [{client.profile}]." ) # If an exact match is found, return the results as is if search_term == cache_key: log.debug("Exact match found in cache, reusing results.") matches = cache[cache_key] else: # If only the beginning matches, filter the cache results log.debug("Prefix match found in cache, filtering results.") matches = [] for jid_search_item in cache[cache_key]: self._process_matching( search_term, sequence_matcher, matches, jid_search_item ) cache.move_to_end(cache_key) break else: # If no match is found in the cache, perform a new search matches = await self._perform_search(client, search_term, sequence_matcher) cache[search_term] = matches if len(cache) > MAX_CACHE_SIZE: cache.popitem(last=False) # If no exact match is found, but the search term is a valid JID, we add the JID # as a result exact_match = any(m.exact_match for m in matches) if not exact_match and "@" in search_term: try: search_jid = jid.JID(search_term) except jid.InvalidFormat: pass else: matches.append( JidSearchItem( entity=search_jid, in_roster=False, exact_match=True, relevance=1, ) ) matches.sort( key=lambda item: (item.exact_match, item.relevance or 0, item.in_roster), reverse=True, ) return matches def _process_matching( self, search_term: str, sequence_matcher: difflib.SequenceMatcher, matches: List[JidSearchItem], item: JidSearchItem, ) -> None: """Process matching of items @param sequence_matcher: The sequence matcher to be used for the matching process. @param matches: A list where the match is to be appended. @param item: The item that to be matched. @return: True if it was an exact match """ item_name_lower = item.name.lower() item_entity_lower = item.entity.full().lower() if search_term in (item_name_lower, item_entity_lower): item.exact_match = True item.relevance = 1 matches.append(item) return item.exact_match = False sequence_matcher.set_seq2(item_name_lower) name_ratio = sequence_matcher.ratio() if name_ratio >= RATIO_CUTOFF: item.relevance = name_ratio matches.append(item) return sequence_matcher.set_seq2(item_entity_lower) jid_ratio = sequence_matcher.ratio() if jid_ratio >= RATIO_CUTOFF: item.relevance = jid_ratio matches.append(item) return localpart = item.entity.user.lower() if item.entity.user else "" if localpart: sequence_matcher.set_seq2(localpart) domain_ratio = sequence_matcher.ratio() if domain_ratio >= RATIO_CUTOFF: item.relevance = domain_ratio matches.append(item) return if item.groups: group_ratios = [] for group in item.groups: sequence_matcher.set_seq2(group.lower()) group_ratios.append(sequence_matcher.ratio()) group_ratio = max(group_ratios) if group_ratio >= RATIO_CUTOFF: item.relevance = group_ratio matches.append(item) return domain = item.entity.host.lower() sequence_matcher.set_seq2(domain) domain_ratio = sequence_matcher.ratio() if domain_ratio >= RATIO_CUTOFF: item.relevance = domain_ratio matches.append(item) return async def _perform_search( self, client: SatXMPPEntity, search_term: str, sequence_matcher: difflib.SequenceMatcher, ) -> List[JidSearchItem]: """Performs a new search when no match is found in the cache. @param search_term: The query to be searched. @param sequence_matcher: The SequenceMatcher object to be used for matching. @return: A list of matches found. """ matches = [] try: roster = client.roster except AttributeError: # components have no roster roster = [] else: roster = client.roster.get_items() for roster_item in roster: jid_search_item = JidSearchItem( entity=roster_item.entity, name=roster_item.name, in_roster=True, groups=list(roster_item.groups), ) self._process_matching( search_term, sequence_matcher, matches, jid_search_item ) return matches