diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 1b55b627e..c1b8ee7c5 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -26,6 +26,7 @@ #include "utils/flag_validation.hpp" #include "utils/logging.hpp" #include "utils/memory.hpp" +#include "utils/memory_tracker.hpp" #include "utils/string.hpp" #include "utils/tsc.hpp" @@ -659,7 +660,12 @@ std::optional PullPlan::Pull(AnyStream *stream, std::optional< // Returns true if a result was pulled. const auto pull_result = [&]() -> bool { - utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size); + // We can throw on every query because a simple queries for deleting will use only + // the stack allocated buffer. + // Also, we want to throw only when the query engine requests more memory and not the storage + // so we add the exception to the allocator. + utils::ResourceWithOutOfMemoryException resource_with_exception; + utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception); // TODO (mferencevic): Tune the parameters accordingly. utils::PoolResource pool_memory(128, 1024, &monotonic_memory); ctx_.evaluation_context.memory = &pool_memory; diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index d26728f3f..4fddeaffd 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -17,6 +17,7 @@ #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/wal.hpp" #include "utils/logging.hpp" +#include "utils/memory_tracker.hpp" namespace storage::durability { @@ -135,6 +136,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di std::atomic *edge_count, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, Config::Items items, uint64_t *wal_seq_num) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory)) return std::nullopt; auto snapshot_files = GetSnapshotFiles(snapshot_directory); diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index 7c2fff5ec..2dd29b759 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -4,6 +4,7 @@ #include "storage/v2/mvcc.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/memory_tracker.hpp" namespace storage { @@ -16,6 +17,7 @@ VertexAccessor EdgeAccessor::ToVertex() const { } Result EdgeAccessor::SetProperty(PropertyId property, const PropertyValue &value) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED; std::lock_guard guard(edge_.ptr->lock); diff --git a/src/storage/v2/indices.cpp b/src/storage/v2/indices.cpp index 057690726..d873e8ecf 100644 --- a/src/storage/v2/indices.cpp +++ b/src/storage/v2/indices.cpp @@ -2,6 +2,7 @@ #include "storage/v2/mvcc.hpp" #include "utils/logging.hpp" +#include "utils/memory_tracker.hpp" namespace storage { @@ -256,17 +257,24 @@ void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transacti } bool LabelIndex::CreateIndex(LabelId label, utils::SkipList::Accessor vertices) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label), std::forward_as_tuple()); if (!emplaced) { // Index already exists. return false; } - auto acc = it->second.access(); - for (Vertex &vertex : vertices) { - if (vertex.deleted || !utils::Contains(vertex.labels, label)) { - continue; + try { + auto acc = it->second.access(); + for (Vertex &vertex : vertices) { + if (vertex.deleted || !utils::Contains(vertex.labels, label)) { + continue; + } + acc.insert(Entry{&vertex, 0}); } - acc.insert(Entry{&vertex, 0}); + } catch (const utils::OutOfMemoryException &) { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; + index_.erase(it); + throw; } return true; } @@ -389,22 +397,29 @@ void LabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const Property } bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label, property), std::forward_as_tuple()); if (!emplaced) { // Index already exists. return false; } - auto acc = it->second.access(); - for (Vertex &vertex : vertices) { - if (vertex.deleted || !utils::Contains(vertex.labels, label)) { - continue; + try { + auto acc = it->second.access(); + for (Vertex &vertex : vertices) { + if (vertex.deleted || !utils::Contains(vertex.labels, label)) { + continue; + } + auto value = vertex.properties.GetProperty(property); + if (value.IsNull()) { + continue; + } + acc.insert(Entry{std::move(value), &vertex, 0}); } - auto value = vertex.properties.GetProperty(property); - if (value.IsNull()) { - continue; - } - acc.insert(Entry{std::move(value), &vertex, 0}); + } catch (const utils::OutOfMemoryException &) { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; + index_.erase(it); + throw; } return true; } diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index ba7dc5d8d..f0ebe9fe9 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -19,6 +19,7 @@ #include "storage/v2/transaction.hpp" #include "utils/file.hpp" #include "utils/logging.hpp" +#include "utils/memory_tracker.hpp" #include "utils/rw_lock.hpp" #include "utils/spin_lock.hpp" #include "utils/stat.hpp" @@ -31,6 +32,8 @@ namespace storage { +using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; + namespace { [[maybe_unused]] constexpr uint16_t kEpochHistoryRetention = 1000; } // namespace @@ -415,6 +418,7 @@ Storage::Accessor::~Accessor() { } VertexAccessor Storage::Accessor::CreateVertex() { + OOMExceptionEnabler oom_exception; auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->vertices_.access(); auto delta = CreateDeleteObjectDelta(&transaction_); @@ -426,6 +430,7 @@ VertexAccessor Storage::Accessor::CreateVertex() { } VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) { + OOMExceptionEnabler oom_exception; // NOTE: When we update the next `vertex_id_` here we perform a RMW // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue // because this function is only called from the replication delta applier @@ -528,6 +533,7 @@ Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { } Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) { + OOMExceptionEnabler oom_exception; MG_ASSERT(from->transaction_ == to->transaction_, "VertexAccessors must be from the same transaction when creating " "an edge!"); @@ -587,6 +593,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid) { + OOMExceptionEnabler oom_exception; MG_ASSERT(from->transaction_ == to->transaction_, "VertexAccessors must be from the same transaction when creating " "an edge!"); diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index d06d0fa94..234f61375 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -7,6 +7,7 @@ #include "storage/v2/indices.hpp" #include "storage/v2/mvcc.hpp" #include "utils/logging.hpp" +#include "utils/memory_tracker.hpp" namespace storage { @@ -44,6 +45,7 @@ std::optional VertexAccessor::Create(Vertex *vertex, Transaction } Result VertexAccessor::AddLabel(LabelId label) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; @@ -176,6 +178,7 @@ Result> VertexAccessor::Labels(View view) const { } Result VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp index 0d5607dde..1f061d257 100644 --- a/src/utils/memory.hpp +++ b/src/utils/memory.hpp @@ -23,6 +23,7 @@ #include "utils/logging.hpp" #include "utils/math.hpp" +#include "utils/memory_tracker.hpp" #include "utils/spin_lock.hpp" namespace utils { @@ -552,4 +553,25 @@ class LimitedMemoryResource final : public utils::MemoryResource { bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; } }; +// Allocate memory with the OutOfMemoryException enabled if the requested size +// puts total allocated amount over the limit. +class ResourceWithOutOfMemoryException : public MemoryResource { + public: + explicit ResourceWithOutOfMemoryException(utils::MemoryResource *upstream = utils::NewDeleteResource()) + : upstream_{upstream} {} + + utils::MemoryResource *GetUpstream() noexcept { return upstream_; } + + private: + void *DoAllocate(size_t bytes, size_t alignment) override { + utils::MemoryTracker::OutOfMemoryExceptionEnabler exception_enabler; + return upstream_->Allocate(bytes, alignment); + } + + void DoDeallocate(void *p, size_t bytes, size_t alignment) override { upstream_->Deallocate(p, bytes, alignment); } + + bool DoIsEqual(const utils::MemoryResource &other) const noexcept override { return upstream_->IsEqual(other); } + + MemoryResource *upstream_{utils::NewDeleteResource()}; +}; } // namespace utils diff --git a/src/utils/skip_list.hpp b/src/utils/skip_list.hpp index d770ca768..fd7687504 100644 --- a/src/utils/skip_list.hpp +++ b/src/utils/skip_list.hpp @@ -264,6 +264,10 @@ class SkipListGc final { } void Run() { + // This method can be called after any skip list method, including the add method + // which could have OOMException enabled in its thread so to ensure no exception + // is thrown while cleaning the skip list, we add the blocker. + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker; if (!lock_.try_lock()) return; OnScopeExit cleanup([&] { lock_.unlock(); }); Block *tail = tail_.load(std::memory_order_acquire);