# view libervia/backend/plugins/plugin_xep_0059.py @ 4357:f43cbceba2a0
#
# various minor fixes.
# author Goffi <goffi@goffi.org>
# date Fri, 11 Apr 2025 18:19:28 +0200
# parents c9626f46b63e
# children
# line wrap: on
# line source

#!/usr/bin/env python3

# Result Set Management (XEP-0059)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Self
from pydantic import BaseModel, Field, field_validator
from zope.interface import implementer
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel, rsm
from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger


# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by the ``C.PI_*`` constants.
# NOTE(review): presumably consumed by the host's plugin loader to register
#   this plugin; confirm against the plugin import code.
PLUGIN_INFO = {
    C.PI_NAME: "Result Set Management",
    C.PI_IMPORT_NAME: "XEP-0059",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0059"],
    # Matches the name of the XEP_0059 class defined in this module.
    C.PI_MAIN: "XEP_0059",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Result Set Management"""),
}

# Prefix of the keys holding RSM arguments in "extra" dictionaries
# (e.g. "rsm_max", "rsm_after"); see XEP_0059.parse_extra.
RSM_PREFIX = "rsm_"


class RSMRequest(BaseModel):
    """Pydantic model for RSM request parameters

    This model mirrors the attributes of wokkel's ``rsm.RSMRequest`` and offers
    conversions to and from it, as well as to and from ``domish.Element``.
    """
    # Maximum number of items per page. 0 is accepted: XEP-0059 uses it to
    # request only the item count, and XEP_0059.get_next_request explicitly
    # handles this value.
    max: int = Field(default=10, ge=0)
    # ID of the item after which the requested page starts.
    after: str | None = None
    # ID of the item before which the requested page ends; an empty string is
    # allowed and means "last page".
    before: str | None = None
    # Index of the first item of the requested page.
    index: int | None = None

    @field_validator('after')
    @classmethod
    def check_after_not_empty(cls, v: str | None) -> str | None:
        """Validate that after value isn't empty string

        Note: Empty before is allowed (means "last page") but empty after is not
        @param v: value to validate
        @return: validated value
        @raise ValueError: if value is an empty string
        """
        if v == "":
            raise ValueError("RSM \"after\" can't be empty")
        return v

    def to_wokkel_request(self) -> rsm.RSMRequest:
        """Convert to wokkel RSMRequest

        @return: wokkel RSMRequest instance built from this model's fields
        """
        return rsm.RSMRequest(
            # wokkel names the argument "max_" to avoid shadowing the builtin.
            max_=self.max,
            after=self.after,
            before=self.before,
            index=self.index
        )

    @classmethod
    def from_wokkel_request(cls, request: rsm.RSMRequest) -> Self:
        """Create from wokkel RSMRequest

        @param request: wokkel RSMRequest to convert
        @return: RSMRequest model instance
        """
        return cls(
            max=request.max,
            after=request.after,
            before=request.before,
            index=request.index
        )

    def to_element(self) -> domish.Element:
        """Convert to domish.Element

        The serialisation is delegated to wokkel's RSMRequest.

        @return: XML element representing the RSM request
        """
        return self.to_wokkel_request().toElement()

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Create from domish.Element

        @param element: XML element to parse
        @return: RSMRequest model instance
        @raise ValueError: if the element is invalid; the original wokkel error
            is kept as the exception cause.
        """
        try:
            wokkel_req = rsm.RSMRequest.fromElement(element)
        except rsm.RSMNotFoundError as e:
            # This handler must stay before the generic RSMError one so that
            # the dedicated message is used.
            raise ValueError("No RSM set element found") from e
        except rsm.RSMError as e:
            raise ValueError(str(e)) from e
        return cls.from_wokkel_request(wokkel_req)


class RSMResponse(BaseModel):
    """Pydantic model for RSM response parameters

    This model mirrors the attributes of wokkel's ``rsm.RSMResponse`` and offers
    conversions to and from it, as well as to and from ``domish.Element``.
    """
    # ID of the first item in the page.
    first: str | None = None
    # ID of the last item in the page.
    last: str | None = None
    # Position of the first item of the page in the full set.
    index: int | None = None
    # Total number of items in the full set.
    count: int | None = None

    def to_wokkel_response(self) -> rsm.RSMResponse:
        """Convert to wokkel RSMResponse

        @return: wokkel RSMResponse instance built from this model's fields
        """
        return rsm.RSMResponse(
            first=self.first,
            last=self.last,
            index=self.index,
            count=self.count
        )

    @classmethod
    def from_wokkel_response(cls, response: rsm.RSMResponse) -> Self:
        """Create from wokkel RSMResponse

        @param response: wokkel RSMResponse to convert
        @return: RSMResponse model instance
        """
        return cls(
            first=response.first,
            last=response.last,
            index=response.index,
            count=response.count
        )

    def to_element(self) -> domish.Element:
        """Convert to domish.Element

        The serialisation is delegated to wokkel's RSMResponse.

        @return: XML element representing the RSM response
        """
        return self.to_wokkel_response().toElement()

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Create from domish.Element

        @param element: XML element to parse
        @return: RSMResponse model instance
        @raise ValueError: if the element is invalid; the original wokkel error
            is kept as the exception cause.
        """
        try:
            wokkel_resp = rsm.RSMResponse.fromElement(element)
        except rsm.RSMNotFoundError as e:
            # This handler must stay before the generic RSMError one so that
            # the dedicated message is used.
            raise ValueError("No RSM set element found") from e
        except rsm.RSMError as e:
            raise ValueError(str(e)) from e
        return cls.from_wokkel_response(wokkel_resp)


class XEP_0059:
    """Main class of the Result Set Management (XEP-0059) plugin

    It provides helpers to build RSM requests from "extra" dictionaries, to
    export RSM responses as dictionaries, and to compute the request for the
    next page of a paginated result.
    """

    def __init__(self, host: str) -> None:
        """Initialize the RSM plugin

        @param host: host instance (not used by this plugin)
        """
        # NOTE(review): "host" is annotated as ``str`` but is described as the
        #   host instance; the annotation is probably wrong — confirm and fix.
        log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")

    def get_handler(self, client) -> 'XEP_0059_handler':
        """Get the XMPP handler for this plugin

        @param client: client instance (not used, a new handler is always created)
        @return: XEP_0059_handler instance
        """
        return XEP_0059_handler()

    def parse_extra(self, extra: dict[str, str]) -> rsm.RSMRequest | None:
        """Parse extra dictionary to retrieve RSM arguments

        RSM arguments are the keys "rsm_max", "rsm_after", "rsm_before" and
        "rsm_index". Those which are present are removed from ``extra``.

        @param extra: data to parse. This dict is modified in place: RSM keys are
            popped from it.
        @return: request with parsed arguments or None if no RSM arguments found
        @raise ValueError: if rsm_max is negative or is not an integer, or if the
            arguments are rejected by RSMRequest validation (pydantic's
            ValidationError is a ValueError subclass).
        """
        if int(extra.get(f"{RSM_PREFIX}max", 0)) < 0:
            raise ValueError(_("rsm_max can't be negative"))

        rsm_args = {}
        for arg in ("max", "after", "before", "index"):
            try:
                # Keys must match the RSMRequest model field names ("max", not
                # wokkel's "max_"): pydantic silently ignores unknown keyword
                # arguments by default, which would discard the requested page
                # size and fall back to the model default.
                rsm_args[arg] = extra.pop(f"{RSM_PREFIX}{arg}")
            except KeyError:
                continue

        if not rsm_args:
            return None
        # Going through the pydantic model validates and coerces the string
        # values before the wokkel request is built.
        return RSMRequest(**rsm_args).to_wokkel_request()

    def response2dict(
        self,
        rsm_response: rsm.RSMResponse,
        data: dict[str, str] | None = None
    ) -> dict[str, str]:
        """Return a dict with RSM response data

        Key set in data can be:
            - first: first item id in the page
            - last: last item id in the page
            - index: position of the first item in the full set
            - count: total number of items in the full set
        If a value doesn't exist, it's not set.
        All values are set as strings.

        @param rsm_response: response to parse
        @param data: dict to update with rsm data. If None, a new dict is created
        @return: data dict with RSM values
        """
        # FIXME: This method should not be used anymore, and removed once replaced
        #   in XEP-0313 plugin.
        if data is None:
            data = {}
        model = RSMResponse.from_wokkel_response(rsm_response)

        if model.first is not None:
            data["first"] = model.first
        if model.last is not None:
            data["last"] = model.last
        # Numeric values are converted so that all values of the dict are strings.
        if model.index is not None:
            data["index"] = str(model.index)
        if model.count is not None:
            data["count"] = str(model.count)

        return data

    def get_next_request(
        self,
        rsm_request: RSMRequest,
        rsm_response: RSMResponse,
        log_progress: bool = True,
    ) -> RSMRequest | None:
        """Generate the request for the next page of items.

        Page will be retrieved forward.
        @param rsm_request: last request used
        @param rsm_response: response from the last request
        @param log_progress: whether to log progress information
        @return: request to retrieve next page, or None if we are at the end
            or if pagination is not possible.
        """
        # Defensive guard: with a page size of 0 no progress can be made.
        if rsm_request.max == 0:
            log.warning("Can't do pagination if max is 0")
            return None

        # The end of the set can only be detected when both the total count and
        # the index of the current page are known.
        if rsm_response.count is not None and rsm_response.index is not None:
            next_index = rsm_response.index + rsm_request.max
            if next_index >= rsm_response.count:
                # We have reached the last page.
                return None

            if log_progress:
                log.debug(
                    f"Retrieving items {next_index} to "
                    f"{min(next_index + rsm_request.max, rsm_response.count)} on "
                    f"{rsm_response.count} ({next_index/rsm_response.count*100:.2f}%)"
                )

        if rsm_response.last is None:
            # Without "last" there is no anchor for the next page. This is only
            # worth a warning if the response reports a non-zero count.
            if rsm_response.count:
                log.warning('Can\'t do pagination, no "last" received.')
            return None

        # Next page: same page size, starting right after the last received item.
        return RSMRequest(
            max=rsm_request.max,
            after=rsm_response.last
        )


@implementer(iwokkel.IDisco)
class XEP_0059_handler(xmlstream.XMPPHandler):
    """XMPP handler answering service discovery requests for RSM"""

    def getDiscoInfo(
        self, requestor: str, target: str, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Advertise the RSM namespace as a disco feature

        None of the arguments is used: the same feature is always returned.

        @param requestor: JID of the requesting entity
        @param target: JID of the target entity
        @param nodeIdentifier: optional node identifier
        @return: list with a single feature, for the RSM namespace
        """
        rsm_feature = disco.DiscoFeature(rsm.NS_RSM)
        return [rsm_feature]

    def getDiscoItems(
        self, requestor: str, target: str, nodeIdentifier: str = ""
    ) -> list:
        """Return the disco items for RSM, which are always empty

        None of the arguments is used.

        @param requestor: JID of the requesting entity
        @param target: JID of the target entity
        @param nodeIdentifier: optional node identifier
        @return: a new empty list (RSM doesn't have items)
        """
        no_items: list = []
        return no_items