Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0060.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 2b000790b197 |
children | 23842a63ea00 |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None | 68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None |
69 # extra is a potentially empty dict | 69 # extra is a potentially empty dict |
70 TIMEOUT = 30 | 70 TIMEOUT = 30 |
71 # minimum features that a pubsub service must have to be selectable as default | 71 # minimum features that a pubsub service must have to be selectable as default |
72 DEFAULT_PUBSUB_MIN_FEAT = { | 72 DEFAULT_PUBSUB_MIN_FEAT = { |
73 'http://jabber.org/protocol/pubsub#persistent-items', | 73 "http://jabber.org/protocol/pubsub#persistent-items", |
74 'http://jabber.org/protocol/pubsub#publish', | 74 "http://jabber.org/protocol/pubsub#publish", |
75 'http://jabber.org/protocol/pubsub#retract-items', | 75 "http://jabber.org/protocol/pubsub#retract-items", |
76 } | 76 } |
77 | |
77 | 78 |
78 class XEP_0060(object): | 79 class XEP_0060(object): |
79 OPT_ACCESS_MODEL = "pubsub#access_model" | 80 OPT_ACCESS_MODEL = "pubsub#access_model" |
80 OPT_PERSIST_ITEMS = "pubsub#persist_items" | 81 OPT_PERSIST_ITEMS = "pubsub#persist_items" |
81 OPT_MAX_ITEMS = "pubsub#max_items" | 82 OPT_MAX_ITEMS = "pubsub#max_items" |
107 def __init__(self, host): | 108 def __init__(self, host): |
108 log.info(_("PubSub plugin initialization")) | 109 log.info(_("PubSub plugin initialization")) |
109 self.host = host | 110 self.host = host |
110 self._rsm = host.plugins.get("XEP-0059") | 111 self._rsm = host.plugins.get("XEP-0059") |
111 self._mam = host.plugins.get("XEP-0313") | 112 self._mam = host.plugins.get("XEP-0313") |
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) | 113 self._node_cb = ( |
114 {} | |
115 ) # dictionnary of callbacks for node (key: node, value: list of callbacks) | |
113 self.rt_sessions = sat_defer.RTDeferredSessions() | 116 self.rt_sessions = sat_defer.RTDeferredSessions() |
114 host.bridge.add_method( | 117 host.bridge.add_method( |
115 "ps_node_create", | 118 "ps_node_create", |
116 ".plugin", | 119 ".plugin", |
117 in_sign="ssa{ss}s", | 120 in_sign="ssa{ss}s", |
374 client = self.host.get_client(profile) | 377 client = self.host.get_client(profile) |
375 except exceptions.ProfileNotSetError: | 378 except exceptions.ProfileNotSetError: |
376 return {} | 379 return {} |
377 try: | 380 try: |
378 return { | 381 return { |
379 "service": client.pubsub_service.full() | 382 "service": ( |
380 if client.pubsub_service is not None | 383 client.pubsub_service.full() |
381 else "" | 384 if client.pubsub_service is not None |
385 else "" | |
386 ) | |
382 } | 387 } |
383 except AttributeError: | 388 except AttributeError: |
384 if self.host.is_connected(profile): | 389 if self.host.is_connected(profile): |
385 log.debug("Profile is not connected, service is not checked yet") | 390 log.debug("Profile is not connected, service is not checked yet") |
386 else: | 391 else: |
421 assert "mam" not in extra | 426 assert "mam" not in extra |
422 extra["mam"] = mam_request | 427 extra["mam"] = mam_request |
423 | 428 |
424 return Extra(rsm_request, extra) | 429 return Extra(rsm_request, extra) |
425 | 430 |
426 def add_managed_node( | 431 def add_managed_node(self, node: str, priority: int = 0, **kwargs: Callable): |
427 self, | |
428 node: str, | |
429 priority: int = 0, | |
430 **kwargs: Callable | |
431 ): | |
432 """Add a handler for a node | 432 """Add a handler for a node |
433 | 433 |
434 @param node: node to monitor | 434 @param node: node to monitor |
435 all node *prefixed* with this one will be triggered | 435 all node *prefixed* with this one will be triggered |
436 @param priority: priority of the callback. Callbacks with higher priority will be | 436 @param priority: priority of the callback. Callbacks with higher priority will be |
512 # """ | 512 # """ |
513 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | 513 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
514 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | 514 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
515 # return d | 515 # return d |
516 | 516 |
517 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", | 517 def _send_item( |
518 profile_key=C.PROF_KEY_NONE): | 518 self, |
519 service, | |
520 nodeIdentifier, | |
521 payload, | |
522 item_id=None, | |
523 extra_ser="", | |
524 profile_key=C.PROF_KEY_NONE, | |
525 ): | |
519 client = self.host.get_client(profile_key) | 526 client = self.host.get_client(profile_key) |
520 service = None if not service else jid.JID(service) | 527 service = None if not service else jid.JID(service) |
521 payload = xml_tools.parse(payload) | 528 payload = xml_tools.parse(payload) |
522 extra = data_format.deserialise(extra_ser) | 529 extra = data_format.deserialise(extra_ser) |
523 d = defer.ensureDeferred(self.send_item( | 530 d = defer.ensureDeferred( |
524 client, service, nodeIdentifier, payload, item_id or None, extra | 531 self.send_item( |
525 )) | 532 client, service, nodeIdentifier, payload, item_id or None, extra |
533 ) | |
534 ) | |
526 d.addCallback(lambda ret: ret or "") | 535 d.addCallback(lambda ret: ret or "") |
527 return d | 536 return d |
528 | 537 |
529 def _send_items(self, service, nodeIdentifier, items, extra_ser=None, | 538 def _send_items( |
530 profile_key=C.PROF_KEY_NONE): | 539 self, service, nodeIdentifier, items, extra_ser=None, profile_key=C.PROF_KEY_NONE |
540 ): | |
531 client = self.host.get_client(profile_key) | 541 client = self.host.get_client(profile_key) |
532 service = None if not service else jid.JID(service) | 542 service = None if not service else jid.JID(service) |
533 try: | 543 try: |
534 items = [xml_tools.parse(item) for item in items] | 544 items = [xml_tools.parse(item) for item in items] |
535 except Exception as e: | 545 except Exception as e: |
536 raise exceptions.DataError(_("Can't parse items: {msg}").format( | 546 raise exceptions.DataError(_("Can't parse items: {msg}").format(msg=e)) |
537 msg=e)) | |
538 extra = data_format.deserialise(extra_ser) | 547 extra = data_format.deserialise(extra_ser) |
539 return defer.ensureDeferred(self.send_items( | 548 return defer.ensureDeferred( |
540 client, service, nodeIdentifier, items, extra=extra | 549 self.send_items(client, service, nodeIdentifier, items, extra=extra) |
541 )) | 550 ) |
542 | 551 |
543 async def send_item( | 552 async def send_item( |
544 self, | 553 self, |
545 client: SatXMPPClient, | 554 client: SatXMPPClient, |
546 service: Union[jid.JID, None], | 555 service: Union[jid.JID, None], |
547 nodeIdentifier: str, | 556 nodeIdentifier: str, |
548 payload: domish.Element, | 557 payload: domish.Element, |
549 item_id: Optional[str] = None, | 558 item_id: Optional[str] = None, |
550 extra: Optional[Dict[str, Any]] = None | 559 extra: Optional[Dict[str, Any]] = None, |
551 ) -> Optional[str]: | 560 ) -> Optional[str]: |
552 """High level method to send one item | 561 """High level method to send one item |
553 | 562 |
554 @param service: service to send the item to None to use PEP | 563 @param service: service to send the item to None to use PEP |
555 @param NodeIdentifier: PubSub node to use | 564 @param NodeIdentifier: PubSub node to use |
557 @param item_id: id to use or None to create one | 566 @param item_id: id to use or None to create one |
558 @param extra: extra options | 567 @param extra: extra options |
559 @return: id of the created item | 568 @return: id of the created item |
560 """ | 569 """ |
561 assert isinstance(payload, domish.Element) | 570 assert isinstance(payload, domish.Element) |
562 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) | 571 item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) |
563 if item_id is not None: | 572 if item_id is not None: |
564 item_elt['id'] = item_id | 573 item_elt["id"] = item_id |
565 item_elt.addChild(payload) | 574 item_elt.addChild(payload) |
566 published_ids = await self.send_items( | 575 published_ids = await self.send_items( |
567 client, | 576 client, service, nodeIdentifier, [item_elt], extra=extra |
568 service, | |
569 nodeIdentifier, | |
570 [item_elt], | |
571 extra=extra | |
572 ) | 577 ) |
573 try: | 578 try: |
574 return published_ids[0] | 579 return published_ids[0] |
575 except IndexError: | 580 except IndexError: |
576 return item_id | 581 return item_id |
580 client: SatXMPPEntity, | 585 client: SatXMPPEntity, |
581 service: Optional[jid.JID], | 586 service: Optional[jid.JID], |
582 nodeIdentifier: str, | 587 nodeIdentifier: str, |
583 items: List[domish.Element], | 588 items: List[domish.Element], |
584 sender: Optional[jid.JID] = None, | 589 sender: Optional[jid.JID] = None, |
585 extra: Optional[Dict[str, Any]] = None | 590 extra: Optional[Dict[str, Any]] = None, |
586 ) -> List[str]: | 591 ) -> List[str]: |
587 """High level method to send several items at once | 592 """High level method to send several items at once |
588 | 593 |
589 @param service: service to send the item to | 594 @param service: service to send the item to |
590 None to use PEP | 595 None to use PEP |
606 extra = {} | 611 extra = {} |
607 if service is None: | 612 if service is None: |
608 service = client.jid.userhostJID() | 613 service = client.jid.userhostJID() |
609 parsed_items = [] | 614 parsed_items = [] |
610 for item in items: | 615 for item in items: |
611 if item.name != 'item': | 616 if item.name != "item": |
612 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) | 617 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) |
613 item_id = item.getAttribute("id") | 618 item_id = item.getAttribute("id") |
614 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) | 619 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) |
615 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) | 620 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) |
616 try: | 621 try: |
617 iq_result = await self.publish( | 622 iq_result = await self.publish( |
618 client, service, nodeIdentifier, parsed_items, options=publish_options, | 623 client, |
619 sender=sender | 624 service, |
625 nodeIdentifier, | |
626 parsed_items, | |
627 options=publish_options, | |
628 sender=sender, | |
620 ) | 629 ) |
621 except error.StanzaError as e: | 630 except error.StanzaError as e: |
622 if ((e.condition == 'conflict' and e.appCondition | 631 if ( |
623 and e.appCondition.name == 'precondition-not-met' | 632 e.condition == "conflict" |
624 and publish_options is not None)): | 633 and e.appCondition |
634 and e.appCondition.name == "precondition-not-met" | |
635 and publish_options is not None | |
636 ): | |
625 # this usually happens when publish-options can't be set | 637 # this usually happens when publish-options can't be set |
626 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') | 638 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, "raise") |
627 if policy == 'raise': | 639 if policy == "raise": |
628 raise e | 640 raise e |
629 elif policy == 'publish_without_options': | 641 elif policy == "publish_without_options": |
630 log.warning(_( | 642 log.warning( |
631 "Can't use publish-options ({options}) on node {node}, " | 643 _( |
632 "re-publishing without them: {reason}").format( | 644 "Can't use publish-options ({options}) on node {node}, " |
633 options=', '.join(f'{k} = {v}' | 645 "re-publishing without them: {reason}" |
634 for k,v in publish_options.items()), | 646 ).format( |
647 options=", ".join( | |
648 f"{k} = {v}" for k, v in publish_options.items() | |
649 ), | |
635 node=nodeIdentifier, | 650 node=nodeIdentifier, |
636 reason=e, | 651 reason=e, |
637 ) | 652 ) |
638 ) | 653 ) |
639 iq_result = await self.publish( | 654 iq_result = await self.publish( |
640 client, service, nodeIdentifier, parsed_items) | 655 client, service, nodeIdentifier, parsed_items |
656 ) | |
641 else: | 657 else: |
642 raise exceptions.InternalError( | 658 raise exceptions.InternalError( |
643 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " | 659 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " |
644 f"{policy}" | 660 f"{policy}" |
645 ) | 661 ) |
646 else: | 662 else: |
647 raise e | 663 raise e |
648 try: | 664 try: |
649 return [ | 665 return [ |
650 item['id'] | 666 item["id"] |
651 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item') | 667 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, "item") |
652 ] | 668 ] |
653 except AttributeError: | 669 except AttributeError: |
654 return [] | 670 return [] |
655 | 671 |
656 async def publish( | 672 async def publish( |
659 service: jid.JID, | 675 service: jid.JID, |
660 nodeIdentifier: str, | 676 nodeIdentifier: str, |
661 items: Optional[List[domish.Element]] = None, | 677 items: Optional[List[domish.Element]] = None, |
662 options: Optional[dict] = None, | 678 options: Optional[dict] = None, |
663 sender: Optional[jid.JID] = None, | 679 sender: Optional[jid.JID] = None, |
664 extra: Optional[Dict[str, Any]] = None | 680 extra: Optional[Dict[str, Any]] = None, |
665 ) -> domish.Element: | 681 ) -> domish.Element: |
666 """Publish pubsub items | 682 """Publish pubsub items |
667 | 683 |
668 @param sender: sender of the request, | 684 @param sender: sender of the request, |
669 client.jid will be used if nto set | 685 client.jid will be used if nto set |
677 if sender is None: | 693 if sender is None: |
678 sender = client.jid | 694 sender = client.jid |
679 if extra is None: | 695 if extra is None: |
680 extra = {} | 696 extra = {} |
681 if not await self.host.trigger.async_point( | 697 if not await self.host.trigger.async_point( |
682 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, | 698 "XEP-0060_publish", |
683 extra | 699 client, |
700 service, | |
701 nodeIdentifier, | |
702 items, | |
703 options, | |
704 sender, | |
705 extra, | |
684 ): | 706 ): |
685 return extra["iq_result_elt"] | 707 return extra["iq_result_elt"] |
686 iq_result_elt = await client.pubsub_client.publish( | 708 iq_result_elt = await client.pubsub_client.publish( |
687 service, nodeIdentifier, items, sender, | 709 service, nodeIdentifier, items, sender, options=options |
688 options=options | |
689 ) | 710 ) |
690 return iq_result_elt | 711 return iq_result_elt |
691 | 712 |
692 def _unwrap_mam_message(self, message_elt): | 713 def _unwrap_mam_message(self, message_elt): |
693 try: | 714 try: |
694 item_elt = reduce( | 715 item_elt = reduce( |
695 lambda elt, ns_name: next(elt.elements(*ns_name)), | 716 lambda elt, ns_name: next(elt.elements(*ns_name)), |
696 (message_elt, | 717 ( |
697 (mam.NS_MAM, "result"), | 718 message_elt, |
698 (C.NS_FORWARD, "forwarded"), | 719 (mam.NS_MAM, "result"), |
699 (C.NS_CLIENT, "message"), | 720 (C.NS_FORWARD, "forwarded"), |
700 ("http://jabber.org/protocol/pubsub#event", "event"), | 721 (C.NS_CLIENT, "message"), |
701 ("http://jabber.org/protocol/pubsub#event", "items"), | 722 ("http://jabber.org/protocol/pubsub#event", "event"), |
702 ("http://jabber.org/protocol/pubsub#event", "item"), | 723 ("http://jabber.org/protocol/pubsub#event", "items"), |
703 )) | 724 ("http://jabber.org/protocol/pubsub#event", "item"), |
725 ), | |
726 ) | |
704 except StopIteration: | 727 except StopIteration: |
705 raise exceptions.DataError("Can't find Item in MAM message element") | 728 raise exceptions.DataError("Can't find Item in MAM message element") |
706 return item_elt | 729 return item_elt |
707 | 730 |
708 def serialise_items(self, items_data): | 731 def serialise_items(self, items_data): |
709 items, metadata = items_data | 732 items, metadata = items_data |
710 metadata['items'] = items | 733 metadata["items"] = items |
711 return data_format.serialise(metadata) | 734 return data_format.serialise(metadata) |
712 | 735 |
713 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, | 736 def _get_items( |
714 extra="", profile_key=C.PROF_KEY_NONE): | 737 self, |
738 service="", | |
739 node="", | |
740 max_items=10, | |
741 item_ids=None, | |
742 sub_id=None, | |
743 extra="", | |
744 profile_key=C.PROF_KEY_NONE, | |
745 ): | |
715 """Get items from pubsub node | 746 """Get items from pubsub node |
716 | 747 |
717 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | 748 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit |
718 """ | 749 """ |
719 client = self.host.get_client(profile_key) | 750 client = self.host.get_client(profile_key) |
720 service = jid.JID(service) if service else None | 751 service = jid.JID(service) if service else None |
721 max_items = None if max_items == C.NO_LIMIT else max_items | 752 max_items = None if max_items == C.NO_LIMIT else max_items |
722 extra = self.parse_extra(data_format.deserialise(extra)) | 753 extra = self.parse_extra(data_format.deserialise(extra)) |
723 d = defer.ensureDeferred(self.get_items( | 754 d = defer.ensureDeferred( |
724 client, | 755 self.get_items( |
725 service, | 756 client, |
726 node, | 757 service, |
727 max_items, | 758 node, |
728 item_ids, | 759 max_items, |
729 sub_id or None, | 760 item_ids, |
730 extra.rsm_request, | 761 sub_id or None, |
731 extra.extra, | 762 extra.rsm_request, |
732 )) | 763 extra.extra, |
764 ) | |
765 ) | |
733 d.addCallback(self.trans_items_data) | 766 d.addCallback(self.trans_items_data) |
734 d.addCallback(self.serialise_items) | 767 d.addCallback(self.serialise_items) |
735 return d | 768 return d |
736 | 769 |
737 async def get_items( | 770 async def get_items( |
741 node: str, | 774 node: str, |
742 max_items: Optional[int] = None, | 775 max_items: Optional[int] = None, |
743 item_ids: Optional[List[str]] = None, | 776 item_ids: Optional[List[str]] = None, |
744 sub_id: Optional[str] = None, | 777 sub_id: Optional[str] = None, |
745 rsm_request: Optional[rsm.RSMRequest] = None, | 778 rsm_request: Optional[rsm.RSMRequest] = None, |
746 extra: Optional[dict] = None | 779 extra: Optional[dict] = None, |
747 ) -> Tuple[List[domish.Element], dict]: | 780 ) -> Tuple[List[domish.Element], dict]: |
748 """Retrieve pubsub items from a node. | 781 """Retrieve pubsub items from a node. |
749 | 782 |
750 @param service (JID, None): pubsub service. | 783 @param service (JID, None): pubsub service. |
751 @param node (str): node id. | 784 @param node (str): node id. |
768 if rsm_request and item_ids: | 801 if rsm_request and item_ids: |
769 raise ValueError("items_id can't be used with rsm") | 802 raise ValueError("items_id can't be used with rsm") |
770 if extra is None: | 803 if extra is None: |
771 extra = {} | 804 extra = {} |
772 cont, ret = await self.host.trigger.async_return_point( | 805 cont, ret = await self.host.trigger.async_return_point( |
773 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, | 806 "XEP-0060_getItems", |
774 rsm_request, extra | 807 client, |
808 service, | |
809 node, | |
810 max_items, | |
811 item_ids, | |
812 sub_id, | |
813 rsm_request, | |
814 extra, | |
775 ) | 815 ) |
776 if not cont: | 816 if not cont: |
777 return ret | 817 return ret |
778 try: | 818 try: |
779 mam_query = extra["mam"] | 819 mam_query = extra["mam"] |
780 except KeyError: | 820 except KeyError: |
781 d = defer.ensureDeferred(client.pubsub_client.items( | 821 d = defer.ensureDeferred( |
782 service = service, | 822 client.pubsub_client.items( |
783 nodeIdentifier = node, | 823 service=service, |
784 maxItems = max_items, | 824 nodeIdentifier=node, |
785 subscriptionIdentifier = sub_id, | 825 maxItems=max_items, |
786 sender = None, | 826 subscriptionIdentifier=sub_id, |
787 itemIdentifiers = item_ids, | 827 sender=None, |
788 orderBy = extra.get(C.KEY_ORDER_BY), | 828 itemIdentifiers=item_ids, |
789 rsm_request = rsm_request, | 829 orderBy=extra.get(C.KEY_ORDER_BY), |
790 extra = extra | 830 rsm_request=rsm_request, |
791 )) | 831 extra=extra, |
832 ) | |
833 ) | |
792 # we have no MAM data here, so we add None | 834 # we have no MAM data here, so we add None |
793 d.addErrback(sat_defer.stanza_2_not_found) | 835 d.addErrback(sat_defer.stanza_2_not_found) |
794 d.addTimeout(TIMEOUT, reactor) | 836 d.addTimeout(TIMEOUT, reactor) |
795 items, rsm_response = await d | 837 items, rsm_response = await d |
796 mam_response = None | 838 mam_response = None |
842 if mam_response is not None: | 884 if mam_response is not None: |
843 # mam_response is a dict with "complete" and "stable" keys | 885 # mam_response is a dict with "complete" and "stable" keys |
844 # we can put them directly in metadata | 886 # we can put them directly in metadata |
845 metadata.update(mam_response) | 887 metadata.update(mam_response) |
846 if rsm_request is not None and rsm_response is not None: | 888 if rsm_request is not None and rsm_response is not None: |
847 metadata['rsm'] = rsm_response.toDict() | 889 metadata["rsm"] = rsm_response.toDict() |
848 if mam_response is None: | 890 if mam_response is None: |
849 index = rsm_response.index | 891 index = rsm_response.index |
850 count = rsm_response.count | 892 count = rsm_response.count |
851 if index is None or count is None: | 893 if index is None or count is None: |
852 # we don't have enough information to know if the data is complete | 894 # we don't have enough information to know if the data is complete |
885 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | 927 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) |
886 # continue # avoid pubsub "item-not-found" error | 928 # continue # avoid pubsub "item-not-found" error |
887 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) | 929 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) |
888 # defer.returnValue(d_dict) | 930 # defer.returnValue(d_dict) |
889 | 931 |
890 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, | 932 def getOptions( |
891 profile_key=C.PROF_KEY_NONE): | 933 self, |
934 service, | |
935 nodeIdentifier, | |
936 subscriber, | |
937 subscriptionIdentifier=None, | |
938 profile_key=C.PROF_KEY_NONE, | |
939 ): | |
892 client = self.host.get_client(profile_key) | 940 client = self.host.get_client(profile_key) |
893 return client.pubsub_client.getOptions( | 941 return client.pubsub_client.getOptions( |
894 service, nodeIdentifier, subscriber, subscriptionIdentifier | 942 service, nodeIdentifier, subscriber, subscriptionIdentifier |
895 ) | 943 ) |
896 | 944 |
897 def setOptions(self, service, nodeIdentifier, subscriber, options, | 945 def setOptions( |
898 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 946 self, |
947 service, | |
948 nodeIdentifier, | |
949 subscriber, | |
950 options, | |
951 subscriptionIdentifier=None, | |
952 profile_key=C.PROF_KEY_NONE, | |
953 ): | |
899 client = self.host.get_client(profile_key) | 954 client = self.host.get_client(profile_key) |
900 return client.pubsub_client.setOptions( | 955 return client.pubsub_client.setOptions( |
901 service, nodeIdentifier, subscriber, options, subscriptionIdentifier | 956 service, nodeIdentifier, subscriber, options, subscriptionIdentifier |
902 ) | 957 ) |
903 | 958 |
910 def createNode( | 965 def createNode( |
911 self, | 966 self, |
912 client: SatXMPPClient, | 967 client: SatXMPPClient, |
913 service: jid.JID, | 968 service: jid.JID, |
914 nodeIdentifier: Optional[str] = None, | 969 nodeIdentifier: Optional[str] = None, |
915 options: Optional[Dict[str, str]] = None | 970 options: Optional[Dict[str, str]] = None, |
916 ) -> str: | 971 ) -> str: |
917 """Create a new node | 972 """Create a new node |
918 | 973 |
919 @param service: PubSub service, | 974 @param service: PubSub service, |
920 @param NodeIdentifier: node name use None to create instant node (identifier will | 975 @param NodeIdentifier: node name use None to create instant node (identifier will |
1036 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): | 1091 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): |
1037 client = self.host.get_client(profile_key) | 1092 client = self.host.get_client(profile_key) |
1038 d = self.get_node_affiliations( | 1093 d = self.get_node_affiliations( |
1039 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1094 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1040 ) | 1095 ) |
1041 d.addCallback( | 1096 d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.items()}) |
1042 lambda affiliations: {j.full(): a for j, a in affiliations.items()} | |
1043 ) | |
1044 return d | 1097 return d |
1045 | 1098 |
1046 def get_node_affiliations(self, client, service, nodeIdentifier): | 1099 def get_node_affiliations(self, client, service, nodeIdentifier): |
1047 """Retrieve affiliations of a node owned by profile""" | 1100 """Retrieve affiliations of a node owned by profile""" |
1048 request = pubsub.PubSubRequest("affiliationsGet") | 1101 request = pubsub.PubSubRequest("affiliationsGet") |
1120 return self.deleteNode( | 1173 return self.deleteNode( |
1121 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1174 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1122 ) | 1175 ) |
1123 | 1176 |
1124 def deleteNode( | 1177 def deleteNode( |
1125 self, | 1178 self, client: SatXMPPClient, service: jid.JID, nodeIdentifier: str |
1126 client: SatXMPPClient, | |
1127 service: jid.JID, | |
1128 nodeIdentifier: str | |
1129 ) -> defer.Deferred: | 1179 ) -> defer.Deferred: |
1130 return client.pubsub_client.deleteNode(service, nodeIdentifier) | 1180 return client.pubsub_client.deleteNode(service, nodeIdentifier) |
1131 | 1181 |
1132 def _addWatch(self, service_s, node, profile_key): | 1182 def _addWatch(self, service_s, node, profile_key): |
1133 """watch modifications on a node | 1183 """watch modifications on a node |
1186 new_id, | 1236 new_id, |
1187 profile_key=C.PROF_KEY_NONE, | 1237 profile_key=C.PROF_KEY_NONE, |
1188 ): | 1238 ): |
1189 client = self.host.get_client(profile_key) | 1239 client = self.host.get_client(profile_key) |
1190 service = jid.JID(service) if service else None | 1240 service = jid.JID(service) if service else None |
1191 return defer.ensureDeferred(self.rename_item( | 1241 return defer.ensureDeferred( |
1192 client, service, node, item_id, new_id | 1242 self.rename_item(client, service, node, item_id, new_id) |
1193 )) | 1243 ) |
1194 | 1244 |
1195 async def rename_item( | 1245 async def rename_item( |
1196 self, | 1246 self, |
1197 client: SatXMPPEntity, | 1247 client: SatXMPPEntity, |
1198 service: Optional[jid.JID], | 1248 service: Optional[jid.JID], |
1199 node: str, | 1249 node: str, |
1200 item_id: str, | 1250 item_id: str, |
1201 new_id: str | 1251 new_id: str, |
1202 ) -> None: | 1252 ) -> None: |
1203 """Rename an item by recreating it then deleting it | 1253 """Rename an item by recreating it then deleting it |
1204 | 1254 |
1205 we have to recreate then delete because there is currently no rename operation | 1255 we have to recreate then delete because there is currently no rename operation |
1206 with PubSub | 1256 with PubSub |
1216 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 1266 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
1217 client = self.host.get_client(profile_key) | 1267 client = self.host.get_client(profile_key) |
1218 service = None if not service else jid.JID(service) | 1268 service = None if not service else jid.JID(service) |
1219 d = defer.ensureDeferred( | 1269 d = defer.ensureDeferred( |
1220 self.subscribe( | 1270 self.subscribe( |
1221 client, | 1271 client, service, nodeIdentifier, options=data_format.deserialise(options) |
1222 service, | |
1223 nodeIdentifier, | |
1224 options=data_format.deserialise(options) | |
1225 ) | 1272 ) |
1226 ) | 1273 ) |
1227 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") | 1274 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") |
1228 return d | 1275 return d |
1229 | 1276 |
1231 self, | 1278 self, |
1232 client: SatXMPPEntity, | 1279 client: SatXMPPEntity, |
1233 service: Optional[jid.JID], | 1280 service: Optional[jid.JID], |
1234 nodeIdentifier: str, | 1281 nodeIdentifier: str, |
1235 sub_jid: Optional[jid.JID] = None, | 1282 sub_jid: Optional[jid.JID] = None, |
1236 options: Optional[dict] = None | 1283 options: Optional[dict] = None, |
1237 ) -> pubsub.Subscription: | 1284 ) -> pubsub.Subscription: |
1238 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe | 1285 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe |
1239 if service is None: | 1286 if service is None: |
1240 service = client.jid.userhostJID() | 1287 service = client.jid.userhostJID() |
1241 cont, trigger_sub = await self.host.trigger.async_return_point( | 1288 cont, trigger_sub = await self.host.trigger.async_return_point( |
1242 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, | 1289 "XEP-0060_subscribe", |
1290 client, | |
1291 service, | |
1292 nodeIdentifier, | |
1293 sub_jid, | |
1294 options, | |
1243 ) | 1295 ) |
1244 if not cont: | 1296 if not cont: |
1245 return trigger_sub | 1297 return trigger_sub |
1246 try: | 1298 try: |
1247 subscription = await client.pubsub_client.subscribe( | 1299 subscription = await client.pubsub_client.subscribe( |
1248 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), | 1300 service, |
1249 options=options, sender=client.jid.userhostJID() | 1301 nodeIdentifier, |
1302 sub_jid or client.jid.userhostJID(), | |
1303 options=options, | |
1304 sender=client.jid.userhostJID(), | |
1250 ) | 1305 ) |
1251 except error.StanzaError as e: | 1306 except error.StanzaError as e: |
1252 if e.condition == 'item-not-found': | 1307 if e.condition == "item-not-found": |
1253 raise exceptions.NotFound(e.text or e.condition) | 1308 raise exceptions.NotFound(e.text or e.condition) |
1254 else: | 1309 else: |
1255 raise e | 1310 raise e |
1256 return subscription | 1311 return subscription |
1257 | 1312 |
1268 sub_jid: Optional[jid.JID] = None, | 1323 sub_jid: Optional[jid.JID] = None, |
1269 subscriptionIdentifier: Optional[str] = None, | 1324 subscriptionIdentifier: Optional[str] = None, |
1270 sender: Optional[jid.JID] = None, | 1325 sender: Optional[jid.JID] = None, |
1271 ) -> None: | 1326 ) -> None: |
1272 if not await self.host.trigger.async_point( | 1327 if not await self.host.trigger.async_point( |
1273 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, | 1328 "XEP-0060_unsubscribe", |
1274 subscriptionIdentifier, sender | 1329 client, |
1330 service, | |
1331 nodeIdentifier, | |
1332 sub_jid, | |
1333 subscriptionIdentifier, | |
1334 sender, | |
1275 ): | 1335 ): |
1276 return | 1336 return |
1277 try: | 1337 try: |
1278 await client.pubsub_client.unsubscribe( | 1338 await client.pubsub_client.unsubscribe( |
1279 service, | 1339 service, |
1280 nodeIdentifier, | 1340 nodeIdentifier, |
1281 sub_jid or client.jid.userhostJID(), | 1341 sub_jid or client.jid.userhostJID(), |
1282 subscriptionIdentifier, | 1342 subscriptionIdentifier, |
1283 sender, | 1343 sender, |
1284 ) | 1344 ) |
1285 except error.StanzaError as e: | 1345 except error.StanzaError as e: |
1286 try: | 1346 try: |
1287 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) | 1347 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) |
1288 except StopIteration: | 1348 except StopIteration: |
1289 raise e | 1349 raise e |
1293 f"subscribed to node {nodeIdentifier!s} at {service.full()}" | 1353 f"subscribed to node {nodeIdentifier!s} at {service.full()}" |
1294 ) | 1354 ) |
1295 | 1355 |
1296 @utils.ensure_deferred | 1356 @utils.ensure_deferred |
1297 async def _subscriptions( | 1357 async def _subscriptions( |
1298 self, | 1358 self, service="", nodeIdentifier="", profile_key=C.PROF_KEY_NONE |
1299 service="", | |
1300 nodeIdentifier="", | |
1301 profile_key=C.PROF_KEY_NONE | |
1302 ) -> str: | 1359 ) -> str: |
1303 client = self.host.get_client(profile_key) | 1360 client = self.host.get_client(profile_key) |
1304 service = None if not service else jid.JID(service) | 1361 service = None if not service else jid.JID(service) |
1305 subs = await self.subscriptions(client, service, nodeIdentifier or None) | 1362 subs = await self.subscriptions(client, service, nodeIdentifier or None) |
1306 return data_format.serialise(subs) | 1363 return data_format.serialise(subs) |
1307 | 1364 |
1308 async def subscriptions( | 1365 async def subscriptions( |
1309 self, | 1366 self, |
1310 client: SatXMPPEntity, | 1367 client: SatXMPPEntity, |
1311 service: Optional[jid.JID] = None, | 1368 service: Optional[jid.JID] = None, |
1312 node: Optional[str] = None | 1369 node: Optional[str] = None, |
1313 ) -> List[Dict[str, Union[str, bool]]]: | 1370 ) -> List[Dict[str, Union[str, bool]]]: |
1314 """Retrieve subscriptions from a service | 1371 """Retrieve subscriptions from a service |
1315 | 1372 |
1316 @param service(jid.JID): PubSub service | 1373 @param service(jid.JID): PubSub service |
1317 @param nodeIdentifier(unicode, None): node to check | 1374 @param nodeIdentifier(unicode, None): node to check |
1397 | 1454 |
1398 def eb(failure_): | 1455 def eb(failure_): |
1399 log.warning(f"Error while parsing item: {failure_.value}") | 1456 log.warning(f"Error while parsing item: {failure_.value}") |
1400 | 1457 |
1401 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) | 1458 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) |
1402 d.addCallback(lambda parsed_items: ( | 1459 d.addCallback( |
1403 [i for i in parsed_items if i is not None], | 1460 lambda parsed_items: ([i for i in parsed_items if i is not None], metadata) |
1404 metadata | 1461 ) |
1405 )) | |
1406 return d | 1462 return d |
1407 | 1463 |
1408 def ser_d_list(self, results, failure_result=None): | 1464 def ser_d_list(self, results, failure_result=None): |
1409 """Serialise a DeferredList result | 1465 """Serialise a DeferredList result |
1410 | 1466 |
1416 - result | 1472 - result |
1417 """ | 1473 """ |
1418 if failure_result is None: | 1474 if failure_result is None: |
1419 failure_result = () | 1475 failure_result = () |
1420 return [ | 1476 return [ |
1421 ("", result) | 1477 ( |
1422 if success | 1478 ("", result) |
1423 else (str(result.result) or UNSPECIFIED, failure_result) | 1479 if success |
1480 else (str(result.result) or UNSPECIFIED, failure_result) | |
1481 ) | |
1424 for success, result in results | 1482 for success, result in results |
1425 ] | 1483 ] |
1426 | 1484 |
1427 # subscribe # | 1485 # subscribe # |
1428 | 1486 |
1429 @utils.ensure_deferred | 1487 @utils.ensure_deferred |
1430 async def _get_node_subscriptions( | 1488 async def _get_node_subscriptions( |
1431 self, | 1489 self, service: str, node: str, profile_key: str |
1432 service: str, | |
1433 node: str, | |
1434 profile_key: str | |
1435 ) -> Dict[str, str]: | 1490 ) -> Dict[str, str]: |
1436 client = self.host.get_client(profile_key) | 1491 client = self.host.get_client(profile_key) |
1437 subs = await self.get_node_subscriptions( | 1492 subs = await self.get_node_subscriptions( |
1438 client, jid.JID(service) if service else None, node | 1493 client, jid.JID(service) if service else None, node |
1439 ) | 1494 ) |
1440 return {j.full(): a for j, a in subs.items()} | 1495 return {j.full(): a for j, a in subs.items()} |
1441 | 1496 |
1442 async def get_node_subscriptions( | 1497 async def get_node_subscriptions( |
1443 self, | 1498 self, client: SatXMPPEntity, service: Optional[jid.JID], nodeIdentifier: str |
1444 client: SatXMPPEntity, | |
1445 service: Optional[jid.JID], | |
1446 nodeIdentifier: str | |
1447 ) -> Dict[jid.JID, str]: | 1499 ) -> Dict[jid.JID, str]: |
1448 """Retrieve subscriptions to a node | 1500 """Retrieve subscriptions to a node |
1449 | 1501 |
1450 @param nodeIdentifier(unicode): node to get subscriptions from | 1502 @param nodeIdentifier(unicode): node to get subscriptions from |
1451 """ | 1503 """ |
1469 except AttributeError as e: | 1521 except AttributeError as e: |
1470 raise ValueError(_("Invalid result: {}").format(e)) | 1522 raise ValueError(_("Invalid result: {}").format(e)) |
1471 try: | 1523 try: |
1472 return { | 1524 return { |
1473 jid.JID(s["jid"]): s["subscription"] | 1525 jid.JID(s["jid"]): s["subscription"] |
1474 for s in subscriptions_elt.elements( | 1526 for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, "subscription")) |
1475 (pubsub.NS_PUBSUB, "subscription") | |
1476 ) | |
1477 } | 1527 } |
1478 except KeyError: | 1528 except KeyError: |
1479 raise ValueError( | 1529 raise ValueError( |
1480 _("Invalid result: bad <subscription> element: {}").format( | 1530 _("Invalid result: bad <subscription> element: {}").format(iq_elt.toXml) |
1481 iq_elt.toXml | |
1482 ) | |
1483 ) | 1531 ) |
1484 | 1532 |
1485 def _set_node_subscriptions( | 1533 def _set_node_subscriptions( |
1486 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE | 1534 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE |
1487 ): | 1535 ): |
1488 client = self.host.get_client(profile_key) | 1536 client = self.host.get_client(profile_key) |
1489 subscriptions = { | 1537 subscriptions = { |
1490 jid.JID(jid_): subscription | 1538 jid.JID(jid_): subscription for jid_, subscription in subscriptions.items() |
1491 for jid_, subscription in subscriptions.items() | |
1492 } | 1539 } |
1493 d = self.set_node_subscriptions( | 1540 d = self.set_node_subscriptions( |
1494 client, | 1541 client, |
1495 jid.JID(service_s) if service_s else None, | 1542 jid.JID(service_s) if service_s else None, |
1496 nodeIdentifier, | 1543 nodeIdentifier, |
1570 """ | 1617 """ |
1571 client = self.host.get_client(profile_key) | 1618 client = self.host.get_client(profile_key) |
1572 deferreds = {} | 1619 deferreds = {} |
1573 for service, node in node_data: | 1620 for service, node in node_data: |
1574 deferreds[(service, node)] = defer.ensureDeferred( | 1621 deferreds[(service, node)] = defer.ensureDeferred( |
1575 client.pubsub_client.subscribe( | 1622 client.pubsub_client.subscribe(service, node, subscriber, options=options) |
1576 service, node, subscriber, options=options | |
1577 ) | |
1578 ) | 1623 ) |
1579 return self.rt_sessions.new_session(deferreds, client.profile) | 1624 return self.rt_sessions.new_session(deferreds, client.profile) |
1580 # found_nodes = yield self.listNodes(service, profile=client.profile) | 1625 # found_nodes = yield self.listNodes(service, profile=client.profile) |
1581 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 1626 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
1582 # d_list = [] | 1627 # d_list = [] |
1637 extra.rsm_request, | 1682 extra.rsm_request, |
1638 extra.extra, | 1683 extra.extra, |
1639 profile_key, | 1684 profile_key, |
1640 ) | 1685 ) |
1641 | 1686 |
1642 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, | 1687 def get_from_many( |
1643 profile_key=C.PROF_KEY_NONE): | 1688 self, |
1689 node_data, | |
1690 max_item=None, | |
1691 rsm_request=None, | |
1692 extra=None, | |
1693 profile_key=C.PROF_KEY_NONE, | |
1694 ): | |
1644 """Get items from many nodes at once | 1695 """Get items from many nodes at once |
1645 | 1696 |
1646 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | 1697 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: |
1647 - service (jid.JID) is the pubsub service | 1698 - service (jid.JID) is the pubsub service |
1648 - node (unicode) is the node to get items from | 1699 - node (unicode) is the node to get items from |
1652 @return (str): RT Deferred session id | 1703 @return (str): RT Deferred session id |
1653 """ | 1704 """ |
1654 client = self.host.get_client(profile_key) | 1705 client = self.host.get_client(profile_key) |
1655 deferreds = {} | 1706 deferreds = {} |
1656 for service, node in node_data: | 1707 for service, node in node_data: |
1657 deferreds[(service, node)] = defer.ensureDeferred(self.get_items( | 1708 deferreds[(service, node)] = defer.ensureDeferred( |
1658 client, service, node, max_item, rsm_request=rsm_request, extra=extra | 1709 self.get_items( |
1659 )) | 1710 client, service, node, max_item, rsm_request=rsm_request, extra=extra |
1711 ) | |
1712 ) | |
1660 return self.rt_sessions.new_session(deferreds, client.profile) | 1713 return self.rt_sessions.new_session(deferreds, client.profile) |
1661 | 1714 |
1662 | 1715 |
1663 @implementer(disco.IDisco) | 1716 @implementer(disco.IDisco) |
1664 class SatPubSubClient(rsm.PubSubClient): | 1717 class SatPubSubClient(rsm.PubSubClient): |
1684 extra: Optional[Dict[str, Any]] = None, | 1737 extra: Optional[Dict[str, Any]] = None, |
1685 ): | 1738 ): |
1686 if extra is None: | 1739 if extra is None: |
1687 extra = {} | 1740 extra = {} |
1688 items, rsm_response = await super().items( | 1741 items, rsm_response = await super().items( |
1689 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, | 1742 service, |
1690 itemIdentifiers, orderBy, rsm_request | 1743 nodeIdentifier, |
1744 maxItems, | |
1745 subscriptionIdentifier, | |
1746 sender, | |
1747 itemIdentifiers, | |
1748 orderBy, | |
1749 rsm_request, | |
1691 ) | 1750 ) |
1692 # items must be returned, thus this async point can't stop the workflow (but it | 1751 # items must be returned, thus this async point can't stop the workflow (but it |
1693 # can modify returned items) | 1752 # can modify returned items) |
1694 await self.host.trigger.async_point( | 1753 await self.host.trigger.async_point( |
1695 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, | 1754 "XEP-0060_items", |
1696 extra | 1755 self.parent, |
1756 service, | |
1757 nodeIdentifier, | |
1758 items, | |
1759 rsm_response, | |
1760 extra, | |
1697 ) | 1761 ) |
1698 return items, rsm_response | 1762 return items, rsm_response |
1699 | 1763 |
1700 def _get_node_callbacks(self, node, event): | 1764 def _get_node_callbacks(self, node, event): |
1701 """Generate callbacks from given node and event | 1765 """Generate callbacks from given node and event |
1723 """ | 1787 """ |
1724 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): | 1788 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): |
1725 try: | 1789 try: |
1726 await utils.as_deferred(callback, client, event) | 1790 await utils.as_deferred(callback, client, event) |
1727 except Exception as e: | 1791 except Exception as e: |
1728 log.error( | 1792 log.error(f"Error while running items event callback {callback}: {e}") |
1729 f"Error while running items event callback {callback}: {e}" | |
1730 ) | |
1731 | 1793 |
1732 def itemsReceived(self, event): | 1794 def itemsReceived(self, event): |
1733 log.debug("Pubsub items received") | 1795 log.debug("Pubsub items received") |
1734 client = self.parent | 1796 client = self.parent |
1735 defer.ensureDeferred(self._call_node_callbacks(client, event)) | 1797 defer.ensureDeferred(self._call_node_callbacks(client, event)) |
1745 | 1807 |
1746 def deleteReceived(self, event): | 1808 def deleteReceived(self, event): |
1747 log.debug(("Publish node deleted")) | 1809 log.debug(("Publish node deleted")) |
1748 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): | 1810 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): |
1749 d = utils.as_deferred(callback, self.parent, event) | 1811 d = utils.as_deferred(callback, self.parent, event) |
1750 d.addErrback(lambda f: log.error( | 1812 d.addErrback( |
1751 f"Error while running delete event callback {callback}: {f}" | 1813 lambda f: log.error( |
1752 )) | 1814 f"Error while running delete event callback {callback}: {f}" |
1815 ) | |
1816 ) | |
1753 client = self.parent | 1817 client = self.parent |
1754 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1818 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1755 self.host.bridge.ps_event_raw( | 1819 self.host.bridge.ps_event_raw( |
1756 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile | 1820 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile |
1757 ) | 1821 ) |
1758 | 1822 |
1759 def purgeReceived(self, event): | 1823 def purgeReceived(self, event): |
1760 log.debug(("Publish node purged")) | 1824 log.debug(("Publish node purged")) |
1761 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): | 1825 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): |
1762 d = utils.as_deferred(callback, self.parent, event) | 1826 d = utils.as_deferred(callback, self.parent, event) |
1763 d.addErrback(lambda f: log.error( | 1827 d.addErrback( |
1764 f"Error while running purge event callback {callback}: {f}" | 1828 lambda f: log.error( |
1765 )) | 1829 f"Error while running purge event callback {callback}: {f}" |
1830 ) | |
1831 ) | |
1766 client = self.parent | 1832 client = self.parent |
1767 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1833 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1768 self.host.bridge.ps_event_raw( | 1834 self.host.bridge.ps_event_raw( |
1769 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile | 1835 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile |
1770 ) | 1836 ) |
1806 @param service(jid.JID, None): service to send the item to | 1872 @param service(jid.JID, None): service to send the item to |
1807 None to use PEP | 1873 None to use PEP |
1808 @param NodeIdentifier(unicode): PubSub node to use | 1874 @param NodeIdentifier(unicode): PubSub node to use |
1809 """ | 1875 """ |
1810 # TODO: propose this upstream and remove it once merged | 1876 # TODO: propose this upstream and remove it once merged |
1811 request = pubsub.PubSubRequest('purge') | 1877 request = pubsub.PubSubRequest("purge") |
1812 request.recipient = service | 1878 request.recipient = service |
1813 request.nodeIdentifier = nodeIdentifier | 1879 request.nodeIdentifier = nodeIdentifier |
1814 return request.send(self.xmlstream) | 1880 return request.send(self.xmlstream) |
1815 | 1881 |
1816 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): | 1882 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): |