changeset 4356:c9626f46b63e

plugin XEP-0059: Use Pydantic models for RSM.
author Goffi <goffi@goffi.org>
date Fri, 11 Apr 2025 18:19:28 +0200
parents 01ee3b902d33
children f43cbceba2a0
files libervia/backend/memory/disco.py libervia/backend/plugins/plugin_pubsub_cache.py libervia/backend/plugins/plugin_xep_0059.py libervia/backend/plugins/plugin_xep_0060.py
diffstat 4 files changed, 238 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/memory/disco.py	Fri Apr 11 18:19:28 2025 +0200
+++ b/libervia/backend/memory/disco.py	Fri Apr 11 18:19:28 2025 +0200
@@ -283,6 +283,7 @@
         @param use_cache(bool): if True, use cached data if available
         @return: a Deferred which fire disco.DiscoItems
         """
+        # FIXME: RSM is not managed here
         if jid_ is None:
             jid_ = client.server_jid
 
--- a/libervia/backend/plugins/plugin_pubsub_cache.py	Fri Apr 11 18:19:28 2025 +0200
+++ b/libervia/backend/plugins/plugin_pubsub_cache.py	Fri Apr 11 18:19:28 2025 +0200
@@ -18,7 +18,7 @@
 
 import time
 from datetime import datetime
-from typing import Optional, List, Tuple, Dict, Any
+from typing import Optional, List, Tuple, Dict, Any, cast
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
 from twisted.internet import defer
@@ -28,6 +28,7 @@
 from libervia.backend.core import exceptions
 from libervia.backend.core.log import getLogger
 from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.plugins.plugin_xep_0059 import XEP_0059, RSMRequest
 from libervia.backend.tools import xml_tools, utils
 from libervia.backend.tools.common import data_format
 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
@@ -285,8 +286,8 @@
                 )
                 await self.cache_items(client, pubsub_node, items)
             else:
-                rsm_p = self.host.plugins["XEP-0059"]
-                rsm_request = rsm.RSMRequest()
+                rsm_p = cast(XEP_0059, self.host.plugins["XEP-0059"])
+                rsm_request = RSMRequest()
                 cached_ids = set()
                 while True:
                     items, rsm_response = await client.pubsub_client.items(
@@ -314,9 +315,9 @@
                             )
                             rsm_request = None
                             break
-                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
                     if rsm_request is None:
                         break
+                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
 
             await self.host.memory.storage.update_pubsub_node_sync_state(
                 pubsub_node, SyncState.COMPLETED
@@ -656,7 +657,7 @@
                         f"({pubsub_node.sync_state_updated//60} minutes), "
                         "cancelling it and retrying."
                     )
-                    self.in_progress.pop[(service, node)].cancel()
+                    self.in_progress.pop((service, node)).cancel()
                     pubsub_node.sync_state = None
                     await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                 else:
--- a/libervia/backend/plugins/plugin_xep_0059.py	Fri Apr 11 18:19:28 2025 +0200
+++ b/libervia/backend/plugins/plugin_xep_0059.py	Fri Apr 11 18:19:28 2025 +0200
@@ -17,12 +17,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional
+from typing import Self
+from pydantic import BaseModel, Field, field_validator
 from zope.interface import implementer
 from twisted.words.protocols.jabber import xmlstream
-from wokkel import disco
-from wokkel import iwokkel
-from wokkel import rsm
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel, rsm
 from libervia.backend.core.i18n import _
 from libervia.backend.core.constants import Const as C
 from libervia.backend.core.log import getLogger
@@ -45,109 +45,276 @@
 RSM_PREFIX = "rsm_"
 
 
-class XEP_0059(object):
-    # XXX: RSM management is done directly in Wokkel.
+class RSMRequest(BaseModel):
+    """Pydantic model for RSM request parameters"""
+    max: int = Field(default=10, gt=0)
+    after: str | None = None
+    before: str | None = None
+    index: int | None = None
+
+    @field_validator('after')
+    def check_after_not_empty(cls, v: str | None) -> str | None:
+        """Validate that the "after" value isn't an empty string
+
+        Note: empty "before" is allowed (means "last page") but empty "after" is not
+        @param v: value to validate
+        @return: validated value
+        @raise ValueError: if value is an empty string
+        """
+        if v == "":
+            raise ValueError("RSM \"after\" can't be empty")
+        return v
+
+    def to_wokkel_request(self) -> rsm.RSMRequest:
+        """Convert to wokkel RSMRequest
+
+        @return: wokkel RSMRequest instance
+        """
+        return rsm.RSMRequest(
+            max_=self.max,
+            after=self.after,
+            before=self.before,
+            index=self.index
+        )
+
+    @classmethod
+    def from_wokkel_request(cls, request: rsm.RSMRequest) -> Self:
+        """Create from wokkel RSMRequest
+
+        @param request: wokkel RSMRequest to convert
+        @return: RSMRequest instance
+        """
+        return cls(
+            max=request.max,
+            after=request.after,
+            before=request.before,
+            index=request.index
+        )
+
+    def to_element(self) -> domish.Element:
+        """Convert to domish.Element
+
+        @return: XML element representing the RSM request
+        """
+        return self.to_wokkel_request().toElement()
+
+    @classmethod
+    def from_element(cls, element: domish.Element) -> Self:
+        """Create from domish.Element
+
+        @param element: XML element to parse
+        @return: RSMRequest instance
+        @raise ValueError: if the element is invalid
+        """
+        try:
+            wokkel_req = rsm.RSMRequest.fromElement(element)
+        except rsm.RSMNotFoundError:
+            raise ValueError("No RSM set element found")
+        except rsm.RSMError as e:
+            raise ValueError(str(e))
+        return cls.from_wokkel_request(wokkel_req)
+
 
-    def __init__(self, host):
-        log.info(_("Result Set Management plugin initialization"))
+class RSMResponse(BaseModel):
+    """Pydantic model for RSM response parameters"""
+    first: str | None = None
+    last: str | None = None
+    index: int | None = None
+    count: int | None = None
+
+    def to_wokkel_response(self) -> rsm.RSMResponse:
+        """Convert to wokkel RSMResponse
+
+        @return: wokkel RSMResponse instance
+        """
+        return rsm.RSMResponse(
+            first=self.first,
+            last=self.last,
+            index=self.index,
+            count=self.count
+        )
+
+    @classmethod
+    def from_wokkel_response(cls, response: rsm.RSMResponse) -> Self:
+        """Create from wokkel RSMResponse
+
+        @param response: wokkel RSMResponse to convert
+        @return: RSMResponse instance
+        """
+        return cls(
+            first=response.first,
+            last=response.last,
+            index=response.index,
+            count=response.count
+        )
+
+    def to_element(self) -> domish.Element:
+        """Convert to domish.Element
 
-    def get_handler(self, client):
+        @return: XML element representing the RSM response
+        """
+        return self.to_wokkel_response().toElement()
+
+    @classmethod
+    def from_element(cls, element: domish.Element) -> Self:
+        """Create from domish.Element
+
+        @param element: XML element to parse
+        @return: RSMResponse instance
+        @raise ValueError: if the element is invalid
+        """
+        try:
+            wokkel_resp = rsm.RSMResponse.fromElement(element)
+        except rsm.RSMNotFoundError:
+            raise ValueError("No RSM set element found")
+        except rsm.RSMError as e:
+            raise ValueError(str(e))
+        return cls.from_wokkel_response(wokkel_resp)
+
+
+class XEP_0059:
+    def __init__(self, host: str) -> None:
+        """Initialize the RSM plugin
+
+        @param host: host instance
+        """
+        log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")
+
+    def get_handler(self, client) -> 'XEP_0059_handler':
+        """Get the XMPP handler for this plugin
+
+        @param client: client instance
+        @return: XEP_0059_handler instance
+        """
         return XEP_0059_handler()
 
-    def parse_extra(self, extra):
+    def parse_extra(self, extra: dict[str, str]) -> rsm.RSMRequest | None:
         """Parse extra dictionnary to retrieve RSM arguments
 
-        @param extra(dict): data for parse
-        @return (rsm.RSMRequest, None): request with parsed arguments
-            or None if no RSM arguments have been found
+        @param extra: data to parse
+        @return: request with parsed arguments or None if no RSM arguments found
+        @raise ValueError: if rsm_max is negative
         """
-        if int(extra.get(RSM_PREFIX + "max", 0)) < 0:
+        if int(extra.get(f"{RSM_PREFIX}max", 0)) < 0:
             raise ValueError(_("rsm_max can't be negative"))
 
         rsm_args = {}
         for arg in ("max", "after", "before", "index"):
             try:
                 argname = "max_" if arg == "max" else arg
-                rsm_args[argname] = extra.pop(RSM_PREFIX + arg)
+                rsm_args[argname] = extra.pop(f"{RSM_PREFIX}{arg}")
             except KeyError:
                 continue
 
-        if rsm_args:
-            return rsm.RSMRequest(**rsm_args)
-        else:
-            return None
+        return RSMRequest(**rsm_args).to_wokkel_request() if rsm_args else None
 
-    def response2dict(self, rsm_response, data=None):
-        """Return a dict with RSM response
+    def response2dict(
+        self,
+        rsm_response: rsm.RSMResponse,
+        data: dict[str, str] | None = None
+    ) -> dict[str, str]:
+        """Return a dict with RSM response data
 
         Key set in data can be:
-            - rsm_first: first item id in the page
-            - rsm_last: last item id in the page
-            - rsm_index: position of the first item in the full set (may be approximate)
-            - rsm_count: total number of items in the full set (may be approximage)
-        If a value doesn't exists, it's not set.
+            - first: first item id in the page
+            - last: last item id in the page
+            - index: position of the first item in the full set
+            - count: total number of items in the full set
+        If a value doesn't exist, it's not set.
         All values are set as strings.
-        @param rsm_response(rsm.RSMResponse): response to parse
-        @param data(dict, None): dict to update with rsm_* data.
-            If None, a new dict is created
-        @return (dict): data dict
+
+        @param rsm_response: response to parse
+        @param data: dict to update with rsm data. If None, a new dict is created
+        @return: data dict with RSM values
         """
+        # FIXME: This method should not be used anymore; remove it once it has been
+        #   replaced in the XEP-0313 plugin.
         if data is None:
             data = {}
-        if rsm_response.first is not None:
-            data["first"] = rsm_response.first
-        if rsm_response.last is not None:
-            data["last"] = rsm_response.last
-        if rsm_response.index is not None:
-            data["index"] = rsm_response.index
+        model = RSMResponse.from_wokkel_response(rsm_response)
+
+        if model.first is not None:
+            data["first"] = model.first
+        if model.last is not None:
+            data["last"] = model.last
+        if model.index is not None:
+            data["index"] = str(model.index)
+        if model.count is not None:
+            data["count"] = str(model.count)
+
         return data
 
     def get_next_request(
         self,
-        rsm_request: rsm.RSMRequest,
-        rsm_response: rsm.RSMResponse,
+        rsm_request: RSMRequest,
+        rsm_response: RSMResponse,
         log_progress: bool = True,
-    ) -> Optional[rsm.RSMRequest]:
-        """Generate next request to paginate through all items
+    ) -> RSMRequest | None:
+        """Generate the request for the next page of items.
 
-        Page will be retrieved forward
+        Page will be retrieved forward.
         @param rsm_request: last request used
         @param rsm_response: response from the last request
-        @return: request to retrive next page, or None if we are at the end
-            or if pagination is not possible
+        @param log_progress: whether to log progress information
+        @return: request to retrieve next page, or None if we are at the end
+            or if pagination is not possible.
         """
         if rsm_request.max == 0:
             log.warning("Can't do pagination if max is 0")
             return None
-        if rsm_response is None:
-            # may happen if result set it empty, or we are at the end
-            return None
+
         if rsm_response.count is not None and rsm_response.index is not None:
             next_index = rsm_response.index + rsm_request.max
             if next_index >= rsm_response.count:
-                # we have reached the last page
+                # We have reached the last page.
                 return None
 
             if log_progress:
                 log.debug(
-                    f"retrieving items {next_index} to "
-                    f"{min(next_index+rsm_request.max, rsm_response.count)} on "
+                    f"Retrieving items {next_index} to "
+                    f"{min(next_index + rsm_request.max, rsm_response.count)} on "
                     f"{rsm_response.count} ({next_index/rsm_response.count*100:.2f}%)"
                 )
 
         if rsm_response.last is None:
             if rsm_response.count:
-                log.warning('Can\'t do pagination, no "last" received')
+                log.warning('Can\'t do pagination, no "last" received.')
             return None
 
-        return rsm.RSMRequest(max_=rsm_request.max, after=rsm_response.last)
+        return RSMRequest(
+            max=rsm_request.max,
+            after=rsm_response.last
+        )
 
 
 @implementer(iwokkel.IDisco)
 class XEP_0059_handler(xmlstream.XMPPHandler):
+    def getDiscoInfo(
+        self,
+        requestor: str,
+        target: str,
+        nodeIdentifier: str = ""
+    ) -> list[disco.DiscoFeature]:
+        """Get disco info for RSM
 
-    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        @param requestor: JID of the requesting entity
+        @param target: JID of the target entity
+        @param nodeIdentifier: optional node identifier
+        @return: list of disco features
+        """
         return [disco.DiscoFeature(rsm.NS_RSM)]
 
-    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+    def getDiscoItems(
+        self,
+        requestor: str,
+        target: str,
+        nodeIdentifier: str = ""
+    ) -> list:
+        """Get disco items for RSM
+
+        @param requestor: JID of the requesting entity
+        @param target: JID of the target entity
+        @param nodeIdentifier: optional node identifier
+        @return: empty list (RSM doesn't have items)
+        """
         return []
--- a/libervia/backend/plugins/plugin_xep_0060.py	Fri Apr 11 18:19:28 2025 +0200
+++ b/libervia/backend/plugins/plugin_xep_0060.py	Fri Apr 11 18:19:28 2025 +0200
@@ -40,6 +40,7 @@
 from libervia.backend.core.i18n import _
 from libervia.backend.core.log import getLogger
 from libervia.backend.core.xmpp import SatXMPPClient
+from libervia.backend.plugins.plugin_xep_0059 import RSMRequest
 from libervia.backend.tools import utils
 from libervia.backend.tools import sat_defer
 from libervia.backend.tools import xml_tools
@@ -812,7 +813,7 @@
         max_items: int | None = None,
         item_ids: list[str] | None = None,
         sub_id: str | None = None,
-        rsm_request: rsm.RSMRequest | None = None,
+        rsm_request: rsm.RSMRequest | RSMRequest | None = None,
         extra: dict | None = None,
     ) -> tuple[list[domish.Element], dict]:
         """Retrieve pubsub items from a node.
@@ -833,6 +834,15 @@
                     value of RSMResponse
                 - service, node: service and node used
         """
+        if rsm_request is not None:
+            # For now, RSMRequest is converted to Wokkel's rsm.RSMRequest for backward
+            # compatibility with other plugins. In the future we should work as much as
+            # possible with the XEP-0059 plugin's RSMRequest, and convert to Wokkel's
+            # rsm.RSMRequest only when we need to work with Wokkel directly.
+            if isinstance(rsm_request, rsm.RSMRequest):
+                log.warning("Use for rsm.RSMRequest is deprecated")
+            else:
+                rsm_request = rsm_request.to_wokkel_request()
         if item_ids and max_items is not None:
             max_items = None
         if rsm_request and item_ids: