view mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 1983:f7fc5088b340

mod_storage_lmdb: Allow passing a transaction flag so we can run get operations in a read-only transaction
author Kim Alvefur <zash@zash.se>
date Sat, 19 Dec 2015 00:34:55 +0100
parents 7dbde05b48a9
children 6ba2188e2686
line wrap: on
line source

-- Declare this module as global; per-host wiring is done in
-- module.add_host further down in this file.
module:set_global();

-- MQTT helper library loaded through the module loader; this file uses its
-- new_stream() and serialize_packet() functions.
local mqtt = module:require "mqtt";
local st = require "util.stanza";

-- Pubsub service objects keyed by host name (filled in by module.add_host).
local pubsub_services = {};
-- Subscriber registry: host name -> node name -> set of MQTT sessions.
local pubsub_subscribers = {};
-- Handlers for incoming MQTT packets, keyed by packet.type.
local packet_handlers = {};

--- Log an incoming MQTT packet and dispatch it to the handler registered
-- for its type in packet_handlers.
-- A packet whose type has no registered handler is logged and dropped.
-- NOTE(review): this function is declared without `local`, so it is a global
-- in the module's environment.
-- @param session the MQTT session the packet arrived on
-- @param packet the decoded packet table (reads .length and .type)
function handle_packet(session, packet)
	module:log("warn", "MQTT packet received! Length: %d", packet.length);
	-- Dump every field of the packet to the debug log.
	for field, value in pairs(packet) do
		module:log("debug", "MQTT %s: %s", tostring(field), tostring(value));
	end
	local packet_type = packet.type;
	local handler = packet_handlers[packet_type];
	if handler then
		handler(session, packet);
	else
		module:log("warn", "Unhandled command: %s", tostring(packet_type));
	end
end

--- Handle a CONNECT packet by replying with a CONNACK packet whose data is
-- two zero bytes.
-- @param session the MQTT session; the reply is written to session.conn
-- @param packet the CONNECT packet (not inspected)
function packet_handlers.connect(session, packet)
	local connack = mqtt.serialize_packet({
		type = "connack";
		data = string.char(0x00, 0x00);
	});
	session.conn:write(connack);
end

--- Handle a DISCONNECT packet by closing the client's connection.
-- @param session the MQTT session whose connection is closed
-- @param packet the DISCONNECT packet (not inspected)
function packet_handlers.disconnect(session, packet)
	local conn = session.conn;
	conn:close();
end

--- Handle a PUBLISH packet by forwarding its payload to a pubsub service.
-- The topic is split at the first "/" into a host part and a node part; the
-- host part selects the service from pubsub_services. The payload is wrapped
-- in a <data/> element and published to the node under the fixed item id
-- "mqtt". Failures are logged, not raised.
-- @param session the MQTT session (not used here)
-- @param packet the PUBLISH packet (reads .topic and .data)
function packet_handlers.publish(session, packet)
	local topic = packet.topic;
	module:log("warn", "PUBLISH to %s", topic);
	local host, node = topic:match("^([^/]+)/(.+)$");
	local service = pubsub_services[host];
	if service then
		-- Wrap the raw MQTT payload as the text of a <data/> element.
		local item = st.stanza("data", { xmlns = "https://prosody.im/protocol/data" })
			:text(packet.data);
		local ok, err = service:publish(node, true, "mqtt", item);
		if not ok then
			module:log("warn", "Error publishing MQTT data: %s", tostring(err));
		end
	else
		module:log("warn", "Unable to locate host/node: %s", topic);
	end
end

--- Handle a SUBSCRIBE packet by registering the session for each topic.
-- Each topic is split at the first "/" into a host part and a node part.
-- For hosts present in pubsub_subscribers the session is added to that
-- node's subscriber set and the topic is recorded in session.subscriptions
-- (which ondisconnect uses for cleanup).
-- A topic whose host cannot be resolved is logged and skipped; the
-- remaining topics in the same packet are still processed.
-- NOTE(review): nothing is written back to the client here (no SUBACK).
-- @param session the MQTT session doing the subscribing
-- @param packet the SUBSCRIBE packet (reads .topics, a list of topic strings)
function packet_handlers.subscribe(session, packet)
	for _, topic in ipairs(packet.topics) do
		module:log("warn", "SUBSCRIBE to %s", topic);
		local host, node = topic:match("^([^/]+)/(.+)$");
		local host_subs = host and pubsub_subscribers[host];
		if host_subs then
			local node_subs = host_subs[node];
			if not node_subs then
				-- First subscriber for this node on this host.
				node_subs = {};
				host_subs[node] = node_subs;
			end
			session.subscriptions[topic] = true;
			node_subs[session] = true;
		else
			-- Skip just this topic rather than returning, so one bad topic
			-- does not discard the valid ones that follow it.
			module:log("warn", "Unable to locate host/node: %s", topic);
		end
	end
end

--- Handle a PINGREQ packet by replying with a PINGRESP packet.
-- @param session the MQTT session; the reply is written to session.conn
-- @param packet the PINGREQ packet (not inspected)
function packet_handlers.pingreq(session, packet)
	local pingresp = mqtt.serialize_packet({ type = "pingresp" });
	session.conn:write(pingresp);
end

-- Active MQTT sessions, keyed by connection object.
local sessions = {};

-- Connection callbacks (onconnect / onincoming / ondisconnect) that are
-- registered as the listener of the "net" provider below.
local mqtt_listener = {};

--- Create and register a session for a newly connected client.
-- The session holds the connection, a fresh MQTT stream parser and an empty
-- set of subscribed topics.
-- @param conn the new connection object
function mqtt_listener.onconnect(conn)
	local session = {};
	session.conn = conn;
	session.stream = mqtt.new_stream();
	session.subscriptions = {};
	sessions[conn] = session;
end

--- Feed received bytes into the session's MQTT stream and handle every
-- packet the stream returns, in order.
-- Data arriving on a connection without a session is ignored.
-- @param conn the connection the data arrived on
-- @param data the received data, passed unchanged to the stream's feed()
function mqtt_listener.onincoming(conn, data)
	local session = sessions[conn];
	if not session then
		return;
	end
	local packets = session.stream:feed(data);
	local count = #packets;
	for index = 1, count do
		handle_packet(session, packets[index]);
	end
end

--- Tear down the session of a disconnected client.
-- Removes the session from the subscriber set of every topic it subscribed
-- to, then forgets the session itself.
-- A connection with no recorded session is ignored, matching the guard in
-- onincoming, instead of raising on a nil session.
-- @param conn the connection that was closed
function mqtt_listener.ondisconnect(conn)
	local session = sessions[conn];
	if not session then
		-- Nothing was registered for this connection; nothing to clean up.
		return;
	end
	for topic in pairs(session.subscriptions) do
		-- Topics were stored as "<host>/<node>" by the subscribe handler.
		local host, node = topic:match("^([^/]+)/(.+)$");
		local subs = pubsub_subscribers[host];
		if subs then
			local node_subs = subs[node];
			if node_subs then
				node_subs[session] = nil;
			end
		end
	end
	sessions[conn] = nil;
	module:log("debug", "MQTT client disconnected");
end

-- Register a "net" provider that uses mqtt_listener for its connection
-- callbacks, with 1883 as the default port.
module:provides("net", {
	default_port = 1883;
	listener = mqtt_listener;
});

--- Convert the first child of a published item to a string.
-- Returns the empty string when the item has no children, so that an empty
-- payload is not delivered as the literal text "nil".
-- @param item the published item (its first array entry is read)
-- @return the string form of item[1], or "" if item[1] is nil
local function tostring_content(item)
	local content = item[1];
	if content == nil then
		return "";
	end
	return tostring(content);
end

-- Payload converters looked up by the string "<element name> <xmlns>".
-- The two listed payload types are converted with tostring_content; any
-- other key falls back to plain tostring via the __index metamethod.
local data_translators = {
	["data https://prosody.im/protocol/data"] = tostring_content;
	["json urn:xmpp:json:0"] = tostring_content;
};
setmetatable(data_translators, {
	__index = function ()
		return tostring;
	end;
});

--- Per-host setup: bridge the host's pubsub service to MQTT subscribers.
-- Does nothing when the host has no pubsub module loaded. Otherwise it
-- records the host's pubsub service and an empty subscriber registry, and
-- installs an "item-published" handler that serializes each published item
-- as an MQTT PUBLISH packet and writes it to every session subscribed to
-- the node. The installed module.unload removes the handler again and
-- clears the host's entries.
-- @param module the per-host module object (reads module.host)
function module.add_host(module)
	local pubsub_module = hosts[module.host].modules.pubsub
	if pubsub_module then
		module:log("debug", "MQTT enabled for %s", module.host);
		module:depends("pubsub");
		-- Capture the service once so the handler is removed from the same
		-- events object it was added to.
		local service = assert(pubsub_module.service);
		pubsub_services[module.host] = service;
		local subscribers = {};
		pubsub_subscribers[module.host] = subscribers;
		local function handle_publish(event)
			-- Build MQTT packet
			local packet = mqtt.serialize_packet{
				type = "publish";
				id = "\000\000";
				topic = module.host.."/"..event.node;
				-- Pick the converter by "<name> <xmlns>" of the published item.
				data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item);
			};
			-- Broadcast to subscribers
			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
			for session in pairs(subscribers[event.node] or {}) do
				session.conn:write(packet);
				module:log("debug", "Sent to %s", tostring(session));
			end
		end
		service.events.add_handler("item-published", handle_publish);
		function module.unload()
			module:log("debug", "MQTT disabled for %s", module.host);
			-- Remove via service.events, mirroring the add_handler call above.
			service.events.remove_handler("item-published", handle_publish);
			pubsub_services[module.host] = nil;
			pubsub_subscribers[module.host] = nil;
		end
	end
end