diff --git a/src/kvstore/kvstore.cpp b/src/kvstore/kvstore.cpp index 37f9f503d..383324238 100644 --- a/src/kvstore/kvstore.cpp +++ b/src/kvstore/kvstore.cpp @@ -72,7 +72,7 @@ std::optional<std::string> KVStore::Get(const std::string &key) const noexcept { return value; } -bool KVStore::Delete(const std::string &key) { +bool KVStore::Delete(std::string_view key) { auto s = pimpl_->db->Delete(rocksdb::WriteOptions(), key); return s.ok(); } diff --git a/src/kvstore/kvstore.hpp b/src/kvstore/kvstore.hpp index cefadc485..d378b2439 100644 --- a/src/kvstore/kvstore.hpp +++ b/src/kvstore/kvstore.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -91,7 +91,7 @@ class KVStore final { * true if the key doesn't exist and underlying storage * didn't encounter any error. */ - bool Delete(const std::string &key); + bool Delete(std::string_view key); /** * Deletes the keys and corresponding values from storage. diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index c9d4cd550..c48da7bca 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -334,6 +334,8 @@ class DiskStorage final : public Storage { uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); + void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); } + std::unique_ptr<RocksDBStorage> kvstore_; std::unique_ptr<kvstore::KVStore> durability_kvstore_; }; diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index f102cfd7e..6a39dd6b2 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -33,15 +33,16 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; namespace { inline constexpr uint16_t kEpochHistoryRetention = 1000; -std::string RegisterReplicaErrorToString(InMemoryStorage::RegisterReplicaError error) { +std::string RegisterReplicaErrorToString(InMemoryStorage::ReplicationState::RegisterReplicaError error) { + using enum InMemoryStorage::ReplicationState::RegisterReplicaError; switch (error) { - case InMemoryStorage::RegisterReplicaError::NAME_EXISTS: + case NAME_EXISTS: return "NAME_EXISTS"; - case InMemoryStorage::RegisterReplicaError::END_POINT_EXISTS: + case END_POINT_EXISTS: return "END_POINT_EXISTS"; - case InMemoryStorage::RegisterReplicaError::CONNECTION_FAILED: + case CONNECTION_FAILED: return "CONNECTION_FAILED"; - case InMemoryStorage::RegisterReplicaError::COULD_NOT_BE_PERSISTED: + case COULD_NOT_BE_PERSISTED: return "COULD_NOT_BE_PERSISTED"; } } @@ -153,12 +154,13 @@ InMemoryStorage::InMemoryStorage(Config config) if (config_.durability.restore_replication_state_on_startup) { spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash."); utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory); - storage_ = + // TODO: Move this to replication + replication_state_.durability_ = std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory); RestoreReplicationRole(); - if (replication_role_ == replication::ReplicationRole::MAIN) { + if (replication_state_.GetRole() == replication::ReplicationRole::MAIN) { RestoreReplicas(); } } else { @@ -168,7 +170,7 @@ InMemoryStorage::InMemoryStorage(Config config) } if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && - replication_role_ == replication::ReplicationRole::MAIN) { + replication_state_.GetRole() == replication::ReplicationRole::MAIN) { spdlog::warn( "The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider " "enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because " @@ -182,8 +184,7 @@ InMemoryStorage::~InMemoryStorage() { } { // Clear replication data - replication_server_.reset(); - replication_clients_.WithLock([&](auto &clients) { clients.clear(); }); + replication_state_.Reset(); } if (wal_file_) { wal_file_->FinalizeWal(); @@ -241,7 +242,7 @@ VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex() { return {&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_}; } -VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex(storage::Gid gid) { +VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertexEx(storage::Gid gid) { OOMExceptionEnabler oom_exception; // NOTE: When we update the next `vertex_id_` here we perform a RMW // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue @@ -448,8 +449,8 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso &storage_->constraints_, config_); } -Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, - EdgeTypeId edge_type, storage::Gid gid) { +Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, + EdgeTypeId edge_type, storage::Gid gid) { OOMExceptionEnabler oom_exception; MG_ASSERT(from->transaction_ == to->transaction_, "VertexAccessors must be from the same transaction when creating " @@ -702,7 +703,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory // modifications before they are written to disk. // Replica can log only the write transaction received from Main // so the Wal files are consistent - if (mem_storage->replication_role_ == replication::ReplicationRole::MAIN || + if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { could_replicate_all_sync_replicas = mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_); @@ -718,7 +719,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // Replica can only update the last commit timestamp with // the commits received from main. - if (mem_storage->replication_role_ == replication::ReplicationRole::MAIN || + if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { // Update the last commit timestamp mem_storage->last_commit_timestamp_.store(*commit_timestamp_); @@ -1155,7 +1156,7 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S // of any query on replica to the last commited transaction // which is timestamp_ as only commit of transaction with writes // can change the value of it. - if (replication_role_ == replication::ReplicationRole::REPLICA) { + if (replication_state_.GetRole() == replication::ReplicationRole::REPLICA) { start_timestamp = timestamp_; } else { start_timestamp = timestamp_++; @@ -1519,184 +1520,166 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - if (replication_role_.load() == replication::ReplicationRole::MAIN) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->StartTransactionReplication(wal_file_->SequenceNumber()); - } - }); - } + replication_state_.InitializeTransaction(wal_file_->SequenceNumber()); - // Helper lambda that traverses the delta chain on order to find the first - // delta that should be processed and then appends all discovered deltas. - auto find_and_apply_deltas = [&](const auto *delta, const auto &parent, auto filter) { - while (true) { - auto *older = delta->next.load(std::memory_order_acquire); - if (older == nullptr || older->timestamp->load(std::memory_order_acquire) != current_commit_timestamp) break; - delta = older; - } - while (true) { - if (filter(delta->action)) { - wal_file_->AppendDelta(*delta, parent, final_commit_timestamp); - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction( - [&](auto &stream) { stream.AppendDelta(*delta, parent, final_commit_timestamp); }); - } - }); + auto append_deltas = [&](auto callback) { + // Helper lambda that traverses the delta chain on order to find the first + // delta that should be processed and then appends all discovered deltas. + auto find_and_apply_deltas = [&](const auto *delta, const auto &parent, auto filter) { + while (true) { + auto *older = delta->next.load(std::memory_order_acquire); + if (older == nullptr || older->timestamp->load(std::memory_order_acquire) != current_commit_timestamp) break; + delta = older; } - auto prev = delta->prev.Get(); + while (true) { + if (filter(delta->action)) { + callback(*delta, parent, final_commit_timestamp); + } + auto prev = delta->prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::DELTA) break; + delta = prev.delta; + } + }; + + // The deltas are ordered correctly in the `transaction.deltas` buffer, but we + // don't traverse them in that order. That is because for each delta we need + // information about the vertex or edge they belong to and that information + // isn't stored in the deltas themselves. In order to find out information + // about the corresponding vertex or edge it is necessary to traverse the + // delta chain for each delta until a vertex or edge is encountered. This + // operation is very expensive as the chain grows. + // Instead, we traverse the edges until we find a vertex or edge and traverse + // their delta chains. This approach has a drawback because we lose the + // correct order of the operations. Because of that, we need to traverse the + // deltas several times and we have to manually ensure that the stored deltas + // will be ordered correctly. + + // 1. Process all Vertex deltas and store all operations that create vertices + // and modify vertex data. + for (const auto &delta : transaction.deltas) { + auto prev = delta.prev.Get(); MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::DELTA) break; - delta = prev.delta; + if (prev.type != PreviousPtr::Type::VERTEX) continue; + find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { + switch (action) { + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + return true; + + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + return false; + } + }); + } + // 2. Process all Vertex deltas and store all operations that create edges. + for (const auto &delta : transaction.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::VERTEX) continue; + find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { + switch (action) { + case Delta::Action::REMOVE_OUT_EDGE: + return true; + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + return false; + } + }); + } + // 3. Process all Edge deltas and store all operations that modify edge data. + for (const auto &delta : transaction.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::EDGE) continue; + find_and_apply_deltas(&delta, *prev.edge, [](auto action) { + switch (action) { + case Delta::Action::SET_PROPERTY: + return true; + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + return false; + } + }); + } + // 4. Process all Vertex deltas and store all operations that delete edges. + for (const auto &delta : transaction.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::VERTEX) continue; + find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { + switch (action) { + case Delta::Action::ADD_OUT_EDGE: + return true; + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + return false; + } + }); + } + // 5. Process all Vertex deltas and store all operations that delete vertices. + for (const auto &delta : transaction.deltas) { + auto prev = delta.prev.Get(); + MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); + if (prev.type != PreviousPtr::Type::VERTEX) continue; + find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { + switch (action) { + case Delta::Action::RECREATE_OBJECT: + return true; + case Delta::Action::DELETE_DESERIALIZED_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + return false; + } + }); } }; - // The deltas are ordered correctly in the `transaction.deltas` buffer, but we - // don't traverse them in that order. That is because for each delta we need - // information about the vertex or edge they belong to and that information - // isn't stored in the deltas themselves. In order to find out information - // about the corresponding vertex or edge it is necessary to traverse the - // delta chain for each delta until a vertex or edge is encountered. This - // operation is very expensive as the chain grows. - // Instead, we traverse the edges until we find a vertex or edge and traverse - // their delta chains. This approach has a drawback because we lose the - // correct order of the operations. Because of that, we need to traverse the - // deltas several times and we have to manually ensure that the stored deltas - // will be ordered correctly. - - // 1. Process all Vertex deltas and store all operations that create vertices - // and modify vertex data. - for (const auto &delta : transaction.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) continue; - find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { - switch (action) { - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: - case Delta::Action::SET_PROPERTY: - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - return true; - - case Delta::Action::RECREATE_OBJECT: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: - return false; - } - }); - } - // 2. Process all Vertex deltas and store all operations that create edges. - for (const auto &delta : transaction.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) continue; - find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { - switch (action) { - case Delta::Action::REMOVE_OUT_EDGE: - return true; - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: - case Delta::Action::RECREATE_OBJECT: - case Delta::Action::SET_PROPERTY: - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - return false; - } - }); - } - // 3. Process all Edge deltas and store all operations that modify edge data. - for (const auto &delta : transaction.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::EDGE) continue; - find_and_apply_deltas(&delta, *prev.edge, [](auto action) { - switch (action) { - case Delta::Action::SET_PROPERTY: - return true; - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: - case Delta::Action::RECREATE_OBJECT: - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: - return false; - } - }); - } - // 4. Process all Vertex deltas and store all operations that delete edges. - for (const auto &delta : transaction.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) continue; - find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { - switch (action) { - case Delta::Action::ADD_OUT_EDGE: - return true; - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: - case Delta::Action::RECREATE_OBJECT: - case Delta::Action::SET_PROPERTY: - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: - return false; - } - }); - } - // 5. Process all Vertex deltas and store all operations that delete vertices. - for (const auto &delta : transaction.deltas) { - auto prev = delta.prev.Get(); - MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!"); - if (prev.type != PreviousPtr::Type::VERTEX) continue; - find_and_apply_deltas(&delta, *prev.vertex, [](auto action) { - switch (action) { - case Delta::Action::RECREATE_OBJECT: - return true; - case Delta::Action::DELETE_DESERIALIZED_OBJECT: - case Delta::Action::DELETE_OBJECT: - case Delta::Action::SET_PROPERTY: - case Delta::Action::ADD_LABEL: - case Delta::Action::REMOVE_LABEL: - case Delta::Action::ADD_IN_EDGE: - case Delta::Action::ADD_OUT_EDGE: - case Delta::Action::REMOVE_IN_EDGE: - case Delta::Action::REMOVE_OUT_EDGE: - return false; - } - }); - } - - // Add a delta that indicates that the transaction is fully written to the WAL - // file. - wal_file_->AppendTransactionEnd(final_commit_timestamp); - - FinalizeWalFile(); - - auto finalized_on_all_replicas = true; - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); }); - const auto finalized = client->FinalizeTransactionReplication(); - - if (client->Mode() == replication::ReplicationMode::SYNC) { - finalized_on_all_replicas = finalized && finalized_on_all_replicas; - } - } + append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) { + wal_file_->AppendDelta(delta, parent, timestamp); + replication_state_.AppendDelta(delta, parent, timestamp); }); - return finalized_on_all_replicas; + // Add a delta that indicates that the transaction is fully written to the WAL + // file.replication_clients_.WithLock + wal_file_->AppendTransactionEnd(final_commit_timestamp); + FinalizeWalFile(); + + return replication_state_.FinalizeTransaction(final_commit_timestamp); } bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label, @@ -1706,31 +1689,15 @@ bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperati return true; } - auto finalized_on_all_replicas = true; wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); - { - if (replication_role_.load() == replication::ReplicationRole::MAIN) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->StartTransactionReplication(wal_file_->SequenceNumber()); - client->IfStreamingTransaction( - [&](auto &stream) { stream.AppendOperation(operation, label, properties, final_commit_timestamp); }); - - const auto finalized = client->FinalizeTransactionReplication(); - if (client->Mode() == replication::ReplicationMode::SYNC) { - finalized_on_all_replicas = finalized && finalized_on_all_replicas; - } - } - }); - } - } FinalizeWalFile(); - return finalized_on_all_replicas; + return replication_state_.AppendToWalDataDefinition(wal_file_->SequenceNumber(), operation, label, properties, + final_commit_timestamp); } utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot( std::optional<bool> is_periodic) { - if (replication_role_.load() != replication::ReplicationRole::MAIN) { + if (replication_state_.GetRole() != replication::ReplicationRole::MAIN) { return CreateSnapshotError::DisabledForReplica; } @@ -1796,41 +1763,54 @@ uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_ return *desired_commit_timestamp; } -bool InMemoryStorage::SetReplicaRole(io::network::Endpoint endpoint, - const replication::ReplicationServerConfig &config) { - // We don't want to restart the server if we're already a REPLICA - if (replication_role_ == replication::ReplicationRole::REPLICA) { - return false; +void InMemoryStorage::EstablishNewEpoch() { + std::unique_lock engine_guard{engine_lock_}; + if (wal_file_) { + wal_file_->FinalizeWal(); + wal_file_.reset(); } - auto port = endpoint.port; // assigning because we will move the endpoint - replication_server_ = std::make_unique<ReplicationServer>(this, std::move(endpoint), config); + // Generate new epoch id and save the last one to the history. + if (epoch_history_.size() == kEpochHistoryRetention) { + epoch_history_.pop_front(); + } + epoch_history_.emplace_back(std::exchange(epoch_id_, utils::GenerateUUID()), last_commit_timestamp_); +} - if (ShouldStoreAndRestoreReplicationState()) { - // Only thing that matters here is the role saved as REPLICA and the listening port - auto data = replication::ReplicationStatusToJSON( - replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, - .ip_address = "", - .port = port, - .sync_mode = replication::ReplicationMode::SYNC, - .replica_check_frequency = std::chrono::seconds(0), - .ssl = std::nullopt, - .role = replication::ReplicationRole::REPLICA}); +utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() { + auto locker_accessor = global_locker_.Access(); + return locker_accessor.IsPathLocked(config_.durability.storage_directory); +} - if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) { - spdlog::error("Error when saving REPLICA replication role in settings."); - return false; +utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::LockPath() { + auto locker_accessor = global_locker_.Access(); + return locker_accessor.AddPath(config_.durability.storage_directory); +} + +utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() { + { + auto locker_accessor = global_locker_.Access(); + const auto ret = locker_accessor.RemovePath(config_.durability.storage_directory); + if (ret.HasError() || !ret.GetValue()) { + // Exit without cleaning the queue + return ret; } } - - replication_role_.store(replication::ReplicationRole::REPLICA); + // We use locker accessor in seperate scope so we don't produce deadlock + // after we call clean queue. + file_retainer_.CleanQueue(); return true; } -bool InMemoryStorage::SetMainReplicationRole() { +void storage::InMemoryStorage::ReplicationState::Reset() { + replication_server_.reset(); + replication_clients_.WithLock([&](auto &clients) { clients.clear(); }); +} + +bool storage::InMemoryStorage::ReplicationState::SetMainReplicationRole(storage::Storage *storage) { // We don't want to generate new epoch_id and do the // cleanup if we're already a MAIN - if (replication_role_ == replication::ReplicationRole::MAIN) { + if (GetRole() == replication::ReplicationRole::MAIN) { return false; } @@ -1838,20 +1818,7 @@ bool InMemoryStorage::SetMainReplicationRole() { // This should be always called first so we finalize everything replication_server_.reset(nullptr); - { - std::unique_lock engine_guard{engine_lock_}; - if (wal_file_) { - wal_file_->FinalizeWal(); - wal_file_.reset(); - } - - // Generate new epoch id and save the last one to the history. - if (epoch_history_.size() == kEpochHistoryRetention) { - epoch_history_.pop_front(); - } - epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_); - epoch_id_ = utils::GenerateUUID(); - } + storage->EstablishNewEpoch(); if (ShouldStoreAndRestoreReplicationState()) { // Only thing that matters here is the role saved as MAIN @@ -1864,21 +1831,90 @@ bool InMemoryStorage::SetMainReplicationRole() { .ssl = std::nullopt, .role = replication::ReplicationRole::MAIN}); - if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) { + if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) { spdlog::error("Error when saving MAIN replication role in settings."); return false; } } - replication_role_.store(replication::ReplicationRole::MAIN); + SetRole(replication::ReplicationRole::MAIN); return true; } -utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::RegisterReplica( - std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, - const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { - MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, - "Only main instance can register a replica!"); +bool storage::InMemoryStorage::ReplicationState::AppendToWalDataDefinition(const uint64_t seq_num, + durability::StorageGlobalOperation operation, + LabelId label, + const std::set<PropertyId> &properties, + uint64_t final_commit_timestamp) { + bool finalized_on_all_replicas = true; + // TODO Should we return true if not MAIN? + if (GetRole() == replication::ReplicationRole::MAIN) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->StartTransactionReplication(seq_num); + client->IfStreamingTransaction( + [&](auto &stream) { stream.AppendOperation(operation, label, properties, final_commit_timestamp); }); + + const auto finalized = client->FinalizeTransactionReplication(); + if (client->Mode() == replication::ReplicationMode::SYNC) { + finalized_on_all_replicas = finalized && finalized_on_all_replicas; + } + } + }); + } + return finalized_on_all_replicas; +} + +void storage::InMemoryStorage::ReplicationState::InitializeTransaction(uint64_t seq_num) { + if (GetRole() == replication::ReplicationRole::MAIN) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->StartTransactionReplication(seq_num); + } + }); + } +} + +void storage::InMemoryStorage::ReplicationState::AppendDelta(const Delta &delta, const Edge &parent, + uint64_t timestamp) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); }); + } + }); +} + +void storage::InMemoryStorage::ReplicationState::AppendDelta(const Delta &delta, const Vertex &parent, + uint64_t timestamp) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); }); + } + }); +} + +bool storage::InMemoryStorage::ReplicationState::FinalizeTransaction(uint64_t timestamp) { + bool finalized_on_all_replicas = true; + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); }); + const auto finalized = client->FinalizeTransactionReplication(); + + if (client->Mode() == replication::ReplicationMode::SYNC) { + finalized_on_all_replicas = finalized && finalized_on_all_replicas; + } + } + }); + return finalized_on_all_replicas; +} + +utils::BasicResult<InMemoryStorage::ReplicationState::RegisterReplicaError> +InMemoryStorage::ReplicationState::RegisterReplica(std::string name, io::network::Endpoint endpoint, + const replication::ReplicationMode replication_mode, + const replication::RegistrationMode registration_mode, + const replication::ReplicationClientConfig &config, + InMemoryStorage *storage) { + MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!"); const bool name_exists = replication_clients_.WithLock([&](auto &clients) { return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; }); @@ -1906,13 +1942,13 @@ utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::Regis .replica_check_frequency = config.replica_check_frequency, .ssl = config.ssl, .role = replication::ReplicationRole::REPLICA}); - if (!storage_->Put(name, data.dump())) { + if (!durability_->Put(name, data.dump())) { spdlog::error("Error when saving replica {} in settings.", name); return RegisterReplicaError::COULD_NOT_BE_PERSISTED; } } - auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config); + auto client = std::make_unique<ReplicationClient>(std::move(name), storage, endpoint, replication_mode, config); if (client->State() == replication::ReplicaState::INVALID) { if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) { @@ -1922,29 +1958,62 @@ utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::Regis spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); } - return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<InMemoryStorage::RegisterReplicaError> { - // Another thread could have added a client with same name while - // we were connecting to this client. - if (std::any_of(clients.begin(), clients.end(), - [&](const auto &other_client) { return client->Name() == other_client->Name(); })) { - return RegisterReplicaError::NAME_EXISTS; - } + return replication_clients_.WithLock( + [&](auto &clients) -> utils::BasicResult<InMemoryStorage::ReplicationState::RegisterReplicaError> { + // Another thread could have added a client with same name while + // we were connecting to this client. + if (std::any_of(clients.begin(), clients.end(), + [&](const auto &other_client) { return client->Name() == other_client->Name(); })) { + return RegisterReplicaError::NAME_EXISTS; + } - if (std::any_of(clients.begin(), clients.end(), - [&client](const auto &other_client) { return client->Endpoint() == other_client->Endpoint(); })) { - return RegisterReplicaError::END_POINT_EXISTS; - } + if (std::any_of(clients.begin(), clients.end(), [&client](const auto &other_client) { + return client->Endpoint() == other_client->Endpoint(); + })) { + return RegisterReplicaError::END_POINT_EXISTS; + } - clients.push_back(std::move(client)); - return {}; - }); + clients.push_back(std::move(client)); + return {}; + }); } -bool InMemoryStorage::UnregisterReplica(const std::string &name) { - MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, - "Only main instance can unregister a replica!"); +bool InMemoryStorage::ReplicationState::SetReplicaRole(io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config, + InMemoryStorage *storage) { + // We don't want to restart the server if we're already a REPLICA + if (GetRole() == replication::ReplicationRole::REPLICA) { + return false; + } + + auto port = endpoint.port; // assigning because we will move the endpoint + replication_server_ = std::make_unique<ReplicationServer>(storage, std::move(endpoint), config); + if (ShouldStoreAndRestoreReplicationState()) { - if (!storage_->Delete(name)) { + // Only thing that matters here is the role saved as REPLICA and the listening port + auto data = replication::ReplicationStatusToJSON( + replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, + .ip_address = "", + .port = port, + .sync_mode = replication::ReplicationMode::SYNC, + .replica_check_frequency = std::chrono::seconds(0), + .ssl = std::nullopt, + .role = replication::ReplicationRole::REPLICA}); + + if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) { + spdlog::error("Error when saving REPLICA replication role in settings."); + return false; + } + } + + SetRole(replication::ReplicationRole::REPLICA); + return true; +} + +bool InMemoryStorage::ReplicationState::UnregisterReplica(std::string_view name) { + MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can unregister a replica!"); + if (ShouldStoreAndRestoreReplicationState()) { + if (!durability_->Delete(name)) { spdlog::error("Error when removing replica {} from settings.", name); return false; } @@ -1955,7 +2024,8 @@ bool InMemoryStorage::UnregisterReplica(const std::string &name) { }); } -std::optional<replication::ReplicaState> InMemoryStorage::GetReplicaState(const std::string_view name) { +std::optional<replication::ReplicaState> InMemoryStorage::ReplicationState::GetReplicaState( + const std::string_view name) { return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> { const auto client_it = std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; }); @@ -1966,9 +2036,7 @@ std::optional<replication::ReplicaState> InMemoryStorage::GetReplicaState(const }); } -replication::ReplicationRole InMemoryStorage::GetReplicationRole() const { return replication_role_; } - -std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicasInfo() { +std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicationState::ReplicasInfo() { return replication_clients_.WithLock([](auto &clients) { std::vector<InMemoryStorage::ReplicaInfo> replica_info; replica_info.reserve(clients.size()); @@ -1980,7 +2048,7 @@ std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicasInfo() { }); } -void InMemoryStorage::RestoreReplicationRole() { +void InMemoryStorage::ReplicationState::RestoreReplicationRole(InMemoryStorage *storage) { if (!ShouldStoreAndRestoreReplicationState()) { return; } @@ -1988,7 +2056,7 @@ void InMemoryStorage::RestoreReplicationRole() { spdlog::info("Restoring replication role."); uint16_t port = replication::kDefaultReplicationPort; - const auto replication_data = storage_->Get(replication::kReservedReplicationRoleName); + const auto replication_data = durability_->Get(replication::kReservedReplicationRoleName); if (!replication_data.has_value()) { spdlog::debug("Cannot find data needed for restore replication role in persisted metadata."); return; @@ -2002,29 +2070,29 @@ void InMemoryStorage::RestoreReplicationRole() { const auto replication_status = *maybe_replication_status; if (!replication_status.role.has_value()) { - replication_role_.store(replication::ReplicationRole::MAIN); + SetRole(replication::ReplicationRole::MAIN); } else { - replication_role_.store(*replication_status.role); + SetRole(*replication_status.role); port = replication_status.port; } - if (replication_role_ == replication::ReplicationRole::REPLICA) { + if (GetRole() == replication::ReplicationRole::REPLICA) { io::network::Endpoint endpoint(replication::kDefaultReplicationServerIp, port); replication_server_ = - std::make_unique<ReplicationServer>(this, std::move(endpoint), replication::ReplicationServerConfig{}); + std::make_unique<ReplicationServer>(storage, std::move(endpoint), replication::ReplicationServerConfig{}); } spdlog::info("Replication role restored to {}.", - replication_role_ == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA"); + GetRole() == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA"); } -void InMemoryStorage::RestoreReplicas() { +void InMemoryStorage::ReplicationState::RestoreReplicas(InMemoryStorage *storage) { if (!ShouldStoreAndRestoreReplicationState()) { return; } spdlog::info("Restoring replicas."); - for (const auto &[replica_name, replica_data] : *storage_) { + for (const auto &[replica_name, replica_data] : *durability_) { spdlog::info("Restoring replica {}.", replica_name); const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data)); @@ -2046,7 +2114,8 @@ void InMemoryStorage::RestoreReplicas() { { .replica_check_frequency = replica_status.replica_check_frequency, .ssl = replica_status.ssl, - }); + }, + storage); if (ret.HasError()) { MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); @@ -2056,31 +2125,4 @@ void InMemoryStorage::RestoreReplicas() { } } -bool InMemoryStorage::ShouldStoreAndRestoreReplicationState() const { return nullptr != storage_; } - -utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() { - auto locker_accessor = global_locker_.Access(); - return locker_accessor.IsPathLocked(config_.durability.storage_directory); -} - -utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::LockPath() { - auto locker_accessor = global_locker_.Access(); - return locker_accessor.AddPath(config_.durability.storage_directory); -} - -utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() { - { - auto locker_accessor = global_locker_.Access(); - const auto ret = locker_accessor.RemovePath(config_.durability.storage_directory); - if (ret.HasError() || !ret.GetValue()) { - // Exit without cleaning the queue - return ret; - } - } - // We use locker accessor in seperate scope so we don't produce deadlock - // after we call clean queue. - file_retainer_.CleanQueue(); - return true; -} - } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index b45fce0ef..100b9ab8f 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -11,6 +11,7 @@ #pragma once +#include <cstddef> #include "storage/v2/inmemory/label_index.hpp" #include "storage/v2/inmemory/label_property_index.hpp" #include "storage/v2/storage.hpp" @@ -25,20 +26,19 @@ namespace memgraph::storage { +class ReplicationServer; +class ReplicationClient; + // The storage is based on this paper: // https://db.in.tum.de/~muehlbau/papers/mvcc.pdf // The paper implements a fully serializable storage, in our implementation we // only implement snapshot isolation for transactions. class InMemoryStorage final : public Storage { - public: - enum class RegisterReplicaError : uint8_t { - NAME_EXISTS, - END_POINT_EXISTS, - CONNECTION_FAILED, - COULD_NOT_BE_PERSISTED - }; + friend class ReplicationServer; + friend class ReplicationClient; + public: struct TimestampInfo { uint64_t current_timestamp_of_replica; uint64_t current_number_of_timestamp_behind_master; @@ -69,7 +69,7 @@ class InMemoryStorage final : public Storage { ~InMemoryStorage() override; - class InMemoryAccessor final : public Storage::Accessor { + class InMemoryAccessor : public Storage::Accessor { private: friend class InMemoryStorage; @@ -268,16 +268,33 @@ class InMemoryStorage final : public Storage { void FinalizeTransaction() override; - private: + protected: + // TODO Better naming /// @throw std::bad_alloc - VertexAccessor CreateVertex(storage::Gid gid); - + VertexAccessor CreateVertexEx(storage::Gid gid); /// @throw std::bad_alloc - Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid); + Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid); Config::Items config_; }; + class ReplicationAccessor final : public InMemoryAccessor { + public: + explicit ReplicationAccessor(InMemoryAccessor &&inmem) : InMemoryAccessor(std::move(inmem)) {} + + /// @throw std::bad_alloc + VertexAccessor CreateVertexEx(storage::Gid gid) { return InMemoryAccessor::CreateVertexEx(gid); } + + /// @throw std::bad_alloc + Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, + storage::Gid gid) { + return InMemoryAccessor::CreateEdgeEx(from, to, edge_type, gid); + } + + const Transaction &GetTransaction() const { return transaction_; } + Transaction &GetTransaction() { return transaction_; } + }; + std::unique_ptr<Storage::Accessor> Access(std::optional<IsolationLevel> override_isolation_level) override { return std::unique_ptr<InMemoryAccessor>( new InMemoryAccessor{this, override_isolation_level.value_or(isolation_level_), storage_mode_}); @@ -359,24 +376,32 @@ class InMemoryStorage final : public Storage { utils::BasicResult<StorageUniqueConstraintDroppingError, UniqueConstraints::DeletionStatus> DropUniqueConstraint( LabelId label, const std::set<PropertyId> &properties, std::optional<uint64_t> desired_commit_timestamp) override; - bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config); + bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { + return replication_state_.SetReplicaRole(std::move(endpoint), config, this); + } - bool SetMainReplicationRole(); + bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); } /// @pre The instance should have a MAIN role /// @pre Timeout can only be set for SYNC replication - utils::BasicResult<RegisterReplicaError, void> RegisterReplica(std::string name, io::network::Endpoint endpoint, - replication::ReplicationMode replication_mode, - replication::RegistrationMode registration_mode, - const replication::ReplicationClientConfig &config); + auto RegisterReplica(std::string name, io::network::Endpoint endpoint, + const replication::ReplicationMode replication_mode, + const replication::RegistrationMode registration_mode, + const replication::ReplicationClientConfig &config) { + return replication_state_.RegisterReplica(std::move(name), std::move(endpoint), replication_mode, registration_mode, + config, this); + } + /// @pre The instance should have a MAIN role - bool UnregisterReplica(const std::string &name); + bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); } - std::optional<replication::ReplicaState> GetReplicaState(std::string_view name); + replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); } - replication::ReplicationRole GetReplicationRole() const; + auto ReplicasInfo() { return replication_state_.ReplicasInfo(); } - std::vector<ReplicaInfo> ReplicasInfo(); + std::optional<replication::ReplicaState> GetReplicaState(std::string_view name) { + return replication_state_.GetReplicaState(name); + } void FreeMemory(std::unique_lock<utils::RWLock> main_guard) override; @@ -416,11 +441,11 @@ class InMemoryStorage final : public Storage { uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); - void RestoreReplicas(); + void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); } - void RestoreReplicationRole(); + void RestoreReplicas() { return replication_state_.RestoreReplicas(this); } - bool ShouldStoreAndRestoreReplicationState() const; + void EstablishNewEpoch() override; // Main object storage utils::SkipList<storage::Vertex> vertices_; @@ -430,7 +455,6 @@ class InMemoryStorage final : public Storage { std::filesystem::path snapshot_directory_; std::filesystem::path lock_file_path_; utils::OutputFile lock_file_handle_; - std::unique_ptr<kvstore::KVStore> storage_; std::filesystem::path wal_directory_; utils::Scheduler snapshot_runner_; @@ -455,7 +479,10 @@ class InMemoryStorage final : public Storage { // and register it on S1. // Without using the epoch_id, we don't know that S1 and S2 have completely // different transactions, we think that the S2 is behind only by 5 commits. - std::string epoch_id_; + std::string epoch_id_; // TODO: Move to replication level + // Questions: + // - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id) + // - multi-tenant durability <- databases/.durability (there is a list of all active tenants) // History of the previous epoch ids. // Each value consists of the epoch id along the last commit belonging to that // epoch. @@ -499,24 +526,80 @@ class InMemoryStorage final : public Storage { std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId}; - class ReplicationServer; - std::unique_ptr<ReplicationServer> replication_server_{nullptr}; + public: + struct ReplicationState { + enum class RegisterReplicaError : uint8_t { + NAME_EXISTS, + END_POINT_EXISTS, + CONNECTION_FAILED, + COULD_NOT_BE_PERSISTED + }; - class ReplicationClient; - // We create ReplicationClient using unique_ptr so we can move - // newly created client into the vector. - // We cannot move the client directly because it contains ThreadPool - // which cannot be moved. Also, the move is necessary because - // we don't want to create the client directly inside the vector - // because that would require the lock on the list putting all - // commits (they iterate list of clients) to halt. - // This way we can initialize client in main thread which means - // that we can immediately notify the user if the initialization - // failed. - using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>; - ReplicationClientList replication_clients_; + // TODO Move to private (needed for Storage construction) + std::unique_ptr<kvstore::KVStore> durability_; - std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN}; + // Generic API + void Reset(); + // TODO: Just check if server exists -> you are REPLICA + replication::ReplicationRole GetRole() const { return replication_role_.load(); } + + bool SetMainReplicationRole(Storage *storage); // Set the instance to MAIN + // TODO: ReplicationServer/Client uses InMemoryStorage* for RPC callbacks + bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config, + InMemoryStorage *storage); // Sets the instance to REPLICA + // Generic restoration + void RestoreReplicationRole(InMemoryStorage *storage); + + // MAIN actually doing the replication + bool AppendToWalDataDefinition(uint64_t seq_num, durability::StorageGlobalOperation operation, LabelId label, + const std::set<PropertyId> &properties, uint64_t final_commit_timestamp); + void InitializeTransaction(uint64_t seq_num); + void AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp); + void AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp); + bool FinalizeTransaction(uint64_t timestamp); + + // MAIN connecting to replicas + utils::BasicResult<RegisterReplicaError> RegisterReplica(std::string name, io::network::Endpoint endpoint, + replication::ReplicationMode replication_mode, + replication::RegistrationMode registration_mode, + const replication::ReplicationClientConfig &config, + InMemoryStorage *storage); + bool UnregisterReplica(std::string_view name); + + // MAIN reconnecting to replicas + void RestoreReplicas(InMemoryStorage *storage); + + // MAIN getting info from replicas + // TODO make into const (problem with SpinLock and WithReadLock) + std::optional<replication::ReplicaState> GetReplicaState(std::string_view name); + std::vector<InMemoryStorage::ReplicaInfo> ReplicasInfo(); + + private: + bool ShouldStoreAndRestoreReplicationState() const { return nullptr != durability_; } + + void SetRole(replication::ReplicationRole role) { return replication_role_.store(role); } + + // NOTE: Server is not in MAIN it is in REPLICA + std::unique_ptr<ReplicationServer> replication_server_{nullptr}; + + // We create ReplicationClient using unique_ptr so we can move + // newly created client into the vector. + // We cannot move the client directly because it contains ThreadPool + // which cannot be moved. Also, the move is necessary because + // we don't want to create the client directly inside the vector + // because that would require the lock on the list putting all + // commits (they iterate list of clients) to halt. + // This way we can initialize client in main thread which means + // that we can immediately notify the user if the initialization + // failed. + using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>; + ReplicationClientList replication_clients_; + + std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN}; + }; + + private: + ReplicationState replication_state_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index c36bf5caf..f939ad7ed 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -30,10 +30,9 @@ template <typename> } // namespace ////// ReplicationClient ////// -InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemoryStorage *storage, - const io::network::Endpoint &endpoint, - const replication::ReplicationMode mode, - const replication::ReplicationClientConfig &config) +ReplicationClient::ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint, + const replication::ReplicationMode mode, + const replication::ReplicationClientConfig &config) : name_(std::move(name)), storage_(storage), mode_(mode) { spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); if (config.ssl) { @@ -42,7 +41,6 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory rpc_context_.emplace(); } - rpc_client_.emplace(endpoint, &*rpc_context_); TryInitializeClientSync(); @@ -52,14 +50,14 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory } } -void InMemoryStorage::ReplicationClient::TryInitializeClientAsync() { +void ReplicationClient::TryInitializeClientAsync() { thread_pool_.AddTask([this] { rpc_client_->Abort(); this->TryInitializeClientSync(); }); } -void InMemoryStorage::ReplicationClient::FrequentCheck() { +void ReplicationClient::FrequentCheck() { const auto is_success = std::invoke([this]() { try { auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()}; @@ -85,7 +83,7 @@ void InMemoryStorage::ReplicationClient::FrequentCheck() { } /// @throws rpc::RpcFailedException -void InMemoryStorage::ReplicationClient::InitializeClient() { +void ReplicationClient::InitializeClient() { uint64_t current_commit_timestamp{kTimestampInitialId}; std::optional<std::string> epoch_id; @@ -137,7 +135,7 @@ void InMemoryStorage::ReplicationClient::InitializeClient() { } } -void InMemoryStorage::ReplicationClient::TryInitializeClientSync() { +void ReplicationClient::TryInitializeClientSync() { try { InitializeClient(); } catch (const rpc::RpcFailedException &) { @@ -148,20 +146,19 @@ void InMemoryStorage::ReplicationClient::TryInitializeClientSync() { } } -void InMemoryStorage::ReplicationClient::HandleRpcFailure() { +void ReplicationClient::HandleRpcFailure() { spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication")); TryInitializeClientAsync(); } -replication::SnapshotRes InMemoryStorage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) { +replication::SnapshotRes ReplicationClient::TransferSnapshot(const std::filesystem::path &path) { auto stream{rpc_client_->Stream<replication::SnapshotRpc>()}; replication::Encoder encoder(stream.GetBuilder()); encoder.WriteFile(path); return stream.AwaitResponse(); } -replication::WalFilesRes InMemoryStorage::ReplicationClient::TransferWalFiles( - const std::vector<std::filesystem::path> &wal_files) { +replication::WalFilesRes ReplicationClient::TransferWalFiles(const std::vector<std::filesystem::path> &wal_files) { MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); auto stream{rpc_client_->Stream<replication::WalFilesRpc>(wal_files.size())}; replication::Encoder encoder(stream.GetBuilder()); @@ -173,7 +170,7 @@ replication::WalFilesRes InMemoryStorage::ReplicationClient::TransferWalFiles( return stream.AwaitResponse(); } -void InMemoryStorage::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) { +void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) { std::unique_lock guard(client_lock_); const auto status = replica_state_.load(); switch (status) { @@ -207,8 +204,7 @@ void InMemoryStorage::ReplicationClient::StartTransactionReplication(const uint6 } } -void InMemoryStorage::ReplicationClient::IfStreamingTransaction( - const std::function<void(ReplicaStream &handler)> &callback) { +void ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback) { // We can only check the state because it guarantees to be only // valid during a single transaction replication (if the assumption // that this and other transaction replication functions can only be @@ -228,7 +224,7 @@ void InMemoryStorage::ReplicationClient::IfStreamingTransaction( } } -bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplication() { +bool ReplicationClient::FinalizeTransactionReplication() { // We can only check the state because it guarantees to be only // valid during a single transaction replication (if the assumption // that this and other transaction replication functions can only be @@ -245,7 +241,7 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplication() { } } -bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal() { +bool ReplicationClient::FinalizeTransactionReplicationInternal() { MG_ASSERT(replica_stream_, "Missing stream for transaction deltas"); try { auto response = replica_stream_->Finalize(); @@ -269,7 +265,7 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal( return false; } -void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { +void ReplicationClient::RecoverReplica(uint64_t replica_commit) { spdlog::debug("Starting replica recover"); while (true) { auto file_locker = storage_->file_retainer_.AddLocker(); @@ -337,7 +333,7 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) } } -uint64_t InMemoryStorage::ReplicationClient::ReplicateCurrentWal() { +uint64_t ReplicationClient::ReplicateCurrentWal() { const auto &wal_file = storage_->wal_file_; auto stream = TransferCurrentWalFile(); stream.AppendFilename(wal_file->Path().filename()); @@ -371,7 +367,7 @@ uint64_t InMemoryStorage::ReplicationClient::ReplicateCurrentWal() { /// recovery steps, so we can safely send it to the replica. /// We assume that the property of preserving at least 1 WAL before the snapshot /// is satisfied as we extract the timestamp information from it. -std::vector<InMemoryStorage::ReplicationClient::RecoveryStep> InMemoryStorage::ReplicationClient::GetRecoverySteps( +std::vector<ReplicationClient::RecoveryStep> ReplicationClient::GetRecoverySteps( const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) { // First check if we can recover using the current wal file only // otherwise save the seq_num of the current wal file @@ -517,7 +513,7 @@ std::vector<InMemoryStorage::ReplicationClient::RecoveryStep> InMemoryStorage::R return recovery_steps; } -InMemoryStorage::TimestampInfo InMemoryStorage::ReplicationClient::GetTimestampInfo() { +InMemoryStorage::TimestampInfo ReplicationClient::GetTimestampInfo() { InMemoryStorage::TimestampInfo info; info.current_timestamp_of_replica = 0; info.current_number_of_timestamp_behind_master = 0; @@ -546,71 +542,63 @@ InMemoryStorage::TimestampInfo InMemoryStorage::ReplicationClient::GetTimestampI } ////// ReplicaStream ////// -InMemoryStorage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self, - const uint64_t previous_commit_timestamp, - const uint64_t current_seq_num) +ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_commit_timestamp, + const uint64_t current_seq_num) : self_(self), stream_(self_->rpc_client_->Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) { replication::Encoder encoder{stream_.GetBuilder()}; encoder.WriteString(self_->storage_->epoch_id_); } -void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, - uint64_t final_commit_timestamp) { +void ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, + uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); EncodeDelta(&encoder, self_->storage_->name_id_mapper_.get(), self_->storage_->config_.items, delta, vertex, final_commit_timestamp); } -void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge, - uint64_t final_commit_timestamp) { +void ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge, + uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); EncodeDelta(&encoder, self_->storage_->name_id_mapper_.get(), delta, edge, final_commit_timestamp); } -void InMemoryStorage::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) { +void ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); EncodeTransactionEnd(&encoder, final_commit_timestamp); } -void InMemoryStorage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation, - LabelId label, - const std::set<PropertyId> &properties, - uint64_t timestamp) { +void ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation, LabelId label, + const std::set<PropertyId> &properties, uint64_t timestamp) { replication::Encoder encoder(stream_.GetBuilder()); EncodeOperation(&encoder, self_->storage_->name_id_mapper_.get(), operation, label, properties, timestamp); } -replication::AppendDeltasRes InMemoryStorage::ReplicationClient::ReplicaStream::Finalize() { - return stream_.AwaitResponse(); -} +replication::AppendDeltasRes ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); } ////// CurrentWalHandler ////// -InMemoryStorage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) +ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) : self_(self), stream_(self_->rpc_client_->Stream<replication::CurrentWalRpc>()) {} -void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) { +void ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) { replication::Encoder encoder(stream_.GetBuilder()); encoder.WriteString(filename); } -void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) { +void ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) { replication::Encoder encoder(stream_.GetBuilder()); encoder.WriteUint(size); } -void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) { +void ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) { replication::Encoder encoder(stream_.GetBuilder()); encoder.WriteFileData(file); } -void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, - const size_t buffer_size) { +void ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { replication::Encoder encoder(stream_.GetBuilder()); encoder.WriteBuffer(buffer, buffer_size); } -replication::CurrentWalRes InMemoryStorage::ReplicationClient::CurrentWalHandler::Finalize() { - return stream_.AwaitResponse(); -} +replication::CurrentWalRes ReplicationClient::CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 2ae88b91d..6babc57c1 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -37,7 +37,7 @@ namespace memgraph::storage { -class InMemoryStorage::ReplicationClient { +class ReplicationClient { public: ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint, replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {}); diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index bb3893251..7549eb69b 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -44,8 +44,8 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder }; } // namespace -InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint, - const replication::ReplicationServerConfig &config) +ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config) : storage_(storage), endpoint_(endpoint) { // Create RPC server. if (config.ssl) { @@ -92,21 +92,21 @@ InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, rpc_server_->Start(); } -void InMemoryStorage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::HeartbeatReq req; slk::Load(&req, req_reader); replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_}; slk::Save(res, res_builder); } -void InMemoryStorage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::FrequentHeartbeatReq req; slk::Load(&req, req_reader); replication::FrequentHeartbeatRes res{true}; slk::Save(res, res_builder); } -void InMemoryStorage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::AppendDeltasReq req; slk::Load(&req, req_reader); @@ -155,7 +155,7 @@ void InMemoryStorage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_re spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!"); } -void InMemoryStorage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::SnapshotReq req; slk::Load(&req, req_reader); @@ -228,7 +228,7 @@ void InMemoryStorage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader spdlog::debug("Replication recovery from snapshot finished!"); } -void InMemoryStorage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::WalFilesReq req; slk::Load(&req, req_reader); @@ -248,7 +248,7 @@ void InMemoryStorage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!"); } -void InMemoryStorage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::CurrentWalReq req; slk::Load(&req, req_reader); @@ -263,7 +263,7 @@ void InMemoryStorage::ReplicationServer::CurrentWalHandler(slk::Reader *req_read spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!"); } -void InMemoryStorage::ReplicationServer::LoadWal(replication::Decoder *decoder) { +void ReplicationServer::LoadWal(replication::Decoder *decoder) { const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory; utils::EnsureDir(temp_wal_directory); auto maybe_wal_path = decoder->ReadFile(temp_wal_directory); @@ -308,7 +308,7 @@ void InMemoryStorage::ReplicationServer::LoadWal(replication::Decoder *decoder) } } -void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) { +void ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::TimestampReq req; slk::Load(&req, req_reader); @@ -316,30 +316,28 @@ void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reade slk::Save(res, res_builder); } -InMemoryStorage::ReplicationServer::~ReplicationServer() { +ReplicationServer::~ReplicationServer() { if (rpc_server_) { spdlog::trace("Closing replication server on {}:{}", endpoint_.address, endpoint_.port); rpc_server_->Shutdown(); rpc_server_->AwaitShutdown(); } } -uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) { +uint64_t ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) { auto edge_acc = storage_->edges_.access(); auto vertex_acc = storage_->vertices_.access(); - std::optional<std::pair<uint64_t, std::unique_ptr<storage::Storage::Accessor>>> commit_timestamp_and_accessor; + std::optional<std::pair<uint64_t, storage::InMemoryStorage::ReplicationAccessor>> commit_timestamp_and_accessor; auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) { if (!commit_timestamp_and_accessor) { - commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access(std::optional<IsolationLevel>{})); + auto acc = storage_->Access(std::nullopt); + auto inmem_acc = std::unique_ptr<storage::InMemoryStorage::InMemoryAccessor>( + static_cast<storage::InMemoryStorage::InMemoryAccessor *>(acc.release())); + commit_timestamp_and_accessor.emplace(commit_timestamp, std::move(*inmem_acc)); } else if (commit_timestamp_and_accessor->first != commit_timestamp) { throw utils::BasicException("Received more than one transaction!"); } - // TODO: Rethink this if we would reuse ReplicationServer for on disk storage. - if (auto *inmemoryAcc = - static_cast<storage::InMemoryStorage::InMemoryAccessor *>(commit_timestamp_and_accessor->second.get())) { - return inmemoryAcc; - } - throw utils::BasicException("Received transaction for not supported storage!"); + return &commit_timestamp_and_accessor->second; }; uint64_t applied_deltas = 0; @@ -362,7 +360,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD case durability::WalDeltaData::Type::VERTEX_CREATE: { spdlog::trace(" Create vertex {}", delta.vertex_create_delete.gid.AsUint()); auto transaction = get_transaction(timestamp); - transaction->CreateVertex(delta.vertex_create_delete.gid); + transaction->CreateVertexEx(delta.vertex_create_delete.gid); break; } case durability::WalDeltaData::Type::VERTEX_DELETE: { @@ -414,9 +412,9 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD if (!from_vertex) throw utils::BasicException("Invalid transaction!"); auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, storage::View::NEW); if (!to_vertex) throw utils::BasicException("Invalid transaction!"); - auto edge = transaction->CreateEdge(&*from_vertex, &*to_vertex, - transaction->NameToEdgeType(delta.edge_create_delete.edge_type), - delta.edge_create_delete.gid); + auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex, + transaction->NameToEdgeType(delta.edge_create_delete.edge_type), + delta.edge_create_delete.gid); if (edge.HasError()) throw utils::BasicException("Invalid transaction!"); break; } @@ -463,7 +461,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD is_visible = !edge->deleted; delta = edge->delta; } - ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW, [&is_visible](const Delta &delta) { + ApplyDeltasForRead(&transaction->GetTransaction(), delta, View::NEW, [&is_visible](const Delta &delta) { switch (delta.action) { case Delta::Action::ADD_LABEL: case Delta::Action::REMOVE_LABEL: @@ -496,7 +494,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD EdgeTypeId::FromUint(0UL), nullptr, nullptr, - &transaction->transaction_, + &transaction->GetTransaction(), &storage_->indices_, &storage_->constraints_, storage_->config_.items}; @@ -511,7 +509,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD spdlog::trace(" Transaction end"); if (!commit_timestamp_and_accessor || commit_timestamp_and_accessor->first != timestamp) throw utils::BasicException("Invalid data!"); - auto ret = commit_timestamp_and_accessor->second->Commit(commit_timestamp_and_accessor->first); + auto ret = commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first); if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); commit_timestamp_and_accessor = std::nullopt; break; diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index 3bd027995..937c20f00 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -18,7 +18,7 @@ namespace memgraph::storage { -class InMemoryStorage::ReplicationServer { +class ReplicationServer { public: explicit ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config); diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 0ff286d15..8d33c3eb3 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -277,6 +277,8 @@ class Storage { virtual Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) = 0; + virtual void EstablishNewEpoch() = 0; + // Main storage lock. // Accessors take a shared lock when starting, so it is possible to block // creation of new accessors by taking a unique lock. This is used when doing diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index ed076e0bf..52c81950a 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -747,12 +747,12 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { .HasError()); const std::string replica2_name{replicas[0]}; - ASSERT_TRUE(main_mem_store - ->RegisterReplica(replica2_name, replica2_endpoint, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::NAME_EXISTS); + ASSERT_TRUE( + main_mem_store + ->RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, + memgraph::storage::replication::ReplicationClientConfig{}) + .GetError() == memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::NAME_EXISTS); } TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { @@ -779,12 +779,12 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { .HasError()); const std::string replica2_name{replicas[1]}; - ASSERT_TRUE(main_mem_store - ->RegisterReplica(replica2_name, replica2_endpoint, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::END_POINT_EXISTS); + ASSERT_TRUE( + main_mem_store + ->RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, + memgraph::storage::replication::ReplicationClientConfig{}) + .GetError() == memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::END_POINT_EXISTS); } TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { @@ -907,5 +907,6 @@ TEST_F(ReplicationTest, AddingInvalidReplica) { memgraph::storage::replication::ReplicationMode::SYNC, memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::CONNECTION_FAILED); + .GetError() == + memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::CONNECTION_FAILED); }