Remove multi-threaded related logic and variables (#460)

* Remove logic that was necessary for optimal multi-threaded performance, such
  as accumulating deleted object in local containers and appending them to a
  global one, handling overlapping locking.
* Remove background GC thread.
* Remove mutexes, locks and atomics throughout storage.
This commit is contained in:
János Benjamin Antal 2022-08-30 13:41:53 +02:00 committed by GitHub
parent 95dbc022c0
commit efb3c8d03d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 241 additions and 473 deletions

View File

@ -1110,8 +1110,9 @@ void Storage::Accessor::Abort() {
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
uint64_t mark_timestamp = storage_->timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
// Take garbage_undo_buffers lock while holding the engine lock to make sure that entries are sorted by mark
// timestamp in the list. This is necessary when a transaction is aborting simultaneously with a GC run: both of
// these operations acquire a mark timestamps and then modify the garbage deltas.
storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// emplace back could take a long time.
@ -1517,8 +1518,9 @@ void Storage::CollectGarbage() {
{
std::unique_lock<utils::SpinLock> guard(engine_lock_);
uint64_t mark_timestamp = timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
// Take garbage_undo_buffers lock while holding the engine lock to make sure that entries are sorted by mark
// timestamp in the list. This is necessary when a transaction is aborting simultaneously with a GC run: both of
// these operations acquire a mark timestamps and then modify the garbage deltas.
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// this could take a long time.

View File

@ -9,6 +9,7 @@ set(storage_v3_src_files
edge_accessor.cpp
indices.cpp
key_store.cpp
lexicographically_ordered_vertex.cpp
property_store.cpp
vertex_accessor.cpp
schemas.cpp

View File

@ -47,8 +47,6 @@ CommitLog::~CommitLog() {
}
void CommitLog::MarkFinished(uint64_t id) {
std::lock_guard<utils::SpinLock> guard(lock_);
Block *block = FindOrCreateBlock(id);
block->field[(id % kIdsInBlock) / kIdsInField] |= 1ULL << (id % kIdsInField);
if (id == oldest_active_) {
@ -56,10 +54,7 @@ void CommitLog::MarkFinished(uint64_t id) {
}
}
uint64_t CommitLog::OldestActive() {
std::lock_guard<utils::SpinLock> guard(lock_);
return oldest_active_;
}
uint64_t CommitLog::OldestActive() const noexcept { return oldest_active_; }
void CommitLog::UpdateOldestActive() {
while (head_) {

View File

@ -51,7 +51,7 @@ class CommitLog final {
void MarkFinished(uint64_t id);
/// Retrieve the oldest transaction still not marked as finished.
uint64_t OldestActive();
uint64_t OldestActive() const noexcept;
private:
static constexpr uint64_t kBlockSize = 8192;
@ -72,7 +72,6 @@ class CommitLog final {
uint64_t head_start_{0};
uint64_t next_start_{0};
uint64_t oldest_active_{0};
utils::SpinLock lock_;
utils::Allocator<Block> allocator_;
};

View File

@ -23,9 +23,10 @@ namespace memgraph::storage::v3 {
/// the storage. This class also defines the default behavior.
struct Config {
struct Gc {
enum class Type { NONE, PERIODIC };
// TODO(antaljanosbenjamin): How to handle garbage collection?
enum class Type { NONE };
Type type{Type::PERIODIC};
Type type{Type::NONE};
std::chrono::milliseconds interval{std::chrono::milliseconds(1000)};
} gc;

View File

@ -57,7 +57,6 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
bool deleted{false};
bool has_label{false};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
delta = vertex.delta;
deleted = vertex.deleted;
has_label = VertexHasLabel(vertex, label);
@ -142,7 +141,6 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
bool deleted{false};
Delta *delta{nullptr};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
has_label = VertexHasLabel(vertex, label);
deleted = vertex.deleted;
delta = vertex.delta;

View File

@ -157,11 +157,13 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
spdlog::info("Constraints are recreated from metadata.");
}
std::optional<RecoveryInfo> RecoverData(
const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id, std::deque<std::pair<std::string, uint64_t>> *epoch_history, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, Indices *indices,
Constraints *constraints, Config::Items items, uint64_t *wal_seq_num) {
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
VerticesSkipList *vertices, utils::SkipList<Edge> *edges, uint64_t *edge_count,
NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints,
Config::Items items, uint64_t *wal_seq_num) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory,
wal_directory);

View File

@ -11,7 +11,6 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <filesystem>
#include <optional>
@ -102,10 +101,12 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
/// Recovers data either from a snapshot and/or WAL files.
/// @throw RecoveryFailure
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(
const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id, std::deque<std::pair<std::string, uint64_t>> *epoch_history, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges, std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper, Indices *indices,
Constraints *constraints, Config::Items items, uint64_t *wal_seq_num);
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
VerticesSkipList *vertices, utils::SkipList<Edge> *edges, uint64_t *edge_count,
NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints,
Config::Items items, uint64_t *wal_seq_num);
} // namespace memgraph::storage::v3::durability

View File

@ -162,7 +162,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) {
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, Config::Items items) {
NameIdMapper *name_id_mapper, uint64_t *edge_count, Config::Items items) {
RecoveryInfo ret;
RecoveredIndicesAndConstraints indices_constraints;
@ -226,7 +226,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipLi
};
// Reset current edge count.
edge_count->store(0, std::memory_order_release);
*edge_count = 0;
{
// Recover edges.
@ -485,7 +485,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipLi
}
// Increment edge count. We only increment the count here because the
// information is duplicated in in_edges.
edge_count->fetch_add(*out_size, std::memory_order_acq_rel);
*edge_count += *out_size;
}
}
spdlog::info("Connectivity is recovered.");
@ -687,13 +687,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
for (auto &edge : acc) {
// The edge visibility check must be done here manually because we don't
// allow direct access to the edges through the public API.
bool is_visible = true;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge.lock);
is_visible = !edge.deleted;
delta = edge.delta;
}
auto is_visible = !edge.deleted;
auto *delta = edge.delta;
ApplyDeltasForRead(transaction, delta, View::OLD, [&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:

View File

@ -63,7 +63,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, Config::Items items);
NameIdMapper *name_id_mapper, uint64_t *edge_count, Config::Items items);
/// Function used to create a snapshot using the given transaction.
void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snapshot_directory,

View File

@ -488,7 +488,6 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Config::Ite
// actions.
// encoder->WriteMarker(Marker::SECTION_DELTA);
// encoder->WriteUint(timestamp);
// std::lock_guard<utils::SpinLock> guard(vertex.lock);
// switch (delta.action) {
// case Delta::Action::DELETE_OBJECT:
// case Delta::Action::RECREATE_OBJECT: {
@ -540,10 +539,9 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, const Delta
uint64_t timestamp) {
// When converting a Delta to a WAL delta the logic is inverted. That is
// because the Delta's represent undo actions and we want to store redo
// // actions.
// actions.
// encoder->WriteMarker(Marker::SECTION_DELTA);
// encoder->WriteUint(timestamp);
// std::lock_guard<utils::SpinLock> guard(edge.lock);
// switch (delta.action) {
// case Delta::Action::SET_PROPERTY: {
// encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
@ -619,7 +617,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage
RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints,
const std::optional<uint64_t> last_loaded_timestamp, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
utils::SkipList<Edge> *edges, NameIdMapper * /*name_id_mapper*/, uint64_t * /*edge_count*/,
Config::Items items) {
spdlog::info("Trying to load WAL file {}.", path);
RecoveryInfo ret;
@ -750,7 +748,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
// ret.next_edge_id = std::max(ret.next_edge_id, edge_gid.AsUint() + 1);
// // Increment edge count.
// edge_count->fetch_add(1, std::memory_order_acq_rel);
// *edge_count += 1;
// break;
// }
@ -795,7 +793,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
// }
// // Decrement edge count.
// edge_count->fetch_add(-1, std::memory_order_acq_rel);
// *edge_count += -1;
// break;
// }
@ -881,8 +879,8 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
// }
// }
spdlog::info("Applied {} deltas from WAL. Skipped {} deltas, because they were too old.", deltas_applied,
info.num_deltas - deltas_applied);
// spdlog::info("Applied {} deltas from WAL. Skipped {} deltas, because they were too old.", deltas_applied,
// info.num_deltas - deltas_applied);
return ret;
}

View File

@ -191,7 +191,7 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage
/// @throw RecoveryFailure
RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints,
std::optional<uint64_t> last_loaded_timestamp, VerticesSkipList *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper, uint64_t *edge_count,
Config::Items items);
/// WalFile class used to append deltas and operations to the WAL file.

View File

@ -33,7 +33,6 @@ struct Edge {
PropertyStore properties;
mutable utils::SpinLock lock;
bool deleted;
// uint8_t PAD;
// uint16_t PAD;

View File

@ -22,14 +22,10 @@
namespace memgraph::storage::v3 {
bool EdgeAccessor::IsVisible(const View view) const {
bool deleted = true;
bool exists = true;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
deleted = edge_.ptr->deleted;
delta = edge_.ptr->delta;
}
auto deleted = edge_.ptr->deleted;
auto exists = true;
auto *delta = edge_.ptr->delta;
ApplyDeltasForRead(transaction_, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
@ -66,8 +62,6 @@ Result<PropertyValue> EdgeAccessor::SetProperty(PropertyId property, const Prope
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
@ -88,8 +82,6 @@ Result<PropertyValue> EdgeAccessor::SetProperty(PropertyId property, const Prope
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
@ -106,16 +98,11 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const {
if (!config_.properties_on_edges) return PropertyValue();
bool exists = true;
bool deleted = false;
PropertyValue value;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
deleted = edge_.ptr->deleted;
value = edge_.ptr->properties.GetProperty(property);
delta = edge_.ptr->delta;
}
auto exists = true;
auto deleted = edge_.ptr->deleted;
auto value = edge_.ptr->properties.GetProperty(property);
auto *delta = edge_.ptr->delta;
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &value, property](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
@ -148,16 +135,11 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::Properties(View view) const {
if (!config_.properties_on_edges) return std::map<PropertyId, PropertyValue>{};
bool exists = true;
bool deleted = false;
std::map<PropertyId, PropertyValue> properties;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
deleted = edge_.ptr->deleted;
properties = edge_.ptr->properties.Properties();
delta = edge_.ptr->delta;
}
auto exists = true;
auto deleted = edge_.ptr->deleted;
auto properties = edge_.ptr->properties.Properties();
auto *delta = edge_.ptr->delta;
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &properties](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {

View File

@ -53,7 +53,6 @@ bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t timestamp)
bool deleted{false};
const Delta *delta{nullptr};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
has_label = utils::Contains(vertex.labels, label);
deleted = vertex.deleted;
delta = vertex.delta;
@ -106,7 +105,6 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, PropertyId
bool deleted{false};
const Delta *delta{nullptr};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
has_label = utils::Contains(vertex.labels, label);
current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
deleted = vertex.deleted;
@ -165,7 +163,6 @@ bool CurrentVersionHasLabel(const Vertex &vertex, LabelId label, Transaction *tr
bool has_label{false};
const Delta *delta{nullptr};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
deleted = vertex.deleted;
has_label = utils::Contains(vertex.labels, label);
delta = vertex.delta;
@ -217,7 +214,6 @@ bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, Propert
bool current_value_equal_to_value = value.IsNull();
const Delta *delta{nullptr};
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
deleted = vertex.deleted;
has_label = utils::Contains(vertex.labels, label);
current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);

View File

@ -0,0 +1,14 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v3/lexicographically_ordered_vertex.hpp"
namespace memgraph::storage::v3 {} // namespace memgraph::storage::v3

View File

@ -90,14 +90,7 @@ void Storage::ReplicationClient::FrequentCheck() {
void Storage::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
std::optional<std::string> epoch_id;
{
// epoch_id_ can be changed if we don't take this lock
std::unique_lock engine_guard(storage_->engine_lock_);
epoch_id.emplace(storage_->epoch_id_);
}
auto stream{rpc_client_->Stream<replication::HeartbeatRpc>(storage_->last_commit_timestamp_, std::move(*epoch_id))};
auto stream{rpc_client_->Stream<replication::HeartbeatRpc>(storage_->last_commit_timestamp_, storage_->epoch_id_)};
const auto response = stream.AwaitResponse();
std::optional<uint64_t> branching_point;
@ -122,8 +115,8 @@ void Storage::ReplicationClient::InitializeClient() {
current_commit_timestamp = response.current_commit_timestamp;
spdlog::trace("Current timestamp on replica: {}", current_commit_timestamp);
spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_.load());
if (current_commit_timestamp == storage_->last_commit_timestamp_.load()) {
spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_);
if (current_commit_timestamp == storage_->last_commit_timestamp_) {
spdlog::debug("Replica '{}' up to date", name_);
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
@ -197,7 +190,7 @@ void Storage::ReplicationClient::StartTransactionReplication(const uint64_t curr
case replication::ReplicaState::READY:
MG_ASSERT(!replica_stream_);
try {
replica_stream_.emplace(ReplicaStream{this, storage_->last_commit_timestamp_.load(), current_wal_seq_num});
replica_stream_.emplace(ReplicaStream{this, storage_->last_commit_timestamp_, current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
@ -319,10 +312,8 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
auto response = TransferWalFiles(arg);
replica_commit = response.current_commit_timestamp;
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
std::unique_lock transaction_guard(storage_->engine_lock_);
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
storage_->wal_file_->DisableFlushing();
transaction_guard.unlock();
spdlog::debug("Sending current wal file");
replica_commit = ReplicateCurrentWal();
storage_->wal_file_->EnableFlushing();
@ -355,7 +346,7 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
std::unique_lock client_guard{client_lock_};
SPDLOG_INFO("Replica timestamp: {}", replica_commit);
SPDLOG_INFO("Last commit: {}", storage_->last_commit_timestamp_);
if (storage_->last_commit_timestamp_.load() == replica_commit) {
if (storage_->last_commit_timestamp_ == replica_commit) {
replica_state_.store(replication::ReplicaState::READY);
return;
}
@ -403,7 +394,7 @@ std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient
// This lock is also necessary to force the missed transaction to finish.
std::optional<uint64_t> current_wal_seq_num;
std::optional<uint64_t> current_wal_from_timestamp;
if (std::unique_lock transtacion_guard(storage_->engine_lock_); storage_->wal_file_) {
if (storage_->wal_file_) {
current_wal_seq_num.emplace(storage_->wal_file_->SequenceNumber());
current_wal_from_timestamp.emplace(storage_->wal_file_->FromTimestamp());
}

View File

@ -87,7 +87,7 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::HeartbeatReq req;
slk::Load(&req, req_reader);
replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_};
replication::HeartbeatRes res{true, storage_->last_commit_timestamp_, storage_->epoch_id_};
slk::Save(res, res_builder);
}
@ -125,7 +125,7 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
storage_->wal_seq_num_ = req.seq_num;
}
if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) {
if (req.previous_commit_timestamp != storage_->last_commit_timestamp_) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
@ -134,14 +134,14 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
transaction_complete = durability::IsWalDeltaDataTypeTransactionEnd(delta.type);
}
replication::AppendDeltasRes res{false, storage_->last_commit_timestamp_.load()};
replication::AppendDeltasRes res{false, storage_->last_commit_timestamp_};
slk::Save(res, res_builder);
return;
}
ReadAndApplyDelta(&decoder);
replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
@ -157,7 +157,6 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
// Clear the database
storage_->vertices_.clear();
storage_->edges_.clear();
@ -188,9 +187,8 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
} catch (const durability::RecoveryFailure &e) {
LOG_FATAL("Couldn't load the snapshot because of: {}", e.what());
}
storage_guard.unlock();
replication::SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
replication::SnapshotRes res{true, storage_->last_commit_timestamp_};
slk::Save(res, res_builder);
// Delete other durability files
@ -226,7 +224,7 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B
LoadWal(&decoder);
}
replication::WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
replication::WalFilesRes res{true, storage_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
@ -240,7 +238,7 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk:
LoadWal(&decoder);
replication::CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
replication::CurrentWalRes res{true, storage_->last_commit_timestamp_};
slk::Save(res, res_builder);
}
@ -298,17 +296,17 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// auto vertex_acc = storage_->vertices_.access();
std::optional<std::pair<uint64_t, Storage::Accessor>> commit_timestamp_and_accessor;
auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
if (!commit_timestamp_and_accessor) {
commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access());
} else if (commit_timestamp_and_accessor->first != commit_timestamp) {
throw utils::BasicException("Received more than one transaction!");
}
return &commit_timestamp_and_accessor->second;
};
// auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
// if (!commit_timestamp_and_accessor) {
// commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access());
// } else if (commit_timestamp_and_accessor->first != commit_timestamp) {
// throw utils::BasicException("Received more than one transaction!");
// }
// return &commit_timestamp_and_accessor->second;
// };
uint64_t applied_deltas = 0;
auto max_commit_timestamp = storage_->last_commit_timestamp_.load();
auto max_commit_timestamp = storage_->last_commit_timestamp_;
for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) {
const auto [timestamp, delta] = ReadDelta(decoder);
@ -423,13 +421,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// // The edge visibility check must be done here manually because we
// // don't allow direct access to the edges through the public API.
// {
// bool is_visible = true;
// Delta *delta = nullptr;
// {
// std::lock_guard<utils::SpinLock> guard(edge->lock);
// is_visible = !edge->deleted;
// delta = edge->delta;
// }
// auto is_visible = !edge->deleted;
// auto *delta = edge->delta;
// ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW, [&is_visible](const Delta &delta) {
// switch (delta.action) {
// case Delta::Action::ADD_LABEL:
@ -466,8 +459,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// &storage_->indices_,
// &storage_->constraints_,
// storage_->config_.items,
// storage_->schema_validator_,
// storage_->schemas_};
// storage_->schema_validator_};
// auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
// delta.vertex_edge_set_property.value);

View File

@ -401,19 +401,17 @@ Storage::Storage(Config config)
}
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(
utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break;
}
}
});
}
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage<false>(); });
// TODO(antaljanosbenjamin): handle snapshots
// snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] {
// if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) {
// switch (maybe_error.GetError()) {
// case CreateSnapshotError::DisabledForReplica:
// spdlog::warn(
// utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
// break;
// }
// }
// });
}
if (timestamp_ == kTimestampInitialId) {
@ -424,9 +422,6 @@ Storage::Storage(Config config)
}
Storage::~Storage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
}
{
// Clear replication data
replication_server_.reset();
@ -437,7 +432,7 @@ Storage::~Storage() {
wal_file_ = std::nullopt;
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Stop();
// TODO(antaljanosbenjamin): stop snapshot creation
}
if (config_.durability.snapshot_on_exit) {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) {
@ -452,17 +447,12 @@ Storage::~Storage() {
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level)
: storage_(storage),
// The lock must be acquired before creating the transaction object to
// prevent freshly created transactions from dangling in an active state
// during exclusive operations.
storage_guard_(storage_->main_lock_),
transaction_(storage->CreateTransaction(isolation_level)),
is_transaction_active_(true),
config_(storage->config_.items) {}
Storage::Accessor::Accessor(Accessor &&other) noexcept
: storage_(other.storage_),
storage_guard_(std::move(other.storage_guard_)),
transaction_(std::move(other.transaction_)),
commit_timestamp_(other.commit_timestamp_),
is_transaction_active_(other.is_transaction_active_),
@ -533,8 +523,6 @@ Result<std::optional<VertexAccessor>> Storage::Accessor::DeleteVertex(VertexAcce
"accessor when deleting a vertex!");
auto *vertex_ptr = vertex->vertex_;
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
if (vertex_ptr->deleted) {
@ -563,8 +551,6 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
{
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
if (vertex_ptr->deleted) return std::optional<ReturnType>{};
@ -603,8 +589,6 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
}
}
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
// We need to check again for serialization errors because we unlocked the
// vertex. Some other transaction could have modified the vertex in the
// meantime if we didn't have any edges to delete.
@ -634,20 +618,6 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
auto *from_vertex = from->vertex_;
auto *to_vertex = to->vertex_;
// Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
if (from_vertex < to_vertex) {
guard_from.lock();
guard_to.lock();
} else if (from_vertex > to_vertex) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR;
if (from_vertex->deleted) return Error::DELETED_OBJECT;
@ -656,7 +626,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
if (to_vertex->deleted) return Error::DELETED_OBJECT;
}
auto gid = Gid::FromUint(storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel));
auto gid = Gid::FromUint(storage_->edge_id_++);
EdgeRef edge(gid);
if (config_.properties_on_edges) {
auto acc = storage_->edges_.access();
@ -675,7 +645,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
++storage_->edge_count_;
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
&storage_->constraints_, config_, storage_->schema_validator_);
@ -694,20 +664,6 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
auto *from_vertex = from->vertex_;
auto *to_vertex = to->vertex_;
// Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
if (&from_vertex < &to_vertex) {
guard_from.lock();
guard_to.lock();
} else if (&from_vertex > &to_vertex) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR;
if (from_vertex->deleted) return Error::DELETED_OBJECT;
@ -722,8 +678,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
// that runs single-threadedly and while this instance is set-up to apply
// threads (it is the replica), it is guaranteed that no other writes are
// possible.
storage_->edge_id_.store(std::max(storage_->edge_id_.load(std::memory_order_acquire), gid.AsUint() + 1),
std::memory_order_release);
storage_->edge_id_ = std::max(storage_->edge_id_, gid.AsUint() + 1);
EdgeRef edge(gid);
if (config_.properties_on_edges) {
@ -743,7 +698,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
++storage_->edge_count_;
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
&storage_->constraints_, config_, storage_->schema_validator_);
@ -756,10 +711,8 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
auto edge_ref = edge->edge_;
auto edge_type = edge->edge_type_;
std::unique_lock<utils::SpinLock> guard;
if (config_.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
guard = std::unique_lock<utils::SpinLock>(edge_ptr->lock);
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
@ -769,20 +722,6 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
auto *from_vertex = edge->from_vertex_;
auto *to_vertex = edge->to_vertex_;
// Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
if (&from_vertex < &to_vertex) {
guard_from.lock();
guard_to.lock();
} else if (&from_vertex > &to_vertex) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!from_vertex->deleted, "Invalid database state!");
@ -827,7 +766,7 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
// Decrement edge count.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
--storage_->edge_count_;
return std::make_optional<EdgeAccessor>(edge_ref, edge_type, from_vertex, to_vertex, &transaction_,
&storage_->indices_, &storage_->constraints_, config_,
@ -887,75 +826,66 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
// Save these so we can mark them used in the commit log.
uint64_t start_timestamp = transaction_.start_timestamp;
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp));
commit_timestamp_.emplace(storage_->CommitTimestamp(desired_commit_timestamp));
// Before committing and validating vertices against unique constraints,
// we have to update unique constraints with the vertices that are going
// to be validated/committed.
for (const auto &delta : transaction_.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) {
continue;
}
storage_->constraints_.unique_constraints.UpdateBeforeCommit(prev.vertex, transaction_);
// Before committing and validating vertices against unique constraints,
// we have to update unique constraints with the vertices that are going
// to be validated/committed.
for (const auto &delta : transaction_.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) {
continue;
}
storage_->constraints_.unique_constraints.UpdateBeforeCommit(prev.vertex, transaction_);
}
// Validate that unique constraints are satisfied for all modified
// vertices.
for (const auto &delta : transaction_.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) {
continue;
}
// Validate that unique constraints are satisfied for all modified
// vertices.
for (const auto &delta : transaction_.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) {
continue;
}
// No need to take any locks here because we modified this vertex and no
// one else can touch it until we commit.
unique_constraint_violation =
storage_->constraints_.unique_constraints.Validate(*prev.vertex, transaction_, *commit_timestamp_);
if (unique_constraint_violation) {
break;
}
}
// No need to take any locks here because we modified this vertex and no
// one else can touch it until we commit.
unique_constraint_violation =
storage_->constraints_.unique_constraints.Validate(*prev.vertex, transaction_, *commit_timestamp_);
if (unique_constraint_violation) {
break;
}
if (!unique_constraint_violation) {
// Write transaction to WAL while holding the engine lock to make sure
// that committed transactions are sorted by the commit timestamp in the
// WAL files. We supply the new commit timestamp to the function so that
// it knows what will be the final commit timestamp. The WAL must be
// written before actually committing the transaction (before setting
// the commit timestamp) so that no other transaction can see the
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
storage_->AppendToWal(transaction_, *commit_timestamp_);
}
if (!unique_constraint_violation) {
// Write transaction to WAL while holding the engine lock to make sure
// that committed transactions are sorted by the commit timestamp in the
// WAL files. We supply the new commit timestamp to the function so that
// it knows what will be the final commit timestamp. The WAL must be
// written before actually committing the transaction (before setting
// the commit timestamp) so that no other transaction can see the
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
storage_->AppendToWal(transaction_, *commit_timestamp_);
}
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
// timestamp in the list.
storage_->committed_transactions_.WithLock([&](auto & /*committed_transactions*/) {
// TODO: release lock, and update all deltas to have a local copy
// of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
storage_->last_commit_timestamp_.store(*commit_timestamp_);
}
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
engine_guard.unlock();
});
storage_->commit_log_->MarkFinished(start_timestamp);
// TODO(antaljanosbenjamin): Figure out:
// 1. How the committed transactions are sorted in `committed_transactions_`
// 2. Why it was necessary to lock `committed_transactions_` when it was not accessed at all
// TODO: Update all deltas to have a local copy of the commit timestamp
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
storage_->last_commit_timestamp_ = *commit_timestamp_;
}
storage_->commit_log_->MarkFinished(start_timestamp);
}
if (unique_constraint_violation) {
@ -971,18 +901,11 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit(
void Storage::Accessor::Abort() {
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
// We collect vertices and edges we've created here and then splice them into
// `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one
// by one and acquiring lock every time.
std::list<PrimaryKey> my_deleted_vertices;
std::list<Gid> my_deleted_edges;
for (const auto &delta : transaction_.deltas) {
auto prev = delta.prev.Get();
switch (prev.type) {
case PreviousPtr::Type::VERTEX: {
auto *vertex = prev.vertex;
std::lock_guard<utils::SpinLock> guard(vertex->lock);
Delta *current = vertex->delta;
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
@ -1022,7 +945,7 @@ void Storage::Accessor::Abort() {
// the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is
// redundant. Also, `Edge/RECREATE_OBJECT` isn't available when
// edge properties are disabled.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
++storage_->edge_count_;
break;
}
case Delta::Action::REMOVE_IN_EDGE: {
@ -1045,12 +968,12 @@ void Storage::Accessor::Abort() {
// the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is
// redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge
// properties are disabled.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
--storage_->edge_count_;
break;
}
case Delta::Action::DELETE_OBJECT: {
vertex->deleted = true;
InsertVertexPKIntoList(my_deleted_vertices, vertex->keys.Keys());
InsertVertexPKIntoList(storage_->deleted_vertices_, vertex->keys.Keys());
break;
}
case Delta::Action::RECREATE_OBJECT: {
@ -1069,7 +992,6 @@ void Storage::Accessor::Abort() {
}
case PreviousPtr::Type::EDGE: {
auto *edge = prev.edge;
std::lock_guard<utils::SpinLock> guard(edge->lock);
Delta *current = edge->delta;
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
@ -1080,7 +1002,7 @@ void Storage::Accessor::Abort() {
}
case Delta::Action::DELETE_OBJECT: {
edge->deleted = true;
my_deleted_edges.push_back(edge->gid);
storage_->deleted_edges_.push_back(edge->gid);
break;
}
case Delta::Action::RECREATE_OBJECT: {
@ -1114,20 +1036,11 @@ void Storage::Accessor::Abort() {
}
{
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
uint64_t mark_timestamp = storage_->timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// emplace back could take a long time.
engine_guard.unlock();
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas));
});
storage_->deleted_vertices_.WithLock(
[&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); });
storage_->deleted_edges_.WithLock(
[&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); });
// Release engine lock because we don't have to hold it anymore and
// emplace back could take a long time.
storage_->garbage_undo_buffers_.emplace_back(mark_timestamp, std::move(transaction_.deltas));
}
storage_->commit_log_->MarkFinished(transaction_.start_timestamp);
@ -1137,8 +1050,7 @@ void Storage::Accessor::Abort() {
void Storage::Accessor::FinalizeTransaction() {
if (commit_timestamp_) {
storage_->commit_log_->MarkFinished(*commit_timestamp_);
storage_->committed_transactions_.WithLock(
[&](auto &committed_transactions) { committed_transactions.emplace_back(std::move(transaction_)); });
storage_->committed_transactions_.emplace_back(std::move(transaction_));
commit_timestamp_.reset();
}
}
@ -1164,7 +1076,6 @@ EdgeTypeId Storage::NameToEdgeType(const std::string_view name) {
}
bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
// TODO Fix Index
return false;
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
@ -1175,7 +1086,6 @@ bool Storage::CreateIndex(LabelId label, const std::optional<uint64_t> desired_c
}
bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
// TODO Fix Index
// if (!indices_.label_property_index.CreateIndex(label, property, labelspace.access())) return false;
return false;
@ -1187,7 +1097,6 @@ bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optiona
}
bool Storage::DropIndex(LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.DropIndex(label)) return false;
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp);
@ -1197,7 +1106,6 @@ bool Storage::DropIndex(LabelId label, const std::optional<uint64_t> desired_com
}
bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_property_index.DropIndex(label, property)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
@ -1209,13 +1117,11 @@ bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional<
}
IndicesInfo Storage::ListAllIndices() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
}
utils::BasicResult<ConstraintViolation, bool> Storage::CreateExistenceConstraint(
LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
// TODO Fix constraints
// auto ret = ::memgraph::storage::v3::CreateExistenceConstraint(&constraints_, label, property, vertices_.access());
// if (ret.HasError() || !ret.GetValue()) return ret;
@ -1229,7 +1135,6 @@ utils::BasicResult<ConstraintViolation, bool> Storage::CreateExistenceConstraint
bool Storage::DropExistenceConstraint(LabelId label, PropertyId property,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!::memgraph::storage::v3::DropExistenceConstraint(&constraints_, label, property)) return false;
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp);
@ -1240,7 +1145,6 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property,
utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> Storage::CreateUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
// TODO Fix constraints
// auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access());
// if (ret.HasError() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) {
@ -1256,7 +1160,6 @@ utils::BasicResult<ConstraintViolation, UniqueConstraints::CreationStatus> Stora
UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = constraints_.unique_constraints.DropConstraint(label, properties);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
return ret;
@ -1271,17 +1174,12 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
const SchemaValidator &Storage::Accessor::GetSchemaValidator() const { return storage_->schema_validator_; }
ConstraintsInfo Storage::ListAllConstraints() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {ListExistenceConstraints(constraints_), constraints_.unique_constraints.ListConstraints()};
}
SchemasInfo Storage::ListAllSchemas() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {schemas_.ListSchemas()};
}
SchemasInfo Storage::ListAllSchemas() const { return {schemas_.ListSchemas()}; }
const Schemas::Schema *Storage::GetSchema(const LabelId primary_label) const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return schemas_.GetSchema(primary_label);
}
@ -1293,12 +1191,11 @@ bool Storage::DropSchema(const LabelId primary_label) { return schemas_.DropSche
StorageInfo Storage::GetInfo() const {
auto vertex_count = vertices_.size();
auto edge_count = edge_count_.load(std::memory_order_acquire);
double average_degree = 0.0;
if (vertex_count) {
average_degree = 2.0 * static_cast<double>(edge_count) / static_cast<double>(vertex_count);
average_degree = 2.0 * static_cast<double>(edge_count_) / static_cast<double>(vertex_count);
}
return {vertex_count, edge_count, average_degree, utils::GetMemoryUsage(),
return {vertex_count, edge_count_, average_degree, utils::GetMemoryUsage(),
utils::GetDirDiskUsage(config_.durability.storage_directory)};
}
@ -1331,48 +1228,34 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
// `timestamp`) below.
uint64_t transaction_id{0};
uint64_t start_timestamp{0};
{
std::lock_guard<utils::SpinLock> guard(engine_lock_);
transaction_id = transaction_id_++;
// Replica should have only read queries and the write queries
// can come from main instance with any past timestamp.
// To preserve snapshot isolation we set the start timestamp
// of any query on replica to the last commited transaction
// which is timestamp_ as only commit of transaction with writes
// can change the value of it.
if (replication_role_ == ReplicationRole::REPLICA) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
}
transaction_id = transaction_id_++;
// Replica should have only read queries and the write queries
// can come from main instance with any past timestamp.
// To preserve snapshot isolation we set the start timestamp
// of any query on replica to the last commited transaction
// which is timestamp_ as only commit of transaction with writes
// can change the value of it.
if (replication_role_ == ReplicationRole::REPLICA) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
}
return {transaction_id, start_timestamp, isolation_level};
}
// `force` means there are no active transactions, so everything can be deleted without worrying about removing some
// data that is used by an active transaction
template <bool force>
void Storage::CollectGarbage() {
if constexpr (force) {
// We take the unique lock on the main storage lock so we can forcefully clean
// everything we can
if (!main_lock_.try_lock()) {
CollectGarbage<false>();
return;
}
} else {
// Because the garbage collector iterates through the indices and constraints
// to clean them up, it must take the main lock for reading to make sure that
// the indices and constraints aren't concurrently being modified.
main_lock_.lock_shared();
// TODO(antaljanosbenjamin): figure out whether is there any active transaction or not (probably accessors should
// increment/decrement a counter). If there are no transactions, then garbage collection can be forced
CollectGarbage<false>();
return;
}
utils::OnScopeExit lock_releaser{[&] {
if constexpr (force) {
main_lock_.unlock();
} else {
main_lock_.unlock_shared();
}
}};
// Garbage collection must be performed in two phases. In the first phase,
// deltas that won't be applied by any transaction anymore are unlinked from
// the version chains. They cannot be deleted immediately, because there
@ -1380,10 +1263,6 @@ void Storage::CollectGarbage() {
// chain traversal. They are instead marked for deletion and will be deleted
// in the second GC phase in this GC iteration or some of the following
// ones.
std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock);
if (!gc_guard.owns_lock()) {
return;
}
uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
@ -1394,27 +1273,22 @@ void Storage::CollectGarbage() {
// We will only free vertices deleted up until now in this GC cycle, and we
// will do it after cleaning-up the indices. That way we are sure that all
// vertices that appear in an index also exist in main storage.
std::list<Gid> current_deleted_edges;
std::list<PrimaryKey> current_deleted_vertices;
deleted_vertices_->swap(current_deleted_vertices);
deleted_edges_->swap(current_deleted_edges);
// Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything.
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty();
bool run_index_cleanup = !committed_transactions_.empty() || !garbage_undo_buffers_.empty();
while (true) {
// We don't want to hold the lock on commited transactions for too long,
// because that prevents other transactions from committing.
Transaction *transaction{nullptr};
{
auto committed_transactions_ptr = committed_transactions_.Lock();
if (committed_transactions_ptr->empty()) {
if (committed_transactions_.empty()) {
break;
}
transaction = &committed_transactions_ptr->front();
transaction = &committed_transactions_.front();
}
auto commit_timestamp = transaction->commit_timestamp->load(std::memory_order_acquire);
@ -1459,7 +1333,6 @@ void Storage::CollectGarbage() {
switch (prev.type) {
case PreviousPtr::Type::VERTEX: {
Vertex *vertex = prev.vertex;
std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
if (vertex->delta != &delta) {
// Something changed, we're not the first delta in the chain
// anymore.
@ -1467,13 +1340,12 @@ void Storage::CollectGarbage() {
}
vertex->delta = nullptr;
if (vertex->deleted) {
InsertVertexPKIntoList(current_deleted_vertices, vertex->keys.Keys());
InsertVertexPKIntoList(deleted_vertices_, vertex->keys.Keys());
}
break;
}
case PreviousPtr::Type::EDGE: {
Edge *edge = prev.edge;
std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
if (edge->delta != &delta) {
// Something changed, we're not the first delta in the chain
// anymore.
@ -1481,7 +1353,7 @@ void Storage::CollectGarbage() {
}
edge->delta = nullptr;
if (edge->deleted) {
current_deleted_edges.push_back(edge->gid);
deleted_edges_.push_back(edge->gid);
}
break;
}
@ -1492,7 +1364,6 @@ void Storage::CollectGarbage() {
// part of the suffix later.
break;
}
std::unique_lock<utils::SpinLock> guard;
{
// We need to find the parent object in order to be able to use
// its lock.
@ -1502,21 +1373,13 @@ void Storage::CollectGarbage() {
}
switch (parent.type) {
case PreviousPtr::Type::VERTEX:
guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock);
break;
case PreviousPtr::Type::EDGE:
guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
break;
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
LOG_FATAL("Invalid database state!");
}
}
if (delta.prev.Get() != prev) {
// Something changed, we could now be the first delta in the
// chain.
continue;
}
Delta *prev_delta = prev.delta;
prev_delta->next.store(nullptr, std::memory_order_release);
break;
@ -1529,10 +1392,8 @@ void Storage::CollectGarbage() {
}
}
committed_transactions_.WithLock([&](auto &committed_transactions) {
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
committed_transactions.pop_front();
});
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
committed_transactions_.pop_front();
}
// After unlinking deltas from vertices, we refresh the indices. That way
@ -1547,38 +1408,26 @@ void Storage::CollectGarbage() {
}
{
std::unique_lock<utils::SpinLock> guard(engine_lock_);
uint64_t mark_timestamp = timestamp_;
// Take garbage_undo_buffers lock while holding the engine lock to make
// sure that entries are sorted by mark timestamp in the list.
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
// Release engine lock because we don't have to hold it anymore and
// this could take a long time.
guard.unlock();
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
// transactions from aborting until we're done marking, maybe we should
// add them one-by-one or something
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
timestamp = mark_timestamp;
}
garbage_undo_buffers.splice(garbage_undo_buffers.end(), unlinked_undo_buffers);
});
for (auto vertex : current_deleted_vertices) {
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
timestamp = mark_timestamp;
}
garbage_undo_buffers_.splice(garbage_undo_buffers_.end(), unlinked_undo_buffers);
for (const auto &vertex : deleted_vertices_) {
garbage_vertices_.emplace_back(mark_timestamp, vertex);
}
}
garbage_undo_buffers_.WithLock([&](auto &undo_buffers) {
// if force is set to true we can simply delete all the leftover undos because
// no transaction is active
if constexpr (force) {
undo_buffers.clear();
} else {
while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) {
undo_buffers.pop_front();
}
// if force is set to true we can simply delete all the leftover undos because
// no transaction is active
if constexpr (force) {
garbage_undo_buffers_.clear();
} else {
while (!garbage_undo_buffers_.empty() && garbage_undo_buffers_.front().first <= oldest_active_start_timestamp) {
garbage_undo_buffers_.pop_front();
}
});
}
{
auto vertex_acc = vertices_.access();
@ -1598,7 +1447,7 @@ void Storage::CollectGarbage() {
}
{
auto edge_acc = edges_.access();
for (auto edge : current_deleted_edges) {
for (auto edge : deleted_edges_) {
MG_ASSERT(edge_acc.remove(edge), "Invalid database state!");
}
}
@ -1643,7 +1492,7 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire);
if (replication_role_.load() == ReplicationRole::MAIN) {
if (replication_role_ == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
@ -1820,7 +1669,7 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp);
{
if (replication_role_.load() == ReplicationRole::MAIN) {
if (replication_role_ == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
@ -1835,15 +1684,10 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId
}
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() {
if (replication_role_.load() != ReplicationRole::MAIN) {
if (replication_role_ != ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica;
}
std::lock_guard snapshot_guard(snapshot_lock_);
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION);
@ -1903,7 +1747,7 @@ bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::
replication_server_ = std::make_unique<ReplicationServer>(this, std::move(endpoint), config);
replication_role_.store(ReplicationRole::REPLICA);
replication_role_ = ReplicationRole::REPLICA;
return true;
}
@ -1918,29 +1762,26 @@ bool Storage::SetMainReplicationRole() {
// This should be always called first so we finalize everything
replication_server_.reset(nullptr);
{
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_);
epoch_id_ = utils::GenerateUUID();
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
replication_role_.store(ReplicationRole::MAIN);
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_);
epoch_id_ = utils::GenerateUUID();
replication_role_ = ReplicationRole::MAIN;
return true;
}
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
MG_ASSERT(replication_role_ == ReplicationRole::MAIN, "Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
@ -1986,7 +1827,7 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
}
bool Storage::UnregisterReplica(const std::string_view name) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
MG_ASSERT(replication_role_ == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
return replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; });
});
@ -2017,9 +1858,6 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
});
}
void Storage::SetIsolationLevel(IsolationLevel isolation_level) {
std::unique_lock main_guard{main_lock_};
isolation_level_ = isolation_level;
}
void Storage::SetIsolationLevel(IsolationLevel isolation_level) { isolation_level_ = isolation_level; }
} // namespace memgraph::storage::v3

View File

@ -355,7 +355,6 @@ class Storage final {
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, Gid gid);
Storage *storage_;
std::shared_lock<utils::RWLock> storage_guard_;
Transaction transaction_;
std::optional<uint64_t> commit_timestamp_;
bool is_transaction_active_;
@ -510,22 +509,14 @@ class Storage final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
// creation of new accessors by taking a unique lock. This is used when doing
// operations on storage that affect the global state, for example index
// creation.
mutable utils::RWLock main_lock_{utils::RWLock::Priority::WRITE};
// Main object storage
VerticesSkipList vertices_;
utils::SkipList<Edge> edges_;
std::atomic<uint64_t> edge_id_{0};
uint64_t edge_id_{0};
// Even though the edge count is already kept in the `edges_` SkipList, the
// list is used only when properties are enabled for edges. Because of that we
// keep a separate count of edges that is always updated.
std::atomic<uint64_t> edge_count_{0};
uint64_t edge_count_{0};
NameIdMapper name_id_mapper_;
@ -535,7 +526,6 @@ class Storage final {
Schemas schemas_;
// Transaction engine
utils::SpinLock engine_lock_;
uint64_t timestamp_{kTimestampInitialId};
uint64_t transaction_id_{kTransactionInitialId};
// TODO: This isn't really a commit log, it doesn't even care if a
@ -544,19 +534,17 @@ class Storage final {
// whatever.
std::optional<CommitLog> commit_log_;
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
std::list<Transaction> committed_transactions_;
IsolationLevel isolation_level_;
Config config_;
utils::Scheduler gc_runner_;
std::mutex gc_lock_;
// Undo buffers that were unlinked and now are waiting to be freed.
utils::Synchronized<std::list<std::pair<uint64_t, std::list<Delta>>>, utils::SpinLock> garbage_undo_buffers_;
std::list<std::pair<uint64_t, std::list<Delta>>> garbage_undo_buffers_;
// Vertices that are logically deleted but still have to be removed from
// indices before removing them from the main storage.
utils::Synchronized<std::list<PrimaryKey>, utils::SpinLock> deleted_vertices_;
std::list<PrimaryKey> deleted_vertices_;
// Vertices that are logically deleted and removed from indices and now wait
// to be removed from the main storage.
@ -564,7 +552,7 @@ class Storage final {
// Edges that are logically deleted and wait to be removed from the main
// storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
std::list<Gid> deleted_edges_;
// Durability
std::filesystem::path snapshot_directory_;
@ -572,9 +560,6 @@ class Storage final {
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
utils::Scheduler snapshot_runner_;
utils::SpinLock snapshot_lock_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
@ -609,7 +594,7 @@ class Storage final {
utils::FileRetainer::FileLocker global_locker_;
// Last commited timestamp
std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId};
uint64_t last_commit_timestamp_{kTimestampInitialId};
class ReplicationServer;
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
@ -628,7 +613,7 @@ class Storage final {
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
std::atomic<ReplicationRole> replication_role_{ReplicationRole::MAIN};
ReplicationRole replication_role_{ReplicationRole::MAIN};
};
} // namespace memgraph::storage::v3

View File

@ -69,7 +69,6 @@ struct Vertex {
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
mutable utils::SpinLock lock;
bool deleted{false};
// uint8_t PAD;
// uint16_t PAD;

View File

@ -33,7 +33,6 @@ std::pair<bool, bool> IsVisible(Vertex *vertex, Transaction *transaction, View v
bool deleted = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex->lock);
deleted = vertex->deleted;
delta = vertex->delta;
}
@ -80,7 +79,6 @@ bool VertexAccessor::IsVisible(View view) const {
Result<bool> VertexAccessor::AddLabel(LabelId label) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
@ -102,7 +100,6 @@ ResultSchema<bool> VertexAccessor::AddLabelAndValidate(LabelId label) {
return {*maybe_violation_error};
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return {Error::SERIALIZATION_ERROR};
@ -120,8 +117,6 @@ ResultSchema<bool> VertexAccessor::AddLabelAndValidate(LabelId label) {
}
Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
if (vertex_->deleted) return Error::DELETED_OBJECT;
@ -140,7 +135,6 @@ ResultSchema<bool> VertexAccessor::RemoveLabelAndValidate(LabelId label) {
if (const auto maybe_violation_error = vertex_validator_.ValidateRemoveLabel(label); maybe_violation_error) {
return {*maybe_violation_error};
}
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return {Error::SERIALIZATION_ERROR};
@ -162,7 +156,6 @@ Result<bool> VertexAccessor::HasLabel(LabelId label, View view) const {
bool has_label = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
has_label = VertexHasLabel(*vertex_, label);
delta = vertex_->delta;
@ -209,7 +202,6 @@ Result<LabelId> VertexAccessor::PrimaryLabel(const View view) const {
bool deleted = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
delta = vertex_->delta;
}
@ -243,7 +235,6 @@ Result<PrimaryKey> VertexAccessor::PrimaryKey(const View view) const {
bool deleted = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
delta = vertex_->delta;
}
@ -282,7 +273,6 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
std::vector<LabelId> labels;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
labels = vertex_->labels;
delta = vertex_->delta;
@ -327,7 +317,6 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
@ -353,7 +342,6 @@ ResultSchema<PropertyValue> VertexAccessor::SetPropertyAndValidate(PropertyId pr
return {*maybe_violation_error};
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) {
return {Error::SERIALIZATION_ERROR};
@ -379,8 +367,6 @@ ResultSchema<PropertyValue> VertexAccessor::SetPropertyAndValidate(PropertyId pr
}
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
if (vertex_->deleted) return Error::DELETED_OBJECT;
@ -402,7 +388,6 @@ Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view
PropertyValue value;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
value = vertex_->properties.GetProperty(property);
delta = vertex_->delta;
@ -443,7 +428,6 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
std::map<PropertyId, PropertyValue> properties;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
properties = vertex_->properties.Properties();
delta = vertex_->delta;
@ -495,7 +479,6 @@ Result<std::vector<EdgeAccessor>> VertexAccessor::InEdges(View view, const std::
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
if (edge_types.empty() && !destination) {
in_edges = vertex_->in_edges;
@ -576,7 +559,6 @@ Result<std::vector<EdgeAccessor>> VertexAccessor::OutEdges(View view, const std:
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
if (edge_types.empty() && !destination) {
out_edges = vertex_->out_edges;
@ -655,7 +637,6 @@ Result<size_t> VertexAccessor::InDegree(View view) const {
size_t degree = 0;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
degree = vertex_->in_edges.size();
delta = vertex_->delta;
@ -693,7 +674,6 @@ Result<size_t> VertexAccessor::OutDegree(View view) const {
size_t degree = 0;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
degree = vertex_->out_edges.size();
delta = vertex_->delta;