From ab34b060c02ae3f36ef90544a655ecab0be4ec77 Mon Sep 17 00:00:00 2001 From: Andi Skrgat Date: Mon, 22 Jan 2024 11:57:56 +0100 Subject: [PATCH] Thread-safe access to coordinator data --- src/coordination/CMakeLists.txt | 1 + src/coordination/coordinator_client.cpp | 3 +- src/coordination/coordinator_state.cpp | 57 ++++++++++--------- .../coordination/coordinator_client.hpp | 5 +- .../coordination/coordinator_client_info.hpp | 44 +++++++------- .../include/coordination/coordinator_data.hpp | 41 +++++++++++++ .../coordinator_instance_status.hpp | 4 +- .../coordination/coordinator_state.hpp | 13 +---- src/query/interpreter.hpp | 6 +- 9 files changed, 105 insertions(+), 69 deletions(-) create mode 100644 src/coordination/include/coordination/coordinator_data.hpp diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 98d682830..a5fe8e8a0 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -9,6 +9,7 @@ target_sources(mg-coordination include/coordination/coordinator_config.hpp include/coordination/coordinator_exceptions.hpp include/coordination/coordinator_slk.hpp + include/coordination/coordinator_data.hpp include/coordination/constants.hpp include/coordination/failover_status.hpp include/coordination/coordinator_client_info.hpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 1d7b85822..ff005f217 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -71,8 +71,7 @@ void CoordinatorClient::StartFrequentCheck() { void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; } -auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); } -// TODO: remove these method and implement copy constructor +auto CoordinatorClient::SocketAddress() const -> std::string 
{ return rpc_client_.Endpoint().SocketAddress(); } auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; } auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; } auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; } diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index bc21d1961..eed28885b 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -85,27 +85,28 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { MG_ASSERT(std::holds_alternative(coord_state->data_), "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - auto &registered_replicas_info = std::get(coord_state->data_).registered_replicas_info_; + auto &coord_data = std::get(coord_state->data_); + std::shared_lock lock{coord_data.coord_data_lock_}; auto replica_client_info = std::ranges::find_if( - registered_replicas_info, - [instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; }); + coord_data.registered_replicas_info_, + [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; }); - if (replica_client_info != registered_replicas_info.end()) { + if (replica_client_info != coord_data.registered_replicas_info_.end()) { return *replica_client_info; } - auto &registered_main_info = std::get(coord_state->data_).registered_main_info_; - MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main..."); - return *registered_main_info; + MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name, + "Instance is neither a replica nor main..."); + return 
*coord_data.registered_main_info_; }; - auto repl_succ_cb = [&find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { auto &client_info = find_client_info(coord_state, instance_name); client_info.UpdateLastResponseTime(); }; - auto repl_fail_cb = [&find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { auto &client_info = find_client_info(coord_state, instance_name); client_info.UpdateInstanceStatus(); }; @@ -114,7 +115,7 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), - coord_client->Endpoint()); + coord_client->SocketAddress()); coord_client->StartFrequentCheck(); return RegisterMainReplicaCoordinatorStatus::SUCCESS; @@ -143,24 +144,26 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), "Main info is not set, but callback is called"); + auto &coord_data = std::get(coord_state->data_); + std::shared_lock lock{coord_data.coord_data_lock_}; // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at // this point.... 
- auto &registered_main_info = std::get(coord_state->data_).registered_main_info_; - MG_ASSERT(registered_main_info->instance_name_ == instance_name, + auto &registered_main_info = coord_data.registered_main_info_; + MG_ASSERT(registered_main_info->InstanceName() == instance_name, "Callback called for wrong instance name: {}, expected: {}", instance_name, - registered_main_info->instance_name_); + registered_main_info->InstanceName()); return *registered_main_info; }; - auto succ_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - spdlog::trace("Executing success callback for main: {}", std::string(instance_name)); + auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { auto &registered_main_info = get_client_info(coord_state, instance_name); registered_main_info.UpdateLastResponseTime(); }; - auto fail_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto fail_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { auto &registered_main_info = get_client_info(coord_state, instance_name); + // TODO: (andi) Take unique lock if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { // spdlog::warn("Main is not alive, starting failover"); // switch (auto failover_status = DoFailover(); failover_status) { @@ -182,7 +185,7 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM std::make_unique(this, std::move(config), std::move(succ_cb), std::move(fail_cb)); std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), - registered_main->Endpoint()); + registered_main->SocketAddress()); registered_main->StartFrequentCheck(); return RegisterMainReplicaCoordinatorStatus::SUCCESS; @@ -197,10 +200,9 @@ auto CoordinatorState::ShowReplicas() const -> std::vectorSocketAddress(), - .is_alive = coord_client_info.is_alive_}; + return 
CoordinatorInstanceStatus{.instance_name = coord_client_info.InstanceName(), + .socket_address = coord_client_info.SocketAddress(), + .is_alive = coord_client_info.IsAlive()}; }); return instances_status; } @@ -213,9 +215,8 @@ auto CoordinatorState::ShowMain() const -> std::optionalinstance_name_, - .socket_address = main->endpoint->SocketAddress(), - .is_alive = main->is_alive_}; + return CoordinatorInstanceStatus{ + .instance_name = main->InstanceName(), .socket_address = main->SocketAddress(), .is_alive = main->IsAlive()}; }; [[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus { @@ -238,7 +239,7 @@ auto CoordinatorState::ShowMain() const -> std::optionalis_alive_) { + if (current_main_info->IsAlive()) { return DoFailoverStatus::MAIN_ALIVE; } @@ -251,7 +252,7 @@ auto CoordinatorState::ShowMain() const -> std::optional(data_).registered_replicas_info_; const auto chosen_replica_info = std::ranges::find_if( - registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.is_alive_; }); + registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.IsAlive(); }); if (chosen_replica_info == registered_replicas_info.end()) { return DoFailoverStatus::ALL_REPLICAS_DOWN; } @@ -259,10 +260,10 @@ auto CoordinatorState::ShowMain() const -> std::optional(data_).registered_replicas_; auto chosen_replica = std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) { - return replica.InstanceName() == chosen_replica_info->instance_name_; + return replica.InstanceName() == chosen_replica_info->InstanceName(); }); MG_ASSERT(chosen_replica != registered_replicas.end(), "Chosen replica {} not found in registered replicas", - chosen_replica_info->instance_name_); + chosen_replica_info->InstanceName()); std::vector repl_clients_info; repl_clients_info.reserve(registered_replicas.size() - 1); diff --git a/src/coordination/include/coordination/coordinator_client.hpp 
b/src/coordination/include/coordination/coordinator_client.hpp index 296880745..461251337 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -48,11 +48,12 @@ class CoordinatorClient { auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; auto InstanceName() const -> std::string_view; - auto Endpoint() const -> const io::network::Endpoint *; + auto SocketAddress() const -> std::string; auto Config() const -> CoordinatorClientConfig const &; + auto ReplicationClientInfo() const -> ReplClientInfo const &; auto ReplicationClientInfo() -> std::optional &; - // TODO: We should add copy constructor and then there won't be need for this + auto SuccCallback() const -> HealthCheckCallback const &; auto FailCallback() const -> HealthCheckCallback const &; diff --git a/src/coordination/include/coordination/coordinator_client_info.hpp b/src/coordination/include/coordination/coordinator_client_info.hpp index 1a725a51c..924fdea78 100644 --- a/src/coordination/include/coordination/coordinator_client_info.hpp +++ b/src/coordination/include/coordination/coordinator_client_info.hpp @@ -21,61 +21,65 @@ namespace memgraph::coordination { -struct CoordinatorClientInfo { - CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint) +class CoordinatorClientInfo { + public: + CoordinatorClientInfo(std::string_view instance_name, std::string_view socket_address) : last_response_time_(std::chrono::system_clock::now()), - is_alive_(true), + is_alive_(true), // TODO: (andi) Maybe it should be false until the first ping instance_name_(instance_name), - endpoint(endpoint) {} + socket_address_(socket_address) {} ~CoordinatorClientInfo() = default; CoordinatorClientInfo(const CoordinatorClientInfo &other) : last_response_time_(other.last_response_time_.load()), - is_alive_(other.is_alive_), + is_alive_(other.is_alive_.load()), 
instance_name_(other.instance_name_), - endpoint(other.endpoint) {} + socket_address_(other.socket_address_) {} CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) { if (this != &other) { - last_response_time_.store(other.last_response_time_.load()); - is_alive_ = other.is_alive_; + last_response_time_ = other.last_response_time_.load(); + is_alive_ = other.is_alive_.load(); instance_name_ = other.instance_name_; - endpoint = other.endpoint; + socket_address_ = other.socket_address_; } return *this; } CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept : last_response_time_(other.last_response_time_.load()), - is_alive_(other.is_alive_), + is_alive_(other.is_alive_.load()), instance_name_(other.instance_name_), - endpoint(other.endpoint) {} + socket_address_(other.socket_address_) {} CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept { if (this != &other) { last_response_time_.store(other.last_response_time_.load()); - is_alive_ = other.is_alive_; + is_alive_ = other.is_alive_.load(); instance_name_ = other.instance_name_; - endpoint = other.endpoint; + socket_address_ = other.socket_address_; } return *this; } auto UpdateInstanceStatus() -> bool { - is_alive_ = - std::chrono::duration_cast(std::chrono::system_clock::now() - last_response_time_.load()) - .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_; + is_alive_ = std::chrono::duration_cast(std::chrono::system_clock::now() - + last_response_time_.load(std::memory_order_acquire)) + .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_; return is_alive_; } - auto UpdateLastResponseTime() -> void { last_response_time_.store(std::chrono::system_clock::now()); } + auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); } + auto InstanceName() const -> std::string_view { return instance_name_; } + auto IsAlive() const -> bool { return is_alive_; } + auto SocketAddress() const -> 
std::string_view { return socket_address_; } - // TODO: (andi) Wrap in private to forbid modification + private: std::atomic last_response_time_{}; - bool is_alive_{false}; + std::atomic is_alive_{false}; std::string_view instance_name_; - const io::network::Endpoint *endpoint; + std::string socket_address_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_data.hpp b/src/coordination/include/coordination/coordinator_data.hpp new file mode 100644 index 000000000..79791f008 --- /dev/null +++ b/src/coordination/include/coordination/coordinator_data.hpp @@ -0,0 +1,41 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_client.hpp" +#include "coordination/coordinator_client_info.hpp" +#include "coordination/coordinator_server.hpp" +#include "utils/rw_lock.hpp" + +#include +#include +#include + +namespace memgraph::coordination { + +struct CoordinatorData { + std::list registered_replicas_; + std::list registered_replicas_info_; + std::unique_ptr registered_main_; + std::optional registered_main_info_; + + mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ}; +}; + +struct CoordinatorMainReplicaData { + std::unique_ptr coordinator_server_; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/coordination/coordinator_instance_status.hpp b/src/coordination/include/coordination/coordinator_instance_status.hpp index b220636f8..0904cb9a6 100644 --- a/src/coordination/include/coordination/coordinator_instance_status.hpp +++ b/src/coordination/include/coordination/coordinator_instance_status.hpp @@ -15,13 +15,13 @@ #include "io/network/endpoint.hpp" -#include +#include namespace memgraph::coordination { struct CoordinatorInstanceStatus { std::string_view instance_name; - std::string socket_address; + std::string_view socket_address; bool is_alive; }; diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 7c63d7075..964b0d438 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -15,6 +15,7 @@ #include "coordination/coordinator_client.hpp" #include "coordination/coordinator_client_info.hpp" +#include "coordination/coordinator_data.hpp" #include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" #include "coordination/failover_status.hpp" @@ -54,18 +55,6 @@ class CoordinatorState { [[nodiscard]] auto DoFailover() -> DoFailoverStatus; private: - // 
TODO: Data is not thread safe - struct CoordinatorData { - std::list registered_replicas_; - std::list registered_replicas_info_; - std::unique_ptr registered_main_; - std::optional registered_main_info_; - }; - - struct CoordinatorMainReplicaData { - std::unique_ptr coordinator_server_; - }; - std::variant data_; }; diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 1c9265006..b7c05ec6e 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -96,12 +96,12 @@ class CoordinatorQueryHandler { #ifdef MG_ENTERPRISE struct MainReplicaStatus { std::string_view name; - std::string socket_address; + std::string_view socket_address; bool alive; bool is_main; - MainReplicaStatus(std::string_view name, std::string socket_address, bool alive, bool is_main) - : name{name}, socket_address{std::move(socket_address)}, alive{alive}, is_main{is_main} {} + MainReplicaStatus(std::string_view name, std::string_view socket_address, bool alive, bool is_main) + : name{name}, socket_address{socket_address}, alive{alive}, is_main{is_main} {} }; #endif