diff --git a/src/raft/log_entry.lcp b/src/raft/log_entry.lcp index eb3936d1f..0aaabac02 100644 --- a/src/raft/log_entry.lcp +++ b/src/raft/log_entry.lcp @@ -15,6 +15,10 @@ cpp<# (lcp:define-struct log-entry () ((term :uint64_t) (deltas "std::vector" :capnp-type "List(Database.StateDelta)")) + (:public #>cpp + LogEntry() = default; + LogEntry(uint64_t _term, std::vector _deltas): term(_term), deltas(_deltas) {} + cpp<#) (:serialize (:slk) (:capnp))) (lcp:pop-namespace) ;; raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index f3b89d37e..7aac0a5d1 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -31,19 +31,21 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, rlog_(std::make_unique()), mode_(Mode::FOLLOWER), server_id_(server_id), + commit_index_(0), + last_applied_(0), disk_storage_(fs::path(durability_dir) / kRaftDir), reset_callback_(reset_callback), no_op_create_callback_(no_op_create_callback) {} void RaftServer::Start() { - // Persistent storage initialization/recovery. + // Persistent storage initialization if (Log().empty()) { UpdateTerm(0); - } else { - Recover(); + LogEntry empty_log_entry(0, {}); + AppendLogEntries(0, 0, {empty_log_entry}); } - // Peer state + // Peer state initialization int cluster_size = coordination_->WorkerCount() + 1; next_index_.resize(cluster_size); match_index_.resize(cluster_size); @@ -61,7 +63,7 @@ void RaftServer::Start() { // "If a server recieves a request with a stale term, // it rejects the request" uint64_t current_term = CurrentTerm(); - if (req.term < current_term) { + if (exiting_ || req.term < current_term) { RequestVoteRes res(false, current_term); Save(res, res_builder); return; @@ -83,11 +85,12 @@ void RaftServer::Start() { // up-to-date than that of the candidate" std::experimental::optional voted_for = VotedFor(); auto last_entry_data = LastEntryData(); - RequestVoteRes res( + bool grant_vote = (!voted_for || voted_for.value() == req.candidate_id) && - AtLeastUpToDate(req.last_log_index, req.last_log_term, - last_entry_data.first, last_entry_data.second), - current_term); + AtLeastUpToDate(req.last_log_index, req.last_log_term, + last_entry_data.first, last_entry_data.second); + RequestVoteRes res(grant_vote, current_term); + if (grant_vote) SetNextElectionTimePoint(); Save(res, res_builder); }); @@ -115,8 +118,16 @@ void RaftServer::Start() { if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER); } + // [Raft paper 5.3] + // "Once a follower learns that a log entry is committed, it applies + // the entry to its state machine (in log order) + while (req.leader_commit > last_applied_ && + last_applied_ + 1 < Log().size()) { + ++last_applied_; + delta_applier_->Apply(Log()[last_applied_].deltas); + } + // respond positively to a heartbeat. - // TODO(ipaljak) review this when implementing log replication. if (req.entries.empty()) { AppendEntriesRes res(true, current_term); Save(res, res_builder); @@ -129,11 +140,9 @@ void RaftServer::Start() { return; } - throw utils::NotYetImplemented("AppendEntriesRpc which is not a heartbeat"); - // [Raft paper 5.3] // "If a follower's log is inconsistent with the leader's, the - // consistency check will fail in the next AppendEntries RPC." + // consistency check will fail in the AppendEntries RPC." // // Consistency checking assures the Log Matching Property: // - If two entries in different logs have the same index and @@ -141,24 +150,20 @@ void RaftServer::Start() { // - If two entries in different logs have the same index and term, // then the logs are identical in all preceding entries. auto log = Log(); - if (log.size() < req.prev_log_index || - log[req.prev_log_index - 1].term != req.prev_log_term) { + if (log.size() <= req.prev_log_index || + log[req.prev_log_index].term != req.prev_log_term) { AppendEntriesRes res(false, current_term); Save(res, res_builder); return; } - // If existing entry conflicts with new one, we need to delete the - // existing entry and all that follow it. - if (log.size() > req.prev_log_index && - log[req.prev_log_index].term != req.entries[0].term) - DeleteLogSuffix(req.prev_log_index + 1); - - AppendLogEntries(req.leader_commit, req.entries); + AppendLogEntries(req.leader_commit, req.prev_log_index + 1, req.entries); AppendEntriesRes res(true, current_term); Save(res, res_builder); }); + // start threads + SetNextElectionTimePoint(); election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this); @@ -166,6 +171,8 @@ void RaftServer::Start() { if (peer_id == server_id_) continue; peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id); } + + no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this); } void RaftServer::Shutdown() { @@ -175,6 +182,7 @@ void RaftServer::Shutdown() { state_changed_.notify_all(); election_change_.notify_all(); + leader_changed_.notify_all(); } for (auto &peer_thread : peer_threads_) { @@ -182,6 +190,7 @@ void RaftServer::Shutdown() { } if (election_thread_.joinable()) election_thread_.join(); + if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join(); } uint64_t RaftServer::CurrentTerm() { @@ -208,9 +217,26 @@ std::vector RaftServer::Log() { } void RaftServer::AppendToLog(const tx::TransactionId &tx_id, - const std::vector &log) { + const std::vector &deltas) { + std::unique_lock lock(lock_); + DCHECK(mode_ == Mode::LEADER) + << "`AppendToLog` should only be called in LEADER mode"; + if (deltas.size() == 2) { + DCHECK(deltas[0].type == database::StateDelta::Type::TRANSACTION_BEGIN && + deltas[1].type == database::StateDelta::Type::TRANSACTION_COMMIT) + << "Transactions with two state deltas must be reads (start with BEGIN " + "and end with COMMIT)"; + rlog_->set_replicated(tx_id); + return; + } + auto log = Log(); + DCHECK(last_applied_ == log.size() - 1) << "Everything from the leaders log " + "should be applied into our state " + "machine"; rlog_->set_active(tx_id); - throw utils::NotYetImplemented("RaftServer replication"); + log.emplace_back(CurrentTerm(), deltas); + ++last_applied_; + disk_storage_.Put(kLogKey, SerializeLog(log)); } void RaftServer::Emplace(const database::StateDelta &delta) { @@ -218,8 +244,6 @@ void RaftServer::Emplace(const database::StateDelta &delta) { } bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { - std::unique_lock lock(lock_); - switch (mode_) { case Mode::FOLLOWER: // When in follower mode, we will only try to apply a Raft Log when we @@ -242,25 +266,29 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { throw InvalidReplicationLogLookup(); } +void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { + rlog_->garbage_collect_older(tx_id); +} + RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) : raft_server_(raft_server) { CHECK(raft_server_) << "RaftServer can't be nullptr"; } void RaftServer::LogEntryBuffer::Enable() { - std::lock_guard guard(lock_); + std::lock_guard guard(buffer_lock_); enabled_ = true; } void RaftServer::LogEntryBuffer::Disable() { - std::lock_guard guard(lock_); + std::lock_guard guard(buffer_lock_); enabled_ = false; // Clear all existing logs from buffers. logs_.clear(); } void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { - std::lock_guard guard(lock_); + std::lock_guard guard(buffer_lock_); if (!enabled_) return; tx::TransactionId tx_id = delta.transaction_id; @@ -271,6 +299,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { std::vector log(std::move(it->second)); log.emplace_back(std::move(delta)); logs_.erase(it); + raft_server_->AppendToLog(tx_id, log); // Make sure that this wasn't a read query (contains transaction begin and // commit). @@ -279,8 +308,6 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { << "Raft log of size two doesn't start with TRANSACTION_BEGIN"; return; } - - raft_server_->AppendToLog(tx_id, log); } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { auto it = logs_.find(tx_id); CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; @@ -344,8 +371,6 @@ void RaftServer::Transition(const Mode &new_mode) { case Mode::LEADER: { LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER (Term: " << CurrentTerm() << ")"; - log_entry_buffer_.Enable(); - // Freeze election timer next_election_ = TimePoint::max(); election_change_.notify_all(); @@ -355,12 +380,25 @@ void RaftServer::Transition(const Mode &new_mode) { for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now + config_.heartbeat_interval; - mode_ = Mode::LEADER; + // [Raft paper figure 2] + // "For each server, index of the next log entry to send to that server + // is initialized to leader's last log index + 1" + for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) { + next_index_[i] = Log().size(); + match_index_[i] = 0; + } - // no_op_create_callback_ will create a new transaction that has a NO_OP - // StateDelta. This will trigger the whole procedure of replicating logs - // in our implementation of Raft. - no_op_create_callback_(); + // Raft guarantees the Leader Append-Only property [Raft paper 5.2] + // so its safe to apply everything from our log into our state machine + auto log = Log(); + for (int i = last_applied_ + 1; i < log.size(); ++i) + delta_applier_->Apply(log[i].deltas); + last_applied_ = log.size() - 1; + + mode_ = Mode::LEADER; + log_entry_buffer_.Enable(); + + leader_changed_.notify_all(); break; } } @@ -371,8 +409,130 @@ void RaftServer::UpdateTerm(uint64_t new_term) { disk_storage_.Delete(kVotedForKey); } +void RaftServer::AdvanceCommitIndex() { + DCHECK(mode_ == Mode::LEADER) + << "Commit index can only be advanced by the leader"; + + std::vector known_replication_indices; + for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) { + if (i != server_id_) + known_replication_indices.push_back(match_index_[i]); + else + known_replication_indices.push_back(Log().size() - 1); + } + + std::sort(known_replication_indices.begin(), known_replication_indices.end()); + uint64_t new_commit_index = + known_replication_indices[(coordination_->WorkerCount() - 1) / 2]; + + // This can happen because we reset `match_index` vector to 0 after a + // new leader has been elected. + if (commit_index_ >= new_commit_index) return; + + // [Raft thesis, section 3.6.2] + // "(...) Raft never commits log entries from previous terms by counting + // replicas. Only log entries from the leader's current term are committed by + // counting replicas; once an entry from the current term has been committed + // in this way, then all prior entries are committed indirectly because of the + // Log Matching Property." + if (Log()[new_commit_index].term != CurrentTerm()) { + LOG(INFO) << "Server " << server_id_ << ": cannot commit log entry from " + "previous term based on " + "replication count."; + return; + } + + LOG(INFO) << "Begin noting comimitted transactions"; + + // Note the newly committed transactions in ReplicationLog + std::set replicated_tx_ids; + auto log = Log(); + for (int i = commit_index_ + 1; i <= new_commit_index; ++i) { + for (const auto &state_delta : log[i].deltas) { + replicated_tx_ids.insert(state_delta.transaction_id); + } + } + + for (const auto &tx_id : replicated_tx_ids) + rlog_->set_replicated(tx_id); + + commit_index_ = new_commit_index; +} + void RaftServer::Recover() { - throw utils::NotYetImplemented("RaftServer recover"); + throw utils::NotYetImplemented("RaftServer Recover"); +} + +void RaftServer::SendEntries(uint16_t peer_id, + std::unique_lock &lock) { + uint64_t request_term = CurrentTerm(); + uint64_t request_prev_log_index = next_index_[peer_id] - 1; + uint64_t request_prev_log_term = Log()[next_index_[peer_id] - 1].term; + + std::vector request_entries; + if (next_index_[peer_id] <= Log().size() - 1) + GetLogSuffix(next_index_[peer_id], request_entries); + + bool unreachable_peer = false; + auto peer_future = coordination_->ExecuteOnWorker( + peer_id, [&](int worker_id, auto &client) { + try { + auto res = client.template Call( + server_id_, commit_index_, request_term, request_prev_log_index, + request_prev_log_term, request_entries); + return res; + } catch (...) { + // not being able to connect to peer means we need to retry. + // TODO(ipaljak): Consider backoff. + unreachable_peer = true; + return AppendEntriesRes(false, request_term); + } + }); + + LOG(INFO) << "Entries size: " << request_entries.size(); + + lock.unlock(); // Release lock while waiting for response. + auto reply = peer_future.get(); + lock.lock(); + + if (unreachable_peer) { + next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + return; + } + + if (CurrentTerm() != request_term || exiting_) { + return; + } + + if (OutOfSync(reply.term)) { + state_changed_.notify_all(); + return; + } + + DCHECK(mode_ == Mode::LEADER) + << "Elected leader for term should never change."; + + if (reply.term != CurrentTerm()) { + LOG(INFO) << "Server " << server_id_ + << ": Ignoring stale AppendEntriesRPC reply from " << peer_id; + return; + } + + if (!reply.success) { + DCHECK(next_index_[peer_id] > 1) + << "Log replication should not fail for first log entry"; + --next_index_[peer_id]; + } else { + uint64_t new_match_index = request_prev_log_index + request_entries.size(); + DCHECK(match_index_[peer_id] <= new_match_index) + << "`match_index` should increase monotonically within a term"; + match_index_[peer_id] = new_match_index; + if (request_entries.size() > 0) AdvanceCommitIndex(); + next_index_[peer_id] = match_index_[peer_id] + 1; + next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + } + + state_changed_.notify_all(); } void RaftServer::ElectionThreadMain() { @@ -389,7 +549,7 @@ void RaftServer::ElectionThreadMain() { } } -void RaftServer::PeerThreadMain(int peer_id) { +void RaftServer::PeerThreadMain(uint16_t peer_id) { std::unique_lock lock(lock_); /* This loop will either call a function that issues an RPC or wait on the @@ -436,6 +596,7 @@ void RaftServer::PeerThreadMain(int peer_id) { return RequestVoteRes(false, request_term); } }); + lock.unlock(); // Release lock while waiting for response auto reply = peer_future.get(); lock.lock(); @@ -470,21 +631,10 @@ void RaftServer::PeerThreadMain(int peer_id) { case Mode::LEADER: { if (now >= next_heartbeat_[peer_id]) { - LOG(INFO) << "Server " << server_id_ << ": Send HB to server " - << peer_id << " (Term: " << CurrentTerm() << ")"; - auto peer_future = coordination_->ExecuteOnWorker( - peer_id, [&](int worker_id, auto &client) { - auto last_entry_data = LastEntryData(); - std::vector empty_entries; - auto res = client.template Call( - server_id_, commit_index_, CurrentTerm(), - last_entry_data.first, last_entry_data.second, - empty_entries); - return res; - }); - next_heartbeat_[peer_id] = - Clock::now() + config_.heartbeat_interval; - state_changed_.notify_all(); + LOG(INFO) << "Server " << server_id_ + << ": Send AppendEntries RPC to server " << peer_id + << " (Term: " << CurrentTerm() << ")"; + SendEntries(peer_id, lock); continue; } wait_until = next_heartbeat_[peer_id]; @@ -497,6 +647,19 @@ void RaftServer::PeerThreadMain(int peer_id) { } } +void RaftServer::NoOpIssuerThreadMain() { + std::mutex m; + auto lock = std::unique_lock(m); + while (!exiting_) { + leader_changed_.wait(lock); + // no_op_create_callback_ will create a new transaction that has a NO_OP + // StateDelta. This will trigger the whole procedure of replicating logs + // in our implementation of Raft. + if (!exiting_) + no_op_create_callback_(); + } +} + void RaftServer::SetNextElectionTimePoint() { // [Raft thesis, section 3.4] // "Raft uses randomized election timeouts to ensure that split votes are @@ -555,14 +718,32 @@ bool RaftServer::OutOfSync(uint64_t reply_term) { void RaftServer::DeleteLogSuffix(int starting_index) { auto log = Log(); - log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed + log.erase(log.begin() + starting_index, log.end()); disk_storage_.Put(kLogKey, SerializeLog(log)); } +void RaftServer::GetLogSuffix(int starting_index, + std::vector &entries) { + auto log = Log(); + for (int i = starting_index; i < log.size(); ++i) entries.push_back(log[i]); +} + void RaftServer::AppendLogEntries(uint64_t leader_commit_index, + uint64_t starting_index, const std::vector &new_entries) { auto log = Log(); - for (auto &entry : new_entries) log.emplace_back(entry); + for (int i = 0; i < new_entries.size(); ++i) { + // If existing entry conflicts with new one, we need to delete the + // existing entry and all that follow it. + int current_index = i + starting_index; + if (log.size() > current_index && + log[current_index].term != new_entries[i].term) + DeleteLogSuffix(current_index); + DCHECK(log.size() >= current_index); + if (log.size() == current_index) + log.emplace_back(new_entries[i]); + } + // See Raft paper 5.3 if (leader_commit_index > commit_index_) { commit_index_ = std::min(leader_commit_index, log.size() - 1); @@ -571,25 +752,26 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index, } std::string RaftServer::SerializeLog(const std::vector &log) { - ::capnp::MallocMessageBuilder message; - auto log_builder = - message.initRoot<::capnp::List>(log.size()); - utils::SaveVector( - log, &log_builder, [](auto *log_builder, const auto &log_entry) { - Save(log_entry, log_builder); - }); - std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary); - kj::std::StdOutputStream std_stream(stream); - kj::BufferedOutputStreamWrapper buffered_stream(std_stream); - writeMessage(buffered_stream, message); + { + ::capnp::MallocMessageBuilder message; + ::capnp::List::Builder log_builder = + message.initRoot<::capnp::List>(log.size()); + utils::SaveVector( + log, &log_builder, [](auto *log_builder, const auto &log_entry) { + Save(log_entry, log_builder); + }); + + kj::std::StdOutputStream std_stream(stream); + kj::BufferedOutputStreamWrapper buffered_stream(std_stream); + writeMessage(buffered_stream, message); + } return stream.str(); } std::vector RaftServer::DeserializeLog( const std::string &serialized_log) { - if (serialized_log.empty()) return {}; ::capnp::MallocMessageBuilder message; std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary); @@ -610,8 +792,9 @@ std::vector RaftServer::DeserializeLog( return deserialized_log; } -void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { - rlog_->garbage_collect_older(tx_id); +void RaftServer::ResetReplicationLog() { + rlog_ = nullptr; + rlog_ = std::make_unique(); } } // namespace raft diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index b42617c3b..7f129c185 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -2,6 +2,7 @@ #pragma once +#include #include #include #include @@ -51,7 +52,7 @@ class RaftServer final : public RaftInterface { /// @param durbility_dir directory for persisted data. /// @param config raft configuration. /// @param coordination Abstraction for coordination between Raft servers. - /// @param delta_applier TODO + /// @param delta_applier Object which is able to apply state deltas to SM. /// @param reset_callback Function that is called on each Leader->Follower /// transition. RaftServer(uint16_t server_id, const std::string &durability_dir, @@ -81,10 +82,10 @@ class RaftServer final : public RaftInterface { /// persistent storage, an empty Log will be created. std::vector Log(); - /// Append the log to the list of completed logs that are ready to be + /// Append to the log a list of batched state deltasa that are ready to be /// replicated. void AppendToLog(const tx::TransactionId &tx_id, - const std::vector &log); + const std::vector &deltas); /// Emplace a single StateDelta to the corresponding batch. If the StateDelta /// marks the transaction end, it will replicate the log accorss the cluster. @@ -126,7 +127,7 @@ class RaftServer final : public RaftInterface { private: bool enabled_{false}; - mutable std::mutex lock_; + mutable std::mutex buffer_lock_; std::unordered_map> logs_; @@ -144,7 +145,7 @@ class RaftServer final : public RaftInterface { database::StateDeltaApplier *delta_applier_{nullptr}; std::unique_ptr rlog_{nullptr}; - Mode mode_; ///< Server's current mode. + std::atomic mode_; ///< Server's current mode. uint16_t server_id_; ///< ID of the current server. uint64_t commit_index_; ///< Index of the highest known committed entry. uint64_t last_applied_; ///< Index of the highest applied entry to SM. @@ -162,6 +163,13 @@ class RaftServer final : public RaftInterface { std::condition_variable state_changed_; ///< Notifies all peer threads on ///< relevant state change. + std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op + ///< command on leader change. + + std::condition_variable leader_changed_; ///< Notifies the no_op_issuer_thread + ///< that a new leader has been + ///< elected. + bool exiting_ = false; ///< True on server shutdown. ////////////////////////////////////////////////////////////////////////////// @@ -187,10 +195,10 @@ class RaftServer final : public RaftInterface { // volatile state on leaders ////////////////////////////////////////////////////////////////////////////// - std::vector next_index_; ///< for each server, index of the next + std::vector next_index_; ///< for each server, index of the next ///< log entry to send to that server. - std::vector match_index_; ///< for each server, index of the + std::vector match_index_; ///< for each server, index of the ///< highest log entry known to be ///< replicated on server. @@ -225,10 +233,21 @@ class RaftServer final : public RaftInterface { /// Updates the current term. void UpdateTerm(uint64_t new_term); + /// Tries to advance the commit index on a leader. + void AdvanceCommitIndex(); + /// Recovers from persistent storage. This function should be called from /// the constructor before the server starts with normal operation. void Recover(); + /// Sends Entries to peer. This function should only be called in leader + /// mode. + /// + /// @param peer_id ID of the peer which receives entries. + /// @param lock Lock from the peer thread (released while waiting for + /// response) + void SendEntries(uint16_t peer_id, std::unique_lock &lock); + /// Main function of the `election_thread_`. It is responsible for /// transition to CANDIDATE mode when election timeout elapses. void ElectionThreadMain(); @@ -237,7 +256,12 @@ class RaftServer final : public RaftInterface { /// specified node within the Raft cluster. /// /// @param peer_id - ID of a receiving node in the cluster. - void PeerThreadMain(int peer_id); + void PeerThreadMain(uint16_t peer_id); + + /// Issues no-op command when a new leader is elected. This is done to + /// force the Raft protocol to commit logs from previous terms that + /// have been replicated on a majority of peers. + void NoOpIssuerThreadMain(); /// Sets the `TimePoint` for next election. void SetNextElectionTimePoint(); @@ -278,14 +302,24 @@ class RaftServer final : public RaftInterface { /// 1-indexed. void DeleteLogSuffix(int starting_index); + /// Stores log entries with indexes that are greater or equal to the given + /// starting index into a provided container. If the starting index is + /// greater than the log size, nothing will be stored in the provided + /// container. + /// + /// @param starting_index Smallest index which will be stored. + /// @param entries The container which will store the wanted suffix. + void GetLogSuffix(int starting_index, std::vector &entries); + /// Appends new log entries to Raft log. Note that this function is not /// smart in any way, i.e. the caller should make sure that it's safe /// to call this function. This function also updates this server's commit /// index if necessary. /// /// @param leader_commit_index - Used to update local commit index. + /// @param starting_index - Index in the log from which we start to append. /// @param new_entries - New `LogEntry` instances to be appended in the log. - void AppendLogEntries(uint64_t leader_commit_index, + void AppendLogEntries(uint64_t leader_commit_index, uint64_t starting_index, const std::vector &new_entries); /// Serializes Raft log into `std::string`. @@ -294,9 +328,6 @@ class RaftServer final : public RaftInterface { /// Deserializes Raft log from `std::string`. std::vector DeserializeLog(const std::string &serialized_log); - void ResetReplicationLog() { - rlog_ = nullptr; - rlog_ = std::make_unique(); - } + void ResetReplicationLog(); }; } // namespace raft