Refactor HA coordination

Reviewers: msantl, ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1992
Matej Ferencevic 2019-04-26 16:06:15 +02:00
parent bc46de7b33
commit c5bf7b3c03
8 changed files with 135 additions and 178 deletions


@@ -24,16 +24,6 @@ DEFINE_string(properties_on_disk, "",
"Property names of properties which will be stored on available "
"disk. Property names have to be separated with comma (,).");
// RPC flags.
DEFINE_VALIDATED_HIDDEN_int32(
rpc_num_client_workers, std::max(std::thread::hardware_concurrency(), 1U),
"Number of client workers (RPC)",
FLAG_IN_RANGE(1, std::numeric_limits<uint16_t>::max()));
DEFINE_VALIDATED_HIDDEN_int32(
rpc_num_server_workers, std::max(std::thread::hardware_concurrency(), 1U),
"Number of server workers (RPC)",
FLAG_IN_RANGE(1, std::numeric_limits<uint16_t>::max()));
// High availability.
DEFINE_string(
coordination_config_file, "coordination.json",
@@ -55,11 +45,6 @@ database::Config::Config()
query_execution_time_sec{FLAGS_query_execution_time_sec},
// Data location.
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")),
// RPC flags.
rpc_num_client_workers{
static_cast<uint16_t>(FLAGS_rpc_num_client_workers)},
rpc_num_server_workers{
static_cast<uint16_t>(FLAGS_rpc_num_server_workers)},
// High availability.
coordination_config_file{FLAGS_coordination_config_file},
raft_config_file{FLAGS_raft_config_file},


@@ -23,10 +23,6 @@ struct Config {
// set of properties which will be stored on disk
std::vector<std::string> properties_on_disk;
// RPC flags.
uint16_t rpc_num_client_workers;
uint16_t rpc_num_server_workers;
// HA flags.
std::string coordination_config_file;
std::string raft_config_file;


@@ -130,9 +130,8 @@ class GraphDb {
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.properties_on_disk);
raft::Coordination coordination_{
config_.rpc_num_server_workers, config_.rpc_num_client_workers,
config_.server_id,
raft::Coordination::LoadFromFile(config_.coordination_config_file)};
raft::LoadNodesFromFile(config_.coordination_config_file)};
raft::RaftServer raft_server_{
config_.server_id,
config_.durability_directory,
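
The member initializers above are the core of the refactor: GraphDb no longer
forwards RPC worker-count flags, it only hands Coordination the node id and the
node map produced by raft::LoadNodesFromFile. A minimal standalone sketch of
the same wiring (the file name and node id are illustrative, not taken from
this commit):

    #include <cstdint>

    #include "raft/coordination.hpp"

    void WireCoordination() {
      const uint16_t node_id = 1;  // this machine's --server-id
      // One call replaces the old rpc_num_{client,server}_workers flags; the
      // RPC server thread count is now derived from the cluster size.
      raft::Coordination coordination{
          node_id, raft::LoadNodesFromFile("coordination.json")};
      coordination.Start();
      // ... register RPC handlers, run the Raft server ...
      coordination.Shutdown();
      // Both Shutdown and AwaitShutdown must run before destruction (the
      // destructor CHECKs this).
      coordination.AwaitShutdown([] {});
    }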


@@ -1,9 +1,6 @@
#include "raft/coordination.hpp"
#include <thread>
#include "glog/logging.h"
#include "json/json.hpp"
#include <json/json.hpp>
#include "utils/file.hpp"
#include "utils/string.hpp"
@@ -12,15 +9,60 @@ namespace raft {
namespace fs = std::filesystem;
std::unordered_map<uint16_t, io::network::Endpoint> LoadNodesFromFile(
const std::string &coordination_config_file) {
if (!fs::exists(coordination_config_file))
throw RaftCoordinationConfigException("file (" + coordination_config_file +
") doesn't exist");
std::unordered_map<uint16_t, io::network::Endpoint> nodes;
nlohmann::json data;
try {
data = nlohmann::json::parse(
utils::Join(utils::ReadLines(coordination_config_file), ""));
} catch (const nlohmann::json::parse_error &e) {
throw RaftCoordinationConfigException("invalid json");
}
if (!data.is_array()) throw RaftCoordinationConfigException("not an array");
for (auto &it : data) {
if (!it.is_array())
throw RaftCoordinationConfigException("element not an array");
if (it.size() != 3)
throw RaftCoordinationConfigException("invalid number of subelements");
if (!it[0].is_number_unsigned() || !it[1].is_string() ||
!it[2].is_number_unsigned())
throw RaftCoordinationConfigException("subelement data is invalid");
nodes[it[0]] = io::network::Endpoint{it[1], it[2]};
}
return nodes;
}
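
For reference, a self-contained sketch of input that the parser above accepts:
a JSON array of [node_id, "node_address", node_port] triples, as documented in
coordination.hpp. The file name and endpoints are made up for the example:

    #include <fstream>

    #include "raft/coordination.hpp"

    void LoadNodesExample() {
      // Three-node cluster on localhost; each element is
      // [node_id, "node_address", node_port].
      std::ofstream("example_coordination.json")
          << R"([[1, "127.0.0.1", 10000],
                 [2, "127.0.0.1", 10001],
                 [3, "127.0.0.1", 10002]])";
      // Returns a map from node id (1..3) to io::network::Endpoint; malformed
      // input throws RaftCoordinationConfigException instead.
      auto nodes = raft::LoadNodesFromFile("example_coordination.json");
    }
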
Coordination::Coordination(
uint16_t server_workers_count, uint16_t client_workers_count,
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers)
: server_(workers[worker_id], server_workers_count),
worker_id_(worker_id),
workers_(workers) {
for (const auto &worker : workers_) {
client_locks_[worker.first] = std::make_unique<std::mutex>();
uint16_t node_id,
std::unordered_map<uint16_t, io::network::Endpoint> all_nodes)
: node_id_(node_id),
cluster_size_(all_nodes.size()),
server_(all_nodes[node_id], all_nodes.size() * 2) {
// Create all client elements.
endpoints_.resize(cluster_size_);
clients_.resize(cluster_size_);
client_locks_.resize(cluster_size_);
// Initialize all client elements.
for (uint16_t i = 1; i <= cluster_size_; ++i) {
auto it = all_nodes.find(i);
if (it == all_nodes.end()) {
throw RaftCoordinationConfigException("missing endpoint for node " +
std::to_string(i));
}
endpoints_[i - 1] = it->second;
client_locks_[i - 1] = std::make_unique<std::mutex>();
}
}
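
One consequence of the vector-based client bookkeeping above: node ids must
form the contiguous range 1..cluster_size, otherwise the constructor throws.
A hedged sketch (the endpoints are placeholders):

    #include <cstdint>
    #include <unordered_map>

    #include "io/network/endpoint.hpp"
    #include "raft/coordination.hpp"
    #include "raft/exceptions.hpp"

    void ContiguousIdsExample() {
      std::unordered_map<uint16_t, io::network::Endpoint> nodes{
          {1, io::network::Endpoint{"127.0.0.1", 10000}},
          {2, io::network::Endpoint{"127.0.0.1", 10001}},
          {4, io::network::Endpoint{"127.0.0.1", 10003}}};
      // cluster_size_ becomes 3, but there is no endpoint for node 3.
      try {
        raft::Coordination coordination{1, nodes};
      } catch (const raft::RaftCoordinationConfigException &) {
        // "missing endpoint for node 3"
      }
    }
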
@@ -28,59 +70,30 @@ Coordination::~Coordination() {
CHECK(!alive_) << "You must call Shutdown and AwaitShutdown on Coordination!";
}
std::unordered_map<uint16_t, io::network::Endpoint> Coordination::LoadFromFile(
const std::string &coordination_config_file) {
if (!fs::exists(coordination_config_file))
throw RaftCoordinationConfigException(coordination_config_file);
std::unordered_map<uint16_t, io::network::Endpoint> workers;
nlohmann::json data;
try {
data = nlohmann::json::parse(
utils::Join(utils::ReadLines(coordination_config_file), ""));
} catch (const nlohmann::json::parse_error &e) {
throw RaftCoordinationConfigException(coordination_config_file);
std::vector<uint16_t> Coordination::GetAllNodeIds() {
std::vector<uint16_t> ret;
ret.reserve(cluster_size_);
for (uint16_t i = 1; i <= cluster_size_; ++i) {
ret.push_back(i);
}
return ret;
}
if (!data.is_array())
throw RaftCoordinationConfigException(coordination_config_file);
for (auto &it : data) {
if (!it.is_array() || it.size() != 3)
throw RaftCoordinationConfigException(coordination_config_file);
workers[it[0]] = io::network::Endpoint{it[1], it[2]};
std::vector<uint16_t> Coordination::GetOtherNodeIds() {
std::vector<uint16_t> ret;
ret.reserve(cluster_size_ - 1);
for (uint16_t i = 1; i <= cluster_size_; ++i) {
if (i == node_id_) continue;
ret.push_back(i);
}
return workers;
return ret;
}
io::network::Endpoint Coordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
auto found = workers_.find(worker_id);
CHECK(found != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
return found->second;
}
uint16_t Coordination::GetAllNodeCount() { return cluster_size_; }
io::network::Endpoint Coordination::GetServerEndpoint() {
return server_.endpoint();
}
uint16_t Coordination::GetOtherNodeCount() { return cluster_size_ - 1; }
std::vector<int> Coordination::GetWorkerIds() {
std::lock_guard<std::mutex> guard(lock_);
std::vector<int> worker_ids;
for (auto worker : workers_) worker_ids.push_back(worker.first);
return worker_ids;
}
uint16_t Coordination::WorkerCount() { return workers_.size(); }
bool Coordination::Start() {
if (!server_.Start()) return false;
AddWorker(worker_id_, server_.endpoint());
return true;
}
bool Coordination::Start() { return server_.Start(); }
void Coordination::AwaitShutdown(
std::function<void(void)> call_before_shutdown) {
@@ -99,20 +112,4 @@ void Coordination::AwaitShutdown(
void Coordination::Shutdown() { alive_.store(false); }
std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
for (const auto &worker : workers_) {
if (worker.second == endpoint) {
return fmt::format("worker {} ({})", worker.first, worker.second);
}
}
return fmt::format("unknown worker ({})", endpoint);
}
void Coordination::AddWorker(int worker_id,
const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
workers_.insert({worker_id, endpoint});
}
} // namespace raft


@@ -3,45 +3,48 @@
#pragma once
#include <atomic>
#include <filesystem>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include "communication/rpc/client.hpp"
#include "communication/rpc/server.hpp"
#include "io/network/endpoint.hpp"
#include "raft/exceptions.hpp"
#include "utils/thread.hpp"
namespace raft {
/// This class is responsible for coordination between workers (nodes) within
/// the Raft cluster. Its implementation is quite similar to coordination
/// in distributed Memgraph apart from slight modifications which align more
/// closely to Raft.
/// Loads raft cluster configuration from file.
///
/// File format:
/// [[node_id, "node_address", node_port], ...]
std::unordered_map<uint16_t, io::network::Endpoint> LoadNodesFromFile(
const std::string &coordination_config_file);
/// This class is responsible for coordination between nodes within the Raft
/// cluster. Its implementation is quite similar to coordination in distributed
/// Memgraph apart from slight modifications which align more closely to Raft.
///
/// It should be noted that, in the context of communication, all nodes within
/// the Raft cluster are considered equivalent and are henceforth known simply
/// as workers.
/// as nodes.
///
/// This class is thread safe.
class Coordination final {
public:
/// Class constructor
///
/// @param server_workers_count Number of workers in RPC Server.
/// @param client_workers_count Number of workers in RPC Client.
/// @param worker_id ID of Raft worker (node) on this machine.
/// @param workers mapping from worker id to endpoint information.
Coordination(uint16_t server_workers_count, uint16_t client_workers_count,
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers);
/// @param node_id ID of Raft node on this machine.
/// @param all_nodes mapping from node_id to endpoint information (for the
/// whole cluster).
Coordination(uint16_t node_id,
std::unordered_map<uint16_t, io::network::Endpoint> all_nodes);
~Coordination();
@@ -50,59 +53,46 @@ class Coordination final {
Coordination &operator=(const Coordination &) = delete;
Coordination &operator=(Coordination &&) = delete;
/// Gets the endpoint for the given `worker_id`.
io::network::Endpoint GetEndpoint(int worker_id);
/// Returns all node IDs.
std::vector<uint16_t> GetAllNodeIds();
/// Gets the endpoint for this RPC server.
io::network::Endpoint GetServerEndpoint();
/// Returns other node IDs (excluding this node).
std::vector<uint16_t> GetOtherNodeIds();
/// Returns all workers ids.
std::vector<int> GetWorkerIds();
/// Returns total number of nodes.
uint16_t GetAllNodeCount();
uint16_t WorkerCount();
/// Returns number of other nodes.
uint16_t GetOtherNodeCount();
/// Executes an RPC on another worker in the cluster. If the RPC execution
/// Executes an RPC on another node in the cluster. If the RPC execution
/// fails (because of underlying network issues) it returns a `std::nullopt`.
template <class TRequestResponse, class... Args>
std::optional<typename TRequestResponse::Response> ExecuteOnOtherWorker(
uint16_t worker_id, Args &&... args) {
CHECK(worker_id != worker_id_) << "Trying to execute RPC on self!";
std::optional<typename TRequestResponse::Response> ExecuteOnOtherNode(
uint16_t other_id, Args &&... args) {
CHECK(other_id != node_id_) << "Trying to execute RPC on self!";
CHECK(other_id >= 1 && other_id <= cluster_size_) << "Invalid node id!";
communication::rpc::Client *client = nullptr;
std::mutex *client_lock = nullptr;
{
std::lock_guard<std::mutex> guard(lock_);
auto &lock = *client_locks_[other_id - 1].get();
auto &client = clients_[other_id - 1];
auto found = clients_.find(worker_id);
if (found != clients_.end()) {
client = &found->second;
} else {
auto found_endpoint = workers_.find(worker_id);
CHECK(found_endpoint != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
auto &endpoint = found_endpoint->second;
auto it = clients_.emplace(worker_id, endpoint);
client = &it.first->second;
}
std::lock_guard<std::mutex> guard(lock);
auto lock_found = client_locks_.find(worker_id);
CHECK(lock_found != client_locks_.end())
<< "No client lock for worker id: " << worker_id;
client_lock = lock_found->second.get();
if (!client) {
const auto &endpoint = endpoints_[other_id - 1];
client = std::make_unique<communication::rpc::Client>(endpoint);
}
try {
std::lock_guard<std::mutex> guard(*client_lock);
return client->Call<TRequestResponse>(std::forward<Args>(args)...);
} catch (...) {
// Invalidate the client so that we reconnect next time.
std::lock_guard<std::mutex> guard(lock_);
CHECK(clients_.erase(worker_id) == 1)
<< "Couldn't remove client for worker id: " << worker_id;
client = nullptr;
return std::nullopt;
}
}
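
A hedged usage sketch of the renamed API: iterate GetOtherNodeIds and call
ExecuteOnOtherNode, treating std::nullopt as a per-node network failure (the
client is dropped and re-created lazily on the next call). The broadcast
helper itself is illustrative and not part of this commit; real callers pass
concrete RPC types such as AppendEntriesRpc or RequestVoteRpc:

    #include "raft/coordination.hpp"

    // Sends one RPC to every other node in the cluster and reports how many
    // of them replied successfully.
    template <class TRequestResponse, class... Args>
    int BroadcastToOthers(raft::Coordination *coordination,
                          const Args &... args) {
      int replies = 0;
      for (auto node_id : coordination->GetOtherNodeIds()) {
        // std::nullopt means the RPC to that node failed.
        if (coordination->ExecuteOnOtherNode<TRequestResponse>(node_id, args...))
          ++replies;
      }
      return replies;
    }
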
/// Registers an RPC call on this node.
template <class TRequestResponse>
void Register(std::function<
void(const typename TRequestResponse::Request::Capnp::Reader &,
@@ -111,6 +101,7 @@ class Coordination final {
server_.Register<TRequestResponse>(callback);
}
/// Registers an extended RPC call on this node.
template <class TRequestResponse>
void Register(std::function<
void(const io::network::Endpoint &,
@@ -120,35 +111,27 @@ class Coordination final {
server_.Register<TRequestResponse>(callback);
}
static std::unordered_map<uint16_t, io::network::Endpoint> LoadFromFile(
const std::string &coordination_config_file);
/// Starts the coordination and its servers.
bool Start();
/// Blocks until the coordination is shut down. Accepts a callback function
/// that is called to clean up all services that should be stopped before the
/// coordination.
void AwaitShutdown(std::function<void(void)> call_before_shutdown);
/// Hints that the coordination should start shutting down the whole cluster.
void Shutdown();
/// Gets a worker name for the given endpoint.
std::string GetWorkerName(const io::network::Endpoint &endpoint);
private:
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
void AddWorker(int worker_id, const io::network::Endpoint &endpoint);
uint16_t node_id_;
uint16_t cluster_size_;
communication::rpc::Server server_;
uint16_t worker_id_;
mutable std::mutex lock_;
std::unordered_map<uint16_t, io::network::Endpoint> workers_;
std::vector<io::network::Endpoint> endpoints_;
std::vector<std::unique_ptr<communication::rpc::Client>> clients_;
std::vector<std::unique_ptr<std::mutex>> client_locks_;
std::unordered_map<uint16_t, communication::rpc::Client> clients_;
std::unordered_map<uint16_t, std::unique_ptr<std::mutex>> client_locks_;
// Flags used for shutdown.
std::atomic<bool> alive_{true};
};


@@ -42,9 +42,9 @@ class RaftConfigException : public RaftException {
class RaftCoordinationConfigException : public RaftException {
public:
using RaftException::RaftException;
explicit RaftCoordinationConfigException(const std::string &path)
: RaftException("Unable to parse raft coordination config file " + path) {
}
explicit RaftCoordinationConfigException(const std::string &msg)
: RaftException("Unable to parse raft coordination config file: " + msg +
"!") {}
};
/// This exception should be thrown when a `RaftServer` instance attempts


@@ -75,7 +75,7 @@ void RaftServer::Start() {
}
// Peer state initialization
int cluster_size = coordination_->WorkerCount() + 1;
auto cluster_size = coordination_->GetAllNodeCount() + 1;
next_index_.resize(cluster_size);
match_index_.resize(cluster_size);
next_heartbeat_.resize(cluster_size);
@@ -299,8 +299,7 @@ void RaftServer::Start() {
SetNextElectionTimePoint();
election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this);
for (const auto &peer_id : coordination_->GetWorkerIds()) {
if (peer_id == server_id_) continue;
for (auto peer_id : coordination_->GetOtherNodeIds()) {
peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id);
}
@@ -574,7 +573,7 @@ void RaftServer::Transition(const Mode &new_mode) {
SetVotedFor(server_id_);
granted_votes_ = 1;
vote_requested_.assign(coordination_->WorkerCount(), false);
vote_requested_.assign(coordination_->GetAllNodeCount(), false);
mode_ = Mode::CANDIDATE;
@@ -602,7 +601,7 @@ void RaftServer::Transition(const Mode &new_mode) {
// [Raft paper figure 2]
// "For each server, index of the next log entry to send to that server
// is initialized to leader's last log index + 1"
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
for (int i = 1; i <= coordination_->GetAllNodeCount(); ++i) {
next_index_[i] = log_size_;
match_index_[i] = 0;
}
@@ -627,7 +626,7 @@ void RaftServer::AdvanceCommitIndex() {
<< "Commit index can only be advanced by the leader";
std::vector<uint64_t> known_replication_indices;
for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
for (int i = 1; i <= coordination_->GetAllNodeCount(); ++i) {
if (i != server_id_)
known_replication_indices.push_back(match_index_[i]);
else
@@ -636,7 +635,7 @@ void RaftServer::AdvanceCommitIndex() {
std::sort(known_replication_indices.begin(), known_replication_indices.end());
uint64_t new_commit_index =
known_replication_indices[(coordination_->WorkerCount() - 1) / 2];
known_replication_indices[(coordination_->GetAllNodeCount() - 1) / 2];
// This can happen because we reset `match_index` vector to 0 after a
// new leader has been elected.
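
The subscript above implements the Raft commit rule: after sorting the known
replication indices, element (cluster size - 1) / 2 is the largest log index
already replicated on a majority of nodes. A small worked sketch of that
arithmetic for a five-node cluster:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    uint64_t MajorityReplicatedIndex(std::vector<uint64_t> known) {
      std::sort(known.begin(), known.end());
      // With 5 entries, (5 - 1) / 2 == 2 picks the third smallest value, so
      // at least 3 of the 5 nodes have replicated the log up to that index.
      return known[(known.size() - 1) / 2];
    }

    // Example: {7, 4, 9, 4, 8} sorts to {4, 4, 7, 8, 9}; the result is 7, and
    // the nodes reporting 7, 8 and 9 form a majority.
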
@@ -709,7 +708,7 @@ void RaftServer::SendLogEntries(
// Execute the RPC.
lock->unlock();
auto reply = coordination_->ExecuteOnOtherWorker<AppendEntriesRpc>(
auto reply = coordination_->ExecuteOnOtherNode<AppendEntriesRpc>(
peer_id, server_id, commit_index, request_term, request_prev_log_index,
request_prev_log_term, request_entries);
lock->lock();
@@ -784,7 +783,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
// Execute the RPC.
lock->unlock();
auto reply = coordination_->ExecuteOnOtherWorker<InstallSnapshotRpc>(
auto reply = coordination_->ExecuteOnOtherNode<InstallSnapshotRpc>(
peer_id, server_id, request_term, snapshot_metadata, std::move(snapshot),
snapshot_size);
lock->lock();
@@ -868,7 +867,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
// Execute the RPC.
lock.unlock(); // Release lock while waiting for response
auto reply = coordination_->ExecuteOnOtherWorker<RequestVoteRpc>(
auto reply = coordination_->ExecuteOnOtherNode<RequestVoteRpc>(
peer_id, server_id, request_term, last_entry_data.first,
last_entry_data.second);
lock.lock();
@@ -1031,7 +1030,7 @@ void RaftServer::SetNextElectionTimePoint() {
}
bool RaftServer::HasMajorityVote() {
if (2 * granted_votes_ > coordination_->WorkerCount()) {
if (2 * granted_votes_ > coordination_->GetAllNodeCount()) {
VLOG(40) << "Server " << server_id_
<< ": Obtained majority vote (Term: " << current_term_ << ")";
return true;


@@ -55,13 +55,11 @@ std::map<std::string, std::vector<std::pair<std::string, std::string>>>
StorageInfo::GetStorageInfo() const {
std::map<std::string, std::vector<std::pair<std::string, std::string>>> info;
auto peers = coordination_->GetWorkerIds();
for (auto id : peers) {
for (auto id : coordination_->GetAllNodeIds()) {
if (id == server_id_) {
info.emplace(std::to_string(id), GetLocalStorageInfo());
} else {
auto reply = coordination_->ExecuteOnOtherWorker<StorageInfoRpc>(id);
auto reply = coordination_->ExecuteOnOtherNode<StorageInfoRpc>(id);
if (reply) {
info[std::to_string(id)] = std::move(reply->storage_info);
} else {