From f807b495ab8be1ee5c3e082bab99ea8fcf301859 Mon Sep 17 00:00:00 2001 From: antonio2368 Date: Fri, 19 Feb 2021 11:00:10 +0100 Subject: [PATCH] Update commit log after recovery (#96) * Define additional commit log constructor which takes an oldest active id * Delay commit log construction until the recovery process is finished * Add test for commit log with initial id * Silence the macro redefinition warning --- CHANGELOG.md | 4 + src/storage/v2/CMakeLists.txt | 1 + src/storage/v2/commit_log.cpp | 100 +++++++++++++ src/storage/v2/commit_log.hpp | 80 ++--------- .../v2/replication/replication_server.cpp | 135 +++++++++--------- src/storage/v2/storage.cpp | 37 +++-- src/storage/v2/storage.hpp | 2 +- src/utils/logging.hpp | 1 + tests/drivers/node/v4_1/run.sh | 2 +- tests/unit/CMakeLists.txt | 2 +- tests/unit/commit_log_v2.cpp | 28 +++- 11 files changed, 234 insertions(+), 158 deletions(-) create mode 100644 src/storage/v2/commit_log.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 843a317a3..62c597954 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## Future TODO: Don't forget to add items on the fly. +### Bug Fixes + +* Fixed garbage collector by correctly marking the oldest current timestamp + after the database was recovered using the durability files. ## v1.3.0 diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 7189ebfb5..9787d771e 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -1,4 +1,5 @@ set(storage_v2_src_files + commit_log.cpp constraints.cpp durability/durability.cpp durability/serialization.cpp diff --git a/src/storage/v2/commit_log.cpp b/src/storage/v2/commit_log.cpp new file mode 100644 index 000000000..e8741f165 --- /dev/null +++ b/src/storage/v2/commit_log.cpp @@ -0,0 +1,100 @@ +#include "storage/v2/commit_log.hpp" +#include "utils/memory.hpp" + +namespace storage { +CommitLog::CommitLog() : allocator_(utils::NewDeleteResource()) {} + +CommitLog::CommitLog(uint64_t oldest_active) : allocator_(utils::NewDeleteResource()) { + head_ = allocator_.allocate(1); + allocator_.construct(head_); + head_start_ = oldest_active / kIdsInBlock * kIdsInBlock; + next_start_ = head_start_ + kIdsInBlock; + + // set all the previous ids + const auto field_idx = (oldest_active % kIdsInBlock) / kIdsInField; + for (size_t i = 0; i < field_idx; ++i) { + head_->field[i] = std::numeric_limits::max(); + } + + const auto idx_in_field = oldest_active % kIdsInField; + if (idx_in_field != 0) { + head_->field[field_idx] = std::numeric_limits::max(); + head_->field[field_idx] >>= kIdsInField - idx_in_field; + } + + oldest_active_ = oldest_active; +} + +CommitLog::~CommitLog() { + while (head_) { + Block *tmp = head_->next; + head_->~Block(); + allocator_.deallocate(head_, 1); + head_ = tmp; + } +} + +void CommitLog::MarkFinished(uint64_t id) { + std::lock_guard guard(lock_); + + Block *block = FindOrCreateBlock(id); + block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField); + if (id == oldest_active_) { + UpdateOldestActive(); + } +} + +uint64_t CommitLog::OldestActive() { + std::lock_guard guard(lock_); + return oldest_active_; +} + +void CommitLog::UpdateOldestActive() { + while (head_) { + // This is necessary for amortized constant complexity. If we always start + // from the 0th field, the amount of steps we make through each block is + // quadratic in kBlockSize. + uint64_t start_field = oldest_active_ >= head_start_ ? (oldest_active_ - head_start_) / kIdsInField : 0; + for (uint64_t i = start_field; i < kBlockSize; ++i) { + if (head_->field[i] != std::numeric_limits::max()) { + oldest_active_ = head_start_ + i * kIdsInField + __builtin_ffsl(~head_->field[i]) - 1; + return; + } + } + + // All IDs in this block are marked, we can delete it now. + Block *tmp = head_->next; + head_->~Block(); + allocator_.deallocate(head_, 1); + head_ = tmp; + head_start_ += kIdsInBlock; + } + + oldest_active_ = next_start_; +} + +CommitLog::Block *CommitLog::FindOrCreateBlock(const uint64_t id) { + if (!head_) { + head_ = allocator_.allocate(1); + allocator_.construct(head_); + head_start_ = next_start_; + next_start_ += kIdsInBlock; + } + + Block *current = head_; + uint64_t current_start = head_start_; + + while (id >= current_start + kIdsInBlock) { + if (!current->next) { + current->next = allocator_.allocate(1); + allocator_.construct(current->next); + next_start_ += kIdsInBlock; + } + + current = current->next; + current_start += kIdsInBlock; + } + + return current; +} +} // namespace storage diff --git a/src/storage/v2/commit_log.hpp b/src/storage/v2/commit_log.hpp index 1b80d7571..e820a7776 100644 --- a/src/storage/v2/commit_log.hpp +++ b/src/storage/v2/commit_log.hpp @@ -22,40 +22,25 @@ namespace storage { class CommitLog final { public: // TODO(mtomic): use pool allocator for blocks - CommitLog() - : head_(nullptr), head_start_(0), next_start_(0), oldest_active_(0), allocator_(utils::NewDeleteResource()) {} + CommitLog(); + /// Create a commit log which has the oldest active id set to + /// oldest_active + /// @param oldest_active the oldest active id + explicit CommitLog(uint64_t oldest_active); CommitLog(const CommitLog &) = delete; CommitLog &operator=(const CommitLog &) = delete; CommitLog(CommitLog &&) = delete; CommitLog &operator=(CommitLog &&) = delete; - ~CommitLog() { - while (head_) { - Block *tmp = head_->next; - head_->~Block(); - allocator_.deallocate(head_, 1); - head_ = tmp; - } - } + ~CommitLog(); /// Mark a transaction as finished. /// @throw std::bad_alloc - void MarkFinished(uint64_t id) { - std::lock_guard guard(lock_); - - Block *block = FindOrCreateBlock(id); - block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField); - if (id == oldest_active_) { - UpdateOldestActive(); - } - } + void MarkFinished(uint64_t id); /// Retrieve the oldest transaction still not marked as finished. - uint64_t OldestActive() { - std::lock_guard guard(lock_); - return oldest_active_; - } + uint64_t OldestActive(); private: static constexpr uint64_t kBlockSize = 8192; @@ -67,55 +52,10 @@ class CommitLog final { uint64_t field[kBlockSize]{}; }; - void UpdateOldestActive() { - while (head_) { - // This is necessary for amortized constant complexity. If we always start - // from the 0th field, the amount of steps we make through each block is - // quadratic in kBlockSize. - uint64_t start_field = oldest_active_ >= head_start_ ? (oldest_active_ - head_start_) / kIdsInField : 0; - for (uint64_t i = start_field; i < kBlockSize; ++i) { - if (head_->field[i] != std::numeric_limits::max()) { - oldest_active_ = head_start_ + i * kIdsInField + __builtin_ffsl(~head_->field[i]) - 1; - return; - } - } - - // All IDs in this block are marked, we can delete it now. - Block *tmp = head_->next; - head_->~Block(); - allocator_.deallocate(head_, 1); - head_ = tmp; - head_start_ += kIdsInBlock; - } - - oldest_active_ = next_start_; - } + void UpdateOldestActive(); /// @throw std::bad_alloc - Block *FindOrCreateBlock(uint64_t id) { - if (!head_) { - head_ = allocator_.allocate(1); - allocator_.construct(head_); - head_start_ = next_start_; - next_start_ += kIdsInBlock; - } - - Block *current = head_; - uint64_t current_start = head_start_; - - while (id >= current_start + kIdsInBlock) { - if (!current->next) { - current->next = allocator_.allocate(1); - allocator_.construct(current->next); - next_start_ += kIdsInBlock; - } - - current = current->next; - current_start += kIdsInBlock; - } - - return current; - } + Block *FindOrCreateBlock(uint64_t id); Block *head_{nullptr}; uint64_t head_start_{0}; diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 47d66f542..495b03d04 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -389,36 +389,38 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!"); spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path); - std::unique_lock storage_guard(storage_->main_lock_); - // Clear the database - storage_->vertices_.clear(); - storage_->edges_.clear(); + { + std::unique_lock storage_guard(storage_->main_lock_); + // Clear the database + storage_->vertices_.clear(); + storage_->edges_.clear(); - storage_->constraints_ = Constraints(); - storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items); - storage_->indices_.label_property_index = - LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items); - try { - spdlog::debug("Loading snapshot"); - auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, - &storage_->epoch_history_, &storage_->name_id_mapper_, - &storage_->edge_count_, storage_->config_.items); - spdlog::debug("Snapshot loaded successfully"); - // If this step is present it should always be the first step of - // the recovery so we use the UUID we read from snasphost - storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid); - storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id); - const auto &recovery_info = recovered_snapshot.recovery_info; - storage_->vertex_id_ = recovery_info.next_vertex_id; - storage_->edge_id_ = recovery_info.next_edge_id; - storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp); + storage_->constraints_ = Constraints(); + storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items); + storage_->indices_.label_property_index = + LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items); + try { + spdlog::debug("Loading snapshot"); + auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, + &storage_->epoch_history_, &storage_->name_id_mapper_, + &storage_->edge_count_, storage_->config_.items); + spdlog::debug("Snapshot loaded successfully"); + // If this step is present it should always be the first step of + // the recovery so we use the UUID we read from snasphost + storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid); + storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id); + const auto &recovery_info = recovered_snapshot.recovery_info; + storage_->vertex_id_ = recovery_info.next_vertex_id; + storage_->edge_id_ = recovery_info.next_edge_id; + storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp); + storage_->commit_log_.emplace(storage_->timestamp_); - durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_, - &storage_->constraints_, &storage_->vertices_); - } catch (const durability::RecoveryFailure &e) { - LOG_FATAL("Couldn't load the snapshot because of: {}", e.what()); + durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_, + &storage_->constraints_, &storage_->vertices_); + } catch (const durability::RecoveryFailure &e) { + LOG_FATAL("Couldn't load the snapshot because of: {}", e.what()); + } } - storage_guard.unlock(); SnapshotRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); @@ -452,29 +454,30 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B utils::EnsureDirOrDie(storage_->wal_directory_); - std::unique_lock storage_guard(storage_->main_lock_); - durability::RecoveredIndicesAndConstraints indices_constraints; - auto [wal_info, path] = LoadWal(&decoder, &indices_constraints); - if (wal_info.seq_num == 0) { - storage_->uuid_ = wal_info.uuid; - } - - // Check the seq number of the first wal file to see if it's the - // finalized form of the current wal on replica - if (storage_->wal_file_) { - if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) { - storage_->wal_file_->DeleteWal(); + { + std::unique_lock storage_guard(storage_->main_lock_); + durability::RecoveredIndicesAndConstraints indices_constraints; + auto [wal_info, path] = LoadWal(&decoder, &indices_constraints); + if (wal_info.seq_num == 0) { + storage_->uuid_ = wal_info.uuid; } - storage_->wal_file_.reset(); - } - for (auto i = 1; i < wal_file_number; ++i) { - LoadWal(&decoder, &indices_constraints); - } + // Check the seq number of the first wal file to see if it's the + // finalized form of the current wal on replica + if (storage_->wal_file_) { + if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) { + storage_->wal_file_->DeleteWal(); + } + storage_->wal_file_.reset(); + } - durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_, - &storage_->vertices_); - storage_guard.unlock(); + for (auto i = 1; i < wal_file_number; ++i) { + LoadWal(&decoder, &indices_constraints); + } + + durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_, + &storage_->vertices_); + } WalFilesRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); @@ -488,26 +491,27 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk: utils::EnsureDirOrDie(storage_->wal_directory_); - std::unique_lock storage_guard(storage_->main_lock_); - durability::RecoveredIndicesAndConstraints indices_constraints; - auto [wal_info, path] = LoadWal(&decoder, &indices_constraints); - if (wal_info.seq_num == 0) { - storage_->uuid_ = wal_info.uuid; - } + { + std::unique_lock storage_guard(storage_->main_lock_); + durability::RecoveredIndicesAndConstraints indices_constraints; + auto [wal_info, path] = LoadWal(&decoder, &indices_constraints); + if (wal_info.seq_num == 0) { + storage_->uuid_ = wal_info.uuid; + } - if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num && - storage_->wal_file_->Path() != path) { - // Delete the old wal file - storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path()); + if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num && + storage_->wal_file_->Path() != path) { + // Delete the old wal file + storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path()); + } + MG_ASSERT(storage_->config_.durability.snapshot_wal_mode == + Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL); + storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num, + wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas, + &storage_->file_retainer_); + durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_, + &storage_->vertices_); } - MG_ASSERT(storage_->config_.durability.snapshot_wal_mode == - Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL); - storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num, - wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas, - &storage_->file_retainer_); - durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_, - &storage_->vertices_); - storage_guard.unlock(); CurrentWalRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); @@ -532,6 +536,7 @@ std::pair Storage::ReplicationServer storage_->vertex_id_ = std::max(storage_->vertex_id_.load(), info.next_vertex_id); storage_->edge_id_ = std::max(storage_->edge_id_.load(), info.next_edge_id); storage_->timestamp_ = std::max(storage_->timestamp_, info.next_timestamp); + storage_->commit_log_.emplace(storage_->timestamp_); if (info.last_commit_timestamp) { storage_->last_commit_timestamp_ = *info.last_commit_timestamp; } diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 3a9542490..bae679f45 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -16,6 +16,7 @@ #include "storage/v2/indices.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/replication/config.hpp" +#include "storage/v2/transaction.hpp" #include "utils/file.hpp" #include "utils/logging.hpp" #include "utils/rw_lock.hpp" @@ -362,6 +363,12 @@ Storage::Storage(Config config) if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); } + + if (timestamp_ == kTimestampInitialId) { + commit_log_.emplace(); + } else { + commit_log_.emplace(timestamp_); + } } Storage::~Storage() { @@ -763,7 +770,7 @@ utils::BasicResult Storage::Accessor::Commit( if (transaction_.deltas.empty()) { // We don't have to update the commit timestamp here because no one reads // it. - storage_->commit_log_.MarkFinished(transaction_.start_timestamp); + storage_->commit_log_->MarkFinished(transaction_.start_timestamp); } else { // Validate that existence constraints are satisfied for all modified // vertices. @@ -863,14 +870,14 @@ utils::BasicResult Storage::Accessor::Commit( committed_transactions.emplace_back(std::move(transaction_)); }); - storage_->commit_log_.MarkFinished(start_timestamp); - storage_->commit_log_.MarkFinished(commit_timestamp); + storage_->commit_log_->MarkFinished(start_timestamp); + storage_->commit_log_->MarkFinished(commit_timestamp); } } if (unique_constraint_violation) { Abort(); - storage_->commit_log_.MarkFinished(commit_timestamp); + storage_->commit_log_->MarkFinished(commit_timestamp); return *unique_constraint_violation; } } @@ -1039,7 +1046,7 @@ void Storage::Accessor::Abort() { [&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); }); } - storage_->commit_log_.MarkFinished(transaction_.start_timestamp); + storage_->commit_log_->MarkFinished(transaction_.start_timestamp); is_transaction_active_ = false; } @@ -1068,7 +1075,7 @@ bool Storage::CreateIndex(LabelId label, const std::optional desired_c if (!indices_.label_index.CreateIndex(label, vertices_.access())) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1080,7 +1087,7 @@ bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optiona if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label, {property}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1092,7 +1099,7 @@ bool Storage::DropIndex(LabelId label, const std::optional desired_com if (!indices_.label_index.DropIndex(label)) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1106,7 +1113,7 @@ bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional< // `CreateIndex(LabelId label)`. const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label, {property}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1125,7 +1132,7 @@ utils::BasicResult Storage::CreateExistenceConstraint if (ret.HasError() || !ret.GetValue()) return ret; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label, {property}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1138,7 +1145,7 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property, if (!::storage::DropExistenceConstraint(&constraints_, label, property)) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1154,7 +1161,7 @@ utils::BasicResult Stora } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label, properties, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1170,7 +1177,7 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint( } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label, properties, commit_timestamp); - commit_log_.MarkFinished(commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); #ifdef MG_ENTERPRISE last_commit_timestamp_ = commit_timestamp; #endif @@ -1261,7 +1268,7 @@ void Storage::CollectGarbage() { return; } - uint64_t oldest_active_start_timestamp = commit_log_.OldestActive(); + uint64_t oldest_active_start_timestamp = commit_log_->OldestActive(); // We don't move undo buffers of unlinked transactions to garbage_undo_buffers // list immediately, because we would have to repeatedly take // garbage_undo_buffers lock. @@ -1712,7 +1719,7 @@ void Storage::CreateSnapshot() { &file_retainer_); // Finalize snapshot transaction. - commit_log_.MarkFinished(transaction.start_timestamp); + commit_log_->MarkFinished(transaction.start_timestamp); } bool Storage::LockPath() { diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 475f48dd0..c76c28c11 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -471,7 +471,7 @@ class Storage final { // transaction commited or aborted. We could probably combine this with // `timestamp_` in a sensible unit, something like TransactionClock or // whatever. - CommitLog commit_log_; + std::optional commit_log_; utils::Synchronized, utils::SpinLock> committed_transactions_; diff --git a/src/utils/logging.hpp b/src/utils/logging.hpp index a2bd09f25..152917563 100644 --- a/src/utils/logging.hpp +++ b/src/utils/logging.hpp @@ -1,5 +1,6 @@ #pragma once +#undef SPDLOG_ACTIVE_LEVEL #ifndef NDEBUG #define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE #else diff --git a/tests/drivers/node/v4_1/run.sh b/tests/drivers/node/v4_1/run.sh index e4178dcbe..66fdec4a6 100755 --- a/tests/drivers/node/v4_1/run.sh +++ b/tests/drivers/node/v4_1/run.sh @@ -10,7 +10,7 @@ fi if [ ! -d node_modules ]; then # Driver generated with: `npm install neo4j-driver` - npm install neo4j-driver + npm install --no-package-lock --no-save neo4j-driver @babel/runtime fi node docs_how_to_query.js diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index a078d7c00..407e4ec62 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -229,7 +229,7 @@ target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt) # Test mg-storage-v2 add_unit_test(commit_log_v2.cpp) -target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils) +target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2) add_unit_test(property_value_v2.cpp) target_link_libraries(${test_prefix}property_value_v2 mg-utils) diff --git a/tests/unit/commit_log_v2.cpp b/tests/unit/commit_log_v2.cpp index fbc66a3ce..8f9ff2a9b 100644 --- a/tests/unit/commit_log_v2.cpp +++ b/tests/unit/commit_log_v2.cpp @@ -2,6 +2,10 @@ #include "gtest/gtest.h" +namespace { +constexpr size_t ids_per_block = 8192 * 64; +} // namespace + TEST(CommitLog, Simple) { storage::CommitLog log; EXPECT_EQ(log.OldestActive(), 0); @@ -35,18 +39,32 @@ TEST(CommitLog, Fields) { TEST(CommitLog, Blocks) { storage::CommitLog log; - for (uint64_t i = 0; i < 8192 * 64; ++i) { + for (uint64_t i = 0; i < ids_per_block; ++i) { log.MarkFinished(i); EXPECT_EQ(log.OldestActive(), i + 1); } - for (uint64_t i = 8192 * 64 * 2; i < 8192 * 64 * 3; ++i) { + for (uint64_t i = ids_per_block * 2; i < ids_per_block * 3; ++i) { log.MarkFinished(i); - EXPECT_EQ(log.OldestActive(), 8192 * 64); + EXPECT_EQ(log.OldestActive(), ids_per_block); } - for (uint64_t i = 8192 * 64; i < 8192 * 64; ++i) { + for (uint64_t i = ids_per_block; i < ids_per_block; ++i) { log.MarkFinished(i); - EXPECT_EQ(log.OldestActive(), i < 8192 * 64 - 1 ? i + 1 : 8192 * 64 * 3); + EXPECT_EQ(log.OldestActive(), i < ids_per_block - 1 ? i + 1 : ids_per_block * 3); + } +} + +TEST(CommitLog, TrackAfterInitialId) { + const auto check_marking_ids = [](auto *log, auto current_oldest_active) { + ASSERT_EQ(log->OldestActive(), current_oldest_active); + log->MarkFinished(current_oldest_active); + ++current_oldest_active; + ASSERT_EQ(log->OldestActive(), current_oldest_active); + }; + + for (uint64_t i = 0; i < 2 * ids_per_block; ++i) { + storage::CommitLog log{i}; + check_marking_ids(&log, i); } }