Enable transaction killer on worker

Summary:
I have not thoroughly thought this through, especially the worker
destruction (is it legit to abort all running tx?), but it's tested to
abort during remote pull, what we need.

Also I improved error handling for vertex deletion failure during
remote pull (@dgleich).

Reviewers: teon.banek, msantl, dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1263
This commit is contained in:
florijan 2018-03-01 11:03:54 +01:00
parent 41f3b75709
commit 43c0e91057
6 changed files with 90 additions and 36 deletions

View File

@ -252,6 +252,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
if (impl_->config_.durability_enabled)
durability::CheckDurabilityDir(impl_->config_.durability_directory);
// Recovery on startup.
if (impl_->config_.db_recover_on_startup)
durability::Recover(impl_->config_.durability_directory, *impl_);
if (impl_->config_.durability_enabled) {
@ -261,9 +262,32 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] { MakeSnapshot(); });
}
// Start transaction killer.
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine().LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
PublicBase::~PublicBase() {
is_accepting_transactions_ = false;
tx_engine().LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
snapshot_creator_.release();
if (impl_->config_.snapshot_on_exit) MakeSnapshot();
}
@ -330,33 +354,6 @@ void PublicBase::MakeSnapshot() {
}
} // namespace impl
MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
: PublicBase(std::move(impl)) {
if (impl_->config_.query_execution_time_sec != -1) {
transaction_killer_.Run(
"TX killer",
std::chrono::seconds(std::max(
1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
[this]() {
impl_->tx_engine().LocalForEachActiveTransaction(
[this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(
impl_->config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
});
});
}
}
MasterBase::~MasterBase() {
is_accepting_transactions_ = false;
tx_engine().LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
}
SingleNode::SingleNode(Config config)
: MasterBase(std::make_unique<impl::SingleNode>(config)) {}

View File

@ -149,6 +149,8 @@ class PublicBase : public GraphDb {
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override;
distributed::RemoteDataManager &remote_data_manager() override;
bool is_accepting_transactions() const { return is_accepting_transactions_; }
protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
~PublicBase();
@ -158,21 +160,17 @@ class PublicBase : public GraphDb {
private:
std::unique_ptr<Scheduler> snapshot_creator_;
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};
Scheduler transaction_killer_;
void MakeSnapshot();
};
} // namespace impl
class MasterBase : public impl::PublicBase {
public:
explicit MasterBase(std::unique_ptr<impl::PrivateBase> impl);
bool is_accepting_transactions() const { return is_accepting_transactions_; }
~MasterBase();
private:
/** When this is false, no new transactions should be created. */
std::atomic<bool> is_accepting_transactions_{true};
Scheduler transaction_killer_;
using PublicBase::PublicBase;
};
class SingleNode : public MasterBase {

View File

@ -122,8 +122,12 @@ class RemoteProduceRpcServer {
cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR;
} catch (const query::RemoveAttachedVertexException &) {
cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR;
} catch (const query::QueryRuntimeException &) {
cursor_state_ = RemotePullState::QUERY_ERROR;
} catch (const query::HintedAbortError &) {
cursor_state_ = RemotePullState::HINTED_ABORT_ERROR;
}
return std::make_pair(std::move(results), cursor_state_);
}

View File

@ -30,6 +30,8 @@ enum class RemotePullState {
LOCK_TIMEOUT_ERROR,
UPDATE_DELETED_ERROR,
RECONSTRUCTION_ERROR,
UNABLE_TO_DELETE_VERTEX_ERROR,
HINTED_ABORT_ERROR,
QUERY_ERROR
};

View File

@ -3081,6 +3081,10 @@ class RemotePuller {
"RecordDeleted error ocured during PullRemote !");
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
throw query::ReconstructionException();
case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException();
case distributed::RemotePullState::HINTED_ABORT_ERROR:
throw HintedAbortError();
case distributed::RemotePullState::QUERY_ERROR:
throw QueryRuntimeException(
"Query runtime error occurred duing PullRemote !");
@ -3305,6 +3309,10 @@ class SynchronizeCursor : public Cursor {
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
throw QueryRuntimeException(
"Failed to perform remote accumulate due to ReconstructionError");
case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException();
case distributed::RemotePullState::HINTED_ABORT_ERROR:
throw HintedAbortError();
case distributed::RemotePullState::QUERY_ERROR:
throw QueryRuntimeException(
"Failed to perform remote accumulate due to Query runtime error");

View File

@ -26,6 +26,8 @@
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
DECLARE_int32(query_execution_time_sec);
using namespace distributed;
using namespace database;
@ -320,3 +322,46 @@ TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) {
EXPECT_TRUE(TypedValue::BoolEqual{}(results[j][0], j));
}
}
class DistributedTransactionTimeout : public DistributedGraphDbTest {
protected:
void SetUp() override {
FLAGS_query_execution_time_sec = 1;
DistributedGraphDbTest::SetUp();
}
};
TEST_F(DistributedTransactionTimeout, Timeout) {
InsertVertex(worker(1));
InsertVertex(worker(1));
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// Make distributed plan for MATCH (n) RETURN n
auto scan_all = MakeScanAll(storage, ctx.symbol_table_, "n");
auto output = NEXPR("n", IDENT("n"));
auto produce = MakeProduce(scan_all.op_, output);
ctx.symbol_table_[*output->expression_] = scan_all.sym_;
ctx.symbol_table_[*output] =
ctx.symbol_table_.CreateSymbol("named_expression_1", true);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*output]};
auto remote_pull = [this, &params, &symbols, &dba]() {
return master()
.remote_pull_clients()
.RemotePull(dba, 1, plan_id, params, symbols, false, 1)
.get()
.pull_state;
};
ASSERT_EQ(remote_pull(), distributed::RemotePullState::CURSOR_IN_PROGRESS);
// Sleep here so the remote gets a hinted error.
std::this_thread::sleep_for(2s);
EXPECT_EQ(remote_pull(), distributed::RemotePullState::HINTED_ABORT_ERROR);
}