comparison libervia/backend/plugins/plugin_exp_data_policy.py @ 4378:930a4ea7ab6f

plugin data policy: Data Policy implementation: This plugin implements data policy parsing and an algorithm to calculate a score based on them. rel 460
author Goffi <goffi@goffi.org>
date Thu, 26 Jun 2025 17:02:33 +0200
parents
children
comparison
equal deleted inserted replaced
4377:448d701187b8 4378:930a4ea7ab6f
1 #!/usr/bin/env python3
2
3 # Libervia plugin for handling data policies
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 from enum import Enum, StrEnum, auto
21 import enum
22 from typing import TYPE_CHECKING, Self, cast
23 from typing import get_type_hints
24
25 from pydantic import BaseModel, ConfigDict, Field, computed_field
26 from twisted.internet import defer
27 from twisted.words.protocols.jabber import jid
28 from wokkel import data_form
29
30 from libervia.backend.core import exceptions
31 from libervia.backend.core.constants import Const as C
32 from libervia.backend.core.core_types import SatXMPPEntity
33 from libervia.backend.core.i18n import D_, _
34 from libervia.backend.core.log import getLogger
35
36 if TYPE_CHECKING:
37 from libervia.backend.core.main import LiberviaBackend
38
# Module-level logger, named after this module.
log = getLogger(__name__)
# Identifier registered below as this plugin's import name.
IMPORT_NAME = "DATA-POLICY"

# Plugin metadata; the keys are constants taken from the backend's ``Const`` class.
PLUGIN_INFO = {
    C.PI_NAME: "Data Policy",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_EXP,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [],
    # Name of the class defined in this module that implements the plugin.
    C.PI_MAIN: "DATA_POLICY",
    C.PI_HANDLER: "no",
}

# Common prefix of every data policy namespace.
NS_DATA_POLICY_BASE = "urn:xmpp:data-policy"
# Namespace of the main data policy form: "urn:xmpp:data-policy:0".
NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0"
# Per-identity namespaces have the shape
# "urn:xmpp:data-policy:identity:{category}:{type}:0"; prefix and suffix are kept
# separately so the "{category}:{type}" part can be sliced back out of a namespace.
NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:"
NS_DATA_POLICY_ID_SUFFIX = ":0"
# ``str.format`` template expecting the ``category`` and ``type`` keyword arguments.
NS_DATA_POLICY_ID_TPL = (
    f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}"
)
59
60
class IndividualScore(BaseModel):
    """One scored item: a number of points and the text explaining them."""

    # Points attributed to this item.
    score: int
    # Text explaining the points.
    description: str
64
65
class Score(BaseModel):
    """A score together with its possible bounds and the items it is made of."""

    # Points obtained.
    score: int
    # Lowest value that could have been obtained for what was evaluated.
    minimum: int
    # Highest value that could have been obtained for what was evaluated.
    maximum: int
    # Individual scored items explaining how ``score`` was reached.
    detail: list[IndividualScore]
71
72
class ScoredStrEnum(StrEnum):
    """String enum whose members each carry a score and a description.

    Subclasses must define ``_score_map`` (wrapped in ``enum.nonmember``) with
    one ``(score, description)`` tuple per member. The lowest and highest scores
    of the map are computed once, when the subclass is created.
    """

    _score_map = enum.nonmember({})
    _min_score = enum.nonmember(0)
    _max_score = enum.nonmember(0)

    def __init_subclass__(cls) -> None:
        """Validate the subclass ``_score_map`` and cache its score bounds.

        @raise exceptions.InternalError: the map is missing, empty, or its keys
            are not exactly the enum members.
        """
        try:
            mapping = cls._score_map
        except AttributeError:
            raise exceptions.InternalError('"_score_map" must be set.')
        if not mapping:
            raise exceptions.InternalError("ScoredEnum must set _score_map.")

        # Keys and members must match exactly, in both directions.
        if set(mapping) != set(cls):
            raise exceptions.InternalError(
                "All enum members must be present in _score_map."
            )

        points = [value for value, _description in mapping.values()]
        cls._min_score = min(points)
        cls._max_score = max(points)

    @classmethod
    def get_score(cls, value: str) -> Score:
        """Build the Score associated with an enum value.

        @param value: key to look up in ``_score_map``.
        @return: Score holding the value's points, the bounds of this enum and
            a single detail entry with the value's description.
        """
        points, text = cls._score_map[value]
        item = IndividualScore(score=points, description=text)
        return Score(
            score=points,
            minimum=cls._min_score,
            maximum=cls._max_score,
            detail=[item],
        )
104
105
class AuthMechanism(ScoredStrEnum):
    """Scored values accepted by the ``auth_data`` field of a data policy."""

    NO_AUTH = auto()
    PLAIN = auto()
    HIDDEN = auto()
    RESTRICTED = auto()

    # One (score, description) tuple per member; ``enum.nonmember`` keeps the
    # dict from being turned into an enum member.
    _score_map = enum.nonmember(
        {
            NO_AUTH: (20, D_("No authentication is needed.")),
            PLAIN: (-20, D_("Your login data are transmitted to this service.")),
            HIDDEN: (
                0,
                D_("This service logs to your account, but doesn't get logging data."),
            ),
            RESTRICTED: (
                15,
                D_("This service logs to your account in a restricted way."),
            ),
        }
    )
126
127
class DataTransmission(ScoredStrEnum):
    """Scored values accepted by the ``data_transmission`` field of a data policy."""

    PLAIN = auto()
    ENCRYPTED = auto()
    E2E = auto()
    GRE = auto()

    # One (score, description) tuple per member; ``enum.nonmember`` keeps the
    # dict from being turned into an enum member.
    _score_map = enum.nonmember(
        {
            PLAIN: (
                -20,
                D_(
                    "Data is transmitted without encryption. This is highly insecure and "
                    "risks data interception."
                ),
            ),
            ENCRYPTED: (
                0,
                D_(
                    "Data is encrypted during transmission but not end-to-end. The "
                    "service can view the data."
                ),
            ),
            E2E: (
                10,
                D_(
                    "Data is end-to-end encrypted from the service. Only the service and "
                    "the recipient(s) can view the data."
                ),
            ),
            GRE: (
                30,
                D_(
                    "Data uses Gateway Relayed Encryption, ensuring end-to-end security, "
                    "only your and your recipient(s) can view the data. Highly secure."
                ),
            ),
        }
    )
166
167
class AccessPolicy(ScoredStrEnum):
    """Scored values accepted by the ``access_policy`` field of a data policy.

    The field holds a set of these values, so several can apply at once.
    """

    ADMINS = auto()
    MODERATORS = auto()
    ORGANIZATION_MEMBER = auto()
    GOVERNMENT = auto()
    ADVERTISERS = auto()
    PARTNERS = auto()
    NONE = auto()

    # One (score, description) tuple per member; ``enum.nonmember`` keeps the
    # dict from being turned into an enum member.
    _score_map = enum.nonmember(
        {
            ADMINS: (
                -5,
                D_(
                    "Service administrators can access user data for operational "
                    "purposes."
                ),
            ),
            MODERATORS: (
                -10,
                D_("Moderators can access user data within their moderation scope."),
            ),
            ORGANIZATION_MEMBER: (
                -15,
                D_("Any organization member can access user data."),
            ),
            GOVERNMENT: (
                -10,
                D_(
                    "Government authorities can access user data under legal "
                    "requirements."
                ),
            ),
            ADVERTISERS: (
                -30,
                D_("Third-party advertisers can access user data for targeted ads."),
            ),
            PARTNERS: (
                -20,
                D_("Business partners can access user data under agreements."),
            ),
            NONE: (20, D_("No entity other than the user can access user data.")),
        }
    )
212
213
214 class DataPolicy(BaseModel):
215 """Represents a data policy form as defined in Data Policy XEP.
216
217 Fields correspond to the data policy specification and may be None when not provided.
218 """
219
220 model_config = ConfigDict(use_enum_values=True)
221
222 auth_data: AuthMechanism | None = None
223 data_transmission: DataTransmission | None = None
224 encryption_algorithm: str | None = None
225 data_retention: str | None = None
226 data_deletion: bool | None = None
227 encryption_at_rest: bool | None = None
228 tos: str | None = None
229 data_export: bool | None = None
230 access_policy: set[AccessPolicy] | None = None
231 full_erasure: bool | None = None
232 backup_frequency: str | None = None
233 backup_retention: str | None = None
234 extra_info: str | None = None
235
236 @computed_field
237 @property
238 def score(self) -> Score:
239 """Calculate a score based on the filled fields.
240
241 This score helps assess the quality of the data policy at a glance.
242 """
243 total_score = 0
244 overall_min = 0
245 overall_max = 0
246 detail = []
247
248 fields_names = set(self.__class__.model_fields.keys())
249
250 for field_name in fields_names:
251 score = getattr(self, f"_{field_name}_score")
252 if score is not None:
253 total_score += score.score
254 overall_min += score.minimum
255 overall_max += score.maximum
256 detail.extend(score.detail)
257
258 return Score(
259 score=total_score, minimum=overall_min, maximum=overall_max, detail=detail
260 )
261
262 @property
263 def _auth_data_score(self) -> Score | None:
264 if self.auth_data is None:
265 return None
266
267 return AuthMechanism.get_score(self.auth_data)
268
269 @property
270 def _data_transmission_score(self) -> Score | None:
271 if self.data_transmission is None:
272 return None
273
274 return DataTransmission.get_score(self.data_transmission)
275
276 @property
277 def _encryption_algorithm_score(self) -> Score | None:
278 ENCRYPTION_ALGORITHM_SET = (10, D_("The encryption algorithm is {}."))
279 ENCRYPTION_ALGORITHM_UNSET = (
280 -10,
281 D_("The encryption algorithm is not specified."),
282 )
283 ALL = (ENCRYPTION_ALGORITHM_SET, ENCRYPTION_ALGORITHM_UNSET)
284 if not self.data_transmission or self.data_transmission not in (
285 DataTransmission.E2E,
286 DataTransmission.ENCRYPTED,
287 ):
288 return None
289
290 if self.encryption_algorithm:
291 score, desc = ENCRYPTION_ALGORITHM_SET
292 desc = desc.format(self.encryption_algorithm)
293 else:
294 score, desc = ENCRYPTION_ALGORITHM_UNSET
295
296 return Score(
297 score=0,
298 minimum=min(score for score, _ in ALL),
299 maximum=max(score for score, _ in ALL),
300 detail=[IndividualScore(score=score, description=desc)],
301 )
302
303 @property
304 def _data_retention_score(self) -> Score | None:
305 if self.data_retention is None:
306 return None
307
308 DATA_RETENTION_0 = (10, D_("The service does not store data."))
309 DATA_RETENTION_INFINITE = (
310 -15,
311 D_("Data is stored indefinitely, which may pose privacy risks."),
312 )
313 DATA_RETENTION_UNKNOWN = (-10, D_("Data retention policy is unknown."))
314 DATA_RETENTION_DEFAULT = (0, D_("Data is stored for {days:.02f} day(s)."))
315 DATA_RETENTION_INVALID = (-20, D_("Invalid data retention policy ({value!r})."))
316 ALL = [
317 DATA_RETENTION_0,
318 DATA_RETENTION_INFINITE,
319 DATA_RETENTION_UNKNOWN,
320 DATA_RETENTION_DEFAULT,
321 DATA_RETENTION_INVALID,
322 ]
323
324 value = self.data_retention
325 if value == "0":
326 score, desc = DATA_RETENTION_0
327 elif value == "infinite":
328 score, desc = DATA_RETENTION_INFINITE
329 elif value == "unknown":
330 score, desc = DATA_RETENTION_UNKNOWN
331 else:
332 try:
333 hours = int(value)
334 days = hours / 24
335 desc = DATA_RETENTION_DEFAULT[1].format(days=days)
336 score = DATA_RETENTION_DEFAULT[0]
337 except ValueError:
338 score, desc = DATA_RETENTION_INVALID
339
340 return Score(
341 score=score,
342 minimum=min(score for score, _ in ALL),
343 maximum=max(score for score, _ in ALL),
344 detail=[IndividualScore(score=score, description=desc)],
345 )
346
347 @property
348 def _data_deletion_score(self) -> Score | None:
349 if self.data_deletion is None:
350 return None
351
352 DATA_DELETION_TRUE = (20, D_("Users can delete data on this service."))
353 DATA_DELETION_FALSE = (-10, D_("Users cannot delete data on this service."))
354 ALL = [DATA_DELETION_TRUE, DATA_DELETION_FALSE]
355
356 score, desc = DATA_DELETION_TRUE if self.data_deletion else DATA_DELETION_FALSE
357
358 return Score(
359 score=score,
360 minimum=min(score for score, _ in ALL),
361 maximum=max(score for score, _ in ALL),
362 detail=[IndividualScore(score=score, description=desc)],
363 )
364
365 @property
366 def _encryption_at_rest_score(self) -> Score | None:
367 if self.encryption_at_rest is None:
368 return None
369
370 ENCRYPTION_AT_REST_TRUE = (10, D_("Data is encrypted at rest."))
371 ENCRYPTION_AT_REST_FALSE = (-5, D_("Data is not encrypted at rest."))
372 ALL = [ENCRYPTION_AT_REST_TRUE, ENCRYPTION_AT_REST_FALSE]
373
374 score, desc = (
375 ENCRYPTION_AT_REST_TRUE
376 if self.encryption_at_rest
377 else ENCRYPTION_AT_REST_FALSE
378 )
379
380 return Score(
381 score=score,
382 minimum=min(score for score, _ in ALL),
383 maximum=max(score for score, _ in ALL),
384 detail=[IndividualScore(score=score, description=desc)],
385 )
386
387 @property
388 def _tos_score(self) -> Score:
389 TOS_SET = (5, D_("Terms of Service are linked."))
390 TOS_UNSET = (-5, D_("Terms of Service are not linked."))
391 ALL = [TOS_SET, TOS_UNSET]
392 score, desc = TOS_SET if self.tos else TOS_UNSET
393
394 return Score(
395 score=score,
396 minimum=min(score for score, _ in ALL),
397 maximum=max(score for score, _ in ALL),
398 detail=[IndividualScore(score=score, description=desc)],
399 )
400
401 @property
402 def _data_export_score(self) -> Score | None:
403 if self.data_export is None:
404 return None
405
406 DATA_EXPORT_TRUE = (15, D_("Users can export their data."))
407 DATA_EXPORT_FALSE = (-10, D_("Users cannot export their data."))
408 ALL = [DATA_EXPORT_TRUE, DATA_EXPORT_FALSE]
409
410 score, desc = DATA_EXPORT_TRUE if self.data_export else DATA_EXPORT_FALSE
411
412 return Score(
413 score=score,
414 minimum=min(score for score, _ in ALL),
415 maximum=max(score for score, _ in ALL),
416 detail=[IndividualScore(score=score, description=desc)],
417 )
418
419 @property
420 def _full_erasure_score(self) -> Score | None:
421 if self.full_erasure is None:
422 return None
423
424 FULL_ERASURE_TRUE = (20, D_("Users can fully erase their account and data."))
425 FULL_ERASURE_FALSE = (-20, D_("Users cannot fully erase their account and data."))
426 ALL = [FULL_ERASURE_TRUE, FULL_ERASURE_FALSE]
427
428 score, desc = FULL_ERASURE_TRUE if self.full_erasure else FULL_ERASURE_FALSE
429
430 return Score(
431 score=score,
432 minimum=min(score for score, _ in ALL),
433 maximum=max(score for score, _ in ALL),
434 detail=[IndividualScore(score=score, description=desc)],
435 )
436
437 @property
438 def _backup_frequency_score(self) -> Score | None:
439 if self.backup_frequency is None:
440 return None
441
442 BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups."))
443 BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s)."))
444 BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}."))
445 ALL = [
446 BACKUP_FREQUENCY_0,
447 BACKUP_FREQUENCY_DEFAULT,
448 BACKUP_FREQUENCY_INVALID,
449 ]
450
451 value = self.backup_frequency
452 if value == "0":
453 score, desc = BACKUP_FREQUENCY_0
454 else:
455 try:
456 hours = int(value)
457 days = hours / 24
458 desc = BACKUP_FREQUENCY_DEFAULT[1].format(days=days)
459 score = BACKUP_FREQUENCY_DEFAULT[0]
460 except ValueError:
461 score, desc = BACKUP_FREQUENCY_INVALID
462
463 return Score(
464 score=score,
465 minimum=min(score for score, _ in ALL),
466 maximum=max(score for score, _ in ALL),
467 detail=[IndividualScore(score=score, description=desc)],
468 )
469
470 @property
471 def _backup_retention_score(self) -> Score | None:
472 if self.backup_retention is None:
473 return None
474
475 BACKUP_RETENTION_0 = (0, D_("No backups are done."))
476 BACKUP_RETENTION_INFINITE = (-10, D_("Backups are stored indefinitely."))
477 BACKUP_RETENTION_UNKNOWN = (-5, D_("Backup retention policy is unknown."))
478 BACKUP_RETENTION_DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s)."))
479 BACKUP_RETENTION_INVALID = (-20, D_("Invalid backup retention {value!r}."))
480 ALL = [
481 BACKUP_RETENTION_0,
482 BACKUP_RETENTION_INFINITE,
483 BACKUP_RETENTION_UNKNOWN,
484 BACKUP_RETENTION_DEFAULT,
485 BACKUP_RETENTION_INVALID,
486 ]
487
488 value = self.backup_retention
489 if value == "0":
490 score, desc = BACKUP_RETENTION_0
491 elif value == "infinite":
492 score, desc = BACKUP_RETENTION_INFINITE
493 elif value == "unknown":
494 score, desc = BACKUP_RETENTION_UNKNOWN
495 else:
496 try:
497 hours = int(value)
498 days = hours / 24
499 desc = BACKUP_RETENTION_DEFAULT[1].format(days=days)
500 score = BACKUP_RETENTION_DEFAULT[0]
501 except ValueError:
502 score, desc = BACKUP_RETENTION_INVALID
503
504 return Score(
505 score=score,
506 minimum=min(score for score, _ in ALL),
507 maximum=max(score for score, _ in ALL),
508 detail=[IndividualScore(score=score, description=desc)],
509 )
510
511 @property
512 def _access_policy_score(self) -> Score | None:
513 if self.access_policy is None:
514 return None
515
516 total_score = 0
517 overall_min = 0
518 overall_max = 0
519 details = []
520
521 for policy in self.access_policy:
522 policy_score = AccessPolicy.get_score(policy)
523 total_score += policy_score.score
524 overall_min += policy_score.minimum
525 overall_max += policy_score.maximum
526 details.extend(policy_score.detail)
527
528 return Score(
529 score=total_score, minimum=overall_min, maximum=overall_max, detail=details
530 )
531
532 @property
533 def _extra_info_score(self) -> Score | None:
534 if self.extra_info is None:
535 return None
536
537 return Score(score=0, minimum=0, maximum=0, detail=[])
538
539 @classmethod
540 def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None":
541 """Create a DataPolicy instance from a Wokkel Data Form.
542
543 @param form: The data form to parse.
544 @return: Parsed DataPolicy instance or None if form type doesn't match.
545 """
546 if not form.formNamespace or not form.formNamespace.startswith(
547 NS_DATA_POLICY_BASE
548 ):
549 return None
550
551 fields = cls.model_fields.keys()
552 kwargs = {}
553
554 for name in fields:
555 if name not in form:
556 continue
557
558 value = form.get(name)
559
560 match name:
561 case "access_policy":
562 if not value:
563 continue
564
565 policy_set = set()
566 policies = [value] if not isinstance(value, list) else value
567 for policy in policies:
568 if policy:
569 policy_set.add(AccessPolicy(policy))
570 if policy_set:
571 kwargs[name] = policy_set
572
573 case "auth_data":
574 if value:
575 kwargs[name] = AuthMechanism(value)
576
577 case "data_transmission":
578 if value:
579 kwargs[name] = DataTransmission(value)
580
581 case "extra_info":
582 if isinstance(value, list):
583 kwargs[name] = "\n".join(str(line) for line in value)
584 else:
585 kwargs[name] = str(value)
586
587 case _:
588 kwargs[name] = value
589
590 return cls(**kwargs)
591
592 def to_data_form(
593 self, category: str | None = None, type_: str | None = None
594 ) -> data_form.Form:
595 """Convert this model to a Wokkel Data Form.
596
597 @return: Form with type='result' containing all non-None fields.
598 """
599 if category is not None:
600 if type_ is not None:
601 raise exceptions.InternalError(
602 'If "category" is set, "type_" must be set too.'
603 )
604 form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_)
605 else:
606 form_ns = NS_DATA_POLICY
607 form_fields = []
608
609 for name in self.__class__.model_fields.keys():
610 value = getattr(self, name)
611 if value is None:
612 continue
613
614 match name:
615 case "auth_data" | "data_transmission":
616 form_fields.append(
617 data_form.Field(
618 fieldType="list-single",
619 var=name,
620 value=value.value,
621 )
622 )
623 case (
624 "data_deletion"
625 | "encryption_at_rest"
626 | "data_export"
627 | "full_erasure"
628 ):
629 form_fields.append(
630 data_form.Field(
631 fieldType="boolean",
632 var=name,
633 value=value,
634 )
635 )
636 case "access_policy":
637 field_values = [policy.value for policy in value]
638 form_fields.append(
639 data_form.Field(
640 fieldType="list-multi",
641 var=name,
642 values=field_values,
643 )
644 )
645 case "extra_info":
646 lines = value.split("\n")
647 form_fields.append(
648 data_form.Field(
649 fieldType="text-multi",
650 var=name,
651 values=lines,
652 )
653 )
654 case _:
655 form_fields.append(
656 data_form.Field(
657 fieldType="text-single",
658 var=name,
659 value=str(value),
660 )
661 )
662
663 return data_form.Form(
664 formType="result",
665 formNamespace=form_ns,
666 fields=form_fields,
667 )
668
669
class DataPolicies(BaseModel):
    """Data policies of an entity: the main one plus optional per-identity ones."""

    # Data policy parsed from the form in the main data policy namespace.
    main: DataPolicy
    # NOTE(review): get_data_policy() builds the keys as f"{category}:{type_}",
    # while the description below words the template as "{identity}:{type}";
    # confirm which wording is intended.
    services: dict[str, DataPolicy] = Field(
        default=dict(),
        description=(
            "Identity to data policy map. Identity is used as key with the "
            'template "{identity}:{type}".'
        ),
    )
679
680
class DATA_POLICY:
    """Plugin main class: retrieves the data policies advertised by an entity."""

    namespace = NS_DATA_POLICY

    def __init__(self, host: "LiberviaBackend") -> None:
        """Register the data policy namespace and the ``data_policy_get`` bridge method.

        @param host: backend instance the plugin is attached to.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("data-policy", NS_DATA_POLICY)
        host.bridge.add_method(
            "data_policy_get",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._data_policy_get,
            async_=True,
        )

    def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]:
        """Bridge wrapper around ``get_data_policy``.

        @param target: JID of the entity to query, as a string.
        @param profile_key: key of the profile used to get the client.
        @return: Deferred firing with the policies serialised to JSON, or with an
            empty string when no data policy was found.
        """

        def serialise(data_policies: DataPolicies | None) -> str:
            if data_policies is None:
                return ""
            return data_policies.model_dump_json()

        client = self.host.get_client(profile_key)
        target_jid = jid.JID(target)
        d = defer.ensureDeferred(self.get_data_policy(client, target_jid))
        d.addCallback(serialise)
        return cast(defer.Deferred[str], d)

    async def get_data_policy(
        self, client: SatXMPPEntity, target_jid: jid.JID
    ) -> DataPolicies | None:
        """Retrieve the data policies found in the disco extensions of an entity.

        @param client: client used for the disco request.
        @param target_jid: entity to query.
        @return: main policy and per-identity policies, or None when the entity
            exposes no form in the main data policy namespace.
        """
        infos = await self.host.memory.disco.get_infos(client, target_jid)

        if NS_DATA_POLICY not in infos.extensions:
            return None

        # Main data policy.
        main_policy = DataPolicy.from_data_form(infos.extensions[NS_DATA_POLICY])
        if main_policy is None:
            log.error("DataPolicy should be found at this point.")
            return None

        data_policies = DataPolicies(main=main_policy)

        # Now we look for the data policies attached to identities.
        prefix_len = len(NS_DATA_POLICY_ID_PREFIX)
        suffix_len = len(NS_DATA_POLICY_ID_SUFFIX)
        for namespace, form in infos.extensions.items():
            is_identity_ns = namespace.startswith(
                NS_DATA_POLICY_ID_PREFIX
            ) and namespace.endswith(NS_DATA_POLICY_ID_SUFFIX)
            if not is_identity_ns:
                continue

            # What remains between prefix and suffix must be "category:type".
            identity_data = namespace[prefix_len:-suffix_len]
            category, separator, type_ = identity_data.partition(":")
            if not separator:
                log.warning(f"Invalid namespace for identity data policy: {namespace!r}")
                continue

            id_data_policy = DataPolicy.from_data_form(form)
            if id_data_policy is not None:
                data_policies.services[f"{category}:{type_}"] = id_data_policy
        return data_policies