Optimize Raft log persistent storage

Summary:
Each `raft::LogEntry` is now persisted under its own key in our `KVStore`. Running our HA feature benchmark locally yields the following results:

```
duration 23.7
executed_writes: 15000
write_per_second: 632.888
```

This represents roughly a 5x increase in throughput.
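For context, the previous format serialized the whole `std::vector<LogEntry>` under a single `log` key, so every append rewrote the entire log. With one key per entry plus a `log_size` counter, an append touches only two keys. A minimal sketch of the scheme, with `std::map` standing in for the real `KVStore` (`FakeKVStore` and `Append` are illustrative names, not part of the codebase):

```
#include <cstdint>
#include <map>
#include <string>

// Stand-in for storage::KVStore: string keys mapped to serialized values.
using FakeKVStore = std::map<std::string, std::string>;

// Key under which the entry at `index` is stored, e.g. "log_entry_42".
std::string LogEntryKey(uint64_t index) {
  return "log_entry_" + std::to_string(index);
}

// An append now touches exactly two keys: the new entry and the size
// counter, instead of rewriting the whole serialized log vector.
void Append(FakeKVStore &disk, const std::string &serialized_entry) {
  uint64_t log_size = 0;
  auto it = disk.find("log_size");
  if (it != disk.end()) log_size = std::stoull(it->second);
  disk[LogEntryKey(log_size)] = serialized_entry;
  disk["log_size"] = std::to_string(log_size + 1);
}
```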

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1799
Ivan Paljak 2019-01-15 14:11:18 +01:00
parent ca00575f82
commit f09c1254f4
3 changed files with 104 additions and 85 deletions
src/raft
tests/feature_benchmark/ha


@@ -17,7 +17,8 @@ namespace fs = std::experimental::filesystem;
const std::string kCurrentTermKey = "current_term";
const std::string kVotedForKey = "voted_for";
const std::string kLogKey = "log";
const std::string kLogSizeKey = "log_size";
const std::string kLogEntryPrefix = "log_entry_";
const std::string kRaftDir = "raft";
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
@@ -39,7 +40,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
void RaftServer::Start() {
// Persistent storage initialization
if (Log().empty()) {
if (LogSize() == 0) {
UpdateTerm(0);
LogEntry empty_log_entry(0, {});
AppendLogEntries(0, 0, {empty_log_entry});
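The empty `LogEntry(0, {})` written here on first startup acts as a sentinel at index 0, which is what keeps the log effectively 1-indexed, consistent with the Raft paper (as the `Log()` doc comment removed further down notes).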
@@ -128,9 +129,9 @@ void RaftServer::Start() {
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)."
while (req.leader_commit > last_applied_ &&
last_applied_ + 1 < Log().size()) {
last_applied_ + 1 < LogSize()) {
++last_applied_;
delta_applier_->Apply(Log()[last_applied_].deltas);
delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
}
// respond positively to a heartbeat.
@@ -155,9 +156,8 @@ void RaftServer::Start() {
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
auto log = Log();
if (log.size() <= req.prev_log_index ||
log[req.prev_log_index].term != req.prev_log_term) {
if (LogSize() <= req.prev_log_index ||
GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
@@ -213,13 +213,13 @@ std::experimental::optional<uint16_t> RaftServer::VotedFor() {
return {std::stoul(opt_value.value())};
}
std::vector<LogEntry> RaftServer::Log() {
auto opt_value = disk_storage_.Get(kLogKey);
uint64_t RaftServer::LogSize() {
auto opt_value = disk_storage_.Get(kLogSizeKey);
if (opt_value == std::experimental::nullopt) {
disk_storage_.Put(kLogKey, SerializeLog({}));
return {};
disk_storage_.Put(kLogSizeKey, "0");
return 0;
}
return DeserializeLog(opt_value.value());
return std::stoull(opt_value.value());
}
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
@@ -235,14 +235,17 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
rlog_->set_replicated(tx_id);
return;
}
auto log = Log();
DCHECK(last_applied_ == log.size() - 1) << "Everything from the leader's log "
"should be applied into our state "
"machine";
uint64_t log_size = LogSize();
DCHECK(last_applied_ == log_size - 1) << "Everything from the leader's log "
"should be applied into our state "
"machine";
rlog_->set_active(tx_id);
log.emplace_back(CurrentTerm(), deltas);
LogEntry new_entry(CurrentTerm(), deltas);
++last_applied_;
disk_storage_.Put(kLogKey, SerializeLog(log));
disk_storage_.Put(LogEntryKey(log_size), SerializeLogEntry(new_entry));
disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
}
void RaftServer::Emplace(const database::StateDelta &delta) {
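Worth noting in the new append path (an observation about the code above, not a claim the commit itself makes): the entry is written under `LogEntryKey(log_size)` before `log_size` is bumped, so a crash between the two `Put` calls leaves the counter at its old value and the orphaned entry is simply overwritten by the next append, assuming each `Put` is individually durable.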
@@ -383,6 +386,7 @@ void RaftServer::Transition(const Mode &new_mode) {
// Freeze election timer
next_election_ = TimePoint::max();
election_change_.notify_all();
uint64_t log_size = LogSize();
// Set next heartbeat to correct values
TimePoint now = Clock::now();
@@ -393,16 +397,15 @@ void RaftServer::Transition(const Mode &new_mode) {
// "For each server, index of the next log entry to send to that server
// is initialized to leader's last log index + 1"
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
next_index_[i] = Log().size();
next_index_[i] = log_size;
match_index_[i] = 0;
}
// Raft guarantees the Leader Append-Only property [Raft paper 5.2]
// so it's safe to apply everything from our log into our state machine
auto log = Log();
for (int i = last_applied_ + 1; i < log.size(); ++i)
delta_applier_->Apply(log[i].deltas);
last_applied_ = log.size() - 1;
for (int i = last_applied_ + 1; i < log_size; ++i)
delta_applier_->Apply(GetLogEntry(i).deltas);
last_applied_ = log_size - 1;
mode_ = Mode::LEADER;
log_entry_buffer_.Enable();
@@ -423,11 +426,12 @@ void RaftServer::AdvanceCommitIndex() {
<< "Commit index can only be advanced by the leader";
std::vector<uint64_t> known_replication_indices;
uint64_t log_size = LogSize();
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
if (i != server_id_)
known_replication_indices.push_back(match_index_[i]);
else
known_replication_indices.push_back(Log().size() - 1);
known_replication_indices.push_back(log_size - 1);
}
std::sort(known_replication_indices.begin(), known_replication_indices.end());
@@ -444,7 +448,7 @@ void RaftServer::AdvanceCommitIndex() {
// counting replicas; once an entry from the current term has been committed
// in this way, then all prior entries are committed indirectly because of the
// Log Matching Property."
if (Log()[new_commit_index].term != CurrentTerm()) {
if (GetLogEntry(new_commit_index).term != CurrentTerm()) {
VLOG(40) << "Server " << server_id_
<< ": cannot commit log entry from "
"previous term based on "
@@ -454,17 +458,13 @@ void RaftServer::AdvanceCommitIndex() {
VLOG(40) << "Begin noting comimitted transactions";
// Note the newly committed transactions in ReplicationLog
std::set<tx::TransactionId> replicated_tx_ids;
auto log = Log();
for (int i = commit_index_ + 1; i <= new_commit_index; ++i) {
for (const auto &state_delta : log[i].deltas) {
replicated_tx_ids.insert(state_delta.transaction_id);
}
auto deltas = GetLogEntry(i).deltas;
DCHECK(deltas.size() > 2)
<< "Log entry should consist of at least two state deltas.";
rlog_->set_replicated(deltas[0].transaction_id);
}
for (const auto &tx_id : replicated_tx_ids) rlog_->set_replicated(tx_id);
commit_index_ = new_commit_index;
}
@@ -476,10 +476,10 @@ void RaftServer::SendEntries(uint16_t peer_id,
std::unique_lock<std::mutex> &lock) {
uint64_t request_term = CurrentTerm();
uint64_t request_prev_log_index = next_index_[peer_id] - 1;
uint64_t request_prev_log_term = Log()[next_index_[peer_id] - 1].term;
uint64_t request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
std::vector<raft::LogEntry> request_entries;
if (next_index_[peer_id] <= Log().size() - 1)
std::vector<LogEntry> request_entries;
if (next_index_[peer_id] <= LogSize() - 1)
GetLogSuffix(next_index_[peer_id], request_entries);
bool unreachable_peer = false;
@@ -690,9 +690,9 @@ bool RaftServer::HasMajortyVote() {
}
std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
auto log = Log();
if (log.empty()) return {0, 0};
return {log.size(), log.back().term};
uint64_t log_size = LogSize();
if (log_size == 0) return {0, 0};
return {log_size, GetLogEntry(log_size - 1).term};
}
bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
@@ -723,52 +723,70 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
return false;
}
LogEntry RaftServer::GetLogEntry(int index) {
auto opt_value = disk_storage_.Get(LogEntryKey(index));
DCHECK(opt_value != std::experimental::nullopt) << "Log index out of bounds.";
return DeserializeLogEntry(opt_value.value());
}
void RaftServer::DeleteLogSuffix(int starting_index) {
auto log = Log();
log.erase(log.begin() + starting_index, log.end());
disk_storage_.Put(kLogKey, SerializeLog(log));
uint64_t log_size = LogSize();
DCHECK(0 <= starting_index && starting_index < log_size)
<< "Log index out of bounds.";
for (int i = starting_index; i < log_size; ++i)
disk_storage_.Delete(LogEntryKey(i));
disk_storage_.Put(kLogSizeKey, std::to_string(starting_index));
}
void RaftServer::GetLogSuffix(int starting_index,
std::vector<raft::LogEntry> &entries) {
auto log = Log();
for (int i = starting_index; i < log.size(); ++i) entries.push_back(log[i]);
uint64_t log_size = LogSize();
DCHECK(0 <= starting_index && starting_index < log_size)
<< "Log index out of bounds.";
for (int i = starting_index; i < log_size; ++i)
entries.push_back(GetLogEntry(i));
}
void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
uint64_t starting_index,
const std::vector<LogEntry> &new_entries) {
auto log = Log();
uint64_t log_size = LogSize();
for (int i = 0; i < new_entries.size(); ++i) {
// If existing entry conflicts with new one, we need to delete the
// existing entry and all that follow it.
int current_index = i + starting_index;
if (log.size() > current_index &&
log[current_index].term != new_entries[i].term)
if (log_size > current_index &&
GetLogEntry(current_index).term != new_entries[i].term) {
DeleteLogSuffix(current_index);
DCHECK(log.size() >= current_index);
if (log.size() == current_index) log.emplace_back(new_entries[i]);
log_size = LogSize();
}
DCHECK(log_size >= current_index) << "Current Log index out of bounds.";
if (log_size == current_index) {
disk_storage_.Put(LogEntryKey(log_size),
SerializeLogEntry(new_entries[i]));
disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
log_size += 1;
}
}
// See Raft paper 5.3
if (leader_commit_index > commit_index_) {
commit_index_ = std::min(leader_commit_index, log.size() - 1);
commit_index_ = std::min(leader_commit_index, log_size - 1);
}
disk_storage_.Put(kLogKey, SerializeLog(log));
}
std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) {
std::string RaftServer::LogEntryKey(uint64_t index) {
return kLogEntryPrefix + std::to_string(index);
}
std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) {
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
{
::capnp::MallocMessageBuilder message;
::capnp::List<capnp::LogEntry>::Builder log_builder =
message.initRoot<::capnp::List<capnp::LogEntry>>(log.size());
utils::SaveVector<capnp::LogEntry, LogEntry>(
log, &log_builder, [](auto *log_builder, const auto &log_entry) {
Save(log_entry, log_builder);
});
capnp::LogEntry::Builder log_builder =
message.initRoot<capnp::LogEntry>();
Save(log_entry, &log_builder);
kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
writeMessage(buffered_stream, message);
@@ -776,25 +794,19 @@ std::string RaftServer::SerializeLog(const std::vector<LogEntry> &log) {
return stream.str();
}
std::vector<LogEntry> RaftServer::DeserializeLog(
const std::string &serialized_log) {
LogEntry RaftServer::DeserializeLogEntry(
const std::string &serialized_log_entry) {
::capnp::MallocMessageBuilder message;
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
kj::std::StdInputStream std_stream(stream);
kj::BufferedInputStreamWrapper buffered_stream(std_stream);
stream << serialized_log;
stream << serialized_log_entry;
readMessageCopy(buffered_stream, message);
::capnp::List<capnp::LogEntry>::Reader log_reader =
message.getRoot<::capnp::List<capnp::LogEntry>>().asReader();
std::vector<LogEntry> deserialized_log;
utils::LoadVector<capnp::LogEntry, LogEntry>(&deserialized_log, log_reader,
[](const auto &log_reader) {
LogEntry log_entry;
Load(&log_entry, log_reader);
return log_entry;
});
capnp::LogEntry::Reader log_reader =
message.getRoot<capnp::LogEntry>().asReader();
LogEntry deserialized_log;
Load(&deserialized_log, log_reader);
return deserialized_log;
}
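As a quick usage sketch of the new helper pair (a hypothetical snippet, not code from this commit; it only relies on the `LogEntry(term, deltas)` constructor and `CurrentTerm()` already used above):

```
// Hypothetical round-trip through the new per-entry format.
LogEntry original(CurrentTerm(), deltas);
std::string blob = SerializeLogEntry(original);
LogEntry restored = DeserializeLogEntry(blob);
DCHECK(restored.term == original.term) << "Round-trip should preserve the term.";
```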


@@ -77,10 +77,8 @@ class RaftServer final : public RaftInterface {
/// if such server doesn't exist.
std::experimental::optional<uint16_t> VotedFor();
/// Retrieves the log entries from persistent storage. The log is 1-indexed
/// in order to be consistent with the paper. If the Log isn't present in
/// persistent storage, an empty Log will be created.
std::vector<LogEntry> Log();
/// Retrieves log size from persistent storage.
uint64_t LogSize();
/// Append to the log a list of batched state deltas that are ready to be
/// replicated.
@@ -216,7 +214,9 @@ class RaftServer final : public RaftInterface {
// - uint64_t current_term -- latest term server has seen.
// - uint16_t voted_for -- candidate_id that received vote in current
// term (null if none).
// - vector<LogEntry> log -- log entries.
// - uint64_t log_size -- Number of stored entries within the log.
// - vector<LogEntry> log -- log entries. Each log entry is stored under
// a separate key within KVStore.
//////////////////////////////////////////////////////////////////////////////
storage::KVStore disk_storage_;
@@ -297,6 +297,11 @@ class RaftServer final : public RaftInterface {
/// @return true if the current server's term lags behind.
bool OutOfSync(uint64_t reply_term);
/// Retrieves a log entry from the log at a given index.
///
/// @param index Index of the log entry to be retrieved.
LogEntry GetLogEntry(int index);
/// Deletes log entries with indexes that are greater than or equal to the given
/// starting index.
///
@@ -325,12 +330,17 @@
void AppendLogEntries(uint64_t leader_commit_index, uint64_t starting_index,
const std::vector<LogEntry> &new_entries);
/// Serializes Raft log into `std::string`.
std::string SerializeLog(const std::vector<LogEntry> &log);
/// Generates the key under which the `LogEntry` with a given index should
/// be stored on our disk storage.
///
/// @param index - Index of the `LogEntry` for which we generate the key.
std::string LogEntryKey(uint64_t index);
/// Deserializes Raft log from `std::string`.
std::vector<LogEntry> DeserializeLog(const std::string &serialized_log);
/// Serializes a Raft log entry into `std::string`.
std::string SerializeLogEntry(const LogEntry &log_entry);
/// Deserializes a Raft log entry from `std::string`.
LogEntry DeserializeLogEntry(const std::string &serialized_log_entry);
void ResetReplicationLog();
};
} // namespace raft


@@ -20,10 +20,7 @@ fi
RESULTS="$DIR/.apollo_measurements"
# Benchmark parameters
# TODO(msantl): We're benchmarking with only 300 nodes because there is a O(n^2)
# complexity in the current Raft implementation. Once we remove this bottleneck,
# we can raise this number to test proper performance.
NODES=300
NODES=15000
## Startup
declare -a HA_PIDS
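A note on this last change: raising `NODES` from 300 to 15000 (which lines up with `executed_writes: 15000` in the summary) also drops the old TODO, since with one key per entry an append no longer rewrites the entire log and the O(n^2) behaviour the comment warned about is gone.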