Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_sec_pte.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_sec_pte.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_sec_pte.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Pubsub Targeted Encryption +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Any, Dict, List, Optional + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid, xmlstream +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from wokkel import rsm +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger + + +log = getLogger(__name__) + +IMPORT_NAME = "PTE" + +PLUGIN_INFO = { + C.PI_NAME: "Pubsub Targeted Encryption", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_TYPE: C.PLUG_TYPE_XEP, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0384"], + C.PI_MAIN: "PTE", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Encrypt some items to specific entities"""), +} +NS_PTE = "urn:xmpp:pte:0" + + +class PTE: + namespace = NS_PTE + + def __init__(self, host): + log.info(_("Pubsub Targeted Encryption plugin initialization")) + host.register_namespace("pte", NS_PTE) + self.host = host + self._o = host.plugins["XEP-0384"] + host.trigger.add("XEP-0060_publish", self._publish_trigger) + host.trigger.add("XEP-0060_items", self._items_trigger) + + def get_handler(self, client): + return PTE_Handler() + + async def _publish_trigger( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + items: Optional[List[domish.Element]], + options: Optional[dict], + sender: jid.JID, + extra: Dict[str, Any] + ) -> bool: + if not items or extra.get("encrypted_for") is None: + return True + encrypt_data = extra["encrypted_for"] + try: + targets = {jid.JID(t) for t in encrypt_data["targets"]} + except (KeyError, RuntimeError): + raise exceptions.DataError(f"Invalid encryption data: {encrypt_data}") + for item in items: + log.debug( + f"encrypting item {item.getAttribute('id', '')} for " + f"{', '.join(t.full() for t in targets)}" + ) + encryption_type = encrypt_data.get("type", self._o.NS_TWOMEMO) + if encryption_type != self._o.NS_TWOMEMO: + raise NotImplementedError("only TWOMEMO is supported for now") + await self._o.encrypt( + client, + self._o.NS_TWOMEMO, + item, + targets, + is_muc_message=False, + stanza_id=None + ) + item_elts = list(item.elements()) + if len(item_elts) != 1: + raise ValueError( + f"there should be exactly one item payload: {item.toXml()}" + ) + encrypted_payload = item_elts[0] + item.children.clear() + encrypted_elt = item.addElement((NS_PTE, "encrypted")) + encrypted_elt["by"] = sender.userhost() + encrypted_elt["type"] = encryption_type + encrypted_elt.addChild(encrypted_payload) + + return True + + async def _items_trigger( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + node: str, + items: List[domish.Element], + rsm_response: rsm.RSMResponse, + extra: Dict[str, Any], + ) -> bool: + if not extra.get(C.KEY_DECRYPT, True): + return True + if service is None: + service = client.jid.userhostJID() + for item in items: + payload = item.firstChildElement() + if (payload is not None + and payload.name == "encrypted" + and payload.uri == NS_PTE): + encrypted_elt = payload + item.children.clear() + try: + encryption_type = encrypted_elt.getAttribute("type") + encrypted_by = jid.JID(encrypted_elt["by"]) + except (KeyError, RuntimeError): + raise exceptions.DataError( + f"invalid <encrypted> element: {encrypted_elt.toXml()}" + ) + if encryption_type!= self._o.NS_TWOMEMO: + raise NotImplementedError("only TWOMEMO is supported for now") + log.debug(f"decrypting item {item.getAttribute('id', '')}") + + # FIXME: we do use _message_received_trigger now to decrypt the stanza, a + # cleaner separated decrypt method should be used + encrypted_elt["from"] = encrypted_by.full() + if not await self._o._message_received_trigger( + client, + encrypted_elt, + defer.Deferred() + ) or not encrypted_elt.children: + raise exceptions.EncryptionError("can't decrypt the message") + + item.addChild(encrypted_elt.firstChildElement()) + + extra.setdefault("encrypted", {})[item["id"]] = { + "type": NS_PTE, + "algorithm": encryption_type + } + return True + + +@implementer(iwokkel.IDisco) +class PTE_Handler(xmlstream.XMPPHandler): + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + return [disco.DiscoFeature(NS_PTE)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []