We start off with the usual pile of LANGUAGE pragmas and module imports:

> {-# LANGUAGE ExistentialQuantification #-}
> {-# LANGUAGE GADTs #-}
> {-# LANGUAGE Rank2Types #-}
> {-# LANGUAGE ScopedTypeVariables #-}
> module Reagents where
> import Control.Concurrent
> import Control.Monad
> import Data.Function
> import Data.IORef
> import qualified Data.Map as M
> import Data.Maybe
> import Debug.Trace
> import qualified Data.Set as S
> import System.Mem.StableName
> -- import System.Mem.Weak
> import System.IO.Unsafe
> import Unsafe.Coerce

We need some way to make a reference object:

> -- A mutable cell holding a 'Maybe a'.  'rx_commit' stores 'Nothing' in a
> -- cell while it has it acquired and 'Just' when it writes a value.
> -- The 'Eq a' constraint sits on the field itself, so every use of
> -- 'unRef' has to be able to supply an 'Eq a' instance.
> newtype Ref a = Ref { unRef :: Eq a => IORef (Maybe a) }
> -- Allocate a fresh reference whose initial contents are the given value.
> newRef :: a -> IO (Ref a)
> newRef initial =
>   newIORef (Just initial) >>= \cell -> return (Ref cell)
> -- Atomically replace the contents of a reference with @nv@ if and only if
> -- it currently holds @Just ex@; the result says whether the swap happened.
> casRef :: Eq a => Ref a -> a -> a -> IO Bool
> casRef (Ref ior) ex nv = atomicModifyIORef ior swapIfExpected
>  where
>   swapIfExpected current
>     | current == Just ex = (Just nv, True)
>     | otherwise          = (current, False)

Moreover, we need to maintain logs of reference objects, existentially hiding their data types:

> -- A reference with its element type hidden, so that references of
> -- different element types can be handled uniformly (see 'mkersn').
> data ExRef = forall a . ExRef (Ref a)
> -- One log entry: a reference paired with a value for it.  The element
> -- type is hidden, but its 'Eq' dictionary is packed alongside so the
> -- stored value can still be compared with the reference's contents.
> -- The value field is strict.
> data RefLogEnt = forall a . (Eq a) => RLE
>                { rle_ref :: Ref a
>                , rle_val :: !a
>                }
>   -- Man I do not feel good about these.
>   -- ("Your code is bad and you should feel bad")
> -- Produce a stable name identifying the reference inside an 'ExRef'.
> -- The wrapped 'Ref' is forced with 'seq' before 'makeStableName' is run
> -- under 'unsafePerformIO'; 'unsafeCoerce' then recasts the stable name
> -- from the hidden element type to whatever @b@ the caller picks.
> -- NOTE(review): the uses in this file only compare or hash the result,
> -- so the phantom type is presumably never relied upon -- confirm before
> -- adding new uses.
> mkersn :: ExRef -> StableName (Ref b)
> mkersn r = case r of
>              (ExRef r') -> r' `seq` unsafeCoerce$unsafePerformIO$makeStableName r'
> -- Equality is equality of the underlying stable names.
> instance Eq ExRef where
>   lhs == rhs = mkersn lhs == mkersn rhs
> -- Ordering looks only at the stable-name hash, so two references whose
> -- hashes coincide compare as EQ even when (==) says they differ.
> instance Ord ExRef where
>   compare lhs rhs =
>     compare (hashStableName (mkersn lhs)) (hashStableName (mkersn rhs))

An Offer:

> -- State of an offer.  'offerFulfill' moves an offer from 'Pending' to
> -- 'Final' with its result; 'Rescinded' presumably marks an offer that was
> -- withdrawn before being fulfilled.
> data OfferD a = Pending
>               | Rescinded
>               | Final a
>   -- Additionally, make sure that Pending objects are not discarded
>   -- when seen (e.g. use p@Pending, not just Pending, in match),
>   -- as we might want to use Weak magic to hook Pending objects
> -- Equality is only meaningful for the two payload-free states; comparing
> -- two 'Final' offers is treated as a programming error.
> instance Eq (OfferD a) where
>   lhs == rhs = case (lhs, rhs) of
>     (Pending,   Pending)   -> True
>     (Rescinded, Rescinded) -> True
>     (Final _,   Final _)   -> error "Equality of Final Offer"
>     _                      -> False
> -- An offer is a reference to its current 'OfferD' state.
> newtype Offer a = Offer (Ref (OfferD a))
> -- Create an offer that starts out in the 'Pending' state.
> newOffer :: IO (Offer a)
> newOffer =
>   newIORef (Just Pending) >>= \cell -> return (Offer (Ref cell))
> -- Try to withdraw an offer.
> --
> -- * If the offer has already been completed, its state is left alone and
> --   the final value is returned as @Just a@.
> -- * If the offer is still 'Pending' it is atomically flipped to
> --   'Rescinded' and @Nothing@ is returned.  Without this step an
> --   abandoned offer would stay 'Pending' and could still be fulfilled
> --   later (fulfilment is a CAS from 'Pending', see 'offerFulfill'),
> --   losing the value.
> -- * Any other state is left untouched and yields @Nothing@.
> --
> -- NOTE(review): a cell temporarily emptied by 'rx_commit' (contents
> -- 'Nothing') is also reported as @Nothing@ here; confirm whether that
> -- window matters once offers are actually published to other threads.
> tryRescindOffer :: Offer a -> IO (Maybe a)
> tryRescindOffer (Offer (Ref ior)) = atomicModifyIORef ior rescind
>  where
>   rescind done@(Just (Final a)) = (done, Just a)
>   rescind (Just Pending)        = (Just Rescinded, Nothing)
>   rescind other                 = (other, Nothing)


> -- Why a reaction attempt produced no result.  In 'bang', 'Block' moves
> -- straight on to the offer-based path, whereas 'Retry' backs off first
> -- and then tries again.
> data Failure = Retry | Block
> -- Outcome of 'tryReact': either a 'Failure' or the produced value.
> newtype Result a = Result { unResult :: Either Failure a }
> -- A message stored on an endpoint.  Judging by the field names it bundles
> -- the payload with the sender's 'Reaction', the sender's continuation
> -- reagent and the offer for that continuation's result.  The
> -- continuation's result type @c@ is hidden existentially.
> data Message a b = forall c . Message
>                  { m_payload :: a
>                  , m_sendrx  :: Reaction
>                  , m_sendk   :: Reagent b c
>                  , m_offer   :: Offer c
>                  }
> -- One end of a channel: a mutable list of messages plus a link to the
> -- opposite end, whose type parameters are swapped.
> data Endpoint a b = Endpoint
>                   { e_chan :: IORef [Message a b]
>                   , e_dual :: Endpoint b a
>                   }
> -- Create a channel: two endpoints, each starting with an empty message
> -- list and pointing at the other one through 'e_dual'.
> mkChan :: IO (Endpoint a b, Endpoint b a)
> mkChan = do
>   fwdQueue <- newIORef []
>   bwdQueue <- newIORef []
>   let here  = Endpoint fwdQueue there
>       there = Endpoint bwdQueue here
>   return (here, there)


> -- The memory effects accumulated by a reaction.  Both logs are keyed by
> -- the hash of each reference's stable name (see 'rx_withcas').
> data Reaction = Reaction
>               { rx_redoLog :: M.Map Int RefLogEnt -- desired values of refs
>               , rx_undoLog :: M.Map Int RefLogEnt -- expected values of refs
>               }
> -- A reaction with nothing recorded in either log.
> rx_empty :: Reaction
> rx_empty = Reaction { rx_redoLog = M.empty, rx_undoLog = M.empty }
> -- Record a compare-and-set of @ra@ from @ex@ to @ne@ in a reaction: the
> -- new value goes into the redo log and the expected value into the undo
> -- log, both under the hash of the reference's stable name.  An existing
> -- entry under that key is replaced.
> rx_withcas :: (Eq a) => Reaction -> Ref a -> a -> a -> Reaction
> rx_withcas (Reaction redo undo) ra ex ne = Reaction redo' undo'
>  where
>   key   = hashStableName (mkersn (ExRef ra))
>   redo' = M.insert key (RLE ra ne) redo
>   undo' = M.insert key (RLE ra ex) undo
> -- Attempt to commit a reaction; the result is True on success.
> --
> -- Every reference in the undo log is first "acquired": if it currently
> -- holds its expected value, that value is atomically replaced by Nothing.
> -- References are visited in ascending key order.  If some reference does
> -- not hold its expected value, the references acquired so far get their
> -- expected values written back and the commit fails.  Once all have been
> -- acquired, the redo log's values are written and the commit succeeds.
> rx_commit :: Reaction -> IO Bool
> rx_commit (Reaction re un) = do
>   -- traceShow ("RXC") $ return ()
>   ok <- acquireAll [] (map snd $ M.toAscList un)
>   if ok
>    then setAll (map snd $ M.toList re) >> return True
>    else return False
>  where
>   -- First argument: entries already acquired (most recent first).
>   -- Second argument: entries still to be acquired.
>   acquireAll _  [] = return True
>   acquireAll us (p:ps) = do
>      case p of RLE ref val -> do
>        -- traceShow ("AcqAll", hashStableName $ mkersn (ExRef ref)) $ return ()
>        -- Take the cell only if it holds the expected value.
>        ok <- atomicModifyIORef (unRef ref)
>                                (\a -> if a == Just val
>                                        then (Nothing, True)
>                                        else (a, False))
>        if ok
>         then acquireAll (p:us) ps
>         -- Roll back: restore the expected value of everything taken so far.
>         else setAll us >> return False
>   -- Write each entry's value into its reference (non-atomic write).
>   setAll :: [RefLogEnt] -> IO ()
>   setAll = mapM_ (\rle -> case rle of RLE ref val -> writeIORef (unRef ref) (Just val))
> -- Does this reaction already have at least one CAS recorded in its redo
> -- log?  An empty redo log means "no CAS yet", hence the negation; the
> -- previous definition returned 'M.null' directly and so answered the
> -- opposite question.
> rx_hascas :: Reaction -> Bool
> rx_hascas (Reaction redo _) = not (M.null redo)
> -- Extend a reaction with the CAS that completes an offer: the offer's
> -- state is expected to be 'Pending' and is to become @Final a@.
> offerFulfill :: Reaction -> Offer a -> a -> Reaction
> offerFulfill r offer a = case offer of
>   Offer cell -> rx_withcas r cell Pending (Final a)

Ah, finally we can start getting close to the thing we’re after. A Reagent is just a thing in one of a few states:

> -- A reagent taking an input of type @a@ and yielding a @b@.  See
> -- 'tryReact' for how each constructor is interpreted.
> data Reagent a b where
>       -- The paper gets this definition wrong, I think; they pass
>       -- () to the tryReact sub-call at the end of the CAS case
>       -- of figure 6, but claim that the continuation is
>       -- "Reagent a r".
>   Cas      :: (Eq a) => Ref a -> a -> a -> Reagent () r -> Reagent () r
>   Choice   :: Reagent a b -> Reagent a b -> Reagent a b
>   Commit   :: Reagent a a
>   Computed :: (a -> Maybe (Reagent () b)) -> Reagent a b
>     -- Not given in figure 6, but is hinted at in figure 3
>   Read     :: (Eq a) => Ref a -> Reagent a r -> Reagent () r
>   Swap     :: Endpoint a b -> Reagent b r -> Reagent a r

And we need some tests on those states:

> -- Conservative check for whether a reagent may perform a CAS.
> re_hascas :: Reagent a b -> Bool
> re_hascas rea = case rea of
>   Cas _ _ _ _    -> True
>   Choice lhs rhs -> re_hascas lhs || re_hascas rhs
>   Commit         -> False
>   Computed _     -> True -- incorrect but safe
>   Read _ next    -> re_hascas next
>   Swap _ next    -> re_hascas next
> -- Conservative check for whether a reagent may involve a 'Swap'.
> re_maysync :: Reagent a b -> Bool
> re_maysync rea = case rea of
>   Cas _ _ _ _    -> False
>   Choice lhs rhs -> re_maysync lhs || re_maysync rhs
>   Commit         -> False
>   Computed _     -> True -- incorrect but safe
>   Read _ next    -> re_maysync next
>   Swap _ _       -> True

And now, the meat of the paper (figure 6):

> -- Run one attempt of a reagent.  Arguments: the reagent, its input, the
> -- reaction accumulated so far, and optionally the caller's offer.  The
> -- result is either the produced value or a 'Failure'.
> -- NOTE(review): there is no equation for 'Swap' yet (only the
> -- commented-out sketch that follows this definition), so a 'Swap'
> -- reagent is not handled here.
> tryReact :: Reagent a b -> a -> Reaction -> (Maybe (Offer b)) -> IO (Result b)
> -- Commit without an offer: commit the reaction; on success the input is
> -- the result, otherwise ask for a retry.
> tryReact Commit a rx Nothing = do
>   rxok <- rx_commit rx
>   return . Result $ if rxok then Right a else Left Retry
> -- Commit with an offer: if 'tryRescindOffer' yields a value, that value is
> -- the result; otherwise proceed exactly as in the no-offer case.
> tryReact Commit a rx (Just off) = do
>   rescinded <- tryRescindOffer off
>   case rescinded of
>     Just a' -> return $ Result $ Right a'
>     Nothing -> tryReact Commit a rx Nothing
> -- Cas: when both 'rx_hascas rx' and 're_hascas k' are False the CAS is
> -- performed immediately (Retry if it fails); otherwise it is recorded in
> -- the reaction and the continuation runs with the extended reaction.
> tryReact (Cas ref ov nv k) () rx off = do
>   -- traceShow ("TR Cas") $ return ()
>   if (not (rx_hascas rx)) && (not (re_hascas k))
>    then do
>           casok <- casRef ref ov nv
>           if casok
>            then tryReact k () rx off
>            else return $ Result $ Left Retry
>    else tryReact k () (rx_withcas rx ref ov nv) off
> -- Choice: try the first alternative.  If it asks to Retry, try the second
> -- and report any failure of it as Retry; if it Blocks, the second
> -- alternative's outcome is returned unchanged.
> tryReact (Choice r1 r2) a rx off = do
>   tr1 <- tryReact r1 a rx off
>   case unResult tr1 of
>     Left Retry -> do
>                     tr2 <- tryReact r2 a rx off
>                     case unResult tr2 of
>                       Left _ -> return $ Result $ Left $ Retry
>                       a'@(Right _) -> return $ Result a'
>     Left Block -> tryReact r2 a rx off
>     a'@(Right _) -> return $ Result a'
> -- Computed: build the next reagent from the input; no reagent means Block.
> tryReact (Computed c) a rx off = do
>   case c a of
>     Nothing -> return $ Result $ Left Block
>     Just r  -> tryReact r () rx off
> -- Read: feed the reference's current value to the continuation.  A cell
> -- holding Nothing (as left by 'rx_commit' while acquiring) means Retry.
> tryReact (Read ref k) () rx off = do
>   val <- readIORef $ unRef ref
>   case val of
>     Nothing -> return $ Result $ Left Retry
>     Just v' -> tryReact k v' rx off
> -- tryReact (Swap e k) a rx Nothing = do
> -- XXX The paper has a typo here and starts tryFrom with
> -- a failureMode of Retry; tryFrom never transitions to
> -- Block.  The prose of the paper says that tryFrom Blocks
> -- only if all observed offers Block, so I believe this
> -- to be the correct fix.
> -- tryReact (Swap e k) a rx (Just off) = do
> --   atomicModifyIORef (e_chan e) (\ms -> ((Message a rx k off):ms, ms))
> --   ...
> -- "tryFrom" but with arguments flipped
> -- With no messages left, the supplied failure mode @fm@ is the result.
> -- NOTE(review): the non-empty case is an unimplemented stub ('undefined')
> -- and will crash if evaluated.
> tryReactFrom :: a -> Failure -> [Message a b] -> IO (Result b)
> tryReactFrom a fm [] = return $ Result $ Left fm
> tryReactFrom a fm ((Message b rx' k' off'):ms) = do
>   undefined -- forkIO $ tryReact k' rx'

Here’s the core of the whole thing:

> -- Run a reagent on the given input, retrying until it yields a value.
> --
> -- Attempts start without an offer.  A 'Block' result, or a 'Retry' on a
> -- reagent for which 're_maysync' holds, switches to the offer-based loop:
> -- create a fresh offer, try to react with it, and on failure back off and
> -- then check the offer via 'tryRescindOffer' -- returning its value if it
> -- has one, or starting over with a new offer otherwise.
> -- Progress is reported through Debug.Trace.
> bang :: Reagent a b -> a -> IO b
> bang r a = withoutOffer
>  where
>   -- Attempt the reaction with no offer.
>   withoutOffer = do
>     trace (show ("WithoutOffer")) $ return ()
>     res <- tryReact r a rx_empty Nothing
>     case unResult res of
>       Left Block -> withOffer
>       Left Retry -> do
>                        backoffOnce
>                        if re_maysync r then withOffer else withoutOffer
>       Right ans -> return ans
>   -- Attempt the reaction with a freshly created offer.
>   withOffer = do
>     traceShow ("WithOffer") $ return ()
>     off <- newOffer
>     res <- tryReact r a rx_empty (Just off)
>     case unResult res of
>       Left Block -> backoffOnce {- XXX -} >> retry off
>       Left Retry -> backoffOnce >> retry off
>       Right b    -> return b
>   -- After a failed attempt: use the offer's value if it has one, else
>   -- go round again with a new offer.
>   retry off = do
>     res <- tryRescindOffer off
>     case res of
>       Nothing -> withOffer
>       Just b  -> traceShow ("Offer met") $ return b

Well, this is one way to do it:

> -- Pause the calling thread for a short, fixed delay (in microseconds).
> backoffOnce :: IO ()
> backoffOnce = threadDelay delayMicros
>  where
>   delayMicros = 10 :: Int

Wouldn’t be complete without test cases:

> -- Reagent that moves a reference from 0 to 1 and then commits.
> oneCas :: (Eq a, Num a) => Ref a -> Reagent () ()
> oneCas ref = Cas ref expected replacement Commit
>  where
>   expected    = 0
>   replacement = 1
> -- Two chained CASes on the same reference: 1 -> 2 first, then 0 -> 1.
> twoCas :: (Eq a, Num a) => Ref a -> Reagent () ()
> twoCas ref = Cas ref 1 2 zeroToOne
>  where
>   zeroToOne = Cas ref 0 1 Commit
> -- Run the reagent built by @rea@ once against a fresh reference holding 0
> -- and report the reference's contents afterwards.
> testIntZero :: (Ref Int -> Reagent () b) -> IO Int
> testIntZero rea = do
>   ref <- newRef (0 :: Int)
>   _ <- bang (rea ref) ()
>   fmap fromJust (readIORef (unRef ref))
> -- Read the reference's current value and CAS it to its successor.
> addOneRea :: (Eq a, Num a) => Ref a -> Reagent () ()
> addOneRea ref = Read ref (Computed bump)
>  where
>   bump cur = Just (Cas ref cur (cur + 1) Commit)
> -- Fork @n@ threads that each run the reagent @m@ times against one shared
> -- reference starting at 0, wait for all of them, and report the
> -- reference's final contents.
> testRacers :: Int -> Int -> (Ref Int -> Reagent () ()) -> IO Int
> testRacers n m rea = do
>   ref <- newRef (0 :: Int)
>   dones <- replicateM n newEmptyMVar
>   forM_ dones $ \done -> forkIO $ do
>     replicateM_ m (bang (rea ref) ())
>     putMVar done ()
>   forM_ dones takeMVar
>   fmap fromJust (readIORef (unRef ref))