Add a way to stop query execution

Reviewers: buda, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D552
This commit is contained in:
Mislav Bradac 2017-07-14 13:58:25 +02:00
parent efebba514f
commit 4a500b99fa
15 changed files with 191 additions and 75 deletions

View File

@ -1,5 +1,11 @@
# Change Log
## Next version
### Major Features and Improvements
* User-specified transaction execution timeout.
## v0.6.0
### Major Features and Improvements

View File

@ -71,6 +71,7 @@ parameters:
--max-retained-snapshots | integer | 3 | Number of retained snapshots.<br/>Value -1 means without limit.
--snapshot-on-db-exit | bool | false | Make a snapshot when closing Memgraph.
--recover-on-startup | bool | false | Recover the database on startup using the last<br/>stored snapshot.
--query-execution-time-sec | integer | 180 | Maximum allowed query execution time in seconds.<br/>Queries exceeding this limit will be aborted. Value of -1 means no limit.
[^1]: Maximum number of concurrent executions on the current CPU.

View File

@ -18,6 +18,10 @@ DEFINE_int32(max_retained_snapshots, -1,
DEFINE_int32(snapshot_cycle_sec, -1,
"Amount of time between starts of two snapshooters in seconds. -1 "
"to turn off.");
// Command line flag: upper bound, in seconds, on query execution time.
// Defaults to 180; the value -1 disables the limit.
DEFINE_int32(query_execution_time_sec, 180,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of -1 means no limit.");
DEFINE_bool(snapshot_on_db_exit, false, "Snapshot on exiting the database.");
DECLARE_string(snapshot_directory);
@ -32,19 +36,19 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir)
gc_scheduler_.Run(std::chrono::seconds(FLAGS_gc_cycle_sec), [this]() {
// main garbage collection logic
// see wiki documentation for logic explanation
const auto snapshot = this->tx_engine.GcSnapshot();
const auto snapshot = this->tx_engine_.GcSnapshot();
{
// This can be run concurrently
this->gc_vertices_.Run(snapshot, this->tx_engine);
this->gc_edges_.Run(snapshot, this->tx_engine);
this->gc_vertices_.Run(snapshot, this->tx_engine_);
this->gc_edges_.Run(snapshot, this->tx_engine_);
}
// This has to be run sequentially after gc because gc modifies
// version_lists and changes the oldest visible record, on which Refresh
// depends.
{
// This can be run concurrently
this->labels_index_.Refresh(snapshot, this->tx_engine);
this->edge_types_index_.Refresh(snapshot, this->tx_engine);
this->labels_index_.Refresh(snapshot, this->tx_engine_);
this->edge_types_index_.Refresh(snapshot, this->tx_engine_);
}
// we free expired objects with snapshot.back(), which is
// the ID of the oldest active transaction (or next active, if there
@ -60,6 +64,20 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir)
RecoverDatabase(snapshot_db_dir);
StartSnapshooting();
// Start the transaction killer unless the execution time limit is disabled
// (flag value -1).
if (FLAGS_query_execution_time_sec != -1) {
  // Check four times per limit period, but never more often than once a
  // second.
  const int check_period_sec = std::max(1, FLAGS_query_execution_time_sec / 4);
  transaction_killer_.Run(std::chrono::seconds(check_period_sec), [this]() {
    tx_engine_.ForEachActiveTransaction([](tx::Transaction &transaction) {
      // The flag is read on every check, exactly like the limit itself, so
      // the deadline always reflects the current flag value.
      const auto deadline =
          transaction.creation_time() +
          std::chrono::seconds(FLAGS_query_execution_time_sec);
      // Only a hint is set here; the transaction is not aborted by this call.
      if (deadline < std::chrono::system_clock::now())
        transaction.set_should_abort();
    });
  });
}
}
void GraphDb::StartSnapshooting() {
@ -105,6 +123,9 @@ GraphDb::~GraphDb() {
// deleted.
snapshot_creator_.Stop();
// Stop transaction killer.
transaction_killer_.Stop();
// Create last database snapshot
if (FLAGS_snapshot_on_db_exit == true) {
GraphDbAccessor db_accessor(*this);

View File

@ -68,7 +68,7 @@ class GraphDb {
void RecoverDatabase(const fs::path &snapshot_db_path);
/** transaction engine related to this database */
tx::Engine tx_engine;
tx::Engine tx_engine_;
// database name
// TODO consider if this is even necessary
@ -107,4 +107,7 @@ class GraphDb {
// Schedulers
Scheduler<std::mutex> gc_scheduler_;
Scheduler<std::mutex> snapshot_creator_;
// Periodically wakes up and hints to transactions that are running for a long
// time to stop their execution.
Scheduler<std::mutex> transaction_killer_;
};

View File

@ -8,7 +8,7 @@
#include "utils/assert.hpp"
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(db.tx_engine.Begin()) {}
: db_(db), transaction_(db.tx_engine_.Begin()) {}
GraphDbAccessor::~GraphDbAccessor() {
if (!commited_ && !aborted_) {
@ -36,6 +36,10 @@ void GraphDbAccessor::abort() {
aborted_ = true;
}
// Reports whether the transaction behind this accessor has been hinted to
// abort. The answer is forwarded from the transaction object unchanged.
bool GraphDbAccessor::should_abort() const {
  const bool abort_hinted = transaction_->should_abort();
  return abort_hinted;
}
VertexAccessor GraphDbAccessor::insert_vertex() {
// create a vertex
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_);

View File

@ -275,7 +275,7 @@ class GraphDbAccessor {
// happened earlier. We have to first wait for every transaction that
// happened before, or a bit later than CreateIndex to end.
{
auto wait_transaction = db_.tx_engine.Begin();
auto wait_transaction = db_.tx_engine_.Begin();
for (auto id : wait_transaction->snapshot()) {
if (id == transaction_->id_) continue;
while (wait_transaction->engine_.clog().is_active(id))
@ -286,7 +286,7 @@ class GraphDbAccessor {
}
// This transaction surely sees everything that happened before CreateIndex.
auto transaction = db_.tx_engine.Begin();
auto transaction = db_.tx_engine_.Begin();
for (auto vertex_vlist : db_.vertices_.access()) {
auto vertex_record = vertex_vlist->find(*transaction);
@ -443,6 +443,11 @@ class GraphDbAccessor {
*/
void abort();
/**
* Return true if transaction is hinted to abort.
*/
bool should_abort() const;
/**
* Initializes the record pointers in the given accessor.
* The old_ and new_ pointers need to be initialized

View File

@ -73,8 +73,10 @@ void PrintResults(ResultStreamFaker results) {
auto &results_data = results.GetResults();
std::vector<std::vector<std::string>> result_strings(
results_data.size(), std::vector<std::string>(column_widths.size()));
for (int row_ind = 0; row_ind < results_data.size(); ++row_ind) {
for (int col_ind = 0; col_ind < column_widths.size(); ++col_ind) {
for (int row_ind = 0; row_ind < static_cast<int>(results_data.size());
++row_ind) {
for (int col_ind = 0; col_ind < static_cast<int>(column_widths.size());
++col_ind) {
std::string string_val =
TypedValueToString(results_data[row_ind][col_ind]);
column_widths[col_ind] =
@ -94,7 +96,8 @@ void PrintResults(ResultStreamFaker results) {
auto emit_result_vec = [&](const std::vector<std::string> result_vec) {
std::cout << "| ";
for (int col_ind = 0; col_ind < column_widths.size(); ++col_ind) {
for (int col_ind = 0; col_ind < static_cast<int>(column_widths.size());
++col_ind) {
const std::string &res = result_vec[col_ind];
std::cout << res << std::string(column_widths[col_ind] - res.size(), ' ');
std::cout << " | ";
@ -148,6 +151,8 @@ void query::Repl(Dbms &dbms) {
std::cout << "RUNTIME EXCEPTION: " << e.what() << std::endl;
} catch (const query::TypedValueException &e) {
std::cout << "TYPED VALUE EXCEPTION: " << e.what() << std::endl;
} catch (const query::HintedAbortError &e) {
std::cout << "HINTED ABORT ERROR: " << e.what() << std::endl;
} catch (const utils::NotYetImplemented &e) {
std::cout << e.what() << std::endl;
}

View File

@ -60,6 +60,12 @@ class TypeMismatchError : public SemanticException {
name, datum, expected)) {}
};
/**
 * Thrown by query execution when the running transaction has been hinted to
 * abort (the cursors in this commit throw it when the accessor's
 * should_abort() returns true).
 *
 * All QueryException constructors are inherited, so a custom message can be
 * supplied. The default constructor provides a descriptive message so that
 * code printing what() does not emit an empty string.
 */
class HintedAbortError : public QueryException {
 public:
  using QueryException::QueryException;
  HintedAbortError()
      : QueryException(
            "Transaction was asked to abort, most likely because it was "
            "executing longer than the time specified by the "
            "--query-execution-time-sec flag.") {}
};
/**
* An exception for an illegal operation that can not be detected
* before the query starts executing over data.

View File

@ -78,8 +78,8 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame,
if (input_cursor_->Pull(frame, symbol_table)) {
Create(frame, symbol_table);
return true;
} else
return false;
}
return false;
}
void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); }
@ -191,12 +191,14 @@ template <class TVerticesFun>
class ScanAllCursor : public Cursor {
public:
ScanAllCursor(Symbol output_symbol, std::unique_ptr<Cursor> input_cursor,
TVerticesFun get_vertices)
TVerticesFun get_vertices, GraphDbAccessor &db)
: output_symbol_(output_symbol),
input_cursor_(std::move(input_cursor)),
get_vertices_(std::move(get_vertices)) {}
get_vertices_(std::move(get_vertices)),
db_(db) {}
bool Pull(Frame &frame, const SymbolTable &symbol_table) override {
if (db_.should_abort()) throw HintedAbortError();
if (!vertices_ || vertices_it_.value() == vertices_.value().end()) {
if (!input_cursor_->Pull(frame, symbol_table)) return false;
// We need a getter function, because in case of exhausting a lazy
@ -227,6 +229,7 @@ class ScanAllCursor : public Cursor {
typename std::result_of<TVerticesFun(Frame &, const SymbolTable &)>::type>
vertices_;
std::experimental::optional<decltype(vertices_.value().begin())> vertices_it_;
GraphDbAccessor &db_;
};
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input,
@ -245,7 +248,7 @@ std::unique_ptr<Cursor> ScanAll::MakeCursor(GraphDbAccessor &db) {
return db.vertices(graph_view_ == GraphView::NEW);
};
return std::make_unique<ScanAllCursor<decltype(vertices)>>(
output_symbol_, input_->MakeCursor(db), std::move(vertices));
output_symbol_, input_->MakeCursor(db), std::move(vertices), db);
}
ScanAllByLabel::ScanAllByLabel(const std::shared_ptr<LogicalOperator> &input,
@ -260,7 +263,7 @@ std::unique_ptr<Cursor> ScanAllByLabel::MakeCursor(GraphDbAccessor &db) {
return db.vertices(label_, graph_view_ == GraphView::NEW);
};
return std::make_unique<ScanAllCursor<decltype(vertices)>>(
output_symbol_, input_->MakeCursor(db), std::move(vertices));
output_symbol_, input_->MakeCursor(db), std::move(vertices), db);
}
ScanAllByLabelPropertyRange::ScanAllByLabelPropertyRange(
@ -311,7 +314,7 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
db.vertices(label_, property_, graph_view_ == GraphView::NEW));
};
return std::make_unique<ScanAllCursor<decltype(vertices)>>(
output_symbol_, input_->MakeCursor(db), std::move(vertices));
output_symbol_, input_->MakeCursor(db), std::move(vertices), db);
}
ScanAllByLabelPropertyValue::ScanAllByLabelPropertyValue(
@ -334,6 +337,7 @@ class ScanAllByLabelPropertyValueCursor : public Cursor {
: self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db_)) {}
bool Pull(Frame &frame, const SymbolTable &symbol_table) override {
if (db_.should_abort()) throw HintedAbortError();
if (!vertices_ || vertices_it_.value() == vertices_.value().end()) {
if (!input_cursor_->Pull(frame, symbol_table)) return false;
ExpressionEvaluator evaluator(frame, symbol_table, db_,
@ -400,10 +404,11 @@ std::unique_ptr<Cursor> Expand::MakeCursor(GraphDbAccessor &db) {
}
Expand::ExpandCursor::ExpandCursor(const Expand &self, GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
: self_(self), input_cursor_(self.input_->MakeCursor(db)), db_(db) {}
bool Expand::ExpandCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
while (true) {
if (db_.should_abort()) throw HintedAbortError();
// attempt to get a value from the incoming edges
if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
EdgeAccessor edge = *(*in_edges_it_)++;
@ -1117,7 +1122,7 @@ void Aggregate::AggregateCursor::ProcessAll(Frame &frame,
ProcessOne(frame, symbol_table, evaluator);
// calculate AVG aggregations (so far they have only been summed)
for (int pos = 0; pos < self_.aggregations_.size(); ++pos) {
for (int pos = 0; pos < static_cast<int>(self_.aggregations_.size()); ++pos) {
if (std::get<1>(self_.aggregations_[pos]) != Aggregation::Op::AVG) continue;
for (auto &kv : aggregation_) {
AggregationValue &agg_value = kv.second;
@ -1157,8 +1162,7 @@ void Aggregate::AggregateCursor::EnsureInitialized(
}
void Aggregate::AggregateCursor::Update(
Frame &frame, const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator,
Frame &, const SymbolTable &, ExpressionEvaluator &evaluator,
Aggregate::AggregateCursor::AggregationValue &agg_value) {
debug_assert(
self_.aggregations_.size() == agg_value.values_.size(),
@ -1580,7 +1584,7 @@ bool Merge::MergeCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
if (pull_input_) {
// if we have just now pulled from the input
// and failed to pull from merge_match, we should create
bool merge_create_pull_result =
__attribute__((unused)) bool merge_create_pull_result =
merge_create_cursor_->Pull(frame, symbol_table);
debug_assert(merge_create_pull_result, "MergeCreate must never fail");
return true;
@ -1680,6 +1684,7 @@ Unwind::UnwindCursor::UnwindCursor(Unwind &self, GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
bool Unwind::UnwindCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
if (db_.should_abort()) throw HintedAbortError();
// if we reached the end of our list of values
// pull from the input
if (input_value_it_ == input_value_.end()) {

View File

@ -557,6 +557,8 @@ class Expand : public LogicalOperator {
*/
bool HandleExistingNode(const VertexAccessor new_node, Frame &frame,
const SymbolTable &symbol_table);
GraphDbAccessor &db_;
};
};

View File

@ -32,7 +32,6 @@ class Engine : Lockable<SpinLock> {
static constexpr auto kMaxCommandId =
std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max();
public:
template <class T>
class SimpleCounter {
public:
@ -46,6 +45,7 @@ class Engine : Lockable<SpinLock> {
T counter;
};
public:
/** Begins a transaction and returns a pointer to
* its object.
*
@ -87,39 +87,36 @@ class Engine : Lockable<SpinLock> {
return *t;
}
/** Returns the snapshot relevant to garbage collection
* of database records.
/** Returns the snapshot relevant to garbage collection of database records.
*
* If there are no active transactions that means
* a snapshot containing only the next transaction ID.
* If there are active transactions, that means the
* oldest active transaction's snapshot, with that
* transaction's ID appened as last.
* If there are no active transactions that means a snapshot containing only
* the next transaction ID. If there are active transactions, that means the
* oldest active transaction's snapshot, with that transaction's ID appended
* as last.
*
* The idea is that data records can only be deleted
* if they were expired (and that was committed) by
* a transaction older then the older currently active.
* We need the full snapshot to prevent overlaps (see
* general GC documentation).
* The idea is that data records can only be deleted if they were expired (and
* that was committed) by a transaction older than the oldest currently
* active. We need the full snapshot to prevent overlaps (see general GC
* documentation).
*/
Snapshot GcSnapshot() {
auto guard = this->acquire_unique();
// no active transactions
// No active transactions.
if (active_.size() == 0) {
auto snapshot_copy = active_;
snapshot_copy.insert(counter_.count() + 1);
return snapshot_copy;
}
// there are active transactions
// There are active transactions.
auto snapshot_copy = store_.get(active_.front())->snapshot();
snapshot_copy.insert(active_.front());
return snapshot_copy;
}
/** Comits the given transaction. Deletes the transaction
* object, it's not valid after this function executes. */
/** Commits the given transaction. Deletes the transaction object, which is
* not valid after this function executes. */
void Commit(const Transaction &t) {
auto guard = this->acquire_unique();
clog_.set_committed(t.id_);
@ -127,8 +124,8 @@ class Engine : Lockable<SpinLock> {
Finalize(t);
}
/** Aborts the given transaction. Deletes the transaction
* object, it's not valid after this function executes. */
/** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Abort(const Transaction &t) {
auto guard = this->acquire_unique();
clog_.set_aborted(t.id_);
@ -136,8 +133,8 @@ class Engine : Lockable<SpinLock> {
Finalize(t);
}
/** The total number of transactions that have
* executed since the creation of this engine */
/** The total number of transactions that have executed since the creation of
* this engine */
auto Count() {
auto guard = this->acquire_unique();
return counter_.count();
@ -149,28 +146,36 @@ class Engine : Lockable<SpinLock> {
return active_.size();
}
/** Calls function f on each active transaction.
 *
 * The engine lock is held for the whole iteration, so the set of active
 * transactions cannot be modified by Commit/Abort while f is running.
 *
 * NOTE(review): f executes under the engine lock; if that lock is not
 * reentrant, f must not call Engine methods that acquire it -- confirm.
 *
 * @param f callback invoked once per active transaction.
 */
void ForEachActiveTransaction(std::function<void(Transaction &)> f) {
  // The lock object must be bound to a named guard. Discarding the return
  // value destroys the temporary immediately, which would leave the loop
  // below running without the lock held.
  auto guard = this->acquire_unique();
  for (auto transaction_id : active_) {
    f(*store_.get(transaction_id));
  }
}
/** Returns this engine's commit log */
auto &clog() const { return clog_; }
private:
// commit log of this engine
// Commit log of this engine.
CommitLog clog_;
// Performs cleanup common to ending the transaction
// with either commit or abort
// Performs cleanup common to ending the transaction with either commit or
// abort.
void Finalize(const Transaction &t) {
active_.remove(t.id_);
store_.del(t.id_);
}
// transaction counter. contains the number of transactions
// ever created till now
// Transaction counter. contains the number of transactions ever created till
// now.
SimpleCounter<transaction_id_t> counter_{0};
// a snapshot of currently active transactions
// A snapshot of currently active transactions.
Snapshot active_;
// storage for the transactions
// Storage for the transactions.
TransactionStore<transaction_id_t> store_;
};
}

View File

@ -73,8 +73,10 @@ class Snapshot {
bool operator==(const Snapshot &other) const {
return transaction_ids_ == other.transaction_ids_;
}
auto begin() const { return transaction_ids_.begin(); }
auto end() const { return transaction_ids_.end(); }
auto begin() { return transaction_ids_.begin(); }
auto end() { return transaction_ids_.end(); }
auto begin() const { return transaction_ids_.cbegin(); }
auto end() const { return transaction_ids_.cend(); }
friend std::ostream &operator<<(std::ostream &stream,
const Snapshot &snapshot) {

View File

@ -1,12 +1,9 @@
#include "transactions/transaction.hpp"
#include <chrono> // std::chrono::seconds
#include <thread> // std::this_thread::sleep_for
#include "transactions/engine.hpp"
namespace tx {
// Constructs a transaction from the id, the snapshot and the owning engine
// it is given; each argument is stored in the corresponding member.
Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot,
Engine &engine)
: id_(id), engine_(engine), snapshot_(snapshot) {}

View File

@ -1,19 +1,21 @@
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <vector>
#include "storage/locking/record_lock.hpp"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/lock_store.hpp"
#include "transactions/snapshot.hpp"
#include "type.hpp"
namespace tx {
/** A database transaction. Encapsulates an atomic,
* abortable unit of work. Also defines that all db
* ops are single-threaded within a single transaction */
/** A database transaction. Encapsulates an atomic, abortable unit of work. Also
* defines that all db ops are single-threaded within a single transaction */
class Transaction {
public:
/** Returns the maximum possible transaction id */
@ -24,29 +26,27 @@ class Transaction {
private:
friend class Engine;
// the constructor is private, only the Engine ever uses it
// The constructor is private, only the Engine ever uses it.
Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine);
// a transaction can't be moved nor copied. it's owned by the transaction
// engine, and it's lifetime is managed by it
// A transaction can't be moved nor copied. It's owned by the transaction
// engine, and its lifetime is managed by it.
Transaction(const Transaction &) = delete;
Transaction(Transaction &&) = delete;
Transaction &operator=(const Transaction &) = delete;
Transaction &operator=(Transaction &&) = delete;
public:
/** Acquires the lock over the given RecordLock, preventing
* other transactions from doing the same */
/** Acquires the lock over the given RecordLock, preventing other transactions
* from doing the same */
void TakeLock(RecordLock &lock);
/** Commits this transaction. After this call this transaction
* object is no longer valid for use (it gets deleted by the
* engine that owns it). */
/** Commits this transaction. After this call this transaction object is no
* longer valid for use (it gets deleted by the engine that owns it). */
void Commit();
/** Aborts this transaction. After this call this transaction
* object is no longer valid for use (it gets deleted by the
* engine that owns it). */
/** Aborts this transaction. After this call this transaction object is no
* longer valid for use (it gets deleted by the engine that owns it). */
void Abort();
/** Transaction's id. Unique in the engine that owns it */
@ -62,12 +62,28 @@ class Transaction {
/** Returns this transaction's snapshot. */
const Snapshot &snapshot() const { return snapshot_; }
/** Hints to the transaction that it should abort. This does not force the
 * transaction to abort; it only signals that stopping execution is
 * preferable.
 */
void set_should_abort() { should_abort_.store(true); }

/** Returns true once the transaction has been hinted to abort. */
bool should_abort() const { return should_abort_.load(); }

/** Returns the time point recorded when this transaction was constructed. */
auto creation_time() const { return creation_time_; }
private:
// index of the current command in the current transaction;
// Index of the current command in the current transaction.
command_id_t cid_{1};
// a snapshot of currently active transactions
// A snapshot of currently active transactions.
const Snapshot snapshot_;
// locks
// Record locks held by this transaction.
LockStore<RecordLock> locks_;
// True if transaction should abort. Used to signal query executor that it
// should stop execution, it is only a hint, transaction can disobey.
std::atomic<bool> should_abort_{false};
// Creation time.
const std::chrono::time_point<std::chrono::system_clock> creation_time_{
std::chrono::system_clock::now()};
};
}

View File

@ -0,0 +1,38 @@
#include <glog/logging.h>
#include "communication/result_stream_faker.hpp"
#include "database/dbms.hpp"
#include "query/engine.hpp"
#include "query/exceptions.hpp"
#include <gtest/gtest.h>
DECLARE_int32(query_execution_time_sec);
// Checks that a query run in a transaction older than the configured
// execution limit fails with HintedAbortError, while queries run in fresh
// transactions before and after it complete normally.
TEST(TransactionTimeout, TransactionTimeout) {
  // The limit is lowered to 3 seconds before the Dbms is created.
  FLAGS_query_execution_time_sec = 3;
  Dbms dbms;
  QueryEngine<ResultStreamFaker> engine;
  {
    // A freshly obtained accessor: the query is expected not to throw.
    ResultStreamFaker fresh_stream;
    auto fresh_dba = dbms.active();
    engine.Run("MATCH (n) RETURN n", *fresh_dba, fresh_stream);
  }
  {
    ResultStreamFaker stale_stream;
    // The accessor is obtained BEFORE sleeping (presumably this is what
    // starts the transaction), so it is 5 seconds old when the query runs.
    auto stale_dba = dbms.active();
    std::this_thread::sleep_for(std::chrono::seconds(5));
    ASSERT_THROW(engine.Run("MATCH (n) RETURN n", *stale_dba, stale_stream),
                 query::HintedAbortError);
  }
  {
    // A new accessor obtained after the timeout must work again.
    ResultStreamFaker later_stream;
    auto later_dba = dbms.active();
    engine.Run("MATCH (n) RETURN n", *later_dba, later_stream);
  }
}
// Test binary entry point: initializes glog and googletest, then runs every
// registered test and returns the result as the process exit code.
int main(int argc, char **argv) {
  google::InitGoogleLogging(argv[0]);
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}