diff mod_smacks/mod_smacks.lua @ 2596:ffb6646b4253

Implement XEP-0198 revision 1.5.2 and limit number of hibernated sessions per user Revision 1.5.2 allows sending h-values on resumes that fail due to hibernation timeout and to send out a smacks ack directly before the stream close tag. I also made the used timers stoppable even for prosody 0.10 and below, this makes the smacks-ack-delayed event more useful.
author tmolitor <thilo@eightysoft.de>
date Sun, 05 Mar 2017 20:23:53 +0100
parents d300ae5dba87
children 362ca94192ee
line wrap: on
line diff
--- a/mod_smacks/mod_smacks.lua	Sat Mar 04 19:52:41 2017 +0100
+++ b/mod_smacks/mod_smacks.lua	Sun Mar 05 20:23:53 2017 +0100
@@ -12,6 +12,8 @@
 --
 
 local st = require "util.stanza";
+local dep = require "util.dependencies";
+local cache = dep.softreq("util.cache");	-- only available in prosody 0.10+
 local uuid_generate = require "util.uuid".generate;
 
 local t_insert, t_remove = table.insert, table.remove;
@@ -35,16 +37,71 @@
 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60);
+local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
+local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
 local core_process_stanza = prosody.core_process_stanza;
 local sessionmanager = require"core.sessionmanager";
 
 local c2s_sessions = module:shared("/*/c2s/sessions");
-local session_registry = {};
+
+local function init_session_cache(max_entries, evict_callback)
+	-- old prosody version < 0.10 (no limiting at all!)
+	if not cache then
+		local store = {};
+		return {
+			get = function(user, key) return store[user.."@"..key]; end;
+			set = function(user, key, value) store[user.."@"..key] = value; end;
+		};
+	end
+	
+	-- use per user limited cache for prosody >= 0.10
+	local stores = {};
+	return {
+			get = function(user, key)
+				if not stores[user] then
+					stores[user] = cache.new(max_entries, evict_callback);
+				end
+				return stores[user]:get(key);
+			end;
+			set = function(user, key, value)
+				if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end
+				stores[user]:set(key, value);
+				-- remove empty caches completely
+				if not stores[user]:count() then stores[user] = nil; end
+			end;
+		};
+end
+local old_session_registry = init_session_cache(max_old_sessions, nil);
+local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session)
+	if session.destroyed then return; end
+	session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token);
+	-- store old session's h values on force delete
+	-- save only actual h value and username/host (for security)
+	old_session_registry.set(session.username, resumption_token, {
+		h = session.handled_stanza_count,
+		username = session.username,
+		host = session.host
+	});
+	return true;	-- allow session to be removed from full cache to make room for new one
+end);
+
+local function stoppable_timer(delay, callback)
+	local stopped = false;
+	return {
+		stop = function () stopped = true end;
+		module:add_timer(delay, function (t)
+			if stopped then return; end
+			return callback(t);
+		end);
+	};
+end
 
 local function delayed_ack_function(session)
-	-- fire event only when configured to do so
-	if delayed_ack_timeout > 0 and session.awaiting_ack and not (session.outgoing_stanza_queue == nil) then
-		session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", #session.outgoing_stanza_queue);
+	-- fire event only if configured to do so and our session is not hibernated or destroyed
+	if delayed_ack_timeout > 0 and session.awaiting_ack
+	and not session.hibernating and not session.destroyed then
+		session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
+			session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
 		module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue});
 	end
 	session.delayed_ack_timer = nil;
@@ -86,15 +143,17 @@
 	if #queue > max_unacked_stanzas and session.awaiting_ack == nil then
 		session.log("debug", "Queuing <r> (in a moment)");
 		session.awaiting_ack = false;
-		session.awaiting_ack_timer = module:add_timer(1e-06, function ()
+		session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
 			if not session.awaiting_ack then
 				session.log("debug", "Sending <r> (inside timer, before send)");
 				(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
 				session.log("debug", "Sending <r> (inside timer, after send)");
 				session.awaiting_ack = true;
-				session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
-					delayed_ack_function(session);
-				end);
+				if not session.delayed_ack_timer then
+					session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
+						delayed_ack_function(session);
+					end);
+				end
 			end
 		end);
 	end
@@ -149,9 +208,14 @@
 	local session_close = session.close;
 	function session.close(...)
 		if session.resumption_token then
-			session_registry[session.resumption_token] = nil;
+			session_registry.set(session.username, session.resumption_token, nil);
+			old_session_registry.set(session.username, session.resumption_token, nil);
 			session.resumption_token = nil;
 		end
+		-- send out last ack as per revision 1.5.2 of XEP-0198
+		if session.smacks then
+			(session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = tostring(session.handled_stanza_count) }));
+		end
 		return session_close(...);
 	end
 	return session;
@@ -189,7 +253,7 @@
 	local resume = stanza.attr.resume;
 	if resume == "true" or resume == "1" then
 		resume_token = uuid_generate();
-		session_registry[resume_token] = session;
+		session_registry.set(session.username, resume_token, session);
 		session.resumption_token = resume_token;
 	end
 	(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
@@ -200,7 +264,7 @@
 
 module:hook_stanza("http://etherx.jabber.org/streams", "features",
 		function (session, stanza)
-			module:add_timer(1e-6, function ()
+			stoppable_timer(1e-6, function ()
 				if can_do_smacks(session) then
 					if stanza:get_child("sm", xmlns_sm3) then
 						session.sends2s(st.stanza("enable", sm3_attr));
@@ -253,7 +317,7 @@
 		origin.delayed_ack_timer = nil;
 	end
 	-- Remove handled stanzas from outgoing_stanza_queue
-	log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
+	-- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
 	local h = tonumber(stanza.attr.h);
 	if not h then
 		origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
@@ -330,7 +394,13 @@
 				-- otherwise the session resumed and re-hibernated.
 				and session.hibernating == hibernate_time then
 					session.log("debug", "Destroying session for hibernating too long");
-					session_registry[session.resumption_token] = nil;
+					session_registry.set(session.username, session.resumption_token, nil);
+					-- save only actual h value and username/host (for security)
+					old_session_registry.set(session.username, session.resumption_token, {
+						h = session.handled_stanza_count,
+						username = session.username,
+						host = session.host
+					});
 					session.resumption_token = nil;
 					sessionmanager.destroy_session(session);
 				else
@@ -372,12 +442,21 @@
 	end
 
 	local id = stanza.attr.previd;
-	local original_session = session_registry[id];
+	local original_session = session_registry.get(session.username, id);
 	if not original_session then
 		session.log("debug", "Tried to resume non-existent session with id %s", id);
-		session.send(st.stanza("failed", { xmlns = xmlns_sm })
-			:tag("item-not-found", { xmlns = xmlns_errors })
-		);
+		local old_session = old_session_registry.get(session.username, id);
+		if old_session and session.username == old_session.username
+		and session.host == old_session.host
+		and old_session.h then
+			session.send(st.stanza("failed", { xmlns = xmlns_sm, h = tostring(old_session.h) })
+				:tag("item-not-found", { xmlns = xmlns_errors })
+			);
+		else
+			session.send(st.stanza("failed", { xmlns = xmlns_sm })
+				:tag("item-not-found", { xmlns = xmlns_errors })
+			);
+		end;
 	elseif session.username == original_session.username
 	and session.host == original_session.host then
 		session.log("debug", "mod_smacks resuming existing session...");
@@ -448,9 +527,11 @@
 		session.awaiting_ack = false;
 		(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
 		session.awaiting_ack = true;
-		session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
-			delayed_ack_function(session);
-		end);
+		if not session.delayed_ack_timer then
+			session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
+				delayed_ack_function(session);
+			end);
+		end
 		return true;
 	end
 end