Mercurial > libervia-backend
changeset 4338:7c0b7ecb816f
component email gateway: Add a pubsub service:
a pubsub service is implemented to retrieve and manage attachments using XEP-0498.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 (4 weeks ago) |
parents | 95792a1f26c7 |
children | 699aa8788d98 |
files | libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py |
diffstat | 2 files changed, 165 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Tue Dec 03 00:13:23 2024 +0100 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Tue Dec 03 00:13:23 2024 +0100 @@ -49,6 +49,9 @@ from libervia.backend.memory.sqla import select from libervia.backend.memory.sqla_mapping import PrivateIndBin from libervia.backend.models.core import MessageData +from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import ( + EmailGWPubsubService, +) from libervia.backend.plugins.plugin_xep_0033 import ( AddressType, AddressesData, @@ -175,8 +178,8 @@ else: log.debug(f"Connection to IMAP server successful for {user_jid}.") - def get_handler(self, __) -> XMPPHandler: - return EmailGatewayHandler() + def get_handler(self, __) -> tuple[XMPPHandler, XMPPHandler]: + return EmailGatewayHandler(), EmailGWPubsubService(self) async def profile_connecting(self, client: SatXMPPEntity) -> None: assert isinstance(client, SatXMPPComponent)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py Tue Dec 03 00:13:23 2024 +0100 @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from typing import TYPE_CHECKING +from twisted.internet import defer +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from wokkel import data_form, disco, pubsub, rsm + +from libervia.backend.core.i18n import _ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0498 import NodeData +from libervia.backend.tools.utils import ensure_deferred + +if TYPE_CHECKING: + from . import EmailGatewayComponent + + +log = getLogger(__name__) + +# all nodes have the same config +NODE_CONFIG = [ + {"var": "pubsub#persist_items", "type": "boolean", "value": True}, + {"var": "pubsub#max_items", "value": "max"}, + {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, + {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, +] + +NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} +NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} +for c in NODE_CONFIG: + NODE_OPTIONS[c["var"]].update( + {k: v for k, v in c.items() if k not in ("var", "value")} + ) + + +class EmailGWPubsubResource(pubsub.PubSubResource): + + def __init__(self, service: "EmailGWPubsubService") -> None: + self.gateway = service.gateway + self.host = self.gateway.host + self.service = service + self._pfs = service._pfs + super().__init__() + + def getNodes( + self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str + ) -> defer.Deferred[list[str]]: + return defer.succeed([self._pfs.namespace]) + + @ensure_deferred + async def items( + self, + request: rsm.PubSubRequest, + ) -> tuple[list[domish.Element], rsm.RSMResponse | None]: + client = self.gateway.client + assert client is not None + sender = request.sender.userhostJID() + if not client.is_local(sender): + raise error.StanzaError("forbidden") + + if request.nodeIdentifier != self._pfs.namespace: + return [], None + + files = await self.host.memory.get_files(client, sender) + node_data = NodeData.from_files_data(client.jid, files) + return node_data.to_elements(), None + + @ensure_deferred + async def retract(self, request: rsm.PubSubRequest) -> None: + client = self.gateway.client + assert client is not None + sender = request.sender.userhostJID() + if not client.is_local(sender): + raise error.StanzaError("forbidden") + if request.nodeIdentifier != self._pfs.namespace: + raise error.StanzaError("bad-request") + + for item_id in request.itemIdentifiers: + try: + # FIXME: item ID naming convention must be hanlded using dedicated methods + # in XEP-0498. + file_id = item_id.rsplit("_", 1)[1] + except IndexError: + file_id = "" + if not file_id: + raise error.StanzaError("bad-request") + # Ownership is checked by ``file_delete``, and payload deletion is done there + # too. + await self.host.memory.file_delete(client, sender.userhostJID(), file_id) + + @ensure_deferred + async def subscribe(self, request: rsm.PubSubRequest): + raise rsm.Unsupported("subscribe") + + @ensure_deferred + async def unsubscribe(self, request: rsm.PubSubRequest): + raise rsm.Unsupported("unsubscribe") + + def getConfigurationOptions(self): + return NODE_OPTIONS + + def getConfiguration( + self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str + ) -> defer.Deferred: + return defer.succeed(NODE_CONFIG_VALUES) + + def getNodeInfo( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str, + pep: bool = False, + recipient: jid.JID | None = None, + ) -> dict | None: + if not nodeIdentifier: + return None + info = {"type": "leaf", "meta-data": NODE_CONFIG} + return info + + +class EmailGWPubsubService(rsm.PubSubService): + """Pubsub service for XMPP requests""" + + def __init__(self, gateway: "EmailGatewayComponent"): + self.gateway = gateway + self._pfs = gateway._pfs + resource = EmailGWPubsubResource(self) + super().__init__(resource) + self.host = gateway.host + self.discoIdentity = { + "category": "pubsub", + "type": "service", + "name": "Libervia Email Gateway", + } + + @ensure_deferred + async def getDiscoInfo( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature | disco.DiscoIdentity | data_form.Form]: + infos = await super().getDiscoInfo(requestor, target, nodeIdentifier) + infos.append(disco.DiscoFeature(self._pfs.namespace)) + return infos