Compare commits

...

10 Commits

Author SHA1 Message Date
Andi Skrgat
4159c9f6a5 Fix rebase 2024-03-25 08:02:49 +01:00
Andi Skrgat
b826a0a518 gmock introduction 2024-03-25 07:41:31 +01:00
Andi Skrgat
8688c7361a Allow non-HA instance in MG_ENTERPRISE 2024-03-25 07:40:48 +01:00
Andi Skrgat
16b3df3b77 Fix launcher.shutdown in NuRaft 2024-03-25 07:40:47 +01:00
Andi Skrgat
5d5c39f862 Add flaky test 2024-03-25 07:39:45 +01:00
Andi Skrgat
03f5cc2f33 CoordinatorInstance testing 2024-03-25 07:37:05 +01:00
Andi Skrgat
ef85e8ae61 Test Raft state 2024-03-25 07:29:45 +01:00
Andi Skrgat
a999110737 Started improving CoordinatorState 2024-03-25 07:22:19 +01:00
Andi Skrgat
fdd49b7983 Add tests for CoordinatorStateMachine 2024-03-25 07:22:19 +01:00
Andi Skrgat
3a0bd33a91 Test CoordClusterState 2024-03-25 07:22:17 +01:00
36 changed files with 1186 additions and 688 deletions

View File

@ -2,7 +2,7 @@ add_library(mg-coordination STATIC)
add_library(mg::coordination ALIAS mg-coordination)
target_sources(mg-coordination
PUBLIC
include/coordination/coordinator_client.hpp
include/coordination/replication_instance_client.hpp
include/coordination/coordinator_state.hpp
include/coordination/coordinator_rpc.hpp
include/coordination/coordinator_server.hpp
@ -12,7 +12,7 @@ target_sources(mg-coordination
include/coordination/coordinator_instance.hpp
include/coordination/coordinator_handlers.hpp
include/coordination/instance_status.hpp
include/coordination/replication_instance.hpp
include/coordination/replication_instance_connector.hpp
include/coordination/raft_state.hpp
include/coordination/rpc_errors.hpp
@ -24,13 +24,13 @@ target_sources(mg-coordination
PRIVATE
coordinator_communication_config.cpp
coordinator_client.cpp
replication_instance_client.cpp
coordinator_state.cpp
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_handlers.cpp
coordinator_instance.cpp
replication_instance.cpp
replication_instance_connector.cpp
raft_state.cpp
coordinator_log_store.cpp

View File

@ -29,9 +29,29 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
j.at("uuid").get_to(instance_state.instance_uuid);
}
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state) {
j = nlohmann::json{{"config", instance_state.config}};
}
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state) {
j.at("config").get_to(instance_state.config);
}
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
std::vector<CoordinatorInstanceState> coordinators,
utils::UUID const &current_main_uuid, bool is_lock_opened)
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
: repl_instances_{std::move(instances)},
coordinators_{std::move(coordinators)},
current_main_uuid_(current_main_uuid),
is_lock_opened_(is_lock_opened) {}
CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) {
auto c2c_config = CoordinatorToCoordinatorConfig{
.coordinator_id = config.coordinator_id,
.bolt_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.bolt_port)},
.coordinator_server = io::network::Endpoint{"127.0.0.1", static_cast<uint16_t>(config.coordinator_port)}};
coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)});
}
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
: repl_instances_{other.repl_instances_},
@ -88,7 +108,13 @@ auto CoordinatorClusterState::IsCurrentMain(std::string_view instance_name) cons
it->second.instance_uuid == current_main_uuid_;
}
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
auto CoordinatorClusterState::InsertInstance(std::string instance_name, ReplicationInstanceState instance_state)
-> void {
auto lock = std::lock_guard{log_lock_};
repl_instances_.insert_or_assign(std::move(instance_name), std::move(instance_state));
}
auto CoordinatorClusterState::DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void {
auto lock = std::lock_guard{log_lock_};
switch (log_action) {
// end of OPEN_LOCK_REGISTER_REPLICATION_INSTANCE
@ -148,7 +174,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
coordinators_.emplace_back(CoordinatorInstanceState{config});
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_id);
break;
}
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
@ -187,9 +213,11 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto lock = std::shared_lock{log_lock_};
nlohmann::json j = {{"repl_instances", repl_instances_},
{"coord_instances", coordinators_},
{"is_lock_opened", is_lock_opened_},
{"current_main_uuid", current_main_uuid_}};
auto const log = j.dump();
data = buffer::alloc(sizeof(uint32_t) + log.size());
buffer_serializer bs(data);
bs.put_str(log);
@ -198,10 +226,14 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
buffer_serializer bs(data);
auto const j = nlohmann::json::parse(bs.get_str());
auto instances = j["repl_instances"].get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
auto current_main_uuid = j["current_main_uuid"].get<utils::UUID>();
bool is_lock_opened = j["is_lock_opened"].get<int>();
return CoordinatorClusterState{std::move(instances), current_main_uuid, is_lock_opened};
auto repl_instances = j.at("repl_instances").get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
auto current_main_uuid = j.at("current_main_uuid").get<utils::UUID>();
bool is_lock_opened = j.at("is_lock_opened").get<int>();
auto coord_instances = j.at("coord_instances").get<std::vector<CoordinatorInstanceState>>();
return CoordinatorClusterState{std::move(repl_instances), std::move(coord_instances), current_main_uuid,
is_lock_opened};
}
auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {

View File

@ -16,13 +16,13 @@
namespace memgraph::coordination {
void to_json(nlohmann::json &j, CoordinatorToCoordinatorConfig const &config) {
j = nlohmann::json{{"coordinator_server_id", config.coordinator_server_id},
j = nlohmann::json{{"coordinator_id", config.coordinator_id},
{"coordinator_server", config.coordinator_server},
{"bolt_server", config.bolt_server}};
}
void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config) {
config.coordinator_server_id = j.at("coordinator_server_id").get<uint32_t>();
config.coordinator_id = j.at("coordinator_id").get<uint32_t>();
config.coordinator_server = j.at("coordinator_server").get<io::network::Endpoint>();
config.bolt_server = j.at("bolt_server").get<io::network::Endpoint>();
}

View File

@ -27,11 +27,11 @@
namespace memgraph::coordination {
using nuraft::ptr;
using nuraft::srv_config;
CoordinatorInstance::CoordinatorInstance()
CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config)
: thread_pool_{1},
raft_state_(RaftState::MakeRaftState(
config,
[this]() {
spdlog::info("Leader changed, starting all replication instances!");
auto const instances = raft_state_.GetReplicationInstances();
@ -41,8 +41,9 @@ CoordinatorInstance::CoordinatorInstance()
std::ranges::for_each(replicas, [this](auto &replica) {
spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
auto client =
std::make_unique<ReplicationInstanceClient>(this, replica.config, client_succ_cb_, client_fail_cb_);
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
});
@ -51,8 +52,9 @@ CoordinatorInstance::CoordinatorInstance()
std::ranges::for_each(main, [this](auto &main_instance) {
spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::MainSuccessCallback,
auto client = std::make_unique<ReplicationInstanceClient>(this, main_instance.config, client_succ_cb_,
client_fail_cb_);
repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback);
});
@ -89,9 +91,10 @@ CoordinatorInstance::CoordinatorInstance()
};
}
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name)
-> ReplicationInstanceConnector & {
auto repl_instance =
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstanceConnector const &instance) {
return instance.InstanceName() == replication_instance_name;
});
@ -101,27 +104,27 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
auto const coord_instance_to_status = [](CoordinatorInstanceState const &instance) -> InstanceStatus {
return {.instance_name = fmt::format("coordinator_{}", instance.config.coordinator_id),
.raft_socket_address = instance.config.coordinator_server.SocketAddress(),
.cluster_role = "coordinator",
.health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move
.health = "unknown"};
};
auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status);
if (raft_state_.IsLeader()) {
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
auto const stringify_repl_role = [this](ReplicationInstanceConnector const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main";
return "replica";
};
auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
auto const stringify_repl_health = [](ReplicationInstanceConnector const &instance) -> std::string {
return instance.IsAlive() ? "up" : "down";
};
auto process_repl_instance_as_leader =
[&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
[&stringify_repl_role, &stringify_repl_health](ReplicationInstanceConnector const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.CoordinatorSocketAddress(),
.cluster_role = stringify_repl_role(instance),
@ -160,19 +163,26 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
}
auto CoordinatorInstance::TryFailover() -> void {
auto const is_replica = [this](ReplicationInstance const &instance) {
auto const is_replica = [this](ReplicationInstanceConnector const &instance) {
return HasReplicaState(instance.InstanceName());
};
auto alive_replicas =
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
auto alive_replicas = repl_instances_ | ranges::views::filter(is_replica) |
ranges::views::filter(&ReplicationInstanceConnector::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
if (!raft_state_.RequestLeadership()) {
spdlog::error("Failover failed since the instance is not the leader!");
return;
}
auto const get_ts = [](ReplicationInstanceConnector &replica) {
return replica.GetClient().SendGetInstanceTimestampsRpc();
};
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
@ -206,13 +216,13 @@ auto CoordinatorInstance::TryFailover() -> void {
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
auto const is_not_new_main = [&new_main](ReplicationInstanceConnector &instance) {
return instance.InstanceName() != new_main->InstanceName();
};
auto const new_main_uuid = utils::UUID{};
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
};
@ -225,7 +235,7 @@ auto CoordinatorInstance::TryFailover() -> void {
}
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
@ -268,7 +278,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
}
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
auto const is_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
return instance.InstanceName() == instance_name;
};
@ -287,13 +297,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
auto const is_not_new_main = [&instance_name](ReplicationInstanceConnector const &instance) {
return instance.InstanceName() != instance_name;
};
auto const new_main_uuid = utils::UUID{};
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstanceConnector &instance) {
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
};
@ -304,7 +314,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
}
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::views::transform(&ReplicationInstanceConnector::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
@ -334,19 +344,21 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
}
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
})) {
// TODO: (andi) Change that this is being asked from raft state
if (std::ranges::any_of(repl_instances_,
[instance_name = config.instance_name](ReplicationInstanceConnector const &instance) {
return instance.InstanceName() == instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS;
}
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstanceConnector const &instance) {
return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
@ -361,8 +373,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
}
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
auto client = std::make_unique<ReplicationInstanceClient>(this, config, client_succ_cb_, client_fail_cb_);
auto *new_instance = &repl_instances_.emplace_back(std::move(client), &CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
if (!new_instance->SendDemoteToReplicaRpc()) {
@ -392,7 +405,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
auto const name_matches = [&instance_name](ReplicationInstanceConnector const &instance) {
return instance.InstanceName() == instance_name;
};
@ -401,7 +414,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
auto const is_current_main = [this](ReplicationInstance const &instance) {
auto const is_current_main = [this](ReplicationInstanceConnector const &instance) {
return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive();
};
@ -434,6 +447,8 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
}
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
spdlog::trace("Adding coordinator instance {} start in CoordinatorInstance for {}", config.coordinator_id,
raft_state_.InstanceName());
raft_state_.AddCoordinatorInstance(config);
// NOTE: We ignore error we added coordinator instance to networking stuff but not in raft log.
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
@ -625,56 +640,7 @@ auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const
return raft_state_.HasReplicaState(instance_name);
}
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
auto res = RoutingTable{};
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
return instance.config.BoltSocketAddress();
};
// TODO: (andi) This is wrong check, Fico will correct in #1819.
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
return instance.status == ReplicationRole::MAIN;
};
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
return instance.status == ReplicationRole::REPLICA;
};
auto const &raft_log_repl_instances = raft_state_.GetReplicationInstances();
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
if (!std::ranges::empty(bolt_mains)) {
res.emplace_back(std::move(bolt_mains), "WRITE");
}
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
if (!std::ranges::empty(bolt_replicas)) {
res.emplace_back(std::move(bolt_replicas), "READ");
}
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
return instance.config.bolt_server.SocketAddress();
};
auto const &raft_log_coord_instances = raft_state_.GetCoordinatorInstances();
auto bolt_coords =
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
auto const &local_bolt_coord = routing.find("address");
if (local_bolt_coord == routing.end()) {
throw InvalidRoutingTableException("No bolt address found in routing table for the current coordinator!");
}
bolt_coords.push_back(local_bolt_coord->second);
res.emplace_back(std::move(bolt_coords), "ROUTE");
return res;
}
auto CoordinatorInstance::GetRoutingTable() const -> RoutingTable { return raft_state_.GetRoutingTable(); }
} // namespace memgraph::coordination
#endif

View File

@ -24,21 +24,16 @@
namespace memgraph::coordination {
CoordinatorState::CoordinatorState() {
MG_ASSERT(!(FLAGS_coordinator_id && FLAGS_management_port),
"Instance cannot be a coordinator and have registered coordinator server.");
CoordinatorState::CoordinatorState(CoordinatorInstanceInitConfig const &config) {
data_.emplace<CoordinatorInstance>(config);
}
spdlog::info("Executing coordinator constructor");
if (FLAGS_management_port) {
spdlog::info("Coordinator server port set");
auto const config = ManagementServerConfig{
.ip_address = kDefaultReplicationServerIp,
.port = static_cast<uint16_t>(FLAGS_management_port),
};
spdlog::info("Executing coordinator constructor main replica");
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
}
CoordinatorState::CoordinatorState(ReplicationInstanceInitConfig const &config) {
auto const mgmt_config = ManagementServerConfig{
.ip_address = kDefaultReplicationServerIp,
.port = static_cast<uint16_t>(config.management_port),
};
data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(mgmt_config)};
}
auto CoordinatorState::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
@ -104,10 +99,10 @@ auto CoordinatorState::AddCoordinatorInstance(coordination::CoordinatorToCoordin
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(config);
}
auto CoordinatorState::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
auto CoordinatorState::GetRoutingTable() -> RoutingTable {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot get routing table since variant holds wrong alternative");
return std::get<CoordinatorInstance>(data_).GetRoutingTable(routing);
return std::get<CoordinatorInstance>(data_).GetRoutingTable();
}
} // namespace memgraph::coordination

View File

@ -20,6 +20,9 @@ constexpr int MAX_SNAPSHOTS = 3;
namespace memgraph::coordination {
CoordinatorStateMachine::CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config)
: cluster_state_(config) {}
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool {

View File

@ -30,6 +30,16 @@ namespace memgraph::coordination {
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
struct ReplicationInstanceInitConfig {
int management_port{0};
};
struct CoordinatorInstanceInitConfig {
uint32_t coordinator_id{0};
int coordinator_port{0};
int bolt_port{0};
};
struct ReplicationClientInfo {
std::string instance_name{};
replication_coordination_glue::ReplicationMode replication_mode{};
@ -66,7 +76,7 @@ struct CoordinatorToReplicaConfig {
};
struct CoordinatorToCoordinatorConfig {
uint32_t coordinator_server_id{0};
uint32_t coordinator_id{0};
io::network::Endpoint bolt_server;
io::network::Endpoint coordinator_server;

View File

@ -17,7 +17,7 @@
#include "coordination/instance_status.hpp"
#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "coordination/replication_instance_connector.hpp"
#include "utils/resource_lock.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
@ -26,8 +26,6 @@
namespace memgraph::coordination {
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
struct NewMainRes {
std::string most_up_to_date_instance;
std::string latest_epoch;
@ -37,7 +35,7 @@ using InstanceNameDbHistories = std::pair<std::string, replication_coordination_
class CoordinatorInstance {
public:
CoordinatorInstance();
explicit CoordinatorInstance(CoordinatorInstanceInitConfig const &config);
CoordinatorInstance(CoordinatorInstance const &) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &) = delete;
CoordinatorInstance(CoordinatorInstance &&) noexcept = delete;
@ -56,9 +54,9 @@ class CoordinatorInstance {
auto TryFailover() -> void;
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void;
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable;
auto GetRoutingTable() const -> RoutingTable;
static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;
@ -67,7 +65,7 @@ class CoordinatorInstance {
auto HasReplicaState(std::string_view instance_name) const -> bool;
private:
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstanceConnector &;
void MainFailCallback(std::string_view);
@ -78,8 +76,10 @@ class CoordinatorInstance {
void ReplicaFailCallback(std::string_view);
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability.
std::list<ReplicationInstance> repl_instances_;
// TODO: (andi) Rename + virtualize for mocking.
std::list<ReplicationInstanceConnector> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{};
// Thread pool needs to be constructed before raft state as raft state can call thread pool

View File

@ -24,7 +24,8 @@ namespace memgraph::coordination {
class CoordinatorState {
public:
CoordinatorState();
explicit CoordinatorState(CoordinatorInstanceInitConfig const &config);
explicit CoordinatorState(ReplicationInstanceInitConfig const &config);
~CoordinatorState() = default;
CoordinatorState(CoordinatorState const &) = delete;
@ -47,14 +48,14 @@ class CoordinatorState {
// NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
auto GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable;
auto GetRoutingTable() -> RoutingTable;
private:
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
std::variant<CoordinatorMainReplicaData, CoordinatorInstance> data_;
};
} // namespace memgraph::coordination

View File

@ -27,6 +27,7 @@ struct CoordinatorToReplicaConfig;
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
using RoutingTable = std::vector<std::pair<std::vector<std::string>, std::string>>;
using nuraft::buffer;
using nuraft::logger;
@ -40,8 +41,8 @@ using raft_result = nuraft::cmd_result<ptr<buffer>>;
class RaftState {
private:
explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
uint32_t raft_port, std::string raft_address);
explicit RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
BecomeFollowerCb become_follower_cb);
auto InitRaftServer() -> void;
@ -53,13 +54,13 @@ class RaftState {
RaftState &operator=(RaftState &&other) noexcept = default;
~RaftState();
static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState;
static auto MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
BecomeFollowerCb &&become_follower_cb) -> RaftState;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
@ -68,6 +69,7 @@ class RaftState {
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
@ -76,9 +78,9 @@ class RaftState {
auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
// TODO: (andi) Do we need then GetAllCoordinators?
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
auto MainExists() const -> bool;
@ -90,17 +92,17 @@ class RaftState {
auto GetInstanceUUID(std::string_view) const -> utils::UUID;
auto IsLockOpened() const -> bool;
auto GetRoutingTable() const -> RoutingTable;
private:
// TODO: (andi) I think variables below can be abstracted/clean them.
io::network::Endpoint raft_endpoint_;
uint32_t coordinator_id_;
ptr<CoordinatorStateMachine> state_machine_;
ptr<CoordinatorStateManager> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
ptr<raft_server> raft_server_;
BecomeLeaderCb become_leader_cb_;
BecomeFollowerCb become_follower_cb_;

View File

@ -27,18 +27,19 @@ class CoordinatorInstance;
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplicationClientInfo>;
class CoordinatorClient {
class ReplicationInstanceClient {
public:
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
explicit ReplicationInstanceClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
HealthCheckClientCallback succ_cb = nullptr,
HealthCheckClientCallback fail_cb = nullptr);
~CoordinatorClient() = default;
virtual ~ReplicationInstanceClient() = default;
CoordinatorClient(CoordinatorClient &) = delete;
CoordinatorClient &operator=(CoordinatorClient const &) = delete;
ReplicationInstanceClient(ReplicationInstanceClient &) = delete;
ReplicationInstanceClient &operator=(ReplicationInstanceClient const &) = delete;
CoordinatorClient(CoordinatorClient &&) noexcept = delete;
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
ReplicationInstanceClient(ReplicationInstanceClient &&) noexcept = delete;
ReplicationInstanceClient &operator=(ReplicationInstanceClient &&) noexcept = delete;
void StartFrequentCheck();
void StopFrequentCheck();
@ -49,7 +50,7 @@ class CoordinatorClient {
auto CoordinatorSocketAddress() const -> std::string;
auto ReplicationSocketAddress() const -> std::string;
[[nodiscard]] auto DemoteToReplica() const -> bool;
virtual auto DemoteToReplica() const -> bool;
auto SendPromoteReplicaToMainRpc(utils::UUID const &uuid, ReplicationClientsInfo replication_clients_info) const
-> bool;
@ -73,7 +74,7 @@ class CoordinatorClient {
auto InstanceGetUUIDFrequencySec() const -> std::chrono::seconds;
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
friend bool operator==(ReplicationInstanceClient const &first, ReplicationInstanceClient const &second) {
return first.config_ == second.config_;
}

View File

@ -13,8 +13,8 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/replication_instance_client.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/resource_lock.hpp"
@ -25,31 +25,27 @@
namespace memgraph::coordination {
class CoordinatorInstance;
class ReplicationInstance;
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
class ReplicationInstance {
class ReplicationInstanceConnector {
public:
ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config, HealthCheckClientCallback succ_cb,
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb);
explicit ReplicationInstanceConnector(std::unique_ptr<ReplicationInstanceClient> client,
HealthCheckInstanceCallback succ_instance_cb = nullptr,
HealthCheckInstanceCallback fail_instance_cb = nullptr);
ReplicationInstance(ReplicationInstance const &other) = delete;
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
ReplicationInstance(ReplicationInstance &&other) noexcept = delete;
ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete;
~ReplicationInstance() = default;
ReplicationInstanceConnector(ReplicationInstanceConnector const &other) = delete;
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector const &other) = delete;
ReplicationInstanceConnector(ReplicationInstanceConnector &&other) noexcept = delete;
ReplicationInstanceConnector &operator=(ReplicationInstanceConnector &&other) noexcept = delete;
~ReplicationInstanceConnector() = default;
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto IsReadyForUUIDPing() -> bool;
void UpdateReplicaLastResponseUUID();
auto IsAlive() const -> bool;
// TODO: (andi) Fetch from ClusterState
auto InstanceName() const -> std::string;
auto CoordinatorSocketAddress() const -> std::string;
auto ReplicationSocketAddress() const -> std::string;
@ -75,15 +71,16 @@ class ReplicationInstance {
auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool;
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
auto GetClient() -> CoordinatorClient &;
auto GetClient() -> ReplicationInstanceClient &;
auto EnableWritingOnMain() -> bool;
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
private:
CoordinatorClient client_;
protected:
auto UpdateReplicaLastResponseUUID() -> void;
std::unique_ptr<ReplicationInstanceClient> client_;
std::chrono::system_clock::time_point last_response_time_{};
bool is_alive_{false};
std::chrono::system_clock::time_point last_check_of_uuid_{};
@ -91,7 +88,7 @@ class ReplicationInstance {
HealthCheckInstanceCallback succ_cb_;
HealthCheckInstanceCallback fail_cb_;
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
friend bool operator==(ReplicationInstanceConnector const &first, ReplicationInstanceConnector const &second) {
return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
first.is_alive_ == second.is_alive_;
}

View File

@ -30,6 +30,9 @@
namespace memgraph::coordination {
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
using replication_coordination_glue::ReplicationRole;
struct ReplicationInstanceState {
@ -48,7 +51,7 @@ struct ReplicationInstanceState {
}
};
// NOTE: Currently instance of coordinator doesn't change from the registration. Hence, just wrap
// NOTE: Currently coordinator instance doesn't change from the registration. Hence, just wraps
// CoordinatorToCoordinatorConfig.
struct CoordinatorInstanceState {
CoordinatorToCoordinatorConfig config;
@ -64,14 +67,16 @@ void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state
using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
InstanceUUIDUpdate>;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
void to_json(nlohmann::json &j, CoordinatorInstanceState const &instance_state);
void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state);
// Represents the state of the cluster from the coordinator's perspective.
// Source of truth since it is modified only as the result of RAFT's commiting
class CoordinatorClusterState {
public:
CoordinatorClusterState() = default;
explicit CoordinatorClusterState(CoordinatorInstanceInitConfig const &config);
explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
std::vector<CoordinatorInstanceState> coordinators,
utils::UUID const &current_main_uuid, bool is_lock_opened);
CoordinatorClusterState(CoordinatorClusterState const &);
@ -89,7 +94,9 @@ class CoordinatorClusterState {
auto IsCurrentMain(std::string_view instance_name) const -> bool;
auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;
auto InsertInstance(std::string instance_name, ReplicationInstanceState instance_state) -> void;
auto DoAction(TRaftLog const &log_entry, RaftLogAction log_action) -> void;
auto Serialize(ptr<buffer> &data) -> void;
@ -105,12 +112,17 @@ class CoordinatorClusterState {
auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
friend auto operator==(CoordinatorClusterState const &lhs, CoordinatorClusterState const &rhs) -> bool {
return lhs.repl_instances_ == rhs.repl_instances_ && lhs.coordinators_ == rhs.coordinators_ &&
lhs.current_main_uuid_ == rhs.current_main_uuid_;
}
private:
std::vector<CoordinatorInstanceState> coordinators_{};
std::map<std::string, ReplicationInstanceState, std::less<>> repl_instances_{};
std::vector<CoordinatorInstanceState> coordinators_{};
utils::UUID current_main_uuid_{};
mutable utils::ResourceLock log_lock_{};
bool is_lock_opened_{false};
mutable utils::ResourceLock log_lock_{};
};
} // namespace memgraph::coordination

View File

@ -35,7 +35,7 @@ using nuraft::state_machine;
class CoordinatorStateMachine : public state_machine {
public:
CoordinatorStateMachine() = default;
explicit CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config);
CoordinatorStateMachine(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;

View File

@ -10,12 +10,16 @@
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include <chrono>
#include "coordination/raft_state.hpp"
#include "coordination/coordinator_communication_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/raft_state.hpp"
#include "utils/counter.hpp"
#include "utils/logging.hpp"
#include <spdlog/spdlog.h>
#include <chrono>
namespace memgraph::coordination {
@ -30,11 +34,11 @@ using nuraft::raft_server;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id,
uint32_t raft_port, std::string raft_address)
: raft_endpoint_(raft_address, raft_port),
coordinator_id_(coordinator_id),
state_machine_(cs_new<CoordinatorStateMachine>()),
RaftState::RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb,
BecomeFollowerCb become_follower_cb)
: raft_endpoint_("127.0.0.1", config.coordinator_port),
coordinator_id_(config.coordinator_id),
state_machine_(cs_new<CoordinatorStateMachine>(config)),
state_manager_(cs_new<CoordinatorStateManager>(coordinator_id_, raft_endpoint_.SocketAddress())),
logger_(nullptr),
become_leader_cb_(std::move(become_leader_cb)),
@ -97,28 +101,36 @@ auto RaftState::InitRaftServer() -> void {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint_.SocketAddress());
}
auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState {
uint32_t coordinator_id = FLAGS_coordinator_id;
uint32_t raft_port = FLAGS_coordinator_port;
auto raft_state =
RaftState(std::move(become_leader_cb), std::move(become_follower_cb), coordinator_id, raft_port, "127.0.0.1");
auto RaftState::MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb,
BecomeFollowerCb &&become_follower_cb) -> RaftState {
auto raft_state = RaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
raft_state.InitRaftServer();
return raft_state;
}
RaftState::~RaftState() { launcher_.shutdown(); }
RaftState::~RaftState() {
spdlog::trace("Shutting down RaftState for coordinator_{}", coordinator_id_);
state_machine_.reset();
state_manager_.reset();
logger_.reset();
auto RaftState::InstanceName() const -> std::string {
return fmt::format("coordinator_{}", std::to_string(coordinator_id_));
if (!raft_server_) {
return;
}
raft_server_->shutdown();
raft_server_.reset();
}
auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); }
auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); }
auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
spdlog::trace("Adding coordinator instance {} start in RaftState for coordinator_{}", config.coordinator_id,
coordinator_id_);
auto const endpoint = config.coordinator_server.SocketAddress();
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_server_id), endpoint);
srv_config const srv_config_to_add(static_cast<int>(config.coordinator_id), endpoint);
auto cmd_result = raft_server_->add_srv(srv_config_to_add);
@ -136,9 +148,9 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
bool added{false};
while (!maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(waiting_period));
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_server_id));
const auto server_config = raft_server_->get_srv_config(static_cast<nuraft::int32>(config.coordinator_id));
if (server_config) {
spdlog::trace("Server with id {} added to cluster", config.coordinator_server_id);
spdlog::trace("Server with id {} added to cluster", config.coordinator_id);
added = true;
break;
}
@ -150,12 +162,6 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon
}
}
auto RaftState::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
@ -363,14 +369,14 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
spdlog::error(
"Failed to accept request for adding coordinator instance {}. Most likely the reason is that the instance is "
"not the leader.",
config.coordinator_server_id);
config.coordinator_id);
return false;
}
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_server_id);
spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_id);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_server_id,
spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_id,
static_cast<int>(res->get_result_code()));
return false;
}
@ -426,5 +432,50 @@ auto RaftState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstan
return state_machine_->GetCoordinatorInstances();
}
auto RaftState::GetRoutingTable() const -> RoutingTable {
auto res = RoutingTable{};
auto const repl_instance_to_bolt = [](ReplicationInstanceState const &instance) {
return instance.config.BoltSocketAddress();
};
// TODO: (andi) This is wrong check, Fico will correct in #1819.
auto const is_instance_main = [&](ReplicationInstanceState const &instance) {
return instance.status == ReplicationRole::MAIN;
};
auto const is_instance_replica = [&](ReplicationInstanceState const &instance) {
return instance.status == ReplicationRole::REPLICA;
};
auto const &raft_log_repl_instances = GetReplicationInstances();
auto bolt_mains = raft_log_repl_instances | ranges::views::filter(is_instance_main) |
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
MG_ASSERT(bolt_mains.size() <= 1, "There can be at most one main instance active!");
if (!std::ranges::empty(bolt_mains)) {
res.emplace_back(std::move(bolt_mains), "WRITE");
}
auto bolt_replicas = raft_log_repl_instances | ranges::views::filter(is_instance_replica) |
ranges::views::transform(repl_instance_to_bolt) | ranges::to<std::vector>();
if (!std::ranges::empty(bolt_replicas)) {
res.emplace_back(std::move(bolt_replicas), "READ");
}
auto const coord_instance_to_bolt = [](CoordinatorInstanceState const &instance) {
return instance.config.bolt_server.SocketAddress();
};
auto const &raft_log_coord_instances = GetCoordinatorInstances();
auto bolt_coords =
raft_log_coord_instances | ranges::views::transform(coord_instance_to_bolt) | ranges::to<std::vector>();
res.emplace_back(std::move(bolt_coords), "ROUTE");
return res;
}
} // namespace memgraph::coordination
#endif

View File

@ -1,132 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/replication_instance.hpp"
#include <utility>
#include "replication_coordination_glue/handler.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorToReplicaConfig config,
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb)
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {}
auto ReplicationInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
auto ReplicationInstance::OnFailPing() -> bool {
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec();
return is_alive_;
}
auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_) >
client_.InstanceGetUUIDFrequencySec();
}
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); }
auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); }
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
return true;
}
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {
return false;
}
succ_cb_ = replica_succ_cb;
fail_cb_ = replica_fail_cb;
return true;
}
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
auto ReplicationInstance::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
if (!IsReadyForUUIDPing()) {
return true;
}
auto res = SendGetInstanceUUID();
if (res.HasError()) {
return false;
}
UpdateReplicaLastResponseUUID();
// NOLINTNEXTLINE
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
return true;
}
return SendSwapAndUpdateUUID(curr_main_uuid);
}
auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
return false;
}
return true;
}
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
return client_.SendUnregisterReplicaRpc(instance_name);
}
auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); }
auto ReplicationInstance::SendGetInstanceUUID()
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
return client_.SendGetInstanceUUIDRpc();
}
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
} // namespace memgraph::coordination
#endif

View File

@ -12,7 +12,7 @@
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/replication_instance_client.hpp"
#include "coordination/coordinator_communication_config.hpp"
#include "coordination/coordinator_rpc.hpp"
@ -30,8 +30,10 @@ auto CreateClientContext(memgraph::coordination::CoordinatorToReplicaConfig cons
}
} // namespace
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config,
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
ReplicationInstanceClient::ReplicationInstanceClient(CoordinatorInstance *coord_instance,
CoordinatorToReplicaConfig config,
HealthCheckClientCallback succ_cb,
HealthCheckClientCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{config.mgt_server, &rpc_context_},
config_{std::move(config)},
@ -39,20 +41,24 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
succ_cb_{std::move(succ_cb)},
fail_cb_{std::move(fail_cb)} {}
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
auto ReplicationInstanceClient::InstanceName() const -> std::string { return config_.instance_name; }
auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
auto ReplicationInstanceClient::CoordinatorSocketAddress() const -> std::string {
return config_.CoordinatorSocketAddress();
}
auto ReplicationInstanceClient::ReplicationSocketAddress() const -> std::string {
return config_.ReplicationSocketAddress();
}
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
auto ReplicationInstanceClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
return config_.instance_down_timeout_sec;
}
auto CoordinatorClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
auto ReplicationInstanceClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
return config_.instance_get_uuid_frequency_sec;
}
void CoordinatorClient::StartFrequentCheck() {
void ReplicationInstanceClient::StartFrequentCheck() {
if (instance_checker_.IsRunning()) {
return;
}
@ -81,16 +87,17 @@ void CoordinatorClient::StartFrequentCheck() {
});
}
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
void ReplicationInstanceClient::StopFrequentCheck() { instance_checker_.Stop(); }
void ReplicationInstanceClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void ReplicationInstanceClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
auto ReplicationInstanceClient::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
return config_.replication_client_info;
}
auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
ReplicationClientsInfo replication_clients_info) const -> bool {
auto ReplicationInstanceClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
ReplicationClientsInfo replication_clients_info) const
-> bool {
try {
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(uuid, std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) {
@ -104,7 +111,7 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
return false;
}
auto CoordinatorClient::DemoteToReplica() const -> bool {
auto ReplicationInstanceClient::DemoteToReplica() const -> bool {
auto const &instance_name = config_.instance_name;
try {
auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)};
@ -120,7 +127,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
return false;
}
auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
auto ReplicationInstanceClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
try {
auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};
if (!stream.AwaitResponse().success) {
@ -134,7 +141,7 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bo
return false;
}
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
auto ReplicationInstanceClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
try {
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)};
if (!stream.AwaitResponse().success) {
@ -148,7 +155,7 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name)
return false;
}
auto CoordinatorClient::SendGetInstanceUUIDRpc() const
auto ReplicationInstanceClient::SendGetInstanceUUIDRpc() const
-> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
try {
auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
@ -160,7 +167,7 @@ auto CoordinatorClient::SendGetInstanceUUIDRpc() const
}
}
auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
auto ReplicationInstanceClient::SendEnableWritingOnMainRpc() const -> bool {
try {
auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()};
if (!stream.AwaitResponse().success) {
@ -174,7 +181,7 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
return false;
}
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
auto ReplicationInstanceClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
try {
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};

View File

@ -0,0 +1,132 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/replication_instance_connector.hpp"
#include <utility>
#include "replication_coordination_glue/handler.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
ReplicationInstanceConnector::ReplicationInstanceConnector(std::unique_ptr<ReplicationInstanceClient> client,
HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb)
: client_(std::move(client)), succ_cb_(succ_instance_cb), fail_cb_(fail_instance_cb) {}
void ReplicationInstanceConnector::OnSuccessPing() {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
auto ReplicationInstanceConnector::OnFailPing() -> bool {
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
is_alive_ = elapsed_time < client_->InstanceDownTimeoutSec();
return is_alive_;
}
auto ReplicationInstanceConnector::IsReadyForUUIDPing() -> bool {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_) >
client_->InstanceGetUUIDFrequencySec();
}
auto ReplicationInstanceConnector::InstanceName() const -> std::string { return client_->InstanceName(); }
auto ReplicationInstanceConnector::CoordinatorSocketAddress() const -> std::string {
return client_->CoordinatorSocketAddress();
}
auto ReplicationInstanceConnector::ReplicationSocketAddress() const -> std::string {
return client_->ReplicationSocketAddress();
}
auto ReplicationInstanceConnector::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstanceConnector::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_->SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
return true;
}
auto ReplicationInstanceConnector::SendDemoteToReplicaRpc() -> bool { return client_->DemoteToReplica(); }
auto ReplicationInstanceConnector::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_->DemoteToReplica()) {
return false;
}
succ_cb_ = replica_succ_cb;
fail_cb_ = replica_fail_cb;
return true;
}
auto ReplicationInstanceConnector::StartFrequentCheck() -> void { client_->StartFrequentCheck(); }
auto ReplicationInstanceConnector::StopFrequentCheck() -> void { client_->StopFrequentCheck(); }
auto ReplicationInstanceConnector::PauseFrequentCheck() -> void { client_->PauseFrequentCheck(); }
auto ReplicationInstanceConnector::ResumeFrequentCheck() -> void { client_->ResumeFrequentCheck(); }
auto ReplicationInstanceConnector::ReplicationClientInfo() const -> coordination::ReplicationClientInfo {
return client_->ReplicationClientInfo();
}
auto ReplicationInstanceConnector::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
auto ReplicationInstanceConnector::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
auto ReplicationInstanceConnector::GetClient() -> ReplicationInstanceClient & { return *client_; }
auto ReplicationInstanceConnector::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
if (!IsReadyForUUIDPing()) {
return true;
}
auto res = SendGetInstanceUUID();
if (res.HasError()) {
return false;
}
UpdateReplicaLastResponseUUID();
// NOLINTNEXTLINE
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
return true;
}
return SendSwapAndUpdateUUID(curr_main_uuid);
}
auto ReplicationInstanceConnector::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool {
return replication_coordination_glue::SendSwapMainUUIDRpc(client_->RpcClient(), new_main_uuid);
}
auto ReplicationInstanceConnector::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool {
return client_->SendUnregisterReplicaRpc(instance_name);
}
auto ReplicationInstanceConnector::EnableWritingOnMain() -> bool { return client_->SendEnableWritingOnMainRpc(); }
auto ReplicationInstanceConnector::SendGetInstanceUUID()
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
return client_->SendGetInstanceUUIDRpc();
}
void ReplicationInstanceConnector::UpdateReplicaLastResponseUUID() {
last_check_of_uuid_ = std::chrono::system_clock::now();
}
} // namespace memgraph::coordination
#endif

View File

@ -10,12 +10,17 @@
// licenses/APL.txt.
#include "replication.hpp"
#include "utils/flag_validation.hpp"
#include <limits>
#ifdef MG_ENTERPRISE
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(management_port, 0, "Port on which coordinator servers will be started.");
DEFINE_VALIDATED_int32(management_port, 0, "Port on which coordinator servers will be started.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(coordinator_port, 0, "Port on which raft servers will be started.");
DEFINE_VALIDATED_int32(coordinator_port, 0, "Port on which raft servers will be started.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(coordinator_id, 0, "Unique ID of the raft server.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -15,9 +15,9 @@
#ifdef MG_ENTERPRISE
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(management_port);
DECLARE_int32(management_port);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(coordinator_port);
DECLARE_int32(coordinator_port);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(coordinator_id);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -407,7 +407,24 @@ int main(int argc, char **argv) {
// singleton coordinator state
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState coordinator_state;
using memgraph::coordination::CoordinatorInstanceInitConfig;
using memgraph::coordination::CoordinatorState;
using memgraph::coordination::ReplicationInstanceInitConfig;
std::optional<CoordinatorState> coordinator_state{std::nullopt};
if (FLAGS_management_port && (FLAGS_coordinator_id || FLAGS_coordinator_port)) {
throw std::runtime_error(
"Coordinator cannot be started with both coordinator_id/port and management_port. Specify coordinator_id and "
"port for coordinator instance and management port for replication instance.");
}
if (FLAGS_coordinator_id && FLAGS_coordinator_port) {
coordinator_state.emplace(CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id,
.coordinator_port = FLAGS_coordinator_port,
.bolt_port = FLAGS_bolt_port});
}
if (FLAGS_management_port) {
coordinator_state.emplace(ReplicationInstanceInitConfig{.management_port = FLAGS_management_port});
}
#endif
memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state
@ -430,19 +447,20 @@ int main(int argc, char **argv) {
#ifdef MG_ENTERPRISE
// MAIN or REPLICA instance
if (FLAGS_management_port) {
memgraph::dbms::CoordinatorHandlers::Register(coordinator_state.GetCoordinatorServer(), replication_handler);
MG_ASSERT(coordinator_state.GetCoordinatorServer().Start(), "Failed to start coordinator server!");
memgraph::dbms::CoordinatorHandlers::Register(coordinator_state->GetCoordinatorServer(), replication_handler);
MG_ASSERT(coordinator_state->GetCoordinatorServer().Start(), "Failed to start coordinator server!");
}
#endif
auto db_acc = dbms_handler.Get();
memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, system,
memgraph::query::InterpreterContext interpreter_context_(
interp_config, &dbms_handler, &repl_state, system,
#ifdef MG_ENTERPRISE
&coordinator_state,
coordinator_state ? std::optional<std::reference_wrapper<CoordinatorState>>{std::ref(*coordinator_state)}
: std::nullopt,
#endif
auth_handler.get(), auth_checker.get(),
&replication_handler);
auth_handler.get(), auth_checker.get(), &replication_handler);
MG_ASSERT(db_acc, "Failed to access the main database");
memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(),

View File

@ -3178,7 +3178,7 @@ class CoordinatorQuery : public memgraph::query::Query {
memgraph::query::CoordinatorQuery::Action action_;
std::string instance_name_{};
std::unordered_map<memgraph::query::Expression *, memgraph::query::Expression *> configs_;
memgraph::query::Expression *coordinator_server_id_{nullptr};
memgraph::query::Expression *coordinator_id_{nullptr};
memgraph::query::CoordinatorQuery::SyncMode sync_mode_;
CoordinatorQuery *Clone(AstStorage *storage) const override {
@ -3186,7 +3186,7 @@ class CoordinatorQuery : public memgraph::query::Query {
object->action_ = action_;
object->instance_name_ = instance_name_;
object->coordinator_server_id_ = coordinator_server_id_ ? coordinator_server_id_->Clone(storage) : nullptr;
object->coordinator_id_ = coordinator_id_ ? coordinator_id_->Clone(storage) : nullptr;
object->sync_mode_ = sync_mode_;
for (const auto &[key, value] : configs_) {
object->configs_[key->Clone(storage)] = value->Clone(storage);

View File

@ -447,7 +447,7 @@ antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::Add
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE;
coordinator_query->coordinator_server_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
coordinator_query->coordinator_id_ = std::any_cast<Expression *>(ctx->coordinatorServerId()->accept(this));
coordinator_query->configs_ =
std::any_cast<std::unordered_map<Expression *, Expression *>>(ctx->configsMap->accept(this));

View File

@ -498,7 +498,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
auto const coord_coord_config =
coordination::CoordinatorToCoordinatorConfig{.coordinator_server_id = coordinator_id,
coordination::CoordinatorToCoordinatorConfig{.coordinator_id = coordinator_id,
.bolt_server = *maybe_bolt_server,
.coordinator_server = *maybe_coordinator_server};
@ -1224,7 +1224,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer);
}
auto coord_server_id = coordinator_query->coordinator_server_id_->Accept(evaluator).ValueInt();
auto coord_server_id = coordinator_query->coordinator_id_->Accept(evaluator).ValueInt();
callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coord_server_id,
bolt_server = bolt_server_it->second,
@ -4299,8 +4299,10 @@ void Interpreter::RollbackTransaction() {
#ifdef MG_ENTERPRISE
auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> RouteResult {
// TODO: (andi) Test
if (!FLAGS_coordinator_id) {
if (!interpreter_context_->coordinator_state_) {
throw QueryException("You cannot fetch routing table from an instance which is not part of a cluster.");
}
if (FLAGS_management_port) {
auto const &address = routing.find("address");
if (address == routing.end()) {
throw QueryException("Routing table must contain address field.");
@ -4315,7 +4317,7 @@ auto Interpreter::Route(std::map<std::string, std::string> const &routing) -> Ro
return result;
}
return RouteResult{.servers = interpreter_context_->coordinator_state_->GetRoutingTable(routing)};
return RouteResult{.servers = interpreter_context_->coordinator_state_->get().GetRoutingTable()};
}
#endif

View File

@ -15,18 +15,18 @@
#include "system/include/system/system.hpp"
namespace memgraph::query {
InterpreterContext::InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
InterpreterContext::InterpreterContext(
InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler, replication::ReplicationState *rs,
memgraph::system::System &system,
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state,
std::optional<std::reference_wrapper<memgraph::coordination::CoordinatorState>> const &coordinator_state,
#endif
AuthQueryHandler *ah, AuthChecker *ac,
ReplicationQueryHandler *replication_handler)
AuthQueryHandler *ah, AuthChecker *ac, ReplicationQueryHandler *replication_handler)
: dbms_handler(dbms_handler),
config(interpreter_config),
repl_state(rs),
#ifdef MG_ENTERPRISE
coordinator_state_{coordinator_state},
coordinator_state_(coordinator_state),
#endif
auth(ah),
auth_checker(ac),

View File

@ -57,7 +57,7 @@ struct InterpreterContext {
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state,
std::optional<std::reference_wrapper<coordination::CoordinatorState>> const &coordinator_state,
#endif
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
ReplicationQueryHandler *replication_handler = nullptr);
@ -72,7 +72,7 @@ struct InterpreterContext {
// GLOBAL
memgraph::replication::ReplicationState *repl_state;
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state_;
std::optional<std::reference_wrapper<coordination::CoordinatorState>> coordinator_state_;
#endif
AuthQueryHandler *auth;

View File

@ -47,5 +47,5 @@ func main() {
read_messages("neo4j://localhost:7690") // coordinator_1
read_messages("neo4j://localhost:7691") // coordinator_2
read_messages("neo4j://localhost:7692") // coordinator_3
fmt.Println("Successfully finished running coordinator_route.go test")
fmt.Println("Successfully finished running read_route.go test")
}

View File

@ -453,9 +453,30 @@ target_link_libraries(${test_prefix}coordinator_cluster_state gflags mg-coordina
target_include_directories(${test_prefix}coordinator_cluster_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
# Test Raft log serialization
# Test coordinator state machine
if(MG_ENTERPRISE)
add_unit_test(routing_table.cpp)
target_link_libraries(${test_prefix}routing_table gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}routing_table PRIVATE ${CMAKE_SOURCE_DIR}/include)
add_unit_test(coordinator_state_machine.cpp)
target_link_libraries(${test_prefix}coordinator_state_machine gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordinator_state_machine PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
# Test RAFT state
if(MG_ENTERPRISE)
add_unit_test(raft_state.cpp)
target_link_libraries(${test_prefix}raft_state gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}raft_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
# Test coordinator instance
if(MG_ENTERPRISE)
add_unit_test(coordinator_instance.cpp)
target_link_libraries(${test_prefix}coordinator_instance gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordinator_instance PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
# Test replication instance connector
if(MG_ENTERPRISE)
add_unit_test(replication_instance_connector.cpp)
target_link_libraries(${test_prefix}replication_instance_connector gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}replication_instance_connector PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()

View File

@ -16,6 +16,8 @@
#include "replication_coordination_glue/common.hpp"
#include "utils/functional.hpp"
using memgraph::coordination::CoordinatorInstanceInitConfig;
class CoordinationUtils : public ::testing::Test {
protected:
void SetUp() override {}
@ -60,7 +62,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
memgraph::coordination::CoordinatorInstance instance{init_config1};
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
@ -112,7 +117,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
memgraph::coordination::CoordinatorInstance instance{init_config1};
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
@ -167,7 +174,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
memgraph::coordination::CoordinatorInstance instance{init_config1};
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
@ -226,7 +235,9 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
memgraph::coordination::CoordinatorInstance instance{init_config1};
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);

View File

@ -11,10 +11,9 @@
#include "nuraft/coordinator_cluster_state.hpp"
#include "io/network/endpoint.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/file.hpp"
#include "utils/uuid.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
@ -23,15 +22,17 @@
#include "libnuraft/nuraft.hxx"
using memgraph::coordination::CoordinatorClusterState;
using memgraph::coordination::CoordinatorStateMachine;
using memgraph::coordination::CoordinatorInstanceInitConfig;
using memgraph::coordination::CoordinatorInstanceState;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::RaftLogAction;
using memgraph::coordination::ReplicationInstanceState;
using memgraph::io::network::Endpoint;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::replication_coordination_glue::ReplicationRole;
using memgraph::utils::UUID;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
class CoordinatorClusterStateTest : public ::testing::Test {
@ -44,6 +45,174 @@ class CoordinatorClusterStateTest : public ::testing::Test {
"MG_tests_unit_coordinator_cluster_state"};
};
TEST_F(CoordinatorClusterStateTest, RegisterReplicationInstance) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
auto instances = cluster_state.GetReplicationInstances();
ASSERT_EQ(instances.size(), 1);
ASSERT_EQ(instances[0].config, config);
ASSERT_EQ(instances[0].status, ReplicationRole::REPLICA);
ASSERT_EQ(cluster_state.GetCoordinatorInstances().size(), 1);
ASSERT_TRUE(cluster_state.IsReplica("instance3"));
}
TEST_F(CoordinatorClusterStateTest, UnregisterReplicationInstance) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
cluster_state.DoAction("instance3", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE);
ASSERT_EQ(cluster_state.GetReplicationInstances().size(), 0);
}
TEST_F(CoordinatorClusterStateTest, SetInstanceToMain) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10111},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10010}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
}
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);
auto const repl_instances = cluster_state.GetReplicationInstances();
ASSERT_EQ(repl_instances.size(), 2);
ASSERT_EQ(repl_instances[0].status, ReplicationRole::REPLICA);
ASSERT_EQ(repl_instances[1].status, ReplicationRole::MAIN);
ASSERT_TRUE(cluster_state.MainExists());
ASSERT_TRUE(cluster_state.IsMain("instance3"));
ASSERT_FALSE(cluster_state.IsMain("instance2"));
ASSERT_TRUE(cluster_state.IsReplica("instance2"));
ASSERT_FALSE(cluster_state.IsReplica("instance3"));
}
TEST_F(CoordinatorClusterStateTest, SetInstanceToReplica) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10111},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10010}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
}
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_MAIN);
cluster_state.DoAction("instance3", RaftLogAction::SET_INSTANCE_AS_REPLICA);
cluster_state.DoAction("instance2", RaftLogAction::SET_INSTANCE_AS_MAIN);
auto const repl_instances = cluster_state.GetReplicationInstances();
ASSERT_EQ(repl_instances.size(), 2);
ASSERT_EQ(repl_instances[0].status, ReplicationRole::MAIN);
ASSERT_EQ(repl_instances[1].status, ReplicationRole::REPLICA);
ASSERT_TRUE(cluster_state.MainExists());
ASSERT_TRUE(cluster_state.IsMain("instance2"));
ASSERT_FALSE(cluster_state.IsMain("instance3"));
ASSERT_TRUE(cluster_state.IsReplica("instance3"));
ASSERT_FALSE(cluster_state.IsReplica("instance2"));
}
TEST_F(CoordinatorClusterStateTest, UpdateUUID) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
auto uuid = UUID();
cluster_state.DoAction(uuid, RaftLogAction::UPDATE_UUID);
ASSERT_EQ(cluster_state.GetUUID(), uuid);
}
TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
.bolt_server = Endpoint{"127.0.0.1", 7687},
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
CoordinatorClusterState cluster_state{init_config1};
cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE);
auto instances = cluster_state.GetCoordinatorInstances();
ASSERT_EQ(instances.size(), 2);
ASSERT_EQ(instances[1].config, config);
}
TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
ReplicationInstanceState instance_state{
CoordinatorToReplicaConfig{.instance_name = "instance3",
@ -65,122 +234,43 @@ TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) {
EXPECT_EQ(instance_state.status, deserialized_instance_state.status);
}
TEST_F(CoordinatorClusterStateTest, DoActionRegisterInstances) {
auto coordinator_cluster_state = memgraph::coordination::CoordinatorClusterState{};
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance1",
.mgt_server = Endpoint{"127.0.0.1", 10111},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance1",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = {.instance_name = "instance2",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10002}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10113},
.bolt_server = Endpoint{"127.0.0.1", 7689},
.replication_client_info = {.instance_name = "instance3",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10003}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance4",
.mgt_server = Endpoint{"127.0.0.1", 10114},
.bolt_server = Endpoint{"127.0.0.1", 7690},
.replication_client_info = {.instance_name = "instance4",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10004}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance5",
.mgt_server = Endpoint{"127.0.0.1", 10115},
.bolt_server = Endpoint{"127.0.0.1", 7691},
.replication_client_info = {.instance_name = "instance5",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10005}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
{
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance6",
.mgt_server = Endpoint{"127.0.0.1", 10116},
.bolt_server = Endpoint{"127.0.0.1", 7692},
.replication_client_info = {.instance_name = "instance6",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10006}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto buffer = CoordinatorStateMachine::SerializeRegisterInstance(config);
auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
coordinator_cluster_state.DoAction(payload, action);
}
ptr<buffer> data;
coordinator_cluster_state.Serialize(data);
auto deserialized_coordinator_cluster_state = CoordinatorClusterState::Deserialize(*data);
ASSERT_EQ(coordinator_cluster_state.GetReplicationInstances(),
deserialized_coordinator_cluster_state.GetReplicationInstances());
TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) {
CoordinatorInstanceState instance_state{
CoordinatorToCoordinatorConfig{.coordinator_id = 1,
.bolt_server = Endpoint{"127.0.0.1", 7687},
.coordinator_server = Endpoint{"127.0.0.1", 10111}}};
nlohmann::json j = instance_state;
CoordinatorInstanceState deserialized_instance_state = j.get<CoordinatorInstanceState>();
ASSERT_EQ(instance_state, deserialized_instance_state);
}
TEST_F(CoordinatorClusterStateTest, Marshalling) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7688};
CoordinatorClusterState cluster_state{init_config1};
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
.bolt_server = Endpoint{"127.0.0.1", 7687},
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
cluster_state.DoAction(config, RaftLogAction::ADD_COORDINATOR_INSTANCE);
auto config2 =
CoordinatorToReplicaConfig{.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10111},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10010}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
cluster_state.DoAction(config2, RaftLogAction::REGISTER_REPLICATION_INSTANCE);
ptr<buffer> data{};
cluster_state.Serialize(data);
auto deserialized_cluster_state = CoordinatorClusterState::Deserialize(*data);
ASSERT_EQ(cluster_state, deserialized_cluster_state);
}

View File

@ -0,0 +1,130 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/coordinator_instance.hpp"
#include "auth/auth.hpp"
#include "flags/run_time_configurable.hpp"
#include "interpreter_faker.hpp"
#include "io/network/endpoint.hpp"
#include "license/license.hpp"
#include "replication_handler/replication_handler.hpp"
#include "storage/v2/config.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
using memgraph::coordination::CoordinatorInstance;
using memgraph::coordination::CoordinatorInstanceInitConfig;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::HealthCheckClientCallback;
using memgraph::coordination::HealthCheckInstanceCallback;
using memgraph::coordination::RaftState;
using memgraph::coordination::RegisterInstanceCoordinatorStatus;
using memgraph::coordination::ReplicationClientInfo;
using memgraph::coordination::ReplicationInstanceClient;
using memgraph::coordination::ReplicationInstanceConnector;
using memgraph::io::network::Endpoint;
using memgraph::replication::ReplicationHandler;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::storage::Config;
using testing::_;
class ReplicationInstanceClientMock : public ReplicationInstanceClient {
public:
ReplicationInstanceClientMock(CoordinatorInstance *coord_instance, CoordinatorToReplicaConfig config)
: ReplicationInstanceClient(coord_instance, config, nullptr, nullptr) {
ON_CALL(*this, DemoteToReplica()).WillByDefault(testing::Return(true));
}
MOCK_METHOD0(DemoteToReplica, bool());
};
class CoordinatorInstanceTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
"MG_tests_unit_coordinator_instance"};
};
TEST_F(CoordinatorInstanceTest, RegisterReplicationInstance) {
auto const init_config =
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
auto instance1 = CoordinatorInstance{init_config};
auto const coord_to_replica_config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
auto status = instance1.RegisterReplicationInstance(coord_to_replica_config);
EXPECT_EQ(status, RegisterInstanceCoordinatorStatus::SUCCESS);
}
TEST_F(CoordinatorInstanceTest, ShowInstancesEmptyTest) {
auto const init_config =
CoordinatorInstanceInitConfig{.coordinator_id = 4, .coordinator_port = 10110, .bolt_port = 7686};
auto const instance1 = CoordinatorInstance{init_config};
auto const instances = instance1.ShowInstances();
ASSERT_EQ(instances.size(), 1);
ASSERT_EQ(instances[0].instance_name, "coordinator_4");
ASSERT_EQ(instances[0].health, "unknown");
ASSERT_EQ(instances[0].raft_socket_address, "127.0.0.1:10110");
ASSERT_EQ(instances[0].coord_socket_address, "");
ASSERT_EQ(instances[0].cluster_role, "coordinator");
}
TEST_F(CoordinatorInstanceTest, ConnectCoordinators) {
auto const init_config1 =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10111, .bolt_port = 7687};
auto instance1 = CoordinatorInstance{init_config1};
auto const init_config2 =
CoordinatorInstanceInitConfig{.coordinator_id = 2, .coordinator_port = 10112, .bolt_port = 7688};
auto const instance2 = CoordinatorInstance{init_config2};
auto const init_config3 =
CoordinatorInstanceInitConfig{.coordinator_id = 3, .coordinator_port = 10113, .bolt_port = 7689};
auto const instance3 = CoordinatorInstance{init_config3};
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 2,
.bolt_server = Endpoint{"127.0.0.1", 7688},
.coordinator_server = Endpoint{"127.0.0.1", 10112}});
instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 3,
.bolt_server = Endpoint{"127.0.0.1", 7689},
.coordinator_server = Endpoint{"127.0.0.1", 10113}});
auto const instances = instance1.ShowInstances();
ASSERT_EQ(instances.size(), 3);
ASSERT_EQ(instances[0].instance_name, "coordinator_1");
ASSERT_EQ(instances[1].instance_name, "coordinator_2");
ASSERT_EQ(instances[2].instance_name, "coordinator_3");
}

View File

@ -0,0 +1,131 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "nuraft/coordinator_state_machine.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
#include "libnuraft/nuraft.hxx"
using memgraph::coordination::CoordinatorStateMachine;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::RaftLogAction;
using memgraph::io::network::Endpoint;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::utils::UUID;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::ptr;
class CoordinatorStateMachineTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() /
"MG_tests_unit_coordinator_state_machine"};
};
TEST_F(CoordinatorStateMachineTest, SerializeRegisterReplicationInstance) {
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
ptr<buffer> data = CoordinatorStateMachine::SerializeRegisterInstance(config);
buffer_serializer bs(*data);
auto const expected = nlohmann::json{{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}};
ASSERT_EQ(bs.get_str(), expected.dump());
}
TEST_F(CoordinatorStateMachineTest, SerializeUnregisterReplicationInstance) {
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
CoordinatorStateMachine::SerializeRegisterInstance(config);
ptr<buffer> data = CoordinatorStateMachine::SerializeUnregisterInstance("instance3");
buffer_serializer bs(*data);
auto const expected =
nlohmann::json{{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", "instance3"}};
ASSERT_EQ(bs.get_str(), expected.dump());
}
TEST_F(CoordinatorStateMachineTest, SerializeAddCoordinatorInstance) {
CoordinatorToCoordinatorConfig config{.coordinator_id = 1,
.bolt_server = Endpoint{"127.0.0.1", 7687},
.coordinator_server = Endpoint{"127.0.0.1", 10111}};
ptr<buffer> data = CoordinatorStateMachine::SerializeAddCoordinatorInstance(config);
buffer_serializer bs(*data);
auto const expected = nlohmann::json{{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}};
ASSERT_EQ(bs.get_str(), expected.dump());
}
TEST_F(CoordinatorStateMachineTest, SerializeSetInstanceToMain) {
auto config =
CoordinatorToReplicaConfig{.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10112},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = {.instance_name = "instance_name",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}},
.instance_health_check_frequency_sec = std::chrono::seconds{1},
.instance_down_timeout_sec = std::chrono::seconds{5},
.instance_get_uuid_frequency_sec = std::chrono::seconds{10},
.ssl = std::nullopt};
CoordinatorStateMachine::SerializeRegisterInstance(config);
{
ptr<buffer> data = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3");
buffer_serializer bs(*data);
auto const expected = nlohmann::json{{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", "instance3"}};
ASSERT_EQ(bs.get_str(), expected.dump());
}
{
ptr<buffer> data = CoordinatorStateMachine::SerializeSetInstanceAsReplica("instance3");
buffer_serializer bs(*data);
auto const expected = nlohmann::json{{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", "instance3"}};
ASSERT_EQ(bs.get_str(), expected.dump());
}
}
TEST_F(CoordinatorStateMachineTest, SerializeUpdateUUID) {
auto uuid = UUID{};
ptr<buffer> data = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
buffer_serializer bs(*data);
auto const expected = nlohmann::json{{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}};
ASSERT_EQ(bs.get_str(), expected.dump());
}

View File

@ -2706,7 +2706,7 @@ TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) {
auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query));
EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE);
ast_generator.CheckLiteral(parsed_query->coordinator_server_id_, TypedValue(1));
ast_generator.CheckLiteral(parsed_query->coordinator_id_, TypedValue(1));
auto const evaluate_config_map = [&ast_generator](std::unordered_map<Expression *, Expression *> const &config_map)
-> std::map<std::string, std::string, std::less<>> {

138
tests/unit/raft_state.cpp Normal file
View File

@ -0,0 +1,138 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/raft_state.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
#include "libnuraft/nuraft.hxx"
using memgraph::coordination::CoordinatorInstanceInitConfig;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::RaftState;
using memgraph::coordination::ReplicationClientInfo;
using memgraph::io::network::Endpoint;
using memgraph::replication_coordination_glue::ReplicationMode;
using nuraft::ptr;
class RaftStateTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_state"};
};
TEST_F(RaftStateTest, RaftStateEmptyMetadata) {
auto become_leader_cb = []() {};
auto become_follower_cb = []() {};
auto const config = CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 1234, .bolt_port = 7688};
auto raft_state = RaftState::MakeRaftState(config, std::move(become_leader_cb), std::move(become_follower_cb));
ASSERT_EQ(raft_state.InstanceName(), "coordinator_1");
ASSERT_EQ(raft_state.RaftSocketAddress(), "127.0.0.1:1234");
ASSERT_TRUE(raft_state.IsLeader());
ASSERT_TRUE(raft_state.GetReplicationInstances().empty());
auto const coords = raft_state.GetCoordinatorInstances();
ASSERT_EQ(coords.size(), 1);
auto const &coord_instance = coords[0];
auto const &coord_config = CoordinatorToCoordinatorConfig{.coordinator_id = 1,
.bolt_server = Endpoint{"127.0.0.1", 7688},
.coordinator_server = Endpoint{"127.0.0.1", 1234}};
ASSERT_EQ(coord_instance.config, coord_config);
}
TEST_F(RaftStateTest, GetSingleRouterRoutingTable) {
auto become_leader_cb = []() {};
auto become_follower_cb = []() {};
auto const init_config =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10112, .bolt_port = 7688};
auto const raft_state =
RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
auto routing_table = raft_state.GetRoutingTable();
ASSERT_EQ(routing_table.size(), 1);
auto const routers = routing_table[0];
ASSERT_EQ(routers.first, std::vector<std::string>{"127.0.0.1:7688"});
ASSERT_EQ(routers.second, "ROUTE");
}
TEST_F(RaftStateTest, GetMixedRoutingTable) {
auto become_leader_cb = []() {};
auto become_follower_cb = []() {};
auto const init_config =
CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 10113, .bolt_port = 7690};
auto leader = RaftState::MakeRaftState(init_config, std::move(become_leader_cb), std::move(become_follower_cb));
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
.instance_name = "instance1",
.mgt_server = Endpoint{"127.0.0.1", 10011},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = ReplicationClientInfo{.instance_name = "instance1",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}}});
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10012},
.bolt_server = Endpoint{"127.0.0.1", 7688},
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10002}}});
leader.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
.instance_name = "instance3",
.mgt_server = Endpoint{"127.0.0.1", 10013},
.bolt_server = Endpoint{"127.0.0.1", 7689},
.replication_client_info = ReplicationClientInfo{.instance_name = "instance3",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10003}}});
leader.AppendAddCoordinatorInstanceLog(
CoordinatorToCoordinatorConfig{.coordinator_id = 2,
.bolt_server = Endpoint{"127.0.0.1", 7691},
.coordinator_server = Endpoint{"127.0.0.1", 10114}});
leader.AppendAddCoordinatorInstanceLog(
CoordinatorToCoordinatorConfig{.coordinator_id = 3,
.bolt_server = Endpoint{"127.0.0.1", 7692},
.coordinator_server = Endpoint{"127.0.0.1", 10115}});
leader.AppendSetInstanceAsMainLog("instance1");
auto const routing_table = leader.GetRoutingTable();
ASSERT_EQ(routing_table.size(), 3);
auto const &mains = routing_table[0];
ASSERT_EQ(mains.second, "WRITE");
ASSERT_EQ(mains.first, std::vector<std::string>{"127.0.0.1:7687"});
auto const &replicas = routing_table[1];
ASSERT_EQ(replicas.second, "READ");
auto const expected_replicas = std::vector<std::string>{"127.0.0.1:7688", "127.0.0.1:7689"};
ASSERT_EQ(replicas.first, expected_replicas);
auto const &routers = routing_table[2];
ASSERT_EQ(routers.second, "ROUTE");
auto const expected_routers = std::vector<std::string>{"127.0.0.1:7690", "127.0.0.1:7691", "127.0.0.1:7692"};
ASSERT_EQ(routers.first, expected_routers);
}

View File

@ -0,0 +1,51 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/replication_instance_connector.hpp"
#include "coordination/coordinator_instance.hpp"
#include "auth/auth.hpp"
#include "flags/run_time_configurable.hpp"
#include "interpreter_faker.hpp"
#include "io/network/endpoint.hpp"
#include "license/license.hpp"
#include "replication_handler/replication_handler.hpp"
#include "storage/v2/config.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
using memgraph::coordination::CoordinatorInstance;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::HealthCheckClientCallback;
using memgraph::coordination::HealthCheckInstanceCallback;
using memgraph::coordination::ReplicationClientInfo;
using memgraph::coordination::ReplicationInstanceClient;
using memgraph::coordination::ReplicationInstanceConnector;
using memgraph::io::network::Endpoint;
using memgraph::replication::ReplicationHandler;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::storage::Config;
using testing::_;
class ReplicationInstanceClientMock {};
class ReplicationInstanceConnectorTest : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
};

View File

@ -1,176 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "auth/auth.hpp"
#include "coordination/coordinator_instance.hpp"
#include "flags/run_time_configurable.hpp"
#include "interpreter_faker.hpp"
#include "io/network/endpoint.hpp"
#include "license/license.hpp"
#include "replication_handler/replication_handler.hpp"
#include "storage/v2/config.hpp"
#include "utils/file.hpp"
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "json/json.hpp"
using memgraph::coordination::CoordinatorInstance;
using memgraph::coordination::CoordinatorToCoordinatorConfig;
using memgraph::coordination::CoordinatorToReplicaConfig;
using memgraph::coordination::RaftState;
using memgraph::coordination::ReplicationClientInfo;
using memgraph::io::network::Endpoint;
using memgraph::replication::ReplicationHandler;
using memgraph::replication_coordination_glue::ReplicationMode;
using memgraph::storage::Config;
// class MockCoordinatorInstance : CoordinatorInstance {
// auto AddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> void override {}
// };
class RoutingTableTest : public ::testing::Test {
protected:
std::filesystem::path main_data_directory{std::filesystem::temp_directory_path() /
"MG_tests_unit_coordinator_cluster_state"};
std::filesystem::path repl1_data_directory{std::filesystem::temp_directory_path() /
"MG_test_unit_storage_v2_replication_repl"};
std::filesystem::path repl2_data_directory{std::filesystem::temp_directory_path() /
"MG_test_unit_storage_v2_replication_repl2"};
void SetUp() override { Clear(); }
void TearDown() override { Clear(); }
Config main_conf = [&] {
Config config{
.durability =
{
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
},
.salient.items = {.properties_on_edges = true},
};
UpdatePaths(config, main_data_directory);
return config;
}();
Config repl1_conf = [&] {
Config config{
.durability =
{
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
},
.salient.items = {.properties_on_edges = true},
};
UpdatePaths(config, repl1_data_directory);
return config;
}();
Config repl2_conf = [&] {
Config config{
.durability =
{
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
},
.salient.items = {.properties_on_edges = true},
};
UpdatePaths(config, repl2_data_directory);
return config;
}();
const std::string local_host = ("127.0.0.1");
const std::array<uint16_t, 2> ports{10000, 20000};
const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"};
private:
void Clear() {
if (std::filesystem::exists(main_data_directory)) std::filesystem::remove_all(main_data_directory);
if (std::filesystem::exists(repl1_data_directory)) std::filesystem::remove_all(repl1_data_directory);
if (std::filesystem::exists(repl2_data_directory)) std::filesystem::remove_all(repl2_data_directory);
}
};
struct MinMemgraph {
MinMemgraph(const memgraph::storage::Config &conf)
: auth{conf.durability.storage_directory / "auth", memgraph::auth::Auth::Config{/* default */}},
repl_state{ReplicationStateRootPath(conf)},
dbms{conf, repl_state
#ifdef MG_ENTERPRISE
,
auth, true
#endif
},
db_acc{dbms.Get()},
db{*db_acc.get()},
repl_handler(repl_state, dbms
#ifdef MG_ENTERPRISE
,
system_, auth
#endif
) {
}
memgraph::auth::SynchedAuth auth;
memgraph::system::System system_;
memgraph::replication::ReplicationState repl_state;
memgraph::dbms::DbmsHandler dbms;
memgraph::dbms::DatabaseAccess db_acc;
memgraph::dbms::Database &db;
ReplicationHandler repl_handler;
};
;
TEST_F(RoutingTableTest, GetSingleRouterRoutingTable) {
CoordinatorInstance instance1;
auto routing = std::map<std::string, std::string>{{"address", "localhost:7688"}};
auto routing_table = instance1.GetRoutingTable(routing);
ASSERT_EQ(routing_table.size(), 1);
auto const routers = routing_table[0];
ASSERT_EQ(routers.first, std::vector<std::string>{"localhost:7688"});
ASSERT_EQ(routers.second, "ROUTE");
}
TEST_F(RoutingTableTest, GetMixedRoutingTable) {
auto instance1 = RaftState::MakeRaftState([]() {}, []() {});
auto routing = std::map<std::string, std::string>{{"address", "localhost:7690"}};
instance1.AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig{
.instance_name = "instance2",
.mgt_server = Endpoint{"127.0.0.1", 10011},
.bolt_server = Endpoint{"127.0.0.1", 7687},
.replication_client_info = ReplicationClientInfo{.instance_name = "instance2",
.replication_mode = ReplicationMode::ASYNC,
.replication_server = Endpoint{"127.0.0.1", 10001}}});
instance1.GetAllCoordinators();
// auto routing_table = instance1.GetRoutingTable(routing);
// ASSERT_EQ(routing_table.size(), 1);
// auto const routers = routing_table[0];
// ASSERT_EQ(routers.second, "ROUTE");
}
// TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) {
//
// CoordinatorInstance instance1;
// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_server_id = 1,
// .bolt_server = Endpoint{"127.0.0.1", 7689},
// .coordinator_server = Endpoint{"127.0.0.1",
// 10111}});
//
// auto routing = std::map<std::string, std::string>{{"address", "localhost:7688"}};
// auto routing_table = instance1.GetRoutingTable(routing);
//
// ASSERT_EQ(routing_table.size(), 1);
//
// auto const routers = routing_table[0];
// ASSERT_EQ(routers.second, "ROUTE");
// ASSERT_EQ(routers.first.size(), 2);
// auto const expected_routers = std::vector<std::string>{"localhost:7689", "localhost:7688"};
// ASSERT_EQ(routers.first, expected_routers);
// }