diff mod_muc_rai/mod_muc_rai.lua @ 3974:f14c862598a9

mod_muc_rai: New module to implement Room Activity Indicators
author Matthew Wild <mwild1@gmail.com>
date Wed, 15 Apr 2020 21:19:45 +0100
parents
children 0e72dd70afff
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_muc_rai/mod_muc_rai.lua	Wed Apr 15 21:19:45 2020 +0100
@@ -0,0 +1,215 @@
+local cache = require "util.cache";
+local jid = require "util.jid";
+local st = require "util.stanza";
+
+local max_subscribers = module:get_option_number("muc_rai_max_subscribers", 1024);
+
+local muc_affiliation_store = module:open_store("config", "map");
+local muc_archive = module:open_store("muc_log", "archive");
+
+local xmlns_rai = "xmpp:prosody.im/protocol/rai";
+
+local muc_markers = module:depends("muc_markers");
+
+-- subscriber_jid -> { [room_jid] = interested }
+local subscribed_users = cache.new(max_subscribers, false);
+-- room_jid -> { [user_jid] = interested }
+local interested_users = {};
+-- room_jid -> last_id
+local room_activity_cache = cache.new(1024);
+
+-- Send a single notification for a room, updating data structures as needed
+local function send_single_notification(user_jid, room_jid)
+	local notification = st.message({ to = user_jid, from = module.host })
+		:tag("rai", { xmlns = xmlns_rai })
+			:text_tag("activity", room_jid)
+		:up();
+	local interested_room_users = interested_users[room_jid];
+	if interested_room_users then
+		interested_room_users[user_jid] = nil;
+	end
+	local interested_rooms = subscribed_users:get(user_jid);
+	if interested_rooms then
+		interested_rooms[room_jid] = nil;
+	end
+	module:log("debug", "Sending notification from %s to %s", room_jid, user_jid);
+	return module:send(notification);
+end
+
+local function subscribe_room(user_jid, room_jid)
+	local interested_rooms = subscribed_users:get(user_jid);
+	if not interested_rooms then
+		return nil, "not-subscribed";
+	end
+	module:log("debug", "Subscribed %s to %s", user_jid, room_jid);
+	interested_rooms[room_jid] = true;
+
+	local interested_room_users = interested_users[room_jid];
+	if not interested_room_users then
+		interested_room_users = {};
+		interested_users[room_jid] = interested_room_users;
+	end
+	interested_room_users[user_jid] = true;
+	return true;
+end
+
+local function unsubscribe_room(user_jid, room_jid)
+	local interested_rooms = subscribed_users:get(user_jid);
+	if not interested_rooms then
+		return nil, "not-subscribed";
+	end
+	interested_rooms[room_jid] = nil;
+
+	local interested_room_users = interested_users[room_jid];
+	if not interested_room_users then
+		return true;
+	end
+	interested_room_users[user_jid] = nil;
+	return true;
+end
+
+local function notify_interested_users(room_jid)
+	module:log("warn", "NOTIFYING FOR %s", room_jid)
+	local interested_room_users = interested_users[room_jid];
+	if not interested_room_users then
+		module:log("debug", "Nobody interested in %s", room_jid);
+		return;
+	end
+	for user_jid in pairs(interested_room_users) do
+		send_single_notification(user_jid, room_jid);
+	end
+	return true;
+end
+
+local function unsubscribe_user_from_all_rooms(user_jid)
+	local interested_rooms = subscribed_users:get(user_jid);
+	if not interested_rooms then
+		return nil, "not-subscribed";
+	end
+	for room_jid in pairs(interested_rooms) do
+		unsubscribe_room(user_jid, room_jid);
+	end
+	return true;
+end
+
+local function get_last_room_message_id(room_jid)
+	local last_room_message_id = room_activity_cache:get(room_jid);
+	if last_room_message_id then
+		return last_room_message_id;
+	end
+
+	-- Load all the data!
+	local query = {
+		limit = 1;
+		reverse = true;
+		with = "message<groupchat";
+	}
+	local data, err = muc_archive:find(jid.node(room_jid), query);
+
+	if not data then
+		module:log("error", "Could not fetch history: %s", err);
+		return nil;
+	end
+
+	local id = data();
+	room_activity_cache:set(room_jid, id);
+	return id;
+end
+
+local function update_room_activity(room_jid, last_id)
+	room_activity_cache:set(room_jid, last_id);
+end
+
+local function get_last_user_read_id(user_jid, room_jid)
+	return muc_markers.get_user_read_marker(user_jid, room_jid);
+end
+
+local function has_new_activity(room_jid, user_jid)
+	local last_room_message_id = get_last_room_message_id(room_jid);
+	local last_user_read_id = get_last_user_read_id(user_jid, room_jid);
+	return last_room_message_id ~= last_user_read_id;
+end
+
+-- Returns a set of rooms that a user is interested in
+local function get_interested_rooms(user_jid)
+	-- Use affiliation as an indication of interest, return
+	-- all rooms a user is affiliated
+	return muc_affiliation_store:get_all(jid.bare(user_jid));
+end
+
+-- Subscribes to all rooms that the user has an interest in
+-- Returns a set of room JIDs that have already had activity (thus no subscription)
+local function subscribe_all_rooms(user_jid)
+	-- Send activity notifications for all relevant rooms
+	local interested_rooms, err = get_interested_rooms(user_jid);
+
+	if not interested_rooms then
+		if err then
+			return nil, "internal-server-error";
+		end
+		interested_rooms = {};
+	end
+
+	if not subscribed_users:set(user_jid, interested_rooms) then
+		module:log("warn", "Subscriber limit (%d) reached, rejecting subscription from %s", max_subscribers, user_jid);
+		return nil, "resource-constraint";
+	end
+
+	local rooms_with_activity;
+	for room_name in pairs(interested_rooms) do
+		local room_jid = room_name.."@"..module.host;
+		if has_new_activity(room_jid, user_jid) then
+			-- There has already been activity, include this room
+			-- in the response
+			if not rooms_with_activity then
+				rooms_with_activity = {};
+			end
+			rooms_with_activity[room_jid] = true;
+		else
+			-- Subscribe to any future activity
+			subscribe_room(user_jid, room_jid);
+		end
+	end
+	return rooms_with_activity;
+end
+
+module:hook("presence/host", function (event)
+	local origin, stanza = event.origin, event.stanza;
+	local user_jid = stanza.attr.from;
+
+	if stanza.attr.type == "unavailable" then -- User going offline
+		unsubscribe_user_from_all_rooms(user_jid);
+		return true;
+	end
+
+	local rooms_with_activity, err = subscribe_all_rooms(user_jid);
+
+	if not rooms_with_activity then
+		if not err then
+			module:log("debug", "No activity to notify");
+			return true;
+		else
+			return origin.send(st.error_reply(stanza, "wait", "resource-constraint"));
+		end
+	end
+
+	local reply = st.reply(stanza)
+		:tag("rai", { xmlns = xmlns_rai });
+	for room_jid in pairs(rooms_with_activity) do
+		reply:text_tag("activity", room_jid);
+	end
+	return origin.send(reply);
+end);
+
+module:hook("muc-broadcast-message", function (event)
+	local room, stanza = event.room, event.stanza;
+	local archive_id = stanza:get_child_text("stanza-id", "urn:xmpp:sid:0");
+	if archive_id then
+		-- Remember the id of the last message so we can compare it
+		-- to the per-user marker (managed by mod_muc_markers)
+		update_room_activity(room.jid, archive_id);
+		-- Notify any users that need to be notified
+		notify_interested_users(room.jid);
+	end
+end, -1);
+