view libervia/backend/plugins/plugin_misc_jid_search.py @ 4141:ba8ddfdd334f

cli (loops): run GLib loop in same thread as asyncio: use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:05:53 +0100 (14 months ago)
parents 238e305f2306
children 0d7bb4df2343
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to handle XMPP entities search
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import OrderedDict
from dataclasses import dataclass, asdict
import difflib
from typing import List, Optional

from twisted.internet import defer
from twisted.words.protocols.jabber import jid

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "JID Search",
    C.PI_IMPORT_NAME: "JID_SEARCH",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "JidSearch",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Search for XMPP entities"""),
}
RATIO_CUTOFF = 0.6
MAX_CACHE_SIZE = 10


@dataclass
class JidSearchItem:
    entity: jid.JID
    name: str = ""
    in_roster: bool = False
    groups: list[str] | None = None
    exact_match: bool = False
    relevance: float | None = None


JidSearchCache = OrderedDict[str, list[JidSearchItem]]


class JidSearch:
    def __init__(self, host) -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.bridge.add_method(
            "jid_search",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._search,
            async_=True,
        )

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        client._jid_search_cache = JidSearchCache()

    def _search(self, search_term: str, options_s: str, profile: str) -> defer.Deferred:
        client = self.host.get_client(profile)
        d = defer.ensureDeferred(
            self.search(client, search_term, data_format.deserialise(options_s))
        )
        d.addCallback(
            lambda search_items: data_format.serialise([asdict(i) for i in search_items])
        )
        return d

    async def search(
        self, client: SatXMPPEntity, search_term: str, options: Optional[dict] = None
    ) -> List[JidSearchItem]:
        """Searches for entities in various locations.

        @param client: The SatXMPPEntity client where the search is to be performed.
        @param search_term: The query to be searched.
        @param options: Additional search options.
        @return: A list of matches found.
        """
        search_term = search_term.strip().lower()
        sequence_matcher = difflib.SequenceMatcher()
        sequence_matcher.set_seq1(search_term)
        # FIXME: cache can give different results due to the filtering mechanism (if a
        #   cached search term match the beginning of current search term, its results a
        #   re-used and filtered, and sometimes items can be missing in compraison to the
        #   results without caching). This may need to be fixed.
        cache: JidSearchCache = client._jid_search_cache

        # Look for a match in the cache
        for cache_key in cache:
            if search_term.startswith(cache_key):
                log.debug(
                    f"Match found in cache for {search_term!r} in [{client.profile}]."
                )
                # If an exact match is found, return the results as is
                if search_term == cache_key:
                    log.debug("Exact match found in cache, reusing results.")
                    matches = cache[cache_key]
                else:
                    # If only the beginning matches, filter the cache results
                    log.debug("Prefix match found in cache, filtering results.")
                    matches = []
                    for jid_search_item in cache[cache_key]:
                        self._process_matching(
                            search_term, sequence_matcher, matches, jid_search_item
                        )
                cache.move_to_end(cache_key)
                break
        else:
            # If no match is found in the cache, perform a new search
            matches = await self._perform_search(client, search_term, sequence_matcher)
            cache[search_term] = matches
            if len(cache) > MAX_CACHE_SIZE:
                cache.popitem(last=False)

        # If no exact match is found, but the search term is a valid JID, we add the JID
        # as a result
        exact_match = any(m.exact_match for m in matches)
        if not exact_match and "@" in search_term:
            try:
                search_jid = jid.JID(search_term)
            except jid.InvalidFormat:
                pass
            else:
                matches.append(
                    JidSearchItem(
                        entity=search_jid,
                        in_roster=False,
                        exact_match=True,
                        relevance=1,
                    )
                )


        matches.sort(
            key=lambda item: (item.exact_match, item.relevance or 0, item.in_roster),
            reverse=True,
        )

        return matches

    def _process_matching(
        self,
        search_term: str,
        sequence_matcher: difflib.SequenceMatcher,
        matches: List[JidSearchItem],
        item: JidSearchItem,
    ) -> None:
        """Process matching of items

        @param sequence_matcher: The sequence matcher to be used for the matching process.
        @param matches: A list where the match is to be appended.
        @param item: The item that to be matched.
        @return: True if it was an exact match
        """

        item_name_lower = item.name.lower()
        item_entity_lower = item.entity.full().lower()

        if search_term in (item_name_lower, item_entity_lower):
            item.exact_match = True
            item.relevance = 1
            matches.append(item)
            return

        item.exact_match = False

        sequence_matcher.set_seq2(item_name_lower)
        name_ratio = sequence_matcher.ratio()
        if name_ratio >= RATIO_CUTOFF:
            item.relevance = name_ratio
            matches.append(item)
            return

        sequence_matcher.set_seq2(item_entity_lower)
        jid_ratio = sequence_matcher.ratio()
        if jid_ratio >= RATIO_CUTOFF:
            item.relevance = jid_ratio
            matches.append(item)
            return

        localpart = item.entity.user.lower() if item.entity.user else ""
        if localpart:
            sequence_matcher.set_seq2(localpart)
            domain_ratio = sequence_matcher.ratio()
            if domain_ratio >= RATIO_CUTOFF:
                item.relevance = domain_ratio
                matches.append(item)
                return

        if item.groups:
            group_ratios = []
            for group in item.groups:
                sequence_matcher.set_seq2(group.lower())
                group_ratios.append(sequence_matcher.ratio())
            group_ratio = max(group_ratios)
            if group_ratio >= RATIO_CUTOFF:
                item.relevance = group_ratio
                matches.append(item)
                return

        domain = item.entity.host.lower()
        sequence_matcher.set_seq2(domain)
        domain_ratio = sequence_matcher.ratio()
        if domain_ratio >= RATIO_CUTOFF:
            item.relevance = domain_ratio
            matches.append(item)
            return

    async def _perform_search(
        self,
        client: SatXMPPEntity,
        search_term: str,
        sequence_matcher: difflib.SequenceMatcher,
    ) -> List[JidSearchItem]:
        """Performs a new search when no match is found in the cache.

        @param search_term: The query to be searched.
        @param sequence_matcher: The SequenceMatcher object to be used for matching.
        @return: A list of matches found.
        """
        matches = []

        try:
            roster = client.roster
        except AttributeError:
            # components have no roster
            roster = []
        else:
            roster = client.roster.get_items()

        for roster_item in roster:
            jid_search_item = JidSearchItem(
                entity=roster_item.entity,
                name=roster_item.name,
                in_roster=True,
                groups=list(roster_item.groups),
            )

            self._process_matching(
                search_term, sequence_matcher, matches, jid_search_item
            )

        return matches