Remove log compaction from current HA implementation

Reviewers: mferencevic, buda

Reviewed By: mferencevic, buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2632
Ivan Paljak 2020-01-22 16:21:39 +01:00
parent c11d391e62
commit 1a50165c22
4 changed files with 10 additions and 402 deletions

View File

@@ -6,7 +6,6 @@
#include "communication/rpc/messages.hpp"
#include "raft/log_entry.hpp"
#include "raft/snapshot_metadata.hpp"
cpp<#
(lcp:namespace raft)
@@ -41,13 +40,4 @@ cpp<#
((success :bool)
(term :uint64_t))))
(lcp:define-rpc install-snapshot
(:request
((leader-id :uint16_t)
(term :uint64_t)
(snapshot-metadata "::raft::SnapshotMetadata")
(data "std::string")))
(:response
((term :uint64_t))))
(lcp:pop-namespace) ;; raft
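
For reference, the removed `install-snapshot` definition expands (via LCP) into an RPC request/response pair. A minimal sketch of the shapes involved, inferred from how the handler below uses `InstallSnapshotReq`/`InstallSnapshotRes`; the actual LCP-generated code is not part of this diff, so treat the details as an assumption:

struct InstallSnapshotReq {
  uint16_t leader_id;                        // ID of the sending leader
  uint64_t term;                             // leader's current term
  raft::SnapshotMetadata snapshot_metadata;  // last included term/index + filename
  std::string data;                          // entire snapshot file, in one piece
};

struct InstallSnapshotRes {
  uint64_t term;  // receiver's term, so a stale leader can step down
};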

View File

@@ -13,8 +13,6 @@
#include "communication/rpc/client.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "raft/exceptions.hpp"
#include "slk/streams.hpp"
#include "utils/cast.hpp"
@@ -31,9 +29,7 @@ const std::string kCurrentTermKey = "current_term";
const std::string kVotedForKey = "voted_for";
const std::string kLogSizeKey = "log_size";
const std::string kLogEntryPrefix = "log_entry_";
const std::string kSnapshotMetadataKey = "snapshot_metadata";
const std::string kRaftDir = "raft";
const std::chrono::duration<int64_t> kSnapshotPeriod = 1s;
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
bool db_recover_on_startup, const Config &config,
@@ -53,18 +49,9 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
disk_storage_(fs::path(durability_dir) / kRaftDir) {}
void RaftServer::Start() {
if (db_recover_on_startup_) {
snapshot_metadata_ = GetSnapshotMetadata();
if (snapshot_metadata_) {
RecoverSnapshot(snapshot_metadata_->snapshot_filename);
last_applied_ = snapshot_metadata_->last_included_index;
commit_index_ = snapshot_metadata_->last_included_index;
last_entry_term_ = snapshot_metadata_->last_included_term;
}
} else {
if (!db_recover_on_startup_) {
// We need to clear persisted data if we don't want any recovery.
disk_storage_.DeletePrefix("");
durability::RemoveAllSnapshots(durability_dir_);
}
// Persistent storage initialization
@@ -181,28 +168,12 @@ void RaftServer::Start() {
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
if (snapshot_metadata_ &&
snapshot_metadata_->last_included_index == req.prev_log_index) {
if (req.prev_log_term != snapshot_metadata_->last_included_term) {
AppendEntriesRes res(false, current_term_);
slk::Save(res, res_builder);
return;
}
} else if (snapshot_metadata_ &&
snapshot_metadata_->last_included_index > req.prev_log_index) {
LOG(ERROR) << "Received entries that are already committed and have been "
"compacted";
AppendEntriesRes res(false, current_term_);
slk::Save(res, res_builder);
return;
} else {
if (log_size_ <= req.prev_log_index ||
GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
AppendEntriesRes res(false, current_term_);
slk::Save(res, res_builder);
return;
}
}
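// With compaction removed, the snapshot-boundary branches above disappear and
// the consistency check reduces to the plain Raft log-matching test. A minimal
// sketch of the surviving logic as a hypothetical helper (not part of this
// commit), using the members already shown:
bool PrevLogEntryMatches(uint64_t prev_log_index, uint64_t prev_log_term) {
  // No entry at prev_log_index yet: the leader must back up next_index_.
  if (log_size_ <= prev_log_index) return false;
  // Terms at that index must agree on leader and follower.
  return GetLogEntry(prev_log_index).term == prev_log_term;
}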
// No need to call this function for a heartbeat
if (!req.entries.empty()) {
@@ -253,89 +224,6 @@ void RaftServer::Start() {
slk::Save(res, res_builder);
});
coordination_->Register<InstallSnapshotRpc>(
[this](auto *req_reader, auto *res_builder) {
// Acquire snapshot lock.
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
std::lock_guard<std::mutex> guard(lock_);
InstallSnapshotReq req;
slk::Load(&req, req_reader);
if (exiting_ || req.term < current_term_) {
InstallSnapshotRes res(current_term_);
slk::Save(res, res_builder);
return;
}
// Check if the current state matches the one in snapshot
if (req.snapshot_metadata.last_included_index == last_applied_ &&
req.snapshot_metadata.last_included_term == current_term_) {
InstallSnapshotRes res(current_term_);
slk::Save(res, res_builder);
return;
}
VLOG(40) << "[InstallSnapshotRpc] Starting.";
if (req.term > current_term_) {
VLOG(40) << "[InstallSnapshotRpc] Updating term.";
SetCurrentTerm(req.term);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
utils::OnScopeExit extend_election_timeout([this] {
SetNextElectionTimePoint();
election_change_.notify_all();
});
// Temporarily freeze the election timer while we handle the snapshot.
next_election_ = TimePoint::max();
VLOG(40) << "[InstallSnapshotRpc] Remove all snapshots.";
// Remove all previous snapshots
durability::RemoveAllSnapshots(durability_dir_);
const auto snapshot_path = durability::MakeSnapshotPath(
durability_dir_, req.snapshot_metadata.snapshot_filename);
// Save snapshot file
{
VLOG(40) << "[InstallSnapshotRpc] Saving received snapshot.";
std::ofstream output_stream;
output_stream.open(snapshot_path, std::ios::out | std::ios::binary);
output_stream.write(req.data.data(), req.data.size());
output_stream.flush();
output_stream.close();
}
// Discard all logs. We keep the one at index 0.
VLOG(40) << "[InstallSnapshotRpc] Discarding logs.";
log_.clear();
for (uint64_t i = 1; i < log_size_; ++i)
disk_storage_.Delete(LogEntryKey(i));
// Reset the database.
VLOG(40) << "[InstallSnapshotRpc] Reset database.";
db_->Reset();
// Apply the snapshot.
VLOG(40) << "[InstallSnapshotRpc] Recover from received snapshot.";
RecoverSnapshot(req.snapshot_metadata.snapshot_filename);
VLOG(40) << "[InstallSnapshotRpc] Persist snapshot metadata.";
PersistSnapshotMetadata(req.snapshot_metadata);
// Update the state to match the one from snapshot.
VLOG(40) << "[InstallSnapshotRpc] Update Raft state.";
last_applied_ = req.snapshot_metadata.last_included_index;
commit_index_ = req.snapshot_metadata.last_included_index;
SetLogSize(req.snapshot_metadata.last_included_index + 1);
InstallSnapshotRes res(current_term_);
slk::Save(res, res_builder);
});
// start threads
SetNextElectionTimePoint();
@@ -347,8 +235,6 @@ void RaftServer::Start() {
}
no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this);
snapshot_thread_ = std::thread(&RaftServer::SnapshotThread, this);
}
void RaftServer::Shutdown() {
@@ -372,7 +258,6 @@ void RaftServer::Shutdown() {
if (election_thread_.joinable()) election_thread_.join();
if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join();
if (snapshot_thread_.joinable()) snapshot_thread_.join();
}
void RaftServer::SetCurrentTerm(uint64_t new_current_term) {
@@ -394,43 +279,6 @@ void RaftServer::SetLogSize(uint64_t new_log_size) {
disk_storage_.Put(kLogSizeKey, std::to_string(new_log_size));
}
std::optional<SnapshotMetadata> RaftServer::GetSnapshotMetadata() {
auto opt_value = disk_storage_.Get(kSnapshotMetadataKey);
if (opt_value == std::nullopt) {
return std::nullopt;
}
auto &value = *opt_value;
slk::Reader reader(reinterpret_cast<const uint8_t *>(value.data()),
value.size());
SnapshotMetadata deserialized;
try {
slk::Load(&deserialized, &reader);
reader.Finalize();
} catch (const slk::SlkReaderException &) {
return std::nullopt;
}
return std::make_optional(deserialized);
}
void RaftServer::PersistSnapshotMetadata(
const SnapshotMetadata &snapshot_metadata) {
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
slk::Builder builder(
[&stream](const uint8_t *data, size_t size, bool have_more) {
for (size_t i = 0; i < size; ++i) {
stream << utils::MemcpyCast<char>(data[i]);
}
});
slk::Save(snapshot_metadata, &builder);
builder.Finalize();
disk_storage_.Put(kSnapshotMetadataKey, stream.str());
// Keep the metadata in memory for faster access.
snapshot_metadata_.emplace(snapshot_metadata);
}
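// Usage sketch (not part of this commit) of the two removed helpers above,
// showing the round trip through disk_storage_; the filename is a placeholder:
void SnapshotMetadataRoundTripExample() {
  SnapshotMetadata meta(/* last_included_term */ 3,
                        /* last_included_index */ 42,
                        "example_snapshot_file");
  PersistSnapshotMetadata(meta);           // SLK-serialize and write to disk
  auto recovered = GetSnapshotMetadata();  // read back and deserialize
  CHECK(recovered && recovered->last_included_index == 42);
}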
std::optional<LogEntryStatus> RaftServer::Emplace(
const std::vector<database::StateDelta> &deltas) {
std::unique_lock<std::mutex> lock(lock_);
@@ -544,11 +392,6 @@ void RaftServer::Transition(const Mode &new_mode) {
// Re-apply raft log.
uint64_t starting_index = 1;
if (snapshot_metadata_) {
RecoverSnapshot(snapshot_metadata_->snapshot_filename);
starting_index = snapshot_metadata_->last_included_index + 1;
}
for (uint64_t i = starting_index; i <= commit_index_; ++i) {
ApplyStateDeltas(GetLogEntry(i).deltas);
}
@@ -681,27 +524,16 @@ void RaftServer::AdvanceCommitIndex() {
void RaftServer::SendEntries(uint16_t peer_id,
std::unique_lock<std::mutex> *lock) {
if (snapshot_metadata_ &&
snapshot_metadata_->last_included_index >= next_index_[peer_id]) {
SendSnapshot(peer_id, *snapshot_metadata_, lock);
} else {
SendLogEntries(peer_id, snapshot_metadata_, lock);
}
SendLogEntries(peer_id, lock);
}
void RaftServer::SendLogEntries(
uint16_t peer_id, const std::optional<SnapshotMetadata> &snapshot_metadata,
void RaftServer::SendLogEntries(uint16_t peer_id,
std::unique_lock<std::mutex> *lock) {
uint64_t request_term = current_term_;
uint64_t request_prev_log_index = next_index_[peer_id] - 1;
uint64_t request_prev_log_term;
if (snapshot_metadata &&
snapshot_metadata->last_included_index == next_index_[peer_id] - 1) {
request_prev_log_term = snapshot_metadata->last_included_term;
} else {
request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
}
std::vector<LogEntry> request_entries;
if (next_index_[peer_id] <= log_size_ - 1)
@@ -775,76 +607,6 @@ void RaftServer::SendLogEntries(
state_changed_.notify_all();
}
void RaftServer::SendSnapshot(uint16_t peer_id,
const SnapshotMetadata &snapshot_metadata,
std::unique_lock<std::mutex> *lock) {
uint64_t request_term = current_term_;
std::string snapshot_data;
// TODO: The snapshot is currently sent all at once. Because the snapshot file
// can be extremely large (>100GB; it contains the whole database), it must be
// sent out in chunks! Reimplement this logic so that it sends out the
// snapshot in chunks.
{
const auto snapshot_path = durability::MakeSnapshotPath(
durability_dir_, snapshot_metadata.snapshot_filename);
std::ifstream input_stream;
input_stream.open(snapshot_path, std::ios::in | std::ios::binary);
input_stream.seekg(0, std::ios::end);
uint64_t snapshot_size = input_stream.tellg();
snapshot_data = std::string(snapshot_size, '\0');
input_stream.seekg(0, std::ios::beg);
input_stream.read(snapshot_data.data(), snapshot_size);
input_stream.close();
}
VLOG(40) << "Server " << server_id_
<< ": Sending Snapshot RPC to server " << peer_id
<< " (Term: " << current_term_ << ")";
VLOG(40) << "Snapshot size: " << snapshot_data.size() << " bytes.";
// Copy all internal variables before releasing the lock.
auto server_id = server_id_;
// Execute the RPC.
lock->unlock();
auto reply = coordination_->ExecuteOnOtherNode<InstallSnapshotRpc>(
peer_id, server_id, request_term, snapshot_metadata,
std::move(snapshot_data));
lock->lock();
if (!reply) {
next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
return;
}
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
return;
}
if (OutOfSync(reply->term)) {
state_changed_.notify_all();
return;
}
if (reply->term != current_term_) {
VLOG(40) << "Server " << server_id_
<< ": Ignoring stale InstallSnapshotRpc reply from " << peer_id;
return;
}
match_index_[peer_id] = snapshot_metadata.last_included_index;
next_index_[peer_id] = snapshot_metadata.last_included_index + 1;
index_offset_[peer_id] = 1;
next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
state_changed_.notify_all();
}
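// The TODO above calls for chunked transfer. A rough sketch of how the send
// loop could look, assuming a hypothetical InstallSnapshotChunkRpc that adds
// (offset, done) fields in the spirit of the Raft paper's InstallSnapshot RPC;
// none of the names below exist in this codebase:
void RaftServer::SendSnapshotChunked(uint16_t peer_id, uint64_t request_term,
                                     const SnapshotMetadata &snapshot_metadata,
                                     const std::string &snapshot_data) {
  constexpr uint64_t kChunkSize = 4U * 1024 * 1024;  // 4 MiB per RPC
  for (uint64_t offset = 0; offset < snapshot_data.size();
       offset += kChunkSize) {
    uint64_t len = snapshot_data.size() - offset;
    if (len > kChunkSize) len = kChunkSize;
    bool done = offset + len == snapshot_data.size();
    auto reply = coordination_->ExecuteOnOtherNode<InstallSnapshotChunkRpc>(
        peer_id, server_id_, request_term, snapshot_metadata,
        snapshot_data.substr(offset, len), offset, done);
    // On failure or a newer term, bail out; retry logic would live elsewhere.
    if (!reply || OutOfSync(reply->term)) break;
  }
}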
void RaftServer::ElectionThreadMain() {
utils::ThreadSetName("ElectionThread");
std::unique_lock<std::mutex> lock(lock_);
@@ -1010,85 +772,6 @@ void RaftServer::NoOpIssuerThreadMain() {
}
}
void RaftServer::SnapshotThread() {
utils::ThreadSetName(fmt::format("RaftSnapshot"));
if (config_.log_size_snapshot_threshold == -1) return;
while (true) {
{
// Acquire snapshot lock before we acquire the Raft lock. This should
// avoid the situation where we release the lock to start writing a
// snapshot but the `InstallSnapshotRpc` deletes it and we write wrong
// metadata.
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
std::unique_lock<std::mutex> lock(lock_);
if (exiting_) break;
uint64_t committed_log_size = last_applied_;
if (snapshot_metadata_) {
committed_log_size -= snapshot_metadata_->last_included_index;
}
// Compare the log size to the config
if (config_.log_size_snapshot_threshold < committed_log_size) {
VLOG(40) << "[LogCompaction] Starting log compaction.";
uint64_t last_included_term = 0;
uint64_t last_included_index = 0;
std::string snapshot_filename;
bool status = false;
{
// Create a DB accessor for snapshot creation
auto dba = db_->Access();
last_included_term = GetLogEntry(last_applied_).term;
last_included_index = last_applied_;
snapshot_filename = durability::GetSnapshotFilename(
last_included_term, last_included_index);
lock.unlock();
VLOG(40) << "[LogCompaction] Creating snapshot.";
status = durability::MakeSnapshot(*db_, dba, durability_dir_,
snapshot_filename);
// Raft lock must be released when destroying dba object.
// Destroy the db accessor
}
lock.lock();
if (status) {
uint64_t log_compaction_start_index = 1;
if (snapshot_metadata_) {
log_compaction_start_index =
snapshot_metadata_->last_included_index + 1;
}
VLOG(40) << "[LogCompaction] Persisting snapshot metadata";
PersistSnapshotMetadata(
{last_included_term, last_included_index, snapshot_filename});
// Log compaction.
VLOG(40) << "[LogCompaction] Compacting log from "
<< log_compaction_start_index << " to "
<< last_included_index;
for (uint64_t i = log_compaction_start_index; i <= last_included_index;
++i) {
log_.erase(i);
disk_storage_.Delete(LogEntryKey(i));
}
// After we deleted entries from the persistent store, make sure we
// compact the files and actually reduce used disk space.
disk_storage_.CompactRange(LogEntryKey(log_compaction_start_index),
LogEntryKey(last_included_index));
}
}
}
std::this_thread::sleep_for(kSnapshotPeriod);
}
}
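// Worked example of the trigger above (numbers are illustrative): with
// log_size_snapshot_threshold = 1000 and a previous snapshot at
// last_included_index = 4000, committed_log_size = last_applied_ - 4000, so a
// new snapshot is started only once last_applied_ exceeds 5000.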
void RaftServer::SetNextElectionTimePoint() {
// [Raft thesis, section 3.4]
// "Raft uses randomized election timeouts to ensure that split votes are
@@ -1112,10 +795,6 @@ bool RaftServer::HasMajorityVote() {
}
std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
if (snapshot_metadata_ &&
snapshot_metadata_->last_included_index == log_size_ - 1) {
return {log_size_, snapshot_metadata_->last_included_term};
}
return {log_size_, last_entry_term_};
}
@@ -1147,7 +826,7 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
return false;
}
LogEntry RaftServer::GetLogEntry(int index) {
LogEntry RaftServer::GetLogEntry(uint64_t index) {
auto it = log_.find(index);
if (it != log_.end())
return it->second; // retrieve in-mem if possible
@@ -1235,15 +914,6 @@ LogEntry RaftServer::DeserializeLogEntry(
return deserialized;
}
void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
durability::RecoveryData recovery_data;
bool recovery = durability::RecoverSnapshot(
db_, &recovery_data, durability_dir_, snapshot_filename);
CHECK(recovery);
durability::RecoverIndexes(db_, recovery_data.indexes);
}
void RaftServer::NoOpCreate() {
// TODO(ipaljak): Review this after implementing RaftDelta object.
auto dba = db_->Access();

View File

@@ -16,7 +16,6 @@
#include "raft/raft_rpc_messages.hpp"
#include "raft/replication_log.hpp"
#include "raft/replication_timeout_map.hpp"
#include "raft/snapshot_metadata.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
@@ -84,14 +83,6 @@ class RaftServer final : public RaftInterface {
/// as its in-memory copy.
void SetLogSize(uint64_t new_log_size);
/// Retrieves persisted snapshot metadata or nullopt if not present.
/// Snapshot metadata is a triplet consisting of the last included term, the
/// last included log entry index, and the snapshot filename.
std::optional<SnapshotMetadata> GetSnapshotMetadata();
/// Persists snapshot metadata.
void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata);
/// Emplace a new LogEntry in the raft log and start its replication. This
/// entry is created from a given batched set of StateDelta objects.
///
@@ -138,7 +129,6 @@ class RaftServer final : public RaftInterface {
private:
mutable std::mutex lock_; ///< Guards all internal state.
mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal.
mutable std::mutex heartbeat_lock_; ///< Guards HB issuing
//////////////////////////////////////////////////////////////////////////////
@@ -174,10 +164,6 @@ class RaftServer final : public RaftInterface {
std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op
///< command on leader change.
std::thread snapshot_thread_; ///< Thread responsible for snapshot creation
///< when log size reaches
///< `log_size_snapshot_threshold`.
std::condition_variable leader_changed_; ///< Notifies the
///< no_op_issuer_thread that a new
///< leader has been elected.
@@ -256,8 +242,6 @@ class RaftServer final : public RaftInterface {
std::map<uint64_t, LogEntry> log_;
std::optional<SnapshotMetadata> snapshot_metadata_;
/// Recovers persistent data from disk and stores its in-memory copies
/// that ensure faster read-only operations. This method should be called
/// on start-up. If parts of persistent data are missing, the method won't
@@ -285,22 +269,18 @@ class RaftServer final : public RaftInterface {
/// mode.
///
/// @param peer_id ID of the peer which receives entries.
/// @param snapshot_metadata metadata of the last snapshot, if any.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendLogEntries(uint16_t peer_id,
const std::optional<SnapshotMetadata> &snapshot_metadata,
std::unique_lock<std::mutex> *lock);
/// Send Snapshot to peer. This function should only be called in leader
/// mode.
///
/// @param peer_id ID of the peer which receives entries.
/// @param snapshot_metadata metadata of the snapshot to send.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendSnapshot(uint16_t peer_id, const SnapshotMetadata &snapshot_metadata,
std::unique_lock<std::mutex> *lock);
void SendSnapshot(uint16_t peer_id, std::unique_lock<std::mutex> *lock);
/// Main function of the `election_thread_`. It is responsible for
/// transition to CANDIDATE mode when election timeout elapses.
@@ -325,11 +305,6 @@ class RaftServer final : public RaftInterface {
/// have been replicated on a majority of peers.
void NoOpIssuerThreadMain();
/// Periodically checks if the Log size reached `log_size_snapshot_threshold`
/// parameter. If it has, then it performs log compaction and creates
/// snapshots.
void SnapshotThread();
/// Sets the `TimePoint` for next election.
void SetNextElectionTimePoint();
@@ -364,7 +339,7 @@ class RaftServer final : public RaftInterface {
/// Retrieves a log entry from the log at a given index.
///
/// @param index Index of the log entry to be retrieved.
LogEntry GetLogEntry(int index);
LogEntry GetLogEntry(uint64_t index);
/// Deletes log entries with indexes that are greater or equal to the given
/// starting index.
@@ -406,9 +381,6 @@ class RaftServer final : public RaftInterface {
/// Deserialized Raft log entry from `std::string`
LogEntry DeserializeLogEntry(const std::string &serialized_log_entry);
/// Recovers the given snapshot if it exists in the durability directory.
void RecoverSnapshot(const std::string &snapshot_filename);
/// Start a new transaction with a NO-OP StateDelta.
void NoOpCreate();

View File

@@ -1,24 +0,0 @@
#>cpp
#pragma once
#include <string>
#include "utils/typeinfo.hpp"
cpp<#
(lcp:namespace raft)
(lcp:define-struct snapshot-metadata ()
((last-included-term :uint64_t)
(last-included-index :uint64_t)
(snapshot-filename "std::string"))
(:public #>cpp
SnapshotMetadata() = default;
SnapshotMetadata(uint64_t _last_included_term, uint64_t _last_included_index,
const std::string &_snapshot_filename)
: last_included_term(_last_included_term),
last_included_index(_last_included_index),
snapshot_filename(_snapshot_filename) {}
cpp<#)
(:serialize (:slk)))
(lcp:pop-namespace) ;; raft
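
For reference, the deleted definition above corresponds, once LCP expands it, to roughly the following C++ struct. This is a sketch under the assumption that LCP maps lisp-case fields to snake_case members and additionally emits SLK save/load functions; the generated code itself never appears in this diff:

namespace raft {

struct SnapshotMetadata {
  SnapshotMetadata() = default;
  SnapshotMetadata(uint64_t _last_included_term, uint64_t _last_included_index,
                   const std::string &_snapshot_filename)
      : last_included_term(_last_included_term),
        last_included_index(_last_included_index),
        snapshot_filename(_snapshot_filename) {}

  uint64_t last_included_term{0};   // term of the last entry in the snapshot
  uint64_t last_included_index{0};  // index of the last entry in the snapshot
  std::string snapshot_filename;    // file the snapshot was written to
};

}  // namespace raft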