diff --git a/src/query/transaction_engine.hpp b/src/query/transaction_engine.hpp
index 1be0d52b8..373ab68e7 100644
--- a/src/query/transaction_engine.hpp
+++ b/src/query/transaction_engine.hpp
@@ -40,7 +40,14 @@ class TransactionEngine final {
           "Transaction can't be committed because there was a previous "
           "error. Please invoke a rollback instead.");
     }
-    Commit();
+
+    try {
+      Commit();
+    } catch (const utils::BasicException &) {
+      AbortCommand();
+      throw;
+    }
+
     expect_rollback_ = false;
     in_explicit_transaction_ = false;
     return {};
@@ -96,6 +103,11 @@ class TransactionEngine final {
     }

     return summary;
+#ifdef MG_SINGLE_NODE_HA
+  } catch (const query::HintedAbortError &) {
+    AbortCommand();
+    throw utils::BasicException("Transaction was asked to abort.");
+#endif
   } catch (const utils::BasicException &) {
     AbortCommand();
     throw;
diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp
index 4dafa73f2..fafebae0e 100644
--- a/src/raft/exceptions.hpp
+++ b/src/raft/exceptions.hpp
@@ -89,4 +89,23 @@ class CantExecuteQueries : public RaftException {
             "leader.") {}
 };

+/// This exception is thrown when leader re-election takes place during
+/// transaction commit. We throw this exception to inform the client that
+/// the transaction failed.
+class UnexpectedLeaderChangeException : public RaftException {
+ public:
+  using RaftException::RaftException;
+  UnexpectedLeaderChangeException()
+      : RaftException(
+            "Leader change happened during transaction commit. Aborting.") {}
+};
+
+/// This exception is thrown when the machine is in the process of shutting
+/// down and the Raft API is being used.
+class RaftShutdownException : public RaftException {
+ public:
+  using RaftException::RaftException;
+  RaftShutdownException() : RaftException("Raft Server is shutting down.") {}
+};
+
 }  // namespace raft
diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp
index 1522536fa..801e2d66c 100644
--- a/src/raft/raft_interface.hpp
+++ b/src/raft/raft_interface.hpp
@@ -2,6 +2,8 @@

 #pragma once

+#include <mutex>
+
 #include "durability/single_node_ha/state_delta.hpp"
 #include "transactions/type.hpp"

@@ -12,11 +14,13 @@ namespace raft {
 class RaftInterface {
  public:
   /// Add StateDelta to the appropriate Raft log entry.
-  virtual void Emplace(const database::StateDelta &) = 0;
+  ///
+  /// @returns true if the Delta is emplaced, false otherwise.
+  virtual bool Emplace(const database::StateDelta &) = 0;

   /// Checks if the transaction with the given transaction id can safely be
   /// committed in local storage.
-  virtual bool SafeToCommit(const tx::TransactionId &tx_id) = 0;
+  virtual bool SafeToCommit(const tx::TransactionId &) = 0;

   /// Returns true if the current servers mode is LEADER. False otherwise.
   virtual bool IsLeader() = 0;
@@ -24,6 +28,8 @@ class RaftInterface {
   /// Returns the term ID of the current leader.
   virtual uint64_t TermId() = 0;

+  virtual std::mutex &WithLock() = 0;
+
  protected:
   ~RaftInterface() {}
 };
diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index 6cfb36888..209b20719 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -339,9 +339,9 @@ void RaftServer::Start() {
 }

 void RaftServer::Shutdown() {
+  exiting_ = true;
   {
     std::lock_guard<std::mutex> guard(lock_);
-    exiting_ = true;

     state_changed_.notify_all();
     election_change_.notify_all();
@@ -446,8 +446,8 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
   state_changed_.notify_all();
 }

-void RaftServer::Emplace(const database::StateDelta &delta) {
-  log_entry_buffer_.Emplace(delta);
+bool RaftServer::Emplace(const database::StateDelta &delta) {
+  return log_entry_buffer_.Emplace(delta);
 }

 bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
@@ -463,17 +463,25 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
       // leader. At that moment we don't have to check the replication log
       // because the leader won't commit the Log locally if it's not replicated
       // on the majority of the peers in the cluster. This is why we can short
-      // circut the check to always return true if in follower mode.
+      // circuit the check to always return true if in follower mode.
       return true;
     case Mode::LEADER:
+      // If we are shutting down, but we know that the Raft Log was replicated
+      // successfully, we return true. This will eventually commit since we
+      // replicate a NoOp on leader election.
+      if (rlog_->is_replicated(tx_id)) return true;
+
+      // Only if the transaction isn't replicated, throw an exception to
+      // inform the client.
+      if (exiting_) throw RaftShutdownException();
+
       if (rlog_->is_active(tx_id)) {
         if (replication_timeout_.CheckTimeout(tx_id)) {
           throw ReplicationTimeoutException();
         }
-
         return false;
       }
-      if (rlog_->is_replicated(tx_id)) return true;
+
       // The only possibility left is that our ReplicationLog doesn't contain
       // information about that tx.
       throw InvalidReplicationLogLookup();
@@ -506,9 +514,9 @@ void RaftServer::LogEntryBuffer::Disable() {
   logs_.clear();
 }

-void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
+bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
   std::unique_lock<std::mutex> lock(buffer_lock_);
-  if (!enabled_) return;
+  if (!enabled_) return false;

   tx::TransactionId tx_id = delta.transaction_id;
   if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
@@ -524,11 +532,14 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {

   } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
     auto it = logs_.find(tx_id);
-    CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
-    logs_.erase(it);
+    // Sometimes it's possible that we're aborting a transaction that was meant
+    // to be committed and thus we don't have the StateDeltas anymore.
+    if (it != logs_.end()) logs_.erase(it);
   } else {
     logs_[tx_id].emplace_back(std::move(delta));
   }
+
+  return true;
 }

 void RaftServer::RecoverPersistentData() {
@@ -763,7 +774,11 @@ void RaftServer::SendLogEntries(
     return;
   }

-  if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
+  // We can't early exit if the `exiting_` flag is true just yet. It is
+  // possible that the response we handle here carries the last confirmation
+  // that the logs have been replicated. We need to handle the response so
+  // the client doesn't retry a query that actually succeeded.
+  if (current_term_ != request_term || mode_ != Mode::LEADER) {
     return;
   }

@@ -802,6 +817,7 @@ void RaftServer::SendLogEntries(
     next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
   }

+  if (exiting_) return;
   state_changed_.notify_all();
 }

@@ -876,6 +892,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
 }

 void RaftServer::ElectionThreadMain() {
+  utils::ThreadSetName("ElectionThread");
   std::unique_lock<std::mutex> lock(lock_);
   while (!exiting_) {
     if (Clock::now() >= next_election_) {
@@ -1279,7 +1296,12 @@ void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
 void RaftServer::NoOpCreate() {
   auto dba = db_->Access();
   Emplace(database::StateDelta::NoOp(dba.transaction_id()));
-  dba.Commit();
+  try {
+    dba.Commit();
+  } catch (const RaftException &) {
+    // NoOp failure can be ignored.
+    return;
+  }
 }

 void RaftServer::ApplyStateDeltas(
@@ -1310,4 +1332,6 @@ void RaftServer::ApplyStateDeltas(
   CHECK(!dba) << "StateDeltas missing commit command";
 }

+std::mutex &RaftServer::WithLock() { return lock_; }
+
 }  // namespace raft
diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp
index 74aa0a5a6..ab2b6b39c 100644
--- a/src/raft/raft_server.hpp
+++ b/src/raft/raft_server.hpp
@@ -99,7 +99,9 @@ class RaftServer final : public RaftInterface {

   /// Emplace a single StateDelta to the corresponding batch. If the StateDelta
   /// marks the transaction end, it will replicate the log accorss the cluster.
-  void Emplace(const database::StateDelta &delta) override;
+  ///
+  /// @returns true if the Delta is emplaced, false otherwise.
+  bool Emplace(const database::StateDelta &delta) override;

   /// Checks if the transaction with the given transaction id can safely be
   /// committed in local storage.
@@ -107,6 +109,7 @@ class RaftServer final : public RaftInterface {
   /// @param tx_id Transaction id which needs to be checked.
   /// @return bool True if the transaction is safe to commit, false otherwise.
   /// @throws ReplicationTimeoutException
+  /// @throws RaftShutdownException
   /// @throws InvalidReplicationLogLookup
   bool SafeToCommit(const tx::TransactionId &tx_id) override;

@@ -144,7 +147,9 @@ class RaftServer final : public RaftInterface {
     /// If the StateDelta type is `TRANSACTION_COMMIT` it will start
     /// replicating, and if the type is `TRANSACTION_ABORT` it will delete the
     /// log from buffer.
-    void Emplace(const database::StateDelta &delta);
+    ///
+    /// @returns true if the Delta is emplaced, false otherwise.
+    bool Emplace(const database::StateDelta &delta);

    private:
     bool enabled_{false};
@@ -441,5 +446,7 @@ class RaftServer final : public RaftInterface {
   /// Applies the given batch of state deltas that are representing a transacton
   /// to the db.
   void ApplyStateDeltas(const std::vector<database::StateDelta> &deltas);
+
+  std::mutex &WithLock() override;
 };
 }  // namespace raft
diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp
index 681f59c01..9be9bd22d 100644
--- a/src/transactions/single_node_ha/engine.cpp
+++ b/src/transactions/single_node_ha/engine.cpp
@@ -78,36 +78,58 @@ CommandId Engine::UpdateCommand(TransactionId id) {
 }

 void Engine::Commit(const Transaction &t) {
-  VLOG(11) << "[Tx] Commiting transaction " << t.id_;
-  raft_->Emplace(database::StateDelta::TxCommit(t.id_));
+  VLOG(11) << "[Tx] Committing transaction " << t.id_;
+  if (raft_->Emplace(database::StateDelta::TxCommit(t.id_))) {
+    // It is important to note the following situation. If our cluster ends up
+    // with a network partition where the current leader can't communicate with
+    // the majority of the peers, and the client is still sending queries to it,
+    // all of the transactions will end up waiting here until the network
+    // partition is resolved. The problem that can occur afterwards is bad.
+    // When the machine transitions from leader to follower mode,
+    // `ReplicationInfo` method will start returning `is_replicated=true`. This
+    // might lead to a problem where we suddenly want to alter the state of the
+    // transaction engine that isn't valid anymore, because the current machine
+    // isn't the leader anymore. This is all handled in the `Transition` method
+    // where once the transition from leader to follower occurs, the mode will
+    // be set to follower first, then the `Reset` method on the transaction
+    // engine will wait for all transactions to finish, and even though we
+    // change the transaction engine state here, the engine will perform a
+    // `Reset` and start recovering from zero, and the invalid changes won't
+    // matter.

-  // It is important to note the following situation. If our cluster ends up
-  // with a network partition where the current leader can't communicate with
-  // the majority of the peers, and the client is still sending queries to it,
-  // all of the transaction will end up waiting here until the network partition
-  // is resolved. The problem that can occur afterwards is bad. When the
-  // machine transitions from leader to follower mode, `SafeToCommit` method
-  // will start returning true. This might lead to a problem where we suddenly
-  // want to alter the state of the transaction engine that isn't valid anymore,
-  // because the current machine isn't the leader anymore. This is all handled
-  // in the `Transition` method where once the transition from leader to
-  // follower is happening, the mode will be set to follower first, then the
-  // `Reset` method on the transaction engine will wait for all transactions to
-  // finish, and even though we change the transaction engine state here, the
-  // engine will perform a `Reset` and start recovering from zero, and the
-  // invalid changes won't matter.
-
-  // Wait for Raft to receive confirmation from the majority of followers.
-  while (true) {
-    try {
-      if (raft_->SafeToCommit(t.id_)) break;
-    } catch (const raft::ReplicationTimeoutException &e) {
-      std::lock_guard<utils::SpinLock> guard(lock_);
-      if (replication_errors_.insert(t.id_).second) {
-        LOG(WARNING) << e.what();
+    // Wait for Raft to receive confirmation from the majority of followers.
+    while (true) {
+      try {
+        if (raft_->SafeToCommit(t.id_)) break;
+      } catch (const raft::ReplicationTimeoutException &e) {
+        std::lock_guard<utils::SpinLock> guard(lock_);
+        if (replication_errors_.insert(t.id_).second) {
+          LOG(WARNING) << e.what();
+        }
       }
+      std::this_thread::sleep_for(std::chrono::microseconds(100));
+    }
+
+    std::unique_lock<std::mutex> raft_lock(raft_->WithLock(), std::defer_lock);
+    // We need to acquire the Raft lock so we don't end up racing with a Raft
+    // thread that can reset the engine state. If we can't acquire the lock
+    // and the engine ends up being reset, we throw
+    // UnexpectedLeaderChangeException.
+    while (true) {
+      if (raft_lock.try_lock()) {
+        break;
+      }
+      // This is the case when we've lost our leader status due to another peer
+      // requesting election.
+      if (reset_active_.load()) throw raft::UnexpectedLeaderChangeException();
+      // This is the case when we're shutting down and we're no longer a valid
+      // leader. `SafeToCommit` will throw `RaftShutdownException` if the
+      // transaction wasn't replicated, and the client will receive a negative
+      // response. Otherwise, we'll end up here, and since the transaction was
+      // replicated, we need to inform the client that the query succeeded.
+      if (!raft_->IsLeader()) break;
+      std::this_thread::sleep_for(std::chrono::microseconds(100));
     }
-    std::this_thread::sleep_for(std::chrono::microseconds(100));
   }

   std::lock_guard<utils::SpinLock> guard(lock_);
@@ -122,10 +144,10 @@ void Engine::Commit(const Transaction &t) {

 void Engine::Abort(const Transaction &t) {
   VLOG(11) << "[Tx] Aborting transaction " << t.id_;
+  raft_->Emplace(database::StateDelta::TxAbort(t.id_));
   std::lock_guard<utils::SpinLock> guard(lock_);
   clog_->set_aborted(t.id_);
   active_.remove(t.id_);
-  raft_->Emplace(database::StateDelta::TxAbort(t.id_));
   store_.erase(store_.find(t.id_));
   if (t.blocking()) {
     accepting_transactions_.store(true);
@@ -204,6 +226,7 @@ void Engine::Reset() {
     }

     wait_for_txs = active_;
+    reset_active_.store(true);
   }

   // Wait for all active transactions to end.
@@ -225,6 +248,7 @@ void Engine::Reset() {
     clog_ = std::make_unique<CommitLog>();
   }
   accepting_transactions_.store(true);
+  reset_active_.store(false);
 }

 Transaction *Engine::BeginTransaction(bool blocking) {
diff --git a/src/transactions/single_node_ha/engine.hpp b/src/transactions/single_node_ha/engine.hpp
index 8c8647f38..467e2eae1 100644
--- a/src/transactions/single_node_ha/engine.hpp
+++ b/src/transactions/single_node_ha/engine.hpp
@@ -73,6 +73,7 @@ class Engine final {
   mutable utils::SpinLock lock_;
   raft::RaftInterface *raft_{nullptr};
   std::atomic<bool> accepting_transactions_{true};
+  std::atomic<bool> reset_active_{false};

   // Keep track of transaction that experienced a replication error.
   // While there is a replication error known to the engine, the engine won't
diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp
index c19a52fae..ef3bf5aa6 100644
--- a/tests/unit/transaction_engine_single_node_ha.cpp
+++ b/tests/unit/transaction_engine_single_node_ha.cpp
@@ -12,11 +12,14 @@ using namespace tx;

 class RaftMock final : public raft::RaftInterface {
  public:
-  void Emplace(const database::StateDelta &delta) override {
+  bool Emplace(const database::StateDelta &delta) override {
     log_[delta.transaction_id].emplace_back(std::move(delta));
+    return true;
   }

-  bool SafeToCommit(const tx::TransactionId &tx_id) override { return true; }
+  bool SafeToCommit(const tx::TransactionId &) override {
+    return true;
+  }

   bool IsLeader() override { return true; }

@@ -27,8 +30,11 @@ class RaftMock final : public raft::RaftInterface {
     return log_[tx_id];
   }

+  std::mutex &WithLock() override { return lock_; }
+
  private:
   std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>> log_;
+  std::mutex lock_;
 };

 TEST(Engine, Reset) {
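
Reviewer note (not part of the patch): the commit/reset handshake that `Engine::Commit` gains above -- wait until `SafeToCommit` reports the log entry as replicated, then `try_lock` the Raft lock and back off if a reset is in progress or leadership is lost -- is easier to follow outside the diff. The self-contained C++ sketch below mirrors only that control flow; `FakeRaft`, `TryCommit` and `main` are invented stand-ins, not Memgraph APIs.

// Hypothetical sketch -- invented names; only the handshake shape mirrors the patch.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

class FakeRaft {
 public:
  // Stand-in for SafeToCommit: true once the entry is replicated, throws if
  // we are shutting down before replication finished.
  bool SafeToCommit(uint64_t /*tx_id*/) {
    if (exiting_ && !replicated_) throw std::runtime_error("shutting down");
    return replicated_;
  }
  bool IsLeader() const { return leader_; }
  std::mutex &WithLock() { return lock_; }

  // Knobs a test (or a fake Raft thread) can flip.
  std::atomic<bool> replicated_{false};
  std::atomic<bool> leader_{true};
  std::atomic<bool> exiting_{false};

 private:
  std::mutex lock_;
};

// Mirrors the shape of the patched Engine::Commit: spin on SafeToCommit, then
// try_lock the Raft lock so we never race with a thread that is resetting the
// transaction engine. Returns false when the commit must be abandoned because
// of a leader change.
bool TryCommit(FakeRaft &raft, const std::atomic<bool> &reset_active,
               uint64_t tx_id) {
  while (!raft.SafeToCommit(tx_id))
    std::this_thread::sleep_for(std::chrono::microseconds(100));

  std::unique_lock<std::mutex> raft_lock(raft.WithLock(), std::defer_lock);
  while (true) {
    if (raft_lock.try_lock()) break;        // safe to touch engine state
    if (reset_active.load()) return false;  // leader change: give up
    if (!raft.IsLeader()) break;            // replicated but no longer leader
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }
  return true;  // commit bookkeeping would happen here
}

int main() {
  FakeRaft raft;
  std::atomic<bool> reset_active{false};
  std::thread committer([&] {
    std::cout << (TryCommit(raft, reset_active, 1) ? "committed\n" : "aborted\n");
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(5));
  raft.replicated_ = true;  // simulate the majority acknowledging the entry
  committer.join();
}

The ordering is the point of the patch: replication is checked before any shutdown or leadership condition, so a transaction whose log entry already reached the majority is still acknowledged to the client even while the server is shutting down.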
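The reordered leader branch of `RaftServer::SafeToCommit` (check `is_replicated` before the `exiting_` flag) can likewise be read in isolation. Below is a minimal sketch under invented names (`FakeReplicationLog`, `LeaderSafeToCommit`); it restates the decision order only and is not the real RaftServer API.

// Hypothetical sketch -- invented names; only the decision order mirrors the patch.
#include <cstdint>
#include <iostream>
#include <set>
#include <stdexcept>

struct FakeReplicationLog {
  std::set<uint64_t> replicated, active;
  bool is_replicated(uint64_t tx) const { return replicated.count(tx) > 0; }
  bool is_active(uint64_t tx) const { return active.count(tx) > 0; }
};

// Order matters: a replicated entry commits even during shutdown; an
// unreplicated one fails fast when exiting; an active one keeps waiting until
// the replication timeout fires; anything else is a logic error.
bool LeaderSafeToCommit(const FakeReplicationLog &rlog, bool exiting,
                        bool timed_out, uint64_t tx_id) {
  if (rlog.is_replicated(tx_id)) return true;
  if (exiting) throw std::runtime_error("Raft server is shutting down");
  if (rlog.is_active(tx_id)) {
    if (timed_out) throw std::runtime_error("replication timeout");
    return false;  // caller retries after a short sleep
  }
  throw std::runtime_error("transaction missing from replication log");
}

int main() {
  FakeReplicationLog rlog;
  rlog.active.insert(1);
  std::cout << LeaderSafeToCommit(rlog, false, false, 1) << "\n";  // 0: keep waiting
  rlog.replicated.insert(1);
  std::cout << LeaderSafeToCommit(rlog, true, false, 1) << "\n";   // 1: commit despite shutdown
}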