diff --git a/src/distributed/produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp index 2a74fa3fb..f1c0a7d61 100644 --- a/src/distributed/produce_rpc_server.cpp +++ b/src/distributed/produce_rpc_server.cpp @@ -18,7 +18,9 @@ ProduceRpcServer::OngoingProduce::OngoingProduce( context_{dba_.get()}, pull_symbols_(std::move(pull_symbols)), frame_(plan_pack.symbol_table.max_position()), - cursor_(plan_pack.plan->MakeCursor(*dba_)) { + execution_memory_(std::make_unique<utils::MonotonicBufferResource>( + query::kExecutionMemoryBlockSize)), + cursor_(plan_pack.plan->MakeCursor(dba_.get(), execution_memory_.get())) { context_.symbol_table = plan_pack.symbol_table; context_.evaluation_context.timestamp = timestamp; context_.evaluation_context.parameters = parameters; diff --git a/src/distributed/produce_rpc_server.hpp b/src/distributed/produce_rpc_server.hpp index c046bac9f..cc991950a 100644 --- a/src/distributed/produce_rpc_server.hpp +++ b/src/distributed/produce_rpc_server.hpp @@ -65,6 +65,9 @@ class ProduceRpcServer { query::Frame frame_; PullState cursor_state_{PullState::CURSOR_IN_PROGRESS}; std::vector<std::vector<query::TypedValue>> accumulation_; + // execution_memory_ is unique_ptr because we are passing the address to + // cursor_, and we want to preserve the pointer in case we get moved. + std::unique_ptr<utils::MonotonicBufferResource> execution_memory_; std::unique_ptr<query::plan::Cursor> cursor_; /// Pulls and returns a single result from the cursor. diff --git a/src/query/context.hpp b/src/query/context.hpp index 755699f24..bb119e5cc 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -7,6 +7,8 @@ namespace query { +static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; + struct EvaluationContext { int64_t timestamp{-1}; Parameters parameters; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 7c4684292..1f55a0195 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -871,8 +871,9 @@ Interpreter::Results Interpreter::operator()( auto output_plan = std::make_unique<plan::OutputTable>( output_symbols, [cypher_query_plan](Frame *frame, ExecutionContext *context) { - auto cursor = - cypher_query_plan->plan().MakeCursor(*context->db_accessor); + utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024); + auto cursor = cypher_query_plan->plan().MakeCursor( + context->db_accessor, &execution_memory); // We are pulling from another plan, so set up the EvaluationContext // correctly. The rest of the context should be good for sharing. diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 00200f2be..67c927931 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -93,7 +93,10 @@ class Interpreter { bool is_profile_query = false, bool should_abort_query = false) : ctx_{db_accessor}, plan_(plan), - cursor_(plan_->plan().MakeCursor(*db_accessor)), + execution_memory_(std::make_unique<utils::MonotonicBufferResource>( + kExecutionMemoryBlockSize)), + cursor_( + plan_->plan().MakeCursor(db_accessor, execution_memory_.get())), frame_(plan_->symbol_table().max_position()), output_symbols_(output_symbols), header_(header), @@ -176,6 +179,9 @@ class Interpreter { private: ExecutionContext ctx_; std::shared_ptr<CachedPlan> plan_; + // execution_memory_ is unique_ptr because we are passing the address to + // cursor_, and we want to preserve the pointer in case we get moved. + std::unique_ptr<utils::MonotonicBufferResource> execution_memory_; std::unique_ptr<query::plan::Cursor> cursor_; Frame frame_; std::vector<Symbol> output_symbols_; diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 2bfdbab44..09245f381 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -145,10 +145,6 @@ can serve as inputs to others and thus a sequence of operations is formed.") virtual std::unique_ptr<Cursor> MakeCursor( database::GraphDbAccessor *, utils::MemoryResource *) const = 0; - std::unique_ptr<Cursor> MakeCursor(database::GraphDbAccessor &dba) const { - return MakeCursor(&dba, utils::NewDeleteResource()); - } - /** Return @c Symbol vector where the query results will be stored. * * Currently, output symbols are generated in @c Produce and @c Union diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index b0fc61d5b..911ac69e8 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -52,7 +52,8 @@ static void DistinctDefaultAllocator(benchmark::State &state) { while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; - auto cursor = plan_and_cost.first->MakeCursor(dba); + auto cursor = + plan_and_cost.first->MakeCursor(&dba, utils::NewDeleteResource()); while (cursor->Pull(frame, execution_context)) ; } diff --git a/tests/unit/bfs_common.hpp b/tests/unit/bfs_common.hpp index c06d459f8..e780c53a7 100644 --- a/tests/unit/bfs_common.hpp +++ b/tests/unit/bfs_common.hpp @@ -173,7 +173,8 @@ class Yield : public query::plan::LogicalOperator { std::vector<std::vector<query::TypedValue>> PullResults( query::plan::LogicalOperator *last_op, query::ExecutionContext *context, std::vector<query::Symbol> output_symbols) { - auto cursor = last_op->MakeCursor(*context->db_accessor); + auto cursor = + last_op->MakeCursor(context->db_accessor, utils::NewDeleteResource()); std::vector<std::vector<query::TypedValue>> output; { query::Frame frame(context->symbol_table.max_position()); diff --git a/tests/unit/distributed_reset.cpp b/tests/unit/distributed_reset.cpp index e6df8efea..56c3c55fc 100644 --- a/tests/unit/distributed_reset.cpp +++ b/tests/unit/distributed_reset.cpp @@ -21,7 +21,7 @@ TEST_F(DistributedReset, ResetTest) { query::Frame frame(0); query::ExecutionContext context{dba.get()}; auto pull_remote_cursor = - pull_remote->query::plan::LogicalOperator::MakeCursor(*dba); + pull_remote->MakeCursor(dba.get(), utils::NewDeleteResource()); for (int i = 0; i < 3; ++i) { EXPECT_TRUE(pull_remote_cursor->Pull(frame, context)); diff --git a/tests/unit/query_plan_common.hpp b/tests/unit/query_plan_common.hpp index fa16e6e19..a4de396cf 100644 --- a/tests/unit/query_plan_common.hpp +++ b/tests/unit/query_plan_common.hpp @@ -43,7 +43,7 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce, // stream out results auto cursor = - produce.query::plan::LogicalOperator::MakeCursor(*context->db_accessor); + produce.MakeCursor(context->db_accessor, utils::NewDeleteResource()); std::vector<std::vector<TypedValue>> results; while (cursor->Pull(frame, *context)) { std::vector<TypedValue> values; @@ -56,7 +56,8 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce, int PullAll(const LogicalOperator &logical_op, ExecutionContext *context) { Frame frame(context->symbol_table.max_position()); - auto cursor = logical_op.MakeCursor(*context->db_accessor); + auto cursor = + logical_op.MakeCursor(context->db_accessor, utils::NewDeleteResource()); int count = 0; while (cursor->Pull(frame, *context)) count++; return count; diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index 95ffe3136..0b069c64d 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -280,8 +280,8 @@ TEST(QueryPlan, Delete) { n.op_, std::vector<Expression *>{n_get}, true); Frame frame(symbol_table.max_position()); auto context = MakeContext(storage, symbol_table, &dba); - delete_op->query::plan::LogicalOperator::MakeCursor(dba)->Pull(frame, - context); + delete_op->MakeCursor(&dba, utils::NewDeleteResource()) + ->Pull(frame, context); dba.AdvanceCommand(); EXPECT_EQ(3, CountIterable(dba.Vertices(false))); EXPECT_EQ(3, CountIterable(dba.Edges(false))); diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index 2d845f213..610894eb0 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -586,7 +586,7 @@ class QueryPlanExpandVariable : public testing::Test { template <typename TResult> auto GetResults(std::shared_ptr<LogicalOperator> input_op, Symbol symbol) { Frame frame(symbol_table.max_position()); - auto cursor = input_op->MakeCursor(dba_); + auto cursor = input_op->MakeCursor(&dba_, utils::NewDeleteResource()); auto context = MakeContext(storage, symbol_table, &dba_); std::vector<TResult> results; while (cursor->Pull(frame, context)) @@ -884,7 +884,7 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test { total_weight); Frame frame(symbol_table.max_position()); - auto cursor = last_op->MakeCursor(dba); + auto cursor = last_op->MakeCursor(&dba, utils::NewDeleteResource()); std::vector<ResultType> results; auto context = MakeContext(storage, symbol_table, &dba); while (cursor->Pull(frame, context)) {