Mercurial > libervia-backend
changeset 4356:c9626f46b63e
plugin XEP-0059: Use Pydantic models for RSM.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Apr 2025 18:19:28 +0200 |
parents | 01ee3b902d33 |
children | f43cbceba2a0 |
files | libervia/backend/memory/disco.py libervia/backend/plugins/plugin_pubsub_cache.py libervia/backend/plugins/plugin_xep_0059.py libervia/backend/plugins/plugin_xep_0060.py |
diffstat | 4 files changed, 238 insertions(+), 59 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/memory/disco.py Fri Apr 11 18:19:28 2025 +0200 +++ b/libervia/backend/memory/disco.py Fri Apr 11 18:19:28 2025 +0200 @@ -283,6 +283,7 @@ @param use_cache(bool): if True, use cached data if available @return: a Deferred which fire disco.DiscoItems """ + # FIXME: RSM is not managed here if jid_ is None: jid_ = client.server_jid
--- a/libervia/backend/plugins/plugin_pubsub_cache.py Fri Apr 11 18:19:28 2025 +0200 +++ b/libervia/backend/plugins/plugin_pubsub_cache.py Fri Apr 11 18:19:28 2025 +0200 @@ -18,7 +18,7 @@ import time from datetime import datetime -from typing import Optional, List, Tuple, Dict, Any +from typing import Optional, List, Tuple, Dict, Any, cast from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish from twisted.internet import defer @@ -28,6 +28,7 @@ from libervia.backend.core import exceptions from libervia.backend.core.log import getLogger from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.plugins.plugin_xep_0059 import XEP_0059, RSMRequest from libervia.backend.tools import xml_tools, utils from libervia.backend.tools.common import data_format from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError @@ -285,8 +286,8 @@ ) await self.cache_items(client, pubsub_node, items) else: - rsm_p = self.host.plugins["XEP-0059"] - rsm_request = rsm.RSMRequest() + rsm_p = cast(XEP_0059, self.host.plugins["XEP-0059"]) + rsm_request = RSMRequest() cached_ids = set() while True: items, rsm_response = await client.pubsub_client.items( @@ -314,9 +315,9 @@ ) rsm_request = None break - rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) if rsm_request is None: break + rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) await self.host.memory.storage.update_pubsub_node_sync_state( pubsub_node, SyncState.COMPLETED @@ -656,7 +657,7 @@ f"({pubsub_node.sync_state_updated//60} minutes), " "cancelling it and retrying." ) - self.in_progress.pop[(service, node)].cancel() + self.in_progress.pop((service, node)).cancel() pubsub_node.sync_state = None await self.host.memory.storage.delete_pubsub_items(pubsub_node) else:
--- a/libervia/backend/plugins/plugin_xep_0059.py Fri Apr 11 18:19:28 2025 +0200 +++ b/libervia/backend/plugins/plugin_xep_0059.py Fri Apr 11 18:19:28 2025 +0200 @@ -17,12 +17,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional +from typing import Self +from pydantic import BaseModel, Field, field_validator from zope.interface import implementer from twisted.words.protocols.jabber import xmlstream -from wokkel import disco -from wokkel import iwokkel -from wokkel import rsm +from twisted.words.xish import domish +from wokkel import disco, iwokkel, rsm from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger @@ -45,109 +45,276 @@ RSM_PREFIX = "rsm_" -class XEP_0059(object): - # XXX: RSM management is done directly in Wokkel. +class RSMRequest(BaseModel): + """Pydantic model for RSM request parameters""" + max: int = Field(default=10, gt=0) + after: str | None = None + before: str | None = None + index: int | None = None + + @field_validator('after') + def check_after_not_empty(cls, v: str | None) -> str | None: + """Validate that after value isn't empty string + + Note: Empty before is allowed (means "last page") but empty after is not + @param v: value to validate + @return: validated value + @raise ValueError: if value is an empty string + """ + if v == "": + raise ValueError("RSM \"after\" can't be empty") + return v + + def to_wokkel_request(self) -> rsm.RSMRequest: + """Convert to wokkel RSMRequest + + @return: wokkel RSMRequest instance + """ + return rsm.RSMRequest( + max_=self.max, + after=self.after, + before=self.before, + index=self.index + ) + + @classmethod + def from_wokkel_request(cls, request: rsm.RSMRequest) -> Self: + """Create from wokkel RSMRequest + + @param request: wokkel RSMRequest to convert + @return: RSMRequestModel instance + """ + return cls( + max=request.max, + after=request.after, + before=request.before, + index=request.index + ) + + def to_element(self) -> domish.Element: + """Convert to domish.Element + + @return: XML element representing the RSM request + """ + return self.to_wokkel_request().toElement() + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Create from domish.Element + + @param element: XML element to parse + @return: RSMRequestModel instance + @raise ValueError: if the element is invalid + """ + try: + wokkel_req = rsm.RSMRequest.fromElement(element) + except rsm.RSMNotFoundError: + raise ValueError("No RSM set element found") + except rsm.RSMError as e: + raise ValueError(str(e)) + return cls.from_wokkel_request(wokkel_req) + - def __init__(self, host): - log.info(_("Result Set Management plugin initialization")) +class RSMResponse(BaseModel): + """Pydantic model for RSM response parameters""" + first: str | None = None + last: str | None = None + index: int | None = None + count: int | None = None + + def to_wokkel_response(self) -> rsm.RSMResponse: + """Convert to wokkel RSMResponse + + @return: wokkel RSMResponse instance + """ + return rsm.RSMResponse( + first=self.first, + last=self.last, + index=self.index, + count=self.count + ) + + @classmethod + def from_wokkel_response(cls, response: rsm.RSMResponse) -> Self: + """Create from wokkel RSMResponse + + @param response: wokkel RSMResponse to convert + @return: RSMResponseModel instance + """ + return cls( + first=response.first, + last=response.last, + index=response.index, + count=response.count + ) + + def to_element(self) -> domish.Element: + """Convert to domish.Element - def get_handler(self, client): + @return: XML element representing the RSM response + """ + return self.to_wokkel_response().toElement() + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Create from domish.Element + + @param element: XML element to parse + @return: RSMResponseModel instance + @raise ValueError: if the element is invalid + """ + try: + wokkel_resp = rsm.RSMResponse.fromElement(element) + except rsm.RSMNotFoundError: + raise ValueError("No RSM set element found") + except rsm.RSMError as e: + raise ValueError(str(e)) + return cls.from_wokkel_response(wokkel_resp) + + +class XEP_0059: + def __init__(self, host: str) -> None: + """Initialize the RSM plugin + + @param host: host instance + """ + log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.") + + def get_handler(self, client) -> 'XEP_0059_handler': + """Get the XMPP handler for this plugin + + @param client: client instance + @return: XEP_0059_handler instance + """ return XEP_0059_handler() - def parse_extra(self, extra): + def parse_extra(self, extra: dict[str, str]) -> rsm.RSMRequest | None: """Parse extra dictionnary to retrieve RSM arguments - @param extra(dict): data for parse - @return (rsm.RSMRequest, None): request with parsed arguments - or None if no RSM arguments have been found + @param extra: data to parse + @return: request with parsed arguments or None if no RSM arguments found + @raise ValueError: if rsm_max is negative """ - if int(extra.get(RSM_PREFIX + "max", 0)) < 0: + if int(extra.get(f"{RSM_PREFIX}max", 0)) < 0: raise ValueError(_("rsm_max can't be negative")) rsm_args = {} for arg in ("max", "after", "before", "index"): try: argname = "max_" if arg == "max" else arg - rsm_args[argname] = extra.pop(RSM_PREFIX + arg) + rsm_args[argname] = extra.pop(f"{RSM_PREFIX}{arg}") except KeyError: continue - if rsm_args: - return rsm.RSMRequest(**rsm_args) - else: - return None + return RSMRequest(**rsm_args).to_wokkel_request() if rsm_args else None - def response2dict(self, rsm_response, data=None): - """Return a dict with RSM response + def response2dict( + self, + rsm_response: rsm.RSMResponse, + data: dict[str, str] | None = None + ) -> dict[str, str]: + """Return a dict with RSM response data Key set in data can be: - - rsm_first: first item id in the page - - rsm_last: last item id in the page - - rsm_index: position of the first item in the full set (may be approximate) - - rsm_count: total number of items in the full set (may be approximage) - If a value doesn't exists, it's not set. + - first: first item id in the page + - last: last item id in the page + - index: position of the first item in the full set + - count: total number of items in the full set + If a value doesn't exist, it's not set. All values are set as strings. - @param rsm_response(rsm.RSMResponse): response to parse - @param data(dict, None): dict to update with rsm_* data. - If None, a new dict is created - @return (dict): data dict + + @param rsm_response: response to parse + @param data: dict to update with rsm data. If None, a new dict is created + @return: data dict with RSM values """ + # FIXME: This method should not be used anymore, and removed once replace in + # XEP-0313 plugin. if data is None: data = {} - if rsm_response.first is not None: - data["first"] = rsm_response.first - if rsm_response.last is not None: - data["last"] = rsm_response.last - if rsm_response.index is not None: - data["index"] = rsm_response.index + model = RSMResponse.from_wokkel_response(rsm_response) + + if model.first is not None: + data["first"] = model.first + if model.last is not None: + data["last"] = model.last + if model.index is not None: + data["index"] = str(model.index) + if model.count is not None: + data["count"] = str(model.count) + return data def get_next_request( self, - rsm_request: rsm.RSMRequest, - rsm_response: rsm.RSMResponse, + rsm_request: RSMRequest, + rsm_response: RSMResponse, log_progress: bool = True, - ) -> Optional[rsm.RSMRequest]: - """Generate next request to paginate through all items + ) -> RSMRequest | None: + """Generate the request for the next page of items. - Page will be retrieved forward + Page will be retrieved forward. @param rsm_request: last request used @param rsm_response: response from the last request - @return: request to retrive next page, or None if we are at the end - or if pagination is not possible + @param log_progress: whether to log progress information + @return: request to retrieve next page, or None if we are at the end + or if pagination is not possible. """ if rsm_request.max == 0: log.warning("Can't do pagination if max is 0") return None - if rsm_response is None: - # may happen if result set it empty, or we are at the end - return None + if rsm_response.count is not None and rsm_response.index is not None: next_index = rsm_response.index + rsm_request.max if next_index >= rsm_response.count: - # we have reached the last page + # We have reached the last page. return None if log_progress: log.debug( - f"retrieving items {next_index} to " - f"{min(next_index+rsm_request.max, rsm_response.count)} on " + f"Retrieving items {next_index} to " + f"{min(next_index + rsm_request.max, rsm_response.count)} on " f"{rsm_response.count} ({next_index/rsm_response.count*100:.2f}%)" ) if rsm_response.last is None: if rsm_response.count: - log.warning('Can\'t do pagination, no "last" received') + log.warning('Can\'t do pagination, no "last" received.') return None - return rsm.RSMRequest(max_=rsm_request.max, after=rsm_response.last) + return RSMRequest( + max=rsm_request.max, + after=rsm_response.last + ) @implementer(iwokkel.IDisco) class XEP_0059_handler(xmlstream.XMPPHandler): + def getDiscoInfo( + self, + requestor: str, + target: str, + nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: + """Get disco info for RSM - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + @param requestor: JID of the requesting entity + @param target: JID of the target entity + @param nodeIdentifier: optional node identifier + @return: list of disco features + """ return [disco.DiscoFeature(rsm.NS_RSM)] - def getDiscoItems(self, requestor, target, nodeIdentifier=""): + def getDiscoItems( + self, + requestor: str, + target: str, + nodeIdentifier: str = "" + ) -> list: + """Get disco items for RSM + + @param requestor: JID of the requesting entity + @param target: JID of the target entity + @param nodeIdentifier: optional node identifier + @return: empty list (RSM doesn't have items) + """ return []
--- a/libervia/backend/plugins/plugin_xep_0060.py Fri Apr 11 18:19:28 2025 +0200 +++ b/libervia/backend/plugins/plugin_xep_0060.py Fri Apr 11 18:19:28 2025 +0200 @@ -40,6 +40,7 @@ from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.core.xmpp import SatXMPPClient +from libervia.backend.plugins.plugin_xep_0059 import RSMRequest from libervia.backend.tools import utils from libervia.backend.tools import sat_defer from libervia.backend.tools import xml_tools @@ -812,7 +813,7 @@ max_items: int | None = None, item_ids: list[str] | None = None, sub_id: str | None = None, - rsm_request: rsm.RSMRequest | None = None, + rsm_request: rsm.RSMRequest | RSMRequest | None = None, extra: dict | None = None, ) -> tuple[list[domish.Element], dict]: """Retrieve pubsub items from a node. @@ -833,6 +834,15 @@ value of RSMResponse - service, node: service and node used """ + if rsm_request is not None: + # For the moment we convert RSMRequest to wokkel rsm.RSMRequest for backward + # compatibility with other plugins. In the future we should work as much as + # possible with XEP-0059's plugin RSMRequest, and convert to Wokkel + # rsm.RSMRequest only when we need to work with wokkel directly. + if isinstance(rsm_request, rsm.RSMRequest): + log.warning("Use for rsm.RSMRequest is deprecated") + else: + rsm_request = rsm_request.to_wokkel_request() if item_ids and max_items is not None: max_items = None if rsm_request and item_ids: