Use MemoryResource in AccumulateCursor
Summary: Microbenchmarks show that using MonotonicBufferResource improves performance by a factor of 1.5.

Reviewers: mtomic, mferencevic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2048
parent 78830b6ed8
commit 1cc71c6ce8
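The gist of the change: AccumulateCursor's row cache now allocates through the utils::MemoryResource passed to MakeCursor, so when that resource is a MonotonicBufferResource the cached rows come out of a single arena instead of individual heap allocations. As a rough illustration of why that helps, here is a minimal standalone sketch using the standard library's std::pmr types (an analogue of utils::MonotonicBufferResource and utils::Allocator, not Memgraph's own API; sizes and names are illustrative only):

// Standalone sketch (C++17, std::pmr): cache rows in a monotonic arena.
#include <memory_resource>
#include <vector>

using Row = std::pmr::vector<int>;  // stand-in for a row of TypedValue

int main() {
  // One arena per execution; everything is released at once when it dies.
  std::pmr::monotonic_buffer_resource arena(1024 * 1024);

  // The cache and every row it owns draw from the same resource, so filling
  // it performs (almost) no per-row calls into the general-purpose heap.
  std::pmr::vector<Row> cache(&arena);
  for (int i = 0; i < 1000; ++i) {
    Row row(cache.get_allocator().resource());  // propagate the resource
    row.assign({i, i + 1, i + 2});
    cache.push_back(std::move(row));
  }
  return cache.size() == 1000 ? 0 : 1;
}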
@@ -2275,57 +2275,74 @@ Accumulate::Accumulate(const std::shared_ptr<LogicalOperator> &input,
 
 ACCEPT_WITH_INPUT(Accumulate)
 
-UniqueCursorPtr Accumulate::MakeCursor(database::GraphDbAccessor *db,
-                                       utils::MemoryResource *mem) const {
-  return MakeUniqueCursorPtr<Accumulate::AccumulateCursor>(mem, *this, db, mem);
-}
-
 std::vector<Symbol> Accumulate::ModifiedSymbols(const SymbolTable &) const {
   return symbols_;
 }
 
-Accumulate::AccumulateCursor::AccumulateCursor(const Accumulate &self,
-                                               database::GraphDbAccessor *db,
-                                               utils::MemoryResource *mem)
-    : self_(self), db_(*db), input_cursor_(self.input_->MakeCursor(db, mem)) {}
-
-bool Accumulate::AccumulateCursor::Pull(Frame &frame,
-                                        ExecutionContext &context) {
-  SCOPED_PROFILE_OP("Accumulate");
-
-  // cache all the input
-  if (!pulled_all_input_) {
-    while (input_cursor_->Pull(frame, context)) {
-      std::vector<TypedValue> row;
-      row.reserve(self_.symbols_.size());
-      for (const Symbol &symbol : self_.symbols_)
-        row.emplace_back(frame[symbol]);
-      cache_.emplace_back(std::move(row));
-    }
-    pulled_all_input_ = true;
-    cache_it_ = cache_.begin();
-
-    if (self_.advance_command_) {
-      db_.AdvanceCommand();
-      for (auto &row : cache_)
-        for (auto &col : row) query::ReconstructTypedValue(col);
-    }
-  }
-
-  if (db_.should_abort()) throw HintedAbortError();
-  if (cache_it_ == cache_.end()) return false;
-  auto row_it = (cache_it_++)->begin();
-  for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++;
-  return true;
-}
-
-void Accumulate::AccumulateCursor::Shutdown() { input_cursor_->Shutdown(); }
-
-void Accumulate::AccumulateCursor::Reset() {
-  input_cursor_->Reset();
-  cache_.clear();
-  cache_it_ = cache_.begin();
-  pulled_all_input_ = false;
-}
+class AccumulateCursor : public Cursor {
+ public:
+  AccumulateCursor(const Accumulate &self, database::GraphDbAccessor *db,
+                   utils::MemoryResource *mem)
+      : self_(self),
+        db_(*db),
+        input_cursor_(self.input_->MakeCursor(db, mem)),
+        cache_(mem) {}
+
+  bool Pull(Frame &frame, ExecutionContext &context) override {
+    SCOPED_PROFILE_OP("Accumulate");
+
+    // cache all the input
+    if (!pulled_all_input_) {
+      while (input_cursor_->Pull(frame, context)) {
+        std::vector<TypedValue, utils::Allocator<TypedValue>> row(
+            cache_.get_allocator().GetMemoryResource());
+        row.reserve(self_.symbols_.size());
+        for (const Symbol &symbol : self_.symbols_)
+          row.emplace_back(frame[symbol]);
+        cache_.emplace_back(std::move(row));
+      }
+      pulled_all_input_ = true;
+      cache_it_ = cache_.begin();
+
+      if (self_.advance_command_) {
+        db_.AdvanceCommand();
+        for (auto &row : cache_)
+          for (auto &col : row) query::ReconstructTypedValue(col);
+      }
+    }
+
+    if (db_.should_abort()) throw HintedAbortError();
+    if (cache_it_ == cache_.end()) return false;
+    auto row_it = (cache_it_++)->begin();
+    for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++;
+    return true;
+  }
+
+  void Shutdown() override { input_cursor_->Shutdown(); }
+
+  void Reset() override {
+    input_cursor_->Reset();
+    cache_.clear();
+    cache_it_ = cache_.begin();
+    pulled_all_input_ = false;
+  }
+
+ private:
+  const Accumulate &self_;
+  database::GraphDbAccessor &db_;
+  const UniqueCursorPtr input_cursor_;
+  std::vector<
+      std::vector<TypedValue, utils::Allocator<TypedValue>>,
+      utils::Allocator<std::vector<TypedValue, utils::Allocator<TypedValue>>>>
+      cache_;
+  decltype(cache_.begin()) cache_it_ = cache_.begin();
+  bool pulled_all_input_{false};
+};
+
+UniqueCursorPtr Accumulate::MakeCursor(database::GraphDbAccessor *db,
+                                       utils::MemoryResource *mem) const {
+  return MakeUniqueCursorPtr<AccumulateCursor>(mem, *this, db, mem);
+}
 
 Aggregate::Aggregate(const std::shared_ptr<LogicalOperator> &input,
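Note how MakeCursor forwards mem twice above: once to MakeUniqueCursorPtr, which allocates the cursor object itself from the resource, and once to the constructor so the cursor's cache_ can allocate from it as well. A hedged sketch of what such a make-from-resource helper can look like, written against std::pmr rather than Memgraph's utils types (MakeUniquePmrPtr and PmrDelete are hypothetical names, not the actual MakeUniqueCursorPtr implementation):

// Hypothetical helper: construct T inside a memory_resource and return a
// unique_ptr whose deleter destroys and deallocates through that resource.
#include <memory>
#include <memory_resource>
#include <new>
#include <utility>

template <class T>
struct PmrDelete {
  std::pmr::memory_resource *mem;
  void operator()(T *ptr) const {
    ptr->~T();
    mem->deallocate(ptr, sizeof(T), alignof(T));
  }
};

template <class T, class... Args>
std::unique_ptr<T, PmrDelete<T>> MakeUniquePmrPtr(
    std::pmr::memory_resource *mem, Args &&... args) {
  void *raw = mem->allocate(sizeof(T), alignof(T));
  // Placement-new into the arena; a production version would guard against
  // the constructor throwing and deallocate in that case.
  T *obj = new (raw) T(std::forward<Args>(args)...);
  return std::unique_ptr<T, PmrDelete<T>>(obj, PmrDelete<T>{mem});
}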
@@ -1538,25 +1538,6 @@ has been cached will be reconstructed before Pull returns.
     input_ = input;
   }
   cpp<#)
-  (:private
-   #>cpp
-   class AccumulateCursor : public Cursor {
-    public:
-     AccumulateCursor(const Accumulate &, database::GraphDbAccessor *,
-                      utils::MemoryResource *);
-     bool Pull(Frame &, ExecutionContext &) override;
-     void Shutdown() override;
-     void Reset() override;
-
-    private:
-     const Accumulate &self_;
-     database::GraphDbAccessor &db_;
-     const UniqueCursorPtr input_cursor_;
-     std::vector<std::vector<TypedValue>> cache_;
-     decltype(cache_.begin()) cache_it_ = cache_.begin();
-     bool pulled_all_input_{false};
-   };
-   cpp<#)
   (:serialize (:slk))
   (:clone))
@@ -12,6 +12,21 @@
 #include "query/interpreter.hpp"
 #include "query/plan/planner.hpp"
 
+// The following classes are wrappers for utils::MemoryResource, so that we can
+// use BENCHMARK_TEMPLATE
+
+class MonotonicBufferResource final {
+  utils::MonotonicBufferResource memory_{query::kExecutionMemoryBlockSize};
+
+ public:
+  utils::MemoryResource *get() { return &memory_; }
+};
+
+class NewDeleteResource final {
+ public:
+  utils::MemoryResource *get() { return utils::NewDeleteResource(); }
+};
+
 static void AddVertices(database::GraphDb *db, int vertex_count) {
   auto dba = db->Access();
   for (int i = 0; i < vertex_count; i++) dba.InsertVertex();
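The wrapper classes added above exist because BENCHMARK_TEMPLATE instantiates one benchmark per type argument, so the same benchmark body can run once against NewDeleteResource and once against MonotonicBufferResource. A small self-contained illustration of that idiom, using Google Benchmark with std::pmr stand-ins rather than the Memgraph benchmark itself (FillRows and the wrapper names are hypothetical):

// Standalone illustration of parameterizing a benchmark over an allocator.
#include <benchmark/benchmark.h>

#include <memory_resource>
#include <vector>

// Each wrapper exposes get(), so the benchmark body never names the concrete
// resource; only the BENCHMARK_TEMPLATE argument changes.
class MonotonicResource final {
  std::pmr::monotonic_buffer_resource memory_{1 << 20};

 public:
  std::pmr::memory_resource *get() { return &memory_; }
};

class DefaultResource final {
 public:
  std::pmr::memory_resource *get() { return std::pmr::new_delete_resource(); }
};

template <class TMemory>
static void FillRows(benchmark::State &state) {
  for (auto _ : state) {
    TMemory memory;  // fresh resource per iteration, mirroring the patch
    std::pmr::vector<std::pmr::vector<int>> cache(memory.get());
    for (int64_t i = 0; i < state.range(0); ++i) {
      std::pmr::vector<int> row(cache.get_allocator().resource());
      row.assign({1, 2, 3});
      cache.push_back(std::move(row));
    }
    benchmark::DoNotOptimize(cache.data());
  }
  state.SetItemsProcessed(state.iterations());
}

BENCHMARK_TEMPLATE(FillRows, DefaultResource)->Range(1 << 10, 1 << 16);
BENCHMARK_TEMPLATE(FillRows, MonotonicResource)->Range(1 << 10, 1 << 16);

BENCHMARK_MAIN();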
@@ -68,8 +83,9 @@ static query::CypherQuery *ParseCypherQuery(const std::string &query_string,
   return utils::Downcast<query::CypherQuery>(parsed_query.query);
 };
 
+template <class TMemory>
 // NOLINTNEXTLINE(google-runtime-references)
-static void DistinctDefaultAllocator(benchmark::State &state) {
+static void Distinct(benchmark::State &state) {
   query::AstStorage ast;
   query::Parameters parameters;
   database::GraphDb db;
@@ -89,48 +105,19 @@ static void DistinctDefaultAllocator(benchmark::State &state) {
   while (state.KeepRunning()) {
     query::ExecutionContext execution_context{&dba, symbol_table,
                                                evaluation_context};
-    auto cursor =
-        plan_and_cost.first->MakeCursor(&dba, utils::NewDeleteResource());
+    TMemory memory;
+    auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get());
     while (cursor->Pull(frame, execution_context))
       ;
   }
   state.SetItemsProcessed(state.iterations());
 }
 
-BENCHMARK(DistinctDefaultAllocator)
+BENCHMARK_TEMPLATE(Distinct, NewDeleteResource)
     ->Range(1024, 1U << 21U)
     ->Unit(benchmark::kMicrosecond);
 
-// NOLINTNEXTLINE(google-runtime-references)
-static void DistinctLinearAllocator(benchmark::State &state) {
-  query::AstStorage ast;
-  query::Parameters parameters;
-  database::GraphDb db;
-  AddVertices(&db, state.range(0));
-  auto dba = db.Access();
-  auto query_string = "MATCH (s) RETURN DISTINCT s";
-  auto *cypher_query = ParseCypherQuery(query_string, &ast);
-  auto symbol_table = query::MakeSymbolTable(cypher_query);
-  auto context =
-      query::plan::MakePlanningContext(&ast, &symbol_table, cypher_query, &dba);
-  auto plan_and_cost =
-      query::plan::MakeLogicalPlan(&context, parameters, false);
-  ResultStreamFaker<query::TypedValue> results;
-  query::Frame frame(symbol_table.max_position());
-  // Nothing should be used from the EvaluationContext, so leave it empty.
-  query::EvaluationContext evaluation_context;
-  while (state.KeepRunning()) {
-    query::ExecutionContext execution_context{&dba, symbol_table,
-                                               evaluation_context};
-    utils::MonotonicBufferResource memory(1 * 1024 * 1024);
-    auto cursor = plan_and_cost.first->MakeCursor(&dba, &memory);
-    while (cursor->Pull(frame, execution_context))
-      ;
-  }
-  state.SetItemsProcessed(state.iterations());
-}
-
-BENCHMARK(DistinctLinearAllocator)
+BENCHMARK_TEMPLATE(Distinct, MonotonicBufferResource)
     ->Range(1024, 1U << 21U)
     ->Unit(benchmark::kMicrosecond);
 
@@ -151,8 +138,9 @@ static query::plan::ExpandVariable MakeExpandVariable(
       filter_lambda, std::nullopt, std::nullopt);
 }
 
+template <class TMemory>
 // NOLINTNEXTLINE(google-runtime-references)
-static void ExpandVariableDefaultAllocator(benchmark::State &state) {
+static void ExpandVariable(benchmark::State &state) {
   query::AstStorage ast;
   query::Parameters parameters;
   database::GraphDb db;
@@ -167,7 +155,8 @@ static void ExpandVariableDefaultAllocator(benchmark::State &state) {
   while (state.KeepRunning()) {
     query::ExecutionContext execution_context{&dba, symbol_table,
                                                evaluation_context};
-    auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource());
+    TMemory memory;
+    auto cursor = expand_variable.MakeCursor(&dba, memory.get());
     for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
       frame[expand_variable.input_symbol_] = v;
       while (cursor->Pull(frame, execution_context))
@@ -177,43 +166,17 @@ static void ExpandVariableDefaultAllocator(benchmark::State &state) {
   state.SetItemsProcessed(state.iterations());
 }
 
-BENCHMARK(ExpandVariableDefaultAllocator)
+BENCHMARK_TEMPLATE(ExpandVariable, NewDeleteResource)
     ->Ranges({{1, 1U << 5U}, {512, 1U << 13U}})
     ->Unit(benchmark::kMicrosecond);
 
-// NOLINTNEXTLINE(google-runtime-references)
-static void ExpandVariableLinearAllocator(benchmark::State &state) {
-  query::AstStorage ast;
-  query::Parameters parameters;
-  database::GraphDb db;
-  AddStarGraph(&db, state.range(0), state.range(1));
-  query::SymbolTable symbol_table;
-  auto expand_variable =
-      MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table);
-  auto dba = db.Access();
-  query::Frame frame(symbol_table.max_position());
-  // Nothing should be used from the EvaluationContext, so leave it empty.
-  query::EvaluationContext evaluation_context;
-  while (state.KeepRunning()) {
-    query::ExecutionContext execution_context{&dba, symbol_table,
-                                               evaluation_context};
-    utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize);
-    auto cursor = expand_variable.MakeCursor(&dba, &memory);
-    for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
-      frame[expand_variable.input_symbol_] = v;
-      while (cursor->Pull(frame, execution_context))
-        ;
-    }
-  }
-  state.SetItemsProcessed(state.iterations());
-}
-
-BENCHMARK(ExpandVariableLinearAllocator)
+BENCHMARK_TEMPLATE(ExpandVariable, MonotonicBufferResource)
     ->Ranges({{1, 1U << 5U}, {512, 1U << 13U}})
     ->Unit(benchmark::kMicrosecond);
 
+template <class TMemory>
 // NOLINTNEXTLINE(google-runtime-references)
-static void ExpandBfsDefaultAllocator(benchmark::State &state) {
+static void ExpandBfs(benchmark::State &state) {
   query::AstStorage ast;
   query::Parameters parameters;
   database::GraphDb db;
@@ -228,7 +191,8 @@ static void ExpandBfsDefaultAllocator(benchmark::State &state) {
   while (state.KeepRunning()) {
     query::ExecutionContext execution_context{&dba, symbol_table,
                                                evaluation_context};
-    auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource());
+    TMemory memory;
+    auto cursor = expand_variable.MakeCursor(&dba, memory.get());
     for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
       frame[expand_variable.input_symbol_] = v;
       while (cursor->Pull(frame, execution_context))
@@ -238,43 +202,17 @@ static void ExpandBfsDefaultAllocator(benchmark::State &state) {
   state.SetItemsProcessed(state.iterations());
 }
 
-BENCHMARK(ExpandBfsDefaultAllocator)
+BENCHMARK_TEMPLATE(ExpandBfs, NewDeleteResource)
     ->Range(512, 1U << 19U)
     ->Unit(benchmark::kMicrosecond);
 
-// NOLINTNEXTLINE(google-runtime-references)
-static void ExpandBfsLinearAllocator(benchmark::State &state) {
-  query::AstStorage ast;
-  query::Parameters parameters;
-  database::GraphDb db;
-  AddTree(&db, state.range(0));
-  query::SymbolTable symbol_table;
-  auto expand_variable =
-      MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table);
-  auto dba = db.Access();
-  query::Frame frame(symbol_table.max_position());
-  // Nothing should be used from the EvaluationContext, so leave it empty.
-  query::EvaluationContext evaluation_context;
-  while (state.KeepRunning()) {
-    query::ExecutionContext execution_context{&dba, symbol_table,
-                                               evaluation_context};
-    utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize);
-    auto cursor = expand_variable.MakeCursor(&dba, &memory);
-    for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
-      frame[expand_variable.input_symbol_] = v;
-      while (cursor->Pull(frame, execution_context))
-        ;
-    }
-  }
-  state.SetItemsProcessed(state.iterations());
-}
-
-BENCHMARK(ExpandBfsLinearAllocator)
+BENCHMARK_TEMPLATE(ExpandBfs, MonotonicBufferResource)
    ->Range(512, 1U << 19U)
    ->Unit(benchmark::kMicrosecond);
 
+template <class TMemory>
 // NOLINTNEXTLINE(google-runtime-references)
-static void ExpandShortestDefaultAllocator(benchmark::State &state) {
+static void ExpandShortest(benchmark::State &state) {
   query::AstStorage ast;
   query::Parameters parameters;
   database::GraphDb db;
@@ -291,7 +229,8 @@ static void ExpandShortestDefaultAllocator(benchmark::State &state) {
   while (state.KeepRunning()) {
     query::ExecutionContext execution_context{&dba, symbol_table,
                                                evaluation_context};
-    auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource());
+    TMemory memory;
+    auto cursor = expand_variable.MakeCursor(&dba, memory.get());
     for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
       frame[expand_variable.input_symbol_] = v;
       for (const auto &dest : dba.Vertices(false)) {
@@ -304,21 +243,31 @@ static void ExpandShortestDefaultAllocator(benchmark::State &state) {
   state.SetItemsProcessed(state.iterations());
 }
 
-BENCHMARK(ExpandShortestDefaultAllocator)
+BENCHMARK_TEMPLATE(ExpandShortest, NewDeleteResource)
     ->Range(512, 1U << 20U)
     ->Unit(benchmark::kMicrosecond);
 
+BENCHMARK_TEMPLATE(ExpandShortest, MonotonicBufferResource)
+    ->Range(512, 1U << 20U)
+    ->Unit(benchmark::kMicrosecond);
+
+template <class TMemory>
 // NOLINTNEXTLINE(google-runtime-references)
-static void ExpandShortestLinearAllocator(benchmark::State &state) {
+static void Accumulate(benchmark::State &state) {
   query::AstStorage ast;
   query::Parameters parameters;
   database::GraphDb db;
-  AddTree(&db, state.range(0));
+  AddVertices(&db, state.range(1));
   query::SymbolTable symbol_table;
-  auto expand_variable =
-      MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table);
-  expand_variable.common_.existing_node = true;
-  auto dest_symbol = expand_variable.common_.node_symbol;
+  auto scan_all = std::make_shared<query::plan::ScanAll>(
+      nullptr, symbol_table.CreateSymbol("v", false));
+  std::vector<query::Symbol> symbols;
+  symbols.reserve(state.range(0));
+  for (int i = 0; i < state.range(0); ++i) {
+    symbols.push_back(symbol_table.CreateSymbol(std::to_string(i), false));
+  }
+  query::plan::Accumulate accumulate(scan_all, symbols,
+                                     /* advance_command= */ false);
   auto dba = db.Access();
   query::Frame frame(symbol_table.max_position());
   // Nothing should be used from the EvaluationContext, so leave it empty.
@@ -326,22 +275,20 @@ static void ExpandShortestLinearAllocator(benchmark::State &state) {
   while (state.KeepRunning()) {
     query::ExecutionContext execution_context{&dba, symbol_table,
                                                evaluation_context};
-    utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize);
-    auto cursor = expand_variable.MakeCursor(&dba, &memory);
-    for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
-      frame[expand_variable.input_symbol_] = v;
-      for (const auto &dest : dba.Vertices(false)) {
-        frame[dest_symbol] = dest;
-        while (cursor->Pull(frame, execution_context))
-          ;
-      }
-    }
+    TMemory memory;
+    auto cursor = accumulate.MakeCursor(&dba, memory.get());
+    while (cursor->Pull(frame, execution_context))
+      ;
   }
   state.SetItemsProcessed(state.iterations());
 }
 
-BENCHMARK(ExpandShortestLinearAllocator)
-    ->Range(512, 1U << 20U)
+BENCHMARK_TEMPLATE(Accumulate, NewDeleteResource)
+    ->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
     ->Unit(benchmark::kMicrosecond);
 
+BENCHMARK_TEMPLATE(Accumulate, MonotonicBufferResource)
+    ->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
+    ->Unit(benchmark::kMicrosecond);
+
 BENCHMARK_MAIN();