From 4acad5795b6253341acf9f74f1ad750127dee05e Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Tue, 16 Jul 2019 10:50:53 +0200 Subject: [PATCH] Expose the status of transaction within Raft Summary: For proper client interaction, we need to expose the (term_id, log_index) pair for the transaction that's about to be replicated and we need to be able to retrieve the status of a transaction defined by that pair. Transaction status can be one of the following: 1) REPLICATED (self-explanatory) 2) WAITING (waiting for replication) 3) ABORTED (self-explanatory) 4) INVALID (received request with either invalid term_id or invalid log_index) Reviewers: mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2201 --- src/raft/raft_interface.hpp | 49 ++++++++++++++++++- src/raft/raft_server.cpp | 40 ++++++++++++--- src/raft/raft_server.hpp | 24 ++++++--- src/transactions/single_node_ha/engine.cpp | 3 +- .../transaction_engine_single_node_ha.cpp | 9 +++- 5 files changed, 105 insertions(+), 20 deletions(-) diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp index 801e2d66c..d80506a49 100644 --- a/src/raft/raft_interface.hpp +++ b/src/raft/raft_interface.hpp @@ -9,14 +9,46 @@ namespace raft { +enum class TxStatus { REPLICATED, WAITING, ABORTED, INVALID }; + +inline std::string TxStatusToString(const TxStatus &tx_status) { + switch (tx_status) { + case TxStatus::REPLICATED: + return "REPLICATED"; + case TxStatus::WAITING: + return "WAITING"; + case TxStatus::ABORTED: + return "ABORTED"; + case TxStatus::INVALID: + return "INVALID"; + } +} + +/// Structure which describes the StateDelta status after the execution of +/// RaftServer's Emplace method. +/// +/// It consists of two fields: +/// 1) A boolean flag `emplaced` which signals whether the delta has +/// successfully been emplaced in the raft log buffer. 
+/// 2) Two optional unsigned 64-bit integers which denote the term +/// when the corresponding LogEntry was emplaced and its log_index in +/// the Raft log. These values are contained in the optional metadata only +/// if the emplaced StateDelta signifies the COMMIT of a non-read-only +/// transaction. +struct DeltaStatus { + bool emplaced; + std::optional term_id; + std::optional log_index; +}; + /// Exposes only functionality that other parts of Memgraph can interact with, /// emplacing a state delta into the appropriate Raft log entry. class RaftInterface { public: /// Add StateDelta to the appropriate Raft log entry. /// - /// @returns true if the Delta is emplaced, false otherwise. - virtual bool Emplace(const database::StateDelta &) = 0; + /// @returns DeltaStatus object as a result. + virtual DeltaStatus Emplace(const database::StateDelta &) = 0; /// Checks if the transaction with the given transaction id can safely be /// committed in local storage. @@ -28,6 +60,19 @@ class RaftInterface { /// Returns the term ID of the current leader. virtual uint64_t TermId() = 0; + /// Returns the status of the transaction which began its replication in + /// a given term ID and was emplaced in the raft log at the given index. + /// + /// Transaction status can be one of the following: + /// 1) REPLICATED -- transaction was successfully replicated across + /// the Raft cluster + /// 2) WAITING -- transaction was successfully emplaced in the Raft + /// log and is currently being replicated. + /// 3) ABORTED -- transaction was aborted. + /// 4) INVALID -- the request for the transaction was invalid, most + /// likely either term_id or log_index were out of range. 
+ virtual TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) = 0; + virtual std::mutex &WithLock() = 0; protected: diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 7973d51aa..02bc70143 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -429,8 +429,9 @@ void RaftServer::PersistSnapshotMetadata( snapshot_metadata_.emplace(snapshot_metadata); } -void RaftServer::AppendToLog(const tx::TransactionId &tx_id, - const std::vector &deltas) { +std::pair, std::optional> +RaftServer::AppendToLog(const tx::TransactionId &tx_id, + const std::vector &deltas) { std::unique_lock lock(lock_); DCHECK(mode_ == Mode::LEADER) << "`AppendToLog` should only be called in LEADER mode"; @@ -440,7 +441,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id, << "Transactions with two state deltas must be reads (start with BEGIN " "and end with COMMIT)"; rlog_->set_replicated(tx_id); - return; + return {std::nullopt, std::nullopt}; } rlog_->set_active(tx_id); @@ -459,9 +460,10 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id, replication_timeout_.Insert(tx_id); state_changed_.notify_all(); + return std::make_pair(current_term_.load(), log_size_ - 1); } -bool RaftServer::Emplace(const database::StateDelta &delta) { +DeltaStatus RaftServer::Emplace(const database::StateDelta &delta) { return log_entry_buffer_.Emplace(delta); } @@ -520,6 +522,21 @@ bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; } uint64_t RaftServer::TermId() { return current_term_; } +TxStatus RaftServer::TransactionStatus(uint64_t term_id, uint64_t log_index) { + std::unique_lock lock(lock_); + if (term_id > current_term_ || log_index >= log_size_) + return TxStatus::INVALID; + + auto log_entry = GetLogEntry(log_index); + + // This is correct because the leader can only append to the log and no two + // workers can be leaders in the same term. 
+ if (log_entry.term != term_id) return TxStatus::ABORTED; + + if (last_applied_ < log_index) return TxStatus::WAITING; + return TxStatus::REPLICATED; +} + RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) : raft_server_(raft_server) { CHECK(raft_server_) << "RaftServer can't be nullptr"; @@ -537,11 +554,16 @@ void RaftServer::LogEntryBuffer::Disable() { logs_.clear(); } -bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { +DeltaStatus RaftServer::LogEntryBuffer::Emplace( + const database::StateDelta &delta) { std::unique_lock lock(buffer_lock_); - if (!enabled_) return false; + if (!enabled_) return {false, std::nullopt, std::nullopt}; tx::TransactionId tx_id = delta.transaction_id; + + std::optional term_id = std::nullopt; + std::optional log_index = std::nullopt; + if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) { auto it = logs_.find(tx_id); CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; @@ -551,7 +573,9 @@ bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { logs_.erase(it); lock.unlock(); - raft_server_->AppendToLog(tx_id, log); + auto metadata = raft_server_->AppendToLog(tx_id, log); + term_id = metadata.first; + log_index = metadata.second; } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { auto it = logs_.find(tx_id); @@ -562,7 +586,7 @@ bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { logs_[tx_id].emplace_back(std::move(delta)); } - return true; + return {true, term_id, log_index}; } void RaftServer::RecoverPersistentData() { diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index c3e8538d1..d97fe2457 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -92,16 +92,22 @@ class RaftServer final : public RaftInterface { /// Persists snapshot metadata. 
void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata); - /// Append to the log a list of batched state deltasa that are ready to be + /// Append to the log a list of batched state deltas that are ready to be /// replicated. - void AppendToLog(const tx::TransactionId &tx_id, - const std::vector &deltas); + /// + /// @returns metadata about the emplaced log entry. More precisely, an + /// ordered pair (term_id, log_index) of the newly emplaced + /// log entry. If the entry was not emplaced, both + /// elements are std::nullopt (e.g. read-only transactions). + std::pair, std::optional> AppendToLog( + const tx::TransactionId &tx_id, + const std::vector &deltas); /// Emplace a single StateDelta to the corresponding batch. If the StateDelta /// marks the transaction end, it will replicate the log accorss the cluster. /// - /// @returns true if the Delta is emplaced, false otherwise. - bool Emplace(const database::StateDelta &delta) override; + /// @returns DeltaStatus object as a result. + DeltaStatus Emplace(const database::StateDelta &delta) override; /// Checks if the transaction with the given transaction id can safely be /// Returns the current state of the replication known by this machine. @@ -122,6 +128,10 @@ class RaftServer final : public RaftInterface { /// Returns the term ID of the current leader. uint64_t TermId() override; + /// Returns the status of the transaction which began its replication in + /// a given term ID and was emplaced in the raft log at the given index. + TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) override; + void GarbageCollectReplicationLog(const tx::TransactionId &tx_id); private: @@ -151,8 +161,8 @@ class RaftServer final : public RaftInterface { /// replicating, and if the type is `TRANSACTION_ABORT` it will delete the /// log from buffer. /// - /// @returns true if the Delta is emplaced, false otherwise. 
- bool Emplace(const database::StateDelta &delta); + /// @returns DeltaStatus object as a result. + DeltaStatus Emplace(const database::StateDelta &delta); private: bool enabled_{false}; diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index 9be9bd22d..82e4b0913 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -79,7 +79,8 @@ CommandId Engine::UpdateCommand(TransactionId id) { void Engine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Committing transaction " << t.id_; - if (raft_->Emplace(database::StateDelta::TxCommit(t.id_))) { + auto delta_status = raft_->Emplace(database::StateDelta::TxCommit(t.id_)); + if (delta_status.emplaced) { // It is important to note the following situation. If our cluster ends up // with a network partition where the current leader can't communicate with // the majority of the peers, and the client is still sending queries to it, diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp index ef3bf5aa6..d14c503e5 100644 --- a/tests/unit/transaction_engine_single_node_ha.cpp +++ b/tests/unit/transaction_engine_single_node_ha.cpp @@ -12,9 +12,9 @@ using namespace tx; class RaftMock final : public raft::RaftInterface { public: - bool Emplace(const database::StateDelta &delta) override { + raft::DeltaStatus Emplace(const database::StateDelta &delta) override { log_[delta.transaction_id].emplace_back(std::move(delta)); - return true; + return {true, std::nullopt}; } bool SafeToCommit(const tx::TransactionId &) override { @@ -25,6 +25,11 @@ class RaftMock final : public raft::RaftInterface { uint64_t TermId() override { return 1; } + raft::TxStatus TransactionStatus(uint64_t term_id, + uint64_t log_index) override { + return raft::TxStatus::REPLICATED; + } + std::vector GetLogForTx( const tx::TransactionId &tx_id) { return log_[tx_id];