Add exception enablers and blockers (#99)
* Throw OOMException while creating vertices and edges * Throw on indices creation * Throw on setting a property * Throw oom exception while recovering * Throw exception when query engine asks for extra memory * Block out of memor exception during skip list GC
This commit is contained in:
parent
bbed7a2397
commit
dee885d69c
@ -26,6 +26,7 @@
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
@ -659,7 +660,12 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
|
||||
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size);
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "storage/v2/durability/snapshot.hpp"
|
||||
#include "storage/v2/durability/wal.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace storage::durability {
|
||||
|
||||
@ -135,6 +136,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
|
||||
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
|
||||
Indices *indices, Constraints *constraints, Config::Items items,
|
||||
uint64_t *wal_seq_num) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory)) return std::nullopt;
|
||||
|
||||
auto snapshot_files = GetSnapshotFiles(snapshot_directory);
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/vertex_accessor.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -16,6 +17,7 @@ VertexAccessor EdgeAccessor::ToVertex() const {
|
||||
}
|
||||
|
||||
Result<bool> EdgeAccessor::SetProperty(PropertyId property, const PropertyValue &value) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;
|
||||
|
||||
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -256,17 +257,24 @@ void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transacti
|
||||
}
|
||||
|
||||
bool LabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label), std::forward_as_tuple());
|
||||
if (!emplaced) {
|
||||
// Index already exists.
|
||||
return false;
|
||||
}
|
||||
auto acc = it->second.access();
|
||||
for (Vertex &vertex : vertices) {
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
|
||||
continue;
|
||||
try {
|
||||
auto acc = it->second.access();
|
||||
for (Vertex &vertex : vertices) {
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
|
||||
continue;
|
||||
}
|
||||
acc.insert(Entry{&vertex, 0});
|
||||
}
|
||||
acc.insert(Entry{&vertex, 0});
|
||||
} catch (const utils::OutOfMemoryException &) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
|
||||
index_.erase(it);
|
||||
throw;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -389,22 +397,29 @@ void LabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const Property
|
||||
}
|
||||
|
||||
bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
auto [it, emplaced] =
|
||||
index_.emplace(std::piecewise_construct, std::forward_as_tuple(label, property), std::forward_as_tuple());
|
||||
if (!emplaced) {
|
||||
// Index already exists.
|
||||
return false;
|
||||
}
|
||||
auto acc = it->second.access();
|
||||
for (Vertex &vertex : vertices) {
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
|
||||
continue;
|
||||
try {
|
||||
auto acc = it->second.access();
|
||||
for (Vertex &vertex : vertices) {
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
|
||||
continue;
|
||||
}
|
||||
auto value = vertex.properties.GetProperty(property);
|
||||
if (value.IsNull()) {
|
||||
continue;
|
||||
}
|
||||
acc.insert(Entry{std::move(value), &vertex, 0});
|
||||
}
|
||||
auto value = vertex.properties.GetProperty(property);
|
||||
if (value.IsNull()) {
|
||||
continue;
|
||||
}
|
||||
acc.insert(Entry{std::move(value), &vertex, 0});
|
||||
} catch (const utils::OutOfMemoryException &) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
|
||||
index_.erase(it);
|
||||
throw;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/rw_lock.hpp"
|
||||
#include "utils/spin_lock.hpp"
|
||||
#include "utils/stat.hpp"
|
||||
@ -31,6 +32,8 @@
|
||||
|
||||
namespace storage {
|
||||
|
||||
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
namespace {
|
||||
[[maybe_unused]] constexpr uint16_t kEpochHistoryRetention = 1000;
|
||||
} // namespace
|
||||
@ -415,6 +418,7 @@ Storage::Accessor::~Accessor() {
|
||||
}
|
||||
|
||||
VertexAccessor Storage::Accessor::CreateVertex() {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||
auto acc = storage_->vertices_.access();
|
||||
auto delta = CreateDeleteObjectDelta(&transaction_);
|
||||
@ -426,6 +430,7 @@ VertexAccessor Storage::Accessor::CreateVertex() {
|
||||
}
|
||||
|
||||
VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
// NOTE: When we update the next `vertex_id_` here we perform a RMW
|
||||
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
|
||||
// because this function is only called from the replication delta applier
|
||||
@ -528,6 +533,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
MG_ASSERT(from->transaction_ == to->transaction_,
|
||||
"VertexAccessors must be from the same transaction when creating "
|
||||
"an edge!");
|
||||
@ -587,6 +593,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
|
||||
|
||||
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type,
|
||||
storage::Gid gid) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
MG_ASSERT(from->transaction_ == to->transaction_,
|
||||
"VertexAccessors must be from the same transaction when creating "
|
||||
"an edge!");
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "storage/v2/indices.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -44,6 +45,7 @@ std::optional<VertexAccessor> VertexAccessor::Create(Vertex *vertex, Transaction
|
||||
}
|
||||
|
||||
Result<bool> VertexAccessor::AddLabel(LabelId label) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
|
||||
@ -176,6 +178,7 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
|
||||
}
|
||||
|
||||
Result<bool> VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/math.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/spin_lock.hpp"
|
||||
|
||||
namespace utils {
|
||||
@ -552,4 +553,25 @@ class LimitedMemoryResource final : public utils::MemoryResource {
|
||||
bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
|
||||
};
|
||||
|
||||
// Allocate memory with the OutOfMemoryException enabled if the requested size
|
||||
// puts total allocated amount over the limit.
|
||||
class ResourceWithOutOfMemoryException : public MemoryResource {
|
||||
public:
|
||||
explicit ResourceWithOutOfMemoryException(utils::MemoryResource *upstream = utils::NewDeleteResource())
|
||||
: upstream_{upstream} {}
|
||||
|
||||
utils::MemoryResource *GetUpstream() noexcept { return upstream_; }
|
||||
|
||||
private:
|
||||
void *DoAllocate(size_t bytes, size_t alignment) override {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler exception_enabler;
|
||||
return upstream_->Allocate(bytes, alignment);
|
||||
}
|
||||
|
||||
void DoDeallocate(void *p, size_t bytes, size_t alignment) override { upstream_->Deallocate(p, bytes, alignment); }
|
||||
|
||||
bool DoIsEqual(const utils::MemoryResource &other) const noexcept override { return upstream_->IsEqual(other); }
|
||||
|
||||
MemoryResource *upstream_{utils::NewDeleteResource()};
|
||||
};
|
||||
} // namespace utils
|
||||
|
@ -264,6 +264,10 @@ class SkipListGc final {
|
||||
}
|
||||
|
||||
void Run() {
|
||||
// This method can be called after any skip list method, including the add method
|
||||
// which could have OOMException enabled in its thread so to ensure no exception
|
||||
// is thrown while cleaning the skip list, we add the blocker.
|
||||
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker;
|
||||
if (!lock_.try_lock()) return;
|
||||
OnScopeExit cleanup([&] { lock_.unlock(); });
|
||||
Block *tail = tail_.load(std::memory_order_acquire);
|
||||
|
Loading…
Reference in New Issue
Block a user