comparison mod_smacks/mod_smacks.lua @ 1520:2881d532f385

mod_smacks: Use filters for queuing outgoing stanzas instead of wrapping session.send()
author Kim Alvefur <zash@zash.se>
date Sun, 12 Oct 2014 13:24:50 +0200
parents 9475fe14d58d
children d4a4ed31567e
comparison
equal deleted inserted replaced
1519:67c80abe742e 1520:2881d532f385
66 session.sends2s(st.stanza("enable", sm2_attr)); 66 session.sends2s(st.stanza("enable", sm2_attr));
67 end 67 end
68 end 68 end
69 end); 69 end);
70 70
71 local function wrap_session(session, resume, xmlns_sm) 71 local function outgoing_stanza_filter(stanza, session)
72 local sm_attr = { xmlns = xmlns_sm }; 72 local is_stanza = stanza.attr and not stanza.attr.xmlns;
73 if is_stanza and not stanza._cached then -- Stanza in default stream namespace
74 local queue = session.outgoing_stanza_queue;
75 module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
76 local cached_stanza = st.clone(stanza);
77 cached_stanza._cached = true;
78
79 if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
80 cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
81 end
82
83 queue[#queue+1] = cached_stanza;
84 session.log("debug", "#queue = %d", #queue);
85 if #queue > max_unacked_stanzas then
86 module:add_timer(0, function ()
87 if not session.awaiting_ack then
88 session.awaiting_ack = true;
89 session.send(st.stanza("r", { xmlns = session.smacks }));
90 end
91 end);
92 end
93 end
94 if session.hibernating then
95 session.log("debug", "hibernating, stanza queued")
96 -- The session is hibernating, no point in sending the stanza
97 -- over a dead connection. It will be delivered upon resumption.
98 return nil; -- or empty string?
99 end
100 return stanza;
101 end
102
103 local function count_incoming_stanzas(stanza, session)
104 if not stanza.attr.xmlns then
105 session.handled_stanza_count = session.handled_stanza_count + 1;
106 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
107 end
108 return stanza;
109 end
110
111 local function wrap_session(session, resume)
73 -- Overwrite process_stanza() and send() 112 -- Overwrite process_stanza() and send()
74 local queue; 113 local queue;
75 if not resume then 114 if not resume then
76 queue = {}; 115 queue = {};
77 session.outgoing_stanza_queue = queue; 116 session.outgoing_stanza_queue = queue;
78 session.last_acknowledged_stanza = 0; 117 session.last_acknowledged_stanza = 0;
79 else 118 else
80 queue = session.outgoing_stanza_queue; 119 queue = session.outgoing_stanza_queue;
81 end 120 end
82 121
83 local _send = session.sends2s or session.send; 122 add_filter(session, "stanzas/out", outgoing_stanza_filter, -1000);
84 local function new_send(stanza)
85 local is_stanza = stanza.attr and not stanza.attr.xmlns;
86 if is_stanza then -- Stanza in default stream namespace
87 module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
88 local cached_stanza = st.clone(stanza);
89
90 if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
91 cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
92 end
93
94 queue[#queue+1] = cached_stanza;
95 session.log("debug", "#queue = %d", #queue);
96 end
97 if session.hibernating then
98 session.log("debug", "hibernating, stanza queued")
99 -- The session is hibernating, no point in sending the stanza
100 -- over a dead connection. It will be delivered upon resumption.
101 return true;
102 end
103 local ok, err = _send(stanza);
104 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack and is_stanza then
105 session.awaiting_ack = true;
106 return _send(st.stanza("r", sm_attr));
107 end
108 return ok, err;
109 end
110
111 if session.sends2s then
112 session.sends2s = new_send;
113 else
114 session.send = new_send;
115 end
116 123
117 local session_close = session.close; 124 local session_close = session.close;
118 function session.close(...) 125 function session.close(...)
119 if session.resumption_token then 126 if session.resumption_token then
120 session_registry[session.resumption_token] = nil; 127 session_registry[session.resumption_token] = nil;
123 return session_close(...); 130 return session_close(...);
124 end 131 end
125 132
126 if not resume then 133 if not resume then
127 session.handled_stanza_count = 0; 134 session.handled_stanza_count = 0;
128 add_filter(session, "stanzas/in", function (stanza) 135 add_filter(session, "stanzas/in", count_incoming_stanzas, 1000);
129 if not stanza.attr.xmlns then
130 session.handled_stanza_count = session.handled_stanza_count + 1;
131 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
132 end
133 return stanza;
134 end);
135 end 136 end
136 137
137 return session; 138 return session;
138 end 139 end
139 140
144 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); 145 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
145 return true; 146 return true;
146 end 147 end
147 148
148 module:log("debug", "Enabling stream management"); 149 module:log("debug", "Enabling stream management");
149 session.smacks = true; 150 session.smacks = xmlns_sm;
150 151
151 wrap_session(session, false, xmlns_sm); 152 wrap_session(session, false);
152 153
153 local resume_token; 154 local resume_token;
154 local resume = stanza.attr.resume; 155 local resume = stanza.attr.resume;
155 if resume == "true" or resume == "1" then 156 if resume == "true" or resume == "1" then
156 resume_token = uuid_generate(); 157 resume_token = uuid_generate();
163 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); 164 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
164 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); 165 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
165 166
166 function handle_enabled(session, stanza, xmlns_sm) 167 function handle_enabled(session, stanza, xmlns_sm)
167 module:log("debug", "Enabling stream management"); 168 module:log("debug", "Enabling stream management");
168 session.smacks = true; 169 session.smacks = xmlns_sm;
169 170
170 wrap_session(session, false, xmlns_sm); 171 wrap_session(session, false);
171 172
172 -- FIXME Resume? 173 -- FIXME Resume?
173 174
174 return true; 175 return true;
175 end 176 end
308 conn:close(); 309 conn:close();
309 end 310 end
310 original_session.ip = session.ip; 311 original_session.ip = session.ip;
311 original_session.conn = session.conn; 312 original_session.conn = session.conn;
312 original_session.send = session.send; 313 original_session.send = session.send;
314 original_session.send.filter = original_session.filter;
313 original_session.stream = session.stream; 315 original_session.stream = session.stream;
314 original_session.secure = session.secure; 316 original_session.secure = session.secure;
315 original_session.hibernating = nil; 317 original_session.hibernating = nil;
316 local filter = original_session.filter; 318 wrap_session(original_session, true);
317 local stream = session.stream;
318 local log = session.log;
319 function original_session.data(data)
320 data = filter("bytes/in", data);
321 if data then
322 local ok, err = stream:feed(data);
323 if ok then return; end
324 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
325 original_session:close("xml-not-well-formed");
326 end
327 end
328 wrap_session(original_session, true, xmlns_sm);
329 -- Inform xmppstream of the new session (passed to its callbacks) 319 -- Inform xmppstream of the new session (passed to its callbacks)
330 stream:set_session(original_session); 320 original_session.stream:set_session(original_session);
331 -- Similar for connlisteners 321 -- Similar for connlisteners
332 c2s_sessions[session.conn] = original_session; 322 c2s_sessions[session.conn] = original_session;
333 323
334 session.send(st.stanza("resumed", { xmlns = xmlns_sm, 324 session.send(st.stanza("resumed", { xmlns = xmlns_sm,
335 h = original_session.handled_stanza_count, previd = id })); 325 h = original_session.handled_stanza_count, previd = id }));