make all tests pass

parent ee81a42923
commit 68c1d2526e
@@ -50,16 +50,28 @@ CoordinatorInstance::CoordinatorInstance()
                                        &CoordinatorInstance::ReplicaFailCallback);
         });
 
-        auto main = instances | ranges::views::filter(
-            [](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
+        auto main_instances = instances | ranges::views::filter([](auto const &instance) {
+                                return instance.status == ReplicationRole::MAIN;
+                              });
 
-        std::ranges::for_each(main, [this](auto &main_instance) {
+        std::ranges::for_each(main_instances, [this](auto &main_instance) {
           spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
           repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
                                        &CoordinatorInstance::MainSuccessCallback,
                                        &CoordinatorInstance::MainFailCallback);
         });
 
+        // In case we got out of force reset but instances weren't still demoted
+        // we need to apply functions to these instances to demote them
+        std::ranges::for_each(instances, [this](ReplicationInstanceState const &replication_instance_state) {
+          if (replication_instance_state.needs_demote) {
+            spdlog::trace("Changing callback for instance {} to demote callback",
+                          replication_instance_state.config.instance_name);
+            auto &instance = FindReplicationInstance(replication_instance_state.config.instance_name);
+            instance.SetCallbacks(&CoordinatorInstance::DemoteSuccessCallback,
+                                  &CoordinatorInstance::DemoteFailCallback);
+          }
+        });
         std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
       },
       [this]() {
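Note on the range pipeline above: the filter keeps the instances whose stored role is MAIN, and for_each then registers a health-check client for each of them. Below is a minimal, self-contained sketch of the same filter-then-for-each pattern, using the standard C++20 ranges library instead of range-v3 and hypothetical Instance/Role types rather than the Memgraph ones.

// Minimal sketch of the filter + for_each pattern above (hypothetical types, std ranges instead of range-v3).
#include <algorithm>
#include <iostream>
#include <ranges>
#include <string>
#include <vector>

enum class Role { MAIN, REPLICA };
struct Instance {
  std::string name;
  Role status;
};

int main() {
  std::vector<Instance> instances{{"instance_1", Role::MAIN}, {"instance_2", Role::REPLICA}};

  // Keep only the instances whose recorded role is MAIN ...
  auto main_instances =
      instances | std::views::filter([](auto const &instance) { return instance.status == Role::MAIN; });

  // ... and run a side-effecting action on each of them.
  std::ranges::for_each(main_instances, [](auto &instance) { std::cout << "pinging " << instance.name << '\n'; });
}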
@@ -265,8 +277,9 @@ void CoordinatorInstance::ForceResetCluster() {
 
   auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_instances);
 
-  if (maybe_most_up_to_date_instance->empty()) {
+  if (!maybe_most_up_to_date_instance.has_value()) {
     spdlog::error("Couldn't choose instance for failover, check logs for more details.");
+    return;
   }
 
   auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
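Why the condition changed: GetMostUpToDateInstanceFromHistories returns std::optional<std::string>, so maybe_most_up_to_date_instance->empty() dereferences the optional before checking that it holds a value, which is undefined behavior when no instance could be chosen (and otherwise only asks whether the contained name is an empty string). Checking has_value() and returning early is the intended guard. A minimal sketch of the pitfall, with choose_instance standing in for the real function:

// Minimal sketch of the std::optional pitfall fixed above (choose_instance is a hypothetical stand-in).
#include <iostream>
#include <optional>
#include <string>

std::optional<std::string> choose_instance(bool found) {
  if (!found) return std::nullopt;
  return "instance_3";
}

int main() {
  auto maybe_instance = choose_instance(false);
  // Wrong: maybe_instance->empty() dereferences a disengaged optional (undefined behavior).
  // Right: test engagement first, then dereference.
  if (!maybe_instance.has_value()) {
    std::cout << "no instance chosen\n";
    return 0;
  }
  std::cout << "chosen: " << *maybe_instance << '\n';
}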
@@ -294,12 +307,9 @@ void CoordinatorInstance::ForceResetCluster() {
   // we need to recreate state from raft log
   // If instance in raft log is MAIN, it can be REPLICA but raft append failed when we demoted it
   // If instance in raft log is REPLICA, it can be MAIN but raft log failed when we promoted it
-  // CRUX of problem: We need universal callback which will get correct state of instance and swap callback then
-
-  // TODO(antoniofilipovic): Summary of problem:
-  // above
-
-  // TODO(antoniofilipovic) Update this part here
+  // CRUX of problem: We need universal callback which will demote instance to replica and only then change to
+  // REPLICA callbacks
+
   auto needs_demote_setup_failed = [&instances_mapped_to_resp, this](ReplicationInstance &repl_instance) {
     if (instances_mapped_to_resp[repl_instance.InstanceName()]) {
       return false;
@@ -307,8 +317,7 @@ void CoordinatorInstance::ForceResetCluster() {
     if (!raft_state_.AppendInstanceNeedsDemote(repl_instance.InstanceName())) {
       return true;
     }
-    repl_instance.SetCallbacks(&CoordinatorInstance::UniversalSuccessCallback,
-                               &CoordinatorInstance::UniversalFailCallback);
+    repl_instance.SetCallbacks(&CoordinatorInstance::DemoteSuccessCallback, &CoordinatorInstance::DemoteFailCallback);
     return false;
   };
 
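The SetCallbacks calls above swap the pointers to member functions that the frequent health check invokes, so an instance that still has to be demoted first runs the demote callbacks and only afterwards the regular replica callbacks (as the later hunk in the success callback shows). Below is a minimal sketch of that callback-swapping pattern with a hypothetical Coordinator class, not the real Memgraph API.

// Minimal sketch of swapping pointer-to-member callbacks, as SetCallbacks does above.
// Coordinator here is a hypothetical stand-in, not the Memgraph class.
#include <iostream>
#include <string_view>

class Coordinator {
 public:
  using Callback = void (Coordinator::*)(std::string_view);

  void SetCallbacks(Callback succ, Callback fail) {
    succ_cb_ = succ;
    fail_cb_ = fail;
  }
  void OnSuccess(std::string_view name) { (this->*succ_cb_)(name); }
  void OnFail(std::string_view name) { (this->*fail_cb_)(name); }

  void DemoteSuccessCallback(std::string_view name) {
    std::cout << name << ": demoted, switching to replica callbacks\n";
    SetCallbacks(&Coordinator::ReplicaSuccessCallback, &Coordinator::ReplicaFailCallback);
  }
  void DemoteFailCallback(std::string_view name) { std::cout << name << ": demote failed\n"; }
  void ReplicaSuccessCallback(std::string_view name) { std::cout << name << ": replica healthy\n"; }
  void ReplicaFailCallback(std::string_view name) { std::cout << name << ": replica ping failed\n"; }

 private:
  Callback succ_cb_{&Coordinator::ReplicaSuccessCallback};
  Callback fail_cb_{&Coordinator::ReplicaFailCallback};
};

int main() {
  Coordinator coord;
  // Raft log still says MAIN, but the instance has to become a replica first:
  coord.SetCallbacks(&Coordinator::DemoteSuccessCallback, &Coordinator::DemoteFailCallback);
  coord.OnSuccess("instance_3");  // demote succeeded -> callbacks switch to the replica ones
  coord.OnSuccess("instance_3");  // from now on the replica callback runs
}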
@@ -349,8 +358,9 @@ auto CoordinatorInstance::TryFailover() -> void {
 
   auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_replicas);
 
-  if (maybe_most_up_to_date_instance->empty()) {
+  if (!maybe_most_up_to_date_instance.has_value()) {
     spdlog::error("Couldn't choose instance for failover, check logs for more details.");
+    return;
   }
 
   auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
@@ -733,8 +743,8 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
   repl_instance.OnFailPing();
 }
 
-void CoordinatorInstance::UniversalSuccessCallback(std::string_view repl_instance_name) {
-  spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
+void CoordinatorInstance::DemoteSuccessCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing demote to replica successful callback", repl_instance_name);
 
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
 
@@ -746,12 +756,11 @@ void CoordinatorInstance::UniversalSuccessCallback(std::string_view repl_instance_name) {
     return;
   }
 
-  // TODO(antoniofilipovic) Double check that switching works
   repl_instance.SetCallbacks(&CoordinatorInstance::ReplicaSuccessCallback, &CoordinatorInstance::ReplicaFailCallback);
 }
 
-void CoordinatorInstance::UniversalFailCallback(std::string_view repl_instance_name) {
-  spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
+void CoordinatorInstance::DemoteFailCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing demote to replica failure callback", repl_instance_name);
 }
 
 auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> instance_database_histories)
@@ -69,7 +69,10 @@ class CoordinatorInstance {
  private:
  template <ranges::forward_range R>
  auto GetMostUpToDateInstanceFromHistories(R &&alive_instances) -> std::optional<std::string> {
-    auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
+    auto const get_ts = [](ReplicationInstance &replica) {
+      spdlog::trace("Sending get instance timestamps to {}", replica.InstanceName());
+      return replica.GetClient().SendGetInstanceTimestampsRpc();
+    };
 
    auto maybe_instance_db_histories = alive_instances | ranges::views::transform(get_ts) | ranges::to<std::vector>();
 
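The helper above maps every alive instance to its timestamp response and materializes the lazy view with range-v3's ranges::to<std::vector>(). A minimal sketch of the same transform-then-collect idea using only standard C++20 (iterator-pair construction in place of ranges::to), with a hypothetical Replica type:

// Minimal sketch of the transform-then-collect pattern above, using std::views::transform
// and an iterator-pair vector construction instead of range-v3's ranges::to.
#include <cstdint>
#include <iostream>
#include <ranges>
#include <string>
#include <utility>
#include <vector>

struct Replica {
  std::string name;
  std::uint64_t last_timestamp;
};

int main() {
  std::vector<Replica> alive_instances{{"instance_1", 42}, {"instance_2", 57}};

  // Map each alive instance to its latest timestamp (here just read from the struct).
  auto get_ts = [](Replica const &replica) { return std::pair{replica.name, replica.last_timestamp}; };
  auto ts_view = alive_instances | std::views::transform(get_ts);

  // Materialize the lazy view into a vector, as ranges::to<std::vector>() does in the diff.
  std::vector<std::pair<std::string, std::uint64_t>> timestamps(ts_view.begin(), ts_view.end());

  for (auto const &[name, ts] : timestamps) std::cout << name << " -> " << ts << '\n';
}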
@@ -112,9 +115,9 @@ class CoordinatorInstance {
 
   void ReplicaFailCallback(std::string_view);
 
-  void UniversalSuccessCallback(std::string_view);
+  void DemoteSuccessCallback(std::string_view repl_instance_name);
 
-  void UniversalFailCallback(std::string_view);
+  void DemoteFailCallback(std::string_view repl_instance_name);
 
   void ForceResetCluster();
 
@@ -24,7 +24,7 @@ namespace memgraph::replication_coordination_glue {
 struct DatabaseHistory {
   memgraph::utils::UUID db_uuid;
   std::vector<std::pair<std::string, uint64_t>> history;
-  std::string name;
+  std::string name;  // db name
 };
 
 using DatabaseHistories = std::vector<DatabaseHistory>;
@@ -1532,7 +1532,7 @@ def test_force_reset_works_after_failed_registration():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "127.0.0.1:10011", "up", "main"),
         ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
-        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "replica"),
     ]
 
     follower_data = [
@@ -1541,7 +1541,7 @@ def test_force_reset_works_after_failed_registration():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "unknown"),
+        ("instance_3", "", "", "unknown", "replica"),
     ]
 
     mg_sleep_and_assert(leader_data, show_instances_coord3)
@@ -1550,5 +1550,4 @@ def test_force_reset_works_after_failed_registration():
 
 
 if __name__ == "__main__":
-    sys.exit(pytest.main([__file__, "-k", "test_force_reset_works_after_failed_registration", "-vv"]))
     sys.exit(pytest.main([__file__, "-rA"]))