comparison mod_cloud_notify/mod_cloud_notify.lua @ 2625:8c6562f16496

mod_cloud_notify: Fixed error in push deduplication. Make handling of node ids and push_identifiers more error resilient. Also add some better debug output.
author tmolitor <thilo@eightysoft.de>
date Sat, 18 Mar 2017 00:20:04 +0100
parents 6ab46ff685d0
children 0f1421af7f6a
comparison
equal deleted inserted replaced
2624:c110b6bfe5d1 2625:8c6562f16496
5 -- This file is MIT/X11 licensed. 5 -- This file is MIT/X11 licensed.
6 6
7 local st = require"util.stanza"; 7 local st = require"util.stanza";
8 local jid = require"util.jid"; 8 local jid = require"util.jid";
9 local dataform = require"util.dataforms".new; 9 local dataform = require"util.dataforms".new;
10 local filters = require "util.filters"; 10 local filters = require"util.filters";
11 local hashes = require"util.hashes";
11 12
12 local xmlns_push = "urn:xmpp:push:0"; 13 local xmlns_push = "urn:xmpp:push:0";
13 14
14 -- configuration 15 -- configuration
15 local include_body = module:get_option_boolean("push_notification_with_body", false); 16 local include_body = module:get_option_boolean("push_notification_with_body", false);
55 end)(); 56 end)();
56 57
57 local function handle_push_error(event) 58 local function handle_push_error(event)
58 local stanza = event.stanza; 59 local stanza = event.stanza;
59 local error_type, condition = stanza:get_error(); 60 local error_type, condition = stanza:get_error();
60 local push_identifier = stanza.attr.id;
61 local node = jid.split(stanza.attr.to); 61 local node = jid.split(stanza.attr.to);
62 local from = stanza.attr.from; 62 local from = stanza.attr.from;
63 local user_push_services = push_store:get(node); 63 local user_push_services = push_store:get(node);
64 64
65 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then 65 for push_identifier, _ in pairs(user_push_services) do
66 push_errors[push_identifier] = push_errors[push_identifier] + 1; 66 if hashes.sha256(push_identifier, true) == stanza.attr.id then
67 module:log("info", "Got error of type '%s' (%s) for identifier '%s':" 67 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then
68 .."error count for this identifier is now at %s", error_type, condition, push_identifier, 68 push_errors[push_identifier] = push_errors[push_identifier] + 1;
69 tostring(push_errors[push_identifier])); 69 module:log("info", "Got error of type '%s' (%s) for identifier '%s': "
70 if push_errors[push_identifier] >= max_push_errors then 70 .."error count for this identifier is now at %s", error_type, condition, push_identifier,
71 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier); 71 tostring(push_errors[push_identifier]));
72 -- remove push settings from sessions 72 if push_errors[push_identifier] >= max_push_errors then
73 for _, session in pairs(host_sessions[node].sessions) do 73 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier);
74 if session.push_identifier == push_identifier then 74 -- remove push settings from sessions
75 session.push_identifier = nil; 75 for _, session in pairs(host_sessions[node].sessions) do
76 session.push_settings = nil; 76 if session.push_identifier == push_identifier then
77 session.push_identifier = nil;
78 session.push_settings = nil;
79 end
80 end
81 -- save changed global config
82 push_store:set_identifier(node, push_identifier, nil);
83 push_errors[push_identifier] = nil;
84 -- unhook iq handlers for this identifier
85 module:unhook("iq-error/bare/"..hashes.sha256(push_identifier, true), handle_push_error);
86 module:unhook("iq-result/bare/"..hashes.sha256(push_identifier, true), handle_push_success);
77 end 87 end
78 end 88 elseif user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type == "wait" then
79 -- save changed global config 89 module:log("debug", "Got error of type '%s' (%s) for identifier '%s': "
80 push_store:set_identifier(node, push_identifier, nil); 90 .."NOT increasing error count for this identifier", error_type, condition, push_identifier);
81 push_errors[push_identifier] = nil; 91 end
82 -- unhook iq handlers for this identifier
83 module:unhook("iq-error/bare/"..push_identifier, handle_push_error);
84 module:unhook("iq-result/bare/"..push_identifier, handle_push_success);
85 end 92 end
86 end 93 end
87 return true; 94 return true;
88 end 95 end
89 96
90 local function handle_push_success(event) 97 local function handle_push_success(event)
91 local stanza = event.stanza; 98 local stanza = event.stanza;
92 local push_identifier = stanza.attr.id;
93 local node = jid.split(stanza.attr.to); 99 local node = jid.split(stanza.attr.to);
94 local from = stanza.attr.from; 100 local from = stanza.attr.from;
95 local user_push_services = push_store:get(node); 101 local user_push_services = push_store:get(node);
96 102
97 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] then 103 for push_identifier, _ in pairs(user_push_services) do
98 push_errors[push_identifier] = 0; 104 if hashes.sha256(push_identifier, true) == stanza.attr.id then
99 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s", push_identifier, tostring(push_errors[push_identifier])); 105 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] then
106 push_errors[push_identifier] = 0;
107 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s", push_identifier, tostring(push_errors[push_identifier]));
108 end
109 end
100 end 110 end
101 return true; 111 return true;
102 end 112 end
103 113
104 -- http://xmpp.org/extensions/xep-0357.html#disco 114 -- http://xmpp.org/extensions/xep-0357.html#disco
137 if not ok then 147 if not ok then
138 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); 148 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
139 else 149 else
140 origin.push_identifier = push_identifier; 150 origin.push_identifier = push_identifier;
141 origin.push_settings = push_service; 151 origin.push_settings = push_service;
142 origin.log("info", "Push notifications enabled (%s)", tostring(origin.push_identifier)); 152 origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(origin.push_identifier));
143 origin.send(st.reply(stanza)); 153 origin.send(st.reply(stanza));
144 end 154 end
145 return true; 155 return true;
146 end 156 end
147 module:hook("iq-set/self/"..xmlns_push..":enable", push_enable); 157 module:hook("iq-set/self/"..xmlns_push..":enable", push_enable);
189 199
190 -- http://xmpp.org/extensions/xep-0357.html#publishing 200 -- http://xmpp.org/extensions/xep-0357.html#publishing
191 local function handle_notify_request(stanza, node, user_push_services) 201 local function handle_notify_request(stanza, node, user_push_services)
192 if not user_push_services or not #user_push_services then return end 202 if not user_push_services or not #user_push_services then return end
193 203
194 if stanza and stanza._notify then
195 module:log("debug", "Already sent push notification to %s@%s for this stanza, not doing it again", node, module.host);
196 return;
197 end
198 if stanza then
199 stanza._notify = true;
200 end
201
202 for push_identifier, push_info in pairs(user_push_services) do 204 for push_identifier, push_info in pairs(user_push_services) do
205 if stanza then
206 if not stanza._push_notify then stanza._push_notify = {}; end
207 if stanza._push_notify[push_identifier] then
208 module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
209 return;
210 end
211 stanza._push_notify[push_identifier] = true;
212 end
213
203 -- increment count and save it 214 -- increment count and save it
204 push_info.count = push_info.count + 1; 215 push_info.count = push_info.count + 1;
205 push_store:set_identifier(node, push_identifier, push_info); 216 push_store:set_identifier(node, push_identifier, push_info);
206 -- construct push stanza 217 -- construct push stanza
207 local push_publish = st.iq({ to = push_info.jid, from = node .. "@" .. module.host, type = "set", id = push_identifier }) 218 local push_publish = st.iq({ to = push_info.jid, from = node .. "@" .. module.host, type = "set", id = hashes.sha256(push_identifier, true) })
208 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) 219 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
209 :tag("publish", { node = push_info.node }) 220 :tag("publish", { node = push_info.node })
210 :tag("item") 221 :tag("item")
211 :tag("notification", { xmlns = xmlns_push }); 222 :tag("notification", { xmlns = xmlns_push });
212 local form_data = { 223 local form_data = {
228 -- send out push 239 -- send out push
229 module:log("debug", "Sending push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); 240 module:log("debug", "Sending push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
230 -- handle push errors for this node 241 -- handle push errors for this node
231 if push_errors[push_identifier] == nil then 242 if push_errors[push_identifier] == nil then
232 push_errors[push_identifier] = 0; 243 push_errors[push_identifier] = 0;
233 module:hook("iq-error/bare/"..push_identifier, handle_push_error); 244 module:hook("iq-error/bare/"..hashes.sha256(push_identifier, true), handle_push_error);
234 module:hook("iq-result/bare/"..push_identifier, handle_push_success); 245 module:hook("iq-result/bare/"..hashes.sha256(push_identifier, true), handle_push_success);
235 end 246 end
236 module:send(push_publish); 247 module:send(push_publish);
237 end 248 end
238 end 249 end
239 250
252 end, 1); 263 end, 1);
253 264
254 -- publish on unacked smacks message 265 -- publish on unacked smacks message
255 local function process_smacks_stanza(stanza, session) 266 local function process_smacks_stanza(stanza, session)
256 if session.push_identifier then 267 if session.push_identifier then
257 session.log("debug", "Invoking cloud handle_notify_request for smacks queued stanza..."); 268 session.log("debug", "Invoking cloud handle_notify_request for smacks queued stanza");
258 local user_push_services = {[session.push_identifier] = session.push_settings}; 269 local user_push_services = {[session.push_identifier] = session.push_settings};
259 local node = get_push_settings(stanza, session); 270 local node = get_push_settings(stanza, session);
260 handle_notify_request(stanza, node, user_push_services); 271 handle_notify_request(stanza, node, user_push_services);
261 end 272 end
262 return stanza; 273 return stanza;
361 -- end 372 -- end
362 -- end 373 -- end
363 -- push_store:set(origin.username, user_push_services); 374 -- push_store:set(origin.username, user_push_services);
364 -- end, 1); 375 -- end, 1);
365 376
377 module:log("info", "Module loaded");
366 function module.unload() 378 function module.unload()
367 module:unhook("account-disco-info", account_dico_info); 379 module:unhook("account-disco-info", account_dico_info);
368 module:unhook("iq-set/self/"..xmlns_push..":enable", push_enable); 380 module:unhook("iq-set/self/"..xmlns_push..":enable", push_enable);
369 module:unhook("iq-set/self/"..xmlns_push..":disable", push_disable); 381 module:unhook("iq-set/self/"..xmlns_push..":disable", push_disable);
370 382
373 module:unhook("smacks-ack-delayed", ack_delayed); 385 module:unhook("smacks-ack-delayed", ack_delayed);
374 module:unhook("archive-message-added", archive_message_added); 386 module:unhook("archive-message-added", archive_message_added);
375 module:unhook("cloud-notify-ping", send_ping); 387 module:unhook("cloud-notify-ping", send_ping);
376 388
377 for push_identifier, _ in pairs(push_errors) do 389 for push_identifier, _ in pairs(push_errors) do
378 module:hook("iq-error/bare/"..push_identifier, handle_push_error); 390 module:hook("iq-error/bare/"..hashes.sha256(push_identifier, true), handle_push_error);
379 module:hook("iq-result/bare/"..push_identifier, handle_push_success); 391 module:hook("iq-result/bare/"..hashes.sha256(push_identifier, true), handle_push_success);
380 end 392 end
381 end 393
394 module:log("info", "Module unloaded");
395 end