Concurrency demos/Zeta: Difference between revisions
No edit summary |
(Parallel strategies for initial "simple example") |
||
(16 intermediate revisions by 6 users not shown) | |||
Line 1: | Line 1: | ||
__TOC__ | __TOC__ | ||
[[Category:Code]] | |||
== A simple example of parallelism in Haskell == | == A simple example of parallelism in Haskell == | ||
Line 6: | Line 6: | ||
<haskell> | <haskell> | ||
import Control. | import Control.Parallel.Strategies | ||
import Control.Monad | import Control.Monad | ||
import Data.Complex | import Data.Complex | ||
import System.Environment | import System.Environment | ||
-- Return the list of the terms of the zeta function for the given range. | |||
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> a | -- We don't sum the terms here but let the main thread sum the lists returned | ||
zetaRange s (x,y) = | -- by all the other threads so as to avoid accumulating rounding imprecisions. | ||
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a] | |||
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ] | |||
cut :: (Integral a) => (a, a) -> a -> [(a, a)] | cut :: (Integral a) => (a, a) -> a -> [(a, a)] | ||
Line 35: | Line 36: | ||
main = do | main = do | ||
(t, n, s) <- getParams | (t, n, s) <- getParams | ||
childs <- | let ranges = cut (1, n) t | ||
results = map (zetaRange s) ranges `using` parList rnf | |||
putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ] | |||
print (sum (concat results)) | |||
</haskell> | |||
== With concurrent threads == | |||
Replace: | |||
<haskell> | |||
import Control.Parallel.Strategies | |||
</haskell> | |||
with: | |||
<haskell> | |||
import Control.Concurrent | |||
import Control.Concurrent.MVar | |||
</haskell> | |||
=== Using mutex-variables (<code>MVar</code>) === | |||
* Replace <code>main</code> with: | |||
:<haskell> | |||
main :: IO () | |||
main = do | |||
(t, n, s) <- getParams | |||
childs <- mapM (thread s) (cut (1, n) t) | |||
results <- mapM takeMVar childs | results <- mapM takeMVar childs | ||
print (sum results) | print (sum (concat results)) | ||
where | where | ||
thread s range = do | thread s range = do | ||
putStrLn ("Starting thread for range " ++ show range) | putStrLn ("Starting thread for range " ++ show range) | ||
mvar <- newEmptyMVar | mvar <- newEmptyMVar | ||
forkIO ( | forkIO (do let zs = zetaRange s range | ||
when (zs==zs) $ putMVar mvar zs) -- we need to deepSeq the list | |||
return mvar | return mvar | ||
</haskell> | </haskell> | ||
=== Using a channel (<code>Chan</code>) === | |||
* Replace <code>main</code> with: | |||
:<haskell> | |||
main :: IO () | |||
main = do | |||
(t, n, s) <- getParams | |||
chan <- newChan | |||
terms <- getChanContents chan | |||
forM_ (cut (1,n) t) $ thread chan s | |||
let wait xs i result | |||
| i >= t = print result -- Done. | |||
| otherwise = case xs of | |||
Nothing : rest -> wait rest (i + 1) result | |||
Just x : rest -> wait rest i (result + x) | |||
_ -> error "missing thread termination marker" | |||
wait terms 0 0 | |||
where | |||
thread chan s range = do | |||
putStrLn ("Starting thread for range " ++ show range) | |||
forkIO $ do | |||
mapM_ (writeChan chan . Just) (zetaRange s range) | |||
writeChan chan Nothing | |||
</haskell> | |||
== Benchmarks == | == Benchmarks == | ||
Here's a simple script for running all three variants, with four threads using 1, 2, and 3 OS threads. | |||
<pre> | |||
a="$1" | |||
[ -z "$a" ] && { echo Usage: "$0" variant_name; exit 1; } | |||
for n in 1 2 3; do | |||
echo -n $a $n ' '; | |||
/usr/bin/time -f "%Uu %Ss %Ee %PCPU" ./z.$a 4 500000 1:+1 +RTS -N$n > /dev/null; | |||
done; | |||
echo; | |||
</pre> | |||
Results on a dual Opteron system: | |||
* <code>strat</code> - using strategies: | |||
:{| | |||
|<pre> | |||
strat 1 8.82u 0.07s 0:08.93e 99%CPU | |||
strat 2 4.42u 0.06s 0:03.82e 117%CPU | |||
strat 3 5.01u 0.08s 0:04.46e 114%CPU | |||
</pre> | |||
|} | |||
* <code>mvar</code> - using mutex-variables: | |||
:{| | |||
|<pre> | |||
mvar 1 2.52u 0.06s 0:02.63e 98%CPU | |||
mvar 2 2.69u 0.05s 0:02.10e 130%CPU | |||
mvar 3 2.85u 0.07s 0:02.30e 126%CPU | |||
</pre> | |||
|} | |||
* <code>chan</code> - using channels: | |||
:{| | |||
|<pre> | |||
chan 1 11.75u 4.06s 0:15.91e 99%CPU | |||
chan 2 9.81u 0.05s 0:09.48e 104%CPU | |||
chan 3 10.96u 3.25s 0:12.24e 116%CPU | |||
</pre> | |||
|} |
Latest revision as of 10:58, 22 June 2021
A simple example of parallelism in Haskell
This little piece of code computes an approximation of Riemann's zeta function, balancing the work to be done between N threads.
import Control.Parallel.Strategies
import Control.Monad
import Data.Complex
import System.Environment
-- | Terms @n ** (-s)@ of the zeta series for every @n@ in the inclusive
-- range @(x, y)@, in increasing order of @n@.
--
-- The terms are deliberately returned unsummed: the caller adds up the lists
-- produced for all ranges, which (per the original author) avoids
-- accumulating rounding imprecisions in per-range partial sums.
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x, y) = map term [x .. y]
  where
    -- One term of the series; an empty range yields an empty list.
    term n = fromIntegral n ** negate s
-- | Split the inclusive range @(x, y)@ into contiguous inclusive sub-ranges
-- for @n@ workers.
--
-- The first sub-range receives the remainder of the division, so it is the
-- largest one; all following sub-ranges have the same size. When the range
-- holds fewer than @n@ elements a single sub-range covering everything is
-- returned, so the result may contain fewer than @n@ pieces.
--
-- The piece size is derived from the number of elements @y - x + 1@ rather
-- than from @y@ alone, so ranges that do not start at 1 are split correctly
-- (for @x == 1@ both coincide). A non-positive @n@ is rejected with an
-- error instead of dividing by zero or producing an infinite list.
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x, y) n
  | n <= 0 = error "cut: number of pieces must be positive"
  | otherwise = (x, x + mine - 1) : rest (x + mine) (total - mine)
  where
    total = y - x + 1
    (size, modulo) = total `divMod` n
    -- Size of the first piece: a regular piece plus the remainder.
    mine = size + modulo
    -- The remaining count is always a multiple of 'size', so it reaches 0.
    rest _ 0 = []
    rest lo remaining = (lo, lo + size - 1) : rest (lo + size) (remaining - size)
-- | Read the three command-line arguments: number of threads, upper
-- boundary of the summation range, and the (complex) argument @s@.
--
-- All three arguments are parsed eagerly. With a lazy 'read', a malformed
-- argument would only fail when the value is first forced, which may be
-- far from here (for instance inside a forked worker). Both a wrong
-- argument count and an unparsable argument raise the usage error.
getParams :: IO (Int, Int, Complex Double)
getParams = do
  argv <- getArgs
  case argv of
    [t, n, s]
      | Just params <- (,,) <$> parse t <*> parse n <*> parse s -> return params
    _ -> error "usage: zeta <nthreads> <boundary> <s>"
  where
    -- Total counterpart of 'read': exactly one parse, only whitespace left.
    parse :: Read a => String -> Maybe a
    parse str = case [v | (v, rest) <- reads str, ("", "") <- lex rest] of
      [v] -> Just v
      _ -> Nothing
-- | Entry point for the strategies variant: split @[1..n]@ into ranges,
-- evaluate the term list of every range in parallel, then sum everything
-- in the main thread.
main :: IO ()
main = do
  (t, n, s) <- getParams
  let ranges = cut (1, n) t
      -- 'rdeepseq' is the fully-evaluating strategy of the current
      -- Control.Parallel.Strategies API; the older 'rnf' strategy no longer
      -- has the 'Strategy' type expected by 'parList'.
      results = map (zetaRange s) ranges `using` parList rdeepseq
  putStr $ unlines ["Starting thread for range " ++ show r | r <- ranges]
  print (sum (concat results))
With concurrent threads
Replace:
import Control.Parallel.Strategies
with:
import Control.Concurrent
import Control.Concurrent.MVar
Using mutex-variables (MVar)
- Replace main with:
-- | Entry point for the 'MVar' variant: one thread per range, each handing
-- its list of terms back to the main thread through its own 'MVar'.
main :: IO ()
main = do
  (t, n, s) <- getParams
  children <- mapM (thread s) (cut (1, n) t)
  -- 'takeMVar' blocks until the corresponding worker has delivered its list.
  results <- mapM takeMVar children
  print (sum (concat results))
  where
    -- Start a worker for one range and return the 'MVar' it will fill.
    thread s range = do
      putStrLn ("Starting thread for range " ++ show range)
      mvar <- newEmptyMVar
      _ <- forkIO $ do
        let zs = zetaRange s range
        -- Force every term inside the worker so the work is not deferred to
        -- the main thread. This replaces the @zs == zs@ trick, which is
        -- False as soon as one term is NaN and would then never fill the
        -- 'MVar', leaving the main thread blocked.
        mapM_ (return $!) zs
        putMVar mvar zs
      return mvar
Using a channel (Chan)
- Replace main with:
-- | Entry point for the 'Chan' variant: every worker streams its terms as
-- @Just x@ into one shared channel and finishes with a @Nothing@ marker;
-- the main thread sums terms until all workers have signalled completion.
main :: IO ()
main = do
  (t, n, s) <- getParams
  chan <- newChan
  terms <- getChanContents chan
  let ranges = cut (1, n) t
      -- Wait for as many markers as workers were actually started: 'cut'
      -- can return fewer than @t@ ranges, and waiting for @t@ markers
      -- would then block forever.
      workers = length ranges
  forM_ ranges $ thread chan s
  let wait xs finished result
        | finished >= workers = print result -- Done.
        | otherwise = case xs of
            Nothing : rest -> wait rest (finished + 1) result
            -- ($!) keeps the running sum evaluated instead of building a
            -- chain of unevaluated additions.
            Just x : rest -> wait rest finished $! result + x
            _ -> error "missing thread termination marker"
  wait terms 0 0
  where
    -- Start a worker that writes all terms of one range, then the marker.
    thread chan s range = do
      putStrLn ("Starting thread for range " ++ show range)
      forkIO $ do
        mapM_ (writeChan chan . Just) (zetaRange s range)
        writeChan chan Nothing
Benchmarks
Here's a simple script for running all three variants, with four threads using 1, 2, and 3 OS threads.
# Benchmark one variant of the program: runs ./z.<variant_name> with four
# threads over 500000 terms, once for each RTS setting -N1, -N2 and -N3,
# and prints the timing reported by /usr/bin/time next to the variant name.
a="$1"
[ -z "$a" ] && { echo Usage: "$0" variant_name; exit 1; }
for n in 1 2 3; do
    # printf instead of `echo -n`, and quoted expansions, so the label and
    # the binary path survive unusual variant names unchanged.
    printf '%s %s  ' "$a" "$n"
    # The program's own output is discarded; only the timing line is kept.
    /usr/bin/time -f "%Uu %Ss %Ee %PCPU" "./z.$a" 4 500000 1:+1 +RTS "-N$n" > /dev/null
done
echo
Results on a dual Opteron system:
- strat - using strategies:
strat 1 8.82u 0.07s 0:08.93e 99%CPU strat 2 4.42u 0.06s 0:03.82e 117%CPU strat 3 5.01u 0.08s 0:04.46e 114%CPU
- mvar - using mutex-variables:
mvar 1 2.52u 0.06s 0:02.63e 98%CPU mvar 2 2.69u 0.05s 0:02.10e 130%CPU mvar 3 2.85u 0.07s 0:02.30e 126%CPU
- chan - using channels:
chan 1 11.75u 4.06s 0:15.91e 99%CPU chan 2 9.81u 0.05s 0:09.48e 104%CPU chan 3 10.96u 3.25s 0:12.24e 116%CPU