Concurrency demos/Graceful exit

From HaskellWiki


Problem

This problem was posted to the Haskell mailing list by Cat Dancer:

"I'd like to write a server accepting incoming network connections that can be gracefully shutdown.

When the server is asked to shutdown, it should stop accepting new connections, finish processing any current connections, and then terminate."

One problem I could not solve: a new thread created by forkIO does not immediately have my finally/catch exception handler installed. There is therefore a window, after forkIO returns and before the new thread starts running, in which the thread can be killed with killThread without the handler catching the exception. The "fork" operation in the STM example works hard to ensure that the thread is running with the handler in place before it returns.
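On a current GHC, one way to close this window is to fork the thread with asynchronous exceptions masked. The following is a minimal sketch, not part of the original discussion. It assumes a base library that provides mask_ and forkIOWithUnmask (the successors of block and unblock); forkWithDone and demo are hypothetical names chosen for illustration.

```haskell
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Control.Exception (SomeException, mask_, try)

-- Hypothetical helper: fork a thread whose completion signal is in place
-- before any asynchronous exception can be delivered to it.
forkWithDone :: IO () -> IO (ThreadId, MVar (Either SomeException ()))
forkWithDone action = do
  done <- newEmptyMVar
  -- The child inherits the masked state from mask_.  Exceptions are only
  -- unmasked inside 'try', so the final putMVar always runs.
  tid <- mask_ (forkIOWithUnmask (\unmask ->
           try (unmask action) >>= putMVar done))
  return (tid, done)

-- Kill the child immediately after forking it, then wait for its signal.
-- Returns True when the child reported that it ended with an exception.
demo :: IO Bool
demo = do
  (tid, done) <- forkWithDone (threadDelay 10000000)
  killThread tid
  result <- readMVar done
  return (either (const True) (const False) result)
```

Even when killThread is called before the child has run at all, the exception stays pending until the child reaches unmask, so it is raised inside try and the done variable is still filled.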

Using throwTo

By Chris Kuklewicz

<haskell>
> import Control.Concurrent
> import Control.Concurrent.MVar
> import Control.Exception as Exception
> import Network.Socket
> import Data.Typeable
> import System.IO
>
> -- A ConnectionHandler is a function which handles an incoming
> -- client connection.  The handler is run in its own thread, and is
> -- passed a handle to the client socket.  The handler does whatever
> -- communication it wants to do with the client, and when it returns,
> -- the client socket handle is closed and the thread terminates.
> -- A list of active handlers is kept, and the client connection is
> -- also marked as finished when the handler returns.
>
> type ConnectionHandler = Handle -> IO ()
>
> example_connection_handler :: ConnectionHandler
>
> example_connection_handler handle = do
>   hPutStrLn handle "Hello."
>   hPutStrLn handle "Goodbye."
>
>
> type ChildrenDone = MVar [MVar ()]
>
> data ExitGracefully = ExitGracefully deriving Typeable
>
>
> waitForChildren :: ChildrenDone -> IO ()
>
> waitForChildren childrenDone = do
>   cs <- takeMVar childrenDone
>   mapM_ takeMVar cs
>
> shutdownServer :: MVar () -> ChildrenDone -> ThreadId -> IO ()
>
> shutdownServer acceptLoopDone childrenDone acceptThreadId = do
>   throwDynTo acceptThreadId ExitGracefully
>   takeMVar acceptLoopDone
>   -- There can be no more changes to childrenDone
>   waitForChildren childrenDone
>   return ()
>
> acceptConnections :: MVar () -> ChildrenDone -> ConnectionHandler -> Socket -> IO ()
>
> acceptConnections acceptLoopDone childrenDone connectionHandler sock = 
>   finally (acceptConnections' acceptLoopDone childrenDone connectionHandler sock)
>            (putStrLn "accept loop exiting" >> putMVar acceptLoopDone () ) -- run last
>
> -- This only looks for exceptions when "accept sock" is executed
> acceptConnections' acceptLoopDone childrenDone connectionHandler sock = block loop
>   where loop = do
>           unblock (return ()) -- safe point to be interrupted, so unblock
>           (clientSocket, addr) <- accept sock  -- may or may not unblock and wait
>           clientHandle <- socketToHandle clientSocket ReadWriteMode
>           childDone <- newEmptyMVar
>           forkIO $ handleConnection childDone connectionHandler clientHandle
>           modifyMVar_ childrenDone (return . (childDone:))  -- non-blocking atomic change to MVar
>           loop
>
> handleConnection childDone connectionHandler clientHandle = do
>   Exception.catch
>     (finally (connectionHandler clientHandle)
>              (hClose clientHandle >> putMVar childDone () ))
>
>     -- TODO we'll want to do something better when
>     -- connectionHandler throws an exception, but
>     -- for now we'll at least display the exception.
>     (\e -> print e)
</haskell>
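The listing above was written against an older base library, which provided throwDynTo, block and unblock. For readers on a current GHC, here is a hedged sketch of the same signalling idea: ExitGracefully becomes an instance of Exception and is delivered with throwTo, and mask_ takes the place of block. The requests MVar is a hypothetical stand-in for the listening socket so that the example runs without any networking; acceptLoop and demo are illustrative names, not the code above.

```haskell
import Control.Concurrent (forkIO, throwTo)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, handle, mask_)
import Control.Monad (forever)

data ExitGracefully = ExitGracefully deriving Show

instance Exception ExitGracefully

-- Stand-in for the accept loop.  It runs with asynchronous exceptions
-- masked, so ExitGracefully can only arrive while takeMVar is blocked,
-- which is an interruptible operation (as "accept sock" is above).
acceptLoop :: MVar () -> MVar String -> IO ()
acceptLoop requests loopDone =
  handle (\ExitGracefully -> putMVar loopDone "accept loop exiting")
         (forever (takeMVar requests))

-- Start the loop, ask it to exit, and wait for its confirmation.
demo :: IO String
demo = do
  requests <- newEmptyMVar
  loopDone <- newEmptyMVar
  tid <- mask_ (forkIO (acceptLoop requests loopDone))
  throwTo tid ExitGracefully
  takeMVar loopDone
```

Forking inside mask_ means the child starts in the masked state, so the handler is installed before the exception can be raised in it.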


Using STM

{-

The main accepting thread spawns a slave thread to run accept and
stuff the result into a TMVar.  The main loop then atomically checks
the TVar used for graceful shutdown and the TMVar.  These two checks
are combined by `orElse`, which gives the semantics one wants: on each
iteration either the TVar has been set to True or the slave thread has
accepted a client into the TMVar.

There is still the possibility that a busy server could accept a
connection from the last client and put it in the TMVar where the main
loop will miss it when it exits.  This is handled by the finally
action which waits for the slave thread to be well and truly dead and
then looks for that last client in the TMVar.

The list of child threads is cleaned periodically (currently every
10th child), which allows the garbage collector to remove the dead
threads' structures.

By Chris Kuklewicz <haskell at list dot mightyreason dot com>

-}

-- Example using STM and orElse to compose a solution
import Control.Monad
import Control.Concurrent
import Control.Exception
import Control.Concurrent.STM
import Data.IORef
import Network
import System.IO

forever x = x >> forever x

runExampleFor socket seconds = do
  tv <- newTVarIO False           -- Set to True to indicate graceful exit requested
  sInfo <- startServer socket tv
  threadDelay (1000*1000*seconds)
  shutdownServer tv sInfo

startServer socket tv = do
  childrenList <- newMVar []
  tInfo <- fork (acceptUntil socket exampleReceiver childrenList (retry'until'true tv))
  return (tInfo,childrenList)

shutdownServer tv ((acceptLoopDone,_),childrenList) = do
  atomically (writeTVar tv True)
  atomically (readTMVar acceptLoopDone)
  withMVar childrenList (mapM_ (atomically . readTMVar . fst))

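-- Capture the idiom of notifying a new TMVar when a thread is finished.
-- doneVar is empty while the child runs and holds True once it has stopped.
fork todo = block $ do
  doneVar <- atomically (newEmptyTMVar)
  let putStarted = atomically (putTMVar doneVar False)  -- child: handler is installed
      putStopped = atomically (tryTakeTMVar doneVar >> putTMVar doneVar True)
  tid <- forkIO $ block $ (finally (putStarted >> unblock todo) putStopped)
  yield
  -- Wait until the child has signalled.  If it has already stopped,
  -- put the True back so that later readers still see it.
  atomically $ do
    value <- takeTMVar doneVar
    when value (putTMVar doneVar True)
  return (doneVar,tid)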

cond true false test = if test then true else false

-- This is an asynchronous-exception-safe way to use accept to get one
-- client at a time and pass it to the parent thread via a TMVar.
acceptInto socket chan =  block . forever $ do
  unblock . atomically $
    isEmptyTMVar chan >>= cond (return ()) retry
  client <- accept socket
  atomically (putTMVar chan client)

-- This demonstrates how to use acceptInto to spawn a client thread
-- running "receiver" for each connection.  It ends when checker
-- commits instead of calling retry.
acceptUntil socket receiver childrenList checker = do
  counter <- newIORef (0::Int) -- who cares if it rolls over?
  chan <- atomically (newEmptyTMVar)
  (mv,tid) <- fork (acceptInto socket chan)
  let loop = atomically (fmap Left checker `orElse` fmap Right (takeTMVar chan))
             >>= either (const (return ()))    (\client -> spawn client >> loop)
      spawn client@(handle,_,_) = do
        cInfo <- fork (finally (receiver client) (hClose handle))
        count <- readIORef counter
        writeIORef counter $! (succ count)
        modifyMVar_ childrenList $ \kids -> fmap (cInfo:) $
          if count `mod` 10 /= 0  -- 10 is arbitrary frequency for cleaning list
            then return kids
            else atomically $ filterM (isEmptyTMVar . fst) kids
      end = do
        killThread tid
        atomically (readTMVar mv)
        atomically (tryTakeTMVar chan) >>= maybe (return ()) spawn
  finally (handle (\e -> throwTo tid e >> throw e) loop) end

exampleReceiver (handle,_,_) = do
  hPutStrLn handle "Hello."
  hPutStrLn handle "Goodbye."

retry'until'true tv = (readTVar tv >>= cond (return ()) retry)
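
The orElse composition described in the comment at the top of this section can also be seen in isolation, without sockets or threads. The sketch below is only an illustration: nextEvent and demo are hypothetical names, strings stand in for accepted clients, and check from the stm package plays the role of retry'until'true. It also shows the corner case mentioned above, where a client is left in the TMVar when shutdown wins, and how tryTakeTMVar recovers it.

```haskell
import Control.Concurrent.STM

-- Either the shutdown flag is set (Left) or a client is waiting (Right).
-- orElse is left-biased: when both are ready, shutdown wins.
nextEvent :: TVar Bool -> TMVar a -> STM (Either () a)
nextEvent shutdown chan =
  fmap Left (readTVar shutdown >>= check) `orElse` fmap Right (takeTMVar chan)

demo :: IO (Either () String, Either () String, Maybe String)
demo = do
  shutdown <- newTVarIO False
  chan <- newEmptyTMVarIO
  -- Normal operation: a client is waiting and no shutdown was requested.
  atomically (putTMVar chan "client-1")
  first <- atomically (nextEvent shutdown chan)
  -- A client arrives just as shutdown is requested.
  atomically (putTMVar chan "client-2")
  atomically (writeTVar shutdown True)
  second <- atomically (nextEvent shutdown chan)
  -- The main loop has now exited; look for a client it missed.
  leftover <- atomically (tryTakeTMVar chan)
  return (first, second, leftover)
```

In the full example this last step corresponds to the "end" action of acceptUntil, which first waits for the accepting thread to die so that no further client can appear after the check.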


AcceptLoop Library

Work has started on a library module to implement just the "accepting a network connection with graceful shutdown" part, with a goal of creating code that is both correct and reusable. Details and source code can be found at AcceptLoop.