Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py @ 4387:a6270030968d default tip
doc (components): Document the handling of mailing lists in Email Gateway:
fix 462
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 Aug 2025 23:45:48 +0200 |
parents | c055042c01e3 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia ActivityPub Gateway # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import TYPE_CHECKING, cast from twisted.internet import defer from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish from wokkel import data_form, disco, pubsub, rsm from libervia.backend import G from libervia.backend.core import exceptions from libervia.backend.core.core_types import SatXMPPComponent from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation from libervia.backend.plugins.plugin_pubsub_cache import PubsubCache from libervia.backend.plugins.plugin_xep_0498 import NodeData from libervia.backend.tools.utils import ensure_deferred if TYPE_CHECKING: from . import EmailGatewayComponent log = getLogger(__name__) # all nodes have the same config NODE_CONFIG = [ {"var": "pubsub#persist_items", "type": "boolean", "value": True}, {"var": "pubsub#max_items", "value": "max"}, {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, ] NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} for c in NODE_CONFIG: NODE_OPTIONS[c["var"]].update( {k: v for k, v in c.items() if k not in ("var", "value")} ) class EmailGWPubsubResource(pubsub.PubSubResource): def __init__(self, service: "EmailGWPubsubService") -> None: self.gateway = service.gateway self.host = self.gateway.host self.service = service self._pfs = service._pfs self._ps_cache = cast(PubsubCache, G.host.plugins["PUBSUB_CACHE"]) super().__init__() @property def client(self) -> SatXMPPComponent: client = self.gateway.client assert client is not None return client def getNodes( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred[list[str]]: return defer.ensureDeferred(self.get_nodes(requestor, service, nodeIdentifier)) async def get_nodes( self, requestor: jid.JID, service: jid.JID, node_id: str ) -> list[str]: nodes = await G.storage.get_pubsub_nodes(self.client, self.client.jid) return [self._pfs.namespace] + [cast(str, node.name) for node in nodes] @ensure_deferred async def items( self, request: rsm.PubSubRequest, ) -> tuple[list[domish.Element], rsm.RSMResponse | None]: client = self.client requestor_jid = request.sender.userhostJID() if not client.is_local(requestor_jid): raise error.StanzaError("forbidden") if request.nodeIdentifier != self._pfs.namespace: return await self.items_from_mailing_list(request, requestor_jid) files = await self.host.memory.get_files(client, requestor_jid) node_data = NodeData.from_files_data(client.jid, files) return node_data.to_elements(), None async def items_from_mailing_list( self, request: rsm.PubSubRequest, requestor_jid: jid.JID ) -> tuple[list[domish.Element], rsm.RSMResponse|None]: """Handle items coming from mailing lists. @param request: Pubsub request. @param requestor_jid: Bare jid of the requestor. @return: Items matching request, if allowed, and RSM response. @raise error.StanzaError: One of: - ``item-not-found`` if no corresponding node or item if found - ``forbidden`` if the requestor does not have sufficient privileges """ node = request.nodeIdentifier node = await G.storage.get_pubsub_node( self.client, self.client.jid, node, with_affiliations=True ) if node is None: raise error.StanzaError("item-not-found") match str(node.access_model): case AccessModel.open: pass case AccessModel.whitelist: for affiliation in node.affiliations: if ( affiliation.entity == requestor_jid and affiliation.affiliation in { Affiliation.owner, Affiliation.publisher, Affiliation.member } ): break else: raise error.StanzaError("forbidden") case _: raise exceptions.InternalError( f"Unmanaged access model: {node.access_model}" ) pubsub_items, metadata = await self._ps_cache.get_items_from_cache( self.client, node, request.maxItems, request.itemIdentifiers, request.subscriptionIdentifier, request.rsm ) if rsm_data := metadata.get("rsm"): rsm_response = rsm.RSMResponse(**rsm_data) else: rsm_response = None return ( [cast(domish.Element, ps_item.data) for ps_item in pubsub_items], rsm_response ) @ensure_deferred async def retract(self, request: rsm.PubSubRequest) -> None: client = self.gateway.client assert client is not None sender = request.sender.userhostJID() if not client.is_local(sender): raise error.StanzaError("forbidden") if request.nodeIdentifier != self._pfs.namespace: raise error.StanzaError("bad-request") for item_id in request.itemIdentifiers: try: # FIXME: item ID naming convention must be hanlded using dedicated methods # in XEP-0498. file_id = item_id.rsplit("_", 1)[1] except IndexError: file_id = "" if not file_id: raise error.StanzaError("bad-request") # Ownership is checked by ``file_delete``, and payload deletion is done there # too. await self.host.memory.file_delete(client, sender.userhostJID(), file_id) @ensure_deferred async def subscribe(self, request: rsm.PubSubRequest): raise rsm.Unsupported("subscribe") @ensure_deferred async def unsubscribe(self, request: rsm.PubSubRequest): raise rsm.Unsupported("unsubscribe") def getConfigurationOptions(self): return NODE_OPTIONS def getConfiguration( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred: return defer.succeed(NODE_CONFIG_VALUES) def getNodeInfo( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str, pep: bool = False, recipient: jid.JID | None = None, ) -> dict | None: if not nodeIdentifier: return None info = {"type": "leaf", "meta-data": NODE_CONFIG} return info class EmailGWPubsubService(rsm.PubSubService): """Pubsub service for XMPP requests""" def __init__(self, gateway: "EmailGatewayComponent"): self.gateway = gateway self._pfs = gateway._pfs resource = EmailGWPubsubResource(self) super().__init__(resource) self.host = gateway.host self.discoIdentity = { "category": "pubsub", "type": "service", "name": "Libervia Email Gateway", } @ensure_deferred async def getDiscoInfo( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoFeature | disco.DiscoIdentity | data_form.Form]: infos = await super().getDiscoInfo(requestor, target, nodeIdentifier) infos.append(disco.DiscoFeature(self._pfs.namespace)) return infos