Keep SnapshotMetadata in memory
Summary: Instead of asking rocksdb for snapshot metadata, keep it in memory for faster access. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2075
This commit is contained in:
parent
f2cc41fa59
commit
e1fdb85a34
@ -54,11 +54,11 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
|
||||
|
||||
void RaftServer::Start() {
|
||||
if (db_recover_on_startup_) {
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
if (snapshot_metadata) {
|
||||
RecoverSnapshot(snapshot_metadata->snapshot_filename);
|
||||
last_applied_ = snapshot_metadata->last_included_index;
|
||||
commit_index_ = snapshot_metadata->last_included_index;
|
||||
snapshot_metadata_ = GetSnapshotMetadata();
|
||||
if (snapshot_metadata_) {
|
||||
RecoverSnapshot(snapshot_metadata_->snapshot_filename);
|
||||
last_applied_ = snapshot_metadata_->last_included_index;
|
||||
commit_index_ = snapshot_metadata_->last_included_index;
|
||||
}
|
||||
} else {
|
||||
// We need to clear persisted data if we don't want any recovery.
|
||||
@ -169,17 +169,15 @@ void RaftServer::Start() {
|
||||
// term, then they store the same command.
|
||||
// - If two entries in different logs have the same index and term,
|
||||
// then the logs are identical in all preceding entries.
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
|
||||
if (snapshot_metadata &&
|
||||
snapshot_metadata->last_included_index == req.prev_log_index) {
|
||||
if (req.prev_log_term != snapshot_metadata->last_included_term) {
|
||||
if (snapshot_metadata_ &&
|
||||
snapshot_metadata_->last_included_index == req.prev_log_index) {
|
||||
if (req.prev_log_term != snapshot_metadata_->last_included_term) {
|
||||
AppendEntriesRes res(false, current_term_);
|
||||
slk::Save(res, res_builder);
|
||||
return;
|
||||
}
|
||||
} else if (snapshot_metadata &&
|
||||
snapshot_metadata->last_included_index > req.prev_log_index) {
|
||||
} else if (snapshot_metadata_ &&
|
||||
snapshot_metadata_->last_included_index > req.prev_log_index) {
|
||||
LOG(ERROR) << "Received entries that are already commited and have been "
|
||||
"compacted";
|
||||
AppendEntriesRes res(false, current_term_);
|
||||
@ -413,6 +411,9 @@ void RaftServer::PersistSnapshotMetadata(
|
||||
slk::Save(snapshot_metadata, &builder);
|
||||
builder.Finalize();
|
||||
disk_storage_.Put(kSnapshotMetadataKey, stream.str());
|
||||
|
||||
// Keep the metadata in memory for faster access.
|
||||
snapshot_metadata_.emplace(snapshot_metadata);
|
||||
}
|
||||
|
||||
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
|
||||
@ -579,10 +580,9 @@ void RaftServer::Transition(const Mode &new_mode) {
|
||||
|
||||
// Re-apply raft log.
|
||||
uint64_t starting_index = 1;
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
if (snapshot_metadata) {
|
||||
RecoverSnapshot(snapshot_metadata->snapshot_filename);
|
||||
starting_index = snapshot_metadata->last_included_index + 1;
|
||||
if (snapshot_metadata_) {
|
||||
RecoverSnapshot(snapshot_metadata_->snapshot_filename);
|
||||
starting_index = snapshot_metadata_->last_included_index + 1;
|
||||
}
|
||||
|
||||
for (uint64_t i = starting_index; i <= commit_index_; ++i) {
|
||||
@ -725,13 +725,11 @@ void RaftServer::AdvanceCommitIndex() {
|
||||
|
||||
void RaftServer::SendEntries(uint16_t peer_id,
|
||||
std::unique_lock<std::mutex> *lock) {
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
|
||||
if (snapshot_metadata &&
|
||||
snapshot_metadata->last_included_index >= next_index_[peer_id]) {
|
||||
SendSnapshot(peer_id, *snapshot_metadata, lock);
|
||||
if (snapshot_metadata_ &&
|
||||
snapshot_metadata_->last_included_index >= next_index_[peer_id]) {
|
||||
SendSnapshot(peer_id, *snapshot_metadata_, lock);
|
||||
} else {
|
||||
SendLogEntries(peer_id, snapshot_metadata, lock);
|
||||
SendLogEntries(peer_id, snapshot_metadata_, lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1068,9 +1066,8 @@ void RaftServer::SnapshotThread() {
|
||||
if (exiting_) break;
|
||||
|
||||
uint64_t committed_log_size = last_applied_;
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
if (snapshot_metadata) {
|
||||
committed_log_size -= snapshot_metadata->last_included_index;
|
||||
if (snapshot_metadata_) {
|
||||
committed_log_size -= snapshot_metadata_->last_included_index;
|
||||
}
|
||||
|
||||
// Compare the log size to the config
|
||||
@ -1103,9 +1100,9 @@ void RaftServer::SnapshotThread() {
|
||||
|
||||
if (status) {
|
||||
uint64_t log_compaction_start_index = 1;
|
||||
if (snapshot_metadata) {
|
||||
if (snapshot_metadata_) {
|
||||
log_compaction_start_index =
|
||||
snapshot_metadata->last_included_index + 1;
|
||||
snapshot_metadata_->last_included_index + 1;
|
||||
}
|
||||
|
||||
VLOG(40) << "[LogCompaction] Persisting snapshot metadata";
|
||||
@ -1156,10 +1153,9 @@ bool RaftServer::HasMajorityVote() {
|
||||
}
|
||||
|
||||
std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
if (snapshot_metadata &&
|
||||
snapshot_metadata->last_included_index == log_size_ - 1) {
|
||||
return {log_size_, snapshot_metadata->last_included_term};
|
||||
if (snapshot_metadata_ &&
|
||||
snapshot_metadata_->last_included_index == log_size_ - 1) {
|
||||
return {log_size_, snapshot_metadata_->last_included_term};
|
||||
}
|
||||
return {log_size_, GetLogEntry(log_size_ - 1).term};
|
||||
}
|
||||
|
@ -289,6 +289,8 @@ class RaftServer final : public RaftInterface {
|
||||
|
||||
std::map<uint64_t, LogEntry> log_;
|
||||
|
||||
std::optional<SnapshotMetadata> snapshot_metadata_;
|
||||
|
||||
/// Recovers persistent data from disk and stores its in-memory copies
|
||||
/// that insure faster read-only operations. This method should be called
|
||||
/// on start-up. If parts of persistent data are missing, the method won't
|
||||
|
Loading…
Reference in New Issue
Block a user