view mod_storage_xmlarchive/mod_storage_xmlarchive.lua @ 4282:281a864e7472

mod_pubsub_feeds: Don't skip publishing items after an existing one I encountered a feed which was backwards, such that older entries were considered first and then it would skip newer entries. This may however run into trouble if the feed contains more items than what's persisted in pubsub.
author Kim Alvefur <zash@zash.se>
date Mon, 30 Nov 2020 15:17:29 +0100
parents a03abb4bb6d7
children 591c643d55b2
line wrap: on
line source

-- mod_storage_xmlarchive
-- Copyright (C) 2015-2020 Kim Alvefur
--
-- This file is MIT/X11 licensed.
--
-- luacheck: ignore unused self

local lfs = require "lfs";
local dm = require "core.storagemanager".olddm;
local hmac_sha256 = require"util.hashes".hmac_sha256;
local st = require"util.stanza";
local dt = require"util.datetime";
local new_stream = require "util.xmppstream".new;
local xml = require "util.xml";
local empty = {};

if not dm.append_raw then
	module:require"datamanager_append_raw";
end

local archive = {};
local archive_mt = { __index = archive };

local is_stanza = st.is_stanza or function (s)
	return getmetatable(s) == st.stanza_mt;
end

function archive:append(username, _, data, when, with)
	if not is_stanza(data) then
		module:log("error", "Attempt to store non-stanza object, traceback: %s", debug.traceback());
		return nil, "unsupported-datatype";
	end

	username = username or "@";
	data = tostring(data) .. "\n";

	local day = dt.date(when);
	local ok, err = dm.append_raw(username.."@"..day, self.host, self.store, "xml", data);
	if not ok then
		-- append_raw, unlike list_append, does not log anything on failure atm, so we do so here
		module:log("error", "Unable to write to %s storage ('%s') for user: %s@%s",
			self.store, err, username, module.host)
		return nil, err;
	end

	-- If the day-file is missing then we need to add it to the list of days
	local first_of_day = not lfs.attributes(dm.getpath(username .. "@" .. day, self.host, self.store, "list"));

	local offset = ok and err or 0;

	local id = day .. "-" .. hmac_sha256(username.."@"..day.."+"..offset, data, true):sub(-16);
	ok, err = dm.list_append(username.."@"..day, self.host, self.store,
		{ id = id, when = dt.datetime(when), with = with, offset = offset, length = #data });
	if ok and first_of_day then
		ok, err = dm.list_append(username, self.host, self.store, day);
	end
	if not ok then
		return nil, err;
	end
	return id;
end

function archive:get(username, id)
	local dates = self:dates(username) or empty;
	local day_idx, item_idx, items = self:_get_idx(username, id, dates);
	if not day_idx then
		return nil, "item-not-found";
	end
	module:log("debug", ":_get_idx(%q, %q) --> %q, %q", username, id, dates[day_idx], items[item_idx]);
	local day = dates[day_idx];
	local item = items[item_idx];
	module:log("debug", "item = %q", item);
	local filename = dm.getpath(username.."@"..day, self.host, self.store, "xml");
	local xmlfile, ferr = io.open(filename, "r");
	if not xmlfile then return nil, ferr; end
	local p,err = xmlfile:seek("set", item.offset);
	if p ~= item.offset or err ~= nil then return nil, err; end
	local data = xmlfile:read(item.length);
	local parsed, perr = xml.parse(data);
	if not parsed then return nil, perr; end
	return parsed, dt.parse(item.when), item.with;
end

local overwrite = module:get_option("xmlarchive_overwrite", false);

function archive:set(username, id, data, new_when, new_with)
	if not is_stanza(data) then
		module:log("error", "Attempt to store non-stanza object, traceback: %s", debug.traceback());
		return nil, "unsupported-datatype";
	end

	username = username or "@";
	data = tostring(data) .. "\n";

	local dates = self:dates(username) or empty;
	local day_idx, item_idx, items = self:_get_idx(username, id, dates);
	if not day_idx then
		return nil, "item-not-found";
	end
	local day = dates[day_idx];
	local item = items[item_idx];

	local filename = dm.getpath(username.."@"..day, self.host, self.store, "xml");

	local replaced, err = dm.append_raw(username.."@"..day, self.host, self.store, "xml", data);
	if not replaced then return nil, err; end
	local new_offset = err;

	-- default yes or no?
	if overwrite then
		local xmlfile, ferr = io.open(filename, "r+");
		if not xmlfile then return nil, ferr; end
		local p,err = xmlfile:seek("set", item.offset);
		if p ~= item.offset or err ~= nil then return nil, err; end
		local _,err = xmlfile:write((" "):rep(item.length));
		if err ~= nil then return nil, err; end
		local _,err = xmlfile:close();
		if err ~= nil then return nil, err; end
	end

	items[item_idx] = {
		id = id,
		when = new_when or item.when,
		with = new_with or item.with,
		offset = new_offset,
		length = #data,
		replaces = item,
	};
	local ok, err = dm.list_store(username.."@"..day, self.host, self.store, items);
	return ok, err;
end

function archive:_get_idx(username, id, dates)
	module:log("debug", "Looking for item with id %q", id);
	dates = dates or self:dates(username) or empty;
	local date = id:match("^%d%d%d%d%-%d%d%-%d%d");
	for d = 1, #dates do
		if not date or date == dates[d] then
			module:log("debug", "Loading index for %s", dates[d]);
			local items = dm.list_load(username .. "@" .. dates[d], self.host, self.store) or empty;
			for i = 1, #items do
				if items[i].id == id then
					module:log("debug", "Found item!");
					return d, i, items;
				end
			end
			if date then
				return; -- Assuming no duplicates
			end
		elseif date and date < dates[d] then
			module:log("debug", "Skipping remaining dates after %s", date);
			return; -- List is assumed to be sorted
		end
	end
	module:log("debug", "Item not found");
end

function archive:find(username, query)
	username = username or "@";
	query = query or empty;

	local result;
	local function cb(_, stanza)
		assert(not result, "Multiple items in chunk");
		result = stanza;
	end

	local stream_session = { notopen = true };
	local stream_callbacks = { handlestanza = cb, stream_ns = "jabber:client", default_ns = "jabber:client" };
	local stream = new_stream(stream_session, stream_callbacks);
	local dates = self:dates(username) or empty;
	local function reset_stream()
		stream:reset();
		stream_session.notopen = true;
		stream:feed(st.stanza("stream", { xmlns = "jabber:client" }):top_tag());
		stream_session.notopen = nil;
	end
	reset_stream();

	local limit = query.limit;
	local start_day, step, last_day = 1, 1, #dates;
	local count = 0;
	local rev = query.reverse;
	if query.start then
		local d = dt.date(query.start);
		for i = start_day, last_day, step do
			if dates[i] < d then
				start_day = i + 1;
			elseif dates[i] >= d then
				start_day = i; break;
			end
		end
	end
	if query["end"] then
		local d = dt.date(query["end"]);
		for i = last_day, start_day, -step do
			if dates[i] > d then
				last_day = i - 1;
			elseif dates[i] <= d then
				last_day = i; break;
			end
		end
	end
	local items;
	local first_item, last_item;
	if rev then
		start_day, step, last_day = last_day, -step, start_day;
		if query.before then
			local before_day, before_item, items_ = self:_get_idx(username, query.before, dates);
			if not before_day then
				return nil, "item-not-found";
			elseif before_day <= start_day then
				if before_item then
					first_item = before_item - 1;
				else
					first_item = #items_;
				end
				last_item = 1;
				start_day = before_day;
				items = items_;
			end
		end
	elseif query.after then
		local after_day, after_item, items_ = self:_get_idx(username, query.after, dates);
		if not after_day then
			return nil, "item-not-found";
		elseif after_day >= start_day then
			if after_item then
				first_item = after_item + 1;
			else
				first_item = 1;
			end
			last_item = #items_;
			start_day = after_day;
			items = items_;
		end
	end

	local date_open, xmlfile;
	local function read_xml(date, offset, length)
		if xmlfile and date ~= date_open then
			module:log("debug", "Closing XML file for %s", date_open);
			xmlfile:close();
			xmlfile = nil;
		end
		if not xmlfile then
			date_open = date;
			local filename = dm.getpath(username .. "@" .. date, self.host, self.store, "xml");
			local ferr;
			xmlfile, ferr = io.open(filename);
			if not xmlfile then
				module:log("error", "Error: %s", ferr);
				return nil, ferr;
			end
			module:log("debug", "Opened XML file %s", filename);
		end
		local pos, err = xmlfile:seek("set", offset);
		if pos ~= offset then
			return nil, err or "seek-failed";
		end
		return xmlfile:read(length);
	end

	return function ()
		if limit and count >= limit then if xmlfile then xmlfile:close() end return; end
		for d = start_day, last_day, step do
			local date = dates[d];
			if not items then
				module:log("debug", "Loading index for %s", date);
				start_day = d;
				items = dm.list_load(username .. "@" .. date, self.host, self.store) or empty;
				if not rev then
					first_item, last_item = 1, #items;
				else
					first_item, last_item = #items, 1;
				end
			end

			local q_with, q_start, q_end = query.with, query.start, query["end"];
			for i = first_item, last_item, step do
				local item = items[i];
				if not item then
					module:log("warn", "data[%q][%d] is nil", date, i);
					break;
				end

				local i_when, i_with = item.when, item.with;

				if type(i_when) == "string" then
					i_when = dt.parse(i_when);
				end
				if type(i_when) ~= "number" then
					module:log("warn", "data[%q][%d].when is invalid", date, i);
					break;
				end

				if  (not q_with or i_with == q_with)
				and (not q_start or i_when >= q_start)
				and (not q_end or i_when <= q_end) then
					count = count + 1;
					first_item = i + step;

					local data = read_xml(date, item.offset, item.length);
					if not data then return end
					local ok, err = stream:feed(data);
					if not ok then
						module:log("warn", "Parse error in %s@%s/%s/%q[%d]: %s", username, self.host, self.store, i, err);
						reset_stream();
					end
					if result then
						local stanza = result;
						result = nil;
						return item.id, stanza, i_when, i_with;
					end
				end
			end
			items = nil;
			if xmlfile then
				xmlfile:close();
				xmlfile = nil;
			end
		end
	end
end

function archive:delete(username, query)
	username = username or "@";
	query = query or empty;
	if query.with or query.start or query.after then
		return nil, "not-implemented"; -- Only trimming the oldest messages
	end
	local before = query.before or query["end"] or "9999-12-31";
	if type(before) == "number" then before = dt.date(before); else before = before:sub(1, 10); end
	local dates, err = self:dates(username);
	if not dates or next(dates) == nil then
		if not err then return true end -- already empty
		return dates, err;
	end
	if dates[1] > before then return true; end -- Nothing to delete
	local remaining_dates = {};
	for d = 1, #dates do
		if dates[d] >= before then
			table.insert(remaining_dates, dates[d]);
		end
	end
	table.sort(remaining_dates);
	local ok, err = dm.list_store(username, self.host, self.store, remaining_dates);
	if not ok then return ok, err; end
	for d = 1, #dates do
		if dates[d] < before then
			os.remove(dm.getpath(username .. "@" .. dates[d], self.host, self.store, "list"));
			os.remove(dm.getpath(username .. "@" .. dates[d], self.host, self.store, "xml"));
		end
	end
	return true;
end

function archive:dates(username)
	module:log("debug", "Loading root index for %s", username);
	local dates, err = dm.list_load(username, self.host, self.store);
	if not dates then return dates, err; end
	assert(type(dates[1]) == "string" and type(dates[#dates]) == "string",
		"Archive does not appear to be in xmlarchive format");
	return dates;
end

function archive:users()
	return dm.users(module.host, self.store, "list");
end

local provider = {};
function provider:open(store, typ)
	if typ ~= "archive" then return nil, "unsupported-store"; end
	return setmetatable({ host = module.host, store = store }, archive_mt);
end

function provider:purge(username)
	local encoded_username = dm.path_encode((username or "@") .. "@");
	local basepath = prosody.paths.data .. "/" .. dm.path_encode(module.host);
	for store in lfs.dir(basepath) do
		store = basepath .. "/" .. dm.path_encode(store);
		if lfs.attributes(store, "mode") == "directory" then
			for file in lfs.dir(store) do
				if file:sub(1, #encoded_username) == encoded_username then
					if file:sub(-4) == ".xml" or file:sub(-5) == ".list" then
						os.remove(store .. "/" .. file);
					end
				end
			end
			return true;
		end
	end
end

module:provides("storage", provider);


function module.command(arg)
	local jid = require "util.jid";
	if arg[1] == "convert" and (arg[2] == "to" or arg[2] == "from") and arg[4] then
		local convert;
		if arg[2] == "to" then
			function convert(user, host, store)
				local dates, err = archive.dates({ host = host, store = store }, user);
				if not dates then assert(not err, err); return end
				assert(dm.list_store(user, host, store, nil));
				for _, date in ipairs(dates) do
					print(date);
					local items = assert(dm.list_load(user .. "@" .. date, host, store));
					local xmlfile = assert(io.open(dm.getpath(user .. "@" .. date, host, store, "xml")));
					for _, item in ipairs(items) do
						assert(xmlfile:seek("set", item.offset));
						local data = assert(xmlfile:read(item.length));
						assert(#data == item.length, "short read");
						data = assert(xml.parse(data));
						data = st.preserialize(data);
						data.key = item.id;
						data.with = item.with;
						data.when = tonumber(item.when) or dt.parse(item.when);
						data.attr.stamp = item.when;
						data.attr.stamp_legacy = dt.legacy(data.when);
						assert(dm.list_append(user, host, store, data));
					end
					assert(os.remove(dm.getpath(user .. "@" .. date, host, store, "list")));
					assert(os.remove(dm.getpath(user .. "@" .. date, host, store, "xml")));
				end
			end
		else -- convert from internal
			function convert(user, host, store)
				local items, err = dm.list_load(user, host, store);
				if not items then assert(not err, err); return end
				local dates = {};
				local dayitems, date, xmlfile;
				for _, item in ipairs(items) do
					local meta = {
						id = item.key;
						with = item.with;
						when = item.when or dt.parse(item.attr.stamp);
					};
					local current_date = dt.date(meta.when);
					if current_date ~= date then
						if xmlfile then
							assert(xmlfile:close());
						end
						if dayitems then
							assert(dm.list_store(user .. "@" .. date, host, store, dayitems));
						end
						print(current_date);
						dayitems = {};
						date = current_date;
						table.insert(dates, date);
						xmlfile = assert(io.open(dm.getpath(user .. "@" .. date, host, store, "xml"), "w"));
					end
					item.attr.stamp, item.attr.stamp_legacy = nil, nil;
					local stanza = tostring(st.deserialize(item)) .. "\n";
					meta.offset, meta.length = xmlfile:seek(), #stanza;
					assert(xmlfile:write(stanza));
					table.insert(dayitems, meta);
				end
				assert(xmlfile:close());
				assert(dm.list_store(user .. "@" .. date, host, store, dayitems));
				assert(dm.list_store(user, host, store, dates));
			end
		end

		local store = arg[4];
		if arg[3] == "internal" then
			for i = 5, #arg do
				local user, host = jid.prepped_split(arg[i]);
				if not user then
					print(string.format("Argument #%d (%q) is an invalid JID, aborting", i, arg[i]));
					os.exit(1);
				end
				convert(user, host, store);
			end
			print("Done");
			return 0;
		else
			print("Currently only conversion to/from mod_storage_internal is supported");
			print("Check out https://modules.prosody.im/mod_migrate");
		end
	end
	print("prosodyctl mod_storage_xmlarchive convert (from|to) internal (archive|archive2|muc_log) user@host");
end