comparison mod_pubsub_hub/mod_pubsub_hub.lua @ 1458:e9d164e694e7

mod_pubsub_hub: Update to PubSubHubbub version 0.4
author Kim Alvefur <zash@zash.se>
date Thu, 26 Jun 2014 19:41:32 +0200
parents b21236b6b8d8
children 4cec8b7aed6d
comparison
equal deleted inserted replaced
1457:720ff25d94e6 1458:e9d164e694e7
3 -- This file is MIT/X11 licensed. 3 -- This file is MIT/X11 licensed.
4 4
5 local http = require "net.http"; 5 local http = require "net.http";
6 local formdecode = http.formdecode; 6 local formdecode = http.formdecode;
7 local formencode = http.formencode; 7 local formencode = http.formencode;
8 local http_request = http.request;
8 local uuid = require "util.uuid".generate; 9 local uuid = require "util.uuid".generate;
9 local hmac_sha1 = require "util.hmac".sha1; 10 local hmac_sha1 = require "util.hmac".sha1;
10 local json_encode = require "util.json".encode; 11 local json_encode = require "util.json".encode;
11 local time = os.time; 12 local time = os.time;
12 local m_min, m_max = math.min, math.max; 13 local m_min, m_max = math.min, math.max;
13 local tostring = tostring; 14 local tostring = tostring;
15
14 local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; 16 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
15 local xmlns_pubsub_event = xmlns_pubsub .. "#event"; 17 local xmlns_pubsub_event = xmlns_pubsub .. "#event";
16 local subs_by_topic = module:shared"subscriptions"; 18 local subs_by_topic = module:shared"subscriptions";
17 19
18 local max_lease, min_lease, default_lease = 86400, 600, 3600; 20 local max_lease, min_lease, default_lease = 86400, 600, 3600;
52 local topic = body["hub.topic"]; 54 local topic = body["hub.topic"];
53 local lease_seconds = m_max(min_lease, m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease)); 55 local lease_seconds = m_max(min_lease, m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease));
54 local secret = body["hub.secret"]; 56 local secret = body["hub.secret"];
55 local verify_token = body["hub.verify_token"]; 57 local verify_token = body["hub.verify_token"];
56 58
57 module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic); 59 module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), tostring(topic));
58 60
59 if not subs_by_topic[topic] then 61 if not subs_by_topic[topic] then
60 subs_by_topic[topic] = {}; 62 subs_by_topic[topic] = {};
61 end 63 end
62 local subscription = subs_by_topic[topic][callback]; 64 local subscription = subs_by_topic[topic][callback];
73 callback = callback, 75 callback = callback,
74 topic = topic, 76 topic = topic,
75 state = "unsubscribed", 77 state = "unsubscribed",
76 secret = secret, 78 secret = secret,
77 want_state = mode, 79 want_state = mode,
78 lease_seconds = lease_seconds,
79 expires = time() + lease_seconds,
80 }; 80 };
81 subscription.lease_seconds = lease_seconds;
82 subscription.expires = time() + lease_seconds;
81 subs_by_topic[topic][callback] = subscription; 83 subs_by_topic[topic][callback] = subscription;
82 local challenge = uuid(); 84 local challenge = uuid();
83 85
84 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ 86 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
85 ["hub.mode"] = mode, 87 ["hub.mode"] = mode,
86 ["hub.topic"] = topic, 88 ["hub.topic"] = topic,
87 ["hub.challenge"] = challenge, 89 ["hub.challenge"] = challenge,
88 ["hub.lease_seconds"] = tostring(lease_seconds), 90 ["hub.lease_seconds"] = tostring(lease_seconds),
89 ["hub.verify_token"] = verify_token, 91 ["hub.verify_token"] = verify_token, -- COMPAT draft version 0.3
90 } 92 }
91 module:log("debug", require"util.serialization".serialize(verify_modes)); 93 module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription));
92 if verify_modes["async"] then 94 http_request(callback_url, nil, function(body, code)
93 module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription)); 95 if body == challenge and code > 199 and code < 300 then
94 http.request(callback_url, nil, function(body, code) 96 if not subscription.want_state then
95 if body == challenge and code > 199 and code < 300 then 97 module:log("warn", "Verification of already verified request, probably");
96 if not subscription.want_state then 98 return;
97 module:log("warn", "Verification of already verified request, probably");
98 return;
99 end
100 subscription.state = subscription.want_state .. "d";
101 subscription.want_state = nil;
102 module:log("debug", "calling do_subscribe()");
103 do_subscribe(subscription);
104 subs_by_topic[topic][callback] = subscription;
105 else
106 module:log("warn", "status %d and body was %q", tostring(code), tostring(body));
107 subs_by_topic[topic][callback] = subscription;
108 end 99 end
109 end) 100 subscription.state = subscription.want_state .. "d";
110 return 202; 101 subscription.want_state = nil;
111 elseif verify_modes["sync"] then 102 module:log("debug", "calling do_subscribe()");
112 http.request(callback_url, nil, function(body, code) 103 do_subscribe(subscription);
113 if body == challenge and code > 199 and code < 300 then 104 subs_by_topic[topic][callback] = subscription;
114 if not subscription.want_state then 105 else
115 module:log("warn", "Verification of already verified request, probably"); 106 module:log("warn", "status %d and body was %q", tostring(code), tostring(body));
116 return; 107 subs_by_topic[topic][callback] = subscription;
117 end 108 end
118 if mode == "unsubscribe" then 109 end)
119 subs_by_topic[topic][callback] = nil; 110 return 202;
120 else
121 subscription.state = subscription.want_state .. "d";
122 subscription.want_state = nil;
123 module:log("debug", "calling do_subscribe()");
124 do_subscribe(subscription);
125 subs_by_topic[topic][callback] = subscription;
126 end
127 else
128 subs_by_topic[topic][callback] = subscription;
129 end
130 response.status = 204;
131 response:send();
132 end)
133 return true;
134 end
135 return 400;
136 else 111 else
137 response.status = 400; 112 response.status = 400;
138 response.headers.content_type = "text/html"; 113 response.headers.content_type = "text/html";
139 return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n" 114 return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n"
140 end 115 end
141 end 116 end
142 end 117 end
143 118
144 local function periodic() 119 local function periodic(now)
145 local now = time();
146 local next_check = now + max_lease; 120 local next_check = now + max_lease;
147 local purge = false; 121 local purge = false;
148 for topic, callbacks in pairs(subs_by_topic) do 122 for topic, callbacks in pairs(subs_by_topic) do
149 for callback, subscription in pairs(callbacks) do 123 for callback, subscription in pairs(callbacks) do
150 if subscription.mode == "subscribed" then 124 if subscription.mode == "subscribed" then
151 if subscription.expires < now then 125 if subscription.expires < now then
152 -- Subscription has expired, drop it. 126 -- Subscription has expired, drop it.
153 purge = true; 127 purge = true;
154 elseif subscription.expires < now + min_lease then
155 -- Subscription set to expire soon, re-confirm it.
156 local challenge = uuid();
157 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
158 ["hub.mode"] = subscription.state,
159 ["hub.topic"] = topic,
160 ["hub.challenge"] = challenge,
161 ["hub.lease_seconds"] = subscription.lease_seconds,
162 ["hub.verify_token"] = subscription.verify_token,
163 }
164 http.request(callback_url, nil, function(body, code)
165 if body == challenge and code > 199 and code < 300 then
166 subscription.expires = now + subscription.lease_seconds;
167 end
168 end);
169 else 128 else
170 next_check = m_min(next_check, subscription.expires); 129 next_check = m_min(next_check, subscription.expires);
171 end 130 end
172 end 131 end
173 end 132 end
199 ["Content-Type"] = "application/xml", 158 ["Content-Type"] = "application/xml",
200 }; 159 };
201 if subscription.secret then 160 if subscription.secret then
202 headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true); 161 headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true);
203 end 162 end
204 http.request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code) 163 http_request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code)
205 if code >= 200 and code <= 299 then 164 if code >= 200 and code <= 299 then
206 module:log("debug", "Delivered"); 165 module:log("debug", "Delivered");
207 else 166 else
208 module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback)); 167 module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback));
209 -- TODO Retry 168 -- TODO Retry