Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 3584:edc79cefe968
plugin XEP-0060: `getItem(s)`, `publish` and `(un)subscribe` are now coroutines
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 30 Jun 2021 16:19:14 +0200 |
parents | 02eec2a5b5f9 |
children | 5f65f4e9f8cb |
comparison
equal
deleted
inserted
replaced
3583:16ade4ad63f3 | 3584:edc79cefe968 |
---|---|
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 | 19 |
20 from typing import Optional | 20 from typing import Optional, List, Tuple |
21 from collections import namedtuple | 21 from collections import namedtuple |
22 import urllib.request, urllib.parse, urllib.error | 22 import urllib.request, urllib.parse, urllib.error |
23 from functools import reduce | 23 from functools import reduce |
24 from zope.interface import implementer | 24 from zope.interface import implementer |
25 from twisted.words.xish import domish | 25 from twisted.words.xish import domish |
33 from wokkel import rsm | 33 from wokkel import rsm |
34 from wokkel import mam | 34 from wokkel import mam |
35 from sat.core.i18n import _ | 35 from sat.core.i18n import _ |
36 from sat.core.constants import Const as C | 36 from sat.core.constants import Const as C |
37 from sat.core.log import getLogger | 37 from sat.core.log import getLogger |
38 from sat.core.xmpp import SatXMPPEntity | 38 from sat.core.core_types import SatXMPPEntity |
39 from sat.core import exceptions | 39 from sat.core import exceptions |
40 from sat.tools import sat_defer | 40 from sat.tools import sat_defer |
41 from sat.tools import xml_tools | 41 from sat.tools import xml_tools |
42 from sat.tools.common import data_format | 42 from sat.tools.common import data_format |
43 | 43 |
469 profile_key=C.PROF_KEY_NONE): | 469 profile_key=C.PROF_KEY_NONE): |
470 client = self.host.getClient(profile_key) | 470 client = self.host.getClient(profile_key) |
471 service = None if not service else jid.JID(service) | 471 service = None if not service else jid.JID(service) |
472 payload = xml_tools.parse(payload) | 472 payload = xml_tools.parse(payload) |
473 extra = data_format.deserialise(extra_ser) | 473 extra = data_format.deserialise(extra_ser) |
474 d = self.sendItem( | 474 d = defer.ensureDeferred(self.sendItem( |
475 client, service, nodeIdentifier, payload, item_id or None, extra | 475 client, service, nodeIdentifier, payload, item_id or None, extra |
476 ) | 476 )) |
477 d.addCallback(lambda ret: ret or "") | 477 d.addCallback(lambda ret: ret or "") |
478 return d | 478 return d |
479 | 479 |
480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, | 480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, |
481 profile_key=C.PROF_KEY_NONE): | 481 profile_key=C.PROF_KEY_NONE): |
485 items = [xml_tools.parse(item) for item in items] | 485 items = [xml_tools.parse(item) for item in items] |
486 except Exception as e: | 486 except Exception as e: |
487 raise exceptions.DataError(_("Can't parse items: {msg}").format( | 487 raise exceptions.DataError(_("Can't parse items: {msg}").format( |
488 msg=e)) | 488 msg=e)) |
489 extra = data_format.deserialise(extra_ser) | 489 extra = data_format.deserialise(extra_ser) |
490 d = self.sendItems( | 490 return defer.ensureDeferred(self.sendItems( |
491 client, service, nodeIdentifier, items, extra | 491 client, service, nodeIdentifier, items, extra |
492 ) | 492 )) |
493 return d | 493 |
494 | 494 async def sendItem( |
495 def _getPublishedItemId(self, published_ids, original_id): | 495 self, client, service, nodeIdentifier, payload, item_id=None, extra=None |
496 """Return item of published id if found in answer | 496 ): |
497 | |
498 if not found original_id is returned, which may be None | |
499 """ | |
500 try: | |
501 return published_ids[0] | |
502 except IndexError: | |
503 return original_id | |
504 | |
505 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, | |
506 extra=None): | |
507 """High level method to send one item | 497 """High level method to send one item |
508 | 498 |
509 @param service(jid.JID, None): service to send the item to | 499 @param service(jid.JID, None): service to send the item to |
510 None to use PEP | 500 None to use PEP |
511 @param nodeIdentifier(unicode): PubSub node to use | 501 @param nodeIdentifier(unicode): PubSub node to use |
517 assert isinstance(payload, domish.Element) | 507 assert isinstance(payload, domish.Element) |
518 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) | 508 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) |
519 if item_id is not None: | 509 if item_id is not None: |
520 item_elt['id'] = item_id | 510 item_elt['id'] = item_id |
521 item_elt.addChild(payload) | 511 item_elt.addChild(payload) |
522 d = defer.ensureDeferred(self.sendItems( | 512 published_ids = await self.sendItems( |
523 client, | 513 client, |
524 service, | 514 service, |
525 nodeIdentifier, | 515 nodeIdentifier, |
526 [item_elt], | 516 [item_elt], |
527 extra | 517 extra |
528 )) | 518 ) |
529 d.addCallback(self._getPublishedItemId, item_id) | 519 try: |
530 return d | 520 return published_ids[0] |
521 except IndexError: | |
522 return item_id | |
531 | 523 |
532 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): | 524 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): |
533 """High level method to send several items at once | 525 """High level method to send several items at once |
534 | 526 |
535 @param service(jid.JID, None): service to send the item to | 527 @param service(jid.JID, None): service to send the item to |
591 return [item['id'] | 583 return [item['id'] |
592 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] | 584 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] |
593 except AttributeError: | 585 except AttributeError: |
594 return [] | 586 return [] |
595 | 587 |
596 def publish(self, client, service, nodeIdentifier, items=None, options=None): | 588 async def publish(self, client, service, nodeIdentifier, items=None, options=None): |
597 return client.pubsub_client.publish( | 589 return await client.pubsub_client.publish( |
598 service, nodeIdentifier, items, client.pubsub_client.parent.jid, | 590 service, nodeIdentifier, items, client.pubsub_client.parent.jid, |
599 options=options | 591 options=options |
600 ) | 592 ) |
601 | 593 |
602 def _unwrapMAMMessage(self, message_elt): | 594 def _unwrapMAMMessage(self, message_elt): |
628 """ | 620 """ |
629 client = self.host.getClient(profile_key) | 621 client = self.host.getClient(profile_key) |
630 service = jid.JID(service) if service else None | 622 service = jid.JID(service) if service else None |
631 max_items = None if max_items == C.NO_LIMIT else max_items | 623 max_items = None if max_items == C.NO_LIMIT else max_items |
632 extra = self.parseExtra(extra_dict) | 624 extra = self.parseExtra(extra_dict) |
633 d = self.getItems( | 625 d = defer.ensureDeferred(self.getItems( |
634 client, | 626 client, |
635 service, | 627 service, |
636 node or None, | 628 node or None, |
637 max_items or None, | 629 max_items or None, |
638 item_ids, | 630 item_ids, |
639 sub_id or None, | 631 sub_id or None, |
640 extra.rsm_request, | 632 extra.rsm_request, |
641 extra.extra, | 633 extra.extra, |
642 ) | 634 )) |
643 d.addCallback(self.transItemsData) | 635 d.addCallback(self.transItemsData) |
644 d.addCallback(self.serialiseItems) | 636 d.addCallback(self.serialiseItems) |
645 return d | 637 return d |
646 | 638 |
647 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, | 639 async def getItems( |
648 rsm_request=None, extra=None): | 640 self, |
641 client: SatXMPPEntity, | |
642 service: Optional[jid.JID], | |
643 node: str, | |
644 max_items: Optional[int] = None, | |
645 item_ids: Optional[List[str]] = None, | |
646 sub_id: Optional[str] = None, | |
647 rsm_request: Optional[rsm.RSMRequest] = None, | |
648 extra: Optional[dict] = None | |
649 ) -> Tuple[List[dict], dict]: | |
649 """Retrieve pubsub items from a node. | 650 """Retrieve pubsub items from a node. |
650 | 651 |
651 @param service (JID, None): pubsub service. | 652 @param service (JID, None): pubsub service. |
652 @param node (str): node id. | 653 @param node (str): node id. |
653 @param max_items (int): optional limit on the number of retrieved items. | 654 @param max_items (int): optional limit on the number of retrieved items. |
680 itemIdentifiers = item_ids, | 681 itemIdentifiers = item_ids, |
681 orderBy = extra.get(C.KEY_ORDER_BY), | 682 orderBy = extra.get(C.KEY_ORDER_BY), |
682 rsm_request = rsm_request | 683 rsm_request = rsm_request |
683 ) | 684 ) |
684 # we have no MAM data here, so we add None | 685 # we have no MAM data here, so we add None |
685 d.addCallback(lambda data: data + (None,)) | |
686 d.addErrback(sat_defer.stanza2NotFound) | 686 d.addErrback(sat_defer.stanza2NotFound) |
687 d.addTimeout(TIMEOUT, reactor) | 687 d.addTimeout(TIMEOUT, reactor) |
688 items, rsm_response = await d | |
689 mam_response = None | |
688 else: | 690 else: |
689 # if mam is requested, we have to do a totally different query | 691 # if mam is requested, we have to do a totally different query |
690 if self._mam is None: | 692 if self._mam is None: |
691 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") | 693 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") |
692 if max_items is not None: | 694 if max_items is not None: |
704 else: | 706 else: |
705 if mam_query.rsm != rsm_request: | 707 if mam_query.rsm != rsm_request: |
706 raise exceptions.DataError( | 708 raise exceptions.DataError( |
707 "Conflict between RSM request and MAM's RSM request" | 709 "Conflict between RSM request and MAM's RSM request" |
708 ) | 710 ) |
709 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) | 711 items, rsm_response, mam_response = await self._mam.getArchives( |
712 client, mam_query, service, self._unwrapMAMMessage | |
713 ) | |
710 | 714 |
711 try: | 715 try: |
712 subscribe = C.bool(extra["subscribe"]) | 716 subscribe = C.bool(extra["subscribe"]) |
713 except KeyError: | 717 except KeyError: |
714 subscribe = False | 718 subscribe = False |
715 | 719 |
716 def subscribeEb(failure, service, node): | 720 if subscribe: |
717 failure.trap(error.StanzaError) | 721 try: |
718 log.warning( | 722 await self.subscribe(client, service, node) |
719 "Could not subscribe to node {} on service {}: {}".format( | 723 except error.StanzaError as e: |
720 node, str(service), str(failure.value) | 724 log.warning( |
725 f"Could not subscribe to node {node} on service {service}: {e}" | |
721 ) | 726 ) |
722 ) | 727 |
723 | 728 # TODO: handle mam_response |
724 def doSubscribe(data): | 729 service_jid = service if service else client.jid.userhostJID() |
725 self.subscribe(client, service, node).addErrback( | 730 metadata = { |
726 subscribeEb, service, node | 731 "service": service_jid, |
727 ) | 732 "node": node, |
728 return data | 733 "uri": self.getNodeURI(service_jid, node), |
729 | 734 } |
730 if subscribe: | 735 if mam_response is not None: |
731 d.addCallback(doSubscribe) | 736 # mam_response is a dict with "complete" and "stable" keys |
732 | 737 # we can put them directly in metadata |
733 def addMetadata(result): | 738 metadata.update(mam_response) |
734 # TODO: handle the third argument (mam_response) | 739 if rsm_request is not None and rsm_response is not None: |
735 items, rsm_response, mam_response = result | 740 metadata['rsm'] = rsm_response.toDict() |
736 service_jid = service if service else client.jid.userhostJID() | 741 if mam_response is None: |
737 metadata = { | 742 index = rsm_response.index |
738 "service": service_jid, | 743 count = rsm_response.count |
739 "node": node, | 744 if index is None or count is None: |
740 "uri": self.getNodeURI(service_jid, node), | 745 # we don't have enough information to know if the data is complete |
741 } | 746 # or not |
742 if mam_response is not None: | 747 metadata["complete"] = None |
743 # mam_response is a dict with "complete" and "stable" keys | 748 else: |
744 # we can put them directly in metadata | 749 # normally we have a strict equality here but XEP-0059 states |
745 metadata.update(mam_response) | 750 # that index MAY be approximative, so just in case… |
746 if rsm_request is not None and rsm_response is not None: | 751 metadata["complete"] = index + len(items) >= count |
747 metadata['rsm'] = rsm_response.toDict() | 752 |
748 if mam_response is None: | 753 return (items, metadata) |
749 index = rsm_response.index | |
750 count = rsm_response.count | |
751 if index is None or count is None: | |
752 # we don't have enough information to know if the data is complete | |
753 # or not | |
754 metadata["complete"] = None | |
755 else: | |
756 # normally we have a strict equality here but XEP-0059 states | |
757 # that index MAY be approximative, so just in case… | |
758 metadata["complete"] = index + len(items) >= count | |
759 | |
760 return (items, metadata) | |
761 | |
762 d.addCallback(addMetadata) | |
763 return d | |
764 | 754 |
765 # @defer.inlineCallbacks | 755 # @defer.inlineCallbacks |
766 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | 756 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): |
767 # """Massively retrieve pubsub items from many nodes. | 757 # """Massively retrieve pubsub items from many nodes. |
768 # @param service (JID): target service. | 758 # @param service (JID): target service. |
1098 await self.retractItems(client, service, node, [item_id]) | 1088 await self.retractItems(client, service, node, [item_id]) |
1099 | 1089 |
1100 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 1090 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
1101 client = self.host.getClient(profile_key) | 1091 client = self.host.getClient(profile_key) |
1102 service = None if not service else jid.JID(service) | 1092 service = None if not service else jid.JID(service) |
1103 d = self.subscribe(client, service, nodeIdentifier, options=options or None) | 1093 d = defer.ensureDeferred( |
1094 self.subscribe(client, service, nodeIdentifier, options=options or None) | |
1095 ) | |
1104 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") | 1096 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") |
1105 return d | 1097 return d |
1106 | 1098 |
1107 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): | 1099 async def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): |
1108 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe | 1100 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe |
1109 return client.pubsub_client.subscribe( | 1101 return await client.pubsub_client.subscribe( |
1110 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options | 1102 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options |
1111 ) | 1103 ) |
1112 | 1104 |
1113 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | 1105 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): |
1114 client = self.host.getClient(profile_key) | 1106 client = self.host.getClient(profile_key) |
1115 service = None if not service else jid.JID(service) | 1107 service = None if not service else jid.JID(service) |
1116 return self.unsubscribe(client, service, nodeIdentifier) | 1108 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) |
1117 | 1109 |
1118 def unsubscribe( | 1110 async def unsubscribe( |
1119 self, | 1111 self, |
1120 client, | 1112 client, |
1121 service, | 1113 service, |
1122 nodeIdentifier, | 1114 nodeIdentifier, |
1123 sub_jid=None, | 1115 sub_jid=None, |
1124 subscriptionIdentifier=None, | 1116 subscriptionIdentifier=None, |
1125 sender=None, | 1117 sender=None, |
1126 ): | 1118 ): |
1127 return client.pubsub_client.unsubscribe( | 1119 return await client.pubsub_client.unsubscribe( |
1128 service, | 1120 service, |
1129 nodeIdentifier, | 1121 nodeIdentifier, |
1130 sub_jid or client.jid.userhostJID(), | 1122 sub_jid or client.jid.userhostJID(), |
1131 subscriptionIdentifier, | 1123 subscriptionIdentifier, |
1132 sender, | 1124 sender, |
1392 @return (str): RT Deferred session id | 1384 @return (str): RT Deferred session id |
1393 """ | 1385 """ |
1394 client = self.host.getClient(profile_key) | 1386 client = self.host.getClient(profile_key) |
1395 deferreds = {} | 1387 deferreds = {} |
1396 for service, node in node_data: | 1388 for service, node in node_data: |
1397 deferreds[(service, node)] = client.pubsub_client.subscribe( | 1389 deferreds[(service, node)] = defer.ensureDeferred( |
1398 service, node, subscriber, options=options | 1390 client.pubsub_client.subscribe( |
1391 service, node, subscriber, options=options | |
1392 ) | |
1399 ) | 1393 ) |
1400 return self.rt_sessions.newSession(deferreds, client.profile) | 1394 return self.rt_sessions.newSession(deferreds, client.profile) |
1401 # found_nodes = yield self.listNodes(service, profile=client.profile) | 1395 # found_nodes = yield self.listNodes(service, profile=client.profile) |
1402 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 1396 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
1403 # d_list = [] | 1397 # d_list = [] |
1473 @return (str): RT Deferred session id | 1467 @return (str): RT Deferred session id |
1474 """ | 1468 """ |
1475 client = self.host.getClient(profile_key) | 1469 client = self.host.getClient(profile_key) |
1476 deferreds = {} | 1470 deferreds = {} |
1477 for service, node in node_data: | 1471 for service, node in node_data: |
1478 deferreds[(service, node)] = self.getItems( | 1472 deferreds[(service, node)] = defer.ensureDeferred(self.getItems( |
1479 client, service, node, max_item, rsm_request=rsm_request, extra=extra | 1473 client, service, node, max_item, rsm_request=rsm_request, extra=extra |
1480 ) | 1474 )) |
1481 return self.rt_sessions.newSession(deferreds, client.profile) | 1475 return self.rt_sessions.newSession(deferreds, client.profile) |
1482 | 1476 |
1483 | 1477 |
1484 @implementer(disco.IDisco) | 1478 @implementer(disco.IDisco) |
1485 class SatPubSubClient(rsm.PubSubClient): | 1479 class SatPubSubClient(rsm.PubSubClient): |