Mercurial > libervia-backend
view sat/plugins/plugin_sec_pubsub_signing.py @ 4005:54a6b44f173b
component AP gateway: reset stream position after getting payload:
the stream position needs to be reset, as the body may be read again to compute signature
hash.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 16 Mar 2023 12:28:52 +0100 |
parents | d105ead599b6 |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Pubsub Items Signature
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import base64
import time
from typing import Any, Dict, List, Optional

from lxml import etree
import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import pubsub
from zope.interface import implementer

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.tools import utils
from sat.tools.common import data_format

from .plugin_xep_0373 import VerificationFailed


log = getLogger(__name__)

IMPORT_NAME = "pubsub-signing"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Signing",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"],
    C.PI_MAIN: "PubsubSigning",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        """Pubsub Signature can be used to strongly authenticate a pubsub item"""
    ),
}
NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0"
NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0"


class PubsubSigning:
    """Sign pubsub items and check their signatures.

    Signatures are stored as "signature" attachments (handled through the XEP-0470
    plugin) and are produced/verified with the GPG provider of the client (keys are
    retrieved through the XEP-0373 plugin).
    """
    namespace = NS_PUBSUB_SIGNING

    def __init__(self, host):
        """Register namespace, attachment handler, publish trigger and bridge method

        @param host: backend instance, giving access to plugins, triggers and bridge
        """
        log.info(_("Pubsub Signing plugin initialization"))
        host.registerNamespace("pubsub-signing", NS_PUBSUB_SIGNING)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self._ox = host.plugins["XEP-0373"]
        self._a = host.plugins["XEP-0470"]
        self._a.register_attachment_handler(
            "signature", NS_PUBSUB_SIGNING, self.signature_get, self.signature_set
        )
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.bridge.addMethod(
            "psSignatureCheck",
            ".plugin",
            in_sign="sssss",
            out_sign="s",
            method=self._check,
            async_=True,
        )

    def getHandler(self, client):
        """Return the handler advertising the pubsub signing disco feature"""
        return PubsubSigning_Handler()

    def get_data_to_sign(
        self,
        item_elt: domish.Element,
        to_jid: jid.JID,
        timestamp: float,
        signer: str,
    ) -> bytes:
        """Generate the wrapper element, normalize, serialize and return it

        The "id" and "publisher" attributes and the parent of ``item_elt`` are
        temporarily removed/replaced while the data is serialised, and are restored
        before returning (even if the serialisation fails). The namespace of
        ``item_elt`` is set to the pubsub one and is NOT restored.

        @param item_elt: pubsub item to sign, it must have an "id" attribute
        @param to_jid: entity where the item is published (only the bare jid is used)
        @param timestamp: time of the signature
        @param signer: signer to put in the <signer/> element
        @return: canonicalised (C14N 2.0) serialisation of the <sign-data/> wrapper
        @raise KeyError: ``item_elt`` has no "id" attribute
        """
        # we remove values which must not be in the serialised data
        item_id = item_elt.attributes.pop("id")
        item_publisher = item_elt.attributes.pop("publisher", None)
        item_parent = item_elt.parent

        try:
            # we need to be sure that item element namespace is right
            item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB

            sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data"))
            to_elt = sign_data_elt.addElement("to")
            to_elt["jid"] = to_jid.userhost()
            time_elt = sign_data_elt.addElement("time")
            time_elt["stamp"] = utils.xmpp_date(timestamp)
            sign_data_elt.addElement("signer", content=signer)
            sign_data_elt.addChild(item_elt)
            # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is
            #   complete. For now serialisation/deserialisation is more secure.
            # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True)
            et_sign_data_elt = etree.fromstring(sign_data_elt.toXml())
            to_sign = etree.tostring(
                et_sign_data_elt,
                method="c14n2",
                with_comments=False,
                strip_text=True
            )
        finally:
            # whether the data has been serialised or not, we restore original values
            # so the caller's item is never left without its id/publisher/parent
            item_elt["id"] = item_id
            if item_publisher is not None:
                item_elt["publisher"] = item_publisher
            item_elt.parent = item_parent
        return to_sign

    def _check(
        self,
        service: str,
        node: str,
        item_id: str,
        signature_data_s: str,
        profile_key: str,
    ) -> defer.Deferred:
        """Bridge wrapper around [check]

        Arguments are deserialised before calling [check], and the result is
        serialised with data_format.

        @return: Deferred firing with the serialised result of [check]
        """
        d = defer.ensureDeferred(
            self.check(
                self.host.getClient(profile_key),
                jid.JID(service),
                node,
                item_id,
                data_format.deserialise(signature_data_s)
            )
        )
        d.addCallback(data_format.serialise)
        return d

    async def check(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        signature_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Check the signature of a pubsub item

        @param client: client used to retrieve the item and the keys
        @param service: pubsub service where the item is published
        @param node: pubsub node of the item
        @param item_id: ID of the signed item
        @param signature_data: signature metadata with following keys:
            - timestamp: time of the signature
            - signers: list of signers (only a single signer is supported for now)
            - signature: base64 encoded detached signature
        @return: dict with following keys:
            - signer: full jid of the signer
            - validated: True if the signature has been verified
            - trusts: map from verification key fingerprint to lower-cased trust
              value
        @raise exceptions.NotFound: the target item can't be retrieved
        @raise ValueError: no signer is specified
        @raise NotImplementedError: more than one signer is specified
        """
        items, __ = await self._p.getItems(
            client, service, node, item_ids=[item_id]
        )
        # exactly one item is expected as a single ID is requested
        if len(items) != 1:
            raise exceptions.NotFound(
                f"target item not found for {item_id!r} at {node!r} for {service}"
            )
        item_elt = items[0]
        timestamp = signature_data["timestamp"]
        signers = signature_data["signers"]
        if not signers:
            raise ValueError("we must have at least one signer to check the signature")
        if len(signers) > 1:
            raise NotImplementedError("multiple signers are not supported yet")
        signer = jid.JID(signers[0])
        signature = base64.b64decode(signature_data["signature"])
        verification_keys = {
            k for k in await self._ox.import_all_public_keys(client, signer)
            if client.gpg_provider.can_sign(k)
        }
        signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full())
        try:
            client.gpg_provider.verify_detached(signed_data, signature, verification_keys)
        except VerificationFailed:
            validated = False
        else:
            validated = True

        trusts = {
            k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower()
            for k in verification_keys
        }
        return {
            "signer": signer.full(),
            "validated": validated,
            "trusts": trusts,
        }

    def signature_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <signature/> attachment, if any, and fill ``data`` with it

        If a signature with a known profile is found, ``data["signature"]`` is set to
        a dict with "timestamp", "signers" and "signature" keys. Nothing is done if
        there is no <signature/> element, and the signature is ignored (with a
        warning) if no known signature profile element is found.

        @param client: client requesting the attachments (not used here)
        @param attachments_elt: element which may contain a <signature/> child
        @param data: attachments data to fill
        @raise exceptions.DataError: the <time/> element is missing, duplicated or
            invalid
        """
        try:
            signature_elt = next(
                attachments_elt.elements(NS_PUBSUB_SIGNING, "signature")
            )
        except StopIteration:
            pass
        else:
            time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time"))
            if len(time_elts) != 1:
                raise exceptions.DataError("only a single <time/> element is allowed")
            try:
                timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"])
            except (KeyError, exceptions.ParsingError):
                raise exceptions.DataError(
                    f"invalid time element: {signature_elt.toXml()}"
                )

            signature_data: Dict[str, Any] = {
                "timestamp": timestamp,
                "signers": [
                    str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer")
                ]
            }
            # FIXME: only OpenPGP signature is available for now, to be updated if and
            #   when more algorithms are available.
            sign_elt = next(
                signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"),
                None
            )
            if sign_elt is None:
                log.warning(
                    "no known signature profile element found, ignoring signature: "
                    f"{signature_elt.toXml()}"
                )
                return
            else:
                signature_data["signature"] = str(sign_elt)

            data["signature"] = signature_data

    async def signature_set(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """Build the <signature/> attachment element for an item

        @param client: client whose secret keys are used to sign
        @param attachments_data: attachments data, "service", "node" and "id" keys
            are used to locate the item, and ``extra["signature"]`` holds the
            signature request. It may contain:
            - item_elt: the item to sign (it is retrieved from the service if missing)
            - signer: signer to use (the bare jid of the client is used if missing)
        @param former_elt: signature element currently set, if any
        @return: - ``former_elt`` if ``extra["signature"]`` is not set (or is None)
            - a new <signature/> element if ``extra["signature"]`` is truthy
            - None for any other (falsy) value
        @raise exceptions.NotFound: the item to sign can't be retrieved
        """
        signature_data = attachments_data["extra"].get("signature")
        if signature_data is None:
            return former_elt
        elif signature_data:
            item_elt = signature_data.get("item_elt")
            service = jid.JID(attachments_data["service"])
            if item_elt is None:
                node = attachments_data["node"]
                item_id = attachments_data["id"]
                items, __ = await self._p.getItems(
                    client, service, node, item_ids=[item_id]
                )
                # exactly one item is expected as a single ID is requested
                if len(items) != 1:
                    raise exceptions.NotFound(
                        f"target item not found for {item_id!r} at {node!r} for {service}"
                    )
                item_elt = items[0]

            signer = signature_data.get("signer") or client.jid.userhost()

            # the same timestamp must be used for signed data and <time/> element
            timestamp = time.time()
            timestamp_xmpp = utils.xmpp_date(timestamp)
            to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer)

            signature_elt = domish.Element(
                (NS_PUBSUB_SIGNING, "signature"),
            )
            time_elt = signature_elt.addElement("time")
            time_elt["stamp"] = timestamp_xmpp
            signature_elt.addElement("signer", content=signer)

            sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign"))
            signing_keys = {
                k for k in self._ox.list_secret_keys(client)
                if client.gpg_provider.can_sign(k.public_key)
            }
            # the base64 encoded signature itself
            sign_elt.addContent(
                base64.b64encode(
                    client.gpg_provider.sign_detached(to_sign, signing_keys)
                ).decode()
            )
            return signature_elt
        else:
            return None

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        """Sign published items when ``extra["signed"]`` is set

        An ID is generated for items which have none, then a signature attachment is
        set for each item, with the bare jid of ``sender`` as signer.

        @return: always True
        """
        if not items or not extra.get("signed"):
            return True

        for item_elt in items:
            # we need an ID to find corresponding attachment node, and so to sign an item
            if not item_elt.hasAttribute("id"):
                item_elt["id"] = shortuuid.uuid()
            await self._a.set_attachements(
                client,
                {
                    "service": service.full(),
                    "node": node,
                    "id": item_elt["id"],
                    "extra": {
                        "signature": {
                            "item_elt": item_elt,
                            "signer": sender.userhost(),
                        }
                    }
                }
            )

        return True


@implementer(iwokkel.IDisco)
class PubsubSigning_Handler(xmlstream.XMPPHandler):
    """Handler advertising the pubsub signing namespace in service discovery"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the pubsub signing feature"""
        return [disco.DiscoFeature(NS_PUBSUB_SIGNING)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return no disco item"""
        return []