view libervia/backend/plugins/plugin_xep_0402.py @ 4337:95792a1f26c7

component email gateway: attachments handling: attachments are now stored, and metadata are created in database. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 554a87ae17a6
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin to handle chat room bookmarks via PEP
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Iterator, Self, cast

from pydantic import BaseModel, Field, RootModel
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish
from wokkel import pubsub

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.models.types import DomishElementType, JIDType
from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
from libervia.backend.plugins.plugin_xep_0048 import XEP_0048
from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "PEP Native Bookmarks",
    C.PI_IMPORT_NAME: "XEP-0402",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0048", "XEP-0060", "XEP-0163", "XEP-0045"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0402",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Bookmark chat rooms, and handle their joined state."""),
}

NS_BOOKMARKS2 = "urn:xmpp:bookmarks:1"
NS_BOOKMARKS2_COMPAT = f"{NS_BOOKMARKS2}#compat"


class Conference(BaseModel):
    """
    Model for conference data.
    """

    autojoin: bool = Field(
        False,
        description="Whether the client should automatically join the conference room on "
        "login.",
    )
    name: str | None = Field(
        None, description="A friendly name for the bookmark, specified by the user."
    )
    nick: str | None = Field(
        None, description="The user's preferred roomnick for the chatroom."
    )
    password: str | None = Field(
        None, description="A password used to access the chatroom."
    )
    extensions: list[DomishElementType] = Field(
        default_factory=list,
        description="A set of child elements (of potentially any namespace).",
    )

    def set_attributes(self, conference_elt: domish.Element) -> None:
        """Set <conference> element attributes from this instance's data."""
        if self.autojoin:
            conference_elt["autojoin"] = "true" if self.autojoin else "false"
        if self.name:
            conference_elt["name"] = self.name

    def set_children(self, conference_elt: domish.Element) -> None:
        """Set <conference> element children from this instance's data."""
        if self.nick:
            nick_elt = conference_elt.addElement((NS_BOOKMARKS2, "nick"))
            nick_elt.addContent(self.nick)
        if self.password:
            password_elt = conference_elt.addElement((NS_BOOKMARKS2, "password"))
            password_elt.addContent(self.password)
        for extension in self.extensions:
            conference_elt.addChild(extension)

    @classmethod
    def from_element(cls, conference_elt: domish.Element) -> Self:
        """
        Create a Conference instance from a <conference> element or its parent.

        @param conference_elt: The <conference> element or a parent element.
        @return: Conference instance.
        @raise exceptions.NotFound: If the <conference> element is not found.
        """
        if conference_elt.uri != NS_BOOKMARKS2 or conference_elt.name != "conference":
            child_conference_elt = next(
                conference_elt.elements(NS_BOOKMARKS2, "conference"), None
            )
            if child_conference_elt is None:
                raise exceptions.NotFound("<conference> element not found")
            else:
                conference_elt = child_conference_elt
        kwargs = {}
        if conference_elt.hasAttribute("autojoin"):
            kwargs["autojoin"] = conference_elt["autojoin"] == "true"
        if conference_elt.hasAttribute("name"):
            kwargs["name"] = conference_elt["name"]
        nick_elt = next(conference_elt.elements(NS_BOOKMARKS2, "nick"), None)
        if nick_elt:
            kwargs["nick"] = str(nick_elt)
        password_elt = next(conference_elt.elements(NS_BOOKMARKS2, "password"), None)
        if password_elt:
            kwargs["password"] = str(password_elt)
        kwargs["extensions"] = [
            child for child in conference_elt.elements() if child.uri != NS_BOOKMARKS2
        ]
        return cls(**kwargs)

    def to_element(self) -> domish.Element:
        """Build the <conference> element from this instance's data.

        @return: <conference> element.
        """
        conference_elt = domish.Element((NS_BOOKMARKS2, "conference"))
        self.set_attributes(conference_elt)
        self.set_children(conference_elt)
        return conference_elt


class Bookmarks(RootModel):
    root: dict[JIDType, Conference]

    def items(self):
        return self.root.items()

    def __dict__(self) -> dict[JIDType, Conference]:  # type: ignore
        return self.root

    def __iter__(self) -> Iterator[JIDType]:  # type: ignore
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]

    @classmethod
    def from_elements(cls, items_elt: list[domish.Element]) -> Self:
        """Convert list of items to instance of Bookmarks.

        @param items_elt: list of <item> elements from boorkmarks node.
        """
        bookmarks = {}

        for item_elt in items_elt:
            try:
                bookmark_jid = jid.JID(item_elt["id"])
            except RuntimeError as e:
                log.warning(f"Can't parse bookmark jid {e}: {item_elt.toXml()}")
                continue
            try:
                conference = Conference.from_element(item_elt)
            except exceptions.NotFound:
                log.warning(f"Can't find conference data in bookmark: {item_elt}")
            else:
                bookmarks[bookmark_jid] = conference

        return cls(bookmarks)


class XEP_0402:
    namespace = NS_BOOKMARKS2

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._legacy = cast(XEP_0048, host.plugins["XEP-0048"])
        self._p = cast(XEP_0060, host.plugins["XEP-0060"])
        self._muc = cast(XEP_0045, host.plugins["XEP-0045"])
        host.bridge.add_method(
            "bookmark_get",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._bookmark_get,
            async_=True,
        )
        host.bridge.add_method(
            "bookmarks_list",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._bookmarks_list,
            async_=True,
        )
        host.bridge.add_method(
            "bookmark_remove",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._bookmark_remove,
            async_=True,
        )
        host.bridge.add_method(
            "bookmarks_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._bookmarks_set,
            async_=True,
        )
        host.plugins["XEP-0163"].add_pep_event(
            None, NS_BOOKMARKS2, self._on_bookmark_event
        )
        host.register_namespace("bookmarks2", NS_BOOKMARKS2)

    async def _on_bookmark_event(
        self, items_events: pubsub.ItemsEvent, profile: str
    ) -> None:
        client = self.host.get_client(profile)
        if items_events.sender != client.jid.userhostJID():
            log.warning(
                "Bookmark event Unexpectedly send for another account "
                f"({items_events.sender})."
            )
        else:
            for item_elt in items_events.items:
                try:
                    room_jid = jid.JID(item_elt["id"])
                except Exception as e:
                    log.warning(
                        f'Ignoring bookmark due to invalid JID in "id" ({e}): '
                        f"{item_elt.toXml()}"
                    )
                    continue
                try:
                    conference = Conference.from_element(item_elt)
                except exceptions.NotFound:
                    log.warning("Ignoring invalid bookmark element: {item_elt.toXml()}")
                except Exception:
                    log.exception("Can't parse bookmark item: {item_elt.toXml()}")
                else:
                    if conference.autojoin:
                        await self._muc.join(
                            client,
                            room_jid,
                            conference.nick,
                            {"password": conference.password},
                        )
                    else:
                        await self._muc.leave(client, room_jid)

    @utils.ensure_deferred
    async def _bookmark_get(self, bookmark_jid: str, profile: str) -> str:
        """List current boorkmarks.

        @param extra_s: Serialised extra.
            Reserved for future use.
        @param profile: Profile to use.
        """
        client = self.host.get_client(profile)
        conference = await self.get(client, jid.JID(bookmark_jid))
        return conference.model_dump_json(exclude_none=True)

    async def get(self, client: SatXMPPEntity, bookmark_jid: jid.JID) -> Conference:
        """Helper method to get a single bookmark.

        @param client: profile session.
        @bookmark_jid: JID of the boorkmark to get.
        @return: Conference instance.
        """
        pep_jid = client.jid.userhostJID()
        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            items, __ = await self._p.get_items(
                client,
                client.jid.userhostJID(),
                NS_BOOKMARKS2,
                item_ids=[bookmark_jid.full()],
            )
            return Conference.from_element(items[0])
        else:
            # No compatibility layer, we use legacy bookmarks.
            bookmarks = await self.list(client)
            return bookmarks[bookmark_jid]

    def _bookmark_remove(self, bookmark_jid: str, profile: str) -> defer.Deferred[None]:
        d = defer.ensureDeferred(
            self.remove(self.host.get_client(profile), jid.JID(bookmark_jid))
        )
        return d

    async def remove(self, client: SatXMPPEntity, bookmark_jid: jid.JID) -> None:
        """Helper method to delete an existing bookmark.

        @param client: Profile session.
        @param bookmark_jid: Bookmark to delete.
        """
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            await self._p.retract_items(
                client, client.jid.userhostJID(), NS_BOOKMARKS2, [bookmark_jid.full()]
            )
        else:
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            await self._legacy.remove_bookmark(
                self._legacy.MUC_TYPE, bookmark_jid, "private", client.profile
            )

    @utils.ensure_deferred
    async def _bookmarks_list(self, extra_s: str, profile: str) -> str:
        """List current boorkmarks.

        @param extra_s: Serialised extra.
            Reserved for future use.
        @param profile: Profile to use.
        """
        client = self.host.get_client(profile)
        extra = data_format.deserialise(extra_s)
        bookmarks = await self.list(client, extra)
        return bookmarks.model_dump_json(exclude_none=True)

    async def list(self, client: SatXMPPEntity, extra: dict | None = None) -> Bookmarks:
        """List bookmarks.

        If there is a compatibility layer announced, Bookmarks2 will be used, otherwise
        legacy bookmarks will be used.
        @param client: Client session.
        @param extra: extra dat)
        @return: bookmarks.
        """
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            items, __ = await self._p.get_items(
                client, client.jid.userhostJID(), NS_BOOKMARKS2
            )
            return Bookmarks.from_elements(items)
        else:
            # There is no compatibility layer on the PEP server, so we use legacy
            # bookmarks as recommended at
            # https://docs.modernxmpp.org/client/groupchat/#bookmarks
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            legacy_data = await self._legacy.bookmarks_list(
                self._legacy.MUC_TYPE, "private", client.profile
            )
            private_bookmarks = legacy_data["private"]
            bookmarks_dict = {}
            for jid, bookmark_data in private_bookmarks.items():
                autojoin = C.bool(bookmark_data.get("autojoin", C.BOOL_FALSE))
                name = bookmark_data.get("name")
                nick = bookmark_data.get("nick")
                password = bookmark_data.get("password")
                conference = Conference(
                    autojoin=autojoin, name=name, nick=nick, password=password
                )
                bookmarks_dict[jid] = conference
            return Bookmarks(bookmarks_dict)

    @utils.ensure_deferred
    async def _bookmarks_set(self, bookmarks_raw: str, profile: str) -> None:
        """Add or update one or more bookmarks.

        @param bookmarks_raw: serialised bookmark.
            It must deserialise to a dict mapping from bookmark JID to Conference data.
        @param profile: Profile to use.
        """
        client = self.host.get_client(profile)
        bookmarks = Bookmarks.model_validate_json(bookmarks_raw)
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            bookmark_items = []
            for bookmark_jid, conference in bookmarks.items():
                item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
                item_elt["id"] = bookmark_jid.full()
                item_elt.addChild(conference.to_element())
                bookmark_items.append(item_elt)

            await self._p.send_items(
                client,
                None,
                NS_BOOKMARKS2,
                bookmark_items,
                extra={
                    self._p.EXTRA_PUBLISH_OPTIONS: {
                        "pubsub#access_model": self._p.ACCESS_WHITELIST
                    },
                    self._p.EXTRA_AUTOCREATE: True,
                },
            )
        else:
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            # XXX: We add every bookmark one by one, which is inefficient. The legacy
            # plugin likely implemented this way because end-users typically add bookmarks
            # individually. Nowadays, very few servers, if any, still implement XEP-0048
            # without the XEP-0402 compatibility layer, so it's not worth spending time to
            # improve the legacy XEP-0048 plugin.
            for bookmark_jid, conference in bookmarks.items():
                bookmark_data = {}
                if conference.autojoin:
                    bookmark_data["autojoin"] = C.BOOL_TRUE
                for attribute in ("name", "nick", "password"):
                    value = getattr(conference, attribute)
                    if value:
                        bookmark_data[attribute] = value
                await self._legacy.add_bookmark(
                    self._legacy.MUC_TYPE,
                    bookmark_jid,
                    bookmark_data,
                    storage_type="private",
                    profile_key=client.profile,
                )