Add Start/Stop methods to RaftServer

Summary:
Explicitly start and stop raft server.
This way we can be sure that raft won't try to use coordination after it's
shutdown, and we can define the start of th raft protocol easier.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1766
This commit is contained in:
Matija Santl 2018-12-11 10:51:37 +01:00
parent 8e796e9fd1
commit 8e35d8afdc
3 changed files with 35 additions and 22 deletions

View File

@ -19,6 +19,7 @@ GraphDb::GraphDb(Config config) : config_(config) {}
void GraphDb::Start() {
utils::CheckDir(config_.durability_directory);
raft_server_.Start();
CHECK(coordination_.Start()) << "Couldn't start coordination!";
// Start transaction killer.
@ -62,7 +63,10 @@ bool GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
return ret;
}
void GraphDb::Shutdown() { coordination_.Shutdown(); }
void GraphDb::Shutdown() {
raft_server_.Shutdown();
coordination_.Shutdown();
}
std::unique_ptr<GraphDbAccessor> GraphDb::Access() {
// NOTE: We are doing a heap allocation to allow polymorphism. If this poses

View File

@ -27,7 +27,9 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
mode_(Mode::FOLLOWER),
server_id_(server_id),
disk_storage_(fs::path(durability_dir) / kRaftDir),
reset_callback_(reset_callback) {
reset_callback_(reset_callback) {}
void RaftServer::Start() {
// Persistent storage initialization/recovery.
if (Log().empty()) {
UpdateTerm(0);
@ -36,14 +38,14 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
}
// Peer state
int cluster_size = coordination_->WorkerCount();
int cluster_size = coordination_->WorkerCount() + 1;
next_index_.resize(cluster_size);
match_index_.resize(cluster_size);
next_heartbeat_.resize(cluster_size);
backoff_until_.resize(cluster_size);
// RPC registration
coordination->Register<RequestVoteRpc>(
coordination_->Register<RequestVoteRpc>(
[this](const auto &req_reader, auto *res_builder) {
std::lock_guard<std::mutex> guard(lock_);
RequestVoteReq req;
@ -64,8 +66,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER)
Transition(Mode::FOLLOWER);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
// [Raft paper 5.2, 5.4]
@ -105,8 +106,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER)
Transition(Mode::FOLLOWER);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
// respond positively to a heartbeat.
@ -162,11 +162,14 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
}
}
RaftServer::~RaftServer() {
void RaftServer::Shutdown() {
{
std::lock_guard<std::mutex> guard(lock_);
exiting_ = true;
state_changed_.notify_all();
election_change_.notify_all();
}
for (auto &peer_thread : peer_threads_) {
if (peer_thread.joinable()) peer_thread.join();
@ -449,6 +452,8 @@ void RaftServer::PeerThreadMain(int peer_id) {
});
next_heartbeat_[peer_id] =
Clock::now() + config_.heartbeat_interval;
state_changed_.notify_all();
continue;
}
wait_until = next_heartbeat_[peer_id];
break;

View File

@ -55,7 +55,11 @@ class RaftServer final : public RaftInterface {
const Config &config, raft::Coordination *coordination,
std::function<void(void)> reset_callback);
~RaftServer();
/// Starts the RPC servers and starts mechanisms inside Raft protocol.
void Start();
/// Stops all threads responsible for the Raft protocol.
void Shutdown();
/// Retrieves the current term from persistent storage.
///