comparison libervia/backend/plugins/plugin_xep_0447.py @ 4334:111dce64dcb5

plugins XEP-0300, XEP-0446, XEP-0447, XEP0448 and others: Refactoring to use Pydantic: Pydantic models are used more and more in Libervia, for the bridge API, and also to convert `domish.Element` to internal representation. Type hints have also been added in many places. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:12:38 +0100
parents 0d7bb4df2343
children 430d5d99a740
comparison
equal deleted inserted replaced
4333:e94799a0908f 4334:111dce64dcb5
13 # GNU Affero General Public License for more details. 13 # GNU Affero General Public License for more details.
14 14
15 # You should have received a copy of the GNU Affero General Public License 15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>. 16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 from abc import ABC, abstractmethod
18 from collections import namedtuple 19 from collections import namedtuple
19 from functools import partial 20 from functools import partial
20 import mimetypes 21 import mimetypes
21 from pathlib import Path 22 from pathlib import Path
22 from typing import Any, Callable, Dict, List, Optional, Tuple, Union 23 from typing import (
23 24 Any,
25 Callable,
26 ClassVar,
27 Dict,
28 Final,
29 List,
30 Literal,
31 NamedTuple,
32 Optional,
33 Self,
34 Tuple,
35 Union,
36 cast,
37 )
38
39 from pydantic import BaseModel, Field
24 import treq 40 import treq
25 from twisted.internet import defer 41 from twisted.internet import defer
26 from twisted.words.xish import domish 42 from twisted.words.xish import domish
27 43
28 from libervia.backend.core import exceptions 44 from libervia.backend.core import exceptions
29 from libervia.backend.core.constants import Const as C 45 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.core_types import SatXMPPEntity 46 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.core.i18n import _ 47 from libervia.backend.core.i18n import _
32 from libervia.backend.core.log import getLogger 48 from libervia.backend.core.log import getLogger
49 from libervia.backend.plugins.plugin_xep_0103 import URLData, XEP_0103
50 from libervia.backend.plugins.plugin_xep_0446 import FileMetadata, XEP_0446
33 from libervia.backend.tools import stream 51 from libervia.backend.tools import stream
34 from libervia.backend.tools.web import treq_client_no_ssl 52 from libervia.backend.tools.web import treq_client_no_ssl
35 53
36 log = getLogger(__name__) 54 log = getLogger(__name__)
37 55
40 C.PI_NAME: "Stateless File Sharing", 58 C.PI_NAME: "Stateless File Sharing",
41 C.PI_IMPORT_NAME: "XEP-0447", 59 C.PI_IMPORT_NAME: "XEP-0447",
42 C.PI_TYPE: "XEP", 60 C.PI_TYPE: "XEP",
43 C.PI_MODES: C.PLUG_MODE_BOTH, 61 C.PI_MODES: C.PLUG_MODE_BOTH,
44 C.PI_PROTOCOLS: ["XEP-0447"], 62 C.PI_PROTOCOLS: ["XEP-0447"],
45 C.PI_DEPENDENCIES: ["XEP-0103", "XEP-0334", "XEP-0446", "ATTACH", "DOWNLOAD"], 63 C.PI_DEPENDENCIES: [
64 "XEP-0103",
65 "XEP-0334",
66 "XEP-0446",
67 "ATTACH",
68 "DOWNLOAD",
69 ],
46 C.PI_RECOMMENDATIONS: ["XEP-0363"], 70 C.PI_RECOMMENDATIONS: ["XEP-0363"],
47 C.PI_MAIN: "XEP_0447", 71 C.PI_MAIN: "XEP_0447",
48 C.PI_HANDLER: "no", 72 C.PI_HANDLER: "no",
49 C.PI_DESCRIPTION: _("""Implementation of XEP-0447 (Stateless File Sharing)"""), 73 C.PI_DESCRIPTION: _("""Implementation of XEP-0447 (Stateless File Sharing)"""),
50 } 74 }
51 75
52 NS_SFS = "urn:xmpp:sfs:0" 76 NS_SFS = "urn:xmpp:sfs:0"
53 SourceHandler = namedtuple("SourceHandler", ["callback", "encrypted"]) 77
78
79 class Source(ABC, BaseModel):
80
81 type: ClassVar[str]
82 encrypted: ClassVar[bool] = False
83
84 def __init_subclass__(cls) -> None:
85 super().__init_subclass__()
86 if not hasattr(cls, "type"):
87 raise TypeError(
88 f'Can\'t instantiate {cls.__name__} without "type" class attribute.'
89 )
90
91 @classmethod
92 @abstractmethod
93 def from_element(cls, element: domish.Element) -> Self:
94 """Parse an element and return corresponding model
95
96 @param element: element to parse
97 @raise exceptions.DataError: the element is invalid
98 """
99
100 @abstractmethod
101 def to_element(self) -> domish.Element:
102 """Convert model to an element
103
104 @return: domish.Element representing the model
105 """
106
107
108 class FileSharing(BaseModel):
109 """
110 Model for handling XEP-0447 <file-sharing> element.
111 """
112
113 file: FileMetadata
114 sources: list[Source]
115 disposition: str | None = Field(
116 default=None,
117 description="Disposition of the file, either 'attachment' or 'inline'.",
118 )
119 id: str | None = Field(
120 default=None, description="Unique identifier for the file-sharing element."
121 )
122 _sfs: "XEP_0447 | None" = None
123
124 def to_element(self) -> domish.Element:
125 """Build the <file-sharing> element from this instance's data.
126
127 @return: <file-sharing> element.
128 """
129 file_sharing_elt = domish.Element((NS_SFS, "file-sharing"))
130
131 if self.disposition:
132 file_sharing_elt["disposition"] = self.disposition
133
134 if self.id:
135 file_sharing_elt["id"] = self.id
136
137 file_sharing_elt.addChild(self.file.to_element())
138
139 sources_elt = file_sharing_elt.addElement("sources")
140 for source in self.sources:
141 sources_elt.addChild(source.to_element())
142
143 return file_sharing_elt
144
145 @classmethod
146 def from_element(cls, file_sharing_elt: domish.Element) -> Self:
147 """Create a FileSharing instance from a <file-sharing> element or its parent.
148
149 @param file_sharing_elt: The <file-sharing> element or a parent element.
150 @return: FileSharing instance.
151 @raise exceptions.NotFound: If the <file-sharing> element is not found.
152 """
153 assert cls._sfs is not None
154 if file_sharing_elt.uri != NS_SFS or file_sharing_elt.name != "file-sharing":
155 child_file_sharing_elt = next(
156 file_sharing_elt.elements(NS_SFS, "file-sharing"), None
157 )
158 if child_file_sharing_elt is None:
159 raise exceptions.NotFound("<file-sharing> element not found")
160 else:
161 file_sharing_elt = child_file_sharing_elt
162
163 kwargs = {}
164 disposition = file_sharing_elt.getAttribute("disposition")
165 if disposition:
166 kwargs["disposition"] = disposition
167
168 file_id = file_sharing_elt.getAttribute("id")
169 if file_id:
170 kwargs["id"] = file_id
171
172 kwargs["file"] = FileMetadata.from_element(file_sharing_elt)
173 kwargs["sources"] = cls._sfs.parse_sources_elt(file_sharing_elt)
174
175 return cls(**kwargs)
176
177
178 class URLDataSource(URLData, Source):
179 type = "url"
180
181 @classmethod
182 def from_element(cls, element: domish.Element) -> Self:
183 return super().from_element(element)
184
185 def to_element(self) -> domish.Element:
186 return super().to_element()
54 187
55 188
56 class XEP_0447: 189 class XEP_0447:
57 namespace = NS_SFS 190 namespace = NS_SFS
58 191
59 def __init__(self, host): 192 def __init__(self, host):
60 self.host = host 193 self.host = host
61 log.info(_("XEP-0447 (Stateless File Sharing) plugin initialization")) 194 log.info(_("XEP-0447 (Stateless File Sharing) plugin initialization"))
62 host.register_namespace("sfs", NS_SFS) 195 host.register_namespace("sfs", NS_SFS)
63 self._sources_handlers = {} 196 FileSharing._sfs = self
64 self._u = host.plugins["XEP-0103"] 197 self._sources_handlers: dict[tuple[str, str], type[Source]] = {}
198 self._u = cast(XEP_0103, host.plugins["XEP-0103"])
65 self._hints = host.plugins["XEP-0334"] 199 self._hints = host.plugins["XEP-0334"]
66 self._m = host.plugins["XEP-0446"] 200 self._m = cast(XEP_0446, host.plugins["XEP-0446"])
67 self._http_upload = host.plugins.get("XEP-0363") 201 self._http_upload = host.plugins.get("XEP-0363")
68 self._attach = host.plugins["ATTACH"] 202 self._attach = host.plugins["ATTACH"]
69 self._attach.register(self.can_handle_attachment, self.attach, priority=1000) 203 self._attach.register(self.can_handle_attachment, self.attach, priority=1000)
70 self.register_source_handler( 204 self.register_source(
71 self._u.namespace, "url-data", self._u.parse_url_data_elt 205 self._u.namespace,
206 "url-data",
207 URLDataSource,
208 )
209 self.register_source(
210 self._jp.namespace,
211 "jinglepub",
212 JinglePubSource,
72 ) 213 )
73 host.plugins["DOWNLOAD"].register_download_handler( 214 host.plugins["DOWNLOAD"].register_download_handler(
74 self._u.namespace, self.download 215 self._u.namespace, self.download
75 ) 216 )
76 host.trigger.add("message_received", self._message_received_trigger) 217 host.trigger.add("message_received", self._message_received_trigger)
77 218
78 def register_source_handler( 219 def register_source(
79 self, 220 self,
80 namespace: str, 221 namespace: str,
81 element_name: str, 222 element_name: str,
82 callback: Callable[[domish.Element], Dict[str, Any]], 223 source: type[Source],
83 encrypted: bool = False,
84 ) -> None: 224 ) -> None:
85 """Register a handler for file source 225 """Register a handler for file source
86 226
87 @param namespace: namespace of the element supported 227 @param namespace: namespace of the element supported
88 @param element_name: name of the element supported 228 @param element_name: name of the element supported
95 if key in self._sources_handlers: 235 if key in self._sources_handlers:
96 raise exceptions.ConflictError( 236 raise exceptions.ConflictError(
97 f"There is already a resource handler for namespace {namespace!r} and " 237 f"There is already a resource handler for namespace {namespace!r} and "
98 f"name {element_name!r}" 238 f"name {element_name!r}"
99 ) 239 )
100 self._sources_handlers[key] = SourceHandler(callback, encrypted) 240 self._sources_handlers[key] = source
101 241
102 async def download( 242 async def download(
103 self, 243 self,
104 client: SatXMPPEntity, 244 client: SatXMPPEntity,
105 attachment: Dict[str, Any], 245 attachment: Dict[str, Any],
191 if disposition is not None: 331 if disposition is not None:
192 file_sharing_elt["disposition"] = disposition 332 file_sharing_elt["disposition"] = disposition
193 if media_type is None and name: 333 if media_type is None and name:
194 media_type = mimetypes.guess_type(name, strict=False)[0] 334 media_type = mimetypes.guess_type(name, strict=False)[0]
195 file_sharing_elt.addChild( 335 file_sharing_elt.addChild(
196 self._m.get_file_metadata_elt( 336 self._m.generate_file_metadata(
197 name=name, 337 name=name,
198 media_type=media_type, 338 media_type=media_type,
199 desc=desc, 339 desc=desc,
200 size=size, 340 size=size,
201 file_hash=file_hash, 341 file_hash=file_hash,
202 date=date, 342 date=date,
203 width=width, 343 width=width,
204 height=height, 344 height=height,
205 length=length, 345 length=length,
206 thumbnail=thumbnail, 346 thumbnail=thumbnail,
207 ) 347 ).to_element()
208 ) 348 )
209 sources_elt = self.get_sources_elt() 349 sources_elt = self.get_sources_elt()
210 file_sharing_elt.addChild(sources_elt) 350 file_sharing_elt.addChild(sources_elt)
211 for source_data in sources: 351 for source_data in sources:
212 if "url" in source_data: 352 if "url" in source_data:
213 sources_elt.addChild(self._u.get_url_data_elt(**source_data)) 353 sources_elt.addChild(
354 self._u.generate_url_data(**source_data).to_element()
355 )
214 else: 356 else:
215 raise NotImplementedError(f"source data not implemented: {source_data}") 357 raise NotImplementedError(f"source data not implemented: {source_data}")
216 358
217 return file_sharing_elt 359 return file_sharing_elt
218 360
219 def parse_sources_elt(self, sources_elt: domish.Element) -> List[Dict[str, Any]]: 361 def parse_sources_elt(self, sources_elt: domish.Element) -> List[Source]:
220 """Parse <sources/> element 362 """Parse <sources/> element
221 363
222 @param sources_elt: <sources/> element, or a direct parent element 364 @param sources_elt: <sources/> element, or a direct parent element
223 @return: list of found sources data 365 @return: list of found sources data
224 @raise: exceptions.NotFound: Can't find <sources/> element 366 @raise: exceptions.NotFound: Can't find <sources/> element
240 source_handler = self._sources_handlers[key] 382 source_handler = self._sources_handlers[key]
241 except KeyError: 383 except KeyError:
242 log.warning(f"unmanaged file sharing element: {elt.toXml}") 384 log.warning(f"unmanaged file sharing element: {elt.toXml}")
243 continue 385 continue
244 else: 386 else:
245 source_data = source_handler.callback(elt) 387 source = source_handler.from_element(elt)
246 if source_handler.encrypted: 388 sources.append(source)
247 source_data[C.MESS_KEY_ENCRYPTED] = True
248 if "type" not in source_data:
249 source_data["type"] = elt.uri
250 sources.append(source_data)
251 return sources 389 return sources
252 390
253 def parse_file_sharing_elt(self, file_sharing_elt: domish.Element) -> Dict[str, Any]: 391 def parse_file_sharing_elt(self, file_sharing_elt: domish.Element) -> Dict[str, Any]:
254 """Parse <file-sharing/> element and return file-sharing data 392 """Parse <file-sharing/> element and return file-sharing data
255 393
261 try: 399 try:
262 file_sharing_elt = next(file_sharing_elt.elements(NS_SFS, "file-sharing")) 400 file_sharing_elt = next(file_sharing_elt.elements(NS_SFS, "file-sharing"))
263 except StopIteration: 401 except StopIteration:
264 raise exceptions.NotFound 402 raise exceptions.NotFound
265 try: 403 try:
266 data = self._m.parse_file_metadata_elt(file_sharing_elt) 404 data = self._m.parse_file_metadata_elt(file_sharing_elt).model_dump()
267 except exceptions.NotFound: 405 except exceptions.NotFound:
268 data = {} 406 data = {}
269 disposition = file_sharing_elt.getAttribute("disposition") 407 disposition = file_sharing_elt.getAttribute("disposition")
270 if disposition is not None: 408 if disposition is not None:
271 data["disposition"] = disposition 409 data["disposition"] = disposition