view libervia/backend/plugins/plugin_xep_0358.py @ 4336:6e0918e638ee

plugin XEP-0498: "Pubsub File Sharing" implementation: Partial implementation of XEP-0498, necessary to implement the service part in email gateway. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 430d5d99a740
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin to jingle session publishing.
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import TYPE_CHECKING, Self

from pydantic import BaseModel, Field
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.models.types import DomishElementType, JIDType

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Publishing Available Jingle Sessions",
    C.PI_IMPORT_NAME: "XEP-0358",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "XEP-0234",
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0358",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Indicate available Jingle sessions."""),
}

NS_JINGLEPUB = "urn:xmpp:jinglepub:1"


class Meta(BaseModel):
    """Model for meta element in JinglePub element."""

    lang: str | None = Field(None, description="The language of the meta data.")
    title: str = Field(description="The title of the meta data.")
    summary: str | None = Field(None, description="A summary of the meta data.")

    def to_element(self) -> domish.Element:
        """Build the <meta> element from this instance's data.

        @return: <meta> element.
        """
        meta_elt = domish.Element((NS_JINGLEPUB, "meta"))
        if self.lang:
            meta_elt["xml:lang"] = self.lang
        meta_elt["title"] = self.title
        if self.summary:
            meta_elt["summary"] = self.summary
        return meta_elt

    @classmethod
    def from_element(cls, meta_elt: domish.Element) -> Self:
        """Create a Meta instance from a <meta> element.

        @param meta_elt: The <meta> element.
        @return: Meta instance.
        """
        assert meta_elt.name == "meta" and meta_elt.uri == NS_JINGLEPUB
        kwargs = {}
        if meta_elt.hasAttribute("xml:lang"):
            kwargs["lang"] = meta_elt["xml:lang"]
        if meta_elt.hasAttribute("title"):
            kwargs["title"] = meta_elt["title"]
        if meta_elt.hasAttribute("summary"):
            kwargs["summary"] = meta_elt["summary"]
        return cls(**kwargs)


class JinglePub(BaseModel):
    """Model for JinglePub element."""

    from_jid: JIDType = Field(description="The JID of the session owner.")
    id: str = Field(description="The jinglepub identifier.")
    uri: str | None = Field(
        default=None, description="An alternate or informational URI of the content."
    )
    meta: list[Meta] = Field(
        default_factory=list, description="Human-friendly information about the session."
    )
    descriptions: list[DomishElementType] = Field(
        default_factory=list, description="Application format(s) for the Jingle session."
    )

    def set_attributes(self, jinglepub_elt: domish.Element) -> None:
        """Set <jinglepub> element attributes from this instance's data."""
        jinglepub_elt["from"] = str(self.from_jid)
        jinglepub_elt["id"] = self.id

    def set_children(self, jinglepub_elt: domish.Element) -> None:
        """Set <jinglepub> element children from this instance's data."""
        if self.uri:
            uri_elt = jinglepub_elt.addElement((NS_JINGLEPUB, "uri"))
            uri_elt.addContent(self.uri)
        for meta in self.meta:
            jinglepub_elt.addChild(meta.to_element())
        for description in self.descriptions:
            jinglepub_elt.addChild(description)

    def to_element(self) -> domish.Element:
        """Build the <jinglepub> element from this instance's data.

        @return: <jinglepub> element.
        """
        jinglepub_elt = domish.Element((NS_JINGLEPUB, "jinglepub"))
        self.set_attributes(jinglepub_elt)
        self.set_children(jinglepub_elt)
        return jinglepub_elt

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Create a JinglePub instance from a <jinglepub> element.

        @param jinglepub_elt: The <jinglepub> element.
        @return: JinglePub instance.
        @raise exceptions.NotFound: If the <jinglepub> element is not found.
        """
        if element.uri != NS_JINGLEPUB or element.name != "jinglepub":
            raise exceptions.NotFound("<jinglepub> element not found")

        kwargs = {}
        try:
            kwargs["from_jid"] = jid.JID(element["from"])
        except RuntimeError as e:
            log.warning(f"Can't parse from_jid {e}: {element.toXml()}")
            raise exceptions.NotFound("<jinglepub> element has invalid 'from' attribute")

        if element.hasAttribute("id"):
            kwargs["id"] = element["id"]

        uri_elt = next(element.elements(NS_JINGLEPUB, "uri"), None)
        if uri_elt:
            kwargs["uri"] = str(uri_elt)

        kwargs["meta"] = [
            Meta.from_element(meta_elt)
            for meta_elt in element.elements(NS_JINGLEPUB, "meta")
        ]

        kwargs["descriptions"] = [
            child
            for child in element.elements()
            if child.uri != NS_JINGLEPUB and child.name == "description"
        ]

        return cls(**kwargs)


class XEP_0358:
    namespace = NS_JINGLEPUB

    def __init__(self, host: "LiberviaBackend") -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("jinglepub", NS_JINGLEPUB)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        return JinglePubHandler(self)


@implementer(iwokkel.IDisco)
class JinglePubHandler(XMPPHandler):

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        return [
            disco.DiscoFeature(NS_JINGLEPUB),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItems]:
        return []