view mod_storage_xmlarchive/mod_storage_xmlarchive.lua @ 2712:d89ab70808f6

mod_cloud_notify: fix bug when multiple resources are used This bug was triggered under the rare circumstances that a message arrived and one resource was smacks hibernated while the other one(s) were offline. Then only the hibernated resource but not the offline one(s) (or the other way round) got notified.
author tmolitor <thilo@eightysoft.de>
date Mon, 08 May 2017 18:24:29 +0200
parents 1b081c8fc1d9
children e5ce64aee4ac
line wrap: on
line source

-- mod_storage_xmlarchive
-- Copyright (C) 2015-2017 Kim Alvefur
--
-- This file is MIT/X11 licensed.
--
-- luacheck: ignore unused self

local lfs = require "lfs";
local dm = require "core.storagemanager".olddm;
local hmac_sha256 = require"util.hashes".hmac_sha256;
local st = require"util.stanza";
local dt = require"util.datetime";
local new_stream = require "util.xmppstream".new;
local empty = {};

if not dm.append_raw then
	module:require"datamanager_append_raw";
end

local archive = {};
local archive_mt = { __index = archive };

local is_stanza = st.is_stanza or function (s)
	return getmetatable(s) == st.stanza_mt;
end

function archive:append(username, _, data, when, with)
	if type(when) ~= "number" then
		when, with, data = data, when, with;
	end
	if not is_stanza(data) then
		module:log("error", "Attempt to store non-stanza object, traceback: %s", debug.traceback());
		return nil, "unsupported-datatype";
	end

	username = username or "@";
	data = tostring(data) .. "\n";

	local day = dt.date(when);
	local ok, err = dm.append_raw(username.."@"..day, module.host, self.store, "xml", data);
	if not ok then
		return nil, err;
	end

	-- If the day-file is missing then we need to add it to the list of days
	local first_of_day = not lfs.attributes(dm.getpath(username .. "@" .. day, module.host, self.store, "list"));

	local offset = ok and err or 0;

	local id = day .. "-" .. hmac_sha256(username.."@"..day.."+"..offset, data, true):sub(-16);
	ok, err = dm.list_append(username.."@"..day, module.host, self.store,
		{ id = id, when = dt.datetime(when), with = with, offset = offset, length = #data });
	if ok and first_of_day then
		ok, err = dm.list_append(username, module.host, self.store, day);
	end
	if not ok then
		return nil, err;
	end
	return id;
end

function archive:_get_idx(username, id, dates)
	dates = dates or self:dates(username) or empty;
	local date = id:sub(1, 10);
	for d = 1, #dates do
		if date == dates[d] then
			module:log("debug", "Loading items from %s", dates[d]);
			local items = dm.list_load(username .. "@" .. dates[d], module.host, self.store) or empty;
			for i = 1, #items do
				if items[i].id == id then
					return d, i, items;
				end
			end
			return; -- Assuming no duplicates
		elseif date < dates[d] then
			return; -- List is assumed to be sorted
		end
	end
end

function archive:find(username, query)
	username = username or "@";
	query = query or empty;

	local result;
	local function cb(_, stanza)
		assert(not result, "Multiple items in chunk");
		result = stanza;
	end

	local stream_session = { notopen = true };
	local stream_callbacks = { handlestanza = cb, stream_ns = "jabber:client", default_ns = "jabber:client" };
	local stream = new_stream(stream_session, stream_callbacks);
	local dates = self:dates(username) or empty;
	local function reset_stream()
		stream:reset();
		stream_session.notopen = true;
		stream:feed(st.stanza("stream", { xmlns = "jabber:client" }):top_tag());
		stream_session.notopen = nil;
	end
	reset_stream();

	local limit = query.limit;
	local start_day, step, last_day = 1, 1, #dates;
	local count = 0;
	local rev = query.reverse;
	if query.start then
		local d = dt.date(query.start);
		for i = 1, #dates do
			if dates[i] == d then
				start_day = i; break;
			end
		end
	end
	if query["end"] then
		local d = dt.date(query["end"]);
		for i = #dates, 1, -1 do
			if dates[i] == d then
				last_day = i; break;
			end
		end
	end
	local items;
	local first_item, last_item;
	if rev then
		start_day, step, last_day = last_day, -step, start_day;
		if query.before then
			local before_day, before_item, items_ = self:_get_idx(username, query.before, dates);
			if before_day and before_day <= start_day then
				if before_item then
					first_item = before_item - 1;
				else
					first_item = #items_;
				end
				last_item = 1;
				start_day = before_day;
				items = items_;
			end
		end
	elseif query.after then
		local after_day, after_item, items_ = self:_get_idx(username, query.after, dates);
		if after_day and after_day >= start_day then
			if after_item then
				first_item = after_item + 1;
			else
				first_item = 1;
			end
			last_item = #items_;
			start_day = after_day;
			items = items_;
		end
	end

	local date_open, xmlfile;
	local function read_xml(date, offset, length)
		if xmlfile and date ~= date_open then
			xmlfile:close();
			xmlfile = nil;
		end
		if not xmlfile then
			date_open = date;
			local filename = dm.getpath(username .. "@" .. date, module.host, self.store, "xml");
			local ferr;
			xmlfile, ferr = io.open(filename);
			if not xmlfile then
				module:log("error", "Error: %s", ferr);
				return nil, ferr;
			end
		end
		local pos, err = xmlfile:seek("set", offset);
		if pos ~= offset then
			return nil, err or "seek-failed";
		end
		return xmlfile:read(length);
	end

	return function ()
		if limit and count >= limit then if xmlfile then xmlfile:close() end return; end
		for d = start_day, last_day, step do
			local date = dates[d];
			if not items then
				module:log("debug", "Loading items from %s", date);
				start_day = d;
				items = dm.list_load(username .. "@" .. date, module.host, self.store) or empty;
				if not rev then
					first_item, last_item = 1, #items;
				else
					first_item, last_item = #items, 1;
				end
			end

			local q_with, q_start, q_end = query.with, query.start, query["end"];
			for i = first_item, last_item, step do
				local item = items[i];
				if not item then
					module:log("warn", "data[%q][%d] is nil", date, i);
					break;
				end

				local i_when, i_with = item.when, item.with;

				if type(i_when) == "string" then
					i_when = dt.parse(i_when);
				end
				if type(i_when) ~= "number" then
					module:log("warn", "data[%q][%d].when is invalid", date, i);
					break;
				end

				if  (not q_with or i_with == q_with)
				and (not q_start or i_when >= q_start)
				and (not q_end or i_when <= q_end) then
					count = count + 1;
					first_item = i + step;

					local data = read_xml(date, item.offset, item.length);
					if not data then return end
					local ok, err = stream:feed(data);
					if not ok then
						module:log("warn", "Parse error in %s@%s/%s/%q[%d]: %s", username, module.host, self.store, i, err);
						reset_stream();
					end
					if result then
						local stanza = result;
						result = nil;
						return item.id, stanza, i_when, i_with;
					end
				end
			end
			items = nil;
			if xmlfile then
				xmlfile:close();
				xmlfile = nil;
			end
		end
	end
end

function archive:delete(username, query)
	username = username or "@";
	query = query or empty;
	if query.with or query.start or query.after then
		return nil, "not-implemented"; -- Only trimming the oldest messages
	end
	local before = query.before or query["end"] or "9999-12-31";
	if type(before) == "number" then before = dt.date(before); else before = before:sub(1, 10); end
	local dates, err = self:dates(username);
	if not dates or next(dates) == nil then
		if not err then return true end -- already empty
		return dates, err;
	end
	if dates[1] > before then return true; end -- Nothing to delete
	local remaining_dates = {};
	for d = 1, #dates do
		if dates[d] >= before then
			table.insert(remaining_dates, dates[d]);
		end
	end
	table.sort(remaining_dates);
	local ok, err = dm.list_store(username, module.host, self.store, remaining_dates);
	if not ok then return ok, err; end
	for d = 1, #dates do
		if dates[d] < before then
			os.remove(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "list"));
			os.remove(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "xml"));
		end
	end
	return true;
end

function archive:dates(username)
	return dm.list_load(username, module.host, self.store);
end

local provider = {};
function provider:open(store, typ)
	if typ ~= "archive" then return nil, "unsupported-store"; end
	return setmetatable({ store = store }, archive_mt);
end

function provider:purge(username)
	local encoded_username = dm.path_encode((username or "@") .. "@");
	local basepath = prosody.paths.data .. "/" .. dm.path_encode(module.host);
	for store in lfs.dir(basepath) do
		store = basepath .. "/" .. dm.path_encode(store);
		if lfs.attributes(store, "mode") == "directory" then
			for file in lfs.dir(store) do
				if file:sub(1, #encoded_username) == encoded_username then
					if file:sub(-4) == ".xml" or file:sub(-5) == ".list" then
						os.remove(store .. "/" .. file);
					end
				end
			end
			return true;
		end
	end
end

module:provides("storage", provider);