Mercurial > prosody-modules
comparison mod_smacks/mod_smacks.lua @ 4413:0b9501f82e63
mod_smacks: allow O(1) processing of delayed ack events
This adds a stanza field to the event if the stanza which triggered this event
is known exactly.
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sat, 30 Jan 2021 07:19:35 +0100 |
parents | 22e7b3d6fcae |
children | 74da3643c62d |
comparison
equal
deleted
inserted
replaced
4412:e5493a10c4d1 | 4413:0b9501f82e63 |
---|---|
114 stop = function(self) stopped = true end; | 114 stop = function(self) stopped = true end; |
115 timer; | 115 timer; |
116 }; | 116 }; |
117 end | 117 end |
118 | 118 |
119 local function delayed_ack_function(session) | 119 local function delayed_ack_function(session, stanza) |
120 -- fire event only if configured to do so and our session is not already hibernated or destroyed | 120 -- fire event only if configured to do so and our session is not already hibernated or destroyed |
121 if delayed_ack_timeout > 0 and session.awaiting_ack | 121 if delayed_ack_timeout > 0 and session.awaiting_ack |
122 and not session.hibernating and not session.destroyed then | 122 and not session.hibernating and not session.destroyed then |
123 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", | 123 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", |
124 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); | 124 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); |
125 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); | 125 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza}); |
126 end | 126 end |
127 session.delayed_ack_timer = nil; | 127 session.delayed_ack_timer = nil; |
128 end | 128 end |
129 | 129 |
130 local function can_do_smacks(session, advertise_only) | 130 local function can_do_smacks(session, advertise_only) |
156 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | 156 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); |
157 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | 157 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); |
158 end | 158 end |
159 end); | 159 end); |
160 | 160 |
161 local function request_ack_if_needed(session, force, reason) | 161 local function request_ack_if_needed(session, force, reason, stanza) |
162 local queue = session.outgoing_stanza_queue; | 162 local queue = session.outgoing_stanza_queue; |
163 local expected_h = session.last_acknowledged_stanza + #queue; | 163 local expected_h = session.last_acknowledged_stanza + #queue; |
164 -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); | 164 -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); |
165 if session.awaiting_ack == nil and not session.hibernating then | 165 if session.awaiting_ack == nil and not session.hibernating then |
166 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong | 166 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong |
180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | 180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
181 session.last_requested_h = session.last_acknowledged_stanza + #queue; | 181 session.last_requested_h = session.last_acknowledged_stanza + #queue; |
182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); | 182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); |
183 if not session.delayed_ack_timer then | 183 if not session.delayed_ack_timer then |
184 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() | 184 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() |
185 delayed_ack_function(session); | 185 delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue |
186 end); | 186 end); |
187 end | 187 end |
188 end | 188 end |
189 end); | 189 end); |
190 end | 190 end |
194 -- and there isn't already a timer for this event running. | 194 -- and there isn't already a timer for this event running. |
195 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event | 195 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event |
196 -- would not trigger this event (again). | 196 -- would not trigger this event (again). |
197 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then | 197 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then |
198 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); | 198 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); |
199 delayed_ack_function(session); | 199 delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules |
200 end | 200 end |
201 end | 201 end |
202 | 202 |
203 local function outgoing_stanza_filter(stanza, session) | 203 local function outgoing_stanza_filter(stanza, session) |
204 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's | 204 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's |
226 if session.hibernating then | 226 if session.hibernating then |
227 session.log("debug", "hibernating, stanza queued"); | 227 session.log("debug", "hibernating, stanza queued"); |
228 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); | 228 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); |
229 return nil; | 229 return nil; |
230 end | 230 end |
231 request_ack_if_needed(session, false, "outgoing_stanza_filter"); | 231 request_ack_if_needed(session, false, "outgoing_stanza_filter", stanza); |
232 end | 232 end |
233 return stanza; | 233 return stanza; |
234 end | 234 end |
235 | 235 |
236 local function count_incoming_stanzas(stanza, session) | 236 local function count_incoming_stanzas(stanza, session) |
346 -- Reply with <a> | 346 -- Reply with <a> |
347 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); | 347 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); |
348 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) | 348 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) |
349 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; | 349 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; |
350 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then | 350 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then |
351 request_ack_if_needed(origin, true, "piggybacked by handle_r"); | 351 request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); |
352 end | 352 end |
353 return true; | 353 return true; |
354 end | 354 end |
355 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); | 355 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); |
356 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); | 356 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); |
388 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); | 388 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); |
389 end | 389 end |
390 | 390 |
391 origin.log("debug", "#queue = %d", #queue); | 391 origin.log("debug", "#queue = %d", #queue); |
392 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | 392 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; |
393 request_ack_if_needed(origin, false, "handle_a") | 393 request_ack_if_needed(origin, false, "handle_a", nil) |
394 return true; | 394 return true; |
395 end | 395 end |
396 module:hook_stanza(xmlns_sm2, "a", handle_a); | 396 module:hook_stanza(xmlns_sm2, "a", handle_a); |
397 module:hook_stanza(xmlns_sm3, "a", handle_a); | 397 module:hook_stanza(xmlns_sm3, "a", handle_a); |
398 | 398 |
621 function session.send(stanza) | 621 function session.send(stanza) |
622 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 622 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); |
623 return false; | 623 return false; |
624 end | 624 end |
625 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); | 625 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); |
626 request_ack_if_needed(original_session, true, "handle_resume"); | 626 request_ack_if_needed(original_session, true, "handle_resume", nil); |
627 else | 627 else |
628 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", | 628 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", |
629 session.username or "?", session.host or "?", session.type, | 629 session.username or "?", session.host or "?", session.type, |
630 original_session.username or "?", original_session.host or "?", original_session.type); | 630 original_session.username or "?", original_session.host or "?", original_session.type); |
631 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 631 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |
652 session.log("debug", "Sending <r> (read timeout)"); | 652 session.log("debug", "Sending <r> (read timeout)"); |
653 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); | 653 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); |
654 session.awaiting_ack = true; | 654 session.awaiting_ack = true; |
655 if not session.delayed_ack_timer then | 655 if not session.delayed_ack_timer then |
656 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() | 656 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() |
657 delayed_ack_function(session); | 657 delayed_ack_function(session, nil); |
658 end); | 658 end); |
659 end | 659 end |
660 return true; | 660 return true; |
661 end | 661 end |
662 end | 662 end |