comparison mod_http_xep227/mod_http_xep227.lua @ 4876:0f5f2d4475b9

mod_http_xep227: Add support for import via APIs rather than direct store manipulation In particular this transitions PEP nodes and data to be imported via mod_pep's APIs, fixing issues with importing at runtime while PEP data may already be live in RAM. Next obvious candidate for this approach is rosters, so clients get immediate roster pushes and other special handling (such as emitting subscribes to reach the desired subscription state).
author Matthew Wild <mwild1@gmail.com>
date Tue, 18 Jan 2022 17:01:18 +0000
parents 541b2cf68e93
children 65cdbbf9703a
comparison
equal deleted inserted replaced
4875:c8a7cb6fa1a7 4876:0f5f2d4475b9
2 local http = require "util.http"; 2 local http = require "util.http";
3 local sm = require "core.storagemanager"; 3 local sm = require "core.storagemanager";
4 local st = require "util.stanza"; 4 local st = require "util.stanza";
5 local xml = require "util.xml"; 5 local xml = require "util.xml";
6 6
7 local jid_join = require "util.jid".join;
8
9 local mod_pep = module:depends("pep");
7 local tokens = module:depends("tokenauth"); 10 local tokens = module:depends("tokenauth");
8 module:depends("storage_xep0227"); 11 module:depends("storage_xep0227");
9 12
10 local archive_store_name = module:get_option("archive_store", "archive"); 13 local archive_store_name = module:get_option("archive_store", "archive");
11 14
17 vcard = "keyval"; 20 vcard = "keyval";
18 21
19 [archive_store_name] = "archive"; 22 [archive_store_name] = "archive";
20 pep_data = "archive"; 23 pep_data = "archive";
21 }; 24 };
25
26 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
22 27
23 local function new_user_xml(username, host) 28 local function new_user_xml(username, host)
24 local user_xml = st.stanza("server-data", {xmlns='urn:xmpp:pie:0'}) 29 local user_xml = st.stanza("server-data", {xmlns='urn:xmpp:pie:0'})
25 :tag("host", { jid = host }) 30 :tag("host", { jid = host })
26 :tag("user", { name = username }):reset(); 31 :tag("user", { name = username }):reset();
150 155
151 event.response.headers["Content-Type"] = "application/xml"; 156 event.response.headers["Content-Type"] = "application/xml";
152 return [[<?xml version="1.0" encoding="utf-8" ?>]]..tostring(xml_data); 157 return [[<?xml version="1.0" encoding="utf-8" ?>]]..tostring(xml_data);
153 end 158 end
154 159
160 local function generic_keyval_importer(username, host, store_name, source_store)
161 -- Read the current data
162 local data, err = source_store:get(username);
163 if data ~= nil or not err then
164 local target_store = sm.open(host, store_name);
165 -- Transform the data and update user_xml (via the _set_user_xml callback)
166 if not target_store:set(username, data == nil and {} or data) then
167 return 500;
168 end
169 module:log("debug", "Imported data for '%s' store", store_name);
170 elseif err then
171 return nil, err;
172 else
173 module:log("debug", "No data for store '%s'", store_name);
174 end
175 return true;
176 end
177
178 local function generic_archive_importer(username, host, store_name, source_archive)
179 local dest_driver = get_config_driver(store_name, host);
180 local dest_archive = dest_driver:open(store_name, "archive");
181 local results_iter, results_err = source_archive:find(username);
182 if results_iter then
183 local count, errs = 0, 0;
184 for id, item, when, with in source_archive:find(username) do
185 local ok, err = dest_archive:append(username, id, item, when, with);
186 if ok then
187 count = count + 1;
188 else
189 module:log("warn", "Error: %s", err);
190 errs = errs + 1;
191 end
192 if ( count + errs ) % 100 == 0 then
193 module:log("info", "%d items migrated, %d errors", count, errs);
194 end
195 end
196 elseif results_err then
197 module:log("warn", "Unable to read from '%s': %s", store_name, results_err);
198 return nil, "error reading from source archive";
199 end
200 return true;
201 end
202
203 local special_keyval_importers = {};
204
205 function special_keyval_importers.pep(username, host, store_name, store) --luacheck: ignore 212/store_name
206 local user_jid = jid_join(username, host);
207 local pep_service = mod_pep.get_pep_service(username);
208 local pep_nodes, store_err = store:get(username);
209 if not pep_nodes and store_err then
210 return nil, store_err;
211 end
212
213 local all_ok = true;
214 for node_name, node_config in pairs(pep_nodes) do
215 local ok, ret = pep_service:get_node_config(node_name, user_jid);
216 if not ok and ret == "item-not-found" then
217 -- Create node according to imported data
218 if node_config == true then node_config = {}; end
219 local create_ok, create_err = pep_service:create(node_name, user_jid, node_config.config);
220 if not create_ok then
221 module:log("warn", "Failed to create PEP node: %s", create_err);
222 all_ok = false;
223 end
224 end
225 end
226
227 return all_ok;
228 end
229
230 local special_archive_importers = setmetatable({}, {
231 __index = function (t, k)
232 if k:match("^pep_") then
233 return t.pep_data;
234 end
235 end;
236 });
237
238 function special_archive_importers.pep_data(username, host, store_name, source_archive)
239 local user_jid = jid_join(username, host);
240 local pep_service = mod_pep.get_pep_service(username);
241
242 local node_name = store_name:match("^pep_(.+)$");
243 if not node_name then
244 return nil, "invalid store name";
245 end
246
247 local results_iter, results_err = source_archive:find(username);
248 if results_iter then
249 local count, errs = 0, 0;
250 for id, item in source_archive:find(username) do
251 local wrapped_item = st.stanza("item", { xmlns = xmlns_pubsub, id = id })
252 :add_child(item);
253 local ok, err = pep_service:publish(node_name, user_jid, id, wrapped_item);
254 if not ok then
255 module:log("warn", "Failed to publish PEP item to '%s': %s", node_name, err, tostring(wrapped_item));
256 end
257 end
258 module:log("debug", "Imported %d PEP items (%d errors)", count, errs);
259 elseif results_err then
260 return nil, "store access error";
261 end
262 return true;
263 end
264
155 local function is_looking_like_xep227(xml_data) 265 local function is_looking_like_xep227(xml_data)
156 if not xml_data or xml_data.name ~= "server-data" 266 if not xml_data or xml_data.name ~= "server-data"
157 or xml_data.attr.xmlns ~= "urn:xmpp:pie:0" then 267 or xml_data.attr.xmlns ~= "urn:xmpp:pie:0" then
158 return false; 268 return false;
159 end 269 end
185 local xep227_driver = sm.load_driver(session.host, "xep0227"); 295 local xep227_driver = sm.load_driver(session.host, "xep0227");
186 296
187 local query_params = http.formdecode(event.request.url.query or ""); 297 local query_params = http.formdecode(event.request.url.query or "");
188 local selected_stores = get_selected_stores(query_params); 298 local selected_stores = get_selected_stores(query_params);
189 299
300 module:log("debug", "Importing %d keyval stores (%s)...", #selected_stores.keyval, table.concat(selected_stores.keyval, ", "));
190 for _, store_name in ipairs(selected_stores.keyval) do 301 for _, store_name in ipairs(selected_stores.keyval) do
302 module:log("debug", "Importing keyval store %s...", store_name);
191 -- Initialize the destination store (XEP-0227 backed) 303 -- Initialize the destination store (XEP-0227 backed)
192 local store = xep227_driver:open_xep0227(store_name, nil, user_xml); 304 local source_store = xep227_driver:open_xep0227(store_name, nil, user_xml);
193 305
194 -- Read the current data 306 local importer = special_keyval_importers[store_name] or generic_keyval_importer;
195 local data, err = store:get(username); 307 local ok, err = importer(username, session.host, store_name, source_store);
196 if data ~= nil or not err then 308 if not ok then
197 local target_store = sm.open(session.host, store_name); 309 module:log("warn", "Importer for keyval store '%s' encountered error: %s", store_name, err or "<no error returned>");
198 -- Transform the data and update user_xml (via the _set_user_xml callback)
199 if not target_store:set(username, data == nil and {} or data) then
200 return 500;
201 end
202 elseif err then
203 return 500; 310 return 500;
204 end 311 end
205 end 312 end
206 313
207 if selected_stores.export_pep_data then 314 if selected_stores.export_pep_data then
212 table.insert(selected_stores.archive, "pep_"..node_name); 319 table.insert(selected_stores.archive, "pep_"..node_name);
213 end 320 end
214 end 321 end
215 end 322 end
216 323
324 module:log("debug", "Importing %d archive stores (%s)...", #selected_stores.archive, table.concat(selected_stores.archive, ", "));
217 for store_name in it.values(selected_stores.archive) do 325 for store_name in it.values(selected_stores.archive) do
326 module:log("debug", "Importing archive store %s...", store_name);
218 local source_archive = xep227_driver:open_xep0227(store_name, "archive", user_xml); 327 local source_archive = xep227_driver:open_xep0227(store_name, "archive", user_xml);
219 local dest_driver = get_config_driver(store_name, session.host); 328
220 local dest_archive = dest_driver:open(store_name, "archive"); 329 local importer = special_archive_importers[store_name] or generic_archive_importer;
221 local results_iter, results_err = source_archive:find(username); 330 local ok, err = importer(username, session.host, store_name, source_archive);
222 if results_iter then 331 if not ok then
223 local count, errs = 0, 0; 332 module:log("warn", "Importer for archive store '%s' encountered error: %s", err or "<no error returned>");
224 for id, item, when, with in source_archive:find(username) do
225 local ok, err = dest_archive:append(username, id, item, when, with);
226 if ok then
227 count = count + 1;
228 else
229 module:log("warn", "Error: %s", err);
230 errs = errs + 1;
231 end
232 if ( count + errs ) % 100 == 0 then
233 module:log("info", "%d items migrated, %d errors", count, errs);
234 end
235 end
236 elseif results_err then
237 module:log("warn", "Unable to read from '%s': %s", store_name, results_err);
238 return 500; 333 return 500;
239 end 334 end
240 end 335 end
241 336
242 return 200; 337 return 200;