diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 18ff0336e..2af3dbd21 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -26,7 +26,9 @@ namespace memgraph::coordination { using nuraft::ptr; using nuraft::srv_config; -CoordinatorInstance::CoordinatorInstance() { +CoordinatorInstance::CoordinatorInstance() + : self_([this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); }, + [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); }) { auto find_instance = [](CoordinatorInstance *coord_instance, std::string_view instance_name) -> ReplicationInstance & { auto instance = std::ranges::find_if( @@ -219,10 +221,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co } try { - auto *repl_instance = &repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); - if (self_.IsLeader()) { - repl_instance->StartFrequentCheck(); - } + repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); return RegisterInstanceCoordinatorStatus::SUCCESS; } catch (CoordinatorRegisterInstanceException const &) { diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 468b473ae..615f66e85 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -24,6 +24,7 @@ #include namespace memgraph::coordination { + class CoordinatorInstance { public: CoordinatorInstance(); diff --git a/src/coordination/include/coordination/raft_instance.hpp b/src/coordination/include/coordination/raft_instance.hpp index a851cd216..426fb7148 100644 --- a/src/coordination/include/coordination/raft_instance.hpp +++ b/src/coordination/include/coordination/raft_instance.hpp @@ -19,6 +19,9 @@ namespace memgraph::coordination { +using BecomeLeaderCb = std::function; +using BecomeFollowerCb = std::function; + using nuraft::logger; using nuraft::ptr; using nuraft::raft_launcher; @@ -29,7 +32,8 @@ using nuraft::state_mgr; class RaftInstance { public: - RaftInstance(); + RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb); + RaftInstance(RaftInstance const &other) = delete; RaftInstance &operator=(RaftInstance const &other) = delete; RaftInstance(RaftInstance &&other) noexcept = delete; @@ -55,6 +59,9 @@ class RaftInstance { uint32_t raft_server_id_; uint32_t raft_port_; std::string raft_address_; + + BecomeLeaderCb become_leader_cb_; + BecomeFollowerCb become_follower_cb_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index f090bdfd4..eb9eb29fa 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -52,6 +52,7 @@ class ReplicationInstance { auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool; auto StartFrequentCheck() -> void; + auto StopFrequentCheck() -> void; auto PauseFrequentCheck() -> void; auto ResumeFrequentCheck() -> void; diff --git a/src/coordination/raft_instance.cpp b/src/coordination/raft_instance.cpp index dec80ad6d..f8045809a 100644 --- a/src/coordination/raft_instance.cpp +++ b/src/coordination/raft_instance.cpp @@ -31,8 +31,12 @@ using nuraft::raft_server; using nuraft::srv_config; using raft_result = cmd_result>; -RaftInstance::RaftInstance() - : raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") { +RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) + : raft_server_id_(FLAGS_raft_server_id), + raft_port_(FLAGS_raft_server_port), + raft_address_("127.0.0.1"), + become_leader_cb_(std::move(become_leader_cb)), + become_follower_cb_(std::move(become_follower_cb)) { auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_); state_manager_ = cs_new(raft_server_id_, raft_endpoint); state_machine_ = cs_new(); @@ -55,11 +59,13 @@ RaftInstance::RaftInstance() params.return_method_ = raft_params::blocking; raft_server::init_options init_opts; - init_opts.raft_callback_ = [](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { + init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { if (event_type == cb_func::BecomeLeader) { spdlog::info("Node {} became leader", param->leaderId); + become_leader_cb_(); } else if (event_type == cb_func::BecomeFollower) { spdlog::info("Node {} became follower", param->myId); + become_follower_cb_(); } return CbReturnCode::Ok; }; diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index d28d545d7..73d6baa44 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -75,6 +75,7 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, H } auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); } +auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); } auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } diff --git a/tests/e2e/high_availability_experimental/distributed_coordinators.py b/tests/e2e/high_availability_experimental/distributed_coordinators.py index 3b0556ee7..7062edf56 100644 --- a/tests/e2e/high_availability_experimental/distributed_coordinators.py +++ b/tests/e2e/high_availability_experimental/distributed_coordinators.py @@ -111,6 +111,8 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { } +# TODO: (andi) Test that the order of setting up coordinators and instances does not matter +# TODO: (andi) Currently, these tests are flaky, depend whether Raft server was created in time. def test_coordinators_communication(): safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)