New monads/MonadAdvSTM: Difference between revisions
m (Make countRetries more polymorphic) |
m (Minor formatting changes) |
||
(12 intermediate revisions by 4 users not shown) | |||
Line 1: | Line 1: | ||
[[Category:Code]] | [[Category:Code]] | ||
__TOC__ | |||
== Caveat == | |||
The <code>onCommit</code> works great. The <code>onRetry</code>/<code>retryWith</code> really ought to be implemented with changes in the runtime. The ''Helper Thread'' code is a very close attempt to simulate the correct semantics. The ''Single Thread'' code is flawed since it will get caught in a busy wait if the <code>onRetry</code> commands do not allow for a commit on the following re-attempt. | |||
== Email == | |||
The e-mail that inspired this monad, and the monad itself: | |||
<pre> | |||
From: Simon Peyton-Jones <simonpj@microsoft.com> | From: Simon Peyton-Jones <simonpj@microsoft.com> | ||
To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>, | To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>, | ||
Line 17: | Line 26: | ||
| commit that means that the memory transaction and database transaction either both commit or both | | commit that means that the memory transaction and database transaction either both commit or both | ||
| abort. | | abort. | ||
</pre> | |||
Yes, I have toyed with extending GHC's implementation of STM to support | {| | ||
|Yes, I have toyed with extending GHC's implementation of STM to support | |||
onCommit :: IO a -> STM () | onCommit :: IO a -> STM () | ||
Line 26: | Line 36: | ||
xv <- readTVar x | xv <- readTVar x | ||
yv <- readTVar y | yv <- readTVar y | ||
if xv>yv then | if xv > yv | ||
then onCommit launchMissiles | |||
else return () }) | else return () }) | ||
and the missiles would only get launched when the transaction successfully commits. | and the missiles would only get launched when the transaction successfully commits. | ||
Line 36: | Line 46: | ||
xv <- readTVar x; | xv <- readTVar x; | ||
yv <- readTVar y; | yv <- readTVar y; | ||
if xv>yv then | if xv > yv | ||
then return launchMissiles | |||
else return (return ()) }) ; | else return (return ()) }) ; | ||
action } | action } | ||
Line 65: | Line 75: | ||
PS: I agree wholeheartedly with this: | PS: I agree wholeheartedly with this: | ||
|} | |||
<pre> | |||
| Of course, these solutions don't deal with the question of atomic blocks that want to perform output | | Of course, these solutions don't deal with the question of atomic blocks that want to perform output | ||
| (e.g. to the console) and receive input in response to that. My view at the moment is _that does not | | (e.g. to the console) and receive input in response to that. My view at the moment is _that does not | ||
Line 74: | Line 85: | ||
Haskell-Cafe@haskell.org | Haskell-Cafe@haskell.org | ||
http://www.haskell.org/mailman/listinfo/haskell-cafe | http://www.haskell.org/mailman/listinfo/haskell-cafe | ||
</pre> | |||
== Single Threaded code == | |||
This Single Threaded code can get caught in a busy wait. The Helper Thread code below is better. | |||
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]] | |||
<haskell> | <haskell> | ||
Line 80: | Line 97: | ||
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com> | Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com> | ||
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe | This is inspired by a post by Simon Peyton-Jones on the haskell-cafe | ||
mailing list, in which the type and semantics of onCommit and | mailing list, in which the type and semantics of onCommit and | ||
retryWith were put forth. | |||
The semantics of printing the contents of the TVar "v" created in | The semantics of printing the contents of the TVar "v" created in | ||
Line 92: | Line 107: | ||
*AdvSTM> main | *AdvSTM> main | ||
"hello world" | "hello world" | ||
" | |||
(" | "test" | ||
"onRetry Start" | |||
("onRetry v",7) | |||
"Flipped choice to True to avoid infinite loop" | "Flipped choice to True to avoid infinite loop" | ||
"onCommit Start" | "onCommit Start" | ||
("onCommit v",42) | ("onCommit v",42) | ||
("result","foo") | ("result","foo","retries",1) | ||
"testUnlift" | |||
"onRetry Start" | |||
("onRetry v",7) | |||
"Flipped choice to True to avoid infinite loop" | |||
"onCommit Start" | |||
("onCommit v",42) | |||
("result","foo","retries",2) | |||
"bye world" | "bye world" | ||
Line 113: | Line 139: | ||
-} | -} | ||
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith) where | module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries | ||
,unlifter,unlift,unlift1,unlift2) where | |||
-- import MonadBase | -- import MonadBase | ||
Line 119: | Line 146: | ||
import Control.Monad(MonadPlus(..),liftM) | import Control.Monad(MonadPlus(..),liftM) | ||
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks) | import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks) | ||
import Control.Concurrent.MVar(MVar,newEmptyMVar,newMVar,takeMVar,tryTakeMVar,putMVar) | |||
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically) | import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically) | ||
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar) | import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar) | ||
import Data.Generics(Data) | |||
import Data.Maybe(maybe) | |||
import Data.Typeable(Typeable) | |||
import GHC.Conc(unsafeIOToSTM) | import GHC.Conc(unsafeIOToSTM) | ||
-- for countRetries example | |||
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef) | import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef) | ||
class MonadAdvSTM m where | class (Monad m) => MonadAdvSTM m where | ||
onCommit :: IO a -> m () | onCommit :: IO a -> m () | ||
onRetry :: IO a -> m () | onRetry :: IO a -> m () | ||
Line 136: | Line 166: | ||
-- Export type but not constructor! | -- Export type but not constructor! | ||
newtype AdvSTM a = AdvSTM (ReaderT | newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable) | ||
type CommitVar = TVar ( | type Env = (CommitVar,RetryVar) | ||
type RetryVar = | type CommitVar = TVar (IO ()->IO ()) | ||
type RetryVar = MVar (IO ()->IO ()) | |||
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead | {- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead | ||
Line 147: | Line 178: | ||
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift | -- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift | ||
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b | retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b | ||
retryWith io = onRetry io >> retryAdv | retryWith io = onRetry io >> retryAdv | ||
Line 153: | Line 183: | ||
instance MonadAdvSTM AdvSTM where | instance MonadAdvSTM AdvSTM where | ||
onCommit io = do | onCommit io = do | ||
commitVar <- AdvSTM $ asks fst | |||
old <- liftAdv $ readTVar | old <- liftAdv $ readTVar commitVar | ||
liftAdv $ writeTVar | liftAdv $ writeTVar commitVar (old . (io >>)) | ||
onRetry io = do | onRetry io = do | ||
retryVar <- AdvSTM $ asks snd | |||
liftAdv $ unsafeIOToSTM | liftAdv $ unsafeIOToSTM (do | ||
may'do <- tryTakeMVar retryVar | |||
let todo = maybe (io >>) (. (io >>)) may'do | |||
seq todo (putMVar retryVar todo)) | |||
orElseAdv = mplus | orElseAdv = mplus | ||
retryAdv = liftAdv retry -- the same as retryAdv = mzero | retryAdv = liftAdv retry -- the same as retryAdv = mzero | ||
atomicAdv = runAdvSTM | atomicAdv = runAdvSTM | ||
catchAdv | catchAdv action handler = do | ||
action' <- unlift action | |||
handler' <- unlift1 handler | |||
liftAdv $ catchSTM action' handler' | |||
liftAdv = AdvSTM . lift | liftAdv = AdvSTM . lift | ||
Line 181: | Line 204: | ||
runAdvSTM :: AdvSTM a -> IO a | runAdvSTM :: AdvSTM a -> IO a | ||
runAdvSTM (AdvSTM action) = do | runAdvSTM (AdvSTM action) = do | ||
commitVar <- newTVarIO id | |||
retryVar <- newMVar id | |||
let wrappedAction = (runReaderT (liftM | let check'retry = do | ||
`orElse` ( | may'todo <- unsafeIOToSTM $ tryTakeMVar retryVar | ||
maybe retry (return . Right) may'todo | |||
let wrappedAction = (runReaderT (liftM Left action) (commitVar,retryVar)) | |||
`orElse` (check'retry) | |||
let attempt = do | |||
result <- atomically $ wrappedAction | result <- atomically $ wrappedAction | ||
case result of | case result of | ||
Left answer -> do | |||
cFun <- atomically (readTVar | cFun <- atomically (readTVar commitVar) | ||
cFun (return ()) | |||
return answer | return answer | ||
Right rFun -> do | |||
rFun | rFun (return ()) | ||
attempt | |||
attempt | |||
-- | -- Using ReaderT we can write "unlift" from AdvSTM into STM: | ||
test :: TVar Bool -> | -- Do not export runWith | ||
runWith :: Env -> AdvSTM t -> STM t | |||
runWith env (AdvSTM action) = runReaderT action env | |||
unlifter :: AdvSTM (AdvSTM a -> STM a) | |||
unlifter = do | |||
env <- AdvSTM ask | |||
return (runWith env) | |||
unlift :: AdvSTM a -> AdvSTM (STM a) | |||
unlift f = do | |||
u <- unlifter | |||
return (u f) | |||
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a) | |||
unlift1 f = do | |||
u <- unlifter | |||
return (\x -> u (f x)) | |||
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a) | |||
unlift2 f = do | |||
u <- unlifter | |||
return (\x y -> u (f x y)) | |||
-- Example code using the above, lifting into MonadAdvSTM: | |||
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char] | |||
test todo = do | test todo = do | ||
onCommit (print "onCommit Start") | onCommit (print "onCommit Start") | ||
Line 215: | Line 264: | ||
atomically (writeTVar todo True) | atomically (writeTVar todo True) | ||
print "Flipped choice to True to avoid infinite loop" | print "Flipped choice to True to avoid infinite loop" | ||
-- Same example as test, but unlifting from AdvSTM | |||
testUnlift :: TVar Bool -> AdvSTM [Char] | |||
testUnlift todo = do | |||
onCommit <- unlift1 onCommit | |||
onRetry <- unlift1 onRetry | |||
retryWith <- unlift1 retryWith | |||
liftAdv $ do | |||
onCommit (print "onCommit Start") | |||
onRetry (print "onRetry Start") | |||
v <- newTVar 7 | |||
writeTVar v 42 | |||
onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x)) | |||
onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x)) | |||
choice <- readTVar todo | |||
case choice of | |||
True -> return "foo" | |||
False -> retryWith $ do | |||
atomically (writeTVar todo True) | |||
print "Flipped choice to True to avoid infinite loop" | |||
-- Example similar to Simon's suggested example: | -- Example similar to Simon's suggested example: | ||
countRetries :: (MonadAdvSTM m, Enum a) => IORef a -> m a1 -> m a1 | |||
countRetries ioref action = | |||
let incr = do old <- readIORef ioref | |||
writeIORef ioref $! (succ old) | |||
in action `orElseAdv` (retryWith incr) | |||
-- Load this file in GHCI and execute main to run the test: | |||
main = do | |||
print "hello world" | |||
putStrLn "" | |||
counter <- newIORef 0 | |||
todo <- newTVarIO False | |||
print "test" | |||
result <- runAdvSTM (countRetries counter $ test todo) | |||
retries <- readIORef counter | |||
print ("result",result,"retries",retries) | |||
atomically (writeTVar todo False) | |||
putStrLn "" | |||
print "testUnlift" | |||
result <- runAdvSTM (countRetries counter $ testUnlift todo) | |||
retries <- readIORef counter | |||
print ("result",result,"retries",retries) | |||
putStrLn "" | |||
print "bye world" | |||
</haskell> | |||
== Helper Thread code == | |||
This is my preferred solution at the moment. | |||
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]] | |||
<haskell> | |||
{- November 24th, 2006 | |||
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com> | |||
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe | |||
mailing list, in which the type and semantics of onCommit and | |||
withRetry were put forth. | |||
The semantics of printing the contents of the TVar "v" created in | |||
test via retryWith may or may not be well defined. With GHC 6.6 I get | |||
*AdvSTM> main | |||
(ThreadId 415,"hello world") | |||
(ThreadId 415,"") | |||
(ThreadId 415,"test") | |||
(ThreadId 416,"onRetry Start") | |||
(ThreadId 416,("onRetry v",7)) | |||
(ThreadId 416,"Flipped choice to True to avoid infinite loop") | |||
(ThreadId 415,"onCommit Start") | |||
(ThreadId 415,("onCommit v",42)) | |||
(ThreadId 415,("result","foo","retries",1)) | |||
(ThreadId 415,"") | |||
(ThreadId 415,"testUnlift") | |||
(ThreadId 417,"onRetry Start") | |||
(ThreadId 417,("onRetry v",7)) | |||
(ThreadId 417,"Flipped choice to True to avoid infinite loop") | |||
(ThreadId 415,"onCommit Start") | |||
(ThreadId 415,("onCommit v",42)) | |||
(ThreadId 415,("result","foo","retries",2)) | |||
(ThreadId 415,"") | |||
(ThreadId 415,"testFork") | |||
(ThreadId 418,"onRetry Start") | |||
(ThreadId 418,"Flipped choice to True to avoid infinite loop") | |||
(ThreadId 419,("onRetry v",7)) | |||
(ThreadId 415,"onCommit Start") | |||
(ThreadId 415,("onCommit v",42)) | |||
(ThreadId 415,("result","foo","retries",3)) | |||
(ThreadId 415,"") | |||
(ThreadId 415,"bye world") | |||
Aside from that I think the unsafeIOToSTM is not really unsafe here | |||
since it writes to privately created and maintained variables. | |||
Since the implementation is hidden it could be changed from ReaderT | |||
to some other scheme. | |||
Once could also use MonadBase from | |||
http://haskell.org/haskellwiki/New_monads/MonadBase to help with the | |||
lifting, but this has been commented out below. | |||
TODO: figure out semantics of catchAdv. At least it compiles... | |||
-} | |||
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries | |||
,unlifter,unlift,unlift1,unlift2) where | |||
-- import MonadBase | |||
import Control.Exception(Exception,try) | |||
import Control.Monad(MonadPlus(..),liftM,when) | |||
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks) | |||
import Control.Concurrent(forkIO,myThreadId) | |||
import Control.Concurrent.Chan(Chan,newChan,readChan,writeChan) | |||
import Control.Concurrent.MVar(MVar,newMVar,newEmptyMVar,isEmptyMVar,takeMVar,tryTakeMVar,putMVar) | |||
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically) | |||
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar) | |||
import GHC.Conc(unsafeIOToSTM) | |||
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef) | |||
import Data.Typeable(Typeable) | |||
class MonadAdvSTM m where | |||
onCommit :: IO a -> m () | |||
onRetry :: IO a -> m () | |||
orElseAdv :: m a -> m a -> m a | |||
retryAdv :: m a | |||
atomicAdv :: m a -> IO a | |||
catchAdv :: m a -> (Exception -> m a) -> m a | |||
liftAdv :: STM a -> m a | |||
-- Export type but not constructor! | |||
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable) | |||
type Env = (CommitVar,RetryVar) | |||
type CommitVar = TVar (IO ()->IO ()) | |||
type RetryVar = MVar (IO ()->IO ()) | |||
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead | |||
instance MonadPlus AdvSTM where | |||
mzero = retryAdv | |||
mplus = orElseAdv | |||
-} | |||
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift | |||
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b | |||
retryWith io = onRetry io >> retryAdv | |||
instance MonadAdvSTM AdvSTM where | |||
onCommit io = do | |||
commitVar <- AdvSTM $ asks fst | |||
commitFun <- liftAdv $ readTVar commitVar | |||
liftAdv $ writeTVar commitVar (commitFun . (io >>)) | |||
onRetry io = do | |||
retryVar <- AdvSTM $ asks snd | |||
liftAdv . unsafeIOToSTM $ do | |||
may'retryFun <- tryTakeMVar retryVar | |||
let retryFun = maybe (io >>) (. (io >>)) may'retryFun | |||
putMVar retryVar $! retryFun | |||
orElseAdv = mplus | |||
retryAdv = liftAdv retry -- the same as retryAdv = mzero | |||
atomicAdv = runAdvSTM | |||
catchAdv action handler = do | |||
action' <- unlift action | |||
handler' <- unlift1 handler | |||
liftAdv $ catchSTM action' handler' | |||
liftAdv = AdvSTM . lift | |||
-- Helper thread | |||
spawn'retry'thread nextJob atEnd = forkIO $ loop | |||
where loop = do | |||
may'job <- nextJob | |||
case may'job of | |||
Nothing -> atEnd | |||
Just job -> try job >> loop | |||
-- This replaces "atomically" | |||
-- onRetry/retryWith actions are sent to a helper thread | |||
runAdvSTM :: AdvSTM a -> IO a | |||
runAdvSTM (AdvSTM action) = do | |||
commitVar <- newTVarIO id -- todo after a commit | |||
retryVar <- newEmptyMVar -- filled if something todo upon a retry | |||
chanVar <- newIORef Nothing -- send retry jobs to helper thread | |||
endVar <- newEmptyMVar -- listen for helper thread to finish | |||
let check'retry = do | |||
unsafeIOToSTM $ do | |||
may'todo <- tryTakeMVar retryVar | |||
case may'todo of | |||
Nothing -> return () | |||
Just retryFun -> do | |||
may'chan <- readIORef chanVar | |||
chan <- case may'chan of | |||
Nothing -> do | |||
chan <- newChan | |||
writeIORef chanVar (Just chan) | |||
spawn'retry'thread (readChan chan) (putMVar endVar ()) | |||
return chan | |||
Just chan -> return chan | |||
writeChan chan (Just (retryFun (return()))) | |||
retry | |||
let wait'retry'finished = do | |||
may'chan <- readIORef chanVar | |||
case may'chan of | |||
Nothing -> return () | |||
Just chan -> do | |||
writeChan chan Nothing | |||
takeMVar endVar | |||
let wrappedAction = (runReaderT action (commitVar,retryVar)) | |||
`orElse` (check'retry) | |||
result <- atomically $ wrappedAction | |||
commitFun <- atomically (readTVar commitVar) | |||
commitFun (return ()) | |||
wait'retry'finished | |||
return result | |||
-- Using ReaderT we can write "unlift" from AdvSTM into STM: | |||
-- Do not export runWith | |||
runWith :: Env -> AdvSTM t -> STM t | |||
runWith env (AdvSTM action) = runReaderT action env | |||
unlifter :: AdvSTM (AdvSTM a -> STM a) | |||
unlifter = do | |||
env <- AdvSTM ask | |||
return (runWith env) | |||
unlift :: AdvSTM a -> AdvSTM (STM a) | |||
unlift f = do | |||
u <- unlifter | |||
return (u f) | |||
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a) | |||
unlift1 f = do | |||
u <- unlifter | |||
return (\x -> u (f x)) | |||
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a) | |||
unlift2 f = do | |||
u <- unlifter | |||
return (\x y -> u (f x y)) | |||
printThread x = do | |||
tid <- myThreadId | |||
print (tid,x) | |||
-- Example code using the above, lifting into MonadAdvSTM: | |||
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char] | |||
test todo = do | |||
onCommit (printThread "onCommit Start") | |||
onRetry (printThread "onRetry Start") | |||
v <- liftAdv $ newTVar 7 | |||
liftAdv $ writeTVar v 42 | |||
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x)) | |||
onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) | |||
choice <- liftAdv $ readTVar todo | |||
case choice of | |||
True -> return "foo" | |||
False -> retryWith $ do | |||
atomically (writeTVar todo True) | |||
printThread "Flipped choice to True to avoid infinite loop" | |||
-- Same example as test, but unlifting from AdvSTM | |||
testUnlift :: TVar Bool -> AdvSTM [Char] | |||
testUnlift todo = do | |||
onCommit <- unlift1 onCommit | |||
onRetry <- unlift1 onRetry | |||
retryWith <- unlift1 retryWith | |||
liftAdv $ do | |||
onCommit (printThread "onCommit Start") | |||
onRetry (printThread "onRetry Start") | |||
v <- newTVar 7 | |||
writeTVar v 42 | |||
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x)) | |||
onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) | |||
choice <- readTVar todo | |||
case choice of | |||
True -> return "foo" | |||
False -> retryWith $ do | |||
atomically (writeTVar todo True) | |||
printThread "Flipped choice to True to avoid infinite loop" | |||
-- Same example as testUnlift, but use forkIO inside onRetry | |||
testFork :: TVar Bool -> AdvSTM [Char] | |||
testFork todo = do | |||
onCommit <- unlift1 onCommit | |||
onRetry <- unlift1 onRetry | |||
retryWith <- unlift1 retryWith | |||
liftAdv $ do | |||
onCommit (printThread "onCommit Start") | |||
onRetry (printThread "onRetry Start") | |||
v <- newTVar 7 | |||
writeTVar v 42 | |||
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x)) | |||
onRetry (forkIO (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) >> return () ) | |||
choice <- readTVar todo | |||
case choice of | |||
True -> return "foo" | |||
False -> retryWith $ do | |||
atomically (writeTVar todo True) | |||
printThread "Flipped choice to True to avoid infinite loop" | |||
-- Example similar to Simon's suggested example: | |||
countRetries :: (MonadAdvSTM m, Monad m, Enum a) => IORef a -> m a1 -> m a1 | countRetries :: (MonadAdvSTM m, Monad m, Enum a) => IORef a -> m a1 -> m a1 | ||
countRetries ioref action = | countRetries ioref action = | ||
Line 226: | Line 575: | ||
-- Load this file in GHCI and execute main to run the test: | -- Load this file in GHCI and execute main to run the test: | ||
main = do | main = do | ||
printThread "hello world" | |||
printThread "" | |||
counter <- newIORef 0 | |||
todo <- newTVarIO False | todo <- newTVarIO False | ||
counter <- | printThread "test" | ||
result <- runAdvSTM ( | result <- runAdvSTM (countRetries counter $ test todo) | ||
retries <- readIORef counter | |||
print " | printThread ("result",result,"retries",retries) | ||
atomically (writeTVar todo False) | |||
printThread "" | |||
printThread "testUnlift" | |||
result <- runAdvSTM (countRetries counter $ testUnlift todo) | |||
retries <- readIORef counter | |||
printThread ("result",result,"retries",retries) | |||
atomically (writeTVar todo False) | |||
printThread "" | |||
printThread "testFork" | |||
result <- runAdvSTM (countRetries counter $ testFork todo) | |||
retries <- readIORef counter | |||
printThread ("result",result,"retries",retries) | |||
atomically (writeTVar todo False) | |||
printThread "" | |||
printThread "bye world" | |||
</haskell> | |||
== Just onCommit == | |||
Leaving out <code>onRetry</code>/<code>retryWith</code> makes it much simpler: | |||
This is now under the usual permissive copyright for this wiki: [[HaskellWiki:Copyrights]] | |||
<haskell> | |||
{- November 24th, 2006 | |||
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com> | |||
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe | |||
mailing list, in which the type and semantics of onCommit and were | |||
put forth. | |||
-} | |||
module AdvSTM(MonadAdvSTM(..),AdvSTM,atomic | |||
,unlifter,unlift,unlift1,unlift2) where | |||
-- import MonadBase | |||
import Control.Exception(Exception) | |||
import Control.Monad(MonadPlus(..),join,liftM) | |||
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks) | |||
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically) | |||
import Control.Concurrent.STM.TVar(TVar,newTVarIO,readTVar,writeTVar) | |||
import Data.Typeable(Typeable) | |||
import Control.Concurrent(forkIO,killThread) | |||
import Control.Monad(when) | |||
class MonadAdvSTM m where | |||
onCommit :: IO a -> m () | |||
orElseAdv :: m a -> m a -> m a | |||
retryAdv :: m a | |||
atomicAdv :: m a -> IO a | |||
catchAdv :: m a -> (Exception -> m a) -> m a | |||
liftAdv :: STM a -> m a | |||
-- Export type but not constructor! | |||
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable) | |||
type Env = (CommitVar) | |||
type CommitVar = TVar (IO ()->IO ()) | |||
instance MonadAdvSTM AdvSTM where | |||
onCommit io = do | |||
cv <- AdvSTM $ ask | |||
old <- liftAdv $ readTVar cv | |||
liftAdv $ writeTVar cv (old . (io >>)) | |||
orElseAdv = mplus | |||
retryAdv = mzero | |||
atomicAdv = runAdvSTM | |||
catchAdv action handler = do | |||
action' <- unlift action | |||
handler' <- unlift1 handler | |||
liftAdv $ catchSTM action' handler' | |||
liftAdv = AdvSTM . lift | |||
runAdvSTM :: AdvSTM a -> IO a | |||
runAdvSTM (AdvSTM action) = do | |||
cv <- newTVarIO id | |||
let commit answer = do | |||
cFun <- lift $ readTVar cv | |||
return (cFun (return ()) >> return answer) | |||
wrappedAction = (runReaderT (action >>= commit) cv) | |||
join . atomically $ wrappedAction | |||
atomic :: AdvSTM a -> IO a | |||
atomic = atomicAdv | |||
-- Using ReaderT we can write "unlift" from AdvSTM into STM: | |||
-- Do not export runWith | |||
runWith :: Env -> AdvSTM t -> STM t | |||
runWith env (AdvSTM action) = runReaderT action env | |||
unlifter :: AdvSTM (AdvSTM a -> STM a) | |||
unlifter = do | |||
env <- AdvSTM ask | |||
return (runWith env) | |||
unlift :: AdvSTM a -> AdvSTM (STM a) | |||
unlift f = do | |||
u <- unlifter | |||
return (u f) | |||
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a) | |||
unlift1 f = do | |||
u <- unlifter | |||
return (\x -> u (f x)) | |||
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a) | |||
unlift2 f = do | |||
u <- unlifter | |||
return (\x y -> u (f x y)) | |||
-- From here on this is example code | |||
test = atomicAdv $ (unlift (onCommit (print "hello"))) | |||
test2 = atomicAdv $ do op <- unlift (onCommit (print "world")) | |||
liftAdv op | |||
test3 = do | |||
v <- newTVarIO 10 | |||
atomic (onCommit (atomically $ writeTVar v 20)) | |||
atomic (liftAdv (readTVar v) >>= \x -> onCommit (print x) :: AdvSTM ()) | |||
-- Prints 10 9 8 7 6 5 4 3 2 1 0 | |||
test4 = do | |||
v <- newTVarIO 10 | |||
let loop = atomic $ do | |||
onC <- unlift1 onCommit | |||
liftAdv $ do | |||
x <- readTVar v | |||
writeTVar v (pred x) | |||
onC (print x) | |||
if x>0 then onC loop | |||
else retry | |||
bump 0 = atomic $ do | |||
x <- liftAdv $ readTVar v | |||
when (x > 0) retryAdv | |||
onCommit (print "bump at 0, done") | |||
bump i = atomic $ do | |||
x <- liftAdv $ readTVar v | |||
if x <= 0 then do liftAdv $ writeTVar v 10 | |||
onCommit (print ("bump by 10",i)) | |||
onCommit (bump (pred i)) | |||
else retryAdv | |||
tid <- forkIO (loop) | |||
bump 5 | |||
killThread tid | |||
main = do | |||
op <- test -- no print | |||
atomically op -- no print | |||
test2 -- prints | |||
test3 -- prints 20 | |||
test4 | |||
</haskell> | </haskell> |
Latest revision as of 05:20, 12 July 2021
Caveat
The onCommit
works great. The onRetry
/retryWith
really ought to be implemented with changes in the runtime. The Helper Thread code is a very close attempt to simulate the correct semantics. The Single Thread code is flawed since it will get caught in a busy wait if the onRetry
commands do not allow for a commit on the following re-attempt.
The e-mail that inspired this monad, and the monad itself:
From: Simon Peyton-Jones <simonpj@microsoft.com> To: "Tim Harris (RESEARCH)" <tharris@microsoft.com>, Benjamin Franksen <benjamin.franksen@bessy.de> Cc: "haskell-cafe@haskell.org" <haskell-cafe@haskell.org> Subject: RE: [Haskell] Re: [Haskell-cafe] SimonPJ and Tim Harris explain STM - video Date: Fri, 24 Nov 2006 08:22:36 +0000 | The basic idea is to provide a way for a transaction to call into transaction-aware libraries. The libraries | can register callbacks for if the transaction commits (to actually do any "O") and for if the transaction | aborts (to re-buffer any "I" that the transaction has consumed). In addition, a library providing access | to another transactional abstraction (e.g. a database supporting transactions) can perform a 2-phase | commit that means that the memory transaction and database transaction either both commit or both | abort.
Yes, I have toyed with extending GHC's implementation of STM to support
onCommit :: IO a -> STM () The idea is that onCommit would queue up an IO action to be performed when the transaction commits, but without any atomicity guarantee. If the transaction retries, the action is discarded. Now you could say atomic (do { xv <- readTVar x yv <- readTVar y if xv > yv then onCommit launchMissiles else return () }) and the missiles would only get launched when the transaction successfully commits. This is pure programming convenience. It's always possible to make an existing Haskell STM transaction that *returns* an IO action, which is performed by the caller, thus: dO { action <- atomic (do { xv <- readTVar x; yv <- readTVar y; if xv > yv then return launchMissiles else return (return ()) }) ; action } All onCommit does is make it more convenient. Perhaps a *lot* more convenient. I have also toyed with adding retryWith :: IO a -> STM () The idea here is that the transction is undone (i.e. just like the 'retry' combinator), then the specified action is performed, and then the transaction is retried. Again no atomicity guarantee. If there's an orElse involved, both actions would get done. Unlike onCommit, onRetry adds new power. Suppose you have a memory buffer, with an STM interface: getLine :: Buffer -> STM STring This is the way to do transactional input: if there is not enough input, the transaction retries; and the effects of getLine aren't visible until the transaction commits. The problem is that if there is not enough data in the buffer, getLine will retry; but alas there is no way at present to "tell" someone to fill the buffer with more data. onRetry would fix that. getLine could say if <not enough data> then retryWith <fill-buffer action> It would also make it possible to count how many retries happened: atomic (<transaction> `orElse` retryWith <increment retry counter>) I have not implemented either of these, but I think they'd be cool. Simon PS: I agree wholeheartedly with this: |
| Of course, these solutions don't deal with the question of atomic blocks that want to perform output | (e.g. to the console) and receive input in response to that. My view at the moment is _that does not | make sense in an atomic block_ -- the output and input can't be performed atomically because the | intervening state must be visible for the user to respond to. _______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe
Single Threaded code
This Single Threaded code can get caught in a busy wait. The Helper Thread code below is better.
This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights
{- November 24th, 2006
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
mailing list, in which the type and semantics of onCommit and
retryWith were put forth.
The semantics of printing the contents of the TVar "v" created in
test via retryWith may or may not be well defined. With GHC 6.6 I get
*AdvSTM> main
"hello world"
"test"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
("onCommit v",42)
("result","foo","retries",1)
"testUnlift"
"onRetry Start"
("onRetry v",7)
"Flipped choice to True to avoid infinite loop"
"onCommit Start"
("onCommit v",42)
("result","foo","retries",2)
"bye world"
Aside from that I think the unsafeIOToSTM is not really unsafe here
since it writes to privately created and maintained variables.
Since the implementation is hidden it could be changed from ReaderT
to some other scheme.
Once could also use MonadBase from
http://haskell.org/haskellwiki/New_monads/MonadBase to help with the
lifting, but this has been commented out below.
TODO: figure out semantics of catchAdv. At least it compiles...
-}
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
,unlifter,unlift,unlift1,unlift2) where
-- import MonadBase
import Control.Exception(Exception)
import Control.Monad(MonadPlus(..),liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.MVar(MVar,newEmptyMVar,newMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import Data.Generics(Data)
import Data.Maybe(maybe)
import Data.Typeable(Typeable)
import GHC.Conc(unsafeIOToSTM)
-- for countRetries example
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
class (Monad m) => MonadAdvSTM m where
onCommit :: IO a -> m ()
onRetry :: IO a -> m ()
orElseAdv :: m a -> m a -> m a
retryAdv :: m a
atomicAdv :: m a -> IO a
catchAdv :: m a -> (Exception -> m a) -> m a
liftAdv :: STM a -> m a
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar,RetryVar)
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
instance MonadPlus AdvSTM where
mzero = retryAdv
mplus = orElseAdv
-}
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith io = onRetry io >> retryAdv
instance MonadAdvSTM AdvSTM where
onCommit io = do
commitVar <- AdvSTM $ asks fst
old <- liftAdv $ readTVar commitVar
liftAdv $ writeTVar commitVar (old . (io >>))
onRetry io = do
retryVar <- AdvSTM $ asks snd
liftAdv $ unsafeIOToSTM (do
may'do <- tryTakeMVar retryVar
let todo = maybe (io >>) (. (io >>)) may'do
seq todo (putMVar retryVar todo))
orElseAdv = mplus
retryAdv = liftAdv retry -- the same as retryAdv = mzero
atomicAdv = runAdvSTM
catchAdv action handler = do
action' <- unlift action
handler' <- unlift1 handler
liftAdv $ catchSTM action' handler'
liftAdv = AdvSTM . lift
-- This replaces "atomically"
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
commitVar <- newTVarIO id
retryVar <- newMVar id
let check'retry = do
may'todo <- unsafeIOToSTM $ tryTakeMVar retryVar
maybe retry (return . Right) may'todo
let wrappedAction = (runReaderT (liftM Left action) (commitVar,retryVar))
`orElse` (check'retry)
let attempt = do
result <- atomically $ wrappedAction
case result of
Left answer -> do
cFun <- atomically (readTVar commitVar)
cFun (return ())
return answer
Right rFun -> do
rFun (return ())
attempt
attempt
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
env <- AdvSTM ask
return (runWith env)
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
u <- unlifter
return (u f)
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
u <- unlifter
return (\x -> u (f x))
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
u <- unlifter
return (\x y -> u (f x y))
-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
onCommit (print "onCommit Start")
onRetry (print "onRetry Start")
v <- liftAdv $ newTVar 7
liftAdv $ writeTVar v 42
onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x))
onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x))
choice <- liftAdv $ readTVar todo
case choice of
True -> return "foo"
False -> retryWith $ do
atomically (writeTVar todo True)
print "Flipped choice to True to avoid infinite loop"
-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
onCommit <- unlift1 onCommit
onRetry <- unlift1 onRetry
retryWith <- unlift1 retryWith
liftAdv $ do
onCommit (print "onCommit Start")
onRetry (print "onRetry Start")
v <- newTVar 7
writeTVar v 42
onCommit (atomically (readTVar v) >>= \x->print ("onCommit v",x))
onRetry (atomically (readTVar v) >>= \x->print ("onRetry v",x))
choice <- readTVar todo
case choice of
True -> return "foo"
False -> retryWith $ do
atomically (writeTVar todo True)
print "Flipped choice to True to avoid infinite loop"
-- Example similar to Simon's suggested example:
countRetries :: (MonadAdvSTM m, Enum a) => IORef a -> m a1 -> m a1
countRetries ioref action =
let incr = do old <- readIORef ioref
writeIORef ioref $! (succ old)
in action `orElseAdv` (retryWith incr)
-- Load this file in GHCI and execute main to run the test:
main = do
print "hello world"
putStrLn ""
counter <- newIORef 0
todo <- newTVarIO False
print "test"
result <- runAdvSTM (countRetries counter $ test todo)
retries <- readIORef counter
print ("result",result,"retries",retries)
atomically (writeTVar todo False)
putStrLn ""
print "testUnlift"
result <- runAdvSTM (countRetries counter $ testUnlift todo)
retries <- readIORef counter
print ("result",result,"retries",retries)
putStrLn ""
print "bye world"
Helper Thread code
This is my preferred solution at the moment.
This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights
{- November 24th, 2006
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
mailing list, in which the type and semantics of onCommit and
withRetry were put forth.
The semantics of printing the contents of the TVar "v" created in
test via retryWith may or may not be well defined. With GHC 6.6 I get
*AdvSTM> main
(ThreadId 415,"hello world")
(ThreadId 415,"")
(ThreadId 415,"test")
(ThreadId 416,"onRetry Start")
(ThreadId 416,("onRetry v",7))
(ThreadId 416,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",1))
(ThreadId 415,"")
(ThreadId 415,"testUnlift")
(ThreadId 417,"onRetry Start")
(ThreadId 417,("onRetry v",7))
(ThreadId 417,"Flipped choice to True to avoid infinite loop")
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",2))
(ThreadId 415,"")
(ThreadId 415,"testFork")
(ThreadId 418,"onRetry Start")
(ThreadId 418,"Flipped choice to True to avoid infinite loop")
(ThreadId 419,("onRetry v",7))
(ThreadId 415,"onCommit Start")
(ThreadId 415,("onCommit v",42))
(ThreadId 415,("result","foo","retries",3))
(ThreadId 415,"")
(ThreadId 415,"bye world")
Aside from that I think the unsafeIOToSTM is not really unsafe here
since it writes to privately created and maintained variables.
Since the implementation is hidden it could be changed from ReaderT
to some other scheme.
Once could also use MonadBase from
http://haskell.org/haskellwiki/New_monads/MonadBase to help with the
lifting, but this has been commented out below.
TODO: figure out semantics of catchAdv. At least it compiles...
-}
module AdvSTM(MonadAdvSTM(..),AdvSTM,retryWith,countRetries
,unlifter,unlift,unlift1,unlift2) where
-- import MonadBase
import Control.Exception(Exception,try)
import Control.Monad(MonadPlus(..),liftM,when)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent(forkIO,myThreadId)
import Control.Concurrent.Chan(Chan,newChan,readChan,writeChan)
import Control.Concurrent.MVar(MVar,newMVar,newEmptyMVar,isEmptyMVar,takeMVar,tryTakeMVar,putMVar)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,newTVar,readTVar,writeTVar)
import GHC.Conc(unsafeIOToSTM)
import Data.IORef(IORef,newIORef,readIORef,writeIORef,modifyIORef)
import Data.Typeable(Typeable)
class MonadAdvSTM m where
onCommit :: IO a -> m ()
onRetry :: IO a -> m ()
orElseAdv :: m a -> m a -> m a
retryAdv :: m a
atomicAdv :: m a -> IO a
catchAdv :: m a -> (Exception -> m a) -> m a
liftAdv :: STM a -> m a
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar,RetryVar)
type CommitVar = TVar (IO ()->IO ())
type RetryVar = MVar (IO ()->IO ())
{- Since lifting retry and `orElse` gives the semantics Simon wants, use deriving MonadPlus instead
instance MonadPlus AdvSTM where
mzero = retryAdv
mplus = orElseAdv
-}
-- instance MonadBase STM AdvSTM where liftBase = AdvSTM . lift
retryWith :: (Monad m, MonadAdvSTM m) => IO a -> m b
retryWith io = onRetry io >> retryAdv
instance MonadAdvSTM AdvSTM where
onCommit io = do
commitVar <- AdvSTM $ asks fst
commitFun <- liftAdv $ readTVar commitVar
liftAdv $ writeTVar commitVar (commitFun . (io >>))
onRetry io = do
retryVar <- AdvSTM $ asks snd
liftAdv . unsafeIOToSTM $ do
may'retryFun <- tryTakeMVar retryVar
let retryFun = maybe (io >>) (. (io >>)) may'retryFun
putMVar retryVar $! retryFun
orElseAdv = mplus
retryAdv = liftAdv retry -- the same as retryAdv = mzero
atomicAdv = runAdvSTM
catchAdv action handler = do
action' <- unlift action
handler' <- unlift1 handler
liftAdv $ catchSTM action' handler'
liftAdv = AdvSTM . lift
-- Helper thread
spawn'retry'thread nextJob atEnd = forkIO $ loop
where loop = do
may'job <- nextJob
case may'job of
Nothing -> atEnd
Just job -> try job >> loop
-- This replaces "atomically"
-- onRetry/retryWith actions are sent to a helper thread
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
commitVar <- newTVarIO id -- todo after a commit
retryVar <- newEmptyMVar -- filled if something todo upon a retry
chanVar <- newIORef Nothing -- send retry jobs to helper thread
endVar <- newEmptyMVar -- listen for helper thread to finish
let check'retry = do
unsafeIOToSTM $ do
may'todo <- tryTakeMVar retryVar
case may'todo of
Nothing -> return ()
Just retryFun -> do
may'chan <- readIORef chanVar
chan <- case may'chan of
Nothing -> do
chan <- newChan
writeIORef chanVar (Just chan)
spawn'retry'thread (readChan chan) (putMVar endVar ())
return chan
Just chan -> return chan
writeChan chan (Just (retryFun (return())))
retry
let wait'retry'finished = do
may'chan <- readIORef chanVar
case may'chan of
Nothing -> return ()
Just chan -> do
writeChan chan Nothing
takeMVar endVar
let wrappedAction = (runReaderT action (commitVar,retryVar))
`orElse` (check'retry)
result <- atomically $ wrappedAction
commitFun <- atomically (readTVar commitVar)
commitFun (return ())
wait'retry'finished
return result
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
env <- AdvSTM ask
return (runWith env)
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
u <- unlifter
return (u f)
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
u <- unlifter
return (\x -> u (f x))
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
u <- unlifter
return (\x y -> u (f x y))
printThread x = do
tid <- myThreadId
print (tid,x)
-- Example code using the above, lifting into MonadAdvSTM:
test ::(Monad m, MonadAdvSTM m) => TVar Bool -> m [Char]
test todo = do
onCommit (printThread "onCommit Start")
onRetry (printThread "onRetry Start")
v <- liftAdv $ newTVar 7
liftAdv $ writeTVar v 42
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
choice <- liftAdv $ readTVar todo
case choice of
True -> return "foo"
False -> retryWith $ do
atomically (writeTVar todo True)
printThread "Flipped choice to True to avoid infinite loop"
-- Same example as test, but unlifting from AdvSTM
testUnlift :: TVar Bool -> AdvSTM [Char]
testUnlift todo = do
onCommit <- unlift1 onCommit
onRetry <- unlift1 onRetry
retryWith <- unlift1 retryWith
liftAdv $ do
onCommit (printThread "onCommit Start")
onRetry (printThread "onRetry Start")
v <- newTVar 7
writeTVar v 42
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
onRetry (atomically (readTVar v) >>= \x->printThread ("onRetry v",x))
choice <- readTVar todo
case choice of
True -> return "foo"
False -> retryWith $ do
atomically (writeTVar todo True)
printThread "Flipped choice to True to avoid infinite loop"
-- Same example as testUnlift, but use forkIO inside onRetry
testFork :: TVar Bool -> AdvSTM [Char]
testFork todo = do
onCommit <- unlift1 onCommit
onRetry <- unlift1 onRetry
retryWith <- unlift1 retryWith
liftAdv $ do
onCommit (printThread "onCommit Start")
onRetry (printThread "onRetry Start")
v <- newTVar 7
writeTVar v 42
onCommit (atomically (readTVar v) >>= \x->printThread ("onCommit v",x))
onRetry (forkIO (atomically (readTVar v) >>= \x->printThread ("onRetry v",x)) >> return () )
choice <- readTVar todo
case choice of
True -> return "foo"
False -> retryWith $ do
atomically (writeTVar todo True)
printThread "Flipped choice to True to avoid infinite loop"
-- Example similar to Simon's suggested example:
countRetries :: (MonadAdvSTM m, Monad m, Enum a) => IORef a -> m a1 -> m a1
countRetries ioref action =
let incr = do old <- readIORef ioref
writeIORef ioref $! (succ old)
in action `orElseAdv` (retryWith incr)
-- Load this file in GHCI and execute main to run the test:
main = do
printThread "hello world"
printThread ""
counter <- newIORef 0
todo <- newTVarIO False
printThread "test"
result <- runAdvSTM (countRetries counter $ test todo)
retries <- readIORef counter
printThread ("result",result,"retries",retries)
atomically (writeTVar todo False)
printThread ""
printThread "testUnlift"
result <- runAdvSTM (countRetries counter $ testUnlift todo)
retries <- readIORef counter
printThread ("result",result,"retries",retries)
atomically (writeTVar todo False)
printThread ""
printThread "testFork"
result <- runAdvSTM (countRetries counter $ testFork todo)
retries <- readIORef counter
printThread ("result",result,"retries",retries)
atomically (writeTVar todo False)
printThread ""
printThread "bye world"
Just onCommit
Leaving out onRetry
/retryWith
makes it much simpler:
This is now under the usual permissive copyright for this wiki: HaskellWiki:Copyrights
{- November 24th, 2006
Demonstration Code by Chris Kuklewicz <haskell@list.mightyreason.com>
This is inspired by a post by Simon Peyton-Jones on the haskell-cafe
mailing list, in which the type and semantics of onCommit and were
put forth.
-}
module AdvSTM(MonadAdvSTM(..),AdvSTM,atomic
,unlifter,unlift,unlift1,unlift2) where
-- import MonadBase
import Control.Exception(Exception)
import Control.Monad(MonadPlus(..),join,liftM)
import Control.Monad.Reader(MonadReader(..),ReaderT,runReaderT,lift,asks)
import Control.Concurrent.STM(STM,orElse,retry,catchSTM,atomically)
import Control.Concurrent.STM.TVar(TVar,newTVarIO,readTVar,writeTVar)
import Data.Typeable(Typeable)
import Control.Concurrent(forkIO,killThread)
import Control.Monad(when)
class MonadAdvSTM m where
onCommit :: IO a -> m ()
orElseAdv :: m a -> m a -> m a
retryAdv :: m a
atomicAdv :: m a -> IO a
catchAdv :: m a -> (Exception -> m a) -> m a
liftAdv :: STM a -> m a
-- Export type but not constructor!
newtype AdvSTM a = AdvSTM (ReaderT Env STM a) deriving (Functor,Monad,MonadPlus,Typeable)
type Env = (CommitVar)
type CommitVar = TVar (IO ()->IO ())
instance MonadAdvSTM AdvSTM where
onCommit io = do
cv <- AdvSTM $ ask
old <- liftAdv $ readTVar cv
liftAdv $ writeTVar cv (old . (io >>))
orElseAdv = mplus
retryAdv = mzero
atomicAdv = runAdvSTM
catchAdv action handler = do
action' <- unlift action
handler' <- unlift1 handler
liftAdv $ catchSTM action' handler'
liftAdv = AdvSTM . lift
runAdvSTM :: AdvSTM a -> IO a
runAdvSTM (AdvSTM action) = do
cv <- newTVarIO id
let commit answer = do
cFun <- lift $ readTVar cv
return (cFun (return ()) >> return answer)
wrappedAction = (runReaderT (action >>= commit) cv)
join . atomically $ wrappedAction
atomic :: AdvSTM a -> IO a
atomic = atomicAdv
-- Using ReaderT we can write "unlift" from AdvSTM into STM:
-- Do not export runWith
runWith :: Env -> AdvSTM t -> STM t
runWith env (AdvSTM action) = runReaderT action env
unlifter :: AdvSTM (AdvSTM a -> STM a)
unlifter = do
env <- AdvSTM ask
return (runWith env)
unlift :: AdvSTM a -> AdvSTM (STM a)
unlift f = do
u <- unlifter
return (u f)
unlift1 :: (t -> AdvSTM a) -> AdvSTM (t -> STM a)
unlift1 f = do
u <- unlifter
return (\x -> u (f x))
unlift2 :: (t -> t1 -> AdvSTM a) -> AdvSTM (t -> t1 -> STM a)
unlift2 f = do
u <- unlifter
return (\x y -> u (f x y))
-- From here on this is example code
test = atomicAdv $ (unlift (onCommit (print "hello")))
test2 = atomicAdv $ do op <- unlift (onCommit (print "world"))
liftAdv op
test3 = do
v <- newTVarIO 10
atomic (onCommit (atomically $ writeTVar v 20))
atomic (liftAdv (readTVar v) >>= \x -> onCommit (print x) :: AdvSTM ())
-- Prints 10 9 8 7 6 5 4 3 2 1 0
test4 = do
v <- newTVarIO 10
let loop = atomic $ do
onC <- unlift1 onCommit
liftAdv $ do
x <- readTVar v
writeTVar v (pred x)
onC (print x)
if x>0 then onC loop
else retry
bump 0 = atomic $ do
x <- liftAdv $ readTVar v
when (x > 0) retryAdv
onCommit (print "bump at 0, done")
bump i = atomic $ do
x <- liftAdv $ readTVar v
if x <= 0 then do liftAdv $ writeTVar v 10
onCommit (print ("bump by 10",i))
onCommit (bump (pred i))
else retryAdv
tid <- forkIO (loop)
bump 5
killThread tid
main = do
op <- test -- no print
atomically op -- no print
test2 -- prints
test3 -- prints 20
test4