From 1802dc93d12bc904bb239145fa3909c3a5ccb6e0 Mon Sep 17 00:00:00 2001 From: Andi Date: Tue, 5 Mar 2024 08:33:13 +0100 Subject: [PATCH] Improve Raft log serialization (#1778) --- src/coordination/CMakeLists.txt | 1 + src/coordination/coordinator_config.cpp | 54 +++++++ .../coordinator_state_machine.cpp | 66 ++++---- .../coordination/coordinator_config.hpp | 44 +---- .../nuraft/coordinator_state_machine.hpp | 2 +- .../include/nuraft/raft_log_action.hpp | 10 ++ src/replication_coordination_glue/mode.hpp | 25 +-- tests/unit/CMakeLists.txt | 7 + tests/unit/raft_log_serialization.cpp | 151 ++++++++++++++++++ 9 files changed, 265 insertions(+), 95 deletions(-) create mode 100644 src/coordination/coordinator_config.cpp create mode 100644 tests/unit/raft_log_serialization.cpp diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 3e293e2e7..ef9376a70 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -23,6 +23,7 @@ target_sources(mg-coordination include/nuraft/coordinator_state_manager.hpp PRIVATE + coordinator_config.cpp coordinator_client.cpp coordinator_state.cpp coordinator_rpc.cpp diff --git a/src/coordination/coordinator_config.cpp b/src/coordination/coordinator_config.cpp new file mode 100644 index 000000000..a1147d3b6 --- /dev/null +++ b/src/coordination/coordinator_config.cpp @@ -0,0 +1,54 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_config.hpp" + +namespace memgraph::coordination { + +void to_json(nlohmann::json &j, ReplClientInfo const &config) { + j = nlohmann::json{{"instance_name", config.instance_name}, + {"replication_mode", config.replication_mode}, + {"replication_ip_address", config.replication_ip_address}, + {"replication_port", config.replication_port}}; +} + +void from_json(nlohmann::json const &j, ReplClientInfo &config) { + config.instance_name = j.at("instance_name").get(); + config.replication_mode = j.at("replication_mode").get(); + config.replication_ip_address = j.at("replication_ip_address").get(); + config.replication_port = j.at("replication_port").get(); +} + +void to_json(nlohmann::json &j, CoordinatorClientConfig const &config) { + j = nlohmann::json{{"instance_name", config.instance_name}, + {"ip_address", config.ip_address}, + {"port", config.port}, + {"instance_health_check_frequency_sec", config.instance_health_check_frequency_sec.count()}, + {"instance_down_timeout_sec", config.instance_down_timeout_sec.count()}, + {"instance_get_uuid_frequency_sec", config.instance_get_uuid_frequency_sec.count()}, + {"replication_client_info", config.replication_client_info}}; +} + +void from_json(nlohmann::json const &j, CoordinatorClientConfig &config) { + config.instance_name = j.at("instance_name").get(); + config.ip_address = j.at("ip_address").get(); + config.port = j.at("port").get(); + config.instance_health_check_frequency_sec = + std::chrono::seconds{j.at("instance_health_check_frequency_sec").get()}; + config.instance_down_timeout_sec = std::chrono::seconds{j.at("instance_down_timeout_sec").get()}; + config.instance_get_uuid_frequency_sec = std::chrono::seconds{j.at("instance_get_uuid_frequency_sec").get()}; + config.replication_client_info = j.at("replication_client_info").get(); +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 6e2986ecf..564303f22 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -30,77 +30,67 @@ auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const -> return cluster_state_.IsReplica(instance_name); } -auto CoordinatorStateMachine::CreateLog(std::string_view log) -> ptr { - ptr log_buf = buffer::alloc(sizeof(uint32_t) + log.size()); +auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr { + auto const log_dump = log.dump(); + ptr log_buf = buffer::alloc(sizeof(uint32_t) + log_dump.size()); buffer_serializer bs(log_buf); - bs.put_str(log.data()); + bs.put_str(log_dump); return log_buf; } auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr { - auto const str_log = fmt::format("{}*register", config.ToString()); - return CreateLog(str_log); + return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}}); } auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr { - auto const str_log = fmt::format("{}*unregister", instance_name); - return CreateLog(str_log); + return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}}); } auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr { - auto const str_log = fmt::format("{}*promote", instance_name); - return CreateLog(str_log); + return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_name}}); } auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr { - auto const str_log = fmt::format("{}*demote", instance_name); - return CreateLog(str_log); + return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}}); } auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr { - auto const str_log = fmt::format("{}*update_uuid", nlohmann::json{{"uuid", uuid}}.dump()); - return CreateLog(str_log); + return CreateLog({{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}}); } auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair { buffer_serializer bs(data); + auto const json = nlohmann::json::parse(bs.get_str()); - auto const log_str = bs.get_str(); - auto const sep = log_str.find('*'); - auto const action = log_str.substr(sep + 1); - auto const info = log_str.substr(0, sep); + auto const action = json["action"].get(); + auto const &info = json["info"]; - if (action == "register") { - return {CoordinatorClientConfig::FromString(info), RaftLogAction::REGISTER_REPLICATION_INSTANCE}; + switch (action) { + case RaftLogAction::REGISTER_REPLICATION_INSTANCE: + return {info.get(), action}; + case RaftLogAction::UPDATE_UUID: + return {info.get(), action}; + case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: + case RaftLogAction::SET_INSTANCE_AS_MAIN: + [[fallthrough]]; + case RaftLogAction::SET_INSTANCE_AS_REPLICA: + return {info.get(), action}; } - if (action == "unregister") { - return {info, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}; - } - if (action == "promote") { - return {info, RaftLogAction::SET_INSTANCE_AS_MAIN}; - } - if (action == "demote") { - return {info, RaftLogAction::SET_INSTANCE_AS_REPLICA}; - } - if (action == "update_uuid") { - auto const json = nlohmann::json::parse(info); - return {json.at("uuid").get(), RaftLogAction::UPDATE_UUID}; - } - throw std::runtime_error("Unknown action"); } auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr { return nullptr; } auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr { - buffer_serializer bs(data); - auto const [parsed_data, log_action] = DecodeLog(data); cluster_state_.DoAction(parsed_data, log_action); - last_committed_idx_ = log_idx; - // TODO: (andi) Don't return nullptr - return nullptr; + + // Return raft log number + ptr ret = buffer::alloc(sizeof(log_idx)); + buffer_serializer bs_ret(ret); + bs_ret.put_u64(log_idx); + return ret; } auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr & /*new_conf*/) -> void { diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index b0d7118ff..127a365eb 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -22,12 +22,12 @@ #include #include +#include "json/json.hpp" namespace memgraph::coordination { inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; -// TODO: (andi) JSON serialization for RAFT log. struct CoordinatorClientConfig { std::string instance_name; std::string ip_address; @@ -43,28 +43,11 @@ struct CoordinatorClientConfig { } struct ReplicationClientInfo { - // TODO: (andi) Do we even need here instance_name for this struct? std::string instance_name; replication_coordination_glue::ReplicationMode replication_mode{}; std::string replication_ip_address; uint16_t replication_port{}; - auto ToString() const -> std::string { - return fmt::format("{}#{}#{}#{}", instance_name, replication_ip_address, replication_port, - replication_coordination_glue::ReplicationModeToString(replication_mode)); - } - - // TODO: (andi) How can I make use of monadic parsers here? - static auto FromString(std::string_view log) -> ReplicationClientInfo { - ReplicationClientInfo replication_client_info; - auto splitted = utils::Split(log, "#"); - replication_client_info.instance_name = splitted[0]; - replication_client_info.replication_ip_address = splitted[1]; - replication_client_info.replication_port = std::stoi(splitted[2]); - replication_client_info.replication_mode = replication_coordination_glue::ReplicationModeFromString(splitted[3]); - return replication_client_info; - } - friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default; }; @@ -79,25 +62,6 @@ struct CoordinatorClientConfig { std::optional ssl; - auto ToString() const -> std::string { - return fmt::format("{}|{}|{}|{}|{}|{}|{}", instance_name, ip_address, port, - instance_health_check_frequency_sec.count(), instance_down_timeout_sec.count(), - instance_get_uuid_frequency_sec.count(), replication_client_info.ToString()); - } - - static auto FromString(std::string_view log) -> CoordinatorClientConfig { - CoordinatorClientConfig config; - auto splitted = utils::Split(log, "|"); - config.instance_name = splitted[0]; - config.ip_address = splitted[1]; - config.port = std::stoi(splitted[2]); - config.instance_health_check_frequency_sec = std::chrono::seconds(std::stoi(splitted[3])); - config.instance_down_timeout_sec = std::chrono::seconds(std::stoi(splitted[4])); - config.instance_get_uuid_frequency_sec = std::chrono::seconds(std::stoi(splitted[5])); - config.replication_client_info = ReplicationClientInfo::FromString(splitted[6]); - return config; - } - friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default; }; @@ -119,5 +83,11 @@ struct CoordinatorServerConfig { friend bool operator==(CoordinatorServerConfig const &, CoordinatorServerConfig const &) = default; }; +void to_json(nlohmann::json &j, CoordinatorClientConfig const &config); +void from_json(nlohmann::json const &j, CoordinatorClientConfig &config); + +void to_json(nlohmann::json &j, ReplClientInfo const &config); +void from_json(nlohmann::json const &j, ReplClientInfo &config); + } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index aea21ab4e..516b8efc5 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -47,7 +47,7 @@ class CoordinatorStateMachine : public state_machine { auto IsMain(std::string_view instance_name) const -> bool; auto IsReplica(std::string_view instance_name) const -> bool; - static auto CreateLog(std::string_view log) -> ptr; + static auto CreateLog(nlohmann::json &&log) -> ptr; static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr; static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr; static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr; diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp index 399d33150..953049038 100644 --- a/src/coordination/include/nuraft/raft_log_action.hpp +++ b/src/coordination/include/nuraft/raft_log_action.hpp @@ -18,6 +18,8 @@ #include #include +#include "json/json.hpp" + namespace memgraph::coordination { enum class RaftLogAction : uint8_t { @@ -28,6 +30,14 @@ enum class RaftLogAction : uint8_t { UPDATE_UUID }; +NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, { + {RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"}, + {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"}, + {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"}, + {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"}, + {RaftLogAction::UPDATE_UUID, "update_uuid"}, + }) + inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction { if (action == "register") { return RaftLogAction::REGISTER_REPLICATION_INSTANCE; diff --git a/src/replication_coordination_glue/mode.hpp b/src/replication_coordination_glue/mode.hpp index 3f27afb05..4ca98b3a0 100644 --- a/src/replication_coordination_glue/mode.hpp +++ b/src/replication_coordination_glue/mode.hpp @@ -16,28 +16,15 @@ #include #include +#include "json/json.hpp" + namespace memgraph::replication_coordination_glue { enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; -inline auto ReplicationModeToString(ReplicationMode mode) -> std::string { - switch (mode) { - case ReplicationMode::SYNC: - return "SYNC"; - case ReplicationMode::ASYNC: - return "ASYNC"; - } - throw std::invalid_argument("Invalid replication mode"); -} - -inline auto ReplicationModeFromString(std::string_view mode) -> ReplicationMode { - if (mode == "SYNC") { - return ReplicationMode::SYNC; - } - if (mode == "ASYNC") { - return ReplicationMode::ASYNC; - } - throw std::invalid_argument("Invalid replication mode"); -} +NLOHMANN_JSON_SERIALIZE_ENUM(ReplicationMode, { + {ReplicationMode::SYNC, "sync"}, + {ReplicationMode::ASYNC, "async"}, + }) } // namespace memgraph::replication_coordination_glue diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index b92989f4e..f1afcdf15 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -438,3 +438,10 @@ add_unit_test(coordination_utils.cpp) target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue) target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include) endif() + +# Test Raft log serialization +if(MG_ENTERPRISE) +add_unit_test(raft_log_serialization.cpp) +target_link_libraries(${test_prefix}raft_log_serialization gflags mg-coordination mg-repl_coord_glue) +target_include_directories(${test_prefix}raft_log_serialization PRIVATE ${CMAKE_SOURCE_DIR}/include) +endif() diff --git a/tests/unit/raft_log_serialization.cpp b/tests/unit/raft_log_serialization.cpp new file mode 100644 index 000000000..8550cf5b8 --- /dev/null +++ b/tests/unit/raft_log_serialization.cpp @@ -0,0 +1,151 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "coordination/coordinator_config.hpp" +#include "nuraft/coordinator_state_machine.hpp" +#include "nuraft/raft_log_action.hpp" +#include "utils/file.hpp" +#include "utils/uuid.hpp" + +#include +#include +#include "json/json.hpp" + +using memgraph::coordination::CoordinatorClientConfig; +using memgraph::coordination::CoordinatorStateMachine; +using memgraph::coordination::RaftLogAction; +using memgraph::coordination::ReplClientInfo; +using memgraph::replication_coordination_glue::ReplicationMode; +using memgraph::utils::UUID; + +class RaftLogSerialization : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} + + std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_log_serialization"}; +}; + +TEST_F(RaftLogSerialization, ReplClientInfo) { + ReplClientInfo info{"instance_name", ReplicationMode::SYNC, "127.0.0.1", 10111}; + + nlohmann::json j = info; + ReplClientInfo info2 = j.get(); + + ASSERT_EQ(info, info2); +} + +TEST_F(RaftLogSerialization, CoordinatorClientConfig) { + CoordinatorClientConfig config{"instance3", + "127.0.0.1", + 10112, + std::chrono::seconds{1}, + std::chrono::seconds{5}, + std::chrono::seconds{10}, + {"instance_name", ReplicationMode::ASYNC, "replication_ip_address", 10001}, + .ssl = std::nullopt}; + + nlohmann::json j = config; + CoordinatorClientConfig config2 = j.get(); + + ASSERT_EQ(config, config2); +} + +TEST_F(RaftLogSerialization, RaftLogActionRegister) { + auto action = RaftLogAction::REGISTER_REPLICATION_INSTANCE; + + nlohmann::json j = action; + RaftLogAction action2 = j.get(); + + ASSERT_EQ(action, action2); +} + +TEST_F(RaftLogSerialization, RaftLogActionUnregister) { + auto action = RaftLogAction::UNREGISTER_REPLICATION_INSTANCE; + + nlohmann::json j = action; + RaftLogAction action2 = j.get(); + + ASSERT_EQ(action, action2); +} + +TEST_F(RaftLogSerialization, RaftLogActionPromote) { + auto action = RaftLogAction::SET_INSTANCE_AS_MAIN; + + nlohmann::json j = action; + RaftLogAction action2 = j.get(); + + ASSERT_EQ(action, action2); +} + +TEST_F(RaftLogSerialization, RaftLogActionDemote) { + auto action = RaftLogAction::SET_INSTANCE_AS_REPLICA; + + nlohmann::json j = action; + RaftLogAction action2 = j.get(); + + ASSERT_EQ(action, action2); +} + +TEST_F(RaftLogSerialization, RaftLogActionUpdateUUID) { + auto action = RaftLogAction::UPDATE_UUID; + + nlohmann::json j = action; + RaftLogAction action2 = j.get(); + + ASSERT_EQ(action, action2); +} + +TEST_F(RaftLogSerialization, RegisterInstance) { + CoordinatorClientConfig config{"instance3", + "127.0.0.1", + 10112, + std::chrono::seconds{1}, + std::chrono::seconds{5}, + std::chrono::seconds{10}, + {"instance_name", ReplicationMode::ASYNC, "replication_ip_address", 10001}, + .ssl = std::nullopt}; + + auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config); + auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); + ASSERT_EQ(action, RaftLogAction::REGISTER_REPLICATION_INSTANCE); + ASSERT_EQ(config, std::get(payload)); +} + +TEST_F(RaftLogSerialization, UnregisterInstance) { + auto buffer = CoordinatorStateMachine::SerializeUnregisterInstance("instance3"); + auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); + ASSERT_EQ(action, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE); + ASSERT_EQ("instance3", std::get(payload)); +} + +TEST_F(RaftLogSerialization, SetInstanceAsMain) { + auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3"); + auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); + ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_MAIN); + ASSERT_EQ("instance3", std::get(payload)); +} + +TEST_F(RaftLogSerialization, SetInstanceAsReplica) { + auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsReplica("instance3"); + auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); + ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_REPLICA); + ASSERT_EQ("instance3", std::get(payload)); +} + +TEST_F(RaftLogSerialization, UpdateUUID) { + UUID uuid; + auto buffer = CoordinatorStateMachine::SerializeUpdateUUID(uuid); + auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); + ASSERT_EQ(action, RaftLogAction::UPDATE_UUID); + ASSERT_EQ(uuid, std::get(payload)); +}