diff --git a/src/memory/CMakeLists.txt b/src/memory/CMakeLists.txt index bf3cbb23b..4a669b9e6 100644 --- a/src/memory/CMakeLists.txt +++ b/src/memory/CMakeLists.txt @@ -10,6 +10,6 @@ add_library(mg-memory STATIC ${memory_src_files}) target_link_libraries(mg-memory mg-utils fmt) if (ENABLE_JEMALLOC) - target_link_libraries(mg-memory Jemalloc::Jemalloc) + target_link_libraries(mg-memory Jemalloc::Jemalloc ${CMAKE_DL_LIBS}) target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=1) endif() diff --git a/src/storage/v2/delta.hpp b/src/storage/v2/delta.hpp index 66d7c37f4..bcb2930eb 100644 --- a/src/storage/v2/delta.hpp +++ b/src/storage/v2/delta.hpp @@ -122,7 +122,7 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); } struct Delta { - enum class Action { + enum class Action : std::uint8_t { /// Use for Vertex and Edge /// Used for disk storage for modifying MVCC logic and storing old key. Storing old key is necessary for /// deleting old-data (compaction). diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index 35beb4fd9..d9cbea292 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -271,7 +271,6 @@ DiskStorage::DiskAccessor::~DiskAccessor() { Abort(); } FinalizeTransaction(); - transaction_.deltas.~Bond(); } void DiskStorage::LoadPersistingMetadataInfo() { @@ -462,12 +461,12 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) { AllVerticesIterable(disk_storage->edge_import_mode_cache_->AccessToVertices(), storage_, &transaction_, view)); } if (transaction_.scanned_all_vertices_) { - return VerticesIterable(AllVerticesIterable(transaction_.vertices_.access(), storage_, &transaction_, view)); + return VerticesIterable(AllVerticesIterable(transaction_.vertices_->access(), storage_, &transaction_, view)); } disk_storage->LoadVerticesToMainMemoryCache(&transaction_); transaction_.scanned_all_vertices_ = true; - return VerticesIterable(AllVerticesIterable(transaction_.vertices_.access(), storage_, &transaction_, view)); + return VerticesIterable(AllVerticesIterable(transaction_.vertices_->access(), storage_, &transaction_, view)); } VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) { @@ -586,7 +585,7 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p std::unordered_set DiskStorage::MergeVerticesFromMainCacheWithLabelIndexCache( Transaction *transaction, LabelId label, View view, std::list &index_deltas, utils::SkipList *indexed_vertices) { - auto main_cache_acc = transaction->vertices_.access(); + auto main_cache_acc = transaction->vertices_->access(); std::unordered_set gids; gids.reserve(main_cache_acc.size()); @@ -638,7 +637,7 @@ void DiskStorage::LoadVerticesFromDiskLabelIndex(Transaction *transaction, Label std::unordered_set DiskStorage::MergeVerticesFromMainCacheWithLabelPropertyIndexCache( Transaction *transaction, LabelId label, PropertyId property, View view, std::list &index_deltas, utils::SkipList *indexed_vertices, const auto &label_property_filter) { - auto main_cache_acc = transaction->vertices_.access(); + auto main_cache_acc = transaction->vertices_->access(); std::unordered_set gids; gids.reserve(main_cache_acc.size()); @@ -721,7 +720,7 @@ std::unordered_set DiskStorage::MergeVerticesFromMainCacheWithLabelProperty const std::optional> &lower_bound, const std::optional> &upper_bound, std::list &index_deltas, utils::SkipList *indexed_vertices) { - auto main_cache_acc = transaction->vertices_.access(); + auto main_cache_acc = transaction->vertices_->access(); std::unordered_set gids; gids.reserve(main_cache_acc.size()); @@ -852,7 +851,7 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertex() { OOMExceptionEnabler oom_exception; auto *disk_storage = static_cast(storage_); auto gid = disk_storage->vertex_id_.fetch_add(1, std::memory_order_acq_rel); - auto acc = transaction_.vertices_.access(); + auto acc = transaction_.vertices_->access(); auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta}); @@ -924,8 +923,8 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; if (storage_->config_.items.properties_on_edges) { - auto acc = - edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges() : transaction_.edges_.access(); + auto acc = edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges() + : transaction_.edges_->access(); auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Edge(gid, delta)); MG_ASSERT(inserted, "The edge must be inserted here!"); @@ -1282,7 +1281,7 @@ std::optional DiskStorage::LoadVertexToMainMemoryCache( const std::string &key, const std::string &value, std::string &&ts) { - auto main_storage_accessor = transaction->vertices_.access(); + auto main_storage_accessor = transaction->vertices_->access(); storage::Gid gid = Gid::FromString(utils::ExtractGidFromKey(key)); if (ObjectExistsInCache(main_storage_accessor, gid)) { @@ -1309,7 +1308,7 @@ VertexAccessor DiskStorage::CreateVertexFromDisk(Transaction *transaction, utils std::optional DiskStorage::FindVertex(storage::Gid gid, Transaction *transaction, View view) { auto acc = edge_import_status_ == EdgeImportMode::ACTIVE ? edge_import_mode_cache_->AccessToVertices() - : transaction->vertices_.access(); + : transaction->vertices_->access(); auto vertex_it = acc.find(gid); if (vertex_it != acc.end()) { return VertexAccessor::Create(&*vertex_it, this, transaction, view); @@ -1358,7 +1357,7 @@ std::optional DiskStorage::CreateEdgeFromDisk(const VertexAccessor EdgeRef edge(gid); if (config_.items.properties_on_edges) { - auto acc = edge_import_mode_active ? edge_import_mode_cache_->AccessToEdges() : transaction->edges_.access(); + auto acc = edge_import_mode_active ? edge_import_mode_cache_->AccessToEdges() : transaction->edges_->access(); auto *delta = CreateDeleteDeserializedObjectDelta(transaction, old_disk_key, std::move(read_ts)); auto [it, inserted] = acc.insert(Edge(gid, delta)); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); @@ -1660,7 +1659,7 @@ utils::BasicResult DiskStorage::DiskAccessor::Co } else { std::vector> unique_storage; if (auto vertices_flush_res = - disk_storage->FlushVertices(&transaction_, transaction_.vertices_.access(), unique_storage); + disk_storage->FlushVertices(&transaction_, transaction_.vertices_->access(), unique_storage); vertices_flush_res.HasError()) { Abort(); return vertices_flush_res.GetError(); @@ -1671,7 +1670,7 @@ utils::BasicResult DiskStorage::DiskAccessor::Co return del_vertices_res.GetError(); } - if (auto modified_edges_res = disk_storage->FlushModifiedEdges(&transaction_, transaction_.edges_.access()); + if (auto modified_edges_res = disk_storage->FlushModifiedEdges(&transaction_, transaction_.edges_->access()); modified_edges_res.HasError()) { Abort(); return modified_edges_res.GetError(); diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 27d4e9feb..25f75706d 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -184,9 +184,7 @@ InMemoryStorage::~InMemoryStorage() { } } } - if (!committed_transactions_->empty()) { - committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); }); - } + committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); }); } InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, @@ -198,6 +196,8 @@ InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) no InMemoryStorage::InMemoryAccessor::~InMemoryAccessor() { if (is_transaction_active_) { Abort(); + // We didn't actually commit + commit_timestamp_.reset(); } FinalizeTransaction(); @@ -743,25 +743,18 @@ utils::BasicResult InMemoryStorage::InMemoryAcce // Replica can log only the write transaction received from Main // so the Wal files are consistent if (replState.IsMain() || desired_commit_timestamp.has_value()) { - could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); - // Take committed_transactions lock while holding the engine lock to - // make sure that committed transactions are sorted by the commit - // timestamp in the list. - mem_storage->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) { - // TODO: release lock, and update all deltas to have a local copy - // of the commit timestamp - MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); - transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); - // Replica can only update the last commit timestamp with - // the commits received from main. - if (replState.IsMain() || desired_commit_timestamp.has_value()) { - // Update the last commit timestamp - mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); - } - // Release engine lock because we don't have to hold it anymore - // and emplace back could take a long time. - engine_guard.unlock(); - }); + could_replicate_all_sync_replicas = + mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard + // TODO: release lock, and update all deltas to have a local copy of the commit timestamp + transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard + // Replica can only update the last commit timestamp with + // the commits received from main. + if (replState.IsMain() || desired_commit_timestamp.has_value()) { + // Update the last commit timestamp + mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); // protected by engine_guard + } + // Release engine lock because we don't have to hold it anymore + engine_guard.unlock(); mem_storage->commit_log_->MarkFinished(start_timestamp); } @@ -783,6 +776,7 @@ utils::BasicResult InMemoryStorage::InMemoryAcce auto validation_result = storage_->constraints_.existence_constraints_->Validate(*prev.vertex); if (validation_result) { Abort(); + DMG_ASSERT(!commit_timestamp_.has_value()); return StorageManipulationError{*validation_result}; } } @@ -842,27 +836,22 @@ utils::BasicResult InMemoryStorage::InMemoryAcce // so the Wal files are consistent if (replState.IsMain() || desired_commit_timestamp.has_value()) { could_replicate_all_sync_replicas = - mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_); + mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_); // protected by engine_guard } - // Take committed_transactions lock while holding the engine lock to - // make sure that committed transactions are sorted by the commit - // timestamp in the list. - mem_storage->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) { - // TODO: release lock, and update all deltas to have a local copy - // of the commit timestamp - MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); - transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); - // Replica can only update the last commit timestamp with - // the commits received from main. - if (replState.IsMain() || desired_commit_timestamp.has_value()) { - // Update the last commit timestamp - mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); - } - // Release engine lock because we don't have to hold it anymore - // and emplace back could take a long time. - engine_guard.unlock(); - }); + // TODO: release lock, and update all deltas to have a local copy of the commit timestamp + MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); + transaction_.commit_timestamp->store(*commit_timestamp_, + std::memory_order_release); // protected by engine_guard + // Replica can only update the last commit timestamp with + // the commits received from main. + if (replState.IsMain() || desired_commit_timestamp.has_value()) { + // Update the last commit timestamp + mem_storage->repl_storage_state_.last_commit_timestamp_.store( + *commit_timestamp_); // protected by engine_guard + } + // Release engine lock because we don't have to hold it anymore + engine_guard.unlock(); mem_storage->commit_log_->MarkFinished(start_timestamp); } @@ -870,6 +859,8 @@ utils::BasicResult InMemoryStorage::InMemoryAcce if (unique_constraint_violation) { Abort(); + DMG_ASSERT(commit_timestamp_.has_value()); + commit_timestamp_.reset(); // We have aborted, hence we have not committed return StorageManipulationError{*unique_constraint_violation}; } } @@ -1041,7 +1032,8 @@ void InMemoryStorage::InMemoryAccessor::Abort() { // emplace back could take a long time. engine_guard.unlock(); - garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas)); + garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas), + std::move(transaction_.commit_timestamp)); }); mem_storage->deleted_vertices_.WithLock( [&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); }); @@ -1057,8 +1049,15 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() { if (commit_timestamp_) { auto *mem_storage = static_cast(storage_); mem_storage->commit_log_->MarkFinished(*commit_timestamp_); - mem_storage->committed_transactions_.WithLock( - [&](auto &committed_transactions) { committed_transactions.emplace_back(std::move(transaction_)); }); + + if (!transaction_.deltas.use().empty()) { + // Only hand over delta to be GC'ed if there was any deltas + mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) { + // using mark of 0 as GC will assign a mark_timestamp after unlinking + committed_transactions.emplace_back(0, std::move(transaction_.deltas), + std::move(transaction_.commit_timestamp)); + }); + } commit_timestamp_.reset(); } } @@ -1288,10 +1287,26 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ } uint64_t oldest_active_start_timestamp = commit_log_->OldestActive(); + + // Deltas from previous GC runs or from aborts can be cleaned up here + garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { + if constexpr (force) { + // if force is set to true we can simply delete all the leftover undos because + // no transaction is active + garbage_undo_buffers.clear(); + } else { + // garbage_undo_buffers is ordered, pop until we can't + while (!garbage_undo_buffers.empty() && + garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) { + garbage_undo_buffers.pop_front(); + } + } + }); + // We don't move undo buffers of unlinked transactions to garbage_undo_buffers // list immediately, because we would have to repeatedly take // garbage_undo_buffers lock. - std::list> unlinked_undo_buffers; + std::list unlinked_undo_buffers{}; // We will only free vertices deleted up until now in this GC cycle, and we // will do it after cleaning-up the indices. That way we are sure that all @@ -1304,28 +1319,27 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false); auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false); + // Short lock, to move to local variable. Hence allows other transactions to commit. + auto linked_undo_buffers = std::list{}; + committed_transactions_.WithLock( + [&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); }); + // Flag that will be used to determine whether the Index GC should be run. It // should be run when there were any items that were cleaned up (there were // updates between this run of the GC and the previous run of the GC). This // eliminates high CPU usage when the GC doesn't have to clean up anything. - bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty() || - need_full_scan_vertices || need_full_scan_edges; + bool run_index_cleanup = !linked_undo_buffers.empty() || !garbage_undo_buffers_->empty() || need_full_scan_vertices || + need_full_scan_edges; - while (true) { - // We don't want to hold the lock on committed transactions for too long, - // because that prevents other transactions from committing. - Transaction *transaction = nullptr; - { - auto committed_transactions_ptr = committed_transactions_.Lock(); - if (committed_transactions_ptr->empty()) { - break; - } - transaction = &committed_transactions_ptr->front(); - } + auto const end_linked_undo_buffers = linked_undo_buffers.end(); + for (auto linked_entry = linked_undo_buffers.begin(); linked_entry != end_linked_undo_buffers;) { + auto const *const commit_timestamp_ptr = linked_entry->commit_timestamp_.get(); + auto const commit_timestamp = commit_timestamp_ptr->load(std::memory_order_acquire); - auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire); + // only process those that are no longer active if (commit_timestamp >= oldest_active_start_timestamp) { - break; + ++linked_entry; // can not process, skip + continue; // must continue to next transaction, because committed_transactions_ was not ordered } // When unlinking a delta which is the first delta in its version chain, @@ -1359,7 +1373,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ // chain in a broken state. // The chain can be only read without taking any locks. - for (Delta &delta : transaction->deltas.use()) { + for (Delta &delta : linked_entry->deltas_.use()) { while (true) { auto prev = delta.prev.Get(); switch (prev.type) { @@ -1373,6 +1387,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ } vertex->delta = nullptr; if (vertex->deleted) { + DMG_ASSERT(delta.action == memgraph::storage::Delta::Action::RECREATE_OBJECT); current_deleted_vertices.push_back(vertex->gid); } break; @@ -1387,37 +1402,56 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ } edge->delta = nullptr; if (edge->deleted) { + DMG_ASSERT(delta.action == memgraph::storage::Delta::Action::RECREATE_OBJECT); current_deleted_edges.push_back(edge->gid); } break; } case PreviousPtr::Type::DELTA: { - if (prev.delta->timestamp->load(std::memory_order_acquire) == commit_timestamp) { + // kTransactionInitialId + // │ + // ▼ + // ┌───────────────────┬─────────────┐ + // │ Committed │ Uncommitted │ + // ├──────────┬────────┴─────────────┤ + // │ Inactive │ Active │ + // └──────────┴──────────────────────┘ + // ▲ + // │ + // oldest_active_start_timestamp + + if (prev.delta->timestamp == commit_timestamp_ptr) { // The delta that is newer than this one is also a delta from this // transaction. We skip the current delta and will remove it as a // part of the suffix later. break; } - std::unique_lock guard; - { - // We need to find the parent object in order to be able to use - // its lock. - auto parent = prev; - while (parent.type == PreviousPtr::Type::DELTA) { - parent = parent.delta->prev.Get(); - } + + if (prev.delta->timestamp->load() < oldest_active_start_timestamp) { + // If previous is from another inactive transaction, no need to + // lock the edge/vertex, nothing will read this far or relink to + // us directly + break; + } + + // Previous is either active (committed or uncommitted), we need to find + // the parent object in order to be able to use its lock. + auto parent = prev; + while (parent.type == PreviousPtr::Type::DELTA) { + parent = parent.delta->prev.Get(); + } + + auto const guard = std::invoke([&] { switch (parent.type) { case PreviousPtr::Type::VERTEX: - guard = std::unique_lock{parent.vertex->lock}; - break; + return std::unique_lock{parent.vertex->lock}; case PreviousPtr::Type::EDGE: - guard = std::unique_lock{parent.edge->lock}; - break; + return std::unique_lock{parent.edge->lock}; case PreviousPtr::Type::DELTA: case PreviousPtr::Type::NULLPTR: LOG_FATAL("Invalid database state!"); } - } + }); if (delta.prev.Get() != prev) { // Something changed, we could now be the first delta in the // chain. @@ -1435,9 +1469,16 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ } } - committed_transactions_.WithLock([&](auto &committed_transactions) { - unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas)); - committed_transactions.pop_front(); + // Now unlinked, move to unlinked_undo_buffers + auto const to_move = linked_entry; + ++linked_entry; // advanced to next before we move the list node + unlinked_undo_buffers.splice(unlinked_undo_buffers.end(), linked_undo_buffers, to_move); + } + + if (!linked_undo_buffers.empty()) { + // some were not able to be collected, add them back to committed_transactions_ for the next GC run + committed_transactions_.WithLock([&linked_undo_buffers](auto &committed_transactions) { + committed_transactions.splice(committed_transactions.begin(), std::move(linked_undo_buffers)); }); } @@ -1455,60 +1496,33 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ { std::unique_lock guard(engine_lock_); - uint64_t mark_timestamp = timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. - garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { - // Release engine lock because we don't have to hold it anymore and - // this could take a long time. - guard.unlock(); - // TODO(mtomic): holding garbage_undo_buffers_ lock here prevents - // transactions from aborting until we're done marking, maybe we should - // add them one-by-one or something - for (auto &[timestamp, transaction_deltas] : unlinked_undo_buffers) { - timestamp = mark_timestamp; - } - garbage_undo_buffers.splice(garbage_undo_buffers.end(), std::move(unlinked_undo_buffers)); - }); - for (auto vertex : current_deleted_vertices) { - garbage_vertices_.emplace_back(mark_timestamp, vertex); + uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have + + if (force or mark_timestamp == oldest_active_start_timestamp) { + // if lucky, there are no active transactions, hence nothing looking at the deltas + // remove them now + unlinked_undo_buffers.clear(); + } else { + // Take garbage_undo_buffers lock while holding the engine lock to make + // sure that entries are sorted by mark timestamp in the list. + garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { + // Release engine lock because we don't have to hold it anymore and + // this could take a long time. + guard.unlock(); + // correct the markers, and defer until next GC run + for (auto &unlinked_undo_buffer : unlinked_undo_buffers) { + unlinked_undo_buffer.mark_timestamp_ = mark_timestamp; + } + // ensure insert at end to preserve the order + garbage_undo_buffers.splice(garbage_undo_buffers.end(), std::move(unlinked_undo_buffers)); + }); } } - garbage_undo_buffers_.WithLock([&](auto &undo_buffers) { - // if force is set to true we can simply delete all the leftover undos because - // no transaction is active - if constexpr (force) { - for (auto &[timestamp, transaction_deltas] : undo_buffers) { - transaction_deltas.~Bond(); - } - undo_buffers.clear(); - - } else { - while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) { - auto &[timestamp, transaction_deltas] = undo_buffers.front(); - transaction_deltas.~Bond(); - // this will trigger destory of object - // but since we release pointer, it will just destory other stuff - undo_buffers.pop_front(); - } - } - }); - { auto vertex_acc = vertices_.access(); - if constexpr (force) { - // if force is set to true, then we have unique_lock and no transactions are active - // so we can clean all of the deleted vertices - while (!garbage_vertices_.empty()) { - MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!"); - garbage_vertices_.pop_front(); - } - } else { - while (!garbage_vertices_.empty() && garbage_vertices_.front().first < oldest_active_start_timestamp) { - MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!"); - garbage_vertices_.pop_front(); - } + for (auto vertex : current_deleted_vertices) { + MG_ASSERT(vertex_acc.remove(vertex), "Invalid database state!"); } } { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 153409126..73b3bfc5c 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -411,22 +411,32 @@ class InMemoryStorage final : public Storage { // whatever. std::optional commit_log_; - utils::Synchronized, utils::SpinLock> committed_transactions_; utils::Scheduler gc_runner_; std::mutex gc_lock_; using BondPmrLd = Bond>; - // Ownership of unlinked deltas is transfered to garabage_undo_buffers once transaction is commited - utils::Synchronized>, utils::SpinLock> garbage_undo_buffers_; + struct GCDeltas { + GCDeltas(uint64_t mark_timestamp, BondPmrLd deltas, std::unique_ptr> commit_timestamp) + : mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {} + + GCDeltas(GCDeltas &&) = default; + GCDeltas &operator=(GCDeltas &&) = default; + + uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has + BondPmrLd deltas_; //!< the deltas that need cleaning + std::unique_ptr> commit_timestamp_{}; //!< the timestamp the deltas are pointing at + }; + + // Ownership of linked deltas is transferred to committed_transactions_ once transaction is commited + utils::Synchronized, utils::SpinLock> committed_transactions_{}; + + // Ownership of unlinked deltas is transferred to garabage_undo_buffers once transaction is commited/aborted + utils::Synchronized, utils::SpinLock> garbage_undo_buffers_{}; // Vertices that are logically deleted but still have to be removed from // indices before removing them from the main storage. utils::Synchronized, utils::SpinLock> deleted_vertices_; - // Vertices that are logically deleted and removed from indices and now wait - // to be removed from the main storage. - std::list> garbage_vertices_; - // Edges that are logically deleted and wait to be removed from the main // storage. utils::Synchronized, utils::SpinLock> deleted_edges_; diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index 69565b902..dab484095 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -46,31 +46,26 @@ struct Transaction { : transaction_id(transaction_id), start_timestamp(start_timestamp), command_id(0), - deltas(1024UL), + deltas(0), md_deltas(utils::NewDeleteResource()), must_abort(false), isolation_level(isolation_level), storage_mode(storage_mode), - edge_import_mode_active(edge_import_mode_active) {} + edge_import_mode_active(edge_import_mode_active), + vertices_{(storage_mode == StorageMode::ON_DISK_TRANSACTIONAL) + ? std::optional>{std::in_place} + : std::nullopt}, + edges_{(storage_mode == StorageMode::ON_DISK_TRANSACTIONAL) + ? std::optional>{std::in_place} + : std::nullopt} {} - Transaction(Transaction &&other) noexcept - : transaction_id(other.transaction_id), - start_timestamp(other.start_timestamp), - commit_timestamp(std::move(other.commit_timestamp)), - command_id(other.command_id), - deltas(std::move(other.deltas)), - md_deltas(std::move(other.md_deltas)), - must_abort(other.must_abort), - isolation_level(other.isolation_level), - storage_mode(other.storage_mode), - edge_import_mode_active(other.edge_import_mode_active), - manyDeltasCache{std::move(other.manyDeltasCache)} {} + Transaction(Transaction &&other) noexcept = default; Transaction(const Transaction &) = delete; Transaction &operator=(const Transaction &) = delete; - Transaction &operator=(Transaction &&other) = delete; + Transaction &operator=(Transaction &&other) = default; - ~Transaction() {} + ~Transaction() = default; bool IsDiskStorage() const { return storage_mode == StorageMode::ON_DISK_TRANSACTIONAL; } @@ -86,41 +81,41 @@ struct Transaction { bool RemoveModifiedEdge(const Gid &gid) { return modified_edges_.erase(gid) > 0U; } - uint64_t transaction_id; - uint64_t start_timestamp; + uint64_t transaction_id{}; + uint64_t start_timestamp{}; // The `Transaction` object is stack allocated, but the `commit_timestamp` // must be heap allocated because `Delta`s have a pointer to it, and that // pointer must stay valid after the `Transaction` is moved into // `commited_transactions_` list for GC. - std::unique_ptr> commit_timestamp; - uint64_t command_id; + std::unique_ptr> commit_timestamp{}; + uint64_t command_id{}; Bond deltas; utils::pmr::list md_deltas; - bool must_abort; - IsolationLevel isolation_level; - StorageMode storage_mode; + bool must_abort{}; + IsolationLevel isolation_level{}; + StorageMode storage_mode{}; bool edge_import_mode_active{false}; // A cache which is consistent to the current transaction_id + command_id. // Used to speedup getting info about a vertex when there is a long delta // chain involved in rebuilding that info. - mutable VertexInfoCache manyDeltasCache; + mutable VertexInfoCache manyDeltasCache{}; // Store modified edges GID mapped to changed Delta and serialized edge key // Only for disk storage - ModifiedEdgesMap modified_edges_; - rocksdb::Transaction *disk_transaction_; + ModifiedEdgesMap modified_edges_{}; + rocksdb::Transaction *disk_transaction_{}; /// Main storage - utils::SkipList vertices_; - std::vector>> index_storage_; + std::optional> vertices_{}; + std::vector>> index_storage_{}; /// We need them because query context for indexed reading is cleared after the query is done not after the /// transaction is done - std::vector> index_deltas_storage_; - utils::SkipList edges_; - std::map> edges_to_delete_; - std::map vertices_to_delete_; + std::vector> index_deltas_storage_{}; + std::optional> edges_{}; + std::map> edges_to_delete_{}; + std::map vertices_to_delete_{}; bool scanned_all_vertices_ = false; }; diff --git a/src/storage/v2/vertex_info_cache.cpp b/src/storage/v2/vertex_info_cache.cpp index c0fc2b11b..a2546d7be 100644 --- a/src/storage/v2/vertex_info_cache.cpp +++ b/src/storage/v2/vertex_info_cache.cpp @@ -51,8 +51,6 @@ void Store(Value &&value, VertexInfoCache &caches, Func &&getCache, View view, K cache.emplace(key_type{std::forward(keys)...}, std::forward(value)); } -VertexInfoCache::VertexInfoCache() = default; -VertexInfoCache::~VertexInfoCache() = default; VertexInfoCache::VertexInfoCache(VertexInfoCache &&) noexcept = default; VertexInfoCache &VertexInfoCache::operator=(VertexInfoCache &&) noexcept = default; diff --git a/src/storage/v2/vertex_info_cache.hpp b/src/storage/v2/vertex_info_cache.hpp index 725e2f378..a5acbd7af 100644 --- a/src/storage/v2/vertex_info_cache.hpp +++ b/src/storage/v2/vertex_info_cache.hpp @@ -45,8 +45,8 @@ class PropertyValue; * - only for View::OLD */ struct VertexInfoCache final { - VertexInfoCache(); - ~VertexInfoCache(); + VertexInfoCache() = default; + ~VertexInfoCache() = default; // By design would be a mistake to copy the cache VertexInfoCache(VertexInfoCache const &) = delete; diff --git a/src/utils/bond.hpp b/src/utils/bond.hpp index 7ea3c2aba..3b826adc7 100644 --- a/src/utils/bond.hpp +++ b/src/utils/bond.hpp @@ -26,15 +26,20 @@ struct Bond { : res_(std::make_unique(initial_size)), container_(memgraph::utils::Allocator(res_.get()).template new_object()){}; - Bond(Bond &&other) noexcept : res_(std::exchange(other.res_, nullptr)), container_(other.container_) { - other.container_ = nullptr; - } + Bond(Bond &&other) noexcept + : res_(std::exchange(other.res_, nullptr)), container_(std::exchange(other.container_, nullptr)) {} Bond(const Bond &other) = delete; Bond &operator=(const Bond &other) = delete; - Bond &operator=(Bond &&other) = delete; + Bond &operator=(Bond &&other) { + if (this != &other) { + res_ = std::exchange(other.res_, nullptr); + container_ = std::exchange(other.container_, nullptr); + } + return *this; + }; auto use() -> Container & { return *container_; } diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index 5bebfabae..b97320c21 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -62,7 +62,7 @@ class Scheduler { auto now = std::chrono::system_clock::now(); start_time += pause; if (start_time > now) { - condition_variable_.wait_for(lk, start_time - now, [&] { return is_working_.load() == false; }); + condition_variable_.wait_until(lk, start_time, [&] { return is_working_.load() == false; }); } else { start_time = now; } @@ -80,10 +80,7 @@ class Scheduler { */ void Stop() { is_working_.store(false); - { - std::unique_lock lk(mutex_); - condition_variable_.notify_one(); - } + condition_variable_.notify_one(); if (thread_.joinable()) thread_.join(); } diff --git a/tests/benchmark/CMakeLists.txt b/tests/benchmark/CMakeLists.txt index 4bf8374b0..638fd4f5d 100644 --- a/tests/benchmark/CMakeLists.txt +++ b/tests/benchmark/CMakeLists.txt @@ -13,7 +13,7 @@ function(add_benchmark test_cpp) # used to help create two targets of the same name even though CMake # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) - target_link_libraries(${target_name} benchmark gflags) + target_link_libraries(${target_name} benchmark gflags mg-memory) # register test add_test(${target_name} ${exec_name}) add_dependencies(memgraph__benchmark ${target_name}) @@ -60,5 +60,8 @@ target_link_libraries(${test_prefix}expansion mg-query mg-communication mg-licen add_benchmark(storage_v2_gc.cpp) target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2) +add_benchmark(storage_v2_gc2.cpp) +target_link_libraries(${test_prefix}storage_v2_gc2 mg-storage-v2) + add_benchmark(storage_v2_property_store.cpp) target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2) diff --git a/tests/benchmark/storage_v2_gc2.cpp b/tests/benchmark/storage_v2_gc2.cpp new file mode 100644 index 000000000..a8c0a589e --- /dev/null +++ b/tests/benchmark/storage_v2_gc2.cpp @@ -0,0 +1,65 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include + +#include + +#include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/storage.hpp" +#include "utils/timer.hpp" + +// This benchmark should be run for a fixed amount of time that is +// large compared to GC interval to make the output relevant. + +DEFINE_int32(num_poperties, 10'000, "number of set property per transaction"); +DEFINE_int32(num_iterations, 5'000, "number of iterations"); + +std::pair TestConfigurations[] = { + //{"NoGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::NONE}}}, + {"100msPeriodicGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::PERIODIC, + .interval = std::chrono::milliseconds(100)}}}, + {"1000msPeriodicGc", memgraph::storage::Config{.gc = {.type = memgraph::storage::Config::Gc::Type::PERIODIC, + .interval = std::chrono::milliseconds(1000)}}}}; + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + for (const auto &config : TestConfigurations) { + memgraph::utils::Timer timer; + std::chrono::duration end_bench; + { + std::unique_ptr storage(new memgraph::storage::InMemoryStorage(config.second)); + std::array vertices; + memgraph::storage::PropertyId pid; + { + auto acc = storage->Access(); + vertices[0] = acc->CreateVertex().Gid(); + pid = acc->NameToProperty("NEW_PROP"); + MG_ASSERT(!acc->Commit().HasError()); + } + + for (int iter = 0; iter != FLAGS_num_iterations; ++iter) { + auto acc = storage->Access(); + auto vertex1 = acc->FindVertex(vertices[0], memgraph::storage::View::OLD); + for (auto i = 0; i != FLAGS_num_poperties; ++i) { + MG_ASSERT(!vertex1.value().SetProperty(pid, memgraph::storage::PropertyValue{i}).HasError()); + } + MG_ASSERT(!acc->Commit().HasError()); + } + + end_bench = timer.Elapsed(); + std::cout << "Config: " << config.first << ", Time: " << end_bench.count() << std::endl; + } + auto end_shutdown = timer.Elapsed(); + std::cout << "Shutdown: " << (end_shutdown - end_bench).count() << std::endl; + } +}