view sat_pubsub/pam.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to handle the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" are implemented; "Auto Subscriptions" and "Filtering" are not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element currently specified; I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). The database schema has been updated to add the columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents
children 23a51b139024
line wrap: on
line source

#!/usr/bin/env python3
#
# Copyright (c) 2015-2022 Jérôme Poisson


# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"This module implements XEP-0376 (Pubsub Account Management)"

from typing import Optional

from twisted.internet import defer
from twisted.python import log
from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error
from twisted.words.xish import domish
from wokkel import disco, iwokkel, pubsub, data_form
from wokkel.iwokkel import IPubSubService
from zope.interface import implementer

from sat_pubsub import const
from sat_pubsub import error

# Namespace of XEP-0376 (Pubsub Account Management)
NS_PAM = "urn:xmpp:pam:0"

# XPath prefix shared by the two "set" requests: an IQ wrapping a <pam/> element
_PAM_SET_XPATH = f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]'

# <pam/> wrapping a pubsub <subscribe/> element
PAM_SUB_XPATH = f'{_PAM_SET_XPATH}/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
# <pam/> wrapping a pubsub <unsubscribe/> element
PAM_UNSUB_XPATH = f'{_PAM_SET_XPATH}/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
# "get" request listing the subscriptions
PAM_SUBSCRIPTIONS_XPATH = f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]'


@implementer(iwokkel.IDisco)
class PAMHandler(disco.DiscoClientProtocol):
    """Handler of XEP-0376 (Pubsub Account Management) requests

    Once the connection is initialized, three kinds of IQ stanza are observed:
    subscribe and unsubscribe requests (a ``<pam/>`` element wrapping a pubsub
    ``<subscribe/>`` or ``<unsubscribe/>`` element) and subscriptions listing
    (``<subscriptions/>`` element). Only stanzas flagged as ``delegated`` and sent by
    a user of our own server are processed.
    """

    def __init__(self, service_jid):
        """
        @param service_jid: not used by this handler
        """
        super().__init__()
        # both attributes are set in connectionInitialized
        self.backend = None
        self._pubsub_service = None

    def connectionInitialized(self):
        """Retrieve sibling handler and backend, then register IQ observers"""
        self._pubsub_service = next(
            (h for h in self.parent.handlers if IPubSubService.providedBy(h)),
            None
        )
        self.backend = self.parent.parent.getServiceNamed('backend')
        self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe)
        self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe)
        self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions)

    def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]:
        """Get JID of sender if it's a user from our server

        If it's a user from an external server, None is returned and a message is
        logged.

        @param iq_elt: received IQ stanza, its "from" attribute must be set
        @return: JID of the sender, or None if the sender is not from our server
        """
        from_jid = jid.JID(iq_elt["from"])
        if self.backend.isFromServer(from_jid):
            return from_jid
        log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}")
        return None

    @staticmethod
    def _getSubscriptionElt(iq_result_elt: domish.Element) -> Optional[domish.Element]:
        """Return the <subscription/> child of the <pubsub/> element, if any"""
        pubsub_elt = iq_result_elt.pubsub
        if pubsub_elt is None:
            return None
        return pubsub_elt.subscription

    def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt):
        """Notify the user and answer the PAM request once (un)subscription is done

        @param iq_req_elt: pubsub (un)subscribe request sent on behalf of the user
            ("from" is the user, "to" is the pubsub service)
        @param iq_result_elt: answer received for iq_req_elt
        @param pam_iq_elt: original PAM request, which gets answered here
        """
        if iq_result_elt.getAttribute("type") == "error":
            # the (un)subscription failed: there is nothing to notify, the error is
            # forwarded to the PAM requester
            pam_error_elt = xmlstream.toResponse(pam_iq_elt, "error")
            if iq_result_elt.error is not None:
                pam_error_elt.addChild(iq_result_elt.error)
            self.xmlstream.send(pam_error_elt)
            return

        subscription_elt = self._getSubscriptionElt(iq_result_elt)
        if subscription_elt is None:
            # a result may come without payload, in which case there is nothing
            # to put in the notification
            log.msg(
                "no <subscription/> element in result, PAM notification is not sent: "
                f"{iq_result_elt.toXml()}"
            )
        else:
            destinee_jid = jid.JID(iq_req_elt["from"])
            sender_jid = jid.JID(iq_req_elt["to"])
            message_elt = domish.Element((None, "message"))
            # the notification goes from the user's bare JID to the user's bare JID
            message_elt["to"] = destinee_jid.userhost()
            message_elt["from"] = destinee_jid.userhost()
            # XXX: we explicitly store the notification to be sure that all clients
            #   get it
            message_elt.addElement(("urn:xmpp:hints", "store"))
            notify_elt = message_elt.addElement((NS_PAM, "notify"))
            notify_elt["service"] = sender_jid.full()
            notify_elt.addChild(subscription_elt)
            self.backend.privilege.sendMessage(message_elt)

        pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result')
        self.xmlstream.send(pam_iq_result_elt)

    async def onSubRequest(self, from_jid, iq_elt, subscribe=True):
        """Handle a PAM subscribe or unsubscribe request

        The wrapped pubsub element is moved to a new IQ stanza sent on behalf of the
        user. If the target service is on our server the stanza is dispatched
        locally, otherwise it is sent through the privilege handler and the external
        subscription is added to/removed from storage on success.

        @param from_jid: JID of the user doing the request
        @param iq_elt: PAM request
        @param subscribe: True for a subscription, False for an unsubscription
        """
        raw_service_jid = iq_elt.pam.getAttribute("jid")
        if not raw_service_jid:
            log.msg(
                f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}'
            )
            return
        try:
            service_jid = jid.JID(raw_service_jid)
        except jid.InvalidFormat:
            log.msg(
                f'Invalid PAM element (malformed "jid" attribute): {iq_elt.toXml()}'
            )
            return
        iq_elt.handled = True

        new_iq_elt = domish.Element((None, "iq"))
        new_iq_elt["from"] = from_jid.userhost()
        new_iq_elt["to"] = service_jid.full()
        new_iq_elt.addUniqueId()
        new_iq_elt["type"] = "set"
        new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub"))
        new_pubsub_elt.addChild(
            iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe
        )
        options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options"), None)
        if options_elt is not None:
            new_pubsub_elt.addChild(options_elt)

        if self.backend.isFromServer(service_jid):
            # the request is handled locally
            new_iq_elt.delegated = True
            self.backend.delegation.registerSendHook(
                new_iq_elt, self.onSubscribeResult, iq_elt
            )
            self.xmlstream.dispatch(new_iq_elt)
            return

        # this is a (un)subscribe request to an external server
        try:
            sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt)
        except jabber_error.StanzaError as e:
            # the PAM request must be answered even if the external request fails
            self.xmlstream.send(e.toResponse(iq_elt))
            return
        except Exception as e:
            log.msg(f"can't send request to {service_jid.full()}: {e}")
            error_elt = jabber_error.StanzaError(
                "internal-server-error",
                text=str(e)
            ).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
            return

        if sub_result_elt["type"] == "result":
            if subscribe:
                await self._storeExternalSubscription(
                    from_jid, service_jid, new_iq_elt, sub_result_elt, options_elt
                )
            else:
                node = new_iq_elt.pubsub.unsubscribe["node"]
                try:
                    await self.backend.storage.removeExternalSubscription(
                        from_jid.userhostJID(),
                        service_jid,
                        node,
                    )
                except error.NotSubscribed:
                    # the subscription was not stored, there is nothing to remove
                    pass
        self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt)

    async def _storeExternalSubscription(
        self, from_jid, service_jid, sub_iq_elt, sub_result_elt, options_elt
    ):
        """Keep track in storage of a successful subscription to an external node

        @param from_jid: JID of the subscribed user
        @param service_jid: JID of the external pubsub service
        @param sub_iq_elt: subscribe request which has been sent to the service
        @param sub_result_elt: result received from the service
        @param options_elt: <options/> element of the request, or None
        """
        node = sub_iq_elt.pubsub.subscribe["node"]
        subscription_elt = self._getSubscriptionElt(sub_result_elt)
        if subscription_elt is None:
            state = "subscribed"
        else:
            state = subscription_elt.getAttribute("subscription", "subscribed")
        public = False
        if options_elt is not None:
            options_form = data_form.findForm(
                options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS
            )
            if options_form is not None:
                public = options_form.get(f"{{{const.NS_PPS}}}public", False)
        await self.backend.storage.addExternalSubscription(
            from_jid.userhostJID(),
            service_jid,
            node,
            state,
            public
        )

    def _handleDelegated(self, iq_elt: domish.Element, handler, **kwargs) -> None:
        """Run an async handler if the stanza is delegated and from a local user

        @param iq_elt: received IQ stanza
        @param handler: coroutine function called with the sender JID, the stanza
            and kwargs
        """
        if not iq_elt.delegated:
            return
        from_jid = self.getServerUser(iq_elt)
        if from_jid is not None:
            defer.ensureDeferred(handler(from_jid, iq_elt, **kwargs))

    def _onSubscribe(self, iq_elt: domish.Element) -> None:
        """Observer of PAM subscribe requests"""
        self._handleDelegated(iq_elt, self.onSubRequest)

    def _onUnsubscribe(self, iq_elt: domish.Element) -> None:
        """Observer of PAM unsubscribe requests"""
        self._handleDelegated(iq_elt, self.onSubRequest, subscribe=False)

    async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None:
        """Answer a subscriptions listing request

        @param from_jid: JID of the user doing the request
        @param iq_elt: subscriptions listing request
        """
        iq_elt.handled = True
        try:
            subs = await self.backend.storage.getAllSubscriptions(from_jid)
        except Exception as e:
            # whatever the failure is, the requester must get an answer
            error_elt = jabber_error.StanzaError(
                "internal-server-error",
                text=str(e)
            ).toResponse(iq_elt)
            self.xmlstream.send(error_elt)
        else:
            result_elt = xmlstream.toResponse(iq_elt, "result")
            subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions"))
            for sub in subs:
                self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub)
            self.xmlstream.send(result_elt)

    def _onSubscriptions(self, iq_elt: domish.Element) -> None:
        """Observer of PAM subscriptions listing requests"""
        self._handleDelegated(iq_elt, self.onSubscriptions)

    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
        """Advertise the PAM namespace as a disco feature"""
        return [disco.DiscoFeature(NS_PAM)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
        """No disco item is exposed by this handler"""
        return []