view mod_push2/mod_push2.lua @ 5668:ecfd7aece33b

mod_measure_modules: Report module statuses via OpenMetrics Someone in the chat asked about a health check endpoint, which reminded me of mod_http_status, which provides access to module statuses with full details. After that, this idea came about, which seems natural. As noted in the README, it could be used to monitor that critical modules are in fact loaded correctly. As more modules use the status API, the more useful this module and mod_http_status becomes.
author Kim Alvefur <zash@zash.se>
date Fri, 06 Oct 2023 18:34:39 +0200
parents 4b052598e435
children 83ee752f148c
line wrap: on
line source

local os_time = os.time;
local st = require"util.stanza";
local jid = require"util.jid";
local hashes = require"util.hashes";
local random = require"util.random";
local watchdog = require "util.watchdog";
local uuid = require "util.uuid";
local base64 = require "util.encodings".base64;
local ciphers = require "openssl.cipher";
local pkey = require "openssl.pkey";
local kdf = require "openssl.kdf";
local jwt = require "util.jwt";

local xmlns_push = "urn:xmpp:push2:0";

-- configuration
local contact_uri = module:get_option_string("contact_uri", "xmpp:" .. module.host)
local extended_hibernation_timeout = module:get_option_number("push_max_hibernation_timeout", 72*3600)  -- use same timeout like ejabberd

local host_sessions = prosody.hosts[module.host].sessions
local push2_registrations = module:open_store("push2_registrations", "keyval")

if _VERSION:match("5%.1") or _VERSION:match("5%.2") then
	module:log("warn", "This module may behave incorrectly on Lua before 5.3. It is recommended to upgrade to a newer Lua version.")
end

local function account_dico_info(event)
	(event.reply or event.stanza):tag("feature", {var=xmlns_push}):up()
end
module:hook("account-disco-info", account_dico_info);

local function parse_match(matchel)
		local match = { match = matchel.attr.profile }
		local send = matchel:get_child("send", "urn:xmpp:push2:send:notify-only:0")
		if send then
			match.send = send.attr.xmlns
			return match
		end

		send = matchel:get_child("send", "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0")
		if send then
			match.send = send.attr.xmlns
			match.ua_public = send:get_child_text("ua-public")
			match.auth_secret = send:get_child_text("auth-secret")
			match.jwt_alg = send:get_child_text("jwt-alg")
			match.jwt_key = send:get_child_text("jwt-key")
			match.jwt_claims = {}
			for claim in send:childtags("jwt-claim") do
				match.jwt_claims[claim.attr.name] = claim:get_text()
			end
			return match
		end

		return nil
end

local function push_enable(event)
	local origin, stanza = event.origin, event.stanza;
	local enable = stanza.tags[1];
	origin.log("debug", "Attempting to enable push notifications")
	-- MUST contain a jid of the push service being enabled
	local service_jid = enable:get_child_text("service")
	-- MUST contain a string to identify the client fo the push service
	local client = enable:get_child_text("client")
	if not service_jid then
		origin.log("debug", "Push notification enable request missing service")
		origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing service"))
		return true
	end
	if not client then
		origin.log("debug", "Push notification enable request missing client")
		origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing client"))
		return true
	end
	if service_jid == stanza.attr.from then
		origin.log("debug", "Push notification enable request service JID identical to our own")
		origin.send(st.error_reply(stanza, "modify", "bad-request", "JID must be different from ours"))
		return true
	end
	local matches = {}
	for matchel in enable:childtags("match") do
		local match = parse_match(matchel)
		if match then
			matches[#matches + 1] = match
		end
	end
	-- Tie registration to client, via client_id with sasl2 or else fallback to resource
	local registration_id = origin.client_id or origin.resource
	local push_registration = {
		service = service_jid;
		client = client;
		timestamp = os_time();
		matches = matches;
	};
	-- TODO: can we move to keyval+ on trunk?
	local registrations = push2_registrations:get(origin.username) or {}
	registrations[registration_id] = push_registration
	if not push2_registrations:set(origin.username, registrations) then
		origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
	else
		origin.push_registration_id = registration_id
		origin.push_registration = push_registration
		origin.first_hibernated_push = nil
		origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(service_jid))
		origin.send(st.reply(stanza))
	end
	return true
end
module:hook("iq-set/self/"..xmlns_push..":enable", push_enable)

-- urgent stanzas should be delivered without delay
local function is_voip(stanza)
	if stanza.name == "message" then
		if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then
			return true, "jingle call"
		end
	end
end

local function has_body(stanza)
	-- We can't check for body contents in encrypted messages, so let's treat them as important
	-- Some clients don't even set a body or an empty body for encrypted messages

	-- check omemo https://xmpp.org/extensions/inbox/omemo.html
	if stanza:get_child("encrypted", "eu.siacs.conversations.axolotl") or stanza:get_child("encrypted", "urn:xmpp:omemo:0") then return true; end

	-- check xep27 pgp https://xmpp.org/extensions/xep-0027.html
	if stanza:get_child("x", "jabber:x:encrypted") then return true; end

	-- check xep373 pgp (OX) https://xmpp.org/extensions/xep-0373.html
	if stanza:get_child("openpgp", "urn:xmpp:openpgp:0") then return true; end

	local body = stanza:get_child_text("body");

	return body ~= nil and body ~= ""
end

-- is this push a high priority one
local function is_important(stanza)
	local is_voip_stanza, urgent_reason = is_voip(stanza)
	if is_voip_stanza then return true; end

	local st_name = stanza and stanza.name or nil
	if not st_name then return false; end -- nonzas are never important here
	if st_name == "presence" then
		return false; -- same for presences
	elseif st_name == "message" then
		-- unpack carbon copied message stanzas
		local carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message")
		local stanza_direction = carbon and stanza:child_with_name("sent") and "out" or "in"
		if carbon then stanza = carbon; end
		local st_type = stanza.attr.type

		-- headline message are always not important
		if st_type == "headline" then return false; end

		-- carbon copied outgoing messages are not important
		if carbon and stanza_direction == "out" then return false; end

		-- groupchat subjects are not important here
		if st_type == "groupchat" and stanza:get_child_text("subject") then
			return false
		end

		-- empty bodies are not important
		return has_body(stanza)
	end
	return false;		-- this stanza wasn't one of the above cases --> it is not important, too
end

local function add_sce_rfc8291(match, stanza, push_notification_payload)
	local max_data_size = 2847 -- https://github.com/web-push-libs/web-push-php/issues/108
	local stanza_clone = st.clone(stanza)
	stanza_clone.attr.xmlns = "jabber:client"
	local envelope = st.stanza("envelope", { xmlns = "urn:xmpp:sce:1" })
		:tag("content")
		:tag("forwarded", { xmlns = "urn:xmpp:forward:0" })
		:add_child(stanza_clone)
		:up():up():up()
	local envelope_bytes = tostring(envelope)
	if string.len(envelope_bytes) > max_data_size then
		-- If stanza is too big, remove extra elements
		stanza_clone:maptags(function(el)
			if el.attr.xmlns == nil or
				el.attr.xmlns == "jabber:client" or
				el.attr.xmlns == "jabber:x:oob" or
				(el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
				el.attr.xmlns == "eu.siacs.conversations.axolotl" or
				el.attr.xmlns == "urn:xmpp:omemo:0" or
				el.attr.xmlns == "jabber:x:encrypted" or
				el.attr.xmlns == "urn:xmpp:openpgp:0" or
				el.attr.xmlns == "urn:xmpp:sce:1" or
				el.attr.xmlns == "urn:xmpp:jingle-message:0" or
				el.attr.xmlns == "jabber:x:conference"
			then
				return el
			else
				return nil
			end
		end)
		envelope_bytes = tostring(envelope)
	end
	if string.len(envelope_bytes) > max_data_size then
		local body = stanza:get_child_text("body")
		if string.len(body) > 50 then
			stanza_clone:maptags(function(el)
				if el.name == "body" then
					return nil
				else
					return el
				end
			end)

			body = string.gsub(string.gsub("\n" .. body, "\n>[^\n]*", ""), "^%s", "")
			stanza_clone:body(body:sub(1, utf8.offset(body, 50)) .. "…")
			envelope_bytes = tostring(envelope)
		end
	end
	if string.len(envelope_bytes) > max_data_size then
		-- If still too big, get aggressive
		stanza_clone:maptags(function(el)
			if el.name == "body" or
				(el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
				el.attr.xmlns == "urn:xmpp:jingle-message:0" or
				el.attr.xmlns == "jabber:x:conference"
			then
				return el
			else
				return nil
			end
		end)
		envelope_bytes = tostring(envelope)
	end
	if string.len(envelope_bytes) < max_data_size/2 then
		envelope:text_tag("rpad", base64.encode(random.bytes(math.min(150, max_data_size/3 - string.len(envelope_bytes)))))
		envelope_bytes = tostring(envelope)
	end

	local p256dh_raw = base64.decode(match.ua_public .. "==")
	local p256dh = pkey.new(p256dh_raw, "*", "public", "prime256v1")
	local one_time_key = pkey.new({ type = "EC", curve = "prime256v1" })
	local one_time_key_public = one_time_key:getParameters().pub_key:toBinary()
	local info = "WebPush: info\0" .. p256dh_raw .. one_time_key_public
	local auth_secret = base64.decode(match.auth_secret .. "==")
	local salt = random.bytes(16)
	local shared_secret = one_time_key:derive(p256dh)
	local ikm = kdf.derive({
		type = "HKDF",
		outlen = 32,
		salt = auth_secret,
		key = shared_secret,
		info = info,
		md = "sha256"
	})
	local key = kdf.derive({
		type = "HKDF",
		outlen = 16,
		salt = salt,
		key = ikm,
		info = "Content-Encoding: aes128gcm\0",
		md = "sha256"
	})
	local nonce = kdf.derive({
		type = "HKDF",
		outlen = 12,
		salt = salt,
		key = ikm,
		info = "Content-Encoding: nonce\0",
		md = "sha256"
	})
	local header = salt .. "\0\0\16\0" .. string.char(string.len(one_time_key_public)) .. one_time_key_public
	local encryptor = ciphers.new("AES-128-GCM"):encrypt(key, nonce)

	push_notification_payload
		:tag("encrypted", { xmlns = "urn:xmpp:sce:rfc8291:0" })
		:text_tag("payload", base64.encode(header .. encryptor:final(envelope_bytes .. "\2") .. encryptor:getTag(16)))
		:up()
end

local function add_rfc8292(match, stanza, push_notification_payload)
	if not match.jwt_alg then return; end
	local key = match.jwt_key
	if match.jwt_alg ~= "HS256" then
		-- keypairs are in PKCS#8 PEM format without header/footer
		key = "-----BEGIN PRIVATE KEY-----\n"..key.."\n-----END PRIVATE KEY-----"
	end

	local public_key = pkey.new(key):getParameters().pub_key:toBinary()
	local signer = jwt.new_signer(match.jwt_alg, key)
	local payload = {}
	for k, v in pairs(match.jwt_claims or {}) do
		payload[k] = v
	end
	payload.sub = contact_uri
	push_notification_payload:text_tag("jwt", signer(payload), { key = base64.encode(public_key) })
end

local function handle_notify_request(stanza, node, user_push_services, log_push_decline)
	local pushes = 0;
	if not #user_push_services then return pushes end

	local notify_push_services = {};
	if is_important(stanza) then
		notify_push_services = user_push_services
	else
		for identifier, push_info in pairs(user_push_services) do
			for _, match in ipairs(push_info.matches) do
				if match.match == "urn:xmpp:push2:match:important" then
					identifier_found.log("debug", "Not pushing because not important")
				else
					notify_push_services[identifier] = push_info;
				end
			end
		end
	end

	for push_registration_id, push_info in pairs(notify_push_services) do
		local send_push = true;		-- only send push to this node when not already done for this stanza or if no stanza is given at all
		if stanza then
			if not stanza._push_notify2 then stanza._push_notify2 = {}; end
			if stanza._push_notify2[push_registration_id] then
				if log_push_decline then
					module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
				end
				send_push = false;
			end
			stanza._push_notify2[push_registration_id] = true;
		end

		if send_push then
			local push_notification_payload = st.stanza("notification", { xmlns = xmlns_push })
			push_notification_payload:text_tag("client", push_info.client)
			push_notification_payload:text_tag("priority", is_voip(stanza) and "high" or (is_important(stanza) and "normal" or "low"))
			if is_voip(stanza) then
				push_notification_payload:tag("voip"):up()
			end

			local sends_added = {};
			for _, match in ipairs(push_info.matches) do
				local does_match = false;
				if match.match == "urn:xmpp:push2:match:all" then
					does_match = true
				elseif match.match == "urn:xmpp:push2:match:important" then
					does_match = is_important(stanza)
				elseif match.match == "urn:xmpp:push2:match:archived" then
					does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0")
				elseif match.match == "urn:xmpp:push2:match:archived-with-body" then
					does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0") and has_body(stanza)
				end

				if does_match and not sends_added[match.send] then
					sends_added[match.send] = true
					if match.send == "urn:xmpp:push2:send:notify-only" then
						-- Nothing more to add
					elseif match.send == "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0" then
						add_sce_rfc8291(match, stanza, push_notification_payload)
						add_rfc8292(match, stanza, push_notification_payload)
					else
						module:log("debug", "Unkonwn send profile: " .. push_info.send)
					end
				end
			end

			local push_publish = st.message({ to = push_info.service, from = module.host, id = uuid.generate() })
				:add_child(push_notification_payload):up()

			-- TODO: watch for message error replies and count or something
			module:send(push_publish)
			pushes = pushes + 1
		end
	end

	return pushes
end

-- small helper function to extract relevant push settings
local function get_push_settings(stanza, session)
	local to = stanza.attr.to
	local node = to and jid.split(to) or session.username
	local user_push_services = push2_registrations:get(node)
	return node, (user_push_services or {})
end

-- publish on offline message
module:hook("message/offline/handle", function(event)
	local node, user_push_services = get_push_settings(event.stanza, event.origin);
	module:log("debug", "Invoking handle_notify_request() for offline stanza");
	handle_notify_request(event.stanza, node, user_push_services, true);
end, 1);

-- publish on bare groupchat
-- this picks up MUC messages when there are no devices connected
module:hook("message/bare/groupchat", function(event)
	local node, user_push_services = get_push_settings(event.stanza, event.origin);
	local notify_push_services = {};
	for identifier, push_info in pairs(user_push_services) do
		for _, match in ipairs(push_info.matches) do
			if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
				identifier_found.log("debug", "Not pushing because we are not archiving this stanza")
			else
				notify_push_services[identifier] = push_info;
			end
		end
	end

	handle_notify_request(event.stanza, node, notify_push_services, true);
end, 1);

local function process_stanza_queue(queue, session, queue_type)
	if not session.push_registration_id then return; end
	local user_push_services = {[session.push_registration_id] = session.push_settings};
	local notified = { unimportant = false; important = false }
	for i=1, #queue do
		local stanza = queue[i];
		-- fast ignore of already pushed stanzas
		if stanza and not (stanza._push_notify2 and stanza._push_notify2[session.push_registration_id]) then
			local node = get_push_settings(stanza, session);
			local stanza_type = "unimportant";
			if is_important(stanza) then stanza_type = "important"; end
			if not notified[stanza_type] then		-- only notify if we didn't try to push for this stanza type already
				if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then
					if session.hibernating and not session.first_hibernated_push then
						-- if the message was important
						-- then record the time of first push in the session for the smack module which will extend its hibernation
						-- timeout based on the value of session.first_hibernated_push
						if is_important(stanza) then
							session.first_hibernated_push = os_time();
							-- check for prosody 0.12 mod_smacks
							if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
								-- restore old smacks watchdog (--> the start of our original timeout will be delayed until first push)
								session.hibernating_watchdog:cancel();
								session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
							end
						end
					end
					notified[stanza_type] = true
				end
			end
		end
		if notified.unimportant and notified.important then break; end		-- stop processing the queue if all push types are exhausted
	end
end

-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
local function process_stanza(session, stanza)
	if session.push_registration_id then
		session.log("debug", "adding new stanza to push_queue");
		if not session.push_queue then session.push_queue = {}; end
		local queue = session.push_queue;
		queue[#queue+1] = st.clone(stanza);
		if not session.awaiting_push_timer then		-- timer not already running --> start new timer
			session.awaiting_push_timer = module:add_timer(1.0, function ()
				process_stanza_queue(session.push_queue, session, "push");
				session.push_queue = {};		-- clean up queue after push
				session.awaiting_push_timer = nil;
			end);
		end
	end
	return stanza;
end

local function process_smacks_stanza(event)
	local session = event.origin;
	local stanza = event.stanza;
	if not session.push_registration_id then
		session.log("debug", "NOT invoking handle_notify_request() for newly smacks queued stanza (session.push_registration_id is not set: %s)",
			session.push_registration_id
		);
	else
		process_stanza(session, stanza)
	end
end

-- smacks hibernation is started
local function hibernate_session(event)
	local session = event.origin;
	local queue = event.queue;
	session.first_hibernated_push = nil;
	if session.push_registration_id and session.hibernating_watchdog then -- check for prosody 0.12 mod_smacks
		-- save old watchdog callback and timeout
		session.original_smacks_callback = session.hibernating_watchdog.callback;
		session.original_smacks_timeout = session.hibernating_watchdog.timeout;
		-- cancel old watchdog and create a new watchdog with extended timeout
		session.hibernating_watchdog:cancel();
		session.hibernating_watchdog = watchdog.new(extended_hibernation_timeout, function()
			session.log("debug", "Push-extended smacks watchdog triggered");
			if session.original_smacks_callback then
				session.log("debug", "Calling original smacks watchdog handler");
				session.original_smacks_callback();
			end
		end);
	end
	-- process unacked stanzas
	process_stanza_queue(queue, session, "smacks");
end

-- smacks hibernation is ended
local function restore_session(event)
	local session = event.resumed;
	if session then		-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
		if session.awaiting_push_timer then
			session.awaiting_push_timer:stop();
			session.awaiting_push_timer = nil;
		end
		session.first_hibernated_push = nil;
		-- the extended smacks watchdog will be canceled by the smacks module, no need to anything here
	end
end

-- smacks ack is delayed
local function ack_delayed(event)
	local session = event.origin;
	local queue = event.queue;
	local stanza = event.stanza;
	if not session.push_registration_id then return; end
	if stanza then process_stanza(session, stanza); return; end		-- don't iterate through smacks queue if we know which stanza triggered this
	for i=1, #queue do
		local queued_stanza = queue[i];
		-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
		process_stanza(session, queued_stanza);
	end
end

-- archive message added
local function archive_message_added(event)
	-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
	-- only notify for new mam messages when at least one device is online
	if not event.for_user or not host_sessions[event.for_user] then return; end
	-- Note that the stanza in the event is a clone not the same as other hooks, so dedupe doesn't work
	-- This is a problem if you wan to to also hook offline message storage for example
	local stanza = st.clone(event.stanza)
	stanza:tag("stanza-id", { xmlns = "urn:xmpp:sid:0", by = event.for_user.."@"..module.host, id = event.id }):up()
	local user_session = host_sessions[event.for_user] and host_sessions[event.for_user].sessions or {}
	local to = stanza.attr.to
	to = to and jid.split(to) or event.origin.username

	-- only notify if the stanza destination is the mam user we store it for
	if event.for_user == to then
		local user_push_services = push2_registrations:get(to)

		-- Urgent stanzas are time-sensitive (e.g. calls) and should
		-- be pushed immediately to avoid getting stuck in the smacks
		-- queue in case of dead connections, for example
		local is_voip_stanza, urgent_reason = is_voip(stanza);

		local notify_push_services;
		if is_voip_stanza then
			module:log("debug", "Urgent push for %s (%s)", to, urgent_reason);
			notify_push_services = user_push_services;
		else
			-- only notify nodes with no active sessions (smacks is counted as active and handled separate)
			notify_push_services = {};
			for identifier, push_info in pairs(user_push_services) do
				local identifier_found = nil;
				for _, session in pairs(user_session) do
					if session.push_registration_id == identifier then
						identifier_found = session;
						break;
					end
				end
				if identifier_found then
					identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (session still alive)", identifier)
				else
					notify_push_services[identifier] = push_info
				end
			end
		end

		handle_notify_request(stanza, to, notify_push_services, true);
	end
end

module:hook("smacks-hibernation-start", hibernate_session);
module:hook("smacks-hibernation-end", restore_session);
module:hook("smacks-ack-delayed", ack_delayed);
module:hook("smacks-hibernation-stanza-queued", process_smacks_stanza);
module:hook("archive-message-added", archive_message_added);

module:log("info", "Module loaded");
function module.unload()
	module:log("info", "Unloading module");
	-- cleanup some settings, reloading this module can cause process_smacks_stanza() to stop working otherwise
	for user, _ in pairs(host_sessions) do
		for _, session in pairs(host_sessions[user].sessions) do
			if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
			session.awaiting_push_timer = nil;
			session.push_queue = nil;
			session.first_hibernated_push = nil;
			-- check for prosody 0.12 mod_smacks
			if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
				-- restore old smacks watchdog
				session.hibernating_watchdog:cancel();
				session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
			end
		end
	end
	module:log("info", "Module unloaded");
end