diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index 9470a674e..6a0f44a09 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -45,6 +45,18 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state j.at("config").get_to(instance_state.config); } +CoordinatorClusterState::CoordinatorClusterState(CoordinatorInstanceInitConfig const &config) { + auto c2c_config = CoordinatorToCoordinatorConfig{ + .coordinator_id = config.coordinator_id, + .bolt_server = io::network::Endpoint{"127.0.0.1", static_cast(config.bolt_port)}, + .coordinator_server = io::network::Endpoint{"127.0.0.1", static_cast(config.coordinator_port)}}; + coordinators_.emplace_back(CoordinatorInstanceState{.config = std::move(c2c_config)}); +} + +CoordinatorClusterState::CoordinatorClusterState(std::map> instances, + utils::UUID const ¤t_main_uuid, bool is_lock_opened) + : repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {} + CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other) : repl_instances_{other.repl_instances_}, current_main_uuid_(other.current_main_uuid_), diff --git a/src/coordination/coordinator_communication_config.cpp b/src/coordination/coordinator_communication_config.cpp index 43e7fbc37..4ddbd0a5d 100644 --- a/src/coordination/coordinator_communication_config.cpp +++ b/src/coordination/coordinator_communication_config.cpp @@ -16,13 +16,13 @@ namespace memgraph::coordination { void to_json(nlohmann::json &j, CoordinatorToCoordinatorConfig const &config) { - j = nlohmann::json{{"coordinator_server_id", config.coordinator_server_id}, + j = nlohmann::json{{"coordinator_id", config.coordinator_id}, {"coordinator_server", config.coordinator_server}, {"bolt_server", config.bolt_server}}; } void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config) { - config.coordinator_server_id = j.at("coordinator_server_id").get(); + config.coordinator_id = j.at("coordinator_id").get(); config.coordinator_server = j.at("coordinator_server").get(); config.bolt_server = j.at("bolt_server").get(); } diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 6dc4a2eaf..4830848f7 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -29,9 +29,10 @@ namespace memgraph::coordination { using nuraft::ptr; using nuraft::srv_config; -CoordinatorInstance::CoordinatorInstance() +CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &config) : thread_pool_{1}, raft_state_(RaftState::MakeRaftState( + config, [this]() { spdlog::info("Leader changed, starting all replication instances!"); auto const instances = raft_state_.GetReplicationInstances(); @@ -101,13 +102,13 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i } auto CoordinatorInstance::ShowInstances() const -> std::vector { - auto const coord_instance_to_status = [](ptr const &instance) -> InstanceStatus { - return {.instance_name = "coordinator_" + std::to_string(instance->get_id()), - .raft_socket_address = instance->get_endpoint(), + auto const coord_instance_to_status = [](CoordinatorInstanceState const &instance) -> InstanceStatus { + return {.instance_name = fmt::format("coordinator_{}", instance.config.coordinator_id), + .raft_socket_address = instance.config.coordinator_server.SocketAddress(), .cluster_role = "coordinator", - .health = "unknown"}; // TODO: (andi) Get this info from RAFT and test it or when we will move + .health = "unknown"}; }; - auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status); + auto instances_status = utils::fmap(raft_state_.GetCoordinatorInstances(), coord_instance_to_status); if (raft_state_.IsLeader()) { auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string { diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 6f25b919f..f4e3e200f 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -24,14 +24,16 @@ namespace memgraph::coordination { -CoordinatorState::CoordinatorState(InstanceInitConfig const &config) { - if (std::holds_alternative(config.data_)) { - auto const mgmt_config = ManagementServerConfig{ - .ip_address = kDefaultReplicationServerIp, - .port = static_cast(std::get(config.data_).management_port), - }; - data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique(mgmt_config)}; - } +CoordinatorState::CoordinatorState(CoordinatorInstanceInitConfig const &config) { + data_.emplace(config); +} + +CoordinatorState::CoordinatorState(ReplicationInstanceInitConfig const &config) { + auto const mgmt_config = ManagementServerConfig{ + .ip_address = kDefaultReplicationServerIp, + .port = static_cast(config.management_port), + }; + data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique(mgmt_config)}; } auto CoordinatorState::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config) diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 28c8b0768..0e3a523cd 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -20,6 +20,9 @@ constexpr int MAX_SNAPSHOTS = 3; namespace memgraph::coordination { +CoordinatorStateMachine::CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config) + : cluster_state_(config) {} + auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); } auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool { diff --git a/src/coordination/include/coordination/coordinator_communication_config.hpp b/src/coordination/include/coordination/coordinator_communication_config.hpp index 56453d3ea..f28f6c776 100644 --- a/src/coordination/include/coordination/coordinator_communication_config.hpp +++ b/src/coordination/include/coordination/coordinator_communication_config.hpp @@ -30,6 +30,16 @@ namespace memgraph::coordination { inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; +struct ReplicationInstanceInitConfig { + int management_port{0}; +}; + +struct CoordinatorInstanceInitConfig { + uint32_t coordinator_id{0}; + int coordinator_port{0}; + int bolt_port{0}; +}; + struct ReplicationClientInfo { std::string instance_name{}; replication_coordination_glue::ReplicationMode replication_mode{}; @@ -66,7 +76,7 @@ struct CoordinatorToReplicaConfig { }; struct CoordinatorToCoordinatorConfig { - uint32_t coordinator_server_id{0}; + uint32_t coordinator_id{0}; io::network::Endpoint bolt_server; io::network::Endpoint coordinator_server; diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 5f74d1410..526930ccd 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -37,7 +37,7 @@ using InstanceNameDbHistories = std::pair data_; -}; - class CoordinatorState { public: - explicit CoordinatorState(InstanceInitConfig const &config); + explicit CoordinatorState(CoordinatorInstanceInitConfig const &config); + explicit CoordinatorState(ReplicationInstanceInitConfig const &config); ~CoordinatorState() = default; CoordinatorState(CoordinatorState const &) = delete; @@ -67,7 +55,7 @@ class CoordinatorState { std::unique_ptr coordinator_server_; }; - std::variant data_; + std::variant data_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 5c4e6cbe5..7e3043bc5 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -40,8 +40,8 @@ using raft_result = nuraft::cmd_result>; class RaftState { private: - explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id, - uint32_t raft_port, std::string raft_address); + explicit RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb, + BecomeFollowerCb become_follower_cb); auto InitRaftServer() -> void; @@ -53,13 +53,13 @@ class RaftState { RaftState &operator=(RaftState &&other) noexcept = default; ~RaftState(); - static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState; + static auto MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb, + BecomeFollowerCb &&become_follower_cb) -> RaftState; auto InstanceName() const -> std::string; auto RaftSocketAddress() const -> std::string; auto AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void; - auto GetAllCoordinators() const -> std::vector>; auto RequestLeadership() -> bool; auto IsLeader() const -> bool; @@ -68,6 +68,7 @@ class RaftState { auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool; auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool; auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool; + auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool; auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool; auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool; @@ -78,7 +79,6 @@ class RaftState { auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool; auto GetReplicationInstances() const -> std::vector; - // TODO: (andi) Do we need then GetAllCoordinators? auto GetCoordinatorInstances() const -> std::vector; auto MainExists() const -> bool; diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index d54baad81..d8ba9f6f9 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -74,7 +74,7 @@ void from_json(nlohmann::json const &j, CoordinatorInstanceState &instance_state // Source of truth since it is modified only as the result of RAFT's commiting class CoordinatorClusterState { public: - CoordinatorClusterState() = default; + explicit CoordinatorClusterState(CoordinatorInstanceInitConfig const &config); explicit CoordinatorClusterState(std::map> instances, std::vector coordinators, utils::UUID const ¤t_main_uuid, bool is_lock_opened); diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 754cb45af..6d2acb858 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -35,7 +35,7 @@ using nuraft::state_machine; class CoordinatorStateMachine : public state_machine { public: - CoordinatorStateMachine() = default; + explicit CoordinatorStateMachine(CoordinatorInstanceInitConfig const &config); CoordinatorStateMachine(CoordinatorStateMachine const &) = delete; CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete; CoordinatorStateMachine(CoordinatorStateMachine &&) = delete; diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index 53f4032f9..b65bb9d16 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -30,11 +30,11 @@ using nuraft::raft_server; using nuraft::srv_config; using raft_result = cmd_result>; -RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t coordinator_id, - uint32_t raft_port, std::string raft_address) - : raft_endpoint_(raft_address, raft_port), - coordinator_id_(coordinator_id), - state_machine_(cs_new()), +RaftState::RaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb become_leader_cb, + BecomeFollowerCb become_follower_cb) + : raft_endpoint_("127.0.0.1", config.coordinator_port), + coordinator_id_(config.coordinator_id), + state_machine_(cs_new(config)), state_manager_(cs_new(coordinator_id_, raft_endpoint_.SocketAddress())), logger_(nullptr), become_leader_cb_(std::move(become_leader_cb)), @@ -97,12 +97,9 @@ auto RaftState::InitRaftServer() -> void { throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint_.SocketAddress()); } -auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState { - uint32_t coordinator_id = FLAGS_coordinator_id; - uint32_t raft_port = FLAGS_coordinator_port; - - auto raft_state = - RaftState(std::move(become_leader_cb), std::move(become_follower_cb), coordinator_id, raft_port, "127.0.0.1"); +auto RaftState::MakeRaftState(CoordinatorInstanceInitConfig const &config, BecomeLeaderCb &&become_leader_cb, + BecomeFollowerCb &&become_follower_cb) -> RaftState { + auto raft_state = RaftState(config, std::move(become_leader_cb), std::move(become_follower_cb)); raft_state.InitRaftServer(); return raft_state; @@ -110,15 +107,13 @@ auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerC RaftState::~RaftState() { launcher_.shutdown(); } -auto RaftState::InstanceName() const -> std::string { - return fmt::format("coordinator_{}", std::to_string(coordinator_id_)); -} +auto RaftState::InstanceName() const -> std::string { return fmt::format("coordinator_{}", coordinator_id_); } auto RaftState::RaftSocketAddress() const -> std::string { return raft_endpoint_.SocketAddress(); } auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void { auto const endpoint = config.coordinator_server.SocketAddress(); - srv_config const srv_config_to_add(static_cast(config.coordinator_server_id), endpoint); + srv_config const srv_config_to_add(static_cast(config.coordinator_id), endpoint); auto cmd_result = raft_server_->add_srv(srv_config_to_add); @@ -136,9 +131,9 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon bool added{false}; while (!maybe_stop()) { std::this_thread::sleep_for(std::chrono::milliseconds(waiting_period)); - const auto server_config = raft_server_->get_srv_config(static_cast(config.coordinator_server_id)); + const auto server_config = raft_server_->get_srv_config(static_cast(config.coordinator_id)); if (server_config) { - spdlog::trace("Server with id {} added to cluster", config.coordinator_server_id); + spdlog::trace("Server with id {} added to cluster", config.coordinator_id); added = true; break; } @@ -150,12 +145,6 @@ auto RaftState::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorCon } } -auto RaftState::GetAllCoordinators() const -> std::vector> { - std::vector> all_srv_configs; - raft_server_->get_srv_config_all(all_srv_configs); - return all_srv_configs; -} - auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); } auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); } @@ -363,14 +352,14 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c spdlog::error( "Failed to accept request for adding coordinator instance {}. Most likely the reason is that the instance is " "not the leader.", - config.coordinator_server_id); + config.coordinator_id); return false; } - spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_server_id); + spdlog::info("Request for adding coordinator instance {} accepted", config.coordinator_id); if (res->get_result_code() != nuraft::cmd_result_code::OK) { - spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_server_id, + spdlog::error("Failed to add coordinator instance {} with error code {}", config.coordinator_id, static_cast(res->get_result_code())); return false; } diff --git a/src/flags/replication.cpp b/src/flags/replication.cpp index 3f8fd2400..ac59b4b1b 100644 --- a/src/flags/replication.cpp +++ b/src/flags/replication.cpp @@ -10,12 +10,17 @@ // licenses/APL.txt. #include "replication.hpp" +#include "utils/flag_validation.hpp" + +#include #ifdef MG_ENTERPRISE // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint32(management_port, 0, "Port on which coordinator servers will be started."); +DEFINE_VALIDATED_int32(management_port, 0, "Port on which coordinator servers will be started.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_uint32(coordinator_port, 0, "Port on which raft servers will be started."); +DEFINE_VALIDATED_int32(coordinator_port, 0, "Port on which raft servers will be started.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32(coordinator_id, 0, "Unique ID of the raft server."); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/replication.hpp b/src/flags/replication.hpp index e0d1aff8c..fe93c776c 100644 --- a/src/flags/replication.hpp +++ b/src/flags/replication.hpp @@ -15,9 +15,9 @@ #ifdef MG_ENTERPRISE // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DECLARE_uint32(management_port); +DECLARE_int32(management_port); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DECLARE_uint32(coordinator_port); +DECLARE_int32(coordinator_port); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint32(coordinator_id); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 1a6b0d429..6f2d05321 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -407,17 +407,20 @@ int main(int argc, char **argv) { // singleton coordinator state #ifdef MG_ENTERPRISE - memgraph::coordination::InstanceInitConfig config; - if (FLAGS_coordinator_id && FLAGS_coordinator_port) { - config.data_ = memgraph::coordination::CoordinatorInstanceInitConfig { - .raft_server_id = FLAGS_coordinator_id, .coordinator_port = FLAGS_coordinator_port + using memgraph::coordination::CoordinatorInstanceInitConfig; + using memgraph::coordination::CoordinatorState; + using memgraph::coordination::ReplicationInstanceInitConfig; + auto coordinator_state = [&]() -> CoordinatorState { + if (FLAGS_coordinator_id && FLAGS_coordinator_port) { + return CoordinatorState{CoordinatorInstanceInitConfig{.coordinator_id = FLAGS_coordinator_id, + .coordinator_port = FLAGS_coordinator_port, + .bolt_port = FLAGS_bolt_port}}; } - } else if (FLAGS_management_port) { - config.data_ = memgraph::coordination::ReplicationInstanceInitConfig { .management_port = FLAGS_management_port } - } else { - throw std::runtime_error("Coordinator or replica must be started with valid flags!"); - } - memgraph::coordination::CoordinatorState coordinator_state{config}; + if (FLAGS_management_port) { + return CoordinatorState{ReplicationInstanceInitConfig{.management_port = FLAGS_management_port}}; + } + throw std::runtime_error("Coordination or replication instances must be started with valid flags!"); + }(); #endif memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index e3d7bc0b2..3ae5658ef 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3178,7 +3178,7 @@ class CoordinatorQuery : public memgraph::query::Query { memgraph::query::CoordinatorQuery::Action action_; std::string instance_name_{}; std::unordered_map configs_; - memgraph::query::Expression *coordinator_server_id_{nullptr}; + memgraph::query::Expression *coordinator_id_{nullptr}; memgraph::query::CoordinatorQuery::SyncMode sync_mode_; CoordinatorQuery *Clone(AstStorage *storage) const override { @@ -3186,7 +3186,7 @@ class CoordinatorQuery : public memgraph::query::Query { object->action_ = action_; object->instance_name_ = instance_name_; - object->coordinator_server_id_ = coordinator_server_id_ ? coordinator_server_id_->Clone(storage) : nullptr; + object->coordinator_id_ = coordinator_id_ ? coordinator_id_->Clone(storage) : nullptr; object->sync_mode_ = sync_mode_; for (const auto &[key, value] : configs_) { object->configs_[key->Clone(storage)] = value->Clone(storage); diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 35ccb3670..897fbf9b4 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -447,7 +447,7 @@ antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::Add auto *coordinator_query = storage_->Create(); coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE; - coordinator_query->coordinator_server_id_ = std::any_cast(ctx->coordinatorServerId()->accept(this)); + coordinator_query->coordinator_id_ = std::any_cast(ctx->coordinatorServerId()->accept(this)); coordinator_query->configs_ = std::any_cast>(ctx->configsMap->accept(this)); diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 1d02f8435..0b3ba0734 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -498,7 +498,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } auto const coord_coord_config = - coordination::CoordinatorToCoordinatorConfig{.coordinator_server_id = coordinator_id, + coordination::CoordinatorToCoordinatorConfig{.coordinator_id = coordinator_id, .bolt_server = *maybe_bolt_server, .coordinator_server = *maybe_coordinator_server}; @@ -1224,7 +1224,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param throw QueryRuntimeException("Config map must contain {} entry!", kBoltServer); } - auto coord_server_id = coordinator_query->coordinator_server_id_->Accept(evaluator).ValueInt(); + auto coord_server_id = coordinator_query->coordinator_id_->Accept(evaluator).ValueInt(); callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coord_server_id, bolt_server = bolt_server_it->second, diff --git a/tests/unit/coordinator_cluster_state.cpp b/tests/unit/coordinator_cluster_state.cpp index ce166ae2f..581e07621 100644 --- a/tests/unit/coordinator_cluster_state.cpp +++ b/tests/unit/coordinator_cluster_state.cpp @@ -187,7 +187,7 @@ TEST_F(CoordinatorClusterStateTest, UpdateUUID) { } TEST_F(CoordinatorClusterStateTest, AddCoordinatorInstance) { - CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1, + CoordinatorToCoordinatorConfig config{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}; @@ -222,7 +222,7 @@ TEST_F(CoordinatorClusterStateTest, ReplicationInstanceStateSerialization) { TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) { CoordinatorInstanceState instance_state{ - CoordinatorToCoordinatorConfig{.coordinator_server_id = 1, + CoordinatorToCoordinatorConfig{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}}; nlohmann::json j = instance_state; @@ -232,7 +232,7 @@ TEST_F(CoordinatorClusterStateTest, CoordinatorInstanceStateSerialization) { TEST_F(CoordinatorClusterStateTest, Marshalling) { CoordinatorClusterState cluster_state; - CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1, + CoordinatorToCoordinatorConfig config{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}; diff --git a/tests/unit/coordinator_state_machine.cpp b/tests/unit/coordinator_state_machine.cpp index ab6f991f9..0eb6e1167 100644 --- a/tests/unit/coordinator_state_machine.cpp +++ b/tests/unit/coordinator_state_machine.cpp @@ -81,7 +81,7 @@ TEST_F(CoordinatorStateMachineTest, SerializeUnregisterReplicationInstance) { } TEST_F(CoordinatorStateMachineTest, SerializeAddCoordinatorInstance) { - CoordinatorToCoordinatorConfig config{.coordinator_server_id = 1, + CoordinatorToCoordinatorConfig config{.coordinator_id = 1, .bolt_server = Endpoint{"127.0.0.1", 7687}, .coordinator_server = Endpoint{"127.0.0.1", 10111}}; diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 26afd14f5..3f37b4763 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2706,7 +2706,7 @@ TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) { auto *parsed_query = dynamic_cast(ast_generator.ParseQuery(correct_query)); EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE); - ast_generator.CheckLiteral(parsed_query->coordinator_server_id_, TypedValue(1)); + ast_generator.CheckLiteral(parsed_query->coordinator_id_, TypedValue(1)); auto const evaluate_config_map = [&ast_generator](std::unordered_map const &config_map) -> std::map> { diff --git a/tests/unit/raft_state.cpp b/tests/unit/raft_state.cpp index 2cbdb8636..1ef9826d1 100644 --- a/tests/unit/raft_state.cpp +++ b/tests/unit/raft_state.cpp @@ -18,9 +18,10 @@ #include "libnuraft/nuraft.hxx" +using memgraph::coordination::CoordinatorInstanceInitConfig; +using memgraph::coordination::CoordinatorToCoordinatorConfig; using memgraph::coordination::RaftState; -using nuraft::buffer; -using nuraft::buffer_serializer; +using memgraph::io::network::Endpoint; using nuraft::ptr; class RaftStateTest : public ::testing::Test { @@ -32,11 +33,24 @@ class RaftStateTest : public ::testing::Test { std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_raft_state"}; }; -TEST_F(RaftStateTest, CreationOfRaftState) { +TEST_F(RaftStateTest, RaftStateEmptyMetadata) { auto become_leader_cb = []() {}; auto become_follower_cb = []() {}; - auto raft_state = RaftState::MakeRaftState(std::move(become_leader_cb), std::move(become_follower_cb)); - spdlog::info("Raft state instance name: {}", raft_state.InstanceName()); - spdlog::info("Raft state socket address: {}", raft_state.RaftSocketAddress()); + auto const config = CoordinatorInstanceInitConfig{.coordinator_id = 1, .coordinator_port = 1234, .bolt_port = 7688}; + + auto raft_state = RaftState::MakeRaftState(config, std::move(become_leader_cb), std::move(become_follower_cb)); + + ASSERT_EQ(raft_state.InstanceName(), "coordinator_1"); + ASSERT_EQ(raft_state.RaftSocketAddress(), "127.0.0.1:1234"); + ASSERT_TRUE(raft_state.IsLeader()); + ASSERT_TRUE(raft_state.GetReplicationInstances().empty()); + + auto const coords = raft_state.GetCoordinatorInstances(); + ASSERT_EQ(coords.size(), 1); + auto const &coord_instance = coords[0]; + auto const &coord_config = CoordinatorToCoordinatorConfig{.coordinator_id = 1, + .bolt_server = Endpoint{"127.0.0.1", 7688}, + .coordinator_server = Endpoint{"127.0.0.1", 1234}}; + ASSERT_EQ(coord_instance.config, coord_config); } diff --git a/tests/unit/routing_table.cpp b/tests/unit/routing_table.cpp index 42815d461..39c09df0f 100644 --- a/tests/unit/routing_table.cpp +++ b/tests/unit/routing_table.cpp @@ -147,7 +147,6 @@ TEST_F(RoutingTableTest, GetMixedRoutingTable) { .replication_client_info = ReplicationClientInfo{.instance_name = "instance2", .replication_mode = ReplicationMode::ASYNC, .replication_server = Endpoint{"127.0.0.1", 10001}}}); - instance1.GetAllCoordinators(); // auto routing_table = instance1.GetRoutingTable(routing); // ASSERT_EQ(routing_table.size(), 1); @@ -158,7 +157,7 @@ TEST_F(RoutingTableTest, GetMixedRoutingTable) { // TEST_F(RoutingTableTest, GetMultipleRoutersRoutingTable) { // // CoordinatorInstance instance1; -// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_server_id = 1, +// instance1.AddCoordinatorInstance(CoordinatorToCoordinatorConfig{.coordinator_id = 1, // .bolt_server = Endpoint{"127.0.0.1", 7689}, // .coordinator_server = Endpoint{"127.0.0.1", // 10111}});