diff --git a/CHANGELOG.md b/CHANGELOG.md index 94da5d602..d9e7c15fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Change Log +## Next version + +### Major Features and Improvements + +* User specified transaction execution timeout. + ## v0.6.0 ### Major Features and Improvements diff --git a/docs/user_technical/installation.md b/docs/user_technical/installation.md index 3f4ac139d..c1827e5c7 100644 --- a/docs/user_technical/installation.md +++ b/docs/user_technical/installation.md @@ -71,6 +71,7 @@ parameters: --max-retained-snapshots | integer | 3 | Number of retained snapshots.<br/>Value -1 means without limit. --snapshot-on-db-exit | bool | false | Make a snapshot when closing Memgraph. --recover-on-startup | bool | false | Recover the database on startup using the last<br/>stored snapshot. + --query-execution-time-sec | integer | 180 | Maximum allowed query execution time. <br/>Queries exceeding this limit will be aborted. Value of -1 means no limit. [^1]: Maximum number of concurrent executions on the current CPU. diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 272ce4981..bb16c5cf1 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -18,6 +18,10 @@ DEFINE_int32(max_retained_snapshots, -1, DEFINE_int32(snapshot_cycle_sec, -1, "Amount of time between starts of two snapshooters in seconds. -1 " "to turn off."); +DEFINE_int32(query_execution_time_sec, 180, + "Maximum allowed query execution time. Queries exceeding this " + "limit will be aborted. Value of -1 means no limit."); + DEFINE_bool(snapshot_on_db_exit, false, "Snapshot on exiting the database."); DECLARE_string(snapshot_directory); @@ -32,19 +36,19 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir) gc_scheduler_.Run(std::chrono::seconds(FLAGS_gc_cycle_sec), [this]() { // main garbage collection logic // see wiki documentation for logic explanation - const auto snapshot = this->tx_engine.GcSnapshot(); + const auto snapshot = this->tx_engine_.GcSnapshot(); { // This can be run concurrently - this->gc_vertices_.Run(snapshot, this->tx_engine); - this->gc_edges_.Run(snapshot, this->tx_engine); + this->gc_vertices_.Run(snapshot, this->tx_engine_); + this->gc_edges_.Run(snapshot, this->tx_engine_); } // This has to be run sequentially after gc because gc modifies // version_lists and changes the oldest visible record, on which Refresh // depends. { // This can be run concurrently - this->labels_index_.Refresh(snapshot, this->tx_engine); - this->edge_types_index_.Refresh(snapshot, this->tx_engine); + this->labels_index_.Refresh(snapshot, this->tx_engine_); + this->edge_types_index_.Refresh(snapshot, this->tx_engine_); } // we free expired objects with snapshot.back(), which is // the ID of the oldest active transaction (or next active, if there @@ -60,6 +64,20 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir) RecoverDatabase(snapshot_db_dir); StartSnapshooting(); + + if (FLAGS_query_execution_time_sec != -1) { + transaction_killer_.Run( + std::chrono::seconds(std::max(1, FLAGS_query_execution_time_sec / 4)), + [this]() { + tx_engine_.ForEachActiveTransaction([](tx::Transaction &t) { + if (t.creation_time() + + std::chrono::seconds(FLAGS_query_execution_time_sec) < + std::chrono::system_clock::now()) { + t.set_should_abort(); + }; + }); + }); + } } void GraphDb::StartSnapshooting() { @@ -105,6 +123,9 @@ GraphDb::~GraphDb() { // deleted. snapshot_creator_.Stop(); + // Stop transaction killer. + transaction_killer_.Stop(); + // Create last database snapshot if (FLAGS_snapshot_on_db_exit == true) { GraphDbAccessor db_accessor(*this); diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 042f8d45c..c4b460414 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -68,7 +68,7 @@ class GraphDb { void RecoverDatabase(const fs::path &snapshot_db_path); /** transaction engine related to this database */ - tx::Engine tx_engine; + tx::Engine tx_engine_; // database name // TODO consider if this is even necessary @@ -107,4 +107,7 @@ class GraphDb { // Schedulers Scheduler<std::mutex> gc_scheduler_; Scheduler<std::mutex> snapshot_creator_; + // Periodically wakes up and hints to transactions that are running for a long + // time to stop their execution. + Scheduler<std::mutex> transaction_killer_; }; diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index c282c38da..4f52ad7f3 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -8,7 +8,7 @@ #include "utils/assert.hpp" GraphDbAccessor::GraphDbAccessor(GraphDb &db) - : db_(db), transaction_(db.tx_engine.Begin()) {} + : db_(db), transaction_(db.tx_engine_.Begin()) {} GraphDbAccessor::~GraphDbAccessor() { if (!commited_ && !aborted_) { @@ -36,6 +36,10 @@ void GraphDbAccessor::abort() { aborted_ = true; } +bool GraphDbAccessor::should_abort() const { + return transaction_->should_abort(); +} + VertexAccessor GraphDbAccessor::insert_vertex() { // create a vertex auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_); diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index c1d26dfdf..d7c4a9b68 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -275,7 +275,7 @@ class GraphDbAccessor { // happened earlier. We have to first wait for every transaction that // happend before, or a bit later than CreateIndex to end. { - auto wait_transaction = db_.tx_engine.Begin(); + auto wait_transaction = db_.tx_engine_.Begin(); for (auto id : wait_transaction->snapshot()) { if (id == transaction_->id_) continue; while (wait_transaction->engine_.clog().is_active(id)) @@ -286,7 +286,7 @@ class GraphDbAccessor { } // This transaction surely sees everything that happened before CreateIndex. - auto transaction = db_.tx_engine.Begin(); + auto transaction = db_.tx_engine_.Begin(); for (auto vertex_vlist : db_.vertices_.access()) { auto vertex_record = vertex_vlist->find(*transaction); @@ -443,6 +443,11 @@ class GraphDbAccessor { */ void abort(); + /** + * Return true if transaction is hinted to abort. + */ + bool should_abort() const; + /** * Initializes the record pointers in the given accessor. * The old_ and new_ pointers need to be initialized diff --git a/src/query/console.cpp b/src/query/console.cpp index 07afa3acf..60597c81e 100644 --- a/src/query/console.cpp +++ b/src/query/console.cpp @@ -73,8 +73,10 @@ void PrintResults(ResultStreamFaker results) { auto &results_data = results.GetResults(); std::vector<std::vector<std::string>> result_strings( results_data.size(), std::vector<std::string>(column_widths.size())); - for (int row_ind = 0; row_ind < results_data.size(); ++row_ind) { - for (int col_ind = 0; col_ind < column_widths.size(); ++col_ind) { + for (int row_ind = 0; row_ind < static_cast<int>(results_data.size()); + ++row_ind) { + for (int col_ind = 0; col_ind < static_cast<int>(column_widths.size()); + ++col_ind) { std::string string_val = TypedValueToString(results_data[row_ind][col_ind]); column_widths[col_ind] = @@ -94,7 +96,8 @@ void PrintResults(ResultStreamFaker results) { auto emit_result_vec = [&](const std::vector<std::string> result_vec) { std::cout << "| "; - for (int col_ind = 0; col_ind < column_widths.size(); ++col_ind) { + for (int col_ind = 0; col_ind < static_cast<int>(column_widths.size()); + ++col_ind) { const std::string &res = result_vec[col_ind]; std::cout << res << std::string(column_widths[col_ind] - res.size(), ' '); std::cout << " | "; @@ -148,6 +151,8 @@ void query::Repl(Dbms &dbms) { std::cout << "RUNTIME EXCEPTION: " << e.what() << std::endl; } catch (const query::TypedValueException &e) { std::cout << "TYPED VALUE EXCEPTION: " << e.what() << std::endl; + } catch (const query::HintedAbortError &e) { + std::cout << "HINTED ABORT ERROR: " << e.what() << std::endl; } catch (const utils::NotYetImplemented &e) { std::cout << e.what() << std::endl; } diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 992dae196..a8651cbb7 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -60,6 +60,12 @@ class TypeMismatchError : public SemanticException { name, datum, expected)) {} }; +class HintedAbortError : public QueryException { + public: + using QueryException::QueryException; + HintedAbortError() : QueryException("") {} +}; + /** * An exception for an illegal operation that can not be detected * before the query starts executing over data. diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index ca18161b4..b3340a602 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -78,8 +78,8 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, if (input_cursor_->Pull(frame, symbol_table)) { Create(frame, symbol_table); return true; - } else - return false; + } + return false; } void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); } @@ -191,12 +191,14 @@ template <class TVerticesFun> class ScanAllCursor : public Cursor { public: ScanAllCursor(Symbol output_symbol, std::unique_ptr<Cursor> input_cursor, - TVerticesFun get_vertices) + TVerticesFun get_vertices, GraphDbAccessor &db) : output_symbol_(output_symbol), input_cursor_(std::move(input_cursor)), - get_vertices_(std::move(get_vertices)) {} + get_vertices_(std::move(get_vertices)), + db_(db) {} bool Pull(Frame &frame, const SymbolTable &symbol_table) override { + if (db_.should_abort()) throw HintedAbortError(); if (!vertices_ || vertices_it_.value() == vertices_.value().end()) { if (!input_cursor_->Pull(frame, symbol_table)) return false; // We need a getter function, because in case of exhausting a lazy @@ -227,6 +229,7 @@ class ScanAllCursor : public Cursor { typename std::result_of<TVerticesFun(Frame &, const SymbolTable &)>::type> vertices_; std::experimental::optional<decltype(vertices_.value().begin())> vertices_it_; + GraphDbAccessor &db_; }; ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, @@ -245,7 +248,7 @@ std::unique_ptr<Cursor> ScanAll::MakeCursor(GraphDbAccessor &db) { return db.vertices(graph_view_ == GraphView::NEW); }; return std::make_unique<ScanAllCursor<decltype(vertices)>>( - output_symbol_, input_->MakeCursor(db), std::move(vertices)); + output_symbol_, input_->MakeCursor(db), std::move(vertices), db); } ScanAllByLabel::ScanAllByLabel(const std::shared_ptr<LogicalOperator> &input, @@ -260,7 +263,7 @@ std::unique_ptr<Cursor> ScanAllByLabel::MakeCursor(GraphDbAccessor &db) { return db.vertices(label_, graph_view_ == GraphView::NEW); }; return std::make_unique<ScanAllCursor<decltype(vertices)>>( - output_symbol_, input_->MakeCursor(db), std::move(vertices)); + output_symbol_, input_->MakeCursor(db), std::move(vertices), db); } ScanAllByLabelPropertyRange::ScanAllByLabelPropertyRange( @@ -311,7 +314,7 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor( db.vertices(label_, property_, graph_view_ == GraphView::NEW)); }; return std::make_unique<ScanAllCursor<decltype(vertices)>>( - output_symbol_, input_->MakeCursor(db), std::move(vertices)); + output_symbol_, input_->MakeCursor(db), std::move(vertices), db); } ScanAllByLabelPropertyValue::ScanAllByLabelPropertyValue( @@ -334,6 +337,7 @@ class ScanAllByLabelPropertyValueCursor : public Cursor { : self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db_)) {} bool Pull(Frame &frame, const SymbolTable &symbol_table) override { + if (db_.should_abort()) throw HintedAbortError(); if (!vertices_ || vertices_it_.value() == vertices_.value().end()) { if (!input_cursor_->Pull(frame, symbol_table)) return false; ExpressionEvaluator evaluator(frame, symbol_table, db_, @@ -400,10 +404,11 @@ std::unique_ptr<Cursor> Expand::MakeCursor(GraphDbAccessor &db) { } Expand::ExpandCursor::ExpandCursor(const Expand &self, GraphDbAccessor &db) - : self_(self), input_cursor_(self.input_->MakeCursor(db)) {} + : self_(self), input_cursor_(self.input_->MakeCursor(db)), db_(db) {} bool Expand::ExpandCursor::Pull(Frame &frame, const SymbolTable &symbol_table) { while (true) { + if (db_.should_abort()) throw HintedAbortError(); // attempt to get a value from the incoming edges if (in_edges_ && *in_edges_it_ != in_edges_->end()) { EdgeAccessor edge = *(*in_edges_it_)++; @@ -1117,7 +1122,7 @@ void Aggregate::AggregateCursor::ProcessAll(Frame &frame, ProcessOne(frame, symbol_table, evaluator); // calculate AVG aggregations (so far they have only been summed) - for (int pos = 0; pos < self_.aggregations_.size(); ++pos) { + for (int pos = 0; pos < static_cast<int>(self_.aggregations_.size()); ++pos) { if (std::get<1>(self_.aggregations_[pos]) != Aggregation::Op::AVG) continue; for (auto &kv : aggregation_) { AggregationValue &agg_value = kv.second; @@ -1157,8 +1162,7 @@ void Aggregate::AggregateCursor::EnsureInitialized( } void Aggregate::AggregateCursor::Update( - Frame &frame, const SymbolTable &symbol_table, - ExpressionEvaluator &evaluator, + Frame &, const SymbolTable &, ExpressionEvaluator &evaluator, Aggregate::AggregateCursor::AggregationValue &agg_value) { debug_assert( self_.aggregations_.size() == agg_value.values_.size(), @@ -1580,7 +1584,7 @@ bool Merge::MergeCursor::Pull(Frame &frame, const SymbolTable &symbol_table) { if (pull_input_) { // if we have just now pulled from the input // and failed to pull from merge_match, we should create - bool merge_create_pull_result = + __attribute__((unused)) bool merge_create_pull_result = merge_create_cursor_->Pull(frame, symbol_table); debug_assert(merge_create_pull_result, "MergeCreate must never fail"); return true; @@ -1680,6 +1684,7 @@ Unwind::UnwindCursor::UnwindCursor(Unwind &self, GraphDbAccessor &db) : self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {} bool Unwind::UnwindCursor::Pull(Frame &frame, const SymbolTable &symbol_table) { + if (db_.should_abort()) throw HintedAbortError(); // if we reached the end of our list of values // pull from the input if (input_value_it_ == input_value_.end()) { diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index c7ef38b50..87e742626 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -557,6 +557,8 @@ class Expand : public LogicalOperator { */ bool HandleExistingNode(const VertexAccessor new_node, Frame &frame, const SymbolTable &symbol_table); + + GraphDbAccessor &db_; }; }; diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index f6b5da5c7..c1c3dc895 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -32,7 +32,6 @@ class Engine : Lockable<SpinLock> { static constexpr auto kMaxCommandId = std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max(); - public: template <class T> class SimpleCounter { public: @@ -46,6 +45,7 @@ class Engine : Lockable<SpinLock> { T counter; }; + public: /** Begins a transaction and returns a pointer to * it's object. * @@ -87,39 +87,36 @@ class Engine : Lockable<SpinLock> { return *t; } - /** Returns the snapshot relevant to garbage collection - * of database records. + /** Returns the snapshot relevant to garbage collection of database records. * - * If there are no active transactions that means - * a snapshot containing only the next transaction ID. - * If there are active transactions, that means the - * oldest active transaction's snapshot, with that - * transaction's ID appened as last. + * If there are no active transactions that means a snapshot containing only + * the next transaction ID. If there are active transactions, that means the + * oldest active transaction's snapshot, with that transaction's ID appened as + * last. * - * The idea is that data records can only be deleted - * if they were expired (and that was committed) by - * a transaction older then the older currently active. - * We need the full snapshot to prevent overlaps (see - * general GC documentation). + * The idea is that data records can only be deleted if they were expired (and + * that was committed) by a transaction older then the older currently active. + * We need the full snapshot to prevent overlaps (see general GC + * documentation). */ Snapshot GcSnapshot() { auto guard = this->acquire_unique(); - // no active transactions + // No active transactions. if (active_.size() == 0) { auto snapshot_copy = active_; snapshot_copy.insert(counter_.count() + 1); return snapshot_copy; } - // there are active transactions + // There are active transactions. auto snapshot_copy = store_.get(active_.front())->snapshot(); snapshot_copy.insert(active_.front()); return snapshot_copy; } - /** Comits the given transaction. Deletes the transaction - * object, it's not valid after this function executes. */ + /** Comits the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ void Commit(const Transaction &t) { auto guard = this->acquire_unique(); clog_.set_committed(t.id_); @@ -127,8 +124,8 @@ class Engine : Lockable<SpinLock> { Finalize(t); } - /** Aborts the given transaction. Deletes the transaction - * object, it's not valid after this function executes. */ + /** Aborts the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ void Abort(const Transaction &t) { auto guard = this->acquire_unique(); clog_.set_aborted(t.id_); @@ -136,8 +133,8 @@ class Engine : Lockable<SpinLock> { Finalize(t); } - /** The total number of transactions that have - * executed since the creation of this engine */ + /** The total number of transactions that have executed since the creation of + * this engine */ auto Count() { auto guard = this->acquire_unique(); return counter_.count(); @@ -149,28 +146,36 @@ class Engine : Lockable<SpinLock> { return active_.size(); } + /** Calls function f on each active transaction. */ + void ForEachActiveTransaction(std::function<void(Transaction &)> f) { + this->acquire_unique(); + for (auto transaction : active_) { + f(*store_.get(transaction)); + } + } + /** Returns this engine's commit log */ auto &clog() const { return clog_; } private: - // commit log of this engine + // Commit log of this engine. CommitLog clog_; - // Performs cleanup common to ending the transaction - // with either commit or abort + // Performs cleanup common to ending the transaction with either commit or + // abort. void Finalize(const Transaction &t) { active_.remove(t.id_); store_.del(t.id_); } - // transaction counter. contains the number of transactions - // ever created till now + // Transaction counter. contains the number of transactions ever created till + // now. SimpleCounter<transaction_id_t> counter_{0}; - // a snapshot of currently active transactions + // A snapshot of currently active transactions. Snapshot active_; - // storage for the transactions + // Storage for the transactions. TransactionStore<transaction_id_t> store_; }; } diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp index 9399952d4..46dce6ad5 100644 --- a/src/transactions/snapshot.hpp +++ b/src/transactions/snapshot.hpp @@ -73,8 +73,10 @@ class Snapshot { bool operator==(const Snapshot &other) const { return transaction_ids_ == other.transaction_ids_; } - auto begin() const { return transaction_ids_.begin(); } - auto end() const { return transaction_ids_.end(); } + auto begin() { return transaction_ids_.begin(); } + auto end() { return transaction_ids_.end(); } + auto begin() const { return transaction_ids_.cbegin(); } + auto end() const { return transaction_ids_.cend(); } friend std::ostream &operator<<(std::ostream &stream, const Snapshot &snapshot) { diff --git a/src/transactions/transaction.cpp b/src/transactions/transaction.cpp index 315f98413..ae68bab11 100644 --- a/src/transactions/transaction.cpp +++ b/src/transactions/transaction.cpp @@ -1,12 +1,9 @@ #include "transactions/transaction.hpp" -#include <chrono> // std::chrono::seconds - -#include <thread> // std::this_thread::sleep_for - #include "transactions/engine.hpp" namespace tx { + Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine) : id_(id), engine_(engine), snapshot_(snapshot) {} diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 7dd315b70..ec014081d 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -1,19 +1,21 @@ #pragma once +#include <chrono> #include <cstdint> #include <cstdlib> #include <vector> #include "storage/locking/record_lock.hpp" +#include "threading/sync/lockable.hpp" +#include "threading/sync/spinlock.hpp" #include "transactions/lock_store.hpp" #include "transactions/snapshot.hpp" #include "type.hpp" namespace tx { -/** A database transaction. Encapsulates an atomic, - * abortable unit of work. Also defines that all db - * ops are single-threaded within a single transaction */ +/** A database transaction. Encapsulates an atomic, abortable unit of work. Also + * defines that all db ops are single-threaded within a single transaction */ class Transaction { public: /** Returns the maximum possible transcation id */ @@ -24,29 +26,27 @@ class Transaction { private: friend class Engine; - // the constructor is private, only the Engine ever uses it + // The constructor is private, only the Engine ever uses it. Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine); - // a transaction can't be moved nor copied. it's owned by the transaction - // engine, and it's lifetime is managed by it + // A transaction can't be moved nor copied. it's owned by the transaction + // engine, and it's lifetime is managed by it. Transaction(const Transaction &) = delete; Transaction(Transaction &&) = delete; Transaction &operator=(const Transaction &) = delete; Transaction &operator=(Transaction &&) = delete; public: - /** Acquires the lock over the given RecordLock, preventing - * other transactions from doing the same */ + /** Acquires the lock over the given RecordLock, preventing other transactions + * from doing the same */ void TakeLock(RecordLock &lock); - /** Commits this transaction. After this call this transaction - * object is no longer valid for use (it gets deleted by the - * engine that owns it). */ + /** Commits this transaction. After this call this transaction object is no + * longer valid for use (it gets deleted by the engine that owns it). */ void Commit(); - /** Aborts this transaction. After this call this transaction - * object is no longer valid for use (it gets deleted by the - * engine that owns it). */ + /** Aborts this transaction. After this call this transaction object is no + * longer valid for use (it gets deleted by the engine that owns it). */ void Abort(); /** Transaction's id. Unique in the engine that owns it */ @@ -62,12 +62,28 @@ class Transaction { /** Returns this transaction's snapshot. */ const Snapshot &snapshot() const { return snapshot_; } + /** Signal to transaction that it should abort. It doesn't really enforce that + * transaction will abort, but it merely hints too the transaction that it is + * preferable to stop its execution. + */ + void set_should_abort() { should_abort_ = true; } + + bool should_abort() const { return should_abort_; } + + auto creation_time() const { return creation_time_; } + private: - // index of the current command in the current transaction; + // Index of the current command in the current transaction. command_id_t cid_{1}; - // a snapshot of currently active transactions + // A snapshot of currently active transactions. const Snapshot snapshot_; - // locks + // Record locks held by this transaction. LockStore<RecordLock> locks_; + // True if transaction should abort. Used to signal query executor that it + // should stop execution, it is only a hint, transaction can disobey. + std::atomic<bool> should_abort_{false}; + // Creation time. + const std::chrono::time_point<std::chrono::system_clock> creation_time_{ + std::chrono::system_clock::now()}; }; } diff --git a/tests/unit/database_transaction_timeout.cpp b/tests/unit/database_transaction_timeout.cpp new file mode 100644 index 000000000..d30808efc --- /dev/null +++ b/tests/unit/database_transaction_timeout.cpp @@ -0,0 +1,38 @@ +#include <glog/logging.h> +#include "communication/result_stream_faker.hpp" +#include "database/dbms.hpp" +#include "query/engine.hpp" +#include "query/exceptions.hpp" + +#include <gtest/gtest.h> + +DECLARE_int32(query_execution_time_sec); + +TEST(TransactionTimeout, TransactionTimeout) { + FLAGS_query_execution_time_sec = 3; + Dbms dbms; + QueryEngine<ResultStreamFaker> engine; + { + ResultStreamFaker stream; + auto dba1 = dbms.active(); + engine.Run("MATCH (n) RETURN n", *dba1, stream); + } + { + ResultStreamFaker stream; + auto dba2 = dbms.active(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + ASSERT_THROW(engine.Run("MATCH (n) RETURN n", *dba2, stream), + query::HintedAbortError); + } + { + ResultStreamFaker stream; + auto dba3 = dbms.active(); + engine.Run("MATCH (n) RETURN n", *dba3, stream); + } +} + +int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}