Add replication timeout in Raft

Summary:
Added a new config parameter, replication timeout. This parameter sets the
upper limit to the replication phase and once the timeout exceeds, the
transaction engine stops accepting new transactions.

We could experience this timeout in two cases:
 1. a network partition
 2. majority of the cluster stops working

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1893
This commit is contained in:
Matija Santl 2019-02-27 14:20:30 +01:00
parent 65b7976b78
commit 54b23ba5b6
10 changed files with 81 additions and 11 deletions

View File

@ -19,6 +19,7 @@ struct Config {
std::chrono::milliseconds election_timeout_min;
std::chrono::milliseconds election_timeout_max;
std::chrono::milliseconds heartbeat_interval;
std::chrono::milliseconds replication_timeout;
int64_t log_size_snapshot_threshold;
static Config LoadFromFile(const std::string &raft_config_file) {
@ -40,6 +41,8 @@ struct Config {
throw RaftConfigException(raft_config_file);
if (!data["heartbeat_interval"].is_number())
throw RaftConfigException(raft_config_file);
if (!data["replication_timeout"].is_number())
throw RaftConfigException(raft_config_file);
if (!data["log_size_snapshot_threshold"].is_number())
throw RaftConfigException(raft_config_file);
@ -49,6 +52,7 @@ struct Config {
std::chrono::duration<int64_t, std::milli>(
data["election_timeout_max"]),
std::chrono::duration<int64_t, std::milli>(data["heartbeat_interval"]),
std::chrono::duration<int64_t, std::milli>(data["replication_timeout"]),
data["log_size_snapshot_threshold"]};
}
};

View File

@ -64,4 +64,14 @@ class InvalidReplicationLogLookup : public RaftException {
: RaftException("Replication log lookup for invalid transaction.") {}
};
/// This exception is thrown when a transaction is taking too long to replicate.
/// We're throwing this to reduce the number of threads that are in an infinite
/// loop during a network partition.
class ReplicationTimeoutException : public RaftException {
public:
using RaftException::RaftException;
ReplicationTimeoutException()
: RaftException("Raft Log replication is taking too long. ") {}
};
} // namespace raft

View File

@ -14,9 +14,9 @@
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "raft/exceptions.hpp"
#include "rpc/serialization.hpp"
#include "utils/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
#include "rpc/serialization.hpp"
#include "utils/thread.hpp"
namespace raft {
@ -408,6 +408,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
TimePoint now = Clock::now();
for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now;
// From this point on, we can say that the replication of a LogEntry started.
replication_timeout_[tx_id] = config_.replication_timeout + now;
state_changed_.notify_all();
}
@ -431,7 +434,13 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
// circut the check to always return true if in follower mode.
return true;
case Mode::LEADER:
if (rlog_->is_active(tx_id)) return false;
if (rlog_->is_active(tx_id)) {
if (replication_timeout_[tx_id] < Clock::now()) {
throw ReplicationTimeoutException();
}
return false;
}
if (rlog_->is_replicated(tx_id)) return true;
// The only possibility left is that our ReplicationLog doesn't contain
// information about that tx.
@ -490,8 +499,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
void RaftServer::RecoverPersistentData() {
auto opt_term = disk_storage_.Get(kCurrentTermKey);
if (opt_term)
current_term_ = std::stoull(opt_term.value());
if (opt_term) current_term_ = std::stoull(opt_term.value());
auto opt_voted_for = disk_storage_.Get(kVotedForKey);
if (!opt_voted_for) {
@ -501,8 +509,7 @@ void RaftServer::RecoverPersistentData() {
}
auto opt_log_size = disk_storage_.Get(kLogSizeKey);
if (opt_log_size)
log_size_ = std::stoull(opt_log_size.value());
if (opt_log_size) log_size_ = std::stoull(opt_log_size.value());
}
void RaftServer::Transition(const Mode &new_mode) {
@ -522,6 +529,7 @@ void RaftServer::Transition(const Mode &new_mode) {
db_->Reset();
ResetReplicationLog();
replication_timeout_.clear();
// Re-apply raft log.
uint64_t starting_index = 1;
@ -652,7 +660,9 @@ void RaftServer::AdvanceCommitIndex() {
auto deltas = GetLogEntry(i).deltas;
DCHECK(deltas.size() > 2)
<< "Log entry should consist of at least two state deltas.";
rlog_->set_replicated(deltas[0].transaction_id);
auto tx_id = deltas[0].transaction_id;
rlog_->set_replicated(tx_id);
replication_timeout_.erase(tx_id);
}
commit_index_ = new_commit_index;
@ -1008,7 +1018,7 @@ void RaftServer::SnapshotThread() {
disk_storage_.Delete(LogEntryKey(i));
}
}
}
}
}
std::this_thread::sleep_for(kSnapshotPeriod);

View File

@ -105,6 +105,11 @@ class RaftServer final : public RaftInterface {
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
///
/// @param tx_id Transaction id which needs to be checked.
/// @return bool True if the transaction is safe to commit, false otherwise.
/// @throws ReplicationTimeoutException
/// @throws InvalidReplicationLogLookup
bool SafeToCommit(const tx::TransactionId &tx_id) override;
/// Returns true if the current servers mode is LEADER. False otherwise.
@ -231,6 +236,10 @@ class RaftServer final : public RaftInterface {
///< the next heartbeat.
std::vector<TimePoint> backoff_until_; ///< backoff for each server.
// Tracks timepoints until a transactions is allowed to be in the replication
// process.
std::unordered_map<tx::TransactionId, TimePoint> replication_timeout_;
//////////////////////////////////////////////////////////////////////////////
// persistent state on all servers
//

View File

@ -6,6 +6,7 @@
#include "glog/logging.h"
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/exceptions.hpp"
namespace tx {
@ -17,7 +18,7 @@ Engine::Engine(raft::RaftInterface *raft)
Transaction *Engine::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
std::lock_guard<utils::SpinLock> guard(lock_);
if (!accepting_transactions_.load())
if (!accepting_transactions_.load() || !replication_errors_.empty())
throw TransactionEngineError(
"The transaction engine currently isn't accepting new transactions.");
@ -29,7 +30,7 @@ Transaction *Engine::BeginBlocking(
Snapshot wait_for_txs;
{
std::lock_guard<utils::SpinLock> guard(lock_);
if (!accepting_transactions_.load())
if (!accepting_transactions_.load() || !replication_errors_.empty())
throw TransactionEngineError(
"The transaction engine currently isn't accepting new transactions.");
@ -81,12 +82,37 @@ void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
// It is important to note the following situation. If our cluster ends up
// with a network partition where the current leader can't communicate with
// the majority of the peers, and the client is still sending queries to it,
// all of the transaction will end up waiting here until the network partition
// is resolved. The problem that can occur afterwards is bad. When the
// machine transitions from leader to follower mode, `SafeToCommit` method
// will start returning true. This might lead to a problem where we suddenly
// want to alter the state of the transaction engine that isn't valid anymore,
// because the current machine isn't the leader anymore. This is all handled
// in the `Transition` method where once the transition from leader to
// follower is happening, the mode will be set to follower first, then the
// `Reset` method on the transaction engine will wait for all transactions to
// finish, and even though we change the transaction engine state here, the
// engine will perform a `Reset` and start recovering from zero, and the
// invalid changes won't matter.
// Wait for Raft to receive confirmation from the majority of followers.
while (!raft_->SafeToCommit(t.id_)) {
while (true) {
try {
if (raft_->SafeToCommit(t.id_)) break;
} catch (const raft::ReplicationTimeoutException &e) {
std::lock_guard<utils::SpinLock> guard(lock_);
if (replication_errors_.insert(t.id_).second) {
LOG(WARNING) << e.what();
}
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
std::lock_guard<utils::SpinLock> guard(lock_);
replication_errors_.erase(t.id_);
clog_->set_committed(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
@ -197,6 +223,7 @@ void Engine::Reset() {
// Only after all transactions have finished, reset the engine.
std::lock_guard<utils::SpinLock> guard(lock_);
counter_ = 0;
replication_errors_.clear();
store_.clear();
active_.clear();
{

View File

@ -5,6 +5,7 @@
#include <atomic>
#include <experimental/optional>
#include <unordered_map>
#include <unordered_set>
#include "raft/raft_interface.hpp"
#include "transactions/commit_log.hpp"
@ -75,6 +76,11 @@ class Engine final {
raft::RaftInterface *raft_{nullptr};
std::atomic<bool> accepting_transactions_{true};
// Keep track of transaction that experienced a replication error.
// While there is a replication error known to the engine, the engine won't
// accept new transactions.
std::unordered_set<TransactionId> replication_errors_;
// Helper method for transaction begin.
Transaction *BeginTransaction(bool blocking);
};

View File

@ -2,5 +2,6 @@
"election_timeout_min": 350,
"election_timeout_max": 700,
"heartbeat_interval": 100,
"replication_timeout": 10000,
"log_size_snapshot_threshold": -1
}

View File

@ -2,5 +2,6 @@
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"replication_timeout": 10000,
"log_size_snapshot_threshold": -1
}

View File

@ -2,5 +2,6 @@
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"replication_timeout": 10000,
"log_size_snapshot_threshold": -1
}

View File

@ -2,5 +2,6 @@
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"replication_timeout": 10000,
"log_size_snapshot_threshold": 100
}