diff libervia/backend/plugins/plugin_xep_0465.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0465.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0465.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,267 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for XEP-0465
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, List, Dict, Union
+
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber import error
+from twisted.words.xish import domish
+from zope.interface import implementer
+from wokkel import disco, iwokkel
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core import exceptions
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.tools import utils
+from libervia.backend.tools.common import data_format
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Pubsub Public Subscriptions",
+    C.PI_IMPORT_NAME: "XEP-0465",
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0465"],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0376"],
+    C.PI_MAIN: "XEP_0465",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub Public Subscriptions implementation"""),
+}
+
+NS_PPS = "urn:xmpp:pps:0"
+NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0"
+NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0"
+SUBSCRIBERS_NODE_PREFIX = f"{NS_PPS_SUBSCRIBERS}/"
+NOT_IMPLEMENTED_MSG = (
+    "The service at {service!s} doesn't seem to support Pubsub Public Subscriptions "
+    "(XEP-0465), please request support from your service administrator."
+)
+
+
+class XEP_0465:
+
+    def __init__(self, host):
+        log.info(_("Pubsub Public Subscriptions initialization"))
+        host.register_namespace("pps", NS_PPS)
+        self.host = host
+        host.bridge.add_method(
+            "ps_public_subscriptions_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._subscriptions,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_public_subscriptions_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._subscriptions,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_public_node_subscriptions_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._get_public_node_subscriptions,
+            async_=True,
+        )
+
+    def get_handler(self, client):
+        return XEP_0465_Handler()
+
+    @property
+    def subscriptions_node(self) -> str:
+        return NS_PPS_SUBSCRIPTIONS
+
+    @property
+    def subscribers_node_prefix(self) -> str:
+        return SUBSCRIBERS_NODE_PREFIX
+
+    def build_subscription_elt(self, node: str, service: jid.JID) -> domish.Element:
+        """Generate a <subscriptions> element
+
+        This is the element that a service returns on public subscriptions request
+        """
+        subscription_elt = domish.Element((NS_PPS, "subscription"))
+        subscription_elt["node"] = node
+        subscription_elt["service"] = service.full()
+        return subscription_elt
+
+    def build_subscriber_elt(self, subscriber: jid.JID) -> domish.Element:
+        """Generate a <subscriber> element
+
+        This is the element that a service returns on node public subscriptions request
+        """
+        subscriber_elt = domish.Element((NS_PPS, "subscriber"))
+        subscriber_elt["jid"] = subscriber.full()
+        return subscriber_elt
+
+    @utils.ensure_deferred
+    async def _subscriptions(
+        self,
+        service="",
+        nodeIdentifier="",
+        profile_key=C.PROF_KEY_NONE
+    ) -> str:
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        subs = await self.subscriptions(client, service, nodeIdentifier or None)
+        return data_format.serialise(subs)
+
+    async def subscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID] = None,
+        node: Optional[str] = None
+    ) -> List[Dict[str, Union[str, bool]]]:
+        """Retrieve public subscriptions from a service
+
+        @param service(jid.JID): PubSub service
+        @param nodeIdentifier(unicode, None): node to filter
+            None to get all subscriptions
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        try:
+            items, __ = await self.host.plugins["XEP-0060"].get_items(
+                client, service, NS_PPS_SUBSCRIPTIONS
+            )
+        except error.StanzaError as e:
+            if e.condition == "forbidden":
+                log.warning(NOT_IMPLEMENTED_MSG.format(service=service))
+                return []
+            else:
+                raise e
+        ret = []
+        for item in items:
+            try:
+                subscription_elt = next(item.elements(NS_PPS, "subscription"))
+            except StopIteration:
+                log.warning(f"no <subscription> element found: {item.toXml()}")
+                continue
+
+            try:
+                sub_dict = {
+                    "service": subscription_elt["service"],
+                    "node": subscription_elt["node"],
+                    "subscriber": service.full(),
+                    "state": subscription_elt.getAttribute("subscription", "subscribed"),
+                }
+            except KeyError:
+                log.warning(
+                    f"invalid <subscription> element: {subscription_elt.toXml()}"
+                )
+                continue
+            if node is not None and sub_dict["node"] != node:
+                # if not is specified, we filter out any other node
+                # FIXME: should node filtering be done by server?
+                continue
+            ret.append(sub_dict)
+        return ret
+
+    @utils.ensure_deferred
+    async def _get_public_node_subscriptions(
+        self,
+        service: str,
+        node: str,
+        profile_key: str
+    ) -> Dict[str, str]:
+        client = self.host.get_client(profile_key)
+        subs = await self.get_public_node_subscriptions(
+            client, jid.JID(service) if service else None, node
+        )
+        return {j.full(): a for j, a in subs.items()}
+
+    def get_public_subscribers_node(self, node: str) -> str:
+        """Return prefixed node to retrieve public subscribers"""
+        return f"{NS_PPS_SUBSCRIBERS}/{node}"
+
+    async def get_public_node_subscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        nodeIdentifier: str
+    ) -> Dict[jid.JID, str]:
+        """Retrieve public subscriptions to a node
+
+        @param nodeIdentifier(unicode): node to get subscriptions from
+        """
+        if not nodeIdentifier:
+            raise exceptions.DataError("node identifier can't be empty")
+
+        if service is None:
+            service = client.jid.userhostJID()
+
+        subscribers_node = self.get_public_subscribers_node(nodeIdentifier)
+
+        try:
+            items, __ = await self.host.plugins["XEP-0060"].get_items(
+                client, service, subscribers_node
+            )
+        except error.StanzaError as e:
+            if e.condition == "forbidden":
+                log.warning(NOT_IMPLEMENTED_MSG.format(service=service))
+                return {}
+            else:
+                raise e
+        ret = {}
+        for item in items:
+            try:
+                subscriber_elt = next(item.elements(NS_PPS, "subscriber"))
+            except StopIteration:
+                log.warning(f"no <subscriber> element found: {item.toXml()}")
+                continue
+
+            try:
+                ret[jid.JID(subscriber_elt["jid"])] = "subscribed"
+            except (KeyError, RuntimeError):
+                log.warning(
+                    f"invalid <subscriber> element: {subscriber_elt.toXml()}"
+                )
+                continue
+        return ret
+
+    def set_public_opt(self, options: Optional[dict] = None) -> dict:
+        """Set option to make a subscription public
+
+        @param options: dict where the option must be set
+            if None, a new dict will be created
+
+        @return: the options dict
+        """
+        if options is None:
+            options = {}
+        options[f'{{{NS_PPS}}}public'] = True
+        return options
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0465_Handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_PPS)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []