# HG changeset patch # User Matthew Wild # Date 1671229005 0 # Node ID 85a7304cfea1f6ce8f0d966645eea858f7cd5aa4 # Parent 9499b88f3453bf334ffe16bc30edd40da0de044e mod_pubsub_mqtt: Support atom_title payload type This commit adds the ability to publish and subscribe with arbitrary payload types. It has a breaking change, which is that topics are now of the form: HOST/TYPE/NODE Currently supported types are utf8_data, json and atom_title. diff -r 9499b88f3453 -r 85a7304cfea1 mod_pubsub_mqtt/mod_pubsub_mqtt.lua --- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Fri Dec 16 12:12:01 2022 +0000 +++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Fri Dec 16 22:16:45 2022 +0000 @@ -1,8 +1,46 @@ module:set_global(); local mqtt = module:require "mqtt"; +local id = require "util.id"; local st = require "util.stanza"; +local function tostring_content(item) + return tostring(item[1]); +end + +local data_translators = setmetatable({ + utf8 = { + from_item = function (item) + return item:find("{https://prosody.im/protocol/data}data#"); + end; + to_item = function (payload) + return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) + :text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" }) + end; + }; + json = { + from_item = function (item) + return item:find("{urn:xmpp:json:0}json#"); + end; + to_item = function (payload) + return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) + :text_tag("json", payload, { xmlns = "urn:xmpp:json:0" }); + end; + }; + atom_title = { + from_item = function (item) + return item:find("{http://www.w3.org/2005/Atom}entry/title#"); + end; + to_item = function (payload) + return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) + :tag("entry", { xmlns = "http://www.w3.org/2005/Atom" }) + :text_tag("title", payload, { type = "text" }); + end; + }; +}, { + __index = function () return { from_item = tostring }; end; +}); + local pubsub_services = {}; local pubsub_subscribers = {}; local packet_handlers = {}; @@ -33,17 +71,25 @@ function packet_handlers.publish(session, packet) module:log("info", "PUBLISH to %s", packet.topic); - local host, node = packet.topic:match("^([^/]+)/(.+)$"); + local host, payload_type, node = packet.topic:match("^([^/]+)/([^/]+)/(.+)$"); + if not host then + module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE"); + return; + end local pubsub = pubsub_services[host]; if not pubsub then module:log("warn", "Unable to locate host/node: %s", packet.topic); return; end - local id = "mqtt"; - local ok, err = pubsub:publish(node, true, id, - st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id }) - :text_tag("data", packet.data, { xmlns = "https://prosody.im/protocol/data" }) - ); + + local payload_translator = data_translators[payload_type]; + if not payload_translator or not payload_translator.to_item then + module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic); + return; + end + + local payload_item = payload_translator.to_item(packet.data); + local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item); if not ok then module:log("warn", "Error publishing MQTT data: %s", tostring(err)); end @@ -52,7 +98,11 @@ function packet_handlers.subscribe(session, packet) for _, topic in ipairs(packet.topics) do module:log("info", "SUBSCRIBE to %s", topic); - local host, node = topic:match("^([^/]+)/(.+)$"); + local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$"); + if not host then + module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE"); + return; + end local pubsub = pubsub_subscribers[host]; if not pubsub then module:log("warn", "Unable to locate host/node: %s", topic); @@ -63,8 +113,8 @@ node_subs = {}; pubsub[node] = node_subs; end - session.subscriptions[topic] = true; - node_subs[session] = true; + session.subscriptions[topic] = payload_type; + node_subs[session] = payload_type; end end @@ -116,17 +166,6 @@ listener = mqtt_listener; }); -local function tostring_content(item) - return tostring(item[1]); -end - -local data_translators = setmetatable({ - ["data https://prosody.im/protocol/data"] = tostring_content; - ["json urn:xmpp:json:0"] = tostring_content; -}, { - __index = function () return tostring; end; -}); - function module.add_host(module) local pubsub_module = hosts[module.host].modules.pubsub if pubsub_module then @@ -137,16 +176,22 @@ pubsub_subscribers[module.host] = subscribers; local function handle_publish(event) -- Build MQTT packet - local packet = mqtt.serialize_packet{ - type = "publish"; - id = "\000\000"; - topic = module.host.."/"..event.node; - data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item); - }; + local packet_types = setmetatable({}, { + __index = function (self, payload_type) + local packet = mqtt.serialize_packet{ + type = "publish"; + id = "\000\000"; + topic = module.host.."/"..payload_type.."/"..event.node; + data = data_translators[payload_type].from_item(event.item) or ""; + }; + rawset(self, packet); + return packet; + end; + }); -- Broadcast to subscribers - module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node); - for session in pairs(subscribers[event.node] or {}) do - session.conn:write(packet); + module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node); + for session, payload_type in pairs(subscribers[event.node] or {}) do + session.conn:write(packet_types[payload_type]); module:log("debug", "Sent to %s", tostring(session)); end end