Compare commits
10 Commits
master
...
improve-ha
Author | SHA1 | Date | |
---|---|---|---|
|
4159c9f6a5 | ||
|
b826a0a518 | ||
|
8688c7361a | ||
|
16b3df3b77 | ||
|
5d5c39f862 | ||
|
03f5cc2f33 | ||
|
ef85e8ae61 | ||
|
a999110737 | ||
|
fdd49b7983 | ||
|
3a0bd33a91 |
@ -2,7 +2,7 @@ add_library(mg-coordination STATIC)
|
||||
add_library(mg::coordination ALIAS mg-coordination)
|
||||
target_sources(mg-coordination
|
||||
PUBLIC
|
||||
include/coordination/coordinator_client.hpp
|
||||
include/coordination/replication_instance_client.hpp
|
||||
include/coordination/coordinator_state.hpp
|
||||
include/coordination/coordinator_rpc.hpp
|
||||
include/coordination/coordinator_server.hpp
|
||||
@ -12,7 +12,7 @@ target_sources(mg-coordination
|
||||
include/coordination/coordinator_instance.hpp
|
||||
include/coordination/coordinator_handlers.hpp
|
||||
include/coordination/instance_status.hpp
|
||||
include/coordination/replication_instance.hpp
|
||||
include/coordination/replication_instance_connector.hpp
|
||||
include/coordination/raft_state.hpp
|
||||
include/coordination/rpc_errors.hpp
|
||||
|
||||
@ -24,13 +24,13 @@ target_sources(mg-coordination
|
||||
|
||||
PRIVATE
|
||||
coordinator_communication_config.cpp
|
||||
coordinator_client.cpp
|
||||
replication_instance_client.cpp
|
||||
coordinator_state.cpp
|
||||
coordinator_rpc.cpp
|
||||
coordinator_server.cpp
|
||||
coordinator_handlers.cpp
|
||||
coordinator_instance.cpp
|
||||
replication_instance.cpp
|
||||
replication_instance_connector.cpp
|
||||
raft_state.cpp
|
||||
|
||||
coordinator_log_store.cpp
|
||||
|
@ -29,9 +29,29 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
|
||||
j.at("uuid").get_to(instance_state.instance_uuid);
|
||||
}
|
||||
|
||||
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) {
|
||||
j = nlohmann::json{{"config", instance_state.config}};
|
||||
}
|
||||
|
||||
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state) {
|
||||
j.at("config").get_to(instance_state.config);
|
||||
}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||
std::vector<CoordinatorInstanceState> coordinators,
|
||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
||||
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
|
||||
: repl_instances_{std::move(instances)},
|
||||
coordinators_{std::move(coordinators)},
|
||||
current_main_uuid_(current_main_uuid),
|
||||
is_lock_opened_(is_lock_opened) {}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) {
|
||||
auto c2c_config = CoordinatorToCoordinatorConfig{
|
||||
.coordinator_id = config.coordinator_id,
|
||||
.bolt_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.bolt_port)},
|
||||
.coordinator_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.coordinator_port)}};
|
||||
coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)});
|
||||
}
|
||||
|
||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
||||
: repl_instances_{other.repl_instances_},
|
||||
@ -88,7 +108,13 @@ auto CoordinatorClusterState::IsCurrentMain(std::string_view instance_name) cons
|
||||
it->second.instance_uuid == current_main_uuid_;
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
|
||||
auto CoordinatorClusterState::InsertInstance(std::string instance_name, ReplicationInstanceState instance_state)
|
||||
-> void {
|
||||
auto lock = std::lock_guard{log_lock_};
|
||||
repl_instances_.insert_or_assign(std::move(instance_name), std::move(instance_state));
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void {
|
||||
auto lock = std::lock_guard{log_lock_};
|
||||
switch (log_action) {
|
||||
// end of OPEN_LOCK_REGISTER_REPLICATION_INSTANCE
|
||||
@ -148,7 +174,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
|
||||
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
|
||||
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
|
||||
coordinators_.emplace_back(CoordinatorInstanceState{config});
|
||||
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
|
||||
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_id);
|
||||
break;
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
||||
@ -187,9 +213,11 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
|
||||
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
||||
auto lock = std::shared_lock{log_lock_};
|
||||
nlohmann::json j = {{"repl_instances", repl_instances_},
|
||||
{"coord_instances", coordinators_},
|
||||
{"is_lock_opened", is_lock_opened_},
|
||||
{"current_main_uuid", current_main_uuid_}};
|
||||
auto const log = j.dump();
|
||||
|
||||
data = buffer::alloc(sizeof(uint32_t) + log.size());
|
||||
buffer_serializer bs(data);
|
||||
bs.put_str(log);
|
||||
@ -198,10 +226,14 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
||||
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
|
||||
buffer_serializer bs(data);
|
||||
auto const j = nlohmann::json::parse(bs.get_str());
|
||||
auto instances = j["repl_instances"].get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
|
||||
auto current_main_uuid = j["current_main_uuid"].get<utils::UUID>();
|
||||
bool is_lock_opened = j["is_lock_opened"].get<int>();
|
||||
return CoordinatorClusterState{std::move(instances), current_main_uuid, is_lock_opened};
|
||||
|
||||
auto repl_instances = j.at("repl_instances").get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
|
||||
auto current_main_uuid = j.at("current_main_uuid").get<utils::UUID>();
|
||||
bool is_lock_opened = j.at("is_lock_opened").get<int>();
|
||||
auto coord_instances = j.at("coord_instances").get<std::vector<CoordinatorInstanceState>>();
|
||||
|
||||
return CoordinatorClusterState{std::move(repl_instances), std::move(coord_instances), current_main_uuid,
|
||||
is_lock_opened};
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
|
||||
|
@ -16,13 +16,13 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
void to_json(nlohmann::json &j, CoordinatorToCoordinatorConfig const &config) {
|
||||
j = nlohmann::json{{"coordinator_server_id", config.coordinator_server_id},
|
||||
j = nlohmann::json{{"coordinator_id", config.coordinator_id},
|
||||
{"coordinator_server", config.coordinator_server},
|
||||
{"bolt_server", config.bolt_server}};
|
||||
}
|
||||
|
||||
void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config) {
|
||||
config.coordinator_server_id = j.at("coordinator_server_id").get<uint32_t>();
|
||||
config.coordinator_id = j.at("coordinator_id").get<uint32_t>();
|
||||
config.coordinator_server = j.at("coordinator_server").get<io::network::Endpoint>();
|
||||
config.bolt_server = j.at("bolt_server").get<io::network::Endpoint>();
|
||||
}
|
||||
|
@ -27,11 +27,11 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using nuraft::ptr;
|
||||
using nuraft::srv_config;
|
||||
|
||||
CoordinatorInstance::CoordinatorInstance()
|
||||
CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config)
|
||||
: thread_pool_{1},
|
||||
raft_state_(RaftState::MakeRaftState(
|
||||
config,
|
||||
[this]() {
|
||||
spdlog::info("Leader changed, starting all replication instances!");
|
||||
auto const instances = raft_state_.GetReplicationInstances();
|
||||
@ -41,8 +41,9 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
|
||||
std::ranges::for_each(replicas, [this](auto &replica) {
|
||||
spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
|
||||
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
|
||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
auto client =
|
||||
std::make_unique<ReplicationInstanceClient>(this, replica.config, client_succ_cb_, client_fail_cb_);
|
||||
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback);
|
||||
});
|
||||
|
||||
@ -51,8 +52,9 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
|
||||
std::ranges::for_each(main, [this](auto &main_instance) {
|
||||
spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
|
||||
repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
|
||||
&CoordinatorInstance::MainSuccessCallback,
|
||||
auto client = std::make_unique<ReplicationInstanceClient>(this, main_instance.config, client_succ_cb_,
|
||||
client_fail_cb_);
|
||||
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::MainSuccessCallback,
|
||||
&CoordinatorInstance::MainFailCallback);
|
||||
});
|
||||
|
||||
@ -89,9 +91,10 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
};
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
|
||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
||||
-> ReplicationInstanceConnector & {
|
||||
auto repl_instance =
|
||||
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
|
||||
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == replication_instance_name;
|
||||
});
|
||||
|
||||
@ -101,27 +104,27 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
|
||||
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
|
||||
.raft_socket_address = instance->get_endpoint(),
|
||||
auto const coord_instance_to_status = [](CoordinatorInstanceState const &instance) -> InstanceStatus {
|
||||
return {.instance_name = fmt::format("coordinator_{}", instance.config.coordinator_id),
|
||||
.raft_socket_address = instance.config.coordinator_server.SocketAddress(),
|
||||
.cluster_role = "coordinator",
|
||||
.health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move
|
||||
.health = "unknown"};
|
||||
};
|
||||
auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
|
||||
auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status);
|
||||
|
||||
if (raft_state_.IsLeader()) {
|
||||
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
||||
auto const stringify_repl_role = [this](ReplicationInstanceConnector const &instance) -> std::string {
|
||||
if (!instance.IsAlive()) return "unknown";
|
||||
if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main";
|
||||
return "replica";
|
||||
};
|
||||
|
||||
auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
|
||||
auto const stringify_repl_health = [](ReplicationInstanceConnector const &instance) -> std::string {
|
||||
return instance.IsAlive() ? "up" : "down";
|
||||
};
|
||||
|
||||
auto process_repl_instance_as_leader =
|
||||
[&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
|
||||
[&stringify_repl_role, &stringify_repl_health](ReplicationInstanceConnector const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.InstanceName(),
|
||||
.coord_socket_address = instance.CoordinatorSocketAddress(),
|
||||
.cluster_role = stringify_repl_role(instance),
|
||||
@ -160,19 +163,26 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::TryFailover() -> void {
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||
auto const is_replica = [this](ReplicationInstanceConnector const &instance) {
|
||||
return HasReplicaState(instance.InstanceName());
|
||||
};
|
||||
|
||||
auto alive_replicas =
|
||||
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
|
||||
auto alive_replicas = repl_instances_ | ranges::views::filter(is_replica) |
|
||||
ranges::views::filter(&ReplicationInstanceConnector::IsAlive);
|
||||
|
||||
if (ranges::empty(alive_replicas)) {
|
||||
spdlog::warn("Failover failed since all replicas are down!");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
spdlog::error("Failover failed since the instance is not the leader!");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const get_ts = [](ReplicationInstanceConnector &replica) {
|
||||
return replica.GetClient().SendGetInstanceTimestampsRpc();
|
||||
};
|
||||
|
||||
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
||||
|
||||
@ -206,13 +216,13 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
|
||||
auto const is_not_new_main = [&new_main](ReplicationInstanceConnector &instance) {
|
||||
return instance.InstanceName() != new_main->InstanceName();
|
||||
};
|
||||
|
||||
auto const new_main_uuid = utils::UUID{};
|
||||
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
|
||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||
};
|
||||
@ -225,7 +235,7 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
}
|
||||
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
@ -268,7 +278,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const is_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
};
|
||||
|
||||
@ -287,13 +297,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const is_not_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() != instance_name;
|
||||
};
|
||||
|
||||
auto const new_main_uuid = utils::UUID{};
|
||||
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
|
||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||
};
|
||||
@ -304,7 +314,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
}
|
||||
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
@ -334,19 +344,21 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
||||
return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
})) {
|
||||
// TODO: (andi) Change that this is being asked from raft state
|
||||
if (std::ranges::any_of(repl_instances_,
|
||||
[instance_name = config.instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
|
||||
return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress();
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
|
||||
return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress();
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
|
||||
@ -361,8 +373,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
||||
return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
|
||||
}
|
||||
|
||||
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
|
||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
auto client = std::make_unique<ReplicationInstanceClient>(this, config, client_succ_cb_, client_fail_cb_);
|
||||
|
||||
auto *new_instance = &repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback);
|
||||
|
||||
if (!new_instance->SendDemoteToReplicaRpc()) {
|
||||
@ -392,7 +405,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const name_matches = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
};
|
||||
|
||||
@ -401,7 +414,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||
}
|
||||
|
||||
auto const is_current_main = [this](ReplicationInstance const &instance) {
|
||||
auto const is_current_main = [this](ReplicationInstanceConnector const &instance) {
|
||||
return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive();
|
||||
};
|
||||
|
||||
@ -434,6 +447,8 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||
spdlog::trace("Adding coordinator instance {} start in CoordinatorInstance for {}", config.coordinator_id,
|
||||
raft_state_.InstanceName());
|
||||
raft_state_.AddCoordinatorInstance(config);
|
||||
// NOTE: We ignore error we added coordinator instance to networking stuff but not in raft log.
|
||||
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
|
||||
@ -625,56 +640,7 @@ auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const
|
||||
return raft_state_.HasReplicaState(instance_name);
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
|
||||
auto res = RoutingTable{};
|
||||
|
||||
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
|
||||
return instance.config.BoltSocketAddress();
|
||||
};
|
||||
|
||||
// TODO: (andi) This is wrong check, Fico will correct in #1819.
|
||||
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::MAIN;
|
||||
};
|
||||
|
||||
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
auto const &raft_log_repl_instances = raft_state_.GetReplicationInstances();
|
||||
|
||||
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
|
||||
|
||||
if (!std::ranges::empty(bolt_mains)) {
|
||||
res.emplace_back(std::move(bolt_mains), "WRITE");
|
||||
}
|
||||
|
||||
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
if (!std::ranges::empty(bolt_replicas)) {
|
||||
res.emplace_back(std::move(bolt_replicas), "READ");
|
||||
}
|
||||
|
||||
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
|
||||
return instance.config.bolt_server.SocketAddress();
|
||||
};
|
||||
|
||||
auto const &raft_log_coord_instances = raft_state_.GetCoordinatorInstances();
|
||||
auto bolt_coords =
|
||||
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
|
||||
|
||||
auto const &local_bolt_coord = routing.find("address");
|
||||
if (local_bolt_coord == routing.end()) {
|
||||
throw InvalidRoutingTableException("No bolt address found in routing table for the current coordinator!");
|
||||
}
|
||||
|
||||
bolt_coords.push_back(local_bolt_coord->second);
|
||||
res.emplace_back(std::move(bolt_coords), "ROUTE");
|
||||
|
||||
return res;
|
||||
}
|
||||
auto CoordinatorInstance::GetRoutingTable() const -> RoutingTable { return raft_state_.GetRoutingTable(); }
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -24,21 +24,16 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
CoordinatorState::CoordinatorState() {
|
||||
MG_ASSERT(!(FLAGS_coordinator_id && FLAGS_management_port),
|
||||
"Instance cannot be a coordinator and have registered coordinator server.");
|
||||
CoordinatorState::CoordinatorState(CoordinatorInstanceInitConfig const &config) {
|
||||
data_.emplace<CoordinatorInstance>(config);
|
||||
}
|
||||
|
||||
spdlog::info("Executing coordinator constructor");
|
||||
if (FLAGS_management_port) {
|
||||
spdlog::info("Coordinator server port set");
|
||||
auto const config = ManagementServerConfig{
|
||||
.ip_address = kDefaultReplicationServerIp,
|
||||
.port = static_cast<uint16_t>(FLAGS_management_port),
|
||||
};
|
||||
spdlog::info("Executing coordinator constructor main replica");
|
||||
|
||||
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
|
||||
}
|
||||
CoordinatorState::CoordinatorState(ReplicationInstanceInitConfig const &config) {
|
||||
auto const mgmt_config = ManagementServerConfig{
|
||||
.ip_address = kDefaultReplicationServerIp,
|
||||
.port = static_cast<uint16_t>(config.management_port),
|
||||
};
|
||||
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(mgmt_config)};
|
||||
}
|
||||
|
||||
auto CoordinatorState::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
||||
@ -104,10 +99,10 @@ auto CoordinatorState::AddCoordinatorInstance(coordination::CoordinatorToCoordin
|
||||
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(config);
|
||||
}
|
||||
|
||||
auto CoordinatorState::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
|
||||
auto CoordinatorState::GetRoutingTable() -> RoutingTable {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
|
||||
"Coordinator cannot get routing table since variant holds wrong alternative");
|
||||
return std::get<CoordinatorInstance>(data_).GetRoutingTable(routing);
|
||||
return std::get<CoordinatorInstance>(data_).GetRoutingTable();
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -20,6 +20,9 @@ constexpr int MAX_SNAPSHOTS = 3;
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
CoordinatorStateMachine::CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config)
|
||||
: cluster_state_(config) {}
|
||||
|
||||
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
|
||||
|
||||
auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool {
|
||||
|
@ -30,6 +30,16 @@ namespace memgraph::coordination {
|
||||
|
||||
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
|
||||
|
||||
struct ReplicationInstanceInitConfig {
|
||||
int management_port{0};
|
||||
};
|
||||
|
||||
struct CoordinatorInstanceInitConfig {
|
||||
uint32_t coordinator_id{0};
|
||||
int coordinator_port{0};
|
||||
int bolt_port{0};
|
||||
};
|
||||
|
||||
struct ReplicationClientInfo {
|
||||
std::string instance_name{};
|
||||
replication_coordination_glue::ReplicationMode replication_mode{};
|
||||
@ -66,7 +76,7 @@ struct CoordinatorToReplicaConfig {
|
||||
};
|
||||
|
||||
struct CoordinatorToCoordinatorConfig {
|
||||
uint32_t coordinator_server_id{0};
|
||||
uint32_t coordinator_id{0};
|
||||
io::network::Endpoint bolt_server;
|
||||
io::network::Endpoint coordinator_server;
|
||||
|
||||
|
@ -17,7 +17,7 @@
|
||||
#include "coordination/instance_status.hpp"
|
||||
#include "coordination/raft_state.hpp"
|
||||
#include "coordination/register_main_replica_coordinator_status.hpp"
|
||||
#include "coordination/replication_instance.hpp"
|
||||
#include "coordination/replication_instance_connector.hpp"
|
||||
#include "utils/resource_lock.hpp"
|
||||
#include "utils/rw_lock.hpp"
|
||||
#include "utils/thread_pool.hpp"
|
||||
@ -26,8 +26,6 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
|
||||
|
||||
struct NewMainRes {
|
||||
std::string most_up_to_date_instance;
|
||||
std::string latest_epoch;
|
||||
@ -37,7 +35,7 @@ using InstanceNameDbHistories = std::pair<std::string, replication_coordination_
|
||||
|
||||
class CoordinatorInstance {
|
||||
public:
|
||||
CoordinatorInstance();
|
||||
explicit CoordinatorInstance(CoordinatorInstanceInitConfig const &config);
|
||||
CoordinatorInstance(CoordinatorInstance const &) = delete;
|
||||
CoordinatorInstance &operator=(CoordinatorInstance const &) = delete;
|
||||
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
||||
@ -56,9 +54,9 @@ class CoordinatorInstance {
|
||||
|
||||
auto TryFailover() -> void;
|
||||
|
||||
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
|
||||
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable;
|
||||
auto GetRoutingTable() const -> RoutingTable;
|
||||
|
||||
static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;
|
||||
|
||||
@ -67,7 +65,7 @@ class CoordinatorInstance {
|
||||
auto HasReplicaState(std::string_view instance_name) const -> bool;
|
||||
|
||||
private:
|
||||
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
|
||||
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstanceConnector &;
|
||||
|
||||
void MainFailCallback(std::string_view);
|
||||
|
||||
@ -78,8 +76,10 @@ class CoordinatorInstance {
|
||||
void ReplicaFailCallback(std::string_view);
|
||||
|
||||
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
|
||||
|
||||
// NOTE: Must be std::list because we rely on pointer stability.
|
||||
std::list<ReplicationInstance> repl_instances_;
|
||||
// TODO: (andi) Rename + virtualize for mocking.
|
||||
std::list<ReplicationInstanceConnector> repl_instances_;
|
||||
mutable utils::ResourceLock coord_instance_lock_{};
|
||||
|
||||
// Thread pool needs to be constructed before raft state as raft state can call thread pool
|
||||
|
@ -24,7 +24,8 @@ namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorState {
|
||||
public:
|
||||
CoordinatorState();
|
||||
explicit CoordinatorState(CoordinatorInstanceInitConfig const &config);
|
||||
explicit CoordinatorState(ReplicationInstanceInitConfig const &config);
|
||||
~CoordinatorState() = default;
|
||||
|
||||
CoordinatorState(CoordinatorState const &) = delete;
|
||||
@ -47,14 +48,14 @@ class CoordinatorState {
|
||||
// NOTE: The client code must check that the server exists before calling this method.
|
||||
auto GetCoordinatorServer() const -> CoordinatorServer &;
|
||||
|
||||
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable;
|
||||
auto GetRoutingTable() -> RoutingTable;
|
||||
|
||||
private:
|
||||
struct CoordinatorMainReplicaData {
|
||||
std::unique_ptr<CoordinatorServer> coordinator_server_;
|
||||
};
|
||||
|
||||
std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
|
||||
std::variant<CoordinatorMainReplicaData, CoordinatorInstance> data_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -27,6 +27,7 @@ struct CoordinatorToReplicaConfig;
|
||||
|
||||
using BecomeLeaderCb = std::function<void()>;
|
||||
using BecomeFollowerCb = std::function<void()>;
|
||||
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
|
||||
|
||||
using nuraft::buffer;
|
||||
using nuraft::logger;
|
||||
@ -40,8 +41,8 @@ using raft_result = nuraft::cmd_result<ptr<buffer>>;
|
||||
|
||||
class RaftState {
|
||||
private:
|
||||
explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
|
||||
uint32_t raft_port, std::string raft_address);
|
||||
explicit RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
|
||||
BecomeFollowerCb become_follower_cb);
|
||||
|
||||
auto InitRaftServer() -> void;
|
||||
|
||||
@ -53,13 +54,13 @@ class RaftState {
|
||||
RaftState &operator=(RaftState &&other) noexcept = default;
|
||||
~RaftState();
|
||||
|
||||
static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState;
|
||||
static auto MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
|
||||
BecomeFollowerCb &&become_follower_cb) -> RaftState;
|
||||
|
||||
auto InstanceName() const -> std::string;
|
||||
auto RaftSocketAddress() const -> std::string;
|
||||
|
||||
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
|
||||
|
||||
auto RequestLeadership() -> bool;
|
||||
auto IsLeader() const -> bool;
|
||||
@ -68,6 +69,7 @@ class RaftState {
|
||||
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
|
||||
auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
|
||||
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
|
||||
|
||||
auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
|
||||
auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
|
||||
auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
|
||||
@ -76,9 +78,9 @@ class RaftState {
|
||||
auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
|
||||
auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
|
||||
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
|
||||
auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
|
||||
|
||||
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
|
||||
// TODO: (andi) Do we need then GetAllCoordinators?
|
||||
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
|
||||
|
||||
auto MainExists() const -> bool;
|
||||
@ -90,17 +92,17 @@ class RaftState {
|
||||
auto GetInstanceUUID(std::string_view) const -> utils::UUID;
|
||||
|
||||
auto IsLockOpened() const -> bool;
|
||||
auto GetRoutingTable() const -> RoutingTable;
|
||||
|
||||
private:
|
||||
// TODO: (andi) I think variables below can be abstracted/clean them.
|
||||
io::network::Endpoint raft_endpoint_;
|
||||
uint32_t coordinator_id_;
|
||||
|
||||
ptr<CoordinatorStateMachine> state_machine_;
|
||||
ptr<CoordinatorStateManager> state_manager_;
|
||||
ptr<raft_server> raft_server_;
|
||||
ptr<logger> logger_;
|
||||
raft_launcher launcher_;
|
||||
ptr<raft_server> raft_server_;
|
||||
|
||||
BecomeLeaderCb become_leader_cb_;
|
||||
BecomeFollowerCb become_follower_cb_;
|
||||
|
@ -27,18 +27,19 @@ class CoordinatorInstance;
|
||||
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
|
||||
using ReplicationClientsInfo = std::vector<ReplicationClientInfo>;
|
||||
|
||||
class CoordinatorClient {
|
||||
class ReplicationInstanceClient {
|
||||
public:
|
||||
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
|
||||
explicit ReplicationInstanceClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb = nullptr,
|
||||
HealthCheckClientCallback fail_cb = nullptr);
|
||||
|
||||
~CoordinatorClient() = default;
|
||||
virtual ~ReplicationInstanceClient() = default;
|
||||
|
||||
CoordinatorClient(CoordinatorClient &) = delete;
|
||||
CoordinatorClient &operator=(CoordinatorClient const &) = delete;
|
||||
ReplicationInstanceClient(ReplicationInstanceClient &) = delete;
|
||||
ReplicationInstanceClient &operator=(ReplicationInstanceClient const &) = delete;
|
||||
|
||||
CoordinatorClient(CoordinatorClient &&) noexcept = delete;
|
||||
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
|
||||
ReplicationInstanceClient(ReplicationInstanceClient &&) noexcept = delete;
|
||||
ReplicationInstanceClient &operator=(ReplicationInstanceClient &&) noexcept = delete;
|
||||
|
||||
void StartFrequentCheck();
|
||||
void StopFrequentCheck();
|
||||
@ -49,7 +50,7 @@ class CoordinatorClient {
|
||||
auto CoordinatorSocketAddress() const -> std::string;
|
||||
auto ReplicationSocketAddress() const -> std::string;
|
||||
|
||||
[[nodiscard]] auto DemoteToReplica() const -> bool;
|
||||
virtual auto DemoteToReplica() const -> bool;
|
||||
|
||||
auto SendPromoteReplicaToMainRpc(utils::UUID const &uuid, ReplicationClientsInfo replication_clients_info) const
|
||||
-> bool;
|
||||
@ -73,7 +74,7 @@ class CoordinatorClient {
|
||||
|
||||
auto InstanceGetUUIDFrequencySec() const -> std::chrono::seconds;
|
||||
|
||||
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
||||
friend bool operator==(ReplicationInstanceClient const &first, ReplicationInstanceClient const &second) {
|
||||
return first.config_ == second.config_;
|
||||
}
|
||||
|
@ -13,8 +13,8 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/replication_instance_client.hpp"
|
||||
#include "replication_coordination_glue/role.hpp"
|
||||
|
||||
#include "utils/resource_lock.hpp"
|
||||
@ -25,31 +25,27 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorInstance;
|
||||
class ReplicationInstance;
|
||||
|
||||
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
|
||||
|
||||
class ReplicationInstance {
|
||||
class ReplicationInstanceConnector {
|
||||
public:
|
||||
ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb);
|
||||
explicit ReplicationInstanceConnector(std::unique_ptr<ReplicationInstanceClient> client,
|
||||
HealthCheckInstanceCallback succ_instance_cb = nullptr,
|
||||
HealthCheckInstanceCallback fail_instance_cb = nullptr);
|
||||
|
||||
ReplicationInstance(ReplicationInstance const &other) = delete;
|
||||
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
|
||||
ReplicationInstance(ReplicationInstance &&other) noexcept = delete;
|
||||
ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete;
|
||||
~ReplicationInstance() = default;
|
||||
ReplicationInstanceConnector(ReplicationInstanceConnector const &other) = delete;
|
||||
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector const &other) = delete;
|
||||
ReplicationInstanceConnector(ReplicationInstanceConnector &&other) noexcept = delete;
|
||||
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector &&other) noexcept = delete;
|
||||
~ReplicationInstanceConnector() = default;
|
||||
|
||||
auto OnSuccessPing() -> void;
|
||||
auto OnFailPing() -> bool;
|
||||
auto IsReadyForUUIDPing() -> bool;
|
||||
|
||||
void UpdateReplicaLastResponseUUID();
|
||||
|
||||
auto IsAlive() const -> bool;
|
||||
|
||||
// TODO: (andi) Fetch from ClusterState
|
||||
auto InstanceName() const -> std::string;
|
||||
auto CoordinatorSocketAddress() const -> std::string;
|
||||
auto ReplicationSocketAddress() const -> std::string;
|
||||
@ -75,15 +71,16 @@ class ReplicationInstance {
|
||||
auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool;
|
||||
|
||||
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
|
||||
auto GetClient() -> CoordinatorClient &;
|
||||
auto GetClient() -> ReplicationInstanceClient &;
|
||||
|
||||
auto EnableWritingOnMain() -> bool;
|
||||
|
||||
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
|
||||
auto GetFailCallback() -> HealthCheckInstanceCallback &;
|
||||
|
||||
private:
|
||||
CoordinatorClient client_;
|
||||
protected:
|
||||
auto UpdateReplicaLastResponseUUID() -> void;
|
||||
std::unique_ptr<ReplicationInstanceClient> client_;
|
||||
std::chrono::system_clock::time_point last_response_time_{};
|
||||
bool is_alive_{false};
|
||||
std::chrono::system_clock::time_point last_check_of_uuid_{};
|
||||
@ -91,7 +88,7 @@ class ReplicationInstance {
|
||||
HealthCheckInstanceCallback succ_cb_;
|
||||
HealthCheckInstanceCallback fail_cb_;
|
||||
|
||||
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
|
||||
friend bool operator==(ReplicationInstanceConnector const &first, ReplicationInstanceConnector const &second) {
|
||||
return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
|
||||
first.is_alive_ == second.is_alive_;
|
||||
}
|
@ -30,6 +30,9 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using nuraft::ptr;
|
||||
using replication_coordination_glue::ReplicationRole;
|
||||
|
||||
struct ReplicationInstanceState {
|
||||
@ -48,7 +51,7 @@ struct ReplicationInstanceState {
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: Currently instance of coordinator doesn't change from the registration. Hence, just wrap
|
||||
// NOTE: Currently coordinator instance doesn't change from the registration. Hence, just wraps
|
||||
// CoordinatorToCoordinatorConfig.
|
||||
struct CoordinatorInstanceState {
|
||||
CoordinatorToCoordinatorConfig config;
|
||||
@ -64,14 +67,16 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
|
||||
using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
|
||||
InstanceUUIDUpdate>;
|
||||
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using nuraft::ptr;
|
||||
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state);
|
||||
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state);
|
||||
|
||||
// Represents the state of the cluster from the coordinator's perspective.
|
||||
// Source of truth since it is modified only as the result of RAFT's commiting
|
||||
class CoordinatorClusterState {
|
||||
public:
|
||||
CoordinatorClusterState() = default;
|
||||
explicit CoordinatorClusterState(CoordinatorInstanceInitConfig const &config);
|
||||
explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||
std::vector<CoordinatorInstanceState> coordinators,
|
||||
utils::UUID const ¤t_main_uuid, bool is_lock_opened);
|
||||
|
||||
CoordinatorClusterState(CoordinatorClusterState const &);
|
||||
@ -89,7 +94,9 @@ class CoordinatorClusterState {
|
||||
|
||||
auto IsCurrentMain(std::string_view instance_name) const -> bool;
|
||||
|
||||
auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;
|
||||
auto InsertInstance(std::string instance_name, ReplicationInstanceState instance_state) -> void;
|
||||
|
||||
auto DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void;
|
||||
|
||||
auto Serialize(ptr<buffer> &data) -> void;
|
||||
|
||||
@ -105,12 +112,17 @@ class CoordinatorClusterState {
|
||||
|
||||
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
|
||||
|
||||
friend auto operator==(CoordinatorClusterState const &lhs, CoordinatorClusterState const &rhs) -> bool {
|
||||
return lhs.repl_instances_ == rhs.repl_instances_ && lhs.coordinators_ == rhs.coordinators_ &&
|
||||
lhs.current_main_uuid_ == rhs.current_main_uuid_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<CoordinatorInstanceState> coordinators_{};
|
||||
std::map<std::string, ReplicationInstanceState, std::less<>> repl_instances_{};
|
||||
std::vector<CoordinatorInstanceState> coordinators_{};
|
||||
utils::UUID current_main_uuid_{};
|
||||
mutable utils::ResourceLock log_lock_{};
|
||||
bool is_lock_opened_{false};
|
||||
mutable utils::ResourceLock log_lock_{};
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -35,7 +35,7 @@ using nuraft::state_machine;
|
||||
|
||||
class CoordinatorStateMachine : public state_machine {
|
||||
public:
|
||||
CoordinatorStateMachine() = default;
|
||||
explicit CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config);
|
||||
CoordinatorStateMachine(CoordinatorStateMachine const &) = delete;
|
||||
CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
|
||||
CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
|
||||
|
@ -10,12 +10,16 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include <chrono>
|
||||
|
||||
#include "coordination/raft_state.hpp"
|
||||
|
||||
#include "coordination/coordinator_communication_config.hpp"
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/raft_state.hpp"
|
||||
#include "utils/counter.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <chrono>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
@ -30,11 +34,11 @@ using nuraft::raft_server;
|
||||
using nuraft::srv_config;
|
||||
using raft_result = cmd_result<ptr<buffer>>;
|
||||
|
||||
RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
|
||||
uint32_t raft_port, std::string raft_address)
|
||||
: raft_endpoint_(raft_address, raft_port),
|
||||
coordinator_id_(coordinator_id),
|
||||
state_machine_(cs_new<CoordinatorStateMachine>()),
|
||||
RaftState::RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
|
||||
BecomeFollowerCb become_follower_cb)
|
||||
: raft_endpoint_("127.0.0.1", config.coordinator_port),
|
||||
coordinator_id_(config.coordinator_id),
|
||||
state_machine_(cs_new<CoordinatorStateMachine>(config)),
|
||||
state_manager_(cs_new<CoordinatorStateManager>(coordinator_id_, raft_endpoint_.SocketAddress())),
|
||||
logger_(nullptr),
|
||||
become_leader_cb_(std::move(become_leader_cb)),
|
||||
@ -97,28 +101,36 @@ auto RaftState::InitRaftServer() -> void {
|
||||
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint_.SocketAddress());
|
||||
}
|
||||
|
||||
auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState {
|
||||
uint32_t coordinator_id = FLAGS_coordinator_id;
|
||||
uint32_t raft_port = FLAGS_coordinator_port;
|
||||
|
||||
auto raft_state =
|
||||
RaftState(std::move(become_leader_cb), std::move(become_follower_cb), coordinator_id, raft_port, "127.0.0.1");
|
||||
auto RaftState::MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
|
||||
BecomeFollowerCb &&become_follower_cb) -> RaftState {
|
||||
auto raft_state = RaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
raft_state.InitRaftServer();
|
||||
return raft_state;
|
||||
}
|
||||
|
||||
RaftState::~RaftState() { launcher_.shutdown(); }
|
||||
RaftState::~RaftState() {
|
||||
spdlog::trace("Shutting down RaftState for coordinator_{}", coordinator_id_);
|
||||
state_machine_.reset();
|
||||
state_manager_.reset();
|
||||
logger_.reset();
|
||||
|
||||
auto RaftState::InstanceName() const -> std::string {
|
||||
return fmt::format("coordinator_{}", std::to_string(coordinator_id_));
|
||||
if (!raft_server_) {
|
||||
return;
|
||||
}
|
||||
raft_server_->shutdown();
|
||||
raft_server_.reset();
|
||||
}
|
||||
|
||||
auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); }
|
||||
|
||||
auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); }
|
||||
|
||||
auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||
spdlog::trace("Adding coordinator instance {} start in RaftState for coordinator_{}", config.coordinator_id,
|
||||
coordinator_id_);
|
||||
auto const endpoint = config.coordinator_server.SocketAddress();
|
||||
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_server_id), endpoint);
|
||||
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_id), endpoint);
|
||||
|
||||
auto cmd_result = raft_server_->add_srv(srv_config_to_add);
|
||||
|
||||
@ -136,9 +148,9 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
|
||||
bool added{false};
|
||||
while (!maybe_stop()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(waiting_period));
|
||||
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_server_id));
|
||||
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_id));
|
||||
if (server_config) {
|
||||
spdlog::trace("Server with id {} added to cluster", config.coordinator_server_id);
|
||||
spdlog::trace("Server with id {} added to cluster", config.coordinator_id);
|
||||
added = true;
|
||||
break;
|
||||
}
|
||||
@ -150,12 +162,6 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
|
||||
}
|
||||
}
|
||||
|
||||
auto RaftState::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
|
||||
std::vector<ptr<srv_config>> all_srv_configs;
|
||||
raft_server_->get_srv_config_all(all_srv_configs);
|
||||
return all_srv_configs;
|
||||
}
|
||||
|
||||
auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
|
||||
|
||||
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
|
||||
@ -363,14 +369,14 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
|
||||
spdlog::error(
|
||||
"Failed to accept request for adding coordinator instance {}. Most likely the reason is that the instance is "
|
||||
"not the leader.",
|
||||
config.coordinator_server_id);
|
||||
config.coordinator_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_server_id);
|
||||
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_id);
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_server_id,
|
||||
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_id,
|
||||
static_cast<int>(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
@ -426,5 +432,50 @@ auto RaftState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstan
|
||||
return state_machine_->GetCoordinatorInstances();
|
||||
}
|
||||
|
||||
auto RaftState::GetRoutingTable() const -> RoutingTable {
|
||||
auto res = RoutingTable{};
|
||||
|
||||
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
|
||||
return instance.config.BoltSocketAddress();
|
||||
};
|
||||
|
||||
// TODO: (andi) This is wrong check, Fico will correct in #1819.
|
||||
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::MAIN;
|
||||
};
|
||||
|
||||
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
auto const &raft_log_repl_instances = GetReplicationInstances();
|
||||
|
||||
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
|
||||
|
||||
if (!std::ranges::empty(bolt_mains)) {
|
||||
res.emplace_back(std::move(bolt_mains), "WRITE");
|
||||
}
|
||||
|
||||
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
if (!std::ranges::empty(bolt_replicas)) {
|
||||
res.emplace_back(std::move(bolt_replicas), "READ");
|
||||
}
|
||||
|
||||
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
|
||||
return instance.config.bolt_server.SocketAddress();
|
||||
};
|
||||
|
||||
auto const &raft_log_coord_instances = GetCoordinatorInstances();
|
||||
auto bolt_coords =
|
||||
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
|
||||
|
||||
res.emplace_back(std::move(bolt_coords), "ROUTE");
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -1,132 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/replication_instance.hpp"
|
||||
|
||||
#include <utility>
|
||||
|
||||
#include "replication_coordination_glue/handler.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb)
|
||||
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
|
||||
succ_cb_(succ_instance_cb),
|
||||
fail_cb_(fail_instance_cb) {}
|
||||
|
||||
auto ReplicationInstance::OnSuccessPing() -> void {
|
||||
last_response_time_ = std::chrono::system_clock::now();
|
||||
is_alive_ = true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::OnFailPing() -> bool {
|
||||
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
|
||||
is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec();
|
||||
return is_alive_;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_) >
|
||||
client_.InstanceGetUUIDFrequencySec();
|
||||
}
|
||||
|
||||
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
|
||||
auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); }
|
||||
auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); }
|
||||
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
|
||||
|
||||
auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb,
|
||||
HealthCheckInstanceCallback main_fail_cb) -> bool {
|
||||
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
succ_cb_ = main_succ_cb;
|
||||
fail_cb_ = main_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
|
||||
|
||||
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
|
||||
HealthCheckInstanceCallback replica_fail_cb) -> bool {
|
||||
if (!client_.DemoteToReplica()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
succ_cb_ = replica_succ_cb;
|
||||
fail_cb_ = replica_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
|
||||
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
|
||||
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
|
||||
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
|
||||
|
||||
auto ReplicationInstance::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
return client_.ReplicationClientInfo();
|
||||
}
|
||||
|
||||
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
|
||||
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
|
||||
|
||||
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
|
||||
|
||||
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
if (!IsReadyForUUIDPing()) {
|
||||
return true;
|
||||
}
|
||||
auto res = SendGetInstanceUUID();
|
||||
if (res.HasError()) {
|
||||
return false;
|
||||
}
|
||||
UpdateReplicaLastResponseUUID();
|
||||
|
||||
// NOLINTNEXTLINE
|
||||
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return SendSwapAndUpdateUUID(curr_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
||||
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
||||
return client_.SendUnregisterReplicaRpc(instance_name);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); }
|
||||
|
||||
auto ReplicationInstance::SendGetInstanceUUID()
|
||||
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
return client_.SendGetInstanceUUIDRpc();
|
||||
}
|
||||
|
||||
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
@ -12,7 +12,7 @@
|
||||
#include "utils/uuid.hpp"
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/replication_instance_client.hpp"
|
||||
|
||||
#include "coordination/coordinator_communication_config.hpp"
|
||||
#include "coordination/coordinator_rpc.hpp"
|
||||
@ -30,8 +30,10 @@ auto CreateClientContext(memgraph::coordination::CoordinatorToReplicaConfig cons
|
||||
}
|
||||
} // namespace
|
||||
|
||||
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
|
||||
ReplicationInstanceClient::ReplicationInstanceClient(CoordinatorInstance *coord_instance,
|
||||
CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb)
|
||||
: rpc_context_{CreateClientContext(config)},
|
||||
rpc_client_{config.mgt_server, &rpc_context_},
|
||||
config_{std::move(config)},
|
||||
@ -39,20 +41,24 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
|
||||
succ_cb_{std::move(succ_cb)},
|
||||
fail_cb_{std::move(fail_cb)} {}
|
||||
|
||||
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
|
||||
auto ReplicationInstanceClient::InstanceName() const -> std::string { return config_.instance_name; }
|
||||
|
||||
auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
|
||||
auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
|
||||
auto ReplicationInstanceClient::CoordinatorSocketAddress() const -> std::string {
|
||||
return config_.CoordinatorSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceClient::ReplicationSocketAddress() const -> std::string {
|
||||
return config_.ReplicationSocketAddress();
|
||||
}
|
||||
|
||||
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
|
||||
auto ReplicationInstanceClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
|
||||
return config_.instance_down_timeout_sec;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
|
||||
auto ReplicationInstanceClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
|
||||
return config_.instance_get_uuid_frequency_sec;
|
||||
}
|
||||
|
||||
void CoordinatorClient::StartFrequentCheck() {
|
||||
void ReplicationInstanceClient::StartFrequentCheck() {
|
||||
if (instance_checker_.IsRunning()) {
|
||||
return;
|
||||
}
|
||||
@ -81,16 +87,17 @@ void CoordinatorClient::StartFrequentCheck() {
|
||||
});
|
||||
}
|
||||
|
||||
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
|
||||
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
|
||||
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
|
||||
void ReplicationInstanceClient::StopFrequentCheck() { instance_checker_.Stop(); }
|
||||
void ReplicationInstanceClient::PauseFrequentCheck() { instance_checker_.Pause(); }
|
||||
void ReplicationInstanceClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
|
||||
|
||||
auto CoordinatorClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
auto ReplicationInstanceClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
return config_.replication_client_info;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
ReplicationClientsInfo replication_clients_info) const -> bool {
|
||||
auto ReplicationInstanceClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
ReplicationClientsInfo replication_clients_info) const
|
||||
-> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(uuid, std::move(replication_clients_info))};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -104,7 +111,7 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::DemoteToReplica() const -> bool {
|
||||
auto ReplicationInstanceClient::DemoteToReplica() const -> bool {
|
||||
auto const &instance_name = config_.instance_name;
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)};
|
||||
@ -120,7 +127,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
|
||||
auto ReplicationInstanceClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -134,7 +141,7 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bo
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
|
||||
auto ReplicationInstanceClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -148,7 +155,7 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name)
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendGetInstanceUUIDRpc() const
|
||||
auto ReplicationInstanceClient::SendGetInstanceUUIDRpc() const
|
||||
-> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
|
||||
@ -160,7 +167,7 @@ auto CoordinatorClient::SendGetInstanceUUIDRpc() const
|
||||
}
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
auto ReplicationInstanceClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -174,7 +181,7 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
|
||||
auto ReplicationInstanceClient::SendGetInstanceTimestampsRpc() const
|
||||
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
|
132
src/coordination/replication_instance_connector.cpp
Normal file
132
src/coordination/replication_instance_connector.cpp
Normal file
@ -0,0 +1,132 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/replication_instance_connector.hpp"
|
||||
|
||||
#include <utility>
|
||||
|
||||
#include "replication_coordination_glue/handler.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
ReplicationInstanceConnector::ReplicationInstanceConnector(std::unique_ptr<ReplicationInstanceClient> client,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb)
|
||||
: client_(std::move(client)), succ_cb_(succ_instance_cb), fail_cb_(fail_instance_cb) {}
|
||||
|
||||
void ReplicationInstanceConnector::OnSuccessPing() {
|
||||
last_response_time_ = std::chrono::system_clock::now();
|
||||
is_alive_ = true;
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::OnFailPing() -> bool {
|
||||
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
|
||||
is_alive_ = elapsed_time < client_->InstanceDownTimeoutSec();
|
||||
return is_alive_;
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::IsReadyForUUIDPing() -> bool {
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_) >
|
||||
client_->InstanceGetUUIDFrequencySec();
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_->InstanceName(); }
|
||||
auto ReplicationInstanceConnector::CoordinatorSocketAddress() const -> std::string {
|
||||
return client_->CoordinatorSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceConnector::ReplicationSocketAddress() const -> std::string {
|
||||
return client_->ReplicationSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceConnector::IsAlive() const -> bool { return is_alive_; }
|
||||
|
||||
auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb,
|
||||
HealthCheckInstanceCallback main_fail_cb) -> bool {
|
||||
if (!client_->SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
succ_cb_ = main_succ_cb;
|
||||
fail_cb_ = main_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_->DemoteToReplica(); }
|
||||
|
||||
auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
|
||||
HealthCheckInstanceCallback replica_fail_cb) -> bool {
|
||||
if (!client_->DemoteToReplica()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
succ_cb_ = replica_succ_cb;
|
||||
fail_cb_ = replica_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_->StartFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_->StopFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_->PauseFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_->ResumeFrequentCheck(); }
|
||||
|
||||
auto ReplicationInstanceConnector::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
return client_->ReplicationClientInfo();
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
|
||||
auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
|
||||
|
||||
auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; }
|
||||
|
||||
auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
if (!IsReadyForUUIDPing()) {
|
||||
return true;
|
||||
}
|
||||
auto res = SendGetInstanceUUID();
|
||||
if (res.HasError()) {
|
||||
return false;
|
||||
}
|
||||
UpdateReplicaLastResponseUUID();
|
||||
|
||||
// NOLINTNEXTLINE
|
||||
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return SendSwapAndUpdateUUID(curr_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
||||
return replication_coordination_glue::SendSwapMainUUIDRpc(client_->RpcClient(), new_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
||||
return client_->SendUnregisterReplicaRpc(instance_name);
|
||||
}
|
||||
|
||||
auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_->SendEnableWritingOnMainRpc(); }
|
||||
|
||||
auto ReplicationInstanceConnector::SendGetInstanceUUID()
|
||||
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
return client_->SendGetInstanceUUIDRpc();
|
||||
}
|
||||
|
||||
void ReplicationInstanceConnector::UpdateReplicaLastResponseUUID() {
|
||||
last_check_of_uuid_ = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
@ -10,12 +10,17 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "replication.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
|
||||
#include <limits>
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(management_port, 0, "Port on which coordinator servers will be started.");
|
||||
DEFINE_VALIDATED_int32(management_port, 0, "Port on which coordinator servers will be started.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(coordinator_port, 0, "Port on which raft servers will be started.");
|
||||
DEFINE_VALIDATED_int32(coordinator_port, 0, "Port on which raft servers will be started.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint32(coordinator_id, 0, "Unique ID of the raft server.");
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
@ -15,9 +15,9 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(management_port);
|
||||
DECLARE_int32(management_port);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(coordinator_port);
|
||||
DECLARE_int32(coordinator_port);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_uint32(coordinator_id);
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
|
@ -407,7 +407,24 @@ int main(int argc, char **argv) {
|
||||
|
||||
// singleton coordinator state
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState coordinator_state;
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorState;
|
||||
using memgraph::coordination::ReplicationInstanceInitConfig;
|
||||
std::optional<CoordinatorState> coordinator_state{std::nullopt};
|
||||
if (FLAGS_management_port && (FLAGS_coordinator_id || FLAGS_coordinator_port)) {
|
||||
throw std::runtime_error(
|
||||
"Coordinator cannot be started with both coordinator_id/port and management_port. Specify coordinator_id and "
|
||||
"port for coordinator instance and management port for replication instance.");
|
||||
}
|
||||
|
||||
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
|
||||
coordinator_state.emplace(CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id,
|
||||
.coordinator_port = FLAGS_coordinator_port,
|
||||
.bolt_port = FLAGS_bolt_port});
|
||||
}
|
||||
if (FLAGS_management_port) {
|
||||
coordinator_state.emplace(ReplicationInstanceInitConfig{.management_port = FLAGS_management_port});
|
||||
}
|
||||
#endif
|
||||
|
||||
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
|
||||
@ -430,19 +447,20 @@ int main(int argc, char **argv) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
// MAIN or REPLICA instance
|
||||
if (FLAGS_management_port) {
|
||||
memgraph::dbms::CoordinatorHandlers::Register(coordinator_state.GetCoordinatorServer(), replication_handler);
|
||||
MG_ASSERT(coordinator_state.GetCoordinatorServer().Start(), "Failed to start coordinator server!");
|
||||
memgraph::dbms::CoordinatorHandlers::Register(coordinator_state->GetCoordinatorServer(), replication_handler);
|
||||
MG_ASSERT(coordinator_state->GetCoordinatorServer().Start(), "Failed to start coordinator server!");
|
||||
}
|
||||
#endif
|
||||
|
||||
auto db_acc = dbms_handler.Get();
|
||||
|
||||
memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, system,
|
||||
memgraph::query::InterpreterContext interpreter_context_(
|
||||
interp_config, &dbms_handler, &repl_state, system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
&coordinator_state,
|
||||
coordinator_state ? std::optional<std::reference_wrapper<CoordinatorState>>{std::ref(*coordinator_state)}
|
||||
: std::nullopt,
|
||||
#endif
|
||||
auth_handler.get(), auth_checker.get(),
|
||||
&replication_handler);
|
||||
auth_handler.get(), auth_checker.get(), &replication_handler);
|
||||
MG_ASSERT(db_acc, "Failed to access the main database");
|
||||
|
||||
memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(),
|
||||
|
@ -3178,7 +3178,7 @@ class CoordinatorQuery : public memgraph::query::Query {
|
||||
memgraph::query::CoordinatorQuery::Action action_;
|
||||
std::string instance_name_{};
|
||||
std::unordered_map<memgraph::query::Expression *, memgraph::query::Expression *> configs_;
|
||||
memgraph::query::Expression *coordinator_server_id_{nullptr};
|
||||
memgraph::query::Expression *coordinator_id_{nullptr};
|
||||
memgraph::query::CoordinatorQuery::SyncMode sync_mode_;
|
||||
|
||||
CoordinatorQuery *Clone(AstStorage *storage) const override {
|
||||
@ -3186,7 +3186,7 @@ class CoordinatorQuery : public memgraph::query::Query {
|
||||
|
||||
object->action_ = action_;
|
||||
object->instance_name_ = instance_name_;
|
||||
object->coordinator_server_id_ = coordinator_server_id_ ? coordinator_server_id_->Clone(storage) : nullptr;
|
||||
object->coordinator_id_ = coordinator_id_ ? coordinator_id_->Clone(storage) : nullptr;
|
||||
object->sync_mode_ = sync_mode_;
|
||||
for (const auto &[key, value] : configs_) {
|
||||
object->configs_[key->Clone(storage)] = value->Clone(storage);
|
||||
|
@ -447,7 +447,7 @@ antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::Add
|
||||
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
|
||||
|
||||
coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE;
|
||||
coordinator_query->coordinator_server_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
|
||||
coordinator_query->coordinator_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
|
||||
coordinator_query->configs_ =
|
||||
std::any_cast<std::unordered_map<Expression *, Expression *>>(ctx->configsMap->accept(this));
|
||||
|
||||
|
@ -498,7 +498,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
}
|
||||
|
||||
auto const coord_coord_config =
|
||||
coordination::CoordinatorToCoordinatorConfig{.coordinator_server_id = coordinator_id,
|
||||
coordination::CoordinatorToCoordinatorConfig{.coordinator_id = coordinator_id,
|
||||
.bolt_server = *maybe_bolt_server,
|
||||
.coordinator_server = *maybe_coordinator_server};
|
||||
|
||||
@ -1224,7 +1224,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer);
|
||||
}
|
||||
|
||||
auto coord_server_id = coordinator_query->coordinator_server_id_->Accept(evaluator).ValueInt();
|
||||
auto coord_server_id = coordinator_query->coordinator_id_->Accept(evaluator).ValueInt();
|
||||
|
||||
callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coord_server_id,
|
||||
bolt_server = bolt_server_it->second,
|
||||
@ -4299,8 +4299,10 @@ void Interpreter::RollbackTransaction() {
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> RouteResult {
|
||||
// TODO: (andi) Test
|
||||
if (!FLAGS_coordinator_id) {
|
||||
if (!interpreter_context_->coordinator_state_) {
|
||||
throw QueryException("You cannot fetch routing table from an instance which is not part of a cluster.");
|
||||
}
|
||||
if (FLAGS_management_port) {
|
||||
auto const &address = routing.find("address");
|
||||
if (address == routing.end()) {
|
||||
throw QueryException("Routing table must contain address field.");
|
||||
@ -4315,7 +4317,7 @@ auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> Ro
|
||||
return result;
|
||||
}
|
||||
|
||||
return RouteResult{.servers = interpreter_context_->coordinator_state_->GetRoutingTable(routing)};
|
||||
return RouteResult{.servers = interpreter_context_->coordinator_state_->get().GetRoutingTable()};
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -15,18 +15,18 @@
|
||||
#include "system/include/system/system.hpp"
|
||||
namespace memgraph::query {
|
||||
|
||||
InterpreterContext::InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
replication::ReplicationState *rs, memgraph::system::System &system,
|
||||
InterpreterContext::InterpreterContext(
|
||||
InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler, replication::ReplicationState *rs,
|
||||
memgraph::system::System &system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState *coordinator_state,
|
||||
std::optional<std::reference_wrapper<memgraph::coordination::CoordinatorState>> const &coordinator_state,
|
||||
#endif
|
||||
AuthQueryHandler *ah, AuthChecker *ac,
|
||||
ReplicationQueryHandler *replication_handler)
|
||||
AuthQueryHandler *ah, AuthChecker *ac, ReplicationQueryHandler *replication_handler)
|
||||
: dbms_handler(dbms_handler),
|
||||
config(interpreter_config),
|
||||
repl_state(rs),
|
||||
#ifdef MG_ENTERPRISE
|
||||
coordinator_state_{coordinator_state},
|
||||
coordinator_state_(coordinator_state),
|
||||
#endif
|
||||
auth(ah),
|
||||
auth_checker(ac),
|
||||
|
@ -57,7 +57,7 @@ struct InterpreterContext {
|
||||
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
replication::ReplicationState *rs, memgraph::system::System &system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState *coordinator_state,
|
||||
std::optional<std::reference_wrapper<coordination::CoordinatorState>> const &coordinator_state,
|
||||
#endif
|
||||
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
|
||||
ReplicationQueryHandler *replication_handler = nullptr);
|
||||
@ -72,7 +72,7 @@ struct InterpreterContext {
|
||||
// GLOBAL
|
||||
memgraph::replication::ReplicationState *repl_state;
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState *coordinator_state_;
|
||||
std::optional<std::reference_wrapper<coordination::CoordinatorState>> coordinator_state_;
|
||||
#endif
|
||||
|
||||
AuthQueryHandler *auth;
|
||||
|
@ -47,5 +47,5 @@ func main() {
|
||||
read_messages("neo4j://localhost:7690") // coordinator_1
|
||||
read_messages("neo4j://localhost:7691") // coordinator_2
|
||||
read_messages("neo4j://localhost:7692") // coordinator_3
|
||||
fmt.Println("Successfully finished running coordinator_route.go test")
|
||||
fmt.Println("Successfully finished running read_route.go test")
|
||||
}
|
||||
|
@ -453,9 +453,30 @@ target_link_libraries(${test_prefix}coordinator_cluster_state gflags mg-coordina
|
||||
target_include_directories(${test_prefix}coordinator_cluster_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test Raft log serialization
|
||||
# Test coordinator state machine
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(routing_table.cpp)
|
||||
target_link_libraries(${test_prefix}routing_table gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}routing_table PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
add_unit_test(coordinator_state_machine.cpp)
|
||||
target_link_libraries(${test_prefix}coordinator_state_machine gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}coordinator_state_machine PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test RAFT state
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(raft_state.cpp)
|
||||
target_link_libraries(${test_prefix}raft_state gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}raft_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test coordinator instance
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(coordinator_instance.cpp)
|
||||
target_link_libraries(${test_prefix}coordinator_instance gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}coordinator_instance PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test replication instance connector
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(replication_instance_connector.cpp)
|
||||
target_link_libraries(${test_prefix}replication_instance_connector gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}replication_instance_connector PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
@ -16,6 +16,8 @@
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
#include "utils/functional.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
|
||||
class CoordinationUtils : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
@ -60,7 +62,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
|
||||
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
@ -112,7 +117,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
@ -167,7 +174,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
@ -226,7 +235,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
|
||||
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
|
@ -11,10 +11,9 @@
|
||||
|
||||
#include "nuraft/coordinator_cluster_state.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
#include "replication_coordination_glue/role.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/uuid.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
@ -23,15 +22,17 @@
|
||||
#include "libnuraft/nuraft.hxx"
|
||||
|
||||
using memgraph::coordination::CoordinatorClusterState;
|
||||
using memgraph::coordination::CoordinatorStateMachine;
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorInstanceState;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftLogAction;
|
||||
using memgraph::coordination::ReplicationInstanceState;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::replication_coordination_glue::ReplicationRole;
|
||||
using memgraph::utils::UUID;
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using nuraft::ptr;
|
||||
|
||||
class CoordinatorClusterStateTest : public ::testing::Test {
|
||||
@ -44,6 +45,174 @@ class CoordinatorClusterStateTest : public ::testing::Test {
|
||||
"MG_tests_unit_coordinator_cluster_state"};
|
||||
};
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
|
||||
auto instances = cluster_state.GetReplicationInstances();
|
||||
ASSERT_EQ(instances.size(), 1);
|
||||
ASSERT_EQ(instances[0].config, config);
|
||||
ASSERT_EQ(instances[0].status, ReplicationRole::REPLICA);
|
||||
ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 1);
|
||||
|
||||
ASSERT_TRUE(cluster_state.IsReplica("instance3"));
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
cluster_state.DoAction("instance3", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE);
|
||||
|
||||
ASSERT_EQ(cluster_state.GetReplicationInstances().size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10111},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10010}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
}
|
||||
|
||||
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);
|
||||
auto const repl_instances = cluster_state.GetReplicationInstances();
|
||||
ASSERT_EQ(repl_instances.size(), 2);
|
||||
ASSERT_EQ(repl_instances[0].status, ReplicationRole::REPLICA);
|
||||
ASSERT_EQ(repl_instances[1].status, ReplicationRole::MAIN);
|
||||
ASSERT_TRUE(cluster_state.MainExists());
|
||||
ASSERT_TRUE(cluster_state.IsMain("instance3"));
|
||||
ASSERT_FALSE(cluster_state.IsMain("instance2"));
|
||||
ASSERT_TRUE(cluster_state.IsReplica("instance2"));
|
||||
ASSERT_FALSE(cluster_state.IsReplica("instance3"));
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10111},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10010}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
}
|
||||
|
||||
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);
|
||||
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_REPLICA);
|
||||
cluster_state.DoAction("instance2", RaftLogAction::SET_INSTANCE_AS_MAIN);
|
||||
auto const repl_instances = cluster_state.GetReplicationInstances();
|
||||
ASSERT_EQ(repl_instances.size(), 2);
|
||||
ASSERT_EQ(repl_instances[0].status, ReplicationRole::MAIN);
|
||||
ASSERT_EQ(repl_instances[1].status, ReplicationRole::REPLICA);
|
||||
ASSERT_TRUE(cluster_state.MainExists());
|
||||
ASSERT_TRUE(cluster_state.IsMain("instance2"));
|
||||
ASSERT_FALSE(cluster_state.IsMain("instance3"));
|
||||
ASSERT_TRUE(cluster_state.IsReplica("instance3"));
|
||||
ASSERT_FALSE(cluster_state.IsReplica("instance2"));
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, UpdateUUID) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
auto uuid = UUID();
|
||||
cluster_state.DoAction(uuid, RaftLogAction::UPDATE_UUID);
|
||||
ASSERT_EQ(cluster_state.GetUUID(), uuid);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE);
|
||||
|
||||
auto instances = cluster_state.GetCoordinatorInstances();
|
||||
ASSERT_EQ(instances.size(), 2);
|
||||
ASSERT_EQ(instances[1].config, config);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
|
||||
ReplicationInstanceState instance_state{
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
@ -65,122 +234,43 @@ TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
|
||||
EXPECT_EQ(instance_state.status, deserialized_instance_state.status);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, DoActionRegisterInstances) {
|
||||
auto coordinator_cluster_state = memgraph::coordination::CoordinatorClusterState{};
|
||||
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance1",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10111},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance1",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = {.instance_name = "instance2",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10002}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10113},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
.replication_client_info = {.instance_name = "instance3",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10003}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance4",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10114},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7690},
|
||||
.replication_client_info = {.instance_name = "instance4",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10004}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance5",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10115},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7691},
|
||||
.replication_client_info = {.instance_name = "instance5",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10005}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance6",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10116},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7692},
|
||||
.replication_client_info = {.instance_name = "instance6",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10006}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
|
||||
|
||||
coordinator_cluster_state.DoAction(payload, action);
|
||||
}
|
||||
|
||||
ptr<buffer> data;
|
||||
coordinator_cluster_state.Serialize(data);
|
||||
|
||||
auto deserialized_coordinator_cluster_state = CoordinatorClusterState::Deserialize(*data);
|
||||
ASSERT_EQ(coordinator_cluster_state.GetReplicationInstances(),
|
||||
deserialized_coordinator_cluster_state.GetReplicationInstances());
|
||||
TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
|
||||
CoordinatorInstanceState instance_state{
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}}};
|
||||
nlohmann::json j = instance_state;
|
||||
CoordinatorInstanceState deserialized_instance_state = j.get<CoordinatorInstanceState>();
|
||||
ASSERT_EQ(instance_state, deserialized_instance_state);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, Marshalling) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE);
|
||||
|
||||
auto config2 =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10111},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10010}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
cluster_state.DoAction(config2, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
|
||||
|
||||
ptr<buffer> data{};
|
||||
cluster_state.Serialize(data);
|
||||
|
||||
auto deserialized_cluster_state = CoordinatorClusterState::Deserialize(*data);
|
||||
ASSERT_EQ(cluster_state, deserialized_cluster_state);
|
||||
}
|
||||
|
130
tests/unit/coordinator_instance.cpp
Normal file
130
tests/unit/coordinator_instance.cpp
Normal file
@ -0,0 +1,130 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "interpreter_faker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/replication_handler.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstance;
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::HealthCheckClientCallback;
|
||||
using memgraph::coordination::HealthCheckInstanceCallback;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::RegisterInstanceCoordinatorStatus;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::coordination::ReplicationInstanceClient;
|
||||
using memgraph::coordination::ReplicationInstanceConnector;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication::ReplicationHandler;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::storage::Config;
|
||||
|
||||
using testing::_;
|
||||
|
||||
class ReplicationInstanceClientMock : public ReplicationInstanceClient {
|
||||
public:
|
||||
ReplicationInstanceClientMock(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config)
|
||||
: ReplicationInstanceClient(coord_instance, config, nullptr, nullptr) {
|
||||
ON_CALL(*this, DemoteToReplica()).WillByDefault(testing::Return(true));
|
||||
}
|
||||
MOCK_METHOD0(DemoteToReplica, bool());
|
||||
};
|
||||
|
||||
class CoordinatorInstanceTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
|
||||
void TearDown() override {}
|
||||
|
||||
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_tests_unit_coordinator_instance"};
|
||||
};
|
||||
|
||||
TEST_F(CoordinatorInstanceTest, RegisterReplicationInstance) {
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
|
||||
auto instance1 = CoordinatorInstance{init_config};
|
||||
|
||||
auto const coord_to_replica_config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto status = instance1.RegisterReplicationInstance(coord_to_replica_config);
|
||||
EXPECT_EQ(status, RegisterInstanceCoordinatorStatus::SUCCESS);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) {
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
|
||||
|
||||
auto const instance1 = CoordinatorInstance{init_config};
|
||||
auto const instances = instance1.ShowInstances();
|
||||
ASSERT_EQ(instances.size(), 1);
|
||||
ASSERT_EQ(instances[0].instance_name, "coordinator_4");
|
||||
ASSERT_EQ(instances[0].health, "unknown");
|
||||
ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110");
|
||||
ASSERT_EQ(instances[0].coord_socket_address, "");
|
||||
ASSERT_EQ(instances[0].cluster_role, "coordinator");
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorInstanceTest, ConnectCoordinators) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687};
|
||||
|
||||
auto instance1 = CoordinatorInstance{init_config1};
|
||||
|
||||
auto const init_config2 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 2, .coordinator_port = 10112, .bolt_port = 7688};
|
||||
|
||||
auto const instance2 = CoordinatorInstance{init_config2};
|
||||
|
||||
auto const init_config3 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 3, .coordinator_port = 10113, .bolt_port = 7689};
|
||||
|
||||
auto const instance3 = CoordinatorInstance{init_config3};
|
||||
|
||||
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 2,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10112}});
|
||||
|
||||
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 3,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10113}});
|
||||
|
||||
auto const instances = instance1.ShowInstances();
|
||||
ASSERT_EQ(instances.size(), 3);
|
||||
ASSERT_EQ(instances[0].instance_name, "coordinator_1");
|
||||
ASSERT_EQ(instances[1].instance_name, "coordinator_2");
|
||||
ASSERT_EQ(instances[2].instance_name, "coordinator_3");
|
||||
}
|
131
tests/unit/coordinator_state_machine.cpp
Normal file
131
tests/unit/coordinator_state_machine.cpp
Normal file
@ -0,0 +1,131 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "libnuraft/nuraft.hxx"
|
||||
|
||||
using memgraph::coordination::CoordinatorStateMachine;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftLogAction;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::utils::UUID;
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
using nuraft::ptr;
|
||||
|
||||
class CoordinatorStateMachineTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
|
||||
void TearDown() override {}
|
||||
|
||||
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() /
|
||||
"MG_tests_unit_coordinator_state_machine"};
|
||||
};
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeRegisterReplicationInstance) {
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
buffer_serializer bs(*data);
|
||||
auto const expected = nlohmann::json{{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeUnregisterReplicationInstance) {
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeUnregisterInstance("instance3");
|
||||
buffer_serializer bs(*data);
|
||||
|
||||
auto const expected =
|
||||
nlohmann::json{{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", "instance3"}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeAddCoordinatorInstance) {
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeAddCoordinatorInstance(config);
|
||||
buffer_serializer bs(*data);
|
||||
auto const expected = nlohmann::json{{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeSetInstanceToMain) {
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10112},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = {.instance_name = "instance_name",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}},
|
||||
.instance_health_check_frequency_sec = std::chrono::seconds{1},
|
||||
.instance_down_timeout_sec = std::chrono::seconds{5},
|
||||
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
|
||||
.ssl = std::nullopt};
|
||||
|
||||
CoordinatorStateMachine::SerializeRegisterInstance(config);
|
||||
|
||||
{
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3");
|
||||
buffer_serializer bs(*data);
|
||||
auto const expected = nlohmann::json{{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", "instance3"}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
||||
|
||||
{
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeSetInstanceAsReplica("instance3");
|
||||
buffer_serializer bs(*data);
|
||||
auto const expected = nlohmann::json{{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", "instance3"}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorStateMachineTest, SerializeUpdateUUID) {
|
||||
auto uuid = UUID{};
|
||||
|
||||
ptr<buffer> data = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
|
||||
buffer_serializer bs(*data);
|
||||
auto const expected = nlohmann::json{{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}};
|
||||
ASSERT_EQ(bs.get_str(), expected.dump());
|
||||
}
|
@ -2706,7 +2706,7 @@ TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) {
|
||||
auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query));
|
||||
|
||||
EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE);
|
||||
ast_generator.CheckLiteral(parsed_query->coordinator_server_id_, TypedValue(1));
|
||||
ast_generator.CheckLiteral(parsed_query->coordinator_id_, TypedValue(1));
|
||||
|
||||
auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
|
||||
-> std::map<std::string, std::string, std::less<>> {
|
||||
|
138
tests/unit/raft_state.cpp
Normal file
138
tests/unit/raft_state.cpp
Normal file
@ -0,0 +1,138 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "coordination/raft_state.hpp"
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "libnuraft/nuraft.hxx"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using nuraft::ptr;
|
||||
|
||||
class RaftStateTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
|
||||
void TearDown() override {}
|
||||
|
||||
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_state"};
|
||||
};
|
||||
|
||||
TEST_F(RaftStateTest, RaftStateEmptyMetadata) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
|
||||
auto const config = CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 1234, .bolt_port = 7688};
|
||||
|
||||
auto raft_state = RaftState::MakeRaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
ASSERT_EQ(raft_state.InstanceName(), "coordinator_1");
|
||||
ASSERT_EQ(raft_state.RaftSocketAddress(), "127.0.0.1:1234");
|
||||
ASSERT_TRUE(raft_state.IsLeader());
|
||||
ASSERT_TRUE(raft_state.GetReplicationInstances().empty());
|
||||
|
||||
auto const coords = raft_state.GetCoordinatorInstances();
|
||||
ASSERT_EQ(coords.size(), 1);
|
||||
auto const &coord_instance = coords[0];
|
||||
auto const &coord_config = CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 1234}};
|
||||
ASSERT_EQ(coord_instance.config, coord_config);
|
||||
}
|
||||
|
||||
TEST_F(RaftStateTest, GetSingleRouterRoutingTable) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10112, .bolt_port = 7688};
|
||||
|
||||
auto const raft_state =
|
||||
RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
auto routing_table = raft_state.GetRoutingTable();
|
||||
|
||||
ASSERT_EQ(routing_table.size(), 1);
|
||||
|
||||
auto const routers = routing_table[0];
|
||||
ASSERT_EQ(routers.first, std::vector<std::string>{"127.0.0.1:7688"});
|
||||
ASSERT_EQ(routers.second, "ROUTE");
|
||||
}
|
||||
|
||||
TEST_F(RaftStateTest, GetMixedRoutingTable) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10113, .bolt_port = 7690};
|
||||
auto leader = RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance1",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10011},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance1",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}}});
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10012},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10002}}});
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10013},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance3",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10003}}});
|
||||
|
||||
leader.AppendAddCoordinatorInstanceLog(
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 2,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7691},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10114}});
|
||||
|
||||
leader.AppendAddCoordinatorInstanceLog(
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 3,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7692},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10115}});
|
||||
|
||||
leader.AppendSetInstanceAsMainLog("instance1");
|
||||
|
||||
auto const routing_table = leader.GetRoutingTable();
|
||||
|
||||
ASSERT_EQ(routing_table.size(), 3);
|
||||
|
||||
auto const &mains = routing_table[0];
|
||||
ASSERT_EQ(mains.second, "WRITE");
|
||||
ASSERT_EQ(mains.first, std::vector<std::string>{"127.0.0.1:7687"});
|
||||
|
||||
auto const &replicas = routing_table[1];
|
||||
ASSERT_EQ(replicas.second, "READ");
|
||||
auto const expected_replicas = std::vector<std::string>{"127.0.0.1:7688", "127.0.0.1:7689"};
|
||||
ASSERT_EQ(replicas.first, expected_replicas);
|
||||
|
||||
auto const &routers = routing_table[2];
|
||||
ASSERT_EQ(routers.second, "ROUTE");
|
||||
auto const expected_routers = std::vector<std::string>{"127.0.0.1:7690", "127.0.0.1:7691", "127.0.0.1:7692"};
|
||||
ASSERT_EQ(routers.first, expected_routers);
|
||||
}
|
51
tests/unit/replication_instance_connector.cpp
Normal file
51
tests/unit/replication_instance_connector.cpp
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "coordination/replication_instance_connector.hpp"
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "interpreter_faker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/replication_handler.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstance;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::HealthCheckClientCallback;
|
||||
using memgraph::coordination::HealthCheckInstanceCallback;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::coordination::ReplicationInstanceClient;
|
||||
using memgraph::coordination::ReplicationInstanceConnector;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication::ReplicationHandler;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::storage::Config;
|
||||
|
||||
using testing::_;
|
||||
|
||||
class ReplicationInstanceClientMock {};
|
||||
|
||||
class ReplicationInstanceConnectorTest : public ::testing::Test {
|
||||
public:
|
||||
void SetUp() override {}
|
||||
|
||||
void TearDown() override {}
|
||||
};
|
@ -1,176 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "interpreter_faker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/replication_handler.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstance;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication::ReplicationHandler;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::storage::Config;
|
||||
|
||||
// class MockCoordinatorInstance : CoordinatorInstance {
|
||||
// auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void override {}
|
||||
// };
|
||||
|
||||
class RoutingTableTest : public ::testing::Test {
|
||||
protected:
|
||||
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_tests_unit_coordinator_cluster_state"};
|
||||
std::filesystem::path repl1_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_test_unit_storage_v2_replication_repl"};
|
||||
std::filesystem::path repl2_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_test_unit_storage_v2_replication_repl2"};
|
||||
void SetUp() override { Clear(); }
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
Config main_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, main_data_directory);
|
||||
return config;
|
||||
}();
|
||||
Config repl1_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, repl1_data_directory);
|
||||
return config;
|
||||
}();
|
||||
Config repl2_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, repl2_data_directory);
|
||||
return config;
|
||||
}();
|
||||
|
||||
const std::string local_host = ("127.0.0.1");
|
||||
const std::array<uint16_t, 2> ports{10000, 20000};
|
||||
const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"};
|
||||
|
||||
private:
|
||||
void Clear() {
|
||||
if (std::filesystem::exists(main_data_directory)) std::filesystem::remove_all(main_data_directory);
|
||||
if (std::filesystem::exists(repl1_data_directory)) std::filesystem::remove_all(repl1_data_directory);
|
||||
if (std::filesystem::exists(repl2_data_directory)) std::filesystem::remove_all(repl2_data_directory);
|
||||
}
|
||||
};
|
||||
|
||||
struct MinMemgraph {
|
||||
MinMemgraph(const memgraph::storage::Config &conf)
|
||||
: auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}},
|
||||
repl_state{ReplicationStateRootPath(conf)},
|
||||
dbms{conf, repl_state
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
auth, true
|
||||
#endif
|
||||
},
|
||||
db_acc{dbms.Get()},
|
||||
db{*db_acc.get()},
|
||||
repl_handler(repl_state, dbms
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
system_, auth
|
||||
#endif
|
||||
) {
|
||||
}
|
||||
memgraph::auth::SynchedAuth auth;
|
||||
memgraph::system::System system_;
|
||||
memgraph::replication::ReplicationState repl_state;
|
||||
memgraph::dbms::DbmsHandler dbms;
|
||||
memgraph::dbms::DatabaseAccess db_acc;
|
||||
memgraph::dbms::Database &db;
|
||||
ReplicationHandler repl_handler;
|
||||
};
|
||||
;
|
||||
|
||||
// A coordinator with no registered peers must advertise exactly one routing
// entry: itself, with the ROUTE role.
TEST_F(RoutingTableTest, GetSingleRouterRoutingTable) {
  CoordinatorInstance coordinator;
  auto const context = std::map<std::string, std::string>{{"address", "localhost:7688"}};

  auto routing_table = coordinator.GetRoutingTable(context);
  ASSERT_EQ(routing_table.size(), 1);

  auto const &[addresses, role] = routing_table[0];
  ASSERT_EQ(addresses, std::vector<std::string>{"localhost:7688"});
  ASSERT_EQ(role, "ROUTE");
}
|
||||
|
||||
// Appends a replication-instance registration to the Raft log and lists the
// coordinators. The routing-table assertions are still disabled below, so for
// now this only checks that the calls complete. `routing` is kept for the
// disabled lookup.
TEST_F(RoutingTableTest, GetMixedRoutingTable) {
  auto raft_state = RaftState::MakeRaftState([]() {}, []() {});
  auto routing = std::map<std::string, std::string>{{"address", "localhost:7690"}};

  raft_state.AppendRegisterReplicationInstanceLog(
      CoordinatorToReplicaConfig{.instance_name = "instance2",
                                 .mgt_server = Endpoint{"127.0.0.1", 10011},
                                 .bolt_server = Endpoint{"127.0.0.1", 7687},
                                 .replication_client_info = ReplicationClientInfo{
                                     .instance_name = "instance2",
                                     .replication_mode = ReplicationMode::ASYNC,
                                     .replication_server = Endpoint{"127.0.0.1", 10001}}});
  raft_state.GetAllCoordinators();

  // auto routing_table = raft_state.GetRoutingTable(routing);
  // ASSERT_EQ(routing_table.size(), 1);
  // auto const routers = routing_table[0];
  // ASSERT_EQ(routers.second, "ROUTE");
}
|
||||
|
||||
// TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) {
//   CoordinatorInstance instance1;
//   instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_server_id = 1,
//                                                                   .bolt_server = Endpoint{"127.0.0.1", 7689},
//                                                                   .coordinator_server = Endpoint{"127.0.0.1", 10111}});
//
//   auto routing = std::map<std::string, std::string>{{"address", "localhost:7688"}};
//   auto routing_table = instance1.GetRoutingTable(routing);
//
//   ASSERT_EQ(routing_table.size(), 1);
//
//   auto const routers = routing_table[0];
//   ASSERT_EQ(routers.second, "ROUTE");
//   ASSERT_EQ(routers.first.size(), 2);
//   auto const expected_routers = std::vector<std::string>{"localhost:7689", "localhost:7688"};
//   ASSERT_EQ(routers.first, expected_routers);
// }
|
Loading…
Reference in New Issue
Block a user