Mercurial > libervia-backend
changeset 3597:5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 29 Jul 2021 22:51:01 +0200 |
parents | 2d97c695af05 |
children | d390ff50af0f |
files | sat/core/constants.py sat/plugins/plugin_pubsub_cache.py sat/plugins/plugin_xep_0060.py |
diffstat | 3 files changed, 784 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/constants.py Thu Jul 29 22:51:01 2021 +0200 +++ b/sat/core/constants.py Thu Jul 29 22:51:01 2021 +0200 @@ -216,6 +216,7 @@ PLUG_TYPE_EXP = "EXP" PLUG_TYPE_SEC = "SEC" PLUG_TYPE_SYNTAXE = "SYNTAXE" + PLUG_TYPE_PUBSUB = "PUBSUB" PLUG_TYPE_BLOG = "BLOG" PLUG_TYPE_IMPORT = "IMPORT" PLUG_TYPE_ENTRY_POINT = "ENTRY_POINT" @@ -367,6 +368,7 @@ ## Common extra keys/values ## KEY_ORDER_BY = "order_by" + KEY_USE_CACHE = "use_cache" ORDER_BY_CREATION = 'creation' ORDER_BY_MODIFICATION = 'modification'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_pubsub_cache.py Thu Jul 29 22:51:01 2021 +0200 @@ -0,0 +1,779 @@ +#!/usr/bin/env python3 + +# Libervia plugin for PubSub Caching +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import time +from datetime import datetime +from typing import Optional, List, Tuple +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from twisted.internet import defer +from wokkel import pubsub, rsm +from sat.core.i18n import _ +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.core.log import getLogger +from sat.core.core_types import SatXMPPEntity +from sat.tools import xml_tools, utils +from sat.tools.common import data_format +from sat.memory.sqla import PubsubNode, PubsubItem, SyncState + + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "PubSub Cache", + C.PI_IMPORT_NAME: "PUBSUB_CACHE", + C.PI_TYPE: C.PLUG_TYPE_PUBSUB, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "PubsubCache", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), +} + +ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") +# maximum of items to cache +CACHE_LIMIT = 5000 +# number of second before a progress caching is considered failed and tried again +PROGRESS_DEADLINE = 60 * 60 * 6 + + + +class PubsubCache: + # TODO: there is currently no notification for (un)subscribe events with XEP-0060, + # but it would be necessary to have this data if some devices unsubscribe a cached + # node, as we can then get out of sync. A protoXEP could be proposed to fix this + # situation. + # TODO: handle configuration events + + def __init__(self, host): + log.info(_("PubSub Cache initialization")) + strategy = host.memory.getConfig(None, "pubsub_cache_strategy") + if strategy == "no_cache": + log.info( + _( + "Pubsub cache won't be used due to pubsub_cache_strategy={value} " + "setting." + ).format(value=repr(strategy)) + ) + self.use_cache = False + else: + self.use_cache = True + self.host = host + self._p = host.plugins["XEP-0060"] + self.analysers = {} + # map for caching in progress (node, service) => Deferred + self.in_progress = {} + self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) + self._p.addManagedNode( + "", + items_cb=self.onItemsEvent, + delete_cb=self.onDeleteEvent, + purge_db=self.onPurgeEvent, + ) + host.bridge.addMethod( + "psCacheGet", + ".plugin", + in_sign="ssiassss", + out_sign="s", + method=self._getItemsFromCache, + async_=True, + ) + host.bridge.addMethod( + "psCacheSync", + ".plugin", + "sss", + out_sign="", + method=self._synchronise, + async_=True, + ) + host.bridge.addMethod( + "psCachePurge", + ".plugin", + "s", + out_sign="", + method=self._purge, + async_=True, + ) + host.bridge.addMethod( + "psCacheReset", + ".plugin", + "", + out_sign="", + method=self._reset, + async_=True, + ) + + def registerAnalyser(self, analyser: dict) -> None: + """Register a new pubsub node analyser + + @param analyser: An analyser is a dictionary which may have the following keys + (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys + must be used): + + :name (str)*: + a unique name for this analyser. This name will be stored in database + to retrieve the analyser when necessary (notably to get the parsing method), + thus it is recommended to use a stable name such as the source plugin name + instead of a name which may change with standard evolution, such as the + feature namespace. + + :type (str)*: + indicates what kind of items we are dealing with. Type must be a human + readable word, as it may be used in searches. Good types examples are + **blog** or **event**. + + :node (str): + prefix of a node name which may be used to identify its type. Example: + *urn:xmpp:microblog:0* (a node starting with this name will be identified as + *blog* node). + + :namespace (str): + root namespace of items. When analysing a node, the first item will be + retrieved. The analyser will be chosen its given namespace match the + namespace of the first child element of ``<item>`` element. + + :to_sync (bool): + if True, the node must be synchronised in cache. The default False value + means that the pubsub service will always be requested. + + :parser (callable): + method (which may be sync, a coroutine or a method returning a "Deferred") + to call to parse the ``domish.Element`` of the item. The result must be + dictionary which can be serialised to JSON. + + The method must have the following signature: + + .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ + service: Optional[jid.JID], node: Optional[str]) \ + -> dict + :noindex: + + :match_cb (callable): + method (which may be sync, a coroutine or a method returning a "Deferred") + called when the analyser matches. The method is called with the curreny + analyse which is can modify **in-place**. + + The method must have the following signature: + + .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None + :noindex: + + @raise exceptions.Conflict: a analyser with this name already exists + """ + + name = analyser.get("name", "").strip().lower() + # we want the normalised name + analyser["name"] = name + if not name: + raise ValueError('"name" is mandatory in analyser') + if "type" not in analyser: + raise ValueError('"type" is mandatory in analyser') + type_test_keys = {"node", "namespace"} + if not type_test_keys.intersection(analyser): + raise ValueError(f'at least one of {type_test_keys} must be used') + if name in self.analysers: + raise exceptions.Conflict( + f"An analyser with the name {name!r} is already registered" + ) + self.analysers[name] = analyser + + async def cacheItems( + self, + client: SatXMPPEntity, + pubsub_node: PubsubNode, + items: List[domish.Element] + ) -> None: + try: + parser = self.analysers[pubsub_node.analyser].get("parser") + except KeyError: + parser = None + + if parser is not None: + parsed_items = [ + await utils.asDeferred( + parser, + client, + item, + pubsub_node.service, + pubsub_node.name + ) + for item in items + ] + else: + parsed_items = None + + await self.host.memory.storage.cachePubsubItems( + client, pubsub_node, items, parsed_items + ) + + async def _cacheNode( + self, + client: SatXMPPEntity, + pubsub_node: PubsubNode + ) -> None: + await self.host.memory.storage.updatePubsubNodeSyncState( + pubsub_node, SyncState.IN_PROGRESS + ) + service, node = pubsub_node.service, pubsub_node.name + try: + log.debug( + f"Caching node {node!r} at {service} for {client.profile}" + ) + if not pubsub_node.subscribed: + try: + sub = await self._p.subscribe(client, service, node) + except Exception as e: + log.warning( + _( + "Can't subscribe node {pubsub_node}, that means that " + "synchronisation can't be maintained: {reason}" + ).format(pubsub_node=pubsub_node, reason=e) + ) + else: + if sub.state == "subscribed": + sub_id = sub.subscriptionIdentifier + log.debug( + f"{pubsub_node} subscribed (subscription id: {sub_id!r})" + ) + pubsub_node.subscribed = True + await self.host.memory.storage.add(pubsub_node) + else: + log.warning( + _( + "{pubsub_node} is not subscribed, that means that " + "synchronisation can't be maintained, and you may have " + "to enforce subscription manually. Subscription state: " + "{state}" + ).format(pubsub_node=pubsub_node, state=sub.state) + ) + + try: + await self.host.checkFeatures( + client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service + ) + except exceptions.FeatureNotFound: + log.warning( + f"service {service} doesn't handle Result Set Management " + "(XEP-0059), we'll only cache latest 20 items" + ) + items, __ = await client.pubsub_client.items( + pubsub_node.service, pubsub_node.name, maxItems=20 + ) + await self.cacheItems( + client, pubsub_node, items + ) + else: + rsm_p = self.host.plugins["XEP-0059"] + rsm_request = rsm.RSMRequest() + cached_ids = set() + while True: + items, rsm_response = await client.pubsub_client.items( + service, node, rsm_request=rsm_request + ) + await self.cacheItems( + client, pubsub_node, items + ) + for item in items: + item_id = item["id"] + if item_id in cached_ids: + log.warning( + f"Pubsub node {node!r} at {service} is returning several " + f"times the same item ({item_id!r}). This is illegal " + "behaviour, and it means that Pubsub service " + f"{service} is buggy and can't be cached properly. " + f"Please report this to {service.host} administrators" + ) + rsm_request = None + break + cached_ids.add(item["id"]) + if len(cached_ids) >= CACHE_LIMIT: + log.warning( + f"Pubsub node {node!r} at {service} contains more items " + f"than the cache limit ({CACHE_LIMIT}). We stop " + "caching here, at item {item['id']!r}." + ) + rsm_request = None + break + rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) + if rsm_request is None: + break + + await self.host.memory.storage.updatePubsubNodeSyncState( + pubsub_node, SyncState.COMPLETED + ) + except Exception as e: + import traceback + tb = traceback.format_tb(e.__traceback__) + log.error( + f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" + ) + await self.host.memory.storage.updatePubsubNodeSyncState( + pubsub_node, SyncState.ERROR + ) + await self.host.memory.storage.deletePubsubItems(pubsub_node) + raise e + + def _cacheNodeClean(self, __, pubsub_node): + del self.in_progress[(pubsub_node.service, pubsub_node.name)] + + def cacheNode( + self, + client: SatXMPPEntity, + pubsub_node: PubsubNode + ) -> None: + """Launch node caching as a background task""" + d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) + d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) + self.in_progress[(pubsub_node.service, pubsub_node.name)] = d + return d + + async def analyseNode( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + pubsub_node : PubsubNode = None, + ) -> dict: + """Use registered analysers on a node to determine what it is used for""" + analyse = {"service": service, "node": node} + if pubsub_node is None: + try: + first_item = (await client.pubsub_client.items( + service, node, 1 + ))[0][0] + except IndexError: + pass + except error.StanzaError as e: + if e.condition == "item-not-found": + pass + else: + log.warning( + f"Can't retrieve last item on node {node!r} at service " + f"{service} for {client.profile}: {e}" + ) + else: + try: + uri = first_item.firstChildElement().uri + except Exception as e: + log.warning( + f"Can't retrieve item namespace on node {node!r} at service " + f"{service} for {client.profile}: {e}" + ) + else: + analyse["namespace"] = uri + try: + conf = await self._p.getConfiguration(client, service, node) + except Exception as e: + log.warning( + f"Can't retrieve configuration for node {node!r} at service {service} " + f"for {client.profile}: {e}" + ) + else: + analyse["conf"] = conf + + for analyser in self.analysers.values(): + match_cb = analyser.get("match_cb") + try: + an_node = analyser["node"] + except KeyError: + pass + else: + if node.startswith(an_node): + for key in ANALYSER_KEYS_TO_COPY: + try: + analyse[key] = analyser[key] + except KeyError: + pass + found = True + break + try: + namespace = analyse["namespace"] + an_namespace = analyser["namespace"] + except KeyError: + pass + else: + if namespace == an_namespace: + for key in ANALYSER_KEYS_TO_COPY: + try: + analyse[key] = analyser[key] + except KeyError: + pass + found = True + break + + else: + found = False + log.debug( + f"node {node!r} at service {service} doesn't match any known type" + ) + if found: + try: + match_cb = analyser["match_cb"] + except KeyError: + pass + else: + await match_cb(client, analyse) + return analyse + + def _getItemsFromCache( + self, service="", node="", max_items=10, item_ids=None, sub_id=None, + extra="", profile_key=C.PROF_KEY_NONE + ): + d = defer.ensureDeferred(self._aGetItemsFromCache( + service, node, max_items, item_ids, sub_id, extra, profile_key + )) + d.addCallback(self._p.transItemsData) + d.addCallback(self._p.serialiseItems) + return d + + async def _aGetItemsFromCache( + self, service, node, max_items, item_ids, sub_id, extra, profile_key + ): + client = self.host.getClient(profile_key) + service = jid.JID(service) if service else client.jid.userhostJID() + pubsub_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if pubsub_node is None: + raise exceptions.NotFound( + f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" + ) + max_items = None if max_items == C.NO_LIMIT else max_items + extra = self._p.parseExtra(data_format.deserialise(extra)) + items, metadata = await self.getItemsFromCache( + client, + pubsub_node, + max_items, + item_ids, + sub_id or None, + extra.rsm_request, + extra.extra, + ) + return [i.data for i in items], metadata + + async def getItemsFromCache( + self, + client: SatXMPPEntity, + node: PubsubNode, + max_items: Optional[int] = None, + item_ids: Optional[List[str]] = None, + sub_id: Optional[str] = None, + rsm_request: Optional[rsm.RSMRequest] = None, + extra: Optional[dict] = None + ) -> Tuple[List[PubsubItem], dict]: + """Get items from cache, using same arguments as for external Pubsub request""" + if "mam" in extra: + raise NotImplementedError("MAM queries are not supported yet") + if max_items is None and rsm_request is None: + max_items = 20 + if max_items is not None: + if rsm_request is not None: + raise exceptions.InternalError( + "Pubsub max items and RSM must not be used at the same time" + ) + elif item_ids is None: + raise exceptions.InternalError( + "Pubsub max items and item IDs must not be used at the same time" + ) + pubsub_items, metadata = await self.host.memory.storage.getItems( + node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) + ) + else: + desc = False + if rsm_request.before == "": + before = None + desc = True + else: + before = rsm_request.before + pubsub_items, metadata = await self.host.memory.storage.getItems( + node, max_items=rsm_request.max, before=before, after=rsm_request.after, + from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), + desc=desc, force_rsm=True, + ) + + return pubsub_items, metadata + + async def onItemsEvent(self, client, event): + node = await self.host.memory.storage.getPubsubNode( + client, event.sender, event.nodeIdentifier + ) + if node is None: + return + if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): + items = [] + retract_ids = [] + for elt in event.items: + if elt.name == "item": + items.append(elt) + elif elt.name == "retract": + item_id = elt.getAttribute("id") + if not item_id: + log.warning( + "Ignoring invalid retract item element: " + f"{xml_tools.pFmtElt(elt)}" + ) + continue + + retract_ids.append(elt["id"]) + else: + log.warning( + f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" + ) + if items: + log.debug("caching new items received from {node}") + await self.cacheItems( + client, node, items + ) + if retract_ids: + log.debug(f"deleting retracted items from {node}") + await self.host.memory.storage.deletePubsubItems( + node, items_names=retract_ids + ) + + async def onDeleteEvent(self, client, event): + log.debug( + f"deleting node {event.nodeIdentifier} from {event.sender} for " + f"{client.profile}" + ) + await self.host.memory.storage.deletePubsubNode( + [client.profile], [event.sender], [event.nodeIdentifier] + ) + + async def onPurgeEvent(self, client, event): + node = await self.host.memory.storage.getPubsubNode( + client, event.sender, event.nodeIdentifier + ) + if node is None: + return + log.debug(f"purging node {node} for {client.profile}") + await self.host.memory.storage.deletePubsubItems(node) + + async def _getItemsTrigger( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + node: str, + max_items: Optional[int], + item_ids: Optional[List[str]], + sub_id: Optional[str], + rsm_request: Optional[rsm.RSMRequest], + extra: dict + ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: + if not self.use_cache: + log.debug("cache disabled in settings") + return True, None + if extra.get(C.KEY_USE_CACHE) == False: + log.debug("skipping pubsub cache as requested") + return True, None + if service is None: + service = client.jid.userhostJID() + pubsub_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: + analyse = {"to_sync": True} + else: + analyse = await self.analyseNode(client, service, node) + + if pubsub_node is None: + pubsub_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + analyser=analyse.get("name"), + type_=analyse.get("type"), + subtype=analyse.get("subtype"), + ) + + if analyse.get("to_sync"): + if pubsub_node.sync_state == SyncState.COMPLETED: + if "mam" in extra: + log.debug("MAM caching is not supported yet, skipping cache") + return True, None + pubsub_items, metadata = await self.getItemsFromCache( + client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra + ) + return False, ([i.data for i in pubsub_items], metadata) + + if pubsub_node.sync_state == SyncState.IN_PROGRESS: + if (service, node) not in self.in_progress: + log.warning( + f"{pubsub_node} is reported as being cached, but not caching is " + "in progress, this is most probably due to the backend being " + "restarted. Resetting the status, caching will be done again." + ) + pubsub_node.sync_state = None + await self.host.memory.storage.deletePubsubItems(pubsub_node) + elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: + log.warning( + f"{pubsub_node} is in progress for too long " + f"({pubsub_node.sync_state_updated//60} minutes), " + "cancelling it and retrying." + ) + self.in_progress.pop[(service, node)].cancel() + pubsub_node.sync_state = None + await self.host.memory.storage.deletePubsubItems(pubsub_node) + else: + log.debug( + f"{pubsub_node} synchronisation is already in progress, skipping" + ) + if pubsub_node.sync_state is None: + key = (service, node) + if key in self.in_progress: + raise exceptions.InternalError( + f"There is already a caching in progress for {pubsub_node}, this " + "should not happen" + ) + self.cacheNode(client, pubsub_node) + elif pubsub_node.sync_state == SyncState.ERROR: + log.debug( + f"{pubsub_node} synchronisation has previously failed, skipping" + ) + + return True, None + + async def _subscribeTrigger( + self, + client: SatXMPPEntity, + service: jid.JID, + nodeIdentifier: str, + sub_jid: Optional[jid.JID], + options: Optional[dict], + subscription: pubsub.Subscription + ) -> None: + pass + + async def _unsubscribeTrigger( + self, + client: SatXMPPEntity, + service: jid.JID, + nodeIdentifier: str, + sub_jid, + subscriptionIdentifier, + sender, + ) -> None: + pass + + def _synchronise(self, service, node, profile_key): + client = self.host.getClient(profile_key) + service = client.jid.userhostJID() if not service else jid.JID(service) + return defer.ensureDeferred(self.synchronise(client, service, node)) + + async def synchronise( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str + ) -> None: + """Synchronise a node with a pubsub service + + If the node is already synchronised, it will be resynchronised (all items will be + deleted and re-downloaded). + + The node will be synchronised even if there is no matching analyser. + + Note that when a node is synchronised, it is automatically subscribed. + """ + pubsub_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if pubsub_node is None: + log.info( + _( + "Synchronising the new node {node} at {service}" + ).format(node=node, service=service.full) + ) + analyse = await self.analyseNode(client, service, node) + pubsub_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + analyser=analyse.get("name"), + type_=analyse.get("type"), + ) + + if ((pubsub_node.sync_state == SyncState.IN_PROGRESS + or (service, node) in self.in_progress)): + log.warning( + _( + "{node} at {service} is already being synchronised, can't do a new " + "synchronisation." + ).format(node=node, service=service) + ) + else: + log.info( + _( + "(Re)Synchronising the node {node} at {service} on user request" + ).format(node=node, service=service.full) + ) + # we first delete and recreate the node (will also delete its items) + await self.host.memory.storage.delete(pubsub_node) + analyse = await self.analyseNode(client, service, node) + pubsub_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + analyser=analyse.get("name"), + type_=analyse.get("type"), + ) + # then we can put node in cache + await self.cacheNode(client, pubsub_node) + + async def purge(self, purge_filters: dict) -> None: + """Remove items according to filters + + filters can have on of the following keys, all are optional: + + :services: + list of JIDs of services from which items must be deleted + :nodes: + list of node names to delete + :types: + list of node types to delete + :subtypes: + list of node subtypes to delete + :profiles: + list of profiles from which items must be deleted + :created_before: + datetime before which items must have been created to be deleted + :created_update: + datetime before which items must have been updated last to be deleted + """ + purge_filters["names"] = purge_filters.pop("nodes", None) + await self.host.memory.storage.purgePubsubItems(**purge_filters) + + def _purge(self, purge_filters: str) -> None: + purge_filters = data_format.deserialise(purge_filters) + for key in "created_before", "updated_before": + try: + purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) + except (KeyError, TypeError): + pass + return defer.ensureDeferred(self.purge(purge_filters)) + + async def reset(self) -> None: + """Remove ALL nodes and items from cache + + After calling this method, cache will be refilled progressively as if it where new + """ + await self.host.memory.storage.deletePubsubNode(None, None, None) + + def _reset(self) -> None: + return defer.ensureDeferred(self.reset())
--- a/sat/plugins/plugin_xep_0060.py Thu Jul 29 22:51:01 2021 +0200 +++ b/sat/plugins/plugin_xep_0060.py Thu Jul 29 22:51:01 2021 +0200 @@ -91,6 +91,8 @@ ID_SINGLETON = "current" EXTRA_PUBLISH_OPTIONS = "publish_options" EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" + # extra disco needed for RSM, cf. XEP-0060 § 6.5.4 + DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm" def __init__(self, host): log.info(_("PubSub plugin initialization")) @@ -392,6 +394,7 @@ the method must be named after PubSub constants in lower case and suffixed with "_cb" e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE + note: only C.PS_ITEMS and C.PS_DELETE are implemented so far """ assert node is not None assert kwargs