Mercurial > libervia-backend
view libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 4b842c1fb686 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Pubsub Items Signature
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import base64
import time
from typing import Any, Dict, List, Optional

from lxml import etree
import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import pubsub
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format

from .plugin_xep_0373 import VerificationFailed


log = getLogger(__name__)

IMPORT_NAME = "pubsub-signing"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Signing",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"],
    C.PI_MAIN: "PubsubSigning",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        """Pubsub Signature can be used to strongly authenticate a pubsub item"""
    ),
}
NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0"
NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0"


class PubsubSigning:
    """Sign pubsub items and check their signatures.

    Signatures are handled as a "signature" attachment registered with the XEP-0470
    plugin; the cryptographic operations are delegated to ``client.gpg_provider`` and
    to the XEP-0373 plugin.
    """

    namespace = NS_PUBSUB_SIGNING

    def __init__(self, host):
        """Register the namespace, attachment handler, trigger and bridge method.

        @param host: backend host instance giving access to plugins, triggers and
            bridge
        """
        log.info(_("Pubsub Signing plugin initialization"))
        host.register_namespace("pubsub-signing", NS_PUBSUB_SIGNING)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self._ox = host.plugins["XEP-0373"]
        self._a = host.plugins["XEP-0470"]
        self._a.register_attachment_handler(
            "signature", NS_PUBSUB_SIGNING, self.signature_get, self.signature_set
        )
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.bridge.add_method(
            "ps_signature_check",
            ".plugin",
            in_sign="sssss",
            out_sign="s",
            method=self._check,
            async_=True,
        )

    def get_handler(self, client):
        """Return the XMPP handler advertising the pubsub signing feature."""
        return PubsubSigning_Handler()

    def get_data_to_sign(
        self,
        item_elt: domish.Element,
        to_jid: jid.JID,
        timestamp: float,
        signer: str,
    ) -> bytes:
        """Generate the wrapper element, normalize, serialize and return it

        ``item_elt`` is temporarily modified (``id`` and ``publisher`` attributes
        removed, parent changed) while the data is serialised; those values are
        restored before returning, even if serialisation fails. The item namespace is
        set to the pubsub one and is not restored.

        @param item_elt: item to sign, it must have an ``id`` attribute
        @param to_jid: entity the item is addressed to, only its bare JID is used
        @param timestamp: time of the signature, converted with ``utils.xmpp_date``
        @param signer: value used as content of the ``<signer/>`` element
        @return: canonicalised (C14N 2.0) serialisation of the ``<sign-data/>``
            wrapper
        @raise KeyError: ``item_elt`` has no ``id`` attribute
        """
        # we remove values which must not be in the serialised data
        item_id = item_elt.attributes.pop("id")
        item_publisher = item_elt.attributes.pop("publisher", None)
        item_parent = item_elt.parent
        try:
            # we need to be sure that item element namespace is right
            item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB

            sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data"))
            to_elt = sign_data_elt.addElement("to")
            to_elt["jid"] = to_jid.userhost()
            time_elt = sign_data_elt.addElement("time")
            time_elt["stamp"] = utils.xmpp_date(timestamp)
            sign_data_elt.addElement("signer", content=signer)
            sign_data_elt.addChild(item_elt)
            # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is
            #   complete. For now serialisation/deserialisation is more secure.
            # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True)
            et_sign_data_elt = etree.fromstring(sign_data_elt.toXml())
            to_sign = etree.tostring(
                et_sign_data_elt,
                method="c14n2",
                with_comments=False,
                strip_text=True
            )
        finally:
            # whether the data has been serialised or not, we restore original
            # values so the caller's item is never left without its attributes
            item_elt["id"] = item_id
            if item_publisher is not None:
                item_elt["publisher"] = item_publisher
            item_elt.parent = item_parent
        return to_sign

    def _check(
        self,
        service: str,
        node: str,
        item_id: str,
        signature_data_s: str,
        profile_key: str,
    ) -> defer.Deferred:
        """Bridge method wrapping [check].

        Arguments are deserialised, and the result of [check] is serialised with
        ``data_format.serialise``.
        """
        d = defer.ensureDeferred(
            self.check(
                self.host.get_client(profile_key),
                jid.JID(service),
                node,
                item_id,
                data_format.deserialise(signature_data_s)
            )
        )
        d.addCallback(data_format.serialise)
        return d

    async def check(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        signature_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Check the signature of a pubsub item.

        @param client: profile session
        @param service: pubsub service where the item is
        @param node: pubsub node of the item
        @param item_id: ID of the signed item
        @param signature_data: signature metadata, following keys are read:
            - timestamp: time of the signature
            - signers: list of signers, exactly one is supported for now
            - signature: base64 encoded detached signature
        @return: dict with following keys:
            - signer: full JID of the signer
            - validated: True if the signature has been verified
            - trusts: map of key fingerprint to lower-cased trust level
        @raise exceptions.NotFound: the item could not be retrieved
        @raise ValueError: no signer is specified
        @raise NotImplementedError: more than one signer is specified
        """
        items, __ = await self._p.get_items(
            client, service, node, item_ids=[item_id]
        )
        # exactly one item is expected as a single ID is requested
        if len(items) != 1:
            raise exceptions.NotFound(
                f"target item not found for {item_id!r} at {node!r} for {service}"
            )
        item_elt = items[0]
        timestamp = signature_data["timestamp"]
        signers = signature_data["signers"]
        if not signers:
            raise ValueError("we must have at least one signer to check the signature")
        if len(signers) > 1:
            raise NotImplementedError("multiple signers are not supported yet")
        signer = jid.JID(signers[0])
        signature = base64.b64decode(signature_data["signature"])
        verification_keys = {
            k for k in await self._ox.import_all_public_keys(client, signer)
            if client.gpg_provider.can_sign(k)
        }
        signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full())
        try:
            client.gpg_provider.verify_detached(signed_data, signature, verification_keys)
        except VerificationFailed:
            validated = False
        else:
            validated = True

        trusts = {
            k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower()
            for k in verification_keys
        }
        return {
            "signer": signer.full(),
            "validated": validated,
            "trusts": trusts,
        }

    def signature_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the ``<signature/>`` attachment and fill ``data["signature"]``.

        Nothing is done if there is no ``<signature/>`` child, or if no known
        signature profile element is found (a warning is logged in that case).

        @param client: profile session
        @param attachments_elt: attachments element to parse
        @param data: attachments data, updated in place with a "signature" key
            holding "timestamp", "signers" and "signature" values
        @raise exceptions.DataError: there is not exactly one ``<time/>`` element, or
            its ``stamp`` attribute is missing or can't be parsed
        """
        try:
            signature_elt = next(
                attachments_elt.elements(NS_PUBSUB_SIGNING, "signature")
            )
        except StopIteration:
            # no signature attached to this item
            return

        time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time"))
        if len(time_elts) != 1:
            raise exceptions.DataError("only a single <time/> element is allowed")
        try:
            timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"])
        except (KeyError, exceptions.ParsingError) as e:
            raise exceptions.DataError(
                f"invalid time element: {signature_elt.toXml()}"
            ) from e

        signature_data: Dict[str, Any] = {
            "timestamp": timestamp,
            "signers": [
                str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer")
            ]
        }
        # FIXME: only OpenPGP signature is available for now, to be updated if and
        #   when more algorithms are available.
        sign_elt = next(
            signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"),
            None
        )
        if sign_elt is None:
            log.warning(
                "no known signature profile element found, ignoring signature: "
                f"{signature_elt.toXml()}"
            )
            return
        signature_data["signature"] = str(sign_elt)

        data["signature"] = signature_data

    async def signature_set(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """Build the ``<signature/>`` attachment element for an item.

        @param client: profile session
        @param attachments_data: attachments data; "service" and "extra" keys are
            read, as well as "node" and "id" when the item has to be retrieved.
            ``extra["signature"]`` may contain:
            - item_elt: the item to sign, it is requested from the pubsub service if
              missing
            - signer: signer to use, default to the bare JID of the client
        @param former_elt: previously set element, returned unchanged when there is
            no "signature" in extra
        @return: the new ``<signature/>`` element, ``former_elt`` if "signature" is
            not set, or None if "signature" is set to an empty value
            (NOTE(review): None presumably means that the signature is removed,
            confirm against the XEP-0470 plugin)
        @raise exceptions.NotFound: the item to sign could not be retrieved
        """
        signature_data = attachments_data["extra"].get("signature")
        if signature_data is None:
            return former_elt
        elif signature_data:
            item_elt = signature_data.get("item_elt")
            service = jid.JID(attachments_data["service"])
            if item_elt is None:
                node = attachments_data["node"]
                item_id = attachments_data["id"]
                items, __ = await self._p.get_items(
                    client, service, node, item_ids=[item_id]
                )
                # exactly one item is expected as a single ID is requested
                if len(items) != 1:
                    raise exceptions.NotFound(
                        f"target item not found for {item_id!r} at {node!r} for "
                        f"{service}"
                    )
                item_elt = items[0]

            signer = signature_data.get("signer") or client.jid.userhost()
            timestamp = time.time()
            timestamp_xmpp = utils.xmpp_date(timestamp)
            to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer)

            signature_elt = domish.Element(
                (NS_PUBSUB_SIGNING, "signature"),
            )
            time_elt = signature_elt.addElement("time")
            time_elt["stamp"] = timestamp_xmpp
            signature_elt.addElement("signer", content=signer)

            sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign"))
            signing_keys = {
                k for k in self._ox.list_secret_keys(client)
                if client.gpg_provider.can_sign(k.public_key)
            }
            # the base64 encoded signature itself
            sign_elt.addContent(
                base64.b64encode(
                    client.gpg_provider.sign_detached(to_sign, signing_keys)
                ).decode()
            )
            return signature_elt
        else:
            return None

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        """Sign published items when ``extra["signed"]`` is set.

        For each item, an ID is generated if missing, then a "signature" attachment
        is set with the bare JID of ``sender`` as signer.

        @return: always True
        """
        if not items or not extra.get("signed"):
            return True

        for item_elt in items:
            # we need an ID to find corresponding attachment node, and so to sign an item
            if not item_elt.hasAttribute("id"):
                item_elt["id"] = shortuuid.uuid()
            await self._a.set_attachements(
                client,
                {
                    "service": service.full(),
                    "node": node,
                    "id": item_elt["id"],
                    "extra": {
                        "signature": {
                            "item_elt": item_elt,
                            "signer": sender.userhost(),
                        }
                    }
                }
            )

        return True


@implementer(iwokkel.IDisco)
class PubsubSigning_Handler(xmlstream.XMPPHandler):
    """Service discovery handler for the pubsub signing namespace."""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the pubsub signing namespace as a disco feature."""
        return [disco.DiscoFeature(NS_PUBSUB_SIGNING)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list of disco items."""
        return []