Mercurial > prosody-modules
comparison mod_pubsub_feed/mod_pubsub_feed.lua @ 322:637dc0a04052
mod_pubsub_feed: Implement PubSubHubbub subscriber
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 31 Jan 2011 03:37:16 +0100 |
parents | b81e4f86a231 |
children | 433bf7dc3e7a |
comparison
equal
deleted
inserted
replaced
321:661f64627fed | 322:637dc0a04052 |
---|---|
1 -- Fetches Atom feeds and publishes to PubSub nodes | 1 -- Fetches Atom feeds and publishes to PubSub nodes |
2 -- | |
3 -- Depends: http://code.matthewwild.co.uk/lua-feeds | |
2 -- | 4 -- |
3 -- Config: | 5 -- Config: |
4 -- Component "pubsub.example.com" "pubsub" | 6 -- Component "pubsub.example.com" "pubsub" |
5 -- modules_enabled = { | 7 -- modules_enabled = { |
6 -- "pubsub_feed"; | 8 -- "pubsub_feed"; |
7 -- } | 9 -- } |
8 -- feeds = { -- node -> url | 10 -- feeds = { -- node -> url |
9 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml"; | 11 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml"; |
10 -- } | 12 -- } |
11 -- feed_pull_interval = 20 -- minutes | 13 -- feed_pull_interval = 20 -- minutes |
14 -- | |
15 -- Reference | |
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html | |
12 | 17 |
13 local modules = hosts[module.host].modules; | 18 local modules = hosts[module.host].modules; |
14 if not modules.pubsub then | 19 if not modules.pubsub then |
20 --FIXME Should this throw an error() instead? | |
15 module:log("warn", "Pubsub needs to be loaded on this host"); | 21 module:log("warn", "Pubsub needs to be loaded on this host"); |
16 end | 22 end |
23 | |
24 | |
25 local t_insert = table.insert; | |
17 local add_task = require "util.timer".add_task; | 26 local add_task = require "util.timer".add_task; |
18 local date, time = os.date, os.time; | 27 local date, time = os.date, os.time; |
19 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; | 28 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; |
20 local http = require "net.http"; | 29 local http = require "net.http"; |
21 local parse_feed = require "feeds".feed_from_string; | 30 local parse_feed = require "feeds".feed_from_string; |
22 local st = require "util.stanza"; | 31 local st = require "util.stanza"; |
32 local httpserver = require "net.httpserver"; | |
33 local formencode = require "net.http".formencode; | |
34 local dump = require "util.serialization".serialize; | |
35 | |
36 local urldecode = require "net.http".urldecode; | |
37 local urlencode = require "net.http".urlencode; | |
38 local urlparams = --require "net.http".getQueryParams or whatever MattJ names it, FIXME | |
39 function(s) | |
40 if not s:match("=") then return urldecode(s); end | |
41 local r = {} | |
42 s:gsub("([^=&]*)=([^&]*)", function(k,v) | |
43 r[ urldecode(k) ] = urldecode(v); | |
44 return nil | |
45 end) | |
46 return r | |
47 end; | |
23 | 48 |
24 local config = module:get_option("feeds") or { | 49 local config = module:get_option("feeds") or { |
25 planet_jabber = "http://planet.jabber.org/atom.xml"; | 50 planet_jabber = "http://planet.jabber.org/atom.xml"; |
26 prosody_blog = "http://blog.prosody.im/feed/atom.xml"; | 51 prosody_blog = "http://blog.prosody.im/feed/atom.xml"; |
27 }; | 52 }; |
28 local refresh_interval = (module:get_option("feed_pull_interval") or 15) * 60; | 53 local refresh_interval = module:get_option_number("feed_pull_interval", 15) * 60; |
54 local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true); -- Should PubSubHubbub (needs the HTTP callback listener) be enabled by default? | |
29 local feed_list = { } | 55 local feed_list = { } |
30 for node, url in pairs(config) do | 56 for node, url in pairs(config) do |
31 feed_list[node] = { url = url }; | 57 feed_list[node] = { url = url; node = node; last_update = 0 }; |
32 end | 58 end |
33 | 59 |
34 local function update(item, callback) | 60 local response_codes = { |
61 ["202"] = "Accepted"; | |
62 ["400"] = "Bad Request"; | |
63 ["501"] = "Not Implemented"; | |
64 }; | |
65 | |
66 local function http_response(code, headers, body) | |
67 return { | |
68 status = (type(code) == "number" and code .. " " .. response_codes[tostring(code)]) or code; | |
69 headers = headers or {}; | |
70 body = body or "<h1>" .. response_codes[tostring(code)] .. "</h1>\n"; | |
71 }; | |
72 end | |
73 | |
74 local actor = module.host.."/"..module.name; | |
75 | |
76 function update_entry(item) | |
77 local node = item.node; | |
78 --module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node) | |
79 local feed = parse_feed(item.data); | |
80 module:log("debug", "updating node %s", node); | |
81 for _, entry in ipairs(feed) do | |
82 entry.attr.xmlns = "http://www.w3.org/2005/Atom"; | |
83 | |
84 local e_published = entry:get_child("published"); | |
85 e_published = e_published and e_published:get_text(); | |
86 e_published = e_published and dt_parse(e_published); | |
87 local e_updated = entry:get_child("updated"); | |
88 e_updated = e_updated and e_updated:get_text(); | |
89 e_updated = e_updated and dt_parse(e_updated); | |
90 | |
91 local timestamp = e_updated or e_published or nil; | |
92 --module:log("debug", "timestamp is %s, item.last_update is %s", tostring(timestamp), tostring(item.last_update)); | |
93 if not timestamp or not item.last_update or timestamp > item.last_update then | |
94 local id = entry:get_child("id"); | |
95 id = id and id:get_text() or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up | |
96 local xitem = st.stanza("item", { id = id }):add_child(entry); | |
97 -- TODO Put data from /feed into item/source | |
98 | |
99 module:log("debug", "publishing to %s, id %s", node, id); | |
100 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem); | |
101 if not ok then | |
102 if err == "item-not-found" then -- try again | |
103 module:log("debug", "got item-not-found, creating %s and trying again", node); | |
104 local ok, err = modules.pubsub.service:create(node, actor); | |
105 if not ok then | |
106 module:log("error", "could not create node %s: %s", node, err); | |
107 return; | |
108 end | |
109 local ok, err = modules.pubsub.service:publish(node, actor, id, xitem); | |
110 if not ok then | |
111 module:log("error", "could not create or publish node %s: %s", node, err); | |
112 return | |
113 end | |
114 else | |
115 module:log("error", "publishing %s failed: %s", node, err); | |
116 end | |
117 end | |
118 end | |
119 end | |
120 | |
121 if use_pubsubhubub and not item.subscription then | |
122 module:log("debug", "check if %s has a hub", item.node); | |
123 local hub = feed.links and feed.links.hub; | |
124 if hub then | |
125 item.hub = hub; | |
126 module:log("debug", "%s has a hub: %s", item.node, item.hub); | |
127 subscribe(item); | |
128 end | |
129 end | |
130 end | |
131 | |
132 function fetch(item, callback) -- HTTP Pull | |
35 local headers = { }; | 133 local headers = { }; |
36 if item.data and item.last_update then | 134 if item.data and item.last_update then |
37 headers["If-Modified-Since"] = date("!%a, %d %b %Y %T %Z", item.last_update); | 135 headers["If-Modified-Since"] = date("!%a, %d %b %Y %T %Z", item.last_update); |
38 end | 136 end |
39 http.request(item.url, {headers = headers}, function(data, code, req) | 137 http.request(item.url, { headers = headers }, function(data, code, req) |
40 if code == 200 then | 138 if code == 200 then |
41 item.data = data; | 139 item.data = data; |
42 callback(item) | 140 if callback then callback(item) end |
43 item.last_update = time(); | 141 item.last_update = time(); |
44 end | 142 end |
45 if code == 304 then | 143 if code == 304 then |
46 item.last_update = time(); | 144 item.last_update = time(); |
47 end | 145 end |
48 end); | 146 end); |
49 end | 147 end |
50 | 148 |
51 local actor = module.host.."/"..module.name; | 149 function refresh_feeds() |
52 | 150 --module:log("debug", "Refreshing feeds"); |
53 local function refresh_feeds() | |
54 for node, item in pairs(feed_list) do | 151 for node, item in pairs(feed_list) do |
55 update(item, function(item) | 152 --FIXME Don't fetch feeds which have a subscription |
56 local feed = parse_feed(item.data); | 153 -- Otoho, what if the subscription expires or breaks? |
57 module:log("debug", "node: %s", node); | 154 if item.last_update + refresh_interval < time() then |
58 for _, entry in ipairs(feed) do | 155 module:log("debug", "checking %s", item.node); |
59 entry.attr.xmlns = "http://www.w3.org/2005/Atom"; | 156 fetch(item, update_entry); |
60 | 157 end |
61 local e_published = entry:get_child("published"); | 158 end |
62 e_published = e_published and e_published[1]; | 159 return refresh_interval; |
63 e_published = e_published and dt_parse(e_published); | 160 end |
64 local e_updated = entry:get_child("updated"); | 161 |
65 e_updated = e_updated and e_updated[1]; | 162 function subscribe(feed, challenge) |
66 e_updated = e_updated and dt_parse(e_updated); | 163 local _body, body = { |
67 | 164 ["hub.callback"] = "http://"..module.host..":5280/callback?node=" .. urlencode(feed.node); --FIXME figure out your own hostname reliably? |
68 local timestamp = e_updated or e_published or nil; | 165 ["hub.mode"] = "subscribe"; --TODO unsubscribe |
69 module:log("debug", "timestamp is %s, item.last_update is %s", tostring(timestamp), tostring(item.last_update)); | 166 ["hub.topic"] = feed.url; |
70 if not timestamp or not item.last_update or timestamp > item.last_update then | 167 ["hub.verify"] = "async"; |
71 local id = entry:get_child("id"); | 168 ["hub.verify_token"] = challenge; |
72 id = id[1] or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up | 169 --["hub.lease_seconds"] = ""; |
73 local item = st.stanza("item", { id = id }):add_child(entry); | 170 }, { }; |
74 | 171 for name, value in pairs(_body) do |
75 module:log("debug", "publishing to %s, id %s", node, id); | 172 t_insert(body, { name = name, value = value }); |
76 local ok, err = modules.pubsub.service:publish(node, actor, id, item); | 173 end --FIXME Why do I have to do this? |
77 if not ok then | 174 body = formencode(body); |
78 if err == "item-not-found" then -- try again | 175 |
79 module:log("debug", "got item-not-found, creating %s and trying again", node); | 176 --module:log("debug", "subscription request, body: %s", body); |
80 local ok, err = modules.pubsub.service:create(node, actor); | 177 |
81 if not ok then | 178 --FIXME The subscription states and related stuff |
82 module:log("error", "could not create node: %s", err); | 179 --feed.subscription = challenge and "asked" or "asking"; |
83 return; | 180 feed.subscription = "asking"; |
84 end | 181 http.request(feed.hub, { body = body }, function(data, code, req) |
85 local ok, err = modules.pubsub.service:publish(node, actor, id, item); | 182 local code = tostring(code); |
86 if not ok then | 183 module:log("debug", "subscription to %s submitted, staus %s", feed.node, code); |
87 module:log("error", "still could not create node: %s", err); | 184 if code == '202' then |
88 return | 185 if challenge then |
89 end | 186 module:log("debug", "subscribe to %s confirmed", feed.node); |
90 else | 187 feed.subscription = "active"; |
91 module:log("error", "publish failed: %s", err); | 188 else |
92 end | 189 module:log("debug", "subscription to %s submitted", feed.node); |
93 end | 190 --feed.subscription = "incomplete"; |
94 end | |
95 end | 191 end |
96 end); | 192 end |
97 end | 193 end); |
98 return refresh_interval; | 194 end |
195 | |
196 function handle_http_request(method, body, request) | |
197 --module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty"); | |
198 local query = request.url.query; | |
199 if query and type(query) == "string" then | |
200 query = urlparams(query); | |
201 --module:log("debug", "GET data: %s", dump(query)); | |
202 end | |
203 | |
204 -- TODO http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#authednotify | |
205 | |
206 if method == "GET" then | |
207 if query.node and feed_list[query.node] then | |
208 local feed = feed_list[query.node]; | |
209 local challenge = query["hub.challenge"]; | |
210 if challenge and feed.subscription == "asking" then | |
211 module:log("debug", "got a challenge for %s: %s", feed.node, challenge); | |
212 subscribe(feed, challenge); | |
213 return http_response(202); | |
214 end | |
215 end | |
216 return http_response(400); | |
217 elseif method == "POST" then | |
218 if #body > 0 and feed_list[query.node] then | |
219 module:log("debug", "got %d bytes PuSHed for %s", #body, query.node); | |
220 local feed = feed_list[query.node]; | |
221 feed.data = body; | |
222 update_entry(feed); | |
223 feed.last_update = time(); | |
224 return http_response(202); | |
225 end | |
226 return http_response(400); | |
227 end | |
228 return http_response(501); | |
99 end | 229 end |
100 | 230 |
101 function init() | 231 function init() |
232 module:log("debug", "initiating", module.name); | |
233 if use_pubsubhubub then | |
234 httpserver.new{ port = 5280, base = "callback", handler = handle_http_request } | |
235 end | |
102 add_task(0, refresh_feeds); | 236 add_task(0, refresh_feeds); |
103 end | 237 end |
104 | 238 |
105 if prosody.start_time then -- already started | 239 if prosody.start_time then -- already started |
106 init(); | 240 init(); |
107 else | 241 else |
108 prosody.events.add_handler("server-started", init); | 242 prosody.events.add_handler("server-started", init); |
109 end | 243 end |
110 |