From c5bf7b3c03895a1fd709868ba96c3d3e115c5b62 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Fri, 26 Apr 2019 16:06:15 +0200 Subject: [PATCH] Refactor HA coordination Reviewers: msantl, ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1992 --- src/database/single_node_ha/config.cpp | 15 --- src/database/single_node_ha/config.hpp | 4 - src/database/single_node_ha/graph_db.hpp | 3 +- src/raft/coordination.cpp | 145 +++++++++++------------ src/raft/coordination.hpp | 113 ++++++++---------- src/raft/exceptions.hpp | 6 +- src/raft/raft_server.cpp | 21 ++-- src/raft/storage_info.cpp | 6 +- 8 files changed, 135 insertions(+), 178 deletions(-) diff --git a/src/database/single_node_ha/config.cpp b/src/database/single_node_ha/config.cpp index fa4930c1b..69d9585f8 100644 --- a/src/database/single_node_ha/config.cpp +++ b/src/database/single_node_ha/config.cpp @@ -24,16 +24,6 @@ DEFINE_string(properties_on_disk, "", "Property names of properties which will be stored on available " "disk. Property names have to be separated with comma (,)."); -// RPC flags. -DEFINE_VALIDATED_HIDDEN_int32( - rpc_num_client_workers, std::max(std::thread::hardware_concurrency(), 1U), - "Number of client workers (RPC)", - FLAG_IN_RANGE(1, std::numeric_limits::max())); -DEFINE_VALIDATED_HIDDEN_int32( - rpc_num_server_workers, std::max(std::thread::hardware_concurrency(), 1U), - "Number of server workers (RPC)", - FLAG_IN_RANGE(1, std::numeric_limits::max())); - // High availability. DEFINE_string( coordination_config_file, "coordination.json", @@ -55,11 +45,6 @@ database::Config::Config() query_execution_time_sec{FLAGS_query_execution_time_sec}, // Data location. properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")), - // RPC flags. - rpc_num_client_workers{ - static_cast(FLAGS_rpc_num_client_workers)}, - rpc_num_server_workers{ - static_cast(FLAGS_rpc_num_server_workers)}, // High availability. coordination_config_file{FLAGS_coordination_config_file}, raft_config_file{FLAGS_raft_config_file}, diff --git a/src/database/single_node_ha/config.hpp b/src/database/single_node_ha/config.hpp index 0c9d78aa9..ef123c24d 100644 --- a/src/database/single_node_ha/config.hpp +++ b/src/database/single_node_ha/config.hpp @@ -23,10 +23,6 @@ struct Config { // set of properties which will be stored on disk std::vector properties_on_disk; - // RPC flags. - uint16_t rpc_num_client_workers; - uint16_t rpc_num_server_workers; - // HA flags. std::string coordination_config_file; std::string raft_config_file; diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index afc14c786..60bbf804f 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -130,9 +130,8 @@ class GraphDb { std::unique_ptr storage_ = std::make_unique(config_.properties_on_disk); raft::Coordination coordination_{ - config_.rpc_num_server_workers, config_.rpc_num_client_workers, config_.server_id, - raft::Coordination::LoadFromFile(config_.coordination_config_file)}; + raft::LoadNodesFromFile(config_.coordination_config_file)}; raft::RaftServer raft_server_{ config_.server_id, config_.durability_directory, diff --git a/src/raft/coordination.cpp b/src/raft/coordination.cpp index 2192fd93e..746923809 100644 --- a/src/raft/coordination.cpp +++ b/src/raft/coordination.cpp @@ -1,9 +1,6 @@ #include "raft/coordination.hpp" -#include - -#include "glog/logging.h" -#include "json/json.hpp" +#include #include "utils/file.hpp" #include "utils/string.hpp" @@ -12,15 +9,60 @@ namespace raft { namespace fs = std::filesystem; +std::unordered_map LoadNodesFromFile( + const std::string &coordination_config_file) { + if (!fs::exists(coordination_config_file)) + throw RaftCoordinationConfigException("file (" + coordination_config_file + + ") doesn't exist"); + + std::unordered_map nodes; + nlohmann::json data; + try { + data = nlohmann::json::parse( + utils::Join(utils::ReadLines(coordination_config_file), "")); + } catch (const nlohmann::json::parse_error &e) { + throw RaftCoordinationConfigException("invalid json"); + } + + if (!data.is_array()) throw RaftCoordinationConfigException("not an array"); + + for (auto &it : data) { + if (!it.is_array()) + throw RaftCoordinationConfigException("element not an array"); + + if (it.size() != 3) + throw RaftCoordinationConfigException("invalid number of subelements"); + + if (!it[0].is_number_unsigned() || !it[1].is_string() || + !it[2].is_number_unsigned()) + throw RaftCoordinationConfigException("subelement data is invalid"); + + nodes[it[0]] = io::network::Endpoint{it[1], it[2]}; + } + + return nodes; +} + Coordination::Coordination( - uint16_t server_workers_count, uint16_t client_workers_count, - uint16_t worker_id, - std::unordered_map workers) - : server_(workers[worker_id], server_workers_count), - worker_id_(worker_id), - workers_(workers) { - for (const auto &worker : workers_) { - client_locks_[worker.first] = std::make_unique(); + uint16_t node_id, + std::unordered_map all_nodes) + : node_id_(node_id), + cluster_size_(all_nodes.size()), + server_(all_nodes[node_id], all_nodes.size() * 2) { + // Create all client elements. + endpoints_.resize(cluster_size_); + clients_.resize(cluster_size_); + client_locks_.resize(cluster_size_); + + // Initialize all client elements. + for (uint16_t i = 1; i <= cluster_size_; ++i) { + auto it = all_nodes.find(i); + if (it == all_nodes.end()) { + throw RaftCoordinationConfigException("missing endpoint for node " + + std::to_string(i)); + } + endpoints_[i - 1] = it->second; + client_locks_[i - 1] = std::make_unique(); } } @@ -28,59 +70,30 @@ Coordination::~Coordination() { CHECK(!alive_) << "You must call Shutdown and AwaitShutdown on Coordination!"; } -std::unordered_map Coordination::LoadFromFile( - const std::string &coordination_config_file) { - if (!fs::exists(coordination_config_file)) - throw RaftCoordinationConfigException(coordination_config_file); - - std::unordered_map workers; - nlohmann::json data; - try { - data = nlohmann::json::parse( - utils::Join(utils::ReadLines(coordination_config_file), "")); - } catch (const nlohmann::json::parse_error &e) { - throw RaftCoordinationConfigException(coordination_config_file); +std::vector Coordination::GetAllNodeIds() { + std::vector ret; + ret.reserve(cluster_size_); + for (uint16_t i = 1; i <= cluster_size_; ++i) { + ret.push_back(i); } + return ret; +} - if (!data.is_array()) - throw RaftCoordinationConfigException(coordination_config_file); - - for (auto &it : data) { - if (!it.is_array() || it.size() != 3) - throw RaftCoordinationConfigException(coordination_config_file); - - workers[it[0]] = io::network::Endpoint{it[1], it[2]}; +std::vector Coordination::GetOtherNodeIds() { + std::vector ret; + ret.reserve(cluster_size_ - 1); + for (uint16_t i = 1; i <= cluster_size_; ++i) { + if (i == node_id_) continue; + ret.push_back(i); } - - return workers; + return ret; } -io::network::Endpoint Coordination::GetEndpoint(int worker_id) { - std::lock_guard guard(lock_); - auto found = workers_.find(worker_id); - CHECK(found != workers_.end()) - << "No endpoint registered for worker id: " << worker_id; - return found->second; -} +uint16_t Coordination::GetAllNodeCount() { return cluster_size_; } -io::network::Endpoint Coordination::GetServerEndpoint() { - return server_.endpoint(); -} +uint16_t Coordination::GetOtherNodeCount() { return cluster_size_ - 1; } -std::vector Coordination::GetWorkerIds() { - std::lock_guard guard(lock_); - std::vector worker_ids; - for (auto worker : workers_) worker_ids.push_back(worker.first); - return worker_ids; -} - -uint16_t Coordination::WorkerCount() { return workers_.size(); } - -bool Coordination::Start() { - if (!server_.Start()) return false; - AddWorker(worker_id_, server_.endpoint()); - return true; -} +bool Coordination::Start() { return server_.Start(); } void Coordination::AwaitShutdown( std::function call_before_shutdown) { @@ -99,20 +112,4 @@ void Coordination::AwaitShutdown( void Coordination::Shutdown() { alive_.store(false); } -std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) { - std::lock_guard guard(lock_); - for (const auto &worker : workers_) { - if (worker.second == endpoint) { - return fmt::format("worker {} ({})", worker.first, worker.second); - } - } - return fmt::format("unknown worker ({})", endpoint); -} - -void Coordination::AddWorker(int worker_id, - const io::network::Endpoint &endpoint) { - std::lock_guard guard(lock_); - workers_.insert({worker_id, endpoint}); -} - } // namespace raft diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp index 85d9cdda1..3e211f204 100644 --- a/src/raft/coordination.hpp +++ b/src/raft/coordination.hpp @@ -3,45 +3,48 @@ #pragma once #include -#include #include #include #include #include #include -#include #include #include +#include + #include "communication/rpc/client.hpp" #include "communication/rpc/server.hpp" #include "io/network/endpoint.hpp" #include "raft/exceptions.hpp" -#include "utils/thread.hpp" namespace raft { -/// This class is responsible for coordination between workers (nodes) within -/// the Raft cluster. Its implementation is quite similar to coordination -/// in distributed Memgraph apart from slight modifications which align more -/// closely to Raft. +/// Loads raft cluster configuration from file. +/// +/// File format: +/// [[node_id, "node_address", node_port], ...] +std::unordered_map LoadNodesFromFile( + const std::string &coordination_config_file); + +/// This class is responsible for coordination between nodes within the Raft +/// cluster. Its implementation is quite similar to coordination in distributed +/// Memgraph apart from slight modifications which align more closely to Raft. /// /// It should be noted that, in the context of communication, all nodes within /// the Raft cluster are considered equivalent and are henceforth known simply -/// as workers. +/// as nodes. /// /// This class is thread safe. class Coordination final { public: /// Class constructor /// - /// @param server_workers_count Number of workers in RPC Server. - /// @param client_workers_count Number of workers in RPC Client. - /// @param worker_id ID of Raft worker (node) on this machine. - /// @param workers mapping from worker id to endpoint information. - Coordination(uint16_t server_workers_count, uint16_t client_workers_count, - uint16_t worker_id, - std::unordered_map workers); + /// @param node_id ID of Raft node on this machine. + /// @param node mapping from node_id to endpoint information (for the whole + /// cluster). + Coordination(uint16_t node_id, + std::unordered_map all_nodes); ~Coordination(); @@ -50,59 +53,46 @@ class Coordination final { Coordination &operator=(const Coordination &) = delete; Coordination &operator=(Coordination &&) = delete; - /// Gets the endpoint for the given `worker_id`. - io::network::Endpoint GetEndpoint(int worker_id); + /// Returns all node IDs. + std::vector GetAllNodeIds(); - /// Gets the endpoint for this RPC server. - io::network::Endpoint GetServerEndpoint(); + /// Returns other node IDs (excluding this node). + std::vector GetOtherNodeIds(); - /// Returns all workers ids. - std::vector GetWorkerIds(); + /// Returns total number of nodes. + uint16_t GetAllNodeCount(); - uint16_t WorkerCount(); + /// Returns number of other nodes. + uint16_t GetOtherNodeCount(); - /// Executes a RPC on another worker in the cluster. If the RPC execution + /// Executes a RPC on another node in the cluster. If the RPC execution /// fails (because of underlying network issues) it returns a `std::nullopt`. template - std::optional ExecuteOnOtherWorker( - uint16_t worker_id, Args &&... args) { - CHECK(worker_id != worker_id_) << "Trying to execute RPC on self!"; + std::optional ExecuteOnOtherNode( + uint16_t other_id, Args &&... args) { + CHECK(other_id != node_id_) << "Trying to execute RPC on self!"; + CHECK(other_id >= 1 && other_id <= cluster_size_) << "Invalid node id!"; - communication::rpc::Client *client = nullptr; - std::mutex *client_lock = nullptr; - { - std::lock_guard guard(lock_); + auto &lock = *client_locks_[other_id - 1].get(); + auto &client = clients_[other_id - 1]; - auto found = clients_.find(worker_id); - if (found != clients_.end()) { - client = &found->second; - } else { - auto found_endpoint = workers_.find(worker_id); - CHECK(found_endpoint != workers_.end()) - << "No endpoint registered for worker id: " << worker_id; - auto &endpoint = found_endpoint->second; - auto it = clients_.emplace(worker_id, endpoint); - client = &it.first->second; - } + std::lock_guard guard(lock); - auto lock_found = client_locks_.find(worker_id); - CHECK(lock_found != client_locks_.end()) - << "No client lock for worker id: " << worker_id; - client_lock = lock_found->second.get(); + if (!client) { + const auto &endpoint = endpoints_[other_id - 1]; + client = std::make_unique(endpoint); } try { - std::lock_guard guard(*client_lock); return client->Call(std::forward(args)...); } catch (...) { // Invalidate the client so that we reconnect next time. - std::lock_guard guard(lock_); - CHECK(clients_.erase(worker_id) == 1) - << "Couldn't remove client for worker id: " << worker_id; + client = nullptr; return std::nullopt; } } + /// Registers a RPC call on this node. template void Register(std::function< void(const typename TRequestResponse::Request::Capnp::Reader &, @@ -111,6 +101,7 @@ class Coordination final { server_.Register(callback); } + /// Registers an extended RPC call on this node. template void Register(std::function< void(const io::network::Endpoint &, @@ -120,35 +111,27 @@ class Coordination final { server_.Register(callback); } - static std::unordered_map LoadFromFile( - const std::string &coordination_config_file); - /// Starts the coordination and its servers. bool Start(); + /// Blocks until the coordination is shut down. Accepts a callback function + /// that is called to clean up all services that should be stopped before the + /// coordination. void AwaitShutdown(std::function call_before_shutdown); /// Hints that the coordination should start shutting down the whole cluster. void Shutdown(); - /// Gets a worker name for the given endpoint. - std::string GetWorkerName(const io::network::Endpoint &endpoint); - private: - /// Adds a worker to the coordination. This function can be called multiple - /// times to replace an existing worker. - void AddWorker(int worker_id, const io::network::Endpoint &endpoint); + uint16_t node_id_; + uint16_t cluster_size_; communication::rpc::Server server_; - uint16_t worker_id_; - mutable std::mutex lock_; - std::unordered_map workers_; + std::vector endpoints_; + std::vector> clients_; + std::vector> client_locks_; - std::unordered_map clients_; - std::unordered_map> client_locks_; - - // Flags used for shutdown. std::atomic alive_{true}; }; diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp index add00f93b..4dafa73f2 100644 --- a/src/raft/exceptions.hpp +++ b/src/raft/exceptions.hpp @@ -42,9 +42,9 @@ class RaftConfigException : public RaftException { class RaftCoordinationConfigException : public RaftException { public: using RaftException::RaftException; - explicit RaftCoordinationConfigException(const std::string &path) - : RaftException("Unable to parse raft coordination config file " + path) { - } + explicit RaftCoordinationConfigException(const std::string &msg) + : RaftException("Unable to parse raft coordination config file: " + msg + + "!") {} }; /// This exception should be thrown when a `RaftServer` instance attempts diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index bee2897f7..295edf6b9 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -75,7 +75,7 @@ void RaftServer::Start() { } // Peer state initialization - int cluster_size = coordination_->WorkerCount() + 1; + auto cluster_size = coordination_->GetAllNodeCount() + 1; next_index_.resize(cluster_size); match_index_.resize(cluster_size); next_heartbeat_.resize(cluster_size); @@ -299,8 +299,7 @@ void RaftServer::Start() { SetNextElectionTimePoint(); election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this); - for (const auto &peer_id : coordination_->GetWorkerIds()) { - if (peer_id == server_id_) continue; + for (auto peer_id : coordination_->GetOtherNodeIds()) { peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id); } @@ -574,7 +573,7 @@ void RaftServer::Transition(const Mode &new_mode) { SetVotedFor(server_id_); granted_votes_ = 1; - vote_requested_.assign(coordination_->WorkerCount(), false); + vote_requested_.assign(coordination_->GetAllNodeCount(), false); mode_ = Mode::CANDIDATE; @@ -602,7 +601,7 @@ void RaftServer::Transition(const Mode &new_mode) { // [Raft paper figure 2] // "For each server, index of the next log entry to send to that server // is initialized to leader's last log index + 1" - for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) { + for (int i = 1; i <= coordination_->GetAllNodeCount(); ++i) { next_index_[i] = log_size_; match_index_[i] = 0; } @@ -627,7 +626,7 @@ void RaftServer::AdvanceCommitIndex() { << "Commit index can only be advanced by the leader"; std::vector known_replication_indices; - for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) { + for (int i = 1; i <= coordination_->GetAllNodeCount(); ++i) { if (i != server_id_) known_replication_indices.push_back(match_index_[i]); else @@ -636,7 +635,7 @@ void RaftServer::AdvanceCommitIndex() { std::sort(known_replication_indices.begin(), known_replication_indices.end()); uint64_t new_commit_index = - known_replication_indices[(coordination_->WorkerCount() - 1) / 2]; + known_replication_indices[(coordination_->GetAllNodeCount() - 1) / 2]; // This can happen because we reset `match_index` vector to 0 after a // new leader has been elected. @@ -709,7 +708,7 @@ void RaftServer::SendLogEntries( // Execute the RPC. lock->unlock(); - auto reply = coordination_->ExecuteOnOtherWorker( + auto reply = coordination_->ExecuteOnOtherNode( peer_id, server_id, commit_index, request_term, request_prev_log_index, request_prev_log_term, request_entries); lock->lock(); @@ -784,7 +783,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id, // Execute the RPC. lock->unlock(); - auto reply = coordination_->ExecuteOnOtherWorker( + auto reply = coordination_->ExecuteOnOtherNode( peer_id, server_id, request_term, snapshot_metadata, std::move(snapshot), snapshot_size); lock->lock(); @@ -868,7 +867,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { // Execute the RPC. lock.unlock(); // Release lock while waiting for response - auto reply = coordination_->ExecuteOnOtherWorker( + auto reply = coordination_->ExecuteOnOtherNode( peer_id, server_id, request_term, last_entry_data.first, last_entry_data.second); lock.lock(); @@ -1031,7 +1030,7 @@ void RaftServer::SetNextElectionTimePoint() { } bool RaftServer::HasMajorityVote() { - if (2 * granted_votes_ > coordination_->WorkerCount()) { + if (2 * granted_votes_ > coordination_->GetAllNodeCount()) { VLOG(40) << "Server " << server_id_ << ": Obtained majority vote (Term: " << current_term_ << ")"; return true; diff --git a/src/raft/storage_info.cpp b/src/raft/storage_info.cpp index 9643f8c61..000584458 100644 --- a/src/raft/storage_info.cpp +++ b/src/raft/storage_info.cpp @@ -55,13 +55,11 @@ std::map>> StorageInfo::GetStorageInfo() const { std::map>> info; - auto peers = coordination_->GetWorkerIds(); - - for (auto id : peers) { + for (auto id : coordination_->GetAllNodeIds()) { if (id == server_id_) { info.emplace(std::to_string(id), GetLocalStorageInfo()); } else { - auto reply = coordination_->ExecuteOnOtherWorker(id); + auto reply = coordination_->ExecuteOnOtherNode(id); if (reply) { info[std::to_string(id)] = std::move(reply->storage_info); } else {