From 5b7cb4602c8e0e018cd31b647bca1f64de73b714 Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Wed, 13 Dec 2017 18:15:57 +0100 Subject: [PATCH] Add simple client facing API for RaftMember Summary: Implement AddComand method on RaftMember Move RPCType out of rpc request and reply Add unit test for AddCommand Reviewers: mislav.bradac, buda Reviewed By: mislav.bradac Subscribers: pullbot, mculinovic Differential Revision: https://phabricator.memgraph.io/D1042 --- src/communication/raft/raft-inl.hpp | 53 ++++++++++++++++++-- src/communication/raft/raft.hpp | 30 +++++++----- src/communication/raft/raft_reactor.hpp | 8 +-- src/communication/raft/test_utils.hpp | 13 +++++ tests/unit/raft.cpp | 65 ++++++++++++++++++++----- 5 files changed, 136 insertions(+), 33 deletions(-) diff --git a/src/communication/raft/raft-inl.hpp b/src/communication/raft/raft-inl.hpp index 9ab55e278..62e88b649 100644 --- a/src/communication/raft/raft-inl.hpp +++ b/src/communication/raft/raft-inl.hpp @@ -197,8 +197,7 @@ bool RaftMemberImpl::SendRPC(const std::string &recipient, return false; } - DCHECK(reply.Term() >= term_) - << "Response term should be >= request term"; + DCHECK(reply.Term() >= term_) << "Response term should be >= request term"; /* [Raft thesis, Section 3.3] * "Current terms are exchanged whenever servers communicate; if one server's @@ -259,6 +258,12 @@ void RaftMemberImpl::StartNewElection() { */ peer_state->next_heartbeat_time = TimePoint::min(); } + + // We already have the majority if we're in a single node cluster. + if (CountVotes()) { + LogInfo("Elected as leader."); + CandidateTransitionToLeader(); + } /* Notify peer threads to start issuing RequestVote RPCs. */ state_changed_.notify_all(); @@ -285,7 +290,7 @@ void RaftMemberImpl::RequestVote(const std::string &peer_id, LogInfo("Requesting vote from {}", peer_id); PeerRPCRequest request; - request.type = PeerRPCRequest::Type::REQUEST_VOTE; + request.type = RPCType::REQUEST_VOTE; request.request_vote.candidate_term = term_; request.request_vote.candidate_id = id_; @@ -358,7 +363,7 @@ void RaftMemberImpl::AppendEntries(const std::string &peer_id, LogInfo("Appending entries to {}", peer_id); PeerRPCRequest request; - request.type = PeerRPCRequest::Type::APPEND_ENTRIES; + request.type = RPCType::APPEND_ENTRIES; request.append_entries.leader_term = term_; request.append_entries.leader_id = id_; @@ -603,6 +608,40 @@ PeerRPCReply::AppendEntries RaftMemberImpl::OnAppendEntries( return reply; } +template +ClientResult RaftMemberImpl::AddCommand( + const typename State::Change &command, bool blocking) { + std::unique_lock lock(mutex_); + if (mode_ != RaftMode::LEADER) { + return ClientResult::NOT_LEADER; + } + + LogEntry entry; + entry.term = term_; + entry.command = command; + storage_.AppendLogEntry(entry); + + // Entry is already replicated if this is a single node cluster. + AdvanceCommitIndex(); + + state_changed_.notify_all(); + + if (!blocking) { + return ClientResult::OK; + } + + LogIndex index = storage_.GetLastLogIndex(); + + while (!exiting_ && term_ == entry.term) { + if (commit_index_ >= index) { + return ClientResult::OK; + } + state_changed_.wait(lock); + } + + return ClientResult::NOT_LEADER; +} + } // namespace impl template @@ -643,4 +682,10 @@ PeerRPCReply::AppendEntries RaftMember::OnAppendEntries( return impl_.OnAppendEntries(request); } +template +ClientResult RaftMember::AddCommand( + const typename State::Change &command, bool blocking) { + return impl_.AddCommand(command, blocking); +} + } // namespace communication::raft diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp index 063965341..08f25a261 100644 --- a/src/communication/raft/raft.hpp +++ b/src/communication/raft/raft.hpp @@ -12,6 +12,8 @@ namespace communication::raft { +enum class ClientResult { NOT_LEADER, OK }; + using Clock = std::chrono::system_clock; using TimePoint = std::chrono::system_clock::time_point; @@ -36,11 +38,11 @@ struct LogEntry { }; /* Raft RPC requests and replies as described in [Raft thesis, Figure 3.1]. */ +enum class RPCType { REQUEST_VOTE, APPEND_ENTRIES }; + template struct PeerRPCRequest { - enum class Type { REQUEST_VOTE, APPEND_ENTRIES }; - - Type type; + RPCType type; struct RequestVote { TermId candidate_term; @@ -60,18 +62,16 @@ struct PeerRPCRequest { TermId Term() const { switch (type) { - case Type::REQUEST_VOTE: + case RPCType::REQUEST_VOTE: return request_vote.candidate_term; - case Type::APPEND_ENTRIES: + case RPCType::APPEND_ENTRIES: return append_entries.leader_term; } } }; struct PeerRPCReply { - enum class Type { REQUEST_VOTE, APPEND_ENTRIES }; - - Type type; + RPCType type; struct RequestVote { TermId term; @@ -85,9 +85,9 @@ struct PeerRPCReply { TermId Term() const { switch (type) { - case Type::REQUEST_VOTE: + case RPCType::REQUEST_VOTE: return request_vote.term; - case Type::APPEND_ENTRIES: + case RPCType::APPEND_ENTRIES: return append_entries.term; } } @@ -148,8 +148,8 @@ template class RaftMemberImpl { public: explicit RaftMemberImpl(RaftNetworkInterface &network, - RaftStorageInterface &storage, - const MemberId &id, const RaftConfig &config); + RaftStorageInterface &storage, + const MemberId &id, const RaftConfig &config); ~RaftMemberImpl(); @@ -183,6 +183,8 @@ class RaftMemberImpl { PeerRPCReply::AppendEntries OnAppendEntries( const typename PeerRPCRequest::AppendEntries &request); + ClientResult AddCommand(const typename State::Change &command, bool blocking); + template void LogInfo(const std::string &, Args &&...); @@ -221,7 +223,7 @@ class RaftMemberImpl { std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}()); }; -} // namespace internal +} // namespace internal template class RaftMember final { @@ -243,6 +245,8 @@ class RaftMember final { PeerRPCReply::AppendEntries OnAppendEntries( const typename PeerRPCRequest::AppendEntries &request); + ClientResult AddCommand(const typename State::Change &command, bool blocking); + private: impl::RaftMemberImpl impl_; diff --git a/src/communication/raft/raft_reactor.hpp b/src/communication/raft/raft_reactor.hpp index 6dbf9630d..4f3311d46 100644 --- a/src/communication/raft/raft_reactor.hpp +++ b/src/communication/raft/raft_reactor.hpp @@ -117,11 +117,11 @@ class RaftMemberLocalReactor { PeerRPCReply OnRPC(const PeerRPCRequest &request) { PeerRPCReply reply; - if (request.type == PeerRPCRequest::Type::REQUEST_VOTE) { - reply.type = PeerRPCReply::Type::REQUEST_VOTE; + if (request.type == RPCType::REQUEST_VOTE) { + reply.type = RPCType::REQUEST_VOTE; reply.request_vote = member_.OnRequestVote(request.request_vote); - } else if (request.type == PeerRPCRequest::Type::APPEND_ENTRIES) { - reply.type = PeerRPCReply::Type::APPEND_ENTRIES; + } else if (request.type == RPCType::APPEND_ENTRIES) { + reply.type = RPCType::APPEND_ENTRIES; reply.append_entries = member_.OnAppendEntries(request.append_entries); } else { LOG(FATAL) << "Unknown Raft RPC request type"; diff --git a/src/communication/raft/test_utils.hpp b/src/communication/raft/test_utils.hpp index 3ce575287..8f2ada0f5 100644 --- a/src/communication/raft/test_utils.hpp +++ b/src/communication/raft/test_utils.hpp @@ -13,6 +13,19 @@ struct DummyState { struct Result {}; }; +struct IntState { + int x; + + struct Change { + enum Type { ADD, SUB, SET }; + Type t; + int d; + + bool operator==(const Change &rhs) { return t == rhs.t && d == rhs.d; } + bool operator!=(const Change &rhs) { return !(*this == rhs); }; + }; +}; + template class NoOpNetworkInterface : public RaftNetworkInterface { public: diff --git a/tests/unit/raft.cpp b/tests/unit/raft.cpp index 6f15df7e9..52f4a86a6 100644 --- a/tests/unit/raft.cpp +++ b/tests/unit/raft.cpp @@ -13,6 +13,7 @@ using namespace communication::raft::test_utils; using testing::Values; +const RaftConfig test_config1{{"a"}, 150ms, 300ms, 70ms}; const RaftConfig test_config2{{"a", "b"}, 150ms, 300ms, 70ms}; const RaftConfig test_config3{{"a", "b", "c"}, 150ms, 300ms, 70ms}; const RaftConfig test_config5{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms}; @@ -20,9 +21,9 @@ const RaftConfig test_config5{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms}; using communication::raft::impl::RaftMemberImpl; using communication::raft::impl::RaftMode; -class RaftMemberTest : public ::testing::Test { +class RaftMemberImplTest : public ::testing::Test { public: - RaftMemberTest() + RaftMemberImplTest() : storage_(1, "a", {}), member(network_, storage_, "a", test_config5) {} void SetLog(std::vector> log) { @@ -34,14 +35,14 @@ class RaftMemberTest : public ::testing::Test { RaftMemberImpl member; }; -TEST_F(RaftMemberTest, Constructor) { +TEST_F(RaftMemberImplTest, Constructor) { EXPECT_EQ(member.mode_, RaftMode::FOLLOWER); EXPECT_EQ(member.term_, 1); EXPECT_EQ(*member.voted_for_, "a"); EXPECT_EQ(member.commit_index_, 0); } -TEST_F(RaftMemberTest, CandidateOrLeaderTransitionToFollower) { +TEST_F(RaftMemberImplTest, CandidateOrLeaderTransitionToFollower) { member.mode_ = RaftMode::CANDIDATE; member.CandidateTransitionToLeader(); @@ -51,7 +52,7 @@ TEST_F(RaftMemberTest, CandidateOrLeaderTransitionToFollower) { EXPECT_LT(member.next_election_time_, TimePoint::max()); } -TEST_F(RaftMemberTest, CandidateTransitionToLeader) { +TEST_F(RaftMemberImplTest, CandidateTransitionToLeader) { member.mode_ = RaftMode::CANDIDATE; member.CandidateTransitionToLeader(); @@ -60,7 +61,7 @@ TEST_F(RaftMemberTest, CandidateTransitionToLeader) { EXPECT_EQ(member.next_election_time_, TimePoint::max()); } -TEST_F(RaftMemberTest, StartNewElection) { +TEST_F(RaftMemberImplTest, StartNewElection) { member.StartNewElection(); EXPECT_EQ(member.mode_, RaftMode::CANDIDATE); @@ -68,7 +69,7 @@ TEST_F(RaftMemberTest, StartNewElection) { EXPECT_EQ(member.voted_for_, member.id_); } -TEST_F(RaftMemberTest, CountVotes) { +TEST_F(RaftMemberImplTest, CountVotes) { member.StartNewElection(); EXPECT_FALSE(member.CountVotes()); @@ -79,7 +80,7 @@ TEST_F(RaftMemberTest, CountVotes) { EXPECT_TRUE(member.CountVotes()); } -TEST_F(RaftMemberTest, AdvanceCommitIndex) { +TEST_F(RaftMemberImplTest, AdvanceCommitIndex) { SetLog({{1}, {1}, {1}, {1}, {2}, {2}, {2}, {2}}); member.mode_ = RaftMode::LEADER; @@ -129,10 +130,10 @@ TEST(RequestVote, SimpleElection) { std::unique_lock lock(member.mutex_); PeerRPCReply next_reply; - next_reply.type = PeerRPCReply::Type::REQUEST_VOTE; + next_reply.type = RPCType::REQUEST_VOTE; network.on_request_ = [](const PeerRPCRequest &request) { - ASSERT_EQ(request.type, PeerRPCRequest::Type::REQUEST_VOTE); + ASSERT_EQ(request.type, RPCType::REQUEST_VOTE); ASSERT_EQ(request.request_vote.candidate_term, 2); ASSERT_EQ(request.request_vote.candidate_id, "a"); }; @@ -177,7 +178,7 @@ TEST(AppendEntries, SimpleLogSync) { std::unique_lock lock(member.mutex_); PeerRPCReply reply; - reply.type = PeerRPCReply::Type::APPEND_ENTRIES; + reply.type = RPCType::APPEND_ENTRIES; reply.append_entries.term = 3; reply.append_entries.success = false; @@ -188,7 +189,7 @@ TEST(AppendEntries, SimpleLogSync) { std::vector> expected_entries; network.on_request_ = [&](const PeerRPCRequest &request) { - EXPECT_EQ(request.type, PeerRPCRequest::Type::APPEND_ENTRIES); + EXPECT_EQ(request.type, RPCType::APPEND_ENTRIES); EXPECT_EQ(request.append_entries.leader_term, 3); EXPECT_EQ(request.append_entries.leader_id, "a"); EXPECT_EQ(request.append_entries.prev_log_index, expected_prev_log_index); @@ -600,3 +601,43 @@ INSTANTIATE_TEST_CASE_P( 1, true, {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}})); + +TEST(RaftMemberTest, AddCommand) { + NextReplyNetworkInterface network; + + std::vector changes = {{IntState::Change::Type::ADD, 5}, + {IntState::Change::Type::ADD, 10}}; + + network.on_request_ = [&network, num_calls = 0 ]( + const PeerRPCRequest &request) mutable { + ++num_calls; + PeerRPCReply reply; + + if (num_calls == 1) { + reply.type = RPCType::REQUEST_VOTE; + reply.request_vote.term = 1; + reply.request_vote.vote_granted = true; + } else { + reply.type = RPCType::APPEND_ENTRIES; + reply.append_entries.term = 1; + reply.append_entries.success = true; + } + + network.next_reply_ = reply; + }; + + InMemoryStorageInterface storage(0, {}, {}); + RaftMember member(network, storage, "a", test_config2); + + std::this_thread::sleep_for(500ms); + + member.AddCommand(changes[0], false); + member.AddCommand(changes[1], true); + + ASSERT_EQ(storage.log_.size(), 3); + EXPECT_EQ(storage.log_[0].command, std::experimental::nullopt); + EXPECT_TRUE(storage.log_[1].command && + *storage.log_[1].command == changes[0]); + EXPECT_TRUE(storage.log_[2].command && + *storage.log_[2].command == changes[1]); +}