From a6fcdfd90553e57cf2ec5329a873defa25d2bbda Mon Sep 17 00:00:00 2001
From: Gareth Andrew Lloyd
Date: Tue, 27 Feb 2024 14:45:08 +0000
Subject: [PATCH] Make GC + snapshot, main lock friendly (#1759)

- Only IN_MEMORY_ANALYTICAL requires the unique lock during snapshot
- GC will in some cases be provided with the unique lock
- This fact can be used for optimisations
- In all other cases, optimisations should be done via an alternative check,
  not by acquiring the unique lock

Also:
- Faster property lookup
- Faster index iteration (better conditional branching)
---
 src/storage/v2/disk/storage.hpp               |   2 +-
 .../v2/inmemory/label_property_index.cpp      |  23 +--
 src/storage/v2/inmemory/storage.cpp           | 142 +++++++++---------
 src/storage/v2/inmemory/storage.hpp           |   4 +-
 src/storage/v2/property_store.cpp             |  38 ++++-
 src/storage/v2/storage.hpp                    |  12 +-
 src/utils/resource_lock.hpp                   |  11 ++
 7 files changed, 138 insertions(+), 94 deletions(-)

diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp
index 5803f15d3..4d71fd10b 100644
--- a/src/storage/v2/disk/storage.hpp
+++ b/src/storage/v2/disk/storage.hpp
@@ -310,7 +310,7 @@ class DiskStorage final : public Storage {
   StorageInfo GetBaseInfo() override;
   StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
 
-  void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}
+  void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/, bool /*periodic*/) override {}
 
   void PrepareForNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
 
diff --git a/src/storage/v2/inmemory/label_property_index.cpp b/src/storage/v2/inmemory/label_property_index.cpp
index 59b12a779..71e3be4b8 100644
--- a/src/storage/v2/inmemory/label_property_index.cpp
+++ b/src/storage/v2/inmemory/label_property_index.cpp
@@ -144,27 +144,30 @@ void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_st
   auto maybe_stop = utils::ResettableCounter<2048>();
 
   for (auto &[label_property, index] : index_) {
+    auto [label_id, prop_id] = label_property;
     // before starting index, check if stop_requested
     if (token.stop_requested()) return;
 
     auto index_acc = index.access();
-    for (auto it = index_acc.begin(); it != index_acc.end();) {
+    auto it = index_acc.begin();
+    auto end_it = index_acc.end();
+    if (it == end_it) continue;
+    while (true) {
       // Hot loop, don't check stop_requested every time
       if (maybe_stop() && token.stop_requested()) return;
 
       auto next_it = it;
       ++next_it;
 
-      if (it->timestamp >= oldest_active_start_timestamp) {
-        it = next_it;
-        continue;
-      }
-
-      if ((next_it != index_acc.end() && it->vertex == next_it->vertex && it->value == next_it->value) ||
-          !AnyVersionHasLabelProperty(*it->vertex, label_property.first, label_property.second, it->value,
-                                      oldest_active_start_timestamp)) {
-        index_acc.remove(*it);
+      bool has_next = next_it != end_it;
+      if (it->timestamp < oldest_active_start_timestamp) {
+        bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
+        if (redundant_duplicate ||
+            !AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp)) {
+          index_acc.remove(*it);
+        }
       }
+      if (!has_next) break;
       it = next_it;
     }
   }
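The rewritten sweep above hoists the end iterator, computes the successor once per step, and folds the duplicate check and loop termination into a single has_next flag. A minimal stand-alone sketch of that loop shape, with a plain std::list standing in for the skip-list accessor and the AnyVersionHasLabelProperty check left out:

// Illustrative only: Entry and RemoveObsolete are hypothetical stand-ins for the
// real skip-list entries. The point is the loop shape: the end iterator is hoisted,
// the successor is computed once per iteration, and a single has_next flag drives
// both the duplicate check and the termination test.
#include <cstdint>
#include <iostream>
#include <list>

struct Entry {
  int vertex;           // stand-in for the indexed vertex pointer
  int value;            // stand-in for the indexed property value
  uint64_t timestamp;
};

void RemoveObsolete(std::list<Entry> &index, uint64_t oldest_active_start_timestamp) {
  auto it = index.begin();
  auto end_it = index.end();
  if (it == end_it) return;
  while (true) {
    auto next_it = it;
    ++next_it;
    bool has_next = next_it != end_it;
    if (it->timestamp < oldest_active_start_timestamp) {
      // A newer duplicate for the same (vertex, value) makes this entry redundant.
      bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
      if (redundant_duplicate) {
        index.erase(it);  // the real code also removes entries no version still needs
      }
    }
    if (!has_next) break;
    it = next_it;
  }
}

int main() {
  std::list<Entry> index{{1, 7, 5}, {1, 7, 20}, {2, 3, 5}};
  RemoveObsolete(index, 10);
  for (auto const &e : index) std::cout << e.vertex << ' ' << e.value << ' ' << e.timestamp << '\n';
}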
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index b09d75a30..bd8534673 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -143,9 +143,7 @@ InMemoryStorage::InMemoryStorage(Config config)
   if (config_.gc.type == Config::Gc::Type::PERIODIC) {
     // TODO: move out of storage have one global gc_runner_
-    gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
-      this->FreeMemory(std::unique_lock{main_lock_, std::defer_lock});
-    });
+    gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->FreeMemory({}, true); });
   }
   if (timestamp_ == kTimestampInitialId) {
     commit_log_.emplace();
   }
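The periodic GC callback above now passes an empty std::unique_lock plus a periodic flag instead of a lock bound to main_lock_ with std::defer_lock. A small sketch of the difference between the two, with std::shared_mutex standing in for utils::ResourceLock and a free function standing in for InMemoryStorage::FreeMemory:

// Illustrative only: a default-constructed unique_lock owns nothing and refers to
// no mutex, while a defer_lock one is bound to the mutex but still not owned. The
// periodic path now passes the former, so GC cannot even attempt to take main_lock_
// unless a caller explicitly handed it an owned lock.
#include <iostream>
#include <mutex>
#include <shared_mutex>

std::shared_mutex main_lock;  // stand-in for utils::ResourceLock

void FreeMemory(std::unique_lock<std::shared_mutex> main_guard, bool periodic) {
  std::cout << "periodic=" << periodic << " owns_lock=" << main_guard.owns_lock()
            << " has_mutex=" << (main_guard.mutex() != nullptr) << '\n';
}

int main() {
  FreeMemory({}, true);                                             // periodic GC: no lock at all
  FreeMemory(std::unique_lock{main_lock, std::defer_lock}, false);  // bound but unowned (old style)
  FreeMemory(std::unique_lock{main_lock}, false);                   // owned unique lock
}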
@@ -1425,28 +1423,27 @@ void InMemoryStorage::SetStorageMode(StorageMode new_storage_mode) {
     }
     storage_mode_ = new_storage_mode;
-    FreeMemory(std::move(main_guard));
+    FreeMemory(std::move(main_guard), false);
   }
 }
 
-template <bool force>
-void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard) {
+template <bool aggressive>
+void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
   // NOTE: You do not need to consider cleanup of deleted objects that occurred in
   // different storage modes within the same CollectGarbage call. This is because
   // SetStorageMode will ensure CollectGarbage is called before any new transactions
   // with the new storage mode can start.
 
   // SetStorageMode will pass its unique_lock of main_lock_. We will use that lock,
   // as reacquiring the lock would cause deadlock. Otherwise, we need to get our own
   // lock.
   if (!main_guard.owns_lock()) {
-    if constexpr (force) {
-      // We take the unique lock on the main storage lock, so we can forcefully clean
-      // everything we can
-      if (!main_lock_.try_lock()) {
-        CollectGarbage<false>();
-        return;
-      }
+    if constexpr (aggressive) {
+      // We wanted to be aggressive, but we do not already hold the main lock, so continue as non-aggressive.
+      // Perf note: do not try to take the unique lock if it was not passed in. GC may be expensive, so do
+      // not assume it is fast; taking the unique lock would block all new storage transactions.
+      CollectGarbage<false>({}, periodic);
+      return;
     } else {
       // Because the garbage collector iterates through the indices and constraints
      // to clean them up, it must take the main lock for reading to make sure that
@@ -1458,17 +1455,24 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_
   }
 
   utils::OnScopeExit lock_releaser{[&] {
-    if (!main_guard.owns_lock()) {
-      if constexpr (force) {
-        main_lock_.unlock();
-      } else {
-        main_lock_.unlock_shared();
-      }
-    } else {
+    if (main_guard.owns_lock()) {
       main_guard.unlock();
+    } else {
+      main_lock_.unlock_shared();
     }
   }};
 
+  // Only one gc run at a time
+  std::unique_lock gc_guard(gc_lock_, std::try_to_lock);
+  if (!gc_guard.owns_lock()) {
+    return;
+  }
+
+  // Diagnostic trace
+  spdlog::trace("Storage GC on '{}' started [{}]", name(), periodic ? "periodic" : "forced");
+  auto trace_on_exit = utils::OnScopeExit{
+      [&] { spdlog::trace("Storage GC on '{}' finished [{}]", name(), periodic ? "periodic" : "forced"); }};
+
   // Garbage collection must be performed in two phases. In the first phase,
   // deltas that won't be applied by any transaction anymore are unlinked from
   // the version chains. They cannot be deleted immediately, because there
   // might be an active transaction that still needs them to finish its version
   // chain traversal. They are instead marked for deletion and will be deleted
   // in the second GC phase in this GC iteration or some of the following
   // ones.
-  std::unique_lock gc_guard(gc_lock_, std::try_to_lock);
-  if (!gc_guard.owns_lock()) {
-    return;
-  }
 
   uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
 
-  // Deltas from previous GC runs or from aborts can be cleaned up here
-  garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
-    if constexpr (force) {
-      // if force is set to true we can simply delete all the leftover undos because
-      // no transaction is active
-      garbage_undo_buffers.clear();
-    } else {
-      // garbage_undo_buffers is ordered, pop until we can't
-      while (!garbage_undo_buffers.empty() &&
-             garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) {
-        garbage_undo_buffers.pop_front();
+  {
+    std::unique_lock guard(engine_lock_);
+    uint64_t mark_timestamp = timestamp_;  // a timestamp no active transaction can currently have
+
+    // Deltas from previous GC runs or from aborts can be cleaned up here
+    garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
+      guard.unlock();
+      if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
+        // We know no transaction is active, so it is safe to simply delete all the garbage undos;
+        // nothing can be reading them
+        garbage_undo_buffers.clear();
+      } else {
+        // garbage_undo_buffers is ordered, pop until we can't
+        while (!garbage_undo_buffers.empty() &&
+               garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) {
+          garbage_undo_buffers.pop_front();
+        }
       }
-    }
-  });
+    });
+  }
 
   // We don't move undo buffers of unlinked transactions to garbage_undo_buffers
   // list immediately, because we would have to repeatedly take
@@ -1694,7 +1700,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_
     std::unique_lock guard(engine_lock_);
     uint64_t mark_timestamp = timestamp_;  // a timestamp no active transaction can currently have
 
-    if (force or mark_timestamp == oldest_active_start_timestamp) {
+    if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
+      guard.unlock();
       // if lucky, there are no active transactions, hence nothing looking at the deltas
       // remove them now
       unlinked_undo_buffers.clear();
@@ -1756,8 +1763,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_
 }
 
 // tell the linker he can find the CollectGarbage definitions here
-template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock>);
-template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock>);
+template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
+template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
 
 StorageInfo InMemoryStorage::GetBaseInfo() {
   StorageInfo info{};
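The hunks above gate the collector so that at most one GC run is in flight (gc_lock_ is taken with std::try_to_lock and the run bails out if it is already held) and release whichever main-lock mode was actually acquired on every exit path. A rough stand-alone sketch of that gating, with std::mutex and std::shared_mutex standing in for gc_lock_ and main_lock_ and a dummy RunGcPass in place of the real collection phases:

// Illustrative only: the names gc_lock, main_lock and RunGcPass are hypothetical.
#include <iostream>
#include <mutex>
#include <shared_mutex>

std::mutex gc_lock;
std::shared_mutex main_lock;

void RunGcPass() { std::cout << "collecting garbage\n"; }

void CollectGarbage(std::unique_lock<std::shared_mutex> main_guard) {
  bool shared_taken = false;
  if (!main_guard.owns_lock()) {
    main_lock.lock_shared();  // read lock so indices and constraints stay stable
    shared_taken = true;
  }
  // Release the right lock on every exit path (the real code uses a scope guard).
  auto release = [&] {
    if (main_guard.owns_lock()) main_guard.unlock();
    if (shared_taken) main_lock.unlock_shared();
  };

  std::unique_lock<std::mutex> gc_guard(gc_lock, std::try_to_lock);
  if (!gc_guard.owns_lock()) {  // another GC run is already in progress
    release();
    return;
  }
  RunGcPass();
  release();
}

int main() {
  CollectGarbage({});                           // GC acquires its own shared lock
  CollectGarbage(std::unique_lock{main_lock});  // caller already holds the unique lock
}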
@@ -2108,50 +2115,35 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
 
 utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
     memgraph::replication_coordination_glue::ReplicationRole replication_role) {
-  if (replication_role == memgraph::replication_coordination_glue::ReplicationRole::REPLICA) {
+  using memgraph::replication_coordination_glue::ReplicationRole;
+  if (replication_role == ReplicationRole::REPLICA) {
     return InMemoryStorage::CreateSnapshotError::DisabledForReplica;
   }
 
-  auto const &epoch = repl_storage_state_.epoch_;
-  auto snapshot_creator = [this, &epoch]() {
-    utils::Timer timer;
-    auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_,
-                                         memgraph::replication_coordination_glue::ReplicationRole::MAIN);
-    durability::CreateSnapshot(this, &transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
-                               &edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
-    // Finalize snapshot transaction.
-    commit_log_->MarkFinished(transaction.start_timestamp);
-
-    memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
-                               std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
-  };
 
   std::lock_guard snapshot_guard(snapshot_lock_);
 
-  auto should_try_shared{true};
-  auto max_num_tries{10};
-  while (max_num_tries) {
-    if (should_try_shared) {
-      std::shared_lock storage_guard(main_lock_);
-      if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
-        snapshot_creator();
-        return {};
-      }
+  auto accessor = std::invoke([&]() {
+    if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
+      // For analytical no other txn can be in play
+      return UniqueAccess(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
     } else {
-      std::unique_lock main_guard{main_lock_};
-      if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
-        snapshot_creator();
-        return {};
-      }
+      return Access(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
     }
-    should_try_shared = !should_try_shared;
-    max_num_tries--;
-  }
+  });
 
-  return CreateSnapshotError::ReachedMaxNumTries;
+  utils::Timer timer;
+  Transaction *transaction = accessor->GetTransaction();
+  auto const &epoch = repl_storage_state_.epoch_;
+  durability::CreateSnapshot(this, transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
+                             &edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
+
+  memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
+                             std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
+  return {};
 }
 
-void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) {
-  CollectGarbage<true>(std::move(main_guard));
+void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
+  CollectGarbage<true>(std::move(main_guard), periodic);
 
   static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
   static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp
index 0b62be7b3..c0e46d0c9 100644
--- a/src/storage/v2/inmemory/storage.hpp
+++ b/src/storage/v2/inmemory/storage.hpp
@@ -332,7 +332,7 @@ class InMemoryStorage final : public Storage {
   std::unique_ptr<Accessor> UniqueAccess(memgraph::replication_coordination_glue::ReplicationRole replication_role,
                                          std::optional<IsolationLevel> override_isolation_level) override;
 
-  void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) override;
+  void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) override;
 
   utils::FileRetainer::FileLockerAccessor::ret_type IsPathLocked();
   utils::FileRetainer::FileLockerAccessor::ret_type LockPath();
@@ -363,7 +363,7 @@ class InMemoryStorage final : public Storage {
   /// @throw std::system_error
   /// @throw std::bad_alloc
   template <bool aggressive = false>
-  void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard = {});
+  void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
 
   bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch);
   void FinalizeWalFile();
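CreateSnapshot above replaces the retry loop with a single accessor choice: a unique accessor when the storage runs in IN_MEMORY_ANALYTICAL mode (no other transaction may be active), a regular shared accessor otherwise. A simplified sketch of that selection, with plain stand-in types instead of Storage::Accessor:

// Illustrative only: Accessor, Access and UniqueAccess are hypothetical stand-ins.
#include <functional>
#include <iostream>
#include <memory>

enum class StorageMode { IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL };

struct Accessor {
  bool unique;
};

std::unique_ptr<Accessor> Access() { return std::make_unique<Accessor>(Accessor{false}); }
std::unique_ptr<Accessor> UniqueAccess() { return std::make_unique<Accessor>(Accessor{true}); }

void CreateSnapshot(StorageMode storage_mode) {
  auto accessor = std::invoke([&] {
    if (storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
      return UniqueAccess();  // exclusive: no other txn can be in play
    }
    return Access();  // shared: snapshot isolation protects the reads
  });
  std::cout << "snapshot running with " << (accessor->unique ? "unique" : "shared") << " access\n";
}

int main() {
  CreateSnapshot(StorageMode::IN_MEMORY_TRANSACTIONAL);
  CreateSnapshot(StorageMode::IN_MEMORY_ANALYTICAL);
}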
diff --git a/src/storage/v2/property_store.cpp b/src/storage/v2/property_store.cpp
index e6e4dbbaf..adf3440a2 100644
--- a/src/storage/v2/property_store.cpp
+++ b/src/storage/v2/property_store.cpp
@@ -1051,6 +1051,14 @@ struct SpecificPropertyAndBufferInfo {
   uint64_t all_size;
 };
 
+// Struct used to return info about the property position
+struct SpecificPropertyAndBufferInfoMinimal {
+  uint64_t property_begin;
+  uint64_t property_end;
+
+  auto property_size() const { return property_end - property_begin; }
+};
+
 // Function used to find the position where the property should be in the data
 // buffer. It keeps the properties in the buffer sorted by `PropertyId` and
 // returns the positions in the buffer where the seeked property starts and
@@ -1083,6 +1091,27 @@ SpecificPropertyAndBufferInfo FindSpecificPropertyAndBufferInfo(Reader *reader,
   return {property_begin, property_end, property_end - property_begin, all_begin, all_end, all_end - all_begin};
 }
 
+// Like FindSpecificPropertyAndBufferInfo, but exits early; there is no need to find the "all" information
+SpecificPropertyAndBufferInfoMinimal FindSpecificPropertyAndBufferInfoMinimal(Reader *reader, PropertyId property) {
+  uint64_t property_begin = reader->GetPosition();
+  while (true) {
+    switch (HasExpectedProperty(reader, property)) {
+      case ExpectedPropertyStatus::MISSING_DATA:
+        [[fallthrough]];
+      case ExpectedPropertyStatus::GREATER: {
+        return {0, 0};
+      }
+      case ExpectedPropertyStatus::EQUAL: {
+        return {property_begin, reader->GetPosition()};
+      }
+      case ExpectedPropertyStatus::SMALLER: {
+        property_begin = reader->GetPosition();
+        break;
+      }
+    }
+  }
+}
+
 // All data buffers will be allocated to a power of 8 size.
 uint64_t ToPowerOf8(uint64_t size) {
   uint64_t mod = size % 8;
@@ -1254,11 +1283,12 @@ bool PropertyStore::IsPropertyEqual(PropertyId property, const PropertyValue &va
   BufferInfo buffer_info = GetBufferInfo(buffer_);
   Reader reader(buffer_info.data, buffer_info.size);
 
-  auto info = FindSpecificPropertyAndBufferInfo(&reader, property);
-  if (info.property_size == 0) return value.IsNull();
-  Reader prop_reader(buffer_info.data + info.property_begin, info.property_size);
+  auto info = FindSpecificPropertyAndBufferInfoMinimal(&reader, property);
+  auto property_size = info.property_size();
+  if (property_size == 0) return value.IsNull();
+  Reader prop_reader(buffer_info.data + info.property_begin, property_size);
   if (!CompareExpectedProperty(&prop_reader, property, value)) return false;
-  return prop_reader.GetPosition() == info.property_size;
+  return prop_reader.GetPosition() == property_size;
 }
 
 std::map<PropertyId, PropertyValue> PropertyStore::Properties() const {
diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp
index ddbe6d8a7..5868d70a3 100644
--- a/src/storage/v2/storage.hpp
+++ b/src/storage/v2/storage.hpp
@@ -284,6 +284,8 @@ class Storage {
     virtual UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label,
                                                                    const std::set<PropertyId> &properties) = 0;
 
+    auto GetTransaction() -> Transaction * { return std::addressof(transaction_); }
+
    protected:
     Storage *storage_;
     std::shared_lock<utils::ResourceLock> storage_guard_;
@@ -336,9 +338,15 @@ class Storage {
 
   StorageMode GetStorageMode() const noexcept;
 
-  virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) = 0;
+  virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) = 0;
 
-  void FreeMemory() { FreeMemory({}); }
+  void FreeMemory() {
+    if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
+      FreeMemory(std::unique_lock{main_lock_}, false);
+    } else {
+      FreeMemory({}, false);
+    }
+  }
 
   virtual std::unique_ptr<Accessor> Access(memgraph::replication_coordination_glue::ReplicationRole replication_role,
                                            std::optional<IsolationLevel> override_isolation_level) = 0;
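IsPropertyEqual above switches to the minimal lookup, which can stop as soon as it sees a property id greater than the requested one because the buffer is kept sorted by PropertyId. A toy illustration of that early exit, with a sorted vector of (id, size) records standing in for the encoded property buffer:

// Illustrative only: Record, MinimalInfo and FindMinimal are hypothetical stand-ins
// for the real encoded buffer and reader. Properties are stored sorted by id, so the
// scan can return at the first id greater than the one requested instead of walking
// the rest of the buffer to compute the "all" bounds.
#include <cstdint>
#include <iostream>
#include <vector>

struct Record {
  uint32_t id;
  uint64_t size;  // encoded size of this property's payload
};

struct MinimalInfo {
  uint64_t property_begin = 0;
  uint64_t property_end = 0;
  uint64_t property_size() const { return property_end - property_begin; }
};

MinimalInfo FindMinimal(const std::vector<Record> &records, uint32_t wanted) {
  uint64_t pos = 0;
  for (const auto &rec : records) {
    if (rec.id > wanted) return {};                  // sorted => cannot appear later (GREATER)
    if (rec.id == wanted) return {pos, pos + rec.size};  // EQUAL
    pos += rec.size;                                 // SMALLER: keep scanning
  }
  return {};                                         // ran out of data (MISSING_DATA)
}

int main() {
  std::vector<Record> records{{1, 4}, {3, 8}, {7, 2}};
  auto info = FindMinimal(records, 3);
  std::cout << info.property_begin << ".." << info.property_end
            << " size=" << info.property_size() << '\n';  // prints 4..12 size=8
}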
diff --git a/src/utils/resource_lock.hpp b/src/utils/resource_lock.hpp
index 7a3ef9444..1a812935a 100644
--- a/src/utils/resource_lock.hpp
+++ b/src/utils/resource_lock.hpp
@@ -66,6 +66,17 @@ struct ResourceLock {
     }
     return false;
   }
+
+  template <typename Rep, typename Period>
+  bool try_lock_shared_for(std::chrono::duration<Rep, Period> const &time) {
+    auto lock = std::unique_lock{mtx};
+    // block until not uniquely locked, or until the timeout expires
+    if (!cv.wait_for(lock, time, [this] { return state != UNIQUE; })) return false;
+    state = SHARED;
+    ++count;
+    return true;
+  }
+
   void unlock() {
     auto lock = std::unique_lock{mtx};
     state = UNLOCKED;
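The timed shared acquisition added to ResourceLock waits on the condition variable until the lock is no longer uniquely held or the timeout expires. A self-contained sketch of those semantics, using a simplified MiniResourceLock stand-in rather than the real utils::ResourceLock:

// Illustrative only: MiniResourceLock is a hypothetical, reduced stand-in kept just
// to show how try_lock_shared_for behaves against a unique holder.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

struct MiniResourceLock {
  enum State { UNLOCKED, SHARED, UNIQUE };

  std::mutex mtx;
  std::condition_variable cv;
  State state = UNLOCKED;
  int count = 0;  // number of shared holders

  void lock() {  // unique acquisition
    auto lock = std::unique_lock{mtx};
    cv.wait(lock, [this] { return state == UNLOCKED; });
    state = UNIQUE;
  }
  template <typename Rep, typename Period>
  bool try_lock_shared_for(std::chrono::duration<Rep, Period> const &time) {
    auto lock = std::unique_lock{mtx};
    // wait until not uniquely locked, or give up after the timeout
    if (!cv.wait_for(lock, time, [this] { return state != UNIQUE; })) return false;
    state = SHARED;
    ++count;
    return true;
  }
  void unlock() {
    auto lock = std::unique_lock{mtx};
    state = UNLOCKED;
    cv.notify_all();
  }
  void unlock_shared() {
    auto lock = std::unique_lock{mtx};
    if (--count == 0) {
      state = UNLOCKED;
      cv.notify_all();
    }
  }
};

int main() {
  using namespace std::chrono_literals;
  MiniResourceLock lock;
  lock.lock();  // held uniquely, e.g. by an analytical-mode operation
  std::cout << "shared acquired: " << lock.try_lock_shared_for(50ms) << '\n';  // 0 (times out)
  lock.unlock();
  std::cout << "shared acquired: " << lock.try_lock_shared_for(50ms) << '\n';  // 1
  lock.unlock_shared();
}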