Mercurial > prosody-modules
comparison mod_smacks/mod_smacks.lua @ 2491:5fbca7de2088
mod_smacks: Send out more ack requests where needed
Under some circumstances it was possible that more than "max_unacked_stanzas"
where left in the outgoing stanza queue without forcing an ack.
This could happen, when more stanzas entered the queue while the last ack request
was still unanswered.
Now the test "#queue > max_unacked_stanzas" is done upon receiving
an ack as well as when sending out stanzas, which fixes this bug.
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sun, 12 Feb 2017 19:27:50 +0100 |
parents | 5e7badecf7fe |
children | d300ae5dba87 |
comparison
equal
deleted
inserted
replaced
2490:85509650ba82 | 2491:5fbca7de2088 |
---|---|
3 -- Copyright (C) 2010-2015 Matthew Wild | 3 -- Copyright (C) 2010-2015 Matthew Wild |
4 -- Copyright (C) 2010 Waqas Hussain | 4 -- Copyright (C) 2010 Waqas Hussain |
5 -- Copyright (C) 2012-2015 Kim Alvefur | 5 -- Copyright (C) 2012-2015 Kim Alvefur |
6 -- Copyright (C) 2012 Thijs Alkemade | 6 -- Copyright (C) 2012 Thijs Alkemade |
7 -- Copyright (C) 2014 Florian Zeitz | 7 -- Copyright (C) 2014 Florian Zeitz |
8 -- Copyright (C) 2016 Thilo Molitor | 8 -- Copyright (C) 2016-2017 Thilo Molitor |
9 -- | 9 -- |
10 -- This project is MIT/X11 licensed. Please see the | 10 -- This project is MIT/X11 licensed. Please see the |
11 -- COPYING file in the source package for more information. | 11 -- COPYING file in the source package for more information. |
12 -- | 12 -- |
13 | 13 |
78 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | 78 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); |
79 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | 79 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); |
80 end | 80 end |
81 end); | 81 end); |
82 | 82 |
83 local function request_ack_if_needed(session) | |
84 local queue = session.outgoing_stanza_queue; | |
85 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then | |
86 session.log("debug", "Queuing <r> (in a moment)"); | |
87 session.awaiting_ack = false; | |
88 session.awaiting_ack_timer = module:add_timer(1e-06, function () | |
89 if not session.awaiting_ack then | |
90 session.log("debug", "Sending <r> (inside timer, before send)"); | |
91 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | |
92 session.log("debug", "Sending <r> (inside timer, after send)"); | |
93 session.awaiting_ack = true; | |
94 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | |
95 delayed_ack_function(session); | |
96 end); | |
97 end | |
98 end); | |
99 end | |
100 end | |
101 | |
83 local function outgoing_stanza_filter(stanza, session) | 102 local function outgoing_stanza_filter(stanza, session) |
84 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; | 103 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; |
85 if is_stanza and not stanza._cached then -- Stanza in default stream namespace | 104 if is_stanza and not stanza._cached then -- Stanza in default stream namespace |
86 local queue = session.outgoing_stanza_queue; | 105 local queue = session.outgoing_stanza_queue; |
87 local cached_stanza = st.clone(stanza); | 106 local cached_stanza = st.clone(stanza); |
95 session.log("debug", "#queue = %d", #queue); | 114 session.log("debug", "#queue = %d", #queue); |
96 if session.hibernating then | 115 if session.hibernating then |
97 session.log("debug", "hibernating, stanza queued"); | 116 session.log("debug", "hibernating, stanza queued"); |
98 return nil; | 117 return nil; |
99 end | 118 end |
100 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then | 119 request_ack_if_needed(session); |
101 session.log("debug", "Queuing <r> (in a moment)"); | |
102 session.awaiting_ack = false; | |
103 session.awaiting_ack_timer = module:add_timer(1e-06, function () | |
104 if not session.awaiting_ack then | |
105 session.log("debug", "Sending <r> (before send)"); | |
106 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | |
107 session.log("debug", "Sending <r> (after send)"); | |
108 session.awaiting_ack = true; | |
109 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | |
110 delayed_ack_function(session); | |
111 end); | |
112 end | |
113 end); | |
114 end | |
115 end | 120 end |
116 return stanza; | 121 return stanza; |
117 end | 122 end |
118 | 123 |
119 local function count_incoming_stanzas(stanza, session) | 124 local function count_incoming_stanzas(stanza, session) |
236 end | 241 end |
237 if origin.delayed_ack_timer then | 242 if origin.delayed_ack_timer then |
238 origin.delayed_ack_timer:stop(); | 243 origin.delayed_ack_timer:stop(); |
239 end | 244 end |
240 -- Remove handled stanzas from outgoing_stanza_queue | 245 -- Remove handled stanzas from outgoing_stanza_queue |
241 --log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); | 246 log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); |
242 local h = tonumber(stanza.attr.h); | 247 local h = tonumber(stanza.attr.h); |
243 if not h then | 248 if not h then |
244 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; | 249 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; |
245 end | 250 end |
246 local handled_stanza_count = h-origin.last_acknowledged_stanza; | 251 local handled_stanza_count = h-origin.last_acknowledged_stanza; |
256 for i=1,math_min(handled_stanza_count,#queue) do | 261 for i=1,math_min(handled_stanza_count,#queue) do |
257 t_remove(origin.outgoing_stanza_queue, 1); | 262 t_remove(origin.outgoing_stanza_queue, 1); |
258 end | 263 end |
259 origin.log("debug", "#queue = %d", #queue); | 264 origin.log("debug", "#queue = %d", #queue); |
260 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | 265 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; |
266 request_ack_if_needed(origin) | |
261 return true; | 267 return true; |
262 end | 268 end |
263 module:hook_stanza(xmlns_sm2, "a", handle_a); | 269 module:hook_stanza(xmlns_sm2, "a", handle_a); |
264 module:hook_stanza(xmlns_sm3, "a", handle_a); | 270 module:hook_stanza(xmlns_sm3, "a", handle_a); |
265 | 271 |