diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index 6fe6b8c9e..9b3a14672 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3034,7 +3034,7 @@ class ReplicationQuery : public memgraph::query::Query { enum class SyncMode { SYNC, ASYNC }; - enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; + enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, DIVERGED_FROM_MAIN }; ReplicationQuery() = default; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 27b6195ea..acfbd6e4c 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -438,6 +438,9 @@ class ReplQueryHandler { case storage::replication::ReplicaState::MAYBE_BEHIND: replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND; break; + case storage::replication::ReplicaState::DIVERGED_FROM_MAIN: + replica.state = ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN; + break; } return replica; @@ -1083,6 +1086,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & case ReplicationQuery::ReplicaState::MAYBE_BEHIND: typed_replica.emplace_back("invalid"); break; + case ReplicationQuery::ReplicaState::DIVERGED_FROM_MAIN: + typed_replica.emplace_back("diverged"); + break; } typed_replicas.emplace_back(std::move(typed_replica)); diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index a1903a62e..7882fa3c0 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -138,7 +138,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplState() -> memgraph::replication::ReplicationState &; private: - template + template auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> memgraph::utils::BasicResult { MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); @@ -200,13 +200,15 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { [storage, &instance_client_ptr, db_acc = std::move(db_acc), main_uuid](auto &storage_clients) mutable { // NOLINT auto client = std::make_unique(*instance_client_ptr, main_uuid); - // All good, start replica client client->Start(storage, std::move(db_acc)); - // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) - // MAYBE_BEHIND isn't a statement of the current state, this is the default value - // Failed to start due an error like branching of MAIN and REPLICA - const bool success = client->State() != storage::replication::ReplicaState::MAYBE_BEHIND; - if (HandleFailure || success) { + bool const success = std::invoke([state = client->State()]() { + if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) { + return AllowReplicaToDivergeFromMain; + } + return state != storage::replication::ReplicaState::MAYBE_BEHIND; + }); + + if (success) { storage_clients.push_back(std::move(client)); } return success; @@ -214,7 +216,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { }); // NOTE Currently if any databases fails, we revert back - if (!HandleFailure && !all_clients_good) { + if (!all_clients_good) { spdlog::error("Failed to register all databases on the REPLICA \"{}\"", config.name); UnregisterReplica(config.name); return memgraph::query::RegisterReplicaError::CONNECTION_FAILED; diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index be16ca192..2866303e6 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -14,6 +14,6 @@ namespace memgraph::storage::replication { -enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; +enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, DIVERGED_FROM_MAIN }; } // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 0c5ef8125..16247de57 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -69,7 +69,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce "now hold unique data. Please resolve data conflicts and start the " "replication on a clean instance.", client_.name_, client_.name_, client_.name_); - // State not updated, hence in MAYBE_BEHIND state + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::DIVERGED_FROM_MAIN; }); return; } @@ -171,6 +171,10 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t curren utils::MessageWithLink("Couldn't replicate data to {}.", client_.name_, "https://memgr.ph/replication")); TryCheckReplicaStateAsync(storage, std::move(db_acc)); return; + case DIVERGED_FROM_MAIN: + spdlog::error(utils::MessageWithLink("Couldn't replicate data to {} since replica has diverged from main.", + client_.name_, "https://memgr.ph/replication")); + return; case READY: MG_ASSERT(!replica_stream_); try {