comparison mod_pubsub_subscription/mod_pubsub_subscription.lua @ 5651:b40750891bee

mod_pubsub_subscription: support subscribing from a bare JID. Allow subscribing from a bare JID on the component instead of only from the component host; this is useful for subscribing to nodes with the whitelist access model that want to see a particular JID in the 'from' attribute.
author Stephen Paul Weber <singpolyma@singpolyma.net>
date Sat, 06 May 2023 19:42:08 -0500
parents 97fac0ba0469
children
comparison
equal deleted inserted replaced
5650:0eb2d5ea2428 5651:b40750891bee
10 -- TODO query known pubsub nodes to sync current subscriptions 10 -- TODO query known pubsub nodes to sync current subscriptions
11 -- TODO subscription ids per 'item' would be handy 11 -- TODO subscription ids per 'item' would be handy
12 12
13 local pending_subscription = cache.new(256); -- uuid → node 13 local pending_subscription = cache.new(256); -- uuid → node
14 local pending_unsubscription = cache.new(256); -- uuid → node 14 local pending_unsubscription = cache.new(256); -- uuid → node
15 local active_subscriptions = mt.new() -- service | node | uuid | { item } 15 local active_subscriptions = mt.new() -- service | node | subscriber | uuid | { item }
16 function module.save() 16 function module.save()
17 return { active_subscriptions = active_subscriptions.data } 17 return { active_subscriptions = active_subscriptions.data }
18 end 18 end
19 function module.restore(data) 19 function module.restore(data)
20 if data and data.active_subscriptions then 20 if data and data.active_subscriptions then
26 26
27 local function subscription_added(item_event) 27 local function subscription_added(item_event)
28 local item = item_event.item; 28 local item = item_event.item;
29 assert(item.service, "pubsub subscription item MUST have a 'service' field."); 29 assert(item.service, "pubsub subscription item MUST have a 'service' field.");
30 assert(item.node, "pubsub subscription item MUST have a 'node' field."); 30 assert(item.node, "pubsub subscription item MUST have a 'node' field.");
31 item.from = item.from or module.host;
31 32
32 local already_subscibed = false; 33 local already_subscibed = false;
33 for _ in active_subscriptions:iter(item.service, item.node, nil) do -- luacheck: ignore 512 34 for _ in active_subscriptions:iter(item.service, item.node, item.from, nil) do -- luacheck: ignore 512
34 already_subscibed = true; 35 already_subscibed = true;
35 break 36 break
36 end 37 end
37 38
38 item._id = uuid.generate(); 39 item._id = uuid.generate();
39 local iq_id = uuid.generate(); 40 local iq_id = uuid.generate();
40 pending_subscription:set(iq_id, item._id); 41 pending_subscription:set(iq_id, item._id);
41 active_subscriptions:set(item.service, item.node, item._id, item); 42 active_subscriptions:set(item.service, item.node, item.from, item._id, item);
42 43
43 if not already_subscibed then 44 if not already_subscibed then
44 module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) 45 module:send(st.iq({ type = "set", id = iq_id, from = item.from, to = item.service })
45 :tag("pubsub", { xmlns = xmlns_pubsub }) 46 :tag("pubsub", { xmlns = xmlns_pubsub })
46 :tag("subscribe", { jid = module.host, node = item.node })); 47 :tag("subscribe", { jid = item.from, node = item.node }));
47 end 48 end
48 end 49 end
49 50
50 for _, event_name in ipairs(valid_events) do 51 for _, event_name in ipairs(valid_events) do
51 module:hook("pubsub-event/host/"..event_name, function (event) 52 module:hook("pubsub-event/host/"..event_name, function (event)
52 for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, "on_"..event_name) do 53 for _, _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, event.stanza.attr.to, nil, "on_"..event_name) do
54 pcall(cb, event);
55 end
56 end);
57
58 module:hook("pubsub-event/bare/"..event_name, function (event)
59 for _, _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, event.stanza.attr.to, nil, "on_"..event_name) do
53 pcall(cb, event); 60 pcall(cb, event);
54 end 61 end
55 end); 62 end);
56 end 63 end
57 64
58 module:hook("iq/host", function (event) 65 function handle_iq(context, event)
59 local stanza = event.stanza; 66 local stanza = event.stanza;
60 local service = stanza.attr.from; 67 local service = stanza.attr.from;
61 68
62 if not stanza.attr.id then return end -- shouldn't be possible 69 if not stanza.attr.id then return end -- shouldn't be possible
63 70
77 what = "on_subscribed"; 84 what = "on_subscribed";
78 elseif subscription.attr.subscription == "none" then 85 elseif subscription.attr.subscription == "none" then
79 what = "on_unsubscribed"; 86 what = "on_unsubscribed";
80 end 87 end
81 if not what then return end -- there are other states but we don't handle them 88 if not what then return end -- there are other states but we don't handle them
82 for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, what) do 89 for _, _, _, _, _, cb in active_subscriptions:iter(service, node, stanza.attr.to, nil, what) do
83 cb(event); 90 cb(event);
84 end 91 end
85 return true; 92 return true;
86 93
87 elseif stanza.attr.type == "error" then 94 elseif stanza.attr.type == "error" then
88 local node = subscribed_node or unsubscribed_node; 95 local node = subscribed_node or unsubscribed_node;
89 local error_type, error_condition, reason, pubsub_error = stanza:get_error(); 96 local error_type, error_condition, reason, pubsub_error = stanza:get_error();
90 local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error }; 97 local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error };
91 if active_subscriptions:get(service) then 98 if active_subscriptions:get(service) then
92 for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, "on_error") do 99 for _, _, _, _, _, cb in active_subscriptions:iter(service, node, stanza.attr.to, nil, "on_error") do
93 cb(err); 100 cb(err);
94 end 101 end
95 return true; 102 return true;
96 end 103 end
97 end 104 end
105 end
106
107 module:hook("iq/host", function (event)
108 handle_iq("host", event);
109 end, 1);
110
111 module:hook("iq/bare", function (event)
112 handle_iq("bare", event);
98 end, 1); 113 end, 1);
99 114
100 local function subscription_removed(item_event) 115 local function subscription_removed(item_event)
101 local item = item_event.item; 116 local item = item_event.item;
102 active_subscriptions:set(item.service, item.node, item._id, nil); 117 active_subscriptions:set(item.service, item.node, item.from, item._id, nil);
103 local node_subs = active_subscriptions:get(item.service, item.node); 118 local node_subs = active_subscriptions:get(item.service, item.node, item.from);
104 if node_subs and next(node_subs) then return end 119 if node_subs and next(node_subs) then return end
105 120
106 local iq_id = uuid.generate(); 121 local iq_id = uuid.generate();
107 pending_unsubscription:set(iq_id, item._id); 122 pending_unsubscription:set(iq_id, item._id);
108 123
109 module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) 124 module:send(st.iq({ type = "set", id = iq_id, from = item.from, to = item.service })
110 :tag("pubsub", { xmlns = xmlns_pubsub }) 125 :tag("pubsub", { xmlns = xmlns_pubsub })
111 :tag("unsubscribe", { jid = module.host, node = item.node })) 126 :tag("unsubscribe", { jid = item.from, node = item.node }))
112 end 127 end
113 128
114 module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true); 129 module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true);
115 130
116 module:hook("message/host", function(event) 131 function handle_message(context, event)
117 local origin, stanza = event.origin, event.stanza; 132 local origin, stanza = event.origin, event.stanza;
118 local ret = nil; 133 local ret = nil;
119 local service = stanza.attr.from; 134 local service = stanza.attr.from;
120 module:log("debug", "Got message/host: %s", stanza:top_tag()); 135 module:log("debug", "Got message/%s: %s", context, stanza:top_tag());
121 for event_container in stanza:childtags("event", xmlns_pubsub_event) do 136 for event_container in stanza:childtags("event", xmlns_pubsub_event) do
122 for pubsub_event in event_container:childtags() do 137 for pubsub_event in event_container:childtags() do
123 module:log("debug", "Got pubsub event %s", pubsub_event:top_tag()); 138 module:log("debug", "Got pubsub event %s", pubsub_event:top_tag());
124 local node = pubsub_event.attr.node; 139 local node = pubsub_event.attr.node;
125 module:fire_event("pubsub-event/host/"..pubsub_event.name, { 140 module:fire_event("pubsub-event/" .. context .. "/"..pubsub_event.name, {
126 stanza = stanza; 141 stanza = stanza;
127 origin = origin; 142 origin = origin;
128 event = pubsub_event; 143 event = pubsub_event;
129 service = service; 144 service = service;
130 node = node; 145 node = node;
131 }); 146 });
132 ret = true; 147 ret = true;
133 end 148 end
134 end 149 end
135 return ret; 150 return ret;
151 end
152
153 module:hook("message/host", function(event)
154 return handle_message("host", event);
136 end); 155 end);
137 156
138 module:hook("pubsub-event/host/items", function (event) 157 module:hook("message/bare", function(event)
158 return handle_message("bare", event);
159 end);
160
161
162 function handle_items(context, event)
139 for item in event.event:childtags() do 163 for item in event.event:childtags() do
140 module:log("debug", "Got pubsub item event %s", item:top_tag()); 164 module:log("debug", "Got pubsub item event %s", item:top_tag());
141 event.item = item; 165 event.item = item;
142 event.payload = item.tags[1]; 166 event.payload = item.tags[1];
143 module:fire_event("pubsub-event/host/"..item.name, event); 167 module:fire_event("pubsub-event/" .. context .. "/"..item.name, event);
144 end 168 end
169 end
170
171 module:hook("pubsub-event/host/items", function (event)
172 handle_items("host", event);
145 end); 173 end);
174
175 module:hook("pubsub-event/bare/items", function (event)
176 handle_items("bare", event);
177 end);