changeset 4390:5e48634ccada

memory (sqla): update `set_pubsub_node` and `get_pubsub_nodes` to handle pubsub relationships: - `parent_node` and `linked_node` can now be specified in `set_pubsub_node`. - `get_pubsub_nodes` has been rewritten to recursively traverse a tree and retrieve nodes matching criteria and their linking nodes/children/items. rel 463
author Goffi <goffi@goffi.org>
date Fri, 29 Aug 2025 18:52:58 +0200
parents 4895cf954fbe
children c2228563bf0f
files libervia/backend/memory/sqla.py
diffstat 1 files changed, 131 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/memory/sqla.py	Fri Aug 29 17:59:12 2025 +0200
+++ b/libervia/backend/memory/sqla.py	Fri Aug 29 18:52:58 2025 +0200
@@ -28,7 +28,7 @@
 
 from alembic import config as al_config, script as al_script
 from alembic.runtime import migration as al_migration
-from sqlalchemy import and_, delete, event, exists, func, or_, update
+from sqlalchemy import and_, delete, event, exists, func, literal, or_, update
 from sqlalchemy import Integer, literal_column, text
 from sqlalchemy.dialects.sqlite import insert
 from sqlalchemy.engine import Connection, Engine
@@ -1220,7 +1220,7 @@
                     # the node may already exist, if it has been created just after
                     # get_pubsub_node above
                     log.debug("ignoring UNIQUE constraint error")
-                    cached_node = await as_future(
+                    return await as_future(
                         self.get_pubsub_node(
                             client,
                             service,
@@ -1239,24 +1239,137 @@
     async def get_pubsub_nodes(
         self,
         client: SatXMPPEntity|None,
-        service: jid.JID|None
+        service: jid.JID|None,
+        node_name: str|None = None,
+        linked_node: bool = False,
+        depth: int = 0,
+        with_items: bool = False,
+        with_subscriptions: bool = False,
+        with_affiliations: bool = False,
     ) -> list[PubsubNode]:
-        """Retrieve pubsub nodes matching arguments.
+        """Retrieve pubsub nodes with their hierarchy and linking nodes.
+
+        Nodes are returned in a tree, i.e., root nodes are returned, and their children,
+        linking nodes and items are populated if requested.
 
         @param client: If set, only return nodes of this client profile.
         @param service: If set, only return nodes from this service.
-        @return: List of matching pubsub nodes.
+        @param node_name: If set, start search from this node.
+            If None, start from all root nodes (nodes without parents).
+        @param linked_node: If True, populate linking_nodes relationship for each base
+            node.
+        @param depth: Maximum depth of child hierarchy to traverse (0 = only starting
+            nodes).
+        @param with_items: If True, eagerly load items for each node.
+        @param with_subscriptions: If True, eagerly load subscriptions for each node.
+        @param with_affiliations: If True, eagerly load affiliations for each node.
+        @return: List of base nodes matching criteria (root nodes of the request).
         """
+        # Base criteria that always apply
+        criteria = []
+        if client is not None:
+            criteria.append(PubsubNode.profile_id == self.profiles[client.profile])
+        if service is not None:
+            criteria.append(PubsubNode.service == service)
+
+        # Add filter for base nodes (not linked to another node)
+        criteria.append(PubsubNode.linked_node_id.is_(None))
+
+        # Prepare query options
+        if depth > 0:
+            options = [
+                selectinload(PubsubNode.child_nodes),
+                joinedload(PubsubNode.parent_node)
+            ]
+        else:
+            options = []
+        if with_items:
+            options.append(selectinload(PubsubNode.items))
+        if linked_node:
+            options.append(selectinload(PubsubNode.linking_nodes))
+        if with_subscriptions:
+            options.append(selectinload(PubsubNode.subscriptions))
+        if with_affiliations:
+            options.append(selectinload(PubsubNode.affiliations))
+
         async with self.session() as session:
-            stm = select(PubsubNode)
-            if client is not None:
-                profile_id = self.profiles[client.profile]
-                stm = stm.where(PubsubNode.profile_id == profile_id)
-            if service is not None:
-                stm = stm.where(PubsubNode.service == service)
-            result = await session.execute(stm)
+            # 1. Determine base query for starting nodes.
+            if node_name is not None:
+                # Specific node as starting point (must be a base node).
+                base_query = select(PubsubNode).filter(
+                    PubsubNode.name == node_name, *criteria
+                )
+            else:
+                # All root base nodes as starting points (nodes without parent that are
+                # base nodes).
+                base_query = select(PubsubNode).filter(
+                    PubsubNode.parent_node_id.is_(None), *criteria
+                )
+
+            # Apply eager loading to base query
+            for option in options:
+                base_query = base_query.options(option)
+
+            # Execute base query first (we need these nodes to build hierarchy).
+            base_result = await session.execute(base_query)
+            base_nodes = base_result.unique().scalars().all()
+
+            # If no base nodes found, return empty list.
+            if not base_nodes:
+                return []
+
+            # 2. Build child hierarchy CTE for base nodes (if depth > 0).
+            if depth > 0:
+                base_node_ids = [node.id for node in base_nodes]
+
+                # Create recursive CTE to find child base nodes.
+                hierarchy = select(
+                    PubsubNode.id,
+                    literal(0).label('depth')
+                ).filter(
+                    PubsubNode.id.in_(base_node_ids),
+                    PubsubNode.linked_node_id.is_(None)
+                ).cte(name="hierarchy", recursive=True)
 
-        return result.scalars().all()
+                # Recursive part: get direct children that are also base nodes.
+                children = (
+                    select(
+                        PubsubNode.id,
+                        hierarchy.c.depth + 1
+                    )
+                    .join(hierarchy, PubsubNode.parent_node_id == hierarchy.c.id)
+                    .filter(
+                        hierarchy.c.depth < depth,
+                        PubsubNode.linked_node_id.is_(None)
+                    )
+                )
+
+                # Combine and create the full hierarchy.
+                full_hierarchy = hierarchy.union_all(children)
+
+                # Final query for all base nodes in the hierarchy.
+                hierarchy_query = (
+                    select(PubsubNode)
+                    .filter(PubsubNode.id.in_(
+                        select(full_hierarchy.c.id)
+                    ))
+                )
+
+                # Apply the same eager loading options.
+                for option in options:
+                    hierarchy_query = hierarchy_query.options(option)
+
+                # Execute hierarchy query.
+                hierarchy_result = await session.execute(hierarchy_query)
+                all_base_nodes = hierarchy_result.unique().scalars().all()
+
+                # Return only the original base nodes (the root nodes of our query)
+                # but taken from the fully-loaded hierarchy results.
+                base_node_ids = {node.id for node in base_nodes}
+                return [node for node in all_base_nodes if node.id in base_node_ids]
+            else:
+                # When depth=0, results are just the base nodes with relationships loaded.
+                return base_nodes
 
     @aio
     async def set_pubsub_node(
@@ -1274,6 +1387,8 @@
         items: list[PubsubItem]|None = None,
         affiliations: list[PubsubAffiliation]|None = None,
         subscriptions: list[PubsubSub]|None = None,
+        parent_node: PubsubNode|None = None,
+        linked_node: PubsubNode|None = None
     ) -> PubsubNode:
         node = PubsubNode(
             profile_id=self.profiles[client.profile],
@@ -1288,7 +1403,9 @@
             extra = extra,
             items = items or [],
             affiliations = affiliations or [],
-            subscriptions = subscriptions or []
+            subscriptions = subscriptions or [],
+            parent_node = parent_node,
+            linked_node = linked_node
         )
         async with self.session() as session:
             async with session.begin():