comparison sat/memory/sqla.py @ 3664:9ae6ec74face

memory (sqla): implement `searchPubsubItems`: `searchPubsubItems` is a high level method to handle Full-Text Search queries on Pubsub cache. rel 361
author Goffi <goffi@goffi.org>
date Wed, 08 Sep 2021 17:58:48 +0200
parents 257135d5c5c2
children 72b0e4053ab0
comparison
equal deleted inserted replaced
3663:162866ca4be7 3664:9ae6ec74face
20 import time 20 import time
21 import asyncio 21 import asyncio
22 from datetime import datetime 22 from datetime import datetime
23 from asyncio.subprocess import PIPE 23 from asyncio.subprocess import PIPE
24 from pathlib import Path 24 from pathlib import Path
25 from typing import Dict, List, Tuple, Iterable, Any, Callable, Optional 25 from typing import Union, Dict, List, Tuple, Iterable, Any, Callable, Optional
26 from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine 26 from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine, create_async_engine
27 from sqlalchemy.exc import IntegrityError, NoResultFound 27 from sqlalchemy.exc import IntegrityError, NoResultFound
28 from sqlalchemy.orm import ( 28 from sqlalchemy.orm import (
29 sessionmaker, subqueryload, joinedload, contains_eager # , aliased 29 sessionmaker, subqueryload, joinedload, selectinload, contains_eager
30 ) 30 )
31 from sqlalchemy.orm.decl_api import DeclarativeMeta 31 from sqlalchemy.orm.decl_api import DeclarativeMeta
32 from sqlalchemy.future import select 32 from sqlalchemy.future import select
33 from sqlalchemy.engine import Engine, Connection 33 from sqlalchemy.engine import Engine, Connection
34 from sqlalchemy import update, delete, and_, or_, event, func 34 from sqlalchemy import update, delete, and_, or_, event, func
35 from sqlalchemy.sql.functions import coalesce, sum as sum_, now, count 35 from sqlalchemy.sql.functions import coalesce, sum as sum_, now, count
36 from sqlalchemy.dialects.sqlite import insert 36 from sqlalchemy.dialects.sqlite import insert
37 from sqlalchemy import text, literal_column, Integer
37 from alembic import script as al_script, config as al_config 38 from alembic import script as al_script, config as al_config
38 from alembic.runtime import migration as al_migration 39 from alembic.runtime import migration as al_migration
39 from twisted.internet import defer 40 from twisted.internet import defer
40 from twisted.words.protocols.jabber import jid 41 from twisted.words.protocols.jabber import jid
41 from twisted.words.xish import domish 42 from twisted.words.xish import domish
70 ) 71 )
71 72
72 73
73 log = getLogger(__name__) 74 log = getLogger(__name__)
74 migration_path = Path(migration.__file__).parent 75 migration_path = Path(migration.__file__).parent
#: mapping of Libervia search query operators to SQLAlchemy method name
#: The method is looked up on the SQL column expression and called with the filter
#: value. Symbolic operators and their textual aliases must map to the same method.
#: "overlap", "ioverlap", "disjoint" and "idisjoint" are listed so that they are
#: accepted as valid operators, but ``searchPubsubItems`` builds their SQL condition
#: with a dedicated ``json_each`` subquery rather than calling the mapped method.
OP_MAP = {
    "==": "__eq__",
    "eq": "__eq__",
    "!=": "__ne__",
    "ne": "__ne__",
    ">": "__gt__",
    "gt": "__gt__",
    ">=": "__ge__",
    "ge": "__ge__",
    # "<" is a strict comparison (like ">"), non-strict comparison is "<=" or "le"
    "<": "__lt__",
    "lt": "__lt__",
    "<=": "__le__",
    "le": "__le__",
    "between": "between",
    "in": "in_",
    "not_in": "not_in",
    "overlap": "in_",
    "ioverlap": "in_",
    "disjoint": "in_",
    "idisjoint": "in_",
    "like": "like",
    "ilike": "ilike",
    "not_like": "notlike",
    "not_ilike": "notilike",
}
75 98
76 99
77 @event.listens_for(Engine, "connect") 100 @event.listens_for(Engine, "connect")
78 def set_sqlite_pragma(dbapi_connection, connection_record): 101 def set_sqlite_pragma(dbapi_connection, connection_record):
79 cursor = dbapi_connection.cursor() 102 cursor = dbapi_connection.cursor()
1353 1376
1354 result = result.scalars().all() 1377 result = result.scalars().all()
1355 if desc: 1378 if desc:
1356 result.reverse() 1379 result.reverse()
1357 return result, metadata 1380 return result, metadata
1381
def _getSqlitePath(
    self,
    path: List[Union[str, int]]
) -> str:
    """Generate a path suitable to query a JSON element with SQLite.

    The result is meant for SQLite JSON functions such as ``json_extract`` and
    ``json_each``. Each ``str`` element becomes an object label (``.key``) and each
    ``int`` element becomes an array index (``[N]``).

    Labels which are empty, contain ``.`` or ``[``, or start with ``"`` would be
    misparsed by SQLite if left bare, so they are double-quoted (``."a.b"``).
    Negative indexes are translated to SQLite's "counting from the end" syntax
    (``[#-N]``, supported by SQLite >= 3.31.0).

    @param path: path to the element, starting from the JSON root: ``str`` for a
        dict key, ``int`` for a list index. An empty path designates the root.
    @return: SQLite JSON path (e.g. ``$.foo[0].bar``)
    @raise ValueError: an element can't be expressed in SQLite JSON path syntax
        (a label which needs quoting but contains ``"``, or an element which is
        neither ``str`` nor ``int``)
    """
    parts = ["$"]
    for elt in path:
        # bool is a subclass of int, but it is not a meaningful list index
        if isinstance(elt, int) and not isinstance(elt, bool):
            parts.append(f"[{elt}]" if elt >= 0 else f"[#{elt}]")
        elif isinstance(elt, str):
            if not elt or "." in elt or "[" in elt or elt.startswith('"'):
                # SQLite ends a quoted label at the next '"', so such a label
                # can't contain a double quote
                if '"' in elt:
                    raise ValueError(
                        f"JSON path element can't be used with SQLite: {elt!r}"
                    )
                parts.append(f'."{elt}"')
            else:
                parts.append(f".{elt}")
        else:
            raise ValueError(f"invalid JSON path element: {elt!r}")
    return "".join(parts)
1388
@aio
async def searchPubsubItems(
    self,
    query: dict,
) -> List[PubsubItem]:
    """Search for pubsub items in cache

    @param query: search terms. Keys can be:
        :fts (str):
            Full-Text Search query. Currently SQLite FTS5 engine is used, its query
            syntax can be used, see `FTS5 Query documentation
            <https://sqlite.org/fts5.html#full_text_query_syntax>`_
        :profiles (list[str]):
            filter on nodes linked to those profiles
        :nodes (list[str]):
            filter on nodes with those names
        :services (list[jid.JID]):
            filter on nodes from those services
        :types (list[str|None]):
            filter on nodes with those types. None can be used to filter on nodes
            with no type set
        :subtypes (list[str|None]):
            filter on nodes with those subtypes. None can be used to filter on nodes
            with no subtype set
        :parsed (list[dict]):
            Filter on a parsed data field. The dict must contain 3 keys: ``path``
            which is a list of str or int giving the path to the field of interest
            (str for a dict key, int for a list index), ``op`` which indicates the
            operator to use to check the condition (one of the keys of ``OP_MAP``),
            and ``value`` which depends on field type and operator.

            See documentation for details on operators (it's currently explained at
            ``doc/libervia-cli/pubsub_cache.rst`` in ``search`` command
            documentation).

        :order-by (list[dict]):
            Indicates how to order results. The dict must contain either an
            ``order`` for a well-known order or a ``path`` for a parsed data field
            path (``order`` and ``path`` can't be used at the same time), and an
            optional ``direction`` which can be ``asc`` or ``desc``. See
            documentation for details on well-known orders (it's currently
            explained at ``doc/libervia-cli/pubsub_cache.rst`` in ``search``
            command documentation).

        :index (int):
            starting index of items to return from the query result. It's translated
            to SQL's OFFSET

        :limit (int):
            maximum number of items to return. It's translated to SQL's LIMIT.

        ``query`` and the lists it contains are not modified.

    @return: found items (the ``node`` attribute will be filled with suitable
        PubsubNode)
    @raise exceptions.ProfileUnknownError: a profile from ``profiles`` is unknown
    @raise ValueError: a ``parsed`` filter or an ``order-by`` entry is invalid
    @raise NotImplementedError: an unknown well-known order is requested
    """
    # TODO: FTS and parsed data filters use SQLite specific syntax
    # when other DB engines will be used, this will have to be adapted
    stmt = select(PubsubItem)

    # Full-Text Search
    fts = query.get("fts")
    if fts:
        # "rank" is selected so it can be used by the "rank" order below
        fts_select = text(
            "SELECT rowid, rank FROM pubsub_items_fts(:fts_query)"
        ).bindparams(fts_query=fts).columns(rowid=Integer).subquery()
        stmt = (
            stmt
            .select_from(fts_select)
            .outerjoin(PubsubItem, fts_select.c.rowid == PubsubItem.id)
        )

    # node related filters
    profiles = query.get("profiles")
    if (profiles
        or any(query.get(k) for k in ("nodes", "services", "types", "subtypes"))
    ):
        # the node is already joined for filtering, so we use it to fill
        # PubsubItem.node instead of emitting another query
        stmt = stmt.join(PubsubNode).options(contains_eager(PubsubItem.node))
        if profiles:
            try:
                profile_ids = [self.profiles[p] for p in profiles]
            except KeyError as e:
                raise exceptions.ProfileUnknownError(
                    f"This profile doesn't exist: {e.args[0]!r}"
                ) from e
            stmt = stmt.where(PubsubNode.profile_id.in_(profile_ids))
        for key, attr in (
            ("nodes", "name"),
            ("services", "service"),
            ("types", "type_"),
            ("subtypes", "subtype")
        ):
            value = query.get(key)
            if not value:
                continue
            node_col = getattr(PubsubNode, attr)
            if key in ("types", "subtypes") and None in value:
                # NULL can't be used with SQL's IN, so we have to add a condition with
                # IS NULL, and use a OR if there are other values to check.
                # A filtered copy is used so that the caller's list is not modified.
                value = [v for v in value if v is not None]
                condition = node_col.is_(None)
                if value:
                    condition = or_(node_col.in_(value), condition)
            else:
                condition = node_col.in_(value)
            stmt = stmt.where(condition)
    else:
        stmt = stmt.options(selectinload(PubsubItem.node))

    # parsed data filters
    parsed = query.get("parsed") or []
    for filter_ in parsed:
        try:
            path = filter_["path"]
            operator = filter_["op"]
            value = filter_["value"]
        except KeyError as e:
            raise ValueError(
                f'missing mandatory key {e.args[0]!r} in "parsed" filter'
            ) from e
        try:
            op_attr = OP_MAP[operator]
        except KeyError:
            raise ValueError(f"invalid operator: {operator!r}") from None
        sqlite_path = self._getSqlitePath(path)
        if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
            # json_each expands the element at sqlite_path; the scalar subquery
            # yields 1 if at least one expanded value is in "value", NULL otherwise
            col = literal_column("json_each.value")
            if operator[0] == "i":
                # case insensitive variants
                col = func.lower(col)
                value = [str(v).lower() for v in value]
            condition = (
                select(1)
                .select_from(func.json_each(PubsubItem.parsed, sqlite_path))
                .where(col.in_(value))
            ).scalar_subquery()
            if operator in ("disjoint", "idisjoint"):
                condition = condition.is_(None)
            stmt = stmt.where(condition)
        elif operator == "between":
            try:
                left, right = value
            except (ValueError, TypeError):
                raise ValueError(_(
                    "invalid value for \"between\" filter, you must use a 2 items "
                    "array: {value!r}"
                ).format(value=value))
            col = func.json_extract(PubsubItem.parsed, sqlite_path)
            stmt = stmt.where(col.between(left, right))
        else:
            # we use func.json_extract instead of generic JSON way because SQLAlchemy
            # add a JSON_QUOTE to the value, and we want SQL value
            col = func.json_extract(PubsubItem.parsed, sqlite_path)
            stmt = stmt.where(getattr(col, op_attr)(value))

    # order
    order_by = query.get("order-by") or [{"order": "creation"}]

    for order_data in order_by:
        order, path = order_data.get("order"), order_data.get("path")
        if order and path:
            raise ValueError(_(
                '"order" and "path" can\'t be used at the same time in '
                '"order-by" data'
            ))
        if order:
            if order == "creation":
                col = PubsubItem.id
            elif order == "modification":
                col = PubsubItem.updated
            elif order == "item_id":
                col = PubsubItem.name
            elif order == "rank":
                if not fts:
                    raise ValueError(
                        "'rank' order can only be used with Full-Text Search (fts)"
                    )
                col = literal_column("rank")
            else:
                raise NotImplementedError(f"Unknown {order!r} order")
        elif path:
            # we have a JSON path. As for parsed data filters, func.json_extract is
            # used instead of SQLAlchemy's generic JSON indexing: the latter adds a
            # JSON_QUOTE on SQLite, which would sort numbers as text ("10" < "9")
            col = func.json_extract(PubsubItem.parsed, self._getSqlitePath(path))
        else:
            raise ValueError(_(
                'either "order" or "path" must be set in "order-by" data'
            ))
        direction = order_data.get("direction", "ASC").lower()
        if direction not in ("asc", "desc"):
            raise ValueError(f"Invalid order-by direction: {direction!r}")
        stmt = stmt.order_by(getattr(col, direction)())

    # offset, limit
    index = query.get("index")
    if index:
        stmt = stmt.offset(index)
    limit = query.get("limit")
    if limit:
        stmt = stmt.limit(limit)

    async with self.session() as session:
        result = await session.execute(stmt)

    return result.scalars().all()