From d637629078d3c154fcf13a8d7b0fae5de49a9a9e Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Fri, 7 Dec 2018 16:19:03 +0100 Subject: [PATCH] Implement Raft RPC, log serialization for disk storage and leader election Summary: This diff contains a rough implementation of the Raft protocol which ends at leader election. Reviewers: msantl Reviewed By: msantl Subscribers: teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1744 --- src/raft/config.hpp | 19 +- src/raft/coordination.cpp | 11 + src/raft/coordination.hpp | 23 +- src/raft/exceptions.hpp | 35 +-- src/raft/log_entry.lcp | 3 +- src/raft/raft_rpc_messages.lcp | 2 +- src/raft/raft_server.cpp | 468 ++++++++++++++++++++++++++++++++- src/raft/raft_server.hpp | 153 ++++++++++- 8 files changed, 658 insertions(+), 56 deletions(-) diff --git a/src/raft/config.hpp b/src/raft/config.hpp index 2800e94dc..3942edc39 100644 --- a/src/raft/config.hpp +++ b/src/raft/config.hpp @@ -16,8 +16,8 @@ namespace raft { /// Configurable Raft parameters. struct Config { - std::chrono::milliseconds leader_timeout_min; - std::chrono::milliseconds leader_timeout_max; + std::chrono::milliseconds election_timeout_min; + std::chrono::milliseconds election_timeout_max; std::chrono::milliseconds heartbeat_interval; std::chrono::milliseconds replicate_timeout; @@ -34,19 +34,22 @@ struct Config { } if (!data.is_object()) throw RaftConfigException(raft_config_file); - if (!data["leader_timeout_min"].is_number()) + if (!data["election_timeout_min"].is_number()) throw RaftConfigException(raft_config_file); - if (!data["leader_timeout_max"].is_number()) + if (!data["election_timeout_max"].is_number()) throw RaftConfigException(raft_config_file); if (!data["heartbeat_interval"].is_number()) throw RaftConfigException(raft_config_file); if (!data["replicate_timeout"].is_number()) throw RaftConfigException(raft_config_file); - return Config{std::chrono::duration(data["leader_timeout_min"]), - std::chrono::duration(data["leader_timeout_max"]), - std::chrono::duration(data["heartbeat_interval"]), - std::chrono::duration(data["replicate_timeout"])}; + return Config{ + std::chrono::duration( + data["election_timeout_min"]), + std::chrono::duration( + data["election_timeout_max"]), + std::chrono::duration(data["heartbeat_interval"]), + std::chrono::duration(data["replicate_timeout"])}; } }; diff --git a/src/raft/coordination.cpp b/src/raft/coordination.cpp index 81117dbb8..5a318724e 100644 --- a/src/raft/coordination.cpp +++ b/src/raft/coordination.cpp @@ -64,6 +64,13 @@ io::network::Endpoint Coordination::GetServerEndpoint() { return server_.endpoint(); } +std::vector Coordination::GetWorkerIds() { + std::lock_guard guard(lock_); + std::vector worker_ids; + for (auto worker : workers_) worker_ids.push_back(worker.first); + return worker_ids; +} + communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) { std::lock_guard guard(lock_); auto found = client_pools_.find(worker_id); @@ -79,6 +86,10 @@ communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) { .first->second; } +uint16_t Coordination::WorkerCount() { + return workers_.size(); +} + bool Coordination::Start() { if (!server_.Start()) return false; AddWorker(worker_id_, server_.endpoint()); diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp index 059f07427..1aed88c17 100644 --- a/src/raft/coordination.hpp +++ b/src/raft/coordination.hpp @@ -15,7 +15,6 @@ #include "communication/rpc/server.hpp" #include "io/network/endpoint.hpp" #include "raft/exceptions.hpp" -#include "utils/future.hpp" #include "utils/thread.hpp" namespace raft { @@ -55,11 +54,16 @@ class Coordination final { /// Gets the endpoint for this RPC server. io::network::Endpoint GetServerEndpoint(); + /// Returns all workers ids. + std::vector GetWorkerIds(); + /// Returns a cached `ClientPool` for the given `worker_id`. communication::rpc::ClientPool *GetClientPool(int worker_id); + uint16_t WorkerCount(); + /// Asynchronously executes the given function on the RPC client for the - /// given worker id. Returns an `utils::Future` of the given `execute` + /// given worker id. Returns an `std::future` of the given `execute` /// function's return type. template auto ExecuteOnWorker( @@ -69,6 +73,21 @@ class Coordination final { auto client_pool = GetClientPool(worker_id); return thread_pool_.Run(execute, worker_id, std::ref(*client_pool)); } + /// Asynchroniously executes the `execute` function on all worker rpc clients + /// except the one whose id is `skip_worker_id`. Returns a vector of futures + /// contaning the results of the `execute` function. + template + auto ExecuteOnWorkers( + int skip_worker_id, + std::function + execute) { + std::vector> futures; + for (auto &worker_id : GetWorkerIds()) { + if (worker_id == skip_worker_id) continue; + futures.emplace_back(std::move(ExecuteOnWorker(worker_id, execute))); + } + return futures; + } template void Register(std::function< diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp index 0184abb7b..fab721b11 100644 --- a/src/raft/exceptions.hpp +++ b/src/raft/exceptions.hpp @@ -6,19 +6,15 @@ namespace raft { -/** - * Base exception class used for all exceptions that can occur within the - * Raft protocol. - */ +/// Base exception class used for all exceptions that can occur within the +/// Raft protocol. class RaftException : public utils::BasicException { public: using utils::BasicException::BasicException; }; -/** - * This exception should be thrown when attempting to transition between - * incompatible states, e.g. from `FOLLOWER` to `LEADER`. - */ +/// This exception should be thrown when attempting to transition between +/// incompatible states, e.g. from `FOLLOWER` to `LEADER`. class InvalidTransitionException : public RaftException { public: using RaftException::RaftException; @@ -28,10 +24,8 @@ class InvalidTransitionException : public RaftException { new_mode) {} }; -/** - * Exception used to indicate something is wrong with the raft config provided - * by the user. - */ +/// Exception used to indicate something is wrong with the raft config provided +/// by the user. class RaftConfigException : public RaftException { public: using RaftException::RaftException; @@ -39,10 +33,8 @@ class RaftConfigException : public RaftException { : RaftException("Unable to parse raft config file " + path) {} }; -/** - * Exception used to indicate something is wrong with the coordination config - * provided by the user. - */ +/// Exception used to indicate something is wrong with the coordination config +/// provided by the user. class RaftCoordinationConfigException : public RaftException { public: using RaftException::RaftException; @@ -51,4 +43,15 @@ class RaftCoordinationConfigException : public RaftException { } }; +/// This exception should be thrown when a `RaftServer` instance attempts +/// to read data from persistent storage which is missing. +class MissingPersistentDataException : public RaftException { + public: + using RaftException::RaftException; + explicit MissingPersistentDataException(const std::string &key) + : RaftException( + "Attempting to read non-existing persistent data under key: " + + key) {} +}; + } // namespace raft diff --git a/src/raft/log_entry.lcp b/src/raft/log_entry.lcp index fcef95dfc..eb3936d1f 100644 --- a/src/raft/log_entry.lcp +++ b/src/raft/log_entry.lcp @@ -13,7 +13,8 @@ cpp<# (lcp:capnp-import 'database "/database/single_node_ha/serialization.capnp") (lcp:define-struct log-entry () - ((deltas "std::vector" :capnp-type "List(Database.StateDelta)")) + ((term :uint64_t) + (deltas "std::vector" :capnp-type "List(Database.StateDelta)")) (:serialize (:slk) (:capnp))) (lcp:pop-namespace) ;; raft diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp index 0311309f7..779d6e1c8 100644 --- a/src/raft/raft_rpc_messages.lcp +++ b/src/raft/raft_rpc_messages.lcp @@ -27,7 +27,7 @@ cpp<# (lcp:define-rpc append-entries (:request ((leader-id :uint16_t) - (leader-commit :uint16_t) + (leader-commit :uint64_t) (term :uint64_t) (prev-log-index :uint64_t) (prev-log-term :uint64_t) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 449c239ac..d3730f8ef 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -1,45 +1,186 @@ #include "raft/raft_server.hpp" #include +#include #include #include -#include "raft/coordination.hpp" #include "raft/exceptions.hpp" -#include "raft/raft_rpc_messages.hpp" #include "utils/exceptions.hpp" +#include "utils/serialization.hpp" namespace raft { namespace fs = std::experimental::filesystem; -const std::string kRaftDir = "raft"; +const std::string kCurrentTermKey = "current_term"; +const std::string kVotedForKey = "voted_for"; +const std::string kLogKey = "log"; +const std::string kRaftDir = "raft"; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, Coordination *coordination) - : server_id_(server_id), - config_(config), + : config_(config), coordination_(coordination), + mode_(Mode::FOLLOWER), + server_id_(server_id), disk_storage_(fs::path(durability_dir) / kRaftDir) { - coordination_->Register( + + // Persistent storage initialization/recovery. + if (Log().empty()) { + disk_storage_.Put(kCurrentTermKey, "0"); + } else { + Recover(); + } + + // Peer state + int cluster_size = coordination_->WorkerCount(); + next_index_.resize(cluster_size); + match_index_.resize(cluster_size); + next_heartbeat_.resize(cluster_size); + backoff_until_.resize(cluster_size); + + // RPC registration + coordination->Register( [this](const auto &req_reader, auto *res_builder) { - throw utils::NotYetImplemented("RaftServer constructor"); + std::lock_guard guard(lock_); + RequestVoteReq req; + Load(&req, req_reader); + + // [Raft paper 5.1] + // "If a server recieves a request with a stale term, + // it rejects the request" + uint64_t current_term = CurrentTerm(); + if (req.term < current_term) { + RequestVoteRes res(false, current_term); + Save(res, res_builder); + return; + } + + // [Raft paper 5.2, 5.4] + // "Each server will vote for at most one candidate in a given + // term, on a first-come-first-serve basis with an additional + // restriction on votes" + // Restriction: "The voter denies its vote if its own log is more + // up-to-date than that of the candidate" + std::experimental::optional voted_for = VotedFor(); + auto last_entry_data = LastEntryData(); + RequestVoteRes res( + (!voted_for || voted_for.value() == req.candidate_id) && + AtLeastUpToDate(req.last_log_index, req.last_log_term, + last_entry_data.first, last_entry_data.second), + current_term); + Save(res, res_builder); }); coordination_->Register( [this](const auto &req_reader, auto *res_builder) { - throw utils::NotYetImplemented("RaftServer constructor"); + std::lock_guard guard(lock_); + AppendEntriesReq req; + Load(&req, req_reader); + + // [Raft paper 5.1] + // "If a server recieves a request with a stale term, + // it rejects the request" + uint64_t current_term = CurrentTerm(); + if (req.term < current_term) { + AppendEntriesRes res(false, current_term); + Save(res, res_builder); + return; + } + + // respond positively to a heartbeat. + // TODO(ipaljak) review this when implementing log replication. + if (req.entries.empty()) { + AppendEntriesRes res(true, current_term); + Save(res, res_builder); + if (mode_ != Mode::FOLLOWER) { + Transition(Mode::FOLLOWER); + state_changed_.notify_all(); + } else { + SetNextElectionTimePoint(); + } + return; + } + + throw utils::NotYetImplemented( + "AppendEntriesRpc which is not a heartbeat"); + + // [Raft paper 5.3] + // "If a follower's log is inconsistent with the leader's, the + // consistency check will fail in the next AppendEntries RPC." + // + // Consistency checking assures the Log Matching Property: + // - If two entries in different logs have the same index and + // term, then they store the same command. + // - If two entries in different logs have the same index and term, + // then the logs are identical in all preceding entries. + auto log = Log(); + if (log.size() < req.prev_log_index || + log[req.prev_log_index - 1].term != req.prev_log_term) { + AppendEntriesRes res(false, current_term); + Save(res, res_builder); + return; + } + + // If existing entry conflicts with new one, we need to delete the + // existing entry and all that follow it. + if (log.size() > req.prev_log_index && + log[req.prev_log_index].term != req.entries[0].term) + DeleteLogSuffix(req.prev_log_index + 1); + + AppendLogEntries(req.leader_commit, req.entries); + AppendEntriesRes res(true, current_term); + Save(res, res_builder); }); + + SetNextElectionTimePoint(); + election_thread_ = + std::thread(&RaftServer::ElectionThreadMain, this); + + for (const auto &peer_id : coordination_->GetWorkerIds()) { + if (peer_id == server_id_) continue; + peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id); + } } -void RaftServer::Transition(const Mode &new_mode) { - if (new_mode == Mode::LEADER) - log_entry_buffer_.Enable(); - else - log_entry_buffer_.Disable(); +RaftServer::~RaftServer() { + exiting_ = true; - throw utils::NotYetImplemented("RaftServer transition"); + state_changed_.notify_all(); + election_change_.notify_all(); + + for (auto &peer_thread : peer_threads_) { + if (peer_thread.joinable()) + peer_thread.join(); + } + + if (election_thread_.joinable()) + election_thread_.join(); +} + +uint64_t RaftServer::CurrentTerm() { + auto opt_value = disk_storage_.Get(kCurrentTermKey); + if (opt_value == std::experimental::nullopt) + throw MissingPersistentDataException(kCurrentTermKey); + return std::stoull(opt_value.value()); +} + +std::experimental::optional RaftServer::VotedFor() { + auto opt_value = disk_storage_.Get(kVotedForKey); + if (opt_value == std::experimental::nullopt) + return std::experimental::nullopt; + return {std::stoul(opt_value.value())}; +} + +std::vector RaftServer::Log() { + auto opt_value = disk_storage_.Get(kLogKey); + if (opt_value == std::experimental::nullopt) { + disk_storage_.Put(kLogKey, SerializeLog({})); + return {}; + } + return DeserializeLog(opt_value.value()); } void RaftServer::Replicate(const std::vector &log) { @@ -107,4 +248,303 @@ bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd( } } +void RaftServer::Transition(const Mode &new_mode) { + switch (new_mode) { + case Mode::FOLLOWER: { + LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER"; + SetNextElectionTimePoint(); + mode_ = Mode::FOLLOWER; + //log_entry_buffer_.Disable(); + break; + } + + case Mode::CANDIDATE: { + LOG(INFO) << "Server " << server_id_ << ": Transition to CANDIDATE"; + //log_entry_buffer_.Disable(); + + // [Raft thesis, section 3.4] + // "Each candidate restarts its randomized election timeout at the start + // of an election, and it waits for that timeout to elapse before + // starting the next election; this reduces the likelihood of another + // split vote in the new election." + SetNextElectionTimePoint(); + election_change_.notify_all(); + + // [Raft thesis, section 3.4] + // "To begin an election, a follower increments its current term and + // transitions to candidate state. It then votes for itself and issues + // RequestVote RPCs in parallel to each of the other servers in the + // cluster." + disk_storage_.Put(kCurrentTermKey, std::to_string(CurrentTerm() + 1)); + disk_storage_.Put(kVotedForKey, std::to_string(server_id_)); + + granted_votes_ = 1; + vote_requested_.assign(coordination_->WorkerCount(), false); + + mode_ = Mode::CANDIDATE; + + if (HasMajortyVote()) { + Transition(Mode::LEADER); + state_changed_.notify_all(); + return; + } + + break; + } + + case Mode::LEADER: { + LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER"; + //log_entry_buffer_.Enable(); + + // Freeze election timer + next_election_ = TimePoint::max(); + election_change_.notify_all(); + + // Set next heartbeat to correct values + TimePoint now = Clock::now(); + for (auto &peer_heartbeat : next_heartbeat_) + peer_heartbeat = now + config_.heartbeat_interval; + + mode_ = Mode::LEADER; + + // TODO(ipaljak): Implement no-op replication. For now, we are only + // sending heartbeats. + break; + } + } +} + +void RaftServer::Recover() { + throw utils::NotYetImplemented("RaftServer recover"); +} + +void RaftServer::ElectionThreadMain() { + std::unique_lock lock(lock_); + while (!exiting_) { + if (Clock::now() >= next_election_) { + LOG(INFO) << "Server " << server_id_ << ": Election timeout exceeded"; + Transition(Mode::CANDIDATE); + state_changed_.notify_all(); + } + election_change_.wait_until(lock, next_election_); + } +} + +void RaftServer::PeerThreadMain(int peer_id) { + std::unique_lock lock(lock_); + + /* This loop will either call a function that issues an RPC or wait on the + * condition variable. It must not do both! Lock on `mutex_` is released + * while waiting for RPC response, which might cause us to miss a + * notification on `state_changed_` conditional variable and wait + * indefinitely. The safest thing to do is to assume some important part of + * state was modified while we were waiting for the response and loop around + * to check. */ + while (!exiting_) { + TimePoint now = Clock::now(); + TimePoint wait_until; + + if (mode_ != Mode::FOLLOWER && backoff_until_[peer_id] > now) { + wait_until = backoff_until_[peer_id]; + } else { + switch (mode_) { + case Mode::FOLLOWER: { + wait_until = TimePoint::max(); + break; + } + + case Mode::CANDIDATE: { + if (vote_requested_[peer_id]) break; + + // TODO(ipaljak): Consider backoff. + wait_until = TimePoint::max(); + + auto request_term = CurrentTerm(); + auto peer_future = coordination_->ExecuteOnWorker( + peer_id, [&](int worker_id, auto &client) { + auto last_entry_data = LastEntryData(); + try { + auto res = client.template Call( + server_id_, request_term, last_entry_data.first, + last_entry_data.second); + return res; + } catch (...) { + // not being able to connect to peer defaults to a vote + // being denied from that peer. This is correct but not + // optimal. + // + // TODO(ipaljak): reconsider this decision :) + return RequestVoteRes(false, request_term); + } + }); + lock.unlock(); // Release lock while waiting for response + auto reply = peer_future.get(); + lock.lock(); + + if (CurrentTerm() != request_term || mode_ != Mode::CANDIDATE || + exiting_) { + LOG(INFO) << "Server " << server_id_ + << ": Ignoring RequestVoteRPC reply from " << peer_id; + break; + } + + if (OutOfSync(reply.term)) { + state_changed_.notify_all(); + continue; + } + + vote_requested_[peer_id] = true; + + if (reply.vote_granted) { + LOG(INFO) << "Server " << server_id_ << ": Got vote from " + << peer_id; + ++granted_votes_; + if (HasMajortyVote()) Transition(Mode::LEADER); + } else { + LOG(INFO) << "Server " << server_id_ << ": Denied vote from " + << peer_id; + } + + state_changed_.notify_all(); + continue; + } + + case Mode::LEADER: { + if (now >= next_heartbeat_[peer_id]) { + LOG(INFO) << "Server " << server_id_ << ": Send HB to server " + << peer_id; + auto peer_future = coordination_->ExecuteOnWorker( + peer_id, [&](int worker_id, auto &client) { + auto last_entry_data = LastEntryData(); + std::vector empty_entries; + auto res = client.template Call( + server_id_, commit_index_, CurrentTerm(), + last_entry_data.first, last_entry_data.second, + empty_entries); + return res; + }); + next_heartbeat_[peer_id] = + Clock::now() + config_.heartbeat_interval; + } + wait_until = next_heartbeat_[peer_id]; + break; + } + } + } + + state_changed_.wait_until(lock, wait_until); + } +} + +void RaftServer::SetNextElectionTimePoint() { + // [Raft thesis, section 3.4] + // "Raft uses randomized election timeouts to ensure that split votes are + // rare and that they are resolved quickly. To prevent split votes in the + // first place, election timeouts are chosen randomly from a fixed interval + // (e.g., 150-300 ms)." + std::uniform_int_distribution distribution( + config_.election_timeout_min.count(), + config_.election_timeout_max.count()); + Clock::duration wait_interval = std::chrono::milliseconds(distribution(rng_)); + next_election_ = Clock::now() + wait_interval; +} + +bool RaftServer::HasMajortyVote() { + if (2 * granted_votes_ > coordination_->WorkerCount()) { + LOG(INFO) << "Server " << server_id_ << ": Obtained majority vote"; + return true; + } + return false; +} + +std::pair RaftServer::LastEntryData() { + auto log = Log(); + if (log.empty()) return {0, 0}; + return {log.size(), log.back().term}; +} + +bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a, + uint64_t last_log_term_a, + uint64_t last_log_index_b, + uint64_t last_log_term_b) { + if (last_log_term_a == last_log_term_b) + return last_log_index_a >= last_log_index_b; + return last_log_term_a > last_log_term_b; +} + +bool RaftServer::OutOfSync(uint64_t reply_term) { + DCHECK(mode_ != Mode::FOLLOWER) << "`OutOfSync` called from FOLLOWER mode"; + + // [Raft thesis, Section 3.3] + // "Current terms are exchanged whenever servers communicate; if one + // server's current term is smaller than the other's, then it updates + // its current term to the larger value. If a candidate or leader + // discovers that its term is out of date, it immediately reverts to + // follower state." + if (CurrentTerm() < reply_term) { + disk_storage_.Put(kCurrentTermKey, std::to_string(reply_term)); + disk_storage_.Delete(kVotedForKey); + granted_votes_ = 0; + Transition(Mode::FOLLOWER); + return true; + } + return false; +} + +void RaftServer::DeleteLogSuffix(int starting_index) { + auto log = Log(); + log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed + disk_storage_.Put(kLogKey, SerializeLog(log)); +} + +void RaftServer::AppendLogEntries(uint64_t leader_commit_index, + const std::vector &new_entries) { + auto log = Log(); + for (auto &entry : new_entries) log.emplace_back(entry); + // See Raft paper 5.3 + if (leader_commit_index > commit_index_) + commit_index_ = std::min(leader_commit_index, log.size()); + disk_storage_.Put(kLogKey, SerializeLog(log)); +} + +std::string RaftServer::SerializeLog(const std::vector &log) { + ::capnp::MallocMessageBuilder message; + auto log_builder = + message.initRoot<::capnp::List>(log.size()); + utils::SaveVector( + log, &log_builder, [](auto *log_builder, const auto &log_entry) { + Save(log_entry, log_builder); + }); + + std::stringstream stream(std::ios_base::in | std::ios_base::out | + std::ios_base::binary); + kj::std::StdOutputStream std_stream(stream); + kj::BufferedOutputStreamWrapper buffered_stream(std_stream); + writeMessage(buffered_stream, message); + return stream.str(); +} + +std::vector RaftServer::DeserializeLog( + const std::string &serialized_log) { + if (serialized_log.empty()) return {}; + ::capnp::MallocMessageBuilder message; + std::stringstream stream(std::ios_base::in | std::ios_base::out | + std::ios_base::binary); + kj::std::StdInputStream std_stream(stream); + kj::BufferedInputStreamWrapper buffered_stream(std_stream); + stream << serialized_log; + readMessageCopy(buffered_stream, message); + + ::capnp::List::Reader log_reader = + message.getRoot<::capnp::List>().asReader(); + std::vector deserialized_log; + utils::LoadVector(&deserialized_log, log_reader, + [](const auto &log_reader) { + LogEntry log_entry; + Load(&log_entry, log_reader); + return log_entry; + }); + return deserialized_log; +} + } // namespace raft diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 732509e9d..2a5f747cf 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -8,13 +8,17 @@ #include "durability/single_node_ha/state_delta.hpp" #include "raft/config.hpp" +#include "raft/coordination.hpp" +#include "raft/log_entry.hpp" +#include "raft/raft_rpc_messages.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "transactions/type.hpp" +#include "utils/scheduler.hpp" namespace raft { -// Forward declaration. -class Coordination; +using Clock = std::chrono::system_clock; +using TimePoint = std::chrono::system_clock::time_point; enum class Mode { FOLLOWER, CANDIDATE, LEADER }; @@ -26,20 +30,37 @@ class RaftServer { public: RaftServer() = delete; - /** - * The implementation assumes that server IDs are unique integers between - * ranging from 1 to cluster_size. - * - * @param server_id ID of the current server. - * @param durbility_dir directory for persisted data. - * @param config raft configuration. - * @param coordination Abstraction for coordination between Raft servers. - */ + /// The implementation assumes that server IDs are unique integers between + /// ranging from 1 to cluster_size. + /// + /// @param server_id ID of the current server. + /// @param durbility_dir directory for persisted data. + /// @param config raft configuration. + /// @param coordination Abstraction for coordination between Raft servers. RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, raft::Coordination *coordination); + ~RaftServer(); + + /// Retrieves the current term from persistent storage. + /// + /// @throws MissingPersistentDataException + uint64_t CurrentTerm(); + + /// Retrieves the ID of the server this server has voted for in + /// the current term from persistent storage. Returns std::nullopt + /// if such server doesn't exist. + std::experimental::optional VotedFor(); + + /// Retrieves the log entries from persistent storage. The log is 1-indexed + /// in order to be consistent with the paper. If the Log isn't present in + /// persistent storage, an empty Log will be created. + std::vector Log(); + + // TODO(msantl): document void Replicate(const std::vector &log); + // TODO(msantl): document void Emplace(const database::StateDelta &delta); private: @@ -80,15 +101,17 @@ class RaftServer { bool IsStateDeltaTransactionEnd(const database::StateDelta &delta); }; + mutable std::mutex lock_; ///< Guards all internal state. + ////////////////////////////////////////////////////////////////////////////// // volatile state on all servers ////////////////////////////////////////////////////////////////////////////// - uint16_t server_id_; ///< ID of the current server. Config config_; ///< Raft config. Coordination *coordination_{nullptr}; ///< Cluster coordination. Mode mode_; ///< Server's current mode. + uint16_t server_id_; ///< ID of the current server. uint64_t commit_index_; ///< Index of the highest known committed entry. uint64_t last_applied_; ///< Index of the highest applied entry to SM. @@ -99,6 +122,33 @@ class RaftServer { /// log is ready for replication it will be discarded anyway. LogEntryBuffer log_entry_buffer_{this}; + std::vector peer_threads_; ///< One thread per peer which + ///< handles outgoing RPCs. + + std::condition_variable state_changed_; ///< Notifies all peer threads on + ///< relevant state change. + + bool exiting_ = false; ///< True on server shutdown. + + ////////////////////////////////////////////////////////////////////////////// + // volatile state on followers and candidates + ////////////////////////////////////////////////////////////////////////////// + + std::thread election_thread_; ///< Timer thread for triggering elections. + TimePoint next_election_; ///< Next election `TimePoint`. + + std::condition_variable election_change_; ///> Used to notify election_thread + ///> on next_election_ change. + + std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}()); + + ////////////////////////////////////////////////////////////////////////////// + // volatile state on candidates + ////////////////////////////////////////////////////////////////////////////// + + uint16_t granted_votes_; + std::vector vote_requested_; + ////////////////////////////////////////////////////////////////////////////// // volatile state on leaders ////////////////////////////////////////////////////////////////////////////// @@ -110,6 +160,10 @@ class RaftServer { ///< highest log entry known to be ///< replicated on server. + std::vector next_heartbeat_; ///< for each server, time point for + ///< the next heartbeat. + std::vector backoff_until_; ///< backoff for each server. + ////////////////////////////////////////////////////////////////////////////// // persistent state on all servers // @@ -119,11 +173,82 @@ class RaftServer { // term (null if none). // - vector log -- log entries. ////////////////////////////////////////////////////////////////////////////// + storage::KVStore disk_storage_; /// Makes a transition to a new `raft::Mode`. /// - /// @throws InvalidTransitionException when transitioning between incompatible - void Transition(const Mode &new_mode); + /// throws InvalidTransitionException when transitioning between incompatible + /// `raft::Mode`s. + void Transition(const raft::Mode &new_mode); + + /// Recovers from persistent storage. This function should be called from + /// the constructor before the server starts with normal operation. + void Recover(); + + /// Main function of the `election_thread_`. It is responsible for + /// transition to CANDIDATE mode when election timeout elapses. + void ElectionThreadMain(); + + /// Main function of the thread that handles outgoing RPCs towards a + /// specified node within the Raft cluster. + /// + /// @param peer_id - ID of a receiving node in the cluster. + void PeerThreadMain(int peer_id); + + /// Sets the `TimePoint` for next election. + void SetNextElectionTimePoint(); + + /// Checks if the current server obtained enough votes to become a leader. + bool HasMajortyVote(); + + /// Returns relevant metadata about the last entry in this server's Raft Log. + /// More precisely, returns a pair consisting of an index of the last entry + /// in the log and the term of the last entry in the log. + /// + /// @return std::pair + std::pair LastEntryData(); + + /// Checks whether Raft log of server A is at least as up-to-date as the Raft + /// log of server B. This is strictly defined in Raft paper 5.4. + /// + /// @param last_log_index_a - Index of server A's last log entry. + /// @param last_log_term_a - Term of server A's last log entry. + /// @param last_log_index_b - Index of server B's last log entry. + /// @param last_log_term_b - Term of server B's last log entry. + bool AtLeastUpToDate(uint64_t last_log_index_a, uint64_t last_log_term_a, + uint64_t last_log_index_b, uint64_t last_log_term_b); + + /// Checks whether the current server got a reply from "future", i.e. reply + /// with a higher term. If so, the current server falls back to follower mode + /// and updates its current term. + /// + /// @param reply_term Term from RPC response. + /// @return true if the current server's term lags behind. + bool OutOfSync(uint64_t reply_term); + + /// Deletes log entries with indexes that are greater or equal to the given + /// starting index. + /// + /// @param starting_index Smallest index which will be deleted from the Log. + /// Also, a friendly remainder that log entries are + /// 1-indexed. + void DeleteLogSuffix(int starting_index); + + /// Appends new log entries to Raft log. Note that this function is not + /// smart in any way, i.e. the caller should make sure that it's safe + /// to call this function. This function also updates this server's commit + /// index if necessary. + /// + /// @param leader_commit_index - Used to update local commit index. + /// @param new_entries - New `LogEntry` instances to be appended in the log. + void AppendLogEntries(uint64_t leader_commit_index, + const std::vector &new_entries); + + /// Serializes Raft log into `std::string`. + std::string SerializeLog(const std::vector &log); + + /// Deserializes Raft log from `std::string`. + std::vector DeserializeLog(const std::string &serialized_log); }; } // namespace raft