comparison sat/plugins/plugin_xep_0060.py @ 3584:edc79cefe968

plugin XEP-0060: `getItem(s)`, `publish` and `(un)subscribe` are now coroutines
author Goffi <goffi@goffi.org>
date Wed, 30 Jun 2021 16:19:14 +0200
parents 02eec2a5b5f9
children 5f65f4e9f8cb
comparison
equal deleted inserted replaced
3583:16ade4ad63f3 3584:edc79cefe968
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 19
20 from typing import Optional 20 from typing import Optional, List, Tuple
21 from collections import namedtuple 21 from collections import namedtuple
22 import urllib.request, urllib.parse, urllib.error 22 import urllib.request, urllib.parse, urllib.error
23 from functools import reduce 23 from functools import reduce
24 from zope.interface import implementer 24 from zope.interface import implementer
25 from twisted.words.xish import domish 25 from twisted.words.xish import domish
33 from wokkel import rsm 33 from wokkel import rsm
34 from wokkel import mam 34 from wokkel import mam
35 from sat.core.i18n import _ 35 from sat.core.i18n import _
36 from sat.core.constants import Const as C 36 from sat.core.constants import Const as C
37 from sat.core.log import getLogger 37 from sat.core.log import getLogger
38 from sat.core.xmpp import SatXMPPEntity 38 from sat.core.core_types import SatXMPPEntity
39 from sat.core import exceptions 39 from sat.core import exceptions
40 from sat.tools import sat_defer 40 from sat.tools import sat_defer
41 from sat.tools import xml_tools 41 from sat.tools import xml_tools
42 from sat.tools.common import data_format 42 from sat.tools.common import data_format
43 43
469 profile_key=C.PROF_KEY_NONE): 469 profile_key=C.PROF_KEY_NONE):
470 client = self.host.getClient(profile_key) 470 client = self.host.getClient(profile_key)
471 service = None if not service else jid.JID(service) 471 service = None if not service else jid.JID(service)
472 payload = xml_tools.parse(payload) 472 payload = xml_tools.parse(payload)
473 extra = data_format.deserialise(extra_ser) 473 extra = data_format.deserialise(extra_ser)
474 d = self.sendItem( 474 d = defer.ensureDeferred(self.sendItem(
475 client, service, nodeIdentifier, payload, item_id or None, extra 475 client, service, nodeIdentifier, payload, item_id or None, extra
476 ) 476 ))
477 d.addCallback(lambda ret: ret or "") 477 d.addCallback(lambda ret: ret or "")
478 return d 478 return d
479 479
480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, 480 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None,
481 profile_key=C.PROF_KEY_NONE): 481 profile_key=C.PROF_KEY_NONE):
485 items = [xml_tools.parse(item) for item in items] 485 items = [xml_tools.parse(item) for item in items]
486 except Exception as e: 486 except Exception as e:
487 raise exceptions.DataError(_("Can't parse items: {msg}").format( 487 raise exceptions.DataError(_("Can't parse items: {msg}").format(
488 msg=e)) 488 msg=e))
489 extra = data_format.deserialise(extra_ser) 489 extra = data_format.deserialise(extra_ser)
490 d = self.sendItems( 490 return defer.ensureDeferred(self.sendItems(
491 client, service, nodeIdentifier, items, extra 491 client, service, nodeIdentifier, items, extra
492 ) 492 ))
493 return d 493
494 494 async def sendItem(
495 def _getPublishedItemId(self, published_ids, original_id): 495 self, client, service, nodeIdentifier, payload, item_id=None, extra=None
496 """Return item of published id if found in answer 496 ):
497
498 if not found original_id is returned, which may be None
499 """
500 try:
501 return published_ids[0]
502 except IndexError:
503 return original_id
504
505 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
506 extra=None):
507 """High level method to send one item 497 """High level method to send one item
508 498
509 @param service(jid.JID, None): service to send the item to 499 @param service(jid.JID, None): service to send the item to
510 None to use PEP 500 None to use PEP
511 @param nodeIdentifier(unicode): PubSub node to use 501 @param nodeIdentifier(unicode): PubSub node to use
517 assert isinstance(payload, domish.Element) 507 assert isinstance(payload, domish.Element)
518 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) 508 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
519 if item_id is not None: 509 if item_id is not None:
520 item_elt['id'] = item_id 510 item_elt['id'] = item_id
521 item_elt.addChild(payload) 511 item_elt.addChild(payload)
522 d = defer.ensureDeferred(self.sendItems( 512 published_ids = await self.sendItems(
523 client, 513 client,
524 service, 514 service,
525 nodeIdentifier, 515 nodeIdentifier,
526 [item_elt], 516 [item_elt],
527 extra 517 extra
528 )) 518 )
529 d.addCallback(self._getPublishedItemId, item_id) 519 try:
530 return d 520 return published_ids[0]
521 except IndexError:
522 return item_id
531 523
532 async def sendItems(self, client, service, nodeIdentifier, items, extra=None): 524 async def sendItems(self, client, service, nodeIdentifier, items, extra=None):
533 """High level method to send several items at once 525 """High level method to send several items at once
534 526
535 @param service(jid.JID, None): service to send the item to 527 @param service(jid.JID, None): service to send the item to
591 return [item['id'] 583 return [item['id']
592 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] 584 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
593 except AttributeError: 585 except AttributeError:
594 return [] 586 return []
595 587
596 def publish(self, client, service, nodeIdentifier, items=None, options=None): 588 async def publish(self, client, service, nodeIdentifier, items=None, options=None):
597 return client.pubsub_client.publish( 589 return await client.pubsub_client.publish(
598 service, nodeIdentifier, items, client.pubsub_client.parent.jid, 590 service, nodeIdentifier, items, client.pubsub_client.parent.jid,
599 options=options 591 options=options
600 ) 592 )
601 593
602 def _unwrapMAMMessage(self, message_elt): 594 def _unwrapMAMMessage(self, message_elt):
628 """ 620 """
629 client = self.host.getClient(profile_key) 621 client = self.host.getClient(profile_key)
630 service = jid.JID(service) if service else None 622 service = jid.JID(service) if service else None
631 max_items = None if max_items == C.NO_LIMIT else max_items 623 max_items = None if max_items == C.NO_LIMIT else max_items
632 extra = self.parseExtra(extra_dict) 624 extra = self.parseExtra(extra_dict)
633 d = self.getItems( 625 d = defer.ensureDeferred(self.getItems(
634 client, 626 client,
635 service, 627 service,
636 node or None, 628 node or None,
637 max_items or None, 629 max_items or None,
638 item_ids, 630 item_ids,
639 sub_id or None, 631 sub_id or None,
640 extra.rsm_request, 632 extra.rsm_request,
641 extra.extra, 633 extra.extra,
642 ) 634 ))
643 d.addCallback(self.transItemsData) 635 d.addCallback(self.transItemsData)
644 d.addCallback(self.serialiseItems) 636 d.addCallback(self.serialiseItems)
645 return d 637 return d
646 638
647 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, 639 async def getItems(
648 rsm_request=None, extra=None): 640 self,
641 client: SatXMPPEntity,
642 service: Optional[jid.JID],
643 node: str,
644 max_items: Optional[int] = None,
645 item_ids: Optional[List[str]] = None,
646 sub_id: Optional[str] = None,
647 rsm_request: Optional[rsm.RSMRequest] = None,
648 extra: Optional[dict] = None
649 ) -> Tuple[List[dict], dict]:
649 """Retrieve pubsub items from a node. 650 """Retrieve pubsub items from a node.
650 651
651 @param service (JID, None): pubsub service. 652 @param service (JID, None): pubsub service.
652 @param node (str): node id. 653 @param node (str): node id.
653 @param max_items (int): optional limit on the number of retrieved items. 654 @param max_items (int): optional limit on the number of retrieved items.
680 itemIdentifiers = item_ids, 681 itemIdentifiers = item_ids,
681 orderBy = extra.get(C.KEY_ORDER_BY), 682 orderBy = extra.get(C.KEY_ORDER_BY),
682 rsm_request = rsm_request 683 rsm_request = rsm_request
683 ) 684 )
684 # we have no MAM data here, so we add None 685 # we have no MAM data here, so we add None
685 d.addCallback(lambda data: data + (None,))
686 d.addErrback(sat_defer.stanza2NotFound) 686 d.addErrback(sat_defer.stanza2NotFound)
687 d.addTimeout(TIMEOUT, reactor) 687 d.addTimeout(TIMEOUT, reactor)
688 items, rsm_response = await d
689 mam_response = None
688 else: 690 else:
689 # if mam is requested, we have to do a totally different query 691 # if mam is requested, we have to do a totally different query
690 if self._mam is None: 692 if self._mam is None:
691 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") 693 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
692 if max_items is not None: 694 if max_items is not None:
704 else: 706 else:
705 if mam_query.rsm != rsm_request: 707 if mam_query.rsm != rsm_request:
706 raise exceptions.DataError( 708 raise exceptions.DataError(
707 "Conflict between RSM request and MAM's RSM request" 709 "Conflict between RSM request and MAM's RSM request"
708 ) 710 )
709 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) 711 items, rsm_response, mam_response = await self._mam.getArchives(
712 client, mam_query, service, self._unwrapMAMMessage
713 )
710 714
711 try: 715 try:
712 subscribe = C.bool(extra["subscribe"]) 716 subscribe = C.bool(extra["subscribe"])
713 except KeyError: 717 except KeyError:
714 subscribe = False 718 subscribe = False
715 719
716 def subscribeEb(failure, service, node): 720 if subscribe:
717 failure.trap(error.StanzaError) 721 try:
718 log.warning( 722 await self.subscribe(client, service, node)
719 "Could not subscribe to node {} on service {}: {}".format( 723 except error.StanzaError as e:
720 node, str(service), str(failure.value) 724 log.warning(
725 f"Could not subscribe to node {node} on service {service}: {e}"
721 ) 726 )
722 ) 727
723 728 # TODO: handle mam_response
724 def doSubscribe(data): 729 service_jid = service if service else client.jid.userhostJID()
725 self.subscribe(client, service, node).addErrback( 730 metadata = {
726 subscribeEb, service, node 731 "service": service_jid,
727 ) 732 "node": node,
728 return data 733 "uri": self.getNodeURI(service_jid, node),
729 734 }
730 if subscribe: 735 if mam_response is not None:
731 d.addCallback(doSubscribe) 736 # mam_response is a dict with "complete" and "stable" keys
732 737 # we can put them directly in metadata
733 def addMetadata(result): 738 metadata.update(mam_response)
734 # TODO: handle the third argument (mam_response) 739 if rsm_request is not None and rsm_response is not None:
735 items, rsm_response, mam_response = result 740 metadata['rsm'] = rsm_response.toDict()
736 service_jid = service if service else client.jid.userhostJID() 741 if mam_response is None:
737 metadata = { 742 index = rsm_response.index
738 "service": service_jid, 743 count = rsm_response.count
739 "node": node, 744 if index is None or count is None:
740 "uri": self.getNodeURI(service_jid, node), 745 # we don't have enough information to know if the data is complete
741 } 746 # or not
742 if mam_response is not None: 747 metadata["complete"] = None
743 # mam_response is a dict with "complete" and "stable" keys 748 else:
744 # we can put them directly in metadata 749 # normally we have a strict equality here but XEP-0059 states
745 metadata.update(mam_response) 750 # that index MAY be approximative, so just in case…
746 if rsm_request is not None and rsm_response is not None: 751 metadata["complete"] = index + len(items) >= count
747 metadata['rsm'] = rsm_response.toDict() 752
748 if mam_response is None: 753 return (items, metadata)
749 index = rsm_response.index
750 count = rsm_response.count
751 if index is None or count is None:
752 # we don't have enough information to know if the data is complete
753 # or not
754 metadata["complete"] = None
755 else:
756 # normally we have a strict equality here but XEP-0059 states
757 # that index MAY be approximative, so just in case…
758 metadata["complete"] = index + len(items) >= count
759
760 return (items, metadata)
761
762 d.addCallback(addMetadata)
763 return d
764 754
765 # @defer.inlineCallbacks 755 # @defer.inlineCallbacks
766 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): 756 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
767 # """Massively retrieve pubsub items from many nodes. 757 # """Massively retrieve pubsub items from many nodes.
768 # @param service (JID): target service. 758 # @param service (JID): target service.
1098 await self.retractItems(client, service, node, [item_id]) 1088 await self.retractItems(client, service, node, [item_id])
1099 1089
1100 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 1090 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
1101 client = self.host.getClient(profile_key) 1091 client = self.host.getClient(profile_key)
1102 service = None if not service else jid.JID(service) 1092 service = None if not service else jid.JID(service)
1103 d = self.subscribe(client, service, nodeIdentifier, options=options or None) 1093 d = defer.ensureDeferred(
1094 self.subscribe(client, service, nodeIdentifier, options=options or None)
1095 )
1104 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") 1096 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
1105 return d 1097 return d
1106 1098
1107 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): 1099 async def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
1108 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe 1100 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe
1109 return client.pubsub_client.subscribe( 1101 return await client.pubsub_client.subscribe(
1110 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options 1102 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options
1111 ) 1103 )
1112 1104
1113 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 1105 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
1114 client = self.host.getClient(profile_key) 1106 client = self.host.getClient(profile_key)
1115 service = None if not service else jid.JID(service) 1107 service = None if not service else jid.JID(service)
1116 return self.unsubscribe(client, service, nodeIdentifier) 1108 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
1117 1109
1118 def unsubscribe( 1110 async def unsubscribe(
1119 self, 1111 self,
1120 client, 1112 client,
1121 service, 1113 service,
1122 nodeIdentifier, 1114 nodeIdentifier,
1123 sub_jid=None, 1115 sub_jid=None,
1124 subscriptionIdentifier=None, 1116 subscriptionIdentifier=None,
1125 sender=None, 1117 sender=None,
1126 ): 1118 ):
1127 return client.pubsub_client.unsubscribe( 1119 return await client.pubsub_client.unsubscribe(
1128 service, 1120 service,
1129 nodeIdentifier, 1121 nodeIdentifier,
1130 sub_jid or client.jid.userhostJID(), 1122 sub_jid or client.jid.userhostJID(),
1131 subscriptionIdentifier, 1123 subscriptionIdentifier,
1132 sender, 1124 sender,
1392 @return (str): RT Deferred session id 1384 @return (str): RT Deferred session id
1393 """ 1385 """
1394 client = self.host.getClient(profile_key) 1386 client = self.host.getClient(profile_key)
1395 deferreds = {} 1387 deferreds = {}
1396 for service, node in node_data: 1388 for service, node in node_data:
1397 deferreds[(service, node)] = client.pubsub_client.subscribe( 1389 deferreds[(service, node)] = defer.ensureDeferred(
1398 service, node, subscriber, options=options 1390 client.pubsub_client.subscribe(
1391 service, node, subscriber, options=options
1392 )
1399 ) 1393 )
1400 return self.rt_sessions.newSession(deferreds, client.profile) 1394 return self.rt_sessions.newSession(deferreds, client.profile)
1401 # found_nodes = yield self.listNodes(service, profile=client.profile) 1395 # found_nodes = yield self.listNodes(service, profile=client.profile)
1402 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 1396 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
1403 # d_list = [] 1397 # d_list = []
1473 @return (str): RT Deferred session id 1467 @return (str): RT Deferred session id
1474 """ 1468 """
1475 client = self.host.getClient(profile_key) 1469 client = self.host.getClient(profile_key)
1476 deferreds = {} 1470 deferreds = {}
1477 for service, node in node_data: 1471 for service, node in node_data:
1478 deferreds[(service, node)] = self.getItems( 1472 deferreds[(service, node)] = defer.ensureDeferred(self.getItems(
1479 client, service, node, max_item, rsm_request=rsm_request, extra=extra 1473 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1480 ) 1474 ))
1481 return self.rt_sessions.newSession(deferreds, client.profile) 1475 return self.rt_sessions.newSession(deferreds, client.profile)
1482 1476
1483 1477
1484 @implementer(disco.IDisco) 1478 @implementer(disco.IDisco)
1485 class SatPubSubClient(rsm.PubSubClient): 1479 class SatPubSubClient(rsm.PubSubClient):