Mercurial > prosody-modules
comparison mod_pubsub_feeds/mod_pubsub_feeds.lua @ 670:9d8efb804a00
mod_pubsub_feed: Rename to mod_pubsub_feeds
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Wed, 23 May 2012 21:52:14 +0200 |
parents | mod_pubsub_feed/mod_pubsub_feed.lua@343b115ebbea |
children | e79147fb39f9 |
comparison
equal
deleted
inserted
replaced
669:dd7d30c175d4 | 670:9d8efb804a00 |
---|---|
1 -- Fetches Atom feeds and publishes to PubSub nodes | |
2 -- | |
3 -- Depends: http://code.matthewwild.co.uk/lua-feeds | |
4 -- | |
5 -- Config: | |
6 -- Component "pubsub.example.com" "pubsub" | |
7 -- modules_enabled = { | |
8 -- "pubsub_feeds"; | |
9 -- } | |
10 -- feeds = { -- node -> url | |
11 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml"; | |
12 -- } | |
13 -- feed_pull_interval = 20 -- minutes | |
14 -- | |
15 -- Reference | |
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html | |
17 | |
-- This module publishes through the host's pubsub service, so give up early
-- unless pubsub is loaded here and the host is a "pubsub" component.
local modules = hosts[module.host].modules;
if not (modules.pubsub and module:get_option("component_module") == "pubsub") then
	module:log("warn", "Pubsub needs to be loaded on this host");
	--module:log("debug", "component_module is %s", tostring(module:get_option("component_module")));
	return
end
24 | |
25 local date, time = os.date, os.time; | |
26 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; | |
27 local uuid = require "util.uuid".generate; | |
28 local hmac_sha1 = require "util.hmac".sha1; | |
29 local parse_feed = require "feeds".feed_from_string; | |
30 local st = require "util.stanza"; | |
31 --local dump = require"util.serialization".serialize; | |
32 | |
33 local xmlns_atom = "http://www.w3.org/2005/Atom"; | |
34 | |
-- PubSubHubbub support is enabled unless the option is explicitly turned off.
-- The HTTP module is only pulled in when it is enabled.
local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true);
if use_pubsubhubub then
	module:depends("http");
end
39 | |
40 local http = require "net.http"; | |
41 local formdecode = http.formdecode; | |
42 local formencode = http.formencode; | |
43 local urldecode = http.urldecode; | |
44 local urlencode = http.urlencode; | |
45 | |
46 local feed_list = module:shared("feed_list"); | |
47 local refresh_interval; | |
48 | |
--- (Re)build the shared feed list from the current configuration.
-- Reads the `feeds` option (node -> url) and `feed_pull_interval`.
-- Feeds that are still configured keep their state (only the url is
-- refreshed), new ones start with last_update = 0, and feeds that are no
-- longer configured are dropped. Called once at load and again whenever the
-- "config-reloaded" event fires.
local function update_config()
	local configured = module:get_option("feeds") or {
		planet_jabber = "http://planet.jabber.org/atom.xml";
		prosody_blog = "http://blog.prosody.im/feed/atom.xml";
	};
	-- The option is given in minutes; keep it in seconds.
	refresh_interval = module:get_option_number("feed_pull_interval", 15) * 60;

	local still_wanted = {};
	for node, url in pairs(configured) do
		still_wanted[node] = true;
		local known = feed_list[node];
		if known then
			known.url = url;
		else
			feed_list[node] = { url = url; node = node; last_update = 0 };
		end
	end

	-- Forget feeds that were removed from the configuration.
	for node in pairs(feed_list) do
		if not still_wanted[node] then
			feed_list[node] = nil;
		end
	end
end

update_config();
module:hook("config-reloaded", update_config);
73 | |
-- Identity used as the publishing actor towards the pubsub service.
local actor = module.host.."/"..module.name;

--- Parse the raw document stored in item.data and publish its entries.
-- An entry is published when it has no usable timestamp, when the feed has no
-- last_update, or when its timestamp (updated, falling back to published) is
-- newer than item.last_update. If publishing fails with "item-not-found" the
-- node is created and the publish is retried once.
-- When PubSubHubbub is enabled and the feed has no subscription yet, a hub
-- advertised by the feed is stored in item.hub and subscribe(item) is called.
-- @param item feed state table; reads node, url, data, last_update and
--   subscription, may set hub.
function update_entry(item)
	local node = item.node;
	local data = item.data;
	if not data then
		-- `#item.data or 0` used to be evaluated as `(#item.data) or 0`, so a
		-- missing document raised an error instead of logging 0 bytes.
		module:log("debug", "no data to parse for node %s", node);
		return;
	end
	module:log("debug", "parsing %d bytes of data in node %s", #data, node)

	local feed, parse_err = parse_feed(data);
	if not feed then
		-- NOTE(review): assumes the parser signals failure with nil (plus an
		-- optional message); without this guard ipairs() below would throw.
		module:log("error", "could not parse feed for node %s: %s", node, tostring(parse_err));
		return;
	end

	local service = modules.pubsub.service;
	for _, entry in ipairs(feed) do
		entry.attr.xmlns = xmlns_atom;

		local e_published = entry:get_child_text("published");
		e_published = e_published and dt_parse(e_published);
		local e_updated = entry:get_child_text("updated");
		e_updated = e_updated and dt_parse(e_updated);

		local timestamp = e_updated or e_published or nil;
		--module:log("debug", "timestamp is %s, item.last_update is %s", tostring(timestamp), tostring(item.last_update));
		if not timestamp or not item.last_update or timestamp > item.last_update then
			local id = entry:get_child_text("id");
			id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up
			local xitem = st.stanza("item", { id = id }):add_child(entry);
			-- TODO Put data from /feed into item/source

			--module:log("debug", "publishing to %s, id %s", node, id);
			local ok, err = service:publish(node, actor, id, xitem);
			if not ok and err == "item-not-found" then
				-- The node does not exist yet: create it and try once more.
				local created, create_err = service:create(node, actor);
				if not created then
					module:log("error", "could not create node %s: %s", node, create_err);
					return;
				end
				local republished, republish_err = service:publish(node, actor, id, xitem);
				if not republished then
					module:log("error", "could not create or publish node %s: %s", node, republish_err);
					return
				end
			elseif not ok then
				module:log("error", "publishing %s failed: %s", node, err);
			end
		end
	end

	if use_pubsubhubub and not item.subscription then
		--module:log("debug", "check if %s has a hub", item.node);
		local hub = feed.links and feed.links.hub;
		if hub then
			item.hub = hub;
			module:log("debug", "%s has a hub: %s", item.node, item.hub);
			subscribe(item);
		end
	end
end
128 | |
--- Pull a feed over HTTP.
-- Sends a conditional request once the feed has been fetched before. On a
-- 200 response the body is stored in item.data and callback(item) is invoked;
-- on 200 and 304 item.last_update is set to the current time. Other status
-- codes leave the feed untouched.
-- @param item feed state table (reads url, data, last_update)
-- @param callback optional function called with item after new data arrived
function fetch(item, callback) -- HTTP Pull
	local headers = { };
	if item.data and item.last_update then
		-- HTTP dates must carry the literal "GMT"; what %Z expands to depends
		-- on the platform, so it is spelled out here.
		headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S GMT", item.last_update);
	end
	http.request(item.url, { headers = headers }, function(data, code)
		if code == 200 then
			item.data = data;
			-- Run the callback before bumping last_update: it compares entry
			-- timestamps against the previous value.
			if callback then callback(item) end
			item.last_update = time();
		elseif code == 304 then
			item.last_update = time();
		end
	end);
end
144 | |
--- Timer callback: fetch every feed whose refresh interval has elapsed.
-- @return refresh_interval, handed back to the timer that invoked us
function refresh_feeds()
	local now = time();
	--module:log("debug", "Refreshing feeds");
	for _, item in pairs(feed_list) do
		--FIXME Don't fetch feeds which have a subscription
		-- Otoho, what if the subscription expires or breaks?
		local due_at = item.last_update + refresh_interval;
		if due_at < now then
			--module:log("debug", "checking %s", item.node);
			fetch(item, update_entry);
		end
	end
	return refresh_interval;
end
158 | |
--- Build the callback URL handed to a hub for the given node.
-- @param node pubsub node name, added url-encoded as the `node` query parameter
-- @return callback URL string
local function format_url(node)
	local base = module:http_url(nil, "/callback");
	return base .. "?node=" .. urlencode(node);
end
162 | |
--- Ask the feed's hub for a PubSubHubbub subscription.
-- Generates a fresh verify token and secret, stores them on the feed, marks
-- the feed as "subscribe" and posts the request to feed.hub. Any response
-- outside the 2xx range marks the subscription as "failed".
-- @param feed feed state table (reads node, url, hub; sets token, secret,
--   subscription)
function subscribe(feed)
	feed.token = uuid();
	feed.secret = uuid();
	local body = formencode{
		["hub.callback"] = format_url(feed.node);
		["hub.mode"] = "subscribe"; --TODO unsubscribe
		["hub.topic"] = feed.url;
		["hub.verify"] = "async";
		["hub.verify_token"] = feed.token;
		["hub.secret"] = feed.secret;
		--["hub.lease_seconds"] = "";
	};

	--module:log("debug", "subscription request, body: %s", body);

	--FIXME The subscription states and related stuff
	feed.subscription = "subscribe";
	http.request(feed.hub, { body = body }, function(data, code)
		module:log("debug", "subscription to %s submitted, status %s", feed.node, tostring(code));
		-- Only 2xx means the hub accepted the request. Checking just for
		-- `code >= 400` missed transport failures (presumably reported with a
		-- status of 0 — confirm against net.http) and other non-success
		-- replies, leaving the feed looking subscribed.
		if code < 200 or code >= 300 then
			module:log("error", "There was something wrong with our subscription request, body: %s", tostring(data));
			feed.subscription = "failed";
		end
	end);
end
188 | |
--- HTTP handler for the PubSubHubbub callback endpoint.
-- GET: subscription verification. Replies with hub.challenge when node,
-- topic, mode and verify_token match the feed's stored state; otherwise
-- 404 (topic), 400 (mode or unknown node) or 401 (token).
-- POST: pushed content. When the feed has a secret, the X-Hub-Signature
-- header must match the HMAC-SHA1 of the body, else 401. On success the body
-- is stored, published through update_entry, and 202 is returned.
-- Any other method yields 501.
-- @param event HTTP event; only event.request is used
-- @return a status code, or the challenge string for a confirmed GET
function handle_http_request(event)
	local request = event.request;
	local method = request.method;
	local body = request.body;

	--module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty");
	local query = request.url.query or {}; --FIXME
	if type(query) == "string" then
		query = formdecode(query);
		--module:log("debug", "GET data: %s", dump(query));
	end
	--module:log("debug", "Headers: %s", dump(request.headers));

	local feed = feed_list[query.node];
	if method == "GET" then
		if not (query.node and feed) then
			return 400;
		end
		if query["hub.topic"] ~= feed.url then
			module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"]))
			return 404
		end
		if query["hub.mode"] ~= feed.subscription then
			module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"]))
			return 400
			-- Would this work for unsubscribe?
			-- Also, if feed.subscription is changed here,
			-- it would probably invalidate the subscription
			-- when/if the hub asks if it should be renewed
		end
		if query["hub.verify_token"] ~= feed.token then
			module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"]))
			return 401
		end
		module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url)
		return query["hub.challenge"];
	elseif method == "POST" then
		-- A missing body is treated like an empty one instead of raising on `#body`.
		if not (feed and body and #body > 0) then
			return 400;
		end
		module:log("debug", "got %d bytes PuSHed for %s", #body, query.node);
		local signature = request.headers.x_hub_signature;
		-- NOTE(review): a feed without a secret accepts unsigned pushes from
		-- anyone who can reach this endpoint; consider rejecting those.
		if feed.secret then
			local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true);
			if localsig ~= signature then
				module:log("debug", "Invalid signature, got %s but wanted %s", tostring(signature), tostring(localsig));
				return 401;
			end
			module:log("debug", "Valid signature");
		end
		feed.data = body;
		update_entry(feed);
		feed.last_update = time();
		return 202;
	end
	return 501;
end
247 | |
-- Expose the hub callback endpoint only when PubSubHubbub support is enabled.
if use_pubsubhubub then
	local routes = {
		GET = handle_http_request;
		POST = handle_http_request;
		-- This all?
	};
	module:provides("http", {
		default_path = "/callback";
		route = routes;
	});
end

-- Start polling one second after load; refresh_feeds returns the delay that
-- reschedules it.
module:add_timer(1, refresh_feeds);