Improve sync performance in Raft

Summary: We should now need to exchange only O(log(n)) messages to get a follower that is n transactions behind back in sync.

Reviewers: msantl, mferencevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2051
This commit is contained in:
Ivan Paljak 2019-05-15 15:44:24 +02:00
parent d4b2d76a35
commit 137e020b22
2 changed files with 18 additions and 1 deletions

View File

@ -78,6 +78,7 @@ void RaftServer::Start() {
// Peer state initialization
auto cluster_size = coordination_->GetAllNodeCount() + 1;
next_index_.resize(cluster_size);
index_offset_.resize(cluster_size);
match_index_.resize(cluster_size);
next_replication_.resize(cluster_size);
next_heartbeat_.resize(cluster_size);
@ -642,6 +643,7 @@ void RaftServer::Transition(const Mode &new_mode) {
// is initialized to leader's last log index + 1"
for (int i = 1; i <= coordination_->GetAllNodeCount(); ++i) {
next_index_[i] = log_size_;
index_offset_[i] = 1;
match_index_[i] = 0;
}
@ -781,7 +783,13 @@ void RaftServer::SendLogEntries(
if (!reply->success) {
// Replication can fail for the first log entry if the peer that we're
// sending the entry to is in the process of shutting down.
next_index_[peer_id] = std::max(next_index_[peer_id] - 1, 1UL);
if (next_index_[peer_id] > index_offset_[peer_id]) {
next_index_[peer_id] -= index_offset_[peer_id];
// Overflow should be prevented by the snapshot threshold constant.
index_offset_[peer_id] <<= 1UL;
} else {
next_index_[peer_id] = 1UL;
}
} else {
uint64_t new_match_index = request_prev_log_index + request_entries.size();
DCHECK(match_index_[peer_id] <= new_match_index)
@ -789,6 +797,7 @@ void RaftServer::SendLogEntries(
match_index_[peer_id] = new_match_index;
if (request_entries.size() > 0) AdvanceCommitIndex();
next_index_[peer_id] = match_index_[peer_id] + 1;
index_offset_[peer_id] = 1;
next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
}
@ -859,6 +868,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
match_index_[peer_id] = snapshot_metadata.last_included_index;
next_index_[peer_id] = snapshot_metadata.last_included_index + 1;
index_offset_[peer_id] = 1;
next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
state_changed_.notify_all();

View File

@ -239,6 +239,13 @@ class RaftServer final : public RaftInterface {
std::vector<uint64_t> next_index_; ///< for each server, index of the next
///< log entry to send to that server.
std::vector<uint64_t> index_offset_; ///< for each server, the amount by
///< which next_index_ is decreased
///< when an AppendEntries request is
///< denied. The offset doubles after
///< each denial and is reset to 1 on
///< success ("binary lifting" style
///< technique), so at most O(log n)
///< requests are needed.
std::vector<uint64_t> match_index_; ///< for each server, index of the
///< highest log entry known to be
///< replicated on server.