Mercurial > prosody-modules
changeset 4511:97fac0ba0469
mod_pubsub_subscription: New module providing an API for pubsub subscriptions
This lets other modules hook events from local or remote XEP-0060 pubsub
services. API allows keeping track of items added, removed or if the
whole node gets cleared or even deleted.
Requested by MattJ.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 15 Mar 2021 16:31:23 +0100 (2021-03-15) |
parents | 6690586826e8 |
children | b88f05c878ac |
files | mod_pubsub_subscription/README.markdown mod_pubsub_subscription/mod_pubsub_subscription.lua |
diffstat | 2 files changed, 230 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_pubsub_subscription/README.markdown Mon Mar 15 16:31:23 2021 +0100 @@ -0,0 +1,85 @@ +# Introduction + +This module lets you programmatically subscribe to updates from a +[pubsub][xep0060] node, even if the pubsub service is remote. + +## Example + +``` {.lua} +module:depends("pubsub_subscription"); +module:add_item("pubsub-subscription", { + service = "pubsub.example.com"; + node = "otter_facts"; + + -- Callbacks: + on_subscribed = function() + module:log("info", "Otter facts incoming!"); + end; + + on_item = function(event) + module:log("info", "Random Otter Fact: %s", event.payload:get_text()); + end; +}); +``` + +## Usage + +Ensure the module is loaded and add your subscription via the +`:add_item` API. The item table MUST have `service` and `node` fields +and SHOULD have one or more `on_<event>` callbacks. + +The JID of the pubsub service is given in `service` (could also be the +JID of an user for advanced PEP usage) and the node is given in, +unsurprisingly, the `node` field. + +The various `on_event` callback functions, if present, gets called when +new events are received. The most interesting would be `on_item`, which +receives incoming items. Available events are: + +`on_subscribed` +: The subscription was successful, events may follow. + +`on_unsubscribed` +: Subscription was removed successfully, this happens if the + subscription is removed, which you would normally never do. + +`on_error` +: If there was an error subscribing to the pubsub service. Receives a + table with `type`, `condition`, `text`, and `extra` fields as + argument. + +`on_item` +: An item publication, the payload itself available in the `payload` + field in the table provided as argument. The ID of the item can be + found in `item.attr.id`. + +`on_retract` +: When an item gets retracted (removed by the publisher). The ID of + the item can be found in `item.attr.id` of the table argument.. + +`on_purge` +: All the items were removed by the publisher. + +`on_delete` +: The entire pubsub node was removed from the pubsub service. No + subscription exists after this. + +``` {.lua} +event_payload = { + -- Common prosody event entries: + stanza = util.stanza; + origin = util.session; + + -- PubSub service details + service = "pubsub.example.com"; + node = "otter_facts"; + + -- The pubsub event itself + item = util.stanza; -- <item/> + payload = util.stanza; -- actual payload, child of <item/> +} +``` + +# Compatibility + +Should work with Prosody \>= 0.11.x
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_pubsub_subscription/mod_pubsub_subscription.lua Mon Mar 15 16:31:23 2021 +0100 @@ -0,0 +1,145 @@ +local st = require "util.stanza"; +local uuid = require "util.uuid"; +local mt = require "util.multitable"; +local cache = require "util.cache"; + +local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; +local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event"; + +-- TODO persist +-- TODO query known pubsub nodes to sync current subscriptions +-- TODO subscription ids per 'item' would be handy + +local pending_subscription = cache.new(256); -- uuid → node +local pending_unsubscription = cache.new(256); -- uuid → node +local active_subscriptions = mt.new() -- service | node | uuid | { item } +function module.save() + return { active_subscriptions = active_subscriptions.data } +end +function module.restore(data) + if data and data.active_subscriptions then + active_subscriptions.data = data.active_subscriptions + end +end + +local valid_events = {"subscribed"; "unsubscribed"; "error"; "item"; "retract"; "purge"; "delete"} + +local function subscription_added(item_event) + local item = item_event.item; + assert(item.service, "pubsub subscription item MUST have a 'service' field."); + assert(item.node, "pubsub subscription item MUST have a 'node' field."); + + local already_subscibed = false; + for _ in active_subscriptions:iter(item.service, item.node, nil) do -- luacheck: ignore 512 + already_subscibed = true; + break + end + + item._id = uuid.generate(); + local iq_id = uuid.generate(); + pending_subscription:set(iq_id, item._id); + active_subscriptions:set(item.service, item.node, item._id, item); + + if not already_subscibed then + module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) + :tag("pubsub", { xmlns = xmlns_pubsub }) + :tag("subscribe", { jid = module.host, node = item.node })); + end +end + +for _, event_name in ipairs(valid_events) do + module:hook("pubsub-event/host/"..event_name, function (event) + for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, "on_"..event_name) do + pcall(cb, event); + end + end); +end + +module:hook("iq/host", function (event) + local stanza = event.stanza; + local service = stanza.attr.from; + + if not stanza.attr.id then return end -- shouldn't be possible + + local subscribed_node = pending_subscription:get(stanza.attr.id); + pending_subscription:set(stanza.attr.id, nil); + local unsubscribed_node = pending_unsubscription:get(stanza.attr.id); + pending_unsubscription:set(stanza.attr.id, nil); + + if stanza.attr.type == "result" then + local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub); + local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription"); + if not subscription then return end + local node = subscription.attr.node; + + local what; + if subscription.attr.subscription == "subscribed" then + what = "on_subscribed"; + elseif subscription.attr.subscription == "none" then + what = "on_unsubscribed"; + end + if not what then return end -- there are other states but we don't handle them + for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, what) do + cb(event); + end + return true; + + elseif stanza.attr.type == "error" then + local node = subscribed_node or unsubscribed_node; + local error_type, error_condition, reason, pubsub_error = stanza:get_error(); + local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error }; + if active_subscriptions:get(service) then + for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, "on_error") do + cb(err); + end + return true; + end + end +end, 1); + +local function subscription_removed(item_event) + local item = item_event.item; + active_subscriptions:set(item.service, item.node, item._id, nil); + local node_subs = active_subscriptions:get(item.service, item.node); + if node_subs and next(node_subs) then return end + + local iq_id = uuid.generate(); + pending_unsubscription:set(iq_id, item._id); + + module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) + :tag("pubsub", { xmlns = xmlns_pubsub }) + :tag("unsubscribe", { jid = module.host, node = item.node })) +end + +module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true); + +module:hook("message/host", function(event) + local origin, stanza = event.origin, event.stanza; + local ret = nil; + local service = stanza.attr.from; + module:log("debug", "Got message/host: %s", stanza:top_tag()); + for event_container in stanza:childtags("event", xmlns_pubsub_event) do + for pubsub_event in event_container:childtags() do + module:log("debug", "Got pubsub event %s", pubsub_event:top_tag()); + local node = pubsub_event.attr.node; + module:fire_event("pubsub-event/host/"..pubsub_event.name, { + stanza = stanza; + origin = origin; + event = pubsub_event; + service = service; + node = node; + }); + ret = true; + end + end + return ret; +end); + +module:hook("pubsub-event/host/items", function (event) + for item in event.event:childtags() do + module:log("debug", "Got pubsub item event %s", item:top_tag()); + event.item = item; + event.payload = item.tags[1]; + module:fire_event("pubsub-event/host/"..item.name, event); + end +end);