diff --git a/src/distributed/bfs_subcursor.cpp b/src/distributed/bfs_subcursor.cpp index efea3d29a..de3623ed1 100644 --- a/src/distributed/bfs_subcursor.cpp +++ b/src/distributed/bfs_subcursor.cpp @@ -53,7 +53,8 @@ void ExpandBfsSubcursor::PrepareForExpand( bool clear, std::vector frame) { if (clear) { Reset(); - frame_.elems() = std::move(frame); + frame_.elems().assign(std::make_move_iterator(frame.begin()), + std::make_move_iterator(frame.end())); } else { std::swap(to_visit_current_, to_visit_next_); to_visit_next_.clear(); diff --git a/src/query/distributed/plan/ops.cpp b/src/query/distributed/plan/ops.cpp index a3eb47880..b38750bf8 100644 --- a/src/query/distributed/plan/ops.cpp +++ b/src/query/distributed/plan/ops.cpp @@ -592,7 +592,7 @@ class SynchronizeCursor : public Cursor { // Accumulate local results while (input_cursor_->Pull(frame, context)) { // Copy the frame elements, because Pull may still use them. - local_frames_.emplace_back(frame.elems()); + local_frames_.emplace_back(frame.elems().begin(), frame.elems().end()); } // Wait for all workers to finish accumulation (first sync point). @@ -852,7 +852,8 @@ class DistributedExpandCursor : public query::plan::Cursor { LOG(FATAL) << "Must indicate exact expansion direction here"; }); future_expands_.emplace_back( - FutureExpand{utils::make_future(std::move(edge_to)), frame.elems()}); + FutureExpand{utils::make_future(std::move(edge_to)), + {frame.elems().begin(), frame.elems().end()}}); }; auto find_ready_future = [this]() { @@ -863,7 +864,8 @@ class DistributedExpandCursor : public query::plan::Cursor { auto put_future_edge_on_frame = [this, &frame](auto &future) { auto edge_to = future.edge_to.get(); - frame.elems() = future.frame_elems; + frame.elems().assign(future.frame_elems.begin(), + future.frame_elems.end()); frame[self_->common_.edge_symbol] = edge_to.first; frame[self_->common_.node_symbol] = edge_to.second; }; @@ -878,7 +880,8 @@ class DistributedExpandCursor : public query::plan::Cursor { if (future_it != future_expands_.end()) { // Backup the current frame (if we haven't done so already) before // putting the future edge. - if (last_frame_.empty()) last_frame_ = frame.elems(); + if (last_frame_.empty()) + last_frame_.assign(frame.elems().begin(), frame.elems().end()); put_future_edge_on_frame(*future_it); // Erase the future and return true to yield the result. future_expands_.erase(future_it); @@ -888,7 +891,7 @@ class DistributedExpandCursor : public query::plan::Cursor { // In case we have replaced the frame with the one for a future edge, // restore it. if (!last_frame_.empty()) { - frame.elems() = last_frame_; + frame.elems().assign(last_frame_.begin(), last_frame_.end()); last_frame_.clear(); } // attempt to get a value from the incoming edges @@ -1204,8 +1207,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { VLOG(10) << "Starting BFS from " << vertex << " with limits " << lower_bound_ << ".." << upper_bound_; - bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true, - frame.elems()); + bfs_subcursor_clients_->PrepareForExpand( + subcursor_ids_, true, {frame.elems().begin(), frame.elems().end()}); bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress()); current_depth_ = 1; } diff --git a/src/query/interpret/frame.hpp b/src/query/interpret/frame.hpp index 218916392..eab69898c 100644 --- a/src/query/interpret/frame.hpp +++ b/src/query/interpret/frame.hpp @@ -2,14 +2,26 @@ #include +#include + #include "query/frontend/semantic/symbol_table.hpp" #include "query/typed_value.hpp" +#include "utils/memory.hpp" namespace query { class Frame { public: - explicit Frame(int size) : size_(size), elems_(size_) {} + /// Create a Frame of given size backed by a utils::NewDeleteResource() + explicit Frame(int64_t size) + : size_(size), elems_(size_, utils::NewDeleteResource()) { + CHECK(size >= 0); + } + + Frame(int64_t size, utils::MemoryResource *memory) + : size_(size), elems_(size_, memory) { + CHECK(size >= 0); + } TypedValue &operator[](const Symbol &symbol) { return elems_[symbol.position()]; @@ -25,9 +37,13 @@ class Frame { auto &elems() { return elems_; } + utils::MemoryResource *GetMemoryResource() const { + return elems_.get_allocator().GetMemoryResource(); + } + private: - int size_; - std::vector elems_; + int64_t size_; + utils::AVector elems_; }; } // namespace query diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 34fa4319f..14c4e18ca 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -97,7 +97,7 @@ class Interpreter { kExecutionMemoryBlockSize)), cursor_( plan_->plan().MakeCursor(db_accessor, execution_memory_.get())), - frame_(plan_->symbol_table().max_position()), + frame_(plan_->symbol_table().max_position(), execution_memory_.get()), output_symbols_(output_symbols), header_(header), summary_(summary), diff --git a/tests/benchmark/query/eval.cpp b/tests/benchmark/query/eval.cpp index 971327d44..ed7368bcd 100644 --- a/tests/benchmark/query/eval.cpp +++ b/tests/benchmark/query/eval.cpp @@ -22,8 +22,8 @@ template static void MapLiteral(benchmark::State &state) { query::AstStorage ast; query::SymbolTable symbol_table; - query::Frame frame(symbol_table.max_position()); TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); database::GraphDb db; auto dba = db.Access(); std::unordered_map elements; @@ -56,8 +56,8 @@ template static void AdditionOperator(benchmark::State &state) { query::AstStorage ast; query::SymbolTable symbol_table; - query::Frame frame(symbol_table.max_position()); TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); database::GraphDb db; auto dba = db.Access(); query::Expression *expr = ast.Create(0); diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index 5661edaab..e324a1f01 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -103,7 +103,6 @@ static void Distinct(benchmark::State &state) { auto plan_and_cost = query::plan::MakeLogicalPlan(&context, parameters, false); ResultStreamFaker results; - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -111,6 +110,7 @@ static void Distinct(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } @@ -153,7 +153,6 @@ static void ExpandVariable(benchmark::State &state) { auto expand_variable = MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -161,6 +160,7 @@ static void ExpandVariable(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); @@ -189,7 +189,6 @@ static void ExpandBfs(benchmark::State &state) { auto expand_variable = MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -197,6 +196,7 @@ static void ExpandBfs(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); @@ -227,7 +227,6 @@ static void ExpandShortest(benchmark::State &state) { expand_variable.common_.existing_node = true; auto dest_symbol = expand_variable.common_.node_symbol; auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -235,6 +234,7 @@ static void ExpandShortest(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); @@ -272,7 +272,6 @@ static void ExpandWeightedShortest(benchmark::State &state) { ast.Create(1)}; auto dest_symbol = expand_variable.common_.node_symbol; auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -280,6 +279,7 @@ static void ExpandWeightedShortest(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); @@ -318,7 +318,6 @@ static void Accumulate(benchmark::State &state) { query::plan::Accumulate accumulate(scan_all, symbols, /* advance_command= */ false); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -326,6 +325,7 @@ static void Accumulate(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = accumulate.MakeCursor(&dba, memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } @@ -367,7 +367,6 @@ static void Aggregate(benchmark::State &state) { } query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -375,6 +374,7 @@ static void Aggregate(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = aggregate.MakeCursor(&dba, memory.get()); frame[symbols.front()] = query::TypedValue(0); // initial group_by value while (cursor->Pull(frame, execution_context)) { @@ -417,7 +417,6 @@ static void OrderBy(benchmark::State &state) { } query::plan::OrderBy order_by(scan_all, sort_items, symbols); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -425,6 +424,7 @@ static void OrderBy(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); auto cursor = order_by.MakeCursor(&dba, memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } @@ -454,9 +454,6 @@ static void Unwind(benchmark::State &state) { auto out_sym = symbol_table.CreateSymbol("out", false); query::plan::Unwind unwind(scan_all, list_expr, out_sym); auto dba = db.Access(); - query::Frame frame(symbol_table.max_position()); - frame[list_sym] = - query::TypedValue(std::vector(state.range(1))); // We need to only set the memory for temporary (per pull) evaluations TMemory per_pull_memory; query::EvaluationContext evaluation_context{per_pull_memory.get()}; @@ -464,6 +461,9 @@ static void Unwind(benchmark::State &state) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; + query::Frame frame(symbol_table.max_position(), memory.get()); + frame[list_sym] = + query::TypedValue(std::vector(state.range(1))); auto cursor = unwind.MakeCursor(&dba, memory.get()); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); }