Mercurial > libervia-backend
view libervia/backend/plugins/plugin_sec_pubsub_signing.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if the server doesn't support
multicast, and sends to the local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatibility and use of bandwidth, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` coroutines. `host` implementation still uses
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Pubsub Items Signature
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import time
from typing import Any, Dict, List, Optional

from lxml import etree
import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import pubsub
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format

from .plugin_xep_0373 import VerificationFailed


log = getLogger(__name__)

IMPORT_NAME = "pubsub-signing"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Signing",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0373", "XEP-0470"],
    C.PI_MAIN: "PubsubSigning",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        """Pubsub Signature can be used to strongly authenticate a pubsub item"""
    ),
}
NS_PUBSUB_SIGNING = "urn:xmpp:pubsub-signing:0"
NS_PUBSUB_SIGNING_OPENPGP = "urn:xmpp:pubsub-signing:openpgp:0"


class PubsubSigning:
    """Sign pubsub items and check their signatures.

    Signatures are stored as a "signature" attachment (handled through the XEP-0470
    plugin) and are produced/verified with the GPG provider of the client, using keys
    obtained from the XEP-0373 plugin.
    """

    namespace = NS_PUBSUB_SIGNING

    def __init__(self, host):
        """Register the namespace, attachment handler, publish trigger and bridge method

        @param host: Libervia host instance giving access to plugins, triggers and bridge
        """
        log.info(_("Pubsub Signing plugin initialization"))
        host.register_namespace("pubsub-signing", NS_PUBSUB_SIGNING)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self._ox = host.plugins["XEP-0373"]
        self._a = host.plugins["XEP-0470"]
        self._a.register_attachment_handler(
            "signature", NS_PUBSUB_SIGNING, self.signature_get, self.signature_set
        )
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.bridge.add_method(
            "ps_signature_check",
            ".plugin",
            in_sign="sssss",
            out_sign="s",
            method=self._check,
            async_=True,
        )

    def get_handler(self, client):
        """Return the XMPP handler advertising the pubsub signing disco feature"""
        return PubsubSigning_Handler()

    def get_data_to_sign(
        self,
        item_elt: domish.Element,
        to_jid: jid.JID,
        timestamp: float,
        signer: str,
    ) -> bytes:
        """Generate the wrapper element, normalize, serialize and return it

        The item is temporarily stripped of its "id" and "publisher" attributes and
        attached to a <sign-data/> wrapper; those values (and the original parent) are
        restored before returning, even if serialisation fails.

        @param item_elt: item to sign; it must have an "id" attribute
        @param to_jid: entity the item is addressed to, its bare JID is used in <to/>
        @param timestamp: time of the signature, serialised with ``utils.xmpp_date``
        @param signer: identifier of the signer, used as content of <signer/>
        @return: canonicalised ("c14n2") serialisation of the <sign-data/> element
        @raise KeyError: the item has no "id" attribute
        """
        # we remove values which must not be in the serialised data
        item_id = item_elt.attributes.pop("id")
        item_publisher = item_elt.attributes.pop("publisher", None)
        item_parent = item_elt.parent
        try:
            # we need to be sure that item element namespace is right
            item_elt.uri = item_elt.defaultUri = pubsub.NS_PUBSUB

            sign_data_elt = domish.Element((NS_PUBSUB_SIGNING, "sign-data"))
            to_elt = sign_data_elt.addElement("to")
            to_elt["jid"] = to_jid.userhost()
            time_elt = sign_data_elt.addElement("time")
            time_elt["stamp"] = utils.xmpp_date(timestamp)
            sign_data_elt.addElement("signer", content=signer)
            sign_data_elt.addChild(item_elt)
            # FIXME: xml_tools.domish_elt_2_et_elt must be used once implementation is
            #   complete. For now serialisation/deserialisation is more secure.
            # et_sign_data_elt = xml_tools.domish_elt_2_et_elt(sign_data_elt, True)
            et_sign_data_elt = etree.fromstring(sign_data_elt.toXml())
            to_sign = etree.tostring(
                et_sign_data_elt,
                method="c14n2",
                with_comments=False,
                strip_text=True,
            )
        finally:
            # whatever happened, we restore original values so the caller's element is
            # not left half-modified
            item_elt["id"] = item_id
            if item_publisher is not None:
                item_elt["publisher"] = item_publisher
            item_elt.parent = item_parent
        return to_sign

    def _check(
        self,
        service: str,
        node: str,
        item_id: str,
        signature_data_s: str,
        profile_key: str,
    ) -> defer.Deferred:
        """Bridge wrapper around [check]

        String arguments are converted to their rich types, and the result of [check]
        is serialised with ``data_format.serialise``.
        """
        d = defer.ensureDeferred(
            self.check(
                self.host.get_client(profile_key),
                jid.JID(service),
                node,
                item_id,
                data_format.deserialise(signature_data_s),
            )
        )
        d.addCallback(data_format.serialise)
        return d

    async def check(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        signature_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Verify the signature of a pubsub item

        @param client: client session used to retrieve the item and the keys
        @param service: pubsub service where the item is located
        @param node: pubsub node of the item
        @param item_id: ID of the signed item
        @param signature_data: signature metadata, with following keys:
            - "timestamp": time of the signature
            - "signers": list of signers (exactly one is supported for now)
            - "signature": base64 encoded detached signature
        @return: dict with following keys:
            - "signer": full JID of the signer
            - "validated": True if the signature has been verified
            - "trusts": map of key fingerprint to lower-cased trust level
        @raise exceptions.NotFound: the target item could not be retrieved
        @raise ValueError: no signer is specified
        @raise NotImplementedError: more than one signer is specified
        """
        items, __ = await self._p.get_items(client, service, node, item_ids=[item_id])
        # exactly one item must match the requested ID
        if len(items) != 1:
            raise exceptions.NotFound(
                f"target item not found for {item_id!r} at {node!r} for {service}"
            )
        item_elt = items[0]
        timestamp = signature_data["timestamp"]
        signers = signature_data["signers"]
        if not signers:
            raise ValueError("we must have at least one signer to check the signature")
        if len(signers) > 1:
            raise NotImplementedError("multiple signers are not supported yet")
        signer = jid.JID(signers[0])
        signature = base64.b64decode(signature_data["signature"])
        verification_keys = {
            k
            for k in await self._ox.import_all_public_keys(client, signer)
            if client.gpg_provider.can_sign(k)
        }
        signed_data = self.get_data_to_sign(item_elt, service, timestamp, signer.full())
        try:
            client.gpg_provider.verify_detached(signed_data, signature, verification_keys)
        except VerificationFailed:
            validated = False
        else:
            validated = True

        trusts = {
            k.fingerprint: (await self._ox.get_trust(client, k, signer)).value.lower()
            for k in verification_keys
        }
        return {
            "signer": signer.full(),
            "validated": validated,
            "trusts": trusts,
        }

    def signature_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <signature/> attachment, if any, and fill ``data["signature"]``

        Nothing is done if there is no <signature/> element, or if no known signature
        profile (only OpenPGP for now) is found in it.

        @param client: client session (unused here)
        @param attachments_elt: attachments element which may hold a <signature/>
        @param data: attachments data, updated in place with a "signature" key holding
            "timestamp", "signers" and "signature"
        @raise exceptions.DataError: <time/> element is missing, duplicated or invalid
        """
        signature_elt = next(
            attachments_elt.elements(NS_PUBSUB_SIGNING, "signature"), None
        )
        if signature_elt is None:
            return

        time_elts = list(signature_elt.elements(NS_PUBSUB_SIGNING, "time"))
        if len(time_elts) != 1:
            raise exceptions.DataError("only a single <time/> element is allowed")
        try:
            timestamp = utils.parse_xmpp_date(time_elts[0]["stamp"])
        except (KeyError, exceptions.ParsingError) as e:
            raise exceptions.DataError(
                f"invalid time element: {signature_elt.toXml()}"
            ) from e

        signature_data: Dict[str, Any] = {
            "timestamp": timestamp,
            "signers": [
                str(s) for s in signature_elt.elements(NS_PUBSUB_SIGNING, "signer")
            ],
        }
        # FIXME: only OpenPGP signature is available for now, to be updated if and
        #   when more algorithms are available.
        sign_elt = next(
            signature_elt.elements(NS_PUBSUB_SIGNING_OPENPGP, "sign"), None
        )
        if sign_elt is None:
            log.warning(
                "no known signature profile element found, ignoring signature: "
                f"{signature_elt.toXml()}"
            )
            return
        signature_data["signature"] = str(sign_elt)

        data["signature"] = signature_data

    async def signature_set(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any],
        former_elt: Optional[domish.Element],
    ) -> Optional[domish.Element]:
        """Build the <signature/> attachment element for an item

        @param client: client session, its GPG provider and secret keys are used to sign
        @param attachments_data: attachments data; "service" is always read, "node" and
            "id" are read when the item must be retrieved. ``extra["signature"]`` may
            hold:
            - "item_elt": the item to sign (retrieved from the service if missing)
            - "signer": signer identifier (bare JID of the client if missing)
        @param former_elt: previously set <signature/> element, if any
        @return: ``former_elt`` if there is no "signature" key in extra, a new
            <signature/> element if signature data are set, None if signature data are
            present but empty
        @raise exceptions.NotFound: the item to sign could not be retrieved
        """
        signature_data = attachments_data["extra"].get("signature")
        if signature_data is None:
            return former_elt
        elif signature_data:
            item_elt = signature_data.get("item_elt")
            service = jid.JID(attachments_data["service"])
            if item_elt is None:
                node = attachments_data["node"]
                item_id = attachments_data["id"]
                items, __ = await self._p.get_items(
                    client, service, node, item_ids=[item_id]
                )
                # exactly one item must match the requested ID
                if len(items) != 1:
                    raise exceptions.NotFound(
                        f"target item not found for {item_id!r} at {node!r} for {service}"
                    )
                item_elt = items[0]

            signer = signature_data.get("signer") or client.jid.userhost()
            # the same timestamp must be used in signed data and in <time/> element
            timestamp = time.time()
            timestamp_xmpp = utils.xmpp_date(timestamp)
            to_sign = self.get_data_to_sign(item_elt, service, timestamp, signer)

            signature_elt = domish.Element(
                (NS_PUBSUB_SIGNING, "signature"),
            )
            time_elt = signature_elt.addElement("time")
            time_elt["stamp"] = timestamp_xmpp
            signature_elt.addElement("signer", content=signer)

            sign_elt = signature_elt.addElement((NS_PUBSUB_SIGNING_OPENPGP, "sign"))
            signing_keys = {
                k
                for k in self._ox.list_secret_keys(client)
                if client.gpg_provider.can_sign(k.public_key)
            }
            # the base64 encoded signature itself
            sign_elt.addContent(
                base64.b64encode(
                    client.gpg_provider.sign_detached(to_sign, signing_keys)
                ).decode()
            )
            return signature_elt
        else:
            return None

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any],
    ) -> bool:
        """Sign published items when ``extra["signed"]`` is set

        Each item gets an ID if it has none, then a "signature" attachment is set for
        it with ``sender`` bare JID as signer.

        @return: always True
        """
        if not items or not extra.get("signed"):
            return True

        for item_elt in items:
            # we need an ID to find corresponding attachment node, and so to sign an item
            if not item_elt.hasAttribute("id"):
                item_elt["id"] = shortuuid.uuid()
            await self._a.set_attachements(
                client,
                {
                    "service": service.full(),
                    "node": node,
                    "id": item_elt["id"],
                    "extra": {
                        "signature": {
                            "item_elt": item_elt,
                            "signer": sender.userhost(),
                        }
                    },
                },
            )

        return True


@implementer(iwokkel.IDisco)
class PubsubSigning_Handler(xmlstream.XMPPHandler):
    """Disco handler advertising the pubsub signing feature"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_PUBSUB_SIGNING)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []