comparison mod_pubsub_hub/mod_pubsub_hub.lua @ 764:d11d91ee81ed

mod_pubsub_hub: New module that implements the Hub part of PubSubHubbub
author Kim Alvefur <zash@zash.se>
date Wed, 01 Aug 2012 16:11:12 +0200
parents
children 1184fe8ebb21
comparison
equal deleted inserted replaced
763:bcf0c9fff512 764:d11d91ee81ed
1 -- Copyright (C) 2011 - 2012 Kim Alvefur
2 --
3 -- This file is MIT/X11 licensed.
4
5 local http = require "net.http";
6 local formdecode = http.formdecode;
7 local formencode = http.formencode;
8 local uuid = require "util.uuid".generate;
9 local hmac_sha1 = require "util.hmac".sha1;
10 local json_encode = require "util.json".encode;
11 local time = os.time;
12 local m_min, m_max = math.min, math.max;
13 local tostring = tostring;
14 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
15 local xmlns_pubsub_event = xmlns_pubsub .. "#event";
16 local subs_by_topic = module:shared"subscriptions";
17
18 local max_lease, min_lease, default_lease = 86400, 600, 3600;
19
20 module:depends"pubsub";
21
22 local valid_modes = { ["subscribe"] = true, ["unsubscribe"] = true, }
23
24 local function do_subscribe(subscription)
25 -- FIXME handle other states
26 if subscription.state == "subscribed" then
27 local ok, err = hosts[module.host].modules.pubsub.service:add_subscription(subscription.topic, true, module.host);
28 module:log(ok and "debug" or "error", "add_subscription() => %s, %s", tostring(ok), tostring(err));
29 end
30 end
31
32 local function handle_request(event)
33 local request, response = event.request, event.response;
34 local method, body = request.method, request.body;
35
36 local query = request.url.query or {};
37 if query and type(query) == "string" then
38 query = formdecode(query);
39 end
40 if body and request.headers.content_type == "application/x-www-form-urlencoded" then
41 body = formdecode(body);
42 end
43
44 if method == "POST" then
45 -- Subscription request
46 if body["hub.callback"] and body["hub.mode"] and valid_modes[body["hub.mode"]]
47 and body["hub.topic"] and body["hub.verify"] then
48
49 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5
50 local callback = body["hub.callback"];
51 local mode = body["hub.mode"];
52 local topic = body["hub.topic"];
53 local lease_seconds = m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease);
54 local secret = body["hub.secret"];
55 local verify_token = body["hub.verify_token"];
56
57 module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic);
58
59 if not subs_by_topic[topic] then
60 subs_by_topic[topic] = {};
61 end
62 local subscription = subs_by_topic[topic][callback];
63
64 local verify_modes = {};
65 for i=1,#body do
66 if body[i].name == "hub.verify" then
67 verify_modes[body[i].value] = true;
68 end
69 end
70
71 subscription = subscription or {
72 id = uuid(),
73 callback = callback,
74 topic = topic,
75 state = "unsubscribed",
76 secret = secret,
77 want_state = mode,
78 lease_seconds = lease_seconds,
79 expires = time() + lease_seconds,
80 };
81 subs_by_topic[topic][callback] = subscription;
82 local challenge = uuid();
83
84 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
85 ["hub.mode"] = mode,
86 ["hub.topic"] = topic,
87 ["hub.challenge"] = challenge,
88 ["hub.lease_seconds"] = tostring(lease_seconds),
89 ["hub.verify_token"] = verify_token,
90 }
91 module:log("debug", require"util.serialization".serialize(verify_modes));
92 if verify_modes["async"] then
93 module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription));
94 http.request(callback_url, nil, function(body, code)
95 if body == challenge and code > 199 and code < 300 then
96 if not subscription.want_state then
97 module:log("warn", "Verification of already verified request, probably");
98 return;
99 end
100 subscription.state = subscription.want_state .. "d";
101 subscription.want_state = nil;
102 module:log("debug", "calling do_subscribe()");
103 do_subscribe(subscription);
104 subs_by_topic[topic][callback] = subscription;
105 else
106 module:log("warn", "status %d and body was %q", tostring(code), tostring(body));
107 subs_by_topic[topic][callback] = subscription;
108 end
109 end)
110 return 202;
111 elseif verify_modes["sync"] then
112 http.request(callback_url, nil, function(body, code)
113 if body == challenge and code > 199 and code < 300 then
114 if not subscription.want_state then
115 module:log("warn", "Verification of already verified request, probably");
116 return;
117 end
118 if mode == "unsubscribe" then
119 subs_by_topic[topic][callback] = nil;
120 else
121 subscription.state = subscription.want_state .. "d";
122 subscription.want_state = nil;
123 module:log("debug", "calling do_subscribe()");
124 do_subscribe(subscription);
125 subs_by_topic[topic][callback] = subscription;
126 end
127 else
128 subs_by_topic[topic][callback] = subscription;
129 end
130 response.status = 204;
131 response:send();
132 end)
133 return true;
134 end
135 return 400;
136 else
137 response.status = 400;
138 response.headers.content_type = "text/html";
139 return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n"
140 end
141 end
142 end
143
144 local function periodic()
145 local now = time();
146 local next_check = now + max_lease;
147 local purge = false
148 for topic, callbacks in pairs(subs_by_topic) do
149 for callback, subscription in pairs(callbacks) do
150 if subscription.mode == "subscribed" then
151 if subscription.expires < now then
152 -- Subscription has expired, drop it.
153 purge = true
154 end
155 if subscription.expires < now + min_lease then
156 -- Subscription set to expire soon, re-confirm it.
157 local challenge = uuid();
158 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
159 ["hub.mode"] = subscription.state,
160 ["hub.topic"] = topic,
161 ["hub.challenge"] = challenge,
162 ["hub.lease_seconds"] = subscription.lease_seconds,
163 ["hub.verify_token"] = subscription.verify_token,
164 }
165 http.request(callback_url, nil, function(body, code)
166 if body == challenge and code > 199 and code < 300 then
167 subscription.expires = now + subscription.lease_seconds;
168 end
169 end);
170 else
171 next_check = m_min(next_check, now - subscription.expires)
172 end
173 end
174 end
175 if purge then
176 local new_callbacks = {};
177 for callback, subscription in pairs(callbacks) do
178 if (subscription.state == "subscribed" and subscription.expires < now)
179 and subscription.want_state ~= "remove" then
180 new_callbacks[callback] = subscription;
181 end
182 end
183 subs_by_topic[topic] = new_callbacks
184 end
185 end
186 return m_max(next_check - min_lease, min_lease);
187 end
188
189 local function on_notify(subscription, content)
190 local body = tostring(content);
191 local headers = {
192 ["Content-Type"] = "application/xml",
193 };
194 if subscription.secret then
195 headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true);
196 end
197 http.request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code)
198 if code >= 200 and code <= 299 then
199 module:log("debug", "Delivered");
200 else
201 module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback));
202 -- TODO Retry
203 -- ... but the spec says that you should not retry, wtf?
204 end
205 end);
206 end
207
208 module:hook("message/host", function(event)
209 local stanza = event.stanza;
210 if stanza.attr.from ~= module.host then return end;
211
212 for pubsub_event in stanza:childtags("event", xmlns_pubsub_event) do
213 local items = pubsub_event:get_child("items");
214 local node = items.attr.node;
215 if items and node and subs_by_topic[node] then
216 for item in items:childtags("item") do
217 local content = item.tags[1];
218 for callback, subscription in pairs(subs_by_topic[node]) do
219 on_notify(subscription, content)
220 end
221 end
222 end
223 end
224 return true;
225 end, 10);
226
227 module:depends"http";
228 module:provides("http", {
229 default_path = "/hub";
230 route = {
231 POST = handle_request;
232 GET = function()
233 return json_encode(subs_by_topic);
234 end;
235 ["GET /topic/*"] = function(event, path)
236 return json_encode(subs_by_topic[path])
237 end;
238 };
239 });
240
241 module:add_timer(1, periodic);