changeset 3934:e345d93fb6e5

plugin OXPS: OpenPGP for XMPP Pubsub implementation: OpenPGP for XMPP Pubsub (, currently a protoXEP) is implemented and activated when `encrypted` is set to `True` in pubsub's `extra` data. On item retrieval, the decryption is transparent if the key is known, except if the `decrypt` key in `extra` is set to `False` (notably useful when one wants to checks that data is well encrypted). Methods and corresponding bridge methods have been implemented to manage shared secrets (to share, revoke or rotate the secrets). plugin XEP-0060's `XEP-0060_publish` trigger point as been move before actual publish so item can be modified (here e2ee) by the triggers. A new `XEP-0060_items` trigger point has also been added. `encrypted` flag can be used with plugin XEP-0277's microblog data rel 380
author Goffi <>
date Sat, 15 Oct 2022 20:36:53 +0200 (2022-10-15)
parents cecf45416403
children 80d29f55ba8b
files sat/core/ sat/core/ sat/plugins/ sat/plugins/ sat/plugins/ sat/plugins/ sat/plugins/
diffstat 7 files changed, 837 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/core/	Sat Oct 15 20:36:53 2022 +0200
@@ -370,6 +370,7 @@
     ## Common extra keys/values ##
     KEY_ORDER_BY = "order_by"
     KEY_USE_CACHE = "use_cache"
+    KEY_DECRYPT = "decrypt"
     ORDER_BY_CREATION = 'creation'
     ORDER_BY_MODIFICATION = 'modification'
--- a/sat/core/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/core/	Sat Oct 15 20:36:53 2022 +0200
@@ -123,7 +123,7 @@
-class ParsingError(Exception):
+class ParsingError(ValueError):
--- a/sat/plugins/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/plugins/	Sat Oct 15 20:36:53 2022 +0200
@@ -753,7 +753,7 @@
                     "(Re)Synchronising the node {node} at {service} on user request"
-                ).format(node=node, service=service.full)
+                ).format(node=node, service=service.full())
             # we first delete and recreate the node (will also delete its items)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/	Sat Oct 15 20:36:53 2022 +0200
@@ -0,0 +1,786 @@
+#!/usr/bin/env python3
+# Libervia plugin for Pubsub Encryption
+# Copyright (C) 2009-2022 Jérôme Poisson (
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+import base64
+import dataclasses
+import secrets
+import time
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+from collections import OrderedDict
+import shortuuid
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from wokkel import rsm
+from zope.interface import implementer
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.memory import persistent
+from import utils
+from import xml_tools
+from import data_format
+from import uri
+from import async_lru
+from .plugin_xep_0373 import NS_OX, get_gpg_provider
+log = getLogger(__name__)
+    C.PI_NAME: "OpenPGP for XMPP Pubsub",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0334", "XEP-0373"],
+    C.PI_MAIN: "PubsubEncryption",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub e2e encryption via OpenPGP"""),
+NS_OXPS = "urn:xmpp:openpgp:pubsub:0"
+KEY_REVOKED = "revoked"
+class SharedSecret:
+    id: str
+    key: str
+    timestamp: float
+    # bare JID of who has generated the secret
+    origin: jid.JID
+    revoked: bool = False
+    shared_with: Set[jid.JID] = dataclasses.field(default_factory=set)
+class PubsubEncryption:
+    namespace = NS_OXPS
+    def __init__(self, host):
+"OpenPGP for XMPP Pubsub plugin initialization"))
+        host.registerNamespace("oxps", NS_OXPS)
+ = host
+        self._p = host.plugins["XEP-0060"]
+        self._h = host.plugins["XEP-0334"]
+        self._ox = host.plugins["XEP-0373"]
+        host.trigger.add("XEP-0060_publish", self._publish_trigger)
+        host.trigger.add("XEP-0060_items", self._items_trigger)
+        host.trigger.add(
+            "messageReceived",
+            self._message_received_trigger,
+        )
+        host.bridge.addMethod(
+            "psSecretShare",
+            ".plugin",
+            in_sign="sssass",
+            out_sign="",
+            method=self._ps_secret_share,
+            async_=True,
+        )
+        host.bridge.addMethod(
+            "psSecretRevoke",
+            ".plugin",
+            in_sign="sssass",
+            out_sign="",
+            method=self._ps_secret_revoke,
+            async_=True,
+        )
+        host.bridge.addMethod(
+            "psSecretRotate",
+            ".plugin",
+            in_sign="ssass",
+            out_sign="",
+            method=self._ps_secret_rotate,
+            async_=True,
+        )
+        host.bridge.addMethod(
+            "psSecretsList",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._ps_secrets_list,
+            async_=True,
+        )
+    def getHandler(self, client):
+        return PubsubEncryption_Handler()
+    async def profileConnecting(self, client):
+        client.__storage = persistent.LazyPersistentBinaryDict(
+            IMPORT_NAME, client.profile
+        )
+        # cache to avoid useless DB access, and to avoid race condition by ensuring that
+        # the same shared_secrets instance is always used for a given node.
+        client.__cache = OrderedDict()
+        self.gpg_provider = get_gpg_provider(, client)
+    async def load_secrets(
+        self,
+        client: SatXMPPEntity,
+        node_uri: str
+    ) -> Optional[Dict[str, SharedSecret]]:
+        """Load shared secret from databse or cache
+        A cache is used per client to avoid usueless db access, as shared secrets are
+        often needed several times in a row. Cache is also necessary to avoir race
+        condition, when updating a secret, by ensuring that the same instance is used
+        for all updates during a session.
+        @param node_uri: XMPP URI of the encrypted pubsub node
+        @return shared secrets, or None if no secrets are known yet
+        """
+        try:
+            shared_secrets = client.__cache[node_uri]
+        except KeyError:
+            pass
+        else:
+            client.__cache.move_to_end(node_uri)
+            return shared_secrets
+        secrets_as_dict = await client.__storage.get(node_uri)
+        if secrets_as_dict is None:
+            return None
+        else:
+            shared_secrets = {
+                s["id"]: SharedSecret(
+                    id=s["id"],
+                    key=s["key"],
+                    timestamp=s["timestamp"],
+                    origin=jid.JID(s["origin"]),
+                    revoked=s["revoked"],
+                    shared_with={jid.JID(w) for w in s["shared_with"]}
+                ) for s in secrets_as_dict
+            }
+            client.__cache[node_uri] = shared_secrets
+            while len(client.__cache) > CACHE_MAX:
+                client.__cache.popitem(False)
+            return shared_secrets
+    def __secrect_dict_factory(self, data: List[Tuple[str, Any]]) -> Dict[str, Any]:
+        ret = {}
+        for k, v in data:
+            if k == "origin":
+                v = v.full()
+            elif k == "shared_with":
+                v = [j.full() for j in v]
+            ret[k] = v
+        return ret
+    async def store_secrets(
+        self,
+        client: SatXMPPEntity,
+        node_uri: str,
+        shared_secrets: Dict[str, SharedSecret]
+    ) -> None:
+        """Store shared secrets to database
+        Shared secrets are serialised before being stored.
+        If ``node_uri`` is not in cache, the shared_secrets instance is also put in cache/
+        @param node_uri: XMPP URI of the encrypted pubsub node
+        @param shared_secrets: shared secrets to store
+        """
+        if node_uri not in client.__cache:
+            client.__cache[node_uri] = shared_secrets
+            while len(client.__cache) > CACHE_MAX:
+                client.__cache.popitem(False)
+        secrets_as_dict = [
+            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
+            for s in shared_secrets.values()
+        ]
+        await client.__storage.aset(node_uri, secrets_as_dict)
+    def generate_secret(self, client: SatXMPPEntity) -> SharedSecret:
+        """Generate a new shared secret"""
+"Generating a new shared secret.")
+        secret_key = secrets.token_urlsafe(64)
+        secret_id = shortuuid.uuid()
+        return SharedSecret(
+            id = secret_id,
+            key = secret_key,
+            timestamp = time.time(),
+            origin = client.jid.userhostJID()
+        )
+    def _ps_secret_revoke(
+        self,
+        service: str,
+        node: str,
+        secret_id: str,
+        recipients: List[str],
+        profile_key: str
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.revoke(
+      ,
+                jid.JID(service) if service else None,
+                node,
+                secret_id,
+                [jid.JID(r) for r in recipients] or None,
+            )
+        )
+    async def revoke(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        secret_id: str,
+        recipients: Optional[Iterable[jid.JID]] = None
+    ) -> None:
+        """Revoke a secret and notify entities
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param secret_id: ID of the secret to revoke (must have been generated by
+            ourselves)
+        recipients: JIDs of entities to send the revocation notice to. If None, all
+            entities known to have the shared secret will be notified.
+            Use empty list if you don't want to notify anybody (not recommended)
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if not shared_secrets:
+            raise exceptions.NotFound(f"No shared secret is known for {node_uri}")
+        try:
+            shared_secret = shared_secrets[secret_id]
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No shared secret with ID {secret_id!r} has been found for {node_uri}"
+            )
+        else:
+            if shared_secret.origin != client.jid.userhostJID():
+                raise exceptions.PermissionError(
+                    f"The shared secret {} originate from "
+                    f"{shared_secret.origin}, not you ({client.jid.userhostJID()}). You "
+                    "can't revoke it"
+                )
+            shared_secret.revoked = True
+        await self.store_secrets(client, node_uri, shared_secrets)
+            f"shared secret {secret_id!r} for {node_uri} has been revoked."
+        )
+        if recipients is None:
+            recipients = shared_secret.shared_with
+        if recipients:
+            for recipient in recipients:
+                await self.send_revoke_notification(
+                    client, service, node,, recipient
+                )
+                f"shared secret {} revocation notification for "
+                f"{node_uri} has been send to {''.join(str(r) for r in recipients)}"
+            )
+        else:
+                "Due to empty recipients list, no revocation notification has been sent "
+                f"for shared secret {} for {node_uri}"
+            )
+    async def send_revoke_notification(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        secret_id: str,
+        recipient: jid.JID
+    ) -> None:
+        revoke_elt = domish.Element((NS_OXPS, "revoke"))
+        revoke_elt["jid"] = service.full()
+        revoke_elt["node"] = node
+        revoke_elt["id"] = secret_id
+        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
+        payload_elt.addChild(revoke_elt)
+        openpgp_elt = await self._ox.build_openpgp_element(
+            client, signcrypt_elt, {recipient}
+        )
+        message_elt = domish.Element((None, "message"))
+        message_elt["from"] = client.jid.full()
+        message_elt["to"] = recipient.full()
+        message_elt.addChild((openpgp_elt))
+        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
+        client.send(message_elt)
+    def _ps_secret_share(
+        self,
+        recipient: str,
+        service: str,
+        node: str,
+        secret_ids: List[str],
+        profile_key: str
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.share_secrets(
+      ,
+                jid.JID(recipient),
+                jid.JID(service) if service else None,
+                node,
+                secret_ids or None,
+            )
+        )
+    async def share_secret(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        shared_secret: SharedSecret,
+        recipient: jid.JID
+    ) -> None:
+        """Create and send <shared-secret> element"""
+        if service is None:
+            service = client.jid.userhostJID()
+        shared_secret_elt = domish.Element((NS_OXPS, "shared-secret"))
+        shared_secret_elt["jid"] = service.full()
+        shared_secret_elt["node"] = node
+        shared_secret_elt["id"] =
+        shared_secret_elt["timestamp"] = utils.xmpp_date(shared_secret.timestamp)
+        if shared_secret.revoked:
+            shared_secret_elt["revoked"] = C.BOOL_TRUE
+        # TODO: add type attribute
+        shared_secret_elt.addContent(shared_secret.key)
+        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
+        payload_elt.addChild(shared_secret_elt)
+        openpgp_elt = await self._ox.build_openpgp_element(
+            client, signcrypt_elt, {recipient}
+        )
+        message_elt = domish.Element((None, "message"))
+        message_elt["from"] = client.jid.full()
+        message_elt["to"] = recipient.full()
+        message_elt.addChild((openpgp_elt))
+        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
+        client.send(message_elt)
+        shared_secret.shared_with.add(recipient)
+    async def share_secrets(
+        self,
+        client: SatXMPPEntity,
+        recipient: jid.JID,
+        service: Optional[jid.JID],
+        node: str,
+        secret_ids: Optional[List[str]] = None,
+    ) -> None:
+        """Share secrets of a pubsub node with a recipient
+        @param recipient: who to share secrets with
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param secret_ids: IDs of the secrets to share, or None to share all known secrets
+            (disabled or not)
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            # no secret shared yet, let's generate one
+            shared_secret = self.generate_secret(client)
+            shared_secrets = { shared_secret}
+            await self.store_secrets(client, node_uri, shared_secrets)
+        if secret_ids is None:
+            # we share all secrets of the node
+            to_share = shared_secrets.values()
+        else:
+            try:
+                to_share = [shared_secrets[s_id] for s_id in secret_ids]
+            except KeyError as e:
+                raise exceptions.NotFound(
+                    f"no shared secret found with given ID: {e}"
+                )
+        for shared_secret in to_share:
+            await self.share_secret(client, service, node, shared_secret, recipient)
+        await self.store_secrets(client, node_uri, shared_secrets)
+    def _ps_secret_rotate(
+        self,
+        service: str,
+        node: str,
+        recipients: List[str],
+        profile_key: str,
+    ) -> defer.Deferred:
+        return defer.ensureDeferred(
+            self.rotate_secret(
+      ,
+                jid.JID(service) if service else None,
+                node,
+                [jid.JID(r) for r in recipients] or None
+            )
+        )
+    async def rotate_secret(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        recipients: Optional[List[jid.JID]] = None
+    ) -> None:
+        """Revoke all current known secrets, create and share a new one
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @param recipients: who must receive the new shared secret
+            if None, all recipients known to have last active shared secret will get the
+            new secret
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+        for shared_secret in shared_secrets.values():
+            if not shared_secret.revoked:
+                await self.revoke(client, service, node,
+                shared_secret.revoked = True
+        if recipients is None:
+            if shared_secrets:
+                # we get recipients from latests shared secret's shared_with list,
+                # regarless of deprecation (cause all keys may be deprecated)
+                recipients = list(sorted(
+                    shared_secrets.values(),
+                    key=lambda s: s.timestamp,
+                    reverse=True
+                )[0].shared_with)
+            else:
+                recipients = []
+        shared_secret = self.generate_secret(client)
+        shared_secrets[] = shared_secret
+        # we send notification to last entities known to already have the shared secret
+        for recipient in recipients:
+            await self.share_secret(client, service, node, shared_secret, recipient)
+        await self.store_secrets(client, node_uri, shared_secrets)
+    def _ps_secrets_list(
+        self,
+        service: str,
+        node: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        d = defer.ensureDeferred(
+            self.list_shared_secrets(
+      ,
+                jid.JID(service) if service else None,
+                node,
+            )
+        )
+        d.addCallback(lambda ret: data_format.serialise(ret))
+        return d
+    async def list_shared_secrets(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+    ) -> List[Dict[str, Any]]:
+        """Retrieve for shared secrets of a pubsub node
+        @param service: pubsub/PEP service where the node is
+        @param node: node name
+        @return: shared secrets data
+        @raise exceptions.NotFound: no shared secret found for this node
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            raise exceptions.NotFound(f"No shared secrets found for {node_uri}")
+        return [
+            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
+            for s in shared_secrets.values()
+        ]
+    async def handle_revoke_elt(
+        self,
+        client: SatXMPPEntity,
+        sender: jid.JID,
+        revoke_elt: domish.Element
+    ) -> None:
+        """Parse a <revoke> element and update local secrets
+        @param sender: bare jid of the entity who has signed the secret
+        @param revoke: <revoke/> element
+        """
+        try:
+            service = jid.JID(revoke_elt["jid"])
+            node = revoke_elt["node"]
+            secret_id = revoke_elt["id"]
+        except (KeyError, RuntimeError) as e:
+            log.warning(
+                f"ignoring invalid <revoke> element: {e}\n{revoke_elt.toXml()}"
+            )
+            return
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            log.warning(
+                f"Can't revoke shared secret {secret_id}: no known shared secrets for "
+                f"{node_uri}"
+            )
+            return
+        if any(s.origin != sender for s in shared_secrets.values()):
+            log.warning(
+                f"Rejecting shared secret revocation signed by invalid entity ({sender}):"
+                f"\n{revoke_elt.toXml}"
+            )
+            return
+        try:
+            shared_secret = shared_secrets[secret_id]
+        except KeyError:
+            log.warning(
+                f"Can't revoke shared secret {secret_id}: this secret ID is unknown for "
+                f"{node_uri}"
+            )
+            return
+        shared_secret.revoked = True
+        await self.store_secrets(client, node_uri, shared_secrets)
+"Shared secret {secret_id} has been revoked for {node_uri}")
+    async def handle_shared_secret_elt(
+        self,
+        client: SatXMPPEntity,
+        sender: jid.JID,
+        shared_secret_elt: domish.Element
+    ) -> None:
+        """Parse a <shared-secret> element and update local secrets
+        @param sender: bare jid of the entity who has signed the secret
+        @param shared_secret_elt: <shared-secret/> element
+        """
+        try:
+            service = jid.JID(shared_secret_elt["jid"])
+            node = shared_secret_elt["node"]
+            secret_id = shared_secret_elt["id"]
+            timestamp = utils.parse_xmpp_date(shared_secret_elt["timestamp"])
+            # TODO: handle "type" attribute
+            revoked = C.bool(shared_secret_elt.getAttribute("revoked", C.BOOL_FALSE))
+        except (KeyError, RuntimeError, ValueError) as e:
+            log.warning(
+                f"ignoring invalid <shared-secret> element: "
+                f"{e}\n{shared_secret_elt.toXml()}"
+            )
+            return
+        key = str(shared_secret_elt)
+        if not key:
+            log.warning(
+                "ignoring <shared-secret> element with empty key: "
+                f"{shared_secret_elt.toXml()}"
+            )
+            return
+        shared_secret = SharedSecret(
+            id=secret_id, key=key, timestamp=timestamp, origin=sender, revoked=revoked
+        )
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+            # no known shared secret yet for this node, we have to trust first user who
+            # send it
+        else:
+            if any(s.origin != sender for s in shared_secrets.values()):
+                log.warning(
+                    f"Rejecting shared secret signed by invalid entity ({sender}):\n"
+                    f"{shared_secret_elt.toXml}"
+                )
+                return
+        shared_secrets[] = shared_secret
+        await self.store_secrets(client, node_uri, shared_secrets)
+            f"shared secret {} added for {node_uri} [{client.profile}]"
+        )
+    async def _publish_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        items: Optional[List[domish.Element]],
+        options: Optional[dict],
+        sender: jid.JID,
+        extra: Dict[str, Any]
+    ) -> bool:
+        if not items or not extra.get("encrypted"):
+            return True
+        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+        shared_secrets = await self.load_secrets(client, node_uri)
+        if shared_secrets is None:
+            shared_secrets = {}
+            shared_secret = None
+        else:
+            current_secrets = [s for s in shared_secrets.values() if not s.revoked]
+            if not current_secrets:
+                shared_secret = None
+            elif len(current_secrets) > 1:
+                log.warning(
+                    f"more than one active shared secret found for node {node!r} at "
+                    f"{service}, using the most recent one"
+                )
+                current_secrets.sort(key=lambda s: s.timestamp, reverse=True)
+                shared_secret = current_secrets[0]
+            else:
+                shared_secret = current_secrets[0]
+        if shared_secret is None:
+            if any(s.origin != client.jid.userhostJID() for s in shared_secrets.values()):
+                raise exceptions.PermissionError(
+                    "there is no known active shared secret, and you are not the "
+                    "creator of previous shared secrets, we can't encrypt items at "
+                    f"{node_uri} ."
+                )
+            shared_secret = self.generate_secret(client)
+            shared_secrets[] = shared_secret
+            await self.store_secrets(client, node_uri, shared_secrets)
+            # TODO: notify other entities
+        for item in items:
+            item_elts = list(item.elements())
+            if len(item_elts) != 1:
+                raise ValueError(
+                    f"there should be exactly one item payload: {item.toXml()}"
+                )
+            item_payload = item_elts[0]
+            log.debug(f"encrypting item {item.getAttribute('id', '')}")
+            encrypted_item = self.gpg_provider.encrypt_symmetrically(
+                item_payload.toXml().encode(), shared_secret.key
+            )
+            item.children.clear()
+            encrypted_elt = domish.Element((NS_OXPS, "encrypted"))
+            encrypted_elt["key"] =
+            encrypted_elt.addContent(base64.b64encode(encrypted_item).decode())
+            item.addChild(encrypted_elt)
+        return True
+    async def _items_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        items: List[domish.Element],
+        rsm_response: rsm.RSMResponse,
+        extra: Dict[str, Any],
+    ) -> None:
+        if not extra.get(C.KEY_DECRYPT, True):
+            return
+        if service is None:
+            service = client.jid.userhostJID()
+        shared_secrets = None
+        for item in items:
+            payload = item.firstChildElement()
+            if (payload is not None
+                and == "encrypted"
+                and payload.uri == NS_OXPS):
+                encrypted_elt = payload
+                secret_id = encrypted_elt.getAttribute("key")
+                if not secret_id:
+                    log.warning(
+                        f'"key" attribute is missing from encrypted item: {item.toXml()}'
+                    )
+                    continue
+                if shared_secrets is None:
+                    node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
+                    shared_secrets = await self.load_secrets(client, node_uri)
+                    if shared_secrets is None:
+                        log.warning(
+                            f"No known shared secret for {node_uri}, can't decrypt"
+                        )
+                        return
+                try:
+                    shared_secret = shared_secrets[secret_id]
+                except KeyError:
+                    log.warning(
+                        f"No key known for encrypted item {item['id']!r} (shared secret "
+                        f"id: {secret_id!r})"
+                    )
+                    continue
+                log.debug(f"decrypting item {item.getAttribute('id', '')}")
+                decrypted = self.gpg_provider.decrypt_symmetrically(
+                    base64.b64decode(str(encrypted_elt)),
+                    shared_secret.key
+                )
+                decrypted_elt = xml_tools.parse(decrypted)
+                item.children.clear()
+                item.addChild(decrypted_elt)
+    async def _message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        sender = jid.JID(message_elt["from"]).userhostJID()
+        # there may be an openpgp element if OXIM is not activate, in this case we have to
+        # decrypt it here
+        openpgp_elt = next(message_elt.elements(NS_OX, "openpgp"), None)
+        if openpgp_elt is not None:
+            try:
+                payload_elt, __ = await self._ox.unpack_openpgp_element(
+                    client,
+                    openpgp_elt,
+                    "signcrypt",
+                    sender
+                )
+            except Exception as e:
+                log.warning(f"Can't decrypt element: {e}\n{message_elt.toXml()}")
+                return False
+            message_elt.children.remove(openpgp_elt)
+            for c in payload_elt.children:
+                message_elt.addChild(c)
+        shared_secret_elt = next(message_elt.elements(NS_OXPS, "shared-secret"), None)
+        if shared_secret_elt is None:
+            # no <shared-secret>, we check if there is a <revoke> element
+            revoke_elt = next(message_elt.elements(NS_OXPS, "revoke"), None)
+            if revoke_elt is None:
+                return True
+            else:
+                await self.handle_revoke_elt(client, sender, revoke_elt)
+        else:
+            await self.handle_shared_secret_elt(client, sender, shared_secret_elt)
+        return False
+class PubsubEncryption_Handler(xmlstream.XMPPHandler):
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_OXPS)]
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []
--- a/sat/plugins/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/plugins/	Sat Oct 15 20:36:53 2022 +0200
@@ -19,7 +19,7 @@
 from collections import namedtuple
 from functools import reduce
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, Callable
+from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -658,25 +658,33 @@
         nodeIdentifier: str,
         items: Optional[List[domish.Element]] = None,
         options: Optional[dict] = None,
-        sender: Optional[jid.JID] = None
+        sender: Optional[jid.JID] = None,
+        extra: Optional[Dict[str, Any]] = None
     ) -> domish.Element:
         """Publish pubsub items
         @param sender: sender of the request,
             client.jid will be used if nto set
+        @param extra: extra data
+            not used directly by ``publish``, but may be used in triggers
         @return: IQ result stanza
+        @trigger XEP-0060_publish: called just before publication.
+            if it returns False, extra must have a "iq_result_elt" key set with
+            domish.Element to return.
         if sender is None:
             sender = client.jid
+        if extra is None:
+            extra = {}
+        if not await
+            "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender,
+            extra
+        ):
+            return extra["iq_result_elt"]
         iq_result_elt = await client.pubsub_client.publish(
             service, nodeIdentifier, items, sender,
-        await
-            "XEP-0060_publish", client, service, nodeIdentifier, items, options,
-            iq_result_elt
-        )
         return iq_result_elt
     def _unwrapMAMMessage(self, message_elt):
@@ -768,7 +776,7 @@
             mam_query = extra["mam"]
         except KeyError:
-            d = client.pubsub_client.items(
+            d = defer.ensureDeferred(client.pubsub_client.items(
                 service = service,
                 nodeIdentifier = node,
                 maxItems = max_items,
@@ -776,8 +784,9 @@
                 sender = None,
                 itemIdentifiers = item_ids,
                 orderBy = extra.get(C.KEY_ORDER_BY),
-                rsm_request = rsm_request
-            )
+                rsm_request = rsm_request,
+                extra = extra
+            ))
             # we have no MAM data here, so we add None
             d.addTimeout(TIMEOUT, reactor)
@@ -1652,6 +1661,32 @@
     def connectionInitialized(self):
+    async def items(
+        self,
+        service: Optional[jid.JID],
+        nodeIdentifier: str,
+        maxItems: Optional[int] = None,
+        subscriptionIdentifier: Optional[str] = None,
+        sender: Optional[jid.JID] = None,
+        itemIdentifiers: Optional[Set[str]] = None,
+        orderBy: Optional[List[str]] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[Dict[str, Any]] = None,
+    ):
+        if extra is None:
+            extra = {}
+        items, rsm_response = await super().items(
+            service, nodeIdentifier, maxItems, subscriptionIdentifier, sender,
+            itemIdentifiers, orderBy, rsm_request
+        )
+        # items must be returned, thus this async point can't stop the workflow (but it
+        # can modify returned items)
+        await
+            "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response,
+            extra
+        )
+        return items, rsm_response
     def _getNodeCallbacks(self, node, event):
         """Generate callbacks from given node and event
--- a/sat/plugins/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/plugins/	Sat Oct 15 20:36:53 2022 +0200
@@ -983,7 +983,9 @@
             return None
-        await self._p.publish(client, service, node, [item])
+        extra = {"encrypted": True} if data.get("encrypted") else None
+        await self._p.publish(client, service, node, [item], extra=extra)
         return item_id
     def _mbRepeat(
--- a/sat/plugins/	Tue Sep 20 16:22:18 2022 +0200
+++ b/sat/plugins/	Sat Oct 15 20:36:53 2022 +0200
@@ -35,7 +35,6 @@
 from wokkel import disco, iwokkel
 from zope.interface import implementer
-from build.lib.sat.plugins.plugin_xep_0054 import IMPORT_NAME
 from sat.core import exceptions
 from sat.core.constants import Const as C
 from sat.core.core_types import SatXMPPEntity