Mercurial > prosody-modules
comparison mod_http_xep227/mod_http_xep227.lua @ 4876:0f5f2d4475b9
mod_http_xep227: Add support for import via APIs rather than direct store manipulation
In particular this transitions PEP nodes and data to be imported via mod_pep's
APIs, fixing issues with importing at runtime while PEP data may already be
live in RAM.
Next obvious candidate for this approach is rosters, so clients get immediate
roster pushes and other special handling (such as emitting subscribes to reach
the desired subscription state).
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Tue, 18 Jan 2022 17:01:18 +0000 |
parents | 541b2cf68e93 |
children | 65cdbbf9703a |
comparison
equal
deleted
inserted
replaced
4875:c8a7cb6fa1a7 | 4876:0f5f2d4475b9 |
---|---|
2 local http = require "util.http"; | 2 local http = require "util.http"; |
3 local sm = require "core.storagemanager"; | 3 local sm = require "core.storagemanager"; |
4 local st = require "util.stanza"; | 4 local st = require "util.stanza"; |
5 local xml = require "util.xml"; | 5 local xml = require "util.xml"; |
6 | 6 |
7 local jid_join = require "util.jid".join; | |
8 | |
9 local mod_pep = module:depends("pep"); | |
7 local tokens = module:depends("tokenauth"); | 10 local tokens = module:depends("tokenauth"); |
8 module:depends("storage_xep0227"); | 11 module:depends("storage_xep0227"); |
9 | 12 |
10 local archive_store_name = module:get_option("archive_store", "archive"); | 13 local archive_store_name = module:get_option("archive_store", "archive"); |
11 | 14 |
17 vcard = "keyval"; | 20 vcard = "keyval"; |
18 | 21 |
19 [archive_store_name] = "archive"; | 22 [archive_store_name] = "archive"; |
20 pep_data = "archive"; | 23 pep_data = "archive"; |
21 }; | 24 }; |
25 | |
26 local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; | |
22 | 27 |
23 local function new_user_xml(username, host) | 28 local function new_user_xml(username, host) |
24 local user_xml = st.stanza("server-data", {xmlns='urn:xmpp:pie:0'}) | 29 local user_xml = st.stanza("server-data", {xmlns='urn:xmpp:pie:0'}) |
25 :tag("host", { jid = host }) | 30 :tag("host", { jid = host }) |
26 :tag("user", { name = username }):reset(); | 31 :tag("user", { name = username }):reset(); |
150 | 155 |
151 event.response.headers["Content-Type"] = "application/xml"; | 156 event.response.headers["Content-Type"] = "application/xml"; |
152 return [[<?xml version="1.0" encoding="utf-8" ?>]]..tostring(xml_data); | 157 return [[<?xml version="1.0" encoding="utf-8" ?>]]..tostring(xml_data); |
153 end | 158 end |
154 | 159 |
160 local function generic_keyval_importer(username, host, store_name, source_store) | |
161 -- Read the current data | |
162 local data, err = source_store:get(username); | |
163 if data ~= nil or not err then | |
164 local target_store = sm.open(host, store_name); | |
165 -- Transform the data and update user_xml (via the _set_user_xml callback) | |
166 if not target_store:set(username, data == nil and {} or data) then | |
167 return 500; | |
168 end | |
169 module:log("debug", "Imported data for '%s' store", store_name); | |
170 elseif err then | |
171 return nil, err; | |
172 else | |
173 module:log("debug", "No data for store '%s'", store_name); | |
174 end | |
175 return true; | |
176 end | |
177 | |
178 local function generic_archive_importer(username, host, store_name, source_archive) | |
179 local dest_driver = get_config_driver(store_name, host); | |
180 local dest_archive = dest_driver:open(store_name, "archive"); | |
181 local results_iter, results_err = source_archive:find(username); | |
182 if results_iter then | |
183 local count, errs = 0, 0; | |
184 for id, item, when, with in source_archive:find(username) do | |
185 local ok, err = dest_archive:append(username, id, item, when, with); | |
186 if ok then | |
187 count = count + 1; | |
188 else | |
189 module:log("warn", "Error: %s", err); | |
190 errs = errs + 1; | |
191 end | |
192 if ( count + errs ) % 100 == 0 then | |
193 module:log("info", "%d items migrated, %d errors", count, errs); | |
194 end | |
195 end | |
196 elseif results_err then | |
197 module:log("warn", "Unable to read from '%s': %s", store_name, results_err); | |
198 return nil, "error reading from source archive"; | |
199 end | |
200 return true; | |
201 end | |
202 | |
203 local special_keyval_importers = {}; | |
204 | |
205 function special_keyval_importers.pep(username, host, store_name, store) --luacheck: ignore 212/store_name | |
206 local user_jid = jid_join(username, host); | |
207 local pep_service = mod_pep.get_pep_service(username); | |
208 local pep_nodes, store_err = store:get(username); | |
209 if not pep_nodes and store_err then | |
210 return nil, store_err; | |
211 end | |
212 | |
213 local all_ok = true; | |
214 for node_name, node_config in pairs(pep_nodes) do | |
215 local ok, ret = pep_service:get_node_config(node_name, user_jid); | |
216 if not ok and ret == "item-not-found" then | |
217 -- Create node according to imported data | |
218 if node_config == true then node_config = {}; end | |
219 local create_ok, create_err = pep_service:create(node_name, user_jid, node_config.config); | |
220 if not create_ok then | |
221 module:log("warn", "Failed to create PEP node: %s", create_err); | |
222 all_ok = false; | |
223 end | |
224 end | |
225 end | |
226 | |
227 return all_ok; | |
228 end | |
229 | |
230 local special_archive_importers = setmetatable({}, { | |
231 __index = function (t, k) | |
232 if k:match("^pep_") then | |
233 return t.pep_data; | |
234 end | |
235 end; | |
236 }); | |
237 | |
238 function special_archive_importers.pep_data(username, host, store_name, source_archive) | |
239 local user_jid = jid_join(username, host); | |
240 local pep_service = mod_pep.get_pep_service(username); | |
241 | |
242 local node_name = store_name:match("^pep_(.+)$"); | |
243 if not node_name then | |
244 return nil, "invalid store name"; | |
245 end | |
246 | |
247 local results_iter, results_err = source_archive:find(username); | |
248 if results_iter then | |
249 local count, errs = 0, 0; | |
250 for id, item in source_archive:find(username) do | |
251 local wrapped_item = st.stanza("item", { xmlns = xmlns_pubsub, id = id }) | |
252 :add_child(item); | |
253 local ok, err = pep_service:publish(node_name, user_jid, id, wrapped_item); | |
254 if not ok then | |
255 module:log("warn", "Failed to publish PEP item to '%s': %s", node_name, err, tostring(wrapped_item)); | |
256 end | |
257 end | |
258 module:log("debug", "Imported %d PEP items (%d errors)", count, errs); | |
259 elseif results_err then | |
260 return nil, "store access error"; | |
261 end | |
262 return true; | |
263 end | |
264 | |
155 local function is_looking_like_xep227(xml_data) | 265 local function is_looking_like_xep227(xml_data) |
156 if not xml_data or xml_data.name ~= "server-data" | 266 if not xml_data or xml_data.name ~= "server-data" |
157 or xml_data.attr.xmlns ~= "urn:xmpp:pie:0" then | 267 or xml_data.attr.xmlns ~= "urn:xmpp:pie:0" then |
158 return false; | 268 return false; |
159 end | 269 end |
185 local xep227_driver = sm.load_driver(session.host, "xep0227"); | 295 local xep227_driver = sm.load_driver(session.host, "xep0227"); |
186 | 296 |
187 local query_params = http.formdecode(event.request.url.query or ""); | 297 local query_params = http.formdecode(event.request.url.query or ""); |
188 local selected_stores = get_selected_stores(query_params); | 298 local selected_stores = get_selected_stores(query_params); |
189 | 299 |
300 module:log("debug", "Importing %d keyval stores (%s)...", #selected_stores.keyval, table.concat(selected_stores.keyval, ", ")); | |
190 for _, store_name in ipairs(selected_stores.keyval) do | 301 for _, store_name in ipairs(selected_stores.keyval) do |
302 module:log("debug", "Importing keyval store %s...", store_name); | |
191 -- Initialize the destination store (XEP-0227 backed) | 303 -- Initialize the destination store (XEP-0227 backed) |
192 local store = xep227_driver:open_xep0227(store_name, nil, user_xml); | 304 local source_store = xep227_driver:open_xep0227(store_name, nil, user_xml); |
193 | 305 |
194 -- Read the current data | 306 local importer = special_keyval_importers[store_name] or generic_keyval_importer; |
195 local data, err = store:get(username); | 307 local ok, err = importer(username, session.host, store_name, source_store); |
196 if data ~= nil or not err then | 308 if not ok then |
197 local target_store = sm.open(session.host, store_name); | 309 module:log("warn", "Importer for keyval store '%s' encountered error: %s", store_name, err or "<no error returned>"); |
198 -- Transform the data and update user_xml (via the _set_user_xml callback) | |
199 if not target_store:set(username, data == nil and {} or data) then | |
200 return 500; | |
201 end | |
202 elseif err then | |
203 return 500; | 310 return 500; |
204 end | 311 end |
205 end | 312 end |
206 | 313 |
207 if selected_stores.export_pep_data then | 314 if selected_stores.export_pep_data then |
212 table.insert(selected_stores.archive, "pep_"..node_name); | 319 table.insert(selected_stores.archive, "pep_"..node_name); |
213 end | 320 end |
214 end | 321 end |
215 end | 322 end |
216 | 323 |
324 module:log("debug", "Importing %d archive stores (%s)...", #selected_stores.archive, table.concat(selected_stores.archive, ", ")); | |
217 for store_name in it.values(selected_stores.archive) do | 325 for store_name in it.values(selected_stores.archive) do |
326 module:log("debug", "Importing archive store %s...", store_name); | |
218 local source_archive = xep227_driver:open_xep0227(store_name, "archive", user_xml); | 327 local source_archive = xep227_driver:open_xep0227(store_name, "archive", user_xml); |
219 local dest_driver = get_config_driver(store_name, session.host); | 328 |
220 local dest_archive = dest_driver:open(store_name, "archive"); | 329 local importer = special_archive_importers[store_name] or generic_archive_importer; |
221 local results_iter, results_err = source_archive:find(username); | 330 local ok, err = importer(username, session.host, store_name, source_archive); |
222 if results_iter then | 331 if not ok then |
223 local count, errs = 0, 0; | 332 module:log("warn", "Importer for archive store '%s' encountered error: %s", err or "<no error returned>"); |
224 for id, item, when, with in source_archive:find(username) do | |
225 local ok, err = dest_archive:append(username, id, item, when, with); | |
226 if ok then | |
227 count = count + 1; | |
228 else | |
229 module:log("warn", "Error: %s", err); | |
230 errs = errs + 1; | |
231 end | |
232 if ( count + errs ) % 100 == 0 then | |
233 module:log("info", "%d items migrated, %d errors", count, errs); | |
234 end | |
235 end | |
236 elseif results_err then | |
237 module:log("warn", "Unable to read from '%s': %s", store_name, results_err); | |
238 return 500; | 333 return 500; |
239 end | 334 end |
240 end | 335 end |
241 | 336 |
242 return 200; | 337 return 200; |