A woefully incomplete reimplementation of Aaron Turon’s Reagents (http://dl.acm.org/citation.cfm?id=2254084)

Mostly a test of “do I understand what the paper is doing” rather than anything meant to be serious.

This page’s raw RST source is a literate Haskell file.

There are a number of disclaimers I feel it necessary to state up front:

It would be interesting to contrast this with an acquire-as-you-go locking scheme à la Coyotos, complete with inter-thread “get out of my way” requests. The code already has distinct “read” and “commit” phases; the change would turn those into “read and acquire” and “commit”.

On the other hand, “testRacers 10 10000 addOneRea” works.

We start off with the usual pile of LANGUAGE pragmas and module imports:

> {-# LANGUAGE ExistentialQuantification #-}
> {-# LANGUAGE GADTs #-}
> {-# LANGUAGE Rank2Types #-}
> {-# LANGUAGE ScopedTypeVariables #-}
>
> module Reagents where
>
> import Control.Concurrent
> import Control.Monad
> import Data.Function
> import Data.IORef
> import qualified Data.Map as M
> import Data.Maybe
> import Debug.Trace
> import qualified Data.Set as S
> import System.Mem.StableName
> -- import System.Mem.Weak
> import System.IO.Unsafe
> import Unsafe.Coerce

We need some way to make a reference object:

> newtype Ref a = Ref { unRef :: Eq a => IORef (Maybe a) }
>
> newRef :: a -> IO (Ref a)
> newRef a = do
>   ior <- newIORef (Just a)
>   return $ Ref ior
>
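> -- Compare-and-set by value equality.  A cell holding Nothing is locked by
> -- an in-progress commit, so the CAS fails in that case too.
> casRef :: Eq a => Ref a -> a -> a -> IO Bool
> casRef (Ref ior) ex nv = atomicModifyIORef ior cas
>  where cas iorv | iorv == Just ex = (Just nv, True)
>                 | otherwise       = (iorv, False)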

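To see the compare-and-set idiom in isolation, here is a minimal standalone sketch on a bare IORef, without the Maybe wrapper used above. The names casIORef and casDemo are made up for this illustration and are not part of the module.

```haskell
import Data.IORef

-- Hypothetical helper (not part of the module above): compare-and-set on a
-- bare IORef, comparing by value equality rather than pointer equality.
casIORef :: Eq a => IORef a -> a -> a -> IO Bool
casIORef ref expected new = atomicModifyIORef' ref step
  where
    step cur
      | cur == expected = (new, True)   -- matched: install the new value
      | otherwise       = (cur, False)  -- mismatch: leave the cell untouched

-- The first CAS succeeds; the second uses a stale expected value and fails.
casDemo :: IO (Bool, Bool, Int)
casDemo = do
  ref <- newIORef (0 :: Int)
  ok1 <- casIORef ref 0 1
  ok2 <- casIORef ref 0 2
  v   <- readIORef ref
  return (ok1, ok2, v)
```

Running casDemo should leave the cell at 1, since only the first CAS sees the value it expects.
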
Moreover, we need to maintain logs of reference objects, existentially hiding their data type:

> data ExRef = forall a . ExRef (Ref a)
> data RefLogEnt = forall a . (Eq a) => RLE
>                { rle_ref :: Ref a
>                , rle_val :: !a
>                }
>
>   -- Man I do not feel good about these.
>   -- ("Your code is bad and you should feel bad")
> mkersn :: ExRef -> StableName (Ref b)
> mkersn r = case r of
>              (ExRef r') -> r' `seq` unsafeCoerce$unsafePerformIO$makeStableName r'
> instance Eq ExRef where
>   (==) = (==) `on` mkersn
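>   -- Caveat: ordering by hash alone means two distinct refs whose hashes
>   -- collide compare as EQ even though (==) says they differ.
> instance Ord ExRef where
>   compare = compare `on` (hashStableName . mkersn)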

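One way to sidestep the unsafeCoerce/StableName business would be to tag every reference with a unique integer at creation time and compare on the tag. This is only a sketch of that alternative, not what the module does; TaggedRef, Supply and friends are hypothetical names.

```haskell
import Data.IORef

-- Hypothetical alternative to StableName hashing: each reference carries a
-- unique Int drawn from a shared counter.
data TaggedRef a = TaggedRef
  { trId   :: !Int
  , trCell :: IORef a
  }

-- A source of fresh tags.
newtype Supply = Supply (IORef Int)

newSupply :: IO Supply
newSupply = Supply <$> newIORef 0

-- Allocate a reference, atomically taking the next tag from the supply.
newTaggedRef :: Supply -> a -> IO (TaggedRef a)
newTaggedRef (Supply ctr) x = do
  n    <- atomicModifyIORef' ctr (\i -> (i + 1, i))
  cell <- newIORef x
  return (TaggedRef n cell)

-- Identity test that works across different element types.
sameTaggedRef :: TaggedRef a -> TaggedRef b -> Bool
sameTaggedRef x y = trId x == trId y
```

Tags from a single Supply never collide, so they could also serve as the map keys and as a total order for lock acquisition.
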
An Offer is a reference that starts out Pending and is later either Rescinded or made Final:

> data OfferD a = Pending
>               | Rescinded
>               | Final a
>   -- Additionally, make sure that Pending objects are not discarded
>   -- when seen (e.g. use p@Pending, not just Pending, in match),
>   -- as we might want to use Weak magic to hook Pending objects
>
> instance Eq (OfferD a) where
>   Pending == Pending     = True
>   Rescinded == Rescinded = True
>   (Final _) == (Final _) = error "Equality of Final Offer"
>   _ == _                 = False
>
> newtype Offer a = Offer (Ref (OfferD a))
>
> newOffer :: IO (Offer a)
> newOffer = do
>   ior <- newIORef (Just Pending)
>   return $ Offer $ Ref ior
>
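> -- Note: despite the name, this never writes Rescinded; it only reports
> -- the answer if the offer has already been made Final.
> tryRescindOffer :: Offer a -> IO (Maybe a)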
> tryRescindOffer (Offer (Ref ior)) = atomicModifyIORef ior cas
>  where
>   cas r@(Just (Final a)) = (r, Just a)
>   cas x                  = (x, Nothing)

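For contrast, a rescind that actually withdraws a pending offer might look like the sketch below. It is standalone and uses a bare IORef rather than the Ref wrapper; Status and rescind are hypothetical names, and the locked (Nothing) state of Ref is deliberately left out.

```haskell
import Data.IORef

-- Standalone stand-in for OfferD above, with derived instances for testing.
data Status a = Pending | Rescinded | Final a
  deriving (Eq, Show)

-- Hypothetical variant: withdraw a pending offer, or hand back the answer
-- if somebody completed the offer first.
rescind :: IORef (Status a) -> IO (Maybe a)
rescind ref = atomicModifyIORef' ref step
  where
    step Pending     = (Rescinded, Nothing)  -- withdraw the offer
    step s@(Final a) = (s, Just a)           -- too late: report the answer
    step Rescinded   = (Rescinded, Nothing)  -- already withdrawn
```

Because the whole transition happens inside one atomicModifyIORef', a concurrent completion either lands before the rescind (and its answer is returned) or finds the offer already Rescinded.
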
Endpoints:

> data Failure = Retry | Block
> newtype Result a = Result { unResult :: Either Failure a }
>
> data Message a b = forall c . Message
>                  { m_payload :: a
>                  , m_sendrx  :: Reaction
>                  , m_sendk   :: Reagent b c
>                  , m_offer   :: Offer c
>                  }
>
> data Endpoint a b = Endpoint
>                   { e_chan :: IORef [Message a b]
>                   , e_dual :: Endpoint b a
>                   }
>
> mkChan :: IO (Endpoint a b, Endpoint b a)
> mkChan = do
>   ior1 <- newIORef []
>   ior2 <- newIORef []
>   let r@(e1, e2) = (Endpoint ior1 e2, Endpoint ior2 e1)
>   return r

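The recursive let in mkChan is the usual knot-tying trick: each endpoint refers to the other before either exists. A cut-down, standalone sketch of the same idea follows; End, mkEnds and postToDual are hypothetical names used only here.

```haskell
import Data.IORef

-- Hypothetical miniature of Endpoint: a mailbox plus a pointer to its dual.
data End = End
  { endBox  :: IORef [String]
  , endDual :: End
  }

-- let-bound patterns are lazy, so each endpoint can mention the other
-- before either has been constructed.
mkEnds :: IO (End, End)
mkEnds = do
  b1 <- newIORef []
  b2 <- newIORef []
  let (e1, e2) = (End b1 e2, End b2 e1)
  return (e1, e2)

-- Leave a message in the mailbox of the opposite endpoint.
postToDual :: End -> String -> IO ()
postToDual e msg = modifyIORef' (endBox (endDual e)) (msg :)
```

Following endDual twice from either endpoint leads back to the same mailbox, which is the property the Swap case relies on.
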
Reactions:

> data Reaction = Reaction
>               { rx_redoLog :: M.Map Int RefLogEnt -- desired values of refs
>               , rx_undoLog :: M.Map Int RefLogEnt -- expected values of refs
>               }
>
> rx_empty :: Reaction
> rx_empty = Reaction M.empty M.empty
>
> rx_withcas :: (Eq a) => Reaction -> Ref a -> a -> a -> Reaction
> rx_withcas (Reaction re un) ra ex ne =
>   let ix = hashStableName $ mkersn $ ExRef ra in
>   Reaction (M.insert ix (RLE ra ne) re)
>            (M.insert ix (RLE ra ex) un)
>
> rx_commit :: Reaction -> IO Bool
> rx_commit (Reaction re un) = do
>   -- traceShow ("RXC") $ return ()
>   ok <- acquireAll [] (map snd $ M.toAscList un)
>   if ok
>    then setAll (map snd $ M.toList re) >> return True
>    else return False
>  where
>   acquireAll _  [] = return True
>   acquireAll us (p:ps) = do
>      case p of RLE ref val -> do
>        -- traceShow ("AcqAll", hashStableName $ mkersn (ExRef ref)) $ return ()
>        ok <- atomicModifyIORef (unRef ref)
>                                (\a -> if a == Just val
>                                        then (Nothing, True)
>                                        else (a, False))
>        if ok
>         then acquireAll (p:us) ps
>         else setAll us >> return False
>
>   setAll :: [RefLogEnt] -> IO ()
>   setAll = mapM_ (\rle -> case rle of RLE ref val -> writeIORef (unRef ref) (Just val))
>
> -- True when the reaction already has at least one CAS logged.
> rx_hascas :: Reaction -> Bool
> rx_hascas (Reaction r _) = not (M.null r)
>
> offerFulfill :: Reaction -> Offer a -> a -> Reaction
> offerFulfill r (Offer o) a = rx_withcas r o Pending (Final a)

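The commit protocol in rx_commit is easier to see with the existential plumbing stripped away. The sketch below specialises everything to Int cells; Cell, Entry, acquire and commitAll are hypothetical names, and the caller is assumed to pass entries in a consistent order, as the sorted map does above.

```haskell
import Data.IORef

-- A cell holds Nothing while some committer has it locked.
type Cell = IORef (Maybe Int)

-- Hypothetical log entry: cell, expected value, desired value.
data Entry = Entry Cell Int Int

-- Try to lock one cell; succeeds only if it holds the expected value.
acquire :: Entry -> IO Bool
acquire (Entry c ex _) = atomicModifyIORef' c step
  where
    step cur | cur == Just ex = (Nothing, True)
             | otherwise      = (cur, False)

-- Lock every cell in the given order.  On the first failure, restore the
-- cells already locked; otherwise publish all the desired values.
commitAll :: [Entry] -> IO Bool
commitAll = go []
  where
    go held []       = mapM_ publish held >> return True
    go held (e : es) = do
      ok <- acquire e
      if ok then go (e : held) es
            else mapM_ restore held >> return False
    publish (Entry c _ new) = writeIORef c (Just new)
    restore (Entry c ex _)  = writeIORef c (Just ex)
```

Plain writeIORef suffices for publishing and restoring because a locked cell is only ever written by the thread holding the lock.
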
Ah, finally we can start getting close to the thing we’re after. A Reagent is just a thing in one of a few states:

> data Reagent a b where
>       -- The paper gets this definition wrong, I think; they pass
>       -- () to the tryReact sub-call at the end of the CAS case
>       -- of figure 6, but claim that the continuation is
>       -- "Reagent a r".
>   Cas      :: (Eq a) => Ref a -> a -> a -> Reagent () r -> Reagent () r
>   Choice   :: Reagent a b -> Reagent a b -> Reagent a b
>   Commit   :: Reagent a a
>   Computed :: (a -> Maybe (Reagent () b)) -> Reagent a b
>     -- Not given in figure 6, but is hinted at in figure 3
>   Read     :: (Eq a) => Ref a -> Reagent a r -> Reagent () r
>   Swap     :: Endpoint a b -> Reagent b r -> Reagent a r

And we need some tests on those states:

> re_hascas :: Reagent a b -> Bool
> re_hascas (Cas _ _ _ _)  = True
> re_hascas (Choice r1 r2) = re_hascas r1 || re_hascas r2
> re_hascas Commit         = False
> re_hascas (Computed _)   = True -- incorrect but safe
> re_hascas (Read _ r)     = re_hascas r
> re_hascas (Swap _ r)     = re_hascas r
>
> re_maysync :: Reagent a b -> Bool
> re_maysync (Cas _ _ _ _)  = False
> re_maysync (Choice r1 r2) = re_maysync r1 || re_maysync r2
> re_maysync Commit         = False
> re_maysync (Computed _)   = True -- incorrect but safe
> re_maysync (Read _ r)     = re_maysync r
> re_maysync (Swap _ _)     = True

And now, the meat of the paper (figure 6):

> tryReact :: Reagent a b -> a -> Reaction -> (Maybe (Offer b)) -> IO (Result b)
>
> tryReact Commit a rx Nothing = do
>   rxok <- rx_commit rx
>   return . Result $ if rxok then Right a else Left Retry
> tryReact Commit a rx (Just off) = do
>   rescinded <- tryRescindOffer off
>   case rescinded of
>     Just a' -> return $ Result $ Right a'
>     Nothing -> tryReact Commit a rx Nothing
>
> tryReact (Cas ref ov nv k) () rx off = do
>   -- traceShow ("TR Cas") $ return ()
>   if (not (rx_hascas rx)) && (not (re_hascas k))
>    then do
>           casok <- casRef ref ov nv
>           if casok
>            then tryReact k () rx off
>            else return $ Result $ Left Retry
>    else tryReact k () (rx_withcas rx ref ov nv) off
>
> tryReact (Choice r1 r2) a rx off = do
>   tr1 <- tryReact r1 a rx off
>   case unResult tr1 of
>     Left Retry -> do
>                     tr2 <- tryReact r2 a rx off
>                     case unResult tr2 of
>                       Left _ -> return $ Result $ Left $ Retry
>                       a'@(Right _) -> return $ Result a'
>     Left Block -> tryReact r2 a rx off
>     a'@(Right _) -> return $ Result a'
>
> tryReact (Computed c) a rx off = do
>   case c a of
>     Nothing -> return $ Result $ Left Block
>     Just r  -> tryReact r () rx off
>
> tryReact (Read ref k) () rx off = do
>   val <- readIORef $ unRef ref
>   case val of
>     Nothing -> return $ Result $ Left Retry
>     Just v' -> tryReact k v' rx off
>
> -- tryReact (Swap e k) a rx Nothing = do
> -- XXX The paper has a typo here and starts tryFrom with
> -- a failureMode of Retry; tryFrom never transitions to
> -- Block.  The prose of the paper says that tryFrom Blocks
> -- only if all observed offers Block, so I believe this
> -- to be the correct fix.
>
> -- tryReact (Swap e k) a rx (Just off) = do
> --   atomicModifyIORef (e_chan e) (\ms -> ((Message a rx k off):ms, ms))
> --   ...
>
>
> -- "tryFrom" but with arguments flipped
> tryReactFrom :: a -> Failure -> [Message a b] -> IO (Result b)
>
> tryReactFrom a fm [] = return $ Result $ Left fm
> tryReactFrom a fm ((Message b rx' k' off'):ms) = do
>   undefined -- forkIO $ tryReact k' rx'

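The way the Choice case above combines the outcomes of its two branches can be modelled as a pure function. This is a sketch for illustration only: choose is a hypothetical name, and Failure is redeclared here so the snippet stands alone.

```haskell
-- Redeclared locally so the snippet is self-contained.
data Failure = Retry | Block
  deriving (Eq, Show)

-- Hypothetical pure model of the Choice case: given the result of the
-- first branch and the (lazily evaluated) result of the second, pick the
-- overall outcome.
choose :: Either Failure a -> Either Failure a -> Either Failure a
choose (Right a)    _  = Right a     -- first branch succeeded
choose (Left Block) r2 = r2          -- defer entirely to the second branch
choose (Left Retry) r2 = case r2 of
  Right a -> Right a
  Left _  -> Left Retry              -- a transient failure on the left wins
```

The asymmetry is the point: once the first branch has reported Retry, the choice as a whole may only Retry or succeed, never Block.
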
Here’s the core of the whole thing:

> bang :: Reagent a b -> a -> IO b
> bang r a = withoutOffer
>  where
>   withoutOffer = do
>     trace (show ("WithoutOffer")) $ return ()
>     res <- tryReact r a rx_empty Nothing
>     case unResult res of
>       Left Block -> withOffer
>       Left Retry -> do
>                        backoffOnce
>                        if re_maysync r then withOffer else withoutOffer
>       Right ans -> return ans
>
>   withOffer = do
>     traceShow ("WithOffer") $ return ()
>     off <- newOffer
>     res <- tryReact r a rx_empty (Just off)
>     case unResult res of
>       Left Block -> backoffOnce {- XXX -} >> retry off
>       Left Retry -> backoffOnce >> retry off
>       Right b    -> return b
>
>   retry off = do
>     res <- tryRescindOffer off
>     case res of
>       Nothing -> withOffer
>       Just b  -> traceShow ("Offer met") $ return b

Well, this is one way to do it:

> -- A fixed 10-microsecond pause; no exponential or randomized backoff.
> backoffOnce :: IO ()
> backoffOnce = threadDelay 10

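A slightly less naive scheme would grow the delay with the number of consecutive failures. Here is a minimal sketch, assuming the caller tracks a retry count; backoffDelay and backoffN are hypothetical names, and the 10 microsecond base and 10 millisecond cap are arbitrary choices.

```haskell
import Control.Concurrent (threadDelay)

-- Hypothetical: delay in microseconds for the n-th consecutive retry,
-- doubling from 10us and capped at 10ms.  Negative counts are treated as 0,
-- and the exponent is clamped so the multiplication cannot overflow.
backoffDelay :: Int -> Int
backoffDelay n = min 10000 (10 * 2 ^ max 0 (min n 20))

-- Sleep for the computed delay.
backoffN :: Int -> IO ()
backoffN = threadDelay . backoffDelay
```

Wiring this into bang would mean threading a retry counter through withoutOffer and withOffer, which the code above does not do.
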
Wouldn’t be complete without test cases:

> oneCas ref = Cas ref 0 1 Commit
> twoCas ref = Cas ref 1 2 (Cas ref 0 1 Commit)
>
> testIntZero rea = do
>   ref <- newRef (0 :: Int)
>   bang (rea ref) ()
>   res <- readIORef $ unRef $ ref
>   return $ fromJust res
>
> addOneRea ref = Read ref (Computed (\a -> Just $ Cas ref a (a+1) Commit))
>
> testRacers :: Int -> Int -> (Ref Int -> Reagent () ()) -> IO Int
> testRacers n m rea = do
>   ref <- newRef (0 :: Int)
>
>   mvs <- replicateM n newEmptyMVar
>   mapM_ (\mv -> forkIO $ racer ref >> putMVar mv ()) mvs
>
>   mapM_ takeMVar mvs
>
>   res <- readIORef $ unRef $ ref
>   return $ fromJust res
>
>  where
>   racer ref = mapM_ (const $ bang (rea ref) ()) [1..m]
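
As a baseline for the racers test, the same fork-and-join harness can be pointed at a plain atomic counter. This standalone sketch uses a hypothetical name, raceCounter, and does not touch the Reagents code at all. With n threads each doing m increments it should return n * m, which is also what testRacers n m addOneRea should return if no update is lost.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM, replicateM_)
import Data.IORef

-- Hypothetical baseline: n threads each bump a shared counter m times,
-- and the main thread waits for all of them before reading the total.
raceCounter :: Int -> Int -> IO Int
raceCounter n m = do
  ctr   <- newIORef (0 :: Int)
  dones <- replicateM n newEmptyMVar
  mapM_ (\d -> forkIO (replicateM_ m (bump ctr) >> putMVar d ())) dones
  mapM_ takeMVar dones
  readIORef ctr
  where
    -- Atomic increment; the strict variant avoids building up thunks.
    bump ref = atomicModifyIORef' ref (\x -> (x + 1, ()))
```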