comparison mod_smacks/mod_smacks.lua @ 345:445178d15b51

mod_smacks: Merge mod_fastreconnect (resumption support), fix a number of bugs, refactor the code and add some more comments to explain process
author Matthew Wild <mwild1@gmail.com>
date Fri, 18 Mar 2011 21:26:27 +0000
parents 41f1cac40560
children db0f065c4e09
comparison
equal deleted inserted replaced
344:2b0f2160fc61 345:445178d15b51
1 local st = require "util.stanza"; 1 local st = require "util.stanza";
2 local uuid_generate = require "util.uuid".generate;
2 3
3 local t_insert, t_remove = table.insert, table.remove; 4 local t_insert, t_remove = table.insert, table.remove;
4 local math_min = math.min; 5 local math_min = math.min;
6 local os_time = os.time;
5 local tonumber, tostring = tonumber, tostring; 7 local tonumber, tostring = tonumber, tostring;
6 local add_filter = require "util.filters".add_filter; 8 local add_filter = require "util.filters".add_filter;
7 local timer = require "util.timer"; 9 local timer = require "util.timer";
8 10
9 local xmlns_sm = "urn:xmpp:sm:2"; 11 local xmlns_sm = "urn:xmpp:sm:2";
12 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
10 13
11 local sm_attr = { xmlns = xmlns_sm }; 14 local sm_attr = { xmlns = xmlns_sm };
12 15
13 local resume_timeout = 300; 16 local resume_timeout = module:get_option("smacks_hibernation_time", 300);
14 local max_unacked_stanzas = 0; 17 local max_unacked_stanzas = 0;
18
19 local session_registry = {};
15 20
16 module:hook("stream-features", 21 module:hook("stream-features",
17 function (event) 22 function (event)
18 event.features:tag("sm", sm_attr):tag("optional"):up():up(); 23 event.features:tag("sm", sm_attr):tag("optional"):up():up();
19 end); 24 end);
21 module:hook("s2s-stream-features", 26 module:hook("s2s-stream-features",
22 function (event) 27 function (event)
23 event.features:tag("sm", sm_attr):tag("optional"):up():up(); 28 event.features:tag("sm", sm_attr):tag("optional"):up():up();
24 end); 29 end);
25 30
26 module:hook_stanza(xmlns_sm, "enable", 31 module:hook_stanza("http://etherx.jabber.org/streams", "features",
27 function (session, stanza) 32 function (session, stanza)
28 module:log("debug", "Enabling stream management"); 33 if not session.smacks and stanza:get_child("sm", xmlns_sm) then
29 session.smacks = true; 34 session.send(st.stanza("enable", sm_attr));
30 35 end
31 -- Overwrite process_stanza() and send() 36 end);
32 local queue = {}; 37
33 session.outgoing_stanza_queue = queue; 38 local function wrap_session(session, resume)
34 session.last_acknowledged_stanza = 0; 39 -- Overwrite process_stanza() and send()
35 local _send = session.sends2s or session.send; 40 local queue;
36 local function new_send(stanza) 41 if not resume then
37 local attr = stanza.attr; 42 queue = {};
38 if attr and not attr.xmlns then -- Stanza in default stream namespace 43 session.outgoing_stanza_queue = queue;
39 queue[#queue+1] = st.clone(stanza); 44 session.last_acknowledged_stanza = 0;
40 end 45 else
41 local ok, err = _send(stanza); 46 queue = session.outgoing_stanza_queue;
42 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then 47 end
43 session.awaiting_ack = true; 48
44 return _send(st.stanza("r", { xmlns = xmlns_sm })); 49 local _send = session.sends2s or session.send;
45 end 50 local function new_send(stanza)
46 return ok, err; 51 local attr = stanza.attr;
47 end 52 if attr and not attr.xmlns then -- Stanza in default stream namespace
48 53 queue[#queue+1] = st.clone(stanza);
49 if session.sends2s then 54 end
50 session.sends2s = new_send; 55 local ok, err = _send(stanza);
51 else 56 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
52 session.send = new_send; 57 session.awaiting_ack = true;
53 end 58 return _send(st.stanza("r", sm_attr));
54 59 end
55 session.handled_stanza_count = 0; 60 return ok, err;
56 add_filter(session, "stanzas/in", function (stanza) 61 end
57 if not stanza.attr.xmlns then 62
58 session.handled_stanza_count = session.handled_stanza_count + 1; 63 if session.sends2s then
59 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); 64 session.sends2s = new_send;
60 end 65 else
61 return stanza; 66 session.send = new_send;
62 end); 67 end
63 68
64 if not stanza.attr.resume then -- FIXME: Resumption should be a different spec :/ 69 if not resume then
65 _send(st.stanza("enabled", sm_attr)); 70 session.handled_stanza_count = 0;
66 return true; 71 add_filter(session, "stanzas/in", function (stanza)
67 end 72 if not stanza.attr.xmlns then
68 end, 100); 73 session.handled_stanza_count = session.handled_stanza_count + 1;
74 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
75 end
76 return stanza;
77 end);
78 end
79
80 return session;
81 end
82
83 module:hook_stanza(xmlns_sm, "enable", function (session, stanza)
84 module:log("debug", "Enabling stream management");
85 session.smacks = true;
86
87 wrap_session(session);
88
89 local resume_token;
90 local resume = stanza.attr.resume;
91 if resume == "true" or resume == "1" then
92 resume_token = uuid_generate();
93 session_registry[resume_token] = session;
94 session.resumption_token = resume_token;
95 end
96 session.send(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
97 return true;
98 end, 100);
69 99
70 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) 100 module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
71 if not origin.smacks then 101 if not origin.smacks then
72 module:log("debug", "Received ack request from non-smack-enabled session"); 102 module:log("debug", "Received ack request from non-smack-enabled session");
73 return; 103 return;
86 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; 116 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
87 local queue = origin.outgoing_stanza_queue; 117 local queue = origin.outgoing_stanza_queue;
88 if handled_stanza_count > #queue then 118 if handled_stanza_count > #queue then
89 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", 119 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
90 handled_stanza_count, #queue); 120 handled_stanza_count, #queue);
121 module:log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza);
91 for i=1,#queue do 122 for i=1,#queue do
92 module:log("debug", "Q item %d: %s", i, tostring(queue[i])); 123 module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
93 end 124 end
94 end 125 end
95 for i=1,math_min(handled_stanza_count,#queue) do 126 for i=1,math_min(handled_stanza_count,#queue) do
132 module:log("warn", "::%s", tostring(queue[i])); 163 module:log("warn", "::%s", tostring(queue[i]));
133 end 164 end
134 handle_unacked_stanzas(session); 165 handle_unacked_stanzas(session);
135 end 166 end
136 else 167 else
137 session.hibernating = true; 168 local hibernate_time = os_time(); -- Track the time we went into hibernation
169 session.hibernating = hibernate_time;
138 timer.add_task(resume_timeout, function () 170 timer.add_task(resume_timeout, function ()
139 if session.hibernating then 171 -- Check the hibernate time still matches what we think it is,
172 -- otherwise the session resumed and re-hibernated.
173 if session.hibernating == hibernate_time then
174 session_registry[session.resumption_token] = nil;
140 session.resumption_token = nil; 175 session.resumption_token = nil;
141 sessionmanager.destroy_session(session); -- Re-destroy 176 -- This recursion back into our destroy handler is to
177 -- make sure we still handle any queued stanzas
178 sessionmanager.destroy_session(session);
142 end 179 end
143 end); 180 end);
144 return; -- Postpone destruction for now 181 return; -- Postpone destruction for now
145 end 182 end
146 183
147 end 184 end
148 return _destroy_session(session, err); 185 return _destroy_session(session, err);
149 end 186 end
187
188 module:hook_stanza(xmlns_sm, "resume", function (session, stanza)
189 local id = stanza.attr.previd;
190 local original_session = session_registry[id];
191 if not original_session then
192 session.send(st.stanza("failed", sm_attr)
193 :tag("item-not-found", { xmlns = xmlns_errors })
194 );
195 elseif session.username == original_session.username
196 and session.host == original_session.host then
197 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
198 original_session.ip = session.ip;
199 original_session.conn = session.conn;
200 original_session.send = session.send;
201 original_session.stream = session.stream;
202 original_session.secure = session.secure;
203 original_session.hibernating = nil;
204 local filter = original_session.filter;
205 local stream = session.stream;
206 local log = session.log;
207 function original_session.data(data)
208 data = filter("bytes/in", data);
209 if data then
210 local ok, err = stream:feed(data);
211 if ok then return; end
212 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
213 original_session:close("xml-not-well-formed");
214 end
215 end
216 wrap_session(original_session, true);
217 -- Inform xmppstream of the new session (passed to its callbacks)
218 stream:set_session(original_session);
219 -- Similar for connlisteners
220 require "net.connlisteners".get("xmppclient").associate_session(session.conn, original_session);
221
222 session.send(st.stanza("resumed", { xmlns = xmlns_sm,
223 h = original_session.handled_stanza_count, previd = id }));
224
225 -- Fake an <a> with the h of the <resume/> from the client
226 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
227 h = stanza.attr.h }));
228
229 -- Ok, we need to re-send any stanzas that the client didn't see
230 -- ...they are what is now left in the outgoing stanza queue
231 local queue = original_session.outgoing_stanza_queue;
232 for i=1,#queue do
233 session.send(queue[i]);
234 end
235 else
236 log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
237 session.username or "?", session.host or "?", session.type,
238 original_session.username or "?", original_session.host or "?", original_session.type);
239 session.send(st.stanza("failed", sm_attr)
240 :tag("not-authorized", { xmlns = xmlns_errors }));
241 end
242 return true;
243 end);