diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 143ac102f..a2dc0aef2 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -336,53 +336,6 @@ jobs: # multiple paths could be defined build/logs - experimental_build_ha: - name: "High availability build" - runs-on: [self-hosted, Linux, X64, Diff] - env: - THREADS: 24 - MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} - MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} - - steps: - - name: Set up repository - uses: actions/checkout@v4 - with: - # Number of commits to fetch. `0` indicates all history for all - # branches and tags. (default: 1) - fetch-depth: 0 - - - name: Build release binaries - run: | - source /opt/toolchain-v4/activate - ./init - cd build - cmake -DCMAKE_BUILD_TYPE=Release -DMG_EXPERIMENTAL_HIGH_AVAILABILITY=ON .. - make -j$THREADS - - name: Run unit tests - run: | - source /opt/toolchain-v4/activate - cd build - ctest -R memgraph__unit --output-on-failure -j$THREADS - - name: Run e2e tests - if: false - run: | - cd tests - ./setup.sh /opt/toolchain-v4/activate - source ve3/bin/activate_e2e - cd e2e - ./run.sh "Coordinator" - ./run.sh "Client initiated failover" - ./run.sh "Uninitialized cluster" - - name: Save test data - uses: actions/upload-artifact@v4 - if: always() - with: - name: "Test data(High availability build)" - path: | - # multiple paths could be defined - build/logs - release_jepsen_test: name: "Release Jepsen Test" runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl] diff --git a/CMakeLists.txt b/CMakeLists.txt index 3812cc86d..85e2b085c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -271,18 +271,6 @@ endif() set(libs_dir ${CMAKE_SOURCE_DIR}/libs) add_subdirectory(libs EXCLUDE_FROM_ALL) -option(MG_EXPERIMENTAL_HIGH_AVAILABILITY "Feature flag for experimental high availability" OFF) - -if (NOT MG_ENTERPRISE AND MG_EXPERIMENTAL_HIGH_AVAILABILITY) - set(MG_EXPERIMENTAL_HIGH_AVAILABILITY OFF) - message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY can only be used with enterpise version of the code.") -endif () - -if (MG_EXPERIMENTAL_HIGH_AVAILABILITY) - add_compile_definitions(MG_EXPERIMENTAL_HIGH_AVAILABILITY) -endif () - -# Optional subproject configuration ------------------------------------------- option(TEST_COVERAGE "Generate coverage reports from running memgraph" OFF) option(TOOLS "Build tools binaries" ON) option(QUERY_MODULES "Build query modules containing custom procedures" ON) diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 3150a6c02..936d7a5c2 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -11,7 +11,6 @@ target_sources(mg-coordination include/coordination/coordinator_slk.hpp include/coordination/coordinator_instance.hpp include/coordination/coordinator_handlers.hpp - include/coordination/constants.hpp include/coordination/instance_status.hpp include/coordination/replication_instance.hpp include/coordination/raft_state.hpp diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index ff534b549..f605069fe 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -132,7 +132,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa // registering replicas for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) { - auto instance_client = replication_handler.RegisterReplica(config, false); + auto instance_client = replication_handler.RegisterReplica(config); if (instance_client.HasError()) { using enum memgraph::replication::RegisterReplicaError; switch (instance_client.GetError()) { diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 1bbcf4f8f..166b1e8b8 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -186,11 +186,9 @@ auto CoordinatorInstance::TryFailover() -> void { } } - // TODO: (andi) fmap compliant - ReplicationClientsInfo repl_clients_info; - repl_clients_info.reserve(repl_instances_.size() - 1); - std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main), - std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo); + auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | + ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | + ranges::to<ReplicationClientsInfo>(); if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { spdlog::warn("Failover failed since promoting replica to main failed!"); diff --git a/src/coordination/include/coordination/constants.hpp b/src/coordination/include/coordination/constants.hpp deleted file mode 100644 index 819b9fa05..000000000 --- a/src/coordination/include/coordination/constants.hpp +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2024 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#pragma once - -namespace memgraph::coordination { - -#ifdef MG_EXPERIMENTAL_HIGH_AVAILABILITY -constexpr bool allow_ha = true; -#else -constexpr bool allow_ha = false; -#endif - -} // namespace memgraph::coordination diff --git a/src/flags/experimental.cpp b/src/flags/experimental.cpp index 7bd26a837..123903c96 100644 --- a/src/flags/experimental.cpp +++ b/src/flags/experimental.cpp @@ -19,13 +19,14 @@ // Bolt server flags. // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_string(experimental_enabled, "", - "Experimental features to be used, comma seperated. Options [system-replication]"); + "Experimental features to be used, comma seperated. Options [system-replication, high-availability]"); using namespace std::string_view_literals; namespace memgraph::flags { -auto const mapping = std::map{std::pair{"system-replication"sv, Experiments::SYSTEM_REPLICATION}}; +auto const mapping = std::map{std::pair{"system-replication"sv, Experiments::SYSTEM_REPLICATION}, + std::pair{"high-availability"sv, Experiments::HIGH_AVAILABILITY}}; auto ExperimentsInstance() -> Experiments & { static auto instance = Experiments{}; diff --git a/src/flags/experimental.hpp b/src/flags/experimental.hpp index ec4db2037..5a19889fe 100644 --- a/src/flags/experimental.hpp +++ b/src/flags/experimental.hpp @@ -23,6 +23,7 @@ namespace memgraph::flags { // old experiments can be reused once code cleanup has happened enum class Experiments : uint8_t { SYSTEM_REPLICATION = 1 << 0, + HIGH_AVAILABILITY = 1 << 1, }; bool AreExperimentsEnabled(Experiments experiments); diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index c7ccbb1ef..9fed81d7e 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -109,7 +109,6 @@ #include "utils/variant_helpers.hpp" #ifdef MG_ENTERPRISE -#include "coordination/constants.hpp" #include "flags/experimental.hpp" #endif @@ -370,7 +369,7 @@ class ReplQueryHandler { .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}; - const auto error = handler_->TryRegisterReplica(replication_config, true).HasError(); + const auto error = handler_->TryRegisterReplica(replication_config).HasError(); if (error) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); @@ -1131,17 +1130,21 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Parameters ¶meters, coordination::CoordinatorState *coordinator_state, const query::InterpreterConfig &config, std::vector<Notification> *notifications) { + using enum memgraph::flags::Experiments; + + if (!license::global_license_checker.IsEnterpriseValidFast()) { + throw QueryRuntimeException("High availability is only available in Memgraph Enterprise."); + } + + if (!flags::AreExperimentsEnabled(HIGH_AVAILABILITY)) { + throw QueryRuntimeException( + "High availability is experimental feature. If you want to use it, add high-availability option to the " + "--experimental-enabled flag."); + } + Callback callback; switch (coordinator_query->action_) { case CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can add coordinator instance!"); } @@ -1165,15 +1168,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return callback; } case CoordinatorQuery::Action::REGISTER_INSTANCE: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } - - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } @@ -1205,15 +1199,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return callback; } case CoordinatorQuery::Action::UNREGISTER_INSTANCE: - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } - - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } @@ -1229,14 +1214,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return callback; case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } @@ -1254,14 +1231,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return callback; } case CoordinatorQuery::Action::SHOW_INSTANCES: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES."); } diff --git a/src/query/replication_query_handler.hpp b/src/query/replication_query_handler.hpp index 011548bd4..f2e0f8b19 100644 --- a/src/query/replication_query_handler.hpp +++ b/src/query/replication_query_handler.hpp @@ -53,10 +53,10 @@ struct ReplicationQueryHandler { const std::optional<utils::UUID> &main_uuid) = 0; // as MAIN, define and connect to REPLICAs - virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) + virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> utils::BasicResult<RegisterReplicaError> = 0; - virtual auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) + virtual auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> utils::BasicResult<RegisterReplicaError> = 0; // as MAIN, remove a REPLICA connection diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index 3e8e21265..0e1d15148 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -123,10 +123,10 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { const std::optional<utils::UUID> &main_uuid) override; // as MAIN, define and connect to REPLICAs - auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) + auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override; - auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) + auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override; // as MAIN, remove a REPLICA connection @@ -145,8 +145,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplicaUUID() -> std::optional<utils::UUID>; private: - template <bool AllowRPCFailure> - auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) + template <bool SendSwapUUID> + auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> { MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); auto maybe_client = repl_state_.RegisterReplica(config); @@ -172,7 +172,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { } const auto main_uuid = std::get<memgraph::replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_; - if (send_swap_uuid) { + if constexpr (SendSwapUUID) { if (!memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_, main_uuid)) { return memgraph::query::RegisterReplicaError::ERROR_ACCEPTING_MAIN; @@ -205,9 +205,6 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) { return false; } - if (state == storage::replication::ReplicaState::MAYBE_BEHIND) { - return AllowRPCFailure; - } return true; }); diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index 747f327e4..0d95cbd51 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -192,12 +192,12 @@ bool ReplicationHandler::SetReplicationRoleMain() { bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, const std::optional<utils::UUID> &main_uuid) { - return SetReplicationRoleReplica_<false>(config, main_uuid); + return SetReplicationRoleReplica_<true>(config, main_uuid); } bool ReplicationHandler::TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, const std::optional<utils::UUID> &main_uuid) { - return SetReplicationRoleReplica_<true>(config, main_uuid); + return SetReplicationRoleReplica_<false>(config, main_uuid); } bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) { @@ -226,16 +226,14 @@ bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) }; // as MAIN, define and connect to REPLICAs -auto ReplicationHandler::TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, - bool send_swap_uuid) +auto ReplicationHandler::TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> { - return RegisterReplica_<true>(config, send_swap_uuid); + return RegisterReplica_<true>(config); } -auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, - bool send_swap_uuid) +auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> { - return RegisterReplica_<false>(config, send_swap_uuid); + return RegisterReplica_<false>(config); } auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::query::UnregisterReplicaResult { diff --git a/src/storage/v2/name_id_mapper.hpp b/src/storage/v2/name_id_mapper.hpp index d1e8293f9..2c5aee352 100644 --- a/src/storage/v2/name_id_mapper.hpp +++ b/src/storage/v2/name_id_mapper.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp index 299e49612..e0714de2a 100644 --- a/src/utils/functional.hpp +++ b/src/utils/functional.hpp @@ -14,14 +14,13 @@ #include <algorithm> #include <vector> +#include <range/v3/view.hpp> + namespace memgraph::utils { -template <class F, class T, class R = typename std::result_of<F(T)>::type, class V = std::vector<R>> -V fmap(F &&f, const std::vector<T> &v) { - V r; - r.reserve(v.size()); - std::ranges::transform(v, std::back_inserter(r), std::forward<F>(f)); - return r; +template <class F, class T, class R = typename std::invoke_result<F, T>::type> +auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> { + return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>(); } } // namespace memgraph::utils diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index b8fee9940..1876074ee 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -56,7 +56,6 @@ add_subdirectory(python_query_modules_reloading) add_subdirectory(analyze_graph) add_subdirectory(transaction_queue) add_subdirectory(mock_api) -#add_subdirectory(graphql) add_subdirectory(disk_storage) add_subdirectory(load_csv) add_subdirectory(init_file_flags) @@ -77,10 +76,7 @@ add_subdirectory(query_modules_storage_modes) add_subdirectory(garbage_collection) add_subdirectory(query_planning) add_subdirectory(awesome_functions) - -if (MG_EXPERIMENTAL_HIGH_AVAILABILITY) - add_subdirectory(high_availability_experimental) -endif () +add_subdirectory(high_availability) add_subdirectory(replication_experimental) diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index de05a5617..558cb63f5 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -228,6 +228,6 @@ startup_config_dict = { "experimental_enabled": ( "", "", - "Experimental features to be used, comma seperated. Options [system-replication]", + "Experimental features to be used, comma seperated. Options [system-replication, high-availability]", ), } diff --git a/tests/e2e/high_availability/CMakeLists.txt b/tests/e2e/high_availability/CMakeLists.txt new file mode 100644 index 000000000..47a1781aa --- /dev/null +++ b/tests/e2e/high_availability/CMakeLists.txt @@ -0,0 +1,15 @@ +find_package(gflags REQUIRED) + +copy_e2e_python_files(high_availability coordinator.py) +copy_e2e_python_files(high_availability single_coordinator.py) +copy_e2e_python_files(high_availability coord_cluster_registration.py) +copy_e2e_python_files(high_availability distributed_coords.py) +copy_e2e_python_files(high_availability disable_writing_on_main_after_restart.py) +copy_e2e_python_files(high_availability manual_setting_replicas.py) +copy_e2e_python_files(high_availability not_replicate_from_old_main.py) +copy_e2e_python_files(high_availability common.py) +copy_e2e_python_files(high_availability workloads.yaml) + +copy_e2e_python_files_from_parent_folder(high_availability ".." memgraph.py) +copy_e2e_python_files_from_parent_folder(high_availability ".." interactive_mg_runner.py) +copy_e2e_python_files_from_parent_folder(high_availability ".." mg_utils.py) diff --git a/tests/e2e/high_availability_experimental/common.py b/tests/e2e/high_availability/common.py similarity index 100% rename from tests/e2e/high_availability_experimental/common.py rename to tests/e2e/high_availability/common.py diff --git a/tests/e2e/high_availability_experimental/coord_cluster_registration.py b/tests/e2e/high_availability/coord_cluster_registration.py similarity index 97% rename from tests/e2e/high_availability_experimental/coord_cluster_registration.py rename to tests/e2e/high_availability/coord_cluster_registration.py index cccaec5ad..8b093b5c4 100644 --- a/tests/e2e/high_availability_experimental/coord_cluster_registration.py +++ b/tests/e2e/high_availability/coord_cluster_registration.py @@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", @@ -44,6 +45,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", @@ -57,6 +59,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", @@ -70,6 +73,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", @@ -81,6 +85,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7691", "--log-level=TRACE", @@ -92,6 +97,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7692", "--log-level=TRACE", diff --git a/tests/e2e/high_availability_experimental/coordinator.py b/tests/e2e/high_availability/coordinator.py similarity index 100% rename from tests/e2e/high_availability_experimental/coordinator.py rename to tests/e2e/high_availability/coordinator.py diff --git a/tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py b/tests/e2e/high_availability/disable_writing_on_main_after_restart.py similarity index 94% rename from tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py rename to tests/e2e/high_availability/disable_writing_on_main_after_restart.py index ad3d95828..53d570a6d 100644 --- a/tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py +++ b/tests/e2e/high_availability/disable_writing_on_main_after_restart.py @@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", @@ -49,6 +50,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", @@ -67,6 +69,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", @@ -85,6 +88,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", @@ -96,6 +100,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7691", "--log-level=TRACE", @@ -107,6 +112,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7692", "--log-level=TRACE", diff --git a/tests/e2e/high_availability_experimental/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py similarity index 94% rename from tests/e2e/high_availability_experimental/distributed_coords.py rename to tests/e2e/high_availability/distributed_coords.py index 052cb6dba..fde8889a5 100644 --- a/tests/e2e/high_availability_experimental/distributed_coords.py +++ b/tests/e2e/high_availability/distributed_coords.py @@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", @@ -44,6 +45,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", @@ -57,6 +59,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", @@ -70,6 +73,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", @@ -81,6 +85,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7691", "--log-level=TRACE", @@ -92,6 +97,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "coordinator_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7692", "--log-level=TRACE", diff --git a/tests/e2e/high_availability_experimental/manual_setting_replicas.py b/tests/e2e/high_availability/manual_setting_replicas.py similarity index 84% rename from tests/e2e/high_availability_experimental/manual_setting_replicas.py rename to tests/e2e/high_availability/manual_setting_replicas.py index f2d48ffd7..b0b0965bc 100644 --- a/tests/e2e/high_availability_experimental/manual_setting_replicas.py +++ b/tests/e2e/high_availability/manual_setting_replicas.py @@ -14,8 +14,7 @@ import sys import interactive_mg_runner import pytest -from common import execute_and_fetch_all -from mg_utils import mg_sleep_and_assert +from common import connect, execute_and_fetch_all interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) interactive_mg_runner.PROJECT_DIR = os.path.normpath( @@ -26,20 +25,28 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_3": { - "args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"], + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + ], "log_file": "main.log", "setup_queries": [], }, } -def test_no_manual_setup_on_main(connection): +def test_no_manual_setup_on_main(): # Goal of this test is to check that all manual registration actions are disabled on instances with coordiantor server port # 1 interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - any_main = connection(7687, "instance_3").cursor() + any_main = connect(host="localhost", port=7687).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(any_main, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';") assert str(e.value) == "Can't register replica manually on instance with coordinator server port." diff --git a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py b/tests/e2e/high_availability/not_replicate_from_old_main.py similarity index 79% rename from tests/e2e/high_availability_experimental/not_replicate_from_old_main.py rename to tests/e2e/high_availability/not_replicate_from_old_main.py index b859cae84..201ff7afa 100644 --- a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py +++ b/tests/e2e/high_availability/not_replicate_from_old_main.py @@ -16,7 +16,7 @@ import tempfile import interactive_mg_runner import pytest -from common import execute_and_fetch_all, safe_execute +from common import connect, execute_and_fetch_all from mg_utils import mg_sleep_and_assert interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -28,12 +28,12 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = { "shared_replica": { - "args": ["--bolt-port", "7688", "--log-level", "TRACE"], + "args": ["--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", "TRACE"], "log_file": "replica2.log", "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], }, "main1": { - "args": ["--bolt-port", "7687", "--log-level", "TRACE"], + "args": ["--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", "TRACE"], "log_file": "main.log", "setup_queries": ["REGISTER REPLICA shared_replica SYNC TO '127.0.0.1:10001' ;"], }, @@ -42,12 +42,12 @@ MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = { MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = { "replica": { - "args": ["--bolt-port", "7689", "--log-level", "TRACE"], + "args": ["--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", "TRACE"], "log_file": "replica.log", "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], }, "main_2": { - "args": ["--bolt-port", "7690", "--log-level", "TRACE"], + "args": ["--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level", "TRACE"], "log_file": "main_2.log", "setup_queries": [ "REGISTER REPLICA shared_replica SYNC TO '127.0.0.1:10001' ;", @@ -57,7 +57,7 @@ MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = { } -def test_replication_works_on_failover(connection): +def test_replication_works_on_failover(): # Goal of this test is to check that after changing `shared_replica` # to be part of new cluster, `main` (old cluster) can't write any more to it @@ -65,7 +65,7 @@ def test_replication_works_on_failover(connection): interactive_mg_runner.start_all_keep_others(MEMGRAPH_FIRST_CLUSTER_DESCRIPTION) # 2 - main_cursor = connection(7687, "main1").cursor() + main_cursor = connect(host="localhost", port=7687).cursor() expected_data_on_main = [ ("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"), ] @@ -76,7 +76,7 @@ def test_replication_works_on_failover(connection): interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION) # 4 - new_main_cursor = connection(7690, "main_2").cursor() + new_main_cursor = connect(host="localhost", port=7690).cursor() def retrieve_data_show_replicas(): return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))) @@ -88,14 +88,11 @@ def test_replication_works_on_failover(connection): mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) # 5 - shared_replica_cursor = connection(7688, "shared_replica").cursor() + shared_replica_cursor = connect(host="localhost", port=7688).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(main_cursor, "CREATE ();") - assert ( - str(e.value) - == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query." - ) + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) res = execute_and_fetch_all(main_cursor, "MATCH (n) RETURN count(n) as count;")[0][0] assert res == 1, "Vertex should be created" @@ -115,7 +112,7 @@ def test_replication_works_on_failover(connection): interactive_mg_runner.stop_all() -def test_not_replicate_old_main_register_new_cluster(connection): +def test_not_replicate_old_main_register_new_cluster(): # Goal of this test is to check that although replica is registered in one cluster # it can be re-registered to new cluster # This flow checks if Registering replica is idempotent and that old main cannot talk to replica @@ -130,6 +127,7 @@ def test_not_replicate_old_main_register_new_cluster(connection): MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION = { "shared_instance": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", @@ -143,6 +141,7 @@ def test_not_replicate_old_main_register_new_cluster(connection): }, "instance_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", @@ -155,7 +154,14 @@ def test_not_replicate_old_main_register_new_cluster(connection): "setup_queries": [], }, "coordinator_1": { - "args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"], + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], "log_file": "coordinator.log", "setup_queries": [ "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", @@ -170,7 +176,7 @@ def test_not_replicate_old_main_register_new_cluster(connection): # 2 - first_cluster_coord_cursor = connection(7690, "coord_1").cursor() + first_cluster_coord_cursor = connect(host="localhost", port=7690).cursor() def show_repl_cluster(): return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;"))) @@ -188,6 +194,7 @@ def test_not_replicate_old_main_register_new_cluster(connection): MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION = { "instance_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", @@ -200,14 +207,21 @@ def test_not_replicate_old_main_register_new_cluster(connection): "setup_queries": [], }, "coordinator_2": { - "args": ["--bolt-port", "7691", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10112"], + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7691", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10112", + ], "log_file": "coordinator.log", "setup_queries": [], }, } interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION) - second_cluster_coord_cursor = connection(7691, "coord_2").cursor() + second_cluster_coord_cursor = connect(host="localhost", port=7691).cursor() execute_and_fetch_all( second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';" ) @@ -230,24 +244,21 @@ def test_not_replicate_old_main_register_new_cluster(connection): mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster) # 5 - main_1_cursor = connection(7689, "main_1").cursor() + main_1_cursor = connect(host="localhost", port=7689).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(main_1_cursor, "CREATE ();") - assert ( - str(e.value) - == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query." - ) + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) - shared_replica_cursor = connection(7688, "shared_replica").cursor() + shared_replica_cursor = connect(host="localhost", port=7688).cursor() res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0] assert res == 0, "Old main should not replicate to 'shared' replica" # 6 - main_2_cursor = connection(7687, "main_2").cursor() + main_2_cursor = connect(host="localhost", port=7687).cursor() execute_and_fetch_all(main_2_cursor, "CREATE ();") - shared_replica_cursor = connection(7688, "shared_replica").cursor() + shared_replica_cursor = connect(host="localhost", port=7688).cursor() res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0] assert res == 1, "New main should replicate to 'shared' replica" diff --git a/tests/e2e/high_availability_experimental/single_coordinator.py b/tests/e2e/high_availability/single_coordinator.py similarity index 97% rename from tests/e2e/high_availability_experimental/single_coordinator.py rename to tests/e2e/high_availability/single_coordinator.py index 8e620e7e4..ba6dee3b6 100644 --- a/tests/e2e/high_availability_experimental/single_coordinator.py +++ b/tests/e2e/high_availability/single_coordinator.py @@ -30,6 +30,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_1": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", @@ -43,6 +44,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_2": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", @@ -56,6 +58,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { }, "instance_3": { "args": [ + "--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", @@ -68,7 +71,14 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { "setup_queries": [], }, "coordinator": { - "args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"], + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], "log_file": "coordinator.log", "setup_queries": [ "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", @@ -198,10 +208,7 @@ def test_replication_works_on_replica_instance_restart(): instance_1_cursor = connect(host="localhost", port=7688).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(main_cursor, "CREATE ();") - assert ( - str(e.value) - == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query." - ) + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) res_instance_1 = execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] assert res_instance_1 == 1 diff --git a/tests/e2e/high_availability_experimental/workloads.yaml b/tests/e2e/high_availability/workloads.yaml similarity index 57% rename from tests/e2e/high_availability_experimental/workloads.yaml rename to tests/e2e/high_availability/workloads.yaml index 2159c374c..75f17b2f7 100644 --- a/tests/e2e/high_availability_experimental/workloads.yaml +++ b/tests/e2e/high_availability/workloads.yaml @@ -1,19 +1,19 @@ ha_cluster: &ha_cluster cluster: replica_1: - args: ["--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"] + args: ["--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"] log_file: "replication-e2e-replica1.log" setup_queries: [] replica_2: - args: ["--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"] + args: ["--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"] log_file: "replication-e2e-replica2.log" setup_queries: [] main: - args: ["--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"] + args: ["--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"] log_file: "replication-e2e-main.log" setup_queries: [] coordinator: - args: ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"] + args: ["--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"] log_file: "replication-e2e-coordinator.log" setup_queries: [ "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", @@ -25,29 +25,29 @@ ha_cluster: &ha_cluster workloads: - name: "Coordinator" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/coordinator.py"] + args: ["high_availability/coordinator.py"] <<: *ha_cluster - name: "Single coordinator" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/single_coordinator.py"] + args: ["high_availability/single_coordinator.py"] - name: "Disabled manual setting of replication cluster" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/manual_setting_replicas.py"] + args: ["high_availability/manual_setting_replicas.py"] - name: "Coordinator cluster registration" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/coord_cluster_registration.py"] + args: ["high_availability/coord_cluster_registration.py"] - name: "Not replicate from old main" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/not_replicate_from_old_main.py"] + args: ["high_availability/not_replicate_from_old_main.py"] - name: "Disable writing on main after restart" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/disable_writing_on_main_after_restart.py"] + args: ["high_availability/disable_writing_on_main_after_restart.py"] - name: "Distributed coordinators" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/distributed_coords.py"] + args: ["high_availability/distributed_coords.py"] diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt deleted file mode 100644 index bbef1ebc7..000000000 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -find_package(gflags REQUIRED) - -copy_e2e_python_files(ha_experimental coordinator.py) -copy_e2e_python_files(ha_experimental single_coordinator.py) -copy_e2e_python_files(ha_experimental coord_cluster_registration.py) -copy_e2e_python_files(ha_experimental distributed_coords.py) -copy_e2e_python_files(ha_experimental manual_setting_replicas.py) -copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py) -copy_e2e_python_files(ha_experimental disable_writing_on_main_after_restart.py) -copy_e2e_python_files(ha_experimental common.py) -copy_e2e_python_files(ha_experimental workloads.yaml) - -copy_e2e_python_files_from_parent_folder(ha_experimental ".." memgraph.py) -copy_e2e_python_files_from_parent_folder(ha_experimental ".." interactive_mg_runner.py) -copy_e2e_python_files_from_parent_folder(ha_experimental ".." mg_utils.py) diff --git a/tests/e2e/interactive_mg_runner.py b/tests/e2e/interactive_mg_runner.py index 06908747e..efa4dc3d5 100755 --- a/tests/e2e/interactive_mg_runner.py +++ b/tests/e2e/interactive_mg_runner.py @@ -160,6 +160,12 @@ def kill(context, name, keep_directories=True): MEMGRAPH_INSTANCES.pop(name) +def kill_all(context, keep_directories=True): + for key in MEMGRAPH_INSTANCES.keys(): + MEMGRAPH_INSTANCES[key].kill(keep_directories) + MEMGRAPH_INSTANCES.clear() + + def cleanup_directories_on_exit(value=True): CLEANUP_DIRECTORIES_ON_EXIT = value diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index abd5b5f48..2ddd466d9 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -795,7 +795,7 @@ def test_async_replication_when_main_is_killed(): "data_directory": f"{data_directory_main.name}", }, } - + interactive_mg_runner.kill_all(CONFIGURATION) interactive_mg_runner.start_all(CONFIGURATION) # 1/ @@ -878,7 +878,7 @@ def test_sync_replication_when_main_is_killed(): "data_directory": f"{data_directory_main.name}", }, } - + interactive_mg_runner.kill_all(CONFIGURATION) interactive_mg_runner.start_all(CONFIGURATION) # 1/ @@ -1990,5 +1990,4 @@ def test_replication_not_messed_up_by_ShowIndexInfo(connection): if __name__ == "__main__": - sys.exit(pytest.main([__file__, "-k", "test_basic_recovery"])) sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 64366f331..4ae2101cb 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -149,14 +149,12 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { }, std::nullopt); - const auto ® = main.repl_handler.TryRegisterReplica( - ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true); + const auto ® = main.repl_handler.TryRegisterReplica(ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); ASSERT_FALSE(reg.HasError()) << (int)reg.GetError(); // vertex create @@ -453,24 +451,20 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); const auto *vertex_label = "label"; @@ -604,14 +598,12 @@ TEST_F(ReplicationTest, RecoveryProcess) { }, std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); ASSERT_EQ(main.db.storage()->GetReplicaState(replicas[0]), ReplicaState::RECOVERY); @@ -684,14 +676,12 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = "REPLICA_ASYNC", - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = ports[1], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = "REPLICA_ASYNC", + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); static constexpr size_t vertices_create_num = 10; @@ -742,25 +732,21 @@ TEST_F(ReplicationTest, EpochTest) { std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = 10001, - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); std::optional<Gid> vertex_gid; @@ -789,15 +775,12 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_TRUE(replica1.repl_handler.SetReplicationRoleMain()); ASSERT_FALSE(replica1.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = 10001, - }, - true) - + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); { @@ -826,15 +809,12 @@ TEST_F(ReplicationTest, EpochTest) { }, std::nullopt); ASSERT_TRUE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true) - + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); { @@ -875,27 +855,21 @@ TEST_F(ReplicationTest, ReplicationInformation) { std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = replica1_port, - }, - true) - + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = replica1_port, + }) .HasError()); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = replica2_port, - }, - true) - + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) .HasError()); ASSERT_TRUE(main.repl_state.IsMain()); @@ -939,25 +913,21 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { }, std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = replica1_port, - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = replica1_port, + }) .HasError()); ASSERT_TRUE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = replica2_port, - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) .GetError() == RegisterReplicaError::NAME_EXISTS); } @@ -982,25 +952,21 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { std::nullopt); ASSERT_FALSE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = common_port, - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = common_port, + }) .HasError()); ASSERT_TRUE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = common_port, - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = common_port, + }) .GetError() == RegisterReplicaError::ENDPOINT_EXISTS); } @@ -1038,23 +1004,19 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { }, std::nullopt); - auto res = main->repl_handler.TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true); + auto res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); ASSERT_FALSE(res.HasError()) << (int)res.GetError(); - res = main->repl_handler.TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }, - true); + res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()) << (int)res.GetError(); auto replica_infos = main->db.storage()->ReplicasInfo(); @@ -1103,23 +1065,19 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { .port = ports[1], }, std::nullopt); - auto res = main->repl_handler.TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true); + auto res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); ASSERT_FALSE(res.HasError()); - res = main->repl_handler.TryRegisterReplica( - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }, - true); + res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()); auto replica_infos = main->db.storage()->ReplicasInfo(); @@ -1157,13 +1115,11 @@ TEST_F(ReplicationTest, AddingInvalidReplica) { MinMemgraph main(main_conf); ASSERT_TRUE(main.repl_handler - .TryRegisterReplica( - ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }, - true) + .TryRegisterReplica(ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .GetError() == RegisterReplicaError::ERROR_ACCEPTING_MAIN); }