diff libervia/backend/plugins/plugin_sec_oxps.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_sec_oxps.py@c23cad65ae99
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_sec_oxps.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,788 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Pubsub Encryption
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+import dataclasses
+import secrets
+import time
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+from collections import OrderedDict
+
+import shortuuid
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from wokkel import rsm
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory import persistent
+from libervia.backend.tools import utils
+from libervia.backend.tools import xml_tools
+from libervia.backend.tools.common import data_format
+from libervia.backend.tools.common import uri
+from libervia.backend.tools.common.async_utils import async_lru
+
+from .plugin_xep_0373 import NS_OX, get_gpg_provider
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "OXPS"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "OpenPGP for XMPP Pubsub",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0334", "XEP-0373"],
+    C.PI_MAIN: "PubsubEncryption",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub e2e encryption via OpenPGP"""),
+}
+NS_OXPS = "urn:xmpp:openpgp:pubsub:0"
+
+KEY_REVOKED = "revoked"
+CACHE_MAX = 5
+
+
+@dataclasses.dataclass
+class SharedSecret:
+    id: str
+    key: str
+    timestamp: float
+    # bare JID of who has generated the secret
+    origin: jid.JID
+    revoked: bool = False
+    shared_with: Set[jid.JID] = dataclasses.field(default_factory=set)
+
+
+class PubsubEncryption:
+    namespace = NS_OXPS
+
+    def __init__(self, host):
+        log.info(_("OpenPGP for XMPP Pubsub plugin initialization"))
+        host.register_namespace("oxps", NS_OXPS)
+        self.host = host
+        self._p = host.plugins["XEP-0060"]
+        self._h = host.plugins["XEP-0334"]
+        self._ox = host.plugins["XEP-0373"]
+        host.trigger.add("XEP-0060_publish", self._publish_trigger)
+        host.trigger.add("XEP-0060_items", self._items_trigger)
+        host.trigger.add(
+            "message_received",
+            self._message_received_trigger,
+        )
+        host.bridge.add_method(
+            "ps_secret_share",
+            ".plugin",
+            in_sign="sssass",
+            out_sign="",
+            method=self._ps_secret_share,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_secret_revoke",
+            ".plugin",
+            in_sign="sssass",
+            out_sign="",
+            method=self._ps_secret_revoke,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_secret_rotate",
+            ".plugin",
+            in_sign="ssass",
+            out_sign="",
+            method=self._ps_secret_rotate,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_secrets_list",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._ps_secrets_list,
+            async_=True,
+        )
+
+    def get_handler(self, client):
+        return PubsubEncryption_Handler()
+
+    async def profile_connecting(self, client):
+        client.__storage = persistent.LazyPersistentBinaryDict(
+            IMPORT_NAME, client.profile
+        )
+        # cache to avoid useless DB access, and to avoid race condition by ensuring that
+        # the same shared_secrets instance is always used for a given node.
+        client.__cache = OrderedDict()
+        self.gpg_provider = get_gpg_provider(self.host, client)
+
+    async def load_secrets(
+        self,
+        client: SatXMPPEntity,
+        node_uri: str
+    ) -> Optional[Dict[str, SharedSecret]]:
+        """Load shared secret from databse or cache
+
+        A cache is used per client to avoid usueless db access, as shared secrets are
+        often needed several times in a row. Cache is also necessary to avoir race
+        condition, when updating a secret, by ensuring that the same instance is used
+        for all updates during a session.
+
+        @param node_uri: XMPP URI of the encrypted pubsub node
+        @return shared secrets, or None if no secrets are known yet
+        """
+        try:
+            shared_secrets = client.__cache[node_uri]
+        except KeyError:
+            pass
+        else:
+            client.__cache.move_to_end(node_uri)
+            return shared_secrets
+
+        secrets_as_dict = await client.__storage.get(node_uri)
+
+        if secrets_as_dict is None:
+            return None
+        else:
+            shared_secrets = {
+                s["id"]: SharedSecret(
+                    id=s["id"],
+                    key=s["key"],
+                    timestamp=s["timestamp"],
+                    origin=jid.JID(s["origin"]),
+                    revoked=s["revoked"],
+                    shared_with={jid.JID(w) for w in s["shared_with"]}
+                ) for s in secrets_as_dict
+            }
+            client.__cache[node_uri] = shared_secrets
+            while len(client.__cache) > CACHE_MAX:
+                client.__cache.popitem(False)
+            return shared_secrets
+
+    def __secrect_dict_factory(self, data: List[Tuple[str, Any]]) -> Dict[str, Any]:
+        ret = {}
+        for k, v in data:
+            if k == "origin":
+                v = v.full()
+            elif k == "shared_with":
+                v = [j.full() for j in v]
+            ret[k] = v
+        return ret
+
+    async def store_secrets(
+        self,
+        client: SatXMPPEntity,
+        node_uri: str,
+        shared_secrets: Dict[str, SharedSecret]
+    ) -> None:
+        """Store shared secrets to database
+
+        Shared secrets are serialised before being stored.
+        If ``node_uri`` is not in cache, the shared_secrets instance is also put in cache/
+
+        @param node_uri: XMPP URI of the encrypted pubsub node
+        @param shared_secrets: shared secrets to store
+        """
+        if node_uri not in client.__cache:
+            client.__cache[node_uri] = shared_secrets
+            while len(client.__cache) > CACHE_MAX:
+                client.__cache.popitem(False)
+
+        secrets_as_dict = [
+            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
+            for s in shared_secrets.values()
+        ]
+        await client.__storage.aset(node_uri, secrets_as_dict)
+
+    def generate_secret(self, client: SatXMPPEntity) -> SharedSecret:
+        """Generate a new shared secret"""
+        log.info("Generating a new shared secret.")
+        secret_key = secrets.token_urlsafe(64)
+        secret_id = shortuuid.uuid()
+        return SharedSecret(
+            id = secret_id,
+            key = secret_key,
+            timestamp = time.time(),
+            origin = client.jid.userhostJID()
+        )
+
+    def _ps_secret_revoke(
+        self,
+        service: str,
+        node: str,
+        secret_id: str,
+        recipients: List[str],
+        profile_key: str
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.revoke(
+                self.host.get_client(profile_key),
+                jid.JID(service) if service else None,
+                node,
+                secret_id,
+                [jid.JID(r) for r in recipients] or None,
+            )
+        )
+
+    async def revoke(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        secret_id: str,
+        recipients: Optional[Iterable[jid.JID]] = None
+    ) -> None:
+        """Revoke a secret and notify entities
+
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param secret_id: ID of the secret to revoke (must have been generated by
+            ourselves)
+        recipients: JIDs of entities to send the revocation notice to. If None, all
+            entities known to have the shared secret will be notified.
+            Use empty list if you don't want to notify anybody (not recommended)
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if not shared_secrets:
+            raise exceptions.NotFound(f"No shared secret is known for {node_uri}")
+        try:
+            shared_secret = shared_secrets[secret_id]
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No shared secret with ID {secret_id!r} has been found for {node_uri}"
+            )
+        else:
+            if shared_secret.origin != client.jid.userhostJID():
+                raise exceptions.PermissionError(
+                    f"The shared secret {shared_secret.id} originate from "
+                    f"{shared_secret.origin}, not you ({client.jid.userhostJID()}). You "
+                    "can't revoke it"
+                )
+            shared_secret.revoked = True
+        await self.store_secrets(client, node_uri, shared_secrets)
+        log.info(
+            f"shared secret {secret_id!r} for {node_uri} has been revoked."
+        )
+        if recipients is None:
+            recipients = shared_secret.shared_with
+        if recipients:
+            for recipient in recipients:
+                await self.send_revoke_notification(
+                    client, service, node, shared_secret.id, recipient
+                )
+            log.info(
+                f"shared secret {shared_secret.id} revocation notification for "
+                f"{node_uri} has been send to {''.join(str(r) for r in recipients)}"
+            )
+        else:
+            log.info(
+                "Due to empty recipients list, no revocation notification has been sent "
+                f"for shared secret {shared_secret.id} for {node_uri}"
+            )
+
+    async def send_revoke_notification(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        secret_id: str,
+        recipient: jid.JID
+    ) -> None:
+        revoke_elt = domish.Element((NS_OXPS, "revoke"))
+        revoke_elt["jid"] = service.full()
+        revoke_elt["node"] = node
+        revoke_elt["id"] = secret_id
+        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
+        payload_elt.addChild(revoke_elt)
+        openpgp_elt = await self._ox.build_openpgp_element(
+            client, signcrypt_elt, {recipient}
+        )
+        message_elt = domish.Element((None, "message"))
+        message_elt["from"] = client.jid.full()
+        message_elt["to"] = recipient.full()
+        message_elt.addChild((openpgp_elt))
+        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
+        client.send(message_elt)
+
+    def _ps_secret_share(
+        self,
+        recipient: str,
+        service: str,
+        node: str,
+        secret_ids: List[str],
+        profile_key: str
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.share_secrets(
+                self.host.get_client(profile_key),
+                jid.JID(recipient),
+                jid.JID(service) if service else None,
+                node,
+                secret_ids or None,
+            )
+        )
+
+    async def share_secret(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        shared_secret: SharedSecret,
+        recipient: jid.JID
+    ) -> None:
+        """Create and send <shared-secret> element"""
+        if service is None:
+            service = client.jid.userhostJID()
+        shared_secret_elt = domish.Element((NS_OXPS, "shared-secret"))
+        shared_secret_elt["jid"] = service.full()
+        shared_secret_elt["node"] = node
+        shared_secret_elt["id"] = shared_secret.id
+        shared_secret_elt["timestamp"] = utils.xmpp_date(shared_secret.timestamp)
+        if shared_secret.revoked:
+            shared_secret_elt["revoked"] = C.BOOL_TRUE
+        # TODO: add type attribute
+        shared_secret_elt.addContent(shared_secret.key)
+        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
+        payload_elt.addChild(shared_secret_elt)
+        openpgp_elt = await self._ox.build_openpgp_element(
+            client, signcrypt_elt, {recipient}
+        )
+        message_elt = domish.Element((None, "message"))
+        message_elt["from"] = client.jid.full()
+        message_elt["to"] = recipient.full()
+        message_elt.addChild((openpgp_elt))
+        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
+        client.send(message_elt)
+        shared_secret.shared_with.add(recipient)
+
+    async def share_secrets(
+        self,
+        client: SatXMPPEntity,
+        recipient: jid.JID,
+        service: Optional[jid.JID],
+        node: str,
+        secret_ids: Optional[List[str]] = None,
+    ) -> None:
+        """Share secrets of a pubsub node with a recipient
+
+        @param recipient: who to share secrets with
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param secret_ids: IDs of the secrets to share, or None to share all known secrets
+            (disabled or not)
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            # no secret shared yet, let's generate one
+            shared_secret = self.generate_secret(client)
+            shared_secrets = {shared_secret.id: shared_secret}
+            await self.store_secrets(client, node_uri, shared_secrets)
+        if secret_ids is None:
+            # we share all secrets of the node
+            to_share = shared_secrets.values()
+        else:
+            try:
+                to_share = [shared_secrets[s_id] for s_id in secret_ids]
+            except KeyError as e:
+                raise exceptions.NotFound(
+                    f"no shared secret found with given ID: {e}"
+                )
+        for shared_secret in to_share:
+            await self.share_secret(client, service, node, shared_secret, recipient)
+        await self.store_secrets(client, node_uri, shared_secrets)
+
+    def _ps_secret_rotate(
+        self,
+        service: str,
+        node: str,
+        recipients: List[str],
+        profile_key: str,
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.rotate_secret(
+                self.host.get_client(profile_key),
+                jid.JID(service) if service else None,
+                node,
+                [jid.JID(r) for r in recipients] or None
+            )
+        )
+
+    async def rotate_secret(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        recipients: Optional[List[jid.JID]] = None
+    ) -> None:
+        """Revoke all current known secrets, create and share a new one
+
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param recipients: who must receive the new shared secret
+            if None, all recipients known to have last active shared secret will get the
+            new secret
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+        for shared_secret in shared_secrets.values():
+            if not shared_secret.revoked:
+                await self.revoke(client, service, node, shared_secret.id)
+                shared_secret.revoked = True
+
+        if recipients is None:
+            if shared_secrets:
+                # we get recipients from latests shared secret's shared_with list,
+                # regarless of deprecation (cause all keys may be deprecated)
+                recipients = list(sorted(
+                    shared_secrets.values(),
+                    key=lambda s: s.timestamp,
+                    reverse=True
+                )[0].shared_with)
+            else:
+                recipients = []
+
+        shared_secret = self.generate_secret(client)
+        shared_secrets[shared_secret.id] = shared_secret
+        # we send notification to last entities known to already have the shared secret
+        for recipient in recipients:
+            await self.share_secret(client, service, node, shared_secret, recipient)
+        await self.store_secrets(client, node_uri, shared_secrets)
+
+    def _ps_secrets_list(
+        self,
+        service: str,
+        node: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        d = defer.ensureDeferred(
+            self.list_shared_secrets(
+                self.host.get_client(profile_key),
+                jid.JID(service) if service else None,
+                node,
+            )
+        )
+        d.addCallback(lambda ret: data_format.serialise(ret))
+        return d
+
+    async def list_shared_secrets(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+    ) -> List[Dict[str, Any]]:
+        """Retrieve for shared secrets of a pubsub node
+
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @return: shared secrets data
+        @raise exceptions.NotFound: no shared secret found for this node
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            raise exceptions.NotFound(f"No shared secrets found for {node_uri}")
+        return [
+            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
+            for s in shared_secrets.values()
+        ]
+
+    async def handle_revoke_elt(
+        self,
+        client: SatXMPPEntity,
+        sender: jid.JID,
+        revoke_elt: domish.Element
+    ) -> None:
+        """Parse a <revoke> element and update local secrets
+
+        @param sender: bare jid of the entity who has signed the secret
+        @param revoke: <revoke/> element
+        """
+        try:
+            service = jid.JID(revoke_elt["jid"])
+            node = revoke_elt["node"]
+            secret_id = revoke_elt["id"]
+        except (KeyError, RuntimeError) as e:
+            log.warning(
+                f"ignoring invalid <revoke> element: {e}\n{revoke_elt.toXml()}"
+            )
+            return
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            log.warning(
+                f"Can't revoke shared secret {secret_id}: no known shared secrets for "
+                f"{node_uri}"
+            )
+            return
+
+        if any(s.origin != sender for s in shared_secrets.values()):
+            log.warning(
+                f"Rejecting shared secret revocation signed by invalid entity ({sender}):"
+                f"\n{revoke_elt.toXml}"
+            )
+            return
+
+        try:
+            shared_secret = shared_secrets[secret_id]
+        except KeyError:
+            log.warning(
+                f"Can't revoke shared secret {secret_id}: this secret ID is unknown for "
+                f"{node_uri}"
+            )
+            return
+
+        shared_secret.revoked = True
+        await self.store_secrets(client, node_uri, shared_secrets)
+        log.info(f"Shared secret {secret_id} has been revoked for {node_uri}")
+
+    async def handle_shared_secret_elt(
+        self,
+        client: SatXMPPEntity,
+        sender: jid.JID,
+        shared_secret_elt: domish.Element
+    ) -> None:
+        """Parse a <shared-secret> element and update local secrets
+
+        @param sender: bare jid of the entity who has signed the secret
+        @param shared_secret_elt: <shared-secret/> element
+        """
+        try:
+            service = jid.JID(shared_secret_elt["jid"])
+            node = shared_secret_elt["node"]
+            secret_id = shared_secret_elt["id"]
+            timestamp = utils.parse_xmpp_date(shared_secret_elt["timestamp"])
+            # TODO: handle "type" attribute
+            revoked = C.bool(shared_secret_elt.getAttribute("revoked", C.BOOL_FALSE))
+        except (KeyError, RuntimeError, ValueError) as e:
+            log.warning(
+                f"ignoring invalid <shared-secret> element: "
+                f"{e}\n{shared_secret_elt.toXml()}"
+            )
+            return
+        key = str(shared_secret_elt)
+        if not key:
+            log.warning(
+                "ignoring <shared-secret> element with empty key: "
+                f"{shared_secret_elt.toXml()}"
+            )
+            return
+        shared_secret = SharedSecret(
+            id=secret_id, key=key, timestamp=timestamp, origin=sender, revoked=revoked
+        )
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+            # no known shared secret yet for this node, we have to trust first user who
+            # send it
+        else:
+            if any(s.origin != sender for s in shared_secrets.values()):
+                log.warning(
+                    f"Rejecting shared secret signed by invalid entity ({sender}):\n"
+                    f"{shared_secret_elt.toXml}"
+                )
+                return
+
+        shared_secrets[shared_secret.id] = shared_secret
+        await self.store_secrets(client, node_uri, shared_secrets)
+        log.info(
+            f"shared secret {shared_secret.id} added for {node_uri} [{client.profile}]"
+        )
+
+    async def _publish_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        items: Optional[List[domish.Element]],
+        options: Optional[dict],
+        sender: jid.JID,
+        extra: Dict[str, Any]
+    ) -> bool:
+        if not items or not extra.get("encrypted"):
+            return True
+        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+            shared_secret = None
+        else:
+            current_secrets = [s for s in shared_secrets.values() if not s.revoked]
+            if not current_secrets:
+                shared_secret = None
+            elif len(current_secrets) > 1:
+                log.warning(
+                    f"more than one active shared secret found for node {node!r} at "
+                    f"{service}, using the most recent one"
+                )
+                current_secrets.sort(key=lambda s: s.timestamp, reverse=True)
+                shared_secret = current_secrets[0]
+            else:
+                shared_secret = current_secrets[0]
+
+        if shared_secret is None:
+            if any(s.origin != client.jid.userhostJID() for s in shared_secrets.values()):
+                raise exceptions.PermissionError(
+                    "there is no known active shared secret, and you are not the "
+                    "creator of previous shared secrets, we can't encrypt items at "
+                    f"{node_uri} ."
+                )
+            shared_secret = self.generate_secret(client)
+            shared_secrets[shared_secret.id] = shared_secret
+            await self.store_secrets(client, node_uri, shared_secrets)
+            # TODO: notify other entities
+
+        for item in items:
+            item_elts = list(item.elements())
+            if len(item_elts) != 1:
+                raise ValueError(
+                    f"there should be exactly one item payload: {item.toXml()}"
+                )
+            item_payload = item_elts[0]
+            log.debug(f"encrypting item {item.getAttribute('id', '')}")
+            encrypted_item = self.gpg_provider.encrypt_symmetrically(
+                item_payload.toXml().encode(), shared_secret.key
+            )
+            item.children.clear()
+            encrypted_elt = domish.Element((NS_OXPS, "encrypted"))
+            encrypted_elt["key"] = shared_secret.id
+            encrypted_elt.addContent(base64.b64encode(encrypted_item).decode())
+            item.addChild(encrypted_elt)
+
+        return True
+
+    async def _items_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        items: List[domish.Element],
+        rsm_response: rsm.RSMResponse,
+        extra: Dict[str, Any],
+    ) -> bool:
+        if not extra.get(C.KEY_DECRYPT, True):
+            return True
+        if service is None:
+            service = client.jid.userhostJID()
+        shared_secrets = None
+        for item in items:
+            payload = item.firstChildElement()
+            if (payload is not None
+                and payload.name == "encrypted"
+                and payload.uri == NS_OXPS):
+                encrypted_elt = payload
+                secret_id = encrypted_elt.getAttribute("key")
+                if not secret_id:
+                    log.warning(
+                        f'"key" attribute is missing from encrypted item: {item.toXml()}'
+                    )
+                    continue
+                if shared_secrets is None:
+                    node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
+                    shared_secrets = await self.load_secrets(client, node_uri)
+                    if shared_secrets is None:
+                        log.warning(
+                            f"No known shared secret for {node_uri}, can't decrypt"
+                        )
+                        return True
+                try:
+                    shared_secret = shared_secrets[secret_id]
+                except KeyError:
+                    log.warning(
+                        f"No key known for encrypted item {item['id']!r} (shared secret "
+                        f"id: {secret_id!r})"
+                    )
+                    continue
+                log.debug(f"decrypting item {item.getAttribute('id', '')}")
+                decrypted = self.gpg_provider.decrypt_symmetrically(
+                    base64.b64decode(str(encrypted_elt)),
+                    shared_secret.key
+                )
+                decrypted_elt = xml_tools.parse(decrypted)
+                item.children.clear()
+                item.addChild(decrypted_elt)
+                extra.setdefault("encrypted", {})[item["id"]] = {"type": NS_OXPS}
+        return True
+
+    async def _message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        sender = jid.JID(message_elt["from"]).userhostJID()
+        # there may be an openpgp element if OXIM is not activate, in this case we have to
+        # decrypt it here
+        openpgp_elt = next(message_elt.elements(NS_OX, "openpgp"), None)
+        if openpgp_elt is not None:
+            try:
+                payload_elt, __ = await self._ox.unpack_openpgp_element(
+                    client,
+                    openpgp_elt,
+                    "signcrypt",
+                    sender
+                )
+            except Exception as e:
+                log.warning(f"Can't decrypt element: {e}\n{message_elt.toXml()}")
+                return False
+            message_elt.children.remove(openpgp_elt)
+            for c in payload_elt.children:
+                message_elt.addChild(c)
+
+        shared_secret_elt = next(message_elt.elements(NS_OXPS, "shared-secret"), None)
+        if shared_secret_elt is None:
+            # no <shared-secret>, we check if there is a <revoke> element
+            revoke_elt = next(message_elt.elements(NS_OXPS, "revoke"), None)
+            if revoke_elt is None:
+                return True
+            else:
+                await self.handle_revoke_elt(client, sender, revoke_elt)
+        else:
+            await self.handle_shared_secret_elt(client, sender, shared_secret_elt)
+
+        return False
+
+
+@implementer(iwokkel.IDisco)
+class PubsubEncryption_Handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_OXPS)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []