comparison mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 5113:85a7304cfea1

mod_pubsub_mqtt: Support atom_title payload type. This commit adds the ability to publish and subscribe with arbitrary payload types. It introduces a breaking change: topics are now of the form HOST/TYPE/NODE. Currently supported types are utf8 [the original message reads utf8_data, but the translator table key in the code is utf8], json and atom_title.
author Matthew Wild <mwild1@gmail.com>
date Fri, 16 Dec 2022 22:16:45 +0000
parents 9499b88f3453
children
comparison
equal deleted inserted replaced
5112:9499b88f3453 5113:85a7304cfea1
1 module:set_global(); 1 module:set_global();
2 2
3 local mqtt = module:require "mqtt"; 3 local mqtt = module:require "mqtt";
4 local id = require "util.id";
4 local st = require "util.stanza"; 5 local st = require "util.stanza";
6
7 local function tostring_content(item)
8 return tostring(item[1]);
9 end
10
11 local data_translators = setmetatable({
12 utf8 = {
13 from_item = function (item)
14 return item:find("{https://prosody.im/protocol/data}data#");
15 end;
16 to_item = function (payload)
17 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
18 :text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" })
19 end;
20 };
21 json = {
22 from_item = function (item)
23 return item:find("{urn:xmpp:json:0}json#");
24 end;
25 to_item = function (payload)
26 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
27 :text_tag("json", payload, { xmlns = "urn:xmpp:json:0" });
28 end;
29 };
30 atom_title = {
31 from_item = function (item)
32 return item:find("{http://www.w3.org/2005/Atom}entry/title#");
33 end;
34 to_item = function (payload)
35 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
36 :tag("entry", { xmlns = "http://www.w3.org/2005/Atom" })
37 :text_tag("title", payload, { type = "text" });
38 end;
39 };
40 }, {
41 __index = function () return { from_item = tostring }; end;
42 });
5 43
6 local pubsub_services = {}; 44 local pubsub_services = {};
7 local pubsub_subscribers = {}; 45 local pubsub_subscribers = {};
8 local packet_handlers = {}; 46 local packet_handlers = {};
9 47
31 session.conn:close(); 69 session.conn:close();
32 end 70 end
33 71
34 function packet_handlers.publish(session, packet) 72 function packet_handlers.publish(session, packet)
35 module:log("info", "PUBLISH to %s", packet.topic); 73 module:log("info", "PUBLISH to %s", packet.topic);
36 local host, node = packet.topic:match("^([^/]+)/(.+)$"); 74 local host, payload_type, node = packet.topic:match("^([^/]+)/([^/]+)/(.+)$");
75 if not host then
76 module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
77 return;
78 end
37 local pubsub = pubsub_services[host]; 79 local pubsub = pubsub_services[host];
38 if not pubsub then 80 if not pubsub then
39 module:log("warn", "Unable to locate host/node: %s", packet.topic); 81 module:log("warn", "Unable to locate host/node: %s", packet.topic);
40 return; 82 return;
41 end 83 end
42 local id = "mqtt"; 84
43 local ok, err = pubsub:publish(node, true, id, 85 local payload_translator = data_translators[payload_type];
44 st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id }) 86 if not payload_translator or not payload_translator.to_item then
45 :text_tag("data", packet.data, { xmlns = "https://prosody.im/protocol/data" }) 87 module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic);
46 ); 88 return;
89 end
90
91 local payload_item = payload_translator.to_item(packet.data);
92 local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item);
47 if not ok then 93 if not ok then
48 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); 94 module:log("warn", "Error publishing MQTT data: %s", tostring(err));
49 end 95 end
50 end 96 end
51 97
52 function packet_handlers.subscribe(session, packet) 98 function packet_handlers.subscribe(session, packet)
53 for _, topic in ipairs(packet.topics) do 99 for _, topic in ipairs(packet.topics) do
54 module:log("info", "SUBSCRIBE to %s", topic); 100 module:log("info", "SUBSCRIBE to %s", topic);
55 local host, node = topic:match("^([^/]+)/(.+)$"); 101 local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
102 if not host then
103 module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
104 return;
105 end
56 local pubsub = pubsub_subscribers[host]; 106 local pubsub = pubsub_subscribers[host];
57 if not pubsub then 107 if not pubsub then
58 module:log("warn", "Unable to locate host/node: %s", topic); 108 module:log("warn", "Unable to locate host/node: %s", topic);
59 return; 109 return;
60 end 110 end
61 local node_subs = pubsub[node]; 111 local node_subs = pubsub[node];
62 if not node_subs then 112 if not node_subs then
63 node_subs = {}; 113 node_subs = {};
64 pubsub[node] = node_subs; 114 pubsub[node] = node_subs;
65 end 115 end
66 session.subscriptions[topic] = true; 116 session.subscriptions[topic] = payload_type;
67 node_subs[session] = true; 117 node_subs[session] = payload_type;
68 end 118 end
69 119
70 end 120 end
71 121
72 function packet_handlers.pingreq(session, packet) 122 function packet_handlers.pingreq(session, packet)
114 module:provides("net", { 164 module:provides("net", {
115 default_port = 1883; 165 default_port = 1883;
116 listener = mqtt_listener; 166 listener = mqtt_listener;
117 }); 167 });
118 168
119 local function tostring_content(item)
120 return tostring(item[1]);
121 end
122
123 local data_translators = setmetatable({
124 ["data https://prosody.im/protocol/data"] = tostring_content;
125 ["json urn:xmpp:json:0"] = tostring_content;
126 }, {
127 __index = function () return tostring; end;
128 });
129
130 function module.add_host(module) 169 function module.add_host(module)
131 local pubsub_module = hosts[module.host].modules.pubsub 170 local pubsub_module = hosts[module.host].modules.pubsub
132 if pubsub_module then 171 if pubsub_module then
133 module:log("debug", "MQTT enabled for %s", module.host); 172 module:log("debug", "MQTT enabled for %s", module.host);
134 module:depends("pubsub"); 173 module:depends("pubsub");
135 pubsub_services[module.host] = assert(pubsub_module.service); 174 pubsub_services[module.host] = assert(pubsub_module.service);
136 local subscribers = {}; 175 local subscribers = {};
137 pubsub_subscribers[module.host] = subscribers; 176 pubsub_subscribers[module.host] = subscribers;
138 local function handle_publish(event) 177 local function handle_publish(event)
139 -- Build MQTT packet 178 -- Build MQTT packet
140 local packet = mqtt.serialize_packet{ 179 local packet_types = setmetatable({}, {
141 type = "publish"; 180 __index = function (self, payload_type)
142 id = "\000\000"; 181 local packet = mqtt.serialize_packet{
143 topic = module.host.."/"..event.node; 182 type = "publish";
144 data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item); 183 id = "\000\000";
145 }; 184 topic = module.host.."/"..payload_type.."/"..event.node;
185 data = data_translators[payload_type].from_item(event.item) or "";
186 };
187 rawset(self, packet);
188 return packet;
189 end;
190 });
146 -- Broadcast to subscribers 191 -- Broadcast to subscribers
147 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node); 192 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
148 for session in pairs(subscribers[event.node] or {}) do 193 for session, payload_type in pairs(subscribers[event.node] or {}) do
149 session.conn:write(packet); 194 session.conn:write(packet_types[payload_type]);
150 module:log("debug", "Sent to %s", tostring(session)); 195 module:log("debug", "Sent to %s", tostring(session));
151 end 196 end
152 end 197 end
153 pubsub_services[module.host].events.add_handler("item-published", handle_publish); 198 pubsub_services[module.host].events.add_handler("item-published", handle_publish);
154 function module.unload() 199 function module.unload()