comparison mod_muc_cloud_notify/mod_muc_cloud_notify.lua @ 3882:3b8f4f3b1718

mod_reload_modules: Ignore removed hosts...
author tmolitor <thilo@eightysoft.de>
date Wed, 05 Feb 2020 23:27:33 +0100
parents 8a93af85f319
children 571249f69577
comparison
equal deleted inserted replaced
3881:5d7df207dc2b 3882:3b8f4f3b1718
22 local max_push_devices = module:get_option_number("push_max_devices", 5); 22 local max_push_devices = module:get_option_number("push_max_devices", 5);
23 local dummy_body = module:get_option_string("push_notification_important_body", "New Message!"); 23 local dummy_body = module:get_option_string("push_notification_important_body", "New Message!");
24 24
25 local host_sessions = prosody.hosts[module.host].sessions; 25 local host_sessions = prosody.hosts[module.host].sessions;
26 local push_errors = {}; 26 local push_errors = {};
27 local id2node = {}; 27 local id2room = {}
28 local id2user = {};
28 29
29 module:depends("muc"); 30 module:depends("muc");
30
31 -- ordered table iterator, allow to iterate on the natural order of the keys of a table,
32 -- see http://lua-users.org/wiki/SortedIteration
33 local function __genOrderedIndex( t )
34 local orderedIndex = {}
35 for key in pairs(t) do
36 table.insert( orderedIndex, key )
37 end
38 -- sort in reverse order (newest one first)
39 table.sort( orderedIndex, function(a, b)
40 if a == nil or t[a] == nil or b == nil or t[b] == nil then return false end
41 -- only one timestamp given, this is the newer one
42 if t[a].timestamp ~= nil and t[b].timestamp == nil then return true end
43 if t[a].timestamp == nil and t[b].timestamp ~= nil then return false end
44 -- both timestamps given, sort normally
45 if t[a].timestamp ~= nil and t[b].timestamp ~= nil then return t[a].timestamp > t[b].timestamp end
46 return false -- normally not reached
47 end)
48 return orderedIndex
49 end
50 local function orderedNext(t, state)
51 -- Equivalent of the next function, but returns the keys in timestamp
52 -- order. We use a temporary ordered key table that is stored in the
53 -- table being iterated.
54
55 local key = nil
56 --print("orderedNext: state = "..tostring(state) )
57 if state == nil then
58 -- the first time, generate the index
59 t.__orderedIndex = __genOrderedIndex( t )
60 key = t.__orderedIndex[1]
61 else
62 -- fetch the next value
63 for i = 1, #t.__orderedIndex do
64 if t.__orderedIndex[i] == state then
65 key = t.__orderedIndex[i+1]
66 end
67 end
68 end
69
70 if key then
71 return key, t[key]
72 end
73
74 -- no more value to return, cleanup
75 t.__orderedIndex = nil
76 return
77 end
78 local function orderedPairs(t)
79 -- Equivalent of the pairs() function on tables. Allows to iterate
80 -- in order
81 return orderedNext, t, nil
82 end
83
84 -- small helper function to return new table with only "maximum" elements containing only the newest entries
85 local function reduce_table(table, maximum)
86 local count = 0;
87 local result = {};
88 for key, value in orderedPairs(table) do
89 count = count + 1;
90 if count > maximum then break end
91 result[key] = value;
92 end
93 return result;
94 end
95 31
96 -- For keeping state across reloads while caching reads 32 -- For keeping state across reloads while caching reads
97 local push_store = (function() 33 local push_store = (function()
98 local store = module:open_store(); 34 local store = module:open_store();
99 local push_services = {}; 35 local push_services = {};
100 local api = {}; 36 local api = {};
101 function api:get(user) 37 local function load_room(room)
102 if not push_services[user] then 38 if not push_services[room] then
103 local err; 39 local err;
104 push_services[user], err = store:get(user); 40 push_services[room], err = store:get(room);
105 if not push_services[user] and err then 41 if not push_services[room] and err then
106 module:log("warn", "Error reading push notification storage for user '%s': %s", user, tostring(err)); 42 module:log("warn", "Error reading push notification storage for room '%s': %s", room, tostring(err));
107 push_services[user] = {}; 43 push_services[room] = {};
108 return push_services[user], false; 44 return false;
109 end 45 end
110 end 46 end
111 if not push_services[user] then push_services[user] = {} end 47 return true;
112 return push_services[user], true; 48 end
113 end 49 function api:get(room, user)
114 function api:set(user, data) 50 load_room(room);
115 push_services[user] = reduce_table(data, max_push_devices); 51 if not push_services[room] then push_services[room] = {}; push_services[room][user] = {}; end
116 local ok, err = store:set(user, push_services[user]); 52 return push_services[room][user], true;
53 end
54 function api:set(room, user, data)
55 push_services[room][user] = data;
56 local ok, err = store:set(room, push_services[room]);
117 if not ok then 57 if not ok then
118 module:log("error", "Error writing push notification storage for user '%s': %s", user, tostring(err)); 58 module:log("error", "Error writing push notification storage for room '%s' on behalf of user '%s': %s", room, user, tostring(err));
119 return false; 59 return false;
120 end 60 end
121 return true; 61 return true;
122 end 62 end
123 function api:set_identifier(user, push_identifier, data) 63 function api:get_room_users(room)
124 local services = self:get(user); 64 local users = {};
125 services[push_identifier] = data; 65 load_room(room);
126 return self:set(user, services); 66 for k, v in pairs(push_services[room]) do
67 table.insert(users, k);
68 end
69 return users;
127 end 70 end
128 return api; 71 return api;
129 end)(); 72 end)();
130 73
131 74
133 local handle_push_success, handle_push_error; 76 local handle_push_success, handle_push_error;
134 77
135 function handle_push_error(event) 78 function handle_push_error(event)
136 local stanza = event.stanza; 79 local stanza = event.stanza;
137 local error_type, condition = stanza:get_error(); 80 local error_type, condition = stanza:get_error();
138 local node = id2node[stanza.attr.id]; 81 local room = id2room[stanza.attr.id];
139 if node == nil then return false; end -- unknown stanza? Ignore for now! 82 local user = id2user[stanza.attr.id];
140 local from = stanza.attr.from; 83 if room == nil or user == nil then return false; end -- unknown stanza? Ignore for now!
141 local user_push_services = push_store:get(node); 84 local push_service = push_store:get(room, user);
142 local changed = false; 85 local push_identifier = room.."<"..user..">";
143 86
144 for push_identifier, _ in pairs(user_push_services) do 87 local stanza_id = hashes.sha256(push_identifier, true);
145 local stanza_id = hashes.sha256(push_identifier, true); 88 if stanza_id == stanza.attr.id then
146 if stanza_id == stanza.attr.id then 89 if push_service and push_service.push_jid == stanza.attr.from and error_type ~= "wait" then
147 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then 90 push_errors[push_identifier] = push_errors[push_identifier] + 1;
148 push_errors[push_identifier] = push_errors[push_identifier] + 1; 91 module:log("info", "Got error of type '%s' (%s) for identifier '%s': "
149 module:log("info", "Got error of type '%s' (%s) for identifier '%s': " 92 .."error count for this identifier is now at %s", error_type, condition, push_identifier,
150 .."error count for this identifier is now at %s", error_type, condition, push_identifier, 93 tostring(push_errors[push_identifier]));
151 tostring(push_errors[push_identifier])); 94 if push_errors[push_identifier] >= max_push_errors then
152 if push_errors[push_identifier] >= max_push_errors then 95 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier);
153 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier); 96 -- save changed global config
154 -- remove push settings from sessions 97 push_store:set(room, user, nil);
155 if host_sessions[node] then 98 push_errors[push_identifier] = nil;
156 for _, session in pairs(host_sessions[node].sessions) do 99 -- unhook iq handlers for this identifier (if possible)
157 if session.push_identifier == push_identifier then 100 if module.unhook then
158 session.push_identifier = nil; 101 module:unhook("iq-error/host/"..stanza_id, handle_push_error);
159 session.push_settings = nil; 102 module:unhook("iq-result/host/"..stanza_id, handle_push_success);
160 session.first_hibernated_push = nil; 103 id2room[stanza_id] = nil;
161 end 104 id2user[stanza_id] = nil;
162 end
163 end
164 -- save changed global config
165 changed = true;
166 user_push_services[push_identifier] = nil
167 push_errors[push_identifier] = nil;
168 -- unhook iq handlers for this identifier (if possible)
169 if module.unhook then
170 module:unhook("iq-error/host/"..stanza_id, handle_push_error);
171 module:unhook("iq-result/host/"..stanza_id, handle_push_success);
172 id2node[stanza_id] = nil;
173 end
174 end 105 end
175 elseif user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type == "wait" then 106 end
176 module:log("debug", "Got error of type '%s' (%s) for identifier '%s': " 107 elseif push_service and push_service.push_jid == stanza.attr.from and error_type == "wait" then
177 .."NOT increasing error count for this identifier", error_type, condition, push_identifier); 108 module:log("debug", "Got error of type '%s' (%s) for identifier '%s': "
178 end 109 .."NOT increasing error count for this identifier", error_type, condition, push_identifier);
179 end 110 end
180 end
181 if changed then
182 push_store:set(node, user_push_services);
183 end 111 end
184 return true; 112 return true;
185 end 113 end
186 114
187 function handle_push_success(event) 115 function handle_push_success(event)
188 local stanza = event.stanza; 116 local stanza = event.stanza;
189 local node = id2node[stanza.attr.id]; 117 local room = id2room[stanza.attr.id];
190 if node == nil then return false; end -- unknown stanza? Ignore for now! 118 local user = id2user[stanza.attr.id];
191 local from = stanza.attr.from; 119 if room == nil or user == nil then return false; end -- unknown stanza? Ignore for now!
192 local user_push_services = push_store:get(node); 120 local push_service = push_store:get(room, user);
193 121 local push_identifier = room.."<"..user..">";
194 for push_identifier, _ in pairs(user_push_services) do 122
195 if hashes.sha256(push_identifier, true) == stanza.attr.id then 123 if hashes.sha256(push_identifier, true) == stanza.attr.id then
196 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then 124 if push_service and push_service.push_jid == stanza.attr.from and push_errors[push_identifier] > 0 then
197 push_errors[push_identifier] = 0; 125 push_errors[push_identifier] = 0;
198 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); 126 -- unhook iq handlers for this identifier (if possible)
199 end 127 if module.unhook then
128 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error);
129 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success);
130 id2room[stanza.attr.id] = nil;
131 id2user[stanza.attr.id] = nil;
132 end
133 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
200 end 134 end
201 end 135 end
202 return true; 136 return true;
203 end 137 end
204 138
205 -- http://xmpp.org/extensions/xep-0357.html#disco 139 -- http://xmpp.org/extensions/xep-xxxx.html#disco
206 local function account_dico_info(event) 140 module:hook("muc-disco#info", function(event)
207 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up(); 141 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up();
208 end 142 end);
209 module:hook("account-disco-info", account_dico_info);
210 143
211 -- http://xmpp.org/extensions/xep-0357.html#enabling 144 -- http://xmpp.org/extensions/xep-0357.html#enabling
212 local function push_enable(event) 145 local function push_enable(event)
213 local origin, stanza = event.origin, event.stanza; 146 local origin, stanza = event.origin, event.stanza;
147 local room = jid.split(stanza.attr.to);
214 local enable = stanza.tags[1]; 148 local enable = stanza.tags[1];
215 origin.log("debug", "Attempting to enable push notifications"); 149 origin.log("debug", "Attempting to enable push notifications");
216 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled 150 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled
217 local push_jid = enable.attr.jid; 151 local push_jid = enable.attr.jid;
218 -- SHOULD contain a 'node' attribute
219 local push_node = enable.attr.node;
220 -- CAN contain a 'include_payload' attribute
221 local include_payload = enable.attr.include_payload;
222 if not push_jid then 152 if not push_jid then
223 origin.log("debug", "MUC Push notification enable request missing the 'jid' field"); 153 origin.log("debug", "MUC Push notification enable request missing the 'jid' field");
224 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); 154 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid"));
225 return true; 155 return true;
226 end 156 end
227 local publish_options = enable:get_child("x", "jabber:x:data"); 157 local publish_options = enable:get_child("x", "jabber:x:data");
228 if not publish_options then 158 if not publish_options then
229 -- Could be intentional 159 -- Could be intentional
230 origin.log("debug", "No publish options in request"); 160 origin.log("debug", "No publish options in request");
231 end 161 end
232 local push_identifier = push_jid .. "<" .. (push_node or "");
233 local push_service = { 162 local push_service = {
234 jid = push_jid; 163 push_jid = push_jid;
235 node = push_node; 164 device = stanza.attr.from;
236 include_payload = include_payload;
237 options = publish_options and st.preserialize(publish_options); 165 options = publish_options and st.preserialize(publish_options);
238 timestamp = os_time(); 166 timestamp = os_time();
239 }; 167 };
240 local ok = push_store:set_identifier(origin.username.."@"..origin.host, push_identifier, push_service); 168
169 local ok = push_store:set(room, stanza.attr.from, push_service);
241 if not ok then 170 if not ok then
242 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); 171 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
243 else 172 else
244 origin.push_identifier = push_identifier; 173 origin.log("info", "MUC Push notifications enabled for room %s by %s (%s)",
245 origin.push_settings = push_service; 174 tostring(room),
246 origin.first_hibernated_push = nil;
247 origin.log("info", "MUC Push notifications enabled for %s by %s (%s)",
248 tostring(stanza.attr.to),
249 tostring(stanza.attr.from), 175 tostring(stanza.attr.from),
250 tostring(origin.push_identifier) 176 tostring(push_jid)
251 ); 177 );
252 origin.send(st.reply(stanza)); 178 origin.send(st.reply(stanza));
253 end 179 end
254 return true; 180 return true;
255 end 181 end
257 183
258 184
259 -- http://xmpp.org/extensions/xep-0357.html#disabling 185 -- http://xmpp.org/extensions/xep-0357.html#disabling
260 local function push_disable(event) 186 local function push_disable(event)
261 local origin, stanza = event.origin, event.stanza; 187 local origin, stanza = event.origin, event.stanza;
188 local room = jid.split(stanza.attr.to);
262 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute 189 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute
263 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included
264 if not push_jid then 190 if not push_jid then
265 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); 191 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid"));
266 return true; 192 return true;
267 end 193 end
268 local user_push_services = push_store:get(origin.username); 194 local push_identifier = room.."<"..stanza.attr.from..">";
269 for key, push_info in pairs(user_push_services) do 195 local push_service = push_store:get(room, stanza.attr.from);
270 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then 196 local ok = true;
271 origin.log("info", "Push notifications disabled (%s)", tostring(key)); 197 if push_service.push_jid == push_jid then
272 if origin.push_identifier == key then 198 origin.log("info", "Push notifications disabled for room %s by %s (%s)",
273 origin.push_identifier = nil; 199 tostring(room),
274 origin.push_settings = nil; 200 sotring(stanza.attr.from),
275 origin.first_hibernated_push = nil; 201 tostring(push_jid)
276 end 202 );
277 user_push_services[key] = nil; 203 ok = push_store:set(room, stanza.attr.from, nil);
278 push_errors[key] = nil; 204 push_errors[push_identifier] = nil;
279 if module.unhook then 205 if module.unhook then
280 module:unhook("iq-error/host/"..key, handle_push_error); 206 local stanza_id = hashes.sha256(push_identifier, true);
281 module:unhook("iq-result/host/"..key, handle_push_success); 207 module:unhook("iq-error/host/"..stanza_id, handle_push_error);
282 id2node[key] = nil; 208 module:unhook("iq-result/host/"..stanza_id, handle_push_success);
283 end 209 id2room[stanza_id] = nil;
284 end 210 id2user[stanza_id] = nil;
285 end 211 end
286 local ok = push_store:set(origin.username, user_push_services); 212 end
287 if not ok then 213 if not ok then
288 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); 214 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
289 else 215 else
290 origin.send(st.reply(stanza)); 216 origin.send(st.reply(stanza));
291 end 217 end
363 end 289 end
364 return false; -- this stanza wasn't one of the above cases --> it is not important, too 290 return false; -- this stanza wasn't one of the above cases --> it is not important, too
365 end 291 end
366 292
367 local push_form = dataform { 293 local push_form = dataform {
368 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; }; 294 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:muc_push:summary"; };
369 { name = "message-count"; type = "text-single"; }; 295 --{ name = "dummy"; type = "text-single"; };
370 { name = "pending-subscription-count"; type = "text-single"; };
371 { name = "last-message-sender"; type = "jid-single"; };
372 { name = "last-message-body"; type = "text-single"; };
373 }; 296 };
374 297
375 -- http://xmpp.org/extensions/xep-0357.html#publishing 298 -- http://xmpp.org/extensions/xep-0357.html#publishing
376 local function handle_notify_request(stanza, node, user_push_services, log_push_decline) 299 local function handle_notify_request(stanza, user, user_push_services)
377 local pushes = 0; 300 local pushes = 0;
378 if not user_push_services or next(user_push_services) == nil then return pushes end 301 if not user_push_services or next(user_push_services) == nil then return pushes end
379
380 -- XXX: customized
381 local body = stanza:get_child_text("body");
382 if not body then
383 return pushes;
384 end
385 302
386 for push_identifier, push_info in pairs(user_push_services) do 303 for push_identifier, push_info in pairs(user_push_services) do
387 local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all 304 local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all
388 if stanza then 305 if stanza then
389 if not stanza._push_notify then stanza._push_notify = {}; end 306 if not stanza._push_notify then stanza._push_notify = {}; end
390 if stanza._push_notify[push_identifier] then 307 if stanza._push_notify[push_identifier] then
391 if log_push_decline then 308 if log_push_decline then
392 module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); 309 module:log("debug", "Already sent push notification for %s to %s (%s)", user, push_info.push_jid, tostring(push_info.node));
393 end 310 end
394 send_push = false; 311 send_push = false;
395 end 312 end
396 stanza._push_notify[push_identifier] = true; 313 stanza._push_notify[push_identifier] = true;
397 end 314 end
455 end 372 end
456 373
457 -- archive message added 374 -- archive message added
458 local function archive_message_added(event) 375 local function archive_message_added(event)
459 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } 376 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
460 -- only notify for new mam messages when at least one device is online
461 local room = event.room; 377 local room = event.room;
462 local stanza = event.stanza; 378 local stanza = event.stanza;
463 local body = stanza:get_child_text('body'); 379 local room_name = jid.split(room.jid);
464 380
381 -- extract all real occupant JIDs in the room
382 occupants = {};
383 for nick, occupant in room:each_occupant() do
384 for jid in occupant:each_session() do
385 occupants[jid] = true;
386 end
387 end
388
389 -- check all push registered users against occupants list
390 for _, user in pairs(push_store:get_room_users(room_name)) do
391 -- send push if not found in occupants list
392 if not occupants[user] then
393 local push_service = push_store:get(room_name, user);
394 handle_notify_request(event.stanza, user, push_service);
395 end
396 end
397
398
399
400
401 iterate over the list of a room's registered push users
402 every user who is NOT in the MUC gets pushed
403
404
405 handle_notify_request(event.stanza, jid, user_push_services, true);
406
407
408
465 for reference in stanza:childtags("reference", "urn:xmpp:reference:0") do 409 for reference in stanza:childtags("reference", "urn:xmpp:reference:0") do
466 if reference.attr['type'] == 'mention' and reference.attr['begin'] and reference.attr['end'] then 410 if reference.attr['type'] == 'mention' and reference.attr['begin'] and reference.attr['end'] then
467 local nick = extract_reference(body, reference.attr['begin'], reference.attr['end']); 411 local nick = extract_reference(body, reference.attr['begin'], reference.attr['end']);
468 local jid = room:get_registered_jid(nick); 412 local jid = room:get_registered_jid(nick);
469 413
481 end 425 end
482 426
483 module:hook("muc-add-history", archive_message_added); 427 module:hook("muc-add-history", archive_message_added);
484 428
485 local function send_ping(event) 429 local function send_ping(event)
486 local user = event.user; 430 local push_services = event.push_services;
487 local user_push_services = push_store:get(user); 431 if not push_services then
488 local push_services = event.push_services or user_push_services; 432 local room = event.room;
433 local user = event.user;
434 push_services = push_store:get(room, user);
435 end
489 handle_notify_request(nil, user, push_services, true); 436 handle_notify_request(nil, user, push_services, true);
490 end 437 end
491 -- can be used by other modules to ping one or more (or all) push endpoints 438 -- can be used by other modules to ping one or more (or all) push endpoints
492 module:hook("cloud-notify-ping", send_ping); 439 module:hook("muc-cloud-notify-ping", send_ping);
493 440
494 module:log("info", "Module loaded"); 441 module:log("info", "Module loaded");
495 function module.unload() 442 function module.unload()
496 if module.unhook then 443 if module.unhook then
497 module:unhook("account-disco-info", account_dico_info); 444 module:unhook("account-disco-info", account_dico_info);
503 450
504 for push_identifier, _ in pairs(push_errors) do 451 for push_identifier, _ in pairs(push_errors) do
505 local stanza_id = hashes.sha256(push_identifier, true); 452 local stanza_id = hashes.sha256(push_identifier, true);
506 module:unhook("iq-error/host/"..stanza_id, handle_push_error); 453 module:unhook("iq-error/host/"..stanza_id, handle_push_error);
507 module:unhook("iq-result/host/"..stanza_id, handle_push_success); 454 module:unhook("iq-result/host/"..stanza_id, handle_push_success);
508 id2node[stanza_id] = nil; 455 id2room[stanza_id] = nil;
456 id2user[stanza_id] = nil;
509 end 457 end
510 end 458 end
511 459
512 module:log("info", "Module unloaded"); 460 module:log("info", "Module unloaded");
513 end 461 end