Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0059.py @ 4370:0eaa50f21efb
plugin XEP-0461: Message Replies implementation:
Implement message replies. Thread IDs are always added when a reply is initiated from
Libervia, so a thread can continue the reply.
rel 457
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 May 2025 00:34:01 +0200 |
parents | c9626f46b63e |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Result Set Management (XEP-0059)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Self

from pydantic import BaseModel, Field, field_validator
from zope.interface import implementer
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel, rsm

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Result Set Management",
    C.PI_IMPORT_NAME: "XEP-0059",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0059"],
    C.PI_MAIN: "XEP_0059",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Result Set Management"""),
}

# Prefix of the keys holding RSM arguments in "extra" dictionaries (e.g. "rsm_max").
RSM_PREFIX = "rsm_"


class RSMRequest(BaseModel):
    """Pydantic model for RSM request parameters"""

    # Page size; validation requires a strictly positive value.
    max: int = Field(default=10, gt=0)
    # ID of the item after which the page must start.
    after: str | None = None
    # ID of the item before which the page must end. An empty string is allowed.
    before: str | None = None
    # Position of the first requested item.
    index: int | None = None

    @field_validator('after')
    @classmethod
    def check_after_not_empty(cls, v: str | None) -> str | None:
        """Validate that after value isn't empty string

        Note: Empty before is allowed (means "last page") but empty after is not

        @param v: value to validate
        @return: validated value
        @raise ValueError: if value is an empty string
        """
        if v == "":
            raise ValueError("RSM \"after\" can't be empty")
        return v

    def to_wokkel_request(self) -> rsm.RSMRequest:
        """Convert to wokkel RSMRequest

        @return: wokkel RSMRequest instance
        """
        # wokkel names the page size argument "max_".
        return rsm.RSMRequest(
            max_=self.max,
            after=self.after,
            before=self.before,
            index=self.index,
        )

    @classmethod
    def from_wokkel_request(cls, request: rsm.RSMRequest) -> Self:
        """Create from wokkel RSMRequest

        @param request: wokkel RSMRequest to convert
        @return: RSMRequest model instance
        """
        return cls(
            max=request.max,
            after=request.after,
            before=request.before,
            index=request.index,
        )

    def to_element(self) -> domish.Element:
        """Convert to domish.Element

        @return: XML element representing the RSM request
        """
        return self.to_wokkel_request().toElement()

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Create from domish.Element

        @param element: XML element to parse
        @return: RSMRequest model instance
        @raise ValueError: if the element is invalid
        """
        try:
            wokkel_req = rsm.RSMRequest.fromElement(element)
        except rsm.RSMNotFoundError as e:
            # Chain the original error so the root cause stays in the traceback.
            raise ValueError("No RSM set element found") from e
        except rsm.RSMError as e:
            raise ValueError(str(e)) from e
        return cls.from_wokkel_request(wokkel_req)


class RSMResponse(BaseModel):
    """Pydantic model for RSM response parameters"""

    # ID of the first item in the page.
    first: str | None = None
    # ID of the last item in the page.
    last: str | None = None
    # Position of the first item in the full set.
    index: int | None = None
    # Total number of items in the full set.
    count: int | None = None

    def to_wokkel_response(self) -> rsm.RSMResponse:
        """Convert to wokkel RSMResponse

        @return: wokkel RSMResponse instance
        """
        return rsm.RSMResponse(
            first=self.first,
            last=self.last,
            index=self.index,
            count=self.count,
        )

    @classmethod
    def from_wokkel_response(cls, response: rsm.RSMResponse) -> Self:
        """Create from wokkel RSMResponse

        @param response: wokkel RSMResponse to convert
        @return: RSMResponse model instance
        """
        return cls(
            first=response.first,
            last=response.last,
            index=response.index,
            count=response.count,
        )

    def to_element(self) -> domish.Element:
        """Convert to domish.Element

        @return: XML element representing the RSM response
        """
        return self.to_wokkel_response().toElement()

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Create from domish.Element

        @param element: XML element to parse
        @return: RSMResponse model instance
        @raise ValueError: if the element is invalid
        """
        try:
            wokkel_resp = rsm.RSMResponse.fromElement(element)
        except rsm.RSMNotFoundError as e:
            # Chain the original error so the root cause stays in the traceback.
            raise ValueError("No RSM set element found") from e
        except rsm.RSMError as e:
            raise ValueError(str(e)) from e
        return cls.from_wokkel_response(wokkel_resp)


class XEP_0059:
    """Result Set Management plugin entry point"""

    def __init__(self, host: str) -> None:
        """Initialize the RSM plugin

        @param host: host instance
        """
        log.info(f"Plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization.")

    def get_handler(self, client) -> 'XEP_0059_handler':
        """Get the XMPP handler for this plugin

        @param client: client instance
        @return: XEP_0059_handler instance
        """
        return XEP_0059_handler()

    def parse_extra(self, extra: dict[str, str]) -> rsm.RSMRequest | None:
        """Parse extra dictionary to retrieve RSM arguments

        RSM keys found in ``extra`` ("rsm_max", "rsm_after", "rsm_before" and
        "rsm_index") are removed from it.

        @param extra: data to parse
        @return: request with parsed arguments or None if no RSM arguments found
        @raise ValueError: if rsm_max is negative, or if a value is rejected by
            the RSMRequest model validation
        """
        if int(extra.get(f"{RSM_PREFIX}max", 0)) < 0:
            raise ValueError(_("rsm_max can't be negative"))

        rsm_args = {}
        for arg in ("max", "after", "before", "index"):
            try:
                # Keys must match the RSMRequest model field names: the model is
                # built here, and an unknown key such as "max_" would be silently
                # ignored, making the requested page size fall back to the default.
                rsm_args[arg] = extra.pop(f"{RSM_PREFIX}{arg}")
            except KeyError:
                continue

        return RSMRequest(**rsm_args).to_wokkel_request() if rsm_args else None

    def response2dict(
        self, rsm_response: rsm.RSMResponse, data: dict[str, str] | None = None
    ) -> dict[str, str]:
        """Return a dict with RSM response data

        Key set in data can be:
            - first: first item id in the page
            - last: last item id in the page
            - index: position of the first item in the full set
            - count: total number of items in the full set
        If a value doesn't exist, it's not set.
        All values are set as strings.

        @param rsm_response: response to parse
        @param data: dict to update with rsm data. If None, a new dict is created
        @return: data dict with RSM values
        """
        # FIXME: This method should not be used anymore, and removed once replace in
        #   XEP-0313 plugin.
        if data is None:
            data = {}
        model = RSMResponse.from_wokkel_response(rsm_response)

        if model.first is not None:
            data["first"] = model.first
        if model.last is not None:
            data["last"] = model.last
        if model.index is not None:
            data["index"] = str(model.index)
        if model.count is not None:
            data["count"] = str(model.count)

        return data

    def get_next_request(
        self,
        rsm_request: RSMRequest,
        rsm_response: RSMResponse,
        log_progress: bool = True,
    ) -> RSMRequest | None:
        """Generate the request for the next page of items.

        Page will be retrieved forward.

        @param rsm_request: last request used
        @param rsm_response: response from the last request
        @param log_progress: whether to log progress information
        @return: request to retrieve next page, or None if we are at the end or if
            pagination is not possible.
        """
        # Defensive check: a page size of 0 can't move forward in the result set.
        if rsm_request.max == 0:
            log.warning("Can't do pagination if max is 0")
            return None

        if rsm_response.count is not None and rsm_response.index is not None:
            next_index = rsm_response.index + rsm_request.max
            if next_index >= rsm_response.count:
                # We have reached the last page.
                return None

            if log_progress:
                log.debug(
                    f"Retrieving items {next_index} to "
                    f"{min(next_index + rsm_request.max, rsm_response.count)} on "
                    f"{rsm_response.count} ({next_index/rsm_response.count*100:.2f}%)"
                )

        if rsm_response.last is None:
            # Without a "last" item ID there is nothing to continue after; only
            # warn when the response reports a non-zero count.
            if rsm_response.count:
                log.warning('Can\'t do pagination, no "last" received.')
            return None

        return RSMRequest(max=rsm_request.max, after=rsm_response.last)


@implementer(iwokkel.IDisco)
class XEP_0059_handler(xmlstream.XMPPHandler):
    """XMPP handler advertising RSM support through service discovery"""

    def getDiscoInfo(
        self, requestor: str, target: str, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Get disco info for RSM

        @param requestor: JID of the requesting entity
        @param target: JID of the target entity
        @param nodeIdentifier: optional node identifier
        @return: list of disco features
        """
        return [disco.DiscoFeature(rsm.NS_RSM)]

    def getDiscoItems(
        self, requestor: str, target: str, nodeIdentifier: str = ""
    ) -> list:
        """Get disco items for RSM

        @param requestor: JID of the requesting entity
        @param target: JID of the target entity
        @param nodeIdentifier: optional node identifier
        @return: empty list (RSM doesn't have items)
        """
        return []