Mercurial > libervia-pubsub
changeset 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto
Subscriptions" and "Filtering" is not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element actually specified, I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | 9125a6e440c0 |
children | cfa40fa108a4 |
files | db/libervia_pubsub_update_9_10.sql db/pubsub.sql sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/delegation.py sat_pubsub/pam.py sat_pubsub/pgsql_storage.py sat_pubsub/privilege.py sat_pubsub/pubsub_admin.py twisted/plugins/pubsub.py |
diffstat | 10 files changed, 859 insertions(+), 202 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/db/libervia_pubsub_update_9_10.sql Wed May 11 13:39:08 2022 +0200 @@ -0,0 +1,24 @@ +-- we check version of the database before doing anything +-- and stop execution if not good +\set ON_ERROR_STOP +DO $$ +DECLARE ver text; +BEGIN + SELECT value INTO ver FROM metadata WHERE key='version'; + IF NOT FOUND OR ver!='9' THEN + RAISE EXCEPTION 'This update file needs to be applied on database schema version 9, you use version %',ver; + END IF; +END$$; +\unset ON_ERROR_STOP +-- end of version check + +/* subscriptions table updates */ +/* to handle external nodes */ +ALTER TABLE subscriptions ALTER COLUMN node_id DROP NOT NULL; +ALTER TABLE subscriptions ADD COLUMN ext_service text; +ALTER TABLE subscriptions ADD COLUMN ext_node text; +/* and to mark a subscription as public */ +ALTER TABLE subscriptions ADD COLUMN public boolean NOT NULL DEFAULT FALSE; +ALTER TABLE subscriptions ADD UNIQUE (entity_id, ext_service, ext_node); + +UPDATE metadata SET value='10' WHERE key='version';
--- a/db/pubsub.sql Mon Jan 03 16:48:22 2022 +0100 +++ b/db/pubsub.sql Wed May 11 13:39:08 2022 +0200 @@ -15,8 +15,8 @@ deliver_payloads boolean NOT NULL DEFAULT TRUE, max_items integer NOT NULL DEFAULT 0 CHECK (max_items >= 0), - overwrite_policy text NOT NULL DEFAULT 'original_publisher' - CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')), + overwrite_policy text NOT NULL DEFAULT 'original_publisher' + CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')), serial_ids boolean NOT NULL DEFAULT FALSE, consistent_publisher boolean NOT NULL DEFAULT FALSE, fts_language text NOT NULL DEFAULT 'generic', @@ -53,14 +53,21 @@ subscription_id serial PRIMARY KEY, entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE, resource text, - node_id integer NOT NULL REFERENCES nodes ON delete CASCADE, + node_id integer REFERENCES nodes ON DELETE CASCADE, + /* when we reference an external node (with PAM), node_id is NULL and service and node + * are set */ + ext_service text, + ext_node text, state text NOT NULL DEFAULT 'subscribed' CHECK (state IN ('subscribed', 'pending', 'unconfigured')), subscription_type text CHECK (subscription_type IN (NULL, 'items', 'nodes')), subscription_depth text CHECK (subscription_depth IN (NULL, '1', 'all')), - UNIQUE (entity_id, resource, node_id)); + public boolean NOT NULL DEFAULT FALSE, + UNIQUE (entity_id, resource, node_id)), + UNIQUE (entity_id, ext_service, ext_node)), + ; CREATE TABLE items ( item_id serial PRIMARY KEY, @@ -154,4 +161,4 @@ value text ); -INSERT INTO metadata VALUES ('version', '9'); +INSERT INTO metadata VALUES ('version', '10');
--- a/sat_pubsub/backend.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/backend.py Wed May 11 13:39:08 2022 +0200 @@ -62,6 +62,7 @@ import copy import uuid +import hashlib from typing import Optional, List, Tuple from zope.interface import implementer @@ -240,10 +241,14 @@ def _getFTSLanguagesEb(self, failure_): log.msg(f"WARNING: can get FTS languages: {failure_}") - def isAdmin(self, entity_jid): + def isAdmin(self, entity_jid: jid.JID) -> bool: """Return True if an entity is an administrator""" return entity_jid.userhostJID() in self.admins + def isFromServer(self, entity_jid: jid.JID) -> bool: + """Return True if an entity come from our server""" + return entity_jid.host == self.server_jid.host + def supportsPublishOptions(self): return True @@ -595,18 +600,21 @@ def registerPurgeNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs) - def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): + async def subscribe( + self, + nodeIdentifier: str, + subscriber: jid.JID, + requestor: jid.JID, + options: Optional[dict], + pep: bool, + recipient: jid.JID + ) -> pubsub.Subscription: subscriberEntity = subscriber.userhostJID() if subscriberEntity != requestor.userhostJID(): - return defer.fail(error.Forbidden()) + raise error.Forbidden() - d = self.storage.getNode(nodeIdentifier, pep, recipient) - d.addCallback(_getAffiliation, subscriberEntity) - d.addCallback(self._doSubscribe, subscriber, pep, recipient) - return d - - def _doSubscribe(self, result, subscriber, pep, recipient): - node, affiliation = result + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + __, affiliation = await _getAffiliation(node, subscriberEntity) if affiliation == 'outcast': raise error.Forbidden() @@ -614,67 +622,61 @@ access_model = node.getAccessModel() if access_model == const.VAL_AMODEL_OPEN: - d = defer.succeed(None) + pass elif access_model == const.VAL_AMODEL_PRESENCE: - d = self.checkPresenceSubscription(node, subscriber) + await self.checkPresenceSubscription(node, subscriber) elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: - d = self.checkRosterGroups(node, subscriber) + await self.checkRosterGroups(node, subscriber) elif access_model == const.VAL_AMODEL_WHITELIST: - d = self.checkNodeAffiliations(node, subscriber) + await self.checkNodeAffiliations(node, subscriber) else: raise NotImplementedError - def trapExists(failure): - failure.trap(error.SubscriptionExists) - return False + config = {} + if options and options.get(f"{{{const.NS_PPS}}}public"): + config["public"] = True + try: + await node.addSubscription(subscriber, 'subscribed', config) + except error.SubscriptionExists: + send_last = False + else: + send_last = True - def cb(sendLast): - d = node.getSubscription(subscriber) - if sendLast: - d.addCallback(self._sendLastPublished, node, pep, recipient) - return d - - d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {})) - d.addCallbacks(lambda _: True, trapExists) - d.addCallback(cb) - - return d - - def _sendLastPublished(self, subscription, node, pep, recipient): + subscription = await node.getSubscription(subscriber) - def notifyItem(items_data): - if items_data: - reactor.callLater(0, self.dispatch, - {'items_data': items_data, - 'node': node, - 'pep': pep, - 'recipient': recipient, - 'subscription': subscription, - }, - '//event/pubsub/notify') - - config = node.getConfiguration() - sendLastPublished = config.get('pubsub#send_last_published_item', - 'never') - if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': - entity = subscription.subscriber.userhostJID() - d = defer.ensureDeferred( - self.getItemsData( - node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep} + if send_last: + config = node.getConfiguration() + sendLastPublished = config.get( + 'pubsub#send_last_published_item', 'never' + ) + if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': + entity = subscription.subscriber.userhostJID() + items_data, __ = await self.getItemsData( + node.nodeIdentifier, entity, recipient, maxItems=1, + ext_data={'pep': pep} ) - ) - d.addCallback(notifyItem) - d.addErrback(log.err) + if items_data: + reactor.callLater( + 0, + self.dispatch, + {'items_data': items_data, + 'node': node, + 'pep': pep, + 'recipient': recipient, + 'subscription': subscription, + }, + '//event/pubsub/notify' + ) return subscription - def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): + async def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): if subscriber.userhostJID() != requestor.userhostJID(): - return defer.fail(error.Forbidden()) + raise error.Forbidden() - d = self.storage.getNode(nodeIdentifier, pep, recipient) - d.addCallback(lambda node: node.removeSubscription(subscriber)) - return d + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + await node.removeSubscription(subscriber) + return pubsub.Subscription(nodeIdentifier, subscriber, "none") def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient): """retrieve subscriptions of an entity @@ -685,7 +687,9 @@ @param pep(bool): True if it's a PEP request @param recipient(jid.JID, None): recipient of the PEP request """ - return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) + return self.storage.getSubscriptions( + requestor, nodeIdentifier, None, pep, recipient + ) def supportsAutoCreate(self): return True @@ -749,6 +753,10 @@ if not nodeIdentifier: return defer.fail(error.NoRootNode()) + if ((nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS + or nodeIdentifier.startswith(const.PPS_SUBSCRIBERS_PREFIX))): + return defer.succeed({const.OPT_ACCESS_MODEL: const.VAL_AMODEL_OPEN}) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getConfiguration()) @@ -1067,15 +1075,19 @@ ) return ids - def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, - itemIdentifiers=None, ext_data=None): - d = defer.ensureDeferred( - self.getItemsData( - nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data - ) + async def getItems( + self, + nodeIdentifier: str, + requestor: jid.JID, + recipient: jid.JID, + maxItems: Optional[int] = None, + itemIdentifiers: Optional[List[str]] = None, + ext_data: Optional[dict] = None + ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: + items_data, rsm_response = await self.getItemsData( + nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data ) - d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) - return d + return [item_data.item for item_data in items_data], rsm_response async def getOwnerRoster(self, node, owners=None): # FIXME: roster of publisher, not owner, must be used @@ -1099,20 +1111,40 @@ return return roster - async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, - itemIdentifiers=None, ext_data=None): + async def getItemsData( + self, + nodeIdentifier: str, + requestor: jid.JID, + recipient: jid.JID, + maxItems: Optional[int] = None, + itemIdentifiers: Optional[List[str]] = None, + ext_data: Optional[dict] = None + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: """like getItems but return the whole ItemData""" if maxItems == 0: log.msg("WARNING: maxItems=0 on items retrieval") - return [] + return [], None if ext_data is None: ext_data = {} + + if nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS: + return await self.getPublicSubscriptions( + requestor, maxItems, itemIdentifiers, ext_data, + ext_data.pop("pep", False), recipient + ) + elif nodeIdentifier.startswith(f"{const.NS_PPS_SUBSCRIBERS}/"): + target_node = nodeIdentifier[len(const.NS_PPS_SUBSCRIBERS)+1:] + return await self.getPublicNodeSubscriptions( + target_node, requestor, maxItems, itemIdentifiers, ext_data, + ext_data.pop("pep", False), recipient + ) + node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) try: affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor) except error.NotLeafNodeError: - return [] + return [], None # at this point node access is checked @@ -1155,9 +1187,9 @@ if schema is not None: self.filterItemsWithSchema(items_data, schema, owner) - await self._items_rsm( - items_data, node, requestor_groups, owner, itemIdentifiers, ext_data) - return items_data + return await self._items_rsm( + items_data, node, requestor_groups, owner, itemIdentifiers, ext_data + ) def _setCount(self, value, response): response.count = value @@ -1180,48 +1212,135 @@ rsm_request = ext_data['rsm'] except KeyError: # No RSM in this request, nothing to do - return items_data + return items_data, None if itemIdentifiers: log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part") - return items_data + return items_data, None - response = rsm.RSMResponse() + rsm_response = rsm.RSMResponse() d_count = node.getItemsCount(authorized_groups, owner, ext_data) - d_count.addCallback(self._setCount, response) + d_count.addCallback(self._setCount, rsm_response) d_list = [d_count] if items_data: - response.first = items_data[0].item['id'] - response.last = items_data[-1].item['id'] + rsm_response.first = items_data[0].item['id'] + rsm_response.last = items_data[-1].item['id'] # index handling if rsm_request.index is not None: - response.index = rsm_request.index + rsm_response.index = rsm_request.index elif rsm_request.before: # The last page case (before == '') is managed in render method d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data) - d_index.addCallback(self._setIndex, response, -len(items_data)) + d_index.addCallback(self._setIndex, rsm_response, -len(items_data)) d_list.append(d_index) elif rsm_request.after is not None: d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data) - d_index.addCallback(self._setIndex, response, 1) + d_index.addCallback(self._setIndex, rsm_response, 1) d_list.append(d_index) else: # the first page was requested - response.index = 0 + rsm_response.index = 0 await defer.DeferredList(d_list) if rsm_request.before == '': # the last page was requested - response.index = response.count - len(items_data) + rsm_response.index = rsm_response.count - len(items_data) + + return items_data, rsm_response + + def addEltFromSubDict( + self, + parent_elt: domish.Element, + from_jid: Optional[jid.JID], + sub_dict: dict[str, str], + namespace: Optional[str] = None, + ) -> None: + """Generate <subscription> element from storage.getAllSubscriptions's dict + + @param parent_elt: element where the new subscription element must be added + @param sub_dict: subscription data as returned by storage.getAllSubscriptions + @param namespace: if not None, namespace to use for <subscription> element + @param service_attribute: name of the attribute to use for the subscribed service + """ + subscription_elt = parent_elt.addElement( + "subscription" if namespace is None else (namespace, "subscription") + ) + if from_jid is not None: + subscription_elt["jid"] = from_jid.userhost() + if sub_dict["node"] is not None: + if sub_dict["pep"] is not None: + subscription_elt["service"] = sub_dict["pep"] + else: + subscription_elt["service"] = self.jid.full() + subscription_elt["node"] = sub_dict["node"] + else: + subscription_elt["service"] = sub_dict["ext_service"] + subscription_elt["node"] = sub_dict["ext_node"] + subscription_elt["subscription"] = sub_dict["state"] + + async def getPublicSubscriptions( + self, + requestor: jid.JID, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + ext_data: dict, + pep: bool, + recipient: jid.JID + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: - items_data.append(container.ItemData(response.toElement())) + if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): + raise NotImplementedError( + "item identifiers, RSM and MAM are not implemented yet" + ) + + if not pep: + return [], None + + subs = await self.storage.getAllSubscriptions(recipient, True) + items_data = [] + for sub in subs: + if sub["state"] != "subscribed": + continue + item = domish.Element((pubsub.NS_PUBSUB, "item")) + item["id"] = sub["id"] + self.addEltFromSubDict(item, None, sub, const.NS_PPS) + items_data.append(container.ItemData(item)) + + return items_data, None - return items_data + async def getPublicNodeSubscriptions( + self, + nodeIdentifier: str, + requestor: jid.JID, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + ext_data: dict, + pep: bool, + recipient: jid.JID + ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]: + + if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"): + raise NotImplementedError( + "item identifiers, RSM and MAM are not implemented yet" + ) + + node = await self.storage.getNode(nodeIdentifier, pep, recipient) + + subs = await node.getSubscriptions(public=True) + items_data = [] + for sub in subs: + item = domish.Element((pubsub.NS_PUBSUB, "item")) + item["id"] = sub.id + subscriber_elt = item.addElement((const.NS_PPS, "subscriber")) + subscriber_elt["jid"] = sub.subscriber.full() + items_data.append(container.ItemData(item)) + + return items_data, None async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): node = await self.storage.getNode(nodeIdentifier, pep, recipient) @@ -1824,19 +1943,26 @@ return d.addErrback(self._mapErrors) def subscribe(self, request): - d = self.backend.subscribe(request.nodeIdentifier, - request.subscriber, - request.sender, - self._isPep(request), - request.recipient) + d = defer.ensureDeferred( + self.backend.subscribe( + request.nodeIdentifier, + request.subscriber, + request.sender, + request.options, + self._isPep(request), + request.recipient + ) + ) return d.addErrback(self._mapErrors) def unsubscribe(self, request): - d = self.backend.unsubscribe(request.nodeIdentifier, + d = defer.ensureDeferred( + self.backend.unsubscribe(request.nodeIdentifier, request.subscriber, request.sender, self._isPep(request), request.recipient) + ) return d.addErrback(self._mapErrors) def subscriptions(self, request): @@ -1933,12 +2059,16 @@ except AttributeError: pass ext_data['order_by'] = request.orderBy or [] - d = self.backend.getItems(request.nodeIdentifier, - request.sender, - request.recipient, - request.maxItems, - request.itemIdentifiers, - ext_data) + d = defer.ensureDeferred( + self.backend.getItems( + request.nodeIdentifier, + request.sender, + request.recipient, + request.maxItems, + request.itemIdentifiers, + ext_data + ) + ) return d.addErrback(self._mapErrors) def retract(self, request): @@ -1987,7 +2117,8 @@ # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome disco.DiscoFeature(const.NS_PUBSUB_RSM), disco.DiscoFeature(pubsub.NS_ORDER_BY), - disco.DiscoFeature(const.NS_FDP) + disco.DiscoFeature(const.NS_FDP), + disco.DiscoFeature(const.NS_PPS) ] def getDiscoItems(self, requestor, service, nodeIdentifier=''):
--- a/sat_pubsub/const.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/const.py Wed May 11 13:39:08 2022 +0200 @@ -59,8 +59,13 @@ NS_PUBSUB_RSM = "http://jabber.org/protocol/pubsub#rsm" NS_FORWARD = 'urn:xmpp:forward:0' NS_FDP = 'urn:xmpp:fdp:0' +NS_PPS = 'urn:xmpp:pps:0' +NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0" +NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0" NS_SCHEMA_RESTRICT = 'https://salut-a-toi/protocol/schema#restrict:0' +PPS_SUBSCRIBERS_PREFIX = f"{NS_PPS_SUBSCRIBERS}/" + FDP_TEMPLATE_PREFIX = "fdp/template/" FDP_SUBMITTED_PREFIX = "fdp/submitted/"
--- a/sat_pubsub/delegation.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/delegation.py Wed May 11 13:39:08 2022 +0200 @@ -20,16 +20,19 @@ # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service -from wokkel.subprotocols import XMPPHandler +from typing import Callable, Any + +from twisted.python import log +from twisted.internet import reactor, defer +from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish from wokkel import pubsub from wokkel import data_form -from wokkel import disco, iwokkel, generic -from wokkel.iwokkel import IPubSubService +from wokkel import disco, iwokkel from wokkel import mam -from twisted.python import log -from twisted.words.protocols.jabber import ijabber, jid, error -from twisted.words.protocols.jabber.xmlstream import toResponse -from twisted.words.xish import domish +from wokkel.iwokkel import IPubSubService +from wokkel.subprotocols import XMPPHandler from zope.interface import implementer DELEGATION_NS = 'urn:xmpp:delegation:2' @@ -40,6 +43,7 @@ DELEGATION_MAIN_SEP = "::" DELEGATION_BARE_SEP = ":bare:" +SEND_HOOK_TIMEOUT = 300 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), (mam.IMAMService, mam, "MAMRequest"), (None, disco, "_DiscoRequest")) @@ -108,10 +112,28 @@ self._service_hack() self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) - self._current_iqs = {} # dict of iq being handler by delegation - self._xs_send = self.xmlstream.send + self._current_iqs = {} # dict of iq being handled by delegation + self.xs_send = self.xmlstream.send self.xmlstream.send = self._sendHack + def delegatedResult( + self, + iq_req_elt: domish.Element, + iq_resp_elt: domish.Element, + wrapping_iq_elt: domish.Element + ) -> None: + """Method called when a result to a delegated stanza has been received + + The result is wrapped and sent back to server + """ + iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result') + fwd_elt = iq_result_elt.addElement( + 'delegation', DELEGATION_NS + ).addElement('forwarded', FORWARDED_NS) + fwd_elt.addChild(iq_resp_elt) + iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client' + self.xs_send(iq_result_elt) + def _sendHack(self, elt): """This method is called instead of xmlstream to control sending @@ -119,24 +141,24 @@ """ if isinstance(elt, domish.Element) and elt.name=='iq': try: - id_ = elt.getAttribute('id') - ori_iq, managed_entity = self._current_iqs[id_] - if jid.JID(elt['to']) != managed_entity: - log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" - .format(got=jid.JID(elt['to']), expected=managed_entity)) + iq_id = elt["id"] + iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id] + if elt['to'] != iq_req_elt["from"]: + log.err( + "IQ id conflict: the managed entity doesn't match (got " + f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})" + ) raise KeyError except KeyError: # the iq is not a delegated one - self._xs_send(elt) + self.xs_send(elt) else: - del self._current_iqs[id_] - iq_result_elt = toResponse(ori_iq, 'result') - fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) - fwd_elt.addChild(elt) - elt.uri = elt.defaultUri = 'jabber:client' - self._xs_send(iq_result_elt) + if not timeout.called: + timeout.cancel() + del self._current_iqs[iq_id] + callback(iq_req_elt, elt, *cb_args) else: - self._xs_send(elt) + self.xs_send(elt) def _obsWrapper(self, observer, stanza): """Wrapper to observer which catch StanzaError @@ -147,7 +169,7 @@ observer(stanza) except error.StanzaError as e: error_elt = e.toResponse(stanza) - self._xs_send(error_elt) + self.xs_send(error_elt) stanza.handled = True def onAdvertise(self, message): @@ -186,12 +208,39 @@ log.msg("Invalid stanza received ({})".format(e)) log.msg('delegations updated:\n{}'.format( - '\n'.join([" - namespace {}{}".format(ns, + '\n'.join(["- namespace {}{}".format(ns, "" if not attributes else " with filtering on {} attribute(s)".format( ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) if not pubsub.NS_PUBSUB in delegated: - log.msg("Didn't got pubsub delegation from server, can't act as a PEP service") + log.msg( + "Didn't got pubsub delegation from server, can't act as a PEP service" + ) + + def registerSendHook( + self, + iq_elt: domish.Element, + callback: Callable[[domish.Element, domish.Element, ...], None], + *args + ) -> None: + """Register a methode to call when an IQ element response is received + + If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted + @param iq_elt: source IQ element sent. Its "id" attribute will be used to track + response + @param callback: method to call when answer is seen + Will be called with: + - original IQ request + - received IQ result (or error) + - optional extra arguments + self.xs_send should be used to send final result + @param *args: argument to use with callback + """ + iq_id = iq_elt["id"] + timeout = reactor.callLater( + SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None) + ) + self._current_iqs[iq_id] = (iq_elt, callback, args, timeout) def onForward(self, iq): """Manage forwarded iq @@ -210,9 +259,7 @@ except StopIteration: raise error.StanzaError('not-acceptable') - managed_entity = jid.JID(fwd_iq['from']) - - self._current_iqs[fwd_iq['id']] = (iq, managed_entity) + self.registerSendHook(fwd_iq, self.delegatedResult, iq) fwd_iq.delegated = True # we need a recipient in pubsub request for PEP
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/pam.py Wed May 11 13:39:08 2022 +0200 @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2015-2022 Jérôme Poisson + + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"This module implements XEP-0376 (Pubsub Account Management)" + +from typing import Optional + +from twisted.internet import defer +from twisted.python import log +from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error +from twisted.words.xish import domish +from wokkel import disco, iwokkel, pubsub, data_form +from wokkel.iwokkel import IPubSubService +from zope.interface import implementer + +from sat_pubsub import error + +NS_PAM = "urn:xmpp:pam:0" +PAM_SUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_UNSUB_XPATH = ( + f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]' +) +PAM_SUBSCRIPTIONS_XPATH = ( + f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]' +) + + +@implementer(iwokkel.IDisco) +class PAMHandler(disco.DiscoClientProtocol): + + def __init__(self, service_jid): + super(PAMHandler, self).__init__() + self.backend = None + + def connectionInitialized(self): + for handler in self.parent.handlers: + if IPubSubService.providedBy(handler): + self._pubsub_service = handler + break + self.backend = self.parent.parent.getServiceNamed('backend') + self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe) + self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe) + self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions) + + def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]: + """Get JID of sender if it's a user from our server + + If it's a user from an external server, None is returned and a message is log + """ + from_jid = jid.JID(iq_elt["from"]) + if not self.backend.isFromServer(from_jid): + log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}") + else: + return jid.JID(iq_elt["from"]) + + def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt): + destinee_jid = jid.JID(iq_req_elt["from"]) + sender_jid = jid.JID(iq_req_elt["to"]) + message_elt = domish.Element((None, "message")) + message_elt["to"] = destinee_jid.userhost() + message_elt["from"] = destinee_jid.userhost() + # XXX: we explicitely store the notification to be sure that all clients get it + message_elt.addElement(("urn:xmpp:hints", "store")) + notify_elt = message_elt.addElement((NS_PAM, "notify")) + notify_elt["service"] = sender_jid.full() + notify_elt.addChild(iq_result_elt.pubsub.subscription) + self.backend.privilege.sendMessage(message_elt) + pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result') + self.xmlstream.send(pam_iq_result_elt) + + async def onSubRequest(self, from_jid, iq_elt, subscribe=True): + try: + service_jid = jid.JID(iq_elt.pam.getAttribute("jid")) + except RuntimeError: + log.msg( + f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}' + ) + return + iq_elt.handled = True + new_iq_elt = domish.Element((None, "iq")) + new_iq_elt["from"] = from_jid.userhost() + new_iq_elt["to"] = service_jid.full() + new_iq_elt.addUniqueId() + new_iq_elt["type"] = "set" + new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub")) + new_pubsub_elt.addChild( + iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe + ) + try: + options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options")) + except StopIteration: + options_elt = None + else: + new_pubsub_elt.addChild(options_elt) + + if self.backend.isFromServer(service_jid): + # the request is handled locally + new_iq_elt.delegated = True + self.backend.delegation.registerSendHook( + new_iq_elt, self.onSubscribeResult, iq_elt + ) + self.xmlstream.dispatch(new_iq_elt) + else: + # this is a (un)subscribe request to an external server + sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt) + if sub_result_elt["type"] == "result": + if subscribe: + node = new_iq_elt.pubsub.subscribe["node"] + state = sub_result_elt.pubsub.subscription.getAttribute( + "subscription", "subscribed" + ) + public = False + if options_elt is not None: + options_form = data_form.findForm( + options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS + ) + if options_form is not None: + public = options_form.get(f"{{{const.NS_PPS}}}public", False) + await self.backend.storage.addExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + state, + public + ) + else: + node = new_iq_elt.pubsub.unsubscribe["node"] + try: + await self.backend.storage.removeExternalSubscription( + from_jid.userhostJID(), + service_jid, + node, + ) + except error.NotSubscribed: + pass + self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt) + + def _onSubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt)) + + def _onUnsubscribe(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False)) + + async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None: + iq_elt.handled = True + try: + subs = await self.backend.storage.getAllSubscriptions(from_jid) + except Exception as e: + error_elt = jabber_error.StanzaError( + "internal-server-error", + text=str(e) + ).toResponse(iq_elt) + self.xmlstream.send(error_elt) + else: + result_elt = xmlstream.toResponse(iq_elt, "result") + subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions")) + for sub in subs: + self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub) + self.xmlstream.send(result_elt) + + def _onSubscriptions(self, iq_elt: domish.Element) -> None: + if not iq_elt.delegated: + return + from_jid = self.getServerUser(iq_elt) + if from_jid is not None: + defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt)) + + def getDiscoInfo(self, requestor, service, nodeIdentifier=''): + return [disco.DiscoFeature(NS_PAM)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=''): + return []
--- a/sat_pubsub/pgsql_storage.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/pgsql_storage.py Wed May 11 13:39:08 2022 +0200 @@ -49,7 +49,7 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - +from typing import Optional, List import copy, logging from datetime import datetime, timezone @@ -80,7 +80,7 @@ parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) ITEMS_SEQ_NAME = 'node_{node_id}_seq' PEP_COL_NAME = 'pep' -CURRENT_VERSION = '9' +CURRENT_VERSION = '10' # retrieve the maximum integer item id + 1 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" @@ -406,8 +406,15 @@ rows = cursor.fetchall() return [tuple(r) for r in rows] - def getSubscriptions(self, entity, nodeIdentifier=None, pep=False, recipient=None): - """retrieve subscriptions of an entity + def getSubscriptions( + self, + entity: jid.JID, + nodeIdentifier: Optional[str] = None, + public: Optional[bool] = None, + pep: bool = False, + recipient: Optional[jid.JID]=None + ) -> List[Subscription]: + """Retrieve local subscriptions of an entity @param entity(jid.JID): entity to check @param nodeIdentifier(unicode, None): node identifier @@ -425,25 +432,136 @@ subscriptions.append(subscription) return subscriptions - query = ["""SELECT node, + query = ["""SELECT nodes.node, jid, resource, state FROM entities NATURAL JOIN subscriptions - NATURAL JOIN nodes - WHERE jid=%s"""] - + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + WHERE jid=%s AND subscriptions.node_id IS NOT NULL"""] args = [entity.userhost()] + if public is not None: + query.append("AND subscriptions.public=%s") + args.append(public) + if nodeIdentifier is not None: - query.append("AND node=%s") + query.append("AND nodes.node=%s") args.append(nodeIdentifier) d = self.dbpool.runQuery(*withPEP(' '.join(query), args, pep, recipient)) d.addCallback(toSubscriptions) return d + async def getAllSubscriptions( + self, + entity: jid.JID, + public: Optional[bool] = None + ): + query = """SELECT subscription_id::text as id, + node, + pep, + ext_service, + ext_node, + state + FROM entities + NATURAL JOIN subscriptions + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + WHERE jid=%s""" + args = [entity.userhost()] + if public is not None: + query += "AND public=%s" + args.append(public) + rows = await self.dbpool.runQuery(query, args) + return [r._asdict() for r in rows] + + def addExternalSubscription( + self, + entity: jid.JID, + service: jid.JID, + node: str, + state: str, + public: bool = False + ) -> defer.Deferred: + """Store a subscription to an external node + + @param entity: entity being subscribed + @param service: pubsub service hosting the node + @param node: pubsub node being subscribed to + @param state: state of the subscription + @param public: True if the subscription is publicly visible + """ + return self.dbpool.runInteraction( + self._addExternalSubscription, + entity, service, node, state, public + ) + + def _addExternalSubscription( + self, + cursor, + entity: jid.JID, + service: jid.JID, + node: str, + state: str, + public: bool + ) -> None: + + try: + cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", + (entity.userhost(),)) + except cursor._pool.dbapi.IntegrityError: + cursor.connection.rollback() + + cursor.execute("""INSERT INTO subscriptions + (ext_service, ext_node, entity_id, state, public) + SELECT %s, %s, entity_id, %s, %s FROM + (SELECT entity_id FROM entities + WHERE jid=%s) AS ent_id + ON CONFLICT(entity_id, ext_service, ext_node) DO UPDATE SET public=EXCLUDED.public""", + (service.full(), + node, + state, + public, + entity.userhost() + )) + + def removeExternalSubscription( + self, + entity: jid.JID, + service: jid.JID, + node: str, + ) -> defer.Deferred: + """Remove a subscription from an external node + + @param entity: entity being unsubscribed + @param service: pubsub service hosting the node + @param node: pubsub node being unsubscribed to + """ + return self.dbpool.runInteraction( + self._removeExternalSubscription, + entity, service, node + ) + + def _removeExternalSubscription( + self, + cursor, + entity: jid.JID, + service: jid.JID, + node: str, + ) -> None: + cursor.execute("""DELETE FROM subscriptions WHERE + ext_service=%s AND + ext_node=%s AND + entity_id=(SELECT entity_id FROM entities + WHERE jid=%s) + """, + (service.full(), + node, + entity.userhost())) + if cursor.rowcount != 1: + raise error.NotSubscribed() + def getDefaultConfiguration(self, nodeType): return self.defaultConfig[nodeType].copy() @@ -683,9 +801,9 @@ resource = subscriber.resource or '' cursor.execute("""SELECT state FROM subscriptions - NATURAL JOIN nodes + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id NATURAL JOIN entities - WHERE node_id=%s AND jid=%s AND resource=%s""", + WHERE subscriptions.node_id=%s AND jid=%s AND resource=%s""", (self.nodeDbId, userhost, resource)) @@ -696,25 +814,38 @@ else: return Subscription(self.nodeIdentifier, subscriber, row[0]) - def getSubscriptions(self, state=None): - return self.dbpool.runInteraction(self._getSubscriptions, state) + def getSubscriptions( + self, + state: Optional[str]=None, + public: Optional[bool] = None + ) -> List[Subscription]: + return self.dbpool.runInteraction(self._getSubscriptions, state, public) - def _getSubscriptions(self, cursor, state): + def _getSubscriptions( + self, + cursor, + state: Optional[str], + public: Optional[bool] = None, + ) -> List[Subscription]: self._checkNodeExists(cursor) - query = """SELECT node, jid, resource, state, + query = ["""SELECT subscription_id::text, nodes.node, jid, resource, state, subscription_type, subscription_depth - FROM subscriptions - NATURAL JOIN nodes - NATURAL JOIN entities - WHERE node_id=%s""" + FROM subscriptions + LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id + NATURAL JOIN entities + WHERE subscriptions.node_id=%s"""] values = [self.nodeDbId] if state: - query += " AND state=%s" + query.append("AND state=%s") values.append(state) - cursor.execute(query, values) + if public is not None: + query.append("AND public=%s") + values.append(public) + + cursor.execute(" ".join(query), values) rows = cursor.fetchall() subscriptions = [] @@ -727,8 +858,9 @@ if row.subscription_depth: options['pubsub#subscription_depth'] = row.subscription_depth; - subscriptions.append(Subscription(row.node, subscriber, - row.state, options)) + subscription = Subscription(row.node, subscriber, row.state, options) + subscription.id = row.subscription_id + subscriptions.append(subscription) return subscriptions @@ -744,6 +876,7 @@ subscription_type = config.get('pubsub#subscription_type') subscription_depth = config.get('pubsub#subscription_depth') + public = config.get("public", False) try: cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", @@ -751,20 +884,31 @@ except cursor._pool.dbapi.IntegrityError: cursor.connection.rollback() - try: - cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, state, - subscription_type, subscription_depth) - SELECT %s, entity_id, %s, %s, %s, %s FROM - (SELECT entity_id FROM entities - WHERE jid=%s) AS ent_id""", - (self.nodeDbId, - resource, - state, - subscription_type, - subscription_depth, - userhost)) - except cursor._pool.dbapi.IntegrityError: + # the RETURNING trick to detect INSERT vs UPDATE comes from + # https://stackoverflow.com/a/47001830/4188764 thanks! + cursor.execute("""INSERT INTO subscriptions + (node_id, entity_id, resource, state, + subscription_type, subscription_depth, public) + SELECT %s, entity_id, %s, %s, %s, %s, %s FROM + (SELECT entity_id FROM entities + WHERE jid=%s) AS ent_id + ON CONFLICT (entity_id, node_id, resource) DO UPDATE SET public=EXCLUDED.public + RETURNING (xmax = 0) AS inserted""", + (self.nodeDbId, + resource, + state, + subscription_type, + subscription_depth, + public, + userhost)) + + rows = cursor.fetchone() + if not rows.inserted: + # this was an update, the subscription was already existing + + # we have to explicitly commit, otherwise the exception raised rollbacks the + # transation + cursor.connection.commit() raise error.SubscriptionExists() def removeSubscription(self, subscriber):
--- a/sat_pubsub/privilege.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/privilege.py Wed May 11 13:39:08 2022 +0200 @@ -19,13 +19,13 @@ "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " "presences" -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional, Union, Set import time from twisted.internet import defer from twisted.python import log -from twisted.python import failure from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish from wokkel import xmppim from wokkel import pubsub @@ -35,13 +35,14 @@ from .error import NotAllowedError -FORWARDED_NS = 'urn:xmpp:forward:0' -PRIV_ENT_NS = 'urn:xmpp:privilege:1' -PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS) +NS_FORWARDED = 'urn:xmpp:forward:0' +NS_PRIV_ENT = 'urn:xmpp:privilege:2' +PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(NS_PRIV_ENT) ROSTER_NS = 'jabber:iq:roster' PERM_ROSTER = 'roster' PERM_MESSAGE = 'message' PERM_PRESENCE = 'presence' +PERM_IQ = 'iq' ALLOWED_ROSTER = ('none', 'get', 'set', 'both') ALLOWED_MESSAGE = ('none', 'outgoing') ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster') @@ -50,6 +51,13 @@ PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE } +PERMS_BASE : dict[str, Optional[Union[str, Dict[str, Union[str, bool]]]]]= { + PERM_ROSTER: None, + PERM_MESSAGE: None, + PERM_PRESENCE: None, + PERM_IQ: None, +} + # Number of seconds before a roster cache is not considered valid anymore. # We keep this delay to avoid requesting roster too much in a row if an entity is @@ -70,9 +78,7 @@ def __init__(self, service_jid): super(PrivilegesHandler, self).__init__() self.backend = None - self._permissions = {PERM_ROSTER: 'none', - PERM_MESSAGE: 'none', - PERM_PRESENCE: 'none'} + self._permissions = PERMS_BASE.copy() self._pubsub_service = None self.caps_map = {} # key: bare jid, value: dict of resources with caps hash # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to @@ -120,34 +126,69 @@ self._permissions will be updated according to advertised privileged """ - privilege_elt = next(message.elements(PRIV_ENT_NS, 'privilege')) - for perm_elt in privilege_elt.elements(PRIV_ENT_NS): + self._permissions = PERMS_BASE.copy() + privilege_elt = next(message.elements(NS_PRIV_ENT, 'privilege')) + for perm_elt in privilege_elt.elements(NS_PRIV_ENT, 'perm'): try: - if perm_elt.name != 'perm': - raise InvalidStanza('unexpected element {}'.format(perm_elt.name)) - perm_access = perm_elt['access'] - perm_type = perm_elt['type'] + perm_access = perm_elt["access"] + except KeyError: + log.err(f"missing 'access' attribute in perm element: {perm_elt.toXml()}") + continue + if perm_access in (PERM_ROSTER, PERM_MESSAGE, PERM_PRESENCE): try: + perm_type = perm_elt["type"] + except KeyError: + log.err( + "missing 'type' attribute in perm element: " + f"{perm_elt.toXml()}" + ) + continue + else: if perm_type not in TO_CHECK[perm_access]: - raise InvalidStanza( - 'bad type [{}] for permission {}' - .format(perm_type, perm_access) + log.err( + f'bad type {perm_type!r}: {perm_elt.toXml()}' + ) + continue + self._permissions[perm_access] = perm_type or None + elif perm_access == "iq": + iq_perms = self._permissions["iq"] = {} + for namespace_elt in perm_elt.elements(NS_PRIV_ENT, "namespace"): + ns = namespace_elt.getAttribute("ns") + perm_type = namespace_elt.getAttribute("type") + if not ns or not perm_type: + log.err( + f"invalid namespace element: {namespace_elt.toXml()}" ) - except KeyError: - raise InvalidStanza('bad permission [{}]'.format(perm_access)) - except InvalidStanza as e: - log.msg( - f"Invalid stanza received ({e}), setting permission to none" - ) - for perm in self._permissions: - self._permissions[perm] = 'none' - break + else: + if perm_type not in ("get", "set", "both"): + log.err( + f"invalid namespace type: {namespace_elt.toXml()}" + ) + else: + ns_perms = iq_perms[ns] = {"type": perm_type} + ns_perms["get"] = perm_type in ("get", "both") + ns_perms["set"] = perm_type in ("set", "both") + else: + log.err(f"unknown {perm_access!r} access: {perm_elt.toXml()}'") - self._permissions[perm_access] = perm_type or 'none' + perms = self._permissions + perms_iq = perms["iq"] + if perms_iq is None: + iq_perm_txt = " no iq perm advertised" + elif not isinstance(perms_iq, dict): + raise ValueError('INTERNAL ERROR: "iq" perm should a dict') + else: + iq_perm_txt = "\n".join( + f" - {ns}: {perms['type']}" + for ns, perms in perms_iq.items() + ) log.msg( - 'Privileges updated: roster={roster}, message={message}, presence={presence}' - .format(**self._permissions) + "Privileges updated:\n" + f"roster: {perms[PERM_ROSTER]}\n" + f"message: {perms[PERM_MESSAGE]}\n" + f"presence: {perms[PERM_PRESENCE]}\n" + f"iq:\n{iq_perm_txt}" ) ## roster ## @@ -308,8 +349,8 @@ if to_jid is None: to_jid = self.backend.server_jid main_message['to'] = to_jid.full() - privilege_elt = main_message.addElement((PRIV_ENT_NS, 'privilege')) - forwarded_elt = privilege_elt.addElement((FORWARDED_NS, 'forwarded')) + privilege_elt = main_message.addElement((NS_PRIV_ENT, 'privilege')) + forwarded_elt = privilege_elt.addElement((NS_FORWARDED, 'forwarded')) priv_message['xmlns'] = 'jabber:client' forwarded_elt.addChild(priv_message) self.send(main_message) @@ -452,6 +493,62 @@ for pep_jid, node, item, item_access_model in last_items: self.notifyPublish(pep_jid, node, [(from_jid, None, [item])]) + ## IQ ## + + async def sendIQ( + self, + priv_iq: domish.Element, + to: Optional[jid.JID] = None + ) -> domish.Element: + """Send privileged IQ stanza + + @param priv_iq: privileged IQ stanza + @param to: bare jid of user on behalf of who the stanza is sent + The stanza will be wrapped and sent to the server. Result/Error stanza will sent + back as return value. + """ + if to is None: + try: + to = jid.JID(priv_iq["from"]) + except (KeyError, RuntimeError): + raise ValueError( + 'no "to" specified, and invalid "to" attribute in priv_iq' + ) + if not to.user or to.resource or to.host != self.backend.server_jid.userhost(): + raise NotAllowedError( + f'"to" attribute must be set to a bare jid of the server, {to} is invalid' + ) + iq_type = priv_iq.getAttribute("type") + if iq_type not in ("get", "set"): + raise ValueError(f"invalid IQ type: {priv_iq.toXml()}") + first_child = priv_iq.firstChildElement() + iq_perms: Optional[dict] = self._permissions[PERM_IQ] + + if ((not iq_perms or first_child is None or first_child.uri is None + or not iq_perms.get(first_child.uri, {}).get(iq_type, False))): + raise NotAllowedError( + "privileged IQ stanza not allowed for this namespace/type combination " + f"{priv_iq.toXml()}" + ) + + main_iq = xmlstream.IQ(self.xmlstream, iq_type) + main_iq.timeout = 120 + privileged_iq_elt = main_iq.addElement((NS_PRIV_ENT, "privileged_iq")) + priv_iq['xmlns'] = 'jabber:client' + privileged_iq_elt.addChild(priv_iq) + ret_elt = await main_iq.send(to.full()) + # we unwrap the result + for name, ns in ( + ("privilege", NS_PRIV_ENT), + ("forwarded", NS_FORWARDED), + ("iq", "jabber:client") + ): + try: + ret_elt = next(ret_elt.elements(ns, name)) + except StopIteration: + raise ValueError(f"Invalid privileged IQ result: {ret_elt.toXml()}") + return ret_elt + ## misc ## async def getAutoSubscribers(
--- a/sat_pubsub/pubsub_admin.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/pubsub_admin.py Wed May 11 13:39:08 2022 +0200 @@ -92,13 +92,13 @@ for item in publish_elt.elements(pubsub.NS_PUBSUB, 'item'): try: requestor = jid.JID(item.attributes.pop('publisher')) + except KeyError: + requestor = from_jid except Exception as e: log.msg("WARNING: invalid jid in publisher ({requestor}): {msg}" .format(requestor=requestor, msg=e)) self.sendError(iq_elt) return - except KeyError: - requestor = from_jid # we don't use a DeferredList because we want to be sure that # each request is done in order
--- a/twisted/plugins/pubsub.py Mon Jan 03 16:48:22 2022 +0100 +++ b/twisted/plugins/pubsub.py Wed May 11 13:39:08 2022 +0200 @@ -232,6 +232,7 @@ from sat_pubsub.backend import BackendService, ExtraDiscoHandler from sat_pubsub.privilege import PrivilegesHandler from sat_pubsub.delegation import DelegationsHandler + from sat_pubsub.pam import PAMHandler if not config['jid'] or not config['xmpp_pwd']: raise usage.UsageError("You must specify jid and xmpp_pwd") @@ -290,6 +291,10 @@ ph.setHandlerParent(cs) bs.privilege = ph + pam = PAMHandler(config["jid"]) + pam.setHandlerParent(cs) + bs.pam = pam + resource = IPubSubResource(bs) resource.hideNodes = config["hide-nodes"] resource.serviceJID = config["jid"]