# HG changeset patch # User Matthew Wild # Date 1385925128 0 # Node ID e0d97eb52ab80ff4da63c97a7dbca388c7758713 # Parent cc5cbeeb9fc715f5cf715b37ce04e8146d5c174d mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub diff -r cc5cbeeb9fc7 -r e0d97eb52ab8 mod_pubsub_mqtt/mod_pubsub_mqtt.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Sun Dec 01 19:12:08 2013 +0000 @@ -0,0 +1,161 @@ +module:set_global(); + +local mqtt = module:require "mqtt"; +local st = require "util.stanza"; + +local pubsub_services = {}; +local pubsub_subscribers = {}; +local packet_handlers = {}; + +function handle_packet(session, packet) + module:log("warn", "MQTT packet received! Length: %d", packet.length); + for k,v in pairs(packet) do + module:log("debug", "MQTT %s: %s", tostring(k), tostring(v)); + end + local handler = packet_handlers[packet.type]; + if not handler then + module:log("warn", "Unhandled command: %s", tostring(packet.type)); + return; + end + handler(session, packet); +end + +function packet_handlers.connect(session, packet) + session.conn:write(mqtt.serialize_packet{ + type = "connack"; + data = string.char(0x00, 0x00); + }); +end + +function packet_handlers.disconnect(session, packet) + session.conn:close(); +end + +function packet_handlers.publish(session, packet) + module:log("warn", "PUBLISH to %s", packet.topic); + local host, node = packet.topic:match("^([^/]+)/(.+)$"); + local pubsub = pubsub_services[host]; + if not pubsub then + module:log("warn", "Unable to locate host/node: %s", packet.topic); + return; + end + local id = "mqtt"; + local ok, err = pubsub:publish(node, true, id, + st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" }) + :text(packet.data) + ); + if not ok then + module:log("warn", "Error publishing MQTT data: %s", tostring(err)); + end +end + +function packet_handlers.subscribe(session, packet) + for _, topic in ipairs(packet.topics) do + module:log("warn", "SUBSCRIBE to %s", topic); + local host, node = topic:match("^([^/]+)/(.+)$"); + local pubsub = pubsub_subscribers[host]; + if not pubsub then + module:log("warn", "Unable to locate host/node: %s", topic); + return; + end + local node_subs = pubsub[node]; + if not node_subs then + node_subs = {}; + pubsub[node] = node_subs; + end + session.subscriptions[topic] = true; + node_subs[session] = true; + end + +end + +function packet_handlers.pingreq(session, packet) + session.conn:write(mqtt.serialize_packet{type = "pingresp"}); +end + +local sessions = {}; + +local mqtt_listener = {}; + +function mqtt_listener.onconnect(conn) + sessions[conn] = { + conn = conn; + stream = mqtt.new_stream(); + subscriptions = {}; + }; +end + +function mqtt_listener.onincoming(conn, data) + local session = sessions[conn]; + if session then + local packets = session.stream:feed(data); + for i = 1, #packets do + handle_packet(session, packets[i]); + end + end +end + +function mqtt_listener.ondisconnect(conn) + local session = sessions[conn]; + for topic in pairs(session.subscriptions) do + local host, node = topic:match("^([^/]+)/(.+)$"); + local subs = pubsub_subscribers[host]; + if subs then + local node_subs = subs[node]; + if node_subs then + node_subs[session] = nil; + end + end + end + sessions[conn] = nil; + module:log("debug", "MQTT client disconnected"); +end + +module:provides("net", { + default_port = 1883; + listener = mqtt_listener; +}); + +local function tostring_content(item) + return tostring(item[1]); +end + +local data_translators = setmetatable({ + ["data https://prosody.im/protocol/mqtt"] = tostring_content; + ["json urn:xmpp:json:0"] = tostring_content; +}, { + __index = function () return tostring; end; +}); + +function module.add_host(module) + local pubsub_module = hosts[module.host].modules.pubsub + if pubsub_module then + module:log("debug", "MQTT enabled for %s", module.host); + module:depends("pubsub"); + pubsub_services[module.host] = assert(pubsub_module.service); + local subscribers = {}; + pubsub_subscribers[module.host] = subscribers; + local function handle_publish(event) + -- Build MQTT packet + local packet = mqtt.serialize_packet{ + type = "publish"; + id = "\000\000"; + topic = module.host.."/"..event.node; + data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item); + }; + -- Broadcast to subscribers + module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node); + for session in pairs(subscribers[event.node] or {}) do + session.conn:write(packet); + module:log("debug", "Sent to %s", tostring(session)); + end + end + pubsub_services[module.host].events.add_handler("item-published", handle_publish); + function module.unload() + module:log("debug", "MQTT disabled for %s", module.host); + pubsub_module.service.remove_handler("item-published", handle_publish); + pubsub_services[module.host] = nil; + pubsub_subscribers[module.host] = nil; + end + end +end diff -r cc5cbeeb9fc7 -r e0d97eb52ab8 mod_pubsub_mqtt/mqtt.lib.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_pubsub_mqtt/mqtt.lib.lua Sun Dec 01 19:12:08 2013 +0000 @@ -0,0 +1,161 @@ +local bit = require "bit"; + +local stream_mt = {}; +stream_mt.__index = stream_mt; + +function stream_mt:read_bytes(n_bytes) + module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #self.buffer); + local data = self.buffer; + if not data then + module:log("debug", "No data, pausing."); + data = coroutine.yield(); + module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes); + end + if #data >= n_bytes then + data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1); + elseif #data < n_bytes then + module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes); + self.buffer = data..coroutine.yield(); + module:log("debug", "Now we have %d bytes, reading...", #data); + return self:read_bytes(n_bytes); + end + module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer); + return data; +end + +function stream_mt:read_string() + local len1, len2 = self:read_bytes(2):byte(1,2); + local len = bit.lshift(len1, 8) + len2; + return self:read_bytes(len), len+2; +end + +local packet_type_codes = { + "connect", "connack", + "publish", "puback", "pubrec", "pubrel", "pubcomp", + "subscribe", "subak", "unsubscribe", "unsuback", + "pingreq", "pingresp", + "disconnect" +}; + +function stream_mt:read_packet() + local packet = {}; + local header = self:read_bytes(1):byte(); + packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)]; + packet.dup = bit.band(header, 0x08) == 0x08; + packet.qos = bit.rshift(bit.band(header, 0x06), 1); + packet.retain = bit.band(header, 0x01) == 0x01; + + -- Get length + local length, multiplier = 0, 1; + repeat + local digit = self:read_bytes(1):byte(); + length = length + bit.band(digit, 0x7f)*multiplier; + multiplier = multiplier*128; + until bit.band(digit, 0x80) == 0; + packet.length = length; + if packet.type == "connect" then + if self:read_string() ~= "MQIsdp" then + module:log("warn", "Unexpected packet signature!"); + packet.type = nil; -- Invalid packet + else + packet.version = self:read_bytes(1):byte(); + packet.connect_flags = self:read_bytes(1):byte(); + packet.keepalive_timer = self:read_bytes(1):byte(); + length = length - 11; + end + elseif packet.type == "publish" then + packet.topic = self:read_string(); + length = length - (#packet.topic+2); + if packet.qos == 1 or packet.qos == 2 then + packet.id = self:read_bytes(2); + length = length - 2; + end + elseif packet.type == "subscribe" then + if packet.qos == 1 or packet.qos == 2 then + packet.id = self:read_bytes(2); + length = length - 2; + end + local topics = {}; + while length > 0 do + local topic, len = self:read_string(); + table.insert(topics, topic); + self:read_bytes(1); -- QoS not used + length = length - (len+1); + end + packet.topics = topics; + end + if length > 0 then + packet.data = self:read_bytes(length); + end + return packet; +end + +local function new_parser(self) + return coroutine.wrap(function (data) + self.buffer = data; + while true do + data = coroutine.yield(self:read_packet()); + module:log("debug", "Parser: %d new bytes", #data); + self.buffer = (self.buffer or "")..data; + end + end); +end + +function stream_mt:feed(data) + module:log("debug", "Feeding %d bytes", #data); + local packets = {}; + local packet = self.parser(data); + while packet do + module:log("debug", "Received packet"); + table.insert(packets, packet); + packet = self.parser(""); + end + module:log("debug", "Returning %d packets", #packets); + return packets; +end + +local function new_stream() + local stream = setmetatable({}, stream_mt); + stream.parser = new_parser(stream); + return stream; +end + +local function serialize_packet(packet) + local type_num = 0; + for i, v in ipairs(packet_type_codes) do -- FIXME: I'm so tired right now. + if v == packet.type then + type_num = i; + break; + end + end + local header = string.char(bit.lshift(type_num, 4)); + + if packet.type == "publish" then + local topic = packet.topic or ""; + packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data; + elseif packet.type == "suback" then + local t = {}; + for _, topic in ipairs(packet.topics) do + table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000"); + end + packet.data = table.concat(t); + end + + -- Get length + local length = #(packet.data or ""); + repeat + local digit = length%128; + length = math.floor(length/128); + if length > 0 then + digit = bit.bor(digit, 0x80); + end + header = header..string.char(digit); -- FIXME: ... + until length <= 0; + + return header..(packet.data or ""); +end + +return { + new_stream = new_stream; + serialize_packet = serialize_packet; +};