Request leadership on registering instance

This commit is contained in:
Andi Skrgat 2024-02-06 14:32:57 +01:00
parent 17ad671773
commit 1ecf6ddab2
9 changed files with 81 additions and 2 deletions

View File

@ -42,6 +42,10 @@ auto CoordinatorClient::InstanceName() const -> std::string { return config_.ins
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
void CoordinatorClient::StartFrequentCheck() {
if (instance_checker_.IsRunning()) {
return;
}
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");

View File

@ -220,13 +220,33 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
if (!self_.RequestLeadership()) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
// auto const res = self_.AppendRegisterReplicationInstance(config.instance_name);
// if (res->get_accepted()) {
// spdlog::info("Request for registering instance {} accepted", config.instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
// } else {
// spdlog::error(
// "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not the
// " "leader.", config.instance_name);
// return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
// }
// if (res->get_result_code() != nuraft::cmd_result_code::OK) {
// spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
// return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
// }
spdlog::info("Instance {} registered", config.instance_name);
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)

View File

@ -50,5 +50,16 @@ class RaftAddServerException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException)
};
class RaftBecomeLeaderException final : public utils::BasicException {
public:
explicit RaftBecomeLeaderException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftBecomeLeaderException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftBecomeLeaderException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftBecomeLeaderException)
};
} // namespace memgraph::coordination
#endif

View File

@ -22,6 +22,7 @@ namespace memgraph::coordination {
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
using nuraft::buffer;
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
@ -29,6 +30,7 @@ using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
using raft_result = nuraft::cmd_result<ptr<buffer>>;
class RaftInstance {
public:
@ -46,8 +48,11 @@ class RaftInstance {
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result>;
private:
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;

View File

@ -22,6 +22,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
ENDPOINT_EXISTS,
NOT_COORDINATOR,
RPC_FAILED,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
SUCCESS
};

View File

@ -36,6 +36,19 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
}
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string {
buffer_serializer bs(data);
return bs.get_str();
}
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override;

View File

@ -72,6 +72,7 @@ RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb bec
raft_server_ = launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts,
params, init_opts);
raft_server_->request_leadership();
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
@ -111,5 +112,17 @@ auto RaftInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
auto RaftInstance::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftInstance::RequestLeadership() -> bool {
if (!raft_server_->is_leader()) {
raft_server_->request_leadership();
}
return raft_server_->is_leader();
}
auto RaftInstance::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> {
auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance);
return raft_server_->append_entries({new_log});
}
} // namespace memgraph::coordination
#endif

View File

@ -25,6 +25,8 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
auto ReplicationInstance::OnSuccessPing() -> void {

View File

@ -500,6 +500,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"Couldn't register replica instance since instance with such endpoint already exists!");
case NOT_COORDINATOR:
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "