MapReduce as a monad

From HaskellWiki
Latest revision as of 03:27, 9 April 2021

Introduction

MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach. I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad. The standard implementation of MapReduce is the Java-based Hadoop framework, which is very complex and somewhat temperamental. Moreover, it is necessary to write Hadoop-specific code into mappers and reducers. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.

Having shown that we can implement MapReduce as a generalised monad, it transpires that we can generalise still further and define a MapReduceT monad transformer, so that there is a MapReduce type and operation associated with any monad. In particular, it turns out that the State monad is just the MapReduce type of the monad Hom a of maps h -> a, where h is some fixed type.

Initial Approach

Why a monad?

What the monadic implementation lets us do is the following:

  • Map and reduce look the same.
  • You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
  • All of the guts of MapReduce are hidden in the monad's bind function.
  • The implementation is naturally parallel.
  • Making a MapReduce program is trivial:

... >>= wrapMR mapper >>= wrapMR reducer >>= ...

Details

Full details of the implementation and sample code can be found here; I'll give just the highlights below.

Generalised mappers / reducers

One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature
a -> ([(s,a)] -> [(s',b)])
where s and s' are data types and a and b are key types.
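As a concrete illustration of a stage with this shape (my sketch; the name and instantiation of the type parameters are hypothetical, not from the library), here is a line-splitting mapper: given its key, it selects the records carrying that key and emits re-keyed output.

```haskell
import Data.Char (toLower)

-- Hypothetical example of a stage with the generalised signature
-- a -> ([(s,a)] -> [(s',b)]): here s = String (lines), a = Int (chunk key),
-- s' = String (words), b = String (each word becomes its own new key).
splitStage :: Int -> [(String,Int)] -> [(String,String)]
splitStage k records =
    [ (w, w)                           -- emit each word keyed by itself
    | (line, k') <- records, k' == k   -- keep only this stage's chunk
    , w <- words (map toLower line) ]
```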

Generalised Monad

Now, this is suggestive of a monad, but we can't use a monad per se, because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.

Let m be a Monad', a type with four parameters: m s a s' b.

Generalise the monadic bind operation to:
m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

See Parametrized monads.

Then clearly the generalised mapper/reducer above can be written as a Monad', meaning that we can write MapReduce as
... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...

Implementation details

class Monad' m where
        return :: a -> m s x s a
        (>>=)  :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

newtype MapReduce s a s' b = MR { runMR :: [(s,a)] -> [(s',b)] }

-- return: attach the given key to every datum, leaving the data unchanged
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])

-- bind: run the first stage, take the distinct keys of its output, and run
-- one copy of the second stage per key in parallel, concatenating the results
bindMR :: (Eq b, NFData s'', NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
        let
                fs = runMR f s
                gs = P.map g $ nub $ snd <$> fs
        in
        concat $ map (\g' -> runMR g' fs) gs)

The key point here is that P.map is a parallel version of the simple map function.
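The original definition of P.map is not shown; one minimal sketch (an assumption on my part, using only the spark primitives par and pseq from base) is:

```haskell
import GHC.Conc (par, pseq)

-- A minimal stand-in for P.map (assumed; the library's real definition may
-- differ, e.g. by forcing results to normal form, hence the NFData
-- constraints on bindMR): spark the head element's result in parallel
-- while recursing on the tail, then force the tail before consing.
pmap :: (a -> b) -> [a] -> [b]
pmap _ []     = []
pmap f (x:xs) = y `par` (ys `pseq` (y : ys))
  where
    y  = f x
    ys = pmap f xs
```

In practice one would more likely reach for parMap from the parallel package's Control.Parallel.Strategies, which implements the same idea with explicit evaluation strategies.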

Now we can write a wrapper function

wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f = (\k -> MR (g k))
        where
        g k ss = f $ fst <$> filter (\s -> k == snd s) ss

which takes a conventional mapper / reducer and wraps it in the Monad'. Note that this means that the mapper / reducer functions do not need to know anything about the way MapReduce is implemented. So a standard MapReduce job becomes

mapReduce :: [String] -> [(String,Int)]
mapReduce state = runMapReduce mr state
        where
        mr = return () >>= wrapMR mapper >>= wrapMR reducer

I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).
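Putting the pieces together, here is a self-contained sketch of the word-count job. Two parts are my assumptions, not the author's code: a sequential map stands in for P.map, and runMapReduce is guessed to key the initial input with ().

```haskell
import Data.List (nub)

newtype MapReduce s a s' b = MR { runMR :: [(s,a)] -> [(s',b)] }

retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [ (s,k) | s <- fst <$> ss ])

bindMR :: (Eq b) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
    let fs = runMR f s
        gs = map g $ nub $ snd <$> fs   -- sequential stand-in for P.map
    in concat $ map (\g' -> runMR g' fs) gs)

wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f k = MR (\ss -> f $ fst <$> filter (\s -> k == snd s) ss)

-- Guessed driver: key every input datum with () to start the pipeline.
runMapReduce :: MapReduce s () s' b -> [s] -> [(s',b)]
runMapReduce m state = runMR m [ (s,()) | s <- state ]

-- Hypothetical word-count stages in the shape wrapMR expects:
mapper :: [String] -> [(String,String)]
mapper ls = [ (w,w) | l <- ls, w <- words l ]

reducer :: [String] -> [(String,Int)]
reducer []       = []
reducer ws@(w:_) = [ (w, length ws) ]

wordCount :: [String] -> [(String,Int)]
wordCount = runMapReduce (retMR () `bindMR` wrapMR mapper `bindMR` wrapMR reducer)
```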

The monad transformer approach

Define the monad transformer type MapReduceT by:

newtype (Monad m) => MapReduceT m t u = MR {run :: m t -> m u}

with operations

lift :: (Monad m) => m t -> MapReduceT m t t
lift x = MR (const x)

return :: (Monad m) => t -> MapReduceT m t t
return x = lift (return x)

bind :: (Monad m) => MapReduceT m u u -> MapReduceT m t u -> (u -> MapReduceT m u v) -> MapReduceT m t v
bind p f g = MR (\ xs -> ps xs >>= gs xs)
        where
            ps xs = (f >>> p) -< xs
            gs xs x = (f >>> g x) -< xs

where >>> and -< are the obvious arrow operations on MapReduceT types.
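One plausible reading of those operations (my guess; the paper may define them differently) is plain composition and application of the wrapped functions:

```haskell
newtype MapReduceT m t u = MR { run :: m t -> m u }

-- Hypothetical definitions of the arrow-like operations used in bind:
-- (>>>) composes two transformations; (-<) applies one to an input.
(>>>) :: MapReduceT m t u -> MapReduceT m u v -> MapReduceT m t v
p >>> q = MR (run q . run p)

(-<) :: MapReduceT m t u -> m t -> m u
p -< xs = run p xs
```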

Then we show in this paper that:

  • MapReduce = MapReduceT [] with (>>=) = bind nub
  • For a suitable choice of p, the standard State monad is MapReduceT Hom, where
data Hom a b = H {run :: (a -> b)}

return x = H (const x)
f >>= g = H (\ x -> g' (f' x) x)
    where
        f' = run f
        g' x y = run (g x) y
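For context, Hom a with the return and bind above is (up to naming) just the Reader monad: return ignores the environment, and bind threads it through both computations. A self-contained version, with the field renamed to runHom to avoid clashing with run above:

```haskell
-- Hom a b wraps a function a -> b; the Monad instance below matches the
-- return/bind given in the text, i.e. the Reader monad over environment a.
newtype Hom a b = H { runHom :: a -> b }

instance Functor (Hom a) where
  fmap f (H g) = H (f . g)

instance Applicative (Hom a) where
  pure x = H (const x)
  H f <*> H g = H (\x -> f x (g x))

instance Monad (Hom a) where
  H f >>= g = H (\x -> runHom (g (f x)) x)
```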

Future Directions

  • My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. I have started work on this; see here.
  • Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.

I would be eager to collaborate on taking this forward.

julianporter 18:10, 31 October 2011 (UTC)