diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 9cd9d1a89..915aba193 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -252,6 +252,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl) if (impl_->config_.durability_enabled) durability::CheckDurabilityDir(impl_->config_.durability_directory); + // Recovery on startup. if (impl_->config_.db_recover_on_startup) durability::Recover(impl_->config_.durability_directory, *impl_); if (impl_->config_.durability_enabled) { @@ -261,9 +262,32 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl) "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), [this] { MakeSnapshot(); }); } + + // Start transaction killer. + if (impl_->config_.query_execution_time_sec != -1) { + transaction_killer_.Run( + "TX killer", + std::chrono::seconds(std::max( + 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), + [this]() { + impl_->tx_engine().LocalForEachActiveTransaction( + [this](tx::Transaction &t) { + if (t.creation_time() + + std::chrono::seconds( + impl_->config_.query_execution_time_sec) < + std::chrono::steady_clock::now()) { + t.set_should_abort(); + }; + }); + }); + } } PublicBase::~PublicBase() { + is_accepting_transactions_ = false; + tx_engine().LocalForEachActiveTransaction( + [](auto &t) { t.set_should_abort(); }); + snapshot_creator_.release(); if (impl_->config_.snapshot_on_exit) MakeSnapshot(); } @@ -330,33 +354,6 @@ void PublicBase::MakeSnapshot() { } } // namespace impl -MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl) - : PublicBase(std::move(impl)) { - if (impl_->config_.query_execution_time_sec != -1) { - transaction_killer_.Run( - "TX killer", - std::chrono::seconds(std::max( - 1, std::min(5, impl_->config_.query_execution_time_sec / 4))), - [this]() { - impl_->tx_engine().LocalForEachActiveTransaction( - [this](tx::Transaction &t) { - if (t.creation_time() + - std::chrono::seconds( - impl_->config_.query_execution_time_sec) < - std::chrono::steady_clock::now()) { - t.set_should_abort(); - }; - }); - }); - } -} - -MasterBase::~MasterBase() { - is_accepting_transactions_ = false; - tx_engine().LocalForEachActiveTransaction( - [](auto &t) { t.set_should_abort(); }); -} - SingleNode::SingleNode(Config config) : MasterBase(std::make_unique<impl::SingleNode>(config)) {} diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 3f2aa9e69..08ea9b26f 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -149,6 +149,8 @@ class PublicBase : public GraphDb { distributed::RemoteUpdatesRpcClients &remote_updates_clients() override; distributed::RemoteDataManager &remote_data_manager() override; + bool is_accepting_transactions() const { return is_accepting_transactions_; } + protected: explicit PublicBase(std::unique_ptr<PrivateBase> impl); ~PublicBase(); @@ -158,21 +160,17 @@ class PublicBase : public GraphDb { private: std::unique_ptr<Scheduler> snapshot_creator_; + /** When this is false, no new transactions should be created. */ + std::atomic<bool> is_accepting_transactions_{true}; + Scheduler transaction_killer_; + void MakeSnapshot(); }; } // namespace impl class MasterBase : public impl::PublicBase { public: - explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl); - bool is_accepting_transactions() const { return is_accepting_transactions_; } - - ~MasterBase(); - - private: - /** When this is false, no new transactions should be created. */ - std::atomic<bool> is_accepting_transactions_{true}; - Scheduler transaction_killer_; + using PublicBase::PublicBase; }; class SingleNode : public MasterBase { diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index 625bf2320..de8152c70 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -122,8 +122,12 @@ class RemoteProduceRpcServer { cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; } catch (const query::ReconstructionException &) { cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + } catch (const query::RemoveAttachedVertexException &) { + cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR; } catch (const query::QueryRuntimeException &) { cursor_state_ = RemotePullState::QUERY_ERROR; + } catch (const query::HintedAbortError &) { + cursor_state_ = RemotePullState::HINTED_ABORT_ERROR; } return std::make_pair(std::move(results), cursor_state_); } diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/remote_pull_produce_rpc_messages.hpp index 276874a55..9660b2772 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/remote_pull_produce_rpc_messages.hpp @@ -30,6 +30,8 @@ enum class RemotePullState { LOCK_TIMEOUT_ERROR, UPDATE_DELETED_ERROR, RECONSTRUCTION_ERROR, + UNABLE_TO_DELETE_VERTEX_ERROR, + HINTED_ABORT_ERROR, QUERY_ERROR }; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index fec6ba1a1..6e5e53116 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -3081,6 +3081,10 @@ class RemotePuller { "RecordDeleted error ocured during PullRemote !"); case distributed::RemotePullState::RECONSTRUCTION_ERROR: throw query::ReconstructionException(); + case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: + throw RemoveAttachedVertexException(); + case distributed::RemotePullState::HINTED_ABORT_ERROR: + throw HintedAbortError(); case distributed::RemotePullState::QUERY_ERROR: throw QueryRuntimeException( "Query runtime error occurred duing PullRemote !"); @@ -3305,6 +3309,10 @@ class SynchronizeCursor : public Cursor { case distributed::RemotePullState::RECONSTRUCTION_ERROR: throw QueryRuntimeException( "Failed to perform remote accumulate due to ReconstructionError"); + case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: + throw RemoveAttachedVertexException(); + case distributed::RemotePullState::HINTED_ABORT_ERROR: + throw HintedAbortError(); case distributed::RemotePullState::QUERY_ERROR: throw QueryRuntimeException( "Failed to perform remote accumulate due to Query runtime error"); diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index 45f1da614..1b1cd7d65 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -26,6 +26,8 @@ #include "query_plan_common.hpp" #include "transactions/engine_master.hpp" +DECLARE_int32(query_execution_time_sec); + using namespace distributed; using namespace database; @@ -320,3 +322,46 @@ TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) { EXPECT_TRUE(TypedValue::BoolEqual{}(results[j][0], j)); } } + +class DistributedTransactionTimeout : public DistributedGraphDbTest { + protected: + void SetUp() override { + FLAGS_query_execution_time_sec = 1; + DistributedGraphDbTest::SetUp(); + } +}; + +TEST_F(DistributedTransactionTimeout, Timeout) { + InsertVertex(worker(1)); + InsertVertex(worker(1)); + + GraphDbAccessor dba{master()}; + Context ctx{dba}; + SymbolGenerator symbol_generator{ctx.symbol_table_}; + AstTreeStorage storage; + + // Make distributed plan for MATCH (n) RETURN n + auto scan_all = MakeScanAll(storage, ctx.symbol_table_, "n"); + auto output = NEXPR("n", IDENT("n")); + auto produce = MakeProduce(scan_all.op_, output); + ctx.symbol_table_[*output->expression_] = scan_all.sym_; + ctx.symbol_table_[*output] = + ctx.symbol_table_.CreateSymbol("named_expression_1", true); + + const int plan_id = 42; + master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_); + + Parameters params; + std::vector<query::Symbol> symbols{ctx.symbol_table_[*output]}; + auto remote_pull = [this, ¶ms, &symbols, &dba]() { + return master() + .remote_pull_clients() + .RemotePull(dba, 1, plan_id, params, symbols, false, 1) + .get() + .pull_state; + }; + ASSERT_EQ(remote_pull(), distributed::RemotePullState::CURSOR_IN_PROGRESS); + // Sleep here so the remote gets a hinted error. + std::this_thread::sleep_for(2s); + EXPECT_EQ(remote_pull(), distributed::RemotePullState::HINTED_ABORT_ERROR); +}