view libervia/backend/plugins/plugin_exp_data_policy.py @ 4379:79d463e3fdeb

core (types): Renamed `libervia.backend.core.core_types.MessageData` to `MessageDataLegacy`: There are 2 concurrent models, this one is a historical legacy. The type to use is the one from `libervia.backend.models.core`; it will progressively be used everywhere and be fully based on Pydantic.
author Goffi <goffi@goffi.org>
date Fri, 04 Jul 2025 12:28:00 +0200
parents 930a4ea7ab6f
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for handling data policies
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from enum import Enum, StrEnum, auto
import enum
from typing import TYPE_CHECKING, Self, cast
from typing import get_type_hints

from pydantic import BaseModel, ConfigDict, Field, computed_field
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from wokkel import data_form

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)
# Value used for the C.PI_IMPORT_NAME entry of PLUGIN_INFO.
IMPORT_NAME = "DATA-POLICY"

# Plugin metadata. C.PI_MAIN is the name of the plugin class defined at the end of this
# file.
PLUGIN_INFO = {
    C.PI_NAME: "Data Policy",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_EXP,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "DATA_POLICY",
    C.PI_HANDLER: "no",
}

# Common prefix of every data policy namespace; the namespaces below derive from it.
NS_DATA_POLICY_BASE = "urn:xmpp:data-policy"
# Namespace of the main data policy form.
NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0"
# Identity specific data policy namespaces are built as
# "urn:xmpp:data-policy:identity:{category}:{type}:0".
NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:"
NS_DATA_POLICY_ID_SUFFIX = ":0"
# Template to use with ``str.format(category=..., type=...)``.
NS_DATA_POLICY_ID_TPL = (
    f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}"
)


class IndividualScore(BaseModel):
    """A single scored item: a numeric score and the text explaining it."""

    score: int
    description: str


class Score(BaseModel):
    """A score together with its bounds and the individual items composing it."""

    # Obtained score.
    score: int
    # Lowest and highest values given as bounds by the code building this score.
    minimum: int
    maximum: int
    # Individual items (score + description) detailing the score.
    detail: list[IndividualScore]


class ScoredStrEnum(StrEnum):
    """Base class for string enums whose members each carry a score.

    Subclasses must define ``_score_map`` (wrapped with ``enum.nonmember``), mapping
    every member to a ``(score, description)`` tuple. ``_min_score`` and ``_max_score``
    are computed from this map when the subclass is created.
    """

    _score_map = enum.nonmember({})
    _min_score = enum.nonmember(0)
    _max_score = enum.nonmember(0)

    @classmethod
    def get_score(cls, value: str) -> Score:
        """Get the score associated with an enum value.

        @param value: Enum value (a key of ``_score_map``).
        @return: Score of this value, bounded by the lowest and highest scores of the
            enum, with a single detail entry.
        @raise KeyError: ``value`` is not in ``_score_map``.
        """
        points, explanation = cls._score_map[value]
        item = IndividualScore(score=points, description=explanation)
        return Score(
            score=points,
            minimum=cls._min_score,
            maximum=cls._max_score,
            detail=[item],
        )

    def __init_subclass__(cls) -> None:
        """Validate ``_score_map`` of the new subclass and compute its score bounds.

        @raise exceptions.InternalError: ``_score_map`` is missing, empty, or its keys
            don't match the enum members.
        """
        if not hasattr(cls, "_score_map"):
            raise exceptions.InternalError('"_score_map" must be set.')

        mapping = cls._score_map
        if not mapping:
            raise exceptions.InternalError("ScoredEnum must set _score_map.")

        # Members of a StrEnum compare and hash like their string value, hence the
        # keys of the map can be compared directly with the members.
        if set(mapping) != set(cls):
            raise exceptions.InternalError(
                "All enum members must be present in _score_map."
            )

        points = [item_score for item_score, _desc in mapping.values()]
        cls._min_score = min(points)
        cls._max_score = max(points)


class AuthMechanism(ScoredStrEnum):
    """Scored authentication options; see ``_score_map`` for their descriptions."""

    # Values are the lower-cased member names.
    NO_AUTH = "no_auth"
    PLAIN = "plain"
    HIDDEN = "hidden"
    RESTRICTED = "restricted"

    # member -> (score, description)
    _score_map = enum.nonmember(
        {
            NO_AUTH: (20, D_("No authentication is needed.")),
            PLAIN: (-20, D_("Your login data are transmitted to this service.")),
            HIDDEN: (
                0,
                D_("This service logs to your account, but doesn't get logging data."),
            ),
            RESTRICTED: (
                15,
                D_("This service logs to your account in a restricted way."),
            ),
        }
    )


class DataTransmission(ScoredStrEnum):
    """Scored data transmission options; see ``_score_map`` for their descriptions."""

    # Values are the lower-cased member names.
    PLAIN = "plain"
    ENCRYPTED = "encrypted"
    E2E = "e2e"
    GRE = "gre"

    # member -> (score, description)
    _score_map = enum.nonmember(
        {
            PLAIN: (
                -20,
                D_(
                    "Data is transmitted without encryption. This is highly insecure and "
                    "risks data interception."
                ),
            ),
            ENCRYPTED: (
                0,
                D_(
                    "Data is encrypted during transmission but not end-to-end. The "
                    "service can view the data."
                ),
            ),
            E2E: (
                10,
                D_(
                    "Data is end-to-end encrypted from the service. Only the service and "
                    "the recipient(s) can view the data."
                ),
            ),
            GRE: (
                30,
                D_(
                    "Data uses Gateway Relayed Encryption, ensuring end-to-end security, "
                    "only your and your recipient(s) can view the data. Highly secure."
                ),
            ),
        }
    )


class AccessPolicy(ScoredStrEnum):
    """Scored kinds of parties which may access user data.

    See ``_score_map`` for the description of each member.
    """

    # Values are the lower-cased member names.
    ADMINS = "admins"
    MODERATORS = "moderators"
    ORGANIZATION_MEMBER = "organization_member"
    GOVERNMENT = "government"
    ADVERTISERS = "advertisers"
    PARTNERS = "partners"
    NONE = "none"

    # member -> (score, description)
    _score_map = enum.nonmember(
        {
            ADMINS: (
                -5,
                D_(
                    "Service administrators can access user data for operational "
                    "purposes."
                ),
            ),
            MODERATORS: (
                -10,
                D_("Moderators can access user data within their moderation scope."),
            ),
            ORGANIZATION_MEMBER: (
                -15,
                D_("Any organization member can access user data."),
            ),
            GOVERNMENT: (
                -10,
                D_(
                    "Government authorities can access user data under legal "
                    "requirements."
                ),
            ),
            ADVERTISERS: (
                -30,
                D_("Third-party advertisers can access user data for targeted ads."),
            ),
            PARTNERS: (
                -20,
                D_("Business partners can access user data under agreements."),
            ),
            NONE: (20, D_("No entity other than the user can access user data.")),
        }
    )


class DataPolicy(BaseModel):
    """Represents a data policy form as defined in Data Policy XEP.

    Fields correspond to the data policy specification and may be None when not provided.
    """

    model_config = ConfigDict(use_enum_values=True)

    auth_data: AuthMechanism | None = None
    data_transmission: DataTransmission | None = None
    encryption_algorithm: str | None = None
    data_retention: str | None = None
    data_deletion: bool | None = None
    encryption_at_rest: bool | None = None
    tos: str | None = None
    data_export: bool | None = None
    access_policy: set[AccessPolicy] | None = None
    full_erasure: bool | None = None
    backup_frequency: str | None = None
    backup_retention: str | None = None
    extra_info: str | None = None

    @computed_field
    @property
    def score(self) -> Score:
        """Calculate a score based on the filled fields.

        This score helps assess the quality of the data policy at a glance.
        """
        total_score = 0
        overall_min = 0
        overall_max = 0
        detail = []

        fields_names = set(self.__class__.model_fields.keys())

        for field_name in fields_names:
            score = getattr(self, f"_{field_name}_score")
            if score is not None:
                total_score += score.score
                overall_min += score.minimum
                overall_max += score.maximum
                detail.extend(score.detail)

        return Score(
            score=total_score, minimum=overall_min, maximum=overall_max, detail=detail
        )

    @property
    def _auth_data_score(self) -> Score | None:
        if self.auth_data is None:
            return None

        return AuthMechanism.get_score(self.auth_data)

    @property
    def _data_transmission_score(self) -> Score | None:
        if self.data_transmission is None:
            return None

        return DataTransmission.get_score(self.data_transmission)

    @property
    def _encryption_algorithm_score(self) -> Score | None:
        ENCRYPTION_ALGORITHM_SET = (10, D_("The encryption algorithm is {}."))
        ENCRYPTION_ALGORITHM_UNSET = (
            -10,
            D_("The encryption algorithm is not specified."),
        )
        ALL = (ENCRYPTION_ALGORITHM_SET, ENCRYPTION_ALGORITHM_UNSET)
        if not self.data_transmission or self.data_transmission not in (
            DataTransmission.E2E,
            DataTransmission.ENCRYPTED,
        ):
            return None

        if self.encryption_algorithm:
            score, desc = ENCRYPTION_ALGORITHM_SET
            desc = desc.format(self.encryption_algorithm)
        else:
            score, desc = ENCRYPTION_ALGORITHM_UNSET

        return Score(
            score=0,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _data_retention_score(self) -> Score | None:
        if self.data_retention is None:
            return None

        DATA_RETENTION_0 = (10, D_("The service does not store data."))
        DATA_RETENTION_INFINITE = (
            -15,
            D_("Data is stored indefinitely, which may pose privacy risks."),
        )
        DATA_RETENTION_UNKNOWN = (-10, D_("Data retention policy is unknown."))
        DATA_RETENTION_DEFAULT = (0, D_("Data is stored for {days:.02f} day(s)."))
        DATA_RETENTION_INVALID = (-20, D_("Invalid data retention policy ({value!r})."))
        ALL = [
            DATA_RETENTION_0,
            DATA_RETENTION_INFINITE,
            DATA_RETENTION_UNKNOWN,
            DATA_RETENTION_DEFAULT,
            DATA_RETENTION_INVALID,
        ]

        value = self.data_retention
        if value == "0":
            score, desc = DATA_RETENTION_0
        elif value == "infinite":
            score, desc = DATA_RETENTION_INFINITE
        elif value == "unknown":
            score, desc = DATA_RETENTION_UNKNOWN
        else:
            try:
                hours = int(value)
                days = hours / 24
                desc = DATA_RETENTION_DEFAULT[1].format(days=days)
                score = DATA_RETENTION_DEFAULT[0]
            except ValueError:
                score, desc = DATA_RETENTION_INVALID

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _data_deletion_score(self) -> Score | None:
        if self.data_deletion is None:
            return None

        DATA_DELETION_TRUE = (20, D_("Users can delete data on this service."))
        DATA_DELETION_FALSE = (-10, D_("Users cannot delete data on this service."))
        ALL = [DATA_DELETION_TRUE, DATA_DELETION_FALSE]

        score, desc = DATA_DELETION_TRUE if self.data_deletion else DATA_DELETION_FALSE

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _encryption_at_rest_score(self) -> Score | None:
        if self.encryption_at_rest is None:
            return None

        ENCRYPTION_AT_REST_TRUE = (10, D_("Data is encrypted at rest."))
        ENCRYPTION_AT_REST_FALSE = (-5, D_("Data is not encrypted at rest."))
        ALL = [ENCRYPTION_AT_REST_TRUE, ENCRYPTION_AT_REST_FALSE]

        score, desc = (
            ENCRYPTION_AT_REST_TRUE
            if self.encryption_at_rest
            else ENCRYPTION_AT_REST_FALSE
        )

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _tos_score(self) -> Score:
        TOS_SET = (5, D_("Terms of Service are linked."))
        TOS_UNSET = (-5, D_("Terms of Service are not linked."))
        ALL = [TOS_SET, TOS_UNSET]
        score, desc = TOS_SET if self.tos else TOS_UNSET

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _data_export_score(self) -> Score | None:
        if self.data_export is None:
            return None

        DATA_EXPORT_TRUE = (15, D_("Users can export their data."))
        DATA_EXPORT_FALSE = (-10, D_("Users cannot export their data."))
        ALL = [DATA_EXPORT_TRUE, DATA_EXPORT_FALSE]

        score, desc = DATA_EXPORT_TRUE if self.data_export else DATA_EXPORT_FALSE

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _full_erasure_score(self) -> Score | None:
        if self.full_erasure is None:
            return None

        FULL_ERASURE_TRUE = (20, D_("Users can fully erase their account and data."))
        FULL_ERASURE_FALSE = (-20, D_("Users cannot fully erase their account and data."))
        ALL = [FULL_ERASURE_TRUE, FULL_ERASURE_FALSE]

        score, desc = FULL_ERASURE_TRUE if self.full_erasure else FULL_ERASURE_FALSE

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _backup_frequency_score(self) -> Score | None:
        if self.backup_frequency is None:
            return None

        BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups."))
        BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s)."))
        BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}."))
        ALL = [
            BACKUP_FREQUENCY_0,
            BACKUP_FREQUENCY_DEFAULT,
            BACKUP_FREQUENCY_INVALID,
        ]

        value = self.backup_frequency
        if value == "0":
            score, desc = BACKUP_FREQUENCY_0
        else:
            try:
                hours = int(value)
                days = hours / 24
                desc = BACKUP_FREQUENCY_DEFAULT[1].format(days=days)
                score = BACKUP_FREQUENCY_DEFAULT[0]
            except ValueError:
                score, desc = BACKUP_FREQUENCY_INVALID

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _backup_retention_score(self) -> Score | None:
        if self.backup_retention is None:
            return None

        BACKUP_RETENTION_0 = (0, D_("No backups are done."))
        BACKUP_RETENTION_INFINITE = (-10, D_("Backups are stored indefinitely."))
        BACKUP_RETENTION_UNKNOWN = (-5, D_("Backup retention policy is unknown."))
        BACKUP_RETENTION_DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s)."))
        BACKUP_RETENTION_INVALID = (-20, D_("Invalid backup retention {value!r}."))
        ALL = [
            BACKUP_RETENTION_0,
            BACKUP_RETENTION_INFINITE,
            BACKUP_RETENTION_UNKNOWN,
            BACKUP_RETENTION_DEFAULT,
            BACKUP_RETENTION_INVALID,
        ]

        value = self.backup_retention
        if value == "0":
            score, desc = BACKUP_RETENTION_0
        elif value == "infinite":
            score, desc = BACKUP_RETENTION_INFINITE
        elif value == "unknown":
            score, desc = BACKUP_RETENTION_UNKNOWN
        else:
            try:
                hours = int(value)
                days = hours / 24
                desc = BACKUP_RETENTION_DEFAULT[1].format(days=days)
                score = BACKUP_RETENTION_DEFAULT[0]
            except ValueError:
                score, desc = BACKUP_RETENTION_INVALID

        return Score(
            score=score,
            minimum=min(score for score, _ in ALL),
            maximum=max(score for score, _ in ALL),
            detail=[IndividualScore(score=score, description=desc)],
        )

    @property
    def _access_policy_score(self) -> Score | None:
        if self.access_policy is None:
            return None

        total_score = 0
        overall_min = 0
        overall_max = 0
        details = []

        for policy in self.access_policy:
            policy_score = AccessPolicy.get_score(policy)
            total_score += policy_score.score
            overall_min += policy_score.minimum
            overall_max += policy_score.maximum
            details.extend(policy_score.detail)

        return Score(
            score=total_score, minimum=overall_min, maximum=overall_max, detail=details
        )

    @property
    def _extra_info_score(self) -> Score | None:
        if self.extra_info is None:
            return None

        return Score(score=0, minimum=0, maximum=0, detail=[])

    @classmethod
    def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None":
        """Create a DataPolicy instance from a Wokkel Data Form.

        @param form: The data form to parse.
        @return: Parsed DataPolicy instance or None if form type doesn't match.
        """
        if not form.formNamespace or not form.formNamespace.startswith(
            NS_DATA_POLICY_BASE
        ):
            return None

        fields = cls.model_fields.keys()
        kwargs = {}

        for name in fields:
            if name not in form:
                continue

            value = form.get(name)

            match name:
                case "access_policy":
                    if not value:
                        continue

                    policy_set = set()
                    policies = [value] if not isinstance(value, list) else value
                    for policy in policies:
                        if policy:
                            policy_set.add(AccessPolicy(policy))
                    if policy_set:
                        kwargs[name] = policy_set

                case "auth_data":
                    if value:
                        kwargs[name] = AuthMechanism(value)

                case "data_transmission":
                    if value:
                        kwargs[name] = DataTransmission(value)

                case "extra_info":
                    if isinstance(value, list):
                        kwargs[name] = "\n".join(str(line) for line in value)
                    else:
                        kwargs[name] = str(value)

                case _:
                    kwargs[name] = value

        return cls(**kwargs)

    def to_data_form(
        self, category: str | None = None, type_: str | None = None
    ) -> data_form.Form:
        """Convert this model to a Wokkel Data Form.

        @return: Form with type='result' containing all non-None fields.
        """
        if category is not None:
            if type_ is not None:
                raise exceptions.InternalError(
                    'If "category" is set, "type_" must be set too.'
                )
            form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_)
        else:
            form_ns = NS_DATA_POLICY
        form_fields = []

        for name in self.__class__.model_fields.keys():
            value = getattr(self, name)
            if value is None:
                continue

            match name:
                case "auth_data" | "data_transmission":
                    form_fields.append(
                        data_form.Field(
                            fieldType="list-single",
                            var=name,
                            value=value.value,
                        )
                    )
                case (
                    "data_deletion"
                    | "encryption_at_rest"
                    | "data_export"
                    | "full_erasure"
                ):
                    form_fields.append(
                        data_form.Field(
                            fieldType="boolean",
                            var=name,
                            value=value,
                        )
                    )
                case "access_policy":
                    field_values = [policy.value for policy in value]
                    form_fields.append(
                        data_form.Field(
                            fieldType="list-multi",
                            var=name,
                            values=field_values,
                        )
                    )
                case "extra_info":
                    lines = value.split("\n")
                    form_fields.append(
                        data_form.Field(
                            fieldType="text-multi",
                            var=name,
                            values=lines,
                        )
                    )
                case _:
                    form_fields.append(
                        data_form.Field(
                            fieldType="text-single",
                            var=name,
                            value=str(value),
                        )
                    )

        return data_form.Form(
            formType="result",
            formNamespace=form_ns,
            fields=form_fields,
        )


class DataPolicies(BaseModel):
    """Main data policy of an entity, plus the data policies of its identities."""

    main: DataPolicy
    services: dict[str, DataPolicy] = Field(
        default={},
        description=(
            "Identity to data policy map. Identity is used as key with the template "
            '"{identity}:{type}".'
        ),
    )


class DATA_POLICY:
    """Plugin class: retrieve the data policies advertised by an entity.

    On instantiation, the data policy namespace is registered with the host, and the
    "data_policy_get" bridge method is added.
    """

    namespace = NS_DATA_POLICY

    def __init__(self, host: "LiberviaBackend") -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("data-policy", NS_DATA_POLICY)
        host.bridge.add_method(
            "data_policy_get",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._data_policy_get,
            async_=True,
        )

    def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]:
        """Bridge method wrapping L{get_data_policy}.

        @param target: JID of the entity to check.
        @param profile_key: Key of the profile used to do the request.
        @return: Deferred firing with the data policies serialised to JSON, or with an
            empty string if no data policy has been found.
        """

        def serialise(data_policies: DataPolicies | None) -> str:
            if data_policies is None:
                return ""
            return data_policies.model_dump_json()

        client = self.host.get_client(profile_key)
        target_jid = jid.JID(target)
        deferred = defer.ensureDeferred(self.get_data_policy(client, target_jid))
        deferred.addCallback(serialise)
        return cast(defer.Deferred[str], deferred)

    async def get_data_policy(
        self, client: SatXMPPEntity, target_jid: jid.JID
    ) -> DataPolicies | None:
        """Retrieve the data policies found in the disco infos of an entity.

        @param client: Client session used to do the request.
        @param target_jid: Entity to check.
        @return: Main data policy and identities data policies, or None if the entity
            has no main data policy.
        """
        infos = await self.host.memory.disco.get_infos(client, target_jid)
        extensions = infos.extensions

        if NS_DATA_POLICY not in extensions:
            return None

        # Main data policy.
        main_policy = DataPolicy.from_data_form(extensions[NS_DATA_POLICY])
        if main_policy is None:
            log.error("DataPolicy should be found at this point.")
            return None

        data_policies = DataPolicies(main=main_policy)

        # Now we look for the data policies of identities.
        prefix_len = len(NS_DATA_POLICY_ID_PREFIX)
        suffix_len = len(NS_DATA_POLICY_ID_SUFFIX)
        for namespace, form in extensions.items():
            is_identity_ns = namespace.startswith(
                NS_DATA_POLICY_ID_PREFIX
            ) and namespace.endswith(NS_DATA_POLICY_ID_SUFFIX)
            if not is_identity_ns:
                continue

            # What remains between prefix and suffix must be "{category}:{type}".
            identity_data = namespace[prefix_len:-suffix_len]
            category, separator, type_ = identity_data.partition(":")
            if not separator:
                log.warning(f"Invalid namespace for identity data policy: {namespace!r}")
                continue

            id_data_policy = DataPolicy.from_data_form(form)
            if id_data_policy is not None:
                data_policies.services[f"{category}:{type_}"] = id_data_policy

        return data_policies