Chapter 4. Concurrent Programming

Concurrency is the ability for different functions to execute in parallel without affecting each other unless explicitly programmed to do so. Each concurrent activity in Erlang is called a process. The only way for processes to interact with each other is through message passing, where data is sent from one process to another. The philosophy behind Erlang and its concurrency model is best described by Joe Armstrong’s tenets:

  • The world is concurrent.

  • Things in the world don’t share data.

  • Things communicate with messages.

  • Things fail.

The concurrency model and its error-handling mechanisms were built into Erlang from the start. With lightweight processes, it is not unusual to have hundreds of thousands, even millions, of processes running in parallel, often with a small memory footprint. The ability of the runtime system to scale concurrency to these levels directly affects the way programs are developed, differentiating Erlang from other concurrent programming languages.

What if you were to use Erlang to write an instant messaging (IM) server, supporting the transmission of messages between thousands of users in a system such as Google Talk or Facebook? The Erlang design philosophy is to spawn a new process for every event so that the program structure directly reflects the concurrency of multiple users exchanging messages. In an IM system, an event could be a presence update, a message being sent or received, or a login request. Each process will service the event it handles, and terminate when the request has been completed.

You could do the same in C or Java, but you would struggle when scaling the system to hundreds of thousands of concurrent events. An option might be to have a pool of processes handling specific event types or particular users, but certainly not a new process for every event. Erlang gets away with this because it does not use native threads to represent processes. It has its own scheduler in the virtual machine (VM), making the creation of processes very efficient while at the same time minimizing their memory footprint. This efficiency is maintained regardless of the number of concurrent processes in the system. The same argument applies for message passing, where the time to send a message is negligible and constant, regardless of the number of processes. This chapter introduces concurrent programming in Erlang, letting you in on one of the most powerful concurrency models available today.

Creating Processes

So far, we’ve looked at executing sequential code in a single process. To run concurrent code, you have to create more processes. You do this by spawning a process using the spawn(Module, Function, Arguments) BIF. This BIF creates a new process that evaluates the Function exported from the module Module with the list of Arguments as parameters. The spawn/3 BIF returns a process identifier, which from now on we will refer to as a pid.

In Figure 4-1, the process we call Pid1 executes the spawn BIF somewhere in its program. This call results in the new process with process identifier Pid2 being created. Process identifier Pid2 is returned as a result of the call to spawn, and will typically be bound to a variable in an expression of the following format:

Pid2 = spawn(Module, Function, Arguments).
Before and after calling spawn
Figure 4-1. Before and after calling spawn

The pid of the new process, Pid2, at this point is known only within the process Pid1, as it is a local variable that has not been shared with anybody. The spawned process starts executing the exported function passed as the second argument to the BIF, and the arity of this function is dictated by the length of the list passed as the third argument to the BIF.

Warning

A common error when you start programming Erlang is to forget that the third argument to spawn is a list of arguments, so if you want to spawn the function m:f/1 with the argument a, you need to call:

spawn(m, f, [a])

not:

not spawn(m, f, a).

Once spawned, a process will continue executing and remain alive until it terminates. If there is no more code to execute, a process is said to terminate normally. On the other hand, if a runtime error such as a bad match or a case failure occurs, the process is said to terminate abnormally.

Spawning a process will never fail, even if you are spawning a nonexported or even a nonexistent function. As soon as the process is created and spawn/3 returns the pid, the newly created process will terminate with a runtime error:

1> spawn(no_module, nonexistent_function, []).
<0.32.0>

=ERROR REPORT==== 29-Feb-2008::21:48:29 ===
Error in process <0.32.0> with exit value: 
  {undef,[{no_module,nonexistent_function,[]}]}

In the preceding example, note how the error report is formatted. It is different from the ones you saw previously, as the error does not take place in the shell, but in the process with pid <0.32.0>. If the error occurs in a spawned process, it is detected by another part of the Erlang runtime system called the error logger, which by default prints an error report in the shell using the format shown earlier. Errors detected by the shell are instead formatted in a more readable form.

The processes() BIF returns a list of all of the processes running in the system. In most cases, you should have no problems using the BIF, but there have been extreme situations in large systems where calling processes() from the shell has been known to result in the runtime system running out of memory![13] Don’t forget that in industrial applications, you might be dealing with millions of processes running concurrently. In the current implementation of the runtime system, the absolute limit is in the hundreds of millions. Check the Erlang documentation for the latest figures. The default number is much lower, but you can easily change it by starting the Erlang shell with the command erl +P MaxProcceses, where MaxProcesses is an integer.

You can use the shell command i() to find out what the currently executing processes in the runtime system are doing. It will print the process identifier, the function used to spawn the process, the function in which the process is currently executing, as well as other information covered later in this chapter. Look at the example in the following shell printout. Can you spot the process that is running as the error logger?

2> processes().
[<0.0.0>,<0.2.0>,<0.4.0>,<0.5.0>,<0.7.0>,<0.8.0>,<0.9.0>,
 <0.10.0>,<0.11.0>,<0.12.0>,<0.13.0>,<0.14.0>,<0.15.0>,
 <0.17.0>,<0.18.0>,<0.19.0>,<0.20.0>,<0.21.0>,<0.22.0>,
 <0.23.0>,<0.24.0>,<0.25.0>,<0.26.0>,<0.30.0>]
3> i().
Pid                   Initial Call                          Heap     Reds Msgs
Registered            Current Function                     Stack
<0.0.0>               otp_ring0:start/2                      987     2684    0
init                  init:loop/1                              2
<0.2.0>               erlang:apply/2                        2584    61740    0
erl_prim_loader       erl_prim_loader:loop/3                   5
<0.4.0>               gen_event:init_it/6                    610      219    0
error_logger          gen_event:fetch_msg/5                   11
<0.5.0>               erlang:apply/2                        1597      508    0
...

If you are wondering why the processes() BIF returned far more than 20 processes when you created only one that failed right after being spawned, you are not alone. Large parts of the Erlang runtime system are implemented in Erlang, the error_logger and the Erlang shell being two of the many examples. You will come across other system processes as you work your way through the remaining chapters of this book.

Message Passing

Processes communicate with each other using message passing. Messages are sent using the Pid ! Message construct, where Pid is a valid process identifier and Message is a value from any Erlang data type (see Figure 4-2).

Message passing
Figure 4-2. Message passing

Each Erlang process has a mailbox in which incoming messages are stored. When a message is sent, it is copied from the sending process into the recipient’s mailbox for retrieval. Messages are stored in the mailbox in the order in which they are delivered. If two messages are sent from one process to another, the messages are guaranteed to be received in the same order in which they are sent. This guarantee is not extended to messages sent from different processes, however, and in this case the ordering is VM-dependent.

Sending a message will never fail; so if you try sending a message to a nonexistent process, it is thrown away without generating an error. Finally, message passing is asynchronous: a sending process will not be suspended after sending a message; it will instead immediately continue executing the next expression in its code.

To test sending messages in the shell, let’s use the self/0 BIF, which returns the pid of the process in which it is evaluated. The Erlang shell is nothing other than an Erlang process in a read-evaluate-print loop, waiting for you to type in an expression. When you terminate an expression followed by a full stop (.) and press Enter, the shell evaluates what you typed in and prints out a result. Since the shell is an Erlang process, there is nothing stopping us from sending messages to it. To retrieve and display all the messages sent to the shell process, and therefore currently held in the process mailbox, you can use the shell command flush/0, which also has the effect of removing (or flushing) those messages from the mailbox:

1> Pid = self().
<0.30.0>
2> Pid ! hello.
hello
3> flush().
Shell got hello
ok
4> <0.30.0> ! hello.
* 1: syntax error before: '<'
5> Pid2 = pid(0,30,0).
<0.30.0>
6> Pid2 ! hello2.
hello2
7> flush().
Shell got hello2
ok

What is happening in the preceding example? In command 1, the BIF self() returns a pid, which in the shell is bound to the variable Pid and displayed as <0.30.0>. In commands 2 and 3 you see the message being sent to the Pid, and then flushed from the mailbox, using the flush() command in the shell.

You cannot type pids directly in a module or in the shell, as in both cases, they result in a syntax error; this is shown for the shell in command 4. You need either to bind the process identifiers to a variable when BIFs such as self and spawn return them, or generate a pid using the pid/3 shell function, as shown in command 5 and used in command 6. The flush() in command 7 shows that the message indeed went to the shell process.

Pid ! Message is a valid Erlang expression, and as with all valid expressions in Erlang, it has to return a value. The value, in this case, is the message sent. So if, for example, you need to send the same message to many processes, you can write either a sequence of message sends, such as Pid1!Msg,Pid2!Msg,Pid3!Msg, or a single expression, such as Pid3!Pid2!Pid1!Message, which is equivalent to writing Pid3!(Pid2!(Pid1!Message)), where Pid1!Message returns the message to send to Pid2, which in turn returns the message to be sent to Pid3.

As we already said, sending messages to nonexistent processes will always succeed. To test this, let’s make the shell process crash with an illegal operation. Crashing is the same as an abnormal process termination, something that is considered normal in Erlang, in the sense that Erlang provides mechanisms to deal with it. We will cover abnormal process terminations in more detail in the next chapter, so until then, do not get alarmed. Making the shell crash will automatically result in a new shell process—in this example with pid <0.38.0>—being spawned by the runtime system.

With this in mind, we locate the shell pid, make the shell process terminate, and then send a message to it. Based on the semantics of message passing, this will result in the message being thrown away:

7> self().
<0.30.0>
8> 1/0.
** exception error: bad argument in an arithmetic expression
     in operator  '/'/2
        called as 1 / 0
9> self().
<0.38.0>
10> pid(0,30,0) ! hello.
hello
11> flush().
ok

The reason that message passing and spawn always succeed, even if the recipient process does not exist or the spawned process crashes on creation, has to do with process dependencies, or rather, their deliberate lack of dependencies. We say that process A depends on process B when the fact of B terminating can prevent A from functioning correctly.

Process dependencies are very important and will often influence your design. In massively concurrent systems, you do not want processes to depend on each other unless explicitly specified, and in such cases, you want to have as few dependencies as possible. To give a concrete example of this, imagine an IM server concurrently handling thousands of messages being exchanged by its users. Each message is handled by a process spawned for that particular function. If, due to a bug, one of these processes terminates, you would lose that particular message. Ensuring a lack of dependency between this process and the processes handling all the other messages guarantees that these messages are safely processed and delivered to their recipients regardless of the bug.

Receiving Messages

Messages are retrieved from the process mailbox using the receive clause. The receive clause is a construct delimited by the reserved words receive and end, and contains a number of clauses. These clauses are similar to case clauses, with a pattern in the head (to the left of the arrow) and a sequence of expressions in the body (to the right).

On executing the receive statement, the first (and oldest) message in the mailbox is pattern-matched against each pattern in the receive expression in turn:

  • If a successful match occurs, the message is retrieved from the mailbox, the variables in the pattern are bound to the matching parts of the message, and the body of the clause is executed.

  • If none of the clauses matches, the subsequent messages in the mailbox are pattern-matched one by one against all of the clauses until either a message matches a clause or all of the messages have failed all of the possible pattern matches.

In the following example, if the message {reset, 151} is sent to the process executing the receive statement, the first clause will pattern-match, resulting in the variable Board being bound to the integer 151. This will result in the function reset(151) being called:

receive
  {reset, Board} -> reset(Board);
  _Other         -> {error, unknown_msg}
end

Assume now that two new messages—restart and {reset, 151}—are received in that order by the executing process. As soon as the execution flow of the process reaches the receive statement, it will try to match the oldest message in the mailbox, restart. Pattern matching will fail in the first clause, but will match successfully in the second, binding the variable _Other to the atom restart. You can follow this example in Figure 4-3.

Selective receives
Figure 4-3. Selective receives

A receive statement will return the last evaluated expression in the body of the matched clause. In the example, this will be either the return value of the reset/1 call or the tuple {error, unknown_msg}. Although rarely done, it is possible to write expressions such as Result = receive Msg -> handle(Msg) end, in which a variable (here, Result) is bound to the return value of the receive clause. It is considered a better practice to make the receive clause the body of a separate function, and to bind the return value of the function to the variable.

Warning

Take a minute to think about the restart message in the example. Other than being a good practice, nothing is stopping us from sending the message in the format {restart}. It is a common misconception that messages have to be sent in tuples: this is not the case, as messages can consist of any valid Erlang term.

Using tuples with only one element is unnecessary, as they consume more memory and are slower to process. It is considered a bad practice to use them not only as messages, but also as terms in your programs. The guideline is that if your tuple has only one element, use the element on its own, without the tuple construct.

When none of the clauses in a case statement match, a runtime error occurs. What happens in receive statements? The syntax and semantics of receive are very similar to those of case, the main difference being that the process is suspended in receive statements until a message is matched, whereas in a case statement a runtime error occurs.

In general, a receive statement has the following form:

receive
  Pattern1 when Guard1  -> exp11, .., exp1n;
  Pattern2 when Guard2  -> exp21, .., exp2n;
  ...
  Other                 -> expn1, .., expnn
end

The keywords used to delimit a receive clause are receive and end. Each pattern consists of any valid Erlang term, including bound and unbound variables as well as optional guards. The expressions are valid Erlang terms or legal expressions that evaluate to terms. The return value of the receive clause will be the return value of the last evaluated expression in the body executed, in this case, expin.

To ensure that the receive statement always retrieves the first message in the mailbox you could use an unbound variable (such as Other in the first example) or the “don’t care” variable if you are not interested in its value.

Selective and Nonselective Receives

Look at Figure 4-4. What can you deduce from the incoming messages in the receive clause about Pid being already bound? The variable Pid is bound to the value passed to decode_digit when it is called, so it is already bound to this value in the pattern part of the receive clause. On receiving a message, a successful match will occur only if the first element of the tuple of size 2 sent as a message is exactly equal to (remember the =:= construct?) the value stored in the variable Pid.

Selective receives with bound variables
Figure 4-4. Selective receives with bound variables

We call this a selective receive, where we retrieve only the messages we are explicitly interested in based on certain criteria, leaving the remaining messages in the mailbox. Selective receives often select on process identifiers, but sequence references or other identifiers are also common, as are tagged tuples and guards. Now, contrast the bound variable Pid in Figure 4-4 with the unbound variable DigitList in Figure 4-5, which will be bound only once a message has been received.

Selective reception of multiple messages
Figure 4-5. Selective reception of multiple messages

In concurrent systems, it is common for race conditions to occur. A race condition occurs when the behavior of a system depends on the order in which certain events occur: these events “race” to influence the behavior. Due to the indeterminate nature of concurrency it is not possible to determine the order in which messages sent by different processes will arrive at the receiving process. This is when selective receive becomes crucial.

In Figure 4-5, Pid3 will receive the message foo followed by bar regardless of the order in which they are delivered. Selective receive of multiple messages is useful when synchronizing processes in a rendezvous, or when data from several sources needs to be collected before it can be processed. Contrast this with a programming language in which it is possible to process messages only in the order in which they arrive: the code for this would have to deal with the possibility of bar preceding foo and of foo preceding bar; the code becomes more complex, and much more likely to contain potential flaws.

If the order of the messages is unimportant, you can just bind them to a variable. In Figure 4-6, the first message to arrive at the process Pid3 will be processed, regardless of the order in which the messages were sent. The variable Msg in the receive statement will be bound to one of the atoms foo or bar, depending on which is delivered first.

Receipt of messages regardless of the sending order
Figure 4-6. Receipt of messages regardless of the sending order

Figure 4-7 demonstrates how processes share data with each other. The process with PidA will send a tagged tuple with its own process identifier, retrieved through a call to the BIF self(), to the process with PidB. PidB will receive it, binding PidA’s value to the variable Pid. A new tagged tuple is sent to PidC, which also pattern-matches the message in its receive statement and binds the value of PidA to the variable Pid. PidC now uses the value bound to Pid to send the message foo back to PidA. In this way, processes can share information about each other, allowing communication between processes that initially did not have knowledge of each other.

Sharing Pid data between processes
Figure 4-7. Sharing Pid data between processes

As processes do not share memory, the only way for them to share data is through message passing. Passing a message results in the data in the message being copied from the heap of the sending process to the heap of the receiving one, so this does not result in the two processes sharing a storage location (which each might read or write) but only in them each having their own copy of the data.

An Echo Example

Now that we have covered process creation and message passing, let’s use spawn, send, and receive, in a small program. Open your editor and copy the contents of Example 4-1 or download it from the book’s website. When doing so, do not forget to export the function you are spawning, in this case loop/0. In the example, pay particular notice to the fact that two different processes will be executing and interacting with each other using code defined in the same module.

Example 4-1. The echo process
-module(echo).
-export([go/0, loop/0]).

go() ->
   Pid = spawn(echo, loop, []),
   Pid ! {self(), hello},
   receive
     {Pid, Msg} ->
       io:format("~w~n",[Msg])
   end,
   Pid ! stop.


loop() ->
   receive
     {From, Msg} ->
        From ! {self(), Msg},
        loop();
     stop ->
       true
   end.

So, what does this program do? Calling the function go/0 will initiate a process whose first action is to spawn a child process. This child process starts executing in the loop/0 function and is immediately suspended in the receive clause, as its mailbox is empty. The parent, still executing in go/0, uses the Pid for the child process, which is bound as a return value from the spawn BIF, to send the child a message containing a tuple with the parent’s process identifier (given as a result of calling self()) and the atom hello.

As soon as the message is sent, the parent is suspended in a receive clause. The child, which is waiting for an incoming message, successfully pattern-matches the {Pid, Msg} tuple where Pid is matched to the process identifier belonging to the parent and Msg is matched to the atom hello. The child process uses the Pid to return the message {self(), hello} back to the parent, where this call to self() returns the pid of the child. See Figure 4-8 for a visual depiction of this process.

Sequence diagram for
Figure 4-8. Sequence diagram for Example 4-1

At this point, the parent is suspended in the receive clause, and is waiting for a message. Note that it will only pattern-match on the tuple {Pid, Msg}, where the variable Pid is already bound (as a result of the spawn BIF) to the pid of the child process. This is a good (but not entirely secure) way to ensure that the message you receive is, in fact, a message you are expecting, and not just any message consisting of a tuple with two elements sent by another process. The message arrives and is successfully pattern-matched. The atom hello is bound to the Msg variable, which is passed as an argument to the io:format/2 call, printing it out in the shell. As soon as the parent has printed the atom hello in the shell, it sends the atom stop back to the child.

What has the child been doing while the parent was busy receiving the reply and printing it? Remember that processes will terminate if they have no more code to execute, so to avoid terminating, the child called the loop/0 function recursively, suspending it in the receive clause. It receives the stop message sent to it by its parent, returns the atom true as its result, and terminates normally.

Try running the program in the shell and see what happens:

1> c(echo).
{ok,echo}
2> echo:go().
hello
stop

The atom hello is clearly the result of the io:format/2 call, but where does the atom stop come from? It is the value returned as the result of calling echo:go/0. To further familiarize yourself with concurrency, experiment with the echo example, putting io:format/2 statements in the loop/0 process and sending different messages to it. You could also experiment with the go/0 process, allowing it to send and receive more than one message. When experimenting, you will most likely get the shell to hang in a receive clause that will not match. If this happens, you will need to kill the shell and start again.

Registered Processes

It is not always practical to use pids to communicate with processes. To use a pid, a process needs to be notified of it and store its value. It is common to register processes that offer specific services with an alias, a name that can be used instead of the pid. You register a process with the register(Alias, Pid) BIF, where Alias is an atom and Pid is the process identifier. You do not have to be a parent or a child of the process to call the register BIF; you just need to know its process identifier.

Once a process has been registered, any process can send a message to it without having to be aware of its identifier (see Figure 4-9). All the process needs to do is use the Alias ! Message construct. In programs, the alias is usually hardcoded in. Other BIFs which are directly related to process registration include unregister(Pid); registered(), which returns a list of registered names; and whereis(Alias), which returns the pid associated with the Alias.

Sending a message to a registered process
Figure 4-9. Sending a message to a registered process

Look at Example 4-2, which is a variant of Example 4-1. We have removed the Pid!stop expression at the end of the go/0 function, and instead of binding the return value of spawn/3, we pass it as the second argument to the register BIF. The first argument to register is echo, the atom we use to name the process. This alias is used to send the message to the newly spawned child.

Example 4-2. The registered echo process
-module(echo).
-export([go/0, loop/0]).

go() ->
  register(echo, spawn(echo, loop, [])),
  echo ! {self(), hello},
  receive
    {_Pid, Msg} ->
      io:format("~w~n",[Msg])
  end.


loop() ->
  receive
    {From, Msg} ->
      From ! {self(), Msg},
      loop();
    stop ->
      true
  end.

It is not mandatory, but it is considered a good practice to give your process the same name as the module in which it is defined.

Update your echo module with the changes we just discussed and try out the new BIFs you have just read about in the shell. Test the new implementation of echo, inspecting its state with the i() and regs() shell commands. Note how the shell process sends the stop message to the echo process without knowing its pid, and how whereis/1 returns undefined if the process does not exist:

1> c(echo).
{ok,echo}
2> echo:go().
hello
ok
3> whereis(echo).
<0.37.0>
4> echo ! stop.
stop
5> whereis(echo).
undefined
6> regs().

** Registered procs on node nonode@nohost **
Name                   Pid            Initial Call           Reds Msgs
application_controlle  <0.5.0>        erlang:apply/2         4426    0
code_server            <0.20.0>       erlang:apply/2       112203    0
ddll_server            <0.10.0>       erl_ddll:init/1          32    0
erl_prim_loader        <0.2.0>        erlang:apply/2       206631    0
error_logger           <0.4.0>        gen_event:init_it/6     209    0
file_server            <0.19.0>       erlang:apply/2           12    0
file_server_2          <0.18.0>       file_server:init/1    25411    0
global_group           <0.17.0>       global_group:init/1      71    0
global_name_server     <0.12.0>       global:init/1            60    0
inet_db                <0.15.0>       inet_db:init/1          103    0
init                   <0.0.0>        otp_ring0:start/2      5017    0
kernel_safe_sup        <0.26.0>       supervisor:kernel/1      61    0
kernel_sup             <0.9.0>        supervisor:kernel/1    1377    0
rex                    <0.11.0>       rpc:init/1               44    0
user                   <0.23.0>       user:server/2          1459    0

** Registered ports on node nonode@nohost **
Name                  Id              Command
ok

The shell command regs() prints out all the registered processes. It might be an alternative to i() when retrieving system information in a system with large quantities of processes. In the preceding example, the echo process is not among the processes listed, as we have stopped it. Instead, you are seeing all of the registered system processes.

Warning

It is a feature of Erlang memory management that atoms are not garbage collected. Once you’ve created an atom, it remains in the atom table regardless of whether it is referenced in the code. This can be a potential problem if you decide to register transient processes with an alias derived from converting a string to an atom with the list_to_atom/1 BIF. If you have millions of users logging on to your system every day and you create a registered process for the duration of their sessions, don’t be surprised if you end up running out of memory.

You would be much better off storing the mapping of users to pids in a session table. It is best to register only processes with a long life span, and if you really must convert a string to use as an alias, use list_to_existing_atom/1 to ensure that your system does not suffer memory leakages.

Sending messages to nonexistent registered processes causes the calling process to terminate with a badarg (see Figure 4-10). This behavior is different from sending a message to a process identifier for a nonexistent process, as registered processes are assumed to provide a service. The absence of a registered process is therefore treated as a bug. If your program might be sending messages to nonexistent registered processes and you do not want the calling process to terminate, wrap a try ... catch around the call.

Sending messages to non-registered processes
Figure 4-10. Sending messages to non-registered processes

Timeouts

You saw that if a process enters a receive statement and none of the messages matches, the process will get suspended. This could be similar to you going to your mailbox at home, discovering there is no mail, and being forced to wait there until the mailman arrives. It might be an option if you are waiting for very urgent mail or have nothing better to do. In most cases, though, all you want to do is check the mailbox, and if nothing has arrived, continue with your household chores. Erlang processes can do just that by using the receive ... after construct:

receive
  Pattern1 when Guard1 -> exp11, .., exp1n;
  Pattern2 when Guard2 -> exp21, .., exp2n;
  ...
  Other                -> expn1, .., expnn
after
 Timeout -> exp1,  .., expn
end

When a process reaches the receive statement and no messages pattern-match, it will wait for Timeout milliseconds. If after Timeout milliseconds no message has arrived, the expressions in the body of the after clause are executed. Timeout is an integer denoting the time in milliseconds, or the atom infinity. Using infinity as a timeout value is the same as not including the after construct. It is included, as Timeout can be a variable set every time the function is called, allowing the receive ... after clause to behave as desired in each call (see Figure 4-11).

Receive timeouts
Figure 4-11. Receive timeouts

Assume you have a process registered with the alias db, which is acting as a database server. Every time you want to look up an item, you send the database a message and wait for a response. At busy times, however, the request might take too long to be processed, so you return a timeout error by using the receive ... after construct. When doing so, however, you will end up receiving the response from the server after the timeout, risking that your replies get out of sync with the sequence of requests sent to the database server. The next time you send the database a request, you will match the oldest message in your receive clause. This message will be the response sent back after the timeout, and not the response to the request you just sent. When using receive ... after, you need to cater to these cases by flushing your mailbox and ensuring it is empty. In doing so, your code might look something like this:

read(Key) ->
  flush(),
  db ! {self(),{read, Key}},
  receive
    {read,R}        -> {ok, R};
    {error, Reason} -> {error, Reason}
  after 1000        -> {error, timeout}
  end.

flush() ->
  receive
    {read, _}  -> flush();
    {error, _} -> flush()
  after 0       -> ok
end.

Another use for the receive ... after clause is to suspend a process for a period in milliseconds, or to send messages delayed by a certain amount of time. The definition of sleep/1 in the following code is taken directly from the timer library module, while send_after will send a message to the calling process after Time milliseconds:

-module(my_timer).
-export([send_after/2, sleep/1, send/3]).

send_after(Time, Msg) ->
    spawn(my_timer, send, [self(),Time,Msg]).

send(Pid, Time, Msg) ->
  receive
  after
    Time ->
      Pid ! Msg
  end.

sleep(T) ->
  receive
  after
    T ->  true
  end.

Benchmarking

In this chapter, we have been talking about the low process creation and message passing times in Erlang. To demonstrate them, let’s run a benchmark in which the parent spawns a child and sends a message to it. Upon being spawned, the child creates a new process and waits for a message from its parent. Upon receiving the message, it terminates normally. The child’s child creates yet another process, resulting in hundreds, thousands, and even millions of processes.

This is a sequential benchmark that will barely take advantage of SMP on a multicore system, because at any one time, only a couple of processes will be executing in parallel:

-module(myring).
-export([start/1, start_proc/2]).

start(Num) ->
  start_proc(Num, self()).

start_proc(0, Pid) ->
  Pid ! ok;

start_proc(Num, Pid) ->
  NPid = spawn(?MODULE, start_proc, [Num-1, Pid]),
  NPid ! ok,
  receive ok -> ok end.

Let’s test the preceding example for 100,000, 1 million, and 10 million processes. To benchmark the program, we use the function call:

timer:tc(Module, Function, Arguments)

which takes a function and its arguments and executes it. It returns a tuple containing the time in microseconds it took to run the function alongside the return value of the function. Testing the program shows that it takes 0.48 seconds to spawn 100,000 processes, 4.2 seconds to spawn 1 million processes, and about 40 seconds to spawn 10 million processes. Try it out on your computer:

1> c(myring).
{ok,myring}
2> timer:tc(myring, start, [100000]).
{484000,ok}
3> timer:tc(myring, start, [1000000]).
{4289360,ok}
4> timer:tc(myring, start, [10000000]).
{40572800,ok}

Process Skeletons

There is a common pattern to the behavior of processes, regardless of their particular purpose. Processes have to be spawned and their aliases registered. The first action of newly spawned processes is to initialize the process loop data. The loop data is often the result of arguments passed to the spawn BIF and the initialization of the process. It is stored in a variable we refer to as the process state. The state is passed to a receive-evaluate function, which receives a message, handles it, updates the state, and passes it back as an argument to a tail-recursive call. If one of the messages it handles is a stop message, the receiving process will clean up after itself and terminate. This is a recurring theme among processes that we usually refer to as a design pattern, and it will occur regardless of the task the process has been assigned to perform. Figure 4-12 shows an example skeleton.

A process skeleton
Figure 4-12. A process skeleton

From reoccurring patterns, let’s now look at differences among processes:

  • The arguments passed to the spawn BIF calls will differ from one process to another.

  • You have to decide whether you should register a process, and, if you do register it, what alias should be used.

  • In the function that initializes the process state, the actions taken will differ based on the tasks the process will perform.

  • The storing of the process state might be generic, but its contents will vary among processes.

  • When in the receive-evaluate loop, processes will receive different messages and handle them in different ways.

  • And finally, on termination, the cleanup will vary from process to process.

So, even if a skeleton of generic actions exists, these actions are complemented by specific ones that are directly related to the specific tasks assigned to the process.

Tail Recursion and Memory Leaks

We mentioned earlier that if processes have no more code to execute, they terminate. Suppose you want to write an echo process that indefinitely continues to send back the message it has received (or that does this until you explicitly send it a message to stop). You would keep the Erlang process alive using a tail-recursive call to the function that contains the receive statement. We often call this function the receive/evaluate loop of the process. Its task is to receive a message, handle it, and then recursively call itself.

This is where the importance of tail recursion in concurrent programming becomes evident. As you do not know how many times the function is going to be called, you must ensure that it executes in constant memory space without increasing the recursive call stack every time a message is handled. It is common to have processes handling thousands of messages per second over sustained periods of not only hours, days, or months, but also years! Using tail recursion, where the very last thing the receive/evaluate function does is to call itself, you ensure that this nonstop operation is possible without memory leakages.

What happens when a message doesn’t match any of the clauses in a receive statement? It remains in the process mailbox indefinitely, causing a memory leakage that over time could also cause the runtime system to run out of memory and crash. Not handling unknown messages should therefore be treated as a bug. Either these messages should not have been sent to this process in the first place, or they should be handled, possibly just by being retrieved from the mailbox and ignored.

The defensive approach of ignoring unknown messages with a “don’t care” variable in the receive clause, even if convenient, might not always be the best approach. Messages not being handled should probably not have been sent to the process in the first place. And if they were sent on purpose, they were probably not matched because of a bug in one of the receive clauses. Throwing these messages away would only make the bug harder to detect. If you do throw unknown messages away, make sure you log their occurrence so that the bugs can be found and corrected.

A Case Study on Concurrency-Oriented Programming

When on consulting assignments around the world working with developers coming from a C++ and Java background who have learned Erlang on their own, a common theme we have come across is the use of processes. This theme is irrespective of the experience level of the developers, and of what their system does. Instead of creating a process for every truly concurrent activity in the system, they tend to create one for every task. Programming concurrent applications in Erlang requires a different strategy for processes, which in turn means reasoning in a different way to what one may be accustomed to. The main difference from other concurrent languages is that with Erlang, processes are so cheap it is best to use a process for each truly concurrent activity in your system, not for every task. This case study is from one of Francesco’s first consulting assignments outside of Ericsson, soon after Erlang had been released as open source, and it illustrates clearly what we mean by the difference between a task and an activity.

He worked with a group of engineers who were developing an IM proxy for Jabber. The system received a packet through a socket, decoded it, and took certain actions based on its content. Once the actions were completed, it encoded a reply and sent it to a different socket to be forwarded to the recipient. Only one packet at a time could come through a socket, but many sockets could simultaneously be receiving and handling packets.

As described in Figure 4-13, the original system did not have a process for every truly concurrent activity—the processing of a packet from end to end—but instead used a process for every different task—decoding, encoding, and so forth. Each open socket in Erlang was associated with a process that was receiving and sending data through this socket. Once the packet was received, it was forwarded to a process that handled the decoding. Once decoded, the decoding process forwarded it to the handler that processed it. The result was sent to an encoding process, which after formatting it, forwarded the reply to the socket process that held the open connection belonging to the recipient.

At its best performance, the system could process five simultaneous messages, with the decoding, handling, and encoding being the bottleneck, regardless of the number of simultaneously connected sockets. There were two other processes, one used for error handling, where all errors were passed, and one managing a database, where data reads, writes, and deletes were executed.

A badly designed concurrent system
Figure 4-13. A badly designed concurrent system

When reviewing the system, we identified what we believed was a truly concurrent activity in the system. It was not the action of decoding, handling, and encoding that was the answer, but the handling of the individual packets themselves. Having a process for every packet and using that process to decode, handle, and encode the packet meant that if thousands of packets were received simultaneously, they would all be processed in parallel. Knowing that a socket can receive only one packet at any one time meant that we could use this socket process to handle the call. Once the packet was received, a function call ensured that it was decoded and handled. The result (possibly an error) was encoded and sent to the socket process managing the connection belonging to the final recipient. The error handler and database processes were not needed, as the consistency of data through the serialization of destructive database operations could have been achieved in the handler process, as could the management of errors.

If you look at Figure 4-14, you will notice that on top of the socket processes, a database process was added to the rewritten program. This was to ensure that data consistency was maintained, as many processes accessing the same data might corrupt it as a result of a race condition. All destructive database operations such as write and delete were serialized through this process. Even if you can execute most of your activities in parallel, it is essential to identify activities that need serializing and place them in a process of their own. By taking care in identifying truly concurrent activites in your Erlang system, and spawning a process for each, you will ensure that you maximize the throughput while reducing the risk of bottlenecks.

A process for each concurrent activity
Figure 4-14. A process for each concurrent activity

Race Conditions, Deadlocks, and Process Starvation

Anyone who has programmed concurrent applications before moving to Erlang will have his favorite horror stories on memory corruption, deadlocks, race conditions, and process starvation. Some of these conditions arise as a result of shared memory and the need for semaphores to access them. Others are as a result of priorities. Having a “no shared data” approach, where the only way for processes to share data is by copying data from one process to another, removes the need for locks, and as a result, the vast majority of bugs related to memory corruption deadlocks and race conditions.

Problems in concurrent programs may also arise as a result of synchronous message passing, especially if the communication is across a network. Erlang solves this through asynchronous message passing. And finally, the scheduler, the per-process garbage collection mechanisms, and the massive level of concurrency that can be supported in Erlang systems ensure that all processes get a relatively fair time slice when executing. In most systems, you can expect a majority of the processes to be suspended in a receive statement, waiting for an event to trigger a chain of actions.

That being said, Erlang is not completely problem-free. You can avoid these problems, however, through careful and well-thought-out design. Let’s start with race conditions. If two processes are executing the following code in parallel, what can go wrong?

start() ->
   case whereis(db_server) of
     undefined ->
        Pid = spawn(db_server, init, []),
        register(db_server, Pid),
        {ok, Pid};
     Pid when is_pid(Pid) ->
        {error, already_started}
end.

Assume that the database server process has not been started and two processes simultaneously start executing the start/0 function. The first process calls whereis(db_server), which returns the atom undefined. This pattern-matches the first clause, and as a result, a new database server is spawned. Its process identifier is bound to the variable Pid. If, after spawning the database server, the process runs out of reductions and is preempted, this will allow the second process to start executing. The call whereis(db_server) by the second process also returns undefined, as the first process had not gotten as far as registering it. The second process spawns the database server and might go a little further than the first one, registering it with the name db_server. At this stage, the second process is preempted and the first process continues where it left off. It tries to register the database server it created with the name db_server but fails with a runtime error, as there already is a process with that name. What we would have expected is the tuple {error, already_started} to be returned, instead of a runtime error. Race conditions such as this one in Erlang are rare, but they do happen when you might least expect them. A variant of the preceding example was taken from one of the early Erlang libraries and reported as a bug in 1996.

A second potential problem to keep in mind involves deadlocks. A good design of a system based on client/server principles is often enough to guarantee that your application will be deadlock-free. The only rule you have to follow is that if process A sends a message and waits for a response from process B, in effect doing a synchronous call, process B is not allowed, anywhere in its code, to do a synchronous call to process A, as the messages might cross and cause the deadlock. Deadlocks are extremely rare in Erlang as a direct result of the way in which programs are structured. In those rare occasions where they slip through the design phase, they are caught at a very early stage of testing.[15]

By calling the BIF process_flag(priority, Priority), where Priority can be set to the atom high, normal, or low, the behavior of the scheduler can be changed, giving processes with higher priority a precedence when being dispatched. Not only should you use this feature sparingly; in fact, you should not use it at all! As large parts of the Erlang runtime system are written in Erlang running at a normal priority, you will end up with deadlocks, starvation, and in extreme cases, a scheduler that gives low-priority processes more CPU time than its high-priority counterparts. With SMP, this behavior becomes even more non-deterministic. Endless flame wars and arguments regarding process and priorities have been fought on the Erlang-questions mailing list, deserving a whole chapter on the subject. We will limit ourselves to saying that under no circumstances should you use process priorities. A proper design of your concurrency model will ensure that your system is well balanced and deterministic, with no process starvation, deadlocks, or race conditions. You have been warned!

The Process Manager

The process manager is a debugging tool used to inspect the state of processes in Erlang systems. Whereas the debugger concentrates on tracing the sequential aspects of your program, the process manager deals with the concurrent ones. You can start the process manager by writing pman:start() in the shell. A window will open (see Figure 4-15), displaying contents similar to what you saw when experimenting with the i() command. Double-clicking any of the processes will open a trace output window. You can choose your settings by picking options in the File menu.

For each process with an output window, you can trace all the messages that are sent and received. You can trace BIF and function calls, as well as concurrency-related events, such as processes being spawned or terminated. Your can also redirect your trace output from the window to a file. Finally, you can pick the inheritance level of your trace events. A very detailed and well-written user guide comes with the Erlang distribution that we recommend as further reading.

The process manager window
Figure 4-15. The process manager window

Warning

At the time of writing, because of its underlying TCL/TK graphics libraries that are no longer supported, the process manager can be unstable when running on Microsoft Windows operating systems.

This chapter introduced the basics of concurrency in Erlang, which is based on message passing between concurrent processes, rather than on shared memory. Message passing is asynchronous, and the selective receive facility, under which messages can be handled independently of the order in which they are received, allows modular and concise concurrent programs to be written. In the next chapter, we’ll build on this introduction and look at design patterns for process-based systems.

Exercises

Exercise 4-1: An Echo Server

Write the server in Figure 4-16 that will wait in a receive loop until a message is sent to it. Depending on the message, it should either print its contents and loop again, or terminate. You want to hide the fact that you are dealing with a process, and access its services through a functional interface, which you can call from the shell.

An echo server
Figure 4-16. An echo server

This functional interface, exported in the echo.erl module, will spawn the process and send messages to it. The function interfaces are shown here:

echo:start() ⇒ ok
echo:print(Term) ⇒ ok
echo:stop() ⇒ ok

Hint: use the register/2 built-in function, and test your echo server using the process manager.

Warning: use an internal message protocol to avoid stopping the process when you, for example, call the function echo:print(stop).

Exercise 4-2: The Process Ring

Write a program that will create N processes connected in a ring, as shown in Figure 4-17. Once started, these processes will send M number of messages around the ring and then terminate gracefully when they receive a quit message. You can start the ring with the call ring:start(M, N, Message).

The process ring
Figure 4-17. The process ring

There are two basic strategies to tackling this exercise. The first one is to have a central process that sets up the ring and initiates sending the message. The second strategy consists of the new process spawning the next process in the ring. With this strategy, you have to find a method to connect the first process to the second process.

Regardless of the strategy you choose, make sure you have solved this exercise with pen and paper before you start coding. It differs from the ones you have solved before because you will have many processes executing the same function in the same module at the same time. Furthermore, processes will be using this function to interact with each other. When writing your program, make sure your code has many io:format statements in every loop iteration. This will give you a complete overview of what is happening (or not happening) and should help you solve the exercise.



[13] Partially because the return values of the operations in the shell are cached.

[14] The number of reductions will vary between releases. In the R12 release, the number of reductions starts at 2,000 and is reduced by one for every operation. In R13, the number of initial reductions depends on the number of scheduler threads.

[15] In 15 years of working with Erlang on systems with millions of lines of code, Francesco has come across only one deadlock that made it as far as the integration phase.

Get Erlang Programming now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.