diff mod_pubsub_hub/mod_pubsub_hub.lua @ 764:d11d91ee81ed

mod_pubsub_hub: New module that implements the Hub part of PubSubHubbub
author Kim Alvefur <zash@zash.se>
date Wed, 01 Aug 2012 16:11:12 +0200
parents
children 1184fe8ebb21
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_hub/mod_pubsub_hub.lua	Wed Aug 01 16:11:12 2012 +0200
@@ -0,0 +1,241 @@
+-- Copyright (C) 2011 - 2012 Kim Alvefur
+--
+-- This file is MIT/X11 licensed.
+
+local http = require "net.http";
+local formdecode = http.formdecode;
+local formencode = http.formencode;
+local uuid = require "util.uuid".generate;
+local hmac_sha1 = require "util.hmac".sha1;
+local json_encode = require "util.json".encode;
+local time = os.time;
+local m_min, m_max = math.min, math.max;
+local tostring = tostring;
+local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
+local xmlns_pubsub_event = xmlns_pubsub .. "#event";
+local subs_by_topic = module:shared"subscriptions";
+
+local max_lease, min_lease, default_lease = 86400, 600, 3600;
+
+module:depends"pubsub";
+
+local valid_modes = { ["subscribe"] = true, ["unsubscribe"] = true, }
+
+local function do_subscribe(subscription)
+	-- FIXME handle other states
+	if subscription.state == "subscribed" then
+		local ok, err = hosts[module.host].modules.pubsub.service:add_subscription(subscription.topic, true, module.host);
+		module:log(ok and "debug" or "error", "add_subscription() => %s, %s", tostring(ok), tostring(err));
+	end
+end
+
+local function handle_request(event)
+	local request, response = event.request, event.response;
+	local method, body = request.method, request.body;
+
+	local query = request.url.query or {};
+	if query and type(query) == "string" then
+		query = formdecode(query);
+	end
+	if body and request.headers.content_type == "application/x-www-form-urlencoded" then
+		body = formdecode(body);
+	end
+
+	if method == "POST" then
+		-- Subscription request
+		if body["hub.callback"] and body["hub.mode"] and valid_modes[body["hub.mode"]]
+			and body["hub.topic"] and body["hub.verify"] then
+
+			-- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5
+			local callback = body["hub.callback"];
+			local mode = body["hub.mode"];
+			local topic = body["hub.topic"];
+			local lease_seconds = m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease);
+			local secret = body["hub.secret"];
+			local verify_token = body["hub.verify_token"];
+
+			module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic);
+
+			if not subs_by_topic[topic] then
+				subs_by_topic[topic] = {};
+			end
+			local subscription = subs_by_topic[topic][callback];
+
+			local verify_modes = {};
+			for i=1,#body do
+				if body[i].name == "hub.verify" then
+					verify_modes[body[i].value] = true;
+				end
+			end
+
+			subscription = subscription or {
+				id = uuid(),
+				callback = callback,
+				topic = topic,
+				state = "unsubscribed",
+				secret = secret,
+				want_state = mode,
+				lease_seconds = lease_seconds,
+				expires = time() + lease_seconds,
+			};
+			subs_by_topic[topic][callback] = subscription;
+			local challenge = uuid();
+
+			local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
+				["hub.mode"] = mode,
+				["hub.topic"] = topic,
+				["hub.challenge"] = challenge,
+				["hub.lease_seconds"] = tostring(lease_seconds),
+				["hub.verify_token"] = verify_token,
+			}
+			module:log("debug", require"util.serialization".serialize(verify_modes));
+			if verify_modes["async"] then
+				module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription));
+				http.request(callback_url, nil, function(body, code)
+					if body == challenge and code > 199 and code < 300 then
+						if not subscription.want_state then
+							module:log("warn", "Verification of already verified request, probably");
+							return;
+						end
+						subscription.state = subscription.want_state .. "d";
+						subscription.want_state = nil;
+						module:log("debug", "calling do_subscribe()");
+						do_subscribe(subscription);
+						subs_by_topic[topic][callback] = subscription;
+					else
+						module:log("warn", "status %d and body was %q", tostring(code), tostring(body));
+						subs_by_topic[topic][callback] = subscription;
+					end
+				end)
+				return 202;
+			elseif verify_modes["sync"] then
+				http.request(callback_url, nil, function(body, code)
+					if body == challenge and code > 199 and code < 300 then
+						if not subscription.want_state then
+							module:log("warn", "Verification of already verified request, probably");
+							return;
+						end
+						if mode == "unsubscribe" then
+							subs_by_topic[topic][callback] = nil;
+						else
+							subscription.state = subscription.want_state .. "d";
+							subscription.want_state = nil;
+							module:log("debug", "calling do_subscribe()");
+							do_subscribe(subscription);
+							subs_by_topic[topic][callback] = subscription;
+						end
+					else
+						subs_by_topic[topic][callback] = subscription;
+					end
+					response.status = 204;
+					response:send();
+				end)
+				return true;
+			end
+			return 400;
+		else
+			response.status = 400;
+			response.headers.content_type = "text/html";
+			return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n"
+		end
+	end
+end
+
+local function periodic()
+	local now = time();
+	local next_check = now + max_lease;
+	local purge = false
+	for topic, callbacks in pairs(subs_by_topic) do
+		for callback, subscription in pairs(callbacks) do
+			if subscription.mode == "subscribed" then
+				if subscription.expires < now then
+					-- Subscription has expired, drop it.
+					purge = true
+				end
+				if subscription.expires < now + min_lease  then
+					-- Subscription set to expire soon, re-confirm it.
+					local challenge = uuid();
+					local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
+						["hub.mode"] = subscription.state,
+						["hub.topic"] = topic,
+						["hub.challenge"] = challenge,
+						["hub.lease_seconds"] = subscription.lease_seconds,
+						["hub.verify_token"] = subscription.verify_token,
+					}
+					http.request(callback_url, nil, function(body, code)
+						if body == challenge and code > 199 and code < 300 then
+							subscription.expires = now + subscription.lease_seconds;
+						end
+					end);
+				else
+					next_check = m_min(next_check, now - subscription.expires)
+				end
+			end
+		end
+		if purge then
+			local new_callbacks = {};
+			for callback, subscription in pairs(callbacks) do
+				if (subscription.state == "subscribed" and subscription.expires < now)
+					and subscription.want_state ~= "remove" then
+					new_callbacks[callback] = subscription;
+				end
+			end
+			subs_by_topic[topic] = new_callbacks
+		end
+	end
+	return m_max(next_check - min_lease, min_lease);
+end
+
+local function on_notify(subscription, content)
+	local body = tostring(content);
+	local headers = {
+		["Content-Type"] = "application/xml",
+	};
+	if subscription.secret then
+		headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true);
+	end
+	http.request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code)
+		if code >= 200 and code <= 299 then
+			module:log("debug", "Delivered");
+		else
+			module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback));
+			-- TODO Retry
+			-- ... but the spec says that you should not retry, wtf?
+		end
+	end);
+end
+
+module:hook("message/host", function(event)
+	local stanza = event.stanza;
+	if stanza.attr.from ~= module.host then return end;
+
+	for pubsub_event in stanza:childtags("event", xmlns_pubsub_event) do
+		local items = pubsub_event:get_child("items");
+		local node = items.attr.node;
+		if items and node and subs_by_topic[node] then
+			for item in items:childtags("item") do
+				local content = item.tags[1];
+				for callback, subscription in pairs(subs_by_topic[node]) do
+					on_notify(subscription, content)
+				end
+			end
+		end
+	end
+	return true;
+end, 10);
+
+module:depends"http";
+module:provides("http", {
+	default_path = "/hub";
+	route = {
+		POST = handle_request;
+		GET = function()
+			return json_encode(subs_by_topic);
+		end;
+		["GET /topic/*"] = function(event, path)
+			return json_encode(subs_by_topic[path])
+		end;
+	};
+});
+
+module:add_timer(1, periodic);