Improve coordinator handler
This commit is contained in:
parent
8884a0ea78
commit
afe7d47a5c
@ -11,6 +11,9 @@ target_sources(mg-coordination
|
||||
include/coordination/coordinator_exceptions.hpp
|
||||
include/coordination/coordinator_slk.hpp
|
||||
include/coordination/constants.hpp
|
||||
include/coordination/failover_status.hpp
|
||||
include/coordination/coordinator_client_info.hpp
|
||||
include/coordination/coordinator_cluster_config.hpp
|
||||
|
||||
PRIVATE
|
||||
coordinator_client.cpp
|
||||
|
@ -46,30 +46,19 @@ CoordinatorClient::~CoordinatorClient() {
|
||||
void CoordinatorClient::StartFrequentCheck() {
|
||||
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
|
||||
"Health check frequency must be greater than 0");
|
||||
replica_checker_.Run(
|
||||
"Coord checker", config_.health_check_frequency_sec,
|
||||
[last_response_time = &last_response_time_, rpc_client = &rpc_client_] {
|
||||
try {
|
||||
{
|
||||
auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
||||
stream.AwaitResponse();
|
||||
last_response_time->store(std::chrono::system_clock::now(), std::memory_order_acq_rel);
|
||||
}
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
// Nothing to do...wait for a reconnect
|
||||
}
|
||||
});
|
||||
replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [rpc_client = &rpc_client_] {
|
||||
try {
|
||||
auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
||||
stream.AwaitResponse();
|
||||
// last_response_time->store(std::chrono::system_clock::now(), std::memory_order_acq_rel);
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
// Nothing to do...wait for a reconnect
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Stops the periodic check started by StartFrequentCheck() by delegating to replica_checker_.Stop().
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
||||
|
||||
bool CoordinatorClient::DoHealthCheck() const {
|
||||
auto current_time = std::chrono::system_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::seconds>(current_time -
|
||||
last_response_time_.load(std::memory_order_acquire));
|
||||
return duration.count() <= alive_response_time_difference_sec_;
|
||||
}
|
||||
|
||||
// Returns a non-owning view of the instance name stored in this client's config_; valid only while config_ is alive.
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
||||
// Returns a reference to the endpoint reported by the underlying rpc_client_.
auto CoordinatorClient::Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); }
|
||||
// Returns a read-only reference to the configuration this client was constructed with.
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
||||
@ -86,12 +75,6 @@ auto CoordinatorClient::ReplicationClientInfo() -> std::optional<CoordinatorClie
|
||||
return config_.replication_client_info;
|
||||
}
|
||||
|
||||
void CoordinatorClient::UpdateTimeCheck(const std::chrono::system_clock::time_point &last_checked_time) {
|
||||
last_response_time_.store(last_checked_time, std::memory_order_acq_rel);
|
||||
}
|
||||
|
||||
auto CoordinatorClient::GetLastTimeResponse() -> std::chrono::system_clock::time_point { return last_response_time_; }
|
||||
|
||||
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
|
||||
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
|
||||
try {
|
||||
|
@ -10,11 +10,11 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "coordination/coordinator_state.hpp"
|
||||
#include <span>
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/coordinator_cluster_config.hpp"
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "flags/replication.hpp"
|
||||
@ -53,8 +53,8 @@ CoordinatorState::CoordinatorState() {
|
||||
}
|
||||
}
|
||||
|
||||
auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *> {
|
||||
/// TODO: Don't return client, start here the client
|
||||
auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
const auto name_endpoint_status =
|
||||
std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
|
||||
return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR;
|
||||
@ -72,12 +72,32 @@ auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config)
|
||||
return name_endpoint_status;
|
||||
}
|
||||
|
||||
auto callback = [&]() -> void {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||
|
||||
auto replica_client_info = std::ranges::find_if(
|
||||
registered_replicas_info,
|
||||
[&config](const CoordinatorClientInfo &replica) { return replica.instance_name_ == config.instance_name; });
|
||||
|
||||
MG_ASSERT(replica_client_info == registered_replicas_info.end(), "Replica {} not found in registered replicas info",
|
||||
config.instance_name);
|
||||
|
||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() - replica_client_info->last_response_time_.load(std::memory_order_acquire));
|
||||
|
||||
replica_client_info->is_alive_ =
|
||||
sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||
};
|
||||
|
||||
// Maybe no need to return client if you can start replica client here
|
||||
return &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config);
|
||||
auto *replica_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config);
|
||||
replica_client->StartFrequentCheck();
|
||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *> {
|
||||
auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
const auto endpoint_status = std::visit(
|
||||
memgraph::utils::Overloaded{
|
||||
[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
|
||||
@ -92,7 +112,39 @@ auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config)
|
||||
|
||||
auto &registered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
registered_main = std::make_unique<CoordinatorClient>(config);
|
||||
return registered_main.get();
|
||||
|
||||
auto cb = [&]() -> void {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||
|
||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
MG_ASSERT(registered_main_info.instance_name_ == config.instance_name,
|
||||
"Registered main instance name {} does not match config instance name {}",
|
||||
registered_main_info.instance_name_, config.instance_name);
|
||||
|
||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() - registered_main_info.last_response_time_.load(std::memory_order_acquire));
|
||||
registered_main_info.is_alive_ =
|
||||
sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||
|
||||
if (!registered_main_info.is_alive_) {
|
||||
spdlog::warn("Main is not alive, starting failover");
|
||||
switch (auto failover_status = DoFailover(); failover_status) {
|
||||
using enum DoFailoverStatus;
|
||||
case ALL_REPLICAS_DOWN:
|
||||
spdlog::warn("Failover aborted since all replicas are down!");
|
||||
case MAIN_ALIVE:
|
||||
spdlog::warn("Failover aborted since main is alive!");
|
||||
case CLUSTER_UNINITIALIZED:
|
||||
spdlog::warn("Failover aborted since cluster is uninitialized!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
registered_main->StartFrequentCheck();
|
||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorEntityInfo> {
|
||||
@ -123,9 +175,9 @@ auto CoordinatorState::PingReplicas() const -> std::unordered_map<std::string_vi
|
||||
std::unordered_map<std::string_view, bool> result;
|
||||
const auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||
result.reserve(registered_replicas.size());
|
||||
for (const CoordinatorClient &replica_client : registered_replicas) {
|
||||
result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck());
|
||||
}
|
||||
// for (const CoordinatorClient &replica_client : registered_replicas) {
|
||||
// result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck());
|
||||
// }
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -135,12 +187,12 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth
|
||||
"Can't call show main on data_, as variant holds wrong alternative");
|
||||
const auto &registered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
if (registered_main) {
|
||||
return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()};
|
||||
// return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()};
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||
// 1. MAIN is already down, stop sending frequent checks
|
||||
// 2. find new replica (coordinator)
|
||||
// 3. make copy replica's client as potential new main client (coordinator)
|
||||
@ -149,7 +201,7 @@ auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||
// 6. remove replica which was promoted to main from all replicas -> this will shut down RPC frequent check client
|
||||
// (coordinator)
|
||||
// 7. for new main start frequent checks (coordinator)
|
||||
|
||||
/*
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
|
||||
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||
|
||||
@ -208,7 +260,7 @@ auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||
|
||||
// 7.
|
||||
current_main->StartFrequentCheck();
|
||||
|
||||
*/
|
||||
return DoFailoverStatus::SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,9 @@ namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorClient {
|
||||
public:
|
||||
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
|
||||
|
||||
explicit CoordinatorClient(const CoordinatorClientConfig &config);
|
||||
|
||||
~CoordinatorClient();
|
||||
@ -37,17 +40,13 @@ class CoordinatorClient {
|
||||
void StartFrequentCheck();
|
||||
void StopFrequentCheck();
|
||||
|
||||
auto DoHealthCheck() const -> bool;
|
||||
auto SendPromoteReplicaToMainRpc(
|
||||
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool;
|
||||
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
|
||||
|
||||
auto InstanceName() const -> std::string_view;
|
||||
auto Endpoint() const -> io::network::Endpoint const &;
|
||||
auto Config() const -> CoordinatorClientConfig const &;
|
||||
auto ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const &;
|
||||
auto ReplicationClientInfo() -> std::optional<CoordinatorClientConfig::ReplicationClientInfo> &;
|
||||
void UpdateTimeCheck(const std::chrono::system_clock::time_point &last_checked_time);
|
||||
auto GetLastTimeResponse() -> std::chrono::system_clock::time_point;
|
||||
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
||||
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
||||
|
||||
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
||||
return first.config_ == second.config_;
|
||||
@ -59,10 +58,8 @@ class CoordinatorClient {
|
||||
|
||||
communication::ClientContext rpc_context_;
|
||||
mutable rpc::Client rpc_client_;
|
||||
CoordinatorClientConfig config_;
|
||||
|
||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||
static constexpr int alive_response_time_difference_sec_{5};
|
||||
CoordinatorClientConfig config_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -0,0 +1,30 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
// TODO: better connect this
|
||||
// Liveness bookkeeping for one instance (main or replica) tracked by the coordinator.
// NOTE(review): the std::atomic member makes this struct non-copyable and non-movable, yet it is held in a
// std::vector inside CoordinatorState::CoordinatorData — inserting elements will not compile as-is; confirm.
// NOTE(review): this header uses std::string_view but only includes <atomic> and <chrono>.
struct CoordinatorClientInfo {
|
||||
// Time point of the last response from the instance; read atomically by the health-check callbacks.
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||
// Whether the instance is considered alive; defaults to false until a health-check callback updates it.
// NOTE(review): plain bool — any synchronization with readers on other threads is not visible here.
bool is_alive_{false};
|
||||
// Non-owning view of the instance name; the referenced string must outlive this struct.
// NOTE(review): what storage this view points at is not shown here — verify against the code that fills it.
std::string_view instance_name_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
||||
#endif
|
@ -0,0 +1,22 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
namespace memgraph::coordination {
|
||||
|
||||
// Cluster-wide constants used by the coordinator.
struct CoordinatorClusterConfig {
|
||||
// Maximum age, in seconds, of an instance's last response for it to still be considered alive.
static constexpr int alive_response_time_difference_sec_{5};
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
@ -14,8 +14,11 @@
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/coordinator_client_info.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "coordination/coordinator_server.hpp"
|
||||
#include "coordination/failover_status.hpp"
|
||||
#include "coordination/register_main_replica_coordinator_status.hpp"
|
||||
#include "rpc/server.hpp"
|
||||
#include "utils/result.hpp"
|
||||
#include "utils/rw_spin_lock.hpp"
|
||||
@ -26,16 +29,6 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
enum class RegisterMainReplicaCoordinatorStatus : uint8_t {
|
||||
NAME_EXISTS,
|
||||
END_POINT_EXISTS,
|
||||
COULD_NOT_BE_PERSISTED,
|
||||
NOT_COORDINATOR,
|
||||
SUCCESS
|
||||
};
|
||||
|
||||
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED };
|
||||
|
||||
class CoordinatorState {
|
||||
public:
|
||||
CoordinatorState();
|
||||
@ -44,21 +37,12 @@ class CoordinatorState {
|
||||
CoordinatorState(const CoordinatorState &) = delete;
|
||||
CoordinatorState &operator=(const CoordinatorState &) = delete;
|
||||
|
||||
CoordinatorState(CoordinatorState &&other) noexcept : data_(std::move(other.data_)) {}
|
||||
CoordinatorState(CoordinatorState &&) noexcept = delete;
|
||||
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
|
||||
|
||||
CoordinatorState &operator=(CoordinatorState &&other) noexcept {
|
||||
if (this == &other) {
|
||||
return *this;
|
||||
}
|
||||
data_ = std::move(other.data_);
|
||||
return *this;
|
||||
}
|
||||
[[nodiscard]] auto RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto RegisterReplica(const CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *>;
|
||||
|
||||
auto RegisterMain(const CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *>;
|
||||
[[nodiscard]] auto RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto ShowReplicas() const -> std::vector<CoordinatorEntityInfo>;
|
||||
|
||||
@ -71,18 +55,17 @@ class CoordinatorState {
|
||||
// The client code must check that the server exists before calling this method.
|
||||
auto GetCoordinatorServer() const -> CoordinatorServer &;
|
||||
|
||||
auto DoFailover() -> DoFailoverStatus;
|
||||
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
|
||||
|
||||
private:
|
||||
// TODO: Data is not thread safe
|
||||
|
||||
// Coordinator stores registered replicas and main
|
||||
struct CoordinatorData {
|
||||
std::list<CoordinatorClient> registered_replicas_;
|
||||
std::vector<CoordinatorClientInfo> registered_replicas_info_;
|
||||
std::unique_ptr<CoordinatorClient> registered_main_;
|
||||
CoordinatorClientInfo registered_main_info_;
|
||||
};
|
||||
|
||||
// Data which each main and replica stores
|
||||
struct CoordinatorMainReplicaData {
|
||||
std::unique_ptr<CoordinatorServer> coordinator_server_;
|
||||
};
|
||||
|
21
src/coordination/include/coordination/failover_status.hpp
Normal file
21
src/coordination/include/coordination/failover_status.hpp
Normal file
@ -0,0 +1,21 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
// Outcome of a failover attempt: SUCCESS, or the reason it was aborted (all replicas down, main still alive,
// or the cluster not yet initialized).
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED };
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
@ -0,0 +1,29 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
// Result of registering a main or replica instance on the coordinator.
enum class RegisterMainReplicaCoordinatorStatus : uint8_t {
|
||||
// An instance with the same name is already registered.
NAME_EXISTS,
|
||||
// An instance with the same endpoint is already registered.
END_POINT_EXISTS,
|
||||
// The registration could not be persisted.
COULD_NOT_BE_PERSISTED,
|
||||
// The instance handling the request is not a coordinator.
NOT_COORDINATOR,
|
||||
// Registration succeeded.
SUCCESS
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
@ -19,49 +19,14 @@ namespace memgraph::dbms {
|
||||
|
||||
CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {}
|
||||
|
||||
auto CoordinatorHandler::RegisterReplicaOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus> {
|
||||
auto instance_client = dbms_handler_.CoordinatorState().RegisterReplica(config);
|
||||
using repl_status = memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
using dbms_status = memgraph::dbms::RegisterMainReplicaCoordinatorStatus;
|
||||
if (instance_client.HasError()) {
|
||||
switch (instance_client.GetError()) {
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR:
|
||||
MG_ASSERT(false, "Only coordinator instance can register main and replica!");
|
||||
return {};
|
||||
case repl_status::NAME_EXISTS:
|
||||
return dbms_status::NAME_EXISTS;
|
||||
case repl_status::END_POINT_EXISTS:
|
||||
return dbms_status::END_POINT_EXISTS;
|
||||
case repl_status::COULD_NOT_BE_PERSISTED:
|
||||
return dbms_status::COULD_NOT_BE_PERSISTED;
|
||||
case repl_status::SUCCESS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
instance_client.GetValue()->StartFrequentCheck();
|
||||
return {};
|
||||
auto CoordinatorHandler::RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus {
|
||||
return dbms_handler_.CoordinatorState().RegisterReplica(config);
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::RegisterMainOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus> {
|
||||
auto instance_client = dbms_handler_.CoordinatorState().RegisterMain(config);
|
||||
if (instance_client.HasError()) switch (instance_client.GetError()) {
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR:
|
||||
MG_ASSERT(false, "Only coordinator instance can register main and replica!");
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NAME_EXISTS:
|
||||
return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::NAME_EXISTS;
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::END_POINT_EXISTS:
|
||||
return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::END_POINT_EXISTS;
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::COULD_NOT_BE_PERSISTED:
|
||||
return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::COULD_NOT_BE_PERSISTED;
|
||||
case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::SUCCESS:
|
||||
break;
|
||||
}
|
||||
|
||||
instance_client.GetValue()->StartFrequentCheck();
|
||||
return {};
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus {
|
||||
return dbms_handler_.CoordinatorState().RegisterMain(config);
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo> {
|
||||
@ -80,18 +45,8 @@ auto CoordinatorHandler::PingMainOnCoordinator() const -> std::optional<coordina
|
||||
return dbms_handler_.CoordinatorState().PingMain();
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::DoFailover() const -> DoFailoverStatus {
|
||||
auto status = dbms_handler_.CoordinatorState().DoFailover();
|
||||
switch (status) {
|
||||
case memgraph::coordination::DoFailoverStatus::ALL_REPLICAS_DOWN:
|
||||
return memgraph::dbms::DoFailoverStatus::ALL_REPLICAS_DOWN;
|
||||
case memgraph::coordination::DoFailoverStatus::SUCCESS:
|
||||
return memgraph::dbms::DoFailoverStatus::SUCCESS;
|
||||
case memgraph::coordination::DoFailoverStatus::MAIN_ALIVE:
|
||||
return memgraph::dbms::DoFailoverStatus::MAIN_ALIVE;
|
||||
case memgraph::coordination::DoFailoverStatus::CLUSTER_UNINITIALIZED:
|
||||
return memgraph::dbms::DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
||||
}
|
||||
auto CoordinatorHandler::DoFailover() const -> coordination::DoFailoverStatus {
|
||||
return dbms_handler_.CoordinatorState().DoFailover();
|
||||
}
|
||||
|
||||
} // namespace memgraph::dbms
|
||||
|
@ -15,49 +15,38 @@
|
||||
|
||||
#include "utils/result.hpp"
|
||||
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "coordination/failover_status.hpp"
|
||||
#include "coordination/register_main_replica_coordinator_status.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
struct CoordinatorEntityInfo;
|
||||
struct CoordinatorEntityHealthInfo;
|
||||
struct CoordinatorClientConfig;
|
||||
} // namespace memgraph::coordination
|
||||
|
||||
namespace memgraph::dbms {
|
||||
|
||||
enum class RegisterMainReplicaCoordinatorStatus : uint8_t {
|
||||
NAME_EXISTS,
|
||||
END_POINT_EXISTS,
|
||||
COULD_NOT_BE_PERSISTED,
|
||||
NOT_COORDINATOR,
|
||||
SUCCESS
|
||||
};
|
||||
|
||||
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED };
|
||||
|
||||
class DbmsHandler;
|
||||
|
||||
class CoordinatorHandler {
|
||||
public:
|
||||
explicit CoordinatorHandler(DbmsHandler &dbms_handler);
|
||||
|
||||
auto RegisterReplicaOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus>;
|
||||
auto RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto RegisterMainOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config)
|
||||
-> utils::BasicResult<RegisterMainReplicaCoordinatorStatus>;
|
||||
auto RegisterMainOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto ShowReplicasOnCoordinator() const -> std::vector<memgraph::coordination::CoordinatorEntityInfo>;
|
||||
auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo>;
|
||||
|
||||
auto ShowMainOnCoordinator() const -> std::optional<memgraph::coordination::CoordinatorEntityInfo>;
|
||||
auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo>;
|
||||
|
||||
auto PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool>;
|
||||
|
||||
auto PingMainOnCoordinator() const -> std::optional<memgraph::coordination::CoordinatorEntityHealthInfo>;
|
||||
auto PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo>;
|
||||
|
||||
auto DoFailover() const -> DoFailoverStatus;
|
||||
auto DoFailover() const -> coordination::DoFailoverStatus;
|
||||
|
||||
private:
|
||||
DbmsHandler &dbms_handler_;
|
||||
|
@ -495,8 +495,20 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
.replication_client_info = repl_config,
|
||||
.ssl = std::nullopt};
|
||||
|
||||
if (const auto ret = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config); ret.HasError()) {
|
||||
throw QueryRuntimeException("Couldn't register replica on coordinator!");
|
||||
auto status = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config);
|
||||
switch (status) {
|
||||
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
case NAME_EXISTS:
|
||||
throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!");
|
||||
case END_POINT_EXISTS:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register replica instance since instance with such endpoint already exists!");
|
||||
case COULD_NOT_BE_PERSISTED:
|
||||
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
|
||||
case NOT_COORDINATOR:
|
||||
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -515,8 +527,20 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
.health_check_frequency_sec = instance_check_frequency,
|
||||
.ssl = std::nullopt};
|
||||
|
||||
if (const auto ret = coordinator_handler_.RegisterMainOnCoordinator(config); ret.HasError()) {
|
||||
throw QueryRuntimeException("Couldn't register main on coordinator!");
|
||||
auto status = coordinator_handler_.RegisterMainOnCoordinator(config);
|
||||
switch (status) {
|
||||
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
case NAME_EXISTS:
|
||||
throw QueryRuntimeException("Couldn't register main instance since instance with such name already exists!");
|
||||
case END_POINT_EXISTS:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register main instance since instance with such endpoint already exists!");
|
||||
case COULD_NOT_BE_PERSISTED:
|
||||
throw QueryRuntimeException("Couldn't register main instance since it couldn't be persisted!");
|
||||
case NOT_COORDINATOR:
|
||||
throw QueryRuntimeException("Couldn't register main instance since this instance is not a coordinator!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -528,7 +552,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
|
||||
auto status = coordinator_handler_.DoFailover();
|
||||
switch (status) {
|
||||
using enum memgraph::dbms::DoFailoverStatus;
|
||||
using enum memgraph::coordination::DoFailoverStatus;
|
||||
case ALL_REPLICAS_DOWN:
|
||||
throw QueryRuntimeException("Failover aborted since all replicas are down!");
|
||||
case MAIN_ALIVE:
|
||||
|
Loading…
Reference in New Issue
Block a user