Mercurial > libervia-backend
view libervia/backend/plugins/plugin_exp_data_policy.py @ 4378:930a4ea7ab6f
plugin data policy: Data Policy implementation:
This plugin implements data policy parsing and an algorithm to calculate a score based on
the parsed policies.
rel 460
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Jun 2025 17:02:33 +0200 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for handling stateless file sharing encryption # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from enum import Enum, StrEnum, auto import enum from typing import TYPE_CHECKING, Self, cast from typing import get_type_hints from pydantic import BaseModel, ConfigDict, Field, computed_field from twisted.internet import defer from twisted.words.protocols.jabber import jid from wokkel import data_form from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger if TYPE_CHECKING: from libervia.backend.core.main import LiberviaBackend log = getLogger(__name__) IMPORT_NAME = "DATA-POLICY" PLUGIN_INFO = { C.PI_NAME: "Data Policy", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_TYPE: C.PLUG_TYPE_EXP, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [], C.PI_MAIN: "DATA_POLICY", C.PI_HANDLER: "no", } NS_DATA_POLICY_BASE = "urn:xmpp:data-policy" NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0" NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:" NS_DATA_POLICY_ID_SUFFIX = ":0" NS_DATA_POLICY_ID_TPL = ( f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}" ) class IndividualScore(BaseModel): 
score: int description: str class Score(BaseModel): score: int minimum: int maximum: int detail: list[IndividualScore] class ScoredStrEnum(StrEnum): _score_map = enum.nonmember({}) _min_score = enum.nonmember(0) _max_score = enum.nonmember(0) @classmethod def get_score(cls, value: str) -> Score: score, description = cls._score_map[value] return Score( score=score, minimum=cls._min_score, maximum=cls._max_score, detail=[IndividualScore(score=score, description=description)], ) def __init_subclass__(cls) -> None: try: score_map = cls._score_map except AttributeError: raise exceptions.InternalError('"_score_map" must be set.') if not score_map: raise exceptions.InternalError("ScoredEnum must set _score_map.") if set(score_map.keys()) != set(cls): raise exceptions.InternalError( "All enum members must be present in _score_map." ) all_scores = [score for score, _ in score_map.values()] cls._min_score = min(all_scores) cls._max_score = max(all_scores) class AuthMechanism(ScoredStrEnum): NO_AUTH = auto() PLAIN = auto() HIDDEN = auto() RESTRICTED = auto() _score_map = enum.nonmember( { NO_AUTH: (20, D_("No authentication is needed.")), PLAIN: (-20, D_("Your login data are transmitted to this service.")), HIDDEN: ( 0, D_("This service logs to your account, but doesn't get logging data."), ), RESTRICTED: ( 15, D_("This service logs to your account in a restricted way."), ), } ) class DataTransmission(ScoredStrEnum): PLAIN = auto() ENCRYPTED = auto() E2E = auto() GRE = auto() _score_map = enum.nonmember( { PLAIN: ( -20, D_( "Data is transmitted without encryption. This is highly insecure and " "risks data interception." ), ), ENCRYPTED: ( 0, D_( "Data is encrypted during transmission but not end-to-end. The " "service can view the data." ), ), E2E: ( 10, D_( "Data is end-to-end encrypted from the service. Only the service and " "the recipient(s) can view the data." 
), ), GRE: ( 30, D_( "Data uses Gateway Relayed Encryption, ensuring end-to-end security, " "only your and your recipient(s) can view the data. Highly secure." ), ), } ) class AccessPolicy(ScoredStrEnum): ADMINS = auto() MODERATORS = auto() ORGANIZATION_MEMBER = auto() GOVERNMENT = auto() ADVERTISERS = auto() PARTNERS = auto() NONE = auto() _score_map = enum.nonmember( { ADMINS: ( -5, D_( "Service administrators can access user data for operational " "purposes." ), ), MODERATORS: ( -10, D_("Moderators can access user data within their moderation scope."), ), ORGANIZATION_MEMBER: ( -15, D_("Any organization member can access user data."), ), GOVERNMENT: ( -10, D_( "Government authorities can access user data under legal " "requirements." ), ), ADVERTISERS: ( -30, D_("Third-party advertisers can access user data for targeted ads."), ), PARTNERS: ( -20, D_("Business partners can access user data under agreements."), ), NONE: (20, D_("No entity other than the user can access user data.")), } ) class DataPolicy(BaseModel): """Represents a data policy form as defined in Data Policy XEP. Fields correspond to the data policy specification and may be None when not provided. """ model_config = ConfigDict(use_enum_values=True) auth_data: AuthMechanism | None = None data_transmission: DataTransmission | None = None encryption_algorithm: str | None = None data_retention: str | None = None data_deletion: bool | None = None encryption_at_rest: bool | None = None tos: str | None = None data_export: bool | None = None access_policy: set[AccessPolicy] | None = None full_erasure: bool | None = None backup_frequency: str | None = None backup_retention: str | None = None extra_info: str | None = None @computed_field @property def score(self) -> Score: """Calculate a score based on the filled fields. This score helps assess the quality of the data policy at a glance. 
""" total_score = 0 overall_min = 0 overall_max = 0 detail = [] fields_names = set(self.__class__.model_fields.keys()) for field_name in fields_names: score = getattr(self, f"_{field_name}_score") if score is not None: total_score += score.score overall_min += score.minimum overall_max += score.maximum detail.extend(score.detail) return Score( score=total_score, minimum=overall_min, maximum=overall_max, detail=detail ) @property def _auth_data_score(self) -> Score | None: if self.auth_data is None: return None return AuthMechanism.get_score(self.auth_data) @property def _data_transmission_score(self) -> Score | None: if self.data_transmission is None: return None return DataTransmission.get_score(self.data_transmission) @property def _encryption_algorithm_score(self) -> Score | None: ENCRYPTION_ALGORITHM_SET = (10, D_("The encryption algorithm is {}.")) ENCRYPTION_ALGORITHM_UNSET = ( -10, D_("The encryption algorithm is not specified."), ) ALL = (ENCRYPTION_ALGORITHM_SET, ENCRYPTION_ALGORITHM_UNSET) if not self.data_transmission or self.data_transmission not in ( DataTransmission.E2E, DataTransmission.ENCRYPTED, ): return None if self.encryption_algorithm: score, desc = ENCRYPTION_ALGORITHM_SET desc = desc.format(self.encryption_algorithm) else: score, desc = ENCRYPTION_ALGORITHM_UNSET return Score( score=0, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _data_retention_score(self) -> Score | None: if self.data_retention is None: return None DATA_RETENTION_0 = (10, D_("The service does not store data.")) DATA_RETENTION_INFINITE = ( -15, D_("Data is stored indefinitely, which may pose privacy risks."), ) DATA_RETENTION_UNKNOWN = (-10, D_("Data retention policy is unknown.")) DATA_RETENTION_DEFAULT = (0, D_("Data is stored for {days:.02f} day(s).")) DATA_RETENTION_INVALID = (-20, D_("Invalid data retention policy ({value!r}).")) ALL = [ DATA_RETENTION_0, 
DATA_RETENTION_INFINITE, DATA_RETENTION_UNKNOWN, DATA_RETENTION_DEFAULT, DATA_RETENTION_INVALID, ] value = self.data_retention if value == "0": score, desc = DATA_RETENTION_0 elif value == "infinite": score, desc = DATA_RETENTION_INFINITE elif value == "unknown": score, desc = DATA_RETENTION_UNKNOWN else: try: hours = int(value) days = hours / 24 desc = DATA_RETENTION_DEFAULT[1].format(days=days) score = DATA_RETENTION_DEFAULT[0] except ValueError: score, desc = DATA_RETENTION_INVALID return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _data_deletion_score(self) -> Score | None: if self.data_deletion is None: return None DATA_DELETION_TRUE = (20, D_("Users can delete data on this service.")) DATA_DELETION_FALSE = (-10, D_("Users cannot delete data on this service.")) ALL = [DATA_DELETION_TRUE, DATA_DELETION_FALSE] score, desc = DATA_DELETION_TRUE if self.data_deletion else DATA_DELETION_FALSE return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _encryption_at_rest_score(self) -> Score | None: if self.encryption_at_rest is None: return None ENCRYPTION_AT_REST_TRUE = (10, D_("Data is encrypted at rest.")) ENCRYPTION_AT_REST_FALSE = (-5, D_("Data is not encrypted at rest.")) ALL = [ENCRYPTION_AT_REST_TRUE, ENCRYPTION_AT_REST_FALSE] score, desc = ( ENCRYPTION_AT_REST_TRUE if self.encryption_at_rest else ENCRYPTION_AT_REST_FALSE ) return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _tos_score(self) -> Score: TOS_SET = (5, D_("Terms of Service are linked.")) TOS_UNSET = (-5, D_("Terms of Service are not linked.")) ALL = [TOS_SET, TOS_UNSET] score, desc = TOS_SET if self.tos else TOS_UNSET return 
Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _data_export_score(self) -> Score | None: if self.data_export is None: return None DATA_EXPORT_TRUE = (15, D_("Users can export their data.")) DATA_EXPORT_FALSE = (-10, D_("Users cannot export their data.")) ALL = [DATA_EXPORT_TRUE, DATA_EXPORT_FALSE] score, desc = DATA_EXPORT_TRUE if self.data_export else DATA_EXPORT_FALSE return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _full_erasure_score(self) -> Score | None: if self.full_erasure is None: return None FULL_ERASURE_TRUE = (20, D_("Users can fully erase their account and data.")) FULL_ERASURE_FALSE = (-20, D_("Users cannot fully erase their account and data.")) ALL = [FULL_ERASURE_TRUE, FULL_ERASURE_FALSE] score, desc = FULL_ERASURE_TRUE if self.full_erasure else FULL_ERASURE_FALSE return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _backup_frequency_score(self) -> Score | None: if self.backup_frequency is None: return None BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups.")) BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s).")) BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}.")) ALL = [ BACKUP_FREQUENCY_0, BACKUP_FREQUENCY_DEFAULT, BACKUP_FREQUENCY_INVALID, ] value = self.backup_frequency if value == "0": score, desc = BACKUP_FREQUENCY_0 else: try: hours = int(value) days = hours / 24 desc = BACKUP_FREQUENCY_DEFAULT[1].format(days=days) score = BACKUP_FREQUENCY_DEFAULT[0] except ValueError: score, desc = BACKUP_FREQUENCY_INVALID return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ 
in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _backup_retention_score(self) -> Score | None: if self.backup_retention is None: return None BACKUP_RETENTION_0 = (0, D_("No backups are done.")) BACKUP_RETENTION_INFINITE = (-10, D_("Backups are stored indefinitely.")) BACKUP_RETENTION_UNKNOWN = (-5, D_("Backup retention policy is unknown.")) BACKUP_RETENTION_DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s).")) BACKUP_RETENTION_INVALID = (-20, D_("Invalid backup retention {value!r}.")) ALL = [ BACKUP_RETENTION_0, BACKUP_RETENTION_INFINITE, BACKUP_RETENTION_UNKNOWN, BACKUP_RETENTION_DEFAULT, BACKUP_RETENTION_INVALID, ] value = self.backup_retention if value == "0": score, desc = BACKUP_RETENTION_0 elif value == "infinite": score, desc = BACKUP_RETENTION_INFINITE elif value == "unknown": score, desc = BACKUP_RETENTION_UNKNOWN else: try: hours = int(value) days = hours / 24 desc = BACKUP_RETENTION_DEFAULT[1].format(days=days) score = BACKUP_RETENTION_DEFAULT[0] except ValueError: score, desc = BACKUP_RETENTION_INVALID return Score( score=score, minimum=min(score for score, _ in ALL), maximum=max(score for score, _ in ALL), detail=[IndividualScore(score=score, description=desc)], ) @property def _access_policy_score(self) -> Score | None: if self.access_policy is None: return None total_score = 0 overall_min = 0 overall_max = 0 details = [] for policy in self.access_policy: policy_score = AccessPolicy.get_score(policy) total_score += policy_score.score overall_min += policy_score.minimum overall_max += policy_score.maximum details.extend(policy_score.detail) return Score( score=total_score, minimum=overall_min, maximum=overall_max, detail=details ) @property def _extra_info_score(self) -> Score | None: if self.extra_info is None: return None return Score(score=0, minimum=0, maximum=0, detail=[]) @classmethod def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None": """Create a DataPolicy instance from a Wokkel Data 
Form. @param form: The data form to parse. @return: Parsed DataPolicy instance or None if form type doesn't match. """ if not form.formNamespace or not form.formNamespace.startswith( NS_DATA_POLICY_BASE ): return None fields = cls.model_fields.keys() kwargs = {} for name in fields: if name not in form: continue value = form.get(name) match name: case "access_policy": if not value: continue policy_set = set() policies = [value] if not isinstance(value, list) else value for policy in policies: if policy: policy_set.add(AccessPolicy(policy)) if policy_set: kwargs[name] = policy_set case "auth_data": if value: kwargs[name] = AuthMechanism(value) case "data_transmission": if value: kwargs[name] = DataTransmission(value) case "extra_info": if isinstance(value, list): kwargs[name] = "\n".join(str(line) for line in value) else: kwargs[name] = str(value) case _: kwargs[name] = value return cls(**kwargs) def to_data_form( self, category: str | None = None, type_: str | None = None ) -> data_form.Form: """Convert this model to a Wokkel Data Form. @return: Form with type='result' containing all non-None fields. """ if category is not None: if type_ is not None: raise exceptions.InternalError( 'If "category" is set, "type_" must be set too.' 
) form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_) else: form_ns = NS_DATA_POLICY form_fields = [] for name in self.__class__.model_fields.keys(): value = getattr(self, name) if value is None: continue match name: case "auth_data" | "data_transmission": form_fields.append( data_form.Field( fieldType="list-single", var=name, value=value.value, ) ) case ( "data_deletion" | "encryption_at_rest" | "data_export" | "full_erasure" ): form_fields.append( data_form.Field( fieldType="boolean", var=name, value=value, ) ) case "access_policy": field_values = [policy.value for policy in value] form_fields.append( data_form.Field( fieldType="list-multi", var=name, values=field_values, ) ) case "extra_info": lines = value.split("\n") form_fields.append( data_form.Field( fieldType="text-multi", var=name, values=lines, ) ) case _: form_fields.append( data_form.Field( fieldType="text-single", var=name, value=str(value), ) ) return data_form.Form( formType="result", formNamespace=form_ns, fields=form_fields, ) class DataPolicies(BaseModel): main: DataPolicy services: dict[str, DataPolicy] = Field( default=dict(), description=( "Identity to data policy map. Identity is used as key with the " 'template "{identity}:{type}".' 
), ) class DATA_POLICY: namespace = NS_DATA_POLICY def __init__(self, host: "LiberviaBackend") -> None: log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host host.register_namespace("data-policy", NS_DATA_POLICY) host.bridge.add_method( "data_policy_get", ".plugin", in_sign="ss", out_sign="s", method=self._data_policy_get, async_=True, ) def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]: client = self.host.get_client(profile_key) d = defer.ensureDeferred(self.get_data_policy(client, jid.JID(target))) d.addCallback( lambda data_policies: ( "" if data_policies is None else data_policies.model_dump_json() ) ) d = cast(defer.Deferred[str], d) return d async def get_data_policy( self, client: SatXMPPEntity, target_jid: jid.JID ) -> DataPolicies | None: infos = await self.host.memory.disco.get_infos(client, target_jid) if NS_DATA_POLICY not in infos.extensions: return None # Main data policy. data_policy = DataPolicy.from_data_form(infos.extensions[NS_DATA_POLICY]) if data_policy is None: log.error(f"DataPolicy should be found at this point.") return None data_policies = DataPolicies(main=data_policy) # Now we looks for identities data policies. for namespace, form in infos.extensions.items(): if namespace.startswith(NS_DATA_POLICY_ID_PREFIX) and namespace.endswith( NS_DATA_POLICY_ID_SUFFIX ): identity_data = namespace[ len(NS_DATA_POLICY_ID_PREFIX) : -len(NS_DATA_POLICY_ID_SUFFIX) ] try: category, type_ = identity_data.split(":", 1) except ValueError: log.warning( "Invalid namespace for identity data policy: " f"{namespace!r}" ) else: id_data_policy = DataPolicy.from_data_form(form) if id_data_policy is not None: data_policies.services[f"{category}:{type_}"] = id_data_policy return data_policies