Fix launcher.shutdown in NuRaft
This commit is contained in:
parent
5d5c39f862
commit
16b3df3b77
@ -28,7 +28,7 @@ namespace memgraph::coordination {
|
|||||||
|
|
||||||
using nuraft::ptr;
|
using nuraft::ptr;
|
||||||
|
|
||||||
CoordinatorInstance::CoordinatorInstance()
|
CoordinatorInstance::CoordinatorInstance(CoordinationInstanceInitConfig const &config)
|
||||||
: thread_pool_{1},
|
: thread_pool_{1},
|
||||||
raft_state_(RaftState::MakeRaftState(
|
raft_state_(RaftState::MakeRaftState(
|
||||||
[this]() {
|
[this]() {
|
||||||
@ -88,6 +88,7 @@ CoordinatorInstance::CoordinatorInstance()
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
||||||
-> ReplicationInstanceConnector & {
|
-> ReplicationInstanceConnector & {
|
||||||
auto repl_instance =
|
auto repl_instance =
|
||||||
@ -442,6 +443,8 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||||
|
spdlog::trace("Adding coordinator instance {} start in CoordinatorInstance for {}", config.coordinator_id,
|
||||||
|
raft_state_.InstanceName());
|
||||||
raft_state_.AddCoordinatorInstance(config);
|
raft_state_.AddCoordinatorInstance(config);
|
||||||
// NOTE: We ignore error we added coordinator instance to networking stuff but not in raft log.
|
// NOTE: We ignore error we added coordinator instance to networking stuff but not in raft log.
|
||||||
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
|
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
|
||||||
|
@ -41,7 +41,7 @@ class CoordinatorInstance {
|
|||||||
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
|
||||||
CoordinatorInstance &operator=(CoordinatorInstance &&) noexcept = delete;
|
CoordinatorInstance &operator=(CoordinatorInstance &&) noexcept = delete;
|
||||||
|
|
||||||
~CoordinatorInstance() = default;
|
~CoordinatorInstance();
|
||||||
|
|
||||||
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
||||||
-> RegisterInstanceCoordinatorStatus;
|
-> RegisterInstanceCoordinatorStatus;
|
||||||
|
@ -100,9 +100,9 @@ class RaftState {
|
|||||||
|
|
||||||
ptr<CoordinatorStateMachine> state_machine_;
|
ptr<CoordinatorStateMachine> state_machine_;
|
||||||
ptr<CoordinatorStateManager> state_manager_;
|
ptr<CoordinatorStateManager> state_manager_;
|
||||||
ptr<raft_server> raft_server_;
|
|
||||||
ptr<logger> logger_;
|
ptr<logger> logger_;
|
||||||
raft_launcher launcher_;
|
raft_launcher launcher_;
|
||||||
|
ptr<raft_server> raft_server_;
|
||||||
|
|
||||||
BecomeLeaderCb become_leader_cb_;
|
BecomeLeaderCb become_leader_cb_;
|
||||||
BecomeFollowerCb become_follower_cb_;
|
BecomeFollowerCb become_follower_cb_;
|
||||||
|
@ -109,13 +109,26 @@ auto RaftState::MakeRaftState(CoordinatorInstanceInitConfig const &config, Becom
|
|||||||
return raft_state;
|
return raft_state;
|
||||||
}
|
}
|
||||||
|
|
||||||
RaftState::~RaftState() { launcher_.shutdown(); }
|
RaftState::~RaftState() {
|
||||||
|
spdlog::trace("Shutting down RaftState for coordinator_{}", coordinator_id_);
|
||||||
|
state_machine_.reset();
|
||||||
|
state_manager_.reset();
|
||||||
|
logger_.reset();
|
||||||
|
|
||||||
|
if (!raft_server_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
raft_server_->shutdown();
|
||||||
|
raft_server_.reset();
|
||||||
|
}
|
||||||
|
|
||||||
auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); }
|
auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); }
|
||||||
|
|
||||||
auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); }
|
auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); }
|
||||||
|
|
||||||
auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||||
|
spdlog::trace("Adding coordinator instance {} start in RaftState for coordinator_{}", config.coordinator_id,
|
||||||
|
coordinator_id_);
|
||||||
auto const endpoint = config.coordinator_server.SocketAddress();
|
auto const endpoint = config.coordinator_server.SocketAddress();
|
||||||
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_id), endpoint);
|
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_id), endpoint);
|
||||||
|
|
||||||
|
@ -46,28 +46,21 @@ class CoordinatorInstanceTest : public ::testing::Test {
|
|||||||
"MG_tests_unit_coordinator_instance"};
|
"MG_tests_unit_coordinator_instance"};
|
||||||
};
|
};
|
||||||
|
|
||||||
// TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) {
|
TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) {
|
||||||
// auto const init_config =
|
auto const init_config =
|
||||||
// CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
|
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
|
||||||
//
|
|
||||||
// auto const instance1 = CoordinatorInstance{init_config};
|
|
||||||
// auto const instances = instance1.ShowInstances();
|
|
||||||
// ASSERT_EQ(instances.size(), 1);
|
|
||||||
// ASSERT_EQ(instances[0].instance_name, "coordinator_4");
|
|
||||||
// ASSERT_EQ(instances[0].health, "unknown");
|
|
||||||
// ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110");
|
|
||||||
// ASSERT_EQ(instances[0].coord_socket_address, "");
|
|
||||||
// ASSERT_EQ(instances[0].cluster_role, "coordinator");
|
|
||||||
// }
|
|
||||||
|
|
||||||
int KillCode() {
|
auto const instance1 = CoordinatorInstance{init_config};
|
||||||
int *arr = new int[100];
|
auto const instances = instance1.ShowInstances();
|
||||||
delete[] arr;
|
ASSERT_EQ(instances.size(), 1);
|
||||||
return arr[0];
|
ASSERT_EQ(instances[0].instance_name, "coordinator_4");
|
||||||
|
ASSERT_EQ(instances[0].health, "unknown");
|
||||||
|
ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110");
|
||||||
|
ASSERT_EQ(instances[0].coord_socket_address, "");
|
||||||
|
ASSERT_EQ(instances[0].cluster_role, "coordinator");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(CoordinatorInstanceTest, ConnectCoordinators) {
|
TEST_F(CoordinatorInstanceTest, ConnectCoordinators) {
|
||||||
int b = KillCode();
|
|
||||||
auto const init_config1 =
|
auto const init_config1 =
|
||||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687};
|
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user