

Line 1: 
Line 1: 
−  [[Category:Applications]][[Category:Monad]][[Category:Libraries]][[Category:Concurrency]][[Category:Parallel]][[Category:Research]]
 
   
−  ==Introduction==
 
− 
 
−  MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach.
 
−  I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad.
 
− 
 
−  The standard implementation of MapReduce is the Java-based Hadoop framework, which is very complex and somewhat temperamental. Moreover, mappers and reducers have to contain Hadoop-specific code. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.
 
− 
 
−  ==Why a monad?==
 
− 
 
−  What the monadic implementation lets us do is the following:
 
−  *Map and reduce look the same.
 
−  *You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
 
−  *All of the guts of MapReduce are hidden in the monad's <hask>bind</hask> function.
 
−  *The implementation is naturally parallel.
 
−  *Making a MapReduce program is trivial:<br/>
 
−  <hask>
 
−  ... >>= wrapMR mapper >>= wrapMR reducer >>= ...
 
−  </hask><br/>
 
− 
 
−  ==Details==
 
−  Full details of the implementation and sample code can be found [http://jpembeddedsolutions.wordpress.com/2011/04/02/mapreduce/ here]. I'll just give highlights here.
 
− 
 
−  ===Generalised mappers / reducers===
 
−  One can generalise MapReduce a bit, so that each stage (map, reduce, etc.) becomes a function with the signature<br/>
 
−  <hask>
 
−  a -> ([(s,a)] -> [(s',b)])
 
−  </hask><br/>
 
−  where <hask>s</hask> and <hask>s'</hask> are data types and <hask>a</hask> and <hask>b</hask> are key values.
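To make the signature above concrete, here is a minimal sketch of one stage of that shape. The type synonym <hask>Stage</hask> and the function <hask>byLength</hask> are hypothetical names introduced only for illustration; they are not part of the library.

```haskell
-- | A generalised stage: given a key, turn keyed data into newly keyed data.
-- (Hypothetical synonym for the signature  a -> ([(s,a)] -> [(s',b)]).)
type Stage s a s' b = a -> ([(s, a)] -> [(s', b)])

-- | Hypothetical stage: keep only the data tagged with the given key,
-- and tag each surviving datum with its length as the new key.
byLength :: Eq a => Stage String a String Int
byLength k pairs = [(w, length w) | (w, k') <- pairs, k' == k]
```

Applying <hask>byLength 'x'</hask> to a list selects the items keyed by <hask>'x'</hask> and emits them with <hask>Int</hask> keys, so both the key type and the key values change across the stage.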
 
− 
 
−  ===Generalised Monad===
 
−  Now, this is suggestive of a monad, but we can't use a monad ''per se'', because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.
 
− 
 
−  Let <hask>m</hask> be a <hask>Monad'</hask>, a type with four parameters: <hask>m s a s' b</hask>.
 
− 
 
−  Generalise the monadic <hask>bind</hask> operation to:<br/>
 
−  <hask>
 
−  m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
 
−  </hask><br/>
 
− 
 
−  See [http://blog.sigfpe.com/2009/02/beyondmonads.html Parametrized monads].
 
− 
 
−  Then clearly the generalised mapper/reducer above can be written as a <hask>Monad'</hask>, meaning that we can write MapReduce as<br/>
 
−  <hask>
 
−  ... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...
 
−  </hask>
 
− 
 
−  ===Implementation details===
 
− 
 
−  <hask>
 
−  class Monad' m where
 
−      return :: a -> m s x s a
 
−      (>>=)  :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c
 
− 
 
−  newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }
 
− 
 
−  -- Re-tag every datum with the key k.
 
−  retMR :: a -> MapReduce s x s a
 
−  retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])
 
− 
 
−  -- Run f, apply g to each distinct output key using the parallel map P.map,
 
−  -- then run every resulting stage on all of f's output and concatenate the results.
 
−  bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
 
−  bindMR f g = MR (\s ->
 
−      let
 
−          fs = runMR f s
 
−          gs = P.map g $ nub $ snd <$> fs
 
−      in
 
−      concat $ map (\g' -> runMR g' fs) gs)
 
−  </hask><br/>
 
−  The key point here is that <hask>P.map</hask> is a parallel version of the simple <hask>map</hask> function.
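The page does not show how <hask>P.map</hask> is defined, so the following is only a sketch of what a parallel map can look like, using the <hask>par</hask> and <hask>pseq</hask> primitives exported by <hask>GHC.Conc</hask> in base. The name <hask>parallelMap</hask> is hypothetical. Note the assumption that weak head normal form is enough: the <hask>NFData</hask> constraints on <hask>bindMR</hask> suggest that the real <hask>P.map</hask> forces results more deeply.

```haskell
import GHC.Conc (par, pseq)

-- | Hypothetical stand-in for P.map (an assumption, not the library's code).
-- Each element's result is sparked for evaluation to weak head normal form
-- while the rest of the list is processed.
parallelMap :: (a -> b) -> [a] -> [b]
parallelMap _ []       = []
parallelMap f (x : xs) = y `par` (ys `pseq` (y : ys))
  where
    y  = f x               -- sparked: may be evaluated on another core
    ys = parallelMap f xs  -- evaluated here before the list cell is returned
```

The result is always the same as that of <hask>map</hask>; sparks only run in parallel when the program is built with the threaded runtime and started with more than one capability.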
 
− 
 
−  Now we can write a wrapper function<br/>
 
−  <hask>
 
−  wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
 
−  wrapMR f = (\k -> MR (g k))
 
−      where
 
−      g k ss = f $ fst <$> filter (\s -> k == snd s) ss
 
−  </hask><br/>
 
−  which takes a conventional mapper / reducer and wraps it in the <hask>Monad'</hask>. Note that this means that the mapper / reducer functions ''do not need to know anything about the way MapReduce is implemented''. So a standard MapReduce job becomes<br/>
 
−  <hask>
 
−  mapReduce :: [String] -> [(String,Int)]
 
−  mapReduce state = runMapReduce mr state
 
−      where
 
−      mr = return () >>= wrapMR mapper >>= wrapMR reducer
 
−  </hask><br/>
 
−  I have tested the implementation with the standard word-count mapper and reducer, and it works perfectly (full code is available via the link above).
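For readers who want to try the idea without the full library, the pieces above can be assembled into a small, purely sequential sketch. The assumptions are: a plain <hask>map</hask> replaces <hask>P.map</hask> (so the <hask>NFData</hask> constraints are dropped), and <hask>mapper</hask>, <hask>reducer</hask> and <hask>wordCount</hask> are hypothetical illustrations rather than the author's tested code, with <hask>wordCount</hask> standing in for <hask>runMapReduce</hask> by tagging every input line with the unit key.

```haskell
import Prelude hiding (return, (>>=))
import Data.List (nub)

infixl 1 >>=

class Monad' m where
  return :: a -> m s x s a
  (>>=)  :: Eq b => m s a s' b -> (b -> m s' b s'' c) -> m s a s'' c

newtype MapReduce s a s' b = MR { runMR :: [(s, a)] -> [(s', b)] }

-- Re-tag every datum with the key k.
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s, k) | (s, _) <- ss])

-- Sequential variant of bindMR: plain map stands in for P.map (assumption).
bindMR :: Eq b => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR $ \s ->
  let fs = runMR f s                 -- output of the previous stage
      gs = map g (nub (map snd fs))  -- one stage per distinct key
  in concatMap (\g' -> runMR g' fs) gs

instance Monad' MapReduce where
  return = retMR
  (>>=)  = bindMR

-- Hand a plain function only the data whose key matches k.
wrapMR :: Eq a => ([s] -> [(s', b)]) -> (a -> MapReduce s a s' b)
wrapMR f k = MR (\ss -> f [s | (s, k') <- ss, k == k'])

-- Hypothetical mapper: split lines into words, keying each word by itself.
mapper :: [String] -> [(String, String)]
mapper ls = [(w, w) | l <- ls, w <- words l]

-- Hypothetical reducer: receives all copies of one word and counts them.
reducer :: [String] -> [(String, Int)]
reducer []         = []
reducer ws@(w : _) = [(w, length ws)]

-- Hypothetical driver: tag every input line with the unit key, then run.
wordCount :: [String] -> [(String, Int)]
wordCount ls = runMR mr [(l, ()) | l <- ls]
  where
    mr = return () >>= wrapMR mapper >>= wrapMR reducer
```

In this sketch neither <hask>mapper</hask> nor <hask>reducer</hask> mentions <hask>MapReduce</hask>: grouping the data by key happens entirely in <hask>bindMR</hask> and <hask>wrapMR</hask>.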
 
− 
 
−  ==Future Directions==
 
− 
 
−  *My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. This is clearly where work should go next.
 
−  *Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.
 
− 
 
−  I would welcome collaborators to help take this forward.
 
− 
 
−  [[User:Julianporter|julianporter]] 18:32, 2 April 2011 (UTC)
 