view libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is use as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 699aa8788d98
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import data_form, disco, pubsub, rsm

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0498 import NodeData
from libervia.backend.tools.utils import ensure_deferred

if TYPE_CHECKING:
    from . import EmailGatewayComponent


log = getLogger(__name__)

# all nodes have the same config
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
for c in NODE_CONFIG:
    NODE_OPTIONS[c["var"]].update(
        {k: v for k, v in c.items() if k not in ("var", "value")}
    )


class EmailGWPubsubResource(pubsub.PubSubResource):

    def __init__(self, service: "EmailGWPubsubService") -> None:
        self.gateway = service.gateway
        self.host = self.gateway.host
        self.service = service
        self._pfs = service._pfs
        super().__init__()

    def getNodes(
        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred[list[str]]:
        return defer.succeed([self._pfs.namespace])

    @ensure_deferred
    async def items(
        self,
        request: rsm.PubSubRequest,
    ) -> tuple[list[domish.Element], rsm.RSMResponse | None]:
        client = self.gateway.client
        assert client is not None
        sender = request.sender.userhostJID()
        if not client.is_local(sender):
            raise error.StanzaError("forbidden")

        if request.nodeIdentifier != self._pfs.namespace:
            return [], None

        files = await self.host.memory.get_files(client, sender)
        node_data = NodeData.from_files_data(client.jid, files)
        return node_data.to_elements(), None

    @ensure_deferred
    async def retract(self, request: rsm.PubSubRequest) -> None:
        client = self.gateway.client
        assert client is not None
        sender = request.sender.userhostJID()
        if not client.is_local(sender):
            raise error.StanzaError("forbidden")
        if request.nodeIdentifier != self._pfs.namespace:
            raise error.StanzaError("bad-request")

        for item_id in request.itemIdentifiers:
            try:
                # FIXME: item ID naming convention must be hanlded using dedicated methods
                #   in XEP-0498.
                file_id = item_id.rsplit("_", 1)[1]
            except IndexError:
                file_id = ""
            if not file_id:
                raise error.StanzaError("bad-request")
            # Ownership is checked by ``file_delete``, and payload deletion is done there
            # too.
            await self.host.memory.file_delete(client, sender.userhostJID(), file_id)

    @ensure_deferred
    async def subscribe(self, request: rsm.PubSubRequest):
        raise rsm.Unsupported("subscribe")

    @ensure_deferred
    async def unsubscribe(self, request: rsm.PubSubRequest):
        raise rsm.Unsupported("unsubscribe")

    def getConfigurationOptions(self):
        return NODE_OPTIONS

    def getConfiguration(
        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred:
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: jid.JID | None = None,
    ) -> dict | None:
        if not nodeIdentifier:
            return None
        info = {"type": "leaf", "meta-data": NODE_CONFIG}
        return info


class EmailGWPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, gateway: "EmailGatewayComponent"):
        self.gateway = gateway
        self._pfs = gateway._pfs
        resource = EmailGWPubsubResource(self)
        super().__init__(resource)
        self.host = gateway.host
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia Email Gateway",
        }

    @ensure_deferred
    async def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature | disco.DiscoIdentity | data_form.Form]:
        infos = await super().getDiscoInfo(requestor, target, nodeIdentifier)
        infos.append(disco.DiscoFeature(self._pfs.namespace))
        return infos