# MapReduce as a monad

## Introduction

MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach. I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad. The standard implementation of MapReduce is the JAVA-based HADOOP framework, which is very complex and somewhat temperamental. Moreover, it is necessary to write HADOOP-specific code into mappers and reducers. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.

Having shown that we can implement MapReduce as a generalised monad, it transpires that in fact, we can generalise this still further and define a `MapReduceT`

monad transformer, so there is a MapReduce type and operation associated to any monad. In particular, it turns out that the `State`

monad is just the MapReduce type of the monad `Hom a`

of maps `h -> a`

where `h`

is some fixed type.

## Initial Approach

### Why a monad?

What the monadic implementation lets us do is the following:

- Map and reduce look the same.
- You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
- All of the guts of MapReduce are hidden in the monad's
`bind`

function - The implementation is naturally parallel
- Making a MapReduce program is trivial:

`... >>= wrapMR mapper >>= wrapMR reducer >>= ...`

### Details

Full details of the implementation and sample code can be found here. I'll just give highlights here.

#### Generalised mappers / reducers

One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature

`a -> ([(s,a)] -> [(s',b)])`

where `s`

and `s'`

are data types and `a`

and `b`

are key values.

#### Generalised Monad

Now, this is suggestive of a monad, but we can't use a monad *per se*, because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.

Let `m`

be a `Monad'`

, a type with four parameters: `m s a s' b`

.

Generalise the monadic `bind`

operation to:

`m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c`

See Parametrized monads.

Then clearly the generalised mapper/reducer above can be written as a `Monad'`

, meaning that we can write MapReduce as

`... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...`

#### Implementation details

```
class Monad' m where
return :: a -> m s x s a
(>>=) :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])
bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
let
fs = runMR f s
gs = P.map g $ nub $ snd <$> fs
in
concat $ map (\g' -> runMR g' fs) gs)
```

The key point here is that `P.map`

is a parallel version of the simple `map`

function.

Now we can write a wrapper function

```
wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f = (\k -> MR (g k))
where
g k ss = f $ fst <$> filter (\s -> k == snd s) ss
```

which takes a conventional mapper / reducer and wraps it in the `Monad'`

. Note that this means that the mapper / reducer functions *do not need to know anything about the way MapReduce is implemented*. So a standard MapReduce job becomes

```
mapReduce :: [String] -> [(String,Int)]
mapReduce state = runMapReduce mr state
where
mr = return () >>= wrapMR mapper >>= wrapMR reducer
```

I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).

## The monad transformer approach

Define the monad transformer type `MapReduceT`

by:

```
newtype (Monad m) => MapReduceT m t u = MR {run :: m t -> m u}
```

with operations

```
lift :: (Monad m) => m t -> MapReduceT m t t
lift x = MR (const x)
return :: (Monad m) => t -> MapReduceT m t t
return x = lift (return x)
bind :: (Monad m) => MapReduceT m u u -> MapReduceT m t u -> (u -> MapReduceT m u v) -> MapReduceT m t v
bind p f g = MR (\ xs -> ps xs >>= gs xs)
where
ps xs = (f >>> p) -< xs
gs xs x = (f >>> g x) -< xs
```

where `>>>`

and `-<`

are the obvious arrow operations on `MapeduceT`

types.

Then we show in this paper that:

`MapReduce = MapReduceT []`

with`(>>=) = bind nub`

- For a suitable choice of
`p`

the standard`State`

monad is`MapReduceT Hom`

where

data Hom a b = H {run :: (a -> b)} return x = H (const x) f >>= g = H (\ x -> g' (f' x) x) where f' = run f g' x y = run (g x) y

## Future Directions

- My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. I have started work in this, see here.
- Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.

I would be eager for collaborative working on taking this forward.

julianporter 18:10, 31 October 2011 (UTC)