HA coordinators

Andi Skrgat 2024-02-27 13:43:51 +01:00
parent 2b3ae953b1
commit f6f29107a6
12 changed files with 300 additions and 1319 deletions

View File

@@ -41,7 +41,9 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
fail_cb_{std::move(fail_cb)} {}
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
-auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
+auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
+auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
return config_.instance_down_timeout_sec;
@@ -64,7 +66,7 @@ void CoordinatorClient::StartFrequentCheck() {
[this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
-rpc_client_.Endpoint().SocketAddress());
+config_.CoordinatorSocketAddress());
{ // NOTE: This is intentionally scoped so that stream lock could get released.
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();

File diff suppressed because it is too large.

View File

@@ -46,7 +46,8 @@ class CoordinatorClient {
void ResumeFrequentCheck();
auto InstanceName() const -> std::string;
-auto SocketAddress() const -> std::string;
+auto CoordinatorSocketAddress() const -> std::string;
+auto ReplicationSocketAddress() const -> std::string;
[[nodiscard]] auto DemoteToReplica() const -> bool;

View File

@@ -35,7 +35,11 @@ struct CoordinatorClientConfig {
std::chrono::seconds instance_down_timeout_sec{5};
std::chrono::seconds instance_get_uuid_frequency_sec{10};
-auto SocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); }
+auto CoordinatorSocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); }
+auto ReplicationSocketAddress() const -> std::string {
+return fmt::format("{}:{}", replication_client_info.replication_ip_address,
+replication_client_info.replication_port);
+}
struct ReplicationClientInfo {
// TODO: (andi) Do we even need here instance_name for this struct?
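The config now exposes two distinct endpoints: the coordinator-to-instance management address built from ip_address and port, and the main-to-replica replication address taken from replication_client_info. A minimal, self-contained sketch of how the two helpers resolve to different addresses (field names follow the hunk above; types, default values, and the main() driver are assumed for illustration only):

    // Sketch only -- field names follow the hunk above, types/layout are assumed.
    #include <cstdint>
    #include <string>
    #include <fmt/format.h>

    struct ReplicationClientInfo {
      std::string replication_ip_address{"127.0.0.1"};
      uint16_t replication_port{10001};
    };

    struct CoordinatorClientConfig {
      std::string ip_address{"127.0.0.1"};  // management (coordinator-facing) endpoint
      uint16_t port{10011};
      ReplicationClientInfo replication_client_info;

      auto CoordinatorSocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); }
      auto ReplicationSocketAddress() const -> std::string {
        return fmt::format("{}:{}", replication_client_info.replication_ip_address,
                           replication_client_info.replication_port);
      }
    };

    int main() {
      CoordinatorClientConfig config;
      fmt::print("coordinator endpoint: {}\n", config.CoordinatorSocketAddress());  // 127.0.0.1:10011
      fmt::print("replication endpoint: {}\n", config.ReplicationSocketAddress());  // 127.0.0.1:10001
    }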

View File

@@ -48,6 +48,8 @@ class CoordinatorInstance {
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
+static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
private:
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
@@ -63,10 +65,8 @@ class CoordinatorInstance {
void ReplicaFailCallback(std::string_view);
-static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
-// NOTE: Only leader should have repl_instances_, not followers.
// NOTE: Must be std::list because we rely on pointer stability.
+// Leader and followers should both have same view on repl_instances_
std::list<ReplicationInstance> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{};
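The retained NOTE about std::list is the load-bearing detail here: code elsewhere presumably holds raw pointers or references to individual ReplicationInstance objects (e.g. from callbacks), and std::list, unlike std::vector, never relocates its elements when the container grows. A standalone illustration of that pointer-stability guarantee (generic C++, not Memgraph code):

    // Minimal illustration of why a std::list is used: element addresses stay stable.
    #include <functional>
    #include <iostream>
    #include <list>
    #include <string>

    struct Instance { std::string name; };

    int main() {
      std::list<Instance> instances;
      instances.push_back({"instance_1"});
      Instance *first = &instances.front();

      // A callback captures a raw pointer into the list.
      std::function<void()> fail_cb = [first] { std::cout << first->name << " is down\n"; };

      // Growing the list does not invalidate 'first'; a std::vector gives no such guarantee.
      for (int i = 2; i <= 100; ++i) instances.push_back({"instance_" + std::to_string(i)});
      fail_cb();  // still safe: prints "instance_1 is down"
    }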

View File

@@ -53,7 +53,7 @@ class RaftState {
RaftState &operator=(RaftState &&other) noexcept = default;
~RaftState();
-static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb,
+static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb,
OnRaftCommitCb raft_commit_cb) -> RaftState;
auto InstanceName() const -> std::string;
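MakeRaftState now takes the two callbacks by rvalue reference, which lets the factory move them into the RaftState it constructs instead of copying whatever state they capture. A simplified sketch of that pattern (the type aliases and constructor shape are assumptions; the real factory also takes an OnRaftCommitCb):

    // Sketch of the move-into-factory pattern, not the actual implementation.
    #include <functional>
    #include <utility>

    using BecomeLeaderCb = std::function<void()>;
    using BecomeFollowerCb = std::function<void()>;

    class RaftState {
     public:
      static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState {
        // Rvalue-reference parameters allow moving the callbacks onward without a copy.
        return RaftState{std::move(become_leader_cb), std::move(become_follower_cb)};
      }

     private:
      RaftState(BecomeLeaderCb leader_cb, BecomeFollowerCb follower_cb)
          : become_leader_cb_{std::move(leader_cb)}, become_follower_cb_{std::move(follower_cb)} {}

      BecomeLeaderCb become_leader_cb_;
      BecomeFollowerCb become_follower_cb_;
    };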

View File

@@ -19,9 +19,9 @@ namespace memgraph::coordination {
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
-ENDPOINT_EXISTS,
+COORD_ENDPOINT_EXISTS,
+REPL_ENDPOINT_EXISTS,
NOT_COORDINATOR,
-RPC_FAILED,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,

View File

@@ -51,7 +51,8 @@ class ReplicationInstance {
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string;
-auto SocketAddress() const -> std::string;
+auto CoordinatorSocketAddress() const -> std::string;
+auto ReplicationSocketAddress() const -> std::string;
auto PromoteToMainAsLeader(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb)
@@ -62,8 +63,10 @@ class ReplicationInstance {
auto DemoteToReplicaAsLeader(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
+auto DemoteToReplicaAsFollower(HealthCheckInstanceCallback replica_succ_cb,
+HealthCheckInstanceCallback replica_fail_cb) -> void;
auto SendDemoteToReplicaRpc() -> bool;
auto StartFrequentCheck() -> void;
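The demote path is now split per role: the leader variant returns bool and can fail, the follower variant returns void, and SendDemoteToReplicaRpc() isolates the network call. One plausible reading of these declarations, sketched with hypothetical members and a stubbed RPC (not the actual implementation):

    // Hypothetical sketch of the leader/follower split suggested by the declarations above.
    #include <functional>
    #include <utility>

    using HealthCheckInstanceCallback = std::function<void()>;

    class ReplicationInstanceSketch {
     public:
      auto DemoteToReplicaAsLeader(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb) -> bool {
        if (!SendDemoteToReplicaRpc()) return false;  // only the leader contacts the instance
        succ_cb_ = std::move(succ_cb);
        fail_cb_ = std::move(fail_cb);
        return true;
      }

      auto DemoteToReplicaAsFollower(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb) -> void {
        // No RPC here: a follower only mirrors the bookkeeping so its view matches the leader's.
        succ_cb_ = std::move(succ_cb);
        fail_cb_ = std::move(fail_cb);
      }

     private:
      auto SendDemoteToReplicaRpc() -> bool { return true; }  // stand-in for the real RPC call

      HealthCheckInstanceCallback succ_cb_;
      HealthCheckInstanceCallback fail_cb_;
    };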

View File

@@ -90,7 +90,7 @@ auto RaftState::InitRaftServer() -> void {
throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_);
}
-auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb,
+auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb,
OnRaftCommitCb raft_commit_cb) -> RaftState {
uint32_t raft_server_id{0};
uint32_t raft_port{0};

View File

@@ -45,7 +45,8 @@ auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
}
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
-auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
+auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); }
+auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); }
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstance::PromoteToMainAsLeader(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,

View File

@@ -478,9 +478,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!");
-case ENDPOINT_EXISTS:
+case COORD_ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
"Couldn't register replica instance since instance with such coordinator endpoint already exists!");
case REPL_ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such replication endpoint already exists!");
case NOT_COORDINATOR:
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
@@ -491,10 +494,6 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
-case RPC_FAILED:
-throw QueryRuntimeException(
-"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
-"find out more info!");
case SUCCESS:
break;
}

View File

@@ -1302,7 +1302,10 @@ def test_registering_replica_fails_endpoint_exists():
coord_cursor,
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';",
)
-assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"
+assert (
+str(e.value)
+== "Couldn't register replica instance since instance with such coordinator endpoint already exists!"
+)
def test_replica_instance_restarts():