diff sat_pubsub/pam.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto Subscriptions" and "Filtering" is not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element actually specified, I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents
children 23a51b139024
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/pam.py	Wed May 11 13:39:08 2022 +0200
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2015-2022 Jérôme Poisson
+
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"This module implements XEP-0376 (Pubsub Account Management)"
+
+from typing import Optional
+
+from twisted.internet import defer
+from twisted.python import log
+from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel, pubsub, data_form
+from wokkel.iwokkel import IPubSubService
+from zope.interface import implementer
+
+from sat_pubsub import error
+
+NS_PAM = "urn:xmpp:pam:0"
+PAM_SUB_XPATH = (
+    f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
+)
+PAM_UNSUB_XPATH = (
+    f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
+)
+PAM_SUBSCRIPTIONS_XPATH = (
+    f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]'
+)
+
+
+@implementer(iwokkel.IDisco)
+class PAMHandler(disco.DiscoClientProtocol):
+
+    def __init__(self, service_jid):
+        super(PAMHandler, self).__init__()
+        self.backend = None
+
+    def connectionInitialized(self):
+        for handler in self.parent.handlers:
+            if IPubSubService.providedBy(handler):
+                self._pubsub_service = handler
+                break
+        self.backend = self.parent.parent.getServiceNamed('backend')
+        self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe)
+        self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe)
+        self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions)
+
+    def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]:
+        """Get JID of sender if it's a user from our server
+
+        If it's a user from an external server, None is returned and a message is log
+        """
+        from_jid = jid.JID(iq_elt["from"])
+        if not self.backend.isFromServer(from_jid):
+            log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}")
+        else:
+            return jid.JID(iq_elt["from"])
+
+    def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt):
+        destinee_jid = jid.JID(iq_req_elt["from"])
+        sender_jid = jid.JID(iq_req_elt["to"])
+        message_elt = domish.Element((None, "message"))
+        message_elt["to"] = destinee_jid.userhost()
+        message_elt["from"] = destinee_jid.userhost()
+        # XXX: we explicitely store the notification to be sure that all clients get it
+        message_elt.addElement(("urn:xmpp:hints", "store"))
+        notify_elt = message_elt.addElement((NS_PAM, "notify"))
+        notify_elt["service"] = sender_jid.full()
+        notify_elt.addChild(iq_result_elt.pubsub.subscription)
+        self.backend.privilege.sendMessage(message_elt)
+        pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result')
+        self.xmlstream.send(pam_iq_result_elt)
+
+    async def onSubRequest(self, from_jid, iq_elt, subscribe=True):
+        try:
+            service_jid = jid.JID(iq_elt.pam.getAttribute("jid"))
+        except RuntimeError:
+            log.msg(
+                f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}'
+            )
+            return
+        iq_elt.handled = True
+        new_iq_elt = domish.Element((None, "iq"))
+        new_iq_elt["from"] = from_jid.userhost()
+        new_iq_elt["to"] = service_jid.full()
+        new_iq_elt.addUniqueId()
+        new_iq_elt["type"] = "set"
+        new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub"))
+        new_pubsub_elt.addChild(
+            iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe
+        )
+        try:
+            options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options"))
+        except StopIteration:
+            options_elt = None
+        else:
+            new_pubsub_elt.addChild(options_elt)
+
+        if self.backend.isFromServer(service_jid):
+            # the request is handled locally
+            new_iq_elt.delegated = True
+            self.backend.delegation.registerSendHook(
+                new_iq_elt, self.onSubscribeResult, iq_elt
+            )
+            self.xmlstream.dispatch(new_iq_elt)
+        else:
+            # this is a (un)subscribe request to an external server
+            sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt)
+            if sub_result_elt["type"] == "result":
+                if subscribe:
+                    node = new_iq_elt.pubsub.subscribe["node"]
+                    state = sub_result_elt.pubsub.subscription.getAttribute(
+                        "subscription", "subscribed"
+                    )
+                    public = False
+                    if options_elt is not None:
+                        options_form = data_form.findForm(
+                            options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS
+                        )
+                        if options_form is not None:
+                            public = options_form.get(f"{{{const.NS_PPS}}}public", False)
+                    await self.backend.storage.addExternalSubscription(
+                        from_jid.userhostJID(),
+                        service_jid,
+                        node,
+                        state,
+                        public
+                    )
+                else:
+                    node = new_iq_elt.pubsub.unsubscribe["node"]
+                    try:
+                        await self.backend.storage.removeExternalSubscription(
+                            from_jid.userhostJID(),
+                            service_jid,
+                            node,
+                        )
+                    except error.NotSubscribed:
+                        pass
+            self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt)
+
+    def _onSubscribe(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt))
+
+    def _onUnsubscribe(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False))
+
+    async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None:
+        iq_elt.handled = True
+        try:
+            subs = await self.backend.storage.getAllSubscriptions(from_jid)
+        except Exception as e:
+            error_elt = jabber_error.StanzaError(
+                "internal-server-error",
+                text=str(e)
+            ).toResponse(iq_elt)
+            self.xmlstream.send(error_elt)
+        else:
+            result_elt = xmlstream.toResponse(iq_elt, "result")
+            subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions"))
+            for sub in subs:
+                self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub)
+            self.xmlstream.send(result_elt)
+
+    def _onSubscriptions(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt))
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
+        return [disco.DiscoFeature(NS_PAM)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
+        return []