Mercurial > libervia-backend
view libervia/backend/memory/migration/versions/fe3a02cb4bec_convert_legacypickle_columns_to_json.py @ 4242:8acf46ed7f36
frontends: remote control implementation:
This is the frontends common part of remote control implementation. It handle the creation
of WebRTC session, and management of inputs. For now the reception use freedesktop.org
Desktop portal, and works mostly with Wayland based Desktop Environments.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:43 +0200 |
parents | 79a4870cfbdf |
children | 0d7bb4df2343 |
line wrap: on
line source
"""convert LegacyPickle columns to JSON Revision ID: fe3a02cb4bec Revises: 610345f77e75 Create Date: 2024-02-22 14:55:59.993983 """ from alembic import op import sqlalchemy as sa import pickle import json try: from libervia.backend.plugins.plugin_xep_0373 import PublicKeyMetadata except Exception: PublicKeyMetadata = None print( "Warning: Can't import XEP-0373, its data won't be updated. It's probably not " "used on this installation." ) try: from libervia.backend.plugins.plugin_xep_0384 import TrustMessageCacheEntry except Exception: TrustMessageCacheEntry = None print( "Warning: Can't import XEP-0384, its data won't be updated. It's probably not " "used on this installation." ) # revision identifiers, used by Alembic. revision = "fe3a02cb4bec" down_revision = "610345f77e75" branch_labels = None depends_on = None def convert_pickle_to_json(value, table, primary_keys): """Convert pickled data to JSON, handling potential errors.""" if value is None: return None try: # some values are converted to bytes with LegacyPickle if isinstance(value, str): value = value.encode() try: deserialized = pickle.loads(value, encoding="utf-8") except ModuleNotFoundError: deserialized = pickle.loads( value.replace(b"sat.plugins", b"libervia.backend.plugins"), encoding="utf-8", ) if ( PublicKeyMetadata is not None and table == "private_ind_bin" and primary_keys[0] == "XEP-0373" and not primary_keys[1].startswith("/trust") and isinstance(deserialized, set) and deserialized and isinstance(next(iter(deserialized)), PublicKeyMetadata) ): # XEP-0373 plugin was pickling an internal class, this can't be converted # directly to JSON, so we do a special treatment with the add `to_dict` and # `from_dict` methods. deserialized = [pkm.to_dict() for pkm in deserialized] elif ( TrustMessageCacheEntry is not None and table == "private_ind_bin" and primary_keys[0] == "XEP-0384/TM" and primary_keys[1] == "cache" ): # Same issue and solution as for XEP-0373 try: deserialized = [tm.to_dict() for tm in deserialized] except Exception as e: print( "Warning: Failed to convert Trust Management cache with value " f" {deserialized!r}, using empty array instead: {e}" ) deserialized=[] ret = json.dumps(deserialized, ensure_ascii=False, default=str) if table == 'history' and ret == "{}": # For history, we can remove empty data, but for other tables it may be # significant. ret = None return ret except Exception as e: print( f"Warning: Failed to convert pickle to JSON, using NULL instead. Error: {e}" ) return None def upgrade(): print( "This migration may take very long, please be patient and don't stop the process." ) connection = op.get_bind() tables_and_columns = [ ("history", "extra", "uid"), ("private_gen_bin", "value", "namespace", "key"), ("private_ind_bin", "value", "namespace", "key", "profile_id"), ] for table, column, *primary_keys in tables_and_columns: primary_key_clause = " AND ".join(f"{pk} = :{pk}" for pk in primary_keys) select_stmt = sa.text(f"SELECT {', '.join(primary_keys)}, {column} FROM {table}") update_stmt = sa.text( f"UPDATE {table} SET {column} = :{column} WHERE {primary_key_clause}" ) result = connection.execute(select_stmt) for row in result: value = row[-1] if value is None: continue data = {pk: row[idx] for idx, pk in enumerate(primary_keys)} data[column] = convert_pickle_to_json(value, table, row[:-1]) connection.execute(update_stmt.bindparams(**data)) def convert_json_to_pickle(value, table, primary_keys): """Convert JSON data back to pickled data, handling potential errors.""" if value is None: return None try: deserialized = json.loads(value) # Check for the specific table and primary key conditions that require special # handling if ( PublicKeyMetadata is not None and table == "private_ind_bin" and primary_keys[0] == "XEP-0373" and not primary_keys[1].startswith("/trust") ): # Convert list of dicts back to set of PublicKeyMetadata objects if isinstance(deserialized, list): deserialized = {PublicKeyMetadata.from_dict(d) for d in deserialized} elif ( TrustMessageCacheEntry is not None and table == "private_ind_bin" and primary_keys[0] == "XEP-0384/TM" and primary_keys[1] == "cache" ): # Convert list of dicts back to set of TrustMessageCacheEntry objects if isinstance(deserialized, list): deserialized = {TrustMessageCacheEntry.from_dict(d) for d in deserialized} return pickle.dumps(deserialized, 0) except Exception as e: print( f"Warning: Failed to convert JSON to pickle, using NULL instead. Error: {e}" ) return None def downgrade(): print( "Reverting JSON columns to LegacyPickle format. This may take a while, please be " "patient." ) connection = op.get_bind() tables_and_columns = [ ("history", "extra", "uid"), ("private_gen_bin", "value", "namespace", "key"), ("private_ind_bin", "value", "namespace", "key", "profile_id"), ] for table, column, *primary_keys in tables_and_columns: primary_key_clause = " AND ".join(f"{pk} = :{pk}" for pk in primary_keys) select_stmt = sa.text(f"SELECT {', '.join(primary_keys)}, {column} FROM {table}") update_stmt = sa.text( f"UPDATE {table} SET {column} = :{column} WHERE {primary_key_clause}" ) result = connection.execute(select_stmt) for row in result: value = row[-1] if value is None: continue data = {pk: row[idx] for idx, pk in enumerate(primary_keys)} data[column] = convert_json_to_pickle(value, table, row[:-1]) connection.execute(update_stmt.bindparams(**data))