Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_sec_pubsub_signing.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_sec_pubsub_signing.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Pubsub Items Signature +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +import time +from typing import Any, Dict, List, Optional + +from lxml import etree +import shortuuid +from twisted.internet import defer +from twisted.words.protocols.jabber import jid, xmlstream +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from wokkel import pubsub +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.tools import utils +from libervia.backend.tools.common import data_format + +from .plugin_xep_0373 import VerificationFailed + + +log = getLogger(__name__) + +IMPORT_NAME = "pubsub-signing" + +PLUGIN_INFO = { + C.PI_NAME: "Pubsub Signing", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_TYPE: C.PLUG_TYPE_XEP, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"], + C.PI_MAIN: "PubsubSigning", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _( + """Pubsub Signature can be used to strongly authenticate a pubsub item""" + ), +} +NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0" +NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0" + + +class PubsubSigning: + namespace = NS_PUBSUB_SIGNING + + def __init__(self, host): + log.info(_("Pubsub Signing plugin initialization")) + host.register_namespace("pubsub-signing", NS_PUBSUB_SIGNING) + self.host = host + self._p = host.plugins["XEP-0060"] + self._ox = host.plugins["XEP-0373"] + self._a = host.plugins["XEP-0470"] + self._a.register_attachment_handler( + "signature", NS_PUBSUB_SIGNING, self.signature_get, self.signature_set + ) + host.trigger.add("XEP-0060_publish", self._publish_trigger) + host.bridge.add_method( + "ps_signature_check", + ".plugin", + in_sign="sssss", + out_sign="s", + method=self._check, + async_=True, + ) + + def get_handler(self, client): + return PubsubSigning_Handler() + + def get_data_to_sign( + self, + item_elt: domish.Element, + to_jid: jid.JID, + timestamp: float, + signer: str, + ) -> bytes: + """Generate the wrapper element, normalize, serialize and return it""" + # we remove values which must not be in the serialised data + item_id = item_elt.attributes.pop("id") + item_publisher = item_elt.attributes.pop("publisher", None) + item_parent = item_elt.parent + + # we need to be sure that item element namespace is right + item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB + + sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data")) + to_elt = sign_data_elt.addElement("to") + to_elt["jid"] = to_jid.userhost() + time_elt = sign_data_elt.addElement("time") + time_elt["stamp"] = utils.xmpp_date(timestamp) + sign_data_elt.addElement("signer", content=signer) + sign_data_elt.addChild(item_elt) + # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is + # complete. For now serialisation/deserialisation is more secure. + # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True) + et_sign_data_elt = etree.fromstring(sign_data_elt.toXml()) + to_sign = etree.tostring( + et_sign_data_elt, + method="c14n2", + with_comments=False, + strip_text=True + ) + # the data to sign is serialised, we cna restore original values + item_elt["id"] = item_id + if item_publisher is not None: + item_elt["publisher"] = item_publisher + item_elt.parent = item_parent + return to_sign + + def _check( + self, + service: str, + node: str, + item_id: str, + signature_data_s: str, + profile_key: str, + ) -> defer.Deferred: + d = defer.ensureDeferred( + self.check( + self.host.get_client(profile_key), + jid.JID(service), + node, + item_id, + data_format.deserialise(signature_data_s) + ) + ) + d.addCallback(data_format.serialise) + return d + + async def check( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + item_id: str, + signature_data: Dict[str, Any], + ) -> Dict[str, Any]: + items, __ = await self._p.get_items( + client, service, node, item_ids=[item_id] + ) + if not items != 1: + raise exceptions.NotFound( + f"target item not found for {item_id!r} at {node!r} for {service}" + ) + item_elt = items[0] + timestamp = signature_data["timestamp"] + signers = signature_data["signers"] + if not signers: + raise ValueError("we must have at least one signer to check the signature") + if len(signers) > 1: + raise NotImplemented("multiple signers are not supported yet") + signer = jid.JID(signers[0]) + signature = base64.b64decode(signature_data["signature"]) + verification_keys = { + k for k in await self._ox.import_all_public_keys(client, signer) + if client.gpg_provider.can_sign(k) + } + signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full()) + try: + client.gpg_provider.verify_detached(signed_data, signature, verification_keys) + except VerificationFailed: + validated = False + else: + validated = True + + trusts = { + k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower() + for k in verification_keys + } + return { + "signer": signer.full(), + "validated": validated, + "trusts": trusts, + } + + def signature_get( + self, + client: SatXMPPEntity, + attachments_elt: domish.Element, + data: Dict[str, Any], + ) -> None: + try: + signature_elt = next( + attachments_elt.elements(NS_PUBSUB_SIGNING, "signature") + ) + except StopIteration: + pass + else: + time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time")) + if len(time_elts) != 1: + raise exceptions.DataError("only a single <time/> element is allowed") + try: + timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"]) + except (KeyError, exceptions.ParsingError): + raise exceptions.DataError( + "invalid time element: {signature_elt.toXml()}" + ) + + signature_data: Dict[str, Any] = { + "timestamp": timestamp, + "signers": [ + str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer") + ] + } + # FIXME: only OpenPGP signature is available for now, to be updated if and + # when more algorithms are available. + sign_elt = next( + signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), + None + ) + if sign_elt is None: + log.warning( + "no known signature profile element found, ignoring signature: " + f"{signature_elt.toXml()}" + ) + return + else: + signature_data["signature"] = str(sign_elt) + + data["signature"] = signature_data + + async def signature_set( + self, + client: SatXMPPEntity, + attachments_data: Dict[str, Any], + former_elt: Optional[domish.Element] + ) -> Optional[domish.Element]: + signature_data = attachments_data["extra"].get("signature") + if signature_data is None: + return former_elt + elif signature_data: + item_elt = signature_data.get("item_elt") + service = jid.JID(attachments_data["service"]) + if item_elt is None: + node = attachments_data["node"] + item_id = attachments_data["id"] + items, __ = await self._p.get_items( + client, service, node, item_ids=[item_id] + ) + if not items != 1: + raise exceptions.NotFound( + f"target item not found for {item_id!r} at {node!r} for {service}" + ) + item_elt = items[0] + + signer = signature_data.get("signer") or client.jid.userhost() + timestamp = time.time() + timestamp_xmpp = utils.xmpp_date(timestamp) + to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer) + + signature_elt = domish.Element( + (NS_PUBSUB_SIGNING, "signature"), + ) + time_elt = signature_elt.addElement("time") + time_elt["stamp"] = timestamp_xmpp + signature_elt.addElement("signer", content=signer) + + sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign")) + signing_keys = { + k for k in self._ox.list_secret_keys(client) + if client.gpg_provider.can_sign(k.public_key) + } + # the base64 encoded signature itself + sign_elt.addContent( + base64.b64encode( + client.gpg_provider.sign_detached(to_sign, signing_keys) + ).decode() + ) + return signature_elt + else: + return None + + async def _publish_trigger( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + items: Optional[List[domish.Element]], + options: Optional[dict], + sender: jid.JID, + extra: Dict[str, Any] + ) -> bool: + if not items or not extra.get("signed"): + return True + + for item_elt in items: + # we need an ID to find corresponding attachment node, and so to sign an item + if not item_elt.hasAttribute("id"): + item_elt["id"] = shortuuid.uuid() + await self._a.set_attachements( + client, + { + "service": service.full(), + "node": node, + "id": item_elt["id"], + "extra": { + "signature": { + "item_elt": item_elt, + "signer": sender.userhost(), + } + } + } + ) + + return True + + +@implementer(iwokkel.IDisco) +class PubsubSigning_Handler(xmlstream.XMPPHandler): + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + return [disco.DiscoFeature(NS_PUBSUB_SIGNING)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []