Difference between revisions of "Background thread example"

From HaskellWiki
Jump to navigation Jump to search
m
 
m (Fix for 'ambiguous type variable' failure in error handler)
 
(4 intermediate revisions by 3 users not shown)
Line 1: Line 1:
  +
By Chris Kuklewicz, public domain.
<haskell>
 
   
What is interesting to me about threads in Haskell is how easy it is
 
to send STM or IO actions and closures between threads.
 
   
 
What is interesting to me about threads in Haskell is how easy it is to send STM or IO actions and closures between threads.
> import Control.Monad
 
> import Control.Concurrent
 
> import Control.Exception as E
 
> import Control.Concurrent.STM
 
   
 
<haskell>
Work is the IO action to be sent to a background thread.
 
 
import Control.Monad
 
 
import Control.Concurrent
> type Work = IO ()
 
 
import Control.Exception as E
 
 
import Control.Concurrent.STM
SendWork is a function that takes a bit of work and sends it to a
 
  +
</haskell>
background thread.
 
 
Work is the IO action to be sent to a background thread.
 
  +
<haskell>
> type SendWork = Work -> STM ()
 
 
type Work = IO ()
 
  +
</haskell>
Spawn workers creates 'i' background threads as a pool to execute the
 
work that is sent via the returned function.
 
 
> spawnWorkers :: Int -> IO (SendWork,IO ())
 
> spawnWorkers i | i <= 0 = error "Need positive number of workers"
 
> | otherwise = do
 
 
Create the channel through which the jobs are sent, and a counter of
 
running threads.
 
 
> workChan <- atomically newTChan
 
> runCount <- atomically (newTVar i)
 
   
 
<hask>SendWork</hask>'s type is a function that takes a bit of work and sends it to a background thread.
  +
<haskell>
 
type SendWork = Work -> STM ()
  +
</haskell>
 
Spawn workers creates <hask>i</hask> background threads as a pool to execute the work that is sent via the returned function.
  +
<haskell>
 
spawnWorkers :: Int -> IO (SendWork,IO ())
 
spawnWorkers i | i <= 0 = error "Need positive number of workers"
 
| otherwise = do
  +
</haskell>
 
Create the channel through which the jobs are sent, and a counter of running threads.
  +
<haskell>
 
workChan <- atomically newTChan
 
runCount <- atomically (newTVar i)
  +
</haskell>
 
Define and start the threads. These choose to stop gracefully if the
 
Define and start the threads. These choose to stop gracefully if the
 
job throws an exception.
 
job throws an exception.
  +
<haskell>
 
> let stop = atomically (writeTVar runCount . pred =<< readTVar runCount)
+
let stop = atomically (writeTVar runCount . pred =<< readTVar runCount)
> die e = do id <- myThreadId
+
die e = do id <- myThreadId
> print ("Thread "++show id++" died with exception "++show e)
+
print ("Thread "++show id++" died with exception "++show (e :: ErrorCall))
> stop
+
stop
> work = do mJob <- atomically (readTChan workChan)
+
work = do mJob <- atomically (readTChan workChan)
> case mJob of Nothing -> stop
+
case mJob of Nothing -> stop
> Just job -> E.catch job die >> work
+
Just job -> E.catch job die >> work
> replicateM_ i (forkIO work)
+
replicateM_ i (forkIO work)
  +
</haskell>
 
 
Create a convenience command to stop the threads (which blocks)
 
Create a convenience command to stop the threads (which blocks)
  +
<haskell>
 
> let stopCommand = do atomically (replicateM_ i (writeTChan workChan Nothing))
+
let stopCommand = do atomically (replicateM_ i (writeTChan workChan Nothing))
> atomically (do running <- readTVar runCount
+
atomically (do running <- readTVar runCount
> when (running>0) retry)
+
when (running>0) retry)
  +
</haskell>
 
Send a closure to submit jobs; this hides the channel so you
+
Send a closure to submit jobs; this hides the channel so you can't read from it.
  +
<haskell>
can't read from it.
 
 
return (writeTChan workChan . Just,stopCommand)
 
  +
</haskell>
> return (writeTChan workChan . Just,stopCommand)
 
 
 
A toy command
 
A toy command
  +
<haskell>
 
> printJob i = do threadDelay (i*1000)
+
printJob i = do threadDelay (i*1000)
> id <- myThreadId
+
id <- myThreadId
> print ("printJob took "++show i++" ms in thread "++show id)
+
print ("printJob took "++show i++" ms in thread "++show id)
 
demo = do
 
 
(submit,stop) <- spawnWorkers 4
> demo = do
 
  +
mapM_ (atomically . submit . printJob) (take 40 (cycle [100,200,300,400]))
> (submit,stop) <- spawnWorkers 4
 
> mapM_ (atomically . submit . printJob) (take 40 (cycle [100,200,300,400]))
+
atomically $ submit (error "Boom")
 
stop
> atomically $ submit (error "Boom")
 
> stop
 
 
</haskell>
 
</haskell>
   
Line 110: Line 107:
 
"printJob took 300 ms in thread ThreadId 95"
 
"printJob took 300 ms in thread ThreadId 95"
 
</pre>
 
</pre>
  +
  +
[[Category:Code]]

Latest revision as of 13:20, 29 May 2009

By Chris Kuklewicz, public domain.


What is interesting to me about threads in Haskell is how easy it is to send STM or IO actions and closures between threads.

import Control.Monad
import Control.Concurrent
import Control.Exception as E
import Control.Concurrent.STM

Work is the IO action to be sent to a background thread.

type Work = IO ()

SendWork's type is a function that takes a bit of work and sends it to a background thread.

type SendWork = Work -> STM ()

Spawn workers creates i background threads as a pool to execute the work that is sent via the returned function.

spawnWorkers :: Int -> IO (SendWork,IO ())
spawnWorkers i | i <= 0 = error "Need positive number of workers"
               | otherwise = do

Create the channel through which the jobs are sent, and a counter of running threads.

  workChan <- atomically newTChan
  runCount <- atomically (newTVar i)
Define and start the threads.  These choose to stop gracefully if the
job throws an exception.
  let stop = atomically (writeTVar runCount . pred =<< readTVar runCount)
      die e = do id <- myThreadId
                 print ("Thread "++show id++" died with exception "++show (e :: ErrorCall))
                 stop
      work = do mJob <- atomically (readTChan workChan)
                case mJob of Nothing -> stop
                             Just job -> E.catch job die >> work
  replicateM_ i (forkIO work)
Create a convenience command to stop the threads (which blocks)
  let stopCommand = do atomically (replicateM_ i (writeTChan workChan Nothing))
                       atomically (do running <- readTVar runCount
                                      when (running>0) retry)

Send a closure to submit jobs; this hides the channel so you can't read from it.

  return (writeTChan workChan . Just,stopCommand)

A toy command

 printJob i = do threadDelay (i*1000)
                 id <- myThreadId
                 print ("printJob took "++show i++" ms in thread "++show id)
 demo = do
  (submit,stop) <- spawnWorkers 4
  mapM_ (atomically . submit . printJob) (take 40 (cycle [100,200,300,400]))
  atomically $ submit (error "Boom")
  stop

An example run:

"printJob took 100 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 400 ms in thread ThreadId 96"
"printJob took 100 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 200 ms in thread ThreadId 96"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 100 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 95"
"printJob took 200 ms in thread ThreadId 97"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 100 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 94"
"printJob took 400 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 100 ms in thread ThreadId 94"
"printJob took 400 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 100 ms in thread ThreadId 96"
"printJob took 300 ms in thread ThreadId 97"
"printJob took 200 ms in thread ThreadId 95"
"printJob took 400 ms in thread ThreadId 94"
"printJob took 100 ms in thread ThreadId 95"
"printJob took 300 ms in thread ThreadId 96"
"printJob took 200 ms in thread ThreadId 94"
"Thread ThreadId 94 died with exception Boom"
"printJob took 400 ms in thread ThreadId 97"
"printJob took 300 ms in thread ThreadId 95"