diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index cb3070c45..60f0ca622 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -92,6 +92,10 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act it->second.role = ReplicationRole::REPLICA; break; } + case RaftLogAction::UPDATE_UUID: { + uuid_ = std::get(log_entry); + break; + } } } @@ -141,31 +145,12 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta return cluster_state; } -auto CoordinatorClusterState::GetInstances() const -> std::vector> { +auto CoordinatorClusterState::GetInstances() const -> std::vector { auto lock = std::shared_lock{log_lock_}; - // TODO: (andi) Abstract - auto const role_to_string = [](auto const &role) -> std::string { - switch (role) { - case ReplicationRole::MAIN: - return "main"; - case ReplicationRole::REPLICA: - return "replica"; - } - }; - - auto const entry_to_pair = [&role_to_string](auto const &entry) { - return std::make_pair(entry.first, role_to_string(entry.second.role)); - }; - - return instance_roles_ | ranges::views::transform(entry_to_pair) | ranges::to(); + return instance_roles_ | ranges::views::values | ranges::to>; } -auto CoordinatorClusterState::GetClientConfigs() const -> std::vector { - auto lock = std::shared_lock{log_lock_}; - return instance_roles_ | ranges::views::values | - ranges::views::transform([](auto const &instance_state) { return instance_state.config; }) | - ranges::to(); -} +auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; } auto CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional { auto lock = std::shared_lock{log_lock_}; diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 4eaf41107..6e2986ecf 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -57,6 +57,11 @@ auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view ins return CreateLog(str_log); } +auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr { + auto const str_log = fmt::format("{}*update_uuid", nlohmann::json{{"uuid", uuid}}.dump()); + return CreateLog(str_log); +} + auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair { buffer_serializer bs(data); @@ -77,6 +82,11 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair(), RaftLogAction::UPDATE_UUID}; + } + throw std::runtime_error("Unknown action"); } @@ -87,7 +97,6 @@ auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr snapshot) - } } -auto CoordinatorStateMachine::GetInstances() const -> std::vector> { +auto CoordinatorStateMachine::GetInstances() const -> std::vector { return cluster_state_.GetInstances(); } -auto CoordinatorStateMachine::GetClientConfigs() const -> std::vector { - return cluster_state_.GetClientConfigs(); -} +auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); } } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index cee2f2c57..b0d7118ff 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -27,6 +27,7 @@ namespace memgraph::coordination { inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; +// TODO: (andi) JSON serialization for RAFT log. struct CoordinatorClientConfig { std::string instance_name; std::string ip_address; diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 620d9ba7c..10549f468 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -75,8 +75,6 @@ class CoordinatorInstance { std::list repl_instances_; mutable utils::ResourceLock coord_instance_lock_{}; - utils::UUID main_uuid_; - RaftState raft_state_; }; diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index e61ae0e2c..d702697f1 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -72,10 +72,10 @@ class RaftState { auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool; auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool; auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool; + auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool; - auto GetInstances() const -> std::vector>; - - auto GetClientConfigs() const -> std::vector; + auto GetInstances() const -> std::vector; + auto GetUUID() const -> utils::UUID; private: // TODO: (andi) I think variables below can be abstracted/clean them. diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index 88263d803..f38d00073 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -17,6 +17,7 @@ #include "nuraft/raft_log_action.hpp" #include "replication_coordination_glue/role.hpp" #include "utils/resource_lock.hpp" +#include "utils/uuid.hpp" #include #include @@ -35,7 +36,7 @@ struct InstanceState { ReplicationRole role; }; -using TRaftLog = std::variant; +using TRaftLog = std::variant; using nuraft::buffer; using nuraft::buffer_serializer; @@ -67,12 +68,13 @@ class CoordinatorClusterState { static auto Deserialize(buffer &data) -> CoordinatorClusterState; - auto GetInstances() const -> std::vector>; + auto GetInstances() const -> std::vector; - auto GetClientConfigs() const -> std::vector; + auto GetUUID() const -> utils::UUID; private: std::map> instance_roles_; + utils::UUID uuid_{}; mutable utils::ResourceLock log_lock_{}; }; diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 9ecbf62b7..aea21ab4e 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -52,6 +52,7 @@ class CoordinatorStateMachine : public state_machine { static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr; static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr; static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr; + static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr; static auto DecodeLog(buffer &data) -> std::pair; @@ -79,9 +80,8 @@ class CoordinatorStateMachine : public state_machine { auto create_snapshot(snapshot &s, async_result::handler_type &when_done) -> void override; - auto GetInstances() const -> std::vector>; - - auto GetClientConfigs() const -> std::vector; + auto GetInstances() const -> std::vector; + auto GetUUID() const -> utils::UUID; private: struct SnapshotCtx { diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp index 98dc52346..399d33150 100644 --- a/src/coordination/include/nuraft/raft_log_action.hpp +++ b/src/coordination/include/nuraft/raft_log_action.hpp @@ -24,7 +24,8 @@ enum class RaftLogAction : uint8_t { REGISTER_REPLICATION_INSTANCE, UNREGISTER_REPLICATION_INSTANCE, SET_INSTANCE_AS_MAIN, - SET_INSTANCE_AS_REPLICA + SET_INSTANCE_AS_REPLICA, + UPDATE_UUID }; inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction { @@ -40,6 +41,11 @@ inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction { if (action == "demote") { return RaftLogAction::SET_INSTANCE_AS_REPLICA; } + + if (action == "update_uuid") { + return RaftLogAction::UPDATE_UUID; + } + throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action); } diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index 14d71a91e..365388b06 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -209,6 +209,25 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> return true; } +auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool { + auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for updating UUID. Most likely the reason is that the instance is not " + "the leader."); + return false; + } + spdlog::info("Request for updating UUID accepted"); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to update UUID with error code {}", res->get_result_code()); + return false; + } + + return true; +} + auto RaftState::FindCurrentMainInstanceName() const -> std::optional { return state_machine_->FindCurrentMainInstanceName(); } @@ -221,13 +240,9 @@ auto RaftState::IsReplica(std::string_view instance_name) const -> bool { return state_machine_->IsReplica(instance_name); } -auto RaftState::GetInstances() const -> std::vector> { - return state_machine_->GetInstances(); -} +auto RaftState::GetInstances() const -> std::vector { return state_machine_->GetInstances(); } -auto RaftState::GetClientConfigs() const -> std::vector { - return state_machine_->GetClientConfigs(); -} +auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); } } // namespace memgraph::coordination #endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index b5c633978..ca7572ea7 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -26,9 +26,7 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC HealthCheckInstanceCallback fail_instance_cb) : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), succ_cb_(succ_instance_cb), - fail_cb_(fail_instance_cb) { - client_.StartFrequentCheck(); -} + fail_cb_(fail_instance_cb) {} auto ReplicationInstance::OnSuccessPing() -> void { last_response_time_ = std::chrono::system_clock::now(); diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index 3fc174d3c..8339b65b4 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -19,7 +19,6 @@ #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/version.hpp" -#include "storage/v2/fmt.hpp" #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/inmemory/storage.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" @@ -135,7 +134,7 @@ void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_ha replication_coordination_glue::SwapMainUUIDReq req; slk::Load(&req, req_reader); - spdlog::info(fmt::format("Set replica data UUID to main uuid {}", std::string(req.uuid))); + spdlog::info("Set replica data UUID to main uuid {}", std::string(req.uuid)); dbms_handler->ReplicationState().TryPersistRoleReplica(role_replica_data.config, req.uuid); role_replica_data.uuid_ = req.uuid; diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py index 1a5db1e09..9fa654d68 100644 --- a/tests/e2e/high_availability/distributed_coords.py +++ b/tests/e2e/high_availability/distributed_coords.py @@ -17,7 +17,11 @@ import tempfile import interactive_mg_runner import pytest from common import connect, execute_and_fetch_all, safe_execute -from mg_utils import mg_sleep_and_assert, mg_sleep_and_assert_collection +from mg_utils import ( + mg_sleep_and_assert, + mg_sleep_and_assert_any_function, + mg_sleep_and_assert_collection, +) interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) interactive_mg_runner.PROJECT_DIR = os.path.normpath( @@ -205,5 +209,88 @@ def test_distributed_automatic_failover(): mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) +def test_distributed_automatic_failover_after_coord_dies(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3") + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + coord_cursor_1 = connect(host="localhost", port=7690).cursor() + + def show_instances_coord1(): + return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;"))) + + coord_cursor_2 = connect(host="localhost", port=7691).cursor() + + def show_instances_coord2(): + return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;"))) + + leader_data = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("instance_1", "", "127.0.0.1:10011", "up", "main"), + ("instance_2", "", "127.0.0.1:10012", "up", "replica"), + ("instance_3", "", "127.0.0.1:10013", "down", "unknown"), + ] + mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) + + follower_data = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("instance_1", "", "", "unknown", "main"), + ("instance_2", "", "", "unknown", "replica"), + ("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown. + ] + mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) + mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2]) + + new_main_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_new_main = [ + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"ts": 0, "behind": None, "status": "ready"}, + {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}}, + ), + ( + "instance_3", + "127.0.0.1:10003", + "sync", + {"ts": 0, "behind": None, "status": "invalid"}, + {"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}}, + ), + ] + mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_on_new_main_old_alive = [ + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"ts": 0, "behind": None, "status": "ready"}, + {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}}, + ), + ( + "instance_3", + "127.0.0.1:10003", + "sync", + {"ts": 0, "behind": None, "status": "ready"}, + {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}}, + ), + ] + + mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/mg_utils.py b/tests/e2e/mg_utils.py index 3a475bf3c..7279f25f2 100644 --- a/tests/e2e/mg_utils.py +++ b/tests/e2e/mg_utils.py @@ -17,6 +17,28 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration= return result +def mg_sleep_and_assert_any_function( + expected_value, functions_to_retrieve_data, max_duration=20, time_between_attempt=0.2 +): + result = [f() for f in functions_to_retrieve_data] + if any((x == expected_value for x in result)): + return result + start_time = time.time() + while result != expected_value: + duration = time.time() - start_time + if duration > max_duration: + assert ( + False + ), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}" + + time.sleep(time_between_attempt) + result = [f() for f in functions_to_retrieve_data] + if any((x == expected_value for x in result)): + return result + + return result + + def mg_sleep_and_assert_collection( expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2 ):