diff mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 1240:e0d97eb52ab8

mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author Matthew Wild <mwild1@gmail.com>
date Sun, 01 Dec 2013 19:12:08 +0000
parents
children c2bf6b2102aa
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Sun Dec 01 19:12:08 2013 +0000
@@ -0,0 +1,161 @@
+module:set_global();
+
+local mqtt = module:require "mqtt";
+local st = require "util.stanza";
+
+local pubsub_services = {};
+local pubsub_subscribers = {};
+local packet_handlers = {};
+
+function handle_packet(session, packet)
+	module:log("warn", "MQTT packet received! Length: %d", packet.length);
+	for k,v in pairs(packet) do
+		module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
+	end
+	local handler = packet_handlers[packet.type];
+	if not handler then
+		module:log("warn", "Unhandled command: %s", tostring(packet.type));
+		return;
+	end
+	handler(session, packet);
+end
+
+function packet_handlers.connect(session, packet)
+	session.conn:write(mqtt.serialize_packet{
+		type = "connack";
+		data = string.char(0x00, 0x00);
+	});
+end
+
+function packet_handlers.disconnect(session, packet)
+	session.conn:close();
+end
+
+function packet_handlers.publish(session, packet)
+	module:log("warn", "PUBLISH to %s", packet.topic);
+	local host, node = packet.topic:match("^([^/]+)/(.+)$");
+	local pubsub = pubsub_services[host];
+	if not pubsub then
+		module:log("warn", "Unable to locate host/node: %s", packet.topic);
+		return;
+	end
+	local id = "mqtt";
+	local ok, err = pubsub:publish(node, true, id,
+		st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" })
+			:text(packet.data)
+	);
+	if not ok then
+		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
+	end
+end
+
+function packet_handlers.subscribe(session, packet)
+	for _, topic in ipairs(packet.topics) do
+		module:log("warn", "SUBSCRIBE to %s", topic);
+		local host, node = topic:match("^([^/]+)/(.+)$");
+		local pubsub = pubsub_subscribers[host];
+		if not pubsub then
+			module:log("warn", "Unable to locate host/node: %s", topic);
+			return;
+		end
+		local node_subs = pubsub[node];
+		if not node_subs then
+			node_subs = {};
+			pubsub[node] = node_subs;
+		end
+		session.subscriptions[topic] = true;
+		node_subs[session] = true;
+	end
+	
+end
+
+function packet_handlers.pingreq(session, packet)
+	session.conn:write(mqtt.serialize_packet{type = "pingresp"});
+end
+
+local sessions = {};
+
+local mqtt_listener = {};
+
+function mqtt_listener.onconnect(conn)
+	sessions[conn] = {
+		conn = conn;
+		stream = mqtt.new_stream();
+		subscriptions = {};
+	};
+end
+
+function mqtt_listener.onincoming(conn, data)
+	local session = sessions[conn];
+	if session then
+		local packets = session.stream:feed(data);
+		for i = 1, #packets do
+			handle_packet(session, packets[i]);
+		end
+	end
+end
+
+function mqtt_listener.ondisconnect(conn)
+	local session = sessions[conn];
+	for topic in pairs(session.subscriptions) do
+		local host, node = topic:match("^([^/]+)/(.+)$");
+		local subs = pubsub_subscribers[host];
+		if subs then
+			local node_subs = subs[node];
+			if node_subs then
+				node_subs[session] = nil;
+			end
+		end
+	end
+	sessions[conn] = nil;
+	module:log("debug", "MQTT client disconnected");
+end
+
+module:provides("net", {
+	default_port = 1883;
+	listener = mqtt_listener;
+});
+
+local function tostring_content(item)
+	return tostring(item[1]);
+end
+
+local data_translators = setmetatable({
+	["data https://prosody.im/protocol/mqtt"] = tostring_content;
+	["json urn:xmpp:json:0"] = tostring_content;
+}, {
+	__index = function () return tostring; end;
+});
+
+function module.add_host(module)
+	local pubsub_module = hosts[module.host].modules.pubsub
+	if pubsub_module then
+		module:log("debug", "MQTT enabled for %s", module.host);
+		module:depends("pubsub");
+		pubsub_services[module.host] = assert(pubsub_module.service);
+		local subscribers = {};
+		pubsub_subscribers[module.host] = subscribers;
+		local function handle_publish(event)
+			-- Build MQTT packet
+			local packet = mqtt.serialize_packet{
+				type = "publish";
+				id = "\000\000";
+				topic = module.host.."/"..event.node;
+				data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item);
+			};
+			-- Broadcast to subscribers
+			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
+			for session in pairs(subscribers[event.node] or {}) do
+				session.conn:write(packet);
+				module:log("debug", "Sent to %s", tostring(session));
+			end
+		end
+		pubsub_services[module.host].events.add_handler("item-published", handle_publish);
+		function module.unload()
+			module:log("debug", "MQTT disabled for %s", module.host);
+			pubsub_module.service.remove_handler("item-published", handle_publish);
+			pubsub_services[module.host] = nil;
+			pubsub_subscribers[module.host] = nil;
+		end
+	end
+end