"""convert LegacyPickle columns to JSON

Revision ID: fe3a02cb4bec
Revises: 610345f77e75
Create Date: 2024-02-22 14:55:59.993983

"""
from alembic import op
import sqlalchemy as sa
import pickle
import json
from libervia.backend.plugins.plugin_xep_0373 import PublicKeyMetadata
from libervia.backend.plugins.plugin_xep_0384 import TrustMessageCacheEntry

# revision identifiers, used by Alembic.
revision = "fe3a02cb4bec"
down_revision = "610345f77e75"
branch_labels = None
depends_on = None
|
def convert_pickle_to_json(value, table, primary_keys):
    """Convert pickled data to JSON, handling potential errors.

    @param value: raw column content (bytes, or str to be UTF-8 encoded),
        serialised with LegacyPickle; may be None.
    @param table: name of the table the value comes from.
    @param primary_keys: primary key values of the row, used to detect entries
        which need special treatment.
    @return: JSON string, or None when the value is None, empty ``history``
        extra data, or could not be converted.
    """
    if value is None:
        return None
    try:
        # some values are converted to bytes with LegacyPickle
        if isinstance(value, str):
            value = value.encode()
        try:
            deserialized = pickle.loads(value, encoding="utf-8")
        except ModuleNotFoundError:
            # data pickled before the "sat" → "libervia" rename references the
            # old module paths; rewrite them so unpickling can resolve classes
            deserialized = pickle.loads(
                value.replace(b"sat.plugins", b"libervia.backend.plugins"),
                encoding="utf-8",
            )
        if (
            table == "private_ind_bin"
            and primary_keys[0] == "XEP-0373"
            and not primary_keys[1].startswith("/trust")
            and isinstance(deserialized, set)
            and deserialized
            and isinstance(next(iter(deserialized)), PublicKeyMetadata)
        ):
            # XEP-0373 plugin was pickling an internal class, this can't be converted
            # directly to JSON, so we do a special treatment with the add `to_dict` and
            # `from_dict` methods.
            deserialized = [pkm.to_dict() for pkm in deserialized]

        elif (
            table == "private_ind_bin"
            and primary_keys[0] == "XEP-0384/TM"
            and primary_keys[1] == "cache"
        ):
            # Same issue and solution as for XEP-0373
            try:
                deserialized = [tm.to_dict() for tm in deserialized]
            except Exception as e:
                # fixed: the original message rendered a double space before the value
                print(
                    "Warning: Failed to convert Trust Management cache with value "
                    f"{deserialized!r}, using empty array instead: {e}"
                )
                deserialized = []

        # default=str keeps the migration best-effort: anything non-serialisable
        # is stringified rather than aborting the whole migration
        ret = json.dumps(deserialized, ensure_ascii=False, default=str)
        if table == "history" and ret == "{}":
            # For history, we can remove empty data, but for other tables it may be
            # significant.
            ret = None
        return ret
    except Exception as e:
        print(
            f"Warning: Failed to convert pickle to JSON, using NULL instead. Error: {e}"
        )
        return None
|
76 |
|
77 |
|
def upgrade():
    """Rewrite every LegacyPickle column as a JSON string, row by row."""
    print(
        "This migration may take very long, please be patient and don't stop the process."
    )
    connection = op.get_bind()

    # (table, data column, *primary key columns)
    targets = [
        ("history", "extra", "uid"),
        ("private_gen_bin", "value", "namespace", "key"),
        ("private_ind_bin", "value", "namespace", "key", "profile_id"),
    ]

    for table, column, *pk_cols in targets:
        where_clause = " AND ".join(f"{pk} = :{pk}" for pk in pk_cols)
        select_stmt = sa.text(f"SELECT {', '.join(pk_cols)}, {column} FROM {table}")
        update_stmt = sa.text(
            f"UPDATE {table} SET {column} = :{column} WHERE {where_clause}"
        )

        for row in connection.execute(select_stmt):
            raw = row[-1]
            if raw is None:
                # nothing to convert for this row
                continue
            params = dict(zip(pk_cols, row[:-1]))
            params[column] = convert_pickle_to_json(raw, table, row[:-1])
            connection.execute(update_stmt.bindparams(**params))
|
105 |
|
106 |
|
def convert_json_to_pickle(value, table, primary_keys):
    """Convert JSON data back to pickled data, handling potential errors."""
    if value is None:
        return None
    try:
        data = json.loads(value)
        # Rows stored as lists of dicts during the upgrade must be rebuilt as
        # sets of their original plugin classes.
        is_ind_bin = table == "private_ind_bin"
        if (
            is_ind_bin
            and primary_keys[0] == "XEP-0373"
            and not primary_keys[1].startswith("/trust")
        ):
            # list of dicts → set of PublicKeyMetadata instances
            if isinstance(data, list):
                data = {PublicKeyMetadata.from_dict(d) for d in data}
        elif (
            is_ind_bin
            and primary_keys[0] == "XEP-0384/TM"
            and primary_keys[1] == "cache"
        ):
            # list of dicts → set of TrustMessageCacheEntry instances
            if isinstance(data, list):
                data = {TrustMessageCacheEntry.from_dict(d) for d in data}
        # protocol 0 matches the LegacyPickle format used before the migration
        return pickle.dumps(data, 0)
    except Exception as e:
        print(
            f"Warning: Failed to convert JSON to pickle, using NULL instead. Error: {e}"
        )
        return None
|
137 |
|
138 |
|
def downgrade():
    """Restore JSON columns to their LegacyPickle representation, row by row."""
    print(
        "Reverting JSON columns to LegacyPickle format. This may take a while, please be "
        "patient."
    )
    connection = op.get_bind()

    # (table, data column, *primary key columns)
    targets = [
        ("history", "extra", "uid"),
        ("private_gen_bin", "value", "namespace", "key"),
        ("private_ind_bin", "value", "namespace", "key", "profile_id"),
    ]

    for table, column, *pk_cols in targets:
        where_clause = " AND ".join(f"{pk} = :{pk}" for pk in pk_cols)
        select_stmt = sa.text(f"SELECT {', '.join(pk_cols)}, {column} FROM {table}")
        update_stmt = sa.text(
            f"UPDATE {table} SET {column} = :{column} WHERE {where_clause}"
        )

        for row in connection.execute(select_stmt):
            raw = row[-1]
            if raw is None:
                # NULL stays NULL
                continue
            params = dict(zip(pk_cols, row[:-1]))
            params[column] = convert_json_to_pickle(raw, table, row[:-1])
            connection.execute(update_stmt.bindparams(**params))