#include "raft/raft_server.hpp"
|
|
|
|
|
|
|
|
#include <experimental/filesystem>
|
2018-12-07 23:19:03 +08:00
|
|
|
#include <kj/std/iostream.h>
|
2018-11-19 18:27:45 +08:00
|
|
|
|
|
|
|
#include <gflags/gflags.h>
|
|
|
|
#include <glog/logging.h>
|
|
|
|
|
2018-11-21 22:49:04 +08:00
|
|
|
#include "raft/exceptions.hpp"
|
|
|
|
#include "utils/exceptions.hpp"
|
2018-12-07 23:19:03 +08:00
|
|
|
#include "utils/serialization.hpp"
|
2018-11-21 22:49:04 +08:00
|
|
|
|
2018-11-19 18:27:45 +08:00
|
|
|

namespace raft {

namespace fs = std::experimental::filesystem;

const std::string kCurrentTermKey = "current_term";
const std::string kVotedForKey = "voted_for";
const std::string kLogKey = "log";
const std::string kRaftDir = "raft";

RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
                       const Config &config, Coordination *coordination)
    : config_(config),
      coordination_(coordination),
      mode_(Mode::FOLLOWER),
      server_id_(server_id),
      disk_storage_(fs::path(durability_dir) / kRaftDir) {
  // Persistent storage initialization/recovery.
  if (Log().empty()) {
    disk_storage_.Put(kCurrentTermKey, "0");
  } else {
    Recover();
  }

  // Peer state
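  // Per the Raft paper (Figure 2), `next_index_` tracks, for each peer, the
  // index of the next log entry to send, and `match_index_` the highest log
  // entry index known to be replicated on that peer; both are meaningful
  // only in LEADER mode.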
  int cluster_size = coordination_->WorkerCount();
  next_index_.resize(cluster_size);
  match_index_.resize(cluster_size);
  next_heartbeat_.resize(cluster_size);
  backoff_until_.resize(cluster_size);

  // RPC registration
  coordination_->Register<RequestVoteRpc>(
      [this](const auto &req_reader, auto *res_builder) {
        std::lock_guard<std::mutex> guard(lock_);
        RequestVoteReq req;
        Load(&req, req_reader);

        // [Raft paper 5.1]
        // "If a server receives a request with a stale term,
        // it rejects the request"
        uint64_t current_term = CurrentTerm();
        if (req.term < current_term) {
          RequestVoteRes res(false, current_term);
          Save(res, res_builder);
          return;
        }

        // [Raft paper 5.2, 5.4]
        // "Each server will vote for at most one candidate in a given
        // term, on a first-come-first-served basis with an additional
        // restriction on votes"
        // Restriction: "The voter denies its vote if its own log is more
        // up-to-date than that of the candidate"
        std::experimental::optional<uint16_t> voted_for = VotedFor();
        auto last_entry_data = LastEntryData();
        RequestVoteRes res(
            (!voted_for || voted_for.value() == req.candidate_id) &&
                AtLeastUpToDate(req.last_log_index, req.last_log_term,
                                last_entry_data.first, last_entry_data.second),
            current_term);
        // TODO: a granted vote (and an updated term when
        // `req.term > current_term`) should be persisted before replying;
        // as written, a restarted server could vote again in the same term.
        Save(res, res_builder);
      });

  coordination_->Register<AppendEntriesRpc>(
      [this](const auto &req_reader, auto *res_builder) {
        std::lock_guard<std::mutex> guard(lock_);
        AppendEntriesReq req;
        Load(&req, req_reader);

        // [Raft paper 5.1]
        // "If a server receives a request with a stale term,
        // it rejects the request"
        uint64_t current_term = CurrentTerm();
        if (req.term < current_term) {
          AppendEntriesRes res(false, current_term);
          Save(res, res_builder);
          return;
        }

        // Respond positively to a heartbeat and reset the election timer.
        // TODO(ipaljak) review this when implementing log replication.
        if (req.entries.empty()) {
          AppendEntriesRes res(true, current_term);
          Save(res, res_builder);
          if (mode_ != Mode::FOLLOWER) {
            Transition(Mode::FOLLOWER);
            state_changed_.notify_all();
          } else {
            SetNextElectionTimePoint();
          }
          return;
        }

        throw utils::NotYetImplemented(
            "AppendEntriesRpc which is not a heartbeat");

        // NOTE: Everything below is unreachable until the `throw` above is
        // removed as part of implementing log replication.
        //
        // [Raft paper 5.3]
        // "If a follower's log is inconsistent with the leader's, the
        // consistency check will fail in the next AppendEntries RPC."
        //
        // Consistency checking assures the Log Matching Property:
        //   - If two entries in different logs have the same index and
        //     term, then they store the same command.
        //   - If two entries in different logs have the same index and term,
        //     then the logs are identical in all preceding entries.
        auto log = Log();
        // (prev_log_index == 0 means there is no previous entry to check,
        // so guard the 1-indexed access below.)
        if (log.size() < req.prev_log_index ||
            (req.prev_log_index > 0 &&
             log[req.prev_log_index - 1].term != req.prev_log_term)) {
          AppendEntriesRes res(false, current_term);
          Save(res, res_builder);
          return;
        }

        // If an existing entry conflicts with a new one, we need to delete
        // the existing entry and all that follow it.
        if (log.size() > req.prev_log_index &&
            log[req.prev_log_index].term != req.entries[0].term)
          DeleteLogSuffix(req.prev_log_index + 1);

        AppendLogEntries(req.leader_commit, req.entries);
        AppendEntriesRes res(true, current_term);
        Save(res, res_builder);
      });

  SetNextElectionTimePoint();
  election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this);

  for (const auto &peer_id : coordination_->GetWorkerIds()) {
    if (peer_id == server_id_) continue;
    peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id);
  }
}

RaftServer::~RaftServer() {
  exiting_ = true;

  state_changed_.notify_all();
  election_change_.notify_all();

  for (auto &peer_thread : peer_threads_) {
    if (peer_thread.joinable()) peer_thread.join();
  }

  if (election_thread_.joinable()) election_thread_.join();
}
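
// Persistent Raft state (Raft paper, Figure 2): `currentTerm`, `votedFor`
// and `log[]` are kept in `disk_storage_` under the keys defined at the top
// of this file so they survive restarts.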
uint64_t RaftServer::CurrentTerm() {
  auto opt_value = disk_storage_.Get(kCurrentTermKey);
  if (opt_value == std::experimental::nullopt)
    throw MissingPersistentDataException(kCurrentTermKey);
  return std::stoull(opt_value.value());
}

std::experimental::optional<uint16_t> RaftServer::VotedFor() {
  auto opt_value = disk_storage_.Get(kVotedForKey);
  if (opt_value == std::experimental::nullopt)
    return std::experimental::nullopt;
  return {static_cast<uint16_t>(std::stoul(opt_value.value()))};
}

std::vector<LogEntry> RaftServer::Log() {
  auto opt_value = disk_storage_.Get(kLogKey);
  if (opt_value == std::experimental::nullopt) {
    disk_storage_.Put(kLogKey, SerializeLog({}));
    return {};
  }
  return DeserializeLog(opt_value.value());
}

void RaftServer::Replicate(const std::vector<database::StateDelta> &log) {
  throw utils::NotYetImplemented("RaftServer replication");
}

void RaftServer::Emplace(const database::StateDelta &delta) {
  log_entry_buffer_.Emplace(delta);
}
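
// `LogEntryBuffer` collects `StateDelta`s per transaction. When a delta that
// ends its transaction arrives, the whole buffered sequence is handed to
// `Replicate` as a single log entry.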
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
    : raft_server_(raft_server) {
  CHECK(raft_server_) << "RaftServer can't be nullptr";
}

void RaftServer::LogEntryBuffer::Enable() {
  std::lock_guard<std::mutex> guard(lock_);
  enabled_ = true;
}

void RaftServer::LogEntryBuffer::Disable() {
  std::lock_guard<std::mutex> guard(lock_);
  enabled_ = false;
  // Clear all existing logs from buffers.
  logs_.clear();
}

void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
  std::lock_guard<std::mutex> guard(lock_);
  if (!enabled_) return;

  tx::TransactionId tx_id = delta.transaction_id;
  if (IsStateDeltaTransactionEnd(delta)) {
    auto it = logs_.find(tx_id);
    CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;

    std::vector<database::StateDelta> log(std::move(it->second));
    // `delta` is a const reference, so it is copied here; `std::move` on a
    // const reference would silently copy anyway.
    log.emplace_back(delta);
    logs_.erase(it);

    raft_server_->Replicate(log);
  } else {
    // NOTE: deltas buffered for transactions that later abort are currently
    // never dropped here; `Disable()` is the only thing that clears them.
    logs_[tx_id].emplace_back(delta);
  }
}

bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd(
    const database::StateDelta &delta) {
  switch (delta.type) {
    case database::StateDelta::Type::TRANSACTION_COMMIT:
      return true;
    case database::StateDelta::Type::TRANSACTION_ABORT:
    case database::StateDelta::Type::TRANSACTION_BEGIN:
    case database::StateDelta::Type::CREATE_VERTEX:
    case database::StateDelta::Type::CREATE_EDGE:
    case database::StateDelta::Type::SET_PROPERTY_VERTEX:
    case database::StateDelta::Type::SET_PROPERTY_EDGE:
    case database::StateDelta::Type::ADD_LABEL:
    case database::StateDelta::Type::REMOVE_LABEL:
    case database::StateDelta::Type::REMOVE_VERTEX:
    case database::StateDelta::Type::REMOVE_EDGE:
    case database::StateDelta::Type::BUILD_INDEX:
    case database::StateDelta::Type::DROP_INDEX:
      return false;
  }
}
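
// NOTE: `Transition` expects `lock_` to be held by the caller. It never
// notifies `state_changed_` itself; callers wake up waiting threads
// explicitly after the call (it does, however, notify `election_change_`
// when it re-arms or freezes the election timer).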
void RaftServer::Transition(const Mode &new_mode) {
  switch (new_mode) {
    case Mode::FOLLOWER: {
      LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER";
      SetNextElectionTimePoint();
      mode_ = Mode::FOLLOWER;
      // log_entry_buffer_.Disable();
      break;
    }

    case Mode::CANDIDATE: {
      LOG(INFO) << "Server " << server_id_ << ": Transition to CANDIDATE";
      // log_entry_buffer_.Disable();

      // [Raft thesis, section 3.4]
      // "Each candidate restarts its randomized election timeout at the start
      // of an election, and it waits for that timeout to elapse before
      // starting the next election; this reduces the likelihood of another
      // split vote in the new election."
      SetNextElectionTimePoint();
      election_change_.notify_all();

      // [Raft thesis, section 3.4]
      // "To begin an election, a follower increments its current term and
      // transitions to candidate state. It then votes for itself and issues
      // RequestVote RPCs in parallel to each of the other servers in the
      // cluster."
      disk_storage_.Put(kCurrentTermKey, std::to_string(CurrentTerm() + 1));
      disk_storage_.Put(kVotedForKey, std::to_string(server_id_));

      granted_votes_ = 1;
      vote_requested_.assign(coordination_->WorkerCount(), false);

      mode_ = Mode::CANDIDATE;

      // A single-server cluster wins the election immediately.
      if (HasMajortyVote()) {
        Transition(Mode::LEADER);
        state_changed_.notify_all();
        return;
      }

      break;
    }

    case Mode::LEADER: {
      LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER";
      // log_entry_buffer_.Enable();

      // Freeze the election timer; a leader never times out.
      next_election_ = TimePoint::max();
      election_change_.notify_all();

      // Set the next heartbeat for each peer.
      TimePoint now = Clock::now();
      for (auto &peer_heartbeat : next_heartbeat_)
        peer_heartbeat = now + config_.heartbeat_interval;

      mode_ = Mode::LEADER;

      // TODO(ipaljak): Implement no-op replication. For now, we are only
      // sending heartbeats.
      break;
    }
  }
}

void RaftServer::Recover() {
  throw utils::NotYetImplemented("RaftServer recover");
}
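
// Election timer loop: when the randomized timeout expires, become a
// candidate and start an election. `election_change_` is notified whenever
// `next_election_` changes, so the wait below re-arms with the new deadline.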
void RaftServer::ElectionThreadMain() {
  std::unique_lock<std::mutex> lock(lock_);
  while (!exiting_) {
    if (Clock::now() >= next_election_) {
      LOG(INFO) << "Server " << server_id_ << ": Election timeout exceeded";
      Transition(Mode::CANDIDATE);
      state_changed_.notify_all();
    }
    election_change_.wait_until(lock, next_election_);
  }
}

void RaftServer::PeerThreadMain(int peer_id) {
  std::unique_lock<std::mutex> lock(lock_);

  /* This loop will either call a function that issues an RPC or wait on the
   * condition variable. It must not do both! The lock on `lock_` is released
   * while waiting for an RPC response, which might cause us to miss a
   * notification on the `state_changed_` condition variable and wait
   * indefinitely. The safest thing to do is to assume some important part of
   * state was modified while we were waiting for the response and loop around
   * to check. */
  while (!exiting_) {
    TimePoint now = Clock::now();
    TimePoint wait_until;

    if (mode_ != Mode::FOLLOWER && backoff_until_[peer_id] > now) {
      wait_until = backoff_until_[peer_id];
    } else {
      switch (mode_) {
        case Mode::FOLLOWER: {
          wait_until = TimePoint::max();
          break;
        }

        case Mode::CANDIDATE: {
          if (vote_requested_[peer_id]) {
            // The vote was already requested; block until the state changes
            // (otherwise `wait_until` would keep its default value and this
            // loop would spin).
            wait_until = TimePoint::max();
            break;
          }

          // TODO(ipaljak): Consider backoff.
          wait_until = TimePoint::max();

          auto request_term = CurrentTerm();
          auto peer_future = coordination_->ExecuteOnWorker<RequestVoteRes>(
              peer_id, [&](int worker_id, auto &client) {
                auto last_entry_data = LastEntryData();
                try {
                  auto res = client.template Call<RequestVoteRpc>(
                      server_id_, request_term, last_entry_data.first,
                      last_entry_data.second);
                  return res;
                } catch (...) {
                  // Not being able to connect to a peer defaults to a vote
                  // being denied from that peer. This is correct but not
                  // optimal.
                  //
                  // TODO(ipaljak): reconsider this decision :)
                  return RequestVoteRes(false, request_term);
                }
              });
          lock.unlock();  // Release the lock while waiting for the response.
          auto reply = peer_future.get();
          lock.lock();

          if (CurrentTerm() != request_term || mode_ != Mode::CANDIDATE ||
              exiting_) {
            LOG(INFO) << "Server " << server_id_
                      << ": Ignoring RequestVoteRPC reply from " << peer_id;
            break;
          }

          if (OutOfSync(reply.term)) {
            state_changed_.notify_all();
            continue;
          }

          vote_requested_[peer_id] = true;

          if (reply.vote_granted) {
            LOG(INFO) << "Server " << server_id_ << ": Got vote from "
                      << peer_id;
            ++granted_votes_;
            if (HasMajortyVote()) Transition(Mode::LEADER);
          } else {
            LOG(INFO) << "Server " << server_id_ << ": Denied vote from "
                      << peer_id;
          }

          state_changed_.notify_all();
          continue;
        }

        case Mode::LEADER: {
          if (now >= next_heartbeat_[peer_id]) {
            LOG(INFO) << "Server " << server_id_ << ": Send HB to server "
                      << peer_id;
            auto peer_future =
                coordination_->ExecuteOnWorker<AppendEntriesRes>(
                    peer_id, [&](int worker_id, auto &client) {
                      auto last_entry_data = LastEntryData();
                      std::vector<raft::LogEntry> empty_entries;
                      auto res = client.template Call<AppendEntriesRpc>(
                          server_id_, commit_index_, CurrentTerm(),
                          last_entry_data.first, last_entry_data.second,
                          empty_entries);
                      return res;
                    });
            next_heartbeat_[peer_id] =
                Clock::now() + config_.heartbeat_interval;
          }
          wait_until = next_heartbeat_[peer_id];
          break;
        }
      }
    }

    state_changed_.wait_until(lock, wait_until);
  }
}

void RaftServer::SetNextElectionTimePoint() {
  // [Raft thesis, section 3.4]
  // "Raft uses randomized election timeouts to ensure that split votes are
  // rare and that they are resolved quickly. To prevent split votes in the
  // first place, election timeouts are chosen randomly from a fixed interval
  // (e.g., 150-300 ms)."
  std::uniform_int_distribution<uint64_t> distribution(
      config_.election_timeout_min.count(),
      config_.election_timeout_max.count());
  Clock::duration wait_interval = std::chrono::milliseconds(distribution(rng_));
  next_election_ = Clock::now() + wait_interval;
}

bool RaftServer::HasMajortyVote() {
  if (2 * granted_votes_ > coordination_->WorkerCount()) {
    LOG(INFO) << "Server " << server_id_ << ": Obtained majority vote";
    return true;
  }
  return false;
}
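
// Raft log indices are 1-based, so the last entry of a non-empty log has
// index `log.size()`. An empty log is reported as {0, 0}.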
std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
  auto log = Log();
  if (log.empty()) return {0, 0};
  return {log.size(), log.back().term};
}
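
// [Raft paper 5.4.1]
// "Raft determines which of two logs is more up-to-date by comparing the
// index and term of the last entries in the logs. If the logs have last
// entries with different terms, then the log with the later term is more
// up-to-date. If the logs end with the same term, then whichever log is
// longer is more up-to-date."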
bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
                                 uint64_t last_log_term_a,
                                 uint64_t last_log_index_b,
                                 uint64_t last_log_term_b) {
  if (last_log_term_a == last_log_term_b)
    return last_log_index_a >= last_log_index_b;
  return last_log_term_a > last_log_term_b;
}

bool RaftServer::OutOfSync(uint64_t reply_term) {
  DCHECK(mode_ != Mode::FOLLOWER) << "`OutOfSync` called from FOLLOWER mode";

  // [Raft thesis, Section 3.3]
  // "Current terms are exchanged whenever servers communicate; if one
  // server's current term is smaller than the other's, then it updates
  // its current term to the larger value. If a candidate or leader
  // discovers that its term is out of date, it immediately reverts to
  // follower state."
  if (CurrentTerm() < reply_term) {
    disk_storage_.Put(kCurrentTermKey, std::to_string(reply_term));
    disk_storage_.Delete(kVotedForKey);
    granted_votes_ = 0;
    Transition(Mode::FOLLOWER);
    return true;
  }
  return false;
}

void RaftServer::DeleteLogSuffix(int starting_index) {
  auto log = Log();
  // `starting_index` is 1-indexed; everything from it (inclusive) onwards
  // is removed.
  log.erase(log.begin() + starting_index - 1, log.end());
  disk_storage_.Put(kLogKey, SerializeLog(log));
}

void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
                                  const std::vector<LogEntry> &new_entries) {
  auto log = Log();
  for (const auto &entry : new_entries) log.emplace_back(entry);
  // [Raft paper 5.3] commitIndex = min(leaderCommit, index of last new entry).
  if (leader_commit_index > commit_index_)
    commit_index_ =
        std::min(leader_commit_index, static_cast<uint64_t>(log.size()));
  disk_storage_.Put(kLogKey, SerializeLog(log));
}
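
// The log is serialized as a Cap'n Proto `List<LogEntry>` message written to
// an in-memory binary string, which is what gets stored in `disk_storage_`.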
std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) {
  ::capnp::MallocMessageBuilder message;
  auto log_builder =
      message.initRoot<::capnp::List<capnp::LogEntry>>(log.size());
  utils::SaveVector<capnp::LogEntry, LogEntry>(
      log, &log_builder, [](auto *log_builder, const auto &log_entry) {
        Save(log_entry, log_builder);
      });

  std::stringstream stream(std::ios_base::in | std::ios_base::out |
                           std::ios_base::binary);
  {
    kj::std::StdOutputStream std_stream(stream);
    kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
    writeMessage(buffered_stream, message);
    // The buffered wrapper only flushes on destruction, so it must go out of
    // scope before the stream's contents are read.
  }
  return stream.str();
}

std::vector<LogEntry> RaftServer::DeserializeLog(
    const std::string &serialized_log) {
  if (serialized_log.empty()) return {};
  ::capnp::MallocMessageBuilder message;
  std::stringstream stream(std::ios_base::in | std::ios_base::out |
                           std::ios_base::binary);
  stream << serialized_log;
  kj::std::StdInputStream std_stream(stream);
  kj::BufferedInputStreamWrapper buffered_stream(std_stream);
  readMessageCopy(buffered_stream, message);

  ::capnp::List<capnp::LogEntry>::Reader log_reader =
      message.getRoot<::capnp::List<capnp::LogEntry>>().asReader();
  std::vector<LogEntry> deserialized_log;
  utils::LoadVector<capnp::LogEntry, LogEntry>(&deserialized_log, log_reader,
                                               [](const auto &log_reader) {
                                                 LogEntry log_entry;
                                                 Load(&log_entry, log_reader);
                                                 return log_entry;
                                               });
  return deserialized_log;
}

}  // namespace raft