Mercurial > prosody-modules
comparison mod_pubsub_feed/mod_pubsub_feed.lua @ 668:343b115ebbea
mod_pubsub_feed: Cleanup and update to new APIs in 0.9
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 21 May 2012 21:30:51 +0200 |
parents | 0d3174d5a1cc |
children |
comparison
equal
deleted
inserted
replaced
667:ea9941812721 | 668:343b115ebbea |
---|---|
14 -- | 14 -- |
15 -- Reference | 15 -- Reference |
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html | 16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html |
17 | 17 |
18 local modules = hosts[module.host].modules; | 18 local modules = hosts[module.host].modules; |
19 if not modules.pubsub then | 19 if not modules.pubsub or module:get_option("component_module") ~= "pubsub" then |
20 --FIXME Should this throw an error() instead? | |
21 module:log("warn", "Pubsub needs to be loaded on this host"); | 20 module:log("warn", "Pubsub needs to be loaded on this host"); |
22 end | 21 --module:log("debug", "component_module is %s", tostring(module:get_option("component_module"))); |
23 | 22 return |
24 | 23 end |
25 local t_insert = table.insert; | 24 |
26 local add_task = require "util.timer".add_task; | |
27 local date, time = os.date, os.time; | 25 local date, time = os.date, os.time; |
28 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; | 26 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; |
29 local uuid = require "util.uuid".generate; | 27 local uuid = require "util.uuid".generate; |
30 local hmac_sha1 = require "util.hmac".sha1; | 28 local hmac_sha1 = require "util.hmac".sha1; |
31 local parse_feed = require "feeds".feed_from_string; | 29 local parse_feed = require "feeds".feed_from_string; |
32 local st = require "util.stanza"; | 30 local st = require "util.stanza"; |
31 --local dump = require"util.serialization".serialize; | |
32 | |
33 local xmlns_atom = "http://www.w3.org/2005/Atom"; | |
34 | |
35 local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true); | |
36 if use_pubsubhubub then | |
37 module:depends"http"; | |
38 end | |
33 | 39 |
34 local http = require "net.http"; | 40 local http = require "net.http"; |
35 local httpserver = require "net.httpserver"; | |
36 local formdecode = http.formdecode; | 41 local formdecode = http.formdecode; |
37 local formencode = http.formencode; | 42 local formencode = http.formencode; |
38 local urldecode = http.urldecode; | 43 local urldecode = http.urldecode; |
39 local urlencode = http.urlencode; | 44 local urlencode = http.urlencode; |
40 | 45 |
41 local feed_list = {}; | 46 local feed_list = module:shared("feed_list"); |
42 local refresh_interval; | 47 local refresh_interval; |
43 | 48 |
44 -- Dynamically reloadable config. | 49 -- Dynamically reloadable config. |
45 local function update_config() | 50 local function update_config() |
46 local config = module:get_option("feeds") or { | 51 local config = module:get_option("feeds") or { |
64 end | 69 end |
65 end | 70 end |
66 update_config(); | 71 update_config(); |
67 module:hook("config-reloaded", update_config); | 72 module:hook("config-reloaded", update_config); |
68 | 73 |
69 -- Used to kill the timer | |
70 local module_unloaded = false; | |
71 function module.unload() | |
72 module_unloaded = true; | |
73 end | |
74 | |
75 -- Config stuff that can't be reloaded, since it would need to re-bind HTTP stuff. | |
76 | |
77 -- If module.host IN A doesn't point to this server, use this to override. | |
78 local httphost = module:get_option_string("pubsubhubub_httphost", module.host); | |
79 -- HTTP by default or not? | |
80 local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true); | |
81 | |
82 -- Thanks to Maranda for this | |
83 local port, base, ssl = 5280, "callback", false; | |
84 local ports = module:get_option("feeds_ports") or { port = port, base = base, ssl = ssl }; | |
85 -- FIXME If ports isn't a table, this will cause an error | |
86 local _, first_port = next(ports); -- We base the callback URL on the first port config | |
87 if first_port then | |
88 if type(first_port) == "number" then | |
89 port = first_port; | |
90 elseif type(first_port) == "table" then | |
91 port, base, ssl = | |
92 first_port.port or port, | |
93 first_port.path or base, | |
94 first_port.ssl or ssl; | |
95 elseif type(first_port) == "string" then | |
96 base = first_port; | |
97 end | |
98 end | |
99 | |
100 local response_codes = { | |
101 ["200"] = "OK"; | |
102 ["202"] = "Accepted"; | |
103 ["400"] = "Bad Request"; | |
104 ["403"] = "Forbidden"; | |
105 ["404"] = "Not Found"; | |
106 ["500"] = "Internal Server Error"; | |
107 ["501"] = "Not Implemented"; | |
108 }; | |
109 | |
110 local function http_response(code, headers, body) | |
111 return { | |
112 status = (type(code) == "number" and code .. " " .. response_codes[tostring(code)]) or code; | |
113 headers = headers or {}; | |
114 body = body or "<h1>" .. response_codes[tostring(code)] .. "</h1>\n"; | |
115 }; | |
116 end | |
117 | |
118 local actor = module.host.."/"..module.name; | 74 local actor = module.host.."/"..module.name; |
119 | 75 |
120 function update_entry(item) | 76 function update_entry(item) |
121 local node = item.node; | 77 local node = item.node; |
122 --module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node) | 78 module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node) |
123 local feed = parse_feed(item.data); | 79 local feed = parse_feed(item.data); |
124 module:log("debug", "updating node %s", node); | |
125 for _, entry in ipairs(feed) do | 80 for _, entry in ipairs(feed) do |
126 entry.attr.xmlns = "http://www.w3.org/2005/Atom"; | 81 entry.attr.xmlns = xmlns_atom; |
127 | 82 |
128 local e_published = entry:get_child_text("published"); | 83 local e_published = entry:get_child_text("published"); |
129 e_published = e_published and dt_parse(e_published); | 84 e_published = e_published and dt_parse(e_published); |
130 local e_updated = entry:get_child_text("updated"); | 85 local e_updated = entry:get_child_text("updated"); |
131 e_updated = e_updated and dt_parse(e_updated); | 86 e_updated = e_updated and dt_parse(e_updated); |
136 local id = entry:get_child_text("id"); | 91 local id = entry:get_child_text("id"); |
137 id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up | 92 id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up |
138 local xitem = st.stanza("item", { id = id }):add_child(entry); | 93 local xitem = st.stanza("item", { id = id }):add_child(entry); |
139 -- TODO Put data from /feed into item/source | 94 -- TODO Put data from /feed into item/source |
140 | 95 |
141 module:log("debug", "publishing to %s, id %s", node, id); | 96 --module:log("debug", "publishing to %s, id %s", node, id); |
142 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem); | 97 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem); |
143 if not ok then | 98 if not ok then |
144 if err == "item-not-found" then -- try again | 99 if err == "item-not-found" then -- try again |
145 module:log("debug", "got item-not-found, creating %s and trying again", node); | 100 --module:log("debug", "got item-not-found, creating %s and trying again", node); |
146 local ok, err = modules.pubsub.service:create(node, actor); | 101 local ok, err = modules.pubsub.service:create(node, actor); |
147 if not ok then | 102 if not ok then |
148 module:log("error", "could not create node %s: %s", node, err); | 103 module:log("error", "could not create node %s: %s", node, err); |
149 return; | 104 return; |
150 end | 105 end |
159 end | 114 end |
160 end | 115 end |
161 end | 116 end |
162 | 117 |
163 if use_pubsubhubub and not item.subscription then | 118 if use_pubsubhubub and not item.subscription then |
164 module:log("debug", "check if %s has a hub", item.node); | 119 --module:log("debug", "check if %s has a hub", item.node); |
165 local hub = feed.links and feed.links.hub; | 120 local hub = feed.links and feed.links.hub; |
166 if hub then | 121 if hub then |
167 item.hub = hub; | 122 item.hub = hub; |
168 module:log("debug", "%s has a hub: %s", item.node, item.hub); | 123 module:log("debug", "%s has a hub: %s", item.node, item.hub); |
169 subscribe(item); | 124 subscribe(item); |
173 | 128 |
174 function fetch(item, callback) -- HTTP Pull | 129 function fetch(item, callback) -- HTTP Pull |
175 local headers = { }; | 130 local headers = { }; |
176 if item.data and item.last_update then | 131 if item.data and item.last_update then |
177 headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S %Z", item.last_update); | 132 headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S %Z", item.last_update); |
178 --COMPAT We could have saved 6 bytes here, but Microsoft apparently hates %T, so you got this gigantic comment instead. | |
179 end | 133 end |
180 http.request(item.url, { headers = headers }, function(data, code, req) | 134 http.request(item.url, { headers = headers }, function(data, code, req) |
181 if code == 200 then | 135 if code == 200 then |
182 item.data = data; | 136 item.data = data; |
183 if callback then callback(item) end | 137 if callback then callback(item) end |
184 item.last_update = time(); | 138 item.last_update = time(); |
185 end | 139 elseif code == 304 then |
186 if code == 304 then | |
187 item.last_update = time(); | 140 item.last_update = time(); |
188 end | 141 end |
189 end); | 142 end); |
190 end | 143 end |
191 | 144 |
192 function refresh_feeds() | 145 function refresh_feeds() |
193 local now = time(); | 146 local now = time(); |
194 if module_unloaded then return end | |
195 --module:log("debug", "Refreshing feeds"); | 147 --module:log("debug", "Refreshing feeds"); |
196 for node, item in pairs(feed_list) do | 148 for node, item in pairs(feed_list) do |
197 --FIXME Don't fetch feeds which have a subscription | 149 --FIXME Don't fetch feeds which have a subscription |
198 -- OTOH, what if the subscription expires or breaks? | 150 -- OTOH, what if the subscription expires or breaks? |
199 if item.last_update + refresh_interval < now then | 151 if item.last_update + refresh_interval < now then |
200 module:log("debug", "checking %s", item.node); | 152 --module:log("debug", "checking %s", item.node); |
201 fetch(item, update_entry); | 153 fetch(item, update_entry); |
202 end | 154 end |
203 end | 155 end |
204 return refresh_interval; | 156 return refresh_interval; |
205 end | 157 end |
206 | 158 |
207 local function format_url(secure, host, port, path, node) | 159 local function format_url(node) |
208 return ("%s://%s:%d/%s?node=%s"):format(secure and "https" or "http", host, port, path, urlencode(node)); | 160 return module:http_url(nil, "/callback") .. "?node=" .. urlencode(node); |
209 end | 161 end |
210 | 162 |
211 function subscribe(feed) | 163 function subscribe(feed) |
212 feed.token = uuid(); | 164 feed.token = uuid(); |
213 feed.secret = uuid(); | 165 feed.secret = uuid(); |
214 local body = formencode{ | 166 local body = formencode{ |
215 ["hub.callback"] = format_url(ssl, httphost, port, base, feed.node); | 167 ["hub.callback"] = format_url(feed.node); |
216 ["hub.mode"] = "subscribe"; --TODO unsubscribe | 168 ["hub.mode"] = "subscribe"; --TODO unsubscribe |
217 ["hub.topic"] = feed.url; | 169 ["hub.topic"] = feed.url; |
218 ["hub.verify"] = "async"; | 170 ["hub.verify"] = "async"; |
219 ["hub.verify_token"] = feed.token; | 171 ["hub.verify_token"] = feed.token; |
220 ["hub.secret"] = feed.secret; | 172 ["hub.secret"] = feed.secret; |
224 --module:log("debug", "subscription request, body: %s", body); | 176 --module:log("debug", "subscription request, body: %s", body); |
225 | 177 |
226 --FIXME The subscription states and related stuff | 178 --FIXME The subscription states and related stuff |
227 feed.subscription = "subscribe"; | 179 feed.subscription = "subscribe"; |
228 http.request(feed.hub, { body = body }, function(data, code, req) | 180 http.request(feed.hub, { body = body }, function(data, code, req) |
229 local code = tostring(code); | 181 module:log("debug", "subscription to %s submitted, status %s", feed.node, tostring(code)); |
230 module:log("debug", "subscription to %s submitted, status %s", feed.node, code); | 182 if code >= 400 then |
183 module:log("error", "There was something wrong with our subscription request, body: %s", tostring(data)); | |
184 feed.subscription = "failed"; | |
185 end | |
231 end); | 186 end); |
232 end | 187 end |
233 | 188 |
234 function handle_http_request(method, body, request) | 189 function handle_http_request(event) |
235 if module_unloaded then | 190 local request = event.request; |
236 module:log("warn", "Received a HTTP request after module unload"); | 191 local method = request.method; |
237 return http_response(500) | 192 local body = request.body; |
238 -- FIXME if this happens. | 193 |
239 end | |
240 --module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty"); | 194 --module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty"); |
241 local query = request.url.query or {}; | 195 local query = request.url.query or {}; --FIXME |
242 if query and type(query) == "string" then | 196 if query and type(query) == "string" then |
243 query = formdecode(query); | 197 query = formdecode(query); |
244 --module:log("debug", "GET data: %s", dump(query)); | 198 --module:log("debug", "GET data: %s", dump(query)); |
245 end | 199 end |
246 --module:log("debug", "Headers: %s", dump(request.headers)); | 200 --module:log("debug", "Headers: %s", dump(request.headers)); |
247 | 201 |
202 local feed = feed_list[query.node]; | |
248 if method == "GET" then | 203 if method == "GET" then |
249 if query.node and feed_list[query.node] then | 204 if query.node and feed then |
250 local feed = feed_list[query.node]; | |
251 if query["hub.topic"] ~= feed.url then | 205 if query["hub.topic"] ~= feed.url then |
252 module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"])) | 206 module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"])) |
253 return http_response(404) | 207 return 404 |
254 end | 208 end |
255 if query["hub.mode"] ~= feed.subscription then | 209 if query["hub.mode"] ~= feed.subscription then |
256 module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"])) | 210 module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"])) |
257 return http_response(400) | 211 return 400 |
258 -- Would this work for unsubscribe? | 212 -- Would this work for unsubscribe? |
259 -- Also, if feed.subscription is changed here, | 213 -- Also, if feed.subscription is changed here, |
260 -- it would probably invalidate the subscription | 214 -- it would probably invalidate the subscription |
261 -- when/if the hub asks if it should be renewed | 215 -- when/if the hub asks if it should be renewed |
262 end | 216 end |
263 if query["hub.verify_token"] ~= feed.token then | 217 if query["hub.verify_token"] ~= feed.token then |
264 module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"])) | 218 module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"])) |
265 return http_response(403) | 219 return 401 |
266 end | 220 end |
267 module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url) | 221 module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url) |
268 return http_response(200, nil, query["hub.challenge"]) | 222 return query["hub.challenge"]; |
269 end | 223 end |
270 return http_response(400); | 224 return 400; |
271 elseif method == "POST" then | 225 elseif method == "POST" then |
272 if #body > 0 and feed_list[query.node] then | 226 local body = request.body; |
227 if #body > 0 and feed then | |
273 module:log("debug", "got %d bytes PuSHed for %s", #body, query.node); | 228 module:log("debug", "got %d bytes PuSHed for %s", #body, query.node); |
274 local feed = feed_list[query.node]; | 229 local signature = request.headers.x_hub_signature; |
275 local signature = request.headers["x-hub-signature"]; | |
276 if feed.secret then | 230 if feed.secret then |
277 local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true); | 231 local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true); |
278 if localsig ~= signature then | 232 if localsig ~= signature then |
279 module:log("debug", "Invalid signature"); | 233 module:log("debug", "Invalid signature, got %s but wanted %s", tostring(signature), tostring(localsig)); |
280 return http_response(403); | 234 return 401; |
281 end | 235 end |
282 module:log("debug", "Valid signature"); | 236 module:log("debug", "Valid signature"); |
283 end | 237 end |
284 feed.data = body; | 238 feed.data = body; |
285 update_entry(feed); | 239 update_entry(feed); |
286 feed.last_update = time(); | 240 feed.last_update = time(); |
287 return http_response(202); | 241 return 202; |
288 end | 242 end |
289 return http_response(400); | 243 return 400; |
290 end | 244 end |
291 return http_response(501); | 245 return 501; |
292 end | 246 end |
293 | 247 |
294 function init() | 248 if use_pubsubhubub then |
295 module:log("debug", "initiating", module.name); | 249 module:provides("http", { |
296 if use_pubsubhubub then | 250 default_path = "/callback"; |
297 module:log("debug", "Starting http server on %s", format_url(ssl, httphost, port, base, "NODE")); | 251 route = { |
298 httpserver.new_from_config( ports, handle_http_request, { base = "callback" } ); | 252 GET = handle_http_request; |
299 end | 253 POST = handle_http_request; |
300 add_task(0, refresh_feeds); | 254 -- This all? |
301 end | 255 }; |
302 | 256 }); |
303 if prosody.start_time then -- already started | 257 end |
304 init(); | 258 |
305 else | 259 module:add_timer(1, refresh_feeds); |
306 module:hook_global("server-started", init); | |
307 end |