comparison sat_pubsub/pgsql_storage.py @ 318:d13526c0eb32

RSM improvements/refactoring: - a warning message is displayed if maxItems == 0 in getItems, and an empty list is returned in this case - use the new container.ItemData instead of doing tuple (un)packing - the list of ItemData => list of domish.Element conversion is done at the end of the workflow - rsm request is checked in self._items_rsm directly - better handling of Response.index in _items_rsm - itemsIdentifiers can't be used with RSM (the latter will be ignored if this happens) - don't do approximate unpacking anymore in _items_rsm - countItems and getIndex have been refactored and renamed getItemsCount and getItemsIndex, don't use query duplications anymore - cleaned query handling in getItems - /!\ mam module is temporarily broken
author Goffi <goffi@goffi.org>
date Sun, 03 Jan 2016 18:33:22 +0100
parents 34adc4a8aa64
children a51947371625
comparison
equal deleted inserted replaced
317:34adc4a8aa64 318:d13526c0eb32
600 nodeType = 'leaf' 600 nodeType = 'leaf'
601 601
602 def storeItems(self, item_data, publisher): 602 def storeItems(self, item_data, publisher):
603 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) 603 return self.dbpool.runInteraction(self._storeItems, item_data, publisher)
604 604
605
606 def _storeItems(self, cursor, items_data, publisher): 605 def _storeItems(self, cursor, items_data, publisher):
607 self._checkNodeExists(cursor) 606 self._checkNodeExists(cursor)
608 for item_data in items_data: 607 for item_data in items_data:
609 self._storeItem(cursor, item_data, publisher) 608 self._storeItem(cursor, item_data, publisher)
610
611 609
612 def _storeItem(self, cursor, item_data, publisher): 610 def _storeItem(self, cursor, item_data, publisher):
613 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config 611 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
614 data = item.toXml() 612 data = item.toXml()
615 613
663 VALUES (%s, %s)""", (item_id, category)) 661 VALUES (%s, %s)""", (item_id, category))
664 662
665 def removeItems(self, itemIdentifiers): 663 def removeItems(self, itemIdentifiers):
666 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) 664 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)
667 665
668
669 def _removeItems(self, cursor, itemIdentifiers): 666 def _removeItems(self, cursor, itemIdentifiers):
670 self._checkNodeExists(cursor) 667 self._checkNodeExists(cursor)
671 668
672 deleted = [] 669 deleted = []
673 670
681 if cursor.rowcount: 678 if cursor.rowcount:
682 deleted.append(itemIdentifier) 679 deleted.append(itemIdentifier)
683 680
684 return deleted 681 return deleted
685 682
686
687 def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None): 683 def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None):
688 """ Get all authorised items 684 """ Get all authorised items
689 @param authorized_groups: we want to get items that these groups can access 685 @param authorized_groups: we want to get items that these groups can access
690 @param unrestricted: if true, don't check permissions (i.e.: get all items) 686 @param unrestricted: if true, don't check permissions (i.e.: get all items)
691 @param maxItems: nb of items we want to tget 687 @param maxItems: nb of items we want to get
692 @param rsm_data: options for RSM feature handling (XEP-0059) as a 688 @param ext_data: options for extra features like RSM and MAM
693 dictionnary of C{unicode} to C{unicode}.
694 689
695 @return: list of container.ItemData 690 @return: list of container.ItemData
696 if unrestricted is False, access_model and config will be None 691 if unrestricted is False, access_model and config will be None
697 """ 692 """
698 if ext_data is None: 693 if ext_data is None:
699 ext_data = {} 694 ext_data = {}
700 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) 695 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data)
701 696
702 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): 697 def _appendSourcesAndFilters(self, query, args, authorized_groups, unrestricted, ext_data):
703 # FIXME: simplify the query construction 698 """append sources and filters to sql query requesting items and return ORDER BY
704 self._checkNodeExists(cursor) 699
705 700 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for
701 _getItems
702 """
703 # SOURCES
706 if unrestricted: 704 if unrestricted:
707 query = ["SELECT data,items.access_model,item_id"] 705 query.append("""FROM nodes
708 source = """FROM nodes 706 INNER JOIN items USING (node_id)
709 INNER JOIN items USING (node_id) 707 WHERE node_id=%s""")
710 WHERE node_id=%s""" 708 args.append(self.nodeDbId)
711 args = [self.nodeDbId]
712 else: 709 else:
713 query = ["SELECT data"] 710 args.append(self.nodeDbId)
714 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else ""
715 source = """FROM nodes
716 INNER JOIN items USING (node_id)
717 LEFT JOIN item_groups_authorized USING (item_id)
718 WHERE node_id=%s AND
719 (items.access_model='open'""" + groups + ")"
720
721 args = [self.nodeDbId]
722 if authorized_groups: 711 if authorized_groups:
712 get_groups = " or (items.access_model='roster' and groupname in %s)"
723 args.append(authorized_groups) 713 args.append(authorized_groups)
724 714 else:
715 get_groups = ""
716
717 query.append("""FROM nodes
718 INNER JOIN items USING (node_id)
719 LEFT JOIN item_groups_authorized USING (item_id)
720 WHERE node_id=%s AND
721 (items.access_model='open'""" + get_groups + ")")
722
723 # FILTERS
725 if 'filters' in ext_data: # MAM filters 724 if 'filters' in ext_data: # MAM filters
726 for filter_ in ext_data['filters']: 725 for filter_ in ext_data['filters']:
727 if filter_.var == 'start': 726 if filter_.var == 'start':
728 source += " AND date>=%s" 727 query.append("AND date>=%s")
729 args.append(filter_.value) 728 args.append(filter_.value)
730 if filter_.var == 'end': 729 if filter_.var == 'end':
731 source += " AND date<=%s" 730 query.append("AND date<=%s")
732 args.append(filter_.value) 731 args.append(filter_.value)
733 if filter_.var == 'with': 732 if filter_.var == 'with':
734 jid_s = filter_.value 733 jid_s = filter_.value
735 if '/' in jid_s: 734 if '/' in jid_s:
736 source += " AND publisher=%s" 735 query.append("AND publisher=%s")
737 args.append(filter_.value) 736 args.append(filter_.value)
738 else: 737 else:
739 source += " AND publisher LIKE %s" 738 query.append("AND publisher LIKE %s")
740 args.append(u"{}%".format(filter_.value)) 739 args.append(u"{}%".format(filter_.value))
741 740
742 query.append(source) 741 return "ORDER BY date DESC"
743 order = "DESC" 742
743 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data):
744 self._checkNodeExists(cursor)
745
746 if maxItems == 0:
747 return []
748
749 args = []
750
751 # SELECT
752 query = ["SELECT data,items.access_model,item_id,date"]
753
754 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
744 755
745 if 'rsm' in ext_data: 756 if 'rsm' in ext_data:
746 rsm = ext_data['rsm'] 757 rsm = ext_data['rsm']
747 maxItems = rsm.max 758 maxItems = rsm.max
748 if rsm.index is not None: 759 if rsm.index is not None:
749 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") 760 # We need to know the date of corresponding to the index (offset) of the current query
750 # FIXME: change the request so source is not used 2 times 761 # so we execute the query to look for the date
751 # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args 762 tmp_query = query[:]
752 args.append(self.nodeDbId) 763 tmp_args = args[:]
753 if authorized_groups: 764 tmp_query[0] = "SELECT date"
754 args.append(authorized_groups) 765 tmp_query.append("{} LIMIT 1 OFFSET %s".format(query_order))
755 args.append(rsm.index) 766 tmp_args.append(rsm.index)
767 cursor.execute(' '.join(query), args)
768 # FIXME: bad index is not managed yet
769 date = cursor.fetchall()[0][0]
770
771 # now that we have the date, we can use it
772 query.append("AND date<=%s")
773 args.append(date)
756 elif rsm.before is not None: 774 elif rsm.before is not None:
757 order = "ASC"
758 if rsm.before != '': 775 if rsm.before != '':
759 query.append("AND date>(SELECT date FROM items WHERE item=%s LIMIT 1)") 776 query.append("AND date>(SELECT date FROM items WHERE item=%s LIMIT 1)")
760 args.append(rsm.before) 777 args.append(rsm.before)
778 if maxItems is not None:
779 # if we have maxItems (i.e. a limit), we need to reverse order
780 # in a first query to get the right items
781 query.insert(0,"SELECT * from (")
782 query.append("ORDER BY date ASC LIMIT %s) as x")
783 args.append(maxItems)
761 elif rsm.after: 784 elif rsm.after:
762 query.append("AND date<(SELECT date FROM items WHERE item=%s LIMIT 1)") 785 query.append("AND date<(SELECT date FROM items WHERE item=%s LIMIT 1)")
763 args.append(rsm.after) 786 args.append(rsm.after)
764 787
765 query.append("ORDER BY date %s" % order) 788 query.append(query_order)
766 789
767 if maxItems is not None: 790 if maxItems is not None:
768 query.append("LIMIT %s") 791 query.append("LIMIT %s")
769 args.append(maxItems) 792 args.append(maxItems)
770 793
771 cursor.execute(' '.join(query), args) 794 cursor.execute(' '.join(query), args)
772 795
773 result = cursor.fetchall() 796 result = cursor.fetchall()
774 if unrestricted: 797 if unrestricted:
798 # with unrestricted query, we need to fill the access_list for a roster access items
775 ret = [] 799 ret = []
776 for data in result: 800 for data in result:
777 item = generic.stripNamespace(parseXml(data[0])) 801 item = generic.stripNamespace(parseXml(data[0]))
778 access_model = data[1] 802 access_model = data[1]
779 item_id = data[2] 803 item_id = data[2]
782 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 806 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
783 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] 807 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
784 808
785 ret.append(container.ItemData(item, access_model, access_list)) 809 ret.append(container.ItemData(item, access_model, access_list))
786 return ret 810 return ret
811
787 items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result] 812 items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result]
788 return items_data 813 return items_data
789
790 def countItems(self, authorized_groups, unrestricted):
791 """ Count the accessible items.
792
793 @param authorized_groups: we want to get items that these groups can access.
794 @param unrestricted: if true, don't check permissions (i.e.: get all items).
795 @return: deferred that fires a C{int}.
796 """
797 return self.dbpool.runInteraction(self._countItems, authorized_groups, unrestricted)
798
799 def _countItems(self, cursor, authorized_groups, unrestricted):
800 # FIXME: should not be a separate method, but should be an option of getItems instead
801 self._checkNodeExists(cursor)
802
803 if unrestricted:
804 query = ["""SELECT count(item_id) FROM nodes
805 INNER JOIN items USING (node_id)
806 WHERE node_id=%s"""]
807 args = [self.nodeDbId]
808 else:
809 query = ["""SELECT count(item_id) FROM nodes
810 INNER JOIN items USING (node_id)
811 LEFT JOIN item_groups_authorized USING (item_id)
812 WHERE node_id=%s AND
813 (items.access_model='open' """ +
814 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
815 ")"]
816
817 args = [self.nodeDbId]
818 if authorized_groups:
819 args.append(authorized_groups)
820
821 cursor.execute(' '.join(query), args)
822 return cursor.fetchall()[0][0]
823
824 def getIndex(self, authorized_groups, unrestricted, item):
825 """ Retrieve the index of the given item within the accessible window.
826
827 @param authorized_groups: we want to get items that these groups can access.
828 @param unrestricted: if true, don't check permissions (i.e.: get all items).
829 @param item: item identifier.
830 @return: deferred that fires a C{int}.
831 """
832 return self.dbpool.runInteraction(self._getIndex, authorized_groups, unrestricted, item)
833
834 def _getIndex(self, cursor, authorized_groups, unrestricted, item):
835 self._checkNodeExists(cursor)
836
837 if unrestricted:
838 query = ["""SELECT row_number FROM (
839 SELECT row_number() OVER (ORDER BY date DESC), item
840 FROM nodes INNER JOIN items USING (node_id)
841 WHERE node_id=%s
842 ) as x
843 WHERE item=%s LIMIT 1"""]
844 args = [self.nodeDbId]
845 else:
846 query = ["""SELECT row_number FROM (
847 SELECT row_number() OVER (ORDER BY date DESC), item
848 FROM nodes INNER JOIN items USING (node_id)
849 LEFT JOIN item_groups_authorized USING (item_id)
850 WHERE node_id=%s AND
851 (items.access_model='open' """ +
852 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
853 """)) as x
854 WHERE item=%s LIMIT 1"""]
855
856 args = [self.nodeDbId]
857 if authorized_groups:
858 args.append(authorized_groups)
859
860 args.append(item)
861 cursor.execute(' '.join(query), args)
862
863 return cursor.fetchall()[0][0]
864 814
865 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): 815 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
866 """ Get items which are in the given list 816 """ Get items which are in the given list
867 @param authorized_groups: we want to get items that these groups can access 817 @param authorized_groups: we want to get items that these groups can access
868 @param unrestricted: if true, don't check permissions 818 @param unrestricted: if true, don't check permissions
870 @return: list of container.ItemData 820 @return: list of container.ItemData
871 ItemData.config will contains access_list (managed as a dictionnary with same key as for item_config) 821 ItemData.config will contains access_list (managed as a dictionnary with same key as for item_config)
872 if unrestricted is False, access_model and config will be None 822 if unrestricted is False, access_model and config will be None
873 """ 823 """
874 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) 824 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers)
875
876 825
877 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): 826 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers):
878 self._checkNodeExists(cursor) 827 self._checkNodeExists(cursor)
879 ret = [] 828 ret = []
880 if unrestricted: #we get everything without checking permissions 829 if unrestricted: #we get everything without checking permissions
914 if result: 863 if result:
915 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None)) 864 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None))
916 865
917 return ret 866 return ret
918 867
868 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None):
869 """Count expected number of items in a getItems query
870
871 @param authorized_groups: we want to get items that these groups can access
872 @param unrestricted: if true, don't check permissions (i.e.: get all items)
873 @param ext_data: options for extra features like RSM and MAM
874 """
875 if ext_data is None:
876 ext_data = {}
877 return self.dbpool.runInteraction(self._getItemsCount, authorized_groups, unrestricted, ext_data)
878
879 def _getItemsCount(self, cursor, authorized_groups, unrestricted, ext_data):
880 self._checkNodeExists(cursor)
881 args = []
882
883 # SELECT
884 query = ["SELECT count(1)"]
885
886 self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
887
888 cursor.execute(' '.join(query), args)
889 return cursor.fetchall()[0][0]
890
891 def getItemsIndex(self, item_id, authorized_groups, unrestricted, ext_data=None):
892 """Get expected index of first item in the window of a getItems query
893
894 @param item_id: id of the item
895 @param authorized_groups: we want to get items that these groups can access
896 @param unrestricted: if true, don't check permissions (i.e.: get all items)
897 @param ext_data: options for extra features like RSM and MAM
898 """
899 if ext_data is None:
900 ext_data = {}
901 return self.dbpool.runInteraction(self._getItemsIndex, item_id, authorized_groups, unrestricted, ext_data)
902
903 def _getItemsIndex(self, cursor, item_id, authorized_groups, unrestricted, ext_data):
904 self._checkNodeExists(cursor)
905 args = []
906
907 # SELECT
908 query = []
909
910 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
911
912 query_select = "SELECT row_number from (SELECT row_number() OVER ({}), item".format(query_order)
913 query.insert(0, query_select)
914 query.append(") as x WHERE item=%s")
915 args.append(item_id)
916
917
918 cursor.execute(' '.join(query), args)
919 # XXX: row_number start at 1, but we want that index start at 0
920 return cursor.fetchall()[0][0] - 1
919 921
920 def getItemsPublishers(self, itemIdentifiers): 922 def getItemsPublishers(self, itemIdentifiers):
921 """Get the publishers for all given identifiers 923 """Get the publishers for all given identifiers
922 924
923 @return (dict[unicode, jid.JID]): map of itemIdentifiers to publisher 925 @return (dict[unicode, jid.JID]): map of itemIdentifiers to publisher
924 if item is not found, key is skipped in resulting dict 926 if item is not found, key is skipped in resulting dict
925 """ 927 """
926 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) 928 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers)
927
928 929
929 def _getItemsPublishers(self, cursor, itemIdentifiers): 930 def _getItemsPublishers(self, cursor, itemIdentifiers):
930 self._checkNodeExists(cursor) 931 self._checkNodeExists(cursor)
931 ret = {} 932 ret = {}
932 for itemIdentifier in itemIdentifiers: 933 for itemIdentifier in itemIdentifiers:
936 result = cursor.fetchone() 937 result = cursor.fetchone()
937 if result: 938 if result:
938 ret[itemIdentifier] = jid.JID(result[0]) 939 ret[itemIdentifier] = jid.JID(result[0])
939 return ret 940 return ret
940 941
941
942 def purge(self): 942 def purge(self):
943 return self.dbpool.runInteraction(self._purge) 943 return self.dbpool.runInteraction(self._purge)
944
945 944
946 def _purge(self, cursor): 945 def _purge(self, cursor):
947 self._checkNodeExists(cursor) 946 self._checkNodeExists(cursor)
948 947
949 cursor.execute("""DELETE FROM items WHERE 948 cursor.execute("""DELETE FROM items WHERE