Concurrency demos/Zeta

From HaskellWiki
Revision as of 10:58, 22 June 2021 by Atravers (talk | contribs) (Parallel strategies for initial "simple example")

A simple example of parallelism in Haskell

This little piece of code computes an approximation of Riemann's zeta function, balancing the work to be done between N threads.
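Assuming the usual series definition of the zeta function, the quantity being approximated can be written as the partial sum below. The symbol N (the boundary given on the command line) is notation introduced here for illustration.

```latex
\zeta(s) \approx \sum_{n=1}^{N} n^{-s}
```

In the program that follows, zetaRange produces the individual terms n^{-s} and cut decides which values of n go to which thread.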

import Control.Parallel.Strategies
import Control.Monad
import Data.Complex
import System.Environment

-- Return the list of the terms of the zeta function for the given range.
-- We don't sum the terms here but let the main thread sum the lists returned
-- by all the other threads so as to avoid accumulating rounding imprecisions.
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ]

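-- Split the inclusive range (x,y) into up to n consecutive sub-ranges. The
-- first sub-range also takes the remainder when the length is not a multiple
-- of n.
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x,y) n = (x, x + mine - 1) : cut' (x + mine) size (total - mine)
 where
  total            = y - x + 1   -- number of elements; also correct when x /= 1
  (size, modulo)   = total `divMod` n
  mine             = size + modulo

  cut' _ _ 0       = []
  cut' x' size' n' = (x', x' + size' - 1) : cut' (x' + size') size' (n' - size')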

getParams :: IO (Int, Int, Complex Double)
getParams = do
  argv <- getArgs
  case argv of
    [t, n, s]  -> return (read t, read n, read s)
    _          -> error "usage: zeta <nthreads> <boundary> <s>"

main :: IO ()
main = do
  (t, n, s) <- getParams
  let ranges    = cut (1, n) t
      -- Fully evaluate each range's list of terms, in parallel.
      results   = map (zetaRange s) ranges `using` parList rdeepseq
  putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
  print (sum (concat results))
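The cut function hands the whole remainder to the first range, so that range can be up to n - 1 terms longer than the others. A minimal sketch of a more even split follows. The name cutEven and its handling of degenerate inputs are assumptions made for illustration, not part of the original program.

```haskell
-- Hypothetical alternative to cut (not from the original page): split the
-- inclusive range (lo, hi) into at most k contiguous sub-ranges whose
-- lengths differ by at most one.
cutEven :: Integral a => (a, a) -> a -> [(a, a)]
cutEven (lo, hi) k
  | k <= 0 || hi < lo = []
  | otherwise         = go lo sizes
  where
    total  = hi - lo + 1
    (q, r) = total `divMod` k
    -- The first r sub-ranges take one extra element; empty ones are dropped.
    sizes  = filter (> 0) (copies r (q + 1) ++ copies (k - r) q)
    copies c v = replicate (fromIntegral c) v
    go _ []       = []
    go x (s : ss) = (x, x + s - 1) : go (x + s) ss
```

For example, cutEven (1, 10) 4 gives [(1,3),(4,6),(7,8),(9,10)], whereas cut (1, 10) 4 gives [(1,4),(5,6),(7,8),(9,10)]. With large boundaries the difference is small, so this mostly matters when the boundary is not much larger than the number of threads.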

With concurrent threads

Replace:

import Control.Parallel.Strategies

with:

import Control.Concurrent
import Control.Concurrent.MVar

Using mutex-variables (MVar)

  • Replace main with:
main :: IO ()
main = do
  (t, n, s) <- getParams
  childs    <- mapM (thread s) (cut (1, n) t)
  results   <- mapM takeMVar childs
  print (sum (concat results))
 where
  thread s range = do
    putStrLn ("Starting thread for range " ++ show range)
    mvar <- newEmptyMVar
    forkIO (do let zs = zetaRange s range
               -- Comparing zs with itself forces every term, so the work is
               -- done in this thread rather than later in the main thread.
               when (zs==zs) $ putMVar mvar zs)
    return mvar
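The zs==zs comparison works as a deep-evaluation trick, but it would never fill the MVar if a term compared unequal to itself (a NaN, for instance). A minimal sketch of the same idea using force from Control.DeepSeq together with evaluate from Control.Exception follows. It assumes the deepseq package is available, and forkForced is a hypothetical helper name, not something from the original program.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Hypothetical helper: fork a thread that evaluates x to normal form and
-- then stores it in a fresh MVar, which is returned immediately.
forkForced :: NFData a => a -> IO (MVar a)
forkForced x = do
  mvar <- newEmptyMVar
  -- evaluate (force x) runs inside the forked thread, so the deep
  -- evaluation happens there and not in the thread that calls takeMVar.
  _ <- forkIO (evaluate (force x) >>= putMVar mvar)
  return mvar
```

With this helper, the thread function above could be reduced to printing its message and calling forkForced (zetaRange s range); main would still collect the results with takeMVar.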

Using a channel (Chan)

  • Replace main with:
main :: IO ()
main = do
  (t, n, s) <- getParams
  chan      <- newChan
  terms     <- getChanContents chan

  forM_ (cut (1,n) t) $ thread chan s

  let wait xs i result
        | i >= t    = print result  -- Done.
        | otherwise = case xs of
           Nothing : rest -> wait rest (i + 1) result
           Just x  : rest -> wait rest i (result + x)
           _              -> error "missing thread termination marker"

  wait terms 0 0
 where
  thread chan s range = do
    putStrLn ("Starting thread for range " ++ show range)
    forkIO $ do
      mapM_ (writeChan chan . Just) (zetaRange s range)
      writeChan chan Nothing
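The termination-marker protocol above (each producer writes Just values and finishes with a single Nothing) can also be consumed with explicit readChan calls instead of the lazy list from getChanContents. The following is a sketch under that assumption. The names sumUntilDone and produce are hypothetical, and the accumulator is forced at each step, which the original wait loop does not do.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical consumer: add up Just values until the given number of
-- producers have each sent their Nothing marker.
sumUntilDone :: Num a => Int -> Chan (Maybe a) -> IO a
sumUntilDone producers chan = go producers 0
  where
    go k acc
      | k <= 0    = return acc
      | otherwise = do
          msg <- readChan chan
          case msg of
            Nothing -> go (k - 1) acc
            -- Force the running total so no chain of thunks builds up.
            Just x  -> let acc' = acc + x in acc' `seq` go k acc'

-- Hypothetical producer: write every element, then the end marker.
produce :: Chan (Maybe a) -> [a] -> IO ()
produce chan xs = do
  _ <- forkIO $ do
    mapM_ (writeChan chan . Just) xs
    writeChan chan Nothing
  return ()
```

As in the original, the terms are added in whatever order they arrive, so the floating-point result can differ slightly from run to run.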

Benchmarks

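Here's a simple script that runs one variant, given by name, with the work split four ways on 1, 2, and 3 OS threads. Run it once per variant. It expects each binary to be named z.<variant> and built with GHC's -threaded option, which the -N RTS flag requires (recent GHC versions also need -rtsopts to accept the flag).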

a="$1"
[ -z "$a" ] && { echo "Usage: $0 variant_name"; exit 1; }
for n in 1 2 3; do
    printf '%s %s  ' "$a" "$n"
    /usr/bin/time -f "%Uu %Ss %Ee %PCPU" "./z.$a" 4 500000 1:+1 +RTS -N"$n" > /dev/null
done
echo

Results on a dual Opteron system:

  • strat - using strategies:
 strat 1  8.82u 0.07s 0:08.93e 99%CPU
 strat 2  4.42u 0.06s 0:03.82e 117%CPU
 strat 3  5.01u 0.08s 0:04.46e 114%CPU

  • mvar - using mutex-variables:
 mvar 1  2.52u 0.06s 0:02.63e 98%CPU
 mvar 2  2.69u 0.05s 0:02.10e 130%CPU
 mvar 3  2.85u 0.07s 0:02.30e 126%CPU

  • chan - using channels:
 chan 1  11.75u 4.06s 0:15.91e 99%CPU
 chan 2  9.81u 0.05s 0:09.48e 104%CPU
 chan 3  10.96u 3.25s 0:12.24e 116%CPU