Mercurial > libervia-backend
view sat/plugins/plugin_sec_pubsub_signing.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a
pre-PEP8 code, to use the same coding style as in Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move on full snake_case. Because Libervia has a huge codebase, this
ended with a ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | d105ead599b6 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for Pubsub Items Signature # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 import time from typing import Any, Dict, List, Optional from lxml import etree import shortuuid from twisted.internet import defer from twisted.words.protocols.jabber import jid, xmlstream from twisted.words.xish import domish from wokkel import disco, iwokkel from wokkel import pubsub from zope.interface import implementer from sat.core import exceptions from sat.core.constants import Const as C from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger from sat.tools import utils from sat.tools.common import data_format from .plugin_xep_0373 import VerificationFailed log = getLogger(__name__) IMPORT_NAME = "pubsub-signing" PLUGIN_INFO = { C.PI_NAME: "Pubsub Signing", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_TYPE: C.PLUG_TYPE_XEP, C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"], C.PI_MAIN: "PubsubSigning", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _( """Pubsub Signature can be used to strongly authenticate a pubsub item""" ), } NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0" NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0" class PubsubSigning: namespace = NS_PUBSUB_SIGNING def __init__(self, host): log.info(_("Pubsub Signing plugin initialization")) host.register_namespace("pubsub-signing", NS_PUBSUB_SIGNING) self.host = host self._p = host.plugins["XEP-0060"] self._ox = host.plugins["XEP-0373"] self._a = host.plugins["XEP-0470"] self._a.register_attachment_handler( "signature", NS_PUBSUB_SIGNING, self.signature_get, self.signature_set ) host.trigger.add("XEP-0060_publish", self._publish_trigger) host.bridge.add_method( "ps_signature_check", ".plugin", in_sign="sssss", out_sign="s", method=self._check, async_=True, ) def get_handler(self, client): return PubsubSigning_Handler() def get_data_to_sign( self, item_elt: domish.Element, to_jid: jid.JID, timestamp: float, signer: str, ) -> bytes: """Generate the wrapper element, normalize, serialize and return it""" # we remove values which must not be in the serialised data item_id = item_elt.attributes.pop("id") item_publisher = item_elt.attributes.pop("publisher", None) item_parent = item_elt.parent # we need to be sure that item element namespace is right item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data")) to_elt = sign_data_elt.addElement("to") to_elt["jid"] = to_jid.userhost() time_elt = sign_data_elt.addElement("time") time_elt["stamp"] = utils.xmpp_date(timestamp) sign_data_elt.addElement("signer", content=signer) sign_data_elt.addChild(item_elt) # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is # complete. For now serialisation/deserialisation is more secure. # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True) et_sign_data_elt = etree.fromstring(sign_data_elt.toXml()) to_sign = etree.tostring( et_sign_data_elt, method="c14n2", with_comments=False, strip_text=True ) # the data to sign is serialised, we cna restore original values item_elt["id"] = item_id if item_publisher is not None: item_elt["publisher"] = item_publisher item_elt.parent = item_parent return to_sign def _check( self, service: str, node: str, item_id: str, signature_data_s: str, profile_key: str, ) -> defer.Deferred: d = defer.ensureDeferred( self.check( self.host.get_client(profile_key), jid.JID(service), node, item_id, data_format.deserialise(signature_data_s) ) ) d.addCallback(data_format.serialise) return d async def check( self, client: SatXMPPEntity, service: jid.JID, node: str, item_id: str, signature_data: Dict[str, Any], ) -> Dict[str, Any]: items, __ = await self._p.get_items( client, service, node, item_ids=[item_id] ) if not items != 1: raise exceptions.NotFound( f"target item not found for {item_id!r} at {node!r} for {service}" ) item_elt = items[0] timestamp = signature_data["timestamp"] signers = signature_data["signers"] if not signers: raise ValueError("we must have at least one signer to check the signature") if len(signers) > 1: raise NotImplemented("multiple signers are not supported yet") signer = jid.JID(signers[0]) signature = base64.b64decode(signature_data["signature"]) verification_keys = { k for k in await self._ox.import_all_public_keys(client, signer) if client.gpg_provider.can_sign(k) } signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full()) try: client.gpg_provider.verify_detached(signed_data, signature, verification_keys) except VerificationFailed: validated = False else: validated = True trusts = { k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower() for k in verification_keys } return { "signer": signer.full(), "validated": validated, "trusts": trusts, } def signature_get( self, client: SatXMPPEntity, attachments_elt: domish.Element, data: Dict[str, Any], ) -> None: try: signature_elt = next( attachments_elt.elements(NS_PUBSUB_SIGNING, "signature") ) except StopIteration: pass else: time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time")) if len(time_elts) != 1: raise exceptions.DataError("only a single <time/> element is allowed") try: timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"]) except (KeyError, exceptions.ParsingError): raise exceptions.DataError( "invalid time element: {signature_elt.toXml()}" ) signature_data: Dict[str, Any] = { "timestamp": timestamp, "signers": [ str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer") ] } # FIXME: only OpenPGP signature is available for now, to be updated if and # when more algorithms are available. sign_elt = next( signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), None ) if sign_elt is None: log.warning( "no known signature profile element found, ignoring signature: " f"{signature_elt.toXml()}" ) return else: signature_data["signature"] = str(sign_elt) data["signature"] = signature_data async def signature_set( self, client: SatXMPPEntity, attachments_data: Dict[str, Any], former_elt: Optional[domish.Element] ) -> Optional[domish.Element]: signature_data = attachments_data["extra"].get("signature") if signature_data is None: return former_elt elif signature_data: item_elt = signature_data.get("item_elt") service = jid.JID(attachments_data["service"]) if item_elt is None: node = attachments_data["node"] item_id = attachments_data["id"] items, __ = await self._p.get_items( client, service, node, item_ids=[item_id] ) if not items != 1: raise exceptions.NotFound( f"target item not found for {item_id!r} at {node!r} for {service}" ) item_elt = items[0] signer = signature_data.get("signer") or client.jid.userhost() timestamp = time.time() timestamp_xmpp = utils.xmpp_date(timestamp) to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer) signature_elt = domish.Element( (NS_PUBSUB_SIGNING, "signature"), ) time_elt = signature_elt.addElement("time") time_elt["stamp"] = timestamp_xmpp signature_elt.addElement("signer", content=signer) sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign")) signing_keys = { k for k in self._ox.list_secret_keys(client) if client.gpg_provider.can_sign(k.public_key) } # the base64 encoded signature itself sign_elt.addContent( base64.b64encode( client.gpg_provider.sign_detached(to_sign, signing_keys) ).decode() ) return signature_elt else: return None async def _publish_trigger( self, client: SatXMPPEntity, service: jid.JID, node: str, items: Optional[List[domish.Element]], options: Optional[dict], sender: jid.JID, extra: Dict[str, Any] ) -> bool: if not items or not extra.get("signed"): return True for item_elt in items: # we need an ID to find corresponding attachment node, and so to sign an item if not item_elt.hasAttribute("id"): item_elt["id"] = shortuuid.uuid() await self._a.set_attachements( client, { "service": service.full(), "node": node, "id": item_elt["id"], "extra": { "signature": { "item_elt": item_elt, "signer": sender.userhost(), } } } ) return True @implementer(iwokkel.IDisco) class PubsubSigning_Handler(xmlstream.XMPPHandler): def getDiscoInfo(self, requestor, service, nodeIdentifier=""): return [disco.DiscoFeature(NS_PUBSUB_SIGNING)] def getDiscoItems(self, requestor, service, nodeIdentifier=""): return []