
Analysis of Elasticsearch's Distributed Consistency Principle (3) - Data


This article first appeared in the Yunqi community (Alibaba Cloud): "Elasticsearch distributed consistency principle analysis (3) - Data". It is republished here by the original author.


The "Elasticsearch Distributed Consistency Principle Analysis" series analyzes Elasticsearch's approach to distributed consistency in detail, describing its implementation, principles, and open problems (based on version 6.2). The first two articles covered how ES clusters are formed, the master election algorithm, how the master updates the cluster meta, and the consistency issues in election and meta updates. This article analyzes the data flow in ES, including the write process, the PacificA algorithm model, SequenceNumber and Checkpoint, and compares the ES implementation with the standard PacificA algorithm. The outline is as follows:

  1. Problem background
  2. Data writing process
  3. PacificA algorithm
  4. SequenceNumber, Checkpoint and fault recovery
  5. Comparison of ES and PacificA
  6. Summary

Problem background

Readers who have used ES know that each index is divided into multiple shards, and the shards are distributed across different nodes to provide distributed storage and query over large data sets. Each shard has multiple copies, one of which is the Primary and the others Replicas. Writes go to the Primary, which then synchronizes the data to the Replicas. To improve read throughput, both the Primary and the Replicas accept read requests.

Under this model, ES has the following features:

  1. High data reliability: the data has multiple copies.
  2. High service availability: if the Primary fails, a new Primary can be chosen from the Replicas.
  3. Read scalability: both the Primary and the Replicas can serve read requests.
  4. Failure recovery: when a Primary or Replica fails and the number of copies becomes insufficient, a new copy can be created by copying data from the new Primary.

In addition, we can also think of some issues, such as:

  1. How is data replicated from the Primary to the Replicas?
  2. Does a write require all copies to succeed?
  3. If the Primary fails, is data lost?
  4. When reading from a Replica, is the latest data always visible?
  5. During failure recovery, does all the data in the shard need to be copied?

As these questions show, even though the general principle of data consistency in ES is easy to grasp, many details remain unclear. This article explains how ES works in terms of the write process, the consistency algorithm, and the SequenceNumber and Checkpoint design, and then answers these questions. Note that the analysis is based on ES 6.2; much of the content does not apply to earlier versions, such as 2.x.

Data writing process

First, let's look at how a write flows through the system.

Replication perspective: Primary -> Replica

At a high level, the ES write process writes to the Primary first, then writes to the Replicas concurrently, and finally responds to the client. The flow is as follows:

  • Check the number of active shards.
final String activeShardCountFailure = checkActiveShardCount();
  • Write to the Primary.
primaryResult = primary.perform(request);
  • Concurrently send write requests to all Replicas.
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
  • After all Replicas return or fail, respond to the client.
private void decPendingAndFinishIfNeeded() {
     assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
     if (pendingActions.decrementAndGet() == 0) {
         // ... finish and respond to the client
     }
}

The above process is implemented in the execute function of the ReplicationOperation class. The code (abridged) is as follows:

public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primaryResult = primary.perform(request);
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }

            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
}

Below we analyze several issues for this process:

1. Why check the number of active shards in the first step?

ES has a parameter called wait_for_active_shards. It is an index-level setting and can also be attached to a request. The parameter specifies the minimum number of active copies a shard must have before each write. Suppose an index has 3 Replicas per shard, 4 copies in total including the Primary. If wait_for_active_shards is 3, at most one Replica may be down; if two Replicas are down, the number of active copies falls below 3 and writes are rejected.

The parameter defaults to 1, meaning a write proceeds as long as the Primary is available. Setting it higher than 1 offers protection and increases the reliability of written data. However, the parameter is only checked before the write; there is no guarantee that the data is actually written successfully on that many copies, so the number of copies written is not strictly guaranteed. See the official documentation:
...It is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. Once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary. The _shards section of the write operation’s response reveals the number of shard copies on which replication succeeded/failed.
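The pre-write check boils down to comparing the number of active copies against the configured threshold. The sketch below is a hypothetical illustration of that logic (the class and method signature are mine, not the actual ES implementation):

```java
// Hypothetical sketch of the wait_for_active_shards pre-write check.
// Mirrors the contract of checkActiveShardCount(): null means "OK to write",
// otherwise a failure message is returned. Not the actual ES code.
public class ActiveShardCheck {
    /** Returns null when enough copies are active, otherwise a failure message. */
    static String checkActiveShardCount(int activeCopies, int waitForActiveShards) {
        if (activeCopies >= waitForActiveShards) {
            return null; // enough active copies; the write may proceed
        }
        return "not enough active shard copies: have " + activeCopies
                + ", need " + waitForActiveShards;
    }

    public static void main(String[] args) {
        // Index with 1 primary + 3 replicas, wait_for_active_shards = 3:
        // one failed replica (3 active) is tolerated, two failed (2 active) are not.
        System.out.println(checkActiveShardCount(3, 3) == null);
        System.out.println(checkActiveShardCount(2, 3) == null);
    }
}
```

Note that this check is advisory: it runs before the write starts, so replication can still fail on some copies afterwards, as the documentation quoted above explains.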

2. After writing to the Primary, why wait for all Replicas to respond (or the connections to fail) before returning?

In earlier versions of ES, asynchronous replication between the Primary and the Replicas was allowed, i.e. a write returned success once the Primary had written. In this mode, however, if the Primary failed there was a risk of data loss, and reads from Replicas could hardly be guaranteed to see the latest data. ES therefore removed the asynchronous mode; the Primary now waits for the Replicas before returning to the client.

Because the Primary waits for all Replicas before responding to the client, latency is bounded by the slowest Replica, which is indeed a drawback of the current ES architecture. I once mistakenly assumed the Primary only waited until wait_for_active_shards copies were written, but reading the source code shows it waits for all Replicas.
... Once all replicas have successfully performed the operation and responded to the primary, the primary acknowledges the successful completion of the request to the client.

If a Replica write fails, ES performs some retry logic and so on, but ultimately does not require a minimum number of successful copies. The returned result reports how many shard copies succeeded and how many failed:

    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    }

3. If a Replica persistently fails to write, will users keep reading stale data?

The concern is that if a Replica keeps failing writes, its data may fall far behind the Primary. Since Replicas in ES also serve read requests, would users read stale data from this Replica?

The answer: when a Replica write fails, the Primary reports it to the Master, which then updates the index's InSyncAllocations configuration in the Meta and removes the Replica from it; once removed, the Replica no longer serves read requests. Users may still read from this Replica until the Meta update reaches every node, but not afterwards. The solution is thus not strictly consistent; but considering that ES is a near-real-time system anyway (written data only becomes visible after a refresh), briefly reading stale data should generally be acceptable.


            public void onFailure(Exception replicaException) {
                logger.trace(
                    (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(), opType, shard, replicaRequest), replicaException);
                if (TransportActions.isShardNotAvailableException(replicaException)) {
                    decPendingAndFinishIfNeeded();
                } else {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, message,
                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                }
            }


        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                      Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
            logger.warn(
                    () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
                    createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }


    public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
        if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
            removeAllocationId(failedShard); // remove the failed shard from InSyncAllocations

            if (failedShard.primary()) {
                Updates updates = changes(failedShard.shardId());
                if (updates.firstFailedPrimary == null) {
                    // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
                    updates.firstFailedPrimary = failedShard;
                }
            }
        }

        if (failedShard.active() && failedShard.primary()) {
            increasePrimaryTerm(failedShard.shardId()); // bump the primary term
        }
    }

ES's maintenance of InSyncAllocations follows the PacificA algorithm, which is described in detail in the next section.

Primary perspective

From the perspective of the Primary itself, a write request is first written to Lucene and then to the translog.

1. Why write translog?

The translog is similar to a database commitlog or binlog. Once the translog is successfully written and flushed, the data is persisted to disk and its safety is guaranteed; the Lucene segment can be flushed to disk later. Because the translog is written in append-only mode, its write performance is better than random writes.

The translog also records every data change and the order of the changes, so it can be used for data recovery. Recovery covers two cases: after a node restart, segment data that had not been flushed before the restart is recovered from the translog; and during data synchronization between a Primary and a new Replica, the translog lets the Replica gradually catch up with the Primary's data.
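The two roles of the translog, sequential append and ordered replay, can be sketched as follows. This is a minimal in-memory illustration of the idea, not Elasticsearch's translog format; all names are mine:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a translog used for recovery: operations are appended in
// order, and a restarted node replays them to rebuild un-flushed state.
public class TranslogSketch {
    record Op(long seqNo, String doc) {}

    final List<Op> log = new ArrayList<>();

    // Append-only, sequential write: cheaper than Lucene's random-ish writes.
    void append(long seqNo, String doc) { log.add(new Op(seqNo, doc)); }

    /** Replay all operations with seqNo greater than fromSeqNo, e.g. after a restart. */
    List<Op> replayAfter(long fromSeqNo) {
        return log.stream().filter(op -> op.seqNo() > fromSeqNo).toList();
    }
}
```

A restarted node that had flushed segments up to some sequence number would call `replayAfter` with that number to recover only the un-flushed tail.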

2. Why write to Lucene first, and only then to the translog?

Writing to Lucene writes into memory, and after a refresh the data becomes searchable from memory; writing to the translog is sequential and serves persistence and recovery. Normally a distributed system writes the commitlog first for persistence and then applies the change in memory. So why does ES do the opposite? The main reason is probably that Lucene validates the data when it is written, so a write to Lucene may fail. If the translog were written first, ES would have to handle the case where the translog write succeeded but the Lucene write failed; writing to Lucene first avoids this.
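The ordering argument can be made concrete with a small sketch: validation happens in the Lucene write, so putting it first guarantees the translog never contains an operation that Lucene rejected. All names here are hypothetical, not the ES implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "Lucene first, translog second" ordering: if Lucene rejects
// the document, no translog entry is ever written, so recovery never replays
// an operation that cannot be applied.
public class WriteOrdering {
    static final List<String> translog = new ArrayList<>();

    // Stand-in for Lucene's validation: e.g. reject null/empty documents.
    static boolean writeToLucene(String doc) {
        return doc != null && !doc.isEmpty();
    }

    static boolean index(String doc) {
        if (!writeToLucene(doc)) {
            return false;   // validation failed; the translog stays clean
        }
        translog.add(doc);  // persist only after Lucene accepted the document
        return true;
    }
}
```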

PacificA algorithm

PacificA is a distributed consistency algorithm for log-based replication systems proposed by Microsoft Research Asia; the paper was published in 2008 (the PacificA paper). ES officially states that its replication model is based on this algorithm:
Elasticsearch’s data replication model is based on the primary-backup model and is described very well in the PacificA paper of Microsoft Research. That model is based on having a single copy from the replication group that acts as the primary shard. The other copies are called replica shards. The primary serves as the main entry point for all indexing operations. It is in charge of validating them and making sure they are correct. Once an index operation has been accepted by the primary, the primary is also responsible for replicating the operation to the other copies.

Few articles online explain this algorithm, so this section briefly introduces it following the PacificA paper. The algorithm has the following characteristics:

  1. Strong consistency.
  2. Single-Primary data synchronization to multiple Secondaries.
  3. Configuration maintained by a separate consistency component.
  4. Writes remain possible even when only a minority of replicas is available.

Terminology

First, some terms used in the algorithm:

  1. Replica Group: a set of copies of the same data; each copy is a Replica. Exactly one Replica in a group is the Primary; the rest are Secondaries.
  2. Configuration: describes which Replicas a Replica Group contains and which of them is the Primary.
  3. Configuration Version: the version number of the Configuration, incremented by 1 on every change.
  4. Configuration Manager: the global component that manages Configurations and guarantees their consistency. Configuration changes are initiated by a Replica and sent to the Configuration Manager together with a Version; the manager checks that the Version is correct and rejects the change otherwise.
  5. Query & Update: the two kinds of operations on a Replica Group. Query does not change data; Update does.
  6. Serial Number (sn): the order in which Update operations are executed; incremented by 1 for each Update, forming a consecutive sequence.
  7. Prepared List: the sequence of prepared Update operations.
  8. Committed List: the sequence of committed Update operations. Operations in the Committed List must never be lost (unless all replicas fail). On the same Replica, the Committed List is always a prefix of the Prepared List.

Primary Invariant

The PacificA algorithm relies on an error-detection mechanism that satisfies the following invariant:

Primary Invariant: at any time, a Replica considers itself the Primary only if the Configuration maintained by the Configuration Manager also considers it the current Primary. Hence at any time, at most one Replica considers itself the Primary of the Replica Group.

The Primary Invariant guarantees that when a node considers itself the Primary, it really is the current Primary. If the invariant did not hold, a Query could be sent to an old Primary and read stale data.

How is the Primary Invariant enforced? The method proposed in the paper is a Lease mechanism, a common technique in distributed systems. The Primary periodically obtains a Lease; while holding one, it considers itself the Primary for a bounded period, and once it fails to obtain a new Lease it steps down from the Primary role. As long as machine clocks do not drift too much, the Lease mechanism is effective.

In the paper, the Lease is implemented by having the Primary periodically send heartbeats to all Secondaries to obtain its Lease, rather than having all nodes obtain Leases from a centralized component. This spreads the load, and there is no centralized component whose failure would make every node lose its Lease.
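The lease logic can be sketched as a simple clock comparison. This is an illustration of the mechanism described in the paper, with invented names and an injected clock for testability; a real implementation must also account for clock drift and message delays:

```java
// Sketch of a heartbeat-based lease: the Primary refreshes its lease whenever
// all Secondaries acknowledge a heartbeat; if no refresh succeeds within the
// lease period, it must stop acting as Primary.
public class LeaseSketch {
    static final long LEASE_MILLIS = 10_000;

    long lastAckFromAllSecondaries;

    LeaseSketch(long now) { this.lastAckFromAllSecondaries = now; }

    void onHeartbeatAcked(long now) { lastAckFromAllSecondaries = now; }

    /** The node may act as Primary only while its lease is unexpired. */
    boolean isPrimary(long now) {
        return now - lastAckFromAllSecondaries < LEASE_MILLIS;
    }
}
```

Symmetrically, a Secondary that has not heard a heartbeat for longer than the lease period may conclude the Primary is gone and initiate a reconfiguration, as described below.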


Query

The Query flow is simple: Queries can only be sent to the Primary, which returns values based on the latest committed data. Since the algorithm guarantees the Primary Invariant, a Query always reads the latest committed data.


Update

The Update flow is as follows:

  1. The Primary assigns a Serial Number (sn) to the UpdateRequest.
  2. The Primary adds the UpdateRequest to its own Prepared List and sends Prepare requests to all Secondaries, asking them to add the UpdateRequest to their Prepared Lists.
  3. Once all Replicas have prepared, i.e. every Replica's Prepared List contains the request, the Primary commits it by putting the UpdateRequest into the Committed List and applying the update. Since on any Replica the Committed List is always a prefix of the Prepared List, committing in fact means the Primary advances its Committed Point past the UpdateRequest.
  4. The client is answered: the Update succeeded.

The next time the Primary sends any request to a Secondary, it piggybacks its current Committed Point, and the Secondary advances its own Committed Point accordingly.
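The four steps above can be sketched as follows. Replicas are modelled as in-process lists and every Prepare succeeds immediately; a real system sends RPCs and handles failures. All names are mine, not the paper's notation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the PacificA update path: the Primary assigns a serial number,
// prepares the operation on every replica, and only then advances its
// committed point.
public class PacificAUpdate {
    final List<List<Long>> preparedLists = new ArrayList<>(); // index 0 = primary
    long nextSn = 0;
    long committedPoint = -1; // highest committed sn; -1 = nothing committed

    PacificAUpdate(int replicas) {
        for (int i = 0; i < replicas; i++) preparedLists.add(new ArrayList<>());
    }

    long update() {
        long sn = nextSn++;                            // step 1: assign sn
        for (List<Long> l : preparedLists) l.add(sn);  // step 2: prepare on every replica
        committedPoint = sn;                           // step 3: all prepared, so commit
        return sn;                                     // step 4: acknowledge the client
    }
}
```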

From the Update flow we can derive the following invariant:

Committed Invariant

Denote a Secondary's Committed List as SecondaryCommittedList, its Prepared List as SecondaryPreparedList, and the Primary's Committed List as PrimaryCommittedList.

Committed Invariant: the SecondaryCommittedList is a prefix of the PrimaryCommittedList, and the PrimaryCommittedList is a prefix of the SecondaryPreparedList.
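The invariant is just two prefix relations, which makes it easy to state as a checkable predicate. A minimal sketch, with sequence numbers as longs and names mirroring the paragraph above:

```java
import java.util.List;

// The Committed Invariant as two prefix checks over operation sequences.
public class CommittedInvariant {
    static boolean isPrefix(List<Long> prefix, List<Long> list) {
        return prefix.size() <= list.size()
                && prefix.equals(list.subList(0, prefix.size()));
    }

    static boolean holds(List<Long> secondaryCommitted,
                         List<Long> primaryCommitted,
                         List<Long> secondaryPrepared) {
        return isPrefix(secondaryCommitted, primaryCommitted)
                && isPrefix(primaryCommitted, secondaryPrepared);
    }
}
```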

Reconfiguration: Secondary failure, Primary failure, adding a node

1. Secondary failure

When a Secondary fails, the Primary sends a Reconfiguration to the Configuration Manager to remove the failed node from the Replica Group. Once removed, the Replica no longer belongs to the group and receives no requests.

Suppose a network partition separates the Primary from a Secondary, but both can still reach the Configuration Manager. The Primary detects that the Secondary is unresponsive, and the Secondary detects the same about the Primary. Both then try to initiate a Reconfiguration to remove the other from the Replica Group. The strategy here is First Win: whoever reaches the Configuration Manager first stays in the Replica Group, while the other is removed and can no longer update the Configuration. Since the Primary requests Leases from the Secondaries, a Secondary will not initiate a Reconfiguration while a Lease is valid, and the Primary's failure-detection interval must be shorter than the Lease period; so in this scenario I believe the Primary should always win and remove the Secondary first.
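The First Win rule falls out of the version check the Configuration Manager performs. A hypothetical sketch (invented names; a real Configuration Manager is itself a replicated, consistent service):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of First Win: a reconfiguration is accepted only if the proposer's
// Configuration Version matches the current one; the winner bumps the version,
// so the loser's proposal (based on the now-stale version) is rejected.
public class ConfigManagerSketch {
    long version = 1;
    final Set<String> replicaGroup = new HashSet<>(Set.of("P", "S"));

    /** Returns true if the reconfiguration was accepted. */
    synchronized boolean remove(String replica, long proposerVersion) {
        if (proposerVersion != version) {
            return false;           // stale proposer loses
        }
        replicaGroup.remove(replica);
        version++;                  // winner advances the Configuration Version
        return true;
    }
}
```

If the Primary "P" and Secondary "S" race to remove each other at version 1, exactly one call succeeds; the second arrives carrying version 1 against a current version of 2 and is refused.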

2. Primary failure

When a Primary fails, the Secondaries stop receiving its heartbeats. Once the Lease period expires, a Secondary initiates a Reconfiguration to remove the Primary, again on the First Win principle: whichever Secondary succeeds first becomes the new Primary.

Before a Secondary that has become Primary can serve requests, it must go through a phase called Reconciliation. By the Committed Invariant above, the old Primary's Committed List must be a prefix of the new Primary's Prepared List. During Reconciliation, the new Primary aligns its Prepared List with the other nodes in the current Replica Group, effectively committing every not-yet-committed record in that list once more on all nodes; this necessarily includes all previously committed records, giving the following invariant:

Reconfiguration Invariant: when a new Primary completes Reconciliation at time T, the Committed List of any node (including the original Primary) at any time before T is a prefix of the new Primary's current Committed List.

The Reconfiguration Invariant says that committed data is never lost during Reconfiguration.

3. Adding a node

A newly added node first becomes a Secondary Candidate, at which point the Primary starts sending it Prepare requests while the node catches up on records it has not yet synchronized. Once it has caught up, it applies to become a Secondary, and the Primary sends a Configuration change to the Configuration Manager to add the node to the Replica Group.

In another case, a node that used to belong to the Replica Group was removed because of a temporary failure and now needs to rejoin. At that point, the data in its Committed List is definitely committed, but the data in its Prepared List may not be; so the uncommitted data is discarded and the node requests data from the Primary starting at its Committed Point.

Algorithm summary

PacificA is an algorithm with strong consistency for both reads and writes. It separates data consistency from configuration consistency and uses an additional consistency component (the Configuration Manager) to keep the configuration consistent, so that even when fewer than half of the data replicas remain available, new data can still be written with strong consistency guaranteed.

ES's design refers to the PacificA algorithm. It maintains index Meta through the Master, similar to how the Configuration Manager maintains the Configuration in the paper; the InSyncAllocationIds in its IndexMeta represent the currently available shards, similar to how the paper maintains the Replica Group. The next section introduces SequenceNumber and Checkpoint in ES, which resemble the Serial Number and Committed Point in PacificA; the section after that compares the ES implementation with PacificA.

SequenceNumber, Checkpoint and fault recovery

The previous section described PacificA, the consistency algorithm model behind ES. A key point of the algorithm is that each Update operation carries a corresponding Serial Number indicating execution order. In earlier versions of ES, write operations had no equivalent of a Serial Number, so many things were impossible. In 2015 the ES team began planning to add a SequenceNumber to every write operation and envisioned many application scenarios. For details see the following two links:

Add Sequence Numbers to write operations #10708

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

Below we briefly introduce what SequenceNumber and Checkpoint are, and their application scenarios.

Term and SequenceNumber

Each write operation is assigned two values, Term and SequenceNumber. Term increases by 1 with each Primary change, similar to the Configuration Version in the PacificA paper. SequenceNumber increases by 1 with each operation, similar to the Serial Number in the paper.

Since write requests always go to the Primary, Term and SequenceNumber are allocated by the Primary and attached to the synchronization requests sent to the Replicas.
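The allocation rule can be sketched in a few lines. This is an illustrative model of the (Term, SequenceNumber) pair, not the ES implementation; all names are mine:

```java
// Sketch of per-operation (term, seqNo) assignment on the Primary: seqNo
// increases by 1 per operation; term changes only when a new Primary is
// promoted.
public class SeqNoService {
    long term;
    long nextSeqNo = 0;

    SeqNoService(long initialTerm) { this.term = initialTerm; }

    record OpId(long term, long seqNo) {}

    OpId assign() { return new OpId(term, nextSeqNo++); } // called for each write

    void onPrimaryPromotion() { term++; } // a new Primary implies a higher term
}
```

Ordering two operations then means comparing terms first and sequence numbers second, which is exactly what makes operations from an old Primary distinguishable from those of the new one.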

LocalCheckpoint and GlobalCheckpoint

LocalCheckpoint means that, on this shard copy, all requests with a sequence number below this value have been processed.

GlobalCheckpoint means that all requests with a sequence number below this value have been processed on every Replica. The GlobalCheckpoint is maintained by the Primary: each Replica reports its LocalCheckpoint to the Primary, and the Primary advances the GlobalCheckpoint based on that information.

The GlobalCheckpoint is a globally safe position: every request before it has been correctly processed by every Replica, so it can be used to replay only the missing data after a node recovers from a failure. The GlobalCheckpoint could in principle also drive translog GC, since operations before it no longer need to be retained; however, the translog GC policy in ES is based on size or time, and does not appear to use the GlobalCheckpoint.
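The rule the Primary applies is simply "the global checkpoint is the minimum of the local checkpoints it knows about, including its own". A minimal sketch of that computation (hypothetical names, not the ES code):

```java
import java.util.Map;

// The Primary advances the global checkpoint to the minimum of the local
// checkpoints reported by all in-sync copies (including itself).
public class GlobalCheckpointSketch {
    static long globalCheckpoint(Map<String, Long> localCheckpoints) {
        return localCheckpoints.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(-1); // sequence numbers start at 0; -1 means "nothing processed"
    }
}
```

Taking the minimum is what makes the position "safe": no copy can be behind it, so any copy can serve as the base for incremental recovery from that point.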

Fast failure recovery

When a Replica fails, ES removes it, and if the failure lasts beyond a certain time, ES allocates a new Replica on a new node; in that case the full data set must be synchronized. But if the previously failed Replica comes back, it only needs to catch up on the data written during the failure before being added back, achieving fast failure recovery. Fast recovery has two prerequisites: first, all operations during the failure and their order must be preserved; second, the point from which to start synchronizing must be known. The first condition is met by retaining the translog for a certain period; the second is met by the Checkpoint. Together they enable fast failure recovery; this is the first important application scenario of SequenceNumber and Checkpoint.
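The recovery decision described above can be sketched as follows. This is a simplified, hypothetical model: it assumes the returning Replica is consistent up to the global checkpoint and that the Primary retains its translog back to some oldest sequence number:

```java
// Sketch of the fast-recovery decision: if the retained translog still covers
// everything after the returning replica's safe point (the global checkpoint),
// replay only that tail; otherwise fall back to a full copy.
public class RecoverySketch {
    /** Number of operations to replay, or -1 when a full copy is required. */
    static long opsToReplay(long globalCheckpoint, long maxSeqNoOnPrimary,
                            long oldestRetainedSeqNo) {
        if (globalCheckpoint + 1 < oldestRetainedSeqNo) {
            return -1; // the translog no longer covers the gap: full recovery
        }
        return maxSeqNoOnPrimary - globalCheckpoint; // incremental replay
    }
}
```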

Comparison of ES and PacificA

Similarities

  1. Meta consistency and data consistency are handled separately: PacificA maintains Configuration consistency through the Configuration Manager; ES maintains Meta consistency through the Master.
  2. An in-sync replica set is maintained: the Replica Group in PacificA, the InSyncAllocationIds in ES.
  3. SequenceNumber: in both PacificA and ES, write operations carry a SequenceNumber that records the order of operations.


Differences

The differences mainly come from the fact that although ES follows PacificA, it still falls short of the algorithm's requirements in several places and therefore cannot guarantee strict consistency. The main points:

  1. Meta consistency: the previous article analyzed the meta consistency problems in ES. Since ES cannot fully guarantee Meta consistency, it inevitably cannot strictly guarantee data consistency either.
  2. Prepare phase: PacificA has a Prepare phase that allows data to be committed only after it has been prepared on all nodes, which ensures committed data is never lost. ES has no such phase; data is written directly.
  3. Read consistency: in ES all InSync Replicas are readable, which improves read availability but may return stale data. Moreover, even if only the Primary were readable, ES would still need a Lease mechanism to avoid reading from an old Primary. Since ES is a near-real-time system, strict read consistency may not be required.


Summary

This article analyzed the consistency of the data flow in ES. ES has made great progress in this area in recent years, but many problems remain. This is the final article in the "Elasticsearch Distributed Consistency Principle Analysis" series, which surveys and analyzes ES step by step: node discovery, master election, Meta consistency, data consistency, and more. Thanks to everyone who read the series; I look forward to your feedback.


References

Index API | Elasticsearch Reference [6.2]

Reading and Writing documents | Elasticsearch Reference [6.2]

PacificA: Replication in Log-Based Distributed Storage Systems

Add Sequence Numbers to write operations #10708

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You