diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 1973047e6..98b769db8 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -97,7 +97,7 @@ void SingleNodeMain() { #else database::GraphDb db; #endif - query::InterpreterContext interpreter_context; + query::InterpreterContext interpreter_context{&db}; SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; integrations::kafka::Streams kafka_streams{ diff --git a/src/memgraph_ha.cpp b/src/memgraph_ha.cpp index 4bafe3b6c..aa6be558f 100644 --- a/src/memgraph_ha.cpp +++ b/src/memgraph_ha.cpp @@ -40,7 +40,7 @@ void SingleNodeHAMain() { auto durability_directory = std::filesystem::path(FLAGS_durability_directory); database::GraphDb db; - query::InterpreterContext interpreter_context; + query::InterpreterContext interpreter_context{&db}; SessionData session_data{&db, &interpreter_context, nullptr, nullptr}; ServerContext context; diff --git a/src/memgraph_init.cpp b/src/memgraph_init.cpp index df43b4cf2..23362a919 100644 --- a/src/memgraph_init.cpp +++ b/src/memgraph_init.cpp @@ -33,7 +33,6 @@ BoltSession::BoltSession(SessionData *data, db_(data->db), #endif interpreter_(data->interpreter_context), - transaction_engine_(data->db, &interpreter_), #ifndef MG_SINGLE_NODE_HA auth_(data->auth), audit_log_(data->audit_log), @@ -56,14 +55,14 @@ std::vector<std::string> BoltSession::Interpret( PropertyValue(params_pv)); #endif try { - auto result = transaction_engine_.Interpret(query, params_pv); + auto result = interpreter_.Interpret(query, params_pv); #ifndef MG_SINGLE_NODE_HA if (user_) { const auto &permissions = user_->GetPermissions(); for (const auto &privilege : result.second) { if (permissions.Has(glue::PrivilegeToPermission(privilege)) != auth::PermissionLevel::GRANT) { - transaction_engine_.Abort(); + interpreter_.Abort(); throw communication::bolt::ClientError( "You are not authorized to execute this query! Please contact " "your database administrator."); @@ -88,7 +87,7 @@ std::map<std::string, communication::bolt::Value> BoltSession::PullAll( #else TypedValueResultStream stream(encoder); #endif - const auto &summary = transaction_engine_.PullAll(&stream); + const auto &summary = interpreter_.PullAll(&stream); std::map<std::string, communication::bolt::Value> decoded_summary; for (const auto &kv : summary) { #ifdef MG_SINGLE_NODE_V2 @@ -117,7 +116,7 @@ std::map<std::string, communication::bolt::Value> BoltSession::PullAll( } } -void BoltSession::Abort() { transaction_engine_.Abort(); } +void BoltSession::Abort() { interpreter_.Abort(); } bool BoltSession::Authenticate(const std::string &username, const std::string &password) { @@ -169,35 +168,22 @@ void BoltSession::TypedValueResultStream::Result( void KafkaStreamWriter( SessionData &session_data, const std::string &query, const std::map<std::string, communication::bolt::Value> ¶ms) { - auto dba = session_data.db->Access(); - query::DbAccessor execution_dba(&dba); + query::Interpreter interpreter(session_data.interpreter_context); KafkaResultStream stream; std::map<std::string, PropertyValue> params_pv; for (const auto &kv : params) params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second)); + try { - query::Interpreter interpreter{session_data.interpreter_context}; - interpreter(query, &execution_dba, params_pv, false, - utils::NewDeleteResource()) - .PullAll(stream); -#ifdef MG_SINGLE_NODE_V2 - auto maybe_constraint_violation = dba.Commit(); - if (maybe_constraint_violation.HasError()) { - const auto &constraint_violation = maybe_constraint_violation.GetError(); - auto label_name = dba.LabelToName(constraint_violation.label); - auto property_name = dba.PropertyToName(constraint_violation.property); - LOG(WARNING) << fmt::format( - "[Kafka] query execution failed with an exception: " - "Unable to commit due to constraint violation on :{}({}).", - label_name, property_name); - } -#else - dba.Commit(); -#endif + // NOTE: This potentially allows Kafka streams to execute transaction + // control queries. However, those will not really work as a new + // `Interpreter` instance is created upon every call to this function, + // meaning any multicommand transaction state is lost. + interpreter.Interpret(query, params_pv); + interpreter.PullAll(&stream); } catch (const utils::BasicException &e) { LOG(WARNING) << "[Kafka] query execution failed with an exception: " << e.what(); - dba.Abort(); } }; diff --git a/src/memgraph_init.hpp b/src/memgraph_init.hpp index c896720e1..f8902a740 100644 --- a/src/memgraph_init.hpp +++ b/src/memgraph_init.hpp @@ -16,7 +16,6 @@ #include "communication/init.hpp" #include "communication/session.hpp" #include "query/interpreter.hpp" -#include "query/transaction_engine.hpp" #ifdef MG_SINGLE_NODE_V2 namespace database { @@ -93,7 +92,6 @@ class BoltSession final const storage::Storage *db_; #endif query::Interpreter interpreter_; - query::TransactionEngine transaction_engine_; #ifndef MG_SINGLE_NODE_HA auth::Auth *auth_; std::optional<auth::User> user_; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index b88dcfa27..7085efac8 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -838,10 +838,79 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) CHECK(interpreter_context_) << "Interpreter context must not be NULL"; } -Interpreter::Results Interpreter::operator()( - const std::string &query_string, DbAccessor *db_accessor, +std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>> +Interpreter::Interpret(const std::string &query, + const std::map<std::string, PropertyValue> ¶ms) { + // Clear pending results. + results_ = std::nullopt; + execution_memory_.Release(); + + // Check the query for transaction commands. + auto query_upper = utils::Trim(utils::ToUpperCase(query)); + if (query_upper == "BEGIN") { + if (in_explicit_transaction_) { + throw QueryException("Nested transactions are not supported."); + } + in_explicit_transaction_ = true; + expect_rollback_ = false; + return {}; + } else if (query_upper == "COMMIT") { + if (!in_explicit_transaction_) { + throw QueryException("No current transaction to commit."); + } + if (expect_rollback_) { + throw QueryException( + "Transaction can't be committed because there was a previous " + "error. Please invoke a rollback instead."); + } + + try { + Commit(); + } catch (const utils::BasicException &) { + AbortCommand(); + throw; + } + + expect_rollback_ = false; + in_explicit_transaction_ = false; + return {}; + } else if (query_upper == "ROLLBACK") { + if (!in_explicit_transaction_) { + throw QueryException("No current transaction to rollback."); + } + Abort(); + expect_rollback_ = false; + in_explicit_transaction_ = false; + return {}; + } + + // Any other query in an explicit transaction block advances the command. + if (in_explicit_transaction_ && db_accessor_) AdvanceCommand(); + + // Create a DB accessor if we don't yet have one. + if (!db_accessor_) { + db_accessor_.emplace(interpreter_context_->db->Access()); + execution_db_accessor_.emplace(&*db_accessor_); + } + + // Clear leftover results. + results_ = std::nullopt; + execution_memory_.Release(); + + // Interpret the query and return the headers. + try { + results_.emplace(Prepare(query, params, &*execution_db_accessor_)); + return {results_->header(), results_->privileges()}; + } catch (const utils::BasicException &) { + AbortCommand(); + throw; + } +} + +Interpreter::Results Interpreter::Prepare( + const std::string &query_string, const std::map<std::string, PropertyValue> ¶ms, - bool in_explicit_transaction, utils::MemoryResource *execution_memory) { + DbAccessor *db_accessor) { AstStorage ast_storage; Parameters parameters; std::map<std::string, TypedValue> summary; @@ -899,7 +968,7 @@ Interpreter::Results Interpreter::operator()( return Results(db_accessor, parameters, plan, std::move(output_symbols), std::move(header), std::move(summary), - parsed_query.required_privileges, execution_memory); + parsed_query.required_privileges, &execution_memory_); } if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) { @@ -970,7 +1039,7 @@ Interpreter::Results Interpreter::operator()( return Results(db_accessor, parameters, plan, std::move(output_symbols), std::move(header), std::move(summary), - parsed_query.required_privileges, execution_memory); + parsed_query.required_privileges, &execution_memory_); } if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) { @@ -980,7 +1049,7 @@ Interpreter::Results Interpreter::operator()( << "Expected stripped query to start with '" << kProfileQueryStart << "'"; - if (in_explicit_transaction) { + if (in_explicit_transaction_) { throw ProfileInMulticommandTxException(); } @@ -1052,7 +1121,7 @@ Interpreter::Results Interpreter::operator()( return Results(db_accessor, parameters, plan, std::move(output_symbols), std::move(header), std::move(summary), - parsed_query.required_privileges, execution_memory, + parsed_query.required_privileges, &execution_memory_, /* is_profile_query */ true, /* should_abort_query */ true); } @@ -1073,7 +1142,7 @@ Interpreter::Results Interpreter::operator()( return Results(db_accessor, parameters, plan, std::move(output_symbols), std::move(header), std::move(summary), - parsed_query.required_privileges, execution_memory, + parsed_query.required_privileges, &execution_memory_, /* is_profile_query */ false, /* should_abort_query */ false); #else @@ -1083,7 +1152,7 @@ Interpreter::Results Interpreter::operator()( Callback callback; if (auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query)) { - if (in_explicit_transaction) { + if (in_explicit_transaction_) { throw IndexInMulticommandTxException(); } // Creating an index influences computed plan costs. @@ -1103,7 +1172,7 @@ Interpreter::Results Interpreter::operator()( "Managing user privileges is not yet supported in Memgraph HA " "instance."); #else - if (in_explicit_transaction) { + if (in_explicit_transaction_) { throw UserModificationInMulticommandTxException(); } callback = HandleAuthQuery(auth_query, interpreter_context_->auth, @@ -1115,7 +1184,7 @@ Interpreter::Results Interpreter::operator()( throw utils::NotYetImplemented( "Graph streams are not yet supported in Memgraph HA instance."); #else - if (in_explicit_transaction) { + if (in_explicit_transaction_) { throw StreamClauseInMulticommandTxException(); } callback = @@ -1150,10 +1219,63 @@ Interpreter::Results Interpreter::operator()( return Results(db_accessor, parameters, plan, std::move(output_symbols), callback.header, std::move(summary), - parsed_query.required_privileges, execution_memory, + parsed_query.required_privileges, &execution_memory_, /* is_profile_query */ false, callback.should_abort_query); } +void Interpreter::Abort() { + results_ = std::nullopt; + execution_memory_.Release(); + expect_rollback_ = false; + in_explicit_transaction_ = false; + if (!db_accessor_) return; + db_accessor_->Abort(); + execution_db_accessor_ = std::nullopt; + db_accessor_ = std::nullopt; +} + +void Interpreter::Commit() { + results_ = std::nullopt; + execution_memory_.Release(); + if (!db_accessor_) return; +#ifdef MG_SINGLE_NODE_V2 + auto maybe_constraint_violation = db_accessor_->Commit(); + if (maybe_constraint_violation.HasError()) { + const auto &constraint_violation = maybe_constraint_violation.GetError(); + auto label_name = + execution_db_accessor_->LabelToName(constraint_violation.label); + auto property_name = + execution_db_accessor_->PropertyToName(constraint_violation.property); + execution_db_accessor_ = std::nullopt; + db_accessor_ = std::nullopt; + throw QueryException( + "Unable to commit due to existence constraint violation on :{}({}).", + label_name, property_name); + } +#else + db_accessor_->Commit(); +#endif + execution_db_accessor_ = std::nullopt; + db_accessor_ = std::nullopt; +} + +void Interpreter::AdvanceCommand() { + results_ = std::nullopt; + execution_memory_.Release(); + if (!db_accessor_) return; + db_accessor_->AdvanceCommand(); +} + +void Interpreter::AbortCommand() { + results_ = std::nullopt; + execution_memory_.Release(); + if (in_explicit_transaction_) { + expect_rollback_ = true; + } else { + Abort(); + } +} + std::shared_ptr<CachedPlan> Interpreter::CypherQueryToPlan( HashType query_hash, CypherQuery *query, AstStorage ast_storage, const Parameters ¶meters, DbAccessor *db_accessor) { diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 7a5868c78..0e316f65b 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -2,6 +2,8 @@ #include <gflags/gflags.h> +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" #include "query/context.hpp" #include "query/db_accessor.hpp" #include "query/frontend/ast/ast.hpp" @@ -9,6 +11,8 @@ #include "query/frontend/stripped.hpp" #include "query/interpret/frame.hpp" #include "query/plan/operator.hpp" +#include "utils/likely.hpp" +#include "utils/memory.hpp" #include "utils/skip_list.hpp" #include "utils/spin_lock.hpp" #include "utils/timer.hpp" @@ -27,6 +31,8 @@ class Streams; namespace query { +static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; + // TODO: Maybe this should move to query/plan/planner. /// Interface for accessing the root operator of a logical plan. class LogicalPlan { @@ -104,6 +110,21 @@ struct PlanCacheEntry { * been passed to an `Interpreter` instance. */ struct InterpreterContext { +#ifdef MG_SINGLE_NODE_V2 + explicit InterpreterContext(storage::Storage *db) +#else + explicit InterpreterContext(database::GraphDb *db) +#endif + : db(db) { + CHECK(db) << "Storage must not be NULL"; + } + +#ifdef MG_SINGLE_NODE_V2 + storage::Storage *db; +#else + database::GraphDb *db; +#endif + // Antlr has singleton instance that is shared between threads. It is // protected by locks inside of antlr. Unfortunately, they are not protected // in a very good way. Once we have antlr version without race conditions we @@ -266,16 +287,53 @@ class Interpreter { Interpreter(Interpreter &&) = delete; Interpreter &operator=(Interpreter &&) = delete; - virtual ~Interpreter() {} + virtual ~Interpreter() { Abort(); } + + std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>> + Interpret(const std::string &query, + const std::map<std::string, PropertyValue> ¶ms); /** * Generates an Results object for the parameters. The resulting object * can be Pulled with its results written to an arbitrary stream. */ - virtual Results operator()(const std::string &query, DbAccessor *db_accessor, - const std::map<std::string, PropertyValue> ¶ms, - bool in_explicit_transaction, - utils::MemoryResource *execution_memory); + virtual Results Prepare(const std::string &query, + const std::map<std::string, PropertyValue> ¶ms, + DbAccessor *db_accessor); + + template <typename TStream> + std::map<std::string, TypedValue> PullAll(TStream *result_stream) { + // If we don't have any results (eg. a transaction command preceeded), + // return an empty summary. + if (UNLIKELY(!results_)) return {}; + + // Stream all results and return the summary. + try { + results_->PullAll(*result_stream); + // Make a copy of the summary because the `Commit` call will destroy the + // `results_` object. + auto summary = results_->summary(); + if (!in_explicit_transaction_) { + if (results_->ShouldAbortQuery()) { + Abort(); + } else { + Commit(); + } + } + + return summary; +#ifdef MG_SINGLE_NODE_HA + } catch (const query::HintedAbortError &) { + AbortCommand(); + throw utils::BasicException("Transaction was asked to abort."); +#endif + } catch (const utils::BasicException &) { + AbortCommand(); + throw; + } + } + + void Abort(); protected: std::pair<frontend::StrippedQuery, ParsedQuery> StripAndParseQuery( @@ -299,6 +357,25 @@ class Interpreter { private: InterpreterContext *interpreter_context_; +#ifdef MG_SINGLE_NODE_V2 + std::optional<storage::Storage::Accessor> db_accessor_; +#else + std::optional<database::GraphDbAccessor> db_accessor_; +#endif + std::optional<DbAccessor> execution_db_accessor_; + // The `query::Interpreter::Results` object MUST be destroyed before the + // `database::GraphDbAccessor` is destroyed because the `Results` object holds + // references to the `GraphDb` object and will crash the database when + // destructed if you are not careful. + std::optional<Results> results_; + bool in_explicit_transaction_{false}; + bool expect_rollback_{false}; + utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize}; + + void Commit(); + void AdvanceCommand(); + void AbortCommand(); + // high level tree -> CachedPlan std::shared_ptr<CachedPlan> CypherQueryToPlan(HashType query_hash, CypherQuery *query, diff --git a/src/query/repl.cpp b/src/query/repl.cpp index 14c856b8b..831bd2043 100644 --- a/src/query/repl.cpp +++ b/src/query/repl.cpp @@ -61,16 +61,12 @@ void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) { // regular cypher queries try { - auto dba = db->Access(); ResultStreamFaker<query::TypedValue> stream; - DbAccessor execution_dba(&dba); - auto results = (*interpreter)(command, &execution_dba, {}, false, - utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + auto [header, _] = interpreter->Interpret(command, {}); + stream.Header(header); + auto summary = interpreter->PullAll(&stream); + stream.Summary(summary); std::cout << stream; - dba.Commit(); } catch (const query::SyntaxException &e) { std::cout << "SYNTAX EXCEPTION: " << e.what() << std::endl; } catch (const query::LexingException &e) { diff --git a/src/query/transaction_engine.hpp b/src/query/transaction_engine.hpp deleted file mode 100644 index b286d77de..000000000 --- a/src/query/transaction_engine.hpp +++ /dev/null @@ -1,207 +0,0 @@ -#pragma once - -#include "database/graph_db.hpp" -#include "database/graph_db_accessor.hpp" -#include "query/exceptions.hpp" -#include "query/interpreter.hpp" -#include "utils/likely.hpp" -#include "utils/memory.hpp" -#include "utils/string.hpp" - -namespace query { - -static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; - -class TransactionEngine final { - public: -#ifdef MG_SINGLE_NODE_V2 - TransactionEngine(storage::Storage *db, Interpreter *interpreter) -#else - TransactionEngine(database::GraphDb *db, Interpreter *interpreter) -#endif - : db_(db), - interpreter_(interpreter), - execution_memory_(&initial_memory_block_[0], - kExecutionMemoryBlockSize) {} - - ~TransactionEngine() { Abort(); } - - std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>> - Interpret(const std::string &query, - const std::map<std::string, PropertyValue> ¶ms) { - // Clear pending results. - results_ = std::nullopt; - execution_memory_.Release(); - - // Check the query for transaction commands. - auto query_upper = utils::Trim(utils::ToUpperCase(query)); - if (query_upper == "BEGIN") { - if (in_explicit_transaction_) { - throw QueryException("Nested transactions are not supported."); - } - in_explicit_transaction_ = true; - expect_rollback_ = false; - return {}; - } else if (query_upper == "COMMIT") { - if (!in_explicit_transaction_) { - throw QueryException("No current transaction to commit."); - } - if (expect_rollback_) { - throw QueryException( - "Transaction can't be committed because there was a previous " - "error. Please invoke a rollback instead."); - } - - try { - Commit(); - } catch (const utils::BasicException &) { - AbortCommand(); - throw; - } - - expect_rollback_ = false; - in_explicit_transaction_ = false; - return {}; - } else if (query_upper == "ROLLBACK") { - if (!in_explicit_transaction_) { - throw QueryException("No current transaction to rollback."); - } - Abort(); - expect_rollback_ = false; - in_explicit_transaction_ = false; - return {}; - } - - // Any other query in an explicit transaction block advances the command. - if (in_explicit_transaction_ && db_accessor_) AdvanceCommand(); - - // Create a DB accessor if we don't yet have one. - if (!db_accessor_) { - db_accessor_.emplace(db_->Access()); - execution_db_accessor_.emplace(&*db_accessor_); - } - - // Clear leftover results. - results_ = std::nullopt; - execution_memory_.Release(); - - // Interpret the query and return the headers. - try { - results_.emplace((*interpreter_)(query, &*execution_db_accessor_, params, - in_explicit_transaction_, - &execution_memory_)); - return {std::move(results_->header()), std::move(results_->privileges())}; - } catch (const utils::BasicException &) { - AbortCommand(); - throw; - } - } - - template <typename TStream> - std::map<std::string, TypedValue> PullAll(TStream *result_stream) { - // If we don't have any results (eg. a transaction command preceeded), - // return an empty summary. - if (UNLIKELY(!results_)) return {}; - - // Stream all results and return the summary. - try { - results_->PullAll(*result_stream); - // Make a copy of the summary because the `Commit` call will destroy the - // `results_` object. - auto summary = results_->summary(); - if (!in_explicit_transaction_) { - if (results_->ShouldAbortQuery()) { - Abort(); - } else { - Commit(); - } - } - - return summary; -#ifdef MG_SINGLE_NODE_HA - } catch (const query::HintedAbortError &) { - AbortCommand(); - throw utils::BasicException("Transaction was asked to abort."); -#endif - } catch (const utils::BasicException &) { - AbortCommand(); - throw; - } - } - - void Abort() { - results_ = std::nullopt; - execution_memory_.Release(); - expect_rollback_ = false; - in_explicit_transaction_ = false; - if (!db_accessor_) return; - db_accessor_->Abort(); - execution_db_accessor_ = std::nullopt; - db_accessor_ = std::nullopt; - } - - private: -#ifdef MG_SINGLE_NODE_V2 - storage::Storage *db_{nullptr}; - std::optional<storage::Storage::Accessor> db_accessor_; -#else - database::GraphDb *db_{nullptr}; - std::optional<database::GraphDbAccessor> db_accessor_; -#endif - std::optional<DbAccessor> execution_db_accessor_; - Interpreter *interpreter_{nullptr}; - // The `query::Interpreter::Results` object MUST be destroyed before the - // `database::GraphDbAccessor` is destroyed because the `Results` object holds - // references to the `GraphDb` object and will crash the database when - // destructed if you are not careful. - std::optional<query::Interpreter::Results> results_; - bool in_explicit_transaction_{false}; - bool expect_rollback_{false}; - - uint8_t initial_memory_block_[kExecutionMemoryBlockSize]; - utils::MonotonicBufferResource execution_memory_; - - void Commit() { - results_ = std::nullopt; - execution_memory_.Release(); - if (!db_accessor_) return; -#ifdef MG_SINGLE_NODE_V2 - auto maybe_constraint_violation = db_accessor_->Commit(); - if (maybe_constraint_violation.HasError()) { - const auto &constraint_violation = maybe_constraint_violation.GetError(); - auto label_name = execution_db_accessor_->LabelToName( - constraint_violation.label); - auto property_name = execution_db_accessor_->PropertyToName( - constraint_violation.property); - execution_db_accessor_ = std::nullopt; - db_accessor_ = std::nullopt; - throw QueryException( - "Unable to commit due to existence constraint violation on :{}({}).", - label_name, property_name); - } -#else - db_accessor_->Commit(); -#endif - execution_db_accessor_ = std::nullopt; - db_accessor_ = std::nullopt; - } - - void AdvanceCommand() { - results_ = std::nullopt; - execution_memory_.Release(); - if (!db_accessor_) return; - db_accessor_->AdvanceCommand(); - } - - void AbortCommand() { - results_ = std::nullopt; - execution_memory_.Release(); - if (in_explicit_transaction_) { - expect_rollback_ = true; - } else { - Abort(); - } - } -}; - -} // namespace query diff --git a/tests/benchmark/expansion.cpp b/tests/benchmark/expansion.cpp index 0dfc4425a..6e0dae542 100644 --- a/tests/benchmark/expansion.cpp +++ b/tests/benchmark/expansion.cpp @@ -13,8 +13,8 @@ class ExpansionBenchFixture : public benchmark::Fixture { // GraphDb shouldn't be global constructed/destructed. See // documentation in database/single_node/graph_db.hpp for details. std::optional<database::GraphDb> db_; - query::InterpreterContext interpreter_context_; - query::Interpreter interpreter_{&interpreter_context_}; + std::optional<query::InterpreterContext> interpreter_context_; + std::optional<query::Interpreter> interpreter_; void SetUp(const benchmark::State &state) override { db_.emplace(); @@ -30,6 +30,9 @@ class ExpansionBenchFixture : public benchmark::Fixture { dba.InsertEdge(start, dest, edge_type); } dba.Commit(); + + interpreter_context_.emplace(&*db_); + interpreter_.emplace(&*interpreter_context_); } void TearDown(const benchmark::State &) override { @@ -39,17 +42,16 @@ class ExpansionBenchFixture : public benchmark::Fixture { db_ = std::nullopt; } - auto &interpreter() { return interpreter_; } + auto &interpreter() { return *interpreter_; } }; BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) { auto query = "MATCH (s:Starting) return s"; - auto dba = db_->Access(); - query::DbAccessor query_dba(&dba); + while (state.KeepRunning()) { ResultStreamFaker<query::TypedValue> results; - interpreter()(query, &query_dba, {}, false, utils::NewDeleteResource()) - .PullAll(results); + interpreter().Interpret(query, {}); + interpreter().PullAll(&results); } } @@ -60,12 +62,11 @@ BENCHMARK_REGISTER_F(ExpansionBenchFixture, Match) BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) { auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)"; - auto dba = db_->Access(); - query::DbAccessor query_dba(&dba); + while (state.KeepRunning()) { ResultStreamFaker<query::TypedValue> results; - interpreter()(query, &query_dba, {}, false, utils::NewDeleteResource()) - .PullAll(results); + interpreter().Interpret(query, {}); + interpreter().PullAll(&results); } } diff --git a/tests/benchmark/query/eval.cpp b/tests/benchmark/query/eval.cpp index ccbaf8bbe..cbb7b5809 100644 --- a/tests/benchmark/query/eval.cpp +++ b/tests/benchmark/query/eval.cpp @@ -2,7 +2,7 @@ #include "query/db_accessor.hpp" #include "query/interpret/eval.hpp" -#include "query/transaction_engine.hpp" +#include "query/interpreter.hpp" // The following classes are wrappers for utils::MemoryResource, so that we can // use BENCHMARK_TEMPLATE diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index 48b8bfb24..49403acfe 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -11,7 +11,6 @@ #include "query/frontend/semantic/symbol_generator.hpp" #include "query/interpreter.hpp" #include "query/plan/planner.hpp" -#include "query/transaction_engine.hpp" // The following classes are wrappers for utils::MemoryResource, so that we can // use BENCHMARK_TEMPLATE diff --git a/tests/feature_benchmark/kafka/benchmark.cpp b/tests/feature_benchmark/kafka/benchmark.cpp index cd8493c50..e7c44fa4b 100644 --- a/tests/feature_benchmark/kafka/benchmark.cpp +++ b/tests/feature_benchmark/kafka/benchmark.cpp @@ -38,8 +38,8 @@ void KafkaBenchmarkMain() { audit::kBufferSizeDefault, audit::kBufferFlushIntervalMillisDefault}; - query::InterpreterContext interpreter_context; database::GraphDb db; + query::InterpreterContext interpreter_context{&db}; SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; std::atomic<int64_t> query_counter{0}; diff --git a/tests/manual/repl.cpp b/tests/manual/repl.cpp index 4dccdd935..b5c4595e5 100644 --- a/tests/manual/repl.cpp +++ b/tests/manual/repl.cpp @@ -297,7 +297,7 @@ int main(int argc, char *argv[]) { std::cout << "Generating graph..." << std::endl; // fill_db; random_generate(db, node_count, edge_count); - query::InterpreterContext interpreter_context; + query::InterpreterContext interpreter_context{&db}; query::Interpreter interpreter{&interpreter_context}; query::Repl(&db, &interpreter); return 0; diff --git a/tests/manual/single_query.cpp b/tests/manual/single_query.cpp index 6a964e86c..ec4a8d8ca 100644 --- a/tests/manual/single_query.cpp +++ b/tests/manual/single_query.cpp @@ -11,16 +11,17 @@ int main(int argc, char *argv[]) { std::cout << "Usage: ./single_query 'RETURN \"query here\"'" << std::endl; exit(1); } + database::GraphDb db; - auto dba = db.Access(); - query::DbAccessor query_dba(&dba); + query::InterpreterContext interpreter_context{&db}; + query::Interpreter interpreter{&interpreter_context}; + ResultStreamFaker<query::TypedValue> stream; - query::InterpreterContext interpreter_context; - auto results = query::Interpreter(&interpreter_context)( - argv[1], &query_dba, {}, false, utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + auto [header, _] = interpreter.Interpret(argv[1], {}); + stream.Header(header); + auto summary = interpreter.PullAll(&stream); + stream.Summary(summary); std::cout << stream; + return 0; } diff --git a/tests/unit/database_dump.cpp b/tests/unit/database_dump.cpp index ada7269e7..3293452fb 100644 --- a/tests/unit/database_dump.cpp +++ b/tests/unit/database_dump.cpp @@ -122,6 +122,9 @@ std::string DumpNext(CypherDumpGenerator *dump) { class DatabaseEnvironment { public: + DatabaseEnvironment() + : interpreter_context_{&db_}, interpreter_{&interpreter_context_} {} + GraphDbAccessor Access() { return db_.Access(); } DatabaseState GetState() { @@ -177,20 +180,28 @@ class DatabaseEnvironment { return {vertices, edges, indices, constraints}; } + /** + * Execute the given query and commit the transaction. + * + * Return the query stream. + */ + auto Execute(const std::string &query) { + ResultStreamFaker<query::TypedValue> stream; + + auto [header, _] = interpreter_.Interpret(query, {}); + stream.Header(header); + auto summary = interpreter_.PullAll(&stream); + stream.Summary(summary); + + return stream; + } + private: database::GraphDb db_; + query::InterpreterContext interpreter_context_; + query::Interpreter interpreter_; }; -void Execute(GraphDbAccessor *dba, const std::string &query) { - CHECK(dba); - ResultStreamFaker<query::TypedValue> results; - query::DbAccessor query_dba(dba); - query::InterpreterContext interpreter_context; - query::Interpreter (&interpreter_context)(query, &query_dba, {}, false, - utils::NewDeleteResource()) - .PullAll(results); -} - VertexAccessor CreateVertex(GraphDbAccessor *dba, const std::vector<std::string> &labels, const std::map<std::string, PropertyValue> &props, @@ -444,8 +455,8 @@ TEST(DumpTest, IndicesKeys) { { auto dba = db.Access(); CreateVertex(&dba, {"Label1", "Label2"}, {{"p", PropertyValue(1)}}, false); - Execute(&dba, "CREATE INDEX ON :Label1(prop);"); - Execute(&dba, "CREATE INDEX ON :Label2(prop);"); + dba.BuildIndex(dba.Label("Label1"), dba.Property("prop")); + dba.BuildIndex(dba.Label("Label2"), dba.Property("prop")); dba.Commit(); } @@ -470,11 +481,10 @@ TEST(DumpTest, UniqueConstraints) { { auto dba = db.Access(); CreateVertex(&dba, {"Label"}, {{"prop", PropertyValue(1)}}, false); - Execute(&dba, "CREATE CONSTRAINT ON (u:Label) ASSERT u.prop IS UNIQUE;"); + dba.BuildUniqueConstraint(dba.Label("Label"), {dba.Property("prop")}); // Create one with multiple properties. - Execute( - &dba, - "CREATE CONSTRAINT ON (u:Label) ASSERT u.prop1, u.prop2 IS UNIQUE;"); + dba.BuildUniqueConstraint(dba.Label("Label"), + {dba.Property("prop1"), dba.Property("prop2")}); dba.Commit(); } @@ -514,11 +524,10 @@ TEST(DumpTest, CheckStateVertexWithMultipleProperties) { auto dba = db.Access(); query::DbAccessor query_dba(&dba); CypherDumpGenerator dump(&query_dba); + std::string cmd; while (!(cmd = DumpNext(&dump)).empty()) { - auto dba_dump = db_dump.Access(); - Execute(&dba_dump, cmd); - dba_dump.Commit(); + db_dump.Execute(cmd); } } EXPECT_EQ(db.GetState(), db_dump.GetState()); @@ -545,10 +554,11 @@ TEST(DumpTest, CheckStateSimpleGraph) { CreateEdge(&dba, z, u, "Knows", {}); CreateEdge(&dba, w, z, "Knows", {{"how", PropertyValue("school")}}); CreateEdge(&dba, w, z, "Likes", {{"how", PropertyValue("very much")}}); + // Create few indices - Execute(&dba, "CREATE CONSTRAINT ON (u:Person) ASSERT u.name IS UNIQUE;"); - Execute(&dba, "CREATE INDEX ON :Person(id);"); - Execute(&dba, "CREATE INDEX ON :Person(unexisting_property);"); + dba.BuildUniqueConstraint(dba.Label("Person"), {dba.Property("name")}); + dba.BuildIndex(dba.Label("Person"), dba.Property("id")); + dba.BuildIndex(dba.Label("Person"), dba.Property("unexisting_property")); } const auto &db_initial_state = db.GetState(); @@ -557,11 +567,10 @@ TEST(DumpTest, CheckStateSimpleGraph) { auto dba = db.Access(); query::DbAccessor query_dba(&dba); CypherDumpGenerator dump(&query_dba); + std::string cmd; while (!(cmd = DumpNext(&dump)).empty()) { - auto dba_dump = db_dump.Access(); - Execute(&dba_dump, cmd); - dba_dump.Commit(); + db_dump.Execute(cmd); } } EXPECT_EQ(db.GetState(), db_dump.GetState()); @@ -579,17 +588,7 @@ TEST(DumpTest, ExecuteDumpDatabase) { } { - auto dba = db.Access(); - query::DbAccessor query_dba(&dba); - const std::string query = "DUMP DATABASE"; - ResultStreamFaker<query::TypedValue> stream; - query::InterpreterContext interpreter_context; - auto results = query::Interpreter(&interpreter_context)( - query, &query_dba, {}, false, utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); - + auto stream = db.Execute("DUMP DATABASE"); EXPECT_EQ(stream.GetResults().size(), 4U); ASSERT_EQ(stream.GetHeader().size(), 1U); EXPECT_EQ(stream.GetHeader()[0], "QUERY"); diff --git a/tests/unit/database_transaction_timeout.cpp b/tests/unit/database_transaction_timeout.cpp index 8d095f404..37a93297b 100644 --- a/tests/unit/database_transaction_timeout.cpp +++ b/tests/unit/database_transaction_timeout.cpp @@ -10,25 +10,8 @@ DECLARE_int32(query_execution_time_sec); TEST(TransactionTimeout, TransactionTimeout) { FLAGS_query_execution_time_sec = 3; database::GraphDb db; - query::InterpreterContext interpreter_context; - query::Interpreter interpreter(&interpreter_context); - auto interpret = [&](auto &dba, const std::string &query) { - query::DbAccessor query_dba(&dba); - ResultStreamFaker<query::TypedValue> stream; - interpreter(query, &query_dba, {}, false, utils::NewDeleteResource()) - .PullAll(stream); - }; - { - auto dba = db.Access(); - interpret(dba, "MATCH (n) RETURN n"); - } - { - auto dba = db.Access(); - std::this_thread::sleep_for(std::chrono::seconds(5)); - ASSERT_THROW(interpret(dba, "MATCH (n) RETURN n"), query::HintedAbortError); - } - { - auto dba = db.Access(); - interpret(dba, "MATCH (n) RETURN n"); - } + + auto dba = db.Access(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + ASSERT_TRUE(dba.should_abort()); } diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index b47105b58..c04ee0191 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -15,19 +15,23 @@ class InterpreterTest : public ::testing::Test { protected: database::GraphDb db_; - query::InterpreterContext interpreter_context_; + query::InterpreterContext interpreter_context_{&db_}; query::Interpreter interpreter_{&interpreter_context_}; + /** + * Execute the given query and commit the transaction. + * + * Return the query stream. + */ auto Interpret(const std::string &query, const std::map<std::string, PropertyValue> ¶ms = {}) { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); ResultStreamFaker<query::TypedValue> stream; - auto results = interpreter_(query, &query_dba, params, false, - utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + + auto [header, _] = interpreter_.Interpret(query, params); + stream.Header(header); + auto summary = interpreter_.PullAll(&stream); + stream.Summary(summary); + return stream; } }; @@ -208,21 +212,15 @@ TEST_F(InterpreterTest, Bfs) { dba.Commit(); } - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - ResultStreamFaker<query::TypedValue> stream; - auto results = interpreter_( + auto stream = Interpret( "MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and " - "e.reachable)]->(m) RETURN r", - &query_dba, {}, false, utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + "e.reachable)]->(m) RETURN r"); ASSERT_EQ(stream.GetHeader().size(), 1U); EXPECT_EQ(stream.GetHeader()[0], "r"); ASSERT_EQ(stream.GetResults().size(), 5 * kNumNodesPerLevel); + auto dba = db_.Access(); int expected_level = 1; int remaining_nodes_in_level = kNumNodesPerLevel; std::unordered_set<int64_t> matched_ids; @@ -254,43 +252,26 @@ TEST_F(InterpreterTest, Bfs) { TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) { ResultStreamFaker<query::TypedValue> stream; - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - ASSERT_THROW(interpreter_("CREATE INDEX ON :X(y)", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream), + interpreter_.Interpret("BEGIN", {}); + ASSERT_THROW(interpreter_.Interpret("CREATE INDEX ON :X(y)", {}), query::IndexInMulticommandTxException); + interpreter_.Interpret("ROLLBACK", {}); } // Test shortest path end to end. TEST_F(InterpreterTest, ShortestPath) { - { - ResultStreamFaker<query::TypedValue> stream; - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_( - "CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 " - "}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)", - &query_dba, {}, true, utils::NewDeleteResource()) - .PullAll(stream); + Interpret( + "CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 " + "}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)"); - dba.Commit(); - } - - ResultStreamFaker<query::TypedValue> stream; - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - auto results = - interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", - &query_dba, {}, false, utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + auto stream = + Interpret("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e"); ASSERT_EQ(stream.GetHeader().size(), 1U); EXPECT_EQ(stream.GetHeader()[0], "e"); ASSERT_EQ(stream.GetResults().size(), 3U); + auto dba = db_.Access(); std::vector<std::vector<std::string>> expected_results{ {"r1"}, {"r2"}, {"r1", "r2"}}; @@ -316,65 +297,13 @@ TEST_F(InterpreterTest, ShortestPath) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(InterpreterTest, UniqueConstraintTest) { - ResultStreamFaker<query::TypedValue> stream; - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;", - &query_dba, {}, true, utils::NewDeleteResource()) - .PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_("CREATE (:A{a:1, b:1})", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_("CREATE (:A{a:2, b:2})", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - ASSERT_THROW(interpreter_("CREATE (:A{a:1, b:1})", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream), - query::QueryRuntimeException); - dba.Commit(); - } - - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - interpreter_("CREATE (:A{a:2, b:2})", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - query::DbAccessor query_dba(&dba); - interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - interpreter_("CREATE (n:A{a:2, b:2})", &query_dba, {}, true, - utils::NewDeleteResource()) - .PullAll(stream); - dba.Commit(); - } + Interpret("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;"); + Interpret("CREATE (:A{a:1, b:1})"); + Interpret("CREATE (:A{a:2, b:2})"); + ASSERT_THROW(Interpret("CREATE (:A{a:1, b:1})"), + query::QueryRuntimeException); + Interpret("MATCH (n:A{a:2, b:2}) SET n.a=1"); + Interpret("CREATE (:A{a:2, b:2})"); + Interpret("MATCH (n:A{a:2, b:2}) DETACH DELETE n"); + Interpret("CREATE (n:A{a:2, b:2})"); } diff --git a/tests/unit/query_plan_edge_cases.cpp b/tests/unit/query_plan_edge_cases.cpp index ddc3eed93..69717e562 100644 --- a/tests/unit/query_plan_edge_cases.cpp +++ b/tests/unit/query_plan_edge_cases.cpp @@ -17,36 +17,34 @@ DECLARE_bool(query_cost_planner); class QueryExecution : public testing::Test { protected: std::optional<database::GraphDb> db_; - std::optional<database::GraphDbAccessor> dba_; + std::optional<query::InterpreterContext> interpreter_context_; + std::optional<query::Interpreter> interpreter_; void SetUp() { db_.emplace(); - dba_.emplace(db_->Access()); + interpreter_context_.emplace(&*db_); + interpreter_.emplace(&*interpreter_context_); } void TearDown() { - dba_ = std::nullopt; + interpreter_ = std::nullopt; + interpreter_context_ = std::nullopt; db_ = std::nullopt; } - /** Commits the current transaction and refreshes the dba_ - * variable to hold a new accessor with a new transaction */ - void Commit() { - dba_->Commit(); - dba_ = db_->Access(); - } - - /** Executes the query and returns the results. - * Does NOT commit the transaction */ + /** + * Execute the given query and commit the transaction. + * + * Return the query results. + */ auto Execute(const std::string &query) { - query::DbAccessor query_dba(&*dba_); ResultStreamFaker<query::TypedValue> stream; - query::InterpreterContext interpreter_context; - auto results = query::Interpreter(&interpreter_context)( - query, &query_dba, {}, false, utils::NewDeleteResource()); - stream.Header(results.header()); - results.PullAll(stream); - stream.Summary(results.summary()); + + auto [header, _] = interpreter_->Interpret(query, {}); + stream.Header(header); + auto summary = interpreter_->PullAll(&stream); + stream.Summary(summary); + return stream.GetResults(); } }; @@ -58,7 +56,6 @@ TEST_F(QueryExecution, MissingOptionalIntoExpand) { Execute( "CREATE (a:Person {id: 1}), (b:Person " "{id:2})-[:Has]->(:Dog)-[:Likes]->(:Food )"); - Commit(); ASSERT_EQ(Execute("MATCH (n) RETURN n").size(), 4); auto Exec = [this](bool desc, const std::string &edge_pattern) { @@ -90,7 +87,6 @@ TEST_F(QueryExecution, EdgeUniquenessInOptional) { // due to optonal match. Since edge-uniqueness only happens in one OPTIONAL // MATCH, we only need to check that scenario. Execute("CREATE (), ()-[:Type]->()"); - Commit(); ASSERT_EQ(Execute("MATCH (n) RETURN n").size(), 3); EXPECT_EQ(Execute("MATCH (n) OPTIONAL MATCH (n)-[r1]->(), (n)-[r2]->() " "RETURN n, r1, r2")