From d71b6a50071862df1aeae6b0dc62d70631c04ca4 Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd Date: Fri, 29 Sep 2023 11:21:42 +0100 Subject: [PATCH] Refactor replication client/server (#1311) --- src/query/interpreter.cpp | 18 +- src/storage/v2/disk/storage.hpp | 5 +- .../replication/replication_client.cpp | 5 +- .../replication/replication_client.hpp | 3 +- .../replication/replication_server.cpp | 4 +- .../replication/replication_server.hpp | 3 +- src/storage/v2/inmemory/storage.cpp | 10 +- src/storage/v2/inmemory/storage.hpp | 5 +- src/storage/v2/replication/config.hpp | 11 +- src/storage/v2/replication/replication.cpp | 135 ++-- src/storage/v2/replication/replication.hpp | 17 +- .../v2/replication/replication_client.cpp | 10 +- .../v2/replication/replication_client.hpp | 3 +- .../v2/replication/replication_server.cpp | 5 +- .../v2/replication/replication_server.hpp | 2 +- src/storage/v2/storage.hpp | 18 +- tests/unit/storage_v2_replication.cpp | 732 ++++++++++-------- 17 files changed, 514 insertions(+), 472 deletions(-) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index b8e980cb7..a4f3ce5b8 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -238,9 +238,10 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { if (!port || *port < 0 || *port > std::numeric_limits::max()) { throw QueryRuntimeException("Port number invalid!"); } - if (!db_->SetReplicaRole( - io::network::Endpoint(storage::replication::kDefaultReplicationServerIp, static_cast(*port)), - storage::replication::ReplicationServerConfig{})) { + if (!db_->SetReplicaRole(storage::replication::ReplicationServerConfig{ + .ip_address = storage::replication::kDefaultReplicationServerIp, + .port = static_cast(*port), + })) { throw QueryRuntimeException("Couldn't set role to replica!"); } } @@ -286,9 +287,14 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort); if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; - auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, - storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - {.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}); + auto ret = db_->RegisterReplica( + storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, + storage::replication::ReplicationClientConfig{.name = name, + .mode = repl_mode, + .ip_address = ip, + .port = port, + .replica_check_frequency = replica_check_frequency, + .ssl = std::nullopt}); if (ret.HasError()) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); } diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index a3e3d4e62..73a389687 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -375,13 +375,12 @@ class DiskStorage final : public Storage { EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE}; std::unique_ptr edge_import_mode_cache_{nullptr}; - auto CreateReplicationClient(std::string name, io::network::Endpoint endpoint, replication::ReplicationMode mode, - const replication::ReplicationClientConfig &config) + auto CreateReplicationClient(replication::ReplicationClientConfig const &config) -> std::unique_ptr override { throw utils::BasicException("Disk storage mode does not support replication."); } - auto CreateReplicationServer(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) + auto CreateReplicationServer(const replication::ReplicationServerConfig &config) -> std::unique_ptr override { throw utils::BasicException("Disk storage mode does not support replication."); } diff --git a/src/storage/v2/inmemory/replication/replication_client.cpp b/src/storage/v2/inmemory/replication/replication_client.cpp index fc368d31c..bfd272a9b 100644 --- a/src/storage/v2/inmemory/replication/replication_client.cpp +++ b/src/storage/v2/inmemory/replication/replication_client.cpp @@ -102,10 +102,9 @@ uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile cons ////// ReplicationClient ////// -InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, std::string name, - io::network::Endpoint endpoint, replication::ReplicationMode mode, +InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, const replication::ReplicationClientConfig &config) - : ReplicationClient{storage, std::move(name), std::move(endpoint), mode, config} {} + : ReplicationClient{storage, config} {} void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) { spdlog::debug("Starting replica recover"); diff --git a/src/storage/v2/inmemory/replication/replication_client.hpp b/src/storage/v2/inmemory/replication/replication_client.hpp index de939835b..94f47cb48 100644 --- a/src/storage/v2/inmemory/replication/replication_client.hpp +++ b/src/storage/v2/inmemory/replication/replication_client.hpp @@ -18,8 +18,7 @@ class InMemoryStorage; class InMemoryReplicationClient : public ReplicationClient { public: - InMemoryReplicationClient(InMemoryStorage *storage, std::string name, io::network::Endpoint endpoint, - replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {}); + InMemoryReplicationClient(InMemoryStorage *storage, const replication::ReplicationClientConfig &config); protected: void RecoverReplica(uint64_t replica_commit) override; diff --git a/src/storage/v2/inmemory/replication/replication_server.cpp b/src/storage/v2/inmemory/replication/replication_server.cpp index 95b818f9f..0104ed7bc 100644 --- a/src/storage/v2/inmemory/replication/replication_server.cpp +++ b/src/storage/v2/inmemory/replication/replication_server.cpp @@ -32,9 +32,9 @@ std::pair ReadDelta(durability::BaseDecoder }; } // namespace -InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage, memgraph::io::network::Endpoint endpoint, +InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage, const replication::ReplicationServerConfig &config) - : ReplicationServer{std::move(endpoint), config}, storage_(storage) { + : ReplicationServer{config}, storage_(storage) { rpc_server_.Register([this](auto *req_reader, auto *res_builder) { spdlog::debug("Received HeartbeatRpc"); this->HeartbeatHandler(req_reader, res_builder); diff --git a/src/storage/v2/inmemory/replication/replication_server.hpp b/src/storage/v2/inmemory/replication/replication_server.hpp index 76dcf4a9c..1f6f380af 100644 --- a/src/storage/v2/inmemory/replication/replication_server.hpp +++ b/src/storage/v2/inmemory/replication/replication_server.hpp @@ -20,8 +20,7 @@ class InMemoryStorage; class InMemoryReplicationServer : public ReplicationServer { public: - explicit InMemoryReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint, - const replication::ReplicationServerConfig &config); + explicit InMemoryReplicationServer(InMemoryStorage *storage, const replication::ReplicationServerConfig &config); private: // RPC handlers diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 007cdf36f..55d0b0b47 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -1832,16 +1832,14 @@ utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() return true; } -auto InMemoryStorage::CreateReplicationClient(std::string name, io::network::Endpoint endpoint, - replication::ReplicationMode mode, - replication::ReplicationClientConfig const &config) +auto InMemoryStorage::CreateReplicationClient(replication::ReplicationClientConfig const &config) -> std::unique_ptr { - return std::make_unique(this, std::move(name), std::move(endpoint), mode, config); + return std::make_unique(this, config); } std::unique_ptr InMemoryStorage::CreateReplicationServer( - io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { - return std::make_unique(this, std::move(endpoint), config); + const replication::ReplicationServerConfig &config) { + return std::make_unique(this, config); } } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 301c8b139..136f8748b 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -371,11 +371,10 @@ class InMemoryStorage final : public Storage { Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) override; - auto CreateReplicationClient(std::string name, io::network::Endpoint endpoint, replication::ReplicationMode mode, - replication::ReplicationClientConfig const &config) + auto CreateReplicationClient(replication::ReplicationClientConfig const &config) -> std::unique_ptr override; - auto CreateReplicationServer(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) + auto CreateReplicationServer(const replication::ReplicationServerConfig &config) -> std::unique_ptr override; private: diff --git a/src/storage/v2/replication/config.hpp b/src/storage/v2/replication/config.hpp index b418a60d4..d96e1ee76 100644 --- a/src/storage/v2/replication/config.hpp +++ b/src/storage/v2/replication/config.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -15,8 +15,15 @@ #include #include +#include "storage/v2/replication/enums.hpp" + namespace memgraph::storage::replication { struct ReplicationClientConfig { + std::string name; + ReplicationMode mode; + std::string ip_address; + uint16_t port; + // The default delay between main checking/pinging replicas is 1s because // that seems like a reasonable timeframe in which main should notice a // replica is down. @@ -33,6 +40,8 @@ struct ReplicationClientConfig { }; struct ReplicationServerConfig { + std::string ip_address; + uint16_t port; struct SSL { std::string key_file; std::string cert_file; diff --git a/src/storage/v2/replication/replication.cpp b/src/storage/v2/replication/replication.cpp index 66431a3f4..59830a9ec 100644 --- a/src/storage/v2/replication/replication.cpp +++ b/src/storage/v2/replication/replication.cpp @@ -23,8 +23,8 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; namespace { -std::string RegisterReplicaErrorToString(ReplicationState::RegisterReplicaError error) { - using enum ReplicationState::RegisterReplicaError; +std::string RegisterReplicaErrorToString(RegisterReplicaError error) { + using enum RegisterReplicaError; switch (error) { case NAME_EXISTS: return "NAME_EXISTS"; @@ -147,84 +147,75 @@ bool storage::ReplicationState::FinalizeTransaction(uint64_t timestamp) { return finalized_on_all_replicas; } -utils::BasicResult ReplicationState::RegisterReplica( - std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, +utils::BasicResult ReplicationState::RegisterReplica( const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config, Storage *storage) { MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!"); - const bool name_exists = replication_clients_.WithLock([&](auto &clients) { - return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; }); - }); + auto name_check = [&config](auto &clients) { + auto name_matches = [&name = config.name](const auto &client) { return client->Name() == name; }; + return std::any_of(clients.begin(), clients.end(), name_matches); + }; - if (name_exists) { - return RegisterReplicaError::NAME_EXISTS; - } + auto desired_endpoint = io::network::Endpoint{config.ip_address, config.port}; + auto endpoint_check = [&](auto &clients) { + auto endpoint_matches = [&](const auto &client) { return client->Endpoint() == desired_endpoint; }; + return std::any_of(clients.begin(), clients.end(), endpoint_matches); + }; - const auto end_point_exists = replication_clients_.WithLock([&endpoint](auto &clients) { - return std::any_of(clients.begin(), clients.end(), - [&endpoint](const auto &client) { return client->Endpoint() == endpoint; }); - }); + auto task = [&](auto &clients) -> utils::BasicResult { + if (name_check(clients)) { + return RegisterReplicaError::NAME_EXISTS; + } - if (end_point_exists) { - return RegisterReplicaError::END_POINT_EXISTS; - } + if (endpoint_check(clients)) { + return RegisterReplicaError::END_POINT_EXISTS; + } - if (ShouldStoreAndRestoreReplicationState()) { - auto data = replication::ReplicationStatusToJSON( - replication::ReplicationStatus{.name = name, - .ip_address = endpoint.address, - .port = endpoint.port, - .sync_mode = replication_mode, - .replica_check_frequency = config.replica_check_frequency, - .ssl = config.ssl, - .role = replication::ReplicationRole::REPLICA}); - if (!durability_->Put(name, data.dump())) { - spdlog::error("Error when saving replica {} in settings.", name); + if (!TryPersistReplicaClient(config)) { return RegisterReplicaError::COULD_NOT_BE_PERSISTED; } - } - auto client = storage->CreateReplicationClient(std::move(name), std::move(endpoint), replication_mode, config); - client->Start(); + auto client = storage->CreateReplicationClient(config); + client->Start(); - if (client->State() == replication::ReplicaState::INVALID) { - if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) { - return RegisterReplicaError::CONNECTION_FAILED; + if (client->State() == replication::ReplicaState::INVALID) { + if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) { + return RegisterReplicaError::CONNECTION_FAILED; + } + + spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); } - spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); - } + clients.push_back(std::move(client)); + return {}; + }; - return replication_clients_.WithLock( - [&](auto &clients) -> utils::BasicResult { - // Another thread could have added a client with same name while - // we were connecting to this client. - if (std::any_of(clients.begin(), clients.end(), - [&](const auto &other_client) { return client->Name() == other_client->Name(); })) { - return RegisterReplicaError::NAME_EXISTS; - } - - if (std::any_of(clients.begin(), clients.end(), [&client](const auto &other_client) { - return client->Endpoint() == other_client->Endpoint(); - })) { - return RegisterReplicaError::END_POINT_EXISTS; - } - - clients.push_back(std::move(client)); - return {}; - }); + return replication_clients_.WithLock(task); } -bool ReplicationState::SetReplicaRole(io::network::Endpoint endpoint, - const replication::ReplicationServerConfig &config, Storage *storage) { +bool ReplicationState::TryPersistReplicaClient(const replication::ReplicationClientConfig &config) { + if (!ShouldStoreAndRestoreReplicationState()) return true; + auto data = replication::ReplicationStatusToJSON( + replication::ReplicationStatus{.name = config.name, + .ip_address = config.ip_address, + .port = config.port, + .sync_mode = config.mode, + .replica_check_frequency = config.replica_check_frequency, + .ssl = config.ssl, + .role = replication::ReplicationRole::REPLICA}); + if (durability_->Put(config.name, data.dump())) return true; + spdlog::error("Error when saving replica {} in settings.", config.name); + return false; +} + +bool ReplicationState::SetReplicaRole(const replication::ReplicationServerConfig &config, Storage *storage) { // We don't want to restart the server if we're already a REPLICA if (GetRole() == replication::ReplicationRole::REPLICA) { return false; } - auto port = endpoint.port; // assigning because we will move the endpoint - replication_server_ = storage->CreateReplicationServer(std::move(endpoint), config); + replication_server_ = storage->CreateReplicationServer(config); bool res = replication_server_->Start(); if (!res) { spdlog::error("Unable to start the replication server."); @@ -235,8 +226,8 @@ bool ReplicationState::SetReplicaRole(io::network::Endpoint endpoint, // Only thing that matters here is the role saved as REPLICA and the listening port auto data = replication::ReplicationStatusToJSON( replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, - .ip_address = "", - .port = port, + .ip_address = config.ip_address, + .port = config.port, .sync_mode = replication::ReplicationMode::SYNC, .replica_check_frequency = std::chrono::seconds(0), .ssl = std::nullopt, @@ -318,8 +309,10 @@ void ReplicationState::RestoreReplicationRole(Storage *storage) { } if (GetRole() == replication::ReplicationRole::REPLICA) { - io::network::Endpoint endpoint(replication::kDefaultReplicationServerIp, port); - replication_server_ = storage->CreateReplicationServer(std::move(endpoint), {}); + replication_server_ = storage->CreateReplicationServer(replication::ReplicationServerConfig{ + .ip_address = replication::kDefaultReplicationServerIp, + .port = port, + }); bool res = replication_server_->Start(); if (!res) { LOG_FATAL("Unable to start the replication server."); @@ -352,14 +345,16 @@ void ReplicationState::RestoreReplicas(Storage *storage) { continue; } - auto ret = - RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port}, - replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID, - { - .replica_check_frequency = replica_status.replica_check_frequency, - .ssl = replica_status.ssl, - }, - storage); + auto ret = RegisterReplica(replication::RegistrationMode::CAN_BE_INVALID, + replication::ReplicationClientConfig{ + .name = replica_status.name, + .mode = replica_status.sync_mode, + .ip_address = replica_status.ip_address, + .port = replica_status.port, + .replica_check_frequency = replica_status.replica_check_frequency, + .ssl = replica_status.ssl, + }, + storage); if (ret.HasError()) { MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); diff --git a/src/storage/v2/replication/replication.hpp b/src/storage/v2/replication/replication.hpp index 18306542f..afa3f7d06 100644 --- a/src/storage/v2/replication/replication.hpp +++ b/src/storage/v2/replication/replication.hpp @@ -32,14 +32,9 @@ class Storage; class ReplicationServer; class ReplicationClient; -struct ReplicationState { - enum class RegisterReplicaError : uint8_t { - NAME_EXISTS, - END_POINT_EXISTS, - CONNECTION_FAILED, - COULD_NOT_BE_PERSISTED - }; +enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED, COULD_NOT_BE_PERSISTED }; +struct ReplicationState { // TODO: This mirrors the logic in InMemoryConstructor; make it independent ReplicationState(bool restore, std::filesystem::path durability_dir); @@ -50,7 +45,7 @@ struct ReplicationState { bool SetMainReplicationRole(Storage *storage); // Set the instance to MAIN // TODO: ReplicationServer/Client uses Storage* for RPC callbacks - bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config, + bool SetReplicaRole(const replication::ReplicationServerConfig &config, Storage *storage); // Sets the instance to REPLICA // Generic restoration void RestoreReplicationRole(Storage *storage); @@ -64,9 +59,7 @@ struct ReplicationState { bool FinalizeTransaction(uint64_t timestamp); // MAIN connecting to replicas - utils::BasicResult RegisterReplica(std::string name, io::network::Endpoint endpoint, - const replication::ReplicationMode replication_mode, - const replication::RegistrationMode registration_mode, + utils::BasicResult RegisterReplica(const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config, Storage *storage); bool UnregisterReplica(std::string_view name); @@ -97,8 +90,8 @@ struct ReplicationState { void AppendEpoch(std::string new_epoch); private: + bool TryPersistReplicaClient(const replication::ReplicationClientConfig &config); bool ShouldStoreAndRestoreReplicationState() const { return nullptr != durability_; } - void SetRole(replication::ReplicationRole role) { return replication_role_.store(role); } // NOTE: Server is not in MAIN it is in REPLICA diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 5b22e6628..e4817929f 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -30,14 +30,12 @@ static auto CreateClientContext(const replication::ReplicationClientConfig &conf : communication::ClientContext{}; } -ReplicationClient::ReplicationClient(Storage *storage, std::string name, memgraph::io::network::Endpoint endpoint, - replication::ReplicationMode mode, - replication::ReplicationClientConfig const &config) - : name_{std::move(name)}, +ReplicationClient::ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config) + : name_{config.name}, rpc_context_{CreateClientContext(config)}, - rpc_client_{std::move(endpoint), &rpc_context_}, + rpc_client_{io::network::Endpoint(config.ip_address, config.port), &rpc_context_}, replica_check_frequency_{config.replica_check_frequency}, - mode_{mode}, + mode_{config.mode}, storage_{storage} {} ReplicationClient::~ReplicationClient() { diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 76c2150a9..fb3d1eb5c 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -66,8 +66,7 @@ class ReplicationClient { friend class ReplicaStream; public: - ReplicationClient(Storage *storage, std::string name, memgraph::io::network::Endpoint endpoint, - replication::ReplicationMode mode, const replication::ReplicationClientConfig &config); + ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config); ReplicationClient(ReplicationClient const &) = delete; ReplicationClient &operator=(ReplicationClient const &) = delete; diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 1601db16c..f64e2cd16 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -30,9 +30,10 @@ auto CreateServerContext(const replication::ReplicationServerConfig &config) -> constexpr auto kReplictionServerThreads = 1; } // namespace -ReplicationServer::ReplicationServer(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) +ReplicationServer::ReplicationServer(const replication::ReplicationServerConfig &config) : rpc_server_context_{CreateServerContext(config)}, - rpc_server_{std::move(endpoint), &rpc_server_context_, kReplictionServerThreads} { + rpc_server_{io::network::Endpoint{config.ip_address, config.port}, &rpc_server_context_, + kReplictionServerThreads} { rpc_server_.Register([](auto *req_reader, auto *res_builder) { spdlog::debug("Received FrequentHeartbeatRpc"); FrequentHeartbeatHandler(req_reader, res_builder); diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index 408d1efbd..ca036fd3a 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -20,7 +20,7 @@ namespace memgraph::storage { class ReplicationServer { public: - explicit ReplicationServer(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config); + explicit ReplicationServer(const replication::ReplicationServerConfig &config); ReplicationServer(const ReplicationServer &) = delete; ReplicationServer(ReplicationServer &&) = delete; ReplicationServer &operator=(const ReplicationServer &) = delete; diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 7d73388c7..608421228 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -315,29 +315,23 @@ class Storage { virtual void EstablishNewEpoch() = 0; - virtual auto CreateReplicationClient(std::string name, io::network::Endpoint endpoint, - replication::ReplicationMode mode, - replication::ReplicationClientConfig const &config) + virtual auto CreateReplicationClient(replication::ReplicationClientConfig const &config) -> std::unique_ptr = 0; - virtual auto CreateReplicationServer(io::network::Endpoint endpoint, - replication::ReplicationServerConfig const &config) + virtual auto CreateReplicationServer(const replication::ReplicationServerConfig &config) -> std::unique_ptr = 0; /// REPLICATION - bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { - return replication_state_.SetReplicaRole(std::move(endpoint), config, this); + bool SetReplicaRole(const replication::ReplicationServerConfig &config) { + return replication_state_.SetReplicaRole(config, this); } bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); } /// @pre The instance should have a MAIN role /// @pre Timeout can only be set for SYNC replication - auto RegisterReplica(std::string name, io::network::Endpoint endpoint, - const replication::ReplicationMode replication_mode, - const replication::RegistrationMode registration_mode, + auto RegisterReplica(const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { - return replication_state_.RegisterReplica(std::move(name), std::move(endpoint), replication_mode, registration_mode, - config, this); + return replication_state_.RegisterReplica(registration_mode, config, this); } /// @pre The instance should have a MAIN role bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); } diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 94312bba5..bbf55796a 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -27,6 +27,21 @@ using testing::UnorderedElementsAre; +using memgraph::storage::Config; +using memgraph::storage::EdgeAccessor; +using memgraph::storage::Gid; +using memgraph::storage::InMemoryStorage; +using memgraph::storage::PropertyValue; +using memgraph::storage::RegisterReplicaError; +using memgraph::storage::Storage; +using memgraph::storage::View; +using memgraph::storage::replication::RegistrationMode; +using memgraph::storage::replication::ReplicaState; +using memgraph::storage::replication::ReplicationClientConfig; +using memgraph::storage::replication::ReplicationMode; +using memgraph::storage::replication::ReplicationRole; +using memgraph::storage::replication::ReplicationServerConfig; + class ReplicationTest : public ::testing::Test { protected: std::filesystem::path storage_directory{std::filesystem::temp_directory_path() / @@ -35,12 +50,11 @@ class ReplicationTest : public ::testing::Test { void TearDown() override { Clear(); } - memgraph::storage::Config configuration{ - .items = {.properties_on_edges = true}, - .durability = { - .storage_directory = storage_directory, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }}; + Config configuration{.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + }}; const std::string local_host = ("127.0.0.1"); const std::array ports{10000, 20000}; @@ -54,22 +68,22 @@ class ReplicationTest : public ::testing::Test { }; TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { - std::unique_ptr main_store = - std::make_unique(configuration); - std::unique_ptr replica_store = - std::make_unique(configuration); + std::unique_ptr main_store = std::make_unique(configuration); + std::unique_ptr replica_store = std::make_unique(configuration); - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store = static_cast(replica_store.get()); + replica_store->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - replica_mem_store->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); - - ASSERT_FALSE(main_mem_store - ->RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); // vertex create @@ -78,32 +92,30 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { const auto *vertex_label = "vertex_label"; const auto *vertex_property = "vertex_property"; const auto *vertex_property_value = "vertex_property_value"; - std::optional vertex_gid; + std::optional vertex_gid; { auto acc = main_store->Access(); auto v = acc->CreateVertex(); vertex_gid.emplace(v.Gid()); ASSERT_TRUE(v.AddLabel(main_store->NameToLabel(vertex_label)).HasValue()); - ASSERT_TRUE(v.SetProperty(main_store->NameToProperty(vertex_property), - memgraph::storage::PropertyValue(vertex_property_value)) - .HasValue()); + ASSERT_TRUE( + v.SetProperty(main_store->NameToProperty(vertex_property), PropertyValue(vertex_property_value)).HasValue()); ASSERT_FALSE(acc->Commit().HasError()); } { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto labels = v->Labels(memgraph::storage::View::OLD); + const auto labels = v->Labels(View::OLD); ASSERT_TRUE(labels.HasValue()); ASSERT_EQ(labels->size(), 1); ASSERT_THAT(*labels, UnorderedElementsAre(replica_store->NameToLabel(vertex_label))); - const auto properties = v->Properties(memgraph::storage::View::OLD); + const auto properties = v->Properties(View::OLD); ASSERT_TRUE(properties.HasValue()); ASSERT_EQ(properties->size(), 1); - ASSERT_THAT(*properties, - UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(vertex_property), - memgraph::storage::PropertyValue(vertex_property_value)))); + ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(vertex_property), + PropertyValue(vertex_property_value)))); ASSERT_FALSE(acc->Commit().HasError()); } @@ -111,7 +123,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { // vertex remove label { auto acc = main_store->Access(); - auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_TRUE(v->RemoveLabel(main_store->NameToLabel(vertex_label)).HasValue()); ASSERT_FALSE(acc->Commit().HasError()); @@ -119,9 +131,9 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto labels = v->Labels(memgraph::storage::View::OLD); + const auto labels = v->Labels(View::OLD); ASSERT_TRUE(labels.HasValue()); ASSERT_EQ(labels->size(), 0); ASSERT_FALSE(acc->Commit().HasError()); @@ -130,7 +142,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { // vertex delete { auto acc = main_store->Access(); - auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_TRUE(acc->DeleteVertex(&*v).HasValue()); ASSERT_FALSE(acc->Commit().HasError()); @@ -138,7 +150,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_FALSE(v); vertex_gid.reset(); ASSERT_FALSE(acc->Commit().HasError()); @@ -149,7 +161,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { const auto *edge_type = "edge_type"; const auto *edge_property = "edge_property"; const auto *edge_property_value = "edge_property_value"; - std::optional edge_gid; + std::optional edge_gid; { auto acc = main_store->Access(); auto v = acc->CreateVertex(); @@ -157,15 +169,13 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { auto edgeRes = acc->CreateEdge(&v, &v, main_store->NameToEdgeType(edge_type)); ASSERT_TRUE(edgeRes.HasValue()); auto edge = edgeRes.GetValue(); - ASSERT_TRUE(edge.SetProperty(main_store->NameToProperty(edge_property), - memgraph::storage::PropertyValue(edge_property_value)) - .HasValue()); + ASSERT_TRUE( + edge.SetProperty(main_store->NameToProperty(edge_property), PropertyValue(edge_property_value)).HasValue()); edge_gid.emplace(edge.Gid()); ASSERT_FALSE(acc->Commit().HasError()); } - const auto find_edge = [&](const auto &edges, - const memgraph::storage::Gid edge_gid) -> std::optional { + const auto find_edge = [&](const auto &edges, const Gid edge_gid) -> std::optional { for (const auto &edge : edges) { if (edge.Gid() == edge_gid) { return edge; @@ -176,27 +186,26 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto out_edges = v->OutEdges(memgraph::storage::View::OLD); + const auto out_edges = v->OutEdges(View::OLD); ASSERT_TRUE(out_edges.HasValue()); const auto edge = find_edge(out_edges->edges, *edge_gid); ASSERT_EQ(edge->EdgeType(), replica_store->NameToEdgeType(edge_type)); - const auto properties = edge->Properties(memgraph::storage::View::OLD); + const auto properties = edge->Properties(View::OLD); ASSERT_TRUE(properties.HasValue()); ASSERT_EQ(properties->size(), 1); - ASSERT_THAT(*properties, - UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(edge_property), - memgraph::storage::PropertyValue(edge_property_value)))); + ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(edge_property), + PropertyValue(edge_property_value)))); ASSERT_FALSE(acc->Commit().HasError()); } // delete edge { auto acc = main_store->Access(); - auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - auto out_edges = v->OutEdges(memgraph::storage::View::OLD); + auto out_edges = v->OutEdges(View::OLD); auto edge = find_edge(out_edges->edges, *edge_gid); ASSERT_TRUE(edge); ASSERT_TRUE(acc->DeleteEdge(&*edge).HasValue()); @@ -205,9 +214,9 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto out_edges = v->OutEdges(memgraph::storage::View::OLD); + const auto out_edges = v->OutEdges(View::OLD); ASSERT_TRUE(out_edges.HasValue()); ASSERT_FALSE(find_edge(out_edges->edges, *edge_gid)); ASSERT_FALSE(acc->Commit().HasError()); @@ -280,65 +289,71 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { } TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage( - {.durability = { - .storage_directory = storage_directory, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }})}; - auto *main_mem_store = static_cast(main_store.get()); + std::unique_ptr main_store{ + new InMemoryStorage({.durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + }})}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage( - {.durability = { - .storage_directory = storage_directory, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }})}; - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr replica_store1{ + new InMemoryStorage({.durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + }})}; - replica_mem_store1->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage( - {.durability = { - .storage_directory = storage_directory, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }})}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); - replica_mem_store2->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationServerConfig{}); + std::unique_ptr replica_store2{ + new InMemoryStorage({.durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + }})}; + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[1], + }); - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); const auto *vertex_label = "label"; const auto *vertex_property = "property"; const auto *vertex_property_value = "property_value"; - std::optional vertex_gid; + std::optional vertex_gid; { auto acc = main_store->Access(); auto v = acc->CreateVertex(); ASSERT_TRUE(v.AddLabel(main_store->NameToLabel(vertex_label)).HasValue()); - ASSERT_TRUE(v.SetProperty(main_store->NameToProperty(vertex_property), - memgraph::storage::PropertyValue(vertex_property_value)) - .HasValue()); + ASSERT_TRUE( + v.SetProperty(main_store->NameToProperty(vertex_property), PropertyValue(vertex_property_value)).HasValue()); vertex_gid.emplace(v.Gid()); ASSERT_FALSE(acc->Commit().HasError()); } - const auto check_replica = [&](memgraph::storage::Storage *replica_store) { + const auto check_replica = [&](Storage *replica_store) { auto acc = replica_store->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto labels = v->Labels(memgraph::storage::View::OLD); + const auto labels = v->Labels(View::OLD); ASSERT_TRUE(labels.HasValue()); ASSERT_THAT(*labels, UnorderedElementsAre(replica_store->NameToLabel(vertex_label))); ASSERT_FALSE(acc->Commit().HasError()); @@ -347,7 +362,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { check_replica(replica_store1.get()); check_replica(replica_store2.get()); - main_mem_store->UnregisterReplica(replicas[1]); + main_store->UnregisterReplica(replicas[1]); { auto acc = main_store->Access(); auto v = acc->CreateVertex(); @@ -358,7 +373,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { // REPLICA1 should contain the new vertex { auto acc = replica_store1->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_FALSE(acc->Commit().HasError()); } @@ -366,23 +381,23 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { // REPLICA2 should not contain the new vertex { auto acc = replica_store2->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_FALSE(v); ASSERT_FALSE(acc->Commit().HasError()); } } TEST_F(ReplicationTest, RecoveryProcess) { - std::vector vertex_gids; + std::vector vertex_gids; // Force the creation of snapshot { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage( - {.durability = { - .storage_directory = storage_directory, - .recover_on_startup = true, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - .snapshot_on_exit = true, - }})}; + std::unique_ptr main_store{ + new InMemoryStorage({.durability = { + .storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_on_exit = true, + }})}; { auto acc = main_store->Access(); // Create the vertex before registering a replica @@ -394,11 +409,10 @@ TEST_F(ReplicationTest, RecoveryProcess) { { // Create second WAL - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage( + std::unique_ptr main_store{new InMemoryStorage( {.durability = {.storage_directory = storage_directory, .recover_on_startup = true, - .snapshot_wal_mode = - memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; // Create vertices in 2 different transactions { auto acc = main_store->Access(); @@ -414,13 +428,12 @@ TEST_F(ReplicationTest, RecoveryProcess) { } } - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage( - {.durability = { - .storage_directory = storage_directory, - .recover_on_startup = true, - .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }})}; - auto *main_mem_store = static_cast(main_store.get()); + std::unique_ptr main_store{ + new InMemoryStorage({.durability = { + .storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + }})}; static constexpr const auto *property_name = "property_name"; static constexpr const auto property_value = 1; @@ -428,11 +441,9 @@ TEST_F(ReplicationTest, RecoveryProcess) { // Force the creation of current WAL file auto acc = main_store->Access(); for (const auto &vertex_gid : vertex_gids) { - auto v = acc->FindVertex(vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(vertex_gid, View::OLD); ASSERT_TRUE(v); - ASSERT_TRUE( - v->SetProperty(main_store->NameToProperty(property_name), memgraph::storage::PropertyValue(property_value)) - .HasValue()); + ASSERT_TRUE(v->SetProperty(main_store->NameToProperty(property_name), PropertyValue(property_value)).HasValue()); } ASSERT_FALSE(acc->Commit().HasError()); } @@ -444,32 +455,35 @@ TEST_F(ReplicationTest, RecoveryProcess) { static constexpr const auto *vertex_label = "vertex_label"; { - std::unique_ptr replica_store{new memgraph::storage::InMemoryStorage( + std::unique_ptr replica_store{new InMemoryStorage( {.durability = {.storage_directory = replica_storage_directory, - .snapshot_wal_mode = - memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; - auto *replica_mem_store = static_cast(replica_store.get()); + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; - replica_mem_store->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); - ASSERT_EQ(main_mem_store->GetReplicaState(replicas[0]), memgraph::storage::replication::ReplicaState::RECOVERY); + ASSERT_EQ(main_store->GetReplicaState(replicas[0]), ReplicaState::RECOVERY); - while (main_mem_store->GetReplicaState(replicas[0]) != memgraph::storage::replication::ReplicaState::READY) { + while (main_store->GetReplicaState(replicas[0]) != ReplicaState::READY) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } { auto acc = main_store->Access(); for (const auto &vertex_gid : vertex_gids) { - auto v = acc->FindVertex(vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_TRUE(v->AddLabel(main_store->NameToLabel(vertex_label)).HasValue()); } @@ -478,39 +492,36 @@ TEST_F(ReplicationTest, RecoveryProcess) { { auto acc = replica_store->Access(); for (const auto &vertex_gid : vertex_gids) { - auto v = acc->FindVertex(vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto labels = v->Labels(memgraph::storage::View::OLD); + const auto labels = v->Labels(View::OLD); ASSERT_TRUE(labels.HasValue()); ASSERT_THAT(*labels, UnorderedElementsAre(replica_store->NameToLabel(vertex_label))); - const auto properties = v->Properties(memgraph::storage::View::OLD); + const auto properties = v->Properties(View::OLD); ASSERT_TRUE(properties.HasValue()); - ASSERT_THAT(*properties, - UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(property_name), - memgraph::storage::PropertyValue(property_value)))); + ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(property_name), + PropertyValue(property_value)))); } ASSERT_FALSE(acc->Commit().HasError()); } } { - std::unique_ptr replica_store{new memgraph::storage::InMemoryStorage( + std::unique_ptr replica_store{new InMemoryStorage( {.durability = {.storage_directory = replica_storage_directory, .recover_on_startup = true, - .snapshot_wal_mode = - memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; + .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; { auto acc = replica_store->Access(); for (const auto &vertex_gid : vertex_gids) { - auto v = acc->FindVertex(vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(vertex_gid, View::OLD); ASSERT_TRUE(v); - const auto labels = v->Labels(memgraph::storage::View::OLD); + const auto labels = v->Labels(View::OLD); ASSERT_TRUE(labels.HasValue()); ASSERT_THAT(*labels, UnorderedElementsAre(replica_store->NameToLabel(vertex_label))); - const auto properties = v->Properties(memgraph::storage::View::OLD); + const auto properties = v->Properties(View::OLD); ASSERT_TRUE(properties.HasValue()); - ASSERT_THAT(*properties, - UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(property_name), - memgraph::storage::PropertyValue(property_value)))); + ASSERT_THAT(*properties, UnorderedElementsAre(std::make_pair(replica_store->NameToProperty(property_name), + PropertyValue(property_value)))); } ASSERT_FALSE(acc->Commit().HasError()); } @@ -518,26 +529,27 @@ TEST_F(ReplicationTest, RecoveryProcess) { } TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; - std::unique_ptr replica_store_async{ - new memgraph::storage::InMemoryStorage(configuration)}; + std::unique_ptr replica_store_async{new InMemoryStorage(configuration)}; - auto *replica_mem_store_async = static_cast(replica_store_async.get()); + replica_store_async->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[1], + }); - replica_mem_store_async->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationServerConfig{}); - - ASSERT_FALSE(main_mem_store - ->RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA_ASYNC", + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); static constexpr size_t vertices_create_num = 10; - std::vector created_vertices; + std::vector created_vertices; for (size_t i = 0; i < vertices_create_num; ++i) { auto acc = main_store->Access(); auto v = acc->CreateVertex(); @@ -545,21 +557,19 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { ASSERT_FALSE(acc->Commit().HasError()); if (i == 0) { - ASSERT_EQ(main_mem_store->GetReplicaState("REPLICA_ASYNC"), - memgraph::storage::replication::ReplicaState::REPLICATING); + ASSERT_EQ(main_store->GetReplicaState("REPLICA_ASYNC"), ReplicaState::REPLICATING); } else { - ASSERT_EQ(main_mem_store->GetReplicaState("REPLICA_ASYNC"), - memgraph::storage::replication::ReplicaState::RECOVERY); + ASSERT_EQ(main_store->GetReplicaState("REPLICA_ASYNC"), ReplicaState::RECOVERY); } } - while (main_mem_store->GetReplicaState("REPLICA_ASYNC") != memgraph::storage::replication::ReplicaState::READY) { + while (main_store->GetReplicaState("REPLICA_ASYNC") != ReplicaState::READY) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } ASSERT_TRUE(std::all_of(created_vertices.begin(), created_vertices.end(), [&](const auto vertex_gid) { auto acc = replica_store_async->Access(); - auto v = acc->FindVertex(vertex_gid, memgraph::storage::View::OLD); + auto v = acc->FindVertex(vertex_gid, View::OLD); const bool exists = v.has_value(); EXPECT_FALSE(acc->Commit().HasError()); return exists; @@ -567,35 +577,42 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { } TEST_F(ReplicationTest, EpochTest) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_mem_store1->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_mem_store2->SetReplicaRole(memgraph::io::network::Endpoint{local_host, 10001}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = 10001, + }); - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); - std::optional vertex_gid; + std::optional vertex_gid; { auto acc = main_store->Access(); const auto v = acc->CreateVertex(); @@ -604,26 +621,29 @@ TEST_F(ReplicationTest, EpochTest) { } { auto acc = replica_store1->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_FALSE(acc->Commit().HasError()); } { auto acc = replica_store2->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_FALSE(acc->Commit().HasError()); } - main_mem_store->UnregisterReplica(replicas[0]); - main_mem_store->UnregisterReplica(replicas[1]); + main_store->UnregisterReplica(replicas[0]); + main_store->UnregisterReplica(replicas[1]); - replica_mem_store1->SetMainReplicationRole(); - ASSERT_FALSE(replica_mem_store1 - ->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + replica_store1->SetMainReplicationRole(); + ASSERT_FALSE(replica_store1 + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); @@ -641,18 +661,23 @@ TEST_F(ReplicationTest, EpochTest) { // Replica1 should forward it's vertex to Replica2 { auto acc = replica_store2->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_TRUE(v); ASSERT_FALSE(acc->Commit().HasError()); } - replica_mem_store1->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); - ASSERT_TRUE(main_mem_store - ->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); + ASSERT_TRUE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); @@ -666,155 +691,179 @@ TEST_F(ReplicationTest, EpochTest) { // it's newest vertex { auto acc = replica_store1->Access(); - const auto v = acc->FindVertex(*vertex_gid, memgraph::storage::View::OLD); + const auto v = acc->FindVertex(*vertex_gid, View::OLD); ASSERT_FALSE(v); ASSERT_FALSE(acc->Commit().HasError()); } } TEST_F(ReplicationTest, ReplicationInformation) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; - replica_mem_store1->SetReplicaRole(replica1_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); + uint16_t replica1_port = 10001; + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = replica1_port, + }); - const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002}; - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); - replica_mem_store2->SetReplicaRole(replica2_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); + uint16_t replica2_port = 10002; + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = replica2_port, + }); - const std::string replica1_name{replicas[0]}; - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = replica1_port, + }) .HasError()); - const std::string replica2_name{replicas[1]}; - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replica2_name, replica2_endpoint, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) .HasError()); - ASSERT_EQ(main_mem_store->GetReplicationRole(), memgraph::storage::replication::ReplicationRole::MAIN); - ASSERT_EQ(replica_mem_store1->GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA); - ASSERT_EQ(replica_mem_store2->GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA); + ASSERT_EQ(main_store->GetReplicationRole(), ReplicationRole::MAIN); + ASSERT_EQ(replica_store1->GetReplicationRole(), ReplicationRole::REPLICA); + ASSERT_EQ(replica_store2->GetReplicationRole(), ReplicationRole::REPLICA); - const auto replicas_info = main_mem_store->ReplicasInfo(); + const auto replicas_info = main_store->ReplicasInfo(); ASSERT_EQ(replicas_info.size(), 2); const auto &first_info = replicas_info[0]; - ASSERT_EQ(first_info.name, replica1_name); - ASSERT_EQ(first_info.mode, memgraph::storage::replication::ReplicationMode::SYNC); - ASSERT_EQ(first_info.endpoint, replica1_endpoint); - ASSERT_EQ(first_info.state, memgraph::storage::replication::ReplicaState::READY); + ASSERT_EQ(first_info.name, replicas[0]); + ASSERT_EQ(first_info.mode, ReplicationMode::SYNC); + ASSERT_EQ(first_info.endpoint, (memgraph::io::network::Endpoint{local_host, replica1_port})); + ASSERT_EQ(first_info.state, ReplicaState::READY); const auto &second_info = replicas_info[1]; - ASSERT_EQ(second_info.name, replica2_name); - ASSERT_EQ(second_info.mode, memgraph::storage::replication::ReplicationMode::ASYNC); - ASSERT_EQ(second_info.endpoint, replica2_endpoint); - ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY); + ASSERT_EQ(second_info.name, replicas[1]); + ASSERT_EQ(second_info.mode, ReplicationMode::ASYNC); + ASSERT_EQ(second_info.endpoint, (memgraph::io::network::Endpoint{local_host, replica2_port})); + ASSERT_EQ(second_info.state, ReplicaState::READY); } TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; - replica_mem_store1->SetReplicaRole(replica1_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); + uint16_t replica1_port = 10001; + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = replica1_port, + }); - const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002}; - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); - replica_mem_store2->SetReplicaRole(replica2_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); + uint16_t replica2_port = 10002; + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = replica2_port, + }); - const std::string replica1_name{replicas[0]}; - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = replica1_port, + }) .HasError()); - const std::string replica2_name{replicas[0]}; - ASSERT_TRUE(main_mem_store - ->RegisterReplica(replica2_name, replica2_endpoint, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::ReplicationState::RegisterReplicaError::NAME_EXISTS); + ASSERT_TRUE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) + .GetError() == RegisterReplicaError::NAME_EXISTS); } TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + uint16_t common_port = 10001; - const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; - replica_mem_store1->SetReplicaRole(replica1_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = common_port, + }); - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = common_port, + }); - const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10001}; - replica_mem_store2->SetReplicaRole(replica2_endpoint, memgraph::storage::replication::ReplicationServerConfig{}); - - const std::string replica1_name{replicas[0]}; - ASSERT_FALSE(main_mem_store - ->RegisterReplica(replica1_name, replica1_endpoint, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) + ASSERT_FALSE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = common_port, + }) .HasError()); - const std::string replica2_name{replicas[1]}; - ASSERT_TRUE(main_mem_store - ->RegisterReplica(replica2_name, replica2_endpoint, - memgraph::storage::replication::ReplicationMode::ASYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::ReplicationState::RegisterReplicaError::END_POINT_EXISTS); + ASSERT_TRUE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = common_port, + }) + .GetError() == RegisterReplicaError::END_POINT_EXISTS); } TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { auto main_config = configuration; main_config.durability.restore_replication_state_on_startup = true; - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(main_config)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr main_store{new InMemoryStorage(main_config)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_mem_store1->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); - replica_mem_store2->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationServerConfig{}); + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[1], + }); - auto res = main_mem_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}); + auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); ASSERT_FALSE(res.HasError()); - res = main_mem_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}); + res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()); - auto replica_infos = main_mem_store->ReplicasInfo(); + auto replica_infos = main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 2); ASSERT_EQ(replica_infos[0].name, replicas[0]); @@ -826,10 +875,9 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { main_store.reset(); - std::unique_ptr other_main_store{new memgraph::storage::InMemoryStorage(main_config)}; - auto *other_main_mem_store = static_cast(other_main_store.get()); + std::unique_ptr other_main_store{new InMemoryStorage(main_config)}; - replica_infos = other_main_mem_store->ReplicasInfo(); + replica_infos = other_main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 2); ASSERT_EQ(replica_infos[0].name, replicas[0]); ASSERT_EQ(replica_infos[0].endpoint.address, local_host); @@ -843,32 +891,37 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { auto main_config = configuration; main_config.durability.restore_replication_state_on_startup = true; - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(main_config)}; - std::unique_ptr replica_store1{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); - auto *replica_mem_store1 = static_cast(replica_store1.get()); + std::unique_ptr main_store{new InMemoryStorage(main_config)}; + std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_mem_store1->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store1->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[0], + }); - std::unique_ptr replica_store2{new memgraph::storage::InMemoryStorage(configuration)}; - auto *replica_mem_store2 = static_cast(replica_store2.get()); + std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_mem_store2->SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationServerConfig{}); + replica_store2->SetReplicaRole(ReplicationServerConfig{ + .ip_address = local_host, + .port = ports[1], + }); - auto res = main_mem_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}); + auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); ASSERT_FALSE(res.HasError()); - res = main_mem_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}); + res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()); - auto replica_infos = main_mem_store->ReplicasInfo(); + auto replica_infos = main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 2); ASSERT_EQ(replica_infos[0].name, replicas[0]); @@ -878,10 +931,10 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { ASSERT_EQ(replica_infos[1].endpoint.address, local_host); ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]); - const auto unregister_res = main_mem_store->UnregisterReplica(replicas[0]); + const auto unregister_res = main_store->UnregisterReplica(replicas[0]); ASSERT_TRUE(unregister_res); - replica_infos = main_mem_store->ReplicasInfo(); + replica_infos = main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 1); ASSERT_EQ(replica_infos[0].name, replicas[1]); ASSERT_EQ(replica_infos[0].endpoint.address, local_host); @@ -889,9 +942,8 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { main_store.reset(); - std::unique_ptr other_main_store{new memgraph::storage::InMemoryStorage(main_config)}; - auto *other_main_mem_store = static_cast(other_main_store.get()); - replica_infos = other_main_mem_store->ReplicasInfo(); + std::unique_ptr other_main_store{new InMemoryStorage(main_config)}; + replica_infos = other_main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 1); ASSERT_EQ(replica_infos[0].name, replicas[1]); ASSERT_EQ(replica_infos[0].endpoint.address, local_host); @@ -899,13 +951,15 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { } TEST_F(ReplicationTest, AddingInvalidReplica) { - std::unique_ptr main_store{new memgraph::storage::InMemoryStorage(configuration)}; - auto *main_mem_store = static_cast(main_store.get()); + std::unique_ptr main_store{new InMemoryStorage(configuration)}; - ASSERT_TRUE(main_mem_store - ->RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]}, - memgraph::storage::replication::ReplicationMode::SYNC, - memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - memgraph::storage::replication::ReplicationClientConfig{}) - .GetError() == memgraph::storage::ReplicationState::RegisterReplicaError::CONNECTION_FAILED); + ASSERT_TRUE(main_store + ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) + .GetError() == RegisterReplicaError::CONNECTION_FAILED); }