Use execution memory for Frame allocations

Summary:
Micro benchmarks show some minor variations compared to the previous
commit. Smaller cases are a bit worse while larger data cases are a bit
better.

Reviewers: mtomic, mferencevic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2136
This commit is contained in:
Teon Banek 2019-06-17 13:39:40 +02:00
parent 6086257204
commit 5adb4aafe0
6 changed files with 45 additions and 25 deletions

View File

@ -53,7 +53,8 @@ void ExpandBfsSubcursor::PrepareForExpand(
bool clear, std::vector<query::TypedValue> frame) { bool clear, std::vector<query::TypedValue> frame) {
if (clear) { if (clear) {
Reset(); Reset();
frame_.elems() = std::move(frame); frame_.elems().assign(std::make_move_iterator(frame.begin()),
std::make_move_iterator(frame.end()));
} else { } else {
std::swap(to_visit_current_, to_visit_next_); std::swap(to_visit_current_, to_visit_next_);
to_visit_next_.clear(); to_visit_next_.clear();

View File

@ -592,7 +592,7 @@ class SynchronizeCursor : public Cursor {
// Accumulate local results // Accumulate local results
while (input_cursor_->Pull(frame, context)) { while (input_cursor_->Pull(frame, context)) {
// Copy the frame elements, because Pull may still use them. // Copy the frame elements, because Pull may still use them.
local_frames_.emplace_back(frame.elems()); local_frames_.emplace_back(frame.elems().begin(), frame.elems().end());
} }
// Wait for all workers to finish accumulation (first sync point). // Wait for all workers to finish accumulation (first sync point).
@ -852,7 +852,8 @@ class DistributedExpandCursor : public query::plan::Cursor {
LOG(FATAL) << "Must indicate exact expansion direction here"; LOG(FATAL) << "Must indicate exact expansion direction here";
}); });
future_expands_.emplace_back( future_expands_.emplace_back(
FutureExpand{utils::make_future(std::move(edge_to)), frame.elems()}); FutureExpand{utils::make_future(std::move(edge_to)),
{frame.elems().begin(), frame.elems().end()}});
}; };
auto find_ready_future = [this]() { auto find_ready_future = [this]() {
@ -863,7 +864,8 @@ class DistributedExpandCursor : public query::plan::Cursor {
auto put_future_edge_on_frame = [this, &frame](auto &future) { auto put_future_edge_on_frame = [this, &frame](auto &future) {
auto edge_to = future.edge_to.get(); auto edge_to = future.edge_to.get();
frame.elems() = future.frame_elems; frame.elems().assign(future.frame_elems.begin(),
future.frame_elems.end());
frame[self_->common_.edge_symbol] = edge_to.first; frame[self_->common_.edge_symbol] = edge_to.first;
frame[self_->common_.node_symbol] = edge_to.second; frame[self_->common_.node_symbol] = edge_to.second;
}; };
@ -878,7 +880,8 @@ class DistributedExpandCursor : public query::plan::Cursor {
if (future_it != future_expands_.end()) { if (future_it != future_expands_.end()) {
// Backup the current frame (if we haven't done so already) before // Backup the current frame (if we haven't done so already) before
// putting the future edge. // putting the future edge.
if (last_frame_.empty()) last_frame_ = frame.elems(); if (last_frame_.empty())
last_frame_.assign(frame.elems().begin(), frame.elems().end());
put_future_edge_on_frame(*future_it); put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result. // Erase the future and return true to yield the result.
future_expands_.erase(future_it); future_expands_.erase(future_it);
@ -888,7 +891,7 @@ class DistributedExpandCursor : public query::plan::Cursor {
// In case we have replaced the frame with the one for a future edge, // In case we have replaced the frame with the one for a future edge,
// restore it. // restore it.
if (!last_frame_.empty()) { if (!last_frame_.empty()) {
frame.elems() = last_frame_; frame.elems().assign(last_frame_.begin(), last_frame_.end());
last_frame_.clear(); last_frame_.clear();
} }
// attempt to get a value from the incoming edges // attempt to get a value from the incoming edges
@ -1204,8 +1207,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
VLOG(10) << "Starting BFS from " << vertex << " with limits " VLOG(10) << "Starting BFS from " << vertex << " with limits "
<< lower_bound_ << ".." << upper_bound_; << lower_bound_ << ".." << upper_bound_;
bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true, bfs_subcursor_clients_->PrepareForExpand(
frame.elems()); subcursor_ids_, true, {frame.elems().begin(), frame.elems().end()});
bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress()); bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
current_depth_ = 1; current_depth_ = 1;
} }

View File

@ -2,14 +2,26 @@
#include <vector> #include <vector>
#include <glog/logging.h>
#include "query/frontend/semantic/symbol_table.hpp" #include "query/frontend/semantic/symbol_table.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "utils/memory.hpp"
namespace query { namespace query {
class Frame { class Frame {
public: public:
explicit Frame(int size) : size_(size), elems_(size_) {} /// Create a Frame of given size backed by a utils::NewDeleteResource()
explicit Frame(int64_t size)
: size_(size), elems_(size_, utils::NewDeleteResource()) {
CHECK(size >= 0);
}
Frame(int64_t size, utils::MemoryResource *memory)
: size_(size), elems_(size_, memory) {
CHECK(size >= 0);
}
TypedValue &operator[](const Symbol &symbol) { TypedValue &operator[](const Symbol &symbol) {
return elems_[symbol.position()]; return elems_[symbol.position()];
@ -25,9 +37,13 @@ class Frame {
auto &elems() { return elems_; } auto &elems() { return elems_; }
utils::MemoryResource *GetMemoryResource() const {
return elems_.get_allocator().GetMemoryResource();
}
private: private:
int size_; int64_t size_;
std::vector<TypedValue> elems_; utils::AVector<TypedValue> elems_;
}; };
} // namespace query } // namespace query

View File

@ -97,7 +97,7 @@ class Interpreter {
kExecutionMemoryBlockSize)), kExecutionMemoryBlockSize)),
cursor_( cursor_(
plan_->plan().MakeCursor(db_accessor, execution_memory_.get())), plan_->plan().MakeCursor(db_accessor, execution_memory_.get())),
frame_(plan_->symbol_table().max_position()), frame_(plan_->symbol_table().max_position(), execution_memory_.get()),
output_symbols_(output_symbols), output_symbols_(output_symbols),
header_(header), header_(header),
summary_(summary), summary_(summary),

View File

@ -22,8 +22,8 @@ template <class TMemory>
static void MapLiteral(benchmark::State &state) { static void MapLiteral(benchmark::State &state) {
query::AstStorage ast; query::AstStorage ast;
query::SymbolTable symbol_table; query::SymbolTable symbol_table;
query::Frame frame(symbol_table.max_position());
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
database::GraphDb db; database::GraphDb db;
auto dba = db.Access(); auto dba = db.Access();
std::unordered_map<query::PropertyIx, query::Expression *> elements; std::unordered_map<query::PropertyIx, query::Expression *> elements;
@ -56,8 +56,8 @@ template <class TMemory>
static void AdditionOperator(benchmark::State &state) { static void AdditionOperator(benchmark::State &state) {
query::AstStorage ast; query::AstStorage ast;
query::SymbolTable symbol_table; query::SymbolTable symbol_table;
query::Frame frame(symbol_table.max_position());
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
database::GraphDb db; database::GraphDb db;
auto dba = db.Access(); auto dba = db.Access();
query::Expression *expr = ast.Create<query::PrimitiveLiteral>(0); query::Expression *expr = ast.Create<query::PrimitiveLiteral>(0);

View File

@ -103,7 +103,6 @@ static void Distinct(benchmark::State &state) {
auto plan_and_cost = auto plan_and_cost =
query::plan::MakeLogicalPlan(&context, parameters, false); query::plan::MakeLogicalPlan(&context, parameters, false);
ResultStreamFaker<query::TypedValue> results; ResultStreamFaker<query::TypedValue> results;
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -111,6 +110,7 @@ static void Distinct(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get()); auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
} }
@ -153,7 +153,6 @@ static void ExpandVariable(benchmark::State &state) {
auto expand_variable = auto expand_variable =
MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table); MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -161,6 +160,7 @@ static void ExpandVariable(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = expand_variable.MakeCursor(&dba, memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v); frame[expand_variable.input_symbol_] = query::TypedValue(v);
@ -189,7 +189,6 @@ static void ExpandBfs(benchmark::State &state) {
auto expand_variable = auto expand_variable =
MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -197,6 +196,7 @@ static void ExpandBfs(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = expand_variable.MakeCursor(&dba, memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v); frame[expand_variable.input_symbol_] = query::TypedValue(v);
@ -227,7 +227,6 @@ static void ExpandShortest(benchmark::State &state) {
expand_variable.common_.existing_node = true; expand_variable.common_.existing_node = true;
auto dest_symbol = expand_variable.common_.node_symbol; auto dest_symbol = expand_variable.common_.node_symbol;
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -235,6 +234,7 @@ static void ExpandShortest(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = expand_variable.MakeCursor(&dba, memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v); frame[expand_variable.input_symbol_] = query::TypedValue(v);
@ -272,7 +272,6 @@ static void ExpandWeightedShortest(benchmark::State &state) {
ast.Create<query::PrimitiveLiteral>(1)}; ast.Create<query::PrimitiveLiteral>(1)};
auto dest_symbol = expand_variable.common_.node_symbol; auto dest_symbol = expand_variable.common_.node_symbol;
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -280,6 +279,7 @@ static void ExpandWeightedShortest(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = expand_variable.MakeCursor(&dba, memory.get()); auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v); frame[expand_variable.input_symbol_] = query::TypedValue(v);
@ -318,7 +318,6 @@ static void Accumulate(benchmark::State &state) {
query::plan::Accumulate accumulate(scan_all, symbols, query::plan::Accumulate accumulate(scan_all, symbols,
/* advance_command= */ false); /* advance_command= */ false);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -326,6 +325,7 @@ static void Accumulate(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = accumulate.MakeCursor(&dba, memory.get()); auto cursor = accumulate.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
} }
@ -367,7 +367,6 @@ static void Aggregate(benchmark::State &state) {
} }
query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols); query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -375,6 +374,7 @@ static void Aggregate(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = aggregate.MakeCursor(&dba, memory.get()); auto cursor = aggregate.MakeCursor(&dba, memory.get());
frame[symbols.front()] = query::TypedValue(0); // initial group_by value frame[symbols.front()] = query::TypedValue(0); // initial group_by value
while (cursor->Pull(frame, execution_context)) { while (cursor->Pull(frame, execution_context)) {
@ -417,7 +417,6 @@ static void OrderBy(benchmark::State &state) {
} }
query::plan::OrderBy order_by(scan_all, sort_items, symbols); query::plan::OrderBy order_by(scan_all, sort_items, symbols);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -425,6 +424,7 @@ static void OrderBy(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
auto cursor = order_by.MakeCursor(&dba, memory.get()); auto cursor = order_by.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
} }
@ -454,9 +454,6 @@ static void Unwind(benchmark::State &state) {
auto out_sym = symbol_table.CreateSymbol("out", false); auto out_sym = symbol_table.CreateSymbol("out", false);
query::plan::Unwind unwind(scan_all, list_expr, out_sym); query::plan::Unwind unwind(scan_all, list_expr, out_sym);
auto dba = db.Access(); auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
frame[list_sym] =
query::TypedValue(std::vector<query::TypedValue>(state.range(1)));
// We need to only set the memory for temporary (per pull) evaluations // We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory; TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()}; query::EvaluationContext evaluation_context{per_pull_memory.get()};
@ -464,6 +461,9 @@ static void Unwind(benchmark::State &state) {
query::ExecutionContext execution_context{&dba, symbol_table, query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context}; evaluation_context};
TMemory memory; TMemory memory;
query::Frame frame(symbol_table.max_position(), memory.get());
frame[list_sym] =
query::TypedValue(std::vector<query::TypedValue>(state.range(1)));
auto cursor = unwind.MakeCursor(&dba, memory.get()); auto cursor = unwind.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
} }