Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0402.py @ 4341:e9971a4b0627
remove uses of twisted.internet.defer.returnValue
This function has been deprecated in Twisted 24.7.0.
author | Povilas Kanapickas <povilas@radix.lt> |
---|---|
date | Wed, 11 Dec 2024 01:17:09 +0200 |
parents | 554a87ae17a6 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to handle chat room bookmarks via PEP
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Iterator, Self, cast

from pydantic import BaseModel, Field, RootModel
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish
from wokkel import pubsub

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.models.types import DomishElementType, JIDType
from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
from libervia.backend.plugins.plugin_xep_0048 import XEP_0048
from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "PEP Native Bookmarks",
    C.PI_IMPORT_NAME: "XEP-0402",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0048", "XEP-0060", "XEP-0163", "XEP-0045"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0402",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Bookmark chat rooms, and handle their
joined state."""),
}

NS_BOOKMARKS2 = "urn:xmpp:bookmarks:1"
NS_BOOKMARKS2_COMPAT = f"{NS_BOOKMARKS2}#compat"


class Conference(BaseModel):
    """
    Model for conference data.

    It mirrors the content of a ``<conference>`` element in the
    ``urn:xmpp:bookmarks:1`` namespace and converts to and from it.
    """

    autojoin: bool = Field(
        False,
        description="Whether the client should automatically join the conference room on "
        "login.",
    )
    name: str | None = Field(
        None, description="A friendly name for the bookmark, specified by the user."
    )
    nick: str | None = Field(
        None, description="The user's preferred roomnick for the chatroom."
    )
    password: str | None = Field(
        None, description="A password used to access the chatroom."
    )
    extensions: list[DomishElementType] = Field(
        default_factory=list,
        description="A set of child elements (of potentially any namespace).",
    )

    def set_attributes(self, conference_elt: domish.Element) -> None:
        """Set <conference> element attributes from this instance's data.

        Attributes are only written when they hold a truthy value: a false
        ``autojoin`` and an empty or missing ``name`` are left out.

        @param conference_elt: The <conference> element to update in place.
        """
        if self.autojoin:
            conference_elt["autojoin"] = "true"
        if self.name:
            conference_elt["name"] = self.name

    def set_children(self, conference_elt: domish.Element) -> None:
        """Set <conference> element children from this instance's data.

        ``<nick>`` and ``<password>`` children are only added when the
        corresponding field is truthy; every extension is appended as is.

        @param conference_elt: The <conference> element to update in place.
        """
        if self.nick:
            nick_elt = conference_elt.addElement((NS_BOOKMARKS2, "nick"))
            nick_elt.addContent(self.nick)
        if self.password:
            password_elt = conference_elt.addElement((NS_BOOKMARKS2, "password"))
            password_elt.addContent(self.password)
        for extension in self.extensions:
            conference_elt.addChild(extension)

    @classmethod
    def from_element(cls, conference_elt: domish.Element) -> Self:
        """
        Create a Conference instance from a <conference> element or its parent.

        @param conference_elt: The <conference> element or a parent element.
        @return: Conference instance.
        @raise exceptions.NotFound: If the <conference> element is not found.
        """
        if conference_elt.uri != NS_BOOKMARKS2 or conference_elt.name != "conference":
            # We have been given a parent (e.g. a pubsub <item>): look for the first
            # <conference> child.
            child_conference_elt = next(
                conference_elt.elements(NS_BOOKMARKS2, "conference"), None
            )
            if child_conference_elt is None:
                raise exceptions.NotFound("<conference> element not found")
            conference_elt = child_conference_elt

        kwargs = {}
        if conference_elt.hasAttribute("autojoin"):
            kwargs["autojoin"] = conference_elt["autojoin"] == "true"
        if conference_elt.hasAttribute("name"):
            kwargs["name"] = conference_elt["name"]

        # "is not None" makes the test against the ``next`` default explicit.
        nick_elt = next(conference_elt.elements(NS_BOOKMARKS2, "nick"), None)
        if nick_elt is not None:
            kwargs["nick"] = str(nick_elt)
        password_elt = next(conference_elt.elements(NS_BOOKMARKS2, "password"), None)
        if password_elt is not None:
            kwargs["password"] = str(password_elt)

        # Every child element outside of the bookmarks namespace is kept as an
        # extension.
        kwargs["extensions"] = [
            child for child in conference_elt.elements() if child.uri != NS_BOOKMARKS2
        ]
        return cls(**kwargs)

    def to_element(self) -> domish.Element:
        """Build the <conference> element from this instance's data.

        @return: <conference> element.
        """
        conference_elt = domish.Element((NS_BOOKMARKS2, "conference"))
        self.set_attributes(conference_elt)
        self.set_children(conference_elt)
        return conference_elt


class Bookmarks(RootModel):
    """Mapping from bookmark (room) JID to its Conference data."""

    root: dict[JIDType, Conference]

    def items(self):
        """Return the (JID, Conference) pairs of the underlying mapping."""
        return self.root.items()

    def __dict__(self) -> dict[JIDType, Conference]:  # type: ignore
        return self.root

    def __iter__(self) -> Iterator[JIDType]:  # type: ignore
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]

    @classmethod
    def from_elements(cls, items_elt: list[domish.Element]) -> Self:
        """Convert list of items to instance of Bookmarks.

        Items whose "id" is missing or is not a valid JID, and items without
        <conference> data, are logged and skipped.

        @param items_elt: list of <item> elements from bookmarks node.
        @return: Bookmarks instance built from the valid items.
        """
        bookmarks = {}
        for item_elt in items_elt:
            try:
                bookmark_jid = jid.JID(item_elt["id"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as e:
                # KeyError: no "id" attribute; RuntimeError: empty "id";
                # InvalidFormat: malformed JID. One bad item must not prevent the
                # other bookmarks from being listed.
                log.warning(f"Can't parse bookmark jid {e}: {item_elt.toXml()}")
                continue
            try:
                conference = Conference.from_element(item_elt)
            except exceptions.NotFound:
                log.warning(
                    f"Can't find conference data in bookmark: {item_elt.toXml()}"
                )
            else:
                bookmarks[bookmark_jid] = conference

        return cls(bookmarks)


class XEP_0402:
    """Plugin implementation of PEP Native Bookmarks.

    The PEP node is used when the account's PEP service announces the
    ``urn:xmpp:bookmarks:1#compat`` feature; otherwise the legacy XEP-0048
    plugin is used with "private" storage.
    """

    namespace = NS_BOOKMARKS2

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._legacy = cast(XEP_0048, host.plugins["XEP-0048"])
        self._p = cast(XEP_0060, host.plugins["XEP-0060"])
        self._muc = cast(XEP_0045, host.plugins["XEP-0045"])
        host.bridge.add_method(
            "bookmark_get",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._bookmark_get,
            async_=True,
        )
        host.bridge.add_method(
            "bookmarks_list",
            ".plugin",
            in_sign="ss",
            out_sign="s",
            method=self._bookmarks_list,
            async_=True,
        )
        host.bridge.add_method(
            "bookmark_remove",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._bookmark_remove,
            async_=True,
        )
        host.bridge.add_method(
            "bookmarks_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._bookmarks_set,
            async_=True,
        )
        host.plugins["XEP-0163"].add_pep_event(
            None, NS_BOOKMARKS2, self._on_bookmark_event
        )
        host.register_namespace("bookmarks2", NS_BOOKMARKS2)

    async def _on_bookmark_event(
        self, items_events: pubsub.ItemsEvent, profile: str
    ) -> None:
        """Join or leave rooms according to received bookmark items.

        Events which are not sent by the profile's own bare JID are ignored.
        For each valid item, the room is joined if ``autojoin`` is set, and
        left otherwise.

        @param items_events: Pubsub event holding the bookmark items.
        @param profile: Profile receiving the event.
        """
        client = self.host.get_client(profile)
        if items_events.sender != client.jid.userhostJID():
            log.warning(
                "Bookmark event Unexpectedly send for another account "
                f"({items_events.sender})."
            )
            return

        for item_elt in items_events.items:
            try:
                room_jid = jid.JID(item_elt["id"])
            except Exception as e:
                log.warning(
                    f'Ignoring bookmark due to invalid JID in "id" ({e}): '
                    f"{item_elt.toXml()}"
                )
                continue
            try:
                conference = Conference.from_element(item_elt)
            except exceptions.NotFound:
                log.warning(f"Ignoring invalid bookmark element: {item_elt.toXml()}")
            except Exception:
                log.exception(f"Can't parse bookmark item: {item_elt.toXml()}")
            else:
                if conference.autojoin:
                    await self._muc.join(
                        client,
                        room_jid,
                        conference.nick,
                        {"password": conference.password},
                    )
                else:
                    await self._muc.leave(client, room_jid)

    @utils.ensure_deferred
    async def _bookmark_get(self, bookmark_jid: str, profile: str) -> str:
        """Get a single bookmark.

        @param bookmark_jid: JID of the bookmark to get.
        @param profile: Profile to use.
        @return: Conference data serialised to JSON, without unset fields.
        """
        client = self.host.get_client(profile)
        conference = await self.get(client, jid.JID(bookmark_jid))
        return conference.model_dump_json(exclude_none=True)

    async def get(self, client: SatXMPPEntity, bookmark_jid: jid.JID) -> Conference:
        """Helper method to get a single bookmark.

        @param client: profile session.
        @param bookmark_jid: JID of the bookmark to get.
        @return: Conference instance.
        @raise IndexError: the PEP request returned an empty list of items.
        @raise KeyError: the bookmark is not in the legacy bookmarks.
        @raise exceptions.NotFound: the retrieved item has no <conference> element.
        """
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            items, __ = await self._p.get_items(
                client,
                pep_jid,
                NS_BOOKMARKS2,
                item_ids=[bookmark_jid.full()],
            )
            return Conference.from_element(items[0])
        else:
            # No compatibility layer, we use legacy bookmarks.
            bookmarks = await self.list(client)
            return bookmarks[bookmark_jid]

    def _bookmark_remove(self, bookmark_jid: str, profile: str) -> defer.Deferred[None]:
        """Delete an existing bookmark.

        @param bookmark_jid: JID of the bookmark to delete.
        @param profile: Profile to use.
        @return: Deferred firing once the removal is done.
        """
        d = defer.ensureDeferred(
            self.remove(self.host.get_client(profile), jid.JID(bookmark_jid))
        )
        return d

    async def remove(self, client: SatXMPPEntity, bookmark_jid: jid.JID) -> None:
        """Helper method to delete an existing bookmark.

        @param client: Profile session.
        @param bookmark_jid: Bookmark to delete.
        """
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            await self._p.retract_items(
                client, pep_jid, NS_BOOKMARKS2, [bookmark_jid.full()]
            )
        else:
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            await self._legacy.remove_bookmark(
                self._legacy.MUC_TYPE, bookmark_jid, "private", client.profile
            )

    @utils.ensure_deferred
    async def _bookmarks_list(self, extra_s: str, profile: str) -> str:
        """List current bookmarks.

        @param extra_s: Serialised extra. Reserved for future use.
        @param profile: Profile to use.
        @return: Bookmarks serialised to JSON, without unset fields.
        """
        client = self.host.get_client(profile)
        extra = data_format.deserialise(extra_s)
        bookmarks = await self.list(client, extra)
        return bookmarks.model_dump_json(exclude_none=True)

    async def list(self, client: SatXMPPEntity, extra: dict | None = None) -> Bookmarks:
        """List bookmarks.

        If there is a compatibility layer announced, Bookmarks2 will be used, otherwise
        legacy bookmarks will be used.
        @param client: Client session.
        @param extra: extra data. Currently unused.
        @return: bookmarks.
        """
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            items, __ = await self._p.get_items(client, pep_jid, NS_BOOKMARKS2)
            return Bookmarks.from_elements(items)
        else:
            # There is no compatibility layer on the PEP server, so we use legacy
            # bookmarks as recommended at
            # https://docs.modernxmpp.org/client/groupchat/#bookmarks
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            legacy_data = await self._legacy.bookmarks_list(
                self._legacy.MUC_TYPE, "private", client.profile
            )
            private_bookmarks = legacy_data["private"]
            bookmarks_dict = {}
            # NOTE: the loop variable must not be named "jid", that would shadow the
            #   imported ``jid`` module inside this method.
            for room_jid, bookmark_data in private_bookmarks.items():
                autojoin = C.bool(bookmark_data.get("autojoin", C.BOOL_FALSE))
                name = bookmark_data.get("name")
                nick = bookmark_data.get("nick")
                password = bookmark_data.get("password")
                conference = Conference(
                    autojoin=autojoin, name=name, nick=nick, password=password
                )
                bookmarks_dict[room_jid] = conference
            return Bookmarks(bookmarks_dict)

    @utils.ensure_deferred
    async def _bookmarks_set(self, bookmarks_raw: str, profile: str) -> None:
        """Add or update one or more bookmarks.

        @param bookmarks_raw: serialised bookmark.
            It must deserialise to a dict mapping from bookmark JID to Conference data.
        @param profile: Profile to use.
        """
        client = self.host.get_client(profile)
        bookmarks = Bookmarks.model_validate_json(bookmarks_raw)
        pep_jid = client.jid.userhostJID()

        if await self.host.memory.disco.has_feature(
            client, NS_BOOKMARKS2_COMPAT, pep_jid
        ):
            bookmark_items = []
            for bookmark_jid, conference in bookmarks.items():
                item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
                item_elt["id"] = bookmark_jid.full()
                item_elt.addChild(conference.to_element())
                bookmark_items.append(item_elt)

            await self._p.send_items(
                client,
                None,
                NS_BOOKMARKS2,
                bookmark_items,
                extra={
                    self._p.EXTRA_PUBLISH_OPTIONS: {
                        "pubsub#access_model": self._p.ACCESS_WHITELIST
                    },
                    self._p.EXTRA_AUTOCREATE: True,
                },
            )
        else:
            log.debug(
                f"[{client.profile}] No compatibility layer found, we use legacy "
                "bookmarks."
            )
            # XXX: We add every bookmark one by one, which is inefficient. The legacy
            # plugin likely implemented this way because end-users typically add bookmarks
            # individually. Nowadays, very few servers, if any, still implement XEP-0048
            # without the XEP-0402 compatibility layer, so it's not worth spending time to
            # improve the legacy XEP-0048 plugin.
            for bookmark_jid, conference in bookmarks.items():
                bookmark_data = {}
                if conference.autojoin:
                    bookmark_data["autojoin"] = C.BOOL_TRUE
                for attribute in ("name", "nick", "password"):
                    value = getattr(conference, attribute)
                    if value:
                        bookmark_data[attribute] = value
                await self._legacy.add_bookmark(
                    self._legacy.MUC_TYPE,
                    bookmark_jid,
                    bookmark_data,
                    storage_type="private",
                    profile_key=client.profile,
                )