comparison mod_pubsub_feeds/mod_pubsub_feeds.lua @ 670:9d8efb804a00

mod_pubsub_feed: Rename to mod_pubsub_feeds
author Kim Alvefur <zash@zash.se>
date Wed, 23 May 2012 21:52:14 +0200
parents mod_pubsub_feed/mod_pubsub_feed.lua@343b115ebbea
children e79147fb39f9
comparison
equal deleted inserted replaced
669:dd7d30c175d4 670:9d8efb804a00
1 -- Fetches Atom feeds and publishes to PubSub nodes
2 --
3 -- Depends: http://code.matthewwild.co.uk/lua-feeds
4 --
5 -- Config:
6 -- Component "pubsub.example.com" "pubsub"
7 -- modules_enabled = {
8 -- "pubsub_feeds";
9 -- }
10 -- feeds = { -- node -> url
11 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml";
12 -- }
13 -- feed_pull_interval = 20 -- minutes
14 --
15 -- Reference
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html
17
-- Entries are published through the pubsub module loaded on this host, so
-- stop loading when it is missing or when the host's component_module option
-- is not "pubsub".
local modules = hosts[module.host].modules;
local is_pubsub_component = modules.pubsub and module:get_option("component_module") == "pubsub";
if not is_pubsub_component then
	module:log("warn", "Pubsub needs to be loaded on this host");
	--module:log("debug", "component_module is %s", tostring(module:get_option("component_module")));
	return;
end
24
25 local date, time = os.date, os.time;
26 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime;
27 local uuid = require "util.uuid".generate;
28 local hmac_sha1 = require "util.hmac".sha1;
29 local parse_feed = require "feeds".feed_from_string;
30 local st = require "util.stanza";
31 --local dump = require"util.serialization".serialize;
32
33 local xmlns_atom = "http://www.w3.org/2005/Atom";
34
-- PubSubHubbub support can be switched off with use_pubsubhubub = false.
-- When it is on, the "http" module is declared as a dependency because the
-- hub callback is served over HTTP.
local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true);
if use_pubsubhubub then module:depends("http"); end
39
40 local http = require "net.http";
41 local formdecode = http.formdecode;
42 local formencode = http.formencode;
43 local urldecode = http.urldecode;
44 local urlencode = http.urlencode;
45
-- Per-node feed state, kept in the table returned by module:shared("feed_list").
local feed_list = module:shared("feed_list");
-- Seconds between polls of a feed; assigned by update_config().
local refresh_interval;

-- Dynamically reloadable config.
-- Reads the "feeds" (node -> url) and "feed_pull_interval" (minutes) options
-- and brings feed_list in line with them: unknown nodes are added, known
-- nodes get their URL refreshed, nodes that disappeared are dropped.
local function update_config()
	local wanted = module:get_option("feeds");
	if not wanted then
		-- Nothing configured: fall back to the built-in example feeds.
		wanted = {
			planet_jabber = "http://planet.jabber.org/atom.xml";
			prosody_blog = "http://blog.prosody.im/feed/atom.xml";
		};
	end

	-- Configured in minutes, stored in seconds so that it can be compared
	-- against os.time() values.
	refresh_interval = 60 * module:get_option_number("feed_pull_interval", 15);

	-- Forget nodes that are no longer configured.
	for node in pairs(feed_list) do
		if wanted[node] == nil then
			feed_list[node] = nil;
		end
	end

	-- Register new nodes and refresh the URL of the ones already known.
	for node, url in pairs(wanted) do
		local known = feed_list[node];
		if known then
			known.url = url;
		else
			feed_list[node] = { url = url; node = node; last_update = 0 };
		end
	end
end

-- Apply the configuration now and again whenever it is reloaded.
update_config();
module:hook("config-reloaded", update_config);

-- Identity handed to the pubsub service as the actor of create/publish calls.
local actor = module.host.."/"..module.name;
75
-- Publish `xitem` under `id` on `node`, creating the node and retrying once
-- when the pubsub service answers "item-not-found".
-- Returns false when creating the node or the retried publish failed, in
-- which case the caller stops processing the feed; returns true otherwise
-- (including when any other publish error was only logged).
local function publish_entry(node, id, xitem)
	local service = modules.pubsub.service;
	local ok, err = service:publish(node, actor, id, xitem);
	if ok then
		return true;
	end
	if err ~= "item-not-found" then
		module:log("error", "publishing %s failed: %s", node, err);
		return true;
	end
	-- The node does not exist yet: create it and try again.
	--module:log("debug", "got item-not-found, creating %s and trying again", node);
	ok, err = service:create(node, actor);
	if not ok then
		module:log("error", "could not create node %s: %s", node, err);
		return false;
	end
	ok, err = service:publish(node, actor, id, xitem);
	if not ok then
		module:log("error", "could not create or publish node %s: %s", node, err);
		return false;
	end
	return true;
end

-- Parse the feed document stored in `item.data` and publish every entry that
-- has no usable timestamp or is newer than `item.last_update` to the pubsub
-- node `item.node`. When PubSubHubbub is enabled and the feed advertises a
-- hub, a subscription is requested for items that do not have one yet.
--
-- item: a feed_list entry (fields used: node, url, data, last_update,
--       subscription; `hub` is written when a hub is found).
-- Returns nothing.
function update_entry(item)
	local node = item.node;
	local data = item.data;
	if not data then
		-- Nothing was fetched or pushed yet, so there is nothing to parse.
		module:log("debug", "no data to parse in node %s", tostring(node));
		return;
	end
	module:log("debug", "parsing %d bytes of data in node %s", #data, node)

	local feed, err = parse_feed(data);
	if not feed then
		-- NOTE(review): assumes parse_feed reports failure as nil plus an
		-- error value — confirm against the lua-feeds version in use.
		module:log("error", "Could not parse feed %s: %s", tostring(item.url), tostring(err));
		return;
	end

	for _, entry in ipairs(feed) do
		entry.attr.xmlns = xmlns_atom;

		local e_published = entry:get_child_text("published");
		e_published = e_published and dt_parse(e_published);
		local e_updated = entry:get_child_text("updated");
		e_updated = e_updated and dt_parse(e_updated);

		-- Prefer the modification time, fall back to the publication time.
		local timestamp = e_updated or e_published;
		--module:log("debug", "timestamp is %s, item.last_update is %s", tostring(timestamp), tostring(item.last_update));
		if not timestamp or not item.last_update or timestamp > item.last_update then
			local id = entry:get_child_text("id");
			id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up
			local xitem = st.stanza("item", { id = id }):add_child(entry);
			-- TODO Put data from /feed into item/source

			--module:log("debug", "publishing to %s, id %s", node, id);
			if not publish_entry(node, id, xitem) then
				return;
			end
		end
	end

	if use_pubsubhubub and not item.subscription then
		--module:log("debug", "check if %s has a hub", item.node);
		local hub = feed.links and feed.links.hub;
		if hub then
			item.hub = hub;
			module:log("debug", "%s has a hub: %s", item.node, item.hub);
			subscribe(item);
		end
	end
end
128
-- HTTP pull: request `item.url` and, when the server answers 200, store the
-- body in `item.data` and hand the item to `callback`.
--
-- item:     a feed_list entry (fields used: url, data, last_update).
-- callback: optional function(item), called after item.data was replaced but
--           before item.last_update is advanced, so it still sees the time
--           of the previous update.
-- Returns nothing; the result arrives through the http.request callback.
function fetch(item, callback) -- HTTP Pull
	local headers = { };
	if item.data and item.last_update then
		-- HTTP dates must end in the literal "GMT"; "%Z" would print whatever
		-- zone name the C library reports for the time.
		headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S GMT", item.last_update);
	end
	http.request(item.url, { headers = headers }, function(data, code)
		if code == 200 then
			item.data = data;
			if callback then callback(item) end
			item.last_update = time();
		elseif code == 304 then
			-- Not modified: keep the data, only remember that we checked.
			item.last_update = time();
		else
			-- last_update is left alone so the feed is tried again on the
			-- next refresh.
			module:log("warn", "fetching %s failed, status %s", tostring(item.url), tostring(code));
		end
	end);
end
144
-- Timer callback: start a fetch for every feed whose last update is older
-- than refresh_interval seconds. Returns refresh_interval.
function refresh_feeds()
	--module:log("debug", "Refreshing feeds");
	local now = time();
	for _, feed in pairs(feed_list) do
		--FIXME Don't fetch feeds which have a subscription
		-- Otoho, what if the subscription expires or breaks?
		local due = feed.last_update + refresh_interval < now;
		if due then
			--module:log("debug", "checking %s", item.node);
			fetch(feed, update_entry);
		end
	end
	return refresh_interval;
end
158
-- Build the callback URL for `node`: the module's "/callback" HTTP URL with
-- the URL-encoded node name as the "node" query parameter.
local function format_url(node)
	local callback_url = module:http_url(nil, "/callback");
	local encoded_node = urlencode(node);
	return callback_url .. "?node=" .. encoded_node;
end
162
-- Send a PubSubHubbub subscription request for `feed` to its hub.
--
-- feed: a feed_list entry with `hub`, `url` and `node` set. Fresh `token`
--       and `secret` values are generated and stored on it, and
--       `subscription` is set to "subscribe" (or "failed" once the hub
--       request is known to have gone wrong).
-- Returns nothing; the outcome arrives through the http.request callback.
function subscribe(feed)
	feed.token = uuid();
	feed.secret = uuid();
	local body = formencode{
		["hub.callback"] = format_url(feed.node);
		["hub.mode"] = "subscribe"; --TODO unsubscribe
		["hub.topic"] = feed.url;
		["hub.verify"] = "async";
		["hub.verify_token"] = feed.token;
		["hub.secret"] = feed.secret;
		--["hub.lease_seconds"] = "";
	};

	--module:log("debug", "subscription request, body: %s", body);

	--FIXME The subscription states and related stuff
	feed.subscription = "subscribe";
	http.request(feed.hub, { body = body }, function(data, code)
		module:log("debug", "subscription to %s submitted, status %s", feed.node, tostring(code));
		-- A missing or zero status means no HTTP response was received at
		-- all, which is as much a failure as a 4xx/5xx answer.
		-- NOTE(review): net.http is understood to report connection-level
		-- errors with status 0 — confirm against the Prosody version in use.
		local no_response = type(code) ~= "number" or code == 0;
		if no_response or code >= 400 then
			module:log("error", "There was something wrong with our subscription request, body: %s", tostring(data));
			feed.subscription = "failed";
		end
	end);
end
188
-- Answer a hub's verification request (GET).
-- Returns the hub.challenge value when topic, mode and verify_token match
-- what is stored for the feed, otherwise an HTTP status code.
local function handle_verification(feed, query)
	if not (query.node and feed) then
		return 400;
	end
	if query["hub.topic"] ~= feed.url then
		module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"]))
		return 404
	end
	if query["hub.mode"] ~= feed.subscription then
		module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"]))
		return 400
		-- Would this work for unsubscribe?
		-- Also, if feed.subscription is changed here,
		-- it would probably invalidate the subscription
		-- when/if the hub asks if it should be renewed
	end
	if query["hub.verify_token"] ~= feed.token then
		module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"]))
		return 401
	end
	module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url)
	return query["hub.challenge"];
end

-- Accept content pushed by a hub (POST).
-- Returns 202 once the body was handed to update_entry, 401 when the feed has
-- a secret and the X-Hub-Signature header does not match, and 400 when the
-- feed is unknown or the body is missing or empty.
local function handle_push(feed, request, query)
	local body = request.body;
	-- A request without any body must be rejected, not crash on `#body`.
	if not (feed and body and #body > 0) then
		return 400;
	end
	module:log("debug", "got %d bytes PuSHed for %s", #body, query.node);
	local signature = request.headers.x_hub_signature;
	if feed.secret then
		local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true);
		if localsig ~= signature then
			module:log("debug", "Invalid signature, got %s but wanted %s", tostring(signature), tostring(localsig));
			return 401;
		end
		module:log("debug", "Valid signature");
	end
	feed.data = body;
	-- update_entry runs before last_update is advanced, so it still compares
	-- entries against the time of the previous update.
	update_entry(feed);
	feed.last_update = time();
	return 202;
end

-- HTTP handler for the PubSubHubbub callback URL.
-- event.request supplies the method, the URL query (its "node" parameter
-- selects the feed), the headers and the body. GET requests are treated as
-- subscription verification, POST requests as content pushes; any other
-- method yields 501.
-- Returns an HTTP status code or, for a confirmed verification, the
-- challenge string to echo back.
function handle_http_request(event)
	local request = event.request;
	local method = request.method;

	--module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty");
	local query = request.url.query or {}; --FIXME
	if type(query) == "string" then
		query = formdecode(query);
		--module:log("debug", "GET data: %s", dump(query));
	end
	--module:log("debug", "Headers: %s", dump(request.headers));

	local feed = feed_list[query.node];
	if method == "GET" then
		return handle_verification(feed, query);
	elseif method == "POST" then
		return handle_push(feed, request, query);
	end
	return 501;
end
247
-- Expose the hub callback over HTTP, but only when PubSubHubbub is enabled.
if use_pubsubhubub then
	local callback_routes = {
		GET = handle_http_request;
		POST = handle_http_request;
		-- This all?
	};
	module:provides("http", {
		default_path = "/callback";
		route = callback_routes;
	});
end

-- Schedule the first refresh_feeds run.
-- NOTE(review): the 1 is presumably a delay in seconds, and the value
-- refresh_feeds returns presumably schedules the next run — confirm against
-- module:add_timer.
module:add_timer(1, refresh_feeds);