You are previewing Parallel and Concurrent Programming in Haskell.

Parallel and Concurrent Programming in Haskell

Cover of Parallel and Concurrent Programming in Haskell by Simon Marlow Published by O'Reilly Media, Inc.
  1. Special Upgrade Offer
  2. Preface
    1. Audience
    2. How to Read This Book
    3. Conventions Used in This Book
    4. Using Sample Code
    5. Safari® Books Online
    6. How to Contact Us
    7. Acknowledgments
  3. 1. Introduction
    1. Terminology: Parallelism and Concurrency
    2. Tools and Resources
    3. Sample Code
  4. I. Parallel Haskell
    1. 2. Basic Parallelism: The Eval Monad
      1. Lazy Evaluation and Weak Head Normal Form
      2. The Eval Monad, rpar, and rseq
      3. Example: Parallelizing a Sudoku Solver
      4. Deepseq
    2. 3. Evaluation Strategies
      1. Parameterized Strategies
      2. A Strategy for Evaluating a List in Parallel
      3. Example: The K-Means Problem
      4. GC’d Sparks and Speculative Parallelism
      5. Parallelizing Lazy Streams with parBuffer
      6. Chunking Strategies
      7. The Identity Property
    3. 4. Dataflow Parallelism: The Par Monad
      1. Example: Shortest Paths in a Graph
      2. Pipeline Parallelism
      3. Example: A Conference Timetable
      4. Example: A Parallel Type Inferencer
      5. Using Different Schedulers
      6. The Par Monad Compared to Strategies
    4. 5. Data Parallel Programming with Repa
      1. Arrays, Shapes, and Indices
      2. Operations on Arrays
      3. Example: Computing Shortest Paths
      4. Folding and Shape-Polymorphism
      5. Example: Image Rotation
      6. Summary
    5. 6. GPU Programming with Accelerate
      1. Overview
      2. Arrays and Indices
      3. Running a Simple Accelerate Computation
      4. Scalar Arrays
      5. Indexing Arrays
      6. Creating Arrays Inside Acc
      7. Zipping Two Arrays
      8. Constants
      9. Example: Shortest Paths
      10. Example: A Mandelbrot Set Generator
  5. II. Concurrent Haskell
    1. 7. Basic Concurrency: Threads and MVars
      1. A Simple Example: Reminders
      2. Communication: MVars
      3. MVar as a Simple Channel: A Logging Service
      4. MVar as a Container for Shared State
      5. MVar as a Building Block: Unbounded Channels
      6. Fairness
    2. 8. Overlapping Input/Output
      1. Exceptions in Haskell
      2. Error Handling with Async
      3. Merging
    3. 9. Cancellation and Timeouts
      1. Asynchronous Exceptions
      2. Masking Asynchronous Exceptions
      3. The bracket Operation
      4. Asynchronous Exception Safety for Channels
      5. Timeouts
      6. Catching Asynchronous Exceptions
      7. mask and forkIO
      8. Asynchronous Exceptions: Discussion
    4. 10. Software Transactional Memory
      1. Running Example: Managing Windows
      2. Blocking
      3. Blocking Until Something Changes
      4. Merging with STM
      5. Async Revisited
      6. Implementing Channels with STM
      7. An Alternative Channel Implementation
      8. Bounded Channels
      9. What Can We Not Do with STM?
      10. Performance
      11. Summary
    5. 11. Higher-Level Concurrency Abstractions
      1. Avoiding Thread Leakage
      2. Symmetric Concurrency Combinators
      3. Adding a Functor Instance
      4. Summary: The Async API
    6. 12. Concurrent Network Servers
      1. A Trivial Server
      2. Extending the Simple Server with State
      3. A Chat Server
    7. 13. Parallel Programming Using Threads
      1. How to Achieve Parallelism with Concurrency
      2. Example: Searching for Files
    8. 14. Distributed Programming
      1. The Distributed-Process Family of Packages
      2. Distributed Concurrency or Parallelism?
      3. A First Example: Pings
      4. Multi-Node Ping
      5. Typed Channels
      6. Handling Failure
      7. A Distributed Chat Server
      8. Exercise: A Distributed Key-Value Store
    9. 15. Debugging, Tuning, and Interfacing with Foreign Code
      1. Debugging Concurrent Programs
      2. Tuning Concurrent (and Parallel) Programs
      3. Concurrency and the Foreign Function Interface
    10. Index
  6. About the Author
  7. Colophon
  8. Special Upgrade Offer
  9. Copyright

Chapter 14. Distributed Programming

Up until now, we have been considering programs that run on a single machine, while possibly making use of multiple processors to exploit parallelism. But there is a far more plentiful source of parallelism: running a program on multiple machines simultaneously. We call this distributed programming, and Haskell supports it through a framework called distributed-process.[50]

Aside from the obvious advantages of multimachine parallelism, there are other reasons to write distributed programs. For example:

  • A distributed server can make more efficient use of network resources by moving the servers closer to the clients. We will see an example of this in A Distributed Chat Server.
  • A distributed program can exploit a heterogeneous environment, where certain resources are available only to certain machines. An example of this might be a cluster of machines with local disks, where a large data structure is spread across the disks and we wish to run our computation on the machine that has the appropriate part of the data structure on its local disk.

So what should distributed programming look like from the programmer’s perspective? Should it look like Concurrent Haskell, with forkIO, MVar, and STM? In fact, there are some good reasons to treat distributed computation very differently from computation on a shared-memory multicore:

  • There is a realistic possibility of partial hardware failure: that is, some of the machines involved in a computation may go down while others continue to run. Indeed, given a large enough cluster of machines, having nodes go down becomes the norm. It would be unacceptable to simply abort the entire program in this case. Recovery is likely to be application-specific, so it makes sense to make failure visible to the programmer and let him handle it in an appropriate way for his application.
  • Communication time becomes significant. In the shared-memory setting, it is convenient and practical to allow unrestricted sharing. This is because, for example, passing a pointer to a large data structure from one thread to another has no cost (beyond the costs imposed by the hardware and the runtime memory manager, but again it is convenient and practical to ignore these). In a distributed setting, however, communication can be costly, and sharing a data structure between threads is something the programmer will want to think about and explicitly control.
  • In a distributed setting, it becomes far more difficult to provide any global consistency guarantees of the kind that, for example, STM provides in the shared-memory setting. Achieving a consistent view of the state of the system becomes a very hard problem indeed. There are algorithms for achieving agreement between nodes in a distributed system, but the exact nature of the consistency requirements depend on the application, so we don’t want to build a particular algorithm into the system.

For these reasons, the Haskell developers decided that the model for distributed programming should be based on explicit message passing, and not the MVar and STM models that we provide for shared-memory concurrency.[51] Think of it as having TChan be the basic primitive available for communication. It is possible to build higher-level abstractions on top of the explicit message-passing layer, just as we built higher-level abstractions on top of STM and MVar in earlier chapters.

The Distributed-Process Family of Packages

There is no built-in support for distributed programming in Haskell. It is all implemented as libraries using the concurrency facilities we have covered in earlier chapters.

The package providing the core APIs for distributed programming is called distributed-process. It must be used together with a separate transport layer package that provides infrastructure for sending and receiving messages between nodes in the distributed network. The distributed-process package is deliberately independent of the transport layer so we can plug in different transport layer implementations. The most common transport layer is likely to be TCP/IP, as provided by the network-transport-tcp package, but we could imagine a transport layer that used shared memory to communicate among multiple nodes on the same multicore machine, or transport layers supporting some of the faster networks designed for clusters, such as InfiniBand.

Each transport layer needs a different mechanism for creating and shutting down nodes on the network and discovering which nodes are available (peer discovery). We will be using the package distributed-process-simplelocalnet that provides a simple implementation on top of the network-transport-tcp transport layer.

At the time of writing, the distributed-process framework is somewhat new and a little rough around the edges, but it is already quite fully featured and we expect it to mature in due course.[52]

It is reasonable to wonder whether we even need a framework to do distributed message-passing. After all, can’t we just use the network package directly and program our own message passing? Certainly you could do this, but the packages described in this chapter provide a lot of functionality that makes it much easier to build a distributed application. They let you think about your application as a single program that happens to run on multiple machines, rather than a collection of programs running on different machines that talk to one another.

For example, with the distributed-process framework, we can call a function spawn that spawns a process (like a thread) on a different machine, and we can exchange messages with the remote process directly in the form of Haskell data types. Even though we are writing a single program to execute on multiple machines, there is no need for all the machines to be identical; indeed, programmers often want to exploit some non-uniformity. For example, we might want to run a caching service on a machine with lots of memory while sending compute-intensive tasks to machines with lots of fast cores. There may also be nonuniformity in the network topology. We might want to perform a database query on a machine close to the database server, for example, or put services that communicate with each other frequently close to one another in the network.

The distributed-process framework provides a whole infrastructure suite that supports the distributed application domain. These are some of the important facilities it provides:

  • Remote spawning of processes
  • Serialization of Haskell data for message passing
  • Process linking (receiving notification when another process dies)
  • Receiving messages on multiple channels
  • A dedicated per-process channel for receiving dynamically typed messages
  • Automatic peer discovery

Distributed Concurrency or Parallelism?

We have included distribution in the concurrency part of this book for the simple reason that the explicit message-passing API we’ll describe is concurrent and nondeterministic. And yet, the main reason to want to use distribution is to exploit the parallelism of running on multiple machines simultaneously. So this setting is similar to parallel programming using threads described in Chapter 13, except that here we have only message passing and no shared state for coordination.

It is a little unfortunate that we have to resort to a nondeterministic programming model to achieve parallelism just because we want to exploit multiple machines. There are efforts under way to build deterministic programming models atop the distributed-process framework, although at the time of writing these projects are too experimental to include in this book.[53]

A First Example: Pings

To get acquainted with the basics of distributed programming, we will start with a simple example: a ping/pong message exchange. To start with, there will be a single master process that creates a child process. The master process will send a “ping” message to the child, which will respond with a “pong” message and the program will then exit.

The ping example will illustrate the basic pattern for setting up a program to use the distributed-process framework and introduce the APIs for creating processes and simple message passing. The first version of the program will run on a single node (machine) so we can get familiar with the basics of the interface before moving on to working with multiple nodes.

For reference, the subset of the Control.Distributed.Process API that we will be using is shown here:

data Process   -- instance Monad, MonadIO

data NodeId    -- instance Eq, Ord, Show, Typeable, Binary
data ProcessId -- instance Eq, Ord, Show, Typeable, Binary

getSelfPid  :: Process ProcessId
getSelfNode :: Process NodeId

spawn  :: NodeId -> Closure (Process ()) -> Process ProcessId

send   :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a

terminate :: Process a

say :: String -> Process ()

Processes and the Process Monad

First, a bit of terminology. A distributed program consists of a set of processes that may communicate with one another by sending and receiving messages. A process is like a thread. Processes run concurrently with one another, and every process has a unique ProcessId. There are a couple of important differences between threads and processes, however:

  • Threads are always created on the current node, whereas a process can be created on a remote node (we won’t be using this facility until the next section, though).
  • Processes run in the Process monad, rather than the IO monad. Process is an instance of MonadIO, so you can perform IO operations in Process by wrapping them in liftIO. All message-passing operations are in Process, so only processes, not threads, can engage in message passing.

Defining a Message Type

We start by defining the type of messages that our processes will send and receive:


data Message = Ping ProcessId
             | Pong ProcessId
  deriving (Typeable, Generic)          -- 1

instance Binary Message                 -- 2

The Ping message contains the ProcessId of the process that sent it so that the target of the message knows where to send the response. The Pong response also includes the ProcessId of the responder so that the master process can tell which process a particular response comes from.

Messages in a distributed program can be sent over the network, which Involves serializing the Haskell data into a stream of bytes before it is sent and deserializing the bytes back into Haskell data at the other end. The distributed-process framework uses the Binary class from the binary package to implement serialization and deserialization, and hence every message type must be an instance of Binary.

The serialization format is under your control. If you want, you can define your own Binary instance that uses a specialized serialization format. Normally, however, you’ll just want an automatically derived Binary instance. Fortunately, the binary package[54] lets you derive Binary instances using GHC’s DeriveGeneric extension.[55] To do this, we first derive the Generic class (1) and then declare an instance of Binary for Message (2); GHC fills in the method definitions of this instance for us.

Message types must also be an instance of Typeable, because they can be sent to dynamically typed channels (more about this later). For Typeable, we can derive the instance directly (1).

Typeable and Binary are normally packaged up together and referred to as Serializable using the following class provided by Control.Distributed.Process.Serializable:

class (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a

There’s nothing magic about Serializable. Just think of Serializable a as shorthand for (Binary a, Typeable a). You’ll see Serializable used a lot in the Control.Distributed.Process APIs.

The Ping Server Process

Next, we’ll write the code for a “ping server” process. The ping server must wait for a Ping message and then respond with a Pong message.

pingServer :: Process ()
pingServer = do
  Ping from <- expect                              -- 1
  say $ printf "ping received from %s" (show from) -- 2
  mypid <- getSelfPid                              -- 3
  send from (Pong mypid)                           -- 4

First of all, notice that we are in the Process monad. As we mentioned earlier, virtually all of the Control.Distributed.Process API is in this monad, and only code running in the Process monad can communicate with other processes and spawn new processes. There has to be a way to get into Process in the first place; we’ll see how that happens shortly, but for now let’s assume we’re already in Process and we need to program the ping server.

At 1 we receive the next message using expect:

expect :: Serializable a => Process a

The expect function receives a message sent directly to this process. Each process has a channel associated with it, and the channel can receive messages of any type. The expect call receives a message of a particular type, where the type is determined by the context. If the type cannot be determined, the compiler will complain that the type is ambiguous, and the usual fix is to add a type signature. In the example just shown, the type of messages to receive is determined by the pattern match on the result, which matches directly on the Ping constructor and thus forces expect to receive messages of the type Message.

The expect function is a little like Haskell’s read function, in that it returns a value whose type depends on the context. But whereas read fails if its argument cannot be parsed as the desired type, expect skips over messages in the queue that do not match and returns the first one that matches. Messages that don’t match the expected type are left in the channel for the time being.

If there are no messages of the right type, expect will block until one arrives. Therefore, it should be used with care: the other messages in the queue are ignored while expect is waiting for the right kind of message to arrive, which could lead to a deadlock. We’ll see later how to wait for several different types of message at the same time.

The say function, called at 2, causes a message to be logged, which is a useful way to debug your program. Usually, the message will be logged to stderr, but it might be sent somewhere else if the transport layer overrides the default logging process.

At 3 we call getSelfPid to obtain the ProcessId of the current process. The ProcessId of the current process is needed because the Pong message will contain it:

getSelfPid  :: Process ProcessId

And at 4 we send a response back to the originator of the Ping. The function send is used to send a message to a process, and it has the following type:

send :: (Serializable a) => ProcessId -> a -> Process ()

We know which ProcessId to send the Pong to because it was contained in the original Ping message.

Now we need to be able to create processes running pingServer. Although in this example we will be creating the process on the local node, in general we might be creating the process on another node. Functions that will be executed remotely in this way need to be declared explicitly.[56] The following declaration invokes a bit of Template Haskell magic that creates the necessary infrastructure to allow pingServer to be executed remotely:[57]

remotable ['pingServer]

The Master Process

Next, we will write the code for the master process. As you might expect, this is an operation of type Process ():

master :: Process ()
master = do
  node <- getSelfNode                               -- 1

  say $ printf "spawning on %s" (show node)
  pid <- spawn node $(mkStaticClosure 'pingServer)  -- 2

  mypid <- getSelfPid                               -- 3
  say $ printf "sending ping to %s" (show pid)
  send pid (Ping mypid)                             -- 4

  Pong _ <- expect                                  -- 5
  say "pong."

  terminate                                         -- 6

Call getSelfNode, which returns the NodeId of the current node. A NodeId is needed when creating a new process.


Call spawn to create the child process. Here is the function’s signature:

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

The spawn function creates a new process on the given NodeId (which here is the current node). The new process runs the computation supplied as the second argument to spawn, which is a value of type Closure (Process ()). Ultimately, we want to spawn a computation of type Process (), but such values cannot be serialized because in practice a value of type Process () could refer to an arbitrary amount of local data, including things that cannot be sent to other nodes (such as a TVar). Hence the type Closure is used to represent serializable computations.

How do we get one of these? First, the function to call must be declared remotable, as we did above. Then, if there are no arguments to pass, the Template Haskell function mkStaticClosure generates the appropriate code for the closure. (If there are arguments, then we need to use a different function, which we will see later.)

The spawn operation returns the ProcessId of the new process, which we bind to pid.


Call getSelfPid to return the ProcessId of the current process. We need this to send in the Ping message.


Send the Ping message to the child process.


Call expect to receive the Pong message from the child process.


Finally, terminate the process by calling terminate. In this case, simply returning from master would terminate the process, but sometimes we need to end the process in a context where it is not practical to arrange the top-level function to return, and in those cases terminate is useful. Moreover, it is good practice to indicate the end of the process explicitly.

The main Function

All that remains to complete the program is to define our main function, and here it is:

main :: IO ()
main = distribMain (\_ -> master) Main.__remoteTable

The main function calls distribMain from DistribUtils, which is a small module of utilities provided with the sample code to make these examples a bit less cluttered. The distribMain function is a wrapper around the lower-level startup facilities from the distributed-process-simplelocalnet package. It starts up the distributed-process framework with the distributed-process-simplelocalnet backend on a single node.

The first argument to distribMain is the Process computation to run as the master process on the node. It has type [NodeId] -> Process (), where the list of NodeIds are the other nodes in our distributed network. Because this example is running on a single node, we ignore the [NodeId] and just invoke the master function as our master process.

The second argument to distribMain is the metadata used to execute remote calls; in this case we pass Main.__remoteTable, which is generated by the Template Haskell call to remotable we showed earlier.

When you run the program, you should see output like this:[58]

$ ./ping
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: pong.

Each of these messages corresponds to one of the calls to say in the example program, and they are tagged with the date, time, and ProcessId of the process that called say.

Summing Up the Ping Example

In this section, we built the simplest distributed program possible: it spawns a single child process and performs a simple ping/pong message exchange. Here are the key things to take away:

  • To create a process, we call spawn, passing a NodeId and a Closure (Process ()). The former we got from getSelfNode (there are other ways, which we will encounter shortly), and the latter was generated by a call to the Template Haskell function mkStaticClosure.
  • Processes run in the Process monad, which is a layer over the IO monad.
  • Messages can be sent to a process using send and received by calling expect. Messages are ordinary Haskell data; the only requirement is that the type of the message is an instance of the Binary and Typeable classes.

There is a certain amount of boilerplate associated with distributed programming: deriving Binary instances, declaring remotable functions with remotable, starting up the framework with distribMain, and so on. Remember that the distributed-process framework is currently implemented as a library entirely in Haskell. There is no support for distributed programming built into the language or GHC itself, and this accounts for some of the boilerplate. As the framework matures, distributed programming will likely become a smoother experience.

Multi-Node Ping

The previous example showed how to create a process and exchange some simple messages. Now we will extend the program to be truly distributed. Instead of spawning a process on the local node, we will run the program on several nodes, create a process on each one, and perform the ping/pong protocol with all nodes simultaneously.

The Message type and pingServer remain exactly as before. The only changes will be to the master and main functions. The new master function is shown below, along with a waitForPongs helper function:


master :: [NodeId] -> Process ()                     -- 1
master peers = do

  ps <- forM peers $ \nid -> do                      -- 2
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mypid <- getSelfPid

  forM_ps $ \pid -> do                               -- 3
    say $ printf "pinging %s" (show pid)
    send pid (Ping mypid)

  waitForPongs ps                                    -- 4

  say "All pongs successfully received"

waitForPongs :: [ProcessId] -> Process ()            -- 5
waitForPongs [] = return ()
waitForPongs ps = do
  m <- expect
  case m of
    Pong p -> waitForPongs (filter (/= p) ps)
    _  -> say "MASTER received ping" >> terminate

This time, the master process takes an argument of type [NodeId], containing a NodeId for each node in the distributed network. This list is supplied by the framework when it starts up, after it has discovered the set of peers in the network. We’ll see shortly how to start up the program on multiple nodes.


Spawn a new process on each of the peer nodes, and bind the resulting list of ProcessIds to ps.


Call waitForPongs (defined below) to receive all the pong messages. When waitForPongs returns, the program emits a diagnostic and terminates.


waitForPongs is a simple algorithm that removes each ProcessId from the list as its pong message is received and returns when the list is empty.

The main function is almost the same as before:

main :: IO ()
main = distribMain master Main.__remoteTable

The only difference is that the [Node] argument gets passed along to master instead of being discarded here.

Running with Multiple Nodes on One Machine

First, I’ll illustrate starting multiple nodes on the same machine and then progress on to multiple machines.

A distributed program consists of a single master node and one or more slave nodes. The master is the node that begins with a process running; the slave nodes just wait until processes are spawned on them.

Let’s start by creating two slave nodes:

$ ./ping-multi slave 44445 &
[3] 58837
$ ./ping-multi slave 44446 &
[4] 58847

The ping-multi program takes two command-line arguments; these are interpreted by the distrbMain function and tell it how to initialize the framework. The first argument is either master or slave and indicates which kind of node to create. The second argument is the TCP port number that this node should use to communicate on, with the default being 44444.[59] Always use different port numbers when creating multiple nodes on the same machine.

I used & to create these as background processes in the shell. If you’re on Windows, just open a few Command Prompt windows and run the program in each one.

Having started the slaves, we now start the master node:

$ ./ping-multi
pid://localhost:44444:0:3: spawning on nid://localhost:44445:0
pid://localhost:44444:0:3: spawning on nid://localhost:44446:0
pid://localhost:44444:0:3: pinging pid://localhost:44445:0:4
pid://localhost:44444:0:3: pinging pid://localhost:44446:0:4
pid://localhost:44446:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44445:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received

The first thing to note is that the master node automatically found the two slave nodes. The distributed-process-simplelocalnet package includes a peer discovery mechanism that is designed to automatically locate and connect to other instances running on the same machine or other machines on the local network.

It is also possible to restart the master without restarting the slaves—try invoking ping-multi again, and you should see the same result. The new master node discovers and reconnects to the existing slaves.

Running on Multiple Machines

If we have multiple machines connected on the same network, we can run a distributed Haskell program on them. The first step is to distribute the binary to all the machines; every machine must be running the same binary. A mismatch in the binary on different machines can cause strange failures, such as errors when decoding messages.

Next, we start the slaves as before, but this time we start slaves on the remote machines and pass an extra argument:

$ ./ping-multi slave 44444
$ ./ping-multi slave 44444

(The above commands are executed on the appropriate machines.) The second argument is new and gives the IP address that identifies the slave. This is the address that the other nodes will use to contact it, so it must be an address that resolves to the correct machine. It doesn’t have to be an IP address, but using IP addresses is simpler and eliminates a potential source of failure (the DNS).

When the slaves are running, we can start the master:

$ ./ping-multi master 44444
pid://localhost:44444:0:3: spawning on nid://
pid://localhost:44444:0:3: spawning on nid://
pid://localhost:44444:0:3: pinging pid://
pid://localhost:44444:0:3: pinging pid://
pid:// ping received from pid://localhost:44444:0:3
pid:// ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received

The program successfully identified the remote nodes, spawned a processes on each one, and exchanged ping-pong messages with the process on each node.

Typed Channels

In the examples so far, we saw messages being delivered to a process and the process receiving the messages by using expect. This scheme is quite convenient: we need to know only a process’s ProcessId to send it messages, and we can send it messages of any type. However, all the messages for a process go into the same queue, which has a couple of disadvantages:

  • Each time we call expect, the implementation has to search the queue for a message of the right type, which could be slow.
  • If we are receiving messages of the same type from multiple senders, then we need to explicitly include some information in the message that lets us tell them apart (e.g., the ProcessId of the sender).

The distributed-process framework provides an alternative means of message passing based on typed channels, which addresses these two problems. The interface is as follows:

data SendPort a     -- instance of Typeable, Binary
data ReceivePort a

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

sendChan :: Serializable a => SendPort a -> a -> Process ()

receiveChan :: Serializable a => ReceivePort a -> Process a

A typed channel consists of two ports, a SendPort and a ReceivePort. Messages are sent to the SendPort by sendChannel and received from the ReceivePort using receiveChannel. As the name suggests, a typed channel can carry messages only of a particular type.

Typed channels imply a different pattern of interaction. For example, suppose we were making a request to another process and expecting a response. Using typed channels, we could program this as follows:

  • The client creates a new channel for an interaction.
  • The client sends the request, along with the SendPort.
  • The server responds on the SendPort it was sent.

In general, the server might make its own channel and send that to the client, and the subsequent interaction would happen over these two channels.

The advantage of creating a channel to carry the response is that the client knows that a message arriving on this channel can only be a response to the original request, and it is not possible to mix up this response with other responses. The channel serves as a link between the original request and the response; we know that it is a response to this particular request, because it arrived on the right channel.

In the absence of typed channels, ensuring that the response can be uniquely identified would involve creating a new identifier to send along with the original message.[60]

Let’s look at how to modify the ping example to use typed channels:


data Message = Ping (SendPort ProcessId)
  deriving (Typeable, Generic)

instance Binary Message

Note that we don’t need a Pong message anymore. Instead, the Ping message will contain a SendPort on which to send the reply, and the reply is just the ProcessId of the sender. In fact, in this example we don’t really need to send any content back at all—just sending () would be enough—but for the purposes of illustration we will send back the ProcessId.

pingServer :: Process ()
pingServer = do
  Ping chan <- expect
  say $ printf "ping received from %s" (show chan)
  mypid <- getSelfPid
  sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do

  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mapM_ monitor ps

  ports <- forM ps $ \pid -> do

    say $ printf "pinging %s" (show pid)
    (sendport,recvport) <- newChan      --1
    send pid (Ping sendport)            --2
    return recvport

  forM_ ports $ \port -> do             --3
     _ <- receiveChan port
     return ()

  say "All pongs successfully received"

Create a new channel to carry the response.


Send the ping message, including the SendPort of the channel.


Where previously we needed a function waitForPongs to collect all the responses and match them up with the peers, this time we can just wait for a response on each of the channels we created.

This code is simpler than the previous version in Multi-Node Ping. However, note that we still sent the Ping messages directly to the process, rather than using a typed channel. If we wanted to use a typed channel here too, things get more complicated. We want to do something like this (considering just a single worker for simplicity):

    (s1,r1) <- newChan
    spawn nid ($(mkClosure `pingServer) r1)

    (s2,r2) <- newChan
    sendChan s1 (Ping s2)

    receiveChan r2

This seems quite natural: we create a channel with send port s1 and receive port r1 on which to send the Ping message. Then we give the receive port of the channel to the pingServer process when we spawn it. The code shows how to use spawn to apply a function (here pingServer) to an argument (here r1): use mkClosure instead of mkStaticClosure, and then pass the argument to it (we’ll come back to this later; the details aren’t important right now).

But there’s a big problem here. ReceivePorts are not Serializable, which prevents us passing the ReceivePort r1 to the spawned process. GHC will reject the program with a type error.

Why are ReceivePorts not Serializable? If you think about it a bit, this makes a lot of sense. If a process were allowed to send a ReceivePort somewhere else, the implementation would have to deal with two things: routing messages to the correct destination when a ReceivePort has been forwarded (possibly multiple times), and routing messages to multiple destinations, because sending a ReceivePort would create a new copy. This would introduce a vast amount of complexity to the implementation, and it is not at all clear that it is a good feature to allow. So the remote framework explicitly disallows it, which fortunately can be done using Haskell’s type system.

This means that we have to jump through an extra hoop to fix the previous code, though. Instead of passing the ReceivePort to the spawned process, the spawned process must create the channel and send us back the SendPort. This means we need another channel so that the spawned process can send us back its SendPort.

    (s,r) <- newChan  -- throw-away channel
    spawn nid ($(mkClosure `pingServer) s)
    ping <- receiveChan r

    (sendpong,recvpong) <- newChan
    sendChan ping (Ping sendpong)

    receiveChan recvpong

Since this extra handshake is a bit of a hassle, you might well prefer to send messages directly to the spawned process using send rather than using typed channels, which is exactly what the example code at the beginning of this section did.

Merging Channels

In the previous section, we waited for a response from each child process in turn, whereas the old waitForPongs version processed the messages in the order they arrived. In this case it isn’t a problem, but suppose some of these messages required a response. Then we might have introduced some extra latency: if a process toward the end of the list replies early, it won’t get a response until the master process has dealt with the messages from the other processes earlier in the list, some of which might take a while to reply.

So we need a way to wait for messages from multiple channels simultaneously. The distributed-process framework has an elegant way to do this. Channels can be merged together to make a single channel that receives messages from any of the original channels. There are two ways to do this:

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR     :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

The difference is in the order in which messages arrive on the merged channel. In mergePortsBiased, each receive searches the ports in left-to-right order for a message, returning the first message it finds. The alternative is mergePortsRR (the RR stands for “round robin”) which also searches left to right, but rotates the list by one element after each receive, with the leftmost port moving to the end of the list.

One important thing to note is that merging channels does not affect the original channel; we can still receive messages from either source, and indeed there is no problem with merging multiple overlapping sets of channels.[61]

Here is the ping example with channels, where instead of waiting for the responses one by one, we merge the channels together and wait for all the responses simultaneously.


master :: [NodeId] -> Process ()
master peers = do

  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  ports <- forM ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    (sendport,recvport) <- newChan
    send pid (Ping sendport)
    return recvport

  oneport <- mergePortsBiased ports     -- 1
  waitForPongs oneport ps               -- 2

  say "All pongs successfully received"

waitForPongs :: ReceivePort ProcessId -> [ProcessId] -> Process ()
waitForPongs _ [] = return ()
waitForPongs port ps = do
  pid <- receiveChan port
  waitForPongs port (filter (/= pid) ps)

Merge the ReceivePorts together into a single ReceivePort.


Now we need a loop to wait for the responses, which is written as a separate function waitForPongs. Each message received from the channel removes the corresponding ProcessId from the list until all the spawned processes have responded.

Handling Failure

One of the important benefits provided by the distributed-process framework is handling and recovering from failure. Failure is a fact of life in distributed computing, and we should be prepared for the possibility that any of our processes might fail at any time, whether due to network outage, a hardware crash, or software faults.

Here is a basic example showing how the failure of one process can be caught and acted upon by another process. In the original ping example from Defining a Message Type, recall that the Message type has two constructors:

data Message = Ping ProcessId
             | Pong ProcessId

and the code for pingServer matches explicitly on the Ping constructor:


pingServer :: Process ()
pingServer = do
  Ping from <- expect
  say $ printf "ping received from %s" (show from)
  mypid <- getSelfPid
  send from (Pong mypid)

What will happen if the message is a Pong, rather than a Ping? Both messages have the type Message, so expect cannot distinguish them; if the context requires a message of type Message, expect can return either a Ping or a Pong. Clearly, if expect returns a Pong here, then the pattern match against Ping will fail, and as usual in Haskell this throws an exception. Since there are no exception handlers, the exception will result in the termination of the pingServer process.

There are ways to prevent the error, of course, but for now let’s see how we can catch this failure from another process. We’ll use withMonitor, which has the following signature:

withMonitor :: ProcessId -> Process a -> Process a

withMonitor takes a ProcessId to monitor and an action to perform. During the action, if the specified process fails in any way, a special message of type ProcessMonitorNotification is sent to the current process.

To wait for either the ProcessMonitorNotification message or a Pong, we need to know how to wait for different types of message at the same time. The basic pattern for this is as follows:

    [ match $ \p -> do ...
    , match $ \q -> do ...

where p and q are patterns that match different types of message. The types of these functions are shown here:

receiveWait    ::        [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

match   :: Serializable a =>                (a -> Process b) -> Match b
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

The function receiveWait waits until any of the match functions applies to a message in the queue, and then executes the associated action. The receiveTimeout operation is similar, but instead of waiting indefinitely for a matching message, it takes a time in milliseconds and returns Nothing if a matching message did not arrive before the time.

Here is how we monitor the pingServer process and then wait for either a Pong message or a ProcessMonitorNotification:


  withMonitor pid $ do
    send pid (Pong mypid)               -- 1
      [ match $ \(Pong _) -> do
         say "pong."
      , match $ \(ProcessMonitorNotification _ref deadpid reason) -> do
         say (printf "process %s died: %s" (show deadpid) (show reason))

Note that we deliberately send the child a Pong message (1) to cause it to fail. Running the program results in this:

pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:3: process pid://localhost:44444:0:4 died:
  DiedException "user error (Pattern match failure in do expression at

The third log message indicates that the master received the notification of the failed process, and gives the details of the failure: a pattern-match error, as we expected.

It is worth asking whether having a single Message data type for our messages was a good idea in the first place. Perhaps we should have made separate types, as in:

newtype Pong = Pong ProcessId
newtype Ping = Ping ProcessId

The choice comes down to whether we are using typed channels or not. With typed channels, we could use only a single message type, whereas using the per-process dynamically typed channel with send and expect or receiveWait, we could use multiple message types. Having one type for each message would avoid the possibility of a pattern-match failure when matching on a message, but unless we also have a catch-all case to match unrecognized messages, the other messages could be left in the queue forever, which could amount to an undetected error or deadlock. So there might well be cases where we want to match both messages because one is definitely an error, and so using a single message type would help ensure that we always match on all the possible messages.

The more appropriate choice depends on the particular circumstances in your application.

A summary of the API for process monitoring follows:

monitor     :: ProcessId -> Process MonitorRef
unmonitor   :: MonitorRef -> Process ()
withMonitor :: ProcessId -> Process a -> Process a

data ProcessMonitorNotification
  = ProcessMonitorNotification MonitorRef ProcessId DiedReason

data MonitorRef -- abstract

data DiedReason
  = DiedNormal             -- Normal termination
  | DiedException !String  -- The process exited with an exception
  | DiedDisconnect         -- We got disconnected from the process node
  | DiedNodeDown           -- The process node died
  | DiedUnknownId          -- Invalid (process/node/channel) identifier

In addition to the withMonitor function mentioned earlier, a process can also be monitored by calling the monitor function. This function returns a token of type MonitorRef, which can be passed to unmonitor to stop monitoring the process again. In general, it is better to use withMonitor than the monitor and unmonitor pair if possible, because withMonitor will automatically stop monitoring the remote process in the event of an exception. However, sometimes withMonitor doesn’t fit the control flow, which is when monitor and unmonitor are useful.

The Philosophy of Distributed Failure

In a distributed system, parts of the running program may fail at any time due to circumstances beyond our control. Such a failure typically results in one or more of the processes in our network becoming disconnected without warning; there is no exception and no opportunity to clean up whatever it was doing. Perhaps the hardware it was running on failed, or the network on which we were communicating with it stopped working.

A far-reaching approach for such failures can be seen in Erlang, a programming language with distributed programming at its heart. The only mechanism for communication is message passing, so every concurrent Erlang program is fundamentally distributable. The Erlang designers promote a particular philosophy for dealing with failure, often known by its catchphrase: “Let it crash.” The basic principle is that since in a distributed system we must already be prepared for a process to simply disappear, we might as well deal with all kinds of failure in this way because doing so makes failure handling much simpler. And since failure handling is difficult to test, making it simpler is highly desirable.

Concretely, instead of trying to enumerate local failure conditions and handle them in some way, we can just let them propagate to the top of the process and let the process die. The distributed program must be prepared for this eventuality already (since this is a distributed system), so the system will recover in some way: perhaps by restarting the failed process in some known-good state and logging the failure somewhere.

Thus the granularity at which we have to consider failure is the process, and we can design our applications such that individual processes can fail without catastrophic consequences. A process will probably have some internal state that is lost when it dies, but the parent should know how to construct the initial state to restart the process or to propagate the failure to a higher layer that can.

A Distributed Chat Server

In A Chat Server, we built a multithreaded chat server using Concurrent Haskell and STM. In this section, we will extend the chat server to be distributed. The server will be running across multiple machines, clients may connect to any of the machines, and any client will be able to chat with any other client connected via any of the servers. Essentially, the distributed chat server will behave just like the single-threaded server (minus some subtle differences that we will discuss shortly), except that clients have a choice of machines to connect to.

A distributed chat network saves bandwidth. For example, suppose we set up a chat network with two servers A and B on each side of the Atlantic Ocean. Each server has a large number of clients connected, with each client connecting to its closest server. When a client on server A broadcasts a message, it needs to be sent across the trans-Atlantic link to server B only once, and server B then forwards it to each of its connected clients. The broadcast message crosses the Atlantic only once, instead of once for each of the clients on the other side.

We have already written all the code for the multithreaded server, so it seems a shame to throw it away and rewrite it all to use distributed-process instead. Fortunately, we don’t have to do that. We can simply add some extra code to handle distribution, using the original server code nearly intact. Each client will still be managed by ordinary IO threads synchronized using STM, but additionally we will have some code communicating with the other servers using distributed-process. In Haskell, distributed programming is not all or nothing. We can freely mix distributed and concurrent programming in the same program. This means we can take advantage of the simplicity and performance of ordinary concurrent programming on each node, while using the heavier-weight distributed interfaces for the parts of the program that need to work across multiple nodes.

In this first version, we will use a master/slave configuration in which the master will start up server instances on all the slaves once at the beginning. Later, we will consider how to modify the program so that all nodes are equal, and nodes may come and go at arbitrary times.

Data Types

We will need a few changes to the data structures compared with the multithreaded server. When one client sends a message to another client connected to a different server, we need to know where to send the message. So each server will need to keep a list of all the clients connected to any server in the network, along with the server to which the client is connected. The information about a client now has two possibilities: either it is a local client (connected to this server), or a remote client (connected to a different server).


type ClientName = String

data Client
  = ClientLocal   LocalClient
  | ClientRemote  RemoteClient

data RemoteClient = RemoteClient
       { remoteName :: ClientName
       , clientHome :: ProcessId

data LocalClient = LocalClient
       { localName      :: ClientName
       , clientHandle   :: Handle
       , clientKicked   :: TVar (Maybe String)
       , clientSendChan :: TChan Message

clientName :: Client -> ClientName
clientName (ClientLocal  c) = localName c
clientName (ClientRemote c) = remoteName c

newLocalClient :: ClientName -> Handle -> STM LocalClient
newLocalClient name handle = do
  c <- newTChan
  k <- newTVar Nothing
  return LocalClient { localName      = name
                     , clientHandle   = handle
                     , clientSendChan = c
                     , clientKicked   = k

LocalClient is what we previously called Client, and RemoteClient is a client connected to another server. The Client type is now a disjunction of these two, with constructors ClientLocal and ClientRemote.

The Message type is as before, except that we need to derive Typeable and Binary, because Messages will be sent over the network:

data Message = Notice String
             | Tell ClientName String
             | Broadcast ClientName String
             | Command String
  deriving (Typeable, Generic)

instance Binary Message

Servers need to communicate with one another, and the kinds of messages they need to send are richer than Message. For example, servers need to tell one another when a new client connects, or one client kicks another. So we have a new type for messages sent between servers, which we call PMessage:

data PMessage
  = MsgServers            [ProcessId]
  | MsgSend               ClientName Message
  | MsgBroadcast          Message
  | MsgKick               ClientName ClientName
  | MsgNewClient          ClientName ProcessId
  | MsgClientDisconnected ClientName ProcessId
  deriving (Typeable, Generic)

instance Binary PMessage

Most of these are self-explanatory, except for one: MsgServers is a special message sent to each server node when it starts up, telling it the ProcessIds of all the server nodes in the network.

The Server type previously contained only the mapping from ClientName to Client, but now it needs some more information:

data Server = Server
  { clients   :: TVar (Map ClientName Client)
  , proxychan :: TChan (Process ())
  , servers   :: TVar [ProcessId]
  , spid      :: ProcessId

newServer :: [ProcessId] -> Process Server
newServer pids = do
  pid <- getSelfPid
  liftIO $ do
    s <- newTVarIO pids
    c <- newTVarIO Map.empty
    o <- newTChanIO
    return Server { clients = c, servers = s, proxychan = o, spid = pid }

clients is the client mapping, as before; servers is the list of other server ProcessIds, and spid is the ProcessId of this server (for convenience).

The proxychan field pertains to an added bit of complexity in our distributed architecture. Remember that we are leaving as much of the existing server infrastructure intact as possible; that means the existing server threads are ordinary forkIO threads. A forkIO thread cannot perform operations in the Process monad, yet we certainly need to be able to do that somehow because certain actions by a client must trigger communication with other servers in the network. So the trick we use is a proxy, which is a process that reads actions from a TChan and performs them in the Process monad. To have a Process action performed from an IO thread, we simply queue it on the proxy TChan. Each server has a single proxy channel, created when the server starts up and stored in the proxychan field of Server.

Sending Messages

Next, we need a few small utilities. First, a way to send a Message to a LocalClient:

sendLocal :: LocalClient -> Message -> STM ()
sendLocal LocalClient{..} msg = writeTChan clientSendChan msg

The following function, sendRemote, sends a PMessage to a remote server. To do this, it needs to use the proxychan (which it gets from the Server) and it needs the pid of the destination process:

sendRemote :: Server -> ProcessId -> PMessage -> STM ()
sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)

Now that we can send both local and remote messages, we can define sendMessage, which sends a Message to any client:

sendMessage :: Server -> Client -> Message -> STM ()
sendMessage server (ClientLocal client) msg =
    sendLocal client msg
sendMessage server (ClientRemote client) msg =
    sendRemote server (clientHome client) (MsgSend (remoteName client) msg)

A variant sends a message to a named client or returns False if the client is not connected:

sendToName :: Server -> ClientName -> Message -> STM Bool
sendToName server@Server{..} name msg = do
    clientmap <- readTVar clients
    case Map.lookup name clientmap of
        Nothing     -> return False
        Just client -> sendMessage server client msg >> return True


Next, we consider broadcasting messages. First, we need a way to send a PMessage to all the connected servers:

sendRemoteAll :: Server -> PMessage -> STM ()
sendRemoteAll server@Server{..} pmsg = do
    pids <- readTVar servers
    mapM_ (\pid -> sendRemote server pid pmsg) pids

We also need a broadcastLocal function that sends a message to the local clients only:

broadcastLocal :: Server -> Message -> STM ()
broadcastLocal server@Server{..} msg = do
    clientmap <- readTVar clients
    mapM_ sendIfLocal (Map.elems clientmap)
    sendIfLocal (ClientLocal c)  = sendLocal c msg
    sendIfLocal (ClientRemote _) = return ()

This function works by calling an auxiliary function sendIfLocal on each of the clients, which calls sendLocal if the client is local and does nothing if the client is remote.

Putting sendRemoteAll and broadcastLocal together, we can broadcast a Message to everyone:

broadcast :: Server -> Message -> STM ()
broadcast server@Server{..} msg = do
    sendRemoteAll server (MsgBroadcast msg)
    broadcastLocal server msg


The rest of the local server code is almost identical to that in A Chat Server, so we don’t reproduce it here. The only important differences are that we need to inform other servers whenever a client connects or disconnects by calling sendRemoteAll with a MsgNewClient or MsgClientDisconnected respectively.

The interesting part is how we handle distribution. Previously, the main function was responsible for setting up the network socket and accepting new connections. This is now delegated to a function socketListener, which is otherwise identical to the previous main:

socketListener :: Server -> Int -> IO ()
socketListener server port = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  forever $ do
      (handle, host, port) <- accept sock
      printf "Accepted connection from %s: %s\n" host (show port)
      forkFinally (talk server handle)
                  (\_ -> hClose handle)

We need a function to implement the proxy, described above in Sending Messages. All it does is repeatedly read Process () values from the proxychan and execute them:

proxy :: Server -> Process ()
proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan

Now, the chatServer function is the main Process () action that implements a chat server:

chatServer :: Int -> Process ()
chatServer port = do
  server <- newServer []
  liftIO $ forkIO (socketListener server port)           -- 1
  spawnLocal (proxy server)                              -- 2
  forever $ do m <- expect; handleRemoteMessage server m -- 3

Starts up the socketListener thread.


Creates the proxy. Note here that we use spawnLocal, which is like spawn except that the new process is always created on the current node. This means that the computation to be spawned doesn’t need to be serialized, so spawnLocal takes an ordinary Process value rather than a Closure, which makes it easier to use.


Repeatedly grabs the next message and calls handleRemoteMessage (defined next) to act on it.

handleRemoteMessage :: Server -> PMessage -> Process ()
handleRemoteMessage server@Server{..} m = liftIO $ atomically $
  case m of
    MsgServers pids  -> writeTVar servers (filter (/= spid) pids) -- 1
    MsgSend name msg -> void $ sendToName server name msg         -- 2
    MsgBroadcast msg -> broadcastLocal server msg                 -- 3
    MsgKick who by   -> kick server who by                        -- 4

    MsgNewClient name pid -> do                                   -- 5
        ok <- checkAddClient server (ClientRemote (RemoteClient name pid))
        when (not ok) $
          sendRemote server pid (MsgKick name "SYSTEM")

    MsgClientDisconnected name pid -> do                          -- 6
         clientmap <- readTVar clients
         case Map.lookup name clientmap of
            Nothing -> return ()
            Just (ClientRemote (RemoteClient _ pid')) | pid == pid' ->
              deleteClient server name
            Just _ ->
              return ()

The special MsgServers message is sent once at startup to tell each server the ProcessIds of all the servers in the network. This is used to set the servers field of Server.

2 3 4

MsgSend, MsgBroadcast, and MsgKick are straightforward. They cause the appropriate action to take place just as if a local client had initiated it.


MsgNewClient indicates that a client has connected to a remote server. We attempt to add the remote client to the local state, but it may be that this server already has a client with the same name. Unlike in the single server case where we relied on STM to ensure that inconsistencies like this could never arise, in a distributed system there is no global consistency. So we have to handle the case where two clients connect at the same time on different servers. The method we choose here is simple but brutal: reply with a MsgKick to kick the other client. It is likely that the remote server will simultaneously do the same, so both clients will end up being kicked, but at least the inconsistency is resolved, and this case will be rare in practice.


MsgClientDisconnected is not difficult, but we do have to be careful to check that the client being disconnected is in fact the correct client, just in case an inconsistency has arisen (in particular, this might be the response to the MsgKick initiated by the MsgNewClient case just shown).

Now that the server code is in place, we just need to write the code to start up the whole distributed network. The main function invokes master on the master node:

port :: Int
port = 44444

master :: [NodeId] -> Process ()
master peers = do

  let run nid port = do
         say $ printf "spawning on %s" (show nid)
         spawn nid ($(mkClosure 'chatServer) port)

  pids <- zipWithM run peers [port+1..]
  mypid <- getSelfPid
  let all_pids = mypid : pids
  mapM_ (\pid <- send pid (MsgServers)) all_pids

  chatServer port

main = distribMain master Main.__remoteTable

The master function is fairly straightforward. It spawns chatServer on each of the slaves, using increasing port numbers, and then sends a MsgServers message to each server process containing a list of all the server ProcessIds.[62]

Testing the Server

We can start up a few nodes on a single machine like so:

$ ./chat slave 55551 & ./chat slave 55552 & ./chat master 55553
pid://localhost:55553:0:3: spawning on nid://localhost:55552:0
pid://localhost:55553:0:3: spawning on nid://localhost:55551:0
Listening on port 44444
Listening on port 44445
Listening on port 44446

(Remember the port numbers given on the command line are the ports used by the distributed-process framework; the ports that the chat server listens to are hardcoded to 44444, 44445, …)

Then connect to one of the nodes:

$ nc localhost 44445
What is your name?
*** Fred has connected

And connect to a different node:

$ nc localhost 44446
What is your name?
*** Bob has connected
<Bob>: hi

We should now see the new activity on the first connection:

*** Bob has connected
<Bob>: hi

Failure and Adding/Removing Nodes

Our distributed server works only with a fixed set of nodes, which makes it quite limited. In practice, we want to be able to add and remove nodes from the network at will. Nodes will disconnect due to network and hardware outages, and we would like to be able to add new nodes without restarting the entire network.

My sketch implementation can be found in distrib-chat/chat-noslave.hs, but you might want to try implementing this for yourself. Some hints on how to go about it follow.

We need to abandon the master/slave architecture; every node will be equal. Instead of using our DistribUtils module, we can use the following sequence to initialize the simplelocalnet backend and start up a node:


main = do
 [port, chat_port] <- getArgs
 backend <- initializeBackend "localhost" port
                              (Main.__remoteTable initRemoteTable)
 node <- newLocalNode backend
 Node.runProcess node (master backend chat_port)

Now the function master has type Backend -> String -> Process () and runs on every node. The outline of the rest of the implementation is as follows:

  1. When a node starts up, it calls findPeers to get the other nodes in the network.

    findPeers :: Backend -> Int {- timeout -} -> IO [NodeId]
  2. It registers the current process as "chatServer" on the local node using the register function:

    register :: String -> ProcessId ->
    Process ()
  3. Next we call whereisRemoteAsync for each of the other nodes, asking for the ProcessId of "chatServer".

    whereisRemoteAsync :: NodeId -> String -> Process ()

    The remote node will respond with a WhereIsReply:

    data WhereIsReply = WhereIsReply String (Maybe ProcessId)

    We won’t wait for the reply immediately; it will be received along with other messages in the main message loop.

  4. Then we start up the chatServer as before, but now we need to also handle WhereIsReply messages. When one of these messages is received, if it indicates that we found a "chatServer" process on another node, then we move on to the next step.
  5. Send that ProcessId a message to tell it that we have joined the network. This is a new PMessage that we call MsgServerInfo. It contains the current ProcessId and the list of local clients we have (because clients may have already connected by now).
  6. On receipt of a MsgServerInfo, add that ProcessId to the servers list if it isn’t already there.
  7. Add the information about the remote clients to the state. There may need to be some conflict resolution at this point if the remote server has clients with the same names as clients that we already know about.
  8. If the new server is not already known to us, then we should respond with a MsgServerInfo of our own to tell the other server which local clients are on this server.
  9. Start monitoring the remote process. Then we can be informed when the remote process dies and remove its clients from our local state.

Exercise: A Distributed Key-Value Store

A key-value store is a simple database that supports only operations to store and retrieve values associated with keys. Key-value stores have become popular over recent years because they offer scalability advantages over traditional relational databases in exchange for supporting fewer operations (in particular, they lack database joins).

This exercise is to use the distributed-process framework to implement a distributed fault-tolerant key-value store (albeit a very simplistic one).

The interface exposed to clients is the following:

type Database
type Key   = String
type Value = String

createDB :: Process Database
set      :: Database -> Key -> Value -> Process ()
get      :: Database -> Key -> Process (Maybe Value)

Here, createDB creates a database, and set and get perform operations on it. The set operation sets the given key to the given value, and get returns the current value associated with the given key or Nothing if the key has no entry.

Part 1. In distrib-db/db.hs, I supplied a sample main function that acts as a client for the database, and you can use this to test your database. The skeleton for the database code itself is in Database.hs in the same directory. The first exercise is to implement a single-node database by modifying Database.hs. That is:

  • createDB should spawn a process to act as the database. It can spawn on the current node.
  • get and set should talk to the database process via messages; you need to define the message type and the operations.

When you run db.hs, it will call createDB to create a database and then populate it using the Database.hs source file itself. Every word in the file is a key that maps to the word after it. The client will then look up a couple of keys and then go into an interactive mode where you can type in keys that are looked up in the database. Try it out with your database implementation and satisfy yourself that it is working.

Part 2. The second stage is to make the database distributed. In practice, the reason for doing this is to store a database much larger than we can store on a single machine and still have fast access to all of it.

The basic plan is that we are going to divide up the key space uniformly and store each portion of the key space on a separate node. The exact method used for splitting up the key space is important in practice because if you get it wrong, then the load might not be well-balanced between the nodes. For the purposes of this exercise, though, a simple scheme will do: take the first character of the key modulo the number of workers.

There will still be a single process handling requests from clients, so we still have type Database = ProcessId. However, this process needs to delegate requests to the correct worker process according to the key:

  • Arrange to start worker processes on each of the nodes. The list of nodes in the network is passed to createDB.
  • Write the code for the worker process. You probably need to put it in a different module (e.g., called Worker) due to restrictions imposed by Template Haskell. The worker process needs to maintain its own Map and handle get and set requests.
  • Make the main database process delegate operations to the correct worker. You should be able to make the worker reply directly to the original client rather than having to forward the response from the worker back to the client.

Compile db.hs against your distributed database to make sure it still works.

Part 3. Make the main database process monitor all the worker processes. Detect failure of a worker and emit a message using say. You will need to use receiveWait to wait for multiple types of messages; see the ping-fail.hs example for hints.

Note that we can’t yet do anything sensible if a worker dies. That is the next part of the exercise.

Part 4. Implement fault tolerance by replicating the database across multiple nodes.

  • Instead of dividing the key space evenly across workers, put the workers in pairs and give each pair a slice of the key space. Both workers in the pair will have exactly the same data.
  • Forward requests to both workers in the pair (it doesn’t matter that there will be two responses in the case of a get).
  • If a worker dies, you will need to remove the worker from your internal list of workers so that you don’t try to send it messages in the future.[63]

This should result in a distributed key-value store that is robust to individual nodes going down, as long as we don’t kill too many nodes too close together. Try it out—kill a node while the database is running and check that you can still look up keys.

A sample solution can be found in distrib-db/DatabaseSample.hs and distrib-db/WorkerSample.hs.

[50] Also known as “Cloud Haskell.”

[51] This is also known as the actor model.

[52] The distributed-process package is in fact the second implementation of these ideas, the first prototype being the remote package.

[53] For example, meta-par and HdpH.

[54] As of binary version

[55] As of GHC version 7.2.1.

[56] We expect that in the future, GHC will provide syntactic sugar to make remote code execution easier.

[57] Template Haskell is a feature provided by GHC that allows Haskell code to be manipulated and generated at compile time. For more details, see the GHC User’s Guide.

[58] The log messages produced by say are normally prefixed by a timestamp, but I have omitted the timestamps here for clarity.

[59] The default port is chosen by our distribMain wrapper, not the distributed-process framework.

[60] Indeed, some of Erlang’s libraries use exactly this technique.

[61] The current implementation of channels uses STM, and channels are merged using orElse.

[62] This is mainly so that we can test the server on a single machine; in practice, you would want to choose the port number via a command-line option or some other method.

[63] A real fault-tolerant database would restart the worker on a new node and copy the database slice from its partner. The solution provided in this book doesn’t do this, but by all means have a go at doing it.

The best content for your career. Discover unlimited learning on demand for around $1/day.