Mercurial > libervia-backend
comparison sat/memory/sqla.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (SàT before) was using camelCase, as allowed by PEP8 when working
with pre-PEP8 code, in order to use the same coding style as Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that does not come from Twisted or Wokkel, so that it fully uses snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 1a77e1f866f9 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
120 # profile id to name | 120 # profile id to name |
121 self.profiles: Dict[str, int] = {} | 121 self.profiles: Dict[str, int] = {} |
122 # profile id to component entry point | 122 # profile id to component entry point |
123 self.components: Dict[int, str] = {} | 123 self.components: Dict[int, str] = {} |
124 | 124 |
125 def getProfileById(self, profile_id): | 125 def get_profile_by_id(self, profile_id): |
126 return self.profiles.get(profile_id) | 126 return self.profiles.get(profile_id) |
127 | 127 |
128 async def migrateApply(self, *args: str, log_output: bool = False) -> None: | 128 async def migrate_apply(self, *args: str, log_output: bool = False) -> None: |
129 """Do a migration command | 129 """Do a migration command |
130 | 130 |
131 Commands are applied by running Alembic in a subprocess. | 131 Commands are applied by running Alembic in a subprocess. |
132 Arguments are alembic executables commands | 132 Arguments are alembic executables commands |
133 | 133 |
165 db_config["path"].parent.mkdir(0o700, True, True) | 165 db_config["path"].parent.mkdir(0o700, True, True) |
166 async with engine.begin() as conn: | 166 async with engine.begin() as conn: |
167 await conn.run_sync(Base.metadata.create_all) | 167 await conn.run_sync(Base.metadata.create_all) |
168 | 168 |
169 log.debug("stamping the database") | 169 log.debug("stamping the database") |
170 await self.migrateApply("stamp", "head") | 170 await self.migrate_apply("stamp", "head") |
171 log.debug("stamping done") | 171 log.debug("stamping done") |
172 | 172 |
173 def _check_db_is_up_to_date(self, conn: Connection) -> bool: | 173 def _check_db_is_up_to_date(self, conn: Connection) -> bool: |
174 al_ini_path = migration_path / "alembic.ini" | 174 al_ini_path = migration_path / "alembic.ini" |
175 al_cfg = al_config.Config(al_ini_path) | 175 al_cfg = al_config.Config(al_ini_path) |
191 if up_to_date: | 191 if up_to_date: |
192 log.debug("Database is up-to-date") | 192 log.debug("Database is up-to-date") |
193 else: | 193 else: |
194 log.info("Database needs to be updated") | 194 log.info("Database needs to be updated") |
195 log.info("updating…") | 195 log.info("updating…") |
196 await self.migrateApply("upgrade", "head", log_output=True) | 196 await self.migrate_apply("upgrade", "head", log_output=True) |
197 log.info("Database is now up-to-date") | 197 log.info("Database is now up-to-date") |
198 | 198 |
199 @aio | 199 @aio |
200 async def initialise(self) -> None: | 200 async def initialise(self) -> None: |
201 log.info(_("Connecting database")) | 201 log.info(_("Connecting database")) |
202 | 202 |
203 db_config = sqla_config.getDbConfig() | 203 db_config = sqla_config.get_db_config() |
204 engine = create_async_engine( | 204 engine = create_async_engine( |
205 db_config["url"], | 205 db_config["url"], |
206 future=True, | 206 future=True, |
207 ) | 207 ) |
208 | 208 |
286 await session.delete(obj) | 286 await session.delete(obj) |
287 await session.commit() | 287 await session.commit() |
288 | 288 |
289 ## Profiles | 289 ## Profiles |
290 | 290 |
291 def getProfilesList(self) -> List[str]: | 291 def get_profiles_list(self) -> List[str]: |
292 """"Return list of all registered profiles""" | 292 """"Return list of all registered profiles""" |
293 return list(self.profiles.keys()) | 293 return list(self.profiles.keys()) |
294 | 294 |
295 def hasProfile(self, profile_name: str) -> bool: | 295 def has_profile(self, profile_name: str) -> bool: |
296 """return True if profile_name exists | 296 """return True if profile_name exists |
297 | 297 |
298 @param profile_name: name of the profile to check | 298 @param profile_name: name of the profile to check |
299 """ | 299 """ |
300 return profile_name in self.profiles | 300 return profile_name in self.profiles |
301 | 301 |
302 def profileIsComponent(self, profile_name: str) -> bool: | 302 def profile_is_component(self, profile_name: str) -> bool: |
303 try: | 303 try: |
304 return self.profiles[profile_name] in self.components | 304 return self.profiles[profile_name] in self.components |
305 except KeyError: | 305 except KeyError: |
306 raise exceptions.NotFound("the requested profile doesn't exists") | 306 raise exceptions.NotFound("the requested profile doesn't exists") |
307 | 307 |
308 def getEntryPoint(self, profile_name: str) -> str: | 308 def get_entry_point(self, profile_name: str) -> str: |
309 try: | 309 try: |
310 return self.components[self.profiles[profile_name]] | 310 return self.components[self.profiles[profile_name]] |
311 except KeyError: | 311 except KeyError: |
312 raise exceptions.NotFound("the requested profile doesn't exists or is not a component") | 312 raise exceptions.NotFound("the requested profile doesn't exists or is not a component") |
313 | 313 |
314 @aio | 314 @aio |
315 async def createProfile(self, name: str, component_ep: Optional[str] = None) -> None: | 315 async def create_profile(self, name: str, component_ep: Optional[str] = None) -> None: |
316 """Create a new profile | 316 """Create a new profile |
317 | 317 |
318 @param name: name of the profile | 318 @param name: name of the profile |
319 @param component: if not None, must point to a component entry point | 319 @param component: if not None, must point to a component entry point |
320 """ | 320 """ |
329 session.add(component) | 329 session.add(component) |
330 self.components[profile.id] = component_ep | 330 self.components[profile.id] = component_ep |
331 return profile | 331 return profile |
332 | 332 |
333 @aio | 333 @aio |
334 async def deleteProfile(self, name: str) -> None: | 334 async def delete_profile(self, name: str) -> None: |
335 """Delete profile | 335 """Delete profile |
336 | 336 |
337 @param name: name of the profile | 337 @param name: name of the profile |
338 """ | 338 """ |
339 async with self.session() as session: | 339 async with self.session() as session: |
347 log.info(_("Profile {name!r} deleted").format(name = name)) | 347 log.info(_("Profile {name!r} deleted").format(name = name)) |
348 | 348 |
349 ## Params | 349 ## Params |
350 | 350 |
351 @aio | 351 @aio |
352 async def loadGenParams(self, params_gen: dict) -> None: | 352 async def load_gen_params(self, params_gen: dict) -> None: |
353 """Load general parameters | 353 """Load general parameters |
354 | 354 |
355 @param params_gen: dictionary to fill | 355 @param params_gen: dictionary to fill |
356 """ | 356 """ |
357 log.debug(_("loading general parameters from database")) | 357 log.debug(_("loading general parameters from database")) |
359 result = await session.execute(select(ParamGen)) | 359 result = await session.execute(select(ParamGen)) |
360 for p in result.scalars(): | 360 for p in result.scalars(): |
361 params_gen[(p.category, p.name)] = p.value | 361 params_gen[(p.category, p.name)] = p.value |
362 | 362 |
363 @aio | 363 @aio |
364 async def loadIndParams(self, params_ind: dict, profile: str) -> None: | 364 async def load_ind_params(self, params_ind: dict, profile: str) -> None: |
365 """Load individual parameters | 365 """Load individual parameters |
366 | 366 |
367 @param params_ind: dictionary to fill | 367 @param params_ind: dictionary to fill |
368 @param profile: a profile which *must* exist | 368 @param profile: a profile which *must* exist |
369 """ | 369 """ |
374 ) | 374 ) |
375 for p in result.scalars(): | 375 for p in result.scalars(): |
376 params_ind[(p.category, p.name)] = p.value | 376 params_ind[(p.category, p.name)] = p.value |
377 | 377 |
378 @aio | 378 @aio |
379 async def getIndParam(self, category: str, name: str, profile: str) -> Optional[str]: | 379 async def get_ind_param(self, category: str, name: str, profile: str) -> Optional[str]: |
380 """Ask database for the value of one specific individual parameter | 380 """Ask database for the value of one specific individual parameter |
381 | 381 |
382 @param category: category of the parameter | 382 @param category: category of the parameter |
383 @param name: name of the parameter | 383 @param name: name of the parameter |
384 @param profile: %(doc_profile)s | 384 @param profile: %(doc_profile)s |
393 ) | 393 ) |
394 ) | 394 ) |
395 return result.scalar_one_or_none() | 395 return result.scalar_one_or_none() |
396 | 396 |
397 @aio | 397 @aio |
398 async def getIndParamValues(self, category: str, name: str) -> Dict[str, str]: | 398 async def get_ind_param_values(self, category: str, name: str) -> Dict[str, str]: |
399 """Ask database for the individual values of a parameter for all profiles | 399 """Ask database for the individual values of a parameter for all profiles |
400 | 400 |
401 @param category: category of the parameter | 401 @param category: category of the parameter |
402 @param name: name of the parameter | 402 @param name: name of the parameter |
403 @return dict: profile => value map | 403 @return dict: profile => value map |
412 .options(subqueryload(ParamInd.profile)) | 412 .options(subqueryload(ParamInd.profile)) |
413 ) | 413 ) |
414 return {param.profile.name: param.value for param in result.scalars()} | 414 return {param.profile.name: param.value for param in result.scalars()} |
415 | 415 |
416 @aio | 416 @aio |
417 async def setGenParam(self, category: str, name: str, value: Optional[str]) -> None: | 417 async def set_gen_param(self, category: str, name: str, value: Optional[str]) -> None: |
418 """Save the general parameters in database | 418 """Save the general parameters in database |
419 | 419 |
420 @param category: category of the parameter | 420 @param category: category of the parameter |
421 @param name: name of the parameter | 421 @param name: name of the parameter |
422 @param value: value to set | 422 @param value: value to set |
434 ) | 434 ) |
435 await session.execute(stmt) | 435 await session.execute(stmt) |
436 await session.commit() | 436 await session.commit() |
437 | 437 |
438 @aio | 438 @aio |
439 async def setIndParam( | 439 async def set_ind_param( |
440 self, | 440 self, |
441 category:str, | 441 category:str, |
442 name: str, | 442 name: str, |
443 value: Optional[str], | 443 value: Optional[str], |
444 profile: str | 444 profile: str |
487 return History.dest == jid_.userhost() | 487 return History.dest == jid_.userhost() |
488 else: | 488 else: |
489 return History.source == jid_.userhost() | 489 return History.source == jid_.userhost() |
490 | 490 |
491 @aio | 491 @aio |
492 async def historyGet( | 492 async def history_get( |
493 self, | 493 self, |
494 from_jid: Optional[jid.JID], | 494 from_jid: Optional[jid.JID], |
495 to_jid: Optional[jid.JID], | 495 to_jid: Optional[jid.JID], |
496 limit: Optional[int] = None, | 496 limit: Optional[int] = None, |
497 between: bool = True, | 497 between: bool = True, |
507 @param limit: maximum number of messages to get: | 507 @param limit: maximum number of messages to get: |
508 - 0 for no message (returns the empty list) | 508 - 0 for no message (returns the empty list) |
509 - None for unlimited | 509 - None for unlimited |
510 @param between: confound source and dest (ignore the direction) | 510 @param between: confound source and dest (ignore the direction) |
511 @param filters: pattern to filter the history results | 511 @param filters: pattern to filter the history results |
512 @return: list of messages as in [messageNew], minus the profile which is already | 512 @return: list of messages as in [message_new], minus the profile which is already |
513 known. | 513 known. |
514 """ | 514 """ |
515 # we have to set a default value to profile because it's last argument | 515 # we have to set a default value to profile because it's last argument |
516 # and thus follow other keyword arguments with default values | 516 # and thus follow other keyword arguments with default values |
517 # but None should not be used for it | 517 # but None should not be used for it |
632 result = result.scalars().unique().all() | 632 result = result.scalars().unique().all() |
633 result.reverse() | 633 result.reverse() |
634 return [h.as_tuple() for h in result] | 634 return [h.as_tuple() for h in result] |
635 | 635 |
636 @aio | 636 @aio |
637 async def addToHistory(self, data: dict, profile: str) -> None: | 637 async def add_to_history(self, data: dict, profile: str) -> None: |
638 """Store a new message in history | 638 """Store a new message in history |
639 | 639 |
640 @param data: message data as build by SatMessageProtocol.onMessage | 640 @param data: message data as build by SatMessageProtocol.onMessage |
641 """ | 641 """ |
642 extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} | 642 extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} |
680 f"Can't store message, unexpected exception (uid: {data['uid']}): {e}" | 680 f"Can't store message, unexpected exception (uid: {data['uid']}): {e}" |
681 ) | 681 ) |
682 | 682 |
683 ## Private values | 683 ## Private values |
684 | 684 |
685 def _getPrivateClass(self, binary, profile): | 685 def _get_private_class(self, binary, profile): |
686 """Get ORM class to use for private values""" | 686 """Get ORM class to use for private values""" |
687 if profile is None: | 687 if profile is None: |
688 return PrivateGenBin if binary else PrivateGen | 688 return PrivateGenBin if binary else PrivateGen |
689 else: | 689 else: |
690 return PrivateIndBin if binary else PrivateInd | 690 return PrivateIndBin if binary else PrivateInd |
691 | 691 |
692 | 692 |
693 @aio | 693 @aio |
694 async def getPrivates( | 694 async def get_privates( |
695 self, | 695 self, |
696 namespace:str, | 696 namespace:str, |
697 keys: Optional[Iterable[str]] = None, | 697 keys: Optional[Iterable[str]] = None, |
698 binary: bool = False, | 698 binary: bool = False, |
699 profile: Optional[str] = None | 699 profile: Optional[str] = None |
712 log.debug( | 712 log.debug( |
713 f"getting {'general' if profile is None else 'individual'}" | 713 f"getting {'general' if profile is None else 'individual'}" |
714 f"{' binary' if binary else ''} private values from database for namespace " | 714 f"{' binary' if binary else ''} private values from database for namespace " |
715 f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}" | 715 f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}" |
716 ) | 716 ) |
717 cls = self._getPrivateClass(binary, profile) | 717 cls = self._get_private_class(binary, profile) |
718 stmt = select(cls).filter_by(namespace=namespace) | 718 stmt = select(cls).filter_by(namespace=namespace) |
719 if keys: | 719 if keys: |
720 stmt = stmt.where(cls.key.in_(list(keys))) | 720 stmt = stmt.where(cls.key.in_(list(keys))) |
721 if profile is not None: | 721 if profile is not None: |
722 stmt = stmt.filter_by(profile_id=self.profiles[profile]) | 722 stmt = stmt.filter_by(profile_id=self.profiles[profile]) |
723 async with self.session() as session: | 723 async with self.session() as session: |
724 result = await session.execute(stmt) | 724 result = await session.execute(stmt) |
725 return {p.key: p.value for p in result.scalars()} | 725 return {p.key: p.value for p in result.scalars()} |
726 | 726 |
727 @aio | 727 @aio |
728 async def setPrivateValue( | 728 async def set_private_value( |
729 self, | 729 self, |
730 namespace: str, | 730 namespace: str, |
731 key:str, | 731 key:str, |
732 value: Any, | 732 value: Any, |
733 binary: bool = False, | 733 binary: bool = False, |
741 @param binary: True if it's a binary values | 741 @param binary: True if it's a binary values |
742 binary values need to be serialised, used for everything but strings | 742 binary values need to be serialised, used for everything but strings |
743 @param profile: profile to use for individual value | 743 @param profile: profile to use for individual value |
744 if None, it's a general value | 744 if None, it's a general value |
745 """ | 745 """ |
746 cls = self._getPrivateClass(binary, profile) | 746 cls = self._get_private_class(binary, profile) |
747 | 747 |
748 values = { | 748 values = { |
749 "namespace": namespace, | 749 "namespace": namespace, |
750 "key": key, | 750 "key": key, |
751 "value": value | 751 "value": value |
766 ) | 766 ) |
767 ) | 767 ) |
768 await session.commit() | 768 await session.commit() |
769 | 769 |
770 @aio | 770 @aio |
771 async def delPrivateValue( | 771 async def del_private_value( |
772 self, | 772 self, |
773 namespace: str, | 773 namespace: str, |
774 key: str, | 774 key: str, |
775 binary: bool = False, | 775 binary: bool = False, |
776 profile: Optional[str] = None | 776 profile: Optional[str] = None |
781 @param key: key of the private value | 781 @param key: key of the private value |
782 @param binary: True if it's a binary values | 782 @param binary: True if it's a binary values |
783 @param profile: profile to use for individual value | 783 @param profile: profile to use for individual value |
784 if None, it's a general value | 784 if None, it's a general value |
785 """ | 785 """ |
786 cls = self._getPrivateClass(binary, profile) | 786 cls = self._get_private_class(binary, profile) |
787 | 787 |
788 stmt = delete(cls).filter_by(namespace=namespace, key=key) | 788 stmt = delete(cls).filter_by(namespace=namespace, key=key) |
789 | 789 |
790 if profile is not None: | 790 if profile is not None: |
791 stmt = stmt.filter_by(profile_id=self.profiles[profile]) | 791 stmt = stmt.filter_by(profile_id=self.profiles[profile]) |
793 async with self.session() as session: | 793 async with self.session() as session: |
794 await session.execute(stmt) | 794 await session.execute(stmt) |
795 await session.commit() | 795 await session.commit() |
796 | 796 |
797 @aio | 797 @aio |
798 async def delPrivateNamespace( | 798 async def del_private_namespace( |
799 self, | 799 self, |
800 namespace: str, | 800 namespace: str, |
801 binary: bool = False, | 801 binary: bool = False, |
802 profile: Optional[str] = None | 802 profile: Optional[str] = None |
803 ) -> None: | 803 ) -> None: |
804 """Delete all data from a private namespace | 804 """Delete all data from a private namespace |
805 | 805 |
806 Be really cautious when you use this method, as all data with given namespace are | 806 Be really cautious when you use this method, as all data with given namespace are |
807 removed. | 807 removed. |
808 Params are the same as for delPrivateValue | 808 Params are the same as for del_private_value |
809 """ | 809 """ |
810 cls = self._getPrivateClass(binary, profile) | 810 cls = self._get_private_class(binary, profile) |
811 | 811 |
812 stmt = delete(cls).filter_by(namespace=namespace) | 812 stmt = delete(cls).filter_by(namespace=namespace) |
813 | 813 |
814 if profile is not None: | 814 if profile is not None: |
815 stmt = stmt.filter_by(profile_id=self.profiles[profile]) | 815 stmt = stmt.filter_by(profile_id=self.profiles[profile]) |
819 await session.commit() | 819 await session.commit() |
820 | 820 |
821 ## Files | 821 ## Files |
822 | 822 |
823 @aio | 823 @aio |
824 async def getFiles( | 824 async def get_files( |
825 self, | 825 self, |
826 client: Optional[SatXMPPEntity], | 826 client: Optional[SatXMPPEntity], |
827 file_id: Optional[str] = None, | 827 file_id: Optional[str] = None, |
828 version: Optional[str] = '', | 828 version: Optional[str] = '', |
829 parent: Optional[str] = None, | 829 parent: Optional[str] = None, |
850 None to ignore | 850 None to ignore |
851 empty string to look for root files/directories | 851 empty string to look for root files/directories |
852 @param projection: name of columns to retrieve | 852 @param projection: name of columns to retrieve |
853 None to retrieve all | 853 None to retrieve all |
854 @param unique: if True will remove duplicates | 854 @param unique: if True will remove duplicates |
855 other params are the same as for [setFile] | 855 other params are the same as for [set_file] |
856 @return: files corresponding to filters | 856 @return: files corresponding to filters |
857 """ | 857 """ |
858 if projection is None: | 858 if projection is None: |
859 projection = [ | 859 projection = [ |
860 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', | 860 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', |
908 result = await session.execute(stmt) | 908 result = await session.execute(stmt) |
909 | 909 |
910 return [dict(r) for r in result] | 910 return [dict(r) for r in result] |
911 | 911 |
912 @aio | 912 @aio |
913 async def setFile( | 913 async def set_file( |
914 self, | 914 self, |
915 client: SatXMPPEntity, | 915 client: SatXMPPEntity, |
916 name: str, | 916 name: str, |
917 file_id: str, | 917 file_id: str, |
918 version: str = "", | 918 version: str = "", |
985 extra=extra, | 985 extra=extra, |
986 profile_id=self.profiles[client.profile] | 986 profile_id=self.profiles[client.profile] |
987 )) | 987 )) |
988 | 988 |
989 @aio | 989 @aio |
990 async def fileGetUsedSpace(self, client: SatXMPPEntity, owner: jid.JID) -> int: | 990 async def file_get_used_space(self, client: SatXMPPEntity, owner: jid.JID) -> int: |
991 async with self.session() as session: | 991 async with self.session() as session: |
992 result = await session.execute( | 992 result = await session.execute( |
993 select(sum_(File.size)).filter_by( | 993 select(sum_(File.size)).filter_by( |
994 owner=owner, | 994 owner=owner, |
995 type=C.FILE_TYPE_FILE, | 995 type=C.FILE_TYPE_FILE, |
996 profile_id=self.profiles[client.profile] | 996 profile_id=self.profiles[client.profile] |
997 )) | 997 )) |
998 return result.scalar_one_or_none() or 0 | 998 return result.scalar_one_or_none() or 0 |
999 | 999 |
1000 @aio | 1000 @aio |
1001 async def fileDelete(self, file_id: str) -> None: | 1001 async def file_delete(self, file_id: str) -> None: |
1002 """Delete file metadata from the database | 1002 """Delete file metadata from the database |
1003 | 1003 |
1004 @param file_id: id of the file to delete | 1004 @param file_id: id of the file to delete |
1005 NOTE: file itself must still be removed, this method only handle metadata in | 1005 NOTE: file itself must still be removed, this method only handle metadata in |
1006 database | 1006 database |
1008 async with self.session() as session: | 1008 async with self.session() as session: |
1009 await session.execute(delete(File).filter_by(id=file_id)) | 1009 await session.execute(delete(File).filter_by(id=file_id)) |
1010 await session.commit() | 1010 await session.commit() |
1011 | 1011 |
1012 @aio | 1012 @aio |
1013 async def fileUpdate( | 1013 async def file_update( |
1014 self, | 1014 self, |
1015 file_id: str, | 1015 file_id: str, |
1016 column: str, | 1016 column: str, |
1017 update_cb: Callable[[dict], None] | 1017 update_cb: Callable[[dict], None] |
1018 ) -> None: | 1018 ) -> None: |
1066 _("Can't update file {file_id} due to race condition") | 1066 _("Can't update file {file_id} due to race condition") |
1067 .format(file_id=file_id) | 1067 .format(file_id=file_id) |
1068 ) | 1068 ) |
1069 | 1069 |
1070 @aio | 1070 @aio |
1071 async def getPubsubNode( | 1071 async def get_pubsub_node( |
1072 self, | 1072 self, |
1073 client: SatXMPPEntity, | 1073 client: SatXMPPEntity, |
1074 service: jid.JID, | 1074 service: jid.JID, |
1075 name: str, | 1075 name: str, |
1076 with_items: bool = False, | 1076 with_items: bool = False, |
1083 @param service: service hosting the node | 1083 @param service: service hosting the node |
1084 @param name: node's name | 1084 @param name: node's name |
1085 @param with_items: retrieve items in the same query | 1085 @param with_items: retrieve items in the same query |
1086 @param with_subscriptions: retrieve subscriptions in the same query | 1086 @param with_subscriptions: retrieve subscriptions in the same query |
1087 @param create: if the node doesn't exist in DB, create it | 1087 @param create: if the node doesn't exist in DB, create it |
1088 @param create_kwargs: keyword arguments to use with ``setPubsubNode`` if the node | 1088 @param create_kwargs: keyword arguments to use with ``set_pubsub_node`` if the node |
1089 needs to be created. | 1089 needs to be created. |
1090 """ | 1090 """ |
1091 async with self.session() as session: | 1091 async with self.session() as session: |
1092 stmt = ( | 1092 stmt = ( |
1093 select(PubsubNode) | 1093 select(PubsubNode) |
1110 if ret is None and create: | 1110 if ret is None and create: |
1111 # we auto-create the node | 1111 # we auto-create the node |
1112 if create_kwargs is None: | 1112 if create_kwargs is None: |
1113 create_kwargs = {} | 1113 create_kwargs = {} |
1114 try: | 1114 try: |
1115 return await as_future(self.setPubsubNode( | 1115 return await as_future(self.set_pubsub_node( |
1116 client, service, name, **create_kwargs | 1116 client, service, name, **create_kwargs |
1117 )) | 1117 )) |
1118 except IntegrityError as e: | 1118 except IntegrityError as e: |
1119 if "unique" in str(e.orig).lower(): | 1119 if "unique" in str(e.orig).lower(): |
1120 # the node may already exist, if it has been created just after | 1120 # the node may already exist, if it has been created just after |
1121 # getPubsubNode above | 1121 # get_pubsub_node above |
1122 log.debug("ignoring UNIQUE constraint error") | 1122 log.debug("ignoring UNIQUE constraint error") |
1123 cached_node = await as_future(self.getPubsubNode( | 1123 cached_node = await as_future(self.get_pubsub_node( |
1124 client, | 1124 client, |
1125 service, | 1125 service, |
1126 name, | 1126 name, |
1127 with_items=with_items, | 1127 with_items=with_items, |
1128 with_subscriptions=with_subscriptions | 1128 with_subscriptions=with_subscriptions |
1131 raise e | 1131 raise e |
1132 else: | 1132 else: |
1133 return ret | 1133 return ret |
1134 | 1134 |
1135 @aio | 1135 @aio |
1136 async def setPubsubNode( | 1136 async def set_pubsub_node( |
1137 self, | 1137 self, |
1138 client: SatXMPPEntity, | 1138 client: SatXMPPEntity, |
1139 service: jid.JID, | 1139 service: jid.JID, |
1140 name: str, | 1140 name: str, |
1141 analyser: Optional[str] = None, | 1141 analyser: Optional[str] = None, |
1157 async with session.begin(): | 1157 async with session.begin(): |
1158 session.add(node) | 1158 session.add(node) |
1159 return node | 1159 return node |
1160 | 1160 |
1161 @aio | 1161 @aio |
1162 async def updatePubsubNodeSyncState( | 1162 async def update_pubsub_node_sync_state( |
1163 self, | 1163 self, |
1164 node: PubsubNode, | 1164 node: PubsubNode, |
1165 state: SyncState | 1165 state: SyncState |
1166 ) -> None: | 1166 ) -> None: |
1167 async with self.session() as session: | 1167 async with self.session() as session: |
1174 sync_state_updated=time.time(), | 1174 sync_state_updated=time.time(), |
1175 ) | 1175 ) |
1176 ) | 1176 ) |
1177 | 1177 |
1178 @aio | 1178 @aio |
1179 async def deletePubsubNode( | 1179 async def delete_pubsub_node( |
1180 self, | 1180 self, |
1181 profiles: Optional[List[str]], | 1181 profiles: Optional[List[str]], |
1182 services: Optional[List[jid.JID]], | 1182 services: Optional[List[jid.JID]], |
1183 names: Optional[List[str]] | 1183 names: Optional[List[str]] |
1184 ) -> None: | 1184 ) -> None: |
1205 async with self.session() as session: | 1205 async with self.session() as session: |
1206 await session.execute(stmt) | 1206 await session.execute(stmt) |
1207 await session.commit() | 1207 await session.commit() |
1208 | 1208 |
1209 @aio | 1209 @aio |
1210 async def cachePubsubItems( | 1210 async def cache_pubsub_items( |
1211 self, | 1211 self, |
1212 client: SatXMPPEntity, | 1212 client: SatXMPPEntity, |
1213 node: PubsubNode, | 1213 node: PubsubNode, |
1214 items: List[domish.Element], | 1214 items: List[domish.Element], |
1215 parsed_items: Optional[List[dict]] = None, | 1215 parsed_items: Optional[List[dict]] = None, |
1238 ) | 1238 ) |
1239 await session.execute(stmt) | 1239 await session.execute(stmt) |
1240 await session.commit() | 1240 await session.commit() |
1241 | 1241 |
1242 @aio | 1242 @aio |
1243 async def deletePubsubItems( | 1243 async def delete_pubsub_items( |
1244 self, | 1244 self, |
1245 node: PubsubNode, | 1245 node: PubsubNode, |
1246 items_names: Optional[List[str]] = None | 1246 items_names: Optional[List[str]] = None |
1247 ) -> None: | 1247 ) -> None: |
1248 """Delete items cached for a node | 1248 """Delete items cached for a node |
1262 async with self.session() as session: | 1262 async with self.session() as session: |
1263 await session.execute(stmt) | 1263 await session.execute(stmt) |
1264 await session.commit() | 1264 await session.commit() |
1265 | 1265 |
1266 @aio | 1266 @aio |
1267 async def purgePubsubItems( | 1267 async def purge_pubsub_items( |
1268 self, | 1268 self, |
1269 services: Optional[List[jid.JID]] = None, | 1269 services: Optional[List[jid.JID]] = None, |
1270 names: Optional[List[str]] = None, | 1270 names: Optional[List[str]] = None, |
1271 types: Optional[List[str]] = None, | 1271 types: Optional[List[str]] = None, |
1272 subtypes: Optional[List[str]] = None, | 1272 subtypes: Optional[List[str]] = None, |
1311 async with self.session() as session: | 1311 async with self.session() as session: |
1312 await session.execute(stmt) | 1312 await session.execute(stmt) |
1313 await session.commit() | 1313 await session.commit() |
1314 | 1314 |
1315 @aio | 1315 @aio |
1316 async def getItems( | 1316 async def get_items( |
1317 self, | 1317 self, |
1318 node: PubsubNode, | 1318 node: PubsubNode, |
1319 max_items: Optional[int] = None, | 1319 max_items: Optional[int] = None, |
1320 item_ids: Optional[list[str]] = None, | 1320 item_ids: Optional[list[str]] = None, |
1321 before: Optional[str] = None, | 1321 before: Optional[str] = None, |
1350 """ | 1350 """ |
1351 | 1351 |
1352 metadata = { | 1352 metadata = { |
1353 "service": node.service, | 1353 "service": node.service, |
1354 "node": node.name, | 1354 "node": node.name, |
1355 "uri": uri.buildXMPPUri( | 1355 "uri": uri.build_xmpp_uri( |
1356 "pubsub", | 1356 "pubsub", |
1357 path=node.service.full(), | 1357 path=node.service.full(), |
1358 node=node.name, | 1358 node=node.name, |
1359 ), | 1359 ), |
1360 } | 1360 } |
1485 result = result.scalars().all() | 1485 result = result.scalars().all() |
1486 if desc: | 1486 if desc: |
1487 result.reverse() | 1487 result.reverse() |
1488 return result, metadata | 1488 return result, metadata |
1489 | 1489 |
1490 def _getSqlitePath( | 1490 def _get_sqlite_path( |
1491 self, | 1491 self, |
1492 path: List[Union[str, int]] | 1492 path: List[Union[str, int]] |
1493 ) -> str: | 1493 ) -> str: |
1494 """generate path suitable to query JSON element with SQLite""" | 1494 """generate path suitable to query JSON element with SQLite""" |
1495 return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" | 1495 return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" |
1496 | 1496 |
1497 @aio | 1497 @aio |
1498 async def searchPubsubItems( | 1498 async def search_pubsub_items( |
1499 self, | 1499 self, |
1500 query: dict, | 1500 query: dict, |
1501 ) -> Tuple[List[PubsubItem]]: | 1501 ) -> Tuple[List[PubsubItem]]: |
1502 """Search for pubsub items in cache | 1502 """Search for pubsub items in cache |
1503 | 1503 |
1624 ) | 1624 ) |
1625 try: | 1625 try: |
1626 op_attr = OP_MAP[operator] | 1626 op_attr = OP_MAP[operator] |
1627 except KeyError: | 1627 except KeyError: |
1628 raise ValueError(f"invalid operator: {operator!r}") | 1628 raise ValueError(f"invalid operator: {operator!r}") |
1629 sqlite_path = self._getSqlitePath(path) | 1629 sqlite_path = self._get_sqlite_path(path) |
1630 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): | 1630 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): |
1631 col = literal_column("json_each.value") | 1631 col = literal_column("json_each.value") |
1632 if operator[0] == "i": | 1632 if operator[0] == "i": |
1633 col = func.lower(col) | 1633 col = func.lower(col) |
1634 value = [str(v).lower() for v in value] | 1634 value = [str(v).lower() for v in value] |
1681 col = literal_column("rank") | 1681 col = literal_column("rank") |
1682 else: | 1682 else: |
1683 raise NotImplementedError(f"Unknown {order!r} order") | 1683 raise NotImplementedError(f"Unknown {order!r} order") |
1684 else: | 1684 else: |
1685 # we have a JSON path | 1685 # we have a JSON path |
1686 # sqlite_path = self._getSqlitePath(path) | 1686 # sqlite_path = self._get_sqlite_path(path) |
1687 col = PubsubItem.parsed[path] | 1687 col = PubsubItem.parsed[path] |
1688 direction = order_data.get("direction", "ASC").lower() | 1688 direction = order_data.get("direction", "ASC").lower() |
1689 if not direction in ("asc", "desc"): | 1689 if not direction in ("asc", "desc"): |
1690 raise ValueError(f"Invalid order-by direction: {direction!r}") | 1690 raise ValueError(f"Invalid order-by direction: {direction!r}") |
1691 stmt = stmt.order_by(getattr(col, direction)()) | 1691 stmt = stmt.order_by(getattr(col, direction)()) |