]> hydra-www.ietfng.org Git - acmetensortoys-esp-lua_core/commitdiff
nwfmqtt: interpose on pub/sub/unsub with fifo
authorNathaniel Wesley Filardo <nwfilardo@gmail.com>
Sat, 6 Jul 2019 23:26:18 +0000 (00:26 +0100)
committerNathaniel Wesley Filardo <nwfilardo@gmail.com>
Sun, 7 Jul 2019 00:40:59 +0000 (01:40 +0100)
Blech, but this way we don't confuse the underlying mqtt module by
ever trying to, say, subscribe twice before the server's gotten
back to us.

net/nwfmqtt.lua

index 8e38865a32c04eb5a8b784f543a60aea9611065d..feb53f877ae99b98a29ffb38b760fab09ec4248a 100644 (file)
@@ -1,4 +1,4 @@
--- DEPENDS: file, mqtt, sjson; nwfnet
+-- DEPENDS: fifo, file, mqtt, sjson; nwfnet
 local nwfnet = require "nwfnet"
 local self = {}
 function self.mkclient(cf) -- construct a client with config from json file cf
@@ -14,10 +14,40 @@ function self.mkclient(cf) -- construct a client with config from json file cf
     k = k or 1500
     l = l or 0
     local m = mqtt.Client(c,k,u,p,l)
-    m:on("connect", function(c) nwfnet:runnet("mqttconn",c) end)
-    m:on("offline", function(c) nwfnet:runnet("mqttdscn",c) end)
-    m:on("message", function(c,t,m) nwfnet:runmqtt(c,t,m) end)
-    return m, u, c
+    local mprime = {}
+
+    m:on("connect", function(_) nwfnet:runnet("mqttconn",mprime) end)
+    m:on("offline", function(_) nwfnet:runnet("mqttdscn",mprime) end)
+    m:on("message", function(_,t,m) nwfnet:runmqtt(mprime,t,m,false) end)
+    m:on("overflow", function(_) nwfnet:runnet(mprime,t,m,true) end)
+
+    local fifoc = require "fifo"
+    local function unthunk(thunk) thunk() end
+    local sfifo = fifoc.new()
+    local ufifo = fifoc.new()
+    local pfifo = fifoc.new()
+
+    m:on("puback"  , function() pfifo:dequeue(unthunk) end)
+    m:on("suback"  , function() sfifo:dequeue(unthunk) end)
+    m:on("unsuback", function() ufifo:dequeue(unthunk) end)
+
+    mprime.close       = function(_,...) m:close()          end -- indirect
+    mprime.lwt         = function(_,...) m:lwt(...)         end -- indirect
+    mprime.connect     = function(_,...) m:connect(...)     end -- indirect
+    mprime.subscribe   = function(_,...)
+        local t = { ... }
+        sfifo:queue(function() m:subscribe(unpack(t)) end, unthunk)
+      end
+    mprime.unsubscribe = function(_,...)
+        local t = { ... }
+        ufifo:queue(function() m:unsubscribe(unpack(t)) end, unthunk)
+      end
+    mprime.publish     = function(_,...)
+        local t = { ... }
+        pfifo:queue(function() m:publish(unpack(t)) end, unthunk)
+      end
+
+    return mprime, u, c
   end
   return nil
 end