Mercurial > libervia-backend
comparison sat/memory/sqla.py @ 3744:658ddbabaf36
core (memory/sqla): new table/mapping to handle Pubsub node subscriptions:
node subscriptions can now be cached, this can be useful for components which must keep
track of subscribers.
rel 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 40a6374fcd44 |
children | 10b71e3526bd |
comparison
equal
deleted
inserted
replaced
3743:54c249ec35ce | 3744:658ddbabaf36 |
---|---|
109 | 109 |
110 def __init__(self): | 110 def __init__(self): |
111 self.initialized = defer.Deferred() | 111 self.initialized = defer.Deferred() |
112 # we keep cache for the profiles (key: profile name, value: profile id) | 112 # we keep cache for the profiles (key: profile name, value: profile id) |
113 # profile id to name | 113 # profile id to name |
114 self.profiles: Dict[int, str] = {} | 114 self.profiles: Dict[str, int] = {} |
115 # profile id to component entry point | 115 # profile id to component entry point |
116 self.components: Dict[int, str] = {} | 116 self.components: Dict[int, str] = {} |
117 | 117 |
118 def getProfileById(self, profile_id): | 118 def getProfileById(self, profile_id): |
119 return self.profiles.get(profile_id) | 119 return self.profiles.get(profile_id) |
1013 self, | 1013 self, |
1014 client: SatXMPPEntity, | 1014 client: SatXMPPEntity, |
1015 service: jid.JID, | 1015 service: jid.JID, |
1016 name: str, | 1016 name: str, |
1017 with_items: bool = False, | 1017 with_items: bool = False, |
1018 with_subscriptions: bool = False, | |
1018 ) -> Optional[PubsubNode]: | 1019 ) -> Optional[PubsubNode]: |
1019 """ | 1020 """ |
1020 """ | 1021 """ |
1021 async with self.session() as session: | 1022 async with self.session() as session: |
1022 stmt = ( | 1023 stmt = ( |
1029 ) | 1030 ) |
1030 if with_items: | 1031 if with_items: |
1031 stmt = stmt.options( | 1032 stmt = stmt.options( |
1032 joinedload(PubsubNode.items) | 1033 joinedload(PubsubNode.items) |
1033 ) | 1034 ) |
1035 if with_subscriptions: | |
1036 stmt = stmt.options( | |
1037 joinedload(PubsubNode.subscriptions) | |
1038 ) | |
1034 result = await session.execute(stmt) | 1039 result = await session.execute(stmt) |
1035 return result.unique().scalar_one_or_none() | 1040 return result.unique().scalar_one_or_none() |
1036 | 1041 |
1037 @aio | 1042 @aio |
1038 async def setPubsubNode( | 1043 async def setPubsubNode( |
1041 service: jid.JID, | 1046 service: jid.JID, |
1042 name: str, | 1047 name: str, |
1043 analyser: Optional[str] = None, | 1048 analyser: Optional[str] = None, |
1044 type_: Optional[str] = None, | 1049 type_: Optional[str] = None, |
1045 subtype: Optional[str] = None, | 1050 subtype: Optional[str] = None, |
1051 subscribed: bool = False, | |
1046 ) -> PubsubNode: | 1052 ) -> PubsubNode: |
1047 node = PubsubNode( | 1053 node = PubsubNode( |
1048 profile_id=self.profiles[client.profile], | 1054 profile_id=self.profiles[client.profile], |
1049 service=service, | 1055 service=service, |
1050 name=name, | 1056 name=name, |
1051 subscribed=False, | 1057 subscribed=subscribed, |
1052 analyser=analyser, | 1058 analyser=analyser, |
1053 type_=type_, | 1059 type_=type_, |
1054 subtype=subtype, | 1060 subtype=subtype, |
1061 subscriptions=[], | |
1055 ) | 1062 ) |
1056 async with self.session() as session: | 1063 async with self.session() as session: |
1057 async with session.begin(): | 1064 async with session.begin(): |
1058 session.add(node) | 1065 session.add(node) |
1059 return node | 1066 return node |
1185 "service": services, | 1192 "service": services, |
1186 "name": names, | 1193 "name": names, |
1187 "type_": types, | 1194 "type_": types, |
1188 "subtype": subtypes, | 1195 "subtype": subtypes, |
1189 } | 1196 } |
1197 if profiles is not None: | |
1198 node_fields["profile_id"] = [self.profiles[p] for p in profiles] | |
1199 | |
1190 if any(x is not None for x in node_fields.values()): | 1200 if any(x is not None for x in node_fields.values()): |
1191 sub_q = select(PubsubNode.id) | 1201 sub_q = select(PubsubNode.id) |
1192 for col, values in node_fields.items(): | 1202 for col, values in node_fields.items(): |
1193 if values is None: | 1203 if values is None: |
1194 continue | 1204 continue |
1195 sub_q = sub_q.where(getattr(PubsubNode, col).in_(values)) | 1205 sub_q = sub_q.where(getattr(PubsubNode, col).in_(values)) |
1196 stmt = ( | 1206 stmt = ( |
1197 stmt | 1207 stmt |
1198 .where(PubsubItem.node_id.in_(sub_q)) | 1208 .where(PubsubItem.node_id.in_(sub_q)) |
1199 .execution_options(synchronize_session=False) | 1209 .execution_options(synchronize_session=False) |
1200 ) | |
1201 | |
1202 if profiles is not None: | |
1203 stmt = stmt.where( | |
1204 PubsubItem.profile_id.in_([self.profiles[p] for p in profiles]) | |
1205 ) | 1210 ) |
1206 | 1211 |
1207 if created_before is not None: | 1212 if created_before is not None: |
1208 stmt = stmt.where(PubsubItem.created < created_before) | 1213 stmt = stmt.where(PubsubItem.created < created_before) |
1209 | 1214 |