Implement Raft RPC, log serialization for disk storage and leader election
Summary: This diff contains a rough implementation of the Raft protocol, up to and including leader election.

Reviewers: msantl

Reviewed By: msantl

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1744
parent f5b39cfc41
commit d637629078
@@ -16,8 +16,8 @@ namespace raft {
 
 /// Configurable Raft parameters.
 struct Config {
-  std::chrono::milliseconds leader_timeout_min;
-  std::chrono::milliseconds leader_timeout_max;
+  std::chrono::milliseconds election_timeout_min;
+  std::chrono::milliseconds election_timeout_max;
   std::chrono::milliseconds heartbeat_interval;
   std::chrono::milliseconds replicate_timeout;
@@ -34,19 +34,22 @@ struct Config {
     }
 
     if (!data.is_object()) throw RaftConfigException(raft_config_file);
-    if (!data["leader_timeout_min"].is_number())
+    if (!data["election_timeout_min"].is_number())
       throw RaftConfigException(raft_config_file);
-    if (!data["leader_timeout_max"].is_number())
+    if (!data["election_timeout_max"].is_number())
       throw RaftConfigException(raft_config_file);
     if (!data["heartbeat_interval"].is_number())
       throw RaftConfigException(raft_config_file);
     if (!data["replicate_timeout"].is_number())
       throw RaftConfigException(raft_config_file);
 
-    return Config{std::chrono::duration<int64_t>(data["leader_timeout_min"]),
-                  std::chrono::duration<int64_t>(data["leader_timeout_max"]),
-                  std::chrono::duration<int64_t>(data["heartbeat_interval"]),
-                  std::chrono::duration<int64_t>(data["replicate_timeout"])};
+    return Config{
+        std::chrono::duration<int64_t, std::milli>(
+            data["election_timeout_min"]),
+        std::chrono::duration<int64_t, std::milli>(
+            data["election_timeout_max"]),
+        std::chrono::duration<int64_t, std::milli>(data["heartbeat_interval"]),
+        std::chrono::duration<int64_t, std::milli>(data["replicate_timeout"])};
   }
 };
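For reference, here is a raft config file that the updated parser above would accept. This is an illustrative sketch, not part of the diff; the key names are exactly the ones validated above, and the millisecond values are made up:

    {
      "election_timeout_min": 150,
      "election_timeout_max": 300,
      "heartbeat_interval": 60,
      "replicate_timeout": 100
    }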
@@ -64,6 +64,13 @@ io::network::Endpoint Coordination::GetServerEndpoint() {
   return server_.endpoint();
 }
 
+std::vector<int> Coordination::GetWorkerIds() {
+  std::lock_guard<std::mutex> guard(lock_);
+  std::vector<int> worker_ids;
+  for (auto worker : workers_) worker_ids.push_back(worker.first);
+  return worker_ids;
+}
+
 communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) {
   std::lock_guard<std::mutex> guard(lock_);
   auto found = client_pools_.find(worker_id);
@@ -79,6 +86,10 @@ communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) {
       .first->second;
 }
 
+uint16_t Coordination::WorkerCount() {
+  return workers_.size();
+}
+
 bool Coordination::Start() {
   if (!server_.Start()) return false;
   AddWorker(worker_id_, server_.endpoint());
@@ -15,7 +15,6 @@
 #include "communication/rpc/server.hpp"
 #include "io/network/endpoint.hpp"
 #include "raft/exceptions.hpp"
-#include "utils/future.hpp"
 #include "utils/thread.hpp"
 
 namespace raft {
@@ -55,11 +54,16 @@ class Coordination final {
   /// Gets the endpoint for this RPC server.
   io::network::Endpoint GetServerEndpoint();
 
+  /// Returns all worker ids.
+  std::vector<int> GetWorkerIds();
+
   /// Returns a cached `ClientPool` for the given `worker_id`.
   communication::rpc::ClientPool *GetClientPool(int worker_id);
 
+  uint16_t WorkerCount();
+
   /// Asynchronously executes the given function on the RPC client for the
-  /// given worker id. Returns an `utils::Future` of the given `execute`
+  /// given worker id. Returns a `std::future` of the given `execute`
   /// function's return type.
   template <typename TResult>
   auto ExecuteOnWorker(
@@ -69,6 +73,21 @@ class Coordination final {
     auto client_pool = GetClientPool(worker_id);
     return thread_pool_.Run(execute, worker_id, std::ref(*client_pool));
   }
 
+  /// Asynchronously executes the `execute` function on all worker RPC clients
+  /// except the one whose id is `skip_worker_id`. Returns a vector of futures
+  /// containing the results of the `execute` function.
+  template <typename TResult>
+  auto ExecuteOnWorkers(
+      int skip_worker_id,
+      std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
+          execute) {
+    std::vector<std::future<TResult>> futures;
+    for (auto &worker_id : GetWorkerIds()) {
+      if (worker_id == skip_worker_id) continue;
+      futures.emplace_back(ExecuteOnWorker(worker_id, execute));
+    }
+    return futures;
+  }
+
   template <class TRequestResponse>
   void Register(std::function<
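A usage sketch for the new `ExecuteOnWorkers` helper (hypothetical call site, not part of the diff; `coordination` and `my_worker_id` are assumed names): the callback is fanned out to every worker except `skip_worker_id`, and the caller drains the returned futures.

    // Hypothetical: count how many peers answered some broadcast positively.
    auto futures = coordination.ExecuteOnWorkers<bool>(
        my_worker_id, [](int worker_id, communication::rpc::ClientPool &pool) {
          // ... issue an RPC through `pool` and report success ...
          return true;
        });
    int acks = 0;
    for (auto &future : futures)
      if (future.get()) ++acks;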
@@ -6,19 +6,15 @@
 
 namespace raft {
 
-/**
- * Base exception class used for all exceptions that can occur within the
- * Raft protocol.
- */
+/// Base exception class used for all exceptions that can occur within the
+/// Raft protocol.
 class RaftException : public utils::BasicException {
  public:
   using utils::BasicException::BasicException;
 };
 
-/**
- * This exception should be thrown when attempting to transition between
- * incompatible states, e.g. from `FOLLOWER` to `LEADER`.
- */
+/// This exception should be thrown when attempting to transition between
+/// incompatible states, e.g. from `FOLLOWER` to `LEADER`.
 class InvalidTransitionException : public RaftException {
  public:
   using RaftException::RaftException;
@@ -28,10 +24,8 @@ class InvalidTransitionException : public RaftException {
                             new_mode) {}
 };
 
-/**
- * Exception used to indicate something is wrong with the raft config provided
- * by the user.
- */
+/// Exception used to indicate something is wrong with the raft config
+/// provided by the user.
 class RaftConfigException : public RaftException {
  public:
   using RaftException::RaftException;
@@ -39,10 +33,8 @@ class RaftConfigException : public RaftException {
     : RaftException("Unable to parse raft config file " + path) {}
 };
 
-/**
- * Exception used to indicate something is wrong with the coordination config
- * provided by the user.
- */
+/// Exception used to indicate something is wrong with the coordination
+/// config provided by the user.
 class RaftCoordinationConfigException : public RaftException {
  public:
   using RaftException::RaftException;
@@ -51,4 +43,15 @@ class RaftCoordinationConfigException : public RaftException {
   }
 };
 
+/// This exception should be thrown when a `RaftServer` instance attempts
+/// to read data from persistent storage which is missing.
+class MissingPersistentDataException : public RaftException {
+ public:
+  using RaftException::RaftException;
+  explicit MissingPersistentDataException(const std::string &key)
+      : RaftException(
+            "Attempting to read non-existing persistent data under key: " +
+            key) {}
+};
+
 }  // namespace raft
@@ -13,7 +13,8 @@ cpp<#
 (lcp:capnp-import 'database "/database/single_node_ha/serialization.capnp")
 
 (lcp:define-struct log-entry ()
-  ((deltas "std::vector<database::StateDelta>" :capnp-type "List(Database.StateDelta)"))
+  ((term :uint64_t)
+   (deltas "std::vector<database::StateDelta>" :capnp-type "List(Database.StateDelta)"))
   (:serialize (:slk) (:capnp)))
 
 (lcp:pop-namespace) ;; raft
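(The practical effect of the `log-entry` change above: every serialized `LogEntry` now carries the `term` in which it was created, which the `prev_log_term` consistency checks in `raft_server.cpp` below rely on.)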
@@ -27,7 +27,7 @@ cpp<#
 (lcp:define-rpc append-entries
   (:request
     ((leader-id :uint16_t)
-     (leader-commit :uint16_t)
+     (leader-commit :uint64_t)
      (term :uint64_t)
      (prev-log-index :uint64_t)
      (prev-log-term :uint64_t)
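(Widening `leader-commit` from `:uint16_t` to `:uint64_t` makes it consistent with the other 64-bit log fields in the request, `prev-log-index` and `prev-log-term`.)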
@@ -1,45 +1,186 @@
 #include "raft/raft_server.hpp"
 
+#include <experimental/filesystem>
+#include <kj/std/iostream.h>
+
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "raft/coordination.hpp"
 #include "raft/exceptions.hpp"
 #include "raft/raft_rpc_messages.hpp"
 #include "utils/exceptions.hpp"
+#include "utils/serialization.hpp"
 
 namespace raft {
 
+namespace fs = std::experimental::filesystem;
+
+const std::string kCurrentTermKey = "current_term";
+const std::string kVotedForKey = "voted_for";
+const std::string kLogKey = "log";
+const std::string kRaftDir = "raft";
+
 RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
                        const Config &config, Coordination *coordination)
-    : server_id_(server_id),
-      config_(config),
-      coordination_(coordination) {
-  coordination_->Register<RequestVoteRpc>(
+    : config_(config),
+      coordination_(coordination),
+      mode_(Mode::FOLLOWER),
+      server_id_(server_id),
+      disk_storage_(fs::path(durability_dir) / kRaftDir) {
+  // Persistent storage initialization/recovery.
+  if (Log().empty()) {
+    disk_storage_.Put(kCurrentTermKey, "0");
+  } else {
+    Recover();
+  }
+
+  // Peer state.
+  int cluster_size = coordination_->WorkerCount();
+  next_index_.resize(cluster_size);
+  match_index_.resize(cluster_size);
+  next_heartbeat_.resize(cluster_size);
+  backoff_until_.resize(cluster_size);
+
+  // RPC registration.
+  coordination->Register<RequestVoteRpc>(
       [this](const auto &req_reader, auto *res_builder) {
-        throw utils::NotYetImplemented("RaftServer constructor");
+        std::lock_guard<std::mutex> guard(lock_);
+        RequestVoteReq req;
+        Load(&req, req_reader);
+
+        // [Raft paper 5.1]
+        // "If a server receives a request with a stale term,
+        // it rejects the request."
+        uint64_t current_term = CurrentTerm();
+        if (req.term < current_term) {
+          RequestVoteRes res(false, current_term);
+          Save(res, res_builder);
+          return;
+        }
+
+        // [Raft paper 5.2, 5.4]
+        // "Each server will vote for at most one candidate in a given
+        // term, on a first-come-first-served basis with an additional
+        // restriction on votes."
+        // Restriction: "The voter denies its vote if its own log is more
+        // up-to-date than that of the candidate."
+        std::experimental::optional<uint16_t> voted_for = VotedFor();
+        auto last_entry_data = LastEntryData();
+        RequestVoteRes res(
+            (!voted_for || voted_for.value() == req.candidate_id) &&
+                AtLeastUpToDate(req.last_log_index, req.last_log_term,
+                                last_entry_data.first, last_entry_data.second),
+            current_term);
+        Save(res, res_builder);
       });
 
   coordination_->Register<AppendEntriesRpc>(
       [this](const auto &req_reader, auto *res_builder) {
-        throw utils::NotYetImplemented("RaftServer constructor");
-      });
-}
+        std::lock_guard<std::mutex> guard(lock_);
+        AppendEntriesReq req;
+        Load(&req, req_reader);
 
-void RaftServer::Transition(const Mode &new_mode) {
-  if (new_mode == Mode::LEADER)
-    log_entry_buffer_.Enable();
-  else
-    log_entry_buffer_.Disable();
+        // [Raft paper 5.1]
+        // "If a server receives a request with a stale term,
+        // it rejects the request."
+        uint64_t current_term = CurrentTerm();
+        if (req.term < current_term) {
+          AppendEntriesRes res(false, current_term);
+          Save(res, res_builder);
+          return;
+        }
 
-  throw utils::NotYetImplemented("RaftServer transition");
-}
+        // Respond positively to a heartbeat.
+        // TODO(ipaljak): Review this when implementing log replication.
+        if (req.entries.empty()) {
+          AppendEntriesRes res(true, current_term);
+          Save(res, res_builder);
+          if (mode_ != Mode::FOLLOWER) {
+            Transition(Mode::FOLLOWER);
+            state_changed_.notify_all();
+          } else {
+            SetNextElectionTimePoint();
+          }
+          return;
+        }
+
+        throw utils::NotYetImplemented(
+            "AppendEntriesRpc which is not a heartbeat");
+
+        // [Raft paper 5.3]
+        // "If a follower's log is inconsistent with the leader's, the
+        // consistency check will fail in the next AppendEntries RPC."
+        //
+        // Consistency checking assures the Log Matching Property:
+        //   - If two entries in different logs have the same index and
+        //     term, then they store the same command.
+        //   - If two entries in different logs have the same index and term,
+        //     then the logs are identical in all preceding entries.
+        auto log = Log();
+        if (log.size() < req.prev_log_index ||
+            log[req.prev_log_index - 1].term != req.prev_log_term) {
+          AppendEntriesRes res(false, current_term);
+          Save(res, res_builder);
+          return;
+        }
+
+        // If an existing entry conflicts with a new one, we need to delete
+        // the existing entry and all that follow it.
+        if (log.size() > req.prev_log_index &&
+            log[req.prev_log_index].term != req.entries[0].term)
+          DeleteLogSuffix(req.prev_log_index + 1);
+
+        AppendLogEntries(req.leader_commit, req.entries);
+        AppendEntriesRes res(true, current_term);
+        Save(res, res_builder);
+      });
+
+  SetNextElectionTimePoint();
+  election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this);
+
+  for (const auto &peer_id : coordination_->GetWorkerIds()) {
+    if (peer_id == server_id_) continue;
+    peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id);
+  }
+}
+
+RaftServer::~RaftServer() {
+  exiting_ = true;
+
+  state_changed_.notify_all();
+  election_change_.notify_all();
+
+  for (auto &peer_thread : peer_threads_) {
+    if (peer_thread.joinable()) peer_thread.join();
+  }
+
+  if (election_thread_.joinable()) election_thread_.join();
+}
+
+uint64_t RaftServer::CurrentTerm() {
+  auto opt_value = disk_storage_.Get(kCurrentTermKey);
+  if (opt_value == std::experimental::nullopt)
+    throw MissingPersistentDataException(kCurrentTermKey);
+  return std::stoull(opt_value.value());
+}
+
+std::experimental::optional<uint16_t> RaftServer::VotedFor() {
+  auto opt_value = disk_storage_.Get(kVotedForKey);
+  if (opt_value == std::experimental::nullopt)
+    return std::experimental::nullopt;
+  return {std::stoul(opt_value.value())};
+}
+
+std::vector<LogEntry> RaftServer::Log() {
+  auto opt_value = disk_storage_.Get(kLogKey);
+  if (opt_value == std::experimental::nullopt) {
+    disk_storage_.Put(kLogKey, SerializeLog({}));
+    return {};
+  }
+  return DeserializeLog(opt_value.value());
+}
 
 void RaftServer::Replicate(const std::vector<database::StateDelta> &log) {
@@ -107,4 +248,303 @@ bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd(
   }
 }
 
+void RaftServer::Transition(const Mode &new_mode) {
+  switch (new_mode) {
+    case Mode::FOLLOWER: {
+      LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER";
+      SetNextElectionTimePoint();
+      mode_ = Mode::FOLLOWER;
+      // log_entry_buffer_.Disable();
+      break;
+    }
+
+    case Mode::CANDIDATE: {
+      LOG(INFO) << "Server " << server_id_ << ": Transition to CANDIDATE";
+      // log_entry_buffer_.Disable();
+
+      // [Raft thesis, section 3.4]
+      // "Each candidate restarts its randomized election timeout at the start
+      // of an election, and it waits for that timeout to elapse before
+      // starting the next election; this reduces the likelihood of another
+      // split vote in the new election."
+      SetNextElectionTimePoint();
+      election_change_.notify_all();
+
+      // [Raft thesis, section 3.4]
+      // "To begin an election, a follower increments its current term and
+      // transitions to candidate state. It then votes for itself and issues
+      // RequestVote RPCs in parallel to each of the other servers in the
+      // cluster."
+      disk_storage_.Put(kCurrentTermKey, std::to_string(CurrentTerm() + 1));
+      disk_storage_.Put(kVotedForKey, std::to_string(server_id_));
+
+      granted_votes_ = 1;
+      vote_requested_.assign(coordination_->WorkerCount(), false);
+
+      mode_ = Mode::CANDIDATE;
+
+      if (HasMajorityVote()) {
+        Transition(Mode::LEADER);
+        state_changed_.notify_all();
+        return;
+      }
+
+      break;
+    }
+
+    case Mode::LEADER: {
+      LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER";
+      // log_entry_buffer_.Enable();
+
+      // Freeze the election timer.
+      next_election_ = TimePoint::max();
+      election_change_.notify_all();
+
+      // Set the next heartbeat for each peer to the correct value.
+      TimePoint now = Clock::now();
+      for (auto &peer_heartbeat : next_heartbeat_)
+        peer_heartbeat = now + config_.heartbeat_interval;
+
+      mode_ = Mode::LEADER;
+
+      // TODO(ipaljak): Implement no-op replication. For now, we are only
+      // sending heartbeats.
+      break;
+    }
+  }
+}
+
+void RaftServer::Recover() {
+  throw utils::NotYetImplemented("RaftServer recover");
+}
+
+void RaftServer::ElectionThreadMain() {
+  std::unique_lock<std::mutex> lock(lock_);
+  while (!exiting_) {
+    if (Clock::now() >= next_election_) {
+      LOG(INFO) << "Server " << server_id_ << ": Election timeout exceeded";
+      Transition(Mode::CANDIDATE);
+      state_changed_.notify_all();
+    }
+    election_change_.wait_until(lock, next_election_);
+  }
+}
+
+void RaftServer::PeerThreadMain(int peer_id) {
+  std::unique_lock<std::mutex> lock(lock_);
+
+  /* This loop will either call a function that issues an RPC or wait on the
+   * condition variable. It must not do both! The lock on `lock_` is released
+   * while waiting for an RPC response, which might cause us to miss a
+   * notification on the `state_changed_` condition variable and wait
+   * indefinitely. The safest thing to do is to assume some important part of
+   * state was modified while we were waiting for the response and loop around
+   * to check. */
+  while (!exiting_) {
+    TimePoint now = Clock::now();
+    TimePoint wait_until;
+
+    if (mode_ != Mode::FOLLOWER && backoff_until_[peer_id] > now) {
+      wait_until = backoff_until_[peer_id];
+    } else {
+      switch (mode_) {
+        case Mode::FOLLOWER: {
+          wait_until = TimePoint::max();
+          break;
+        }
+
+        case Mode::CANDIDATE: {
+          if (vote_requested_[peer_id]) break;
+
+          // TODO(ipaljak): Consider backoff.
+          wait_until = TimePoint::max();
+
+          auto request_term = CurrentTerm();
+          auto peer_future = coordination_->ExecuteOnWorker<RequestVoteRes>(
+              peer_id, [&](int worker_id, auto &client) {
+                auto last_entry_data = LastEntryData();
+                try {
+                  auto res = client.template Call<RequestVoteRpc>(
+                      server_id_, request_term, last_entry_data.first,
+                      last_entry_data.second);
+                  return res;
+                } catch (...) {
+                  // Not being able to connect to a peer defaults to a vote
+                  // being denied from that peer. This is correct but not
+                  // optimal.
+                  //
+                  // TODO(ipaljak): Reconsider this decision :)
+                  return RequestVoteRes(false, request_term);
+                }
+              });
+          lock.unlock();  // Release the lock while waiting for the response.
+          auto reply = peer_future.get();
+          lock.lock();
+
+          if (CurrentTerm() != request_term || mode_ != Mode::CANDIDATE ||
+              exiting_) {
+            LOG(INFO) << "Server " << server_id_
+                      << ": Ignoring RequestVoteRpc reply from " << peer_id;
+            break;
+          }
+
+          if (OutOfSync(reply.term)) {
+            state_changed_.notify_all();
+            continue;
+          }
+
+          vote_requested_[peer_id] = true;
+
+          if (reply.vote_granted) {
+            LOG(INFO) << "Server " << server_id_ << ": Got vote from "
+                      << peer_id;
+            ++granted_votes_;
+            if (HasMajorityVote()) Transition(Mode::LEADER);
+          } else {
+            LOG(INFO) << "Server " << server_id_ << ": Denied vote from "
+                      << peer_id;
+          }
+
+          state_changed_.notify_all();
+          continue;
+        }
+
+        case Mode::LEADER: {
+          if (now >= next_heartbeat_[peer_id]) {
+            LOG(INFO) << "Server " << server_id_ << ": Send HB to server "
+                      << peer_id;
+            auto peer_future =
+                coordination_->ExecuteOnWorker<AppendEntriesRes>(
+                    peer_id, [&](int worker_id, auto &client) {
+                      auto last_entry_data = LastEntryData();
+                      std::vector<raft::LogEntry> empty_entries;
+                      auto res = client.template Call<AppendEntriesRpc>(
+                          server_id_, commit_index_, CurrentTerm(),
+                          last_entry_data.first, last_entry_data.second,
+                          empty_entries);
+                      return res;
+                    });
+            next_heartbeat_[peer_id] =
+                Clock::now() + config_.heartbeat_interval;
+          }
+          wait_until = next_heartbeat_[peer_id];
+          break;
+        }
+      }
+    }
+
+    state_changed_.wait_until(lock, wait_until);
+  }
+}
+
+void RaftServer::SetNextElectionTimePoint() {
+  // [Raft thesis, section 3.4]
+  // "Raft uses randomized election timeouts to ensure that split votes are
+  // rare and that they are resolved quickly. To prevent split votes in the
+  // first place, election timeouts are chosen randomly from a fixed interval
+  // (e.g., 150-300 ms)."
+  std::uniform_int_distribution<uint64_t> distribution(
+      config_.election_timeout_min.count(),
+      config_.election_timeout_max.count());
+  Clock::duration wait_interval =
+      std::chrono::milliseconds(distribution(rng_));
+  next_election_ = Clock::now() + wait_interval;
+}
+
+bool RaftServer::HasMajorityVote() {
+  if (2 * granted_votes_ > coordination_->WorkerCount()) {
+    LOG(INFO) << "Server " << server_id_ << ": Obtained majority vote";
+    return true;
+  }
+  return false;
+}
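As a sanity check on the majority arithmetic above (not part of the diff): with `WorkerCount() == 5`, the test `2 * granted_votes_ > 5` first succeeds at `granted_votes_ == 3`, a strict majority. With an even cluster of 4, it also first succeeds at 3, so a 2-2 split never elects a leader.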
+
+std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
+  auto log = Log();
+  if (log.empty()) return {0, 0};
+  return {log.size(), log.back().term};
+}
+
+bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
+                                 uint64_t last_log_term_a,
+                                 uint64_t last_log_index_b,
+                                 uint64_t last_log_term_b) {
+  if (last_log_term_a == last_log_term_b)
+    return last_log_index_a >= last_log_index_b;
+  return last_log_term_a > last_log_term_b;
+}
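A concrete reading of the up-to-dateness check above (Raft paper 5.4): a log ending in (index 5, term 3) is at least as up-to-date as a log ending in (index 9, term 2), because terms are compared first; the longer log only wins when the last terms are equal.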
+
+bool RaftServer::OutOfSync(uint64_t reply_term) {
+  DCHECK(mode_ != Mode::FOLLOWER) << "`OutOfSync` called from FOLLOWER mode";
+
+  // [Raft thesis, section 3.3]
+  // "Current terms are exchanged whenever servers communicate; if one
+  // server's current term is smaller than the other's, then it updates
+  // its current term to the larger value. If a candidate or leader
+  // discovers that its term is out of date, it immediately reverts to
+  // follower state."
+  if (CurrentTerm() < reply_term) {
+    disk_storage_.Put(kCurrentTermKey, std::to_string(reply_term));
+    disk_storage_.Delete(kVotedForKey);
+    granted_votes_ = 0;
+    Transition(Mode::FOLLOWER);
+    return true;
+  }
+  return false;
+}
+
+void RaftServer::DeleteLogSuffix(int starting_index) {
+  auto log = Log();
+  log.erase(log.begin() + starting_index - 1, log.end());  // Log is 1-indexed.
+  disk_storage_.Put(kLogKey, SerializeLog(log));
+}
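For example, calling `DeleteLogSuffix(3)` on a five-entry log erases entries 3 through 5 and keeps entries 1 and 2, which is consistent with the 1-indexing convention noted in the header.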
+
+void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
+                                  const std::vector<LogEntry> &new_entries) {
+  auto log = Log();
+  for (auto &entry : new_entries) log.emplace_back(entry);
+  // See Raft paper 5.3.
+  if (leader_commit_index > commit_index_)
+    commit_index_ = std::min(leader_commit_index, log.size());
+  disk_storage_.Put(kLogKey, SerializeLog(log));
+}
+
+std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) {
+  ::capnp::MallocMessageBuilder message;
+  auto log_builder =
+      message.initRoot<::capnp::List<capnp::LogEntry>>(log.size());
+  utils::SaveVector<capnp::LogEntry, LogEntry>(
+      log, &log_builder, [](auto *log_builder, const auto &log_entry) {
+        Save(log_entry, log_builder);
+      });
+
+  std::stringstream stream(std::ios_base::in | std::ios_base::out |
+                           std::ios_base::binary);
+  kj::std::StdOutputStream std_stream(stream);
+  kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
+  writeMessage(buffered_stream, message);
+  return stream.str();
+}
+
+std::vector<LogEntry> RaftServer::DeserializeLog(
+    const std::string &serialized_log) {
+  if (serialized_log.empty()) return {};
+  ::capnp::MallocMessageBuilder message;
+  std::stringstream stream(std::ios_base::in | std::ios_base::out |
+                           std::ios_base::binary);
+  kj::std::StdInputStream std_stream(stream);
+  kj::BufferedInputStreamWrapper buffered_stream(std_stream);
+  stream << serialized_log;
+  readMessageCopy(buffered_stream, message);
+
+  ::capnp::List<capnp::LogEntry>::Reader log_reader =
+      message.getRoot<::capnp::List<capnp::LogEntry>>().asReader();
+  std::vector<LogEntry> deserialized_log;
+  utils::LoadVector<capnp::LogEntry, LogEntry>(
+      &deserialized_log, log_reader, [](const auto &log_reader) {
+        LogEntry log_entry;
+        Load(&log_entry, log_reader);
+        return log_entry;
+      });
+  return deserialized_log;
+}
+
 }  // namespace raft
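A minimal round-trip sketch of how `SerializeLog` and `DeserializeLog` above are meant to compose (illustrative only, ignoring access control; `server` is a hypothetical `RaftServer` instance):

    // Serializing and then deserializing should reproduce the log entries.
    std::vector<raft::LogEntry> log = server.Log();
    std::string blob = server.SerializeLog(log);
    std::vector<raft::LogEntry> copy = server.DeserializeLog(blob);
    CHECK(copy.size() == log.size());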
@@ -8,13 +8,17 @@
 
 #include "durability/single_node_ha/state_delta.hpp"
 #include "raft/config.hpp"
 #include "raft/coordination.hpp"
+#include "raft/log_entry.hpp"
 #include "raft/raft_rpc_messages.hpp"
+#include "storage/common/kvstore/kvstore.hpp"
 #include "transactions/type.hpp"
 #include "utils/scheduler.hpp"
 
 namespace raft {
 
 // Forward declaration.
 class Coordination;
+
+using Clock = std::chrono::system_clock;
+using TimePoint = std::chrono::system_clock::time_point;
 
 enum class Mode { FOLLOWER, CANDIDATE, LEADER };
@@ -26,20 +30,37 @@ class RaftServer {
  public:
   RaftServer() = delete;
 
-  /**
-   * The implementation assumes that server IDs are unique integers
-   * ranging from 1 to cluster_size.
-   *
-   * @param server_id ID of the current server.
-   * @param durability_dir directory for persisted data.
-   * @param config raft configuration.
-   * @param coordination Abstraction for coordination between Raft servers.
-   */
+  /// The implementation assumes that server IDs are unique integers
+  /// ranging from 1 to cluster_size.
+  ///
+  /// @param server_id ID of the current server.
+  /// @param durability_dir directory for persisted data.
+  /// @param config raft configuration.
+  /// @param coordination Abstraction for coordination between Raft servers.
   RaftServer(uint16_t server_id, const std::string &durability_dir,
              const Config &config, raft::Coordination *coordination);
 
+  ~RaftServer();
+
+  /// Retrieves the current term from persistent storage.
+  ///
+  /// @throws MissingPersistentDataException
+  uint64_t CurrentTerm();
+
+  /// Retrieves the ID of the server this server has voted for in the current
+  /// term from persistent storage. Returns `std::experimental::nullopt` if no
+  /// such server exists.
+  std::experimental::optional<uint16_t> VotedFor();
+
+  /// Retrieves the log entries from persistent storage. The log is 1-indexed
+  /// in order to be consistent with the paper. If the log isn't present in
+  /// persistent storage, an empty log will be created.
+  std::vector<LogEntry> Log();
+
   // TODO(msantl): document
   void Replicate(const std::vector<database::StateDelta> &log);
 
   // TODO(msantl): document
   void Emplace(const database::StateDelta &delta);
 
  private:
@@ -80,15 +101,17 @@ class RaftServer {
     bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
   };
 
+  mutable std::mutex lock_;  ///< Guards all internal state.
+
   //////////////////////////////////////////////////////////////////////////////
   // volatile state on all servers
   //////////////////////////////////////////////////////////////////////////////
 
-  uint16_t server_id_;  ///< ID of the current server.
   Config config_;                        ///< Raft config.
   Coordination *coordination_{nullptr};  ///< Cluster coordination.
 
   Mode mode_;              ///< Server's current mode.
+  uint16_t server_id_;     ///< ID of the current server.
   uint64_t commit_index_;  ///< Index of the highest known committed entry.
   uint64_t last_applied_;  ///< Index of the highest applied entry to SM.
@@ -99,6 +122,33 @@ class RaftServer {
   /// log is ready for replication it will be discarded anyway.
   LogEntryBuffer log_entry_buffer_{this};
 
+  std::vector<std::thread> peer_threads_;  ///< One thread per peer which
+                                           ///< handles outgoing RPCs.
+
+  std::condition_variable state_changed_;  ///< Notifies all peer threads on
+                                           ///< relevant state change.
+
+  bool exiting_ = false;  ///< True on server shutdown.
+
+  //////////////////////////////////////////////////////////////////////////////
+  // volatile state on followers and candidates
+  //////////////////////////////////////////////////////////////////////////////
+
+  std::thread election_thread_;  ///< Timer thread for triggering elections.
+  TimePoint next_election_;      ///< Next election `TimePoint`.
+
+  std::condition_variable election_change_;  ///< Used to notify election_thread
+                                             ///< on next_election_ change.
+
+  std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}());
+
+  //////////////////////////////////////////////////////////////////////////////
+  // volatile state on candidates
+  //////////////////////////////////////////////////////////////////////////////
+
+  uint16_t granted_votes_;
+  std::vector<bool> vote_requested_;
+
   //////////////////////////////////////////////////////////////////////////////
   // volatile state on leaders
   //////////////////////////////////////////////////////////////////////////////
@@ -110,6 +160,10 @@ class RaftServer {
                            ///< highest log entry known to be
                            ///< replicated on server.
 
+  std::vector<TimePoint> next_heartbeat_;  ///< For each server, the time point
+                                           ///< for the next heartbeat.
+  std::vector<TimePoint> backoff_until_;   ///< Backoff for each server.
+
   //////////////////////////////////////////////////////////////////////////////
   // persistent state on all servers
   //
@@ -119,11 +173,82 @@ class RaftServer {
   //  - term (null if none).
   //  - vector<LogEntry> log -- log entries.
   //////////////////////////////////////////////////////////////////////////////
 
   storage::KVStore disk_storage_;
 
   /// Makes a transition to a new `raft::Mode`.
   ///
-  /// @throws InvalidTransitionException when transitioning between incompatible
-  void Transition(const Mode &new_mode);
+  /// @throws InvalidTransitionException when transitioning between
+  /// incompatible `raft::Mode`s.
+  void Transition(const raft::Mode &new_mode);
 
+  /// Recovers from persistent storage. This function should be called from
+  /// the constructor before the server starts with normal operation.
+  void Recover();
+
+  /// Main function of the `election_thread_`. It is responsible for the
+  /// transition to CANDIDATE mode when the election timeout elapses.
+  void ElectionThreadMain();
+
+  /// Main function of the thread that handles outgoing RPCs towards a
+  /// specified node within the Raft cluster.
+  ///
+  /// @param peer_id ID of the receiving node in the cluster.
+  void PeerThreadMain(int peer_id);
+
+  /// Sets the `TimePoint` for the next election.
+  void SetNextElectionTimePoint();
+
+  /// Checks if the current server obtained enough votes to become a leader.
+  bool HasMajorityVote();
+
+  /// Returns relevant metadata about the last entry in this server's Raft
+  /// log. More precisely, returns a pair consisting of the index of the last
+  /// entry in the log and the term of the last entry in the log.
+  ///
+  /// @return std::pair<last_log_index, last_log_term>
+  std::pair<uint64_t, uint64_t> LastEntryData();
+
+  /// Checks whether the Raft log of server A is at least as up-to-date as the
+  /// Raft log of server B. This is strictly defined in Raft paper 5.4.
+  ///
+  /// @param last_log_index_a Index of server A's last log entry.
+  /// @param last_log_term_a Term of server A's last log entry.
+  /// @param last_log_index_b Index of server B's last log entry.
+  /// @param last_log_term_b Term of server B's last log entry.
+  bool AtLeastUpToDate(uint64_t last_log_index_a, uint64_t last_log_term_a,
+                       uint64_t last_log_index_b, uint64_t last_log_term_b);
+
+  /// Checks whether the current server got a reply from the "future", i.e. a
+  /// reply with a higher term. If so, the current server falls back to
+  /// follower mode and updates its current term.
+  ///
+  /// @param reply_term Term from the RPC response.
+  /// @return true if the current server's term lags behind.
+  bool OutOfSync(uint64_t reply_term);
+
+  /// Deletes log entries with indexes that are greater than or equal to the
+  /// given starting index.
+  ///
+  /// @param starting_index Smallest index which will be deleted from the log.
+  ///                       Also a friendly reminder that log entries are
+  ///                       1-indexed.
+  void DeleteLogSuffix(int starting_index);
+
+  /// Appends new log entries to the Raft log. Note that this function is not
+  /// smart in any way, i.e. the caller should make sure that it's safe to
+  /// call this function. This function also updates this server's commit
+  /// index if necessary.
+  ///
+  /// @param leader_commit_index Used to update the local commit index.
+  /// @param new_entries New `LogEntry` instances to be appended to the log.
+  void AppendLogEntries(uint64_t leader_commit_index,
+                        const std::vector<LogEntry> &new_entries);
+
+  /// Serializes the Raft log into a `std::string`.
+  std::string SerializeLog(const std::vector<LogEntry> &log);
+
+  /// Deserializes the Raft log from a `std::string`.
+  std::vector<LogEntry> DeserializeLog(const std::string &serialized_log);
 };
 
 }  // namespace raft