comparison mod_smacks/mod_smacks.lua @ 2596:ffb6646b4253

Implement XEP-0198 revision 1.5.2 and limit number of hibernated sessions per user. Revision 1.5.2 allows sending h-values on resumes that fail due to hibernation timeout and sending out a smacks ack directly before the stream close tag. I also made the timers used stoppable even for prosody 0.10 and below; this makes the smacks-ack-delayed event more useful.
author tmolitor <thilo@eightysoft.de>
date Sun, 05 Mar 2017 20:23:53 +0100
parents d300ae5dba87
children 362ca94192ee
comparison
equal deleted inserted replaced
2595:307ddebb72e1 2596:ffb6646b4253
10 -- This project is MIT/X11 licensed. Please see the 10 -- This project is MIT/X11 licensed. Please see the
11 -- COPYING file in the source package for more information. 11 -- COPYING file in the source package for more information.
12 -- 12 --
13 13
14 local st = require "util.stanza"; 14 local st = require "util.stanza";
15 local dep = require "util.dependencies";
16 local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+
15 local uuid_generate = require "util.uuid".generate; 17 local uuid_generate = require "util.uuid".generate;
16 18
17 local t_insert, t_remove = table.insert, table.remove; 19 local t_insert, t_remove = table.insert, table.remove;
18 local math_min = math.min; 20 local math_min = math.min;
19 local os_time = os.time; 21 local os_time = os.time;
33 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300); 35 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300);
34 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false); 36 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
35 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); 37 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
36 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); 38 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
37 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60); 39 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60);
40 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
41 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
38 local core_process_stanza = prosody.core_process_stanza; 42 local core_process_stanza = prosody.core_process_stanza;
39 local sessionmanager = require"core.sessionmanager"; 43 local sessionmanager = require"core.sessionmanager";
40 44
41 local c2s_sessions = module:shared("/*/c2s/sessions"); 45 local c2s_sessions = module:shared("/*/c2s/sessions");
42 local session_registry = {}; 46
-- Build a registry addressed by (user, key).
--
-- When util.cache could not be loaded (prosody < 0.10) the registry is a
-- single unbounded table keyed by user.."@"..key. Otherwise every user gets
-- an own cache created with cache.new(max_entries, evict_callback), so the
-- limit applies per user.
--
-- @param max_entries     capacity handed to cache.new for each per-user cache
--                        (ignored by the unbounded fallback)
-- @param evict_callback  callback handed to cache.new (ignored by the fallback)
-- @return table with two plain functions (call with '.', not ':'):
--           get(user, key)        -> stored value or nil
--           set(user, key, value) -> stores value; a nil value removes the key
local function init_session_cache(max_entries, evict_callback)
	-- old prosody version < 0.10 (no limiting at all!)
	if not cache then
		local store = {};
		return {
			get = function(user, key) return store[user.."@"..key]; end;
			set = function(user, key, value) store[user.."@"..key] = value; end;
		};
	end

	-- use per user limited cache for prosody >= 0.10
	local stores = {};
	return {
		get = function(user, key)
			local user_cache = stores[user];
			-- a lookup must not allocate a cache: nothing stored means nothing to find
			if not user_cache then return nil; end
			return user_cache:get(key);
		end;
		set = function(user, key, value)
			local user_cache = stores[user];
			if not user_cache then
				-- deleting from a user without any entries is a no-op
				if value == nil then return; end
				user_cache = cache.new(max_entries, evict_callback);
				stores[user] = user_cache;
			end
			user_cache:set(key, value);
			-- remove empty caches completely
			-- (compare against 0 explicitly: the number 0 is truthy in Lua)
			if user_cache:count() == 0 then stores[user] = nil; end
		end;
	};
end
-- Registry of reduced records for sessions that are gone; only filled by
-- this module with { h, username, host } tables.
local old_session_registry = init_session_cache(max_old_sessions, nil);

-- Eviction handler for the hibernated-session registry: invoked with the
-- resumption token and the session that is about to be pushed out.
local function on_hibernated_session_evicted(resumption_token, session)
	if not session.destroyed then
		session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token);
		-- store old session's h values on force delete
		-- save only actual h value and username/host (for security)
		local remembered = {
			h = session.handled_stanza_count,
			username = session.username,
			host = session.host
		};
		old_session_registry.set(session.username, resumption_token, remembered);
		return true; -- allow session to be removed from full cache to make room for new one
	end
	-- already destroyed sessions: nothing to remember, return nothing
end

local session_registry = init_session_cache(max_hibernated_sessions, on_hibernated_session_evicted);
87
-- Schedule `callback` through module:add_timer(delay, ...) and hand back a
-- handle whose stop() suppresses the callback if it has not run yet.
-- The value returned by module:add_timer is kept in slot [1] of the handle.
-- @param delay     passed unchanged to module:add_timer
-- @param callback  called with the timer's argument unless stop() was called first
-- @return table { [1] = <add_timer result>, stop = function }
local function stoppable_timer(delay, callback)
	local cancelled = false;

	local function cancel()
		cancelled = true
	end

	local function guarded(t)
		if not cancelled then
			return callback(t);
		end
	end

	local handle = { stop = cancel };
	handle[1] = module:add_timer(delay, guarded);
	return handle;
end
43 98
44 local function delayed_ack_function(session) 99 local function delayed_ack_function(session)
45 -- fire event only when configured to do so 100 -- fire event only if configured to do so and our session is not hibernated or destroyed
46 if delayed_ack_timeout > 0 and session.awaiting_ack and not (session.outgoing_stanza_queue == nil) then 101 if delayed_ack_timeout > 0 and session.awaiting_ack
47 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", #session.outgoing_stanza_queue); 102 and not session.hibernating and not session.destroyed then
103 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
104 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
48 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); 105 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue});
49 end 106 end
50 session.delayed_ack_timer = nil; 107 session.delayed_ack_timer = nil;
51 end 108 end
52 109
84 local function request_ack_if_needed(session) 141 local function request_ack_if_needed(session)
85 local queue = session.outgoing_stanza_queue; 142 local queue = session.outgoing_stanza_queue;
86 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then 143 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then
87 session.log("debug", "Queuing <r> (in a moment)"); 144 session.log("debug", "Queuing <r> (in a moment)");
88 session.awaiting_ack = false; 145 session.awaiting_ack = false;
89 session.awaiting_ack_timer = module:add_timer(1e-06, function () 146 session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
90 if not session.awaiting_ack then 147 if not session.awaiting_ack then
91 session.log("debug", "Sending <r> (inside timer, before send)"); 148 session.log("debug", "Sending <r> (inside timer, before send)");
92 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 149 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
93 session.log("debug", "Sending <r> (inside timer, after send)"); 150 session.log("debug", "Sending <r> (inside timer, after send)");
94 session.awaiting_ack = true; 151 session.awaiting_ack = true;
95 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() 152 if not session.delayed_ack_timer then
96 delayed_ack_function(session); 153 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
97 end); 154 delayed_ack_function(session);
155 end);
156 end
98 end 157 end
99 end); 158 end);
100 end 159 end
101 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue 160 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
102 -- and there isn't already a timer for this event running. 161 -- and there isn't already a timer for this event running.
147 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); 206 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
148 207
149 local session_close = session.close; 208 local session_close = session.close;
150 function session.close(...) 209 function session.close(...)
151 if session.resumption_token then 210 if session.resumption_token then
152 session_registry[session.resumption_token] = nil; 211 session_registry.set(session.username, session.resumption_token, nil);
212 old_session_registry.set(session.username, session.resumption_token, nil);
153 session.resumption_token = nil; 213 session.resumption_token = nil;
214 end
215 -- send out last ack as per revision 1.5.2 of XEP-0198
216 if session.smacks then
217 (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = tostring(session.handled_stanza_count) }));
154 end 218 end
155 return session_close(...); 219 return session_close(...);
156 end 220 end
157 return session; 221 return session;
158 end 222 end
187 251
188 local resume_token; 252 local resume_token;
189 local resume = stanza.attr.resume; 253 local resume = stanza.attr.resume;
190 if resume == "true" or resume == "1" then 254 if resume == "true" or resume == "1" then
191 resume_token = uuid_generate(); 255 resume_token = uuid_generate();
192 session_registry[resume_token] = session; 256 session_registry.set(session.username, resume_token, session);
193 session.resumption_token = resume_token; 257 session.resumption_token = resume_token;
194 end 258 end
195 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); 259 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
196 return true; 260 return true;
197 end 261 end
198 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); 262 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
199 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); 263 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
200 264
201 module:hook_stanza("http://etherx.jabber.org/streams", "features", 265 module:hook_stanza("http://etherx.jabber.org/streams", "features",
202 function (session, stanza) 266 function (session, stanza)
203 module:add_timer(1e-6, function () 267 stoppable_timer(1e-6, function ()
204 if can_do_smacks(session) then 268 if can_do_smacks(session) then
205 if stanza:get_child("sm", xmlns_sm3) then 269 if stanza:get_child("sm", xmlns_sm3) then
206 session.sends2s(st.stanza("enable", sm3_attr)); 270 session.sends2s(st.stanza("enable", sm3_attr));
207 session.smacks = xmlns_sm3; 271 session.smacks = xmlns_sm3;
208 elseif stanza:get_child("sm", xmlns_sm2) then 272 elseif stanza:get_child("sm", xmlns_sm2) then
251 if origin.delayed_ack_timer then 315 if origin.delayed_ack_timer then
252 origin.delayed_ack_timer:stop(); 316 origin.delayed_ack_timer:stop();
253 origin.delayed_ack_timer = nil; 317 origin.delayed_ack_timer = nil;
254 end 318 end
255 -- Remove handled stanzas from outgoing_stanza_queue 319 -- Remove handled stanzas from outgoing_stanza_queue
256 log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); 320 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
257 local h = tonumber(stanza.attr.h); 321 local h = tonumber(stanza.attr.h);
258 if not h then 322 if not h then
259 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; 323 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
260 end 324 end
261 local handled_stanza_count = h-origin.last_acknowledged_stanza; 325 local handled_stanza_count = h-origin.last_acknowledged_stanza;
328 elseif curr_session and curr_session.resumption_token == resumption_token 392 elseif curr_session and curr_session.resumption_token == resumption_token
329 -- Check the hibernate time still matches what we think it is, 393 -- Check the hibernate time still matches what we think it is,
330 -- otherwise the session resumed and re-hibernated. 394 -- otherwise the session resumed and re-hibernated.
331 and session.hibernating == hibernate_time then 395 and session.hibernating == hibernate_time then
332 session.log("debug", "Destroying session for hibernating too long"); 396 session.log("debug", "Destroying session for hibernating too long");
333 session_registry[session.resumption_token] = nil; 397 session_registry.set(session.username, session.resumption_token, nil);
398 -- save only actual h value and username/host (for security)
399 old_session_registry.set(session.username, session.resumption_token, {
400 h = session.handled_stanza_count,
401 username = session.username,
402 host = session.host
403 });
334 session.resumption_token = nil; 404 session.resumption_token = nil;
335 sessionmanager.destroy_session(session); 405 sessionmanager.destroy_session(session);
336 else 406 else
337 session.log("debug", "Session resumed before hibernation timeout, all is well") 407 session.log("debug", "Session resumed before hibernation timeout, all is well")
338 end 408 end
370 ); 440 );
371 return true; 441 return true;
372 end 442 end
373 443
374 local id = stanza.attr.previd; 444 local id = stanza.attr.previd;
375 local original_session = session_registry[id]; 445 local original_session = session_registry.get(session.username, id);
376 if not original_session then 446 if not original_session then
377 session.log("debug", "Tried to resume non-existent session with id %s", id); 447 session.log("debug", "Tried to resume non-existent session with id %s", id);
378 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 448 local old_session = old_session_registry.get(session.username, id);
379 :tag("item-not-found", { xmlns = xmlns_errors }) 449 if old_session and session.username == old_session.username
380 ); 450 and session.host == old_session.host
451 and old_session.h then
452 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = tostring(old_session.h) })
453 :tag("item-not-found", { xmlns = xmlns_errors })
454 );
455 else
456 session.send(st.stanza("failed", { xmlns = xmlns_sm })
457 :tag("item-not-found", { xmlns = xmlns_errors })
458 );
459 end;
381 elseif session.username == original_session.username 460 elseif session.username == original_session.username
382 and session.host == original_session.host then 461 and session.host == original_session.host then
383 session.log("debug", "mod_smacks resuming existing session..."); 462 session.log("debug", "mod_smacks resuming existing session...");
384 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) 463 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
385 if original_session.conn then 464 if original_session.conn then
446 end 525 end
447 session.log("debug", "Sending <r> (read timeout)"); 526 session.log("debug", "Sending <r> (read timeout)");
448 session.awaiting_ack = false; 527 session.awaiting_ack = false;
449 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); 528 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
450 session.awaiting_ack = true; 529 session.awaiting_ack = true;
451 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() 530 if not session.delayed_ack_timer then
452 delayed_ack_function(session); 531 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
453 end); 532 delayed_ack_function(session);
533 end);
534 end
454 return true; 535 return true;
455 end 536 end
456 end 537 end
457 538
458 module:hook("s2s-read-timeout", handle_read_timeout); 539 module:hook("s2s-read-timeout", handle_read_timeout);