diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index d39d3e738..7fe4360b6 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(mg-coordination STATIC) add_library(mg::coordination ALIAS mg-coordination) target_sources(mg-coordination PUBLIC - include/coordination/coordinator_client.hpp + include/coordination/replication_instance_client.hpp include/coordination/coordinator_state.hpp include/coordination/coordinator_rpc.hpp include/coordination/coordinator_server.hpp @@ -12,7 +12,7 @@ target_sources(mg-coordination include/coordination/coordinator_instance.hpp include/coordination/coordinator_handlers.hpp include/coordination/instance_status.hpp - include/coordination/replication_instance.hpp + include/coordination/replication_instance_connector.hpp include/coordination/raft_state.hpp include/coordination/rpc_errors.hpp @@ -24,13 +24,13 @@ target_sources(mg-coordination PRIVATE coordinator_communication_config.cpp - coordinator_client.cpp + replication_instance_client.cpp coordinator_state.cpp coordinator_rpc.cpp coordinator_server.cpp coordinator_handlers.cpp coordinator_instance.cpp - replication_instance.cpp + replication_instance_connector.cpp raft_state.cpp coordinator_log_store.cpp diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 1251c5827..f6d2b46b5 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -342,6 +342,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig return RegisterInstanceCoordinatorStatus::LOCK_OPENED; } + // TODO: (andi) Change that this is being asked from raft state if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstanceConnector const &instance) { return instance.InstanceName() == instance_name; @@ -370,8 +371,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig return RegisterInstanceCoordinatorStatus::OPEN_LOCK; } - auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_, - &CoordinatorInstance::ReplicaSuccessCallback, + auto client = std::make_unique(this, config, client_succ_cb_, client_fail_cb_); + + auto *new_instance = &repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback, &CoordinatorInstance::ReplicaFailCallback); if (!new_instance->SendDemoteToReplicaRpc()) { diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index a3785c19e..a14b98d09 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -17,7 +17,7 @@ #include "coordination/instance_status.hpp" #include "coordination/raft_state.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" -#include "coordination/replication_instance.hpp" +#include "coordination/replication_instance_connector.hpp" #include "utils/resource_lock.hpp" #include "utils/rw_lock.hpp" #include "utils/thread_pool.hpp" diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/replication_instance_client.hpp similarity index 94% rename from src/coordination/include/coordination/coordinator_client.hpp rename to src/coordination/include/coordination/replication_instance_client.hpp index 870d291ea..00270c9b7 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/replication_instance_client.hpp @@ -30,9 +30,10 @@ using ReplicationClientsInfo = std::vector; class ReplicationInstanceClient { public: explicit ReplicationInstanceClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb); + HealthCheckClientCallback succ_cb = nullptr, + HealthCheckClientCallback fail_cb = nullptr); - ~ReplicationInstanceClient() = default; + virtual ~ReplicationInstanceClient() = default; ReplicationInstanceClient(ReplicationInstanceClient &) = delete; ReplicationInstanceClient &operator=(ReplicationInstanceClient const &) = delete; @@ -49,7 +50,7 @@ class ReplicationInstanceClient { auto CoordinatorSocketAddress() const -> std::string; auto ReplicationSocketAddress() const -> std::string; - [[nodiscard]] auto DemoteToReplica() const -> bool; + virtual auto DemoteToReplica() const -> bool; auto SendPromoteReplicaToMainRpc(utils::UUID const &uuid, ReplicationClientsInfo replication_clients_info) const -> bool; diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance_connector.hpp similarity index 86% rename from src/coordination/include/coordination/replication_instance.hpp rename to src/coordination/include/coordination/replication_instance_connector.hpp index 7a93ec14a..9473345d1 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance_connector.hpp @@ -13,8 +13,8 @@ #ifdef MG_ENTERPRISE -#include "coordination/coordinator_client.hpp" #include "coordination/coordinator_exceptions.hpp" +#include "coordination/replication_instance_client.hpp" #include "replication_coordination_glue/role.hpp" #include "utils/resource_lock.hpp" @@ -25,16 +25,13 @@ namespace memgraph::coordination { -class CoordinatorInstance; - using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view); class ReplicationInstanceConnector { public: - ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb, - HealthCheckInstanceCallback succ_instance_cb, - HealthCheckInstanceCallback fail_instance_cb); + explicit ReplicationInstanceConnector(std::unique_ptr client, + HealthCheckInstanceCallback succ_instance_cb = nullptr, + HealthCheckInstanceCallback fail_instance_cb = nullptr); ReplicationInstanceConnector(ReplicationInstanceConnector const &other) = delete; ReplicationInstanceConnector &operator=(ReplicationInstanceConnector const &other) = delete; @@ -46,10 +43,9 @@ class ReplicationInstanceConnector { auto OnFailPing() -> bool; auto IsReadyForUUIDPing() -> bool; - void UpdateReplicaLastResponseUUID(); - auto IsAlive() const -> bool; + // TODO: (andi) Fetch from ClusterState auto InstanceName() const -> std::string; auto CoordinatorSocketAddress() const -> std::string; auto ReplicationSocketAddress() const -> std::string; @@ -82,8 +78,9 @@ class ReplicationInstanceConnector { auto GetSuccessCallback() -> HealthCheckInstanceCallback &; auto GetFailCallback() -> HealthCheckInstanceCallback &; - private: - ReplicationInstanceClient client_; + protected: + auto UpdateReplicaLastResponseUUID() -> void; + std::unique_ptr client_; std::chrono::system_clock::time_point last_response_time_{}; bool is_alive_{false}; std::chrono::system_clock::time_point last_check_of_uuid_{}; diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/replication_instance_client.cpp similarity index 99% rename from src/coordination/coordinator_client.cpp rename to src/coordination/replication_instance_client.cpp index fe21cfd71..295c7adb7 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/replication_instance_client.cpp @@ -12,7 +12,7 @@ #include "utils/uuid.hpp" #ifdef MG_ENTERPRISE -#include "coordination/coordinator_client.hpp" +#include "coordination/replication_instance_client.hpp" #include "coordination/coordinator_communication_config.hpp" #include "coordination/coordinator_rpc.hpp" diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance_connector.cpp similarity index 75% rename from src/coordination/replication_instance.cpp rename to src/coordination/replication_instance_connector.cpp index 1b70ad900..00decf9f3 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance_connector.cpp @@ -11,7 +11,7 @@ #ifdef MG_ENTERPRISE -#include "coordination/replication_instance.hpp" +#include "coordination/replication_instance_connector.hpp" #include @@ -20,44 +20,40 @@ namespace memgraph::coordination { -ReplicationInstanceConnector::ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, - HealthCheckClientCallback fail_cb, +ReplicationInstanceConnector::ReplicationInstanceConnector(std::unique_ptr client, HealthCheckInstanceCallback succ_instance_cb, HealthCheckInstanceCallback fail_instance_cb) - : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), - succ_cb_(succ_instance_cb), - fail_cb_(fail_instance_cb) {} + : client_(std::move(client)), succ_cb_(succ_instance_cb), fail_cb_(fail_instance_cb) {} -auto ReplicationInstanceConnector::OnSuccessPing() -> void { +void ReplicationInstanceConnector::OnSuccessPing() { last_response_time_ = std::chrono::system_clock::now(); is_alive_ = true; } auto ReplicationInstanceConnector::OnFailPing() -> bool { auto elapsed_time = std::chrono::system_clock::now() - last_response_time_; - is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec(); + is_alive_ = elapsed_time < client_->InstanceDownTimeoutSec(); return is_alive_; } auto ReplicationInstanceConnector::IsReadyForUUIDPing() -> bool { return std::chrono::duration_cast(std::chrono::system_clock::now() - last_check_of_uuid_) > - client_.InstanceGetUUIDFrequencySec(); + client_->InstanceGetUUIDFrequencySec(); } -auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_.InstanceName(); } +auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_->InstanceName(); } auto ReplicationInstanceConnector::CoordinatorSocketAddress() const -> std::string { - return client_.CoordinatorSocketAddress(); + return client_->CoordinatorSocketAddress(); } auto ReplicationInstanceConnector::ReplicationSocketAddress() const -> std::string { - return client_.ReplicationSocketAddress(); + return client_->ReplicationSocketAddress(); } auto ReplicationInstanceConnector::IsAlive() const -> bool { return is_alive_; } auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info, HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool { - if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { + if (!client_->SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { return false; } @@ -67,11 +63,11 @@ auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, Re return true; } -auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } +auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_->DemoteToReplica(); } auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool { - if (!client_.DemoteToReplica()) { + if (!client_->DemoteToReplica()) { return false; } @@ -81,19 +77,19 @@ auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback r return true; } -auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_.StartFrequentCheck(); } -auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_.StopFrequentCheck(); } -auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } -auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } +auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_->StartFrequentCheck(); } +auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_->StopFrequentCheck(); } +auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_->PauseFrequentCheck(); } +auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_->ResumeFrequentCheck(); } auto ReplicationInstanceConnector::ReplicationClientInfo() const -> coordination::ReplicationClientInfo { - return client_.ReplicationClientInfo(); + return client_->ReplicationClientInfo(); } auto ReplicationInstanceConnector::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; } auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; } -auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return client_; } +auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; } auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } auto ReplicationInstance::GetMainUUID() const -> std::optional const & { return main_uuid_; } @@ -117,18 +113,22 @@ auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID c } auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool { - return replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid); + if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) { + return false; + } + SetNewMainUUID(new_main_uuid); + return true; } auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool { - return client_.SendUnregisterReplicaRpc(instance_name); + return client_->SendUnregisterReplicaRpc(instance_name); } -auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); } +auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_->SendEnableWritingOnMainRpc(); } auto ReplicationInstanceConnector::SendGetInstanceUUID() -> utils::BasicResult> { - return client_.SendGetInstanceUUIDRpc(); + return client_->SendGetInstanceUUIDRpc(); } void ReplicationInstanceConnector::UpdateReplicaLastResponseUUID() { diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 86fc17ed8..f26dfc02a 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -473,3 +473,10 @@ add_unit_test(coordinator_instance.cpp) target_link_libraries(${test_prefix}coordinator_instance gflags mg-coordination mg-repl_coord_glue) target_include_directories(${test_prefix}coordinator_instance PRIVATE ${CMAKE_SOURCE_DIR}/include) endif() + +# Test replication instance connector +if(MG_ENTERPRISE) +add_unit_test(replication_instance_connector.cpp) +target_link_libraries(${test_prefix}replication_instance_connector gflags mg-coordination mg-repl_coord_glue) +target_include_directories(${test_prefix}replication_instance_connector PRIVATE ${CMAKE_SOURCE_DIR}/include) +endif() diff --git a/tests/unit/coordinator_instance.cpp b/tests/unit/coordinator_instance.cpp index 631352ab6..f339d0979 100644 --- a/tests/unit/coordinator_instance.cpp +++ b/tests/unit/coordinator_instance.cpp @@ -22,6 +22,7 @@ #include "utils/file.hpp" #include +#include #include #include "json/json.hpp" @@ -29,13 +30,29 @@ using memgraph::coordination::CoordinatorInstance; using memgraph::coordination::CoordinatorInstanceInitConfig; using memgraph::coordination::CoordinatorToCoordinatorConfig; using memgraph::coordination::CoordinatorToReplicaConfig; +using memgraph::coordination::HealthCheckClientCallback; +using memgraph::coordination::HealthCheckInstanceCallback; using memgraph::coordination::RaftState; +using memgraph::coordination::RegisterInstanceCoordinatorStatus; using memgraph::coordination::ReplicationClientInfo; +using memgraph::coordination::ReplicationInstanceClient; +using memgraph::coordination::ReplicationInstanceConnector; using memgraph::io::network::Endpoint; using memgraph::replication::ReplicationHandler; using memgraph::replication_coordination_glue::ReplicationMode; using memgraph::storage::Config; +using testing::_; + +class ReplicationInstanceClientMock : public ReplicationInstanceClient { + public: + ReplicationInstanceClientMock(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config) + : ReplicationInstanceClient(coord_instance, config, nullptr, nullptr) { + ON_CALL(*this, DemoteToReplica()).WillByDefault(testing::Return(true)); + } + MOCK_METHOD0(DemoteToReplica, bool()); +}; + class CoordinatorInstanceTest : public ::testing::Test { protected: void SetUp() override {} @@ -46,6 +63,27 @@ class CoordinatorInstanceTest : public ::testing::Test { "MG_tests_unit_coordinator_instance"}; }; +TEST_F(CoordinatorInstanceTest, RegisterReplicationInstance) { + auto const init_config = + CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686}; + auto instance1 = CoordinatorInstance{init_config}; + + auto const coord_to_replica_config = + CoordinatorToReplicaConfig{.instance_name = "instance3", + .mgt_server = Endpoint{"127.0.0.1", 10112}, + .bolt_server = Endpoint{"127.0.0.1", 7687}, + .replication_client_info = {.instance_name = "instance_name", + .replication_mode = ReplicationMode::ASYNC, + .replication_server = Endpoint{"127.0.0.1", 10001}}, + .instance_health_check_frequency_sec = std::chrono::seconds{1}, + .instance_down_timeout_sec = std::chrono::seconds{5}, + .instance_get_uuid_frequency_sec = std::chrono::seconds{10}, + .ssl = std::nullopt}; + + auto status = instance1.RegisterReplicationInstance(coord_to_replica_config); + EXPECT_EQ(status, RegisterInstanceCoordinatorStatus::SUCCESS); +} + TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) { auto const init_config = CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686}; diff --git a/tests/unit/replication_instance_connector.cpp b/tests/unit/replication_instance_connector.cpp new file mode 100644 index 000000000..867e3b288 --- /dev/null +++ b/tests/unit/replication_instance_connector.cpp @@ -0,0 +1,51 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "coordination/replication_instance_connector.hpp" +#include "coordination/coordinator_instance.hpp" + +#include "auth/auth.hpp" +#include "flags/run_time_configurable.hpp" +#include "interpreter_faker.hpp" +#include "io/network/endpoint.hpp" +#include "license/license.hpp" +#include "replication_handler/replication_handler.hpp" +#include "storage/v2/config.hpp" + +#include "utils/file.hpp" + +#include +#include +#include +#include "json/json.hpp" + +using memgraph::coordination::CoordinatorInstance; +using memgraph::coordination::CoordinatorToReplicaConfig; +using memgraph::coordination::HealthCheckClientCallback; +using memgraph::coordination::HealthCheckInstanceCallback; +using memgraph::coordination::ReplicationClientInfo; +using memgraph::coordination::ReplicationInstanceClient; +using memgraph::coordination::ReplicationInstanceConnector; +using memgraph::io::network::Endpoint; +using memgraph::replication::ReplicationHandler; +using memgraph::replication_coordination_glue::ReplicationMode; +using memgraph::storage::Config; + +using testing::_; + +class ReplicationInstanceClientMock {}; + +class ReplicationInstanceConnectorTest : public ::testing::Test { + public: + void SetUp() override {} + + void TearDown() override {} +};