diff libervia/backend/plugins/plugin_xep_0447.py @ 4334:111dce64dcb5

plugins XEP-0300, XEP-0446, XEP-0447, XEP0448 and others: Refactoring to use Pydantic: Pydantic models are used more and more in Libervia, for the bridge API, and also to convert `domish.Element` to internal representation. Type hints have also been added in many places. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:12:38 +0100
parents 0d7bb4df2343
children 430d5d99a740
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0447.py	Tue Dec 03 00:11:00 2024 +0100
+++ b/libervia/backend/plugins/plugin_xep_0447.py	Tue Dec 03 00:12:38 2024 +0100
@@ -15,12 +15,28 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from abc import ABC, abstractmethod
 from collections import namedtuple
 from functools import partial
 import mimetypes
 from pathlib import Path
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from typing import (
+    Any,
+    Callable,
+    ClassVar,
+    Dict,
+    Final,
+    List,
+    Literal,
+    NamedTuple,
+    Optional,
+    Self,
+    Tuple,
+    Union,
+    cast,
+)
 
+from pydantic import BaseModel, Field
 import treq
 from twisted.internet import defer
 from twisted.words.xish import domish
@@ -30,6 +46,8 @@
 from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import _
 from libervia.backend.core.log import getLogger
+from libervia.backend.plugins.plugin_xep_0103 import URLData, XEP_0103
+from libervia.backend.plugins.plugin_xep_0446 import FileMetadata, XEP_0446
 from libervia.backend.tools import stream
 from libervia.backend.tools.web import treq_client_no_ssl
 
@@ -42,7 +60,13 @@
     C.PI_TYPE: "XEP",
     C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_PROTOCOLS: ["XEP-0447"],
-    C.PI_DEPENDENCIES: ["XEP-0103", "XEP-0334", "XEP-0446", "ATTACH", "DOWNLOAD"],
+    C.PI_DEPENDENCIES: [
+        "XEP-0103",
+        "XEP-0334",
+        "XEP-0446",
+        "ATTACH",
+        "DOWNLOAD",
+    ],
     C.PI_RECOMMENDATIONS: ["XEP-0363"],
     C.PI_MAIN: "XEP_0447",
     C.PI_HANDLER: "no",
@@ -50,7 +74,116 @@
 }
 
 NS_SFS = "urn:xmpp:sfs:0"
-SourceHandler = namedtuple("SourceHandler", ["callback", "encrypted"])
+
+
+class Source(ABC, BaseModel):
+
+    type: ClassVar[str]
+    encrypted: ClassVar[bool] = False
+
+    def __init_subclass__(cls) -> None:
+        super().__init_subclass__()
+        if not hasattr(cls, "type"):
+            raise TypeError(
+                f'Can\'t instantiate {cls.__name__} without "type" class attribute.'
+            )
+
+    @classmethod
+    @abstractmethod
+    def from_element(cls, element: domish.Element) -> Self:
+        """Parse an element and return corresponding model
+
+        @param element: element to parse
+        @raise exceptions.DataError: the element is invalid
+        """
+
+    @abstractmethod
+    def to_element(self) -> domish.Element:
+        """Convert model to an element
+
+        @return: domish.Element representing the model
+        """
+
+
+class FileSharing(BaseModel):
+    """
+    Model for handling XEP-0447 <file-sharing> element.
+    """
+
+    file: FileMetadata
+    sources: list[Source]
+    disposition: str | None = Field(
+        default=None,
+        description="Disposition of the file, either 'attachment' or 'inline'.",
+    )
+    id: str | None = Field(
+        default=None, description="Unique identifier for the file-sharing element."
+    )
+    _sfs: "XEP_0447 | None" = None
+
+    def to_element(self) -> domish.Element:
+        """Build the <file-sharing> element from this instance's data.
+
+        @return: <file-sharing> element.
+        """
+        file_sharing_elt = domish.Element((NS_SFS, "file-sharing"))
+
+        if self.disposition:
+            file_sharing_elt["disposition"] = self.disposition
+
+        if self.id:
+            file_sharing_elt["id"] = self.id
+
+        file_sharing_elt.addChild(self.file.to_element())
+
+        sources_elt = file_sharing_elt.addElement("sources")
+        for source in self.sources:
+            sources_elt.addChild(source.to_element())
+
+        return file_sharing_elt
+
+    @classmethod
+    def from_element(cls, file_sharing_elt: domish.Element) -> Self:
+        """Create a FileSharing instance from a <file-sharing> element or its parent.
+
+        @param file_sharing_elt: The <file-sharing> element or a parent element.
+        @return: FileSharing instance.
+        @raise exceptions.NotFound: If the <file-sharing> element is not found.
+        """
+        assert cls._sfs is not None
+        if file_sharing_elt.uri != NS_SFS or file_sharing_elt.name != "file-sharing":
+            child_file_sharing_elt = next(
+                file_sharing_elt.elements(NS_SFS, "file-sharing"), None
+            )
+            if child_file_sharing_elt is None:
+                raise exceptions.NotFound("<file-sharing> element not found")
+            else:
+                file_sharing_elt = child_file_sharing_elt
+
+        kwargs = {}
+        disposition = file_sharing_elt.getAttribute("disposition")
+        if disposition:
+            kwargs["disposition"] = disposition
+
+        file_id = file_sharing_elt.getAttribute("id")
+        if file_id:
+            kwargs["id"] = file_id
+
+        kwargs["file"] = FileMetadata.from_element(file_sharing_elt)
+        kwargs["sources"] = cls._sfs.parse_sources_elt(file_sharing_elt)
+
+        return cls(**kwargs)
+
+
+class URLDataSource(URLData, Source):
+    type = "url"
+
+    @classmethod
+    def from_element(cls, element: domish.Element) -> Self:
+        return super().from_element(element)
+
+    def to_element(self) -> domish.Element:
+        return super().to_element()
 
 
 class XEP_0447:
@@ -60,27 +193,34 @@
         self.host = host
         log.info(_("XEP-0447 (Stateless File Sharing) plugin initialization"))
         host.register_namespace("sfs", NS_SFS)
-        self._sources_handlers = {}
-        self._u = host.plugins["XEP-0103"]
+        FileSharing._sfs = self
+        self._sources_handlers: dict[tuple[str, str], type[Source]] = {}
+        self._u = cast(XEP_0103, host.plugins["XEP-0103"])
         self._hints = host.plugins["XEP-0334"]
-        self._m = host.plugins["XEP-0446"]
+        self._m = cast(XEP_0446, host.plugins["XEP-0446"])
         self._http_upload = host.plugins.get("XEP-0363")
         self._attach = host.plugins["ATTACH"]
         self._attach.register(self.can_handle_attachment, self.attach, priority=1000)
-        self.register_source_handler(
-            self._u.namespace, "url-data", self._u.parse_url_data_elt
+        self.register_source(
+            self._u.namespace,
+            "url-data",
+            URLDataSource,
+        )
+        self.register_source(
+            self._jp.namespace,
+            "jinglepub",
+            JinglePubSource,
         )
         host.plugins["DOWNLOAD"].register_download_handler(
             self._u.namespace, self.download
         )
         host.trigger.add("message_received", self._message_received_trigger)
 
-    def register_source_handler(
+    def register_source(
         self,
         namespace: str,
         element_name: str,
-        callback: Callable[[domish.Element], Dict[str, Any]],
-        encrypted: bool = False,
+        source: type[Source],
     ) -> None:
         """Register a handler for file source
 
@@ -97,7 +237,7 @@
                 f"There is already a resource handler for namespace {namespace!r} and "
                 f"name {element_name!r}"
             )
-        self._sources_handlers[key] = SourceHandler(callback, encrypted)
+        self._sources_handlers[key] = source
 
     async def download(
         self,
@@ -193,7 +333,7 @@
         if media_type is None and name:
             media_type = mimetypes.guess_type(name, strict=False)[0]
         file_sharing_elt.addChild(
-            self._m.get_file_metadata_elt(
+            self._m.generate_file_metadata(
                 name=name,
                 media_type=media_type,
                 desc=desc,
@@ -204,19 +344,21 @@
                 height=height,
                 length=length,
                 thumbnail=thumbnail,
-            )
+            ).to_element()
         )
         sources_elt = self.get_sources_elt()
         file_sharing_elt.addChild(sources_elt)
         for source_data in sources:
             if "url" in source_data:
-                sources_elt.addChild(self._u.get_url_data_elt(**source_data))
+                sources_elt.addChild(
+                    self._u.generate_url_data(**source_data).to_element()
+                )
             else:
                 raise NotImplementedError(f"source data not implemented: {source_data}")
 
         return file_sharing_elt
 
-    def parse_sources_elt(self, sources_elt: domish.Element) -> List[Dict[str, Any]]:
+    def parse_sources_elt(self, sources_elt: domish.Element) -> List[Source]:
         """Parse <sources/> element
 
         @param sources_elt: <sources/> element, or a direct parent element
@@ -242,12 +384,8 @@
                 log.warning(f"unmanaged file sharing element: {elt.toXml}")
                 continue
             else:
-                source_data = source_handler.callback(elt)
-                if source_handler.encrypted:
-                    source_data[C.MESS_KEY_ENCRYPTED] = True
-                if "type" not in source_data:
-                    source_data["type"] = elt.uri
-                sources.append(source_data)
+                source = source_handler.from_element(elt)
+                sources.append(source)
         return sources
 
     def parse_file_sharing_elt(self, file_sharing_elt: domish.Element) -> Dict[str, Any]:
@@ -263,7 +401,7 @@
             except StopIteration:
                 raise exceptions.NotFound
         try:
-            data = self._m.parse_file_metadata_elt(file_sharing_elt)
+            data = self._m.parse_file_metadata_elt(file_sharing_elt).model_dump()
         except exceptions.NotFound:
             data = {}
         disposition = file_sharing_elt.getAttribute("disposition")