Mercurial > libervia-backend
view libervia/backend/memory/migration/env.py @ 4314:6a70fcd93a7a
plugin XEP-0131: Stanza Headers and Internet Metadata implementation:
- SHIM is now supported and put in `msg_data["extra"]["headers"]`.
- `Keywords` are converted from and to list of string in `msg_data["extra"]["keywords"]`
field (if present in headers on message sending, values are merged).
- Python minimal version upgraded to 3.11 due to use of `StrEnum`.
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:56:04 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
import asyncio from logging.config import fileConfig from sqlalchemy import pool from sqlalchemy.ext.asyncio import create_async_engine from alembic import context from libervia.backend.memory import sqla_config from libervia.backend.memory.sqla_mapping import Base # this is the Alembic Config object, which provides # access to the values within the .ini file in use. config = context.config # Interpret the config file for Python logging. # This line sets up loggers basically. fileConfig(config.config_file_name) # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata target_metadata = Base.metadata # other values from the config, defined by the needs of env.py, # can be acquired: # my_important_option = config.get_main_option("my_important_option") # ... etc. def run_migrations_offline(): """Run migrations in 'offline' mode. This configures the context with just a URL and not an Engine, though an Engine is acceptable here as well. By skipping the Engine creation we don't even need a DBAPI to be available. Calls to context.execute() here emit the given string to the script output. """ db_config = sqla_config.get_db_config() context.configure( url=db_config["url"], target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def include_name(name, type_, parent_names): if type_ == "table": if name.startswith("pubsub_items_fts"): return False return True def do_run_migrations(connection): context.configure( connection=connection, target_metadata=target_metadata, render_as_batch=True, include_name=include_name, ) with context.begin_transaction(): context.run_migrations() async def run_migrations_online(): """Run migrations in 'online' mode. In this scenario we need to create an Engine and associate a connection with the context. """ db_config = sqla_config.get_db_config() engine = create_async_engine( db_config["url"], poolclass=pool.NullPool, future=True, ) async with engine.connect() as connection: await connection.run_sync(do_run_migrations) if context.is_offline_mode(): run_migrations_offline() else: asyncio.run(run_migrations_online())