Fix how HA handles leader change during commit

Summary:
During it's leadership, one peer can receive RPC messages from other peers that his reign is over.
The problem is when this happens during a transaction commit.

This is handled in the following way.
If we're the current leader and we want to commit a transaction, we need to make sure the Raft Log is replicated before we can tell the client that the transaction is committed.
During that wait, we can only notice that the replication takes too long, and we report that with `LOG(WARNING)` messages.

If we change the Raft mode during the wait, our Raft implementation will internally commit this transaction, but won't be able to acquire the Raft lock because the `db.Reset` has been called.
This is why there is an manual lock acquire. If we pick up that the `db.Reset` has been called, we throw an `UnexpectedLeaderChangeException` exception to the client.

Another thing with long running transactions, if someone decides to kill a `memgraph_ha` instance during the commit, the transaction will have `abort` hint set. This will cause the `src/query/operator.cpp` to throw a `HintedAbortError`. We need to catch this during the shutdown, because the `memgraph_ha` isn't dead from the user perspective, and the transaction wasn't aborted because it took too long, but we can differentiate between those two.

Reviewers: mferencevic, ipaljak

Reviewed By: mferencevic, ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1956
This commit is contained in:
Matija Santl 2019-04-09 10:02:11 +02:00
parent 37c68f0508
commit 5d5dfbb6f7
8 changed files with 149 additions and 47 deletions

View File

@ -40,7 +40,14 @@ class TransactionEngine final {
"Transaction can't be committed because there was a previous "
"error. Please invoke a rollback instead.");
}
try {
Commit();
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
expect_rollback_ = false;
in_explicit_transaction_ = false;
return {};
@ -96,6 +103,11 @@ class TransactionEngine final {
}
return summary;
#ifdef MG_SINGLE_NODE_HA
} catch (const query::HintedAbortError &) {
AbortCommand();
throw utils::BasicException("Transaction was asked to abort.");
#endif
} catch (const utils::BasicException &) {
AbortCommand();
throw;

View File

@ -89,4 +89,23 @@ class CantExecuteQueries : public RaftException {
"leader.") {}
};
/// This exception is thrown when leader re-election takes place during
/// transaction commit. We're throwing this exception to inform the client that
/// transaction failed.
class UnexpectedLeaderChangeException : public RaftException {
public:
using RaftException::RaftException;
UnexpectedLeaderChangeException()
: RaftException(
"Leader change happened during transaction commit. Aborting.") {}
};
/// This exception is thrown when the machine is in the process of shutting down
/// and Raft API is being used.
class RaftShutdownException : public RaftException {
public:
using RaftException::RaftException;
RaftShutdownException() : RaftException("Raft Server is shutting down.") {}
};
} // namespace raft

View File

@ -2,6 +2,8 @@
#pragma once
#include <mutex>
#include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
@ -12,11 +14,13 @@ namespace raft {
class RaftInterface {
public:
/// Add StateDelta to the appropriate Raft log entry.
virtual void Emplace(const database::StateDelta &) = 0;
///
/// @returns true if the Delta is emplaced, false otherwise.
virtual bool Emplace(const database::StateDelta &) = 0;
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
virtual bool SafeToCommit(const tx::TransactionId &tx_id) = 0;
virtual bool SafeToCommit(const tx::TransactionId &) = 0;
/// Returns true if the current servers mode is LEADER. False otherwise.
virtual bool IsLeader() = 0;
@ -24,6 +28,8 @@ class RaftInterface {
/// Returns the term ID of the current leader.
virtual uint64_t TermId() = 0;
virtual std::mutex &WithLock() = 0;
protected:
~RaftInterface() {}
};

View File

@ -339,9 +339,9 @@ void RaftServer::Start() {
}
void RaftServer::Shutdown() {
exiting_ = true;
{
std::lock_guard<std::mutex> guard(lock_);
exiting_ = true;
state_changed_.notify_all();
election_change_.notify_all();
@ -446,8 +446,8 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
state_changed_.notify_all();
}
void RaftServer::Emplace(const database::StateDelta &delta) {
log_entry_buffer_.Emplace(delta);
bool RaftServer::Emplace(const database::StateDelta &delta) {
return log_entry_buffer_.Emplace(delta);
}
bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
@ -463,17 +463,25 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
// leader. At that moment we don't have to check the replication log
// because the leader won't commit the Log locally if it's not replicated
// on the majority of the peers in the cluster. This is why we can short
// circut the check to always return true if in follower mode.
// circuit the check to always return true if in follower mode.
return true;
case Mode::LEADER:
// If we are shutting down, but we know that the Raft Log replicated
// successfully, we return true. This will eventually commit since we
// replicate NoOp on leader election.
if (rlog_->is_replicated(tx_id)) return true;
// Only if the transaction isn't replicated, thrown an exception to inform
// the client.
if (exiting_) throw RaftShutdownException();
if (rlog_->is_active(tx_id)) {
if (replication_timeout_.CheckTimeout(tx_id)) {
throw ReplicationTimeoutException();
}
return false;
}
if (rlog_->is_replicated(tx_id)) return true;
// The only possibility left is that our ReplicationLog doesn't contain
// information about that tx.
throw InvalidReplicationLogLookup();
@ -506,9 +514,9 @@ void RaftServer::LogEntryBuffer::Disable() {
logs_.clear();
}
void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::unique_lock<std::mutex> lock(buffer_lock_);
if (!enabled_) return;
if (!enabled_) return false;
tx::TransactionId tx_id = delta.transaction_id;
if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
@ -524,11 +532,14 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
logs_.erase(it);
// Sometimes it's possible that we're aborting a transaction that was meant
// to be commited and thus we don't have the StateDeltas anymore.
if (it != logs_.end()) logs_.erase(it);
} else {
logs_[tx_id].emplace_back(std::move(delta));
}
return true;
}
void RaftServer::RecoverPersistentData() {
@ -763,7 +774,11 @@ void RaftServer::SendLogEntries(
return;
}
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
// We can't early exit if the `exiting_` flag is true just yet. It is possible
// that the response we handle here carries the last confirmation that the logs
// have been replicated. We need to handle the response so the client doesn't
// retry the query because he thinks the query failed.
if (current_term_ != request_term || mode_ != Mode::LEADER) {
return;
}
@ -802,6 +817,7 @@ void RaftServer::SendLogEntries(
next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
}
if (exiting_) return;
state_changed_.notify_all();
}
@ -876,6 +892,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
}
void RaftServer::ElectionThreadMain() {
utils::ThreadSetName("ElectionThread");
std::unique_lock<std::mutex> lock(lock_);
while (!exiting_) {
if (Clock::now() >= next_election_) {
@ -1279,7 +1296,12 @@ void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
void RaftServer::NoOpCreate() {
auto dba = db_->Access();
Emplace(database::StateDelta::NoOp(dba.transaction_id()));
try {
dba.Commit();
} catch (const RaftException &) {
// NoOp failure can be ignored.
return;
}
}
void RaftServer::ApplyStateDeltas(
@ -1310,4 +1332,6 @@ void RaftServer::ApplyStateDeltas(
CHECK(!dba) << "StateDeltas missing commit command";
}
std::mutex &RaftServer::WithLock() { return lock_; }
} // namespace raft

View File

@ -99,7 +99,12 @@ class RaftServer final : public RaftInterface {
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster.
void Emplace(const database::StateDelta &delta) override;
///
/// @returns true if the Delta is emplaced, false otherwise.
bool Emplace(const database::StateDelta &delta) override;
/// Checks if the transaction with the given transaction id can safely be
/// Returns the current state of the replication known by this machine.
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
@ -107,6 +112,7 @@ class RaftServer final : public RaftInterface {
/// @param tx_id Transaction id which needs to be checked.
/// @return bool True if the transaction is safe to commit, false otherwise.
/// @throws ReplicationTimeoutException
/// @throws RaftShutdownException
/// @throws InvalidReplicationLogLookup
bool SafeToCommit(const tx::TransactionId &tx_id) override;
@ -144,7 +150,9 @@ class RaftServer final : public RaftInterface {
/// If the StateDelta type is `TRANSACTION_COMMIT` it will start
/// replicating, and if the type is `TRANSACTION_ABORT` it will delete the
/// log from buffer.
void Emplace(const database::StateDelta &delta);
///
/// @returns true if the Delta is emplaced, false otherwise.
bool Emplace(const database::StateDelta &delta);
private:
bool enabled_{false};
@ -441,5 +449,7 @@ class RaftServer final : public RaftInterface {
/// Applies the given batch of state deltas that are representing a transacton
/// to the db.
void ApplyStateDeltas(const std::vector<database::StateDelta> &deltas);
std::mutex &WithLock() override;
};
} // namespace raft

View File

@ -78,24 +78,24 @@ CommandId Engine::UpdateCommand(TransactionId id) {
}
void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
VLOG(11) << "[Tx] Committing transaction " << t.id_;
if (raft_->Emplace(database::StateDelta::TxCommit(t.id_))) {
// It is important to note the following situation. If our cluster ends up
// with a network partition where the current leader can't communicate with
// the majority of the peers, and the client is still sending queries to it,
// all of the transaction will end up waiting here until the network partition
// is resolved. The problem that can occur afterwards is bad. When the
// machine transitions from leader to follower mode, `SafeToCommit` method
// will start returning true. This might lead to a problem where we suddenly
// want to alter the state of the transaction engine that isn't valid anymore,
// because the current machine isn't the leader anymore. This is all handled
// in the `Transition` method where once the transition from leader to
// follower is happening, the mode will be set to follower first, then the
// `Reset` method on the transaction engine will wait for all transactions to
// finish, and even though we change the transaction engine state here, the
// engine will perform a `Reset` and start recovering from zero, and the
// invalid changes won't matter.
// all of the transaction will end up waiting here until the network
// partition is resolved. The problem that can occur afterwards is bad.
// When the machine transitions from leader to follower mode,
// `ReplicationInfo` method will start returning `is_replicated=true`. This
// might lead to a problem where we suddenly want to alter the state of the
// transaction engine that isn't valid anymore, because the current machine
// isn't the leader anymore. This is all handled in the `Transition` method
// where once the transition from leader to follower occurs, the mode will
// be set to follower first, then the `Reset` method on the transaction
// engine will wait for all transactions to finish, and even though we
// change the transaction engine state here, the engine will perform a
// `Reset` and start recovering from zero, and the invalid changes won't
// matter.
// Wait for Raft to receive confirmation from the majority of followers.
while (true) {
@ -110,6 +110,28 @@ void Engine::Commit(const Transaction &t) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
std::unique_lock<std::mutex> raft_lock(raft_->WithLock(), std::defer_lock);
// We need to acquire the Raft lock so we don't end up racing with a Raft
// thread that can reset the engine state. If we can't acquire the lock, and
// we end up with reseting the engine, we throw
// UnexpectedLeaderChangeException.
while (true) {
if (raft_lock.try_lock()) {
break;
}
// This is the case when we've lost our leader status due to another peer
// requesting election.
if (reset_active_.load()) throw raft::UnexpectedLeaderChangeException();
// This is the case when we're shutting down and we're no longer a valid
// leader. `SafeToCommit` will throw `RaftShutdownException` if the
// transaction wasn't replicated and the client will receive a negative
// response. Otherwise, we'll end up here, and since the transaction was
// replciated, we need to inform the client that the query succeeded.
if (!raft_->IsLeader()) break;
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
std::lock_guard<utils::SpinLock> guard(lock_);
replication_errors_.erase(t.id_);
clog_->set_committed(t.id_);
@ -122,10 +144,10 @@ void Engine::Commit(const Transaction &t) {
void Engine::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
raft_->Emplace(database::StateDelta::TxAbort(t.id_));
std::lock_guard<utils::SpinLock> guard(lock_);
clog_->set_aborted(t.id_);
active_.remove(t.id_);
raft_->Emplace(database::StateDelta::TxAbort(t.id_));
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
@ -204,6 +226,7 @@ void Engine::Reset() {
}
wait_for_txs = active_;
reset_active_.store(true);
}
// Wait for all active transactions to end.
@ -225,6 +248,7 @@ void Engine::Reset() {
clog_ = std::make_unique<CommitLog>();
}
accepting_transactions_.store(true);
reset_active_.store(false);
}
Transaction *Engine::BeginTransaction(bool blocking) {

View File

@ -73,6 +73,7 @@ class Engine final {
mutable utils::SpinLock lock_;
raft::RaftInterface *raft_{nullptr};
std::atomic<bool> accepting_transactions_{true};
std::atomic<bool> reset_active_{false};
// Keep track of transaction that experienced a replication error.
// While there is a replication error known to the engine, the engine won't

View File

@ -12,11 +12,14 @@ using namespace tx;
class RaftMock final : public raft::RaftInterface {
public:
void Emplace(const database::StateDelta &delta) override {
bool Emplace(const database::StateDelta &delta) override {
log_[delta.transaction_id].emplace_back(std::move(delta));
return true;
}
bool SafeToCommit(const tx::TransactionId &tx_id) override { return true; }
bool SafeToCommit(const tx::TransactionId &) override {
return true;
}
bool IsLeader() override { return true; }
@ -27,8 +30,11 @@ class RaftMock final : public raft::RaftInterface {
return log_[tx_id];
}
std::mutex &WithLock() override { return lock_; }
private:
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>> log_;
std::mutex lock_;
};
TEST(Engine, Reset) {