changeset 1520:2881d532f385

mod_smacks: Use filters for queuing outgoing stanzas instead of wrapping session.send()
author Kim Alvefur <zash@zash.se>
date Sun, 12 Oct 2014 13:24:50 +0200
parents 67c80abe742e
children 71af9c272d72
files mod_smacks/mod_smacks.lua
diffstat 1 files changed, 50 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/mod_smacks/mod_smacks.lua	Thu Oct 09 15:08:05 2014 +0200
+++ b/mod_smacks/mod_smacks.lua	Sun Oct 12 13:24:50 2014 +0200
@@ -68,8 +68,47 @@
 			end
 		end);
 
-local function wrap_session(session, resume, xmlns_sm)
-	local sm_attr = { xmlns = xmlns_sm };
+local function outgoing_stanza_filter(stanza, session)
+	local is_stanza = stanza.attr and not stanza.attr.xmlns;
+	if is_stanza and not stanza._cached then -- Stanza in default stream namespace
+		local queue = session.outgoing_stanza_queue;
+		module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
+		local cached_stanza = st.clone(stanza);
+		cached_stanza._cached = true;
+
+		if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
+			cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
+		end
+
+		queue[#queue+1] = cached_stanza;
+		session.log("debug", "#queue = %d", #queue);
+		if #queue > max_unacked_stanzas then
+			module:add_timer(0, function ()
+				if not session.awaiting_ack then
+					session.awaiting_ack = true;
+					session.send(st.stanza("r", { xmlns = session.smacks }));
+				end
+			end);
+		end
+	end
+	if session.hibernating then
+		session.log("debug", "hibernating, stanza queued")
+		-- The session is hibernating, no point in sending the stanza
+		-- over a dead connection.  It will be delivered upon resumption.
+		return nil; -- or empty string?
+	end
+	return stanza;
+end
+
+local function count_incoming_stanzas(stanza, session)
+	if not stanza.attr.xmlns then
+		session.handled_stanza_count = session.handled_stanza_count + 1;
+		session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
+	end
+	return stanza;
+end
+
+local function wrap_session(session, resume)
 	-- Overwrite process_stanza() and send()
 	local queue;
 	if not resume then
@@ -80,39 +119,7 @@
 		queue = session.outgoing_stanza_queue;
 	end
 
-	local _send = session.sends2s or session.send;
-	local function new_send(stanza)
-		local is_stanza = stanza.attr and not stanza.attr.xmlns;
-		if is_stanza then -- Stanza in default stream namespace
-			module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
-			local cached_stanza = st.clone(stanza);
-
-			if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
-				cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
-			end
-
-			queue[#queue+1] = cached_stanza;
-			session.log("debug", "#queue = %d", #queue);
-		end
-		if session.hibernating then
-			session.log("debug", "hibernating, stanza queued")
-			-- The session is hibernating, no point in sending the stanza
-			-- over a dead connection.  It will be delivered upon resumption.
-			return true;
-		end
-		local ok, err = _send(stanza);
-		if ok and #queue > max_unacked_stanzas and not session.awaiting_ack and is_stanza then
-			session.awaiting_ack = true;
-			return _send(st.stanza("r", sm_attr));
-		end
-		return ok, err;
-	end
-
-	if session.sends2s then
-		session.sends2s = new_send;
-	else
-		session.send = new_send;
-	end
+	add_filter(session, "stanzas/out", outgoing_stanza_filter, -1000);
 
 	local session_close = session.close;
 	function session.close(...)
@@ -125,13 +132,7 @@
 
 	if not resume then
 		session.handled_stanza_count = 0;
-		add_filter(session, "stanzas/in", function (stanza)
-			if not stanza.attr.xmlns then
-				session.handled_stanza_count = session.handled_stanza_count + 1;
-				session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
-			end
-			return stanza;
-		end);
+		add_filter(session, "stanzas/in", count_incoming_stanzas, 1000);
 	end
 
 	return session;
@@ -146,9 +147,9 @@
 	end
 
 	module:log("debug", "Enabling stream management");
-	session.smacks = true;
+	session.smacks = xmlns_sm;
 
-	wrap_session(session, false, xmlns_sm);
+	wrap_session(session, false);
 
 	local resume_token;
 	local resume = stanza.attr.resume;
@@ -165,9 +166,9 @@
 
 function handle_enabled(session, stanza, xmlns_sm)
 	module:log("debug", "Enabling stream management");
-	session.smacks = true;
+	session.smacks = xmlns_sm;
 
-	wrap_session(session, false, xmlns_sm);
+	wrap_session(session, false);
 
 	-- FIXME Resume?
 
@@ -310,24 +311,13 @@
 		original_session.ip = session.ip;
 		original_session.conn = session.conn;
 		original_session.send = session.send;
+		original_session.send.filter = original_session.filter;
 		original_session.stream = session.stream;
 		original_session.secure = session.secure;
 		original_session.hibernating = nil;
-		local filter = original_session.filter;
-		local stream = session.stream;
-		local log = session.log;
-		function original_session.data(data)
-			data = filter("bytes/in", data);
-			if data then
-				local ok, err = stream:feed(data);
-				if ok then return; end
-				log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
-				original_session:close("xml-not-well-formed");
-			end
-		end
-		wrap_session(original_session, true, xmlns_sm);
+		wrap_session(original_session, true);
 		-- Inform xmppstream of the new session (passed to its callbacks)
-		stream:set_session(original_session);
+		original_session.stream:set_session(original_session);
 		-- Similar for connlisteners
 		c2s_sessions[session.conn] = original_session;