]> hydra-www.ietfng.org Git - acmetensortoys-esp-lua_core/commitdiff
nwfmqtt: refactor fifo interposition
authorNathaniel Wesley Filardo <nwfilardo@gmail.com>
Fri, 4 Oct 2019 17:03:48 +0000 (18:03 +0100)
committerNathaniel Wesley Filardo <nwfilardo@gmail.com>
Thu, 26 Dec 2019 00:35:08 +0000 (00:35 +0000)
Expose the wrapper as its own operation and switch to a single FIFO for
all of sub/unsub/pub operations.

net/nwfmqtt.lua

index feb53f877ae99b98a29ffb38b760fab09ec4248a..0c2ae970d62d70edeaff0afecf76869a5cf28680 100644 (file)
@@ -1,6 +1,77 @@
 -- DEPENDS: fifo, file, mqtt, sjson; nwfnet
 local nwfnet = require "nwfnet"
 local self = {}
+
+-- wrap an existing mqtt client.  This serializes the command directives
+-- (pub/sub/unsub) while making the callbacks useful on a per-call basis and
+-- pushes the asynchronous callbacks (connect/disconn/message/overflow) over
+-- the nwfnet broadcast chains (runnet/runmqtt).  An optional second argument
+-- can replace the default global nwfnet chains.
+--
+-- The underlying C implementation does not do well with multiple
+-- outstanding requests of the same sort.  In order to permit modular
+-- use of a mqtt connection, serialize and handle callbacks here.  A single
+-- fifo is used to reduce memory overhead.
+--
+-- This makes heavy use of the fifo module and its replacement and phantom
+-- element facilities: each command is queued as a thunk on the fifo and,
+-- when popped, replaces itself with a thunk for its callback (if present).
+-- This latter thunk signals to the fifo that it is phantom, thereby
+-- advancing the queue to the next command, if any.
+
+function self.wrap(m, nn) 
+  local mprime = {}
+
+  nn = nn or require "nwfnet"
+
+  m:on("connect", function(_) nn:runnet("mqttconn",mprime) end)
+  m:on("offline", function(_) nn:runnet("mqttdscn",mprime) end)
+  m:on("message", function(_,t,p) nn:runmqtt(mprime,t,p,false) end)
+  m:on("overflow", function(_,t,p) nn:runmqtt(mprime,t,p,true) end)
+
+  local cfifo = (require "fifo").new()
+  local function unthunk(thunk) return thunk() end
+  local function cfifodq() cfifo:dequeue(unthunk) end
+
+  m:on("puback"  , cfifodq)
+  m:on("suback"  , cfifodq)
+  m:on("unsuback", cfifodq)
+
+  mprime._m          = m
+  mprime._f          = cfifo
+  mprime.close       = function(mp,...) mp._m:close()          end -- indirect
+  mprime.lwt         = function(mp,...) mp._m:lwt(...)         end -- indirect
+  mprime.connect     = function(mp,...) mp._m:connect(...)     end -- indirect
+
+  mprime.subscribe   = function(mp, a1, a2, a3)
+      if type(a1) == "table" then
+        mp._f:queue(function()
+          mp._m:subscribe(a1)
+          return (a2 and function() a2(mp); return nil, true end)
+        end, unthunk)
+      else
+        mp._f:queue(function()
+          mp._m:subscribe(a1, a2)
+          return (a3 and function() a3(mp); return nil, true end)
+        end, unthunk)
+      end
+    end
+  mprime.unsubscribe = function(mp, tt, cb)
+      mp._f:queue(function()
+        mp._m:unsubscribe(tt)
+        return (cb and function() cb(mp); return nil, true end)
+      end, unthunk)
+    end
+  mprime.publish     = function(mp, t, p, q, r, cb)
+      mp._f:queue(function()
+        mp._m:publish(t, p, q, r)
+        return (cb and function() cb(mp); return nil, true end)
+      end, unthunk)
+    end
+
+  return mprime
+end
+
 function self.mkclient(cf) -- construct a client with config from json file cf
   local c, k, u, p, l
   if file.open(cf) then
@@ -14,40 +85,8 @@ function self.mkclient(cf) -- construct a client with config from json file cf
     k = k or 1500
     l = l or 0
     local m = mqtt.Client(c,k,u,p,l)
-    local mprime = {}
-
-    m:on("connect", function(_) nwfnet:runnet("mqttconn",mprime) end)
-    m:on("offline", function(_) nwfnet:runnet("mqttdscn",mprime) end)
-    m:on("message", function(_,t,m) nwfnet:runmqtt(mprime,t,m,false) end)
-    m:on("overflow", function(_) nwfnet:runnet(mprime,t,m,true) end)
-
-    local fifoc = require "fifo"
-    local function unthunk(thunk) thunk() end
-    local sfifo = fifoc.new()
-    local ufifo = fifoc.new()
-    local pfifo = fifoc.new()
-
-    m:on("puback"  , function() pfifo:dequeue(unthunk) end)
-    m:on("suback"  , function() sfifo:dequeue(unthunk) end)
-    m:on("unsuback", function() ufifo:dequeue(unthunk) end)
-
-    mprime.close       = function(_,...) m:close()          end -- indirect
-    mprime.lwt         = function(_,...) m:lwt(...)         end -- indirect
-    mprime.connect     = function(_,...) m:connect(...)     end -- indirect
-    mprime.subscribe   = function(_,...)
-        local t = { ... }
-        sfifo:queue(function() m:subscribe(unpack(t)) end, unthunk)
-      end
-    mprime.unsubscribe = function(_,...)
-        local t = { ... }
-        ufifo:queue(function() m:unsubscribe(unpack(t)) end, unthunk)
-      end
-    mprime.publish     = function(_,...)
-        local t = { ... }
-        pfifo:queue(function() m:publish(unpack(t)) end, unthunk)
-      end
 
-    return mprime, u, c
+    return self.wrap(m), u, c
   end
   return nil
 end