Thread-safe access to coordinator data
This commit is contained in:
parent
67c1874e81
commit
ab34b060c0
@ -9,6 +9,7 @@ target_sources(mg-coordination
|
|||||||
include/coordination/coordinator_config.hpp
|
include/coordination/coordinator_config.hpp
|
||||||
include/coordination/coordinator_exceptions.hpp
|
include/coordination/coordinator_exceptions.hpp
|
||||||
include/coordination/coordinator_slk.hpp
|
include/coordination/coordinator_slk.hpp
|
||||||
|
include/coordination/coordinator_data.hpp
|
||||||
include/coordination/constants.hpp
|
include/coordination/constants.hpp
|
||||||
include/coordination/failover_status.hpp
|
include/coordination/failover_status.hpp
|
||||||
include/coordination/coordinator_client_info.hpp
|
include/coordination/coordinator_client_info.hpp
|
||||||
|
@ -71,8 +71,7 @@ void CoordinatorClient::StartFrequentCheck() {
|
|||||||
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
||||||
|
|
||||||
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
||||||
auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); }
|
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
|
||||||
// TODO: remove these method and implement copy constructor
|
|
||||||
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
||||||
auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; }
|
auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; }
|
||||||
auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; }
|
auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; }
|
||||||
|
@ -85,27 +85,28 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
|
|||||||
auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
|
auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
|
||||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
|
MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
|
||||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||||
auto ®istered_replicas_info = std::get<CoordinatorData>(coord_state->data_).registered_replicas_info_;
|
auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
|
||||||
|
std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
|
||||||
|
|
||||||
auto replica_client_info = std::ranges::find_if(
|
auto replica_client_info = std::ranges::find_if(
|
||||||
registered_replicas_info,
|
coord_data.registered_replicas_info_,
|
||||||
[instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; });
|
[instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; });
|
||||||
|
|
||||||
if (replica_client_info != registered_replicas_info.end()) {
|
if (replica_client_info != coord_data.registered_replicas_info_.end()) {
|
||||||
return *replica_client_info;
|
return *replica_client_info;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ®istered_main_info = std::get<CoordinatorData>(coord_state->data_).registered_main_info_;
|
MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name,
|
||||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
|
"Instance is neither a replica nor main...");
|
||||||
return *registered_main_info;
|
return *coord_data.registered_main_info_;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto repl_succ_cb = [&find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
auto &client_info = find_client_info(coord_state, instance_name);
|
auto &client_info = find_client_info(coord_state, instance_name);
|
||||||
client_info.UpdateLastResponseTime();
|
client_info.UpdateLastResponseTime();
|
||||||
};
|
};
|
||||||
|
|
||||||
auto repl_fail_cb = [&find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
auto &client_info = find_client_info(coord_state, instance_name);
|
auto &client_info = find_client_info(coord_state, instance_name);
|
||||||
client_info.UpdateInstanceStatus();
|
client_info.UpdateInstanceStatus();
|
||||||
};
|
};
|
||||||
@ -114,7 +115,7 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
|
|||||||
this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
|
this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
|
||||||
|
|
||||||
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
|
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
|
||||||
coord_client->Endpoint());
|
coord_client->SocketAddress());
|
||||||
coord_client->StartFrequentCheck();
|
coord_client->StartFrequentCheck();
|
||||||
|
|
||||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||||
@ -143,24 +144,26 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM
|
|||||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||||
MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
|
MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
|
||||||
"Main info is not set, but callback is called");
|
"Main info is not set, but callback is called");
|
||||||
|
auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
|
||||||
|
std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
|
||||||
|
|
||||||
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
|
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
|
||||||
// this point....
|
// this point....
|
||||||
auto ®istered_main_info = std::get<CoordinatorData>(coord_state->data_).registered_main_info_;
|
auto ®istered_main_info = coord_data.registered_main_info_;
|
||||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name,
|
MG_ASSERT(registered_main_info->InstanceName() == instance_name,
|
||||||
"Callback called for wrong instance name: {}, expected: {}", instance_name,
|
"Callback called for wrong instance name: {}, expected: {}", instance_name,
|
||||||
registered_main_info->instance_name_);
|
registered_main_info->InstanceName());
|
||||||
return *registered_main_info;
|
return *registered_main_info;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto succ_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
spdlog::trace("Executing success callback for main: {}", std::string(instance_name));
|
|
||||||
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
||||||
registered_main_info.UpdateLastResponseTime();
|
registered_main_info.UpdateLastResponseTime();
|
||||||
};
|
};
|
||||||
|
|
||||||
auto fail_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
auto fail_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
|
||||||
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
auto ®istered_main_info = get_client_info(coord_state, instance_name);
|
||||||
|
// TODO: (andi) Take unique lock
|
||||||
if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
|
if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
|
||||||
// spdlog::warn("Main is not alive, starting failover");
|
// spdlog::warn("Main is not alive, starting failover");
|
||||||
// switch (auto failover_status = DoFailover(); failover_status) {
|
// switch (auto failover_status = DoFailover(); failover_status) {
|
||||||
@ -182,7 +185,7 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM
|
|||||||
std::make_unique<CoordinatorClient>(this, std::move(config), std::move(succ_cb), std::move(fail_cb));
|
std::make_unique<CoordinatorClient>(this, std::move(config), std::move(succ_cb), std::move(fail_cb));
|
||||||
|
|
||||||
std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
|
std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
|
||||||
registered_main->Endpoint());
|
registered_main->SocketAddress());
|
||||||
registered_main->StartFrequentCheck();
|
registered_main->StartFrequentCheck();
|
||||||
|
|
||||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||||
@ -197,10 +200,9 @@ auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorInstanceSt
|
|||||||
|
|
||||||
std::ranges::transform(registered_replicas_info, std::back_inserter(instances_status),
|
std::ranges::transform(registered_replicas_info, std::back_inserter(instances_status),
|
||||||
[](const CoordinatorClientInfo &coord_client_info) {
|
[](const CoordinatorClientInfo &coord_client_info) {
|
||||||
return CoordinatorInstanceStatus{
|
return CoordinatorInstanceStatus{.instance_name = coord_client_info.InstanceName(),
|
||||||
.instance_name = coord_client_info.instance_name_,
|
.socket_address = coord_client_info.SocketAddress(),
|
||||||
.socket_address = coord_client_info.endpoint->SocketAddress(),
|
.is_alive = coord_client_info.IsAlive()};
|
||||||
.is_alive = coord_client_info.is_alive_};
|
|
||||||
});
|
});
|
||||||
return instances_status;
|
return instances_status;
|
||||||
}
|
}
|
||||||
@ -213,9 +215,8 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
return CoordinatorInstanceStatus{.instance_name = main->instance_name_,
|
return CoordinatorInstanceStatus{
|
||||||
.socket_address = main->endpoint->SocketAddress(),
|
.instance_name = main->InstanceName(), .socket_address = main->SocketAddress(), .is_alive = main->IsAlive()};
|
||||||
.is_alive = main->is_alive_};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||||
@ -238,7 +239,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (current_main_info->is_alive_) {
|
if (current_main_info->IsAlive()) {
|
||||||
return DoFailoverStatus::MAIN_ALIVE;
|
return DoFailoverStatus::MAIN_ALIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +252,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||||
|
|
||||||
const auto chosen_replica_info = std::ranges::find_if(
|
const auto chosen_replica_info = std::ranges::find_if(
|
||||||
registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.is_alive_; });
|
registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.IsAlive(); });
|
||||||
if (chosen_replica_info == registered_replicas_info.end()) {
|
if (chosen_replica_info == registered_replicas_info.end()) {
|
||||||
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
||||||
}
|
}
|
||||||
@ -259,10 +260,10 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
|
|||||||
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||||
auto chosen_replica =
|
auto chosen_replica =
|
||||||
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
|
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
|
||||||
return replica.InstanceName() == chosen_replica_info->instance_name_;
|
return replica.InstanceName() == chosen_replica_info->InstanceName();
|
||||||
});
|
});
|
||||||
MG_ASSERT(chosen_replica != registered_replicas.end(), "Chosen replica {} not found in registered replicas",
|
MG_ASSERT(chosen_replica != registered_replicas.end(), "Chosen replica {} not found in registered replicas",
|
||||||
chosen_replica_info->instance_name_);
|
chosen_replica_info->InstanceName());
|
||||||
|
|
||||||
std::vector<ReplicationClientInfo> repl_clients_info;
|
std::vector<ReplicationClientInfo> repl_clients_info;
|
||||||
repl_clients_info.reserve(registered_replicas.size() - 1);
|
repl_clients_info.reserve(registered_replicas.size() - 1);
|
||||||
|
@ -48,11 +48,12 @@ class CoordinatorClient {
|
|||||||
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
|
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
|
||||||
|
|
||||||
auto InstanceName() const -> std::string_view;
|
auto InstanceName() const -> std::string_view;
|
||||||
auto Endpoint() const -> const io::network::Endpoint *;
|
auto SocketAddress() const -> std::string;
|
||||||
auto Config() const -> CoordinatorClientConfig const &;
|
auto Config() const -> CoordinatorClientConfig const &;
|
||||||
|
|
||||||
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
||||||
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
||||||
// TODO: We should add copy constructor and then there won't be need for this
|
|
||||||
auto SuccCallback() const -> HealthCheckCallback const &;
|
auto SuccCallback() const -> HealthCheckCallback const &;
|
||||||
auto FailCallback() const -> HealthCheckCallback const &;
|
auto FailCallback() const -> HealthCheckCallback const &;
|
||||||
|
|
||||||
|
@ -21,61 +21,65 @@
|
|||||||
|
|
||||||
namespace memgraph::coordination {
|
namespace memgraph::coordination {
|
||||||
|
|
||||||
struct CoordinatorClientInfo {
|
class CoordinatorClientInfo {
|
||||||
CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint)
|
public:
|
||||||
|
CoordinatorClientInfo(std::string_view instance_name, std::string_view socket_address)
|
||||||
: last_response_time_(std::chrono::system_clock::now()),
|
: last_response_time_(std::chrono::system_clock::now()),
|
||||||
is_alive_(true),
|
is_alive_(true), // TODO: (andi) Maybe it should be false until the first ping
|
||||||
instance_name_(instance_name),
|
instance_name_(instance_name),
|
||||||
endpoint(endpoint) {}
|
socket_address_(socket_address) {}
|
||||||
|
|
||||||
~CoordinatorClientInfo() = default;
|
~CoordinatorClientInfo() = default;
|
||||||
|
|
||||||
CoordinatorClientInfo(const CoordinatorClientInfo &other)
|
CoordinatorClientInfo(const CoordinatorClientInfo &other)
|
||||||
: last_response_time_(other.last_response_time_.load()),
|
: last_response_time_(other.last_response_time_.load()),
|
||||||
is_alive_(other.is_alive_),
|
is_alive_(other.is_alive_.load()),
|
||||||
instance_name_(other.instance_name_),
|
instance_name_(other.instance_name_),
|
||||||
endpoint(other.endpoint) {}
|
socket_address_(other.socket_address_) {}
|
||||||
|
|
||||||
CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
|
CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
last_response_time_.store(other.last_response_time_.load());
|
last_response_time_ = other.last_response_time_.load();
|
||||||
is_alive_ = other.is_alive_;
|
is_alive_ = other.is_alive_.load();
|
||||||
instance_name_ = other.instance_name_;
|
instance_name_ = other.instance_name_;
|
||||||
endpoint = other.endpoint;
|
socket_address_ = other.socket_address_;
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
|
CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
|
||||||
: last_response_time_(other.last_response_time_.load()),
|
: last_response_time_(other.last_response_time_.load()),
|
||||||
is_alive_(other.is_alive_),
|
is_alive_(other.is_alive_.load()),
|
||||||
instance_name_(other.instance_name_),
|
instance_name_(other.instance_name_),
|
||||||
endpoint(other.endpoint) {}
|
socket_address_(other.socket_address_) {}
|
||||||
|
|
||||||
CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
|
CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
|
||||||
if (this != &other) {
|
if (this != &other) {
|
||||||
last_response_time_.store(other.last_response_time_.load());
|
last_response_time_.store(other.last_response_time_.load());
|
||||||
is_alive_ = other.is_alive_;
|
is_alive_ = other.is_alive_.load();
|
||||||
instance_name_ = other.instance_name_;
|
instance_name_ = other.instance_name_;
|
||||||
endpoint = other.endpoint;
|
socket_address_ = other.socket_address_;
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto UpdateInstanceStatus() -> bool {
|
auto UpdateInstanceStatus() -> bool {
|
||||||
is_alive_ =
|
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
||||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_.load())
|
last_response_time_.load(std::memory_order_acquire))
|
||||||
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||||
return is_alive_;
|
return is_alive_;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto UpdateLastResponseTime() -> void { last_response_time_.store(std::chrono::system_clock::now()); }
|
auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
|
||||||
|
auto InstanceName() const -> std::string_view { return instance_name_; }
|
||||||
|
auto IsAlive() const -> bool { return is_alive_; }
|
||||||
|
auto SocketAddress() const -> std::string_view { return socket_address_; }
|
||||||
|
|
||||||
// TODO: (andi) Wrap in private to forbid modification
|
private:
|
||||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||||
bool is_alive_{false};
|
std::atomic<bool> is_alive_{false};
|
||||||
std::string_view instance_name_;
|
std::string_view instance_name_;
|
||||||
const io::network::Endpoint *endpoint;
|
std::string socket_address_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace memgraph::coordination
|
} // namespace memgraph::coordination
|
||||||
|
41
src/coordination/include/coordination/coordinator_data.hpp
Normal file
41
src/coordination/include/coordination/coordinator_data.hpp
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Copyright 2024 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#ifdef MG_ENTERPRISE
|
||||||
|
|
||||||
|
#include "coordination/coordinator_client.hpp"
|
||||||
|
#include "coordination/coordinator_client_info.hpp"
|
||||||
|
#include "coordination/coordinator_server.hpp"
|
||||||
|
#include "utils/rw_lock.hpp"
|
||||||
|
|
||||||
|
#include <list>
|
||||||
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
|
namespace memgraph::coordination {
|
||||||
|
|
||||||
|
struct CoordinatorData {
|
||||||
|
std::list<CoordinatorClient> registered_replicas_;
|
||||||
|
std::list<CoordinatorClientInfo> registered_replicas_info_;
|
||||||
|
std::unique_ptr<CoordinatorClient> registered_main_;
|
||||||
|
std::optional<CoordinatorClientInfo> registered_main_info_;
|
||||||
|
|
||||||
|
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
|
||||||
|
};
|
||||||
|
|
||||||
|
struct CoordinatorMainReplicaData {
|
||||||
|
std::unique_ptr<CoordinatorServer> coordinator_server_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace memgraph::coordination
|
||||||
|
#endif
|
@ -15,13 +15,13 @@
|
|||||||
|
|
||||||
#include "io/network/endpoint.hpp"
|
#include "io/network/endpoint.hpp"
|
||||||
|
|
||||||
#include <string>
|
#include <string_view>
|
||||||
|
|
||||||
namespace memgraph::coordination {
|
namespace memgraph::coordination {
|
||||||
|
|
||||||
struct CoordinatorInstanceStatus {
|
struct CoordinatorInstanceStatus {
|
||||||
std::string_view instance_name;
|
std::string_view instance_name;
|
||||||
std::string socket_address;
|
std::string_view socket_address;
|
||||||
bool is_alive;
|
bool is_alive;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
#include "coordination/coordinator_client.hpp"
|
#include "coordination/coordinator_client.hpp"
|
||||||
#include "coordination/coordinator_client_info.hpp"
|
#include "coordination/coordinator_client_info.hpp"
|
||||||
|
#include "coordination/coordinator_data.hpp"
|
||||||
#include "coordination/coordinator_instance_status.hpp"
|
#include "coordination/coordinator_instance_status.hpp"
|
||||||
#include "coordination/coordinator_server.hpp"
|
#include "coordination/coordinator_server.hpp"
|
||||||
#include "coordination/failover_status.hpp"
|
#include "coordination/failover_status.hpp"
|
||||||
@ -54,18 +55,6 @@ class CoordinatorState {
|
|||||||
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
|
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// TODO: Data is not thread safe
|
|
||||||
struct CoordinatorData {
|
|
||||||
std::list<CoordinatorClient> registered_replicas_;
|
|
||||||
std::list<CoordinatorClientInfo> registered_replicas_info_;
|
|
||||||
std::unique_ptr<CoordinatorClient> registered_main_;
|
|
||||||
std::optional<CoordinatorClientInfo> registered_main_info_;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct CoordinatorMainReplicaData {
|
|
||||||
std::unique_ptr<CoordinatorServer> coordinator_server_;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
|
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -96,12 +96,12 @@ class CoordinatorQueryHandler {
|
|||||||
#ifdef MG_ENTERPRISE
|
#ifdef MG_ENTERPRISE
|
||||||
struct MainReplicaStatus {
|
struct MainReplicaStatus {
|
||||||
std::string_view name;
|
std::string_view name;
|
||||||
std::string socket_address;
|
std::string_view socket_address;
|
||||||
bool alive;
|
bool alive;
|
||||||
bool is_main;
|
bool is_main;
|
||||||
|
|
||||||
MainReplicaStatus(std::string_view name, std::string socket_address, bool alive, bool is_main)
|
MainReplicaStatus(std::string_view name, std::string_view socket_address, bool alive, bool is_main)
|
||||||
: name{name}, socket_address{std::move(socket_address)}, alive{alive}, is_main{is_main} {}
|
: name{name}, socket_address{socket_address}, alive{alive}, is_main{is_main} {}
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user