Test CoordClusterState

This commit is contained in:
Andi Skrgat 2024-03-15 14:41:02 +01:00
parent 581767b491
commit 3a0bd33a91
3 changed files with 244 additions and 135 deletions

View File

@ -30,8 +30,20 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
}
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
std::vector<CoordinatorInstanceState> coordinators,
utils::UUID const &current_main_uuid, bool is_lock_opened)
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
: repl_instances_{std::move(instances)},
coordinators_{std::move(coordinators)},
current_main_uuid_(current_main_uuid),
is_lock_opened_(is_lock_opened) {}
// Serializes a CoordinatorInstanceState as a JSON object whose single "config" entry holds the wrapped config.
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) {
  j = nlohmann::json::object();
  j["config"] = instance_state.config;
}
// Fills instance_state.config from the "config" entry of j. The entry is looked up with at(), so a missing key
// is reported by the JSON library instead of being silently default-constructed.
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state) {
  auto const &config_json = j.at("config");
  config_json.get_to(instance_state.config);
}
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
: repl_instances_{other.repl_instances_},
@ -88,7 +100,13 @@ auto CoordinatorClusterState::IsCurrentMain(std::string_view instance_name) cons
it->second.instance_uuid == current_main_uuid_;
}
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
// Stores instance_state under instance_name in repl_instances_, replacing any entry already registered under
// that name. The map is modified while log_lock_ is held.
auto CoordinatorClusterState::InsertInstance(std::string instance_name, ReplicationInstanceState instance_state)
    -> void {
  std::lock_guard guard{log_lock_};
  repl_instances_.insert_or_assign(std::move(instance_name), std::move(instance_state));
}
auto CoordinatorClusterState::DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void {
auto lock = std::lock_guard{log_lock_};
switch (log_action) {
// end of OPEN_LOCK_REGISTER_REPLICATION_INSTANCE
@ -187,9 +205,11 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto lock = std::shared_lock{log_lock_};
nlohmann::json j = {{"repl_instances", repl_instances_},
{"coord_instances", coordinators_},
{"is_lock_opened", is_lock_opened_},
{"current_main_uuid", current_main_uuid_}};
auto const log = j.dump();
data = buffer::alloc(sizeof(uint32_t) + log.size());
buffer_serializer bs(data);
bs.put_str(log);
@ -198,10 +218,14 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
buffer_serializer bs(data);
auto const j = nlohmann::json::parse(bs.get_str());
auto instances = j["repl_instances"].get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
auto current_main_uuid = j["current_main_uuid"].get<utils::UUID>();
bool is_lock_opened = j["is_lock_opened"].get<int>();
return CoordinatorClusterState{std::move(instances), current_main_uuid, is_lock_opened};
auto repl_instances = j.at("repl_instances").get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
auto current_main_uuid = j.at("current_main_uuid").get<utils::UUID>();
bool is_lock_opened = j.at("is_lock_opened").get<int>();
auto coord_instances = j.at("coord_instances").get<std::vector<CoordinatorInstanceState>>();
return CoordinatorClusterState{std::move(repl_instances), std::move(coord_instances), current_main_uuid,
is_lock_opened};
}
auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {

View File

@ -30,6 +30,9 @@
namespace memgraph::coordination {
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
using replication_coordination_glue::ReplicationRole;
struct ReplicationInstanceState {
@ -48,7 +51,7 @@ struct ReplicationInstanceState {
}
};
// NOTE: Currently instance of coordinator doesn't change from the registration. Hence, just wrap
// NOTE: Currently a coordinator instance doesn't change after registration. Hence, this just wraps
// CoordinatorToCoordinatorConfig.
struct CoordinatorInstanceState {
CoordinatorToCoordinatorConfig config;
@ -64,14 +67,16 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
InstanceUUIDUpdate>;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state);
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state);
// Represents the state of the cluster from the coordinator's perspective.
// Source of truth since it is modified only as the result of RAFT's committing
class CoordinatorClusterState {
public:
CoordinatorClusterState() = default;
explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
std::vector<CoordinatorInstanceState> coordinators,
utils::UUID const &current_main_uuid, bool is_lock_opened);
CoordinatorClusterState(CoordinatorClusterState const &);
@ -89,7 +94,9 @@ class CoordinatorClusterState {
auto IsCurrentMain(std::string_view instance_name) const -> bool;
auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;
auto InsertInstance(std::string instance_name, ReplicationInstanceState instance_state) -> void;
auto DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void;
auto Serialize(ptr<buffer> &data) -> void;
@ -105,12 +112,17 @@ class CoordinatorClusterState {
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
// Value equality over all persisted cluster state: replication instances, coordinator instances, the current
// main UUID and the lock-opened flag. The lock flag is included because it is set by the constructor and written
// and read back by Serialize/Deserialize, so a round trip that lost it must not compare equal.
// log_lock_ itself is not part of the comparison.
// NOTE(review): neither operand's log_lock_ is acquired here — confirm callers compare quiescent states only.
friend auto operator==(CoordinatorClusterState const &lhs, CoordinatorClusterState const &rhs) -> bool {
  return lhs.repl_instances_ == rhs.repl_instances_ && lhs.coordinators_ == rhs.coordinators_ &&
         lhs.current_main_uuid_ == rhs.current_main_uuid_ && lhs.is_lock_opened_ == rhs.is_lock_opened_;
}
private:
std::vector<CoordinatorInstanceState> coordinators_{};
std::map<std::string, ReplicationInstanceState, std::less<>> repl_instances_{};
std::vector<CoordinatorInstanceState> coordinators_{};
utils::UUID current_main_uuid_{};
mutable utils::ResourceLock log_lock_{};
bool is_lock_opened_{false};
mutable utils::ResourceLock log_lock_{};
};
} // namespace memgraph::coordination

View File

@ -11,10 +11,9 @@
#include "nuraft/coordinator_cluster_state.hpp"
#include "io/network/endpoint.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/file.hpp"
#include "utils/uuid.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
@ -23,15 +22,16 @@
#include "libnuraft/nuraft.hxx"
using memgraph::coordination::CoordinatorClusterState;
using memgraph::coordination::CoordinatorStateMachine;
using memgraph::coordination::CoordinatorInstanceState;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::RaftLogAction;
using memgraph::coordination::ReplicationInstanceState;
using memgraph::io::network::Endpoint;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::replication_coordination_glue::ReplicationRole;
using memgraph::utils::UUID;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
class CoordinatorClusterStateTest : public ::testing::Test {
@ -44,6 +44,161 @@ class CoordinatorClusterStateTest : public ::testing::Test {
"MG_tests_unit_coordinator_cluster_state"};
};
// Registering a single replication instance must store exactly that config, mark the instance as a REPLICA and
// leave the list of coordinator instances empty.
TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) {
  CoordinatorToReplicaConfig replica_config{
      .instance_name = "instance3",
      .mgt_server = Endpoint{"127.0.0.1", 10112},
      .bolt_server = Endpoint{"127.0.0.1", 7687},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10001}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};

  CoordinatorClusterState state;
  state.DoAction(replica_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  auto registered = state.GetReplicationInstances();
  ASSERT_EQ(registered.size(), 1);

  auto &only_instance = registered[0];
  ASSERT_EQ(only_instance.config, replica_config);
  ASSERT_EQ(only_instance.status, ReplicationRole::REPLICA);

  ASSERT_EQ(state.GetCoordinatorInstances().size(), 0);
  ASSERT_TRUE(state.IsReplica("instance3"));
}
// An instance that is registered and then unregistered by name must no longer be reported by the cluster state.
TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) {
  CoordinatorToReplicaConfig replica_config{
      .instance_name = "instance3",
      .mgt_server = Endpoint{"127.0.0.1", 10112},
      .bolt_server = Endpoint{"127.0.0.1", 7687},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10001}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};

  CoordinatorClusterState state;
  state.DoAction(replica_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
  state.DoAction("instance3", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE);

  auto const remaining = state.GetReplicationInstances();
  ASSERT_EQ(remaining.size(), 0);
}
// Registers "instance3" and "instance2" (in that order), promotes "instance3" and checks that it is the only
// MAIN while "instance2" stays a REPLICA.
TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) {
  CoordinatorClusterState state;

  CoordinatorToReplicaConfig instance3_config{
      .instance_name = "instance3",
      .mgt_server = Endpoint{"127.0.0.1", 10112},
      .bolt_server = Endpoint{"127.0.0.1", 7687},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10001}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};
  state.DoAction(instance3_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  CoordinatorToReplicaConfig instance2_config{
      .instance_name = "instance2",
      .mgt_server = Endpoint{"127.0.0.1", 10111},
      .bolt_server = Endpoint{"127.0.0.1", 7688},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10010}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};
  state.DoAction(instance2_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);

  // The positional checks expect "instance2" at index 0 and "instance3" at index 1
  // (presumably name order, since the state keeps instances in a std::map — confirm in GetReplicationInstances).
  auto const all_instances = state.GetReplicationInstances();
  ASSERT_EQ(all_instances.size(), 2);
  ASSERT_EQ(all_instances[0].status, ReplicationRole::REPLICA);
  ASSERT_EQ(all_instances[1].status, ReplicationRole::MAIN);

  ASSERT_TRUE(state.MainExists());
  ASSERT_TRUE(state.IsMain("instance3"));
  ASSERT_FALSE(state.IsMain("instance2"));
  ASSERT_TRUE(state.IsReplica("instance2"));
  ASSERT_FALSE(state.IsReplica("instance3"));
}
// Promotes "instance3", demotes it back to REPLICA and then promotes "instance2"; afterwards "instance2" must be
// the only MAIN and "instance3" a REPLICA.
TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) {
  CoordinatorClusterState state;

  CoordinatorToReplicaConfig instance3_config{
      .instance_name = "instance3",
      .mgt_server = Endpoint{"127.0.0.1", 10112},
      .bolt_server = Endpoint{"127.0.0.1", 7687},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10001}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};
  state.DoAction(instance3_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  CoordinatorToReplicaConfig instance2_config{
      .instance_name = "instance2",
      .mgt_server = Endpoint{"127.0.0.1", 10111},
      .bolt_server = Endpoint{"127.0.0.1", 7688},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10010}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};
  state.DoAction(instance2_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  // Role changes are applied strictly in this order.
  state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);
  state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_REPLICA);
  state.DoAction("instance2", RaftLogAction::SET_INSTANCE_AS_MAIN);

  // The positional checks expect "instance2" at index 0 and "instance3" at index 1
  // (presumably name order, since the state keeps instances in a std::map — confirm in GetReplicationInstances).
  auto const all_instances = state.GetReplicationInstances();
  ASSERT_EQ(all_instances.size(), 2);
  ASSERT_EQ(all_instances[0].status, ReplicationRole::MAIN);
  ASSERT_EQ(all_instances[1].status, ReplicationRole::REPLICA);

  ASSERT_TRUE(state.MainExists());
  ASSERT_TRUE(state.IsMain("instance2"));
  ASSERT_FALSE(state.IsMain("instance3"));
  ASSERT_TRUE(state.IsReplica("instance3"));
  ASSERT_FALSE(state.IsReplica("instance2"));
}
// Applying UPDATE_UUID with a UUID must make GetUUID() return that same UUID.
TEST_F(CoordinatorClusterStateTest, UpdateUUID) {
  UUID new_uuid = UUID();

  CoordinatorClusterState state;
  state.DoAction(new_uuid, RaftLogAction::UPDATE_UUID);

  ASSERT_EQ(state.GetUUID(), new_uuid);
}
// Adding one coordinator instance must make it the single entry returned by GetCoordinatorInstances().
TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) {
  CoordinatorClusterState state;

  CoordinatorToCoordinatorConfig coordinator_config{.coordinator_server_id = 1,
                                                    .bolt_server = Endpoint{"127.0.0.1", 7687},
                                                    .coordinator_server = Endpoint{"127.0.0.1", 10111}};
  state.DoAction(coordinator_config, RaftLogAction::ADD_COORDINATOR_INSTANCE);

  auto coordinators = state.GetCoordinatorInstances();
  ASSERT_EQ(coordinators.size(), 1);
  ASSERT_EQ(coordinators[0].config, coordinator_config);
}
TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
ReplicationInstanceState instance_state{
CoordinatorToReplicaConfig{.instance_name = "instance3",
@ -65,122 +220,40 @@ TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
EXPECT_EQ(instance_state.status, deserialized_instance_state.status);
}
TEST_F(CoordinatorClusterStateTest, DoActionRegisterInstances) {
auto coordinator_cluster_state = memgraph::coordination::CoordinatorClusterState{};
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance1",
.mgt_server = Endpoint{"127.0.0.1", 10111},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance1",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = {.instance_name = "instance2",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10002}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10113},
.bolt_server = Endpoint{"127.0.0.1", 7689},
.replication_client_info = {.instance_name = "instance3",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10003}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance4",
.mgt_server = Endpoint{"127.0.0.1", 10114},
.bolt_server = Endpoint{"127.0.0.1", 7690},
.replication_client_info = {.instance_name = "instance4",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10004}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance5",
.mgt_server = Endpoint{"127.0.0.1", 10115},
.bolt_server = Endpoint{"127.0.0.1", 7691},
.replication_client_info = {.instance_name = "instance5",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10005}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance6",
.mgt_server = Endpoint{"127.0.0.1", 10116},
.bolt_server = Endpoint{"127.0.0.1", 7692},
.replication_client_info = {.instance_name = "instance6",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10006}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
ptr<buffer> data;
coordinator_cluster_state.Serialize(data);
auto deserialized_coordinator_cluster_state = CoordinatorClusterState::Deserialize(*data);
ASSERT_EQ(coordinator_cluster_state.GetReplicationInstances(),
deserialized_coordinator_cluster_state.GetReplicationInstances());
// A CoordinatorInstanceState converted to JSON and back must compare equal to the original.
TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
  CoordinatorToCoordinatorConfig coordinator_config{.coordinator_server_id = 1,
                                                    .bolt_server = Endpoint{"127.0.0.1", 7687},
                                                    .coordinator_server = Endpoint{"127.0.0.1", 10111}};
  CoordinatorInstanceState original{coordinator_config};

  nlohmann::json serialized = original;
  auto restored = serialized.get<CoordinatorInstanceState>();

  ASSERT_EQ(original, restored);
}
// A cluster state holding one coordinator and one replication instance must survive a
// Serialize -> Deserialize round trip unchanged.
TEST_F(CoordinatorClusterStateTest, Marshalling) {
  CoordinatorClusterState original_state;

  CoordinatorToCoordinatorConfig coordinator_config{.coordinator_server_id = 1,
                                                    .bolt_server = Endpoint{"127.0.0.1", 7687},
                                                    .coordinator_server = Endpoint{"127.0.0.1", 10111}};
  original_state.DoAction(coordinator_config, RaftLogAction::ADD_COORDINATOR_INSTANCE);

  CoordinatorToReplicaConfig replica_config{
      .instance_name = "instance2",
      .mgt_server = Endpoint{"127.0.0.1", 10111},
      .bolt_server = Endpoint{"127.0.0.1", 7688},
      .replication_client_info = {.instance_name = "instance_name",
                                  .replication_mode = ReplicationMode::ASYNC,
                                  .replication_server = Endpoint{"127.0.0.1", 10010}},
      .instance_health_check_frequency_sec = std::chrono::seconds{1},
      .instance_down_timeout_sec = std::chrono::seconds{5},
      .instance_get_uuid_frequency_sec = std::chrono::seconds{10},
      .ssl = std::nullopt};
  original_state.DoAction(replica_config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);

  // Serialize allocates the buffer itself, so an empty pointer is handed in.
  ptr<buffer> serialized{};
  original_state.Serialize(serialized);

  auto restored_state = CoordinatorClusterState::Deserialize(*serialized);
  ASSERT_EQ(original_state, restored_state);
}