Mercurial > libervia-backend
view sat/plugins/plugin_sec_oxps.py @ 4007:1d5a81e3c9e8
plugin XEP-0384: skip MessageReceived trigger when in a component:
OMEMO is not used in components so far, but the trigger is trying to request OMEMO PEP
nodes, which causes an error with virtual pubsub service of AP component.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 16 Mar 2023 12:31:24 +0100 |
parents | 9badc46c5481 |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Pubsub Encryption
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import dataclasses
import secrets
import time
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from collections import OrderedDict

import shortuuid
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import rsm
from zope.interface import implementer

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.memory import persistent
from sat.tools import utils
from sat.tools import xml_tools
from sat.tools.common import data_format
from sat.tools.common import uri
from sat.tools.common.async_utils import async_lru

from .plugin_xep_0373 import NS_OX, get_gpg_provider


log = getLogger(__name__)

IMPORT_NAME = "OXPS"

PLUGIN_INFO = {
    C.PI_NAME: "OpenPGP for XMPP Pubsub",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0334", "XEP-0373"],
    C.PI_MAIN: "PubsubEncryption",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub e2e encryption via OpenPGP"""),
}
NS_OXPS = "urn:xmpp:openpgp:pubsub:0"

KEY_REVOKED = "revoked"
# maximum number of nodes whose shared secrets are kept in the per-client cache
CACHE_MAX = 5


@dataclasses.dataclass
class SharedSecret:
    """Symmetric secret used to encrypt the items of one pubsub node"""
    # identifier of the secret, used as "key" attribute of <encrypted/> elements
    id: str
    # the secret itself, used as passphrase for symmetric encryption
    key: str
    # creation time of the secret (seconds since the epoch)
    timestamp: float
    # bare JID of who has generated the secret
    origin: jid.JID
    # True once the secret must not be used anymore to encrypt new items
    revoked: bool = False
    # entities this secret has been sent to
    shared_with: Set[jid.JID] = dataclasses.field(default_factory=set)


class PubsubEncryption:
    """Encrypt and decrypt pubsub items using secrets shared through OpenPGP"""
    namespace = NS_OXPS

    def __init__(self, host):
        log.info(_("OpenPGP for XMPP Pubsub plugin initialization"))
        host.registerNamespace("oxps", NS_OXPS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self._h = host.plugins["XEP-0334"]
        self._ox = host.plugins["XEP-0373"]
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.trigger.add("XEP-0060_items", self._items_trigger)
        host.trigger.add(
            "messageReceived",
            self._message_received_trigger,
        )
        host.bridge.addMethod(
            "psSecretShare",
            ".plugin",
            in_sign="sssass",
            out_sign="",
            method=self._ps_secret_share,
            async_=True,
        )
        host.bridge.addMethod(
            "psSecretRevoke",
            ".plugin",
            in_sign="sssass",
            out_sign="",
            method=self._ps_secret_revoke,
            async_=True,
        )
        host.bridge.addMethod(
            "psSecretRotate",
            ".plugin",
            in_sign="ssass",
            out_sign="",
            method=self._ps_secret_rotate,
            async_=True,
        )
        host.bridge.addMethod(
            "psSecretsList",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._ps_secrets_list,
            async_=True,
        )

    def getHandler(self, client):
        return PubsubEncryption_Handler()

    async def profileConnecting(self, client):
        """Attach the persistent storage and the secrets cache to the client"""
        client.__storage = persistent.LazyPersistentBinaryDict(
            IMPORT_NAME, client.profile
        )
        # cache to avoid useless DB access, and to avoid race condition by ensuring that
        # the same shared_secrets instance is always used for a given node.
        client.__cache = OrderedDict()
        self.gpg_provider = get_gpg_provider(self.host, client)

    async def load_secrets(
        self,
        client: SatXMPPEntity,
        node_uri: str
    ) -> Optional[Dict[str, SharedSecret]]:
        """Load shared secrets from database or cache

        A cache is used per client to avoid useless DB access, as shared secrets are
        often needed several times in a row.
        Cache is also necessary to avoid race condition, when updating a secret, by
        ensuring that the same instance is used for all updates during a session.

        @param node_uri: XMPP URI of the encrypted pubsub node
        @return: shared secrets (mapping from secret ID to secret), or None if no
            secrets are known yet
        """
        try:
            shared_secrets = client.__cache[node_uri]
        except KeyError:
            pass
        else:
            # the node has just been used, it becomes the last one to be evicted
            client.__cache.move_to_end(node_uri)
            return shared_secrets

        secrets_as_dict = await client.__storage.get(node_uri)
        if secrets_as_dict is None:
            return None

        shared_secrets = {
            s["id"]: SharedSecret(
                id=s["id"],
                key=s["key"],
                timestamp=s["timestamp"],
                origin=jid.JID(s["origin"]),
                revoked=s["revoked"],
                shared_with={jid.JID(w) for w in s["shared_with"]}
            )
            for s in secrets_as_dict
        }
        client.__cache[node_uri] = shared_secrets
        # oldest entries are removed first
        while len(client.__cache) > CACHE_MAX:
            client.__cache.popitem(False)

        return shared_secrets

    def __secrect_dict_factory(self, data: List[Tuple[str, Any]]) -> Dict[str, Any]:
        """Dict factory for ``dataclasses.asdict`` converting JIDs to strings

        @param data: (field name, value) pairs of a SharedSecret
        @return: dict where "origin" and "shared_with" JIDs are replaced by their full
            string representation
        """
        ret = {}
        for k, v in data:
            if k == "origin":
                v = v.full()
            elif k == "shared_with":
                v = [j.full() for j in v]
            ret[k] = v
        return ret

    async def store_secrets(
        self,
        client: SatXMPPEntity,
        node_uri: str,
        shared_secrets: Dict[str, SharedSecret]
    ) -> None:
        """Store shared secrets to database

        Shared secrets are serialised before being stored.
        If ``node_uri`` is not in cache, the shared_secrets instance is also put in
        cache.

        @param node_uri: XMPP URI of the encrypted pubsub node
        @param shared_secrets: shared secrets to store
        """
        if node_uri not in client.__cache:
            client.__cache[node_uri] = shared_secrets
            while len(client.__cache) > CACHE_MAX:
                client.__cache.popitem(False)

        secrets_as_dict = [
            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
            for s in shared_secrets.values()
        ]
        await client.__storage.aset(node_uri, secrets_as_dict)

    def generate_secret(self, client: SatXMPPEntity) -> SharedSecret:
        """Generate a new shared secret

        @return: a fresh, non revoked secret whose origin is the bare JID of the client
        """
        log.info("Generating a new shared secret.")
        secret_key = secrets.token_urlsafe(64)
        secret_id = shortuuid.uuid()
        return SharedSecret(
            id=secret_id,
            key=secret_key,
            timestamp=time.time(),
            origin=client.jid.userhostJID()
        )

    def _ps_secret_revoke(
        self,
        service: str,
        node: str,
        secret_id: str,
        recipients: List[str],
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method for [revoke]

        An empty ``service`` means own PEP service, an empty ``recipients`` list means
        that all entities known to have the secret are notified.
        """
        return defer.ensureDeferred(
            self.revoke(
                self.host.getClient(profile_key),
                jid.JID(service) if service else None,
                node,
                secret_id,
                [jid.JID(r) for r in recipients] or None,
            )
        )

    async def revoke(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        secret_id: str,
        recipients: Optional[Iterable[jid.JID]] = None
    ) -> None:
        """Revoke a secret and notify entities

        @param service: pubsub/PEP service where the node is
            None to use the bare JID of the client
        @param node: node name
        @param secret_id: ID of the secret to revoke (must have been generated by
            ourselves)
        @param recipients: JIDs of entities to send the revocation notice to. If None,
            all entities known to have the shared secret will be notified.
            Use empty list if you don't want to notify anybody (not recommended)
        @raise exceptions.NotFound: no secret is known for this node, or ``secret_id``
            is unknown
        @raise exceptions.PermissionError: the secret has not been generated by
            ourselves
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if not shared_secrets:
            raise exceptions.NotFound(f"No shared secret is known for {node_uri}")
        try:
            shared_secret = shared_secrets[secret_id]
        except KeyError:
            raise exceptions.NotFound(
                f"No shared secret with ID {secret_id!r} has been found for {node_uri}"
            )
        if shared_secret.origin != client.jid.userhostJID():
            raise exceptions.PermissionError(
                f"The shared secret {shared_secret.id} originate from "
                f"{shared_secret.origin}, not you ({client.jid.userhostJID()}). You "
                "can't revoke it"
            )
        shared_secret.revoked = True
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(
            f"shared secret {secret_id!r} for {node_uri} has been revoked."
        )
        if recipients is None:
            recipients = shared_secret.shared_with
        # we work on a snapshot: ``recipients`` is iterated twice (sending then
        # logging), it may be a one-shot iterable, and ``shared_with`` could be
        # modified while we are awaiting
        recipients = list(recipients)
        if recipients:
            for recipient in recipients:
                await self.send_revoke_notification(
                    client, service, node, shared_secret.id, recipient
                )
            log.info(
                f"shared secret {shared_secret.id} revocation notification for "
                f"{node_uri} has been send to {', '.join(str(r) for r in recipients)}"
            )
        else:
            log.info(
                "Due to empty recipients list, no revocation notification has been sent "
                f"for shared secret {shared_secret.id} for {node_uri}"
            )

    async def send_revoke_notification(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        secret_id: str,
        recipient: jid.JID
    ) -> None:
        """Build and send a <revoke/> element wrapped in an OpenPGP <signcrypt/>

        @param service: pubsub/PEP service where the node is
        @param node: node name
        @param secret_id: ID of the revoked secret
        @param recipient: entity to notify
        """
        revoke_elt = domish.Element((NS_OXPS, "revoke"))
        revoke_elt["jid"] = service.full()
        revoke_elt["node"] = node
        revoke_elt["id"] = secret_id
        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
        payload_elt.addChild(revoke_elt)
        openpgp_elt = await self._ox.build_openpgp_element(
            client, signcrypt_elt, {recipient}
        )
        message_elt = domish.Element((None, "message"))
        message_elt["from"] = client.jid.full()
        message_elt["to"] = recipient.full()
        message_elt.addChild((openpgp_elt))
        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)

    def _ps_secret_share(
        self,
        recipient: str,
        service: str,
        node: str,
        secret_ids: List[str],
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method for [share_secrets]

        An empty ``service`` means own PEP service, an empty ``secret_ids`` list means
        that all known secrets of the node are shared.
        """
        return defer.ensureDeferred(
            self.share_secrets(
                self.host.getClient(profile_key),
                jid.JID(recipient),
                jid.JID(service) if service else None,
                node,
                secret_ids or None,
            )
        )

    async def share_secret(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        shared_secret: SharedSecret,
        recipient: jid.JID
    ) -> None:
        """Create and send <shared-secret> element

        The recipient is added to ``shared_secret.shared_with``, but the secrets are
        not stored here: the caller has to do it.

        @param service: pubsub/PEP service where the node is
            None to use the bare JID of the client
        @param node: node name
        @param shared_secret: secret to send
        @param recipient: entity to send the secret to
        """
        if service is None:
            service = client.jid.userhostJID()
        shared_secret_elt = domish.Element((NS_OXPS, "shared-secret"))
        shared_secret_elt["jid"] = service.full()
        shared_secret_elt["node"] = node
        shared_secret_elt["id"] = shared_secret.id
        shared_secret_elt["timestamp"] = utils.xmpp_date(shared_secret.timestamp)
        if shared_secret.revoked:
            shared_secret_elt["revoked"] = C.BOOL_TRUE
        # TODO: add type attribute
        shared_secret_elt.addContent(shared_secret.key)
        signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
        payload_elt.addChild(shared_secret_elt)
        openpgp_elt = await self._ox.build_openpgp_element(
            client, signcrypt_elt, {recipient}
        )
        message_elt = domish.Element((None, "message"))
        message_elt["from"] = client.jid.full()
        message_elt["to"] = recipient.full()
        message_elt.addChild((openpgp_elt))
        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)
        shared_secret.shared_with.add(recipient)

    async def share_secrets(
        self,
        client: SatXMPPEntity,
        recipient: jid.JID,
        service: Optional[jid.JID],
        node: str,
        secret_ids: Optional[List[str]] = None,
    ) -> None:
        """Share secrets of a pubsub node with a recipient

        If no secret is known yet for the node, a new one is generated.

        @param recipient: who to share secrets with
        @param service: pubsub/PEP service where the node is
            None to use the bare JID of the client
        @param node: node name
        @param secret_ids: IDs of the secrets to share, or None to share all known
            secrets (disabled or not)
        @raise exceptions.NotFound: one of the requested secret IDs is unknown
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            # no secret shared yet, let's generate one
            shared_secret = self.generate_secret(client)
            shared_secrets = {shared_secret.id: shared_secret}
            await self.store_secrets(client, node_uri, shared_secrets)
        if secret_ids is None:
            # we share all secrets of the node
            # (snapshot, as the dict may be updated while we are awaiting)
            to_share = list(shared_secrets.values())
        else:
            try:
                to_share = [shared_secrets[s_id] for s_id in secret_ids]
            except KeyError as e:
                raise exceptions.NotFound(
                    f"no shared secret found with given ID: {e}"
                )
        for shared_secret in to_share:
            await self.share_secret(client, service, node, shared_secret, recipient)
        # shared_with has been updated by share_secret, we have to save it
        await self.store_secrets(client, node_uri, shared_secrets)

    def _ps_secret_rotate(
        self,
        service: str,
        node: str,
        recipients: List[str],
        profile_key: str,
    ) -> defer.Deferred:
        """Bridge method for [rotate_secret]

        An empty ``service`` means own PEP service, an empty ``recipients`` list means
        that recipients of the latest secret are used.
        """
        return defer.ensureDeferred(
            self.rotate_secret(
                self.host.getClient(profile_key),
                jid.JID(service) if service else None,
                node,
                [jid.JID(r) for r in recipients] or None
            )
        )

    async def rotate_secret(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        recipients: Optional[List[jid.JID]] = None
    ) -> None:
        """Revoke all current known secrets, create and share a new one

        @param service: pubsub/PEP service where the node is
            None to use the bare JID of the client
        @param node: node name
        @param recipients: who must receive the new shared secret
            if None, all recipients known to have last active shared secret will get
            the new secret
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            shared_secrets = {}
        # snapshot, as the dict may be updated while we are awaiting
        for shared_secret in list(shared_secrets.values()):
            if not shared_secret.revoked:
                await self.revoke(client, service, node, shared_secret.id)
                # revoke normally works on the same (cached) instance, but we make
                # sure that our own instance is flagged before it is stored below
                shared_secret.revoked = True

        if recipients is None:
            if shared_secrets:
                # we get recipients from latest shared secret's shared_with list,
                # regardless of deprecation (cause all keys may be deprecated)
                latest_secret = max(shared_secrets.values(), key=lambda s: s.timestamp)
                recipients = list(latest_secret.shared_with)
            else:
                recipients = []

        shared_secret = self.generate_secret(client)
        shared_secrets[shared_secret.id] = shared_secret
        # we send notification to last entities known to already have the shared secret
        for recipient in recipients:
            await self.share_secret(client, service, node, shared_secret, recipient)
        await self.store_secrets(client, node_uri, shared_secrets)

    def _ps_secrets_list(
        self,
        service: str,
        node: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method for [list_shared_secrets]

        @return: Deferred firing with the serialised list of shared secrets data
        """
        d = defer.ensureDeferred(
            self.list_shared_secrets(
                self.host.getClient(profile_key),
                jid.JID(service) if service else None,
                node,
            )
        )
        d.addCallback(data_format.serialise)
        return d

    async def list_shared_secrets(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
    ) -> List[Dict[str, Any]]:
        """Retrieve shared secrets of a pubsub node

        @param service: pubsub/PEP service where the node is
            None to use the bare JID of the client
        @param node: node name
        @return: shared secrets data
        @raise exceptions.NotFound: no shared secret found for this node
        """
        if service is None:
            service = client.jid.userhostJID()
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            raise exceptions.NotFound(f"No shared secrets found for {node_uri}")
        return [
            dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
            for s in shared_secrets.values()
        ]

    async def handle_revoke_elt(
        self,
        client: SatXMPPEntity,
        sender: jid.JID,
        revoke_elt: domish.Element
    ) -> None:
        """Parse a <revoke> element and update local secrets

        Invalid or unauthorised revocations are logged and ignored.

        @param sender: bare jid of the entity who has signed the secret
        @param revoke_elt: <revoke/> element
        """
        try:
            service = jid.JID(revoke_elt["jid"])
            node = revoke_elt["node"]
            secret_id = revoke_elt["id"]
        except (KeyError, RuntimeError, ValueError, jid.InvalidFormat) as e:
            # the element comes from a remote entity: a missing attribute or a
            # malformed JID must not make the message workflow fail
            log.warning(
                f"ignoring invalid <revoke> element: {e}\n{revoke_elt.toXml()}"
            )
            return
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            log.warning(
                f"Can't revoke shared secret {secret_id}: no known shared secrets for "
                f"{node_uri}"
            )
            return

        if any(s.origin != sender for s in shared_secrets.values()):
            log.warning(
                f"Rejecting shared secret revocation signed by invalid entity ({sender}):"
                f"\n{revoke_elt.toXml()}"
            )
            return

        try:
            shared_secret = shared_secrets[secret_id]
        except KeyError:
            log.warning(
                f"Can't revoke shared secret {secret_id}: this secret ID is unknown for "
                f"{node_uri}"
            )
            return

        shared_secret.revoked = True
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(f"Shared secret {secret_id} has been revoked for {node_uri}")

    async def handle_shared_secret_elt(
        self,
        client: SatXMPPEntity,
        sender: jid.JID,
        shared_secret_elt: domish.Element
    ) -> None:
        """Parse a <shared-secret> element and update local secrets

        Invalid or unauthorised elements are logged and ignored.

        @param sender: bare jid of the entity who has signed the secret
        @param shared_secret_elt: <shared-secret/> element
        """
        try:
            service = jid.JID(shared_secret_elt["jid"])
            node = shared_secret_elt["node"]
            secret_id = shared_secret_elt["id"]
            timestamp = utils.parse_xmpp_date(shared_secret_elt["timestamp"])
            # TODO: handle "type" attribute
            revoked = C.bool(shared_secret_elt.getAttribute("revoked", C.BOOL_FALSE))
        except (KeyError, RuntimeError, ValueError, jid.InvalidFormat) as e:
            # the element comes from a remote entity: a missing attribute or a
            # malformed JID must not make the message workflow fail
            log.warning(
                f"ignoring invalid <shared-secret> element: "
                f"{e}\n{shared_secret_elt.toXml()}"
            )
            return
        key = str(shared_secret_elt)
        if not key:
            log.warning(
                "ignoring <shared-secret> element with empty key: "
                f"{shared_secret_elt.toXml()}"
            )
            return
        shared_secret = SharedSecret(
            id=secret_id, key=key, timestamp=timestamp, origin=sender, revoked=revoked
        )
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)
        if shared_secrets is None:
            shared_secrets = {}
            # no known shared secret yet for this node, we have to trust first user who
            # send it
        else:
            if any(s.origin != sender for s in shared_secrets.values()):
                log.warning(
                    f"Rejecting shared secret signed by invalid entity ({sender}):\n"
                    f"{shared_secret_elt.toXml()}"
                )
                return

        shared_secrets[shared_secret.id] = shared_secret
        await self.store_secrets(client, node_uri, shared_secrets)
        log.info(
            f"shared secret {shared_secret.id} added for {node_uri} [{client.profile}]"
        )

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        """Encrypt items payloads in place when ``extra["encrypted"]`` is set

        The most recent non revoked secret of the node is used. If there is none, a new
        one is generated, as long as all known secrets come from ourselves.

        @return: always True (the publish workflow continues)
        @raise exceptions.PermissionError: there is no active secret, and previous ones
            have been generated by somebody else
        @raise ValueError: an item doesn't have exactly one payload
        """
        if not items or not extra.get("encrypted"):
            return True
        if service is None:
            # same default as in other methods: None means our own PEP service
            service = client.jid.userhostJID()
        node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
        shared_secrets = await self.load_secrets(client, node_uri)

        if shared_secrets is None:
            shared_secrets = {}
            shared_secret = None
        else:
            current_secrets = [s for s in shared_secrets.values() if not s.revoked]
            if not current_secrets:
                shared_secret = None
            elif len(current_secrets) > 1:
                log.warning(
                    f"more than one active shared secret found for node {node!r} at "
                    f"{service}, using the most recent one"
                )
                current_secrets.sort(key=lambda s: s.timestamp, reverse=True)
                shared_secret = current_secrets[0]
            else:
                shared_secret = current_secrets[0]

        if shared_secret is None:
            if any(s.origin != client.jid.userhostJID() for s in shared_secrets.values()):
                raise exceptions.PermissionError(
                    "there is no known active shared secret, and you are not the "
                    "creator of previous shared secrets, we can't encrypt items at "
                    f"{node_uri} ."
                )
            shared_secret = self.generate_secret(client)
            shared_secrets[shared_secret.id] = shared_secret
            await self.store_secrets(client, node_uri, shared_secrets)
            # TODO: notify other entities

        for item in items:
            item_elts = list(item.elements())
            if len(item_elts) != 1:
                raise ValueError(
                    f"there should be exactly one item payload: {item.toXml()}"
                )
            item_payload = item_elts[0]
            log.debug(f"encrypting item {item.getAttribute('id', '')}")
            encrypted_item = self.gpg_provider.encrypt_symmetrically(
                item_payload.toXml().encode(), shared_secret.key
            )
            # the clear payload is replaced by the <encrypted/> element
            item.children.clear()
            encrypted_elt = domish.Element((NS_OXPS, "encrypted"))
            encrypted_elt["key"] = shared_secret.id
            encrypted_elt.addContent(base64.b64encode(encrypted_item).decode())
            item.addChild(encrypted_elt)

        return True

    async def _items_trigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        items: List[domish.Element],
        rsm_response: rsm.RSMResponse,
        extra: Dict[str, Any],
    ) -> bool:
        """Decrypt in place the items whose payload is an <encrypted/> element

        Decryption is skipped if ``extra[C.KEY_DECRYPT]`` is explicitly false.
        For each decrypted item, ``extra["encrypted"][item_id]`` is set.

        @return: always True (the items workflow continues)
        """
        if not extra.get(C.KEY_DECRYPT, True):
            return True
        if service is None:
            service = client.jid.userhostJID()
        # secrets are loaded only if at least one item is encrypted
        shared_secrets = None
        for item in items:
            payload = item.firstChildElement()
            if (
                payload is None
                or payload.name != "encrypted"
                or payload.uri != NS_OXPS
            ):
                continue
            encrypted_elt = payload
            secret_id = encrypted_elt.getAttribute("key")
            if not secret_id:
                log.warning(
                    f'"key" attribute is missing from encrypted item: {item.toXml()}'
                )
                continue
            if shared_secrets is None:
                node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
                shared_secrets = await self.load_secrets(client, node_uri)
                if shared_secrets is None:
                    log.warning(
                        f"No known shared secret for {node_uri}, can't decrypt"
                    )
                    return True
            try:
                shared_secret = shared_secrets[secret_id]
            except KeyError:
                log.warning(
                    f"No key known for encrypted item {item.getAttribute('id')!r} "
                    f"(shared secret id: {secret_id!r})"
                )
                continue

            log.debug(f"decrypting item {item.getAttribute('id', '')}")
            decrypted = self.gpg_provider.decrypt_symmetrically(
                base64.b64decode(str(encrypted_elt)),
                shared_secret.key
            )
            decrypted_elt = xml_tools.parse(decrypted)
            # the <encrypted/> element is replaced by the clear payload
            item.children.clear()
            item.addChild(decrypted_elt)
            extra.setdefault("encrypted", {})[item["id"]] = {"type": NS_OXPS}
        return True

    async def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Handle <shared-secret/> and <revoke/> elements found in received messages

        @return: True if the message is not for this plugin (normal workflow
            continues), False if it has been handled here or can't be decrypted
        """
        sender = jid.JID(message_elt["from"]).userhostJID()
        # there may be an openpgp element if OXIM is not activate, in this case we have to
        # decrypt it here
        openpgp_elt = next(message_elt.elements(NS_OX, "openpgp"), None)
        if openpgp_elt is not None:
            try:
                payload_elt, __ = await self._ox.unpack_openpgp_element(
                    client,
                    openpgp_elt,
                    "signcrypt",
                    sender
                )
            except Exception as e:
                log.warning(f"Can't decrypt element: {e}\n{message_elt.toXml()}")
                return False
            # the <openpgp/> element is replaced by its decrypted payload
            message_elt.children.remove(openpgp_elt)
            for c in payload_elt.children:
                message_elt.addChild(c)

        shared_secret_elt = next(message_elt.elements(NS_OXPS, "shared-secret"), None)
        if shared_secret_elt is None:
            # no <shared-secret>, we check if there is a <revoke> element
            revoke_elt = next(message_elt.elements(NS_OXPS, "revoke"), None)
            if revoke_elt is None:
                return True
            else:
                await self.handle_revoke_elt(client, sender, revoke_elt)
        else:
            await self.handle_shared_secret_elt(client, sender, shared_secret_elt)

        return False


@implementer(iwokkel.IDisco)
class PubsubEncryption_Handler(xmlstream.XMPPHandler):
    """Advertise the OXPS namespace in service discovery"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_OXPS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []