Mercurial > libervia-backend
changeset 4390:5e48634ccada
memory (sqla): update `set_pubsub_node` and `get_pubsub_nodes` to handle pubsub relationships:
- `parent_node` and `linked_node` can now be specified in `set_pubsub_node`.
- `get_pubsub_nodes` has been rewritten to recursively traverse a tree and retrieve nodes
matching criteria and their linking nodes/children/items.
rel 463
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 29 Aug 2025 18:52:58 +0200 |
parents | 4895cf954fbe |
children | c2228563bf0f |
files | libervia/backend/memory/sqla.py |
diffstat | 1 files changed, 131 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/memory/sqla.py Fri Aug 29 17:59:12 2025 +0200 +++ b/libervia/backend/memory/sqla.py Fri Aug 29 18:52:58 2025 +0200 @@ -28,7 +28,7 @@ from alembic import config as al_config, script as al_script from alembic.runtime import migration as al_migration -from sqlalchemy import and_, delete, event, exists, func, or_, update +from sqlalchemy import and_, delete, event, exists, func, literal, or_, update from sqlalchemy import Integer, literal_column, text from sqlalchemy.dialects.sqlite import insert from sqlalchemy.engine import Connection, Engine @@ -1220,7 +1220,7 @@ # the node may already exist, if it has been created just after # get_pubsub_node above log.debug("ignoring UNIQUE constraint error") - cached_node = await as_future( + return await as_future( self.get_pubsub_node( client, service, @@ -1239,24 +1239,137 @@ async def get_pubsub_nodes( self, client: SatXMPPEntity|None, - service: jid.JID|None + service: jid.JID|None, + node_name: str|None = None, + linked_node: bool = False, + depth: int = 0, + with_items: bool = False, + with_subscriptions: bool = False, + with_affiliations: bool = False, ) -> list[PubsubNode]: - """Retrieve pubsub nodes matching arguments. + """Retrieve pubsub nodes with their hierarchy and linking nodes. + + Nodes are returned in a tree, i.e., root nodes are returned, and their children, + linking nodes and items are populated if requested. @param client: If set, only return nodes of this client profile. @param service: If set, only return nodes from this service. - @return: List of matching pubsub nodes. + @param node_name: If set, start search from this node. + If None, start from all root nodes (nodes without parents). + @param linked_node: If True, populate linking_nodes relationship for each base + node. + @param depth: Maximum depth of child hierarchy to traverse (0 = only starting + nodes). + @param with_items: If True, eagerly load items for each node. + @param with_subscriptions: If True, eagerly load subscriptions for each node. + @param with_affiliations: If True, eagerly load affiliations for each node. + @return: List of base nodes matching criteria (root nodes of the request). """ + # Base criteria that always apply + criteria = [] + if client is not None: + criteria.append(PubsubNode.profile_id == self.profiles[client.profile]) + if service is not None: + criteria.append(PubsubNode.service == service) + + # Add filter for base nodes (not linked to another node) + criteria.append(PubsubNode.linked_node_id.is_(None)) + + # Prepare query options + if depth > 0: + options = [ + selectinload(PubsubNode.child_nodes), + joinedload(PubsubNode.parent_node) + ] + else: + options = [] + if with_items: + options.append(selectinload(PubsubNode.items)) + if linked_node: + options.append(selectinload(PubsubNode.linking_nodes)) + if with_subscriptions: + options.append(selectinload(PubsubNode.subscriptions)) + if with_affiliations: + options.append(selectinload(PubsubNode.affiliations)) + async with self.session() as session: - stm = select(PubsubNode) - if client is not None: - profile_id = self.profiles[client.profile] - stm = stm.where(PubsubNode.profile_id == profile_id) - if service is not None: - stm = stm.where(PubsubNode.service == service) - result = await session.execute(stm) + # 1. Determine base query for starting nodes. + if node_name is not None: + # Specific node as starting point (must be a base node). + base_query = select(PubsubNode).filter( + PubsubNode.name == node_name, *criteria + ) + else: + # All root base nodes as starting points (nodes without parent that are + # base nodes). + base_query = select(PubsubNode).filter( + PubsubNode.parent_node_id.is_(None), *criteria + ) + + # Apply eager loading to base query + for option in options: + base_query = base_query.options(option) + + # Execute base query first (we need these nodes to build hierarchy). + base_result = await session.execute(base_query) + base_nodes = base_result.unique().scalars().all() + + # If no base nodes found, return empty list. + if not base_nodes: + return [] + + # 2. Build child hierarchy CTE for base nodes (if depth > 0). + if depth > 0: + base_node_ids = [node.id for node in base_nodes] + + # Create recursive CTE to find child base nodes. + hierarchy = select( + PubsubNode.id, + literal(0).label('depth') + ).filter( + PubsubNode.id.in_(base_node_ids), + PubsubNode.linked_node_id.is_(None) + ).cte(name="hierarchy", recursive=True) - return result.scalars().all() + # Recursive part: get direct children that are also base nodes. + children = ( + select( + PubsubNode.id, + hierarchy.c.depth + 1 + ) + .join(hierarchy, PubsubNode.parent_node_id == hierarchy.c.id) + .filter( + hierarchy.c.depth < depth, + PubsubNode.linked_node_id.is_(None) + ) + ) + + # Combine and create the full hierarchy. + full_hierarchy = hierarchy.union_all(children) + + # Final query for all base nodes in the hierarchy. + hierarchy_query = ( + select(PubsubNode) + .filter(PubsubNode.id.in_( + select(full_hierarchy.c.id) + )) + ) + + # Apply the same eager loading options. + for option in options: + hierarchy_query = hierarchy_query.options(option) + + # Execute hierarchy query. + hierarchy_result = await session.execute(hierarchy_query) + all_base_nodes = hierarchy_result.unique().scalars().all() + + # Return only the original base nodes (the root nodes of our query) + # but taken from the fully-loaded hierarchy results. + base_node_ids = {node.id for node in base_nodes} + return [node for node in all_base_nodes if node.id in base_node_ids] + else: + # When depth=0, results are just the base nodes with relationships loaded. + return base_nodes @aio async def set_pubsub_node( @@ -1274,6 +1387,8 @@ items: list[PubsubItem]|None = None, affiliations: list[PubsubAffiliation]|None = None, subscriptions: list[PubsubSub]|None = None, + parent_node: PubsubNode|None = None, + linked_node: PubsubNode|None = None ) -> PubsubNode: node = PubsubNode( profile_id=self.profiles[client.profile], @@ -1288,7 +1403,9 @@ extra = extra, items = items or [], affiliations = affiliations or [], - subscriptions = subscriptions or [] + subscriptions = subscriptions or [], + parent_node = parent_node, + linked_node = linked_node ) async with self.session() as session: async with session.begin():