diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index bf751c012..2065d9916 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -931,7 +931,9 @@ class STShortestPathCursor : public query::plan::Cursor { STShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor *dba, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input()->MakeCursor(dba, mem)) { + : mem_(mem), + self_(self), + input_cursor_(self_.input()->MakeCursor(dba, mem)) { CHECK(self_.common_.existing_node) << "s-t shortest path algorithm should only " "be used when `existing_node` flag is " @@ -981,11 +983,15 @@ class STShortestPathCursor : public query::plan::Cursor { void Reset() override { input_cursor_->Reset(); } private: + utils::MemoryResource *mem_; const ExpandVariable &self_; UniqueCursorPtr input_cursor_; - using VertexEdgeMapT = - std::unordered_map>; + using VertexEdgeMapT = std::unordered_map< + VertexAccessor, std::optional, std::hash, + std::equal_to<>, + utils::Allocator< + std::pair>>>; void ReconstructPath(const VertexAccessor &midpoint, const VertexEdgeMapT &in_edge, @@ -1041,18 +1047,22 @@ class STShortestPathCursor : public query::plan::Cursor { // Holds vertices at the current level of expansion from the source // (sink). - std::vector source_frontier; - std::vector sink_frontier; + std::vector> + source_frontier(mem_); + std::vector> sink_frontier( + mem_); // Holds vertices we can expand to from `source_frontier` // (`sink_frontier`). - std::vector source_next; - std::vector sink_next; + std::vector> source_next( + mem_); + std::vector> sink_next( + mem_); // Maps each vertex we visited expanding from the source (sink) to the // edge used. Necessary for path reconstruction. - VertexEdgeMapT in_edge; - VertexEdgeMapT out_edge; + VertexEdgeMapT in_edge(mem_); + VertexEdgeMapT out_edge(mem_); size_t current_length = 0; @@ -1164,7 +1174,11 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { SingleSourceShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input()->MakeCursor(db, mem)) { + : self_(self), + input_cursor_(self_.input()->MakeCursor(db, mem)), + processed_(mem), + to_visit_current_(mem), + to_visit_next_(mem) { CHECK(!self_.common_.existing_node) << "Single source shortest path algorithm " "should not be used when `existing_node` " @@ -1313,10 +1327,18 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { // maps vertices to the edge they got expanded from. it is an optional // edge because the root does not get expanded from anything. // contains visited vertices as well as those scheduled to be visited. - std::unordered_map> processed_; + std::unordered_map, + std::hash, std::equal_to<>, + utils::Allocator>>> + processed_; // edge/vertex pairs we have yet to visit, for current and next depth - std::vector> to_visit_current_; - std::vector> to_visit_next_; + std::vector, + utils::Allocator>> + to_visit_current_; + std::vector, + utils::Allocator>> + to_visit_next_; }; class ExpandWeightedShortestPathCursor : public query::plan::Cursor { diff --git a/src/utils/algorithm.hpp b/src/utils/algorithm.hpp index a5063513b..2ecce1c99 100644 --- a/src/utils/algorithm.hpp +++ b/src/utils/algorithm.hpp @@ -112,15 +112,18 @@ inline TVal First(TIterable &&iterable, TVal &&empty_value) { return empty_value; } -template -inline bool Contains(const std::unordered_set &iterable, - const TElement &element) { +template +bool Contains( + const std::unordered_set &iterable, + const TElement &element) { return iterable.find(element) != iterable.end(); } -template -inline bool Contains(const std::unordered_map &iterable, - const TKey &key) { +template +bool Contains(const std::unordered_map &iterable, + const TKey &key) { return iterable.find(key) != iterable.end(); } diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index a942b976a..adced9e13 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -34,6 +35,25 @@ static void AddStarGraph(database::GraphDb *db, int spoke_count, int depth) { dba.Commit(); } +static void AddTree(database::GraphDb *db, int vertex_count) { + auto dba = db->Access(); + std::vector vertices; + vertices.reserve(vertex_count); + auto root = dba.InsertVertex(); + root.add_label(dba.Label(kStartLabel)); + vertices.push_back(root); + // NOLINTNEXTLINE(cert-msc32-c,cert-msc51-cpp) + std::mt19937_64 rg(42); + for (int i = 1; i < vertex_count; ++i) { + auto v = dba.InsertVertex(); + std::uniform_int_distribution<> dis(0U, vertices.size() - 1U); + auto &parent = vertices.at(dis(rg)); + dba.InsertEdge(parent, v, dba.EdgeType("Type")); + vertices.push_back(v); + } + dba.Commit(); +} + static query::CypherQuery *ParseCypherQuery(const std::string &query_string, query::AstStorage *ast) { query::frontend::ParsingContext parsing_context; @@ -114,6 +134,23 @@ BENCHMARK(DistinctLinearAllocator) ->Range(1024, 1U << 21U) ->Unit(benchmark::kMicrosecond); +static query::plan::ExpandVariable MakeExpandVariable( + query::EdgeAtom::Type expand_type, query::SymbolTable *symbol_table) { + auto input_symbol = symbol_table->CreateSymbol("input", false); + auto dest_symbol = symbol_table->CreateSymbol("dest", false); + auto edge_symbol = symbol_table->CreateSymbol("edge", false); + auto lambda_node_symbol = symbol_table->CreateSymbol("n", false); + auto lambda_edge_symbol = symbol_table->CreateSymbol("e", false); + query::plan::ExpansionLambda filter_lambda; + filter_lambda.inner_node_symbol = lambda_node_symbol; + filter_lambda.inner_edge_symbol = lambda_edge_symbol; + filter_lambda.expression = nullptr; + return query::plan::ExpandVariable( + nullptr, input_symbol, dest_symbol, edge_symbol, expand_type, + query::EdgeAtom::Direction::OUT, {}, false, nullptr, nullptr, false, + filter_lambda, std::nullopt, std::nullopt); +} + // NOLINTNEXTLINE(google-runtime-references) static void ExpandVariableDefaultAllocator(benchmark::State &state) { query::AstStorage ast; @@ -121,22 +158,10 @@ static void ExpandVariableDefaultAllocator(benchmark::State &state) { database::GraphDb db; AddStarGraph(&db, state.range(0), state.range(1)); query::SymbolTable symbol_table; - auto input_symbol = symbol_table.CreateSymbol("input", false); - auto dest_symbol = symbol_table.CreateSymbol("dest", false); - auto edge_symbol = symbol_table.CreateSymbol("edge", false); - auto lambda_node_symbol = symbol_table.CreateSymbol("n", false); - auto lambda_edge_symbol = symbol_table.CreateSymbol("e", false); + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - query::plan::ExpansionLambda filter_lambda; - filter_lambda.inner_node_symbol = lambda_node_symbol; - filter_lambda.inner_edge_symbol = lambda_edge_symbol; - filter_lambda.expression = nullptr; - query::plan::ExpandVariable expand_variable( - nullptr, input_symbol, dest_symbol, edge_symbol, - query::EdgeAtom::Type::DEPTH_FIRST, query::EdgeAtom::Direction::OUT, {}, - false, nullptr, nullptr, false, filter_lambda, std::nullopt, - std::nullopt); // Nothing should be used from the EvaluationContext, so leave it empty. query::EvaluationContext evaluation_context; while (state.KeepRunning()) { @@ -144,7 +169,7 @@ static void ExpandVariableDefaultAllocator(benchmark::State &state) { evaluation_context}; auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { - frame[input_symbol] = v; + frame[expand_variable.input_symbol_] = v; while (cursor->Pull(frame, execution_context)) ; } @@ -153,8 +178,8 @@ static void ExpandVariableDefaultAllocator(benchmark::State &state) { } BENCHMARK(ExpandVariableDefaultAllocator) - ->Ranges({{1, 1U << 7U}, {512, 1U << 13U}}) - ->Unit(benchmark::kMillisecond); + ->Ranges({{1, 1U << 5U}, {512, 1U << 13U}}) + ->Unit(benchmark::kMicrosecond); // NOLINTNEXTLINE(google-runtime-references) static void ExpandVariableLinearAllocator(benchmark::State &state) { @@ -163,22 +188,10 @@ static void ExpandVariableLinearAllocator(benchmark::State &state) { database::GraphDb db; AddStarGraph(&db, state.range(0), state.range(1)); query::SymbolTable symbol_table; - auto input_symbol = symbol_table.CreateSymbol("input", false); - auto dest_symbol = symbol_table.CreateSymbol("dest", false); - auto edge_symbol = symbol_table.CreateSymbol("edge", false); - auto lambda_node_symbol = symbol_table.CreateSymbol("n", false); - auto lambda_edge_symbol = symbol_table.CreateSymbol("e", false); + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - query::plan::ExpansionLambda filter_lambda; - filter_lambda.inner_node_symbol = lambda_node_symbol; - filter_lambda.inner_edge_symbol = lambda_edge_symbol; - filter_lambda.expression = nullptr; - query::plan::ExpandVariable expand_variable( - nullptr, input_symbol, dest_symbol, edge_symbol, - query::EdgeAtom::Type::DEPTH_FIRST, query::EdgeAtom::Direction::OUT, {}, - false, nullptr, nullptr, false, filter_lambda, std::nullopt, - std::nullopt); // Nothing should be used from the EvaluationContext, so leave it empty. query::EvaluationContext evaluation_context; while (state.KeepRunning()) { @@ -187,7 +200,7 @@ static void ExpandVariableLinearAllocator(benchmark::State &state) { utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize); auto cursor = expand_variable.MakeCursor(&dba, &memory); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { - frame[input_symbol] = v; + frame[expand_variable.input_symbol_] = v; while (cursor->Pull(frame, execution_context)) ; } @@ -196,7 +209,139 @@ static void ExpandVariableLinearAllocator(benchmark::State &state) { } BENCHMARK(ExpandVariableLinearAllocator) - ->Ranges({{1, 1U << 7U}, {512, 1U << 13U}}) - ->Unit(benchmark::kMillisecond); + ->Ranges({{1, 1U << 5U}, {512, 1U << 13U}}) + ->Unit(benchmark::kMicrosecond); + +// NOLINTNEXTLINE(google-runtime-references) +static void ExpandBfsDefaultAllocator(benchmark::State &state) { + query::AstStorage ast; + query::Parameters parameters; + database::GraphDb db; + AddTree(&db, state.range(0)); + query::SymbolTable symbol_table; + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); + auto dba = db.Access(); + query::Frame frame(symbol_table.max_position()); + // Nothing should be used from the EvaluationContext, so leave it empty. + query::EvaluationContext evaluation_context; + while (state.KeepRunning()) { + query::ExecutionContext execution_context{&dba, symbol_table, + evaluation_context}; + auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource()); + for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { + frame[expand_variable.input_symbol_] = v; + while (cursor->Pull(frame, execution_context)) + ; + } + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK(ExpandBfsDefaultAllocator) + ->Range(512, 1U << 19U) + ->Unit(benchmark::kMicrosecond); + +// NOLINTNEXTLINE(google-runtime-references) +static void ExpandBfsLinearAllocator(benchmark::State &state) { + query::AstStorage ast; + query::Parameters parameters; + database::GraphDb db; + AddTree(&db, state.range(0)); + query::SymbolTable symbol_table; + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); + auto dba = db.Access(); + query::Frame frame(symbol_table.max_position()); + // Nothing should be used from the EvaluationContext, so leave it empty. + query::EvaluationContext evaluation_context; + while (state.KeepRunning()) { + query::ExecutionContext execution_context{&dba, symbol_table, + evaluation_context}; + utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize); + auto cursor = expand_variable.MakeCursor(&dba, &memory); + for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { + frame[expand_variable.input_symbol_] = v; + while (cursor->Pull(frame, execution_context)) + ; + } + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK(ExpandBfsLinearAllocator) + ->Range(512, 1U << 19U) + ->Unit(benchmark::kMicrosecond); + +// NOLINTNEXTLINE(google-runtime-references) +static void ExpandShortestDefaultAllocator(benchmark::State &state) { + query::AstStorage ast; + query::Parameters parameters; + database::GraphDb db; + AddTree(&db, state.range(0)); + query::SymbolTable symbol_table; + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); + expand_variable.common_.existing_node = true; + auto dest_symbol = expand_variable.common_.node_symbol; + auto dba = db.Access(); + query::Frame frame(symbol_table.max_position()); + // Nothing should be used from the EvaluationContext, so leave it empty. + query::EvaluationContext evaluation_context; + while (state.KeepRunning()) { + query::ExecutionContext execution_context{&dba, symbol_table, + evaluation_context}; + auto cursor = expand_variable.MakeCursor(&dba, utils::NewDeleteResource()); + for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { + frame[expand_variable.input_symbol_] = v; + for (const auto &dest : dba.Vertices(false)) { + frame[dest_symbol] = dest; + while (cursor->Pull(frame, execution_context)) + ; + } + } + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK(ExpandShortestDefaultAllocator) + ->Range(512, 1U << 20U) + ->Unit(benchmark::kMicrosecond); + +// NOLINTNEXTLINE(google-runtime-references) +static void ExpandShortestLinearAllocator(benchmark::State &state) { + query::AstStorage ast; + query::Parameters parameters; + database::GraphDb db; + AddTree(&db, state.range(0)); + query::SymbolTable symbol_table; + auto expand_variable = + MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); + expand_variable.common_.existing_node = true; + auto dest_symbol = expand_variable.common_.node_symbol; + auto dba = db.Access(); + query::Frame frame(symbol_table.max_position()); + // Nothing should be used from the EvaluationContext, so leave it empty. + query::EvaluationContext evaluation_context; + while (state.KeepRunning()) { + query::ExecutionContext execution_context{&dba, symbol_table, + evaluation_context}; + utils::MonotonicBufferResource memory(query::kExecutionMemoryBlockSize); + auto cursor = expand_variable.MakeCursor(&dba, &memory); + for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { + frame[expand_variable.input_symbol_] = v; + for (const auto &dest : dba.Vertices(false)) { + frame[dest_symbol] = dest; + while (cursor->Pull(frame, execution_context)) + ; + } + } + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK(ExpandShortestLinearAllocator) + ->Range(512, 1U << 20U) + ->Unit(benchmark::kMicrosecond); BENCHMARK_MAIN();