comparison mod_storage_xmlarchive/mod_storage_xmlarchive.lua @ 1690:8c0fbc685364

mod_storage_xmlarchive: New stanza archive-only storage module backed by plain text files
author Kim Alvefur <zash@zash.se>
date Sun, 03 May 2015 13:45:42 +0200
parents
children 160c35d2a5a2
comparison
equal deleted inserted replaced
1689:06f9ab0c078c 1690:8c0fbc685364
1 local dm = require "core.storagemanager".olddm;
2 local hmac_sha256 = require"util.hashes".hmac_sha256;
3 local st = require"util.stanza";
4 local dt = require"util.datetime";
5 local new_stream = require "util.xmppstream".new;
6 local empty = {};
7
8 local function fallocate(f, offset, len)
9 -- This assumes that current position == offset
10 local fake_data = (" "):rep(len);
11 local ok, msg = f:write(fake_data);
12 if not ok then
13 return ok, msg;
14 end
15 f:seek("set", offset);
16 return true;
17 end;
18 pcall(function()
19 local pposix = require "util.pposix";
20 fallocate = pposix.fallocate or fallocate;
21 end);
22
23 local archive = {};
24 local archive_mt = { __index = archive };
25
26 function archive:append(username, _, when, with, data)
27 if getmetatable(data) ~= st.stanza_mt then
28 return nil, "unsupported-datatype";
29 end
30 username = username or "@";
31 data = tostring(data) .. "\n";
32 local day = dt.date(when);
33 local filename = dm.getpath(username.."@"..day, module.host, self.store, "xml", true);
34 local ok, err;
35 local f = io.open(filename, "r+");
36 if not f then
37 f, err = io.open(filename, "w");
38 if not f then return nil, err; end
39 ok, err = dm.list_append(username, module.host, self.store, day);
40 if not ok then return nil, err; end
41 end
42 local offset = f and f:seek("end");
43 ok, err = fallocate(f, offset, #data);
44 if not ok then return nil, err; end
45 f:seek("set", offset);
46 ok, err = f:write(data);
47 if not ok then return nil, err; end
48 ok, err = f:close();
49 if not ok then return nil, err; end
50 local id = day .. "-" .. hmac_sha256(username.."@"..day.."+"..offset, data, true):sub(-16);
51 ok, err = dm.list_append(username.."@"..day, module.host, self.store, { id = id, when = when, with = with, offset = offset, length = #data });
52 if not ok then return nil, err; end
53 return id;
54 end
55
56 function archive:find(username, query)
57 username = username or "@";
58 query = query or empty;
59
60 local result;
61 local function cb(_, stanza)
62 if result then
63 module:log("warn", "Multiple items in chunk");
64 end
65 result = stanza;
66 end
67
68 local stream_sess = { notopen = true };
69 local stream = new_stream(stream_sess, { handlestanza = cb, stream_ns = "jabber:client"});
70 local dates = dm.list_load(username, module.host, self.store) or empty;
71 stream:feed(st.stanza("stream", { xmlns = "jabber:client" }):top_tag());
72 stream_sess.notopen = nil;
73
74 local limit = query.limit;
75 local start_day, step, last_day = 1, 1, #dates;
76 local count = 0;
77 local rev = query.reverse;
78 local in_range = not (query.after or query.before);
79 if query.after or query.start then
80 local d = query.after and query.after:sub(1, 10) or dt.date(query.start);
81 for i = 1, #dates do
82 if dates[i] == d then
83 start_day = i; break;
84 end
85 end
86 end
87 if query.before or query["end"] then
88 local d = query.before and query.before:sub(1, 10) or dt.date(query["end"]);
89 for i = #dates, 1, -1 do
90 if dates[i] == d then
91 last_day = i; break;
92 end
93 end
94 end
95 if rev then
96 start_day, step, last_day = last_day, -step, start_day;
97 end
98 local items, xmlfile;
99 local first_item, last_item;
100
101 return function ()
102 if limit and count >= limit then xmlfile:close() return; end
103
104 for d = start_day, last_day, step do
105 if d ~= start_day or not items then
106 module:log("debug", "Load items for %s", dates[d]);
107 start_day = d;
108 items = dm.list_load(username .. "@" .. dates[d], module.host, self.store) or empty;
109 if not rev then
110 first_item, last_item = 1, #items;
111 else
112 first_item, last_item = #items, 1;
113 end
114 local ferr;
115 xmlfile, ferr = io.open(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "xml"));
116 if not xmlfile then
117 module:log("error", "Error: %s", ferr);
118 return;
119 end
120 end
121
122 for i = first_item, last_item, step do
123 module:log("debug", "data[%q][%d]", dates[d], i);
124 local item = items[i];
125 if not item then
126 module:log("debug", "data[%q][%d] is nil", dates[d], i);
127 break;
128 end
129 if xmlfile and in_range
130 and (not query.with or item.with == query.with)
131 and (not query.start or item.when >= query.start)
132 and (not query["end"] or item.when <= query["end"]) then
133 count = count + 1;
134 first_item = i + step;
135
136 xmlfile:seek("set", item.offset);
137 local data = xmlfile:read(item.length);
138 local ok, err = stream:feed(data);
139 if not ok then
140 module:log("warn", "Parse error: %s", err);
141 end
142 if result then
143 local stanza = result;
144 result = nil;
145 return item.id, stanza, item.when, item.with;
146 end
147 end
148 if (rev and item.id == query.after) or
149 (not rev and item.id == query.before) then
150 in_range = false;
151 limit = count;
152 end
153 if (rev and item.id == query.before) or
154 (not rev and item.id == query.after) then
155 in_range = true;
156 end
157 end
158 end
159 if xmlfile then
160 xmlfile:close();
161 xmlfile = nil;
162 end
163 end
164 end
165
166 function archive:delete(username, query)
167 username = username or "@";
168 query = query or empty;
169 if query.with or query.start or query.after then
170 return nil, "not-implemented"; -- Only trimming the oldest messages
171 end
172 local before = query.before or query["end"] or "9999-12-31";
173 if type(before) == "number" then before = dt.date(before); else before = before:sub(1, 10); end
174 local dates = dm.list_load(username, module.host, self.store) or empty;
175 local remaining_dates = {};
176 for d = 1, #dates do
177 if dates[d] >= before then
178 table.insert(remaining_dates, dates[d]);
179 end
180 end
181 table.sort(remaining_dates);
182 local ok, err = dm.list_store(username, module.host, self.store, remaining_dates);
183 if not ok then return ok, err; end
184 for d = 1, #dates do
185 if dates[d] < before then
186 os.remove(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "list"));
187 os.remove(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "xml"));
188 end
189 end
190 return true;
191 end
192
193 local provider = {};
194 function provider:open(store, typ)
195 if typ ~= "archive" then return nil, "unsupported-store"; end
196 return setmetatable({ store = store }, archive_mt);
197 end
198
199 module:provides("storage", provider);