comparison mod_websocket/mod_websocket.lua @ 927:a9dfa7232d88

Merge
author Matthew Wild <mwild1@gmail.com>
date Tue, 12 Mar 2013 12:10:25 +0000
parents c469a2b2d77d
children 8eba9d4809d2
comparison
equal deleted inserted replaced
926:f88381a39c56 927:a9dfa7232d88
1 -- Prosody IM 1 -- Prosody IM
2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
4 -- Copyright (C) 2012 Florian Zeitz 2 -- Copyright (C) 2012 Florian Zeitz
5 -- 3 --
6 -- This project is MIT/X11 licensed. Please see the 4 -- This project is MIT/X11 licensed. Please see the
7 -- COPYING file in the source package for more information. 5 -- COPYING file in the source package for more information.
8 -- 6 --
9 7
10 module:set_global(); 8 module:set_global();
11 9
12 local add_task = require "util.timer".add_task; 10 local add_filter = require "util.filters".add_filter;
13 local new_xmpp_stream = require "util.xmppstream".new;
14 local nameprep = require "util.encodings".stringprep.nameprep;
15 local sessionmanager = require "core.sessionmanager";
16 local st = require "util.stanza";
17 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
18 local uuid_generate = require "util.uuid".generate;
19 local sha1 = require "util.hashes".sha1; 11 local sha1 = require "util.hashes".sha1;
20 local base64 = require "util.encodings".base64.encode; 12 local base64 = require "util.encodings".base64.encode;
21 local band = require "bit".band; 13 local softreq = require "util.dependencies".softreq;
22 local bxor = require "bit".bxor; 14 local portmanager = require "core.portmanager";
23 15
24 local xpcall, tostring, type = xpcall, tostring, type; 16 local bit;
25 local traceback = debug.traceback; 17 pcall(function() bit = require"bit"; end);
26 18 bit = bit or softreq"bit32"
27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 19 if not bit then module:log("error", "No bit module found. Either LuaJIT 2, lua-bitop or Lua 5.2 is required"); end
28 20 local band = bit.band;
29 local log = module._log; 21 local bxor = bit.bxor;
30 22 local rshift = bit.rshift;
31 local c2s_timeout = module:get_option_number("c2s_timeout");
32 local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5);
33 local opt_keepalives = module:get_option_boolean("tcp_keepalives", false);
34 23
35 local cross_domain = module:get_option("cross_domain_websocket"); 24 local cross_domain = module:get_option("cross_domain_websocket");
36 if cross_domain then 25 if cross_domain then
37 if cross_domain == true then 26 if cross_domain == true then
38 cross_domain = "*"; 27 cross_domain = "*";
42 if type(cross_domain) ~= "string" then 31 if type(cross_domain) ~= "string" then
43 cross_domain = nil; 32 cross_domain = nil;
44 end 33 end
45 end 34 end
46 35
47 local sessions = module:shared("sessions"); 36 module:depends("c2s")
48 local core_process_stanza = prosody.core_process_stanza; 37 local sessions = module:shared("c2s/sessions");
49 38 local c2s_listener = portmanager.get_service("c2s").listener;
50 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
51 local listener = {};
52 39
53 -- Websocket helpers 40 -- Websocket helpers
54 local function parse_frame(frame) 41 local function parse_frame(frame)
55 local result = {}; 42 local result = {};
56 local pos = 1; 43 local pos = 1;
116 length = #data; 103 length = #data;
117 if length <= 125 then -- 7-bit length 104 if length <= 125 then -- 7-bit length
118 result = result .. string.char(length); 105 result = result .. string.char(length);
119 elseif length <= 0xFFFF then -- 2-byte length 106 elseif length <= 0xFFFF then -- 2-byte length
120 result = result .. string.char(126); 107 result = result .. string.char(126);
121 result = result .. string.char(length/0x100) .. string.char(length%0x100); 108 result = result .. string.char(rshift(length, 8)) .. string.char(length%0x100);
122 else -- 8-byte length 109 else -- 8-byte length
123 result = result .. string.char(127); 110 result = result .. string.char(127);
124 for i = 7, 0, -1 do 111 for i = 7, 0, -1 do
125 result = result .. string.char(( length / (2^(8*i)) ) % 0x100); 112 result = result .. string.char(rshift(length, 8*i) % 0x100);
126 end 113 end
127 end 114 end
128 115
129 result = result .. data; 116 result = result .. data;
130 117
131 return result; 118 return result;
132 end 119 end
133 120
134 --- Stream events handlers 121 --- Filter stuff
135 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 122 function handle_request(event, path)
136 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; 123 local request, response = event.request, event.response;
137 124 local conn = response.conn;
138 function stream_callbacks.streamopened(session, attr) 125
139 local send = session.send; 126 if not request.headers.sec_websocket_key then
140 session.host = nameprep(attr.to); 127 response.headers.content_type = "text/html";
141 if not session.host then 128 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
142 session:close{ condition = "improper-addressing", 129 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
143 text = "A valid 'to' attribute is required on stream headers" }; 130 </body></html>]];
144 return; 131 end
145 end 132
146 session.version = tonumber(attr.version) or 0; 133 local wants_xmpp = false;
147 session.streamid = uuid_generate(); 134 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
148 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); 135 if proto == "xmpp" then wants_xmpp = true; end
149 136 end);
150 if not hosts[session.host] then 137
151 -- We don't serve this host... 138 if not wants_xmpp then
152 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; 139 return 501;
153 return;
154 end
155
156 -- COMPAT: Some current client implementations need this to be self-closing
157 if session.self_closing_stream then
158 send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", {
159 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
160 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' })));
161 else
162 send("<?xml version='1.0'?>"..st.stanza("stream:stream", {
163 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
164 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag());
165 end
166
167 (session.log or log)("debug", "Sent reply <stream:stream> to client");
168 session.notopen = nil;
169
170 -- If session.secure is *false* (not nil) then it means we /were/ encrypting
171 -- since we now have a new stream header, session is secured
172 if session.secure == false then
173 session.secure = true;
174 end
175
176 local features = st.stanza("stream:features");
177 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
178 module:fire_event("stream-features", session, features);
179
180 send(features);
181 end
182
183 function stream_callbacks.streamclosed(session)
184 session.log("debug", "Received </stream:stream>");
185 session:close(false);
186 end
187
188 function stream_callbacks.error(session, error, data)
189 if error == "no-stream" then
190 session.log("debug", "Invalid opening stream header");
191 session:close("invalid-namespace");
192 elseif error == "parse-error" then
193 (session.log or log)("debug", "Client XML parse error: %s", tostring(data));
194 session:close("not-well-formed");
195 elseif error == "stream-error" then
196 local condition, text = "undefined-condition";
197 for child in data:children() do
198 if child.attr.xmlns == xmlns_xmpp_streams then
199 if child.name ~= "text" then
200 condition = child.name;
201 else
202 text = child:get_text();
203 end
204 if condition ~= "undefined-condition" and text then
205 break;
206 end
207 end
208 end
209 text = condition .. (text and (" ("..text..")") or "");
210 session.log("info", "Session closed by remote with error: %s", text);
211 session:close(nil, text);
212 end
213 end
214
215 local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end
216 function stream_callbacks.handlestanza(session, stanza)
217 stanza = session.filter("stanzas/in", stanza);
218 if stanza then
219 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
220 end
221 end
222
223 --- Session methods
224 local function session_close(session, reason)
225 local log = session.log or log;
226 if session.conn then
227 if session.notopen then
228 -- COMPAT: Some current client implementations need this to be self-closing
229 if session.self_closing_stream then
230 session.send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", default_stream_attr)));
231 else
232 session.send("<?xml version='1.0'?>"..st.stanza("stream:stream", default_stream_attr):top_tag());
233 end
234 end
235 if reason then -- nil == no err, initiated by us, false == initiated by client
236 if type(reason) == "string" then -- assume stream error
237 log("debug", "Disconnecting client, <stream:error> is: %s", reason);
238 session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
239 elseif type(reason) == "table" then
240 if reason.condition then
241 local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
242 if reason.text then
243 stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
244 end
245 if reason.extra then
246 stanza:add_child(reason.extra);
247 end
248 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(stanza));
249 session.send(stanza);
250 elseif reason.name then -- a stanza
251 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(reason));
252 session.send(reason);
253 end
254 end
255 end
256 session.send("</stream:stream>");
257 function session.send() return false; end
258
259 local reason = (reason and (reason.text or reason.condition)) or reason;
260 session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
261
262 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
263 local conn = session.conn;
264 if reason == nil and not session.notopen and session.type == "c2s" then
265 -- Grace time to process data from authenticated cleanly-closed stream
266 add_task(stream_close_timeout, function ()
267 if not session.destroyed then
268 session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
269 sm_destroy_session(session, reason);
270 conn:close();
271 end
272 end);
273 else
274 sm_destroy_session(session, reason);
275 conn:close();
276 end
277 end
278 end
279
280 module:hook_global("user-deleted", function(event)
281 local username, host = event.username, event.host;
282 local user = hosts[host].sessions[username];
283 if user and user.sessions then
284 for jid, session in pairs(user.sessions) do
285 session:close{ condition = "not-authorized", text = "Account deleted" };
286 end
287 end
288 end, 200);
289
290 --- Port listener
291 function listener.onconnect(conn)
292 local session = sm_new_session(conn);
293 sessions[conn] = session;
294
295 session.log("info", "Client connected");
296
297 -- Client is using legacy SSL (otherwise mod_tls sets this flag)
298 if conn:ssl() then
299 session.secure = true;
300 end
301
302 if opt_keepalives then
303 conn:setoption("keepalive", opt_keepalives);
304 end
305
306 session.close = session_close;
307
308 session.conn.starttls = nil;
309
310 local stream = new_xmpp_stream(session, stream_callbacks);
311 session.stream = stream;
312 session.notopen = true;
313
314 function session.reset_stream()
315 session.notopen = true;
316 session.stream:reset();
317 end 140 end
318 141
319 local function websocket_close(code, message) 142 local function websocket_close(code, message)
320 local data = string.char(code/0x100) .. string.char(code%0x100) .. message; 143 local data = string.char(rshift(code, 8)) .. string.char(code%0x100) .. message;
321 conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); 144 conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
322 conn:close(); 145 conn:close();
323 end 146 end
324 147
325 local filter = session.filter;
326 local dataBuffer; 148 local dataBuffer;
327 local function handle_frame(frame) 149 local function handle_frame(frame)
328 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data); 150 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data);
329 151
330 -- Error cases 152 -- Error cases
363 dataBuffer = dataBuffer .. frame.data; 185 dataBuffer = dataBuffer .. frame.data;
364 elseif frame.opcode == 0x1 then -- Text frame 186 elseif frame.opcode == 0x1 then -- Text frame
365 dataBuffer = frame.data; 187 dataBuffer = frame.data;
366 elseif frame.opcode == 0x2 then -- Binary frame 188 elseif frame.opcode == 0x2 then -- Binary frame
367 websocket_close(1003, "Only text frames are supported"); 189 websocket_close(1003, "Only text frames are supported");
368 return false; 190 return;
369 elseif frame.opcode == 0x8 then -- Close request 191 elseif frame.opcode == 0x8 then -- Close request
370 websocket_close(1000, "Goodbye"); 192 websocket_close(1000, "Goodbye");
371 return false; 193 return;
372 elseif frame.opcode == 0x9 then -- Ping frame 194 elseif frame.opcode == 0x9 then -- Ping frame
373 frame.opcode = 0xA; 195 frame.opcode = 0xA;
374 conn:write(build_frame(frame)); 196 conn:write(build_frame(frame));
375 return true; 197 return "";
376 else 198 else
377 log("warn", "Received frame with unsupported opcode %i", frame.opcode); 199 log("warn", "Received frame with unsupported opcode %i", frame.opcode);
378 return true; 200 return "";
379 end 201 end
380 202
381 if frame.FIN then 203 if frame.FIN then
382 data = dataBuffer; 204 data = dataBuffer;
383 dataBuffer = nil; 205 dataBuffer = nil;
384 206
385 -- COMPAT: Some current client implementations send a self-closing <stream:stream> 207 return data;
386 data, session.self_closing_stream = data:gsub("^(<stream:stream.*)/>$", "%1>"); 208 end
387 session.self_closing_stream = (session.self_closing_stream == 1) 209 return "";
388 210 end
389 data = filter("bytes/in", data); 211
390 if data then 212 conn:setlistener(c2s_listener);
391 local ok, err = stream:feed(data); 213 c2s_listener.onconnect(conn);
392 if ok then return; end
393 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
394 session:close("not-well-formed");
395 end
396 end
397 return true;
398 end
399 214
400 local frameBuffer = ""; 215 local frameBuffer = "";
401 function session.data(data) 216 add_filter(sessions[conn], "bytes/in", function(data)
217 local cache = "";
402 frameBuffer = frameBuffer .. data; 218 frameBuffer = frameBuffer .. data;
403 local frame, length = parse_frame(frameBuffer); 219 local frame, length = parse_frame(frameBuffer);
404 220
405 while frame do 221 while frame do
406 frameBuffer = frameBuffer:sub(length + 1); 222 frameBuffer = frameBuffer:sub(length + 1);
407 if not handle_frame(frame) then return; end 223 local result = handle_frame(frame);
224 if not result then return; end
225 cache = cache .. result;
408 frame, length = parse_frame(frameBuffer); 226 frame, length = parse_frame(frameBuffer);
409 end 227 end
410 end 228 return cache;
411 229
412 function session.send(s)
413 conn:write(build_frame({ FIN = true, opcode = 0x01, data = tostring(s)}));
414 end
415
416 if c2s_timeout then
417 add_task(c2s_timeout, function ()
418 if session.type == "c2s_unauthed" then
419 session:close("connection-timeout");
420 end
421 end);
422 end
423
424 session.dispatch_stanza = stream_callbacks.handlestanza;
425 end
426
427 function listener.onincoming(conn, data)
428 local session = sessions[conn];
429 if session then
430 session.data(data);
431 else
432 listener.onconnect(conn, data);
433 session = sessions[conn];
434 session.data(data);
435 end
436 end
437
438 function listener.ondisconnect(conn, err)
439 local session = sessions[conn];
440 if session then
441 (session.log or log)("info", "Client disconnected: %s", err or "connection closed");
442 sm_destroy_session(session, err);
443 sessions[conn] = nil;
444 end
445 end
446
447 function listener.associate_session(conn, session)
448 sessions[conn] = session;
449 end
450
451 function handle_request(event, path)
452 local request, response = event.request, event.response;
453
454 if not request.headers.sec_websocket_key then
455 response.headers.content_type = "text/html";
456 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
457 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
458 </body></html>]];
459 end
460
461 local wants_xmpp = false;
462 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
463 if proto == "xmpp" then wants_xmpp = true; end
464 end); 230 end);
465 231
466 if not wants_xmpp then 232 add_filter(sessions[conn], "bytes/out", function(data)
467 return 501; 233 return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
468 end 234 end);
469 235
470 response.conn:setlistener(listener);
471 response.status = "101 Switching Protocols"; 236 response.status = "101 Switching Protocols";
472 response.headers.Upgrade = "websocket"; 237 response.headers.upgrade = "websocket";
473 response.headers.Connection = "Upgrade"; 238 response.headers.connection = "Upgrade";
474 response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); 239 response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
475 response.headers.Sec_WebSocket_Protocol = "xmpp"; 240 response.headers.sec_webSocket_protocol = "xmpp";
476 response.headers.Access_Control_Allow_Origin = cross_domain; 241 response.headers.access_control_allow_origin = cross_domain;
477 242
478 return ""; 243 return "";
479 end 244 end
480 245
481 function module.add_host(module) 246 function module.add_host(module)
482 module:depends("http"); 247 module:depends("http");
483 module:provides("http", { 248 module:provides("http", {
484 name = "xmpp-websocket"; 249 name = "websocket";
250 default_path = "xmpp-websocket";
485 route = { 251 route = {
486 ["GET"] = handle_request; 252 ["GET"] = handle_request;
487 ["GET /"] = handle_request; 253 ["GET /"] = handle_request;
488 }; 254 };
489 }); 255 });