changeset 4338:7c0b7ecb816f

component email gateway: Add a pubsub service: a pubsub service is implemented to retrieve and manage attachments using XEP-0498. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100 (4 weeks ago)
parents 95792a1f26c7
children 699aa8788d98
files libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py
diffstat 2 files changed, 165 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Tue Dec 03 00:13:23 2024 +0100
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Tue Dec 03 00:13:23 2024 +0100
@@ -49,6 +49,9 @@
 from libervia.backend.memory.sqla import select
 from libervia.backend.memory.sqla_mapping import PrivateIndBin
 from libervia.backend.models.core import MessageData
+from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
+    EmailGWPubsubService,
+)
 from libervia.backend.plugins.plugin_xep_0033 import (
     AddressType,
     AddressesData,
@@ -175,8 +178,8 @@
                 else:
                     log.debug(f"Connection to IMAP server successful for {user_jid}.")
 
-    def get_handler(self, __) -> XMPPHandler:
-        return EmailGatewayHandler()
+    def get_handler(self, __) -> tuple[XMPPHandler, XMPPHandler]:
+        return EmailGatewayHandler(), EmailGWPubsubService(self)
 
     async def profile_connecting(self, client: SatXMPPEntity) -> None:
         assert isinstance(client, SatXMPPComponent)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py	Tue Dec 03 00:13:23 2024 +0100
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from pathlib import Path
+from typing import TYPE_CHECKING
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import data_form, disco, pubsub, rsm
+
+from libervia.backend.core.i18n import _
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.log import getLogger
+from libervia.backend.plugins.plugin_xep_0498 import NodeData
+from libervia.backend.tools.utils import ensure_deferred
+
+if TYPE_CHECKING:
+    from . import EmailGatewayComponent
+
+
+log = getLogger(__name__)
+
+# all nodes have the same config
+NODE_CONFIG = [
+    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+    {"var": "pubsub#max_items", "value": "max"},
+    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+]
+
+NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
+NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
+for c in NODE_CONFIG:
+    NODE_OPTIONS[c["var"]].update(
+        {k: v for k, v in c.items() if k not in ("var", "value")}
+    )
+
+
+class EmailGWPubsubResource(pubsub.PubSubResource):
+
+    def __init__(self, service: "EmailGWPubsubService") -> None:
+        self.gateway = service.gateway
+        self.host = self.gateway.host
+        self.service = service
+        self._pfs = service._pfs
+        super().__init__()
+
+    def getNodes(
+        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
+    ) -> defer.Deferred[list[str]]:
+        return defer.succeed([self._pfs.namespace])
+
+    @ensure_deferred
+    async def items(
+        self,
+        request: rsm.PubSubRequest,
+    ) -> tuple[list[domish.Element], rsm.RSMResponse | None]:
+        client = self.gateway.client
+        assert client is not None
+        sender = request.sender.userhostJID()
+        if not client.is_local(sender):
+            raise error.StanzaError("forbidden")
+
+        if request.nodeIdentifier != self._pfs.namespace:
+            return [], None
+
+        files = await self.host.memory.get_files(client, sender)
+        node_data = NodeData.from_files_data(client.jid, files)
+        return node_data.to_elements(), None
+
+    @ensure_deferred
+    async def retract(self, request: rsm.PubSubRequest) -> None:
+        client = self.gateway.client
+        assert client is not None
+        sender = request.sender.userhostJID()
+        if not client.is_local(sender):
+            raise error.StanzaError("forbidden")
+        if request.nodeIdentifier != self._pfs.namespace:
+            raise error.StanzaError("bad-request")
+
+        for item_id in request.itemIdentifiers:
+            try:
+                # FIXME: item ID naming convention must be hanlded using dedicated methods
+                #   in XEP-0498.
+                file_id = item_id.rsplit("_", 1)[1]
+            except IndexError:
+                file_id = ""
+            if not file_id:
+                raise error.StanzaError("bad-request")
+            # Ownership is checked by ``file_delete``, and payload deletion is done there
+            # too.
+            await self.host.memory.file_delete(client, sender.userhostJID(), file_id)
+
+    @ensure_deferred
+    async def subscribe(self, request: rsm.PubSubRequest):
+        raise rsm.Unsupported("subscribe")
+
+    @ensure_deferred
+    async def unsubscribe(self, request: rsm.PubSubRequest):
+        raise rsm.Unsupported("unsubscribe")
+
+    def getConfigurationOptions(self):
+        return NODE_OPTIONS
+
+    def getConfiguration(
+        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
+    ) -> defer.Deferred:
+        return defer.succeed(NODE_CONFIG_VALUES)
+
+    def getNodeInfo(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str,
+        pep: bool = False,
+        recipient: jid.JID | None = None,
+    ) -> dict | None:
+        if not nodeIdentifier:
+            return None
+        info = {"type": "leaf", "meta-data": NODE_CONFIG}
+        return info
+
+
+class EmailGWPubsubService(rsm.PubSubService):
+    """Pubsub service for XMPP requests"""
+
+    def __init__(self, gateway: "EmailGatewayComponent"):
+        self.gateway = gateway
+        self._pfs = gateway._pfs
+        resource = EmailGWPubsubResource(self)
+        super().__init__(resource)
+        self.host = gateway.host
+        self.discoIdentity = {
+            "category": "pubsub",
+            "type": "service",
+            "name": "Libervia Email Gateway",
+        }
+
+    @ensure_deferred
+    async def getDiscoInfo(
+        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
+    ) -> list[disco.DiscoFeature | disco.DiscoIdentity | data_form.Form]:
+        infos = await super().getDiscoInfo(requestor, target, nodeIdentifier)
+        infos.append(disco.DiscoFeature(self._pfs.namespace))
+        return infos