4212
|
1 """convert LegacyPickle columns to JSON |
|
2 |
|
3 Revision ID: fe3a02cb4bec |
|
4 Revises: 610345f77e75 |
|
5 Create Date: 2024-02-22 14:55:59.993983 |
|
6 |
|
7 """ |
|
8 from alembic import op |
|
9 import sqlalchemy as sa |
|
10 import pickle |
|
11 import json |
|
12 from libervia.backend.plugins.plugin_xep_0373 import PublicKeyMetadata |
|
13 |
|
# revision identifiers, used by Alembic.
revision = "fe3a02cb4bec"          # this migration's ID
down_revision = "610345f77e75"     # previous migration in the chain
branch_labels = None
depends_on = None
|
19 |
|
20 |
|
def convert_pickle_to_json(value, table, primary_keys):
    """Deserialize a LegacyPickle payload and re-serialize it as JSON.

    @param value: raw pickled payload (bytes or str), or None
    @param table: name of the table the value comes from
    @param primary_keys: primary key values of the row being converted
    @return: JSON string, or None when the value is empty (history table only)
        or the conversion fails
    """
    if value is None:
        return None
    try:
        # some values are converted to bytes with LegacyPickle
        raw = value.encode() if isinstance(value, str) else value
        try:
            data = pickle.loads(raw, encoding="utf-8")
        except ModuleNotFoundError:
            # older dumps reference the pre-rename module path
            data = pickle.loads(
                raw.replace(b"sat.plugins", b"libervia.backend.plugins"),
                encoding="utf-8",
            )

        is_xep_0373_pubkeys = (
            table == "private_ind_bin"
            and primary_keys[0] == "XEP-0373"
            and not primary_keys[1].startswith("/trust")
            and isinstance(data, set)
            and bool(data)
            and isinstance(next(iter(data)), PublicKeyMetadata)
        )
        if is_xep_0373_pubkeys:
            # XEP-0373 plugin was pickling an internal class, this can't be converted
            # directly to JSON, so we do a special treatment with the add `to_dict` and
            # `from_dict` methods.
            data = [pkm.to_dict() for pkm in data]

        serialized = json.dumps(data, ensure_ascii=False, default=str)
        if table == "history" and serialized == "{}":
            # For history, we can remove empty data, but for other tables it may be
            # significant.
            return None
        return serialized
    except Exception as e:
        print(
            f"Warning: Failed to convert pickle to JSON, using NULL instead. Error: {e}"
        )
        return None
|
60 |
|
61 |
|
def upgrade():
    """Rewrite LegacyPickle columns as JSON, row by row, for every affected table."""
    print(
        "This migration may take very long, please be patient and don't stop the process."
    )
    connection = op.get_bind()

    # (table, value column, *primary key columns)
    targets = [
        ("history", "extra", "uid"),
        ("private_gen_bin", "value", "namespace", "key"),
        ("private_ind_bin", "value", "namespace", "key", "profile_id"),
    ]

    for table, column, *pk_cols in targets:
        where_clause = " AND ".join(f"{pk} = :{pk}" for pk in pk_cols)
        select_stmt = sa.text(f"SELECT {', '.join(pk_cols)}, {column} FROM {table}")
        update_stmt = sa.text(
            f"UPDATE {table} SET {column} = :{column} WHERE {where_clause}"
        )

        for row in connection.execute(select_stmt):
            raw = row[-1]
            if raw is None:
                # nothing stored, nothing to convert
                continue
            params = dict(zip(pk_cols, row))
            params[column] = convert_pickle_to_json(raw, table, row[:-1])
            connection.execute(update_stmt.bindparams(**params))
|
89 |
|
90 |
|
def convert_json_to_pickle(value, table, primary_keys):
    """Deserialize a JSON value and re-serialize it with pickle protocol 0.

    @param value: JSON string, or None
    @param table: name of the table the value comes from
    @param primary_keys: primary key values of the row being converted
    @return: pickled bytes, or None when the value is empty or conversion fails
    """
    if value is None:
        return None
    try:
        data = json.loads(value)
        # XEP-0373 public key metadata was originally pickled as a set of
        # PublicKeyMetadata instances; rebuild those objects from their dicts.
        if (
            table == "private_ind_bin"
            and primary_keys[0] == "XEP-0373"
            and not primary_keys[1].startswith("/trust")
            and isinstance(data, list)
        ):
            data = {PublicKeyMetadata.from_dict(d) for d in data}
        return pickle.dumps(data, 0)
    except Exception as e:
        print(
            f"Warning: Failed to convert JSON to pickle, using NULL instead. Error: {e}"
        )
        return None
|
113 |
|
114 |
|
def downgrade():
    """Rewrite JSON columns back to LegacyPickle, row by row, for every affected table."""
    print(
        "Reverting JSON columns to LegacyPickle format. This may take a while, please be "
        "patient."
    )
    connection = op.get_bind()

    # (table, value column, *primary key columns)
    targets = [
        ("history", "extra", "uid"),
        ("private_gen_bin", "value", "namespace", "key"),
        ("private_ind_bin", "value", "namespace", "key", "profile_id"),
    ]

    for table, column, *pk_cols in targets:
        where_clause = " AND ".join(f"{pk} = :{pk}" for pk in pk_cols)
        select_stmt = sa.text(f"SELECT {', '.join(pk_cols)}, {column} FROM {table}")
        update_stmt = sa.text(
            f"UPDATE {table} SET {column} = :{column} WHERE {where_clause}"
        )

        for row in connection.execute(select_stmt):
            raw = row[-1]
            if raw is None:
                # nothing stored, nothing to convert
                continue
            params = dict(zip(pk_cols, row))
            params[column] = convert_json_to_pickle(raw, table, row[:-1])
            connection.execute(update_stmt.bindparams(**params))