Callbacks for leadership change

This commit is contained in:
Andi Skrgat 2024-02-06 11:42:03 +01:00
parent 7386b786a9
commit 17ad671773
7 changed files with 26 additions and 9 deletions

View File

@ -26,7 +26,9 @@ namespace memgraph::coordination {
using nuraft::ptr;
using nuraft::srv_config;
CoordinatorInstance::CoordinatorInstance() {
CoordinatorInstance::CoordinatorInstance()
: self_([this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); }) {
auto find_instance = [](CoordinatorInstance *coord_instance,
std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
@ -219,10 +221,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
}
try {
auto *repl_instance = &repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
if (self_.IsLeader()) {
repl_instance->StartFrequentCheck();
}
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {

View File

@ -24,6 +24,7 @@
#include <list>
namespace memgraph::coordination {
class CoordinatorInstance {
public:
CoordinatorInstance();

View File

@ -19,6 +19,9 @@
namespace memgraph::coordination {
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
@ -29,7 +32,8 @@ using nuraft::state_mgr;
class RaftInstance {
public:
RaftInstance();
RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb);
RaftInstance(RaftInstance const &other) = delete;
RaftInstance &operator=(RaftInstance const &other) = delete;
RaftInstance(RaftInstance &&other) noexcept = delete;
@ -55,6 +59,9 @@ class RaftInstance {
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
BecomeLeaderCb become_leader_cb_;
BecomeFollowerCb become_follower_cb_;
};
} // namespace memgraph::coordination

View File

@ -52,6 +52,7 @@ class ReplicationInstance {
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto StartFrequentCheck() -> void;
auto StopFrequentCheck() -> void;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;

View File

@ -31,8 +31,12 @@ using nuraft::raft_server;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
RaftInstance::RaftInstance()
: raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb)
: raft_server_id_(FLAGS_raft_server_id),
raft_port_(FLAGS_raft_server_port),
raft_address_("127.0.0.1"),
become_leader_cb_(std::move(become_leader_cb)),
become_follower_cb_(std::move(become_follower_cb)) {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
@ -55,11 +59,13 @@ RaftInstance::RaftInstance()
params.return_method_ = raft_params::blocking;
raft_server::init_options init_opts;
init_opts.raft_callback_ = [](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_();
} else if (event_type == cb_func::BecomeFollower) {
spdlog::info("Node {} became follower", param->myId);
become_follower_cb_();
}
return CbReturnCode::Ok;
};

View File

@ -75,6 +75,7 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, H
}
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }

View File

@ -111,6 +111,8 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
# TODO: (andi) Test that the order of setting up coordinators and instances does not matter
# TODO: (andi) Currently, these tests are flaky, depend whether Raft server was created in time.
def test_coordinators_communication():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)