Personal tools

MapReduce as a monad

From HaskellWiki

(Difference between revisions)
Jump to: navigation, search
(Generalised Monad)
(New material about monad transformer)
Line 1: Line 1:
MapReduce is a general technique for massively parallel programming developed by Google.  It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach.
I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad.
The standard implementation of MapReduce is the JAVA-based HADOOP framework, which is very complex and somewhat temperamental.  Moreover, it is necessary to write HADOOP-specific code into mappers and reducers.  My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.
==Why a monad?==
What the monadic implementation lets us do is the following:
*Map and reduce look the same.
*You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
*All of the guts of MapReduce are hidden in the monad's <hask>bind</hask> function
*The implementation is naturally parallel
*Making a MapReduce program is trivial:<br/>
... >>= wrapMR mapper >>= wrapMR reducer >>= ...
Full details of the implementation and sample code can be found [ here].  I'll just give highlights here.
===Generalised mappers / reducers===
One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature<br/>
a -> ([(s,a)] -> [(s',b)])
where <hask>s</hask> and <hask>s'</hask> are data types and <hask>a</hask> and <hask>b</hask> are key values. 
===Generalised Monad===
Now, this is suggestive of a monad, but we can't use a monad ''per se'', because the transformation changes the key and value types, and we want to be able to access them separately.  Therefore we do the following. 
Let <hask>m</hask> be a <hask>Monad'</hask>, a type with four parameters: <hask>m s a s' b</hask>.
Generalise the monadic <hask>bind</hask> operation to:<br/>
m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
See [ Parametrized monads].
Then clearly the generalised mapper/reducer above can be written as a <hask>Monad'</hask>, meaning that we can write MapReduce as<br/>
... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...
===Implementation details===
class Monad' m where
        return :: a -> m s x s a
        (>>=)  :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])
bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
                fs = runMR f s
                gs = g $ nub $ snd <$> fs
        concat $ map (\g' -> runMR g' fs) gs)
The key point here is that <hask></hask> is a parallel version of the simple <hask>map</hask> function.
Now we can write a wrapper function<br/>
wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f = (\k -> MR (g k))
        g k ss = f $ fst <$> filter (\s -> k == snd s) ss
which takes a conventional mapper / reducer and wraps it in the <hask>Monad'</hask>.  Note that this means that the mapper / reducer functions ''do not need to know anything about the way MapReduce is implemented''.  So a standard MapReduce job becomes<br/>
mapReduce :: [String] -> [(String,Int)]
mapReduce state = runMapReduce mr state
        mr = return () >>= wrapMR mapper >>= wrapMR reducer
I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).
==Future Directions==
*My code so far runs concurrently and in multiple threads within a single OS image.  It won't work on clustered systems.  This is clearly where work should go next.
*Currently all of the data is sent to all of the mappers / reducers at each iteration.  This is okay on a single machine, but may be prohibitive on a cluster.
I would be eager for collaborative working on taking this forward.
[[User:Julianporter|julianporter]] 18:32, 2 April 2011 (UTC)

Revision as of 18:06, 31 October 2011