Remove raft from the codebase
Summary: The raft implementation has been stale for a while now. It neither compiles nor uses Cap'n Proto for serialization. We will probably rewrite it in the future, so it doesn't need to be part of the repo at the moment. Reviewers: mferencevic, mtomic, buda Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1513
This commit is contained in:
parent
7262f7c5c1
commit
8d934ed801
@ -1,23 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "communication/raft/raft.hpp"
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
/* Discriminates the two Raft RPC kinds: RequestVote and AppendEntries. */
enum class RpcType { REQUEST_VOTE, APPEND_ENTRIES };
|
||||
|
||||
template <class State>
|
||||
struct PeerRpcRequest {
|
||||
RpcType type;
|
||||
RequestVoteRequest request_vote;
|
||||
AppendEntriesRequest<State> append_entries;
|
||||
};
|
||||
|
||||
struct PeerRpcReply {
|
||||
RpcType type;
|
||||
RequestVoteReply request_vote;
|
||||
AppendEntriesReply append_entries;
|
||||
};
|
||||
|
||||
} // namespace communication::raft
|
@ -1,699 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "fmt/format.h"
|
||||
#include "glog/logging.h"
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
namespace impl {
|
||||
|
||||
template <class State>
|
||||
RaftMemberImpl<State>::RaftMemberImpl(RaftNetworkInterface<State> &network,
|
||||
RaftStorageInterface<State> &storage,
|
||||
const MemberId &id,
|
||||
const RaftConfig &config)
|
||||
: network_(network), storage_(storage), id_(id), config_(config) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
tie(term_, voted_for_) = storage_.GetTermAndVotedFor();
|
||||
|
||||
for (const auto &peer_id : config_.members) {
|
||||
peer_states_[peer_id] = std::make_unique<RaftPeerState>();
|
||||
}
|
||||
|
||||
SetElectionTimer();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
RaftMemberImpl<State>::~RaftMemberImpl() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::Stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (!exiting_) {
|
||||
LogInfo("Stopping...");
|
||||
exiting_ = true;
|
||||
}
|
||||
}
|
||||
state_changed_.notify_all();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
template <class... Args>
|
||||
void RaftMemberImpl<State>::LogInfo(const std::string &format,
|
||||
Args &&... args) {
|
||||
LOG(INFO) << fmt::format("[id = {}, term = {}] {}", id_, term_,
|
||||
fmt::format(format, std::forward<Args>(args)...))
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::TimerThreadMain() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
while (!exiting_) {
|
||||
if (Clock::now() >= next_election_time_) {
|
||||
StartNewElection();
|
||||
}
|
||||
state_changed_.wait_until(lock, next_election_time_);
|
||||
}
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::PeerThreadMain(std::string peer_id) {
|
||||
RaftPeerState &peer_state = *peer_states_[peer_id];
|
||||
|
||||
LogInfo("Peer thread started for {}", peer_id);
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
/* This loop will either call a function that issues an RPC or wait on the
|
||||
* condition variable. It must not do both! Lock on `mutex_` is released while
|
||||
* waiting for RPC response, which might cause us to miss a notification on
|
||||
* `state_changed_` conditional variable and wait indefinitely. The safest
|
||||
* thing to do is to assume some important part of state was modified while we
|
||||
* were waiting for the response and loop around to check. */
|
||||
while (!exiting_) {
|
||||
TimePoint now = Clock::now();
|
||||
TimePoint wait_until;
|
||||
|
||||
if (mode_ != RaftMode::FOLLOWER && peer_state.backoff_until > now) {
|
||||
wait_until = peer_state.backoff_until;
|
||||
} else {
|
||||
switch (mode_) {
|
||||
case RaftMode::FOLLOWER:
|
||||
wait_until = TimePoint::max();
|
||||
break;
|
||||
case RaftMode::CANDIDATE:
|
||||
if (!peer_state.request_vote_done) {
|
||||
RequestVote(peer_id, peer_state, lock);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
case RaftMode::LEADER:
|
||||
if (peer_state.next_index <= storage_.GetLastLogIndex() ||
|
||||
now >= peer_state.next_heartbeat_time) {
|
||||
AppendEntries(peer_id, peer_state, lock);
|
||||
continue;
|
||||
} else {
|
||||
wait_until = peer_state.next_heartbeat_time;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
state_changed_.wait_until(lock, wait_until);
|
||||
}
|
||||
|
||||
LogInfo("Peer thread exiting for {}", peer_id);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::CandidateOrLeaderTransitionToFollower() {
|
||||
DCHECK(mode_ != RaftMode::FOLLOWER)
|
||||
<< "`CandidateOrLeaderTransitionToFollower` called from follower mode";
|
||||
mode_ = RaftMode::FOLLOWER;
|
||||
leader_ = {};
|
||||
SetElectionTimer();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::CandidateTransitionToLeader() {
|
||||
DCHECK(mode_ == RaftMode::CANDIDATE)
|
||||
<< "`CandidateTransitionToLeader` called while not in candidate mode";
|
||||
mode_ = RaftMode::LEADER;
|
||||
leader_ = id_;
|
||||
|
||||
/* We don't want to trigger elections while in leader mode. */
|
||||
next_election_time_ = TimePoint::max();
|
||||
|
||||
/* [Raft thesis, Section 6.4]
|
||||
* "The Leader Completeness Property guarantees that a leader has all
|
||||
* committed entries, but at the start of its term, it may not know which
|
||||
* those are. To find out, it needs to commit an entry from its term. Raft
|
||||
* handles this by having each leader commit a blank no-op entry into the log
|
||||
* at the start of its term. As soon as this no-op entry is committed, the
|
||||
* leader’s commit index will be at least as large as any other servers’
|
||||
* during its term." */
|
||||
LogEntry<State> entry;
|
||||
entry.term = term_;
|
||||
entry.command = std::experimental::nullopt;
|
||||
storage_.AppendLogEntry(entry);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
bool RaftMemberImpl<State>::CandidateOrLeaderNoteTerm(const TermId new_term) {
|
||||
DCHECK(mode_ != RaftMode::FOLLOWER)
|
||||
<< "`CandidateOrLeaderNoteTerm` called from follower mode";
|
||||
/* [Raft thesis, Section 3.3]
|
||||
* "Current terms are exchanged whenever servers communicate; if one server's
|
||||
* current term is smaller than the other's, then it updates its current term
|
||||
* to the larger value. If a candidate or leader discovers that its term is
|
||||
* out of date, it immediately reverts to follower state." */
|
||||
if (term_ < new_term) {
|
||||
UpdateTermAndVotedFor(new_term, {});
|
||||
CandidateOrLeaderTransitionToFollower();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::UpdateTermAndVotedFor(
|
||||
const TermId new_term,
|
||||
const std::experimental::optional<MemberId> &new_voted_for) {
|
||||
term_ = new_term;
|
||||
voted_for_ = new_voted_for;
|
||||
leader_ = {};
|
||||
|
||||
storage_.WriteTermAndVotedFor(term_, voted_for_);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::SetElectionTimer() {
|
||||
/* [Raft thesis, section 3.4]
|
||||
* "Raft uses randomized election timeouts to ensure that split votes are rare
|
||||
* and that they are resolved quickly. To prevent split votes in the first
|
||||
* place, election timeouts are chosen randomly from a fixed interval (e.g.,
|
||||
* 150-300 ms)." */
|
||||
std::uniform_int_distribution<uint64_t> distribution(
|
||||
config_.leader_timeout_min.count(), config_.leader_timeout_max.count());
|
||||
Clock::duration wait_interval = std::chrono::milliseconds(distribution(rng_));
|
||||
next_election_time_ = Clock::now() + wait_interval;
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::StartNewElection() {
|
||||
LogInfo("Starting new election");
|
||||
/* [Raft thesis, section 3.4]
|
||||
* "To begin an election, a follower increments its current term and
|
||||
* transitions to candidate state. It then votes for itself and issues
|
||||
* RequestVote RPCs in parallel to each of the other servers in the cluster."
|
||||
*/
|
||||
UpdateTermAndVotedFor(term_ + 1, id_);
|
||||
mode_ = RaftMode::CANDIDATE;
|
||||
|
||||
/* [Raft thesis, section 3.4]
|
||||
* "Each candidate restarts its randomized election timeout at the start of an
|
||||
* election, and it waits for that timeout to elapse before starting the next
|
||||
* election; this reduces the likelihood of another split vote in the new
|
||||
* election." */
|
||||
SetElectionTimer();
|
||||
|
||||
for (const auto &peer_id : config_.members) {
|
||||
if (peer_id == id_) {
|
||||
continue;
|
||||
}
|
||||
auto &peer_state = peer_states_[peer_id];
|
||||
peer_state->request_vote_done = false;
|
||||
peer_state->voted_for_me = false;
|
||||
peer_state->match_index = 0;
|
||||
peer_state->next_index = storage_.GetLastLogIndex() + 1;
|
||||
|
||||
/* [Raft thesis, section 3.5]
|
||||
* "Until the leader has discovered where it and the follower's logs match,
|
||||
* the leader can send AppendEntries with no entries (like heartbeats) to
|
||||
* save bandwidth. Then, once the matchIndex immediately precedes the
|
||||
* nextIndex, the leader should begin to send the actual entries." */
|
||||
peer_state->suppress_log_entries = true;
|
||||
|
||||
/* [Raft thesis, section 3.4]
|
||||
* "Once a candidate wins an election, it becomes leader. It then sends
|
||||
* heartbeat messages to all of the other servers to establish its authority
|
||||
* and prevent new elections."
|
||||
*
|
||||
* This will make newly elected leader send heartbeats immediately.
|
||||
*/
|
||||
peer_state->next_heartbeat_time = TimePoint::min();
|
||||
peer_state->backoff_until = TimePoint::min();
|
||||
}
|
||||
|
||||
// We already have the majority if we're in a single node cluster.
|
||||
if (CountVotes()) {
|
||||
LogInfo("Elected as leader.");
|
||||
CandidateTransitionToLeader();
|
||||
}
|
||||
|
||||
/* Notify peer threads to start issuing RequestVote RPCs. */
|
||||
state_changed_.notify_all();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
bool RaftMemberImpl<State>::CountVotes() {
|
||||
DCHECK(mode_ == RaftMode::CANDIDATE)
|
||||
<< "`CountVotes` should only be called from candidate mode";
|
||||
int num_votes = 0;
|
||||
for (const auto &peer_id : config_.members) {
|
||||
if (peer_id == id_ || peer_states_[peer_id]->voted_for_me) {
|
||||
num_votes++;
|
||||
}
|
||||
}
|
||||
|
||||
return 2 * num_votes > config_.members.size();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::RequestVote(const std::string &peer_id,
|
||||
RaftPeerState &peer_state,
|
||||
std::unique_lock<std::mutex> &lock) {
|
||||
LogInfo("Requesting vote from {}", peer_id);
|
||||
|
||||
RequestVoteRequest request;
|
||||
request.candidate_term = term_;
|
||||
request.candidate_id = id_;
|
||||
request.last_log_index = storage_.GetLastLogIndex();
|
||||
request.last_log_term = storage_.GetLogTerm(request.last_log_index);
|
||||
|
||||
RequestVoteReply reply;
|
||||
|
||||
/* Release lock before issuing RPC and waiting for response. */
|
||||
/* TODO(mtomic): Revise how this will work with RPC cancellation. */
|
||||
lock.unlock();
|
||||
bool ok = network_.SendRequestVote(peer_id, request, reply);
|
||||
lock.lock();
|
||||
|
||||
/* TODO(mtomic): Maybe implement exponential backoff. */
|
||||
if (!ok) {
|
||||
peer_state.backoff_until = Clock::now() + config_.rpc_backoff;
|
||||
return;
|
||||
}
|
||||
|
||||
if (term_ != request.candidate_term || mode_ != RaftMode::CANDIDATE ||
|
||||
exiting_) {
|
||||
LogInfo("Ignoring RequestVote RPC reply from {}", peer_id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (CandidateOrLeaderNoteTerm(reply.term)) {
|
||||
state_changed_.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
DCHECK(reply.term == term_) << "Stale RequestVote RPC reply";
|
||||
|
||||
peer_state.request_vote_done = true;
|
||||
|
||||
if (reply.vote_granted) {
|
||||
peer_state.voted_for_me = true;
|
||||
LogInfo("Got vote from {}", peer_id);
|
||||
|
||||
if (CountVotes()) {
|
||||
LogInfo("Elected as leader.");
|
||||
CandidateTransitionToLeader();
|
||||
}
|
||||
} else {
|
||||
LogInfo("Vote denied from {}", peer_id);
|
||||
}
|
||||
|
||||
state_changed_.notify_all();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::AdvanceCommitIndex() {
|
||||
DCHECK(mode_ == RaftMode::LEADER)
|
||||
<< "`AdvanceCommitIndex` can only be called from leader mode";
|
||||
|
||||
std::vector<LogIndex> match_indices;
|
||||
for (const auto &peer : peer_states_) {
|
||||
match_indices.push_back(peer.second->match_index);
|
||||
}
|
||||
match_indices.push_back(storage_.GetLastLogIndex());
|
||||
std::sort(match_indices.begin(), match_indices.end(),
|
||||
std::greater<LogIndex>());
|
||||
LogIndex new_commit_index_ = match_indices[(config_.members.size() - 1) / 2];
|
||||
|
||||
LogInfo("Trying to advance commit index {} to {}", commit_index_,
|
||||
new_commit_index_);
|
||||
|
||||
/* This can happen because we reset `match_index` to 0 for every peer when
|
||||
* elected. */
|
||||
if (commit_index_ >= new_commit_index_) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* [Raft thesis, section 3.6.2]
|
||||
* (...) Raft never commits log entries from previous terms by counting
|
||||
* replicas. Only log entries from the leader's current term are committed by
|
||||
* counting replicas; once an entry from the current term has been committed
|
||||
* in this way, then all prior entries are committed indirectly because of the
|
||||
* Log Matching Property." */
|
||||
if (storage_.GetLogTerm(new_commit_index_) != term_) {
|
||||
LogInfo("Cannot commit log entry from previous term");
|
||||
return;
|
||||
}
|
||||
|
||||
commit_index_ = std::max(commit_index_, new_commit_index_);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
void RaftMemberImpl<State>::AppendEntries(const std::string &peer_id,
|
||||
RaftPeerState &peer_state,
|
||||
std::unique_lock<std::mutex> &lock) {
|
||||
LogInfo("Appending entries to {}", peer_id);
|
||||
|
||||
AppendEntriesRequest<State> request;
|
||||
request.leader_term = term_;
|
||||
request.leader_id = id_;
|
||||
|
||||
request.prev_log_index = peer_state.next_index - 1;
|
||||
request.prev_log_term = storage_.GetLogTerm(peer_state.next_index - 1);
|
||||
|
||||
if (!peer_state.suppress_log_entries &&
|
||||
peer_state.next_index <= storage_.GetLastLogIndex()) {
|
||||
request.entries = storage_.GetLogSuffix(peer_state.next_index);
|
||||
} else {
|
||||
request.entries = {};
|
||||
}
|
||||
|
||||
request.leader_commit = commit_index_;
|
||||
|
||||
AppendEntriesReply reply;
|
||||
|
||||
/* Release lock before issuing RPC and waiting for response. */
|
||||
/* TODO(mtomic): Revise how this will work with RPC cancellation. */
|
||||
lock.unlock();
|
||||
bool ok = network_.SendAppendEntries(peer_id, request, reply);
|
||||
lock.lock();
|
||||
|
||||
/* TODO(mtomic): Maybe implement exponential backoff. */
|
||||
if (!ok) {
|
||||
/* There is probably something wrong with this peer, let's avoid sending log
|
||||
* entries. */
|
||||
peer_state.suppress_log_entries = true;
|
||||
peer_state.backoff_until = Clock::now() + config_.rpc_backoff;
|
||||
return;
|
||||
}
|
||||
|
||||
if (term_ != request.leader_term || exiting_) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (CandidateOrLeaderNoteTerm(reply.term)) {
|
||||
state_changed_.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
DCHECK(mode_ == RaftMode::LEADER)
|
||||
<< "Elected leader for term should never change";
|
||||
DCHECK(reply.term == term_) << "Got stale AppendEntries reply";
|
||||
|
||||
if (reply.success) {
|
||||
/* We've found a match, we can start sending log entries. */
|
||||
peer_state.suppress_log_entries = false;
|
||||
|
||||
LogIndex new_match_index = request.prev_log_index + request.entries.size();
|
||||
DCHECK(peer_state.match_index <= new_match_index)
|
||||
<< "`match_index` should increase monotonically within a term";
|
||||
peer_state.match_index = new_match_index;
|
||||
AdvanceCommitIndex();
|
||||
peer_state.next_index = peer_state.match_index + 1;
|
||||
peer_state.next_heartbeat_time = Clock::now() + config_.heartbeat_interval;
|
||||
} else {
|
||||
DCHECK(peer_state.next_index > 1)
|
||||
<< "Log replication should not fail for first log entry.";
|
||||
--peer_state.next_index;
|
||||
}
|
||||
|
||||
state_changed_.notify_all();
|
||||
}
|
||||
|
||||
template <class State>
|
||||
RequestVoteReply RaftMemberImpl<State>::OnRequestVote(
|
||||
const RequestVoteRequest &request) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
LogInfo("RequestVote RPC request from {}", request.candidate_id);
|
||||
|
||||
RequestVoteReply reply;
|
||||
|
||||
/* [Raft thesis, Section 3.3]
|
||||
* "If a server receives a request with a stale term number, it rejects the
|
||||
* request." */
|
||||
if (request.candidate_term < term_) {
|
||||
reply.term = term_;
|
||||
reply.vote_granted = false;
|
||||
return reply;
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.3]
|
||||
* "Current terms are exchanged whenever servers communicate; if one server's
|
||||
* current term is smaller than the other's, then it updates its current term
|
||||
* to the larger value. If a candidate or leader discovers that its term is
|
||||
* out of date, it immediately reverts to follower state." */
|
||||
if (request.candidate_term > term_) {
|
||||
if (mode_ != RaftMode::FOLLOWER) {
|
||||
CandidateOrLeaderTransitionToFollower();
|
||||
}
|
||||
UpdateTermAndVotedFor(request.candidate_term, {});
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.6.1]
|
||||
* "Raft uses the voting process to prevent a candidate from winning an
|
||||
* election unless its log contains all committed entries. (...) The
|
||||
* RequestVote RPC implements this restriction: the RPC includes information
|
||||
* about the candidate's log, and the voter denies its vote if its own log is
|
||||
* more up-to-date than that of the candidate. Raft determines which of two
|
||||
* logs is more up-to-date by comparing the index and term of the last entries
|
||||
* in the logs. If the logs have last entries with different terms, then the
|
||||
* log with the later term is more up-to-date. If the logs end with the same
|
||||
* term, then whichever log is longer is more up-to-date." */
|
||||
LogIndex my_last_log_index = storage_.GetLastLogIndex();
|
||||
TermId my_last_log_term = storage_.GetLogTerm(my_last_log_index);
|
||||
if (my_last_log_term > request.last_log_term ||
|
||||
(my_last_log_term == request.last_log_term &&
|
||||
my_last_log_index > request.last_log_index)) {
|
||||
reply.term = term_;
|
||||
reply.vote_granted = false;
|
||||
return reply;
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.4]
|
||||
* "Each server will vote for at most one candidate in a given term, on a
|
||||
* firstcome-first-served basis."
|
||||
*/
|
||||
|
||||
/* We voted for someone else in this term. */
|
||||
if (request.candidate_term == term_ && voted_for_ &&
|
||||
*voted_for_ != request.candidate_id) {
|
||||
reply.term = term_;
|
||||
reply.vote_granted = false;
|
||||
return reply;
|
||||
}
|
||||
|
||||
/* Now we know we will vote for this candidate, because it's term is at least
|
||||
* as big as ours and we haven't voted for anyone else. */
|
||||
UpdateTermAndVotedFor(request.candidate_term, request.candidate_id);
|
||||
|
||||
/* [Raft thesis, Section 3.4]
|
||||
* A server remains in follower state as long as it receives valid RPCs from a
|
||||
* leader or candidate. */
|
||||
SetElectionTimer();
|
||||
state_changed_.notify_all();
|
||||
|
||||
reply.term = request.candidate_term;
|
||||
reply.vote_granted = true;
|
||||
return reply;
|
||||
}
|
||||
|
||||
template <class State>
|
||||
AppendEntriesReply RaftMemberImpl<State>::OnAppendEntries(
|
||||
const AppendEntriesRequest<State> &request) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
LogInfo("AppendEntries RPC request from {}", request.leader_id);
|
||||
|
||||
AppendEntriesReply reply;
|
||||
|
||||
/* [Raft thesis, Section 3.3]
|
||||
* "If a server receives a request with a stale term number, it rejects the
|
||||
* request." */
|
||||
if (request.leader_term < term_) {
|
||||
reply.term = term_;
|
||||
reply.success = false;
|
||||
return reply;
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.3]
|
||||
* "Current terms are exchanged whenever servers communicate; if one server's
|
||||
* current term is smaller than the other's, then it updates its current term
|
||||
* to the larger value. If a candidate or leader discovers that its term is
|
||||
* out of date, it immediately reverts to follower state." */
|
||||
if (request.leader_term > term_) {
|
||||
if (mode_ != RaftMode::FOLLOWER) {
|
||||
CandidateOrLeaderTransitionToFollower();
|
||||
}
|
||||
UpdateTermAndVotedFor(request.leader_term, {});
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.4]
|
||||
* "While waiting for votes, a candidate may receive an AppendEntries RPC from
|
||||
* another server claiming to be leader. If the leader's term (included in its
|
||||
* RPC) is at least as large as the candidate's current term, then the
|
||||
* candidate recognizes the leader as legitimate and returns to follower
|
||||
* state." */
|
||||
if (mode_ == RaftMode::CANDIDATE && request.leader_term == term_) {
|
||||
CandidateOrLeaderTransitionToFollower();
|
||||
}
|
||||
|
||||
DCHECK(mode_ != RaftMode::LEADER)
|
||||
<< "Leader cannot accept `AppendEntries` RPC";
|
||||
DCHECK(term_ == request.leader_term) << "Term should be equal to request "
|
||||
"term when accepting `AppendEntries` "
|
||||
"RPC";
|
||||
|
||||
leader_ = request.leader_id;
|
||||
|
||||
/* [Raft thesis, Section 3.4]
|
||||
* A server remains in follower state as long as it receives valid RPCs from a
|
||||
* leader or candidate. */
|
||||
SetElectionTimer();
|
||||
state_changed_.notify_all();
|
||||
|
||||
/* [Raft thesis, Section 3.5]
|
||||
* "When sending an AppendEntries RPC, the leader includes the index and term
|
||||
* of the entry in its log that immediately precedes the new entries. If the
|
||||
* follower does not find an entry in its log with the same index and term,
|
||||
* then it refuses the new entries." */
|
||||
if (request.prev_log_index > storage_.GetLastLogIndex() ||
|
||||
storage_.GetLogTerm(request.prev_log_index) != request.prev_log_term) {
|
||||
reply.term = term_;
|
||||
reply.success = false;
|
||||
return reply;
|
||||
}
|
||||
|
||||
/* [Raft thesis, Section 3.5]
|
||||
* "To bring a follower's log into consistency with its own, the leader must
|
||||
* find the latest log entry where the two logs agree, delete any entries in
|
||||
* the follower's log after that point, and send the follower all of the
|
||||
* leader's entries after that point." */
|
||||
|
||||
/* Entry at `request.prev_log_index` is the last entry where ours and leader's
|
||||
* logs agree. It's time to replace the tail of the log with new entries from
|
||||
* the leader. We have to be careful here as duplicated AppendEntries RPCs
|
||||
* could cause data loss.
|
||||
*
|
||||
* There is a possibility that an old AppendEntries RPC is duplicated and
|
||||
* received after processing newer one. For example, leader appends entry 3
|
||||
* and then entry 4, but follower recieves entry 3, then entry 4, and then
|
||||
* entry 3 again. We have to be careful not to delete entry 4 from log when
|
||||
* processing the last RPC. */
|
||||
LogIndex index = request.prev_log_index;
|
||||
auto it = request.entries.begin();
|
||||
for (; it != request.entries.end(); ++it) {
|
||||
++index;
|
||||
if (index > storage_.GetLastLogIndex()) {
|
||||
break;
|
||||
}
|
||||
if (storage_.GetLogTerm(index) != it->term) {
|
||||
LogInfo("Truncating log suffix from index {}", index);
|
||||
DCHECK(commit_index_ < index)
|
||||
<< "Committed entries should never be truncated from the log";
|
||||
storage_.TruncateLogSuffix(index);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LogInfo("Appending {} out of {} logs from {}.", request.entries.end() - it,
|
||||
request.entries.size(), request.leader_id);
|
||||
|
||||
for (; it != request.entries.end(); ++it) {
|
||||
storage_.AppendLogEntry(*it);
|
||||
}
|
||||
|
||||
commit_index_ = std::max(commit_index_, request.leader_commit);
|
||||
|
||||
/* Let's bump election timer once again, we don't want to take down the leader
|
||||
* because of our long disk writes. */
|
||||
SetElectionTimer();
|
||||
state_changed_.notify_all();
|
||||
|
||||
reply.term = term_;
|
||||
reply.success = true;
|
||||
return reply;
|
||||
}
|
||||
|
||||
template <class State>
|
||||
ClientResult RaftMemberImpl<State>::AddCommand(
|
||||
const typename State::Change &command, bool blocking) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
if (mode_ != RaftMode::LEADER) {
|
||||
return ClientResult::NOT_LEADER;
|
||||
}
|
||||
|
||||
LogEntry<State> entry;
|
||||
entry.term = term_;
|
||||
entry.command = command;
|
||||
storage_.AppendLogEntry(entry);
|
||||
|
||||
// Entry is already replicated if this is a single node cluster.
|
||||
AdvanceCommitIndex();
|
||||
|
||||
state_changed_.notify_all();
|
||||
|
||||
if (!blocking) {
|
||||
return ClientResult::OK;
|
||||
}
|
||||
|
||||
LogIndex index = storage_.GetLastLogIndex();
|
||||
|
||||
while (!exiting_ && term_ == entry.term) {
|
||||
if (commit_index_ >= index) {
|
||||
return ClientResult::OK;
|
||||
}
|
||||
state_changed_.wait(lock);
|
||||
}
|
||||
|
||||
return ClientResult::NOT_LEADER;
|
||||
}
|
||||
|
||||
} // namespace impl
|
||||
|
||||
template <class State>
|
||||
RaftMember<State>::RaftMember(RaftNetworkInterface<State> &network,
|
||||
RaftStorageInterface<State> &storage,
|
||||
const MemberId &id, const RaftConfig &config)
|
||||
: network_(network), impl_(network, storage, id, config) {
|
||||
timer_thread_ =
|
||||
std::thread(&impl::RaftMemberImpl<State>::TimerThreadMain, &impl_);
|
||||
|
||||
for (const auto &peer_id : config.members) {
|
||||
if (peer_id != id) {
|
||||
peer_threads_.emplace_back(&impl::RaftMemberImpl<State>::PeerThreadMain,
|
||||
&impl_, peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
network_.Start(*this);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
RaftMember<State>::~RaftMember() {
|
||||
impl_.Stop();
|
||||
timer_thread_.join();
|
||||
|
||||
for (auto &peer_thread : peer_threads_) {
|
||||
peer_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
template <class State>
|
||||
ClientResult RaftMember<State>::AddCommand(
|
||||
const typename State::Change &command, bool blocking) {
|
||||
return impl_.AddCommand(command, blocking);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
RequestVoteReply RaftMember<State>::OnRequestVote(
|
||||
const RequestVoteRequest &request) {
|
||||
return impl_.OnRequestVote(request);
|
||||
}
|
||||
|
||||
template <class State>
|
||||
AppendEntriesReply RaftMember<State>::OnAppendEntries(
|
||||
const AppendEntriesRequest<State> &request) {
|
||||
return impl_.OnAppendEntries(request);
|
||||
}
|
||||
|
||||
} // namespace communication::raft
|
@ -1,277 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <experimental/optional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <random>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "boost/serialization/vector.hpp"
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "utils/serialization.hpp"
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
template <class State>
|
||||
class RaftMember;
|
||||
|
||||
/* Outcome of `AddCommand`: NOT_LEADER when the command was not accepted or not
 * confirmed by this member as leader, OK otherwise. */
enum class ClientResult { NOT_LEADER, OK };
|
||||
|
||||
/* Clock used for election timeouts, heartbeats and RPC backoff.
 * NOTE(review): this is the wall clock, so deadlines move if the system time
 * is adjusted -- a monotonic clock may be preferable; confirm before
 * changing. */
using Clock = std::chrono::system_clock;
using TimePoint = Clock::time_point;

/* Identifier of a cluster member. */
using MemberId = std::string;
/* Raft term number. */
using TermId = uint64_t;

using ClientId = uint64_t;
using CommandId = uint64_t;

/* Position of an entry in the replicated log. */
using LogIndex = uint64_t;
|
||||
|
||||
template <class State>
|
||||
struct LogEntry {
|
||||
int term;
|
||||
|
||||
std::experimental::optional<typename State::Change> command;
|
||||
|
||||
bool operator==(const LogEntry &rhs) const {
|
||||
return term == rhs.term && command == rhs.command;
|
||||
}
|
||||
bool operator!=(const LogEntry &rhs) const { return !(*this == rhs); }
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &term;
|
||||
ar &command;
|
||||
}
|
||||
};
|
||||
|
||||
/* Raft RPC requests and replies as described in [Raft thesis, Figure 3.1]. */
|
||||
struct RequestVoteRequest {
|
||||
TermId candidate_term;
|
||||
MemberId candidate_id;
|
||||
LogIndex last_log_index;
|
||||
TermId last_log_term;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &candidate_term;
|
||||
ar &candidate_id;
|
||||
ar &last_log_index;
|
||||
ar &last_log_term;
|
||||
}
|
||||
};
|
||||
|
||||
struct RequestVoteReply {
|
||||
TermId term;
|
||||
bool vote_granted;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &term;
|
||||
ar &vote_granted;
|
||||
}
|
||||
};
|
||||
|
||||
template <class State>
|
||||
struct AppendEntriesRequest {
|
||||
TermId leader_term;
|
||||
MemberId leader_id;
|
||||
LogIndex prev_log_index;
|
||||
TermId prev_log_term;
|
||||
std::vector<LogEntry<State>> entries;
|
||||
LogIndex leader_commit;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &leader_term;
|
||||
ar &leader_id;
|
||||
ar &prev_log_index;
|
||||
ar &prev_log_term;
|
||||
ar &entries;
|
||||
ar &leader_commit;
|
||||
}
|
||||
};
|
||||
|
||||
struct AppendEntriesReply {
|
||||
TermId term;
|
||||
bool success;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &term;
|
||||
ar &success;
|
||||
}
|
||||
};
|
||||
|
||||
template <class State>
|
||||
class RaftNetworkInterface {
|
||||
public:
|
||||
virtual ~RaftNetworkInterface() = default;
|
||||
|
||||
/* These function return false if RPC failed for some reason (e.g. cannot
|
||||
* establish connection or request cancelled). Otherwise
|
||||
* `reply` contains response from peer. */
|
||||
virtual bool SendRequestVote(const MemberId &recipient,
|
||||
const RequestVoteRequest &request,
|
||||
RequestVoteReply &reply) = 0;
|
||||
|
||||
virtual bool SendAppendEntries(const MemberId &recipient,
|
||||
const AppendEntriesRequest<State> &request,
|
||||
AppendEntriesReply &reply) = 0;
|
||||
|
||||
/* This will be called once the RaftMember is ready to start receiving RPCs.
|
||||
*/
|
||||
virtual void Start(RaftMember<State> &member) = 0;
|
||||
};
|
||||
|
||||
template <class State>
|
||||
class RaftStorageInterface {
|
||||
public:
|
||||
virtual ~RaftStorageInterface() = default;
|
||||
|
||||
virtual void WriteTermAndVotedFor(
|
||||
const TermId term,
|
||||
const std::experimental::optional<std::string> &voted_for) = 0;
|
||||
virtual std::pair<TermId, std::experimental::optional<MemberId>>
|
||||
GetTermAndVotedFor() = 0;
|
||||
virtual void AppendLogEntry(const LogEntry<State> &entry) = 0;
|
||||
virtual TermId GetLogTerm(const LogIndex index) = 0;
|
||||
virtual LogEntry<State> GetLogEntry(const LogIndex index) = 0;
|
||||
virtual std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) = 0;
|
||||
virtual LogIndex GetLastLogIndex() = 0;
|
||||
virtual void TruncateLogSuffix(const LogIndex index) = 0;
|
||||
};
|
||||
|
||||
struct RaftConfig {
|
||||
std::vector<MemberId> members;
|
||||
std::chrono::milliseconds leader_timeout_min;
|
||||
std::chrono::milliseconds leader_timeout_max;
|
||||
std::chrono::milliseconds heartbeat_interval;
|
||||
std::chrono::milliseconds rpc_backoff;
|
||||
};
|
||||
|
||||
namespace impl {
|
||||
|
||||
/* Role a member can be in. */
enum class RaftMode {
  FOLLOWER,
  CANDIDATE,
  LEADER,
};
|
||||
|
||||
struct RaftPeerState {
|
||||
bool request_vote_done;
|
||||
bool voted_for_me;
|
||||
LogIndex match_index;
|
||||
LogIndex next_index;
|
||||
bool suppress_log_entries;
|
||||
Clock::time_point next_heartbeat_time;
|
||||
Clock::time_point backoff_until;
|
||||
};
|
||||
|
||||
template <class State>
|
||||
class RaftMemberImpl {
|
||||
public:
|
||||
explicit RaftMemberImpl(RaftNetworkInterface<State> &network,
|
||||
RaftStorageInterface<State> &storage,
|
||||
const MemberId &id, const RaftConfig &config);
|
||||
|
||||
~RaftMemberImpl();
|
||||
|
||||
void Stop();
|
||||
|
||||
void TimerThreadMain();
|
||||
void PeerThreadMain(std::string peer_id);
|
||||
|
||||
void UpdateTermAndVotedFor(
|
||||
const TermId new_term,
|
||||
const std::experimental::optional<MemberId> &new_voted_for);
|
||||
void CandidateOrLeaderTransitionToFollower();
|
||||
void CandidateTransitionToLeader();
|
||||
bool CandidateOrLeaderNoteTerm(const TermId new_term);
|
||||
|
||||
void StartNewElection();
|
||||
void SetElectionTimer();
|
||||
bool CountVotes();
|
||||
void RequestVote(const MemberId &peer_id, RaftPeerState &peer_state,
|
||||
std::unique_lock<std::mutex> &lock);
|
||||
|
||||
void AdvanceCommitIndex();
|
||||
void AppendEntries(const MemberId &peer_id, RaftPeerState &peer_state,
|
||||
std::unique_lock<std::mutex> &lock);
|
||||
|
||||
RequestVoteReply OnRequestVote(const RequestVoteRequest &request);
|
||||
AppendEntriesReply OnAppendEntries(
|
||||
const AppendEntriesRequest<State> &request);
|
||||
|
||||
ClientResult AddCommand(const typename State::Change &command, bool blocking);
|
||||
|
||||
template <class... Args>
|
||||
void LogInfo(const std::string &, Args &&...);
|
||||
|
||||
RaftNetworkInterface<State> &network_;
|
||||
RaftStorageInterface<State> &storage_;
|
||||
|
||||
MemberId id_;
|
||||
RaftConfig config_;
|
||||
|
||||
TermId term_;
|
||||
RaftMode mode_ = RaftMode::FOLLOWER;
|
||||
std::experimental::optional<MemberId> voted_for_ = std::experimental::nullopt;
|
||||
std::experimental::optional<MemberId> leader_ = std::experimental::nullopt;
|
||||
|
||||
TimePoint next_election_time_;
|
||||
|
||||
LogIndex commit_index_ = 0;
|
||||
|
||||
bool exiting_ = false;
|
||||
|
||||
std::map<std::string, std::unique_ptr<RaftPeerState>> peer_states_;
|
||||
|
||||
/* This mutex protects all of the internal state. */
|
||||
std::mutex mutex_;
|
||||
|
||||
/* Used to notify waiting threads that some of the internal state has changed.
|
||||
* It is notified when following events occurr:
|
||||
* - mode change
|
||||
* - election start
|
||||
* - `next_election_time_` update on RPC from leader or candidate
|
||||
* - destructor is called
|
||||
* - `commit_index_` is advanced
|
||||
*/
|
||||
std::condition_variable state_changed_;
|
||||
|
||||
std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}());
|
||||
};
|
||||
|
||||
} // namespace impl
|
||||
|
||||
template <class State>
|
||||
class RaftMember final {
|
||||
public:
|
||||
explicit RaftMember(RaftNetworkInterface<State> &network,
|
||||
RaftStorageInterface<State> &storage, const MemberId &id,
|
||||
const RaftConfig &config);
|
||||
~RaftMember();
|
||||
|
||||
ClientResult AddCommand(const typename State::Change &command, bool blocking);
|
||||
|
||||
RequestVoteReply OnRequestVote(const RequestVoteRequest &request);
|
||||
AppendEntriesReply OnAppendEntries(
|
||||
const AppendEntriesRequest<State> &request);
|
||||
|
||||
private:
|
||||
RaftNetworkInterface<State> &network_;
|
||||
impl::RaftMemberImpl<State> impl_;
|
||||
|
||||
/* Timer thread for triggering elections. */
|
||||
std::thread timer_thread_;
|
||||
|
||||
/* One thread per peer for outgoing RPCs. */
|
||||
std::vector<std::thread> peer_threads_;
|
||||
};
|
||||
|
||||
} // namespace communication::raft
|
||||
|
||||
#include "raft-inl.hpp"
|
@ -1,120 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "communication/raft/network_common.hpp"
|
||||
#include "communication/raft/raft.hpp"
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
/* Implementation of `RaftNetworkInterface` using RPC. Raft RPC requests and
|
||||
* responses are wrapped in `PeerRpcRequest` and `PeerRpcReply`. */
|
||||
|
||||
// TODO(mtomic): Unwrap RPCs and use separate request-response protocols instead
|
||||
// of `PeerProtocol`, or at least use an union to avoid sending unnecessary data
|
||||
// over the wire.
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
template <class State>
|
||||
using PeerProtocol = rpc::RequestResponse<PeerRpcRequest<State>, PeerRpcReply>;
|
||||
|
||||
template <class State>
|
||||
class RpcNetwork : public RaftNetworkInterface<State> {
|
||||
public:
|
||||
RpcNetwork(rpc::Server &server,
|
||||
std::unordered_map<std::string, io::network::Endpoint> directory)
|
||||
: server_(server), directory_(std::move(directory)) {}
|
||||
|
||||
virtual void Start(RaftMember<State> &member) override {
|
||||
// TODO: Serialize RPC via Cap'n Proto
|
||||
// server_.Register<PeerProtocol<State>>(
|
||||
// [&member](const auto &req_reader, auto *res_builder) {
|
||||
// PeerRpcRequest<State> request;
|
||||
// request.Load(req_reader);
|
||||
// PeerRpcReply reply;
|
||||
// reply.type = request.type;
|
||||
// switch (request.type) {
|
||||
// case RpcType::REQUEST_VOTE:
|
||||
// reply.request_vote = member.OnRequestVote(request.request_vote);
|
||||
// break;
|
||||
// case RpcType::APPEND_ENTRIES:
|
||||
// reply.append_entries =
|
||||
// member.OnAppendEntries(request.append_entries);
|
||||
// break;
|
||||
// default:
|
||||
// LOG(ERROR) << "Unknown RPC type: "
|
||||
// << static_cast<int>(request.type);
|
||||
// }
|
||||
// reply.Save(res_builder);
|
||||
// });
|
||||
}
|
||||
|
||||
virtual bool SendRequestVote(const MemberId &recipient,
|
||||
const RequestVoteRequest &request,
|
||||
RequestVoteReply &reply) override {
|
||||
PeerRpcRequest<State> req;
|
||||
PeerRpcReply rep;
|
||||
|
||||
req.type = RpcType::REQUEST_VOTE;
|
||||
req.request_vote = request;
|
||||
|
||||
if (!SendRpc(recipient, req, rep)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reply = rep.request_vote;
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual bool SendAppendEntries(const MemberId &recipient,
|
||||
const AppendEntriesRequest<State> &request,
|
||||
AppendEntriesReply &reply) override {
|
||||
PeerRpcRequest<State> req;
|
||||
PeerRpcReply rep;
|
||||
|
||||
req.type = RpcType::APPEND_ENTRIES;
|
||||
req.append_entries = request;
|
||||
|
||||
if (!SendRpc(recipient, req, rep)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reply = rep.append_entries;
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
bool SendRpc(const MemberId &recipient, const PeerRpcRequest<State> &request,
|
||||
PeerRpcReply &reply) {
|
||||
auto &client = GetClient(recipient);
|
||||
auto response = client.template Call<PeerProtocol<State>>(request);
|
||||
|
||||
if (!response) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reply = *response;
|
||||
return true;
|
||||
}
|
||||
|
||||
rpc::Client &GetClient(const MemberId &id) {
|
||||
auto it = clients_.find(id);
|
||||
if (it == clients_.end()) {
|
||||
auto ne = directory_[id];
|
||||
it = clients_.try_emplace(id, ne).first;
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
rpc::Server &server_;
|
||||
// TODO(mtomic): how to update and distribute this?
|
||||
std::unordered_map<MemberId, io::network::Endpoint> directory_;
|
||||
|
||||
std::unordered_map<MemberId, rpc::Client> clients_;
|
||||
};
|
||||
|
||||
} // namespace communication::raft
|
@ -1,239 +0,0 @@
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
* Raft log is stored inside a folder. Each log entry is stored in a file named
|
||||
* by its index. There is a special file named "metadata" which stores Raft
|
||||
* metadata and also the last log index, which is used on startup to identify
|
||||
* which log entry files are valid.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "boost/archive/binary_iarchive.hpp"
|
||||
#include "boost/archive/binary_oarchive.hpp"
|
||||
#include "boost/iostreams/device/file_descriptor.hpp"
|
||||
#include "boost/iostreams/stream.hpp"
|
||||
|
||||
#include "communication/raft/raft.hpp"
|
||||
#include "communication/raft/storage/memory.hpp"
|
||||
#include "utils/file.hpp"
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
struct SimpleFileStorageMetadata {
|
||||
TermId term;
|
||||
std::experimental::optional<MemberId> voted_for;
|
||||
LogIndex last_log_index;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &term &voted_for &last_log_index;
|
||||
}
|
||||
};
|
||||
|
||||
template <class State>
|
||||
class SimpleFileStorage : public RaftStorageInterface<State> {
|
||||
public:
|
||||
explicit SimpleFileStorage(const fs::path &parent_dir) : memory_storage_() {
|
||||
try {
|
||||
dir_ = utils::OpenDir(parent_dir);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Error opening log directory: {}", e.what());
|
||||
}
|
||||
|
||||
auto md = utils::TryOpenFile(dir_, "metadata", O_RDONLY);
|
||||
if (!md) {
|
||||
LOG(WARNING) << fmt::format("No metadata file found in directory '{}'",
|
||||
parent_dir);
|
||||
return;
|
||||
}
|
||||
|
||||
boost::iostreams::file_descriptor_source src(
|
||||
md->Handle(),
|
||||
boost::iostreams::file_descriptor_flags::never_close_handle);
|
||||
boost::iostreams::stream<boost::iostreams::file_descriptor_source> is(src);
|
||||
boost::archive::binary_iarchive iar(is);
|
||||
|
||||
SimpleFileStorageMetadata metadata;
|
||||
|
||||
try {
|
||||
iar >> metadata;
|
||||
} catch (boost::archive::archive_exception &e) {
|
||||
LOG(FATAL) << "Failed to deserialize Raft metadata: " << e.what();
|
||||
}
|
||||
|
||||
LOG(INFO) << fmt::format(
|
||||
"Read term = {} and voted_for = {} from storage", metadata.term,
|
||||
metadata.voted_for ? *metadata.voted_for : "(none)");
|
||||
|
||||
memory_storage_.term_ = metadata.term;
|
||||
memory_storage_.voted_for_ = metadata.voted_for;
|
||||
memory_storage_.log_.reserve(metadata.last_log_index);
|
||||
|
||||
for (LogIndex idx = 1; idx <= metadata.last_log_index; ++idx) {
|
||||
utils::File entry_file;
|
||||
|
||||
try {
|
||||
entry_file = utils::OpenFile(dir_, fmt::format("{}", idx), O_RDONLY);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to open entry file {}: {}", idx,
|
||||
e.what());
|
||||
}
|
||||
|
||||
boost::iostreams::file_descriptor_source src(
|
||||
entry_file.Handle(),
|
||||
boost::iostreams::file_descriptor_flags::never_close_handle);
|
||||
boost::iostreams::stream<boost::iostreams::file_descriptor_source> is(
|
||||
src);
|
||||
boost::archive::binary_iarchive iar(is);
|
||||
LogEntry<State> entry;
|
||||
|
||||
try {
|
||||
iar >> entry;
|
||||
memory_storage_.log_.emplace_back(std::move(entry));
|
||||
} catch (boost::archive::archive_exception &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to deserialize log entry {}: {}", idx,
|
||||
e.what());
|
||||
}
|
||||
}
|
||||
|
||||
LOG(INFO) << fmt::format("Read {} log entries", metadata.last_log_index);
|
||||
}
|
||||
|
||||
void WriteTermAndVotedFor(
|
||||
TermId term,
|
||||
const std::experimental::optional<MemberId> &voted_for) override {
|
||||
memory_storage_.WriteTermAndVotedFor(term, voted_for);
|
||||
WriteMetadata();
|
||||
|
||||
// Metadata file might be newly created so we have to fsync the directory.
|
||||
try {
|
||||
utils::Fsync(dir_);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}",
|
||||
e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<TermId, std::experimental::optional<MemberId>> GetTermAndVotedFor()
|
||||
override {
|
||||
return memory_storage_.GetTermAndVotedFor();
|
||||
}
|
||||
|
||||
void AppendLogEntry(const LogEntry<State> &entry) override {
|
||||
memory_storage_.AppendLogEntry(entry);
|
||||
|
||||
utils::File entry_file;
|
||||
|
||||
try {
|
||||
entry_file = utils::OpenFile(
|
||||
dir_, fmt::format("{}", memory_storage_.GetLastLogIndex()),
|
||||
O_WRONLY | O_CREAT | O_TRUNC, 0644);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to open log entry file: {}", e.what());
|
||||
}
|
||||
|
||||
boost::iostreams::file_descriptor_sink sink(
|
||||
entry_file.Handle(),
|
||||
boost::iostreams::file_descriptor_flags::never_close_handle);
|
||||
boost::iostreams::stream<boost::iostreams::file_descriptor_sink> os(sink);
|
||||
boost::archive::binary_oarchive oar(os);
|
||||
|
||||
try {
|
||||
oar << entry;
|
||||
os.flush();
|
||||
} catch (boost::archive::archive_exception &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to serialize log entry: {}", e.what());
|
||||
}
|
||||
|
||||
try {
|
||||
utils::Fsync(entry_file);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to write log entry file to disk: {}",
|
||||
e.what());
|
||||
}
|
||||
|
||||
// We update the metadata only after the log entry file is written to
|
||||
// disk. This ensures that no file in range [1, last_log_index] is
|
||||
// corrupted.
|
||||
WriteMetadata();
|
||||
|
||||
try {
|
||||
utils::Fsync(dir_);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}",
|
||||
e.what());
|
||||
}
|
||||
}
|
||||
|
||||
TermId GetLogTerm(const LogIndex index) override {
|
||||
return memory_storage_.GetLogTerm(index);
|
||||
}
|
||||
|
||||
LogEntry<State> GetLogEntry(const LogIndex index) override {
|
||||
return memory_storage_.GetLogEntry(index);
|
||||
}
|
||||
|
||||
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) override {
|
||||
return memory_storage_.GetLogSuffix(index);
|
||||
}
|
||||
|
||||
LogIndex GetLastLogIndex() override {
|
||||
return memory_storage_.GetLastLogIndex();
|
||||
}
|
||||
|
||||
void TruncateLogSuffix(const LogIndex index) override {
|
||||
return memory_storage_.TruncateLogSuffix(index);
|
||||
}
|
||||
|
||||
private:
|
||||
InMemoryStorage<State> memory_storage_;
|
||||
utils::File dir_;
|
||||
|
||||
void WriteMetadata() {
|
||||
// We first write data to a temporary file, ensure data is safely written
|
||||
// to disk, and then rename the file. Since rename is an atomic operation,
|
||||
// "metadata" file won't get corrupted in case of program crash.
|
||||
utils::File md_tmp;
|
||||
try {
|
||||
md_tmp =
|
||||
OpenFile(dir_, "metadata.new", O_WRONLY | O_CREAT | O_TRUNC, 0644);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to open temporary metadata file: {}",
|
||||
e.what());
|
||||
}
|
||||
|
||||
boost::iostreams::file_descriptor_sink sink(
|
||||
md_tmp.Handle(),
|
||||
boost::iostreams::file_descriptor_flags::never_close_handle);
|
||||
boost::iostreams::stream<boost::iostreams::file_descriptor_sink> os(sink);
|
||||
boost::archive::binary_oarchive oar(os);
|
||||
|
||||
try {
|
||||
oar << SimpleFileStorageMetadata{
|
||||
memory_storage_.GetTermAndVotedFor().first,
|
||||
memory_storage_.GetTermAndVotedFor().second,
|
||||
memory_storage_.GetLastLogIndex()};
|
||||
} catch (boost::archive::archive_exception &e) {
|
||||
LOG(FATAL) << "Error serializing Raft metadata";
|
||||
}
|
||||
os.flush();
|
||||
|
||||
try {
|
||||
utils::Fsync(md_tmp);
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format(
|
||||
"Failed to write temporary metadata file to disk: {}", e.what());
|
||||
}
|
||||
|
||||
try {
|
||||
utils::Rename(dir_, "metadata.new", dir_, "metadata");
|
||||
} catch (std::system_error &e) {
|
||||
LOG(FATAL) << fmt::format("Failed to move temporary metadata file: {}",
|
||||
e.what());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace communication::raft
|
@ -1,63 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/raft/raft.hpp"
|
||||
|
||||
namespace communication::raft {
|
||||
|
||||
template <class State>
|
||||
class InMemoryStorage : public RaftStorageInterface<State> {
|
||||
public:
|
||||
InMemoryStorage()
|
||||
: term_(0), voted_for_(std::experimental::nullopt), log_() {}
|
||||
|
||||
InMemoryStorage(const TermId term,
|
||||
const std::experimental::optional<std::string> &voted_for,
|
||||
const std::vector<LogEntry<State>> log)
|
||||
: term_(term), voted_for_(voted_for), log_(log) {}
|
||||
|
||||
void WriteTermAndVotedFor(
|
||||
const TermId term,
|
||||
const std::experimental::optional<std::string> &voted_for) {
|
||||
term_ = term;
|
||||
voted_for_ = voted_for;
|
||||
}
|
||||
|
||||
std::pair<TermId, std::experimental::optional<MemberId>>
|
||||
GetTermAndVotedFor() {
|
||||
return {term_, voted_for_};
|
||||
}
|
||||
|
||||
void AppendLogEntry(const LogEntry<State> &entry) { log_.push_back(entry); }
|
||||
|
||||
TermId GetLogTerm(const LogIndex index) {
|
||||
CHECK(0 <= index && index <= log_.size())
|
||||
<< "Trying to read nonexistent log entry";
|
||||
return index > 0 ? log_[index - 1].term : 0;
|
||||
}
|
||||
|
||||
LogEntry<State> GetLogEntry(const LogIndex index) {
|
||||
CHECK(1 <= index && index <= log_.size())
|
||||
<< "Trying to get nonexistent log entry";
|
||||
return log_[index - 1];
|
||||
}
|
||||
|
||||
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) {
|
||||
CHECK(1 <= index && index <= log_.size())
|
||||
<< "Trying to get nonexistent log entries";
|
||||
return std::vector<LogEntry<State>>(log_.begin() + index - 1, log_.end());
|
||||
}
|
||||
|
||||
LogIndex GetLastLogIndex(void) { return log_.size(); }
|
||||
|
||||
void TruncateLogSuffix(const LogIndex index) {
|
||||
CHECK(1 <= index <= log_.size())
|
||||
<< "Trying to remove nonexistent log entries";
|
||||
log_.erase(log_.begin() + index - 1, log_.end());
|
||||
}
|
||||
|
||||
TermId term_;
|
||||
std::experimental::optional<MemberId> voted_for_;
|
||||
std::vector<LogEntry<State>> log_;
|
||||
};
|
||||
|
||||
} // namespace communication::raft
|
@ -1,141 +0,0 @@
|
||||
#include <functional>
|
||||
|
||||
#include "communication/raft/network_common.hpp"
|
||||
#include "communication/raft/raft.hpp"
|
||||
|
||||
namespace communication::raft::test_utils {
|
||||
|
||||
/* State with no data: every `Change` compares equal to every other and nothing
 * is serialized. */
struct DummyState {
  struct Change {
    bool operator==(const Change &) const { return true; }
    bool operator!=(const Change &other) const { return !(*this == other); }

    template <class TArchive>
    void serialize(TArchive & /* archive */, unsigned int /* version */) {}
  };

  template <class TArchive>
  void serialize(TArchive & /* archive */, unsigned int /* version */) {}
};
|
||||
|
||||
/* State holding a single integer, with changes described by a type and an
 * operand. */
struct IntState {
  int x;

  struct Change {
    enum Type { ADD, SUB, SET };
    Type t;
    int d;

    /* Two changes are equal when both the type and the operand match. */
    bool operator==(const Change &rhs) const {
      if (t != rhs.t) {
        return false;
      }
      return d == rhs.d;
    }
    bool operator!=(const Change &rhs) const {
      return t != rhs.t || d != rhs.d;
    }

    template <class TArchive>
    void serialize(TArchive &archive, unsigned int /* version */) {
      archive &t;
      archive &d;
    }
  };

  template <class TArchive>
  void serialize(TArchive &archive, unsigned int /* version */) {
    archive &x;
  }
};
|
||||
|
||||
/* Implementations of `RaftNetworkInterface` for simpler unit testing. */
|
||||
|
||||
/* `NoOpNetworkInterface` doesn't do anything -- it's like a server disconnected
|
||||
* from the network. */
|
||||
template <class State>
|
||||
class NoOpNetworkInterface : public RaftNetworkInterface<State> {
|
||||
public:
|
||||
~NoOpNetworkInterface() {}
|
||||
|
||||
virtual bool SendRequestVote(const MemberId &, const RequestVoteRequest &,
|
||||
RequestVoteReply &) override {
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual bool SendAppendEntries(const MemberId &,
|
||||
const AppendEntriesRequest<State> &,
|
||||
AppendEntriesReply &) override {
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual void Start(RaftMember<State> &) override {}
|
||||
};
|
||||
|
||||
/* `NextReplyNetworkInterface` has two fields: `on_request_` and `next_reply_`
|
||||
* which is optional. `on_request_` is a callback that will be called before
|
||||
* processing requets. If `next_reply_` is not set, `Send*` functions will
|
||||
* return false, otherwise they return that reply. */
|
||||
template <class State>
|
||||
class NextReplyNetworkInterface : public RaftNetworkInterface<State> {
|
||||
public:
|
||||
~NextReplyNetworkInterface() {}
|
||||
|
||||
virtual bool SendRequestVote(const MemberId &,
|
||||
const RequestVoteRequest &request,
|
||||
RequestVoteReply &reply) override {
|
||||
PeerRpcRequest<State> req;
|
||||
req.type = RpcType::REQUEST_VOTE;
|
||||
req.request_vote = request;
|
||||
on_request_(req);
|
||||
if (!next_reply_) {
|
||||
return false;
|
||||
}
|
||||
DCHECK(next_reply_->type == RpcType::REQUEST_VOTE)
|
||||
<< "`next_reply_` type doesn't match the request type";
|
||||
reply = next_reply_->request_vote;
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual bool SendAppendEntries(const MemberId &,
|
||||
const AppendEntriesRequest<State> &request,
|
||||
AppendEntriesReply &reply) override {
|
||||
PeerRpcRequest<State> req;
|
||||
req.type = RpcType::APPEND_ENTRIES;
|
||||
req.append_entries = request;
|
||||
on_request_(req);
|
||||
if (!next_reply_) {
|
||||
return false;
|
||||
}
|
||||
DCHECK(next_reply_->type == RpcType::APPEND_ENTRIES)
|
||||
<< "`next_reply_` type doesn't match the request type";
|
||||
reply = next_reply_->append_entries;
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual void Start(RaftMember<State> &) override {}
|
||||
|
||||
std::function<void(const PeerRpcRequest<State> &)> on_request_;
|
||||
std::experimental::optional<PeerRpcReply> next_reply_;
|
||||
};
|
||||
|
||||
template <class State>
|
||||
class NoOpStorageInterface : public RaftStorageInterface<State> {
|
||||
public:
|
||||
NoOpStorageInterface() {}
|
||||
|
||||
void WriteTermAndVotedFor(const TermId,
|
||||
const std::experimental::optional<std::string> &) {}
|
||||
|
||||
std::pair<TermId, std::experimental::optional<MemberId>>
|
||||
GetTermAndVotedFor() {
|
||||
return {0, {}};
|
||||
}
|
||||
void AppendLogEntry(const LogEntry<State> &) {}
|
||||
TermId GetLogTerm(const LogIndex) { return 0; }
|
||||
LogEntry<State> GetLogEntry(const LogIndex) { assert(false); }
|
||||
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex) { return {}; }
|
||||
LogIndex GetLastLogIndex() { return 0; }
|
||||
void TruncateLogSuffix(const LogIndex) {}
|
||||
|
||||
TermId term_;
|
||||
std::experimental::optional<MemberId> voted_for_;
|
||||
std::vector<LogEntry<State>> log_;
|
||||
};
|
||||
|
||||
} // namespace communication::raft::test_utils
|
@ -25,8 +25,5 @@ add_subdirectory(unit)
|
||||
# property based test binaries
|
||||
add_subdirectory(property_based)
|
||||
|
||||
# raft binaries
|
||||
add_subdirectory(distributed/raft)
|
||||
|
||||
# integration test binaries
|
||||
add_subdirectory(integration)
|
||||
|
@ -21,7 +21,7 @@ def parse_args():
|
||||
Parse command line arguments
|
||||
"""
|
||||
argp = ArgumentParser(description=__doc__)
|
||||
argp.add_argument("--test-suite", default="raft",
|
||||
argp.add_argument("--test-suite", default="card_fraud",
|
||||
help="Tests suite")
|
||||
argp.add_argument("--test", default="example_test",
|
||||
help="Test specification in python module")
|
||||
|
@ -1,29 +0,0 @@
|
||||
# The name of the current directory is used as the test type.
get_filename_component(test_type ${CMAKE_CURRENT_SOURCE_DIR} NAME)

# Collect every cpp file (absolute paths) below the current directory.
file(GLOB_RECURSE test_type_cpps *.cpp)
message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}")

# Build one binary per cpp file.
foreach(test_cpp ${test_type_cpps})

    # The executable name is the file name without directory and extension.
    get_filename_component(exec_name ${test_cpp} NAME_WE)

    set(target_name memgraph__${test_type}__${exec_name})

    # Define the executable.
    add_executable(${target_name} ${test_cpp})

    # CMake requires unique logical target names, while OUTPUT_NAME sets the
    # real name of the built file; this allows two targets to produce binaries
    # with the same name.
    set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})

    # Link the required libraries.
    target_link_libraries(${target_name} memgraph_lib kvstore_dummy_lib)

    set(output_path ${CMAKE_BINARY_DIR}/test_results/unit/${target_name}.xml)

endforeach()
|
@ -1,13 +0,0 @@
|
||||
# Raft Tests
|
||||
|
||||
To run a test locally, execute the following command:
|
||||
|
||||
```
|
||||
./local_runner {test_suite} {test_name}
|
||||
```
|
||||
|
||||
Every test has to be defined as a Python module
|
||||
with exposed ```run(machine_ids, workers)```
|
||||
method. In each test there has to be a constant
|
||||
```NUM_MACHINES``` which specifies how many workers
|
||||
to run in the cluster.
|
@ -1,49 +0,0 @@
|
||||
#include <ctime>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "messages.hpp"
|
||||
|
||||
using namespace communication::rpc;
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
DEFINE_string(server_interface, "127.0.0.1",
|
||||
"Server interface on which to communicate.");
|
||||
DEFINE_int32(server_port, 8010, "Server port on which to communicate.");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Raft RPC Client");
|
||||
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
// Initialize client.
|
||||
Client client(io::network::Endpoint(
|
||||
io::network::ResolveHostname(FLAGS_server_interface), FLAGS_server_port));
|
||||
|
||||
// Try to send 100 values to server.
|
||||
// If requests timeout, try to resend it.
|
||||
// Log output on server should contain all values once
|
||||
// in correct order.
|
||||
for (int i = 1; i <= 100; ++i) {
|
||||
LOG(INFO) << fmt::format("Apennding value: {}", i);
|
||||
// TODO: Serialize RPC via Cap'n Proto
|
||||
// auto result_tuple = client.Call<AppendEntry>(i);
|
||||
// if (!result_tuple) {
|
||||
// LOG(INFO) << "Request unsuccessful";
|
||||
// // Try to resend value
|
||||
// --i;
|
||||
// } else {
|
||||
// LOG(INFO) << fmt::format("Appended value: {}", i);
|
||||
// }
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
#include <fstream>
|
||||
#include <thread>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "messages.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/terminate_handler.hpp"
|
||||
|
||||
using namespace communication::rpc;
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
DEFINE_string(interface, "127.0.0.1",
|
||||
"Communication interface on which to listen.");
|
||||
DEFINE_string(port, "10000", "Communication port on which to listen.");
|
||||
DEFINE_string(log, "log.txt", "Entries log file");
|
||||
|
||||
volatile sig_atomic_t is_shutting_down = 0;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Raft RPC Server");
|
||||
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
Server server(io::network::Endpoint(FLAGS_interface, stoul(FLAGS_port)));
|
||||
std::ofstream log(FLAGS_log, std::ios_base::app);
|
||||
|
||||
// Handler for regular termination signals.
|
||||
auto shutdown = [&log]() {
|
||||
if (is_shutting_down) return;
|
||||
is_shutting_down = 1;
|
||||
log.close();
|
||||
exit(0);
|
||||
};
|
||||
|
||||
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
||||
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
||||
// double shutdown.
|
||||
sigset_t block_shutdown_signals;
|
||||
sigemptyset(&block_shutdown_signals);
|
||||
sigaddset(&block_shutdown_signals, SIGTERM);
|
||||
sigaddset(&block_shutdown_signals, SIGINT);
|
||||
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
||||
shutdown, block_shutdown_signals))
|
||||
<< "Unable to register SIGTERM handler!";
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
||||
block_shutdown_signals))
|
||||
<< "Unable to register SIGINT handler!";
|
||||
|
||||
// Example callback.
|
||||
// TODO: Serialize RPC via Cap'n Proto
|
||||
// server.Register<AppendEntry>(
|
||||
// [&log](const auto &req_reader, auto *res_builder) {
|
||||
// AppendEntryReq request;
|
||||
// request.Load(req_reader);
|
||||
// log << request.val << std::endl;
|
||||
// log.flush();
|
||||
// LOG(INFO) << fmt::format("AppendEntry: {}", request.val);
|
||||
// AppendEntryRes res(200, FLAGS_interface, stol(FLAGS_port));
|
||||
// res.Save(res_builder);
|
||||
// });
|
||||
|
||||
LOG(INFO) << "Raft RPC server started";
|
||||
// Sleep until shutdown detected.
|
||||
std::this_thread::sleep_until(
|
||||
std::chrono::time_point<std::chrono::system_clock>::max());
|
||||
|
||||
return 0;
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import xmlrpc.client
|
||||
|
||||
NUM_MACHINES = 2
|
||||
|
||||
# binaries to run
|
||||
CLIENT_BINARY = "tests/distributed/raft/example_client"
|
||||
SERVER_BINARY = "tests/distributed/raft/example_server"
|
||||
|
||||
|
||||
def run(machine_ids, workers):
    """Run the example test: start server and client, crash and restart the
    server, then compare the server log with the expected values.

    Args:
        machine_ids: sequence of machine ids; index 0 is used for the client
            and index 1 for the server. Each id is also the name of the
            environment variable holding that machine's interface.
        workers: mapping from machine id to a worker object providing
            ``allocate_file``, ``get_jail``, ``start``, ``stop`` and
            ``read_file``.

    Raises:
        Exception: if the fetched server log does not consist of exactly the
            lines "1" to "100" in order.
    """
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("example_test")
    log.info("Start")

    # define interfaces and ports for binaries
    server_interface = os.environ[machine_ids[1]]
    server_port = str(10000)
    client_interface = os.environ[machine_ids[0]]
    client_port = str(10010)

    # start binaries
    log_abs_path = workers[machine_ids[1]].allocate_file()
    server_tid = workers[machine_ids[1]].get_jail()
    server_args = ["--interface", server_interface]
    server_args += ["--port", server_port]
    server_args += ["--log", log_abs_path]
    workers[machine_ids[1]].start(server_tid, SERVER_BINARY, server_args)

    client_tid = workers[machine_ids[0]].get_jail()
    client_args = ["--interface", client_interface]
    client_args += ["--port", client_port]
    client_args += ["--server-interface", server_interface]
    client_args += ["--server-port", server_port]
    workers[machine_ids[0]].start(client_tid, CLIENT_BINARY, client_args)

    # crash server
    workers[machine_ids[1]].stop(server_tid)
    time.sleep(5)
    workers[machine_ids[1]].start(server_tid, SERVER_BINARY, server_args)

    # wait for test to finish
    time.sleep(5)

    # stop binaries
    workers[machine_ids[0]].stop(client_tid)
    workers[machine_ids[1]].stop(server_tid)

    # fetch log
    # NOTE(review): when no log can be fetched (`result is None`) the test
    # ends without raising -- confirm that this is intended.
    result = workers[machine_ids[1]].read_file(log_abs_path)
    if result is not None:
        result = result.data.decode('ascii')
        if result.splitlines() == ["{}".format(x) for x in range(1, 101)]:
            # `Logger.warn` is a deprecated alias that newer Python versions
            # no longer provide; `warning` logs at the same level.
            log.warning("Test successful")
        else:
            raise Exception("Test failed")

    log.info("End")
|
@ -1,21 +0,0 @@
|
||||
#include "communication/rpc/messages.hpp"
|
||||
|
||||
using namespace communication::rpc;
|
||||
|
||||
/* Request half of the AppendEntry RPC: carries a single integer value. */
struct AppendEntryReq {
  /* `val` is zero-initialized so a default-constructed request never exposes
   * an indeterminate value. */
  AppendEntryReq() {}
  explicit AppendEntryReq(int val) : val(val) {}

  int val{0};
};
|
||||
|
||||
/* Response half of the AppendEntry RPC: a status code plus an interface
 * string and a port. */
struct AppendEntryRes {
  /* Scalar members are zero-initialized so a default-constructed response
   * never exposes indeterminate values. */
  AppendEntryRes() {}
  AppendEntryRes(int status, std::string interface, uint16_t port)
      : status(status), interface(interface), port(port) {}

  int status{0};
  std::string interface;
  uint16_t port{0};
};
|
||||
|
||||
using AppendEntry = RequestResponse<AppendEntryReq, AppendEntryRes>;
|
@ -56,9 +56,6 @@ target_link_libraries(${test_prefix}query_hash memgraph_lib kvstore_dummy_lib)
|
||||
add_manual_test(query_planner.cpp)
|
||||
target_link_libraries(${test_prefix}query_planner memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_manual_test(raft_rpc.cpp)
|
||||
target_link_libraries(${test_prefix}raft_rpc memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_manual_test(repl.cpp)
|
||||
target_link_libraries(${test_prefix}repl memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
|
@ -1,50 +0,0 @@
|
||||
#include "communication/raft/rpc.hpp"
|
||||
#include "communication/raft/storage/file.hpp"
|
||||
#include "communication/raft/test_utils.hpp"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
namespace raft = communication::raft;
|
||||
|
||||
using io::network::Endpoint;
|
||||
using raft::RaftConfig;
|
||||
using raft::RpcNetwork;
|
||||
using raft::test_utils::DummyState;
|
||||
|
||||
DEFINE_string(member_id, "", "id of Raft member");
|
||||
DEFINE_string(log_dir, "", "Raft log directory");
|
||||
|
||||
/* Start cluster members with:
|
||||
* ./raft_rpc --member-id a --log-dir a_log
|
||||
* ./raft_rpc --member-id b --log-dir b_log
|
||||
* ./raft_rpc --member-id c --log-dir c_log
|
||||
*
|
||||
* Enjoy democracy!
|
||||
*/
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
std::unordered_map<std::string, Endpoint> directory = {
|
||||
{"a", Endpoint("127.0.0.1", 12345)},
|
||||
{"b", Endpoint("127.0.0.1", 12346)},
|
||||
{"c", Endpoint("127.0.0.1", 12347)}};
|
||||
|
||||
communication::rpc::Server server(directory[FLAGS_member_id]);
|
||||
// TODO: Serialize RPC via Cap'n Proto
|
||||
// RpcNetwork<DummyState> network(server, directory);
|
||||
// raft::SimpleFileStorage<DummyState> storage(FLAGS_log_dir);
|
||||
|
||||
// raft::RaftConfig config{{"a", "b", "c"}, 150ms, 300ms, 70ms, 30ms};
|
||||
|
||||
// {
|
||||
// raft::RaftMember<DummyState> raft_member(network, storage, FLAGS_member_id,
|
||||
// config);
|
||||
// while (true) {
|
||||
// continue;
|
||||
// }
|
||||
// }
|
||||
|
||||
return 0;
|
||||
}
|
@ -178,12 +178,6 @@ target_link_libraries(${test_prefix}query_variable_start_planner memgraph_lib kv
|
||||
add_unit_test(queue.cpp)
|
||||
target_link_libraries(${test_prefix}queue memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_unit_test(raft.cpp)
|
||||
target_link_libraries(${test_prefix}raft memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_unit_test(raft_storage.cpp)
|
||||
target_link_libraries(${test_prefix}raft_storage memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
add_unit_test(record_edge_vertex_accessor.cpp)
|
||||
target_link_libraries(${test_prefix}record_edge_vertex_accessor memgraph_lib kvstore_dummy_lib)
|
||||
|
||||
|
@ -1,660 +0,0 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <experimental/optional>
|
||||
#include <thread>
|
||||
|
||||
#include "communication/raft/raft.hpp"
|
||||
#include "communication/raft/storage/memory.hpp"
|
||||
#include "communication/raft/test_utils.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using testing::Values;
|
||||
|
||||
using namespace communication::raft;
|
||||
using namespace communication::raft::test_utils;
|
||||
|
||||
using communication::raft::impl::RaftMemberImpl;
|
||||
using communication::raft::impl::RaftMode;
|
||||
|
||||
const RaftConfig test_config1{{"a"}, 150ms, 300ms, 70ms, 30ms};
|
||||
const RaftConfig test_config2{{"a", "b"}, 150ms, 300ms, 70ms, 30ms};
|
||||
const RaftConfig test_config3{{"a", "b", "c"}, 150ms, 300ms, 70ms, 30ms};
|
||||
const RaftConfig test_config5{
|
||||
{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms, 30ms};
|
||||
|
||||
class RaftMemberImplTest : public ::testing::Test {
|
||||
public:
|
||||
RaftMemberImplTest()
|
||||
: storage_(1, "a", {}), member(network_, storage_, "a", test_config5) {}
|
||||
|
||||
void SetLog(std::vector<LogEntry<DummyState>> log) {
|
||||
storage_.log_ = std::move(log);
|
||||
}
|
||||
|
||||
NoOpNetworkInterface<DummyState> network_;
|
||||
InMemoryStorage<DummyState> storage_;
|
||||
RaftMemberImpl<DummyState> member;
|
||||
};
|
||||
|
||||
/* Right after construction the member is a follower, its term and vote match
 * what the fixture put into storage, and nothing is committed yet. */
TEST_F(RaftMemberImplTest, Constructor) {
  EXPECT_EQ(member.mode_, RaftMode::FOLLOWER);
  EXPECT_EQ(member.term_, 1);
  EXPECT_EQ(*member.voted_for_, "a");
  EXPECT_EQ(member.commit_index_, 0);
}
|
||||
|
||||
/* Stepping down from leader yields a follower with no known leader and a
 * finite next election time. */
TEST_F(RaftMemberImplTest, CandidateOrLeaderTransitionToFollower) {
  /* Become leader first so that there is something to step down from. */
  member.mode_ = RaftMode::CANDIDATE;
  member.CandidateTransitionToLeader();

  member.CandidateOrLeaderTransitionToFollower();

  EXPECT_EQ(member.mode_, RaftMode::FOLLOWER);
  EXPECT_EQ(member.leader_, std::experimental::nullopt);
  EXPECT_LT(member.next_election_time_, TimePoint::max());
}
|
||||
|
||||
/* A candidate that becomes leader records itself as leader and pushes its
 * next election time to TimePoint::max(). */
TEST_F(RaftMemberImplTest, CandidateTransitionToLeader) {
  member.mode_ = RaftMode::CANDIDATE;

  member.CandidateTransitionToLeader();

  EXPECT_EQ(member.mode_, RaftMode::LEADER);
  EXPECT_EQ(*member.leader_, "a");
  EXPECT_EQ(member.next_election_time_, TimePoint::max());
}
|
||||
|
||||
/* Noting the current term changes nothing; noting a larger term makes the
 * leader adopt it and fall back to follower. */
TEST_F(RaftMemberImplTest, CandidateOrLeaderNoteTerm) {
  member.mode_ = RaftMode::LEADER;
  member.term_ = 5;

  /* same term -> still leader */
  member.CandidateOrLeaderNoteTerm(5);
  EXPECT_EQ(member.mode_, RaftMode::LEADER);
  EXPECT_EQ(member.term_, 5);

  /* newer term -> step down and adopt it */
  member.CandidateOrLeaderNoteTerm(6);
  EXPECT_EQ(member.mode_, RaftMode::FOLLOWER);
  EXPECT_EQ(member.term_, 6);
}
|
||||
|
||||
/* Starting an election turns the member into a candidate, bumps the term
 * from 1 to 2 and makes it vote for itself. */
TEST_F(RaftMemberImplTest, StartNewElection) {
  member.StartNewElection();

  EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);
  EXPECT_EQ(member.term_, 2);
  EXPECT_EQ(member.voted_for_, member.id_);
}
|
||||
|
||||
/* In the five-member cluster CountVotes() reports success only once two
 * peers are marked as having voted for us after the election started. */
TEST_F(RaftMemberImplTest, CountVotes) {
  member.StartNewElection();
  EXPECT_FALSE(member.CountVotes());

  /* one peer vote is not enough */
  member.peer_states_["b"]->voted_for_me = true;
  EXPECT_FALSE(member.CountVotes());

  /* the second peer vote is */
  member.peer_states_["c"]->voted_for_me = true;
  EXPECT_TRUE(member.CountVotes());
}
|
||||
|
||||
/* Drives AdvanceCommitIndex() on a leader in term 2 whose log holds four
 * entries from term 1 followed by four from term 2, changing peers' match
 * indices step by step and checking the resulting commit index. */
TEST_F(RaftMemberImplTest, AdvanceCommitIndex) {
  SetLog({{1}, {1}, {1}, {1}, {2}, {2}, {2}, {2}});

  member.mode_ = RaftMode::LEADER;
  member.term_ = 2;

  /* Records how far `peer`'s log is known to match ours. */
  auto set_match_index = [&](const MemberId &peer, LogIndex index) {
    member.peer_states_[peer]->match_index = index;
  };

  set_match_index("b", 4);
  set_match_index("c", 4);

  EXPECT_EQ(member.commit_index_, 0);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 0);

  /* Same match indices as above, applied a second time; still nothing is
   * committed. */
  set_match_index("b", 4);
  set_match_index("c", 4);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 0);

  set_match_index("b", 5);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 0);

  set_match_index("c", 5);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 5);

  set_match_index("d", 6);
  set_match_index("e", 7);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 6);

  set_match_index("c", 8);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 7);

  set_match_index("a", 8);
  member.AdvanceCommitIndex();
  EXPECT_EQ(member.commit_index_, 8);
}
|
||||
|
||||
/* Member "a" (term 1, two log entries from term 1) starts an election in the
 * five-member cluster and asks "b", "c" and "d" for their votes in turn,
 * using canned replies. It must become leader after the second granted vote
 * and end up with a command-less term-2 entry at the end of its log. */
TEST(RequestVote, SimpleElection) {
  NextReplyNetworkInterface<DummyState> network;
  InMemoryStorage<DummyState> storage(1, {}, {{1}, {1}});
  RaftMemberImpl<DummyState> member(network, storage, "a", test_config5);

  member.StartNewElection();

  std::unique_lock<std::mutex> lock(member.mutex_);

  /* Every outgoing request must be a RequestVote for term 2 describing our
   * own log. */
  network.on_request_ = [](const PeerRpcRequest<DummyState> &request) {
    ASSERT_EQ(request.type, RpcType::REQUEST_VOTE);
    ASSERT_EQ(request.request_vote.candidate_term, 2);
    ASSERT_EQ(request.request_vote.candidate_id, "a");
    ASSERT_EQ(request.request_vote.last_log_index, 2);
    ASSERT_EQ(request.request_vote.last_log_term, 1);
  };

  PeerRpcReply canned_reply;
  canned_reply.type = RpcType::REQUEST_VOTE;
  canned_reply.request_vote.term = 2;

  /* Queues a reply with the given verdict and sends RequestVote to `peer`. */
  auto request_vote_from = [&](const MemberId &peer, bool granted) {
    canned_reply.request_vote.vote_granted = granted;
    network.next_reply_ = canned_reply;
    member.RequestVote(peer, *member.peer_states_[peer], lock);
  };

  /* member 'b' first voted for us */
  request_vote_from("b", true);
  EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);
  EXPECT_TRUE(member.peer_states_["b"]->request_vote_done);
  EXPECT_TRUE(member.peer_states_["b"]->voted_for_me);

  /* member 'c' didn't */
  request_vote_from("c", false);
  EXPECT_TRUE(member.peer_states_["c"]->request_vote_done);
  EXPECT_FALSE(member.peer_states_["c"]->voted_for_me);
  EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);

  /* but member 'd' did */
  request_vote_from("d", true);
  EXPECT_TRUE(member.peer_states_["d"]->request_vote_done);
  EXPECT_TRUE(member.peer_states_["d"]->voted_for_me);
  EXPECT_EQ(member.mode_, RaftMode::LEADER);

  /* no-op entry should be at the end of leader's log */
  EXPECT_EQ(storage.log_.back().term, 2);
  EXPECT_EQ(storage.log_.back().command, std::experimental::nullopt);
}
|
||||
|
||||
/* Leader "a" (term 3, log terms 1 1 2 3) synchronises its log with peer "b"
 * in the two-member cluster. Replies are canned; every outgoing request is
 * checked against the expectation set just before the AppendEntries call. */
TEST(AppendEntries, SimpleLogSync) {
  NextReplyNetworkInterface<DummyState> network;
  InMemoryStorage<DummyState> storage(3, {}, {{1}, {1}, {2}, {3}});
  RaftMemberImpl<DummyState> member(network, storage, "a", test_config2);

  member.mode_ = RaftMode::LEADER;

  std::unique_lock<std::mutex> lock(member.mutex_);

  /* The peer starts out rejecting everything. */
  PeerRpcReply reply;
  reply.type = RpcType::APPEND_ENTRIES;
  reply.append_entries.term = 3;
  reply.append_entries.success = false;
  network.next_reply_ = reply;

  LogIndex expected_prev_log_index;
  TermId expected_prev_log_term;
  std::vector<LogEntry<DummyState>> expected_entries;

  network.on_request_ = [&](const PeerRpcRequest<DummyState> &request) {
    EXPECT_EQ(request.type, RpcType::APPEND_ENTRIES);
    EXPECT_EQ(request.append_entries.leader_term, 3);
    EXPECT_EQ(request.append_entries.leader_id, "a");
    EXPECT_EQ(request.append_entries.prev_log_index, expected_prev_log_index);
    EXPECT_EQ(request.append_entries.prev_log_term, expected_prev_log_term);
    EXPECT_EQ(request.append_entries.entries, expected_entries);
  };

  /* Describes what the next outgoing AppendEntries request must contain. */
  auto expect_request = [&](LogIndex prev_index, TermId prev_term,
                            std::vector<LogEntry<DummyState>> entries) {
    expected_prev_log_index = prev_index;
    expected_prev_log_term = prev_term;
    expected_entries = std::move(entries);
  };

  /* initial state after election */
  auto &peer_state = *member.peer_states_["b"];
  peer_state.match_index = 0;
  peer_state.next_index = 5;
  peer_state.suppress_log_entries = true;

  /* send a heartbeat and find out logs don't match */
  expect_request(4, 3, {});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 0);
  EXPECT_EQ(peer_state.next_index, 4);
  EXPECT_EQ(member.commit_index_, 0);

  /* move `next_index` until we find a match, `expected_entries` will be empty
   * because `suppress_log_entries` will be true */
  expect_request(3, 2, {});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 0);
  EXPECT_EQ(peer_state.next_index, 3);
  EXPECT_EQ(peer_state.suppress_log_entries, true);
  EXPECT_EQ(member.commit_index_, 0);

  expect_request(2, 1, {});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 0);
  EXPECT_EQ(peer_state.next_index, 2);
  EXPECT_EQ(peer_state.suppress_log_entries, true);
  EXPECT_EQ(member.commit_index_, 0);

  /* we found a match */
  reply.append_entries.success = true;
  network.next_reply_ = reply;

  expect_request(1, 1, {});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 1);
  EXPECT_EQ(peer_state.next_index, 2);
  EXPECT_EQ(peer_state.suppress_log_entries, false);
  EXPECT_EQ(member.commit_index_, 4);

  /* now sync them */
  expect_request(1, 1, {{1}, {2}, {3}});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 4);
  EXPECT_EQ(peer_state.next_index, 5);
  EXPECT_EQ(peer_state.suppress_log_entries, false);
  EXPECT_EQ(member.commit_index_, 4);

  /* heartbeat after successful log sync */
  expect_request(4, 3, {});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 4);
  EXPECT_EQ(peer_state.next_index, 5);
  EXPECT_EQ(member.commit_index_, 4);

  /* replicate a newly appended entry */
  storage.AppendLogEntry({3});

  expect_request(4, 3, {{3}});
  member.AppendEntries("b", peer_state, lock);
  EXPECT_EQ(peer_state.match_index, 5);
  EXPECT_EQ(peer_state.next_index, 6);
  EXPECT_EQ(member.commit_index_, 5);
}
|
||||
|
||||
template <class TestParam>
|
||||
class RaftMemberParamTest : public ::testing::TestWithParam<TestParam> {
|
||||
public:
|
||||
virtual void SetUp() {
|
||||
/* Some checks to verify that test case is valid. */
|
||||
|
||||
/* Member's term should be greater than or equal to last log term. */
|
||||
ASSERT_GE(storage_.term_, storage_.GetLogTerm(storage_.GetLastLogIndex()));
|
||||
|
||||
ASSERT_GE(peer_storage_.term_,
|
||||
peer_storage_.GetLogTerm(peer_storage_.GetLastLogIndex()));
|
||||
|
||||
/* If two logs match at some index, the entire prefix should match. */
|
||||
LogIndex pos =
|
||||
std::min(storage_.GetLastLogIndex(), peer_storage_.GetLastLogIndex());
|
||||
|
||||
for (; pos > 0; --pos) {
|
||||
if (storage_.GetLogEntry(pos) == peer_storage_.GetLogEntry(pos)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (; pos > 0; --pos) {
|
||||
ASSERT_EQ(storage_.GetLogEntry(pos), peer_storage_.GetLogEntry(pos));
|
||||
}
|
||||
}
|
||||
|
||||
RaftMemberParamTest(InMemoryStorage<DummyState> storage,
|
||||
InMemoryStorage<DummyState> peer_storage)
|
||||
: network_(NoOpNetworkInterface<DummyState>()),
|
||||
storage_(storage),
|
||||
member_(network_, storage_, "a", test_config3),
|
||||
peer_storage_(peer_storage) {}
|
||||
|
||||
NoOpNetworkInterface<DummyState> network_;
|
||||
InMemoryStorage<DummyState> storage_;
|
||||
RaftMemberImpl<DummyState> member_;
|
||||
|
||||
InMemoryStorage<DummyState> peer_storage_;
|
||||
};
|
||||
|
||||
struct OnRequestVoteTestParam {
|
||||
TermId term;
|
||||
std::experimental::optional<MemberId> voted_for;
|
||||
std::vector<LogEntry<DummyState>> log;
|
||||
|
||||
TermId peer_term;
|
||||
std::vector<LogEntry<DummyState>> peer_log;
|
||||
|
||||
bool expected_reply;
|
||||
};
|
||||
|
||||
/* Fixture for OnRequestVote scenarios: builds both storages from the current
 * test parameter. The peer storage is created without a recorded vote. */
class OnRequestVoteTest : public RaftMemberParamTest<OnRequestVoteTestParam> {
 public:
  OnRequestVoteTest()
      : RaftMemberParamTest(
            InMemoryStorage<DummyState>(GetParam().term, GetParam().voted_for,
                                        GetParam().log),
            InMemoryStorage<DummyState>(GetParam().peer_term, {},
                                        GetParam().peer_log)) {}

  ~OnRequestVoteTest() override {}
};
|
||||
|
||||
/* Sends a RequestVote from candidate "b", built from the peer storage, to
 * member "a" and checks both the reply and the state "a" persisted. */
TEST_P(OnRequestVoteTest, RequestVoteTest) {
  const auto &param = GetParam();
  const auto peer_last_log_index = peer_storage_.GetLastLogIndex();

  auto reply = member_.OnRequestVote(
      {param.peer_term, "b", peer_last_log_index,
       peer_storage_.GetLogTerm(peer_last_log_index)});

  EXPECT_EQ(reply.vote_granted, param.expected_reply);

  /* Our term should always be at least as large as sender's term. */
  /* If we accepted the request, our term should be equal to candidate's term
   * and voted_for should be set. */
  const auto expected_term = std::max(param.peer_term, param.term);
  EXPECT_EQ(reply.term, expected_term);
  EXPECT_EQ(storage_.term_, expected_term);
  EXPECT_EQ(storage_.voted_for_, reply.vote_granted ? "b" : param.voted_for);
}
|
||||
|
||||
/* Member 'b' is starting an election for term 5 and sending RequestVote RPC
|
||||
* to 'a'. Logs are empty so log-up-to-date check will always pass. */
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
TermAndVotedForCheck, OnRequestVoteTest,
|
||||
Values(
|
||||
/* we didn't vote for anyone in a smaller term -> accept */
|
||||
OnRequestVoteTestParam{3, {}, {}, 5, {}, true},
|
||||
/* we voted for someone in smaller term -> accept */
|
||||
OnRequestVoteTestParam{4, "c", {}, 5, {}, true},
|
||||
/* equal term but we didn't vote for anyone in it -> accept */
|
||||
OnRequestVoteTestParam{5, {}, {}, 5, {}, true},
|
||||
/* equal term but we voted for this candidate-> accept */
|
||||
OnRequestVoteTestParam{5, "b", {}, 5, {}, true},
|
||||
/* equal term but we voted for someone else -> decline */
|
||||
OnRequestVoteTestParam{5, "c", {}, 5, {}, false},
|
||||
/* larger term and haven't voted for anyone -> decline */
|
||||
OnRequestVoteTestParam{6, {}, {}, 5, {}, false},
|
||||
/* larger term and we voted for someone else -> decline */
|
||||
OnRequestVoteTestParam{6, "a", {}, 5, {}, false}));
|
||||
|
||||
/* Member 'a' log:
|
||||
* 1 2 3 4 5 6 7
|
||||
* | 1 | 1 | 1 | 2 | 3 | 3 |
|
||||
*
|
||||
* It is in term 5.
|
||||
*/
|
||||
|
||||
/* Member 'b' is sending RequestVote RPC to 'a' for term 8. */
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
LogUpToDateCheck, OnRequestVoteTest,
|
||||
Values(
|
||||
/* candidate's last log term is smaller -> decline */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}},
|
||||
false},
|
||||
/* candidate's last log term is smaller -> decline */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}, {2}, {2}, {2}},
|
||||
false},
|
||||
/* candidate's term is equal, but our log is longer -> decline */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}, {3}},
|
||||
false},
|
||||
/* equal logs -> accept */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
true},
|
||||
/* candidate's term is larger -> accept */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}, {4}},
|
||||
true},
|
||||
/* equal terms, but candidate's log is longer -> accept */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}, {3}},
|
||||
true},
|
||||
/* candidate's last log term is larger -> accept */
|
||||
OnRequestVoteTestParam{5,
|
||||
{},
|
||||
{{1}, {1}, {1}, {2}, {3}, {3}},
|
||||
8,
|
||||
{{1}, {2}, {3}, {4}, {5}},
|
||||
true}));
|
||||
|
||||
struct OnAppendEntriesTestParam {
|
||||
TermId term;
|
||||
std::vector<LogEntry<DummyState>> log;
|
||||
|
||||
TermId peer_term;
|
||||
std::vector<LogEntry<DummyState>> peer_log;
|
||||
LogIndex peer_next_index;
|
||||
|
||||
bool expected_reply;
|
||||
std::vector<LogEntry<DummyState>> expected_log;
|
||||
};
|
||||
|
||||
class OnAppendEntriesTest
|
||||
: public RaftMemberParamTest<OnAppendEntriesTestParam> {
|
||||
public:
|
||||
OnAppendEntriesTest()
|
||||
: RaftMemberParamTest(
|
||||
InMemoryStorage<DummyState>(GetParam().term, {}, GetParam().log),
|
||||
InMemoryStorage<DummyState>(GetParam().peer_term, {},
|
||||
GetParam().peer_log)) {}
|
||||
virtual ~OnAppendEntriesTest() {}
|
||||
};
|
||||
|
||||
/* Sends an AppendEntries request from "b" that carries the suffix of the peer
 * log starting at `peer_next_index`, then checks the reply and the log that
 * member "a" ends up with. */
TEST_P(OnAppendEntriesTest, All) {
  const auto &param = GetParam();

  /* The entry just before the first one being sent. */
  const auto prev_log_index = param.peer_next_index - 1;
  const auto prev_log_term = peer_storage_.GetLogTerm(prev_log_index);
  const auto entries = peer_storage_.GetLogSuffix(param.peer_next_index);

  auto reply = member_.OnAppendEntries(
      {param.peer_term, "b", prev_log_index, prev_log_term, entries, 0});

  EXPECT_EQ(reply.success, param.expected_reply);
  EXPECT_EQ(reply.term, std::max(param.peer_term, param.term));
  EXPECT_EQ(storage_.log_, param.expected_log);
}
|
||||
|
||||
/* Member 'a' recieved AppendEntries RPC from member 'b'. The request will
|
||||
* contain no log entries, representing just a heartbeat, as it is not
|
||||
* important in these scenarios. */
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
TermAndLogConsistencyCheck, OnAppendEntriesTest,
|
||||
Values(
|
||||
/* sender has stale term -> decline */
|
||||
OnAppendEntriesTestParam{/* my term*/ 8,
|
||||
{{1}, {1}, {2}},
|
||||
7,
|
||||
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
|
||||
7,
|
||||
false,
|
||||
{{1}, {1}, {2}}},
|
||||
/* we're missing entries 4, 5 and 6 -> decline, but update term */
|
||||
OnAppendEntriesTestParam{4,
|
||||
{{1}, {1}, {2}},
|
||||
8,
|
||||
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
|
||||
7,
|
||||
false,
|
||||
{{1}, {1}, {2}}},
|
||||
/* we're missing entry 4 -> decline, but update term */
|
||||
OnAppendEntriesTestParam{5,
|
||||
{{1}, {1}, {2}},
|
||||
8,
|
||||
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
|
||||
5,
|
||||
false,
|
||||
{{1}, {1}, {2}}},
|
||||
/* log terms don't match at entry 4 -> decline, but update term */
|
||||
OnAppendEntriesTestParam{5,
|
||||
{{1}, {1}, {2}},
|
||||
8,
|
||||
{{1}, {1}, {3}, {3}, {4}, {5}, {5}, {6}},
|
||||
4,
|
||||
false,
|
||||
{{1}, {1}, {2}}},
|
||||
/* logs match -> accept and update term */
|
||||
OnAppendEntriesTestParam{5,
|
||||
{{1}, {1}, {2}},
|
||||
8,
|
||||
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}},
|
||||
/* now follow some log truncation tests */
|
||||
/* no truncation, append a single entry */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
9,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* no truncation, append multiple entries */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* no truncation, leader's log is prefix of ours */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}}},
|
||||
/* another one, now with entries from newer term */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}},
|
||||
/* no truncation, partial match between our log and appended entries
|
||||
*/
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* truncate suffix */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {4}, {4}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
5,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* truncate suffix, with partial match between our log and appened
|
||||
entries */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {4}, {4}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
4,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* delete whole log */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{5}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
1,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
|
||||
/* append on empty log */
|
||||
OnAppendEntriesTestParam{
|
||||
8,
|
||||
{{}},
|
||||
8,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
|
||||
1,
|
||||
true,
|
||||
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}));
|
||||
|
||||
/* Runs a real RaftMember in the two-member cluster against a network stub
 * that grants the first request as a vote and acknowledges every later one
 * as a successful AppendEntries. After two AddCommand calls the log must hold
 * a command-less entry followed by the two commands in order. */
TEST(RaftMemberTest, AddCommand) {
  NextReplyNetworkInterface<IntState> network;

  std::vector<IntState::Change> changes = {{IntState::Change::Type::ADD, 5},
                                           {IntState::Change::Type::ADD, 10}};

  network.on_request_ = [&network, request_count = 0](
                            const PeerRpcRequest<IntState> &request) mutable {
    ++request_count;

    PeerRpcReply canned_reply;
    if (request_count == 1) {
      /* the very first request is answered as a granted vote */
      canned_reply.type = RpcType::REQUEST_VOTE;
      canned_reply.request_vote.term = 1;
      canned_reply.request_vote.vote_granted = true;
    } else {
      /* everything after that is answered as a successful append */
      canned_reply.type = RpcType::APPEND_ENTRIES;
      canned_reply.append_entries.term = 1;
      canned_reply.append_entries.success = true;
    }

    network.next_reply_ = canned_reply;
  };

  InMemoryStorage<IntState> storage(0, {}, {});
  RaftMember<IntState> member(network, storage, "a", test_config2);

  /* Wait before issuing commands; 500ms is longer than every duration in
   * test_config2. */
  std::this_thread::sleep_for(500ms);

  member.AddCommand(changes[0], false);
  member.AddCommand(changes[1], true);

  ASSERT_EQ(storage.log_.size(), 3);
  EXPECT_EQ(storage.log_[0].command, std::experimental::nullopt);
  EXPECT_TRUE(storage.log_[1].command &&
              *storage.log_[1].command == changes[0]);
  EXPECT_TRUE(storage.log_[2].command &&
              *storage.log_[2].command == changes[1]);
}
|
@ -1,71 +0,0 @@
|
||||
#include <experimental/optional>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "communication/raft/storage/file.hpp"
|
||||
#include "communication/raft/test_utils.hpp"
|
||||
|
||||
using communication::raft::LogEntry;
|
||||
using communication::raft::SimpleFileStorage;
|
||||
using communication::raft::test_utils::IntState;
|
||||
|
||||
/* Exercises SimpleFileStorage across three consecutive instances that share
 * one directory, checking that term, vote and log written by one instance
 * are seen by the next.
 *
 * The directory is wiped before the test as well as after it, so that files
 * left behind by an aborted earlier run cannot break the "fresh storage"
 * expectations of the first scope. */
TEST(SimpleFileStorageTest, All) {
  using Log = LogEntry<IntState>;
  auto GetLog = [](int term, int d) {
    return Log{term, IntState::Change{IntState::Change::Type::SET, d}};
  };

  const fs::path storage_dir("raft_storage_test_dir");
  if (fs::exists(storage_dir)) {
    fs::remove_all(storage_dir);
  }

  {
    SimpleFileStorage<IntState> storage(storage_dir);
    EXPECT_EQ(storage.GetTermAndVotedFor().first, 0);
    EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
    EXPECT_EQ(storage.GetLastLogIndex(), 0);

    storage.WriteTermAndVotedFor(1, "a");
    EXPECT_EQ(storage.GetTermAndVotedFor().first, 1);
    EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a");

    storage.AppendLogEntry(GetLog(1, 1));
    storage.AppendLogEntry(GetLog(1, 2));

    EXPECT_EQ(storage.GetLastLogIndex(), 2);

    EXPECT_EQ(storage.GetLogSuffix(1),
              std::vector<Log>({GetLog(1, 1), GetLog(1, 2)}));
  }

  {
    /* A second instance over the same directory sees the state written
     * above. */
    SimpleFileStorage<IntState> storage(storage_dir);

    EXPECT_EQ(storage.GetTermAndVotedFor().first, 1);
    EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a");
    EXPECT_EQ(storage.GetLastLogIndex(), 2);
    EXPECT_EQ(storage.GetLogSuffix(1),
              std::vector<Log>({GetLog(1, 1), GetLog(1, 2)}));

    storage.TruncateLogSuffix(2);
    EXPECT_EQ(storage.GetLogSuffix(1), std::vector<Log>({GetLog(1, 1)}));

    storage.WriteTermAndVotedFor(2, std::experimental::nullopt);
    storage.AppendLogEntry(GetLog(2, 3));

    EXPECT_EQ(storage.GetTermAndVotedFor().first, 2);
    EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
    EXPECT_EQ(storage.GetLogSuffix(1),
              std::vector<Log>({GetLog(1, 1), GetLog(2, 3)}));
  }

  {
    /* A third instance sees the truncated and re-extended log. */
    SimpleFileStorage<IntState> storage(storage_dir);

    EXPECT_EQ(storage.GetTermAndVotedFor().first, 2);
    EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
    EXPECT_EQ(storage.GetLogSuffix(1),
              std::vector<Log>({GetLog(1, 1), GetLog(2, 3)}));
  }

  /* Remove the directory with whatever files the storage created in it. */
  fs::remove_all(storage_dir);
}
|
Loading…
Reference in New Issue
Block a user