Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 419:794593086517
backend: publish-options implementation:
- removed some old code
- new ConstraintFailed exception
- publishing options implementation, following XEP-0060 §7.1.5
- first use of async/await syntax, used to simplify "publish" method
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Dec 2019 19:56:47 +0100 |
parents | 4179ed660a85 |
children | c21f31355ab9 |
comparison
equal
deleted
inserted
replaced
418:89736353f6be | 419:794593086517 |
---|---|
1 #!/usr/bin/env python3 | 1 #!/usr/bin/env python3 |
2 #-*- coding: utf-8 -*- | |
3 # | 2 # |
4 # Copyright (c) 2012-2019 Jérôme Poisson | 3 # Copyright (c) 2012-2019 Jérôme Poisson |
5 # Copyright (c) 2013-2016 Adrien Cossa | 4 # Copyright (c) 2013-2016 Adrien Cossa |
6 # Copyright (c) 2003-2011 Ralph Meijer | 5 # Copyright (c) 2003-2011 Ralph Meijer |
7 | 6 |
68 | 67 |
69 from twisted.application import service | 68 from twisted.application import service |
70 from twisted.python import components, log | 69 from twisted.python import components, log |
71 from twisted.internet import defer, reactor | 70 from twisted.internet import defer, reactor |
72 from twisted.words.protocols.jabber.error import StanzaError | 71 from twisted.words.protocols.jabber.error import StanzaError |
73 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat | |
74 from twisted.words.xish import domish, utility | 72 from twisted.words.xish import domish, utility |
75 | 73 |
76 from wokkel import disco | 74 from wokkel import disco |
77 from wokkel import data_form | 75 from wokkel import data_form |
78 from wokkel import rsm | 76 from wokkel import rsm |
207 """Return True if an entity is an administrator""" | 205 """Return True if an entity is an administrator""" |
208 return entity_jid.userhostJID() in self.admins | 206 return entity_jid.userhostJID() in self.admins |
209 | 207 |
210 def supportsPublishOptions(self): | 208 def supportsPublishOptions(self): |
211 return True | 209 return True |
210 | |
212 def supportsPublisherAffiliation(self): | 211 def supportsPublisherAffiliation(self): |
213 return True | 212 return True |
214 | 213 |
215 def supportsGroupBlog(self): | 214 def supportsGroupBlog(self): |
216 return True | 215 return True |
282 def checkSubscription(subscribed): | 281 def checkSubscription(subscribed): |
283 if not subscribed: | 282 if not subscribed: |
284 raise error.Forbidden() | 283 raise error.Forbidden() |
285 return (affiliation, node) | 284 return (affiliation, node) |
286 | 285 |
287 d.addCallback(lambda ignore: node.isSubscribed(requestor)) | 286 d.addCallback(lambda __: node.isSubscribed(requestor)) |
288 d.addCallback(checkSubscription) | 287 d.addCallback(checkSubscription) |
289 elif publish_model != const.VAL_PMODEL_OPEN: | 288 elif publish_model != const.VAL_PMODEL_OPEN: |
290 raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open. | 289 # publish_model must be publishers (default), subscribers or open. |
290 raise ValueError('Unexpected value') | |
291 | 291 |
292 return d | 292 return d |
293 | 293 |
294 d = node.getAffiliation(requestor) | 294 d = node.getAffiliation(requestor) |
295 d.addCallback(check) | 295 d.addCallback(check) |
393 | 393 |
394 d = node.getItemsPublishers(itemIdentifiers) | 394 d = node.getItemsPublishers(itemIdentifiers) |
395 d.addCallback(doCheck) | 395 d.addCallback(doCheck) |
396 return d | 396 return d |
397 | 397 |
398 def publish(self, nodeIdentifier, items, requestor, pep, recipient): | 398 async def publish(self, nodeIdentifier, items, requestor, options, pep, recipient): |
399 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 399 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
400 d.addCallback(self._checkAuth, requestor) | 400 affiliation, node = await self._checkAuth(node, requestor) |
401 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. | 401 |
402 #FIXME: in addition, there can be several owners: that is not managed yet | |
403 d.addCallback(self._doPublish, items, requestor, pep, recipient) | |
404 return d | |
405 | |
406 @defer.inlineCallbacks | |
407 def _doPublish(self, result, items, requestor, pep, recipient): | |
408 affiliation, node = result | |
409 if node.nodeType == 'collection': | 402 if node.nodeType == 'collection': |
410 raise error.NoPublishing() | 403 raise error.NoPublishing() |
411 | 404 |
412 configuration = node.getConfiguration() | 405 configuration = node.getConfiguration() |
413 persistItems = configuration[const.OPT_PERSIST_ITEMS] | 406 persistItems = configuration[const.OPT_PERSIST_ITEMS] |
414 deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS] | 407 deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS] |
408 | |
409 # we check that publish-options are satisfied | |
410 for field, value in options.items(): | |
411 try: | |
412 current_value = configuration[field] | |
413 except KeyError: | |
414 raise error.ConstraintFailed( | |
415 f"publish-options {field!r} doesn't exist in node configuration" | |
416 ) | |
417 if current_value != value: | |
418 raise error.ConstraintFailed( | |
419 f"configuration field {field!r} has a value of {current_value!r} " | |
420 f"which doesn't fit publish-options expected {value!r}" | |
421 ) | |
415 | 422 |
416 if items and not persistItems and not deliverPayloads: | 423 if items and not persistItems and not deliverPayloads: |
417 raise error.ItemForbidden() | 424 raise error.ItemForbidden() |
418 elif not items and (persistItems or deliverPayloads): | 425 elif not items and (persistItems or deliverPayloads): |
419 raise error.ItemRequired() | 426 raise error.ItemRequired() |
426 item['publisher'] = requestor.full() | 433 item['publisher'] = requestor.full() |
427 if persistItems or deliverPayloads: | 434 if persistItems or deliverPayloads: |
428 item.uri = None | 435 item.uri = None |
429 item.defaultUri = None | 436 item.defaultUri = None |
430 if not item.getAttribute("id"): | 437 if not item.getAttribute("id"): |
431 item["id"] = yield node.getNextId() | 438 item["id"] = await node.getNextId() |
432 new_item = True | 439 new_item = True |
433 if ret_payload is None: | 440 if ret_payload is None: |
434 ret_pubsub_elt = domish.Element((pubsub.NS_PUBSUB, 'pubsub')) | 441 ret_pubsub_elt = domish.Element((pubsub.NS_PUBSUB, 'pubsub')) |
435 ret_publish_elt = ret_pubsub_elt.addElement('publish') | 442 ret_publish_elt = ret_pubsub_elt.addElement('publish') |
436 ret_publish_elt['node'] = node.nodeIdentifier | 443 ret_publish_elt['node'] = node.nodeIdentifier |
454 itemIdentifiers = [item['id'] for item in items | 461 itemIdentifiers = [item['id'] for item in items |
455 if item.getAttribute('id')] | 462 if item.getAttribute('id')] |
456 | 463 |
457 if affiliation == 'owner' or self.isAdmin(requestor): | 464 if affiliation == 'owner' or self.isAdmin(requestor): |
458 if configuration[const.OPT_CONSISTENT_PUBLISHER]: | 465 if configuration[const.OPT_CONSISTENT_PUBLISHER]: |
459 pub_map = yield node.getItemsPublishers(itemIdentifiers) | 466 pub_map = await node.getItemsPublishers(itemIdentifiers) |
460 if pub_map: | 467 if pub_map: |
461 # if we have existing items, we replace publishers with | 468 # if we have existing items, we replace publishers with |
462 # original one to stay consistent | 469 # original one to stay consistent |
463 publishers = set(pub_map.values()) | 470 publishers = set(pub_map.values()) |
464 if len(publishers) != 1: | 471 if len(publishers) != 1: |
474 for item in items: | 481 for item in items: |
475 item['publisher'] = requestor.full() | 482 item['publisher'] = requestor.full() |
476 else: | 483 else: |
477 # we don't want a publisher to overwrite the item | 484 # we don't want a publisher to overwrite the item |
478 # of an other publisher | 485 # of an other publisher |
479 yield self._checkOverwrite(node, itemIdentifiers, requestor) | 486 await self._checkOverwrite(node, itemIdentifiers, requestor) |
480 | 487 |
481 # TODO: check conflict and recalculate max id if serial_ids is set | 488 # TODO: check conflict and recalculate max id if serial_ids is set |
482 yield node.storeItems(items_data, requestor) | 489 await node.storeItems(items_data, requestor) |
483 | 490 |
484 yield self._doNotify(node, items_data, deliverPayloads, pep, recipient) | 491 self._doNotify(node, items_data, deliverPayloads, pep, recipient) |
485 defer.returnValue(ret_payload) | 492 return ret_payload |
486 | 493 |
487 def _doNotify(self, node, items_data, deliverPayloads, pep, recipient): | 494 def _doNotify(self, node, items_data, deliverPayloads, pep, recipient): |
488 if items_data and not deliverPayloads: | 495 if items_data and not deliverPayloads: |
489 for item_data in items_data: | 496 for item_data in items_data: |
490 item_data.item.children = [] | 497 item_data.item.children = [] |
622 return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) | 629 return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient) |
623 | 630 |
624 def supportsAutoCreate(self): | 631 def supportsAutoCreate(self): |
625 return True | 632 return True |
626 | 633 |
627 def supportsCreatorCheck(self): | |
628 return True | |
629 | |
630 def supportsInstantNodes(self): | 634 def supportsInstantNodes(self): |
631 return True | 635 return True |
632 | 636 |
633 def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None): | 637 def createNode(self, nodeIdentifier, requestor, options=None, pep=False, recipient=None): |
634 if not nodeIdentifier: | 638 if not nodeIdentifier: |
635 nodeIdentifier = 'generic/%s' % uuid.uuid4() | 639 nodeIdentifier = 'generic/%s' % uuid.uuid4() |
636 | 640 |
637 if not options: | 641 if not options: |
638 options = {} | 642 options = {} |
639 | |
640 # if self.supportsCreatorCheck(): | |
641 # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) | |
642 # try: | |
643 # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) | |
644 # except InvalidFormat: | |
645 # is_user_jid = False | |
646 # else: | |
647 # is_user_jid = bool(nodeIdentifierJID.user) | |
648 | |
649 # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): | |
650 # #we have an user jid node, but not created by the owner of this jid | |
651 # print "Wrong creator" | |
652 # raise error.Forbidden() | |
653 | 643 |
654 nodeType = 'leaf' | 644 nodeType = 'leaf' |
655 config = self.storage.getDefaultConfiguration(nodeType) | 645 config = self.storage.getDefaultConfiguration(nodeType) |
656 config['pubsub#node_type'] = nodeType | 646 config['pubsub#node_type'] = nodeType |
657 config.update(options) | 647 config.update(options) |
1307 'unsupported', | 1297 'unsupported', |
1308 'collections'), | 1298 'collections'), |
1309 error.NoPublishing: ('feature-not-implemented', | 1299 error.NoPublishing: ('feature-not-implemented', |
1310 'unsupported', | 1300 'unsupported', |
1311 'publish'), | 1301 'publish'), |
1302 error.ConstraintFailed: ('conflict', 'precondition-not-met', None), | |
1312 } | 1303 } |
1313 | 1304 |
1314 def __init__(self, backend): | 1305 def __init__(self, backend): |
1315 pubsub.PubSubResource.__init__(self) | 1306 pubsub.PubSubResource.__init__(self) |
1316 | 1307 |
1319 | 1310 |
1320 self.backend.registerPublishNotifier(self._notifyPublish) | 1311 self.backend.registerPublishNotifier(self._notifyPublish) |
1321 self.backend.registerRetractNotifier(self._notifyRetract) | 1312 self.backend.registerRetractNotifier(self._notifyRetract) |
1322 self.backend.registerPreDelete(self._preDelete) | 1313 self.backend.registerPreDelete(self._preDelete) |
1323 | 1314 |
1324 # FIXME: to be removed, it's not useful anymore as PEP is now used | |
1325 # if self.backend.supportsCreatorCheck(): | |
1326 # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to | |
1327 # a jid in this server) is created by the right jid | |
1328 | |
1329 if self.backend.supportsAutoCreate(): | 1315 if self.backend.supportsAutoCreate(): |
1330 self.features.append("auto-create") | 1316 self.features.append("auto-create") |
1331 | 1317 |
1332 if self.backend.supportsPublishOptions(): | 1318 if self.backend.supportsPublishOptions(): |
1333 self.features.append("publish-options") | 1319 self.features.append("publish-options") |
1345 self.features.append("publisher-affiliation") | 1331 self.features.append("publisher-affiliation") |
1346 | 1332 |
1347 if self.backend.supportsGroupBlog(): | 1333 if self.backend.supportsGroupBlog(): |
1348 self.features.append("groupblog") | 1334 self.features.append("groupblog") |
1349 | 1335 |
1350 | 1336 # XXX: this feature is not really described in XEP-0060, we just can see it in |
1351 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples | 1337 # examples but it's necessary for microblogging comments (see XEP-0277) |
1352 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) | 1338 if self.backend.supportsPublishModel(): |
1339 self.features.append("publish_model") | |
1353 | 1340 |
1354 def getFullItem(self, item_data): | 1341 def getFullItem(self, item_data): |
1355 """ Attach item configuration to this item | 1342 """ Attach item configuration to this item |
1356 | 1343 |
1357 Used to give item configuration back to node's owner (and *only* to owner) | 1344 Used to give item configuration back to node's owner (and *only* to owner) |
1605 def _publish_errb(self, failure, request): | 1592 def _publish_errb(self, failure, request): |
1606 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): | 1593 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): |
1607 print("Auto-creating node %s" % (request.nodeIdentifier,)) | 1594 print("Auto-creating node %s" % (request.nodeIdentifier,)) |
1608 d = self.backend.createNode(request.nodeIdentifier, | 1595 d = self.backend.createNode(request.nodeIdentifier, |
1609 request.sender, | 1596 request.sender, |
1597 request.options, | |
1610 pep=self._isPep(request), | 1598 pep=self._isPep(request), |
1611 recipient=request.recipient) | 1599 recipient=request.recipient) |
1612 d.addCallback(lambda ignore, | 1600 d.addCallback( |
1613 request: self.backend.publish(request.nodeIdentifier, | 1601 lambda __, request: defer.ensureDeferred(self.backend.publish( |
1614 request.items, | 1602 request.nodeIdentifier, |
1615 request.sender, | 1603 request.items, |
1616 self._isPep(request), | 1604 request.sender, |
1617 request.recipient, | 1605 request.options, |
1618 ), | 1606 self._isPep(request), |
1619 request) | 1607 request.recipient, |
1608 )), | |
1609 request, | |
1610 ) | |
1620 return d | 1611 return d |
1621 | 1612 |
1622 return failure | 1613 return failure |
1623 | 1614 |
1624 def _isPep(self, request): | 1615 def _isPep(self, request): |
1626 return request.delegated | 1617 return request.delegated |
1627 except AttributeError: | 1618 except AttributeError: |
1628 return False | 1619 return False |
1629 | 1620 |
1630 def publish(self, request): | 1621 def publish(self, request): |
1631 d = self.backend.publish(request.nodeIdentifier, | 1622 d = defer.ensureDeferred(self.backend.publish( |
1632 request.items, | 1623 request.nodeIdentifier, |
1633 request.sender, | 1624 request.items, |
1634 self._isPep(request), | 1625 request.sender, |
1635 request.recipient) | 1626 request.options, |
1627 self._isPep(request), | |
1628 request.recipient | |
1629 )) | |
1636 d.addErrback(self._publish_errb, request) | 1630 d.addErrback(self._publish_errb, request) |
1637 return d.addErrback(self._mapErrors) | 1631 return d.addErrback(self._mapErrors) |
1638 | 1632 |
1639 def subscribe(self, request): | 1633 def subscribe(self, request): |
1640 d = self.backend.subscribe(request.nodeIdentifier, | 1634 d = self.backend.subscribe(request.nodeIdentifier, |