view mod_smacks/mod_smacks.lua @ 735:c1b0f0c33c6a

mod_archive: Fix hour offset in stored message date os.date expect a timestamp in local time, that is subject to daylight saving. But since we pass an UTC timestamp to os.date one hour is (wrongly) added in the summer. The only sensible thing is to call the os.date only once with the ! parametter. And then parsing this sting to get the utc_timestamp. Calling os.date with an UTC timestamp is not possible, and calling os.date twice without timestamp could give different results.
author Olivier Goffart <ogoffart@woboq.com>
date Wed, 04 Jul 2012 13:49:57 +0200
parents 842a8a3b0d81
children 713c6791fbcc
line wrap: on
line source

local st = require "util.stanza";
local uuid_generate = require "util.uuid".generate;

local t_insert, t_remove = table.insert, table.remove;
local math_min = math.min;
local os_time = os.time;
local tonumber, tostring = tonumber, tostring;
local add_filter = require "util.filters".add_filter;
local timer = require "util.timer";
local datetime = require "util.datetime";

local xmlns_sm = "urn:xmpp:sm:2";
local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
local xmlns_delay = "urn:xmpp:delay";

local sm_attr = { xmlns = xmlns_sm };

local resume_timeout = module:get_option_number("smacks_hibernation_time", 300);
local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);

local c2s_sessions = module:shared("/*/c2s/sessions");
local session_registry = {};

local function can_do_smacks(session, advertise_only)
	if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end
	
	local session_type = session.type;
	if session_type == "c2s" then
		if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm
			return false, "unexpected-request", "Client must bind a resource before enabling stream management";
		end
		return true;
	elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then
		return true;
	end
	return false, "service-unavailable", "Stream management is not available for this stream";
end

module:hook("stream-features",
		function (event)
			if can_do_smacks(event.origin, true) then
				event.features:tag("sm", sm_attr):tag("optional"):up():up();
			end
		end);

module:hook("s2s-stream-features",
		function (event)
			if can_do_smacks(event.origin, true) then
				event.features:tag("sm", sm_attr):tag("optional"):up():up();
			end
		end);

module:hook_stanza("http://etherx.jabber.org/streams", "features",
		function (session, stanza)
			if can_do_smacks(session) and stanza:get_child("sm", xmlns_sm) then
				session.sends2s(st.stanza("enable", sm_attr));
			end
end);

local function wrap_session(session, resume)
	-- Overwrite process_stanza() and send()
	local queue;
	if not resume then
		queue = {};
		session.outgoing_stanza_queue = queue;
		session.last_acknowledged_stanza = 0;
	else
		queue = session.outgoing_stanza_queue;
	end
	
	local _send = session.sends2s or session.send;
	local function new_send(stanza)
		local attr = stanza.attr;
		if attr and not attr.xmlns then -- Stanza in default stream namespace
			local cached_stanza = st.clone(stanza);
			
			if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
				cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
			end
			
			queue[#queue+1] = cached_stanza;
		end
		local ok, err = _send(stanza);
		if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
			session.awaiting_ack = true;
			return _send(st.stanza("r", sm_attr));
		end
		return ok, err;
	end
	
	if session.sends2s then
		session.sends2s = new_send;
	else
		session.send = new_send;
	end
	
	if not resume then
		session.handled_stanza_count = 0;
		add_filter(session, "stanzas/in", function (stanza)
			if not stanza.attr.xmlns then
				session.handled_stanza_count = session.handled_stanza_count + 1;
				session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
			end
			return stanza;
		end);
	end

	return session;
end

module:hook_stanza(xmlns_sm, "enable", function (session, stanza)
	local ok, err, err_text = can_do_smacks(session);
	if not ok then
		session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it?
		session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
		return true;
	end

	module:log("debug", "Enabling stream management");
	session.smacks = true;
	
	wrap_session(session);
	
	local resume_token;
	local resume = stanza.attr.resume;
	if resume == "true" or resume == "1" then
		resume_token = uuid_generate();
		session_registry[resume_token] = session;
		session.resumption_token = resume_token;
	end
	(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
	return true;
end, 100);

module:hook_stanza(xmlns_sm, "enabled", function (session, stanza)
	module:log("debug", "Enabling stream management");
	session.smacks = true;
	
	wrap_session(session);

	-- FIXME Resume?
	
	return true;
end, 100);

module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
	if not origin.smacks then
		module:log("debug", "Received ack request from non-smack-enabled session");
		return;
	end
	module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
	-- Reply with <a>
	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
	return true;
end);

module:hook_stanza(xmlns_sm, "a", function (origin, stanza)
	if not origin.smacks then return; end
	origin.awaiting_ack = nil;
	-- Remove handled stanzas from outgoing_stanza_queue
	--log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
	local queue = origin.outgoing_stanza_queue;
	if handled_stanza_count > #queue then
		module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
			handled_stanza_count, #queue);
		module:log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza);
		for i=1,#queue do
			module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
		end
	end
	for i=1,math_min(handled_stanza_count,#queue) do
		t_remove(origin.outgoing_stanza_queue, 1);
	end
	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
	return true;
end);

--TODO: Optimise... incoming stanzas should be handled by a per-session
-- function that has a counter as an upvalue (no table indexing for increments,
-- and won't slow non-198 sessions). We can also then remove the .handled flag
-- on stanzas

function handle_unacked_stanzas(session)
	local queue = session.outgoing_stanza_queue;
	local error_attr = { type = "cancel" };
	if #queue > 0 then
		session.outgoing_stanza_queue = {};
		for i=1,#queue do
			local reply = st.reply(queue[i]);
			if reply.attr.to ~= session.full_jid then
				reply.attr.type = "error";
				reply:tag("error", error_attr)
					:tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
				core_process_stanza(session, reply);
			end
		end
	end
end

module:hook("pre-resource-unbind", function (event)
	local session, err = event.session, event.error;
	if session.smacks and err ~= "session closed" then
		if not session.resumption_token then
			local queue = session.outgoing_stanza_queue;
			if #queue > 0 then
				module:log("warn", "Destroying session with %d unacked stanzas:", #queue);
				for i=1,#queue do
					module:log("warn", "::%s", tostring(queue[i]));
				end
				handle_unacked_stanzas(session);
			end
		else
			session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout);
			local hibernate_time = os_time(); -- Track the time we went into hibernation
			session.hibernating = hibernate_time;
			local resumption_token = session.resumption_token;
			timer.add_task(resume_timeout, function ()
				session.log("debug", "mod_smacks hibernation timeout reached...");
				-- We need to check the current resumption token for this resource
				-- matches the smacks session this timer is for in case it changed
				-- (for example, the client may have bound a new resource and
				-- started a new smacks session, or not be using smacks)
				local curr_session = hosts[session.host].sessions[session.username].sessions[session.resource];
				if curr_session.resumption_token == resumption_token
				-- Check the hibernate time still matches what we think it is,
				-- otherwise the session resumed and re-hibernated.
				and session.hibernating == hibernate_time then
					session.log("debug", "Destroying session for hibernating too long");
					session_registry[session.resumption_token] = nil;
					session.resumption_token = nil;
					sessionmanager.destroy_session(session);
				else
					session.log("debug", "Session resumed before hibernation timeout, all is well")
				end
			end);
			return true; -- Postpone destruction for now
		end
		
	end
end);

module:hook_stanza(xmlns_sm, "resume", function (session, stanza)
	local id = stanza.attr.previd;
	local original_session = session_registry[id];
	if not original_session then
		session.log("debug", "Tried to resume non-existent session with id %s", id);
		session.send(st.stanza("failed", sm_attr)
			:tag("item-not-found", { xmlns = xmlns_errors })
		);
	elseif session.username == original_session.username
	and session.host == original_session.host then
		session.log("debug", "mod_smacks resuming existing session...");
		-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
		if original_session.conn then
			session.log("debug", "mod_smacks closing an old connection for this session");
			local conn = original_session.conn;
			c2s_sessions[conn] = nil;
			conn:close();
		end
		original_session.ip = session.ip;
		original_session.conn = session.conn;
		original_session.send = session.send;
		original_session.stream = session.stream;
		original_session.secure = session.secure;
		original_session.hibernating = nil;
		local filter = original_session.filter;
		local stream = session.stream;
		local log = session.log;
		function original_session.data(data)
			data = filter("bytes/in", data);
			if data then
				local ok, err = stream:feed(data);
				if ok then return; end
				log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
				original_session:close("xml-not-well-formed");
			end
		end
		wrap_session(original_session, true);
		-- Inform xmppstream of the new session (passed to its callbacks)
		stream:set_session(original_session);
		-- Similar for connlisteners
		c2s_sessions[session.conn] = original_session;

		session.send(st.stanza("resumed", { xmlns = xmlns_sm,
			h = original_session.handled_stanza_count, previd = id }));
		
		-- Fake an <a> with the h of the <resume/> from the client
		original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
			h = stanza.attr.h }));
		
		-- Ok, we need to re-send any stanzas that the client didn't see
		-- ...they are what is now left in the outgoing stanza queue
		local queue = original_session.outgoing_stanza_queue;
		for i=1,#queue do
			session.send(queue[i]);
		end
	else
		log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
			session.username or "?", session.host or "?", session.type,
			original_session.username or "?", original_session.host or "?", original_session.type);
		session.send(st.stanza("failed", sm_attr)
			:tag("not-authorized", { xmlns = xmlns_errors }));
	end
	return true;
end);