diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 44817ccfe..fe21cfd71 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -30,8 +30,10 @@ auto CreateClientContext(memgraph::coordination::CoordinatorToReplicaConfig cons } } // namespace -CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb) +ReplicationInstanceClient::ReplicationInstanceClient(CoordinatorInstance *coord_instance, + CoordinatorToReplicaConfig config, + HealthCheckClientCallback succ_cb, + HealthCheckClientCallback fail_cb) : rpc_context_{CreateClientContext(config)}, rpc_client_{config.mgt_server, &rpc_context_}, config_{std::move(config)}, @@ -39,20 +41,24 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi succ_cb_{std::move(succ_cb)}, fail_cb_{std::move(fail_cb)} {} -auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; } +auto ReplicationInstanceClient::InstanceName() const -> std::string { return config_.instance_name; } -auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); } -auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); } +auto ReplicationInstanceClient::CoordinatorSocketAddress() const -> std::string { + return config_.CoordinatorSocketAddress(); +} +auto ReplicationInstanceClient::ReplicationSocketAddress() const -> std::string { + return config_.ReplicationSocketAddress(); +} -auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds { +auto ReplicationInstanceClient::InstanceDownTimeoutSec() const -> std::chrono::seconds { return config_.instance_down_timeout_sec; } -auto CoordinatorClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds { +auto ReplicationInstanceClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds { return config_.instance_get_uuid_frequency_sec; } -void CoordinatorClient::StartFrequentCheck() { +void ReplicationInstanceClient::StartFrequentCheck() { if (instance_checker_.IsRunning()) { return; } @@ -81,16 +87,17 @@ void CoordinatorClient::StartFrequentCheck() { }); } -void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); } -void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); } -void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); } +void ReplicationInstanceClient::StopFrequentCheck() { instance_checker_.Stop(); } +void ReplicationInstanceClient::PauseFrequentCheck() { instance_checker_.Pause(); } +void ReplicationInstanceClient::ResumeFrequentCheck() { instance_checker_.Resume(); } -auto CoordinatorClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo { +auto ReplicationInstanceClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo { return config_.replication_client_info; } -auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid, - ReplicationClientsInfo replication_clients_info) const -> bool { +auto ReplicationInstanceClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid, + ReplicationClientsInfo replication_clients_info) const + -> bool { try { auto stream{rpc_client_.Stream(uuid, std::move(replication_clients_info))}; if (!stream.AwaitResponse().success) { @@ -104,7 +111,7 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid, return false; } -auto CoordinatorClient::DemoteToReplica() const -> bool { +auto ReplicationInstanceClient::DemoteToReplica() const -> bool { auto const &instance_name = config_.instance_name; try { auto stream{rpc_client_.Stream(config_.replication_client_info)}; @@ -120,7 +127,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool { return false; } -auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool { +auto ReplicationInstanceClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool { try { auto stream{rpc_client_.Stream(uuid)}; if (!stream.AwaitResponse().success) { @@ -134,7 +141,7 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bo return false; } -auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool { +auto ReplicationInstanceClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool { try { auto stream{rpc_client_.Stream(instance_name)}; if (!stream.AwaitResponse().success) { @@ -148,7 +155,7 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) return false; } -auto CoordinatorClient::SendGetInstanceUUIDRpc() const +auto ReplicationInstanceClient::SendGetInstanceUUIDRpc() const -> utils::BasicResult> { try { auto stream{rpc_client_.Stream()}; @@ -160,7 +167,7 @@ auto CoordinatorClient::SendGetInstanceUUIDRpc() const } } -auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool { +auto ReplicationInstanceClient::SendEnableWritingOnMainRpc() const -> bool { try { auto stream{rpc_client_.Stream()}; if (!stream.AwaitResponse().success) { @@ -174,7 +181,7 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool { return false; } -auto CoordinatorClient::SendGetInstanceTimestampsRpc() const +auto ReplicationInstanceClient::SendGetInstanceTimestampsRpc() const -> utils::BasicResult { try { auto stream{rpc_client_.Stream()}; diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 4830848f7..17f274a03 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -27,7 +27,6 @@ namespace memgraph::coordination { using nuraft::ptr; -using nuraft::srv_config; CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config) : thread_pool_{1}, @@ -90,9 +89,10 @@ CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &co }; } -auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & { +auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) + -> ReplicationInstanceConnector & { auto repl_instance = - std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) { + std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstanceConnector const &instance) { return instance.InstanceName() == replication_instance_name; }); @@ -111,18 +111,18 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector { auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status); if (raft_state_.IsLeader()) { - auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string { + auto const stringify_repl_role = [this](ReplicationInstanceConnector const &instance) -> std::string { if (!instance.IsAlive()) return "unknown"; if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main"; return "replica"; }; - auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string { + auto const stringify_repl_health = [](ReplicationInstanceConnector const &instance) -> std::string { return instance.IsAlive() ? "up" : "down"; }; auto process_repl_instance_as_leader = - [&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus { + [&stringify_repl_role, &stringify_repl_health](ReplicationInstanceConnector const &instance) -> InstanceStatus { return {.instance_name = instance.InstanceName(), .coord_socket_address = instance.CoordinatorSocketAddress(), .cluster_role = stringify_repl_role(instance), @@ -161,19 +161,26 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector { } auto CoordinatorInstance::TryFailover() -> void { - auto const is_replica = [this](ReplicationInstance const &instance) { + auto const is_replica = [this](ReplicationInstanceConnector const &instance) { return HasReplicaState(instance.InstanceName()); }; - auto alive_replicas = - repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive); + auto alive_replicas = repl_instances_ | ranges::views::filter(is_replica) | + ranges::views::filter(&ReplicationInstanceConnector::IsAlive); if (ranges::empty(alive_replicas)) { spdlog::warn("Failover failed since all replicas are down!"); return; } - auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); }; + if (!raft_state_.RequestLeadership()) { + spdlog::error("Failover failed since the instance is not the leader!"); + return; + } + + auto const get_ts = [](ReplicationInstanceConnector &replica) { + return replica.GetClient().SendGetInstanceTimestampsRpc(); + }; auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to(); @@ -207,13 +214,13 @@ auto CoordinatorInstance::TryFailover() -> void { new_main->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; - auto const is_not_new_main = [&new_main](ReplicationInstance &instance) { + auto const is_not_new_main = [&new_main](ReplicationInstanceConnector &instance) { return instance.InstanceName() != new_main->InstanceName(); }; auto const new_main_uuid = utils::UUID{}; - auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) { + auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) { return !instance.SendSwapAndUpdateUUID(new_main_uuid) || !raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid); }; @@ -226,7 +233,7 @@ auto CoordinatorInstance::TryFailover() -> void { } auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | - ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | + ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) | ranges::to(); if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, @@ -269,7 +276,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance return SetInstanceToMainCoordinatorStatus::NOT_LEADER; } - auto const is_new_main = [&instance_name](ReplicationInstance const &instance) { + auto const is_new_main = [&instance_name](ReplicationInstanceConnector const &instance) { return instance.InstanceName() == instance_name; }; @@ -288,13 +295,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance new_main->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; - auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) { + auto const is_not_new_main = [&instance_name](ReplicationInstanceConnector const &instance) { return instance.InstanceName() != instance_name; }; auto const new_main_uuid = utils::UUID{}; - auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) { + auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) { return !instance.SendSwapAndUpdateUUID(new_main_uuid) || !raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid); }; @@ -305,7 +312,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance } auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | - ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | + ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) | ranges::to(); if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, @@ -335,19 +342,20 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig return RegisterInstanceCoordinatorStatus::LOCK_OPENED; } - if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) { - return instance.InstanceName() == instance_name; - })) { + if (std::ranges::any_of(repl_instances_, + [instance_name = config.instance_name](ReplicationInstanceConnector const &instance) { + return instance.InstanceName() == instance_name; + })) { return RegisterInstanceCoordinatorStatus::NAME_EXISTS; } - if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) { + if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) { return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress(); })) { return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS; } - if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) { + if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) { return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress(); })) { return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS; @@ -393,7 +401,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc return UnregisterInstanceCoordinatorStatus::NOT_LEADER; } - auto const name_matches = [&instance_name](ReplicationInstance const &instance) { + auto const name_matches = [&instance_name](ReplicationInstanceConnector const &instance) { return instance.InstanceName() == instance_name; }; @@ -402,7 +410,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME; } - auto const is_current_main = [this](ReplicationInstance const &instance) { + auto const is_current_main = [this](ReplicationInstanceConnector const &instance) { return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive(); }; @@ -626,55 +634,10 @@ auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const return raft_state_.HasReplicaState(instance_name); } -auto CoordinatorInstance::GetRoutingTable(std::map const &routing) -> RoutingTable { - auto res = RoutingTable{}; - - auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) { - return instance.config.BoltSocketAddress(); - }; - - // TODO: (andi) This is wrong check, Fico will correct in #1819. - auto const is_instance_main = [&](ReplicationInstanceState const &instance) { - return instance.status == ReplicationRole::MAIN; - }; - - auto const is_instance_replica = [&](ReplicationInstanceState const &instance) { - return instance.status == ReplicationRole::REPLICA; - }; - - auto const &raft_log_repl_instances = raft_state_.GetReplicationInstances(); - - auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) | - ranges::views::transform(repl_instance_to_bolt) | ranges::to(); - MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!"); - - if (!std::ranges::empty(bolt_mains)) { - res.emplace_back(std::move(bolt_mains), "WRITE"); - } - - auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) | - ranges::views::transform(repl_instance_to_bolt) | ranges::to(); - if (!std::ranges::empty(bolt_replicas)) { - res.emplace_back(std::move(bolt_replicas), "READ"); - } - - auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) { - return instance.config.bolt_server.SocketAddress(); - }; - - auto const &raft_log_coord_instances = raft_state_.GetCoordinatorInstances(); - auto bolt_coords = - raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to(); - - auto const &local_bolt_coord = routing.find("address"); - if (local_bolt_coord == routing.end()) { - throw InvalidRoutingTableException("No bolt address found in routing table for the current coordinator!"); - } - - bolt_coords.push_back(local_bolt_coord->second); - res.emplace_back(std::move(bolt_coords), "ROUTE"); - - return res; +// TODO: (andi) Remove accepting param +auto CoordinatorInstance::GetRoutingTable(std::map const & /*routing*/) const + -> RoutingTable { + return raft_state_.GetRoutingTable(); } } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 875efaa45..870d291ea 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -27,18 +27,18 @@ class CoordinatorInstance; using HealthCheckClientCallback = std::function; using ReplicationClientsInfo = std::vector; -class CoordinatorClient { +class ReplicationInstanceClient { public: - explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb); + explicit ReplicationInstanceClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config, + HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb); - ~CoordinatorClient() = default; + ~ReplicationInstanceClient() = default; - CoordinatorClient(CoordinatorClient &) = delete; - CoordinatorClient &operator=(CoordinatorClient const &) = delete; + ReplicationInstanceClient(ReplicationInstanceClient &) = delete; + ReplicationInstanceClient &operator=(ReplicationInstanceClient const &) = delete; - CoordinatorClient(CoordinatorClient &&) noexcept = delete; - CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete; + ReplicationInstanceClient(ReplicationInstanceClient &&) noexcept = delete; + ReplicationInstanceClient &operator=(ReplicationInstanceClient &&) noexcept = delete; void StartFrequentCheck(); void StopFrequentCheck(); @@ -73,7 +73,7 @@ class CoordinatorClient { auto InstanceGetUUIDFrequencySec() const -> std::chrono::seconds; - friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { + friend bool operator==(ReplicationInstanceClient const &first, ReplicationInstanceClient const &second) { return first.config_ == second.config_; } diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 526930ccd..be6b3c156 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -26,8 +26,6 @@ namespace memgraph::coordination { -using RoutingTable = std::vector, std::string>>; - struct NewMainRes { std::string most_up_to_date_instance; std::string latest_epoch; @@ -56,9 +54,9 @@ class CoordinatorInstance { auto TryFailover() -> void; - auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void; + auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void; - auto GetRoutingTable(std::map const &routing) -> RoutingTable; + auto GetRoutingTable(std::map const &routing) const -> RoutingTable; static auto ChooseMostUpToDateInstance(std::span histories) -> NewMainRes; @@ -67,7 +65,7 @@ class CoordinatorInstance { auto HasReplicaState(std::string_view instance_name) const -> bool; private: - auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &; + auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstanceConnector &; void MainFailCallback(std::string_view); @@ -79,7 +77,8 @@ class CoordinatorInstance { HealthCheckClientCallback client_succ_cb_, client_fail_cb_; // NOTE: Must be std::list because we rely on pointer stability. - std::list repl_instances_; + // TODO: (andi) Rename + virtualize for mocking. + std::list repl_instances_; mutable utils::ResourceLock coord_instance_lock_{}; // Thread pool needs to be constructed before raft state as raft state can call thread pool diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 7e3043bc5..33dab3562 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -27,6 +27,7 @@ struct CoordinatorToReplicaConfig; using BecomeLeaderCb = std::function; using BecomeFollowerCb = std::function; +using RoutingTable = std::vector, std::string>>; using nuraft::buffer; using nuraft::logger; @@ -77,6 +78,7 @@ class RaftState { auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool; auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool; auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool; + auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool; auto GetReplicationInstances() const -> std::vector; auto GetCoordinatorInstances() const -> std::vector; @@ -90,6 +92,7 @@ class RaftState { auto GetInstanceUUID(std::string_view) const -> utils::UUID; auto IsLockOpened() const -> bool; + auto GetRoutingTable() const -> RoutingTable; private: io::network::Endpoint raft_endpoint_; diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 19127e7eb..7a93ec14a 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -26,21 +26,21 @@ namespace memgraph::coordination { class CoordinatorInstance; -class ReplicationInstance; using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view); -class ReplicationInstance { +class ReplicationInstanceConnector { public: - ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, HealthCheckClientCallback succ_cb, - HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb, - HealthCheckInstanceCallback fail_instance_cb); + ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, + HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb, + HealthCheckInstanceCallback succ_instance_cb, + HealthCheckInstanceCallback fail_instance_cb); - ReplicationInstance(ReplicationInstance const &other) = delete; - ReplicationInstance &operator=(ReplicationInstance const &other) = delete; - ReplicationInstance(ReplicationInstance &&other) noexcept = delete; - ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete; - ~ReplicationInstance() = default; + ReplicationInstanceConnector(ReplicationInstanceConnector const &other) = delete; + ReplicationInstanceConnector &operator=(ReplicationInstanceConnector const &other) = delete; + ReplicationInstanceConnector(ReplicationInstanceConnector &&other) noexcept = delete; + ReplicationInstanceConnector &operator=(ReplicationInstanceConnector &&other) noexcept = delete; + ~ReplicationInstanceConnector() = default; auto OnSuccessPing() -> void; auto OnFailPing() -> bool; @@ -75,7 +75,7 @@ class ReplicationInstance { auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool; auto SendGetInstanceUUID() -> utils::BasicResult>; - auto GetClient() -> CoordinatorClient &; + auto GetClient() -> ReplicationInstanceClient &; auto EnableWritingOnMain() -> bool; @@ -83,7 +83,7 @@ class ReplicationInstance { auto GetFailCallback() -> HealthCheckInstanceCallback &; private: - CoordinatorClient client_; + ReplicationInstanceClient client_; std::chrono::system_clock::time_point last_response_time_{}; bool is_alive_{false}; std::chrono::system_clock::time_point last_check_of_uuid_{}; @@ -91,7 +91,7 @@ class ReplicationInstance { HealthCheckInstanceCallback succ_cb_; HealthCheckInstanceCallback fail_cb_; - friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) { + friend bool operator==(ReplicationInstanceConnector const &first, ReplicationInstanceConnector const &second) { return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ && first.is_alive_ == second.is_alive_; } diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index b65bb9d16..5a16695eb 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -10,12 +10,16 @@ // licenses/APL.txt. #ifdef MG_ENTERPRISE -#include + +#include "coordination/raft_state.hpp" #include "coordination/coordinator_communication_config.hpp" #include "coordination/coordinator_exceptions.hpp" -#include "coordination/raft_state.hpp" #include "utils/counter.hpp" +#include "utils/logging.hpp" + +#include +#include namespace memgraph::coordination { @@ -45,7 +49,7 @@ auto RaftState::InitRaftServer() -> void { asio_opts.thread_pool_size_ = 1; raft_params params; - params.heart_beat_interval_ = 150; + params.heart_beat_interval_ = 100; params.election_timeout_lower_bound_ = 200; params.election_timeout_upper_bound_ = 400; params.reserved_log_items_ = 5; @@ -415,5 +419,50 @@ auto RaftState::GetCoordinatorInstances() const -> std::vectorGetCoordinatorInstances(); } +auto RaftState::GetRoutingTable() const -> RoutingTable { + auto res = RoutingTable{}; + + auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) { + return instance.config.BoltSocketAddress(); + }; + + // TODO: (andi) This is wrong check, Fico will correct in #1819. + auto const is_instance_main = [&](ReplicationInstanceState const &instance) { + return instance.status == ReplicationRole::MAIN; + }; + + auto const is_instance_replica = [&](ReplicationInstanceState const &instance) { + return instance.status == ReplicationRole::REPLICA; + }; + + auto const &raft_log_repl_instances = GetReplicationInstances(); + + auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) | + ranges::views::transform(repl_instance_to_bolt) | ranges::to(); + MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!"); + + if (!std::ranges::empty(bolt_mains)) { + res.emplace_back(std::move(bolt_mains), "WRITE"); + } + + auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) | + ranges::views::transform(repl_instance_to_bolt) | ranges::to(); + if (!std::ranges::empty(bolt_replicas)) { + res.emplace_back(std::move(bolt_replicas), "READ"); + } + + auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) { + return instance.config.bolt_server.SocketAddress(); + }; + + auto const &raft_log_coord_instances = GetCoordinatorInstances(); + auto bolt_coords = + raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to(); + + res.emplace_back(std::move(bolt_coords), "ROUTE"); + + return res; +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 00e4a98e0..1b70ad900 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -20,38 +20,43 @@ namespace memgraph::coordination { -ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, - HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb, - HealthCheckInstanceCallback succ_instance_cb, - HealthCheckInstanceCallback fail_instance_cb) +ReplicationInstanceConnector::ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, + HealthCheckClientCallback succ_cb, + HealthCheckClientCallback fail_cb, + HealthCheckInstanceCallback succ_instance_cb, + HealthCheckInstanceCallback fail_instance_cb) : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), succ_cb_(succ_instance_cb), fail_cb_(fail_instance_cb) {} -auto ReplicationInstance::OnSuccessPing() -> void { +auto ReplicationInstanceConnector::OnSuccessPing() -> void { last_response_time_ = std::chrono::system_clock::now(); is_alive_ = true; } -auto ReplicationInstance::OnFailPing() -> bool { +auto ReplicationInstanceConnector::OnFailPing() -> bool { auto elapsed_time = std::chrono::system_clock::now() - last_response_time_; is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec(); return is_alive_; } -auto ReplicationInstance::IsReadyForUUIDPing() -> bool { +auto ReplicationInstanceConnector::IsReadyForUUIDPing() -> bool { return std::chrono::duration_cast(std::chrono::system_clock::now() - last_check_of_uuid_) > client_.InstanceGetUUIDFrequencySec(); } -auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); } -auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); } -auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); } -auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; } +auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_.InstanceName(); } +auto ReplicationInstanceConnector::CoordinatorSocketAddress() const -> std::string { + return client_.CoordinatorSocketAddress(); +} +auto ReplicationInstanceConnector::ReplicationSocketAddress() const -> std::string { + return client_.ReplicationSocketAddress(); +} +auto ReplicationInstanceConnector::IsAlive() const -> bool { return is_alive_; } -auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info, - HealthCheckInstanceCallback main_succ_cb, - HealthCheckInstanceCallback main_fail_cb) -> bool { +auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info, + HealthCheckInstanceCallback main_succ_cb, + HealthCheckInstanceCallback main_fail_cb) -> bool { if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { return false; } @@ -62,10 +67,10 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication return true; } -auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } +auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } -auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, - HealthCheckInstanceCallback replica_fail_cb) -> bool { +auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, + HealthCheckInstanceCallback replica_fail_cb) -> bool { if (!client_.DemoteToReplica()) { return false; } @@ -76,21 +81,24 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_su return true; } -auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); } -auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); } -auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } -auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } +auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_.StartFrequentCheck(); } +auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_.StopFrequentCheck(); } +auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } +auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } -auto ReplicationInstance::ReplicationClientInfo() const -> coordination::ReplicationClientInfo { +auto ReplicationInstanceConnector::ReplicationClientInfo() const -> coordination::ReplicationClientInfo { return client_.ReplicationClientInfo(); } -auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; } -auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; } +auto ReplicationInstanceConnector::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; } +auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; } -auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } +auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return client_; } -auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { +auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } +auto ReplicationInstance::GetMainUUID() const -> std::optional const & { return main_uuid_; } + +auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { if (!IsReadyForUUIDPing()) { return true; } @@ -108,25 +116,24 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur return SendSwapAndUpdateUUID(curr_main_uuid); } -auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool { - if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) { - return false; - } - return true; +auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool { + return replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid); } -auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool { +auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool { return client_.SendUnregisterReplicaRpc(instance_name); } -auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); } +auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); } -auto ReplicationInstance::SendGetInstanceUUID() +auto ReplicationInstanceConnector::SendGetInstanceUUID() -> utils::BasicResult> { return client_.SendGetInstanceUUIDRpc(); } -void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); } +void ReplicationInstanceConnector::UpdateReplicaLastResponseUUID() { + last_check_of_uuid_ = std::chrono::system_clock::now(); +} } // namespace memgraph::coordination #endif diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 05bf2717f..86fc17ed8 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -453,14 +453,6 @@ target_link_libraries(${test_prefix}coordinator_cluster_state gflags mg-coordina target_include_directories(${test_prefix}coordinator_cluster_state PRIVATE ${CMAKE_SOURCE_DIR}/include) endif() -# TODO: (andi) Move into appropriate test suite -# Test Routing table -if(MG_ENTERPRISE) -add_unit_test(routing_table.cpp) -target_link_libraries(${test_prefix}routing_table gflags mg-coordination mg-repl_coord_glue) -target_include_directories(${test_prefix}routing_table PRIVATE ${CMAKE_SOURCE_DIR}/include) -endif() - # Test coordinator state machine if(MG_ENTERPRISE) add_unit_test(coordinator_state_machine.cpp) @@ -474,3 +466,10 @@ add_unit_test(raft_state.cpp) target_link_libraries(${test_prefix}raft_state gflags mg-coordination mg-repl_coord_glue) target_include_directories(${test_prefix}raft_state PRIVATE ${CMAKE_SOURCE_DIR}/include) endif() + +# Test coordinator instance +if(MG_ENTERPRISE) +add_unit_test(coordinator_instance.cpp) +target_link_libraries(${test_prefix}coordinator_instance gflags mg-coordination mg-repl_coord_glue) +target_include_directories(${test_prefix}coordinator_instance PRIVATE ${CMAKE_SOURCE_DIR}/include) +endif() diff --git a/tests/unit/coordination_utils.cpp b/tests/unit/coordination_utils.cpp index 7c77b4e68..2e0330255 100644 --- a/tests/unit/coordination_utils.cpp +++ b/tests/unit/coordination_utils.cpp @@ -16,6 +16,8 @@ #include "replication_coordination_glue/common.hpp" #include "utils/functional.hpp" +using memgraph::coordination::CoordinatorInstanceInitConfig; + class CoordinationUtils : public ::testing::Test { protected: void SetUp() override {} @@ -60,7 +62,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) { memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history}; instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); - memgraph::coordination::CoordinatorInstance instance; + + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + memgraph::coordination::CoordinatorInstance instance{init_config1}; auto [instance_name, latest_epoch, latest_commit_timestamp] = instance.ChooseMostUpToDateInstance(instance_database_histories); @@ -112,7 +117,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) { memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3}; instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); - memgraph::coordination::CoordinatorInstance instance; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + memgraph::coordination::CoordinatorInstance instance{init_config1}; auto [instance_name, latest_epoch, latest_commit_timestamp] = instance.ChooseMostUpToDateInstance(instance_database_histories); @@ -167,7 +174,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) { memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest}; instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); - memgraph::coordination::CoordinatorInstance instance; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + memgraph::coordination::CoordinatorInstance instance{init_config1}; auto [instance_name, latest_epoch, latest_commit_timestamp] = instance.ChooseMostUpToDateInstance(instance_database_histories); @@ -226,7 +235,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) { memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2}; instance_database_histories.emplace_back("instance_2", instance_2_db_histories_); - memgraph::coordination::CoordinatorInstance instance; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + memgraph::coordination::CoordinatorInstance instance{init_config1}; auto [instance_name, latest_epoch, latest_commit_timestamp] = instance.ChooseMostUpToDateInstance(instance_database_histories); diff --git a/tests/unit/coordinator_cluster_state.cpp b/tests/unit/coordinator_cluster_state.cpp index 581e07621..2b770eab9 100644 --- a/tests/unit/coordinator_cluster_state.cpp +++ b/tests/unit/coordinator_cluster_state.cpp @@ -22,6 +22,7 @@ #include "libnuraft/nuraft.hxx" using memgraph::coordination::CoordinatorClusterState; +using memgraph::coordination::CoordinatorInstanceInitConfig; using memgraph::coordination::CoordinatorInstanceState; using memgraph::coordination::CoordinatorToCoordinatorConfig; using memgraph::coordination::CoordinatorToReplicaConfig; @@ -45,7 +46,9 @@ class CoordinatorClusterStateTest : public ::testing::Test { }; TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + CoordinatorClusterState cluster_state{init_config1}; auto config = CoordinatorToReplicaConfig{.instance_name = "instance3", @@ -65,13 +68,15 @@ TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) { ASSERT_EQ(instances.size(), 1); ASSERT_EQ(instances[0].config, config); ASSERT_EQ(instances[0].status, ReplicationRole::REPLICA); - ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 0); + ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 1); ASSERT_TRUE(cluster_state.IsReplica("instance3")); } TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + CoordinatorClusterState cluster_state{init_config1}; auto config = CoordinatorToReplicaConfig{.instance_name = "instance3", @@ -92,7 +97,9 @@ TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) { } TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + CoordinatorClusterState cluster_state{init_config1}; { auto config = CoordinatorToReplicaConfig{.instance_name = "instance3", @@ -135,7 +142,10 @@ TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) { } TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + + CoordinatorClusterState cluster_state{init_config1}; { auto config = CoordinatorToReplicaConfig{.instance_name = "instance3", @@ -180,23 +190,27 @@ TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) { } TEST_F(CoordinatorClusterStateTest, UpdateUUID) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + CoordinatorClusterState cluster_state{init_config1}; auto uuid = UUID(); cluster_state.DoAction(uuid, RaftLogAction::UPDATE_UUID); ASSERT_EQ(cluster_state.GetUUID(), uuid); } TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) { + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; CoordinatorToCoordinatorConfig config{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}; - CoordinatorClusterState cluster_state; + CoordinatorClusterState cluster_state{init_config1}; cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE); auto instances = cluster_state.GetCoordinatorInstances(); - ASSERT_EQ(instances.size(), 1); - ASSERT_EQ(instances[0].config, config); + ASSERT_EQ(instances.size(), 2); + ASSERT_EQ(instances[1].config, config); } TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) { @@ -231,7 +245,10 @@ TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) { } TEST_F(CoordinatorClusterStateTest, Marshalling) { - CoordinatorClusterState cluster_state; + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688}; + + CoordinatorClusterState cluster_state{init_config1}; CoordinatorToCoordinatorConfig config{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}; diff --git a/tests/unit/coordinator_instance.cpp b/tests/unit/coordinator_instance.cpp new file mode 100644 index 000000000..0f0e20c7c --- /dev/null +++ b/tests/unit/coordinator_instance.cpp @@ -0,0 +1,92 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "coordination/coordinator_instance.hpp" + +#include "auth/auth.hpp" +#include "flags/run_time_configurable.hpp" +#include "interpreter_faker.hpp" +#include "io/network/endpoint.hpp" +#include "license/license.hpp" +#include "replication_handler/replication_handler.hpp" +#include "storage/v2/config.hpp" + +#include "utils/file.hpp" + +#include +#include +#include "json/json.hpp" + +using memgraph::coordination::CoordinatorInstance; +using memgraph::coordination::CoordinatorInstanceInitConfig; +using memgraph::coordination::CoordinatorToCoordinatorConfig; +using memgraph::coordination::CoordinatorToReplicaConfig; +using memgraph::coordination::RaftState; +using memgraph::coordination::ReplicationClientInfo; +using memgraph::io::network::Endpoint; +using memgraph::replication::ReplicationHandler; +using memgraph::replication_coordination_glue::ReplicationMode; +using memgraph::storage::Config; + +class CoordinatorInstanceTest : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} + + std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() / + "MG_tests_unit_coordinator_instance"}; +}; + +TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) { + auto const init_config = + CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686}; + + auto const instance1 = CoordinatorInstance{init_config}; + auto const instances = instance1.ShowInstances(); + ASSERT_EQ(instances.size(), 1); + ASSERT_EQ(instances[0].instance_name, "coordinator_4"); + ASSERT_EQ(instances[0].health, "unknown"); + ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110"); + ASSERT_EQ(instances[0].coord_socket_address, ""); + ASSERT_EQ(instances[0].cluster_role, "coordinator"); +} + +TEST_F(CoordinatorInstanceTest, ConnectCoordinators) { + auto const init_config2 = + CoordinatorInstanceInitConfig{.coordinator_id = 2, .coordinator_port = 10112, .bolt_port = 7688}; + + auto const instance2 = CoordinatorInstance{init_config2}; + + auto const init_config3 = + CoordinatorInstanceInitConfig{.coordinator_id = 3, .coordinator_port = 10113, .bolt_port = 7689}; + + auto const instance3 = CoordinatorInstance{init_config3}; + + auto const init_config1 = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687}; + + auto instance1 = CoordinatorInstance{init_config1}; + + instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 2, + .bolt_server = Endpoint{"127.0.0.1", 7688}, + .coordinator_server = Endpoint{"127.0.0.1", 10112}}); + + instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 3, + .bolt_server = Endpoint{"127.0.0.1", 7689}, + .coordinator_server = Endpoint{"127.0.0.1", 10113}}); + + auto const instances = instance1.ShowInstances(); + ASSERT_EQ(instances.size(), 3); + ASSERT_EQ(instances[0].instance_name, "coordinator_1"); + ASSERT_EQ(instances[1].instance_name, "coordinator_2"); + ASSERT_EQ(instances[2].instance_name, "coordinator_3"); +} diff --git a/tests/unit/raft_state.cpp b/tests/unit/raft_state.cpp index 1ef9826d1..e8ef165f6 100644 --- a/tests/unit/raft_state.cpp +++ b/tests/unit/raft_state.cpp @@ -20,8 +20,11 @@ using memgraph::coordination::CoordinatorInstanceInitConfig; using memgraph::coordination::CoordinatorToCoordinatorConfig; +using memgraph::coordination::CoordinatorToReplicaConfig; using memgraph::coordination::RaftState; +using memgraph::coordination::ReplicationClientInfo; using memgraph::io::network::Endpoint; +using memgraph::replication_coordination_glue::ReplicationMode; using nuraft::ptr; class RaftStateTest : public ::testing::Test { @@ -54,3 +57,82 @@ TEST_F(RaftStateTest, RaftStateEmptyMetadata) { .coordinator_server = Endpoint{"127.0.0.1", 1234}}; ASSERT_EQ(coord_instance.config, coord_config); } + +TEST_F(RaftStateTest, GetSingleRouterRoutingTable) { + auto become_leader_cb = []() {}; + auto become_follower_cb = []() {}; + auto const init_config = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10112, .bolt_port = 7688}; + + auto const raft_state = + RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb)); + auto routing_table = raft_state.GetRoutingTable(); + + ASSERT_EQ(routing_table.size(), 1); + + auto const routers = routing_table[0]; + ASSERT_EQ(routers.first, std::vector{"127.0.0.1:7688"}); + ASSERT_EQ(routers.second, "ROUTE"); +} + +TEST_F(RaftStateTest, GetMixedRoutingTable) { + auto become_leader_cb = []() {}; + auto become_follower_cb = []() {}; + auto const init_config = + CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10113, .bolt_port = 7690}; + auto leader = RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb)); + + leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{ + .instance_name = "instance1", + .mgt_server = Endpoint{"127.0.0.1", 10011}, + .bolt_server = Endpoint{"127.0.0.1", 7687}, + .replication_client_info = ReplicationClientInfo{.instance_name = "instance1", + .replication_mode = ReplicationMode::ASYNC, + .replication_server = Endpoint{"127.0.0.1", 10001}}}); + + leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{ + .instance_name = "instance2", + .mgt_server = Endpoint{"127.0.0.1", 10012}, + .bolt_server = Endpoint{"127.0.0.1", 7688}, + .replication_client_info = ReplicationClientInfo{.instance_name = "instance2", + .replication_mode = ReplicationMode::ASYNC, + .replication_server = Endpoint{"127.0.0.1", 10002}}}); + + leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{ + .instance_name = "instance3", + .mgt_server = Endpoint{"127.0.0.1", 10013}, + .bolt_server = Endpoint{"127.0.0.1", 7689}, + .replication_client_info = ReplicationClientInfo{.instance_name = "instance3", + .replication_mode = ReplicationMode::ASYNC, + .replication_server = Endpoint{"127.0.0.1", 10003}}}); + + leader.AppendAddCoordinatorInstanceLog( + CoordinatorToCoordinatorConfig{.coordinator_id = 2, + .bolt_server = Endpoint{"127.0.0.1", 7691}, + .coordinator_server = Endpoint{"127.0.0.1", 10114}}); + + leader.AppendAddCoordinatorInstanceLog( + CoordinatorToCoordinatorConfig{.coordinator_id = 3, + .bolt_server = Endpoint{"127.0.0.1", 7692}, + .coordinator_server = Endpoint{"127.0.0.1", 10115}}); + + leader.AppendSetInstanceAsMainLog("instance1"); + + auto const routing_table = leader.GetRoutingTable(); + + ASSERT_EQ(routing_table.size(), 3); + + auto const &mains = routing_table[0]; + ASSERT_EQ(mains.second, "WRITE"); + ASSERT_EQ(mains.first, std::vector{"127.0.0.1:7687"}); + + auto const &replicas = routing_table[1]; + ASSERT_EQ(replicas.second, "READ"); + auto const expected_replicas = std::vector{"127.0.0.1:7688", "127.0.0.1:7689"}; + ASSERT_EQ(replicas.first, expected_replicas); + + auto const &routers = routing_table[2]; + ASSERT_EQ(routers.second, "ROUTE"); + auto const expected_routers = std::vector{"127.0.0.1:7690", "127.0.0.1:7691", "127.0.0.1:7692"}; + ASSERT_EQ(routers.first, expected_routers); +} diff --git a/tests/unit/routing_table.cpp b/tests/unit/routing_table.cpp deleted file mode 100644 index 39c09df0f..000000000 --- a/tests/unit/routing_table.cpp +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2024 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include "auth/auth.hpp" -#include "coordination/coordinator_instance.hpp" -#include "flags/run_time_configurable.hpp" -#include "interpreter_faker.hpp" -#include "io/network/endpoint.hpp" -#include "license/license.hpp" -#include "replication_handler/replication_handler.hpp" -#include "storage/v2/config.hpp" - -#include "utils/file.hpp" - -#include -#include -#include "json/json.hpp" - -using memgraph::coordination::CoordinatorInstance; -using memgraph::coordination::CoordinatorToCoordinatorConfig; -using memgraph::coordination::CoordinatorToReplicaConfig; -using memgraph::coordination::RaftState; -using memgraph::coordination::ReplicationClientInfo; -using memgraph::io::network::Endpoint; -using memgraph::replication::ReplicationHandler; -using memgraph::replication_coordination_glue::ReplicationMode; -using memgraph::storage::Config; - -// class MockCoordinatorInstance : CoordinatorInstance { -// auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void override {} -// }; - -class RoutingTableTest : public ::testing::Test { - protected: - std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() / - "MG_tests_unit_coordinator_cluster_state"}; - std::filesystem::path repl1_data_directory{std::filesystem::temp_directory_path() / - "MG_test_unit_storage_v2_replication_repl"}; - std::filesystem::path repl2_data_directory{std::filesystem::temp_directory_path() / - "MG_test_unit_storage_v2_replication_repl2"}; - void SetUp() override { Clear(); } - - void TearDown() override { Clear(); } - - Config main_conf = [&] { - Config config{ - .durability = - { - .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }, - .salient.items = {.properties_on_edges = true}, - }; - UpdatePaths(config, main_data_directory); - return config; - }(); - Config repl1_conf = [&] { - Config config{ - .durability = - { - .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }, - .salient.items = {.properties_on_edges = true}, - }; - UpdatePaths(config, repl1_data_directory); - return config; - }(); - Config repl2_conf = [&] { - Config config{ - .durability = - { - .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, - }, - .salient.items = {.properties_on_edges = true}, - }; - UpdatePaths(config, repl2_data_directory); - return config; - }(); - - const std::string local_host = ("127.0.0.1"); - const std::array ports{10000, 20000}; - const std::array replicas = {"REPLICA1", "REPLICA2"}; - - private: - void Clear() { - if (std::filesystem::exists(main_data_directory)) std::filesystem::remove_all(main_data_directory); - if (std::filesystem::exists(repl1_data_directory)) std::filesystem::remove_all(repl1_data_directory); - if (std::filesystem::exists(repl2_data_directory)) std::filesystem::remove_all(repl2_data_directory); - } -}; - -struct MinMemgraph { - MinMemgraph(const memgraph::storage::Config &conf) - : auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}}, - repl_state{ReplicationStateRootPath(conf)}, - dbms{conf, repl_state -#ifdef MG_ENTERPRISE - , - auth, true -#endif - }, - db_acc{dbms.Get()}, - db{*db_acc.get()}, - repl_handler(repl_state, dbms -#ifdef MG_ENTERPRISE - , - system_, auth -#endif - ) { - } - memgraph::auth::SynchedAuth auth; - memgraph::system::System system_; - memgraph::replication::ReplicationState repl_state; - memgraph::dbms::DbmsHandler dbms; - memgraph::dbms::DatabaseAccess db_acc; - memgraph::dbms::Database &db; - ReplicationHandler repl_handler; -}; -; - -TEST_F(RoutingTableTest, GetSingleRouterRoutingTable) { - CoordinatorInstance instance1; - auto routing = std::map{{"address", "localhost:7688"}}; - auto routing_table = instance1.GetRoutingTable(routing); - - ASSERT_EQ(routing_table.size(), 1); - - auto const routers = routing_table[0]; - ASSERT_EQ(routers.first, std::vector{"localhost:7688"}); - ASSERT_EQ(routers.second, "ROUTE"); -} - -TEST_F(RoutingTableTest, GetMixedRoutingTable) { - auto instance1 = RaftState::MakeRaftState([]() {}, []() {}); - auto routing = std::map{{"address", "localhost:7690"}}; - instance1.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{ - .instance_name = "instance2", - .mgt_server = Endpoint{"127.0.0.1", 10011}, - .bolt_server = Endpoint{"127.0.0.1", 7687}, - .replication_client_info = ReplicationClientInfo{.instance_name = "instance2", - .replication_mode = ReplicationMode::ASYNC, - .replication_server = Endpoint{"127.0.0.1", 10001}}}); - // auto routing_table = instance1.GetRoutingTable(routing); - - // ASSERT_EQ(routing_table.size(), 1); - // auto const routers = routing_table[0]; - // ASSERT_EQ(routers.second, "ROUTE"); -} - -// TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) { -// -// CoordinatorInstance instance1; -// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 1, -// .bolt_server = Endpoint{"127.0.0.1", 7689}, -// .coordinator_server = Endpoint{"127.0.0.1", -// 10111}}); -// -// auto routing = std::map{{"address", "localhost:7688"}}; -// auto routing_table = instance1.GetRoutingTable(routing); -// -// ASSERT_EQ(routing_table.size(), 1); -// -// auto const routers = routing_table[0]; -// ASSERT_EQ(routers.second, "ROUTE"); -// ASSERT_EQ(routers.first.size(), 2); -// auto const expected_routers = std::vector{"localhost:7689", "localhost:7688"}; -// ASSERT_EQ(routers.first, expected_routers); -// }