Fix Raft shutdown
Summary: During the following scenario: - start a HA cluster with 3 machines - find the leader and start sending queries - SIGTERM the leader but leave other 2 machines untouched The leader would be stuck in the shutdown phase. This was happening because during the shutdown phase of the Bolt server, a `graph_db_accessor` would try to commit a transaction after we've already shut down Raft server. Raft, although not running, is still thinking it's in the Leader mode. Tx Engine calls the `SafeToCommit` method to Commit transactions, and ends up in an infinite loop. Since Raft was shut down it won't handle any of the incoming RPCs and won't change it's mode. The fix here is to shut down the Bolt server before Raft, so we don't have any pending commits once Raft is shut down. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1853
This commit is contained in:
parent
1b11e109fa
commit
f85095c203
@ -37,19 +37,18 @@ void GraphDb::Start() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
GraphDb::~GraphDb() {}
|
|
||||||
|
|
||||||
void GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
|
void GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
|
||||||
coordination_.AwaitShutdown([this, &call_before_shutdown]() {
|
coordination_.AwaitShutdown([this, &call_before_shutdown]() {
|
||||||
tx_engine_.LocalForEachActiveTransaction(
|
tx_engine_.LocalForEachActiveTransaction(
|
||||||
[](auto &t) { t.set_should_abort(); });
|
[](auto &t) { t.set_should_abort(); });
|
||||||
|
|
||||||
call_before_shutdown();
|
call_before_shutdown();
|
||||||
|
|
||||||
|
raft_server_.Shutdown();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void GraphDb::Shutdown() {
|
void GraphDb::Shutdown() {
|
||||||
raft_server_.Shutdown();
|
|
||||||
coordination_.Shutdown();
|
coordination_.Shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,12 +95,12 @@ void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); }
|
|||||||
void GraphDb::Reset() {
|
void GraphDb::Reset() {
|
||||||
// Release gc scheduler to stop it from touching storage.
|
// Release gc scheduler to stop it from touching storage.
|
||||||
storage_gc_ = nullptr;
|
storage_gc_ = nullptr;
|
||||||
storage_ = std::make_unique<Storage>(config_.properties_on_disk);
|
|
||||||
|
|
||||||
// This will make all active transactions to abort and reset the internal
|
// This will make all active transactions to abort and reset the internal
|
||||||
// state.
|
// state.
|
||||||
tx_engine_.Reset();
|
tx_engine_.Reset();
|
||||||
|
|
||||||
|
storage_ = std::make_unique<Storage>(config_.properties_on_disk);
|
||||||
storage_gc_ = std::make_unique<StorageGc>(
|
storage_gc_ = std::make_unique<StorageGc>(
|
||||||
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
|
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,6 @@ class GraphDbAccessor;
|
|||||||
class GraphDb {
|
class GraphDb {
|
||||||
public:
|
public:
|
||||||
explicit GraphDb(Config config = Config());
|
explicit GraphDb(Config config = Config());
|
||||||
~GraphDb();
|
|
||||||
|
|
||||||
GraphDb(const GraphDb &) = delete;
|
GraphDb(const GraphDb &) = delete;
|
||||||
GraphDb(GraphDb &&) = delete;
|
GraphDb(GraphDb &&) = delete;
|
||||||
|
@ -444,7 +444,7 @@ void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
|
|||||||
rlog_->garbage_collect_older(tx_id);
|
rlog_->garbage_collect_older(tx_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RaftServer::IsLeader() { return mode_ == Mode::LEADER; }
|
bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
|
||||||
|
|
||||||
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
|
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
|
||||||
: raft_server_(raft_server) {
|
: raft_server_(raft_server) {
|
||||||
@ -930,6 +930,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (exiting_) break;
|
||||||
state_changed_.wait_until(lock, wait_until);
|
state_changed_.wait_until(lock, wait_until);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,7 +195,7 @@ class RaftServer final : public RaftInterface {
|
|||||||
///< no_op_issuer_thread that a new
|
///< no_op_issuer_thread that a new
|
||||||
///< leader has been elected.
|
///< leader has been elected.
|
||||||
|
|
||||||
bool exiting_{false}; ///< True on server shutdown.
|
std::atomic<bool> exiting_{false}; ///< True on server shutdown.
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
// volatile state on followers and candidates
|
// volatile state on followers and candidates
|
||||||
|
@ -30,7 +30,8 @@ Transaction *Engine::BeginBlocking(
|
|||||||
{
|
{
|
||||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||||
if (!accepting_transactions_.load())
|
if (!accepting_transactions_.load())
|
||||||
throw TransactionEngineError("Engine is not accepting new transactions");
|
throw TransactionEngineError(
|
||||||
|
"The transaction engine currently isn't accepting new transactions.");
|
||||||
|
|
||||||
// Block the engine from accepting new transactions.
|
// Block the engine from accepting new transactions.
|
||||||
accepting_transactions_.store(false);
|
accepting_transactions_.store(false);
|
||||||
|
@ -63,7 +63,7 @@ code=$?
|
|||||||
# Shutdown
|
# Shutdown
|
||||||
for server_id in 1 2 3
|
for server_id in 1 2 3
|
||||||
do
|
do
|
||||||
kill -9 ${HA_PIDS[$server_id]}
|
kill -15 ${HA_PIDS[$server_id]}
|
||||||
done
|
done
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
|
Loading…
Reference in New Issue
Block a user