# MapReduce as a monad

### From HaskellWiki

## Contents |

## 1 Introduction

MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach. I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad.

The standard implementation of MapReduce is the JAVA-based HADOOP framework, which is very complex and somewhat temperamental. Moreover, it is necessary to write HADOOP-specific code into mappers and reducers. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.

## 2 Why a monad?

What the monadic implementation lets us do is the following:

- Map and reduce look the same.
- You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
- All of the guts of MapReduce are hidden in the monad's functionbind
- The implementation is naturally parallel
- Making a MapReduce program is trivial:

## 3 Details

Full details of the implementation and sample code can be found here. I'll just give highlights here.

### 3.1 Generalised mappers / reducers

One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature

where

### 3.2 Generalised Monad

Now, this is suggestive of a monad, but we can't use a monad *per se*, because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.

Then clearly the generalised mapper/reducer above can be written as a

### 3.3 Implementation details

return :: a -> m s x s a

(>>=) :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }

retMR :: a -> MapReduce s x s a

retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])

bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c

bindMR f g = MR (\s ->

let

fs = runMR f s

gs = P.map g $ nub $ snd <$> fs

in

concat $ map (\g' -> runMR g' fs) gs)

The key point here is that

Now we can write a wrapper function

wrapMR f = (\k -> MR (g k))

where

g k ss = f $ fst <$> filter (\s -> k == snd s) ss

which takes a conventional mapper / reducer and wraps it in the

*do not need to know anything about the way MapReduce is implemented*. So a standard MapReduce job becomes

mapReduce state = runMapReduce mr state

where

mr = return () >>= wrapMR mapper >>= wrapMR reducer

I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).

## 4 Future Directions

- My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. This is clearly where work should go next.
- Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.

I would be eager for collaborative working on taking this forward.

julianporter 18:32, 2 April 2011 (UTC)