# HG changeset patch # User Goffi # Date 1652269148 -7200 # Node ID b544109ab4c4a2792368fd745a064d53a393a2f3 # Parent 9125a6e440c01e0c945da68b0b69f15336725490 Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto Subscriptions" and "Filtering" is not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the element actually specified, I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `` element as expected by XEP-0060 (sat_tmp needs to be updated). database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public. diff -r 9125a6e440c0 -r b544109ab4c4 db/libervia_pubsub_update_9_10.sql --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/db/libervia_pubsub_update_9_10.sql Wed May 11 13:39:08 2022 +0200 @@ -0,0 +1,24 @@ +-- we check version of the database before doing anything +-- and stop execution if not good +\set ON_ERROR_STOP +DO $$ +DECLARE ver text; +BEGIN + SELECT value INTO ver FROM metadata WHERE key='version'; + IF NOT FOUND OR ver!='9' THEN + RAISE EXCEPTION 'This update file needs to be applied on database schema version 9, you use version %',ver; + END IF; +END$$; +\unset ON_ERROR_STOP +-- end of version check + +/* subscriptions table updates */ +/* to handle external nodes */ +ALTER TABLE subscriptions ALTER COLUMN node_id DROP NOT NULL; +ALTER TABLE subscriptions ADD COLUMN ext_service text; +ALTER TABLE subscriptions ADD COLUMN ext_node text; +/* and to mark a subscription as public */ +ALTER TABLE subscriptions ADD COLUMN public boolean NOT NULL DEFAULT FALSE; +ALTER TABLE subscriptions ADD UNIQUE (entity_id, ext_service, ext_node); + +UPDATE metadata SET value='10' WHERE key='version'; diff -r 9125a6e440c0 -r b544109ab4c4 db/pubsub.sql --- a/db/pubsub.sql Mon Jan 03 16:48:22 2022 +0100 +++ b/db/pubsub.sql Wed May 11 13:39:08 2022 +0200 @@ -15,8 +15,8 @@ deliver_payloads boolean NOT NULL DEFAULT TRUE, max_items integer NOT NULL DEFAULT 0 CHECK (max_items >= 0), - overwrite_policy text NOT NULL DEFAULT 'original_publisher' - CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')), + overwrite_policy text NOT NULL DEFAULT 'original_publisher' + CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')), serial_ids boolean NOT NULL DEFAULT FALSE, consistent_publisher boolean NOT NULL DEFAULT FALSE, fts_language text NOT NULL DEFAULT 'generic', @@ -53,14 +53,21 @@ subscription_id serial PRIMARY KEY, entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE, resource text, - node_id integer NOT NULL REFERENCES nodes ON delete CASCADE, + node_id integer REFERENCES nodes ON DELETE CASCADE, + /* when we reference an external node (with PAM), node_id is NULL and service and node + * are set */ + ext_service text, + ext_node text, state text NOT NULL DEFAULT 'subscribed' CHECK (state IN ('subscribed', 'pending', 'unconfigured')), subscription_type text CHECK (subscription_type IN (NULL, 'items', 'nodes')), subscription_depth text CHECK (subscription_depth IN (NULL, '1', 'all')), - UNIQUE (entity_id, resource, node_id)); + public boolean NOT NULL DEFAULT FALSE, + UNIQUE (entity_id, resource, node_id)), + UNIQUE (entity_id, ext_service, ext_node)), + ; CREATE TABLE items ( item_id serial PRIMARY KEY, @@ -154,4 +161,4 @@ value text ); -INSERT INTO metadata VALUES ('version', '9'); +INSERT INTO metadata VALUES ('version', '10'); diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/backend.py --- a/sat_pubsub/backend.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/backend.py Wed May 11 13:39:08 2022 +0200 @@ -62,6 +62,7 @@ import copy import uuid +import hashlib from typing import Optional, List, Tuple from zope.interface import implementer @@ -240,10 +241,14 @@ def _getFTSLanguagesEb(self, failure_): log.msg(f"WARNING: can get FTS languages: {failure_}") - def isAdmin(self, entity_jid): + def isAdmin(self, entity_jid: jid.JID) -> bool: """Return True if an entity is an administrator""" return entity_jid.userhostJID() in self.admins + def isFromServer(self, entity_jid: jid.JID) -> bool: + """Return True if an entity come from our server""" + return entity_jid.host == self.server_jid.host + def supportsPublishOptions(self): return True @@ -595,18 +600,21 @@ def registerPurgeNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) - def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): + async def subscribe( + self, + nodeIdentifier: str, + subscriber: jid.JID, + requestor: jid.JID, + options: Optional[dict], + pep: bool, + recipient: jid.JID + ) -> pubsub.Subscription: subscriberEntity = subscriber.userhostJID() if subscriberEntity != requestor.userhostJID(): - return defer.fail(error.Forbidden()) + raise error.Forbidden() - d = self.storage.getNode(nodeIdentifier, pep, recipient) - d.addCallback(_getAffiliation, subscriberEntity) - d.addCallback(self._doSubscribe, subscriber, pep, recipient) - return d - - def _doSubscribe(self, result, subscriber, pep, recipient): - node, affiliation = result + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + __, affiliation = await _getAffiliation(node, subscriberEntity) if affiliation == 'outcast': raise error.Forbidden() @@ -614,67 +622,61 @@ access_model = node.getAccessModel() if access_model == const.VAL_AMODEL_OPEN: - d = defer.succeed(None) + pass elif access_model == const.VAL_AMODEL_PRESENCE: - d = self.checkPresenceSubscription(node, subscriber) + await self.checkPresenceSubscription(node, subscriber) elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: - d = self.checkRosterGroups(node, subscriber) + await self.checkRosterGroups(node, subscriber) elif access_model == const.VAL_AMODEL_WHITELIST: - d = self.checkNodeAffiliations(node, subscriber) + await self.checkNodeAffiliations(node, subscriber) else: raise NotImplementedError - def trapExists(failure): - failure.trap(error.SubscriptionExists) - return False + config = {} + if options and options.get(f"{{{const.NS_PPS}}}public"): + config["public"] = True + try: + await node.addSubscription(subscriber, 'subscribed', config) + except error.SubscriptionExists: + send_last = False + else: + send_last = True - def cb(sendLast): - d = node.getSubscription(subscriber) - if sendLast: - d.addCallback(self._sendLastPublished, node, pep, recipient) - return d - - d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {})) - d.addCallbacks(lambda _: True, trapExists) - d.addCallback(cb) - - return d - - def _sendLastPublished(self, subscription, node, pep, recipient): + subscription = await node.getSubscription(subscriber) - def notifyItem(items_data): - if items_data: - reactor.callLater(0, self.dispatch, - {'items_data': items_data, - 'node': node, - 'pep': pep, - 'recipient': recipient, - 'subscription': subscription, - }, - '//event/pubsub/notify') - - config = node.getConfiguration() - sendLastPublished = config.get('pubsub#send_last_published_item', - 'never') - if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': - entity = subscription.subscriber.userhostJID() - d = defer.ensureDeferred( - self.getItemsData( - node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep} + if send_last: + config = node.getConfiguration() + sendLastPublished = config.get( + 'pubsub#send_last_published_item', 'never' + ) + if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': + entity = subscription.subscriber.userhostJID() + items_data, __ = await self.getItemsData( + node.nodeIdentifier, entity, recipient, maxItems=1, + ext_data={'pep': pep} ) - ) - d.addCallback(notifyItem) - d.addErrback(log.err) + if items_data: + reactor.callLater( + 0, + self.dispatch, + {'items_data': items_data, + 'node': node, + 'pep': pep, + 'recipient': recipient, + 'subscription': subscription, + }, + '//event/pubsub/notify' + ) return subscription - def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): + async def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): if subscriber.userhostJID() != requestor.userhostJID(): - return defer.fail(error.Forbidden()) + raise error.Forbidden() - d = self.storage.getNode(nodeIdentifier, pep, recipient) - d.addCallback(lambda node: node.removeSubscription(subscriber)) - return d + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + await node.removeSubscription(subscriber) + return pubsub.Subscription(nodeIdentifier, subscriber, "none") def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient): """retrieve subscriptions of an entity @@ -685,7 +687,9 @@ @param pep(bool): True if it's a PEP request @param recipient(jid.JID, None): recipient of the PEP request """ - return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) + return self.storage.getSubscriptions( + requestor, nodeIdentifier, None, pep, recipient + ) def supportsAutoCreate(self): return True @@ -749,6 +753,10 @@ if not nodeIdentifier: return defer.fail(error.NoRootNode()) + if ((nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS + or nodeIdentifier.startswith(const.PPS_SUBSCRIBERS_PREFIX))): + return defer.succeed({const.OPT_ACCESS_MODEL: const.VAL_AMODEL_OPEN}) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getConfiguration()) @@ -1067,15 +1075,19 @@ ) return ids - def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, - itemIdentifiers=None, ext_data=None): - d = defer.ensureDeferred( - self.getItemsData( - nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data - ) + async def getItems( + self, + nodeIdentifier: str, + requestor: jid.JID, + recipient: jid.JID, + maxItems: Optional[int] = None, + itemIdentifiers: Optional[List[str]] = None, + ext_data: Optional[dict] = None + ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: + items_data, rsm_response = await self.getItemsData( + nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data ) - d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) - return d + return [item_data.item for item_data in items_data], rsm_response async def getOwnerRoster(self, node, owners=None): # FIXME: roster of publisher, not owner, must be used @@ -1099,20 +1111,40 @@ return return roster - async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, - itemIdentifiers=None, ext_data=None): + async def getItemsData( + self, + nodeIdentifier: str, + requestor: jid.JID, + recipient: jid.JID, + maxItems: Optional[int] = None, + itemIdentifiers: Optional[List[str]] = None, + ext_data: Optional[dict] = None + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: """like getItems but return the whole ItemData""" if maxItems == 0: log.msg("WARNING: maxItems=0 on items retrieval") - return [] + return [], None if ext_data is None: ext_data = {} + + if nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS: + return await self.getPublicSubscriptions( + requestor, maxItems, itemIdentifiers, ext_data, + ext_data.pop("pep", False), recipient + ) + elif nodeIdentifier.startswith(f"{const.NS_PPS_SUBSCRIBERS}/"): + target_node = nodeIdentifier[len(const.NS_PPS_SUBSCRIBERS)+1:] + return await self.getPublicNodeSubscriptions( + target_node, requestor, maxItems, itemIdentifiers, ext_data, + ext_data.pop("pep", False), recipient + ) + node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) try: affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) except error.NotLeafNodeError: - return [] + return [], None # at this point node access is checked @@ -1155,9 +1187,9 @@ if schema is not None: self.filterItemsWithSchema(items_data, schema, owner) - await self._items_rsm( - items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) - return items_data + return await self._items_rsm( + items_data, node, requestor_groups, owner, itemIdentifiers, ext_data + ) def _setCount(self, value, response): response.count = value @@ -1180,48 +1212,135 @@ rsm_request = ext_data['rsm'] except KeyError: # No RSM in this request, nothing to do - return items_data + return items_data, None if itemIdentifiers: log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part") - return items_data + return items_data, None - response = rsm.RSMResponse() + rsm_response = rsm.RSMResponse() d_count = node.getItemsCount(authorized_groups, owner, ext_data) - d_count.addCallback(self._setCount, response) + d_count.addCallback(self._setCount, rsm_response) d_list = [d_count] if items_data: - response.first = items_data[0].item['id'] - response.last = items_data[-1].item['id'] + rsm_response.first = items_data[0].item['id'] + rsm_response.last = items_data[-1].item['id'] # index handling if rsm_request.index is not None: - response.index = rsm_request.index + rsm_response.index = rsm_request.index elif rsm_request.before: # The last page case (before == '') is managed in render method d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data) - d_index.addCallback(self._setIndex, response, -len(items_data)) + d_index.addCallback(self._setIndex, rsm_response, -len(items_data)) d_list.append(d_index) elif rsm_request.after is not None: d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data) - d_index.addCallback(self._setIndex, response, 1) + d_index.addCallback(self._setIndex, rsm_response, 1) d_list.append(d_index) else: # the first page was requested - response.index = 0 + rsm_response.index = 0 await defer.DeferredList(d_list) if rsm_request.before == '': # the last page was requested - response.index = response.count - len(items_data) + rsm_response.index = rsm_response.count - len(items_data) + + return items_data, rsm_response + + def addEltFromSubDict( + self, + parent_elt: domish.Element, + from_jid: Optional[jid.JID], + sub_dict: dict[str, str], + namespace: Optional[str] = None, + ) -> None: + """Generate element from storage.getAllSubscriptions's dict + + @param parent_elt: element where the new subscription element must be added + @param sub_dict: subscription data as returned by storage.getAllSubscriptions + @param namespace: if not None, namespace to use for element + @param service_attribute: name of the attribute to use for the subscribed service + """ + subscription_elt = parent_elt.addElement( + "subscription" if namespace is None else (namespace, "subscription") + ) + if from_jid is not None: + subscription_elt["jid"] = from_jid.userhost() + if sub_dict["node"] is not None: + if sub_dict["pep"] is not None: + subscription_elt["service"] = sub_dict["pep"] + else: + subscription_elt["service"] = self.jid.full() + subscription_elt["node"] = sub_dict["node"] + else: + subscription_elt["service"] = sub_dict["ext_service"] + subscription_elt["node"] = sub_dict["ext_node"] + subscription_elt["subscription"] = sub_dict["state"] + + async def getPublicSubscriptions( + self, + requestor: jid.JID, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + ext_data: dict, + pep: bool, + recipient: jid.JID + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: - items_data.append(container.ItemData(response.toElement())) + if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): + raise NotImplementedError( + "item identifiers, RSM and MAM are not implemented yet" + ) + + if not pep: + return [], None + + subs = await self.storage.getAllSubscriptions(recipient, True) + items_data = [] + for sub in subs: + if sub["state"] != "subscribed": + continue + item = domish.Element((pubsub.NS_PUBSUB, "item")) + item["id"] = sub["id"] + self.addEltFromSubDict(item, None, sub, const.NS_PPS) + items_data.append(container.ItemData(item)) + + return items_data, None - return items_data + async def getPublicNodeSubscriptions( + self, + nodeIdentifier: str, + requestor: jid.JID, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + ext_data: dict, + pep: bool, + recipient: jid.JID + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: + + if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): + raise NotImplementedError( + "item identifiers, RSM and MAM are not implemented yet" + ) + + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + + subs = await node.getSubscriptions(public=True) + items_data = [] + for sub in subs: + item = domish.Element((pubsub.NS_PUBSUB, "item")) + item["id"] = sub.id + subscriber_elt = item.addElement((const.NS_PPS, "subscriber")) + subscriber_elt["jid"] = sub.subscriber.full() + items_data.append(container.ItemData(item)) + + return items_data, None async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): node = await self.storage.getNode(nodeIdentifier, pep, recipient) @@ -1824,19 +1943,26 @@ return d.addErrback(self._mapErrors) def subscribe(self, request): - d = self.backend.subscribe(request.nodeIdentifier, - request.subscriber, - request.sender, - self._isPep(request), - request.recipient) + d = defer.ensureDeferred( + self.backend.subscribe( + request.nodeIdentifier, + request.subscriber, + request.sender, + request.options, + self._isPep(request), + request.recipient + ) + ) return d.addErrback(self._mapErrors) def unsubscribe(self, request): - d = self.backend.unsubscribe(request.nodeIdentifier, + d = defer.ensureDeferred( + self.backend.unsubscribe(request.nodeIdentifier, request.subscriber, request.sender, self._isPep(request), request.recipient) + ) return d.addErrback(self._mapErrors) def subscriptions(self, request): @@ -1933,12 +2059,16 @@ except AttributeError: pass ext_data['order_by'] = request.orderBy or [] - d = self.backend.getItems(request.nodeIdentifier, - request.sender, - request.recipient, - request.maxItems, - request.itemIdentifiers, - ext_data) + d = defer.ensureDeferred( + self.backend.getItems( + request.nodeIdentifier, + request.sender, + request.recipient, + request.maxItems, + request.itemIdentifiers, + ext_data + ) + ) return d.addErrback(self._mapErrors) def retract(self, request): @@ -1987,7 +2117,8 @@ # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome disco.DiscoFeature(const.NS_PUBSUB_RSM), disco.DiscoFeature(pubsub.NS_ORDER_BY), - disco.DiscoFeature(const.NS_FDP) + disco.DiscoFeature(const.NS_FDP), + disco.DiscoFeature(const.NS_PPS) ] def getDiscoItems(self, requestor, service, nodeIdentifier=''): diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/const.py --- a/sat_pubsub/const.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/const.py Wed May 11 13:39:08 2022 +0200 @@ -59,8 +59,13 @@ NS_PUBSUB_RSM = "http://jabber.org/protocol/pubsub#rsm" NS_FORWARD = 'urn:xmpp:forward:0' NS_FDP = 'urn:xmpp:fdp:0' +NS_PPS = 'urn:xmpp:pps:0' +NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0" +NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0" NS_SCHEMA_RESTRICT = 'https://salut-a-toi/protocol/schema#restrict:0' +PPS_SUBSCRIBERS_PREFIX = f"{NS_PPS_SUBSCRIBERS}/" + FDP_TEMPLATE_PREFIX = "fdp/template/" FDP_SUBMITTED_PREFIX = "fdp/submitted/" diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/delegation.py --- a/sat_pubsub/delegation.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/delegation.py Wed May 11 13:39:08 2022 +0200 @@ -20,16 +20,19 @@ # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service -from wokkel.subprotocols import XMPPHandler +from typing import Callable, Any + +from twisted.python import log +from twisted.internet import reactor, defer +from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish from wokkel import pubsub from wokkel import data_form -from wokkel import disco, iwokkel, generic -from wokkel.iwokkel import IPubSubService +from wokkel import disco, iwokkel from wokkel import mam -from twisted.python import log -from twisted.words.protocols.jabber import ijabber, jid, error -from twisted.words.protocols.jabber.xmlstream import toResponse -from twisted.words.xish import domish +from wokkel.iwokkel import IPubSubService +from wokkel.subprotocols import XMPPHandler from zope.interface import implementer DELEGATION_NS = 'urn:xmpp:delegation:2' @@ -40,6 +43,7 @@ DELEGATION_MAIN_SEP = "::" DELEGATION_BARE_SEP = ":bare:" +SEND_HOOK_TIMEOUT = 300 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), (mam.IMAMService, mam, "MAMRequest"), (None, disco, "_DiscoRequest")) @@ -108,10 +112,28 @@ self._service_hack() self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) - self._current_iqs = {} # dict of iq being handler by delegation - self._xs_send = self.xmlstream.send + self._current_iqs = {} # dict of iq being handled by delegation + self.xs_send = self.xmlstream.send self.xmlstream.send = self._sendHack + def delegatedResult( + self, + iq_req_elt: domish.Element, + iq_resp_elt: domish.Element, + wrapping_iq_elt: domish.Element + ) -> None: + """Method called when a result to a delegated stanza has been received + + The result is wrapped and sent back to server + """ + iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result') + fwd_elt = iq_result_elt.addElement( + 'delegation', DELEGATION_NS + ).addElement('forwarded', FORWARDED_NS) + fwd_elt.addChild(iq_resp_elt) + iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client' + self.xs_send(iq_result_elt) + def _sendHack(self, elt): """This method is called instead of xmlstream to control sending @@ -119,24 +141,24 @@ """ if isinstance(elt, domish.Element) and elt.name=='iq': try: - id_ = elt.getAttribute('id') - ori_iq, managed_entity = self._current_iqs[id_] - if jid.JID(elt['to']) != managed_entity: - log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" - .format(got=jid.JID(elt['to']), expected=managed_entity)) + iq_id = elt["id"] + iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id] + if elt['to'] != iq_req_elt["from"]: + log.err( + "IQ id conflict: the managed entity doesn't match (got " + f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})" + ) raise KeyError except KeyError: # the iq is not a delegated one - self._xs_send(elt) + self.xs_send(elt) else: - del self._current_iqs[id_] - iq_result_elt = toResponse(ori_iq, 'result') - fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) - fwd_elt.addChild(elt) - elt.uri = elt.defaultUri = 'jabber:client' - self._xs_send(iq_result_elt) + if not timeout.called: + timeout.cancel() + del self._current_iqs[iq_id] + callback(iq_req_elt, elt, *cb_args) else: - self._xs_send(elt) + self.xs_send(elt) def _obsWrapper(self, observer, stanza): """Wrapper to observer which catch StanzaError @@ -147,7 +169,7 @@ observer(stanza) except error.StanzaError as e: error_elt = e.toResponse(stanza) - self._xs_send(error_elt) + self.xs_send(error_elt) stanza.handled = True def onAdvertise(self, message): @@ -186,12 +208,39 @@ log.msg("Invalid stanza received ({})".format(e)) log.msg('delegations updated:\n{}'.format( - '\n'.join([" - namespace {}{}".format(ns, + '\n'.join(["- namespace {}{}".format(ns, "" if not attributes else " with filtering on {} attribute(s)".format( ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) if not pubsub.NS_PUBSUB in delegated: - log.msg("Didn't got pubsub delegation from server, can't act as a PEP service") + log.msg( + "Didn't got pubsub delegation from server, can't act as a PEP service" + ) + + def registerSendHook( + self, + iq_elt: domish.Element, + callback: Callable[[domish.Element, domish.Element, ...], None], + *args + ) -> None: + """Register a methode to call when an IQ element response is received + + If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted + @param iq_elt: source IQ element sent. Its "id" attribute will be used to track + response + @param callback: method to call when answer is seen + Will be called with: + - original IQ request + - received IQ result (or error) + - optional extra arguments + self.xs_send should be used to send final result + @param *args: argument to use with callback + """ + iq_id = iq_elt["id"] + timeout = reactor.callLater( + SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None) + ) + self._current_iqs[iq_id] = (iq_elt, callback, args, timeout) def onForward(self, iq): """Manage forwarded iq @@ -210,9 +259,7 @@ except StopIteration: raise error.StanzaError('not-acceptable') - managed_entity = jid.JID(fwd_iq['from']) - - self._current_iqs[fwd_iq['id']] = (iq, managed_entity) + self.registerSendHook(fwd_iq, self.delegatedResult, iq) fwd_iq.delegated = True # we need a recipient in pubsub request for PEP diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/pam.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/pam.py Wed May 11 13:39:08 2022 +0200 @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2015-2022 Jérôme Poisson + + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"This module implements XEP-0376 (Pubsub Account Management)" + +from typing import Optional + +from twisted.internet import defer +from twisted.python import log +from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error +from twisted.words.xish import domish +from wokkel import disco, iwokkel, pubsub, data_form +from wokkel.iwokkel import IPubSubService +from zope.interface import implementer + +from sat_pubsub import error + +NS_PAM = "urn:xmpp:pam:0" +PAM_SUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_UNSUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_SUBSCRIPTIONS_XPATH = ( + f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]' +) + + +@implementer(iwokkel.IDisco) +class PAMHandler(disco.DiscoClientProtocol): + + def __init__(self, service_jid): + super(PAMHandler, self).__init__() + self.backend = None + + def connectionInitialized(self): + for handler in self.parent.handlers: + if IPubSubService.providedBy(handler): + self._pubsub_service = handler + break + self.backend = self.parent.parent.getServiceNamed('backend') + self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe) + self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe) + self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions) + + def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]: + """Get JID of sender if it's a user from our server + + If it's a user from an external server, None is returned and a message is log + """ + from_jid = jid.JID(iq_elt["from"]) + if not self.backend.isFromServer(from_jid): + log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}") + else: + return jid.JID(iq_elt["from"]) + + def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt): + destinee_jid = jid.JID(iq_req_elt["from"]) + sender_jid = jid.JID(iq_req_elt["to"]) + message_elt = domish.Element((None, "message")) + message_elt["to"] = destinee_jid.userhost() + message_elt["from"] = destinee_jid.userhost() + # XXX: we explicitely store the notification to be sure that all clients get it + message_elt.addElement(("urn:xmpp:hints", "store")) + notify_elt = message_elt.addElement((NS_PAM, "notify")) + notify_elt["service"] = sender_jid.full() + notify_elt.addChild(iq_result_elt.pubsub.subscription) + self.backend.privilege.sendMessage(message_elt) + pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result') + self.xmlstream.send(pam_iq_result_elt) + + async def onSubRequest(self, from_jid, iq_elt, subscribe=True): + try: + service_jid = jid.JID(iq_elt.pam.getAttribute("jid")) + except RuntimeError: + log.msg( + f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}' + ) + return + iq_elt.handled = True + new_iq_elt = domish.Element((None, "iq")) + new_iq_elt["from"] = from_jid.userhost() + new_iq_elt["to"] = service_jid.full() + new_iq_elt.addUniqueId() + new_iq_elt["type"] = "set" + new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub")) + new_pubsub_elt.addChild( + iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe + ) + try: + options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options")) + except StopIteration: + options_elt = None + else: + new_pubsub_elt.addChild(options_elt) + + if self.backend.isFromServer(service_jid): + # the request is handled locally + new_iq_elt.delegated = True + self.backend.delegation.registerSendHook( + new_iq_elt, self.onSubscribeResult, iq_elt + ) + self.xmlstream.dispatch(new_iq_elt) + else: + # this is a (un)subscribe request to an external server + sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt) + if sub_result_elt["type"] == "result": + if subscribe: + node = new_iq_elt.pubsub.subscribe["node"] + state = sub_result_elt.pubsub.subscription.getAttribute( + "subscription", "subscribed" + ) + public = False + if options_elt is not None: + options_form = data_form.findForm( + options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS + ) + if options_form is not None: + public = options_form.get(f"{{{const.NS_PPS}}}public", False) + await self.backend.storage.addExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + state, + public + ) + else: + node = new_iq_elt.pubsub.unsubscribe["node"] + try: + await self.backend.storage.removeExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + ) + except error.NotSubscribed: + pass + self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt) + + def _onSubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt)) + + def _onUnsubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False)) + + async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None: + iq_elt.handled = True + try: + subs = await self.backend.storage.getAllSubscriptions(from_jid) + except Exception as e: + error_elt = jabber_error.StanzaError( + "internal-server-error", + text=str(e) + ).toResponse(iq_elt) + self.xmlstream.send(error_elt) + else: + result_elt = xmlstream.toResponse(iq_elt, "result") + subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions")) + for sub in subs: + self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub) + self.xmlstream.send(result_elt) + + def _onSubscriptions(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt)) + + def getDiscoInfo(self, requestor, service, nodeIdentifier=''): + return [disco.DiscoFeature(NS_PAM)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=''): + return [] diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/pgsql_storage.py --- a/sat_pubsub/pgsql_storage.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/pgsql_storage.py Wed May 11 13:39:08 2022 +0200 @@ -49,7 +49,7 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - +from typing import Optional, List import copy, logging from datetime import datetime, timezone @@ -80,7 +80,7 @@ parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) ITEMS_SEQ_NAME = 'node_{node_id}_seq' PEP_COL_NAME = 'pep' -CURRENT_VERSION = '9' +CURRENT_VERSION = '10' # retrieve the maximum integer item id + 1 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" @@ -406,8 +406,15 @@ rows = cursor.fetchall() return [tuple(r) for r in rows] - def getSubscriptions(self, entity, nodeIdentifier=None, pep=False, recipient=None): - """retrieve subscriptions of an entity + def getSubscriptions( + self, + entity: jid.JID, + nodeIdentifier: Optional[str] = None, + public: Optional[bool] = None, + pep: bool = False, + recipient: Optional[jid.JID]=None + ) -> List[Subscription]: + """Retrieve local subscriptions of an entity @param entity(jid.JID): entity to check @param nodeIdentifier(unicode, None): node identifier @@ -425,25 +432,136 @@ subscriptions.append(subscription) return subscriptions - query = ["""SELECT node, + query = ["""SELECT nodes.node, jid, resource, state FROM entities NATURAL JOIN subscriptions - NATURAL JOIN nodes - WHERE jid=%s"""] - + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + WHERE jid=%s AND subscriptions.node_id IS NOT NULL"""] args = [entity.userhost()] + if public is not None: + query.append("AND subscriptions.public=%s") + args.append(public) + if nodeIdentifier is not None: - query.append("AND node=%s") + query.append("AND nodes.node=%s") args.append(nodeIdentifier) d = self.dbpool.runQuery(*withPEP(' '.join(query), args, pep, recipient)) d.addCallback(toSubscriptions) return d + async def getAllSubscriptions( + self, + entity: jid.JID, + public: Optional[bool] = None + ): + query = """SELECT subscription_id::text as id, + node, + pep, + ext_service, + ext_node, + state + FROM entities + NATURAL JOIN subscriptions + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + WHERE jid=%s""" + args = [entity.userhost()] + if public is not None: + query += "AND public=%s" + args.append(public) + rows = await self.dbpool.runQuery(query, args) + return [r._asdict() for r in rows] + + def addExternalSubscription( + self, + entity: jid.JID, + service: jid.JID, + node: str, + state: str, + public: bool = False + ) -> defer.Deferred: + """Store a subscription to an external node + + @param entity: entity being subscribed + @param service: pubsub service hosting the node + @param node: pubsub node being subscribed to + @param state: state of the subscription + @param public: True if the subscription is publicly visible + """ + return self.dbpool.runInteraction( + self._addExternalSubscription, + entity, service, node, state, public + ) + + def _addExternalSubscription( + self, + cursor, + entity: jid.JID, + service: jid.JID, + node: str, + state: str, + public: bool + ) -> None: + + try: + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (entity.userhost(),)) + except cursor._pool.dbapi.IntegrityError: + cursor.connection.rollback() + + cursor.execute("""INSERT INTO subscriptions + (ext_service, ext_node, entity_id, state, public) + SELECT %s, %s, entity_id, %s, %s FROM + (SELECT entity_id FROM entities + WHERE jid=%s) AS ent_id + ON CONFLICT(entity_id, ext_service, ext_node) DO UPDATE SET public=EXCLUDED.public""", + (service.full(), + node, + state, + public, + entity.userhost() + )) + + def removeExternalSubscription( + self, + entity: jid.JID, + service: jid.JID, + node: str, + ) -> defer.Deferred: + """Remove a subscription from an external node + + @param entity: entity being unsubscribed + @param service: pubsub service hosting the node + @param node: pubsub node being unsubscribed to + """ + return self.dbpool.runInteraction( + self._removeExternalSubscription, + entity, service, node + ) + + def _removeExternalSubscription( + self, + cursor, + entity: jid.JID, + service: jid.JID, + node: str, + ) -> None: + cursor.execute("""DELETE FROM subscriptions WHERE + ext_service=%s AND + ext_node=%s AND + entity_id=(SELECT entity_id FROM entities + WHERE jid=%s) + """, + (service.full(), + node, + entity.userhost())) + if cursor.rowcount != 1: + raise error.NotSubscribed() + def getDefaultConfiguration(self, nodeType): return self.defaultConfig[nodeType].copy() @@ -683,9 +801,9 @@ resource = subscriber.resource or '' cursor.execute("""SELECT state FROM subscriptions - NATURAL JOIN nodes + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id NATURAL JOIN entities - WHERE node_id=%s AND jid=%s AND resource=%s""", + WHERE subscriptions.node_id=%s AND jid=%s AND resource=%s""", (self.nodeDbId, userhost, resource)) @@ -696,25 +814,38 @@ else: return Subscription(self.nodeIdentifier, subscriber, row[0]) - def getSubscriptions(self, state=None): - return self.dbpool.runInteraction(self._getSubscriptions, state) + def getSubscriptions( + self, + state: Optional[str]=None, + public: Optional[bool] = None + ) -> List[Subscription]: + return self.dbpool.runInteraction(self._getSubscriptions, state, public) - def _getSubscriptions(self, cursor, state): + def _getSubscriptions( + self, + cursor, + state: Optional[str], + public: Optional[bool] = None, + ) -> List[Subscription]: self._checkNodeExists(cursor) - query = """SELECT node, jid, resource, state, + query = ["""SELECT subscription_id::text, nodes.node, jid, resource, state, subscription_type, subscription_depth - FROM subscriptions - NATURAL JOIN nodes - NATURAL JOIN entities - WHERE node_id=%s""" + FROM subscriptions + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + NATURAL JOIN entities + WHERE subscriptions.node_id=%s"""] values = [self.nodeDbId] if state: - query += " AND state=%s" + query.append("AND state=%s") values.append(state) - cursor.execute(query, values) + if public is not None: + query.append("AND public=%s") + values.append(public) + + cursor.execute(" ".join(query), values) rows = cursor.fetchall() subscriptions = [] @@ -727,8 +858,9 @@ if row.subscription_depth: options['pubsub#subscription_depth'] = row.subscription_depth; - subscriptions.append(Subscription(row.node, subscriber, - row.state, options)) + subscription = Subscription(row.node, subscriber, row.state, options) + subscription.id = row.subscription_id + subscriptions.append(subscription) return subscriptions @@ -744,6 +876,7 @@ subscription_type = config.get('pubsub#subscription_type') subscription_depth = config.get('pubsub#subscription_depth') + public = config.get("public", False) try: cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", @@ -751,20 +884,31 @@ except cursor._pool.dbapi.IntegrityError: cursor.connection.rollback() - try: - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state, - subscription_type, subscription_depth) - SELECT %s, entity_id, %s, %s, %s, %s FROM - (SELECT entity_id FROM entities - WHERE jid=%s) AS ent_id""", - (self.nodeDbId, - resource, - state, - subscription_type, - subscription_depth, - userhost)) - except cursor._pool.dbapi.IntegrityError: + # the RETURNING trick to detect INSERT vs UPDATE comes from + # https://stackoverflow.com/a/47001830/4188764 thanks! + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state, + subscription_type, subscription_depth, public) + SELECT %s, entity_id, %s, %s, %s, %s, %s FROM + (SELECT entity_id FROM entities + WHERE jid=%s) AS ent_id + ON CONFLICT (entity_id, node_id, resource) DO UPDATE SET public=EXCLUDED.public + RETURNING (xmax = 0) AS inserted""", + (self.nodeDbId, + resource, + state, + subscription_type, + subscription_depth, + public, + userhost)) + + rows = cursor.fetchone() + if not rows.inserted: + # this was an update, the subscription was already existing + + # we have to explicitly commit, otherwise the exception raised rollbacks the + # transation + cursor.connection.commit() raise error.SubscriptionExists() def removeSubscription(self, subscriber): diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/privilege.py --- a/sat_pubsub/privilege.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/privilege.py Wed May 11 13:39:08 2022 +0200 @@ -19,13 +19,13 @@ "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " "presences" -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional, Union, Set import time from twisted.internet import defer from twisted.python import log -from twisted.python import failure from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish from wokkel import xmppim from wokkel import pubsub @@ -35,13 +35,14 @@ from .error import NotAllowedError -FORWARDED_NS = 'urn:xmpp:forward:0' -PRIV_ENT_NS = 'urn:xmpp:privilege:1' -PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS) +NS_FORWARDED = 'urn:xmpp:forward:0' +NS_PRIV_ENT = 'urn:xmpp:privilege:2' +PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(NS_PRIV_ENT) ROSTER_NS = 'jabber:iq:roster' PERM_ROSTER = 'roster' PERM_MESSAGE = 'message' PERM_PRESENCE = 'presence' +PERM_IQ = 'iq' ALLOWED_ROSTER = ('none', 'get', 'set', 'both') ALLOWED_MESSAGE = ('none', 'outgoing') ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster') @@ -50,6 +51,13 @@ PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE } +PERMS_BASE : dict[str, Optional[Union[str, Dict[str, Union[str, bool]]]]]= { + PERM_ROSTER: None, + PERM_MESSAGE: None, + PERM_PRESENCE: None, + PERM_IQ: None, +} + # Number of seconds before a roster cache is not considered valid anymore. # We keep this delay to avoid requesting roster too much in a row if an entity is @@ -70,9 +78,7 @@ def __init__(self, service_jid): super(PrivilegesHandler, self).__init__() self.backend = None - self._permissions = {PERM_ROSTER: 'none', - PERM_MESSAGE: 'none', - PERM_PRESENCE: 'none'} + self._permissions = PERMS_BASE.copy() self._pubsub_service = None self.caps_map = {} # key: bare jid, value: dict of resources with caps hash # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to @@ -120,34 +126,69 @@ self._permissions will be updated according to advertised privileged """ - privilege_elt = next(message.elements(PRIV_ENT_NS, 'privilege')) - for perm_elt in privilege_elt.elements(PRIV_ENT_NS): + self._permissions = PERMS_BASE.copy() + privilege_elt = next(message.elements(NS_PRIV_ENT, 'privilege')) + for perm_elt in privilege_elt.elements(NS_PRIV_ENT, 'perm'): try: - if perm_elt.name != 'perm': - raise InvalidStanza('unexpected element {}'.format(perm_elt.name)) - perm_access = perm_elt['access'] - perm_type = perm_elt['type'] + perm_access = perm_elt["access"] + except KeyError: + log.err(f"missing 'access' attribute in perm element: {perm_elt.toXml()}") + continue + if perm_access in (PERM_ROSTER, PERM_MESSAGE, PERM_PRESENCE): try: + perm_type = perm_elt["type"] + except KeyError: + log.err( + "missing 'type' attribute in perm element: " + f"{perm_elt.toXml()}" + ) + continue + else: if perm_type not in TO_CHECK[perm_access]: - raise InvalidStanza( - 'bad type [{}] for permission {}' - .format(perm_type, perm_access) + log.err( + f'bad type {perm_type!r}: {perm_elt.toXml()}' + ) + continue + self._permissions[perm_access] = perm_type or None + elif perm_access == "iq": + iq_perms = self._permissions["iq"] = {} + for namespace_elt in perm_elt.elements(NS_PRIV_ENT, "namespace"): + ns = namespace_elt.getAttribute("ns") + perm_type = namespace_elt.getAttribute("type") + if not ns or not perm_type: + log.err( + f"invalid namespace element: {namespace_elt.toXml()}" ) - except KeyError: - raise InvalidStanza('bad permission [{}]'.format(perm_access)) - except InvalidStanza as e: - log.msg( - f"Invalid stanza received ({e}), setting permission to none" - ) - for perm in self._permissions: - self._permissions[perm] = 'none' - break + else: + if perm_type not in ("get", "set", "both"): + log.err( + f"invalid namespace type: {namespace_elt.toXml()}" + ) + else: + ns_perms = iq_perms[ns] = {"type": perm_type} + ns_perms["get"] = perm_type in ("get", "both") + ns_perms["set"] = perm_type in ("set", "both") + else: + log.err(f"unknown {perm_access!r} access: {perm_elt.toXml()}'") - self._permissions[perm_access] = perm_type or 'none' + perms = self._permissions + perms_iq = perms["iq"] + if perms_iq is None: + iq_perm_txt = " no iq perm advertised" + elif not isinstance(perms_iq, dict): + raise ValueError('INTERNAL ERROR: "iq" perm should a dict') + else: + iq_perm_txt = "\n".join( + f" - {ns}: {perms['type']}" + for ns, perms in perms_iq.items() + ) log.msg( - 'Privileges updated: roster={roster}, message={message}, presence={presence}' - .format(**self._permissions) + "Privileges updated:\n" + f"roster: {perms[PERM_ROSTER]}\n" + f"message: {perms[PERM_MESSAGE]}\n" + f"presence: {perms[PERM_PRESENCE]}\n" + f"iq:\n{iq_perm_txt}" ) ## roster ## @@ -308,8 +349,8 @@ if to_jid is None: to_jid = self.backend.server_jid main_message['to'] = to_jid.full() - privilege_elt = main_message.addElement((PRIV_ENT_NS, 'privilege')) - forwarded_elt = privilege_elt.addElement((FORWARDED_NS, 'forwarded')) + privilege_elt = main_message.addElement((NS_PRIV_ENT, 'privilege')) + forwarded_elt = privilege_elt.addElement((NS_FORWARDED, 'forwarded')) priv_message['xmlns'] = 'jabber:client' forwarded_elt.addChild(priv_message) self.send(main_message) @@ -452,6 +493,62 @@ for pep_jid, node, item, item_access_model in last_items: self.notifyPublish(pep_jid, node, [(from_jid, None, [item])]) + ## IQ ## + + async def sendIQ( + self, + priv_iq: domish.Element, + to: Optional[jid.JID] = None + ) -> domish.Element: + """Send privileged IQ stanza + + @param priv_iq: privileged IQ stanza + @param to: bare jid of user on behalf of who the stanza is sent + The stanza will be wrapped and sent to the server. Result/Error stanza will sent + back as return value. + """ + if to is None: + try: + to = jid.JID(priv_iq["from"]) + except (KeyError, RuntimeError): + raise ValueError( + 'no "to" specified, and invalid "to" attribute in priv_iq' + ) + if not to.user or to.resource or to.host != self.backend.server_jid.userhost(): + raise NotAllowedError( + f'"to" attribute must be set to a bare jid of the server, {to} is invalid' + ) + iq_type = priv_iq.getAttribute("type") + if iq_type not in ("get", "set"): + raise ValueError(f"invalid IQ type: {priv_iq.toXml()}") + first_child = priv_iq.firstChildElement() + iq_perms: Optional[dict] = self._permissions[PERM_IQ] + + if ((not iq_perms or first_child is None or first_child.uri is None + or not iq_perms.get(first_child.uri, {}).get(iq_type, False))): + raise NotAllowedError( + "privileged IQ stanza not allowed for this namespace/type combination " + f"{priv_iq.toXml()}" + ) + + main_iq = xmlstream.IQ(self.xmlstream, iq_type) + main_iq.timeout = 120 + privileged_iq_elt = main_iq.addElement((NS_PRIV_ENT, "privileged_iq")) + priv_iq['xmlns'] = 'jabber:client' + privileged_iq_elt.addChild(priv_iq) + ret_elt = await main_iq.send(to.full()) + # we unwrap the result + for name, ns in ( + ("privilege", NS_PRIV_ENT), + ("forwarded", NS_FORWARDED), + ("iq", "jabber:client") + ): + try: + ret_elt = next(ret_elt.elements(ns, name)) + except StopIteration: + raise ValueError(f"Invalid privileged IQ result: {ret_elt.toXml()}") + return ret_elt + ## misc ## async def getAutoSubscribers( diff -r 9125a6e440c0 -r b544109ab4c4 sat_pubsub/pubsub_admin.py --- a/sat_pubsub/pubsub_admin.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/pubsub_admin.py Wed May 11 13:39:08 2022 +0200 @@ -92,13 +92,13 @@ for item in publish_elt.elements(pubsub.NS_PUBSUB, 'item'): try: requestor = jid.JID(item.attributes.pop('publisher')) + except KeyError: + requestor = from_jid except Exception as e: log.msg("WARNING: invalid jid in publisher ({requestor}): {msg}" .format(requestor=requestor, msg=e)) self.sendError(iq_elt) return - except KeyError: - requestor = from_jid # we don't use a DeferredList because we want to be sure that # each request is done in order diff -r 9125a6e440c0 -r b544109ab4c4 twisted/plugins/pubsub.py --- a/twisted/plugins/pubsub.py Mon Jan 03 16:48:22 2022 +0100 +++ b/twisted/plugins/pubsub.py Wed May 11 13:39:08 2022 +0200 @@ -232,6 +232,7 @@ from sat_pubsub.backend import BackendService, ExtraDiscoHandler from sat_pubsub.privilege import PrivilegesHandler from sat_pubsub.delegation import DelegationsHandler + from sat_pubsub.pam import PAMHandler if not config['jid'] or not config['xmpp_pwd']: raise usage.UsageError("You must specify jid and xmpp_pwd") @@ -290,6 +291,10 @@ ph.setHandlerParent(cs) bs.privilege = ph + pam = PAMHandler(config["jid"]) + pam.setHandlerParent(cs) + bs.pam = pam + resource = IPubSubResource(bs) resource.hideNodes = config["hide-nodes"] resource.serviceJID = config["jid"]