From 6be9cecaa294ca11d46ca8897ec94cd18bd10f14 Mon Sep 17 00:00:00 2001 From: Nathaniel Wesley Filardo Date: Fri, 4 Oct 2019 18:03:48 +0100 Subject: [PATCH] nwfmqtt: refactor fifo interposition Expose the wrapper as its own operation and switch to a single FIFO for all of sub/unsub/pub operations. --- net/nwfmqtt.lua | 105 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 33 deletions(-) diff --git a/net/nwfmqtt.lua b/net/nwfmqtt.lua index feb53f8..0c2ae97 100644 --- a/net/nwfmqtt.lua +++ b/net/nwfmqtt.lua @@ -1,6 +1,77 @@ -- DEPENDS: fifo, file, mqtt, sjson; nwfnet local nwfnet = require "nwfnet" local self = {} + +-- wrap an existing mqtt client. This serializes the command directives +-- (pub/sub/unsub) while making the callbacks useful on a per-call basis and +-- pushes the asynchronous callbacks (connect/disconn/message/overflow) over +-- the nwfnet broadcast chains (runnet/runmqtt). An optional second argument +-- can replace the default global nwfnet chains. +-- +-- The underlying C implementation does not do well with multiple +-- outstanding requests of the same sort. In order to permit modular +-- use of a mqtt connection, serialize and handle callbacks here. A single +-- fifo is used to reduce memory overhead. +-- +-- This makes heavy use of the fifo module and its replacement and phantom +-- element facilities: each command is queued as a thunk on the fifo and, +-- when popped, replaces itself with a thunk for its callback (if present). +-- This latter thunk signals to the fifo that it is phantom, thereby +-- advancing the queue to the next command, if any. + +function self.wrap(m, nn) + local mprime = {} + + nn = nn or require "nwfnet" + + m:on("connect", function(_) nn:runnet("mqttconn",mprime) end) + m:on("offline", function(_) nn:runnet("mqttdscn",mprime) end) + m:on("message", function(_,t,p) nn:runmqtt(mprime,t,p,false) end) + m:on("overflow", function(_,t,p) nn:runmqtt(mprime,t,p,true) end) + + local cfifo = (require "fifo").new() + local function unthunk(thunk) return thunk() end + local function cfifodq() cfifo:dequeue(unthunk) end + + m:on("puback" , cfifodq) + m:on("suback" , cfifodq) + m:on("unsuback", cfifodq) + + mprime._m = m + mprime._f = cfifo + mprime.close = function(mp,...) mp._m:close() end -- indirect + mprime.lwt = function(mp,...) mp._m:lwt(...) end -- indirect + mprime.connect = function(mp,...) mp._m:connect(...) end -- indirect + + mprime.subscribe = function(mp, a1, a2, a3) + if type(a1) == "table" then + mp._f:queue(function() + mp._m:subscribe(a1) + return (a2 and function() a2(mp); return nil, true end) + end, unthunk) + else + mp._f:queue(function() + mp._m:subscribe(a1, a2) + return (a3 and function() a3(mp); return nil, true end) + end, unthunk) + end + end + mprime.unsubscribe = function(mp, tt, cb) + mp._f:queue(function() + mp._m:unsubscribe(tt) + return (cb and function() cb(mp); return nil, true end) + end, unthunk) + end + mprime.publish = function(mp, t, p, q, r, cb) + mp._f:queue(function() + mp._m:publish(t, p, q, r) + return (cb and function() cb(mp); return nil, true end) + end, unthunk) + end + + return mprime +end + function self.mkclient(cf) -- construct a client with config from json file cf local c, k, u, p, l if file.open(cf) then @@ -14,40 +85,8 @@ function self.mkclient(cf) -- construct a client with config from json file cf k = k or 1500 l = l or 0 local m = mqtt.Client(c,k,u,p,l) - local mprime = {} - - m:on("connect", function(_) nwfnet:runnet("mqttconn",mprime) end) - m:on("offline", function(_) nwfnet:runnet("mqttdscn",mprime) end) - m:on("message", function(_,t,m) nwfnet:runmqtt(mprime,t,m,false) end) - m:on("overflow", function(_) nwfnet:runnet(mprime,t,m,true) end) - - local fifoc = require "fifo" - local function unthunk(thunk) thunk() end - local sfifo = fifoc.new() - local ufifo = fifoc.new() - local pfifo = fifoc.new() - - m:on("puback" , function() pfifo:dequeue(unthunk) end) - m:on("suback" , function() sfifo:dequeue(unthunk) end) - m:on("unsuback", function() ufifo:dequeue(unthunk) end) - - mprime.close = function(_,...) m:close() end -- indirect - mprime.lwt = function(_,...) m:lwt(...) end -- indirect - mprime.connect = function(_,...) m:connect(...) end -- indirect - mprime.subscribe = function(_,...) - local t = { ... } - sfifo:queue(function() m:subscribe(unpack(t)) end, unthunk) - end - mprime.unsubscribe = function(_,...) - local t = { ... } - ufifo:queue(function() m:unsubscribe(unpack(t)) end, unthunk) - end - mprime.publish = function(_,...) - local t = { ... } - pfifo:queue(function() m:publish(unpack(t)) end, unthunk) - end - return mprime, u, c + return self.wrap(m), u, c end return nil end -- 2.50.1