Mercurial > prosody-modules
view mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 5326:dc058fcc3fe3
mod_audit: Improve filtering options and add documentation to README
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 07 Apr 2023 13:44:18 +0100 |
parents | 85a7304cfea1 |
children | 801f64e6d4e9 |
line wrap: on
line source
-- mod_pubsub_mqtt: a minimal MQTT front-end for Prosody pubsub services.
--
-- MQTT clients publish and subscribe using topics of the form
--   HOST/TYPE/NODE
-- where HOST is a host this module has been added to, TYPE selects one of the
-- payload translators below, and NODE is the pubsub node name (which may
-- itself contain "/").

module:set_global();

local mqtt = module:require "mqtt";
local id = require "util.id";
local st = require "util.stanza";

-- Pattern splitting a topic into host, payload type and node.
local topic_pattern = "^([^/]+)/([^/]+)/(.+)$";

--- Split an MQTT topic into its components.
-- @param topic the topic string, expected to look like HOST/TYPE/NODE
-- @return host, payload_type, node on success; nil if the topic is not a
--         string or does not match the expected format
local function parse_topic(topic)
	if type(topic) ~= "string" then
		return nil;
	end
	return topic:match(topic_pattern);
end

-- Translator used for payload types that are not listed in data_translators.
-- It only supports the pubsub -> MQTT direction (stringifying the item), so
-- publishing from MQTT with an unknown type is rejected by the publish handler.
local fallback_translator = { from_item = tostring };

-- Payload translators, keyed by the TYPE component of the topic.
--   from_item(item)  -> string payload to send to MQTT subscribers (or nil)
--   to_item(payload) -> pubsub <item/> stanza wrapping an MQTT payload
local data_translators = setmetatable({
	utf8 = {
		from_item = function (item)
			return item:find("{https://prosody.im/protocol/data}data#");
		end;
		to_item = function (payload)
			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
				:text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" });
		end;
	};
	json = {
		from_item = function (item)
			return item:find("{urn:xmpp:json:0}json#");
		end;
		to_item = function (payload)
			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
				:text_tag("json", payload, { xmlns = "urn:xmpp:json:0" });
		end;
	};
	atom_title = {
		from_item = function (item)
			return item:find("{http://www.w3.org/2005/Atom}entry/title#");
		end;
		to_item = function (payload)
			return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
				:tag("entry", { xmlns = "http://www.w3.org/2005/Atom" })
					:text_tag("title", payload, { type = "text" });
		end;
	};
}, {
	__index = function ()
		return fallback_translator;
	end;
});

-- host -> pubsub service object (set in module.add_host)
local pubsub_services = {};
-- host -> { node -> { session -> payload_type } }
local pubsub_subscribers = {};
-- MQTT packet type -> handler(session, packet)
local packet_handlers = {};

--- Log an incoming MQTT packet and dispatch it to the handler for its type.
-- Packets whose type has no entry in packet_handlers are logged and dropped.
-- @param session the session table created in mqtt_listener.onconnect
-- @param packet a parsed packet as produced by the MQTT stream's feed()
local function handle_packet(session, packet)
	module:log("debug", "MQTT packet received! Length: %d", packet.length);
	for k, v in pairs(packet) do
		module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
	end
	local handler = packet_handlers[packet.type];
	if not handler then
		module:log("warn", "Unhandled command: %s", tostring(packet.type));
		return;
	end
	handler(session, packet);
end

--- CONNECT: reply with a CONNACK unconditionally.
-- No inspection of the CONNECT packet (credentials, client id, ...) is done
-- here. The two zero bytes are presumably "no flags" + return code 0
-- (connection accepted) -- confirm against the MQTT CONNACK layout.
function packet_handlers.connect(session, packet) -- luacheck: ignore 212/packet
	session.conn:write(mqtt.serialize_packet{
		type = "connack";
		data = string.char(0x00, 0x00);
	});
end

--- DISCONNECT: close the underlying connection.
function packet_handlers.disconnect(session, packet) -- luacheck: ignore 212/packet
	session.conn:close();
end

--- PUBLISH: wrap the MQTT payload in a pubsub item and publish it to the node
-- named by the topic. Failures are logged; nothing is reported to the client.
function packet_handlers.publish(session, packet) -- luacheck: ignore 212/session
	module:log("info", "PUBLISH to %s", packet.topic);
	local host, payload_type, node = parse_topic(packet.topic);
	if not host then
		module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
		return;
	end
	local pubsub = pubsub_services[host];
	if not pubsub then
		module:log("warn", "Unable to locate host/node: %s", packet.topic);
		return;
	end
	-- The fallback translator has no to_item, so unknown types end up here.
	local payload_translator = data_translators[payload_type];
	if not payload_translator or not payload_translator.to_item then
		module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic);
		return;
	end
	local payload_item = payload_translator.to_item(packet.data);
	-- NOTE(review): the second argument (actor) is `true`; presumably this
	-- bypasses the pubsub service's permission checks -- confirm.
	local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item);
	if not ok then
		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
	end
end

--- SUBSCRIBE: register the session as a subscriber for each requested topic.
-- Processing stops at the first topic that is malformed or names an unknown
-- host. NOTE(review): no SUBACK is sent back to the client.
function packet_handlers.subscribe(session, packet)
	for _, topic in ipairs(packet.topics) do
		module:log("info", "SUBSCRIBE to %s", topic);
		local host, payload_type, node = parse_topic(topic);
		if not host then
			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
			return;
		end
		local pubsub = pubsub_subscribers[host];
		if not pubsub then
			module:log("warn", "Unable to locate host/node: %s", topic);
			return;
		end
		local node_subs = pubsub[node];
		if not node_subs then
			node_subs = {};
			pubsub[node] = node_subs;
		end
		-- Remember the topic on the session so it can be undone on disconnect.
		session.subscriptions[topic] = payload_type;
		node_subs[session] = payload_type;
	end
end

--- PINGREQ: answer with PINGRESP.
function packet_handlers.pingreq(session, packet) -- luacheck: ignore 212/packet
	session.conn:write(mqtt.serialize_packet{ type = "pingresp" });
end

-- conn -> session table
local sessions = {};

local mqtt_listener = {};

--- Create a session (connection, MQTT stream parser, subscriptions) for a
-- newly accepted connection.
function mqtt_listener.onconnect(conn)
	sessions[conn] = {
		conn = conn;
		stream = mqtt.new_stream();
		subscriptions = {};
	};
end

--- Feed received bytes to the session's stream parser and handle every packet
-- it returns. Data for connections without a session is ignored.
function mqtt_listener.onincoming(conn, data)
	local session = sessions[conn];
	if session then
		local packets = session.stream:feed(data);
		for i = 1, #packets do
			handle_packet(session, packets[i]);
		end
	end
end

--- Remove all of the session's subscriptions and forget the session.
function mqtt_listener.ondisconnect(conn)
	local session = sessions[conn];
	if session then
		for topic in pairs(session.subscriptions) do
			-- Topics were stored as HOST/TYPE/NODE by the subscribe handler, and
			-- subscribers are keyed by NODE alone, so the TYPE component must be
			-- split off here as well or the lookup never matches.
			local host, _, node = parse_topic(topic);
			local subs = host and pubsub_subscribers[host];
			if subs then
				local node_subs = subs[node];
				if node_subs then
					node_subs[session] = nil;
					-- Drop the per-node table once its last subscriber is gone.
					if next(node_subs) == nil then
						subs[node] = nil;
					end
				end
			end
		end
		sessions[conn] = nil;
	end
	module:log("debug", "MQTT client disconnected");
end

module:provides("net", {
	default_port = 1883;
	listener = mqtt_listener;
});

--- Per-host setup: if the host has the pubsub module loaded, expose its
-- service over MQTT and forward published items to MQTT subscribers.
function module.add_host(module)
	local pubsub_module = hosts[module.host].modules.pubsub
	if pubsub_module then
		module:log("debug", "MQTT enabled for %s", module.host);
		module:depends("pubsub");
		local service = assert(pubsub_module.service);
		pubsub_services[module.host] = service;
		local subscribers = {};
		pubsub_subscribers[module.host] = subscribers;

		local function handle_publish(event)
			-- Build MQTT packet
			-- One serialized packet per payload type, built lazily on first use
			-- and cached for the remaining subscribers of this event.
			local packet_types = setmetatable({}, {
				__index = function (self, payload_type)
					local packet = mqtt.serialize_packet{
						type = "publish";
						id = "\000\000";
						topic = module.host.."/"..payload_type.."/"..event.node;
						data = data_translators[payload_type].from_item(event.item) or "";
					};
					-- rawset needs (table, key, value); cache under the payload type.
					rawset(self, payload_type, packet);
					return packet;
				end;
			});
			-- Broadcast to subscribers
			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
			for session, payload_type in pairs(subscribers[event.node] or {}) do
				session.conn:write(packet_types[payload_type]);
				module:log("debug", "Sent to %s", tostring(session));
			end
		end

		service.events.add_handler("item-published", handle_publish);

		function module.unload()
			module:log("debug", "MQTT disabled for %s", module.host);
			-- The handler was registered on service.events, so it must be removed
			-- from the same events object.
			service.events.remove_handler("item-published", handle_publish);
			pubsub_services[module.host] = nil;
			pubsub_subscribers[module.host] = nil;
		end
	end
end