# HG changeset patch # User tmolitor # Date 1583953250 -3600 # Node ID f5e6368a1c39d9f7b70afc5ef40a8ced5d0787ad # Parent e93e58b33bf6aecfbef2135f99b005d9df70422d mod_cloud_notify: Cleanup code and drop support for prosody 0.9 This removes some legacy stuff that was needed for prosody 0.9 We now use util.stanza:find instead of our own patched version to be more mainstream The unneeded module unloading code was removed, too diff -r e93e58b33bf6 -r f5e6368a1c39 mod_cloud_notify/README.markdown --- a/mod_cloud_notify/README.markdown Tue Mar 10 23:59:19 2020 +0100 +++ b/mod_cloud_notify/README.markdown Wed Mar 11 20:00:50 2020 +0100 @@ -73,7 +73,13 @@ Compatibility ============= -Should work with 0.9+. +----- ----------------------------------------------------------------------------- + trunk Works + 0.11 Works + 0.10 Works + 0.9 Support dropped, use version last supported version [675726ab06d3] + ----- ----------------------------------------------------------------------------- [^1]: The service which is expected to forward notifications to something like Google Cloud Messaging or Apple Notification Service [^2]: [business_rules.markdown](//hg.prosody.im/prosody-modules/file/tip/mod_cloud_notify/business_rules.markdown) +[675726ab06d3]: //hg.prosody.im/prosody-modules/raw-file/675726ab06d3/mod_cloud_notify/mod_cloud_notify.lua \ No newline at end of file diff -r e93e58b33bf6 -r f5e6368a1c39 mod_cloud_notify/mod_cloud_notify.lua --- a/mod_cloud_notify/mod_cloud_notify.lua Tue Mar 10 23:59:19 2020 +0100 +++ b/mod_cloud_notify/mod_cloud_notify.lua Wed Mar 11 20:00:50 2020 +0100 @@ -14,6 +14,8 @@ local dataform = require"util.dataforms".new; local filters = require"util.filters"; local hashes = require"util.hashes"; +local random = require"util.random"; +local cache = require"util.cache"; local xmlns_push = "urn:xmpp:push:0"; @@ -27,106 +29,36 @@ local host_sessions = prosody.hosts[module.host].sessions; local push_errors = {}; local id2node = {}; - --- ordered table iterator, allow to iterate on the natural order of the keys of a table, --- see http://lua-users.org/wiki/SortedIteration -local function __genOrderedIndex( t ) - local orderedIndex = {} - for key in pairs(t) do - table.insert( orderedIndex, key ) - end - -- sort in reverse order (newest one first) - table.sort( orderedIndex, function(a, b) - if a == nil or t[a] == nil or b == nil or t[b] == nil then return false end - -- only one timestamp given, this is the newer one - if t[a].timestamp ~= nil and t[b].timestamp == nil then return true end - if t[a].timestamp == nil and t[b].timestamp ~= nil then return false end - -- both timestamps given, sort normally - if t[a].timestamp ~= nil and t[b].timestamp ~= nil then return t[a].timestamp > t[b].timestamp end - return false -- normally not reached - end) - return orderedIndex -end -local function orderedNext(t, state) - -- Equivalent of the next function, but returns the keys in timestamp - -- order. We use a temporary ordered key table that is stored in the - -- table being iterated. - - local key = nil - --print("orderedNext: state = "..tostring(state) ) - if state == nil then - -- the first time, generate the index - t.__orderedIndex = __genOrderedIndex( t ) - key = t.__orderedIndex[1] - else - -- fetch the next value - for i = 1, #t.__orderedIndex do - if t.__orderedIndex[i] == state then - key = t.__orderedIndex[i+1] - end - end - end - - if key then - return key, t[key] - end - - -- no more value to return, cleanup - t.__orderedIndex = nil - return -end -local function orderedPairs(t) - -- Equivalent of the pairs() function on tables. Allows to iterate - -- in order - return orderedNext, t, nil -end - --- small helper function to return new table with only "maximum" elements containing only the newest entries -local function reduce_table(table, maximum) - local count = 0; - local result = {}; - for key, value in orderedPairs(table) do - count = count + 1; - if count > maximum then break end - result[key] = value; - end - return result; -end - -local function stoppable_timer(delay, callback) - local stopped = false; - local timer = module:add_timer(delay, function (t) - if stopped then return; end - return callback(t); - end); - if timer and timer.stop then return timer; end -- new prosody api includes stop() function - return { - stop = function(self) stopped = true end; - timer; - }; -end +local id2identifier = {}; -- For keeping state across reloads while caching reads +-- This uses util.cache for caching the most recent devices and removing all old devices when max_push_devices is reached local push_store = (function() local store = module:open_store(); local push_services = {}; local api = {}; function api:get(user) if not push_services[user] then - local err; - push_services[user], err = store:get(user); - if not push_services[user] and err then + local loaded, err = store:get(user); + if not loaded and err then module:log("warn", "Error reading push notification storage for user '%s': %s", user, tostring(err)); - push_services[user] = {}; + push_services[user] = cache.new(max_push_devices):table(); return push_services[user], false; end + if loaded then + push_services[user] = cache.new(max_push_devices):table(); + -- copy over plain table loaded from disk into our cache + for k, v in pairs(loaded) do push_services[user][k] = v; end + else + push_services[user] = cache.new(max_push_devices):table(); + end end - if not push_services[user] then push_services[user] = {} end return push_services[user], true; end - function api:set(user, data) - push_services[user] = reduce_table(data, max_push_devices); - local ok, err = store:set(user, push_services[user]); + function api:flush_to_disk(user) + local plain_table = {}; + for k, v in pairs(push_services[user]) do plain_table[k] = v; end + local ok, err = store:set(user, plain_table); if not ok then module:log("error", "Error writing push notification storage for user '%s': %s", user, tostring(err)); return false; @@ -136,7 +68,6 @@ function api:set_identifier(user, push_identifier, data) local services = self:get(user); services[push_identifier] = data; - return self:set(user, services); end return api; end)(); @@ -149,14 +80,14 @@ local stanza = event.stanza; local error_type, condition = stanza:get_error(); local node = id2node[stanza.attr.id]; + local identifier = id2identifier[stanza.attr.id]; if node == nil then return false; end -- unknown stanza? Ignore for now! local from = stanza.attr.from; local user_push_services = push_store:get(node); local changed = false; for push_identifier, _ in pairs(user_push_services) do - local stanza_id = hashes.sha256(push_identifier, true); - if stanza_id == stanza.attr.id then + if push_identifier == identifier then if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then push_errors[push_identifier] = push_errors[push_identifier] + 1; module:log("info", "Got error of type '%s' (%s) for identifier '%s': " @@ -179,11 +110,10 @@ user_push_services[push_identifier] = nil push_errors[push_identifier] = nil; -- unhook iq handlers for this identifier (if possible) - if module.unhook then - module:unhook("iq-error/host/"..stanza_id, handle_push_error); - module:unhook("iq-result/host/"..stanza_id, handle_push_success); - id2node[stanza_id] = nil; - end + module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); + module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); + id2node[stanza.attr.id] = nil; + id2identifier[stanza.attr.id] = nil; end elseif user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type == "wait" then module:log("debug", "Got error of type '%s' (%s) for identifier '%s': " @@ -192,7 +122,7 @@ end end if changed then - push_store:set(node, user_push_services); + push_store:flush_to_disk(node); end return true; end @@ -200,20 +130,20 @@ function handle_push_success(event) local stanza = event.stanza; local node = id2node[stanza.attr.id]; + local identifier = id2identifier[stanza.attr.id]; if node == nil then return false; end -- unknown stanza? Ignore for now! local from = stanza.attr.from; local user_push_services = push_store:get(node); for push_identifier, _ in pairs(user_push_services) do - if hashes.sha256(push_identifier, true) == stanza.attr.id then + if push_identifier == identifier then if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then push_errors[push_identifier] = 0; -- unhook iq handlers for this identifier (if possible) - if module.unhook then - module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); - module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); - id2node[stanza.attr.id] = nil; - end + module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); + module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); + id2node[stanza.attr.id] = nil; + id2identifier[stanza.attr.id] = nil; module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); end end @@ -256,7 +186,8 @@ options = publish_options and st.preserialize(publish_options); timestamp = os_time(); }; - local ok = push_store:set_identifier(origin.username, push_identifier, push_service); + push_store:set_identifier(origin.username, push_identifier, push_service); + local ok = push_store:flush_to_disk(origin.username); if not ok then origin.send(st.error_reply(stanza, "wait", "internal-server-error")); else @@ -290,15 +221,17 @@ end user_push_services[key] = nil; push_errors[key] = nil; - if module.unhook then - local stanza_id = hashes.sha256(key, true) - module:unhook("iq-error/host/"..stanza_id, handle_push_error); - module:unhook("iq-result/host/"..stanza_id, handle_push_success); - id2node[stanza_id] = nil; + for stanza_id, identifier in pairs(id2identifier) do + if identifier == key then + module:unhook("iq-error/host/"..stanza_id, handle_push_error); + module:unhook("iq-result/host/"..stanza_id, handle_push_success); + id2node[stanza_id] = nil; + id2identifier[stanza_id] = nil; + end end end end - local ok = push_store:set(origin.username, user_push_services); + local ok = push_store:flush_to_disk(origin.username); if not ok then origin.send(st.error_reply(stanza, "wait", "internal-server-error")); else @@ -308,34 +241,6 @@ end module:hook("iq-set/self/"..xmlns_push..":disable", push_disable); --- Patched version of util.stanza:find() that supports giving stanza names --- without their namespace, allowing for every namespace. -local function find(self, path) - local pos = 1; - local len = #path + 1; - - repeat - local xmlns, name, text; - local char = s_sub(path, pos, pos); - if char == "@" then - return self.attr[s_sub(path, pos + 1)]; - elseif char == "{" then - xmlns, pos = s_match(path, "^([^}]+)}()", pos + 1); - end - name, text, pos = s_match(path, "^([^@/#]*)([/#]?)()", pos); - name = name ~= "" and name or nil; - if pos == len then - if text == "#" then - local child = xmlns ~= nil and self:get_child(name, xmlns) or self:child_with_name(name); - return child and child:get_text() or nil; - end - return xmlns ~= nil and self:get_child(name, xmlns) or self:child_with_name(name); - end - self = xmlns ~= nil and self:get_child(name, xmlns) or self:child_with_name(name); - until not self - return nil; -end - -- is this push a high priority one (this is needed for ios apps not using voip pushes) local function is_important(stanza) local st_name = stanza and stanza.name or nil; @@ -348,8 +253,7 @@ local carbon; local st_type; -- support carbon copied message stanzas having an arbitrary message-namespace or no message-namespace at all - if not carbon then carbon = find(stanza, "{urn:xmpp:carbons:2}/forwarded/message"); end - if not carbon then carbon = find(stanza, "{urn:xmpp:carbons:1}/forwarded/message"); end + if not carbon then carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message"); end stanza_direction = carbon and stanza:child_with_name("sent") and "out" or "in"; if carbon then stanza = carbon; end st_type = stanza.attr.type; @@ -390,7 +294,7 @@ -- http://xmpp.org/extensions/xep-0357.html#publishing local function handle_notify_request(stanza, node, user_push_services, log_push_decline) local pushes = 0; - if not user_push_services or next(user_push_services) == nil then return pushes end + if not #user_push_services then return pushes end for push_identifier, push_info in pairs(user_push_services) do local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all @@ -407,7 +311,7 @@ if send_push then -- construct push stanza - local stanza_id = hashes.sha256(push_identifier, true); + local stanza_id = hashes.sha256(random.bytes(8), true); local push_publish = st.iq({ to = push_info.jid, from = module.host, type = "set", id = stanza_id }) :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) :tag("publish", { node = push_info.node }) @@ -438,10 +342,11 @@ -- handle push errors for this node if push_errors[push_identifier] == nil then push_errors[push_identifier] = 0; - module:hook("iq-error/host/"..stanza_id, handle_push_error); - module:hook("iq-result/host/"..stanza_id, handle_push_success); - id2node[stanza_id] = node; end + module:hook("iq-error/host/"..stanza_id, handle_push_error); + module:hook("iq-result/host/"..stanza_id, handle_push_success); + id2node[stanza_id] = node; + id2identifier[stanza_id] = push_identifier; module:send(push_publish); pushes = pushes + 1; end @@ -500,7 +405,7 @@ queue[#queue+1] = st.clone(stanza); if #queue == 1 then -- first stanza --> start timer session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); - session.awaiting_push_timer = stoppable_timer(1e-06, function () + session.awaiting_push_timer = module:add_timer(1e-06, function () session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); process_stanza_queue(session.push_queue, session, "push"); session.push_queue = {}; -- clean up queue after push @@ -552,7 +457,6 @@ -- only notify if the stanza destination is the mam user we store it for if event.for_user == to then local user_push_services = push_store:get(to); - if next(user_push_services) == nil then return end -- only notify nodes with no active sessions (smacks is counted as active and handled separate) local notify_push_services = {}; @@ -584,31 +488,14 @@ local function send_ping(event) local user = event.user; local push_services = event.push_services or push_store:get(user); - handle_notify_request(nil, user, push_services, true); + module:log("debug", "Handling event 'cloud-notify-ping' for user '%s'", user); + local retval = handle_notify_request(nil, user, push_services, true); + module:log("debug", "handle_notify_request() returned %s", tostring(retval)); end -- can be used by other modules to ping one or more (or all) push endpoints module:hook("cloud-notify-ping", send_ping); module:log("info", "Module loaded"); function module.unload() - if module.unhook then - module:unhook("account-disco-info", account_dico_info); - module:unhook("iq-set/self/"..xmlns_push..":enable", push_enable); - module:unhook("iq-set/self/"..xmlns_push..":disable", push_disable); - - module:unhook("smacks-hibernation-start", hibernate_session); - module:unhook("smacks-hibernation-end", restore_session); - module:unhook("smacks-ack-delayed", ack_delayed); - module:unhook("archive-message-added", archive_message_added); - module:unhook("cloud-notify-ping", send_ping); - - for push_identifier, _ in pairs(push_errors) do - local stanza_id = hashes.sha256(push_identifier, true); - module:unhook("iq-error/host/"..stanza_id, handle_push_error); - module:unhook("iq-result/host/"..stanza_id, handle_push_success); - id2node[stanza_id] = nil; - end - end - module:log("info", "Module unloaded"); end