Compare commits

..

8 Commits

Author            SHA1        Message                                             Date
antoniofilipovic  cd6bd9e350  fix last comments                                   2024-03-23 18:24:10 +01:00
antoniofilipovic  68c1d2526e  make all tests pass                                 2024-03-23 18:02:04 +01:00
antoniofilipovic  ee81a42923  add callback for demote to replica on force reset  2024-03-23 16:40:04 +01:00
antoniofilipovic  71f0f4a4b1  add impl for force sync                             2024-03-22 17:10:27 +01:00
antoniofilipovic  3cc2dfafc4  add force reset test                                2024-03-22 13:22:06 +01:00
antoniofilipovic  229e66b8bc  add initial version of force reset                  2024-03-22 13:22:06 +01:00
antoniofilipovic  32bb99d919  fix up coordinator instance                         2024-03-22 13:22:06 +01:00
antoniofilipovic  9b37ba0ff6  initial commit for force reset                      2024-03-22 13:22:06 +01:00
27 changed files with 562 additions and 263 deletions

View File

@@ -337,8 +337,6 @@ if (ASAN)
 endif()

 if (TSAN)
-  message(WARNING "Disabling jemalloc as it doesn't work well with ASAN")
-  set(ENABLE_JEMALLOC OFF)
   # ThreadSanitizer generally requires all code to be compiled with -fsanitize=thread.
   # If some code (e.g. dynamic libraries) is not compiled with the flag, it can
   # lead to false positive race reports, false negative race reports and/or

@@ -354,7 +352,7 @@ if (TSAN)
   # By default ThreadSanitizer uses addr2line utility to symbolize reports.
   # llvm-symbolizer is faster, consumes less memory and produces much better
   # reports. To use it set runtime flag:
   # TSAN_OPTIONS="extern-symbolizer-path=~/llvm-symbolizer"
   # For more runtime flags see: https://github.com/google/sanitizers/wiki/ThreadSanitizerFlags
 endif()

View File

@@ -303,8 +303,6 @@ ExternalProject_Add(mgcxx-proj
     "-DCMAKE_INSTALL_PREFIX=<INSTALL_DIR>"
     "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}"
     "-DENABLE_TESTS=OFF"
-    "-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}"
-    "-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}"
   INSTALL_DIR "${PROJECT_BINARY_DIR}/mgcxx"
 )
 ExternalProject_Get_Property(mgcxx-proj install_dir)

View File

@@ -182,7 +182,7 @@ benchmark_tag="v1.6.0"
 repo_clone_try_double "${primary_urls[gbenchmark]}" "${secondary_urls[gbenchmark]}" "benchmark" "$benchmark_tag" true

 # google test
-googletest_tag="v1.14.0"
+googletest_tag="release-1.8.0"
 repo_clone_try_double "${primary_urls[gtest]}" "${secondary_urls[gtest]}" "googletest" "$googletest_tag" true

 # libbcrypt

View File

@@ -60,25 +60,16 @@ void CoordinatorClient::StartFrequentCheck() {
   MG_ASSERT(config_.instance_health_check_frequency_sec > std::chrono::seconds(0),
             "Health check frequency must be greater than 0");

-  instance_checker_.Run(
-      config_.instance_name, config_.instance_health_check_frequency_sec,
-      [this, instance_name = config_.instance_name] {
-        try {
-          spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
-                        config_.CoordinatorSocketAddress());
-          {  // NOTE: This is intentionally scoped so that stream lock could get released.
-            auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
-            stream.AwaitResponse();
-          }
-          // Subtle race condition:
-          // acquiring of lock needs to happen before function call, as function callback can be changed
-          // for instance after lock is already acquired
-          // (failover case when instance is promoted to MAIN)
-          succ_cb_(coord_instance_, instance_name);
-        } catch (rpc::RpcFailedException const &) {
-          fail_cb_(coord_instance_, instance_name);
-        }
-      });
+  instance_checker_.Run(config_.instance_name, config_.instance_health_check_frequency_sec,
+                        [this, instance_name = config_.instance_name] {
+                          spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
+                                        config_.CoordinatorSocketAddress());
+                          if (SendFrequentHeartbeat()) {
+                            succ_cb_(coord_instance_, instance_name);
+                            return;
+                          }
+                          fail_cb_(coord_instance_, instance_name);
+                        });
 }

 void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }

@@ -120,6 +111,16 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
   return false;
 }

+auto CoordinatorClient::SendFrequentHeartbeat() const -> bool {
+  try {
+    auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
+    stream.AwaitResponse();
+    return true;
+  } catch (rpc::RpcFailedException const &) {
+    return false;
+  }
+}
+
 auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
   try {
     auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};

View File

@@ -19,14 +19,17 @@
 namespace memgraph::coordination {

 void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state) {
-  j = nlohmann::json{
-      {"config", instance_state.config}, {"status", instance_state.status}, {"uuid", instance_state.instance_uuid}};
+  j = nlohmann::json{{"config", instance_state.config},
+                     {"status", instance_state.status},
+                     {"uuid", instance_state.instance_uuid},
+                     {"needs_demote", instance_state.needs_demote}};
 }

 void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state) {
   j.at("config").get_to(instance_state.config);
   j.at("status").get_to(instance_state.status);
   j.at("uuid").get_to(instance_state.instance_uuid);
+  j.at("needs_demote").get_to(instance_state.needs_demote);
 }

 CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,

@@ -151,35 +154,17 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
       spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
       break;
     }
-    case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
-      is_lock_opened_ = true;
-      spdlog::trace("DoAction: open lock register");
-      break;
-      // TODO(antoniofilipovic) save what we are doing to be able to undo....
-    }
-    case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE: {
-      is_lock_opened_ = true;
-      spdlog::trace("DoAction: open lock unregister");
-      break;
-      // TODO(antoniofilipovic) save what we are doing
-    }
-    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN: {
-      is_lock_opened_ = true;
-      spdlog::trace("DoAction: open lock set instance as main");
-      break;
-      // TODO(antoniofilipovic) save what we are doing
-    }
-    case RaftLogAction::OPEN_LOCK_FAILOVER: {
-      is_lock_opened_ = true;
-      spdlog::trace("DoAction: open lock failover");
-      break;
-      // TODO(antoniofilipovic) save what we are doing
-    }
-    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
-      is_lock_opened_ = true;
-      spdlog::trace("DoAction: open lock set instance as replica");
-      break;
-      // TODO(antoniofilipovic) save what we need to undo
-    }
+    case RaftLogAction::INSTANCE_NEEDS_DEMOTE: {
+      auto const instance_name = std::get<std::string>(log_entry);
+      auto it = repl_instances_.find(instance_name);
+      MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
+      it->second.needs_demote = true;
+      spdlog::trace("Added action that instance {} needs demote", instance_name);
+    }
+    case RaftLogAction::OPEN_LOCK: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: Opened lock");
+      break;
+    }
   }
 }

View File

@@ -17,7 +17,6 @@
 #include "dbms/constants.hpp"
 #include "nuraft/coordinator_state_machine.hpp"
 #include "nuraft/coordinator_state_manager.hpp"
-#include "utils/counter.hpp"
 #include "utils/functional.hpp"
 #include "utils/resource_lock.hpp"

@@ -33,6 +32,11 @@ CoordinatorInstance::CoordinatorInstance()
     : thread_pool_{1},
       raft_state_(RaftState::MakeRaftState(
           [this]() {
+            if (raft_state_.IsLockOpened()) {
+              spdlog::error("Leader hasn't encountered healthy state, doing force reset of cluster.");
+              thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
+              return;
+            }
             spdlog::info("Leader changed, starting all replication instances!");
             auto const instances = raft_state_.GetReplicationInstances();
             auto replicas = instances | ranges::views::filter([](auto const &instance) {

@@ -46,16 +50,28 @@ CoordinatorInstance::CoordinatorInstance()
                                            &CoordinatorInstance::ReplicaFailCallback);
             });

-            auto main = instances | ranges::views::filter(
-                            [](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
+            auto main_instances = instances | ranges::views::filter([](auto const &instance) {
+                                    return instance.status == ReplicationRole::MAIN;
+                                  });

-            std::ranges::for_each(main, [this](auto &main_instance) {
+            std::ranges::for_each(main_instances, [this](auto &main_instance) {
               spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
               repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
                                            &CoordinatorInstance::MainSuccessCallback,
                                            &CoordinatorInstance::MainFailCallback);
             });

+            // In case we got out of force reset but instances weren't still demoted
+            // we need to apply functions to these instances to demote them
+            std::ranges::for_each(instances, [this](ReplicationInstanceState const &replication_instance_state) {
+              if (replication_instance_state.needs_demote) {
+                spdlog::trace("Changing callback for instance {} to demote callback",
+                              replication_instance_state.config.instance_name);
+                auto &instance = FindReplicationInstance(replication_instance_state.config.instance_name);
+                instance.SetCallbacks(&CoordinatorInstance::DemoteSuccessCallback,
+                                      &CoordinatorInstance::DemoteFailCallback);
+              }
+            });
+
             std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
           },
           [this]() {

@@ -64,9 +80,11 @@ CoordinatorInstance::CoordinatorInstance()
             // We need to stop checks before taking a lock because deadlock can happen if instances waits
             // to take a lock in frequent check, and this thread already has a lock and waits for instance to
            // be done with frequent check
-            for (auto &repl_instance : repl_instances_) {
-              repl_instance.StopFrequentCheck();
-            }
+            std::ranges::for_each(repl_instances_, [](auto &repl_instance) {
+              spdlog::trace("Stopping frequent checks for instance {}", repl_instance.InstanceName());
+              repl_instance.StopFrequentCheck();
+              spdlog::trace("Stopped frequent checks for instance {}", repl_instance.InstanceName());
+            });
             auto lock = std::unique_lock{coord_instance_lock_};
             repl_instances_.clear();
             spdlog::info("Stopped all replication instance frequent checks.");

@@ -74,8 +92,6 @@ CoordinatorInstance::CoordinatorInstance()
           })) {
   client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
     auto lock = std::unique_lock{self->coord_instance_lock_};
-    // when coordinator is becoming follower it will want to stop all threads doing frequent checks
-    // Thread can get stuck here waiting for lock so we need to frequently check if we are in shutdown state
     auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
     std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);

@@ -159,6 +175,171 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
   return instances_status;
 }

void CoordinatorInstance::ForceResetCluster() {
// Force reset tries to return the cluster to a state in which all the replicas we had before are alive
// and failover to a new MAIN succeeds. Only then is force reset successful.
// 0. Open the lock
// 1. Try to demote each instance to REPLICA
// 2. Instances that were successfully demoted proceed to the next step as part of the selection process
// 3. For the selected instances try to send SWAP UUID and update the log -> both must succeed
// 4. Do failover
// 5. For instances which were down, set the correct callback as before
// 6. After such an instance gets back up, do the steps needed to recover it
spdlog::trace("Force resetting cluster!");
// Ordering is important here: we must stop the frequent checks before taking the lock, otherwise we can deadlock
// between this thread stopping a check and the check thread waiting for the lock we already hold.
std::ranges::for_each(repl_instances_, [](auto &repl_instance) {
spdlog::trace("Stopping frequent check for instance {}", repl_instance.InstanceName());
repl_instance.StopFrequentCheck();
spdlog::trace("Stopped frequent check for instance {}", repl_instance.InstanceName());
});
spdlog::trace("Stopped all replication instance frequent checks.");
auto lock = std::unique_lock{coord_instance_lock_};
repl_instances_.clear();
if (!raft_state_.IsLeader()) {
spdlog::trace("Exiting force reset as coordinator is not any more leader!");
return;
}
if (!raft_state_.AppendOpenLock()) {
spdlog::trace("Appending log force reset failed, aborting force reset");
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is leader but append open lock failed, encountered wrong state.");
return;
}
utils::OnScopeExit maybe_do_another_reset{[this]() {
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
spdlog::trace("Adding task to try force reset cluster again as lock is opened still.");
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
return;
}
spdlog::trace("Lock is not opened anymore or coordinator is not leader, not doing force reset again.");
}};
auto const instances = raft_state_.GetReplicationInstances();
// We send an RPC to each instance. If the RPC fails, we consider the instance dead; otherwise we consider it alive.
// If an RPC to an instance we considered alive fails at any later point, we treat that as a failure of the whole
// force reset.
std::ranges::for_each(instances, [this](auto &replica) {
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
});
auto instances_mapped_to_resp = repl_instances_ | ranges::views::transform([](auto &instance) {
return std::pair{instance.InstanceName(), instance.SendFrequentHeartbeat()};
}) |
ranges::to<std::unordered_map<std::string, bool>>();
auto alive_instances = repl_instances_ | ranges::views::filter([&instances_mapped_to_resp](auto const &instance) {
return instances_mapped_to_resp[instance.InstanceName()];
});
auto demote_to_replica_failed = [this](auto &instance) {
if (!instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
return true;
}
return !raft_state_.AppendSetInstanceAsReplicaLog(instance.InstanceName());
};
if (std::ranges::any_of(alive_instances, demote_to_replica_failed)) {
spdlog::error("Failed to send log instance demoted to replica.");
return;
}
auto const new_uuid = utils::UUID{};
auto update_uuid_failed = [&new_uuid, this](auto &repl_instance) {
if (!repl_instance.SendSwapAndUpdateUUID(new_uuid)) {
return true;
}
return !raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid);
};
if (std::ranges::any_of(alive_instances, update_uuid_failed)) {
spdlog::error("Force reset failed since update log swap uuid failed, assuming coordinator is now follower.");
return;
}
if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_uuid)) {
spdlog::error("Update log for new MAIN failed, assuming coordinator is now follower");
return;
}
auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_instances);
if (!maybe_most_up_to_date_instance.has_value()) {
spdlog::error("Couldn't choose instance for failover, check logs for more details.");
return;
}
auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
auto const is_not_new_main = [&new_main](ReplicationInstance const &repl_instance) {
return repl_instance.InstanceName() != new_main.InstanceName();
};
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main.PromoteToMain(new_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::warn("Force reset failed since promoting replica to main failed.");
return;
}
// This will set cluster in healthy state again
if (!raft_state_.AppendSetInstanceAsMainLog(*maybe_most_up_to_date_instance, new_uuid)) {
spdlog::error("Update log for new MAIN failed");
return;
}
// We need to clear repl instances at the beginning because we don't know exactly where the previous action failed,
// so the state has to be recreated from the raft log.
// If an instance is MAIN in the raft log, it can actually be a REPLICA if the raft append failed when we demoted it.
// If an instance is REPLICA in the raft log, it can actually be MAIN if the raft append failed when we promoted it.
// Crux of the problem: we need a universal callback which first demotes the instance to REPLICA and only then
// switches it to the REPLICA callbacks.
auto needs_demote_setup_failed = [&instances_mapped_to_resp, this](ReplicationInstance &repl_instance) {
if (instances_mapped_to_resp[repl_instance.InstanceName()]) {
return false;
}
if (!raft_state_.AppendInstanceNeedsDemote(repl_instance.InstanceName())) {
return true;
}
repl_instance.SetCallbacks(&CoordinatorInstance::DemoteSuccessCallback, &CoordinatorInstance::DemoteFailCallback);
return false;
};
if (std::ranges::any_of(repl_instances_, needs_demote_setup_failed)) {
spdlog::error("Raft log didn't accept that some instances are in unknown state.");
return;
}
auto check_correct_callbacks_set = [this](auto &repl_instance) {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::ReplicaSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::ReplicaFailCallback,
"Callbacks are wrong");
} else {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::MainSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::MainFailCallback);
}
};
std::ranges::for_each(alive_instances, check_correct_callbacks_set);
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
MG_ASSERT(!raft_state_.IsLockOpened(), "After force reset we need to be in healthy state.");
}
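
Aside: the force-reset routine above relies on utils::OnScopeExit to re-enqueue itself on the thread pool whenever it returns early while the Raft "open lock" is still set. The following standalone sketch shows that retry-on-scope-exit pattern in isolation; ScopeGuard, FakeClusterState and ForceResetSketch are illustrative names only and are not part of this codebase.

#include <iostream>
#include <utility>

// Minimal stand-in for utils::OnScopeExit: runs the stored callable when the
// guard goes out of scope, i.e. on every return path of the enclosing function.
template <typename F>
class ScopeGuard {
 public:
  explicit ScopeGuard(F f) : f_(std::move(f)) {}
  ~ScopeGuard() { f_(); }
  ScopeGuard(ScopeGuard const &) = delete;
  ScopeGuard &operator=(ScopeGuard const &) = delete;

 private:
  F f_;
};

// Hypothetical cluster state used only for this illustration.
struct FakeClusterState {
  bool lock_opened{true};
  bool is_leader{true};
  int attempts{0};
};

// Sketch of the retry pattern: any early return leaves the guard to decide
// whether the whole routine has to be scheduled again.
void ForceResetSketch(FakeClusterState &state) {
  ScopeGuard retry{[&state] {
    if (state.lock_opened && state.is_leader) {
      std::cout << "lock still open -> schedule ForceResetSketch again\n";
      // A real implementation would enqueue the task on a thread pool here.
    }
  }};

  ++state.attempts;
  if (state.attempts < 2) {
    return;  // Simulated failure: the lock stays open, so the guard re-schedules.
  }
  state.lock_opened = false;  // Simulated success: reaching the end closes the lock.
}

int main() {
  FakeClusterState state;
  ForceResetSketch(state);  // fails, guard reports it would re-schedule
  ForceResetSketch(state);  // succeeds, lock closed, guard stays silent
}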
 auto CoordinatorInstance::TryFailover() -> void {
   auto const is_replica = [this](ReplicationInstance const &instance) {
     return HasReplicaState(instance.InstanceName());

@@ -172,42 +353,32 @@ auto CoordinatorInstance::TryFailover() -> void {
     return;
   }

-  auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
-  auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
-
-  auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
-
-  if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
-    spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
-    return;
-  }
-
-  auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
-    auto &[replica, res] = zipped;
-    return std::make_pair(replica.InstanceName(), res.GetValue());
-  });
-
-  auto instance_db_histories =
-      ranges::views::zip(alive_replicas, maybe_instance_db_histories) | transform_to_pairs | ranges::to<std::vector>();
-
-  auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
-      ChooseMostUpToDateInstance(instance_db_histories);
-
-  spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
-                most_up_to_date_instance, latest_epoch, latest_commit_timestamp);  // NOLINT
-
-  auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
-
-  if (!raft_state_.AppendOpenLockFailover(most_up_to_date_instance)) {
-    spdlog::error("Aborting failover as instance is not anymore leader.");
-    return;
-  }
-
-  new_main->PauseFrequentCheck();
-  utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
+  auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_replicas);
+  if (!maybe_most_up_to_date_instance.has_value()) {
+    spdlog::error("Couldn't choose instance for failover, check logs for more details.");
+    return;
+  }
+
+  auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
+
+  if (!raft_state_.AppendOpenLock()) {
+    spdlog::error("Aborting failover as instance is not anymore leader.");
+    return;
+  }
+
+  utils::OnScopeExit do_reset{[this]() {
+    if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
+      thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
+    }
+  }};
+
+  // We don't need to stop frequent check as we have lock, and we will swap callback function during locked phase
+  // In frequent check only when we take lock we then check which function (MAIN/REPLICA) success or fail callback
+  // we need to call
   auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
-    return instance.InstanceName() != new_main->InstanceName();
+    return instance.InstanceName() != new_main.InstanceName();
   };

   auto const new_main_uuid = utils::UUID{};

@@ -228,8 +399,8 @@ auto CoordinatorInstance::TryFailover() -> void {
                                ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
                                ranges::to<ReplicationClientsInfo>();

-  if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
-                               &CoordinatorInstance::MainFailCallback)) {
+  if (!new_main.PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
+                              &CoordinatorInstance::MainFailCallback)) {
     spdlog::warn("Failover failed since promoting replica to main failed!");
     return;
   }

@@ -238,17 +409,17 @@ auto CoordinatorInstance::TryFailover() -> void {
     return;
   }

-  auto const new_main_instance_name = new_main->InstanceName();
+  auto const new_main_instance_name = new_main.InstanceName();

   if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name, new_main_uuid)) {
     return;
   }

-  if (!new_main->EnableWritingOnMain()) {
+  if (!new_main.EnableWritingOnMain()) {
     spdlog::error("Failover successful but couldn't enable writing on instance.");
   }

-  spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
+  spdlog::info("Failover successful! Instance {} promoted to main.", new_main.InstanceName());
 }
 auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)

@@ -280,10 +451,16 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
     return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
   }

-  if (!raft_state_.AppendOpenLockSetInstanceToMain(instance_name)) {
-    return SetInstanceToMainCoordinatorStatus::OPEN_LOCK;
+  if (!raft_state_.AppendOpenLock()) {
+    return SetInstanceToMainCoordinatorStatus::FAILED_TO_OPEN_LOCK;
   }

+  utils::OnScopeExit do_reset{[this]() {
+    if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
+      thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
+    }
+  }};
+
   new_main->PauseFrequentCheck();
   utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};

@@ -357,15 +534,23 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
     return RegisterInstanceCoordinatorStatus::NOT_LEADER;
   }

-  if (!raft_state_.AppendOpenLockRegister(config)) {
-    return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
+  if (!raft_state_.AppendOpenLock()) {
+    return RegisterInstanceCoordinatorStatus::FAILED_TO_OPEN_LOCK;
   }

+  utils::OnScopeExit do_reset{[this]() {
+    if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
+      thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
+    }
+  }};
+
   auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
                                                      &CoordinatorInstance::ReplicaSuccessCallback,
                                                      &CoordinatorInstance::ReplicaFailCallback);

-  if (!new_instance->SendDemoteToReplicaRpc()) {
+  if (!new_instance->DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
+                                     &CoordinatorInstance::ReplicaFailCallback)) {
+    // TODO(antoniofilipovic) We don't need to do here force reset, only close lock later on
     spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
     return RegisterInstanceCoordinatorStatus::RPC_FAILED;
   }
@@ -409,10 +594,16 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
     return UnregisterInstanceCoordinatorStatus::IS_MAIN;
   }

-  if (!raft_state_.AppendOpenLockUnregister(instance_name)) {
-    return UnregisterInstanceCoordinatorStatus::OPEN_LOCK;
+  if (!raft_state_.AppendOpenLock()) {
+    return UnregisterInstanceCoordinatorStatus::FAILED_TO_OPEN_LOCK;
   }

+  utils::OnScopeExit do_reset{[this]() {
+    if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
+      thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
+    }
+  }};
+
   inst_to_remove->StopFrequentCheck();

   auto curr_main = std::ranges::find_if(repl_instances_, is_current_main);

@@ -483,7 +674,7 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
     return;
   }

-  if (!raft_state_.AppendOpenLockSetInstanceToReplica(repl_instance.InstanceName())) {
+  if (!raft_state_.AppendOpenLock()) {
     spdlog::error("Failed to open lock for demoting OLD MAIN {} to REPLICA", repl_instance_name);
     return;
   }

@@ -549,6 +740,26 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam
   repl_instance.OnFailPing();
 }

+void CoordinatorInstance::DemoteSuccessCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing demote to replica successful callback", repl_instance_name);
+
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+
+  if (!repl_instance.SendDemoteToReplicaRpc()) {
+    return;
+  }
+
+  if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
+    return;
+  }
+
+  repl_instance.SetCallbacks(&CoordinatorInstance::ReplicaSuccessCallback, &CoordinatorInstance::ReplicaFailCallback);
+}
+
+void CoordinatorInstance::DemoteFailCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing demote to replica failure callback", repl_instance_name);
+}
+
 auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> instance_database_histories)
     -> NewMainRes {
   std::optional<NewMainRes> new_main_res;

View File

@@ -38,21 +38,8 @@ auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
   return log_buf;
 }

-auto CoordinatorStateMachine::SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE}, {"info", config}});
-}
-
-auto CoordinatorStateMachine::SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer> {
-  return CreateLog(
-      {{"action", RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE}, {"info", std::string{instance_name}}});
-}
-
-auto CoordinatorStateMachine::SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_FAILOVER}, {"info", std::string(instance_name)}});
-}
-
-auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN}, {"info", std::string(instance_name)}});
-}
+auto CoordinatorStateMachine::SerializeOpenLock() -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK}});
+}

 auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {

@@ -72,6 +59,10 @@ auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view ins
   return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
 }

+auto CoordinatorStateMachine::SerializeInstanceNeedsDemote(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::INSTANCE_NEEDS_DEMOTE}, {"info", std::string{instance_name}}});
+}
+
 auto CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer> {
   return CreateLog({{"action", RaftLogAction::UPDATE_UUID_OF_NEW_MAIN}, {"info", uuid}});
 }

@@ -86,10 +77,6 @@ auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoord
   return CreateLog({{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}});
 }

-auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
-}
-
 auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
   buffer_serializer bs(data);
   auto const json = nlohmann::json::parse(bs.get_str());

@@ -97,17 +84,8 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
   auto const &info = json["info"];

   switch (action) {
-    case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
-      return {info.get<CoordinatorToReplicaConfig>(), action};
-    }
-    case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE:
-      [[fallthrough]];
-    case RaftLogAction::OPEN_LOCK_FAILOVER:
-      [[fallthrough]];
-    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN:
-      [[fallthrough]];
-    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
-      return {info.get<std::string>(), action};
-    }
+    case RaftLogAction::OPEN_LOCK: {
+      return {std::monostate{}, action};
+    }
     case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
       return {info.get<CoordinatorToReplicaConfig>(), action};

@@ -118,6 +96,8 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
       return {info.get<InstanceUUIDUpdate>(), action};
     case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
       [[fallthrough]];
+    case RaftLogAction::INSTANCE_NEEDS_DEMOTE:
+      [[fallthrough]];
     case RaftLogAction::SET_INSTANCE_AS_REPLICA:
       return {info.get<std::string>(), action};
     case RaftLogAction::ADD_COORDINATOR_INSTANCE:

View File

@@ -15,6 +15,7 @@
 #include "coordination/coordinator_communication_config.hpp"
 #include "replication_coordination_glue/common.hpp"
+#include "replication_coordination_glue/role.hpp"
 #include "rpc/client.hpp"
 #include "rpc_errors.hpp"
 #include "utils/result.hpp"

@@ -64,6 +65,8 @@ class CoordinatorClient {
   auto ReplicationClientInfo() const -> ReplicationClientInfo;

+  auto SendFrequentHeartbeat() const -> bool;
+
   auto SendGetInstanceTimestampsRpc() const
       -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;

@@ -85,6 +88,9 @@ class CoordinatorClient {
   CoordinatorToReplicaConfig config_;
   CoordinatorInstance *coord_instance_;
+  // The reason why we have HealthCheckClientCallback is because we need to acquire lock
+  // before we do correct function call (main or replica), as otherwise we can enter REPLICA callback
+  // but right before instance was promoted to MAIN
   HealthCheckClientCallback succ_cb_;
   HealthCheckClientCallback fail_cb_;
 };
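
Aside: the comment added above explains why the success/failure callbacks resolve the role-specific handler only after the coordinator lock is taken, so a concurrent promotion cannot make the health-check thread run a stale REPLICA callback. A minimal standalone sketch of that dispatch-under-lock pattern; Dispatcher and InstanceEntry are hypothetical types invented for this illustration, not part of the codebase.

#include <functional>
#include <iostream>
#include <mutex>
#include <string>

// Hypothetical instance record: the role-specific callback can be swapped
// (e.g. REPLICA -> MAIN during failover) while health checks keep running.
struct InstanceEntry {
  std::function<void(const std::string &)> on_success;
};

class Dispatcher {
 public:
  // Called from the frequent-check thread. The lock is taken *before* the
  // current callback is read, so a promotion that happens concurrently cannot
  // leave us invoking the old REPLICA callback on an instance that is now MAIN.
  void OnHeartbeatSuccess(const std::string &name) {
    std::lock_guard<std::mutex> guard(mutex_);
    entry_.on_success(name);
  }

  // Called from the failover path; swaps the callback under the same lock.
  void Promote() {
    std::lock_guard<std::mutex> guard(mutex_);
    entry_.on_success = [](const std::string &name) {
      std::cout << name << ": MAIN success callback\n";
    };
  }

 private:
  std::mutex mutex_;
  InstanceEntry entry_{[](const std::string &name) {
    std::cout << name << ": REPLICA success callback\n";
  }};
};

int main() {
  Dispatcher d;
  d.OnHeartbeatSuccess("instance_1");  // runs the REPLICA callback
  d.Promote();
  d.OnHeartbeatSuccess("instance_1");  // now runs the MAIN callback
}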

View File

@@ -67,6 +67,44 @@ class CoordinatorInstance {
   auto HasReplicaState(std::string_view instance_name) const -> bool;

  private:
template <ranges::forward_range R>
auto GetMostUpToDateInstanceFromHistories(R &&alive_instances) -> std::optional<std::string> {
auto const get_ts = [](ReplicationInstance &replica) {
spdlog::trace("Sending get instance timestamps to {}", replica.InstanceName());
return replica.GetClient().SendGetInstanceTimestampsRpc();
};
auto maybe_instance_db_histories = alive_instances | ranges::views::transform(get_ts) | ranges::to<std::vector>();
auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
spdlog::error("At least one instance which was alive didn't provide per database history.");
return std::nullopt;
}
auto const ts_has_value = [](auto const &zipped) -> bool {
auto &[replica, res] = zipped;
return res.HasValue();
};
auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
auto &[replica, res] = zipped;
return std::make_pair(replica.InstanceName(), res.GetValue());
});
auto instance_db_histories = ranges::views::zip(alive_instances, maybe_instance_db_histories) |
ranges::views::filter(ts_has_value) | transform_to_pairs | ranges::to<std::vector>();
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_db_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, latest_epoch, latest_commit_timestamp); // NOLINT
return most_up_to_date_instance;
}
   auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;

   void MainFailCallback(std::string_view);

@@ -77,8 +115,15 @@ class CoordinatorInstance {
   void ReplicaFailCallback(std::string_view);

+  void DemoteSuccessCallback(std::string_view repl_instance_name);
+  void DemoteFailCallback(std::string_view repl_instance_name);
+
+  void ForceResetCluster();
+
   HealthCheckClientCallback client_succ_cb_, client_fail_cb_;

   // NOTE: Must be std::list because we rely on pointer stability.
+  // TODO(antoniofilipovic) do we still rely on pointer stability
   std::list<ReplicationInstance> repl_instances_;
   mutable utils::ResourceLock coord_instance_lock_{};
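
Aside: GetMostUpToDateInstanceFromHistories above feeds the per-instance (epoch, commit timestamp) histories into ChooseMostUpToDateInstance, which is not shown in this diff. The sketch below is a simplified stand-in under that assumption: it only compares the newest commit timestamp per instance, while the real selection also reasons about epochs per database. MostUpToDate and the History alias are illustrative names only.

#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Simplified history: a list of (epoch id, last commit timestamp) pairs per
// instance, newest entry last. The real DatabaseHistories type carries this
// per database; a single stream per instance is enough to show the idea.
using History = std::vector<std::pair<std::string, uint64_t>>;

// Simplified stand-in for ChooseMostUpToDateInstance: prefer the instance whose
// newest entry has the largest commit timestamp.
std::optional<std::string> MostUpToDate(const std::map<std::string, History> &histories) {
  std::optional<std::string> best;
  uint64_t best_ts = 0;
  for (const auto &[instance, history] : histories) {
    if (history.empty()) continue;
    const auto latest_ts = history.back().second;
    if (!best || latest_ts > best_ts) {
      best = instance;
      best_ts = latest_ts;
    }
  }
  return best;
}

int main() {
  std::map<std::string, History> histories{
      {"instance_1", {{"epoch_1", 10}, {"epoch_2", 42}}},
      {"instance_2", {{"epoch_1", 10}, {"epoch_2", 35}}},
  };
  std::cout << "new MAIN candidate: " << MostUpToDate(histories).value() << "\n";  // instance_1
}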

View File

@@ -16,6 +16,7 @@
 #include "coordination/coordinator_communication_config.hpp"
 #include "replication_coordination_glue/common.hpp"
+#include "replication_coordination_glue/role.hpp"
 #include "rpc/messages.hpp"
 #include "slk/serialization.hpp"

View File

@@ -70,12 +70,9 @@ class RaftState {
   auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
   auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
   auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
-  auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
-  auto AppendOpenLockUnregister(std::string_view) -> bool;
-  auto AppendOpenLockFailover(std::string_view instance_name) -> bool;
-  auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
-  auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
+  auto AppendOpenLock() -> bool;
   auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
+  auto AppendInstanceNeedsDemote(std::string_view) -> bool;

   auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
   // TODO: (andi) Do we need then GetAllCoordinators?

View File

@@ -27,7 +27,7 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
   RAFT_LOG_ERROR,
   SUCCESS,
   LOCK_OPENED,
-  OPEN_LOCK
+  FAILED_TO_OPEN_LOCK
 };

 enum class UnregisterInstanceCoordinatorStatus : uint8_t {

@@ -39,7 +39,7 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
   RAFT_LOG_ERROR,
   SUCCESS,
   LOCK_OPENED,
-  OPEN_LOCK
+  FAILED_TO_OPEN_LOCK
 };

 enum class SetInstanceToMainCoordinatorStatus : uint8_t {

@@ -52,7 +52,7 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
   SWAP_UUID_FAILED,
   SUCCESS,
   LOCK_OPENED,
-  OPEN_LOCK,
+  FAILED_TO_OPEN_LOCK,
   ENABLE_WRITING_FAILED
 };

View File

@@ -23,6 +23,7 @@
 #include <libnuraft/nuraft.hxx>

+#include <functional>

 namespace memgraph::coordination {

 class CoordinatorInstance;

@@ -59,6 +60,8 @@ class ReplicationInstance {
   auto SendDemoteToReplicaRpc() -> bool;

+  auto SendFrequentHeartbeat() const -> bool;
+
   auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
       -> bool;

@@ -79,8 +82,10 @@ class ReplicationInstance {
   auto EnableWritingOnMain() -> bool;

-  auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
-  auto GetFailCallback() -> HealthCheckInstanceCallback &;
+  auto GetSuccessCallback() -> HealthCheckInstanceCallback;
+  auto GetFailCallback() -> HealthCheckInstanceCallback;
+
+  void SetCallbacks(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb);

  private:
   CoordinatorClient client_;

View File

@@ -43,8 +43,11 @@ struct ReplicationInstanceState {
   // For MAIN we don't enable writing until cluster is in healthy state
   utils::UUID instance_uuid;

+  bool needs_demote{false};
+
   friend auto operator==(ReplicationInstanceState const &lhs, ReplicationInstanceState const &rhs) -> bool {
-    return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid;
+    return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid &&
+           lhs.needs_demote == rhs.needs_demote;
   }
 };

@@ -61,8 +64,8 @@ struct CoordinatorInstanceState {
 void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state);
 void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state);

-using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
-                              InstanceUUIDUpdate>;
+using TRaftLog = std::variant<std::string, utils::UUID, CoordinatorToReplicaConfig, CoordinatorToCoordinatorConfig,
+                              InstanceUUIDUpdate, std::monostate>;

 using nuraft::buffer;
 using nuraft::buffer_serializer;

View File

@@ -43,10 +43,7 @@ class CoordinatorStateMachine : public state_machine {
   ~CoordinatorStateMachine() override = default;

   static auto CreateLog(nlohmann::json &&log) -> ptr<buffer>;
-  static auto SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
-  static auto SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeOpenLock() -> ptr<buffer>;
   static auto SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
   static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
   static auto SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;

@@ -54,7 +51,7 @@ class CoordinatorStateMachine : public state_machine {
   static auto SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer>;
   static auto SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
   static auto SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> ptr<buffer>;
-  static auto SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeInstanceNeedsDemote(std::string_view instance_name) -> ptr<buffer>;

   static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;

View File

@@ -23,11 +23,7 @@
 namespace memgraph::coordination {

 enum class RaftLogAction : uint8_t {
-  OPEN_LOCK_REGISTER_REPLICATION_INSTANCE,
-  OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
-  OPEN_LOCK_FAILOVER,
-  OPEN_LOCK_SET_INSTANCE_AS_MAIN,
-  OPEN_LOCK_SET_INSTANCE_AS_REPLICA,
+  OPEN_LOCK,
   REGISTER_REPLICATION_INSTANCE,
   UNREGISTER_REPLICATION_INSTANCE,
   SET_INSTANCE_AS_MAIN,

@@ -35,22 +31,18 @@ enum class RaftLogAction : uint8_t {
   UPDATE_UUID_OF_NEW_MAIN,
   ADD_COORDINATOR_INSTANCE,
   UPDATE_UUID_FOR_INSTANCE,
+  INSTANCE_NEEDS_DEMOTE
 };

-NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction,
-                             {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
-                              {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
-                              {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
-                              {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
-                              {RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
-                              {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
-                              {RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
-                              {RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE, "open_lock_register_instance"},
-                              {RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
-                               "open_lock_unregister_instance"},
-                              {RaftLogAction::OPEN_LOCK_FAILOVER, "open_lock_failover"},
-                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN, "open_lock_set_instance_as_main"},
-                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA, "open_lock_set_instance_as_replica"}})
+NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
+                                             {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
+                                             {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
+                                             {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
+                                             {RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
+                                             {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
+                                             {RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
+                                             {RaftLogAction::INSTANCE_NEEDS_DEMOTE, "instance_needs_demote"},
+                                             {RaftLogAction::OPEN_LOCK, "open_lock"}})

 } // namespace memgraph::coordination
 #endif
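
Aside: the Raft log stores these actions as the JSON strings on the right-hand side of the mapping above. A minimal, self-contained illustration of how NLOHMANN_JSON_SERIALIZE_ENUM round-trips such values; the demo namespace and Action enum are made up for this example and are not part of the codebase.

#include <cassert>
#include <cstdint>
#include <string>
#include <nlohmann/json.hpp>

namespace demo {

// Hypothetical miniature of RaftLogAction, just to show how the macro behaves.
enum class Action : uint8_t { OPEN_LOCK, REGISTER_INSTANCE };

// Must live in the same namespace as the enum; unmapped values fall back to the
// first entry when deserializing.
NLOHMANN_JSON_SERIALIZE_ENUM(Action, {{Action::OPEN_LOCK, "open_lock"},
                                      {Action::REGISTER_INSTANCE, "register"}})

}  // namespace demo

int main() {
  nlohmann::json j = demo::Action::OPEN_LOCK;
  assert(j.get<std::string>() == "open_lock");                // enum -> string stored in the log entry
  assert(j.get<demo::Action>() == demo::Action::OPEN_LOCK);   // string -> enum when applying the log
}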

View File

@@ -160,72 +160,17 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }

 auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }

-auto RaftState::AppendOpenLockRegister(CoordinatorToReplicaConfig const &config) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeOpenLockRegister(config);
-  auto const res = raft_server_->append_entries({new_log});
-  if (!res->get_accepted()) {
-    spdlog::error("Failed to accept request to open lock to register instance {}", config.instance_name);
-    return false;
-  }
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to open lock for registering instance {} with error code {}", config.instance_name,
-                  int(res->get_result_code()));
-    return false;
-  }
-  return true;
-}
-
-auto RaftState::AppendOpenLockUnregister(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeOpenLockUnregister(instance_name);
-  auto const res = raft_server_->append_entries({new_log});
-  if (!res->get_accepted()) {
-    spdlog::error("Failed to accept request to open lock to unregister instance {}.", instance_name);
-    return false;
-  }
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to open lock for unregistering instance {} with error code {}", instance_name,
-                  int(res->get_result_code()));
-    return false;
-  }
-  return true;
-}
-
-auto RaftState::AppendOpenLockFailover(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeOpenLockFailover(instance_name);
-  auto const res = raft_server_->append_entries({new_log});
-  if (!res->get_accepted()) {
-    spdlog::error("Failed to accept request to open lock for failover {}", instance_name);
-    return false;
-  }
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to open lock for failover to instance {} with error code {}", instance_name,
-                  int(res->get_result_code()));
-    return false;
-  }
-  return true;
-}
-
-auto RaftState::AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(instance_name);
-  auto const res = raft_server_->append_entries({new_log});
-  if (!res->get_accepted()) {
-    spdlog::error("Failed to accept request to open lock and set instance {} to MAIN", instance_name);
-    return false;
-  }
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to open lock to set instance {} to MAIN with error code {}", instance_name,
-                  int(res->get_result_code()));
-    return false;
-  }
-  return true;
-}
+auto RaftState::AppendOpenLock() -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLock();
+  auto const res = raft_server_->append_entries({new_log});
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock");
+    return false;
+  }
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock with error code {}", int(res->get_result_code()));
+    return false;
+  }
+  return true;
+}

@@ -317,26 +262,6 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) ->
   return true;
 }

-auto RaftState::AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(instance_name);
-  auto const res = raft_server_->append_entries({new_log});
-  if (!res->get_accepted()) {
-    spdlog::error(
-        "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
-        "the leader.",
-        instance_name);
-    return false;
-  }
-  spdlog::info("Request for demoting instance {} accepted", instance_name);
-
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to promote instance {} with error code {}", instance_name, int(res->get_result_code()));
-    return false;
-  }
-
-  return true;
-}
-
 auto RaftState::AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool {
   auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
   auto const res = raft_server_->append_entries({new_log});

@@ -378,6 +303,25 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
   return true;
 }

auto RaftState::AppendInstanceNeedsDemote(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeInstanceNeedsDemote(instance_name);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error("Failed to accept request that instance {} needs demote", instance_name);
return false;
}
spdlog::trace("Request that instance {} needs demote accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to add instance {} needs demote with error code {}", instance_name,
static_cast<int>(res->get_result_code()));
return false;
}
return true;
}

 auto RaftState::AppendUpdateUUIDForInstanceLog(std::string_view instance_name, const utils::UUID &uuid) -> bool {
   auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForInstance(
       {.instance_name = std::string{instance_name}, .uuid = uuid});

View File

@ -64,6 +64,8 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
auto ReplicationInstance::SendFrequentHeartbeat() const -> bool { return client_.SendFrequentHeartbeat(); }
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool { HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) { if (!client_.DemoteToReplica()) {
@ -85,8 +87,8 @@ auto ReplicationInstance::ReplicationClientInfo() const -> coordination::Replica
return client_.ReplicationClientInfo(); return client_.ReplicationClientInfo();
} }
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; } auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; } auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback { return fail_cb_; }
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
@ -128,5 +130,10 @@ auto ReplicationInstance::SendGetInstanceUUID()
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
void ReplicationInstance::SetCallbacks(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb) {
succ_cb_ = succ_cb;
fail_cb_ = fail_cb;
}
} // namespace memgraph::coordination
#endif
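In this file the health-check callback getters now hand the callback out by value instead of by reference, and the new SetCallbacks lets the coordinator swap both callbacks (per the commit messages, on the demote-to-replica path of force reset). Below is a small self-contained sketch of why a copy is the safer thing to return once the callbacks can be replaced mid-flight; std::function is only a stand-in (an assumption) for the real HealthCheckInstanceCallback type.

#include <functional>
#include <iostream>
#include <string>

// Stand-in (assumption) for the real callback type used by the coordinator.
using HealthCheckInstanceCallback = std::function<void(const std::string &)>;

class Instance {
 public:
  void SetCallbacks(HealthCheckInstanceCallback succ, HealthCheckInstanceCallback fail) {
    succ_cb_ = std::move(succ);
    fail_cb_ = std::move(fail);
  }
  // Returning by value: the caller keeps a stable copy even if the callbacks
  // are swapped (e.g. during a force reset) before the copy is invoked.
  HealthCheckInstanceCallback GetSuccessCallback() const { return succ_cb_; }
  HealthCheckInstanceCallback GetFailCallback() const { return fail_cb_; }

 private:
  HealthCheckInstanceCallback succ_cb_ = [](const std::string &) {};
  HealthCheckInstanceCallback fail_cb_ = [](const std::string &) {};
};

int main() {
  Instance instance;
  instance.SetCallbacks([](const std::string &name) { std::cout << name << " is healthy\n"; },
                        [](const std::string &name) { std::cout << name << " is down\n"; });
  auto success_cb = instance.GetSuccessCallback();  // copy taken before any later swap
  instance.SetCallbacks([](const std::string &) {}, [](const std::string &) {});
  success_cb("instance_1");  // still the callback captured above
  return 0;
}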

View File

@ -409,9 +409,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"Couldn't unregister replica instance because current main instance couldn't unregister replica!"); "Couldn't unregister replica instance because current main instance couldn't unregister replica!");
case LOCK_OPENED: case LOCK_OPENED:
throw QueryRuntimeException("Couldn't unregister replica because the last action didn't finish successfully!"); throw QueryRuntimeException("Couldn't unregister replica because the last action didn't finish successfully!");
case OPEN_LOCK: case FAILED_TO_OPEN_LOCK:
throw QueryRuntimeException( throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
"Couldn't register instance as cluster didn't accept entering unregistration state!");
case SUCCESS: case SUCCESS:
break; break;
} }
@ -477,9 +476,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case LOCK_OPENED:
throw QueryRuntimeException(
"Couldn't register replica instance because the last action didn't finish successfully!");
case OPEN_LOCK:
throw QueryRuntimeException(
"Couldn't register replica instance because cluster didn't accept registration query!");
case FAILED_TO_OPEN_LOCK:
throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
case SUCCESS:
break;
}
@ -525,9 +523,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"Couldn't set replica instance to main! Check coordinator and replica for more logs"); "Couldn't set replica instance to main! Check coordinator and replica for more logs");
case SWAP_UUID_FAILED: case SWAP_UUID_FAILED:
throw QueryRuntimeException("Couldn't set replica instance to main. Replicas didn't swap uuid of new main."); throw QueryRuntimeException("Couldn't set replica instance to main. Replicas didn't swap uuid of new main.");
case OPEN_LOCK: case FAILED_TO_OPEN_LOCK:
throw QueryRuntimeException( throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
"Couldn't set replica instance to main as cluster didn't accept setting instance state.");
case LOCK_OPENED: case LOCK_OPENED:
throw QueryRuntimeException( throw QueryRuntimeException(
"Couldn't register replica instance because because the last action didn't finish successfully!"); "Couldn't register replica instance because because the last action didn't finish successfully!");

View File

@ -24,7 +24,7 @@
namespace memgraph::replication_coordination_glue {
struct DatabaseHistory {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
std::string name;
std::string name;  // db name
};
using DatabaseHistories = std::vector<DatabaseHistory>;

View File

@ -17,7 +17,7 @@
namespace memgraph::replication_coordination_glue {
// TODO: figure out a way of ensuring that usage of this type is never uninitialed/defaulted incorrectly to MAIN
// TODO: figure out a way of ensuring that usage of this type is never uninitialized/defaulted incorrectly to MAIN
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
NLOHMANN_JSON_SERIALIZE_ENUM(ReplicationRole, {{ReplicationRole::MAIN, "main"}, {ReplicationRole::REPLICA, "replica"}})
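The TODO above asks how to guarantee the role is never left defaulted to MAIN. One possible direction, shown here only as a minimal sketch under that assumption (not code from this repository), is to hold the role in std::optional and fail loudly when it was never set.

#include <cstdint>
#include <optional>
#include <stdexcept>

enum class ReplicationRole : uint8_t { MAIN, REPLICA };

struct ReplicationState {
  // No default role: an unset optional makes "forgot to initialize" visible
  // instead of silently behaving like MAIN.
  std::optional<ReplicationRole> role;

  ReplicationRole RoleOrThrow() const {
    if (!role) throw std::logic_error("replication role was never initialized");
    return *role;
  }
};

int main() {
  ReplicationState state;
  state.role = ReplicationRole::REPLICA;  // callers must set the role explicitly
  return state.RoleOrThrow() == ReplicationRole::REPLICA ? 0 : 1;
}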

View File

@ -1420,5 +1420,134 @@ def test_multiple_old_mains_single_failover():
        time_slept += 0.1
def test_force_reset_works_after_failed_registration():
    # Goal of this test is to check that the cluster recovers through force reset
    # after an attempt to register a non-existent instance fails
    # 1. Start all instances.
    # 2. Check everything works correctly
    # 3. Try to register an instance which doesn't exist
    # 4. Enter force reset
    # 5. Check that everything works correctly

    # 1
    safe_execute(shutil.rmtree, TEMP_DIR)
    inner_instances_description = get_instances_description_no_setup()

    interactive_mg_runner.start_all(inner_instances_description)

    setup_queries = [
        "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
        "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}",
        "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
        "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};",
        "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};",
        "SET INSTANCE instance_3 TO MAIN",
    ]
    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
    for query in setup_queries:
        execute_and_fetch_all(coord_cursor_3, query)
    # 2
    def show_instances_coord3():
        return sorted(list(execute_and_fetch_all(coord_cursor_3, "SHOW INSTANCES;")))

    coord_cursor_1 = connect(host="localhost", port=7690).cursor()

    def show_instances_coord1():
        return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))

    coord_cursor_2 = connect(host="localhost", port=7691).cursor()

    def show_instances_coord2():
        return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))

    leader_data = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
    ]
    follower_data = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("instance_1", "", "", "unknown", "replica"),
        ("instance_2", "", "", "unknown", "replica"),
        ("instance_3", "", "", "unknown", "main"),
    ]
    mg_sleep_and_assert(leader_data, show_instances_coord3)
    mg_sleep_and_assert(follower_data, show_instances_coord1)
    mg_sleep_and_assert(follower_data, show_instances_coord2)

    instance_3_cursor = connect(host="localhost", port=7689).cursor()
    def show_replicas():
        return sorted(list(execute_and_fetch_all(instance_3_cursor, "SHOW REPLICAS;")))

    replicas = [
        (
            "instance_1",
            "127.0.0.1:10001",
            "sync",
            {"behind": None, "status": "ready", "ts": 0},
            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
        ),
        (
            "instance_2",
            "127.0.0.1:10002",
            "sync",
            {"behind": None, "status": "ready", "ts": 0},
            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
        ),
    ]
    mg_sleep_and_assert_collection(replicas, show_replicas)

    def get_vertex_count_func(cursor):
        def get_vertex_count():
            return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0]

        return get_vertex_count

    vertex_count = 0
    instance_1_cursor = connect(port=7687, host="localhost").cursor()
    instance_2_cursor = connect(port=7688, host="localhost").cursor()
    mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_1_cursor))
    mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_2_cursor))
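    # 3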
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(
            coord_cursor_3,
            "REGISTER INSTANCE instance_4 WITH CONFIG {'bolt_server': '127.0.0.1:7680', 'management_server': '127.0.0.1:10050', 'replication_server': '127.0.0.1:10051'};",
        )
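    # 4 and 5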
    # This will trigger force reset and choosing of new instance as MAIN
    leader_data = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "up", "replica"),
    ]
    follower_data = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("instance_1", "", "", "unknown", "main"),
        ("instance_2", "", "", "unknown", "replica"),
        ("instance_3", "", "", "unknown", "replica"),
    ]
    mg_sleep_and_assert(leader_data, show_instances_coord3)
    mg_sleep_and_assert(follower_data, show_instances_coord1)
    mg_sleep_and_assert(follower_data, show_instances_coord2)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"])) sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -15,14 +15,14 @@
#include <string>
#include <unordered_map>
#include <gtest/internal/gtest-param-util-generated.h>
#include "auth/models.hpp"
#include "disk_test_utils.hpp"
#include "license/license.hpp"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include <gtest/gtest.h>
using namespace memgraph::query;
using namespace memgraph::query::plan;

View File

@ -28,6 +28,7 @@
#include <json/json.hpp>
//////////////////////////////////////////////////////
#include <antlr4-runtime.h>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gmock/gmock-generated-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest-death-test.h>
#include <gtest/gtest.h>

View File

@ -15,6 +15,7 @@
#include <thread>
#include <fmt/format.h>
#include <gmock/gmock-generated-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

View File

@ -1,4 +1,4 @@
// Copyright 2024 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,7 +11,7 @@
#include <filesystem>
#include <gmock/gmock.h>
#include <gmock/gmock-generated-matchers.h>
#include <gtest/gtest.h>
#include "utils/settings.hpp"