Mercurial > prosody-modules
diff mod_smacks/mod_smacks.lua @ 2596:ffb6646b4253
Implement XEP-0198 revision 1.5.2 and limit number of hibernated sessions per user
Revision 1.5.2 allows sending h-values on resumes that fail due to hibernation timeout
and to send out a smacks ack directly before the stream close tag.
I also made the used timers stoppable even for prosody 0.10 and below, this makes
the smacks-ack-delayed event more useful.
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sun, 05 Mar 2017 20:23:53 +0100 |
parents | d300ae5dba87 |
children | 362ca94192ee |
line wrap: on
line diff
--- a/mod_smacks/mod_smacks.lua Sat Mar 04 19:52:41 2017 +0100 +++ b/mod_smacks/mod_smacks.lua Sun Mar 05 20:23:53 2017 +0100 @@ -12,6 +12,8 @@ -- local st = require "util.stanza"; +local dep = require "util.dependencies"; +local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+ local uuid_generate = require "util.uuid".generate; local t_insert, t_remove = table.insert, table.remove; @@ -35,16 +37,71 @@ local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60); +local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); +local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10); local core_process_stanza = prosody.core_process_stanza; local sessionmanager = require"core.sessionmanager"; local c2s_sessions = module:shared("/*/c2s/sessions"); -local session_registry = {}; + +local function init_session_cache(max_entries, evict_callback) + -- old prosody version < 0.10 (no limiting at all!) + if not cache then + local store = {}; + return { + get = function(user, key) return store[user.."@"..key]; end; + set = function(user, key, value) store[user.."@"..key] = value; end; + }; + end + + -- use per user limited cache for prosody >= 0.10 + local stores = {}; + return { + get = function(user, key) + if not stores[user] then + stores[user] = cache.new(max_entries, evict_callback); + end + return stores[user]:get(key); + end; + set = function(user, key, value) + if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end + stores[user]:set(key, value); + -- remove empty caches completely + if not stores[user]:count() then stores[user] = nil; end + end; + }; +end +local old_session_registry = init_session_cache(max_old_sessions, nil); +local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) + if session.destroyed then return; end + session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); + -- store old session's h values on force delete + -- save only actual h value and username/host (for security) + old_session_registry.set(session.username, resumption_token, { + h = session.handled_stanza_count, + username = session.username, + host = session.host + }); + return true; -- allow session to be removed from full cache to make room for new one +end); + +local function stoppable_timer(delay, callback) + local stopped = false; + return { + stop = function () stopped = true end; + module:add_timer(delay, function (t) + if stopped then return; end + return callback(t); + end); + }; +end local function delayed_ack_function(session) - -- fire event only when configured to do so - if delayed_ack_timeout > 0 and session.awaiting_ack and not (session.outgoing_stanza_queue == nil) then - session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", #session.outgoing_stanza_queue); + -- fire event only if configured to do so and our session is not hibernated or destroyed + if delayed_ack_timeout > 0 and session.awaiting_ack + and not session.hibernating and not session.destroyed then + session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", + session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); end session.delayed_ack_timer = nil; @@ -86,15 +143,17 @@ if #queue > max_unacked_stanzas and session.awaiting_ack == nil then session.log("debug", "Queuing <r> (in a moment)"); session.awaiting_ack = false; - session.awaiting_ack_timer = module:add_timer(1e-06, function () + session.awaiting_ack_timer = stoppable_timer(1e-06, function () if not session.awaiting_ack then session.log("debug", "Sending <r> (inside timer, before send)"); (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) session.log("debug", "Sending <r> (inside timer, after send)"); session.awaiting_ack = true; - session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() - delayed_ack_function(session); - end); + if not session.delayed_ack_timer then + session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() + delayed_ack_function(session); + end); + end end end); end @@ -149,9 +208,14 @@ local session_close = session.close; function session.close(...) if session.resumption_token then - session_registry[session.resumption_token] = nil; + session_registry.set(session.username, session.resumption_token, nil); + old_session_registry.set(session.username, session.resumption_token, nil); session.resumption_token = nil; end + -- send out last ack as per revision 1.5.2 of XEP-0198 + if session.smacks then + (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = tostring(session.handled_stanza_count) })); + end return session_close(...); end return session; @@ -189,7 +253,7 @@ local resume = stanza.attr.resume; if resume == "true" or resume == "1" then resume_token = uuid_generate(); - session_registry[resume_token] = session; + session_registry.set(session.username, resume_token, session); session.resumption_token = resume_token; end (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); @@ -200,7 +264,7 @@ module:hook_stanza("http://etherx.jabber.org/streams", "features", function (session, stanza) - module:add_timer(1e-6, function () + stoppable_timer(1e-6, function () if can_do_smacks(session) then if stanza:get_child("sm", xmlns_sm3) then session.sends2s(st.stanza("enable", sm3_attr)); @@ -253,7 +317,7 @@ origin.delayed_ack_timer = nil; end -- Remove handled stanzas from outgoing_stanza_queue - log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); + -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); local h = tonumber(stanza.attr.h); if not h then origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; @@ -330,7 +394,13 @@ -- otherwise the session resumed and re-hibernated. and session.hibernating == hibernate_time then session.log("debug", "Destroying session for hibernating too long"); - session_registry[session.resumption_token] = nil; + session_registry.set(session.username, session.resumption_token, nil); + -- save only actual h value and username/host (for security) + old_session_registry.set(session.username, session.resumption_token, { + h = session.handled_stanza_count, + username = session.username, + host = session.host + }); session.resumption_token = nil; sessionmanager.destroy_session(session); else @@ -372,12 +442,21 @@ end local id = stanza.attr.previd; - local original_session = session_registry[id]; + local original_session = session_registry.get(session.username, id); if not original_session then session.log("debug", "Tried to resume non-existent session with id %s", id); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); + local old_session = old_session_registry.get(session.username, id); + if old_session and session.username == old_session.username + and session.host == old_session.host + and old_session.h then + session.send(st.stanza("failed", { xmlns = xmlns_sm, h = tostring(old_session.h) }) + :tag("item-not-found", { xmlns = xmlns_errors }) + ); + else + session.send(st.stanza("failed", { xmlns = xmlns_sm }) + :tag("item-not-found", { xmlns = xmlns_errors }) + ); + end; elseif session.username == original_session.username and session.host == original_session.host then session.log("debug", "mod_smacks resuming existing session..."); @@ -448,9 +527,11 @@ session.awaiting_ack = false; (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); session.awaiting_ack = true; - session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() - delayed_ack_function(session); - end); + if not session.delayed_ack_timer then + session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() + delayed_ack_function(session); + end); + end return true; end end