comparison mod_cloud_notify/mod_cloud_notify.lua @ 4324:45dcf5d4cd6c

mod_cloud_notify: fix push flooding on delayed acks Under some circumstances the delayed-ack handling caused a push flood, this commit prevents this and caps pushes at one push per second only.
author tmolitor <thilo@eightysoft.de>
date Mon, 11 Jan 2021 22:48:17 +0100
parents d44a8d3dd571
children 9b95241c6ae5
comparison
equal deleted inserted replaced
4323:a7a06c8cea37 4324:45dcf5d4cd6c
408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted 408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted
409 end 409 end
410 end 410 end
411 411
412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) 412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
413 local function process_smacks_stanza(event) 413 local function process_stanza(session, stanza)
414 local session = event.origin;
415 local stanza = event.stanza;
416 if session.push_identifier then 414 if session.push_identifier then
417 session.log("debug", "adding new stanza to push_queue"); 415 session.log("debug", "adding new stanza to push_queue");
418 if not session.push_queue then session.push_queue = {}; end 416 if not session.push_queue then session.push_queue = {}; end
419 local queue = session.push_queue; 417 local queue = session.push_queue;
420 queue[#queue+1] = st.clone(stanza); 418 queue[#queue+1] = st.clone(stanza);
421 if #queue == 1 then -- first stanza --> start timer 419 if #queue == 1 then -- first stanza --> start timer
422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); 420 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
423 session.awaiting_push_timer = module:add_timer(1e-06, function () 421 session.awaiting_push_timer = module:add_timer(1.0, function ()
424 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); 422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
425 process_stanza_queue(session.push_queue, session, "push"); 423 process_stanza_queue(session.push_queue, session, "push");
426 session.push_queue = {}; -- clean up queue after push 424 session.push_queue = {}; -- clean up queue after push
425 session.awaiting_push_timer = nil;
427 end); 426 end);
428 end 427 end
429 else 428 end
429 return stanza;
430 end
431
432 local function process_smacks_stanza(event)
433 local session = event.origin;
434 local stanza = event.stanza;
435 if not session.push_identifier then
430 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", 436 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)",
431 session.push_identifier 437 session.push_identifier
432 ); 438 );
433 end 439 else
434 return stanza; 440 process_stanza(session, stanza)
441 end
435 end 442 end
436 443
437 -- smacks hibernation is started 444 -- smacks hibernation is started
438 local function hibernate_session(event) 445 local function hibernate_session(event)
439 local session = event.origin; 446 local session = event.origin;
454 461
455 -- smacks ack is delayed 462 -- smacks ack is delayed
456 local function ack_delayed(event) 463 local function ack_delayed(event)
457 local session = event.origin; 464 local session = event.origin;
458 local queue = event.queue; 465 local queue = event.queue;
459 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) 466 if not session.push_identifier then return; end
460 process_stanza_queue(queue, session, "smacks"); 467 for i=1, #queue do
468 local stanza = queue[i];
469 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
470 process_stanza(session, stanza);
471 end
461 end 472 end
462 473
463 -- archive message added 474 -- archive message added
464 local function archive_message_added(event) 475 local function archive_message_added(event)
465 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } 476 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }