Make GC + snapshot, main lock friendly (#1759)
- Only IN_MEMORY_ANALYTICAL requires unique lock during snapshot - GC in some cases will be provide with unique lock - This fact can be used for optimisations - In all other cases, optimisations should be done with alternative check. Not via getting a unique lock Also: - Faster property lookup - Faster index iteration (better conditional branching)
This commit is contained in:
parent
e88c7a0aa5
commit
a6fcdfd905
@ -310,7 +310,7 @@ class DiskStorage final : public Storage {
|
||||
StorageInfo GetBaseInfo() override;
|
||||
StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
|
||||
|
||||
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}
|
||||
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/, bool /*periodic*/) override {}
|
||||
|
||||
void PrepareForNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
|
||||
|
||||
|
@ -144,27 +144,30 @@ void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_st
|
||||
auto maybe_stop = utils::ResettableCounter<2048>();
|
||||
|
||||
for (auto &[label_property, index] : index_) {
|
||||
auto [label_id, prop_id] = label_property;
|
||||
// before starting index, check if stop_requested
|
||||
if (token.stop_requested()) return;
|
||||
|
||||
auto index_acc = index.access();
|
||||
for (auto it = index_acc.begin(); it != index_acc.end();) {
|
||||
auto it = index_acc.begin();
|
||||
auto end_it = index_acc.end();
|
||||
if (it == end_it) continue;
|
||||
while (true) {
|
||||
// Hot loop, don't check stop_requested every time
|
||||
if (maybe_stop() && token.stop_requested()) return;
|
||||
|
||||
auto next_it = it;
|
||||
++next_it;
|
||||
|
||||
if (it->timestamp >= oldest_active_start_timestamp) {
|
||||
it = next_it;
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((next_it != index_acc.end() && it->vertex == next_it->vertex && it->value == next_it->value) ||
|
||||
!AnyVersionHasLabelProperty(*it->vertex, label_property.first, label_property.second, it->value,
|
||||
oldest_active_start_timestamp)) {
|
||||
bool has_next = next_it != end_it;
|
||||
if (it->timestamp < oldest_active_start_timestamp) {
|
||||
bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
|
||||
if (redundant_duplicate ||
|
||||
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp)) {
|
||||
index_acc.remove(*it);
|
||||
}
|
||||
}
|
||||
if (!has_next) break;
|
||||
it = next_it;
|
||||
}
|
||||
}
|
||||
|
@ -143,9 +143,7 @@ InMemoryStorage::InMemoryStorage(Config config)
|
||||
|
||||
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
|
||||
// TODO: move out of storage have one global gc_runner_
|
||||
gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
|
||||
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
|
||||
});
|
||||
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->FreeMemory({}, true); });
|
||||
}
|
||||
if (timestamp_ == kTimestampInitialId) {
|
||||
commit_log_.emplace();
|
||||
@ -1425,12 +1423,12 @@ void InMemoryStorage::SetStorageMode(StorageMode new_storage_mode) {
|
||||
}
|
||||
|
||||
storage_mode_ = new_storage_mode;
|
||||
FreeMemory(std::move(main_guard));
|
||||
FreeMemory(std::move(main_guard), false);
|
||||
}
|
||||
}
|
||||
|
||||
template <bool force>
|
||||
void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard) {
|
||||
template <bool aggressive = true>
|
||||
void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
|
||||
// NOTE: You do not need to consider cleanup of deleted object that occurred in
|
||||
// different storage modes within the same CollectGarbage call. This is because
|
||||
// SetStorageMode will ensure CollectGarbage is called before any new transactions
|
||||
@ -1440,13 +1438,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
// as reacquiring the lock would cause deadlock. Otherwise, we need to get our own
|
||||
// lock.
|
||||
if (!main_guard.owns_lock()) {
|
||||
if constexpr (force) {
|
||||
// We take the unique lock on the main storage lock, so we can forcefully clean
|
||||
// everything we can
|
||||
if (!main_lock_.try_lock()) {
|
||||
CollectGarbage<false>();
|
||||
if constexpr (aggressive) {
|
||||
// We tried to be aggressive but we do not already have main lock continue as not aggressive
|
||||
// Perf note: Do not try to get unique lock if it was not already passed in. GC maybe expensive,
|
||||
// do not assume it is fast, unique lock will blocks all new storage transactions.
|
||||
CollectGarbage<false>({}, periodic);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// Because the garbage collector iterates through the indices and constraints
|
||||
// to clean them up, it must take the main lock for reading to make sure that
|
||||
@ -1458,17 +1455,24 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
}
|
||||
|
||||
utils::OnScopeExit lock_releaser{[&] {
|
||||
if (!main_guard.owns_lock()) {
|
||||
if constexpr (force) {
|
||||
main_lock_.unlock();
|
||||
if (main_guard.owns_lock()) {
|
||||
main_guard.unlock();
|
||||
} else {
|
||||
main_lock_.unlock_shared();
|
||||
}
|
||||
} else {
|
||||
main_guard.unlock();
|
||||
}
|
||||
}};
|
||||
|
||||
// Only one gc run at a time
|
||||
std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock);
|
||||
if (!gc_guard.owns_lock()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Diagnostic trace
|
||||
spdlog::trace("Storage GC on '{}' started [{}]", name(), periodic ? "periodic" : "forced");
|
||||
auto trace_on_exit = utils::OnScopeExit{
|
||||
[&] { spdlog::trace("Storage GC on '{}' finished [{}]", name(), periodic ? "periodic" : "forced"); }};
|
||||
|
||||
// Garbage collection must be performed in two phases. In the first phase,
|
||||
// deltas that won't be applied by any transaction anymore are unlinked from
|
||||
// the version chains. They cannot be deleted immediately, because there
|
||||
@ -1476,18 +1480,19 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
// chain traversal. They are instead marked for deletion and will be deleted
|
||||
// in the second GC phase in this GC iteration or some of the following
|
||||
// ones.
|
||||
std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock);
|
||||
if (!gc_guard.owns_lock()) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
|
||||
|
||||
{
|
||||
std::unique_lock<utils::SpinLock> guard(engine_lock_);
|
||||
uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have
|
||||
|
||||
// Deltas from previous GC runs or from aborts can be cleaned up here
|
||||
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
|
||||
if constexpr (force) {
|
||||
// if force is set to true we can simply delete all the leftover undos because
|
||||
// no transaction is active
|
||||
guard.unlock();
|
||||
if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
|
||||
// We know no transaction is active, it is safe to simply delete all the garbage undos
|
||||
// Nothing can be reading them
|
||||
garbage_undo_buffers.clear();
|
||||
} else {
|
||||
// garbage_undo_buffers is ordered, pop until we can't
|
||||
@ -1497,6 +1502,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
|
||||
// list immediately, because we would have to repeatedly take
|
||||
@ -1694,7 +1700,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
std::unique_lock<utils::SpinLock> guard(engine_lock_);
|
||||
uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have
|
||||
|
||||
if (force or mark_timestamp == oldest_active_start_timestamp) {
|
||||
if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
|
||||
guard.unlock();
|
||||
// if lucky, there are no active transactions, hence nothing looking at the deltas
|
||||
// remove them now
|
||||
unlinked_undo_buffers.clear();
|
||||
@ -1756,8 +1763,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
}
|
||||
|
||||
// tell the linker he can find the CollectGarbage definitions here
|
||||
template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock>);
|
||||
template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock>);
|
||||
template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
|
||||
template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
|
||||
|
||||
StorageInfo InMemoryStorage::GetBaseInfo() {
|
||||
StorageInfo info{};
|
||||
@ -2108,50 +2115,35 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
|
||||
|
||||
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
|
||||
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
|
||||
if (replication_role == memgraph::replication_coordination_glue::ReplicationRole::REPLICA) {
|
||||
using memgraph::replication_coordination_glue::ReplicationRole;
|
||||
if (replication_role == ReplicationRole::REPLICA) {
|
||||
return InMemoryStorage::CreateSnapshotError::DisabledForReplica;
|
||||
}
|
||||
auto const &epoch = repl_storage_state_.epoch_;
|
||||
auto snapshot_creator = [this, &epoch]() {
|
||||
utils::Timer timer;
|
||||
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_,
|
||||
memgraph::replication_coordination_glue::ReplicationRole::MAIN);
|
||||
durability::CreateSnapshot(this, &transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
|
||||
&edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
|
||||
// Finalize snapshot transaction.
|
||||
commit_log_->MarkFinished(transaction.start_timestamp);
|
||||
|
||||
memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
|
||||
};
|
||||
|
||||
std::lock_guard snapshot_guard(snapshot_lock_);
|
||||
|
||||
auto should_try_shared{true};
|
||||
auto max_num_tries{10};
|
||||
while (max_num_tries) {
|
||||
if (should_try_shared) {
|
||||
std::shared_lock storage_guard(main_lock_);
|
||||
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
|
||||
snapshot_creator();
|
||||
return {};
|
||||
}
|
||||
auto accessor = std::invoke([&]() {
|
||||
if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
|
||||
// For analytical no other txn can be in play
|
||||
return UniqueAccess(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
|
||||
} else {
|
||||
std::unique_lock main_guard{main_lock_};
|
||||
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
|
||||
snapshot_creator();
|
||||
return Access(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
|
||||
}
|
||||
});
|
||||
|
||||
utils::Timer timer;
|
||||
Transaction *transaction = accessor->GetTransaction();
|
||||
auto const &epoch = repl_storage_state_.epoch_;
|
||||
durability::CreateSnapshot(this, transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
|
||||
&edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
|
||||
|
||||
memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
|
||||
return {};
|
||||
}
|
||||
}
|
||||
should_try_shared = !should_try_shared;
|
||||
max_num_tries--;
|
||||
}
|
||||
|
||||
return CreateSnapshotError::ReachedMaxNumTries;
|
||||
}
|
||||
|
||||
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) {
|
||||
CollectGarbage<true>(std::move(main_guard));
|
||||
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
|
||||
CollectGarbage(std::move(main_guard), periodic);
|
||||
|
||||
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
|
||||
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();
|
||||
|
@ -332,7 +332,7 @@ class InMemoryStorage final : public Storage {
|
||||
std::unique_ptr<Accessor> UniqueAccess(memgraph::replication_coordination_glue::ReplicationRole replication_role,
|
||||
std::optional<IsolationLevel> override_isolation_level) override;
|
||||
|
||||
void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) override;
|
||||
void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) override;
|
||||
|
||||
utils::FileRetainer::FileLockerAccessor::ret_type IsPathLocked();
|
||||
utils::FileRetainer::FileLockerAccessor::ret_type LockPath();
|
||||
@ -363,7 +363,7 @@ class InMemoryStorage final : public Storage {
|
||||
/// @throw std::system_error
|
||||
/// @throw std::bad_alloc
|
||||
template <bool force>
|
||||
void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard = {});
|
||||
void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
|
||||
|
||||
bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch);
|
||||
void FinalizeWalFile();
|
||||
|
@ -1051,6 +1051,14 @@ struct SpecificPropertyAndBufferInfo {
|
||||
uint64_t all_size;
|
||||
};
|
||||
|
||||
// Struct used to return info about the property position
|
||||
struct SpecificPropertyAndBufferInfoMinimal {
|
||||
uint64_t property_begin;
|
||||
uint64_t property_end;
|
||||
|
||||
auto property_size() const { return property_end - property_begin; }
|
||||
};
|
||||
|
||||
// Function used to find the position where the property should be in the data
|
||||
// buffer. It keeps the properties in the buffer sorted by `PropertyId` and
|
||||
// returns the positions in the buffer where the seeked property starts and
|
||||
@ -1083,6 +1091,27 @@ SpecificPropertyAndBufferInfo FindSpecificPropertyAndBufferInfo(Reader *reader,
|
||||
return {property_begin, property_end, property_end - property_begin, all_begin, all_end, all_end - all_begin};
|
||||
}
|
||||
|
||||
// Like FindSpecificPropertyAndBufferInfo, but will early exit. No need to find the "all" information
|
||||
SpecificPropertyAndBufferInfoMinimal FindSpecificPropertyAndBufferInfoMinimal(Reader *reader, PropertyId property) {
|
||||
uint64_t property_begin = reader->GetPosition();
|
||||
while (true) {
|
||||
switch (HasExpectedProperty(reader, property)) {
|
||||
case ExpectedPropertyStatus::MISSING_DATA:
|
||||
[[fallthrough]];
|
||||
case ExpectedPropertyStatus::GREATER: {
|
||||
return {0, 0};
|
||||
}
|
||||
case ExpectedPropertyStatus::EQUAL: {
|
||||
return {property_begin, reader->GetPosition()};
|
||||
}
|
||||
case ExpectedPropertyStatus::SMALLER: {
|
||||
property_begin = reader->GetPosition();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// All data buffers will be allocated to a power of 8 size.
|
||||
uint64_t ToPowerOf8(uint64_t size) {
|
||||
uint64_t mod = size % 8;
|
||||
@ -1254,11 +1283,12 @@ bool PropertyStore::IsPropertyEqual(PropertyId property, const PropertyValue &va
|
||||
BufferInfo buffer_info = GetBufferInfo(buffer_);
|
||||
Reader reader(buffer_info.data, buffer_info.size);
|
||||
|
||||
auto info = FindSpecificPropertyAndBufferInfo(&reader, property);
|
||||
if (info.property_size == 0) return value.IsNull();
|
||||
Reader prop_reader(buffer_info.data + info.property_begin, info.property_size);
|
||||
auto info = FindSpecificPropertyAndBufferInfoMinimal(&reader, property);
|
||||
auto property_size = info.property_size();
|
||||
if (property_size == 0) return value.IsNull();
|
||||
Reader prop_reader(buffer_info.data + info.property_begin, property_size);
|
||||
if (!CompareExpectedProperty(&prop_reader, property, value)) return false;
|
||||
return prop_reader.GetPosition() == info.property_size;
|
||||
return prop_reader.GetPosition() == property_size;
|
||||
}
|
||||
|
||||
std::map<PropertyId, PropertyValue> PropertyStore::Properties() const {
|
||||
|
@ -284,6 +284,8 @@ class Storage {
|
||||
virtual UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label,
|
||||
const std::set<PropertyId> &properties) = 0;
|
||||
|
||||
auto GetTransaction() -> Transaction * { return std::addressof(transaction_); }
|
||||
|
||||
protected:
|
||||
Storage *storage_;
|
||||
std::shared_lock<utils::ResourceLock> storage_guard_;
|
||||
@ -336,9 +338,15 @@ class Storage {
|
||||
|
||||
StorageMode GetStorageMode() const noexcept;
|
||||
|
||||
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) = 0;
|
||||
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) = 0;
|
||||
|
||||
void FreeMemory() { FreeMemory({}); }
|
||||
void FreeMemory() {
|
||||
if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
|
||||
FreeMemory(std::unique_lock{main_lock_}, false);
|
||||
} else {
|
||||
FreeMemory({}, false);
|
||||
}
|
||||
}
|
||||
|
||||
virtual std::unique_ptr<Accessor> Access(memgraph::replication_coordination_glue::ReplicationRole replication_role,
|
||||
std::optional<IsolationLevel> override_isolation_level) = 0;
|
||||
|
@ -66,6 +66,17 @@ struct ResourceLock {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
template <typename Rep, typename Period>
|
||||
bool try_lock_shared_for(std::chrono::duration<Rep, Period> const &time) {
|
||||
auto lock = std::unique_lock{mtx};
|
||||
// block until available
|
||||
if (!cv.wait_for(lock, time, [this] { return state != UNIQUE; })) return false;
|
||||
state = SHARED;
|
||||
++count;
|
||||
return true;
|
||||
}
|
||||
|
||||
void unlock() {
|
||||
auto lock = std::unique_lock{mtx};
|
||||
state = UNLOCKED;
|
||||
|
Loading…
Reference in New Issue
Block a user