From 4a7c7f0898e2644f7a1551b5a3f9d52c796b02aa Mon Sep 17 00:00:00 2001
From: Andi <andi8647@gmail.com>
Date: Tue, 13 Feb 2024 09:49:28 +0100
Subject: [PATCH] Distributed coordinators (#1693)

---
 src/coordination/CMakeLists.txt | 10 +-
 src/coordination/coordinator_client.cpp | 12 +-
 src/coordination/coordinator_data.cpp | 282 ---------
 src/coordination/coordinator_instance.cpp | 328 +++++++++++++++---
 src/coordination/coordinator_log_store.cpp | 306 +++++-----
 src/coordination/coordinator_state.cpp | 32 +-
 .../coordinator_state_machine.cpp | 13 +
 .../include/coordination/coordinator_client.hpp | 10 +-
 .../include/coordination/coordinator_data.hpp | 61 ----
 .../coordination/coordinator_exceptions.hpp | 33 ++
 .../coordination/coordinator_instance.hpp | 57 +--
 .../coordination/coordinator_state.hpp | 14 +-
 .../include/coordination/raft_state.hpp | 81 +++++
 ...gister_main_replica_coordinator_status.hpp | 3 +
 .../coordination/replication_instance.hpp | 12 +-
 .../include/nuraft/coordinator_log_store.hpp | 60 +---
 .../nuraft/coordinator_state_machine.hpp | 4 +
 src/coordination/raft_state.cpp | 140 ++++++++
 src/coordination/replication_instance.cpp | 31 +-
 src/dbms/coordinator_handler.cpp | 8 +-
 src/dbms/coordinator_handler.hpp | 4 +-
 src/dbms/replication_client.cpp | 0
 src/dbms/replication_handler.cpp | 0
 src/dbms/replication_handler.hpp | 0
 src/query/interpreter.cpp | 29 +-
 src/query/interpreter.hpp | 10 +-
 .../CMakeLists.txt | 5 +-
 .../coord_cluster_registration.py | 284 +++++++++++++++
 .../distributed_coordinators.py | 145 --------
 .../distributed_coords.py | 164 +++++++++
 ...atic_failover.py => single_coordinator.py} | 1 -
 .../workloads.yaml | 12 +-
 32 files changed, 1231 insertions(+), 920 deletions(-)
 delete mode 100644 src/coordination/coordinator_data.cpp
 delete mode 100644 src/coordination/include/coordination/coordinator_data.hpp
 create mode 100644 src/coordination/include/coordination/raft_state.hpp
 create mode 100644 src/coordination/raft_state.cpp
 delete mode 100644 src/dbms/replication_client.cpp
 delete mode 100644 src/dbms/replication_handler.cpp
 delete mode 100644 src/dbms/replication_handler.hpp
 create mode 100644 tests/e2e/high_availability_experimental/coord_cluster_registration.py
 delete mode 100644 tests/e2e/high_availability_experimental/distributed_coordinators.py
 create mode 100644 tests/e2e/high_availability_experimental/distributed_coords.py
 rename tests/e2e/high_availability_experimental/{automatic_failover.py => single_coordinator.py} (99%)

diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt
index d6ab23132..4937c0ad3 100644
--- a/src/coordination/CMakeLists.txt
+++ b/src/coordination/CMakeLists.txt
@@ -9,13 +9,13 @@ target_sources(mg-coordination
         include/coordination/coordinator_config.hpp
         include/coordination/coordinator_exceptions.hpp
         include/coordination/coordinator_slk.hpp
-        include/coordination/coordinator_data.hpp
-        include/coordination/constants.hpp
+        include/coordination/coordinator_instance.hpp
         include/coordination/coordinator_cluster_config.hpp
         include/coordination/coordinator_handlers.hpp
-        include/coordination/coordinator_instance.hpp
+        include/coordination/constants.hpp
         include/coordination/instance_status.hpp
         include/coordination/replication_instance.hpp
+        include/coordination/raft_state.hpp

         include/nuraft/coordinator_log_store.hpp
         include/nuraft/coordinator_state_machine.hpp
@@ -26,10 +26,10 @@
         coordinator_state.cpp
         coordinator_rpc.cpp
         coordinator_server.cpp
-        coordinator_data.cpp
-        coordinator_instance.cpp
         coordinator_handlers.cpp
+        coordinator_instance.cpp
         replication_instance.cpp
+        raft_state.cpp

         coordinator_log_store.cpp
         coordinator_state_machine.cpp

diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index ce2cb0bda..a30e504b7 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -28,13 +28,13 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
 }
 }  // namespace

-CoordinatorClient::CoordinatorClient(CoordinatorData *coord_data, CoordinatorClientConfig config,
+CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
                                      HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
     : rpc_context_{CreateClientContext(config)},
       rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
                   &rpc_context_},
       config_{std::move(config)},
-      coord_data_{coord_data},
+      coord_instance_{coord_instance},
       succ_cb_{std::move(succ_cb)},
       fail_cb_{std::move(fail_cb)} {}

@@ -42,6 +42,10 @@ auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
 auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }

 void CoordinatorClient::StartFrequentCheck() {
+  if (instance_checker_.IsRunning()) {
+    return;
+  }
+
   MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
             "Health check frequency must be greater than 0");

@@ -54,9 +58,9 @@ void CoordinatorClient::StartFrequentCheck() {
           auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
           stream.AwaitResponse();
         }
-        succ_cb_(coord_data_, instance_name);
+        succ_cb_(coord_instance_, instance_name);
       } catch (rpc::RpcFailedException const &) {
-        fail_cb_(coord_data_, instance_name);
+        fail_cb_(coord_instance_, instance_name);
       }
     });
 }
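[Editor's note] The `IsRunning()` early return above makes `StartFrequentCheck` idempotent: a second start request becomes a no-op instead of spawning a duplicate checker. A minimal standalone sketch of the same guard pattern, assuming a plain `std::thread`-based checker (names are illustrative, not Memgraph's actual scheduler API):

// Idempotent "start periodic check" guard, analogous to the IsRunning()
// early-return added in the patch above. Hypothetical helper class.
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

class FrequentChecker {
 public:
  ~FrequentChecker() { Stop(); }

  // Safe to call multiple times: only the first call spawns the worker.
  void Start(std::chrono::milliseconds period, std::function<void()> task) {
    if (running_.exchange(true)) return;  // already running -> no-op
    worker_ = std::thread([this, period, task = std::move(task)] {
      while (running_) {
        task();
        std::this_thread::sleep_for(period);
      }
    });
  }

  void Stop() {
    if (!running_.exchange(false)) return;
    if (worker_.joinable()) worker_.join();
  }

 private:
  std::atomic<bool> running_{false};
  std::thread worker_;
};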
diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp
deleted file mode 100644
index 3732958de..000000000
--- a/src/coordination/coordinator_data.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-// Copyright 2024 Memgraph Ltd.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
-// License, and you may not use this file except in compliance with the Business Source License.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-#ifdef MG_ENTERPRISE
-
-#include "coordination/coordinator_data.hpp"
-
-#include "coordination/register_main_replica_coordinator_status.hpp"
-#include "coordination/replication_instance.hpp"
-#include "utils/uuid.hpp"
-
-#include <range/v3/view.hpp>
-#include <shared_mutex>
-
-namespace memgraph::coordination {
-
-using nuraft::ptr;
-using nuraft::srv_config;
-
-CoordinatorData::CoordinatorData() {
-  auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & {
-    auto instance = std::ranges::find_if(
-        coord_data->repl_instances_,
-        [instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
-
-    MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name);
-    return *instance;
-  };
-
-  replica_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
-    auto lock = std::lock_guard{coord_data->coord_data_lock_};
-    spdlog::trace("Instance {} performing replica successful callback", instance_name);
-    auto &instance = find_instance(coord_data, instance_name);
-
-    if (!instance.GetMainUUID().has_value() || main_uuid_ != instance.GetMainUUID().value()) {
-      if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
-        spdlog::error(
-            fmt::format("Failed to swap uuid for replica instance {} which is alive", instance.InstanceName()));
-        return;
-      }
-    }
-
-    instance.OnSuccessPing();
-  };
-
-  replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
-    auto lock = std::lock_guard{coord_data->coord_data_lock_};
-    spdlog::trace("Instance {} performing replica failure callback", instance_name);
-    auto &instance = find_instance(coord_data, instance_name);
-    instance.OnFailPing();
-    // We need to restart main uuid from instance since it was "down" at least a second
-    // There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
-    // our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
-    // incoming RPCs from valid main
-    // TODO(antoniofilipovic) this needs here more complex logic
-    // We need to get id of main replica is listening to on successful ping
-    // and swap it to correct uuid if it failed
-    instance.SetNewMainUUID();
-  };
-
-  main_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
-    auto lock = std::lock_guard{coord_data->coord_data_lock_};
-    spdlog::trace("Instance {} performing main successful callback", instance_name);
-
-    auto &instance = find_instance(coord_data, instance_name);
-
-    if (instance.IsAlive()) {
-      instance.OnSuccessPing();
-      return;
-    }
-
-    const auto &instance_uuid = instance.GetMainUUID();
-    MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
-    if (main_uuid_ == instance_uuid.value()) {
-      instance.OnSuccessPing();
-      return;
-    }
-
-    // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
-    // swapUUID can fail
-    bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
-    if (demoted) {
-      instance.OnSuccessPing();
-      spdlog::info("Instance {} demoted to replica", instance_name);
-    } else {
-      spdlog::error("Instance {} failed to become replica", instance_name);
-      return;
-    }
-
-    if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
-      spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", instance.InstanceName()));
-      return;
-    }
-  };
-
-  main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
-    auto lock = std::lock_guard{coord_data->coord_data_lock_};
-    spdlog::trace("Instance {} performing main failure callback", instance_name);
-    auto &instance = find_instance(coord_data, instance_name);
-    instance.OnFailPing();
-    const auto &instance_uuid = instance.GetMainUUID();
-    MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
-
-    if (!instance.IsAlive() && main_uuid_ == instance_uuid.value()) {
-      spdlog::info("Cluster without main instance, trying automatic failover");
-      coord_data->TryFailover();
-    }
-  };
-}
-
-auto CoordinatorData::TryFailover() -> void {
-  auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
-                        ranges::views::filter(&ReplicationInstance::IsAlive);
-
-  if (ranges::empty(alive_replicas)) {
-    spdlog::warn("Failover failed since all replicas are down!");
-    return;
-  }
-
-  // TODO: Smarter choice
-  auto chosen_replica_instance = ranges::begin(alive_replicas);
-
-  chosen_replica_instance->PauseFrequentCheck();
-  utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
-
-  auto const potential_new_main_uuid = utils::UUID{};
-
-  auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
-    return instance != *chosen_replica_instance;
-  };
-
-  // If for some replicas swap fails, for others on successful ping we will revert back on next change
-  // or we will do failover first again and then it will be consistent again
-  for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
-    if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
-      spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
-                                other_replica_instance.InstanceName()));
-      return;
-    }
-  }
-
-  std::vector<ReplClientInfo> repl_clients_info;
-  repl_clients_info.reserve(repl_instances_.size() - 1);
-  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
-                         std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
-
-  if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
-                                              main_fail_cb_)) {
-    spdlog::warn("Failover failed since promoting replica to main failed!");
-    return;
-  }
-  chosen_replica_instance->SetNewMainUUID(potential_new_main_uuid);
-  main_uuid_ = potential_new_main_uuid;
-
-  spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
-}
-
-auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> {
-  auto const coord_instances = self_.GetAllCoordinators();
-
-  std::vector<InstanceStatus> instances_status;
-  instances_status.reserve(repl_instances_.size() + coord_instances.size());
-
-  auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
-    if (!instance.IsAlive()) return "unknown";
-    if (instance.IsMain()) return "main";
-    return "replica";
-  };
-
-  auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
-    return {.instance_name = instance.InstanceName(),
-            .coord_socket_address = instance.SocketAddress(),
-            .cluster_role = stringify_repl_role(instance),
-            .is_alive = instance.IsAlive()};
-  };
-
-  auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
-    return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
-            .raft_socket_address = instance->get_endpoint(),
-            .cluster_role = "coordinator",
-            .is_alive = true};  // TODO: (andi) Get this info from RAFT and test it or when we will move
-                                // CoordinatorState to every instance, we can be smarter about this using our RPC.
-  };
-
-  std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
-
-  {
-    auto lock = std::shared_lock{coord_data_lock_};
-    std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
-  }
-
-  return instances_status;
-}
-
-// TODO: (andi) Make sure you cannot put coordinator instance to the main
-auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
-  auto lock = std::lock_guard{coord_data_lock_};
-
-  auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
-    return instance.InstanceName() == instance_name;
-  };
-  auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
-
-  if (new_main == repl_instances_.end()) {
-    spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
-                  instance_name);
-    return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
-  }
-
-  new_main->PauseFrequentCheck();
-  utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
-
-  ReplicationClientsInfo repl_clients_info;
-  repl_clients_info.reserve(repl_instances_.size() - 1);
-
-  auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
-    return instance.InstanceName() != instance_name;
-  };
-
-  auto potential_new_main_uuid = utils::UUID{};
-  spdlog::trace("Generated potential new main uuid");
-
-  for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
-    if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
-      spdlog::error(
-          fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
-      return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
-    }
-  }
-
-  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
-                         std::back_inserter(repl_clients_info),
-                         [](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
-
-  if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
-    return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
-  }
-
-  new_main->SetNewMainUUID(potential_new_main_uuid);
-  main_uuid_ = potential_new_main_uuid;
-  spdlog::info("Instance {} promoted to main", instance_name);
-  return SetInstanceToMainCoordinatorStatus::SUCCESS;
-}
-
-auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
-  auto lock = std::lock_guard{coord_data_lock_};
-  if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
-        return instance.InstanceName() == config.instance_name;
-      })) {
-    return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
-  }
-
-  if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
-        return instance.SocketAddress() == config.SocketAddress();
-      })) {
-    return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
-  }
-
-  try {
-    repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
-    return RegisterInstanceCoordinatorStatus::SUCCESS;
-
-  } catch (CoordinatorRegisterInstanceException const &) {
-    return RegisterInstanceCoordinatorStatus::RPC_FAILED;
-  }
-}
-
-auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-    -> void {
-  self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
-}
-
-}  // namespace memgraph::coordination
-#endif

diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index 7a0b0fbd0..4c3f3e646 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -18,81 +18,305 @@
 #include "nuraft/coordinator_state_manager.hpp"
 #include "utils/counter.hpp"

+#include <range/v3/view.hpp>
+#include <shared_mutex>
+
 namespace memgraph::coordination {

-using nuraft::asio_service;
-using nuraft::cmd_result;
-using nuraft::cs_new;
 using nuraft::ptr;
-using nuraft::raft_params;
 using nuraft::srv_config;
-using raft_result = cmd_result<ptr<buffer>>;

 CoordinatorInstance::CoordinatorInstance()
-    : raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
-  auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
-  state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
-  state_machine_ = cs_new<CoordinatorStateMachine>();
-  logger_ = nullptr;
+    : raft_state_(RaftState::MakeRaftState(
+          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
+          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
+  auto find_repl_instance = [](CoordinatorInstance *self,
+                               std::string_view repl_instance_name) -> ReplicationInstance & {
+    auto repl_instance =
+        std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
+          return instance.InstanceName() == repl_instance_name;
+        });

-  // ASIO options
-  asio_service::options asio_opts;
-  asio_opts.thread_pool_size_ = 1;  // TODO: (andi) Improve this
+    MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
+              repl_instance_name);
+    return *repl_instance;
+  };

-  // RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms.
-  raft_params params;
-  params.heart_beat_interval_ = 100;
-  params.election_timeout_lower_bound_ = 200;
-  params.election_timeout_upper_bound_ = 400;
-  // 5 logs are preserved before the last snapshot
-  params.reserved_log_items_ = 5;
-  // Create snapshot for every 5 log appends
-  params.snapshot_distance_ = 5;
-  params.client_req_timeout_ = 3000;
-  params.return_method_ = raft_params::blocking;
+  replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);

-  raft_server_ =
-      launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params);
+    if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
+      spdlog::error(
+          fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()));
+      return;
+    }

-  if (!raft_server_) {
-    throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
-  }
+    repl_instance.OnSuccessPing();
+  };

-  auto maybe_stop = utils::ResettableCounter<20>();
-  while (!raft_server_->is_initialized() && !maybe_stop()) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(250));
-  }
+  replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    repl_instance.OnFailPing();
+    // Reset the main UUID this instance tracks, since it has been "down" for at least a second. If we keyed
+    // this off IsAlive instead, the instance could go down and come back up within the IsAlive window, set its
+    // UUID to nullopt and stop accepting any incoming RPCs from a valid main.
+    // TODO(antoniofilipovic) More complex logic is needed here: on a successful ping we should fetch the UUID
+    // of the main the replica is listening to and swap it to the correct one if it changed while it was down.
+    repl_instance.ResetMainUUID();
+  };

-  if (!raft_server_->is_initialized()) {
-    throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
-  }
+  main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing main successful callback", repl_instance_name);

-  spdlog::info("Raft server started on {}", raft_endpoint);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+
+    if (repl_instance.IsAlive()) {
+      repl_instance.OnSuccessPing();
+      return;
+    }
+
+    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
+
+    auto const curr_main_uuid = self->GetMainUUID();
+    if (curr_main_uuid == repl_instance_uuid.value()) {
+      repl_instance.OnSuccessPing();
+      return;
+    }
+
+    // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
+    // swapUUID can fail
+    if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
+      repl_instance.OnSuccessPing();
+      spdlog::info("Instance {} demoted to replica", repl_instance_name);
+    } else {
+      spdlog::error("Instance {} failed to become replica", repl_instance_name);
+      return;
+    }
+
+    if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
+      spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
+      return;
+    }
+  };
+
+  main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    repl_instance.OnFailPing();
+    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
+
+    if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
+      spdlog::info("Cluster without main instance, trying automatic failover");
+      self->TryFailover();  // TODO: (andi) Initiate failover
+    }
+  };
 }

-auto CoordinatorInstance::InstanceName() const -> std::string {
-  return "coordinator_" + std::to_string(raft_server_id_);
+auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool {
+  auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
+  return std::ranges::any_of(repl_instances_, alive_main);
 }

-auto CoordinatorInstance::RaftSocketAddress() const -> std::string {
-  return raft_address_ + ":" + std::to_string(raft_port_);
+auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
+  auto const coord_instances = raft_state_.GetAllCoordinators();
+
+  std::vector<InstanceStatus> instances_status;
+  instances_status.reserve(repl_instances_.size() + coord_instances.size());
+
+  auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
+    if (!instance.IsAlive()) return "unknown";
+    if (instance.IsMain()) return "main";
+    return "replica";
+  };
+
+  auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
+    return {.instance_name = instance.InstanceName(),
+            .coord_socket_address = instance.SocketAddress(),
+            .cluster_role = stringify_repl_role(instance),
+            .is_alive = instance.IsAlive()};
+  };
+
+  auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
+    return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
+            .raft_socket_address = instance->get_endpoint(),
+            .cluster_role = "coordinator",
+            .is_alive = true};  // TODO: (andi) Get this info from RAFT and test it or when we will move
+                                // CoordinatorState to every instance, we can be smarter about this using our RPC.
+  };
+
+  std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
+
+  {
+    auto lock = std::shared_lock{coord_instance_lock_};
+    std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
+  }
+
+  return instances_status;
+}
+
+auto CoordinatorInstance::TryFailover() -> void {
+  auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
+                        ranges::views::filter(&ReplicationInstance::IsAlive);
+
+  if (ranges::empty(alive_replicas)) {
+    spdlog::warn("Failover failed since all replicas are down!");
+    return;
+  }
+
+  // TODO: Smarter choice
+  auto new_main = ranges::begin(alive_replicas);
+
+  new_main->PauseFrequentCheck();
+  utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
+
+  auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
+    return instance.InstanceName() != new_main->InstanceName();
+  };
+
+  auto const new_main_uuid = utils::UUID{};
+  // If the swap fails for some replicas, the ones that did swap will revert on their next successful ping,
+  // or a new failover will run first and make the state consistent again.
+  for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
+    if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
+      spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
+                                other_replica_instance.InstanceName()));
+      return;
+    }
+  }
+
+  ReplicationClientsInfo repl_clients_info;
+  repl_clients_info.reserve(repl_instances_.size() - 1);
+  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
+                         std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
+
+  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
+    spdlog::warn("Failover failed since promoting replica to main failed!");
+    return;
+  }
+  // TODO: (andi) This should be replicated across all coordinator instances with Raft log
+  SetMainUUID(new_main_uuid);
+  spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
+}
+
+// TODO: (andi) Make sure you cannot put coordinator instance to the main
+auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
+    -> SetInstanceToMainCoordinatorStatus {
+  auto lock = std::lock_guard{coord_instance_lock_};
+
+  auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
+    return instance.InstanceName() == instance_name;
+  };
+  auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
+
+  if (new_main == repl_instances_.end()) {
+    spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
+                  instance_name);
+    return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
+  }
+
+  new_main->PauseFrequentCheck();
+  utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
+
+  auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
+    return instance.InstanceName() != instance_name;
+  };
+
+  auto const new_main_uuid = utils::UUID{};
+
+  for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
+    if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
+      spdlog::error(
+          fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
+      return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
+    }
+  }
+
+  ReplicationClientsInfo repl_clients_info;
+  repl_clients_info.reserve(repl_instances_.size() - 1);
+  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
+                         std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
+
+  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
+    return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
+  }
+
+  // TODO: (andi) This should be replicated across all coordinator instances with Raft log
+  SetMainUUID(new_main_uuid);
+  spdlog::info("Instance {} promoted to main", instance_name);
+  return SetInstanceToMainCoordinatorStatus::SUCCESS;
+}
+
+auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
+    -> RegisterInstanceCoordinatorStatus {
+  auto lock = std::lock_guard{coord_instance_lock_};
+
+  auto instance_name = config.instance_name;
+
+  auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
+    return instance.InstanceName() == instance_name;
+  };
+
+  if (std::ranges::any_of(repl_instances_, name_matches)) {
+    return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
+  }
+
+  auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
+    return instance.SocketAddress() == config.SocketAddress();
+  };
+
+  if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
+    return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
+  }
+
+  if (!raft_state_.RequestLeadership()) {
+    return RegisterInstanceCoordinatorStatus::NOT_LEADER;
+  }
+
+  auto const res = raft_state_.AppendRegisterReplicationInstance(instance_name);
+  if (!res->get_accepted()) {
+    spdlog::error(
+        "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
+        "the leader.",
+        config.instance_name);
+    return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
+  }
+
+  spdlog::info("Request for registering instance {} accepted", instance_name);
+  try {
+    repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
+  } catch (CoordinatorRegisterInstanceException const &) {
+    return RegisterInstanceCoordinatorStatus::RPC_FAILED;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to register instance {} with error code {}", instance_name, res->get_result_code());
+    return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
+  }
+
+  spdlog::info("Instance {} registered", instance_name);
+  return RegisterInstanceCoordinatorStatus::SUCCESS;
 }

 auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
     -> void {
-  auto const endpoint = raft_address + ":" + std::to_string(raft_port);
-  srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
-  if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
-    throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
-  }
-  spdlog::info("Request to add server {} to the cluster accepted", endpoint);
+  raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
 }

-auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
-  std::vector<ptr<srv_config>> all_srv_configs;
-  raft_server_->get_srv_config_all(all_srv_configs);
-  return all_srv_configs;
-}
+auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
+
+// TODO: (andi) Add to the RAFT log.
+auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }

 }  // namespace memgraph::coordination
 #endif
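[Editor's note] The four health-check callbacks above (replica/main x success/failure) all share one shape: take the owning coordinator's lock, resolve the instance by name, then mutate its state. A compressed, self-contained sketch of that dispatch shape, with simplified stand-in types instead of Memgraph's `CoordinatorInstance`/`ReplicationInstance` (all names below are illustrative only):

// Minimal model of the success/failure health-check dispatch used above.
#include <functional>
#include <list>
#include <mutex>
#include <stdexcept>
#include <string>
#include <string_view>

struct Instance {
  std::string name;
  bool alive{false};
};

class Coordinator {
 public:
  using HealthCheckCallback = std::function<void(Coordinator *, std::string_view)>;

  Coordinator() {
    succ_cb_ = [](Coordinator *self, std::string_view name) {
      std::lock_guard lock{self->mutex_};
      self->Find(name).alive = true;  // plays the role of OnSuccessPing()
    };
    fail_cb_ = [](Coordinator *self, std::string_view name) {
      std::lock_guard lock{self->mutex_};
      self->Find(name).alive = false;  // plays the role of OnFailPing()
    };
  }

  void Register(std::string name) {
    std::lock_guard lock{mutex_};
    instances_.push_back(Instance{std::move(name)});
  }

  // A per-instance health-checker thread would invoke these with the owning pointer.
  void ReportSuccess(std::string_view name) { succ_cb_(this, name); }
  void ReportFailure(std::string_view name) { fail_cb_(this, name); }

 private:
  Instance &Find(std::string_view name) {
    for (auto &inst : instances_) {
      if (inst.name == name) return inst;
    }
    throw std::runtime_error("instance not found during callback");
  }

  std::list<Instance> instances_;  // std::list: pointers stay stable
  std::mutex mutex_;
  HealthCheckCallback succ_cb_, fail_cb_;
};

int main() {
  Coordinator coord;
  coord.Register("instance_1");
  coord.ReportSuccess("instance_1");
  coord.ReportFailure("instance_1");
}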
diff --git a/src/coordination/coordinator_log_store.cpp b/src/coordination/coordinator_log_store.cpp
index 11b7be0dd..37126b747 100644
--- a/src/coordination/coordinator_log_store.cpp
+++ b/src/coordination/coordinator_log_store.cpp
@@ -13,214 +13,149 @@

 #include "nuraft/coordinator_log_store.hpp"

+#include "coordination/coordinator_exceptions.hpp"
+#include "utils/logging.hpp"
+
 namespace memgraph::coordination {

 using nuraft::cs_new;
-using nuraft::timer_helper;

-CoordinatorLogStore::CoordinatorLogStore()
-    : start_idx_(1),
-      raft_server_bwd_pointer_(nullptr),
-      disk_emul_delay(0),
-      disk_emul_thread_(nullptr),
-      disk_emul_thread_stop_signal_(false),
-      disk_emul_last_durable_index_(0) {
-  // Dummy entry for index 0.
-  ptr<buffer> buf = buffer::alloc(sz_ulong);
+namespace {
+
+ptr<log_entry> MakeClone(const ptr<log_entry> &entry) {
+  return cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
+                           entry->get_timestamp());
+}
+
+}  // namespace
+
+CoordinatorLogStore::CoordinatorLogStore() : start_idx_(1) {
+  ptr<buffer> buf = buffer::alloc(sizeof(uint64_t));
   logs_[0] = cs_new<log_entry>(0, buf);
 }

-CoordinatorLogStore::~CoordinatorLogStore() {
-  if (disk_emul_thread_) {
-    disk_emul_thread_stop_signal_ = true;
-    // disk_emul_ea_.invoke();
-    if (disk_emul_thread_->joinable()) {
-      disk_emul_thread_->join();
-    }
-  }
-}
+CoordinatorLogStore::~CoordinatorLogStore() {}

-ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) {
-  // NOTE:
-  // Timestamp is used only when `replicate_log_timestamp_` option is on.
-  // Otherwise, log store does not need to store or load it.
-  ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
-                                           entry->get_timestamp());
-  return clone;
-}
-
-ulong CoordinatorLogStore::next_slot() const {
-  std::lock_guard<std::mutex> l(logs_lock_);
-  // Exclude the dummy entry.
-  return start_idx_ + logs_.size() - 1;
-}
-
-ulong CoordinatorLogStore::start_index() const { return start_idx_; }
-
-ptr<log_entry> CoordinatorLogStore::last_entry() const {
-  ulong next_idx = next_slot();
-  std::lock_guard<std::mutex> l(logs_lock_);
-  auto entry = logs_.find(next_idx - 1);
+auto CoordinatorLogStore::FindOrDefault_(uint64_t index) const -> ptr<log_entry> {
+  auto entry = logs_.find(index);
   if (entry == logs_.end()) {
     entry = logs_.find(0);
   }
-
-  return MakeClone(entry->second);
+  return entry->second;
 }

-ulong CoordinatorLogStore::append(ptr<log_entry> &entry) {
+uint64_t CoordinatorLogStore::next_slot() const {
+  auto lock = std::lock_guard{logs_lock_};
+  return start_idx_ + logs_.size() - 1;
+}
+
+uint64_t CoordinatorLogStore::start_index() const { return start_idx_; }
+
+ptr<log_entry> CoordinatorLogStore::last_entry() const {
+  auto lock = std::lock_guard{logs_lock_};
+
+  uint64_t const last_idx = start_idx_ + logs_.size() - 1;
+  auto const last_src = FindOrDefault_(last_idx - 1);
+
+  return MakeClone(last_src);
+}
+
+uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
   ptr<log_entry> clone = MakeClone(entry);
-
-  std::lock_guard<std::mutex> l(logs_lock_);
-  size_t idx = start_idx_ + logs_.size() - 1;
-  logs_[idx] = clone;
-
-  if (disk_emul_delay) {
-    uint64_t cur_time = timer_helper::get_timeofday_us();
-    disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
-    // disk_emul_ea_.invoke();
+  uint64_t next_slot{0};
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    next_slot = start_idx_ + logs_.size() - 1;
+    logs_[next_slot] = clone;
   }
-
-  return idx;
+  return next_slot;
 }

-void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) {
+void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
   ptr<log_entry> clone = MakeClone(entry);

   // Discard all logs equal to or greater than `index`.
-  std::lock_guard<std::mutex> l(logs_lock_);
-  auto itr = logs_.lower_bound(index);
-  while (itr != logs_.end()) {
-    itr = logs_.erase(itr);
-  }
-  logs_[index] = clone;
-
-  if (disk_emul_delay) {
-    uint64_t cur_time = timer_helper::get_timeofday_us();
-    disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;
-
-    // Remove entries greater than `index`.
-    auto entry = disk_emul_logs_being_written_.begin();
-    while (entry != disk_emul_logs_being_written_.end()) {
-      if (entry->second > index) {
-        entry = disk_emul_logs_being_written_.erase(entry);
-      } else {
-        entry++;
-      }
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    auto itr = logs_.lower_bound(index);
+    while (itr != logs_.end()) {
+      itr = logs_.erase(itr);
     }
-    // disk_emul_ea_.invoke();
+    logs_[index] = clone;
   }
 }

-ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) {
-  ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
-
+ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
+  auto ret = cs_new<std::vector<ptr<log_entry>>>();
   ret->resize(end - start);
-  ulong cc = 0;
-  for (ulong ii = start; ii < end; ++ii) {
+
+  for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
     ptr<log_entry> src = nullptr;
     {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      auto entry = logs_.find(ii);
-      if (entry == logs_.end()) {
-        entry = logs_.find(0);
-        assert(0);
+      auto lock = std::lock_guard{logs_lock_};
+      if (auto const entry = logs_.find(i); entry != logs_.end()) {
+        src = entry->second;
+      } else {
+        throw RaftCouldNotFindEntryException("Could not find entry at index {}", i);
       }
-      src = entry->second;
     }
-    (*ret)[cc++] = MakeClone(src);
+    (*ret)[curr_index] = MakeClone(src);
   }
   return ret;
 }

-// NOLINTNEXTLINE(google-default-arguments)
-ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end,
-                                                                      int64 batch_size_hint_in_bytes) {
-  ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
-
-  if (batch_size_hint_in_bytes < 0) {
-    return ret;
-  }
-
-  size_t accum_size = 0;
-  for (ulong ii = start; ii < end; ++ii) {
-    ptr<log_entry> src = nullptr;
-    {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      auto entry = logs_.find(ii);
-      if (entry == logs_.end()) {
-        entry = logs_.find(0);
-        assert(0);
-      }
-      src = entry->second;
-    }
-    ret->push_back(MakeClone(src));
-    accum_size += src->get_buf().size();
-    if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break;
-  }
-  return ret;
-}
-
-ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) {
+ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
   ptr<log_entry> src = nullptr;
   {
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.find(index);
-    if (entry == logs_.end()) {
-      entry = logs_.find(0);
-    }
-    src = entry->second;
+    auto lock = std::lock_guard{logs_lock_};
+    src = FindOrDefault_(index);
   }
   return MakeClone(src);
 }

-ulong CoordinatorLogStore::term_at(ulong index) {
-  ulong term = 0;
+uint64_t CoordinatorLogStore::term_at(uint64_t index) {
+  uint64_t term = 0;
   {
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.find(index);
-    if (entry == logs_.end()) {
-      entry = logs_.find(0);
-    }
-    term = entry->second->get_term();
+    auto lock = std::lock_guard{logs_lock_};
+    term = FindOrDefault_(index)->get_term();
   }
   return term;
 }

-ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) {
+ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {
   std::vector<ptr<buffer>> logs;

   size_t size_total = 0;
-  for (ulong ii = index; ii < index + cnt; ++ii) {
+  uint64_t const end_index = index + cnt;
+  for (uint64_t i = index; i < end_index; ++i) {
     ptr<log_entry> le = nullptr;
     {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      le = logs_[ii];
+      auto lock = std::lock_guard{logs_lock_};
+      le = logs_[i];
     }
-    assert(le.get());
-    ptr<buffer> buf = le->serialize();
+    MG_ASSERT(le.get(), "Could not find log entry at index {}", i);
+    auto buf = le->serialize();
     size_total += buf->size();
     logs.push_back(buf);
   }

-  ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
+  auto buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
   buf_out->pos(0);
   buf_out->put((int32)cnt);

   for (auto &entry : logs) {
-    ptr<buffer> &bb = entry;
-    buf_out->put((int32)bb->size());
-    buf_out->put(*bb);
+    buf_out->put(static_cast<int32>(entry->size()));
+    buf_out->put(*entry);
   }
   return buf_out;
 }

-void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
+void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
   pack.pos(0);
-  int32 num_logs = pack.get_int();
+  int32 const num_logs = pack.get_int();

-  for (int32 ii = 0; ii < num_logs; ++ii) {
-    ulong cur_idx = index + ii;
+  for (int32 i = 0; i < num_logs; ++i) {
+    uint64_t cur_idx = index + i;
     int32 buf_size = pack.get_int();

     ptr<buffer> buf_local = buffer::alloc(buf_size);
@@ -228,14 +163,14 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {

     ptr<log_entry> le = log_entry::deserialize(*buf_local);
     {
-      std::lock_guard<std::mutex> l(logs_lock_);
+      auto lock = std::lock_guard{logs_lock_};
       logs_[cur_idx] = le;
     }
   }

   {
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.upper_bound(0);
+    auto lock = std::lock_guard{logs_lock_};
+    auto const entry = logs_.upper_bound(0);
     if (entry != logs_.end()) {
       start_idx_ = entry->first;
     } else {
@@ -244,88 +179,23 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
   }
 }

-bool CoordinatorLogStore::compact(ulong last_log_index) {
-  std::lock_guard<std::mutex> l(logs_lock_);
-  for (ulong ii = start_idx_; ii <= last_log_index; ++ii) {
-    auto entry = logs_.find(ii);
+// NOTE: Remove all logs up to given 'last_log_index' (inclusive).
+bool CoordinatorLogStore::compact(uint64_t last_log_index) {
+  auto lock = std::lock_guard{logs_lock_};
+  for (uint64_t ii = start_idx_; ii <= last_log_index; ++ii) {
+    auto const entry = logs_.find(ii);
     if (entry != logs_.end()) {
       logs_.erase(entry);
     }
   }

-  // WARNING:
-  // Even though nothing has been erased,
-  // we should set `start_idx_` to new index.
   if (start_idx_ <= last_log_index) {
     start_idx_ = last_log_index + 1;
   }
   return true;
 }

-bool CoordinatorLogStore::flush() {
-  disk_emul_last_durable_index_ = next_slot() - 1;
-  return true;
-}
-
-ulong CoordinatorLogStore::last_durable_index() {
-  uint64_t last_log = next_slot() - 1;
-  if (!disk_emul_delay) {
-    return last_log;
-  }
-
-  return disk_emul_last_durable_index_;
-}
-
-void CoordinatorLogStore::DiskEmulLoop() {
-  // This thread mimics async disk writes.
-
-  // uint32_t next_sleep_us = 100 * 1000;
-  while (!disk_emul_thread_stop_signal_) {
-    // disk_emul_ea_.wait_us(next_sleep_us);
-    // disk_emul_ea_.reset();
-    if (disk_emul_thread_stop_signal_) break;
-
-    uint64_t cur_time = timer_helper::get_timeofday_us();
-    // next_sleep_us = 100 * 1000;
-
-    bool call_notification = false;
-    {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      // Remove all timestamps equal to or smaller than `cur_time`,
-      // and pick the greatest one among them.
-      auto entry = disk_emul_logs_being_written_.begin();
-      while (entry != disk_emul_logs_being_written_.end()) {
-        if (entry->first <= cur_time) {
-          disk_emul_last_durable_index_ = entry->second;
-          entry = disk_emul_logs_being_written_.erase(entry);
-          call_notification = true;
-        } else {
-          break;
-        }
-      }
-
-      entry = disk_emul_logs_being_written_.begin();
-      if (entry != disk_emul_logs_being_written_.end()) {
-        // next_sleep_us = entry->first - cur_time;
-      }
-    }
-
-    if (call_notification) {
-      raft_server_bwd_pointer_->notify_log_append_completion(true);
-    }
-  }
-}
-
-void CoordinatorLogStore::Close() {}
-
-void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) {
-  disk_emul_delay = delay_ms;
-  raft_server_bwd_pointer_ = raft;
-
-  if (!disk_emul_thread_) {
-    disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this);
-  }
-}
+bool CoordinatorLogStore::flush() { return true; }

 }  // namespace memgraph::coordination
 #endif
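[Editor's note] The rewritten store keeps NuRaft's convention of a dummy entry at index 0, so `FindOrDefault_` can always fall back to something when an index is missing, and `next_slot` excludes that dummy from the count. A toy, NuRaft-free sketch of the same conventions, assuming a plain `std::map` keyed by log index (types and names are illustrative):

// Toy in-memory log keyed by index, mirroring CoordinatorLogStore's
// dummy-entry-at-0 and FindOrDefault_ fallback (NuRaft types omitted).
#include <cstdint>
#include <map>
#include <mutex>
#include <string>

class ToyLogStore {
 public:
  ToyLogStore() { logs_[0] = "<dummy>"; }  // index 0 is a placeholder entry

  uint64_t Append(std::string entry) {
    std::lock_guard lock{mutex_};
    uint64_t const slot = start_idx_ + logs_.size() - 1;  // excludes the dummy
    logs_[slot] = std::move(entry);
    return slot;
  }

  // Missing indices fall back to the dummy entry instead of failing.
  std::string EntryAt(uint64_t index) const {
    std::lock_guard lock{mutex_};
    auto it = logs_.find(index);
    if (it == logs_.end()) it = logs_.find(0);
    return it->second;
  }

 private:
  uint64_t start_idx_{1};
  std::map<uint64_t, std::string> logs_;
  mutable std::mutex mutex_;
};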
diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp
index 8337fa9d8..a2f6c9cee 100644
--- a/src/coordination/coordinator_state.cpp
+++ b/src/coordination/coordinator_state.cpp
@@ -41,37 +41,39 @@ CoordinatorState::CoordinatorState() {
   }
 }

-auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
+    -> RegisterInstanceCoordinatorStatus {
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");

   return std::visit(
-      memgraph::utils::Overloaded{
-          [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
-            return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
-          },
-          [config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(config); }},
+      memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
+                                    return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
+                                  },
+                                  [config](CoordinatorInstance &coordinator_instance) {
+                                    return coordinator_instance.RegisterReplicationInstance(config);
+                                  }},
       data_);
 }

-auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");

   return std::visit(
       memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
                                     return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR;
                                   },
-                                  [&instance_name](CoordinatorData &coordinator_data) {
-                                    return coordinator_data.SetInstanceToMain(instance_name);
+                                  [&instance_name](CoordinatorInstance &coordinator_instance) {
+                                    return coordinator_instance.SetReplicationInstanceToMain(instance_name);
                                   }},
       data_);
 }

 auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Can't call show instances on data_, as variant holds wrong alternative");
-  return std::get<CoordinatorData>(data_).ShowInstances();
+  return std::get<CoordinatorInstance>(data_).ShowInstances();
 }

 auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
@@ -82,9 +84,9 @@
 auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
     -> void {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
-  return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
+  return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
 }

 }  // namespace memgraph::coordination

diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp
index a278ab422..b939bd304 100644
--- a/src/coordination/coordinator_state_machine.cpp
+++ b/src/coordination/coordinator_state_machine.cpp
@@ -15,6 +15,19 @@

 namespace memgraph::coordination {

+auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
+  std::string str_log = name + "_replica";
+  ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
+  buffer_serializer bs(log);
+  bs.put_str(str_log);
+  return log;
+}
+
+auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
+  buffer_serializer bs(data);
+  return bs.get_str();
+}
+
 auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
   buffer_serializer bs(data);

   std::string str = bs.get_str();
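[Editor's note] Both helpers above lean on NuRaft's `buffer`/`buffer_serializer` pair: `put_str` writes a length-prefixed string, which is why the allocation reserves `sizeof(uint32_t)` extra bytes for the prefix. A hedged round-trip sketch using only the NuRaft calls already visible in this patch (header paths assumed from a standard NuRaft install):

// Round-trip sketch for the encode/decode pair above; not part of the
// Memgraph source, just the same NuRaft calls exercised standalone.
#include <cassert>
#include <string>

#include <libnuraft/buffer.hxx>
#include <libnuraft/buffer_serializer.hxx>

int main() {
  using nuraft::buffer;
  using nuraft::buffer_serializer;
  using nuraft::ptr;

  std::string const payload = "instance_1_replica";

  // Encode: uint32_t length prefix + payload, as in
  // CoordinatorStateMachine::EncodeRegisterReplicationInstance.
  ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + payload.size());
  buffer_serializer bs_out(log);
  bs_out.put_str(payload);

  // Decode, as in DecodeRegisterReplicationInstance.
  log->pos(0);
  buffer_serializer bs_in(*log);
  assert(bs_in.get_str() == payload);
}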
diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp
index 76ae49a9f..02bae1c03 100644
--- a/src/coordination/include/coordination/coordinator_client.hpp
+++ b/src/coordination/include/coordination/coordinator_client.hpp
@@ -20,14 +20,14 @@

 namespace memgraph::coordination {

-class CoordinatorData;
-using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
+class CoordinatorInstance;
+using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
 using ReplicationClientsInfo = std::vector<ReplClientInfo>;

 class CoordinatorClient {
  public:
-  explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
-                             HealthCheckCallback fail_cb);
+  explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
+                             HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);

   ~CoordinatorClient() = default;

@@ -69,7 +69,7 @@ class CoordinatorClient {
   mutable rpc::Client rpc_client_;
   CoordinatorClientConfig config_;

-  CoordinatorData *coord_data_;
+  CoordinatorInstance *coord_instance_;
   HealthCheckCallback succ_cb_;
   HealthCheckCallback fail_cb_;
 };

diff --git a/src/coordination/include/coordination/coordinator_data.hpp b/src/coordination/include/coordination/coordinator_data.hpp
deleted file mode 100644
index 9f4c60297..000000000
--- a/src/coordination/include/coordination/coordinator_data.hpp
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2024 Memgraph Ltd.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
-// License, and you may not use this file except in compliance with the Business Source License.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-#pragma once
-
-#ifdef MG_ENTERPRISE
-
-#include "coordination/coordinator_instance.hpp"
-#include "coordination/coordinator_server.hpp"
-#include "coordination/instance_status.hpp"
-#include "coordination/register_main_replica_coordinator_status.hpp"
-#include "coordination/replication_instance.hpp"
-#include "replication_coordination_glue/handler.hpp"
-#include "utils/rw_lock.hpp"
-#include "utils/thread_pool.hpp"
-#include "utils/uuid.hpp"
-
-#include <list>
-
-namespace memgraph::coordination {
-class CoordinatorData {
- public:
-  CoordinatorData();
-
-  // TODO: (andi) Probably rename to RegisterReplicationInstance
-  [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
-
-  [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
-
-  auto ShowInstances() const -> std::vector<InstanceStatus>;
-
-  auto TryFailover() -> void;
-
-  auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
-
- private:
-  HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
-
-  // NOTE: Must be std::list because we rely on pointer stability
-  std::list<ReplicationInstance> repl_instances_;
-  mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
-
-  CoordinatorInstance self_;
-
-  utils::UUID main_uuid_;
-};
-
-struct CoordinatorMainReplicaData {
-  std::unique_ptr<CoordinatorServer> coordinator_server_;
-};
-
-}  // namespace memgraph::coordination
-#endif

diff --git a/src/coordination/include/coordination/coordinator_exceptions.hpp b/src/coordination/include/coordination/coordinator_exceptions.hpp
index 5b697e371..59a2e89d8 100644
--- a/src/coordination/include/coordination/coordinator_exceptions.hpp
+++ b/src/coordination/include/coordination/coordinator_exceptions.hpp
@@ -50,5 +50,38 @@ class RaftAddServerException final : public utils::BasicException {
   SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException)
 };

+class RaftBecomeLeaderException final : public utils::BasicException {
+ public:
+  explicit RaftBecomeLeaderException(std::string_view what) noexcept : BasicException(what) {}
+
+  template <class... Args>
+  explicit RaftBecomeLeaderException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
+      : RaftBecomeLeaderException(fmt::format(fmt, std::forward<Args>(args)...)) {}
+
+  SPECIALIZE_GET_EXCEPTION_NAME(RaftBecomeLeaderException)
+};
+
+class RaftCouldNotFindEntryException final : public utils::BasicException {
+ public:
+  explicit RaftCouldNotFindEntryException(std::string_view what) noexcept : BasicException(what) {}
+
+  template <class... Args>
+  explicit RaftCouldNotFindEntryException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
+      : RaftCouldNotFindEntryException(fmt::format(fmt, std::forward<Args>(args)...)) {}
+
+  SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotFindEntryException)
+};
+
+class RaftCouldNotParseFlagsException final : public utils::BasicException {
+ public:
+  explicit RaftCouldNotParseFlagsException(std::string_view what) noexcept : BasicException(what) {}
+
+  template <class... Args>
+  explicit RaftCouldNotParseFlagsException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
+      : RaftCouldNotParseFlagsException(fmt::format(fmt, std::forward<Args>(args)...)) {}
+
+  SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException)
+};
+
 }  // namespace memgraph::coordination
 #endif

diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp
index 1c7af59ae..bc6954b37 100644
--- a/src/coordination/include/coordination/coordinator_instance.hpp
+++ b/src/coordination/include/coordination/coordinator_instance.hpp
@@ -13,45 +13,48 @@

 #ifdef MG_ENTERPRISE

-#include <flags/replication.hpp>
+#include "coordination/coordinator_server.hpp"
+#include "coordination/instance_status.hpp"
+#include "coordination/raft_state.hpp"
+#include "coordination/register_main_replica_coordinator_status.hpp"
+#include "coordination/replication_instance.hpp"
+#include "utils/rw_lock.hpp"
+#include "utils/thread_pool.hpp"

-#include <libnuraft/nuraft.hxx>
+#include <list>

 namespace memgraph::coordination {

-using nuraft::logger;
-using nuraft::ptr;
-using nuraft::raft_launcher;
-using nuraft::raft_server;
-using nuraft::srv_config;
-using nuraft::state_machine;
-using nuraft::state_mgr;
-
 class CoordinatorInstance {
  public:
   CoordinatorInstance();
-  CoordinatorInstance(CoordinatorInstance const &other) = delete;
-  CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
-  CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete;
-  CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
-  ~CoordinatorInstance() = default;

-  auto InstanceName() const -> std::string;
-  auto RaftSocketAddress() const -> std::string;
+  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
+
+  [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
+
+  auto ShowInstances() const -> std::vector<InstanceStatus>;
+
+  auto TryFailover() -> void;

   auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
-  auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
+
+  auto GetMainUUID() const -> utils::UUID;
+
+  auto SetMainUUID(utils::UUID new_uuid) -> void;

  private:
-  ptr<state_machine> state_machine_;
-  ptr<state_mgr> state_manager_;
-  ptr<raft_server> raft_server_;
-  ptr<logger> logger_;
-  raft_launcher launcher_;
+  auto ClusterHasAliveMain_() const -> bool;

-  // TODO: (andi) I think variables below can be abstracted
-  uint32_t raft_server_id_;
-  uint32_t raft_port_;
-  std::string raft_address_;
+  HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
+
+  // NOTE: Must be std::list because we rely on pointer stability
+  std::list<ReplicationInstance> repl_instances_;
+  mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
+
+  utils::UUID main_uuid_;
+
+  RaftState raft_state_;
 };

 }  // namespace memgraph::coordination
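[Editor's note] On the NOTE above about `std::list`: the coordinator hands out raw pointers to `ReplicationInstance` objects (e.g. the back-pointers used by health-check callbacks), and `std::list` nodes never move, whereas a growing `std::vector` may reallocate and invalidate them. A minimal standalone demonstration of the difference, with illustrative names:

// Why a std::list is used for repl_instances_: node stability vs. vector
// reallocation. Plain standard C++, not Memgraph code.
#include <cassert>
#include <list>
#include <vector>

int main() {
  std::vector<int> vec;
  vec.reserve(1);
  vec.push_back(1);
  int *vec_ptr = &vec.front();
  for (int i = 0; i < 1000; ++i) vec.push_back(i);  // forces reallocation
  // vec_ptr now dangles; dereferencing it would be undefined behavior.
  (void)vec_ptr;

  std::list<int> lst;
  lst.push_back(1);
  int *lst_ptr = &lst.front();
  for (int i = 0; i < 1000; ++i) lst.push_back(i);
  assert(*lst_ptr == 1);  // list nodes never move; the pointer stays valid
}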
memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 9ab33a04e..8830d1b49 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -13,7 +13,7 @@ #ifdef MG_ENTERPRISE -#include "coordination/coordinator_data.hpp" +#include "coordination/coordinator_instance.hpp" #include "coordination/coordinator_server.hpp" #include "coordination/instance_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" @@ -33,19 +33,23 @@ class CoordinatorState { CoordinatorState(CoordinatorState &&) noexcept = delete; CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; - [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; - [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; + [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; auto ShowInstances() const -> std::vector<InstanceStatus>; auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; - // The client code must check that the server exists before calling this method. + // NOTE: The client code must check that the server exists before calling this method. auto GetCoordinatorServer() const -> CoordinatorServer &; private: - std::variant<CoordinatorData, CoordinatorMainReplicaData> data_; + struct CoordinatorMainReplicaData { + std::unique_ptr<CoordinatorServer> coordinator_server_; + }; + + std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp new file mode 100644 index 000000000..6b53197a0 --- /dev/null +++ b/src/coordination/include/coordination/raft_state.hpp @@ -0,0 +1,81 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
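Note on the coordinator_state.hpp hunk above: CoordinatorState now holds a std::variant<CoordinatorInstance, CoordinatorMainReplicaData>, so a process acts either as a coordinator or as a plain MAIN/REPLICA that merely hosts a CoordinatorServer. The dispatch itself happens in coordinator_state.cpp, which this section does not show; the following is a minimal sketch of the pattern, with every name besides the two variant alternatives invented for illustration:

#include <variant>

// Hypothetical stand-ins; the real classes live in the coordination headers above.
struct CoordinatorInstance {
  bool RegisterReplicationInstance() { return true; }
};
struct CoordinatorMainReplicaData {};

enum class Status { SUCCESS, NOT_COORDINATOR };

// Only the CoordinatorInstance alternative can serve coordinator queries;
// a data instance holds CoordinatorMainReplicaData and must refuse.
auto Register(std::variant<CoordinatorInstance, CoordinatorMainReplicaData> &data) -> Status {
  if (!std::holds_alternative<CoordinatorInstance>(data)) {
    return Status::NOT_COORDINATOR;
  }
  std::get<CoordinatorInstance>(data).RegisterReplicationInstance();
  return Status::SUCCESS;
}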
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include <flags/replication.hpp> + +#include <optional> + +#include <libnuraft/nuraft.hxx> + +namespace memgraph::coordination { + +using BecomeLeaderCb = std::function<void()>; +using BecomeFollowerCb = std::function<void()>; + +using nuraft::buffer; +using nuraft::logger; +using nuraft::ptr; +using nuraft::raft_launcher; +using nuraft::raft_server; +using nuraft::srv_config; +using nuraft::state_machine; +using nuraft::state_mgr; +using raft_result = nuraft::cmd_result<ptr<buffer>>; + +class RaftState { + private: + explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id, + uint32_t raft_port, std::string raft_address); + + auto InitRaftServer() -> void; + + public: + RaftState() = delete; + RaftState(RaftState const &other) = default; + RaftState &operator=(RaftState const &other) = default; + RaftState(RaftState &&other) noexcept = default; + RaftState &operator=(RaftState &&other) noexcept = default; + ~RaftState(); + + static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState; + + auto InstanceName() const -> std::string; + auto RaftSocketAddress() const -> std::string; + + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; + auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>; + + auto RequestLeadership() -> bool; + auto IsLeader() const -> bool; + + auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result>; + + // TODO: (andi) I think variables below can be abstracted + uint32_t raft_server_id_; + uint32_t raft_port_; + std::string raft_address_; + + ptr<state_machine> state_machine_; + ptr<state_mgr> state_manager_; + ptr<raft_server> raft_server_; + ptr<logger> logger_; + raft_launcher launcher_; + + BecomeLeaderCb become_leader_cb_; + BecomeFollowerCb become_follower_cb_; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index bf35e9156..3a0df5607 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -22,6 +22,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t { ENDPOINT_EXISTS, NOT_COORDINATOR, RPC_FAILED, + NOT_LEADER, + RAFT_COULD_NOT_ACCEPT, + RAFT_COULD_NOT_APPEND, SUCCESS }; diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 9d4765b47..713a66fd8 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -23,11 +23,11 @@ namespace memgraph::coordination { -class CoordinatorData; +class CoordinatorInstance; class ReplicationInstance { public: - ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb, + ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb, HealthCheckCallback fail_cb); ReplicationInstance(ReplicationInstance const &other) = delete; @@ -51,15 +51,19 @@ class ReplicationInstance { HealthCheckCallback main_fail_cb) -> bool; auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback 
replica_fail_cb) -> bool; + auto StartFrequentCheck() -> void; + auto StopFrequentCheck() -> void; auto PauseFrequentCheck() -> void; auto ResumeFrequentCheck() -> void; auto ReplicationClientInfo() const -> ReplClientInfo; - auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool; + auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool; + auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; auto GetClient() -> CoordinatorClient &; - void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt); + auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; + auto ResetMainUUID() -> void; auto GetMainUUID() -> const std::optional<utils::UUID> &; private: diff --git a/src/coordination/include/nuraft/coordinator_log_store.hpp b/src/coordination/include/nuraft/coordinator_log_store.hpp index ce1695d2f..46e70b7f4 100644 --- a/src/coordination/include/nuraft/coordinator_log_store.hpp +++ b/src/coordination/include/nuraft/coordinator_log_store.hpp @@ -46,9 +46,6 @@ class CoordinatorLogStore : public log_store { ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override; - // NOLINTNEXTLINE - ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override; - ptr<log_entry> entry_at(ulong index) override; ulong term_at(ulong index) override; @@ -61,67 +58,12 @@ class CoordinatorLogStore : public log_store { bool flush() override; - ulong last_durable_index() override; - - void Close(); - - void SetDiskDelay(raft_server *raft, size_t delay_ms); - private: - static ptr<log_entry> MakeClone(ptr<log_entry> const &entry); + auto FindOrDefault_(ulong index) const -> ptr<log_entry>; - void DiskEmulLoop(); - - /** - * Map of <log index, log data>. - */ std::map<ulong, ptr<log_entry>> logs_; - - /** - * Lock for `logs_`. - */ mutable std::mutex logs_lock_; - - /** - * The index of the first log. - */ std::atomic<ulong> start_idx_; - - /** - * Backward pointer to Raft server. - */ - raft_server *raft_server_bwd_pointer_; - - // Testing purpose --------------- BEGIN - - /** - * If non-zero, this log store will emulate the disk write delay. - */ - std::atomic<size_t> disk_emul_delay; - - /** - * Map of <timestamp, log index>, emulating logs that is being written to disk. - * Log index will be regarded as "durable" after the corresponding timestamp. - */ - std::map<uint64_t, uint64_t> disk_emul_logs_being_written_; - - /** - * Thread that will update `last_durable_index_` and call - * `notify_log_append_completion` at proper time. - */ - std::unique_ptr<std::thread> disk_emul_thread_; - - /** - * Flag to terminate the thread. - */ - std::atomic<bool> disk_emul_thread_stop_signal_; - - /** - * Last written log index. 
- */ - std::atomic<uint64_t> disk_emul_last_durable_index_; - - // Testing purpose --------------- END }; } // namespace memgraph::coordination diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index fd7e92401..5b5f37b48 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -36,6 +36,10 @@ class CoordinatorStateMachine : public state_machine { CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete; ~CoordinatorStateMachine() override {} + static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer>; + + static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string; + auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override; auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override; diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp new file mode 100644 index 000000000..d171a6b3d --- /dev/null +++ b/src/coordination/raft_state.cpp @@ -0,0 +1,140 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "coordination/raft_state.hpp" + +#include "coordination/coordinator_exceptions.hpp" +#include "nuraft/coordinator_state_machine.hpp" +#include "nuraft/coordinator_state_manager.hpp" +#include "utils/counter.hpp" + +namespace memgraph::coordination { + +using nuraft::asio_service; +using nuraft::cb_func; +using nuraft::CbReturnCode; +using nuraft::cmd_result; +using nuraft::cs_new; +using nuraft::ptr; +using nuraft::raft_params; +using nuraft::raft_server; +using nuraft::srv_config; +using raft_result = cmd_result<ptr<buffer>>; + +RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id, + uint32_t raft_port, std::string raft_address) + : raft_server_id_(raft_server_id), + raft_port_(raft_port), + raft_address_(std::move(raft_address)), + state_machine_(cs_new<CoordinatorStateMachine>()), + state_manager_( + cs_new<CoordinatorStateManager>(raft_server_id_, raft_address_ + ":" + std::to_string(raft_port_))), + logger_(nullptr), + become_leader_cb_(std::move(become_leader_cb)), + become_follower_cb_(std::move(become_follower_cb)) {} + +auto RaftState::InitRaftServer() -> void { + asio_service::options asio_opts; + asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this + + raft_params params; + params.heart_beat_interval_ = 100; + params.election_timeout_lower_bound_ = 200; + params.election_timeout_upper_bound_ = 400; + // 5 logs are preserved before the last snapshot + params.reserved_log_items_ = 5; + // Create snapshot for every 5 log appends + params.snapshot_distance_ = 5; + params.client_req_timeout_ = 3000; + params.return_method_ = raft_params::blocking; + + raft_server::init_options init_opts; + init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { + if (event_type == 
cb_func::BecomeLeader) { + spdlog::info("Node {} became leader", param->leaderId); + become_leader_cb_(); + } else if (event_type == cb_func::BecomeFollower) { + spdlog::info("Node {} became follower", param->myId); + become_follower_cb_(); + } + return CbReturnCode::Ok; + }; + + raft_launcher launcher; + + raft_server_ = launcher.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params, + init_opts); + + if (!raft_server_) { + throw RaftServerStartException("Failed to launch raft server on {}:{}", raft_address_, raft_port_); + } + + auto maybe_stop = utils::ResettableCounter<20>(); + do { + if (raft_server_->is_initialized()) { + return; + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } while (!maybe_stop()); + + throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_); +} + +auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState { + uint32_t raft_server_id{0}; + uint32_t raft_port{0}; + try { + raft_server_id = FLAGS_raft_server_id; + raft_port = FLAGS_raft_server_port; + } catch (std::exception const &e) { + throw RaftCouldNotParseFlagsException("Failed to parse flags: {}", e.what()); + } + + auto raft_state = + RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_server_id, raft_port, "127.0.0.1"); + raft_state.InitRaftServer(); + return raft_state; +} + +RaftState::~RaftState() { launcher_.shutdown(); } + +auto RaftState::InstanceName() const -> std::string { return "coordinator_" + std::to_string(raft_server_id_); } + +auto RaftState::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); } + +auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void { + auto const endpoint = raft_address + ":" + std::to_string(raft_port); + srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint); + if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) { + throw RaftAddServerException("Failed to add server {} to the cluster", endpoint); + } + spdlog::info("Request to add server {} to the cluster accepted", endpoint); +} + +auto RaftState::GetAllCoordinators() const -> std::vector<ptr<srv_config>> { + std::vector<ptr<srv_config>> all_srv_configs; + raft_server_->get_srv_config_all(all_srv_configs); + return all_srv_configs; +} + +auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); } + +auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); } + +auto RaftState::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> { + auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance); + return raft_server_->append_entries({new_log}); +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 96a5c2a0e..0fb13998c 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -17,14 +17,14 @@ namespace memgraph::coordination { -ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, +ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) - : client_(data, std::move(config), 
std::move(succ_cb), std::move(fail_cb)), - replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), - is_alive_(true) { + : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), + replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) { if (!client_.DemoteToReplica()) { throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); } + client_.StartFrequentCheck(); } @@ -51,13 +51,14 @@ auto ReplicationInstance::IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; } -auto ReplicationInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, +auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool { - if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) { + if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { return false; } replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; + main_uuid_ = new_uuid; client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb)); return true; @@ -75,6 +76,8 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, H return true; } +auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); } +auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); } auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } @@ -83,14 +86,22 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf } auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } -void ReplicationInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; } +auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } +auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; } auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; } -auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool { - if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) { +auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { + if (!main_uuid_ || *main_uuid_ != curr_main_uuid) { + return SendSwapAndUpdateUUID(curr_main_uuid); + } + return true; +} + +auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool { + if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) { return false; } - SetNewMainUUID(main_uuid_); + SetNewMainUUID(new_main_uuid); return true; } diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index d1310dee2..b623e1db6 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -20,14 +20,14 @@ namespace memgraph::dbms { CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state) : coordinator_state_(coordinator_state) {} -auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config) +auto 
CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config) -> coordination::RegisterInstanceCoordinatorStatus { - return coordinator_state_.RegisterInstance(config); + return coordinator_state_.RegisterReplicationInstance(config); } -auto CoordinatorHandler::SetInstanceToMain(std::string instance_name) +auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus { - return coordinator_state_.SetInstanceToMain(std::move(instance_name)); + return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name)); } auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> { diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index a2a1f19dc..03d45ee41 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -28,10 +28,10 @@ class CoordinatorHandler { public: explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state); - auto RegisterInstance(coordination::CoordinatorClientConfig config) + auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config) -> coordination::RegisterInstanceCoordinatorStatus; - auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; + auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; auto ShowInstances() const -> std::vector<coordination::InstanceStatus>; diff --git a/src/dbms/replication_client.cpp b/src/dbms/replication_client.cpp deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/dbms/replication_handler.cpp b/src/dbms/replication_handler.cpp deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/dbms/replication_handler.hpp b/src/dbms/replication_handler.hpp deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 2ddb8dd2a..e9c3ec3f9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -458,9 +458,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { : coordinator_handler_(coordinator_state) {} /// @throw QueryRuntimeException if an error ocurred. 
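This interpreter.cpp hunk translates the three new Raft-related status codes into user-facing errors. Producing them is the coordinator's job; below is a hedged sketch of that mapping, assuming only the RaftState API declared earlier (RequestLeadership, AppendRegisterReplicationInstance) and NuRaft's cmd_result. The function name and the exact ordering of checks are illustrative, not the actual body of coordinator_instance.cpp:

#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"

namespace memgraph::coordination {

// Illustrative only; the real mapping lives in coordinator_instance.cpp.
auto AppendThroughRaft(RaftState &raft, std::string const &instance_name)
    -> RegisterInstanceCoordinatorStatus {
  using enum RegisterInstanceCoordinatorStatus;
  if (!raft.RequestLeadership()) {
    return NOT_LEADER;  // a follower must not accept registrations
  }
  auto const res = raft.AppendRegisterReplicationInstance(instance_name);
  if (!res->get_accepted()) {
    return RAFT_COULD_NOT_ACCEPT;  // local server no longer believes it is leader
  }
  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
    return RAFT_COULD_NOT_APPEND;  // log entry was not replicated to a quorum
  }
  return SUCCESS;
}

}  // namespace memgraph::coordination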
- void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, const std::string &instance_name, - CoordinatorQuery::SyncMode sync_mode) override { + void RegisterReplicationInstance(const std::string &coordinator_socket_address, + const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, + const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override { const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); if (!maybe_replication_ip_port) { @@ -489,7 +490,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_client_info = repl_config, .ssl = std::nullopt}; - auto status = coordinator_handler_.RegisterInstance(coordinator_client_config); + auto status = coordinator_handler_.RegisterReplicationInstance(coordinator_client_config); switch (status) { using enum memgraph::coordination::RegisterInstanceCoordinatorStatus; case NAME_EXISTS: @@ -499,6 +500,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { "Couldn't register replica instance since instance with such endpoint already exists!"); case NOT_COORDINATOR: throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!"); + case NOT_LEADER: + throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!"); + case RAFT_COULD_NOT_ACCEPT: + throw QueryRuntimeException( + "Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft " + "instance is not a leader!"); + case RAFT_COULD_NOT_APPEND: + throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!"); case RPC_FAILED: throw QueryRuntimeException( "Couldn't register replica instance because setting instance to replica failed! 
Check logs on replica to " @@ -519,8 +528,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - void SetInstanceToMain(const std::string &instance_name) override { - auto status = coordinator_handler_.SetInstanceToMain(instance_name); + void SetReplicationInstanceToMain(const std::string &instance_name) override { + auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name); switch (status) { using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus; case NO_INSTANCE_WITH_NAME: @@ -1145,9 +1154,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency, instance_name = coordinator_query->instance_name_, sync_mode = coordinator_query->sync_mode_]() mutable { - handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()), - std::string(replication_socket_address_tv.ValueString()), main_check_frequency, - instance_name, sync_mode); + handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()), + std::string(replication_socket_address_tv.ValueString()), + main_check_frequency, instance_name, sync_mode); return std::vector<std::vector<TypedValue>>(); }; @@ -1176,7 +1185,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param callback.fn = [handler = CoordQueryHandler{*coordinator_state}, instance_name = coordinator_query->instance_name_]() mutable { - handler.SetInstanceToMain(instance_name); + handler.SetReplicationInstanceToMain(instance_name); return std::vector<std::vector<TypedValue>>(); }; diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 698c639fa..da032b8e3 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -105,13 +105,13 @@ class CoordinatorQueryHandler { }; /// @throw QueryRuntimeException if an error ocurred. - virtual void RegisterInstance(const std::string &coordinator_socket_address, - const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, const std::string &instance_name, - CoordinatorQuery::SyncMode sync_mode) = 0; + virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address, + const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, + const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual void SetInstanceToMain(const std::string &instance_name) = 0; + virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. 
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0; diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt index 424ebd08f..d97080585 100644 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ b/tests/e2e/high_availability_experimental/CMakeLists.txt @@ -1,8 +1,9 @@ find_package(gflags REQUIRED) copy_e2e_python_files(ha_experimental coordinator.py) -copy_e2e_python_files(ha_experimental automatic_failover.py) -copy_e2e_python_files(ha_experimental distributed_coordinators.py) +copy_e2e_python_files(ha_experimental single_coordinator.py) +copy_e2e_python_files(ha_experimental coord_cluster_registration.py) +copy_e2e_python_files(ha_experimental distributed_coords.py) copy_e2e_python_files(ha_experimental manual_setting_replicas.py) copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py) copy_e2e_python_files(ha_experimental common.py) diff --git a/tests/e2e/high_availability_experimental/coord_cluster_registration.py b/tests/e2e/high_availability_experimental/coord_cluster_registration.py new file mode 100644 index 000000000..5feb0bb11 --- /dev/null +++ b/tests/e2e/high_availability_experimental/coord_cluster_registration.py @@ -0,0 +1,284 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. 
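The new test below starts coordinators with --raft-server-id and --raft-server-port, the flags that RaftState::MakeRaftState reads as FLAGS_raft_server_id and FLAGS_raft_server_port. Their definitions live under src/flags/ and are not part of this diff; a sketch of what they presumably look like, with the defaults and help strings assumed:

#include "gflags/gflags.h"

// Assumed definitions; only the flag names are taken from the patch.
DEFINE_uint32(raft_server_id, 0, "Unique ID of the Raft server (coordinator).");
DEFINE_uint32(raft_server_port, 0, "Port on which the Raft server listens.");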
+ +import os +import shutil +import sys +import tempfile + +import interactive_mg_runner +import pytest +from common import connect, execute_and_fetch_all, safe_execute +from mg_utils import mg_sleep_and_assert + +interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +interactive_mg_runner.PROJECT_DIR = os.path.normpath( + os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") +) +interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) +interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) + +TEMP_DIR = tempfile.TemporaryDirectory().name + +MEMGRAPH_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + ], + "log_file": "instance_1.log", + "data_directory": f"{TEMP_DIR}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + ], + "log_file": "instance_2.log", + "data_directory": f"{TEMP_DIR}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + ], + "log_file": "instance_3.log", + "data_directory": f"{TEMP_DIR}/instance_3", + "setup_queries": [], + }, + "coordinator_1": { + "args": [ + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator1.log", + "setup_queries": [], + }, + "coordinator_2": { + "args": [ + "--bolt-port", + "7691", + "--log-level=TRACE", + "--raft-server-id=2", + "--raft-server-port=10112", + ], + "log_file": "coordinator2.log", + "setup_queries": [], + }, + "coordinator_3": { + "args": [ + "--bolt-port", + "7692", + "--log-level=TRACE", + "--raft-server-id=3", + "--raft-server-port=10113", + ], + "log_file": "coordinator3.log", + "setup_queries": [], + }, +} + + +# NOTE: Repeated execution because it can fail if Raft server is not up +def add_coordinator(cursor, query): + for _ in range(10): + try: + execute_and_fetch_all(cursor, query) + return True + except Exception: + pass + return False + + +def test_register_repl_instances_then_coordinators(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + expected_cluster_coord3 = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", 
"127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3) + + coordinator1_cursor = connect(host="localhost", port=7690).cursor() + + def check_coordinator1(): + return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) + + # TODO: (andi) This should be solved eventually + expected_cluster_not_shared = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ] + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1) + + coordinator2_cursor = connect(host="localhost", port=7691).cursor() + + def check_coordinator2(): + return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2) + + +def test_register_coordinator_then_repl_instances(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + expected_cluster_coord3 = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3) + + coordinator1_cursor = connect(host="localhost", port=7690).cursor() + + def check_coordinator1(): + return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) + + # TODO: (andi) This should be solved eventually + expected_cluster_not_shared = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ] + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1) + + coordinator2_cursor = connect(host="localhost", port=7691).cursor() + + def check_coordinator2(): + return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2) + + +def test_coordinators_communication_with_restarts(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + + assert 
add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + + expected_cluster_not_shared = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ] + + coordinator1_cursor = connect(host="localhost", port=7690).cursor() + + def check_coordinator1(): + return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1) + + coordinator2_cursor = connect(host="localhost", port=7691).cursor() + + def check_coordinator2(): + return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1") + coordinator1_cursor = connect(host="localhost", port=7690).cursor() + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2") + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2") + coordinator1_cursor = connect(host="localhost", port=7690).cursor() + coordinator2_cursor = connect(host="localhost", port=7691).cursor() + + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1) + mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/distributed_coordinators.py b/tests/e2e/high_availability_experimental/distributed_coordinators.py deleted file mode 100644 index 8a9ebf3c2..000000000 --- a/tests/e2e/high_availability_experimental/distributed_coordinators.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright 2022 Memgraph Ltd. -# -# Use of this software is governed by the Business Source License -# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -# License, and you may not use this file except in compliance with the Business Source License. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0, included in the file -# licenses/APL.txt. 
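The SHOW INSTANCES checks above compare five-column rows: name, Raft socket address, coordinator socket address, liveness, and role. The rows come from coordination::InstanceStatus (instance_status.hpp, unchanged by this patch); the struct below is only a shape inferred from the tuples, so the field names are guesses:

#include <string>

// Inferred from rows such as ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator")
// and ("instance_3", "", "127.0.0.1:10013", True, "main"); the real definition may differ.
struct InstanceStatus {
  std::string instance_name;
  std::string raft_socket_address;   // non-empty only for coordinators
  std::string coord_socket_address;  // non-empty only for data instances
  bool is_alive;                     // driven by the frequent health checks
  std::string cluster_role;          // "coordinator", "main", "replica", or "unknown"
};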
- -import os -import shutil -import sys -import tempfile - -import interactive_mg_runner -import pytest -from common import connect, execute_and_fetch_all, safe_execute -from mg_utils import mg_sleep_and_assert - -interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -interactive_mg_runner.PROJECT_DIR = os.path.normpath( - os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") -) -interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) -interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) - -TEMP_DIR = tempfile.TemporaryDirectory().name - -MEMGRAPH_INSTANCES_DESCRIPTION = { - "coordinator1": { - "args": [ - "--bolt-port", - "7687", - "--log-level=TRACE", - "--raft-server-id=1", - "--raft-server-port=10111", - ], - "log_file": "coordinator1.log", - "setup_queries": [], - }, - "coordinator2": { - "args": [ - "--bolt-port", - "7688", - "--log-level=TRACE", - "--raft-server-id=2", - "--raft-server-port=10112", - ], - "log_file": "coordinator2.log", - "setup_queries": [], - }, - "coordinator3": { - "args": [ - "--bolt-port", - "7689", - "--log-level=TRACE", - "--raft-server-id=3", - "--raft-server-port=10113", - ], - "log_file": "coordinator3.log", - "setup_queries": [ - "ADD COORDINATOR 1 ON '127.0.0.1:10111'", - "ADD COORDINATOR 2 ON '127.0.0.1:10112'", - ], - }, -} - - -def test_coordinators_communication(): - safe_execute(shutil.rmtree, TEMP_DIR) - interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - - coordinator3_cursor = connect(host="localhost", port=7689).cursor() - - def check_coordinator3(): - return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) - - expected_cluster = [ - ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), - ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), - ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), - ] - mg_sleep_and_assert(expected_cluster, check_coordinator3) - - coordinator1_cursor = connect(host="localhost", port=7687).cursor() - - def check_coordinator1(): - return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) - - mg_sleep_and_assert(expected_cluster, check_coordinator1) - - coordinator2_cursor = connect(host="localhost", port=7688).cursor() - - def check_coordinator2(): - return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) - - mg_sleep_and_assert(expected_cluster, check_coordinator2) - - -def test_coordinators_communication_with_restarts(): - safe_execute(shutil.rmtree, TEMP_DIR) - interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - - expected_cluster = [ - ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), - ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), - ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), - ] - - coordinator1_cursor = connect(host="localhost", port=7687).cursor() - - def check_coordinator1(): - return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) - - mg_sleep_and_assert(expected_cluster, check_coordinator1) - - coordinator2_cursor = connect(host="localhost", port=7688).cursor() - - def check_coordinator2(): - return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) - - mg_sleep_and_assert(expected_cluster, check_coordinator2) - - interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") - 
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") - coordinator1_cursor = connect(host="localhost", port=7687).cursor() - - mg_sleep_and_assert(expected_cluster, check_coordinator1) - - interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") - interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2") - - interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") - interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2") - coordinator1_cursor = connect(host="localhost", port=7687).cursor() - coordinator2_cursor = connect(host="localhost", port=7688).cursor() - - mg_sleep_and_assert(expected_cluster, check_coordinator1) - mg_sleep_and_assert(expected_cluster, check_coordinator2) - - -if __name__ == "__main__": - sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/distributed_coords.py b/tests/e2e/high_availability_experimental/distributed_coords.py new file mode 100644 index 000000000..052cb6dba --- /dev/null +++ b/tests/e2e/high_availability_experimental/distributed_coords.py @@ -0,0 +1,164 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import os +import shutil +import sys +import tempfile + +import interactive_mg_runner +import pytest +from common import connect, execute_and_fetch_all, safe_execute +from mg_utils import mg_sleep_and_assert + +interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +interactive_mg_runner.PROJECT_DIR = os.path.normpath( + os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") +) +interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) +interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) + +TEMP_DIR = tempfile.TemporaryDirectory().name + +MEMGRAPH_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + ], + "log_file": "instance_1.log", + "data_directory": f"{TEMP_DIR}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + ], + "log_file": "instance_2.log", + "data_directory": f"{TEMP_DIR}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + ], + "log_file": "instance_3.log", + "data_directory": f"{TEMP_DIR}/instance_3", + "setup_queries": [], + }, + "coordinator_1": { + "args": [ + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator1.log", + "setup_queries": [], + }, + "coordinator_2": { + "args": [ + "--bolt-port", + "7691", + "--log-level=TRACE", + "--raft-server-id=2", + "--raft-server-port=10112", + ], + "log_file": "coordinator2.log", + "setup_queries": [], + }, + 
"coordinator_3": { + "args": [ + "--bolt-port", + "7692", + "--log-level=TRACE", + "--raft-server-id=3", + "--raft-server-port=10113", + ], + "log_file": "coordinator3.log", + "setup_queries": [ + "ADD COORDINATOR 1 ON '127.0.0.1:10111'", + "ADD COORDINATOR 2 ON '127.0.0.1:10112'", + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'", + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'", + "SET INSTANCE instance_3 TO MAIN", + ], + }, +} + + +def test_distributed_automatic_failover(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + main_cursor = connect(host="localhost", port=7689).cursor() + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ] + actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + assert actual_data_on_main == expected_data_on_main + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + coord_cursor = connect(host="localhost", port=7692).cursor() + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) + + new_main_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_new_main = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"), + ] + mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_on_new_main_old_alive = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"), + ] + + mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/automatic_failover.py b/tests/e2e/high_availability_experimental/single_coordinator.py similarity index 99% rename from tests/e2e/high_availability_experimental/automatic_failover.py rename to tests/e2e/high_availability_experimental/single_coordinator.py index 1148075a1..d490a36ba 100644 --- a/tests/e2e/high_availability_experimental/automatic_failover.py +++ b/tests/e2e/high_availability_experimental/single_coordinator.py @@ -1,5 +1,4 @@ # Copyright 2022 Memgraph Ltd. -# # Use of this software is governed by the Business Source License # included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source # License, and you may not use this file except in compliance with the Business Source License. 
diff --git a/tests/e2e/high_availability_experimental/workloads.yaml b/tests/e2e/high_availability_experimental/workloads.yaml index 23fa3a5db..e624d35c0 100644 --- a/tests/e2e/high_availability_experimental/workloads.yaml +++ b/tests/e2e/high_availability_experimental/workloads.yaml @@ -28,18 +28,22 @@ workloads: args: ["high_availability_experimental/coordinator.py"] <<: *ha_cluster - - name: "Automatic failover" + - name: "Single coordinator" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/automatic_failover.py"] + args: ["high_availability_experimental/single_coordinator.py"] - name: "Disabled manual setting of replication cluster" binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/manual_setting_replicas.py"] - - name: "Distributed coordinators" + - name: "Coordinator cluster registration" binary: "tests/e2e/pytest_runner.sh" - args: ["high_availability_experimental/distributed_coordinators.py"] + args: ["high_availability_experimental/coord_cluster_registration.py"] - name: "Not replicate from old main" binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/not_replicate_from_old_main.py"] + + - name: "Distributed coordinators" + binary: "tests/e2e/pytest_runner.sh" + args: ["high_availability_experimental/distributed_coords.py"]
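Finally, the state machine's new EncodeRegisterReplicationInstance / DecodeRegisterReplicationInstance pair (declared in the coordinator_state_machine.hpp hunk earlier) is what RaftState::AppendRegisterReplicationInstance feeds into the replicated log. The bodies live in coordinator_state_machine.cpp, which this section does not show; a minimal sketch with NuRaft's buffer_serializer, assuming the instance name is stored as a single length-prefixed string:

#include <libnuraft/buffer.hxx>
#include <libnuraft/buffer_serializer.hxx>
#include <string>

using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;

auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
  // put_str writes a 4-byte length prefix followed by the raw bytes.
  ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + name.size());
  buffer_serializer bs(log);
  bs.put_str(name);
  return log;
}

auto DecodeRegisterReplicationInstance(buffer &data) -> std::string {
  buffer_serializer bs(data);
  return bs.get_str();
}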