Mercurial > libervia-pubsub
view sat_pubsub/bookmark_compat.py @ 494:468b7cd6c344 default tip
bookmark compat: handle mapped errors:
This converts errors raised while handling a request (e.g. a missing node) into the appropriate stanza error.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 Dec 2024 12:23:47 +0100 |
parents | 4e8e8788bc86 |
children |
line wrap: on
line source
#!/usr/bin/env python3
#
# Copyright (c) 2015-2024 Jérôme Poisson

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"This module implements a compatibility layer for XEP-0048."

from typing import Optional

from twisted.internet import defer
from twisted.python import log
from twisted.words.protocols.jabber import error as jabber_error, jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel, pubsub
from wokkel.iwokkel import IPubSubService
from zope.interface import implementer

from sat_pubsub import backend
from sat_pubsub import const

from . import error

NS_IQ_PRIVATE = "jabber:iq:private"
NS_STORAGE_BOOKMARKS = "storage:bookmarks"
NS_BOOKMARKS2 = "urn:xmpp:bookmarks:1"
NS_BOOKMARKS_COMPAT = f"{NS_BOOKMARKS2}#compat"
IQ_PRIVATE_GET = f'/iq[@type="get"]/query[@xmlns="{NS_IQ_PRIVATE}"]'
IQ_PRIVATE_SET = f'/iq[@type="set"]/query[@xmlns="{NS_IQ_PRIVATE}"]'


@implementer(iwokkel.IDisco)
class BookmarkCompatHandler(disco.DiscoClientProtocol):
    """Serve legacy private XML bookmark requests from the Bookmarks2 PEP node.

    IQ get/set requests on the ``jabber:iq:private`` namespace are observed; only the
    ``storage:bookmarks`` payload is accepted. Reads are answered from the
    ``urn:xmpp:bookmarks:1`` node and writes are published to it.
    """

    # Maps backend error classes to ``(condition, pubsub_condition, feature)``.
    error_map = backend.PubSubResourceFromBackend.error_map

    def __init__(self, service_jid):
        """
        @param service_jid: JID of the service; currently unused by this handler.
        """
        super().__init__()
        self.backend = None
        # Set in ``connectionInitialized`` when a pubsub service handler is found.
        self._pubsub_service = None

    def connectionInitialized(self):
        """Locate sibling services and register the private XML IQ observers."""
        for handler in self.parent.handlers:
            if IPubSubService.providedBy(handler):
                self._pubsub_service = handler
                break
        self.backend = self.parent.parent.getServiceNamed("backend")
        self.xmlstream.addObserver(IQ_PRIVATE_GET, self._on_get)
        self.xmlstream.addObserver(IQ_PRIVATE_SET, self._on_set)

    def handle_mapped_error(self, e: error.Error, iq_elt: domish.Element) -> None:
        """Send the stanza error mapped to a backend exception.

        @param e: Exception whose class (or one of its base classes) is a key of
            ``error_map``.
        @param iq_elt: The IQ element to respond to.
        @raise KeyError: neither the class of ``e`` nor any of its bases is mapped.
        """
        # ``except`` clauses built from ``error_map`` also catch subclasses of the
        # mapped classes, so look the entry up along the MRO rather than by exact class.
        err_cls = type(e)
        mapped_cls = next((c for c in err_cls.__mro__ if c in self.error_map), err_cls)
        condition, pubsub_condition, feature = self.error_map[mapped_cls]  # type: ignore
        msg = e.msg
        if pubsub_condition:
            exc = pubsub.PubSubError(condition, pubsub_condition, feature, msg)
        else:
            exc = jabber_error.StanzaError(condition, text=msg)
        error_elt = exc.toResponse(iq_elt)
        self.xmlstream.send(error_elt)
        return

    def _check_private_request(
        self, iq_elt: domish.Element
    ) -> Optional[tuple[jid.JID, jid.JID]]:
        """Run the checks shared by legacy private XML get and set requests.

        The request is marked as handled, then rejected with ``forbidden`` if sender
        and recipient differ, or with ``not-allowed`` if the payload is not a
        ``storage:bookmarks`` ``<storage>`` element.

        @param iq_elt: The incoming IQ element.
        @return: ``(from_jid, to_jid)`` when the request is acceptable, None when it has
            been ignored (not delegated) or already answered with an error.
        """
        if not iq_elt.delegated:
            return None
        assert self.xmlstream is not None
        iq_elt.handled = True
        from_jid = jid.JID(iq_elt["from"])
        to_jid = jid.JID(iq_elt["to"])
        if from_jid.userhostJID() != to_jid:
            msg = (
                f"{from_jid.userhost()} is not allowed to access private storage of "
                f"{to_jid.userhost()}!"
            )
            log.msg(f"Hack attempt? {msg}")
            error_elt = jabber_error.StanzaError("forbidden", text=msg).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
            return None

        query_elt = iq_elt.query
        assert query_elt is not None
        storage_elt = query_elt.firstChildElement()
        if (
            storage_elt is None
            or storage_elt.name != "storage"
            or storage_elt.uri != NS_STORAGE_BOOKMARKS
        ):
            error_elt = jabber_error.StanzaError(
                "not-allowed",
                text=(
                    f'"{NS_STORAGE_BOOKMARKS}" is the only private XML storage allowed '
                    "on this server"
                ),
            ).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
            return None

        return from_jid, to_jid

    def _on_get(self, iq_elt: domish.Element) -> None:
        """Handle incoming legacy IQ GET requests for bookmarks.

        This method processes legacy XEP-0048 IQ GET requests. It does some checks and
        then proceeds to return the bookmarks.

        @param iq_elt: The incoming IQ element.
        """
        jids = self._check_private_request(iq_elt)
        if jids is None:
            return
        __, to_jid = jids
        defer.ensureDeferred(self.return_bookmarks(iq_elt, to_jid))

    async def return_bookmarks(self, iq_elt: domish.Element, requestor: jid.JID) -> None:
        """Send IQ result for bookmark request on private XML.

        Retrieve bookmark from Bookmarks2 PEP node, convert them to private XML XEP-0048
        format, and send the IQ result.

        @param iq_elt: The incoming IQ element.
        @param requestor: The JID of the entity requesting the bookmarks.
        """
        assert self.backend is not None
        assert self.xmlstream is not None
        try:
            items, __ = await self.backend.getItems(
                NS_BOOKMARKS2, requestor, requestor, ext_data={"pep": True}
            )
        except tuple(self.error_map) as e:
            return self.handle_mapped_error(e, iq_elt)

        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        query_elt = iq_result_elt.addElement((NS_IQ_PRIVATE, "query"))
        storage_elt = query_elt.addElement((NS_STORAGE_BOOKMARKS, "storage"))

        # We simply add all bookmarks to get XEP-0048 private XML elements.
        for item_elt in items:
            conference_elt = next(item_elt.elements(NS_BOOKMARKS2, "conference"), None)
            if conference_elt is None:
                log.msg(
                    "Warning: Unexpectedly missing conference element: "
                    f"{item_elt.toXml()}"
                )
                continue
            # The namespace is not the same for XEP-0048
            conference_elt.uri = NS_STORAGE_BOOKMARKS
            # ``children`` may also hold plain text nodes, which have no ``uri``; only
            # child elements are re-namespaced.
            for elt in conference_elt.elements():
                elt.uri = NS_STORAGE_BOOKMARKS
            conference_elt["jid"] = item_elt["id"]
            storage_elt.addChild(conference_elt)

        self.xmlstream.send(iq_result_elt)

    def _on_set(self, iq_elt: domish.Element) -> None:
        """Handle incoming legacy IQ SET requests for bookmarks.

        Same checks as for GET requests, then the bookmarks are converted and published.

        @param iq_elt: The incoming IQ element.
        """
        jids = self._check_private_request(iq_elt)
        if jids is None:
            return
        from_jid, __ = jids
        defer.ensureDeferred(self.on_set(iq_elt, from_jid))

    async def publish_bookmarks(
        self, iq_elt: domish.Element, items: list[domish.Element], requestor: jid.JID
    ) -> None:
        """Publish bookmarks on Bookmarks2 PEP node

        If the node doesn't exist yet, it is created and publication is retried once.

        Exceptions whose class is in ``error_map`` are propagated untouched so that the
        caller can answer with the mapped stanza error. For any other exception an
        ``internal-server-error`` response is sent here before re-raising.

        @param iq_elt: The IQ element being answered (used for error responses).
        @param items: Pubsub ``<item>`` elements to publish.
        @param requestor: The JID of the entity publishing the bookmarks.
        """
        assert self.backend is not None
        assert self.xmlstream is not None

        async def publish() -> None:
            await self.backend.publish(
                NS_BOOKMARKS2,
                items,
                requestor,
                options={const.OPT_ACCESS_MODEL: const.VAL_AMODEL_WHITELIST},
                pep=True,
                recipient=requestor,
            )

        try:
            try:
                await publish()
            except error.NodeNotFound:
                # The node doesn't exist, we create it
                await self.backend.createNode(
                    NS_BOOKMARKS2,
                    requestor,
                    options={const.OPT_ACCESS_MODEL: const.VAL_AMODEL_WHITELIST},
                    pep=True,
                    recipient=requestor,
                )
                await publish()
        except tuple(self.error_map):
            # The caller converts mapped errors; answering here too would send two
            # responses for the same IQ.
            raise
        except Exception as e:
            log.err(f"Error while publishing converted bookmarks: {e}")
            error_elt = jabber_error.StanzaError(
                "internal-server-error",
                text=(f"Something went wrong while publishing the bookmark items: {e}"),
            ).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
            raise

    async def on_set(self, iq_elt: domish.Element, requestor: jid.JID) -> None:
        """Handle IQ set request for bookmarks on private XML.

        This method processes an IQ set request to update bookmarks stored in private XML.
        It extracts conference elements from the request, transforms them into XEP-0402
        format, and publishes them to XEP-0402 PEP node.

        @param iq_elt: The incoming IQ element containing the set request.
        @param requestor: The JID of the entity making the request.
        """
        # TODO: We should check if items already exist in Bookmarks 2 and avoid updating
        #   them if there is no change. However, considering that XEP-0048 is deprecated
        #   and active implementations without XEP-0402 are rare, it might not be worth
        #   the trouble.
        assert self.backend is not None
        assert self.xmlstream is not None
        query_elt = iq_elt.query
        assert query_elt is not None
        # <storage> presence has been checked in ``_on_set``, we know that we have it
        # here.
        storage_elt = next(query_elt.elements(NS_STORAGE_BOOKMARKS, "storage"))

        items = []
        for conference_elt in storage_elt.elements(NS_STORAGE_BOOKMARKS, "conference"):
            item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
            try:
                # A missing attribute raises KeyError on domish elements.
                item_elt["id"] = conference_elt["jid"]
            except KeyError:
                log.msg(
                    "Warning: ignoring <conference> with missing jid: "
                    f"{conference_elt.toXml()}"
                )
                continue
            new_conference_elt = item_elt.addElement((NS_BOOKMARKS2, "conference"))
            for attr, value in conference_elt.attributes.items():
                # The jid is carried by the item id in Bookmarks2.
                if attr == "jid":
                    continue
                new_conference_elt[attr] = value
            # Only child elements are converted: ``children`` may also hold plain text
            # nodes, which have no ``name``.
            for child in conference_elt.elements():
                new_child = domish.Element((NS_BOOKMARKS2, child.name))
                new_child.addContent(str(child))
                new_conference_elt.addChild(new_child)
            items.append(item_elt)

        try:
            await self.publish_bookmarks(iq_elt, items, requestor)
        except tuple(self.error_map) as e:
            return self.handle_mapped_error(e, iq_elt)

        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        self.xmlstream.send(iq_result_elt)

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Advertise private XML storage and Bookmarks2 compatibility features."""
        return [
            disco.DiscoFeature(NS_IQ_PRIVATE),
            disco.DiscoFeature(NS_BOOKMARKS_COMPAT),
        ]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """This handler exposes no disco items."""
        return []