New monads/MonadAdvSTM: Difference between revisions

From HaskellWiki
(Created MonadAdvSTM)
 
m (Minor formatting changes)
 
(14 intermediate revisions by 4 users not shown)
Line 1: Line 1:
[[Category:Code]]
[[Category:Code]]


The e-mail that inspired this Monad and the Monad itself:
__TOC__


== Caveat ==
The <code>onCommit</code> works great.  The <code>onRetry</code>/<code>retryWith</code> really ought to be implemented with changes in the runtime.  The ''Helper Thread'' code is a very close attempt to simulate the correct semantics.  The ''Single Thread'' code is flawed since it will get caught in a busy wait if the <code>onRetry</code> commands do not allow for a commit on the following re-attempt.
== Email ==
The e-mail that inspired this monad, and the monad itself:
<pre>
From: Simon Peyton-Jones <simonpj@microsoft.com>
From: Simon Peyton-Jones <simonpj@microsoft.com>
To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>,
To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>,
Line 17: Line 26:
| commit that means that the memory transaction and database transaction either both commit or both
| commit that means that the memory transaction and database transaction either both commit or both
| abort.
| abort.
 
</pre>
Yes, I have toyed with extending GHC's implementation of STM to support
{|
|Yes, I have toyed with extending GHC's implementation of STM to support


         onCommit :: IO a -> STM ()
         onCommit :: IO a -> STM ()
Line 26: Line 36:
           xv <- readTVar x
           xv <- readTVar x
           yv <- readTVar y
           yv <- readTVar y
           if xv>yv then
           if xv > yv
                onCommit launchMissiles
            then onCommit launchMissiles
             else return () })
             else return () })
and the missiles would only get launched when the transaction successfully commits.
and the missiles would only get launched when the transaction successfully commits.
Line 36: Line 46:
           xv <- readTVar x;
           xv <- readTVar x;
           yv <- readTVar y;
           yv <- readTVar y;
           if xv>yv then
           if xv > yv
                retur launchMissiles
            then return launchMissiles
             else return (return ()) }) ;
             else return (return ()) }) ;
       action }
       action }
Line 65: Line 75:


PS: I agree wholeheartedly with this:
PS: I agree wholeheartedly with this:
 
|}
<pre>
| Of course, these solutions don't deal with the question of atomic blocks that want to perform output
| Of course, these solutions don't deal with the question of atomic blocks that want to perform output
| (e.g. to the console) and receive input in response to that.  My view at the moment is _that does not
| (e.g. to the console) and receive input in response to that.  My view at the moment is _that does not
Line 74: Line 85:
Haskell-Cafe@haskell.org
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe
http://www.haskell.org/mailman/listinfo/haskell-cafe
</pre>
== Single Threaded code ==


This Single Threaded code can get caught in a busy wait.  The Helper Thread code below is better.
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]]


<haskell>
<haskell>
Line 80: Line 97:


   Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
   Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
  Usual 3 clause BSD Licence
  Copyright 2006


   This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
   This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
   mailing list, in which the type and semantics of onCommit and
   mailing list, in which the type and semantics of onCommit and
   withRetry were put forth.
   retryWith were put forth.


   The semantics of printing the contents of the TVar "v" created in
   The semantics of printing the contents of the TVar "v" created in
Line 92: Line 107:
*AdvSTM> main
*AdvSTM> main
"hello world"
"hello world"
"retryWith Start"
 
("retryWith v",7)
"test"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
("onCommit v",42)
("result","foo","retries",1)
 
"testUnlift"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
"onCommit Start"
("onCommit v",42)
("onCommit v",42)
("result","foo")
("result","foo","retries",2)
 
"bye world"
"bye world"


Line 113: Line 139:
-}
-}


module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith) where
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
            ,unlifter,unlift,unlift1,unlift2) where


-- import MonadBase
-- import MonadBase
Line 119: Line 146:
import Control.Monad(MonadPlus(..),liftM)
import Control.Monad(MonadPlus(..),liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.MVar(MVar,newEmptyMVar,newMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import Data.Generics(Data)
import Data.Maybe(maybe)
import Data.Typeable(Typeable)
import GHC.Conc(unsafeIOToSTM)
import GHC.Conc(unsafeIOToSTM)
-- for countRetries example
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
import Data.Typeable(Typeable)
import Data.Generics(Data)


class MonadAdvSTM m where
class (Monad m) => MonadAdvSTM m where
   onCommit :: IO a -> m ()
   onCommit :: IO a -> m ()
   onRetry :: IO a -> m ()
   onRetry :: IO a -> m ()
Line 136: Line 166:


-- Export type but not constructor!
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT (CommitVar,RetryVar) STM a) deriving (Functor,Monad,MonadPlus,Typeable)
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type CommitVar = TVar ([IO ()]->[IO ()])
type Env = (CommitVar,RetryVar)
type RetryVar = IORef ([IO ()]->[IO ()])
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())


{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
Line 147: Line 178:


-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
 
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith :: IO a -> AdvSTM b
retryWith io = onRetry io >> retryAdv
retryWith io = onRetry io >> retryAdv


instance MonadAdvSTM AdvSTM where
instance MonadAdvSTM AdvSTM where
   onCommit io = do
   onCommit io = do
     cv <- AdvSTM $ asks fst
     commitVar <- AdvSTM $ asks fst
     old <- liftAdv $ readTVar cv
     old <- liftAdv $ readTVar commitVar
     liftAdv $ writeTVar cv (old . ((io >> return ()):))
     liftAdv $ writeTVar commitVar (old . (io >>))
   onRetry io = do
   onRetry io = do
     rv <- AdvSTM $ asks snd
     retryVar <- AdvSTM $ asks snd
     liftAdv $ unsafeIOToSTM $ modifyIORef rv (\ old -> old . ((io >> return ()):) )
     liftAdv $ unsafeIOToSTM (do
{-
      may'do <- tryTakeMVar retryVar
  orElseAdv (AdvSTM a) (AdvSTM b) =
      let todo = maybe (io >>) (. (io >>)) may'do
    {- If a retries then its onRetry commands are kept on the list of
      seq todo (putMVar retryVar todo))
      actions to do if the whole command fails. It would be possible
      to save the "rv" and use unsafeIOToSTM to implement a different
      policy here -}
    AdvSTM $ do env <- ask
                lift $ (runReaderT a env) `orElse` (runReaderT b env)
-}
   orElseAdv = mplus
   orElseAdv = mplus
   retryAdv = liftAdv retry -- the same as retryAdv = mzero
   retryAdv = liftAdv retry -- the same as retryAdv = mzero
   atomicAdv = runAdvSTM
   atomicAdv = runAdvSTM
   catchAdv (AdvSTM action) handler =
   catchAdv action handler = do
     let h env error = let (AdvSTM cleanup) = handler error
     action' <- unlift action
                      in runReaderT cleanup env
     handler' <- unlift1 handler
     in AdvSTM $ do env <- ask
    liftAdv $ catchSTM action' handler'
                  lift $ catchSTM (runReaderT action env) (h env)
   liftAdv = AdvSTM . lift
   liftAdv = AdvSTM . lift


Line 181: Line 204:
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
runAdvSTM (AdvSTM action) = do
   cv <- newTVarIO id
   commitVar <- newTVarIO id
   rv <- newIORef id
   retryVar <- newMVar id
   let wrappedAction = (runReaderT (liftM Just action) (cv,rv))
  let check'retry = do
                       `orElse` (return Nothing)
        may'todo <- unsafeIOToSTM $ tryTakeMVar retryVar
      loop = do
        maybe retry (return . Right) may'todo
   let wrappedAction = (runReaderT (liftM Left action) (commitVar,retryVar))
                       `orElse` (check'retry)
  let attempt = do
         result <- atomically $ wrappedAction
         result <- atomically $ wrappedAction
         case result of
         case result of
           Just answer -> do
           Left answer -> do
             cFun <- atomically (readTVar cv)
             cFun <- atomically (readTVar commitVar)
             sequence_ (cFun [])
             cFun (return ())
             return answer
             return answer
           Nothing -> do
           Right rFun -> do
             rFun <- readIORef rv
             rFun (return ())
            writeIORef rv id  -- must reset the list
            attempt
            sequence_ (rFun [])
  attempt
            loop
 
   loop
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
 
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
 
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)
 
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
   return (u f)


-- Example code using the above:
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))


test :: TVar Bool -> AdvSTM String
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
  u <- unlifter
  return (\x y -> u (f x y))
 
-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
test todo = do
   onCommit (print "onCommit Start")
   onCommit (print "onCommit Start")
Line 215: Line 264:
       atomically (writeTVar todo True)
       atomically (writeTVar todo True)
       print "Flipped choice to True to avoid infinite loop"
       print "Flipped choice to True to avoid infinite loop"
-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (print "onCommit Start")
    onRetry (print "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x))
    onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x))
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do
        atomically (writeTVar todo True)
        print "Flipped choice to True to avoid infinite loop"


-- Example similar to Simon's suggested example:
-- Example similar to Simon's suggested example:
 
countRetries :: (MonadAdvSTM m, Enum a) => IORef a -> m a1 -> m a1
countRetries :: IORef Int -> AdvSTM a -> AdvSTM a
countRetries ioref action =
countRetries ioref action =
   let incr = do old <- readIORef ioref
   let incr = do old <- readIORef ioref
Line 227: Line 295:
main = do
main = do
   print "hello world"
   print "hello world"
  putStrLn ""
  counter <- newIORef 0
   todo <- newTVarIO False
   todo <- newTVarIO False
  print "test"
  result <- runAdvSTM (countRetries counter $ test todo)
  retries <- readIORef counter
  print ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  putStrLn ""
  print "testUnlift"
  result <- runAdvSTM (countRetries counter $ testUnlift todo)
  retries <- readIORef counter
  print ("result",result,"retries",retries)
  putStrLn ""
  print "bye world"
</haskell>
== Helper Thread code ==
This is my preferred solution at the moment.
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]]
<haskell>
{- November 24th, 2006
  Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
  This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
  mailing list, in which the type and semantics of onCommit and
  withRetry were put forth.
  The semantics of printing the contents of the TVar "v" created in
  test via retryWith may or may not be well defined.  With GHC 6.6 I get
*AdvSTM> main
(ThreadId 415,"hello world")
(ThreadId 415,"")
(ThreadId 415,"test")
(ThreadId 416,"onRetry Start")
(ThreadId 416,("onRetry v",7))
(ThreadId 416,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",1))
(ThreadId 415,"")
(ThreadId 415,"testUnlift")
(ThreadId 417,"onRetry Start")
(ThreadId 417,("onRetry v",7))
(ThreadId 417,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",2))
(ThreadId 415,"")
(ThreadId 415,"testFork")
(ThreadId 418,"onRetry Start")
(ThreadId 418,"Flipped choice to True to avoid infinite loop")
(ThreadId 419,("onRetry v",7))
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",3))
(ThreadId 415,"")
(ThreadId 415,"bye world")
  Aside from that I think the unsafeIOToSTM is not really unsafe here
  since it writes to privately created and maintained variables.
  Since the implementation is hidden it could be changed from ReaderT
  to some other scheme.
  Once could also use MonadBase from
  http://haskell.org/haskellwiki/New_monads/MonadBase to help with the
  lifting, but this has been commented out below.
  TODO: figure out semantics of catchAdv.  At least it compiles...
-}
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
            ,unlifter,unlift,unlift1,unlift2) where
-- import MonadBase
import Control.Exception(Exception,try)
import Control.Monad(MonadPlus(..),liftM,when)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent(forkIO,myThreadId)
import Control.Concurrent.Chan(Chan,newChan,readChan,writeChan)
import Control.Concurrent.MVar(MVar,newMVar,newEmptyMVar,isEmptyMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import GHC.Conc(unsafeIOToSTM)
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
import Data.Typeable(Typeable)
class MonadAdvSTM m where
  onCommit :: IO a -> m ()
  onRetry :: IO a -> m ()
  orElseAdv :: m a -> m a -> m a
  retryAdv :: m a
  atomicAdv :: m a -> IO a
  catchAdv :: m a -> (Exception -> m a) -> m a
  liftAdv :: STM a -> m a
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar,RetryVar)
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
instance MonadPlus AdvSTM where
  mzero = retryAdv
  mplus = orElseAdv
-}
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith io = onRetry io >> retryAdv
instance MonadAdvSTM AdvSTM where
  onCommit io = do
    commitVar <- AdvSTM $ asks fst
    commitFun <- liftAdv $ readTVar commitVar
    liftAdv $ writeTVar commitVar (commitFun . (io >>))
  onRetry io = do
    retryVar <- AdvSTM $ asks snd
    liftAdv . unsafeIOToSTM $ do
      may'retryFun <- tryTakeMVar retryVar
      let retryFun = maybe (io >>) (. (io >>)) may'retryFun
      putMVar retryVar $! retryFun
  orElseAdv = mplus
  retryAdv = liftAdv retry -- the same as retryAdv = mzero
  atomicAdv = runAdvSTM
  catchAdv action handler = do
    action' <- unlift action
    handler' <- unlift1 handler
    liftAdv $ catchSTM action' handler'
  liftAdv = AdvSTM . lift
-- Helper thread
spawn'retry'thread nextJob atEnd = forkIO $ loop
  where loop = do
          may'job <- nextJob
          case may'job of
            Nothing -> atEnd
            Just job -> try job >> loop
-- This replaces "atomically"
-- onRetry/retryWith actions are sent to a helper thread
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
  commitVar <- newTVarIO id    -- todo after a commit
  retryVar <- newEmptyMVar    -- filled if something todo upon a retry
  chanVar <- newIORef Nothing  -- send retry jobs to helper thread
  endVar <- newEmptyMVar      -- listen for helper thread to finish
  let check'retry = do
        unsafeIOToSTM $ do
          may'todo <- tryTakeMVar retryVar
          case may'todo of
            Nothing -> return ()
            Just retryFun -> do
              may'chan <- readIORef chanVar
              chan <- case may'chan of
                        Nothing -> do
                          chan <- newChan
                          writeIORef chanVar (Just chan)
                          spawn'retry'thread (readChan chan) (putMVar endVar ())
                          return chan
                        Just chan -> return chan
              writeChan chan (Just (retryFun (return())))
        retry
  let wait'retry'finished = do
        may'chan <- readIORef chanVar
        case may'chan of
          Nothing -> return ()
          Just chan -> do
            writeChan chan Nothing
            takeMVar endVar
  let wrappedAction = (runReaderT action (commitVar,retryVar))
            `orElse` (check'retry)
  result <- atomically $ wrappedAction
  commitFun <- atomically (readTVar commitVar)
  commitFun (return ())
  wait'retry'finished
  return result
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
  return (u f)
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
  u <- unlifter
  return (\x y -> u (f x y))
printThread x = do
  tid <- myThreadId
  print (tid,x)
-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
  onCommit (printThread "onCommit Start")
  onRetry (printThread "onRetry Start")
  v <- liftAdv $ newTVar 7
  liftAdv $ writeTVar v 42
  onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
  onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
  choice <- liftAdv $ readTVar todo
  case choice of
    True -> return "foo"
    False -> retryWith $ do
      atomically (writeTVar todo True)
      printThread "Flipped choice to True to avoid infinite loop"
-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (printThread "onCommit Start")
    onRetry (printThread "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
    onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do
        atomically (writeTVar todo True)
        printThread "Flipped choice to True to avoid infinite loop"
-- Same example as testUnlift, but use forkIO inside onRetry
testFork :: TVar Bool -> AdvSTM [Char]
testFork todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (printThread "onCommit Start")
    onRetry (printThread "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
    onRetry (forkIO  (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) >> return () )
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do
        atomically (writeTVar todo True)
        printThread "Flipped choice to True to avoid infinite loop"
-- Example similar to Simon's suggested example:
countRetries :: (MonadAdvSTM m, Monad m, Enum a) => IORef a -> m a1 -> m a1
countRetries ioref action =
  let incr = do old <- readIORef ioref
                writeIORef ioref $! (succ old)
  in action `orElseAdv` (retryWith incr)
-- Load this file in GHCI and execute main to run the test:
main = do
  printThread "hello world"
  printThread ""
   counter <- newIORef 0
   counter <- newIORef 0
   result <- runAdvSTM (test todo)
  todo <- newTVarIO False
   print ("result",result)
  printThread "test"
   print "bye world"
   result <- runAdvSTM (countRetries counter $ test todo)
   retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "testUnlift"
  result <- runAdvSTM (countRetries counter $ testUnlift todo)
  retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "testFork"
  result <- runAdvSTM (countRetries counter $ testFork todo)
  retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "bye world"
</haskell>
 
== Just onCommit ==
 
Leaving out <code>onRetry</code>/<code>retryWith</code> makes it much simpler:
 
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]]
<haskell>
{- November 24th, 2006
 
  Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
 
  This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
  mailing list, in which the type and semantics of onCommit and were
  put forth.
 
-}
 
module AdvSTM(MonadAdvSTM(..),AdvSTM,atomic
            ,unlifter,unlift,unlift1,unlift2) where
 
-- import MonadBase
import Control.Exception(Exception)
import Control.Monad(MonadPlus(..),join,liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,readTVar,writeTVar)
import Data.Typeable(Typeable)
import Control.Concurrent(forkIO,killThread)
import Control.Monad(when)
 
class MonadAdvSTM m where
  onCommit :: IO a -> m ()
  orElseAdv :: m a -> m a -> m a
  retryAdv :: m a
  atomicAdv :: m a -> IO a
  catchAdv :: m a -> (Exception -> m a) -> m a
  liftAdv :: STM a -> m a
 
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar)
type CommitVar = TVar (IO ()->IO ())
 
instance MonadAdvSTM AdvSTM where
  onCommit io = do
    cv <- AdvSTM $ ask
    old <- liftAdv $ readTVar cv
    liftAdv $ writeTVar cv (old . (io >>))
  orElseAdv = mplus
  retryAdv = mzero
  atomicAdv = runAdvSTM
  catchAdv action handler = do
    action' <- unlift action
    handler' <- unlift1 handler
    liftAdv $ catchSTM action' handler'
  liftAdv = AdvSTM . lift
 
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
  cv <- newTVarIO id
  let commit answer = do
        cFun <- lift $ readTVar cv
        return (cFun (return ()) >> return answer)
      wrappedAction = (runReaderT (action >>= commit) cv)
  join . atomically $ wrappedAction
 
atomic :: AdvSTM a -> IO a
atomic = atomicAdv
 
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
 
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
 
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)
 
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
  return (u f)
 
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))
 
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
   u <- unlifter
  return (\x y -> u (f x y))
 
-- From here on this is example code
 
test = atomicAdv $ (unlift (onCommit (print "hello")))
 
test2 = atomicAdv $ do op <- unlift (onCommit (print "world"))
                      liftAdv op
 
test3 = do
  v <- newTVarIO 10
  atomic (onCommit (atomically $ writeTVar v 20))
  atomic (liftAdv (readTVar v) >>= \x -> onCommit (print x) :: AdvSTM ())
 
-- Prints 10 9 8 7 6 5 4 3 2 1 0
test4 = do
  v <- newTVarIO 10
  let loop = atomic $ do
        onC <- unlift1 onCommit
        liftAdv $ do
          x <- readTVar v
          writeTVar v (pred x)
          onC (print x)
          if x>0 then onC loop
                else retry
 
      bump 0 = atomic $ do
        x <- liftAdv $ readTVar v
        when (x > 0) retryAdv
        onCommit (print "bump at 0, done")
 
      bump i = atomic $ do
        x <- liftAdv $ readTVar v
        if x <= 0 then do liftAdv $ writeTVar v 10
                          onCommit (print ("bump by 10",i))
                          onCommit (bump (pred i))
                  else retryAdv
 
  tid <- forkIO (loop)
  bump 5
  killThread tid
 
main = do
  op <- test    -- no print
  atomically op  -- no print
  test2          -- prints
  test3          -- prints 20
  test4
</haskell>
</haskell>

Latest revision as of 05:20, 12 July 2021


Caveat

The onCommit works great. The onRetry/retryWith really ought to be implemented with changes in the runtime. The Helper Thread code is a very close attempt to simulate the correct semantics. The Single Thread code is flawed since it will get caught in a busy wait if the onRetry commands do not allow for a commit on the following re-attempt.

Email

The e-mail that inspired this monad, and the monad itself:

From: Simon Peyton-Jones <simonpj@microsoft.com>
To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>,
	Benjamin Franksen <benjamin.franksen@bessy.de>
Cc: "haskell-cafe@haskell.org" <haskell-cafe@haskell.org>
Subject: RE: [Haskell] Re: [Haskell-cafe] SimonPJ and Tim Harris explain STM
	-	video
Date: Fri, 24 Nov 2006 08:22:36 +0000

| The basic idea is to provide a way for a transaction to call into transaction-aware libraries.  The libraries
| can register callbacks for if the transaction commits (to actually do any "O") and for if the transaction
| aborts (to re-buffer any "I" that the transaction has consumed).  In addition, a library providing access
| to another transactional abstraction (e.g. a database supporting transactions) can perform a 2-phase
| commit that means that the memory transaction and database transaction either both commit or both
| abort.
Yes, I have toyed with extending GHC's implementation of STM to support
       onCommit :: IO a -> STM ()

The idea is that onCommit would queue up an IO action to be performed when the transaction commits, but without any atomicity guarantee. If the transaction retries, the action is discarded. Now you could say

       atomic (do {
         xv <- readTVar x
         yv <- readTVar y
         if xv > yv
            then onCommit launchMissiles
            else return () })

and the missiles would only get launched when the transaction successfully commits.

This is pure programming convenience. It's always possible to make an existing Haskell STM transaction that *returns* an IO action, which is performed by the caller, thus:

dO { action <- atomic (do {
         xv <- readTVar x;
         yv <- readTVar y;
         if xv > yv
            then return launchMissiles
            else return (return ()) }) ;
     action }

All onCommit does is make it more convenient. Perhaps a *lot* more convenient.

I have also toyed with adding

       retryWith :: IO a -> STM ()

The idea here is that the transction is undone (i.e. just like the 'retry' combinator), then the specified action is performed, and then the transaction is retried. Again no atomicity guarantee. If there's an orElse involved, both actions would get done.

Unlike onCommit, onRetry adds new power. Suppose you have a memory buffer, with an STM interface:

   getLine :: Buffer -> STM STring

This is the way to do transactional input: if there is not enough input, the transaction retries; and the effects of getLine aren't visible until the transaction commits. The problem is that if there is not enough data in the buffer, getLine will retry; but alas there is no way at present to "tell" someone to fill the buffer with more data.

onRetry would fix that. getLine could say

   if <not enough data> then retryWith <fill-buffer action>

It would also make it possible to count how many retries happened:

  atomic (<transaction> `orElse` retryWith <increment retry counter>)

I have not implemented either of these, but I think they'd be cool.

Simon

PS: I agree wholeheartedly with this:

| Of course, these solutions don't deal with the question of atomic blocks that want to perform output
| (e.g. to the console) and receive input in response to that.  My view at the moment is _that does not
| make sense in an atomic block_ -- the output and input can't be performed atomically because the
| intervening state must be visible for the user to respond to.
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe

Single Threaded code

This Single Threaded code can get caught in a busy wait. The Helper Thread code below is better.

This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights

{- November 24th, 2006

  Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>

  This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
  mailing list, in which the type and semantics of onCommit and
  retryWith were put forth.

  The semantics of printing the contents of the TVar "v" created in
  test via retryWith may or may not be well defined.  With GHC 6.6 I get

*AdvSTM> main
"hello world"

"test"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
("onCommit v",42)
("result","foo","retries",1)

"testUnlift"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
("onCommit v",42)
("result","foo","retries",2)

"bye world"

  Aside from that I think the unsafeIOToSTM is not really unsafe here
  since it writes to privately created and maintained variables.

  Since the implementation is hidden it could be changed from ReaderT
  to some other scheme.

  Once could also use MonadBase from
  http://haskell.org/haskellwiki/New_monads/MonadBase to help with the
  lifting, but this has been commented out below.

  TODO: figure out semantics of catchAdv.  At least it compiles...
-}

module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
             ,unlifter,unlift,unlift1,unlift2) where

-- import MonadBase
import Control.Exception(Exception)
import Control.Monad(MonadPlus(..),liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.MVar(MVar,newEmptyMVar,newMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import Data.Generics(Data)
import Data.Maybe(maybe)
import Data.Typeable(Typeable)
import GHC.Conc(unsafeIOToSTM)
-- for countRetries example
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)

class (Monad m) => MonadAdvSTM m where
  onCommit :: IO a -> m ()
  onRetry :: IO a -> m ()
  orElseAdv :: m a -> m a -> m a
  retryAdv :: m a
  atomicAdv :: m a -> IO a
  catchAdv :: m a -> (Exception -> m a) -> m a
  liftAdv :: STM a -> m a

-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar,RetryVar)
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())

{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
instance MonadPlus AdvSTM where
  mzero = retryAdv
  mplus = orElseAdv
-}

-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith io = onRetry io >> retryAdv

instance MonadAdvSTM AdvSTM where
  onCommit io = do
    commitVar <- AdvSTM $ asks fst
    old <- liftAdv $ readTVar commitVar
    liftAdv $ writeTVar commitVar (old . (io >>))
  onRetry io = do
    retryVar <- AdvSTM $ asks snd
    liftAdv $ unsafeIOToSTM (do
      may'do <- tryTakeMVar retryVar
      let todo = maybe (io >>) (. (io >>)) may'do
      seq todo (putMVar retryVar todo))
  orElseAdv = mplus
  retryAdv = liftAdv retry -- the same as retryAdv = mzero
  atomicAdv = runAdvSTM
  catchAdv action handler = do
    action' <- unlift action
    handler' <- unlift1 handler
    liftAdv $ catchSTM action' handler'
  liftAdv = AdvSTM . lift

-- This replaces "atomically"
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
  commitVar <- newTVarIO id
  retryVar <- newMVar id
  let check'retry = do
        may'todo <- unsafeIOToSTM $ tryTakeMVar retryVar
        maybe retry (return . Right) may'todo
  let wrappedAction = (runReaderT (liftM Left action) (commitVar,retryVar))
                      `orElse` (check'retry)
  let attempt = do
        result <- atomically $ wrappedAction
        case result of
          Left answer -> do
            cFun <- atomically (readTVar commitVar)
            cFun (return ())
            return answer
          Right rFun -> do
            rFun (return ())
            attempt
  attempt

-- Using ReaderT we can write "unlift" from AdvSTM into STM:

-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env

unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)

unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
  return (u f)

unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))

unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
  u <- unlifter
  return (\x y -> u (f x y))

-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
  onCommit (print "onCommit Start")
  onRetry (print "onRetry Start")
  v <- liftAdv $ newTVar 7
  liftAdv $ writeTVar v 42
  onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x))
  onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x))
  choice <- liftAdv $ readTVar todo
  case choice of
    True -> return "foo"
    False -> retryWith $ do
      atomically (writeTVar todo True)
      print "Flipped choice to True to avoid infinite loop"

-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (print "onCommit Start")
    onRetry (print "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x))
    onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x))
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do 
        atomically (writeTVar todo True)
        print "Flipped choice to True to avoid infinite loop"

-- Example similar to Simon's suggested example:
countRetries :: (MonadAdvSTM m, Enum a) => IORef a -> m a1 -> m a1
countRetries ioref action =
  let incr = do old <- readIORef ioref
                writeIORef ioref $! (succ old)
  in action `orElseAdv` (retryWith incr)

-- Load this file in GHCI and execute main to run the test:
main = do
  print "hello world"
  putStrLn ""
  counter <- newIORef 0
  todo <- newTVarIO False
  print "test"
  result <- runAdvSTM (countRetries counter $ test todo)
  retries <- readIORef counter
  print ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  putStrLn ""
  print "testUnlift"
  result <- runAdvSTM (countRetries counter $ testUnlift todo)
  retries <- readIORef counter
  print ("result",result,"retries",retries)
  putStrLn ""
  print "bye world"

Helper Thread code

This is my preferred solution at the moment.

This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights

{- November 24th, 2006

  Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>

  This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
  mailing list, in which the type and semantics of onCommit and
  withRetry were put forth.

  The semantics of printing the contents of the TVar "v" created in
  test via retryWith may or may not be well defined.  With GHC 6.6 I get

*AdvSTM> main
(ThreadId 415,"hello world")
(ThreadId 415,"")
(ThreadId 415,"test")
(ThreadId 416,"onRetry Start")
(ThreadId 416,("onRetry v",7))
(ThreadId 416,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",1))
(ThreadId 415,"")
(ThreadId 415,"testUnlift")
(ThreadId 417,"onRetry Start")
(ThreadId 417,("onRetry v",7))
(ThreadId 417,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",2))
(ThreadId 415,"")
(ThreadId 415,"testFork")
(ThreadId 418,"onRetry Start")
(ThreadId 418,"Flipped choice to True to avoid infinite loop")
(ThreadId 419,("onRetry v",7))
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",3))
(ThreadId 415,"")
(ThreadId 415,"bye world")

  Aside from that I think the unsafeIOToSTM is not really unsafe here
  since it writes to privately created and maintained variables.

  Since the implementation is hidden it could be changed from ReaderT
  to some other scheme.

  Once could also use MonadBase from
  http://haskell.org/haskellwiki/New_monads/MonadBase to help with the
  lifting, but this has been commented out below.

  TODO: figure out semantics of catchAdv.  At least it compiles...
-}

module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
             ,unlifter,unlift,unlift1,unlift2) where

-- import MonadBase
import Control.Exception(Exception,try)
import Control.Monad(MonadPlus(..),liftM,when)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent(forkIO,myThreadId)
import Control.Concurrent.Chan(Chan,newChan,readChan,writeChan)
import Control.Concurrent.MVar(MVar,newMVar,newEmptyMVar,isEmptyMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import GHC.Conc(unsafeIOToSTM)
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
import Data.Typeable(Typeable)

class MonadAdvSTM m where
  onCommit :: IO a -> m ()
  onRetry :: IO a -> m ()
  orElseAdv :: m a -> m a -> m a
  retryAdv :: m a
  atomicAdv :: m a -> IO a
  catchAdv :: m a -> (Exception -> m a) -> m a
  liftAdv :: STM a -> m a

-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar,RetryVar)
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())

{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
instance MonadPlus AdvSTM where
  mzero = retryAdv
  mplus = orElseAdv
-}

-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith io = onRetry io >> retryAdv

instance MonadAdvSTM AdvSTM where
  onCommit io = do
    commitVar <- AdvSTM $ asks fst
    commitFun <- liftAdv $ readTVar commitVar
    liftAdv $ writeTVar commitVar (commitFun . (io >>))
  onRetry io = do
    retryVar <- AdvSTM $ asks snd
    liftAdv . unsafeIOToSTM $ do
      may'retryFun <- tryTakeMVar retryVar
      let retryFun = maybe (io >>) (. (io >>)) may'retryFun
      putMVar retryVar $! retryFun
  orElseAdv = mplus
  retryAdv = liftAdv retry -- the same as retryAdv = mzero
  atomicAdv = runAdvSTM
  catchAdv action handler = do
    action' <- unlift action
    handler' <- unlift1 handler
    liftAdv $ catchSTM action' handler'
  liftAdv = AdvSTM . lift

-- Helper thread
spawn'retry'thread nextJob atEnd = forkIO $ loop
  where loop = do
          may'job <- nextJob
          case may'job of
            Nothing -> atEnd
            Just job -> try job >> loop

-- This replaces "atomically"
-- onRetry/retryWith actions are sent to a helper thread
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
  commitVar <- newTVarIO id    -- todo after a commit
  retryVar <- newEmptyMVar     -- filled if something todo upon a retry
  chanVar <- newIORef Nothing  -- send retry jobs to helper thread
  endVar <- newEmptyMVar       -- listen for helper thread to finish
  let check'retry = do 
        unsafeIOToSTM $ do
          may'todo <- tryTakeMVar retryVar
          case may'todo of
            Nothing -> return ()
            Just retryFun -> do
              may'chan <- readIORef chanVar
              chan <- case may'chan of
                        Nothing -> do
                          chan <- newChan
                          writeIORef chanVar (Just chan)
                          spawn'retry'thread (readChan chan) (putMVar endVar ())
                          return chan
                        Just chan -> return chan
              writeChan chan (Just (retryFun (return())))
        retry
  let wait'retry'finished = do
        may'chan <- readIORef chanVar
        case may'chan of
          Nothing -> return ()
          Just chan -> do
            writeChan chan Nothing
            takeMVar endVar
  let wrappedAction = (runReaderT action (commitVar,retryVar))
             `orElse` (check'retry)
  result <- atomically $ wrappedAction
  commitFun <- atomically (readTVar commitVar)
  commitFun (return ())
  wait'retry'finished
  return result

-- Using ReaderT we can write "unlift" from AdvSTM into STM:

-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env

unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)

unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
  return (u f)

unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))

unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
  u <- unlifter
  return (\x y -> u (f x y))

printThread x = do
  tid <- myThreadId
  print (tid,x)

-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
  onCommit (printThread "onCommit Start")
  onRetry (printThread "onRetry Start")
  v <- liftAdv $ newTVar 7
  liftAdv $ writeTVar v 42
  onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
  onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
  choice <- liftAdv $ readTVar todo
  case choice of
    True -> return "foo"
    False -> retryWith $ do
      atomically (writeTVar todo True)
      printThread "Flipped choice to True to avoid infinite loop"

-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (printThread "onCommit Start")
    onRetry (printThread "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
    onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do 
        atomically (writeTVar todo True)
        printThread "Flipped choice to True to avoid infinite loop"

-- Same example as testUnlift, but use forkIO inside onRetry
testFork :: TVar Bool -> AdvSTM [Char]
testFork todo = do
  onCommit <- unlift1 onCommit
  onRetry <- unlift1 onRetry
  retryWith <- unlift1 retryWith
  liftAdv $ do
    onCommit (printThread "onCommit Start")
    onRetry (printThread "onRetry Start")
    v <- newTVar 7
    writeTVar v 42
    onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
    onRetry (forkIO  (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) >> return () )
    choice <- readTVar todo
    case choice of
      True -> return "foo"
      False -> retryWith $ do 
        atomically (writeTVar todo True)
        printThread "Flipped choice to True to avoid infinite loop"

-- Example similar to Simon's suggested example:
countRetries :: (MonadAdvSTM m, Monad m, Enum a) => IORef a -> m a1 -> m a1
countRetries ioref action =
  let incr = do old <- readIORef ioref
                writeIORef ioref $! (succ old)
  in action `orElseAdv` (retryWith incr)

-- Load this file in GHCI and execute main to run the test:
main = do
  printThread "hello world"
  printThread ""
  counter <- newIORef 0
  todo <- newTVarIO False
  printThread "test"
  result <- runAdvSTM (countRetries counter $ test todo)
  retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "testUnlift"
  result <- runAdvSTM (countRetries counter $ testUnlift todo)
  retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "testFork"
  result <- runAdvSTM (countRetries counter $ testFork todo)
  retries <- readIORef counter
  printThread ("result",result,"retries",retries)
  atomically (writeTVar todo False)
  printThread ""
  printThread "bye world"

Just onCommit

Leaving out onRetry/retryWith makes it much simpler:

This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights

{- November 24th, 2006

  Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>

  This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
  mailing list, in which the type and semantics of onCommit and were
  put forth.

-}

module AdvSTM(MonadAdvSTM(..),AdvSTM,atomic
             ,unlifter,unlift,unlift1,unlift2) where

-- import MonadBase
import Control.Exception(Exception)
import Control.Monad(MonadPlus(..),join,liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,readTVar,writeTVar)
import Data.Typeable(Typeable)
import Control.Concurrent(forkIO,killThread)
import Control.Monad(when)

class MonadAdvSTM m where
  onCommit :: IO a -> m ()
  orElseAdv :: m a -> m a -> m a
  retryAdv :: m a
  atomicAdv :: m a -> IO a
  catchAdv :: m a -> (Exception -> m a) -> m a
  liftAdv :: STM a -> m a

-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar)
type CommitVar = TVar (IO ()->IO ())

instance MonadAdvSTM AdvSTM where
  onCommit io = do
    cv <- AdvSTM $ ask
    old <- liftAdv $ readTVar cv
    liftAdv $ writeTVar cv (old . (io >>))
  orElseAdv = mplus
  retryAdv = mzero
  atomicAdv = runAdvSTM
  catchAdv action handler = do
    action' <- unlift action
    handler' <- unlift1 handler
    liftAdv $ catchSTM action' handler'
  liftAdv = AdvSTM . lift

runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
  cv <- newTVarIO id
  let commit answer = do
        cFun <- lift $ readTVar cv
        return (cFun (return ()) >> return answer)
      wrappedAction = (runReaderT (action >>= commit) cv)
  join . atomically $ wrappedAction

atomic :: AdvSTM a -> IO a
atomic = atomicAdv

-- Using ReaderT we can write "unlift" from AdvSTM into STM:

-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env

unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
  env <- AdvSTM ask
  return (runWith env)

unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
  u <- unlifter
  return (u f)

unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
  u <- unlifter
  return (\x -> u (f x))

unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
  u <- unlifter
  return (\x y -> u (f x y))

-- From here on this is example code

test = atomicAdv $ (unlift (onCommit (print "hello")))

test2 = atomicAdv $ do op <- unlift (onCommit (print "world"))
                       liftAdv op

test3 = do
  v <- newTVarIO 10
  atomic (onCommit (atomically $ writeTVar v 20))
  atomic (liftAdv (readTVar v) >>= \x -> onCommit (print x) :: AdvSTM ())

-- Prints 10 9 8 7 6 5 4 3 2 1 0
test4 = do
  v <- newTVarIO 10
  let loop = atomic $ do
        onC <- unlift1 onCommit
        liftAdv $ do
          x <- readTVar v
          writeTVar v (pred x)
          onC (print x)
          if x>0 then onC loop
                 else retry

      bump 0 = atomic $ do
        x <- liftAdv $ readTVar v
        when (x > 0) retryAdv
        onCommit (print "bump at 0, done")

      bump i = atomic $ do
        x <- liftAdv $ readTVar v
        if x <= 0 then do liftAdv $ writeTVar v 10
                          onCommit (print ("bump by 10",i))
                          onCommit (bump (pred i))
                  else retryAdv

  tid <- forkIO (loop)
  bump 5
  killThread tid

main = do
  op <- test     -- no print
  atomically op  -- no print
  test2          -- prints
  test3          -- prints 20
  test4