Mercurial > prosody-modules
comparison mod_smacks/mod_smacks.lua @ 1520:2881d532f385
mod_smacks: Use filters for queuing outgoing stanzas instead of wrapping session.send()
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 12 Oct 2014 13:24:50 +0200 |
parents | 9475fe14d58d |
children | d4a4ed31567e |
comparison
equal
deleted
inserted
replaced
1519:67c80abe742e | 1520:2881d532f385 |
---|---|
66 session.sends2s(st.stanza("enable", sm2_attr)); | 66 session.sends2s(st.stanza("enable", sm2_attr)); |
67 end | 67 end |
68 end | 68 end |
69 end); | 69 end); |
70 | 70 |
71 local function wrap_session(session, resume, xmlns_sm) | 71 local function outgoing_stanza_filter(stanza, session) |
72 local sm_attr = { xmlns = xmlns_sm }; | 72 local is_stanza = stanza.attr and not stanza.attr.xmlns; |
73 if is_stanza and not stanza._cached then -- Stanza in default stream namespace | |
74 local queue = session.outgoing_stanza_queue; | |
75 module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not"); | |
76 local cached_stanza = st.clone(stanza); | |
77 cached_stanza._cached = true; | |
78 | |
79 if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then | |
80 cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()}); | |
81 end | |
82 | |
83 queue[#queue+1] = cached_stanza; | |
84 session.log("debug", "#queue = %d", #queue); | |
85 if #queue > max_unacked_stanzas then | |
86 module:add_timer(0, function () | |
87 if not session.awaiting_ack then | |
88 session.awaiting_ack = true; | |
89 session.send(st.stanza("r", { xmlns = session.smacks })); | |
90 end | |
91 end); | |
92 end | |
93 end | |
94 if session.hibernating then | |
95 session.log("debug", "hibernating, stanza queued") | |
96 -- The session is hibernating, no point in sending the stanza | |
97 -- over a dead connection. It will be delivered upon resumption. | |
98 return nil; -- or empty string? | |
99 end | |
100 return stanza; | |
101 end | |
102 | |
103 local function count_incoming_stanzas(stanza, session) | |
104 if not stanza.attr.xmlns then | |
105 session.handled_stanza_count = session.handled_stanza_count + 1; | |
106 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); | |
107 end | |
108 return stanza; | |
109 end | |
110 | |
111 local function wrap_session(session, resume) | |
73 -- Overwrite process_stanza() and send() | 112 -- Overwrite process_stanza() and send() |
74 local queue; | 113 local queue; |
75 if not resume then | 114 if not resume then |
76 queue = {}; | 115 queue = {}; |
77 session.outgoing_stanza_queue = queue; | 116 session.outgoing_stanza_queue = queue; |
78 session.last_acknowledged_stanza = 0; | 117 session.last_acknowledged_stanza = 0; |
79 else | 118 else |
80 queue = session.outgoing_stanza_queue; | 119 queue = session.outgoing_stanza_queue; |
81 end | 120 end |
82 | 121 |
83 local _send = session.sends2s or session.send; | 122 add_filter(session, "stanzas/out", outgoing_stanza_filter, -1000); |
84 local function new_send(stanza) | |
85 local is_stanza = stanza.attr and not stanza.attr.xmlns; | |
86 if is_stanza then -- Stanza in default stream namespace | |
87 module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not"); | |
88 local cached_stanza = st.clone(stanza); | |
89 | |
90 if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then | |
91 cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()}); | |
92 end | |
93 | |
94 queue[#queue+1] = cached_stanza; | |
95 session.log("debug", "#queue = %d", #queue); | |
96 end | |
97 if session.hibernating then | |
98 session.log("debug", "hibernating, stanza queued") | |
99 -- The session is hibernating, no point in sending the stanza | |
100 -- over a dead connection. It will be delivered upon resumption. | |
101 return true; | |
102 end | |
103 local ok, err = _send(stanza); | |
104 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack and is_stanza then | |
105 session.awaiting_ack = true; | |
106 return _send(st.stanza("r", sm_attr)); | |
107 end | |
108 return ok, err; | |
109 end | |
110 | |
111 if session.sends2s then | |
112 session.sends2s = new_send; | |
113 else | |
114 session.send = new_send; | |
115 end | |
116 | 123 |
117 local session_close = session.close; | 124 local session_close = session.close; |
118 function session.close(...) | 125 function session.close(...) |
119 if session.resumption_token then | 126 if session.resumption_token then |
120 session_registry[session.resumption_token] = nil; | 127 session_registry[session.resumption_token] = nil; |
123 return session_close(...); | 130 return session_close(...); |
124 end | 131 end |
125 | 132 |
126 if not resume then | 133 if not resume then |
127 session.handled_stanza_count = 0; | 134 session.handled_stanza_count = 0; |
128 add_filter(session, "stanzas/in", function (stanza) | 135 add_filter(session, "stanzas/in", count_incoming_stanzas, 1000); |
129 if not stanza.attr.xmlns then | |
130 session.handled_stanza_count = session.handled_stanza_count + 1; | |
131 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); | |
132 end | |
133 return stanza; | |
134 end); | |
135 end | 136 end |
136 | 137 |
137 return session; | 138 return session; |
138 end | 139 end |
139 | 140 |
144 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); | 145 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); |
145 return true; | 146 return true; |
146 end | 147 end |
147 | 148 |
148 module:log("debug", "Enabling stream management"); | 149 module:log("debug", "Enabling stream management"); |
149 session.smacks = true; | 150 session.smacks = xmlns_sm; |
150 | 151 |
151 wrap_session(session, false, xmlns_sm); | 152 wrap_session(session, false); |
152 | 153 |
153 local resume_token; | 154 local resume_token; |
154 local resume = stanza.attr.resume; | 155 local resume = stanza.attr.resume; |
155 if resume == "true" or resume == "1" then | 156 if resume == "true" or resume == "1" then |
156 resume_token = uuid_generate(); | 157 resume_token = uuid_generate(); |
163 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); | 164 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); |
164 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); | 165 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); |
165 | 166 |
166 function handle_enabled(session, stanza, xmlns_sm) | 167 function handle_enabled(session, stanza, xmlns_sm) |
167 module:log("debug", "Enabling stream management"); | 168 module:log("debug", "Enabling stream management"); |
168 session.smacks = true; | 169 session.smacks = xmlns_sm; |
169 | 170 |
170 wrap_session(session, false, xmlns_sm); | 171 wrap_session(session, false); |
171 | 172 |
172 -- FIXME Resume? | 173 -- FIXME Resume? |
173 | 174 |
174 return true; | 175 return true; |
175 end | 176 end |
308 conn:close(); | 309 conn:close(); |
309 end | 310 end |
310 original_session.ip = session.ip; | 311 original_session.ip = session.ip; |
311 original_session.conn = session.conn; | 312 original_session.conn = session.conn; |
312 original_session.send = session.send; | 313 original_session.send = session.send; |
314 original_session.send.filter = original_session.filter; | |
313 original_session.stream = session.stream; | 315 original_session.stream = session.stream; |
314 original_session.secure = session.secure; | 316 original_session.secure = session.secure; |
315 original_session.hibernating = nil; | 317 original_session.hibernating = nil; |
316 local filter = original_session.filter; | 318 wrap_session(original_session, true); |
317 local stream = session.stream; | |
318 local log = session.log; | |
319 function original_session.data(data) | |
320 data = filter("bytes/in", data); | |
321 if data then | |
322 local ok, err = stream:feed(data); | |
323 if ok then return; end | |
324 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | |
325 original_session:close("xml-not-well-formed"); | |
326 end | |
327 end | |
328 wrap_session(original_session, true, xmlns_sm); | |
329 -- Inform xmppstream of the new session (passed to its callbacks) | 319 -- Inform xmppstream of the new session (passed to its callbacks) |
330 stream:set_session(original_session); | 320 original_session.stream:set_session(original_session); |
331 -- Similar for connlisteners | 321 -- Similar for connlisteners |
332 c2s_sessions[session.conn] = original_session; | 322 c2s_sessions[session.conn] = original_session; |
333 | 323 |
334 session.send(st.stanza("resumed", { xmlns = xmlns_sm, | 324 session.send(st.stanza("resumed", { xmlns = xmlns_sm, |
335 h = original_session.handled_stanza_count, previd = id })); | 325 h = original_session.handled_stanza_count, previd = id })); |