Erlang n-squared gen_server

One of the nice things about Erlang is its amazing performance. However, in certain circumstances its performance is surprisingly poor.

As I pointed out in my previous post, work is done in an Erlang application by Erlang processes and each process has an input queue of messages.

The standard way of using Erlang is with OTP, an acronym that stands for Open Telecom Platform. OTP is a framework and set of support libraries that handles general problems like spawning supervised processes and building whole applications out of them, and it also provides a wide variety of general purpose support modules.

Typically, the processes in an OTP application that do most of the work will implement the gen_server behavior. In general, a gen_server process will wait for a message to arrive in its message queue and then process it before waiting for another. If two or more messages arrive in the process's message queue while it is handling one, the process will immediately begin processing the next one at the front of the queue when it finishes.
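
For anyone who hasn't written one, here's a minimal gen_server callback module (the module name and the trivial counter state are made up for illustration). Each callback handles exactly one message before the gen_server loop goes back to waiting on the message queue.

-module(echo_server).
-behaviour(gen_server).

-export([start_link/0, init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    {ok, 0}.

%% Each callback processes a single message; the gen_server loop then
%% goes back to waiting for the next message in the queue.
handle_call(Request, _From, Count) ->
    {reply, {echo, Request}, Count + 1}.

handle_cast(_Msg, Count) ->
    {noreply, Count + 1}.

handle_info(_Info, Count) ->
    {noreply, Count}.

terminate(_Reason, _Count) ->
    ok.

code_change(_OldVsn, Count, _Extra) ->
    {ok, Count}.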

Pulling the next message from the front of the queue is very fast. More importantly, it’s an O(1) time complexity operation with respect to the number of messages waiting to be processed. It’s done with syntax that looks like this:

receive
    Message -> handle_message(Message)
end

In this example, the first message in the queue will be removed from the queue, bound to the variable Message, and then passed as a parameter to the function handle_message/1.

However, Erlang has a facility that allows messages to be processed out of order. This is done using an Erlang mechanism called selective receive. Selective receive will scan the message queue of a process from the front to the back looking for a message matching a particular pattern. If no match is found, a selective receive will wait for a matching message to be added to the end. Selective receive is done using syntax like this:

receive
    some_message -> got_the_message_i_was_looking_for()
end

In this code snippet, the message queue will be scanned for a message matching the single atom some_message, waiting for it if necessary, and when found, it will be removed from the queue and the function got_the_message_i_was_looking_for/0 will be called.

The problem here is that while pulling a message from the front of the queue has O(1) time complexity, using selective receive has O(N) time complexity with respect to the number of messages waiting in the queue to be processed. This is fine if your gen_server process doesn't handle a lot of messages or doesn't make use of selective receive. However, if your process is a high throughput server process, the use of selective receive can be a big problem.

The most common Erlang facility that makes use of selective receive is calling gen_server:call/2 or gen_server:call/3. These functions make synchronous calls to other gen_server processes. They are implemented by sending a message to the other process (which is normally asynchronous) and then waiting for a particular pattern of response message using selective receive. Regardless of the time complexity of selective receive, it's generally not advisable to wait synchronously for work to be done in a high throughput server process, so this usually isn't an issue.
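
To make that concrete, here's a simplified sketch of how a synchronous call can be built out of an asynchronous send plus a selective receive on a unique reference. This is not the actual gen_server:call implementation (which also uses monitors and handles more edge cases), just an illustration of the pattern:

call(Pid, Request, Timeout) ->
    Ref = erlang:make_ref(),
    Pid ! {call, self(), Ref, Request},
    %% Selective receive: only a reply carrying this particular Ref
    %% matches, so the whole message queue may be scanned to find it.
    receive
        {reply, Ref, Reply} ->
            Reply
    after Timeout ->
        erlang:error(timeout)
    end.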

The real problem is typically more subtle. This is because a subset of OTP library calls are implemented in terms of selective receive. For example, the OTP library function for sending a packet to a UDP port is implemented by sending a message with the payload to an Erlang port followed by a selective receive to get the result. If, for example, your application sends UDP packets to localhost port 514 to log messages to syslog, you might assume that you could do that directly from a high throughput server process. If you were to do that, your application will probably work fine most of the time. However, if your application ever has a spike in workload or a stall in processing that causes your high throughput server process to get a bit behind, it may take a long time to catch up. If an Erlang process were to have N messages in its message queue and the processing of each message required sending a UDP packet then the O(N) nature of selective receive means that processing N messages has O(N^2) time complexity.

If an Erlang process continues to receive messages at a constant rate, it's possible for it to get far enough behind that it takes more time to process a message than the average time between message arrivals. In that case, it will never catch up. Since each message in a process's message queue takes memory, the accumulation of messages will cause the Erlang VM to eventually use all available RAM, followed by increasingly severe swapping.

Here’s a simple demonstration of the problem. The following module definition will send N messages to itself, handle each by sending a UDP packet, and print the length of time it takes to drain its message queue.

-module(time_udp).

-export([start/1]).

%% Send N messages to self(), handle each one by sending a UDP packet,
%% and report how long it takes to drain the message queue.
start(N) ->
    {ok, Socket} = gen_udp:open(0),
    Seq = lists:seq(1, N),
    %% Fill this process's message queue with N messages.
    lists:foreach(fun(_I) -> self() ! message end, Seq),
    Start = erlang:now(),
    %% Each gen_udp:send/4 performs a selective receive internally,
    %% scanning past every message still waiting in the queue.
    lists:foreach(fun(_I) ->
            receive _Message -> gen_udp:send(Socket, {127, 0, 0, 1}, 1234, <<"message">>) end
        end, Seq),
    End = erlang:now(),
    io:format("processed ~p messages in ~p seconds~n", [N, time_diff(Start, End)]).

time_diff({StartMega, StartSecs, StartMicro}, {EndMega, EndSecs, EndMicro}) ->
    ((EndMega - StartMega) * 1000000.0) + (EndSecs - StartSecs) + ((EndMicro - StartMicro) / 1000000.0).

Now calling start/1 with increasing values of N will illustrate the quadratic relationship between N and the time it takes to send N UDP packets. If the relationship were linear, doubling N should roughly double the time. Instead, as the following output shows, doubling N roughly quadruples the time, which is what would be expected if the relationship were quadratic.

$ erl
Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
1> time_udp:start(10000).
processed 10000 messages in 0.343503 seconds
ok
2> time_udp:start(20000).
processed 20000 messages in 1.222438 seconds
ok
3> time_udp:start(40000).
processed 40000 messages in 4.574205 seconds
ok
4> time_udp:start(80000).
processed 80000 messages in 17.498623 seconds
ok
5>

If sending a message to a UDP socket were implemented with an Erlang built-in function rather than a selective receive, this would not be a problem. Without writing your own native Erlang module, there is no way to avoid n-squared time complexity when sending UDP packets while using gen_server.*

So how do you write an Erlang application that doesn’t suffer from this problem and still use OTP? The answer is gen_server2. gen_server2 is actually a fork of the OTP gen_server module but with some significant modifications. The objective of the modifications is to minimize the time spent scanning the message queue when doing a selective receive. It accomplishes this by first draining the message queue into local memory before handling the first message in the queue the same way that gen_server would have done. By doing this, the cost of scanning the message queue when doing a selective receive is limited to any messages that have arrived in the message queue since handling of the current message started.
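
The core idea can be sketched in a few lines. This isn't the actual gen_server2 code, just an illustration of draining the queue into a local list so that any later selective receive only has to scan newly arrived messages:

%% Pull every message currently waiting in the queue into a local list.
%% The 'after 0' clause means we never block; we stop as soon as the
%% queue is empty and return the messages in arrival order.
drain_queue(Acc) ->
    receive
        Msg -> drain_queue([Msg | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.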

While gen_server2 can solve the n-squared problem caused by a large message backlog in the gen_server processes you've written for your application, it will not eliminate the problem entirely for any OTP application. The reason is that the same problem exists in all supervisor processes using the OTP supervisor behavior. For very busy supervisors, the problem can be severe since the protocol for a supervisor starting a new child process involves a selective receive in the supervisor to get the result of the child process's initialization. Additionally, the Erlang VM will automatically start a number of support processes that are implemented using gen_server.

One such system process that is easy to overload is the error_logger process. Generally applications don't block waiting for the error_logger process to do work, so an accumulation of messages in the error_logger process just causes increased memory and CPU utilization until it catches up (assuming it ever does).

You might think that if your application doesn't send UDP packets, it's safe from this problem (other than the supervisors and error_logger) and so you don't need gen_server2. While the only place I've tracked this problem down for certain is the gen_udp implementation, I strongly suspect that the same issue exists within other OTP library calls, though I've not specifically identified any. Since gen_server2 behaves identically to gen_server under normal circumstances and strictly better than gen_server under abnormal, but not necessarily unusual, circumstances, I strongly recommend using gen_server2 rather than gen_server in your Erlang applications.

* It is possible to send UDP messages without using the gen_udp module and to handle the result message asynchronously, avoiding the selective receive performed in the gen_udp implementation. However, doing so would break the encapsulation of the result message format within the gen_udp implementation. It's possible, but not necessarily a good idea.

Some surprising Erlang behavior

Erlang is in many ways a fantastic language. Its syntax is a little foreign, at first, to someone like me coming from a background in C and C++. The language and the runtime environment have some really nice qualities, though, and when I began working with it, I quickly acquired an appreciation for it. I won't get too deep into the details of the language and why I like it, but I will start off with a couple of key features relevant to this discussion.

First, work is done in an Erlang application by Erlang processes. Each process is a lightweight thread and has an identifier called a pid. Each process has an input queue of messages. An Erlang process generally operates by receiving a message from its message queue, doing some work to process the message, and then waiting for another message to arrive in its queue. Some of that work might involve generating and sending messages to other processes using their pids. Under normal circumstances, sending a message from one process to another is asynchronous, meaning the send operation does not block waiting for the receiving process to handle the message or even necessarily for it to be added to the receiving process's queue.
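
If you haven't seen Erlang before, this toy module (the module name is made up for this example) shows the basic pattern: spawn a process, send it a message with the ! operator, and have it receive and reply:

-module(hello_proc).
-export([start/0]).

start() ->
    %% Spawn a process that waits for a single message and replies.
    Pid = spawn(fun() ->
        receive
            {greet, From} -> From ! {hello, self()}
        end
    end),
    Pid ! {greet, self()},      %% the send itself does not block
    receive
        {hello, Pid} -> ok
    end.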

Second, an Erlang node is an instance of the Erlang interpreter, an OS-level process. Erlang nodes can be easily joined together so that a single Erlang application can span many physical hosts. Once a node has joined an Erlang cluster, it becomes trivial for processes in remote hosts to communicate with one another. Sending a message to a remote process is done with the pid of the remote process, just like a local process. This allows for Location Transparency, one of the nice features of Erlang.
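
As a concrete illustration (with made-up node names), once two nodes are connected, sending to a process on the remote node looks just like sending to a local one:

%% From the Erlang shell on node 'a@host1', assuming a process has been
%% registered as 'sink' on node 'b@host2' (names are hypothetical):
net_kernel:connect_node('b@host2'),          %% join the cluster
{sink, 'b@host2'} ! ping,                    %% send to a registered name on the remote node
RemotePid = rpc:call('b@host2', erlang, whereis, [sink]),
RemotePid ! ping.                            %% or send to the remote pid directly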

Erlang has gotten a reputation as a very stable platform. Joe Armstrong, one of the original authors of Erlang, has famously claimed that the Ericsson AXD301 switch, developed using Erlang, achieved NINE nines of reliability.

Having heard this, I was extremely surprised to identify a case where an Erlang application performs not just poorly but literally comes to a screeching halt. The problem occurs when one of the nodes in an Erlang cluster suddenly goes network silent. This can happen for a variety of reasons: the node may have kernel panicked, it may have started swapping heavily, or a network cable connecting one of the physical hosts in the cluster may have been unplugged. When this condition occurs, messages sent to processes on the node which has gone dark are buffered up to a limit, but once the buffer fills up, sending messages to processes on that node goes from being asynchronous to being synchronous. Erlang processes not sending messages to the failed node continue to do work as normal, but any process sending a message to the failed node will halt.

The Erlang runtime will monitor the other nodes to which the local node is attached. If a host where one of the nodes is running goes network silent, then after some period of time (defaulting to about 1 minute) the other nodes will decide that the network-silent node is down. The length of time before a non-responsive node is considered down is tunable using an Erlang kernel parameter, net_ticktime. You might think that the processes waiting to send messages to processes on the down node would get unblocked when the target node is considered down, but that (mostly) doesn't happen. It turns out that once the target node is considered down, blocked senders get unblocked only once every 7 seconds, likely tunable using the Erlang kernel parameter net_setuptime.
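
For reference, net_ticktime can be lowered through the kernel application, for example in a sys.config file (the 20 second value below is just an example, not a recommendation):

%% sys.config: detect unreachable nodes sooner than the ~60 second default.
[
 {kernel, [
     {net_ticktime, 20}
 ]}
].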

I’ve come up with a fairly easy way to demonstrate this problem given two Linux VMs. Let’s call them faucet-vm and sink-vm. Both need Erlang installed.

On the sink-vm host, create sink.erl with these contents:

-module(sink).
-export([start/0, entry/0]).

start() ->
    erlang:register(?MODULE, erlang:spawn(?MODULE, entry, [])).

entry() ->
    loop(0).

loop(X) ->
    receive ping ->
        io:format("~p: ping ~p~n", [timeasfloat(), X]),
        loop(X + 1)
    end.

timeasfloat() ->
    {Mega, Sec, Micro} = os:timestamp(),
    ((Mega * 1000000) + Sec + (Micro * 0.000001)).

Now compile and start the sink process:

$ erlc sink.erl
$ erl -sname sink -setcookie monster
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:4:4] [async-threads:0] [kernel-poll:false]

Eshell V5.9.1  (abort with ^G)
(sink@sink-vm)1> sink:start().
true
(sink@sink-vm)2>

On the faucet-vm host, create faucet.erl with these contents:

-module(faucet).
-export([start/0, entry/0]).

start() ->
    erlang:spawn(?MODULE, entry, []).

entry() ->
    Sink = rpc:call('sink@sink-vm', erlang, whereis, [sink]),
    loop(Sink, 0).

loop(Sink, X) ->
    io:format("~p: sending ping ~p~n", [timeasfloat(), X]),
    Sink ! ping,
    loop(Sink, X + 1).

timeasfloat() ->
    {Mega, Sec, Micro} = os:timestamp(),
    ((Mega * 1000000) + Sec + (Micro * 0.000001)).

Now compile and start the faucet process:

$ erlc faucet.erl
$ erl -sname faucet -setcookie monster
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:2:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.9.1  (abort with ^G)
(faucet@faucet-vm)1> faucet:start().

This spews horrific amounts of console output on both faucet-vm and sink-vm consoles. If you don’t see tons of console output on both sides, you’ve probably done something wrong.

In a separate console on sink-vm, use iptables to block all IP traffic between sink-vm and faucet-vm:

# iptables -A INPUT -s faucet-vm -j DROP && iptables -A OUTPUT -d faucet-vm -j DROP

The output of the two processes will halt shortly after the traffic between them is blocked. Which one stops first actually depends on whether the sink process is able to keep up with the faucet process before the traffic is blocked. About 60 seconds after halting, each node should report that the other node is DOWN. The number of messages the faucet claims to have sent will be higher than the number the sink has received. The difference will be the number of messages the faucet has buffered to send to the sink before blocking. After the faucet reports that the sink is DOWN, it will report one sent message every 7 seconds.

Here’s the tail of the output of the sink process when I tried it:

1440398109.640845: ping 45404
1440398109.640864: ping 45405
1440398109.640881: ping 45406
1440398109.640899: ping 45407
1440398109.640917: ping 45408
1440398109.640935: ping 45409
1440398109.640953: ping 45410
1440398109.640971: ping 45411
1440398109.640988: ping 45412
1440398109.641006: ping 45413
1440398109.641022: ping 45414
1440398109.64104: ping 45415
1440398109.641057: ping 45416
(sink@sink-vm)2>
=ERROR REPORT==== 23-Aug-2015::23:36:03 ===
** Node faucet@faucet-vm not responding **
** Removing (timedout) connection **

And here’s the tail of the output of the faucet process when I tried it:

1440398099.86504: sending ping 63339
1440398099.865061: sending ping 63340
1440398099.865107: sending ping 63341
1440398099.865159: sending ping 63342
1440398099.865199: sending ping 63343
1440398099.865224: sending ping 63344
1440398099.865246: sending ping 63345
1440398099.865293: sending ping 63346
1440398099.865328: sending ping 63347
1440398099.865364: sending ping 63348
1440398157.560028: sending ping 63349
(faucet@faucet-vm)2>
=ERROR REPORT==== 23-Aug-2015::23:35:57 ===
** Node 'sink@sink-vm' not responding **
** Removing (timedout) connection **
1440398164.566233: sending ping 63350
1440398171.574962: sending ping 63351
1440398178.582127: sending ping 63352
(faucet@faucet-vm)2>

The implications of this behavior are rather severe. If you have an Erlang application running on a cluster of N nodes, you can’t assume that if 1 of the nodes goes network silent that you will only lose 1/N total capacity of the cluster. If there are critical processes attempting to communicate with processes on the failed node then they will eventually be unable to do any work at all, including work that is unrelated to the failed node.

I haven’t yet tried this, but it is likely possible to solve this problem by routing all messages to remote processes through a proxy process. Each remote node can have a registered local proxy and the sender would find the appropriate proxy depending on the node of the remote process using erlang:node/1. Each node proxy could be monitored by a separate process that detects when the proxy has become unresponsive due to the remote node going down. When hung, the proxy can either be restarted to keep its message queue from growing too large or it can just be killed until the remote node becomes available again. At the very least, Location Transparency is lost.
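
Here's a rough, untested sketch of that idea (the module and registered names are made up, and the watchdog that kills or restarts a hung proxy is omitted):

-module(node_proxy).
-export([start/1, send/2]).

%% One locally registered proxy per remote node.
proxy_name(Node) ->
    list_to_atom("node_proxy_" ++ atom_to_list(Node)).

start(RemoteNode) ->
    erlang:register(proxy_name(RemoteNode), spawn(fun loop/0)).

%% Callers use this instead of sending to the remote pid directly, so
%% only the proxy can get stuck if the remote node goes silent.
send(RemotePid, Msg) ->
    proxy_name(erlang:node(RemotePid)) ! {forward, RemotePid, Msg},
    ok.

loop() ->
    receive
        {forward, RemotePid, Msg} ->
            RemotePid ! Msg,    %% this is the send that may block
            loop()
    end.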

An astute reader might notice that my use of iptables to simulate a network silent node is imperfect. Specifically, iptables will not block ARP traffic. It’s conceivable that ARP traffic might interfere with the experiment. It turns out it does not. I’ve specifically tested this by also blocking ARP traffic using arptables and obtained identical results. I left ARP and arptables out of the example for simplicity.

Additionally, I’ve carefully experimented with various iptables rules to simulate nodes becoming unavailable in other ways with similar results. Specifically, using a REJECT rule for the INPUT chain rather than DROP results in an ICMP destination port unreachable packet being returned to the faucet-vm host but the faucet process will still block. Using REJECT --reject-with icmp-host-unreachable will generate ICMP destination host unreachable packets returned to the faucet-vm host but this will also leave the faucet process blocked.

It turns out the only way to get the sending process (mostly) unblocked is if the sending node receives a TCP RST packet from the target host. I tested this by using REJECT --reject-with tcp-reset and allowing outbound RST packets. This effectively simulates a node whose host is still running but where Erlang is not. The reason this only mostly unblocks the sending process is that the rate of sending is still significantly lower than if the target node is up. I measured the rate of pings “sent” by the faucet under these conditions to be roughly 1/10th the rate when the sink node is up and traffic is not blocked.

One might think this problem is just a bug and, if it were, that it might be limited to a particular version. After all, the example output above is from R15B01. If this behavior is a bug, it's been around for a while and hasn't been fixed yet. I've reproduced it with at least three different versions; the earliest was R15B01 and the newest is the current version, OTP 18.0.

The conclusion to be drawn is that when designing a distributed application using Erlang, you have to be aware of this behavior and either work around it one way or another or else accept that your application is unlikely to be highly available.

Edit:

After spending more time investigating this problem and work-arounds, I can now say for sure that the solution I suggested above can be made to work. However, it seems that there is a simpler solution. Instead of using the ! operator or erlang:send/2, it is possible to completely mitigate this problem by using erlang:send/3, passing [nosuspend, noconnect] as the options parameter.

If only nosuspend is used, the problem is mitigated up until the point where the remote node is identified as DOWN. Once that happens, it's back to one message per 7 seconds until communication with the remote host is re-established. This suggests that the 7 second delay is the timeout waiting to connect to the remote node once it has been removed from the node list. Also using noconnect avoids blocking for 7 seconds on each send after the remote node has been determined to be down, which solves the problem in question but potentially causes others. Using noconnect means that you have to have some other mechanism for re-establishing communication with remote nodes once they become available again. This isn't necessarily challenging, but it needs to be considered when designing your application.
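
For example, where the faucet above does Sink ! ping, it could instead do something like the following (how to handle the two failure returns is up to the application; returning an atom here is just for illustration):

case erlang:send(Sink, ping, [nosuspend, noconnect]) of
    ok        -> sent;          %% message handed off as usual
    nosuspend -> dropped;       %% sending would have suspended this process
    noconnect -> dropped        %% no connection to the remote node right now
end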