view libervia/backend/plugins/plugin_sec_oxps.py @ 4139:6745c6bd4c7a

frontends (tools): webrtc implementation: this is a factored implementation usable by all non-web frontends. Sources and Sinks can be configured easily to use tests source or local webcam/microphone, autosinks or a `appsink` that the frontend will use. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:03:36 +0100
parents 4b842c1fb686
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Pubsub Encryption
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import dataclasses
import secrets
import time
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from collections import OrderedDict

import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import rsm
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.tools import utils
from libervia.backend.tools import xml_tools
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import uri
from libervia.backend.tools.common.async_utils import async_lru

from .plugin_xep_0373 import NS_OX, get_gpg_provider


log = getLogger(__name__)

IMPORT_NAME = "OXPS"

PLUGIN_INFO = {
    C.PI_NAME: "OpenPGP for XMPP Pubsub",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0334", "XEP-0373"],
    C.PI_MAIN: "PubsubEncryption",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub e2e encryption via OpenPGP"""),
}
NS_OXPS = "urn:xmpp:openpgp:pubsub:0"

KEY_REVOKED = "revoked"
CACHE_MAX = 5


@dataclasses.dataclass
class SharedSecret:
    id: str
    key: str
    timestamp: float
    # bare JID of who has generated the secret
    origin: jid.JID
    revoked: bool = False
    shared_with: Set[jid.JID] = dataclasses.field(default_factory=set)


class PubsubEncryption:
    namespace = NS_OXPS

    def __init__(self, host):
        log.info(_("OpenPGP for XMPP Pubsub plugin initialization"))
        host.register_namespace("oxps", NS_OXPS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self._h = host.plugins["XEP-0334"]
        self._ox = host.plugins["XEP-0373"]
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.trigger.add("XEP-0060_items", self._items_trigger)
        host.trigger.add(
            "message_received",
            self._message_received_trigger,
        )
        host.bridge.add_method(
            "ps_secret_share",
            ".plugin",
            in_sign="sssass",
            out_sign="",
            method=self._ps_secret_share,
            async_=True,
        )
        host.bridge.add_method(
            "ps_secret_revoke",
            ".plugin",
            in_sign="sssass",
            out_sign="",
            method=self._ps_secret_revoke,
            async_=True,
        )
        host.bridge.add_method(
            "ps_secret_rotate",
            ".plugin",
            in_sign="ssass",
            out_sign="",
            method=self._ps_secret_rotate,
            async_=True,
        )
        host.bridge.add_method(
            "ps_secrets_list",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._ps_secrets_list,
            async_=True,
        )

    def get_handler(self, client):
        return PubsubEncryption_Handler()

    async def profile_connecting(self, client):
        client.__storage = persistent.LazyPersistentBinaryDict(
            IMPORT_NAME, client.profile
        )
        # cache to avoid useless DB access, and to avoid race condition by ensuring that
        # the same shared_secrets instance is always used for a given node.
        client.__cache = OrderedDict()
        self.gpg_provider = get_gpg_provider(self.host, client)

    async def load_secrets(
        self,
        client: SatXMPPEntity,
        node_uri: str
    ) -> Optional[Dict[str, SharedSecret]]:
        """Load shared secret from databse or cache

        A cache is used per client to avoid usueless db access, as shared secrets are
        often needed several times in a row. Cache is also necessary to avoir race
        condition, when updating a secret, by ensuring that the same instance is used
        for all updates during a session.

        @param node_uri: XMPP URI of the encrypted pubsub node
        @return shared secrets, or None if no secrets are known yet
        """
        try:
            shared_secrets = client.__cache[node_uri]
        except KeyError:
            pass
        else:
            client.__cache.move_to_end(node_uri)
            return shared_secrets

        secrets_as_dict = await client.__storage.get(node_uri)

        if secrets_as_dict is None:
            return None
        else:
            shared_secrets = {
                s["id"]: SharedSecret(
                    id=s["id"],
                    key=s["key"],
                    timestamp=s["timestamp"],
                    origin=jid.JID(s["origin"]),
                    revoked=s["revoked"],
                    shared_with={jid.JID(w) for w in s["shared_with"]}
                ) for s in secrets_as_dict
            }
            client.__cache[node_uri] = shared_secrets
            while len(client.__cache) > CACHE_MAX:
                client.__cache.popitem(False)
            return shared_secrets

    def __secrect_dict_factory(self, data: List[Tuple[str, Any]]) -> Dict[str, Any]:
        ret = {}
        for k, v in data:
            if k == "origin":
                v = v.full()
            elif k == "shared_with":
                v = [j.full() for j in v]
            ret[k] = v
        return ret

    async def store_secrets(
        self,
        client: SatXMPPEntity,
        node_uri: str,
        shared_secrets: Dict[str, SharedSecret]
    ) -> None:
        """Store shared secrets to database

        Shared secrets are serialised before being stored.
        If ``node_uri`` is not in cache, the shared_secrets instance is also put in cache/

        @param node_uri: XMPP URI of the encrypted pubsub node
        @param shared_secrets: shared secrets to store
        """
        if node_uri not in client.__cache:
            client.__cache[node_uri] = shared_secrets
            while len(client.__cache) > CACHE_MAX:
                client.__cache.popitem(False)

        secrets_as_dict = [
            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
            for s in shared_secrets.values()
        ]
        await client.__storage.aset(node_uri, secrets_as_dict)

    def generate_secret(self, client: SatXMPPEntity) -> SharedSecret:
        """Generate a new shared secret"""
        log.info("Generating a new shared secret.")
        secret_key = secrets.token_urlsafe(64)
        secret_id = shortuuid.uuid()
        return SharedSecret(
            id = secret_id,
            key = secret_key,
            timestamp = time.time(),
            origin = client.jid.userhostJID()
        )

    def _ps_secret_revoke(
        self,
        service: str,
        node: str,
        secret_id: str,
        recipients: List[str],
        profile_key: str
    ) -> defer.Deferred:
        return defer.ensureDeferred(
            self.revoke(
                self.host.get_client(profile_key),
                jid.JID(service) if service else None,
                node,
                secret_id,
                [jid.JID(r) for r in recipients] or None,
            )
        )

    async def revoke(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        secret_id: str,
        recipients: Optional[Iterable[jid.JID]] = None
    ) -> None:
        """Revoke a secret and notify entities

        @param service: pubsub/PEP service where the node is
        @param node: node name
        @param secret_id: ID of the secret to revoke (must have been generated by
            ourselves)
        recipients: JIDs of entities to send the revocation notice to. If None, all
            entities known to have the shared secret will be notified.
            Use empty list if you don't want to notify anybody (not recommended)
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if not shared_secrets:
            raise exceptions.NotFound(f"No shared secret is known for {node_uri}")
        try:
            shared_secret = shared_secrets[secret_id]
        except KeyError:
            raise exceptions.NotFound(
                f"No shared secret with ID {secret_id!r} has been found for {node_uri}"
            )
        else:
            if shared_secret.origin != client.jid.userhostJID():
                raise exceptions.PermissionError(
                    f"The shared secret {shared_secret.id} originate from "
                    f"{shared_secret.origin}, not you ({client.jid.userhostJID()}). You "
                    "can't revoke it"
                )
            shared_secret.revoked = True
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(
            f"shared secret {secret_id!r} for {node_uri} has been revoked."
        )
        if recipients is None:
            recipients = shared_secret.shared_with
        if recipients:
            for recipient in recipients:
                await self.send_revoke_notification(
                    client, service, node, shared_secret.id, recipient
                )
            log.info(
                f"shared secret {shared_secret.id} revocation notification for "
                f"{node_uri} has been send to {''.join(str(r) for r in recipients)}"
            )
        else:
            log.info(
                "Due to empty recipients list, no revocation notification has been sent "
                f"for shared secret {shared_secret.id} for {node_uri}"
            )

    async def send_revoke_notification(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        secret_id: str,
        recipient: jid.JID
    ) -> None:
        revoke_elt = domish.Element((NS_OXPS, "revoke"))
        revoke_elt["jid"] = service.full()
        revoke_elt["node"] = node
        revoke_elt["id"] = secret_id
        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
        payload_elt.addChild(revoke_elt)
        openpgp_elt = await self._ox.build_openpgp_element(
            client, signcrypt_elt, {recipient}
        )
        message_elt = domish.Element((None, "message"))
        message_elt["from"] = client.jid.full()
        message_elt["to"] = recipient.full()
        message_elt.addChild((openpgp_elt))
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)

    def _ps_secret_share(
        self,
        recipient: str,
        service: str,
        node: str,
        secret_ids: List[str],
        profile_key: str
    ) -> defer.Deferred:
        return defer.ensureDeferred(
            self.share_secrets(
                self.host.get_client(profile_key),
                jid.JID(recipient),
                jid.JID(service) if service else None,
                node,
                secret_ids or None,
            )
        )

    async def share_secret(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        shared_secret: SharedSecret,
        recipient: jid.JID
    ) -> None:
        """Create and send <shared-secret> element"""
        if service is None:
            service = client.jid.userhostJID()
        shared_secret_elt = domish.Element((NS_OXPS, "shared-secret"))
        shared_secret_elt["jid"] = service.full()
        shared_secret_elt["node"] = node
        shared_secret_elt["id"] = shared_secret.id
        shared_secret_elt["timestamp"] = utils.xmpp_date(shared_secret.timestamp)
        if shared_secret.revoked:
            shared_secret_elt["revoked"] = C.BOOL_TRUE
        # TODO: add type attribute
        shared_secret_elt.addContent(shared_secret.key)
        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
        payload_elt.addChild(shared_secret_elt)
        openpgp_elt = await self._ox.build_openpgp_element(
            client, signcrypt_elt, {recipient}
        )
        message_elt = domish.Element((None, "message"))
        message_elt["from"] = client.jid.full()
        message_elt["to"] = recipient.full()
        message_elt.addChild((openpgp_elt))
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)
        shared_secret.shared_with.add(recipient)

    async def share_secrets(
        self,
        client: SatXMPPEntity,
        recipient: jid.JID,
        service: Optional[jid.JID],
        node: str,
        secret_ids: Optional[List[str]] = None,
    ) -> None:
        """Share secrets of a pubsub node with a recipient

        @param recipient: who to share secrets with
        @param service: pubsub/PEP service where the node is
        @param node: node name
        @param secret_ids: IDs of the secrets to share, or None to share all known secrets
            (disabled or not)
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            # no secret shared yet, let's generate one
            shared_secret = self.generate_secret(client)
            shared_secrets = {shared_secret.id: shared_secret}
            await self.store_secrets(client, node_uri, shared_secrets)
        if secret_ids is None:
            # we share all secrets of the node
            to_share = shared_secrets.values()
        else:
            try:
                to_share = [shared_secrets[s_id] for s_id in secret_ids]
            except KeyError as e:
                raise exceptions.NotFound(
                    f"no shared secret found with given ID: {e}"
                )
        for shared_secret in to_share:
            await self.share_secret(client, service, node, shared_secret, recipient)
        await self.store_secrets(client, node_uri, shared_secrets)

    def _ps_secret_rotate(
        self,
        service: str,
        node: str,
        recipients: List[str],
        profile_key: str,
    ) -> defer.Deferred:
        return defer.ensureDeferred(
            self.rotate_secret(
                self.host.get_client(profile_key),
                jid.JID(service) if service else None,
                node,
                [jid.JID(r) for r in recipients] or None
            )
        )

    async def rotate_secret(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        recipients: Optional[List[jid.JID]] = None
    ) -> None:
        """Revoke all current known secrets, create and share a new one

        @param service: pubsub/PEP service where the node is
        @param node: node name
        @param recipients: who must receive the new shared secret
            if None, all recipients known to have last active shared secret will get the
            new secret
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            shared_secrets = {}
        for shared_secret in shared_secrets.values():
            if not shared_secret.revoked:
                await self.revoke(client, service, node, shared_secret.id)
                shared_secret.revoked = True

        if recipients is None:
            if shared_secrets:
                # we get recipients from latests shared secret's shared_with list,
                # regarless of deprecation (cause all keys may be deprecated)
                recipients = list(sorted(
                    shared_secrets.values(),
                    key=lambda s: s.timestamp,
                    reverse=True
                )[0].shared_with)
            else:
                recipients = []

        shared_secret = self.generate_secret(client)
        shared_secrets[shared_secret.id] = shared_secret
        # we send notification to last entities known to already have the shared secret
        for recipient in recipients:
            await self.share_secret(client, service, node, shared_secret, recipient)
        await self.store_secrets(client, node_uri, shared_secrets)

    def _ps_secrets_list(
        self,
        service: str,
        node: str,
        profile_key: str
    ) -> defer.Deferred:
        d = defer.ensureDeferred(
            self.list_shared_secrets(
                self.host.get_client(profile_key),
                jid.JID(service) if service else None,
                node,
            )
        )
        d.addCallback(lambda ret: data_format.serialise(ret))
        return d

    async def list_shared_secrets(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
    ) -> List[Dict[str, Any]]:
        """Retrieve for shared secrets of a pubsub node

        @param service: pubsub/PEP service where the node is
        @param node: node name
        @return: shared secrets data
        @raise exceptions.NotFound: no shared secret found for this node
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            raise exceptions.NotFound(f"No shared secrets found for {node_uri}")
        return [
            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
            for s in shared_secrets.values()
        ]

    async def handle_revoke_elt(
        self,
        client: SatXMPPEntity,
        sender: jid.JID,
        revoke_elt: domish.Element
    ) -> None:
        """Parse a <revoke> element and update local secrets

        @param sender: bare jid of the entity who has signed the secret
        @param revoke: <revoke/> element
        """
        try:
            service = jid.JID(revoke_elt["jid"])
            node = revoke_elt["node"]
            secret_id = revoke_elt["id"]
        except (KeyError, RuntimeError) as e:
            log.warning(
                f"ignoring invalid <revoke> element: {e}\n{revoke_elt.toXml()}"
            )
            return
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            log.warning(
                f"Can't revoke shared secret {secret_id}: no known shared secrets for "
                f"{node_uri}"
            )
            return

        if any(s.origin != sender for s in shared_secrets.values()):
            log.warning(
                f"Rejecting shared secret revocation signed by invalid entity ({sender}):"
                f"\n{revoke_elt.toXml}"
            )
            return

        try:
            shared_secret = shared_secrets[secret_id]
        except KeyError:
            log.warning(
                f"Can't revoke shared secret {secret_id}: this secret ID is unknown for "
                f"{node_uri}"
            )
            return

        shared_secret.revoked = True
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(f"Shared secret {secret_id} has been revoked for {node_uri}")

    async def handle_shared_secret_elt(
        self,
        client: SatXMPPEntity,
        sender: jid.JID,
        shared_secret_elt: domish.Element
    ) -> None:
        """Parse a <shared-secret> element and update local secrets

        @param sender: bare jid of the entity who has signed the secret
        @param shared_secret_elt: <shared-secret/> element
        """
        try:
            service = jid.JID(shared_secret_elt["jid"])
            node = shared_secret_elt["node"]
            secret_id = shared_secret_elt["id"]
            timestamp = utils.parse_xmpp_date(shared_secret_elt["timestamp"])
            # TODO: handle "type" attribute
            revoked = C.bool(shared_secret_elt.getAttribute("revoked", C.BOOL_FALSE))
        except (KeyError, RuntimeError, ValueError) as e:
            log.warning(
                f"ignoring invalid <shared-secret> element: "
                f"{e}\n{shared_secret_elt.toXml()}"
            )
            return
        key = str(shared_secret_elt)
        if not key:
            log.warning(
                "ignoring <shared-secret> element with empty key: "
                f"{shared_secret_elt.toXml()}"
            )
            return
        shared_secret = SharedSecret(
            id=secret_id, key=key, timestamp=timestamp, origin=sender, revoked=revoked
        )
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            shared_secrets = {}
            # no known shared secret yet for this node, we have to trust first user who
            # send it
        else:
            if any(s.origin != sender for s in shared_secrets.values()):
                log.warning(
                    f"Rejecting shared secret signed by invalid entity ({sender}):\n"
                    f"{shared_secret_elt.toXml}"
                )
                return

        shared_secrets[shared_secret.id] = shared_secret
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(
            f"shared secret {shared_secret.id} added for {node_uri} [{client.profile}]"
        )

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        if not items or not extra.get("encrypted"):
            return True
        node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            shared_secrets = {}
            shared_secret = None
        else:
            current_secrets = [s for s in shared_secrets.values() if not s.revoked]
            if not current_secrets:
                shared_secret = None
            elif len(current_secrets) > 1:
                log.warning(
                    f"more than one active shared secret found for node {node!r} at "
                    f"{service}, using the most recent one"
                )
                current_secrets.sort(key=lambda s: s.timestamp, reverse=True)
                shared_secret = current_secrets[0]
            else:
                shared_secret = current_secrets[0]

        if shared_secret is None:
            if any(s.origin != client.jid.userhostJID() for s in shared_secrets.values()):
                raise exceptions.PermissionError(
                    "there is no known active shared secret, and you are not the "
                    "creator of previous shared secrets, we can't encrypt items at "
                    f"{node_uri} ."
                )
            shared_secret = self.generate_secret(client)
            shared_secrets[shared_secret.id] = shared_secret
            await self.store_secrets(client, node_uri, shared_secrets)
            # TODO: notify other entities

        for item in items:
            item_elts = list(item.elements())
            if len(item_elts) != 1:
                raise ValueError(
                    f"there should be exactly one item payload: {item.toXml()}"
                )
            item_payload = item_elts[0]
            log.debug(f"encrypting item {item.getAttribute('id', '')}")
            encrypted_item = self.gpg_provider.encrypt_symmetrically(
                item_payload.toXml().encode(), shared_secret.key
            )
            item.children.clear()
            encrypted_elt = domish.Element((NS_OXPS, "encrypted"))
            encrypted_elt["key"] = shared_secret.id
            encrypted_elt.addContent(base64.b64encode(encrypted_item).decode())
            item.addChild(encrypted_elt)

        return True

    async def _items_trigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        items: List[domish.Element],
        rsm_response: rsm.RSMResponse,
        extra: Dict[str, Any],
    ) -> bool:
        if not extra.get(C.KEY_DECRYPT, True):
            return True
        if service is None:
            service = client.jid.userhostJID()
        shared_secrets = None
        for item in items:
            payload = item.firstChildElement()
            if (payload is not None
                and payload.name == "encrypted"
                and payload.uri == NS_OXPS):
                encrypted_elt = payload
                secret_id = encrypted_elt.getAttribute("key")
                if not secret_id:
                    log.warning(
                        f'"key" attribute is missing from encrypted item: {item.toXml()}'
                    )
                    continue
                if shared_secrets is None:
                    node_uri = uri.build_xmpp_uri("pubsub", path=service.full(), node=node)
                    shared_secrets = await self.load_secrets(client, node_uri)
                    if shared_secrets is None:
                        log.warning(
                            f"No known shared secret for {node_uri}, can't decrypt"
                        )
                        return True
                try:
                    shared_secret = shared_secrets[secret_id]
                except KeyError:
                    log.warning(
                        f"No key known for encrypted item {item['id']!r} (shared secret "
                        f"id: {secret_id!r})"
                    )
                    continue
                log.debug(f"decrypting item {item.getAttribute('id', '')}")
                decrypted = self.gpg_provider.decrypt_symmetrically(
                    base64.b64decode(str(encrypted_elt)),
                    shared_secret.key
                )
                decrypted_elt = xml_tools.parse(decrypted)
                item.children.clear()
                item.addChild(decrypted_elt)
                extra.setdefault("encrypted", {})[item["id"]] = {"type": NS_OXPS}
        return True

    async def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        sender = jid.JID(message_elt["from"]).userhostJID()
        # there may be an openpgp element if OXIM is not activate, in this case we have to
        # decrypt it here
        openpgp_elt = next(message_elt.elements(NS_OX, "openpgp"), None)
        if openpgp_elt is not None:
            try:
                payload_elt, __ = await self._ox.unpack_openpgp_element(
                    client,
                    openpgp_elt,
                    "signcrypt",
                    sender
                )
            except Exception as e:
                log.warning(f"Can't decrypt element: {e}\n{message_elt.toXml()}")
                return False
            message_elt.children.remove(openpgp_elt)
            for c in payload_elt.children:
                message_elt.addChild(c)

        shared_secret_elt = next(message_elt.elements(NS_OXPS, "shared-secret"), None)
        if shared_secret_elt is None:
            # no <shared-secret>, we check if there is a <revoke> element
            revoke_elt = next(message_elt.elements(NS_OXPS, "revoke"), None)
            if revoke_elt is None:
                return True
            else:
                await self.handle_revoke_elt(client, sender, revoke_elt)
        else:
            await self.handle_shared_secret_elt(client, sender, shared_secret_elt)

        return False


@implementer(iwokkel.IDisco)
class PubsubEncryption_Handler(xmlstream.XMPPHandler):

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_OXPS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []