diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index 7e7166117..4d9cb423b 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -21,6 +21,7 @@ #include "storage/v2/result.hpp" #include "storage/v2/storage.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/memory_tracker.hpp" namespace memgraph::storage { @@ -126,24 +127,28 @@ Result EdgeAccessor::SetProperty(PropertyId property, co if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - - auto current_value = edge_.ptr->properties.GetProperty(property); - // We could skip setting the value if the previous one is the same to the new - // one. This would save some memory as a delta would not be created as well as - // avoid copying the value. The reason we are not doing that is because the - // current code always follows the logical pattern of "create a delta" and - // "modify in-place". Additionally, the created delta will make other - // transactions get a SERIALIZATION_ERROR. - - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); - edge_.ptr->properties.SetProperty(property, value); + using ReturnType = decltype(edge_.ptr->properties.GetProperty(property)); + std::optional current_value; + utils::AtomicMemoryBlock atomic_memory_block{ + [¤t_value, &property, &value, transaction = transaction_, edge = edge_]() { + current_value.emplace(edge.ptr->properties.GetProperty(property)); + // We could skip setting the value if the previous one is the same to the new + // one. This would save some memory as a delta would not be created as well as + // avoid copying the value. The reason we are not doing that is because the + // current code always follows the logical pattern of "create a delta" and + // "modify in-place". Additionally, the created delta will make other + // transactions get a SERIALIZATION_ERROR. + CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value); + edge.ptr->properties.SetProperty(property, value); + }}; + std::invoke(atomic_memory_block); if (transaction_->IsDiskStorage()) { ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_); transaction_->AddModifiedEdge(Gid(), modified_edge); } - return std::move(current_value); + return std::move(*current_value); } Result EdgeAccessor::InitProperties(const std::map &properties) { @@ -157,9 +162,12 @@ Result EdgeAccessor::InitProperties(const std::mapdeleted) return Error::DELETED_OBJECT; if (!edge_.ptr->properties.InitProperties(properties)) return false; - for (const auto &[property, _] : properties) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); - } + utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() { + for (const auto &[property, _] : properties) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); + } + }}; + std::invoke(atomic_memory_block); return true; } @@ -175,13 +183,18 @@ Result>> EdgeAc if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties); + using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties)); + std::optional id_old_new_change; + utils::AtomicMemoryBlock atomic_memory_block{ + [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() { + id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties)); + for (auto &[property, old_value, new_value] : *id_old_new_change) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value)); + } + }}; + std::invoke(atomic_memory_block); - for (auto &[property, old_value, new_value] : id_old_new_change) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value)); - } - - return id_old_new_change; + return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{}; } Result> EdgeAccessor::ClearProperties() { @@ -193,14 +206,19 @@ Result> EdgeAccessor::ClearProperties() { if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - auto properties = edge_.ptr->properties.Properties(); - for (const auto &property : properties) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); - } + using ReturnType = decltype(edge_.ptr->properties.Properties()); + std::optional properties; + utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() { + properties.emplace(edge_.ptr->properties.Properties()); + for (const auto &property : *properties) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); + } - edge_.ptr->properties.ClearProperties(); + edge_.ptr->properties.ClearProperties(); + }}; + std::invoke(atomic_memory_block); - return std::move(properties); + return properties.has_value() ? std::move(properties.value()) : ReturnType{}; } Result EdgeAccessor::GetProperty(PropertyId property, View view) const { diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 08aa896bf..ddea80fde 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -26,6 +26,7 @@ #include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" #include "storage/v2/property_value.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" @@ -338,18 +339,22 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso delta->prev.Set(&*it); } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); - from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); + to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); - to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + // Increment edge count. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + }}; - // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + std::invoke(atomic_memory_block); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } @@ -433,18 +438,22 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces delta->prev.Set(&*it); } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); - from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); + to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); - to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + // Increment edge count. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + }}; - // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + std::invoke(atomic_memory_block); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } @@ -534,18 +543,22 @@ Result InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor return Error::DELETED_OBJECT; } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge_ref, old_from_vertex, new_from_vertex, edge_type, to_vertex]() { + CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); + new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); + to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); - new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); - to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); + transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + }}; - transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_); } @@ -636,17 +649,22 @@ Result InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor * } } - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge_ref, old_to_vertex, from_vertex, edge_type, new_to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); - from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); - new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); + from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); + new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); + transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); + }}; + + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_); } @@ -709,17 +727,21 @@ Result InMemoryStorage::InMemoryAccessor::EdgeChangeType(EdgeAcces MG_ASSERT((op1 && op2), "Invalid database state!"); - // "deleting" old edge - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); + utils::AtomicMemoryBlock atomic_memory_block{[this, to_vertex, new_edge_type, edge_ref, from_vertex, edge_type]() { + // "deleting" old edge + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - // "adding" new edge - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); + // "adding" new edge + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); - // edge type is not used while invalidating cache so we can only call it once - transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); + // edge type is not used while invalidating cache so we can only call it once + transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); + }}; + + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_); } diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 86cc02696..ed3847314 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -17,6 +17,7 @@ #include "storage/v2/storage.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/event_counter.hpp" #include "utils/event_histogram.hpp" #include "utils/exceptions.hpp" @@ -390,22 +391,29 @@ Result>> Storage::Accessor::ClearEdgesOn if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); - attached_edges_to_vertex->pop_back(); - if (storage_->config_.items.properties_on_edges) { - auto *edge_ptr = edge_ref.ptr; - MarkEdgeAsDeleted(edge_ptr); - } + // MarkEdgeAsDeleted allocates additional memory + // and CreateAndLinkDelta needs memory + utils::AtomicMemoryBlock atomic_memory_block{[&attached_edges_to_vertex, &deleted_edge_ids, &reverse_vertex_order, + &vertex_ptr, &deleted_edges, deletion_delta = deletion_delta, + edge_type = edge_type, opposing_vertex = opposing_vertex, + edge_ref = edge_ref, this]() { + attached_edges_to_vertex->pop_back(); + if (this->storage_->config_.items.properties_on_edges) { + auto *edge_ptr = edge_ref.ptr; + MarkEdgeAsDeleted(edge_ptr); + } - auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; - auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); - bool const edge_cleared_from_both_directions = !was_inserted; - if (edge_cleared_from_both_directions) { - auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; - auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; - deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); - } - - CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); + } + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + }}; + std::invoke(atomic_memory_block); } return std::make_optional(); @@ -449,30 +457,37 @@ Result>> Storage::Accessor::DetachRemain return !set_for_erasure.contains(edge_gid); }); - for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { - auto const &[edge_type, opposing_vertex, edge_ref] = *it; - std::unique_lock guard; - if (storage_->config_.items.properties_on_edges) { - auto edge_ptr = edge_ref.ptr; - guard = std::unique_lock{edge_ptr->lock}; - // this can happen only if we marked edges for deletion with no nodes, - // so the method detaching nodes will not do anything - MarkEdgeAsDeleted(edge_ptr); + // Creating deltas and erasing edge only at the end -> we might have incomplete state as + // delta might cause OOM, so we don't remove edges from edges_attached_to_vertex + utils::AtomicMemoryBlock atomic_memory_block{[&mid, &edges_attached_to_vertex, &deleted_edges, + &partially_detached_edge_ids, this, vertex_ptr, deletion_delta, + reverse_vertex_order]() { + for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { + auto const &[edge_type, opposing_vertex, edge_ref] = *it; + std::unique_lock guard; + if (storage_->config_.items.properties_on_edges) { + auto edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + // this can happen only if we marked edges for deletion with no nodes, + // so the method detaching nodes will not do anything + MarkEdgeAsDeleted(edge_ptr); + } + + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); + } } + edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end()); + }}; - CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); - - auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; - auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid); - bool const edge_cleared_from_both_directions = !was_inserted; - if (edge_cleared_from_both_directions) { - auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; - auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; - deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); - } - } - - edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end()); + std::invoke(atomic_memory_block); return std::make_optional(); }; diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index ff5881563..6559cdee4 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -26,6 +26,7 @@ #include "storage/v2/storage.hpp" #include "storage/v2/vertex_info_cache.hpp" #include "storage/v2/vertex_info_helpers.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/logging.hpp" #include "utils/memory_tracker.hpp" #include "utils/variant_helpers.hpp" @@ -107,8 +108,11 @@ Result VertexAccessor::AddLabel(LabelId label) { if (vertex_->deleted) return Error::DELETED_OBJECT; if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false; - CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label); - vertex_->labels.push_back(label); + utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label]() { + CreateAndLinkDelta(transaction, vertex, Delta::RemoveLabelTag(), label); + vertex->labels.push_back(label); + }}; + std::invoke(atomic_memory_block); if (storage_->config_.items.enable_schema_metadata) { storage_->stored_node_labels_.try_insert(label); @@ -136,9 +140,12 @@ Result VertexAccessor::RemoveLabel(LabelId label) { auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label); if (it == vertex_->labels.end()) return false; - CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label); - *it = vertex_->labels.back(); - vertex_->labels.pop_back(); + utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label, &it]() { + CreateAndLinkDelta(transaction, vertex, Delta::AddLabelTag(), label); + *it = vertex->labels.back(); + vertex->labels.pop_back(); + }}; + std::invoke(atomic_memory_block); /// TODO: some by pointers, some by reference => not good, make it better storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp); @@ -262,8 +269,12 @@ Result VertexAccessor::SetProperty(PropertyId property, const Pro // "modify in-place". Additionally, the created delta will make other // transactions get a SERIALIZATION_ERROR. - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); - vertex_->properties.SetProperty(property, value); + utils::AtomicMemoryBlock atomic_memory_block{ + [transaction = transaction_, vertex = vertex_, &value, &property, ¤t_value]() { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value); + vertex->properties.SetProperty(property, value); + }}; + std::invoke(atomic_memory_block); if (!value.IsNull()) { transaction_->constraint_verification_info.AddedProperty(vertex_); @@ -287,20 +298,28 @@ Result VertexAccessor::InitProperties(const std::mapdeleted) return Error::DELETED_OBJECT; + bool result{false}; + utils::AtomicMemoryBlock atomic_memory_block{ + [&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() { + if (!vertex->properties.InitProperties(properties)) { + result = false; + return; + } + for (const auto &[property, value] : properties) { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, PropertyValue()); + storage->indices_.UpdateOnSetProperty(property, value, vertex, *transaction); + transaction->manyDeltasCache.Invalidate(vertex, property); + if (!value.IsNull()) { + transaction->constraint_verification_info.AddedProperty(vertex); + } else { + transaction->constraint_verification_info.RemovedProperty(vertex); + } + } + result = true; + }}; + std::invoke(atomic_memory_block); - if (!vertex_->properties.InitProperties(properties)) return false; - for (const auto &[property, value] : properties) { - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue()); - storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_); - transaction_->manyDeltasCache.Invalidate(vertex_, property); - if (!value.IsNull()) { - transaction_->constraint_verification_info.AddedProperty(vertex_); - } else { - transaction_->constraint_verification_info.RemovedProperty(vertex_); - } - } - - return true; + return result; } Result>> VertexAccessor::UpdateProperties( @@ -316,20 +335,28 @@ Result>> Vertex if (vertex_->deleted) return Error::DELETED_OBJECT; - auto id_old_new_change = vertex_->properties.UpdateProperties(properties); + using ReturnType = decltype(vertex_->properties.UpdateProperties(properties)); + std::optional id_old_new_change; + utils::AtomicMemoryBlock atomic_memory_block{ + [storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() { + id_old_new_change.emplace(vertex->properties.UpdateProperties(properties)); + if (!id_old_new_change.has_value()) { + return; + } + for (auto &[id, old_value, new_value] : *id_old_new_change) { + storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction); + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value)); + transaction->manyDeltasCache.Invalidate(vertex, id); + if (!new_value.IsNull()) { + transaction->constraint_verification_info.AddedProperty(vertex); + } else { + transaction->constraint_verification_info.RemovedProperty(vertex); + } + } + }}; + std::invoke(atomic_memory_block); - for (auto &[id, old_value, new_value] : id_old_new_change) { - storage_->indices_.UpdateOnSetProperty(id, new_value, vertex_, *transaction_); - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value)); - transaction_->manyDeltasCache.Invalidate(vertex_, id); - if (!new_value.IsNull()) { - transaction_->constraint_verification_info.AddedProperty(vertex_); - } else { - transaction_->constraint_verification_info.RemovedProperty(vertex_); - } - } - - return id_old_new_change; + return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{}; } Result> VertexAccessor::ClearProperties() { @@ -342,17 +369,25 @@ Result> VertexAccessor::ClearProperties() { if (vertex_->deleted) return Error::DELETED_OBJECT; - auto properties = vertex_->properties.Properties(); - for (const auto &[property, value] : properties) { - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value); - storage_->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_); - transaction_->constraint_verification_info.RemovedProperty(vertex_); - transaction_->manyDeltasCache.Invalidate(vertex_, property); - } + using ReturnType = decltype(vertex_->properties.Properties()); + std::optional properties; + utils::AtomicMemoryBlock atomic_memory_block{ + [storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() { + properties.emplace(vertex->properties.Properties()); + if (!properties.has_value()) { + return; + } + for (const auto &[property, value] : *properties) { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, value); + storage->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex, *transaction); + transaction->constraint_verification_info.RemovedProperty(vertex); + transaction->manyDeltasCache.Invalidate(vertex, property); + } + vertex->properties.ClearProperties(); + }}; + std::invoke(atomic_memory_block); - vertex_->properties.ClearProperties(); - - return std::move(properties); + return properties.has_value() ? std::move(properties.value()) : ReturnType{}; } Result VertexAccessor::GetProperty(PropertyId property, View view) const { diff --git a/src/utils/atomic_memory_block.hpp b/src/utils/atomic_memory_block.hpp new file mode 100644 index 000000000..c15424549 --- /dev/null +++ b/src/utils/atomic_memory_block.hpp @@ -0,0 +1,44 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include "utils/memory_tracker.hpp" + +namespace memgraph::utils { + +// Calls a function with out of memory exception blocker, checks memory allocation after block execution. +// Use it in case you need block which will be executed atomically considering memory execution +// but will check after block is executed if OOM exceptions needs to be thrown +template +class [[nodiscard]] AtomicMemoryBlock { + public: + explicit AtomicMemoryBlock(Callable &&function) : function_{std::forward(function)} {} + AtomicMemoryBlock(AtomicMemoryBlock const &) = delete; + AtomicMemoryBlock(AtomicMemoryBlock &&) = delete; + AtomicMemoryBlock &operator=(AtomicMemoryBlock const &) = delete; + AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete; + ~AtomicMemoryBlock() = default; + + void operator()() { + { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker; + function_(); + } + total_memory_tracker.DoCheck(); + } + + private: + Callable function_; +}; + +} // namespace memgraph::utils diff --git a/src/utils/memory_tracker.cpp b/src/utils/memory_tracker.cpp index 029223b71..7dfd88416 100644 --- a/src/utils/memory_tracker.cpp +++ b/src/utils/memory_tracker.cpp @@ -124,6 +124,19 @@ void MemoryTracker::Alloc(const int64_t size) { UpdatePeak(will_be); } +void MemoryTracker::DoCheck() { + const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed); + const auto current_amount = amount_.load(std::memory_order_relaxed); + if (current_hard_limit && current_amount > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] { + MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker; + throw OutOfMemoryException( + fmt::format("Memory limit exceeded! Current " + "use is {}, while the maximum allowed size for allocation is set to {}.", + GetReadableSize(static_cast(current_amount)), + GetReadableSize(static_cast(current_hard_limit)))); + } +} + void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); } } // namespace memgraph::utils diff --git a/src/utils/memory_tracker.hpp b/src/utils/memory_tracker.hpp index 0da888161..a6d7221ff 100644 --- a/src/utils/memory_tracker.hpp +++ b/src/utils/memory_tracker.hpp @@ -49,6 +49,7 @@ class MemoryTracker final { void Alloc(int64_t size); void Free(int64_t size); + void DoCheck(); auto Amount() const { return amount_.load(std::memory_order_relaxed); } diff --git a/tests/stress/memory_limit.py b/tests/stress/memory_limit.py index 2b2084484..dd8225584 100644 --- a/tests/stress/memory_limit.py +++ b/tests/stress/memory_limit.py @@ -173,39 +173,49 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int: """ This writer creates lot of nodes on each write. Also it checks that query failed if memory limit is tried to be broken + + Return: + True if write suceeded + False otherwise """ session = SessionCache.argument_session(args) - def create() -> bool: + def try_create() -> bool: """ - Returns True if done, False if needs to continue + Function tries to create until memory limit is reached + Return: + True if it can continue creating (OOM not reached) + False otherwise """ - memory_tracker_data_before_start = get_tracker_data(session) - should_fail = memory_tracker_data_before_start >= 2048 - failed = False + should_continue = True try: try_execute( session, f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))", ) except Exception as ex: - failed = True output = str(ex) - log.info("Exception in create", output) - assert "Memory limit exceeded!" in output + memory_over_2048_mb = False + memory_tracker_data_after_start = get_tracker_data(session) + if memory_tracker_data_after_start: + memory_over_2048_mb = memory_tracker_data_after_start >= 2048 + log.info( + "Exception in create, exception output:", + output, + f"Worker {worker_id} started iteration {curr_repetition}, memory over 2048MB: {memory_over_2048_mb}", + ) + has_oom_happend = "Memory limit exceeded!" in output and memory_over_2048_mb + should_continue = not has_oom_happend - if should_fail: - assert failed, "Query should have failed" - return False - return True + return should_continue curr_repetition = 0 while curr_repetition < repetition_count: log.info(f"Worker {worker_id} started iteration {curr_repetition}") - should_continue = create() + should_continue = try_create() if not should_continue: return True @@ -214,6 +224,7 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int: log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") curr_repetition += 1 + return False def execute_function(worker: Worker) -> Worker: