Mercurial > prosody-modules
comparison mod_smacks/mod_smacks.lua @ 345:445178d15b51
mod_smacks: Merge mod_fastreconnect (resumption support), fix a number of bugs, refactor the code and add some more comments to explain the process
field | value |
---|---|
author | Matthew Wild <mwild1@gmail.com> |
date | Fri, 18 Mar 2011 21:26:27 +0000 |
parents | 41f1cac40560 |
children | db0f065c4e09 |
Comparison legend (row types used in the table below):
- equal
- deleted
- inserted
- replaced
344:2b0f2160fc61 | 345:445178d15b51 |
---|---|
1 local st = require "util.stanza"; | 1 local st = require "util.stanza"; |
2 local uuid_generate = require "util.uuid".generate; | |
2 | 3 |
3 local t_insert, t_remove = table.insert, table.remove; | 4 local t_insert, t_remove = table.insert, table.remove; |
4 local math_min = math.min; | 5 local math_min = math.min; |
6 local os_time = os.time; | |
5 local tonumber, tostring = tonumber, tostring; | 7 local tonumber, tostring = tonumber, tostring; |
6 local add_filter = require "util.filters".add_filter; | 8 local add_filter = require "util.filters".add_filter; |
7 local timer = require "util.timer"; | 9 local timer = require "util.timer"; |
8 | 10 |
9 local xmlns_sm = "urn:xmpp:sm:2"; | 11 local xmlns_sm = "urn:xmpp:sm:2"; |
12 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; | |
10 | 13 |
11 local sm_attr = { xmlns = xmlns_sm }; | 14 local sm_attr = { xmlns = xmlns_sm }; |
12 | 15 |
13 local resume_timeout = 300; | 16 local resume_timeout = module:get_option("smacks_hibernation_time", 300); |
14 local max_unacked_stanzas = 0; | 17 local max_unacked_stanzas = 0; |
18 | |
19 local session_registry = {}; | |
15 | 20 |
16 module:hook("stream-features", | 21 module:hook("stream-features", |
17 function (event) | 22 function (event) |
18 event.features:tag("sm", sm_attr):tag("optional"):up():up(); | 23 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
19 end); | 24 end); |
21 module:hook("s2s-stream-features", | 26 module:hook("s2s-stream-features", |
22 function (event) | 27 function (event) |
23 event.features:tag("sm", sm_attr):tag("optional"):up():up(); | 28 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
24 end); | 29 end); |
25 | 30 |
26 module:hook_stanza(xmlns_sm, "enable", | 31 module:hook_stanza("http://etherx.jabber.org/streams", "features", |
27 function (session, stanza) | 32 function (session, stanza) |
28 module:log("debug", "Enabling stream management"); | 33 if not session.smacks and stanza:get_child("sm", xmlns_sm) then |
29 session.smacks = true; | 34 session.send(st.stanza("enable", sm_attr)); |
30 | 35 end |
31 -- Overwrite process_stanza() and send() | 36 end); |
32 local queue = {}; | 37 |
33 session.outgoing_stanza_queue = queue; | 38 local function wrap_session(session, resume) |
34 session.last_acknowledged_stanza = 0; | 39 -- Overwrite process_stanza() and send() |
35 local _send = session.sends2s or session.send; | 40 local queue; |
36 local function new_send(stanza) | 41 if not resume then |
37 local attr = stanza.attr; | 42 queue = {}; |
38 if attr and not attr.xmlns then -- Stanza in default stream namespace | 43 session.outgoing_stanza_queue = queue; |
39 queue[#queue+1] = st.clone(stanza); | 44 session.last_acknowledged_stanza = 0; |
40 end | 45 else |
41 local ok, err = _send(stanza); | 46 queue = session.outgoing_stanza_queue; |
42 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then | 47 end |
43 session.awaiting_ack = true; | 48 |
44 return _send(st.stanza("r", { xmlns = xmlns_sm })); | 49 local _send = session.sends2s or session.send; |
45 end | 50 local function new_send(stanza) |
46 return ok, err; | 51 local attr = stanza.attr; |
47 end | 52 if attr and not attr.xmlns then -- Stanza in default stream namespace |
48 | 53 queue[#queue+1] = st.clone(stanza); |
49 if session.sends2s then | 54 end |
50 session.sends2s = new_send; | 55 local ok, err = _send(stanza); |
51 else | 56 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then |
52 session.send = new_send; | 57 session.awaiting_ack = true; |
53 end | 58 return _send(st.stanza("r", sm_attr)); |
54 | 59 end |
55 session.handled_stanza_count = 0; | 60 return ok, err; |
56 add_filter(session, "stanzas/in", function (stanza) | 61 end |
57 if not stanza.attr.xmlns then | 62 |
58 session.handled_stanza_count = session.handled_stanza_count + 1; | 63 if session.sends2s then |
59 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); | 64 session.sends2s = new_send; |
60 end | 65 else |
61 return stanza; | 66 session.send = new_send; |
62 end); | 67 end |
63 | 68 |
64 if not stanza.attr.resume then -- FIXME: Resumption should be a different spec :/ | 69 if not resume then |
65 _send(st.stanza("enabled", sm_attr)); | 70 session.handled_stanza_count = 0; |
66 return true; | 71 add_filter(session, "stanzas/in", function (stanza) |
67 end | 72 if not stanza.attr.xmlns then |
68 end, 100); | 73 session.handled_stanza_count = session.handled_stanza_count + 1; |
74 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); | |
75 end | |
76 return stanza; | |
77 end); | |
78 end | |
79 | |
80 return session; | |
81 end | |
82 | |
83 module:hook_stanza(xmlns_sm, "enable", function (session, stanza) | |
84 module:log("debug", "Enabling stream management"); | |
85 session.smacks = true; | |
86 | |
87 wrap_session(session); | |
88 | |
89 local resume_token; | |
90 local resume = stanza.attr.resume; | |
91 if resume == "true" or resume == "1" then | |
92 resume_token = uuid_generate(); | |
93 session_registry[resume_token] = session; | |
94 session.resumption_token = resume_token; | |
95 end | |
96 session.send(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); | |
97 return true; | |
98 end, 100); | |
69 | 99 |
70 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) | 100 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) |
71 if not origin.smacks then | 101 if not origin.smacks then |
72 module:log("debug", "Received ack request from non-smack-enabled session"); | 102 module:log("debug", "Received ack request from non-smack-enabled session"); |
73 return; | 103 return; |
86 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; | 116 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; |
87 local queue = origin.outgoing_stanza_queue; | 117 local queue = origin.outgoing_stanza_queue; |
88 if handled_stanza_count > #queue then | 118 if handled_stanza_count > #queue then |
89 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", | 119 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", |
90 handled_stanza_count, #queue); | 120 handled_stanza_count, #queue); |
121 module:log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); | |
91 for i=1,#queue do | 122 for i=1,#queue do |
92 module:log("debug", "Q item %d: %s", i, tostring(queue[i])); | 123 module:log("debug", "Q item %d: %s", i, tostring(queue[i])); |
93 end | 124 end |
94 end | 125 end |
95 for i=1,math_min(handled_stanza_count,#queue) do | 126 for i=1,math_min(handled_stanza_count,#queue) do |
132 module:log("warn", "::%s", tostring(queue[i])); | 163 module:log("warn", "::%s", tostring(queue[i])); |
133 end | 164 end |
134 handle_unacked_stanzas(session); | 165 handle_unacked_stanzas(session); |
135 end | 166 end |
136 else | 167 else |
137 session.hibernating = true; | 168 local hibernate_time = os_time(); -- Track the time we went into hibernation |
169 session.hibernating = hibernate_time; | |
138 timer.add_task(resume_timeout, function () | 170 timer.add_task(resume_timeout, function () |
139 if session.hibernating then | 171 -- Check the hibernate time still matches what we think it is, |
172 -- otherwise the session resumed and re-hibernated. | |
173 if session.hibernating == hibernate_time then | |
174 session_registry[session.resumption_token] = nil; | |
140 session.resumption_token = nil; | 175 session.resumption_token = nil; |
141 sessionmanager.destroy_session(session); -- Re-destroy | 176 -- This recursion back into our destroy handler is to |
177 -- make sure we still handle any queued stanzas | |
178 sessionmanager.destroy_session(session); | |
142 end | 179 end |
143 end); | 180 end); |
144 return; -- Postpone destruction for now | 181 return; -- Postpone destruction for now |
145 end | 182 end |
146 | 183 |
147 end | 184 end |
148 return _destroy_session(session, err); | 185 return _destroy_session(session, err); |
149 end | 186 end |
187 | |
188 module:hook_stanza(xmlns_sm, "resume", function (session, stanza) | |
189 local id = stanza.attr.previd; | |
190 local original_session = session_registry[id]; | |
191 if not original_session then | |
192 session.send(st.stanza("failed", sm_attr) | |
193 :tag("item-not-found", { xmlns = xmlns_errors }) | |
194 ); | |
195 elseif session.username == original_session.username | |
196 and session.host == original_session.host then | |
197 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | |
198 original_session.ip = session.ip; | |
199 original_session.conn = session.conn; | |
200 original_session.send = session.send; | |
201 original_session.stream = session.stream; | |
202 original_session.secure = session.secure; | |
203 original_session.hibernating = nil; | |
204 local filter = original_session.filter; | |
205 local stream = session.stream; | |
206 local log = session.log; | |
207 function original_session.data(data) | |
208 data = filter("bytes/in", data); | |
209 if data then | |
210 local ok, err = stream:feed(data); | |
211 if ok then return; end | |
212 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | |
213 original_session:close("xml-not-well-formed"); | |
214 end | |
215 end | |
216 wrap_session(original_session, true); | |
217 -- Inform xmppstream of the new session (passed to its callbacks) | |
218 stream:set_session(original_session); | |
219 -- Similar for connlisteners | |
220 require "net.connlisteners".get("xmppclient").associate_session(session.conn, original_session); | |
221 | |
222 session.send(st.stanza("resumed", { xmlns = xmlns_sm, | |
223 h = original_session.handled_stanza_count, previd = id })); | |
224 | |
225 -- Fake an <a> with the h of the <resume/> from the client | |
226 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm, | |
227 h = stanza.attr.h })); | |
228 | |
229 -- Ok, we need to re-send any stanzas that the client didn't see | |
230 -- ...they are what is now left in the outgoing stanza queue | |
231 local queue = original_session.outgoing_stanza_queue; | |
232 for i=1,#queue do | |
233 session.send(queue[i]); | |
234 end | |
235 else | |
236 log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", | |
237 session.username or "?", session.host or "?", session.type, | |
238 original_session.username or "?", original_session.host or "?", original_session.type); | |
239 session.send(st.stanza("failed", sm_attr) | |
240 :tag("not-authorized", { xmlns = xmlns_errors })); | |
241 end | |
242 return true; | |
243 end); |