Add unreachable replica state

This commit is contained in:
Andi Skrgat 2024-02-09 07:53:02 +01:00
parent efd3257479
commit 8f9e044fcd
5 changed files with 41 additions and 30 deletions

View File

@ -3034,7 +3034,7 @@ class ReplicationQuery : public memgraph::query::Query {
enum class SyncMode { SYNC, ASYNC }; enum class SyncMode { SYNC, ASYNC };
enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, UNREACHABLE };
ReplicationQuery() = default; ReplicationQuery() = default;

View File

@ -437,6 +437,9 @@ class ReplQueryHandler {
case storage::replication::ReplicaState::MAYBE_BEHIND: case storage::replication::ReplicaState::MAYBE_BEHIND:
replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND; replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND;
break; break;
case storage::replication::ReplicaState::UNREACHABLE:
replica.state = ReplicationQuery::ReplicaState::UNREACHABLE;
break;
} }
return replica; return replica;
@ -1082,6 +1085,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
case ReplicationQuery::ReplicaState::MAYBE_BEHIND: case ReplicationQuery::ReplicaState::MAYBE_BEHIND:
typed_replica.emplace_back("invalid"); typed_replica.emplace_back("invalid");
break; break;
case ReplicationQuery::ReplicaState::UNREACHABLE:
typed_replica.emplace_back("unreachable");
break;
} }
typed_replicas.emplace_back(std::move(typed_replica)); typed_replicas.emplace_back(std::move(typed_replica));

View File

@ -133,39 +133,38 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplState() -> memgraph::replication::ReplicationState &; auto GetReplState() -> memgraph::replication::ReplicationState &;
private: private:
template <bool HandleFailure> template <bool AllowReplicaToBeUnreachable>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) auto RegisterReplica_(const replication::ReplicationClientConfig &config, bool send_swap_uuid)
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> { -> utils::BasicResult<memgraph::query::RegisterReplicaError> {
MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!");
auto maybe_client = repl_state_.RegisterReplica(config); auto maybe_client = repl_state_.RegisterReplica(config);
if (maybe_client.HasError()) { if (maybe_client.HasError()) {
switch (maybe_client.GetError()) { switch (maybe_client.GetError()) {
case memgraph::replication::RegisterReplicaError::NOT_MAIN: case replication::RegisterReplicaError::NOT_MAIN:
MG_ASSERT(false, "Only main instance can register a replica!"); MG_ASSERT(false, "Only main instance can register a replica!");
return {}; return {};
case memgraph::replication::RegisterReplicaError::NAME_EXISTS: case replication::RegisterReplicaError::NAME_EXISTS:
return memgraph::query::RegisterReplicaError::NAME_EXISTS; return query::RegisterReplicaError::NAME_EXISTS;
case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS: case replication::RegisterReplicaError::ENDPOINT_EXISTS:
return memgraph::query::RegisterReplicaError::ENDPOINT_EXISTS; return query::RegisterReplicaError::ENDPOINT_EXISTS;
case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: case replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
return memgraph::query::RegisterReplicaError::COULD_NOT_BE_PERSISTED; return query::RegisterReplicaError::COULD_NOT_BE_PERSISTED;
case memgraph::replication::RegisterReplicaError::SUCCESS: case replication::RegisterReplicaError::SUCCESS:
break; break;
} }
} }
if (!memgraph::dbms::allow_mt_repl && dbms_handler_.All().size() > 1) { if (!dbms::allow_mt_repl && dbms_handler_.All().size() > 1) {
spdlog::warn("Multi-tenant replication is currently not supported!"); spdlog::warn("Multi-tenant replication is currently not supported!");
} }
const auto main_uuid =
std::get<memgraph::replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_;
if (send_swap_uuid) { auto const main_uuid =
if (!memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_, std::get<replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_;
main_uuid)) {
return memgraph::query::RegisterReplicaError::ERROR_ACCEPTING_MAIN; if (send_swap_uuid &&
} !replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_, main_uuid)) {
return query::RegisterReplicaError::ERROR_ACCEPTING_MAIN;
} }
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
@ -193,21 +192,21 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
[storage, &instance_client_ptr, db_acc = std::move(db_acc), [storage, &instance_client_ptr, db_acc = std::move(db_acc),
main_uuid](auto &storage_clients) mutable { // NOLINT main_uuid](auto &storage_clients) mutable { // NOLINT
auto client = std::make_unique<storage::ReplicationStorageClient>(*instance_client_ptr, main_uuid); auto client = std::make_unique<storage::ReplicationStorageClient>(*instance_client_ptr, main_uuid);
// All good, start replica client
client->Start(storage, std::move(db_acc)); client->Start(storage, std::move(db_acc));
// After start the storage <-> replica state should be READY or RECOVERING (if correctly started) // After start the storage <-> replica state shouldn't be MAYBE_BEHIND.
// MAYBE_BEHIND isn't a statement of the current state, this is the default value // When part of coordinator cluster we allow replica to be UNREACHABLE.
// Failed to start due an error like branching of MAIN and REPLICA auto state = client->State();
const bool success = client->State() != storage::replication::ReplicaState::MAYBE_BEHIND; bool const success =
if (HandleFailure || success) { (state != storage::replication::ReplicaState::MAYBE_BEHIND) ||
(state == storage::replication::ReplicaState::UNREACHABLE && AllowReplicaToBeUnreachable);
if (success) {
storage_clients.push_back(std::move(client)); storage_clients.push_back(std::move(client));
} }
return success; return success;
}); });
}); });
// NOTE Currently if any databases fails, we revert back if (!all_clients_good) {
if (!HandleFailure && !all_clients_good) {
spdlog::error("Failed to register all databases on the REPLICA \"{}\"", config.name); spdlog::error("Failed to register all databases on the REPLICA \"{}\"", config.name);
UnregisterReplica(config.name); UnregisterReplica(config.name);
return memgraph::query::RegisterReplicaError::CONNECTION_FAILED; return memgraph::query::RegisterReplicaError::CONNECTION_FAILED;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -14,6 +14,6 @@
namespace memgraph::storage::replication { namespace memgraph::storage::replication {
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, UNREACHABLE };
} // namespace memgraph::storage::replication } // namespace memgraph::storage::replication

View File

@ -46,6 +46,9 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
std::string{storage->uuid()}); std::string{storage->uuid()});
state = memgraph::replication::ReplicationClient::State::BEHIND; state = memgraph::replication::ReplicationClient::State::BEHIND;
}); });
replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::UNREACHABLE; });
return; return;
} }
#endif #endif
@ -149,6 +152,9 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t curren
auto locked_state = replica_state_.Lock(); auto locked_state = replica_state_.Lock();
switch (*locked_state) { switch (*locked_state) {
using enum replication::ReplicaState; using enum replication::ReplicaState;
case UNREACHABLE:
spdlog::debug("Replica {} is unreachable", client_.name_);
return;
case RECOVERY: case RECOVERY:
spdlog::debug("Replica {} is behind MAIN instance", client_.name_); spdlog::debug("Replica {} is behind MAIN instance", client_.name_);
return; return;