diff libervia/backend/plugins/plugin_exp_data_policy.py @ 4378:930a4ea7ab6f

plugin data policy: Data Policy implementation: This plugin implements data policy parsing and an algorithm to calculate a score based on them. rel 460
author Goffi <goffi@goffi.org>
date Thu, 26 Jun 2025 17:02:33 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_exp_data_policy.py	Thu Jun 26 17:02:33 2025 +0200
@@ -0,0 +1,742 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for parsing and scoring data policies
+# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from enum import Enum, StrEnum, auto
+import enum
+from typing import TYPE_CHECKING, Self, cast
+from typing import get_type_hints
+
+from pydantic import BaseModel, ConfigDict, Field, computed_field
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from wokkel import data_form
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import D_, _
+from libervia.backend.core.log import getLogger
+
+if TYPE_CHECKING:
+    from libervia.backend.core.main import LiberviaBackend
+
+log = getLogger(__name__)
+IMPORT_NAME = "DATA-POLICY"  # also used as C.PI_IMPORT_NAME in PLUGIN_INFO below
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Data Policy",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_EXP,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: [],
+    C.PI_MAIN: "DATA_POLICY",  # name of the plugin class defined in this module
+    C.PI_HANDLER: "no",
+}
+
+NS_DATA_POLICY_BASE = "urn:xmpp:data-policy"  # common prefix of all namespaces below
+NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0"  # namespace of the main data policy form
+NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:"
+NS_DATA_POLICY_ID_SUFFIX = ":0"
+NS_DATA_POLICY_ID_TPL = (  # identity namespace, filled with "category" and "type"
+    f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}"
+)
+
+
+class IndividualScore(BaseModel):  # one scored statement about a policy
+    score: int  # points given by this single statement
+    description: str  # text explaining the score
+
+
+class Score(BaseModel):  # a score together with its bounds and its breakdown
+    score: int  # overall score
+    minimum: int  # lowest value the score could take
+    maximum: int  # highest value the score could take
+    detail: list[IndividualScore]  # individual statements behind the score
+
+
+class ScoredStrEnum(StrEnum):
+    """String enum whose subclasses map every member to ``(score, description)``."""
+
+    _score_map = enum.nonmember({})
+    _min_score = enum.nonmember(0)
+    _max_score = enum.nonmember(0)
+
+    @classmethod
+    def get_score(cls, value: str) -> Score:
+        """Return the Score of ``value``, bounded by the extreme scores of the enum."""
+        points, text = cls._score_map[value]
+        own_detail = IndividualScore(score=points, description=text)
+        bounds = {"minimum": cls._min_score, "maximum": cls._max_score}
+        return Score(score=points, detail=[own_detail], **bounds)
+
+    def __init_subclass__(cls) -> None:
+        """Check that ``_score_map`` covers all members, then cache min/max scores."""
+        try:
+            mapping = cls._score_map
+        except AttributeError:
+            raise exceptions.InternalError('"_score_map" must be set.')
+        if not mapping:
+            raise exceptions.InternalError("ScoredEnum must set _score_map.")
+
+        if set(mapping) != set(cls):
+            message = "All enum members must be present in _score_map."
+            raise exceptions.InternalError(message)
+
+        every_score = [points for points, _ in mapping.values()]
+        cls._min_score = min(every_score)
+        cls._max_score = max(every_score)
+
+
+class AuthMechanism(ScoredStrEnum):  # values accepted by DataPolicy.auth_data
+    NO_AUTH = auto()  # with StrEnum, auto() gives the lowercased name ("no_auth")
+    PLAIN = auto()
+    HIDDEN = auto()
+    RESTRICTED = auto()
+
+    _score_map = enum.nonmember(  # member value -> (score, description)
+        {
+            NO_AUTH: (20, D_("No authentication is needed.")),
+            PLAIN: (-20, D_("Your login data are transmitted to this service.")),
+            HIDDEN: (
+                0,
+                D_("This service logs to your account, but doesn't get logging data."),
+            ),
+            RESTRICTED: (
+                15,
+                D_("This service logs to your account in a restricted way."),
+            ),
+        }
+    )
+
+
+class DataTransmission(ScoredStrEnum):  # values accepted by DataPolicy.data_transmission
+    PLAIN = auto()  # with StrEnum, auto() gives the lowercased name ("plain")
+    ENCRYPTED = auto()
+    E2E = auto()
+    GRE = auto()
+
+    _score_map = enum.nonmember(  # member value -> (score, description)
+        {
+            PLAIN: (
+                -20,
+                D_(
+                    "Data is transmitted without encryption. This is highly insecure and "
+                    "risks data interception."
+                ),
+            ),
+            ENCRYPTED: (
+                0,
+                D_(
+                    "Data is encrypted during transmission but not end-to-end. The "
+                    "service can view the data."
+                ),
+            ),
+            E2E: (
+                10,
+                D_(
+                    "Data is end-to-end encrypted from the service. Only the service and "
+                    "the recipient(s) can view the data."
+                ),
+            ),
+            GRE: (
+                30,
+                D_(
+                    "Data uses Gateway Relayed Encryption, ensuring end-to-end security, "
+                    "only you and your recipient(s) can view the data. Highly secure."
+                ),
+            ),
+        }
+    )
+
+
+class AccessPolicy(ScoredStrEnum):  # values accepted in DataPolicy.access_policy
+    ADMINS = auto()  # with StrEnum, auto() gives the lowercased name ("admins")
+    MODERATORS = auto()
+    ORGANIZATION_MEMBER = auto()
+    GOVERNMENT = auto()
+    ADVERTISERS = auto()
+    PARTNERS = auto()
+    NONE = auto()
+
+    _score_map = enum.nonmember(  # member value -> (score, description)
+        {
+            ADMINS: (
+                -5,
+                D_(
+                    "Service administrators can access user data for operational "
+                    "purposes."
+                ),
+            ),
+            MODERATORS: (
+                -10,
+                D_("Moderators can access user data within their moderation scope."),
+            ),
+            ORGANIZATION_MEMBER: (
+                -15,
+                D_("Any organization member can access user data."),
+            ),
+            GOVERNMENT: (
+                -10,
+                D_(
+                    "Government authorities can access user data under legal "
+                    "requirements."
+                ),
+            ),
+            ADVERTISERS: (
+                -30,
+                D_("Third-party advertisers can access user data for targeted ads."),
+            ),
+            PARTNERS: (
+                -20,
+                D_("Business partners can access user data under agreements."),
+            ),
+            NONE: (20, D_("No entity other than the user can access user data.")),
+        }
+    )
+
+
+class DataPolicy(BaseModel):
+    """Represents a data policy form as defined in Data Policy XEP.
+
+    Fields correspond to the data policy specification and may be None when not provided.
+    """
+
+    model_config = ConfigDict(use_enum_values=True)  # enum fields keep the plain value
+
+    auth_data: AuthMechanism | None = None
+    data_transmission: DataTransmission | None = None
+    encryption_algorithm: str | None = None  # scored only for "encrypted"/"e2e" modes
+    data_retention: str | None = None  # "0", "infinite", "unknown" or a number of hours
+    data_deletion: bool | None = None
+    encryption_at_rest: bool | None = None
+    tos: str | None = None  # scored as "not linked" when None or empty
+    data_export: bool | None = None
+    access_policy: set[AccessPolicy] | None = None
+    full_erasure: bool | None = None
+    backup_frequency: str | None = None  # "0" or a number of hours
+    backup_retention: str | None = None  # "0", "infinite", "unknown" or hours
+    extra_info: str | None = None  # multi-line text, has no effect on the score
+
+    @computed_field
+    @property
+    def score(self) -> Score:
+        """Calculate a score based on the filled fields.
+
+        This score helps assess the quality of the data policy at a glance. Each field
+        ``<name>`` is scored by its ``_<name>_score`` property; None results are skipped.
+
+        @return: Summed score and bounds, with the details of every scored field.
+        """
+        total_score = overall_min = overall_max = 0
+        detail = []
+        # Follow the field declaration order rather than a set, so that "detail" is
+        # listed in the same order on every run (set order depends on str hashing).
+        for field_name in self.__class__.model_fields:
+            score = getattr(self, f"_{field_name}_score")
+            if score is not None:
+                total_score += score.score
+                overall_min += score.minimum
+                overall_max += score.maximum
+                detail.extend(score.detail)
+
+        return Score(
+            score=total_score, minimum=overall_min, maximum=overall_max, detail=detail
+        )
+
+    @property
+    def _auth_data_score(self) -> Score | None:
+        """Score of the authentication mechanism, or None when it is not set."""
+        if (mechanism := self.auth_data) is not None:
+            return AuthMechanism.get_score(mechanism)
+        return None
+
+    @property
+    def _data_transmission_score(self) -> Score | None:
+        """Score of the transmission mode, or None when it is not set."""
+        mode = self.data_transmission
+
+        return None if mode is None else DataTransmission.get_score(mode)
+
+    @property
+    def _encryption_algorithm_score(self) -> Score | None:
+        """Score whether the encryption algorithm is specified.
+
+        @return: None unless data transmission is "encrypted" or "e2e", otherwise the
+            score of a set or unset ``encryption_algorithm``.
+        """
+        ALGO_SET = (10, D_("The encryption algorithm is {}."))
+        ALGO_UNSET = (-10, D_("The encryption algorithm is not specified."))
+        ALL = (ALGO_SET, ALGO_UNSET)
+        encrypted_modes = (DataTransmission.E2E, DataTransmission.ENCRYPTED)
+        if self.data_transmission not in encrypted_modes:  # also true for None
+            return None
+
+        if self.encryption_algorithm:
+            score, desc = ALGO_SET
+            desc = desc.format(self.encryption_algorithm)
+        else:
+            score, desc = ALGO_UNSET
+
+        return Score(
+            score=score,  # must match the single detail entry below
+            minimum=min(score for score, _ in ALL),
+            maximum=max(score for score, _ in ALL),
+            detail=[IndividualScore(score=score, description=desc)],
+        )
+
+    @property
+    def _data_retention_score(self) -> Score | None:
+        """Score how long the service keeps data.
+
+        @return: None if the field is unset, else its score. Valid values are "0",
+            "infinite", "unknown" or a number of hours; anything else is invalid.
+        """
+        if self.data_retention is None:
+            return None
+
+        ZERO = (10, D_("The service does not store data."))
+        INFINITE = (
+            -15,
+            D_("Data is stored indefinitely, which may pose privacy risks."),
+        )
+        UNKNOWN = (-10, D_("Data retention policy is unknown."))
+        DEFAULT = (0, D_("Data is stored for {days:.02f} day(s)."))
+        INVALID = (-20, D_("Invalid data retention policy ({value!r})."))
+        ALL = [ZERO, INFINITE, UNKNOWN, DEFAULT, INVALID]
+
+        value = self.data_retention
+        if value == "0":
+            score, desc = ZERO
+        elif value == "infinite":
+            score, desc = INFINITE
+        elif value == "unknown":
+            score, desc = UNKNOWN
+        else:
+            try:
+                days = int(value) / 24  # OverflowError: too big for a float
+            except (ValueError, OverflowError):
+                score, desc = INVALID
+                desc = desc.format(value=value)  # fill the "{value!r}" placeholder
+            else:
+                score, desc = DEFAULT
+                desc = desc.format(days=days)
+
+        return Score(
+            score=score,
+            minimum=min(score for score, _ in ALL),
+            maximum=max(score for score, _ in ALL),
+            detail=[IndividualScore(score=score, description=desc)],
+        )
+
+    @property
+    def _data_deletion_score(self) -> Score | None:
+        """Score whether users can delete their data; None when the field is unset."""
+        can_delete = self.data_deletion
+        if can_delete is None:
+            return None
+
+        allowed = (20, D_("Users can delete data on this service."))
+        denied = (-10, D_("Users cannot delete data on this service."))
+        possible_points = (allowed[0], denied[0])
+        points, text = allowed if can_delete else denied
+        return Score(
+            score=points,
+            minimum=min(possible_points),
+            maximum=max(possible_points),
+            detail=[IndividualScore(score=points, description=text)],
+        )
+
+    @property
+    def _encryption_at_rest_score(self) -> Score | None:
+        """Score the encryption of stored data.
+
+        @return: None when the field is unset, otherwise the score of its boolean value.
+        """
+        if self.encryption_at_rest is None:
+            return None
+
+        outcomes = {
+            True: (10, D_("Data is encrypted at rest.")),
+            False: (-5, D_("Data is not encrypted at rest.")),
+        }
+        points, text = outcomes[bool(self.encryption_at_rest)]
+        candidates = [candidate for candidate, _ in outcomes.values()]
+        return Score(
+            score=points,
+            minimum=min(candidates),
+            maximum=max(candidates),
+            detail=[IndividualScore(score=points, description=text)],
+        )
+
+    @property
+    def _tos_score(self) -> Score:
+        """Score the presence of Terms of Service; a missing link is penalised too."""
+        linked = (5, D_("Terms of Service are linked."))
+        missing = (-5, D_("Terms of Service are not linked."))
+        if self.tos:
+            points, text = linked
+        else:
+            points, text = missing
+        lowest, highest = sorted((linked[0], missing[0]))
+        entry = IndividualScore(score=points, description=text)
+
+        return Score(score=points, minimum=lowest, maximum=highest, detail=[entry])
+
+    @property
+    def _data_export_score(self) -> Score | None:
+        """Score whether users can export their data; None when the field is unset."""
+        if self.data_export is None:
+            return None
+
+        exportable = (15, D_("Users can export their data."))
+        locked_in = (-10, D_("Users cannot export their data."))
+        chosen = exportable if self.data_export else locked_in
+        bounds = sorted(points for points, _ in (exportable, locked_in))
+
+        return Score(
+            score=chosen[0],
+            minimum=bounds[0],
+            maximum=bounds[-1],
+            detail=[IndividualScore(score=chosen[0], description=chosen[1])],
+        )
+
+    @property
+    def _full_erasure_score(self) -> Score | None:
+        """Score whether account and data can be fully erased; None when unset."""
+        erasable = self.full_erasure
+        if erasable is None:
+            return None
+        granted = (20, D_("Users can fully erase their account and data."))
+        refused = (-20, D_("Users cannot fully erase their account and data."))
+        options = [granted, refused]
+        points, text = options[0] if erasable else options[1]
+
+        return Score(
+            score=points,
+            minimum=min(option[0] for option in options),
+            maximum=max(option[0] for option in options),
+            detail=[IndividualScore(score=points, description=text)],
+        )
+
+    @property
+    def _backup_frequency_score(self) -> Score | None:
+        """Score how often backups are made; None when the field is unset."""
+        if self.backup_frequency is None:
+            return None
+
+        BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups."))
+        BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s)."))
+        BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}."))
+        ALL = [BACKUP_FREQUENCY_0, BACKUP_FREQUENCY_DEFAULT, BACKUP_FREQUENCY_INVALID]
+
+        # A valid value is "0" or a number of hours.
+        value = self.backup_frequency
+        if value == "0":
+            score, desc = BACKUP_FREQUENCY_0
+        else:
+            try:
+                days = int(value) / 24  # OverflowError: too big for a float
+            except (ValueError, OverflowError):
+                score, desc = BACKUP_FREQUENCY_INVALID
+                # Fill the placeholder, else the raw "{value!r}" marker is displayed.
+                desc = desc.format(value=value)
+            else:
+                score, desc = BACKUP_FREQUENCY_DEFAULT
+                desc = desc.format(days=days)
+
+        return Score(
+            score=score,
+            minimum=min(score for score, _ in ALL),
+            maximum=max(score for score, _ in ALL),
+            detail=[IndividualScore(score=score, description=desc)],
+        )
+
+    @property
+    def _backup_retention_score(self) -> Score | None:
+        """Score how long backups are kept.
+
+        @return: None if the field is unset, else its score. Valid values are "0",
+            "infinite", "unknown" or a number of hours; anything else is invalid.
+        """
+        if self.backup_retention is None:
+            return None
+
+        ZERO = (0, D_("No backups are done."))
+        INFINITE = (-10, D_("Backups are stored indefinitely."))
+        UNKNOWN = (-5, D_("Backup retention policy is unknown."))
+        DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s)."))
+        INVALID = (-20, D_("Invalid backup retention {value!r}."))
+        ALL = [ZERO, INFINITE, UNKNOWN, DEFAULT, INVALID]
+
+        value = self.backup_retention
+        if value == "0":
+            score, desc = ZERO
+        elif value == "infinite":
+            score, desc = INFINITE
+        elif value == "unknown":
+            score, desc = UNKNOWN
+        else:
+            try:
+                days = int(value) / 24  # OverflowError: too big for a float
+            except (ValueError, OverflowError):
+                score, desc = INVALID
+                desc = desc.format(value=value)  # fill the "{value!r}" placeholder
+            else:
+                score, desc = DEFAULT
+                desc = desc.format(days=days)
+
+        return Score(
+            score=score,
+            minimum=min(score for score, _ in ALL),
+            maximum=max(score for score, _ in ALL),
+            detail=[IndividualScore(score=score, description=desc)],
+        )
+
+    @property
+    def _access_policy_score(self) -> Score | None:
+        """Combine the scores of all entities allowed to access user data.
+
+        @return: None when the field is unset, otherwise the sums of the score, minimum
+            and maximum of every listed policy, with their details concatenated.
+        """
+        policies = self.access_policy
+        if policies is None:
+            return None
+
+        partials = [AccessPolicy.get_score(policy) for policy in policies]
+        merged_detail = [entry for partial in partials for entry in partial.detail]
+
+        return Score(
+            score=sum(partial.score for partial in partials),
+            minimum=sum(partial.minimum for partial in partials),
+            maximum=sum(partial.maximum for partial in partials),
+            detail=merged_detail,
+        )
+
+    @property
+    def _extra_info_score(self) -> Score | None:
+        """Neutral score: free-form extra information never changes the total."""
+        neutral = {"score": 0, "minimum": 0, "maximum": 0, "detail": []}
+
+        return None if self.extra_info is None else Score(**neutral)
+
+    @classmethod
+    def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None":
+        """Build a DataPolicy from the fields of a Wokkel Data Form.
+
+        Only the form fields named after a model field are read. Their values are
+        converted as follows:
+            - ``access_policy``: one value or a list of values, turned into a set of
+              AccessPolicy members (empty items are dropped);
+            - ``auth_data`` and ``data_transmission``: passed to their enum class,
+              unless the value is empty;
+            - ``extra_info``: a list of lines is joined with newlines, anything else
+              is converted with str();
+            - any other field: the form value is used unchanged.
+
+        @param form: The data form to parse.
+        @return: The parsed DataPolicy, or None if the form namespace is missing or does
+            not start with the data policy base namespace.
+        """
+        namespace = form.formNamespace
+        if not (namespace and namespace.startswith(NS_DATA_POLICY_BASE)):
+            return None
+
+        enum_types = {"auth_data": AuthMechanism, "data_transmission": DataTransmission}
+        kwargs = {}
+
+        for name in cls.model_fields:
+            if name not in form:
+                continue
+
+            value = form.get(name)
+
+            if name == "access_policy":
+                if not value:
+                    continue
+                # A lone value is handled like a list with a single item.
+                candidates = value if isinstance(value, list) else [value]
+                policy_set = {AccessPolicy(item) for item in candidates if item}
+                if policy_set:
+                    kwargs[name] = policy_set
+
+            elif name in enum_types:
+                if value:
+                    kwargs[name] = enum_types[name](value)
+
+            elif name == "extra_info":
+                lines = value if isinstance(value, list) else [value]
+                kwargs[name] = "\n".join(str(line) for line in lines)
+
+            else:
+                kwargs[name] = value
+
+        return cls(**kwargs)
+
+    def to_data_form(
+        self, category: str | None = None, type_: str | None = None
+    ) -> data_form.Form:
+        """Convert this model to a Wokkel Data Form.
+
+        @param category: Identity category. When set, the form gets the identity
+            specific namespace built from ``category`` and ``type_``; otherwise it gets
+            the main data policy namespace.
+        @param type_: Identity type, mandatory when ``category`` is set.
+        @return: Form with type='result' containing all non-None fields.
+        @raise exceptions.InternalError: ``category`` is set but ``type_`` is not.
+        """
+        if category is not None:
+            if type_ is None:  # both parts are needed for the identity namespace
+                raise exceptions.InternalError(
+                    'If "category" is set, "type_" must be set too.'
+                )
+            form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_)
+        else:
+            form_ns = NS_DATA_POLICY
+        form_fields = []
+
+        for name in self.__class__.model_fields.keys():
+            value = getattr(self, name)
+            if value is None:
+                continue
+
+            match name:
+                case "auth_data" | "data_transmission":
+                    # With "use_enum_values" the attribute is a plain str, which has
+                    # no ".value"; str() handles it as well as a StrEnum member.
+                    field = data_form.Field(
+                        fieldType="list-single",
+                        var=name,
+                        value=str(value),
+                    )
+                case (
+                    "data_deletion"
+                    | "encryption_at_rest"
+                    | "data_export"
+                    | "full_erasure"
+                ):
+                    field = data_form.Field(
+                        fieldType="boolean",
+                        var=name,
+                        value=value,
+                    )
+                case "access_policy":
+                    # Same conversion as above for each item; sorting makes the field
+                    # content independent from the iteration order of the set.
+                    field_values = sorted(str(policy) for policy in value)
+                    field = data_form.Field(
+                        fieldType="list-multi",
+                        var=name,
+                        values=field_values,
+                    )
+                case "extra_info":
+                    lines = value.split("\n")
+                    field = data_form.Field(
+                        fieldType="text-multi",
+                        var=name,
+                        values=lines,
+                    )
+                case _:
+                    field = data_form.Field(
+                        fieldType="text-single",
+                        var=name,
+                        value=str(value),
+                    )
+            form_fields.append(field)
+
+        return data_form.Form(
+            formType="result",
+            formNamespace=form_ns,
+            fields=form_fields,
+        )
+
+
+class DataPolicies(BaseModel):  # main data policy plus the policies of identities
+    main: DataPolicy  # policy found under the main data policy namespace
+    services: dict[str, DataPolicy] = Field(
+        default_factory=dict,  # a new dict per instance: this map is filled in place
+        description=(
+            "Identity to data policy map. Identity is used as key with the "
+            'template "{category}:{type}".'
+        ),
+    )
+
+
+class DATA_POLICY:
+    """Plugin entry point: exposes data policy retrieval through the bridge."""
+
+    namespace = NS_DATA_POLICY
+
+    def __init__(self, host: "LiberviaBackend") -> None:
+        """Register the data policy namespace and the ``data_policy_get`` method."""
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self.host = host
+        host.register_namespace("data-policy", NS_DATA_POLICY)
+        bridge_options = {"in_sign": "ss", "out_sign": "s", "async_": True}
+        host.bridge.add_method(
+            "data_policy_get", ".plugin", method=self._data_policy_get, **bridge_options
+        )
+
+    def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]:
+        """Bridge method: return the JSON dump of the policies, "" if there are none."""
+
+        def serialise(data_policies: DataPolicies | None) -> str:
+            return "" if data_policies is None else data_policies.model_dump_json()
+
+        client = self.host.get_client(profile_key)
+        d = defer.ensureDeferred(self.get_data_policy(client, jid.JID(target)))
+        d.addCallback(serialise)
+        return cast(defer.Deferred[str], d)
+
+    async def get_data_policy(
+        self, client: SatXMPPEntity, target_jid: jid.JID
+    ) -> DataPolicies | None:
+        """Retrieve the data policies advertised in the disco extensions of an entity.
+
+        @param client: Client used for the disco request.
+        @param target_jid: Entity whose disco infos are requested.
+        @return: Main policy and per-identity policies, or None if there is no main one.
+        """
+        infos = await self.host.memory.disco.get_infos(client, target_jid)
+        extensions = infos.extensions
+        if NS_DATA_POLICY not in extensions:
+            return None
+
+        # The main data policy is mandatory.
+        data_policy = DataPolicy.from_data_form(extensions[NS_DATA_POLICY])
+        if data_policy is None:
+            log.error("DataPolicy should be found at this point.")
+            return None
+        data_policies = DataPolicies(main=data_policy)
+
+        # Then look for the data policies of identities, keyed by "category:type".
+        prefix, suffix = NS_DATA_POLICY_ID_PREFIX, NS_DATA_POLICY_ID_SUFFIX
+        for namespace, form in extensions.items():
+            if not (namespace.startswith(prefix) and namespace.endswith(suffix)):
+                continue
+            identity_data = namespace[len(prefix) : -len(suffix)]
+            category, separator, type_ = identity_data.partition(":")
+            if not separator:
+                message = f"Invalid namespace for identity data policy: {namespace!r}"
+                log.warning(message)
+                continue
+            id_data_policy = DataPolicy.from_data_form(form)
+            if id_data_policy is not None:
+                data_policies.services[f"{category}:{type_}"] = id_data_policy
+        return data_policies