Implement log replication in Raft

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1782
This commit is contained in:
Ivan Paljak 2019-01-04 15:27:41 +01:00
parent 363cdb8b88
commit cc3192cef7
3 changed files with 302 additions and 84 deletions

View File

@ -15,6 +15,10 @@ cpp<#
(lcp:define-struct log-entry () (lcp:define-struct log-entry ()
((term :uint64_t) ((term :uint64_t)
(deltas "std::vector<database::StateDelta>" :capnp-type "List(Database.StateDelta)")) (deltas "std::vector<database::StateDelta>" :capnp-type "List(Database.StateDelta)"))
(:public #>cpp
LogEntry() = default;
LogEntry(uint64_t _term, std::vector<database::StateDelta> _deltas): term(_term), deltas(_deltas) {}
cpp<#)
(:serialize (:slk) (:capnp))) (:serialize (:slk) (:capnp)))
(lcp:pop-namespace) ;; raft (lcp:pop-namespace) ;; raft

View File

@ -31,19 +31,21 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
rlog_(std::make_unique<ReplicationLog>()), rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER), mode_(Mode::FOLLOWER),
server_id_(server_id), server_id_(server_id),
commit_index_(0),
last_applied_(0),
disk_storage_(fs::path(durability_dir) / kRaftDir), disk_storage_(fs::path(durability_dir) / kRaftDir),
reset_callback_(reset_callback), reset_callback_(reset_callback),
no_op_create_callback_(no_op_create_callback) {} no_op_create_callback_(no_op_create_callback) {}
void RaftServer::Start() { void RaftServer::Start() {
// Persistent storage initialization/recovery. // Persistent storage initialization
if (Log().empty()) { if (Log().empty()) {
UpdateTerm(0); UpdateTerm(0);
} else { LogEntry empty_log_entry(0, {});
Recover(); AppendLogEntries(0, 0, {empty_log_entry});
} }
// Peer state // Peer state initialization
int cluster_size = coordination_->WorkerCount() + 1; int cluster_size = coordination_->WorkerCount() + 1;
next_index_.resize(cluster_size); next_index_.resize(cluster_size);
match_index_.resize(cluster_size); match_index_.resize(cluster_size);
@ -61,7 +63,7 @@ void RaftServer::Start() {
// "If a server recieves a request with a stale term, // "If a server recieves a request with a stale term,
// it rejects the request" // it rejects the request"
uint64_t current_term = CurrentTerm(); uint64_t current_term = CurrentTerm();
if (req.term < current_term) { if (exiting_ || req.term < current_term) {
RequestVoteRes res(false, current_term); RequestVoteRes res(false, current_term);
Save(res, res_builder); Save(res, res_builder);
return; return;
@ -83,11 +85,12 @@ void RaftServer::Start() {
// up-to-date than that of the candidate" // up-to-date than that of the candidate"
std::experimental::optional<uint16_t> voted_for = VotedFor(); std::experimental::optional<uint16_t> voted_for = VotedFor();
auto last_entry_data = LastEntryData(); auto last_entry_data = LastEntryData();
RequestVoteRes res( bool grant_vote =
(!voted_for || voted_for.value() == req.candidate_id) && (!voted_for || voted_for.value() == req.candidate_id) &&
AtLeastUpToDate(req.last_log_index, req.last_log_term, AtLeastUpToDate(req.last_log_index, req.last_log_term,
last_entry_data.first, last_entry_data.second), last_entry_data.first, last_entry_data.second);
current_term); RequestVoteRes res(grant_vote, current_term);
if (grant_vote) SetNextElectionTimePoint();
Save(res, res_builder); Save(res, res_builder);
}); });
@ -115,8 +118,16 @@ void RaftServer::Start() {
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER); if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
} }
// [Raft paper 5.3]
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)
while (req.leader_commit > last_applied_ &&
last_applied_ + 1 < Log().size()) {
++last_applied_;
delta_applier_->Apply(Log()[last_applied_].deltas);
}
// respond positively to a heartbeat. // respond positively to a heartbeat.
// TODO(ipaljak) review this when implementing log replication.
if (req.entries.empty()) { if (req.entries.empty()) {
AppendEntriesRes res(true, current_term); AppendEntriesRes res(true, current_term);
Save(res, res_builder); Save(res, res_builder);
@ -129,11 +140,9 @@ void RaftServer::Start() {
return; return;
} }
throw utils::NotYetImplemented("AppendEntriesRpc which is not a heartbeat");
// [Raft paper 5.3] // [Raft paper 5.3]
// "If a follower's log is inconsistent with the leader's, the // "If a follower's log is inconsistent with the leader's, the
// consistency check will fail in the next AppendEntries RPC." // consistency check will fail in the AppendEntries RPC."
// //
// Consistency checking assures the Log Matching Property: // Consistency checking assures the Log Matching Property:
// - If two entries in different logs have the same index and // - If two entries in different logs have the same index and
@ -141,24 +150,20 @@ void RaftServer::Start() {
// - If two entries in different logs have the same index and term, // - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries. // then the logs are identical in all preceding entries.
auto log = Log(); auto log = Log();
if (log.size() < req.prev_log_index || if (log.size() <= req.prev_log_index ||
log[req.prev_log_index - 1].term != req.prev_log_term) { log[req.prev_log_index].term != req.prev_log_term) {
AppendEntriesRes res(false, current_term); AppendEntriesRes res(false, current_term);
Save(res, res_builder); Save(res, res_builder);
return; return;
} }
// If existing entry conflicts with new one, we need to delete the AppendLogEntries(req.leader_commit, req.prev_log_index + 1, req.entries);
// existing entry and all that follow it.
if (log.size() > req.prev_log_index &&
log[req.prev_log_index].term != req.entries[0].term)
DeleteLogSuffix(req.prev_log_index + 1);
AppendLogEntries(req.leader_commit, req.entries);
AppendEntriesRes res(true, current_term); AppendEntriesRes res(true, current_term);
Save(res, res_builder); Save(res, res_builder);
}); });
// start threads
SetNextElectionTimePoint(); SetNextElectionTimePoint();
election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this); election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this);
@ -166,6 +171,8 @@ void RaftServer::Start() {
if (peer_id == server_id_) continue; if (peer_id == server_id_) continue;
peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id); peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id);
} }
no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this);
} }
void RaftServer::Shutdown() { void RaftServer::Shutdown() {
@ -175,6 +182,7 @@ void RaftServer::Shutdown() {
state_changed_.notify_all(); state_changed_.notify_all();
election_change_.notify_all(); election_change_.notify_all();
leader_changed_.notify_all();
} }
for (auto &peer_thread : peer_threads_) { for (auto &peer_thread : peer_threads_) {
@ -182,6 +190,7 @@ void RaftServer::Shutdown() {
} }
if (election_thread_.joinable()) election_thread_.join(); if (election_thread_.joinable()) election_thread_.join();
if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join();
} }
uint64_t RaftServer::CurrentTerm() { uint64_t RaftServer::CurrentTerm() {
@ -208,9 +217,26 @@ std::vector<LogEntry> RaftServer::Log() {
} }
void RaftServer::AppendToLog(const tx::TransactionId &tx_id, void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &log) { const std::vector<database::StateDelta> &deltas) {
std::unique_lock<std::mutex> lock(lock_);
DCHECK(mode_ == Mode::LEADER)
<< "`AppendToLog` should only be called in LEADER mode";
if (deltas.size() == 2) {
DCHECK(deltas[0].type == database::StateDelta::Type::TRANSACTION_BEGIN &&
deltas[1].type == database::StateDelta::Type::TRANSACTION_COMMIT)
<< "Transactions with two state deltas must be reads (start with BEGIN "
"and end with COMMIT)";
rlog_->set_replicated(tx_id);
return;
}
auto log = Log();
DCHECK(last_applied_ == log.size() - 1) << "Everything from the leaders log "
"should be applied into our state "
"machine";
rlog_->set_active(tx_id); rlog_->set_active(tx_id);
throw utils::NotYetImplemented("RaftServer replication"); log.emplace_back(CurrentTerm(), deltas);
++last_applied_;
disk_storage_.Put(kLogKey, SerializeLog(log));
} }
void RaftServer::Emplace(const database::StateDelta &delta) { void RaftServer::Emplace(const database::StateDelta &delta) {
@ -218,8 +244,6 @@ void RaftServer::Emplace(const database::StateDelta &delta) {
} }
bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
std::unique_lock<std::mutex> lock(lock_);
switch (mode_) { switch (mode_) {
case Mode::FOLLOWER: case Mode::FOLLOWER:
// When in follower mode, we will only try to apply a Raft Log when we // When in follower mode, we will only try to apply a Raft Log when we
@ -242,25 +266,29 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
throw InvalidReplicationLogLookup(); throw InvalidReplicationLogLookup();
} }
void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
rlog_->garbage_collect_older(tx_id);
}
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) { : raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr"; CHECK(raft_server_) << "RaftServer can't be nullptr";
} }
void RaftServer::LogEntryBuffer::Enable() { void RaftServer::LogEntryBuffer::Enable() {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(buffer_lock_);
enabled_ = true; enabled_ = true;
} }
void RaftServer::LogEntryBuffer::Disable() { void RaftServer::LogEntryBuffer::Disable() {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(buffer_lock_);
enabled_ = false; enabled_ = false;
// Clear all existing logs from buffers. // Clear all existing logs from buffers.
logs_.clear(); logs_.clear();
} }
void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(buffer_lock_);
if (!enabled_) return; if (!enabled_) return;
tx::TransactionId tx_id = delta.transaction_id; tx::TransactionId tx_id = delta.transaction_id;
@ -271,6 +299,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::vector<database::StateDelta> log(std::move(it->second)); std::vector<database::StateDelta> log(std::move(it->second));
log.emplace_back(std::move(delta)); log.emplace_back(std::move(delta));
logs_.erase(it); logs_.erase(it);
raft_server_->AppendToLog(tx_id, log);
// Make sure that this wasn't a read query (contains transaction begin and // Make sure that this wasn't a read query (contains transaction begin and
// commit). // commit).
@ -279,8 +308,6 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
<< "Raft log of size two doesn't start with TRANSACTION_BEGIN"; << "Raft log of size two doesn't start with TRANSACTION_BEGIN";
return; return;
} }
raft_server_->AppendToLog(tx_id, log);
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id); auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@ -344,8 +371,6 @@ void RaftServer::Transition(const Mode &new_mode) {
case Mode::LEADER: { case Mode::LEADER: {
LOG(INFO) << "Server " << server_id_ LOG(INFO) << "Server " << server_id_
<< ": Transition to LEADER (Term: " << CurrentTerm() << ")"; << ": Transition to LEADER (Term: " << CurrentTerm() << ")";
log_entry_buffer_.Enable();
// Freeze election timer // Freeze election timer
next_election_ = TimePoint::max(); next_election_ = TimePoint::max();
election_change_.notify_all(); election_change_.notify_all();
@ -355,12 +380,25 @@ void RaftServer::Transition(const Mode &new_mode) {
for (auto &peer_heartbeat : next_heartbeat_) for (auto &peer_heartbeat : next_heartbeat_)
peer_heartbeat = now + config_.heartbeat_interval; peer_heartbeat = now + config_.heartbeat_interval;
mode_ = Mode::LEADER; // [Raft paper figure 2]
// "For each server, index of the next log entry to send to that server
// is initialized to leader's last log index + 1"
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
next_index_[i] = Log().size();
match_index_[i] = 0;
}
// no_op_create_callback_ will create a new transaction that has a NO_OP // Raft guarantees the Leader Append-Only property [Raft paper 5.2]
// StateDelta. This will trigger the whole procedure of replicating logs // so its safe to apply everything from our log into our state machine
// in our implementation of Raft. auto log = Log();
no_op_create_callback_(); for (int i = last_applied_ + 1; i < log.size(); ++i)
delta_applier_->Apply(log[i].deltas);
last_applied_ = log.size() - 1;
mode_ = Mode::LEADER;
log_entry_buffer_.Enable();
leader_changed_.notify_all();
break; break;
} }
} }
@ -371,8 +409,130 @@ void RaftServer::UpdateTerm(uint64_t new_term) {
disk_storage_.Delete(kVotedForKey); disk_storage_.Delete(kVotedForKey);
} }
void RaftServer::AdvanceCommitIndex() {
DCHECK(mode_ == Mode::LEADER)
<< "Commit index can only be advanced by the leader";
std::vector<uint64_t> known_replication_indices;
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
if (i != server_id_)
known_replication_indices.push_back(match_index_[i]);
else
known_replication_indices.push_back(Log().size() - 1);
}
std::sort(known_replication_indices.begin(), known_replication_indices.end());
uint64_t new_commit_index =
known_replication_indices[(coordination_->WorkerCount() - 1) / 2];
// This can happen because we reset `match_index` vector to 0 after a
// new leader has been elected.
if (commit_index_ >= new_commit_index) return;
// [Raft thesis, section 3.6.2]
// "(...) Raft never commits log entries from previous terms by counting
// replicas. Only log entries from the leader's current term are committed by
// counting replicas; once an entry from the current term has been committed
// in this way, then all prior entries are committed indirectly because of the
// Log Matching Property."
if (Log()[new_commit_index].term != CurrentTerm()) {
LOG(INFO) << "Server " << server_id_ << ": cannot commit log entry from "
"previous term based on "
"replication count.";
return;
}
LOG(INFO) << "Begin noting comimitted transactions";
// Note the newly committed transactions in ReplicationLog
std::set<tx::TransactionId> replicated_tx_ids;
auto log = Log();
for (int i = commit_index_ + 1; i <= new_commit_index; ++i) {
for (const auto &state_delta : log[i].deltas) {
replicated_tx_ids.insert(state_delta.transaction_id);
}
}
for (const auto &tx_id : replicated_tx_ids)
rlog_->set_replicated(tx_id);
commit_index_ = new_commit_index;
}
void RaftServer::Recover() { void RaftServer::Recover() {
throw utils::NotYetImplemented("RaftServer recover"); throw utils::NotYetImplemented("RaftServer Recover");
}
void RaftServer::SendEntries(uint16_t peer_id,
std::unique_lock<std::mutex> &lock) {
uint64_t request_term = CurrentTerm();
uint64_t request_prev_log_index = next_index_[peer_id] - 1;
uint64_t request_prev_log_term = Log()[next_index_[peer_id] - 1].term;
std::vector<raft::LogEntry> request_entries;
if (next_index_[peer_id] <= Log().size() - 1)
GetLogSuffix(next_index_[peer_id], request_entries);
bool unreachable_peer = false;
auto peer_future = coordination_->ExecuteOnWorker<AppendEntriesRes>(
peer_id, [&](int worker_id, auto &client) {
try {
auto res = client.template Call<AppendEntriesRpc>(
server_id_, commit_index_, request_term, request_prev_log_index,
request_prev_log_term, request_entries);
return res;
} catch (...) {
// not being able to connect to peer means we need to retry.
// TODO(ipaljak): Consider backoff.
unreachable_peer = true;
return AppendEntriesRes(false, request_term);
}
});
LOG(INFO) << "Entries size: " << request_entries.size();
lock.unlock(); // Release lock while waiting for response.
auto reply = peer_future.get();
lock.lock();
if (unreachable_peer) {
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
return;
}
if (CurrentTerm() != request_term || exiting_) {
return;
}
if (OutOfSync(reply.term)) {
state_changed_.notify_all();
return;
}
DCHECK(mode_ == Mode::LEADER)
<< "Elected leader for term should never change.";
if (reply.term != CurrentTerm()) {
LOG(INFO) << "Server " << server_id_
<< ": Ignoring stale AppendEntriesRPC reply from " << peer_id;
return;
}
if (!reply.success) {
DCHECK(next_index_[peer_id] > 1)
<< "Log replication should not fail for first log entry";
--next_index_[peer_id];
} else {
uint64_t new_match_index = request_prev_log_index + request_entries.size();
DCHECK(match_index_[peer_id] <= new_match_index)
<< "`match_index` should increase monotonically within a term";
match_index_[peer_id] = new_match_index;
if (request_entries.size() > 0) AdvanceCommitIndex();
next_index_[peer_id] = match_index_[peer_id] + 1;
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
}
state_changed_.notify_all();
} }
void RaftServer::ElectionThreadMain() { void RaftServer::ElectionThreadMain() {
@ -389,7 +549,7 @@ void RaftServer::ElectionThreadMain() {
} }
} }
void RaftServer::PeerThreadMain(int peer_id) { void RaftServer::PeerThreadMain(uint16_t peer_id) {
std::unique_lock<std::mutex> lock(lock_); std::unique_lock<std::mutex> lock(lock_);
/* This loop will either call a function that issues an RPC or wait on the /* This loop will either call a function that issues an RPC or wait on the
@ -436,6 +596,7 @@ void RaftServer::PeerThreadMain(int peer_id) {
return RequestVoteRes(false, request_term); return RequestVoteRes(false, request_term);
} }
}); });
lock.unlock(); // Release lock while waiting for response lock.unlock(); // Release lock while waiting for response
auto reply = peer_future.get(); auto reply = peer_future.get();
lock.lock(); lock.lock();
@ -470,21 +631,10 @@ void RaftServer::PeerThreadMain(int peer_id) {
case Mode::LEADER: { case Mode::LEADER: {
if (now >= next_heartbeat_[peer_id]) { if (now >= next_heartbeat_[peer_id]) {
LOG(INFO) << "Server " << server_id_ << ": Send HB to server " LOG(INFO) << "Server " << server_id_
<< peer_id << " (Term: " << CurrentTerm() << ")"; << ": Send AppendEntries RPC to server " << peer_id
auto peer_future = coordination_->ExecuteOnWorker<AppendEntriesRes>( << " (Term: " << CurrentTerm() << ")";
peer_id, [&](int worker_id, auto &client) { SendEntries(peer_id, lock);
auto last_entry_data = LastEntryData();
std::vector<raft::LogEntry> empty_entries;
auto res = client.template Call<AppendEntriesRpc>(
server_id_, commit_index_, CurrentTerm(),
last_entry_data.first, last_entry_data.second,
empty_entries);
return res;
});
next_heartbeat_[peer_id] =
Clock::now() + config_.heartbeat_interval;
state_changed_.notify_all();
continue; continue;
} }
wait_until = next_heartbeat_[peer_id]; wait_until = next_heartbeat_[peer_id];
@ -497,6 +647,19 @@ void RaftServer::PeerThreadMain(int peer_id) {
} }
} }
void RaftServer::NoOpIssuerThreadMain() {
std::mutex m;
auto lock = std::unique_lock<std::mutex>(m);
while (!exiting_) {
leader_changed_.wait(lock);
// no_op_create_callback_ will create a new transaction that has a NO_OP
// StateDelta. This will trigger the whole procedure of replicating logs
// in our implementation of Raft.
if (!exiting_)
no_op_create_callback_();
}
}
void RaftServer::SetNextElectionTimePoint() { void RaftServer::SetNextElectionTimePoint() {
// [Raft thesis, section 3.4] // [Raft thesis, section 3.4]
// "Raft uses randomized election timeouts to ensure that split votes are // "Raft uses randomized election timeouts to ensure that split votes are
@ -555,14 +718,32 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
void RaftServer::DeleteLogSuffix(int starting_index) { void RaftServer::DeleteLogSuffix(int starting_index) {
auto log = Log(); auto log = Log();
log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed log.erase(log.begin() + starting_index, log.end());
disk_storage_.Put(kLogKey, SerializeLog(log)); disk_storage_.Put(kLogKey, SerializeLog(log));
} }
void RaftServer::GetLogSuffix(int starting_index,
std::vector<raft::LogEntry> &entries) {
auto log = Log();
for (int i = starting_index; i < log.size(); ++i) entries.push_back(log[i]);
}
void RaftServer::AppendLogEntries(uint64_t leader_commit_index, void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
uint64_t starting_index,
const std::vector<LogEntry> &new_entries) { const std::vector<LogEntry> &new_entries) {
auto log = Log(); auto log = Log();
for (auto &entry : new_entries) log.emplace_back(entry); for (int i = 0; i < new_entries.size(); ++i) {
// If existing entry conflicts with new one, we need to delete the
// existing entry and all that follow it.
int current_index = i + starting_index;
if (log.size() > current_index &&
log[current_index].term != new_entries[i].term)
DeleteLogSuffix(current_index);
DCHECK(log.size() >= current_index);
if (log.size() == current_index)
log.emplace_back(new_entries[i]);
}
// See Raft paper 5.3 // See Raft paper 5.3
if (leader_commit_index > commit_index_) { if (leader_commit_index > commit_index_) {
commit_index_ = std::min(leader_commit_index, log.size() - 1); commit_index_ = std::min(leader_commit_index, log.size() - 1);
@ -571,25 +752,26 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
} }
std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) { std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) {
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
{
::capnp::MallocMessageBuilder message; ::capnp::MallocMessageBuilder message;
auto log_builder = ::capnp::List<capnp::LogEntry>::Builder log_builder =
message.initRoot<::capnp::List<capnp::LogEntry>>(log.size()); message.initRoot<::capnp::List<capnp::LogEntry>>(log.size());
utils::SaveVector<capnp::LogEntry, LogEntry>( utils::SaveVector<capnp::LogEntry, LogEntry>(
log, &log_builder, [](auto *log_builder, const auto &log_entry) { log, &log_builder, [](auto *log_builder, const auto &log_entry) {
Save(log_entry, log_builder); Save(log_entry, log_builder);
}); });
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
kj::std::StdOutputStream std_stream(stream); kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream); kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
writeMessage(buffered_stream, message); writeMessage(buffered_stream, message);
}
return stream.str(); return stream.str();
} }
std::vector<LogEntry> RaftServer::DeserializeLog( std::vector<LogEntry> RaftServer::DeserializeLog(
const std::string &serialized_log) { const std::string &serialized_log) {
if (serialized_log.empty()) return {};
::capnp::MallocMessageBuilder message; ::capnp::MallocMessageBuilder message;
std::stringstream stream(std::ios_base::in | std::ios_base::out | std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary); std::ios_base::binary);
@ -610,8 +792,9 @@ std::vector<LogEntry> RaftServer::DeserializeLog(
return deserialized_log; return deserialized_log;
} }
void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { void RaftServer::ResetReplicationLog() {
rlog_->garbage_collect_older(tx_id); rlog_ = nullptr;
rlog_ = std::make_unique<ReplicationLog>();
} }
} // namespace raft } // namespace raft

View File

@ -2,6 +2,7 @@
#pragma once #pragma once
#include <atomic>
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
@ -51,7 +52,7 @@ class RaftServer final : public RaftInterface {
/// @param durbility_dir directory for persisted data. /// @param durbility_dir directory for persisted data.
/// @param config raft configuration. /// @param config raft configuration.
/// @param coordination Abstraction for coordination between Raft servers. /// @param coordination Abstraction for coordination between Raft servers.
/// @param delta_applier TODO /// @param delta_applier Object which is able to apply state deltas to SM.
/// @param reset_callback Function that is called on each Leader->Follower /// @param reset_callback Function that is called on each Leader->Follower
/// transition. /// transition.
RaftServer(uint16_t server_id, const std::string &durability_dir, RaftServer(uint16_t server_id, const std::string &durability_dir,
@ -81,10 +82,10 @@ class RaftServer final : public RaftInterface {
/// persistent storage, an empty Log will be created. /// persistent storage, an empty Log will be created.
std::vector<LogEntry> Log(); std::vector<LogEntry> Log();
/// Append the log to the list of completed logs that are ready to be /// Append to the log a list of batched state deltasa that are ready to be
/// replicated. /// replicated.
void AppendToLog(const tx::TransactionId &tx_id, void AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &log); const std::vector<database::StateDelta> &deltas);
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta /// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster. /// marks the transaction end, it will replicate the log accorss the cluster.
@ -126,7 +127,7 @@ class RaftServer final : public RaftInterface {
private: private:
bool enabled_{false}; bool enabled_{false};
mutable std::mutex lock_; mutable std::mutex buffer_lock_;
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>> std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>>
logs_; logs_;
@ -144,7 +145,7 @@ class RaftServer final : public RaftInterface {
database::StateDeltaApplier *delta_applier_{nullptr}; database::StateDeltaApplier *delta_applier_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr}; std::unique_ptr<ReplicationLog> rlog_{nullptr};
Mode mode_; ///< Server's current mode. std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server. uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known committed entry. uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM. uint64_t last_applied_; ///< Index of the highest applied entry to SM.
@ -162,6 +163,13 @@ class RaftServer final : public RaftInterface {
std::condition_variable state_changed_; ///< Notifies all peer threads on std::condition_variable state_changed_; ///< Notifies all peer threads on
///< relevant state change. ///< relevant state change.
std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op
///< command on leader change.
std::condition_variable leader_changed_; ///< Notifies the no_op_issuer_thread
///< that a new leader has been
///< elected.
bool exiting_ = false; ///< True on server shutdown. bool exiting_ = false; ///< True on server shutdown.
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -187,10 +195,10 @@ class RaftServer final : public RaftInterface {
// volatile state on leaders // volatile state on leaders
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
std::vector<uint16_t> next_index_; ///< for each server, index of the next std::vector<uint64_t> next_index_; ///< for each server, index of the next
///< log entry to send to that server. ///< log entry to send to that server.
std::vector<uint16_t> match_index_; ///< for each server, index of the std::vector<uint64_t> match_index_; ///< for each server, index of the
///< highest log entry known to be ///< highest log entry known to be
///< replicated on server. ///< replicated on server.
@ -225,10 +233,21 @@ class RaftServer final : public RaftInterface {
/// Updates the current term. /// Updates the current term.
void UpdateTerm(uint64_t new_term); void UpdateTerm(uint64_t new_term);
/// Tries to advance the commit index on a leader.
void AdvanceCommitIndex();
/// Recovers from persistent storage. This function should be called from /// Recovers from persistent storage. This function should be called from
/// the constructor before the server starts with normal operation. /// the constructor before the server starts with normal operation.
void Recover(); void Recover();
/// Sends Entries to peer. This function should only be called in leader
/// mode.
///
/// @param peer_id ID of the peer which receives entries.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendEntries(uint16_t peer_id, std::unique_lock<std::mutex> &lock);
/// Main function of the `election_thread_`. It is responsible for /// Main function of the `election_thread_`. It is responsible for
/// transition to CANDIDATE mode when election timeout elapses. /// transition to CANDIDATE mode when election timeout elapses.
void ElectionThreadMain(); void ElectionThreadMain();
@ -237,7 +256,12 @@ class RaftServer final : public RaftInterface {
/// specified node within the Raft cluster. /// specified node within the Raft cluster.
/// ///
/// @param peer_id - ID of a receiving node in the cluster. /// @param peer_id - ID of a receiving node in the cluster.
void PeerThreadMain(int peer_id); void PeerThreadMain(uint16_t peer_id);
/// Issues no-op command when a new leader is elected. This is done to
/// force the Raft protocol to commit logs from previous terms that
/// have been replicated on a majority of peers.
void NoOpIssuerThreadMain();
/// Sets the `TimePoint` for next election. /// Sets the `TimePoint` for next election.
void SetNextElectionTimePoint(); void SetNextElectionTimePoint();
@ -278,14 +302,24 @@ class RaftServer final : public RaftInterface {
/// 1-indexed. /// 1-indexed.
void DeleteLogSuffix(int starting_index); void DeleteLogSuffix(int starting_index);
/// Stores log entries with indexes that are greater or equal to the given
/// starting index into a provided container. If the starting index is
/// greater than the log size, nothing will be stored in the provided
/// container.
///
/// @param starting_index Smallest index which will be stored.
/// @param entries The container which will store the wanted suffix.
void GetLogSuffix(int starting_index, std::vector<raft::LogEntry> &entries);
/// Appends new log entries to Raft log. Note that this function is not /// Appends new log entries to Raft log. Note that this function is not
/// smart in any way, i.e. the caller should make sure that it's safe /// smart in any way, i.e. the caller should make sure that it's safe
/// to call this function. This function also updates this server's commit /// to call this function. This function also updates this server's commit
/// index if necessary. /// index if necessary.
/// ///
/// @param leader_commit_index - Used to update local commit index. /// @param leader_commit_index - Used to update local commit index.
/// @param starting_index - Index in the log from which we start to append.
/// @param new_entries - New `LogEntry` instances to be appended in the log. /// @param new_entries - New `LogEntry` instances to be appended in the log.
void AppendLogEntries(uint64_t leader_commit_index, void AppendLogEntries(uint64_t leader_commit_index, uint64_t starting_index,
const std::vector<LogEntry> &new_entries); const std::vector<LogEntry> &new_entries);
/// Serializes Raft log into `std::string`. /// Serializes Raft log into `std::string`.
@ -294,9 +328,6 @@ class RaftServer final : public RaftInterface {
/// Deserializes Raft log from `std::string`. /// Deserializes Raft log from `std::string`.
std::vector<LogEntry> DeserializeLog(const std::string &serialized_log); std::vector<LogEntry> DeserializeLog(const std::string &serialized_log);
void ResetReplicationLog() { void ResetReplicationLog();
rlog_ = nullptr;
rlog_ = std::make_unique<ReplicationLog>();
}
}; };
} // namespace raft } // namespace raft