comparison mod_pubsub_feed/mod_pubsub_feed.lua @ 668:343b115ebbea

mod_pubsub_feed: Cleanup and update to new APIs in 0.9
author Kim Alvefur <zash@zash.se>
date Mon, 21 May 2012 21:30:51 +0200
parents 0d3174d5a1cc
children
comparison
equal deleted inserted replaced
667:ea9941812721 668:343b115ebbea
14 -- 14 --
15 -- Reference 15 -- Reference
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html 16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html
17 17
18 local modules = hosts[module.host].modules; 18 local modules = hosts[module.host].modules;
19 if not modules.pubsub then 19 if not modules.pubsub or module:get_option("component_module") ~= "pubsub" then
20 --FIXME Should this throw an error() instead?
21 module:log("warn", "Pubsub needs to be loaded on this host"); 20 module:log("warn", "Pubsub needs to be loaded on this host");
22 end 21 --module:log("debug", "component_module is %s", tostring(module:get_option("component_module")));
23 22 return
24 23 end
25 local t_insert = table.insert; 24
26 local add_task = require "util.timer".add_task;
27 local date, time = os.date, os.time; 25 local date, time = os.date, os.time;
28 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; 26 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime;
29 local uuid = require "util.uuid".generate; 27 local uuid = require "util.uuid".generate;
30 local hmac_sha1 = require "util.hmac".sha1; 28 local hmac_sha1 = require "util.hmac".sha1;
31 local parse_feed = require "feeds".feed_from_string; 29 local parse_feed = require "feeds".feed_from_string;
32 local st = require "util.stanza"; 30 local st = require "util.stanza";
31 --local dump = require"util.serialization".serialize;
32
33 local xmlns_atom = "http://www.w3.org/2005/Atom";
34
35 local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true);
36 if use_pubsubhubub then
37 module:depends"http";
38 end
33 39
34 local http = require "net.http"; 40 local http = require "net.http";
35 local httpserver = require "net.httpserver";
36 local formdecode = http.formdecode; 41 local formdecode = http.formdecode;
37 local formencode = http.formencode; 42 local formencode = http.formencode;
38 local urldecode = http.urldecode; 43 local urldecode = http.urldecode;
39 local urlencode = http.urlencode; 44 local urlencode = http.urlencode;
40 45
41 local feed_list = {}; 46 local feed_list = module:shared("feed_list");
42 local refresh_interval; 47 local refresh_interval;
43 48
44 -- Dynamically reloadable config. 49 -- Dynamically reloadable config.
45 local function update_config() 50 local function update_config()
46 local config = module:get_option("feeds") or { 51 local config = module:get_option("feeds") or {
64 end 69 end
65 end 70 end
66 update_config(); 71 update_config();
67 module:hook("config-reloaded", update_config); 72 module:hook("config-reloaded", update_config);
68 73
69 -- Used to kill the timer
70 local module_unloaded = false;
71 function module.unload()
72 module_unloaded = true;
73 end
74
75 -- Config stuff that can't be reloaded, since it would need to re-bind HTTP stuff.
76
77 -- If module.host IN A doesn't point to this server, use this to override.
78 local httphost = module:get_option_string("pubsubhubub_httphost", module.host);
79 -- HTTP by default or not?
80 local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true);
81
82 -- Thanks to Maranda for this
83 local port, base, ssl = 5280, "callback", false;
84 local ports = module:get_option("feeds_ports") or { port = port, base = base, ssl = ssl };
85 -- FIXME If ports isn't a table, this will cause an error
86 local _, first_port = next(ports); -- We base the callback URL on the first port config
87 if first_port then
88 if type(first_port) == "number" then
89 port = first_port;
90 elseif type(first_port) == "table" then
91 port, base, ssl =
92 first_port.port or port,
93 first_port.path or base,
94 first_port.ssl or ssl;
95 elseif type(first_port) == "string" then
96 base = first_port;
97 end
98 end
99
100 local response_codes = {
101 ["200"] = "OK";
102 ["202"] = "Accepted";
103 ["400"] = "Bad Request";
104 ["403"] = "Forbidden";
105 ["404"] = "Not Found";
106 ["500"] = "Internal Server Error";
107 ["501"] = "Not Implemented";
108 };
109
110 local function http_response(code, headers, body)
111 return {
112 status = (type(code) == "number" and code .. " " .. response_codes[tostring(code)]) or code;
113 headers = headers or {};
114 body = body or "<h1>" .. response_codes[tostring(code)] .. "</h1>\n";
115 };
116 end
117
118 local actor = module.host.."/"..module.name; 74 local actor = module.host.."/"..module.name;
119 75
120 function update_entry(item) 76 function update_entry(item)
121 local node = item.node; 77 local node = item.node;
122 --module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node) 78 module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node)
123 local feed = parse_feed(item.data); 79 local feed = parse_feed(item.data);
124 module:log("debug", "updating node %s", node);
125 for _, entry in ipairs(feed) do 80 for _, entry in ipairs(feed) do
126 entry.attr.xmlns = "http://www.w3.org/2005/Atom"; 81 entry.attr.xmlns = xmlns_atom;
127 82
128 local e_published = entry:get_child_text("published"); 83 local e_published = entry:get_child_text("published");
129 e_published = e_published and dt_parse(e_published); 84 e_published = e_published and dt_parse(e_published);
130 local e_updated = entry:get_child_text("updated"); 85 local e_updated = entry:get_child_text("updated");
131 e_updated = e_updated and dt_parse(e_updated); 86 e_updated = e_updated and dt_parse(e_updated);
136 local id = entry:get_child_text("id"); 91 local id = entry:get_child_text("id");
137 id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up 92 id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up
138 local xitem = st.stanza("item", { id = id }):add_child(entry); 93 local xitem = st.stanza("item", { id = id }):add_child(entry);
139 -- TODO Put data from /feed into item/source 94 -- TODO Put data from /feed into item/source
140 95
141 module:log("debug", "publishing to %s, id %s", node, id); 96 --module:log("debug", "publishing to %s, id %s", node, id);
142 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem); 97 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem);
143 if not ok then 98 if not ok then
144 if err == "item-not-found" then -- try again 99 if err == "item-not-found" then -- try again
145 module:log("debug", "got item-not-found, creating %s and trying again", node); 100 --module:log("debug", "got item-not-found, creating %s and trying again", node);
146 local ok, err = modules.pubsub.service:create(node, actor); 101 local ok, err = modules.pubsub.service:create(node, actor);
147 if not ok then 102 if not ok then
148 module:log("error", "could not create node %s: %s", node, err); 103 module:log("error", "could not create node %s: %s", node, err);
149 return; 104 return;
150 end 105 end
159 end 114 end
160 end 115 end
161 end 116 end
162 117
163 if use_pubsubhubub and not item.subscription then 118 if use_pubsubhubub and not item.subscription then
164 module:log("debug", "check if %s has a hub", item.node); 119 --module:log("debug", "check if %s has a hub", item.node);
165 local hub = feed.links and feed.links.hub; 120 local hub = feed.links and feed.links.hub;
166 if hub then 121 if hub then
167 item.hub = hub; 122 item.hub = hub;
168 module:log("debug", "%s has a hub: %s", item.node, item.hub); 123 module:log("debug", "%s has a hub: %s", item.node, item.hub);
169 subscribe(item); 124 subscribe(item);
173 128
174 function fetch(item, callback) -- HTTP Pull 129 function fetch(item, callback) -- HTTP Pull
175 local headers = { }; 130 local headers = { };
176 if item.data and item.last_update then 131 if item.data and item.last_update then
177 headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S %Z", item.last_update); 132 headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S %Z", item.last_update);
178 --COMPAT We could have saved 6 bytes here, but Microsoft apparently hates %T, so you got this gigantic comment instead.
179 end 133 end
180 http.request(item.url, { headers = headers }, function(data, code, req) 134 http.request(item.url, { headers = headers }, function(data, code, req)
181 if code == 200 then 135 if code == 200 then
182 item.data = data; 136 item.data = data;
183 if callback then callback(item) end 137 if callback then callback(item) end
184 item.last_update = time(); 138 item.last_update = time();
185 end 139 elseif code == 304 then
186 if code == 304 then
187 item.last_update = time(); 140 item.last_update = time();
188 end 141 end
189 end); 142 end);
190 end 143 end
191 144
192 function refresh_feeds() 145 function refresh_feeds()
193 local now = time(); 146 local now = time();
194 if module_unloaded then return end
195 --module:log("debug", "Refreshing feeds"); 147 --module:log("debug", "Refreshing feeds");
196 for node, item in pairs(feed_list) do 148 for node, item in pairs(feed_list) do
197 --FIXME Don't fetch feeds which have a subscription 149 --FIXME Don't fetch feeds which have a subscription
198 -- Otoho, what if the subscription expires or breaks? 150 -- Otoho, what if the subscription expires or breaks?
199 if item.last_update + refresh_interval < now then 151 if item.last_update + refresh_interval < now then
200 module:log("debug", "checking %s", item.node); 152 --module:log("debug", "checking %s", item.node);
201 fetch(item, update_entry); 153 fetch(item, update_entry);
202 end 154 end
203 end 155 end
204 return refresh_interval; 156 return refresh_interval;
205 end 157 end
206 158
207 local function format_url(secure, host, port, path, node) 159 local function format_url(node)
208 return ("%s://%s:%d/%s?node=%s"):format(secure and "https" or "http", host, port, path, urlencode(node)); 160 return module:http_url(nil, "/callback") .. "?node=" .. urlencode(node);
209 end 161 end
210 162
211 function subscribe(feed) 163 function subscribe(feed)
212 feed.token = uuid(); 164 feed.token = uuid();
213 feed.secret = uuid(); 165 feed.secret = uuid();
214 local body = formencode{ 166 local body = formencode{
215 ["hub.callback"] = format_url(ssl, httphost, port, base, feed.node); 167 ["hub.callback"] = format_url(feed.node);
216 ["hub.mode"] = "subscribe"; --TODO unsubscribe 168 ["hub.mode"] = "subscribe"; --TODO unsubscribe
217 ["hub.topic"] = feed.url; 169 ["hub.topic"] = feed.url;
218 ["hub.verify"] = "async"; 170 ["hub.verify"] = "async";
219 ["hub.verify_token"] = feed.token; 171 ["hub.verify_token"] = feed.token;
220 ["hub.secret"] = feed.secret; 172 ["hub.secret"] = feed.secret;
224 --module:log("debug", "subscription request, body: %s", body); 176 --module:log("debug", "subscription request, body: %s", body);
225 177
226 --FIXME The subscription states and related stuff 178 --FIXME The subscription states and related stuff
227 feed.subscription = "subscribe"; 179 feed.subscription = "subscribe";
228 http.request(feed.hub, { body = body }, function(data, code, req) 180 http.request(feed.hub, { body = body }, function(data, code, req)
229 local code = tostring(code); 181 module:log("debug", "subscription to %s submitted, status %s", feed.node, tostring(code));
230 module:log("debug", "subscription to %s submitted, status %s", feed.node, code); 182 if code >= 400 then
183 module:log("error", "There was something wrong with our subscription request, body: %s", tostring(data));
184 feed.subscription = "failed";
185 end
231 end); 186 end);
232 end 187 end
233 188
234 function handle_http_request(method, body, request) 189 function handle_http_request(event)
235 if module_unloaded then 190 local request = event.request;
236 module:log("warn", "Received a HTTP request after module unload"); 191 local method = request.method;
237 return http_response(500) 192 local body = request.body;
238 -- FIXME if this happens. 193
239 end
240 --module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty"); 194 --module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty");
241 local query = request.url.query or {}; 195 local query = request.url.query or {}; --FIXME
242 if query and type(query) == "string" then 196 if query and type(query) == "string" then
243 query = formdecode(query); 197 query = formdecode(query);
244 --module:log("debug", "GET data: %s", dump(query)); 198 --module:log("debug", "GET data: %s", dump(query));
245 end 199 end
246 --module:log("debug", "Headers: %s", dump(request.headers)); 200 --module:log("debug", "Headers: %s", dump(request.headers));
247 201
202 local feed = feed_list[query.node];
248 if method == "GET" then 203 if method == "GET" then
249 if query.node and feed_list[query.node] then 204 if query.node and feed then
250 local feed = feed_list[query.node];
251 if query["hub.topic"] ~= feed.url then 205 if query["hub.topic"] ~= feed.url then
252 module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"])) 206 module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"]))
253 return http_response(404) 207 return 404
254 end 208 end
255 if query["hub.mode"] ~= feed.subscription then 209 if query["hub.mode"] ~= feed.subscription then
256 module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"])) 210 module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"]))
257 return http_response(400) 211 return 400
258 -- Would this work for unsubscribe? 212 -- Would this work for unsubscribe?
259 -- Also, if feed.subscription is changed here, 213 -- Also, if feed.subscription is changed here,
260 -- it would probably invalidate the subscription 214 -- it would probably invalidate the subscription
261 -- when/if the hub asks if it should be renewed 215 -- when/if the hub asks if it should be renewed
262 end 216 end
263 if query["hub.verify_token"] ~= feed.token then 217 if query["hub.verify_token"] ~= feed.token then
264 module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"])) 218 module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"]))
265 return http_response(403) 219 return 401
266 end 220 end
267 module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url) 221 module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url)
268 return http_response(200, nil, query["hub.challenge"]) 222 return query["hub.challenge"];
269 end 223 end
270 return http_response(400); 224 return 400;
271 elseif method == "POST" then 225 elseif method == "POST" then
272 if #body > 0 and feed_list[query.node] then 226 local body = request.body;
227 if #body > 0 and feed then
273 module:log("debug", "got %d bytes PuSHed for %s", #body, query.node); 228 module:log("debug", "got %d bytes PuSHed for %s", #body, query.node);
274 local feed = feed_list[query.node]; 229 local signature = request.headers.x_hub_signature;
275 local signature = request.headers["x-hub-signature"];
276 if feed.secret then 230 if feed.secret then
277 local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true); 231 local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true);
278 if localsig ~= signature then 232 if localsig ~= signature then
279 module:log("debug", "Invalid signature"); 233 module:log("debug", "Invalid signature, got %s but wanted %s", tostring(signature), tostring(localsig));
280 return http_response(403); 234 return 401;
281 end 235 end
282 module:log("debug", "Valid signature"); 236 module:log("debug", "Valid signature");
283 end 237 end
284 feed.data = body; 238 feed.data = body;
285 update_entry(feed); 239 update_entry(feed);
286 feed.last_update = time(); 240 feed.last_update = time();
287 return http_response(202); 241 return 202;
288 end 242 end
289 return http_response(400); 243 return 400;
290 end 244 end
291 return http_response(501); 245 return 501;
292 end 246 end
293 247
294 function init() 248 if use_pubsubhubub then
295 module:log("debug", "initiating", module.name); 249 module:provides("http", {
296 if use_pubsubhubub then 250 default_path = "/callback";
297 module:log("debug", "Starting http server on %s", format_url(ssl, httphost, port, base, "NODE")); 251 route = {
298 httpserver.new_from_config( ports, handle_http_request, { base = "callback" } ); 252 GET = handle_http_request;
299 end 253 POST = handle_http_request;
300 add_task(0, refresh_feeds); 254 -- This all?
301 end 255 };
302 256 });
303 if prosody.start_time then -- already started 257 end
304 init(); 258
305 else 259 module:add_timer(1, refresh_feeds);
306 module:hook_global("server-started", init);
307 end