view mod_smacks/mod_smacks.lua @ 234:abcb59ab355c

Add new motd_sequential module. This module lets you define numbered messages shown to each user in order, but only once per user, and persistent across server restarts. Useful for notifying users of added features and changes in an incremental fashion.
author Jeff Mitchell <jeffrey.mitchell@gmail.com>
date Wed, 04 Aug 2010 22:29:51 +0000
parents 263858d40ceb
children 9b9b089407b1
line wrap: on
line source

local st = require "util.stanza";

local t_insert, t_remove = table.insert, table.remove;
local math_min = math.min;
local tonumber, tostring = tonumber, tostring;
local add_filter = require "util.filters".add_filter;

local xmlns_sm = "urn:xmpp:sm:2";

local sm_attr = { xmlns = xmlns_sm };

local max_unacked_stanzas = 0;

module:add_event_hook("stream-features",
		function (session, features)
			features:tag("sm", sm_attr):tag("optional"):up():up();
		end);

module:hook("s2s-stream-features",
		function (data)
			data.features:tag("sm", sm_attr):tag("optional"):up():up();
		end);

module:hook_stanza(xmlns_sm, "enable",
		function (session, stanza)
			module:log("debug", "Enabling stream management");
			session.smacks = true;
			
			-- Overwrite process_stanza() and send()
			local queue = {};
			session.outgoing_stanza_queue = queue;
			session.last_acknowledged_stanza = 0;
			local _send = session.send;
			function session.send(stanza)
				local attr = stanza.attr;
				if attr and not attr.xmlns then -- Stanza in default stream namespace
					queue[#queue+1] = st.reply(stanza);
				end
				local ok, err = _send(stanza);
				if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
					session.awaiting_ack = true;
					return _send(st.stanza("r", { xmlns = xmlns_sm }));
				end
				return ok, err;
			end
			
			session.handled_stanza_count = 0;
			add_filter(session, "stanzas/in", function (stanza)
				if not stanza.attr.xmlns then
					session.handled_stanza_count = session.handled_stanza_count + 1;
					session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
				end
				return stanza;
			end);

			if not stanza.attr.resume then -- FIXME: Resumption should be a different spec :/
				_send(st.stanza("enabled", sm_attr));
				return true;
			end
		end, 100);

module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
	if not origin.smacks then
		module:log("debug", "Received ack request from non-smack-enabled session");
		return;
	end
	module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
	-- Reply with <a>
	origin.send(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
	return true;
end);

module:hook_stanza(xmlns_sm, "a", function (origin, stanza)
	if not origin.smacks then return; end
	origin.awaiting_ack = nil;
	-- Remove handled stanzas from outgoing_stanza_queue
	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
	local queue = origin.outgoing_stanza_queue;
	if handled_stanza_count > #queue then
		module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
			handled_stanza_count, #queue);
		for i=1,#queue do
			module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
		end
	end
	for i=1,math_min(handled_stanza_count,#queue) do
		t_remove(origin.outgoing_stanza_queue, 1);
	end
	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
	return true;
end);

--TODO: Optimise... incoming stanzas should be handled by a per-session
-- function that has a counter as an upvalue (no table indexing for increments,
-- and won't slow non-198 sessions). We can also then remove the .handled flag
-- on stanzas

function handle_unacked_stanzas(session)
	local queue = session.outgoing_stanza_queue;
	local error_attr = { type = "cancel" };
	if #queue > 0 then
		session.outgoing_stanza_queue = {};
		for i=1,#queue do
			local reply = queue[i];
			if reply.attr.to ~= session.full_jid then
				reply.attr.type = "error";
				reply:tag("error", error_attr)
					:tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
				core_process_stanza(session, queue[i]);
			end
		end
	end
end

local _destroy_session = sessionmanager.destroy_session;
function sessionmanager.destroy_session(session, err)
	if session.smacks then
		local queue = session.outgoing_stanza_queue;
		if #queue > 0 then
			module:log("warn", "Destroying session with %d unacked stanzas:", #queue);
			for i=1,#queue do
				module:log("warn", "::%s", tostring(queue[i]));
			end
			handle_unacked_stanzas(session);
		end
	end
	return _destroy_session(session, err);
end