-- (placeholders for) callbacks to observers, that cannot otherwise act as
-- ordinary fifo elements do.
--
+-- k is also given a second argument, a boolean indicating that this is the
+-- last element in the queue.
+--
-- If the queue is empty, do not invoke k but flag it to enable immediate
-- execution at the next call to queue.
--
local function dequeue(q,k)
if #q > 0
then
- local new, again = k(q[1])
+ local new, again = k(q[1], #q == 1)
if new == nil
then table.remove(q,1)
if again then return dequeue(q, k) end -- note tail call
-- Wrap a two-staged fifo around a socket's send, borrowing TerryE's
-- scheme from NodeMCU's lua_examples/telnet/telnet.lua .
+--
+-- Our fifos can take functions; these can be useful for either lazy
+-- generators or callbacks for parts of the stream having been sent.
local BIGTHRESH = 256 -- how big is a "big" string?
local SPLITSLOP = 16 -- any slop in the big question?
local FSMALLLIM = 32 -- maximum number of small strings held
+local COALIMIT = 3
local concat = table.concat
local insert = table.insert
+local gc = collectgarbage
local fifo = OVL.fifo()
return function(sock)
- local ssend = function(s)
+ local fsmall, lsmall, fbig = {}, 0, fifo()
+
+ local ssla, sslan = nil, 0
+ local ssend = function(s,islast)
ns = nil
- if type(s) == "function" then s, ns = s() end
+
+ -- Optimistically, try coalescing FIFO dequeues. But, don't try to
+ -- coalesce function outputs, since functions might be staging their
+ -- execution on the send event implied by being called.
+
+ if type(s) == "function" then
+ if sslan ~= 0 then
+ sock:send(ssla)
+ ssla, sslan = nil, 0; gc()
+ return s, false -- stay as is and wait for :on("sent")
+ end
+ s, ns = s()
+ elseif type(s) == "string" and sslan < COALIMIT then
+ if sslan == 0
+ then ssla, sslan = s, 1
+ else ssla, sslan = ssla .. s, sslan + 1
+ end
+ if islast then
+ -- this is shipping; if there's room, steal the small fifo, too
+ if sslan < COALIMIT then
+ sock:send(ssla .. concat(fsmall))
+ fsmall, lsmall = {}, 0
+ else
+ sock:send(ssla)
+ end
+ ssla, sslan = "", 0; gc()
+ return nil, false
+ else
+ return nil, true
+ end
+ end
+
if s ~= nil then
- sock:send(s)
+ if sslan == 0 then sock:send(s) else sock:send(ssla .. s) end
+ ssla, sslan = nil, 0; gc()
return ns or nil, false
+ elseif sslan ~= 0 then
+ assert (ns == nil)
+ sock:send(ssla)
+ ssla, sslan = nil, 0; gc()
+ return nil, false
else
+ assert (ns == nil)
return nil, true
end
end
- local fsmall, lsmall, fbig = {}, 0, fifo()
-- Move fsmall to fbig; might send if fbig empty
local function promote()
-- don't sweat the petty things
if s == nil or s == "" then return end
- -- Our fifos can take functions; these can be useful for either lazy
- -- generators or callbacks for parts of the stream having been sent.
- -- Go ahead and queue this thing in the right place.
+ -- Function? Go ahead and queue this thing in the right place.
if type(s) == "function" then promote(); fbig:queue(s, ssend); return; end
+ s = tostring(s)
+
-- small fifo would overfill? promote it
if lsmall + #s > BIGTHRESH or #fsmall >= FSMALLLIM then promote() end
-- big string? chunk and queue big components immediately
- -- behind any promotion that just took place
+ -- behind any promotion that just took place, but cork sending in
+ -- case we're the head of line
+ local corked = false
while #s > BIGTHRESH + SPLITSLOP do
local pfx
- pfx, s = s:sub(1,256), s:sub(257)
- fbig:queue(pfx, ssend)
+ pfx, s = s:sub(1,BIGTHRESH), s:sub(BIGTHRESH+1)
+ fbig:queue(pfx, function(t) corked = true; return t end)
end
- -- Big string? queue
+ -- Big string? queue and maybe tx now
if #s > BIGTHRESH then fbig:queue(s, ssend)
- -- small and empty line; start txing now. (no corking)
+ -- small and empty line; start txing now. (maybe no corking)
elseif fbig._go and lsmall == 0 then fbig:queue(s, ssend)
-- small and queue already moving
else insert(fsmall, s) ; lsmall = lsmall + #s
end
+
+ -- if it happened that we corked the transmission above, uncork now
+ if corked then sendnext() end
end
end
OVL.fifo = function() return dofile("./fifo/fifo.lua") end
package.loaded["fifosock"] = dofile("./net/fifosock.lua")
-verbose = false
+verbose = 0
-vprint = verbose and print or function() end
+vprint = (verbose > 0) and print or function() end
outs = {}
fakesock = {
cb = nil,
on = function(this, _, cb) vprint("CBSET") this.cb = cb end,
- send = function(this, s) vprint("SEND", verbose and s) table.insert(outs, s) end,
+ send = function(this, s) vprint("SEND", (verbose > 1) and s) table.insert(outs, s) end,
}
function sent() vprint("CB") fakesock.cb() end
fsend = require "fifosock" (fakesock)
+function nocoal() fsend(function() return nil end) end
function fcheck(x)
- vprint ("CHECK", verbose and x)
+ vprint ("CHECK", (verbose > 1) and x)
assert (#outs > 0)
assert (x == outs[1])
table.remove(outs, 1)
-- Hit default FSMALLLIM while building up
fsendc("abracadabra lots small")
-for i = 1, 34 do fsend("a") end
+for i = 1, 32 do fsend("a") end
+nocoal()
+for i = 1, 4 do fsend("a") end
sent() ; fcheck(string.rep("a", 32))
-sent() ; fcheck("aa")
+sent() ; fcheck(string.rep("a", 4))
sent() ; fchecke()
-- Hit string length while building up
fsendc("abracadabra overlong")
for i = 1, 10 do fsend(string.rep("a",32)) end
-sent() ; fcheck(string.rep("a", 256))
-sent() ; fcheck(string.rep("a", 64))
+sent() ; fcheck(string.rep("a", 320))
sent() ; fchecke()
-- Hit neither before sending a big string
fsendc("abracadabra mid long")
for i = 1, 6 do fsend(string.rep("a",32)) end
fsend(string.rep("b", 256))
+nocoal()
for i = 1, 6 do fsend(string.rep("c",32)) end
-sent() ; fcheck(string.rep("a", 192))
-sent() ; fcheck(string.rep("b", 256))
+sent() ; fcheck(string.rep("a", 192) .. string.rep("b", 256))
sent() ; fcheck(string.rep("c", 192))
sent() ; fchecke()
--- send a huge string
-fsend(string.rep("a",256) .. string.rep("b", 256) .. string.rep("c", 260))
-fcheck(string.rep("a",256))
-sent() ; fcheck(string.rep("b",256))
-sent() ; fcheck(string.rep("c",260))
+-- send a huge string, verify that it coalesces
+fsendc(string.rep("a",256) .. string.rep("b", 256) .. string.rep("c", 260))
+sent() ; fchecke()
+
+-- send a huge string, verify that it coalesces save for the short bit at the end
+fsend(string.rep("a",256) .. string.rep("b", 256) .. string.rep("c", 256) .. string.rep("d",256))
+fsend("e")
+fcheck(string.rep("a",256) .. string.rep("b", 256) .. string.rep("c", 256))
+sent() ; fcheck(string.rep("d",256) .. "e")
+sent() ; fchecke()
+
+-- send enough that our 4x lookahead still leaves something in the queue
+fsend(string.rep("a",512) .. string.rep("b", 512) .. string.rep("c", 512))
+fcheck(string.rep("a",512) .. string.rep("b", 512))
+sent() ; fcheck(string.rep("c",512))
sent() ; fchecke()
-- test a lazy generator