From 54b23ba5b636681477fae787da6db7c7bc56f723 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Wed, 27 Feb 2019 14:20:30 +0100 Subject: [PATCH] Add replication timeout in Raft Summary: Added a new config parameter, replication timeout. This parameter sets an upper limit on the duration of the replication phase; once the timeout is exceeded, the transaction engine stops accepting new transactions. We could experience this timeout in two cases: 1. a network partition 2. the majority of the cluster stops working Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1893 --- src/raft/config.hpp | 4 +++ src/raft/exceptions.hpp | 10 ++++++ src/raft/raft_server.cpp | 26 ++++++++++----- src/raft/raft_server.hpp | 9 +++++ src/transactions/single_node_ha/engine.cpp | 33 +++++++++++++++++-- src/transactions/single_node_ha/engine.hpp | 6 ++++ tests/feature_benchmark/ha/raft.json | 1 + tests/integration/ha/basic/raft.json | 1 + tests/integration/ha/index/raft.json | 1 + tests/integration/ha/log_compaction/raft.json | 1 + 10 files changed, 81 insertions(+), 11 deletions(-) diff --git a/src/raft/config.hpp b/src/raft/config.hpp index 2ac793e17..a17470d53 100644 --- a/src/raft/config.hpp +++ b/src/raft/config.hpp @@ -19,6 +19,7 @@ struct Config { std::chrono::milliseconds election_timeout_min; std::chrono::milliseconds election_timeout_max; std::chrono::milliseconds heartbeat_interval; + std::chrono::milliseconds replication_timeout; int64_t log_size_snapshot_threshold; static Config LoadFromFile(const std::string &raft_config_file) { @@ -40,6 +41,8 @@ struct Config { throw RaftConfigException(raft_config_file); if (!data["heartbeat_interval"].is_number()) throw RaftConfigException(raft_config_file); + if (!data["replication_timeout"].is_number()) + throw RaftConfigException(raft_config_file); if (!data["log_size_snapshot_threshold"].is_number()) throw RaftConfigException(raft_config_file); @@ -49,6 +52,7 @@ struct Config { 
std::chrono::duration( data["election_timeout_max"]), std::chrono::duration(data["heartbeat_interval"]), + std::chrono::duration(data["replication_timeout"]), data["log_size_snapshot_threshold"]}; } }; diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp index e6dc1e02b..1fe2b218b 100644 --- a/src/raft/exceptions.hpp +++ b/src/raft/exceptions.hpp @@ -64,4 +64,14 @@ class InvalidReplicationLogLookup : public RaftException { : RaftException("Replication log lookup for invalid transaction.") {} }; +/// This exception is thrown when a transaction is taking too long to replicate. +/// We're throwing this to reduce the number of threads that are in an infinite +/// loop during a network partition. +class ReplicationTimeoutException : public RaftException { + public: + using RaftException::RaftException; + ReplicationTimeoutException() + : RaftException("Raft Log replication is taking too long. ") {} +}; + } // namespace raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index dcbdd93d7..f7cc9809a 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -14,9 +14,9 @@ #include "durability/single_node_ha/recovery.hpp" #include "durability/single_node_ha/snapshooter.hpp" #include "raft/exceptions.hpp" +#include "rpc/serialization.hpp" #include "utils/exceptions.hpp" #include "utils/on_scope_exit.hpp" -#include "rpc/serialization.hpp" #include "utils/thread.hpp" namespace raft { @@ -408,6 +408,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id, TimePoint now = Clock::now(); for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now; + // From this point on, we can say that the replication of a LogEntry started. + replication_timeout_[tx_id] = config_.replication_timeout + now; + state_changed_.notify_all(); } @@ -431,7 +434,13 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) { // circut the check to always return true if in follower mode. 
return true; case Mode::LEADER: - if (rlog_->is_active(tx_id)) return false; + if (rlog_->is_active(tx_id)) { + if (replication_timeout_[tx_id] < Clock::now()) { + throw ReplicationTimeoutException(); + } + + return false; + } if (rlog_->is_replicated(tx_id)) return true; // The only possibility left is that our ReplicationLog doesn't contain // information about that tx. @@ -490,8 +499,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { void RaftServer::RecoverPersistentData() { auto opt_term = disk_storage_.Get(kCurrentTermKey); - if (opt_term) - current_term_ = std::stoull(opt_term.value()); + if (opt_term) current_term_ = std::stoull(opt_term.value()); auto opt_voted_for = disk_storage_.Get(kVotedForKey); if (!opt_voted_for) { @@ -501,8 +509,7 @@ void RaftServer::RecoverPersistentData() { } auto opt_log_size = disk_storage_.Get(kLogSizeKey); - if (opt_log_size) - log_size_ = std::stoull(opt_log_size.value()); + if (opt_log_size) log_size_ = std::stoull(opt_log_size.value()); } void RaftServer::Transition(const Mode &new_mode) { @@ -522,6 +529,7 @@ void RaftServer::Transition(const Mode &new_mode) { db_->Reset(); ResetReplicationLog(); + replication_timeout_.clear(); // Re-apply raft log. 
uint64_t starting_index = 1; @@ -652,7 +660,9 @@ void RaftServer::AdvanceCommitIndex() { auto deltas = GetLogEntry(i).deltas; DCHECK(deltas.size() > 2) << "Log entry should consist of at least two state deltas."; - rlog_->set_replicated(deltas[0].transaction_id); + auto tx_id = deltas[0].transaction_id; + rlog_->set_replicated(tx_id); + replication_timeout_.erase(tx_id); } commit_index_ = new_commit_index; @@ -1008,7 +1018,7 @@ void RaftServer::SnapshotThread() { disk_storage_.Delete(LogEntryKey(i)); } } - } + } } std::this_thread::sleep_for(kSnapshotPeriod); diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index bc163ae6d..9fb10e6e6 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -105,6 +105,11 @@ class RaftServer final : public RaftInterface { /// Checks if the transaction with the given transaction id can safely be /// committed in local storage. + /// + /// @param tx_id Transaction id which needs to be checked. + /// @return bool True if the transaction is safe to commit, false otherwise. + /// @throws ReplicationTimeoutException + /// @throws InvalidReplicationLogLookup bool SafeToCommit(const tx::TransactionId &tx_id) override; /// Returns true if the current servers mode is LEADER. False otherwise. @@ -231,6 +236,10 @@ class RaftServer final : public RaftInterface { ///< the next heartbeat. std::vector backoff_until_; ///< backoff for each server. + // Tracks the timepoint until which each transaction is allowed to be in the + // replication process. 
+ std::unordered_map replication_timeout_; + ////////////////////////////////////////////////////////////////////////////// // persistent state on all servers // diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index 07e9b3a6b..0eebe260e 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -6,6 +6,7 @@ #include "glog/logging.h" #include "durability/single_node_ha/state_delta.hpp" +#include "raft/exceptions.hpp" namespace tx { @@ -17,7 +18,7 @@ Engine::Engine(raft::RaftInterface *raft) Transaction *Engine::Begin() { VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; std::lock_guard guard(lock_); - if (!accepting_transactions_.load()) + if (!accepting_transactions_.load() || !replication_errors_.empty()) throw TransactionEngineError( "The transaction engine currently isn't accepting new transactions."); @@ -29,7 +30,7 @@ Transaction *Engine::BeginBlocking( Snapshot wait_for_txs; { std::lock_guard guard(lock_); - if (!accepting_transactions_.load()) + if (!accepting_transactions_.load() || !replication_errors_.empty()) throw TransactionEngineError( "The transaction engine currently isn't accepting new transactions."); @@ -81,12 +82,37 @@ void Engine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Commiting transaction " << t.id_; raft_->Emplace(database::StateDelta::TxCommit(t.id_)); + // It is important to note the following situation. If our cluster ends up + // with a network partition where the current leader can't communicate with + // the majority of the peers, and the client is still sending queries to it, + // all of the transactions will end up waiting here until the network partition + // is resolved. The problem that can occur afterwards is bad. When the + // machine transitions from leader to follower mode, the `SafeToCommit` method + // will start returning true. 
This might lead to a problem where we suddenly + // want to alter the state of the transaction engine that isn't valid anymore, + // because the current machine isn't the leader anymore. This is all handled + // in the `Transition` method where once the transition from leader to + // follower is happening, the mode will be set to follower first, then the + // `Reset` method on the transaction engine will wait for all transactions to + // finish, and even though we change the transaction engine state here, the + // engine will perform a `Reset` and start recovering from zero, and the + // invalid changes won't matter. + // Wait for Raft to receive confirmation from the majority of followers. - while (!raft_->SafeToCommit(t.id_)) { + while (true) { + try { + if (raft_->SafeToCommit(t.id_)) break; + } catch (const raft::ReplicationTimeoutException &e) { + std::lock_guard guard(lock_); + if (replication_errors_.insert(t.id_).second) { + LOG(WARNING) << e.what(); + } + } std::this_thread::sleep_for(std::chrono::microseconds(100)); } std::lock_guard guard(lock_); + replication_errors_.erase(t.id_); clog_->set_committed(t.id_); active_.remove(t.id_); store_.erase(store_.find(t.id_)); @@ -197,6 +223,7 @@ void Engine::Reset() { // Only after all transactions have finished, reset the engine. std::lock_guard guard(lock_); counter_ = 0; + replication_errors_.clear(); store_.clear(); active_.clear(); { diff --git a/src/transactions/single_node_ha/engine.hpp b/src/transactions/single_node_ha/engine.hpp index 5cf4c194f..74fb58d58 100644 --- a/src/transactions/single_node_ha/engine.hpp +++ b/src/transactions/single_node_ha/engine.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "raft/raft_interface.hpp" #include "transactions/commit_log.hpp" @@ -75,6 +76,11 @@ class Engine final { raft::RaftInterface *raft_{nullptr}; std::atomic accepting_transactions_{true}; + // Keep track of transactions that experienced a replication error. 
+ // While there is a replication error known to the engine, the engine won't + // accept new transactions. + std::unordered_set replication_errors_; + // Helper method for transaction begin. Transaction *BeginTransaction(bool blocking); }; diff --git a/tests/feature_benchmark/ha/raft.json b/tests/feature_benchmark/ha/raft.json index 9dc159ec4..744fcf1f6 100644 --- a/tests/feature_benchmark/ha/raft.json +++ b/tests/feature_benchmark/ha/raft.json @@ -2,5 +2,6 @@ "election_timeout_min": 350, "election_timeout_max": 700, "heartbeat_interval": 100, + "replication_timeout": 10000, "log_size_snapshot_threshold": -1 } diff --git a/tests/integration/ha/basic/raft.json b/tests/integration/ha/basic/raft.json index 5983863d8..93a21c5b7 100644 --- a/tests/integration/ha/basic/raft.json +++ b/tests/integration/ha/basic/raft.json @@ -2,5 +2,6 @@ "election_timeout_min": 200, "election_timeout_max": 500, "heartbeat_interval": 100, + "replication_timeout": 10000, "log_size_snapshot_threshold": -1 } diff --git a/tests/integration/ha/index/raft.json b/tests/integration/ha/index/raft.json index 5983863d8..93a21c5b7 100644 --- a/tests/integration/ha/index/raft.json +++ b/tests/integration/ha/index/raft.json @@ -2,5 +2,6 @@ "election_timeout_min": 200, "election_timeout_max": 500, "heartbeat_interval": 100, + "replication_timeout": 10000, "log_size_snapshot_threshold": -1 } diff --git a/tests/integration/ha/log_compaction/raft.json b/tests/integration/ha/log_compaction/raft.json index 6528d3228..a8e1e0f75 100644 --- a/tests/integration/ha/log_compaction/raft.json +++ b/tests/integration/ha/log_compaction/raft.json @@ -2,5 +2,6 @@ "election_timeout_min": 200, "election_timeout_max": 500, "heartbeat_interval": 100, + "replication_timeout": 10000, "log_size_snapshot_threshold": 100 }