Mercurial > libervia-pubsub
diff sat_pubsub/pam.py @ 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto
Subscriptions" and "Filtering" is not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element actually specified, I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | |
children | 23a51b139024 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/pam.py Wed May 11 13:39:08 2022 +0200 @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2015-2022 Jérôme Poisson + + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"This module implements XEP-0376 (Pubsub Account Management)" + +from typing import Optional + +from twisted.internet import defer +from twisted.python import log +from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error +from twisted.words.xish import domish +from wokkel import disco, iwokkel, pubsub, data_form +from wokkel.iwokkel import IPubSubService +from zope.interface import implementer + +from sat_pubsub import error + +NS_PAM = "urn:xmpp:pam:0" +PAM_SUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_UNSUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_SUBSCRIPTIONS_XPATH = ( + f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]' +) + + +@implementer(iwokkel.IDisco) +class PAMHandler(disco.DiscoClientProtocol): + + def __init__(self, service_jid): + super(PAMHandler, self).__init__() + self.backend = None + + def connectionInitialized(self): + for handler in self.parent.handlers: + if IPubSubService.providedBy(handler): + self._pubsub_service = handler + break + self.backend = self.parent.parent.getServiceNamed('backend') + self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe) + self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe) + self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions) + + def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]: + """Get JID of sender if it's a user from our server + + If it's a user from an external server, None is returned and a message is log + """ + from_jid = jid.JID(iq_elt["from"]) + if not self.backend.isFromServer(from_jid): + log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}") + else: + return jid.JID(iq_elt["from"]) + + def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt): + destinee_jid = jid.JID(iq_req_elt["from"]) + sender_jid = jid.JID(iq_req_elt["to"]) + message_elt = domish.Element((None, "message")) + message_elt["to"] = destinee_jid.userhost() + message_elt["from"] = destinee_jid.userhost() + # XXX: we explicitely store the notification to be sure that all clients get it + message_elt.addElement(("urn:xmpp:hints", "store")) + notify_elt = message_elt.addElement((NS_PAM, "notify")) + notify_elt["service"] = sender_jid.full() + notify_elt.addChild(iq_result_elt.pubsub.subscription) + self.backend.privilege.sendMessage(message_elt) + pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result') + self.xmlstream.send(pam_iq_result_elt) + + async def onSubRequest(self, from_jid, iq_elt, subscribe=True): + try: + service_jid = jid.JID(iq_elt.pam.getAttribute("jid")) + except RuntimeError: + log.msg( + f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}' + ) + return + iq_elt.handled = True + new_iq_elt = domish.Element((None, "iq")) + new_iq_elt["from"] = from_jid.userhost() + new_iq_elt["to"] = service_jid.full() + new_iq_elt.addUniqueId() + new_iq_elt["type"] = "set" + new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub")) + new_pubsub_elt.addChild( + iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe + ) + try: + options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options")) + except StopIteration: + options_elt = None + else: + new_pubsub_elt.addChild(options_elt) + + if self.backend.isFromServer(service_jid): + # the request is handled locally + new_iq_elt.delegated = True + self.backend.delegation.registerSendHook( + new_iq_elt, self.onSubscribeResult, iq_elt + ) + self.xmlstream.dispatch(new_iq_elt) + else: + # this is a (un)subscribe request to an external server + sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt) + if sub_result_elt["type"] == "result": + if subscribe: + node = new_iq_elt.pubsub.subscribe["node"] + state = sub_result_elt.pubsub.subscription.getAttribute( + "subscription", "subscribed" + ) + public = False + if options_elt is not None: + options_form = data_form.findForm( + options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS + ) + if options_form is not None: + public = options_form.get(f"{{{const.NS_PPS}}}public", False) + await self.backend.storage.addExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + state, + public + ) + else: + node = new_iq_elt.pubsub.unsubscribe["node"] + try: + await self.backend.storage.removeExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + ) + except error.NotSubscribed: + pass + self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt) + + def _onSubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt)) + + def _onUnsubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False)) + + async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None: + iq_elt.handled = True + try: + subs = await self.backend.storage.getAllSubscriptions(from_jid) + except Exception as e: + error_elt = jabber_error.StanzaError( + "internal-server-error", + text=str(e) + ).toResponse(iq_elt) + self.xmlstream.send(error_elt) + else: + result_elt = xmlstream.toResponse(iq_elt, "result") + subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions")) + for sub in subs: + self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub) + self.xmlstream.send(result_elt) + + def _onSubscriptions(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt)) + + def getDiscoInfo(self, requestor, service, nodeIdentifier=''): + return [disco.DiscoFeature(NS_PAM)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=''): + return []