comparison mod_websocket/mod_websocket.lua @ 1340:3ffd64b4ab59

mod_websocket: Update to draft-ietf-xmpp-websocket-01
author Florian Zeitz <florob@babelmonkeys.de>
date Sun, 09 Mar 2014 23:35:57 +0100
parents b21236b6b8d8
children cc77341af5ee
comparison
equal deleted inserted replaced
1337:c38f163f18b9 1340:3ffd64b4ab59
9 9
10 local add_filter = require "util.filters".add_filter; 10 local add_filter = require "util.filters".add_filter;
11 local sha1 = require "util.hashes".sha1; 11 local sha1 = require "util.hashes".sha1;
12 local base64 = require "util.encodings".base64.encode; 12 local base64 = require "util.encodings".base64.encode;
13 local softreq = require "util.dependencies".softreq; 13 local softreq = require "util.dependencies".softreq;
14 local st = require "util.stanza";
15 local parse_xml = require "util.xml".parse;
14 local portmanager = require "core.portmanager"; 16 local portmanager = require "core.portmanager";
17 local sm_destroy_session = sessionmanager.destroy_session;
18 local log = module._log;
15 19
16 local bit; 20 local bit;
17 pcall(function() bit = require"bit"; end); 21 pcall(function() bit = require"bit"; end);
18 bit = bit or softreq"bit32" 22 bit = bit or softreq"bit32"
19 if not bit then module:log("error", "No bit module found. Either LuaJIT 2, lua-bitop or Lua 5.2 is required"); end 23 if not bit then module:log("error", "No bit module found. Either LuaJIT 2, lua-bitop or Lua 5.2 is required"); end
35 end 39 end
36 if type(cross_domain) ~= "string" then 40 if type(cross_domain) ~= "string" then
37 cross_domain = nil; 41 cross_domain = nil;
38 end 42 end
39 end 43 end
44
45 local xmlns_framing = "urn:ietf:params:xml:ns:xmpp-framing";
46 local xmlns_streams = "http://etherx.jabber.org/streams";
47 local xmlns_client = "jabber:client";
40 48
41 module:depends("c2s") 49 module:depends("c2s")
42 local sessions = module:shared("c2s/sessions"); 50 local sessions = module:shared("c2s/sessions");
43 local c2s_listener = portmanager.get_service("c2s").listener; 51 local c2s_listener = portmanager.get_service("c2s").listener;
44 52
126 result[#result+1] = data; 134 result[#result+1] = data;
127 135
128 return t_concat(result, ""); 136 return t_concat(result, "");
129 end 137 end
130 138
139 --- Session methods
140 local function session_open_stream(session)
141 local attr = {
142 xmlns = xmlns_framing,
143 version = "1.0",
144 id = session.streamid or "",
145 from = session.host
146 };
147 session.send(st.stanza("open", attr));
148 end
149
150 local function session_close(session, reason)
151 local log = session.log or log;
152 if session.conn then
153 if session.notopen then
154 session:open_stream();
155 end
156 if reason then -- nil == no err, initiated by us, false == initiated by client
157 local stream_error = st.stanza("stream:error");
158 if type(reason) == "string" then -- assume stream error
159 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
160 elseif type(reason) == "table" then
161 if reason.condition then
162 stream_error:tag(reason.condition, stream_xmlns_attr):up();
163 if reason.text then
164 stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
165 end
166 if reason.extra then
167 stream_error:add_child(reason.extra);
168 end
169 elseif reason.name then -- a stanza
170 stream_error = reason;
171 end
172 end
173 stream_error = tostring(stream_error);
174 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
175 session.send(stream_error);
176 end
177
178 session.send(st.stanza("close", { xmlns = xmlns_framing }));
179 function session.send() return false; end
180
181 local reason = (reason and (reason.name or reason.text or reason.condition)) or reason;
182 session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
183
184 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
185 local conn = session.conn;
186 if reason == nil and not session.notopen and session.type == "c2s" then
187 -- Grace time to process data from authenticated cleanly-closed stream
188 add_task(stream_close_timeout, function ()
189 if not session.destroyed then
190 session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
191 sm_destroy_session(session, reason);
192 -- Sends close with code 1000 and message "Stream closed"
193 local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed";
194 conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
195 conn:close();
196 end
197 end);
198 else
199 sm_destroy_session(session, reason);
200 -- Sends close with code 1000 and message "Stream closed"
201 local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed";
202 conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
203 conn:close();
204 end
205 end
206 end
207
208
131 --- Filter stuff 209 --- Filter stuff
210 local function filter_open_close(data)
211 if not data:find(xmlns_framing, 1, true) then return data; end
212
213 local oc = parse_xml(data);
214 if not oc then return data; end
215 if oc.attr.xmlns ~= xmlns_framing then return data; end
216 if oc.name == "close" then return "</stream:stream>"; end
217 if oc.name == "open" then
218 oc.name = "stream:stream";
219 oc.attr.xmlns = nil;
220 oc.attr["xmlns:stream"] = xmlns_streams;
221 return oc:top_tag();
222 end
223
224 return data;
225 end
132 function handle_request(event, path) 226 function handle_request(event, path)
133 local request, response = event.request, event.response; 227 local request, response = event.request, event.response;
134 local conn = response.conn; 228 local conn = response.conn;
135 229
136 if not request.headers.sec_websocket_key then 230 if not request.headers.sec_websocket_key then
243 337
244 local session = sessions[conn]; 338 local session = sessions[conn];
245 339
246 session.secure = consider_websocket_secure or session.secure; 340 session.secure = consider_websocket_secure or session.secure;
247 341
342 session.open_stream = session_open_stream;
343 session.close = session_close;
344
248 local frameBuffer = ""; 345 local frameBuffer = "";
249 add_filter(session, "bytes/in", function(data) 346 add_filter(session, "bytes/in", function(data)
250 local cache = {}; 347 local cache = {};
251 frameBuffer = frameBuffer .. data; 348 frameBuffer = frameBuffer .. data;
252 local frame, length = parse_frame(frameBuffer); 349 local frame, length = parse_frame(frameBuffer);
253 350
254 while frame do 351 while frame do
255 frameBuffer = frameBuffer:sub(length + 1); 352 frameBuffer = frameBuffer:sub(length + 1);
256 local result = handle_frame(frame); 353 local result = handle_frame(frame);
257 if not result then return; end 354 if not result then return; end
258 cache[#cache+1] = result; 355 cache[#cache+1] = filter_open_close(result);
259 frame, length = parse_frame(frameBuffer); 356 frame, length = parse_frame(frameBuffer);
260 end 357 end
261 return t_concat(cache, ""); 358 return t_concat(cache, "");
262 end); 359 end);
263 360
264 add_filter(session, "bytes/out", function(data) 361 add_filter(session, "bytes/out", function(data)
265 return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)}); 362 return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
363 end);
364
365 add_filter(session, "stanzas/out", function(stanza)
366 local attr = stanza.attr;
367 attr.xmlns = attr.xmlns or xmlns_client;
368 if stanza.name:find("^stream:") then
369 attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams;
370 end
371 return stanza;
266 end); 372 end);
267 373
268 response.status_code = 101; 374 response.status_code = 101;
269 response.headers.upgrade = "websocket"; 375 response.headers.upgrade = "websocket";
270 response.headers.connection = "Upgrade"; 376 response.headers.connection = "Upgrade";