diff sat/plugins/plugin_xep_0465.py @ 3758:b7cef1b24f83

plugins XEP-0060, XEP-0376, XEP-0465, CLI: PAM + PSS implementation: - update psSubscriptionsGet to use serialised return value - implement XEP-0376 Pubsub Account Management - implement XEP-0465 Public Pubsub Subscriptions - CLI `pubsub` commands updated accordingly, and added `--public` flags to `subscribe`, `Subscriptions` and `node Subscriptions get` ⚠ `XEP-0465` is speculative, the XEP has been accepted by council but not published yet. As is should be the next one, and current latest one is `XEP-0464`, `XEP-0465` has been anticipated. rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 18:38:05 +0200
parents
children 56e5b18f4d06
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0465.py	Fri May 13 18:38:05 2022 +0200
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for XEP-0465
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, List, Dict, Union
+
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+from zope.interface import implementer
+from wokkel import disco, iwokkel
+
+from sat.core.constants import Const as C
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.core import exceptions
+from sat.core.core_types import SatXMPPEntity
+from sat.tools import utils
+from sat.tools.common import data_format
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Pubsub Public Subscriptions",
+    C.PI_IMPORT_NAME: "XEP-0465",
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0465"],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0376"],
+    C.PI_MAIN: "XEP_0465",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub Public Subscriptions implementation"""),
+}
+
+NS_PPS = "urn:xmpp:pps:0"
+NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0"
+NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0"
+SUBSCRIBERS_NODE_PREFIX = f"{NS_PPS_SUBSCRIBERS}/"
+
+
+class XEP_0465:
+
+    def __init__(self, host):
+        log.info(_("Pubsub Public Subscriptions initialization"))
+        host.registerNamespace("pps", NS_PPS)
+        self.host = host
+        host.bridge.addMethod(
+            "psPublicSubscriptionsGet",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._subscriptions,
+            async_=True,
+        )
+        host.bridge.addMethod(
+            "psPublicSubscriptionsGet",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._subscriptions,
+            async_=True,
+        )
+        host.bridge.addMethod(
+            "psPublicNodeSubscriptionsGet",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._getPublicNodeSubscriptions,
+            async_=True,
+        )
+
+    def getHandler(self, client):
+        return XEP_0465_Handler()
+
+    @property
+    def subscriptions_node(self) -> str:
+        return NS_PPS_SUBSCRIPTIONS
+
+    @property
+    def subscribers_node_prefix(self) -> str:
+        return SUBSCRIBERS_NODE_PREFIX
+
+    def buildSubscriptionElt(self, node: str, service: jid.JID) -> domish.Element:
+        """Generate a <subscriptions> element
+
+        This is the element that a service returns on public subscriptions request
+        """
+        subscription_elt = domish.Element((NS_PPS, "subscription"))
+        subscription_elt["node"] = node
+        subscription_elt["service"] = service.full()
+        return subscription_elt
+
+    def buildSubscriberElt(self, subscriber: jid.JID) -> domish.Element:
+        """Generate a <subscriber> element
+
+        This is the element that a service returns on node public subscriptions request
+        """
+        subscriber_elt = domish.Element((NS_PPS, "subscriber"))
+        subscriber_elt["jid"] = subscriber.full()
+        return subscriber_elt
+
+    @utils.ensure_deferred
+    async def _subscriptions(
+        self,
+        service="",
+        nodeIdentifier="",
+        profile_key=C.PROF_KEY_NONE
+    ) -> str:
+        client = self.host.getClient(profile_key)
+        service = None if not service else jid.JID(service)
+        subs = await self.subscriptions(client, service, nodeIdentifier or None)
+        return data_format.serialise(subs)
+
+    async def subscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID] = None,
+        node: Optional[str] = None
+    ) -> List[Dict[str, Union[str, bool]]]:
+        """Retrieve public subscriptions from a service
+
+        @param service(jid.JID): PubSub service
+        @param nodeIdentifier(unicode, None): node to filter
+            None to get all subscriptions
+        """
+        if service is None:
+            service = client.jid.userhostJID()
+        items, __ = await self.host.plugins["XEP-0060"].getItems(
+            client, service, NS_PPS_SUBSCRIPTIONS
+        )
+        ret = []
+        for item in items:
+            try:
+                subscription_elt = next(item.elements(NS_PPS, "subscription"))
+            except StopIteration:
+                log.warning(f"no <subscription> element found: {item.toXml()}")
+                continue
+
+            try:
+                sub_dict = {
+                    "service": subscription_elt["service"],
+                    "node": subscription_elt["node"],
+                    "subscriber": service.full(),
+                    "state": subscription_elt.getAttribute("subscription", "subscribed"),
+                }
+            except KeyError:
+                log.warning(
+                    f"invalid <subscription> element: {subscription_elt.toXml()}"
+                )
+                continue
+            if node is not None and sub_dict["node"] != node:
+                # if not is specified, we filter out any other node
+                # FIXME: should node filtering be done by server?
+                continue
+            ret.append(sub_dict)
+        return ret
+
+    @utils.ensure_deferred
+    async def _getPublicNodeSubscriptions(
+        self,
+        service: str,
+        node: str,
+        profile_key: str
+    ) -> Dict[str, str]:
+        client = self.host.getClient(profile_key)
+        subs = await self.getPublicNodeSubscriptions(
+            client, jid.JID(service) if service else None, node
+        )
+        return {j.full(): a for j, a in subs.items()}
+
+    def getPublicSubscribersNode(self, node: str) -> str:
+        """Return prefixed node to retrieve public subscribers"""
+        return f"{NS_PPS_SUBSCRIBERS}/{node}"
+
+    async def getPublicNodeSubscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        nodeIdentifier: str
+    ) -> Dict[jid.JID, str]:
+        """Retrieve public subscriptions to a node
+
+        @param nodeIdentifier(unicode): node to get subscriptions from
+        """
+        if not nodeIdentifier:
+            raise exceptions.DataError("node identifier can't be empty")
+
+        if service is None:
+            service = client.jid.userhostJID()
+
+        subscribers_node = self.getPublicSubscribersNode(nodeIdentifier)
+
+        items, __ = await self.host.plugins["XEP-0060"].getItems(
+            client, service, subscribers_node
+        )
+        ret = {}
+        for item in items:
+            try:
+                subscriber_elt = next(item.elements(NS_PPS, "subscriber"))
+            except StopIteration:
+                log.warning(f"no <subscriber> element found: {item.toXml()}")
+                continue
+
+            try:
+                ret[jid.JID(subscriber_elt["jid"])] = "subscribed"
+            except (KeyError, RuntimeError):
+                log.warning(
+                    f"invalid <subscriber> element: {subscriber_elt.toXml()}"
+                )
+                continue
+        return ret
+
+    def setPublicOpt(self, options: Optional[dict] = None) -> dict:
+        """Set option to make a subscription public
+
+        @param options: dict where the option must be set
+            if None, a new dict will be created
+
+        @return: the options dict
+        """
+        if options is None:
+            options = {}
+        options[f'{{{NS_PPS}}}public'] = True
+        return options
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0465_Handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_PPS)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []