Improve storage GC (#1387)

This commit is contained in:
Gareth Andrew Lloyd 2023-10-24 22:41:21 +01:00 committed by GitHub
parent 0d9bd5554c
commit 5b91f85161
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 283 additions and 197 deletions

View File

@ -10,6 +10,6 @@ add_library(mg-memory STATIC ${memory_src_files})
target_link_libraries(mg-memory mg-utils fmt)
if (ENABLE_JEMALLOC)
target_link_libraries(mg-memory Jemalloc::Jemalloc)
target_link_libraries(mg-memory Jemalloc::Jemalloc ${CMAKE_DL_LIBS})
target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=1)
endif()

View File

@ -122,7 +122,7 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer
inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); }
struct Delta {
enum class Action {
enum class Action : std::uint8_t {
/// Use for Vertex and Edge
/// Used for disk storage for modifying MVCC logic and storing old key. Storing old key is necessary for
/// deleting old-data (compaction).

View File

@ -271,7 +271,6 @@ DiskStorage::DiskAccessor::~DiskAccessor() {
Abort();
}
FinalizeTransaction();
transaction_.deltas.~Bond<PmrListDelta>();
}
void DiskStorage::LoadPersistingMetadataInfo() {
@ -462,12 +461,12 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
AllVerticesIterable(disk_storage->edge_import_mode_cache_->AccessToVertices(), storage_, &transaction_, view));
}
if (transaction_.scanned_all_vertices_) {
return VerticesIterable(AllVerticesIterable(transaction_.vertices_.access(), storage_, &transaction_, view));
return VerticesIterable(AllVerticesIterable(transaction_.vertices_->access(), storage_, &transaction_, view));
}
disk_storage->LoadVerticesToMainMemoryCache(&transaction_);
transaction_.scanned_all_vertices_ = true;
return VerticesIterable(AllVerticesIterable(transaction_.vertices_.access(), storage_, &transaction_, view));
return VerticesIterable(AllVerticesIterable(transaction_.vertices_->access(), storage_, &transaction_, view));
}
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) {
@ -586,7 +585,7 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p
std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelIndexCache(
Transaction *transaction, LabelId label, View view, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices) {
auto main_cache_acc = transaction->vertices_.access();
auto main_cache_acc = transaction->vertices_->access();
std::unordered_set<Gid> gids;
gids.reserve(main_cache_acc.size());
@ -638,7 +637,7 @@ void DiskStorage::LoadVerticesFromDiskLabelIndex(Transaction *transaction, Label
std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
Transaction *transaction, LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter) {
auto main_cache_acc = transaction->vertices_.access();
auto main_cache_acc = transaction->vertices_->access();
std::unordered_set<storage::Gid> gids;
gids.reserve(main_cache_acc.size());
@ -721,7 +720,7 @@ std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelProperty
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices) {
auto main_cache_acc = transaction->vertices_.access();
auto main_cache_acc = transaction->vertices_->access();
std::unordered_set<storage::Gid> gids;
gids.reserve(main_cache_acc.size());
@ -852,7 +851,7 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertex() {
OOMExceptionEnabler oom_exception;
auto *disk_storage = static_cast<DiskStorage *>(storage_);
auto gid = disk_storage->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = transaction_.vertices_.access();
auto acc = transaction_.vertices_->access();
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
@ -924,8 +923,8 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE;
if (storage_->config_.items.properties_on_edges) {
auto acc =
edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges() : transaction_.edges_.access();
auto acc = edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges()
: transaction_.edges_->access();
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(inserted, "The edge must be inserted here!");
@ -1282,7 +1281,7 @@ std::optional<storage::VertexAccessor> DiskStorage::LoadVertexToMainMemoryCache(
const std::string &key,
const std::string &value,
std::string &&ts) {
auto main_storage_accessor = transaction->vertices_.access();
auto main_storage_accessor = transaction->vertices_->access();
storage::Gid gid = Gid::FromString(utils::ExtractGidFromKey(key));
if (ObjectExistsInCache(main_storage_accessor, gid)) {
@ -1309,7 +1308,7 @@ VertexAccessor DiskStorage::CreateVertexFromDisk(Transaction *transaction, utils
std::optional<VertexAccessor> DiskStorage::FindVertex(storage::Gid gid, Transaction *transaction, View view) {
auto acc = edge_import_status_ == EdgeImportMode::ACTIVE ? edge_import_mode_cache_->AccessToVertices()
: transaction->vertices_.access();
: transaction->vertices_->access();
auto vertex_it = acc.find(gid);
if (vertex_it != acc.end()) {
return VertexAccessor::Create(&*vertex_it, this, transaction, view);
@ -1358,7 +1357,7 @@ std::optional<EdgeAccessor> DiskStorage::CreateEdgeFromDisk(const VertexAccessor
EdgeRef edge(gid);
if (config_.items.properties_on_edges) {
auto acc = edge_import_mode_active ? edge_import_mode_cache_->AccessToEdges() : transaction->edges_.access();
auto acc = edge_import_mode_active ? edge_import_mode_cache_->AccessToEdges() : transaction->edges_->access();
auto *delta = CreateDeleteDeserializedObjectDelta(transaction, old_disk_key, std::move(read_ts));
auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
@ -1660,7 +1659,7 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
} else {
std::vector<std::vector<PropertyValue>> unique_storage;
if (auto vertices_flush_res =
disk_storage->FlushVertices(&transaction_, transaction_.vertices_.access(), unique_storage);
disk_storage->FlushVertices(&transaction_, transaction_.vertices_->access(), unique_storage);
vertices_flush_res.HasError()) {
Abort();
return vertices_flush_res.GetError();
@ -1671,7 +1670,7 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
return del_vertices_res.GetError();
}
if (auto modified_edges_res = disk_storage->FlushModifiedEdges(&transaction_, transaction_.edges_.access());
if (auto modified_edges_res = disk_storage->FlushModifiedEdges(&transaction_, transaction_.edges_->access());
modified_edges_res.HasError()) {
Abort();
return modified_edges_res.GetError();

View File

@ -184,9 +184,7 @@ InMemoryStorage::~InMemoryStorage() {
}
}
}
if (!committed_transactions_->empty()) {
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
}
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
}
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level,
@ -198,6 +196,8 @@ InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) no
InMemoryStorage::InMemoryAccessor::~InMemoryAccessor() {
if (is_transaction_active_) {
Abort();
// We didn't actually commit
commit_timestamp_.reset();
}
FinalizeTransaction();
@ -743,25 +743,18 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_);
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
// timestamp in the list.
mem_storage->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) {
// TODO: release lock, and update all deltas to have a local copy
// of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_);
}
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
engine_guard.unlock();
});
could_replicate_all_sync_replicas =
mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard
// Replica can only update the last commit timestamp with
// the commits received from main.
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); // protected by engine_guard
}
// Release engine lock because we don't have to hold it anymore
engine_guard.unlock();
mem_storage->commit_log_->MarkFinished(start_timestamp);
}
@ -783,6 +776,7 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
auto validation_result = storage_->constraints_.existence_constraints_->Validate(*prev.vertex);
if (validation_result) {
Abort();
DMG_ASSERT(!commit_timestamp_.has_value());
return StorageManipulationError{*validation_result};
}
}
@ -842,27 +836,22 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
// so the Wal files are consistent
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas =
mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_);
mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_); // protected by engine_guard
}
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
// timestamp in the list.
mem_storage->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) {
// TODO: release lock, and update all deltas to have a local copy
// of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_);
}
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
engine_guard.unlock();
});
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(*commit_timestamp_,
std::memory_order_release); // protected by engine_guard
// Replica can only update the last commit timestamp with
// the commits received from main.
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->repl_storage_state_.last_commit_timestamp_.store(
*commit_timestamp_); // protected by engine_guard
}
// Release engine lock because we don't have to hold it anymore
engine_guard.unlock();
mem_storage->commit_log_->MarkFinished(start_timestamp);
}
@ -870,6 +859,8 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
if (unique_constraint_violation) {
Abort();
DMG_ASSERT(commit_timestamp_.has_value());
commit_timestamp_.reset(); // We have aborted, hence we have not committed
return StorageManipulationError{*unique_constraint_violation};
}
}
@ -1041,7 +1032,8 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
// emplace back could take a long time.
engine_guard.unlock();
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas));
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas),
std::move(transaction_.commit_timestamp));
});
mem_storage->deleted_vertices_.WithLock(
[&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); });
@ -1057,8 +1049,15 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() {
if (commit_timestamp_) {
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
mem_storage->commit_log_->MarkFinished(*commit_timestamp_);
mem_storage->committed_transactions_.WithLock(
[&](auto &committed_transactions) { committed_transactions.emplace_back(std::move(transaction_)); });
if (!transaction_.deltas.use().empty()) {
// Only hand over delta to be GC'ed if there was any deltas
mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) {
// using mark of 0 as GC will assign a mark_timestamp after unlinking
committed_transactions.emplace_back(0, std::move(transaction_.deltas),
std::move(transaction_.commit_timestamp));
});
}
commit_timestamp_.reset();
}
}
@ -1288,10 +1287,26 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
// Deltas from previous GC runs or from aborts can be cleaned up here
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
if constexpr (force) {
// if force is set to true we can simply delete all the leftover undos because
// no transaction is active
garbage_undo_buffers.clear();
} else {
// garbage_undo_buffers is ordered, pop until we can't
while (!garbage_undo_buffers.empty() &&
garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) {
garbage_undo_buffers.pop_front();
}
}
});
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
// list immediately, because we would have to repeatedly take
// garbage_undo_buffers lock.
std::list<std::pair<uint64_t, BondPmrLd>> unlinked_undo_buffers;
std::list<GCDeltas> unlinked_undo_buffers{};
// We will only free vertices deleted up until now in this GC cycle, and we
// will do it after cleaning-up the indices. That way we are sure that all
@ -1304,28 +1319,27 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false);
auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false);
// Short lock, to move to local variable. Hence allows other transactions to commit.
auto linked_undo_buffers = std::list<GCDeltas>{};
committed_transactions_.WithLock(
[&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); });
// Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything.
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty() ||
need_full_scan_vertices || need_full_scan_edges;
bool run_index_cleanup = !linked_undo_buffers.empty() || !garbage_undo_buffers_->empty() || need_full_scan_vertices ||
need_full_scan_edges;
while (true) {
// We don't want to hold the lock on committed transactions for too long,
// because that prevents other transactions from committing.
Transaction *transaction = nullptr;
{
auto committed_transactions_ptr = committed_transactions_.Lock();
if (committed_transactions_ptr->empty()) {
break;
}
transaction = &committed_transactions_ptr->front();
}
auto const end_linked_undo_buffers = linked_undo_buffers.end();
for (auto linked_entry = linked_undo_buffers.begin(); linked_entry != end_linked_undo_buffers;) {
auto const *const commit_timestamp_ptr = linked_entry->commit_timestamp_.get();
auto const commit_timestamp = commit_timestamp_ptr->load(std::memory_order_acquire);
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire);
// only process those that are no longer active
if (commit_timestamp >= oldest_active_start_timestamp) {
break;
++linked_entry; // can not process, skip
continue; // must continue to next transaction, because committed_transactions_ was not ordered
}
// When unlinking a delta which is the first delta in its version chain,
@ -1359,7 +1373,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// chain in a broken state.
// The chain can be only read without taking any locks.
for (Delta &delta : transaction->deltas.use()) {
for (Delta &delta : linked_entry->deltas_.use()) {
while (true) {
auto prev = delta.prev.Get();
switch (prev.type) {
@ -1373,6 +1387,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
vertex->delta = nullptr;
if (vertex->deleted) {
DMG_ASSERT(delta.action == memgraph::storage::Delta::Action::RECREATE_OBJECT);
current_deleted_vertices.push_back(vertex->gid);
}
break;
@ -1387,37 +1402,56 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
edge->delta = nullptr;
if (edge->deleted) {
DMG_ASSERT(delta.action == memgraph::storage::Delta::Action::RECREATE_OBJECT);
current_deleted_edges.push_back(edge->gid);
}
break;
}
case PreviousPtr::Type::DELTA: {
if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) {
// kTransactionInitialId
// │
// ▼
// ┌───────────────────┬─────────────┐
// │ Committed │ Uncommitted │
// ├──────────┬────────┴─────────────┤
// │ Inactive │ Active │
// └──────────┴──────────────────────┘
// ▲
// │
// oldest_active_start_timestamp
if (prev.delta->timestamp == commit_timestamp_ptr) {
// The delta that is newer than this one is also a delta from this
// transaction. We skip the current delta and will remove it as a
// part of the suffix later.
break;
}
std::unique_lock<utils::RWSpinLock> guard;
{
// We need to find the parent object in order to be able to use
// its lock.
auto parent = prev;
while (parent.type == PreviousPtr::Type::DELTA) {
parent = parent.delta->prev.Get();
}
if (prev.delta->timestamp->load() < oldest_active_start_timestamp) {
// If previous is from another inactive transaction, no need to
// lock the edge/vertex, nothing will read this far or relink to
// us directly
break;
}
// Previous is either active (committed or uncommitted), we need to find
// the parent object in order to be able to use its lock.
auto parent = prev;
while (parent.type == PreviousPtr::Type::DELTA) {
parent = parent.delta->prev.Get();
}
auto const guard = std::invoke([&] {
switch (parent.type) {
case PreviousPtr::Type::VERTEX:
guard = std::unique_lock{parent.vertex->lock};
break;
return std::unique_lock{parent.vertex->lock};
case PreviousPtr::Type::EDGE:
guard = std::unique_lock{parent.edge->lock};
break;
return std::unique_lock{parent.edge->lock};
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
LOG_FATAL("Invalid database state!");
}
}
});
if (delta.prev.Get() != prev) {
// Something changed, we could now be the first delta in the
// chain.
@ -1435,9 +1469,16 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
}
committed_transactions_.WithLock([&](auto &committed_transactions) {
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
committed_transactions.pop_front();
// Now unlinked, move to unlinked_undo_buffers
auto const to_move = linked_entry;
++linked_entry; // advanced to next before we move the list node
unlinked_undo_buffers.splice(unlinked_undo_buffers.end(), linked_undo_buffers, to_move);
}
if (!linked_undo_buffers.empty()) {
// some were not able to be collected, add them back to committed_transactions_ for the next GC run
committed_transactions_.WithLock([&linked_undo_buffers](auto &committed_transactions) {
committed_transactions.splice(committed_transactions.begin(), std::move(linked_undo_buffers));
});
}
@ -1455,60 +1496,33 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
{
std::unique_lock<utils::SpinLock> guard(engine_lock_);
uint64_t mark_timestamp = timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// this could take a long time.
guard.unlock();
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
// transactions from aborting until we're done marking, maybe we should
// add them one-by-one or something
for (auto &[timestamp, transaction_deltas] : unlinked_undo_buffers) {
timestamp = mark_timestamp;
}
garbage_undo_buffers.splice(garbage_undo_buffers.end(), std::move(unlinked_undo_buffers));
});
for (auto vertex : current_deleted_vertices) {
garbage_vertices_.emplace_back(mark_timestamp, vertex);
uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have
if (force or mark_timestamp == oldest_active_start_timestamp) {
// if lucky, there are no active transactions, hence nothing looking at the deltas
// remove them now
unlinked_undo_buffers.clear();
} else {
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// this could take a long time.
guard.unlock();
// correct the markers, and defer until next GC run
for (auto &unlinked_undo_buffer : unlinked_undo_buffers) {
unlinked_undo_buffer.mark_timestamp_ = mark_timestamp;
}
// ensure insert at end to preserve the order
garbage_undo_buffers.splice(garbage_undo_buffers.end(), std::move(unlinked_undo_buffers));
});
}
}
garbage_undo_buffers_.WithLock([&](auto &undo_buffers) {
// if force is set to true we can simply delete all the leftover undos because
// no transaction is active
if constexpr (force) {
for (auto &[timestamp, transaction_deltas] : undo_buffers) {
transaction_deltas.~Bond<PmrListDelta>();
}
undo_buffers.clear();
} else {
while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) {
auto &[timestamp, transaction_deltas] = undo_buffers.front();
transaction_deltas.~Bond<PmrListDelta>();
// this will trigger destory of object
// but since we release pointer, it will just destory other stuff
undo_buffers.pop_front();
}
}
});
{
auto vertex_acc = vertices_.access();
if constexpr (force) {
// if force is set to true, then we have unique_lock and no transactions are active
// so we can clean all of the deleted vertices
while (!garbage_vertices_.empty()) {
MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!");
garbage_vertices_.pop_front();
}
} else {
while (!garbage_vertices_.empty() && garbage_vertices_.front().first < oldest_active_start_timestamp) {
MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!");
garbage_vertices_.pop_front();
}
for (auto vertex : current_deleted_vertices) {
MG_ASSERT(vertex_acc.remove(vertex), "Invalid database state!");
}
}
{

View File

@ -411,22 +411,32 @@ class InMemoryStorage final : public Storage {
// whatever.
std::optional<CommitLog> commit_log_;
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
utils::Scheduler gc_runner_;
std::mutex gc_lock_;
using BondPmrLd = Bond<utils::pmr::list<Delta>>;
// Ownership of unlinked deltas is transfered to garabage_undo_buffers once transaction is commited
utils::Synchronized<std::list<std::pair<uint64_t, BondPmrLd>>, utils::SpinLock> garbage_undo_buffers_;
struct GCDeltas {
GCDeltas(uint64_t mark_timestamp, BondPmrLd deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp)
: mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {}
GCDeltas(GCDeltas &&) = default;
GCDeltas &operator=(GCDeltas &&) = default;
uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has
BondPmrLd deltas_; //!< the deltas that need cleaning
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp_{}; //!< the timestamp the deltas are pointing at
};
// Ownership of linked deltas is transferred to committed_transactions_ once transaction is commited
utils::Synchronized<std::list<GCDeltas>, utils::SpinLock> committed_transactions_{};
// Ownership of unlinked deltas is transferred to garabage_undo_buffers once transaction is commited/aborted
utils::Synchronized<std::list<GCDeltas>, utils::SpinLock> garbage_undo_buffers_{};
// Vertices that are logically deleted but still have to be removed from
// indices before removing them from the main storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_vertices_;
// Vertices that are logically deleted and removed from indices and now wait
// to be removed from the main storage.
std::list<std::pair<uint64_t, Gid>> garbage_vertices_;
// Edges that are logically deleted and wait to be removed from the main
// storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;

View File

@ -46,31 +46,26 @@ struct Transaction {
: transaction_id(transaction_id),
start_timestamp(start_timestamp),
command_id(0),
deltas(1024UL),
deltas(0),
md_deltas(utils::NewDeleteResource()),
must_abort(false),
isolation_level(isolation_level),
storage_mode(storage_mode),
edge_import_mode_active(edge_import_mode_active) {}
edge_import_mode_active(edge_import_mode_active),
vertices_{(storage_mode == StorageMode::ON_DISK_TRANSACTIONAL)
? std::optional<utils::SkipList<Vertex>>{std::in_place}
: std::nullopt},
edges_{(storage_mode == StorageMode::ON_DISK_TRANSACTIONAL)
? std::optional<utils::SkipList<Edge>>{std::in_place}
: std::nullopt} {}
Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id),
start_timestamp(other.start_timestamp),
commit_timestamp(std::move(other.commit_timestamp)),
command_id(other.command_id),
deltas(std::move(other.deltas)),
md_deltas(std::move(other.md_deltas)),
must_abort(other.must_abort),
isolation_level(other.isolation_level),
storage_mode(other.storage_mode),
edge_import_mode_active(other.edge_import_mode_active),
manyDeltasCache{std::move(other.manyDeltasCache)} {}
Transaction(Transaction &&other) noexcept = default;
Transaction(const Transaction &) = delete;
Transaction &operator=(const Transaction &) = delete;
Transaction &operator=(Transaction &&other) = delete;
Transaction &operator=(Transaction &&other) = default;
~Transaction() {}
~Transaction() = default;
bool IsDiskStorage() const { return storage_mode == StorageMode::ON_DISK_TRANSACTIONAL; }
@ -86,41 +81,41 @@ struct Transaction {
bool RemoveModifiedEdge(const Gid &gid) { return modified_edges_.erase(gid) > 0U; }
uint64_t transaction_id;
uint64_t start_timestamp;
uint64_t transaction_id{};
uint64_t start_timestamp{};
// The `Transaction` object is stack allocated, but the `commit_timestamp`
// must be heap allocated because `Delta`s have a pointer to it, and that
// pointer must stay valid after the `Transaction` is moved into
// `commited_transactions_` list for GC.
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp;
uint64_t command_id;
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp{};
uint64_t command_id{};
Bond<PmrListDelta> deltas;
utils::pmr::list<MetadataDelta> md_deltas;
bool must_abort;
IsolationLevel isolation_level;
StorageMode storage_mode;
bool must_abort{};
IsolationLevel isolation_level{};
StorageMode storage_mode{};
bool edge_import_mode_active{false};
// A cache which is consistent to the current transaction_id + command_id.
// Used to speedup getting info about a vertex when there is a long delta
// chain involved in rebuilding that info.
mutable VertexInfoCache manyDeltasCache;
mutable VertexInfoCache manyDeltasCache{};
// Store modified edges GID mapped to changed Delta and serialized edge key
// Only for disk storage
ModifiedEdgesMap modified_edges_;
rocksdb::Transaction *disk_transaction_;
ModifiedEdgesMap modified_edges_{};
rocksdb::Transaction *disk_transaction_{};
/// Main storage
utils::SkipList<Vertex> vertices_;
std::vector<std::unique_ptr<utils::SkipList<Vertex>>> index_storage_;
std::optional<utils::SkipList<Vertex>> vertices_{};
std::vector<std::unique_ptr<utils::SkipList<Vertex>>> index_storage_{};
/// We need them because query context for indexed reading is cleared after the query is done not after the
/// transaction is done
std::vector<std::list<Delta>> index_deltas_storage_;
utils::SkipList<Edge> edges_;
std::map<std::string, std::pair<std::string, std::string>> edges_to_delete_;
std::map<std::string, std::string> vertices_to_delete_;
std::vector<std::list<Delta>> index_deltas_storage_{};
std::optional<utils::SkipList<Edge>> edges_{};
std::map<std::string, std::pair<std::string, std::string>> edges_to_delete_{};
std::map<std::string, std::string> vertices_to_delete_{};
bool scanned_all_vertices_ = false;
};

View File

@ -51,8 +51,6 @@ void Store(Value &&value, VertexInfoCache &caches, Func &&getCache, View view, K
cache.emplace(key_type{std::forward<Keys>(keys)...}, std::forward<Value>(value));
}
VertexInfoCache::VertexInfoCache() = default;
VertexInfoCache::~VertexInfoCache() = default;
VertexInfoCache::VertexInfoCache(VertexInfoCache &&) noexcept = default;
VertexInfoCache &VertexInfoCache::operator=(VertexInfoCache &&) noexcept = default;

View File

@ -45,8 +45,8 @@ class PropertyValue;
* - only for View::OLD
*/
struct VertexInfoCache final {
VertexInfoCache();
~VertexInfoCache();
VertexInfoCache() = default;
~VertexInfoCache() = default;
// By design would be a mistake to copy the cache
VertexInfoCache(VertexInfoCache const &) = delete;

View File

@ -26,15 +26,20 @@ struct Bond {
: res_(std::make_unique<resource>(initial_size)),
container_(memgraph::utils::Allocator<Container>(res_.get()).template new_object<Container>()){};
Bond(Bond &&other) noexcept : res_(std::exchange(other.res_, nullptr)), container_(other.container_) {
other.container_ = nullptr;
}
Bond(Bond &&other) noexcept
: res_(std::exchange(other.res_, nullptr)), container_(std::exchange(other.container_, nullptr)) {}
Bond(const Bond &other) = delete;
Bond &operator=(const Bond &other) = delete;
Bond &operator=(Bond &&other) = delete;
Bond &operator=(Bond &&other) {
if (this != &other) {
res_ = std::exchange(other.res_, nullptr);
container_ = std::exchange(other.container_, nullptr);
}
return *this;
};
auto use() -> Container & { return *container_; }

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -62,7 +62,7 @@ class Scheduler {
auto now = std::chrono::system_clock::now();
start_time += pause;
if (start_time > now) {
condition_variable_.wait_for(lk, start_time - now, [&] { return is_working_.load() == false; });
condition_variable_.wait_until(lk, start_time, [&] { return is_working_.load() == false; });
} else {
start_time = now;
}
@ -80,10 +80,7 @@ class Scheduler {
*/
void Stop() {
is_working_.store(false);
{
std::unique_lock<std::mutex> lk(mutex_);
condition_variable_.notify_one();
}
condition_variable_.notify_one();
if (thread_.joinable()) thread_.join();
}

View File

@ -13,7 +13,7 @@ function(add_benchmark test_cpp)
# used to help create two targets of the same name even though CMake
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} benchmark gflags)
target_link_libraries(${target_name} benchmark gflags mg-memory)
# register test
add_test(${target_name} ${exec_name})
add_dependencies(memgraph__benchmark ${target_name})
@ -60,5 +60,8 @@ target_link_libraries(${test_prefix}expansion mg-query mg-communication mg-licen
add_benchmark(storage_v2_gc.cpp)
target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2)
add_benchmark(storage_v2_gc2.cpp)
target_link_libraries(${test_prefix}storage_v2_gc2 mg-storage-v2)
add_benchmark(storage_v2_property_store.cpp)
target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2)

View File

@ -0,0 +1,65 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <iostream>
#include <gflags/gflags.h>
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/storage.hpp"
#include "utils/timer.hpp"
// This benchmark should be run for a fixed amount of time that is
// large compared to GC interval to make the output relevant.
DEFINE_int32(num_poperties, 10'000, "number of set property per transaction");
DEFINE_int32(num_iterations, 5'000, "number of iterations");
std::pair<std::string, memgraph::storage::Config> TestConfigurations[] = {
//{"NoGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::NONE}}},
{"100msPeriodicGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::PERIODIC,
.interval = std::chrono::milliseconds(100)}}},
{"1000msPeriodicGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::PERIODIC,
.interval = std::chrono::milliseconds(1000)}}}};
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
for (const auto &config : TestConfigurations) {
memgraph::utils::Timer timer;
std::chrono::duration<double> end_bench;
{
std::unique_ptr<memgraph::storage::Storage> storage(new memgraph::storage::InMemoryStorage(config.second));
std::array<memgraph::storage::Gid, 1> vertices;
memgraph::storage::PropertyId pid;
{
auto acc = storage->Access();
vertices[0] = acc->CreateVertex().Gid();
pid = acc->NameToProperty("NEW_PROP");
MG_ASSERT(!acc->Commit().HasError());
}
for (int iter = 0; iter != FLAGS_num_iterations; ++iter) {
auto acc = storage->Access();
auto vertex1 = acc->FindVertex(vertices[0], memgraph::storage::View::OLD);
for (auto i = 0; i != FLAGS_num_poperties; ++i) {
MG_ASSERT(!vertex1.value().SetProperty(pid, memgraph::storage::PropertyValue{i}).HasError());
}
MG_ASSERT(!acc->Commit().HasError());
}
end_bench = timer.Elapsed();
std::cout << "Config: " << config.first << ", Time: " << end_bench.count() << std::endl;
}
auto end_shutdown = timer.Elapsed();
std::cout << "Shutdown: " << (end_shutdown - end_bench).count() << std::endl;
}
}