Improve all shortest paths memory usage (#981)
* Change allocator to PoolResource
This commit is contained in:
parent
8ebab84324
commit
285b409927
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
|
@ -793,15 +793,15 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
|
||||
const auto &element_symbol = symbol_table_->at(*extract.identifier_);
|
||||
TypedValue::TVector result(ctx_->memory);
|
||||
result.reserve(list.size());
|
||||
for (const auto &element : list) {
|
||||
for (auto &element : list) {
|
||||
if (element.IsNull()) {
|
||||
result.emplace_back();
|
||||
} else {
|
||||
frame_->at(element_symbol) = element;
|
||||
frame_->at(element_symbol) = std::move(element);
|
||||
result.emplace_back(extract.expression_->Accept(*this));
|
||||
}
|
||||
}
|
||||
return TypedValue(result, ctx_->memory);
|
||||
return TypedValue(std::move(result), ctx_->memory);
|
||||
}
|
||||
|
||||
TypedValue Visit(Exists &exists) override { return TypedValue{frame_->at(symbol_table_->at(exists)), ctx_->memory}; }
|
||||
|
@ -165,6 +165,27 @@ std::optional<std::string> GetOptionalStringValue(query::Expression *expression,
|
||||
return {};
|
||||
};
|
||||
|
||||
bool IsAllShortestPathsQuery(const std::vector<memgraph::query::Clause *> &clauses) {
|
||||
for (const auto &clause : clauses) {
|
||||
if (clause->GetTypeInfo() != Match::kType) {
|
||||
continue;
|
||||
}
|
||||
auto *match_clause = utils::Downcast<Match>(clause);
|
||||
for (const auto &pattern : match_clause->patterns_) {
|
||||
for (const auto &atom : pattern->atoms_) {
|
||||
if (atom->GetTypeInfo() != EdgeAtom::kType) {
|
||||
continue;
|
||||
}
|
||||
auto *edge_atom = utils::Downcast<EdgeAtom>(atom);
|
||||
if (edge_atom->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
public:
|
||||
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
|
||||
@ -1420,8 +1441,13 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
||||
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
|
||||
contains_csv = true;
|
||||
}
|
||||
|
||||
// If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory
|
||||
auto use_monotonic_memory = !contains_csv && !IsCallBatchedProcedureQuery(clauses);
|
||||
auto use_monotonic_memory =
|
||||
!contains_csv && !IsCallBatchedProcedureQuery(clauses) && !IsAllShortestPathsQuery(clauses);
|
||||
spdlog::trace("PrepareCypher has {} encountered all shortest paths and will {} use of monotonic memory",
|
||||
IsAllShortestPathsQuery(clauses) ? "" : "not", use_monotonic_memory ? "" : "not");
|
||||
|
||||
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
|
||||
parsed_query.parameters,
|
||||
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba);
|
||||
@ -1554,8 +1580,11 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
|
||||
contains_csv = true;
|
||||
}
|
||||
// If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory
|
||||
auto use_monotonic_memory = !contains_csv && !IsCallBatchedProcedureQuery(clauses);
|
||||
|
||||
// If this is LOAD CSV, BatchedProcedure or AllShortest query, use PoolResource without MonotonicMemoryResource as we
|
||||
// want to reuse allocated memory
|
||||
auto use_monotonic_memory =
|
||||
!contains_csv && !IsCallBatchedProcedureQuery(clauses) && !IsAllShortestPathsQuery(clauses);
|
||||
|
||||
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE");
|
||||
Frame frame(0);
|
||||
@ -2408,8 +2437,7 @@ Callback SwitchMemoryDevice(storage::StorageMode current_mode, storage::StorageM
|
||||
throw utils::BasicException(
|
||||
"You cannot switch from the on-disk storage mode to an in-memory storage mode while the database is running. "
|
||||
"To make the switch, delete the data directory and restart the database. Once restarted, Memgraph will "
|
||||
"automatically "
|
||||
"start in the default in-memory transactional storage mode.");
|
||||
"automatically start in the default in-memory transactional storage mode.");
|
||||
}
|
||||
if (SwitchingFromInMemoryToDisk(current_mode, requested_mode)) {
|
||||
std::unique_lock main_guard{interpreter_context->db->main_lock_};
|
||||
@ -3186,14 +3214,16 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
cypher_query = profile_query->cypher_query_;
|
||||
}
|
||||
if (const auto &clauses = cypher_query->single_query_->clauses_;
|
||||
IsCallBatchedProcedureQuery(clauses) || std::any_of(clauses.begin(), clauses.end(), [](const auto *clause) {
|
||||
return clause->GetTypeInfo() == LoadCsv::kType;
|
||||
})) {
|
||||
IsAllShortestPathsQuery(clauses) || IsCallBatchedProcedureQuery(clauses) ||
|
||||
std::any_of(clauses.begin(), clauses.end(),
|
||||
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
|
||||
// Using PoolResource without MonotonicMemoryResouce for LOAD CSV reduces memory usage.
|
||||
// QueryExecution MemoryResource is mostly used for allocations done on Frame and storing `row`s
|
||||
query_executions_[query_executions_.size() - 1] = std::make_unique<QueryExecution>(utils::PoolResource(
|
||||
128, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource()));
|
||||
query_execution_ptr = &query_executions_.back();
|
||||
spdlog::trace("PrepareCypher has {} encountered all shortest paths, QueryExection will use PoolResource",
|
||||
IsAllShortestPathsQuery(clauses) ? "" : "not");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -473,7 +473,6 @@ TYPED_TEST(CppApiTestFixture, TestNodeProperties) {
|
||||
ASSERT_EQ(node_1.GetProperty("b").ValueString(), "b");
|
||||
}
|
||||
|
||||
|
||||
TYPED_TEST(CppApiTestFixture, TestValueOperatorLessThan) {
|
||||
const int64_t int1 = 3;
|
||||
const int64_t int2 = 4;
|
||||
@ -503,7 +502,6 @@ TYPED_TEST(CppApiTestFixture, TestNumberEquality) {
|
||||
mgp::Value double_2{2.01};
|
||||
mgp::Value int_2{static_cast<int64_t>(2)};
|
||||
ASSERT_FALSE(double_2 == int_2);
|
||||
|
||||
}
|
||||
|
||||
TYPED_TEST(CppApiTestFixture, TestTypeOperatorStream) {
|
||||
|
Loading…
Reference in New Issue
Block a user