Support failure of coordinators (#1728)

This commit is contained in:
Andi 2024-03-04 08:24:18 +01:00 committed by GitHub
parent 33caa27161
commit 822183b62d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 1557 additions and 574 deletions

View File

@ -16,6 +16,8 @@ target_sources(mg-coordination
include/coordination/raft_state.hpp
include/coordination/rpc_errors.hpp
include/nuraft/raft_log_action.hpp
include/nuraft/coordinator_cluster_state.hpp
include/nuraft/coordinator_log_store.hpp
include/nuraft/coordinator_state_machine.hpp
include/nuraft/coordinator_state_manager.hpp
@ -33,6 +35,7 @@ target_sources(mg-coordination
coordinator_log_store.cpp
coordinator_state_machine.cpp
coordinator_state_manager.cpp
coordinator_cluster_state.cpp
)
target_include_directories(mg-coordination PUBLIC include)

View File

@ -41,7 +41,9 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
fail_cb_{std::move(fail_cb)} {}
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
return config_.instance_down_timeout_sec;
@ -64,7 +66,7 @@ void CoordinatorClient::StartFrequentCheck() {
[this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().SocketAddress());
config_.CoordinatorSocketAddress());
{ // NOTE: This is intentionally scoped so that stream lock could get released.
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
@ -117,7 +119,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
return false;
}
auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool {
auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
try {
auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};
if (!stream.AwaitResponse().success) {
@ -131,9 +133,10 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo
return false;
}
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool {
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
try {
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)};
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(
std::string(instance_name))}; // TODO: (andi) Try to change to stream string_view and do just one copy later
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to receive successful RPC response for unregistering replica!");
return false;

View File

@ -0,0 +1,166 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_cluster_state.hpp"
#include "utils/logging.hpp"
#include <shared_mutex>
namespace memgraph::coordination {
using replication_coordination_glue::ReplicationRole;
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
: instance_roles_{other.instance_roles_} {}
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
if (this == &other) {
return *this;
}
instance_roles_ = other.instance_roles_;
return *this;
}
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
: instance_roles_{std::move(other.instance_roles_)} {}
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
if (this == &other) {
return *this;
}
instance_roles_ = std::move(other.instance_roles_);
return *this;
}
auto CoordinatorClusterState::MainExists() const -> bool {
auto lock = std::shared_lock{log_lock_};
return std::ranges::any_of(instance_roles_,
[](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; });
}
auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool {
auto lock = std::shared_lock{log_lock_};
auto const it = instance_roles_.find(instance_name);
return it != instance_roles_.end() && it->second.role == ReplicationRole::MAIN;
}
auto CoordinatorClusterState::IsReplica(std::string_view instance_name) const -> bool {
auto lock = std::shared_lock{log_lock_};
auto const it = instance_roles_.find(instance_name);
return it != instance_roles_.end() && it->second.role == ReplicationRole::REPLICA;
}
auto CoordinatorClusterState::InsertInstance(std::string_view instance_name, ReplicationRole role) -> void {
auto lock = std::unique_lock{log_lock_};
instance_roles_[instance_name.data()].role = role;
}
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
auto lock = std::unique_lock{log_lock_};
switch (log_action) {
case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
auto const &config = std::get<CoordinatorClientConfig>(log_entry);
instance_roles_[config.instance_name] = InstanceState{config, ReplicationRole::REPLICA};
break;
}
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
auto const instance_name = std::get<std::string>(log_entry);
instance_roles_.erase(instance_name);
break;
}
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
auto const instance_name = std::get<std::string>(log_entry);
auto it = instance_roles_.find(instance_name);
MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
it->second.role = ReplicationRole::MAIN;
break;
}
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
auto const instance_name = std::get<std::string>(log_entry);
auto it = instance_roles_.find(instance_name);
MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
it->second.role = ReplicationRole::REPLICA;
break;
}
case RaftLogAction::UPDATE_UUID: {
uuid_ = std::get<utils::UUID>(log_entry);
break;
}
}
}
// TODO: (andi) Improve based on Gareth's comments
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto lock = std::shared_lock{log_lock_};
auto const role_to_string = [](auto const &role) -> std::string_view {
switch (role) {
case ReplicationRole::MAIN:
return "main";
case ReplicationRole::REPLICA:
return "replica";
}
};
auto const entry_to_string = [&role_to_string](auto const &entry) {
return fmt::format("{}_{}", entry.first, role_to_string(entry.second.role));
};
auto instances_str_view = instance_roles_ | ranges::views::transform(entry_to_string);
uint32_t size =
std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0,
[](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); });
data = buffer::alloc(size);
buffer_serializer bs(data);
std::for_each(instances_str_view.begin(), instances_str_view.end(), [&bs](auto const &entry) { bs.put_str(entry); });
}
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
auto const str_to_role = [](auto const &str) -> ReplicationRole {
if (str == "main") {
return ReplicationRole::MAIN;
}
return ReplicationRole::REPLICA;
};
CoordinatorClusterState cluster_state;
buffer_serializer bs(data);
while (bs.size() > 0) {
auto const entry = bs.get_str();
auto const first_dash = entry.find('_');
auto const instance_name = entry.substr(0, first_dash);
auto const role_str = entry.substr(first_dash + 1);
cluster_state.InsertInstance(instance_name, str_to_role(role_str));
}
return cluster_state;
}
auto CoordinatorClusterState::GetInstances() const -> std::vector<InstanceState> {
auto lock = std::shared_lock{log_lock_};
return instance_roles_ | ranges::views::values | ranges::to<std::vector<InstanceState>>;
}
auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; }
auto CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
auto lock = std::shared_lock{log_lock_};
auto const it = std::ranges::find_if(instance_roles_,
[](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; });
if (it == instance_roles_.end()) {
return {};
}
return it->first;
}
} // namespace memgraph::coordination
#endif

View File

@ -32,8 +32,42 @@ using nuraft::srv_config;
CoordinatorInstance::CoordinatorInstance()
: raft_state_(RaftState::MakeRaftState(
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
[this]() {
spdlog::info("Leader changed, starting all replication instances!");
auto const instances = raft_state_.GetInstances();
auto replicas = instances | ranges::views::filter([](auto const &instance) {
return instance.role == ReplicationRole::REPLICA;
});
std::ranges::for_each(replicas, [this](auto &replica) {
spdlog::info("Starting replication instance {}", replica.config.instance_name);
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
});
auto main = instances | ranges::views::filter(
[](auto const &instance) { return instance.role == ReplicationRole::MAIN; });
// TODO: (andi) Add support for this
// MG_ASSERT(std::ranges::distance(main) == 1, "There should be exactly one main instance");
std::ranges::for_each(main, [this](auto &main_instance) {
spdlog::info("Starting main instance {}", main_instance.config.instance_name);
repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback);
});
std::ranges::for_each(repl_instances_, [this](auto &instance) {
instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename
instance.StartFrequentCheck();
});
},
[this]() {
spdlog::info("Leader changed, stopping all replication instances!");
repl_instances_.clear();
})) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
@ -59,75 +93,98 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = raft_state_.GetAllCoordinators();
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
.health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
auto instances_status = utils::fmap(coord_instance_to_status, coord_instances);
{
auto lock = std::shared_lock{coord_instance_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
auto instances_status = utils::fmap(coord_instance_to_status, raft_state_.GetAllCoordinators());
if (raft_state_.IsLeader()) {
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (raft_state_.IsMain(instance.InstanceName())) return "main";
return "replica";
};
auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
return instance.IsAlive() ? "up" : "down";
};
auto process_repl_instance_as_leader =
[&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.CoordinatorSocketAddress(),
.cluster_role = stringify_repl_role(instance),
.health = stringify_repl_health(instance)};
};
{
auto lock = std::shared_lock{coord_instance_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
}
} else {
auto const stringify_repl_role = [](ReplicationRole role) -> std::string {
return role == ReplicationRole::MAIN ? "main" : "replica";
};
// TODO: (andi) Add capability that followers can also return socket addresses
auto process_repl_instance_as_follower = [&stringify_repl_role](auto const &instance) -> InstanceStatus {
return {.instance_name = instance.config.instance_name,
.cluster_role = stringify_repl_role(instance.role),
.health = "unknown"};
};
std::ranges::transform(raft_state_.GetInstances(), std::back_inserter(instances_status),
process_repl_instance_as_follower);
}
return instances_status;
}
auto CoordinatorInstance::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
auto alive_replicas =
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// for each DB in instance we get one DatabaseHistory
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
if (!raft_state_.RequestLeadership()) {
spdlog::error("Failover failed since the instance is not the leader!");
return;
}
bool success{true};
std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
if (!success) {
return;
}
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
if (res.HasError()) {
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
success = false;
return;
}
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
});
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
if (!success) {
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
return;
}
auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
auto &[replica, res] = zipped;
return std::make_pair(replica.InstanceName(), res.GetValue());
});
auto instance_db_histories =
ranges::views::zip(alive_replicas, maybe_instance_db_histories) | transform_to_pairs | ranges::to<std::vector>();
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_database_histories);
ChooseMostUpToDateInstance(instance_db_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, latest_epoch, latest_commit_timestamp);
most_up_to_date_instance, latest_epoch, latest_commit_timestamp); // NOLINT
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
@ -139,16 +196,17 @@ auto CoordinatorInstance::TryFailover() -> void {
};
auto const new_main_uuid = utils::UUID{};
auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
return !instance.SendSwapAndUpdateUUID(new_main_uuid);
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
if (std::ranges::any_of(alive_replicas | ranges::views::filter(is_not_new_main), failed_to_swap)) {
spdlog::error("Failed to swap uuid for all instances");
return;
}
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
@ -158,23 +216,36 @@ auto CoordinatorInstance::TryFailover() -> void {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
SetMainUUID(new_main_uuid);
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
return;
}
auto const new_main_instance_name = new_main->InstanceName();
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
return;
}
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
-> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
if (std::ranges::any_of(repl_instances_, &ReplicationInstance::IsMain)) {
if (raft_state_.MainExists()) {
return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
}
if (!raft_state_.RequestLeadership()) {
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
}
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
@ -192,99 +263,149 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
auto const new_main_uuid = utils::UUID{};
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
return !instance.SendSwapAndUpdateUUID(new_main_uuid);
};
if (std::ranges::any_of(repl_instances_ | ranges::views::filter(is_not_new_main), failed_to_swap)) {
spdlog::error("Failed to swap uuid for all instances");
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
SetMainUUID(new_main_uuid);
spdlog::info("Instance {} promoted to main", instance_name);
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
}
if (!raft_state_.AppendSetInstanceAsMainLog(instance_name)) {
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
}
spdlog::info("Instance {} promoted to main on leader", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto instance_name = config.instance_name;
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
if (std::ranges::any_of(repl_instances_, name_matches)) {
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
};
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS;
}
if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
}
if (!raft_state_.RequestLeadership()) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const res = raft_state_.AppendRegisterReplicationInstance(instance_name);
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
"the "
"leader.",
config.instance_name);
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
spdlog::info("Request for registering instance {} accepted", instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
} catch (CoordinatorRegisterInstanceException const &) {
if (!new_instance->SendDemoteToReplicaRpc()) {
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
repl_instances_.pop_back();
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", instance_name, res->get_result_code());
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
}
spdlog::info("Instance {} registered", instance_name);
new_instance->StartFrequentCheck();
spdlog::info("Instance {} registered", config.instance_name);
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
if (!raft_state_.RequestLeadership()) {
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches);
if (inst_to_remove == repl_instances_.end()) {
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
// TODO: (andi) Change so that RaftLogState is the central place for asking who is main...
auto const is_main = [this](ReplicationInstance const &instance) { return IsMain(instance.InstanceName()); };
if (is_main(*inst_to_remove) && inst_to_remove->IsAlive()) {
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
}
inst_to_remove->StopFrequentCheck();
auto curr_main = std::ranges::find_if(repl_instances_, is_main);
if (curr_main != repl_instances_.end() && curr_main->IsAlive()) {
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
inst_to_remove->StartFrequentCheck();
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
}
}
std::erase_if(repl_instances_, name_matches);
if (!raft_state_.AppendUnregisterReplicationInstanceLog(instance_name)) {
return UnregisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
}
return UnregisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port,
std::string_view raft_address) -> void {
raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
MG_ASSERT(repl_instance_uuid.has_value(), "Replication instance must have uuid set");
if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
// NOLINTNEXTLINE
if (!repl_instance.IsAlive() && raft_state_.GetUUID() == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
TryFailover();
}
}
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
@ -294,8 +415,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
auto const curr_main_uuid = GetMainUUID();
if (curr_main_uuid == repl_instance_uuid.value()) {
// NOLINTNEXTLINE
if (raft_state_.GetUUID() == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
@ -305,6 +426,12 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
return;
}
if (!raft_state_.RequestLeadership()) {
spdlog::error("Demoting main instance {} to replica failed since the instance is not the leader!",
repl_instance_name);
return;
}
if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
repl_instance.OnSuccessPing();
@ -314,24 +441,29 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetUUID())) {
spdlog::error("Failed to swap uuid for demoted main instance {}", repl_instance_name);
return;
}
if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
return;
}
}
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
if (!IsReplica(repl_instance_name)) {
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetUUID())) {
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
return;
}
@ -340,57 +472,19 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
}
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
if (!IsReplica(repl_instance_name)) {
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
repl_instance.OnFailPing();
}
auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
-> UnregisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches);
if (inst_to_remove == repl_instances_.end()) {
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
if (inst_to_remove->IsMain() && inst_to_remove->IsAlive()) {
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
}
inst_to_remove->StopFrequentCheck();
auto curr_main = std::ranges::find_if(repl_instances_, &ReplicationInstance::IsMain);
MG_ASSERT(curr_main != repl_instances_.end(), "There must be a main instance when unregistering a replica");
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
inst_to_remove->StartFrequentCheck();
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
}
std::erase_if(repl_instances_, name_matches);
return UnregisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
// TODO: (andi) Add to the RAFT log.
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
auto CoordinatorInstance::ChooseMostUpToDateInstance(
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
&instance_database_histories) -> NewMainRes {
auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> instance_database_histories)
-> NewMainRes {
std::optional<NewMainRes> new_main_res;
std::for_each(
instance_database_histories.begin(), instance_database_histories.end(),
@ -456,5 +550,14 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
return std::move(*new_main_res);
}
auto CoordinatorInstance::IsMain(std::string_view instance_name) const -> bool {
return raft_state_.IsMain(instance_name);
}
auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> bool {
return raft_state_.IsReplica(instance_name);
}
} // namespace memgraph::coordination
#endif

View File

@ -41,7 +41,7 @@ CoordinatorState::CoordinatorState() {
}
}
auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
@ -56,7 +56,8 @@ auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig confi
data_);
}
auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus {
auto CoordinatorState::UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot unregister instance since variant holds wrong alternative");
@ -70,7 +71,8 @@ auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name)
data_);
}
auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto CoordinatorState::SetReplicationInstanceToMain(std::string_view instance_name)
-> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
@ -96,8 +98,8 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
return *std::get<CoordinatorMainReplicaData>(data_).coordinator_server_;
}
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port,
std::string_view raft_address) -> void {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);

View File

@ -12,37 +12,94 @@
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_state_machine.hpp"
#include "utils/logging.hpp"
namespace memgraph::coordination {
auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional<std::string> {
return cluster_state_.FindCurrentMainInstanceName();
}
auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool {
return cluster_state_.IsMain(instance_name);
}
auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const -> bool {
return cluster_state_.IsReplica(instance_name);
}
auto CoordinatorStateMachine::CreateLog(std::string_view log) -> ptr<buffer> {
ptr<buffer> log_buf = buffer::alloc(sizeof(uint32_t) + log.size());
buffer_serializer bs(log_buf);
bs.put_str(log.data());
return log_buf;
}
auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer> {
auto const str_log = fmt::format("{}*register", config.ToString());
return CreateLog(str_log);
}
auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*unregister", instance_name);
return CreateLog(str_log);
}
auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*promote", instance_name);
return CreateLog(str_log);
}
auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*demote", instance_name);
return CreateLog(str_log);
}
auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
auto const str_log = fmt::format("{}*update_uuid", nlohmann::json{{"uuid", uuid}}.dump());
return CreateLog(str_log);
}
auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
buffer_serializer bs(data);
return bs.get_str();
auto const log_str = bs.get_str();
auto const sep = log_str.find('*');
auto const action = log_str.substr(sep + 1);
auto const info = log_str.substr(0, sep);
if (action == "register") {
return {CoordinatorClientConfig::FromString(info), RaftLogAction::REGISTER_REPLICATION_INSTANCE};
}
if (action == "unregister") {
return {info, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE};
}
if (action == "promote") {
return {info, RaftLogAction::SET_INSTANCE_AS_MAIN};
}
if (action == "demote") {
return {info, RaftLogAction::SET_INSTANCE_AS_REPLICA};
}
if (action == "update_uuid") {
auto const json = nlohmann::json::parse(info);
return {json.at("uuid").get<utils::UUID>(), RaftLogAction::UPDATE_UUID};
}
throw std::runtime_error("Unknown action");
}
auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("pre_commit {} : {}", log_idx, str);
return nullptr;
}
auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; }
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("commit {} : {}", log_idx, str);
auto const [parsed_data, log_action] = DecodeLog(data);
cluster_state_.DoAction(parsed_data, log_action);
last_committed_idx_ = log_idx;
// TODO: (andi) Don't return nullptr
return nullptr;
}
@ -51,61 +108,95 @@ auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_con
}
auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("rollback {} : {}", log_idx, str);
// NOTE: Nothing since we don't do anything in pre_commit
}
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/,
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id,
ptr<buffer> &data_out, bool &is_last_obj) -> int {
// Put dummy data.
data_out = buffer::alloc(sizeof(int32));
buffer_serializer bs(data_out);
bs.put_i32(0);
spdlog::info("read logical snapshot object, obj_id: {}", obj_id);
ptr<SnapshotCtx> ctx = nullptr;
{
auto ll = std::lock_guard{snapshots_lock_};
auto entry = snapshots_.find(snapshot.get_last_log_idx());
if (entry == snapshots_.end()) {
data_out = nullptr;
is_last_obj = true;
return 0;
}
ctx = entry->second;
}
ctx->cluster_state_.Serialize(data_out);
is_last_obj = true;
return 0;
}
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/,
bool /*is_last_obj*/) -> void {
spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
// Request next object.
obj_id++;
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj,
bool is_last_obj) -> void {
spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
is_last_obj);
buffer_serializer bs(data);
auto cluster_state = CoordinatorClusterState::Deserialize(data);
{
auto ll = std::lock_guard{snapshots_lock_};
auto entry = snapshots_.find(snapshot.get_last_log_idx());
DMG_ASSERT(entry != snapshots_.end());
entry->second->cluster_state_ = cluster_state;
}
}
auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
{
auto lock = std::lock_guard{last_snapshot_lock_};
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
auto ll = std::lock_guard{snapshots_lock_};
auto entry = snapshots_.find(s.get_last_log_idx());
if (entry == snapshots_.end()) return false;
cluster_state_ = entry->second->cluster_state_;
return true;
}
auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {}
auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
auto lock = std::lock_guard{last_snapshot_lock_};
return last_snapshot_;
auto ll = std::lock_guard{snapshots_lock_};
auto entry = snapshots_.rbegin();
if (entry == snapshots_.rend()) return nullptr;
ptr<SnapshotCtx> ctx = entry->second;
return ctx->snapshot_;
}
auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; }
auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
// Clone snapshot from `s`.
{
auto lock = std::lock_guard{last_snapshot_lock_};
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
ptr<buffer> snp_buf = s.serialize();
ptr<snapshot> ss = snapshot::deserialize(*snp_buf);
create_snapshot_internal(ss);
ptr<std::exception> except(nullptr);
bool ret = true;
when_done(ret, except);
}
auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -> void {
auto ll = std::lock_guard{snapshots_lock_};
auto ctx = cs_new<SnapshotCtx>(snapshot, cluster_state_);
snapshots_[snapshot->get_last_log_idx()] = ctx;
constexpr int MAX_SNAPSHOTS = 3;
while (snapshots_.size() > MAX_SNAPSHOTS) {
snapshots_.erase(snapshots_.begin());
}
}
auto CoordinatorStateMachine::GetInstances() const -> std::vector<InstanceState> {
return cluster_state_.GetInstances();
}
auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); }
} // namespace memgraph::coordination
#endif

View File

@ -46,16 +46,17 @@ class CoordinatorClient {
void ResumeFrequentCheck();
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto CoordinatorSocketAddress() const -> std::string;
auto ReplicationSocketAddress() const -> std::string;
[[nodiscard]] auto DemoteToReplica() const -> bool;
auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const
auto SendPromoteReplicaToMainRpc(utils::UUID const &uuid, ReplicationClientsInfo replication_clients_info) const
-> bool;
auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
auto SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool;
auto SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool;
auto SendEnableWritingOnMainRpc() const -> bool;

View File

@ -14,16 +14,20 @@
#ifdef MG_ENTERPRISE
#include "replication_coordination_glue/mode.hpp"
#include "utils/string.hpp"
#include <chrono>
#include <cstdint>
#include <optional>
#include <string>
#include <fmt/format.h>
namespace memgraph::coordination {
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
// TODO: (andi) JSON serialization for RAFT log.
struct CoordinatorClientConfig {
std::string instance_name;
std::string ip_address;
@ -32,14 +36,35 @@ struct CoordinatorClientConfig {
std::chrono::seconds instance_down_timeout_sec{5};
std::chrono::seconds instance_get_uuid_frequency_sec{10};
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
auto CoordinatorSocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); }
auto ReplicationSocketAddress() const -> std::string {
return fmt::format("{}:{}", replication_client_info.replication_ip_address,
replication_client_info.replication_port);
}
struct ReplicationClientInfo {
// TODO: (andi) Do we even need here instance_name for this struct?
std::string instance_name;
replication_coordination_glue::ReplicationMode replication_mode{};
std::string replication_ip_address;
uint16_t replication_port{};
auto ToString() const -> std::string {
return fmt::format("{}#{}#{}#{}", instance_name, replication_ip_address, replication_port,
replication_coordination_glue::ReplicationModeToString(replication_mode));
}
// TODO: (andi) How can I make use of monadic parsers here?
static auto FromString(std::string_view log) -> ReplicationClientInfo {
ReplicationClientInfo replication_client_info;
auto splitted = utils::Split(log, "#");
replication_client_info.instance_name = splitted[0];
replication_client_info.replication_ip_address = splitted[1];
replication_client_info.replication_port = std::stoi(splitted[2]);
replication_client_info.replication_mode = replication_coordination_glue::ReplicationModeFromString(splitted[3]);
return replication_client_info;
}
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
};
@ -54,6 +79,25 @@ struct CoordinatorClientConfig {
std::optional<SSL> ssl;
auto ToString() const -> std::string {
return fmt::format("{}|{}|{}|{}|{}|{}|{}", instance_name, ip_address, port,
instance_health_check_frequency_sec.count(), instance_down_timeout_sec.count(),
instance_get_uuid_frequency_sec.count(), replication_client_info.ToString());
}
static auto FromString(std::string_view log) -> CoordinatorClientConfig {
CoordinatorClientConfig config;
auto splitted = utils::Split(log, "|");
config.instance_name = splitted[0];
config.ip_address = splitted[1];
config.port = std::stoi(splitted[2]);
config.instance_health_check_frequency_sec = std::chrono::seconds(std::stoi(splitted[3]));
config.instance_down_timeout_sec = std::chrono::seconds(std::stoi(splitted[4]));
config.instance_get_uuid_frequency_sec = std::chrono::seconds(std::stoi(splitted[5]));
config.replication_client_info = ReplicationClientInfo::FromString(splitted[6]);
return config;
}
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
};

View File

@ -83,5 +83,16 @@ class RaftCouldNotParseFlagsException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException)
};
class InvalidRaftLogActionException final : public utils::BasicException {
public:
explicit InvalidRaftLogActionException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit InvalidRaftLogActionException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: InvalidRaftLogActionException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(InvalidRaftLogActionException)
};
} // namespace memgraph::coordination
#endif

View File

@ -37,20 +37,25 @@ class CoordinatorInstance {
public:
CoordinatorInstance();
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
auto GetMainUUID() const -> utils::UUID;
static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;
auto SetMainUUID(utils::UUID new_uuid) -> void;
private:
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
auto OnRaftCommitCallback(TRaftLog const &log_entry, RaftLogAction log_action) -> void;
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
@ -62,17 +67,14 @@ class CoordinatorInstance {
void ReplicaFailCallback(std::string_view);
static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
private:
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
// NOTE: Must be std::list because we rely on pointer stability.
// Leader and followers should both have same view on repl_instances_
std::list<ReplicationInstance> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{};
utils::UUID main_uuid_;
RaftState raft_state_;
};

View File

@ -33,14 +33,16 @@ class CoordinatorState {
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
// NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;

View File

@ -26,7 +26,7 @@ struct InstanceStatus {
std::string raft_socket_address;
std::string coord_socket_address;
std::string cluster_role;
bool is_alive;
std::string health;
};
} // namespace memgraph::coordination

View File

@ -14,11 +14,16 @@
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
class CoordinatorInstance;
struct CoordinatorClientConfig;
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
@ -47,26 +52,39 @@ class RaftState {
RaftState &operator=(RaftState &&other) noexcept = default;
~RaftState();
static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState;
static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result>;
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
// TODO: (andi) I think variables below can be abstracted
auto AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool;
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetUUID() const -> utils::UUID;
private:
// TODO: (andi) I think variables below can be abstracted/clean them.
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<CoordinatorStateMachine> state_machine_;
ptr<CoordinatorStateManager> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;

View File

@ -19,12 +19,12 @@ namespace memgraph::coordination {
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
ENDPOINT_EXISTS,
COORD_ENDPOINT_EXISTS,
REPL_ENDPOINT_EXISTS,
NOT_COORDINATOR,
RPC_FAILED,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
RPC_FAILED,
RAFT_LOG_ERROR,
SUCCESS
};
@ -32,8 +32,9 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
IS_MAIN,
NOT_COORDINATOR,
NOT_LEADER,
RPC_FAILED,
NOT_LEADER,
RAFT_LOG_ERROR,
SUCCESS,
};
@ -41,9 +42,11 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
MAIN_ALREADY_EXISTS,
NOT_COORDINATOR,
SUCCESS,
NOT_LEADER,
RAFT_LOG_ERROR,
COULD_NOT_PROMOTE_TO_MAIN,
SWAP_UUID_FAILED
SWAP_UUID_FAILED,
SUCCESS,
};
} // namespace memgraph::coordination

View File

@ -17,11 +17,12 @@
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
#include <libnuraft/nuraft.hxx>
#include "utils/resource_lock.hpp"
#include "utils/result.hpp"
#include "utils/uuid.hpp"
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
class CoordinatorInstance;
@ -50,13 +51,14 @@ class ReplicationInstance {
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto CoordinatorSocketAddress() const -> std::string;
auto ReplicationSocketAddress() const -> std::string;
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
auto PromoteToMain(utils::UUID const &uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
auto SendDemoteToReplicaRpc() -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
@ -69,8 +71,8 @@ class ReplicationInstance {
auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool;
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
auto SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool;
auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool;
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
auto GetClient() -> CoordinatorClient &;
@ -78,14 +80,14 @@ class ReplicationInstance {
auto EnableWritingOnMain() -> bool;
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
auto GetMainUUID() const -> const std::optional<utils::UUID> &;
auto ResetMainUUID() -> void;
auto GetMainUUID() const -> std::optional<utils::UUID> const &;
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};
bool is_alive_{false};
std::chrono::system_clock::time_point last_check_of_uuid_{};
@ -101,7 +103,8 @@ class ReplicationInstance {
HealthCheckInstanceCallback fail_cb_;
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
first.is_alive_ == second.is_alive_ && first.main_uuid_ == second.main_uuid_;
}
};

View File

@ -0,0 +1,82 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "nuraft/raft_log_action.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/resource_lock.hpp"
#include "utils/uuid.hpp"
#include <libnuraft/nuraft.hxx>
#include <range/v3/view.hpp>
#include <map>
#include <numeric>
#include <string>
#include <variant>
namespace memgraph::coordination {
using replication_coordination_glue::ReplicationRole;
struct InstanceState {
CoordinatorClientConfig config;
ReplicationRole role;
};
using TRaftLog = std::variant<CoordinatorClientConfig, std::string, utils::UUID>;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
class CoordinatorClusterState {
public:
CoordinatorClusterState() = default;
CoordinatorClusterState(CoordinatorClusterState const &);
CoordinatorClusterState &operator=(CoordinatorClusterState const &);
CoordinatorClusterState(CoordinatorClusterState &&other) noexcept;
CoordinatorClusterState &operator=(CoordinatorClusterState &&other) noexcept;
~CoordinatorClusterState() = default;
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
auto InsertInstance(std::string_view instance_name, ReplicationRole role) -> void;
auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;
auto Serialize(ptr<buffer> &data) -> void;
static auto Deserialize(buffer &data) -> CoordinatorClusterState;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetUUID() const -> utils::UUID;
private:
std::map<std::string, InstanceState, std::less<>> instance_roles_;
utils::UUID uuid_{};
mutable utils::ResourceLock log_lock_{};
};
} // namespace memgraph::coordination
#endif

View File

@ -13,9 +13,15 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "nuraft/coordinator_cluster_state.hpp"
#include "nuraft/raft_log_action.hpp"
#include <spdlog/spdlog.h>
#include <libnuraft/nuraft.hxx>
#include <variant>
namespace memgraph::coordination {
using nuraft::async_result;
@ -36,9 +42,19 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer>;
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string;
static auto CreateLog(std::string_view log) -> ptr<buffer>;
static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer>;
static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer>;
static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
@ -64,11 +80,31 @@ class CoordinatorStateMachine : public state_machine {
auto create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void override;
auto GetInstances() const -> std::vector<InstanceState>;
auto GetUUID() const -> utils::UUID;
private:
struct SnapshotCtx {
SnapshotCtx(ptr<snapshot> &snapshot, CoordinatorClusterState const &cluster_state)
: snapshot_(snapshot), cluster_state_(cluster_state) {}
ptr<snapshot> snapshot_;
CoordinatorClusterState cluster_state_;
};
auto create_snapshot_internal(ptr<snapshot> snapshot) -> void;
CoordinatorClusterState cluster_state_;
// mutable utils::RWLock lock{utils::RWLock::Priority::READ};
std::atomic<uint64_t> last_committed_idx_{0};
ptr<snapshot> last_snapshot_;
// TODO: (andi) Maybe not needed, remove it
std::map<uint64_t, ptr<SnapshotCtx>> snapshots_;
std::mutex snapshots_lock_;
ptr<snapshot> last_snapshot_;
std::mutex last_snapshot_lock_;
};

View File

@ -0,0 +1,53 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_exceptions.hpp"
#include <cstdint>
#include <string>
namespace memgraph::coordination {
enum class RaftLogAction : uint8_t {
REGISTER_REPLICATION_INSTANCE,
UNREGISTER_REPLICATION_INSTANCE,
SET_INSTANCE_AS_MAIN,
SET_INSTANCE_AS_REPLICA,
UPDATE_UUID
};
inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
if (action == "register") {
return RaftLogAction::REGISTER_REPLICATION_INSTANCE;
}
if (action == "unregister") {
return RaftLogAction::UNREGISTER_REPLICATION_INSTANCE;
}
if (action == "promote") {
return RaftLogAction::SET_INSTANCE_AS_MAIN;
}
if (action == "demote") {
return RaftLogAction::SET_INSTANCE_AS_REPLICA;
}
if (action == "update_uuid") {
return RaftLogAction::UPDATE_UUID;
}
throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action);
}
} // namespace memgraph::coordination
#endif

View File

@ -13,9 +13,8 @@
#include "coordination/raft_state.hpp"
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
namespace memgraph::coordination {
@ -90,18 +89,13 @@ auto RaftState::InitRaftServer() -> void {
throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_);
}
auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState {
uint32_t raft_server_id{0};
uint32_t raft_port{0};
try {
raft_server_id = FLAGS_raft_server_id;
raft_port = FLAGS_raft_server_port;
} catch (std::exception const &e) {
throw RaftCouldNotParseFlagsException("Failed to parse flags: {}", e.what());
}
auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState {
uint32_t raft_server_id = FLAGS_raft_server_id;
uint32_t raft_port = FLAGS_raft_server_port;
auto raft_state =
RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_server_id, raft_port, "127.0.0.1");
raft_state.InitRaftServer();
return raft_state;
}
@ -112,8 +106,9 @@ auto RaftState::InstanceName() const -> std::string { return "coordinator_" + st
auto RaftState::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); }
auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address)
-> void {
auto const endpoint = fmt::format("{}:{}", raft_address, raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
@ -131,10 +126,123 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
auto RaftState::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> {
auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance);
return raft_server_->append_entries({new_log});
auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool {
auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
"the "
"leader.",
config.instance_name);
return false;
}
spdlog::info("Request for registering instance {} accepted", config.instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
return false;
}
return true;
}
auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeUnregisterInstance(instance_name);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for unregistering instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for unregistering instance {} accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for promoting instance {} accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsReplica(instance_name);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for demoting instance {} accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for updating UUID. Most likely the reason is that the instance is not "
"the leader.");
return false;
}
spdlog::info("Request for updating UUID accepted");
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to update UUID with error code {}", res->get_result_code());
return false;
}
return true;
}
auto RaftState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
return state_machine_->FindCurrentMainInstanceName();
}
auto RaftState::MainExists() const -> bool { return state_machine_->MainExists(); }
auto RaftState::IsMain(std::string_view instance_name) const -> bool { return state_machine_->IsMain(instance_name); }
auto RaftState::IsReplica(std::string_view instance_name) const -> bool {
return state_machine_->IsReplica(instance_name);
}
auto RaftState::GetInstances() const -> std::vector<InstanceState> { return state_machine_->GetInstances(); }
auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); }
} // namespace memgraph::coordination
#endif

View File

@ -25,15 +25,8 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC
HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb)
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
fail_cb_(fail_instance_cb) {}
auto ReplicationInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
@ -52,24 +45,17 @@ auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
}
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); }
auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); }
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto ReplicationInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
main_uuid_ = new_uuid;
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
@ -77,13 +63,14 @@ auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClients
return true;
}
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
succ_cb_ = replica_succ_cb;
fail_cb_ = replica_fail_cb;
@ -117,6 +104,7 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur
}
UpdateReplicaLastResponseUUID();
// NOLINTNEXTLINE
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
return true;
}
@ -124,7 +112,7 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur
return SendSwapAndUpdateUUID(curr_main_uuid);
}
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool {
auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
return false;
}
@ -132,7 +120,7 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid
return true;
}
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_name) -> bool {
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
return client_.SendUnregisterReplicaRpc(instance_name);
}

View File

@ -20,28 +20,28 @@ namespace memgraph::dbms {
CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state)
: coordinator_state_(coordinator_state) {}
auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config)
auto CoordinatorHandler::RegisterReplicationInstance(coordination::CoordinatorClientConfig const &config)
-> coordination::RegisterInstanceCoordinatorStatus {
return coordinator_state_.RegisterReplicationInstance(config);
}
auto CoordinatorHandler::UnregisterReplicationInstance(std::string instance_name)
auto CoordinatorHandler::UnregisterReplicationInstance(std::string_view instance_name)
-> coordination::UnregisterInstanceCoordinatorStatus {
return coordinator_state_.UnregisterReplicationInstance(std::move(instance_name));
return coordinator_state_.UnregisterReplicationInstance(instance_name);
}
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name)
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string_view instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name));
return coordinator_state_.SetReplicationInstanceToMain(instance_name);
}
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> {
return coordinator_state_.ShowInstances();
}
auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port,
std::string_view raft_address) -> void {
coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
} // namespace memgraph::dbms

View File

@ -30,16 +30,17 @@ class CoordinatorHandler {
// TODO: (andi) When moving coordinator state on same instances, rename from RegisterReplicationInstance to
// RegisterInstance
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config)
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig const &config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto UnregisterReplicationInstance(std::string instance_name) -> coordination::UnregisterInstanceCoordinatorStatus;
auto UnregisterReplicationInstance(std::string_view instance_name)
-> coordination::UnregisterInstanceCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string_view instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
private:
coordination::CoordinatorState &coordinator_state_;

View File

@ -19,7 +19,6 @@
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/fmt.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
@ -135,7 +134,7 @@ void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_ha
replication_coordination_glue::SwapMainUUIDReq req;
slk::Load(&req, req_reader);
spdlog::info(fmt::format("Set replica data UUID to main uuid {}", std::string(req.uuid)));
spdlog::info("Set replica data UUID to main uuid {}", std::string(req.uuid));
dbms_handler->ReplicationState().TryPersistRoleReplica(role_replica_data.config, req.uuid);
role_replica_data.uuid_ = req.uuid;

View File

@ -24,22 +24,22 @@
namespace memgraph::io::network {
Endpoint::IpFamily Endpoint::GetIpFamily(const std::string &address) {
Endpoint::IpFamily Endpoint::GetIpFamily(std::string_view address) {
in_addr addr4;
in6_addr addr6;
int ipv4_result = inet_pton(AF_INET, address.c_str(), &addr4);
int ipv6_result = inet_pton(AF_INET6, address.c_str(), &addr6);
int ipv4_result = inet_pton(AF_INET, address.data(), &addr4);
int ipv6_result = inet_pton(AF_INET6, address.data(), &addr6);
if (ipv4_result == 1) {
return IpFamily::IP4;
} else if (ipv6_result == 1) {
return IpFamily::IP6;
} else {
return IpFamily::NONE;
}
if (ipv6_result == 1) {
return IpFamily::IP6;
}
return IpFamily::NONE;
}
std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrIpAddress(
const std::string &address, const std::optional<uint16_t> default_port) {
std::string_view address, const std::optional<uint16_t> default_port) {
/// expected address format:
/// - "ip_address:port_number"
/// - "ip_address"
@ -56,7 +56,7 @@ std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrIpAddress
if (GetIpFamily(address) == IpFamily::NONE) {
return std::nullopt;
}
return std::pair{address, *default_port};
return std::pair{std::string(address), *default_port}; // TODO: (andi) Optimize throughout the code
}
} else if (parts.size() == 2) {
ip_address = std::move(parts[0]);
@ -88,7 +88,7 @@ std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrIpAddress
}
std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseHostname(
const std::string &address, const std::optional<uint16_t> default_port = {}) {
std::string_view address, const std::optional<uint16_t> default_port = {}) {
const std::string delimiter = ":";
std::string ip_address;
std::vector<std::string> parts = utils::Split(address, delimiter);
@ -97,7 +97,7 @@ std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseHostname(
if (!IsResolvableAddress(address, *default_port)) {
return std::nullopt;
}
return std::pair{address, *default_port};
return std::pair{std::string(address), *default_port}; // TODO: (andi) Optimize throughout the code
}
} else if (parts.size() == 2) {
int64_t int_port{0};
@ -153,20 +153,20 @@ std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint) {
return os << endpoint.address << ":" << endpoint.port;
}
bool Endpoint::IsResolvableAddress(const std::string &address, uint16_t port) {
bool Endpoint::IsResolvableAddress(std::string_view address, uint16_t port) {
addrinfo hints{
.ai_flags = AI_PASSIVE,
.ai_family = AF_UNSPEC, // IPv4 and IPv6
.ai_socktype = SOCK_STREAM // TCP socket
};
addrinfo *info = nullptr;
auto status = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints, &info);
auto status = getaddrinfo(address.data(), std::to_string(port).c_str(), &hints, &info);
if (info) freeaddrinfo(info);
return status == 0;
}
std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrAddress(
const std::string &address, const std::optional<uint16_t> default_port) {
std::string_view address, const std::optional<uint16_t> default_port) {
const std::string delimiter = ":";
std::vector<std::string> parts = utils::Split(address, delimiter);
if (parts.size() == 1) {

View File

@ -48,8 +48,8 @@ struct Endpoint {
uint16_t port{0};
IpFamily family{IpFamily::NONE};
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrAddress(const std::string &address,
std::optional<uint16_t> default_port);
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrAddress(
std::string_view address, std::optional<uint16_t> default_port = {});
/**
* Tries to parse the given string as either a socket address or ip address.
@ -62,7 +62,7 @@ struct Endpoint {
* it won't be used, as we expect that it is given in the address string.
*/
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress(
const std::string &address, std::optional<uint16_t> default_port = {});
std::string_view address, std::optional<uint16_t> default_port = {});
/**
* Tries to parse given string as either socket address or hostname.
@ -71,12 +71,12 @@ struct Endpoint {
* - "hostname"
* After we parse hostname and port we try to resolve the hostname into an ip_address.
*/
static std::optional<std::pair<std::string, uint16_t>> ParseHostname(const std::string &address,
static std::optional<std::pair<std::string, uint16_t>> ParseHostname(std::string_view address,
std::optional<uint16_t> default_port);
static IpFamily GetIpFamily(const std::string &address);
static IpFamily GetIpFamily(std::string_view address);
static bool IsResolvableAddress(const std::string &address, uint16_t port);
static bool IsResolvableAddress(std::string_view address, uint16_t port);
/**
* Tries to resolve hostname to its corresponding IP address.

View File

@ -410,7 +410,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
: coordinator_handler_(coordinator_state) {}
void UnregisterInstance(std::string const &instance_name) override {
void UnregisterInstance(std::string_view instance_name) override {
auto status = coordinator_handler_.UnregisterReplicationInstance(instance_name);
switch (status) {
using enum memgraph::coordination::UnregisterInstanceCoordinatorStatus;
@ -423,6 +423,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!");
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't unregister replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't unregister replica instance because current main instance couldn't unregister replica!");
@ -431,20 +433,18 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void RegisterReplicationInstance(std::string const &coordinator_socket_address,
std::string const &replication_socket_address,
void RegisterReplicationInstance(std::string_view coordinator_socket_address,
std::string_view replication_socket_address,
std::chrono::seconds const &instance_check_frequency,
std::chrono::seconds const &instance_down_timeout,
std::chrono::seconds const &instance_get_uuid_frequency,
std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
std::string_view instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
const auto maybe_coordinator_ip_port =
io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt);
const auto maybe_coordinator_ip_port = io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
@ -452,13 +452,13 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
const auto [replication_ip, replication_port] = *maybe_replication_ip_port;
const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port;
const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{
.instance_name = instance_name,
.instance_name = std::string(instance_name),
.replication_mode = convertFromCoordinatorToReplicationMode(sync_mode),
.replication_ip_address = replication_ip,
.replication_port = replication_port};
auto coordinator_client_config =
coordination::CoordinatorClientConfig{.instance_name = instance_name,
coordination::CoordinatorClientConfig{.instance_name = std::string(instance_name),
.ip_address = coordinator_server_ip,
.port = coordinator_server_port,
.instance_health_check_frequency_sec = instance_check_frequency,
@ -472,18 +472,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!");
case ENDPOINT_EXISTS:
case COORD_ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
"Couldn't register replica instance since instance with such coordinator endpoint already exists!");
case REPL_ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such replication endpoint already exists!");
case NOT_COORDINATOR:
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
@ -494,8 +493,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &raft_socket_address) -> void override {
auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(raft_socket_address);
auto AddCoordinatorInstance(uint32_t raft_server_id, std::string_view raft_socket_address) -> void override {
auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrAddress(raft_socket_address);
if (maybe_ip_and_port) {
auto const [ip, port] = *maybe_ip_and_port;
spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, port, ip);
@ -505,8 +504,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void SetReplicationInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name);
void SetReplicationInstanceToMain(std::string_view instance_name) override {
auto const status = coordinator_handler_.SetReplicationInstanceToMain(instance_name);
switch (status) {
using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
@ -515,6 +514,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("Couldn't set instance to main since there is already a main instance in cluster!");
case NOT_COORDINATOR:
throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't set instance to main since coordinator is not a leader!");
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't promote instance since raft server couldn't append the log!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(
"Couldn't set replica instance to main! Check coordinator and replica for more logs");
@ -1251,14 +1254,13 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES.");
}
callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "alive", "role"};
callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "health", "role"};
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
replica_nfields = callback.header.size()]() mutable {
auto const instances = handler.ShowInstances();
auto const converter = [](const auto &status) -> std::vector<TypedValue> {
return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address},
TypedValue{status.coord_socket_address}, TypedValue{status.is_alive},
TypedValue{status.cluster_role}};
TypedValue{status.coord_socket_address}, TypedValue{status.health}, TypedValue{status.cluster_role}};
};
return utils::fmap(converter, instances);

View File

@ -95,25 +95,24 @@ class CoordinatorQueryHandler {
};
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterReplicationInstance(std::string const &coordinator_socket_address,
std::string const &replication_socket_address,
virtual void RegisterReplicationInstance(std::string_view coordinator_socket_address,
std::string_view replication_socket_address,
std::chrono::seconds const &instance_health_check_frequency,
std::chrono::seconds const &instance_down_timeout,
std::chrono::seconds const &instance_get_uuid_frequency,
std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
std::string_view instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void UnregisterInstance(std::string const &instance_name) = 0;
virtual void UnregisterInstance(std::string_view instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0;
virtual void SetReplicationInstanceToMain(std::string_view instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &coordinator_socket_address)
-> void = 0;
virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string_view coordinator_socket_address) -> void = 0;
};
#endif

View File

@ -12,7 +12,32 @@
#pragma once
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
namespace memgraph::replication_coordination_glue {
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
inline auto ReplicationModeToString(ReplicationMode mode) -> std::string {
switch (mode) {
case ReplicationMode::SYNC:
return "SYNC";
case ReplicationMode::ASYNC:
return "ASYNC";
}
throw std::invalid_argument("Invalid replication mode");
}
inline auto ReplicationModeFromString(std::string_view mode) -> ReplicationMode {
if (mode == "SYNC") {
return ReplicationMode::SYNC;
}
if (mode == "ASYNC") {
return ReplicationMode::ASYNC;
}
throw std::invalid_argument("Invalid replication mode");
}
} // namespace memgraph::replication_coordination_glue

View File

@ -24,6 +24,7 @@ find_package(Threads REQUIRED)
add_library(mg-utils STATIC ${utils_src_files})
add_library(mg::utils ALIAS mg-utils)
target_link_libraries(mg-utils PUBLIC Boost::headers fmt::fmt spdlog::spdlog json)
target_link_libraries(mg-utils PRIVATE librdtsc stdc++fs Threads::Threads gflags uuid rt)

View File

@ -229,6 +229,13 @@ inline std::vector<std::string> Split(const std::string_view src, const std::str
return res;
}
inline std::vector<std::string_view> SplitView(const std::string_view src, const std::string_view delimiter,
int splits = -1) {
std::vector<std::string_view> res;
Split(&res, src, delimiter, splits);
return res;
}
/**
* Split a string by whitespace into a vector.
* Runs of consecutive whitespace are regarded as a single delimiter.

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,7 +10,7 @@
// licenses/APL.txt.
#include "utils/uuid.hpp"
#include <uuid/uuid.h>
#include "slk/serialization.hpp"
namespace memgraph::utils {

View File

@ -12,6 +12,7 @@
#pragma once
#include <uuid/uuid.h>
#include <array>
#include <json/json.hpp>
#include <string>

View File

@ -133,12 +133,12 @@ def test_register_repl_instances_then_coordinators():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
@ -147,21 +147,23 @@ def test_register_repl_instances_then_coordinators():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
def test_register_coordinator_then_repl_instances():
@ -187,12 +189,12 @@ def test_register_coordinator_then_repl_instances():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
@ -201,21 +203,23 @@ def test_register_coordinator_then_repl_instances():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
def test_coordinators_communication_with_restarts():
@ -237,10 +241,13 @@ def test_coordinators_communication_with_restarts():
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
@ -248,20 +255,20 @@ def test_coordinators_communication_with_restarts():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
@ -271,11 +278,11 @@ def test_coordinators_communication_with_restarts():
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
# TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
# # TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
@pytest.mark.parametrize(
"kill_instance",
[True, False],
@ -284,7 +291,12 @@ def test_unregister_replicas(kill_instance):
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
@ -296,6 +308,12 @@ def test_unregister_replicas(kill_instance):
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
@ -305,10 +323,21 @@ def test_unregister_replicas(kill_instance):
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = [
@ -328,6 +357,8 @@ def test_unregister_replicas(kill_instance):
),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@ -336,9 +367,19 @@ def test_unregister_replicas(kill_instance):
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_1")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = [
@ -351,6 +392,8 @@ def test_unregister_replicas(kill_instance):
),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@ -359,11 +402,22 @@ def test_unregister_replicas(kill_instance):
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_2")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = []
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@ -372,7 +426,11 @@ def test_unregister_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
@ -384,16 +442,35 @@ def test_unregister_main():
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
try:
@ -407,20 +484,43 @@ def test_unregister_main():
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
]
expected_replicas = [
@ -438,6 +538,8 @@ def test_unregister_main():
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)

View File

@ -44,10 +44,10 @@ def test_coordinator_show_instances():
return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;")))
expected_data = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)

View File

@ -143,20 +143,20 @@ def test_writing_disabled_on_main_restart():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
@ -173,10 +173,10 @@ def test_writing_disabled_on_main_restart():
)
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)

View File

@ -17,7 +17,11 @@ import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection
from mg_utils import (
mg_sleep_and_assert,
mg_sleep_and_assert_any_function,
mg_sleep_and_assert_collection,
)
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -138,8 +142,11 @@ def test_distributed_automatic_failover():
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == sorted(expected_data_on_main)
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -149,12 +156,12 @@ def test_distributed_automatic_failover():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -202,5 +209,88 @@ def test_distributed_automatic_failover():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_distributed_automatic_failover_after_coord_dies():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
def show_instances_coord1():
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
def show_instances_coord2():
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
leader_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
follower_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown.
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
new_main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "invalid"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -75,8 +75,11 @@ def test_replication_works_on_failover():
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 3
interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION)
@ -200,9 +203,9 @@ def test_not_replicate_old_main_register_new_cluster():
return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;")))
expected_data_up_first_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("shared_instance", "", "127.0.0.1:10011", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("shared_instance", "", "127.0.0.1:10011", "up", "replica"),
]
mg_sleep_and_assert(expected_data_up_first_cluster, show_repl_cluster)
@ -254,9 +257,9 @@ def test_not_replicate_old_main_register_new_cluster():
return sorted(list(execute_and_fetch_all(second_cluster_coord_cursor, "SHOW INSTANCES;")))
expected_data_up_second_cluster = [
("coordinator_1", "127.0.0.1:10112", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("shared_instance", "", "127.0.0.1:10011", True, "replica"),
("coordinator_1", "127.0.0.1:10112", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
("shared_instance", "", "127.0.0.1:10011", "up", "replica"),
]
mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster)

View File

@ -252,10 +252,10 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -269,10 +269,10 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
new_expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
@ -289,10 +289,10 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
new_expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "up", "replica"),
]
mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
@ -482,11 +482,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -506,11 +506,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
# 6
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -531,11 +531,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
# 10
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -550,11 +550,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -573,11 +573,11 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "replica"),
("instance_4", "", "127.0.0.1:10014", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -779,11 +779,11 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
# TODO(antoniofilipovic) Before fixing durability, if this is removed we also have an issue. Check after fix
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -811,11 +811,11 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
# 7
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -837,11 +837,11 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "main"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -854,11 +854,11 @@ def test_replication_correct_replica_chosen_up_to_date_data(data_recovery):
# 11
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
("instance_4", "", "127.0.0.1:10014", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
@ -910,8 +910,11 @@ def test_replication_works_on_failover_simple():
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
def main_cursor_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, main_cursor_show_replicas)
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -923,10 +926,10 @@ def test_replication_works_on_failover_simple():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -1028,8 +1031,11 @@ def test_replication_works_on_replica_instance_restart():
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
def main_cursor_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, main_cursor_show_replicas)
# 3
coord_cursor = connect(host="localhost", port=7690).cursor()
@ -1040,10 +1046,10 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert_collection(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -1106,10 +1112,10 @@ def test_replication_works_on_replica_instance_restart():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -1154,10 +1160,10 @@ def test_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
@ -1177,20 +1183,20 @@ def test_show_instances():
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
@ -1216,8 +1222,11 @@ def test_simple_automatic_failover():
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == sorted(expected_data_on_main)
def main_cursor_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, main_cursor_show_replicas)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -1227,10 +1236,10 @@ def test_simple_automatic_failover():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -1302,7 +1311,10 @@ def test_registering_replica_fails_endpoint_exists():
coord_cursor,
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"
assert (
str(e.value)
== "Couldn't register replica instance since instance with such coordinator endpoint already exists!"
)
def test_replica_instance_restarts():
@ -1315,20 +1327,20 @@ def test_replica_instance_restarts():
return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;")))
expected_data_up = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_down, show_repl_cluster)
@ -1357,18 +1369,18 @@ def test_automatic_failover_main_back_as_replica():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_after_failover = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster)
expected_data_after_main_coming_back = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "replica"),
]
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -1396,20 +1408,20 @@ def test_automatic_failover_main_back_as_main():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_all_down = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_main_back = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster)
@ -1424,10 +1436,10 @@ def test_automatic_failover_main_back_as_main():
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data_replicas_back = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster)

View File

@ -17,6 +17,28 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=
return result
def mg_sleep_and_assert_any_function(
expected_value, functions_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):
result = [f() for f in functions_to_retrieve_data]
if any((x == expected_value for x in result)):
return result
start_time = time.time()
while result != expected_value:
duration = time.time() - start_time
if duration > max_duration:
assert (
False
), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}"
time.sleep(time_between_attempt)
result = [f() for f in functions_to_retrieve_data]
if any((x == expected_value for x in result)):
return result
return result
def mg_sleep_and_assert_collection(
expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2
):