Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 318:d13526c0eb32
RSM improvments/refactoring:
- a warning message is displayed if maxItems == 0 in getItems, and an empty list is returned in this case
- use the new container.ItemData instead of doing tuple (un)packing
- the list of ItemData => list of domish.Element conversion is done at the end of the workflow
- rsm request is checked in self._items_rsm directly
- better handling of Response.index in _items_rsm
- itemsIdentifiers can't be used with RSM (the later will be ignored if this happen)
- don't do approximative unpacking anymore in _items_rsm
- countItems and getIndex have been refactored and renamed getItemsCount and getItemsIndex, don't use query duplications anymore
- cleaned query handling in getItems
- /!\ mam module is temporarly broken
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 Jan 2016 18:33:22 +0100 |
parents | 34adc4a8aa64 |
children | a51947371625 |
comparison
equal
deleted
inserted
replaced
317:34adc4a8aa64 | 318:d13526c0eb32 |
---|---|
600 nodeType = 'leaf' | 600 nodeType = 'leaf' |
601 | 601 |
602 def storeItems(self, item_data, publisher): | 602 def storeItems(self, item_data, publisher): |
603 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) | 603 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) |
604 | 604 |
605 | |
606 def _storeItems(self, cursor, items_data, publisher): | 605 def _storeItems(self, cursor, items_data, publisher): |
607 self._checkNodeExists(cursor) | 606 self._checkNodeExists(cursor) |
608 for item_data in items_data: | 607 for item_data in items_data: |
609 self._storeItem(cursor, item_data, publisher) | 608 self._storeItem(cursor, item_data, publisher) |
610 | |
611 | 609 |
612 def _storeItem(self, cursor, item_data, publisher): | 610 def _storeItem(self, cursor, item_data, publisher): |
613 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config | 611 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config |
614 data = item.toXml() | 612 data = item.toXml() |
615 | 613 |
663 VALUES (%s, %s)""", (item_id, category)) | 661 VALUES (%s, %s)""", (item_id, category)) |
664 | 662 |
665 def removeItems(self, itemIdentifiers): | 663 def removeItems(self, itemIdentifiers): |
666 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) | 664 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) |
667 | 665 |
668 | |
669 def _removeItems(self, cursor, itemIdentifiers): | 666 def _removeItems(self, cursor, itemIdentifiers): |
670 self._checkNodeExists(cursor) | 667 self._checkNodeExists(cursor) |
671 | 668 |
672 deleted = [] | 669 deleted = [] |
673 | 670 |
681 if cursor.rowcount: | 678 if cursor.rowcount: |
682 deleted.append(itemIdentifier) | 679 deleted.append(itemIdentifier) |
683 | 680 |
684 return deleted | 681 return deleted |
685 | 682 |
686 | |
687 def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None): | 683 def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None): |
688 """ Get all authorised items | 684 """ Get all authorised items |
689 @param authorized_groups: we want to get items that these groups can access | 685 @param authorized_groups: we want to get items that these groups can access |
690 @param unrestricted: if true, don't check permissions (i.e.: get all items) | 686 @param unrestricted: if true, don't check permissions (i.e.: get all items) |
691 @param maxItems: nb of items we want to tget | 687 @param maxItems: nb of items we want to get |
692 @param rsm_data: options for RSM feature handling (XEP-0059) as a | 688 @param ext_data: options for extra features like RSM and MAM |
693 dictionnary of C{unicode} to C{unicode}. | |
694 | 689 |
695 @return: list of container.ItemData | 690 @return: list of container.ItemData |
696 if unrestricted is False, access_model and config will be None | 691 if unrestricted is False, access_model and config will be None |
697 """ | 692 """ |
698 if ext_data is None: | 693 if ext_data is None: |
699 ext_data = {} | 694 ext_data = {} |
700 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) | 695 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) |
701 | 696 |
702 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): | 697 def _appendSourcesAndFilters(self, query, args, authorized_groups, unrestricted, ext_data): |
703 # FIXME: simplify the query construction | 698 """append sources and filters to sql query requesting items and return ORDER BY |
704 self._checkNodeExists(cursor) | 699 |
705 | 700 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for |
701 _getItems | |
702 """ | |
703 # SOURCES | |
706 if unrestricted: | 704 if unrestricted: |
707 query = ["SELECT data,items.access_model,item_id"] | 705 query.append("""FROM nodes |
708 source = """FROM nodes | 706 INNER JOIN items USING (node_id) |
709 INNER JOIN items USING (node_id) | 707 WHERE node_id=%s""") |
710 WHERE node_id=%s""" | 708 args.append(self.nodeDbId) |
711 args = [self.nodeDbId] | |
712 else: | 709 else: |
713 query = ["SELECT data"] | 710 args.append(self.nodeDbId) |
714 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" | |
715 source = """FROM nodes | |
716 INNER JOIN items USING (node_id) | |
717 LEFT JOIN item_groups_authorized USING (item_id) | |
718 WHERE node_id=%s AND | |
719 (items.access_model='open'""" + groups + ")" | |
720 | |
721 args = [self.nodeDbId] | |
722 if authorized_groups: | 711 if authorized_groups: |
712 get_groups = " or (items.access_model='roster' and groupname in %s)" | |
723 args.append(authorized_groups) | 713 args.append(authorized_groups) |
724 | 714 else: |
715 get_groups = "" | |
716 | |
717 query.append("""FROM nodes | |
718 INNER JOIN items USING (node_id) | |
719 LEFT JOIN item_groups_authorized USING (item_id) | |
720 WHERE node_id=%s AND | |
721 (items.access_model='open'""" + get_groups + ")") | |
722 | |
723 # FILTERS | |
725 if 'filters' in ext_data: # MAM filters | 724 if 'filters' in ext_data: # MAM filters |
726 for filter_ in ext_data['filters']: | 725 for filter_ in ext_data['filters']: |
727 if filter_.var == 'start': | 726 if filter_.var == 'start': |
728 source += " AND date>=%s" | 727 query.append("AND date>=%s") |
729 args.append(filter_.value) | 728 args.append(filter_.value) |
730 if filter_.var == 'end': | 729 if filter_.var == 'end': |
731 source += " AND date<=%s" | 730 query.append("AND date<=%s") |
732 args.append(filter_.value) | 731 args.append(filter_.value) |
733 if filter_.var == 'with': | 732 if filter_.var == 'with': |
734 jid_s = filter_.value | 733 jid_s = filter_.value |
735 if '/' in jid_s: | 734 if '/' in jid_s: |
736 source += " AND publisher=%s" | 735 query.append("AND publisher=%s") |
737 args.append(filter_.value) | 736 args.append(filter_.value) |
738 else: | 737 else: |
739 source += " AND publisher LIKE %s" | 738 query.append("AND publisher LIKE %s") |
740 args.append(u"{}%".format(filter_.value)) | 739 args.append(u"{}%".format(filter_.value)) |
741 | 740 |
742 query.append(source) | 741 return "ORDER BY date DESC" |
743 order = "DESC" | 742 |
743 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): | |
744 self._checkNodeExists(cursor) | |
745 | |
746 if maxItems == 0: | |
747 return [] | |
748 | |
749 args = [] | |
750 | |
751 # SELECT | |
752 query = ["SELECT data,items.access_model,item_id,date"] | |
753 | |
754 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
744 | 755 |
745 if 'rsm' in ext_data: | 756 if 'rsm' in ext_data: |
746 rsm = ext_data['rsm'] | 757 rsm = ext_data['rsm'] |
747 maxItems = rsm.max | 758 maxItems = rsm.max |
748 if rsm.index is not None: | 759 if rsm.index is not None: |
749 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") | 760 # We need to know the date of corresponding to the index (offset) of the current query |
750 # FIXME: change the request so source is not used 2 times | 761 # so we execute the query to look for the date |
751 # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args | 762 tmp_query = query[:] |
752 args.append(self.nodeDbId) | 763 tmp_args = args[:] |
753 if authorized_groups: | 764 tmp_query[0] = "SELECT date" |
754 args.append(authorized_groups) | 765 tmp_query.append("{} LIMIT 1 OFFSET %s".format(query_order)) |
755 args.append(rsm.index) | 766 tmp_args.append(rsm.index) |
767 cursor.execute(' '.join(query), args) | |
768 # FIXME: bad index is not managed yet | |
769 date = cursor.fetchall()[0][0] | |
770 | |
771 # now that we have the date, we can use it | |
772 query.append("AND date<=%s") | |
773 args.append(date) | |
756 elif rsm.before is not None: | 774 elif rsm.before is not None: |
757 order = "ASC" | |
758 if rsm.before != '': | 775 if rsm.before != '': |
759 query.append("AND date>(SELECT date FROM items WHERE item=%s LIMIT 1)") | 776 query.append("AND date>(SELECT date FROM items WHERE item=%s LIMIT 1)") |
760 args.append(rsm.before) | 777 args.append(rsm.before) |
778 if maxItems is not None: | |
779 # if we have maxItems (i.e. a limit), we need to reverse order | |
780 # in a first query to get the right items | |
781 query.insert(0,"SELECT * from (") | |
782 query.append("ORDER BY date ASC LIMIT %s) as x") | |
783 args.append(maxItems) | |
761 elif rsm.after: | 784 elif rsm.after: |
762 query.append("AND date<(SELECT date FROM items WHERE item=%s LIMIT 1)") | 785 query.append("AND date<(SELECT date FROM items WHERE item=%s LIMIT 1)") |
763 args.append(rsm.after) | 786 args.append(rsm.after) |
764 | 787 |
765 query.append("ORDER BY date %s" % order) | 788 query.append(query_order) |
766 | 789 |
767 if maxItems is not None: | 790 if maxItems is not None: |
768 query.append("LIMIT %s") | 791 query.append("LIMIT %s") |
769 args.append(maxItems) | 792 args.append(maxItems) |
770 | 793 |
771 cursor.execute(' '.join(query), args) | 794 cursor.execute(' '.join(query), args) |
772 | 795 |
773 result = cursor.fetchall() | 796 result = cursor.fetchall() |
774 if unrestricted: | 797 if unrestricted: |
798 # with unrestricted query, we need to fill the access_list for a roster access items | |
775 ret = [] | 799 ret = [] |
776 for data in result: | 800 for data in result: |
777 item = generic.stripNamespace(parseXml(data[0])) | 801 item = generic.stripNamespace(parseXml(data[0])) |
778 access_model = data[1] | 802 access_model = data[1] |
779 item_id = data[2] | 803 item_id = data[2] |
782 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 806 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
783 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] | 807 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] |
784 | 808 |
785 ret.append(container.ItemData(item, access_model, access_list)) | 809 ret.append(container.ItemData(item, access_model, access_list)) |
786 return ret | 810 return ret |
811 | |
787 items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result] | 812 items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), None, None) for r in result] |
788 return items_data | 813 return items_data |
789 | |
790 def countItems(self, authorized_groups, unrestricted): | |
791 """ Count the accessible items. | |
792 | |
793 @param authorized_groups: we want to get items that these groups can access. | |
794 @param unrestricted: if true, don't check permissions (i.e.: get all items). | |
795 @return: deferred that fires a C{int}. | |
796 """ | |
797 return self.dbpool.runInteraction(self._countItems, authorized_groups, unrestricted) | |
798 | |
799 def _countItems(self, cursor, authorized_groups, unrestricted): | |
800 # FIXME: should not be a separate method, but should be an option of getItems instead | |
801 self._checkNodeExists(cursor) | |
802 | |
803 if unrestricted: | |
804 query = ["""SELECT count(item_id) FROM nodes | |
805 INNER JOIN items USING (node_id) | |
806 WHERE node_id=%s"""] | |
807 args = [self.nodeDbId] | |
808 else: | |
809 query = ["""SELECT count(item_id) FROM nodes | |
810 INNER JOIN items USING (node_id) | |
811 LEFT JOIN item_groups_authorized USING (item_id) | |
812 WHERE node_id=%s AND | |
813 (items.access_model='open' """ + | |
814 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + | |
815 ")"] | |
816 | |
817 args = [self.nodeDbId] | |
818 if authorized_groups: | |
819 args.append(authorized_groups) | |
820 | |
821 cursor.execute(' '.join(query), args) | |
822 return cursor.fetchall()[0][0] | |
823 | |
824 def getIndex(self, authorized_groups, unrestricted, item): | |
825 """ Retrieve the index of the given item within the accessible window. | |
826 | |
827 @param authorized_groups: we want to get items that these groups can access. | |
828 @param unrestricted: if true, don't check permissions (i.e.: get all items). | |
829 @param item: item identifier. | |
830 @return: deferred that fires a C{int}. | |
831 """ | |
832 return self.dbpool.runInteraction(self._getIndex, authorized_groups, unrestricted, item) | |
833 | |
834 def _getIndex(self, cursor, authorized_groups, unrestricted, item): | |
835 self._checkNodeExists(cursor) | |
836 | |
837 if unrestricted: | |
838 query = ["""SELECT row_number FROM ( | |
839 SELECT row_number() OVER (ORDER BY date DESC), item | |
840 FROM nodes INNER JOIN items USING (node_id) | |
841 WHERE node_id=%s | |
842 ) as x | |
843 WHERE item=%s LIMIT 1"""] | |
844 args = [self.nodeDbId] | |
845 else: | |
846 query = ["""SELECT row_number FROM ( | |
847 SELECT row_number() OVER (ORDER BY date DESC), item | |
848 FROM nodes INNER JOIN items USING (node_id) | |
849 LEFT JOIN item_groups_authorized USING (item_id) | |
850 WHERE node_id=%s AND | |
851 (items.access_model='open' """ + | |
852 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + | |
853 """)) as x | |
854 WHERE item=%s LIMIT 1"""] | |
855 | |
856 args = [self.nodeDbId] | |
857 if authorized_groups: | |
858 args.append(authorized_groups) | |
859 | |
860 args.append(item) | |
861 cursor.execute(' '.join(query), args) | |
862 | |
863 return cursor.fetchall()[0][0] | |
864 | 814 |
865 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): | 815 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): |
866 """ Get items which are in the given list | 816 """ Get items which are in the given list |
867 @param authorized_groups: we want to get items that these groups can access | 817 @param authorized_groups: we want to get items that these groups can access |
868 @param unrestricted: if true, don't check permissions | 818 @param unrestricted: if true, don't check permissions |
870 @return: list of container.ItemData | 820 @return: list of container.ItemData |
871 ItemData.config will contains access_list (managed as a dictionnary with same key as for item_config) | 821 ItemData.config will contains access_list (managed as a dictionnary with same key as for item_config) |
872 if unrestricted is False, access_model and config will be None | 822 if unrestricted is False, access_model and config will be None |
873 """ | 823 """ |
874 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) | 824 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) |
875 | |
876 | 825 |
877 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): | 826 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): |
878 self._checkNodeExists(cursor) | 827 self._checkNodeExists(cursor) |
879 ret = [] | 828 ret = [] |
880 if unrestricted: #we get everything without checking permissions | 829 if unrestricted: #we get everything without checking permissions |
914 if result: | 863 if result: |
915 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None)) | 864 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), None, None)) |
916 | 865 |
917 return ret | 866 return ret |
918 | 867 |
868 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None): | |
869 """Count expected number of items in a getItems query | |
870 | |
871 @param authorized_groups: we want to get items that these groups can access | |
872 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
873 @param ext_data: options for extra features like RSM and MAM | |
874 """ | |
875 if ext_data is None: | |
876 ext_data = {} | |
877 return self.dbpool.runInteraction(self._getItemsCount, authorized_groups, unrestricted, ext_data) | |
878 | |
879 def _getItemsCount(self, cursor, authorized_groups, unrestricted, ext_data): | |
880 self._checkNodeExists(cursor) | |
881 args = [] | |
882 | |
883 # SELECT | |
884 query = ["SELECT count(1)"] | |
885 | |
886 self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
887 | |
888 cursor.execute(' '.join(query), args) | |
889 return cursor.fetchall()[0][0] | |
890 | |
891 def getItemsIndex(self, item_id, authorized_groups, unrestricted, ext_data=None): | |
892 """Get expected index of first item in the window of a getItems query | |
893 | |
894 @param item_id: id of the item | |
895 @param authorized_groups: we want to get items that these groups can access | |
896 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
897 @param ext_data: options for extra features like RSM and MAM | |
898 """ | |
899 if ext_data is None: | |
900 ext_data = {} | |
901 return self.dbpool.runInteraction(self._getItemsIndex, item_id, authorized_groups, unrestricted, ext_data) | |
902 | |
903 def _getItemsIndex(self, cursor, item_id, authorized_groups, unrestricted, ext_data): | |
904 self._checkNodeExists(cursor) | |
905 args = [] | |
906 | |
907 # SELECT | |
908 query = [] | |
909 | |
910 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
911 | |
912 query_select = "SELECT row_number from (SELECT row_number() OVER ({}), item".format(query_order) | |
913 query.insert(0, query_select) | |
914 query.append(") as x WHERE item=%s") | |
915 args.append(item_id) | |
916 | |
917 | |
918 cursor.execute(' '.join(query), args) | |
919 # XXX: row_number start at 1, but we want that index start at 0 | |
920 return cursor.fetchall()[0][0] - 1 | |
919 | 921 |
920 def getItemsPublishers(self, itemIdentifiers): | 922 def getItemsPublishers(self, itemIdentifiers): |
921 """Get the publishers for all given identifiers | 923 """Get the publishers for all given identifiers |
922 | 924 |
923 @return (dict[unicode, jid.JID]): map of itemIdentifiers to publisher | 925 @return (dict[unicode, jid.JID]): map of itemIdentifiers to publisher |
924 if item is not found, key is skipped in resulting dict | 926 if item is not found, key is skipped in resulting dict |
925 """ | 927 """ |
926 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) | 928 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) |
927 | |
928 | 929 |
929 def _getItemsPublishers(self, cursor, itemIdentifiers): | 930 def _getItemsPublishers(self, cursor, itemIdentifiers): |
930 self._checkNodeExists(cursor) | 931 self._checkNodeExists(cursor) |
931 ret = {} | 932 ret = {} |
932 for itemIdentifier in itemIdentifiers: | 933 for itemIdentifier in itemIdentifiers: |
936 result = cursor.fetchone() | 937 result = cursor.fetchone() |
937 if result: | 938 if result: |
938 ret[itemIdentifier] = jid.JID(result[0]) | 939 ret[itemIdentifier] = jid.JID(result[0]) |
939 return ret | 940 return ret |
940 | 941 |
941 | |
942 def purge(self): | 942 def purge(self): |
943 return self.dbpool.runInteraction(self._purge) | 943 return self.dbpool.runInteraction(self._purge) |
944 | |
945 | 944 |
946 def _purge(self, cursor): | 945 def _purge(self, cursor): |
947 self._checkNodeExists(cursor) | 946 self._checkNodeExists(cursor) |
948 | 947 |
949 cursor.execute("""DELETE FROM items WHERE | 948 cursor.execute("""DELETE FROM items WHERE |