Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 3849:bc24ce903835
plugin XEP-0060: handle priority in `addManagedNode`:
Priority lets order the callback list when an event is received. This is important in some
use case, notably when a plugin needs to check the former item before it is deleted from
cache or updated.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 14 Jul 2022 12:55:30 +0200 |
parents | 853cbaf56e9e |
children | 8a2c46122a11 |
comparison
equal
deleted
inserted
replaced
3848:e9c380ef41c8 | 3849:bc24ce903835 |
---|---|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 | 19 |
20 from collections import namedtuple | 20 from collections import namedtuple |
21 from functools import reduce | 21 from functools import reduce |
22 from typing import Any, Dict, List, Optional, Tuple, Union | 22 from typing import Any, Dict, List, Optional, Tuple, Union, Callable |
23 import urllib.error | 23 import urllib.error |
24 import urllib.parse | 24 import urllib.parse |
25 import urllib.request | 25 import urllib.request |
26 | 26 |
27 from twisted.internet import defer, reactor | 27 from twisted.internet import defer, reactor |
418 assert "mam" not in extra | 418 assert "mam" not in extra |
419 extra["mam"] = mam_request | 419 extra["mam"] = mam_request |
420 | 420 |
421 return Extra(rsm_request, extra) | 421 return Extra(rsm_request, extra) |
422 | 422 |
423 def addManagedNode(self, node, **kwargs): | 423 def addManagedNode( |
424 self, | |
425 node: str, | |
426 priority: int = 0, | |
427 **kwargs: Callable | |
428 ): | |
424 """Add a handler for a node | 429 """Add a handler for a node |
425 | 430 |
426 @param node(unicode): node to monitor | 431 @param node: node to monitor |
427 all node *prefixed* with this one will be triggered | 432 all node *prefixed* with this one will be triggered |
433 @param priority: priority of the callback. Callbacks with higher priority will be | |
434 called first. | |
428 @param **kwargs: method(s) to call when the node is found | 435 @param **kwargs: method(s) to call when the node is found |
429 the method must be named after PubSub constants in lower case | 436 the method must be named after PubSub constants in lower case |
430 and suffixed with "_cb" | 437 and suffixed with "_cb" |
431 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE | 438 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE |
432 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far | 439 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far |
435 assert kwargs | 442 assert kwargs |
436 callbacks = self._node_cb.setdefault(node, {}) | 443 callbacks = self._node_cb.setdefault(node, {}) |
437 for event, cb in kwargs.items(): | 444 for event, cb in kwargs.items(): |
438 event_name = event[:-3] | 445 event_name = event[:-3] |
439 assert event_name in C.PS_EVENTS | 446 assert event_name in C.PS_EVENTS |
440 callbacks.setdefault(event_name, []).append(cb) | 447 cb_list = callbacks.setdefault(event_name, []) |
448 cb_list.append((cb, priority)) | |
449 cb_list.sort(key=lambda c: c[1], reverse=True) | |
441 | 450 |
442 def removeManagedNode(self, node, *args): | 451 def removeManagedNode(self, node, *args): |
443 """Add a handler for a node | 452 """Add a handler for a node |
444 | 453 |
445 @param node(unicode): node to monitor | 454 @param node(unicode): node to monitor |
449 try: | 458 try: |
450 registred_cb = self._node_cb[node] | 459 registred_cb = self._node_cb[node] |
451 except KeyError: | 460 except KeyError: |
452 pass | 461 pass |
453 else: | 462 else: |
463 removed = False | |
454 for callback in args: | 464 for callback in args: |
455 for event, cb_list in registred_cb.items(): | 465 for event, cb_list in registred_cb.items(): |
456 try: | 466 to_remove = [] |
457 cb_list.remove(callback) | 467 for cb in cb_list: |
458 except ValueError: | 468 if cb[0] == callback: |
459 pass | 469 to_remove.append(cb) |
460 else: | 470 for cb in to_remove: |
461 log.debug( | 471 cb_list.remove(cb) |
462 "removed callback {cb} for event {event} on node {node}".format( | 472 if not cb_list: |
463 cb=callback, event=event, node=node | 473 del registred_cb[event] |
464 ) | 474 if not registred_cb: |
465 ) | 475 del self._node_cb[node] |
466 if not cb_list: | 476 removed = True |
467 del registred_cb[event] | 477 break |
468 if not registred_cb: | 478 |
469 del self._node_cb[node] | 479 if not removed: |
470 return | 480 log.error( |
471 log.error( | 481 f"Trying to remove inexistant callback {callback} for node {node}" |
472 "Trying to remove inexistant callback {cb} for node {node}".format( | 482 ) |
473 cb=callback, node=node | |
474 ) | |
475 ) | |
476 | 483 |
477 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | 484 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): |
478 # """Retrieve the name of the nodes that are accessible on the target service. | 485 # """Retrieve the name of the nodes that are accessible on the target service. |
479 | 486 |
480 # @param service (JID): target service | 487 # @param service (JID): target service |
1612 """ | 1619 """ |
1613 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): | 1620 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): |
1614 if not node.startswith(registered_node): | 1621 if not node.startswith(registered_node): |
1615 continue | 1622 continue |
1616 try: | 1623 try: |
1617 for callback in callbacks_dict[event]: | 1624 for callback_data in callbacks_dict[event]: |
1618 yield callback | 1625 yield callback_data[0] |
1619 except KeyError: | 1626 except KeyError: |
1620 continue | 1627 continue |
1621 | 1628 |
1629 async def _callNodeCallbacks(self, client, event: pubsub.ItemsEvent) -> None: | |
1630 """Call sequencially event callbacks of a node | |
1631 | |
1632 Callbacks are called sequencially and not in parallel to be sure to respect | |
1633 priority (notably for plugin needing to get old items before they are modified or | |
1634 deleted from cache). | |
1635 """ | |
1636 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | |
1637 try: | |
1638 await utils.asDeferred(callback, client, event) | |
1639 except Exception as e: | |
1640 log.error( | |
1641 f"Error while running items event callback {callback}: {e}" | |
1642 ) | |
1622 | 1643 |
1623 def itemsReceived(self, event): | 1644 def itemsReceived(self, event): |
1624 log.debug("Pubsub items received") | 1645 log.debug("Pubsub items received") |
1625 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | |
1626 d = utils.asDeferred(callback, self.parent, event) | |
1627 d.addErrback(lambda f: log.error( | |
1628 f"Error while running items event callback {callback}: {f}" | |
1629 )) | |
1630 client = self.parent | 1646 client = self.parent |
1647 defer.ensureDeferred(self._callNodeCallbacks(client, event)) | |
1631 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1648 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1632 raw_items = [i.toXml() for i in event.items] | 1649 raw_items = [i.toXml() for i in event.items] |
1633 self.host.bridge.psEventRaw( | 1650 self.host.bridge.psEventRaw( |
1634 event.sender.full(), | 1651 event.sender.full(), |
1635 event.nodeIdentifier, | 1652 event.nodeIdentifier, |