Test Raft state
This commit is contained in:
parent
a999110737
commit
ef85e8ae61
@ -45,6 +45,18 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state
|
||||
j.at("config").get_to(instance_state.config);
|
||||
}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) {
|
||||
auto c2c_config = CoordinatorToCoordinatorConfig{
|
||||
.coordinator_id = config.coordinator_id,
|
||||
.bolt_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.bolt_port)},
|
||||
.coordinator_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.coordinator_port)}};
|
||||
coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)});
|
||||
}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
||||
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
||||
: repl_instances_{other.repl_instances_},
|
||||
current_main_uuid_(other.current_main_uuid_),
|
||||
|
@ -16,13 +16,13 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
void to_json(nlohmann::json &j, CoordinatorToCoordinatorConfig const &config) {
|
||||
j = nlohmann::json{{"coordinator_server_id", config.coordinator_server_id},
|
||||
j = nlohmann::json{{"coordinator_id", config.coordinator_id},
|
||||
{"coordinator_server", config.coordinator_server},
|
||||
{"bolt_server", config.bolt_server}};
|
||||
}
|
||||
|
||||
void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config) {
|
||||
config.coordinator_server_id = j.at("coordinator_server_id").get<uint32_t>();
|
||||
config.coordinator_id = j.at("coordinator_id").get<uint32_t>();
|
||||
config.coordinator_server = j.at("coordinator_server").get<io::network::Endpoint>();
|
||||
config.bolt_server = j.at("bolt_server").get<io::network::Endpoint>();
|
||||
}
|
||||
|
@ -29,9 +29,10 @@ namespace memgraph::coordination {
|
||||
using nuraft::ptr;
|
||||
using nuraft::srv_config;
|
||||
|
||||
CoordinatorInstance::CoordinatorInstance()
|
||||
CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config)
|
||||
: thread_pool_{1},
|
||||
raft_state_(RaftState::MakeRaftState(
|
||||
config,
|
||||
[this]() {
|
||||
spdlog::info("Leader changed, starting all replication instances!");
|
||||
auto const instances = raft_state_.GetReplicationInstances();
|
||||
@ -101,13 +102,13 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
|
||||
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
|
||||
.raft_socket_address = instance->get_endpoint(),
|
||||
auto const coord_instance_to_status = [](CoordinatorInstanceState const &instance) -> InstanceStatus {
|
||||
return {.instance_name = fmt::format("coordinator_{}", instance.config.coordinator_id),
|
||||
.raft_socket_address = instance.config.coordinator_server.SocketAddress(),
|
||||
.cluster_role = "coordinator",
|
||||
.health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move
|
||||
.health = "unknown"};
|
||||
};
|
||||
auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
|
||||
auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status);
|
||||
|
||||
if (raft_state_.IsLeader()) {
|
||||
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
||||
|
@ -24,14 +24,16 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
CoordinatorState::CoordinatorState(InstanceInitConfig const &config) {
|
||||
if (std::holds_alternative<ReplicationInstanceInitConfig>(config.data_)) {
|
||||
auto const mgmt_config = ManagementServerConfig{
|
||||
.ip_address = kDefaultReplicationServerIp,
|
||||
.port = static_cast<uint16_t>(std::get<ReplicationInstanceInitConfig>(config.data_).management_port),
|
||||
};
|
||||
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(mgmt_config)};
|
||||
}
|
||||
CoordinatorState::CoordinatorState(CoordinatorInstanceInitConfig const &config) {
|
||||
data_.emplace<CoordinatorInstance>(config);
|
||||
}
|
||||
|
||||
CoordinatorState::CoordinatorState(ReplicationInstanceInitConfig const &config) {
|
||||
auto const mgmt_config = ManagementServerConfig{
|
||||
.ip_address = kDefaultReplicationServerIp,
|
||||
.port = static_cast<uint16_t>(config.management_port),
|
||||
};
|
||||
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(mgmt_config)};
|
||||
}
|
||||
|
||||
auto CoordinatorState::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
||||
|
@ -20,6 +20,9 @@ constexpr int MAX_SNAPSHOTS = 3;
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
CoordinatorStateMachine::CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config)
|
||||
: cluster_state_(config) {}
|
||||
|
||||
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
|
||||
|
||||
auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool {
|
||||
|
@ -30,6 +30,16 @@ namespace memgraph::coordination {
|
||||
|
||||
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
|
||||
|
||||
struct ReplicationInstanceInitConfig {
|
||||
int management_port{0};
|
||||
};
|
||||
|
||||
struct CoordinatorInstanceInitConfig {
|
||||
uint32_t coordinator_id{0};
|
||||
int coordinator_port{0};
|
||||
int bolt_port{0};
|
||||
};
|
||||
|
||||
struct ReplicationClientInfo {
|
||||
std::string instance_name{};
|
||||
replication_coordination_glue::ReplicationMode replication_mode{};
|
||||
@ -66,7 +76,7 @@ struct CoordinatorToReplicaConfig {
|
||||
};
|
||||
|
||||
struct CoordinatorToCoordinatorConfig {
|
||||
uint32_t coordinator_server_id{0};
|
||||
uint32_t coordinator_id{0};
|
||||
io::network::Endpoint bolt_server;
|
||||
io::network::Endpoint coordinator_server;
|
||||
|
||||
|
@ -37,7 +37,7 @@ using InstanceNameDbHistories = std::pair<std::string, replication_coordination_
|
||||
|
||||
class CoordinatorInstance {
|
||||
public:
|
||||
CoordinatorInstance();
|
||||
explicit CoordinatorInstance(CoordinatorInstanceInitConfig const &config);
|
||||
CoordinatorInstance(CoordinatorInstance const &) = delete;
|
||||
CoordinatorInstance &operator=(CoordinatorInstance const &) = delete;
|
||||
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
||||
|
@ -22,22 +22,10 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
struct ReplicationInstanceInitConfig {
|
||||
uint32_t management_port{0};
|
||||
};
|
||||
|
||||
struct CoordinatorInstanceInitConfig {
|
||||
uint32_t raft_server_id{0};
|
||||
uint32_t coordinator_port{0};
|
||||
};
|
||||
|
||||
struct InstanceInitConfig {
|
||||
std::variant<ReplicationInstanceInitConfig, CoordinatorInstanceInitConfig> data_;
|
||||
};
|
||||
|
||||
class CoordinatorState {
|
||||
public:
|
||||
explicit CoordinatorState(InstanceInitConfig const &config);
|
||||
explicit CoordinatorState(CoordinatorInstanceInitConfig const &config);
|
||||
explicit CoordinatorState(ReplicationInstanceInitConfig const &config);
|
||||
~CoordinatorState() = default;
|
||||
|
||||
CoordinatorState(CoordinatorState const &) = delete;
|
||||
@ -67,7 +55,7 @@ class CoordinatorState {
|
||||
std::unique_ptr<CoordinatorServer> coordinator_server_;
|
||||
};
|
||||
|
||||
std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
|
||||
std::variant<CoordinatorMainReplicaData, CoordinatorInstance> data_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -40,8 +40,8 @@ using raft_result = nuraft::cmd_result<ptr<buffer>>;
|
||||
|
||||
class RaftState {
|
||||
private:
|
||||
explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
|
||||
uint32_t raft_port, std::string raft_address);
|
||||
explicit RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
|
||||
BecomeFollowerCb become_follower_cb);
|
||||
|
||||
auto InitRaftServer() -> void;
|
||||
|
||||
@ -53,13 +53,13 @@ class RaftState {
|
||||
RaftState &operator=(RaftState &&other) noexcept = default;
|
||||
~RaftState();
|
||||
|
||||
static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState;
|
||||
static auto MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
|
||||
BecomeFollowerCb &&become_follower_cb) -> RaftState;
|
||||
|
||||
auto InstanceName() const -> std::string;
|
||||
auto RaftSocketAddress() const -> std::string;
|
||||
|
||||
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
|
||||
|
||||
auto RequestLeadership() -> bool;
|
||||
auto IsLeader() const -> bool;
|
||||
@ -68,6 +68,7 @@ class RaftState {
|
||||
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
|
||||
auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
|
||||
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
|
||||
|
||||
auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
|
||||
auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
|
||||
auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
|
||||
@ -78,7 +79,6 @@ class RaftState {
|
||||
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
|
||||
|
||||
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
|
||||
// TODO: (andi) Do we need then GetAllCoordinators?
|
||||
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
|
||||
|
||||
auto MainExists() const -> bool;
|
||||
|
@ -74,7 +74,7 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state
|
||||
// Source of truth since it is modified only as the result of RAFT's commiting
|
||||
class CoordinatorClusterState {
|
||||
public:
|
||||
CoordinatorClusterState() = default;
|
||||
explicit CoordinatorClusterState(CoordinatorInstanceInitConfig const &config);
|
||||
explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||
std::vector<CoordinatorInstanceState> coordinators,
|
||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened);
|
||||
|
@ -35,7 +35,7 @@ using nuraft::state_machine;
|
||||
|
||||
class CoordinatorStateMachine : public state_machine {
|
||||
public:
|
||||
CoordinatorStateMachine() = default;
|
||||
explicit CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config);
|
||||
CoordinatorStateMachine(CoordinatorStateMachine const &) = delete;
|
||||
CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
|
||||
CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
|
||||
|
@ -30,11 +30,11 @@ using nuraft::raft_server;
|
||||
using nuraft::srv_config;
|
||||
using raft_result = cmd_result<ptr<buffer>>;
|
||||
|
||||
RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
|
||||
uint32_t raft_port, std::string raft_address)
|
||||
: raft_endpoint_(raft_address, raft_port),
|
||||
coordinator_id_(coordinator_id),
|
||||
state_machine_(cs_new<CoordinatorStateMachine>()),
|
||||
RaftState::RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
|
||||
BecomeFollowerCb become_follower_cb)
|
||||
: raft_endpoint_("127.0.0.1", config.coordinator_port),
|
||||
coordinator_id_(config.coordinator_id),
|
||||
state_machine_(cs_new<CoordinatorStateMachine>(config)),
|
||||
state_manager_(cs_new<CoordinatorStateManager>(coordinator_id_, raft_endpoint_.SocketAddress())),
|
||||
logger_(nullptr),
|
||||
become_leader_cb_(std::move(become_leader_cb)),
|
||||
@ -97,12 +97,9 @@ auto RaftState::InitRaftServer() -> void {
|
||||
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint_.SocketAddress());
|
||||
}
|
||||
|
||||
auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState {
|
||||
uint32_t coordinator_id = FLAGS_coordinator_id;
|
||||
uint32_t raft_port = FLAGS_coordinator_port;
|
||||
|
||||
auto raft_state =
|
||||
RaftState(std::move(become_leader_cb), std::move(become_follower_cb), coordinator_id, raft_port, "127.0.0.1");
|
||||
auto RaftState::MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
|
||||
BecomeFollowerCb &&become_follower_cb) -> RaftState {
|
||||
auto raft_state = RaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
raft_state.InitRaftServer();
|
||||
return raft_state;
|
||||
@ -110,15 +107,13 @@ auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerC
|
||||
|
||||
RaftState::~RaftState() { launcher_.shutdown(); }
|
||||
|
||||
auto RaftState::InstanceName() const -> std::string {
|
||||
return fmt::format("coordinator_{}", std::to_string(coordinator_id_));
|
||||
}
|
||||
auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); }
|
||||
|
||||
auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); }
|
||||
|
||||
auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||
auto const endpoint = config.coordinator_server.SocketAddress();
|
||||
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_server_id), endpoint);
|
||||
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_id), endpoint);
|
||||
|
||||
auto cmd_result = raft_server_->add_srv(srv_config_to_add);
|
||||
|
||||
@ -136,9 +131,9 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
|
||||
bool added{false};
|
||||
while (!maybe_stop()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(waiting_period));
|
||||
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_server_id));
|
||||
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_id));
|
||||
if (server_config) {
|
||||
spdlog::trace("Server with id {} added to cluster", config.coordinator_server_id);
|
||||
spdlog::trace("Server with id {} added to cluster", config.coordinator_id);
|
||||
added = true;
|
||||
break;
|
||||
}
|
||||
@ -150,12 +145,6 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
|
||||
}
|
||||
}
|
||||
|
||||
auto RaftState::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
|
||||
std::vector<ptr<srv_config>> all_srv_configs;
|
||||
raft_server_->get_srv_config_all(all_srv_configs);
|
||||
return all_srv_configs;
|
||||
}
|
||||
|
||||
auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
|
||||
|
||||
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
|
||||
@ -363,14 +352,14 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
|
||||
spdlog::error(
|
||||
"Failed to accept request for adding coordinator instance {}. Most likely the reason is that the instance is "
|
||||
"not the leader.",
|
||||
config.coordinator_server_id);
|
||||
config.coordinator_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_server_id);
|
||||
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_id);
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_server_id,
|
||||
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_id,
|
||||
static_cast<int>(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
@ -10,12 +10,17 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "replication.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
|
||||
#include <limits>
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(management_port, 0, "Port on which coordinator servers will be started.");
|
||||
DEFINE_VALIDATED_int32(management_port, 0, "Port on which coordinator servers will be started.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(coordinator_port, 0, "Port on which raft servers will be started.");
|
||||
DEFINE_VALIDATED_int32(coordinator_port, 0, "Port on which raft servers will be started.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(coordinator_id, 0, "Unique ID of the raft server.");
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
@ -15,9 +15,9 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(management_port);
|
||||
DECLARE_int32(management_port);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(coordinator_port);
|
||||
DECLARE_int32(coordinator_port);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(coordinator_id);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
@ -407,17 +407,20 @@ int main(int argc, char **argv) {
|
||||
|
||||
// singleton coordinator state
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::InstanceInitConfig config;
|
||||
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
|
||||
config.data_ = memgraph::coordination::CoordinatorInstanceInitConfig {
|
||||
.raft_server_id = FLAGS_coordinator_id, .coordinator_port = FLAGS_coordinator_port
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorState;
|
||||
using memgraph::coordination::ReplicationInstanceInitConfig;
|
||||
auto coordinator_state = [&]() -> CoordinatorState {
|
||||
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
|
||||
return CoordinatorState{CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id,
|
||||
.coordinator_port = FLAGS_coordinator_port,
|
||||
.bolt_port = FLAGS_bolt_port}};
|
||||
}
|
||||
} else if (FLAGS_management_port) {
|
||||
config.data_ = memgraph::coordination::ReplicationInstanceInitConfig { .management_port = FLAGS_management_port }
|
||||
} else {
|
||||
throw std::runtime_error("Coordinator or replica must be started with valid flags!");
|
||||
}
|
||||
memgraph::coordination::CoordinatorState coordinator_state{config};
|
||||
if (FLAGS_management_port) {
|
||||
return CoordinatorState{ReplicationInstanceInitConfig{.management_port = FLAGS_management_port}};
|
||||
}
|
||||
throw std::runtime_error("Coordination or replication instances must be started with valid flags!");
|
||||
}();
|
||||
#endif
|
||||
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
|
||||
|
@ -3178,7 +3178,7 @@ class CoordinatorQuery : public memgraph::query::Query {
|
||||
memgraph::query::CoordinatorQuery::Action action_;
|
||||
std::string instance_name_{};
|
||||
std::unordered_map<memgraph::query::Expression *, memgraph::query::Expression *> configs_;
|
||||
memgraph::query::Expression *coordinator_server_id_{nullptr};
|
||||
memgraph::query::Expression *coordinator_id_{nullptr};
|
||||
memgraph::query::CoordinatorQuery::SyncMode sync_mode_;
|
||||
|
||||
CoordinatorQuery *Clone(AstStorage *storage) const override {
|
||||
@ -3186,7 +3186,7 @@ class CoordinatorQuery : public memgraph::query::Query {
|
||||
|
||||
object->action_ = action_;
|
||||
object->instance_name_ = instance_name_;
|
||||
object->coordinator_server_id_ = coordinator_server_id_ ? coordinator_server_id_->Clone(storage) : nullptr;
|
||||
object->coordinator_id_ = coordinator_id_ ? coordinator_id_->Clone(storage) : nullptr;
|
||||
object->sync_mode_ = sync_mode_;
|
||||
for (const auto &[key, value] : configs_) {
|
||||
object->configs_[key->Clone(storage)] = value->Clone(storage);
|
||||
|
@ -447,7 +447,7 @@ antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::Add
|
||||
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
|
||||
|
||||
coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE;
|
||||
coordinator_query->coordinator_server_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
|
||||
coordinator_query->coordinator_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
|
||||
coordinator_query->configs_ =
|
||||
std::any_cast<std::unordered_map<Expression *, Expression *>>(ctx->configsMap->accept(this));
|
||||
|
||||
|
@ -498,7 +498,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
}
|
||||
|
||||
auto const coord_coord_config =
|
||||
coordination::CoordinatorToCoordinatorConfig{.coordinator_server_id = coordinator_id,
|
||||
coordination::CoordinatorToCoordinatorConfig{.coordinator_id = coordinator_id,
|
||||
.bolt_server = *maybe_bolt_server,
|
||||
.coordinator_server = *maybe_coordinator_server};
|
||||
|
||||
@ -1224,7 +1224,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer);
|
||||
}
|
||||
|
||||
auto coord_server_id = coordinator_query->coordinator_server_id_->Accept(evaluator).ValueInt();
|
||||
auto coord_server_id = coordinator_query->coordinator_id_->Accept(evaluator).ValueInt();
|
||||
|
||||
callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coord_server_id,
|
||||
bolt_server = bolt_server_it->second,
|
||||
|
@ -187,7 +187,7 @@ TEST_F(CoordinatorClusterStateTest, UpdateUUID) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) {
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1,
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
@ -222,7 +222,7 @@ TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
|
||||
CoordinatorInstanceState instance_state{
|
||||
CoordinatorToCoordinatorConfig{.coordinator_server_id = 1,
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}}};
|
||||
nlohmann::json j = instance_state;
|
||||
@ -232,7 +232,7 @@ TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, Marshalling) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1,
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
|
@ -81,7 +81,7 @@ TEST_F(CoordinatorStateMachineTest, SerializeUnregisterReplicationInstance) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeAddCoordinatorInstance) {
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1,
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
|
@ -2706,7 +2706,7 @@ TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) {
|
||||
auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query));
|
||||
|
||||
EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE);
|
||||
ast_generator.CheckLiteral(parsed_query->coordinator_server_id_, TypedValue(1));
|
||||
ast_generator.CheckLiteral(parsed_query->coordinator_id_, TypedValue(1));
|
||||
|
||||
auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
|
||||
-> std::map<std::string, std::string, std::less<>> {
|
||||
|
@ -18,9 +18,10 @@
|
||||
|
||||
#include "libnuraft/nuraft.hxx"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using nuraft::ptr;
|
||||
|
||||
class RaftStateTest : public ::testing::Test {
|
||||
@ -32,11 +33,24 @@ class RaftStateTest : public ::testing::Test {
|
||||
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_state"};
|
||||
};
|
||||
|
||||
TEST_F(RaftStateTest, CreationOfRaftState) {
|
||||
TEST_F(RaftStateTest, RaftStateEmptyMetadata) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
|
||||
auto raft_state = RaftState::MakeRaftState(std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
spdlog::info("Raft state instance name: {}", raft_state.InstanceName());
|
||||
spdlog::info("Raft state socket address: {}", raft_state.RaftSocketAddress());
|
||||
auto const config = CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 1234, .bolt_port = 7688};
|
||||
|
||||
auto raft_state = RaftState::MakeRaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
ASSERT_EQ(raft_state.InstanceName(), "coordinator_1");
|
||||
ASSERT_EQ(raft_state.RaftSocketAddress(), "127.0.0.1:1234");
|
||||
ASSERT_TRUE(raft_state.IsLeader());
|
||||
ASSERT_TRUE(raft_state.GetReplicationInstances().empty());
|
||||
|
||||
auto const coords = raft_state.GetCoordinatorInstances();
|
||||
ASSERT_EQ(coords.size(), 1);
|
||||
auto const &coord_instance = coords[0];
|
||||
auto const &coord_config = CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 1234}};
|
||||
ASSERT_EQ(coord_instance.config, coord_config);
|
||||
}
|
||||
|
@ -147,7 +147,6 @@ TEST_F(RoutingTableTest, GetMixedRoutingTable) {
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}}});
|
||||
instance1.GetAllCoordinators();
|
||||
// auto routing_table = instance1.GetRoutingTable(routing);
|
||||
|
||||
// ASSERT_EQ(routing_table.size(), 1);
|
||||
@ -158,7 +157,7 @@ TEST_F(RoutingTableTest, GetMixedRoutingTable) {
|
||||
// TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) {
|
||||
//
|
||||
// CoordinatorInstance instance1;
|
||||
// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_server_id = 1,
|
||||
// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
// .bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
// .coordinator_server = Endpoint{"127.0.0.1",
|
||||
// 10111}});
|
||||
|
Loading…
Reference in New Issue
Block a user