comparison sat/memory/sqla.py @ 3744:658ddbabaf36

core (memory/sqla): new table/mapping to handle Pubsub node subscriptions: node subscriptions can now be cached, this can be useful for components which must keep track of subscibers. rel 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 40a6374fcd44
children 10b71e3526bd
comparison
equal deleted inserted replaced
3743:54c249ec35ce 3744:658ddbabaf36
109 109
110 def __init__(self): 110 def __init__(self):
111 self.initialized = defer.Deferred() 111 self.initialized = defer.Deferred()
112 # we keep cache for the profiles (key: profile name, value: profile id) 112 # we keep cache for the profiles (key: profile name, value: profile id)
113 # profile id to name 113 # profile id to name
114 self.profiles: Dict[int, str] = {} 114 self.profiles: Dict[str, int] = {}
115 # profile id to component entry point 115 # profile id to component entry point
116 self.components: Dict[int, str] = {} 116 self.components: Dict[int, str] = {}
117 117
118 def getProfileById(self, profile_id): 118 def getProfileById(self, profile_id):
119 return self.profiles.get(profile_id) 119 return self.profiles.get(profile_id)
1013 self, 1013 self,
1014 client: SatXMPPEntity, 1014 client: SatXMPPEntity,
1015 service: jid.JID, 1015 service: jid.JID,
1016 name: str, 1016 name: str,
1017 with_items: bool = False, 1017 with_items: bool = False,
1018 with_subscriptions: bool = False,
1018 ) -> Optional[PubsubNode]: 1019 ) -> Optional[PubsubNode]:
1019 """ 1020 """
1020 """ 1021 """
1021 async with self.session() as session: 1022 async with self.session() as session:
1022 stmt = ( 1023 stmt = (
1029 ) 1030 )
1030 if with_items: 1031 if with_items:
1031 stmt = stmt.options( 1032 stmt = stmt.options(
1032 joinedload(PubsubNode.items) 1033 joinedload(PubsubNode.items)
1033 ) 1034 )
1035 if with_subscriptions:
1036 stmt = stmt.options(
1037 joinedload(PubsubNode.subscriptions)
1038 )
1034 result = await session.execute(stmt) 1039 result = await session.execute(stmt)
1035 return result.unique().scalar_one_or_none() 1040 return result.unique().scalar_one_or_none()
1036 1041
1037 @aio 1042 @aio
1038 async def setPubsubNode( 1043 async def setPubsubNode(
1041 service: jid.JID, 1046 service: jid.JID,
1042 name: str, 1047 name: str,
1043 analyser: Optional[str] = None, 1048 analyser: Optional[str] = None,
1044 type_: Optional[str] = None, 1049 type_: Optional[str] = None,
1045 subtype: Optional[str] = None, 1050 subtype: Optional[str] = None,
1051 subscribed: bool = False,
1046 ) -> PubsubNode: 1052 ) -> PubsubNode:
1047 node = PubsubNode( 1053 node = PubsubNode(
1048 profile_id=self.profiles[client.profile], 1054 profile_id=self.profiles[client.profile],
1049 service=service, 1055 service=service,
1050 name=name, 1056 name=name,
1051 subscribed=False, 1057 subscribed=subscribed,
1052 analyser=analyser, 1058 analyser=analyser,
1053 type_=type_, 1059 type_=type_,
1054 subtype=subtype, 1060 subtype=subtype,
1061 subscriptions=[],
1055 ) 1062 )
1056 async with self.session() as session: 1063 async with self.session() as session:
1057 async with session.begin(): 1064 async with session.begin():
1058 session.add(node) 1065 session.add(node)
1059 return node 1066 return node
1185 "service": services, 1192 "service": services,
1186 "name": names, 1193 "name": names,
1187 "type_": types, 1194 "type_": types,
1188 "subtype": subtypes, 1195 "subtype": subtypes,
1189 } 1196 }
1197 if profiles is not None:
1198 node_fields["profile_id"] = [self.profiles[p] for p in profiles]
1199
1190 if any(x is not None for x in node_fields.values()): 1200 if any(x is not None for x in node_fields.values()):
1191 sub_q = select(PubsubNode.id) 1201 sub_q = select(PubsubNode.id)
1192 for col, values in node_fields.items(): 1202 for col, values in node_fields.items():
1193 if values is None: 1203 if values is None:
1194 continue 1204 continue
1195 sub_q = sub_q.where(getattr(PubsubNode, col).in_(values)) 1205 sub_q = sub_q.where(getattr(PubsubNode, col).in_(values))
1196 stmt = ( 1206 stmt = (
1197 stmt 1207 stmt
1198 .where(PubsubItem.node_id.in_(sub_q)) 1208 .where(PubsubItem.node_id.in_(sub_q))
1199 .execution_options(synchronize_session=False) 1209 .execution_options(synchronize_session=False)
1200 )
1201
1202 if profiles is not None:
1203 stmt = stmt.where(
1204 PubsubItem.profile_id.in_([self.profiles[p] for p in profiles])
1205 ) 1210 )
1206 1211
1207 if created_before is not None: 1212 if created_before is not None:
1208 stmt = stmt.where(PubsubItem.created < created_before) 1213 stmt = stmt.where(PubsubItem.created < created_before)
1209 1214