view mod_mam_archive/mod_mam_archive.lua @ 4776:13e913471b75

mod_http_admin_api: Ensure freshness of metrics when in manual mode When in manual collection mode, as recommended for Prometheus, collection needs to be triggered manually, or they would be stale, possibly dating from the start of the server. This might vary per metric depending on how and when the metrics are gathered.
author Kim Alvefur <zash@zash.se>
date Thu, 18 Nov 2021 19:26:07 +0100
parents 3e97dae28215
children 8ff308fad9fd
line wrap: on
line source

-- Prosody IM
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local get_prefs = module:require"mod_mam/mamprefs".get;
local set_prefs = module:require"mod_mam/mamprefs".set;
local rsm = require "util.rsm";
local jid_bare = require "util.jid".bare;
local jid_prep = require "util.jid".prep;
local date_parse = require "util.datetime".parse;
local date_format = require "util.datetime".datetime;

local st = require "util.stanza";
local archive_store = "archive2";
local archive = module:open_store(archive_store, "archive");
local global_default_policy = module:get_option("default_archive_policy", false);
local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
local conversation_interval = tonumber(module:get_option_number("archive_conversation_interval", 86400));
local resolve_relative_path = require "core.configmanager".resolve_relative_path;

-- Feature discovery
local xmlns_archive = "urn:xmpp:archive"
local feature_archive = st.stanza("feature", {xmlns=xmlns_archive}):tag("optional");
if(global_default_policy) then
    feature_archive:tag("default");
end
module:add_extension(feature_archive);
module:add_feature("urn:xmpp:archive:auto");
module:add_feature("urn:xmpp:archive:manage");
module:add_feature("urn:xmpp:archive:pref");
module:add_feature("http://jabber.org/protocol/rsm");
-- --------------------------------------------------

local function prefs_to_stanza(prefs)
    local prefstanza = st.stanza("pref", { xmlns="urn:xmpp:archive" });
    local default = prefs[false] ~= nil and prefs[false] or global_default_policy;

    prefstanza:tag("default", {otr="oppose", save=default and "true" or "false"}):up();
    prefstanza:tag("method", {type="auto", use="concede"}):up();
    prefstanza:tag("method", {type="local", use="concede"}):up();
    prefstanza:tag("method", {type="manual", use="concede"}):up();

    for jid, choice in pairs(prefs) do
        if jid then
            prefstanza:tag("item", {jid=jid, otr="prefer", save=choice and "message" or "false" }):up()
        end
    end

    return prefstanza;
end
local function prefs_from_stanza(stanza, username)
    local current_prefs = get_prefs(username);

    -- "default" | "item" | "session" | "method"
    for elem in stanza:children() do
        if elem.name == "default" then
            current_prefs[false] = elem.attr["save"] == "true";
        elseif elem.name == "item" then
            current_prefs[elem.attr["jid"]] = not elem.attr["save"] == "false";
        elseif elem.name == "session" then
            module:log("info", "element is not supported: " .. tostring(elem));
--            local found = false;
--            for child in data:children() do
--                if child.name == elem.name and child.attr["thread"] == elem.attr["thread"] then
--                    for k, v in pairs(elem.attr) do
--                        child.attr[k] = v;
--                    end
--                    found = true;
--                    break;
--                end
--            end
--            if not found then
--                data:tag(elem.name, elem.attr):up();
--            end
        elseif elem.name == "method" then
            module:log("info", "element is not supported: " .. tostring(elem));
--            local newpref = stanza.tags[1]; -- iq:pref
--            for _, e in ipairs(newpref.tags) do
--                -- if e.name ~= "method" then continue end
--                local found = false;
--                for child in data:children() do
--                    if child.name == "method" and child.attr["type"] == e.attr["type"] then
--                        child.attr["use"] = e.attr["use"];
--                        found = true;
--                        break;
--                    end
--                end
--                if not found then
--                    data:tag(e.name, e.attr):up();
--                end
--            end
        end
    end
end

------------------------------------------------------------
-- Preferences
------------------------------------------------------------
local function preferences_handler(event)
    local origin, stanza = event.origin, event.stanza;
    local user = origin.username;
    local reply = st.reply(stanza);

    if stanza.attr.type == "get" then
        reply:add_child(prefs_to_stanza(get_prefs(user)));
    end
    if stanza.attr.type == "set" then
        local new_prefs = stanza:get_child("pref", xmlns_archive);
        if not new_prefs then return false; end

        local prefs = prefs_from_stanza(stanza, origin.username);
        local ok, err = set_prefs(user, prefs);

        if not ok then
            return origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err)));
        end
    end
    return origin.send(reply);
end
local function auto_handler(event)
    local origin, stanza = event.origin, event.stanza;
    if not stanza.attr["type"] == "set" then return false; end

    local user = origin.username;
    local prefs = get_prefs(user);
    local auto = stanza:get_child("auto", xmlns_archive);

    prefs[false] = auto.attr["save"] ~= nil and auto.attr["save"] == "true" or false;
    set_prefs(user, prefs);

    return origin.send(st.reply(stanza));
end

-- excerpt from mod_storage_sql2
local function get_db()
    local mod_sql = module:require("sql");
    local params = module:get_option("sql");
    local engine;

    params = params or { driver = "SQLite3" };
    if params.driver == "SQLite3" then
        params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite");
    end

    assert(params.driver and params.database, "Both the SQL driver and the database need to be specified");
    engine = mod_sql:create_engine(params);
    engine:set_encoding();

    return engine;
end

------------------------------------------------------------
-- Collections. In our case there is one conversation with each contact for the whole day for simplicity
------------------------------------------------------------
local function list_stanza_to_query(origin, list_el)
    local sql = "SELECT `with`, `when` / ".. conversation_interval .." as `day`, COUNT(0) FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? ";
    local args = {origin.host, origin.username, archive_store};

    local with = list_el.attr["with"];
    if with ~= nil then
        sql = sql .. "AND `with` = ? ";
        table.insert(args, jid_bare(with));
    end

    local after = list_el.attr["start"];
    if after ~= nil then
        sql = sql .. "AND `when` >= ? ";
        table.insert(args, date_parse(after));
    end

    local before = list_el.attr["end"];
    if before ~= nil then
        sql = sql .. "AND `when` <= ? ";
        table.insert(args, date_parse(before));
    end

    sql = sql .. "GROUP BY `with`, `when` / ".. conversation_interval .." ORDER BY `when` / ".. conversation_interval .." ASC ";

    local qset = rsm.get(list_el);
    local limit = math.min(qset and qset.max or default_max_items, max_max_items);
    sql = sql.."LIMIT ?";
    table.insert(args, limit);

    table.insert(args, 1, sql);
    return args;
end
local function list_handler(event)
    local db = get_db();
    local origin, stanza = event.origin, event.stanza;
    local reply = st.reply(stanza);

    local query = list_stanza_to_query(origin, stanza.tags[1]);
    local list = reply:tag("list", {xmlns=xmlns_archive});

    for row in db:select(unpack(query)) do
        list:tag("chat", {
            xmlns=xmlns_archive,
            with=row[1],
            start=date_format(row[2] * conversation_interval),
            version=row[3]
        }):up();
    end

    origin.send(reply);
    return true;
end

------------------------------------------------------------
-- Message archive retrieval
------------------------------------------------------------

local function retrieve_handler(event)
    local origin, stanza = event.origin, event.stanza;
    local reply = st.reply(stanza);

    local retrieve = stanza:get_child("retrieve", xmlns_archive);

    local qwith = retrieve.attr["with"];
    local qstart = retrieve.attr["start"];

    module:log("debug", "Archive query, with %s from %s)",
        qwith or "anyone", qstart or "the dawn of time");

    if qstart then -- Validate timestamps
        local vstart = (qstart and date_parse(qstart));
        if (qstart and not vstart) then
            origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid timestamp"))
            return true
        end
        qstart = vstart;
    end

    if qwith then -- Validate the "with" jid
        local pwith = qwith and jid_prep(qwith);
        if pwith and not qwith then -- it failed prepping
            origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid JID"))
            return true
        end
        qwith = jid_bare(pwith);
    end

    -- RSM stuff
    local qset = rsm.get(retrieve);
    local qmax = math.min(qset and qset.max or default_max_items, max_max_items);
    local reverse = qset and qset.before or false;
    local before, after = qset and qset.before, qset and qset.after;
    if type(before) ~= "string" then before = nil; end

    -- Load all the data!
    local data, err = archive:find(origin.username, {
        start = qstart; ["end"] = qstart + conversation_interval;
        with = qwith;
        limit = qmax;
        before = before; after = after;
        reverse = reverse;
        total = true;
    });

    if not data then
        return origin.send(st.error_reply(stanza, "cancel", "internal-server-error", err));
    end
    local count = err;

    local chat = reply:tag("chat", {xmlns=xmlns_archive, with=qwith, start=date_format(qstart), version=count});
    local first, last;

    module:log("debug", "Count "..count);
    for id, item, when in data do
        if not getmetatable(item) == st.stanza_mt then
            item = st.deserialize(item);
        end
        module:log("debug", tostring(item));

        local tag = jid_bare(item.attr["from"]) == jid_bare(origin.full_jid) and "to" or "from";
        tag = chat:tag(tag, {secs = when - qstart});
        tag:add_child(item:get_child("body")):up();
        if not first then first = id; end
        last = id;
    end
    reply:add_child(rsm.generate{ first = first, last = last, count = count })

    origin.send(reply);
    return true;
end

local function not_implemented(event)
    local origin, stanza = event.origin, event.stanza;
    local reply = st.reply(stanza):tag("error", {type="cancel"});
    reply:tag("feature-not-implemented", {xmlns="urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
    origin.send(reply);
end

-- Preferences
module:hook("iq/self/urn:xmpp:archive:pref", preferences_handler);
module:hook("iq/self/urn:xmpp:archive:auto", auto_handler);
module:hook("iq/self/urn:xmpp:archive:itemremove", not_implemented);
module:hook("iq/self/urn:xmpp:archive:sessionremove", not_implemented);

-- Message Archive Management
module:hook("iq/self/urn:xmpp:archive:list", list_handler);
module:hook("iq/self/urn:xmpp:archive:retrieve", retrieve_handler);
module:hook("iq/self/urn:xmpp:archive:remove", not_implemented);

-- manual archiving
module:hook("iq/self/urn:xmpp:archive:save", not_implemented);
-- replication
module:hook("iq/self/urn:xmpp:archive:modified", not_implemented);