diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4b5eeb2eb..8fbb73d71 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -255,6 +255,8 @@ set(mg_single_node_ha_sources durability/single_node_ha/wal.cpp glue/auth.cpp glue/communication.cpp + raft/coordination.cpp + raft/raft_server.cpp query/common.cpp query/frontend/ast/ast.cpp query/frontend/ast/cypher_main_visitor.cpp diff --git a/src/raft/config.hpp b/src/raft/config.hpp new file mode 100644 index 000000000..dcf428009 --- /dev/null +++ b/src/raft/config.hpp @@ -0,0 +1,19 @@ +/// @file + +#pragma once + +#include +#include + +namespace raft { + +/// Configurable Raft parameters. +struct Config { + std::experimental::filesystem::path disk_storage_path; + std::chrono::milliseconds leader_timeout_min; + std::chrono::milliseconds leader_timeout_max; + std::chrono::milliseconds heartbeat_interval; + std::chrono::milliseconds replicate_timeout; +}; + +} // namespace raft diff --git a/src/raft/coordination.cpp b/src/raft/coordination.cpp new file mode 100644 index 000000000..150d72d4c --- /dev/null +++ b/src/raft/coordination.cpp @@ -0,0 +1,65 @@ +#include "glog/logging.h" + +#include + +#include "raft/coordination.hpp" + +namespace raft { + +Coordination::Coordination( + uint16_t server_workers_count, uint16_t client_workers_count, + uint16_t worker_id, + std::unordered_map workers) + : server_(workers[worker_id], server_workers_count), + workers_(workers), + worker_id_(worker_id), + thread_pool_(client_workers_count, "RPC client") {} + +io::network::Endpoint Coordination::GetEndpoint(int worker_id) { + std::lock_guard guard(lock_); + auto found = workers_.find(worker_id); + CHECK(found != workers_.end()) + << "No endpoint registered for worker id: " << worker_id; + return found->second; +} + +io::network::Endpoint Coordination::GetServerEndpoint() { + return server_.endpoint(); +} + +communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) { + std::lock_guard guard(lock_); + auto found = client_pools_.find(worker_id); + if (found != client_pools_.end()) return &found->second; + auto found_endpoint = workers_.find(worker_id); + CHECK(found_endpoint != workers_.end()) + << "No endpoint registered for worker id: " << worker_id; + auto &endpoint = found_endpoint->second; + return &client_pools_ + .emplace(std::piecewise_construct, + std::forward_as_tuple(worker_id), + std::forward_as_tuple(endpoint)) + .first->second; +} + +void Coordination::AddWorker(int worker_id, + const io::network::Endpoint &endpoint) { + std::lock_guard guard(lock_); + workers_.insert({worker_id, endpoint}); +} + +std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) { + std::lock_guard guard(lock_); + for (const auto &worker : workers_) { + if (worker.second == endpoint) { + if (worker.first == 0) { + return fmt::format("master ({})", worker.second); + } else { + return fmt::format("worker {} ({})", worker.first, worker.second); + } + } + } + return fmt::format("unknown worker ({})", endpoint); +} + +} // namespace raft diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp new file mode 100644 index 000000000..7fd89f48d --- /dev/null +++ b/src/raft/coordination.hpp @@ -0,0 +1,104 @@ +/// @file + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "communication/rpc/client_pool.hpp" +#include "communication/rpc/server.hpp" +#include "io/network/endpoint.hpp" +#include "utils/future.hpp" +#include "utils/thread.hpp" + +namespace raft { + +/** + * This class is responsible for coordination between workers (nodes) within + * the Raft cluster. Its implementation is quite similar to coordination + * in distributed Memgraph apart from slight modifications which align more + * closely to Raft. + * + * It should be noted that, in the context of communication, all nodes within + * the Raft cluster are considered equivalent and are henceforth known simply + * as workers. + * + * This class is thread safe. + */ +class Coordination { + protected: + /** + * Class constructor + * + * @param server_workers_count Number of workers in RPC Server. + * @param client_workers_count Number of workers in RPC Client. + * @param worker_id ID of Raft worker (node) on this machine. + * @param workers Mapping between Raft worker_ids to their network addresses. + */ + Coordination(uint16_t sever_workers_count, uint16_t client_workers_count, + uint16_t worker_id, + std::unordered_map workers); + + public: + /// Gets the endpoint for the given `worker_id`. + io::network::Endpoint GetEndpoint(int worker_id); + + /// Gets the endpoint for this RPC server. + io::network::Endpoint GetServerEndpoint(); + + /// Returns a cached `ClientPool` for the given `worker_id`. + communication::rpc::ClientPool *GetClientPool(int worker_id); + + /// Asynchronously executes the given function on the RPC client for the + /// given worker id. Returns an `utils::Future` of the given `execute` + /// function's return type. + template + auto ExecuteOnWorker( + int worker_id, + std::function + execute) { + auto client_pool = GetClientPool(worker_id); + return thread_pool_.Run(execute, worker_id, std::ref(*client_pool)); + } + + template + void Register(std::function< + void(const typename TRequestResponse::Request::Capnp::Reader &, + typename TRequestResponse::Response::Capnp::Builder *)> + callback) { + server_.Register(callback); + } + + template + void Register(std::function< + void(const io::network::Endpoint &, + const typename TRequestResponse::Request::Capnp::Reader &, + typename TRequestResponse::Response::Capnp::Builder *)> + callback) { + server_.Register(callback); + } + + protected: + /// Adds a worker to the coordination. This function can be called multiple + /// times to replace an existing worker. + void AddWorker(int worker_id, const io::network::Endpoint &endpoint); + + /// Gets a worker name for the given endpoint. + std::string GetWorkerName(const io::network::Endpoint &endpoint); + + communication::rpc::Server server_; + + private: + std::unordered_map workers_; + mutable std::mutex lock_; + uint16_t worker_id_; + + std::unordered_map client_pools_; + utils::ThreadPool thread_pool_; +}; + +} // namespace raft diff --git a/src/raft/exceptions.hpp b/src/raft/exceptions.hpp new file mode 100644 index 000000000..e6f5e9e86 --- /dev/null +++ b/src/raft/exceptions.hpp @@ -0,0 +1,31 @@ +/// @file + +#pragma once + +#include "utils/exceptions.hpp" + +namespace raft { + +/** + * Base exception class used for all exceptions that can occur within the + * Raft protocol. + */ +class RaftException : public utils::BasicException { + public: + using utils::BasicException::BasicException; +}; + +/** + * This exception should be thrown when attempting to transition between + * incompatible states, e.g. from `FOLLOWER` to `LEADER`. + */ +class InvalidTransitionException : public RaftException { + public: + using RaftException::RaftException; + InvalidTransitionException(const std::string &old_mode, + const std::string &new_mode) + : RaftException("Invalid transition from " + old_mode + " to " + + new_mode) {} +}; + +} // namespace raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp new file mode 100644 index 000000000..0b60df56f --- /dev/null +++ b/src/raft/raft_server.cpp @@ -0,0 +1,36 @@ +#include "raft/raft_server.hpp" +#include "raft/exceptions.hpp" +#include "raft/raft_rpc_messages.hpp" + +#include "utils/exceptions.hpp" + +#include + +#include +#include + +namespace fs = std::experimental::filesystem; + +namespace raft { + +RaftServer::RaftServer(uint16_t server_id, const Config &config, + Coordination *coordination) + : config_(config), + server_id_(server_id), + disk_storage_(config.disk_storage_path) { + coordination->Register( + [this](const auto &req_reader, auto *res_builder) { + throw utils::NotYetImplemented("RaftServer constructor"); + }); + + coordination->Register( + [this](const auto &req_reader, auto *res_builder) { + throw utils::NotYetImplemented("RaftServer constructor"); + }); +} + +void RaftServer::Transition(const Mode &new_mode) { + throw utils::NotYetImplemented("RaftServer transition"); +} + +} // namespace raft diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp new file mode 100644 index 000000000..1975e1db3 --- /dev/null +++ b/src/raft/raft_server.hpp @@ -0,0 +1,72 @@ +/// @file + +#pragma once + +#include "raft/config.hpp" +#include "raft/coordination.hpp" + +#include "storage/common/kvstore/kvstore.hpp" + +namespace raft { + +enum class Mode { FOLLOWER, CANDIDATE, LEADER }; + +/** + * Class which models the behaviour of a single server within the Raft + * cluster. The class is responsible for storing both volatile and + * persistent internal state of the corresponding state machine as well + * as performing operations that comply with the Raft protocol. + */ +class RaftServer { + public: + RaftServer() = delete; + + /** + * The implementation assumes that server IDs are unique integers between + * ranging from 1 to cluster_size. + * + * @param server_id ID of the current server. + * @param config Configurable Raft parameters (e.g. timeout interval) + * @param coordination Abstraction for coordination between Raft servers. + */ + RaftServer(uint16_t server_id, const raft::Config &config, + raft::Coordination *coordination); + + private: + /** volatile state on all servers **/ + + raft::Mode mode_; ///< Server's current mode. + raft::Config config_; ///< Raft config. + + uint16_t server_id_; ///< ID of the current server. + uint64_t commit_index_; ///< Index of the highest known commited entry. + uint64_t last_applied_; ///< Index of the highest applied entry to SM. + + /** volatile state on leaders **/ + + std::vector next_index_; ///< for each server, index of the next + ///< log entry to send to that server. + + std::vector match_index_; ///< for each server, index of the + ///< highest log entry known to be + ///< replicated on server. + + /** persistent state on all servers + * + * Persistent data consists of: + * - uint64_t current_term -- latest term server has seen. + * - uint16_t voted_for -- candidate_id that received vote in current + * term (null if none). + * - vector log -- log entries. + */ + storage::KVStore disk_storage_; + + /** + * Makes a transition to a new `raft::Mode`. + * + * @throws InvalidTransitionException when transitioning between uncompatible + * `raft::Mode`s. + */ + void Transition(const raft::Mode &new_mode); +}; +} // namespace raft diff --git a/src/storage/common/types/property_value_store.hpp b/src/storage/common/types/property_value_store.hpp index faef6a213..68dc46d71 100644 --- a/src/storage/common/types/property_value_store.hpp +++ b/src/storage/common/types/property_value_store.hpp @@ -75,8 +75,8 @@ class PropertyValueStore { void clear(); /** - * Returns a static storage::kvstore instance used for storing properties on - * disk. This hack is needed due to statics that are internal to rocksdb and + * Returns a static storage::KVStore instance used for storing properties on + * disk. This hack is needed due to statics that are internal to RocksDB and * availability of durability_directory flag. */ storage::KVStore &DiskStorage() const;