Started improving CoordinatorState

This commit is contained in:
Andi Skrgat 2024-03-18 11:37:13 +01:00
parent fdd49b7983
commit a999110737
7 changed files with 81 additions and 16 deletions

View File

@ -24,20 +24,13 @@
namespace memgraph::coordination {
CoordinatorState::CoordinatorState() {
MG_ASSERT(!(FLAGS_coordinator_id && FLAGS_management_port),
"Instance cannot be a coordinator and have registered coordinator server.");
spdlog::info("Executing coordinator constructor");
if (FLAGS_management_port) {
spdlog::info("Coordinator server port set");
auto const config = ManagementServerConfig{
CoordinatorState::CoordinatorState(InstanceInitConfig const &config) {
if (std::holds_alternative<ReplicationInstanceInitConfig>(config.data_)) {
auto const mgmt_config = ManagementServerConfig{
.ip_address = kDefaultReplicationServerIp,
.port = static_cast<uint16_t>(FLAGS_management_port),
.port = static_cast<uint16_t>(std::get<ReplicationInstanceInitConfig>(config.data_).management_port),
};
spdlog::info("Executing coordinator constructor main replica");
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(mgmt_config)};
}
}

View File

@ -22,9 +22,22 @@
namespace memgraph::coordination {
struct ReplicationInstanceInitConfig {
uint32_t management_port{0};
};
struct CoordinatorInstanceInitConfig {
uint32_t raft_server_id{0};
uint32_t coordinator_port{0};
};
struct InstanceInitConfig {
std::variant<ReplicationInstanceInitConfig, CoordinatorInstanceInitConfig> data_;
};
class CoordinatorState {
public:
CoordinatorState();
explicit CoordinatorState(InstanceInitConfig const &config);
~CoordinatorState() = default;
CoordinatorState(CoordinatorState const &) = delete;

View File

@ -45,7 +45,7 @@ auto RaftState::InitRaftServer() -> void {
asio_opts.thread_pool_size_ = 1;
raft_params params;
params.heart_beat_interval_ = 100;
params.heart_beat_interval_ = 150;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
params.reserved_log_items_ = 5;

View File

@ -407,7 +407,17 @@ int main(int argc, char **argv) {
// singleton coordinator state
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState coordinator_state;
memgraph::coordination::InstanceInitConfig config;
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
config.data_ = memgraph::coordination::CoordinatorInstanceInitConfig {
.raft_server_id = FLAGS_coordinator_id, .coordinator_port = FLAGS_coordinator_port
}
} else if (FLAGS_management_port) {
config.data_ = memgraph::coordination::ReplicationInstanceInitConfig { .management_port = FLAGS_management_port }
} else {
throw std::runtime_error("Coordinator or replica must be started with valid flags!");
}
memgraph::coordination::CoordinatorState coordinator_state{config};
#endif
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state

View File

@ -47,5 +47,5 @@ func main() {
read_messages("neo4j://localhost:7690") // coordinator_1
read_messages("neo4j://localhost:7691") // coordinator_2
read_messages("neo4j://localhost:7692") // coordinator_3
fmt.Println("Successfully finished running coordinator_route.go test")
fmt.Println("Successfully finished running read_route.go test")
}

View File

@ -467,3 +467,10 @@ add_unit_test(coordinator_state_machine.cpp)
target_link_libraries(${test_prefix}coordinator_state_machine gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordinator_state_machine PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
# Test RAFT state
if(MG_ENTERPRISE)
add_unit_test(raft_state.cpp)
target_link_libraries(${test_prefix}raft_state gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}raft_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()

42
tests/unit/raft_state.cpp Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/raft_state.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
#include "libnuraft/nuraft.hxx"
using memgraph::coordination::RaftState;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
class RaftStateTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_state"};
};
TEST_F(RaftStateTest, CreationOfRaftState) {
auto become_leader_cb = []() {};
auto become_follower_cb = []() {};
auto raft_state = RaftState::MakeRaftState(std::move(become_leader_cb), std::move(become_follower_cb));
spdlog::info("Raft state instance name: {}", raft_state.InstanceName());
spdlog::info("Raft state socket address: {}", raft_state.RaftSocketAddress());
}