diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index 6a0f44a09..d6ff2e397 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -29,14 +29,6 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state j.at("uuid").get_to(instance_state.instance_uuid); } -CoordinatorClusterState::CoordinatorClusterState(std::map> instances, - std::vector coordinators, - utils::UUID const ¤t_main_uuid, bool is_lock_opened) - : repl_instances_{std::move(instances)}, - coordinators_{std::move(coordinators)}, - current_main_uuid_(current_main_uuid), - is_lock_opened_(is_lock_opened) {} - void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) { j = nlohmann::json{{"config", instance_state.config}}; } @@ -45,6 +37,14 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state j.at("config").get_to(instance_state.config); } +CoordinatorClusterState::CoordinatorClusterState(std::map> instances, + std::vector coordinators, + utils::UUID const ¤t_main_uuid, bool is_lock_opened) + : repl_instances_{std::move(instances)}, + coordinators_{std::move(coordinators)}, + current_main_uuid_(current_main_uuid), + is_lock_opened_(is_lock_opened) {} + CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) { auto c2c_config = CoordinatorToCoordinatorConfig{ .coordinator_id = config.coordinator_id, @@ -53,10 +53,6 @@ CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig c coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)}); } -CoordinatorClusterState::CoordinatorClusterState(std::map> instances, - utils::UUID const ¤t_main_uuid, bool is_lock_opened) - : repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {} - CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other) : repl_instances_{other.repl_instances_}, current_main_uuid_(other.current_main_uuid_), @@ -178,7 +174,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog const &log_entry, RaftLogAction case RaftLogAction::ADD_COORDINATOR_INSTANCE: { auto const &config = std::get(log_entry); coordinators_.emplace_back(CoordinatorInstanceState{config}); - spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id); + spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_id); break; } case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: { diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index f6d2b46b5..7a65fd435 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -28,9 +28,10 @@ namespace memgraph::coordination { using nuraft::ptr; -CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &config) +CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config) : thread_pool_{1}, raft_state_(RaftState::MakeRaftState( + config, [this]() { spdlog::info("Leader changed, starting all replication instances!"); auto const instances = raft_state_.GetReplicationInstances(); @@ -40,8 +41,9 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c std::ranges::for_each(replicas, [this](auto &replica) { spdlog::info("Started pinging replication instance {}", replica.config.instance_name); - repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_, - &CoordinatorInstance::ReplicaSuccessCallback, + auto client = + std::make_unique(this, replica.config, client_succ_cb_, client_fail_cb_); + repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback, &CoordinatorInstance::ReplicaFailCallback); }); @@ -50,8 +52,9 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c std::ranges::for_each(main, [this](auto &main_instance) { spdlog::info("Started pinging main instance {}", main_instance.config.instance_name); - repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_, - &CoordinatorInstance::MainSuccessCallback, + auto client = std::make_unique(this, main_instance.config, client_succ_cb_, + client_fail_cb_); + repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::MainSuccessCallback, &CoordinatorInstance::MainFailCallback); }); @@ -88,7 +91,6 @@ CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &c }; } - auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstanceConnector & { auto repl_instance = diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index a14b98d09..887e93648 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -41,7 +41,7 @@ class CoordinatorInstance { CoordinatorInstance(CoordinatorInstance &&) noexcept = delete; CoordinatorInstance &operator=(CoordinatorInstance &&) noexcept = delete; - ~CoordinatorInstance(); + ~CoordinatorInstance() = default; [[nodiscard]] auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config) -> RegisterInstanceCoordinatorStatus; diff --git a/src/coordination/replication_instance_connector.cpp b/src/coordination/replication_instance_connector.cpp index 00decf9f3..418fba119 100644 --- a/src/coordination/replication_instance_connector.cpp +++ b/src/coordination/replication_instance_connector.cpp @@ -91,9 +91,6 @@ auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallb auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; } -auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } -auto ReplicationInstance::GetMainUUID() const -> std::optional const & { return main_uuid_; } - auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { if (!IsReadyForUUIDPing()) { return true; @@ -113,11 +110,7 @@ auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID c } auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool { - if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) { - return false; - } - SetNewMainUUID(new_main_uuid); - return true; + return replication_coordination_glue::SendSwapMainUUIDRpc(client_->RpcClient(), new_main_uuid); } auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {