Add Raft skeleton

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1732
This commit is contained in:
Ivan Paljak 2018-11-19 11:27:45 +01:00
parent 58b450a2ea
commit 73da1e4463
8 changed files with 331 additions and 2 deletions

View File

@ -255,6 +255,8 @@ set(mg_single_node_ha_sources
durability/single_node_ha/wal.cpp
glue/auth.cpp
glue/communication.cpp
raft/coordination.cpp
raft/raft_server.cpp
query/common.cpp
query/frontend/ast/ast.cpp
query/frontend/ast/cypher_main_visitor.cpp

19
src/raft/config.hpp Normal file
View File

@ -0,0 +1,19 @@
/// @file
#pragma once
#include <chrono>
#include <experimental/filesystem>
namespace raft {
/// Configurable Raft parameters.
struct Config {
std::experimental::filesystem::path disk_storage_path;
std::chrono::milliseconds leader_timeout_min;
std::chrono::milliseconds leader_timeout_max;
std::chrono::milliseconds heartbeat_interval;
std::chrono::milliseconds replicate_timeout;
};
} // namespace raft

65
src/raft/coordination.cpp Normal file
View File

@ -0,0 +1,65 @@
#include "glog/logging.h"
#include <thread>
#include "raft/coordination.hpp"
namespace raft {
Coordination::Coordination(
uint16_t server_workers_count, uint16_t client_workers_count,
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers)
: server_(workers[worker_id], server_workers_count),
workers_(workers),
worker_id_(worker_id),
thread_pool_(client_workers_count, "RPC client") {}
io::network::Endpoint Coordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
auto found = workers_.find(worker_id);
CHECK(found != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
return found->second;
}
io::network::Endpoint Coordination::GetServerEndpoint() {
return server_.endpoint();
}
communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
auto found = client_pools_.find(worker_id);
if (found != client_pools_.end()) return &found->second;
auto found_endpoint = workers_.find(worker_id);
CHECK(found_endpoint != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
auto &endpoint = found_endpoint->second;
return &client_pools_
.emplace(std::piecewise_construct,
std::forward_as_tuple(worker_id),
std::forward_as_tuple(endpoint))
.first->second;
}
void Coordination::AddWorker(int worker_id,
const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
workers_.insert({worker_id, endpoint});
}
std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
for (const auto &worker : workers_) {
if (worker.second == endpoint) {
if (worker.first == 0) {
return fmt::format("master ({})", worker.second);
} else {
return fmt::format("worker {} ({})", worker.first, worker.second);
}
}
}
return fmt::format("unknown worker ({})", endpoint);
}
} // namespace raft

104
src/raft/coordination.hpp Normal file
View File

@ -0,0 +1,104 @@
/// @file
#pragma once
#include <functional>
#include <mutex>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "io/network/endpoint.hpp"
#include "utils/future.hpp"
#include "utils/thread.hpp"
namespace raft {
/**
* This class is responsible for coordination between workers (nodes) within
* the Raft cluster. Its implementation is quite similar to coordination
* in distributed Memgraph apart from slight modifications which align more
* closely to Raft.
*
* It should be noted that, in the context of communication, all nodes within
* the Raft cluster are considered equivalent and are henceforth known simply
* as workers.
*
* This class is thread safe.
*/
class Coordination {
protected:
/**
* Class constructor
*
* @param server_workers_count Number of workers in RPC Server.
* @param client_workers_count Number of workers in RPC Client.
* @param worker_id ID of Raft worker (node) on this machine.
* @param workers Mapping between Raft worker_ids to their network addresses.
*/
Coordination(uint16_t sever_workers_count, uint16_t client_workers_count,
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers);
public:
/// Gets the endpoint for the given `worker_id`.
io::network::Endpoint GetEndpoint(int worker_id);
/// Gets the endpoint for this RPC server.
io::network::Endpoint GetServerEndpoint();
/// Returns a cached `ClientPool` for the given `worker_id`.
communication::rpc::ClientPool *GetClientPool(int worker_id);
/// Asynchronously executes the given function on the RPC client for the
/// given worker id. Returns an `utils::Future` of the given `execute`
/// function's return type.
template <typename TResult>
auto ExecuteOnWorker(
int worker_id,
std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
execute) {
auto client_pool = GetClientPool(worker_id);
return thread_pool_.Run(execute, worker_id, std::ref(*client_pool));
}
template <class TRequestResponse>
void Register(std::function<
void(const typename TRequestResponse::Request::Capnp::Reader &,
typename TRequestResponse::Response::Capnp::Builder *)>
callback) {
server_.Register<TRequestResponse>(callback);
}
template <class TRequestResponse>
void Register(std::function<
void(const io::network::Endpoint &,
const typename TRequestResponse::Request::Capnp::Reader &,
typename TRequestResponse::Response::Capnp::Builder *)>
callback) {
server_.Register<TRequestResponse>(callback);
}
protected:
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
void AddWorker(int worker_id, const io::network::Endpoint &endpoint);
/// Gets a worker name for the given endpoint.
std::string GetWorkerName(const io::network::Endpoint &endpoint);
communication::rpc::Server server_;
private:
std::unordered_map<uint16_t, io::network::Endpoint> workers_;
mutable std::mutex lock_;
uint16_t worker_id_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
utils::ThreadPool thread_pool_;
};
} // namespace raft

31
src/raft/exceptions.hpp Normal file
View File

@ -0,0 +1,31 @@
/// @file
#pragma once
#include "utils/exceptions.hpp"
namespace raft {
/**
* Base exception class used for all exceptions that can occur within the
* Raft protocol.
*/
class RaftException : public utils::BasicException {
public:
using utils::BasicException::BasicException;
};
/**
* This exception should be thrown when attempting to transition between
* incompatible states, e.g. from `FOLLOWER` to `LEADER`.
*/
class InvalidTransitionException : public RaftException {
public:
using RaftException::RaftException;
InvalidTransitionException(const std::string &old_mode,
const std::string &new_mode)
: RaftException("Invalid transition from " + old_mode + " to " +
new_mode) {}
};
} // namespace raft

36
src/raft/raft_server.cpp Normal file
View File

@ -0,0 +1,36 @@
#include "raft/raft_server.hpp"
#include "raft/exceptions.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "utils/exceptions.hpp"
#include <experimental/filesystem>
#include <gflags/gflags.h>
#include <glog/logging.h>
namespace fs = std::experimental::filesystem;
namespace raft {
RaftServer::RaftServer(uint16_t server_id, const Config &config,
Coordination *coordination)
: config_(config),
server_id_(server_id),
disk_storage_(config.disk_storage_path) {
coordination->Register<RequestVoteRpc>(
[this](const auto &req_reader, auto *res_builder) {
throw utils::NotYetImplemented("RaftServer constructor");
});
coordination->Register<AppendEntriesRpc>(
[this](const auto &req_reader, auto *res_builder) {
throw utils::NotYetImplemented("RaftServer constructor");
});
}
void RaftServer::Transition(const Mode &new_mode) {
throw utils::NotYetImplemented("RaftServer transition");
}
} // namespace raft

72
src/raft/raft_server.hpp Normal file
View File

@ -0,0 +1,72 @@
/// @file
#pragma once
#include "raft/config.hpp"
#include "raft/coordination.hpp"
#include "storage/common/kvstore/kvstore.hpp"
namespace raft {
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
/**
* Class which models the behaviour of a single server within the Raft
* cluster. The class is responsible for storing both volatile and
* persistent internal state of the corresponding state machine as well
* as performing operations that comply with the Raft protocol.
*/
class RaftServer {
public:
RaftServer() = delete;
/**
* The implementation assumes that server IDs are unique integers between
* ranging from 1 to cluster_size.
*
* @param server_id ID of the current server.
* @param config Configurable Raft parameters (e.g. timeout interval)
* @param coordination Abstraction for coordination between Raft servers.
*/
RaftServer(uint16_t server_id, const raft::Config &config,
raft::Coordination *coordination);
private:
/** volatile state on all servers **/
raft::Mode mode_; ///< Server's current mode.
raft::Config config_; ///< Raft config.
uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known commited entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
/** volatile state on leaders **/
std::vector<uint16_t> next_index_; ///< for each server, index of the next
///< log entry to send to that server.
std::vector<uint16_t> match_index_; ///< for each server, index of the
///< highest log entry known to be
///< replicated on server.
/** persistent state on all servers
*
* Persistent data consists of:
* - uint64_t current_term -- latest term server has seen.
* - uint16_t voted_for -- candidate_id that received vote in current
* term (null if none).
* - vector<LogEntry> log -- log entries.
*/
storage::KVStore disk_storage_;
/**
* Makes a transition to a new `raft::Mode`.
*
* @throws InvalidTransitionException when transitioning between uncompatible
* `raft::Mode`s.
*/
void Transition(const raft::Mode &new_mode);
};
} // namespace raft

View File

@ -75,8 +75,8 @@ class PropertyValueStore {
void clear();
/**
* Returns a static storage::kvstore instance used for storing properties on
* disk. This hack is needed due to statics that are internal to rocksdb and
* Returns a static storage::KVStore instance used for storing properties on
* disk. This hack is needed due to statics that are internal to RocksDB and
* availability of durability_directory flag.
*/
storage::KVStore &DiskStorage() const;