Background thread example

From HaskellWiki

By Chris Kuklewicz, public domain.


What is interesting to me about threads in Haskell is how easy it is to send STM or IO actions and closures between threads.

import Control.Monad
import Control.Concurrent
import Control.Exception as E
import Control.Concurrent.STM

Work is the IO action to be sent to a background thread.

type Work = IO ()

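SendWork is the type of a function that takes a piece of work and queues it for a background thread. Because the result is an STM action, a submission can be combined with other STM operations in a single transaction.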

type SendWork = Work -> STM ()
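Since a SendWork function returns an STM action, several submissions can be composed into one transaction. The following is a minimal sketch of that idea, not part of the original example: sendBatch is a hypothetical helper, and a bare TChan stands in for the worker pool.

```haskell
import Control.Concurrent.STM

type Work = IO ()
type SendWork = Work -> STM ()

-- Hypothetical helper: enqueue a whole batch in a single transaction,
-- so jobs from another producer cannot be interleaved within the batch.
sendBatch :: SendWork -> [Work] -> STM ()
sendBatch send = mapM_ send

main :: IO ()
main = do
  chan <- newTChanIO
  let send = writeTChan chan :: SendWork
  atomically (sendBatch send [putStrLn "first", putStrLn "second"])
  -- Run the queued actions in this thread, in the order they were sent.
  job1 <- atomically (readTChan chan)
  job2 <- atomically (readTChan chan)
  job1
  job2
```

Running main prints "first" and then "second", because a TChan delivers items in FIFO order.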

spawnWorkers creates a pool of i background threads to execute the work that is sent via the returned function. It also returns an action that shuts the pool down.

spawnWorkers :: Int -> IO (SendWork,IO ())
spawnWorkers i | i <= 0 = error "Need positive number of workers"
               | otherwise = do

Create the channel through which the jobs are sent, and a counter of running threads. Each channel item is a Maybe Work: Just carries a job, and Nothing tells a worker to stop.

  workChan <- atomically newTChan
  runCount <- atomically (newTVar i)
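Define and start the threads.  A worker stops gracefully if its job throws
an ErrorCall; any other exception type kills the worker without updating the counter.
  let stop = atomically (writeTVar runCount . pred =<< readTVar runCount)
      die e = do tid <- myThreadId
                 print ("Thread "++show tid++" died with exception "++show (e :: ErrorCall))
                 stop
      work = do mJob <- atomically (readTChan workChan)
                case mJob of
                  Nothing  -> stop
                  Just job -> do result <- E.try job
                                 case result of
                                   Left e   -> die e   -- report, decrement the counter, and end this worker
                                   Right () -> work    -- job succeeded: wait for the next one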
  replicateM_ i (forkIO work)
Create a convenience command to stop the threads. It sends one stop signal per worker, then blocks until the running count drops to zero.
  let stopCommand = do atomically (replicateM_ i (writeTChan workChan Nothing))
                       atomically (do running <- readTVar runCount
                                      when (running>0) retry)

Return a closure for submitting jobs, together with the stop command; the closure hides the channel, so callers can't read from it.

  return (writeTChan workChan . Just,stopCommand)
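The waiting half of the stop command relies on STM's retry: the transaction blocks until a TVar it has read changes, then runs again. A small self-contained sketch of that pattern follows, assuming a hypothetical helper named waitForZero and plain forkIO threads instead of the pool; check is the library's shorthand for retrying when a condition is False.

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

-- Block until the counter reaches zero. `check b` retries the transaction
-- when b is False, which mirrors `when (running>0) retry`.
waitForZero :: TVar Int -> IO ()
waitForZero counter = atomically (readTVar counter >>= \n -> check (n <= 0))

main :: IO ()
main = do
  counter <- newTVarIO 3
  done    <- newTVarIO (0 :: Int)
  replicateM_ 3 $ forkIO $ do
    threadDelay 10000                 -- simulate 10 ms of work
    atomically $ do
      modifyTVar' done (+1)           -- record that this thread finished
      modifyTVar' counter pred        -- and decrement the running count
  waitForZero counter
  finished <- readTVarIO done
  putStrLn ("threads finished: " ++ show finished)
```

This prints "threads finished: 3": each thread updates both variables in one transaction, so the counter cannot reach zero before all three have recorded completion.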

A toy command

printJob :: Int -> IO ()
printJob i = do threadDelay (i*1000)
                tid <- myThreadId
                print ("printJob took "++show i++" ms in thread "++show tid)

demo :: IO ()
demo = do
  (submit,stop) <- spawnWorkers 4
  mapM_ (atomically . submit . printJob) (take 40 (cycle [100,200,300,400]))
  atomically $ submit (error "Boom")
  stop
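Work is IO (), so a job has no return value. One way to get a result back is to let the job close over a variable that it fills in. The sketch below is an illustration under assumptions, not part of the original code: withResult is a hypothetical helper, and a single forkIO stands in for submitting the job to the pool.

```haskell
import Control.Concurrent
import Control.Concurrent.STM

type Work = IO ()

-- Hypothetical helper: wrap a computation as a Work item that stores its
-- result in a TMVar, and return an STM action that waits for that result.
withResult :: IO a -> IO (Work, STM a)
withResult action = do
  box <- newEmptyTMVarIO
  let job = action >>= atomically . putTMVar box
  return (job, readTMVar box)

main :: IO ()
main = do
  (job, waitResult) <- withResult (return (6 * 7 :: Int))
  _ <- forkIO job                 -- stand-in for `atomically (submit job)`
  answer <- atomically waitResult
  print answer
```

Running main prints 42. If the wrapped computation throws, the TMVar is never filled and the waiter blocks, so a fuller version would store an Either with the exception instead.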

An example run (thread ids and the interleaving of lines vary from run to run):

"printJob took 100 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 400 ms in thread ThreadId 96"
"printJob took 100 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 96"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 100 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 95"
"printJob took 200 ms in thread ThreadId 97"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 100 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 94"
"printJob took 400 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 400 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 100 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 94"
"Thread ThreadId 94 died with exception Boom"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 95"