CoordinatorInstance testing
This commit is contained in:
parent
ef85e8ae61
commit
03f5cc2f33
@ -30,8 +30,10 @@ auto CreateClientContext(memgraph::coordination::CoordinatorToReplicaConfig cons
|
||||
}
|
||||
} // namespace
|
||||
|
||||
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
|
||||
ReplicationInstanceClient::ReplicationInstanceClient(CoordinatorInstance *coord_instance,
|
||||
CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb)
|
||||
: rpc_context_{CreateClientContext(config)},
|
||||
rpc_client_{config.mgt_server, &rpc_context_},
|
||||
config_{std::move(config)},
|
||||
@ -39,20 +41,24 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
|
||||
succ_cb_{std::move(succ_cb)},
|
||||
fail_cb_{std::move(fail_cb)} {}
|
||||
|
||||
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
|
||||
auto ReplicationInstanceClient::InstanceName() const -> std::string { return config_.instance_name; }
|
||||
|
||||
auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
|
||||
auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
|
||||
auto ReplicationInstanceClient::CoordinatorSocketAddress() const -> std::string {
|
||||
return config_.CoordinatorSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceClient::ReplicationSocketAddress() const -> std::string {
|
||||
return config_.ReplicationSocketAddress();
|
||||
}
|
||||
|
||||
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
|
||||
auto ReplicationInstanceClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
|
||||
return config_.instance_down_timeout_sec;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
|
||||
auto ReplicationInstanceClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
|
||||
return config_.instance_get_uuid_frequency_sec;
|
||||
}
|
||||
|
||||
void CoordinatorClient::StartFrequentCheck() {
|
||||
void ReplicationInstanceClient::StartFrequentCheck() {
|
||||
if (instance_checker_.IsRunning()) {
|
||||
return;
|
||||
}
|
||||
@ -81,16 +87,17 @@ void CoordinatorClient::StartFrequentCheck() {
|
||||
});
|
||||
}
|
||||
|
||||
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
|
||||
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
|
||||
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
|
||||
void ReplicationInstanceClient::StopFrequentCheck() { instance_checker_.Stop(); }
|
||||
void ReplicationInstanceClient::PauseFrequentCheck() { instance_checker_.Pause(); }
|
||||
void ReplicationInstanceClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
|
||||
|
||||
auto CoordinatorClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
auto ReplicationInstanceClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
return config_.replication_client_info;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
ReplicationClientsInfo replication_clients_info) const -> bool {
|
||||
auto ReplicationInstanceClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
ReplicationClientsInfo replication_clients_info) const
|
||||
-> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(uuid, std::move(replication_clients_info))};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -104,7 +111,7 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::DemoteToReplica() const -> bool {
|
||||
auto ReplicationInstanceClient::DemoteToReplica() const -> bool {
|
||||
auto const &instance_name = config_.instance_name;
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)};
|
||||
@ -120,7 +127,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
|
||||
auto ReplicationInstanceClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -134,7 +141,7 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bo
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
|
||||
auto ReplicationInstanceClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -148,7 +155,7 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name)
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendGetInstanceUUIDRpc() const
|
||||
auto ReplicationInstanceClient::SendGetInstanceUUIDRpc() const
|
||||
-> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
|
||||
@ -160,7 +167,7 @@ auto CoordinatorClient::SendGetInstanceUUIDRpc() const
|
||||
}
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
auto ReplicationInstanceClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()};
|
||||
if (!stream.AwaitResponse().success) {
|
||||
@ -174,7 +181,7 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
|
||||
auto ReplicationInstanceClient::SendGetInstanceTimestampsRpc() const
|
||||
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
|
||||
|
@ -27,7 +27,6 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using nuraft::ptr;
|
||||
using nuraft::srv_config;
|
||||
|
||||
CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config)
|
||||
: thread_pool_{1},
|
||||
@ -90,9 +89,10 @@ CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &co
|
||||
};
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
|
||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
|
||||
-> ReplicationInstanceConnector & {
|
||||
auto repl_instance =
|
||||
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
|
||||
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == replication_instance_name;
|
||||
});
|
||||
|
||||
@ -111,18 +111,18 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status);
|
||||
|
||||
if (raft_state_.IsLeader()) {
|
||||
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
||||
auto const stringify_repl_role = [this](ReplicationInstanceConnector const &instance) -> std::string {
|
||||
if (!instance.IsAlive()) return "unknown";
|
||||
if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main";
|
||||
return "replica";
|
||||
};
|
||||
|
||||
auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
|
||||
auto const stringify_repl_health = [](ReplicationInstanceConnector const &instance) -> std::string {
|
||||
return instance.IsAlive() ? "up" : "down";
|
||||
};
|
||||
|
||||
auto process_repl_instance_as_leader =
|
||||
[&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
|
||||
[&stringify_repl_role, &stringify_repl_health](ReplicationInstanceConnector const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.InstanceName(),
|
||||
.coord_socket_address = instance.CoordinatorSocketAddress(),
|
||||
.cluster_role = stringify_repl_role(instance),
|
||||
@ -161,19 +161,26 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::TryFailover() -> void {
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||
auto const is_replica = [this](ReplicationInstanceConnector const &instance) {
|
||||
return HasReplicaState(instance.InstanceName());
|
||||
};
|
||||
|
||||
auto alive_replicas =
|
||||
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
|
||||
auto alive_replicas = repl_instances_ | ranges::views::filter(is_replica) |
|
||||
ranges::views::filter(&ReplicationInstanceConnector::IsAlive);
|
||||
|
||||
if (ranges::empty(alive_replicas)) {
|
||||
spdlog::warn("Failover failed since all replicas are down!");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
spdlog::error("Failover failed since the instance is not the leader!");
|
||||
return;
|
||||
}
|
||||
|
||||
auto const get_ts = [](ReplicationInstanceConnector &replica) {
|
||||
return replica.GetClient().SendGetInstanceTimestampsRpc();
|
||||
};
|
||||
|
||||
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
||||
|
||||
@ -207,13 +214,13 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
|
||||
auto const is_not_new_main = [&new_main](ReplicationInstanceConnector &instance) {
|
||||
return instance.InstanceName() != new_main->InstanceName();
|
||||
};
|
||||
|
||||
auto const new_main_uuid = utils::UUID{};
|
||||
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
|
||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||
};
|
||||
@ -226,7 +233,7 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
}
|
||||
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
@ -269,7 +276,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const is_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
};
|
||||
|
||||
@ -288,13 +295,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const is_not_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() != instance_name;
|
||||
};
|
||||
|
||||
auto const new_main_uuid = utils::UUID{};
|
||||
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
|
||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||
};
|
||||
@ -305,7 +312,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
}
|
||||
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
@ -335,19 +342,20 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
||||
return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
})) {
|
||||
if (std::ranges::any_of(repl_instances_,
|
||||
[instance_name = config.instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
|
||||
return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress();
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS;
|
||||
}
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
|
||||
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
|
||||
return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress();
|
||||
})) {
|
||||
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
|
||||
@ -393,7 +401,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
|
||||
auto const name_matches = [&instance_name](ReplicationInstanceConnector const &instance) {
|
||||
return instance.InstanceName() == instance_name;
|
||||
};
|
||||
|
||||
@ -402,7 +410,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||
}
|
||||
|
||||
auto const is_current_main = [this](ReplicationInstance const &instance) {
|
||||
auto const is_current_main = [this](ReplicationInstanceConnector const &instance) {
|
||||
return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive();
|
||||
};
|
||||
|
||||
@ -626,55 +634,10 @@ auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const
|
||||
return raft_state_.HasReplicaState(instance_name);
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
|
||||
auto res = RoutingTable{};
|
||||
|
||||
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
|
||||
return instance.config.BoltSocketAddress();
|
||||
};
|
||||
|
||||
// TODO: (andi) This is wrong check, Fico will correct in #1819.
|
||||
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::MAIN;
|
||||
};
|
||||
|
||||
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
auto const &raft_log_repl_instances = raft_state_.GetReplicationInstances();
|
||||
|
||||
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
|
||||
|
||||
if (!std::ranges::empty(bolt_mains)) {
|
||||
res.emplace_back(std::move(bolt_mains), "WRITE");
|
||||
}
|
||||
|
||||
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
if (!std::ranges::empty(bolt_replicas)) {
|
||||
res.emplace_back(std::move(bolt_replicas), "READ");
|
||||
}
|
||||
|
||||
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
|
||||
return instance.config.bolt_server.SocketAddress();
|
||||
};
|
||||
|
||||
auto const &raft_log_coord_instances = raft_state_.GetCoordinatorInstances();
|
||||
auto bolt_coords =
|
||||
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
|
||||
|
||||
auto const &local_bolt_coord = routing.find("address");
|
||||
if (local_bolt_coord == routing.end()) {
|
||||
throw InvalidRoutingTableException("No bolt address found in routing table for the current coordinator!");
|
||||
}
|
||||
|
||||
bolt_coords.push_back(local_bolt_coord->second);
|
||||
res.emplace_back(std::move(bolt_coords), "ROUTE");
|
||||
|
||||
return res;
|
||||
// TODO: (andi) Remove accepting param
|
||||
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const & /*routing*/) const
|
||||
-> RoutingTable {
|
||||
return raft_state_.GetRoutingTable();
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -27,18 +27,18 @@ class CoordinatorInstance;
|
||||
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
|
||||
using ReplicationClientsInfo = std::vector<ReplicationClientInfo>;
|
||||
|
||||
class CoordinatorClient {
|
||||
class ReplicationInstanceClient {
|
||||
public:
|
||||
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
|
||||
explicit ReplicationInstanceClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
|
||||
|
||||
~CoordinatorClient() = default;
|
||||
~ReplicationInstanceClient() = default;
|
||||
|
||||
CoordinatorClient(CoordinatorClient &) = delete;
|
||||
CoordinatorClient &operator=(CoordinatorClient const &) = delete;
|
||||
ReplicationInstanceClient(ReplicationInstanceClient &) = delete;
|
||||
ReplicationInstanceClient &operator=(ReplicationInstanceClient const &) = delete;
|
||||
|
||||
CoordinatorClient(CoordinatorClient &&) noexcept = delete;
|
||||
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
|
||||
ReplicationInstanceClient(ReplicationInstanceClient &&) noexcept = delete;
|
||||
ReplicationInstanceClient &operator=(ReplicationInstanceClient &&) noexcept = delete;
|
||||
|
||||
void StartFrequentCheck();
|
||||
void StopFrequentCheck();
|
||||
@ -73,7 +73,7 @@ class CoordinatorClient {
|
||||
|
||||
auto InstanceGetUUIDFrequencySec() const -> std::chrono::seconds;
|
||||
|
||||
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
||||
friend bool operator==(ReplicationInstanceClient const &first, ReplicationInstanceClient const &second) {
|
||||
return first.config_ == second.config_;
|
||||
}
|
||||
|
||||
|
@ -26,8 +26,6 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
|
||||
|
||||
struct NewMainRes {
|
||||
std::string most_up_to_date_instance;
|
||||
std::string latest_epoch;
|
||||
@ -56,9 +54,9 @@ class CoordinatorInstance {
|
||||
|
||||
auto TryFailover() -> void;
|
||||
|
||||
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void;
|
||||
|
||||
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable;
|
||||
auto GetRoutingTable(std::map<std::string, std::string> const &routing) const -> RoutingTable;
|
||||
|
||||
static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;
|
||||
|
||||
@ -67,7 +65,7 @@ class CoordinatorInstance {
|
||||
auto HasReplicaState(std::string_view instance_name) const -> bool;
|
||||
|
||||
private:
|
||||
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
|
||||
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstanceConnector &;
|
||||
|
||||
void MainFailCallback(std::string_view);
|
||||
|
||||
@ -79,7 +77,8 @@ class CoordinatorInstance {
|
||||
|
||||
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
|
||||
// NOTE: Must be std::list because we rely on pointer stability.
|
||||
std::list<ReplicationInstance> repl_instances_;
|
||||
// TODO: (andi) Rename + virtualize for mocking.
|
||||
std::list<ReplicationInstanceConnector> repl_instances_;
|
||||
mutable utils::ResourceLock coord_instance_lock_{};
|
||||
|
||||
// Thread pool needs to be constructed before raft state as raft state can call thread pool
|
||||
|
@ -27,6 +27,7 @@ struct CoordinatorToReplicaConfig;
|
||||
|
||||
using BecomeLeaderCb = std::function<void()>;
|
||||
using BecomeFollowerCb = std::function<void()>;
|
||||
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
|
||||
|
||||
using nuraft::buffer;
|
||||
using nuraft::logger;
|
||||
@ -77,6 +78,7 @@ class RaftState {
|
||||
auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
|
||||
auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
|
||||
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
|
||||
auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
|
||||
|
||||
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
|
||||
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
|
||||
@ -90,6 +92,7 @@ class RaftState {
|
||||
auto GetInstanceUUID(std::string_view) const -> utils::UUID;
|
||||
|
||||
auto IsLockOpened() const -> bool;
|
||||
auto GetRoutingTable() const -> RoutingTable;
|
||||
|
||||
private:
|
||||
io::network::Endpoint raft_endpoint_;
|
||||
|
@ -26,21 +26,21 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorInstance;
|
||||
class ReplicationInstance;
|
||||
|
||||
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
|
||||
|
||||
class ReplicationInstance {
|
||||
class ReplicationInstanceConnector {
|
||||
public:
|
||||
ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb);
|
||||
ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb);
|
||||
|
||||
ReplicationInstance(ReplicationInstance const &other) = delete;
|
||||
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
|
||||
ReplicationInstance(ReplicationInstance &&other) noexcept = delete;
|
||||
ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete;
|
||||
~ReplicationInstance() = default;
|
||||
ReplicationInstanceConnector(ReplicationInstanceConnector const &other) = delete;
|
||||
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector const &other) = delete;
|
||||
ReplicationInstanceConnector(ReplicationInstanceConnector &&other) noexcept = delete;
|
||||
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector &&other) noexcept = delete;
|
||||
~ReplicationInstanceConnector() = default;
|
||||
|
||||
auto OnSuccessPing() -> void;
|
||||
auto OnFailPing() -> bool;
|
||||
@ -75,7 +75,7 @@ class ReplicationInstance {
|
||||
auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool;
|
||||
|
||||
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
|
||||
auto GetClient() -> CoordinatorClient &;
|
||||
auto GetClient() -> ReplicationInstanceClient &;
|
||||
|
||||
auto EnableWritingOnMain() -> bool;
|
||||
|
||||
@ -83,7 +83,7 @@ class ReplicationInstance {
|
||||
auto GetFailCallback() -> HealthCheckInstanceCallback &;
|
||||
|
||||
private:
|
||||
CoordinatorClient client_;
|
||||
ReplicationInstanceClient client_;
|
||||
std::chrono::system_clock::time_point last_response_time_{};
|
||||
bool is_alive_{false};
|
||||
std::chrono::system_clock::time_point last_check_of_uuid_{};
|
||||
@ -91,7 +91,7 @@ class ReplicationInstance {
|
||||
HealthCheckInstanceCallback succ_cb_;
|
||||
HealthCheckInstanceCallback fail_cb_;
|
||||
|
||||
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
|
||||
friend bool operator==(ReplicationInstanceConnector const &first, ReplicationInstanceConnector const &second) {
|
||||
return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
|
||||
first.is_alive_ == second.is_alive_;
|
||||
}
|
||||
|
@ -10,12 +10,16 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include <chrono>
|
||||
|
||||
#include "coordination/raft_state.hpp"
|
||||
|
||||
#include "coordination/coordinator_communication_config.hpp"
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/raft_state.hpp"
|
||||
#include "utils/counter.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <chrono>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
@ -45,7 +49,7 @@ auto RaftState::InitRaftServer() -> void {
|
||||
asio_opts.thread_pool_size_ = 1;
|
||||
|
||||
raft_params params;
|
||||
params.heart_beat_interval_ = 150;
|
||||
params.heart_beat_interval_ = 100;
|
||||
params.election_timeout_lower_bound_ = 200;
|
||||
params.election_timeout_upper_bound_ = 400;
|
||||
params.reserved_log_items_ = 5;
|
||||
@ -415,5 +419,50 @@ auto RaftState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstan
|
||||
return state_machine_->GetCoordinatorInstances();
|
||||
}
|
||||
|
||||
auto RaftState::GetRoutingTable() const -> RoutingTable {
|
||||
auto res = RoutingTable{};
|
||||
|
||||
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
|
||||
return instance.config.BoltSocketAddress();
|
||||
};
|
||||
|
||||
// TODO: (andi) This is wrong check, Fico will correct in #1819.
|
||||
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::MAIN;
|
||||
};
|
||||
|
||||
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
|
||||
return instance.status == ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
auto const &raft_log_repl_instances = GetReplicationInstances();
|
||||
|
||||
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
|
||||
|
||||
if (!std::ranges::empty(bolt_mains)) {
|
||||
res.emplace_back(std::move(bolt_mains), "WRITE");
|
||||
}
|
||||
|
||||
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
|
||||
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
|
||||
if (!std::ranges::empty(bolt_replicas)) {
|
||||
res.emplace_back(std::move(bolt_replicas), "READ");
|
||||
}
|
||||
|
||||
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
|
||||
return instance.config.bolt_server.SocketAddress();
|
||||
};
|
||||
|
||||
auto const &raft_log_coord_instances = GetCoordinatorInstances();
|
||||
auto bolt_coords =
|
||||
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
|
||||
|
||||
res.emplace_back(std::move(bolt_coords), "ROUTE");
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -20,38 +20,43 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb)
|
||||
ReplicationInstanceConnector::ReplicationInstanceConnector(CoordinatorInstance *peer, CoordinatorToReplicaConfig config,
|
||||
HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb)
|
||||
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
|
||||
succ_cb_(succ_instance_cb),
|
||||
fail_cb_(fail_instance_cb) {}
|
||||
|
||||
auto ReplicationInstance::OnSuccessPing() -> void {
|
||||
auto ReplicationInstanceConnector::OnSuccessPing() -> void {
|
||||
last_response_time_ = std::chrono::system_clock::now();
|
||||
is_alive_ = true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::OnFailPing() -> bool {
|
||||
auto ReplicationInstanceConnector::OnFailPing() -> bool {
|
||||
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
|
||||
is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec();
|
||||
return is_alive_;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
|
||||
auto ReplicationInstanceConnector::IsReadyForUUIDPing() -> bool {
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_) >
|
||||
client_.InstanceGetUUIDFrequencySec();
|
||||
}
|
||||
|
||||
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
|
||||
auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); }
|
||||
auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); }
|
||||
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
|
||||
auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_.InstanceName(); }
|
||||
auto ReplicationInstanceConnector::CoordinatorSocketAddress() const -> std::string {
|
||||
return client_.CoordinatorSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceConnector::ReplicationSocketAddress() const -> std::string {
|
||||
return client_.ReplicationSocketAddress();
|
||||
}
|
||||
auto ReplicationInstanceConnector::IsAlive() const -> bool { return is_alive_; }
|
||||
|
||||
auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb,
|
||||
HealthCheckInstanceCallback main_fail_cb) -> bool {
|
||||
auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb,
|
||||
HealthCheckInstanceCallback main_fail_cb) -> bool {
|
||||
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
|
||||
return false;
|
||||
}
|
||||
@ -62,10 +67,10 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
|
||||
auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
|
||||
|
||||
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
|
||||
HealthCheckInstanceCallback replica_fail_cb) -> bool {
|
||||
auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
|
||||
HealthCheckInstanceCallback replica_fail_cb) -> bool {
|
||||
if (!client_.DemoteToReplica()) {
|
||||
return false;
|
||||
}
|
||||
@ -76,21 +81,24 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_su
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
|
||||
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
|
||||
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
|
||||
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
|
||||
auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
|
||||
|
||||
auto ReplicationInstance::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
auto ReplicationInstanceConnector::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
|
||||
return client_.ReplicationClientInfo();
|
||||
}
|
||||
|
||||
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
|
||||
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
|
||||
auto ReplicationInstanceConnector::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
|
||||
auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
|
||||
|
||||
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
|
||||
auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return client_; }
|
||||
|
||||
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
|
||||
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
|
||||
|
||||
auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
if (!IsReadyForUUIDPing()) {
|
||||
return true;
|
||||
}
|
||||
@ -108,25 +116,24 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur
|
||||
return SendSwapAndUpdateUUID(curr_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
||||
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
|
||||
return replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
||||
auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
|
||||
return client_.SendUnregisterReplicaRpc(instance_name);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); }
|
||||
auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); }
|
||||
|
||||
auto ReplicationInstance::SendGetInstanceUUID()
|
||||
auto ReplicationInstanceConnector::SendGetInstanceUUID()
|
||||
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
return client_.SendGetInstanceUUIDRpc();
|
||||
}
|
||||
|
||||
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
|
||||
void ReplicationInstanceConnector::UpdateReplicaLastResponseUUID() {
|
||||
last_check_of_uuid_ = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -453,14 +453,6 @@ target_link_libraries(${test_prefix}coordinator_cluster_state gflags mg-coordina
|
||||
target_include_directories(${test_prefix}coordinator_cluster_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# TODO: (andi) Move into appropriate test suite
|
||||
# Test Routing table
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(routing_table.cpp)
|
||||
target_link_libraries(${test_prefix}routing_table gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}routing_table PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test coordinator state machine
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(coordinator_state_machine.cpp)
|
||||
@ -474,3 +466,10 @@ add_unit_test(raft_state.cpp)
|
||||
target_link_libraries(${test_prefix}raft_state gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}raft_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
# Test coordinator instance
|
||||
if(MG_ENTERPRISE)
|
||||
add_unit_test(coordinator_instance.cpp)
|
||||
target_link_libraries(${test_prefix}coordinator_instance gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}coordinator_instance PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
@ -16,6 +16,8 @@
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
#include "utils/functional.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
|
||||
class CoordinationUtils : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
@ -60,7 +62,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
|
||||
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
@ -112,7 +117,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
@ -167,7 +174,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
@ -226,7 +235,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
|
||||
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
|
||||
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
memgraph::coordination::CoordinatorInstance instance{init_config1};
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "libnuraft/nuraft.hxx"
|
||||
|
||||
using memgraph::coordination::CoordinatorClusterState;
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorInstanceState;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
@ -45,7 +46,9 @@ class CoordinatorClusterStateTest : public ::testing::Test {
|
||||
};
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
@ -65,13 +68,15 @@ TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) {
|
||||
ASSERT_EQ(instances.size(), 1);
|
||||
ASSERT_EQ(instances[0].config, config);
|
||||
ASSERT_EQ(instances[0].status, ReplicationRole::REPLICA);
|
||||
ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 0);
|
||||
ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 1);
|
||||
|
||||
ASSERT_TRUE(cluster_state.IsReplica("instance3"));
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
@ -92,7 +97,9 @@ TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
@ -135,7 +142,10 @@ TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
{
|
||||
auto config =
|
||||
CoordinatorToReplicaConfig{.instance_name = "instance3",
|
||||
@ -180,23 +190,27 @@ TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, UpdateUUID) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
auto uuid = UUID();
|
||||
cluster_state.DoAction(uuid, RaftLogAction::UPDATE_UUID);
|
||||
ASSERT_EQ(cluster_state.GetUUID(), uuid);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) {
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
||||
CoordinatorClusterState cluster_state;
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE);
|
||||
|
||||
auto instances = cluster_state.GetCoordinatorInstances();
|
||||
ASSERT_EQ(instances.size(), 1);
|
||||
ASSERT_EQ(instances[0].config, config);
|
||||
ASSERT_EQ(instances.size(), 2);
|
||||
ASSERT_EQ(instances[1].config, config);
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
|
||||
@ -231,7 +245,10 @@ TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorClusterStateTest, Marshalling) {
|
||||
CoordinatorClusterState cluster_state;
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
|
||||
|
||||
CoordinatorClusterState cluster_state{init_config1};
|
||||
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
|
||||
|
92
tests/unit/coordinator_instance.cpp
Normal file
92
tests/unit/coordinator_instance.cpp
Normal file
@ -0,0 +1,92 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "interpreter_faker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/replication_handler.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstance;
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication::ReplicationHandler;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::storage::Config;
|
||||
|
||||
class CoordinatorInstanceTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override {}
|
||||
|
||||
void TearDown() override {}
|
||||
|
||||
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_tests_unit_coordinator_instance"};
|
||||
};
|
||||
|
||||
TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) {
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
|
||||
|
||||
auto const instance1 = CoordinatorInstance{init_config};
|
||||
auto const instances = instance1.ShowInstances();
|
||||
ASSERT_EQ(instances.size(), 1);
|
||||
ASSERT_EQ(instances[0].instance_name, "coordinator_4");
|
||||
ASSERT_EQ(instances[0].health, "unknown");
|
||||
ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110");
|
||||
ASSERT_EQ(instances[0].coord_socket_address, "");
|
||||
ASSERT_EQ(instances[0].cluster_role, "coordinator");
|
||||
}
|
||||
|
||||
TEST_F(CoordinatorInstanceTest, ConnectCoordinators) {
|
||||
auto const init_config2 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 2, .coordinator_port = 10112, .bolt_port = 7688};
|
||||
|
||||
auto const instance2 = CoordinatorInstance{init_config2};
|
||||
|
||||
auto const init_config3 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 3, .coordinator_port = 10113, .bolt_port = 7689};
|
||||
|
||||
auto const instance3 = CoordinatorInstance{init_config3};
|
||||
|
||||
auto const init_config1 =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687};
|
||||
|
||||
auto instance1 = CoordinatorInstance{init_config1};
|
||||
|
||||
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 2,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10112}});
|
||||
|
||||
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 3,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10113}});
|
||||
|
||||
auto const instances = instance1.ShowInstances();
|
||||
ASSERT_EQ(instances.size(), 3);
|
||||
ASSERT_EQ(instances[0].instance_name, "coordinator_1");
|
||||
ASSERT_EQ(instances[1].instance_name, "coordinator_2");
|
||||
ASSERT_EQ(instances[2].instance_name, "coordinator_3");
|
||||
}
|
@ -20,8 +20,11 @@
|
||||
|
||||
using memgraph::coordination::CoordinatorInstanceInitConfig;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using nuraft::ptr;
|
||||
|
||||
class RaftStateTest : public ::testing::Test {
|
||||
@ -54,3 +57,82 @@ TEST_F(RaftStateTest, RaftStateEmptyMetadata) {
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 1234}};
|
||||
ASSERT_EQ(coord_instance.config, coord_config);
|
||||
}
|
||||
|
||||
TEST_F(RaftStateTest, GetSingleRouterRoutingTable) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10112, .bolt_port = 7688};
|
||||
|
||||
auto const raft_state =
|
||||
RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
auto routing_table = raft_state.GetRoutingTable();
|
||||
|
||||
ASSERT_EQ(routing_table.size(), 1);
|
||||
|
||||
auto const routers = routing_table[0];
|
||||
ASSERT_EQ(routers.first, std::vector<std::string>{"127.0.0.1:7688"});
|
||||
ASSERT_EQ(routers.second, "ROUTE");
|
||||
}
|
||||
|
||||
TEST_F(RaftStateTest, GetMixedRoutingTable) {
|
||||
auto become_leader_cb = []() {};
|
||||
auto become_follower_cb = []() {};
|
||||
auto const init_config =
|
||||
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10113, .bolt_port = 7690};
|
||||
auto leader = RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance1",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10011},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance1",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}}});
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10012},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7688},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10002}}});
|
||||
|
||||
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance3",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10013},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance3",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10003}}});
|
||||
|
||||
leader.AppendAddCoordinatorInstanceLog(
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 2,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7691},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10114}});
|
||||
|
||||
leader.AppendAddCoordinatorInstanceLog(
|
||||
CoordinatorToCoordinatorConfig{.coordinator_id = 3,
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7692},
|
||||
.coordinator_server = Endpoint{"127.0.0.1", 10115}});
|
||||
|
||||
leader.AppendSetInstanceAsMainLog("instance1");
|
||||
|
||||
auto const routing_table = leader.GetRoutingTable();
|
||||
|
||||
ASSERT_EQ(routing_table.size(), 3);
|
||||
|
||||
auto const &mains = routing_table[0];
|
||||
ASSERT_EQ(mains.second, "WRITE");
|
||||
ASSERT_EQ(mains.first, std::vector<std::string>{"127.0.0.1:7687"});
|
||||
|
||||
auto const &replicas = routing_table[1];
|
||||
ASSERT_EQ(replicas.second, "READ");
|
||||
auto const expected_replicas = std::vector<std::string>{"127.0.0.1:7688", "127.0.0.1:7689"};
|
||||
ASSERT_EQ(replicas.first, expected_replicas);
|
||||
|
||||
auto const &routers = routing_table[2];
|
||||
ASSERT_EQ(routers.second, "ROUTE");
|
||||
auto const expected_routers = std::vector<std::string>{"127.0.0.1:7690", "127.0.0.1:7691", "127.0.0.1:7692"};
|
||||
ASSERT_EQ(routers.first, expected_routers);
|
||||
}
|
||||
|
@ -1,175 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "interpreter_faker.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "replication_handler/replication_handler.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
|
||||
#include "utils/file.hpp"
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
using memgraph::coordination::CoordinatorInstance;
|
||||
using memgraph::coordination::CoordinatorToCoordinatorConfig;
|
||||
using memgraph::coordination::CoordinatorToReplicaConfig;
|
||||
using memgraph::coordination::RaftState;
|
||||
using memgraph::coordination::ReplicationClientInfo;
|
||||
using memgraph::io::network::Endpoint;
|
||||
using memgraph::replication::ReplicationHandler;
|
||||
using memgraph::replication_coordination_glue::ReplicationMode;
|
||||
using memgraph::storage::Config;
|
||||
|
||||
// class MockCoordinatorInstance : CoordinatorInstance {
|
||||
// auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void override {}
|
||||
// };
|
||||
|
||||
class RoutingTableTest : public ::testing::Test {
|
||||
protected:
|
||||
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_tests_unit_coordinator_cluster_state"};
|
||||
std::filesystem::path repl1_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_test_unit_storage_v2_replication_repl"};
|
||||
std::filesystem::path repl2_data_directory{std::filesystem::temp_directory_path() /
|
||||
"MG_test_unit_storage_v2_replication_repl2"};
|
||||
void SetUp() override { Clear(); }
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
Config main_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, main_data_directory);
|
||||
return config;
|
||||
}();
|
||||
Config repl1_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, repl1_data_directory);
|
||||
return config;
|
||||
}();
|
||||
Config repl2_conf = [&] {
|
||||
Config config{
|
||||
.durability =
|
||||
{
|
||||
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
},
|
||||
.salient.items = {.properties_on_edges = true},
|
||||
};
|
||||
UpdatePaths(config, repl2_data_directory);
|
||||
return config;
|
||||
}();
|
||||
|
||||
const std::string local_host = ("127.0.0.1");
|
||||
const std::array<uint16_t, 2> ports{10000, 20000};
|
||||
const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"};
|
||||
|
||||
private:
|
||||
void Clear() {
|
||||
if (std::filesystem::exists(main_data_directory)) std::filesystem::remove_all(main_data_directory);
|
||||
if (std::filesystem::exists(repl1_data_directory)) std::filesystem::remove_all(repl1_data_directory);
|
||||
if (std::filesystem::exists(repl2_data_directory)) std::filesystem::remove_all(repl2_data_directory);
|
||||
}
|
||||
};
|
||||
|
||||
struct MinMemgraph {
|
||||
MinMemgraph(const memgraph::storage::Config &conf)
|
||||
: auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}},
|
||||
repl_state{ReplicationStateRootPath(conf)},
|
||||
dbms{conf, repl_state
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
auth, true
|
||||
#endif
|
||||
},
|
||||
db_acc{dbms.Get()},
|
||||
db{*db_acc.get()},
|
||||
repl_handler(repl_state, dbms
|
||||
#ifdef MG_ENTERPRISE
|
||||
,
|
||||
system_, auth
|
||||
#endif
|
||||
) {
|
||||
}
|
||||
memgraph::auth::SynchedAuth auth;
|
||||
memgraph::system::System system_;
|
||||
memgraph::replication::ReplicationState repl_state;
|
||||
memgraph::dbms::DbmsHandler dbms;
|
||||
memgraph::dbms::DatabaseAccess db_acc;
|
||||
memgraph::dbms::Database &db;
|
||||
ReplicationHandler repl_handler;
|
||||
};
|
||||
;
|
||||
|
||||
TEST_F(RoutingTableTest, GetSingleRouterRoutingTable) {
|
||||
CoordinatorInstance instance1;
|
||||
auto routing = std::map<std::string, std::string>{{"address", "localhost:7688"}};
|
||||
auto routing_table = instance1.GetRoutingTable(routing);
|
||||
|
||||
ASSERT_EQ(routing_table.size(), 1);
|
||||
|
||||
auto const routers = routing_table[0];
|
||||
ASSERT_EQ(routers.first, std::vector<std::string>{"localhost:7688"});
|
||||
ASSERT_EQ(routers.second, "ROUTE");
|
||||
}
|
||||
|
||||
TEST_F(RoutingTableTest, GetMixedRoutingTable) {
|
||||
auto instance1 = RaftState::MakeRaftState([]() {}, []() {});
|
||||
auto routing = std::map<std::string, std::string>{{"address", "localhost:7690"}};
|
||||
instance1.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
|
||||
.instance_name = "instance2",
|
||||
.mgt_server = Endpoint{"127.0.0.1", 10011},
|
||||
.bolt_server = Endpoint{"127.0.0.1", 7687},
|
||||
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
|
||||
.replication_mode = ReplicationMode::ASYNC,
|
||||
.replication_server = Endpoint{"127.0.0.1", 10001}}});
|
||||
// auto routing_table = instance1.GetRoutingTable(routing);
|
||||
|
||||
// ASSERT_EQ(routing_table.size(), 1);
|
||||
// auto const routers = routing_table[0];
|
||||
// ASSERT_EQ(routers.second, "ROUTE");
|
||||
}
|
||||
|
||||
// TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) {
|
||||
//
|
||||
// CoordinatorInstance instance1;
|
||||
// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 1,
|
||||
// .bolt_server = Endpoint{"127.0.0.1", 7689},
|
||||
// .coordinator_server = Endpoint{"127.0.0.1",
|
||||
// 10111}});
|
||||
//
|
||||
// auto routing = std::map<std::string, std::string>{{"address", "localhost:7688"}};
|
||||
// auto routing_table = instance1.GetRoutingTable(routing);
|
||||
//
|
||||
// ASSERT_EQ(routing_table.size(), 1);
|
||||
//
|
||||
// auto const routers = routing_table[0];
|
||||
// ASSERT_EQ(routers.second, "ROUTE");
|
||||
// ASSERT_EQ(routers.first.size(), 2);
|
||||
// auto const expected_routers = std::vector<std::string>{"localhost:7689", "localhost:7688"};
|
||||
// ASSERT_EQ(routers.first, expected_routers);
|
||||
// }
|
Loading…
Reference in New Issue
Block a user