From deee3b8ab7b4889f85924fd1366e3defc43ba316 Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Tue, 10 Sep 2019 15:51:04 +0200 Subject: [PATCH] Remove Raft's dependency on transaction id Reviewers: buda, mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2338 --- src/database/single_node_ha/graph_db.cpp | 2 + src/database/single_node_ha/graph_db.hpp | 5 +- .../single_node_ha/graph_db_accessor.cpp | 30 +-- .../single_node_ha/graph_db_accessor.hpp | 2 + src/raft/raft_interface.hpp | 88 +++++---- src/raft/raft_server.cpp | 174 ++++-------------- src/raft/raft_server.hpp | 111 ++++------- src/raft/replication_timeout_map.hpp | 20 +- .../single_node_ha/record_accessor.cpp | 8 +- .../single_node_ha/state_delta_buffer.hpp | 47 +++++ src/storage/single_node_ha/storage_gc.hpp | 1 - .../single_node_ha/vertex_accessor.cpp | 4 +- src/transactions/single_node_ha/engine.cpp | 151 +++++++++------ src/transactions/single_node_ha/engine.hpp | 4 +- tests/unit/CMakeLists.txt | 3 - .../transaction_engine_single_node_ha.cpp | 76 -------- 16 files changed, 310 insertions(+), 416 deletions(-) create mode 100644 src/storage/single_node_ha/state_delta_buffer.hpp delete mode 100644 tests/unit/transaction_engine_single_node_ha.cpp diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index 3bac93fd4..9c1fb9f15 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -71,6 +71,8 @@ raft::StorageInfo *GraphDb::storage_info() { return &storage_info_; } tx::Engine &GraphDb::tx_engine() { return tx_engine_; } +storage::StateDeltaBuffer *GraphDb::sd_buffer() { return &sd_buffer_; } + storage::ConcurrentIdMapper &GraphDb::label_mapper() { return label_mapper_; } diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index da907fbb0..e110a31dc 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -13,6 +13,7 @@ #include "raft/storage_info.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/concurrent_id_mapper.hpp" +#include "storage/single_node_ha/state_delta_buffer.hpp" #include "storage/single_node_ha/storage.hpp" #include "storage/single_node_ha/storage_gc.hpp" #include "transactions/single_node_ha/engine.hpp" @@ -84,6 +85,7 @@ class GraphDb { raft::RaftInterface *raft(); raft::StorageInfo *storage_info(); tx::Engine &tx_engine(); + storage::StateDeltaBuffer *sd_buffer(); storage::ConcurrentIdMapper &label_mapper(); storage::ConcurrentIdMapper &edge_type_mapper(); storage::ConcurrentIdMapper &property_mapper(); @@ -138,8 +140,9 @@ class GraphDb { &coordination_, this}; raft::StorageInfo storage_info_{this, &coordination_, config_.server_id}; + storage::StateDeltaBuffer sd_buffer_; - tx::Engine tx_engine_{&raft_server_}; + tx::Engine tx_engine_{&raft_server_, &sd_buffer_}; std::unique_ptr storage_gc_ = std::make_unique( *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); storage::ConcurrentIdMapper label_mapper_{ diff --git a/src/database/single_node_ha/graph_db_accessor.cpp b/src/database/single_node_ha/graph_db_accessor.cpp index 7ed066c60..d7a9fd6a2 100644 --- a/src/database/single_node_ha/graph_db_accessor.cpp +++ b/src/database/single_node_ha/graph_db_accessor.cpp @@ -90,7 +90,13 @@ bool GraphDbAccessor::should_abort() const { return transaction_->should_abort(); } -raft::RaftInterface *GraphDbAccessor::raft() { return db_->raft(); } +raft::RaftInterface *GraphDbAccessor::raft() { + return db_->raft(); +} + +storage::StateDeltaBuffer *GraphDbAccessor::sd_buffer() { + return db_->sd_buffer(); +} VertexAccessor GraphDbAccessor::InsertVertex( std::optional requested_gid) { @@ -103,7 +109,7 @@ VertexAccessor GraphDbAccessor::InsertVertex( db_->storage().vertices_.access().insert(gid, vertex_vlist).second; CHECK(success) << "Attempting to insert a vertex with an existing GID: " << gid.AsUint(); - raft()->Emplace( + sd_buffer()->Emplace( database::StateDelta::CreateVertex(transaction_->id_, vertex_vlist->gid_)); auto va = VertexAccessor(vertex_vlist, *this); return va; @@ -165,9 +171,10 @@ void GraphDbAccessor::BuildIndex(storage::Label label, void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) { // Commit transaction as we finished applying method on newest visible - // records. Write that transaction's ID to the RaftServer as the index has been - // built at this point even if this DBA's transaction aborts for some reason. - raft()->Emplace(database::StateDelta::BuildIndex( + // records. Write that transaction's ID to the RaftServer as the index has + // been built at this point even if this DBA's transaction aborts for some + // reason. + sd_buffer()->Emplace(database::StateDelta::BuildIndex( transaction_id(), key.label_, LabelName(key.label_), key.property_, PropertyName(key.property_))); } @@ -190,7 +197,7 @@ void GraphDbAccessor::DeleteIndex(storage::Label label, auto dba = db_->AccessBlocking(std::make_optional(transaction_->id_)); db_->storage().label_property_index_.DeleteIndex(key); - dba.raft()->Emplace(database::StateDelta::DropIndex( + dba.sd_buffer()->Emplace(database::StateDelta::DropIndex( dba.transaction_id(), key.label_, LabelName(key.label_), key.property_, PropertyName(key.property_))); @@ -226,7 +233,7 @@ void GraphDbAccessor::BuildUniqueConstraint( return dba.PropertyName(property); }); - dba.raft()->Emplace(database::StateDelta::BuildUniqueConstraint( + dba.sd_buffer()->Emplace(database::StateDelta::BuildUniqueConstraint( dba.transaction().id_, label, dba.LabelName(label), properties, property_names)); @@ -264,7 +271,7 @@ void GraphDbAccessor::DeleteUniqueConstraint( return dba.PropertyName(property); }); - dba.raft()->Emplace(database::StateDelta::DropUniqueConstraint( + dba.sd_buffer()->Emplace(database::StateDelta::DropUniqueConstraint( dba.transaction().id_, label, dba.LabelName(label), properties, property_names)); @@ -420,7 +427,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor, return false; auto *vlist_ptr = vertex_accessor.address(); - raft()->Emplace(database::StateDelta::RemoveVertex( + sd_buffer()->Emplace(database::StateDelta::RemoveVertex( transaction_->id_, vlist_ptr->gid_, check_empty)); vlist_ptr->remove(vertex_accessor.current_, *transaction_); return true; @@ -467,7 +474,7 @@ EdgeAccessor GraphDbAccessor::InsertEdge( to.SwitchNew(); to.update().in_.emplace(from.address(), edge_vlist, edge_type); - raft()->Emplace(database::StateDelta::CreateEdge( + sd_buffer()->Emplace(database::StateDelta::CreateEdge( transaction_->id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type, EdgeTypeName(edge_type))); @@ -492,7 +499,8 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge, if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); edge.address()->remove(edge.current_, *transaction_); - raft()->Emplace(database::StateDelta::RemoveEdge(transaction_->id_, edge.gid())); + sd_buffer()->Emplace( + database::StateDelta::RemoveEdge(transaction_->id_, edge.gid())); } storage::Label GraphDbAccessor::Label(const std::string &label_name) { diff --git a/src/database/single_node_ha/graph_db_accessor.hpp b/src/database/single_node_ha/graph_db_accessor.hpp index 737ca37b9..65f35f321 100644 --- a/src/database/single_node_ha/graph_db_accessor.hpp +++ b/src/database/single_node_ha/graph_db_accessor.hpp @@ -15,6 +15,7 @@ #include "raft/raft_interface.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/edge_accessor.hpp" +#include "storage/single_node_ha/state_delta_buffer.hpp" #include "storage/single_node_ha/vertex_accessor.hpp" #include "transactions/transaction.hpp" #include "transactions/type.hpp" @@ -596,6 +597,7 @@ class GraphDbAccessor { const tx::Transaction &transaction() const { return *transaction_; } raft::RaftInterface *raft(); + storage::StateDeltaBuffer *sd_buffer(); auto &db() { return db_; } const auto &db() const { return db_; } diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp index d80506a49..29e66c7aa 100644 --- a/src/raft/raft_interface.hpp +++ b/src/raft/raft_interface.hpp @@ -9,50 +9,50 @@ namespace raft { -enum class TxStatus { REPLICATED, WAITING, ABORTED, INVALID }; +enum class ReplicationStatus { REPLICATED, WAITING, ABORTED, INVALID }; -inline std::string TxStatusToString(const TxStatus &tx_status) { - switch (tx_status) { - case TxStatus::REPLICATED: +inline std::string ReplicationStatusToString( + const ReplicationStatus &replication_status) { + switch (replication_status) { + case ReplicationStatus::REPLICATED: return "REPLICATED"; - case TxStatus::WAITING: + case ReplicationStatus::WAITING: return "WAITING"; - case TxStatus::ABORTED: + case ReplicationStatus::ABORTED: return "ABORTED"; - case TxStatus::INVALID: + case ReplicationStatus::INVALID: return "INVALID"; } } -/// Structure which describes the StateDelta status after the execution of -/// RaftServer's Emplace method. +/// Structure which describes the status of a newly created LogEntry after the +/// execution of RaftServer's Emplace method. /// -/// It consists of two fields: -/// 1) A boolean flag `emplaced` which signals whether the delta has -/// successfully been emplaced in the raft log buffer. -/// 2) Two optional unsigned 64-bit integers which denote the term -/// when the corresponding LogEntry was emplaced and its log_index in -/// the Raft log. These values are contained in the optional metadata only -/// if the emplaced StateDelta signifies the COMMIT of a non-read-only -/// transaction. -struct DeltaStatus { - bool emplaced; - std::optional term_id; - std::optional log_index; +/// It consists of two unsigned 64-bit integers which uniquely describe +/// the emplaced LogEntry: +/// 1) Term when the LogEntry was emplaced to the Raft log. +/// 2) Index of the entry within the Raft log. +/// +/// In the case an entry was not successfully emplaced (e.g. unexpected +/// leadership change), the values will have a std::nullopt value instead. +struct LogEntryStatus { + uint64_t term_id; + uint64_t log_index; }; -/// Exposes only functionality that other parts of Memgraph can interact with, -/// emplacing a state delta into the appropriate Raft log entry. +/// Exposes only functionality that other parts of Memgraph can interact with. class RaftInterface { public: - /// Add StateDelta to the appropriate Raft log entry. + /// Emplace a new LogEntry in the raft log and start its replication. This + /// entry is created from a given batched set of StateDelta objects. /// - /// @returns DeltaStatus object as a result. - virtual DeltaStatus Emplace(const database::StateDelta &) = 0; - - /// Checks if the transaction with the given transaction id can safely be - /// committed in local storage. - virtual bool SafeToCommit(const tx::TransactionId &) = 0; + /// It is possible that the entry was not successfully emplaced. In that case, + /// the method returns std::nullopt and the caller is responsible for handling + /// situation correctly (e.g. aborting the corresponding transaction). + /// + /// @returns an optional LogEntryStatus object as result. + virtual std::optional Emplace( + const std::vector &) = 0; /// Returns true if the current servers mode is LEADER. False otherwise. virtual bool IsLeader() = 0; @@ -60,18 +60,32 @@ class RaftInterface { /// Returns the term ID of the current leader. virtual uint64_t TermId() = 0; - /// Returns the status of the transaction which began its replication in + /// Returns the replication status of LogEntry which began its replication in /// a given term ID and was emplaced in the raft log at the given index. /// - /// Transaction status can be one of the following: - /// 1) REPLICATED -- transaction was successfully replicated accross + /// Replication status can be one of the following + /// 1) REPLICATED -- LogEntry was successfully replicated across /// the Raft cluster - /// 2) WAITING -- transaction was successfully emplaced in the Raft + /// 2) WAITING -- LogEntry was successfully emplaced in the Raft /// log and is currently being replicated. - /// 3) ABORTED -- transaction was aborted. - /// 4) INVALID -- the request for the transaction was invalid, most + /// 3) ABORTED -- LogEntry will not be replicated. + /// 4) INVALID -- the request for the LogEntry was invalid, most /// likely either term_id or log_index were out of range. - virtual TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) = 0; + virtual ReplicationStatus GetReplicationStatus(uint64_t term_id, + uint64_t log_index) = 0; + + /// Checks if the LogEntry with the give term id and log index can safely be + /// committed in local storage. + /// + /// @param term_id term when the LogEntry was created + /// @param log_index index of the LogEntry in the Raft log + /// + /// @return bool True if the transaction is safe to commit, false otherwise. + /// + /// @throws ReplicationTimeoutException + /// @throws RaftShutdownException + /// @throws InvalidReplicationLogLookup + virtual bool SafeToCommit(uint64_t term_id, uint64_t log_index) = 0; virtual std::mutex &WithLock() = 0; diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index cdb7c667f..141f83ec4 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -41,7 +41,6 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, : config_(config), coordination_(coordination), db_(db), - rlog_(std::make_unique()), mode_(Mode::FOLLOWER), server_id_(server_id), durability_dir_(fs::path(durability_dir)), @@ -432,22 +431,13 @@ void RaftServer::PersistSnapshotMetadata( snapshot_metadata_.emplace(snapshot_metadata); } -std::pair, std::optional> -RaftServer::AppendToLog(const tx::TransactionId &tx_id, - const std::vector &deltas) { +std::optional RaftServer::Emplace( + const std::vector &deltas) { std::unique_lock lock(lock_); - DCHECK(mode_ == Mode::LEADER) - << "`AppendToLog` should only be called in LEADER mode"; - if (deltas.size() == 2) { - DCHECK(deltas[0].type == database::StateDelta::Type::TRANSACTION_BEGIN && - deltas[1].type == database::StateDelta::Type::TRANSACTION_COMMIT) - << "Transactions with two state deltas must be reads (start with BEGIN " - "and end with COMMIT)"; - rlog_->set_replicated(tx_id); - return {std::nullopt, std::nullopt}; + if (mode_ != Mode::LEADER) { + return std::nullopt; } - rlog_->set_active(tx_id); LogEntry new_entry(current_term_, deltas); log_[log_size_] = new_entry; @@ -460,136 +450,55 @@ RaftServer::AppendToLog(const tx::TransactionId &tx_id, for (auto &peer_replication : next_replication_) peer_replication = now; // From this point on, we can say that the replication of a LogEntry started. - replication_timeout_.Insert(tx_id); + replication_timeout_.Insert(new_entry.term, log_size_ - 1); state_changed_.notify_all(); - return std::make_pair(current_term_.load(), log_size_ - 1); -} - -DeltaStatus RaftServer::Emplace(const database::StateDelta &delta) { - return log_entry_buffer_.Emplace(delta); -} - -bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { - switch (mode_) { - case Mode::CANDIDATE: - // When Memgraph first starts, the Raft is initialized in candidate - // mode and we try to perform recovery. Since everything for recovery - // needs to be able to commit, we return true. - return true; - case Mode::FOLLOWER: - // When in follower mode, we will only try to apply a Raft Log when we - // receive a commit index greater or equal from the Log index from the - // leader. At that moment we don't have to check the replication log - // because the leader won't commit the Log locally if it's not replicated - // on the majority of the peers in the cluster. This is why we can short - // circuit the check to always return true if in follower mode. - return true; - case Mode::LEADER: - // We are taking copies of the rlog_ status here so we avoid the case - // where the call to `set_replicated` shadows the `active` bit that is - // checked after the `replicated` bit. It is possible that both `active` - // and `replicated` are `true` but since we check `replicated` first this - // shouldn't be a problem. - bool active = rlog_->is_active(tx_id); - bool replicated = rlog_->is_replicated(tx_id); - - // If we are shutting down, but we know that the Raft Log replicated - // successfully, we return true. This will eventually commit since we - // replicate NoOp on leader election. - if (replicated) return true; - - // Only if the transaction isn't replicated, thrown an exception to inform - // the client. - if (exiting_) throw RaftShutdownException(); - - if (active) { - if (replication_timeout_.CheckTimeout(tx_id)) { - throw ReplicationTimeoutException(); - } - return false; - } - - // The only possibility left is that our ReplicationLog doesn't contain - // information about that tx. - throw InvalidReplicationLogLookup(); - break; - } -} - -void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) { - rlog_->garbage_collect_older(tx_id); + return {{new_entry.term, log_size_ - 1}}; } bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; } uint64_t RaftServer::TermId() { return current_term_; } -TxStatus RaftServer::TransactionStatus(uint64_t term_id, uint64_t log_index) { +ReplicationStatus RaftServer::GetReplicationStatus(uint64_t term_id, + uint64_t log_index) { std::unique_lock lock(lock_); if (term_id > current_term_ || log_index >= log_size_) - return TxStatus::INVALID; + return ReplicationStatus::INVALID; auto log_entry = GetLogEntry(log_index); // This is correct because the leader can only append to the log and no two // workers can be leaders in the same term. - if (log_entry.term != term_id) return TxStatus::ABORTED; + if (log_entry.term != term_id) return ReplicationStatus::ABORTED; - if (last_applied_ < log_index) return TxStatus::WAITING; - return TxStatus::REPLICATED; + if (last_applied_ < log_index) return ReplicationStatus::WAITING; + return ReplicationStatus::REPLICATED; } -RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) - : raft_server_(raft_server) { - CHECK(raft_server_) << "RaftServer can't be nullptr"; -} +bool RaftServer::SafeToCommit(uint64_t term_id, uint64_t log_index) { + auto replication_status = GetReplicationStatus(term_id, log_index); -void RaftServer::LogEntryBuffer::Enable() { - std::lock_guard guard(buffer_lock_); - enabled_ = true; -} + // If we are shutting down, but we know that the Raft Log replicated + // successfully, we return true. This will eventually commit since we + // replicate NoOp on leader election. + if (replication_status == ReplicationStatus::REPLICATED) return true; -void RaftServer::LogEntryBuffer::Disable() { - std::lock_guard guard(buffer_lock_); - enabled_ = false; - // Clear all existing logs from buffers. - logs_.clear(); -} + // Only if the log entry isn't replicated, throw an exception to inform + // the client. + if (exiting_) throw RaftShutdownException(); -DeltaStatus RaftServer::LogEntryBuffer::Emplace( - const database::StateDelta &delta) { - std::unique_lock lock(buffer_lock_); - if (!enabled_) return {false, std::nullopt, std::nullopt}; - - tx::TransactionId tx_id = delta.transaction_id; - - std::optional term_id = std::nullopt; - std::optional log_index = std::nullopt; - - if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) { - auto it = logs_.find(tx_id); - CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; - - std::vector log(std::move(it->second)); - log.emplace_back(std::move(delta)); - logs_.erase(it); - - lock.unlock(); - auto metadata = raft_server_->AppendToLog(tx_id, log); - term_id = metadata.first; - log_index = metadata.second; - - } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { - auto it = logs_.find(tx_id); - // Sometimes it's possible that we're aborting a transaction that was meant - // to be commited and thus we don't have the StateDeltas anymore. - if (it != logs_.end()) logs_.erase(it); - } else { - logs_[tx_id].emplace_back(std::move(delta)); + if (replication_status == ReplicationStatus::WAITING) { + if (replication_timeout_.CheckTimeout(term_id, log_index)) { + throw ReplicationTimeoutException(); + } + return false; } - return {true, term_id, log_index}; + // TODO(ipaljak): Fix the old naming. + // The only possibility left is that our ReplicationLog doesn't contain + // information about that tx. + throw InvalidReplicationLogLookup(); } void RaftServer::RecoverPersistentData() { @@ -624,7 +533,6 @@ void RaftServer::Transition(const Mode &new_mode) { bool reset = mode_ == Mode::LEADER; issue_hb_ = false; mode_ = Mode::FOLLOWER; - log_entry_buffer_.Disable(); if (reset) { VLOG(40) << "Resetting internal state"; @@ -632,7 +540,6 @@ void RaftServer::Transition(const Mode &new_mode) { next_election_ = TimePoint::max(); db_->Reset(); - ResetReplicationLog(); replication_timeout_.Clear(); // Re-apply raft log. @@ -718,7 +625,6 @@ void RaftServer::Transition(const Mode &new_mode) { last_applied_ = log_size_ - 1; mode_ = Mode::LEADER; - log_entry_buffer_.Enable(); leader_changed_.notify_all(); break; @@ -763,12 +669,10 @@ void RaftServer::AdvanceCommitIndex() { VLOG(40) << "Begin applying commited transactions"; for (int i = commit_index_ + 1; i <= new_commit_index; ++i) { - auto deltas = GetLogEntry(i).deltas; - DCHECK(deltas.size() > 2) - << "Log entry should consist of at least two state deltas."; - auto tx_id = deltas[0].transaction_id; - rlog_->set_replicated(tx_id); - replication_timeout_.Remove(tx_id); + auto log_entry = GetLogEntry(i); + DCHECK(log_entry.deltas.size() > 2) + << "Log entry should consist of at least three state deltas."; + replication_timeout_.Remove(log_entry.term, i); } commit_index_ = new_commit_index; @@ -1331,11 +1235,6 @@ LogEntry RaftServer::DeserializeLogEntry( return deserialized; } -void RaftServer::ResetReplicationLog() { - rlog_ = nullptr; - rlog_ = std::make_unique(); -} - void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) { durability::RecoveryData recovery_data; bool recovery = durability::RecoverSnapshot( @@ -1346,8 +1245,9 @@ void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) { } void RaftServer::NoOpCreate() { + // TODO(ipaljak): Review this after implementing RaftDelta object. auto dba = db_->Access(); - Emplace(database::StateDelta::NoOp(dba.transaction_id())); + db_->sd_buffer()->Emplace(database::StateDelta::NoOp(dba.transaction_id())); try { dba.Commit(); } catch (const RaftException &) { @@ -1361,6 +1261,8 @@ void RaftServer::ApplyStateDeltas( std::optional dba; for (auto &delta : deltas) { switch (delta.type) { + case database::StateDelta::Type::NO_OP: + break; case database::StateDelta::Type::TRANSACTION_BEGIN: CHECK(!dba) << "Double transaction start"; dba = db_->Access(); diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index d97fe2457..7f3766822 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -92,35 +92,16 @@ class RaftServer final : public RaftInterface { /// Persists snapshot metadata. void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata); - /// Append to the log a list of batched state deltas that are ready to be - /// replicated. + /// Emplace a new LogEntry in the raft log and start its replication. This + /// entry is created from a given batched set of StateDelta objects. /// - /// @returns metadata about the emplaced log entry. More precisely, an - /// ordered pair (term_id, log_id) of the newly emplaced - /// log entry. If the entry was not emplaced, the method - /// returns std::nullopt (e.g. read-only transactions). - std::pair, std::optional> AppendToLog( - const tx::TransactionId &tx_id, - const std::vector &deltas); - - /// Emplace a single StateDelta to the corresponding batch. If the StateDelta - /// marks the transaction end, it will replicate the log accorss the cluster. + /// It is possible that the entry was not successfully emplaced. In that case, + /// the method returns std::nullopt and the caller is responsible for handling + /// situation correctly (e.g. aborting the corresponding transaction). /// - /// @returns DeltaStatus object as a result. - DeltaStatus Emplace(const database::StateDelta &delta) override; - - /// Checks if the transaction with the given transaction id can safely be - /// Returns the current state of the replication known by this machine. - - /// Checks if the transaction with the given transaction id can safely be - /// committed in local storage. - /// - /// @param tx_id Transaction id which needs to be checked. - /// @return bool True if the transaction is safe to commit, false otherwise. - /// @throws ReplicationTimeoutException - /// @throws RaftShutdownException - /// @throws InvalidReplicationLogLookup - bool SafeToCommit(const tx::TransactionId &tx_id) override; + /// @returns an optional LogEntryStatus object as result. + std::optional Emplace( + const std::vector &deltas) override; /// Returns true if the current servers mode is LEADER. False otherwise. bool IsLeader() override; @@ -128,51 +109,34 @@ class RaftServer final : public RaftInterface { /// Returns the term ID of the current leader. uint64_t TermId() override; - /// Returns the status of the transaction which began its replication in + /// Returns the replication status of LogEntry which began its replication in /// a given term ID and was emplaced in the raft log at the given index. - TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) override; + /// + /// Replication status can be one of the following + /// 1) REPLICATED -- LogEntry was successfully replicated across + /// the Raft cluster + /// 2) WAITING -- LogEntry was successfully emplaced in the Raft + /// log and is currently being replicated. + /// 3) ABORTED -- LogEntry will not be replicated. + /// 4) INVALID -- the request for the LogEntry was invalid, most + /// likely either term_id or log_index were out of range. + ReplicationStatus GetReplicationStatus(uint64_t term_id, + uint64_t log_index) override; - void GarbageCollectReplicationLog(const tx::TransactionId &tx_id); + /// Checks if the LogEntry with the give term id and log index can safely be + /// committed in local storage. + /// + /// @param term_id term when the LogEntry was created + /// @param log_index index of the LogEntry in the Raft log + /// + /// @return bool True if the transaction is safe to commit, false otherwise. + /// + /// @throws ReplicationTimeoutException + /// @throws RaftShutdownException + /// @throws InvalidReplicationLogLookup + bool SafeToCommit(uint64_t term_id, uint64_t log_index) override; private: - /// Buffers incomplete Raft logs. - /// - /// A Raft log is considered to be complete if it ends with a StateDelta - /// that represents transaction commit. - /// LogEntryBuffer will be used instead of WriteAheadLog. We don't need to - /// persist logs until we receive a majority vote from the Raft cluster, and - /// apply the to our local state machine(storage). - class LogEntryBuffer final { - public: - LogEntryBuffer() = delete; - - explicit LogEntryBuffer(RaftServer *raft_server); - - void Enable(); - - /// Disable all future insertions in the buffer. - /// - /// Note: this will also clear all existing logs from buffers. - void Disable(); - - /// Insert a new StateDelta in logs. - /// - /// If the StateDelta type is `TRANSACTION_COMMIT` it will start - /// replicating, and if the type is `TRANSACTION_ABORT` it will delete the - /// log from buffer. - /// - /// @returns DeltaStatus object as a result. - DeltaStatus Emplace(const database::StateDelta &delta); - - private: - bool enabled_{false}; - mutable std::mutex buffer_lock_; - std::unordered_map> - logs_; - - RaftServer *raft_server_{nullptr}; - }; - mutable std::mutex lock_; ///< Guards all internal state. mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal. mutable std::mutex heartbeat_lock_; ///< Guards HB issuing @@ -184,7 +148,6 @@ class RaftServer final : public RaftInterface { Config config_; ///< Raft config. Coordination *coordination_{nullptr}; ///< Cluster coordination. database::GraphDb *db_{nullptr}; - std::unique_ptr rlog_{nullptr}; std::atomic mode_; ///< Server's current mode. uint16_t server_id_; ///< ID of the current server. @@ -198,13 +161,6 @@ class RaftServer final : public RaftInterface { std::atomic issue_hb_; ///< Flag which signalizes if the current server ///< should send HBs to the rest of the cluster. - /// Raft log entry buffer. - /// - /// LogEntryBuffer buffers Raft logs until a log is complete and ready for - /// replication. This doesn't have to persist, if something fails before a - /// log is ready for replication it will be discarded anyway. - LogEntryBuffer log_entry_buffer_{this}; - std::vector peer_threads_; ///< One thread per peer which ///< handles outgoing RPCs. @@ -450,9 +406,6 @@ class RaftServer final : public RaftInterface { /// Deserialized Raft log entry from `std::string` LogEntry DeserializeLogEntry(const std::string &serialized_log_entry); - /// Resets the replication log used to indicate the replication status. - void ResetReplicationLog(); - /// Recovers the given snapshot if it exists in the durability directory. void RecoverSnapshot(const std::string &snapshot_filename); diff --git a/src/raft/replication_timeout_map.hpp b/src/raft/replication_timeout_map.hpp index 341ed28f1..447c66d77 100644 --- a/src/raft/replication_timeout_map.hpp +++ b/src/raft/replication_timeout_map.hpp @@ -2,10 +2,8 @@ #pragma once #include +#include #include -#include - -#include "transactions/type.hpp" namespace raft { @@ -34,23 +32,23 @@ class ReplicationTimeoutMap final { } /// Remove a single entry from the map. - void Remove(const tx::TransactionId &tx_id) { + void Remove(const uint64_t term_id, const uint64_t log_index) { std::lock_guard guard(lock_); - timeout_.erase(tx_id); + timeout_.erase({term_id, log_index}); } /// Inserts and entry in the map by setting a point in time until it needs to /// replicated. - void Insert(const tx::TransactionId &tx_id) { + void Insert(const uint64_t term_id, const uint64_t log_index) { std::lock_guard guard(lock_); - timeout_.emplace(tx_id, replication_timeout_ + Clock::now()); + timeout_[{term_id, log_index}] = replication_timeout_ + Clock::now(); } /// Checks if the given entry has timed out. /// @returns bool True if it exceeded timeout, false otherwise. - bool CheckTimeout(const tx::TransactionId &tx_id) { + bool CheckTimeout(const uint64_t term_id, const uint64_t log_index) { std::lock_guard guard(lock_); - auto found = timeout_.find(tx_id); + auto found = timeout_.find({term_id, log_index}); // If we didn't set the timeout yet, or we already deleted it, we didn't // time out. if (found == timeout_.end()) return false; @@ -65,7 +63,9 @@ class ReplicationTimeoutMap final { std::chrono::milliseconds replication_timeout_; mutable std::mutex lock_; - std::unordered_map timeout_; + // TODO(ipaljak): Consider using unordered_map if we encounter any performance + // issues. + std::map, TimePoint> timeout_; }; } // namespace raft diff --git a/src/storage/single_node_ha/record_accessor.cpp b/src/storage/single_node_ha/record_accessor.cpp index 07e6c81b5..010983e68 100644 --- a/src/storage/single_node_ha/record_accessor.cpp +++ b/src/storage/single_node_ha/record_accessor.cpp @@ -28,7 +28,7 @@ void RecordAccessor::PropsSet(storage::Property key, auto previous_value = PropsAt(key); update().properties_.set(key, value); dba.UpdateOnAddProperty(key, previous_value, value, *this, &update()); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); } template <> @@ -39,7 +39,7 @@ void RecordAccessor::PropsSet(storage::Property key, dba.PropertyName(key), value); update().properties_.set(key, value); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); } template <> @@ -51,7 +51,7 @@ void RecordAccessor::PropsErase(storage::Property key) { auto previous_value = PropsAt(key); update().properties_.set(key, PropertyValue()); dba.UpdateOnRemoveProperty(key, previous_value, *this, &update()); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); } template <> @@ -61,7 +61,7 @@ void RecordAccessor::PropsErase(storage::Property key) { StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue()); update().properties_.set(key, PropertyValue()); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); } template diff --git a/src/storage/single_node_ha/state_delta_buffer.hpp b/src/storage/single_node_ha/state_delta_buffer.hpp new file mode 100644 index 000000000..abc747839 --- /dev/null +++ b/src/storage/single_node_ha/state_delta_buffer.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include "durability/single_node_ha/state_delta.hpp" + +namespace storage { + +class StateDeltaBuffer final { + public: + /// Inserts a new StateDelta in buffer. + void Emplace(const database::StateDelta &delta) { + tx::TransactionId tx_id = delta.transaction_id; + std::vector *curr_buffer; + { + // We only need the lock when we're inserting a new key into the buffer. + std::lock_guard lock(buffer_lock_); + curr_buffer = &buffer_[tx_id]; + } + curr_buffer->emplace_back(delta); + } + + /// Retrieves all buffered StateDeltas for a given transaction id. + /// If there are no such StateDeltas, the return vector is empty. + std::vector GetDeltas( + const tx::TransactionId &tx_id) { + std::vector *curr_buffer; + { + std::lock_guard lock(buffer_lock_); + auto it = buffer_.find(tx_id); + if (it == buffer_.end()) return {}; + curr_buffer = &it->second; + } + return *curr_buffer; + } + + /// Deletes all buffered StateDeltas for a given transaction id. + void Erase(const tx::TransactionId &tx_id) { + std::lock_guard lock(buffer_lock_); + buffer_.erase(tx_id); + } + + private: + mutable std::mutex buffer_lock_; + std::unordered_map> + buffer_; +}; + +} // namespace storage diff --git a/src/storage/single_node_ha/storage_gc.hpp b/src/storage/single_node_ha/storage_gc.hpp index 8a8b40b5b..b87a5295e 100644 --- a/src/storage/single_node_ha/storage_gc.hpp +++ b/src/storage/single_node_ha/storage_gc.hpp @@ -81,7 +81,6 @@ class StorageGc { auto safe_to_delete = GetClogSafeTransaction(oldest_active); if (safe_to_delete) { tx_engine_.GarbageCollectCommitLog(*safe_to_delete); - raft_server_->GarbageCollectReplicationLog(*safe_to_delete); } } diff --git a/src/storage/single_node_ha/vertex_accessor.cpp b/src/storage/single_node_ha/vertex_accessor.cpp index 6d304d43e..ad1c5bc33 100644 --- a/src/storage/single_node_ha/vertex_accessor.cpp +++ b/src/storage/single_node_ha/vertex_accessor.cpp @@ -24,7 +24,7 @@ void VertexAccessor::add_label(storage::Label label) { // not a duplicate label, add it if (!utils::Contains(vertex.labels_, label)) { vertex.labels_.emplace_back(label); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); dba.UpdateOnAddLabel(label, *this, &vertex); } } @@ -39,7 +39,7 @@ void VertexAccessor::remove_label(storage::Label label) { auto found = std::find(labels.begin(), labels.end(), delta.label); std::swap(*found, labels.back()); labels.pop_back(); - dba.raft()->Emplace(delta); + dba.sd_buffer()->Emplace(delta); dba.UpdateOnRemoveLabel(label, *this); } } diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index 82e4b0913..6f9565f56 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -10,9 +10,13 @@ namespace tx { -Engine::Engine(raft::RaftInterface *raft) - : clog_(std::make_unique()), raft_(raft) { +Engine::Engine(raft::RaftInterface *raft, + storage::StateDeltaBuffer *delta_buffer) + : clog_(std::make_unique()), + raft_(raft), + delta_buffer_(delta_buffer) { CHECK(raft) << "Raft can't be nullptr in HA"; + CHECK(delta_buffer) << "State delta buffer can't be nullptr in HA"; } Transaction *Engine::Begin() { @@ -30,7 +34,7 @@ Transaction *Engine::BeginBlocking(std::optional parent_tx) { { std::lock_guard guard(lock_); if (!accepting_transactions_.load() || !replication_errors_.empty()) - throw TransactionEngineError( + throw TransactionEngineError( "The transaction engine currently isn't accepting new transactions."); // Block the engine from accepting new transactions. @@ -79,60 +83,97 @@ CommandId Engine::UpdateCommand(TransactionId id) { void Engine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Committing transaction " << t.id_; - auto delta_status = raft_->Emplace(database::StateDelta::TxCommit(t.id_)); - if (delta_status.emplaced) { - // It is important to note the following situation. If our cluster ends up - // with a network partition where the current leader can't communicate with - // the majority of the peers, and the client is still sending queries to it, - // all of the transaction will end up waiting here until the network - // partition is resolved. The problem that can occur afterwards is bad. - // When the machine transitions from leader to follower mode, - // `ReplicationInfo` method will start returning `is_replicated=true`. This - // might lead to a problem where we suddenly want to alter the state of the - // transaction engine that isn't valid anymore, because the current machine - // isn't the leader anymore. This is all handled in the `Transition` method - // where once the transition from leader to follower occurs, the mode will - // be set to follower first, then the `Reset` method on the transaction - // engine will wait for all transactions to finish, and even though we - // change the transaction engine state here, the engine will perform a - // `Reset` and start recovering from zero, and the invalid changes won't - // matter. + delta_buffer_->Emplace(database::StateDelta::TxCommit(t.id_)); + auto deltas = delta_buffer_->GetDeltas(t.id_); - // Wait for Raft to receive confirmation from the majority of followers. - while (true) { - try { - if (raft_->SafeToCommit(t.id_)) break; - } catch (const raft::ReplicationTimeoutException &e) { - std::lock_guard guard(lock_); - if (replication_errors_.insert(t.id_).second) { - LOG(WARNING) << e.what(); - } - } - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - - std::unique_lock raft_lock(raft_->WithLock(), std::defer_lock); - // We need to acquire the Raft lock so we don't end up racing with a Raft - // thread that can reset the engine state. If we can't acquire the lock, and - // we end up with reseting the engine, we throw - // UnexpectedLeaderChangeException. - while (true) { - if (raft_lock.try_lock()) { - break; - } - // This is the case when we've lost our leader status due to another peer - // requesting election. - if (reset_active_.load()) throw raft::UnexpectedLeaderChangeException(); - // This is the case when we're shutting down and we're no longer a valid - // leader. `SafeToCommit` will throw `RaftShutdownException` if the - // transaction wasn't replicated and the client will receive a negative - // response. Otherwise, we'll end up here, and since the transaction was - // replciated, we need to inform the client that the query succeeded. - if (!raft_->IsLeader()) break; - std::this_thread::sleep_for(std::chrono::microseconds(100)); + // If we have only two state deltas in our transaction, that means we are + // dealing with a read-only transaction which does not need to be replicated + // throughout the cluster, so we simply commit it in our storage. + // + // Also, when the current server is not in the leader mode, the following + // holds: + // + // 1) In CANDIDATE mode we need to be able to commit because Raft is + // initialzed in that mode and needs to perform recovery. + // + // 2) In FOLLOWER mode, Raft will only try to apply state deltas from logs + // that are behind the current commit index and are therefore safe to + // apply. + if (deltas.size() == 2 || !raft_->IsLeader()) { + delta_buffer_->Erase(t.id_); + std::lock_guard guard(lock_); + clog_->set_committed(t.id_); + active_.remove(t.id_); + store_.erase(store_.find(t.id_)); + if (t.blocking()) { + accepting_transactions_.store(true); } + return; } + auto log_entry_status = raft_->Emplace(deltas); + + // Log Entry was not successfully emplaced and the transaction should be + // aborted + if (!log_entry_status) { + Abort(t); + return; + } + + // It is important to note the following situation. If our cluster ends up + // with a network partition where the current leader can't communicate with + // the majority of the peers, and the client is still sending queries to it, + // all of the transaction will end up waiting here until the network + // partition is resolved. The problem that can occur afterwards is bad. + // When the machine transitions from leader to follower mode, + // `ReplicationInfo` method will start returning `is_replicated=true`. This + // might lead to a problem where we suddenly want to alter the state of the + // transaction engine that isn't valid anymore, because the current machine + // isn't the leader anymore. This is all handled in the `Transition` method + // where once the transition from leader to follower occurs, the mode will + // be set to follower first, then the `Reset` method on the transaction + // engine will wait for all transactions to finish, and even though we + // change the transaction engine state here, the engine will perform a + // `Reset` and start recovering from zero, and the invalid changes won't + // matter. + + // Wait for Raft to receive confirmation from the majority of followers. + while (true) { + try { + if (raft_->SafeToCommit(log_entry_status->term_id, + log_entry_status->log_index)) + break; + } catch (const raft::ReplicationTimeoutException &e) { + std::lock_guard guard(lock_); + if (replication_errors_.insert(t.id_).second) { + LOG(WARNING) << e.what(); + } + } + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + std::unique_lock raft_lock(raft_->WithLock(), std::defer_lock); + // We need to acquire the Raft lock so we don't end up racing with a Raft + // thread that can reset the engine state. If we can't acquire the lock, and + // we end up with reseting the engine, we throw + // UnexpectedLeaderChangeException. + while (true) { + if (raft_lock.try_lock()) { + break; + } + // This is the case when we've lost our leader status due to another peer + // requesting election. + if (reset_active_.load()) throw raft::UnexpectedLeaderChangeException(); + // This is the case when we're shutting down and we're no longer a valid + // leader. `SafeToCommit` will throw `RaftShutdownException` if the + // transaction wasn't replicated and the client will receive a negative + // response. Otherwise, we'll end up here, and since the transaction was + // replciated, we need to inform the client that the query succeeded. + if (!raft_->IsLeader()) break; + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + delta_buffer_->Erase(t.id_); std::lock_guard guard(lock_); replication_errors_.erase(t.id_); clog_->set_committed(t.id_); @@ -145,7 +186,7 @@ void Engine::Commit(const Transaction &t) { void Engine::Abort(const Transaction &t) { VLOG(11) << "[Tx] Aborting transaction " << t.id_; - raft_->Emplace(database::StateDelta::TxAbort(t.id_)); + delta_buffer_->Erase(t.id_); std::lock_guard guard(lock_); clog_->set_aborted(t.id_); active_.remove(t.id_); @@ -257,7 +298,7 @@ Transaction *Engine::BeginTransaction(bool blocking) { Transaction *t = new Transaction(id, active_, *this, blocking); active_.insert(id); store_.emplace(id, t); - raft_->Emplace(database::StateDelta::TxBegin(id)); + delta_buffer_->Emplace(database::StateDelta::TxBegin(id)); return t; } diff --git a/src/transactions/single_node_ha/engine.hpp b/src/transactions/single_node_ha/engine.hpp index 467e2eae1..ad5056e0c 100644 --- a/src/transactions/single_node_ha/engine.hpp +++ b/src/transactions/single_node_ha/engine.hpp @@ -8,6 +8,7 @@ #include #include "raft/raft_interface.hpp" +#include "storage/single_node_ha/state_delta_buffer.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" #include "utils/spin_lock.hpp" @@ -24,7 +25,7 @@ class TransactionEngineError : public utils::BasicException { /// information needed for raft followers when replicating logs. class Engine final { public: - explicit Engine(raft::RaftInterface *raft); + Engine(raft::RaftInterface *raft, storage::StateDeltaBuffer *delta_buffer); Engine(const Engine &) = delete; Engine(Engine &&) = delete; @@ -72,6 +73,7 @@ class Engine final { Snapshot active_; mutable utils::SpinLock lock_; raft::RaftInterface *raft_{nullptr}; + storage::StateDeltaBuffer *delta_buffer_{nullptr}; std::atomic accepting_transactions_{true}; std::atomic reset_active_{false}; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index c589e8788..199aad11f 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -202,9 +202,6 @@ target_link_libraries(${test_prefix}stripped mg-single-node kvstore_dummy_lib) add_unit_test(transaction_engine_single_node.cpp) target_link_libraries(${test_prefix}transaction_engine_single_node mg-single-node kvstore_dummy_lib) -add_unit_test(transaction_engine_single_node_ha.cpp) -target_link_libraries(${test_prefix}transaction_engine_single_node_ha mg-single-node-ha kvstore_dummy_lib) - add_unit_test(typed_value.cpp) target_link_libraries(${test_prefix}typed_value mg-single-node kvstore_dummy_lib) diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp deleted file mode 100644 index d14c503e5..000000000 --- a/tests/unit/transaction_engine_single_node_ha.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "gtest/gtest.h" - -#include -#include - -#include "durability/single_node_ha/state_delta.hpp" -#include "raft/raft_interface.hpp" -#include "transactions/single_node_ha/engine.hpp" -#include "transactions/transaction.hpp" - -using namespace tx; - -class RaftMock final : public raft::RaftInterface { - public: - raft::DeltaStatus Emplace(const database::StateDelta &delta) override { - log_[delta.transaction_id].emplace_back(std::move(delta)); - return {true, std::nullopt}; - } - - bool SafeToCommit(const tx::TransactionId &) override { - return true; - } - - bool IsLeader() override { return true; } - - uint64_t TermId() override { return 1; } - - raft::TxStatus TransactionStatus(uint64_t term_id, - uint64_t log_index) override { - return raft::TxStatus::REPLICATED; - } - - std::vector GetLogForTx( - const tx::TransactionId &tx_id) { - return log_[tx_id]; - } - - std::mutex &WithLock() override { return lock_; } - - private: - std::unordered_map> log_; - std::mutex lock_; -}; - -TEST(Engine, Reset) { - RaftMock raft; - Engine engine{&raft}; - - auto t0 = engine.Begin(); - EXPECT_EQ(t0->id_, 1); - engine.Commit(*t0); - - engine.Reset(); - - auto t1 = engine.Begin(); - EXPECT_EQ(t1->id_, 1); - engine.Commit(*t1); -} - -TEST(Engine, TxStateDelta) { - RaftMock raft; - Engine engine{&raft}; - - auto t0 = engine.Begin(); - tx::TransactionId tx_id = t0->id_; - engine.Commit(*t0); - - auto t0_log = raft.GetLogForTx(tx_id); - EXPECT_EQ(t0_log.size(), 2); - - using Type = enum database::StateDelta::Type; - EXPECT_EQ(t0_log[0].type, Type::TRANSACTION_BEGIN); - EXPECT_EQ(t0_log[0].transaction_id, tx_id); - EXPECT_EQ(t0_log[1].type, Type::TRANSACTION_COMMIT); - EXPECT_EQ(t0_log[1].transaction_id, tx_id); -}