view libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py @ 4339:699aa8788d98

tests (unit/email gateway): add tests for pubsub service: rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:52:06 +0100
parents 7c0b7ecb816f
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import data_form, disco, pubsub, rsm

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0498 import NodeData
from libervia.backend.tools.utils import ensure_deferred

if TYPE_CHECKING:
    from . import EmailGatewayComponent


log = getLogger(__name__)

# Every node shares this static configuration. Each entry describes one field: "var" is
# the option name, "value" its fixed value, and any remaining key (e.g. "type") is extra
# metadata about the field.
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# Option name → configured value.
NODE_CONFIG_VALUES = {field["var"]: field["value"] for field in NODE_CONFIG}
# Option name → field metadata, i.e. everything except "var" and "value" (may be empty).
NODE_OPTIONS = {
    field["var"]: {
        key: val for key, val in field.items() if key not in ("var", "value")
    }
    for field in NODE_CONFIG
}


class EmailGWPubsubResource(pubsub.PubSubResource):
    """Pubsub resource used by the email gateway Pubsub service.

    A single node, named after ``self._pfs.namespace``, is advertised. Listing and
    retracting items is only allowed for senders that the gateway client considers
    local. Subscribing and unsubscribing are not supported.
    """

    def __init__(self, service: "EmailGWPubsubService") -> None:
        """Keep shortcuts to the service, its gateway, the host and ``_pfs``.

        :param service: Pubsub service owning this resource. Its ``gateway`` and
            ``_pfs`` attributes must already be set.
        """
        self.gateway = service.gateway
        self.host = self.gateway.host
        self.service = service
        self._pfs = service._pfs
        super().__init__()

    def _get_local_sender(self, request: rsm.PubSubRequest) -> tuple:
        """Return the gateway client and the bare JID of the request sender.

        :param request: incoming Pubsub request.
        :return: ``(client, sender)`` where ``sender`` is ``request.sender`` without
            resource.
        :raises error.StanzaError: ``forbidden`` when ``client.is_local`` rejects the
            sender.
        """
        client = self.gateway.client
        assert client is not None
        sender = request.sender.userhostJID()
        if not client.is_local(sender):
            raise error.StanzaError("forbidden")
        return client, sender

    @staticmethod
    def _file_id_from_item_id(item_id: str) -> str:
        """Extract the file ID, i.e. the part after the last ``_`` of an item ID.

        :param item_id: Pubsub item identifier.
        :return: non-empty file ID.
        :raises error.StanzaError: ``bad-request`` when the item ID has no ``_`` or
            nothing follows the last one.
        """
        # FIXME: item ID naming convention must be handled using dedicated methods
        #   in XEP-0498.
        _, separator, file_id = item_id.rpartition("_")
        if not separator or not file_id:
            raise error.StanzaError("bad-request")
        return file_id

    def getNodes(
        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred[list[str]]:
        """Return the only node of this service, whatever the arguments are.

        :return: already fired Deferred holding ``[self._pfs.namespace]``.
        """
        return defer.succeed([self._pfs.namespace])

    @ensure_deferred
    async def items(
        self,
        request: rsm.PubSubRequest,
    ) -> tuple[list[domish.Element], rsm.RSMResponse | None]:
        """Return the items built from the files of the sender.

        :param request: items request.
        :return: ``(elements, None)``; ``elements`` is empty when the requested node
            is not ``self._pfs.namespace``.
        :raises error.StanzaError: ``forbidden`` when the sender is not local.
        """
        client, sender = self._get_local_sender(request)

        if request.nodeIdentifier != self._pfs.namespace:
            return [], None

        files = await self.host.memory.get_files(client, sender)
        node_data = NodeData.from_files_data(client.jid, files)
        return node_data.to_elements(), None

    @ensure_deferred
    async def retract(self, request: rsm.PubSubRequest) -> None:
        """Delete the files matching the requested item identifiers.

        All item identifiers are validated before anything is deleted, so that a
        malformed identifier can't leave the request half applied.

        :param request: retract request.
        :raises error.StanzaError: ``forbidden`` when the sender is not local,
            ``bad-request`` when the node is not ``self._pfs.namespace`` or when an
            item identifier doesn't contain a file ID.
        """
        client, sender = self._get_local_sender(request)
        if request.nodeIdentifier != self._pfs.namespace:
            raise error.StanzaError("bad-request")

        file_ids = [
            self._file_id_from_item_id(item_id) for item_id in request.itemIdentifiers
        ]
        for file_id in file_ids:
            # Ownership is checked by ``file_delete``, and payload deletion is done there
            # too.
            await self.host.memory.file_delete(client, sender, file_id)

    @ensure_deferred
    async def subscribe(self, request: rsm.PubSubRequest):
        """Subscriptions are not supported: always raise ``rsm.Unsupported``."""
        raise rsm.Unsupported("subscribe")

    @ensure_deferred
    async def unsubscribe(self, request: rsm.PubSubRequest):
        """Subscriptions are not supported: always raise ``rsm.Unsupported``."""
        raise rsm.Unsupported("unsubscribe")

    def getConfigurationOptions(self):
        """Return the field metadata of the node options (``NODE_OPTIONS``)."""
        return NODE_OPTIONS

    def getConfiguration(
        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred:
        """Return the configuration values, identical for every node.

        :return: already fired Deferred holding ``NODE_CONFIG_VALUES``.
        """
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: jid.JID | None = None,
    ) -> dict | None:
        """Describe a node as a leaf carrying the shared configuration.

        :param nodeIdentifier: name of the node; only its truthiness is checked.
        :return: ``None`` when ``nodeIdentifier`` is empty, otherwise a dict with the
            ``type`` and ``meta-data`` keys.
        """
        if not nodeIdentifier:
            return None
        return {"type": "leaf", "meta-data": NODE_CONFIG}


class EmailGWPubsubService(rsm.PubSubService):
    """Pubsub service answering XMPP requests for the email gateway."""

    def __init__(self, gateway: "EmailGatewayComponent"):
        """Bind the service to ``gateway`` and attach its Pubsub resource.

        :param gateway: component owning this service; its ``_pfs`` and ``host``
            attributes are kept as shortcuts on the service.
        """
        # ``EmailGWPubsubResource.__init__`` reads both attributes from the service, so
        # they have to be set before the resource is instantiated.
        self.gateway = gateway
        self._pfs = gateway._pfs
        super().__init__(EmailGWPubsubResource(self))
        # Assigned after the parent constructor has run, as in the original ordering.
        self.discoIdentity = dict(
            category="pubsub",
            type="service",
            name="Libervia Email Gateway",
        )
        self.host = gateway.host

    @ensure_deferred
    async def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature | disco.DiscoIdentity | data_form.Form]:
        """Return the parent disco info extended with the ``_pfs`` namespace feature.

        :param requestor: forwarded unchanged to the parent implementation.
        :param target: forwarded unchanged to the parent implementation.
        :param nodeIdentifier: forwarded unchanged to the parent implementation.
        :return: list returned by the parent, with a ``disco.DiscoFeature`` for
            ``self._pfs.namespace`` appended.
        """
        disco_infos = await super().getDiscoInfo(requestor, target, nodeIdentifier)
        pfs_feature = disco.DiscoFeature(self._pfs.namespace)
        disco_infos.append(pfs_feature)
        return disco_infos