Add OOM enabler in operator tree (#1379)
This commit is contained in:
parent
7ef10dd82a
commit
2426d7980d
@ -3015,7 +3015,6 @@ PreparedQuery PrepareDatabaseInfoQuery(ParsedQuery parsed_query, bool in_explici
|
||||
results.push_back({TypedValue(label_property_index_mark), TypedValue(storage->LabelToName(item.first)),
|
||||
TypedValue(storage->PropertyToName(item.second))});
|
||||
}
|
||||
|
||||
std::sort(results.begin(), results.end(), [&label_index_mark](const auto &record_1, const auto &record_2) {
|
||||
const auto type_1 = record_1[0].ValueString();
|
||||
const auto type_2 = record_2[0].ValueString();
|
||||
@ -3136,7 +3135,6 @@ PreparedQuery PrepareSystemInfoQuery(ParsedQuery parsed_query, bool in_explicit_
|
||||
action = action_on_complete;
|
||||
pull_plan = std::make_shared<PullPlanVector>(std::move(results));
|
||||
}
|
||||
|
||||
if (pull_plan->Pull(stream, n)) {
|
||||
return action;
|
||||
}
|
||||
|
@ -136,6 +136,8 @@ extern const Event HashJoinOperator;
|
||||
|
||||
namespace memgraph::query::plan {
|
||||
|
||||
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
namespace {
|
||||
|
||||
// Custom equality function for a vector of typed values.
|
||||
@ -180,6 +182,7 @@ inline void AbortCheck(ExecutionContext const &context) {
|
||||
#define SCOPED_PROFILE_OP_BY_REF(ref) ScopedProfile profile{ComputeProfilingKey(this), ref, &context};
|
||||
|
||||
bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Once");
|
||||
|
||||
if (!did_pull_) {
|
||||
@ -267,6 +270,7 @@ CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self, utils::Me
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool CreateNode::CreateNodeCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("CreateNode");
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker &&
|
||||
@ -357,6 +361,7 @@ EdgeAccessor CreateEdge(const EdgeCreationInfo &edge_info, DbAccessor *dba, Vert
|
||||
} // namespace
|
||||
|
||||
bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -446,6 +451,7 @@ class ScanAllCursor : public Cursor {
|
||||
op_name_(op_name) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
AbortCheck(context);
|
||||
@ -749,6 +755,7 @@ Expand::ExpandCursor::ExpandCursor(const Expand &self, int64_t input_degree, int
|
||||
prev_existing_degree_(existing_node_degree) {}
|
||||
|
||||
bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
// A helper function for expanding a node from an edge.
|
||||
@ -1029,6 +1036,7 @@ class ExpandVariableCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), edges_(mem), edges_it_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
@ -1253,6 +1261,7 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("STShortestPath");
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
@ -1510,6 +1519,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("SingleSourceShortestPath");
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
@ -1691,6 +1701,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
pq_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("ExpandWeightedShortestPath");
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
@ -1948,6 +1959,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
|
||||
pq_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("ExpandAllShortestPathsCursor");
|
||||
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
@ -2303,6 +2315,7 @@ class ConstructNamedPathCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self_.input()->MakeCursor(mem)) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("ConstructNamedPath");
|
||||
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -2433,6 +2446,7 @@ Filter::FilterCursor::FilterCursor(const Filter &self, utils::MemoryResource *me
|
||||
pattern_filter_cursors_(MakeCursorVector(self_.pattern_filters_, mem)) {}
|
||||
|
||||
bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Filter");
|
||||
|
||||
// Like all filters, newly set values should not affect filtering of old
|
||||
@ -2472,6 +2486,7 @@ std::vector<Symbol> EvaluatePatternFilter::ModifiedSymbols(const SymbolTable &ta
|
||||
}
|
||||
|
||||
bool EvaluatePatternFilter::EvaluatePatternFilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("EvaluatePatternFilter");
|
||||
|
||||
input_cursor_->Reset();
|
||||
@ -2510,6 +2525,7 @@ Produce::ProduceCursor::ProduceCursor(const Produce &self, utils::MemoryResource
|
||||
: self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
@ -2602,6 +2618,7 @@ void Delete::DeleteCursor::UpdateDeleteBuffer(Frame &frame, ExecutionContext &co
|
||||
}
|
||||
|
||||
bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Delete");
|
||||
|
||||
if (delete_executed_) {
|
||||
@ -2678,6 +2695,7 @@ SetProperty::SetPropertyCursor::SetPropertyCursor(const SetProperty &self, utils
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool SetProperty::SetPropertyCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("SetProperty");
|
||||
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -2893,6 +2911,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
} // namespace
|
||||
|
||||
bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("SetProperties");
|
||||
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -2957,6 +2976,7 @@ SetLabels::SetLabelsCursor::SetLabelsCursor(const SetLabels &self, utils::Memory
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("SetLabels");
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
@ -3029,6 +3049,7 @@ RemoveProperty::RemovePropertyCursor::RemovePropertyCursor(const RemoveProperty
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("RemoveProperty");
|
||||
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -3115,6 +3136,7 @@ RemoveLabels::RemoveLabelsCursor::RemoveLabelsCursor(const RemoveLabels &self, u
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("RemoveLabels");
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
@ -3209,6 +3231,7 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) {
|
||||
} // namespace
|
||||
|
||||
bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("EdgeUniquenessFilter");
|
||||
|
||||
auto expansion_ok = [&]() {
|
||||
@ -3294,6 +3317,7 @@ class AccumulateCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), cache_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Accumulate");
|
||||
|
||||
auto &dba = *context.db_accessor;
|
||||
@ -3394,6 +3418,7 @@ class AggregateCursor : public Cursor {
|
||||
reused_group_by_(self.group_by_.size(), mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
if (!pulled_all_input_) {
|
||||
@ -3775,6 +3800,7 @@ Skip::SkipCursor::SkipCursor(const Skip &self, utils::MemoryResource *mem)
|
||||
: self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Skip");
|
||||
|
||||
while (input_cursor_->Pull(frame, context)) {
|
||||
@ -3828,6 +3854,7 @@ Limit::LimitCursor::LimitCursor(const Limit &self, utils::MemoryResource *mem)
|
||||
: self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {}
|
||||
|
||||
bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Limit");
|
||||
|
||||
// We need to evaluate the limit expression before the first input Pull
|
||||
@ -3890,6 +3917,7 @@ class OrderByCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self_.input_->MakeCursor(mem)), cache_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
if (!did_pull_all_) {
|
||||
@ -4001,6 +4029,7 @@ Merge::MergeCursor::MergeCursor(const Merge &self, utils::MemoryResource *mem)
|
||||
merge_create_cursor_(self.merge_create_->MakeCursor(mem)) {}
|
||||
|
||||
bool Merge::MergeCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Merge");
|
||||
|
||||
while (true) {
|
||||
@ -4077,6 +4106,7 @@ Optional::OptionalCursor::OptionalCursor(const Optional &self, utils::MemoryReso
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), optional_cursor_(self.optional_->MakeCursor(mem)) {}
|
||||
|
||||
bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Optional");
|
||||
|
||||
while (true) {
|
||||
@ -4144,6 +4174,7 @@ class UnwindCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), input_value_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Unwind");
|
||||
while (true) {
|
||||
AbortCheck(context);
|
||||
@ -4203,6 +4234,7 @@ class DistinctCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), seen_rows_(mem) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Distinct");
|
||||
|
||||
while (true) {
|
||||
@ -4292,6 +4324,7 @@ Union::UnionCursor::UnionCursor(const Union &self, utils::MemoryResource *mem)
|
||||
: self_(self), left_cursor_(self.left_op_->MakeCursor(mem)), right_cursor_(self.right_op_->MakeCursor(mem)) {}
|
||||
|
||||
bool Union::UnionCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
utils::pmr::unordered_map<std::string, TypedValue> results(context.evaluation_context.memory);
|
||||
@ -4366,6 +4399,7 @@ class CartesianCursor : public Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(self_);
|
||||
|
||||
if (!cartesian_pull_initialized_) {
|
||||
@ -4458,6 +4492,7 @@ class OutputTableCursor : public Cursor {
|
||||
OutputTableCursor(const OutputTable &self) : self_(self) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
if (!pulled_) {
|
||||
rows_ = self_.callback_(&frame, &context);
|
||||
for (const auto &row : rows_) {
|
||||
@ -4510,6 +4545,7 @@ class OutputTableStreamCursor : public Cursor {
|
||||
explicit OutputTableStreamCursor(const OutputTableStream *self) : self_(self) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
const auto row = self_->callback_(&frame, &context);
|
||||
if (row) {
|
||||
MG_ASSERT(row->size() == self_->output_symbols_.size(), "Wrong number of columns in row!");
|
||||
@ -4655,6 +4691,7 @@ class CallProcedureCursor : public Cursor {
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(*self_);
|
||||
|
||||
AbortCheck(context);
|
||||
@ -4796,6 +4833,7 @@ class CallValidateProcedureCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self_->input_->MakeCursor(mem)) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("CallValidateProcedureCursor");
|
||||
|
||||
AbortCheck(context);
|
||||
@ -4934,6 +4972,7 @@ class LoadCsvCursor : public Cursor {
|
||||
: self_(self), input_cursor_(self_->input_->MakeCursor(mem)), did_pull_{false} {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP_BY_REF(*self_);
|
||||
|
||||
AbortCheck(context);
|
||||
@ -5022,6 +5061,7 @@ class ForeachCursor : public Cursor {
|
||||
expression(foreach.expression_) {}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
if (!input_->Pull(frame, context)) {
|
||||
@ -5129,6 +5169,7 @@ std::vector<Symbol> Apply::ModifiedSymbols(const SymbolTable &table) const {
|
||||
}
|
||||
|
||||
bool Apply::ApplyCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
SCOPED_PROFILE_OP("Apply");
|
||||
|
||||
while (true) {
|
||||
|
@ -848,7 +848,6 @@ EdgeImportMode DiskStorage::GetEdgeImportMode() const {
|
||||
}
|
||||
|
||||
VertexAccessor DiskStorage::DiskAccessor::CreateVertex() {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||
auto gid = disk_storage->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||
auto acc = transaction_.vertices_->access();
|
||||
@ -911,7 +910,6 @@ DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std
|
||||
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to,
|
||||
EdgeTypeId edge_type) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto *from_vertex = from->vertex_;
|
||||
auto *to_vertex = to->vertex_;
|
||||
|
||||
@ -1296,7 +1294,6 @@ std::optional<storage::VertexAccessor> DiskStorage::LoadVertexToMainMemoryCache(
|
||||
VertexAccessor DiskStorage::CreateVertexFromDisk(Transaction *transaction, utils::SkipList<Vertex>::Accessor &accessor,
|
||||
storage::Gid gid, std::vector<LabelId> label_ids,
|
||||
PropertyStore properties, Delta *delta) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto [it, inserted] = accessor.insert(Vertex{gid, delta});
|
||||
MG_ASSERT(inserted, "The vertex must be inserted here!");
|
||||
MG_ASSERT(it != accessor.end(), "Invalid Vertex accessor!");
|
||||
@ -1342,7 +1339,6 @@ std::optional<EdgeAccessor> DiskStorage::CreateEdgeFromDisk(const VertexAccessor
|
||||
Transaction *transaction, EdgeTypeId edge_type,
|
||||
storage::Gid gid, const std::string_view properties,
|
||||
const std::string &old_disk_key, std::string &&read_ts) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto *from_vertex = from->vertex_;
|
||||
auto *to_vertex = to->vertex_;
|
||||
|
||||
|
@ -204,7 +204,6 @@ InMemoryStorage::InMemoryAccessor::~InMemoryAccessor() {
|
||||
}
|
||||
|
||||
VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex() {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||
auto gid = mem_storage->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||
auto acc = mem_storage->vertices_.access();
|
||||
@ -221,7 +220,6 @@ VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex() {
|
||||
}
|
||||
|
||||
VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertexEx(storage::Gid gid) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
// NOTE: When we update the next `vertex_id_` here we perform a RMW
|
||||
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
|
||||
// because this function is only called from the replication delta applier
|
||||
@ -272,6 +270,7 @@ InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector<VertexAccessor *> no
|
||||
|
||||
// Need to inform the next CollectGarbage call that there are some
|
||||
// non-transactional deletions that need to be collected
|
||||
|
||||
auto const inform_gc_vertex_deletion = utils::OnScopeExit{[this, &deleted_vertices = deleted_vertices]() {
|
||||
if (!deleted_vertices.empty() && transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||
@ -300,7 +299,6 @@ InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector<VertexAccessor *> no
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to,
|
||||
EdgeTypeId edge_type) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
MG_ASSERT(from->transaction_ == to->transaction_,
|
||||
"VertexAccessors must be from the same transaction when creating "
|
||||
"an edge!");
|
||||
@ -365,7 +363,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
|
||||
|
||||
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to,
|
||||
EdgeTypeId edge_type, storage::Gid gid) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
MG_ASSERT(from->transaction_ == to->transaction_,
|
||||
"VertexAccessors must be from the same transaction when creating "
|
||||
"an edge!");
|
||||
|
@ -31,8 +31,6 @@ namespace memgraph::storage {
|
||||
|
||||
class InMemoryStorage;
|
||||
|
||||
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
auto ReplicationStateHelper(Config const &config) -> std::optional<std::filesystem::path> {
|
||||
if (!config.durability.restore_replication_state_on_startup) return std::nullopt;
|
||||
return {config.durability.storage_directory};
|
||||
|
@ -11,3 +11,12 @@ target_link_libraries(memgraph__e2e__memory__limit_global_alloc gflags mgclient
|
||||
add_executable(memgraph__e2e__memory__limit_global_alloc_proc memory_limit_global_alloc_proc.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_global_alloc_proc gflags mgclient mg-utils mg-io Threads::Threads)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_delete memory_limit_delete.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_delete gflags mgclient mg-utils mg-io)
|
||||
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_accumulation memory_limit_accumulation.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient mg-utils mg-io)
|
||||
|
||||
add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp)
|
||||
target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io)
|
||||
|
80
tests/e2e/memory/memory_limit_accumulation.cpp
Normal file
80
tests/e2e/memory/memory_limit_accumulation.cpp
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <iostream>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_bool(multi_db, false, "Run test in multi db environment");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Memory Control");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
memgraph::logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
if (!client) {
|
||||
LOG_FATAL("Failed to connect!");
|
||||
}
|
||||
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
|
||||
if (FLAGS_multi_db) {
|
||||
client->Execute("CREATE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("USE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
}
|
||||
|
||||
const auto *create_query = "UNWIND range(1, 500000) as u CREATE (n {id:u%5, string: 'Some longer string'}) RETURN n;";
|
||||
|
||||
try {
|
||||
client->Execute(create_query);
|
||||
[[maybe_unused]] auto results = client->FetchAll();
|
||||
if (results->empty()) {
|
||||
assert(false);
|
||||
return 0;
|
||||
}
|
||||
} catch (const mg::TransientException & /*unused*/) {
|
||||
spdlog::info("Memgraph is out of memory");
|
||||
assert(false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const auto *accumulate_query = "MATCH (n) WITH n.id as id, collect(n.string) as list RETURN id, list;";
|
||||
|
||||
try {
|
||||
client->Execute(accumulate_query);
|
||||
[[maybe_unused]] auto results = client->FetchAll();
|
||||
if (results->empty()) {
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
} catch (const mg::TransientException & /*unused*/) {
|
||||
spdlog::info("Memgraph is out of memory");
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
MG_ASSERT(false, "Query should have failed");
|
||||
|
||||
return 0;
|
||||
}
|
80
tests/e2e/memory/memory_limit_delete.cpp
Normal file
80
tests/e2e/memory/memory_limit_delete.cpp
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <iostream>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_bool(multi_db, false, "Run test in multi db environment");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Memory Control");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
memgraph::logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
if (!client) {
|
||||
LOG_FATAL("Failed to connect!");
|
||||
}
|
||||
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
|
||||
if (FLAGS_multi_db) {
|
||||
client->Execute("CREATE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("USE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
}
|
||||
|
||||
const auto *create_query = "UNWIND range(1, 500000) as u CREATE (n {string: 'Some longer string'}) RETURN n;";
|
||||
|
||||
try {
|
||||
client->Execute(create_query);
|
||||
[[maybe_unused]] auto results = client->FetchAll();
|
||||
if (results->empty()) {
|
||||
assert(false);
|
||||
return 0;
|
||||
}
|
||||
} catch (const mg::TransientException & /*unused*/) {
|
||||
spdlog::info("Memgraph is out of memory");
|
||||
assert(false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const auto *delete_query = "MATCH (n) DETACH DELETE n RETURN count(n);";
|
||||
|
||||
try {
|
||||
client->Execute(delete_query);
|
||||
[[maybe_unused]] auto results = client->FetchAll();
|
||||
if (results->empty()) {
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
} catch (const mg::TransientException & /*unused*/) {
|
||||
spdlog::info("Memgraph is out of memory");
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
MG_ASSERT(false, "Query should have failed");
|
||||
|
||||
return 0;
|
||||
}
|
66
tests/e2e/memory/memory_limit_edge_create.cpp
Normal file
66
tests/e2e/memory/memory_limit_edge_create.cpp
Normal file
@ -0,0 +1,66 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <iostream>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_bool(multi_db, false, "Run test in multi db environment");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Memory Control");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
memgraph::logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
if (!client) {
|
||||
LOG_FATAL("Failed to connect!");
|
||||
}
|
||||
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
|
||||
if (FLAGS_multi_db) {
|
||||
client->Execute("CREATE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("USE DATABASE clean;");
|
||||
client->DiscardAll();
|
||||
client->Execute("MATCH (n) DETACH DELETE n;");
|
||||
client->DiscardAll();
|
||||
}
|
||||
client->Execute("CREATE INDEX ON :Node(id);");
|
||||
client->DiscardAll();
|
||||
|
||||
const auto *create_edge_query = "UNWIND range(1, 10000) as u MERGE (n:Node {id:u})-[:CONNECTED]->(m: Node {id:u+1});";
|
||||
|
||||
try {
|
||||
client->Execute(create_edge_query);
|
||||
[[maybe_unused]] auto results = client->FetchAll();
|
||||
if (results->empty()) {
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
} catch (const mg::TransientException & /*unused*/) {
|
||||
spdlog::info("Memgraph is out of memory");
|
||||
assert(true);
|
||||
return 0;
|
||||
}
|
||||
MG_ASSERT(false, "Query should have failed");
|
||||
|
||||
return 0;
|
||||
}
|
@ -23,6 +23,30 @@ disk_cluster: &disk_cluster
|
||||
- "STORAGE MODE ON_DISK_TRANSACTIONAL"
|
||||
validation_queries: []
|
||||
|
||||
args_450_MiB_limit: &args_450_MiB_limit
|
||||
- "--bolt-port"
|
||||
- *bolt_port
|
||||
- "--memory-limit=450"
|
||||
- "--storage-gc-cycle-sec=180"
|
||||
- "--log-level=INFO"
|
||||
|
||||
in_memory_450_MiB_limit_cluster: &in_memory_450_MiB_limit_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args_450_MiB_limit
|
||||
log_file: "memory-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
|
||||
disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args_450_MiB_limit
|
||||
log_file: "memory-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
|
||||
workloads:
|
||||
- name: "Memory control"
|
||||
@ -70,3 +94,33 @@ workloads:
|
||||
args: ["--bolt-port", *bolt_port, "--timeout", "180"]
|
||||
proc: "tests/e2e/memory/procedures/"
|
||||
<<: *disk_cluster
|
||||
|
||||
- name: "Memory control for detach delete"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *in_memory_450_MiB_limit_cluster
|
||||
|
||||
- name: "Memory control for detach delete on disk storage"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *disk_450_MiB_limit_cluster
|
||||
|
||||
- name: "Memory control for accumulation"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *in_memory_450_MiB_limit_cluster
|
||||
|
||||
- name: "Memory control for accumulation on disk storage"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *disk_450_MiB_limit_cluster
|
||||
|
||||
- name: "Memory control for edge create"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *in_memory_450_MiB_limit_cluster
|
||||
|
||||
- name: "Memory control for edge create on disk storage"
|
||||
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *disk_450_MiB_limit_cluster
|
||||
|
Loading…
Reference in New Issue
Block a user