comparison libervia/backend/plugins/plugin_xep_0060.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 2b000790b197
children 23842a63ea00
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None 68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
69 # extra is a potentially empty dict 69 # extra is a potentially empty dict
70 TIMEOUT = 30 70 TIMEOUT = 30
71 # minimum features that a pubsub service must have to be selectable as default 71 # minimum features that a pubsub service must have to be selectable as default
72 DEFAULT_PUBSUB_MIN_FEAT = { 72 DEFAULT_PUBSUB_MIN_FEAT = {
73 'http://jabber.org/protocol/pubsub#persistent-items', 73 "http://jabber.org/protocol/pubsub#persistent-items",
74 'http://jabber.org/protocol/pubsub#publish', 74 "http://jabber.org/protocol/pubsub#publish",
75 'http://jabber.org/protocol/pubsub#retract-items', 75 "http://jabber.org/protocol/pubsub#retract-items",
76 } 76 }
77
77 78
78 class XEP_0060(object): 79 class XEP_0060(object):
79 OPT_ACCESS_MODEL = "pubsub#access_model" 80 OPT_ACCESS_MODEL = "pubsub#access_model"
80 OPT_PERSIST_ITEMS = "pubsub#persist_items" 81 OPT_PERSIST_ITEMS = "pubsub#persist_items"
81 OPT_MAX_ITEMS = "pubsub#max_items" 82 OPT_MAX_ITEMS = "pubsub#max_items"
107 def __init__(self, host): 108 def __init__(self, host):
108 log.info(_("PubSub plugin initialization")) 109 log.info(_("PubSub plugin initialization"))
109 self.host = host 110 self.host = host
110 self._rsm = host.plugins.get("XEP-0059") 111 self._rsm = host.plugins.get("XEP-0059")
111 self._mam = host.plugins.get("XEP-0313") 112 self._mam = host.plugins.get("XEP-0313")
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) 113 self._node_cb = (
114 {}
115 ) # dictionnary of callbacks for node (key: node, value: list of callbacks)
113 self.rt_sessions = sat_defer.RTDeferredSessions() 116 self.rt_sessions = sat_defer.RTDeferredSessions()
114 host.bridge.add_method( 117 host.bridge.add_method(
115 "ps_node_create", 118 "ps_node_create",
116 ".plugin", 119 ".plugin",
117 in_sign="ssa{ss}s", 120 in_sign="ssa{ss}s",
374 client = self.host.get_client(profile) 377 client = self.host.get_client(profile)
375 except exceptions.ProfileNotSetError: 378 except exceptions.ProfileNotSetError:
376 return {} 379 return {}
377 try: 380 try:
378 return { 381 return {
379 "service": client.pubsub_service.full() 382 "service": (
380 if client.pubsub_service is not None 383 client.pubsub_service.full()
381 else "" 384 if client.pubsub_service is not None
385 else ""
386 )
382 } 387 }
383 except AttributeError: 388 except AttributeError:
384 if self.host.is_connected(profile): 389 if self.host.is_connected(profile):
385 log.debug("Profile is not connected, service is not checked yet") 390 log.debug("Profile is not connected, service is not checked yet")
386 else: 391 else:
421 assert "mam" not in extra 426 assert "mam" not in extra
422 extra["mam"] = mam_request 427 extra["mam"] = mam_request
423 428
424 return Extra(rsm_request, extra) 429 return Extra(rsm_request, extra)
425 430
426 def add_managed_node( 431 def add_managed_node(self, node: str, priority: int = 0, **kwargs: Callable):
427 self,
428 node: str,
429 priority: int = 0,
430 **kwargs: Callable
431 ):
432 """Add a handler for a node 432 """Add a handler for a node
433 433
434 @param node: node to monitor 434 @param node: node to monitor
435 all node *prefixed* with this one will be triggered 435 all node *prefixed* with this one will be triggered
436 @param priority: priority of the callback. Callbacks with higher priority will be 436 @param priority: priority of the callback. Callbacks with higher priority will be
512 # """ 512 # """
513 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) 513 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
514 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) 514 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
515 # return d 515 # return d
516 516
517 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", 517 def _send_item(
518 profile_key=C.PROF_KEY_NONE): 518 self,
519 service,
520 nodeIdentifier,
521 payload,
522 item_id=None,
523 extra_ser="",
524 profile_key=C.PROF_KEY_NONE,
525 ):
519 client = self.host.get_client(profile_key) 526 client = self.host.get_client(profile_key)
520 service = None if not service else jid.JID(service) 527 service = None if not service else jid.JID(service)
521 payload = xml_tools.parse(payload) 528 payload = xml_tools.parse(payload)
522 extra = data_format.deserialise(extra_ser) 529 extra = data_format.deserialise(extra_ser)
523 d = defer.ensureDeferred(self.send_item( 530 d = defer.ensureDeferred(
524 client, service, nodeIdentifier, payload, item_id or None, extra 531 self.send_item(
525 )) 532 client, service, nodeIdentifier, payload, item_id or None, extra
533 )
534 )
526 d.addCallback(lambda ret: ret or "") 535 d.addCallback(lambda ret: ret or "")
527 return d 536 return d
528 537
529 def _send_items(self, service, nodeIdentifier, items, extra_ser=None, 538 def _send_items(
530 profile_key=C.PROF_KEY_NONE): 539 self, service, nodeIdentifier, items, extra_ser=None, profile_key=C.PROF_KEY_NONE
540 ):
531 client = self.host.get_client(profile_key) 541 client = self.host.get_client(profile_key)
532 service = None if not service else jid.JID(service) 542 service = None if not service else jid.JID(service)
533 try: 543 try:
534 items = [xml_tools.parse(item) for item in items] 544 items = [xml_tools.parse(item) for item in items]
535 except Exception as e: 545 except Exception as e:
536 raise exceptions.DataError(_("Can't parse items: {msg}").format( 546 raise exceptions.DataError(_("Can't parse items: {msg}").format(msg=e))
537 msg=e))
538 extra = data_format.deserialise(extra_ser) 547 extra = data_format.deserialise(extra_ser)
539 return defer.ensureDeferred(self.send_items( 548 return defer.ensureDeferred(
540 client, service, nodeIdentifier, items, extra=extra 549 self.send_items(client, service, nodeIdentifier, items, extra=extra)
541 )) 550 )
542 551
543 async def send_item( 552 async def send_item(
544 self, 553 self,
545 client: SatXMPPClient, 554 client: SatXMPPClient,
546 service: Union[jid.JID, None], 555 service: Union[jid.JID, None],
547 nodeIdentifier: str, 556 nodeIdentifier: str,
548 payload: domish.Element, 557 payload: domish.Element,
549 item_id: Optional[str] = None, 558 item_id: Optional[str] = None,
550 extra: Optional[Dict[str, Any]] = None 559 extra: Optional[Dict[str, Any]] = None,
551 ) -> Optional[str]: 560 ) -> Optional[str]:
552 """High level method to send one item 561 """High level method to send one item
553 562
554 @param service: service to send the item to None to use PEP 563 @param service: service to send the item to None to use PEP
555 @param NodeIdentifier: PubSub node to use 564 @param NodeIdentifier: PubSub node to use
557 @param item_id: id to use or None to create one 566 @param item_id: id to use or None to create one
558 @param extra: extra options 567 @param extra: extra options
559 @return: id of the created item 568 @return: id of the created item
560 """ 569 """
561 assert isinstance(payload, domish.Element) 570 assert isinstance(payload, domish.Element)
562 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) 571 item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
563 if item_id is not None: 572 if item_id is not None:
564 item_elt['id'] = item_id 573 item_elt["id"] = item_id
565 item_elt.addChild(payload) 574 item_elt.addChild(payload)
566 published_ids = await self.send_items( 575 published_ids = await self.send_items(
567 client, 576 client, service, nodeIdentifier, [item_elt], extra=extra
568 service,
569 nodeIdentifier,
570 [item_elt],
571 extra=extra
572 ) 577 )
573 try: 578 try:
574 return published_ids[0] 579 return published_ids[0]
575 except IndexError: 580 except IndexError:
576 return item_id 581 return item_id
580 client: SatXMPPEntity, 585 client: SatXMPPEntity,
581 service: Optional[jid.JID], 586 service: Optional[jid.JID],
582 nodeIdentifier: str, 587 nodeIdentifier: str,
583 items: List[domish.Element], 588 items: List[domish.Element],
584 sender: Optional[jid.JID] = None, 589 sender: Optional[jid.JID] = None,
585 extra: Optional[Dict[str, Any]] = None 590 extra: Optional[Dict[str, Any]] = None,
586 ) -> List[str]: 591 ) -> List[str]:
587 """High level method to send several items at once 592 """High level method to send several items at once
588 593
589 @param service: service to send the item to 594 @param service: service to send the item to
590 None to use PEP 595 None to use PEP
606 extra = {} 611 extra = {}
607 if service is None: 612 if service is None:
608 service = client.jid.userhostJID() 613 service = client.jid.userhostJID()
609 parsed_items = [] 614 parsed_items = []
610 for item in items: 615 for item in items:
611 if item.name != 'item': 616 if item.name != "item":
612 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) 617 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
613 item_id = item.getAttribute("id") 618 item_id = item.getAttribute("id")
614 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) 619 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
615 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) 620 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
616 try: 621 try:
617 iq_result = await self.publish( 622 iq_result = await self.publish(
618 client, service, nodeIdentifier, parsed_items, options=publish_options, 623 client,
619 sender=sender 624 service,
625 nodeIdentifier,
626 parsed_items,
627 options=publish_options,
628 sender=sender,
620 ) 629 )
621 except error.StanzaError as e: 630 except error.StanzaError as e:
622 if ((e.condition == 'conflict' and e.appCondition 631 if (
623 and e.appCondition.name == 'precondition-not-met' 632 e.condition == "conflict"
624 and publish_options is not None)): 633 and e.appCondition
634 and e.appCondition.name == "precondition-not-met"
635 and publish_options is not None
636 ):
625 # this usually happens when publish-options can't be set 637 # this usually happens when publish-options can't be set
626 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') 638 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, "raise")
627 if policy == 'raise': 639 if policy == "raise":
628 raise e 640 raise e
629 elif policy == 'publish_without_options': 641 elif policy == "publish_without_options":
630 log.warning(_( 642 log.warning(
631 "Can't use publish-options ({options}) on node {node}, " 643 _(
632 "re-publishing without them: {reason}").format( 644 "Can't use publish-options ({options}) on node {node}, "
633 options=', '.join(f'{k} = {v}' 645 "re-publishing without them: {reason}"
634 for k,v in publish_options.items()), 646 ).format(
647 options=", ".join(
648 f"{k} = {v}" for k, v in publish_options.items()
649 ),
635 node=nodeIdentifier, 650 node=nodeIdentifier,
636 reason=e, 651 reason=e,
637 ) 652 )
638 ) 653 )
639 iq_result = await self.publish( 654 iq_result = await self.publish(
640 client, service, nodeIdentifier, parsed_items) 655 client, service, nodeIdentifier, parsed_items
656 )
641 else: 657 else:
642 raise exceptions.InternalError( 658 raise exceptions.InternalError(
643 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " 659 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
644 f"{policy}" 660 f"{policy}"
645 ) 661 )
646 else: 662 else:
647 raise e 663 raise e
648 try: 664 try:
649 return [ 665 return [
650 item['id'] 666 item["id"]
651 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item') 667 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, "item")
652 ] 668 ]
653 except AttributeError: 669 except AttributeError:
654 return [] 670 return []
655 671
656 async def publish( 672 async def publish(
659 service: jid.JID, 675 service: jid.JID,
660 nodeIdentifier: str, 676 nodeIdentifier: str,
661 items: Optional[List[domish.Element]] = None, 677 items: Optional[List[domish.Element]] = None,
662 options: Optional[dict] = None, 678 options: Optional[dict] = None,
663 sender: Optional[jid.JID] = None, 679 sender: Optional[jid.JID] = None,
664 extra: Optional[Dict[str, Any]] = None 680 extra: Optional[Dict[str, Any]] = None,
665 ) -> domish.Element: 681 ) -> domish.Element:
666 """Publish pubsub items 682 """Publish pubsub items
667 683
668 @param sender: sender of the request, 684 @param sender: sender of the request,
669 client.jid will be used if nto set 685 client.jid will be used if nto set
677 if sender is None: 693 if sender is None:
678 sender = client.jid 694 sender = client.jid
679 if extra is None: 695 if extra is None:
680 extra = {} 696 extra = {}
681 if not await self.host.trigger.async_point( 697 if not await self.host.trigger.async_point(
682 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, 698 "XEP-0060_publish",
683 extra 699 client,
700 service,
701 nodeIdentifier,
702 items,
703 options,
704 sender,
705 extra,
684 ): 706 ):
685 return extra["iq_result_elt"] 707 return extra["iq_result_elt"]
686 iq_result_elt = await client.pubsub_client.publish( 708 iq_result_elt = await client.pubsub_client.publish(
687 service, nodeIdentifier, items, sender, 709 service, nodeIdentifier, items, sender, options=options
688 options=options
689 ) 710 )
690 return iq_result_elt 711 return iq_result_elt
691 712
692 def _unwrap_mam_message(self, message_elt): 713 def _unwrap_mam_message(self, message_elt):
693 try: 714 try:
694 item_elt = reduce( 715 item_elt = reduce(
695 lambda elt, ns_name: next(elt.elements(*ns_name)), 716 lambda elt, ns_name: next(elt.elements(*ns_name)),
696 (message_elt, 717 (
697 (mam.NS_MAM, "result"), 718 message_elt,
698 (C.NS_FORWARD, "forwarded"), 719 (mam.NS_MAM, "result"),
699 (C.NS_CLIENT, "message"), 720 (C.NS_FORWARD, "forwarded"),
700 ("http://jabber.org/protocol/pubsub#event", "event"), 721 (C.NS_CLIENT, "message"),
701 ("http://jabber.org/protocol/pubsub#event", "items"), 722 ("http://jabber.org/protocol/pubsub#event", "event"),
702 ("http://jabber.org/protocol/pubsub#event", "item"), 723 ("http://jabber.org/protocol/pubsub#event", "items"),
703 )) 724 ("http://jabber.org/protocol/pubsub#event", "item"),
725 ),
726 )
704 except StopIteration: 727 except StopIteration:
705 raise exceptions.DataError("Can't find Item in MAM message element") 728 raise exceptions.DataError("Can't find Item in MAM message element")
706 return item_elt 729 return item_elt
707 730
708 def serialise_items(self, items_data): 731 def serialise_items(self, items_data):
709 items, metadata = items_data 732 items, metadata = items_data
710 metadata['items'] = items 733 metadata["items"] = items
711 return data_format.serialise(metadata) 734 return data_format.serialise(metadata)
712 735
713 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, 736 def _get_items(
714 extra="", profile_key=C.PROF_KEY_NONE): 737 self,
738 service="",
739 node="",
740 max_items=10,
741 item_ids=None,
742 sub_id=None,
743 extra="",
744 profile_key=C.PROF_KEY_NONE,
745 ):
715 """Get items from pubsub node 746 """Get items from pubsub node
716 747
717 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit 748 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
718 """ 749 """
719 client = self.host.get_client(profile_key) 750 client = self.host.get_client(profile_key)
720 service = jid.JID(service) if service else None 751 service = jid.JID(service) if service else None
721 max_items = None if max_items == C.NO_LIMIT else max_items 752 max_items = None if max_items == C.NO_LIMIT else max_items
722 extra = self.parse_extra(data_format.deserialise(extra)) 753 extra = self.parse_extra(data_format.deserialise(extra))
723 d = defer.ensureDeferred(self.get_items( 754 d = defer.ensureDeferred(
724 client, 755 self.get_items(
725 service, 756 client,
726 node, 757 service,
727 max_items, 758 node,
728 item_ids, 759 max_items,
729 sub_id or None, 760 item_ids,
730 extra.rsm_request, 761 sub_id or None,
731 extra.extra, 762 extra.rsm_request,
732 )) 763 extra.extra,
764 )
765 )
733 d.addCallback(self.trans_items_data) 766 d.addCallback(self.trans_items_data)
734 d.addCallback(self.serialise_items) 767 d.addCallback(self.serialise_items)
735 return d 768 return d
736 769
737 async def get_items( 770 async def get_items(
741 node: str, 774 node: str,
742 max_items: Optional[int] = None, 775 max_items: Optional[int] = None,
743 item_ids: Optional[List[str]] = None, 776 item_ids: Optional[List[str]] = None,
744 sub_id: Optional[str] = None, 777 sub_id: Optional[str] = None,
745 rsm_request: Optional[rsm.RSMRequest] = None, 778 rsm_request: Optional[rsm.RSMRequest] = None,
746 extra: Optional[dict] = None 779 extra: Optional[dict] = None,
747 ) -> Tuple[List[domish.Element], dict]: 780 ) -> Tuple[List[domish.Element], dict]:
748 """Retrieve pubsub items from a node. 781 """Retrieve pubsub items from a node.
749 782
750 @param service (JID, None): pubsub service. 783 @param service (JID, None): pubsub service.
751 @param node (str): node id. 784 @param node (str): node id.
768 if rsm_request and item_ids: 801 if rsm_request and item_ids:
769 raise ValueError("items_id can't be used with rsm") 802 raise ValueError("items_id can't be used with rsm")
770 if extra is None: 803 if extra is None:
771 extra = {} 804 extra = {}
772 cont, ret = await self.host.trigger.async_return_point( 805 cont, ret = await self.host.trigger.async_return_point(
773 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, 806 "XEP-0060_getItems",
774 rsm_request, extra 807 client,
808 service,
809 node,
810 max_items,
811 item_ids,
812 sub_id,
813 rsm_request,
814 extra,
775 ) 815 )
776 if not cont: 816 if not cont:
777 return ret 817 return ret
778 try: 818 try:
779 mam_query = extra["mam"] 819 mam_query = extra["mam"]
780 except KeyError: 820 except KeyError:
781 d = defer.ensureDeferred(client.pubsub_client.items( 821 d = defer.ensureDeferred(
782 service = service, 822 client.pubsub_client.items(
783 nodeIdentifier = node, 823 service=service,
784 maxItems = max_items, 824 nodeIdentifier=node,
785 subscriptionIdentifier = sub_id, 825 maxItems=max_items,
786 sender = None, 826 subscriptionIdentifier=sub_id,
787 itemIdentifiers = item_ids, 827 sender=None,
788 orderBy = extra.get(C.KEY_ORDER_BY), 828 itemIdentifiers=item_ids,
789 rsm_request = rsm_request, 829 orderBy=extra.get(C.KEY_ORDER_BY),
790 extra = extra 830 rsm_request=rsm_request,
791 )) 831 extra=extra,
832 )
833 )
792 # we have no MAM data here, so we add None 834 # we have no MAM data here, so we add None
793 d.addErrback(sat_defer.stanza_2_not_found) 835 d.addErrback(sat_defer.stanza_2_not_found)
794 d.addTimeout(TIMEOUT, reactor) 836 d.addTimeout(TIMEOUT, reactor)
795 items, rsm_response = await d 837 items, rsm_response = await d
796 mam_response = None 838 mam_response = None
842 if mam_response is not None: 884 if mam_response is not None:
843 # mam_response is a dict with "complete" and "stable" keys 885 # mam_response is a dict with "complete" and "stable" keys
844 # we can put them directly in metadata 886 # we can put them directly in metadata
845 metadata.update(mam_response) 887 metadata.update(mam_response)
846 if rsm_request is not None and rsm_response is not None: 888 if rsm_request is not None and rsm_response is not None:
847 metadata['rsm'] = rsm_response.toDict() 889 metadata["rsm"] = rsm_response.toDict()
848 if mam_response is None: 890 if mam_response is None:
849 index = rsm_response.index 891 index = rsm_response.index
850 count = rsm_response.count 892 count = rsm_response.count
851 if index is None or count is None: 893 if index is None or count is None:
852 # we don't have enough information to know if the data is complete 894 # we don't have enough information to know if the data is complete
885 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) 927 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
886 # continue # avoid pubsub "item-not-found" error 928 # continue # avoid pubsub "item-not-found" error
887 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) 929 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
888 # defer.returnValue(d_dict) 930 # defer.returnValue(d_dict)
889 931
890 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, 932 def getOptions(
891 profile_key=C.PROF_KEY_NONE): 933 self,
934 service,
935 nodeIdentifier,
936 subscriber,
937 subscriptionIdentifier=None,
938 profile_key=C.PROF_KEY_NONE,
939 ):
892 client = self.host.get_client(profile_key) 940 client = self.host.get_client(profile_key)
893 return client.pubsub_client.getOptions( 941 return client.pubsub_client.getOptions(
894 service, nodeIdentifier, subscriber, subscriptionIdentifier 942 service, nodeIdentifier, subscriber, subscriptionIdentifier
895 ) 943 )
896 944
897 def setOptions(self, service, nodeIdentifier, subscriber, options, 945 def setOptions(
898 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 946 self,
947 service,
948 nodeIdentifier,
949 subscriber,
950 options,
951 subscriptionIdentifier=None,
952 profile_key=C.PROF_KEY_NONE,
953 ):
899 client = self.host.get_client(profile_key) 954 client = self.host.get_client(profile_key)
900 return client.pubsub_client.setOptions( 955 return client.pubsub_client.setOptions(
901 service, nodeIdentifier, subscriber, options, subscriptionIdentifier 956 service, nodeIdentifier, subscriber, options, subscriptionIdentifier
902 ) 957 )
903 958
910 def createNode( 965 def createNode(
911 self, 966 self,
912 client: SatXMPPClient, 967 client: SatXMPPClient,
913 service: jid.JID, 968 service: jid.JID,
914 nodeIdentifier: Optional[str] = None, 969 nodeIdentifier: Optional[str] = None,
915 options: Optional[Dict[str, str]] = None 970 options: Optional[Dict[str, str]] = None,
916 ) -> str: 971 ) -> str:
917 """Create a new node 972 """Create a new node
918 973
919 @param service: PubSub service, 974 @param service: PubSub service,
920 @param NodeIdentifier: node name use None to create instant node (identifier will 975 @param NodeIdentifier: node name use None to create instant node (identifier will
1036 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): 1091 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key):
1037 client = self.host.get_client(profile_key) 1092 client = self.host.get_client(profile_key)
1038 d = self.get_node_affiliations( 1093 d = self.get_node_affiliations(
1039 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1094 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1040 ) 1095 )
1041 d.addCallback( 1096 d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.items()})
1042 lambda affiliations: {j.full(): a for j, a in affiliations.items()}
1043 )
1044 return d 1097 return d
1045 1098
1046 def get_node_affiliations(self, client, service, nodeIdentifier): 1099 def get_node_affiliations(self, client, service, nodeIdentifier):
1047 """Retrieve affiliations of a node owned by profile""" 1100 """Retrieve affiliations of a node owned by profile"""
1048 request = pubsub.PubSubRequest("affiliationsGet") 1101 request = pubsub.PubSubRequest("affiliationsGet")
1120 return self.deleteNode( 1173 return self.deleteNode(
1121 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1174 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1122 ) 1175 )
1123 1176
1124 def deleteNode( 1177 def deleteNode(
1125 self, 1178 self, client: SatXMPPClient, service: jid.JID, nodeIdentifier: str
1126 client: SatXMPPClient,
1127 service: jid.JID,
1128 nodeIdentifier: str
1129 ) -> defer.Deferred: 1179 ) -> defer.Deferred:
1130 return client.pubsub_client.deleteNode(service, nodeIdentifier) 1180 return client.pubsub_client.deleteNode(service, nodeIdentifier)
1131 1181
1132 def _addWatch(self, service_s, node, profile_key): 1182 def _addWatch(self, service_s, node, profile_key):
1133 """watch modifications on a node 1183 """watch modifications on a node
1186 new_id, 1236 new_id,
1187 profile_key=C.PROF_KEY_NONE, 1237 profile_key=C.PROF_KEY_NONE,
1188 ): 1238 ):
1189 client = self.host.get_client(profile_key) 1239 client = self.host.get_client(profile_key)
1190 service = jid.JID(service) if service else None 1240 service = jid.JID(service) if service else None
1191 return defer.ensureDeferred(self.rename_item( 1241 return defer.ensureDeferred(
1192 client, service, node, item_id, new_id 1242 self.rename_item(client, service, node, item_id, new_id)
1193 )) 1243 )
1194 1244
1195 async def rename_item( 1245 async def rename_item(
1196 self, 1246 self,
1197 client: SatXMPPEntity, 1247 client: SatXMPPEntity,
1198 service: Optional[jid.JID], 1248 service: Optional[jid.JID],
1199 node: str, 1249 node: str,
1200 item_id: str, 1250 item_id: str,
1201 new_id: str 1251 new_id: str,
1202 ) -> None: 1252 ) -> None:
1203 """Rename an item by recreating it then deleting it 1253 """Rename an item by recreating it then deleting it
1204 1254
1205 we have to recreate then delete because there is currently no rename operation 1255 we have to recreate then delete because there is currently no rename operation
1206 with PubSub 1256 with PubSub
1216 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 1266 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
1217 client = self.host.get_client(profile_key) 1267 client = self.host.get_client(profile_key)
1218 service = None if not service else jid.JID(service) 1268 service = None if not service else jid.JID(service)
1219 d = defer.ensureDeferred( 1269 d = defer.ensureDeferred(
1220 self.subscribe( 1270 self.subscribe(
1221 client, 1271 client, service, nodeIdentifier, options=data_format.deserialise(options)
1222 service,
1223 nodeIdentifier,
1224 options=data_format.deserialise(options)
1225 ) 1272 )
1226 ) 1273 )
1227 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") 1274 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
1228 return d 1275 return d
1229 1276
1231 self, 1278 self,
1232 client: SatXMPPEntity, 1279 client: SatXMPPEntity,
1233 service: Optional[jid.JID], 1280 service: Optional[jid.JID],
1234 nodeIdentifier: str, 1281 nodeIdentifier: str,
1235 sub_jid: Optional[jid.JID] = None, 1282 sub_jid: Optional[jid.JID] = None,
1236 options: Optional[dict] = None 1283 options: Optional[dict] = None,
1237 ) -> pubsub.Subscription: 1284 ) -> pubsub.Subscription:
1238 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe 1285 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
1239 if service is None: 1286 if service is None:
1240 service = client.jid.userhostJID() 1287 service = client.jid.userhostJID()
1241 cont, trigger_sub = await self.host.trigger.async_return_point( 1288 cont, trigger_sub = await self.host.trigger.async_return_point(
1242 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, 1289 "XEP-0060_subscribe",
1290 client,
1291 service,
1292 nodeIdentifier,
1293 sub_jid,
1294 options,
1243 ) 1295 )
1244 if not cont: 1296 if not cont:
1245 return trigger_sub 1297 return trigger_sub
1246 try: 1298 try:
1247 subscription = await client.pubsub_client.subscribe( 1299 subscription = await client.pubsub_client.subscribe(
1248 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), 1300 service,
1249 options=options, sender=client.jid.userhostJID() 1301 nodeIdentifier,
1302 sub_jid or client.jid.userhostJID(),
1303 options=options,
1304 sender=client.jid.userhostJID(),
1250 ) 1305 )
1251 except error.StanzaError as e: 1306 except error.StanzaError as e:
1252 if e.condition == 'item-not-found': 1307 if e.condition == "item-not-found":
1253 raise exceptions.NotFound(e.text or e.condition) 1308 raise exceptions.NotFound(e.text or e.condition)
1254 else: 1309 else:
1255 raise e 1310 raise e
1256 return subscription 1311 return subscription
1257 1312
1268 sub_jid: Optional[jid.JID] = None, 1323 sub_jid: Optional[jid.JID] = None,
1269 subscriptionIdentifier: Optional[str] = None, 1324 subscriptionIdentifier: Optional[str] = None,
1270 sender: Optional[jid.JID] = None, 1325 sender: Optional[jid.JID] = None,
1271 ) -> None: 1326 ) -> None:
1272 if not await self.host.trigger.async_point( 1327 if not await self.host.trigger.async_point(
1273 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, 1328 "XEP-0060_unsubscribe",
1274 subscriptionIdentifier, sender 1329 client,
1330 service,
1331 nodeIdentifier,
1332 sub_jid,
1333 subscriptionIdentifier,
1334 sender,
1275 ): 1335 ):
1276 return 1336 return
1277 try: 1337 try:
1278 await client.pubsub_client.unsubscribe( 1338 await client.pubsub_client.unsubscribe(
1279 service, 1339 service,
1280 nodeIdentifier, 1340 nodeIdentifier,
1281 sub_jid or client.jid.userhostJID(), 1341 sub_jid or client.jid.userhostJID(),
1282 subscriptionIdentifier, 1342 subscriptionIdentifier,
1283 sender, 1343 sender,
1284 ) 1344 )
1285 except error.StanzaError as e: 1345 except error.StanzaError as e:
1286 try: 1346 try:
1287 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) 1347 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed"))
1288 except StopIteration: 1348 except StopIteration:
1289 raise e 1349 raise e
1293 f"subscribed to node {nodeIdentifier!s} at {service.full()}" 1353 f"subscribed to node {nodeIdentifier!s} at {service.full()}"
1294 ) 1354 )
1295 1355
1296 @utils.ensure_deferred 1356 @utils.ensure_deferred
1297 async def _subscriptions( 1357 async def _subscriptions(
1298 self, 1358 self, service="", nodeIdentifier="", profile_key=C.PROF_KEY_NONE
1299 service="",
1300 nodeIdentifier="",
1301 profile_key=C.PROF_KEY_NONE
1302 ) -> str: 1359 ) -> str:
1303 client = self.host.get_client(profile_key) 1360 client = self.host.get_client(profile_key)
1304 service = None if not service else jid.JID(service) 1361 service = None if not service else jid.JID(service)
1305 subs = await self.subscriptions(client, service, nodeIdentifier or None) 1362 subs = await self.subscriptions(client, service, nodeIdentifier or None)
1306 return data_format.serialise(subs) 1363 return data_format.serialise(subs)
1307 1364
1308 async def subscriptions( 1365 async def subscriptions(
1309 self, 1366 self,
1310 client: SatXMPPEntity, 1367 client: SatXMPPEntity,
1311 service: Optional[jid.JID] = None, 1368 service: Optional[jid.JID] = None,
1312 node: Optional[str] = None 1369 node: Optional[str] = None,
1313 ) -> List[Dict[str, Union[str, bool]]]: 1370 ) -> List[Dict[str, Union[str, bool]]]:
1314 """Retrieve subscriptions from a service 1371 """Retrieve subscriptions from a service
1315 1372
1316 @param service(jid.JID): PubSub service 1373 @param service(jid.JID): PubSub service
1317 @param nodeIdentifier(unicode, None): node to check 1374 @param nodeIdentifier(unicode, None): node to check
1397 1454
1398 def eb(failure_): 1455 def eb(failure_):
1399 log.warning(f"Error while parsing item: {failure_.value}") 1456 log.warning(f"Error while parsing item: {failure_.value}")
1400 1457
1401 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) 1458 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
1402 d.addCallback(lambda parsed_items: ( 1459 d.addCallback(
1403 [i for i in parsed_items if i is not None], 1460 lambda parsed_items: ([i for i in parsed_items if i is not None], metadata)
1404 metadata 1461 )
1405 ))
1406 return d 1462 return d
1407 1463
1408 def ser_d_list(self, results, failure_result=None): 1464 def ser_d_list(self, results, failure_result=None):
1409 """Serialise a DeferredList result 1465 """Serialise a DeferredList result
1410 1466
1416 - result 1472 - result
1417 """ 1473 """
1418 if failure_result is None: 1474 if failure_result is None:
1419 failure_result = () 1475 failure_result = ()
1420 return [ 1476 return [
1421 ("", result) 1477 (
1422 if success 1478 ("", result)
1423 else (str(result.result) or UNSPECIFIED, failure_result) 1479 if success
1480 else (str(result.result) or UNSPECIFIED, failure_result)
1481 )
1424 for success, result in results 1482 for success, result in results
1425 ] 1483 ]
1426 1484
1427 # subscribe # 1485 # subscribe #
1428 1486
1429 @utils.ensure_deferred 1487 @utils.ensure_deferred
1430 async def _get_node_subscriptions( 1488 async def _get_node_subscriptions(
1431 self, 1489 self, service: str, node: str, profile_key: str
1432 service: str,
1433 node: str,
1434 profile_key: str
1435 ) -> Dict[str, str]: 1490 ) -> Dict[str, str]:
1436 client = self.host.get_client(profile_key) 1491 client = self.host.get_client(profile_key)
1437 subs = await self.get_node_subscriptions( 1492 subs = await self.get_node_subscriptions(
1438 client, jid.JID(service) if service else None, node 1493 client, jid.JID(service) if service else None, node
1439 ) 1494 )
1440 return {j.full(): a for j, a in subs.items()} 1495 return {j.full(): a for j, a in subs.items()}
1441 1496
1442 async def get_node_subscriptions( 1497 async def get_node_subscriptions(
1443 self, 1498 self, client: SatXMPPEntity, service: Optional[jid.JID], nodeIdentifier: str
1444 client: SatXMPPEntity,
1445 service: Optional[jid.JID],
1446 nodeIdentifier: str
1447 ) -> Dict[jid.JID, str]: 1499 ) -> Dict[jid.JID, str]:
1448 """Retrieve subscriptions to a node 1500 """Retrieve subscriptions to a node
1449 1501
1450 @param nodeIdentifier(unicode): node to get subscriptions from 1502 @param nodeIdentifier(unicode): node to get subscriptions from
1451 """ 1503 """
1469 except AttributeError as e: 1521 except AttributeError as e:
1470 raise ValueError(_("Invalid result: {}").format(e)) 1522 raise ValueError(_("Invalid result: {}").format(e))
1471 try: 1523 try:
1472 return { 1524 return {
1473 jid.JID(s["jid"]): s["subscription"] 1525 jid.JID(s["jid"]): s["subscription"]
1474 for s in subscriptions_elt.elements( 1526 for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, "subscription"))
1475 (pubsub.NS_PUBSUB, "subscription")
1476 )
1477 } 1527 }
1478 except KeyError: 1528 except KeyError:
1479 raise ValueError( 1529 raise ValueError(
1480 _("Invalid result: bad <subscription> element: {}").format( 1530 _("Invalid result: bad <subscription> element: {}").format(iq_elt.toXml)
1481 iq_elt.toXml
1482 )
1483 ) 1531 )
1484 1532
1485 def _set_node_subscriptions( 1533 def _set_node_subscriptions(
1486 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE 1534 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
1487 ): 1535 ):
1488 client = self.host.get_client(profile_key) 1536 client = self.host.get_client(profile_key)
1489 subscriptions = { 1537 subscriptions = {
1490 jid.JID(jid_): subscription 1538 jid.JID(jid_): subscription for jid_, subscription in subscriptions.items()
1491 for jid_, subscription in subscriptions.items()
1492 } 1539 }
1493 d = self.set_node_subscriptions( 1540 d = self.set_node_subscriptions(
1494 client, 1541 client,
1495 jid.JID(service_s) if service_s else None, 1542 jid.JID(service_s) if service_s else None,
1496 nodeIdentifier, 1543 nodeIdentifier,
1570 """ 1617 """
1571 client = self.host.get_client(profile_key) 1618 client = self.host.get_client(profile_key)
1572 deferreds = {} 1619 deferreds = {}
1573 for service, node in node_data: 1620 for service, node in node_data:
1574 deferreds[(service, node)] = defer.ensureDeferred( 1621 deferreds[(service, node)] = defer.ensureDeferred(
1575 client.pubsub_client.subscribe( 1622 client.pubsub_client.subscribe(service, node, subscriber, options=options)
1576 service, node, subscriber, options=options
1577 )
1578 ) 1623 )
1579 return self.rt_sessions.new_session(deferreds, client.profile) 1624 return self.rt_sessions.new_session(deferreds, client.profile)
1580 # found_nodes = yield self.listNodes(service, profile=client.profile) 1625 # found_nodes = yield self.listNodes(service, profile=client.profile)
1581 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 1626 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
1582 # d_list = [] 1627 # d_list = []
1637 extra.rsm_request, 1682 extra.rsm_request,
1638 extra.extra, 1683 extra.extra,
1639 profile_key, 1684 profile_key,
1640 ) 1685 )
1641 1686
1642 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, 1687 def get_from_many(
1643 profile_key=C.PROF_KEY_NONE): 1688 self,
1689 node_data,
1690 max_item=None,
1691 rsm_request=None,
1692 extra=None,
1693 profile_key=C.PROF_KEY_NONE,
1694 ):
1644 """Get items from many nodes at once 1695 """Get items from many nodes at once
1645 1696
1646 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: 1697 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
1647 - service (jid.JID) is the pubsub service 1698 - service (jid.JID) is the pubsub service
1648 - node (unicode) is the node to get items from 1699 - node (unicode) is the node to get items from
1652 @return (str): RT Deferred session id 1703 @return (str): RT Deferred session id
1653 """ 1704 """
1654 client = self.host.get_client(profile_key) 1705 client = self.host.get_client(profile_key)
1655 deferreds = {} 1706 deferreds = {}
1656 for service, node in node_data: 1707 for service, node in node_data:
1657 deferreds[(service, node)] = defer.ensureDeferred(self.get_items( 1708 deferreds[(service, node)] = defer.ensureDeferred(
1658 client, service, node, max_item, rsm_request=rsm_request, extra=extra 1709 self.get_items(
1659 )) 1710 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1711 )
1712 )
1660 return self.rt_sessions.new_session(deferreds, client.profile) 1713 return self.rt_sessions.new_session(deferreds, client.profile)
1661 1714
1662 1715
1663 @implementer(disco.IDisco) 1716 @implementer(disco.IDisco)
1664 class SatPubSubClient(rsm.PubSubClient): 1717 class SatPubSubClient(rsm.PubSubClient):
1684 extra: Optional[Dict[str, Any]] = None, 1737 extra: Optional[Dict[str, Any]] = None,
1685 ): 1738 ):
1686 if extra is None: 1739 if extra is None:
1687 extra = {} 1740 extra = {}
1688 items, rsm_response = await super().items( 1741 items, rsm_response = await super().items(
1689 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, 1742 service,
1690 itemIdentifiers, orderBy, rsm_request 1743 nodeIdentifier,
1744 maxItems,
1745 subscriptionIdentifier,
1746 sender,
1747 itemIdentifiers,
1748 orderBy,
1749 rsm_request,
1691 ) 1750 )
1692 # items must be returned, thus this async point can't stop the workflow (but it 1751 # items must be returned, thus this async point can't stop the workflow (but it
1693 # can modify returned items) 1752 # can modify returned items)
1694 await self.host.trigger.async_point( 1753 await self.host.trigger.async_point(
1695 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, 1754 "XEP-0060_items",
1696 extra 1755 self.parent,
1756 service,
1757 nodeIdentifier,
1758 items,
1759 rsm_response,
1760 extra,
1697 ) 1761 )
1698 return items, rsm_response 1762 return items, rsm_response
1699 1763
1700 def _get_node_callbacks(self, node, event): 1764 def _get_node_callbacks(self, node, event):
1701 """Generate callbacks from given node and event 1765 """Generate callbacks from given node and event
1723 """ 1787 """
1724 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): 1788 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS):
1725 try: 1789 try:
1726 await utils.as_deferred(callback, client, event) 1790 await utils.as_deferred(callback, client, event)
1727 except Exception as e: 1791 except Exception as e:
1728 log.error( 1792 log.error(f"Error while running items event callback {callback}: {e}")
1729 f"Error while running items event callback {callback}: {e}"
1730 )
1731 1793
1732 def itemsReceived(self, event): 1794 def itemsReceived(self, event):
1733 log.debug("Pubsub items received") 1795 log.debug("Pubsub items received")
1734 client = self.parent 1796 client = self.parent
1735 defer.ensureDeferred(self._call_node_callbacks(client, event)) 1797 defer.ensureDeferred(self._call_node_callbacks(client, event))
1745 1807
1746 def deleteReceived(self, event): 1808 def deleteReceived(self, event):
1747 log.debug(("Publish node deleted")) 1809 log.debug(("Publish node deleted"))
1748 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): 1810 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
1749 d = utils.as_deferred(callback, self.parent, event) 1811 d = utils.as_deferred(callback, self.parent, event)
1750 d.addErrback(lambda f: log.error( 1812 d.addErrback(
1751 f"Error while running delete event callback {callback}: {f}" 1813 lambda f: log.error(
1752 )) 1814 f"Error while running delete event callback {callback}: {f}"
1815 )
1816 )
1753 client = self.parent 1817 client = self.parent
1754 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1818 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1755 self.host.bridge.ps_event_raw( 1819 self.host.bridge.ps_event_raw(
1756 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile 1820 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
1757 ) 1821 )
1758 1822
1759 def purgeReceived(self, event): 1823 def purgeReceived(self, event):
1760 log.debug(("Publish node purged")) 1824 log.debug(("Publish node purged"))
1761 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): 1825 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
1762 d = utils.as_deferred(callback, self.parent, event) 1826 d = utils.as_deferred(callback, self.parent, event)
1763 d.addErrback(lambda f: log.error( 1827 d.addErrback(
1764 f"Error while running purge event callback {callback}: {f}" 1828 lambda f: log.error(
1765 )) 1829 f"Error while running purge event callback {callback}: {f}"
1830 )
1831 )
1766 client = self.parent 1832 client = self.parent
1767 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1833 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1768 self.host.bridge.ps_event_raw( 1834 self.host.bridge.ps_event_raw(
1769 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile 1835 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
1770 ) 1836 )
1806 @param service(jid.JID, None): service to send the item to 1872 @param service(jid.JID, None): service to send the item to
1807 None to use PEP 1873 None to use PEP
1808 @param NodeIdentifier(unicode): PubSub node to use 1874 @param NodeIdentifier(unicode): PubSub node to use
1809 """ 1875 """
1810 # TODO: propose this upstream and remove it once merged 1876 # TODO: propose this upstream and remove it once merged
1811 request = pubsub.PubSubRequest('purge') 1877 request = pubsub.PubSubRequest("purge")
1812 request.recipient = service 1878 request.recipient = service
1813 request.nodeIdentifier = nodeIdentifier 1879 request.nodeIdentifier = nodeIdentifier
1814 return request.send(self.xmlstream) 1880 return request.send(self.xmlstream)
1815 1881
1816 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): 1882 def getDiscoInfo(self, requestor, service, nodeIdentifier=""):