comparison mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 1240:e0d97eb52ab8

mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author Matthew Wild <mwild1@gmail.com>
date Sun, 01 Dec 2013 19:12:08 +0000
parents
children c2bf6b2102aa
comparison
equal deleted inserted replaced
1239:cc5cbeeb9fc7 1240:e0d97eb52ab8
1 module:set_global();
2
3 local mqtt = module:require "mqtt";
4 local st = require "util.stanza";
5
6 local pubsub_services = {};
7 local pubsub_subscribers = {};
8 local packet_handlers = {};
9
10 function handle_packet(session, packet)
11 module:log("warn", "MQTT packet received! Length: %d", packet.length);
12 for k,v in pairs(packet) do
13 module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
14 end
15 local handler = packet_handlers[packet.type];
16 if not handler then
17 module:log("warn", "Unhandled command: %s", tostring(packet.type));
18 return;
19 end
20 handler(session, packet);
21 end
22
23 function packet_handlers.connect(session, packet)
24 session.conn:write(mqtt.serialize_packet{
25 type = "connack";
26 data = string.char(0x00, 0x00);
27 });
28 end
29
30 function packet_handlers.disconnect(session, packet)
31 session.conn:close();
32 end
33
34 function packet_handlers.publish(session, packet)
35 module:log("warn", "PUBLISH to %s", packet.topic);
36 local host, node = packet.topic:match("^([^/]+)/(.+)$");
37 local pubsub = pubsub_services[host];
38 if not pubsub then
39 module:log("warn", "Unable to locate host/node: %s", packet.topic);
40 return;
41 end
42 local id = "mqtt";
43 local ok, err = pubsub:publish(node, true, id,
44 st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" })
45 :text(packet.data)
46 );
47 if not ok then
48 module:log("warn", "Error publishing MQTT data: %s", tostring(err));
49 end
50 end
51
52 function packet_handlers.subscribe(session, packet)
53 for _, topic in ipairs(packet.topics) do
54 module:log("warn", "SUBSCRIBE to %s", topic);
55 local host, node = topic:match("^([^/]+)/(.+)$");
56 local pubsub = pubsub_subscribers[host];
57 if not pubsub then
58 module:log("warn", "Unable to locate host/node: %s", topic);
59 return;
60 end
61 local node_subs = pubsub[node];
62 if not node_subs then
63 node_subs = {};
64 pubsub[node] = node_subs;
65 end
66 session.subscriptions[topic] = true;
67 node_subs[session] = true;
68 end
69
70 end
71
72 function packet_handlers.pingreq(session, packet)
73 session.conn:write(mqtt.serialize_packet{type = "pingresp"});
74 end
75
76 local sessions = {};
77
78 local mqtt_listener = {};
79
80 function mqtt_listener.onconnect(conn)
81 sessions[conn] = {
82 conn = conn;
83 stream = mqtt.new_stream();
84 subscriptions = {};
85 };
86 end
87
88 function mqtt_listener.onincoming(conn, data)
89 local session = sessions[conn];
90 if session then
91 local packets = session.stream:feed(data);
92 for i = 1, #packets do
93 handle_packet(session, packets[i]);
94 end
95 end
96 end
97
98 function mqtt_listener.ondisconnect(conn)
99 local session = sessions[conn];
100 for topic in pairs(session.subscriptions) do
101 local host, node = topic:match("^([^/]+)/(.+)$");
102 local subs = pubsub_subscribers[host];
103 if subs then
104 local node_subs = subs[node];
105 if node_subs then
106 node_subs[session] = nil;
107 end
108 end
109 end
110 sessions[conn] = nil;
111 module:log("debug", "MQTT client disconnected");
112 end
113
114 module:provides("net", {
115 default_port = 1883;
116 listener = mqtt_listener;
117 });
118
119 local function tostring_content(item)
120 return tostring(item[1]);
121 end
122
123 local data_translators = setmetatable({
124 ["data https://prosody.im/protocol/mqtt"] = tostring_content;
125 ["json urn:xmpp:json:0"] = tostring_content;
126 }, {
127 __index = function () return tostring; end;
128 });
129
130 function module.add_host(module)
131 local pubsub_module = hosts[module.host].modules.pubsub
132 if pubsub_module then
133 module:log("debug", "MQTT enabled for %s", module.host);
134 module:depends("pubsub");
135 pubsub_services[module.host] = assert(pubsub_module.service);
136 local subscribers = {};
137 pubsub_subscribers[module.host] = subscribers;
138 local function handle_publish(event)
139 -- Build MQTT packet
140 local packet = mqtt.serialize_packet{
141 type = "publish";
142 id = "\000\000";
143 topic = module.host.."/"..event.node;
144 data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item);
145 };
146 -- Broadcast to subscribers
147 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
148 for session in pairs(subscribers[event.node] or {}) do
149 session.conn:write(packet);
150 module:log("debug", "Sent to %s", tostring(session));
151 end
152 end
153 pubsub_services[module.host].events.add_handler("item-published", handle_publish);
154 function module.unload()
155 module:log("debug", "MQTT disabled for %s", module.host);
156 pubsub_module.service.remove_handler("item-published", handle_publish);
157 pubsub_services[module.host] = nil;
158 pubsub_subscribers[module.host] = nil;
159 end
160 end
161 end