diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index cee74574d..ef32412c5 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -1110,8 +1110,9 @@ void Storage::Accessor::Abort() { { std::unique_lock engine_guard(storage_->engine_lock_); uint64_t mark_timestamp = storage_->timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. + // Take garbage_undo_buffers lock while holding the engine lock to make sure that entries are sorted by mark + // timestamp in the list. This is necessary when a transaction is aborting simultaneously with a GC run: both of + // these operations acquire a mark timestamps and then modify the garbage deltas. storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { // Release engine lock because we don't have to hold it anymore and // emplace back could take a long time. @@ -1517,8 +1518,9 @@ void Storage::CollectGarbage() { { std::unique_lock guard(engine_lock_); uint64_t mark_timestamp = timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. + // Take garbage_undo_buffers lock while holding the engine lock to make sure that entries are sorted by mark + // timestamp in the list. This is necessary when a transaction is aborting simultaneously with a GC run: both of + // these operations acquire a mark timestamps and then modify the garbage deltas. garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { // Release engine lock because we don't have to hold it anymore and // this could take a long time. diff --git a/src/storage/v3/CMakeLists.txt b/src/storage/v3/CMakeLists.txt index c66d4a647..272d2cd03 100644 --- a/src/storage/v3/CMakeLists.txt +++ b/src/storage/v3/CMakeLists.txt @@ -9,6 +9,7 @@ set(storage_v3_src_files edge_accessor.cpp indices.cpp key_store.cpp + lexicographically_ordered_vertex.cpp property_store.cpp vertex_accessor.cpp schemas.cpp diff --git a/src/storage/v3/commit_log.cpp b/src/storage/v3/commit_log.cpp index 793e199d3..4ce4586be 100644 --- a/src/storage/v3/commit_log.cpp +++ b/src/storage/v3/commit_log.cpp @@ -47,8 +47,6 @@ CommitLog::~CommitLog() { } void CommitLog::MarkFinished(uint64_t id) { - std::lock_guard guard(lock_); - Block *block = FindOrCreateBlock(id); block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField); if (id == oldest_active_) { @@ -56,10 +54,7 @@ void CommitLog::MarkFinished(uint64_t id) { } } -uint64_t CommitLog::OldestActive() { - std::lock_guard guard(lock_); - return oldest_active_; -} +uint64_t CommitLog::OldestActive() const noexcept { return oldest_active_; } void CommitLog::UpdateOldestActive() { while (head_) { diff --git a/src/storage/v3/commit_log.hpp b/src/storage/v3/commit_log.hpp index c3f16e9ca..0ef3817dd 100644 --- a/src/storage/v3/commit_log.hpp +++ b/src/storage/v3/commit_log.hpp @@ -51,7 +51,7 @@ class CommitLog final { void MarkFinished(uint64_t id); /// Retrieve the oldest transaction still not marked as finished. - uint64_t OldestActive(); + uint64_t OldestActive() const noexcept; private: static constexpr uint64_t kBlockSize = 8192; @@ -72,7 +72,6 @@ class CommitLog final { uint64_t head_start_{0}; uint64_t next_start_{0}; uint64_t oldest_active_{0}; - utils::SpinLock lock_; utils::Allocator allocator_; }; diff --git a/src/storage/v3/config.hpp b/src/storage/v3/config.hpp index e0f878cc9..9d6dec759 100644 --- a/src/storage/v3/config.hpp +++ b/src/storage/v3/config.hpp @@ -23,9 +23,10 @@ namespace memgraph::storage::v3 { /// the storage. This class also defines the default behavior. struct Config { struct Gc { - enum class Type { NONE, PERIODIC }; + // TODO(antaljanosbenjamin): How to handle garbage collection? + enum class Type { NONE }; - Type type{Type::PERIODIC}; + Type type{Type::NONE}; std::chrono::milliseconds interval{std::chrono::milliseconds(1000)}; } gc; diff --git a/src/storage/v3/constraints.cpp b/src/storage/v3/constraints.cpp index 59006625e..17315165a 100644 --- a/src/storage/v3/constraints.cpp +++ b/src/storage/v3/constraints.cpp @@ -57,7 +57,6 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c bool deleted{false}; bool has_label{false}; { - std::lock_guard guard(vertex.lock); delta = vertex.delta; deleted = vertex.deleted; has_label = VertexHasLabel(vertex, label); @@ -142,7 +141,6 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std:: bool deleted{false}; Delta *delta{nullptr}; { - std::lock_guard guard(vertex.lock); has_label = VertexHasLabel(vertex, label); deleted = vertex.deleted; delta = vertex.delta; diff --git a/src/storage/v3/durability/durability.cpp b/src/storage/v3/durability/durability.cpp index a96594258..65ef294f0 100644 --- a/src/storage/v3/durability/durability.cpp +++ b/src/storage/v3/durability/durability.cpp @@ -157,11 +157,13 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ spdlog::info("Constraints are recreated from metadata."); } -std::optional RecoverData( - const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid, - std::string *epoch_id, std::deque> *epoch_history, VerticesSkipList *vertices, - utils::SkipList *edges, std::atomic *edge_count, NameIdMapper *name_id_mapper, Indices *indices, - Constraints *constraints, Config::Items items, uint64_t *wal_seq_num) { +std::optional RecoverData(const std::filesystem::path &snapshot_directory, + const std::filesystem::path &wal_directory, std::string *uuid, + std::string *epoch_id, + std::deque> *epoch_history, + VerticesSkipList *vertices, utils::SkipList *edges, uint64_t *edge_count, + NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, + Config::Items items, uint64_t *wal_seq_num) { utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory, wal_directory); diff --git a/src/storage/v3/durability/durability.hpp b/src/storage/v3/durability/durability.hpp index b7fc39005..b84c59b23 100644 --- a/src/storage/v3/durability/durability.hpp +++ b/src/storage/v3/durability/durability.hpp @@ -11,7 +11,6 @@ #pragma once -#include #include #include #include @@ -102,10 +101,12 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ /// Recovers data either from a snapshot and/or WAL files. /// @throw RecoveryFailure /// @throw std::bad_alloc -std::optional RecoverData( - const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid, - std::string *epoch_id, std::deque> *epoch_history, VerticesSkipList *vertices, - utils::SkipList *edges, std::atomic *edge_count, NameIdMapper *name_id_mapper, Indices *indices, - Constraints *constraints, Config::Items items, uint64_t *wal_seq_num); +std::optional RecoverData(const std::filesystem::path &snapshot_directory, + const std::filesystem::path &wal_directory, std::string *uuid, + std::string *epoch_id, + std::deque> *epoch_history, + VerticesSkipList *vertices, utils::SkipList *edges, uint64_t *edge_count, + NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, + Config::Items items, uint64_t *wal_seq_num); } // namespace memgraph::storage::v3::durability diff --git a/src/storage/v3/durability/snapshot.cpp b/src/storage/v3/durability/snapshot.cpp index 3c3ec9fe6..9ae4d00ab 100644 --- a/src/storage/v3/durability/snapshot.cpp +++ b/src/storage/v3/durability/snapshot.cpp @@ -162,7 +162,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipList *vertices, utils::SkipList *edges, std::deque> *epoch_history, - NameIdMapper *name_id_mapper, std::atomic *edge_count, Config::Items items) { + NameIdMapper *name_id_mapper, uint64_t *edge_count, Config::Items items) { RecoveryInfo ret; RecoveredIndicesAndConstraints indices_constraints; @@ -226,7 +226,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipLi }; // Reset current edge count. - edge_count->store(0, std::memory_order_release); + *edge_count = 0; { // Recover edges. @@ -485,7 +485,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipLi } // Increment edge count. We only increment the count here because the // information is duplicated in in_edges. - edge_count->fetch_add(*out_size, std::memory_order_acq_rel); + *edge_count += *out_size; } } spdlog::info("Connectivity is recovered."); @@ -687,13 +687,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps for (auto &edge : acc) { // The edge visibility check must be done here manually because we don't // allow direct access to the edges through the public API. - bool is_visible = true; - Delta *delta = nullptr; - { - std::lock_guard guard(edge.lock); - is_visible = !edge.deleted; - delta = edge.delta; - } + auto is_visible = !edge.deleted; + auto *delta = edge.delta; ApplyDeltasForRead(transaction, delta, View::OLD, [&is_visible](const Delta &delta) { switch (delta.action) { case Delta::Action::ADD_LABEL: diff --git a/src/storage/v3/durability/snapshot.hpp b/src/storage/v3/durability/snapshot.hpp index 68871cfe6..34af826a7 100644 --- a/src/storage/v3/durability/snapshot.hpp +++ b/src/storage/v3/durability/snapshot.hpp @@ -63,7 +63,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path); RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipList *vertices, utils::SkipList *edges, std::deque> *epoch_history, - NameIdMapper *name_id_mapper, std::atomic *edge_count, Config::Items items); + NameIdMapper *name_id_mapper, uint64_t *edge_count, Config::Items items); /// Function used to create a snapshot using the given transaction. void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snapshot_directory, diff --git a/src/storage/v3/durability/wal.cpp b/src/storage/v3/durability/wal.cpp index 7e1a0108c..52959379f 100644 --- a/src/storage/v3/durability/wal.cpp +++ b/src/storage/v3/durability/wal.cpp @@ -488,7 +488,6 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Config::Ite // actions. // encoder->WriteMarker(Marker::SECTION_DELTA); // encoder->WriteUint(timestamp); - // std::lock_guard guard(vertex.lock); // switch (delta.action) { // case Delta::Action::DELETE_OBJECT: // case Delta::Action::RECREATE_OBJECT: { @@ -540,10 +539,9 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, const Delta uint64_t timestamp) { // When converting a Delta to a WAL delta the logic is inverted. That is // because the Delta's represent undo actions and we want to store redo - // // actions. + // actions. // encoder->WriteMarker(Marker::SECTION_DELTA); // encoder->WriteUint(timestamp); - // std::lock_guard guard(edge.lock); // switch (delta.action) { // case Delta::Action::SET_PROPERTY: { // encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY); @@ -619,7 +617,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints, const std::optional last_loaded_timestamp, VerticesSkipList *vertices, - utils::SkipList *edges, NameIdMapper *name_id_mapper, std::atomic *edge_count, + utils::SkipList *edges, NameIdMapper * /*name_id_mapper*/, uint64_t * /*edge_count*/, Config::Items items) { spdlog::info("Trying to load WAL file {}.", path); RecoveryInfo ret; @@ -750,7 +748,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst // ret.next_edge_id = std::max(ret.next_edge_id, edge_gid.AsUint() + 1); // // Increment edge count. - // edge_count->fetch_add(1, std::memory_order_acq_rel); + // *edge_count += 1; // break; // } @@ -795,7 +793,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst // } // // Decrement edge count. - // edge_count->fetch_add(-1, std::memory_order_acq_rel); + // *edge_count += -1; // break; // } @@ -881,8 +879,8 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst // } // } - spdlog::info("Applied {} deltas from WAL. Skipped {} deltas, because they were too old.", deltas_applied, - info.num_deltas - deltas_applied); + // spdlog::info("Applied {} deltas from WAL. Skipped {} deltas, because they were too old.", deltas_applied, + // info.num_deltas - deltas_applied); return ret; } diff --git a/src/storage/v3/durability/wal.hpp b/src/storage/v3/durability/wal.hpp index 0e80134a5..a59789edb 100644 --- a/src/storage/v3/durability/wal.hpp +++ b/src/storage/v3/durability/wal.hpp @@ -191,7 +191,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage /// @throw RecoveryFailure RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints, std::optional last_loaded_timestamp, VerticesSkipList *vertices, - utils::SkipList *edges, NameIdMapper *name_id_mapper, std::atomic *edge_count, + utils::SkipList *edges, NameIdMapper *name_id_mapper, uint64_t *edge_count, Config::Items items); /// WalFile class used to append deltas and operations to the WAL file. diff --git a/src/storage/v3/edge.hpp b/src/storage/v3/edge.hpp index d6440793a..df47b8416 100644 --- a/src/storage/v3/edge.hpp +++ b/src/storage/v3/edge.hpp @@ -33,7 +33,6 @@ struct Edge { PropertyStore properties; - mutable utils::SpinLock lock; bool deleted; // uint8_t PAD; // uint16_t PAD; diff --git a/src/storage/v3/edge_accessor.cpp b/src/storage/v3/edge_accessor.cpp index abb5597e5..b4d275d69 100644 --- a/src/storage/v3/edge_accessor.cpp +++ b/src/storage/v3/edge_accessor.cpp @@ -22,14 +22,10 @@ namespace memgraph::storage::v3 { bool EdgeAccessor::IsVisible(const View view) const { - bool deleted = true; - bool exists = true; - Delta *delta = nullptr; - { - std::lock_guard guard(edge_.ptr->lock); - deleted = edge_.ptr->deleted; - delta = edge_.ptr->delta; - } + auto deleted = edge_.ptr->deleted; + auto exists = true; + auto *delta = edge_.ptr->delta; + ApplyDeltasForRead(transaction_, delta, view, [&](const Delta &delta) { switch (delta.action) { case Delta::Action::ADD_LABEL: @@ -66,8 +62,6 @@ Result EdgeAccessor::SetProperty(PropertyId property, const Prope utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED; - std::lock_guard guard(edge_.ptr->lock); - if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (edge_.ptr->deleted) return Error::DELETED_OBJECT; @@ -88,8 +82,6 @@ Result EdgeAccessor::SetProperty(PropertyId property, const Prope Result> EdgeAccessor::ClearProperties() { if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED; - std::lock_guard guard(edge_.ptr->lock); - if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (edge_.ptr->deleted) return Error::DELETED_OBJECT; @@ -106,16 +98,11 @@ Result> EdgeAccessor::ClearProperties() { Result EdgeAccessor::GetProperty(PropertyId property, View view) const { if (!config_.properties_on_edges) return PropertyValue(); - bool exists = true; - bool deleted = false; - PropertyValue value; - Delta *delta = nullptr; - { - std::lock_guard guard(edge_.ptr->lock); - deleted = edge_.ptr->deleted; - value = edge_.ptr->properties.GetProperty(property); - delta = edge_.ptr->delta; - } + auto exists = true; + auto deleted = edge_.ptr->deleted; + auto value = edge_.ptr->properties.GetProperty(property); + auto *delta = edge_.ptr->delta; + ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &value, property](const Delta &delta) { switch (delta.action) { case Delta::Action::SET_PROPERTY: { @@ -148,16 +135,11 @@ Result EdgeAccessor::GetProperty(PropertyId property, View view) Result> EdgeAccessor::Properties(View view) const { if (!config_.properties_on_edges) return std::map{}; - bool exists = true; - bool deleted = false; - std::map properties; - Delta *delta = nullptr; - { - std::lock_guard guard(edge_.ptr->lock); - deleted = edge_.ptr->deleted; - properties = edge_.ptr->properties.Properties(); - delta = edge_.ptr->delta; - } + auto exists = true; + auto deleted = edge_.ptr->deleted; + auto properties = edge_.ptr->properties.Properties(); + auto *delta = edge_.ptr->delta; + ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &properties](const Delta &delta) { switch (delta.action) { case Delta::Action::SET_PROPERTY: { diff --git a/src/storage/v3/indices.cpp b/src/storage/v3/indices.cpp index b9874d023..e69aa9d7e 100644 --- a/src/storage/v3/indices.cpp +++ b/src/storage/v3/indices.cpp @@ -53,7 +53,6 @@ bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t timestamp) bool deleted{false}; const Delta *delta{nullptr}; { - std::lock_guard guard(vertex.lock); has_label = utils::Contains(vertex.labels, label); deleted = vertex.deleted; delta = vertex.delta; @@ -106,7 +105,6 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, PropertyId bool deleted{false}; const Delta *delta{nullptr}; { - std::lock_guard guard(vertex.lock); has_label = utils::Contains(vertex.labels, label); current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value); deleted = vertex.deleted; @@ -165,7 +163,6 @@ bool CurrentVersionHasLabel(const Vertex &vertex, LabelId label, Transaction *tr bool has_label{false}; const Delta *delta{nullptr}; { - std::lock_guard guard(vertex.lock); deleted = vertex.deleted; has_label = utils::Contains(vertex.labels, label); delta = vertex.delta; @@ -217,7 +214,6 @@ bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, Propert bool current_value_equal_to_value = value.IsNull(); const Delta *delta{nullptr}; { - std::lock_guard guard(vertex.lock); deleted = vertex.deleted; has_label = utils::Contains(vertex.labels, label); current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value); diff --git a/src/storage/v3/lexicographically_ordered_vertex.cpp b/src/storage/v3/lexicographically_ordered_vertex.cpp new file mode 100644 index 000000000..04687f6e6 --- /dev/null +++ b/src/storage/v3/lexicographically_ordered_vertex.cpp @@ -0,0 +1,14 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v3/lexicographically_ordered_vertex.hpp" + +namespace memgraph::storage::v3 {} // namespace memgraph::storage::v3 diff --git a/src/storage/v3/replication/replication_client.cpp b/src/storage/v3/replication/replication_client.cpp index 4e893a314..ecc3ffbf4 100644 --- a/src/storage/v3/replication/replication_client.cpp +++ b/src/storage/v3/replication/replication_client.cpp @@ -90,14 +90,7 @@ void Storage::ReplicationClient::FrequentCheck() { void Storage::ReplicationClient::InitializeClient() { uint64_t current_commit_timestamp{kTimestampInitialId}; - std::optional epoch_id; - { - // epoch_id_ can be changed if we don't take this lock - std::unique_lock engine_guard(storage_->engine_lock_); - epoch_id.emplace(storage_->epoch_id_); - } - - auto stream{rpc_client_->Stream(storage_->last_commit_timestamp_, std::move(*epoch_id))}; + auto stream{rpc_client_->Stream(storage_->last_commit_timestamp_, storage_->epoch_id_)}; const auto response = stream.AwaitResponse(); std::optional branching_point; @@ -122,8 +115,8 @@ void Storage::ReplicationClient::InitializeClient() { current_commit_timestamp = response.current_commit_timestamp; spdlog::trace("Current timestamp on replica: {}", current_commit_timestamp); - spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_.load()); - if (current_commit_timestamp == storage_->last_commit_timestamp_.load()) { + spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_); + if (current_commit_timestamp == storage_->last_commit_timestamp_) { spdlog::debug("Replica '{}' up to date", name_); std::unique_lock client_guard{client_lock_}; replica_state_.store(replication::ReplicaState::READY); @@ -197,7 +190,7 @@ void Storage::ReplicationClient::StartTransactionReplication(const uint64_t curr case replication::ReplicaState::READY: MG_ASSERT(!replica_stream_); try { - replica_stream_.emplace(ReplicaStream{this, storage_->last_commit_timestamp_.load(), current_wal_seq_num}); + replica_stream_.emplace(ReplicaStream{this, storage_->last_commit_timestamp_, current_wal_seq_num}); replica_state_.store(replication::ReplicaState::REPLICATING); } catch (const rpc::RpcFailedException &) { replica_state_.store(replication::ReplicaState::INVALID); @@ -319,10 +312,8 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { auto response = TransferWalFiles(arg); replica_commit = response.current_commit_timestamp; } else if constexpr (std::is_same_v) { - std::unique_lock transaction_guard(storage_->engine_lock_); if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) { storage_->wal_file_->DisableFlushing(); - transaction_guard.unlock(); spdlog::debug("Sending current wal file"); replica_commit = ReplicateCurrentWal(); storage_->wal_file_->EnableFlushing(); @@ -355,7 +346,7 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { std::unique_lock client_guard{client_lock_}; SPDLOG_INFO("Replica timestamp: {}", replica_commit); SPDLOG_INFO("Last commit: {}", storage_->last_commit_timestamp_); - if (storage_->last_commit_timestamp_.load() == replica_commit) { + if (storage_->last_commit_timestamp_ == replica_commit) { replica_state_.store(replication::ReplicaState::READY); return; } @@ -403,7 +394,7 @@ std::vector Storage::ReplicationClient // This lock is also necessary to force the missed transaction to finish. std::optional current_wal_seq_num; std::optional current_wal_from_timestamp; - if (std::unique_lock transtacion_guard(storage_->engine_lock_); storage_->wal_file_) { + if (storage_->wal_file_) { current_wal_seq_num.emplace(storage_->wal_file_->SequenceNumber()); current_wal_from_timestamp.emplace(storage_->wal_file_->FromTimestamp()); } diff --git a/src/storage/v3/replication/replication_server.cpp b/src/storage/v3/replication/replication_server.cpp index 8f8fe10f8..552214b9a 100644 --- a/src/storage/v3/replication/replication_server.cpp +++ b/src/storage/v3/replication/replication_server.cpp @@ -87,7 +87,7 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::HeartbeatReq req; slk::Load(&req, req_reader); - replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_}; + replication::HeartbeatRes res{true, storage_->last_commit_timestamp_, storage_->epoch_id_}; slk::Save(res, res_builder); } @@ -125,7 +125,7 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl storage_->wal_seq_num_ = req.seq_num; } - if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) { + if (req.previous_commit_timestamp != storage_->last_commit_timestamp_) { // Empty the stream bool transaction_complete = false; while (!transaction_complete) { @@ -134,14 +134,14 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type); } - replication::AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()}; + replication::AppendDeltasRes res{false, storage_->last_commit_timestamp_}; slk::Save(res, res_builder); return; } ReadAndApplyDelta(&decoder); - replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()}; + replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_}; slk::Save(res, res_builder); } @@ -157,7 +157,6 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!"); spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path); - std::unique_lock storage_guard(storage_->main_lock_); // Clear the database storage_->vertices_.clear(); storage_->edges_.clear(); @@ -188,9 +187,8 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B } catch (const durability::RecoveryFailure &e) { LOG_FATAL("Couldn't load the snapshot because of: {}", e.what()); } - storage_guard.unlock(); - replication::SnapshotRes res{true, storage_->last_commit_timestamp_.load()}; + replication::SnapshotRes res{true, storage_->last_commit_timestamp_}; slk::Save(res, res_builder); // Delete other durability files @@ -226,7 +224,7 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B LoadWal(&decoder); } - replication::WalFilesRes res{true, storage_->last_commit_timestamp_.load()}; + replication::WalFilesRes res{true, storage_->last_commit_timestamp_}; slk::Save(res, res_builder); } @@ -240,7 +238,7 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk: LoadWal(&decoder); - replication::CurrentWalRes res{true, storage_->last_commit_timestamp_.load()}; + replication::CurrentWalRes res{true, storage_->last_commit_timestamp_}; slk::Save(res, res_builder); } @@ -298,17 +296,17 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * // auto vertex_acc = storage_->vertices_.access(); std::optional> commit_timestamp_and_accessor; - auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) { - if (!commit_timestamp_and_accessor) { - commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access()); - } else if (commit_timestamp_and_accessor->first != commit_timestamp) { - throw utils::BasicException("Received more than one transaction!"); - } - return &commit_timestamp_and_accessor->second; - }; + // auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) { + // if (!commit_timestamp_and_accessor) { + // commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access()); + // } else if (commit_timestamp_and_accessor->first != commit_timestamp) { + // throw utils::BasicException("Received more than one transaction!"); + // } + // return &commit_timestamp_and_accessor->second; + // }; uint64_t applied_deltas = 0; - auto max_commit_timestamp = storage_->last_commit_timestamp_.load(); + auto max_commit_timestamp = storage_->last_commit_timestamp_; for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) { const auto [timestamp, delta] = ReadDelta(decoder); @@ -423,13 +421,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * // // The edge visibility check must be done here manually because we // // don't allow direct access to the edges through the public API. // { - // bool is_visible = true; - // Delta *delta = nullptr; - // { - // std::lock_guard guard(edge->lock); - // is_visible = !edge->deleted; - // delta = edge->delta; - // } + // auto is_visible = !edge->deleted; + // auto *delta = edge->delta; // ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW, [&is_visible](const Delta &delta) { // switch (delta.action) { // case Delta::Action::ADD_LABEL: @@ -466,8 +459,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * // &storage_->indices_, // &storage_->constraints_, // storage_->config_.items, - // storage_->schema_validator_, - // storage_->schemas_}; + // storage_->schema_validator_}; // auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), // delta.vertex_edge_set_property.value); diff --git a/src/storage/v3/storage.cpp b/src/storage/v3/storage.cpp index bbe1aa0f4..23dd45427 100644 --- a/src/storage/v3/storage.cpp +++ b/src/storage/v3/storage.cpp @@ -401,19 +401,17 @@ Storage::Storage(Config config) } } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { - snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { - if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { - switch (maybe_error.GetError()) { - case CreateSnapshotError::DisabledForReplica: - spdlog::warn( - utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); - break; - } - } - }); - } - if (config_.gc.type == Config::Gc::Type::PERIODIC) { - gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); + // TODO(antaljanosbenjamin): handle snapshots + // snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { + // if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { + // switch (maybe_error.GetError()) { + // case CreateSnapshotError::DisabledForReplica: + // spdlog::warn( + // utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); + // break; + // } + // } + // }); } if (timestamp_ == kTimestampInitialId) { @@ -424,9 +422,6 @@ Storage::Storage(Config config) } Storage::~Storage() { - if (config_.gc.type == Config::Gc::Type::PERIODIC) { - gc_runner_.Stop(); - } { // Clear replication data replication_server_.reset(); @@ -437,7 +432,7 @@ Storage::~Storage() { wal_file_ = std::nullopt; } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { - snapshot_runner_.Stop(); + // TODO(antaljanosbenjamin): stop snapshot creation } if (config_.durability.snapshot_on_exit) { if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { @@ -452,17 +447,12 @@ Storage::~Storage() { Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level) : storage_(storage), - // The lock must be acquired before creating the transaction object to - // prevent freshly created transactions from dangling in an active state - // during exclusive operations. - storage_guard_(storage_->main_lock_), transaction_(storage->CreateTransaction(isolation_level)), is_transaction_active_(true), config_(storage->config_.items) {} Storage::Accessor::Accessor(Accessor &&other) noexcept : storage_(other.storage_), - storage_guard_(std::move(other.storage_guard_)), transaction_(std::move(other.transaction_)), commit_timestamp_(other.commit_timestamp_), is_transaction_active_(other.is_transaction_active_), @@ -533,8 +523,6 @@ Result> Storage::Accessor::DeleteVertex(VertexAcce "accessor when deleting a vertex!"); auto *vertex_ptr = vertex->vertex_; - std::lock_guard guard(vertex_ptr->lock); - if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; if (vertex_ptr->deleted) { @@ -563,8 +551,6 @@ Result>>> Stor std::vector> out_edges; { - std::lock_guard guard(vertex_ptr->lock); - if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; if (vertex_ptr->deleted) return std::optional{}; @@ -603,8 +589,6 @@ Result>>> Stor } } - std::lock_guard guard(vertex_ptr->lock); - // We need to check again for serialization errors because we unlocked the // vertex. Some other transaction could have modified the vertex in the // meantime if we didn't have any edges to delete. @@ -634,20 +618,6 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA auto *from_vertex = from->vertex_; auto *to_vertex = to->vertex_; - // Obtain the locks by `gid` order to avoid lock cycles. - std::unique_lock guard_from(from_vertex->lock, std::defer_lock); - std::unique_lock guard_to(to_vertex->lock, std::defer_lock); - if (from_vertex < to_vertex) { - guard_from.lock(); - guard_to.lock(); - } else if (from_vertex > to_vertex) { - guard_to.lock(); - guard_from.lock(); - } else { - // The vertices are the same vertex, only lock one. - guard_from.lock(); - } - if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR; if (from_vertex->deleted) return Error::DELETED_OBJECT; @@ -656,7 +626,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA if (to_vertex->deleted) return Error::DELETED_OBJECT; } - auto gid = Gid::FromUint(storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel)); + auto gid = Gid::FromUint(storage_->edge_id_++); EdgeRef edge(gid); if (config_.properties_on_edges) { auto acc = storage_->edges_.access(); @@ -675,7 +645,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + ++storage_->edge_count_; return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, config_, storage_->schema_validator_); @@ -694,20 +664,6 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA auto *from_vertex = from->vertex_; auto *to_vertex = to->vertex_; - // Obtain the locks by `gid` order to avoid lock cycles. - std::unique_lock guard_from(from_vertex->lock, std::defer_lock); - std::unique_lock guard_to(to_vertex->lock, std::defer_lock); - if (&from_vertex < &to_vertex) { - guard_from.lock(); - guard_to.lock(); - } else if (&from_vertex > &to_vertex) { - guard_to.lock(); - guard_from.lock(); - } else { - // The vertices are the same vertex, only lock one. - guard_from.lock(); - } - if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR; if (from_vertex->deleted) return Error::DELETED_OBJECT; @@ -722,8 +678,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA // that runs single-threadedly and while this instance is set-up to apply // threads (it is the replica), it is guaranteed that no other writes are // possible. - storage_->edge_id_.store(std::max(storage_->edge_id_.load(std::memory_order_acquire), gid.AsUint() + 1), - std::memory_order_release); + storage_->edge_id_ = std::max(storage_->edge_id_, gid.AsUint() + 1); EdgeRef edge(gid); if (config_.properties_on_edges) { @@ -743,7 +698,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + ++storage_->edge_count_; return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, config_, storage_->schema_validator_); @@ -756,10 +711,8 @@ Result> Storage::Accessor::DeleteEdge(EdgeAccessor * auto edge_ref = edge->edge_; auto edge_type = edge->edge_type_; - std::unique_lock guard; if (config_.properties_on_edges) { auto *edge_ptr = edge_ref.ptr; - guard = std::unique_lock(edge_ptr->lock); if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; @@ -769,20 +722,6 @@ Result> Storage::Accessor::DeleteEdge(EdgeAccessor * auto *from_vertex = edge->from_vertex_; auto *to_vertex = edge->to_vertex_; - // Obtain the locks by `gid` order to avoid lock cycles. - std::unique_lock guard_from(from_vertex->lock, std::defer_lock); - std::unique_lock guard_to(to_vertex->lock, std::defer_lock); - if (&from_vertex < &to_vertex) { - guard_from.lock(); - guard_to.lock(); - } else if (&from_vertex > &to_vertex) { - guard_to.lock(); - guard_from.lock(); - } else { - // The vertices are the same vertex, only lock one. - guard_from.lock(); - } - if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR; MG_ASSERT(!from_vertex->deleted, "Invalid database state!"); @@ -827,7 +766,7 @@ Result> Storage::Accessor::DeleteEdge(EdgeAccessor * CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); // Decrement edge count. - storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); + --storage_->edge_count_; return std::make_optional(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, config_, @@ -887,75 +826,66 @@ utils::BasicResult Storage::Accessor::Commit( // Save these so we can mark them used in the commit log. uint64_t start_timestamp = transaction_.start_timestamp; - { - std::unique_lock engine_guard(storage_->engine_lock_); - commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp)); + commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp)); - // Before committing and validating vertices against unique constraints, - // we have to update unique constraints with the vertices that are going - // to be validated/committed. - for (const auto &delta : transaction_.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) { - continue; - } - storage_->constraints_.unique_constraints.UpdateBeforeCommit(prev.vertex, transaction_); + // Before committing and validating vertices against unique constraints, + // we have to update unique constraints with the vertices that are going + // to be validated/committed. + for (const auto &delta : transaction_.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::VERTEX) { + continue; + } + storage_->constraints_.unique_constraints.UpdateBeforeCommit(prev.vertex, transaction_); + } + + // Validate that unique constraints are satisfied for all modified + // vertices. + for (const auto &delta : transaction_.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::VERTEX) { + continue; } - // Validate that unique constraints are satisfied for all modified - // vertices. - for (const auto &delta : transaction_.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) { - continue; - } + // No need to take any locks here because we modified this vertex and no + // one else can touch it until we commit. + unique_constraint_violation = + storage_->constraints_.unique_constraints.Validate(*prev.vertex, transaction_, *commit_timestamp_); + if (unique_constraint_violation) { + break; + } + } - // No need to take any locks here because we modified this vertex and no - // one else can touch it until we commit. - unique_constraint_violation = - storage_->constraints_.unique_constraints.Validate(*prev.vertex, transaction_, *commit_timestamp_); - if (unique_constraint_violation) { - break; - } + if (!unique_constraint_violation) { + // Write transaction to WAL while holding the engine lock to make sure + // that committed transactions are sorted by the commit timestamp in the + // WAL files. We supply the new commit timestamp to the function so that + // it knows what will be the final commit timestamp. The WAL must be + // written before actually committing the transaction (before setting + // the commit timestamp) so that no other transaction can see the + // modifications before they are written to disk. + // Replica can log only the write transaction received from Main + // so the Wal files are consistent + if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { + storage_->AppendToWal(transaction_, *commit_timestamp_); } - if (!unique_constraint_violation) { - // Write transaction to WAL while holding the engine lock to make sure - // that committed transactions are sorted by the commit timestamp in the - // WAL files. We supply the new commit timestamp to the function so that - // it knows what will be the final commit timestamp. The WAL must be - // written before actually committing the transaction (before setting - // the commit timestamp) so that no other transaction can see the - // modifications before they are written to disk. - // Replica can log only the write transaction received from Main - // so the Wal files are consistent - if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { - storage_->AppendToWal(transaction_, *commit_timestamp_); - } - - // Take committed_transactions lock while holding the engine lock to - // make sure that committed transactions are sorted by the commit - // timestamp in the list. - storage_->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) { - // TODO: release lock, and update all deltas to have a local copy - // of the commit timestamp - MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); - transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); - // Replica can only update the last commit timestamp with - // the commits received from main. - if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { - // Update the last commit timestamp - storage_->last_commit_timestamp_.store(*commit_timestamp_); - } - // Release engine lock because we don't have to hold it anymore - // and emplace back could take a long time. - engine_guard.unlock(); - }); - - storage_->commit_log_->MarkFinished(start_timestamp); + // TODO(antaljanosbenjamin): Figure out: + // 1. How the committed transactions are sorted in `committed_transactions_` + // 2. Why it was necessary to lock `committed_transactions_` when it was not accessed at all + // TODO: Update all deltas to have a local copy of the commit timestamp + MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!"); + transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); + // Replica can only update the last commit timestamp with + // the commits received from main. + if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { + // Update the last commit timestamp + storage_->last_commit_timestamp_ = *commit_timestamp_; } + + storage_->commit_log_->MarkFinished(start_timestamp); } if (unique_constraint_violation) { @@ -971,18 +901,11 @@ utils::BasicResult Storage::Accessor::Commit( void Storage::Accessor::Abort() { MG_ASSERT(is_transaction_active_, "The transaction is already terminated!"); - // We collect vertices and edges we've created here and then splice them into - // `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one - // by one and acquiring lock every time. - std::list my_deleted_vertices; - std::list my_deleted_edges; - for (const auto &delta : transaction_.deltas) { auto prev = delta.prev.Get(); switch (prev.type) { case PreviousPtr::Type::VERTEX: { auto *vertex = prev.vertex; - std::lock_guard guard(vertex->lock); Delta *current = vertex->delta; while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { @@ -1022,7 +945,7 @@ void Storage::Accessor::Abort() { // the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is // redundant. Also, `Edge/RECREATE_OBJECT` isn't available when // edge properties are disabled. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + ++storage_->edge_count_; break; } case Delta::Action::REMOVE_IN_EDGE: { @@ -1045,12 +968,12 @@ void Storage::Accessor::Abort() { // the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is // redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge // properties are disabled. - storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); + --storage_->edge_count_; break; } case Delta::Action::DELETE_OBJECT: { vertex->deleted = true; - InsertVertexPKIntoList(my_deleted_vertices, vertex->keys.Keys()); + InsertVertexPKIntoList(storage_->deleted_vertices_, vertex->keys.Keys()); break; } case Delta::Action::RECREATE_OBJECT: { @@ -1069,7 +992,6 @@ void Storage::Accessor::Abort() { } case PreviousPtr::Type::EDGE: { auto *edge = prev.edge; - std::lock_guard guard(edge->lock); Delta *current = edge->delta; while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) { @@ -1080,7 +1002,7 @@ void Storage::Accessor::Abort() { } case Delta::Action::DELETE_OBJECT: { edge->deleted = true; - my_deleted_edges.push_back(edge->gid); + storage_->deleted_edges_.push_back(edge->gid); break; } case Delta::Action::RECREATE_OBJECT: { @@ -1114,20 +1036,11 @@ void Storage::Accessor::Abort() { } { - std::unique_lock engine_guard(storage_->engine_lock_); uint64_t mark_timestamp = storage_->timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. - storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { - // Release engine lock because we don't have to hold it anymore and - // emplace back could take a long time. - engine_guard.unlock(); - garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas)); - }); - storage_->deleted_vertices_.WithLock( - [&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); }); - storage_->deleted_edges_.WithLock( - [&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); }); + + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. + storage_->garbage_undo_buffers_.emplace_back(mark_timestamp, std::move(transaction_.deltas)); } storage_->commit_log_->MarkFinished(transaction_.start_timestamp); @@ -1137,8 +1050,7 @@ void Storage::Accessor::Abort() { void Storage::Accessor::FinalizeTransaction() { if (commit_timestamp_) { storage_->commit_log_->MarkFinished(*commit_timestamp_); - storage_->committed_transactions_.WithLock( - [&](auto &committed_transactions) { committed_transactions.emplace_back(std::move(transaction_)); }); + storage_->committed_transactions_.emplace_back(std::move(transaction_)); commit_timestamp_.reset(); } } @@ -1164,7 +1076,6 @@ EdgeTypeId Storage::NameToEdgeType(const std::string_view name) { } bool Storage::CreateIndex(LabelId label, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); // TODO Fix Index return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); @@ -1175,7 +1086,6 @@ bool Storage::CreateIndex(LabelId label, const std::optional desired_c } bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); // TODO Fix Index // if (!indices_.label_property_index.CreateIndex(label, property, labelspace.access())) return false; return false; @@ -1187,7 +1097,6 @@ bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optiona } bool Storage::DropIndex(LabelId label, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); if (!indices_.label_index.DropIndex(label)) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp); @@ -1197,7 +1106,6 @@ bool Storage::DropIndex(LabelId label, const std::optional desired_com } bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); if (!indices_.label_property_index.DropIndex(label, property)) return false; // For a description why using `timestamp_` is correct, see // `CreateIndex(LabelId label)`. @@ -1209,13 +1117,11 @@ bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional< } IndicesInfo Storage::ListAllIndices() const { - std::shared_lock storage_guard_(main_lock_); return {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; } utils::BasicResult Storage::CreateExistenceConstraint( LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); // TODO Fix constraints // auto ret = ::memgraph::storage::v3::CreateExistenceConstraint(&constraints_, label, property, vertices_.access()); // if (ret.HasError() || !ret.GetValue()) return ret; @@ -1229,7 +1135,6 @@ utils::BasicResult Storage::CreateExistenceConstraint bool Storage::DropExistenceConstraint(LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); if (!::memgraph::storage::v3::DropExistenceConstraint(&constraints_, label, property)) return false; const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp); @@ -1240,7 +1145,6 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property, utils::BasicResult Storage::CreateUniqueConstraint( LabelId label, const std::set &properties, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); // TODO Fix constraints // auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access()); // if (ret.HasError() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) { @@ -1256,7 +1160,6 @@ utils::BasicResult Stora UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint( LabelId label, const std::set &properties, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); auto ret = constraints_.unique_constraints.DropConstraint(label, properties); if (ret != UniqueConstraints::DeletionStatus::SUCCESS) { return ret; @@ -1271,17 +1174,12 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint( const SchemaValidator &Storage::Accessor::GetSchemaValidator() const { return storage_->schema_validator_; } ConstraintsInfo Storage::ListAllConstraints() const { - std::shared_lock storage_guard_(main_lock_); return {ListExistenceConstraints(constraints_), constraints_.unique_constraints.ListConstraints()}; } -SchemasInfo Storage::ListAllSchemas() const { - std::shared_lock storage_guard_(main_lock_); - return {schemas_.ListSchemas()}; -} +SchemasInfo Storage::ListAllSchemas() const { return {schemas_.ListSchemas()}; } const Schemas::Schema *Storage::GetSchema(const LabelId primary_label) const { - std::shared_lock storage_guard_(main_lock_); return schemas_.GetSchema(primary_label); } @@ -1293,12 +1191,11 @@ bool Storage::DropSchema(const LabelId primary_label) { return schemas_.DropSche StorageInfo Storage::GetInfo() const { auto vertex_count = vertices_.size(); - auto edge_count = edge_count_.load(std::memory_order_acquire); double average_degree = 0.0; if (vertex_count) { - average_degree = 2.0 * static_cast(edge_count) / static_cast(vertex_count); + average_degree = 2.0 * static_cast(edge_count_) / static_cast(vertex_count); } - return {vertex_count, edge_count, average_degree, utils::GetMemoryUsage(), + return {vertex_count, edge_count_, average_degree, utils::GetMemoryUsage(), utils::GetDirDiskUsage(config_.durability.storage_directory)}; } @@ -1331,48 +1228,34 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) { // `timestamp`) below. uint64_t transaction_id{0}; uint64_t start_timestamp{0}; - { - std::lock_guard guard(engine_lock_); - transaction_id = transaction_id_++; - // Replica should have only read queries and the write queries - // can come from main instance with any past timestamp. - // To preserve snapshot isolation we set the start timestamp - // of any query on replica to the last commited transaction - // which is timestamp_ as only commit of transaction with writes - // can change the value of it. - if (replication_role_ == ReplicationRole::REPLICA) { - start_timestamp = timestamp_; - } else { - start_timestamp = timestamp_++; - } + + transaction_id = transaction_id_++; + // Replica should have only read queries and the write queries + // can come from main instance with any past timestamp. + // To preserve snapshot isolation we set the start timestamp + // of any query on replica to the last commited transaction + // which is timestamp_ as only commit of transaction with writes + // can change the value of it. + if (replication_role_ == ReplicationRole::REPLICA) { + start_timestamp = timestamp_; + } else { + start_timestamp = timestamp_++; } + return {transaction_id, start_timestamp, isolation_level}; } +// `force` means there are no active transactions, so everything can be deleted without worrying about removing some +// data that is used by an active transaction template void Storage::CollectGarbage() { if constexpr (force) { - // We take the unique lock on the main storage lock so we can forcefully clean - // everything we can - if (!main_lock_.try_lock()) { - CollectGarbage(); - return; - } - } else { - // Because the garbage collector iterates through the indices and constraints - // to clean them up, it must take the main lock for reading to make sure that - // the indices and constraints aren't concurrently being modified. - main_lock_.lock_shared(); + // TODO(antaljanosbenjamin): figure out whether is there any active transaction or not (probably accessors should + // increment/decrement a counter). If there are no transactions, then garbage collection can be forced + CollectGarbage(); + return; } - utils::OnScopeExit lock_releaser{[&] { - if constexpr (force) { - main_lock_.unlock(); - } else { - main_lock_.unlock_shared(); - } - }}; - // Garbage collection must be performed in two phases. In the first phase, // deltas that won't be applied by any transaction anymore are unlinked from // the version chains. They cannot be deleted immediately, because there @@ -1380,10 +1263,6 @@ void Storage::CollectGarbage() { // chain traversal. They are instead marked for deletion and will be deleted // in the second GC phase in this GC iteration or some of the following // ones. - std::unique_lock gc_guard(gc_lock_, std::try_to_lock); - if (!gc_guard.owns_lock()) { - return; - } uint64_t oldest_active_start_timestamp = commit_log_->OldestActive(); // We don't move undo buffers of unlinked transactions to garbage_undo_buffers @@ -1394,27 +1273,22 @@ void Storage::CollectGarbage() { // We will only free vertices deleted up until now in this GC cycle, and we // will do it after cleaning-up the indices. That way we are sure that all // vertices that appear in an index also exist in main storage. - std::list current_deleted_edges; - std::list current_deleted_vertices; - deleted_vertices_->swap(current_deleted_vertices); - deleted_edges_->swap(current_deleted_edges); // Flag that will be used to determine whether the Index GC should be run. It // should be run when there were any items that were cleaned up (there were // updates between this run of the GC and the previous run of the GC). This // eliminates high CPU usage when the GC doesn't have to clean up anything. - bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty(); + bool run_index_cleanup = !committed_transactions_.empty() || !garbage_undo_buffers_.empty(); while (true) { // We don't want to hold the lock on commited transactions for too long, // because that prevents other transactions from committing. Transaction *transaction{nullptr}; { - auto committed_transactions_ptr = committed_transactions_.Lock(); - if (committed_transactions_ptr->empty()) { + if (committed_transactions_.empty()) { break; } - transaction = &committed_transactions_ptr->front(); + transaction = &committed_transactions_.front(); } auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire); @@ -1459,7 +1333,6 @@ void Storage::CollectGarbage() { switch (prev.type) { case PreviousPtr::Type::VERTEX: { Vertex *vertex = prev.vertex; - std::lock_guard vertex_guard(vertex->lock); if (vertex->delta != &delta) { // Something changed, we're not the first delta in the chain // anymore. @@ -1467,13 +1340,12 @@ void Storage::CollectGarbage() { } vertex->delta = nullptr; if (vertex->deleted) { - InsertVertexPKIntoList(current_deleted_vertices, vertex->keys.Keys()); + InsertVertexPKIntoList(deleted_vertices_, vertex->keys.Keys()); } break; } case PreviousPtr::Type::EDGE: { Edge *edge = prev.edge; - std::lock_guard edge_guard(edge->lock); if (edge->delta != &delta) { // Something changed, we're not the first delta in the chain // anymore. @@ -1481,7 +1353,7 @@ void Storage::CollectGarbage() { } edge->delta = nullptr; if (edge->deleted) { - current_deleted_edges.push_back(edge->gid); + deleted_edges_.push_back(edge->gid); } break; } @@ -1492,7 +1364,6 @@ void Storage::CollectGarbage() { // part of the suffix later. break; } - std::unique_lock guard; { // We need to find the parent object in order to be able to use // its lock. @@ -1502,21 +1373,13 @@ void Storage::CollectGarbage() { } switch (parent.type) { case PreviousPtr::Type::VERTEX: - guard = std::unique_lock(parent.vertex->lock); - break; case PreviousPtr::Type::EDGE: - guard = std::unique_lock(parent.edge->lock); break; case PreviousPtr::Type::DELTA: case PreviousPtr::Type::NULLPTR: LOG_FATAL("Invalid database state!"); } } - if (delta.prev.Get() != prev) { - // Something changed, we could now be the first delta in the - // chain. - continue; - } Delta *prev_delta = prev.delta; prev_delta->next.store(nullptr, std::memory_order_release); break; @@ -1529,10 +1392,8 @@ void Storage::CollectGarbage() { } } - committed_transactions_.WithLock([&](auto &committed_transactions) { - unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas)); - committed_transactions.pop_front(); - }); + unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas)); + committed_transactions_.pop_front(); } // After unlinking deltas from vertices, we refresh the indices. That way @@ -1547,38 +1408,26 @@ void Storage::CollectGarbage() { } { - std::unique_lock guard(engine_lock_); uint64_t mark_timestamp = timestamp_; - // Take garbage_undo_buffers lock while holding the engine lock to make - // sure that entries are sorted by mark timestamp in the list. - garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { - // Release engine lock because we don't have to hold it anymore and - // this could take a long time. - guard.unlock(); - // TODO(mtomic): holding garbage_undo_buffers_ lock here prevents - // transactions from aborting until we're done marking, maybe we should - // add them one-by-one or something - for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) { - timestamp = mark_timestamp; - } - garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers); - }); - for (auto vertex : current_deleted_vertices) { + for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) { + timestamp = mark_timestamp; + } + garbage_undo_buffers_.splice(garbage_undo_buffers_.end(), unlinked_undo_buffers); + + for (const auto &vertex : deleted_vertices_) { garbage_vertices_.emplace_back(mark_timestamp, vertex); } } - garbage_undo_buffers_.WithLock([&](auto &undo_buffers) { - // if force is set to true we can simply delete all the leftover undos because - // no transaction is active - if constexpr (force) { - undo_buffers.clear(); - } else { - while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) { - undo_buffers.pop_front(); - } + // if force is set to true we can simply delete all the leftover undos because + // no transaction is active + if constexpr (force) { + garbage_undo_buffers_.clear(); + } else { + while (!garbage_undo_buffers_.empty() && garbage_undo_buffers_.front().first <= oldest_active_start_timestamp) { + garbage_undo_buffers_.pop_front(); } - }); + } { auto vertex_acc = vertices_.access(); @@ -1598,7 +1447,7 @@ void Storage::CollectGarbage() { } { auto edge_acc = edges_.access(); - for (auto edge : current_deleted_edges) { + for (auto edge : deleted_edges_) { MG_ASSERT(edge_acc.remove(edge), "Invalid database state!"); } } @@ -1643,7 +1492,7 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_ // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - if (replication_role_.load() == ReplicationRole::MAIN) { + if (replication_role_ == ReplicationRole::MAIN) { replication_clients_.WithLock([&](auto &clients) { for (auto &client : clients) { client->StartTransactionReplication(wal_file_->SequenceNumber()); @@ -1820,7 +1669,7 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId if (!InitializeWalFile()) return; wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); { - if (replication_role_.load() == ReplicationRole::MAIN) { + if (replication_role_ == ReplicationRole::MAIN) { replication_clients_.WithLock([&](auto &clients) { for (auto &client : clients) { client->StartTransactionReplication(wal_file_->SequenceNumber()); @@ -1835,15 +1684,10 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId } utils::BasicResult Storage::CreateSnapshot() { - if (replication_role_.load() != ReplicationRole::MAIN) { + if (replication_role_ != ReplicationRole::MAIN) { return CreateSnapshotError::DisabledForReplica; } - std::lock_guard snapshot_guard(snapshot_lock_); - - // Take master RW lock (for reading). - std::shared_lock storage_guard(main_lock_); - // Create the transaction used to create the snapshot. auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION); @@ -1903,7 +1747,7 @@ bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication:: replication_server_ = std::make_unique(this, std::move(endpoint), config); - replication_role_.store(ReplicationRole::REPLICA); + replication_role_ = ReplicationRole::REPLICA; return true; } @@ -1918,29 +1762,26 @@ bool Storage::SetMainReplicationRole() { // This should be always called first so we finalize everything replication_server_.reset(nullptr); - { - std::unique_lock engine_guard{engine_lock_}; - if (wal_file_) { - wal_file_->FinalizeWal(); - wal_file_.reset(); - } - - // Generate new epoch id and save the last one to the history. - if (epoch_history_.size() == kEpochHistoryRetention) { - epoch_history_.pop_front(); - } - epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_); - epoch_id_ = utils::GenerateUUID(); + if (wal_file_) { + wal_file_->FinalizeWal(); + wal_file_.reset(); } - replication_role_.store(ReplicationRole::MAIN); + // Generate new epoch id and save the last one to the history. + if (epoch_history_.size() == kEpochHistoryRetention) { + epoch_history_.pop_front(); + } + epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_); + epoch_id_ = utils::GenerateUUID(); + + replication_role_ = ReplicationRole::MAIN; return true; } utils::BasicResult Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, const replication::ReplicationClientConfig &config) { - MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!"); + MG_ASSERT(replication_role_ == ReplicationRole::MAIN, "Only main instance can register a replica!"); const bool name_exists = replication_clients_.WithLock([&](auto &clients) { return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; }); @@ -1986,7 +1827,7 @@ utils::BasicResult Storage::RegisterReplica( } bool Storage::UnregisterReplica(const std::string_view name) { - MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!"); + MG_ASSERT(replication_role_ == ReplicationRole::MAIN, "Only main instance can unregister a replica!"); return replication_clients_.WithLock([&](auto &clients) { return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); }); @@ -2017,9 +1858,6 @@ std::vector Storage::ReplicasInfo() { }); } -void Storage::SetIsolationLevel(IsolationLevel isolation_level) { - std::unique_lock main_guard{main_lock_}; - isolation_level_ = isolation_level; -} +void Storage::SetIsolationLevel(IsolationLevel isolation_level) { isolation_level_ = isolation_level; } } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/storage.hpp b/src/storage/v3/storage.hpp index 1862503d7..ed884c86e 100644 --- a/src/storage/v3/storage.hpp +++ b/src/storage/v3/storage.hpp @@ -355,7 +355,6 @@ class Storage final { Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, Gid gid); Storage *storage_; - std::shared_lock storage_guard_; Transaction transaction_; std::optional commit_timestamp_; bool is_transaction_active_; @@ -510,22 +509,14 @@ class Storage final { uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); - // Main storage lock. - // - // Accessors take a shared lock when starting, so it is possible to block - // creation of new accessors by taking a unique lock. This is used when doing - // operations on storage that affect the global state, for example index - // creation. - mutable utils::RWLock main_lock_{utils::RWLock::Priority::WRITE}; - // Main object storage VerticesSkipList vertices_; utils::SkipList edges_; - std::atomic edge_id_{0}; + uint64_t edge_id_{0}; // Even though the edge count is already kept in the `edges_` SkipList, the // list is used only when properties are enabled for edges. Because of that we // keep a separate count of edges that is always updated. - std::atomic edge_count_{0}; + uint64_t edge_count_{0}; NameIdMapper name_id_mapper_; @@ -535,7 +526,6 @@ class Storage final { Schemas schemas_; // Transaction engine - utils::SpinLock engine_lock_; uint64_t timestamp_{kTimestampInitialId}; uint64_t transaction_id_{kTransactionInitialId}; // TODO: This isn't really a commit log, it doesn't even care if a @@ -544,19 +534,17 @@ class Storage final { // whatever. std::optional commit_log_; - utils::Synchronized, utils::SpinLock> committed_transactions_; + std::list committed_transactions_; IsolationLevel isolation_level_; Config config_; - utils::Scheduler gc_runner_; - std::mutex gc_lock_; // Undo buffers that were unlinked and now are waiting to be freed. - utils::Synchronized>>, utils::SpinLock> garbage_undo_buffers_; + std::list>> garbage_undo_buffers_; // Vertices that are logically deleted but still have to be removed from // indices before removing them from the main storage. - utils::Synchronized, utils::SpinLock> deleted_vertices_; + std::list deleted_vertices_; // Vertices that are logically deleted and removed from indices and now wait // to be removed from the main storage. @@ -564,7 +552,7 @@ class Storage final { // Edges that are logically deleted and wait to be removed from the main // storage. - utils::Synchronized, utils::SpinLock> deleted_edges_; + std::list deleted_edges_; // Durability std::filesystem::path snapshot_directory_; @@ -572,9 +560,6 @@ class Storage final { std::filesystem::path lock_file_path_; utils::OutputFile lock_file_handle_; - utils::Scheduler snapshot_runner_; - utils::SpinLock snapshot_lock_; - // UUID used to distinguish snapshots and to link snapshots to WALs std::string uuid_; // Sequence number used to keep track of the chain of WALs. @@ -609,7 +594,7 @@ class Storage final { utils::FileRetainer::FileLocker global_locker_; // Last commited timestamp - std::atomic last_commit_timestamp_{kTimestampInitialId}; + uint64_t last_commit_timestamp_{kTimestampInitialId}; class ReplicationServer; std::unique_ptr replication_server_{nullptr}; @@ -628,7 +613,7 @@ class Storage final { using ReplicationClientList = utils::Synchronized>, utils::SpinLock>; ReplicationClientList replication_clients_; - std::atomic replication_role_{ReplicationRole::MAIN}; + ReplicationRole replication_role_{ReplicationRole::MAIN}; }; } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/vertex.hpp b/src/storage/v3/vertex.hpp index 44698381c..50cb0333d 100644 --- a/src/storage/v3/vertex.hpp +++ b/src/storage/v3/vertex.hpp @@ -69,7 +69,6 @@ struct Vertex { std::vector> in_edges; std::vector> out_edges; - mutable utils::SpinLock lock; bool deleted{false}; // uint8_t PAD; // uint16_t PAD; diff --git a/src/storage/v3/vertex_accessor.cpp b/src/storage/v3/vertex_accessor.cpp index 6b5dae827..ff8b34276 100644 --- a/src/storage/v3/vertex_accessor.cpp +++ b/src/storage/v3/vertex_accessor.cpp @@ -33,7 +33,6 @@ std::pair IsVisible(Vertex *vertex, Transaction *transaction, View v bool deleted = false; Delta *delta = nullptr; { - std::lock_guard guard(vertex->lock); deleted = vertex->deleted; delta = vertex->delta; } @@ -80,7 +79,6 @@ bool VertexAccessor::IsVisible(View view) const { Result VertexAccessor::AddLabel(LabelId label) { utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; @@ -102,7 +100,6 @@ ResultSchema VertexAccessor::AddLabelAndValidate(LabelId label) { return {*maybe_violation_error}; } utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return {Error::SERIALIZATION_ERROR}; @@ -120,8 +117,6 @@ ResultSchema VertexAccessor::AddLabelAndValidate(LabelId label) { } Result VertexAccessor::RemoveLabel(LabelId label) { - std::lock_guard guard(vertex_->lock); - if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; if (vertex_->deleted) return Error::DELETED_OBJECT; @@ -140,7 +135,6 @@ ResultSchema VertexAccessor::RemoveLabelAndValidate(LabelId label) { if (const auto maybe_violation_error = vertex_validator_.ValidateRemoveLabel(label); maybe_violation_error) { return {*maybe_violation_error}; } - std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return {Error::SERIALIZATION_ERROR}; @@ -162,7 +156,6 @@ Result VertexAccessor::HasLabel(LabelId label, View view) const { bool has_label = false; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; has_label = VertexHasLabel(*vertex_, label); delta = vertex_->delta; @@ -209,7 +202,6 @@ Result VertexAccessor::PrimaryLabel(const View view) const { bool deleted = false; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; delta = vertex_->delta; } @@ -243,7 +235,6 @@ Result VertexAccessor::PrimaryKey(const View view) const { bool deleted = false; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; delta = vertex_->delta; } @@ -282,7 +273,6 @@ Result> VertexAccessor::Labels(View view) const { std::vector labels; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; labels = vertex_->labels; delta = vertex_->delta; @@ -327,7 +317,6 @@ Result> VertexAccessor::Labels(View view) const { Result VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) { utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; @@ -353,7 +342,6 @@ ResultSchema VertexAccessor::SetPropertyAndValidate(PropertyId pr return {*maybe_violation_error}; } utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - std::lock_guard guard(vertex_->lock); if (!PrepareForWrite(transaction_, vertex_)) { return {Error::SERIALIZATION_ERROR}; @@ -379,8 +367,6 @@ ResultSchema VertexAccessor::SetPropertyAndValidate(PropertyId pr } Result> VertexAccessor::ClearProperties() { - std::lock_guard guard(vertex_->lock); - if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; if (vertex_->deleted) return Error::DELETED_OBJECT; @@ -402,7 +388,6 @@ Result VertexAccessor::GetProperty(PropertyId property, View view PropertyValue value; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; value = vertex_->properties.GetProperty(property); delta = vertex_->delta; @@ -443,7 +428,6 @@ Result> VertexAccessor::Properties(View view std::map properties; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; properties = vertex_->properties.Properties(); delta = vertex_->delta; @@ -495,7 +479,6 @@ Result> VertexAccessor::InEdges(View view, const std:: std::vector> in_edges; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; if (edge_types.empty() && !destination) { in_edges = vertex_->in_edges; @@ -576,7 +559,6 @@ Result> VertexAccessor::OutEdges(View view, const std: std::vector> out_edges; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; if (edge_types.empty() && !destination) { out_edges = vertex_->out_edges; @@ -655,7 +637,6 @@ Result VertexAccessor::InDegree(View view) const { size_t degree = 0; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; degree = vertex_->in_edges.size(); delta = vertex_->delta; @@ -693,7 +674,6 @@ Result VertexAccessor::OutDegree(View view) const { size_t degree = 0; Delta *delta = nullptr; { - std::lock_guard guard(vertex_->lock); deleted = vertex_->deleted; degree = vertex_->out_edges.size(); delta = vertex_->delta;