Mercurial > prosody-modules
diff mod_muc_rai/mod_muc_rai.lua @ 3974:f14c862598a9
mod_muc_rai: New module to implement Room Activity Indicators
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 15 Apr 2020 21:19:45 +0100 |
parents | |
children | 0e72dd70afff |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_muc_rai/mod_muc_rai.lua Wed Apr 15 21:19:45 2020 +0100 @@ -0,0 +1,215 @@ +local cache = require "util.cache"; +local jid = require "util.jid"; +local st = require "util.stanza"; + +local max_subscribers = module:get_option_number("muc_rai_max_subscribers", 1024); + +local muc_affiliation_store = module:open_store("config", "map"); +local muc_archive = module:open_store("muc_log", "archive"); + +local xmlns_rai = "xmpp:prosody.im/protocol/rai"; + +local muc_markers = module:depends("muc_markers"); + +-- subscriber_jid -> { [room_jid] = interested } +local subscribed_users = cache.new(max_subscribers, false); +-- room_jid -> { [user_jid] = interested } +local interested_users = {}; +-- room_jid -> last_id +local room_activity_cache = cache.new(1024); + +-- Send a single notification for a room, updating data structures as needed +local function send_single_notification(user_jid, room_jid) + local notification = st.message({ to = user_jid, from = module.host }) + :tag("rai", { xmlns = xmlns_rai }) + :text_tag("activity", room_jid) + :up(); + local interested_room_users = interested_users[room_jid]; + if interested_room_users then + interested_room_users[user_jid] = nil; + end + local interested_rooms = subscribed_users:get(user_jid); + if interested_rooms then + interested_rooms[room_jid] = nil; + end + module:log("debug", "Sending notification from %s to %s", room_jid, user_jid); + return module:send(notification); +end + +local function subscribe_room(user_jid, room_jid) + local interested_rooms = subscribed_users:get(user_jid); + if not interested_rooms then + return nil, "not-subscribed"; + end + module:log("debug", "Subscribed %s to %s", user_jid, room_jid); + interested_rooms[room_jid] = true; + + local interested_room_users = interested_users[room_jid]; + if not interested_room_users then + interested_room_users = {}; + interested_users[room_jid] = interested_room_users; + end + interested_room_users[user_jid] = true; + return true; +end + +local function unsubscribe_room(user_jid, room_jid) + local interested_rooms = subscribed_users:get(user_jid); + if not interested_rooms then + return nil, "not-subscribed"; + end + interested_rooms[room_jid] = nil; + + local interested_room_users = interested_users[room_jid]; + if not interested_room_users then + return true; + end + interested_room_users[user_jid] = nil; + return true; +end + +local function notify_interested_users(room_jid) + module:log("warn", "NOTIFYING FOR %s", room_jid) + local interested_room_users = interested_users[room_jid]; + if not interested_room_users then + module:log("debug", "Nobody interested in %s", room_jid); + return; + end + for user_jid in pairs(interested_room_users) do + send_single_notification(user_jid, room_jid); + end + return true; +end + +local function unsubscribe_user_from_all_rooms(user_jid) + local interested_rooms = subscribed_users:get(user_jid); + if not interested_rooms then + return nil, "not-subscribed"; + end + for room_jid in pairs(interested_rooms) do + unsubscribe_room(user_jid, room_jid); + end + return true; +end + +local function get_last_room_message_id(room_jid) + local last_room_message_id = room_activity_cache:get(room_jid); + if last_room_message_id then + return last_room_message_id; + end + + -- Load all the data! + local query = { + limit = 1; + reverse = true; + with = "message<groupchat"; + } + local data, err = muc_archive:find(jid.node(room_jid), query); + + if not data then + module:log("error", "Could not fetch history: %s", err); + return nil; + end + + local id = data(); + room_activity_cache:set(room_jid, id); + return id; +end + +local function update_room_activity(room_jid, last_id) + room_activity_cache:set(room_jid, last_id); +end + +local function get_last_user_read_id(user_jid, room_jid) + return muc_markers.get_user_read_marker(user_jid, room_jid); +end + +local function has_new_activity(room_jid, user_jid) + local last_room_message_id = get_last_room_message_id(room_jid); + local last_user_read_id = get_last_user_read_id(user_jid, room_jid); + return last_room_message_id ~= last_user_read_id; +end + +-- Returns a set of rooms that a user is interested in +local function get_interested_rooms(user_jid) + -- Use affiliation as an indication of interest, return + -- all rooms a user is affiliated + return muc_affiliation_store:get_all(jid.bare(user_jid)); +end + +-- Subscribes to all rooms that the user has an interest in +-- Returns a set of room JIDs that have already had activity (thus no subscription) +local function subscribe_all_rooms(user_jid) + -- Send activity notifications for all relevant rooms + local interested_rooms, err = get_interested_rooms(user_jid); + + if not interested_rooms then + if err then + return nil, "internal-server-error"; + end + interested_rooms = {}; + end + + if not subscribed_users:set(user_jid, interested_rooms) then + module:log("warn", "Subscriber limit (%d) reached, rejecting subscription from %s", max_subscribers, user_jid); + return nil, "resource-constraint"; + end + + local rooms_with_activity; + for room_name in pairs(interested_rooms) do + local room_jid = room_name.."@"..module.host; + if has_new_activity(room_jid, user_jid) then + -- There has already been activity, include this room + -- in the response + if not rooms_with_activity then + rooms_with_activity = {}; + end + rooms_with_activity[room_jid] = true; + else + -- Subscribe to any future activity + subscribe_room(user_jid, room_jid); + end + end + return rooms_with_activity; +end + +module:hook("presence/host", function (event) + local origin, stanza = event.origin, event.stanza; + local user_jid = stanza.attr.from; + + if stanza.attr.type == "unavailable" then -- User going offline + unsubscribe_user_from_all_rooms(user_jid); + return true; + end + + local rooms_with_activity, err = subscribe_all_rooms(user_jid); + + if not rooms_with_activity then + if not err then + module:log("debug", "No activity to notify"); + return true; + else + return origin.send(st.error_reply(stanza, "wait", "resource-constraint")); + end + end + + local reply = st.reply(stanza) + :tag("rai", { xmlns = xmlns_rai }); + for room_jid in pairs(rooms_with_activity) do + reply:text_tag("activity", room_jid); + end + return origin.send(reply); +end); + +module:hook("muc-broadcast-message", function (event) + local room, stanza = event.room, event.stanza; + local archive_id = stanza:get_child_text("stanza-id", "urn:xmpp:sid:0"); + if archive_id then + -- Remember the id of the last message so we can compare it + -- to the per-user marker (managed by mod_muc_markers) + update_room_activity(room.jid, archive_id); + -- Notify any users that need to be notified + notify_interested_users(room.jid); + end +end, -1); +