diff libervia/backend/plugins/plugin_sec_pte.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_sec_pte.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_sec_pte.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Pubsub Targeted Encryption
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Any, Dict, List, Optional
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from wokkel import rsm
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "PTE"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Pubsub Targeted Encryption",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0384"],
+    C.PI_MAIN: "PTE",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Encrypt some items to specific entities"""),
+}
+NS_PTE = "urn:xmpp:pte:0"
+
+
+class PTE:
+    namespace = NS_PTE
+
+    def __init__(self, host):
+        log.info(_("Pubsub Targeted Encryption plugin initialization"))
+        host.register_namespace("pte", NS_PTE)
+        self.host = host
+        self._o = host.plugins["XEP-0384"]
+        host.trigger.add("XEP-0060_publish", self._publish_trigger)
+        host.trigger.add("XEP-0060_items", self._items_trigger)
+
+    def get_handler(self, client):
+        return PTE_Handler()
+
+    async def _publish_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        items: Optional[List[domish.Element]],
+        options: Optional[dict],
+        sender: jid.JID,
+        extra: Dict[str, Any]
+    ) -> bool:
+        if not items or extra.get("encrypted_for") is None:
+            return True
+        encrypt_data = extra["encrypted_for"]
+        try:
+            targets = {jid.JID(t) for t in encrypt_data["targets"]}
+        except (KeyError, RuntimeError):
+            raise exceptions.DataError(f"Invalid encryption data: {encrypt_data}")
+        for item in items:
+            log.debug(
+                f"encrypting item {item.getAttribute('id', '')} for "
+                f"{', '.join(t.full() for t in targets)}"
+            )
+            encryption_type = encrypt_data.get("type", self._o.NS_TWOMEMO)
+            if encryption_type != self._o.NS_TWOMEMO:
+                raise NotImplementedError("only TWOMEMO is supported for now")
+            await self._o.encrypt(
+                client,
+                self._o.NS_TWOMEMO,
+                item,
+                targets,
+                is_muc_message=False,
+                stanza_id=None
+            )
+            item_elts = list(item.elements())
+            if len(item_elts) != 1:
+                raise ValueError(
+                    f"there should be exactly one item payload: {item.toXml()}"
+                )
+            encrypted_payload = item_elts[0]
+            item.children.clear()
+            encrypted_elt = item.addElement((NS_PTE, "encrypted"))
+            encrypted_elt["by"] = sender.userhost()
+            encrypted_elt["type"] = encryption_type
+            encrypted_elt.addChild(encrypted_payload)
+
+        return True
+
+    async def _items_trigger(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        items: List[domish.Element],
+        rsm_response: rsm.RSMResponse,
+        extra: Dict[str, Any],
+    ) -> bool:
+        if not extra.get(C.KEY_DECRYPT, True):
+            return True
+        if service is None:
+            service = client.jid.userhostJID()
+        for item in items:
+            payload = item.firstChildElement()
+            if (payload is not None
+                and payload.name == "encrypted"
+                and payload.uri == NS_PTE):
+                encrypted_elt = payload
+                item.children.clear()
+                try:
+                    encryption_type = encrypted_elt.getAttribute("type")
+                    encrypted_by = jid.JID(encrypted_elt["by"])
+                except (KeyError, RuntimeError):
+                    raise exceptions.DataError(
+                        f"invalid <encrypted> element: {encrypted_elt.toXml()}"
+                    )
+                if encryption_type!= self._o.NS_TWOMEMO:
+                    raise NotImplementedError("only TWOMEMO is supported for now")
+                log.debug(f"decrypting item {item.getAttribute('id', '')}")
+
+                # FIXME: we do use _message_received_trigger now to decrypt the stanza, a
+                #   cleaner separated decrypt method should be used
+                encrypted_elt["from"] = encrypted_by.full()
+                if not await self._o._message_received_trigger(
+                    client,
+                    encrypted_elt,
+                    defer.Deferred()
+                ) or not encrypted_elt.children:
+                    raise exceptions.EncryptionError("can't decrypt the message")
+
+                item.addChild(encrypted_elt.firstChildElement())
+
+                extra.setdefault("encrypted", {})[item["id"]] = {
+                    "type": NS_PTE,
+                    "algorithm": encryption_type
+                }
+        return True
+
+
+@implementer(iwokkel.IDisco)
+class PTE_Handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_PTE)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []