Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_exp_data_policy.py @ 4378:930a4ea7ab6f
plugin data policy: Data Policy implementation:
This plugin implement data policy parsing and an algorithm to calculate a score based on
them.
rel 460
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Jun 2025 17:02:33 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4377:448d701187b8 | 4378:930a4ea7ab6f |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for handling stateless file sharing encryption | |
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 from enum import Enum, StrEnum, auto | |
21 import enum | |
22 from typing import TYPE_CHECKING, Self, cast | |
23 from typing import get_type_hints | |
24 | |
25 from pydantic import BaseModel, ConfigDict, Field, computed_field | |
26 from twisted.internet import defer | |
27 from twisted.words.protocols.jabber import jid | |
28 from wokkel import data_form | |
29 | |
30 from libervia.backend.core import exceptions | |
31 from libervia.backend.core.constants import Const as C | |
32 from libervia.backend.core.core_types import SatXMPPEntity | |
33 from libervia.backend.core.i18n import D_, _ | |
34 from libervia.backend.core.log import getLogger | |
35 | |
36 if TYPE_CHECKING: | |
37 from libervia.backend.core.main import LiberviaBackend | |
38 | |
39 log = getLogger(__name__) | |
40 IMPORT_NAME = "DATA-POLICY" | |
41 | |
42 PLUGIN_INFO = { | |
43 C.PI_NAME: "Data Policy", | |
44 C.PI_IMPORT_NAME: IMPORT_NAME, | |
45 C.PI_TYPE: C.PLUG_TYPE_EXP, | |
46 C.PI_PROTOCOLS: [], | |
47 C.PI_DEPENDENCIES: [], | |
48 C.PI_MAIN: "DATA_POLICY", | |
49 C.PI_HANDLER: "no", | |
50 } | |
51 | |
52 NS_DATA_POLICY_BASE = "urn:xmpp:data-policy" | |
53 NS_DATA_POLICY = f"{NS_DATA_POLICY_BASE}:0" | |
54 NS_DATA_POLICY_ID_PREFIX = f"{NS_DATA_POLICY_BASE}:identity:" | |
55 NS_DATA_POLICY_ID_SUFFIX = ":0" | |
56 NS_DATA_POLICY_ID_TPL = ( | |
57 f"{NS_DATA_POLICY_ID_PREFIX}{{category}}:{{type}}{NS_DATA_POLICY_ID_SUFFIX}" | |
58 ) | |
59 | |
60 | |
61 class IndividualScore(BaseModel): | |
62 score: int | |
63 description: str | |
64 | |
65 | |
66 class Score(BaseModel): | |
67 score: int | |
68 minimum: int | |
69 maximum: int | |
70 detail: list[IndividualScore] | |
71 | |
72 | |
73 class ScoredStrEnum(StrEnum): | |
74 _score_map = enum.nonmember({}) | |
75 _min_score = enum.nonmember(0) | |
76 _max_score = enum.nonmember(0) | |
77 | |
78 @classmethod | |
79 def get_score(cls, value: str) -> Score: | |
80 score, description = cls._score_map[value] | |
81 return Score( | |
82 score=score, | |
83 minimum=cls._min_score, | |
84 maximum=cls._max_score, | |
85 detail=[IndividualScore(score=score, description=description)], | |
86 ) | |
87 | |
88 def __init_subclass__(cls) -> None: | |
89 try: | |
90 score_map = cls._score_map | |
91 except AttributeError: | |
92 raise exceptions.InternalError('"_score_map" must be set.') | |
93 if not score_map: | |
94 raise exceptions.InternalError("ScoredEnum must set _score_map.") | |
95 | |
96 if set(score_map.keys()) != set(cls): | |
97 raise exceptions.InternalError( | |
98 "All enum members must be present in _score_map." | |
99 ) | |
100 | |
101 all_scores = [score for score, _ in score_map.values()] | |
102 cls._min_score = min(all_scores) | |
103 cls._max_score = max(all_scores) | |
104 | |
105 | |
106 class AuthMechanism(ScoredStrEnum): | |
107 NO_AUTH = auto() | |
108 PLAIN = auto() | |
109 HIDDEN = auto() | |
110 RESTRICTED = auto() | |
111 | |
112 _score_map = enum.nonmember( | |
113 { | |
114 NO_AUTH: (20, D_("No authentication is needed.")), | |
115 PLAIN: (-20, D_("Your login data are transmitted to this service.")), | |
116 HIDDEN: ( | |
117 0, | |
118 D_("This service logs to your account, but doesn't get logging data."), | |
119 ), | |
120 RESTRICTED: ( | |
121 15, | |
122 D_("This service logs to your account in a restricted way."), | |
123 ), | |
124 } | |
125 ) | |
126 | |
127 | |
128 class DataTransmission(ScoredStrEnum): | |
129 PLAIN = auto() | |
130 ENCRYPTED = auto() | |
131 E2E = auto() | |
132 GRE = auto() | |
133 | |
134 _score_map = enum.nonmember( | |
135 { | |
136 PLAIN: ( | |
137 -20, | |
138 D_( | |
139 "Data is transmitted without encryption. This is highly insecure and " | |
140 "risks data interception." | |
141 ), | |
142 ), | |
143 ENCRYPTED: ( | |
144 0, | |
145 D_( | |
146 "Data is encrypted during transmission but not end-to-end. The " | |
147 "service can view the data." | |
148 ), | |
149 ), | |
150 E2E: ( | |
151 10, | |
152 D_( | |
153 "Data is end-to-end encrypted from the service. Only the service and " | |
154 "the recipient(s) can view the data." | |
155 ), | |
156 ), | |
157 GRE: ( | |
158 30, | |
159 D_( | |
160 "Data uses Gateway Relayed Encryption, ensuring end-to-end security, " | |
161 "only your and your recipient(s) can view the data. Highly secure." | |
162 ), | |
163 ), | |
164 } | |
165 ) | |
166 | |
167 | |
168 class AccessPolicy(ScoredStrEnum): | |
169 ADMINS = auto() | |
170 MODERATORS = auto() | |
171 ORGANIZATION_MEMBER = auto() | |
172 GOVERNMENT = auto() | |
173 ADVERTISERS = auto() | |
174 PARTNERS = auto() | |
175 NONE = auto() | |
176 | |
177 _score_map = enum.nonmember( | |
178 { | |
179 ADMINS: ( | |
180 -5, | |
181 D_( | |
182 "Service administrators can access user data for operational " | |
183 "purposes." | |
184 ), | |
185 ), | |
186 MODERATORS: ( | |
187 -10, | |
188 D_("Moderators can access user data within their moderation scope."), | |
189 ), | |
190 ORGANIZATION_MEMBER: ( | |
191 -15, | |
192 D_("Any organization member can access user data."), | |
193 ), | |
194 GOVERNMENT: ( | |
195 -10, | |
196 D_( | |
197 "Government authorities can access user data under legal " | |
198 "requirements." | |
199 ), | |
200 ), | |
201 ADVERTISERS: ( | |
202 -30, | |
203 D_("Third-party advertisers can access user data for targeted ads."), | |
204 ), | |
205 PARTNERS: ( | |
206 -20, | |
207 D_("Business partners can access user data under agreements."), | |
208 ), | |
209 NONE: (20, D_("No entity other than the user can access user data.")), | |
210 } | |
211 ) | |
212 | |
213 | |
214 class DataPolicy(BaseModel): | |
215 """Represents a data policy form as defined in Data Policy XEP. | |
216 | |
217 Fields correspond to the data policy specification and may be None when not provided. | |
218 """ | |
219 | |
220 model_config = ConfigDict(use_enum_values=True) | |
221 | |
222 auth_data: AuthMechanism | None = None | |
223 data_transmission: DataTransmission | None = None | |
224 encryption_algorithm: str | None = None | |
225 data_retention: str | None = None | |
226 data_deletion: bool | None = None | |
227 encryption_at_rest: bool | None = None | |
228 tos: str | None = None | |
229 data_export: bool | None = None | |
230 access_policy: set[AccessPolicy] | None = None | |
231 full_erasure: bool | None = None | |
232 backup_frequency: str | None = None | |
233 backup_retention: str | None = None | |
234 extra_info: str | None = None | |
235 | |
236 @computed_field | |
237 @property | |
238 def score(self) -> Score: | |
239 """Calculate a score based on the filled fields. | |
240 | |
241 This score helps assess the quality of the data policy at a glance. | |
242 """ | |
243 total_score = 0 | |
244 overall_min = 0 | |
245 overall_max = 0 | |
246 detail = [] | |
247 | |
248 fields_names = set(self.__class__.model_fields.keys()) | |
249 | |
250 for field_name in fields_names: | |
251 score = getattr(self, f"_{field_name}_score") | |
252 if score is not None: | |
253 total_score += score.score | |
254 overall_min += score.minimum | |
255 overall_max += score.maximum | |
256 detail.extend(score.detail) | |
257 | |
258 return Score( | |
259 score=total_score, minimum=overall_min, maximum=overall_max, detail=detail | |
260 ) | |
261 | |
262 @property | |
263 def _auth_data_score(self) -> Score | None: | |
264 if self.auth_data is None: | |
265 return None | |
266 | |
267 return AuthMechanism.get_score(self.auth_data) | |
268 | |
269 @property | |
270 def _data_transmission_score(self) -> Score | None: | |
271 if self.data_transmission is None: | |
272 return None | |
273 | |
274 return DataTransmission.get_score(self.data_transmission) | |
275 | |
276 @property | |
277 def _encryption_algorithm_score(self) -> Score | None: | |
278 ENCRYPTION_ALGORITHM_SET = (10, D_("The encryption algorithm is {}.")) | |
279 ENCRYPTION_ALGORITHM_UNSET = ( | |
280 -10, | |
281 D_("The encryption algorithm is not specified."), | |
282 ) | |
283 ALL = (ENCRYPTION_ALGORITHM_SET, ENCRYPTION_ALGORITHM_UNSET) | |
284 if not self.data_transmission or self.data_transmission not in ( | |
285 DataTransmission.E2E, | |
286 DataTransmission.ENCRYPTED, | |
287 ): | |
288 return None | |
289 | |
290 if self.encryption_algorithm: | |
291 score, desc = ENCRYPTION_ALGORITHM_SET | |
292 desc = desc.format(self.encryption_algorithm) | |
293 else: | |
294 score, desc = ENCRYPTION_ALGORITHM_UNSET | |
295 | |
296 return Score( | |
297 score=0, | |
298 minimum=min(score for score, _ in ALL), | |
299 maximum=max(score for score, _ in ALL), | |
300 detail=[IndividualScore(score=score, description=desc)], | |
301 ) | |
302 | |
303 @property | |
304 def _data_retention_score(self) -> Score | None: | |
305 if self.data_retention is None: | |
306 return None | |
307 | |
308 DATA_RETENTION_0 = (10, D_("The service does not store data.")) | |
309 DATA_RETENTION_INFINITE = ( | |
310 -15, | |
311 D_("Data is stored indefinitely, which may pose privacy risks."), | |
312 ) | |
313 DATA_RETENTION_UNKNOWN = (-10, D_("Data retention policy is unknown.")) | |
314 DATA_RETENTION_DEFAULT = (0, D_("Data is stored for {days:.02f} day(s).")) | |
315 DATA_RETENTION_INVALID = (-20, D_("Invalid data retention policy ({value!r}).")) | |
316 ALL = [ | |
317 DATA_RETENTION_0, | |
318 DATA_RETENTION_INFINITE, | |
319 DATA_RETENTION_UNKNOWN, | |
320 DATA_RETENTION_DEFAULT, | |
321 DATA_RETENTION_INVALID, | |
322 ] | |
323 | |
324 value = self.data_retention | |
325 if value == "0": | |
326 score, desc = DATA_RETENTION_0 | |
327 elif value == "infinite": | |
328 score, desc = DATA_RETENTION_INFINITE | |
329 elif value == "unknown": | |
330 score, desc = DATA_RETENTION_UNKNOWN | |
331 else: | |
332 try: | |
333 hours = int(value) | |
334 days = hours / 24 | |
335 desc = DATA_RETENTION_DEFAULT[1].format(days=days) | |
336 score = DATA_RETENTION_DEFAULT[0] | |
337 except ValueError: | |
338 score, desc = DATA_RETENTION_INVALID | |
339 | |
340 return Score( | |
341 score=score, | |
342 minimum=min(score for score, _ in ALL), | |
343 maximum=max(score for score, _ in ALL), | |
344 detail=[IndividualScore(score=score, description=desc)], | |
345 ) | |
346 | |
347 @property | |
348 def _data_deletion_score(self) -> Score | None: | |
349 if self.data_deletion is None: | |
350 return None | |
351 | |
352 DATA_DELETION_TRUE = (20, D_("Users can delete data on this service.")) | |
353 DATA_DELETION_FALSE = (-10, D_("Users cannot delete data on this service.")) | |
354 ALL = [DATA_DELETION_TRUE, DATA_DELETION_FALSE] | |
355 | |
356 score, desc = DATA_DELETION_TRUE if self.data_deletion else DATA_DELETION_FALSE | |
357 | |
358 return Score( | |
359 score=score, | |
360 minimum=min(score for score, _ in ALL), | |
361 maximum=max(score for score, _ in ALL), | |
362 detail=[IndividualScore(score=score, description=desc)], | |
363 ) | |
364 | |
365 @property | |
366 def _encryption_at_rest_score(self) -> Score | None: | |
367 if self.encryption_at_rest is None: | |
368 return None | |
369 | |
370 ENCRYPTION_AT_REST_TRUE = (10, D_("Data is encrypted at rest.")) | |
371 ENCRYPTION_AT_REST_FALSE = (-5, D_("Data is not encrypted at rest.")) | |
372 ALL = [ENCRYPTION_AT_REST_TRUE, ENCRYPTION_AT_REST_FALSE] | |
373 | |
374 score, desc = ( | |
375 ENCRYPTION_AT_REST_TRUE | |
376 if self.encryption_at_rest | |
377 else ENCRYPTION_AT_REST_FALSE | |
378 ) | |
379 | |
380 return Score( | |
381 score=score, | |
382 minimum=min(score for score, _ in ALL), | |
383 maximum=max(score for score, _ in ALL), | |
384 detail=[IndividualScore(score=score, description=desc)], | |
385 ) | |
386 | |
387 @property | |
388 def _tos_score(self) -> Score: | |
389 TOS_SET = (5, D_("Terms of Service are linked.")) | |
390 TOS_UNSET = (-5, D_("Terms of Service are not linked.")) | |
391 ALL = [TOS_SET, TOS_UNSET] | |
392 score, desc = TOS_SET if self.tos else TOS_UNSET | |
393 | |
394 return Score( | |
395 score=score, | |
396 minimum=min(score for score, _ in ALL), | |
397 maximum=max(score for score, _ in ALL), | |
398 detail=[IndividualScore(score=score, description=desc)], | |
399 ) | |
400 | |
401 @property | |
402 def _data_export_score(self) -> Score | None: | |
403 if self.data_export is None: | |
404 return None | |
405 | |
406 DATA_EXPORT_TRUE = (15, D_("Users can export their data.")) | |
407 DATA_EXPORT_FALSE = (-10, D_("Users cannot export their data.")) | |
408 ALL = [DATA_EXPORT_TRUE, DATA_EXPORT_FALSE] | |
409 | |
410 score, desc = DATA_EXPORT_TRUE if self.data_export else DATA_EXPORT_FALSE | |
411 | |
412 return Score( | |
413 score=score, | |
414 minimum=min(score for score, _ in ALL), | |
415 maximum=max(score for score, _ in ALL), | |
416 detail=[IndividualScore(score=score, description=desc)], | |
417 ) | |
418 | |
419 @property | |
420 def _full_erasure_score(self) -> Score | None: | |
421 if self.full_erasure is None: | |
422 return None | |
423 | |
424 FULL_ERASURE_TRUE = (20, D_("Users can fully erase their account and data.")) | |
425 FULL_ERASURE_FALSE = (-20, D_("Users cannot fully erase their account and data.")) | |
426 ALL = [FULL_ERASURE_TRUE, FULL_ERASURE_FALSE] | |
427 | |
428 score, desc = FULL_ERASURE_TRUE if self.full_erasure else FULL_ERASURE_FALSE | |
429 | |
430 return Score( | |
431 score=score, | |
432 minimum=min(score for score, _ in ALL), | |
433 maximum=max(score for score, _ in ALL), | |
434 detail=[IndividualScore(score=score, description=desc)], | |
435 ) | |
436 | |
437 @property | |
438 def _backup_frequency_score(self) -> Score | None: | |
439 if self.backup_frequency is None: | |
440 return None | |
441 | |
442 BACKUP_FREQUENCY_0 = (0, D_("The service does not do backups.")) | |
443 BACKUP_FREQUENCY_DEFAULT = (5, D_("Backups are done every {days:.02f} day(s).")) | |
444 BACKUP_FREQUENCY_INVALID = (-20, D_("Invalid backup frequency {value!r}.")) | |
445 ALL = [ | |
446 BACKUP_FREQUENCY_0, | |
447 BACKUP_FREQUENCY_DEFAULT, | |
448 BACKUP_FREQUENCY_INVALID, | |
449 ] | |
450 | |
451 value = self.backup_frequency | |
452 if value == "0": | |
453 score, desc = BACKUP_FREQUENCY_0 | |
454 else: | |
455 try: | |
456 hours = int(value) | |
457 days = hours / 24 | |
458 desc = BACKUP_FREQUENCY_DEFAULT[1].format(days=days) | |
459 score = BACKUP_FREQUENCY_DEFAULT[0] | |
460 except ValueError: | |
461 score, desc = BACKUP_FREQUENCY_INVALID | |
462 | |
463 return Score( | |
464 score=score, | |
465 minimum=min(score for score, _ in ALL), | |
466 maximum=max(score for score, _ in ALL), | |
467 detail=[IndividualScore(score=score, description=desc)], | |
468 ) | |
469 | |
470 @property | |
471 def _backup_retention_score(self) -> Score | None: | |
472 if self.backup_retention is None: | |
473 return None | |
474 | |
475 BACKUP_RETENTION_0 = (0, D_("No backups are done.")) | |
476 BACKUP_RETENTION_INFINITE = (-10, D_("Backups are stored indefinitely.")) | |
477 BACKUP_RETENTION_UNKNOWN = (-5, D_("Backup retention policy is unknown.")) | |
478 BACKUP_RETENTION_DEFAULT = (0, D_("Backups are kept for {days:.02f} day(s).")) | |
479 BACKUP_RETENTION_INVALID = (-20, D_("Invalid backup retention {value!r}.")) | |
480 ALL = [ | |
481 BACKUP_RETENTION_0, | |
482 BACKUP_RETENTION_INFINITE, | |
483 BACKUP_RETENTION_UNKNOWN, | |
484 BACKUP_RETENTION_DEFAULT, | |
485 BACKUP_RETENTION_INVALID, | |
486 ] | |
487 | |
488 value = self.backup_retention | |
489 if value == "0": | |
490 score, desc = BACKUP_RETENTION_0 | |
491 elif value == "infinite": | |
492 score, desc = BACKUP_RETENTION_INFINITE | |
493 elif value == "unknown": | |
494 score, desc = BACKUP_RETENTION_UNKNOWN | |
495 else: | |
496 try: | |
497 hours = int(value) | |
498 days = hours / 24 | |
499 desc = BACKUP_RETENTION_DEFAULT[1].format(days=days) | |
500 score = BACKUP_RETENTION_DEFAULT[0] | |
501 except ValueError: | |
502 score, desc = BACKUP_RETENTION_INVALID | |
503 | |
504 return Score( | |
505 score=score, | |
506 minimum=min(score for score, _ in ALL), | |
507 maximum=max(score for score, _ in ALL), | |
508 detail=[IndividualScore(score=score, description=desc)], | |
509 ) | |
510 | |
511 @property | |
512 def _access_policy_score(self) -> Score | None: | |
513 if self.access_policy is None: | |
514 return None | |
515 | |
516 total_score = 0 | |
517 overall_min = 0 | |
518 overall_max = 0 | |
519 details = [] | |
520 | |
521 for policy in self.access_policy: | |
522 policy_score = AccessPolicy.get_score(policy) | |
523 total_score += policy_score.score | |
524 overall_min += policy_score.minimum | |
525 overall_max += policy_score.maximum | |
526 details.extend(policy_score.detail) | |
527 | |
528 return Score( | |
529 score=total_score, minimum=overall_min, maximum=overall_max, detail=details | |
530 ) | |
531 | |
532 @property | |
533 def _extra_info_score(self) -> Score | None: | |
534 if self.extra_info is None: | |
535 return None | |
536 | |
537 return Score(score=0, minimum=0, maximum=0, detail=[]) | |
538 | |
539 @classmethod | |
540 def from_data_form(cls, form: data_form.Form) -> "DataPolicy | None": | |
541 """Create a DataPolicy instance from a Wokkel Data Form. | |
542 | |
543 @param form: The data form to parse. | |
544 @return: Parsed DataPolicy instance or None if form type doesn't match. | |
545 """ | |
546 if not form.formNamespace or not form.formNamespace.startswith( | |
547 NS_DATA_POLICY_BASE | |
548 ): | |
549 return None | |
550 | |
551 fields = cls.model_fields.keys() | |
552 kwargs = {} | |
553 | |
554 for name in fields: | |
555 if name not in form: | |
556 continue | |
557 | |
558 value = form.get(name) | |
559 | |
560 match name: | |
561 case "access_policy": | |
562 if not value: | |
563 continue | |
564 | |
565 policy_set = set() | |
566 policies = [value] if not isinstance(value, list) else value | |
567 for policy in policies: | |
568 if policy: | |
569 policy_set.add(AccessPolicy(policy)) | |
570 if policy_set: | |
571 kwargs[name] = policy_set | |
572 | |
573 case "auth_data": | |
574 if value: | |
575 kwargs[name] = AuthMechanism(value) | |
576 | |
577 case "data_transmission": | |
578 if value: | |
579 kwargs[name] = DataTransmission(value) | |
580 | |
581 case "extra_info": | |
582 if isinstance(value, list): | |
583 kwargs[name] = "\n".join(str(line) for line in value) | |
584 else: | |
585 kwargs[name] = str(value) | |
586 | |
587 case _: | |
588 kwargs[name] = value | |
589 | |
590 return cls(**kwargs) | |
591 | |
592 def to_data_form( | |
593 self, category: str | None = None, type_: str | None = None | |
594 ) -> data_form.Form: | |
595 """Convert this model to a Wokkel Data Form. | |
596 | |
597 @return: Form with type='result' containing all non-None fields. | |
598 """ | |
599 if category is not None: | |
600 if type_ is not None: | |
601 raise exceptions.InternalError( | |
602 'If "category" is set, "type_" must be set too.' | |
603 ) | |
604 form_ns = NS_DATA_POLICY_ID_TPL.format(category=category, type=type_) | |
605 else: | |
606 form_ns = NS_DATA_POLICY | |
607 form_fields = [] | |
608 | |
609 for name in self.__class__.model_fields.keys(): | |
610 value = getattr(self, name) | |
611 if value is None: | |
612 continue | |
613 | |
614 match name: | |
615 case "auth_data" | "data_transmission": | |
616 form_fields.append( | |
617 data_form.Field( | |
618 fieldType="list-single", | |
619 var=name, | |
620 value=value.value, | |
621 ) | |
622 ) | |
623 case ( | |
624 "data_deletion" | |
625 | "encryption_at_rest" | |
626 | "data_export" | |
627 | "full_erasure" | |
628 ): | |
629 form_fields.append( | |
630 data_form.Field( | |
631 fieldType="boolean", | |
632 var=name, | |
633 value=value, | |
634 ) | |
635 ) | |
636 case "access_policy": | |
637 field_values = [policy.value for policy in value] | |
638 form_fields.append( | |
639 data_form.Field( | |
640 fieldType="list-multi", | |
641 var=name, | |
642 values=field_values, | |
643 ) | |
644 ) | |
645 case "extra_info": | |
646 lines = value.split("\n") | |
647 form_fields.append( | |
648 data_form.Field( | |
649 fieldType="text-multi", | |
650 var=name, | |
651 values=lines, | |
652 ) | |
653 ) | |
654 case _: | |
655 form_fields.append( | |
656 data_form.Field( | |
657 fieldType="text-single", | |
658 var=name, | |
659 value=str(value), | |
660 ) | |
661 ) | |
662 | |
663 return data_form.Form( | |
664 formType="result", | |
665 formNamespace=form_ns, | |
666 fields=form_fields, | |
667 ) | |
668 | |
669 | |
670 class DataPolicies(BaseModel): | |
671 main: DataPolicy | |
672 services: dict[str, DataPolicy] = Field( | |
673 default=dict(), | |
674 description=( | |
675 "Identity to data policy map. Identity is used as key with the " | |
676 'template "{identity}:{type}".' | |
677 ), | |
678 ) | |
679 | |
680 | |
681 class DATA_POLICY: | |
682 namespace = NS_DATA_POLICY | |
683 | |
684 def __init__(self, host: "LiberviaBackend") -> None: | |
685 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | |
686 self.host = host | |
687 host.register_namespace("data-policy", NS_DATA_POLICY) | |
688 host.bridge.add_method( | |
689 "data_policy_get", | |
690 ".plugin", | |
691 in_sign="ss", | |
692 out_sign="s", | |
693 method=self._data_policy_get, | |
694 async_=True, | |
695 ) | |
696 | |
697 def _data_policy_get(self, target: str, profile_key: str) -> defer.Deferred[str]: | |
698 client = self.host.get_client(profile_key) | |
699 d = defer.ensureDeferred(self.get_data_policy(client, jid.JID(target))) | |
700 d.addCallback( | |
701 lambda data_policies: ( | |
702 "" if data_policies is None else data_policies.model_dump_json() | |
703 ) | |
704 ) | |
705 d = cast(defer.Deferred[str], d) | |
706 return d | |
707 | |
708 async def get_data_policy( | |
709 self, client: SatXMPPEntity, target_jid: jid.JID | |
710 ) -> DataPolicies | None: | |
711 infos = await self.host.memory.disco.get_infos(client, target_jid) | |
712 | |
713 if NS_DATA_POLICY not in infos.extensions: | |
714 return None | |
715 | |
716 # Main data policy. | |
717 data_policy = DataPolicy.from_data_form(infos.extensions[NS_DATA_POLICY]) | |
718 if data_policy is None: | |
719 log.error(f"DataPolicy should be found at this point.") | |
720 return None | |
721 | |
722 data_policies = DataPolicies(main=data_policy) | |
723 | |
724 # Now we looks for identities data policies. | |
725 for namespace, form in infos.extensions.items(): | |
726 if namespace.startswith(NS_DATA_POLICY_ID_PREFIX) and namespace.endswith( | |
727 NS_DATA_POLICY_ID_SUFFIX | |
728 ): | |
729 identity_data = namespace[ | |
730 len(NS_DATA_POLICY_ID_PREFIX) : -len(NS_DATA_POLICY_ID_SUFFIX) | |
731 ] | |
732 try: | |
733 category, type_ = identity_data.split(":", 1) | |
734 except ValueError: | |
735 log.warning( | |
736 "Invalid namespace for identity data policy: " f"{namespace!r}" | |
737 ) | |
738 else: | |
739 id_data_policy = DataPolicy.from_data_form(form) | |
740 if id_data_policy is not None: | |
741 data_policies.services[f"{category}:{type_}"] = id_data_policy | |
742 return data_policies |