diff --git a/src/memgraph_init.cpp b/src/memgraph_init.cpp index 28284d5e0..ce3c54018 100644 --- a/src/memgraph_init.cpp +++ b/src/memgraph_init.cpp @@ -126,7 +126,9 @@ void KafkaStreamWriter( for (const auto &kv : params) params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second)); try { - (*session_data.interpreter)(query, dba, params_pv, false).PullAll(stream); + (*session_data.interpreter)(query, dba, params_pv, false, + utils::NewDeleteResource()) + .PullAll(stream); dba.Commit(); } catch (const utils::BasicException &e) { LOG(WARNING) << "[Kafka] query execution failed with an exception: " diff --git a/src/query/context.hpp b/src/query/context.hpp index d0dd49cbe..101f6fb8f 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -7,8 +7,6 @@ namespace query { -static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; - struct EvaluationContext { /// Memory for allocations during evaluation of a *single* Pull call. /// diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 1ee23236f..26c1a8e66 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -775,7 +775,7 @@ Interpreter::Interpreter() : is_tsc_available_(utils::CheckAvailableTSC()) {} Interpreter::Results Interpreter::operator()( const std::string &query_string, database::GraphDbAccessor &db_accessor, const std::map ¶ms, - bool in_explicit_transaction) { + bool in_explicit_transaction, utils::MemoryResource *execution_memory) { AstStorage ast_storage; Parameters parameters; std::map summary; @@ -832,7 +832,7 @@ Interpreter::Results Interpreter::operator()( } return Results(&db_accessor, parameters, plan, output_symbols, header, - summary, parsed_query.required_privileges); + summary, parsed_query.required_privileges, execution_memory); } if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) { @@ -902,7 +902,7 @@ Interpreter::Results Interpreter::operator()( std::vector header{query_plan_symbol.name()}; return Results(&db_accessor, parameters, plan, output_symbols, header, - summary, parsed_query.required_privileges); + summary, parsed_query.required_privileges, execution_memory); } if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) { @@ -983,7 +983,7 @@ Interpreter::Results Interpreter::operator()( summary["planning_time"] = planning_time.count(); return Results(&db_accessor, parameters, plan, output_symbols, header, - summary, parsed_query.required_privileges, + summary, parsed_query.required_privileges, execution_memory, /* is_profile_query */ true, /* should_abort_query */ true); } @@ -1005,7 +1005,7 @@ Interpreter::Results Interpreter::operator()( summary["planning_time"] = planning_timer.Elapsed().count(); return Results(&db_accessor, parameters, plan, output_symbols, header, - summary, parsed_query.required_privileges, + summary, parsed_query.required_privileges, execution_memory, /* is_profile_query */ false, /* should_abort_query */ false); #else @@ -1079,6 +1079,7 @@ Interpreter::Results Interpreter::operator()( return Results(&db_accessor, parameters, plan, output_symbols, callback.header, summary, parsed_query.required_privileges, + execution_memory, /* is_profile_query */ false, callback.should_abort_query); } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 23a5b84b5..553782b83 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -88,13 +88,12 @@ class Interpreter { std::vector output_symbols, std::vector header, std::map summary, std::vector privileges, + utils::MemoryResource *execution_memory, bool is_profile_query = false, bool should_abort_query = false) : ctx_{db_accessor}, plan_(plan), - execution_memory_(std::make_unique( - kExecutionMemoryBlockSize)), - cursor_(plan_->plan().MakeCursor(execution_memory_.get())), - frame_(plan_->symbol_table().max_position(), execution_memory_.get()), + cursor_(plan_->plan().MakeCursor(execution_memory)), + frame_(plan_->symbol_table().max_position(), execution_memory), output_symbols_(output_symbols), header_(header), summary_(summary), @@ -185,9 +184,6 @@ class Interpreter { private: ExecutionContext ctx_; std::shared_ptr plan_; - // execution_memory_ is unique_ptr, because we are passing the address to - // cursor_, and we want to preserve the pointer in case we get moved. - std::unique_ptr execution_memory_; query::plan::UniqueCursorPtr cursor_; Frame frame_; std::vector output_symbols_; @@ -217,7 +213,8 @@ class Interpreter { virtual Results operator()(const std::string &query, database::GraphDbAccessor &db_accessor, const std::map ¶ms, - bool in_explicit_transaction); + bool in_explicit_transaction, + utils::MemoryResource *execution_memory); auth::Auth *auth_ = nullptr; integrations::kafka::Streams *kafka_streams_ = nullptr; diff --git a/src/query/repl.cpp b/src/query/repl.cpp index 2a9ec0bc7..aa23a27e2 100644 --- a/src/query/repl.cpp +++ b/src/query/repl.cpp @@ -63,7 +63,8 @@ void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) { try { auto dba = db->Access(); ResultStreamFaker stream; - auto results = (*interpreter)(command, dba, {}, false); + auto results = + (*interpreter)(command, dba, {}, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); diff --git a/src/query/transaction_engine.hpp b/src/query/transaction_engine.hpp index 6d2ca0b06..26b5c42f7 100644 --- a/src/query/transaction_engine.hpp +++ b/src/query/transaction_engine.hpp @@ -5,14 +5,20 @@ #include "query/exceptions.hpp" #include "query/interpreter.hpp" #include "utils/likely.hpp" +#include "utils/memory.hpp" #include "utils/string.hpp" namespace query { +static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; + class TransactionEngine final { public: TransactionEngine(database::GraphDb *db, Interpreter *interpreter) - : db_(db), interpreter_(interpreter) {} + : db_(db), + interpreter_(interpreter), + execution_memory_(&initial_memory_block_[0], + kExecutionMemoryBlockSize) {} ~TransactionEngine() { Abort(); } @@ -21,6 +27,7 @@ class TransactionEngine final { const std::map ¶ms) { // Clear pending results. results_ = std::nullopt; + execution_memory_.Release(); // Check the query for transaction commands. auto query_upper = utils::Trim(utils::ToUpperCase(query)); @@ -67,10 +74,15 @@ class TransactionEngine final { // Create a DB accessor if we don't yet have one. if (!db_accessor_) db_accessor_.emplace(db_->Access()); + // Clear leftover results. + results_ = std::nullopt; + execution_memory_.Release(); + // Interpret the query and return the headers. try { results_.emplace((*interpreter_)(query, *db_accessor_, params, - in_explicit_transaction_)); + in_explicit_transaction_, + &execution_memory_)); return {results_->header(), results_->privileges()}; } catch (const utils::BasicException &) { AbortCommand(); @@ -112,6 +124,7 @@ class TransactionEngine final { void Abort() { results_ = std::nullopt; + execution_memory_.Release(); expect_rollback_ = false; in_explicit_transaction_ = false; if (!db_accessor_) return; @@ -131,8 +144,12 @@ class TransactionEngine final { bool in_explicit_transaction_{false}; bool expect_rollback_{false}; + uint8_t initial_memory_block_[kExecutionMemoryBlockSize]; + utils::MonotonicBufferResource execution_memory_; + void Commit() { results_ = std::nullopt; + execution_memory_.Release(); if (!db_accessor_) return; db_accessor_->Commit(); db_accessor_ = std::nullopt; @@ -140,12 +157,14 @@ class TransactionEngine final { void AdvanceCommand() { results_ = std::nullopt; + execution_memory_.Release(); if (!db_accessor_) return; db_accessor_->AdvanceCommand(); } void AbortCommand() { results_ = std::nullopt; + execution_memory_.Release(); if (in_explicit_transaction_) { expect_rollback_ = true; } else { diff --git a/tests/benchmark/expansion.cpp b/tests/benchmark/expansion.cpp index ee6d09f3f..9412786a1 100644 --- a/tests/benchmark/expansion.cpp +++ b/tests/benchmark/expansion.cpp @@ -46,7 +46,8 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) { auto dba = db_->Access(); while (state.KeepRunning()) { ResultStreamFaker results; - interpreter()(query, dba, {}, false).PullAll(results); + interpreter()(query, dba, {}, false, utils::NewDeleteResource()) + .PullAll(results); } } @@ -60,7 +61,8 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) { auto dba = db_->Access(); while (state.KeepRunning()) { ResultStreamFaker results; - interpreter()(query, dba, {}, false).PullAll(results); + interpreter()(query, dba, {}, false, utils::NewDeleteResource()) + .PullAll(results); } } diff --git a/tests/benchmark/query/eval.cpp b/tests/benchmark/query/eval.cpp index ed7368bcd..ff5e1dc2b 100644 --- a/tests/benchmark/query/eval.cpp +++ b/tests/benchmark/query/eval.cpp @@ -1,6 +1,7 @@ #include #include "query/interpret/eval.hpp" +#include "query/transaction_engine.hpp" // The following classes are wrappers for utils::MemoryResource, so that we can // use BENCHMARK_TEMPLATE diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index 9fe430ac8..f16899a8a 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -11,6 +11,7 @@ #include "query/frontend/semantic/symbol_generator.hpp" #include "query/interpreter.hpp" #include "query/plan/planner.hpp" +#include "query/transaction_engine.hpp" // The following classes are wrappers for utils::MemoryResource, so that we can // use BENCHMARK_TEMPLATE diff --git a/tests/manual/single_query.cpp b/tests/manual/single_query.cpp index 47de84e79..49a4f3398 100644 --- a/tests/manual/single_query.cpp +++ b/tests/manual/single_query.cpp @@ -14,7 +14,8 @@ int main(int argc, char *argv[]) { database::GraphDb db; auto dba = db.Access(); ResultStreamFaker stream; - auto results = query::Interpreter()(argv[1], dba, {}, false); + auto results = + query::Interpreter()(argv[1], dba, {}, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); diff --git a/tests/unit/database_dump.cpp b/tests/unit/database_dump.cpp index 63f8e0763..d15fda9d4 100644 --- a/tests/unit/database_dump.cpp +++ b/tests/unit/database_dump.cpp @@ -184,7 +184,8 @@ class DatabaseEnvironment { void Execute(GraphDbAccessor *dba, const std::string &query) { CHECK(dba); ResultStreamFaker results; - query::Interpreter()(query, *dba, {}, false).PullAll(results); + query::Interpreter()(query, *dba, {}, false, utils::NewDeleteResource()) + .PullAll(results); } VertexAccessor CreateVertex(GraphDbAccessor *dba, @@ -560,7 +561,8 @@ TEST(DumpTest, ExecuteDumpDatabase) { auto dba = db.Access(); const std::string query = "DUMP DATABASE"; ResultStreamFaker stream; - auto results = query::Interpreter()(query, dba, {}, false); + auto results = + query::Interpreter()(query, dba, {}, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); diff --git a/tests/unit/database_transaction_timeout.cpp b/tests/unit/database_transaction_timeout.cpp index bc9075514..f824f0ff9 100644 --- a/tests/unit/database_transaction_timeout.cpp +++ b/tests/unit/database_transaction_timeout.cpp @@ -13,7 +13,8 @@ TEST(TransactionTimeout, TransactionTimeout) { query::Interpreter interpreter; auto interpret = [&](auto &dba, const std::string &query) { ResultStreamFaker stream; - interpreter(query, dba, {}, false).PullAll(stream); + interpreter(query, dba, {}, false, utils::NewDeleteResource()) + .PullAll(stream); }; { auto dba = db.Access(); diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index f1df4169f..596f790dd 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -21,7 +21,8 @@ class InterpreterTest : public ::testing::Test { const std::map ¶ms = {}) { auto dba = db_.Access(); ResultStreamFaker stream; - auto results = interpreter_(query, dba, params, false); + auto results = + interpreter_(query, dba, params, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); @@ -203,7 +204,7 @@ TEST_F(InterpreterTest, Bfs) { auto results = interpreter_( "MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and " "e.reachable)]->(m) RETURN r", - dba, {}, false); + dba, {}, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); @@ -246,9 +247,10 @@ TEST_F(InterpreterTest, Bfs) { TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) { ResultStreamFaker stream; auto dba = db_.Access(); - ASSERT_THROW( - interpreter_("CREATE INDEX ON :X(y)", dba, {}, true).PullAll(stream), - query::IndexInMulticommandTxException); + ASSERT_THROW(interpreter_("CREATE INDEX ON :X(y)", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream), + query::IndexInMulticommandTxException); } // Test shortest path end to end. @@ -259,7 +261,7 @@ TEST_F(InterpreterTest, ShortestPath) { interpreter_( "CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 " "}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)", - dba, {}, true) + dba, {}, true, utils::NewDeleteResource()) .PullAll(stream); dba.Commit(); @@ -269,7 +271,7 @@ TEST_F(InterpreterTest, ShortestPath) { auto dba = db_.Access(); auto results = interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e", - dba, {}, false); + dba, {}, false, utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); @@ -307,44 +309,55 @@ TEST_F(InterpreterTest, UniqueConstraintTest) { { auto dba = db_.Access(); interpreter_("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;", dba, - {}, true) + {}, true, utils::NewDeleteResource()) .PullAll(stream); dba.Commit(); } { auto dba = db_.Access(); - interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true).PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true).PullAll(stream); - dba.Commit(); - } - - { - auto dba = db_.Access(); - ASSERT_THROW( - interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true).PullAll(stream), - query::QueryRuntimeException); - dba.Commit(); - } - - { - auto dba = db_.Access(); - interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", dba, {}, true) + interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true, + utils::NewDeleteResource()) .PullAll(stream); - interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true).PullAll(stream); dba.Commit(); } { auto dba = db_.Access(); - interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", dba, {}, true) + interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream); + dba.Commit(); + } + + { + auto dba = db_.Access(); + ASSERT_THROW(interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream), + query::QueryRuntimeException); + dba.Commit(); + } + + { + auto dba = db_.Access(); + interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream); + interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream); + dba.Commit(); + } + + { + auto dba = db_.Access(); + interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", dba, {}, true, + utils::NewDeleteResource()) + .PullAll(stream); + interpreter_("CREATE (n:A{a:2, b:2})", dba, {}, true, + utils::NewDeleteResource()) .PullAll(stream); - interpreter_("CREATE (n:A{a:2, b:2})", dba, {}, true).PullAll(stream); dba.Commit(); } } diff --git a/tests/unit/query_plan_edge_cases.cpp b/tests/unit/query_plan_edge_cases.cpp index 84c9e8927..d6953743b 100644 --- a/tests/unit/query_plan_edge_cases.cpp +++ b/tests/unit/query_plan_edge_cases.cpp @@ -40,7 +40,8 @@ class QueryExecution : public testing::Test { * Does NOT commit the transaction */ auto Execute(const std::string &query) { ResultStreamFaker stream; - auto results = query::Interpreter()(query, *dba_, {}, false); + auto results = query::Interpreter()(query, *dba_, {}, false, + utils::NewDeleteResource()); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary());