view libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 4b842c1fb686
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Pubsub Items Signature
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import time
from typing import Any, Dict, List, Optional

from lxml import etree
import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import pubsub
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format

from .plugin_xep_0373 import VerificationFailed


# Module-level logger, named after this module.
log = getLogger(__name__)

# Import name of the plugin, referenced by PLUGIN_INFO below.
IMPORT_NAME = "pubsub-signing"

# Declarative plugin metadata. The meaning of each ``C.PI_*`` key is defined by the
# core constants, not in this file; presumably this dict is read by the plugin
# loader (TODO confirm). Dependencies listed here are the plugins looked up in
# ``PubsubSigning.__init__``.
PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Signing",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"],
    C.PI_MAIN: "PubsubSigning",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        """Pubsub Signature can be used to strongly authenticate a pubsub item"""
    ),
}
# XML namespace used for the <sign-data/> wrapper and the <signature/> attachment
# element (with its <time/> and <signer/> children).
NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0"
# XML namespace of the <sign/> element holding the base64-encoded OpenPGP signature.
NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0"


class PubsubSigning:
    namespace = NS_PUBSUB_SIGNING

    def __init__(self, host):
        """Wire the plugin into the backend.

        Registers the signing namespace, keeps references to the plugins this one
        relies on (XEP-0060, XEP-0373 and XEP-0470), declares the "signature"
        attachment handler, hooks the publish trigger and exposes the
        ``ps_signature_check`` bridge method.

        @param host: backend instance giving access to plugins, triggers and bridge
        """
        log.info(_("Pubsub Signing plugin initialization"))
        host.register_namespace("pubsub-signing", NS_PUBSUB_SIGNING)
        self.host = host

        # plugins we depend on (see PLUGIN_INFO dependencies)
        plugins = host.plugins
        self._p = plugins["XEP-0060"]
        self._ox = plugins["XEP-0373"]
        self._a = plugins["XEP-0470"]

        # signatures are stored as attachments of the signed item
        self._a.register_attachment_handler(
            "signature",
            NS_PUBSUB_SIGNING,
            self.signature_get,
            self.signature_set,
        )

        # sign items on publication when requested
        host.trigger.add("XEP-0060_publish", self._publish_trigger)

        bridge_options = {
            "in_sign": "sssss",
            "out_sign": "s",
            "method": self._check,
            "async_": True,
        }
        host.bridge.add_method("ps_signature_check", ".plugin", **bridge_options)

    def get_handler(self, client):
        """Return a new handler advertising the pubsub signing disco feature.

        @param client: client the handler is requested for (not used here)
        @return: a fresh ``PubsubSigning_Handler`` instance
        """
        handler = PubsubSigning_Handler()
        return handler

    def get_data_to_sign(
        self,
        item_elt: domish.Element,
        to_jid: jid.JID,
        timestamp: float,
        signer: str,
    ) -> bytes:
        """Generate the wrapper element, normalize, serialize and return it

        The item is wrapped in a ``<sign-data/>`` element together with the
        recipient, the signature time and the signer, then serialised with the
        ``c14n2`` method.

        ``item_elt`` is temporarily modified (its "id" and "publisher" attributes
        are removed and it is attached to the wrapper); those values are restored
        before returning, even if the serialisation fails. The namespace override
        applied to the item is left in place.

        @param item_elt: pubsub item to sign, it must have an "id" attribute
        @param to_jid: entity the item is published to (only the bare JID is used)
        @param timestamp: time of the signature
        @param signer: signer identifier, used as content of ``<signer/>``
        @return: canonicalised serialisation of the wrapper element
        @raise KeyError: ``item_elt`` has no "id" attribute
        """
        # we remove values which must not be in the serialised data
        item_id = item_elt.attributes.pop("id")
        item_publisher = item_elt.attributes.pop("publisher", None)
        item_parent = item_elt.parent

        try:
            # we need to be sure that item element namespace is right
            item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB

            sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data"))
            to_elt = sign_data_elt.addElement("to")
            to_elt["jid"] = to_jid.userhost()
            time_elt = sign_data_elt.addElement("time")
            time_elt["stamp"] = utils.xmpp_date(timestamp)
            sign_data_elt.addElement("signer", content=signer)
            sign_data_elt.addChild(item_elt)
            # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is
            #   complete. For now serialisation/deserialisation is more secure.
            # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True)
            et_sign_data_elt = etree.fromstring(sign_data_elt.toXml())
            return etree.tostring(
                et_sign_data_elt,
                method="c14n2",
                with_comments=False,
                strip_text=True
            )
        finally:
            # whatever happened above, we restore the original values so the
            # caller's element is never left stripped or re-parented
            item_elt["id"] = item_id
            if item_publisher is not None:
                item_elt["publisher"] = item_publisher
            item_elt.parent = item_parent

    def _check(
        self,
        service: str,
        node: str,
        item_id: str,
        signature_data_s: str,
        profile_key: str,
    ) -> defer.Deferred:
        """Bridge wrapper around [check]

        Arguments are converted from their serialised form, and the result of
        [check] is serialised before being fired by the returned Deferred.

        @param service: JID of the pubsub service, as a string
        @param node: pubsub node holding the item
        @param item_id: ID of the item whose signature must be checked
        @param signature_data_s: serialised signature data
        @param profile_key: key of the profile to use
        @return: Deferred firing with the serialised result of [check]
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service)
        signature_data = data_format.deserialise(signature_data_s)
        check_coro = self.check(client, service_jid, node, item_id, signature_data)
        result_d = defer.ensureDeferred(check_coro)
        result_d.addCallback(data_format.serialise)
        return result_d

    async def check(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        signature_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Check the signature of a pubsub item

        The item is retrieved from the pubsub service, the signed data is rebuilt
        from it, and the detached signature is verified against the public keys of
        the signer which are able to sign.

        @param client: client session to use
        @param service: pubsub service where the item is
        @param node: pubsub node where the item is
        @param item_id: ID of the signed item
        @param signature_data: signature data, with the following keys:
            - "timestamp": time of the signature
            - "signers": list of signers JIDs (exactly one is supported for now)
            - "signature": base64 encoded signature
        @return: dict with the following keys:
            - "signer": full JID of the signer
            - "validated": True if the signature has been verified
            - "trusts": map from key fingerprint to lower-cased trust level
        @raise exceptions.NotFound: the item has not been found (or more than one
            item has been returned)
        @raise ValueError: there is no signer in signature data
        @raise NotImplementedError: there is more than one signer
        """
        items, __ = await self._p.get_items(
            client, service, node, item_ids=[item_id]
        )
        # exactly one item is expected as we have requested a single ID
        if len(items) != 1:
            raise exceptions.NotFound(
                f"target item not found for {item_id!r} at {node!r} for {service}"
            )
        item_elt = items[0]
        timestamp = signature_data["timestamp"]
        signers = signature_data["signers"]
        if not signers:
            raise ValueError("we must have at least one signer to check the signature")
        if len(signers) > 1:
            raise NotImplementedError("multiple signers are not supported yet")
        signer = jid.JID(signers[0])
        signature = base64.b64decode(signature_data["signature"])
        # only keys which are able to sign are relevant for the verification
        verification_keys = {
            k for k in await self._ox.import_all_public_keys(client, signer)
            if client.gpg_provider.can_sign(k)
        }
        signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full())
        try:
            client.gpg_provider.verify_detached(signed_data, signature, verification_keys)
        except VerificationFailed:
            validated = False
        else:
            validated = True

        trusts = {
            k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower()
            for k in verification_keys
        }
        return {
            "signer": signer.full(),
            "validated": validated,
            "trusts": trusts,
        }

    def signature_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the signature attachment and store it in ``data``

        If a ``<signature/>`` element is found and holds a known signature profile
        (only OpenPGP for now), ``data["signature"]`` is set to a dict with the
        "timestamp", "signers" and "signature" keys. Otherwise ``data`` is left
        untouched.

        @param client: client session (not used here)
        @param attachments_elt: attachments element to parse
        @param data: attachments data, updated in place
        @raise exceptions.DataError: the ``<time/>`` element is missing, duplicated
            or invalid
        """
        try:
            signature_elt = next(
                attachments_elt.elements(NS_PUBSUB_SIGNING, "signature")
            )
        except StopIteration:
            # no signature is attached, there is nothing to do
            return

        time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time"))
        if len(time_elts) != 1:
            raise exceptions.DataError("only a single <time/> element is allowed")
        try:
            timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"])
        except (KeyError, exceptions.ParsingError):
            raise exceptions.DataError(
                f"invalid time element: {signature_elt.toXml()}"
            )

        signature_data: Dict[str, Any] = {
            "timestamp": timestamp,
            "signers": [
                str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer")
            ]
        }
        # FIXME: only OpenPGP signature is available for now, to be updated if and
        #   when more algorithms are available.
        sign_elt = next(
            signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"),
            None
        )
        if sign_elt is None:
            log.warning(
                "no known signature profile element found, ignoring signature: "
                f"{signature_elt.toXml()}"
            )
            return

        # the text content of <sign/> is the base64 encoded signature
        signature_data["signature"] = str(sign_elt)
        data["signature"] = signature_data

    async def signature_set(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """Build the ``<signature/>`` attachment element for an item

        @param client: client session to use
        @param attachments_data: attachments data, the following keys are used:
            - "extra": dict whose "signature" key holds the signature data
            - "service": pubsub service of the item to sign
            - "node" and "id": used to retrieve the item when it is not given in
              signature data
          Signature data may have an "item_elt" key (the item to sign) and a
          "signer" key (defaults to the bare JID of the client).
        @param former_elt: ``<signature/>`` element currently attached, if any
        @return:
            - ``former_elt`` if there is no "signature" key in extra
            - a new ``<signature/>`` element if signature data is not empty
            - None if signature data is empty
        @raise exceptions.NotFound: the item to sign has not been found (or more
            than one item has been returned)
        """
        signature_data = attachments_data["extra"].get("signature")
        if signature_data is None:
            return former_elt
        elif signature_data:
            item_elt = signature_data.get("item_elt")
            service = jid.JID(attachments_data["service"])
            if item_elt is None:
                # the item has not been provided, we have to retrieve it
                node = attachments_data["node"]
                item_id = attachments_data["id"]
                items, __ = await self._p.get_items(
                    client, service, node, item_ids=[item_id]
                )
                # exactly one item is expected as we have requested a single ID
                if len(items) != 1:
                    raise exceptions.NotFound(
                        f"target item not found for {item_id!r} at {node!r} for {service}"
                    )
                item_elt = items[0]

            signer = signature_data.get("signer") or client.jid.userhost()
            # the same timestamp is used for signed data and for <time/> element
            timestamp = time.time()
            timestamp_xmpp = utils.xmpp_date(timestamp)
            to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer)

            signature_elt = domish.Element(
                (NS_PUBSUB_SIGNING, "signature"),
            )
            time_elt = signature_elt.addElement("time")
            time_elt["stamp"] = timestamp_xmpp
            signature_elt.addElement("signer", content=signer)

            sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign"))
            # only secret keys whose public key is able to sign are used
            signing_keys = {
                k for k in self._ox.list_secret_keys(client)
                if client.gpg_provider.can_sign(k.public_key)
            }
            # the base64 encoded signature itself
            sign_elt.addContent(
                base64.b64encode(
                    client.gpg_provider.sign_detached(to_sign, signing_keys)
                ).decode()
            )
            return signature_elt
        else:
            return None

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        """Sign published items when ``extra["signed"]`` is set

        For each item, a signature attachment is set with the bare JID of
        ``sender`` as signer. Items without ID get a generated one.

        @param client: client session to use
        @param service: pubsub service where items are published
        @param node: pubsub node where items are published
        @param items: items being published
        @param options: publish options (not used here)
        @param sender: entity publishing the items
        @param extra: extra data of the publication, signing is only done if
            "signed" key is set to a true value
        @return: always True
        """
        signing_requested = items and extra.get("signed")
        if not signing_requested:
            return True

        for item_elt in items:
            # the attachment node is found from the item ID, thus an item can't be
            # signed without one
            if not item_elt.hasAttribute("id"):
                item_elt["id"] = shortuuid.uuid()

            attachments_data = {
                "service": service.full(),
                "node": node,
                "id": item_elt["id"],
                "extra": {
                    "signature": {
                        "item_elt": item_elt,
                        "signer": sender.userhost(),
                    }
                }
            }
            await self._a.set_attachements(client, attachments_data)

        return True


@implementer(iwokkel.IDisco)
class PubsubSigning_Handler(xmlstream.XMPPHandler):
    """Handler advertising the pubsub signing feature through service discovery"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the disco feature of pubsub signing

        The arguments are not used: the same feature is always returned.
        """
        signing_feature = disco.DiscoFeature(NS_PUBSUB_SIGNING)
        return [signing_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list, as there is no disco item for this plugin"""
        no_items = []
        return no_items