--- DEPENDS: file, mqtt, sjson; nwfnet
+-- DEPENDS: fifo, file, mqtt, sjson; nwfnet
local nwfnet = require "nwfnet"
local self = {}
function self.mkclient(cf) -- construct a client with config from json file cf
k = k or 1500
l = l or 0
local m = mqtt.Client(c,k,u,p,l)
- m:on("connect", function(c) nwfnet:runnet("mqttconn",c) end)
- m:on("offline", function(c) nwfnet:runnet("mqttdscn",c) end)
- m:on("message", function(c,t,m) nwfnet:runmqtt(c,t,m) end)
- return m, u, c
+ local mprime = {}
+
+ m:on("connect", function(_) nwfnet:runnet("mqttconn",mprime) end)
+ m:on("offline", function(_) nwfnet:runnet("mqttdscn",mprime) end)
+ m:on("message", function(_,t,m) nwfnet:runmqtt(mprime,t,m,false) end)
+ m:on("overflow", function(_) nwfnet:runnet(mprime,t,m,true) end)
+
+ local fifoc = require "fifo"
+ local function unthunk(thunk) thunk() end
+ local sfifo = fifoc.new()
+ local ufifo = fifoc.new()
+ local pfifo = fifoc.new()
+
+ m:on("puback" , function() pfifo:dequeue(unthunk) end)
+ m:on("suback" , function() sfifo:dequeue(unthunk) end)
+ m:on("unsuback", function() ufifo:dequeue(unthunk) end)
+
+ mprime.close = function(_,...) m:close() end -- indirect
+ mprime.lwt = function(_,...) m:lwt(...) end -- indirect
+ mprime.connect = function(_,...) m:connect(...) end -- indirect
+ mprime.subscribe = function(_,...)
+ local t = { ... }
+ sfifo:queue(function() m:subscribe(unpack(t)) end, unthunk)
+ end
+ mprime.unsubscribe = function(_,...)
+ local t = { ... }
+ ufifo:queue(function() m:unsubscribe(unpack(t)) end, unthunk)
+ end
+ mprime.publish = function(_,...)
+ local t = { ... }
+ pfifo:queue(function() m:publish(unpack(t)) end, unthunk)
+ end
+
+ return mprime, u, c
end
return nil
end