From 4ef6a1f9c3c94afad405a358065ac3108491b0fe Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd Date: Tue, 6 Feb 2024 17:07:38 +0000 Subject: [PATCH] Improve memory handling of Deltas (#1688) - Reduce delta from 104B to 80B - Hold and pass them around as in a deque - Detect and deleted deltas within commit if safe to do so --- libs/setup.sh | 2 + src/storage/v2/delta.hpp | 77 +-- src/storage/v2/disk/storage.cpp | 12 +- src/storage/v2/durability/wal.cpp | 4 +- src/storage/v2/edge_accessor.cpp | 10 +- src/storage/v2/id_types.hpp | 26 +- src/storage/v2/indices/indices_utils.hpp | 10 +- src/storage/v2/inmemory/storage.cpp | 571 +++++++++++------- src/storage/v2/inmemory/storage.hpp | 8 +- .../v2/inmemory/unique_constraints.cpp | 12 +- src/storage/v2/mvcc.hpp | 16 +- src/storage/v2/property_value.hpp | 168 +++--- src/storage/v2/transaction.hpp | 7 +- src/storage/v2/vertex_info_helpers.hpp | 24 +- src/utils/disk_utils.hpp | 4 +- src/utils/string.hpp | 7 + tests/unit/storage_v2_property_store.cpp | 8 +- tests/unit/storage_v2_wal_file.cpp | 8 +- 18 files changed, 547 insertions(+), 427 deletions(-) diff --git a/libs/setup.sh b/libs/setup.sh index 4b5b81dfc..76fb4fcfa 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -270,6 +270,8 @@ pushd jemalloc MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \ ./configure \ --disable-cxx \ + --with-lg-page=12 \ + --with-lg-hugepage=21 \ --enable-shared=no --prefix=$working_dir \ --with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" diff --git a/src/storage/v2/delta.hpp b/src/storage/v2/delta.hpp index bcb2930eb..9c70bdc4c 100644 --- a/src/storage/v2/delta.hpp +++ b/src/storage/v2/delta.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -57,9 +57,11 @@ class PreviousPtr { explicit Pointer(Edge *edge) : type(Type::EDGE), edge(edge) {} Type type{Type::NULLPTR}; - Delta *delta{nullptr}; - Vertex *vertex{nullptr}; - Edge *edge{nullptr}; + union { + Delta *delta = nullptr; + Vertex *vertex; + Edge *edge; + }; }; PreviousPtr() : storage_(0) {} @@ -157,59 +159,51 @@ struct Delta { // DELETE_DESERIALIZED_OBJECT is used to load data from disk committed by past txs. // Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from // current tx. This timestamp we got from RocksDB timestamp stored in key. - Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, const std::optional &old_disk_key) - : action(Action::DELETE_DESERIALIZED_OBJECT), - timestamp(new std::atomic(ts)), - command_id(0), - old_disk_key(old_disk_key) {} + Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, std::optional old_disk_key) + : timestamp(new std::atomic(ts)), command_id(0), old_disk_key{.value = std::move(old_disk_key)} {} Delta(DeleteObjectTag /*tag*/, std::atomic *timestamp, uint64_t command_id) - : action(Action::DELETE_OBJECT), timestamp(timestamp), command_id(command_id) {} + : timestamp(timestamp), command_id(command_id), action(Action::DELETE_OBJECT) {} Delta(RecreateObjectTag /*tag*/, std::atomic *timestamp, uint64_t command_id) - : action(Action::RECREATE_OBJECT), timestamp(timestamp), command_id(command_id) {} + : timestamp(timestamp), command_id(command_id), action(Action::RECREATE_OBJECT) {} Delta(AddLabelTag /*tag*/, LabelId label, std::atomic *timestamp, uint64_t command_id) - : action(Action::ADD_LABEL), timestamp(timestamp), command_id(command_id), label(label) {} + : timestamp(timestamp), command_id(command_id), label{.action = Action::ADD_LABEL, .value = label} {} Delta(RemoveLabelTag /*tag*/, LabelId label, std::atomic *timestamp, uint64_t command_id) - : action(Action::REMOVE_LABEL), timestamp(timestamp), command_id(command_id), label(label) {} + : timestamp(timestamp), command_id(command_id), label{.action = Action::REMOVE_LABEL, .value = label} {} - Delta(SetPropertyTag /*tag*/, PropertyId key, const PropertyValue &value, std::atomic *timestamp, + Delta(SetPropertyTag /*tag*/, PropertyId key, PropertyValue value, std::atomic *timestamp, uint64_t command_id) - : action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, value}) {} - - Delta(SetPropertyTag /*tag*/, PropertyId key, PropertyValue &&value, std::atomic *timestamp, - uint64_t command_id) - : action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, std::move(value)}) {} + : timestamp(timestamp), + command_id(command_id), + property{ + .action = Action::SET_PROPERTY, .key = key, .value = std::make_unique(std::move(value))} {} Delta(AddInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic *timestamp, uint64_t command_id) - : action(Action::ADD_IN_EDGE), - timestamp(timestamp), + : timestamp(timestamp), command_id(command_id), - vertex_edge({edge_type, vertex, edge}) {} + vertex_edge{.action = Action::ADD_IN_EDGE, .edge_type = edge_type, vertex, edge} {} Delta(AddOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic *timestamp, uint64_t command_id) - : action(Action::ADD_OUT_EDGE), - timestamp(timestamp), + : timestamp(timestamp), command_id(command_id), - vertex_edge({edge_type, vertex, edge}) {} + vertex_edge{.action = Action::ADD_OUT_EDGE, .edge_type = edge_type, vertex, edge} {} Delta(RemoveInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic *timestamp, uint64_t command_id) - : action(Action::REMOVE_IN_EDGE), - timestamp(timestamp), + : timestamp(timestamp), command_id(command_id), - vertex_edge({edge_type, vertex, edge}) {} + vertex_edge{.action = Action::REMOVE_IN_EDGE, .edge_type = edge_type, vertex, edge} {} Delta(RemoveOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic *timestamp, uint64_t command_id) - : action(Action::REMOVE_OUT_EDGE), - timestamp(timestamp), + : timestamp(timestamp), command_id(command_id), - vertex_edge({edge_type, vertex, edge}) {} + vertex_edge{.action = Action::REMOVE_OUT_EDGE, .edge_type = edge_type, vertex, edge} {} Delta(const Delta &) = delete; Delta(Delta &&) = delete; @@ -228,18 +222,16 @@ struct Delta { case Action::REMOVE_OUT_EDGE: break; case Action::DELETE_DESERIALIZED_OBJECT: - old_disk_key.reset(); + old_disk_key.value.reset(); delete timestamp; timestamp = nullptr; break; case Action::SET_PROPERTY: - property.value.~PropertyValue(); + property.value.reset(); break; } } - Action action; - // TODO: optimize with in-place copy std::atomic *timestamp; uint64_t command_id; @@ -247,13 +239,22 @@ struct Delta { std::atomic next{nullptr}; union { - std::optional old_disk_key; - LabelId label; + Action action; struct { + Action action = Action::DELETE_DESERIALIZED_OBJECT; + std::optional value; + } old_disk_key; + struct { + Action action; + LabelId value; + } label; + struct { + Action action; PropertyId key; - storage::PropertyValue value; + std::unique_ptr value; } property; struct { + Action action; EdgeTypeId edge_type; Vertex *vertex; EdgeRef edge; diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index f3c3aa0f4..adc0e92f4 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -137,14 +137,14 @@ bool VertexHasLabel(const Vertex &vertex, LabelId label, Transaction *transactio ApplyDeltasForRead(transaction, delta, view, [&deleted, &has_label, label](const Delta &delta) { switch (delta.action) { case Delta::Action::REMOVE_LABEL: { - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; } break; } case Delta::Action::ADD_LABEL: { - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; } @@ -177,7 +177,7 @@ PropertyValue GetVertexProperty(const Vertex &vertex, PropertyId property, Trans switch (delta.action) { case Delta::Action::SET_PROPERTY: { if (delta.property.key == property) { - value = delta.property.value; + value = *delta.property.value; } break; } @@ -1682,9 +1682,9 @@ utils::BasicResult DiskStorage::DiskAccessor::Co } break; } } - } else if (transaction_.deltas.use().empty() || + } else if (transaction_.deltas.empty() || (!edge_import_mode_active && - std::all_of(transaction_.deltas.use().begin(), transaction_.deltas.use().end(), [](const Delta &delta) { + std::all_of(transaction_.deltas.begin(), transaction_.deltas.end(), [](const Delta &delta) { return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT; }))) { } else { @@ -1812,7 +1812,7 @@ void DiskStorage::DiskAccessor::UpdateObjectsCountOnAbort() { auto *disk_storage = static_cast(storage_); uint64_t transaction_id = transaction_.transaction_id; - for (const auto &delta : transaction_.deltas.use()) { + for (const auto &delta : transaction_.deltas) { auto prev = delta.prev.Get(); switch (prev.type) { case PreviousPtr::Type::VERTEX: { diff --git a/src/storage/v2/durability/wal.cpp b/src/storage/v2/durability/wal.cpp index e808f01a3..52e916052 100644 --- a/src/storage/v2/durability/wal.cpp +++ b/src/storage/v2/durability/wal.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -580,7 +580,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, SalientConf case Delta::Action::REMOVE_LABEL: { encoder->WriteMarker(VertexActionToMarker(delta.action)); encoder->WriteUint(vertex.gid.AsUint()); - encoder->WriteString(name_id_mapper->IdToName(delta.label.AsUint())); + encoder->WriteString(name_id_mapper->IdToName(delta.label.value.AsUint())); break; } case Delta::Action::ADD_OUT_EDGE: diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index 3ab2e3d79..03522ba16 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -237,7 +237,7 @@ Result EdgeAccessor::GetProperty(PropertyId property, View view) switch (delta.action) { case Delta::Action::SET_PROPERTY: { if (delta.property.key == property) { - *value = delta.property.value; + *value = *delta.property.value; } break; } @@ -281,15 +281,15 @@ Result> EdgeAccessor::Properties(View view) case Delta::Action::SET_PROPERTY: { auto it = properties.find(delta.property.key); if (it != properties.end()) { - if (delta.property.value.IsNull()) { + if (delta.property.value->IsNull()) { // remove the property properties.erase(it); } else { // set the value - it->second = delta.property.value; + it->second = *delta.property.value; } - } else if (!delta.property.value.IsNull()) { - properties.emplace(delta.property.key, delta.property.value); + } else if (!delta.property.value->IsNull()) { + properties.emplace(delta.property.key, *delta.property.value); } break; } diff --git a/src/storage/v2/id_types.hpp b/src/storage/v2/id_types.hpp index 3f2c8aa40..5e1809c67 100644 --- a/src/storage/v2/id_types.hpp +++ b/src/storage/v2/id_types.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -23,24 +23,24 @@ namespace memgraph::storage { -#define STORAGE_DEFINE_ID_TYPE(name) \ +#define STORAGE_DEFINE_ID_TYPE(name, type_store, type_conv, parse) \ class name final { \ private: \ - explicit name(uint64_t id) : id_(id) {} \ + explicit name(type_store id) : id_(id) {} \ \ public: \ /* Default constructor to allow serialization or preallocation. */ \ name() = default; \ \ - static name FromUint(uint64_t id) { return name{id}; } \ - static name FromInt(int64_t id) { return name{utils::MemcpyCast(id)}; } \ - uint64_t AsUint() const { return id_; } \ - int64_t AsInt() const { return utils::MemcpyCast(id_); } \ - static name FromString(std::string_view id) { return name{utils::ParseStringToUint64(id)}; } \ + static name FromUint(type_store id) { return name{id}; } \ + static name FromInt(type_conv id) { return name{utils::MemcpyCast(id)}; } \ + type_store AsUint() const { return id_; } \ + type_conv AsInt() const { return utils::MemcpyCast(id_); } \ + static name FromString(std::string_view id) { return name{parse(id)}; } \ std::string ToString() const { return std::to_string(id_); } \ \ private: \ - uint64_t id_; \ + type_store id_; \ }; \ static_assert(std::is_trivially_copyable_v, "storage::" #name " must be trivially copyable!"); \ inline bool operator==(const name &first, const name &second) { return first.AsUint() == second.AsUint(); } \ @@ -50,10 +50,10 @@ namespace memgraph::storage { inline bool operator<=(const name &first, const name &second) { return first.AsUint() <= second.AsUint(); } \ inline bool operator>=(const name &first, const name &second) { return first.AsUint() >= second.AsUint(); } -STORAGE_DEFINE_ID_TYPE(Gid); -STORAGE_DEFINE_ID_TYPE(LabelId); -STORAGE_DEFINE_ID_TYPE(PropertyId); -STORAGE_DEFINE_ID_TYPE(EdgeTypeId); +STORAGE_DEFINE_ID_TYPE(Gid, uint64_t, int64_t, utils::ParseStringToUint64); +STORAGE_DEFINE_ID_TYPE(LabelId, uint32_t, int32_t, utils::ParseStringToUint32); +STORAGE_DEFINE_ID_TYPE(PropertyId, uint32_t, int32_t, utils::ParseStringToUint32); +STORAGE_DEFINE_ID_TYPE(EdgeTypeId, uint32_t, int32_t, utils::ParseStringToUint32); #undef STORAGE_DEFINE_ID_TYPE diff --git a/src/storage/v2/indices/indices_utils.hpp b/src/storage/v2/indices/indices_utils.hpp index 054609188..52938a1db 100644 --- a/src/storage/v2/indices/indices_utils.hpp +++ b/src/storage/v2/indices/indices_utils.hpp @@ -72,13 +72,13 @@ inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t tim return AnyVersionSatisfiesPredicate(timestamp, delta, [&has_label, &deleted, label](const Delta &delta) { switch (delta.action) { case Delta::Action::ADD_LABEL: - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; } break; case Delta::Action::REMOVE_LABEL: - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; } @@ -135,20 +135,20 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop timestamp, delta, [&has_label, ¤t_value_equal_to_value, &deleted, label, key, &value](const Delta &delta) { switch (delta.action) { case Delta::Action::ADD_LABEL: - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; } break; case Delta::Action::REMOVE_LABEL: - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; } break; case Delta::Action::SET_PROPERTY: if (delta.property.key == key) { - current_value_equal_to_value = delta.property.value == value; + current_value_equal_to_value = *delta.property.value == value; } break; case Delta::Action::RECREATE_OBJECT: { diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 381a67d3f..c97d12072 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -176,9 +176,9 @@ InMemoryStorage::~InMemoryStorage() { committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); }); } -InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, - StorageMode storage_mode, - memgraph::replication_coordination_glue::ReplicationRole replication_role) +InMemoryStorage::InMemoryAccessor::InMemoryAccessor( + auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode, + memgraph::replication_coordination_glue::ReplicationRole replication_role) : Accessor(tag, storage, isolation_level, storage_mode, replication_role), config_(storage->config_.salient.items) {} InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) noexcept @@ -757,7 +757,7 @@ utils::BasicResult InMemoryStorage::InMemoryAcce auto *mem_storage = static_cast(storage_); // TODO: duplicated transaction finalisation in md_deltas and deltas processing cases - if (transaction_.deltas.use().empty() && transaction_.md_deltas.empty()) { + if (transaction_.deltas.empty() && transaction_.md_deltas.empty()) { // We don't have to update the commit timestamp here because no one reads // it. mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp); @@ -836,25 +836,37 @@ utils::BasicResult InMemoryStorage::InMemoryAcce // Replica can log only the write transaction received from Main // so the Wal files are consistent if (is_main_or_replica_write) { - could_replicate_all_sync_replicas = mem_storage->AppendToWal(transaction_, *commit_timestamp_, - std::move(db_acc)); // protected by engine_guard + could_replicate_all_sync_replicas = + mem_storage->AppendToWal(transaction_, *commit_timestamp_, std::move(db_acc)); // TODO: release lock, and update all deltas to have a local copy of the commit timestamp MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); - transaction_.commit_timestamp->store(*commit_timestamp_, - std::memory_order_release); // protected by engine_guard + transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // Replica can only update the last commit timestamp with // the commits received from main. // Update the last commit timestamp - mem_storage->repl_storage_state_.last_commit_timestamp_.store( - *commit_timestamp_); // protected by engine_guard + mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); } - // Release engine lock because we don't have to hold it anymore - engine_guard.unlock(); + // TODO: can and should this be moved earlier? mem_storage->commit_log_->MarkFinished(start_timestamp); + + // while still holding engine lock + // and after durability + replication + // check if we can fast discard deltas (ie. do not hand over to GC) + bool no_older_transactions = mem_storage->commit_log_->OldestActive() == *commit_timestamp_; + bool no_newer_transactions = mem_storage->transaction_id_ == transaction_.transaction_id + 1; + if (no_older_transactions && no_newer_transactions) [[unlikely]] { + // STEP 0) Can only do fast discard if GC is not running + // We can't unlink our transcations deltas until all of the older deltas in GC have been unlinked + // must do a try here, to avoid deadlock between transactions `engine_lock_` and the GC `gc_lock_` + auto gc_guard = std::unique_lock{mem_storage->gc_lock_, std::defer_lock}; + if (gc_guard.try_lock()) { + FastDiscardOfDeltas(*commit_timestamp_, std::move(gc_guard)); + } + } } - } + } // Release engine lock because we don't have to hold it anymore if (unique_constraint_violation) { Abort(); @@ -873,241 +885,332 @@ utils::BasicResult InMemoryStorage::InMemoryAcce return {}; } +void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_active_timestamp, + std::unique_lock /*gc_guard*/) { + auto *mem_storage = static_cast(storage_); + std::list current_deleted_edges; + std::list current_deleted_vertices; + + auto const unlink_remove_clear = [&](std::deque &deltas) { + for (auto &delta : deltas) { + auto prev = delta.prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::NULLPTR: + case PreviousPtr::Type::DELTA: + break; + case PreviousPtr::Type::VERTEX: { + // safe because no other txn can be reading this while we have engine lock + auto &vertex = *prev.vertex; + vertex.delta = nullptr; + if (vertex.deleted) { + DMG_ASSERT(delta.action == Delta::Action::RECREATE_OBJECT); + current_deleted_vertices.push_back(vertex.gid); + } + break; + } + case PreviousPtr::Type::EDGE: { + // safe because no other txn can be reading this while we have engine lock + auto &edge = *prev.edge; + edge.delta = nullptr; + if (edge.deleted) { + DMG_ASSERT(delta.action == Delta::Action::RECREATE_OBJECT); + current_deleted_edges.push_back(edge.gid); + } + break; + } + } + } + // delete deltas + deltas.clear(); + }; + + // STEP 1) ensure everything in GC is gone + + // 1.a) old garbage_undo_buffers are safe to remove + // we are the only transaction, no one is reading those unlinked deltas + mem_storage->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { garbage_undo_buffers.clear(); }); + + // 1.b.0) old committed_transactions_ need mininal unlinking + remove + clear + // must be done before this transactions delta unlinking + auto linked_undo_buffers = std::list{}; + mem_storage->committed_transactions_.WithLock( + [&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); }); + + // 1.b.1) unlink, gathering the removals + for (auto &gc_deltas : linked_undo_buffers) { + unlink_remove_clear(gc_deltas.deltas_); + } + // 1.b.2) clear the list of deltas deques + linked_undo_buffers.clear(); + + // STEP 2) this transactions deltas also mininal unlinking + remove + clear + unlink_remove_clear(transaction_.deltas); + + // STEP 3) skip_list removals + if (!current_deleted_vertices.empty()) { + // 3.a) clear from indexes first + std::stop_source dummy; + mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token()); + auto *mem_unique_constraints = + static_cast(mem_storage->constraints_.unique_constraints_.get()); + mem_unique_constraints->RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token()); + + // 3.b) remove from veretex skip_list + auto vertex_acc = mem_storage->vertices_.access(); + for (auto gid : current_deleted_vertices) { + vertex_acc.remove(gid); + } + } + + if (!current_deleted_edges.empty()) { + // 3.c) remove from edge skip_list + auto edge_acc = mem_storage->edges_.access(); + for (auto gid : current_deleted_edges) { + edge_acc.remove(gid); + } + } +} + void InMemoryStorage::InMemoryAccessor::Abort() { MG_ASSERT(is_transaction_active_, "The transaction is already terminated!"); - // We collect vertices and edges we've created here and then splice them into - // `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one - // by one and acquiring lock every time. - std::list my_deleted_vertices; - std::list my_deleted_edges; + auto *mem_storage = static_cast(storage_); - std::map> label_cleanup; - std::map>> label_property_cleanup; - std::map>> property_cleanup; + // if we have no deltas then no need to do any undo work during Abort + // note: this check also saves on unnecessary contention on `engine_lock_` + if (!transaction_.deltas.empty()) { + // CONSTRAINTS + if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) { + // Need to remove elements from constraints before handling of the deltas, so the elements match the correct + // values + auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking(); + auto vertices_to_check_v = std::vector{vertices_to_check.begin(), vertices_to_check.end()}; + storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp); + } - // CONSTRAINTS - if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) { - // Need to remove elements from constraints before handling of the deltas, so the elements match the correct - // values - auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking(); - auto vertices_to_check_v = std::vector{vertices_to_check.begin(), vertices_to_check.end()}; - storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp); - } + const auto index_stats = storage_->indices_.Analysis(); - const auto index_stats = storage_->indices_.Analysis(); + // We collect vertices and edges we've created here and then splice them into + // `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one + // by one and acquiring lock every time. + std::list my_deleted_vertices; + std::list my_deleted_edges; - for (const auto &delta : transaction_.deltas.use()) { - auto prev = delta.prev.Get(); - switch (prev.type) { - case PreviousPtr::Type::VERTEX: { - auto *vertex = prev.vertex; - auto guard = std::unique_lock{vertex->lock}; - Delta *current = vertex->delta; - while (current != nullptr && - current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { - switch (current->action) { - case Delta::Action::REMOVE_LABEL: { - auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label); - MG_ASSERT(it != vertex->labels.end(), "Invalid database state!"); - std::swap(*it, *vertex->labels.rbegin()); - vertex->labels.pop_back(); + std::map> label_cleanup; + std::map>> label_property_cleanup; + std::map>> property_cleanup; - // For label index - // check if there is a label index for the label and add entry if so - // For property label index - // check if we care about the label; this will return all the propertyIds we care about and then get - // the current property value - if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) { - label_cleanup[current->label].emplace_back(vertex); - } - const auto &properties = index_stats.property_label.l2p.find(current->label); - if (properties != index_stats.property_label.l2p.end()) { - for (const auto &property : properties->second) { - auto current_value = vertex->properties.GetProperty(property); - if (!current_value.IsNull()) { - label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex); + for (const auto &delta : transaction_.deltas) { + auto prev = delta.prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::VERTEX: { + auto *vertex = prev.vertex; + auto guard = std::unique_lock{vertex->lock}; + Delta *current = vertex->delta; + while (current != nullptr && + current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { + switch (current->action) { + case Delta::Action::REMOVE_LABEL: { + auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label.value); + MG_ASSERT(it != vertex->labels.end(), "Invalid database state!"); + std::swap(*it, *vertex->labels.rbegin()); + vertex->labels.pop_back(); + + // For label index + // check if there is a label index for the label and add entry if so + // For property label index + // check if we care about the label; this will return all the propertyIds we care about and then get + // the current property value + if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label.value)) { + label_cleanup[current->label.value].emplace_back(vertex); + } + const auto &properties = index_stats.property_label.l2p.find(current->label.value); + if (properties != index_stats.property_label.l2p.end()) { + for (const auto &property : properties->second) { + auto current_value = vertex->properties.GetProperty(property); + if (!current_value.IsNull()) { + label_property_cleanup[current->label.value].emplace_back(std::move(current_value), vertex); + } } } + break; } - break; - } - case Delta::Action::ADD_LABEL: { - auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label); - MG_ASSERT(it == vertex->labels.end(), "Invalid database state!"); - vertex->labels.push_back(current->label); - break; - } - case Delta::Action::SET_PROPERTY: { - // For label index nothing - // For property label index - // check if we care about the property, this will return all the labels and then get current property - // value - const auto &labels = index_stats.property_label.p2l.find(current->property.key); - if (labels != index_stats.property_label.p2l.end()) { - auto current_value = vertex->properties.GetProperty(current->property.key); - if (!current_value.IsNull()) { - property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex); + case Delta::Action::ADD_LABEL: { + auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label.value); + MG_ASSERT(it == vertex->labels.end(), "Invalid database state!"); + vertex->labels.push_back(current->label.value); + break; + } + case Delta::Action::SET_PROPERTY: { + // For label index nothing + // For property label index + // check if we care about the property, this will return all the labels and then get current property + // value + const auto &labels = index_stats.property_label.p2l.find(current->property.key); + if (labels != index_stats.property_label.p2l.end()) { + auto current_value = vertex->properties.GetProperty(current->property.key); + if (!current_value.IsNull()) { + property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex); + } } + // Setting the correct value + vertex->properties.SetProperty(current->property.key, *current->property.value); + break; + } + case Delta::Action::ADD_IN_EDGE: { + std::tuple link{current->vertex_edge.edge_type, + current->vertex_edge.vertex, current->vertex_edge.edge}; + auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link); + MG_ASSERT(it == vertex->in_edges.end(), "Invalid database state!"); + vertex->in_edges.push_back(link); + break; + } + case Delta::Action::ADD_OUT_EDGE: { + std::tuple link{current->vertex_edge.edge_type, + current->vertex_edge.vertex, current->vertex_edge.edge}; + auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link); + MG_ASSERT(it == vertex->out_edges.end(), "Invalid database state!"); + vertex->out_edges.push_back(link); + // Increment edge count. We only increment the count here because + // the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is + // redundant. Also, `Edge/RECREATE_OBJECT` isn't available when + // edge properties are disabled. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + break; + } + case Delta::Action::REMOVE_IN_EDGE: { + std::tuple link{current->vertex_edge.edge_type, + current->vertex_edge.vertex, current->vertex_edge.edge}; + auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link); + MG_ASSERT(it != vertex->in_edges.end(), "Invalid database state!"); + std::swap(*it, *vertex->in_edges.rbegin()); + vertex->in_edges.pop_back(); + break; + } + case Delta::Action::REMOVE_OUT_EDGE: { + std::tuple link{current->vertex_edge.edge_type, + current->vertex_edge.vertex, current->vertex_edge.edge}; + auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link); + MG_ASSERT(it != vertex->out_edges.end(), "Invalid database state!"); + std::swap(*it, *vertex->out_edges.rbegin()); + vertex->out_edges.pop_back(); + // Decrement edge count. We only decrement the count here because + // the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is + // redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge + // properties are disabled. + storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); + break; + } + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: { + vertex->deleted = true; + my_deleted_vertices.push_back(vertex->gid); + break; + } + case Delta::Action::RECREATE_OBJECT: { + vertex->deleted = false; + break; } - // Setting the correct value - vertex->properties.SetProperty(current->property.key, current->property.value); - break; - } - case Delta::Action::ADD_IN_EDGE: { - std::tuple link{current->vertex_edge.edge_type, - current->vertex_edge.vertex, current->vertex_edge.edge}; - auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link); - MG_ASSERT(it == vertex->in_edges.end(), "Invalid database state!"); - vertex->in_edges.push_back(link); - break; - } - case Delta::Action::ADD_OUT_EDGE: { - std::tuple link{current->vertex_edge.edge_type, - current->vertex_edge.vertex, current->vertex_edge.edge}; - auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link); - MG_ASSERT(it == vertex->out_edges.end(), "Invalid database state!"); - vertex->out_edges.push_back(link); - // Increment edge count. We only increment the count here because - // the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is - // redundant. Also, `Edge/RECREATE_OBJECT` isn't available when - // edge properties are disabled. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); - break; - } - case Delta::Action::REMOVE_IN_EDGE: { - std::tuple link{current->vertex_edge.edge_type, - current->vertex_edge.vertex, current->vertex_edge.edge}; - auto it = std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link); - MG_ASSERT(it != vertex->in_edges.end(), "Invalid database state!"); - std::swap(*it, *vertex->in_edges.rbegin()); - vertex->in_edges.pop_back(); - break; - } - case Delta::Action::REMOVE_OUT_EDGE: { - std::tuple link{current->vertex_edge.edge_type, - current->vertex_edge.vertex, current->vertex_edge.edge}; - auto it = std::find(vertex->out_edges.begin(), vertex->out_edges.end(), link); - MG_ASSERT(it != vertex->out_edges.end(), "Invalid database state!"); - std::swap(*it, *vertex->out_edges.rbegin()); - vertex->out_edges.pop_back(); - // Decrement edge count. We only decrement the count here because - // the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is - // redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge - // properties are disabled. - storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); - break; - } - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: { - vertex->deleted = true; - my_deleted_vertices.push_back(vertex->gid); - break; - } - case Delta::Action::RECREATE_OBJECT: { - vertex->deleted = false; - break; } + current = current->next.load(std::memory_order_acquire); } - current = current->next.load(std::memory_order_acquire); - } - vertex->delta = current; - if (current != nullptr) { - current->prev.Set(vertex); - } - - break; - } - case PreviousPtr::Type::EDGE: { - auto *edge = prev.edge; - auto guard = std::lock_guard{edge->lock}; - Delta *current = edge->delta; - while (current != nullptr && - current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { - switch (current->action) { - case Delta::Action::SET_PROPERTY: { - edge->properties.SetProperty(current->property.key, current->property.value); - break; - } - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: { - edge->deleted = true; - my_deleted_edges.push_back(edge->gid); - break; - } - case Delta::Action::RECREATE_OBJECT: { - edge->deleted = false; - break; - } - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_LABEL: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: { - LOG_FATAL("Invalid database state!"); - break; - } + vertex->delta = current; + if (current != nullptr) { + current->prev.Set(vertex); } - current = current->next.load(std::memory_order_acquire); + + break; } - edge->delta = current; - if (current != nullptr) { - current->prev.Set(edge); + case PreviousPtr::Type::EDGE: { + auto *edge = prev.edge; + auto guard = std::lock_guard{edge->lock}; + Delta *current = edge->delta; + while (current != nullptr && + current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { + switch (current->action) { + case Delta::Action::SET_PROPERTY: { + edge->properties.SetProperty(current->property.key, *current->property.value); + break; + } + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: { + edge->deleted = true; + my_deleted_edges.push_back(edge->gid); + break; + } + case Delta::Action::RECREATE_OBJECT: { + edge->deleted = false; + break; + } + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_LABEL: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: { + LOG_FATAL("Invalid database state!"); + break; + } + } + current = current->next.load(std::memory_order_acquire); + } + edge->delta = current; + if (current != nullptr) { + current->prev.Set(edge); + } + + break; } - - break; - } - case PreviousPtr::Type::DELTA: - // pointer probably couldn't be set because allocation failed - case PreviousPtr::Type::NULLPTR: - break; - } - } - - auto *mem_storage = static_cast(storage_); - { - auto engine_guard = std::unique_lock(storage_->engine_lock_); - uint64_t mark_timestamp = storage_->timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. - mem_storage->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { - // Release engine lock because we don't have to hold it anymore and - // emplace back could take a long time. - engine_guard.unlock(); - - garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas), - std::move(transaction_.commit_timestamp)); - }); - - /// We MUST unlink (aka. remove) entries in indexes and constraints - /// before we unlink (aka. remove) vertices from storage - /// this is because they point into vertices skip_list - - // INDICES - for (auto const &[label, vertices] : label_cleanup) { - storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp); - } - for (auto const &[label, prop_vertices] : label_property_cleanup) { - storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp); - } - for (auto const &[property, prop_vertices] : property_cleanup) { - storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp); - } - - // VERTICES - { - auto vertices_acc = mem_storage->vertices_.access(); - for (auto gid : my_deleted_vertices) { - vertices_acc.remove(gid); + case PreviousPtr::Type::DELTA: + // pointer probably couldn't be set because allocation failed + case PreviousPtr::Type::NULLPTR: + break; } } - // EDGES { - auto edges_acc = mem_storage->edges_.access(); - for (auto gid : my_deleted_edges) { - edges_acc.remove(gid); + auto engine_guard = std::unique_lock(storage_->engine_lock_); + uint64_t mark_timestamp = storage_->timestamp_; + // Take garbage_undo_buffers lock while holding the engine lock to make + // sure that entries are sorted by mark timestamp in the list. + mem_storage->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. + engine_guard.unlock(); + + garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas), + std::move(transaction_.commit_timestamp)); + }); + + /// We MUST unlink (aka. remove) entries in indexes and constraints + /// before we unlink (aka. remove) vertices from storage + /// this is because they point into vertices skip_list + + // INDICES + for (auto const &[label, vertices] : label_cleanup) { + storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp); + } + for (auto const &[label, prop_vertices] : label_property_cleanup) { + storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp); + } + for (auto const &[property, prop_vertices] : property_cleanup) { + storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp); + } + + // VERTICES + { + auto vertices_acc = mem_storage->vertices_.access(); + for (auto gid : my_deleted_vertices) { + vertices_acc.remove(gid); + } + } + + // EDGES + { + auto edges_acc = mem_storage->edges_.access(); + for (auto gid : my_deleted_edges) { + edges_acc.remove(gid); + } } } } @@ -1121,7 +1224,7 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() { auto *mem_storage = static_cast(storage_); mem_storage->commit_log_->MarkFinished(*commit_timestamp_); - if (!transaction_.deltas.use().empty()) { + if (!transaction_.deltas.empty()) { // Only hand over delta to be GC'ed if there was any deltas mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) { // using mark of 0 as GC will assign a mark_timestamp after unlinking @@ -1462,7 +1565,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ // chain in a broken state. // The chain can be only read without taking any locks. - for (Delta &delta : linked_entry->deltas_.use()) { + for (Delta &delta : linked_entry->deltas_) { while (true) { auto prev = delta.prev.Get(); switch (prev.type) { @@ -1781,7 +1884,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final // 1. Process all Vertex deltas and store all operations that create vertices // and modify vertex data. - for (const auto &delta : transaction.deltas.use()) { + for (const auto &delta : transaction.deltas) { auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); if (prev.type != PreviousPtr::Type::VERTEX) continue; @@ -1804,7 +1907,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final }); } // 2. Process all Vertex deltas and store all operations that create edges. - for (const auto &delta : transaction.deltas.use()) { + for (const auto &delta : transaction.deltas) { auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); if (prev.type != PreviousPtr::Type::VERTEX) continue; @@ -1826,7 +1929,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final }); } // 3. Process all Edge deltas and store all operations that modify edge data. - for (const auto &delta : transaction.deltas.use()) { + for (const auto &delta : transaction.deltas) { auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); if (prev.type != PreviousPtr::Type::EDGE) continue; @@ -1848,7 +1951,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final }); } // 4. Process all Vertex deltas and store all operations that delete edges. - for (const auto &delta : transaction.deltas.use()) { + for (const auto &delta : transaction.deltas) { auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); if (prev.type != PreviousPtr::Type::VERTEX) continue; @@ -1870,7 +1973,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final }); } // 5. Process all Vertex deltas and store all operations that delete vertices. - for (const auto &delta : transaction.deltas.use()) { + for (const auto &delta : transaction.deltas) { auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); if (prev.type != PreviousPtr::Type::VERTEX) continue; @@ -1894,7 +1997,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final }; // Handle MVCC deltas - if (!transaction.deltas.use().empty()) { + if (!transaction.deltas.empty()) { append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) { wal_file_->AppendDelta(delta, parent, timestamp); repl_storage_state_.AppendDelta(delta, parent, timestamp); diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 6f8806c26..26abe4faf 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -302,6 +302,9 @@ class InMemoryStorage final : public Storage { /// @throw std::bad_alloc Result CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid); + /// Duiring commit, in some cases you do not need to hand over deltas to GC + /// in those cases this method is a light weight way to unlink and discard our deltas + void FastDiscardOfDeltas(uint64_t oldest_active_timestamp, std::unique_lock gc_guard); SalientConfig::Items config_; }; @@ -429,16 +432,15 @@ class InMemoryStorage final : public Storage { utils::Scheduler gc_runner_; std::mutex gc_lock_; - using BondPmrLd = Bond>; struct GCDeltas { - GCDeltas(uint64_t mark_timestamp, BondPmrLd deltas, std::unique_ptr> commit_timestamp) + GCDeltas(uint64_t mark_timestamp, std::deque deltas, std::unique_ptr> commit_timestamp) : mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {} GCDeltas(GCDeltas &&) = default; GCDeltas &operator=(GCDeltas &&) = default; uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has - BondPmrLd deltas_; //!< the deltas that need cleaning + std::deque deltas_; //!< the deltas that need cleaning std::unique_ptr> commit_timestamp_{}; //!< the timestamp the deltas are pointing at }; diff --git a/src/storage/v2/inmemory/unique_constraints.cpp b/src/storage/v2/inmemory/unique_constraints.cpp index 667d0229f..e08965eab 100644 --- a/src/storage/v2/inmemory/unique_constraints.cpp +++ b/src/storage/v2/inmemory/unique_constraints.cpp @@ -80,7 +80,7 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c case Delta::Action::SET_PROPERTY: { auto pos = FindPropertyPosition(property_array, delta->property.key); if (pos) { - current_value_equal_to_value[*pos] = delta->property.value == value_array[*pos]; + current_value_equal_to_value[*pos] = *delta->property.value == value_array[*pos]; } break; } @@ -96,14 +96,14 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c break; } case Delta::Action::ADD_LABEL: { - if (delta->label == label) { + if (delta->label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; break; } } case Delta::Action::REMOVE_LABEL: { - if (delta->label == label) { + if (delta->label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; break; @@ -190,13 +190,13 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std:: } switch (delta->action) { case Delta::Action::ADD_LABEL: - if (delta->label == label) { + if (delta->label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; } break; case Delta::Action::REMOVE_LABEL: - if (delta->label == label) { + if (delta->label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; } @@ -204,7 +204,7 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std:: case Delta::Action::SET_PROPERTY: { auto pos = FindPropertyPosition(property_array, delta->property.key); if (pos) { - current_value_equal_to_value[*pos] = delta->property.value == values[*pos]; + current_value_equal_to_value[*pos] = *delta->property.value == values[*pos]; } break; } diff --git a/src/storage/v2/mvcc.hpp b/src/storage/v2/mvcc.hpp index f046a9b01..1cf057d9d 100644 --- a/src/storage/v2/mvcc.hpp +++ b/src/storage/v2/mvcc.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -114,8 +114,8 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { return nullptr; } transaction->EnsureCommitTimestampExists(); - return &transaction->deltas.use().emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), - transaction->command_id); + return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), + transaction->command_id); } inline Delta *CreateDeleteObjectDelta(Transaction *transaction, std::list *deltas) { @@ -133,19 +133,19 @@ inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std: transaction->EnsureCommitTimestampExists(); // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps uint64_t ts_id = utils::ParseStringToUint64(ts); - return &transaction->deltas.use().emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, old_disk_key); + return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, std::move(old_disk_key)); } inline Delta *CreateDeleteDeserializedObjectDelta(std::list *deltas, std::optional old_disk_key, std::string &&ts) { // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps uint64_t ts_id = utils::ParseStringToUint64(ts); - return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, old_disk_key); + return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, std::move(old_disk_key)); } inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list &deltas, std::optional old_disk_key, const uint64_t ts) { - return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key); + return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, std::move(old_disk_key)); } /// TODO: what if in-memory analytical @@ -165,8 +165,8 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&.. return; } transaction->EnsureCommitTimestampExists(); - auto delta = &transaction->deltas.use().emplace_back(std::forward(args)..., transaction->commit_timestamp.get(), - transaction->command_id); + auto delta = &transaction->deltas.emplace_back(std::forward(args)..., transaction->commit_timestamp.get(), + transaction->command_id); // The operations are written in such order so that both `next` and `prev` // chains are valid at all times. The chains must be valid at all times diff --git a/src/storage/v2/property_value.hpp b/src/storage/v2/property_value.hpp index 727c75377..e48be008a 100644 --- a/src/storage/v2/property_value.hpp +++ b/src/storage/v2/property_value.hpp @@ -57,38 +57,24 @@ class PropertyValue { PropertyValue() : type_(Type::Null) {} // constructors for primitive types - explicit PropertyValue(const bool value) : type_(Type::Bool) { bool_v = value; } - explicit PropertyValue(const int value) : type_(Type::Int) { int_v = value; } - explicit PropertyValue(const int64_t value) : type_(Type::Int) { int_v = value; } - explicit PropertyValue(const double value) : type_(Type::Double) { double_v = value; } - explicit PropertyValue(const TemporalData value) : type_{Type::TemporalData} { temporal_data_v = value; } + explicit PropertyValue(const bool value) : bool_v{.val_ = value} {} + explicit PropertyValue(const int value) : int_v{.val_ = value} {} + explicit PropertyValue(const int64_t value) : int_v{.val_ = value} {} + explicit PropertyValue(const double value) : double_v{.val_ = value} {} + explicit PropertyValue(const TemporalData value) : temporal_data_v{.val_ = value} {} // copy constructors for non-primitive types /// @throw std::bad_alloc - explicit PropertyValue(const std::string &value) : type_(Type::String) { new (&string_v) std::string(value); } + explicit PropertyValue(std::string value) : string_v{.val_ = std::move(value)} {} /// @throw std::bad_alloc /// @throw std::length_error if length of value exceeds /// std::string::max_length(). - explicit PropertyValue(const char *value) : type_(Type::String) { new (&string_v) std::string(value); } + explicit PropertyValue(std::string_view value) : string_v{.val_ = std::string(value)} {} + explicit PropertyValue(char const *value) : string_v{.val_ = std::string(value)} {} /// @throw std::bad_alloc - explicit PropertyValue(const std::vector &value) : type_(Type::List) { - new (&list_v) std::vector(value); - } + explicit PropertyValue(std::vector value) : list_v{.val_ = std::move(value)} {} /// @throw std::bad_alloc - explicit PropertyValue(const std::map &value) : type_(Type::Map) { - new (&map_v) std::map(value); - } - - // move constructors for non-primitive types - explicit PropertyValue(std::string &&value) noexcept : type_(Type::String) { - new (&string_v) std::string(std::move(value)); - } - explicit PropertyValue(std::vector &&value) noexcept : type_(Type::List) { - new (&list_v) std::vector(std::move(value)); - } - explicit PropertyValue(std::map &&value) noexcept : type_(Type::Map) { - new (&map_v) std::map(std::move(value)); - } + explicit PropertyValue(std::map value) : map_v{.val_ = std::move(value)} {} // copy constructor /// @throw std::bad_alloc @@ -126,21 +112,21 @@ class PropertyValue { if (type_ != Type::Bool) [[unlikely]] { throw PropertyValueException("The value isn't a bool!"); } - return bool_v; + return bool_v.val_; } /// @throw PropertyValueException if value isn't of correct type. int64_t ValueInt() const { if (type_ != Type::Int) [[unlikely]] { throw PropertyValueException("The value isn't an int!"); } - return int_v; + return int_v.val_; } /// @throw PropertyValueException if value isn't of correct type. double ValueDouble() const { if (type_ != Type::Double) [[unlikely]] { throw PropertyValueException("The value isn't a double!"); } - return double_v; + return double_v.val_; } /// @throw PropertyValueException if value isn't of correct type. @@ -149,7 +135,7 @@ class PropertyValue { throw PropertyValueException("The value isn't a temporal data!"); } - return temporal_data_v; + return temporal_data_v.val_; } // const value getters for non-primitive types @@ -158,7 +144,7 @@ class PropertyValue { if (type_ != Type::String) [[unlikely]] { throw PropertyValueException("The value isn't a string!"); } - return string_v; + return string_v.val_; } /// @throw PropertyValueException if value isn't of correct type. @@ -166,7 +152,7 @@ class PropertyValue { if (type_ != Type::List) [[unlikely]] { throw PropertyValueException("The value isn't a list!"); } - return list_v; + return list_v.val_; } /// @throw PropertyValueException if value isn't of correct type. @@ -174,7 +160,7 @@ class PropertyValue { if (type_ != Type::Map) [[unlikely]] { throw PropertyValueException("The value isn't a map!"); } - return map_v; + return map_v.val_; } // reference value getters for non-primitive types @@ -183,7 +169,7 @@ class PropertyValue { if (type_ != Type::String) [[unlikely]] { throw PropertyValueException("The value isn't a string!"); } - return string_v; + return string_v.val_; } /// @throw PropertyValueException if value isn't of correct type. @@ -191,7 +177,7 @@ class PropertyValue { if (type_ != Type::List) [[unlikely]] { throw PropertyValueException("The value isn't a list!"); } - return list_v; + return list_v.val_; } /// @throw PropertyValueException if value isn't of correct type. @@ -199,23 +185,45 @@ class PropertyValue { if (type_ != Type::Map) [[unlikely]] { throw PropertyValueException("The value isn't a map!"); } - return map_v; + return map_v.val_; } private: void DestroyValue() noexcept; + // NOTE: this may look strange but it is for better data layout + // https://eel.is/c++draft/class.union#general-note-1 union { - bool bool_v; - int64_t int_v; - double double_v; - std::string string_v; - std::vector list_v; - std::map map_v; - TemporalData temporal_data_v; + Type type_; + struct { + Type type_ = Type::Bool; + bool val_; + } bool_v; + struct { + Type type_ = Type::Int; + int64_t val_; + } int_v; + struct { + Type type_ = Type::Double; + double val_; + } double_v; + struct { + Type type_ = Type::String; + std::string val_; + } string_v; + struct { + Type type_ = Type::List; + std::vector val_; + } list_v; + struct { + Type type_ = Type::Map; + std::map val_; + } map_v; + struct { + Type type_ = Type::TemporalData; + TemporalData val_; + } temporal_data_v; }; - - Type type_; }; // stream output @@ -340,25 +348,25 @@ inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.ty case Type::Null: return; case Type::Bool: - this->bool_v = other.bool_v; + this->bool_v.val_ = other.bool_v.val_; return; case Type::Int: - this->int_v = other.int_v; + this->int_v.val_ = other.int_v.val_; return; case Type::Double: - this->double_v = other.double_v; + this->double_v.val_ = other.double_v.val_; return; case Type::String: - new (&string_v) std::string(other.string_v); + new (&string_v.val_) std::string(other.string_v.val_); return; case Type::List: - new (&list_v) std::vector(other.list_v); + new (&list_v.val_) std::vector(other.list_v.val_); return; case Type::Map: - new (&map_v) std::map(other.map_v); + new (&map_v.val_) std::map(other.map_v.val_); return; case Type::TemporalData: - this->temporal_data_v = other.temporal_data_v; + this->temporal_data_v.val_ = other.temporal_data_v.val_; return; } } @@ -368,28 +376,28 @@ inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std: case Type::Null: break; case Type::Bool: - bool_v = other.bool_v; + bool_v.val_ = other.bool_v.val_; break; case Type::Int: - int_v = other.int_v; + int_v.val_ = other.int_v.val_; break; case Type::Double: - double_v = other.double_v; + double_v.val_ = other.double_v.val_; break; case Type::String: - std::construct_at(&string_v, std::move(other.string_v)); - std::destroy_at(&other.string_v); + std::construct_at(&string_v.val_, std::move(other.string_v.val_)); + std::destroy_at(&other.string_v.val_); break; case Type::List: - std::construct_at(&list_v, std::move(other.list_v)); - std::destroy_at(&other.list_v); + std::construct_at(&list_v.val_, std::move(other.list_v.val_)); + std::destroy_at(&other.list_v.val_); break; case Type::Map: - std::construct_at(&map_v, std::move(other.map_v)); - std::destroy_at(&other.map_v); + std::construct_at(&map_v.val_, std::move(other.map_v.val_)); + std::destroy_at(&other.map_v.val_); break; case Type::TemporalData: - temporal_data_v = other.temporal_data_v; + temporal_data_v.val_ = other.temporal_data_v.val_; break; } } @@ -404,25 +412,25 @@ inline PropertyValue &PropertyValue::operator=(const PropertyValue &other) { case Type::Null: break; case Type::Bool: - this->bool_v = other.bool_v; + this->bool_v.val_ = other.bool_v.val_; break; case Type::Int: - this->int_v = other.int_v; + this->int_v.val_ = other.int_v.val_; break; case Type::Double: - this->double_v = other.double_v; + this->double_v.val_ = other.double_v.val_; break; case Type::String: - new (&string_v) std::string(other.string_v); + new (&string_v.val_) std::string(other.string_v.val_); break; case Type::List: - new (&list_v) std::vector(other.list_v); + new (&list_v.val_) std::vector(other.list_v.val_); break; case Type::Map: - new (&map_v) std::map(other.map_v); + new (&map_v.val_) std::map(other.map_v.val_); break; case Type::TemporalData: - this->temporal_data_v = other.temporal_data_v; + this->temporal_data_v.val_ = other.temporal_data_v.val_; break; } @@ -438,28 +446,28 @@ inline PropertyValue &PropertyValue::operator=(PropertyValue &&other) noexcept { case Type::Null: break; case Type::Bool: - bool_v = other.bool_v; + bool_v.val_ = other.bool_v.val_; break; case Type::Int: - int_v = other.int_v; + int_v.val_ = other.int_v.val_; break; case Type::Double: - double_v = other.double_v; + double_v.val_ = other.double_v.val_; break; case Type::String: - string_v = std::move(other.string_v); - std::destroy_at(&other.string_v); + string_v.val_ = std::move(other.string_v.val_); + std::destroy_at(&other.string_v.val_); break; case Type::List: - list_v = std::move(other.list_v); - std::destroy_at(&other.list_v); + list_v.val_ = std::move(other.list_v.val_); + std::destroy_at(&other.list_v.val_); break; case Type::Map: - map_v = std::move(other.map_v); - std::destroy_at(&other.map_v); + map_v.val_ = std::move(other.map_v.val_); + std::destroy_at(&other.map_v.val_); break; case Type::TemporalData: - temporal_data_v = other.temporal_data_v; + temporal_data_v.val_ = other.temporal_data_v.val_; break; } other.type_ = Type::Null; @@ -482,13 +490,13 @@ inline void PropertyValue::DestroyValue() noexcept { // destructor for non primitive types since we used placement new case Type::String: - std::destroy_at(&string_v); + std::destroy_at(&string_v.val_); return; case Type::List: - std::destroy_at(&list_v); + std::destroy_at(&list_v.val_); return; case Type::Map: - std::destroy_at(&map_v); + std::destroy_at(&map_v.val_); return; } } diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index 2bdd68a94..9f973cbf0 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,7 +13,6 @@ #include #include -#include #include #include "utils/memory.hpp" @@ -39,7 +38,6 @@ namespace memgraph::storage { const uint64_t kTimestampInitialId = 0; const uint64_t kTransactionInitialId = 1ULL << 63U; -using PmrListDelta = utils::pmr::list; struct Transaction { Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level, @@ -47,7 +45,6 @@ struct Transaction { : transaction_id(transaction_id), start_timestamp(start_timestamp), command_id(0), - deltas(0), md_deltas(utils::NewDeleteResource()), must_abort(false), isolation_level(isolation_level), @@ -91,7 +88,7 @@ struct Transaction { std::unique_ptr> commit_timestamp{}; uint64_t command_id{}; - Bond deltas; + std::deque deltas; utils::pmr::list md_deltas; bool must_abort{}; IsolationLevel isolation_level{}; diff --git a/src/storage/v2/vertex_info_helpers.hpp b/src/storage/v2/vertex_info_helpers.hpp index 27ecb8398..7c8e2a652 100644 --- a/src/storage/v2/vertex_info_helpers.hpp +++ b/src/storage/v2/vertex_info_helpers.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -76,13 +76,13 @@ inline auto HasLabel_ActionMethod(bool &has_label, LabelId label) { // clang-format off return utils::Overloaded{ ActionMethod([&, label](Delta const &delta) { - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(has_label, "Invalid database state!"); has_label = false; } }), ActionMethod([&, label](Delta const &delta) { - if (delta.label == label) { + if (delta.label.value == label) { MG_ASSERT(!has_label, "Invalid database state!"); has_label = true; } @@ -96,14 +96,14 @@ inline auto Labels_ActionMethod(std::vector &labels) { // clang-format off return utils::Overloaded{ ActionMethod([&](Delta const &delta) { - auto it = std::find(labels.begin(), labels.end(), delta.label); + auto it = std::find(labels.begin(), labels.end(), delta.label.value); DMG_ASSERT(it != labels.end(), "Invalid database state!"); *it = labels.back(); labels.pop_back(); }), ActionMethod([&](Delta const &delta) { - DMG_ASSERT(std::find(labels.begin(), labels.end(), delta.label) == labels.end(), "Invalid database state!"); - labels.emplace_back(delta.label); + DMG_ASSERT(std::find(labels.begin(), labels.end(), delta.label.value) == labels.end(), "Invalid database state!"); + labels.emplace_back(delta.label.value); }) }; // clang-format on @@ -113,7 +113,7 @@ inline auto PropertyValue_ActionMethod(PropertyValue &value, PropertyId property using enum Delta::Action; return ActionMethod([&, property](Delta const &delta) { if (delta.property.key == property) { - value = delta.property.value; + value = *delta.property.value; } }); } @@ -121,7 +121,7 @@ inline auto PropertyValue_ActionMethod(PropertyValue &value, PropertyId property inline auto PropertyValueMatch_ActionMethod(bool &match, PropertyId property, PropertyValue const &value) { using enum Delta::Action; return ActionMethod([&, property](Delta const &delta) { - if (delta.property.key == property) match = (value == delta.property.value); + if (delta.property.key == property) match = (value == *delta.property.value); }); } @@ -130,15 +130,15 @@ inline auto Properties_ActionMethod(std::map &propert return ActionMethod([&](Delta const &delta) { auto it = properties.find(delta.property.key); if (it != properties.end()) { - if (delta.property.value.IsNull()) { + if (delta.property.value->IsNull()) { // remove the property properties.erase(it); } else { // set the value - it->second = delta.property.value; + it->second = *delta.property.value; } - } else if (!delta.property.value.IsNull()) { - properties.emplace(delta.property.key, delta.property.value); + } else if (!delta.property.value->IsNull()) { + properties.emplace(delta.property.key, *delta.property.value); } }); } diff --git a/src/utils/disk_utils.hpp b/src/utils/disk_utils.hpp index 34bc704f6..c4b9accd6 100644 --- a/src/utils/disk_utils.hpp +++ b/src/utils/disk_utils.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -21,7 +21,7 @@ inline std::optional GetOldDiskKeyOrNull(storage::Delta *head) { head = head->next; } if (head->action == storage::Delta::Action::DELETE_DESERIALIZED_OBJECT) { - return head->old_disk_key; + return head->old_disk_key.value; } return std::nullopt; } diff --git a/src/utils/string.hpp b/src/utils/string.hpp index 8593fc57f..e5c4c4f3c 100644 --- a/src/utils/string.hpp +++ b/src/utils/string.hpp @@ -338,6 +338,13 @@ inline uint64_t ParseStringToUint64(const std::string_view s) { throw utils::ParseException(s); } +inline uint32_t ParseStringToUint32(const std::string_view s) { + if (uint32_t value = 0; std::from_chars(s.data(), s.data() + s.size(), value).ec == std::errc{}) { + return value; + } + throw utils::ParseException(s); +} + /** * Parse a double floating point value from a string using classic locale. * Note, the current implementation copies the given string which may perform a diff --git a/tests/unit/storage_v2_property_store.cpp b/tests/unit/storage_v2_property_store.cpp index 59b38c632..683146f2d 100644 --- a/tests/unit/storage_v2_property_store.cpp +++ b/tests/unit/storage_v2_property_store.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -419,9 +419,9 @@ TEST(PropertyStore, IntEncoding) { {memgraph::storage::PropertyId::FromUint(1048576UL), memgraph::storage::PropertyValue(1048576L)}, {memgraph::storage::PropertyId::FromUint(std::numeric_limits::max()), memgraph::storage::PropertyValue(std::numeric_limits::max())}, - {memgraph::storage::PropertyId::FromUint(4294967296UL), memgraph::storage::PropertyValue(4294967296L)}, - {memgraph::storage::PropertyId::FromUint(137438953472UL), memgraph::storage::PropertyValue(137438953472L)}, - {memgraph::storage::PropertyId::FromUint(std::numeric_limits::max()), + {memgraph::storage::PropertyId::FromUint(1048577UL), memgraph::storage::PropertyValue(4294967296L)}, + {memgraph::storage::PropertyId::FromUint(1048578UL), memgraph::storage::PropertyValue(137438953472L)}, + {memgraph::storage::PropertyId::FromUint(std::numeric_limits::max()), memgraph::storage::PropertyValue(std::numeric_limits::max())}}; memgraph::storage::PropertyStore props; diff --git a/tests/unit/storage_v2_wal_file.cpp b/tests/unit/storage_v2_wal_file.cpp index a67b09305..07a35d754 100644 --- a/tests/unit/storage_v2_wal_file.cpp +++ b/tests/unit/storage_v2_wal_file.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -154,8 +154,8 @@ class DeltaGenerator final { void Finalize(bool append_transaction_end = true) { auto commit_timestamp = gen_->timestamp_++; - if (transaction_.deltas.use().empty()) return; - for (const auto &delta : transaction_.deltas.use()) { + if (transaction_.deltas.empty()) return; + for (const auto &delta : transaction_.deltas) { auto owner = delta.prev.Get(); while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) { owner = owner.delta->prev.Get(); @@ -171,7 +171,7 @@ class DeltaGenerator final { if (append_transaction_end) { gen_->wal_file_.AppendTransactionEnd(commit_timestamp); if (gen_->valid_) { - gen_->UpdateStats(commit_timestamp, transaction_.deltas.use().size() + 1); + gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1); for (auto &data : data_) { if (data.type == memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) { // We need to put the final property value into the SET_PROPERTY