Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0059.py @ 4356:c9626f46b63e
plugin XEP-0059: Use Pydantic models for RSM.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Apr 2025 18:19:28 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0059.py Fri Apr 11 18:19:28 2025 +0200 +++ b/libervia/backend/plugins/plugin_xep_0059.py Fri Apr 11 18:19:28 2025 +0200 @@ -17,12 +17,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional +from typing import Self +from pydantic import BaseModel, Field, field_validator from zope.interface import implementer from twisted.words.protocols.jabber import xmlstream -from wokkel import disco -from wokkel import iwokkel -from wokkel import rsm +from twisted.words.xish import domish +from wokkel import disco, iwokkel, rsm from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger @@ -45,109 +45,276 @@ RSM_PREFIX = "rsm_" -class XEP_0059(object): - # XXX: RSM management is done directly in Wokkel. +class RSMRequest(BaseModel): + """Pydantic model for RSM request parameters""" + max: int = Field(default=10, gt=0) + after: str | None = None + before: str | None = None + index: int | None = None + + @field_validator('after') + def check_after_not_empty(cls, v: str | None) -> str | None: + """Validate that after value isn't empty string + + Note: Empty before is allowed (means "last page") but empty after is not + @param v: value to validate + @return: validated value + @raise ValueError: if value is an empty string + """ + if v == "": + raise ValueError("RSM \"after\" can't be empty") + return v + + def to_wokkel_request(self) -> rsm.RSMRequest: + """Convert to wokkel RSMRequest + + @return: wokkel RSMRequest instance + """ + return rsm.RSMRequest( + max_=self.max, + after=self.after, + before=self.before, + index=self.index + ) + + @classmethod + def from_wokkel_request(cls, request: rsm.RSMRequest) -> Self: + """Create from wokkel RSMRequest + + @param request: wokkel RSMRequest to convert + @return: RSMRequestModel instance + """ + return cls( + max=request.max, + after=request.after, + before=request.before, + index=request.index + ) + + def to_element(self) -> domish.Element: + """Convert to domish.Element + + @return: XML element representing the RSM request + """ + return self.to_wokkel_request().toElement() + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Create from domish.Element + + @param element: XML element to parse + @return: RSMRequestModel instance + @raise ValueError: if the element is invalid + """ + try: + wokkel_req = rsm.RSMRequest.fromElement(element) + except rsm.RSMNotFoundError: + raise ValueError("No RSM set element found") + except rsm.RSMError as e: + raise ValueError(str(e)) + return cls.from_wokkel_request(wokkel_req) + - def __init__(self, host): - log.info(_("Result Set Management plugin initialization")) +class RSMResponse(BaseModel): + """Pydantic model for RSM response parameters""" + first: str | None = None + last: str | None = None + index: int | None = None + count: int | None = None + + def to_wokkel_response(self) -> rsm.RSMResponse: + """Convert to wokkel RSMResponse + + @return: wokkel RSMResponse instance + """ + return rsm.RSMResponse( + first=self.first, + last=self.last, + index=self.index, + count=self.count + ) + + @classmethod + def from_wokkel_response(cls, response: rsm.RSMResponse) -> Self: + """Create from wokkel RSMResponse + + @param response: wokkel RSMResponse to convert + @return: RSMResponseModel instance + """ + return cls( + first=response.first, + last=response.last, + index=response.index, + count=response.count + ) + + def to_element(self) -> domish.Element: + """Convert to domish.Element - def get_handler(self, client): + @return: XML element representing the RSM response + """ + return self.to_wokkel_response().toElement() + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Create from domish.Element + + @param element: XML element to parse + @return: RSMResponseModel instance + @raise ValueError: if the element is invalid + """ + try: + wokkel_resp = rsm.RSMResponse.fromElement(element) + except rsm.RSMNotFoundError: + raise ValueError("No RSM set element found") + except rsm.RSMError as e: + raise ValueError(str(e)) + return cls.from_wokkel_response(wokkel_resp) + + +class XEP_0059: + def __init__(self, host: str) -> None: + """Initialize the RSM plugin + + @param host: host instance + """ + log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.") + + def get_handler(self, client) -> 'XEP_0059_handler': + """Get the XMPP handler for this plugin + + @param client: client instance + @return: XEP_0059_handler instance + """ return XEP_0059_handler() - def parse_extra(self, extra): + def parse_extra(self, extra: dict[str, str]) -> rsm.RSMRequest | None: """Parse extra dictionnary to retrieve RSM arguments - @param extra(dict): data for parse - @return (rsm.RSMRequest, None): request with parsed arguments - or None if no RSM arguments have been found + @param extra: data to parse + @return: request with parsed arguments or None if no RSM arguments found + @raise ValueError: if rsm_max is negative """ - if int(extra.get(RSM_PREFIX + "max", 0)) < 0: + if int(extra.get(f"{RSM_PREFIX}max", 0)) < 0: raise ValueError(_("rsm_max can't be negative")) rsm_args = {} for arg in ("max", "after", "before", "index"): try: argname = "max_" if arg == "max" else arg - rsm_args[argname] = extra.pop(RSM_PREFIX + arg) + rsm_args[argname] = extra.pop(f"{RSM_PREFIX}{arg}") except KeyError: continue - if rsm_args: - return rsm.RSMRequest(**rsm_args) - else: - return None + return RSMRequest(**rsm_args).to_wokkel_request() if rsm_args else None - def response2dict(self, rsm_response, data=None): - """Return a dict with RSM response + def response2dict( + self, + rsm_response: rsm.RSMResponse, + data: dict[str, str] | None = None + ) -> dict[str, str]: + """Return a dict with RSM response data Key set in data can be: - - rsm_first: first item id in the page - - rsm_last: last item id in the page - - rsm_index: position of the first item in the full set (may be approximate) - - rsm_count: total number of items in the full set (may be approximage) - If a value doesn't exists, it's not set. + - first: first item id in the page + - last: last item id in the page + - index: position of the first item in the full set + - count: total number of items in the full set + If a value doesn't exist, it's not set. All values are set as strings. - @param rsm_response(rsm.RSMResponse): response to parse - @param data(dict, None): dict to update with rsm_* data. - If None, a new dict is created - @return (dict): data dict + + @param rsm_response: response to parse + @param data: dict to update with rsm data. If None, a new dict is created + @return: data dict with RSM values """ + # FIXME: This method should not be used anymore, and removed once replace in + # XEP-0313 plugin. if data is None: data = {} - if rsm_response.first is not None: - data["first"] = rsm_response.first - if rsm_response.last is not None: - data["last"] = rsm_response.last - if rsm_response.index is not None: - data["index"] = rsm_response.index + model = RSMResponse.from_wokkel_response(rsm_response) + + if model.first is not None: + data["first"] = model.first + if model.last is not None: + data["last"] = model.last + if model.index is not None: + data["index"] = str(model.index) + if model.count is not None: + data["count"] = str(model.count) + return data def get_next_request( self, - rsm_request: rsm.RSMRequest, - rsm_response: rsm.RSMResponse, + rsm_request: RSMRequest, + rsm_response: RSMResponse, log_progress: bool = True, - ) -> Optional[rsm.RSMRequest]: - """Generate next request to paginate through all items + ) -> RSMRequest | None: + """Generate the request for the next page of items. - Page will be retrieved forward + Page will be retrieved forward. @param rsm_request: last request used @param rsm_response: response from the last request - @return: request to retrive next page, or None if we are at the end - or if pagination is not possible + @param log_progress: whether to log progress information + @return: request to retrieve next page, or None if we are at the end + or if pagination is not possible. """ if rsm_request.max == 0: log.warning("Can't do pagination if max is 0") return None - if rsm_response is None: - # may happen if result set it empty, or we are at the end - return None + if rsm_response.count is not None and rsm_response.index is not None: next_index = rsm_response.index + rsm_request.max if next_index >= rsm_response.count: - # we have reached the last page + # We have reached the last page. return None if log_progress: log.debug( - f"retrieving items {next_index} to " - f"{min(next_index+rsm_request.max, rsm_response.count)} on " + f"Retrieving items {next_index} to " + f"{min(next_index + rsm_request.max, rsm_response.count)} on " f"{rsm_response.count} ({next_index/rsm_response.count*100:.2f}%)" ) if rsm_response.last is None: if rsm_response.count: - log.warning('Can\'t do pagination, no "last" received') + log.warning('Can\'t do pagination, no "last" received.') return None - return rsm.RSMRequest(max_=rsm_request.max, after=rsm_response.last) + return RSMRequest( + max=rsm_request.max, + after=rsm_response.last + ) @implementer(iwokkel.IDisco) class XEP_0059_handler(xmlstream.XMPPHandler): + def getDiscoInfo( + self, + requestor: str, + target: str, + nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: + """Get disco info for RSM - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + @param requestor: JID of the requesting entity + @param target: JID of the target entity + @param nodeIdentifier: optional node identifier + @return: list of disco features + """ return [disco.DiscoFeature(rsm.NS_RSM)] - def getDiscoItems(self, requestor, target, nodeIdentifier=""): + def getDiscoItems( + self, + requestor: str, + target: str, + nodeIdentifier: str = "" + ) -> list: + """Get disco items for RSM + + @param requestor: JID of the requesting entity + @param target: JID of the target entity + @param nodeIdentifier: optional node identifier + @return: empty list (RSM doesn't have items) + """ return []