Improve concurrency model
This commit is contained in:
parent
62d1b68c2f
commit
9bc0d9425e
@ -25,5 +25,5 @@ target_sources(mg-coordination
|
|||||||
target_include_directories(mg-coordination PUBLIC include)
|
target_include_directories(mg-coordination PUBLIC include)
|
||||||
|
|
||||||
target_link_libraries(mg-coordination
|
target_link_libraries(mg-coordination
|
||||||
PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue rangev3
|
PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue lib::rangev3
|
||||||
)
|
)
|
||||||
|
@ -22,8 +22,6 @@ namespace memgraph::coordination {
|
|||||||
|
|
||||||
CoordinatorData::CoordinatorData() {
|
CoordinatorData::CoordinatorData() {
|
||||||
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
|
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
|
||||||
std::shared_lock<utils::RWLock> lock{coord_data->coord_data_lock_};
|
|
||||||
|
|
||||||
auto instance = std::ranges::find_if(
|
auto instance = std::ranges::find_if(
|
||||||
coord_data->registered_instances_,
|
coord_data->registered_instances_,
|
||||||
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
|
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
|
||||||
@ -34,6 +32,7 @@ CoordinatorData::CoordinatorData() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
||||||
|
auto lock = std::lock_guard{coord_data->coord_data_lock_};
|
||||||
spdlog::trace("Instance {} performing replica successful callback", instance_name);
|
spdlog::trace("Instance {} performing replica successful callback", instance_name);
|
||||||
auto &instance = find_instance(coord_data, instance_name);
|
auto &instance = find_instance(coord_data, instance_name);
|
||||||
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
|
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
|
||||||
@ -41,6 +40,7 @@ CoordinatorData::CoordinatorData() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
||||||
|
auto lock = std::lock_guard{coord_data->coord_data_lock_};
|
||||||
spdlog::trace("Instance {} performing replica failure callback", instance_name);
|
spdlog::trace("Instance {} performing replica failure callback", instance_name);
|
||||||
auto &instance = find_instance(coord_data, instance_name);
|
auto &instance = find_instance(coord_data, instance_name);
|
||||||
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
|
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
|
||||||
@ -48,6 +48,7 @@ CoordinatorData::CoordinatorData() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
||||||
|
auto lock = std::lock_guard{coord_data->coord_data_lock_};
|
||||||
spdlog::trace("Instance {} performing main successful callback", instance_name);
|
spdlog::trace("Instance {} performing main successful callback", instance_name);
|
||||||
auto &instance = find_instance(coord_data, instance_name);
|
auto &instance = find_instance(coord_data, instance_name);
|
||||||
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
|
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
|
||||||
@ -55,6 +56,7 @@ CoordinatorData::CoordinatorData() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
|
||||||
|
auto lock = std::lock_guard{coord_data->coord_data_lock_};
|
||||||
spdlog::trace("Instance {} performing main failure callback", instance_name);
|
spdlog::trace("Instance {} performing main failure callback", instance_name);
|
||||||
auto &instance = find_instance(coord_data, instance_name);
|
auto &instance = find_instance(coord_data, instance_name);
|
||||||
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
|
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
|
||||||
@ -80,7 +82,6 @@ CoordinatorData::CoordinatorData() {
|
|||||||
|
|
||||||
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
||||||
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||||
std::lock_guard<utils::RWLock> lock{coord_data_lock_};
|
|
||||||
|
|
||||||
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
|
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
|
||||||
|
|
||||||
@ -94,10 +95,10 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
|||||||
std::vector<ReplicationClientInfo> repl_clients_info;
|
std::vector<ReplicationClientInfo> repl_clients_info;
|
||||||
repl_clients_info.reserve(std::ranges::distance(replica_instances));
|
repl_clients_info.reserve(std::ranges::distance(replica_instances));
|
||||||
|
|
||||||
auto not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
|
auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
|
||||||
return instance != *chosen_replica_instance;
|
return instance != *chosen_replica_instance;
|
||||||
};
|
};
|
||||||
auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
|
auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
|
||||||
|
|
||||||
// TODO (antoniofilipovic): Should we send also data on old MAIN???
|
// TODO (antoniofilipovic): Should we send also data on old MAIN???
|
||||||
// TODO: (andi) Don't send replicas which aren't alive
|
// TODO: (andi) Don't send replicas which aren't alive
|
||||||
@ -117,7 +118,7 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorData::ShowMain() const -> std::optional<CoordinatorInstanceStatus> {
|
auto CoordinatorData::ShowMain() const -> std::optional<CoordinatorInstanceStatus> {
|
||||||
std::shared_lock<utils::RWLock> lock{coord_data_lock_};
|
auto lock = std::shared_lock{coord_data_lock_};
|
||||||
auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
|
auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
|
||||||
if (main_instance == registered_instances_.end()) {
|
if (main_instance == registered_instances_.end()) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
@ -129,7 +130,7 @@ auto CoordinatorData::ShowMain() const -> std::optional<CoordinatorInstanceStatu
|
|||||||
};
|
};
|
||||||
|
|
||||||
auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> {
|
auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> {
|
||||||
std::shared_lock<utils::RWLock> lock{coord_data_lock_};
|
auto lock = std::shared_lock{coord_data_lock_};
|
||||||
std::vector<CoordinatorInstanceStatus> instances_status;
|
std::vector<CoordinatorInstanceStatus> instances_status;
|
||||||
|
|
||||||
for (const auto &replica_instance : registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica)) {
|
for (const auto &replica_instance : registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica)) {
|
||||||
@ -142,8 +143,7 @@ auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceSta
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
|
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
|
||||||
// TODO: (andi) test this
|
auto lock = std::lock_guard{coord_data_lock_};
|
||||||
std::lock_guard<utils::RWLock> lock{coord_data_lock_};
|
|
||||||
|
|
||||||
// Find replica we already registered
|
// Find replica we already registered
|
||||||
auto registered_replica = std::find_if(
|
auto registered_replica = std::find_if(
|
||||||
@ -185,7 +185,7 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
|
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
|
||||||
std::lock_guard<utils::RWLock> lock{coord_data_lock_};
|
auto lock = std::lock_guard{coord_data_lock_};
|
||||||
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
|
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
|
||||||
return instance.InstanceName() == config.instance_name;
|
return instance.InstanceName() == config.instance_name;
|
||||||
})) {
|
})) {
|
||||||
|
@ -36,8 +36,7 @@ class CoordinatorInstance {
|
|||||||
~CoordinatorInstance() = default;
|
~CoordinatorInstance() = default;
|
||||||
|
|
||||||
auto UpdateInstanceStatus() -> bool {
|
auto UpdateInstanceStatus() -> bool {
|
||||||
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_)
|
||||||
last_response_time_.load(std::memory_order_acquire))
|
|
||||||
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||||
return is_alive_;
|
return is_alive_;
|
||||||
}
|
}
|
||||||
@ -66,8 +65,8 @@ class CoordinatorInstance {
|
|||||||
|
|
||||||
CoordinatorClient client_;
|
CoordinatorClient client_;
|
||||||
replication_coordination_glue::ReplicationRole replication_role_;
|
replication_coordination_glue::ReplicationRole replication_role_;
|
||||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
std::chrono::system_clock::time_point last_response_time_{};
|
||||||
std::atomic<bool> is_alive_{false};
|
bool is_alive_{false};
|
||||||
|
|
||||||
friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) {
|
friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) {
|
||||||
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
|
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
|
||||||
|
Loading…
Reference in New Issue
Block a user