Saturday, April 27, 2024

Replicated Log


Cluster nodes preserve a Write-Forward Log. Every log entry shops
the state required for consensus together with the consumer request.
They coordinate to construct consensus over log entries,
so that each one cluster nodes have precisely the identical Write-Forward log.
The requests are then executed sequentially following the log.
As a result of all cluster nodes agree on every log entry, they execute the identical
requests in the identical order. This ensures that each one the cluster nodes
share the identical state.

Executing two phases for every state change request is just not environment friendly.
So cluster nodes choose a frontrunner at startup.
The chief election section establishes the Era Clock
and detects all log entries within the earlier Quorum.
(The entries the earlier chief may need copied to the vast majority of
the cluster nodes.)
As soon as there’s a steady chief, solely the chief co-ordinates the replication.
Shoppers talk with the chief.
The chief provides every request to the log and makes certain that it is replicated
on all of the followers. Consensus is reached as soon as a log entry is efficiently
replicated to the vast majority of the followers.
This fashion, just one section execution to
attain consensus is required for every state change operation when there’s a
steady chief.

Following sections describe how Raft implements a replicated log.

Replicating shopper requests

Determine 1: Replication

For every log entry, the chief appends it to its native Write-Forward log
after which sends it to all of the followers.

chief (class ReplicatedLog…)

  non-public Lengthy appendAndReplicate(byte[] knowledge) {
      Lengthy lastLogEntryIndex = appendToLocalLog(knowledge);
      replicateOnFollowers(lastLogEntryIndex);
      return lastLogEntryIndex;
  }


  non-public void replicateOnFollowers(Lengthy entryAtIndex) {
      for (closing FollowerHandler follower : followers) {
          replicateOn(follower, entryAtIndex); //ship replication requests to followers
      }
  }

The followers deal with the replication request and append the log entries to their native logs.
After efficiently appending the log entries, they reply to the chief with the index of the
newest log entry they’ve.
The response additionally contains the present Era Clock of the server.

The followers additionally verify if the entries exist already or there are entries past
those that are being replicated.
It ignores entries that are already current. But when there are entries that are from totally different generations,
they take away the conflicting entries.

follower (class ReplicatedLog…)

  void maybeTruncate(ReplicationRequest replicationRequest) {
      replicationRequest.getEntries().stream()
              .filter(entry -> wal.getLastLogIndex() >= entry.getEntryIndex() &&
                      entry.getGeneration() != wal.readAt(entry.getEntryIndex()).getGeneration())
              .forEach(entry -> wal.truncate(entry.getEntryIndex()));
  }

follower (class ReplicatedLog…)

  non-public ReplicationResponse appendEntries(ReplicationRequest replicationRequest) {
      Record<WALEntry> entries = replicationRequest.getEntries();
      entries.stream()
              .filter(e -> !wal.exists(e))
              .forEach(e -> wal.writeEntry(e));
      return new ReplicationResponse(SUCCEEDED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }

The follower rejects the replication request when the technology quantity within the request
is decrease than the most recent technology the server is aware of about.
This notifies the chief to step down and develop into a follower.

follower (class ReplicatedLog…)

  Lengthy currentGeneration = replicationState.getGeneration();
  if (currentGeneration > request.getGeneration()) {
      return new ReplicationResponse(FAILED, serverId(), currentGeneration, wal.getLastLogIndex());
  }

The Chief retains monitor of log indexes replicated at every server, when responses are obtained.
It makes use of it to trace the log entries that are efficiently copied to the Quorum
and tracks the index as a commitIndex. commitIndex is the Excessive-Water Mark within the log.

chief (class ReplicatedLog…)

  logger.data("Updating matchIndex for " + response.getServerId() + " to " + response.getReplicatedLogIndex());
  updateMatchingLogIndex(response.getServerId(), response.getReplicatedLogIndex());
  var logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
  var currentHighWaterMark = replicationState.getHighWaterMark();
  if (logIndexAtQuorum > currentHighWaterMark && logIndexAtQuorum != 0) {
      applyLogEntries(currentHighWaterMark, logIndexAtQuorum);
      replicationState.setHighWaterMark(logIndexAtQuorum);
  }

chief (class ReplicatedLog…)

  Lengthy computeHighwaterMark(Record<Lengthy> serverLogIndexes, int noOfServers) {
      serverLogIndexes.kind(Lengthy::compareTo);
      return serverLogIndexes.get(noOfServers / 2);
  }

chief (class ReplicatedLog…)

  non-public void updateMatchingLogIndex(int serverId, lengthy replicatedLogIndex) {
      FollowerHandler follower = getFollowerHandler(serverId);
      follower.updateLastReplicationIndex(replicatedLogIndex);
  }

chief (class ReplicatedLog…)

  public void updateLastReplicationIndex(lengthy lastReplicatedLogIndex) {
      this.matchIndex = lastReplicatedLogIndex;
  }

Full replication

You will need to make sure that all of the cluster nodes
obtain all of the log entries from the chief, even when
they’re disconnected or they crash and are available again up.
Raft has a mechanism to ensure all of the cluster nodes obtain
all of the log entries from the chief.

With each replication request in Raft, the chief additionally sends the log
index and technology of the log entries which instantly precede
the brand new entries getting replicated. If the earlier log index and
time period don’t match with its native log, the followers reject the request.
This means to the chief that the follower log must be synced
for among the older entries.

follower (class ReplicatedLog…)

  if (!wal.isEmpty() && request.getPrevLogIndex() >= wal.getLogStartIndex() &&
           generationAt(request.getPrevLogIndex()) != request.getPrevLogGeneration()) {
      return new ReplicationResponse(FAILED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }

follower (class ReplicatedLog…)

  non-public Lengthy generationAt(lengthy prevLogIndex) {
      WALEntry walEntry = wal.readAt(prevLogIndex);

      return walEntry.getGeneration();
  }

So the chief decrements the matchIndex and tries sending
log entries on the decrease index. This continues till the followers
settle for the replication request.

chief (class ReplicatedLog…)

  //rejected due to conflicting entries, decrement matchIndex
  FollowerHandler peer = getFollowerHandler(response.getServerId());
  logger.data("decrementing nextIndex for peer " + peer.getId() + " from " + peer.getNextIndex());
  peer.decrementNextIndex();
  replicateOn(peer, peer.getNextIndex());

This verify on the earlier log index and technology
permits the chief to detect two issues.

  • If the follower log has lacking entries.
    For instance, if the follower log has just one entry
    and the chief begins replicating the third entry,
    the requests can be rejected till the chief replicates
    the second entry.
  • If the earlier entries within the log are from a special
    technology, larger or decrease than the corresponding entries
    within the chief log. The chief will attempt replicating entries
    from decrease indexes till the requests get accepted.
    The followers truncate the entries for which the technology
    doesn’t match.

This fashion, the chief tries to push its personal log to all of the followers
repeatedly through the use of the earlier index to detect lacking entries
or conflicting entries.
This makes certain that each one the cluster nodes ultimately
obtain all of the log entries from the chief even after they
are disconnected for a while.

Raft doesn’t have a separate commit message, however sends the commitIndex as half
of the conventional replication requests.
The empty replication requests are additionally despatched as heartbeats.
So commitIndex is shipped to followers as a part of the heartbeat requests.

Log entries are executed within the log order

As soon as the chief updates its commitIndex, it executes the log entries so as,
from the final worth of the commitIndex to the most recent worth of the commitIndex.
The shopper requests are accomplished and the response is returned to the shopper
as soon as the log entries are executed.

class ReplicatedLog…

  non-public void applyLogEntries(Lengthy previousCommitIndex, Lengthy commitIndex) {
      for (lengthy index = previousCommitIndex + 1; index <= commitIndex; index++) {
          WALEntry walEntry = wal.readAt(index);
          var responses = stateMachine.applyEntries(Arrays.asList(walEntry));
          completeActiveProposals(index, responses);
      }
  }

The chief additionally sends the commitIndex with the heartbeat requests it sends to the followers.
The followers replace the commitIndex and apply the entries the identical approach.

class ReplicatedLog…

  non-public void updateHighWaterMark(ReplicationRequest request) {
      if (request.getHighWaterMark() > replicationState.getHighWaterMark()) {
          var previousHighWaterMark = replicationState.getHighWaterMark();
          replicationState.setHighWaterMark(request.getHighWaterMark());
          applyLogEntries(previousHighWaterMark, request.getHighWaterMark());
      }
  }

Chief Election

Chief election is the section the place log entries dedicated within the earlier quorum
are detected.
Each cluster node operates in three states: candidate, chief or follower.
The cluster nodes begin in a follower state anticipating
a HeartBeat from an current chief.
If a follower would not hear from any chief in a predetermined time interval
,it strikes to the candidate state and begins leader-election.
The chief election algorithm establishes a brand new Era Clock
worth. Raft refers back to the Era Clock as time period.

The chief election mechanism additionally makes certain the elected chief has as many
up-to-date log entries stipulated by the quorum.
That is an optimization performed by Raft
which avoids log entries from earlier Quorum
being transferred to the brand new chief.

New chief election is began by sending every of the peer servers
a message requesting a vote.

class ReplicatedLog…

  non-public void startLeaderElection() {
      replicationState.setGeneration(replicationState.getGeneration() + 1);
      registerSelfVote();
      requestVoteFrom(followers);
  }

As soon as a server is voted for in a given Era Clock,
the identical vote is returned for that technology all the time.
This ensures that another server requesting a vote for the
similar technology is just not elected, when a profitable election has already
occurred.
The dealing with of the vote request occurs as follows:

class ReplicatedLog…

  VoteResponse handleVoteRequest(VoteRequest voteRequest) {
      //for larger technology request develop into follower.
      // However we have no idea who the chief is but.
      if (voteRequest.getGeneration() > replicationState.getGeneration()) {
          becomeFollower(LEADER_NOT_KNOWN, voteRequest.getGeneration());
      }

      VoteTracker voteTracker = replicationState.getVoteTracker();
      if (voteRequest.getGeneration() == replicationState.getGeneration() && !replicationState.hasLeader()) {
              if(isUptoDate(voteRequest) && !voteTracker.alreadyVoted()) {
                  voteTracker.registerVote(voteRequest.getServerId());
                  return grantVote();
              }
              if (voteTracker.alreadyVoted()) {
                  return voteTracker.votedFor == voteRequest.getServerId() ?
                          grantVote():rejectVote();

              }
      }
      return rejectVote();
  }

  non-public boolean isUptoDate(VoteRequest voteRequest) 

The server which receives votes from the vast majority of the servers
transitions to the chief state. The bulk is set as mentioned
in Quorum. As soon as elected, the chief repeatedly
sends a HeartBeat to all the followers.
If the followers do not obtain a HeartBeat
in a specified time interval,
a brand new chief election is triggered.

Log entries from earlier technology

As mentioned within the above part, the primary section of the consensus
algorithms detects the present values, which had been copied
on the earlier runs of the algorithm. The opposite key facet is that
these values are proposed because the values with the most recent technology
of the chief. The second section decides that the worth is dedicated
provided that the values are proposed for the present technology.
Raft by no means updates technology numbers for the present entries
within the log. So if the chief has log entries from the older technology
that are lacking from among the followers,
it can’t mark these entries as dedicated simply based mostly on
the bulk quorum.
That’s as a result of another server which is probably not out there now,
can have an entry on the similar index with larger technology.
If the chief goes down with out replicating an entry from
its present technology, these entries can get overwritten by the brand new chief.
So in Raft, the brand new chief should commit a minimum of one entry in its time period.
It will possibly then safely commit all of the earlier entries.
Most sensible implementations of Raft attempt to commit a no-op entry
instantly after a frontrunner election, earlier than the chief is taken into account
able to serve shopper requests.
Seek advice from [raft-phd] part 3.6.1 for particulars.

An instance leader-election

Contemplate 5 servers, athens, byzantium, cyrene, delphi and ephesus.
ephesus is the chief for technology 1. It has replicated entries to
itself, delphi and athens.

Determine 2: Misplaced heartbeat triggers an election

At this level, ephesus and delphi get disconnected from the remainder of the cluster.

byzantium has the least election timeout, so it
triggers the election by incrementing its Era Clock to 2.
cyrene has its technology lower than 2 and it additionally has similar log entry as byzantium.
So it grants the vote. However athens has an additional entry in its log. So it rejects the vote.

As a result of byzantium can’t get a majority 3 votes, it loses the election
and strikes again to follower state.

Determine 3: Misplaced election as a result of log is just not up-to-date

athens occasions out and triggers the election subsequent. It increments the Era Clock
to three and sends vote request to byzantium and cyrene.
As a result of each byzantium and cyrene have decrease technology quantity and fewer log entries than
athens, they each grant the vote to athens.
As soon as athens will get majority of the votes, it turns into the chief and begins
sending HeartBeats to byzantium and cyrene.
As soon as byzantium and cyrene obtain a heartbeat from the chief at larger technology,
they increment their technology. This confirms the management of athens.
athens then replicates its personal log to byzantium and cyrene.

Determine 4: Node with up-to-date log wins election

athens now replicates Entry2 from technology 1 to byzantium and cyrene.
However as a result of it is an entry from the earlier technology,
it doesn’t replace the commitIndex even when Entry2 is efficiently replicated
on the bulk quorum.

athens appends a no-op entry to its native log.
After this new entry in technology 3 is efficiently replicated,
it updates the commitIndex

If ephesus comes again up or restores community connectivity and sends
request to cyrene. As a result of cyrene is now at technology 3, it rejects the requests.
ephesus will get the brand new time period within the rejection response, and steps right down to be a follower.

Determine 7: Chief step-down

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

- Advertisment -
Google search engine

Most Popular

Recent Comments