Add atomic memory block around unsafe code blocks (#1589)

This commit is contained in:
Antonio Filipovic 2023-12-21 09:43:16 +01:00 committed by GitHub
parent f11b3c6d9d
commit cd37de481e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 322 additions and 163 deletions

View File

@ -21,6 +21,7 @@
#include "storage/v2/result.hpp" #include "storage/v2/result.hpp"
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
namespace memgraph::storage { namespace memgraph::storage {
@ -126,24 +127,28 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
using ReturnType = decltype(edge_.ptr->properties.GetProperty(property));
auto current_value = edge_.ptr->properties.GetProperty(property); std::optional<ReturnType> current_value;
// We could skip setting the value if the previous one is the same to the new utils::AtomicMemoryBlock atomic_memory_block{
// one. This would save some memory as a delta would not be created as well as [&current_value, &property, &value, transaction = transaction_, edge = edge_]() {
// avoid copying the value. The reason we are not doing that is because the current_value.emplace(edge.ptr->properties.GetProperty(property));
// current code always follows the logical pattern of "create a delta" and // We could skip setting the value if the previous one is the same to the new
// "modify in-place". Additionally, the created delta will make other // one. This would save some memory as a delta would not be created as well as
// transactions get a SERIALIZATION_ERROR. // avoid copying the value. The reason we are not doing that is because the
// current code always follows the logical pattern of "create a delta" and
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); // "modify in-place". Additionally, the created delta will make other
edge_.ptr->properties.SetProperty(property, value); // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value);
edge.ptr->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (transaction_->IsDiskStorage()) { if (transaction_->IsDiskStorage()) {
ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_); ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_);
transaction_->AddModifiedEdge(Gid(), modified_edge); transaction_->AddModifiedEdge(Gid(), modified_edge);
} }
return std::move(current_value); return std::move(*current_value);
} }
Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) { Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) {
@ -157,9 +162,12 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
if (!edge_.ptr->properties.InitProperties(properties)) return false; if (!edge_.ptr->properties.InitProperties(properties)) return false;
for (const auto &[property, _] : properties) { utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); for (const auto &[property, _] : properties) {
} CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue());
}
}};
std::invoke(atomic_memory_block);
return true; return true;
} }
@ -175,13 +183,18 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties); using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties));
for (auto &[property, old_value, new_value] : *id_old_new_change) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
}};
std::invoke(atomic_memory_block);
for (auto &[property, old_value, new_value] : id_old_new_change) { return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
return id_old_new_change;
} }
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() { Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
@ -193,14 +206,19 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto properties = edge_.ptr->properties.Properties(); using ReturnType = decltype(edge_.ptr->properties.Properties());
for (const auto &property : properties) { std::optional<ReturnType> properties;
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
} properties.emplace(edge_.ptr->properties.Properties());
for (const auto &property : *properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second);
}
edge_.ptr->properties.ClearProperties(); edge_.ptr->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
return std::move(properties); return properties.has_value() ? std::move(properties.value()) : ReturnType{};
} }
Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const { Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -26,6 +26,7 @@
#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/replication/recovery.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp" #include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/resource_lock.hpp" #include "utils/resource_lock.hpp"
#include "utils/stat.hpp" #include "utils/stat.hpp"
@ -338,18 +339,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); // Increment edge count.
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count. std::invoke(atomic_memory_block);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }
@ -433,18 +438,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); // Increment edge count.
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count. std::invoke(atomic_memory_block);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }
@ -534,18 +543,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
return Error::DELETED_OBJECT; return Error::DELETED_OBJECT;
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge_ref, old_from_vertex, new_from_vertex, edge_type, to_vertex]() {
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); }};
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); std::invoke(atomic_memory_block);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_);
} }
@ -636,17 +649,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *
} }
} }
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); utils::AtomicMemoryBlock atomic_memory_block{
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); [this, edge_ref, old_to_vertex, from_vertex, edge_type, new_to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref);
from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref);
new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_);
} }
@ -709,17 +727,21 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeChangeType(EdgeAcces
MG_ASSERT((op1 && op2), "Invalid database state!"); MG_ASSERT((op1 && op2), "Invalid database state!");
// "deleting" old edge utils::AtomicMemoryBlock atomic_memory_block{[this, to_vertex, new_edge_type, edge_ref, from_vertex, edge_type]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); // "deleting" old edge
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
// "adding" new edge // "adding" new edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref);
// edge type is not used while invalidating cache so we can only call it once // edge type is not used while invalidating cache so we can only call it once
transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }

View File

@ -17,6 +17,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/event_counter.hpp" #include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp" #include "utils/event_histogram.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
@ -390,22 +391,29 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::ClearEdgesOn
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
attached_edges_to_vertex->pop_back(); // MarkEdgeAsDeleted allocates additional memory
if (storage_->config_.items.properties_on_edges) { // and CreateAndLinkDelta needs memory
auto *edge_ptr = edge_ref.ptr; utils::AtomicMemoryBlock atomic_memory_block{[&attached_edges_to_vertex, &deleted_edge_ids, &reverse_vertex_order,
MarkEdgeAsDeleted(edge_ptr); &vertex_ptr, &deleted_edges, deletion_delta = deletion_delta,
} edge_type = edge_type, opposing_vertex = opposing_vertex,
edge_ref = edge_ref, this]() {
attached_edges_to_vertex->pop_back();
if (this->storage_->config_.items.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
MarkEdgeAsDeleted(edge_ptr);
}
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted; bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) { if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
} }
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); }};
std::invoke(atomic_memory_block);
} }
return std::make_optional<ReturnType>(); return std::make_optional<ReturnType>();
@ -449,30 +457,37 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::DetachRemain
return !set_for_erasure.contains(edge_gid); return !set_for_erasure.contains(edge_gid);
}); });
for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { // Creating deltas and erasing edge only at the end -> we might have incomplete state as
auto const &[edge_type, opposing_vertex, edge_ref] = *it; // delta might cause OOM, so we don't remove edges from edges_attached_to_vertex
std::unique_lock<utils::RWSpinLock> guard; utils::AtomicMemoryBlock atomic_memory_block{[&mid, &edges_attached_to_vertex, &deleted_edges,
if (storage_->config_.items.properties_on_edges) { &partially_detached_edge_ids, this, vertex_ptr, deletion_delta,
auto edge_ptr = edge_ref.ptr; reverse_vertex_order]() {
guard = std::unique_lock{edge_ptr->lock}; for (auto it = mid; it != edges_attached_to_vertex->end(); it++) {
// this can happen only if we marked edges for deletion with no nodes, auto const &[edge_type, opposing_vertex, edge_ref] = *it;
// so the method detaching nodes will not do anything std::unique_lock<utils::RWSpinLock> guard;
MarkEdgeAsDeleted(edge_ptr); if (storage_->config_.items.properties_on_edges) {
auto edge_ptr = edge_ref.ptr;
guard = std::unique_lock{edge_ptr->lock};
// this can happen only if we marked edges for deletion with no nodes,
// so the method detaching nodes will not do anything
MarkEdgeAsDeleted(edge_ptr);
}
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
} }
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
}};
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); std::invoke(atomic_memory_block);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
}
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
return std::make_optional<ReturnType>(); return std::make_optional<ReturnType>();
}; };

View File

@ -26,6 +26,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/vertex_info_cache.hpp" #include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/vertex_info_helpers.hpp" #include "storage/v2/vertex_info_helpers.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
@ -107,8 +108,11 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false; if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label); utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label]() {
vertex_->labels.push_back(label); CreateAndLinkDelta(transaction, vertex, Delta::RemoveLabelTag(), label);
vertex->labels.push_back(label);
}};
std::invoke(atomic_memory_block);
if (storage_->config_.items.enable_schema_metadata) { if (storage_->config_.items.enable_schema_metadata) {
storage_->stored_node_labels_.try_insert(label); storage_->stored_node_labels_.try_insert(label);
@ -136,9 +140,12 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label); auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return false; if (it == vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label); utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label, &it]() {
*it = vertex_->labels.back(); CreateAndLinkDelta(transaction, vertex, Delta::AddLabelTag(), label);
vertex_->labels.pop_back(); *it = vertex->labels.back();
vertex->labels.pop_back();
}};
std::invoke(atomic_memory_block);
/// TODO: some by pointers, some by reference => not good, make it better /// TODO: some by pointers, some by reference => not good, make it better
storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp); storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
@ -262,8 +269,12 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
// "modify in-place". Additionally, the created delta will make other // "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR. // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); utils::AtomicMemoryBlock atomic_memory_block{
vertex_->properties.SetProperty(property, value); [transaction = transaction_, vertex = vertex_, &value, &property, &current_value]() {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value);
vertex->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (!value.IsNull()) { if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_); transaction_->constraint_verification_info.AddedProperty(vertex_);
@ -287,20 +298,28 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
bool result{false};
utils::AtomicMemoryBlock atomic_memory_block{
[&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() {
if (!vertex->properties.InitProperties(properties)) {
result = false;
return;
}
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, PropertyValue());
storage->indices_.UpdateOnSetProperty(property, value, vertex, *transaction);
transaction->manyDeltasCache.Invalidate(vertex, property);
if (!value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
result = true;
}};
std::invoke(atomic_memory_block);
if (!vertex_->properties.InitProperties(properties)) return false; return result;
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue());
storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return true;
} }
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties( Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
@ -316,20 +335,28 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = vertex_->properties.UpdateProperties(properties); using ReturnType = decltype(vertex_->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
if (!id_old_new_change.has_value()) {
return;
}
for (auto &[id, old_value, new_value] : *id_old_new_change) {
storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction);
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value));
transaction->manyDeltasCache.Invalidate(vertex, id);
if (!new_value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
}};
std::invoke(atomic_memory_block);
for (auto &[id, old_value, new_value] : id_old_new_change) { return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
storage_->indices_.UpdateOnSetProperty(id, new_value, vertex_, *transaction_);
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value));
transaction_->manyDeltasCache.Invalidate(vertex_, id);
if (!new_value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return id_old_new_change;
} }
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() { Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
@ -342,17 +369,25 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
auto properties = vertex_->properties.Properties(); using ReturnType = decltype(vertex_->properties.Properties());
for (const auto &[property, value] : properties) { std::optional<ReturnType> properties;
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value); utils::AtomicMemoryBlock atomic_memory_block{
storage_->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_); [storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() {
transaction_->constraint_verification_info.RemovedProperty(vertex_); properties.emplace(vertex->properties.Properties());
transaction_->manyDeltasCache.Invalidate(vertex_, property); if (!properties.has_value()) {
} return;
}
for (const auto &[property, value] : *properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, value);
storage->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex, *transaction);
transaction->constraint_verification_info.RemovedProperty(vertex);
transaction->manyDeltasCache.Invalidate(vertex, property);
}
vertex->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
vertex_->properties.ClearProperties(); return properties.has_value() ? std::move(properties.value()) : ReturnType{};
return std::move(properties);
} }
Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view) const { Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -0,0 +1,44 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <functional>
#include "utils/memory_tracker.hpp"
namespace memgraph::utils {
// Calls a function with out of memory exception blocker, checks memory allocation after block execution.
// Use it in case you need block which will be executed atomically considering memory execution
// but will check after block is executed if OOM exceptions needs to be thrown
template <typename Callable>
class [[nodiscard]] AtomicMemoryBlock {
public:
explicit AtomicMemoryBlock(Callable &&function) : function_{std::forward<Callable>(function)} {}
AtomicMemoryBlock(AtomicMemoryBlock const &) = delete;
AtomicMemoryBlock(AtomicMemoryBlock &&) = delete;
AtomicMemoryBlock &operator=(AtomicMemoryBlock const &) = delete;
AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete;
~AtomicMemoryBlock() = default;
void operator()() {
{
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker;
function_();
}
total_memory_tracker.DoCheck();
}
private:
Callable function_;
};
} // namespace memgraph::utils

View File

@ -124,6 +124,19 @@ void MemoryTracker::Alloc(const int64_t size) {
UpdatePeak(will_be); UpdatePeak(will_be);
} }
void MemoryTracker::DoCheck() {
const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed);
const auto current_amount = amount_.load(std::memory_order_relaxed);
if (current_hard_limit && current_amount > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] {
MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker;
throw OutOfMemoryException(
fmt::format("Memory limit exceeded! Current "
"use is {}, while the maximum allowed size for allocation is set to {}.",
GetReadableSize(static_cast<double>(current_amount)),
GetReadableSize(static_cast<double>(current_hard_limit))));
}
}
void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); } void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); }
} // namespace memgraph::utils } // namespace memgraph::utils

View File

@ -49,6 +49,7 @@ class MemoryTracker final {
void Alloc(int64_t size); void Alloc(int64_t size);
void Free(int64_t size); void Free(int64_t size);
void DoCheck();
auto Amount() const { return amount_.load(std::memory_order_relaxed); } auto Amount() const { return amount_.load(std::memory_order_relaxed); }

View File

@ -173,39 +173,49 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
""" """
This writer creates lot of nodes on each write. This writer creates lot of nodes on each write.
Also it checks that query failed if memory limit is tried to be broken Also it checks that query failed if memory limit is tried to be broken
Return:
True if write suceeded
False otherwise
""" """
session = SessionCache.argument_session(args) session = SessionCache.argument_session(args)
def create() -> bool: def try_create() -> bool:
""" """
Returns True if done, False if needs to continue Function tries to create until memory limit is reached
Return:
True if it can continue creating (OOM not reached)
False otherwise
""" """
memory_tracker_data_before_start = get_tracker_data(session) should_continue = True
should_fail = memory_tracker_data_before_start >= 2048
failed = False
try: try:
try_execute( try_execute(
session, session,
f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))", f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
) )
except Exception as ex: except Exception as ex:
failed = True
output = str(ex) output = str(ex)
log.info("Exception in create", output) memory_over_2048_mb = False
assert "Memory limit exceeded!" in output memory_tracker_data_after_start = get_tracker_data(session)
if memory_tracker_data_after_start:
memory_over_2048_mb = memory_tracker_data_after_start >= 2048
log.info(
"Exception in create, exception output:",
output,
f"Worker {worker_id} started iteration {curr_repetition}, memory over 2048MB: {memory_over_2048_mb}",
)
has_oom_happend = "Memory limit exceeded!" in output and memory_over_2048_mb
should_continue = not has_oom_happend
if should_fail: return should_continue
assert failed, "Query should have failed"
return False
return True
curr_repetition = 0 curr_repetition = 0
while curr_repetition < repetition_count: while curr_repetition < repetition_count:
log.info(f"Worker {worker_id} started iteration {curr_repetition}") log.info(f"Worker {worker_id} started iteration {curr_repetition}")
should_continue = create() should_continue = try_create()
if not should_continue: if not should_continue:
return True return True
@ -214,6 +224,7 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
curr_repetition += 1 curr_repetition += 1
return False
def execute_function(worker: Worker) -> Worker: def execute_function(worker: Worker) -> Worker: