AF thread issue
This commit is contained in:
parent
986ea37ead
commit
9d457eafa8
@ -27,15 +27,15 @@ auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &
|
|||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
CoordinatorClient::CoordinatorClient(CoordinatorClientConfig config,
|
CoordinatorClient::CoordinatorClient(CoordinatorState *coord_state, CoordinatorClientConfig config,
|
||||||
std::function<void(std::string_view)> freq_check_cb)
|
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
|
||||||
: rpc_context_{CreateClientContext(config)},
|
: rpc_context_{CreateClientContext(config)},
|
||||||
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
|
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
|
||||||
&rpc_context_},
|
&rpc_context_},
|
||||||
config_{std::move(config)},
|
config_{std::move(config)},
|
||||||
freq_check_cb_{std::move(freq_check_cb)} {
|
coord_state_{coord_state},
|
||||||
StartFrequentCheck();
|
succ_cb_{std::move(succ_cb)},
|
||||||
}
|
fail_cb_{std::move(fail_cb)} {}
|
||||||
|
|
||||||
CoordinatorClient::~CoordinatorClient() {
|
CoordinatorClient::~CoordinatorClient() {
|
||||||
auto exit_job = utils::OnScopeExit([&] {
|
auto exit_job = utils::OnScopeExit([&] {
|
||||||
@ -51,26 +51,31 @@ void CoordinatorClient::StartFrequentCheck() {
|
|||||||
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
|
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
|
||||||
"Health check frequency must be greater than 0");
|
"Health check frequency must be greater than 0");
|
||||||
|
|
||||||
replica_checker_.Run(
|
std::string_view instance_name = config_.instance_name;
|
||||||
"Coord checker", config_.health_check_frequency_sec,
|
replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [this, instance_name] {
|
||||||
[instance_name = config_.instance_name, rpc_client = &rpc_client_, freq_check_cb = freq_check_cb_] {
|
try {
|
||||||
try {
|
spdlog::trace("Sending frequent heartbeat to machine {} on {}:{}", instance_name, rpc_client_.Endpoint().address,
|
||||||
auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
rpc_client_.Endpoint().port);
|
||||||
stream.AwaitResponse();
|
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
||||||
freq_check_cb(instance_name);
|
if (stream.AwaitResponse().success) {
|
||||||
} catch (const rpc::RpcFailedException &) {
|
succ_cb_(coord_state_, instance_name);
|
||||||
// Nothing to do...wait for a reconnect
|
} else {
|
||||||
}
|
fail_cb_(coord_state_, instance_name);
|
||||||
});
|
}
|
||||||
|
} catch (const rpc::RpcFailedException &) {
|
||||||
|
fail_cb_(coord_state_, instance_name);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
||||||
|
|
||||||
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
||||||
auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); }
|
auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); }
|
||||||
// TODO: remove this method and implement copy constructor
|
// TODO: remove these method and implement copy constructor
|
||||||
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
||||||
auto CoordinatorClient::Callback() const -> std::function<void(std::string_view)> const & { return freq_check_cb_; }
|
auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; }
|
||||||
|
auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; }
|
||||||
|
|
||||||
////// AF design choice
|
////// AF design choice
|
||||||
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & {
|
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & {
|
||||||
|
@ -42,11 +42,14 @@ CoordinatorState::CoordinatorState() {
|
|||||||
MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port),
|
MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port),
|
||||||
"Instance cannot be a coordinator and have registered coordinator server.");
|
"Instance cannot be a coordinator and have registered coordinator server.");
|
||||||
|
|
||||||
|
spdlog::info("Executing coordinator constructor");
|
||||||
if (FLAGS_coordinator_server_port) {
|
if (FLAGS_coordinator_server_port) {
|
||||||
|
spdlog::info("Coordinator server port set");
|
||||||
auto const config = CoordinatorServerConfig{
|
auto const config = CoordinatorServerConfig{
|
||||||
.ip_address = kDefaultReplicationServerIp,
|
.ip_address = kDefaultReplicationServerIp,
|
||||||
.port = static_cast<uint16_t>(FLAGS_coordinator_server_port),
|
.port = static_cast<uint16_t>(FLAGS_coordinator_server_port),
|
||||||
};
|
};
|
||||||
|
spdlog::info("Executing coordinator constructor main replica");
|
||||||
|
|
||||||
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
|
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
|
||||||
}
|
}
|
||||||
@ -73,7 +76,13 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
|
|||||||
return name_endpoint_status;
|
return name_endpoint_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto freq_check_cb = [&](std::string_view instance_name) -> void {
|
// TODO: Refactor so that we just know about instances, nothing more. Design struct Instance which will know about
|
||||||
|
// role... From CoordinatorState perspective there should just be instances, no specific handling of main vs. replica.
|
||||||
|
// Here after failover it can happen that replica
|
||||||
|
// has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in
|
||||||
|
// registered replicas info",
|
||||||
|
// instance_name);
|
||||||
|
auto find_client_info = [&](std::string_view instance_name) -> CoordinatorClientInfo & {
|
||||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||||
@ -82,25 +91,33 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
|
|||||||
registered_replicas_info,
|
registered_replicas_info,
|
||||||
[instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; });
|
[instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; });
|
||||||
|
|
||||||
// TODO: Refactor so that we just know about instances, nothing more. Here after failover it can happen that replica
|
if (replica_client_info != registered_replicas_info.end()) {
|
||||||
// has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in
|
return *replica_client_info;
|
||||||
// registered replicas info",
|
|
||||||
// instance_name);
|
|
||||||
|
|
||||||
if (replica_client_info == registered_replicas_info.end()) {
|
|
||||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
|
||||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
|
|
||||||
registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
|
||||||
} else {
|
|
||||||
replica_client_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||||
|
MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
|
||||||
|
return *registered_main_info;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto *coord_client =
|
auto repl_succ_cb = [&](std::string_view instance_name) -> void {
|
||||||
&std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(std::move(config), std::move(freq_check_cb));
|
auto &client_info = find_client_info(instance_name);
|
||||||
|
client_info.UpdateLastResponseTime();
|
||||||
|
};
|
||||||
|
|
||||||
|
auto repl_fail_cb = [&](std::string_view instance_name) -> void {
|
||||||
|
auto &client_info = find_client_info(instance_name);
|
||||||
|
client_info.UpdateInstanceStatus();
|
||||||
|
};
|
||||||
|
|
||||||
|
// CoordinatorClient coord_client{this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)};
|
||||||
|
|
||||||
|
auto *coord_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(
|
||||||
|
this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
|
||||||
|
|
||||||
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
|
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
|
||||||
coord_client->Endpoint());
|
coord_client->Endpoint());
|
||||||
|
coord_client->StartFrequentCheck();
|
||||||
|
|
||||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||||
}
|
}
|
||||||
@ -121,42 +138,54 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM
|
|||||||
return endpoint_status;
|
return endpoint_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto freq_check_cb = [&](std::string_view instance_name) -> void {
|
// TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks?
|
||||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
// We should probably at that point search for main also in instances as for replicas...
|
||||||
|
auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
|
||||||
|
MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
|
||||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||||
MG_ASSERT(std::get<CoordinatorData>(data_).registered_main_info_.has_value(),
|
MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
|
||||||
"Main info is not set, but callback is called");
|
"Main info is not set, but callback is called");
|
||||||
|
|
||||||
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
|
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
|
||||||
// this point....
|
// this point....
|
||||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
auto ®istered_main_info = std::get<CoordinatorData>(coord_state->data_).registered_main_info_;
|
||||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name,
|
MG_ASSERT(registered_main_info->instance_name_ == instance_name,
|
||||||
"Callback called for wrong instance name: {}, expected: {}", instance_name,
|
"Callback called for wrong instance name: {}, expected: {}", instance_name,
|
||||||
registered_main_info->instance_name_);
|
registered_main_info->instance_name_);
|
||||||
|
return *registered_main_info;
|
||||||
|
};
|
||||||
|
|
||||||
registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
auto succ_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
|
spdlog::trace("Executing success callback for main: {}", std::string(instance_name));
|
||||||
|
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
||||||
|
registered_main_info.UpdateLastResponseTime();
|
||||||
|
};
|
||||||
|
|
||||||
// if (!registered_main_info->is_alive_) {
|
auto fail_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
// spdlog::warn("Main is not alive, starting failover");
|
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
||||||
// switch (auto failover_status = DoFailover(); failover_status) {
|
if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
|
||||||
// using enum DoFailoverStatus;
|
// spdlog::warn("Main is not alive, starting failover");
|
||||||
// case ALL_REPLICAS_DOWN:
|
// switch (auto failover_status = DoFailover(); failover_status) {
|
||||||
// spdlog::warn("Failover aborted since all replicas are down!");
|
// using enum DoFailoverStatus;
|
||||||
// case MAIN_ALIVE:
|
// case ALL_REPLICAS_DOWN:
|
||||||
// spdlog::warn("Failover aborted since main is alive!");
|
// spdlog::warn("Failover aborted since all replicas are down!");
|
||||||
// case CLUSTER_UNINITIALIZED:
|
// case MAIN_ALIVE:
|
||||||
// spdlog::warn("Failover aborted since cluster is uninitialized!");
|
// spdlog::warn("Failover aborted since main is alive!");
|
||||||
// case SUCCESS:
|
// case CLUSTER_UNINITIALIZED:
|
||||||
// break;
|
// spdlog::warn("Failover aborted since cluster is uninitialized!");
|
||||||
// }
|
// case SUCCESS:
|
||||||
// }
|
// break;
|
||||||
|
// }
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||||
registered_main = std::make_unique<CoordinatorClient>(std::move(config), std::move(freq_check_cb));
|
registered_main =
|
||||||
|
std::make_unique<CoordinatorClient>(this, std::move(config), std::move(succ_cb), std::move(fail_cb));
|
||||||
|
|
||||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
|
||||||
registered_main_info.emplace(registered_main->InstanceName(), registered_main->Endpoint());
|
registered_main->Endpoint());
|
||||||
|
registered_main->StartFrequentCheck();
|
||||||
|
|
||||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||||
}
|
}
|
||||||
@ -168,19 +197,13 @@ auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorInstanceSt
|
|||||||
std::vector<CoordinatorInstanceStatus> instances_status;
|
std::vector<CoordinatorInstanceStatus> instances_status;
|
||||||
instances_status.reserve(registered_replicas_info.size());
|
instances_status.reserve(registered_replicas_info.size());
|
||||||
|
|
||||||
std::ranges::transform(
|
std::ranges::transform(registered_replicas_info, std::back_inserter(instances_status),
|
||||||
registered_replicas_info, std::back_inserter(instances_status),
|
[](const CoordinatorClientInfo &coord_client_info) {
|
||||||
[](const CoordinatorClientInfo &coord_client_info) {
|
return CoordinatorInstanceStatus{
|
||||||
const auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
.instance_name = coord_client_info.instance_name_,
|
||||||
std::chrono::system_clock::now() -
|
.socket_address = coord_client_info.endpoint->SocketAddress(),
|
||||||
coord_client_info.last_response_time_.load(std::memory_order_acquire))
|
.is_alive = coord_client_info.is_alive_};
|
||||||
.count();
|
});
|
||||||
|
|
||||||
return CoordinatorInstanceStatus{
|
|
||||||
.instance_name = coord_client_info.instance_name_,
|
|
||||||
.socket_address = coord_client_info.endpoint->SocketAddress(),
|
|
||||||
.is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
|
|
||||||
});
|
|
||||||
return instances_status;
|
return instances_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,15 +214,10 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
if (!main.has_value()) {
|
if (!main.has_value()) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
const auto sec_since_last_response =
|
|
||||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
|
||||||
main->last_response_time_.load(std::memory_order_acquire))
|
|
||||||
.count();
|
|
||||||
|
|
||||||
return CoordinatorInstanceStatus{
|
return CoordinatorInstanceStatus{.instance_name = main->instance_name_,
|
||||||
.instance_name = main->instance_name_,
|
.socket_address = main->endpoint->SocketAddress(),
|
||||||
.socket_address = main->endpoint->SocketAddress(),
|
.is_alive = main->is_alive_};
|
||||||
.is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||||
@ -222,12 +240,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sec_since_last_response_main =
|
if (current_main_info->is_alive_) {
|
||||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
|
||||||
current_main_info->last_response_time_.load())
|
|
||||||
.count();
|
|
||||||
|
|
||||||
if (sec_since_last_response_main <= CoordinatorClusterConfig::alive_response_time_difference_sec_) {
|
|
||||||
return DoFailoverStatus::MAIN_ALIVE;
|
return DoFailoverStatus::MAIN_ALIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,19 +252,14 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
// Get all replicas and find new main
|
// Get all replicas and find new main
|
||||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||||
|
|
||||||
const auto chosen_replica_info =
|
const auto chosen_replica_info = std::ranges::find_if(
|
||||||
std::ranges::find_if(registered_replicas_info, [](const CoordinatorClientInfo &client_info) {
|
registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.is_alive_; });
|
||||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
|
||||||
std::chrono::system_clock::now() - client_info.last_response_time_.load())
|
|
||||||
.count();
|
|
||||||
return sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
|
||||||
});
|
|
||||||
if (chosen_replica_info == registered_replicas_info.end()) {
|
if (chosen_replica_info == registered_replicas_info.end()) {
|
||||||
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||||
const auto chosen_replica =
|
auto chosen_replica =
|
||||||
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
|
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
|
||||||
return replica.InstanceName() == chosen_replica_info->instance_name_;
|
return replica.InstanceName() == chosen_replica_info->instance_name_;
|
||||||
});
|
});
|
||||||
@ -270,7 +278,8 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
// Set on coordinator data of new main
|
// Set on coordinator data of new main
|
||||||
// allocate resources for new main, clear replication info on this replica as main
|
// allocate resources for new main, clear replication info on this replica as main
|
||||||
// set last response time
|
// set last response time
|
||||||
auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config(), chosen_replica->Callback());
|
auto potential_new_main = std::make_unique<CoordinatorClient>(
|
||||||
|
this, chosen_replica->Config(), chosen_replica->SuccCallback(), chosen_replica->FailCallback());
|
||||||
potential_new_main->ReplicationClientInfo().reset();
|
potential_new_main->ReplicationClientInfo().reset();
|
||||||
auto potential_new_main_info = *chosen_replica_info;
|
auto potential_new_main_info = *chosen_replica_info;
|
||||||
|
|
||||||
@ -291,6 +300,8 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
registered_replicas.erase(chosen_replica);
|
registered_replicas.erase(chosen_replica);
|
||||||
registered_replicas_info.erase(chosen_replica_info);
|
registered_replicas_info.erase(chosen_replica_info);
|
||||||
|
|
||||||
|
current_main->StartFrequentCheck();
|
||||||
|
|
||||||
return DoFailoverStatus::SUCCESS;
|
return DoFailoverStatus::SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,17 +22,22 @@
|
|||||||
|
|
||||||
namespace memgraph::coordination {
|
namespace memgraph::coordination {
|
||||||
|
|
||||||
|
class CoordinatorState;
|
||||||
|
|
||||||
class CoordinatorClient {
|
class CoordinatorClient {
|
||||||
public:
|
public:
|
||||||
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||||
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
|
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
|
||||||
|
|
||||||
explicit CoordinatorClient(CoordinatorClientConfig config, std::function<void(std::string_view)> freq_check_cb);
|
using HealthCheckCallback = std::function<void(CoordinatorState *, std::string_view)>;
|
||||||
|
|
||||||
|
explicit CoordinatorClient(CoordinatorState *coord_state_, CoordinatorClientConfig config,
|
||||||
|
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
|
||||||
|
|
||||||
~CoordinatorClient();
|
~CoordinatorClient();
|
||||||
|
|
||||||
CoordinatorClient(CoordinatorClient &other) = delete;
|
CoordinatorClient(CoordinatorClient &) = delete;
|
||||||
CoordinatorClient &operator=(CoordinatorClient const &other) = delete;
|
CoordinatorClient &operator=(CoordinatorClient const &) = delete;
|
||||||
|
|
||||||
CoordinatorClient(CoordinatorClient &&) noexcept = delete;
|
CoordinatorClient(CoordinatorClient &&) noexcept = delete;
|
||||||
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
|
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
|
||||||
@ -48,7 +53,8 @@ class CoordinatorClient {
|
|||||||
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
||||||
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
||||||
// TODO: We should add copy constructor and then there won't be need for this
|
// TODO: We should add copy constructor and then there won't be need for this
|
||||||
auto Callback() const -> std::function<void(std::string_view)> const &;
|
auto SuccCallback() const -> HealthCheckCallback const &;
|
||||||
|
auto FailCallback() const -> HealthCheckCallback const &;
|
||||||
|
|
||||||
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
||||||
return first.config_ == second.config_;
|
return first.config_ == second.config_;
|
||||||
@ -62,7 +68,9 @@ class CoordinatorClient {
|
|||||||
mutable rpc::Client rpc_client_;
|
mutable rpc::Client rpc_client_;
|
||||||
|
|
||||||
CoordinatorClientConfig config_;
|
CoordinatorClientConfig config_;
|
||||||
std::function<void(std::string_view)> freq_check_cb_;
|
CoordinatorState *coord_state_;
|
||||||
|
HealthCheckCallback succ_cb_;
|
||||||
|
HealthCheckCallback fail_cb_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace memgraph::coordination
|
} // namespace memgraph::coordination
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
#ifdef MG_ENTERPRISE
|
#ifdef MG_ENTERPRISE
|
||||||
|
|
||||||
|
#include "coordination/coordinator_cluster_config.hpp"
|
||||||
#include "io/network/endpoint.hpp"
|
#include "io/network/endpoint.hpp"
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
@ -22,18 +23,23 @@ namespace memgraph::coordination {
|
|||||||
|
|
||||||
struct CoordinatorClientInfo {
|
struct CoordinatorClientInfo {
|
||||||
CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint)
|
CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint)
|
||||||
: last_response_time_(std::chrono::system_clock::now()), instance_name_(instance_name), endpoint(endpoint) {}
|
: last_response_time_(std::chrono::system_clock::now()),
|
||||||
|
is_alive_(true),
|
||||||
|
instance_name_(instance_name),
|
||||||
|
endpoint(endpoint) {}
|
||||||
|
|
||||||
~CoordinatorClientInfo() = default;
|
~CoordinatorClientInfo() = default;
|
||||||
|
|
||||||
CoordinatorClientInfo(const CoordinatorClientInfo &other)
|
CoordinatorClientInfo(const CoordinatorClientInfo &other)
|
||||||
: last_response_time_(other.last_response_time_.load()),
|
: last_response_time_(other.last_response_time_.load()),
|
||||||
|
is_alive_(other.is_alive_),
|
||||||
instance_name_(other.instance_name_),
|
instance_name_(other.instance_name_),
|
||||||
endpoint(other.endpoint) {}
|
endpoint(other.endpoint) {}
|
||||||
|
|
||||||
CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
|
CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
last_response_time_.store(other.last_response_time_.load());
|
last_response_time_.store(other.last_response_time_.load());
|
||||||
|
is_alive_ = other.is_alive_;
|
||||||
instance_name_ = other.instance_name_;
|
instance_name_ = other.instance_name_;
|
||||||
endpoint = other.endpoint;
|
endpoint = other.endpoint;
|
||||||
}
|
}
|
||||||
@ -42,21 +48,32 @@ struct CoordinatorClientInfo {
|
|||||||
|
|
||||||
CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
|
CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
|
||||||
: last_response_time_(other.last_response_time_.load()),
|
: last_response_time_(other.last_response_time_.load()),
|
||||||
|
is_alive_(other.is_alive_),
|
||||||
instance_name_(other.instance_name_),
|
instance_name_(other.instance_name_),
|
||||||
endpoint(other.endpoint) {}
|
endpoint(other.endpoint) {}
|
||||||
|
|
||||||
CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
|
CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
last_response_time_.store(other.last_response_time_.load());
|
last_response_time_.store(other.last_response_time_.load());
|
||||||
|
is_alive_ = other.is_alive_;
|
||||||
instance_name_ = other.instance_name_;
|
instance_name_ = other.instance_name_;
|
||||||
endpoint = other.endpoint;
|
endpoint = other.endpoint;
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: Add a method is_alive
|
auto UpdateInstanceStatus() -> bool {
|
||||||
|
is_alive_ =
|
||||||
|
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_.load())
|
||||||
|
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||||
|
return is_alive_;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto UpdateLastResponseTime() -> void { last_response_time_.store(std::chrono::system_clock::now()); }
|
||||||
|
|
||||||
|
// TODO: (andi) Wrap in private to forbid modification
|
||||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||||
|
bool is_alive_{false};
|
||||||
std::string_view instance_name_;
|
std::string_view instance_name_;
|
||||||
const io::network::Endpoint *endpoint;
|
const io::network::Endpoint *endpoint;
|
||||||
};
|
};
|
||||||
|
@ -57,7 +57,7 @@ class CoordinatorState {
|
|||||||
// TODO: Data is not thread safe
|
// TODO: Data is not thread safe
|
||||||
struct CoordinatorData {
|
struct CoordinatorData {
|
||||||
std::list<CoordinatorClient> registered_replicas_;
|
std::list<CoordinatorClient> registered_replicas_;
|
||||||
std::vector<CoordinatorClientInfo> registered_replicas_info_;
|
std::list<CoordinatorClientInfo> registered_replicas_info_;
|
||||||
std::unique_ptr<CoordinatorClient> registered_main_;
|
std::unique_ptr<CoordinatorClient> registered_main_;
|
||||||
std::optional<CoordinatorClientInfo> registered_main_info_;
|
std::optional<CoordinatorClientInfo> registered_main_info_;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user