Mercurial > libervia-backend
comparison sat/memory/sqla.py @ 3664:9ae6ec74face
memory (sqla): implement `searchPubsubItems`:
`searchPubsubItems` is a high-level method to handle Full-Text Search queries on the Pubsub
cache.
rel 361
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 08 Sep 2021 17:58:48 +0200 |
parents | 257135d5c5c2 |
children | 72b0e4053ab0 |
comparison
equal
deleted
inserted
replaced
3663:162866ca4be7 | 3664:9ae6ec74face |
---|---|
20 import time | 20 import time |
21 import asyncio | 21 import asyncio |
22 from datetime import datetime | 22 from datetime import datetime |
23 from asyncio.subprocess import PIPE | 23 from asyncio.subprocess import PIPE |
24 from pathlib import Path | 24 from pathlib import Path |
25 from typing import Dict, List, Tuple, Iterable, Any, Callable, Optional | 25 from typing import Union, Dict, List, Tuple, Iterable, Any, Callable, Optional |
26 from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine | 26 from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine |
27 from sqlalchemy.exc import IntegrityError, NoResultFound | 27 from sqlalchemy.exc import IntegrityError, NoResultFound |
28 from sqlalchemy.orm import ( | 28 from sqlalchemy.orm import ( |
29 sessionmaker, subqueryload, joinedload, contains_eager # , aliased | 29 sessionmaker, subqueryload, joinedload, selectinload, contains_eager |
30 ) | 30 ) |
31 from sqlalchemy.orm.decl_api import DeclarativeMeta | 31 from sqlalchemy.orm.decl_api import DeclarativeMeta |
32 from sqlalchemy.future import select | 32 from sqlalchemy.future import select |
33 from sqlalchemy.engine import Engine, Connection | 33 from sqlalchemy.engine import Engine, Connection |
34 from sqlalchemy import update, delete, and_, or_, event, func | 34 from sqlalchemy import update, delete, and_, or_, event, func |
35 from sqlalchemy.sql.functions import coalesce, sum as sum_, now, count | 35 from sqlalchemy.sql.functions import coalesce, sum as sum_, now, count |
36 from sqlalchemy.dialects.sqlite import insert | 36 from sqlalchemy.dialects.sqlite import insert |
37 from sqlalchemy import text, literal_column, Integer | |
37 from alembic import script as al_script, config as al_config | 38 from alembic import script as al_script, config as al_config |
38 from alembic.runtime import migration as al_migration | 39 from alembic.runtime import migration as al_migration |
39 from twisted.internet import defer | 40 from twisted.internet import defer |
40 from twisted.words.protocols.jabber import jid | 41 from twisted.words.protocols.jabber import jid |
41 from twisted.words.xish import domish | 42 from twisted.words.xish import domish |
70 ) | 71 ) |
71 | 72 |
72 | 73 |
73 log = getLogger(__name__) | 74 log = getLogger(__name__) |
74 migration_path = Path(migration.__file__).parent | 75 migration_path = Path(migration.__file__).parent |
#: mapping of Libervia search query operators to SQLAlchemy method name
#: Comparison operators exist in a symbolic and a two-letter form. The
#: "overlap"/"disjoint" family is mapped to ``in_`` because those operators are
#: implemented with an ``IN`` applied on each element of a JSON array.
OP_MAP = {
    "==": "__eq__",
    "eq": "__eq__",
    "!=": "__ne__",
    "ne": "__ne__",
    ">": "__gt__",
    "gt": "__gt__",
    ">=": "__ge__",
    "ge": "__ge__",
    # "<" is a strict comparison, it must not match equal values
    "<": "__lt__",
    "lt": "__lt__",
    "<=": "__le__",
    "le": "__le__",
    "between": "between",
    "in": "in_",
    "not_in": "not_in",
    "overlap": "in_",
    "ioverlap": "in_",
    "disjoint": "in_",
    "idisjoint": "in_",
    "like": "like",
    "ilike": "ilike",
    "not_like": "notlike",
    "not_ilike": "notilike",
}
75 | 98 |
76 | 99 |
77 @event.listens_for(Engine, "connect") | 100 @event.listens_for(Engine, "connect") |
78 def set_sqlite_pragma(dbapi_connection, connection_record): | 101 def set_sqlite_pragma(dbapi_connection, connection_record): |
79 cursor = dbapi_connection.cursor() | 102 cursor = dbapi_connection.cursor() |
1353 | 1376 |
1354 result = result.scalars().all() | 1377 result = result.scalars().all() |
1355 if desc: | 1378 if desc: |
1356 result.reverse() | 1379 result.reverse() |
1357 return result, metadata | 1380 return result, metadata |
1381 | |
1382 def _getSqlitePath( | |
1383 self, | |
1384 path: List[Union[str, int]] | |
1385 ) -> str: | |
1386 """generate path suitable to query JSON element with SQLite""" | |
1387 return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" | |
1388 | |
@aio
async def searchPubsubItems(
    self,
    query: dict,
) -> List[PubsubItem]:
    """Search for pubsub items in cache

    @param query: search terms. Keys can be:
        :fts (str):
            Full-Text Search query. Currently SQLite FTS5 engine is used, its query
            syntax can be used, see `FTS5 Query documentation
            <https://sqlite.org/fts5.html#full_text_query_syntax>`_
        :profiles (list[str]):
            filter on nodes linked to those profiles
        :nodes (list[str]):
            filter on nodes with those names
        :services (list[jid.JID]):
            filter on nodes from those services
        :types (list[str|None]):
            filter on nodes with those types. None can be used to filter on nodes with
            no type set
        :subtypes (list[str|None]):
            filter on nodes with those subtypes. None can be used to filter on nodes
            with no subtype set
        :parsed (list[dict]):
            Filter on a parsed data field. The dict must contain 3 keys: ``path``
            which is a list of str or int giving the path to the field of interest
            (str for a dict key, int for a list index), ``op`` which indicates the
            operator to use to check the condition (one of ``OP_MAP`` keys), and
            ``value`` which depends on field type and operator.

            See documentation for details on operators (it's currently explained at
            ``doc/libervia-cli/pubsub_cache.rst`` in ``search`` command
            documentation).

        :order-by (list[dict]):
            Indicates how to order results. The dict must contain either an ``order``
            for a well-known order or a ``path`` for a parsed data field path
            (``order`` and ``path`` can't be used at the same time), and an optional
            ``direction`` which can be ``asc`` or ``desc``. See documentation for
            details on well-known orders (it's currently explained at
            ``doc/libervia-cli/pubsub_cache.rst`` in ``search`` command
            documentation). Results are ordered by ``creation`` when this key is
            missing or empty.

        :index (int):
            starting index of items to return from the query result. It's translated
            to SQL's OFFSET

        :limit (int):
            maximum number of items to return. It's translated to SQL's LIMIT.

    @return: found items (the ``node`` attribute will be filled with suitable
        PubsubNode)
    @raise exceptions.ProfileUnknownError: one of the requested profiles is not
        known
    @raise ValueError: a ``parsed`` filter or an ``order-by`` entry is invalid
    @raise NotImplementedError: an unknown well-known order is requested
    """
    # TODO: FTS and parsed data filters use SQLite specific syntax
    #   when other DB engines will be used, this will have to be adapted
    stmt = select(PubsubItem)

    # Full-Text Search
    fts = query.get("fts")
    if fts:
        # the FTS table is queried in a subquery whose "rowid" is matched with
        # the item id, "rank" is kept available for ordering
        fts_select = text(
            "SELECT rowid, rank FROM pubsub_items_fts(:fts_query)"
        ).bindparams(fts_query=fts).columns(rowid=Integer).subquery()
        stmt = (
            stmt
            .select_from(fts_select)
            .outerjoin(PubsubItem, fts_select.c.rowid == PubsubItem.id)
        )

    # node related filters
    profiles = query.get("profiles")
    if (profiles
        or any(query.get(k) for k in ("nodes", "services", "types", "subtypes"))
    ):
        # nodes are joined for filtering anyway, we reuse the join to fill
        # ``PubsubItem.node``
        stmt = stmt.join(PubsubNode).options(contains_eager(PubsubItem.node))
        if profiles:
            # ids are resolved in a list right now, so an unknown profile is
            # always detected inside this ``try``
            try:
                profile_ids = [self.profiles[p] for p in profiles]
            except KeyError as e:
                raise exceptions.ProfileUnknownError(
                    f"This profile doesn't exist: {e.args[0]!r}"
                ) from e
            stmt = stmt.where(PubsubNode.profile_id.in_(profile_ids))
        for key, attr in (
            ("nodes", "name"),
            ("services", "service"),
            ("types", "type_"),
            ("subtypes", "subtype")
        ):
            value = query.get(key)
            if not value:
                continue
            node_col = getattr(PubsubNode, attr)
            if key in ("types", "subtypes") and None in value:
                # NULL can't be used with SQL's IN, so we have to add a condition with
                # IS NULL, and use a OR if there are other values to check.
                # A new list is built to avoid modifying the caller's query.
                not_null = [v for v in value if v is not None]
                condition = node_col.is_(None)
                if not_null:
                    condition = or_(node_col.in_(not_null), condition)
            else:
                condition = node_col.in_(value)
            stmt = stmt.where(condition)
    else:
        # no join needed for filtering, nodes are loaded with a separate query
        stmt = stmt.options(selectinload(PubsubItem.node))

    # parsed data filters
    parsed = query.get("parsed") or []
    for filter_ in parsed:
        try:
            path = filter_["path"]
            operator = filter_["op"]
            value = filter_["value"]
        except KeyError as e:
            raise ValueError(
                f'missing mandatory key {e.args[0]!r} in "parsed" filter'
            ) from e
        try:
            op_attr = OP_MAP[operator]
        except KeyError:
            raise ValueError(f"invalid operator: {operator!r}") from None
        sqlite_path = self._getSqlitePath(path)
        if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
            # the JSON array found at path is expanded with json_each, and we
            # check if any of its elements is in the requested values
            col = literal_column("json_each.value")
            if operator[0] == "i":
                # case insensitive variants: both sides are lowered
                col = func.lower(col)
                value = [str(v).lower() for v in value]
            condition = (
                select(1)
                .select_from(func.json_each(PubsubItem.parsed, sqlite_path))
                .where(col.in_(value))
            ).scalar_subquery()
            if operator in ("disjoint", "idisjoint"):
                # the subquery is NULL when no element matches
                condition = condition.is_(None)
            stmt = stmt.where(condition)
        elif operator == "between":
            try:
                left, right = value
            except (ValueError, TypeError):
                raise ValueError(_(
                    "invalid value for \"between\" filter, you must use a 2 items "
                    "array: {value!r}"
                ).format(value=value)) from None
            col = func.json_extract(PubsubItem.parsed, sqlite_path)
            stmt = stmt.where(col.between(left, right))
        else:
            # we use func.json_extract instead of generic JSON way because SQLAlchemy
            # add a JSON_QUOTE to the value, and we want SQL value
            col = func.json_extract(PubsubItem.parsed, sqlite_path)
            stmt = stmt.where(getattr(col, op_attr)(value))

    # order
    order_by = query.get("order-by") or [{"order": "creation"}]

    for order_data in order_by:
        order, path = order_data.get("order"), order_data.get("path")
        if order and path:
            raise ValueError(_(
                '"order" and "path" can\'t be used at the same time in '
                '"order-by" data'
            ))
        if order:
            if order == "creation":
                col = PubsubItem.id
            elif order == "modification":
                col = PubsubItem.updated
            elif order == "item_id":
                col = PubsubItem.name
            elif order == "rank":
                if not fts:
                    raise ValueError(
                        "'rank' order can only be used with Full-Text Search (fts)"
                    )
                col = literal_column("rank")
            else:
                raise NotImplementedError(f"Unknown {order!r} order")
        elif path:
            # we have a JSON path
            col = PubsubItem.parsed[path]
        else:
            raise ValueError(
                'either "order" or "path" must be specified in "order-by" data'
            )
        direction = (order_data.get("direction") or "asc").lower()
        if direction not in ("asc", "desc"):
            raise ValueError(f"Invalid order-by direction: {direction!r}")
        stmt = stmt.order_by(getattr(col, direction)())

    # offset, limit
    index = query.get("index")
    if index:
        stmt = stmt.offset(index)
    limit = query.get("limit")
    if limit:
        stmt = stmt.limit(limit)

    async with self.session() as session:
        result = await session.execute(stmt)

    return result.scalars().all()