From 9fd67755baceba3065542061fd861fef7be915f0 Mon Sep 17 00:00:00 2001 From: Nathaniel Wesley Filardo Date: Sun, 7 Jul 2019 00:26:18 +0100 Subject: [PATCH] nwfmqtt: interpose on pub/sub/unsub with fifo Blech, but this way we don't confuse the underlying mqtt module by ever trying to, say, subscribe twice before the server's gotten back to us. --- net/nwfmqtt.lua | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/net/nwfmqtt.lua b/net/nwfmqtt.lua index 8e38865..feb53f8 100644 --- a/net/nwfmqtt.lua +++ b/net/nwfmqtt.lua @@ -1,4 +1,4 @@ --- DEPENDS: file, mqtt, sjson; nwfnet +-- DEPENDS: fifo, file, mqtt, sjson; nwfnet local nwfnet = require "nwfnet" local self = {} function self.mkclient(cf) -- construct a client with config from json file cf @@ -14,10 +14,40 @@ function self.mkclient(cf) -- construct a client with config from json file cf k = k or 1500 l = l or 0 local m = mqtt.Client(c,k,u,p,l) - m:on("connect", function(c) nwfnet:runnet("mqttconn",c) end) - m:on("offline", function(c) nwfnet:runnet("mqttdscn",c) end) - m:on("message", function(c,t,m) nwfnet:runmqtt(c,t,m) end) - return m, u, c + local mprime = {} + + m:on("connect", function(_) nwfnet:runnet("mqttconn",mprime) end) + m:on("offline", function(_) nwfnet:runnet("mqttdscn",mprime) end) + m:on("message", function(_,t,m) nwfnet:runmqtt(mprime,t,m,false) end) + m:on("overflow", function(_) nwfnet:runnet(mprime,t,m,true) end) + + local fifoc = require "fifo" + local function unthunk(thunk) thunk() end + local sfifo = fifoc.new() + local ufifo = fifoc.new() + local pfifo = fifoc.new() + + m:on("puback" , function() pfifo:dequeue(unthunk) end) + m:on("suback" , function() sfifo:dequeue(unthunk) end) + m:on("unsuback", function() ufifo:dequeue(unthunk) end) + + mprime.close = function(_,...) m:close() end -- indirect + mprime.lwt = function(_,...) m:lwt(...) end -- indirect + mprime.connect = function(_,...) m:connect(...) end -- indirect + mprime.subscribe = function(_,...) + local t = { ... } + sfifo:queue(function() m:subscribe(unpack(t)) end, unthunk) + end + mprime.unsubscribe = function(_,...) + local t = { ... } + ufifo:queue(function() m:unsubscribe(unpack(t)) end, unthunk) + end + mprime.publish = function(_,...) + local t = { ... } + pfifo:queue(function() m:publish(unpack(t)) end, unthunk) + end + + return mprime, u, c end return nil end -- 2.50.1