Mercurial > prosody-modules
comparison mod_cloud_notify/mod_cloud_notify.lua @ 4324:45dcf5d4cd6c
mod_cloud_notify: fix push flooding on delayed acks
Under some circumstances the delayed-ack handling caused a push flood.
This commit prevents that and caps pushes at one push per second.
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Mon, 11 Jan 2021 22:48:17 +0100 |
parents | d44a8d3dd571 |
children | 9b95241c6ae5 |
comparison
equal
deleted
inserted
replaced
4323:a7a06c8cea37 | 4324:45dcf5d4cd6c |
---|---|
408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted | 408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted |
409 end | 409 end |
410 end | 410 end |
411 | 411 |
412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) | 412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) |
413 local function process_smacks_stanza(event) | 413 local function process_stanza(session, stanza) |
414 local session = event.origin; | |
415 local stanza = event.stanza; | |
416 if session.push_identifier then | 414 if session.push_identifier then |
417 session.log("debug", "adding new stanza to push_queue"); | 415 session.log("debug", "adding new stanza to push_queue"); |
418 if not session.push_queue then session.push_queue = {}; end | 416 if not session.push_queue then session.push_queue = {}; end |
419 local queue = session.push_queue; | 417 local queue = session.push_queue; |
420 queue[#queue+1] = st.clone(stanza); | 418 queue[#queue+1] = st.clone(stanza); |
421 if #queue == 1 then -- first stanza --> start timer | 419 if #queue == 1 then -- first stanza --> start timer |
422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); | 420 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); |
423 session.awaiting_push_timer = module:add_timer(1e-06, function () | 421 session.awaiting_push_timer = module:add_timer(1.0, function () |
424 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); | 422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); |
425 process_stanza_queue(session.push_queue, session, "push"); | 423 process_stanza_queue(session.push_queue, session, "push"); |
426 session.push_queue = {}; -- clean up queue after push | 424 session.push_queue = {}; -- clean up queue after push |
425 session.awaiting_push_timer = nil; | |
427 end); | 426 end); |
428 end | 427 end |
429 else | 428 end |
429 return stanza; | |
430 end | |
431 | |
432 local function process_smacks_stanza(event) | |
433 local session = event.origin; | |
434 local stanza = event.stanza; | |
435 if not session.push_identifier then | |
430 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", | 436 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", |
431 session.push_identifier | 437 session.push_identifier |
432 ); | 438 ); |
433 end | 439 else |
434 return stanza; | 440 process_stanza(session, stanza) |
441 end | |
435 end | 442 end |
436 | 443 |
437 -- smacks hibernation is started | 444 -- smacks hibernation is started |
438 local function hibernate_session(event) | 445 local function hibernate_session(event) |
439 local session = event.origin; | 446 local session = event.origin; |
454 | 461 |
455 -- smacks ack is delayed | 462 -- smacks ack is delayed |
456 local function ack_delayed(event) | 463 local function ack_delayed(event) |
457 local session = event.origin; | 464 local session = event.origin; |
458 local queue = event.queue; | 465 local queue = event.queue; |
459 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | 466 if not session.push_identifier then return; end |
460 process_stanza_queue(queue, session, "smacks"); | 467 for i=1, #queue do |
468 local stanza = queue[i]; | |
469 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | |
470 process_stanza(session, stanza); | |
471 end | |
461 end | 472 end |
462 | 473 |
463 -- archive message added | 474 -- archive message added |
464 local function archive_message_added(event) | 475 local function archive_message_added(event) |
465 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | 476 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |