comparison sat/plugins/plugin_xep_0060.py @ 3849:bc24ce903835

plugin XEP-0060: handle priority in `addManagedNode`: Priority is used to order the callback list when an event is received. This is important in some use cases, notably when a plugin needs to check the former item before it is deleted from cache or updated. rel 370
author Goffi <goffi@goffi.org>
date Thu, 14 Jul 2022 12:55:30 +0200
parents 853cbaf56e9e
children 8a2c46122a11
comparison
equal deleted inserted replaced
3848:e9c380ef41c8 3849:bc24ce903835
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 19
20 from collections import namedtuple 20 from collections import namedtuple
21 from functools import reduce 21 from functools import reduce
22 from typing import Any, Dict, List, Optional, Tuple, Union 22 from typing import Any, Dict, List, Optional, Tuple, Union, Callable
23 import urllib.error 23 import urllib.error
24 import urllib.parse 24 import urllib.parse
25 import urllib.request 25 import urllib.request
26 26
27 from twisted.internet import defer, reactor 27 from twisted.internet import defer, reactor
418 assert "mam" not in extra 418 assert "mam" not in extra
419 extra["mam"] = mam_request 419 extra["mam"] = mam_request
420 420
421 return Extra(rsm_request, extra) 421 return Extra(rsm_request, extra)
422 422
423 def addManagedNode(self, node, **kwargs): 423 def addManagedNode(
424 self,
425 node: str,
426 priority: int = 0,
427 **kwargs: Callable
428 ):
424 """Add a handler for a node 429 """Add a handler for a node
425 430
426 @param node(unicode): node to monitor 431 @param node: node to monitor
427 all node *prefixed* with this one will be triggered 432 all node *prefixed* with this one will be triggered
433 @param priority: priority of the callback. Callbacks with higher priority will be
434 called first.
428 @param **kwargs: method(s) to call when the node is found 435 @param **kwargs: method(s) to call when the node is found
429 the method must be named after PubSub constants in lower case 436 the method must be named after PubSub constants in lower case
430 and suffixed with "_cb" 437 and suffixed with "_cb"
431 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE 438 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
432 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far 439 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far
435 assert kwargs 442 assert kwargs
436 callbacks = self._node_cb.setdefault(node, {}) 443 callbacks = self._node_cb.setdefault(node, {})
437 for event, cb in kwargs.items(): 444 for event, cb in kwargs.items():
438 event_name = event[:-3] 445 event_name = event[:-3]
439 assert event_name in C.PS_EVENTS 446 assert event_name in C.PS_EVENTS
440 callbacks.setdefault(event_name, []).append(cb) 447 cb_list = callbacks.setdefault(event_name, [])
448 cb_list.append((cb, priority))
449 cb_list.sort(key=lambda c: c[1], reverse=True)
441 450
442 def removeManagedNode(self, node, *args): 451 def removeManagedNode(self, node, *args):
443 """Add a handler for a node 452 """Add a handler for a node
444 453
445 @param node(unicode): node to monitor 454 @param node(unicode): node to monitor
449 try: 458 try:
450 registred_cb = self._node_cb[node] 459 registred_cb = self._node_cb[node]
451 except KeyError: 460 except KeyError:
452 pass 461 pass
453 else: 462 else:
463 removed = False
454 for callback in args: 464 for callback in args:
455 for event, cb_list in registred_cb.items(): 465 for event, cb_list in registred_cb.items():
456 try: 466 to_remove = []
457 cb_list.remove(callback) 467 for cb in cb_list:
458 except ValueError: 468 if cb[0] == callback:
459 pass 469 to_remove.append(cb)
460 else: 470 for cb in to_remove:
461 log.debug( 471 cb_list.remove(cb)
462 "removed callback {cb} for event {event} on node {node}".format( 472 if not cb_list:
463 cb=callback, event=event, node=node 473 del registred_cb[event]
464 ) 474 if not registred_cb:
465 ) 475 del self._node_cb[node]
466 if not cb_list: 476 removed = True
467 del registred_cb[event] 477 break
468 if not registred_cb: 478
469 del self._node_cb[node] 479 if not removed:
470 return 480 log.error(
471 log.error( 481 f"Trying to remove inexistant callback {callback} for node {node}"
472 "Trying to remove inexistant callback {cb} for node {node}".format( 482 )
473 cb=callback, node=node
474 )
475 )
476 483
477 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): 484 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
478 # """Retrieve the name of the nodes that are accessible on the target service. 485 # """Retrieve the name of the nodes that are accessible on the target service.
479 486
480 # @param service (JID): target service 487 # @param service (JID): target service
1612 """ 1619 """
1613 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): 1620 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
1614 if not node.startswith(registered_node): 1621 if not node.startswith(registered_node):
1615 continue 1622 continue
1616 try: 1623 try:
1617 for callback in callbacks_dict[event]: 1624 for callback_data in callbacks_dict[event]:
1618 yield callback 1625 yield callback_data[0]
1619 except KeyError: 1626 except KeyError:
1620 continue 1627 continue
1621 1628
1629 async def _callNodeCallbacks(self, client, event: pubsub.ItemsEvent) -> None:
1630 """Call sequencially event callbacks of a node
1631
1632 Callbacks are called sequencially and not in parallel to be sure to respect
1633 priority (notably for plugin needing to get old items before they are modified or
1634 deleted from cache).
1635 """
1636 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
1637 try:
1638 await utils.asDeferred(callback, client, event)
1639 except Exception as e:
1640 log.error(
1641 f"Error while running items event callback {callback}: {e}"
1642 )
1622 1643
1623 def itemsReceived(self, event): 1644 def itemsReceived(self, event):
1624 log.debug("Pubsub items received") 1645 log.debug("Pubsub items received")
1625 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
1626 d = utils.asDeferred(callback, self.parent, event)
1627 d.addErrback(lambda f: log.error(
1628 f"Error while running items event callback {callback}: {f}"
1629 ))
1630 client = self.parent 1646 client = self.parent
1647 defer.ensureDeferred(self._callNodeCallbacks(client, event))
1631 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1648 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1632 raw_items = [i.toXml() for i in event.items] 1649 raw_items = [i.toXml() for i in event.items]
1633 self.host.bridge.psEventRaw( 1650 self.host.bridge.psEventRaw(
1634 event.sender.full(), 1651 event.sender.full(),
1635 event.nodeIdentifier, 1652 event.nodeIdentifier,