comparison sat/memory/sqla.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (formerly SàT) used camelCase, as PEP8 allows for pre-PEP8 code, in order to keep the same coding style as Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move to full snake_case. Because Libervia has a huge codebase, this ended up as an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge ones) that does not come from Twisted or Wokkel, to fully use snake_case. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 1a77e1f866f9
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
120 # profile id to name 120 # profile id to name
121 self.profiles: Dict[str, int] = {} 121 self.profiles: Dict[str, int] = {}
122 # profile id to component entry point 122 # profile id to component entry point
123 self.components: Dict[int, str] = {} 123 self.components: Dict[int, str] = {}
124 124
125 def getProfileById(self, profile_id): 125 def get_profile_by_id(self, profile_id):
126 return self.profiles.get(profile_id) 126 return self.profiles.get(profile_id)
127 127
128 async def migrateApply(self, *args: str, log_output: bool = False) -> None: 128 async def migrate_apply(self, *args: str, log_output: bool = False) -> None:
129 """Do a migration command 129 """Do a migration command
130 130
131 Commands are applied by running Alembic in a subprocess. 131 Commands are applied by running Alembic in a subprocess.
132 Arguments are alembic executables commands 132 Arguments are alembic executables commands
133 133
165 db_config["path"].parent.mkdir(0o700, True, True) 165 db_config["path"].parent.mkdir(0o700, True, True)
166 async with engine.begin() as conn: 166 async with engine.begin() as conn:
167 await conn.run_sync(Base.metadata.create_all) 167 await conn.run_sync(Base.metadata.create_all)
168 168
169 log.debug("stamping the database") 169 log.debug("stamping the database")
170 await self.migrateApply("stamp", "head") 170 await self.migrate_apply("stamp", "head")
171 log.debug("stamping done") 171 log.debug("stamping done")
172 172
173 def _check_db_is_up_to_date(self, conn: Connection) -> bool: 173 def _check_db_is_up_to_date(self, conn: Connection) -> bool:
174 al_ini_path = migration_path / "alembic.ini" 174 al_ini_path = migration_path / "alembic.ini"
175 al_cfg = al_config.Config(al_ini_path) 175 al_cfg = al_config.Config(al_ini_path)
191 if up_to_date: 191 if up_to_date:
192 log.debug("Database is up-to-date") 192 log.debug("Database is up-to-date")
193 else: 193 else:
194 log.info("Database needs to be updated") 194 log.info("Database needs to be updated")
195 log.info("updating…") 195 log.info("updating…")
196 await self.migrateApply("upgrade", "head", log_output=True) 196 await self.migrate_apply("upgrade", "head", log_output=True)
197 log.info("Database is now up-to-date") 197 log.info("Database is now up-to-date")
198 198
199 @aio 199 @aio
200 async def initialise(self) -> None: 200 async def initialise(self) -> None:
201 log.info(_("Connecting database")) 201 log.info(_("Connecting database"))
202 202
203 db_config = sqla_config.getDbConfig() 203 db_config = sqla_config.get_db_config()
204 engine = create_async_engine( 204 engine = create_async_engine(
205 db_config["url"], 205 db_config["url"],
206 future=True, 206 future=True,
207 ) 207 )
208 208
286 await session.delete(obj) 286 await session.delete(obj)
287 await session.commit() 287 await session.commit()
288 288
289 ## Profiles 289 ## Profiles
290 290
291 def getProfilesList(self) -> List[str]: 291 def get_profiles_list(self) -> List[str]:
292 """"Return list of all registered profiles""" 292 """"Return list of all registered profiles"""
293 return list(self.profiles.keys()) 293 return list(self.profiles.keys())
294 294
295 def hasProfile(self, profile_name: str) -> bool: 295 def has_profile(self, profile_name: str) -> bool:
296 """return True if profile_name exists 296 """return True if profile_name exists
297 297
298 @param profile_name: name of the profile to check 298 @param profile_name: name of the profile to check
299 """ 299 """
300 return profile_name in self.profiles 300 return profile_name in self.profiles
301 301
302 def profileIsComponent(self, profile_name: str) -> bool: 302 def profile_is_component(self, profile_name: str) -> bool:
303 try: 303 try:
304 return self.profiles[profile_name] in self.components 304 return self.profiles[profile_name] in self.components
305 except KeyError: 305 except KeyError:
306 raise exceptions.NotFound("the requested profile doesn't exists") 306 raise exceptions.NotFound("the requested profile doesn't exists")
307 307
308 def getEntryPoint(self, profile_name: str) -> str: 308 def get_entry_point(self, profile_name: str) -> str:
309 try: 309 try:
310 return self.components[self.profiles[profile_name]] 310 return self.components[self.profiles[profile_name]]
311 except KeyError: 311 except KeyError:
312 raise exceptions.NotFound("the requested profile doesn't exists or is not a component") 312 raise exceptions.NotFound("the requested profile doesn't exists or is not a component")
313 313
314 @aio 314 @aio
315 async def createProfile(self, name: str, component_ep: Optional[str] = None) -> None: 315 async def create_profile(self, name: str, component_ep: Optional[str] = None) -> None:
316 """Create a new profile 316 """Create a new profile
317 317
318 @param name: name of the profile 318 @param name: name of the profile
319 @param component: if not None, must point to a component entry point 319 @param component: if not None, must point to a component entry point
320 """ 320 """
329 session.add(component) 329 session.add(component)
330 self.components[profile.id] = component_ep 330 self.components[profile.id] = component_ep
331 return profile 331 return profile
332 332
333 @aio 333 @aio
334 async def deleteProfile(self, name: str) -> None: 334 async def delete_profile(self, name: str) -> None:
335 """Delete profile 335 """Delete profile
336 336
337 @param name: name of the profile 337 @param name: name of the profile
338 """ 338 """
339 async with self.session() as session: 339 async with self.session() as session:
347 log.info(_("Profile {name!r} deleted").format(name = name)) 347 log.info(_("Profile {name!r} deleted").format(name = name))
348 348
349 ## Params 349 ## Params
350 350
351 @aio 351 @aio
352 async def loadGenParams(self, params_gen: dict) -> None: 352 async def load_gen_params(self, params_gen: dict) -> None:
353 """Load general parameters 353 """Load general parameters
354 354
355 @param params_gen: dictionary to fill 355 @param params_gen: dictionary to fill
356 """ 356 """
357 log.debug(_("loading general parameters from database")) 357 log.debug(_("loading general parameters from database"))
359 result = await session.execute(select(ParamGen)) 359 result = await session.execute(select(ParamGen))
360 for p in result.scalars(): 360 for p in result.scalars():
361 params_gen[(p.category, p.name)] = p.value 361 params_gen[(p.category, p.name)] = p.value
362 362
363 @aio 363 @aio
364 async def loadIndParams(self, params_ind: dict, profile: str) -> None: 364 async def load_ind_params(self, params_ind: dict, profile: str) -> None:
365 """Load individual parameters 365 """Load individual parameters
366 366
367 @param params_ind: dictionary to fill 367 @param params_ind: dictionary to fill
368 @param profile: a profile which *must* exist 368 @param profile: a profile which *must* exist
369 """ 369 """
374 ) 374 )
375 for p in result.scalars(): 375 for p in result.scalars():
376 params_ind[(p.category, p.name)] = p.value 376 params_ind[(p.category, p.name)] = p.value
377 377
378 @aio 378 @aio
379 async def getIndParam(self, category: str, name: str, profile: str) -> Optional[str]: 379 async def get_ind_param(self, category: str, name: str, profile: str) -> Optional[str]:
380 """Ask database for the value of one specific individual parameter 380 """Ask database for the value of one specific individual parameter
381 381
382 @param category: category of the parameter 382 @param category: category of the parameter
383 @param name: name of the parameter 383 @param name: name of the parameter
384 @param profile: %(doc_profile)s 384 @param profile: %(doc_profile)s
393 ) 393 )
394 ) 394 )
395 return result.scalar_one_or_none() 395 return result.scalar_one_or_none()
396 396
397 @aio 397 @aio
398 async def getIndParamValues(self, category: str, name: str) -> Dict[str, str]: 398 async def get_ind_param_values(self, category: str, name: str) -> Dict[str, str]:
399 """Ask database for the individual values of a parameter for all profiles 399 """Ask database for the individual values of a parameter for all profiles
400 400
401 @param category: category of the parameter 401 @param category: category of the parameter
402 @param name: name of the parameter 402 @param name: name of the parameter
403 @return dict: profile => value map 403 @return dict: profile => value map
412 .options(subqueryload(ParamInd.profile)) 412 .options(subqueryload(ParamInd.profile))
413 ) 413 )
414 return {param.profile.name: param.value for param in result.scalars()} 414 return {param.profile.name: param.value for param in result.scalars()}
415 415
416 @aio 416 @aio
417 async def setGenParam(self, category: str, name: str, value: Optional[str]) -> None: 417 async def set_gen_param(self, category: str, name: str, value: Optional[str]) -> None:
418 """Save the general parameters in database 418 """Save the general parameters in database
419 419
420 @param category: category of the parameter 420 @param category: category of the parameter
421 @param name: name of the parameter 421 @param name: name of the parameter
422 @param value: value to set 422 @param value: value to set
434 ) 434 )
435 await session.execute(stmt) 435 await session.execute(stmt)
436 await session.commit() 436 await session.commit()
437 437
438 @aio 438 @aio
439 async def setIndParam( 439 async def set_ind_param(
440 self, 440 self,
441 category:str, 441 category:str,
442 name: str, 442 name: str,
443 value: Optional[str], 443 value: Optional[str],
444 profile: str 444 profile: str
487 return History.dest == jid_.userhost() 487 return History.dest == jid_.userhost()
488 else: 488 else:
489 return History.source == jid_.userhost() 489 return History.source == jid_.userhost()
490 490
491 @aio 491 @aio
492 async def historyGet( 492 async def history_get(
493 self, 493 self,
494 from_jid: Optional[jid.JID], 494 from_jid: Optional[jid.JID],
495 to_jid: Optional[jid.JID], 495 to_jid: Optional[jid.JID],
496 limit: Optional[int] = None, 496 limit: Optional[int] = None,
497 between: bool = True, 497 between: bool = True,
507 @param limit: maximum number of messages to get: 507 @param limit: maximum number of messages to get:
508 - 0 for no message (returns the empty list) 508 - 0 for no message (returns the empty list)
509 - None for unlimited 509 - None for unlimited
510 @param between: confound source and dest (ignore the direction) 510 @param between: confound source and dest (ignore the direction)
511 @param filters: pattern to filter the history results 511 @param filters: pattern to filter the history results
512 @return: list of messages as in [messageNew], minus the profile which is already 512 @return: list of messages as in [message_new], minus the profile which is already
513 known. 513 known.
514 """ 514 """
515 # we have to set a default value to profile because it's last argument 515 # we have to set a default value to profile because it's last argument
516 # and thus follow other keyword arguments with default values 516 # and thus follow other keyword arguments with default values
517 # but None should not be used for it 517 # but None should not be used for it
632 result = result.scalars().unique().all() 632 result = result.scalars().unique().all()
633 result.reverse() 633 result.reverse()
634 return [h.as_tuple() for h in result] 634 return [h.as_tuple() for h in result]
635 635
636 @aio 636 @aio
637 async def addToHistory(self, data: dict, profile: str) -> None: 637 async def add_to_history(self, data: dict, profile: str) -> None:
638 """Store a new message in history 638 """Store a new message in history
639 639
640 @param data: message data as build by SatMessageProtocol.onMessage 640 @param data: message data as build by SatMessageProtocol.onMessage
641 """ 641 """
642 extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} 642 extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA}
680 f"Can't store message, unexpected exception (uid: {data['uid']}): {e}" 680 f"Can't store message, unexpected exception (uid: {data['uid']}): {e}"
681 ) 681 )
682 682
683 ## Private values 683 ## Private values
684 684
685 def _getPrivateClass(self, binary, profile): 685 def _get_private_class(self, binary, profile):
686 """Get ORM class to use for private values""" 686 """Get ORM class to use for private values"""
687 if profile is None: 687 if profile is None:
688 return PrivateGenBin if binary else PrivateGen 688 return PrivateGenBin if binary else PrivateGen
689 else: 689 else:
690 return PrivateIndBin if binary else PrivateInd 690 return PrivateIndBin if binary else PrivateInd
691 691
692 692
693 @aio 693 @aio
694 async def getPrivates( 694 async def get_privates(
695 self, 695 self,
696 namespace:str, 696 namespace:str,
697 keys: Optional[Iterable[str]] = None, 697 keys: Optional[Iterable[str]] = None,
698 binary: bool = False, 698 binary: bool = False,
699 profile: Optional[str] = None 699 profile: Optional[str] = None
712 log.debug( 712 log.debug(
713 f"getting {'general' if profile is None else 'individual'}" 713 f"getting {'general' if profile is None else 'individual'}"
714 f"{' binary' if binary else ''} private values from database for namespace " 714 f"{' binary' if binary else ''} private values from database for namespace "
715 f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}" 715 f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}"
716 ) 716 )
717 cls = self._getPrivateClass(binary, profile) 717 cls = self._get_private_class(binary, profile)
718 stmt = select(cls).filter_by(namespace=namespace) 718 stmt = select(cls).filter_by(namespace=namespace)
719 if keys: 719 if keys:
720 stmt = stmt.where(cls.key.in_(list(keys))) 720 stmt = stmt.where(cls.key.in_(list(keys)))
721 if profile is not None: 721 if profile is not None:
722 stmt = stmt.filter_by(profile_id=self.profiles[profile]) 722 stmt = stmt.filter_by(profile_id=self.profiles[profile])
723 async with self.session() as session: 723 async with self.session() as session:
724 result = await session.execute(stmt) 724 result = await session.execute(stmt)
725 return {p.key: p.value for p in result.scalars()} 725 return {p.key: p.value for p in result.scalars()}
726 726
727 @aio 727 @aio
728 async def setPrivateValue( 728 async def set_private_value(
729 self, 729 self,
730 namespace: str, 730 namespace: str,
731 key:str, 731 key:str,
732 value: Any, 732 value: Any,
733 binary: bool = False, 733 binary: bool = False,
741 @param binary: True if it's a binary values 741 @param binary: True if it's a binary values
742 binary values need to be serialised, used for everything but strings 742 binary values need to be serialised, used for everything but strings
743 @param profile: profile to use for individual value 743 @param profile: profile to use for individual value
744 if None, it's a general value 744 if None, it's a general value
745 """ 745 """
746 cls = self._getPrivateClass(binary, profile) 746 cls = self._get_private_class(binary, profile)
747 747
748 values = { 748 values = {
749 "namespace": namespace, 749 "namespace": namespace,
750 "key": key, 750 "key": key,
751 "value": value 751 "value": value
766 ) 766 )
767 ) 767 )
768 await session.commit() 768 await session.commit()
769 769
770 @aio 770 @aio
771 async def delPrivateValue( 771 async def del_private_value(
772 self, 772 self,
773 namespace: str, 773 namespace: str,
774 key: str, 774 key: str,
775 binary: bool = False, 775 binary: bool = False,
776 profile: Optional[str] = None 776 profile: Optional[str] = None
781 @param key: key of the private value 781 @param key: key of the private value
782 @param binary: True if it's a binary values 782 @param binary: True if it's a binary values
783 @param profile: profile to use for individual value 783 @param profile: profile to use for individual value
784 if None, it's a general value 784 if None, it's a general value
785 """ 785 """
786 cls = self._getPrivateClass(binary, profile) 786 cls = self._get_private_class(binary, profile)
787 787
788 stmt = delete(cls).filter_by(namespace=namespace, key=key) 788 stmt = delete(cls).filter_by(namespace=namespace, key=key)
789 789
790 if profile is not None: 790 if profile is not None:
791 stmt = stmt.filter_by(profile_id=self.profiles[profile]) 791 stmt = stmt.filter_by(profile_id=self.profiles[profile])
793 async with self.session() as session: 793 async with self.session() as session:
794 await session.execute(stmt) 794 await session.execute(stmt)
795 await session.commit() 795 await session.commit()
796 796
797 @aio 797 @aio
798 async def delPrivateNamespace( 798 async def del_private_namespace(
799 self, 799 self,
800 namespace: str, 800 namespace: str,
801 binary: bool = False, 801 binary: bool = False,
802 profile: Optional[str] = None 802 profile: Optional[str] = None
803 ) -> None: 803 ) -> None:
804 """Delete all data from a private namespace 804 """Delete all data from a private namespace
805 805
806 Be really cautious when you use this method, as all data with given namespace are 806 Be really cautious when you use this method, as all data with given namespace are
807 removed. 807 removed.
808 Params are the same as for delPrivateValue 808 Params are the same as for del_private_value
809 """ 809 """
810 cls = self._getPrivateClass(binary, profile) 810 cls = self._get_private_class(binary, profile)
811 811
812 stmt = delete(cls).filter_by(namespace=namespace) 812 stmt = delete(cls).filter_by(namespace=namespace)
813 813
814 if profile is not None: 814 if profile is not None:
815 stmt = stmt.filter_by(profile_id=self.profiles[profile]) 815 stmt = stmt.filter_by(profile_id=self.profiles[profile])
819 await session.commit() 819 await session.commit()
820 820
821 ## Files 821 ## Files
822 822
823 @aio 823 @aio
824 async def getFiles( 824 async def get_files(
825 self, 825 self,
826 client: Optional[SatXMPPEntity], 826 client: Optional[SatXMPPEntity],
827 file_id: Optional[str] = None, 827 file_id: Optional[str] = None,
828 version: Optional[str] = '', 828 version: Optional[str] = '',
829 parent: Optional[str] = None, 829 parent: Optional[str] = None,
850 None to ignore 850 None to ignore
851 empty string to look for root files/directories 851 empty string to look for root files/directories
852 @param projection: name of columns to retrieve 852 @param projection: name of columns to retrieve
853 None to retrieve all 853 None to retrieve all
854 @param unique: if True will remove duplicates 854 @param unique: if True will remove duplicates
855 other params are the same as for [setFile] 855 other params are the same as for [set_file]
856 @return: files corresponding to filters 856 @return: files corresponding to filters
857 """ 857 """
858 if projection is None: 858 if projection is None:
859 projection = [ 859 projection = [
860 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', 860 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name',
908 result = await session.execute(stmt) 908 result = await session.execute(stmt)
909 909
910 return [dict(r) for r in result] 910 return [dict(r) for r in result]
911 911
912 @aio 912 @aio
913 async def setFile( 913 async def set_file(
914 self, 914 self,
915 client: SatXMPPEntity, 915 client: SatXMPPEntity,
916 name: str, 916 name: str,
917 file_id: str, 917 file_id: str,
918 version: str = "", 918 version: str = "",
985 extra=extra, 985 extra=extra,
986 profile_id=self.profiles[client.profile] 986 profile_id=self.profiles[client.profile]
987 )) 987 ))
988 988
989 @aio 989 @aio
990 async def fileGetUsedSpace(self, client: SatXMPPEntity, owner: jid.JID) -> int: 990 async def file_get_used_space(self, client: SatXMPPEntity, owner: jid.JID) -> int:
991 async with self.session() as session: 991 async with self.session() as session:
992 result = await session.execute( 992 result = await session.execute(
993 select(sum_(File.size)).filter_by( 993 select(sum_(File.size)).filter_by(
994 owner=owner, 994 owner=owner,
995 type=C.FILE_TYPE_FILE, 995 type=C.FILE_TYPE_FILE,
996 profile_id=self.profiles[client.profile] 996 profile_id=self.profiles[client.profile]
997 )) 997 ))
998 return result.scalar_one_or_none() or 0 998 return result.scalar_one_or_none() or 0
999 999
1000 @aio 1000 @aio
1001 async def fileDelete(self, file_id: str) -> None: 1001 async def file_delete(self, file_id: str) -> None:
1002 """Delete file metadata from the database 1002 """Delete file metadata from the database
1003 1003
1004 @param file_id: id of the file to delete 1004 @param file_id: id of the file to delete
1005 NOTE: file itself must still be removed, this method only handle metadata in 1005 NOTE: file itself must still be removed, this method only handle metadata in
1006 database 1006 database
1008 async with self.session() as session: 1008 async with self.session() as session:
1009 await session.execute(delete(File).filter_by(id=file_id)) 1009 await session.execute(delete(File).filter_by(id=file_id))
1010 await session.commit() 1010 await session.commit()
1011 1011
1012 @aio 1012 @aio
1013 async def fileUpdate( 1013 async def file_update(
1014 self, 1014 self,
1015 file_id: str, 1015 file_id: str,
1016 column: str, 1016 column: str,
1017 update_cb: Callable[[dict], None] 1017 update_cb: Callable[[dict], None]
1018 ) -> None: 1018 ) -> None:
1066 _("Can't update file {file_id} due to race condition") 1066 _("Can't update file {file_id} due to race condition")
1067 .format(file_id=file_id) 1067 .format(file_id=file_id)
1068 ) 1068 )
1069 1069
1070 @aio 1070 @aio
1071 async def getPubsubNode( 1071 async def get_pubsub_node(
1072 self, 1072 self,
1073 client: SatXMPPEntity, 1073 client: SatXMPPEntity,
1074 service: jid.JID, 1074 service: jid.JID,
1075 name: str, 1075 name: str,
1076 with_items: bool = False, 1076 with_items: bool = False,
1083 @param service: service hosting the node 1083 @param service: service hosting the node
1084 @param name: node's name 1084 @param name: node's name
1085 @param with_items: retrieve items in the same query 1085 @param with_items: retrieve items in the same query
1086 @param with_subscriptions: retrieve subscriptions in the same query 1086 @param with_subscriptions: retrieve subscriptions in the same query
1087 @param create: if the node doesn't exist in DB, create it 1087 @param create: if the node doesn't exist in DB, create it
1088 @param create_kwargs: keyword arguments to use with ``setPubsubNode`` if the node 1088 @param create_kwargs: keyword arguments to use with ``set_pubsub_node`` if the node
1089 needs to be created. 1089 needs to be created.
1090 """ 1090 """
1091 async with self.session() as session: 1091 async with self.session() as session:
1092 stmt = ( 1092 stmt = (
1093 select(PubsubNode) 1093 select(PubsubNode)
1110 if ret is None and create: 1110 if ret is None and create:
1111 # we auto-create the node 1111 # we auto-create the node
1112 if create_kwargs is None: 1112 if create_kwargs is None:
1113 create_kwargs = {} 1113 create_kwargs = {}
1114 try: 1114 try:
1115 return await as_future(self.setPubsubNode( 1115 return await as_future(self.set_pubsub_node(
1116 client, service, name, **create_kwargs 1116 client, service, name, **create_kwargs
1117 )) 1117 ))
1118 except IntegrityError as e: 1118 except IntegrityError as e:
1119 if "unique" in str(e.orig).lower(): 1119 if "unique" in str(e.orig).lower():
1120 # the node may already exist, if it has been created just after 1120 # the node may already exist, if it has been created just after
1121 # getPubsubNode above 1121 # get_pubsub_node above
1122 log.debug("ignoring UNIQUE constraint error") 1122 log.debug("ignoring UNIQUE constraint error")
1123 cached_node = await as_future(self.getPubsubNode( 1123 cached_node = await as_future(self.get_pubsub_node(
1124 client, 1124 client,
1125 service, 1125 service,
1126 name, 1126 name,
1127 with_items=with_items, 1127 with_items=with_items,
1128 with_subscriptions=with_subscriptions 1128 with_subscriptions=with_subscriptions
1131 raise e 1131 raise e
1132 else: 1132 else:
1133 return ret 1133 return ret
1134 1134
1135 @aio 1135 @aio
1136 async def setPubsubNode( 1136 async def set_pubsub_node(
1137 self, 1137 self,
1138 client: SatXMPPEntity, 1138 client: SatXMPPEntity,
1139 service: jid.JID, 1139 service: jid.JID,
1140 name: str, 1140 name: str,
1141 analyser: Optional[str] = None, 1141 analyser: Optional[str] = None,
1157 async with session.begin(): 1157 async with session.begin():
1158 session.add(node) 1158 session.add(node)
1159 return node 1159 return node
1160 1160
1161 @aio 1161 @aio
1162 async def updatePubsubNodeSyncState( 1162 async def update_pubsub_node_sync_state(
1163 self, 1163 self,
1164 node: PubsubNode, 1164 node: PubsubNode,
1165 state: SyncState 1165 state: SyncState
1166 ) -> None: 1166 ) -> None:
1167 async with self.session() as session: 1167 async with self.session() as session:
1174 sync_state_updated=time.time(), 1174 sync_state_updated=time.time(),
1175 ) 1175 )
1176 ) 1176 )
1177 1177
1178 @aio 1178 @aio
1179 async def deletePubsubNode( 1179 async def delete_pubsub_node(
1180 self, 1180 self,
1181 profiles: Optional[List[str]], 1181 profiles: Optional[List[str]],
1182 services: Optional[List[jid.JID]], 1182 services: Optional[List[jid.JID]],
1183 names: Optional[List[str]] 1183 names: Optional[List[str]]
1184 ) -> None: 1184 ) -> None:
1205 async with self.session() as session: 1205 async with self.session() as session:
1206 await session.execute(stmt) 1206 await session.execute(stmt)
1207 await session.commit() 1207 await session.commit()
1208 1208
1209 @aio 1209 @aio
1210 async def cachePubsubItems( 1210 async def cache_pubsub_items(
1211 self, 1211 self,
1212 client: SatXMPPEntity, 1212 client: SatXMPPEntity,
1213 node: PubsubNode, 1213 node: PubsubNode,
1214 items: List[domish.Element], 1214 items: List[domish.Element],
1215 parsed_items: Optional[List[dict]] = None, 1215 parsed_items: Optional[List[dict]] = None,
1238 ) 1238 )
1239 await session.execute(stmt) 1239 await session.execute(stmt)
1240 await session.commit() 1240 await session.commit()
1241 1241
1242 @aio 1242 @aio
1243 async def deletePubsubItems( 1243 async def delete_pubsub_items(
1244 self, 1244 self,
1245 node: PubsubNode, 1245 node: PubsubNode,
1246 items_names: Optional[List[str]] = None 1246 items_names: Optional[List[str]] = None
1247 ) -> None: 1247 ) -> None:
1248 """Delete items cached for a node 1248 """Delete items cached for a node
1262 async with self.session() as session: 1262 async with self.session() as session:
1263 await session.execute(stmt) 1263 await session.execute(stmt)
1264 await session.commit() 1264 await session.commit()
1265 1265
1266 @aio 1266 @aio
1267 async def purgePubsubItems( 1267 async def purge_pubsub_items(
1268 self, 1268 self,
1269 services: Optional[List[jid.JID]] = None, 1269 services: Optional[List[jid.JID]] = None,
1270 names: Optional[List[str]] = None, 1270 names: Optional[List[str]] = None,
1271 types: Optional[List[str]] = None, 1271 types: Optional[List[str]] = None,
1272 subtypes: Optional[List[str]] = None, 1272 subtypes: Optional[List[str]] = None,
1311 async with self.session() as session: 1311 async with self.session() as session:
1312 await session.execute(stmt) 1312 await session.execute(stmt)
1313 await session.commit() 1313 await session.commit()
1314 1314
1315 @aio 1315 @aio
1316 async def getItems( 1316 async def get_items(
1317 self, 1317 self,
1318 node: PubsubNode, 1318 node: PubsubNode,
1319 max_items: Optional[int] = None, 1319 max_items: Optional[int] = None,
1320 item_ids: Optional[list[str]] = None, 1320 item_ids: Optional[list[str]] = None,
1321 before: Optional[str] = None, 1321 before: Optional[str] = None,
1350 """ 1350 """
1351 1351
1352 metadata = { 1352 metadata = {
1353 "service": node.service, 1353 "service": node.service,
1354 "node": node.name, 1354 "node": node.name,
1355 "uri": uri.buildXMPPUri( 1355 "uri": uri.build_xmpp_uri(
1356 "pubsub", 1356 "pubsub",
1357 path=node.service.full(), 1357 path=node.service.full(),
1358 node=node.name, 1358 node=node.name,
1359 ), 1359 ),
1360 } 1360 }
1485 result = result.scalars().all() 1485 result = result.scalars().all()
1486 if desc: 1486 if desc:
1487 result.reverse() 1487 result.reverse()
1488 return result, metadata 1488 return result, metadata
1489 1489
1490 def _getSqlitePath( 1490 def _get_sqlite_path(
1491 self, 1491 self,
1492 path: List[Union[str, int]] 1492 path: List[Union[str, int]]
1493 ) -> str: 1493 ) -> str:
1494 """generate path suitable to query JSON element with SQLite""" 1494 """generate path suitable to query JSON element with SQLite"""
1495 return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" 1495 return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}"
1496 1496
1497 @aio 1497 @aio
1498 async def searchPubsubItems( 1498 async def search_pubsub_items(
1499 self, 1499 self,
1500 query: dict, 1500 query: dict,
1501 ) -> Tuple[List[PubsubItem]]: 1501 ) -> Tuple[List[PubsubItem]]:
1502 """Search for pubsub items in cache 1502 """Search for pubsub items in cache
1503 1503
1624 ) 1624 )
1625 try: 1625 try:
1626 op_attr = OP_MAP[operator] 1626 op_attr = OP_MAP[operator]
1627 except KeyError: 1627 except KeyError:
1628 raise ValueError(f"invalid operator: {operator!r}") 1628 raise ValueError(f"invalid operator: {operator!r}")
1629 sqlite_path = self._getSqlitePath(path) 1629 sqlite_path = self._get_sqlite_path(path)
1630 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): 1630 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
1631 col = literal_column("json_each.value") 1631 col = literal_column("json_each.value")
1632 if operator[0] == "i": 1632 if operator[0] == "i":
1633 col = func.lower(col) 1633 col = func.lower(col)
1634 value = [str(v).lower() for v in value] 1634 value = [str(v).lower() for v in value]
1681 col = literal_column("rank") 1681 col = literal_column("rank")
1682 else: 1682 else:
1683 raise NotImplementedError(f"Unknown {order!r} order") 1683 raise NotImplementedError(f"Unknown {order!r} order")
1684 else: 1684 else:
1685 # we have a JSON path 1685 # we have a JSON path
1686 # sqlite_path = self._getSqlitePath(path) 1686 # sqlite_path = self._get_sqlite_path(path)
1687 col = PubsubItem.parsed[path] 1687 col = PubsubItem.parsed[path]
1688 direction = order_data.get("direction", "ASC").lower() 1688 direction = order_data.get("direction", "ASC").lower()
1689 if not direction in ("asc", "desc"): 1689 if not direction in ("asc", "desc"):
1690 raise ValueError(f"Invalid order-by direction: {direction!r}") 1690 raise ValueError(f"Invalid order-by direction: {direction!r}")
1691 stmt = stmt.order_by(getattr(col, direction)()) 1691 stmt = stmt.order_by(getattr(col, direction)())