Concurrent programming in GHC
This page contains notes and information about how to write concurrent programs in GHC.
Please feel free to add stuff here (Edit page link at the bottom).
- Basic concurrency: forkIO and MVars. Read Tackling the awkward squad: monadic input/output, concurrency, exceptions, and foreign-language calls in Haskell.
The original paper about Concurrent Haskell contains quite a few examples about how to write concurrent programs. A larger example is Writing High-Performance Server Applications in Haskell, Case Study: A Haskell Web Server
- Software Transactional Memory (STM) is a new way to coordinate concurrent threads. There's a separate Wiki page devoted to STM.
- STM was added to GHC 6.4, and is described in the paper Composable memory transactions. The paper Lock-free data structures using Software Transactional Memory in Haskell gives further examples of concurrent programming using STM.
- Foreign function interface. If you are calling foreign functions in a concurrent program, you need to know about bound threads. They are described in a Haskell workshop paper, Extending the Haskell Foreign Function Interface with Concurrency. The GHC Commentary Supporting multi-threaded interoperation contains more detailed explanation of cooperation between FFI calls and multi-threaded runtime.
- Nested Data Parallelism. For an approach to exploiting the implicit parallelism in array programs for multiprocessors, see Data Parallel Haskell (work in progress).
Using concurrency in GHC
- You get access to concurrency operations by importing the library Control.Concurrent.
- The GHC manual gives a few useful flags that control scheduling (not usually necessary) RTS options.
As of version 6.5, GHC supports running programs in parallel on an SMP or multi-core machine. How to do it:
- You'll need to get a version of GHC that supports SMP. Either download a nightly snapshot distribution, or get the sources from darcs and build it yourself.
- You need to link your program using the -threaded switch. (NOTE: previously it was necessary to compile all code, including libraries, with the -smp switch, this is no longer the case. The -smp flag is now a synonym for -threaded).
- Run the program with +RTS -N2 to use 2 threads, for example. You should use a -N value equal to the number of CPU cores on your machine (not including Hyper-threading cores).
- Concurrent threads (forkIO and forkOS) will run in parallel, and you can also use the par combinator and Strategies from the Control.Parallel.Strategies module to create parallelism.
- Use +RTS -sstderr for timing stats.
- Glasgow Parallel Haskell
- Glasgow Distributed Haskell
throwTo & block statements considered harmful
Title: "throwTo statements considered harmful"
This is a short essay to prove that the current GHC concurrency implementation has a critical flaw.
The key problem is, at least in the presence of block/unblock, that Exceptions are never reliably delivered. And this is not a theoretical case, but can cause a hang in something as innocuous as "program A" given below.
Simon Marlow wrote:
>> I think people are misunderstanding the nature of a safepoint. The >> safepoint is a point at which you are prepared to have exceptions >> delivered. This does not mean that they *will* be delivered, just that they >> can. If you need to *wait* for an asynchronous exception, then you >> shouldn't be using them at all. > > Right. If a thread mostly runs inside 'block' with the occasional safe > point, then your exceptions are not really asynchronous, they're synchronous. > > > In this case, I'd say a better solution is to have an explicit event queue, > and instead of the safe point take an event from the queue. The action on > receiving an event can be to raise an exception, if necessary. > > Cheers, Simon
The implementation of asynchronous signals, as described by the paper "Asynchronous exceptions in Haskell
Simon Marlow, Simon Peyton Jones, Andy Moran and John Reppy, PLDI'01."
is fatally inconsistent with the implementation in GHC 6.4 and GHC 6.6 today.
The implemented semantics have strictly weaker guarantees and render programs using asynchronous expressions impossible to write correctly. The semantics in the paper were carefully designed to solve the problem laid out in the first sentence of the abstract:
"Asynchronous exceptions, such as timeouts, are important for robust, modular programs, but are extremely difficult to program with -- so much so that most programming languages either heavily restrict them or ban them altogether."
And I believe the paper succeeded. The paper shows how to replace other languages pervasive and intrusive error catching and handling code with much cleaner, clearer, and often more correct code.
The implementation in GHC has changed the behavior of throwTo from asynchronous (not-interruptible) to synchronous (interruptible?) as discussed in section 8 of the paper. This change, in and of itself, is not the fatal problem; as described in the paper a (forkIO (throwTo ...)) recovers the asynchronous behavior.
The fatal change between the paper and GHC comes from not following section 7.2 as published. Section "7.2 Implementation of throwTo" has two bullet point, and the second bullet point is (retyped, so typos are my own fault):
"As soon as a thread exits the scope of a 'block', and at regular intervals during execution inside 'unblock', it should check its queue of pending exceptions. If the queue is non-empty, the first exception from the queue should be raised."
A test of GHC 6.6 shows that this is not the case. Test program A:
> loop = block (print "alive") >> loop main = do tid <- forkIO loop threadDelay > 1 killThread tid
Program A, compiled with (-threaded) on a single CPU machine never halts. It will print "alive" forever while the the main thread is blocked on "killThread".
This is wh
As an aside, removing the threadDelay causes killThread to destroy the child before it can enter the block, thus showing the need to add "forkBlockedIO" or "forkInheritIO" to the library. This can be worked around using an MVar.
Changing the definition of loop produces Test program B:
> loop = block (print "alive") >> yield >> loop main = do tid <- forkIO loop > threadDelay 1 killThread tid
This prints "alive" twice before the killThread succeeds.
The paper demands that when the loop in Program A exits the scope of "block (print a)" that it check a queue of pending exceptions, see that it is non-empty, and raise the exception thrown by killThread. This can also be seen in "Figure 5. Transition Rules for Asynchronous Exceptions", where killThread should use throwTo to create an in-flight exception and exiting the scope of block in the presence of this in-flight exception should raise the exception.
The implementation in GHC sleeps the main thread at the killThread command, and it is awoken when the block is exited and to succeed in delivering the exception it must execute while the child is still in the unblocked state. But the child re-enters a blocked state too quickly in Program A, so killThread never succeeds. The change in Program B has the child "yield" when unblocked and this gives the main thread a change to succeed.
This trick using yield to make a safepopint was suggested by Simon Marlow:
> The window in 'unblock (return ())' is tiny, I'm not really surprised if > nothing ever gets through it. You might have more luck with 'unblock yield'.
It has been said on this mailing list thread that needing "yield" to program concurrently is a bug in the user's code and should be replaced by other mechanisms. In a complex program with many threads even the yield trick may not be enough to program reliably. Using
blockY io = do x <- block io yield return x unblockY io = unblock (yield >> io)
instead of 'block' and 'unblock' is at least a simple but unreliable workaround.
This would not be a fatal flaw if there were a simple reliable workaround. The best workaround is the following, which is anything but simple, and amount to re-implementing the mechanisms in the paper:
* When forking a thread: ** Create an (Chan Exception) (or (TChan Exception)) and pass it to the child thread. Call this the queue. ** Create a ((T)MVar queue) to indicate whether the thread is still alive or not and wrap the whole child action in a 'finally' to ensure this is set to the dead state when the child exits. (Alive iff the queue is in the MVar) ** To be sure the child thread is in the handler, use another MVar or re-use the "alive/dead" MVar to ensure the child is running before returning the ThreadId from the fork operation.
* Replace all uses throwTo/killThread by ** Test the alive/dead state of the thread, proceeding only on living threads ** Putting the Exception in the queue ** Call throwTo/killThread with the Exception
* The child thread must: ** implement safepoints by checking the queue ** At each 'block' to 'unblock' transition the child ought to add a safepoint
This machinery closely recovers the queue of exceptions and the semantics described in the paper. When a thread has died the queue is removed from the alive/dead MVar and can be garbage collected, along with any pending exceptions for the dead thread.
To ensure the above semantics are not violated by the caller, the now fork operation should not return an exposed ThreadId, but should return some proxy type or operation to ensure only the proper throwto/killThread are peformed.
Next I will demonstrate how the paper's achievement of simple and clear error handling code is lost.
The child thread is where the workaround becomes a nightmare. It is impossible to test the current 'block' versus 'unblock' state. So the only time the child is certain that a 'block' to 'unblock' transition has occurred is when it is explicit:
> unblock ( .... block ( ... ) <*> .... )
The <*> is obviously such a transition and ought to have a safepoint added.
But is only because the block had the explicit unblock around it. If there were a function such as
> atomicUpdateFoo = block (...) <?>
then there is no way to know if the <?> should be a safepoint. Therefore atomicUpdateFoo needs documentation so that the caller knows to add a safepoint in the case it is called from the unblocked case.
In a more complex example:
> atomicUpdateFooBar = block (...) <1?> block (...) <2?>
the caller cannot add <1?> itself, and must therefore pass in code 'safe :: IO ()' to all such functions:
> atomicUpdateFooBar safe = block (...) >> trans >> block (...) >>
trans where 'trans' is "return ()" or "checkChan queue" depending on whether the the caller knows it is in a 'block' or 'unblock' state.
If the parent never needed to use 'throwTo'/'killThread' then the child never needs 'block'/'unblock' and the nightmare is avoided. But if the child uses any operation which can block such as 'accept' or 'getChar' or 'takeMVar' then the parent cannot deliver the signal without waking up the child with a 'throwTo' which quickly leads to either an overwhelming use of 'catch' in the child (like other languages) or use of 'block' and the fragile workaround I just described.
As an aside, if there was a "isBlockedThreadState" operation, then it would be possible to create a better client workaround:
checkChan queue x = do isBlocked <- isBlockedThreadState -- perhaps needing =<< myThreadId case isBlocked of True -> return x False -> raiseQueue queue x raiseQueue queue x = do hasException <- isEmptyChan queue case hasException of False -> return x True -> readChan >>= raise checkedBlock queue io = do x <- block io checkChan queue x checkedUnblock queue io = checkChan queue >> io
If that existed then the client should use checkedBlock and checkedUnblock instead of block and unblock. The net effect of this workaround is to implement the queue described in the original paper and wrap the forkIO/throwTo/block/unblock operations with new ones that implement the semantics described in the section 7.2 of the paper.
We can try to implement isBlockedThread state by maintaining a stack of block/unblock operations:
checkedBlock stack queue io = do x <- bracket_ (modifyIORef stack (True:)) (modifyIORef stack tail) (io) checkChan queue x checkedUnblock stack queue io = do checkChan queue () x <- bracket_ (modifyIORef stack (False:)) (modifyIORef stack tail) (io) isBlockedThread stack = liftM head (readIORef stack)
Where "stack :: IORef [Bool]" is created and passed to the child by the modified fork operation. The 'stack' above grows without bound during recursion and is not tail recursive, so the stack management should be changed to the algorithm in the paper... in which case one really has recreated the paper's implementation inside Haskell.
The only alternatives are to
* write code that may hang forever at a throwTo/killThread * never use throwTo or killThread making block/unblock irrelevant * never use 'block' (and 'unblock') * only write children with explicit "(unblock ... block (... block () ...) >>= raiseQueue queue >>= ...." where the different between checked and unchecked block is obvious and explicit. * Track and propagate the blocked state as in atomicUpdateFooBar.
This problem with throwTo/block was not obvious until I stated writing small programs to test the implementation in GHC and thus discovered that the paper's description had mislead me.
The key problem is, at least in the presence of block/unblock, that Exceptions are never reliably delivered. And this is not a theoretical case, but can cause a hang in something as innocuous as "program A" given above.
-- Chris Kuklewicz