Parse raft config
Summary: Added command line parameters to specify rpc flags, raft and coordination config files and current server id. Reviewers: ipaljak, teon.banek Reviewed By: ipaljak, teon.banek Subscribers: pullbot, teon.banek Differential Revision: https://phabricator.memgraph.io/D1742
This commit is contained in:
parent
6dbe054bdc
commit
dd6fe013dc
@ -36,6 +36,28 @@ DEFINE_bool(synchronous_commit, false,
|
||||
"Should a transaction end wait for WAL records to be written to "
|
||||
"disk before the transaction finishes.");
|
||||
|
||||
// RPC flags.
|
||||
DEFINE_VALIDATED_HIDDEN_int32(
|
||||
rpc_num_client_workers, std::max(std::thread::hardware_concurrency(), 1U),
|
||||
"Number of client workers (RPC)",
|
||||
FLAG_IN_RANGE(1, std::numeric_limits<uint16_t>::max()));
|
||||
DEFINE_VALIDATED_HIDDEN_int32(
|
||||
rpc_num_server_workers, std::max(std::thread::hardware_concurrency(), 1U),
|
||||
"Number of server workers (RPC)",
|
||||
FLAG_IN_RANGE(1, std::numeric_limits<uint16_t>::max()));
|
||||
|
||||
// High availability.
|
||||
DEFINE_string(
|
||||
coordination_config_file, "coordination.json",
|
||||
"Path to the file containing coordination configuration in JSON format");
|
||||
|
||||
DEFINE_string(raft_config_file, "raft.json",
|
||||
"Path to the file containing raft configuration in JSON format");
|
||||
|
||||
DEFINE_VALIDATED_int32(
|
||||
server_id, 1U, "Id used in the coordination configuration for this machine",
|
||||
FLAG_IN_RANGE(1, std::numeric_limits<uint16_t>::max()));
|
||||
|
||||
database::Config::Config()
|
||||
// Durability flags.
|
||||
: durability_enabled{FLAGS_durability_enabled},
|
||||
@ -49,4 +71,13 @@ database::Config::Config()
|
||||
gc_cycle_sec{FLAGS_gc_cycle_sec},
|
||||
query_execution_time_sec{FLAGS_query_execution_time_sec},
|
||||
// Data location.
|
||||
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")) {}
|
||||
properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")),
|
||||
// RPC flags.
|
||||
rpc_num_client_workers{
|
||||
static_cast<uint16_t>(FLAGS_rpc_num_client_workers)},
|
||||
rpc_num_server_workers{
|
||||
static_cast<uint16_t>(FLAGS_rpc_num_server_workers)},
|
||||
// High availability.
|
||||
coordination_config_file{FLAGS_coordination_config_file},
|
||||
raft_config_file{FLAGS_raft_config_file},
|
||||
server_id{static_cast<uint16_t>(FLAGS_server_id)} {}
|
||||
|
@ -10,6 +10,8 @@
|
||||
#include "durability/single_node_ha/recovery.hpp"
|
||||
#include "durability/single_node_ha/wal.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "raft/coordination.hpp"
|
||||
#include "raft/raft_server.hpp"
|
||||
#include "storage/common/types/types.hpp"
|
||||
#include "storage/single_node_ha/concurrent_id_mapper.hpp"
|
||||
#include "storage/single_node_ha/storage.hpp"
|
||||
@ -27,7 +29,7 @@ struct Stat {
|
||||
|
||||
/// Vertex count is number of `VersionList<Vertex>` physically stored.
|
||||
std::atomic<int64_t> vertex_count{0};
|
||||
|
||||
|
||||
/// Vertex count is number of `VersionList<Edge>` physically stored.
|
||||
std::atomic<int64_t> edge_count{0};
|
||||
|
||||
@ -55,6 +57,15 @@ struct Config {
|
||||
|
||||
// set of properties which will be stored on disk
|
||||
std::vector<std::string> properties_on_disk;
|
||||
|
||||
// RPC flags.
|
||||
uint16_t rpc_num_client_workers;
|
||||
uint16_t rpc_num_server_workers;
|
||||
|
||||
// HA flags.
|
||||
std::string coordination_config_file;
|
||||
std::string raft_config_file;
|
||||
uint16_t server_id;
|
||||
};
|
||||
|
||||
class GraphDbAccessor;
|
||||
@ -132,7 +143,7 @@ class GraphDb {
|
||||
if (vertex_count != 0) {
|
||||
stat_.avg_degree = 2 * static_cast<double>(edge_count) / vertex_count;
|
||||
} else {
|
||||
stat_.avg_degree = 0;
|
||||
stat_.avg_degree = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -151,6 +162,13 @@ class GraphDb {
|
||||
config_.durability_enabled,
|
||||
config_.synchronous_commit};
|
||||
|
||||
raft::Coordination coordination_{
|
||||
config_.rpc_num_server_workers, config_.rpc_num_client_workers,
|
||||
config_.server_id,
|
||||
raft::Coordination::LoadFromFile(config_.coordination_config_file)};
|
||||
raft::RaftServer raft_server_{
|
||||
config_.server_id, config_.durability_directory,
|
||||
raft::Config::LoadFromFile(config_.raft_config_file), &coordination_};
|
||||
tx::Engine tx_engine_{&wal_};
|
||||
std::unique_ptr<StorageGc> storage_gc_ =
|
||||
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);
|
||||
|
@ -4,16 +4,50 @@
|
||||
|
||||
#include <chrono>
|
||||
#include <experimental/filesystem>
|
||||
#include <ratio>
|
||||
|
||||
#include <json/json.hpp>
|
||||
|
||||
#include "raft/exceptions.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
namespace raft {
|
||||
|
||||
/// Configurable Raft parameters.
|
||||
struct Config {
|
||||
std::experimental::filesystem::path disk_storage_path;
|
||||
std::chrono::milliseconds leader_timeout_min;
|
||||
std::chrono::milliseconds leader_timeout_max;
|
||||
std::chrono::milliseconds heartbeat_interval;
|
||||
std::chrono::milliseconds replicate_timeout;
|
||||
|
||||
static Config LoadFromFile(const std::string &raft_config_file) {
|
||||
if (!std::experimental::filesystem::exists(raft_config_file))
|
||||
throw RaftConfigException(raft_config_file);
|
||||
|
||||
nlohmann::json data;
|
||||
try {
|
||||
data = nlohmann::json::parse(
|
||||
utils::Join(utils::ReadLines(raft_config_file), ""));
|
||||
} catch (const nlohmann::json::parse_error &e) {
|
||||
throw RaftConfigException(raft_config_file);
|
||||
}
|
||||
|
||||
if (!data.is_object()) throw RaftConfigException(raft_config_file);
|
||||
if (!data["leader_timeout_min"].is_number())
|
||||
throw RaftConfigException(raft_config_file);
|
||||
if (!data["leader_timeout_max"].is_number())
|
||||
throw RaftConfigException(raft_config_file);
|
||||
if (!data["heartbeat_interval"].is_number())
|
||||
throw RaftConfigException(raft_config_file);
|
||||
if (!data["replicate_timeout"].is_number())
|
||||
throw RaftConfigException(raft_config_file);
|
||||
|
||||
return Config{std::chrono::duration<int64_t>(data["leader_timeout_min"]),
|
||||
std::chrono::duration<int64_t>(data["leader_timeout_max"]),
|
||||
std::chrono::duration<int64_t>(data["heartbeat_interval"]),
|
||||
std::chrono::duration<int64_t>(data["replicate_timeout"])};
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace raft
|
||||
|
@ -1,20 +1,52 @@
|
||||
#include "glog/logging.h"
|
||||
#include "raft/coordination.hpp"
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "raft/coordination.hpp"
|
||||
#include "glog/logging.h"
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
namespace raft {
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
Coordination::Coordination(
|
||||
uint16_t server_workers_count, uint16_t client_workers_count,
|
||||
uint16_t worker_id,
|
||||
std::unordered_map<uint16_t, io::network::Endpoint> workers)
|
||||
: server_(workers[worker_id], server_workers_count),
|
||||
workers_(workers),
|
||||
worker_id_(worker_id),
|
||||
thread_pool_(client_workers_count, "RPC client") {}
|
||||
|
||||
std::unordered_map<uint16_t, io::network::Endpoint> Coordination::LoadFromFile(
|
||||
const std::string &coordination_config_file) {
|
||||
if (!fs::exists(coordination_config_file))
|
||||
throw RaftCoordinationConfigException(coordination_config_file);
|
||||
|
||||
std::unordered_map<uint16_t, io::network::Endpoint> workers;
|
||||
nlohmann::json data;
|
||||
try {
|
||||
data = nlohmann::json::parse(
|
||||
utils::Join(utils::ReadLines(coordination_config_file), ""));
|
||||
} catch (const nlohmann::json::parse_error &e) {
|
||||
throw RaftCoordinationConfigException(coordination_config_file);
|
||||
}
|
||||
|
||||
if (!data.is_array())
|
||||
throw RaftCoordinationConfigException(coordination_config_file);
|
||||
|
||||
for (auto &it : data) {
|
||||
if (!it.is_array() || it.size() != 3)
|
||||
throw RaftCoordinationConfigException(coordination_config_file);
|
||||
|
||||
workers[it[0]] = io::network::Endpoint{it[1], it[2]};
|
||||
}
|
||||
|
||||
return workers;
|
||||
}
|
||||
|
||||
io::network::Endpoint Coordination::GetEndpoint(int worker_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
auto found = workers_.find(worker_id);
|
||||
@ -52,11 +84,7 @@ std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
for (const auto &worker : workers_) {
|
||||
if (worker.second == endpoint) {
|
||||
if (worker.first == 0) {
|
||||
return fmt::format("master ({})", worker.second);
|
||||
} else {
|
||||
return fmt::format("worker {} ({})", worker.first, worker.second);
|
||||
}
|
||||
return fmt::format("worker {} ({})", worker.first, worker.second);
|
||||
}
|
||||
}
|
||||
return fmt::format("unknown worker ({})", endpoint);
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <experimental/filesystem>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
@ -12,6 +13,7 @@
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "raft/exceptions.hpp"
|
||||
#include "utils/future.hpp"
|
||||
#include "utils/thread.hpp"
|
||||
|
||||
@ -29,21 +31,20 @@ namespace raft {
|
||||
*
|
||||
* This class is thread safe.
|
||||
*/
|
||||
class Coordination {
|
||||
protected:
|
||||
class Coordination final {
|
||||
public:
|
||||
/**
|
||||
* Class constructor
|
||||
*
|
||||
* @param server_workers_count Number of workers in RPC Server.
|
||||
* @param client_workers_count Number of workers in RPC Client.
|
||||
* @param worker_id ID of Raft worker (node) on this machine.
|
||||
* @param workers Mapping between Raft worker_ids to their network addresses.
|
||||
* @param coordination_config_file file that contains coordination config.
|
||||
*/
|
||||
Coordination(uint16_t sever_workers_count, uint16_t client_workers_count,
|
||||
Coordination(uint16_t server_workers_count, uint16_t client_workers_count,
|
||||
uint16_t worker_id,
|
||||
std::unordered_map<uint16_t, io::network::Endpoint> workers);
|
||||
|
||||
public:
|
||||
/// Gets the endpoint for the given `worker_id`.
|
||||
io::network::Endpoint GetEndpoint(int worker_id);
|
||||
|
||||
@ -82,6 +83,9 @@ class Coordination {
|
||||
server_.Register<TRequestResponse>(callback);
|
||||
}
|
||||
|
||||
static std::unordered_map<uint16_t, io::network::Endpoint> LoadFromFile(
|
||||
const std::string &coordination_config_file);
|
||||
|
||||
protected:
|
||||
/// Adds a worker to the coordination. This function can be called multiple
|
||||
/// times to replace an existing worker.
|
||||
@ -95,7 +99,6 @@ class Coordination {
|
||||
private:
|
||||
std::unordered_map<uint16_t, io::network::Endpoint> workers_;
|
||||
mutable std::mutex lock_;
|
||||
uint16_t worker_id_;
|
||||
|
||||
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
|
||||
utils::ThreadPool thread_pool_;
|
||||
|
@ -28,4 +28,27 @@ class InvalidTransitionException : public RaftException {
|
||||
new_mode) {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Exception used to indicate something is wrong with the raft config provided
|
||||
* by the user.
|
||||
*/
|
||||
class RaftConfigException : public RaftException {
|
||||
public:
|
||||
using RaftException::RaftException;
|
||||
explicit RaftConfigException(const std::string &path)
|
||||
: RaftException("Unable to parse raft config file " + path) {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Exception used to indicate something is wrong with the coordination config
|
||||
* provided by the user.
|
||||
*/
|
||||
class RaftCoordinationConfigException : public RaftException {
|
||||
public:
|
||||
using RaftException::RaftException;
|
||||
explicit RaftCoordinationConfigException(const std::string &path)
|
||||
: RaftException("Unable to parse raft coordination config file " + path) {
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace raft
|
||||
|
@ -1,23 +1,27 @@
|
||||
#include "raft/raft_server.hpp"
|
||||
#include "raft/exceptions.hpp"
|
||||
#include "raft/raft_rpc_messages.hpp"
|
||||
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
#include <experimental/filesystem>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
#include "raft/coordination.hpp"
|
||||
#include "raft/exceptions.hpp"
|
||||
#include "raft/raft_rpc_messages.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
|
||||
namespace raft {
|
||||
|
||||
RaftServer::RaftServer(uint16_t server_id, const Config &config,
|
||||
Coordination *coordination)
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
const std::string kRaftDir = "raft";
|
||||
|
||||
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
|
||||
const Config &config, Coordination *coordination)
|
||||
: config_(config),
|
||||
server_id_(server_id),
|
||||
disk_storage_(config.disk_storage_path) {
|
||||
disk_storage_(fs::path(durability_dir) / kRaftDir) {
|
||||
coordination->Register<RequestVoteRpc>(
|
||||
[this](const auto &req_reader, auto *res_builder) {
|
||||
throw utils::NotYetImplemented("RaftServer constructor");
|
||||
|
@ -3,12 +3,14 @@
|
||||
#pragma once
|
||||
|
||||
#include "raft/config.hpp"
|
||||
#include "raft/coordination.hpp"
|
||||
|
||||
#include "storage/common/kvstore/kvstore.hpp"
|
||||
|
||||
namespace raft {
|
||||
|
||||
// Forward declaration.
|
||||
class Coordination;
|
||||
|
||||
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
|
||||
|
||||
/**
|
||||
@ -26,11 +28,12 @@ class RaftServer {
|
||||
* ranging from 1 to cluster_size.
|
||||
*
|
||||
* @param server_id ID of the current server.
|
||||
* @param config Configurable Raft parameters (e.g. timeout interval)
|
||||
* @param durbility_dir directory for persisted data.
|
||||
* @param config raft configuration.
|
||||
* @param coordination Abstraction for coordination between Raft servers.
|
||||
*/
|
||||
RaftServer(uint16_t server_id, const raft::Config &config,
|
||||
raft::Coordination *coordination);
|
||||
RaftServer(uint16_t server_id, const std::string &durability_dir,
|
||||
const Config &config, raft::Coordination *coordination);
|
||||
|
||||
private:
|
||||
/** volatile state on all servers **/
|
||||
|
Loading…
Reference in New Issue
Block a user