# HG changeset patch # User Goffi # Date 1750950153 -7200 # Node ID 930a4ea7ab6f1e262dc68667ff255c3290c8d8f3 # Parent 448d701187b8d83d3388a383f7e21b765436ffd7 plugin data policy: Data Policy implementation: This plugin implements data policy parsing and an algorithm to calculate a score based on them. rel 460 diff -r 448d701187b8 -r 930a4ea7ab6f libervia/backend/plugins/plugin_exp_data_policy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_exp_data_policy.py Thu Jun 26 17:02:33 2025 +0200 @@ -0,0 +1,742 @@ +#!/usr/bin/env python3 + +# Libervia plugin for handling data policies +# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ + +from enum import Enum, StrEnum, auto +import enum +from typing import TYPE_CHECKING, Self, cast +from typing import get_type_hints + +from pydantic import BaseModel, ConfigDict, Field, computed_field +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from wokkel import data_form + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import D_, _ +from libervia.backend.core.log import getLogger + +if TYPE_CHECKING: + from libervia.backend.core.main import LiberviaBackend + +log = getLogger(__name__) +IMPORT_NAME = "DATA-POLICY" + +PLUGIN_INFO = { + C.PI_NAME: "Data Policy", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_TYPE: C.PLUG_TYPE_EXP, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "DATA_POLICY", + C.PI_HANDLER: "no", +} + +NS_DATA_POLICY_BASE = "urn:xmpp:data-policy" +NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0" +NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:" +NS_DATA_POLICY_ID_SUFFIX = ":0" +NS_DATA_POLICY_ID_TPL = ( + f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}" +) + + +class IndividualScore(BaseModel): + score: int + description: str + + +class Score(BaseModel): + score: int + minimum: int + maximum: int + detail: list[IndividualScore] + + +class ScoredStrEnum(StrEnum): + _score_map = enum.nonmember({}) + _min_score = enum.nonmember(0) + _max_score = enum.nonmember(0) + + @classmethod + def get_score(cls, value: str) -> Score: + score, description = cls._score_map[value] + return Score( + score=score, + minimum=cls._min_score, + maximum=cls._max_score, + detail=[IndividualScore(score=score, description=description)], + ) + + def __init_subclass__(cls) -> None: + try: + score_map = cls._score_map + except AttributeError: + raise exceptions.InternalError('"_score_map" must be set.') + if not score_map: + raise 
exceptions.InternalError("ScoredEnum must set _score_map.") + + if set(score_map.keys()) != set(cls): + raise exceptions.InternalError( + "All enum members must be present in _score_map." + ) + + all_scores = [score for score, _ in score_map.values()] + cls._min_score = min(all_scores) + cls._max_score = max(all_scores) + + +class AuthMechanism(ScoredStrEnum): + NO_AUTH = auto() + PLAIN = auto() + HIDDEN = auto() + RESTRICTED = auto() + + _score_map = enum.nonmember( + { + NO_AUTH: (20, D_("No authentication is needed.")), + PLAIN: (-20, D_("Your login data are transmitted to this service.")), + HIDDEN: ( + 0, + D_("This service logs to your account, but doesn't get logging data."), + ), + RESTRICTED: ( + 15, + D_("This service logs to your account in a restricted way."), + ), + } + ) + + +class DataTransmission(ScoredStrEnum): + PLAIN = auto() + ENCRYPTED = auto() + E2E = auto() + GRE = auto() + + _score_map = enum.nonmember( + { + PLAIN: ( + -20, + D_( + "Data is transmitted without encryption. This is highly insecure and " + "risks data interception." + ), + ), + ENCRYPTED: ( + 0, + D_( + "Data is encrypted during transmission but not end-to-end. The " + "service can view the data." + ), + ), + E2E: ( + 10, + D_( + "Data is end-to-end encrypted from the service. Only the service and " + "the recipient(s) can view the data." + ), + ), + GRE: ( + 30, + D_( + "Data uses Gateway Relayed Encryption, ensuring end-to-end security, " + "only your and your recipient(s) can view the data. Highly secure." + ), + ), + } + ) + + +class AccessPolicy(ScoredStrEnum): + ADMINS = auto() + MODERATORS = auto() + ORGANIZATION_MEMBER = auto() + GOVERNMENT = auto() + ADVERTISERS = auto() + PARTNERS = auto() + NONE = auto() + + _score_map = enum.nonmember( + { + ADMINS: ( + -5, + D_( + "Service administrators can access user data for operational " + "purposes." 
+ ), + ), + MODERATORS: ( + -10, + D_("Moderators can access user data within their moderation scope."), + ), + ORGANIZATION_MEMBER: ( + -15, + D_("Any organization member can access user data."), + ), + GOVERNMENT: ( + -10, + D_( + "Government authorities can access user data under legal " + "requirements." + ), + ), + ADVERTISERS: ( + -30, + D_("Third-party advertisers can access user data for targeted ads."), + ), + PARTNERS: ( + -20, + D_("Business partners can access user data under agreements."), + ), + NONE: (20, D_("No entity other than the user can access user data.")), + } + ) + + +class DataPolicy(BaseModel): + """Represents a data policy form as defined in Data Policy XEP. + + Fields correspond to the data policy specification and may be None when not provided. + """ + + model_config = ConfigDict(use_enum_values=True) + + auth_data: AuthMechanism | None = None + data_transmission: DataTransmission | None = None + encryption_algorithm: str | None = None + data_retention: str | None = None + data_deletion: bool | None = None + encryption_at_rest: bool | None = None + tos: str | None = None + data_export: bool | None = None + access_policy: set[AccessPolicy] | None = None + full_erasure: bool | None = None + backup_frequency: str | None = None + backup_retention: str | None = None + extra_info: str | None = None + + @computed_field + @property + def score(self) -> Score: + """Calculate a score based on the filled fields. + + This score helps assess the quality of the data policy at a glance. 
+ """ + total_score = 0 + overall_min = 0 + overall_max = 0 + detail = [] + + fields_names = set(self.__class__.model_fields.keys()) + + for field_name in fields_names: + score = getattr(self, f"_{field_name}_score") + if score is not None: + total_score += score.score + overall_min += score.minimum + overall_max += score.maximum + detail.extend(score.detail) + + return Score( + score=total_score, minimum=overall_min, maximum=overall_max, detail=detail + ) + + @property + def _auth_data_score(self) -> Score | None: + if self.auth_data is None: + return None + + return AuthMechanism.get_score(self.auth_data) + + @property + def _data_transmission_score(self) -> Score | None: + if self.data_transmission is None: + return None + + return DataTransmission.get_score(self.data_transmission) + + @property + def _encryption_algorithm_score(self) -> Score | None: + ENCRYPTION_ALGORITHM_SET = (10, D_("The encryption algorithm is {}.")) + ENCRYPTION_ALGORITHM_UNSET = ( + -10, + D_("The encryption algorithm is not specified."), + ) + ALL = (ENCRYPTION_ALGORITHM_SET, ENCRYPTION_ALGORITHM_UNSET) + if not self.data_transmission or self.data_transmission not in ( + DataTransmission.E2E, + DataTransmission.ENCRYPTED, + ): + return None + + if self.encryption_algorithm: + score, desc = ENCRYPTION_ALGORITHM_SET + desc = desc.format(self.encryption_algorithm) + else: + score, desc = ENCRYPTION_ALGORITHM_UNSET + + return Score( + score=0, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _data_retention_score(self) -> Score | None: + if self.data_retention is None: + return None + + DATA_RETENTION_0 = (10, D_("The service does not store data.")) + DATA_RETENTION_INFINITE = ( + -15, + D_("Data is stored indefinitely, which may pose privacy risks."), + ) + DATA_RETENTION_UNKNOWN = (-10, D_("Data retention policy is unknown.")) + DATA_RETENTION_DEFAULT = (0, D_("Data is 
stored for {days:.02f} day(s).")) + DATA_RETENTION_INVALID = (-20, D_("Invalid data retention policy ({value!r}).")) + ALL = [ + DATA_RETENTION_0, + DATA_RETENTION_INFINITE, + DATA_RETENTION_UNKNOWN, + DATA_RETENTION_DEFAULT, + DATA_RETENTION_INVALID, + ] + + value = self.data_retention + if value == "0": + score, desc = DATA_RETENTION_0 + elif value == "infinite": + score, desc = DATA_RETENTION_INFINITE + elif value == "unknown": + score, desc = DATA_RETENTION_UNKNOWN + else: + try: + hours = int(value) + days = hours / 24 + desc = DATA_RETENTION_DEFAULT[1].format(days=days) + score = DATA_RETENTION_DEFAULT[0] + except ValueError: + score, desc = DATA_RETENTION_INVALID + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _data_deletion_score(self) -> Score | None: + if self.data_deletion is None: + return None + + DATA_DELETION_TRUE = (20, D_("Users can delete data on this service.")) + DATA_DELETION_FALSE = (-10, D_("Users cannot delete data on this service.")) + ALL = [DATA_DELETION_TRUE, DATA_DELETION_FALSE] + + score, desc = DATA_DELETION_TRUE if self.data_deletion else DATA_DELETION_FALSE + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _encryption_at_rest_score(self) -> Score | None: + if self.encryption_at_rest is None: + return None + + ENCRYPTION_AT_REST_TRUE = (10, D_("Data is encrypted at rest.")) + ENCRYPTION_AT_REST_FALSE = (-5, D_("Data is not encrypted at rest.")) + ALL = [ENCRYPTION_AT_REST_TRUE, ENCRYPTION_AT_REST_FALSE] + + score, desc = ( + ENCRYPTION_AT_REST_TRUE + if self.encryption_at_rest + else ENCRYPTION_AT_REST_FALSE + ) + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + 
detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _tos_score(self) -> Score: + TOS_SET = (5, D_("Terms of Service are linked.")) + TOS_UNSET = (-5, D_("Terms of Service are not linked.")) + ALL = [TOS_SET, TOS_UNSET] + score, desc = TOS_SET if self.tos else TOS_UNSET + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _data_export_score(self) -> Score | None: + if self.data_export is None: + return None + + DATA_EXPORT_TRUE = (15, D_("Users can export their data.")) + DATA_EXPORT_FALSE = (-10, D_("Users cannot export their data.")) + ALL = [DATA_EXPORT_TRUE, DATA_EXPORT_FALSE] + + score, desc = DATA_EXPORT_TRUE if self.data_export else DATA_EXPORT_FALSE + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _full_erasure_score(self) -> Score | None: + if self.full_erasure is None: + return None + + FULL_ERASURE_TRUE = (20, D_("Users can fully erase their account and data.")) + FULL_ERASURE_FALSE = (-20, D_("Users cannot fully erase their account and data.")) + ALL = [FULL_ERASURE_TRUE, FULL_ERASURE_FALSE] + + score, desc = FULL_ERASURE_TRUE if self.full_erasure else FULL_ERASURE_FALSE + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _backup_frequency_score(self) -> Score | None: + if self.backup_frequency is None: + return None + + BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups.")) + BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s).")) + BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}.")) + ALL = [ + BACKUP_FREQUENCY_0, + 
BACKUP_FREQUENCY_DEFAULT, + BACKUP_FREQUENCY_INVALID, + ] + + value = self.backup_frequency + if value == "0": + score, desc = BACKUP_FREQUENCY_0 + else: + try: + hours = int(value) + days = hours / 24 + desc = BACKUP_FREQUENCY_DEFAULT[1].format(days=days) + score = BACKUP_FREQUENCY_DEFAULT[0] + except ValueError: + score, desc = BACKUP_FREQUENCY_INVALID + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _backup_retention_score(self) -> Score | None: + if self.backup_retention is None: + return None + + BACKUP_RETENTION_0 = (0, D_("No backups are done.")) + BACKUP_RETENTION_INFINITE = (-10, D_("Backups are stored indefinitely.")) + BACKUP_RETENTION_UNKNOWN = (-5, D_("Backup retention policy is unknown.")) + BACKUP_RETENTION_DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s).")) + BACKUP_RETENTION_INVALID = (-20, D_("Invalid backup retention {value!r}.")) + ALL = [ + BACKUP_RETENTION_0, + BACKUP_RETENTION_INFINITE, + BACKUP_RETENTION_UNKNOWN, + BACKUP_RETENTION_DEFAULT, + BACKUP_RETENTION_INVALID, + ] + + value = self.backup_retention + if value == "0": + score, desc = BACKUP_RETENTION_0 + elif value == "infinite": + score, desc = BACKUP_RETENTION_INFINITE + elif value == "unknown": + score, desc = BACKUP_RETENTION_UNKNOWN + else: + try: + hours = int(value) + days = hours / 24 + desc = BACKUP_RETENTION_DEFAULT[1].format(days=days) + score = BACKUP_RETENTION_DEFAULT[0] + except ValueError: + score, desc = BACKUP_RETENTION_INVALID + + return Score( + score=score, + minimum=min(score for score, _ in ALL), + maximum=max(score for score, _ in ALL), + detail=[IndividualScore(score=score, description=desc)], + ) + + @property + def _access_policy_score(self) -> Score | None: + if self.access_policy is None: + return None + + total_score = 0 + overall_min = 0 + overall_max = 0 + details = [] + + for policy in 
self.access_policy: + policy_score = AccessPolicy.get_score(policy) + total_score += policy_score.score + overall_min += policy_score.minimum + overall_max += policy_score.maximum + details.extend(policy_score.detail) + + return Score( + score=total_score, minimum=overall_min, maximum=overall_max, detail=details + ) + + @property + def _extra_info_score(self) -> Score | None: + if self.extra_info is None: + return None + + return Score(score=0, minimum=0, maximum=0, detail=[]) + + @classmethod + def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None": + """Create a DataPolicy instance from a Wokkel Data Form. + + @param form: The data form to parse. + @return: Parsed DataPolicy instance or None if form type doesn't match. + """ + if not form.formNamespace or not form.formNamespace.startswith( + NS_DATA_POLICY_BASE + ): + return None + + fields = cls.model_fields.keys() + kwargs = {} + + for name in fields: + if name not in form: + continue + + value = form.get(name) + + match name: + case "access_policy": + if not value: + continue + + policy_set = set() + policies = [value] if not isinstance(value, list) else value + for policy in policies: + if policy: + policy_set.add(AccessPolicy(policy)) + if policy_set: + kwargs[name] = policy_set + + case "auth_data": + if value: + kwargs[name] = AuthMechanism(value) + + case "data_transmission": + if value: + kwargs[name] = DataTransmission(value) + + case "extra_info": + if isinstance(value, list): + kwargs[name] = "\n".join(str(line) for line in value) + else: + kwargs[name] = str(value) + + case _: + kwargs[name] = value + + return cls(**kwargs) + + def to_data_form( + self, category: str | None = None, type_: str | None = None + ) -> data_form.Form: + """Convert this model to a Wokkel Data Form. + + @return: Form with type='result' containing all non-None fields. + """ + if category is not None: + if type_ is not None: + raise exceptions.InternalError( + 'If "category" is set, "type_" must be set too.' 
+ ) + form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_) + else: + form_ns = NS_DATA_POLICY + form_fields = [] + + for name in self.__class__.model_fields.keys(): + value = getattr(self, name) + if value is None: + continue + + match name: + case "auth_data" | "data_transmission": + form_fields.append( + data_form.Field( + fieldType="list-single", + var=name, + value=value.value, + ) + ) + case ( + "data_deletion" + | "encryption_at_rest" + | "data_export" + | "full_erasure" + ): + form_fields.append( + data_form.Field( + fieldType="boolean", + var=name, + value=value, + ) + ) + case "access_policy": + field_values = [policy.value for policy in value] + form_fields.append( + data_form.Field( + fieldType="list-multi", + var=name, + values=field_values, + ) + ) + case "extra_info": + lines = value.split("\n") + form_fields.append( + data_form.Field( + fieldType="text-multi", + var=name, + values=lines, + ) + ) + case _: + form_fields.append( + data_form.Field( + fieldType="text-single", + var=name, + value=str(value), + ) + ) + + return data_form.Form( + formType="result", + formNamespace=form_ns, + fields=form_fields, + ) + + +class DataPolicies(BaseModel): + main: DataPolicy + services: dict[str, DataPolicy] = Field( + default=dict(), + description=( + "Identity to data policy map. Identity is used as key with the " + 'template "{identity}:{type}".' 
+ ), + ) + + +class DATA_POLICY: + namespace = NS_DATA_POLICY + + def __init__(self, host: "LiberviaBackend") -> None: + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + host.register_namespace("data-policy", NS_DATA_POLICY) + host.bridge.add_method( + "data_policy_get", + ".plugin", + in_sign="ss", + out_sign="s", + method=self._data_policy_get, + async_=True, + ) + + def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]: + client = self.host.get_client(profile_key) + d = defer.ensureDeferred(self.get_data_policy(client, jid.JID(target))) + d.addCallback( + lambda data_policies: ( + "" if data_policies is None else data_policies.model_dump_json() + ) + ) + d = cast(defer.Deferred[str], d) + return d + + async def get_data_policy( + self, client: SatXMPPEntity, target_jid: jid.JID + ) -> DataPolicies | None: + infos = await self.host.memory.disco.get_infos(client, target_jid) + + if NS_DATA_POLICY not in infos.extensions: + return None + + # Main data policy. + data_policy = DataPolicy.from_data_form(infos.extensions[NS_DATA_POLICY]) + if data_policy is None: + log.error(f"DataPolicy should be found at this point.") + return None + + data_policies = DataPolicies(main=data_policy) + + # Now we looks for identities data policies. + for namespace, form in infos.extensions.items(): + if namespace.startswith(NS_DATA_POLICY_ID_PREFIX) and namespace.endswith( + NS_DATA_POLICY_ID_SUFFIX + ): + identity_data = namespace[ + len(NS_DATA_POLICY_ID_PREFIX) : -len(NS_DATA_POLICY_ID_SUFFIX) + ] + try: + category, type_ = identity_data.split(":", 1) + except ValueError: + log.warning( + "Invalid namespace for identity data policy: " f"{namespace!r}" + ) + else: + id_data_policy = DataPolicy.from_data_form(form) + if id_data_policy is not None: + data_policies.services[f"{category}:{type_}"] = id_data_policy + return data_policies