From 8c51d2fa0b12ba6e75f76945ea8a5c5638619963 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Tue, 18 Dec 2018 14:31:55 +0100 Subject: [PATCH] Add ReplicationLog to RaftServer Summary: * renamed `HasCommitted` to `SafeToCommit` * implemented (c/p) `ReplicationLog` Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1776 --- src/database/single_node_ha/graph_db.cpp | 4 +- src/database/single_node_ha/graph_db.hpp | 4 +- src/raft/exceptions.hpp | 12 +++ src/raft/raft_interface.hpp | 6 +- src/raft/raft_server.cpp | 41 +++++++-- src/raft/raft_server.hpp | 21 +++-- src/raft/replication_log.hpp | 87 +++++++++++++++++++ src/storage/single_node_ha/storage_gc.hpp | 17 ++-- src/transactions/single_node_ha/engine.cpp | 2 +- tests/unit/CMakeLists.txt | 3 + tests/unit/replication_log.cpp | 65 ++++++++++++++ .../transaction_engine_single_node_ha.cpp | 2 +- 12 files changed, 238 insertions(+), 26 deletions(-) create mode 100644 src/raft/replication_log.hpp create mode 100644 tests/unit/replication_log.cpp diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index 15600781f..782b14062 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -129,8 +129,8 @@ void GraphDb::Reset() { // state. 
tx_engine_.Reset(); - storage_gc_ = - std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); + storage_gc_ = std::make_unique( + *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); } } // namespace database diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 7c192c717..511e1fbcb 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -173,8 +173,8 @@ class GraphDb { &delta_applier_, [this]() { this->Reset(); }}; tx::Engine tx_engine_{&raft_server_}; - std::unique_ptr storage_gc_ = - std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); + std::unique_ptr storage_gc_ = std::make_unique( + *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); storage::ConcurrentIdMapper label_mapper_{ storage_->PropertiesOnDisk()}; storage::ConcurrentIdMapper edge_mapper_{ diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp index fab721b11..8ccc2703d 100644 --- a/src/raft/exceptions.hpp +++ b/src/raft/exceptions.hpp @@ -54,4 +54,16 @@ class MissingPersistentDataException : public RaftException { key) {} }; +/// This exception should be thrown when a `RaftServer` instance attempts to +/// read from replication log from an invalid mode or for a garbage collected +/// transaction. +class InvalidReplicationLogLookup : public RaftException { + public: + using RaftException::RaftException; + InvalidReplicationLogLookup() + : RaftException( + "Replication log lookup for invalid transaction or from invalid " + "mode.") {} +}; + } // namespace raft diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp index d7c9de4a0..f8edacaf5 100644 --- a/src/raft/raft_interface.hpp +++ b/src/raft/raft_interface.hpp @@ -14,9 +14,9 @@ class RaftInterface { /// Add StateDelta to the appropriate Raft log entry. 
virtual void Emplace(const database::StateDelta &) = 0; - /// Check if the transaction with the given transaction id has been - /// replicated on the majority of the Raft cluster and commited. - virtual bool HasCommitted(const tx::TransactionId &tx_id) = 0; + /// Checks if the transaction with the given transaction id can safely be + /// committed in local storage. + virtual bool SafeToCommit(const tx::TransactionId &tx_id) = 0; protected: ~RaftInterface() {} diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 284a7ed4d..7a66b2977 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -26,6 +27,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, : config_(config), coordination_(coordination), delta_applier_(delta_applier), + rlog_(std::make_unique()), mode_(Mode::FOLLOWER), server_id_(server_id), disk_storage_(fs::path(durability_dir) / kRaftDir), @@ -203,7 +205,9 @@ std::vector RaftServer::Log() { return DeserializeLog(opt_value.value()); } -void RaftServer::Replicate(const std::vector &log) { +void RaftServer::AppendToLog(const tx::TransactionId &tx_id, + const std::vector &log) { + rlog_->set_active(tx_id); throw utils::NotYetImplemented("RaftServer replication"); } @@ -211,11 +215,29 @@ void RaftServer::Emplace(const database::StateDelta &delta) { log_entry_buffer_.Emplace(delta); } -bool RaftServer::HasCommitted(const tx::TransactionId &tx_id) { - // When in follower mode return true. - // Raise an exception if in candidate mode (should't happen). - // Check the state and return the correct value if leader. - return true; +bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { + std::unique_lock lock(lock_); + + switch (mode_) { + case Mode::FOLLOWER: + // When in follower mode, we will only try to apply a Raft Log when we + // receive a commit index greater than or equal to the Log index from + // the leader. 
At that moment we don't have to check the replication log + // because the leader won't commit the Log locally if it's not replicated + // on the majority of the peers in the cluster. This is why we can short + // circuit the check to always return true if in follower mode. + return true; + case Mode::LEADER: + if (rlog_->is_active(tx_id)) return false; + if (rlog_->is_replicated(tx_id)) return true; + // The only possibility left is that our ReplicationLog doesn't contain + // information about that tx. Let's log that on our way out. + break; + case Mode::CANDIDATE: + break; + } + + throw InvalidReplicationLogLookup(); } RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) @@ -248,7 +270,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { log.emplace_back(std::move(delta)); logs_.erase(it); - raft_server_->Replicate(log); + raft_server_->AppendToLog(tx_id, log); } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { auto it = logs_.find(tx_id); CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; @@ -263,6 +285,7 @@ void RaftServer::Transition(const Mode &new_mode) { case Mode::FOLLOWER: { if (mode_ == Mode::LEADER) { reset_callback_(); + ResetReplicationLog(); } LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER (Term: " << CurrentTerm() << ")"; @@ -575,4 +598,8 @@ std::vector RaftServer::DeserializeLog( return deserialized_log; } +void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { + rlog_->garbage_collect_older(tx_id); +} + } // namespace raft diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 05e3919d4..92a6e842f 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -13,6 +13,7 @@ #include "raft/log_entry.hpp" #include "raft/raft_interface.hpp" #include "raft/raft_rpc_messages.hpp" +#include "raft/replication_log.hpp" #include "storage/common/kvstore/kvstore.hpp" #include 
"transactions/type.hpp" #include "utils/scheduler.hpp" @@ -79,16 +80,20 @@ class RaftServer final : public RaftInterface { /// persistent storage, an empty Log will be created. std::vector Log(); - /// Start replicating StateDeltas batched together in a Raft log. - void Replicate(const std::vector &log); + /// Append the log to the list of completed logs that are ready to be + /// replicated. + void AppendToLog(const tx::TransactionId &tx_id, + const std::vector &log); /// Emplace a single StateDelta to the corresponding batch. If the StateDelta /// marks the transaction end, it will replicate the log accorss the cluster. void Emplace(const database::StateDelta &delta) override; - /// Check if the transaction with the given transaction id has been - /// replicated on the majority of the Raft cluster and commited. - bool HasCommitted(const tx::TransactionId &tx_id) override; + /// Checks if the transaction with the given transaction id can safely be + /// committed in local storage. + bool SafeToCommit(const tx::TransactionId &tx_id) override; + + void GarbageCollectReplicationLog(const tx::TransactionId &tx_id); private: /// Buffers incomplete Raft logs. @@ -136,6 +141,7 @@ class RaftServer final : public RaftInterface { Config config_; ///< Raft config. Coordination *coordination_{nullptr}; ///< Cluster coordination. database::StateDeltaApplier *delta_applier_{nullptr}; + std::unique_ptr rlog_{nullptr}; Mode mode_; ///< Server's current mode. uint16_t server_id_; ///< ID of the current server. @@ -283,5 +289,10 @@ class RaftServer final : public RaftInterface { /// Deserializes Raft log from `std::string`. 
std::vector DeserializeLog(const std::string &serialized_log); + + void ResetReplicationLog() { + rlog_ = nullptr; + rlog_ = std::make_unique(); + } }; } // namespace raft diff --git a/src/raft/replication_log.hpp b/src/raft/replication_log.hpp new file mode 100644 index 000000000..2a7a517cf --- /dev/null +++ b/src/raft/replication_log.hpp @@ -0,0 +1,87 @@ +/// @file +#pragma once + +#include + +#include "data_structures/bitset/dynamic_bitset.hpp" +#include "transactions/type.hpp" + +namespace raft { + +/// Tracks information about replicated and active logs for high availability. +/// +/// The main difference between ReplicationLog and CommitLog is that +/// ReplicationLog doesn't throw when looking up garbage collected transaction +/// ids. +class ReplicationLog final { + public: + static constexpr int kBitsetBlockSize = 32768; + + ReplicationLog() = default; + ReplicationLog(const ReplicationLog &) = delete; + ReplicationLog(ReplicationLog &&) = delete; + ReplicationLog &operator=(const ReplicationLog &) = delete; + ReplicationLog &operator=(ReplicationLog &&) = delete; + + bool is_active(tx::TransactionId id) const { + return fetch_info(id).is_active(); + } + + void set_active(tx::TransactionId id) { log.set(2 * id); } + + bool is_replicated(tx::TransactionId id) const { + return fetch_info(id).is_replicated(); + } + + void set_replicated(tx::TransactionId id) { log.set(2 * id + 1); } + + // Clears the replication log from bits associated with transactions with an + // id lower than `id`. 
+ void garbage_collect_older(tx::TransactionId id) { + log.delete_prefix(2 * id); + valid_prefix = 2 * (id + 1); + } + + class Info final { + public: + enum Status { + UNKNOWN = 0, // 00 + ACTIVE = 1, // 01 + REPLICATED = 2, // 10 + }; + + explicit Info(uint8_t flags) { + if (flags & REPLICATED) { + flags_ = REPLICATED; + } else if (flags & ACTIVE) { + flags_ = ACTIVE; + } else { + flags_ = UNKNOWN; + } + } + + bool is_active() const { + if (flags_ & REPLICATED) return false; + return flags_ & ACTIVE; + } + + bool is_replicated() const { return flags_ & REPLICATED; } + + operator uint8_t() const { return flags_; } + + private: + uint8_t flags_{0}; + }; + + Info fetch_info(tx::TransactionId id) const { + if (valid_prefix > 2 * id) return Info{0}; + + return Info{log.at(2 * id, 2)}; + } + + private: + DynamicBitset log; + std::atomic valid_prefix{0}; +}; + +} // namespace raft diff --git a/src/storage/single_node_ha/storage_gc.hpp b/src/storage/single_node_ha/storage_gc.hpp index 64b1a5bc7..44c4cc370 100644 --- a/src/storage/single_node_ha/storage_gc.hpp +++ b/src/storage/single_node_ha/storage_gc.hpp @@ -4,12 +4,13 @@ #include #include "data_structures/concurrent/concurrent_map.hpp" -#include "storage/single_node_ha/mvcc/version_list.hpp" +#include "raft/raft_server.hpp" #include "stats/metrics.hpp" #include "storage/single_node_ha/deferred_deleter.hpp" #include "storage/single_node_ha/edge.hpp" #include "storage/single_node_ha/garbage_collector.hpp" #include "storage/single_node_ha/gid.hpp" +#include "storage/single_node_ha/mvcc/version_list.hpp" #include "storage/single_node_ha/storage.hpp" #include "storage/single_node_ha/vertex.hpp" #include "transactions/single_node_ha/engine.hpp" @@ -41,8 +42,10 @@ class StorageGc { /** Creates a garbage collector for the given storage that uses the given * tx::Engine. If `pause_sec` is greater then zero, then GC gets triggered * periodically. 
*/ - StorageGc(Storage &storage, tx::Engine &tx_engine, int pause_sec) + StorageGc(Storage &storage, tx::Engine &tx_engine, + raft::RaftServer *raft_server, int pause_sec) : tx_engine_(tx_engine), + raft_server_(raft_server), storage_(storage), vertices_(storage.vertices_), edges_(storage.edges_) { @@ -75,9 +78,12 @@ class StorageGc { StorageGc &operator=(const StorageGc &) = delete; StorageGc &operator=(StorageGc &&) = delete; - void CollectCommitLogGarbage(tx::TransactionId oldest_active) { + void CollectLogGarbage(tx::TransactionId oldest_active) { auto safe_to_delete = GetClogSafeTransaction(oldest_active); - if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete); + if (safe_to_delete) { + tx_engine_.GarbageCollectCommitLog(*safe_to_delete); + raft_server_->GarbageCollectReplicationLog(*safe_to_delete); + } } void CollectGarbage() { @@ -121,7 +127,7 @@ class StorageGc { << x.Elapsed().count(); } - CollectCommitLogGarbage(snapshot_gc.back()); + CollectLogGarbage(snapshot_gc.back()); gc_txid_ranges_.emplace(snapshot_gc.back(), tx_engine_.GlobalLast()); VLOG(21) << "gc snapshot: " << snapshot_gc; @@ -155,6 +161,7 @@ class StorageGc { } tx::Engine &tx_engine_; + raft::RaftServer *raft_server_; utils::Scheduler scheduler_; private: diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index 3f6c237f5..836ad6047 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -81,7 +81,7 @@ void Engine::Commit(const Transaction &t) { raft_->Emplace(database::StateDelta::TxCommit(t.id_)); // Wait for Raft to receive confirmation from the majority of followers. 
- while (!raft_->HasCommitted(t.id_)) { + while (!raft_->SafeToCommit(t.id_)) { std::this_thread::sleep_for(std::chrono::microseconds(100)); } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index b33adf695..a51854c30 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -158,6 +158,9 @@ target_link_libraries(${test_prefix}pod_buffer mg-single-node kvstore_dummy_lib) add_unit_test(property_value_store.cpp) target_link_libraries(${test_prefix}property_value_store kvstore_lib mg-single-node) +add_unit_test(replication_log.cpp) +target_link_libraries(${test_prefix}replication_log mg-single-node-ha kvstore_lib glog) + add_unit_test(query_cost_estimator.cpp) target_link_libraries(${test_prefix}query_cost_estimator mg-single-node kvstore_dummy_lib) diff --git a/tests/unit/replication_log.cpp b/tests/unit/replication_log.cpp new file mode 100644 index 000000000..7943a5fb5 --- /dev/null +++ b/tests/unit/replication_log.cpp @@ -0,0 +1,65 @@ +#include +#include + +#include "raft/replication_log.hpp" + +using namespace tx; + +TEST(ReplicationLog, ActiveReplicated) { + raft::ReplicationLog rlog; + const tx::TransactionId tx_id = 10; + + EXPECT_FALSE(rlog.is_replicated(tx_id)); + EXPECT_FALSE(rlog.is_active(tx_id)); + + rlog.set_active(tx_id); + + EXPECT_FALSE(rlog.is_replicated(tx_id)); + EXPECT_TRUE(rlog.is_active(tx_id)); + + rlog.set_replicated(tx_id); + + EXPECT_TRUE(rlog.is_replicated(tx_id)); + EXPECT_FALSE(rlog.is_active(tx_id)); +} + +TEST(ReplicationLog, GarbageCollect) { + raft::ReplicationLog rlog; + + auto set_active = [&rlog](tx::TransactionId tx_id) { + rlog.set_active(tx_id); + EXPECT_TRUE(rlog.is_active(tx_id)); + }; + + auto set_replicated = [&rlog](tx::TransactionId tx_id) { + rlog.set_replicated(tx_id); + EXPECT_TRUE(rlog.is_replicated(tx_id)); + EXPECT_FALSE(rlog.is_active(tx_id)); + }; + + const int n = raft::ReplicationLog::kBitsetBlockSize; + + for (int i = 1; i < 3 * n; ++i) { + set_active(i); + } + + for (int i = 
1; i < 2 * n; ++i) { + set_replicated(i); + } + + rlog.garbage_collect_older(n); + + for (int i = 1; i <= n; ++i) { + EXPECT_FALSE(rlog.is_active(i)); + EXPECT_FALSE(rlog.is_replicated(i)); + } + + for (int i = n + 1; i < 2 * n; ++i) { + EXPECT_FALSE(rlog.is_active(i)); + EXPECT_TRUE(rlog.is_replicated(i)); + } + + for (int i = 2 * n; i < 3 * n; ++i) { + EXPECT_TRUE(rlog.is_active(i)); + } +} diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp index 002800b68..76ed3c675 100644 --- a/tests/unit/transaction_engine_single_node_ha.cpp +++ b/tests/unit/transaction_engine_single_node_ha.cpp @@ -16,7 +16,7 @@ class RaftMock final : public raft::RaftInterface { log_[delta.transaction_id].emplace_back(std::move(delta)); } - bool HasCommitted(const tx::TransactionId &tx_id) override { + bool SafeToCommit(const tx::TransactionId &tx_id) override { return true; }