From dd6fe013dc140d20ec319cca266132fb18770398 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Wed, 21 Nov 2018 15:49:04 +0100 Subject: [PATCH] Parse raft config Summary: Added command line parameters to specify rpc flags, raft and coordination config files and current server id. Reviewers: ipaljak, teon.banek Reviewed By: ipaljak, teon.banek Subscribers: pullbot, teon.banek Differential Revision: https://phabricator.memgraph.io/D1742 --- src/database/single_node_ha/config.cpp | 33 +++++++++++++++++- src/database/single_node_ha/graph_db.hpp | 22 ++++++++++-- src/raft/config.hpp | 36 ++++++++++++++++++- src/raft/coordination.cpp | 44 +++++++++++++++++++----- src/raft/coordination.hpp | 15 ++++---- src/raft/exceptions.hpp | 23 +++++++++++++ src/raft/raft_server.cpp | 20 ++++++----- src/raft/raft_server.hpp | 11 +++--- 8 files changed, 174 insertions(+), 30 deletions(-) diff --git a/src/database/single_node_ha/config.cpp b/src/database/single_node_ha/config.cpp index fbab720f0..bfa683771 100644 --- a/src/database/single_node_ha/config.cpp +++ b/src/database/single_node_ha/config.cpp @@ -36,6 +36,28 @@ DEFINE_bool(synchronous_commit, false, "Should a transaction end wait for WAL records to be written to " "disk before the transaction finishes."); +// RPC flags. +DEFINE_VALIDATED_HIDDEN_int32( + rpc_num_client_workers, std::max(std::thread::hardware_concurrency(), 1U), + "Number of client workers (RPC)", + FLAG_IN_RANGE(1, std::numeric_limits::max())); +DEFINE_VALIDATED_HIDDEN_int32( + rpc_num_server_workers, std::max(std::thread::hardware_concurrency(), 1U), + "Number of server workers (RPC)", + FLAG_IN_RANGE(1, std::numeric_limits::max())); + +// High availability. 
+DEFINE_string( + coordination_config_file, "coordination.json", + "Path to the file containing coordination configuration in JSON format"); + +DEFINE_string(raft_config_file, "raft.json", + "Path to the file containing raft configuration in JSON format"); + +DEFINE_VALIDATED_int32( + server_id, 1U, "Id used in the coordination configuration for this machine", + FLAG_IN_RANGE(1, std::numeric_limits::max())); + database::Config::Config() // Durability flags. : durability_enabled{FLAGS_durability_enabled}, @@ -49,4 +71,13 @@ database::Config::Config() gc_cycle_sec{FLAGS_gc_cycle_sec}, query_execution_time_sec{FLAGS_query_execution_time_sec}, // Data location. - properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")) {} + properties_on_disk(utils::Split(FLAGS_properties_on_disk, ",")), + // RPC flags. + rpc_num_client_workers{ + static_cast(FLAGS_rpc_num_client_workers)}, + rpc_num_server_workers{ + static_cast(FLAGS_rpc_num_server_workers)}, + // High availability. + coordination_config_file{FLAGS_coordination_config_file}, + raft_config_file{FLAGS_raft_config_file}, + server_id{static_cast(FLAGS_server_id)} {} diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 8f145351c..5ebb89bd5 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -10,6 +10,8 @@ #include "durability/single_node_ha/recovery.hpp" #include "durability/single_node_ha/wal.hpp" #include "io/network/endpoint.hpp" +#include "raft/coordination.hpp" +#include "raft/raft_server.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/concurrent_id_mapper.hpp" #include "storage/single_node_ha/storage.hpp" @@ -27,7 +29,7 @@ struct Stat { /// Vertex count is number of `VersionList` physically stored. std::atomic vertex_count{0}; - + /// Vertex count is number of `VersionList` physically stored. 
std::atomic edge_count{0}; @@ -55,6 +57,15 @@ struct Config { // set of properties which will be stored on disk std::vector properties_on_disk; + + // RPC flags. + uint16_t rpc_num_client_workers; + uint16_t rpc_num_server_workers; + + // HA flags. + std::string coordination_config_file; + std::string raft_config_file; + uint16_t server_id; }; class GraphDbAccessor; @@ -132,7 +143,7 @@ class GraphDb { if (vertex_count != 0) { stat_.avg_degree = 2 * static_cast(edge_count) / vertex_count; } else { - stat_.avg_degree = 0; + stat_.avg_degree = 0; } } @@ -151,6 +162,13 @@ class GraphDb { config_.durability_enabled, config_.synchronous_commit}; + raft::Coordination coordination_{ + config_.rpc_num_server_workers, config_.rpc_num_client_workers, + config_.server_id, + raft::Coordination::LoadFromFile(config_.coordination_config_file)}; + raft::RaftServer raft_server_{ + config_.server_id, config_.durability_directory, + raft::Config::LoadFromFile(config_.raft_config_file), &coordination_}; tx::Engine tx_engine_{&wal_}; std::unique_ptr storage_gc_ = std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); diff --git a/src/raft/config.hpp b/src/raft/config.hpp index dcf428009..2800e94dc 100644 --- a/src/raft/config.hpp +++ b/src/raft/config.hpp @@ -4,16 +4,50 @@ #include #include +#include + +#include + +#include "raft/exceptions.hpp" +#include "utils/file.hpp" +#include "utils/string.hpp" namespace raft { /// Configurable Raft parameters. 
struct Config { - std::experimental::filesystem::path disk_storage_path; std::chrono::milliseconds leader_timeout_min; std::chrono::milliseconds leader_timeout_max; std::chrono::milliseconds heartbeat_interval; std::chrono::milliseconds replicate_timeout; + + static Config LoadFromFile(const std::string &raft_config_file) { + if (!std::experimental::filesystem::exists(raft_config_file)) + throw RaftConfigException(raft_config_file); + + nlohmann::json data; + try { + data = nlohmann::json::parse( + utils::Join(utils::ReadLines(raft_config_file), "")); + } catch (const nlohmann::json::parse_error &e) { + throw RaftConfigException(raft_config_file); + } + + if (!data.is_object()) throw RaftConfigException(raft_config_file); + if (!data["leader_timeout_min"].is_number()) + throw RaftConfigException(raft_config_file); + if (!data["leader_timeout_max"].is_number()) + throw RaftConfigException(raft_config_file); + if (!data["heartbeat_interval"].is_number()) + throw RaftConfigException(raft_config_file); + if (!data["replicate_timeout"].is_number()) + throw RaftConfigException(raft_config_file); + + return Config{std::chrono::duration(data["leader_timeout_min"]), + std::chrono::duration(data["leader_timeout_max"]), + std::chrono::duration(data["heartbeat_interval"]), + std::chrono::duration(data["replicate_timeout"])}; + } }; } // namespace raft diff --git a/src/raft/coordination.cpp b/src/raft/coordination.cpp index 150d72d4c..58da59be6 100644 --- a/src/raft/coordination.cpp +++ b/src/raft/coordination.cpp @@ -1,20 +1,52 @@ -#include "glog/logging.h" +#include "raft/coordination.hpp" #include -#include "raft/coordination.hpp" +#include "glog/logging.h" +#include "json/json.hpp" + +#include "utils/file.hpp" +#include "utils/string.hpp" namespace raft { +namespace fs = std::experimental::filesystem; + Coordination::Coordination( uint16_t server_workers_count, uint16_t client_workers_count, uint16_t worker_id, std::unordered_map workers) : server_(workers[worker_id], 
server_workers_count), workers_(workers), - worker_id_(worker_id), thread_pool_(client_workers_count, "RPC client") {} +std::unordered_map Coordination::LoadFromFile( + const std::string &coordination_config_file) { + if (!fs::exists(coordination_config_file)) + throw RaftCoordinationConfigException(coordination_config_file); + + std::unordered_map workers; + nlohmann::json data; + try { + data = nlohmann::json::parse( + utils::Join(utils::ReadLines(coordination_config_file), "")); + } catch (const nlohmann::json::parse_error &e) { + throw RaftCoordinationConfigException(coordination_config_file); + } + + if (!data.is_array()) + throw RaftCoordinationConfigException(coordination_config_file); + + for (auto &it : data) { + if (!it.is_array() || it.size() != 3) + throw RaftCoordinationConfigException(coordination_config_file); + + workers[it[0]] = io::network::Endpoint{it[1], it[2]}; + } + + return workers; +} + io::network::Endpoint Coordination::GetEndpoint(int worker_id) { std::lock_guard guard(lock_); auto found = workers_.find(worker_id); @@ -52,11 +84,7 @@ std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) { std::lock_guard guard(lock_); for (const auto &worker : workers_) { if (worker.second == endpoint) { - if (worker.first == 0) { - return fmt::format("master ({})", worker.second); - } else { - return fmt::format("worker {} ({})", worker.first, worker.second); - } + return fmt::format("worker {} ({})", worker.first, worker.second); } } return fmt::format("unknown worker ({})", endpoint); diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp index 7fd89f48d..eacd8f82c 100644 --- a/src/raft/coordination.hpp +++ b/src/raft/coordination.hpp @@ -2,6 +2,7 @@ #pragma once +#include #include #include #include @@ -12,6 +13,7 @@ #include "communication/rpc/client_pool.hpp" #include "communication/rpc/server.hpp" #include "io/network/endpoint.hpp" +#include "raft/exceptions.hpp" #include "utils/future.hpp" #include
"utils/thread.hpp" @@ -29,21 +31,20 @@ namespace raft { * * This class is thread safe. */ -class Coordination { - protected: +class Coordination final { + public: /** * Class constructor * * @param server_workers_count Number of workers in RPC Server. * @param client_workers_count Number of workers in RPC Client. * @param worker_id ID of Raft worker (node) on this machine. - * @param workers Mapping between Raft worker_ids to their network addresses. + * @param workers Mapping from Raft worker IDs to their network endpoints. */ - Coordination(uint16_t sever_workers_count, uint16_t client_workers_count, + Coordination(uint16_t server_workers_count, uint16_t client_workers_count, uint16_t worker_id, std::unordered_map workers); - public: /// Gets the endpoint for the given `worker_id`. io::network::Endpoint GetEndpoint(int worker_id); @@ -82,6 +83,9 @@ class Coordination { server_.Register(callback); } + static std::unordered_map LoadFromFile( + const std::string &coordination_config_file); + protected: /// Adds a worker to the coordination. This function can be called multiple /// times to replace an existing worker. @@ -95,7 +99,6 @@ class Coordination { private: std::unordered_map workers_; mutable std::mutex lock_; - uint16_t worker_id_; std::unordered_map client_pools_; utils::ThreadPool thread_pool_; diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp index e6f5e9e86..0184abb7b 100644 --- a/src/raft/exceptions.hpp +++ b/src/raft/exceptions.hpp @@ -28,4 +28,27 @@ class InvalidTransitionException : public RaftException { new_mode) {} }; +/** + * Exception used to indicate something is wrong with the raft config provided + * by the user.
+ */ +class RaftConfigException : public RaftException { + public: + using RaftException::RaftException; + explicit RaftConfigException(const std::string &path) + : RaftException("Unable to parse raft config file " + path) {} +}; + +/** + * Exception used to indicate something is wrong with the coordination config + * provided by the user. + */ +class RaftCoordinationConfigException : public RaftException { + public: + using RaftException::RaftException; + explicit RaftCoordinationConfigException(const std::string &path) + : RaftException("Unable to parse raft coordination config file " + path) { + } +}; + } // namespace raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 0b60df56f..02e1fa25b 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -1,23 +1,27 @@ #include "raft/raft_server.hpp" -#include "raft/exceptions.hpp" -#include "raft/raft_rpc_messages.hpp" - -#include "utils/exceptions.hpp" #include #include #include -namespace fs = std::experimental::filesystem; +#include "raft/coordination.hpp" +#include "raft/exceptions.hpp" +#include "raft/raft_rpc_messages.hpp" +#include "utils/exceptions.hpp" + namespace raft { -RaftServer::RaftServer(uint16_t server_id, const Config &config, - Coordination *coordination) +namespace fs = std::experimental::filesystem; + +const std::string kRaftDir = "raft"; + +RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, + const Config &config, Coordination *coordination) : config_(config), server_id_(server_id), - disk_storage_(config.disk_storage_path) { + disk_storage_(fs::path(durability_dir) / kRaftDir) { coordination->Register( [this](const auto &req_reader, auto *res_builder) { throw utils::NotYetImplemented("RaftServer constructor"); diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 1975e1db3..489593305 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -3,12 +3,14 @@ #pragma once #include "raft/config.hpp" 
-#include "raft/coordination.hpp" #include "storage/common/kvstore/kvstore.hpp" namespace raft { +// Forward declaration. +class Coordination; + enum class Mode { FOLLOWER, CANDIDATE, LEADER }; /** @@ -26,11 +28,12 @@ class RaftServer { * ranging from 1 to cluster_size. * * @param server_id ID of the current server. - * @param config Configurable Raft parameters (e.g. timeout interval) + * @param durability_dir directory for persisted data. + * @param config raft configuration. * @param coordination Abstraction for coordination between Raft servers. */ - RaftServer(uint16_t server_id, const raft::Config &config, - raft::Coordination *coordination); + RaftServer(uint16_t server_id, const std::string &durability_dir, + const Config &config, raft::Coordination *coordination); private: /** volatile state on all servers **/