Improve Raft log serialization (#1778)

This commit is contained in:
Andi 2024-03-05 08:33:13 +01:00 committed by GitHub
parent 822183b62d
commit 1802dc93d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 265 additions and 95 deletions

View File

@ -23,6 +23,7 @@ target_sources(mg-coordination
include/nuraft/coordinator_state_manager.hpp include/nuraft/coordinator_state_manager.hpp
PRIVATE PRIVATE
coordinator_config.cpp
coordinator_client.cpp coordinator_client.cpp
coordinator_state.cpp coordinator_state.cpp
coordinator_rpc.cpp coordinator_rpc.cpp

View File

@ -0,0 +1,54 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
namespace memgraph::coordination {
void to_json(nlohmann::json &j, ReplClientInfo const &config) {
j = nlohmann::json{{"instance_name", config.instance_name},
{"replication_mode", config.replication_mode},
{"replication_ip_address", config.replication_ip_address},
{"replication_port", config.replication_port}};
}
void from_json(nlohmann::json const &j, ReplClientInfo &config) {
config.instance_name = j.at("instance_name").get<std::string>();
config.replication_mode = j.at("replication_mode").get<replication_coordination_glue::ReplicationMode>();
config.replication_ip_address = j.at("replication_ip_address").get<std::string>();
config.replication_port = j.at("replication_port").get<uint16_t>();
}
void to_json(nlohmann::json &j, CoordinatorClientConfig const &config) {
j = nlohmann::json{{"instance_name", config.instance_name},
{"ip_address", config.ip_address},
{"port", config.port},
{"instance_health_check_frequency_sec", config.instance_health_check_frequency_sec.count()},
{"instance_down_timeout_sec", config.instance_down_timeout_sec.count()},
{"instance_get_uuid_frequency_sec", config.instance_get_uuid_frequency_sec.count()},
{"replication_client_info", config.replication_client_info}};
}
void from_json(nlohmann::json const &j, CoordinatorClientConfig &config) {
config.instance_name = j.at("instance_name").get<std::string>();
config.ip_address = j.at("ip_address").get<std::string>();
config.port = j.at("port").get<uint16_t>();
config.instance_health_check_frequency_sec =
std::chrono::seconds{j.at("instance_health_check_frequency_sec").get<int>()};
config.instance_down_timeout_sec = std::chrono::seconds{j.at("instance_down_timeout_sec").get<int>()};
config.instance_get_uuid_frequency_sec = std::chrono::seconds{j.at("instance_get_uuid_frequency_sec").get<int>()};
config.replication_client_info = j.at("replication_client_info").get<ReplClientInfo>();
}
} // namespace memgraph::coordination
#endif

View File

@ -30,77 +30,67 @@ auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const ->
return cluster_state_.IsReplica(instance_name); return cluster_state_.IsReplica(instance_name);
} }
auto CoordinatorStateMachine::CreateLog(std::string_view log) -> ptr<buffer> { auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
ptr<buffer> log_buf = buffer::alloc(sizeof(uint32_t) + log.size()); auto const log_dump = log.dump();
ptr<buffer> log_buf = buffer::alloc(sizeof(uint32_t) + log_dump.size());
buffer_serializer bs(log_buf); buffer_serializer bs(log_buf);
bs.put_str(log.data()); bs.put_str(log_dump);
return log_buf; return log_buf;
} }
auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer> { auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer> {
auto const str_log = fmt::format("{}*register", config.ToString()); return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}});
return CreateLog(str_log);
} }
auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer> { auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*unregister", instance_name); return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}});
return CreateLog(str_log);
} }
auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> { auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*promote", instance_name); return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_name}});
return CreateLog(str_log);
} }
auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> { auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
auto const str_log = fmt::format("{}*demote", instance_name); return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
return CreateLog(str_log);
} }
auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> { auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
auto const str_log = fmt::format("{}*update_uuid", nlohmann::json{{"uuid", uuid}}.dump()); return CreateLog({{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}});
return CreateLog(str_log);
} }
auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> { auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
buffer_serializer bs(data); buffer_serializer bs(data);
auto const json = nlohmann::json::parse(bs.get_str());
auto const log_str = bs.get_str(); auto const action = json["action"].get<RaftLogAction>();
auto const sep = log_str.find('*'); auto const &info = json["info"];
auto const action = log_str.substr(sep + 1);
auto const info = log_str.substr(0, sep);
if (action == "register") { switch (action) {
return {CoordinatorClientConfig::FromString(info), RaftLogAction::REGISTER_REPLICATION_INSTANCE}; case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
return {info.get<CoordinatorClientConfig>(), action};
case RaftLogAction::UPDATE_UUID:
return {info.get<utils::UUID>(), action};
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
case RaftLogAction::SET_INSTANCE_AS_MAIN:
[[fallthrough]];
case RaftLogAction::SET_INSTANCE_AS_REPLICA:
return {info.get<std::string>(), action};
} }
if (action == "unregister") {
return {info, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE};
}
if (action == "promote") {
return {info, RaftLogAction::SET_INSTANCE_AS_MAIN};
}
if (action == "demote") {
return {info, RaftLogAction::SET_INSTANCE_AS_REPLICA};
}
if (action == "update_uuid") {
auto const json = nlohmann::json::parse(info);
return {json.at("uuid").get<utils::UUID>(), RaftLogAction::UPDATE_UUID};
}
throw std::runtime_error("Unknown action"); throw std::runtime_error("Unknown action");
} }
auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; } auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; }
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> { auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
auto const [parsed_data, log_action] = DecodeLog(data); auto const [parsed_data, log_action] = DecodeLog(data);
cluster_state_.DoAction(parsed_data, log_action); cluster_state_.DoAction(parsed_data, log_action);
last_committed_idx_ = log_idx; last_committed_idx_ = log_idx;
// TODO: (andi) Don't return nullptr
return nullptr; // Return raft log number
ptr<buffer> ret = buffer::alloc(sizeof(log_idx));
buffer_serializer bs_ret(ret);
bs_ret.put_u64(log_idx);
return ret;
} }
auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void { auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void {

View File

@ -22,12 +22,12 @@
#include <string> #include <string>
#include <fmt/format.h> #include <fmt/format.h>
#include "json/json.hpp"
namespace memgraph::coordination { namespace memgraph::coordination {
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
// TODO: (andi) JSON serialization for RAFT log.
struct CoordinatorClientConfig { struct CoordinatorClientConfig {
std::string instance_name; std::string instance_name;
std::string ip_address; std::string ip_address;
@ -43,28 +43,11 @@ struct CoordinatorClientConfig {
} }
struct ReplicationClientInfo { struct ReplicationClientInfo {
// TODO: (andi) Do we even need here instance_name for this struct?
std::string instance_name; std::string instance_name;
replication_coordination_glue::ReplicationMode replication_mode{}; replication_coordination_glue::ReplicationMode replication_mode{};
std::string replication_ip_address; std::string replication_ip_address;
uint16_t replication_port{}; uint16_t replication_port{};
auto ToString() const -> std::string {
return fmt::format("{}#{}#{}#{}", instance_name, replication_ip_address, replication_port,
replication_coordination_glue::ReplicationModeToString(replication_mode));
}
// TODO: (andi) How can I make use of monadic parsers here?
static auto FromString(std::string_view log) -> ReplicationClientInfo {
ReplicationClientInfo replication_client_info;
auto splitted = utils::Split(log, "#");
replication_client_info.instance_name = splitted[0];
replication_client_info.replication_ip_address = splitted[1];
replication_client_info.replication_port = std::stoi(splitted[2]);
replication_client_info.replication_mode = replication_coordination_glue::ReplicationModeFromString(splitted[3]);
return replication_client_info;
}
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default; friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
}; };
@ -79,25 +62,6 @@ struct CoordinatorClientConfig {
std::optional<SSL> ssl; std::optional<SSL> ssl;
auto ToString() const -> std::string {
return fmt::format("{}|{}|{}|{}|{}|{}|{}", instance_name, ip_address, port,
instance_health_check_frequency_sec.count(), instance_down_timeout_sec.count(),
instance_get_uuid_frequency_sec.count(), replication_client_info.ToString());
}
static auto FromString(std::string_view log) -> CoordinatorClientConfig {
CoordinatorClientConfig config;
auto splitted = utils::Split(log, "|");
config.instance_name = splitted[0];
config.ip_address = splitted[1];
config.port = std::stoi(splitted[2]);
config.instance_health_check_frequency_sec = std::chrono::seconds(std::stoi(splitted[3]));
config.instance_down_timeout_sec = std::chrono::seconds(std::stoi(splitted[4]));
config.instance_get_uuid_frequency_sec = std::chrono::seconds(std::stoi(splitted[5]));
config.replication_client_info = ReplicationClientInfo::FromString(splitted[6]);
return config;
}
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default; friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
}; };
@ -119,5 +83,11 @@ struct CoordinatorServerConfig {
friend bool operator==(CoordinatorServerConfig const &, CoordinatorServerConfig const &) = default; friend bool operator==(CoordinatorServerConfig const &, CoordinatorServerConfig const &) = default;
}; };
void to_json(nlohmann::json &j, CoordinatorClientConfig const &config);
void from_json(nlohmann::json const &j, CoordinatorClientConfig &config);
void to_json(nlohmann::json &j, ReplClientInfo const &config);
void from_json(nlohmann::json const &j, ReplClientInfo &config);
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -47,7 +47,7 @@ class CoordinatorStateMachine : public state_machine {
auto IsMain(std::string_view instance_name) const -> bool; auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool; auto IsReplica(std::string_view instance_name) const -> bool;
static auto CreateLog(std::string_view log) -> ptr<buffer>; static auto CreateLog(nlohmann::json &&log) -> ptr<buffer>;
static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer>; static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer>;
static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>; static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>; static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;

View File

@ -18,6 +18,8 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include "json/json.hpp"
namespace memgraph::coordination { namespace memgraph::coordination {
enum class RaftLogAction : uint8_t { enum class RaftLogAction : uint8_t {
@ -28,6 +30,14 @@ enum class RaftLogAction : uint8_t {
UPDATE_UUID UPDATE_UUID
}; };
NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {
{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
{RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
{RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
{RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
{RaftLogAction::UPDATE_UUID, "update_uuid"},
})
inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction { inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
if (action == "register") { if (action == "register") {
return RaftLogAction::REGISTER_REPLICATION_INSTANCE; return RaftLogAction::REGISTER_REPLICATION_INSTANCE;

View File

@ -16,28 +16,15 @@
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include "json/json.hpp"
namespace memgraph::replication_coordination_glue { namespace memgraph::replication_coordination_glue {
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
inline auto ReplicationModeToString(ReplicationMode mode) -> std::string { NLOHMANN_JSON_SERIALIZE_ENUM(ReplicationMode, {
switch (mode) { {ReplicationMode::SYNC, "sync"},
case ReplicationMode::SYNC: {ReplicationMode::ASYNC, "async"},
return "SYNC"; })
case ReplicationMode::ASYNC:
return "ASYNC";
}
throw std::invalid_argument("Invalid replication mode");
}
inline auto ReplicationModeFromString(std::string_view mode) -> ReplicationMode {
if (mode == "SYNC") {
return ReplicationMode::SYNC;
}
if (mode == "ASYNC") {
return ReplicationMode::ASYNC;
}
throw std::invalid_argument("Invalid replication mode");
}
} // namespace memgraph::replication_coordination_glue } // namespace memgraph::replication_coordination_glue

View File

@ -438,3 +438,10 @@ add_unit_test(coordination_utils.cpp)
target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue) target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include) target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif() endif()
# Test Raft log serialization
if(MG_ENTERPRISE)
add_unit_test(raft_log_serialization.cpp)
target_link_libraries(${test_prefix}raft_log_serialization gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}raft_log_serialization PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()

View File

@ -0,0 +1,151 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/coordinator_config.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/raft_log_action.hpp"
#include "utils/file.hpp"
#include "utils/uuid.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
using memgraph::coordination::CoordinatorClientConfig;
using memgraph::coordination::CoordinatorStateMachine;
using memgraph::coordination::RaftLogAction;
using memgraph::coordination::ReplClientInfo;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::utils::UUID;
class RaftLogSerialization : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_log_serialization"};
};
TEST_F(RaftLogSerialization, ReplClientInfo) {
ReplClientInfo info{"instance_name", ReplicationMode::SYNC, "127.0.0.1", 10111};
nlohmann::json j = info;
ReplClientInfo info2 = j.get<memgraph::coordination::ReplClientInfo>();
ASSERT_EQ(info, info2);
}
TEST_F(RaftLogSerialization, CoordinatorClientConfig) {
CoordinatorClientConfig config{"instance3",
"127.0.0.1",
10112,
std::chrono::seconds{1},
std::chrono::seconds{5},
std::chrono::seconds{10},
{"instance_name", ReplicationMode::ASYNC, "replication_ip_address", 10001},
.ssl = std::nullopt};
nlohmann::json j = config;
CoordinatorClientConfig config2 = j.get<memgraph::coordination::CoordinatorClientConfig>();
ASSERT_EQ(config, config2);
}
TEST_F(RaftLogSerialization, RaftLogActionRegister) {
auto action = RaftLogAction::REGISTER_REPLICATION_INSTANCE;
nlohmann::json j = action;
RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();
ASSERT_EQ(action, action2);
}
TEST_F(RaftLogSerialization, RaftLogActionUnregister) {
auto action = RaftLogAction::UNREGISTER_REPLICATION_INSTANCE;
nlohmann::json j = action;
RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();
ASSERT_EQ(action, action2);
}
TEST_F(RaftLogSerialization, RaftLogActionPromote) {
auto action = RaftLogAction::SET_INSTANCE_AS_MAIN;
nlohmann::json j = action;
RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();
ASSERT_EQ(action, action2);
}
TEST_F(RaftLogSerialization, RaftLogActionDemote) {
auto action = RaftLogAction::SET_INSTANCE_AS_REPLICA;
nlohmann::json j = action;
RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();
ASSERT_EQ(action, action2);
}
TEST_F(RaftLogSerialization, RaftLogActionUpdateUUID) {
auto action = RaftLogAction::UPDATE_UUID;
nlohmann::json j = action;
RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();
ASSERT_EQ(action, action2);
}
TEST_F(RaftLogSerialization, RegisterInstance) {
CoordinatorClientConfig config{"instance3",
"127.0.0.1",
10112,
std::chrono::seconds{1},
std::chrono::seconds{5},
std::chrono::seconds{10},
{"instance_name", ReplicationMode::ASYNC, "replication_ip_address", 10001},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
ASSERT_EQ(action, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
ASSERT_EQ(config, std::get<CoordinatorClientConfig>(payload));
}
TEST_F(RaftLogSerialization, UnregisterInstance) {
auto buffer = CoordinatorStateMachine::SerializeUnregisterInstance("instance3");
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
ASSERT_EQ(action, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE);
ASSERT_EQ("instance3", std::get<std::string>(payload));
}
TEST_F(RaftLogSerialization, SetInstanceAsMain) {
auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3");
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_MAIN);
ASSERT_EQ("instance3", std::get<std::string>(payload));
}
TEST_F(RaftLogSerialization, SetInstanceAsReplica) {
auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsReplica("instance3");
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_REPLICA);
ASSERT_EQ("instance3", std::get<std::string>(payload));
}
TEST_F(RaftLogSerialization, UpdateUUID) {
UUID uuid;
auto buffer = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
ASSERT_EQ(action, RaftLogAction::UPDATE_UUID);
ASSERT_EQ(uuid, std::get<UUID>(payload));
}