diff mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 5113:85a7304cfea1

mod_pubsub_mqtt: Support atom_title payload type This commit adds the ability to publish and subscribe with arbitrary payload types. It has a breaking change, which is that topics are now of the form: HOST/TYPE/NODE Currently supported types are utf8_data, json and atom_title.
author Matthew Wild <mwild1@gmail.com>
date Fri, 16 Dec 2022 22:16:45 +0000
parents 9499b88f3453
children 801f64e6d4e9
line wrap: on
line diff
--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Fri Dec 16 12:12:01 2022 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Fri Dec 16 22:16:45 2022 +0000
@@ -1,8 +1,46 @@
 module:set_global();
 
 local mqtt = module:require "mqtt";
+local id = require "util.id";
 local st = require "util.stanza";
 
+local function tostring_content(item)
+	return tostring(item[1]);
+end
+
+local data_translators = setmetatable({
+	utf8 = {
+		from_item = function (item)
+			return item:find("{https://prosody.im/protocol/data}data#");
+		end;
+		to_item = function (payload)
+			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+				:text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" })
+		end;
+	};
+	json = {
+		from_item = function (item)
+			return item:find("{urn:xmpp:json:0}json#");
+		end;
+		to_item = function (payload)
+			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+				:text_tag("json", payload, { xmlns = "urn:xmpp:json:0" });
+		end;
+	};
+	atom_title = {
+		from_item = function (item)
+			return item:find("{http://www.w3.org/2005/Atom}entry/title#");
+		end;
+		to_item = function (payload)
+			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+				:tag("entry", { xmlns = "http://www.w3.org/2005/Atom" })
+					:text_tag("title", payload, { type = "text" });
+		end;
+	};
+}, {
+	__index = function () return { from_item = tostring }; end;
+});
+
 local pubsub_services = {};
 local pubsub_subscribers = {};
 local packet_handlers = {};
@@ -33,17 +71,25 @@
 
 function packet_handlers.publish(session, packet)
 	module:log("info", "PUBLISH to %s", packet.topic);
-	local host, node = packet.topic:match("^([^/]+)/(.+)$");
+	local host, payload_type, node = packet.topic:match("^([^/]+)/([^/]+)/(.+)$");
+	if not host then
+		module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
+		return;
+	end
 	local pubsub = pubsub_services[host];
 	if not pubsub then
 		module:log("warn", "Unable to locate host/node: %s", packet.topic);
 		return;
 	end
-	local id = "mqtt";
-	local ok, err = pubsub:publish(node, true, id,
-		st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id })
-		  :text_tag("data", packet.data, { xmlns = "https://prosody.im/protocol/data" })
-	);
+
+	local payload_translator = data_translators[payload_type];
+	if not payload_translator or not payload_translator.to_item then
+		module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic);
+		return;
+	end
+
+	local payload_item = payload_translator.to_item(packet.data);
+	local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item);
 	if not ok then
 		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
 	end
@@ -52,7 +98,11 @@
 function packet_handlers.subscribe(session, packet)
 	for _, topic in ipairs(packet.topics) do
 		module:log("info", "SUBSCRIBE to %s", topic);
-		local host, node = topic:match("^([^/]+)/(.+)$");
+		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
+		if not host then
+			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
+			return;
+		end
 		local pubsub = pubsub_subscribers[host];
 		if not pubsub then
 			module:log("warn", "Unable to locate host/node: %s", topic);
@@ -63,8 +113,8 @@
 			node_subs = {};
 			pubsub[node] = node_subs;
 		end
-		session.subscriptions[topic] = true;
-		node_subs[session] = true;
+		session.subscriptions[topic] = payload_type;
+		node_subs[session] = payload_type;
 	end
 
 end
@@ -116,17 +166,6 @@
 	listener = mqtt_listener;
 });
 
-local function tostring_content(item)
-	return tostring(item[1]);
-end
-
-local data_translators = setmetatable({
-	["data https://prosody.im/protocol/data"] = tostring_content;
-	["json urn:xmpp:json:0"] = tostring_content;
-}, {
-	__index = function () return tostring; end;
-});
-
 function module.add_host(module)
 	local pubsub_module = hosts[module.host].modules.pubsub
 	if pubsub_module then
@@ -137,16 +176,22 @@
 		pubsub_subscribers[module.host] = subscribers;
 		local function handle_publish(event)
 			-- Build MQTT packet
-			local packet = mqtt.serialize_packet{
-				type = "publish";
-				id = "\000\000";
-				topic = module.host.."/"..event.node;
-				data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item);
-			};
+			local packet_types = setmetatable({}, {
+				__index = function (self, payload_type)
+					local packet = mqtt.serialize_packet{
+						type = "publish";
+						id = "\000\000";
+						topic = module.host.."/"..payload_type.."/"..event.node;
+						data = data_translators[payload_type].from_item(event.item) or "";
+					};
+					rawset(self, packet);
+					return packet;
+				end;
+			});
 			-- Broadcast to subscribers
-			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
-			for session in pairs(subscribers[event.node] or {}) do
-				session.conn:write(packet);
+			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
+			for session, payload_type in pairs(subscribers[event.node] or {}) do
+				session.conn:write(packet_types[payload_type]);
 				module:log("debug", "Sent to %s", tostring(session));
 			end
 		end