Update commit log after recovery (#96)
* Define additional commit log constructor which takes an oldest active id * Delay commit log construction until the recovery process is finished * Add test for commit log with initial id * Silence the macro redefinition warning
This commit is contained in:
parent
3f3c55a4aa
commit
f807b495ab
@ -3,6 +3,10 @@
|
||||
## Future
|
||||
|
||||
TODO: Don't forget to add items on the fly.
|
||||
### Bug Fixes
|
||||
|
||||
* Fixed garbage collector by correctly marking the oldest current timestamp
|
||||
after the database was recovered using the durability files.
|
||||
|
||||
## v1.3.0
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
set(storage_v2_src_files
|
||||
commit_log.cpp
|
||||
constraints.cpp
|
||||
durability/durability.cpp
|
||||
durability/serialization.cpp
|
||||
|
100
src/storage/v2/commit_log.cpp
Normal file
100
src/storage/v2/commit_log.cpp
Normal file
@ -0,0 +1,100 @@
|
||||
#include "storage/v2/commit_log.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
|
||||
namespace storage {
|
||||
CommitLog::CommitLog() : allocator_(utils::NewDeleteResource()) {}
|
||||
|
||||
CommitLog::CommitLog(uint64_t oldest_active) : allocator_(utils::NewDeleteResource()) {
|
||||
head_ = allocator_.allocate(1);
|
||||
allocator_.construct(head_);
|
||||
head_start_ = oldest_active / kIdsInBlock * kIdsInBlock;
|
||||
next_start_ = head_start_ + kIdsInBlock;
|
||||
|
||||
// set all the previous ids
|
||||
const auto field_idx = (oldest_active % kIdsInBlock) / kIdsInField;
|
||||
for (size_t i = 0; i < field_idx; ++i) {
|
||||
head_->field[i] = std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
const auto idx_in_field = oldest_active % kIdsInField;
|
||||
if (idx_in_field != 0) {
|
||||
head_->field[field_idx] = std::numeric_limits<uint64_t>::max();
|
||||
head_->field[field_idx] >>= kIdsInField - idx_in_field;
|
||||
}
|
||||
|
||||
oldest_active_ = oldest_active;
|
||||
}
|
||||
|
||||
CommitLog::~CommitLog() {
|
||||
while (head_) {
|
||||
Block *tmp = head_->next;
|
||||
head_->~Block();
|
||||
allocator_.deallocate(head_, 1);
|
||||
head_ = tmp;
|
||||
}
|
||||
}
|
||||
|
||||
void CommitLog::MarkFinished(uint64_t id) {
|
||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||
|
||||
Block *block = FindOrCreateBlock(id);
|
||||
block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField);
|
||||
if (id == oldest_active_) {
|
||||
UpdateOldestActive();
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t CommitLog::OldestActive() {
|
||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||
return oldest_active_;
|
||||
}
|
||||
|
||||
void CommitLog::UpdateOldestActive() {
|
||||
while (head_) {
|
||||
// This is necessary for amortized constant complexity. If we always start
|
||||
// from the 0th field, the amount of steps we make through each block is
|
||||
// quadratic in kBlockSize.
|
||||
uint64_t start_field = oldest_active_ >= head_start_ ? (oldest_active_ - head_start_) / kIdsInField : 0;
|
||||
for (uint64_t i = start_field; i < kBlockSize; ++i) {
|
||||
if (head_->field[i] != std::numeric_limits<uint64_t>::max()) {
|
||||
oldest_active_ = head_start_ + i * kIdsInField + __builtin_ffsl(~head_->field[i]) - 1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// All IDs in this block are marked, we can delete it now.
|
||||
Block *tmp = head_->next;
|
||||
head_->~Block();
|
||||
allocator_.deallocate(head_, 1);
|
||||
head_ = tmp;
|
||||
head_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
oldest_active_ = next_start_;
|
||||
}
|
||||
|
||||
CommitLog::Block *CommitLog::FindOrCreateBlock(const uint64_t id) {
|
||||
if (!head_) {
|
||||
head_ = allocator_.allocate(1);
|
||||
allocator_.construct(head_);
|
||||
head_start_ = next_start_;
|
||||
next_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
Block *current = head_;
|
||||
uint64_t current_start = head_start_;
|
||||
|
||||
while (id >= current_start + kIdsInBlock) {
|
||||
if (!current->next) {
|
||||
current->next = allocator_.allocate(1);
|
||||
allocator_.construct(current->next);
|
||||
next_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
current = current->next;
|
||||
current_start += kIdsInBlock;
|
||||
}
|
||||
|
||||
return current;
|
||||
}
|
||||
} // namespace storage
|
@ -22,40 +22,25 @@ namespace storage {
|
||||
class CommitLog final {
|
||||
public:
|
||||
// TODO(mtomic): use pool allocator for blocks
|
||||
CommitLog()
|
||||
: head_(nullptr), head_start_(0), next_start_(0), oldest_active_(0), allocator_(utils::NewDeleteResource()) {}
|
||||
CommitLog();
|
||||
/// Create a commit log which has the oldest active id set to
|
||||
/// oldest_active
|
||||
/// @param oldest_active the oldest active id
|
||||
explicit CommitLog(uint64_t oldest_active);
|
||||
|
||||
CommitLog(const CommitLog &) = delete;
|
||||
CommitLog &operator=(const CommitLog &) = delete;
|
||||
CommitLog(CommitLog &&) = delete;
|
||||
CommitLog &operator=(CommitLog &&) = delete;
|
||||
|
||||
~CommitLog() {
|
||||
while (head_) {
|
||||
Block *tmp = head_->next;
|
||||
head_->~Block();
|
||||
allocator_.deallocate(head_, 1);
|
||||
head_ = tmp;
|
||||
}
|
||||
}
|
||||
~CommitLog();
|
||||
|
||||
/// Mark a transaction as finished.
|
||||
/// @throw std::bad_alloc
|
||||
void MarkFinished(uint64_t id) {
|
||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||
|
||||
Block *block = FindOrCreateBlock(id);
|
||||
block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField);
|
||||
if (id == oldest_active_) {
|
||||
UpdateOldestActive();
|
||||
}
|
||||
}
|
||||
void MarkFinished(uint64_t id);
|
||||
|
||||
/// Retrieve the oldest transaction still not marked as finished.
|
||||
uint64_t OldestActive() {
|
||||
std::lock_guard<utils::SpinLock> guard(lock_);
|
||||
return oldest_active_;
|
||||
}
|
||||
uint64_t OldestActive();
|
||||
|
||||
private:
|
||||
static constexpr uint64_t kBlockSize = 8192;
|
||||
@ -67,55 +52,10 @@ class CommitLog final {
|
||||
uint64_t field[kBlockSize]{};
|
||||
};
|
||||
|
||||
void UpdateOldestActive() {
|
||||
while (head_) {
|
||||
// This is necessary for amortized constant complexity. If we always start
|
||||
// from the 0th field, the amount of steps we make through each block is
|
||||
// quadratic in kBlockSize.
|
||||
uint64_t start_field = oldest_active_ >= head_start_ ? (oldest_active_ - head_start_) / kIdsInField : 0;
|
||||
for (uint64_t i = start_field; i < kBlockSize; ++i) {
|
||||
if (head_->field[i] != std::numeric_limits<uint64_t>::max()) {
|
||||
oldest_active_ = head_start_ + i * kIdsInField + __builtin_ffsl(~head_->field[i]) - 1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// All IDs in this block are marked, we can delete it now.
|
||||
Block *tmp = head_->next;
|
||||
head_->~Block();
|
||||
allocator_.deallocate(head_, 1);
|
||||
head_ = tmp;
|
||||
head_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
oldest_active_ = next_start_;
|
||||
}
|
||||
void UpdateOldestActive();
|
||||
|
||||
/// @throw std::bad_alloc
|
||||
Block *FindOrCreateBlock(uint64_t id) {
|
||||
if (!head_) {
|
||||
head_ = allocator_.allocate(1);
|
||||
allocator_.construct(head_);
|
||||
head_start_ = next_start_;
|
||||
next_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
Block *current = head_;
|
||||
uint64_t current_start = head_start_;
|
||||
|
||||
while (id >= current_start + kIdsInBlock) {
|
||||
if (!current->next) {
|
||||
current->next = allocator_.allocate(1);
|
||||
allocator_.construct(current->next);
|
||||
next_start_ += kIdsInBlock;
|
||||
}
|
||||
|
||||
current = current->next;
|
||||
current_start += kIdsInBlock;
|
||||
}
|
||||
|
||||
return current;
|
||||
}
|
||||
Block *FindOrCreateBlock(uint64_t id);
|
||||
|
||||
Block *head_{nullptr};
|
||||
uint64_t head_start_{0};
|
||||
|
@ -389,36 +389,38 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
|
||||
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
|
||||
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
|
||||
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
// Clear the database
|
||||
storage_->vertices_.clear();
|
||||
storage_->edges_.clear();
|
||||
{
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
// Clear the database
|
||||
storage_->vertices_.clear();
|
||||
storage_->edges_.clear();
|
||||
|
||||
storage_->constraints_ = Constraints();
|
||||
storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
|
||||
storage_->indices_.label_property_index =
|
||||
LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
|
||||
try {
|
||||
spdlog::debug("Loading snapshot");
|
||||
auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
|
||||
&storage_->epoch_history_, &storage_->name_id_mapper_,
|
||||
&storage_->edge_count_, storage_->config_.items);
|
||||
spdlog::debug("Snapshot loaded successfully");
|
||||
// If this step is present it should always be the first step of
|
||||
// the recovery so we use the UUID we read from snasphost
|
||||
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
|
||||
storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
|
||||
const auto &recovery_info = recovered_snapshot.recovery_info;
|
||||
storage_->vertex_id_ = recovery_info.next_vertex_id;
|
||||
storage_->edge_id_ = recovery_info.next_edge_id;
|
||||
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
|
||||
storage_->constraints_ = Constraints();
|
||||
storage_->indices_.label_index = LabelIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
|
||||
storage_->indices_.label_property_index =
|
||||
LabelPropertyIndex(&storage_->indices_, &storage_->constraints_, storage_->config_.items);
|
||||
try {
|
||||
spdlog::debug("Loading snapshot");
|
||||
auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
|
||||
&storage_->epoch_history_, &storage_->name_id_mapper_,
|
||||
&storage_->edge_count_, storage_->config_.items);
|
||||
spdlog::debug("Snapshot loaded successfully");
|
||||
// If this step is present it should always be the first step of
|
||||
// the recovery so we use the UUID we read from snasphost
|
||||
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
|
||||
storage_->epoch_id_ = std::move(recovered_snapshot.snapshot_info.epoch_id);
|
||||
const auto &recovery_info = recovered_snapshot.recovery_info;
|
||||
storage_->vertex_id_ = recovery_info.next_vertex_id;
|
||||
storage_->edge_id_ = recovery_info.next_edge_id;
|
||||
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
|
||||
storage_->commit_log_.emplace(storage_->timestamp_);
|
||||
|
||||
durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
|
||||
&storage_->constraints_, &storage_->vertices_);
|
||||
} catch (const durability::RecoveryFailure &e) {
|
||||
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
|
||||
durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
|
||||
&storage_->constraints_, &storage_->vertices_);
|
||||
} catch (const durability::RecoveryFailure &e) {
|
||||
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
|
||||
}
|
||||
}
|
||||
storage_guard.unlock();
|
||||
|
||||
SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
@ -452,29 +454,30 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B
|
||||
|
||||
utils::EnsureDirOrDie(storage_->wal_directory_);
|
||||
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
durability::RecoveredIndicesAndConstraints indices_constraints;
|
||||
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
|
||||
if (wal_info.seq_num == 0) {
|
||||
storage_->uuid_ = wal_info.uuid;
|
||||
}
|
||||
|
||||
// Check the seq number of the first wal file to see if it's the
|
||||
// finalized form of the current wal on replica
|
||||
if (storage_->wal_file_) {
|
||||
if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) {
|
||||
storage_->wal_file_->DeleteWal();
|
||||
{
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
durability::RecoveredIndicesAndConstraints indices_constraints;
|
||||
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
|
||||
if (wal_info.seq_num == 0) {
|
||||
storage_->uuid_ = wal_info.uuid;
|
||||
}
|
||||
storage_->wal_file_.reset();
|
||||
}
|
||||
|
||||
for (auto i = 1; i < wal_file_number; ++i) {
|
||||
LoadWal(&decoder, &indices_constraints);
|
||||
}
|
||||
// Check the seq number of the first wal file to see if it's the
|
||||
// finalized form of the current wal on replica
|
||||
if (storage_->wal_file_) {
|
||||
if (storage_->wal_file_->SequenceNumber() == wal_info.seq_num && storage_->wal_file_->Path() != path) {
|
||||
storage_->wal_file_->DeleteWal();
|
||||
}
|
||||
storage_->wal_file_.reset();
|
||||
}
|
||||
|
||||
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
|
||||
&storage_->vertices_);
|
||||
storage_guard.unlock();
|
||||
for (auto i = 1; i < wal_file_number; ++i) {
|
||||
LoadWal(&decoder, &indices_constraints);
|
||||
}
|
||||
|
||||
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
|
||||
&storage_->vertices_);
|
||||
}
|
||||
|
||||
WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
@ -488,26 +491,27 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk:
|
||||
|
||||
utils::EnsureDirOrDie(storage_->wal_directory_);
|
||||
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
durability::RecoveredIndicesAndConstraints indices_constraints;
|
||||
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
|
||||
if (wal_info.seq_num == 0) {
|
||||
storage_->uuid_ = wal_info.uuid;
|
||||
}
|
||||
{
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
durability::RecoveredIndicesAndConstraints indices_constraints;
|
||||
auto [wal_info, path] = LoadWal(&decoder, &indices_constraints);
|
||||
if (wal_info.seq_num == 0) {
|
||||
storage_->uuid_ = wal_info.uuid;
|
||||
}
|
||||
|
||||
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num &&
|
||||
storage_->wal_file_->Path() != path) {
|
||||
// Delete the old wal file
|
||||
storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path());
|
||||
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == wal_info.seq_num &&
|
||||
storage_->wal_file_->Path() != path) {
|
||||
// Delete the old wal file
|
||||
storage_->file_retainer_.DeleteFile(storage_->wal_file_->Path());
|
||||
}
|
||||
MG_ASSERT(storage_->config_.durability.snapshot_wal_mode ==
|
||||
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL);
|
||||
storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num,
|
||||
wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas,
|
||||
&storage_->file_retainer_);
|
||||
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
|
||||
&storage_->vertices_);
|
||||
}
|
||||
MG_ASSERT(storage_->config_.durability.snapshot_wal_mode ==
|
||||
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL);
|
||||
storage_->wal_file_.emplace(std::move(path), storage_->config_.items, &storage_->name_id_mapper_, wal_info.seq_num,
|
||||
wal_info.from_timestamp, wal_info.to_timestamp, wal_info.num_deltas,
|
||||
&storage_->file_retainer_);
|
||||
durability::RecoverIndicesAndConstraints(indices_constraints, &storage_->indices_, &storage_->constraints_,
|
||||
&storage_->vertices_);
|
||||
storage_guard.unlock();
|
||||
|
||||
CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
@ -532,6 +536,7 @@ std::pair<durability::WalInfo, std::filesystem::path> Storage::ReplicationServer
|
||||
storage_->vertex_id_ = std::max(storage_->vertex_id_.load(), info.next_vertex_id);
|
||||
storage_->edge_id_ = std::max(storage_->edge_id_.load(), info.next_edge_id);
|
||||
storage_->timestamp_ = std::max(storage_->timestamp_, info.next_timestamp);
|
||||
storage_->commit_log_.emplace(storage_->timestamp_);
|
||||
if (info.last_commit_timestamp) {
|
||||
storage_->last_commit_timestamp_ = *info.last_commit_timestamp;
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "storage/v2/indices.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/replication/config.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/rw_lock.hpp"
|
||||
@ -362,6 +363,12 @@ Storage::Storage(Config config)
|
||||
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
|
||||
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); });
|
||||
}
|
||||
|
||||
if (timestamp_ == kTimestampInitialId) {
|
||||
commit_log_.emplace();
|
||||
} else {
|
||||
commit_log_.emplace(timestamp_);
|
||||
}
|
||||
}
|
||||
|
||||
Storage::~Storage() {
|
||||
@ -763,7 +770,7 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
||||
if (transaction_.deltas.empty()) {
|
||||
// We don't have to update the commit timestamp here because no one reads
|
||||
// it.
|
||||
storage_->commit_log_.MarkFinished(transaction_.start_timestamp);
|
||||
storage_->commit_log_->MarkFinished(transaction_.start_timestamp);
|
||||
} else {
|
||||
// Validate that existence constraints are satisfied for all modified
|
||||
// vertices.
|
||||
@ -863,14 +870,14 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
|
||||
committed_transactions.emplace_back(std::move(transaction_));
|
||||
});
|
||||
|
||||
storage_->commit_log_.MarkFinished(start_timestamp);
|
||||
storage_->commit_log_.MarkFinished(commit_timestamp);
|
||||
storage_->commit_log_->MarkFinished(start_timestamp);
|
||||
storage_->commit_log_->MarkFinished(commit_timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
if (unique_constraint_violation) {
|
||||
Abort();
|
||||
storage_->commit_log_.MarkFinished(commit_timestamp);
|
||||
storage_->commit_log_->MarkFinished(commit_timestamp);
|
||||
return *unique_constraint_violation;
|
||||
}
|
||||
}
|
||||
@ -1039,7 +1046,7 @@ void Storage::Accessor::Abort() {
|
||||
[&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); });
|
||||
}
|
||||
|
||||
storage_->commit_log_.MarkFinished(transaction_.start_timestamp);
|
||||
storage_->commit_log_->MarkFinished(transaction_.start_timestamp);
|
||||
is_transaction_active_ = false;
|
||||
}
|
||||
|
||||
@ -1068,7 +1075,7 @@ bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_c
|
||||
if (!indices_.label_index.CreateIndex(label, vertices_.access())) return false;
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1080,7 +1087,7 @@ bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optiona
|
||||
if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) return false;
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label, {property}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1092,7 +1099,7 @@ bool Storage::DropIndex(LabelId label, const std::optional<uint64_t> desired_com
|
||||
if (!indices_.label_index.DropIndex(label)) return false;
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1106,7 +1113,7 @@ bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional<
|
||||
// `CreateIndex(LabelId label)`.
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label, {property}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1125,7 +1132,7 @@ utils::BasicResult<ConstraintViolation, bool> Storage::CreateExistenceConstraint
|
||||
if (ret.HasError() || !ret.GetValue()) return ret;
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label, {property}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1138,7 +1145,7 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property,
|
||||
if (!::storage::DropExistenceConstraint(&constraints_, label, property)) return false;
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1154,7 +1161,7 @@ utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> Stora
|
||||
}
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label, properties, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1170,7 +1177,7 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
|
||||
}
|
||||
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
|
||||
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label, properties, commit_timestamp);
|
||||
commit_log_.MarkFinished(commit_timestamp);
|
||||
commit_log_->MarkFinished(commit_timestamp);
|
||||
#ifdef MG_ENTERPRISE
|
||||
last_commit_timestamp_ = commit_timestamp;
|
||||
#endif
|
||||
@ -1261,7 +1268,7 @@ void Storage::CollectGarbage() {
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t oldest_active_start_timestamp = commit_log_.OldestActive();
|
||||
uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
|
||||
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
|
||||
// list immediately, because we would have to repeatedly take
|
||||
// garbage_undo_buffers lock.
|
||||
@ -1712,7 +1719,7 @@ void Storage::CreateSnapshot() {
|
||||
&file_retainer_);
|
||||
|
||||
// Finalize snapshot transaction.
|
||||
commit_log_.MarkFinished(transaction.start_timestamp);
|
||||
commit_log_->MarkFinished(transaction.start_timestamp);
|
||||
}
|
||||
|
||||
bool Storage::LockPath() {
|
||||
|
@ -471,7 +471,7 @@ class Storage final {
|
||||
// transaction commited or aborted. We could probably combine this with
|
||||
// `timestamp_` in a sensible unit, something like TransactionClock or
|
||||
// whatever.
|
||||
CommitLog commit_log_;
|
||||
std::optional<CommitLog> commit_log_;
|
||||
|
||||
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#undef SPDLOG_ACTIVE_LEVEL
|
||||
#ifndef NDEBUG
|
||||
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
|
||||
#else
|
||||
|
@ -10,7 +10,7 @@ fi
|
||||
|
||||
if [ ! -d node_modules ]; then
|
||||
# Driver generated with: `npm install neo4j-driver`
|
||||
npm install neo4j-driver
|
||||
npm install --no-package-lock --no-save neo4j-driver @babel/runtime
|
||||
fi
|
||||
|
||||
node docs_how_to_query.js
|
||||
|
@ -229,7 +229,7 @@ target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt)
|
||||
# Test mg-storage-v2
|
||||
|
||||
add_unit_test(commit_log_v2.cpp)
|
||||
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils)
|
||||
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)
|
||||
|
||||
add_unit_test(property_value_v2.cpp)
|
||||
target_link_libraries(${test_prefix}property_value_v2 mg-utils)
|
||||
|
@ -2,6 +2,10 @@
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
namespace {
|
||||
constexpr size_t ids_per_block = 8192 * 64;
|
||||
} // namespace
|
||||
|
||||
TEST(CommitLog, Simple) {
|
||||
storage::CommitLog log;
|
||||
EXPECT_EQ(log.OldestActive(), 0);
|
||||
@ -35,18 +39,32 @@ TEST(CommitLog, Fields) {
|
||||
TEST(CommitLog, Blocks) {
|
||||
storage::CommitLog log;
|
||||
|
||||
for (uint64_t i = 0; i < 8192 * 64; ++i) {
|
||||
for (uint64_t i = 0; i < ids_per_block; ++i) {
|
||||
log.MarkFinished(i);
|
||||
EXPECT_EQ(log.OldestActive(), i + 1);
|
||||
}
|
||||
|
||||
for (uint64_t i = 8192 * 64 * 2; i < 8192 * 64 * 3; ++i) {
|
||||
for (uint64_t i = ids_per_block * 2; i < ids_per_block * 3; ++i) {
|
||||
log.MarkFinished(i);
|
||||
EXPECT_EQ(log.OldestActive(), 8192 * 64);
|
||||
EXPECT_EQ(log.OldestActive(), ids_per_block);
|
||||
}
|
||||
|
||||
for (uint64_t i = 8192 * 64; i < 8192 * 64; ++i) {
|
||||
for (uint64_t i = ids_per_block; i < ids_per_block; ++i) {
|
||||
log.MarkFinished(i);
|
||||
EXPECT_EQ(log.OldestActive(), i < 8192 * 64 - 1 ? i + 1 : 8192 * 64 * 3);
|
||||
EXPECT_EQ(log.OldestActive(), i < ids_per_block - 1 ? i + 1 : ids_per_block * 3);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(CommitLog, TrackAfterInitialId) {
|
||||
const auto check_marking_ids = [](auto *log, auto current_oldest_active) {
|
||||
ASSERT_EQ(log->OldestActive(), current_oldest_active);
|
||||
log->MarkFinished(current_oldest_active);
|
||||
++current_oldest_active;
|
||||
ASSERT_EQ(log->OldestActive(), current_oldest_active);
|
||||
};
|
||||
|
||||
for (uint64_t i = 0; i < 2 * ids_per_block; ++i) {
|
||||
storage::CommitLog log{i};
|
||||
check_marking_ids(&log, i);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user