From 1a50165c228027264480b05894053df8b4400462 Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Wed, 22 Jan 2020 16:21:39 +0100 Subject: [PATCH] Remove log compaction from current HA implementation Reviewers: mferencevic, buda Reviewed By: mferencevic, buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2632 --- src/raft/raft_rpc_messages.lcp | 10 - src/raft/raft_server.cpp | 346 +-------------------------------- src/raft/raft_server.hpp | 32 +-- src/raft/snapshot_metadata.lcp | 24 --- 4 files changed, 10 insertions(+), 402 deletions(-) delete mode 100644 src/raft/snapshot_metadata.lcp diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp index 54075c414..35c3fffe8 100644 --- a/src/raft/raft_rpc_messages.lcp +++ b/src/raft/raft_rpc_messages.lcp @@ -6,7 +6,6 @@ #include "communication/rpc/messages.hpp" #include "raft/log_entry.hpp" -#include "raft/snapshot_metadata.hpp" cpp<# (lcp:namespace raft) @@ -41,13 +40,4 @@ cpp<# ((success :bool) (term :uint64_t)))) -(lcp:define-rpc install-snapshot - (:request - ((leader-id :uint16_t) - (term :uint64_t) - (snapshot-metadata "::raft::SnapshotMetadata") - (data "std::string"))) - (:response - ((term :uint64_t)))) - (lcp:pop-namespace) ;; raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 141f83ec4..80134d320 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -13,8 +13,6 @@ #include "communication/rpc/client.hpp" #include "database/graph_db_accessor.hpp" #include "durability/single_node_ha/paths.hpp" -#include "durability/single_node_ha/recovery.hpp" -#include "durability/single_node_ha/snapshooter.hpp" #include "raft/exceptions.hpp" #include "slk/streams.hpp" #include "utils/cast.hpp" @@ -31,9 +29,7 @@ const std::string kCurrentTermKey = "current_term"; const std::string kVotedForKey = "voted_for"; const std::string kLogSizeKey = "log_size"; const std::string kLogEntryPrefix = "log_entry_"; -const std::string kSnapshotMetadataKey = "snapshot_metadata"; const std::string kRaftDir = "raft"; -const std::chrono::duration kSnapshotPeriod = 1s; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, bool db_recover_on_startup, const Config &config, @@ -53,18 +49,9 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, disk_storage_(fs::path(durability_dir) / kRaftDir) {} void RaftServer::Start() { - if (db_recover_on_startup_) { - snapshot_metadata_ = GetSnapshotMetadata(); - if (snapshot_metadata_) { - RecoverSnapshot(snapshot_metadata_->snapshot_filename); - last_applied_ = snapshot_metadata_->last_included_index; - commit_index_ = snapshot_metadata_->last_included_index; - last_entry_term_ = snapshot_metadata_->last_included_term; - } - } else { + if (!db_recover_on_startup_) { // We need to clear persisted data if we don't want any recovery. disk_storage_.DeletePrefix(""); - durability::RemoveAllSnapshots(durability_dir_); } // Persistent storage initialization @@ -181,27 +168,11 @@ void RaftServer::Start() { // term, then they store the same command. // - If two entries in different logs have the same index and term, // then the logs are identical in all preceding entries. 
- if (snapshot_metadata_ && - snapshot_metadata_->last_included_index == req.prev_log_index) { - if (req.prev_log_term != snapshot_metadata_->last_included_term) { - AppendEntriesRes res(false, current_term_); - slk::Save(res, res_builder); - return; - } - } else if (snapshot_metadata_ && - snapshot_metadata_->last_included_index > req.prev_log_index) { - LOG(ERROR) << "Received entries that are already commited and have been " - "compacted"; + if (log_size_ <= req.prev_log_index || + GetLogEntry(req.prev_log_index).term != req.prev_log_term) { AppendEntriesRes res(false, current_term_); slk::Save(res, res_builder); return; - } else { - if (log_size_ <= req.prev_log_index || - GetLogEntry(req.prev_log_index).term != req.prev_log_term) { - AppendEntriesRes res(false, current_term_); - slk::Save(res, res_builder); - return; - } } // No need to call this function for a heartbeat @@ -253,89 +224,6 @@ void RaftServer::Start() { slk::Save(res, res_builder); }); - coordination_->Register( - [this](auto *req_reader, auto *res_builder) { - // Acquire snapshot lock. - std::lock_guard snapshot_guard(snapshot_lock_); - std::lock_guard guard(lock_); - - InstallSnapshotReq req; - slk::Load(&req, req_reader); - - if (exiting_ || req.term < current_term_) { - InstallSnapshotRes res(current_term_); - slk::Save(res, res_builder); - return; - } - - // Check if the current state matches the one in snapshot - if (req.snapshot_metadata.last_included_index == last_applied_ && - req.snapshot_metadata.last_included_term == current_term_) { - InstallSnapshotRes res(current_term_); - slk::Save(res, res_builder); - return; - } - - VLOG(40) << "[InstallSnapshotRpc] Starting."; - - if (req.term > current_term_) { - VLOG(40) << "[InstallSnapshotRpc] Updating term."; - SetCurrentTerm(req.term); - if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER); - } - - utils::OnScopeExit extend_election_timeout([this] { - SetNextElectionTimePoint(); - election_change_.notify_all(); - }); - - // Temporary freeze election timer while we handle the snapshot. - next_election_ = TimePoint::max(); - - VLOG(40) << "[InstallSnapshotRpc] Remove all snapshots."; - // Remove all previous snapshots - durability::RemoveAllSnapshots(durability_dir_); - - const auto snapshot_path = durability::MakeSnapshotPath( - durability_dir_, req.snapshot_metadata.snapshot_filename); - - // Save snapshot file - { - VLOG(40) << "[InstallSnapshotRpc] Saving received snapshot."; - std::ofstream output_stream; - output_stream.open(snapshot_path, std::ios::out | std::ios::binary); - output_stream.write(req.data.data(), req.data.size()); - output_stream.flush(); - output_stream.close(); - } - - // Discard the all logs. We keep the one at index 0. - VLOG(40) << "[InstallSnapshotRpc] Discarding logs."; - log_.clear(); - for (uint64_t i = 1; i < log_size_; ++i) - disk_storage_.Delete(LogEntryKey(i)); - - // Reset the database. - VLOG(40) << "[InstallSnapshotRpc] Reset database."; - db_->Reset(); - - // Apply the snapshot. - VLOG(40) << "[InstallSnapshotRpc] Recover from received snapshot."; - RecoverSnapshot(req.snapshot_metadata.snapshot_filename); - - VLOG(40) << "[InstallSnapshotRpc] Persist snapshot metadata."; - PersistSnapshotMetadata(req.snapshot_metadata); - - // Update the state to match the one from snapshot. 
- VLOG(40) << "[InstallSnapshotRpc] Update Raft state."; - last_applied_ = req.snapshot_metadata.last_included_index; - commit_index_ = req.snapshot_metadata.last_included_index; - SetLogSize(req.snapshot_metadata.last_included_index + 1); - - InstallSnapshotRes res(current_term_); - slk::Save(res, res_builder); - }); - // start threads SetNextElectionTimePoint(); @@ -347,8 +235,6 @@ void RaftServer::Start() { } no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this); - - snapshot_thread_ = std::thread(&RaftServer::SnapshotThread, this); } void RaftServer::Shutdown() { @@ -372,7 +258,6 @@ void RaftServer::Shutdown() { if (election_thread_.joinable()) election_thread_.join(); if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join(); - if (snapshot_thread_.joinable()) snapshot_thread_.join(); } void RaftServer::SetCurrentTerm(uint64_t new_current_term) { @@ -394,43 +279,6 @@ void RaftServer::SetLogSize(uint64_t new_log_size) { disk_storage_.Put(kLogSizeKey, std::to_string(new_log_size)); } -std::optional RaftServer::GetSnapshotMetadata() { - auto opt_value = disk_storage_.Get(kSnapshotMetadataKey); - if (opt_value == std::nullopt) { - return std::nullopt; - } - - auto &value = *opt_value; - slk::Reader reader(reinterpret_cast(value.data()), - value.size()); - SnapshotMetadata deserialized; - try { - slk::Load(&deserialized, &reader); - reader.Finalize(); - } catch (const slk::SlkReaderException &) { - return std::nullopt; - } - return std::make_optional(deserialized); -} - -void RaftServer::PersistSnapshotMetadata( - const SnapshotMetadata &snapshot_metadata) { - std::stringstream stream(std::ios_base::in | std::ios_base::out | - std::ios_base::binary); - slk::Builder builder( - [&stream](const uint8_t *data, size_t size, bool have_more) { - for (size_t i = 0; i < size; ++i) { - stream << utils::MemcpyCast(data[i]); - } - }); - slk::Save(snapshot_metadata, &builder); - builder.Finalize(); - disk_storage_.Put(kSnapshotMetadataKey, stream.str()); - - // Keep the metadata in memory for faster access. - snapshot_metadata_.emplace(snapshot_metadata); -} - std::optional RaftServer::Emplace( const std::vector &deltas) { std::unique_lock lock(lock_); @@ -544,11 +392,6 @@ void RaftServer::Transition(const Mode &new_mode) { // Re-apply raft log. 
uint64_t starting_index = 1; - if (snapshot_metadata_) { - RecoverSnapshot(snapshot_metadata_->snapshot_filename); - starting_index = snapshot_metadata_->last_included_index + 1; - } - for (uint64_t i = starting_index; i <= commit_index_; ++i) { ApplyStateDeltas(GetLogEntry(i).deltas); } @@ -681,27 +524,16 @@ void RaftServer::AdvanceCommitIndex() { void RaftServer::SendEntries(uint16_t peer_id, std::unique_lock *lock) { - if (snapshot_metadata_ && - snapshot_metadata_->last_included_index >= next_index_[peer_id]) { - SendSnapshot(peer_id, *snapshot_metadata_, lock); - } else { - SendLogEntries(peer_id, snapshot_metadata_, lock); - } + SendLogEntries(peer_id, lock); } -void RaftServer::SendLogEntries( - uint16_t peer_id, const std::optional &snapshot_metadata, - std::unique_lock *lock) { +void RaftServer::SendLogEntries(uint16_t peer_id, + std::unique_lock *lock) { uint64_t request_term = current_term_; uint64_t request_prev_log_index = next_index_[peer_id] - 1; uint64_t request_prev_log_term; - if (snapshot_metadata && - snapshot_metadata->last_included_index == next_index_[peer_id] - 1) { - request_prev_log_term = snapshot_metadata->last_included_term; - } else { - request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term; - } + request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term; std::vector request_entries; if (next_index_[peer_id] <= log_size_ - 1) @@ -775,76 +607,6 @@ void RaftServer::SendLogEntries( state_changed_.notify_all(); } -void RaftServer::SendSnapshot(uint16_t peer_id, - const SnapshotMetadata &snapshot_metadata, - std::unique_lock *lock) { - uint64_t request_term = current_term_; - std::string snapshot_data; - - // TODO: The snapshot is currently sent all at once. Because the snapshot file - // can be extremely large (>100GB, it contains the whole database) it must be - // sent out in chunks! Reimplement this logic so that it sends out the - // snapshot in chunks. - - { - const auto snapshot_path = durability::MakeSnapshotPath( - durability_dir_, snapshot_metadata.snapshot_filename); - - std::ifstream input_stream; - input_stream.open(snapshot_path, std::ios::in | std::ios::binary); - input_stream.seekg(0, std::ios::end); - uint64_t snapshot_size = input_stream.tellg(); - - snapshot_data = std::string(snapshot_size, '\0'); - - input_stream.seekg(0, std::ios::beg); - input_stream.read(snapshot_data.data(), snapshot_size); - input_stream.close(); - } - - VLOG(40) << "Server " << server_id_ - << ": Sending Snapshot RPC to server " << peer_id - << " (Term: " << current_term_ << ")"; - VLOG(40) << "Snapshot size: " << snapshot_data.size() << " bytes."; - - // Copy all internal variables before releasing the lock. - auto server_id = server_id_; - - // Execute the RPC. 
- lock->unlock(); - auto reply = coordination_->ExecuteOnOtherNode( - peer_id, server_id, request_term, snapshot_metadata, - std::move(snapshot_data)); - lock->lock(); - - if (!reply) { - next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; - return; - } - - if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) { - return; - } - - if (OutOfSync(reply->term)) { - state_changed_.notify_all(); - return; - } - - if (reply->term != current_term_) { - VLOG(40) << "Server " << server_id_ - << ": Ignoring stale InstallSnapshotRpc reply from " << peer_id; - return; - } - - match_index_[peer_id] = snapshot_metadata.last_included_index; - next_index_[peer_id] = snapshot_metadata.last_included_index + 1; - index_offset_[peer_id] = 1; - next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; - - state_changed_.notify_all(); -} - void RaftServer::ElectionThreadMain() { utils::ThreadSetName("ElectionThread"); std::unique_lock lock(lock_); @@ -1010,85 +772,6 @@ void RaftServer::NoOpIssuerThreadMain() { } } -void RaftServer::SnapshotThread() { - utils::ThreadSetName(fmt::format("RaftSnapshot")); - if (config_.log_size_snapshot_threshold == -1) return; - - while (true) { - { - // Acquire snapshot lock before we acquire the Raft lock. This should - // avoid the situation where we release the lock to start writing a - // snapshot but the `InstallSnapshotRpc` deletes it and we write wrong - // metadata. - std::lock_guard snapshot_guard(snapshot_lock_); - std::unique_lock lock(lock_); - if (exiting_) break; - - uint64_t committed_log_size = last_applied_; - if (snapshot_metadata_) { - committed_log_size -= snapshot_metadata_->last_included_index; - } - - // Compare the log size to the config - if (config_.log_size_snapshot_threshold < committed_log_size) { - VLOG(40) << "[LogCompaction] Starting log compaction."; - - uint64_t last_included_term = 0; - uint64_t last_included_index = 0; - std::string snapshot_filename; - bool status = false; - - { - // Create a DB accessor for snapshot creation - auto dba = db_->Access(); - last_included_term = GetLogEntry(last_applied_).term; - last_included_index = last_applied_; - snapshot_filename = durability::GetSnapshotFilename( - last_included_term, last_included_index); - - lock.unlock(); - VLOG(40) << "[LogCompaction] Creating snapshot."; - status = durability::MakeSnapshot(*db_, dba, durability_dir_, - snapshot_filename); - - // Raft lock must be released when destroying dba object. - // Destroy the db accessor - } - - lock.lock(); - - if (status) { - uint64_t log_compaction_start_index = 1; - if (snapshot_metadata_) { - log_compaction_start_index = - snapshot_metadata_->last_included_index + 1; - } - - VLOG(40) << "[LogCompaction] Persisting snapshot metadata"; - PersistSnapshotMetadata( - {last_included_term, last_included_index, snapshot_filename}); - - // Log compaction. - VLOG(40) << "[LogCompaction] Compacting log from " - << log_compaction_start_index << " to " - << last_included_index; - for (int i = log_compaction_start_index; i <= last_included_index; - ++i) { - log_.erase(i); - disk_storage_.Delete(LogEntryKey(i)); - } - // After we deleted entries from the persistent store, make sure we - // compact the files and actually reduce used disk space. 
- disk_storage_.CompactRange(LogEntryKey(log_compaction_start_index), - LogEntryKey(last_included_index)); - } - } - } - - std::this_thread::sleep_for(kSnapshotPeriod); - } -} - void RaftServer::SetNextElectionTimePoint() { // [Raft thesis, section 3.4] // "Raft uses randomized election timeouts to ensure that split votes are @@ -1112,10 +795,6 @@ bool RaftServer::HasMajorityVote() { } std::pair RaftServer::LastEntryData() { - if (snapshot_metadata_ && - snapshot_metadata_->last_included_index == log_size_ - 1) { - return {log_size_, snapshot_metadata_->last_included_term}; - } return {log_size_, last_entry_term_}; } @@ -1147,7 +826,7 @@ bool RaftServer::OutOfSync(uint64_t reply_term) { return false; } -LogEntry RaftServer::GetLogEntry(int index) { +LogEntry RaftServer::GetLogEntry(uint64_t index) { auto it = log_.find(index); if (it != log_.end()) return it->second; // retrieve in-mem if possible @@ -1235,15 +914,6 @@ LogEntry RaftServer::DeserializeLogEntry( return deserialized; } -void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) { - durability::RecoveryData recovery_data; - bool recovery = durability::RecoverSnapshot( - db_, &recovery_data, durability_dir_, snapshot_filename); - - CHECK(recovery); - durability::RecoverIndexes(db_, recovery_data.indexes); -} - void RaftServer::NoOpCreate() { // TODO(ipaljak): Review this after implementing RaftDelta object. auto dba = db_->Access(); diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 7f3766822..5fe34e19f 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -16,7 +16,6 @@ #include "raft/raft_rpc_messages.hpp" #include "raft/replication_log.hpp" #include "raft/replication_timeout_map.hpp" -#include "raft/snapshot_metadata.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "transactions/type.hpp" #include "utils/scheduler.hpp" @@ -84,14 +83,6 @@ class RaftServer final : public RaftInterface { /// as its in-memory copy. void SetLogSize(uint64_t new_log_size); - /// Retrieves persisted snapshot metadata or nullopt if not present. - /// Snapshot metadata is a triplet consisting of the last included term, last - /// last included log entry index and the snapshot filename. - std::optional GetSnapshotMetadata(); - - /// Persists snapshot metadata. - void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata); - /// Emplace a new LogEntry in the raft log and start its replication. This /// entry is created from a given batched set of StateDelta objects. /// @@ -138,7 +129,6 @@ class RaftServer final : public RaftInterface { private: mutable std::mutex lock_; ///< Guards all internal state. - mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal. mutable std::mutex heartbeat_lock_; ///< Guards HB issuing ////////////////////////////////////////////////////////////////////////////// @@ -174,10 +164,6 @@ class RaftServer final : public RaftInterface { std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op ///< command on leader change. - std::thread snapshot_thread_; ///< Thread responsible for snapshot creation - ///< when log size reaches - ///< `log_size_snapshot_threshold`. - std::condition_variable leader_changed_; ///< Notifies the ///< no_op_issuer_thread that a new ///< leader has been elected. 
@@ -256,8 +242,6 @@ class RaftServer final : public RaftInterface { std::map log_; - std::optional snapshot_metadata_; - /// Recovers persistent data from disk and stores its in-memory copies /// that insure faster read-only operations. This method should be called /// on start-up. If parts of persistent data are missing, the method won't @@ -285,22 +269,18 @@ class RaftServer final : public RaftInterface { /// mode. /// /// @param peer_id ID of the peer which receives entries. - /// @param snapshot_metadata metadata of the last snapshot, if any. /// @param lock Lock from the peer thread (released while waiting for /// response) void SendLogEntries(uint16_t peer_id, - const std::optional &snapshot_metadata, std::unique_lock *lock); /// Send Snapshot to peer. This function should only be called in leader /// mode. /// /// @param peer_id ID of the peer which receives entries. - /// @param snapshot_metadata metadata of the snapshot to send. /// @param lock Lock from the peer thread (released while waiting for /// response) - void SendSnapshot(uint16_t peer_id, const SnapshotMetadata &snapshot_metadata, - std::unique_lock *lock); + void SendSnapshot(uint16_t peer_id, std::unique_lock *lock); /// Main function of the `election_thread_`. It is responsible for /// transition to CANDIDATE mode when election timeout elapses. @@ -325,11 +305,6 @@ class RaftServer final : public RaftInterface { /// have been replicated on a majority of peers. void NoOpIssuerThreadMain(); - /// Periodically checks if the Log size reached `log_size_snapshot_threshold` - /// parameter. If it has, then it performs log compaction and creates - /// snapshots. - void SnapshotThread(); - /// Sets the `TimePoint` for next election. void SetNextElectionTimePoint(); @@ -364,7 +339,7 @@ class RaftServer final : public RaftInterface { /// Retrieves a log entry from the log at a given index. /// /// @param index Index of the log entry to be retrieved. - LogEntry GetLogEntry(int index); + LogEntry GetLogEntry(uint64_t index); /// Deletes log entries with indexes that are greater or equal to the given /// starting index. @@ -406,9 +381,6 @@ class RaftServer final : public RaftInterface { /// Deserialized Raft log entry from `std::string` LogEntry DeserializeLogEntry(const std::string &serialized_log_entry); - /// Recovers the given snapshot if it exists in the durability directory. - void RecoverSnapshot(const std::string &snapshot_filename); - /// Start a new transaction with a NO-OP StateDelta. void NoOpCreate(); diff --git a/src/raft/snapshot_metadata.lcp b/src/raft/snapshot_metadata.lcp deleted file mode 100644 index a2c3e4aa4..000000000 --- a/src/raft/snapshot_metadata.lcp +++ /dev/null @@ -1,24 +0,0 @@ -#>cpp -#pragma once -#include - -#include "utils/typeinfo.hpp" -cpp<# - -(lcp:namespace raft) - -(lcp:define-struct snapshot-metadata () - ((last-included-term :uint64_t) - (last-included-index :uint64_t) - (snapshot-filename "std::string")) - (:public #>cpp - SnapshotMetadata() = default; - SnapshotMetadata(uint64_t _last_included_term, uint64_t _last_included_index, - const std::string &_snapshot_filename) - : last_included_term(_last_included_term), - last_included_index(_last_included_index), - snapshot_filename(_snapshot_filename) {} - cpp<#) - (:serialize (:slk))) - -(lcp:pop-namespace) ;; raft
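
Illustrative sketch (not part of the patch): with the snapshot-metadata branches removed, the follower-side consistency check in the AppendEntries handler reduces to the plain Raft log-matching test introduced above, i.e. reject when `log_size_ <= req.prev_log_index` or when `GetLogEntry(req.prev_log_index).term != req.prev_log_term`. The standalone snippet below mirrors only that control flow; `FakeLogEntry` and `PrevEntryMatches` are simplified, hypothetical stand-ins and not the actual RaftServer/LogEntry classes.

#include <cstdint>
#include <vector>

struct FakeLogEntry {
  uint64_t term;
};

// Returns true when the follower may accept the incoming entries:
// it must hold an entry at prev_log_index, and that entry must have been
// written in prev_log_term (Raft log-matching property).
bool PrevEntryMatches(const std::vector<FakeLogEntry> &log,
                      uint64_t prev_log_index, uint64_t prev_log_term) {
  // Reject if the follower's log is too short to contain prev_log_index.
  if (log.size() <= prev_log_index) return false;
  // Reject if the entry at prev_log_index was written in a different term.
  return log[prev_log_index].term == prev_log_term;
}

int main() {
  // Index 0 holds a sentinel entry, mirroring the "we keep the one at
  // index 0" convention visible in the patch.
  std::vector<FakeLogEntry> log = {{0}, {1}, {1}, {2}};

  bool ok = PrevEntryMatches(log, 3, 2);       // entry exists, terms match -> accept
  bool stale = PrevEntryMatches(log, 3, 1);    // term mismatch -> reject
  bool missing = PrevEntryMatches(log, 7, 2);  // index past end of log -> reject
  return (ok && !stale && !missing) ? 0 : 1;
}

The same simplification shows up on the leader side of the patch: since no prefix of the log can have been compacted away anymore, SendLogEntries reads request_prev_log_term directly from GetLogEntry(next_index_[peer_id] - 1), and GetLogEntry's index parameter is widened from int to uint64_t in the same change.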