MapReduce as a monad
Latest revision as of 03:27, 9 April 2021
Introduction
MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach. I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad. The standard implementation of MapReduce is the Java-based Hadoop framework, which is very complex and somewhat temperamental. Moreover, it is necessary to write Hadoop-specific code into mappers and reducers. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.
Having shown that we can implement MapReduce as a generalised monad, it transpires that in fact, we can generalise this still further and define a MapReduceT monad transformer, so there is a MapReduce type and operation associated to any monad. In particular, it turns out that the State monad is just the MapReduce type of the monad Hom a of maps h -> a where h is some fixed type.
Initial Approach
Why a monad?
What the monadic implementation lets us do is the following:
- Map and reduce look the same.
- You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
- All of the guts of MapReduce are hidden in the monad's bind function.
- The implementation is naturally parallel.
- Making a MapReduce program is trivial:
... >>= wrapMR mapper >>= wrapMR reducer >>= ...
Details
Full details of the implementation and sample code can be found at http://jpembeddedsolutions.wordpress.com/2011/04/02/mapreduce/. I'll just give highlights here.
Generalised mappers / reducers
One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature
a -> ([(s,a)] -> [(s',b)])
where s and s' are data types and a and b are the types of the keys.
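As a minimal sketch of what a stage of this shape looks like, the following defines a type synonym for the signature and one toy stage. The names Stage and lengthStage are illustrative assumptions and are not part of the library; note that each pair holds the value first and the key second.

```haskell
-- One generalised stage: given a key, transform (value, key) pairs.
-- "Stage" is a hypothetical name introduced for this sketch only.
type Stage s a s' b = a -> [(s, a)] -> [(s', b)]

-- Hypothetical stage: select the values carrying the given key and
-- re-key each of them by its length.
lengthStage :: Eq a => Stage String a String Int
lengthStage k pairs = [ (v, length v) | (v, k') <- pairs, k' == k ]
```

For instance, applied to the key 'x' and the pairs ("ab",'x'), ("cde",'y'), ("f",'x'), this stage keeps only the two values keyed by 'x' and pairs each with its length.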
Generalised Monad
Now, this is suggestive of a monad, but we can't use a monad per se, because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.
Let m be a Monad', a type with four parameters: m s a s' b.
Generalise the monadic bind operation to:
m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
See Parametrized monads (http://blog.sigfpe.com/2009/02/beyond-monads.html).
Then clearly the generalised mapper/reducer above can be written as a Monad', meaning that we can write MapReduce as
... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...
Implementation details
class Monad' m where
    return :: a -> m s x s a
    (>>=)  :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }

-- Re-key every value with the key k
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])

-- Run f, then run g once for each distinct key that f produced
-- and concatenate the results
bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
    let
        fs = runMR f s
        gs = P.map g $ nub $ snd <$> fs
    in
    concat $ map (\g' -> runMR g' fs) gs)
The key point here is that P.map is a parallel version of the simple map function.
Now we can write a wrapper function
wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f = (\k -> MR (g k))
    where
        g k ss = f $ fst <$> filter (\s -> k == snd s) ss
which takes a conventional mapper / reducer and wraps it in the Monad'. Note that this means that the mapper / reducer functions do not need to know anything about the way MapReduce is implemented. So a standard MapReduce job becomes
mapReduce :: [String] -> [(String,Int)]
mapReduce state = runMapReduce mr state
    where
        mr = return () >>= wrapMR mapper >>= wrapMR reducer
I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).
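To make the pieces above concrete, here is a self-contained sequential sketch of the word counter. It is not the library itself: plain map stands in for P.map (so the NFData constraints disappear), the operations are called by name rather than through the Monad' class, and the definitions of runMapReduce, mapper and reducer are my assumptions about what such functions could look like, since they are not shown on this page.

```haskell
import Data.List (nub)

newtype MapReduce s a s' b = MR { runMR :: [(s, a)] -> [(s', b)] }

-- Re-key every value with the key k.
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [ (s, k) | (s, _) <- ss ])

-- Sequential stand-in for bindMR: plain map replaces the parallel P.map.
bindMR :: Eq b => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
    let fs = runMR f s
        gs = map g (nub (map snd fs))   -- one stage per distinct key
    in  concatMap (\g' -> runMR g' fs) gs)

-- Wrap a plain function on values so that it only sees values with key k.
wrapMR :: Eq a => ([s] -> [(s', b)]) -> a -> MapReduce s a s' b
wrapMR f k = MR (\ss -> f [ s | (s, k') <- ss, k' == k ])

-- Assumed runner: give every input the unit key, then run the pipeline.
runMapReduce :: MapReduce s () s' b -> [s] -> [(s', b)]
runMapReduce m ss = runMR m [ (s, ()) | s <- ss ]

-- Assumed mapper: emit each word, keyed by itself.
mapper :: [String] -> [(String, String)]
mapper = concatMap (\line -> [ (w, w) | w <- words line ])

-- Assumed reducer: all inputs are copies of one word; emit its count.
reducer :: [String] -> [(String, Int)]
reducer []       = []
reducer ws@(w:_) = [(w, length ws)]

wordCount :: [String] -> [(String, Int)]
wordCount = runMapReduce (retMR () `bindMR` wrapMR mapper `bindMR` wrapMR reducer)
```

Neither mapper nor reducer mentions MapReduce, keys of earlier stages, or parallelism; grouping by key happens entirely inside bindMR and wrapMR.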
The monad transformer approach
Define the monad transformer type MapReduceT by:
newtype MapReduceT m t u = MR {run :: m t -> m u}
with operations
lift :: (Monad m) => m t -> MapReduceT m t t
lift x = MR (const x)

return :: (Monad m) => t -> MapReduceT m t t
return x = lift (return x)

bind :: (Monad m) => MapReduceT m u u -> MapReduceT m t u -> (u -> MapReduceT m u v) -> MapReduceT m t v
bind p f g = MR (\ xs -> ps xs >>= gs xs)
    where
        ps xs = (f >>> p) -< xs
        gs xs x = (f >>> g x) -< xs
where >>> and -< are the obvious arrow operations on MapReduceT types.
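The following is a minimal sketch of how these definitions might be spelled out in plain Haskell, under some assumptions of mine: the arrow operations are written as ordinary named functions (pipe for >>> and feed for -<), bind is renamed bindT, and the list-monad example at the end (lengths, countKey, lengthHistogram) is hypothetical and only meant to show the operation running with nub as its first argument.

```haskell
import Data.List (nub)

newtype MapReduceT m t u = MR { run :: m t -> m u }

-- Named stand-in for >>> : run one stage, then the next.
pipe :: MapReduceT m t u -> MapReduceT m u v -> MapReduceT m t v
pipe (MR f) (MR g) = MR (g . f)

-- Named stand-in for -< : feed an input to a stage.
feed :: MapReduceT m t u -> m t -> m u
feed = run

-- The bind operation from the text, with the arrow operations made explicit.
bindT :: Monad m => MapReduceT m u u -> MapReduceT m t u -> (u -> MapReduceT m u v) -> MapReduceT m t v
bindT p f g = MR (\xs -> ps xs >>= gs xs)
  where
    ps xs   = (f `pipe` p) `feed` xs     -- the keys to iterate over
    gs xs x = (f `pipe` g x) `feed` xs   -- the stage chosen for key x

-- Hypothetical list-monad example: the keys are word lengths.
lengths :: MapReduceT [] String Int
lengths = MR (map length)

-- Hypothetical second stage: count how often the key k occurs.
countKey :: Int -> MapReduceT [] Int (Int, Int)
countKey k = MR (\ys -> [(k, length (filter (== k) ys))])

lengthHistogram :: [String] -> [(Int, Int)]
lengthHistogram = run (bindT (MR nub) lengths countKey)
```

Here nub plays the role of p: it reduces the output of the first stage to its distinct keys, and the list monad's own bind then runs the second stage once per key.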
Then we show in this paper (http://media.jpembeddedsolutions.com/pdf/mrmonad.pdf) that:
- MapReduce = MapReduceT [] with (>>=) = bind nub
- For a suitable choice of p the standard State monad is MapReduceT Hom where

    data Hom a b = H {run :: (a -> b)}

    return x = H (const x)
    f >>= g = H (\ x -> g' (f' x) x)
        where
            f' = run f
            g' x y = run (g x) y
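As a sketch of how Hom can be made an actual Monad instance in current GHC, the code below supplies the Functor and Applicative instances that the class hierarchy now requires. The field is renamed runHom here, an assumption of mine to avoid clashing with the run field of MapReduceT, and addThenScale is a hypothetical example value.

```haskell
newtype Hom a b = H { runHom :: a -> b }

instance Functor (Hom a) where
  fmap k (H f) = H (k . f)

instance Applicative (Hom a) where
  pure x        = H (const x)
  H ff <*> H fx = H (\x -> ff x (fx x))

instance Monad (Hom a) where
  -- Same shape as in the text: g' (f' x) x
  f >>= g = H (\x -> runHom (g (runHom f x)) x)

-- Hypothetical example: add one to the input, then multiply the
-- result by the original input.
addThenScale :: Hom Int Int
addThenScale = H (+ 1) >>= \y -> H (\x -> x * y)
```

Both f and the function produced by g receive the same fixed input x, which is why this monad is the one of maps out of a fixed type.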
Future Directions
- My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. I have started work on this; see the MapReduce with CloudHaskell page.
- Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.
I would welcome collaborators to help take this forward.
julianporter 18:10, 31 October 2011 (UTC)