Raft election

Summary: Implemented the leader election part of the Raft protocol

Reviewers: mislav.bradac, buda, teon.banek, dgleich

Reviewed By: mislav.bradac

Subscribers: dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D966
This commit is contained in:
Marin Tomic 2017-11-07 18:21:05 +01:00
parent 1e0ac8ab8f
commit 41f868319d
8 changed files with 739 additions and 7 deletions

View File

@ -187,6 +187,7 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
${src_dir}/communication/bolt/v1/session.cpp
${src_dir}/communication/reactor/protocol.cpp
${src_dir}/communication/raft/raft.cpp
${src_dir}/communication/reactor/reactor_local.cpp
${src_dir}/communication/reactor/reactor_distributed.cpp
${src_dir}/data_structures/concurrent/skiplist_gc.cpp

View File

@ -0,0 +1,277 @@
#include "raft.hpp"
#include <iostream>
#include "fmt/format.h"
#include "glog/logging.h"
#include "communication/raft/raft_network.hpp"
using std::experimental::optional;
using std::placeholders::_1;
using std::placeholders::_2;
using namespace communication::reactor;
using namespace std::chrono_literals;
namespace communication::raft {
/**
 * Creates a Raft member that starts in follower mode and spawns its reactor.
 *
 * @param system  Reactor system used to spawn this member's reactor and to
 *                look up its "main" channel.
 * @param id      Identifier of this member; also used as the reactor name.
 * @param config  Cluster membership and timing settings (stored by copy).
 * @param network Transport used for every outgoing Raft message.
 */
RaftMember::RaftMember(System &system, const std::string &id,
                       const RaftConfig &config, RaftNetworkInterface &network)
    : id_(id),
      system_(system),
      config_(config),
      network_(network),
      mode_(Mode::FOLLOWER),
      /* Leader watchdog: when it fires it posts MLeaderTimeout to our own
       * "main" channel (if that channel can be found), which in turn starts an
       * election on the reactor. */
      leader_watchdog_(config_.leader_timeout_min, config_.leader_timeout_max,
                       [this]() {
                         if (auto self = system_.FindChannel(id_, "main")) {
                           self->Send<MLeaderTimeout>();
                         }
                       }),
      /* Heartbeat watchdog: sends AppendEntries to every other member. It is
       * created blocked (last argument) and gets unblocked only when this
       * member becomes the leader. */
      heartbeat_watchdog_(
          config_.heartbeat_interval, config_.heartbeat_interval,
          [this]() {
            for (const auto &peer : config_.members) {
              if (peer == id_) {
                continue;
              }
              network_.AppendEntries(peer, MAppendEntries(term_, id_));
            }
          },
          true) {
  system.Spawn(id, [this](Reactor &reactor) {
    EventStream *events = reactor.main_.first;

    /* Leader went silent (or the running election timed out). */
    events->OnEvent<MLeaderTimeout>(
        [this](const MLeaderTimeout &, const Subscription &) {
          RunElection();
        });

    /* Requests are answered straight back to their sender. */
    events->OnEvent<MRequestVote>(
        [this](const MRequestVote &request, const Subscription &) {
          network_.RequestVoteReply(request.sender_id, OnRequestVote(request));
        });
    events->OnEvent<MRequestVoteReply>(
        [this](const MRequestVoteReply &reply, const Subscription &) {
          OnRequestVoteReply(reply);
        });
    events->OnEvent<MAppendEntries>(
        [this](const MAppendEntries &request, const Subscription &) {
          network_.AppendEntriesReply(request.sender_id,
                                      OnAppendEntries(request));
        });
    events->OnEvent<MAppendEntriesReply>(
        [this](const MAppendEntriesReply &reply, const Subscription &) {
          OnAppendEntriesReply(reply);
        });

    /* Sent by the destructor; closes the reactor's "main" channel. */
    events->OnEvent<MShutdown>(
        [&reactor](const MShutdown &, const Subscription &) {
          reactor.CloseChannel("main");
        });
  });
}
/**
 * Logs the shutdown and posts MShutdown to this member's "main" channel,
 * provided the channel can still be found in the reactor system.
 */
RaftMember::~RaftMember() {
  LogInfo("Shutting down...");
  if (auto self = system_.FindChannel(id_, "main")) {
    self->Send<MShutdown>();
  }
}
/**
 * Writes an INFO log record prefixed with this member's id and current term.
 *
 * @param format fmt-style format string for the message body.
 * @param args   Values substituted into `format`.
 */
template <class... Args>
void RaftMember::LogInfo(const std::string &format, Args &&... args) {
  /* The prefix and the body are formatted separately so that braces inside
   * `id_` can never be interpreted as placeholders of `format`.
   * No std::endl here: glog terminates each record itself, so the manipulator
   * only added a redundant flush of the message stream. */
  LOG(INFO) << fmt::format("(node = {}, term = {}) ", id_, term_)
            << fmt::format(format, std::forward<Args>(args)...);
}
/**
 * Switches this member to follower mode: heartbeats to other members stop and
 * the leader watchdog starts listening for the leader's heartbeats again.
 */
void RaftMember::TransitionToFollower() {
  mode_ = Mode::FOLLOWER;
  /* Followers do not send heartbeats... */
  heartbeat_watchdog_.Block();
  /* ...they wait for them. */
  leader_watchdog_.Unblock();
}
/**
 * Switches this member to candidate mode and forgets all previously collected
 * votes. Must only be called from follower or candidate mode.
 *
 * The leader watchdog is deliberately left running: it is also what triggers
 * a fresh election when the current one times out.
 */
void RaftMember::TransitionToCandidate() {
  DCHECK(mode_ != Mode::LEADER)
      << "Transition to candidate mode from leader mode.";
  votes_.clear();
  mode_ = Mode::CANDIDATE;
}
/**
 * Switches this member to leader mode and records itself as the known leader.
 * Must only be called from candidate mode.
 */
void RaftMember::TransitionToLeader() {
  DCHECK(mode_ == Mode::CANDIDATE)
      << "Transition to leader mode from leader or follower mode.";
  /* A leader does not wait for heartbeats, it produces them. */
  leader_watchdog_.Block();
  heartbeat_watchdog_.Unblock();
  leader_ = id_;
  mode_ = Mode::LEADER;
}
/**
 * Moves this member to `new_term` and drops everything that was only valid in
 * the previous term: the vote that was cast and the known leader.
 *
 * NOTE(review): `term_` is declared as uint64_t while this parameter (and the
 * `term` field of the messages) is `int` -- consider unifying the types.
 *
 * @param new_term Term to adopt.
 */
void RaftMember::UpdateTerm(int new_term) {
  leader_ = std::experimental::nullopt;
  voted_for_ = std::experimental::nullopt;
  term_ = new_term;
}
/**
 * Starts a new election round with this member as the candidate.
 *
 * Does nothing when this member already believes it is the leader, which can
 * happen if a leader timeout message was delivered late.
 */
void RaftMember::RunElection() {
  if (mode_ == Mode::LEADER) {
    return;
  }

  LogInfo("Running for leader.");

  /* [Raft paper, Section 5.2.]
   * "To begin an election, a follower increments its current term and
   * transitions to candidate state. It then votes for itself and issues
   * RequestVote RPCs in parallel to each of the other servers in
   * the cluster." */
  TransitionToCandidate();
  UpdateTerm(term_ + 1);
  voted_for_ = id_;
  votes_.insert(id_);

  for (const auto &peer : config_.members) {
    if (peer == id_) {
      continue; /* We already voted for ourselves above. */
    }
    network_.RequestVote(peer, MRequestVote(term_, id_));
  }
}
/**
 * Handles a vote request from a candidate.
 *
 * @param req Request carrying the candidate's id and term.
 * @return Reply carrying this member's (possibly just updated) term, its id
 *         and whether the vote was granted.
 */
MRequestVoteReply RaftMember::OnRequestVote(const MRequestVote &req) {
  LogInfo("Vote requested from {}, candidate_term = {}", req.sender_id,
          req.term);

  /* [Raft paper, Section 5.1]
   * "Current terms are exchanged whenever servers communicate; if one server's
   * current term is smaller than the other's, then it updates its current term
   * to the larger value. If a candidate or leader discovers that its term is
   * out of date, it immediately reverts to follower state." */
  if (req.term > term_) {
    TransitionToFollower();
    UpdateTerm(req.term);
  }

  /* [Raft paper, Figure 2]
   * "Reply false if term < currentTerm." */
  const bool stale_request = req.term < term_;
  /* [Raft paper, Figure 2]
   * "If votedFor is null or candidateId, and candidate's log is at least as
   * up-to-date as receiver's log, grant vote." */
  const bool voted_for_other = voted_for_ && *voted_for_ != req.sender_id;
  if (stale_request || voted_for_other) {
    return MRequestVoteReply(term_, id_, false);
  }

  LogInfo("Granting vote to {}.", req.sender_id);
  voted_for_ = req.sender_id;

  /* [Raft paper, Section 5.2]
   * "A server remains in follower state as long as it receives valid RPCs from
   * a leader or candidate." */
  leader_watchdog_.Notify();

  DCHECK(mode_ != Mode::LEADER) << "Granted vote as a leader.";
  return MRequestVoteReply(term_, id_, true);
}
/**
 * Handles the answer to one of our vote requests.
 *
 * A reply with a newer term always demotes this member to follower, whatever
 * mode it is in. Otherwise the reply only matters while we are still a
 * candidate in the term it was issued for: a granted vote is recorded and,
 * once a strict majority of the configured members voted for us, we become
 * the leader.
 *
 * @param rep Reply carrying the voter's id, its term and the vote itself.
 */
void RaftMember::OnRequestVoteReply(const MRequestVoteReply &rep) {
  /* [Raft paper, Section 5.1]
   * "Current terms are exchanged whenever servers communicate; if one server's
   * current term is smaller than the other's, then it updates its current term
   * to the larger value. If a candidate or leader discovers that its term is
   * out of date, it immediately reverts to follower state."
   *
   * This check has to come before the candidate-only filter below, otherwise
   * a member that is no longer a candidate would silently drop the newer
   * term. */
  if (rep.term > term_) {
    LogInfo(
        "Vote denied from {} with greater term {}, transitioning to follower "
        "mode.",
        rep.sender_id, rep.term);
    TransitionToFollower();
    UpdateTerm(rep.term);
    return;
  }

  /* Ignore leftover messages from old elections. */
  if (mode_ != Mode::CANDIDATE || rep.term < term_) {
    return;
  }

  if (!rep.success) {
    LogInfo("Vote rejected from {}.", rep.sender_id);
    return;
  }

  LogInfo("Vote granted from {}.", rep.sender_id);
  votes_.insert(rep.sender_id);

  /* Strict majority of all configured members (our own vote included). */
  if (2 * votes_.size() > config_.members.size()) {
    LogInfo("Elected as leader.");
    TransitionToLeader();
  }
}
/**
 * Handles an AppendEntries message (currently used only as a heartbeat).
 *
 * @param req Message carrying the sender's id and term.
 * @return Reply carrying this member's term and id; `success` is false only
 *         when the sender's term is older than ours.
 */
MAppendEntriesReply RaftMember::OnAppendEntries(const MAppendEntries &req) {
  LogInfo("Append entries from {}.", req.sender_id);

  /* [Raft paper, Section 5.1]
   * "If a server receives a request with a stale term number, it rejects
   * the request." */
  if (req.term < term_) {
    return MAppendEntriesReply(term_, id_, false);
  }

  /* [Raft paper, Section 5.1]
   * "Current terms are exchanged whenever servers communicate; if one server's
   * current term is smaller than the other's, then it updates its current term
   * to the larger value. If a candidate or leader discovers that its term is
   * out of date, it immediately reverts to follower state." */
  if (req.term > term_) {
    TransitionToFollower();
    UpdateTerm(req.term);
  }

  /* [Raft paper, Section 5.2]
   * "While waiting for votes, a candidate may receive an AppendEntries RPC from
   * another server claiming to be leader. If the leader's term (included in its
   * RPC) is at least as large as the candidate's current term, then the
   * candidate recognizes the leader as legitimate and returns to follower
   * state."
   *
   * The term does not change in this branch, so UpdateTerm must NOT be called:
   * it would erase `voted_for_` and let this member vote a second time within
   * the same term. */
  if (req.term == term_ && mode_ == Mode::CANDIDATE) {
    TransitionToFollower();
  }

  /* [Raft paper, Section 5.2]
   * "A server remains in follower state as long as it receives
   * valid RPCs from a leader or candidate." */
  leader_ = req.sender_id;
  leader_watchdog_.Notify();
  return MAppendEntriesReply(term_, id_, true);
}
/**
 * Handles the answer to one of our AppendEntries messages. The only thing
 * acted upon is the replier's term.
 *
 * @param rep Reply carrying the replier's id and term.
 */
void RaftMember::OnAppendEntriesReply(const MAppendEntriesReply &rep) {
  if (!(rep.term > term_)) {
    return; /* Nothing newer than what we already know. */
  }

  /* [Raft paper, Section 5.1]
   * "Current terms are exchanged whenever servers communicate; if one server's
   * current term is smaller than the other's, then it updates its current term
   * to the larger value. If a candidate or leader discovers that its term is
   * out of date, it immediately reverts to follower state." */
  TransitionToFollower();
  UpdateTerm(rep.term);
}
} // namespace communication::raft

View File

@ -0,0 +1,68 @@
#pragma once
#include <chrono>
#include <experimental/optional>
#include <set>
#include <vector>
#include "communication/reactor/reactor_local.hpp"
#include "utils/watchdog.hpp"
namespace communication::raft {
struct MAppendEntries;
struct MAppendEntriesReply;
struct MRequestVote;
struct MRequestVoteReply;
class RaftNetworkInterface;
/**
 * Settings shared by the members of one Raft cluster. Kept as a plain
 * aggregate so it can be brace-initialized in declaration order.
 */
struct RaftConfig {
  /** Ids of all members of the cluster. */
  std::vector<std::string> members;

  /** Lower bound passed to the leader watchdog. */
  std::chrono::milliseconds leader_timeout_min;

  /** Upper bound passed to the leader watchdog. */
  std::chrono::milliseconds leader_timeout_max;

  /** Used as both bounds of the heartbeat watchdog. */
  std::chrono::milliseconds heartbeat_interval;
};
/**
 * One participant of the Raft leader election. Each member runs its own
 * reactor (spawned in the constructor) that reacts to timeouts and to the
 * Raft messages delivered through a RaftNetworkInterface.
 */
class RaftMember {
 public:
  /**
   * @param system  Reactor system in which this member's reactor is spawned.
   * @param id      Id of this member.
   * @param config  Cluster configuration (stored by copy).
   * @param network Transport for outgoing messages (stored by reference).
   */
  RaftMember(communication::reactor::System &system, const std::string &id,
             const RaftConfig &config, RaftNetworkInterface &network);

  /** Posts a shutdown message to this member's reactor. */
  virtual ~RaftMember();

 protected:
  /** Id of this member. */
  std::string id_;

  /** Id of the leader this member currently knows of, if any. */
  std::experimental::optional<std::string> leader_;

 private:
  enum class Mode { FOLLOWER, CANDIDATE, LEADER };

  /* Election and mode changes. */
  void RunElection();
  void TransitionToFollower();
  void TransitionToCandidate();
  void TransitionToLeader();
  void UpdateTerm(int new_term);

  /* Message handlers; the ones returning a value produce the reply that is
   * sent back to the requester. */
  MRequestVoteReply OnRequestVote(const MRequestVote &);
  void OnRequestVoteReply(const MRequestVoteReply &);
  MAppendEntriesReply OnAppendEntries(const MAppendEntries &);
  void OnAppendEntriesReply(const MAppendEntriesReply &);

  /** Logs an INFO message prefixed with this member's id and term. */
  template <class... Args>
  void LogInfo(const std::string &, Args &&...);

  /* Data members. Their order matters: the watchdogs come last because their
   * callbacks use the members declared before them. */
  communication::reactor::System &system_;
  RaftConfig config_;
  RaftNetworkInterface &network_;
  Mode mode_;
  uint64_t term_ = 0;
  /** Member this one voted for in the current term, if any. */
  std::experimental::optional<std::string> voted_for_;
  /** Ids of the members that voted for us in the election we are running. */
  std::set<std::string> votes_;
  /** Triggers an election when it times out. */
  Watchdog leader_watchdog_;
  /** Sends heartbeats to the other members while we are the leader. */
  Watchdog heartbeat_watchdog_;
};
} // namespace communication::raft

View File

@ -0,0 +1,161 @@
#pragma once
#include <mutex>
#include "communication/reactor/reactor_local.hpp"
namespace communication::raft {
/** Posted by a member's leader watchdog to the member's own "main" channel;
 * the member reacts to it by running an election. Carries no data. */
struct MLeaderTimeout : public communication::reactor::Message {};
/** Posted by a member's destructor to the member's own "main" channel; the
 * member's reactor reacts to it by closing that channel. Carries no data. */
struct MShutdown : public communication::reactor::Message {};
struct RaftMessage : public communication::reactor::Message {
RaftMessage(int term, const std::string &sender_id)
: term(term), sender_id(sender_id) {}
int term;
std::string sender_id;
};
/** Data common to every reply: the base message fields plus an outcome. */
struct RaftMessageReply : public RaftMessage {
  RaftMessageReply(int term, const std::string &sender_id, bool success)
      : RaftMessage{term, sender_id}, success{success} {}

  /** Not set by the constructor, i.e. empty unless assigned afterwards. */
  std::experimental::optional<std::string> leader;

  /** Whether the request this reply answers was accepted. */
  bool success;
};
/** Vote request sent by a candidate to the other members. */
struct MRequestVote : public RaftMessage {
  MRequestVote(int candidate_term, const std::string &candidate_id)
      : RaftMessage{candidate_term, candidate_id} {}
};
/** Answer to MRequestVote; `success` tells whether the vote was granted. */
struct MRequestVoteReply : public RaftMessageReply {
  MRequestVoteReply(int term, const std::string &sender_id, bool vote_granted)
      : RaftMessageReply{term, sender_id, vote_granted} {}
};
/** AppendEntries message; carries only the sender's term and id. */
struct MAppendEntries : public RaftMessage {
  MAppendEntries(int term, const std::string &sender_id)
      : RaftMessage{term, sender_id} {}
};
/** Answer to MAppendEntries. */
struct MAppendEntriesReply : public RaftMessageReply {
  MAppendEntriesReply(int term, const std::string &sender_id, bool success)
      : RaftMessageReply{term, sender_id, success} {}
};
/**
 * Transport abstraction for Raft messages. Every method delivers one message
 * to the member named `recipient` and returns a bool; the implementations in
 * this file return true when the message was handed to the recipient's
 * channel.
 */
class RaftNetworkInterface {
 public:
  virtual ~RaftNetworkInterface() = default;

  /* Election traffic. */
  virtual bool RequestVote(const std::string &recipient,
                           const MRequestVote &msg) = 0;
  virtual bool RequestVoteReply(const std::string &recipient,
                                const MRequestVoteReply &msg) = 0;

  /* AppendEntries traffic. */
  virtual bool AppendEntries(const std::string &recipient,
                             const MAppendEntries &msg) = 0;
  virtual bool AppendEntriesReply(const std::string &recipient,
                                  const MAppendEntriesReply &msg) = 0;
};
/**
 * Network implementation that delivers every message directly to the "main"
 * channel of the recipient's reactor inside one reactor system.
 */
class LocalReactorNetworkInterface : public RaftNetworkInterface {
 public:
  explicit LocalReactorNetworkInterface(communication::reactor::System &system)
      : system_(system) {}

  bool RequestVote(const std::string &recipient,
                   const MRequestVote &msg) override {
    return SendMessage(recipient, msg);
  }

  bool RequestVoteReply(const std::string &recipient,
                        const MRequestVoteReply &msg) override {
    return SendMessage(recipient, msg);
  }

  bool AppendEntries(const std::string &recipient,
                     const MAppendEntries &msg) override {
    return SendMessage(recipient, msg);
  }

  bool AppendEntriesReply(const std::string &recipient,
                          const MAppendEntriesReply &msg) override {
    return SendMessage(recipient, msg);
  }

 private:
  /**
   * Sends `msg` to the recipient's "main" channel.
   * @return false when that channel cannot be found, true otherwise.
   */
  template <class TMessage>
  bool SendMessage(const std::string &recipient, const TMessage &msg) {
    if (auto channel = system_.FindChannel(recipient, "main")) {
      channel->Send<TMessage>(msg);
      return true;
    }
    return false;
  }

  communication::reactor::System &system_;
};
/**
 * Network implementation for tests: behaves like the local reactor network,
 * but a message is delivered only while both its sender and its recipient are
 * marked as connected. Members are disconnected until Connect is called for
 * them.
 */
class FakeNetworkInterface : public RaftNetworkInterface {
 public:
  explicit FakeNetworkInterface(communication::reactor::System &system)
      : system_(system) {}

  bool RequestVote(const std::string &recipient,
                   const MRequestVote &msg) override {
    return SendMessage(recipient, msg);
  }

  bool RequestVoteReply(const std::string &recipient,
                        const MRequestVoteReply &msg) override {
    return SendMessage(recipient, msg);
  }

  bool AppendEntries(const std::string &recipient,
                     const MAppendEntries &msg) override {
    return SendMessage(recipient, msg);
  }

  bool AppendEntriesReply(const std::string &recipient,
                          const MAppendEntriesReply &msg) override {
    return SendMessage(recipient, msg);
  }

  /** Cuts member `id` off: messages from and to it are dropped. */
  void Disconnect(const std::string &id) { SetStatus(id, false); }

  /** Marks member `id` as reachable. */
  void Connect(const std::string &id) { SetStatus(id, true); }

 private:
  /** Records the connection status of `id` under the lock. */
  void SetStatus(const std::string &id, bool connected) {
    std::lock_guard<std::mutex> guard(mutex_);
    status_[id] = connected;
  }

  /**
   * Delivers `msg` to the recipient's "main" channel if both ends are
   * connected.
   * @return true only if the message was handed to the channel.
   */
  template <class TMessage>
  bool SendMessage(const std::string &recipient, const TMessage &msg) {
    {
      /* The lock covers only the status lookup, not the delivery. */
      std::lock_guard<std::mutex> guard(mutex_);
      if (!(status_[msg.sender_id] && status_[recipient])) {
        return false;
      }
    }
    auto channel = system_.FindChannel(recipient, "main");
    if (!channel) {
      return false;
    }
    channel->Send<TMessage>(msg);
    return true;
  }

  communication::reactor::System &system_;
  std::mutex mutex_;
  std::unordered_map<std::string, bool> status_;
};
} // namespace communication::raft

View File

@ -13,14 +13,15 @@ using std::chrono::steady_clock;
Watchdog::Watchdog(const milliseconds &min_timeout,
const milliseconds &max_timeout,
const std::function<void()> &callback)
const std::function<void()> &callback,
bool blocked)
: min_timeout_(min_timeout),
max_timeout_(max_timeout),
generator_(std::random_device{}()),
distribution_(min_timeout.count(), max_timeout_.count()),
callback_(callback),
draining_(false),
blocked_(false) {
blocked_(blocked) {
DCHECK(min_timeout_ <= max_timeout_)
<< "Min timeout should be less than max timeout";
Notify();

View File

@ -9,16 +9,17 @@
#include <thread>
/**
* @brief - Keeps track of how long it's been since `Notify` method was
* called. If it wasn't called for a sufficiently long time interval (randomly
* chosen between `min_timeout` and `max_timeout`), the watchdog will
* periodically call `callback` until it is notified or destroyed.
* @brief - Keeps track of how long it's been since `Notify` method was called.
* If it wasn't called for a sufficiently long time interval (randomly chosen
* between `min_timeout` and `max_timeout`), the watchdog will periodically call
* `callback` until it is notified or destroyed. If `blocked` is set to true,
* watchdog will be blocked on startup.
*/
class Watchdog {
public:
Watchdog(const std::chrono::milliseconds &min_timeout,
const std::chrono::milliseconds &max_timeout,
const std::function<void()> &callback);
const std::function<void()> &callback, bool blocked = false);
~Watchdog();
Watchdog(Watchdog &&) = delete;
Watchdog(const Watchdog &) = delete;

View File

@ -0,0 +1,142 @@
#include <algorithm>
#include "fmt/format.h"
#include "glog/logging.h"
#include "communication/raft/raft.hpp"
#include "communication/raft/raft_network.hpp"
using std::chrono::milliseconds;
using std::experimental::optional;
using namespace communication::raft;
using namespace communication::reactor;
using namespace std::chrono_literals;
/** Wrapper that lets the benchmark read RaftMember's protected state. */
class RaftMemberTest : RaftMember {
 public:
  using RaftMember::RaftMember;

  /** Id of this member. */
  std::string Id() const { return id_; }

  /** Copy of the leader currently known to this member (may be empty). */
  optional<std::string> Leader() const { return leader_; }
};
/**
 * Measures how long a freshly started cluster needs until at least one member
 * knows a leader.
 *
 * @param config Cluster configuration; one member is created per listed id.
 * @return Time from just before the first member is created until a leader is
 *         observed.
 */
milliseconds InitialElection(const RaftConfig &config) {
  System sys;
  FakeNetworkInterface network(sys);
  std::chrono::system_clock::time_point start;
  std::chrono::system_clock::time_point end;

  LOG(INFO) << "Starting..." << std::endl;

  {
    std::vector<std::unique_ptr<RaftMemberTest>> members;
    start = std::chrono::system_clock::now();
    for (const auto &member_id : config.members) {
      members.emplace_back(
          std::make_unique<RaftMemberTest>(sys, member_id, config, network));
      network.Connect(member_id);
    }

    // Spin until some member reports a leader.
    const auto knows_leader = [](const auto &member) {
      return static_cast<bool>(member->Leader());
    };
    while (!std::any_of(members.begin(), members.end(), knows_leader)) {
    }
    end = std::chrono::system_clock::now();
  }

  sys.AwaitShutdown();
  return std::chrono::duration_cast<milliseconds>(end - start);
}
/**
 * Measures how long the cluster needs to elect a new leader after the current
 * one is disconnected from the network.
 *
 * @param config Cluster configuration; one member is created per listed id.
 * @return Time from disconnecting the first leader until some member reports
 *         a different leader.
 */
milliseconds Reelection(const RaftConfig &config) {
  System sys;
  FakeNetworkInterface network(sys);
  std::chrono::system_clock::time_point start, end;
  LOG(INFO) << "Starting..." << std::endl;
  {
    std::vector<std::unique_ptr<RaftMemberTest>> members;
    for (const auto &member_id : config.members) {
      members.push_back(
          std::make_unique<RaftMemberTest>(sys, member_id, config, network));
      network.Connect(member_id);
    }

    // Leader() is read exactly once per check and the copy is inspected.
    // Calling it twice (once to test, once to dereference) is unsafe because
    // the member can reset its leader in between (a term update clears it),
    // which would make the second result an empty optional.
    bool leader_elected = false;
    std::string first_leader;
    do {
      for (const auto &member : members) {
        const auto leader = member->Leader();
        if (leader) {
          leader_elected = true;
          first_leader = *leader;
          break;
        }
      }
    } while (!leader_elected);

    // Let leader notify followers
    std::this_thread::sleep_for(config.heartbeat_interval);

    start = std::chrono::system_clock::now();
    network.Disconnect(first_leader);

    leader_elected = false;
    do {
      for (const auto &member : members) {
        const auto leader = member->Leader();
        if (leader && *leader != first_leader) {
          leader_elected = true;
          break;
        }
      }
    } while (!leader_elected);
    end = std::chrono::system_clock::now();
  }
  sys.AwaitShutdown();
  return std::chrono::duration_cast<milliseconds>(end - start);
}
/**
 * Runs `test` `runs` times and prints the minimum, maximum and (lower) median
 * of the measured durations.
 *
 * @param name Label printed in front of the statistics.
 * @param test Benchmark to run; returns the duration of one run.
 * @param runs Number of repetitions. For `runs <= 0` nothing is executed or
 *             printed and an empty vector is returned.
 * @return All measured durations in ascending order.
 */
std::vector<milliseconds> RunTest(const std::string &name,
                                  const std::function<milliseconds()> &test,
                                  const int runs) {
  // Without any samples there is no min/max/median to index into.
  if (runs <= 0) {
    return {};
  }

  std::vector<milliseconds> results(runs);
  for (int i = 0; i < runs; ++i) {
    results[i] = test();
  }
  std::sort(results.begin(), results.end());

  fmt::print("{} : min = {}ms, max = {}ms, median = {}ms\n", name,
             results[0].count(), results[runs - 1].count(),
             results[(runs - 1) / 2].count());
  return results;
}
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
RaftConfig config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
const int RUNS = 100;
auto InitialElectionTest = [config]() { return InitialElection(config); };
auto ReelectionTest = [config]() { return Reelection(config); };
RunTest("InitialElection", InitialElectionTest, RUNS);
RunTest("ReelectionTest", ReelectionTest, RUNS);
return 0;
}

81
tests/unit/raft.cpp Normal file
View File

@ -0,0 +1,81 @@
#include "gtest/gtest.h"
#include <chrono>
#include <thread>
#include "communication/raft/raft.hpp"
#include "communication/raft/raft_network.hpp"
#include "communication/reactor/reactor_local.hpp"
using namespace std::chrono_literals;
using namespace communication::raft;
/** Wrapper that lets the unit tests read RaftMember's protected state. */
class RaftMemberTest : RaftMember {
 public:
  /** Id of this member. */
  std::string Id() const { return id_; }

  /**
   * Id of the leader currently known to this member.
   *
   * Uses value() rather than operator*: when no leader is known yet it throws
   * bad_optional_access instead of dereferencing an empty optional, which
   * would be undefined behaviour.
   */
  std::string Leader() const { return leader_.value(); }

  using RaftMember::RaftMember;
};
// Cluster used by all tests below: members "a" to "e", leader timeout between
// 150ms and 300ms, heartbeat interval 70ms.
const RaftConfig test_config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
// Starts a connected cluster, waits 500ms and expects every member to report
// the same leader as the first member does.
TEST(Raft, InitialElection) {
  communication::reactor::System sys;
  FakeNetworkInterface network(sys);

  {
    std::vector<std::unique_ptr<RaftMemberTest>> members;
    for (const auto &member_id : test_config.members) {
      members.emplace_back(std::make_unique<RaftMemberTest>(
          sys, member_id, test_config, network));
      network.Connect(member_id);
    }

    // Give the cluster time to finish the election.
    std::this_thread::sleep_for(500ms);

    const std::string expected_leader = members[0]->Leader();
    for (const auto &member : members) {
      EXPECT_EQ(member->Leader(), expected_leader);
    }
  }

  sys.AwaitShutdown();
}
// Elects a leader, disconnects it, waits for a replacement, reconnects the old
// leader and expects every member (the old leader included) to report the
// replacement.
TEST(Raft, Reelection) {
  communication::reactor::System sys;
  FakeNetworkInterface network(sys);

  {
    std::vector<std::unique_ptr<RaftMemberTest>> members;
    for (const auto &member_id : test_config.members) {
      members.emplace_back(std::make_unique<RaftMemberTest>(
          sys, member_id, test_config, network));
      network.Connect(member_id);
    }

    // First election.
    std::this_thread::sleep_for(500ms);
    const std::string first_leader = members[0]->Leader();
    for (const auto &member : members) {
      EXPECT_EQ(member->Leader(), first_leader);
    }

    // Cut the leader off and give the rest time to elect a new one.
    network.Disconnect(first_leader);
    std::this_thread::sleep_for(500ms);

    // Ask a member that is not the disconnected leader who leads now.
    const auto &observer =
        members[0]->Id() == first_leader ? members[1] : members[0];
    const std::string second_leader = observer->Leader();

    // Once reconnected, the old leader has to accept the new one as well.
    network.Connect(first_leader);
    std::this_thread::sleep_for(100ms);
    for (const auto &member : members) {
      EXPECT_EQ(member->Leader(), second_leader);
    }
  }

  sys.AwaitShutdown();
}