Mercurial > libervia-pubsub
view sat_pubsub/pam.py @ 480:23a51b139024
pam: handle errors in onSubscribeResult
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 30 May 2022 16:37:14 +0200 |
parents | b544109ab4c4 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # # Copyright (c) 2015-2022 Jérôme Poisson # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. "This module implements XEP-0376 (Pubsub Account Management)" from typing import Optional from twisted.internet import defer from twisted.python import log from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error from twisted.words.xish import domish from wokkel import disco, iwokkel, pubsub, data_form from wokkel.iwokkel import IPubSubService from zope.interface import implementer from sat_pubsub import error NS_PAM = "urn:xmpp:pam:0" PAM_SUB_XPATH = ( f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]' ) PAM_UNSUB_XPATH = ( f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]' ) PAM_SUBSCRIPTIONS_XPATH = ( f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]' ) @implementer(iwokkel.IDisco) class PAMHandler(disco.DiscoClientProtocol): def __init__(self, service_jid): super(PAMHandler, self).__init__() self.backend = None def connectionInitialized(self): for handler in self.parent.handlers: if IPubSubService.providedBy(handler): self._pubsub_service = handler break self.backend = self.parent.parent.getServiceNamed('backend') self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe) self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe) self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions) def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]: """Get JID of sender if it's a user from our server If it's a user from an external server, None is returned and a message is log """ from_jid = jid.JID(iq_elt["from"]) if not self.backend.isFromServer(from_jid): log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}") else: return jid.JID(iq_elt["from"]) def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt): subscription_elt = iq_result_elt.pubsub.subscription if subscription_elt is not None: destinee_jid = jid.JID(iq_req_elt["from"]) sender_jid = jid.JID(iq_req_elt["to"]) message_elt = domish.Element((None, "message")) message_elt["to"] = destinee_jid.userhost() message_elt["from"] = destinee_jid.userhost() # XXX: we explicitely store the notification to be sure that all clients get it message_elt.addElement(("urn:xmpp:hints", "store")) notify_elt = message_elt.addElement((NS_PAM, "notify")) notify_elt["service"] = sender_jid.full() notify_elt.addChild(subscription_elt) self.backend.privilege.sendMessage(message_elt) pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result') else: # no <subscription> element, this must be an error error_elt = iq_result_elt.error if error_elt is None: log.msg(f"Invalid reply received: {iq_result_elt.toXml()}") error_elt = jabber_error.StanzaError( "service-unavailable", "received invalid reply from external pubsub service" ).getElement() pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'error') pam_iq_result_elt.addChild(error_elt) self.xmlstream.send(pam_iq_result_elt) async def onSubRequest(self, from_jid, iq_elt, subscribe=True): try: service_jid = jid.JID(iq_elt.pam.getAttribute("jid")) except RuntimeError: log.msg( f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}' ) return iq_elt.handled = True new_iq_elt = domish.Element((None, "iq")) new_iq_elt["from"] = from_jid.userhost() new_iq_elt["to"] = service_jid.full() new_iq_elt.addUniqueId() new_iq_elt["type"] = "set" new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub")) new_pubsub_elt.addChild( iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe ) try: options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options")) except StopIteration: options_elt = None else: new_pubsub_elt.addChild(options_elt) if self.backend.isFromServer(service_jid): # the request is handled locally new_iq_elt.delegated = True self.backend.delegation.registerSendHook( new_iq_elt, self.onSubscribeResult, iq_elt ) self.xmlstream.dispatch(new_iq_elt) else: # this is a (un)subscribe request to an external server sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt) if sub_result_elt["type"] == "result": if subscribe: node = new_iq_elt.pubsub.subscribe["node"] state = sub_result_elt.pubsub.subscription.getAttribute( "subscription", "subscribed" ) public = False if options_elt is not None: options_form = data_form.findForm( options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS ) if options_form is not None: public = options_form.get(f"{{{const.NS_PPS}}}public", False) await self.backend.storage.addExternalSubscription( from_jid.userhostJID(), service_jid, node, state, public ) else: node = new_iq_elt.pubsub.unsubscribe["node"] try: await self.backend.storage.removeExternalSubscription( from_jid.userhostJID(), service_jid, node, ) except error.NotSubscribed: pass self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt) def _onSubscribe(self, iq_elt: domish.Element) -> None: if not iq_elt.delegated: return from_jid = self.getServerUser(iq_elt) if from_jid is not None: defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt)) def _onUnsubscribe(self, iq_elt: domish.Element) -> None: if not iq_elt.delegated: return from_jid = self.getServerUser(iq_elt) if from_jid is not None: defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False)) async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None: iq_elt.handled = True try: subs = await self.backend.storage.getAllSubscriptions(from_jid) except Exception as e: error_elt = jabber_error.StanzaError( "internal-server-error", text=str(e) ).toResponse(iq_elt) self.xmlstream.send(error_elt) else: result_elt = xmlstream.toResponse(iq_elt, "result") subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions")) for sub in subs: self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub) self.xmlstream.send(result_elt) def _onSubscriptions(self, iq_elt: domish.Element) -> None: if not iq_elt.delegated: return from_jid = self.getServerUser(iq_elt) if from_jid is not None: defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt)) def getDiscoInfo(self, requestor, service, nodeIdentifier=''): return [disco.DiscoFeature(NS_PAM)] def getDiscoItems(self, requestor, service, nodeIdentifier=''): return []