# HG changeset patch # User Kim Alvefur # Date 1394406240 -3600 # Node ID f5c256a5f20976a15793fca20e86e77ada9ab9ba # Parent 3ffd64b4ab59908ad6b2a5e153a0eb0652c15b13# Parent 50555c2ccbcd933afa7d397bb7e0900610c00101 Merge diff -r 50555c2ccbcd -r f5c256a5f209 mod_websocket/mod_websocket.lua --- a/mod_websocket/mod_websocket.lua Sun Mar 09 23:17:17 2014 +0100 +++ b/mod_websocket/mod_websocket.lua Mon Mar 10 00:04:00 2014 +0100 @@ -11,7 +11,11 @@ local sha1 = require "util.hashes".sha1; local base64 = require "util.encodings".base64.encode; local softreq = require "util.dependencies".softreq; +local st = require "util.stanza"; +local parse_xml = require "util.xml".parse; local portmanager = require "core.portmanager"; +local sm_destroy_session = sessionmanager.destroy_session; +local log = module._log; local bit; pcall(function() bit = require"bit"; end); @@ -38,6 +42,10 @@ end end +local xmlns_framing = "urn:ietf:params:xml:ns:xmpp-framing"; +local xmlns_streams = "http://etherx.jabber.org/streams"; +local xmlns_client = "jabber:client"; + module:depends("c2s") local sessions = module:shared("c2s/sessions"); local c2s_listener = portmanager.get_service("c2s").listener; @@ -128,7 +136,93 @@ return t_concat(result, ""); end +--- Session methods +local function session_open_stream(session) + local attr = { + xmlns = xmlns_framing, + version = "1.0", + id = session.streamid or "", + from = session.host + }; + session.send(st.stanza("open", attr)); +end + +local function session_close(session, reason) + local log = session.log or log; + if session.conn then + if session.notopen then + session:open_stream(); + end + if reason then -- nil == no err, initiated by us, false == initiated by client + local stream_error = st.stanza("stream:error"); + if type(reason) == "string" then -- assume stream error + stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); + elseif type(reason) == "table" then + if reason.condition then + stream_error:tag(reason.condition, stream_xmlns_attr):up(); + if reason.text then + stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); + end + if reason.extra then + stream_error:add_child(reason.extra); + end + elseif reason.name then -- a stanza + stream_error = reason; + end + end + stream_error = tostring(stream_error); + log("debug", "Disconnecting client, is: %s", stream_error); + session.send(stream_error); + end + + session.send(st.stanza("close", { xmlns = xmlns_framing })); + function session.send() return false; end + + local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; + session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); + + -- Authenticated incoming stream may still be sending us stanzas, so wait for from remote + local conn = session.conn; + if reason == nil and not session.notopen and session.type == "c2s" then + -- Grace time to process data from authenticated cleanly-closed stream + add_task(stream_close_timeout, function () + if not session.destroyed then + session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); + sm_destroy_session(session, reason); + -- Sends close with code 1000 and message "Stream closed" + local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed"; + conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); + conn:close(); + end + end); + else + sm_destroy_session(session, reason); + -- Sends close with code 1000 and message "Stream closed" + local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed"; + conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); + conn:close(); + end + end +end + + --- Filter stuff +local function filter_open_close(data) + if not data:find(xmlns_framing, 1, true) then return data; end + + local oc = parse_xml(data); + if not oc then return data; end + if oc.attr.xmlns ~= xmlns_framing then return data; end + if oc.name == "close" then return ""; end + if oc.name == "open" then + oc.name = "stream:stream"; + oc.attr.xmlns = nil; + oc.attr["xmlns:stream"] = xmlns_streams; + return oc:top_tag(); + end + + return data; +end function handle_request(event, path) local request, response = event.request, event.response; local conn = response.conn; @@ -245,6 +339,9 @@ session.secure = consider_websocket_secure or session.secure; + session.open_stream = session_open_stream; + session.close = session_close; + local frameBuffer = ""; add_filter(session, "bytes/in", function(data) local cache = {}; @@ -255,7 +352,7 @@ frameBuffer = frameBuffer:sub(length + 1); local result = handle_frame(frame); if not result then return; end - cache[#cache+1] = result; + cache[#cache+1] = filter_open_close(result); frame, length = parse_frame(frameBuffer); end return t_concat(cache, ""); @@ -265,6 +362,15 @@ return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)}); end); + add_filter(session, "stanzas/out", function(stanza) + local attr = stanza.attr; + attr.xmlns = attr.xmlns or xmlns_client; + if stanza.name:find("^stream:") then + attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams; + end + return stanza; + end); + response.status_code = 101; response.headers.upgrade = "websocket"; response.headers.connection = "Upgrade";