Show Source

This is a miniature, woefully incomplete implementation of Atze van der Ploeg’s and Koen Claessen’s Practical Principled FRP (http://www.cse.chalmers.se/~atze/papers/prprfrp.pdf).

I deliberately present things a little differently from the paper. Their library (at https://github.com/atzeus/FRPNow) offers yet another presentation; much of the errata I point out below is, I believe, also fixed (differently) in the library.

This file’s raw source is a literate Haskell file. Loading it, one can run the paper’s example with, e.g., runNow (test64 10).

Header Material

As per usual, we begin with a pile of pragmas and imports. The module exports are organized by figure:

> {-# LANGUAGE ExistentialQuantification #-}
> {-# LANGUAGE FlexibleContexts #-}
> {-# LANGUAGE GeneralizedNewtypeDeriving #-}
> {-# LANGUAGE InstanceSigs #-}
> {-# LANGUAGE Rank2Types #-}
> {-# LANGUAGE ScopedTypeVariables #-}
>
> module Main (
>  -- Figure 1
>  E, B, never, switch, whenJust,
>
>  -- Figure 3
>  Now, async, sample, planNow, runNow,
>
>  -- Additional, harmless to export
>  minTime, whenB,
>
>  -- Tests, exported to suppress export warnings
>  testImmediateExit, testHang, testOneTick, test64
> ) where
>
> import           Control.Concurrent
> import           Control.Monad
> import           Control.Monad.Trans
> import           Control.Monad.Reader
> import           Control.Monad.Writer
> import           Data.Bifunctor
> import           Data.IORef
> import qualified Data.Sequence as S
> import           Debug.Trace
> import           System.CPUTime
> import           System.IO.Unsafe
> import           System.Mem.Weak

Generic Memoization

“Memoization” here is essentially the same as graph reduction / the STG machine’s operation. One might imagine that there would be a way to hook that existing machinery, in fact, though I don’t know of any such.

A generic encoding of memoization, abstracted from memoEIO and memoBIO in the paper:

> memoIO :: forall a b m .
>           (MonadIO m)
>        => (m b -> a)      -- ^ constructor (hide m)
>        -> (a -> m b)      -- ^ destructor (reveal m)
>        -> (b -> a)        -- ^ "unrun" result b into an a; should be
>                           --   observably equivalent to (constructor . pure)
>        -> a               -- ^ initial object
>        -> IO a            -- ^ IO action returning new, memoized object
> memoIO i o u a = newIORef a >>= pure . usePrev
>  where
>   usePrev :: IORef a -> a
>   usePrev r = i $ do
>     p <- liftIO (readIORef r)
>     res <- o p
>     liftIO (writeIORef r (u res))
>     pure res
>
> memo :: forall a b . (M b -> a) -> (a -> M b) -> (b -> a) -> a -> a
> memo i o u = unsafePerformIO . memoIO i o u

Sections 6.1 and 6.2: Events

An Event is a M-computation which either yields a value (the event “happened”) or another Event (the event “has not happened yet”); these are to be used linearly; that is, whenever you do get your hands on a new Event, drop references to the earlier ones.:

> newtype E a = E { runE :: M (Either (E a) a) }

We need to apply the usual memoizing optimization:

> unrunE :: Either (E a) a -> E a
> unrunE (Left e) = e
> unrunE (Right a) = pure a
>
> memoE :: E a -> E a
> memoE = memo E runE unrunE

We will also need an operation that explicitly punts an Event to the “immediate future” (i.e. the next loop of the driver). This will be used to ensure that things “always take (some amount of) time”. Note that this is not in the paper:

> delayEpsilon :: E a -> E a
> delayEpsilon e = E (pure (Left e))

And now we can state a bunch of properties of Events:

> instance Functor E where
>   fmap f = E . fmap (bimap (fmap f) f) . runE
>
> instance Applicative E where
>   pure x = E (pure (Right x))
>   (<*>) = ap
>
> -- | AKA (>>=')
> pureBindE :: E a -> (a -> E b) -> E b
> pureBindE (E a) f = E $
>   a >>= either
>            -- the input hasn't happened yet; it gave us another event
>            -- to wait on, so do that.
>            (pure . Left . (`pureBindE` f))
>            -- or it has, and so we have our a; just feed it to our
>            -- function and get the resulting event
>            (runE . f)
>
> instance Monad E where
>   m >>= f = memoE (m `pureBindE` f)

Some things just never happen:

> never :: E a
> never = E (pure (Left never))

Given two events, form an event which happens as soon as either one happens:

> minTime :: E x -> E y -> E ()
> minTime l r = E (merge <$> runE l <*> runE r)
>  where
>   -- If either event has finished, discard its output and finish
>   merge (Right _) _         = Right ()
>   merge _         (Right _) = Right ()
>   -- If both events are still pending (and have given us new
>   -- continuations to wait on), wait on those.
>   merge (Left l') (Left r') = Left (minTime l' r')

Sections 6.3 and 6.4: Behaviors

A Behavior is a computation which yields a value now and an event which indicates when the behavior changes:

> newtype B a = B { runB :: M (a, E (B a)) }

Here’s that memoization stunt again:

> unrunB :: (a, E (B a)) -> B a
> unrunB (h,t) = B $ do
>   -- Has the switch event happened yet?
>   r <- runE t
>   case r of
>     Right b -> runB b      -- yes, so act like its yield
>     Left t' -> pure (h,t') -- no, so keep acting like us
>
> memoB :: B a -> B a
> memoB = memo B runB unrunB

fmap should be written like this:

> pureFmapB :: (a -> b) -> B a -> B b
> pureFmapB f = B . fmap (\(a,b) -> (f a, fmap (fmap f) b)) . runB

But instead it’s memoizing:

> instance Functor B where
>   fmap f = memoB . pureFmapB f

Given events which produce behaviors, try to behave like the right when it happens; if the left happens first, behave like that until the right happens. The implementation here is more direct than the paper:

> switchE :: E (B a) -> E (B a) -> E (B a)
> switchE l r = E (merge <$> runE l <*> runE r)
>  where
>   merge _         (Right b) = Right b -- right happened, take it
>   merge (Right b) (Left r') = -- l happened, r still pending (as r')
>                               -- return a behavior which is like l
>                               -- until r' happens
>                               Right (b `switch'` r')
>   merge (Left l') (Left r') = -- Neither happened; wait.
>                               Left (switchE l' r')

The paper uses

switchE l r = ((pure undefined `switch'` l) `switch'` r)
              <$ minTime l r

which should make sense in light of the above expansion.

b `switch'` e behaves like b until e happens, at which point it behaves like e’s yield. It can be thought of as a switchE where we know that the left event has already happened:

> switch' :: B a -> E (B a) -> B a
> switch' b e = B $ do
>   -- run e to see if it has happened yet
>   r <- runE e
>   case r of
>     Right b' -> runB b'  -- yes; behave like it
>     Left e'  ->          -- no; behave like b until e'
>                          -- that is, yield the same value (id)
>                          --  but change behavior when either b or e'
>                          --  says to do so!
>       (bimap id (`switchE` e')) <$> runB b

And of course we memoize it:

> switch :: B a -> E (B a) -> B a
> switch b e = memoB (switch' b e)

The behavior’s monad can be understood perhaps best through join. Given a behavior-generating-behavior, behave like the behaviors it generates through time:

> joinB' :: forall a . B (B a) -> B a
> joinB' m = B $ do
>   -- Run the outer B "now", to obtain a behavior h and switching event,
>   -- which will generate a new behavior-behavior.
>   (h :: B a, t :: E (B (B a))) <- runB m
>   -- So armed, behave like that behavior h until its switching event
>   -- happens...
>   runB $ h `switch'`
>          -- at which point, we will be given a new
>          -- behavior-generating-behavior, which we should sample
>          -- *then* and so on.
>          (joinB' <$> t)

And memoized:

> joinB :: B (B a) -> B a
> joinB = memoB . joinB'

Now we’re finally ready to say something about the properties of Behaviors:

> instance Applicative B where
>   pure x = B $ pure (x, never)
>   (<*>) = ap
>
> instance Monad B where
>   -- Given an alpha-behavior (that is, a time-indexed family of alphas)
>   -- and an alpha-indexed family of beta-behaviors, behave like the
>   -- appropriate beta-behavior through time.
>   (>>=) :: B a -> (a -> B b) -> B b
>   m >>= f = joinB (f <$> m)

I’m going to define a combinator to capture some confusing behavior in whenJust': Given an Event which yields the pieces of an Event-valued-Behavior, yield an Event-valued-Behavior whose value does not happen until after the original Event, and whose switching is likewise delayed. Note that this is emphatically not of the form B $ runE e >>= ... because we want to yield a Behavior whose value waits, not wait to yield a behavior. Thankfully, the types steer us in the right direction:

> awaitEB :: E (E b, E (B (E b))) -> B (E b)
> awaitEB e = B $ pure (e >>= fst, e >>= snd)

And now for the centerpiece, whenJust:

> whenJust' :: B (Maybe a) -> B (E a)
> whenJust' b = B $ do
>   -- Grab the behavior's value and switching event now.
>   (h :: Maybe a, t :: E (B (Maybe a))) <- runB b
>   -- And apply ourselves in the future to the switching event's yield:
>   let nextTime :: E (B (E a)) = whenJust' <$> t
>   case h of
>     Just a -> -- if it's returning Just a now, then
>       -- we should yield
>       pure
>         ( -- an event that happens now whose yield is a
>           pure a
>         , -- and whose switching behavior is to keep watching b's
>           -- switching event.
>          nextTime
>         )
>
>     Nothing -> do -- on the other hand, it might be Nothing.
>       --
>       -- We still need to yield, *now*,
>       --
>       --   - a value (of type E a)
>       --   - and a switching event (of type E (B (E a)))
>       --
>       -- That is, we want to create two events which will wait around for
>       -- the first alpha of the stream defined by b:
>       --
>       --   - an Event which yields that first alpha
>       --   - an Event which yields a behavior that continues whenJust'ing
>       --
>       -- So we clearly need to do something with nextTime, because that
>       -- is all we have.  We could crack it open...
>       let nextM :: E (M (E a, E (B (E a)))) = runB <$> nextTime
>       -- That has the right pieces in it... if only we could get at them.
>       -- Oh, but we could exchange E and M using that funny "plan" thing.
>       eNext :: E (E a, E (B (E a))) <- (planM nextM :: M (E (E a, E (B (E a)))))
>       -- Ah ha, eNext ::~ E (B (E a)), so now all we need to do is make
>       -- the behavior which does the waiting thing.
>       runB $ awaitEB eNext
>
> whenJust :: B (Maybe a) -> B (E a)
> whenJust = memoB . whenJust'

Section 6.6: Primitive Events

Figure 5, instantiated as slapdashedly as possible

> data Round = Round
>   { -- | As returned by System.CPUTime.getCPUTime
>     round_start_time :: Integer
>   , round_clock      :: Clock
>   }
> data Clock = Clock
>   { -- | What time is it now?
>     clock_now        :: IO Round
>   , -- |
>     clock_last_round :: IORef Round
>   , -- |
>     clock_block      :: MVar ()
>   }
> data PrimE a = PrimE
>   { -- | Whose clock are we on, anyway?
>     prime_clock    :: Clock
>   , -- | We've either happened or not; if we did,
>     --   we happened at some point and yielded a value
>     prime_happened :: MVar (Round, a)
>   }
>
> newClock :: IO Clock
> newClock = mfix $ \self -> do
>   rr <- newIORef (Round 0 self)
>   c  <- newEmptyMVar
>   let now = (\t -> Round t self) <$> getCPUTime
>   pure (Clock now rr c)
>
> spawn :: IO a -> Clock -> IO (PrimE a)
> spawn act clock = do
>   mv  <- newEmptyMVar
>   _   <- forkIO (go mv)
>   pure (PrimE clock mv)
>  where
>   go mv = do
>     res <- act
>     now <- clock_now clock
>     putMVar mv (now, res)               -- Announce our success
>     _ <- tryPutMVar (clock_block clock) ()   -- Wake the clock
>     pure ()
>
> curRound :: Clock -> IO Round
> curRound = readIORef . clock_last_round
>
> waitNextRound :: Clock -> IO ()
> waitNextRound clock = do
>   () <- takeMVar (clock_block clock)      -- wait until some primitive event posts
>   -- do
>   --   lr <- readIORef (clock_last_round clock)
>   --   trace ("waitNextRound " ++ show (round_start_time lr)) $ return ()
>   now <- clock_now clock                  -- what time is it?
>   writeIORef (clock_last_round clock) now -- start the new round
>
> observeAt :: PrimE a -> Round -> Maybe a
> observeAt e r = unsafePerformIO $
>   tryReadMVar (prime_happened e) >>=
>   maybe (pure Nothing)
>         (\(rh,vh) -> pure $
>            if (round_start_time rh <= round_start_time r) -- XXX assert equal clocks
>             then Just vh
>             else Nothing)

Section 6.7

> getRound :: (MonadIO m, MonadReader Clock m) => m Round
> getRound = ask >>= liftIO . curRound
>
> toE :: PrimE a -> E a
> toE p = E (maybe (Left (toE p)) Right . (p `observeAt`) <$> getRound)
>
> endRound :: (MonadIO m, MonadReader Clock m) => m ()
> endRound = ask >>= liftIO . waitNextRound

Figure 6: References

> data Ref a = RefStrong (IORef a)
>            | RefWeak   (Weak a)
>
> makeStrongRef, makeWeakRef :: a -> IO (Ref a)
> makeStrongRef = fmap RefStrong . newIORef
> makeWeakRef = fmap RefWeak . flip mkWeakPtr Nothing
>
> deRef :: Ref a -> IO (Maybe a)
> deRef (RefStrong r) = Just <$> readIORef r
> deRef (RefWeak r) = deRefWeak r
>
> -- Not in the paper, used in some errata handling below
> makeLikewiseRef :: Ref b -> a -> IO (Ref a)
> makeLikewiseRef (RefStrong _) a = makeStrongRef a
> makeLikewiseRef (RefWeak _)   a = makeWeakRef   a

Section 6.7

Ah ha! Note that the actual implementation (https://github.com/atzeus/FRPNow/blob/master/Control/FRPNow/Core.hs) uses a different Plan type: type Plan = IORef (Either (Event (M a)) a) and so fixes the ERRATA below in a superior way. This version continues to work the version as given in the paper and makes a smaller patch.

We need Plans:

> data Plan a = Plan
>   { plan_event :: E (M a)
>   , plan_cache :: IORef (Maybe a)
>   }

The planToEv given in the paper does not (I believe) properly respect the linearity of the wait-on chain:

planToEv :: Plan a -> E a
planToEv (Plan ev ref) = E $
  -- Consult the cache!
  liftIO (readIORef ref) >>= flip maybe
    -- If we did this already, then the event already happened.
    (pure . Right)
    -- Otherwise, let's see what happens if we run the thing
    (runE ev >>= either
       -- If it is still waiting, so so are we
       (\_ -> pure $ Left $ planToEv p)
       -- Otherwise, store into the cache and happen!
       (>>= cacheHappen)
    )
 where
  cacheHappen v = do
    liftIO $ writeIORef ref (Just v)
    pure (Right v)

A closely-related function, however, attempts to do so:

> planToEvish_errata :: Plan a -> M (Either (Plan a) a)
> planToEvish_errata (Plan ev ref) =
>   -- Consult the cache!
>   liftIO (readIORef ref) >>= flip maybe
>     -- If we did this already, then the event already happened.
>     (pure . Right)
>     -- Otherwise, let's see what happens if we run the thing
>     (runE ev >>= either
>        -- If it is still waiting, so so are we.
>        --
>        -- XXX ERRATA: Note that it has given us a new way of waiting
>        -- on future events, and we should be using that, rather than
>        -- the old mechanism.  We keep the same result reference, of
>        -- course, since that's what other people are really waiting on.
>        (\ev' -> pure $ Left $ Plan ev' ref)
>        -- Otherwise, store into the cache and happen!
>        (>>= cacheHappen)
>     )
>  where
>   cacheHappen v = do
>     liftIO $ writeIORef ref (Just v)
>     pure (Right v)
>
> planToEv_errata :: Plan a -> E a
> planToEv_errata p = E $
>   either (Left . planToEv_errata) Right <$> planToEvish_errata p

OK, back to more types:

> data SomePlan = forall a . SomePlan (Ref (Plan a))
> type Plans = S.Seq SomePlan
> type M = WriterT Plans (ReaderT Clock IO)
>
> addPlan :: Ref (Plan a) -> M ()
> addPlan = tell . S.singleton . SomePlan

As in the paper, tryPlan, like planToEv, did not properly respect linearity:

tryPlan :: SomePlan -> M ()
tryPlan (SomePlan pr) =
  -- See if the plan is still hanging around
  liftIO (deRef pr) >>= maybe
    -- Nope!  Nothing needs doing!
    (return ())
    -- Yup.  Gotta give this a shot.
    (\p -> runE (planToEv p) >>= either
             -- OK, gave it a shot.  We're still waiting.
             (\_ -> addPlan pr)
             -- Oh, look, it finished.
             -- The return value here is *discarded* as
             -- planToEv has stashed it in the cache whence
             -- it will be retrieved whenever someone else
             -- in the dataflow network needs it.
             (\_ -> return ())
    )

Here’s an attempt that does:

> tryPlan_errata :: SomePlan -> M ()
> tryPlan_errata (SomePlan pr) =
>   -- See if the plan is still hanging around
>   liftIO (deRef pr) >>= maybe
>     -- Nope!  Nothing needs doing!
>     (return ())
>     -- Yup.  Gotta give this a shot.
>     (\p -> planToEvish_errata p >>= either
>              -- OK, gave it a shot.
>              --
>              -- XXX ERRATA: We're still waiting and have a new plan; stick
>              -- that new plan in a reference of the same flavor as it
>              -- originally was, and add it.
>              --
>              (\p' -> liftIO (makeLikewiseRef pr p') >>= addPlan)
>              -- Oh, look, it finished.
>              -- The return value here is *discarded* as
>              -- planToEv has stashed it in the cache whence
>              -- it will be retrieved whenever someone else
>              -- in the dataflow network needs it.
>              (\_ -> return ())
>     )
>
> tryPlans :: Plans -> ReaderT Clock IO Plans
> tryPlans = execWriterT . mapM_ tryPlan_errata

If we have an Event which will yield a M-monadic computation of alphas, we can produce an M-monadic computation which will yield an Event of alphas. Depends on being told how to store plans internally, through the reference creator argument:

> plan :: (forall x . x -> IO (Ref x)) -> E (M a) -> M (E a)
> plan makeRef e = do
>   -- Make a Plan from this event; that's easy, we have an event
>   -- and we haven't looked to see if it's happened yet, so cache
>   -- Nothing.
>   p <- Plan e <$> liftIO (newIORef Nothing)
>   -- Grab a reference to this plan, which might be weak.
>   pr <- liftIO (makeRef p)
>   -- Register it
>   addPlan pr
>   -- And make an event out of it
>   --
>   -- XXX ERRATA: if we really want to ensure that asynchronous
>   -- computations take one round, then we should delayEpsilon here,
>   -- which the paper does not do.
>   --
>   -- XXX ERRATA: this also needs to use the modified planToEv.
>   pure (delayEpsilon $ planToEv_errata p)
>
> planM :: E (M a) -> M (E a)
> planM = plan makeWeakRef

Note that E (M a) ~ M (Either (E (M a)) (M a)), so we’re not really “exchanging” monads or anything. This is all taking place in M and is just a clever slight of hand: plan mkR ::~ M (Either (E (M a)) (M a)) -> M (M (Either (E a) a)).

Anyway, armed with that, we can now define the Now monad and the attendant main loop of the whole design:

> newtype Now a = Now { getNow :: M a }
>  deriving (Functor,Applicative,Monad)
>
> sample :: B a -> Now a
> sample (B b) = Now $ fst <$> b
>
> planNow :: E (Now a) -> Now (E a)
> planNow = Now . plan makeStrongRef . fmap getNow
>
> mainLoop :: (E a, Plans) -> ReaderT Clock IO a
> mainLoop (ev, pl0) = loop pl0 where
>   loop pli = do
>     -- trace ("mainLoop top; npl=" ++ (show $ S.length pli)) $ return ()
>     -- Probe at the breaking event to see if we are done
>     (er, ple) <- runWriterT (runE ev)
>     case er of
>       -- Indeed!  We're done here; bail out!
>       Right x -> pure x
>       -- No, the breaking event has yet to happen.
>       Left _ -> do  -- XXX _ ?
>         let pli' = pli S.>< ple
>         -- trace ("mainLoop endRound; npl=" ++ (show $ S.length pli')) $ return ()
>         -- wait for the next round to start
>         endRound
>         -- It's the next round now; run everything under the sun
>         pl' <- tryPlans pli'
>         -- and go again!
>         loop pl'
>
>
> runNow :: Now (E a) -> IO a
> runNow (Now m) = do
>   c <- newClock
>   runReaderT (runWriterT m >>= mainLoop) c

async lets us lift any IO action into an Event upon which we will wait. There is a typo in the paper. “spawn c m” should be “spawn m c”:

> async :: IO a -> Now (E a)
> async m = Now $ ask >>= fmap toE . liftIO . spawn m

Extras

when from section 3.1:

> whenB :: B Bool -> B (E ())
> whenB b = whenJust (b2m <$> b)
>  where
>   b2m True  = Just ()
>   b2m False = Nothing

Example in 6.4

> test64 :: Int -> Now (E ())
> test64 n = do b <- count64
>               sample (whenB (traceP <$> b))
>  where
>   traceP x = trace ("test64 : " ++ (show x)) $ n == x
>
> count64 :: Now (B Int)
> count64 = loop 0
>  where
>   loop i = do
>     -- trace ("At loop top: " ++ (show i)) $ return ()
>     e  <- async (return ())
>     e' <- planNow (loop (i+1) <$ e)
>     pure (pure i `switch` e')

Some more small examples

> testImmediateExit :: Now (E ())
> testImmediateExit = sample (whenB ((== 0) <$> (pure (0 :: Int))))
>
> testHang :: Now (E a)
> testHang = sample (whenJust (pure Nothing))
>
> testOneTick :: Now (E ())
> testOneTick = do
>   tickme <- async (return ())
>   sample $ whenJust (         pure Nothing
>                      `switch` (pure (pure (Just ())) <* tickme))