Kafka Durability Setup

Mon 16 December 2019

Eliminating the possibility of data loss as much as possible.

Very Quick Kafka Intro

You can skip this section if you already know Kafka.

Each topic in Kafka is split into multiple partitions. Each partition is replicated more than once (for redundancy). Each of these replicas lives on a different broker.

For reasons outside the scope of this post, Kafka picks one of the replicas to be the leader and calls the rest followers. To write to a topic or to read from it, a leader must be available.

Setting up the stage

For simplicity, let's assume the following:

  • We have a topic with one partition.
  • And a replication factor of 3.

When a producer sends a message to the topic (which has one partition in our case), it first goes to the leader. The followers later ask the leader for the latest messages it received and write their own copies of them. So, we arrive at a state in which we have 3 copies of the same message.
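
As a concrete reference, here is a minimal sketch of creating such a topic with the Kafka Java AdminClient. The topic name my-topic and the broker address localhost:9092 are assumptions made for illustration; the kafka-topics command-line tool would do the same job.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateExampleTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // assumed broker address for this sketch
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // one partition, replication factor 3: three copies of every message
                NewTopic topic = new NewTopic("my-topic", 1, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }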

Given this simple explanation, let's draw a simple happy scenario first:

  1. producer sends a message to the topic.
  2. leader receives the message.
  3. follower_1 asks leader for the message.
  4. follower_2 asks leader for the message.

What happens when the leader dies right after step 2?

That means that none of the followers got the message yet.

What Kafka does when a leader dies is choose another replica (in our case, one of the followers) to be the new leader. By default, Kafka chooses one that is in-sync with the dead leader. In-sync roughly means it has all the messages that exist on the dead leader (not exactly, but more on in-sync later).

Now, in our case, none of the followers is in-sync. So, Kafka won't be able to pick a leader (by default). That means this topic is not available for reads or writes until the dead leader is up again and, obviously, chosen to be the new leader, as it is the only valid candidate. That's not bad in itself; it depends on how available you want Kafka to be. Given this post's scope, we are only interested in the fact that the message is not lost. It's only a matter of time until it becomes available for consumers.

But... that's not quite true!

To understand the case here, we need to learn more about the in-sync replicas.

In-sync Replicas

Followers regularly check with the leader for new messages. Kafka keeps track of how much time has passed since a follower last checked with the leader, and, when it does check, how much time has passed since it last caught up to the latest message/offset. Based on these two metrics, Kafka decides whether that follower is still in-sync or not.

Let's say Kafka is configured to expect a check every second. If a follower misses a check (skips a second), it is removed from the list of in-sync replicas.

Let's say that the follower checks once every second. If in one of those checks it did not receive all the new messages, Kafka starts a timer. If subsequent checks also fail to catch up on all the new messages for a certain amount of time, Kafka removes this follower from the list of in-sync replicas.

Obviously, the leader is always an in-sync replica; the followers may or may not be.

How regularly the checks happen is configurable.

How much time a follower is allowed to lag is also configurable.
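
The lag allowance is the broker setting replica.lag.time.max.ms, which this post comes back to at the end. As a minimal sketch, assuming the Java AdminClient, a broker at localhost:9092 and broker id 0, you could read its current value like this:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class ShowReplicaLagConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // assumed broker address for this sketch
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // broker id "0" is an assumption; use your broker's actual id
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config config = admin.describeConfigs(Collections.singleton(broker))
                                     .all().get().get(broker);
                // how long a follower may go without fetching or catching up
                // before it is dropped from the in-sync replicas
                System.out.println(config.get("replica.lag.time.max.ms").value());
            }
        }
    }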

What happens when the leader dies right after step 2 (second attempt)?

Let's assume that Kafka is configured to expect a check at least every second, that our follower_1 checks with the leader every second, and that our producer produces one message per second. Let's go through the scenario once again, with timing:

  1. follower_1 asks leader for new messages........(sec 0)
  2. producer sends a message to the topic..........(sec 0.1)
  3. leader receives the message....................(sec 0.5)
  4. leader dies....................................(sec 0.8)
  5. follower_1 asks leader for the message.
  6. follower_2 asks leader for the message.

Step 1: follower_1 just finished syncing with the leader at second 0.

Step 4: the leader dies at second 0.8.

Kafka then checks the in-sync replicas list to find an in-sync follower to become the new leader. It finds that follower_1 has been checking regularly and has been reading the latest messages in every check it made (it has no lag). So, follower_1 is going to be the new leader, even though it does not have the latest message, which the producer sent at second 0.1.

We lost a message.

This is not bad in itself, but the fact that the producer thinks the message was written is what makes it bad.

So... now we need to configure Kafka to report success to the producer only when at least one follower has received the message as well.

Producer config acks=all and the topic's min.insync.replicas

Setting acks=all on the producer makes it ask Kafka to acknowledge a message only when all in-sync replicas have received it. Kafka's default for min.insync.replicas is 1. Since the in-sync replicas will always contain the leader, if none of the followers is in-sync, the message will end up only on the leader, and that will be enough to satisfy the condition for acknowledgement: "all in-sync replicas should have the message".
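
Here is a minimal sketch of a producer configured this way, using the Kafka Java client. The topic name my-topic and the broker address localhost:9092 are assumptions made for illustration:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DurableProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // assumed broker address for this sketch
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // wait until all in-sync replicas have the message
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // get() blocks until the broker acknowledges (or rejects) the write
                producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
            }
        }
    }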

Setting min.insync.replicas forces Kafka to require at least this number of in-sync replicas before it acknowledges a message.

So, for our topic, if we set the producer to acks=all and configure the topic with min.insync.replicas=2, then we are sure that "at least one follower got the message".
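
As a sketch of that topic configuration, assuming the my-topic topic from earlier and a Kafka 2.3+ broker at localhost:9092, the Java AdminClient can apply it like this (the kafka-configs command-line tool would work too):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class RequireTwoInSyncReplicas {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // assumed broker address for this sketch
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
                // require at least 2 in-sync replicas (leader + one follower)
                // before an acks=all write is acknowledged
                AlterConfigOp setMinIsr = new AlterConfigOp(
                        new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> updates =
                        Collections.singletonMap(topic, Collections.singletonList(setMinIsr));
                admin.incrementalAlterConfigs(updates).all().get();
            }
        }
    }

With this in place, an acks=all write is rejected (with a NotEnoughReplicasException) whenever fewer than 2 replicas are in sync, instead of being acknowledged while only the leader holds it.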

What have we done?

  • A topic with a replication factor of 3.
  • A topic configured with min.insync.replicas=2.
  • A producer configured with acks=all.

Now, every time our producer sends a message, three conditions need to hold before the message is accepted/acknowledged:

  1. The leader is available.
  2. At least one in-sync follower is available.
  3. That follower received the new message.

This, of course, lowers our chances of writing the message (which initially only required the leader to be available).

This means that, given this setup, we gained stronger Consistency/Durability, but lost some Availability.

This trade-off is inescapable, but it can be tuned using replica.lag.time.max.ms. But that's another topic.