Difference between revisions of "SafeConcurrent"

From HaskellWiki
Jump to navigation Jump to search
(Add "fixed" SampleVar)
(Update to 0.5.0 with MSampleVar)
 
(3 intermediate revisions by 2 users not shown)
Line 3: Line 3:
 
= Motivation =
 
= Motivation =
   
The base package (version 3.0.3.1) code for Control.Concurrent.QSem and QSemN and SamepleVar is not exception safe. This page is for holding proposed replacement code.
+
The base package (versions 3 and 4) implementations of Control.Concurrent.QSem and QSemN (and perhaps SamepleVar) are not exception safe. The proposed replacement code is on hackage as [http://hackage.haskell.org/package/SafeSemaphore SafeSemaphore] and source from version 0.4.1 is also included on this page.
   
Specifically, both the wait and signal operations on a semaphore may block. These may then be interrupted by a killThread or other asynchronous exception. Exception safety means that this will never leave the semaphore in a broken state. Exception correctness means that the semaphore does not lose any of its quantity if the waiter is interrupted before the wait operation finished.
+
Exception correctness means that the semaphore does not lose any of its quantity if the waiter or signaler is interrupted before the operation finishes. QSem and QSemN violate this safetly.
   
MSem is the proposed replacements for QSem.
+
SafeSemaphore defines MSem as the proposed replacements for QSem, and MSemQ as the proposed replacement for QSemN.
   
  +
The SampleVar module in base also has the same kind of bug, but with SampleVar the rutnime error is worse because it can case writeSampleVar to block indefinitely. The SafeSemaphore package as of version 0.5.0 has a MSampleVar module that does not have this bug.
A replacement for QSemN is also below. It is merely a slightly improved version of the QSemN code in base.
 
   
  +
The GHC ticket is [http://hackage.haskell.org/trac/ghc/ticket/3160 #3160].
The SampleVar code is also not exception safe. The replacement has not yet been written.
 
   
= MSem =
+
= TestKillSem =
   
  +
The problem with QSem and QSemN is that a blocked waiter might be killed. This does not prevent a later signal from trying to pass quantity to a dead thread. This quantity is thus thrown out, a blatantly leaky abstraction. This is illustrated by the tests/TestKillSem.hs program in the SafeSemaphore package (run with cabal test, please read the log generated).
This code should be exception safe and exception correct. (And was derived from MSenN below).
 
   
  +
The program, preceded by its output as a comment is:
Note that it does not allocate any MVars to manage the waiting queue. Only newMSem allocates them. This should be more efficient than QSem.
 
   
 
<haskell>
 
<haskell>
  +
{- output demonstrating fate of thread 3:
{-# LANGUAGE DeriveDataTypeable #-}
 
-- |This modules is intended to replace "Control.Concurrent.QSem". Unlike QSem, this MSem module
 
-- should be exception safe and correct. This means that when signalMSem and waitQSem operations
 
-- receive an asynchronous exception such as killThread they will leave the MSem in a non-broken
 
-- state, and will not lose any quantity of the semaphore's value.
 
--
 
-- TODO : drop the MSem suffix from the operations.
 
--
 
-- Author : Chris Kuklewicz < haskell @at@ list .dot. mightyreason .dot. com >
 
-- Copyright : BSD3 2009
 
module MSem(MSem,newMSem,signalMSem,waitMSem,MSem'Exception) where
 
   
  +
Cases: 4 Tried: 0 Errors: 0 Failures: 0
  +
Test QSem
  +
0: forkIO wait thread 1
  +
0: stop thread 1
  +
1: wait interrupted
  +
0: signal q #1
  +
0: forkIO wait thread 2
  +
0: forkIO wait thread 3
  +
0: signal q #2
  +
2: wait done
  +
0: stop thread 2
  +
0: stop thread 3
  +
3: wait interrupted (QUANTITY LOST) FAIL
  +
### Failure in: 0
  +
  +
Cases: 4 Tried: 1 Errors: 0 Failures: 1
  +
Test QSemN
  +
0: forkIO wait thread 1
  +
0: stop thread 1
  +
1: wait interrupted
  +
0: signal q #1
  +
0: forkIO wait thread 2
  +
0: forkIO wait thread 3
  +
0: signal q #2
  +
2: wait done
  +
0: stop thread 2
  +
0: stop thread 3
  +
3: wait interrupted (QUANTITY LOST) FAIL
  +
### Failure in: 1
  +
  +
Cases: 4 Tried: 2 Errors: 0 Failures: 2
  +
Test MSem
  +
0: forkIO wait thread 1
  +
0: stop thread 1
  +
1: wait interrupted
  +
0: signal q #1
  +
0: forkIO wait thread 2
  +
2: wait done
  +
0: forkIO wait thread 3
  +
0: signal q #2
  +
3: wait done (QUANTITY CONSERVED) PASS
  +
0: stop thread 2
  +
0: stop thread 3
  +
Cases: 4 Tried: 3 Errors: 0 Failures: 2
  +
Test MSemN
  +
0: forkIO wait thread 1
  +
0: stop thread 1
  +
1: wait interrupted
  +
0: signal q #1
  +
0: forkIO wait thread 2
  +
2: wait done
  +
0: forkIO wait thread 3
  +
0: signal q #2
  +
3: wait done (QUANTITY CONSERVED) PASS
  +
0: stop thread 2
  +
0: stop thread 3
  +
Cases: 4 Tried: 4 Errors: 0 Failures: 2
  +
  +
-}
  +
module Main where
  +
  +
import Control.Concurrent
  +
import Control.Exception
  +
import Control.Concurrent.QSem
  +
import Control.Concurrent.QSemN
  +
import qualified Control.Concurrent.MSem as MSem
  +
import qualified Control.Concurrent.MSemN as MSemN
 
import Control.Concurrent.MVar
 
import Control.Concurrent.MVar
  +
import Test.HUnit
import Control.Exception(Exception,throwIO,block)
 
import Data.Maybe(fromMaybe)
+
import System.Exit
import Data.Typeable(Typeable)
 
   
  +
delay = threadDelay (1000*100)
newtype MSem = MSem (MVar M)
 
   
data M = M { avail :: Int
+
fork x = do m <- newEmptyMVar
, headWants :: Bool
+
t <- forkIO (finally x (putMVar m ()))
, headWait :: MVar ()
+
delay
, tailWait :: MVar () }
+
return (t,m)
   
  +
stop (t,m) = do killThread t
newtype MSem'Exception = MSem'Exception String deriving (Show,Typeable)
 
  +
delay
instance Exception MSem'Exception
 
  +
takeMVar m
   
  +
-- True if test passed, False if test failed
-- |'newSem' allows positive, zero, and negative initial values.
 
  +
testSem :: Integral n
newMSem initial = do
 
  +
=> String
newHeadWait <- newEmptyMVar
 
  +
-> (n -> IO a)
newTailWait <- newMVar ()
 
let m = M { avail = initial
+
-> (a->IO ())
, headWants = False
+
-> (a -> IO ())
, headWait = newHeadWait
+
-> IO Bool
  +
testSem name new wait signal = do
, tailWait = newTailWait }
 
  +
putStrLn ("\nTest "++ name)
sem <- newMVar m
 
  +
q <- new 0
return (MSem sem)
 
  +
putStrLn "0: forkIO wait thread 1"
  +
(t1,m1) <- fork $ do
  +
wait q `onException` (putStrLn "1: wait interrupted")
  +
putStrLn "1: wait done UNEXPECTED"
  +
putStrLn "0: stop thread 1"
  +
stop (t1,m1)
  +
putStrLn "0: signal q #1"
  +
signal q
  +
delay
  +
putStrLn "0: forkIO wait thread 2"
  +
(t2,m2) <- fork $ do
  +
wait q `onException` (putStrLn "2: wait interrupted UNEXPECTED")
  +
putStrLn "2: wait done"
  +
putStrLn "0: forkIO wait thread 3"
  +
result <- newEmptyMVar
  +
(t3,m3) <- fork $ do
  +
wait q `onException` (putStrLn "3: wait interrupted (QUANTITY LOST) FAIL" >> putMVar result False)
  +
putStrLn "3: wait done (QUANTITY CONSERVED) PASS"
  +
putMVar result True
  +
putStrLn "0: signal q #2"
  +
signal q
  +
delay
  +
putStrLn "0: stop thread 2"
  +
stop (t2,m2)
  +
putStrLn "0: stop thread 3"
  +
stop (t3,m3)
  +
takeMVar result
   
  +
testsQ = TestList . map test $
-- |Waiters block in FIFO order. This returns when it is the front waiter and the available value
 
  +
[ testSem "QSem" newQSem waitQSem signalQSem
-- is positive. If this throws an exception then no quantity of semaphore will be lost.
 
  +
, testSem "QSemN" newQSemN (flip waitQSemN 1) (flip signalQSemN 1)
waitMSem :: MSem -> IO ()
 
  +
]
waitMSem (MSem sem) = block $ do
 
-- sem throw?
 
advance <- withMVar sem $ \ m -> return (tailWait m)
 
-- advance throw?
 
withMVar advance $ \ _ -> do
 
-- sem throw? withMVar cleans advance
 
todo <- modifyMVar sem $ \ m -> do
 
-- clean up if previous waiter died
 
mStale <- tryTakeMVar (headWait m)
 
let avail' = avail m + maybe 0 (const 1) mStale
 
-- ensure the sem is in a sane state
 
if avail' >= 1
 
then do return (m { avail = avail' - 1, headWants = False }, Nothing)
 
else do return (m { avail = avail', headWants = True }, Just (headWait m))
 
case todo of
 
Nothing -> return ()
 
Just wait -> do
 
-- takeMVar throw? the headWants is still set to True, withMVar cleans advance
 
takeMVar wait
 
   
  +
testsM = TestList . map test $
-- |Add one to the semaphore, if the new value is greater than 0 then the first waiter is woken.
 
  +
[ testSem "MSem" MSem.new MSem.wait MSem.signal
-- This may momentarily block, and thus may throw an exception and leave then MSem unchanged.
 
  +
, testSem "MSemN" MSemN.new (flip MSemN.wait 1) (flip MSemN.signal 1)
signalMSem :: MSem -> IO ()
 
  +
]
signalMSem msem@(MSem sem) = block $ modifyMVar_ sem $ \ m -> do
 
  +
case headWants m of
 
  +
-- This is run by "cabal test"
False -> return (m { avail = avail m + 1 })
 
  +
main = do
True ->
 
  +
runTestTT testsQ
if avail m >= 0
 
  +
c <- runTestTT testsM
then do
 
  +
if failures c == 0 then exitSuccess else exitFailure
ok <- tryPutMVar (headWait m) ()
 
if ok then return (m { headWants = False })
 
else throwIO . MSem'Exception $
 
"MSem.signalMSem: impossible happened, the headWait MVar was full"
 
else return (m { avail = avail m + 1 })
 
 
</haskell>
 
</haskell>
   
  +
This shows that quantity can be easily lost when using a QSem or QSemN, and shows that MSem and MSemN do not have this problem.
== MSemN ==
 
   
  +
= MSem =
The MSemN has different semantics than QSemN. The first waiter in line is the only one being considered for waking.
 
  +
  +
This code should be exception safe and exception correct. The API for QSem is slightly extended to allow peekAvail to query the amount of content in the semaphore. The semantics of QSem are slightly extended to allow a new MSem to be initialized with negative, zero, or positive quantity. The use of Int has been replaced with Integer. The wait operation has been added to encourage safely bracketing wait and signal.
  +
  +
Note that it does not allocate any MVars to manage the waiting queue. Only MSem.new allocates them. This should be more efficient than QSem.
   
 
<haskell>
 
<haskell>
 
{-# LANGUAGE DeriveDataTypeable #-}
 
{-# LANGUAGE DeriveDataTypeable #-}
  +
-- |
-- |This modules is intended to replace "Control.Concurrent.QSemN". Unlike QSemN, this MSemN module
 
  +
-- Module : Control.Concurrent.MSem
-- should be exception safe and correct. This means that when signalMSemN and waitQSemN operations
 
  +
-- Copyright : (c) Chris Kuklewicz 2011
-- receive an asynchronous exception such as killThread they will leave the MSemN in a non-broken
 
  +
-- License : 3 clause BSD-style (see the file LICENSE)
-- state, and will not lose any quantity of the semaphore's value.
 
  +
--
  +
-- Maintainer : haskell@list.mightyreason.com
  +
-- Stability : experimental
  +
-- Portability : non-portable (concurrency)
 
--
 
--
  +
-- A semaphore in which operations may 'wait' for or 'signal' single units of value. This modules
-- TODO : drop the MSem suffix from the operations.
 
  +
-- is intended to improve on "Control.Concurrent.QSem".
  +
--
  +
-- This semaphore gracefully handles threads which die while blocked waiting. The fairness
  +
-- guarantee is that blocked threads are FIFO.
 
--
 
--
  +
-- If 'with' is used to guard a critical section then no quantity of the semaphore will be lost if
-- Author : Chris Kuklewicz < haskell @at@ list .dot. mightyreason .dot. com >
 
  +
-- the activity throws an exception. 'new' can initialize the semaphore to negative, zero, or
-- Copyright : BSD3 2009
 
  +
-- positive quantity. 'wait' always leaves the 'MSem' with non-negative quantity.
module MSemN(MSemN,newMSemN,signalMSemN,waitMSemN,MSemN'Exception) where
 
  +
module Control.Concurrent.MSem
  +
(MSem
  +
,new
  +
,with
  +
,wait
  +
,signal
  +
,peekAvail
  +
) where
   
import Control.Concurrent.MVar
+
import Control.Concurrent.MVar(MVar,withMVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,putMVar,takeMVar,tryTakeMVar,tryPutMVar)
import Control.Exception(Exception,throwIO,block)
+
import Control.Exception(bracket_,uninterruptibleMask_,evaluate,mask_)
import Data.Maybe(fromMaybe)
 
 
import Data.Typeable(Typeable)
 
import Data.Typeable(Typeable)
   
  +
{- design notes are in MSemN.hs -}
newtype MSemN = MSemN (MVar M)
 
   
  +
data MS = MS { avail :: !Integer -- ^ This is the quantity available to be taken from the semaphore. Often updated.
data M = M { avail :: Int
 
  +
, headWait :: MVar () -- ^ The head of the waiter queue blocks on headWait. Never updated.
, headWants :: Maybe Int
 
, headWait :: MVar Int
+
}
  +
deriving (Eq,Typeable)
, tailWait :: MVar () }
 
   
  +
-- | A 'MSem' is a semaphore in which the available quantity can be added and removed in single
newtype MSemN'Exception = MSemN'Exception String deriving (Show,Typeable)
 
  +
-- units, and which can start with positive, zero, or negative value.
instance Exception MSemN'Exception
 
  +
data MSem = MSem { mSem :: !(MVar MS) -- ^ Used to lock access to state of semaphore quantity. Never updated.
  +
, queueWait :: !(MVar ()) -- ^ Used as FIFO queue for waiter, held by head of queue. Never updated.
  +
}
  +
deriving (Eq,Typeable)
   
-- |'newSemN' allows positive, zero, and negative initial values.
+
-- |'new' allows positive, zero, and negative initial values. The initial value is forced here to
  +
-- better localize errors.
newMSemN initial = do
 
  +
new :: Integer -> IO MSem
  +
new initial = do
 
newHeadWait <- newEmptyMVar
 
newHeadWait <- newEmptyMVar
newTailWait <- newMVar ()
+
newQueueWait <- newMVar ()
let m = M { avail = initial
+
newMS <- newMVar $! (MS { avail = initial
, headWants = Nothing
+
, headWait = newHeadWait })
  +
return (MSem { mSem = newMS
, headWait = newHeadWait
 
, tailWait = newTailWait }
+
, queueWait = newQueueWait })
sem <- newMVar m
 
return (MSemN sem)
 
   
  +
-- | 'with' takes a unit of value from the semaphore to hold while performing the provided
-- |'waitMSemN' allow positive, zero, and negative wanted values. Waiters block in FIFO order.
 
-- This returns when it is the front waiter and the available value is not less than the wanted
+
-- operation. 'with' ensures the quantity of the sempahore cannot be lost if there are exceptions.
  +
--
-- value. If this throws an exception then no quantity of semaphore will be lost.
 
  +
-- 'with' uses 'bracket_' to ensure 'wait' and 'signal' get called correctly.
waitMSemN :: MSemN -> Int -> IO ()
 
  +
with :: MSem -> IO a -> IO a
waitMSemN (MSemN sem) wanted = block $ do
 
  +
with m = bracket_ (wait m) (signal m)
-- sem throw?
 
advance <- withMVar sem $ \ m -> return (tailWait m)
 
-- advance throw?
 
withMVar advance $ \ _ -> do
 
-- sem throw? withMVar cleans advance
 
todo <- modifyMVar sem $ \ m -> do
 
-- clean up if previous waiter died
 
mStale <- tryTakeMVar (headWait m)
 
let avail' = avail m + fromMaybe 0 mStale
 
-- ensure the sem is in a sane state
 
if avail' >= wanted
 
then do return (m { avail = avail' - wanted, headWants = Nothing }, Nothing)
 
else do return (m { avail = avail', headWants = Just wanted }, Just (headWait m))
 
case todo of
 
Nothing -> return ()
 
Just wait -> getWanted wait
 
where
 
getWanted wait = do
 
-- takeMVar throw? clean up with next waiter
 
given <- takeMVar wait
 
if given == wanted
 
then return ()
 
else throwIO . MSemN'Exception $
 
"MSemN.waitMSemN: impossible happened, (wanted,given) == "++ show (wanted,given)
 
   
  +
-- |'wait' will take one unit of value from the sempahore, but will block if the quantity available
-- |'signalMSemN' allows positive, zero, and negative size values. If the new total is greater than
 
  +
-- is not positive.
-- the value waited for then the first waiter is woken. This may momentarily block, and thus may
 
  +
--
-- throw an exception and leave then MSemN unchanged.
 
  +
-- If 'wait' returns without interruption then it left the 'MSem' with a remaining quantity that was
signalMSemN :: MSemN -> Int -> IO ()
 
  +
-- greater than or equal to zero. If 'wait' is interrupted then no quantity is lost. If 'wait'
signalMSemN _ 0 = return ()
 
  +
-- returns without interruption then it is known that each earlier waiter has definitely either been
signalMSemN msem@(MSemN sem) size = block $ modifyMVar_ sem $ \ m -> do
 
  +
-- interrupted or has retured without interruption.
case headWants m of
 
  +
wait :: MSem -> IO ()
Nothing -> return (m { avail = avail m + size })
 
  +
wait (MSem sem advance) = mask_ $ withMVar advance $ \ () -> do
Just wanted -> do
 
let avail' = avail m + size
+
todo <- mask_ $ modifyMVar sem $ \ m -> do
  +
mayGrab <- tryTakeMVar (headWait m)
if avail' >= wanted
 
then do
+
case mayGrab of
ok <- tryPutMVar (headWait m) wanted
+
Just () -> return (m,Nothing)
if ok then return (m { avail = avail' - wanted, headWants = Nothing })
+
Nothing -> if 1 <= avail m
else throwIO . MSemN'Exception $
+
then do
"MSemN.signalMSemN: impossible happened, the headWait MVar was full"
+
m' <- evaluate $ m { avail = avail m - 1 }
else return (m { avail = avail' })
+
return (m', Nothing)
  +
else do
  +
return (m, Just (headWait m))
  +
-- mask_ is needed above because we may have just decremented 'avail' and we must finished 'wait'
  +
-- without being interrupted so that a 'bracket' can ensure a matching 'signal' can be ensured.
  +
case todo of
  +
Nothing -> return ()
  +
Just hw -> takeMVar hw -- actually may or may not block, a 'signal' could have already arrived.
   
  +
-- | 'signal' adds one unit to the sempahore.
{-
 
  +
--
  +
-- 'signal' may block, but it cannot be interrupted, which allows it to dependably restore value to
  +
-- the 'MSem'. All 'signal', 'peekAvail', and the head waiter may momentarily block in a fair FIFO
  +
-- manner.
  +
signal :: MSem -> IO ()
  +
signal (MSem sem _) = uninterruptibleMask_ $ modifyMVar_ sem $ \ m -> do
  +
-- mask_ might be as good as uninterruptibleMask_ since nothing below can block
  +
if avail m < 0
  +
then evaluate m { avail = avail m + 1 }
  +
else do
  +
didPlace <- tryPutMVar (headWait m) ()
  +
if didPlace
  +
then return m
  +
else evaluate m { avail = avail m + 1 }
   
  +
-- | 'peekAvail' skips the queue of any blocked 'wait' threads, but may momentarily block on
-- |'queryMSemN' returns two value, the first is the available value in the semaphore. The second
 
-- value, if not Nothing, is Just the value wanted by the first blocked waiter. If the second value
+
-- 'signal', other 'peekAvail', and the head waiter. This returns the amount of value available to
  +
-- be taken. Using this value without producing unwanted race conditions is left up to the
-- is Nothing that does not imply there are no blocked waiters.
 
--
+
-- programmer.
  +
--
-- Warning: the first value may be momentarily wrong (and the second Nothing) if the previous waiter
 
  +
-- Note that "Control.Concurrent.MSemN" offers a more powerful API for making decisions based on the available amount.
-- died between being signaled and receiving its wanted value.
 
queryMSemN :: MSemN -> IO (Int,Maybe Int)
+
peekAvail :: MSem -> IO Integer
queryMSemN (MSemN sem) = withMVar sem $ \ m -> return (avail m, headWants m)
+
peekAvail (MSem sem _) = mask_ $ withMVar sem $ \ m -> do
  +
extraFlag <- tryTakeMVar (headWait m)
 
  +
case extraFlag of
-}
 
  +
Nothing -> return (avail m)
  +
Just () -> do putMVar (headWait m) () -- cannot block
  +
return (1 + avail m)
 
</haskell>
 
</haskell>
   
= QSemN =
+
= MSemN =
   
  +
The API for MSemN follows QSemN with several more complicated additions. All quantity arguments may be negative, zero, or positive. There are waitF, signalF, and withF operations that take a pure function to computes the quantity change based on the current quantity in the semaphore. And peekAvail was added to query the semaphore's quantity.
This is a slightly improved version of QSemN that should be exception safe. It is also nearly, but not quite exception correct. There is still a race between the dying waitQSemN and the signalQSemN.
 
   
 
<haskell>
 
<haskell>
  +
{-# LANGUAGE DeriveDataTypeable #-}
-----------------------------------------------------------------------------
 
-- |
+
-- |
-- Module : Control.Concurrent.QSemN
+
-- Module : Control.Concurrent.MSemN
-- Copyright : (c) The University of Glasgow 2001
+
-- Copyright : (c) Chris Kuklewicz 2011
-- License : BSD-style (see the file libraries/base/LICENSE)
+
-- License : 3 clause BSD-style (see the file LICENSE)
 
--
 
--
-- Maintainer : libraries@haskell.org
+
-- Maintainer : haskell@list.mightyreason.com
 
-- Stability : experimental
 
-- Stability : experimental
 
-- Portability : non-portable (concurrency)
 
-- Portability : non-portable (concurrency)
 
--
 
--
-- Quantity semaphores in which each thread may wait for an arbitrary
+
-- Quantity semaphores in which each thread may wait for an arbitrary amount. This modules is
  +
-- intended to improve on "Control.Concurrent.QSemN".
-- \"amount\". Modified by Chris Kuklewicz to make it exception safe.
 
  +
--
  +
-- This semaphore gracefully handles threads which die while blocked waiting for quantity. The
  +
-- fairness guarantee is that blocked threads are FIFO. An early thread waiting for a large
  +
-- quantity will prevent a later thread waiting for a small quantity from jumping the queue.
 
--
 
--
  +
-- If 'with' is used to guard a critical section then no quantity of the semaphore will be lost
-----------------------------------------------------------------------------
 
  +
-- if the activity throws an exception.
  +
--
  +
module Control.Concurrent.MSemN
  +
(MSemN
  +
,new
  +
,with
  +
,wait
  +
,signal
  +
,withF
  +
,waitF
  +
,signalF
  +
,peekAvail
  +
) where
   
  +
import Control.Concurrent.MVar(MVar,withMVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,putMVar,takeMVar,tryTakeMVar)
module Control.Concurrent.QSemN
 
  +
import Control.Exception(bracket,uninterruptibleMask_,onException,evaluate,mask_)
( -- * General Quantity Semaphores
 
  +
import Data.Typeable(Typeable)
QSemN, -- abstract
 
newQSemN, -- :: Int -> IO QSemN
 
waitQSemN, -- :: QSemN -> Int -> IO ()
 
signalQSemN -- :: QSemN -> Int -> IO ()
 
) where
 
   
  +
{-
import Prelude
 
   
  +
The only MVars allocated are the three created be 'new'. Their three roles are
import Control.Concurrent.MVar
 
  +
1) to have a FIFO queue of waiters
import Control.Exception(block,onException)
 
  +
2) for the head waiter to block on
import Data.Typeable
 
  +
3) to protect the quantity state of the semaphore and the head waiter
   
  +
subtle design notes:
#include "Typeable.h"
 
   
-- |A 'QSemN' is a quantity semaphore, in which the available
+
with, wait, and signal pattern match the quantity against 0 which has two effect: it avoids locking
  +
in the easy case and it ensures strict evaluation of the quantity before any locks are taken.
-- \"quantity\" may be signalled or waited for in arbitrary amounts.
 
newtype QSemN = QSemN (MVar (Int,[(Int,MVar ())]))
 
   
  +
Originally withF, waitF, and signal did not strictly evalaute the function they are passed before
INSTANCE_TYPEABLE0(QSemN,qSemNTc,"QSemN")
 
  +
locks are taken because there is no real point since the function may throw an error when computing
  +
the size. But then I realized forcing 'f' might run forever with the locks held and I could move
  +
this particular hang outside the locks by first evaluating 'f'.
   
  +
-}
-- |Build a new 'QSemN' with a supplied initial quantity.
 
newQSemN :: Int -> IO QSemN
 
newQSemN initial = do
 
sem <- newMVar (initial, [])
 
return (QSemN sem)
 
   
  +
-- MS has an invariant that "maybe True (> avail) headWants" is always True.
-- |Wait for the specified quantity to become available
 
  +
data MS = MS { avail :: !Integer -- ^ This is the quantity available to be taken from the semaphore. Often updated.
waitQSemN :: QSemN -> Int -> IO ()
 
  +
, headWants :: !(Maybe Integer) -- ^ If there is waiter then this is Just the amount being waited for. Often updated.
waitQSemN (QSemN sem) sz = block $ do
 
  +
, headWait :: MVar () -- ^ The head of the waiter queue blocks on headWait. Never updated.
todo <- modifyMVar sem $ \ (avail,blocked) -> do
 
if (avail - sz) >= 0
+
}
  +
deriving (Eq,Typeable)
then return ((avail-sz,blocked),Nothing)
 
else do
 
block <- newEmptyMVar
 
return ((avail, blocked++[(sz,block)]),Just block)
 
case todo of
 
Nothing -> return ()
 
Just block -> onException (takeMVar block) (tryPutMVar block ())
 
   
-- |Signal that a given quantity is now available from the 'QSemN'.
+
-- | A 'MSemN' is a quantity semaphore, in which the available quantity may be signalled or
  +
-- waited for in arbitrary amounts.
signalQSemN :: QSemN -> Int -> IO ()
 
  +
data MSemN = MSemN { mSem :: !(MVar MS) -- ^ Used to lock access to state of semaphore quantity. Never updated.
signalQSemN (QSemN sem) n = block $ modifyMVar_ sem $
 
  +
, queueWait :: !(MVar ()) -- ^ Used as FIFO queue for waiter, held by head of queue. Never updated.
\ (avail,blocked) -> free (avail+n) blocked
 
  +
}
where
 
  +
deriving (Eq,Typeable)
free avail [] = return (avail,[])
 
free avail ((req,block):blocked)
 
| avail >= req = do
 
ok <- tryPutMVar block ()
 
if ok then free (avail-req) blocked
 
else free avail blocked
 
| otherwise = do
 
(avail',blocked') <- free avail blocked
 
return (avail',(req,block):blocked')
 
</haskell>
 
   
  +
-- |'new' allows positive, zero, and negative initial values. The initial value is forced here to
= SampleVar =
 
  +
-- better localize errors.
  +
new :: Integer -> IO MSemN
  +
new initial = do
  +
newHeadWait <- newEmptyMVar
  +
newQueueWait <- newMVar ()
  +
newMS <- newMVar $! (MS { avail = initial
  +
, headWants = Nothing
  +
, headWait = newHeadWait })
  +
return (MSemN { mSem = newMS
  +
, queueWait = newQueueWait })
   
  +
-- | 'with' takes a quantity of the semaphore to take and hold while performing the provided
This keeps the documented behavior of SampleVar, but not all the detailed behavior.
 
  +
-- operation. 'with' ensures the quantity of the sempahore cannot be lost if there are exceptions.
  +
-- This uses 'bracket' to ensure 'wait' and 'signal' get called correctly.
  +
with :: MSemN -> Integer -> IO a -> IO a
  +
with _ 0 = id
  +
with m wanted = bracket (wait m wanted) (\() -> signal m wanted) . const
   
  +
-- | 'withF' takes a pure function and an operation. The pure function converts the available
<haskell>
 
  +
-- quantity to a pair of the wanted quantity and a returned value. The operation takes the result
-- |Proposed replacement for SampleVar that keeps most of the behavior and is now exception safe.
 
  +
-- of the pure function. 'withF' ensures the quantity of the sempahore cannot be lost if there
  +
-- are exceptions. This uses 'bracket' to ensure 'waitF' and 'signal' get called correctly.
 
--
 
--
  +
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- By Chris Kuklewicz
 
  +
-- evaluated.
module Control.Concurrent.SampleVar
 
  +
withF :: MSemN -> (Integer -> (Integer,b)) -> ((Integer,b) -> IO a) -> IO a
(
 
  +
withF m f = seq f $ bracket (waitF m f) (\(wanted,_) -> signal m wanted)
-- * Sample Variables
 
SampleVar, -- :: type _ =
 
 
newEmptySampleVar, -- :: IO (SampleVar a)
 
newSampleVar, -- :: a -> IO (SampleVar a)
 
emptySampleVar, -- :: SampleVar a -> IO ()
 
readSampleVar, -- :: SampleVar a -> IO a
 
tryReadSampleVar, -- :: SampleVar a -> IO a
 
writeSampleVar, -- :: SampleVar a -> a -> IO ()
 
isEmptySampleVar, -- :: SampleVar a -> IO Bool
 
   
  +
-- |'wait' allow positive, zero, and negative wanted values. Waiters may block, and will be handled
) where
 
  +
-- fairly in FIFO order.
  +
--
  +
-- If 'wait' returns without interruption then it left the 'MSemN' with a remaining quantity that was
  +
-- greater than or equal to zero. If 'wait' is interrupted then no quantity is lost. If 'wait'
  +
-- returns without interruption then it is known that each earlier waiter has definitely either been
  +
-- interrupted or has retured without interruption.
  +
wait :: MSemN -> Integer -> IO ()
  +
wait _ 0 = return ()
  +
wait m wanted = fmap snd $ waitF m (const (wanted,()))
   
  +
-- | 'waitWith' takes the 'MSemN' and a pure function that takes the available quantity and computes the
import Prelude
 
  +
-- amount wanted and a second value. The value wanted is stricly evaluated but the second value is
 
  +
-- returned lazily.
import Control.Concurrent.MVar
 
import Control.Exception(block)
 
 
-- |
 
-- Sample variables are slightly different from a normal 'MVar':
 
--
 
-- * Reading an empty 'SampleVar' causes the reader to block.
 
-- (same as 'takeMVar' on empty 'MVar')
 
--
 
-- * Reading a filled 'SampleVar' empties it and returns value.
 
-- (same as 'takeMVar')
 
 
--
 
--
  +
-- 'waitF' allow positive, zero, and negative wanted values. Waiters may block, and will be handled
-- * Try reading a filled 'SampleVar' returns a Maybe value.
 
  +
-- fairly in FIFO order.
-- (same as 'tryTakeMVar')
 
--
 
-- * Writing to an empty 'SampleVar' fills it with a value, and
 
-- potentially, wakes up a blocked reader (same as for 'putMVar' on
 
-- empty 'MVar').
 
 
--
 
--
  +
-- If 'waitF' returns without interruption then it left the 'MSemN' with a remaining quantity that was
-- * Writing to a filled 'SampleVar' overwrites the current value.
 
  +
-- greater than or equal to zero. If 'waitF' or the provided function are interrupted then no
-- (different from 'putMVar' on full 'MVar'.)
 
  +
-- quantity is lost. If 'waitF' returns without interruption then it is known that each previous
  +
-- waiter has each definitely either been interrupted or has retured without interruption.
  +
--
  +
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
  +
-- evaluated.
  +
waitF :: MSemN -> (Integer -> (Integer,b)) -> IO (Integer,b)
  +
waitF (MSemN sem advance) f = seq f $ mask_ $ withMVar advance $ \ () -> do
  +
(out@(wanted,_),todo) <- modifyMVar sem $ \ m -> do
  +
let outVal@(wantedVal,_) = f (avail m)
  +
-- assert that headDown is Nothing via new or signal or cleanup
  +
-- wantedVal gets forced by the (<=) condition here:
  +
if wantedVal <= avail m
  +
then do
  +
let avail'down = avail m - wantedVal
  +
m' <- evaluate $ m { avail = avail'down }
  +
return (m', (outVal,Nothing))
  +
else do
  +
m' <- evaluate $ m { headWants = Just wantedVal }
  +
return (m', (outVal,Just (headWait m)))
  +
-- mask_ is needed above because either (Just wantedVal) may be set here and this means we need to
  +
-- get the `onException` setup without being interrupted, or avail'down was set and we must finish
  +
-- 'waitF' without being interrupted so that a 'bracket' can ensure a matching 'signal' can
  +
-- protect the returned quantity.
  +
case todo of
  +
Nothing -> return ()
  +
Just hw -> do
  +
let cleanup = uninterruptibleMask_ $ modifyMVar_ sem $ \m -> do
  +
mStale <- tryTakeMVar (headWait m)
  +
let avail' = avail m + maybe 0 (const wanted) mStale
  +
evaluate $ m {avail = avail', headWants = Nothing}
  +
takeMVar hw `onException` cleanup -- may not block if a 'signal' has already arrived.
  +
return out
   
  +
-- |'signal' allows positive, zero, and negative values, thus this is also way to remove quantity
data SampleVar a = SampleVar { readQueue :: MVar ()
 
  +
-- that skips any threads in the 'wait'/'waitF' queue. If the new total is greater than the next
, lockedStore :: MVar (MVar a) }
 
  +
-- value being waited for (if present) then the first waiter is woken. If there are queued waiters
  +
-- then the next one will wake after a waiter has proceeded and notice the remaining value; thus a
  +
-- single 'signal' may result in several waiters obtaining values. Waking waiting threads is
  +
-- asynchronous.
  +
--
  +
-- 'signal' may block, but it cannot be interrupted, which allows it to dependably restore value to
  +
-- the 'MSemN'. All 'signal', 'signalF', 'peekAvail', and the head waiter may momentarily block in a
  +
-- fair FIFO manner.
  +
signal :: MSemN -> Integer -> IO ()
  +
signal _ 0 = return ()
  +
signal m size = uninterruptibleMask_ $ fmap snd $ signalF m (const (size,()))
   
  +
-- | Instead of providing a fixed change to the available quantity, 'signalF' applies a provided
-- |Build a new, empty, 'SampleVar'
 
  +
-- pure function to the available quantity to compute the change and a second value. The
newEmptySampleVar :: IO (SampleVar a)
 
  +
-- requested change is stricly evaluated but the second value is returned lazily. If the new total is
newEmptySampleVar = do
 
  +
-- greater than the next value being waited for then the first waiter is woken. If there are queued
newReadQueue <- newMVar ()
 
  +
-- waiters then the next one will wake after a waiter has proceeded and notice the remaining value;
newLockedStore <- newMVar =<< newEmptyMVar
 
  +
-- thus a single 'signalF' may result in several waiters obtaining values. Waking waiting threads
return (SampleVar { readQueue = newReadQueue
 
  +
-- is asynchronous.
, lockedStore = newLockedStore })
 
  +
--
 
  +
-- 'signalF' may block, and it can be safely interrupted. If the provided function throws an error
-- |Build a 'SampleVar' with an initial value.
 
  +
-- or is interrupted then it leaves the 'MSemN' unchanged. All 'signal', 'signalF', 'peekAvail', and
newSampleVar :: a -> IO (SampleVar a)
 
  +
-- the head waiter may momentarily block in a fair FIFO manner.
newSampleVar value = do
 
  +
--
newReadQueue <- newMVar ()
 
  +
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
newLockedStore <- newMVar =<< newMVar value
 
  +
-- evaluated.
return (SampleVar { readQueue = newReadQueue
 
  +
signalF :: MSemN -> (Integer -> (Integer,b)) -> IO (Integer,b)
, lockedStore = newLockedStore })
 
  +
signalF (MSemN sem _) f = seq f $ modifyMVar sem $ \ m -> do
 
  +
let out@(size,_) = f (avail m)
-- |If the SampleVar is full, leave it empty. Otherwise, do nothing. This jumps the FIFO queue of
 
  +
avail' <- evaluate $ avail m + size -- this forces 'size'
-- readers. This may momentarily block.
 
  +
case headWants m of
emptySampleVar :: SampleVar a -> IO ()
 
  +
Just wanted | wanted <= avail' -> do
emptySampleVar svar = block $ withMVar (lockedStore svar) $ \ store -> do
 
  +
let avail'down = avail' - wanted
_ <- tryTakeMVar store
 
  +
m' <- evaluate $ m { avail = avail'down, headWants = Nothing }
return ()
 
  +
putMVar (headWait m') () -- will always succeed without blocking
 
  +
return (m',out)
-- |Wait for a value to become available, then take it and return. This may block indefinately
 
  +
_ -> do
-- waiting for a value.
 
  +
m' <- evaluate $ m { avail = avail' }
readSampleVar :: SampleVar a -> IO a
 
  +
return (m',out)
readSampleVar svar = block $
 
withMVar (readQueue svar) $ \ _ -> do
 
todo <- withMVar (lockedStore svar) $ \ store -> do
 
maybeValue <- tryTakeMVar store
 
case maybeValue of
 
Nothing -> return (Left store)
 
Just value -> return (Right value)
 
-- postcondition for the withMVar (lockedStore svar) is that the store is empty.
 
case todo of
 
Left store -> takeMVar store
 
Right value -> return value -- block indefinately
 
 
-- |See if a value is immediately available, then take it and return. This may momentarily block.
 
-- This does not jump the FIFO reading queue.
 
tryReadSampleVar :: SampleVar a -> IO (Maybe a)
 
tryReadSampleVar svar = block $ do
 
maybeH <- tryTakeMVar (readQueue svar)
 
case maybeH of
 
Nothing -> return Nothing
 
Just h -> do
 
maybeVal <- withMVar (lockedStore svar) tryTakeMVar
 
putMVar (readQueue svar) h
 
return maybeVal
 
 
-- |Write a value into the 'SampleVar', overwriting any previous value that was there. A currently
 
-- blocked reader will find the new value, not the old value.
 
writeSampleVar :: SampleVar a -> a -> IO ()
 
writeSampleVar svar value = do
 
withMVar (lockedStore svar) $ \ store -> do
 
_ <- tryTakeMVar store
 
putMVar store value
 
-- postcondition for the withMVar (lockedStore svar) is that the store is full.
 
   
  +
-- | 'peekAvail' skips the queue of any blocked 'wait' and 'waitF' threads, but may momentarily
-- | Returns 'True' if the 'SampleVar' is currently empty.
 
  +
-- block on 'signal', 'signalF', other 'peekAvail', and the head waiter. This returns the amount of
  +
-- value available to be taken. Using this value without producing unwanted race conditions is left
  +
-- up to the programmer.
 
--
 
--
  +
-- 'peekAvail' is an optimized form of \"signalF m (\x -> (0,x))\".
-- Note that this function is only useful if you know that no other
 
-- threads can be modifying the state of the 'SampleVar', because
 
-- otherwise the state of the 'SampleVar' may have changed by the time
 
-- you see the result of 'isEmptySampleVar'.
 
 
--
 
--
  +
-- A version of 'peekAvail' that joins the FIFO queue of 'wait' and 'waitF' can be acheived by
-- This may momentarily block
 
  +
-- \"waitF m (\x -> (0,x))\"
isEmptySampleVar :: SampleVar a -> IO Bool
 
  +
peekAvail :: MSemN -> IO Integer
isEmptySampleVar svar = withMVar (lockedStore svar) isEmptyMVar
 
  +
peekAvail (MSemN sem _) = withMVar sem (return . avail)
 
</haskell>
 
</haskell>

Latest revision as of 13:06, 12 April 2011


Motivation

The base package (versions 3 and 4) implementations of Control.Concurrent.QSem and QSemN (and perhaps SamepleVar) are not exception safe. The proposed replacement code is on hackage as SafeSemaphore and source from version 0.4.1 is also included on this page.

Exception correctness means that the semaphore does not lose any of its quantity if the waiter or signaler is interrupted before the operation finishes. QSem and QSemN violate this safetly.

SafeSemaphore defines MSem as the proposed replacements for QSem, and MSemQ as the proposed replacement for QSemN.

The SampleVar module in base also has the same kind of bug, but with SampleVar the rutnime error is worse because it can case writeSampleVar to block indefinitely. The SafeSemaphore package as of version 0.5.0 has a MSampleVar module that does not have this bug.

The GHC ticket is #3160.

TestKillSem

The problem with QSem and QSemN is that a blocked waiter might be killed. This does not prevent a later signal from trying to pass quantity to a dead thread. This quantity is thus thrown out, a blatantly leaky abstraction. This is illustrated by the tests/TestKillSem.hs program in the SafeSemaphore package (run with cabal test, please read the log generated).

The program, preceded by its output as a comment is:

{- output demonstrating fate of thread 3:

Cases: 4  Tried: 0  Errors: 0  Failures: 0
Test QSem
0: forkIO wait thread 1
0: stop thread 1
1: wait interrupted
0: signal q #1
0: forkIO wait thread 2
0: forkIO wait thread 3
0: signal q #2
2: wait done
0: stop thread 2
0: stop thread 3
3: wait interrupted (QUANTITY LOST) FAIL
### Failure in: 0                         

Cases: 4  Tried: 1  Errors: 0  Failures: 1
Test QSemN
0: forkIO wait thread 1
0: stop thread 1
1: wait interrupted
0: signal q #1
0: forkIO wait thread 2
0: forkIO wait thread 3
0: signal q #2
2: wait done
0: stop thread 2
0: stop thread 3
3: wait interrupted (QUANTITY LOST) FAIL
### Failure in: 1                         

Cases: 4  Tried: 2  Errors: 0  Failures: 2
Test MSem
0: forkIO wait thread 1
0: stop thread 1
1: wait interrupted
0: signal q #1
0: forkIO wait thread 2
2: wait done
0: forkIO wait thread 3
0: signal q #2
3: wait done (QUANTITY CONSERVED) PASS
0: stop thread 2
0: stop thread 3
Cases: 4  Tried: 3  Errors: 0  Failures: 2
Test MSemN
0: forkIO wait thread 1
0: stop thread 1
1: wait interrupted
0: signal q #1
0: forkIO wait thread 2
2: wait done
0: forkIO wait thread 3
0: signal q #2
3: wait done (QUANTITY CONSERVED) PASS
0: stop thread 2
0: stop thread 3
Cases: 4  Tried: 4  Errors: 0  Failures: 2

-}
module Main where

import Control.Concurrent
import Control.Exception
import Control.Concurrent.QSem
import Control.Concurrent.QSemN
import qualified Control.Concurrent.MSem as MSem
import qualified Control.Concurrent.MSemN as MSemN
import Control.Concurrent.MVar
import Test.HUnit
import System.Exit

delay = threadDelay (1000*100)

fork x = do m <- newEmptyMVar
            t <- forkIO (finally x (putMVar m ()))
            delay
            return (t,m)

stop (t,m) = do killThread t
                delay
                takeMVar m

-- True if test passed, False if test failed
testSem :: Integral n 
        => String
        -> (n -> IO a) 
        -> (a->IO ()) 
        -> (a -> IO ()) 
        -> IO Bool
testSem name new wait signal = do
  putStrLn ("\nTest "++ name)
  q <- new 0
  putStrLn "0: forkIO wait thread 1"
  (t1,m1) <- fork $ do
    wait q `onException` (putStrLn "1: wait interrupted")
    putStrLn "1: wait done UNEXPECTED"
  putStrLn "0: stop thread 1"
  stop (t1,m1)
  putStrLn "0: signal q #1"
  signal q
  delay
  putStrLn "0: forkIO wait thread 2"
  (t2,m2) <- fork $ do
    wait q `onException` (putStrLn "2: wait interrupted UNEXPECTED")
    putStrLn "2: wait done"
  putStrLn "0: forkIO wait thread 3"
  result <- newEmptyMVar
  (t3,m3) <- fork $ do
    wait q `onException` (putStrLn "3: wait interrupted (QUANTITY LOST) FAIL" >> putMVar result False)
    putStrLn "3: wait done (QUANTITY CONSERVED) PASS"
    putMVar result True
  putStrLn "0: signal q #2"
  signal q
  delay
  putStrLn "0: stop thread 2"
  stop (t2,m2)
  putStrLn "0: stop thread 3"
  stop (t3,m3)
  takeMVar result

testsQ = TestList . map test $
  [ testSem "QSem" newQSem waitQSem signalQSem
  , testSem "QSemN" newQSemN (flip waitQSemN 1) (flip signalQSemN 1)
  ]

testsM = TestList . map test $
  [ testSem "MSem" MSem.new MSem.wait MSem.signal
  , testSem "MSemN" MSemN.new (flip MSemN.wait 1) (flip MSemN.signal 1)
  ]

-- This is run by "cabal test"
main = do
  runTestTT testsQ
  c <- runTestTT testsM
  if failures c == 0 then exitSuccess else exitFailure

This shows that quantity can be easily lost when using a QSem or QSemN, and shows that MSem and MSemN do not have this problem.

MSem

This code should be exception safe and exception correct. The API for QSem is slightly extended to allow peekAvail to query the amount of content in the semaphore. The semantics of QSem are slightly extended to allow a new MSem to be initialized with negative, zero, or positive quantity. The use of Int has been replaced with Integer. The wait operation has been added to encourage safely bracketing wait and signal.

Note that it does not allocate any MVars to manage the waiting queue. Only MSem.new allocates them. This should be more efficient than QSem.

{-# LANGUAGE DeriveDataTypeable #-}
-- | 
-- Module      :  Control.Concurrent.MSem
-- Copyright   :  (c) Chris Kuklewicz 2011
-- License     :  3 clause BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  haskell@list.mightyreason.com
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- A semaphore in which operations may 'wait' for or 'signal' single units of value.  This modules
-- is intended to improve on "Control.Concurrent.QSem".
-- 
-- This semaphore gracefully handles threads which die while blocked waiting.  The fairness
-- guarantee is that blocked threads are FIFO.
--
-- If 'with' is used to guard a critical section then no quantity of the semaphore will be lost if
-- the activity throws an exception. 'new' can initialize the semaphore to negative, zero, or
-- positive quantity. 'wait' always leaves the 'MSem' with non-negative quantity.
module Control.Concurrent.MSem
    (MSem
    ,new
    ,with
    ,wait
    ,signal
    ,peekAvail
    ) where

import Control.Concurrent.MVar(MVar,withMVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,putMVar,takeMVar,tryTakeMVar,tryPutMVar)
import Control.Exception(bracket_,uninterruptibleMask_,evaluate,mask_)
import Data.Typeable(Typeable)

{- design notes are in MSemN.hs -}

data MS = MS { avail :: !Integer     -- ^ This is the quantity available to be taken from the semaphore. Often updated.
             , headWait :: MVar ()   -- ^ The head of the waiter queue blocks on headWait. Never updated.
             }
  deriving (Eq,Typeable)

-- | A 'MSem' is a semaphore in which the available quantity can be added and removed in single
--  units, and which can start with positive, zero, or negative value.
data MSem = MSem { mSem :: !(MVar MS)      -- ^ Used to lock access to state of semaphore quantity. Never updated.
                 , queueWait :: !(MVar ()) -- ^ Used as FIFO queue for waiter, held by head of queue.  Never updated.
                 }
  deriving (Eq,Typeable)

-- |'new' allows positive, zero, and negative initial values.  The initial value is forced here to
-- better localize errors.
new :: Integer -> IO MSem
new initial = do
  newHeadWait <- newEmptyMVar
  newQueueWait <- newMVar ()
  newMS <- newMVar $! (MS { avail = initial
                          , headWait = newHeadWait })
  return (MSem { mSem = newMS
               , queueWait = newQueueWait })

-- | 'with' takes a unit of value from the semaphore to hold while performing the provided
-- operation.  'with' ensures the quantity of the sempahore cannot be lost if there are exceptions.
--
-- 'with' uses 'bracket_' to ensure 'wait' and 'signal' get called correctly.
with :: MSem -> IO a -> IO a
with m = bracket_ (wait m)  (signal m)

-- |'wait' will take one unit of value from the sempahore, but will block if the quantity available
-- is not positive.
--
-- If 'wait' returns without interruption then it left the 'MSem' with a remaining quantity that was
-- greater than or equal to zero.  If 'wait' is interrupted then no quantity is lost.  If 'wait'
-- returns without interruption then it is known that each earlier waiter has definitely either been
-- interrupted or has retured without interruption.
wait :: MSem -> IO ()
wait (MSem sem advance) = mask_ $ withMVar advance $ \ () -> do
  todo <- mask_ $ modifyMVar sem $ \ m -> do
    mayGrab <- tryTakeMVar (headWait m)
    case mayGrab of
      Just () -> return (m,Nothing)
      Nothing -> if 1 <= avail m
                   then do
                     m' <- evaluate $ m { avail = avail m - 1 }
                     return (m', Nothing)
                   else do
                     return (m, Just (headWait m))
  -- mask_ is needed above because we may have just decremented 'avail' and we must finished 'wait'
  -- without being interrupted so that a 'bracket' can ensure a matching 'signal' can be ensured.
  case todo of
    Nothing -> return ()
    Just hw -> takeMVar hw -- actually may or may not block, a 'signal' could have already arrived.

-- | 'signal' adds one unit to the sempahore.
--
-- 'signal' may block, but it cannot be interrupted, which allows it to dependably restore value to
-- the 'MSem'.  All 'signal', 'peekAvail', and the head waiter may momentarily block in a fair FIFO
-- manner.
signal :: MSem -> IO ()
signal (MSem sem _) = uninterruptibleMask_ $ modifyMVar_ sem $ \ m -> do
  -- mask_ might be as good as uninterruptibleMask_ since nothing below can block
  if avail m < 0
    then evaluate m { avail = avail m + 1 }
    else do
      didPlace <- tryPutMVar (headWait m) ()
      if didPlace
        then return m
        else evaluate m { avail = avail m + 1 }

-- | 'peekAvail' skips the queue of any blocked 'wait' threads, but may momentarily block on
-- 'signal', other 'peekAvail', and the head waiter. This returns the amount of value available to
-- be taken.  Using this value without producing unwanted race conditions is left up to the
-- programmer.
--
-- Note that "Control.Concurrent.MSemN" offers a more powerful API for making decisions based on the available amount.
peekAvail :: MSem -> IO Integer
peekAvail (MSem sem _) = mask_ $ withMVar sem $  \ m -> do
  extraFlag <- tryTakeMVar (headWait m)
  case extraFlag of
    Nothing -> return (avail m)
    Just () -> do putMVar (headWait m) () -- cannot block
                  return (1 + avail m)

MSemN

The API for MSemN follows QSemN with several more complicated additions. All quantity arguments may be negative, zero, or positive. There are waitF, signalF, and withF operations that take a pure function to computes the quantity change based on the current quantity in the semaphore. And peekAvail was added to query the semaphore's quantity.

{-# LANGUAGE DeriveDataTypeable #-}
-- | 
-- Module      :  Control.Concurrent.MSemN
-- Copyright   :  (c) Chris Kuklewicz 2011
-- License     :  3 clause BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  haskell@list.mightyreason.com
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary amount.  This modules is
-- intended to improve on "Control.Concurrent.QSemN".
-- 
-- This semaphore gracefully handles threads which die while blocked waiting for quantity.  The
-- fairness guarantee is that blocked threads are FIFO.  An early thread waiting for a large
-- quantity will prevent a later thread waiting for a small quantity from jumping the queue.
--
-- If 'with' is used to guard a critical section then no quantity of the semaphore will be lost
-- if the activity throws an exception.
--
module Control.Concurrent.MSemN
    (MSemN
    ,new
    ,with
    ,wait
    ,signal
    ,withF
    ,waitF
    ,signalF
    ,peekAvail
    ) where

import Control.Concurrent.MVar(MVar,withMVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,putMVar,takeMVar,tryTakeMVar)
import Control.Exception(bracket,uninterruptibleMask_,onException,evaluate,mask_)
import Data.Typeable(Typeable)

{- 

The only MVars allocated are the three created be 'new'.  Their three roles are
1) to have a FIFO queue of waiters
2) for the head waiter to block on
3) to protect the quantity state of the semaphore and the head waiter

subtle design notes:

with, wait, and signal pattern match the quantity against 0 which has two effect: it avoids locking
in the easy case and it ensures strict evaluation of the quantity before any locks are taken.

Originally withF, waitF, and signal did not strictly evalaute the function they are passed before
locks are taken because there is no real point since the function may throw an error when computing
the size.  But then I realized forcing 'f' might run forever with the locks held and I could move
this particular hang outside the locks by first evaluating 'f'.

-}

-- MS has an invariant that "maybe True (> avail) headWants" is always True.
data MS = MS { avail :: !Integer             -- ^ This is the quantity available to be taken from the semaphore. Often updated.
             , headWants :: !(Maybe Integer) -- ^ If there is waiter then this is Just the amount being waited for. Often updated.
             , headWait :: MVar ()           -- ^ The head of the waiter queue blocks on headWait. Never updated.
             }
  deriving (Eq,Typeable)

-- | A 'MSemN' is a quantity semaphore, in which the available quantity may be signalled or
-- waited for in arbitrary amounts.
data MSemN = MSemN { mSem :: !(MVar MS)      -- ^ Used to lock access to state of semaphore quantity. Never updated.
                   , queueWait :: !(MVar ()) -- ^ Used as FIFO queue for waiter, held by head of queue.  Never updated.
                   }
  deriving (Eq,Typeable)

-- |'new' allows positive, zero, and negative initial values.  The initial value is forced here to
-- better localize errors.
new :: Integer -> IO MSemN
new initial = do
  newHeadWait <- newEmptyMVar
  newQueueWait <- newMVar ()
  newMS <- newMVar $! (MS { avail = initial
                          , headWants = Nothing
                          , headWait = newHeadWait })
  return (MSemN { mSem = newMS
                , queueWait = newQueueWait })

-- | 'with' takes a quantity of the semaphore to take and hold while performing the provided
-- operation.  'with' ensures the quantity of the sempahore cannot be lost if there are exceptions.
-- This uses 'bracket' to ensure 'wait' and 'signal' get called correctly.
with :: MSemN -> Integer -> IO a -> IO a
with _ 0 = id
with m wanted = bracket (wait m wanted)  (\() -> signal m wanted) . const

-- | 'withF' takes a pure function and an operation.  The pure function converts the available
-- quantity to a pair of the wanted quantity and a returned value.  The operation takes the result
-- of the pure function.  'withF' ensures the quantity of the sempahore cannot be lost if there
-- are exceptions.  This uses 'bracket' to ensure 'waitF' and 'signal' get called correctly.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
withF :: MSemN -> (Integer -> (Integer,b)) -> ((Integer,b) -> IO a) -> IO a
withF m f = seq f $ bracket (waitF m f)  (\(wanted,_) -> signal m wanted)

-- |'wait' allow positive, zero, and negative wanted values.  Waiters may block, and will be handled
-- fairly in FIFO order.
--
-- If 'wait' returns without interruption then it left the 'MSemN' with a remaining quantity that was
-- greater than or equal to zero.  If 'wait' is interrupted then no quantity is lost.  If 'wait'
-- returns without interruption then it is known that each earlier waiter has definitely either been
-- interrupted or has retured without interruption.
wait :: MSemN -> Integer -> IO ()
wait _ 0 = return ()
wait m wanted = fmap snd $ waitF m (const (wanted,()))

-- | 'waitWith' takes the 'MSemN' and a pure function that takes the available quantity and computes the
-- amount wanted and a second value.  The value wanted is stricly evaluated but the second value is
-- returned lazily.
--
-- 'waitF' allow positive, zero, and negative wanted values.  Waiters may block, and will be handled
-- fairly in FIFO order.
--
-- If 'waitF' returns without interruption then it left the 'MSemN' with a remaining quantity that was
-- greater than or equal to zero.  If 'waitF' or the provided function are interrupted then no
-- quantity is lost.  If 'waitF' returns without interruption then it is known that each previous
-- waiter has each definitely either been interrupted or has retured without interruption.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
waitF :: MSemN -> (Integer -> (Integer,b)) -> IO (Integer,b)
waitF (MSemN sem advance) f = seq f $ mask_ $ withMVar advance $ \ () -> do
  (out@(wanted,_),todo) <- modifyMVar sem $ \ m -> do
    let outVal@(wantedVal,_) = f (avail m)
    -- assert that headDown is Nothing via new or signal or cleanup
    -- wantedVal gets forced by the (<=) condition here:
    if wantedVal <= avail m
      then do
        let avail'down = avail m - wantedVal
        m' <- evaluate $ m { avail = avail'down }
        return (m', (outVal,Nothing))
      else do
        m' <- evaluate $ m { headWants = Just wantedVal }
        return (m', (outVal,Just (headWait m)))
  -- mask_ is needed above because either (Just wantedVal) may be set here and this means we need to
  -- get the `onException` setup without being interrupted, or avail'down was set and we must finish
  -- 'waitF' without being interrupted so that a 'bracket' can ensure a matching 'signal' can
  -- protect the returned quantity.
  case todo of
    Nothing -> return ()
    Just hw -> do
      let cleanup = uninterruptibleMask_ $ modifyMVar_ sem $ \m -> do
            mStale <- tryTakeMVar (headWait  m)
            let avail' = avail m + maybe 0 (const wanted) mStale
            evaluate $ m {avail = avail', headWants = Nothing}
      takeMVar hw `onException` cleanup -- may not block if a 'signal' has already arrived.
  return out

-- |'signal' allows positive, zero, and negative values, thus this is also way to remove quantity
-- that skips any threads in the 'wait'/'waitF' queue.  If the new total is greater than the next
-- value being waited for (if present) then the first waiter is woken.  If there are queued waiters
-- then the next one will wake after a waiter has proceeded and notice the remaining value; thus a
-- single 'signal' may result in several waiters obtaining values.  Waking waiting threads is
-- asynchronous.
--
-- 'signal' may block, but it cannot be interrupted, which allows it to dependably restore value to
-- the 'MSemN'.  All 'signal', 'signalF', 'peekAvail', and the head waiter may momentarily block in a
-- fair FIFO manner.
signal :: MSemN -> Integer -> IO ()
signal _ 0 = return ()
signal m size = uninterruptibleMask_ $ fmap snd $ signalF m (const (size,()))

-- | Instead of providing a fixed change to the available quantity, 'signalF' applies a provided
-- pure function to the available quantity to compute the change and a second value.  The
-- requested change is stricly evaluated but the second value is returned lazily.  If the new total is
-- greater than the next value being waited for then the first waiter is woken.  If there are queued
-- waiters then the next one will wake after a waiter has proceeded and notice the remaining value;
-- thus a single 'signalF' may result in several waiters obtaining values.  Waking waiting threads
-- is asynchronous.
--
-- 'signalF' may block, and it can be safely interrupted.  If the provided function throws an error
-- or is interrupted then it leaves the 'MSemN' unchanged.  All 'signal', 'signalF', 'peekAvail', and
-- the head waiter may momentarily block in a fair FIFO manner.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
signalF :: MSemN -> (Integer -> (Integer,b)) -> IO (Integer,b)
signalF (MSemN sem _) f = seq f $ modifyMVar sem $ \ m -> do
  let out@(size,_) = f (avail m)
  avail' <- evaluate $ avail m + size -- this forces 'size'
  case headWants m of
    Just wanted | wanted <= avail' -> do
      let avail'down = avail' - wanted
      m' <- evaluate $ m { avail = avail'down, headWants = Nothing }
      putMVar (headWait m') () -- will always succeed without blocking
      return (m',out)
    _ -> do
      m' <- evaluate $ m { avail = avail' }
      return (m',out)

-- | 'peekAvail' skips the queue of any blocked 'wait' and 'waitF' threads, but may momentarily
-- block on 'signal', 'signalF', other 'peekAvail', and the head waiter. This returns the amount of
-- value available to be taken.  Using this value without producing unwanted race conditions is left
-- up to the programmer.
--
-- 'peekAvail' is an optimized form of \"signalF m (\x -> (0,x))\".
--
-- A version of 'peekAvail' that joins the FIFO queue of 'wait' and 'waitF' can be acheived by
-- \"waitF m (\x -> (0,x))\"
peekAvail :: MSemN -> IO Integer
peekAvail (MSemN sem _) = withMVar sem (return . avail)