view mod_pubsub_mqtt/mqtt.lib.lua @ 4260:c539334dd01a

mod_http_oauth2: Rescope oauth client config into users' storage This produces client_id of the form owner@host/random and prevents clients from being deleted by registering an account with the same name and then deleting the account, as well as having the client automatically be deleted when the owner account is removed. On one hand, this leaks the bare JID of the creator to users. On the other hand, it makes it obvious who made the oauth application. This module is experimental and only for developers, so this can be changed if a better method comes up.
author Kim Alvefur <zash@zash.se>
date Sat, 21 Nov 2020 23:55:10 +0100
parents 7dbde05b48a9
children d2a84e6aed2b
line wrap: on
line source

local bit = require "bit";

local stream_mt = {};
stream_mt.__index = stream_mt;

function stream_mt:read_bytes(n_bytes)
	module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #self.buffer);
	local data = self.buffer;
	if not data then
		module:log("debug", "No data, pausing.");
		data = coroutine.yield();
		module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
	end
	if #data >= n_bytes then
		data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1);
	elseif #data < n_bytes then
		module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
		self.buffer = data..coroutine.yield();
		module:log("debug", "Now we have %d bytes, reading...", #data);
		return self:read_bytes(n_bytes);
	end
	module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer);
	return data;
end

function stream_mt:read_string()
	local len1, len2 = self:read_bytes(2):byte(1,2);
	local len = bit.lshift(len1, 8) + len2;
	return self:read_bytes(len), len+2;
end

local packet_type_codes = {
	"connect", "connack",
	"publish", "puback", "pubrec", "pubrel", "pubcomp",
	"subscribe", "subak", "unsubscribe", "unsuback",
	"pingreq", "pingresp",
	"disconnect"
};

function stream_mt:read_packet()
	local packet = {};
	local header = self:read_bytes(1):byte();
	packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
	packet.dup = bit.band(header, 0x08) == 0x08;
	packet.qos = bit.rshift(bit.band(header, 0x06), 1);
	packet.retain = bit.band(header, 0x01) == 0x01;

	-- Get length
	local length, multiplier = 0, 1;
	repeat
		local digit = self:read_bytes(1):byte();
		length = length + bit.band(digit, 0x7f)*multiplier;
		multiplier = multiplier*128;
	until bit.band(digit, 0x80) == 0;
	packet.length = length;
	if packet.type == "connect" then
		if self:read_string() ~= "MQIsdp" then
			module:log("warn", "Unexpected packet signature!");
			packet.type = nil; -- Invalid packet
		else
			packet.version = self:read_bytes(1):byte();
			packet.connect_flags = self:read_bytes(1):byte();
			packet.keepalive_timer = self:read_bytes(1):byte();
			length = length - 11;
		end
	elseif packet.type == "publish" then
		packet.topic = self:read_string();
		length = length - (#packet.topic+2);
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
	elseif packet.type == "subscribe" then
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
		local topics = {};
		while length > 0 do
			local topic, len = self:read_string();
			table.insert(topics, topic);
			self:read_bytes(1); -- QoS not used
			length = length - (len+1);
		end
		packet.topics = topics;
	end
	if length > 0 then
		packet.data = self:read_bytes(length);
	end
	return packet;
end

local function new_parser(self)
	return coroutine.wrap(function (data)
		self.buffer = data;
		while true do
			data = coroutine.yield(self:read_packet());
			module:log("debug", "Parser: %d new bytes", #data);
			self.buffer = (self.buffer or "")..data;
		end
	end);
end

function stream_mt:feed(data)
	module:log("debug", "Feeding %d bytes", #data);
	local packets = {};
	local packet = self.parser(data);
	while packet do
		module:log("debug", "Received packet");
		table.insert(packets, packet);
		packet = self.parser("");
	end
	module:log("debug", "Returning %d packets", #packets);
	return packets;
end

local function new_stream()
	local stream = setmetatable({}, stream_mt);
	stream.parser = new_parser(stream);
	return stream;
end

local function serialize_packet(packet)
	local type_num = 0;
	for i, v in ipairs(packet_type_codes) do -- FIXME: I'm so tired right now.
		if v == packet.type then
			type_num = i;
			break;
		end
	end
	local header = string.char(bit.lshift(type_num, 4));

	if packet.type == "publish" then
		local topic = packet.topic or "";
		packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
	elseif packet.type == "suback" then
		local t = {};
		for _, topic in ipairs(packet.topics) do
			table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
		end
		packet.data = table.concat(t);
	end

	-- Get length
	local length = #(packet.data or "");
	repeat
		local digit = length%128;
		length = math.floor(length/128);
		if length > 0 then
			digit = bit.bor(digit, 0x80);
		end
		header = header..string.char(digit); -- FIXME: ...
	until length <= 0;

	return header..(packet.data or "");
end

return {
	new_stream = new_stream;
	serialize_packet = serialize_packet;
};