Implement a chat server: Difference between revisions
Revision as of 21:19, 14 February 2016
Introduction
This page describes how to implement a simple chat server. The server should support multiple connected users. Messages sent to the server are broadcast to all currently connected users. For this tutorial we'll use Network.Socket, which provides low-level bindings to the C-socket API.
Ultimately, our cabal file will hinge on an executable section which might look like the following:
executable chat-server-exe
  hs-source-dirs:      app
  main-is:             Main.hs
  ghc-options:         -threaded -rtsopts -with-rtsopts=-N
  build-depends:       base, network
  default-language:    Haskell2010
Simple socket server
We start with a simple server. Its main function creates a reusable socket, binds it to TCP port 4242, and listens with a queue of at most two pending connections.
-- in Main.hs
module Main where

import Network.Socket

main :: IO ()
main = do
    sock <- socket AF_INET Stream 0            -- create socket
    setSocketOption sock ReuseAddr 1           -- make socket immediately reusable - eases debugging.
    bind sock (SockAddrInet 4242 iNADDR_ANY)   -- listen on TCP port 4242.
    listen sock 2                              -- set a max of 2 queued connections
    mainLoop sock                              -- unimplemented
In our main loop we'll build out the socket-server equivalent of a "Hello World!" example. For a given socket we'll: accept a connection, relay a simple "Hello!", close the connection, and recurse on the original socket.
-- in Main.hs
mainLoop :: Socket -> IO ()
mainLoop sock = do
    conn <- accept sock     -- accept a connection and handle it
    runConn conn            -- run our server's logic
    mainLoop sock           -- repeat

runConn :: (Socket, SockAddr) -> IO ()
runConn (sock, _) = do
    send sock "Hello!\n"
    close sock
Notice that accepting a connection yields a pair of type (Socket, SockAddr). The Socket is a new socket object which can be used to send and receive data for that connection. This socket object is then closed at the end of our runConn function.
The SockAddr is the address of the connecting peer, not the server's own port 4242. As you can see from the runConn function, it is largely uninteresting for this use-case, so we ignore it.
Using System.IO for sockets
Network.Socket incorrectly represents binary data in send and recv and, as a result, use of these functions is not advised and may lead to bugs. Network.Socket actually recommends using the functions of the same names defined in the ByteString module (Network.Socket.ByteString). However, to keep things simple, we'll stick to System.IO for input and output.
Importing our new module and turning our Socket into a Handle now looks like the following:
-- in the imports of our Main.hs add:
import System.IO

-- and we'll change our `runConn` function to look like:
runConn :: (Socket, SockAddr) -> IO ()
runConn (sock, _) = do
    hdl <- socketToHandle sock ReadWriteMode
    hSetBuffering hdl NoBuffering
    hPutStrLn hdl "Hello!"
    hClose hdl
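For comparison, the ByteString route mentioned above might look roughly like the following. This is only a sketch: runConnBS and greeting are hypothetical names that do not appear elsewhere in this tutorial, and it assumes the bytestring package is added to build-depends.

```haskell
import qualified Data.ByteString.Char8 as B
import Network.Socket (SockAddr, Socket, close)
import Network.Socket.ByteString (sendAll)

-- | The greeting as raw bytes (hypothetical helper for this sketch).
greeting :: B.ByteString
greeting = B.pack "Hello!\n"

-- | ByteString variant of the first runConn (hypothetical name).
runConnBS :: (Socket, SockAddr) -> IO ()
runConnBS (sock, _) = do
    sendAll sock greeting   -- sendAll keeps sending until every byte is written
    close sock
```

The rest of the tutorial keeps using the Handle-based version.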
Concurrency
So far the server can only handle one connection at a time. This is enough if all we want is to serve clients one after another, but it won't be enough if we want our server to handle chat, where several users are connected at once.
Control.Concurrent is a module in the base package which does an excellent job of lightweight thread creation and context switching. You are encouraged to check out its Hackage page. To handle each user of our chat server, we'll use forkIO to create a new thread for each connection. Notice that the signature of forkIO is:
forkIO :: IO () -> IO ThreadId
However, as we don't need the thread's id, we'll ignore the result.
-- add to our imports:
import Control.Concurrent

-- and in our mainLoop function...
mainLoop sock = do
    conn <- accept sock
    forkIO (runConn conn)   -- split off each connection into its own thread
    mainLoop sock
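To get a feel for forkIO outside of the server, here is a minimal sketch that forks one lightweight thread per input and collects the results. The function name squaresConcurrently is made up for this illustration, and the MVar is used only as a simple way to wait for each thread to finish.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)

-- | Fork one thread per input, then collect the results in input order.
-- (Hypothetical helper, not part of the chat server.)
squaresConcurrently :: [Int] -> IO [Int]
squaresConcurrently xs = do
    boxes <- forM xs $ \x -> do
        box <- newEmptyMVar
        _ <- forkIO (putMVar box (x * x))   -- the ThreadId is ignored, as in mainLoop
        return box
    mapM takeMVar boxes                     -- blocks until each thread has delivered
```

In GHCi, squaresConcurrently [1, 2, 3] should yield the three squares in order, regardless of which thread happened to run first.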
Adding communication between threads
We'll need some way for two connections, which we've just split into separate threads, to communicate. At first, this might seem a hard problem, requiring us to manage our own event handler / pub-sub implementation as well as start to cover topics such as MVar, TVar, TMVar, and their use-cases. However, we'll let you delve into that at your own pace and will stick to using the Control.Concurrent.Chan module, which can take care of all of this for us.
Control.Concurrent.Chan provides exactly what we need: unbounded FIFO channels with a single write and multiple read ends. It's a very simple module where we'll take advantage of the abstract Chan datatype: data Chan a
Notice that this datatype has a kind of * -> *. To make this datatype concrete, we'll need to decide on a message type. Chan itself accepts any element type; since our messages are lines of text written to a socket, we'll keep things simple, use String, and create a type alias of Msg to make things a little more semantic.
-- in Main.hs
type Msg = String
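Before wiring the channel into the server, a small sketch of the basic Chan operations may help. It writes a few messages to a fresh channel and reads them back, illustrating the FIFO behaviour; roundTrip is a hypothetical helper used only here.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM)

type Msg = String

-- | Write all messages to a fresh channel, then read the same number back.
-- Because the channel is unbounded, the writes never block.
roundTrip :: [Msg] -> IO [Msg]
roundTrip msgs = do
    chan <- newChan :: IO (Chan Msg)
    mapM_ (writeChan chan) msgs
    replicateM (length msgs) (readChan chan)
```

The messages come back in the order they were written. Note that readChan blocks when the channel is empty, which is why the sketch reads exactly as many items as it wrote.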
First, let's import the module:
import Control.Concurrent.Chan -- at the top of Main.hs with the others
To ensure that all of our socket connections are running in the same channel, we'll have main create it and pass it to mainLoop which will, in turn, pass the channel to each thread in runConn. We'll adjust our code as follows:
main = do
    -- [...]
    chan <- newChan        -- notice that newChan :: IO (Chan a)
    mainLoop sock chan     -- pass it into the loop

-- later, in mainLoop:
mainLoop :: Socket -> Chan Msg -> IO ()   -- See how Chan now uses Msg.
mainLoop sock chan = do
    conn <- accept sock
    forkIO (runConn conn chan)   -- pass the channel to runConn
    mainLoop sock chan
At this point, we want to have runConn duplicate the channel in order to communicate with it. First, we'll need a couple of helpers, liftM and fix, which are documented in Control.Monad and Control.Monad.Fix, respectively. In short, liftM lets us apply a pure function to the result of a monadic action, while fix is the fixed-point combinator, which lets us write an anonymous recursive loop inline.
-- at the top of Main.hs
import Control.Monad (liftM)
import Control.Monad.Fix (fix)
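If liftM and fix are new to you, the following sketch shows each one in isolation. All three function names are invented for this illustration, and the IORef is used only to give the IO loop something observable to do.

```haskell
import Control.Monad (liftM)
import Control.Monad.Fix (fix)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- | liftM applies a pure function to the result of a monadic action.
lengthOfAction :: IO String -> IO Int
lengthOfAction = liftM length

-- | fix hands the lambda a reference to itself ("loop"),
-- which gives us recursion without naming a top-level function.
countdown :: Int -> [Int]
countdown = fix $ \loop n -> if n <= 0 then [] else n : loop (n - 1)

-- | The same trick inside IO: an inline loop that sums 1..limit.
sumTo :: Int -> IO Int
sumTo limit = do
    total <- newIORef 0
    flip fix 1 $ \loop i ->
        if i > limit
            then return ()
            else do
                modifyIORef total (+ i)
                loop (i + 1)
    readIORef total
```

In the server code, fix is used in its simplest form, fix $ \loop -> do ..., where calling loop at the end of the block repeats the block.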
From Control.Concurrent.Chan we'll use three simple functions whose names are largely self-explanatory: writeChan, readChan, and dupChan. Of note, dupChan creates a new channel which starts empty; from then on, any data written to either it or the original is available from both. This creates a way to broadcast messages.
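The broadcast behaviour of dupChan can be seen in isolation with a small sketch. Here broadcastDemo is a hypothetical function, and alice and bob simply stand in for two connections.

```haskell
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)

-- | Duplicate a channel twice, write one message to the original,
-- and read that message from both duplicates.
broadcastDemo :: String -> IO (String, String)
broadcastDemo msg = do
    chan  <- newChan
    alice <- dupChan chan   -- each duplicate starts empty
    bob   <- dupChan chan
    writeChan chan msg      -- written after duplication, so both duplicates see it
    a <- readChan alice
    b <- readChan bob
    return (a, b)
```

Each duplicate has its own read position, so one reader consuming a message does not take it away from the other. With that in mind, runConn becomes: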
runConn :: (Socket, SockAddr) -> Chan Msg -> IO ()
runConn (sock, _) chan = do
    let broadcast msg = writeChan chan msg
    hdl <- socketToHandle sock ReadWriteMode
    hSetBuffering hdl NoBuffering
    commLine <- dupChan chan

    -- fork off a thread for reading from the duplicated channel
    forkIO $ fix $ \loop -> do
        line <- readChan commLine
        hPutStrLn hdl line
        loop

    -- read lines from the socket and broadcast them to every connection
    fix $ \loop -> do
        line <- liftM init (hGetLine hdl)
        broadcast line
        loop
Notice how runConn, running in a separate thread from our main one, now forks another worker thread for sending messages to the connected user.
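One detail worth a second look is liftM init (hGetLine hdl). Presumably init is there to drop the carriage return that telnet-style clients send before the newline, but init fails on an empty string and would remove a real character from clients that send a bare newline. A more defensive helper could look like the sketch below; stripCR is a hypothetical name and is not used in the tutorial's code.

```haskell
-- | Drop a single trailing carriage return, if there is one.
-- Unlike init, this is safe on empty input and leaves other lines untouched.
stripCR :: String -> String
stripCR s = case reverse s of
    ('\r' : rest) -> reverse rest
    _             -> s
```

It could be swapped in as liftM stripCR (hGetLine hdl) without changing anything else.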
Cleanups and final code

There are two major problems left in the code. First, the code has a memory leak: the original channel is never read by anyone, so every message written to it stays in memory. This can be fixed by adding another thread just for that purpose.
Second, closing connections is not handled gracefully at all. This requires exception handling.
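As a rough sketch of the first fix, a thread can be forked whose only job is to read and discard whatever arrives on the original channel. The names drain and drainDemo are hypothetical; the demo just checks that a duplicate still receives messages while the original is being emptied.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.Chan (Chan, dupChan, newChan, readChan, writeChan)
import Control.Monad (forever, void)

-- | Fork a thread that reads and discards everything on the given channel.
drain :: Chan a -> IO ThreadId
drain chan = forkIO (forever (void (readChan chan)))

-- | Drain the original channel while a duplicate still gets the message.
drainDemo :: String -> IO String
drainDemo msg = do
    chan <- newChan
    dup  <- dupChan chan
    _    <- drain chan
    writeChan chan msg
    readChan dup
```

Because each duplicate keeps its own read position, draining the original does not steal messages from the per-connection readers.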
The code below fixes the first issue and mostly fixes the second one, and adds a few cosmetic improvements:
- messages are not echoed back to the user they came from.
- every connection is associated with a name.
-- with apologies for the lack of comments :)
import Network.Socket
import System.IO
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Monad
import Control.Monad.Fix (fix)

type Msg = (Int, String)

main :: IO ()
main = do
    chan <- newChan
    sock <- socket AF_INET Stream 0
    setSocketOption sock ReuseAddr 1
    bind sock (SockAddrInet 4242 iNADDR_ANY)
    listen sock 2
    forkIO $ fix $ \loop -> do
        (_, msg) <- readChan chan
        loop
    mainLoop sock chan 0

mainLoop :: Socket -> Chan Msg -> Int -> IO ()
mainLoop sock chan nr = do
    conn <- accept sock
    forkIO (runConn conn chan nr)
    mainLoop sock chan $! nr+1

runConn :: (Socket, SockAddr) -> Chan Msg -> Int -> IO ()
runConn (sock, _) chan nr = do
    let broadcast msg = writeChan chan (nr, msg)
    hdl <- socketToHandle sock ReadWriteMode
    hSetBuffering hdl NoBuffering
    hPutStrLn hdl "Hi, what's your name?"
    name <- liftM init (hGetLine hdl)
    broadcast ("--> " ++ name ++ " entered.")
    hPutStrLn hdl ("Welcome, " ++ name ++ "!")
    chan' <- dupChan chan
    reader <- forkIO $ fix $ \loop -> do
        (nr', line) <- readChan chan'
        when (nr /= nr') $ hPutStrLn hdl line
        loop
    handle (\(SomeException _) -> return ()) $ fix $ \loop -> do
        line <- liftM init (hGetLine hdl)
        case line of
            "quit" -> hPutStrLn hdl "Bye!"
            _      -> do
                broadcast (name ++ ": " ++ line)
                loop
    killThread reader
    broadcast ("<-- " ++ name ++ " left.")
    hClose hdl
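The exception handling in runConn relies on handle from Control.Exception: if the read loop throws (for example, hGetLine raises an end-of-file error when the client disconnects), the handler swallows the exception so that the cleanup lines after it still run. In isolation, the pattern looks roughly like this; sessionEndedCleanly is a hypothetical name used only for this sketch.

```haskell
import Control.Exception (SomeException (..), handle)

-- | Run a session; report True if it ended normally, False if it threw.
-- Matching on SomeException catches every exception, which is convenient
-- for a tutorial but broader than most production code would want.
sessionEndedCleanly :: IO () -> IO Bool
sessionEndedCleanly session =
    handle (\(SomeException _) -> return False) (session >> return True)
```

Either way, control returns to the caller, which is what lets runConn go on to kill the reader thread and close the handle.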
Have fun chatting!