diff --git a/src/memgraph.cpp b/src/memgraph.cpp index d1a972c7c..9bd3f3f92 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -216,6 +216,11 @@ DEFINE_bool(telemetry_enabled, false, "the database runtime (vertex and edge counts and resource usage) " "to allow for easier improvement of the product."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_restore_replicas_on_startup, true, + "Controls replicas should be restored automatically."); // TODO(42jeremy) this must be removed once T0835 + // is implemented. + // Streams flags // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32( @@ -1195,7 +1200,8 @@ int main(int argc, char **argv) { .snapshot_retention_count = FLAGS_storage_snapshot_retention_count, .wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib, .wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx, - .snapshot_on_exit = FLAGS_storage_snapshot_on_exit}, + .snapshot_on_exit = FLAGS_storage_snapshot_on_exit, + .restore_replicas_on_startup = FLAGS_storage_restore_replicas_on_startup}, .transaction = {.isolation_level = ParseIsolationLevel()}}; if (FLAGS_storage_snapshot_interval_sec == 0) { if (FLAGS_storage_wal_enabled) { diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 17d62b240..bc1a6a725 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -44,6 +44,7 @@ #include "query/trigger.hpp" #include "query/typed_value.hpp" #include "storage/v2/property_value.hpp" +#include "storage/v2/replication/enums.hpp" #include "utils/algorithm.hpp" #include "utils/csv_parsing.hpp" #include "utils/event_counter.hpp" @@ -184,6 +185,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, + storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, {.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}); if (ret.HasError()) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index 8a02b188f..5f7564b0d 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -619,10 +619,10 @@ void Streams::Drop(const std::string &stream_name) { // no running Test function for this consumer, therefore it can be erased. std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second); - locked_streams->erase(it); if (!storage_.Delete(stream_name)) { throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name); } + locked_streams->erase(it); // TODO(antaljanosbenjamin) Release the transformation } diff --git a/src/query/stream/streams.hpp b/src/query/stream/streams.hpp index 5fe5d3f8e..ef44653d5 100644 --- a/src/query/stream/streams.hpp +++ b/src/query/stream/streams.hpp @@ -188,7 +188,7 @@ class Streams final { void Persist(StreamStatus<TStream> &&status) { const std::string stream_name = status.name; if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) { - throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name}; + throw StreamsException{"Couldn't persist stream data for stream '{}'", stream_name}; } } diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index f33a8553d..d46563183 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -13,7 +13,6 @@ set(storage_v2_src_files storage.cpp) ##### Replication ##### - define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files) add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE) @@ -26,10 +25,10 @@ set(storage_v2_src_files replication/replication_server.cpp replication/serialization.cpp replication/slk.cpp + replication/replication_persistence_helper.cpp ${lcp_storage_cpp_files}) ####################### - find_package(gflags REQUIRED) find_package(Threads REQUIRED) diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index 0db24615d..26251584e 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -49,7 +49,7 @@ struct Config { uint64_t wal_file_flush_every_n_tx{100000}; bool snapshot_on_exit{false}; - + bool restore_replicas_on_startup{false}; } durability; struct Transaction { diff --git a/src/storage/v2/durability/paths.hpp b/src/storage/v2/durability/paths.hpp index a8e210d5a..557e4daa9 100644 --- a/src/storage/v2/durability/paths.hpp +++ b/src/storage/v2/durability/paths.hpp @@ -22,6 +22,7 @@ static const std::string kSnapshotDirectory{"snapshots"}; static const std::string kWalDirectory{"wal"}; static const std::string kBackupDirectory{".backup"}; static const std::string kLockFile{".lock"}; +static const std::string kReplicationDirectory{"replication"}; // This is the prefix used for Snapshot and WAL filenames. It is a timestamp // format that equals to: YYYYmmddHHMMSSffffff diff --git a/src/storage/v2/replication/config.hpp b/src/storage/v2/replication/config.hpp index 6c7e40ba9..b418a60d4 100644 --- a/src/storage/v2/replication/config.hpp +++ b/src/storage/v2/replication/config.hpp @@ -10,6 +10,8 @@ // licenses/APL.txt. #pragma once + +#include <chrono> #include <optional> #include <string> @@ -23,6 +25,8 @@ struct ReplicationClientConfig { struct SSL { std::string key_file = ""; std::string cert_file = ""; + + friend bool operator==(const SSL &, const SSL &) = default; }; std::optional<SSL> ssl; diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index 132fb14da..133fd9b74 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -16,4 +16,6 @@ namespace memgraph::storage::replication { enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID }; + +enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, CAN_BE_INVALID }; } // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/replication_persistence_helper.cpp b/src/storage/v2/replication/replication_persistence_helper.cpp new file mode 100644 index 000000000..f05848cea --- /dev/null +++ b/src/storage/v2/replication/replication_persistence_helper.cpp @@ -0,0 +1,83 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/replication/replication_persistence_helper.hpp" +#include "utils/logging.hpp" + +namespace { +const std::string kReplicaName = "replica_name"; +const std::string kIpAddress = "replica_ip_address"; +const std::string kPort = "replica_port"; +const std::string kSyncMode = "replica_sync_mode"; +const std::string kCheckFrequency = "replica_check_frequency"; +const std::string kSSLKeyFile = "replica_ssl_key_file"; +const std::string kSSLCertFile = "replica_ssl_cert_file"; +} // namespace + +namespace memgraph::storage::replication { + +nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) { + auto data = nlohmann::json::object(); + + data[kReplicaName] = std::move(status.name); + data[kIpAddress] = std::move(status.ip_address); + data[kPort] = status.port; + data[kSyncMode] = status.sync_mode; + + data[kCheckFrequency] = status.replica_check_frequency.count(); + + if (status.ssl.has_value()) { + data[kSSLKeyFile] = std::move(status.ssl->key_file); + data[kSSLCertFile] = std::move(status.ssl->cert_file); + } else { + data[kSSLKeyFile] = nullptr; + data[kSSLCertFile] = nullptr; + } + + return data; +} + +std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data) { + ReplicaStatus replica_status; + + const auto get_failed_message = [](const std::string_view message, const std::string_view nested_message) { + return fmt::format("Failed to deserialize replica's configuration: {} : {}", message, nested_message); + }; + + try { + data.at(kReplicaName).get_to(replica_status.name); + data.at(kIpAddress).get_to(replica_status.ip_address); + data.at(kPort).get_to(replica_status.port); + data.at(kSyncMode).get_to(replica_status.sync_mode); + + replica_status.replica_check_frequency = std::chrono::seconds(data.at(kCheckFrequency)); + + const auto &key_file = data.at(kSSLKeyFile); + const auto &cert_file = data.at(kSSLCertFile); + + MG_ASSERT(key_file.is_null() == cert_file.is_null()); + + if (!key_file.is_null()) { + replica_status.ssl = replication::ReplicationClientConfig::SSL{}; + data.at(kSSLKeyFile).get_to(replica_status.ssl->key_file); + data.at(kSSLCertFile).get_to(replica_status.ssl->cert_file); + } + } catch (const nlohmann::json::type_error &exception) { + spdlog::error(get_failed_message("Invalid type conversion", exception.what())); + return std::nullopt; + } catch (const nlohmann::json::out_of_range &exception) { + spdlog::error(get_failed_message("Non existing field", exception.what())); + return std::nullopt; + } + + return replica_status; +} +} // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/replication_persistence_helper.hpp b/src/storage/v2/replication/replication_persistence_helper.hpp new file mode 100644 index 000000000..c22164e33 --- /dev/null +++ b/src/storage/v2/replication/replication_persistence_helper.hpp @@ -0,0 +1,40 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include <chrono> +#include <compare> +#include <optional> +#include <string> + +#include <json/json.hpp> + +#include "storage/v2/replication/config.hpp" +#include "storage/v2/replication/enums.hpp" + +namespace memgraph::storage::replication { + +struct ReplicaStatus { + std::string name; + std::string ip_address; + uint16_t port; + ReplicationMode sync_mode; + std::chrono::seconds replica_check_frequency; + std::optional<ReplicationClientConfig::SSL> ssl; + + friend bool operator==(const ReplicaStatus &, const ReplicaStatus &) = default; +}; + +nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status); + +std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data); +} // namespace memgraph::storage::replication diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 1e66ffd6e..cbc41da86 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -17,6 +17,7 @@ #include <variant> #include <gflags/gflags.h> +#include <spdlog/spdlog.h> #include "io/network/endpoint.hpp" #include "storage/v2/durability/durability.hpp" @@ -28,6 +29,8 @@ #include "storage/v2/indices.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/replication/config.hpp" +#include "storage/v2/replication/enums.hpp" +#include "storage/v2/replication/replication_persistence_helper.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex_accessor.hpp" #include "utils/file.hpp" @@ -50,6 +53,19 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; namespace { inline constexpr uint16_t kEpochHistoryRetention = 1000; + +std::string RegisterReplicaErrorToString(Storage::RegisterReplicaError error) { + switch (error) { + case Storage::RegisterReplicaError::NAME_EXISTS: + return "NAME_EXISTS"; + case Storage::RegisterReplicaError::END_POINT_EXISTS: + return "END_POINT_EXISTS"; + case Storage::RegisterReplicaError::CONNECTION_FAILED: + return "CONNECTION_FAILED"; + case Storage::RegisterReplicaError::COULD_NOT_BE_PERSISTED: + return "COULD_NOT_BE_PERSISTED"; + } +} } // namespace auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipList<Vertex>::Iterator end, @@ -400,6 +416,16 @@ Storage::Storage(Config config) } else { commit_log_.emplace(timestamp_); } + + if (config_.durability.restore_replicas_on_startup) { + spdlog::info("Replica's configuration will be stored and will be automatically restored in case of a crash."); + utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory); + storage_ = + std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory); + RestoreReplicas(); + } else { + spdlog::warn("Replicas' configuration will NOT be stored. When the server restarts, replicas will be forgotten."); + } } Storage::~Storage() { @@ -1882,7 +1908,7 @@ bool Storage::SetMainReplicationRole() { utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, - const replication::ReplicationClientConfig &config) { + const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!"); const bool name_exists = replication_clients_.WithLock([&](auto &clients) { @@ -1902,9 +1928,28 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica( return RegisterReplicaError::END_POINT_EXISTS; } + if (ShouldStoreAndRestoreReplicas()) { + auto data = replication::ReplicaStatusToJSON( + replication::ReplicaStatus{.name = name, + .ip_address = endpoint.address, + .port = endpoint.port, + .sync_mode = replication_mode, + .replica_check_frequency = config.replica_check_frequency, + .ssl = config.ssl}); + if (!storage_->Put(name, data.dump())) { + spdlog::error("Error when saving replica {} in settings.", name); + return RegisterReplicaError::COULD_NOT_BE_PERSISTED; + } + } + auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config); + if (client->State() == replication::ReplicaState::INVALID) { - return RegisterReplicaError::CONNECTION_FAILED; + if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) { + return RegisterReplicaError::CONNECTION_FAILED; + } + + spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); } return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<Storage::RegisterReplicaError> { @@ -1925,8 +1970,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica( }); } -bool Storage::UnregisterReplica(const std::string_view name) { +bool Storage::UnregisterReplica(const std::string &name) { MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!"); + if (ShouldStoreAndRestoreReplicas()) { + if (!storage_->Delete(name)) { + spdlog::error("Error when removing replica {} from settings.", name); + return false; + } + } + return replication_clients_.WithLock([&](auto &clients) { return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); }); @@ -1962,4 +2014,41 @@ void Storage::SetIsolationLevel(IsolationLevel isolation_level) { isolation_level_ = isolation_level; } +void Storage::RestoreReplicas() { + MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole()); + if (!ShouldStoreAndRestoreReplicas()) { + return; + } + spdlog::info("Restoring replicas."); + + for (const auto &[replica_name, replica_data] : *storage_) { + spdlog::info("Restoring replica {}.", replica_name); + + const auto maybe_replica_status = replication::JSONToReplicaStatus(nlohmann::json::parse(replica_data)); + if (!maybe_replica_status.has_value()) { + LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name); + } + + auto replica_status = *maybe_replica_status; + MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name, + replica_name); + + auto ret = + RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port}, + replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID, + { + .replica_check_frequency = replica_status.replica_check_frequency, + .ssl = replica_status.ssl, + }); + + if (ret.HasError()) { + MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); + LOG_FATAL("Failure when restoring replica {}: {}.", replica_name, RegisterReplicaErrorToString(ret.GetError())); + } + spdlog::info("Replica {} restored.", replica_name); + } +} + +bool Storage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; } + } // namespace memgraph::storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index bd8b351e5..6aab1977f 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -18,6 +18,7 @@ #include <variant> #include "io/network/endpoint.hpp" +#include "kvstore/kvstore.hpp" #include "storage/v2/commit_log.hpp" #include "storage/v2/config.hpp" #include "storage/v2/constraints.hpp" @@ -411,15 +412,20 @@ class Storage final { bool SetMainReplicationRole(); - enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED }; + enum class RegisterReplicaError : uint8_t { + NAME_EXISTS, + END_POINT_EXISTS, + CONNECTION_FAILED, + COULD_NOT_BE_PERSISTED + }; /// @pre The instance should have a MAIN role /// @pre Timeout can only be set for SYNC replication utils::BasicResult<RegisterReplicaError, void> RegisterReplica( std::string name, io::network::Endpoint endpoint, replication::ReplicationMode replication_mode, - const replication::ReplicationClientConfig &config = {}); + replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config = {}); /// @pre The instance should have a MAIN role - bool UnregisterReplica(std::string_view name); + bool UnregisterReplica(const std::string &name); std::optional<replication::ReplicaState> GetReplicaState(std::string_view name); @@ -474,6 +480,10 @@ class Storage final { uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); + void RestoreReplicas(); + + bool ShouldStoreAndRestoreReplicas() const; + // Main storage lock. // // Accessors take a shared lock when starting, so it is possible to block @@ -534,6 +544,7 @@ class Storage final { std::filesystem::path wal_directory_; std::filesystem::path lock_file_path_; utils::OutputFile lock_file_handle_; + std::unique_ptr<kvstore::KVStore> storage_; utils::Scheduler snapshot_runner_; utils::SpinLock snapshot_lock_; diff --git a/tests/e2e/interactive_mg_runner.py b/tests/e2e/interactive_mg_runner.py index b7eaf60da..49ba8ed81 100644 --- a/tests/e2e/interactive_mg_runner.py +++ b/tests/e2e/interactive_mg_runner.py @@ -36,6 +36,7 @@ import os import subprocess from argparse import ArgumentParser from pathlib import Path +import tempfile import time import sys from inspect import signature @@ -103,7 +104,7 @@ def is_port_in_use(port: int) -> bool: return s.connect_ex(("localhost", port)) == 0 -def _start_instance(name, args, log_file, queries, use_ssl, procdir): +def _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory): assert ( name not in MEMGRAPH_INSTANCES.keys() ), "If this raises, you are trying to start an instance with the same name than one already running." @@ -113,7 +114,8 @@ def _start_instance(name, args, log_file, queries, use_ssl, procdir): mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl) MEMGRAPH_INSTANCES[name] = mg_instance log_file_path = os.path.join(BUILD_DIR, "logs", log_file) - binary_args = args + ["--log-file", log_file_path] + data_directory_path = os.path.join(BUILD_DIR, data_directory) + binary_args = args + ["--log-file", log_file_path] + ["--data-directory", data_directory_path] if len(procdir) != 0: binary_args.append("--query-modules-directory=" + procdir) @@ -175,8 +177,13 @@ def start_instance(context, name, procdir): if "ssl" in value: use_ssl = bool(value["ssl"]) value.pop("ssl") + data_directory = "" + if "data_directory" in value: + data_directory = value["data_directory"] + else: + data_directory = tempfile.TemporaryDirectory().name - instance = _start_instance(name, args, log_file, queries, use_ssl, procdir) + instance = _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory) mg_instances[name] = instance assert len(mg_instances) == 1 diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py index fd0f8336b..78cf083a5 100755 --- a/tests/e2e/memgraph.py +++ b/tests/e2e/memgraph.py @@ -76,11 +76,8 @@ class MemgraphInstanceRunner: self.stop() self.args = copy.deepcopy(args) self.args = [replace_paths(arg) for arg in self.args] - self.data_directory = tempfile.TemporaryDirectory() args_mg = [ self.binary_path, - "--data-directory", - self.data_directory.name, "--storage-wal-enabled", "--storage-snapshot-interval-sec", "300", diff --git a/tests/e2e/monitoring_server/workloads.yaml b/tests/e2e/monitoring_server/workloads.yaml index a3ec73efc..d6efdb892 100644 --- a/tests/e2e/monitoring_server/workloads.yaml +++ b/tests/e2e/monitoring_server/workloads.yaml @@ -5,7 +5,7 @@ monitoring_port: &monitoring_port "7444" template_cluster: &template_cluster cluster: monitoring: - args: ["--bolt-port=7687", "--log-level=TRACE", "--"] + args: ["--bolt-port=7687", "--log-level=TRACE"] log_file: "monitoring-websocket-e2e.log" template_cluster_ssl: &template_cluster_ssl cluster: @@ -21,7 +21,6 @@ template_cluster_ssl: &template_cluster_ssl *cert_file, "--bolt-key-file", *key_file, - "--", ] log_file: "monitoring-websocket-ssl-e2e.log" ssl: true diff --git a/tests/e2e/replication/show.py b/tests/e2e/replication/show.py index 3857ea7eb..7a51db39d 100755 --- a/tests/e2e/replication/show.py +++ b/tests/e2e/replication/show.py @@ -41,14 +41,14 @@ def test_show_replicas(connection): "state", } actual_column_names = {x.name for x in cursor.description} - assert expected_column_names == actual_column_names + assert actual_column_names == expected_column_names expected_data = { ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), } - assert expected_data == actual_data + assert actual_data == expected_data def test_show_replicas_while_inserting_data(connection): @@ -72,14 +72,14 @@ def test_show_replicas_while_inserting_data(connection): "state", } actual_column_names = {x.name for x in cursor.description} - assert expected_column_names == actual_column_names + assert actual_column_names == expected_column_names expected_data = { ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), } - assert expected_data == actual_data + assert actual_data == expected_data # 1/ execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});") @@ -92,17 +92,15 @@ def test_show_replicas_while_inserting_data(connection): ("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"), } actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) - print("actual_data=" + str(actual_data)) - print("expected_data=" + str(expected_data)) - assert expected_data == actual_data + assert actual_data == expected_data # 3/ res = execute_and_fetch_all(cursor, "MATCH (node) return node;") - assert 1 == len(res) + assert len(res) == 1 # 4/ actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) - assert expected_data == actual_data + assert actual_data == expected_data if __name__ == "__main__": diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index 6386edcf2..462d3d8f1 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -18,6 +18,7 @@ import time from common import execute_and_fetch_all import interactive_mg_runner import mgclient +import tempfile interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) interactive_mg_runner.PROJECT_DIR = os.path.normpath( @@ -84,7 +85,7 @@ def test_show_replicas(connection): } actual_column_names = {x.name for x in cursor.description} - assert EXPECTED_COLUMN_NAMES == actual_column_names + assert actual_column_names == EXPECTED_COLUMN_NAMES expected_data = { ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), @@ -92,7 +93,7 @@ def test_show_replicas(connection): ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), } - assert expected_data == actual_data + assert actual_data == expected_data # 2/ execute_and_fetch_all(cursor, "DROP REPLICA replica_2") @@ -102,7 +103,7 @@ def test_show_replicas(connection): ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), } - assert expected_data == actual_data + assert actual_data == expected_data # 3/ interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1") @@ -117,7 +118,283 @@ def test_show_replicas(connection): ("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"), ("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"), } - assert expected_data == actual_data + assert actual_data == expected_data + + +def test_basic_recovery(connection): + # Goal of this test is to check the recovery of main. + # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1/ We check that all replicas have the correct state: they should all be ready. + # 2/ We kill main. + # 3/ We re-start main. + # 4/ We check that all replicas have the correct state: they should all be ready. + # 5/ Drop one replica. + # 6/ We add some data to main, then kill it and restart. + # 7/ We check that all replicas but one have the expected data. + # 8/ We kill another replica. + # 9/ We add some data to main. + # 10/ We re-add the two replicas droped/killed and check the data. + # 11/ We kill another replica. + # 12/ Add some more data to main. + # 13/ Check the states of replicas. + + # 0/ + data_directory = tempfile.TemporaryDirectory() + CONFIGURATION = { + "replica_1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "replica_2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "replica_3": { + "args": ["--bolt-port", "7690", "--log-level=TRACE"], + "log_file": "replica3.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"], + }, + "replica_4": { + "args": ["--bolt-port", "7691", "--log-level=TRACE"], + "log_file": "replica4.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [], + "data_directory": f"{data_directory.name}", + }, + } + + interactive_mg_runner.start_all(CONFIGURATION) + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration. + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';") + + # 1/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + assert actual_data == expected_data + + def check_roles(): + assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0] + for index in range(1, 4): + assert ( + "replica" + == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0] + ) + + check_roles() + + # 2/ + interactive_mg_runner.kill(CONFIGURATION, "main") + time.sleep(2) + + # 3/ + interactive_mg_runner.start(CONFIGURATION, "main") + cursor = connection(7687, "main").cursor() + check_roles() + + # 4/ + # We leave some time for the main to recover. + time.sleep(2) + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + assert actual_data == expected_data + + # 5/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_2;") + + # 6/ + execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic', value:42})") + interactive_mg_runner.kill(CONFIGURATION, "main") + time.sleep(2) + + interactive_mg_runner.start(CONFIGURATION, "main") + cursor = connection(7687, "main").cursor() + check_roles() + + # 7/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 1 + for index in (1, 3, 4): + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) + + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 2, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 2, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + assert actual_data == expected_data + + # Replica_2 was dropped, we check it does not have the data from main. + assert len(interactive_mg_runner.MEMGRAPH_INSTANCES["replica_2"].query(QUERY_TO_CHECK)) == 0 + + # 8/ + interactive_mg_runner.kill(CONFIGURATION, "replica_3") + + # 9/ + execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again', value:43})") + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 2 + + # 10/ + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';") + interactive_mg_runner.start(CONFIGURATION, "replica_3") + + time.sleep(2) + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 6, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + assert actual_data == expected_data + for index in (1, 2, 3, 4): + assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main + + # 11/ + interactive_mg_runner.kill(CONFIGURATION, "replica_1") + time.sleep(1) + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + assert actual_data == expected_data + + # 12/ + execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again_again', value:44})") + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 3 + for index in (2, 3, 4): + assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main + + # 13/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("replica_2", "127.0.0.1:10002", "sync", 9, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 9, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 9, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + assert actual_data == expected_data + + +def test_conflict_at_startup(connection): + # Goal of this test is to check starting up several instance with different replicas' configuration directory works as expected. + # main_1 and main_2 have different directory. + + data_directory1 = tempfile.TemporaryDirectory() + data_directory2 = tempfile.TemporaryDirectory() + CONFIGURATION = { + "main_1": { + "args": ["--bolt-port", "7687", "--log-level=TRACE"], + "log_file": "main1.log", + "setup_queries": [], + "data_directory": f"{data_directory1.name}", + }, + "main_2": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "main2.log", + "setup_queries": [], + "data_directory": f"{data_directory2.name}", + }, + } + + interactive_mg_runner.start_all(CONFIGURATION) + cursor_1 = connection(7687, "main_1").cursor() + cursor_2 = connection(7688, "main_2").cursor() + + assert execute_and_fetch_all(cursor_1, "SHOW REPLICATION ROLE;")[0][0] == "main" + assert execute_and_fetch_all(cursor_2, "SHOW REPLICATION ROLE;")[0][0] == "main" + + +def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection): + # Goal of this test is to check the recovery of main. + # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1/ We check that all replicas have the correct state: they should all be ready. + # 2/ We kill main then kill a replica. + # 3/ We re-start main: it should be able to restart. + # 4/ Check status of replica: replica_2 is invalid. + + data_directory = tempfile.TemporaryDirectory() + CONFIGURATION = { + "replica_1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "replica_2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [], + "data_directory": f"{data_directory.name}", + }, + } + + interactive_mg_runner.start_all(CONFIGURATION) + + # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration. + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';") + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';") + + # 1/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + + assert actual_data == expected_data + + def check_roles(): + assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0] + for index in range(1, 2): + assert ( + "replica" + == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0] + ) + + check_roles() + + # 2/ + interactive_mg_runner.kill(CONFIGURATION, "main") + interactive_mg_runner.kill(CONFIGURATION, "replica_2") + time.sleep(2) + + # 3/ + interactive_mg_runner.start(CONFIGURATION, "main") + + # 4/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data if __name__ == "__main__": diff --git a/tests/e2e/server/workloads.yaml b/tests/e2e/server/workloads.yaml index db956e701..435d4722c 100644 --- a/tests/e2e/server/workloads.yaml +++ b/tests/e2e/server/workloads.yaml @@ -4,7 +4,7 @@ bolt_port: &bolt_port "7687" template_cluster: &template_cluster cluster: server: - args: ["--bolt-port=7687", "--log-level=TRACE", "--"] + args: ["--bolt-port=7687", "--log-level=TRACE"] log_file: "server-connection-e2e.log" template_cluster_ssl: &template_cluster_ssl cluster: @@ -18,7 +18,6 @@ template_cluster_ssl: &template_cluster_ssl *cert_file, "--bolt-key-file", *key_file, - "--", ] log_file: "server-connection-ssl-e2e.log" ssl: true diff --git a/tests/jepsen/src/jepsen/memgraph/support.clj b/tests/jepsen/src/jepsen/memgraph/support.clj index 42e5f4ab8..0689f858e 100644 --- a/tests/jepsen/src/jepsen/memgraph/support.clj +++ b/tests/jepsen/src/jepsen/memgraph/support.clj @@ -25,7 +25,8 @@ :--storage-recover-on-startup :--storage-wal-enabled :--storage-snapshot-interval-sec 300 - :--storage-properties-on-edges)) + :--storage-properties-on-edges + :--storage-restore-replicas-on-startup false)) (defn stop-node! [test node] diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 1a6ae24a1..fbd7481bf 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -22,44 +22,43 @@ function(_add_unit_test test_cpp custom_main) set(target_name ${test_prefix}${exec_name}) set(source_files - ${test_cpp} - ${ARGN}) + ${test_cpp} + ${ARGN}) if(NOT ${custom_main}) set(source_files - ${source_files} - ${memgraph_unit_main}) + ${source_files} + ${memgraph_unit_main}) endif() add_executable(${target_name} ${source_files}) + # OUTPUT_NAME sets the real name of a target when it is built and can be # used to help create two targets of the same name even though CMake # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) target_link_libraries(${target_name} mg-memory mg-utils gtest gmock Threads::Threads dl) + # register test if(TEST_COVERAGE) add_test(${target_name} env LLVM_PROFILE_FILE=${exec_name}.profraw ./${exec_name}) else() add_test(${target_name} ${exec_name}) endif() + # add to memgraph__unit target add_dependencies(memgraph__unit ${target_name}) endfunction(_add_unit_test) - # Test utilities - add_library(storage_test_utils storage_test_utils.cpp) target_link_libraries(storage_test_utils mg-storage-v2) - # Test integrations-kafka - add_library(kafka-mock STATIC kafka_mock.cpp) target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads gtest) -# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests +# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp) target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-integrations-kafka) @@ -70,7 +69,6 @@ add_unit_test(mgp_trans_c_api.cpp) target_link_libraries(${test_prefix}mgp_trans_c_api mg-query) # Test mg-query - add_unit_test(bfs_single_node.cpp) target_link_libraries(${test_prefix}bfs_single_node mg-query) @@ -134,7 +132,6 @@ add_unit_test(query_function_mgp_module.cpp) target_link_libraries(${test_prefix}query_function_mgp_module mg-query) target_include_directories(${test_prefix}query_function_mgp_module PRIVATE ${CMAKE_SOURCE_DIR}/include) - # Test query/procedure add_unit_test(query_procedure_mgp_type.cpp) target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query) @@ -151,8 +148,8 @@ target_include_directories(${test_prefix}query_procedure_py_module PRIVATE ${CMA add_unit_test(query_procedures_mgp_graph.cpp) target_link_libraries(${test_prefix}query_procedures_mgp_graph mg-query storage_test_utils) target_include_directories(${test_prefix}query_procedures_mgp_graph PRIVATE ${CMAKE_SOURCE_DIR}/include) -# END query/procedure +# END query/procedure add_unit_test(query_profile.cpp) target_link_libraries(${test_prefix}query_profile mg-query) @@ -171,9 +168,7 @@ target_link_libraries(${test_prefix}stripped mg-query) add_unit_test(typed_value.cpp) target_link_libraries(${test_prefix}typed_value mg-query) - # Test mg-communication - add_unit_test(bolt_chunked_decoder_buffer.cpp) target_link_libraries(${test_prefix}bolt_chunked_decoder_buffer mg-communication) @@ -195,21 +190,15 @@ target_link_libraries(${test_prefix}communication_buffer mg-communication mg-uti add_unit_test(network_timeouts.cpp) target_link_libraries(${test_prefix}network_timeouts mg-communication) - # Test mg-kvstore - add_unit_test(kvstore.cpp) target_link_libraries(${test_prefix}kvstore mg-kvstore mg-utils) - # Test data structures - add_unit_test(ring_buffer.cpp) target_link_libraries(${test_prefix}ring_buffer mg-utils) - # Test mg-io - add_unit_test(network_endpoint.cpp) target_link_libraries(${test_prefix}network_endpoint mg-io) @@ -219,9 +208,7 @@ target_link_libraries(${test_prefix}network_utils mg-io) add_unit_test(socket.cpp) target_link_libraries(${test_prefix}socket mg-io) - # Test mg-utils - add_unit_test(utils_algorithm.cpp) target_link_libraries(${test_prefix}utils_algorithm mg-utils) @@ -289,7 +276,6 @@ add_unit_test(utils_temporal utils_temporal.cpp) target_link_libraries(${test_prefix}utils_temporal mg-utils) # Test mg-storage-v2 - add_unit_test(commit_log_v2.cpp) target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2) @@ -332,40 +318,36 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt) add_unit_test(storage_v2_isolation_level.cpp) target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2) +add_unit_test(replication_persistence_helper.cpp) +target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) + # Test mg-auth - -if (MG_ENTERPRISE) -add_unit_test(auth.cpp) -target_link_libraries(${test_prefix}auth mg-auth mg-license) +if(MG_ENTERPRISE) + add_unit_test(auth.cpp) + target_link_libraries(${test_prefix}auth mg-auth mg-license) endif() - # Test mg-slk - -if (MG_ENTERPRISE) -add_unit_test(slk_advanced.cpp) -target_link_libraries(${test_prefix}slk_advanced mg-storage-v2) +if(MG_ENTERPRISE) + add_unit_test(slk_advanced.cpp) + target_link_libraries(${test_prefix}slk_advanced mg-storage-v2) endif() -if (MG_ENTERPRISE) -add_unit_test(slk_core.cpp) -target_link_libraries(${test_prefix}slk_core mg-slk gflags fmt) +if(MG_ENTERPRISE) + add_unit_test(slk_core.cpp) + target_link_libraries(${test_prefix}slk_core mg-slk gflags fmt) -add_unit_test(slk_streams.cpp) -target_link_libraries(${test_prefix}slk_streams mg-slk gflags fmt) + add_unit_test(slk_streams.cpp) + target_link_libraries(${test_prefix}slk_streams mg-slk gflags fmt) endif() - # Test mg-rpc - -if (MG_ENTERPRISE) -add_unit_test(rpc.cpp) -target_link_libraries(${test_prefix}rpc mg-rpc) +if(MG_ENTERPRISE) + add_unit_test(rpc.cpp) + target_link_libraries(${test_prefix}rpc mg-rpc) endif() - # Test LCP - add_custom_command( OUTPUT test_lcp DEPENDS ${lcp_src_files} lcp test_lcp.lisp diff --git a/tests/unit/replication_persistence_helper.cpp b/tests/unit/replication_persistence_helper.cpp new file mode 100644 index 000000000..ffe125cd5 --- /dev/null +++ b/tests/unit/replication_persistence_helper.cpp @@ -0,0 +1,91 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/replication/replication_persistence_helper.hpp" +#include "formatters.hpp" +#include "utils/logging.hpp" + +#include <gtest/gtest.h> +#include <fstream> +#include <iostream> +#include <optional> +#include <string> + +class ReplicationPersistanceHelperTest : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} + + memgraph::storage::replication::ReplicaStatus CreateReplicaStatus( + std::string name, std::string ip_address, uint16_t port, + memgraph::storage::replication::ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency, + std::optional<memgraph::storage::replication::ReplicationClientConfig::SSL> ssl) const { + return memgraph::storage::replication::ReplicaStatus{.name = name, + .ip_address = ip_address, + .port = port, + .sync_mode = sync_mode, + .replica_check_frequency = replica_check_frequency, + .ssl = ssl}; + } + + static_assert( + sizeof(memgraph::storage::replication::ReplicaStatus) == 152, + "Most likely you modified ReplicaStatus without updating the tests. Please modify CreateReplicaStatus. "); +}; + +TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesInitialized) { + auto replicas_status = CreateReplicaStatus( + "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1), + memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}); + + auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( + memgraph::storage::replication::ReplicaStatus(replicas_status)); + auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + + ASSERT_EQ(replicas_status, *replicas_status_converted); +} + +TEST_F(ReplicationPersistanceHelperTest, BasicTestOnlyMandatoryAttributesInitialized) { + auto replicas_status = + CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, + std::chrono::seconds(1), std::nullopt); + + auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( + memgraph::storage::replication::ReplicaStatus(replicas_status)); + auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + + ASSERT_EQ(replicas_status, *replicas_status_converted); +} + +TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButSSLInitialized) { + auto replicas_status = + CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, + std::chrono::seconds(1), std::nullopt); + + auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( + memgraph::storage::replication::ReplicaStatus(replicas_status)); + auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + + ASSERT_EQ(replicas_status, *replicas_status_converted); +} + +TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButTimeoutInitialized) { + auto replicas_status = CreateReplicaStatus( + "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1), + memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}); + + auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( + memgraph::storage::replication::ReplicaStatus(replicas_status)); + auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + + ASSERT_EQ(replicas_status, *replicas_status_converted); +} diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 75b8a0b27..e453ae17e 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -39,6 +39,10 @@ class ReplicationTest : public ::testing::Test { .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, }}; + const std::string local_host = ("127.0.0.1"); + const std::array<uint16_t, 2> ports{10000, 20000}; + const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"}; + private: void Clear() { if (!std::filesystem::exists(storage_directory)) return; @@ -50,11 +54,12 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { memgraph::storage::Storage main_store(configuration); memgraph::storage::Storage replica_store(configuration); - replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000}); + replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA", memgraph::io::network::Endpoint{"127.0.0.1", 10000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); // vertex create @@ -268,22 +273,24 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { .storage_directory = storage_directory, .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); memgraph::storage::Storage replica_store2( {.durability = { .storage_directory = storage_directory, .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000}); + replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 20000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); const auto *vertex_label = "label"; @@ -314,7 +321,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { check_replica(&replica_store1); check_replica(&replica_store2); - main_store.UnregisterReplica("REPLICA2"); + main_store.UnregisterReplica(replicas[1]); { auto acc = main_store.Access(); auto v = acc.CreateVertex(); @@ -415,16 +422,17 @@ TEST_F(ReplicationTest, RecoveryProcess) { .storage_directory = replica_storage_directory, .snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}}); - replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000}); + replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); - ASSERT_EQ(main_store.GetReplicaState("REPLICA1"), memgraph::storage::replication::ReplicaState::RECOVERY); + ASSERT_EQ(main_store.GetReplicaState(replicas[0]), memgraph::storage::replication::ReplicaState::RECOVERY); - while (main_store.GetReplicaState("REPLICA1") != memgraph::storage::replication::ReplicaState::READY) { + while (main_store.GetReplicaState(replicas[0]) != memgraph::storage::replication::ReplicaState::READY) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -484,11 +492,12 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { memgraph::storage::Storage replica_store_async(configuration); - replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000}); + replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{"127.0.0.1", 20000}, - memgraph::storage::replication::ReplicationMode::ASYNC) + .RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{local_host, ports[1]}, + memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); static constexpr size_t vertices_create_num = 10; @@ -524,20 +533,22 @@ TEST_F(ReplicationTest, EpochTest) { memgraph::storage::Storage replica_store1(configuration); - replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); memgraph::storage::Storage replica_store2(configuration); - replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10001}); + replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, 10001}); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); ASSERT_FALSE(main_store - .RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); std::optional<memgraph::storage::Gid> vertex_gid; @@ -560,13 +571,14 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc.Commit().HasError()); } - main_store.UnregisterReplica("REPLICA1"); - main_store.UnregisterReplica("REPLICA2"); + main_store.UnregisterReplica(replicas[0]); + main_store.UnregisterReplica(replicas[1]); replica_store1.SetMainReplicationRole(); ASSERT_FALSE(replica_store1 - .RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); { @@ -588,10 +600,11 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc.Commit().HasError()); } - replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); ASSERT_TRUE(main_store - .RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000}, - memgraph::storage::replication::ReplicationMode::SYNC) + .RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); { @@ -615,25 +628,27 @@ TEST_F(ReplicationTest, ReplicationInformation) { memgraph::storage::Storage replica_store1(configuration); - const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001}; + const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; replica_store1.SetReplicaRole(replica1_endpoint); - const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002}; + const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002}; memgraph::storage::Storage replica_store2(configuration); replica_store2.SetReplicaRole(replica2_endpoint); - const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE( - main_store - .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) - .HasError()); + const std::string replica1_name{replicas[0]}; + ASSERT_FALSE(main_store + .RegisterReplica(replica1_name, replica1_endpoint, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .HasError()); - const std::string replica2_name{"REPLICA2"}; - ASSERT_FALSE( - main_store - .RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC) - .HasError()); + const std::string replica2_name{replicas[1]}; + ASSERT_FALSE(main_store + .RegisterReplica(replica2_name, replica2_endpoint, + memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .HasError()); ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::ReplicationRole::MAIN); ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA); @@ -660,25 +675,27 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { memgraph::storage::Storage replica_store1(configuration); - const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001}; + const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; replica_store1.SetReplicaRole(replica1_endpoint); - const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002}; + const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002}; memgraph::storage::Storage replica_store2(configuration); replica_store2.SetReplicaRole(replica2_endpoint); - const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE( - main_store - .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) - .HasError()); + const std::string replica1_name{replicas[0]}; + ASSERT_FALSE(main_store + .RegisterReplica(replica1_name, replica1_endpoint, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .HasError()); - const std::string replica2_name{"REPLICA1"}; - ASSERT_TRUE( - main_store - .RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC) - .GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS); + const std::string replica2_name{replicas[0]}; + ASSERT_TRUE(main_store + .RegisterReplica(replica2_name, replica2_endpoint, + memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS); } TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { @@ -686,23 +703,126 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { memgraph::storage::Storage replica_store1(configuration); - const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001}; + const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001}; replica_store1.SetReplicaRole(replica1_endpoint); - const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10001}; + const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10001}; memgraph::storage::Storage replica_store2(configuration); replica_store2.SetReplicaRole(replica2_endpoint); - const std::string replica1_name{"REPLICA1"}; - ASSERT_FALSE( - main_store - .RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC) - .HasError()); + const std::string replica1_name{replicas[0]}; + ASSERT_FALSE(main_store + .RegisterReplica(replica1_name, replica1_endpoint, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .HasError()); - const std::string replica2_name{"REPLICA2"}; - ASSERT_TRUE( - main_store - .RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC) - .GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS); + const std::string replica2_name{replicas[1]}; + ASSERT_TRUE(main_store + .RegisterReplica(replica2_name, replica2_endpoint, + memgraph::storage::replication::ReplicationMode::ASYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS); +} + +TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) { + auto main_config = configuration; + main_config.durability.restore_replicas_on_startup = true; + auto main_store = std::make_unique<memgraph::storage::Storage>(main_config); + + memgraph::storage::Storage replica_store1(configuration); + replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); + + memgraph::storage::Storage replica_store2(configuration); + replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}); + + auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID); + ASSERT_FALSE(res.HasError()); + res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID); + ASSERT_FALSE(res.HasError()); + + auto replica_infos = main_store->ReplicasInfo(); + + ASSERT_EQ(replica_infos.size(), 2); + ASSERT_EQ(replica_infos[0].name, replicas[0]); + ASSERT_EQ(replica_infos[0].endpoint.address, local_host); + ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]); + ASSERT_EQ(replica_infos[1].name, replicas[1]); + ASSERT_EQ(replica_infos[1].endpoint.address, local_host); + ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]); + + main_store.reset(); + + auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config); + replica_infos = other_main_store->ReplicasInfo(); + ASSERT_EQ(replica_infos.size(), 2); + ASSERT_EQ(replica_infos[0].name, replicas[0]); + ASSERT_EQ(replica_infos[0].endpoint.address, local_host); + ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]); + ASSERT_EQ(replica_infos[1].name, replicas[1]); + ASSERT_EQ(replica_infos[1].endpoint.address, local_host); + ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]); +} + +TEST_F(ReplicationTest, RestoringReplicationAtStartup) { + auto main_config = configuration; + main_config.durability.restore_replicas_on_startup = true; + auto main_store = std::make_unique<memgraph::storage::Storage>(main_config); + memgraph::storage::Storage replica_store1(configuration); + replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]}); + + memgraph::storage::Storage replica_store2(configuration); + replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]}); + + auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID); + ASSERT_FALSE(res.HasError()); + res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID); + ASSERT_FALSE(res.HasError()); + + auto replica_infos = main_store->ReplicasInfo(); + + ASSERT_EQ(replica_infos.size(), 2); + ASSERT_EQ(replica_infos[0].name, replicas[0]); + ASSERT_EQ(replica_infos[0].endpoint.address, local_host); + ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]); + ASSERT_EQ(replica_infos[1].name, replicas[1]); + ASSERT_EQ(replica_infos[1].endpoint.address, local_host); + ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]); + + const auto unregister_res = main_store->UnregisterReplica(replicas[0]); + ASSERT_TRUE(unregister_res); + + replica_infos = main_store->ReplicasInfo(); + ASSERT_EQ(replica_infos.size(), 1); + ASSERT_EQ(replica_infos[0].name, replicas[1]); + ASSERT_EQ(replica_infos[0].endpoint.address, local_host); + ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]); + + main_store.reset(); + + auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config); + replica_infos = other_main_store->ReplicasInfo(); + ASSERT_EQ(replica_infos.size(), 1); + ASSERT_EQ(replica_infos[0].name, replicas[1]); + ASSERT_EQ(replica_infos[0].endpoint.address, local_host); + ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]); +} + +TEST_F(ReplicationTest, AddingInvalidReplica) { + memgraph::storage::Storage main_store(configuration); + + ASSERT_TRUE(main_store + .RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]}, + memgraph::storage::replication::ReplicationMode::SYNC, + memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) + .GetError() == memgraph::storage::Storage::RegisterReplicaError::CONNECTION_FAILED); }