From 9bc0d9425e193a29659c7357a6a87136d9a08a28 Mon Sep 17 00:00:00 2001 From: Andi Skrgat Date: Thu, 25 Jan 2024 15:20:14 +0100 Subject: [PATCH] Improve concurrency model --- src/coordination/CMakeLists.txt | 2 +- src/coordination/coordinator_data.cpp | 20 +++++++++---------- .../coordination/coordinator_instance.hpp | 7 +++---- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index acf3e7d0f..e8c4b3735 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -25,5 +25,5 @@ target_sources(mg-coordination target_include_directories(mg-coordination PUBLIC include) target_link_libraries(mg-coordination - PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue rangev3 + PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue lib::rangev3 ) diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp index 1bc271d5b..7908f17ec 100644 --- a/src/coordination/coordinator_data.cpp +++ b/src/coordination/coordinator_data.cpp @@ -22,8 +22,6 @@ namespace memgraph::coordination { CoordinatorData::CoordinatorData() { auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & { - std::shared_lock lock{coord_data->coord_data_lock_}; - auto instance = std::ranges::find_if( coord_data->registered_instances_, [instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; }); @@ -34,6 +32,7 @@ CoordinatorData::CoordinatorData() { }; replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { + auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing replica successful callback", instance_name); auto &instance = find_instance(coord_data, instance_name); MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); @@ -41,6 +40,7 @@ CoordinatorData::CoordinatorData() { }; replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { + auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing replica failure callback", instance_name); auto &instance = find_instance(coord_data, instance_name); MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); @@ -48,6 +48,7 @@ CoordinatorData::CoordinatorData() { }; main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { + auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing main successful callback", instance_name); auto &instance = find_instance(coord_data, instance_name); MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); @@ -55,6 +56,7 @@ CoordinatorData::CoordinatorData() { }; main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { + auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing main failure callback", instance_name); auto &instance = find_instance(coord_data, instance_name); MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); @@ -80,7 +82,6 @@ CoordinatorData::CoordinatorData() { auto CoordinatorData::DoFailover() -> DoFailoverStatus { using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; - std::lock_guard lock{coord_data_lock_}; auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica); @@ -94,10 +95,10 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus { std::vector repl_clients_info; repl_clients_info.reserve(std::ranges::distance(replica_instances)); - auto not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) { + auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) { return instance != *chosen_replica_instance; }; - auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); }; + auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); }; // TODO (antoniofilipovic): Should we send also data on old MAIN??? // TODO: (andi) Don't send replicas which aren't alive @@ -117,7 +118,7 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus { } auto CoordinatorData::ShowMain() const -> std::optional { - std::shared_lock lock{coord_data_lock_}; + auto lock = std::shared_lock{coord_data_lock_}; auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain); if (main_instance == registered_instances_.end()) { return std::nullopt; @@ -129,7 +130,7 @@ auto CoordinatorData::ShowMain() const -> std::optional std::vector { - std::shared_lock lock{coord_data_lock_}; + auto lock = std::shared_lock{coord_data_lock_}; std::vector instances_status; for (const auto &replica_instance : registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica)) { @@ -142,8 +143,7 @@ auto CoordinatorData::ShowReplicas() const -> std::vector SetInstanceToMainCoordinatorStatus { - // TODO: (andi) test this - std::lock_guard lock{coord_data_lock_}; + auto lock = std::lock_guard{coord_data_lock_}; // Find replica we already registered auto registered_replica = std::find_if( @@ -185,7 +185,7 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc } auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus { - std::lock_guard lock{coord_data_lock_}; + auto lock = std::lock_guard{coord_data_lock_}; if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { return instance.InstanceName() == config.instance_name; })) { diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index c4779963c..31a6d8204 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -36,8 +36,7 @@ class CoordinatorInstance { ~CoordinatorInstance() = default; auto UpdateInstanceStatus() -> bool { - is_alive_ = std::chrono::duration_cast(std::chrono::system_clock::now() - - last_response_time_.load(std::memory_order_acquire)) + is_alive_ = std::chrono::duration_cast(std::chrono::system_clock::now() - last_response_time_) .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_; return is_alive_; } @@ -66,8 +65,8 @@ class CoordinatorInstance { CoordinatorClient client_; replication_coordination_glue::ReplicationRole replication_role_; - std::atomic last_response_time_{}; - std::atomic is_alive_{false}; + std::chrono::system_clock::time_point last_response_time_{}; + bool is_alive_{false}; friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) { return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;