diff --git a/.clang-tidy b/.clang-tidy
index 5773ea5cd..a30f9e592 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -6,6 +6,7 @@ Checks: '*,
   -altera-unroll-loops,
   -android-*,
   -cert-err58-cpp,
+  -cppcoreguidelines-avoid-do-while,
   -cppcoreguidelines-avoid-c-arrays,
   -cppcoreguidelines-avoid-goto,
   -cppcoreguidelines-avoid-magic-numbers,
@@ -60,6 +61,7 @@ Checks: '*,
   -readability-implicit-bool-conversion,
   -readability-magic-numbers,
   -readability-named-parameter,
+  -readability-identifier-length,
   -misc-no-recursion,
   -concurrency-mt-unsafe,
   -bugprone-easily-swappable-parameters'
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 266a3bedb..7245bf9f8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -275,7 +275,7 @@ option(MG_EXPERIMENTAL_HIGH_AVAILABILITY "Feature flag for experimental high ava

 if (NOT MG_ENTERPRISE AND MG_EXPERIMENTAL_HIGH_AVAILABILITY)
   set(MG_EXPERIMENTAL_HIGH_AVAILABILITY OFF)
-  message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY must be used with enterpise version of the code.")
+  message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY can only be used with the enterprise version of the code.")
 endif ()

 if (MG_EXPERIMENTAL_HIGH_AVAILABILITY)
diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt
index d44cbcd26..d6ab23132 100644
--- a/src/coordination/CMakeLists.txt
+++ b/src/coordination/CMakeLists.txt
@@ -8,12 +8,18 @@ target_sources(mg-coordination
         include/coordination/coordinator_server.hpp
         include/coordination/coordinator_config.hpp
         include/coordination/coordinator_exceptions.hpp
-        include/coordination/coordinator_instance.hpp
         include/coordination/coordinator_slk.hpp
         include/coordination/coordinator_data.hpp
         include/coordination/constants.hpp
         include/coordination/coordinator_cluster_config.hpp
         include/coordination/coordinator_handlers.hpp
+        include/coordination/coordinator_instance.hpp
+        include/coordination/instance_status.hpp
+        include/coordination/replication_instance.hpp
+
+        include/nuraft/coordinator_log_store.hpp
+        include/nuraft/coordinator_state_machine.hpp
+        include/nuraft/coordinator_state_manager.hpp

         PRIVATE
         coordinator_client.cpp
@@ -23,6 +29,11 @@ target_sources(mg-coordination
         coordinator_data.cpp
         coordinator_instance.cpp
         coordinator_handlers.cpp
+        replication_instance.cpp
+
+        coordinator_log_store.cpp
+        coordinator_state_machine.cpp
+        coordinator_state_manager.cpp
 )
 target_include_directories(mg-coordination PUBLIC include)
diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp
index 3eb251003..3732958de 100644
--- a/src/coordination/coordinator_data.cpp
+++ b/src/coordination/coordinator_data.cpp
@@ -9,27 +9,29 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
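Note: the coordinator_data.cpp changes below rename the per-replica bookkeeping from CoordinatorInstance to ReplicationInstance and give CoordinatorData a Raft-backed CoordinatorInstance member (self_), reached through the new AddCoordinatorInstance chain. A rough sketch of how that chain might be driven follows; the call site, the function name, and the concrete id/port/address values are assumptions for illustration, not part of this patch.

#include "coordination/coordinator_state.hpp"

// Hypothetical call site: joins a second coordinator to the Raft cluster.
// CoordinatorState::AddCoordinatorInstance forwards to CoordinatorData::AddCoordinatorInstance,
// which forwards to CoordinatorInstance::AddCoordinatorInstance and finally raft_server_->add_srv(...).
void JoinSecondCoordinator(memgraph::coordination::CoordinatorState &coordinator_state) {
  coordinator_state.AddCoordinatorInstance(/*raft_server_id=*/2, /*raft_port=*/10112, /*raft_address=*/"127.0.0.1");
}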
-#include "coordination/coordinator_instance.hpp" -#include "coordination/register_main_replica_coordinator_status.hpp" -#include "utils/uuid.hpp" #ifdef MG_ENTERPRISE #include "coordination/coordinator_data.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" +#include "coordination/replication_instance.hpp" +#include "utils/uuid.hpp" + #include <range/v3/view.hpp> #include <shared_mutex> -#include "libnuraft/nuraft.hxx" namespace memgraph::coordination { -CoordinatorData::CoordinatorData() { - auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & { - auto instance = std::ranges::find_if( - coord_data->registered_instances_, - [instance_name](CoordinatorInstance const &instance) { return instance.InstanceName() == instance_name; }); +using nuraft::ptr; +using nuraft::srv_config; - MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!", - instance_name); +CoordinatorData::CoordinatorData() { + auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & { + auto instance = std::ranges::find_if( + coord_data->repl_instances_, + [instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; }); + + MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name); return *instance; }; @@ -70,6 +72,11 @@ CoordinatorData::CoordinatorData() { auto &instance = find_instance(coord_data, instance_name); + if (instance.IsAlive()) { + instance.OnSuccessPing(); + return; + } + const auto &instance_uuid = instance.GetMainUUID(); MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set"); if (main_uuid_ == instance_uuid.value()) { @@ -110,48 +117,40 @@ CoordinatorData::CoordinatorData() { } auto CoordinatorData::TryFailover() -> void { - std::vector<CoordinatorInstance *> alive_registered_replica_instances{}; - std::ranges::transform(registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica) | - ranges::views::filter(&CoordinatorInstance::IsAlive), - std::back_inserter(alive_registered_replica_instances), - [](CoordinatorInstance &instance) { return &instance; }); + auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) | + ranges::views::filter(&ReplicationInstance::IsAlive); - // TODO(antoniof) more complex logic of choosing replica instance - CoordinatorInstance *chosen_replica_instance = - !alive_registered_replica_instances.empty() ? 
alive_registered_replica_instances[0] : nullptr; - - if (nullptr == chosen_replica_instance) { + if (ranges::empty(alive_replicas)) { spdlog::warn("Failover failed since all replicas are down!"); return; } + // TODO: Smarter choice + auto chosen_replica_instance = ranges::begin(alive_replicas); + chosen_replica_instance->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }}; - utils::UUID potential_new_main_uuid = utils::UUID{}; - spdlog::trace("Generated potential new main uuid"); + auto const potential_new_main_uuid = utils::UUID{}; - auto not_chosen_instance = [chosen_replica_instance](auto *instance) { - return *instance != *chosen_replica_instance; + auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) { + return instance != *chosen_replica_instance; }; + // If for some replicas swap fails, for others on successful ping we will revert back on next change // or we will do failover first again and then it will be consistent again - for (auto *other_replica_instance : alive_registered_replica_instances | ranges::views::filter(not_chosen_instance)) { - if (!other_replica_instance->SendSwapAndUpdateUUID(potential_new_main_uuid)) { + for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) { + if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) { spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover", - other_replica_instance->InstanceName())); + other_replica_instance.InstanceName())); return; } } std::vector<ReplClientInfo> repl_clients_info; - repl_clients_info.reserve(registered_instances_.size() - 1); - - std::ranges::transform(registered_instances_ | ranges::views::filter([chosen_replica_instance](const auto &instance) { - return *chosen_replica_instance != instance; - }), - std::back_inserter(repl_clients_info), - [](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); }); + repl_clients_info.reserve(repl_instances_.size() - 1); + std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance), + std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo); if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { @@ -164,41 +163,53 @@ auto CoordinatorData::TryFailover() -> void { spdlog::info("Failover successful! 
Instance {} promoted to main.", chosen_replica_instance->InstanceName()); } -auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> { - std::vector<CoordinatorInstanceStatus> instances_status; - instances_status.reserve(registered_instances_.size()); +auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> { + auto const coord_instances = self_.GetAllCoordinators(); - auto const stringify_repl_role = [](CoordinatorInstance const &instance) -> std::string { + std::vector<InstanceStatus> instances_status; + instances_status.reserve(repl_instances_.size() + coord_instances.size()); + + auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string { if (!instance.IsAlive()) return "unknown"; if (instance.IsMain()) return "main"; return "replica"; }; - auto const instance_to_status = - [&stringify_repl_role](CoordinatorInstance const &instance) -> CoordinatorInstanceStatus { + auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus { return {.instance_name = instance.InstanceName(), - .socket_address = instance.SocketAddress(), - .replication_role = stringify_repl_role(instance), + .coord_socket_address = instance.SocketAddress(), + .cluster_role = stringify_repl_role(instance), .is_alive = instance.IsAlive()}; }; + auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus { + return {.instance_name = "coordinator_" + std::to_string(instance->get_id()), + .raft_socket_address = instance->get_endpoint(), + .cluster_role = "coordinator", + .is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move + // CoordinatorState to every instance, we can be smarter about this using our RPC. + }; + + std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status); + { auto lock = std::shared_lock{coord_data_lock_}; - std::ranges::transform(registered_instances_, std::back_inserter(instances_status), instance_to_status); + std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status); } return instances_status; } +// TODO: (andi) Make sure you cannot put coordinator instance to the main auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { auto lock = std::lock_guard{coord_data_lock_}; - auto const is_new_main = [&instance_name](CoordinatorInstance const &instance) { + auto const is_new_main = [&instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; }; - auto new_main = std::ranges::find_if(registered_instances_, is_new_main); + auto new_main = std::ranges::find_if(repl_instances_, is_new_main); - if (new_main == registered_instances_.end()) { + if (new_main == repl_instances_.end()) { spdlog::error("Instance {} not registered. 
Please register it using REGISTER INSTANCE {}", instance_name, instance_name); return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; @@ -208,16 +219,16 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; ReplicationClientsInfo repl_clients_info; - repl_clients_info.reserve(registered_instances_.size() - 1); + repl_clients_info.reserve(repl_instances_.size() - 1); - auto const is_not_new_main = [&instance_name](CoordinatorInstance const &instance) { + auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) { return instance.InstanceName() != instance_name; }; auto potential_new_main_uuid = utils::UUID{}; spdlog::trace("Generated potential new main uuid"); - for (auto &other_instance : registered_instances_ | ranges::views::filter(is_not_new_main)) { + for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) { if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) { spdlog::error( fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName())); @@ -225,9 +236,9 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc } } - std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main), + std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main), std::back_inserter(repl_clients_info), - [](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); }); + [](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); }); if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; @@ -241,20 +252,20 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus { auto lock = std::lock_guard{coord_data_lock_}; - if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) { + if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) { return instance.InstanceName() == config.instance_name; })) { return RegisterInstanceCoordinatorStatus::NAME_EXISTS; } - if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) { + if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) { return instance.SocketAddress() == config.SocketAddress(); })) { return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS; } try { - registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); + repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); return RegisterInstanceCoordinatorStatus::SUCCESS; } catch (CoordinatorRegisterInstanceException const &) { @@ -262,5 +273,10 @@ auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> Regist } } +auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) + -> void { + self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address)); +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 
a759a2505..7a0b0fbd0 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -13,83 +13,85 @@ #include "coordination/coordinator_instance.hpp" +#include "coordination/coordinator_exceptions.hpp" +#include "nuraft/coordinator_state_machine.hpp" +#include "nuraft/coordinator_state_manager.hpp" +#include "utils/counter.hpp" + namespace memgraph::coordination { -CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, - HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) - : client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)), - replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), - is_alive_(true) { - if (!client_.DemoteToReplica()) { - throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); - } - client_.StartFrequentCheck(); -} +using nuraft::asio_service; +using nuraft::cmd_result; +using nuraft::cs_new; +using nuraft::ptr; +using nuraft::raft_params; +using nuraft::srv_config; +using raft_result = cmd_result<ptr<buffer>>; -auto CoordinatorInstance::OnSuccessPing() -> void { - last_response_time_ = std::chrono::system_clock::now(); - is_alive_ = true; -} +CoordinatorInstance::CoordinatorInstance() + : raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") { + auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_); + state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint); + state_machine_ = cs_new<CoordinatorStateMachine>(); + logger_ = nullptr; -auto CoordinatorInstance::OnFailPing() -> bool { - is_alive_ = - std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() < - CoordinatorClusterConfig::alive_response_time_difference_sec_; - return is_alive_; -} + // ASIO options + asio_service::options asio_opts; + asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this -auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); } -auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } -auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; } + // RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms. 
+ raft_params params; + params.heart_beat_interval_ = 100; + params.election_timeout_lower_bound_ = 200; + params.election_timeout_upper_bound_ = 400; + // 5 logs are preserved before the last snapshot + params.reserved_log_items_ = 5; + // Create snapshot for every 5 log appends + params.snapshot_distance_ = 5; + params.client_req_timeout_ = 3000; + params.return_method_ = raft_params::blocking; -auto CoordinatorInstance::IsReplica() const -> bool { - return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; -} -auto CoordinatorInstance::IsMain() const -> bool { - return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; -} + raft_server_ = + launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params); -auto CoordinatorInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, - HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool { - if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) { - return false; + if (!raft_server_) { + throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint); } - replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; - client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb)); - - return true; -} - -auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) - -> bool { - if (!client_.DemoteToReplica()) { - return false; + auto maybe_stop = utils::ResettableCounter<20>(); + while (!raft_server_->is_initialized() && !maybe_stop()) { + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; - client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb)); - - return true; -} - -auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } -auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } - -auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo { - return client_.ReplicationClientInfo(); -} - -auto CoordinatorInstance::GetClient() -> CoordinatorClient & { return client_; } -void CoordinatorInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; } -auto CoordinatorInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; } - -auto CoordinatorInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool { - if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) { - return false; + if (!raft_server_->is_initialized()) { + throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint); } - SetNewMainUUID(main_uuid_); - return true; + + spdlog::info("Raft server started on {}", raft_endpoint); +} + +auto CoordinatorInstance::InstanceName() const -> std::string { + return "coordinator_" + std::to_string(raft_server_id_); +} + +auto CoordinatorInstance::RaftSocketAddress() const -> std::string { + return raft_address_ + ":" + std::to_string(raft_port_); +} + +auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) + -> void { + auto const endpoint = raft_address + ":" + std::to_string(raft_port); + srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint); + if 
(!raft_server_->add_srv(srv_config_to_add)->get_accepted()) { + throw RaftAddServerException("Failed to add server {} to the cluster", endpoint); + } + spdlog::info("Request to add server {} to the cluster accepted", endpoint); +} + +auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> { + std::vector<ptr<srv_config>> all_srv_configs; + raft_server_->get_srv_config_all(all_srv_configs); + return all_srv_configs; } } // namespace memgraph::coordination diff --git a/src/coordination/coordinator_log_store.cpp b/src/coordination/coordinator_log_store.cpp new file mode 100644 index 000000000..11b7be0dd --- /dev/null +++ b/src/coordination/coordinator_log_store.cpp @@ -0,0 +1,331 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "nuraft/coordinator_log_store.hpp" + +namespace memgraph::coordination { + +using nuraft::cs_new; +using nuraft::timer_helper; + +CoordinatorLogStore::CoordinatorLogStore() + : start_idx_(1), + raft_server_bwd_pointer_(nullptr), + disk_emul_delay(0), + disk_emul_thread_(nullptr), + disk_emul_thread_stop_signal_(false), + disk_emul_last_durable_index_(0) { + // Dummy entry for index 0. + ptr<buffer> buf = buffer::alloc(sz_ulong); + logs_[0] = cs_new<log_entry>(0, buf); +} + +CoordinatorLogStore::~CoordinatorLogStore() { + if (disk_emul_thread_) { + disk_emul_thread_stop_signal_ = true; + // disk_emul_ea_.invoke(); + if (disk_emul_thread_->joinable()) { + disk_emul_thread_->join(); + } + } +} + +ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) { + // NOTE: + // Timestamp is used only when `replicate_log_timestamp_` option is on. + // Otherwise, log store does not need to store or load it. + ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(), + entry->get_timestamp()); + return clone; +} + +ulong CoordinatorLogStore::next_slot() const { + std::lock_guard<std::mutex> l(logs_lock_); + // Exclude the dummy entry. + return start_idx_ + logs_.size() - 1; +} + +ulong CoordinatorLogStore::start_index() const { return start_idx_; } + +ptr<log_entry> CoordinatorLogStore::last_entry() const { + ulong next_idx = next_slot(); + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.find(next_idx - 1); + if (entry == logs_.end()) { + entry = logs_.find(0); + } + + return MakeClone(entry->second); +} + +ulong CoordinatorLogStore::append(ptr<log_entry> &entry) { + ptr<log_entry> clone = MakeClone(entry); + + std::lock_guard<std::mutex> l(logs_lock_); + size_t idx = start_idx_ + logs_.size() - 1; + logs_[idx] = clone; + + if (disk_emul_delay) { + uint64_t cur_time = timer_helper::get_timeofday_us(); + disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx; + // disk_emul_ea_.invoke(); + } + + return idx; +} + +void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) { + ptr<log_entry> clone = MakeClone(entry); + + // Discard all logs equal to or greater than `index. 
+ std::lock_guard<std::mutex> l(logs_lock_); + auto itr = logs_.lower_bound(index); + while (itr != logs_.end()) { + itr = logs_.erase(itr); + } + logs_[index] = clone; + + if (disk_emul_delay) { + uint64_t cur_time = timer_helper::get_timeofday_us(); + disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index; + + // Remove entries greater than `index`. + auto entry = disk_emul_logs_being_written_.begin(); + while (entry != disk_emul_logs_being_written_.end()) { + if (entry->second > index) { + entry = disk_emul_logs_being_written_.erase(entry); + } else { + entry++; + } + } + // disk_emul_ea_.invoke(); + } +} + +ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) { + ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>(); + + ret->resize(end - start); + ulong cc = 0; + for (ulong ii = start; ii < end; ++ii) { + ptr<log_entry> src = nullptr; + { + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.find(ii); + if (entry == logs_.end()) { + entry = logs_.find(0); + assert(0); + } + src = entry->second; + } + (*ret)[cc++] = MakeClone(src); + } + return ret; +} + +// NOLINTNEXTLINE(google-default-arguments) +ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end, + int64 batch_size_hint_in_bytes) { + ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>(); + + if (batch_size_hint_in_bytes < 0) { + return ret; + } + + size_t accum_size = 0; + for (ulong ii = start; ii < end; ++ii) { + ptr<log_entry> src = nullptr; + { + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.find(ii); + if (entry == logs_.end()) { + entry = logs_.find(0); + assert(0); + } + src = entry->second; + } + ret->push_back(MakeClone(src)); + accum_size += src->get_buf().size(); + if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break; + } + return ret; +} + +ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) { + ptr<log_entry> src = nullptr; + { + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.find(index); + if (entry == logs_.end()) { + entry = logs_.find(0); + } + src = entry->second; + } + return MakeClone(src); +} + +ulong CoordinatorLogStore::term_at(ulong index) { + ulong term = 0; + { + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.find(index); + if (entry == logs_.end()) { + entry = logs_.find(0); + } + term = entry->second->get_term(); + } + return term; +} + +ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) { + std::vector<ptr<buffer>> logs; + + size_t size_total = 0; + for (ulong ii = index; ii < index + cnt; ++ii) { + ptr<log_entry> le = nullptr; + { + std::lock_guard<std::mutex> l(logs_lock_); + le = logs_[ii]; + } + assert(le.get()); + ptr<buffer> buf = le->serialize(); + size_total += buf->size(); + logs.push_back(buf); + } + + ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total); + buf_out->pos(0); + buf_out->put((int32)cnt); + + for (auto &entry : logs) { + ptr<buffer> &bb = entry; + buf_out->put((int32)bb->size()); + buf_out->put(*bb); + } + return buf_out; +} + +void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) { + pack.pos(0); + int32 num_logs = pack.get_int(); + + for (int32 ii = 0; ii < num_logs; ++ii) { + ulong cur_idx = index + ii; + int32 buf_size = pack.get_int(); + + ptr<buffer> buf_local = buffer::alloc(buf_size); + pack.get(buf_local); + + ptr<log_entry> le = log_entry::deserialize(*buf_local); + { + 
std::lock_guard<std::mutex> l(logs_lock_); + logs_[cur_idx] = le; + } + } + + { + std::lock_guard<std::mutex> l(logs_lock_); + auto entry = logs_.upper_bound(0); + if (entry != logs_.end()) { + start_idx_ = entry->first; + } else { + start_idx_ = 1; + } + } +} + +bool CoordinatorLogStore::compact(ulong last_log_index) { + std::lock_guard<std::mutex> l(logs_lock_); + for (ulong ii = start_idx_; ii <= last_log_index; ++ii) { + auto entry = logs_.find(ii); + if (entry != logs_.end()) { + logs_.erase(entry); + } + } + + // WARNING: + // Even though nothing has been erased, + // we should set `start_idx_` to new index. + if (start_idx_ <= last_log_index) { + start_idx_ = last_log_index + 1; + } + return true; +} + +bool CoordinatorLogStore::flush() { + disk_emul_last_durable_index_ = next_slot() - 1; + return true; +} + +ulong CoordinatorLogStore::last_durable_index() { + uint64_t last_log = next_slot() - 1; + if (!disk_emul_delay) { + return last_log; + } + + return disk_emul_last_durable_index_; +} + +void CoordinatorLogStore::DiskEmulLoop() { + // This thread mimics async disk writes. + + // uint32_t next_sleep_us = 100 * 1000; + while (!disk_emul_thread_stop_signal_) { + // disk_emul_ea_.wait_us(next_sleep_us); + // disk_emul_ea_.reset(); + if (disk_emul_thread_stop_signal_) break; + + uint64_t cur_time = timer_helper::get_timeofday_us(); + // next_sleep_us = 100 * 1000; + + bool call_notification = false; + { + std::lock_guard<std::mutex> l(logs_lock_); + // Remove all timestamps equal to or smaller than `cur_time`, + // and pick the greatest one among them. + auto entry = disk_emul_logs_being_written_.begin(); + while (entry != disk_emul_logs_being_written_.end()) { + if (entry->first <= cur_time) { + disk_emul_last_durable_index_ = entry->second; + entry = disk_emul_logs_being_written_.erase(entry); + call_notification = true; + } else { + break; + } + } + + entry = disk_emul_logs_being_written_.begin(); + if (entry != disk_emul_logs_being_written_.end()) { + // next_sleep_us = entry->first - cur_time; + } + } + + if (call_notification) { + raft_server_bwd_pointer_->notify_log_append_completion(true); + } + } +} + +void CoordinatorLogStore::Close() {} + +void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) { + disk_emul_delay = delay_ms; + raft_server_bwd_pointer_ = raft; + + if (!disk_emul_thread_) { + disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this); + } +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 96ad1902e..8337fa9d8 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -25,7 +25,7 @@ namespace memgraph::coordination { CoordinatorState::CoordinatorState() { - MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port), + MG_ASSERT(!(FLAGS_raft_server_id && FLAGS_coordinator_server_port), "Instance cannot be a coordinator and have registered coordinator server."); spdlog::info("Executing coordinator constructor"); @@ -68,7 +68,7 @@ auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstan data_); } -auto CoordinatorState::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> { +auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> { MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Can't call show instances on data_, as variant holds wrong alternative"); return 
std::get<CoordinatorData>(data_).ShowInstances(); @@ -79,5 +79,13 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & { "Cannot get coordinator server since variant holds wrong alternative"); return *std::get<CoordinatorMainReplicaData>(data_).coordinator_server_; } + +auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) + -> void { + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), + "Coordinator cannot register replica since variant holds wrong alternative"); + return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address); +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp new file mode 100644 index 000000000..a278ab422 --- /dev/null +++ b/src/coordination/coordinator_state_machine.cpp @@ -0,0 +1,98 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "nuraft/coordinator_state_machine.hpp" + +namespace memgraph::coordination { + +auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> { + buffer_serializer bs(data); + std::string str = bs.get_str(); + + spdlog::info("pre_commit {} : {}", log_idx, str); + return nullptr; +} + +auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> { + buffer_serializer bs(data); + std::string str = bs.get_str(); + + spdlog::info("commit {} : {}", log_idx, str); + + last_committed_idx_ = log_idx; + return nullptr; +} + +auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void { + last_committed_idx_ = log_idx; +} + +auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void { + buffer_serializer bs(data); + std::string str = bs.get_str(); + + spdlog::info("rollback {} : {}", log_idx, str); +} + +auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/, + ptr<buffer> &data_out, bool &is_last_obj) -> int { + // Put dummy data. + data_out = buffer::alloc(sizeof(int32)); + buffer_serializer bs(data_out); + bs.put_i32(0); + + is_last_obj = true; + return 0; +} + +auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/, + bool /*is_last_obj*/) -> void { + spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id); + // Request next object. 
+ obj_id++; +} + +auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool { + spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term()); + { + auto lock = std::lock_guard{last_snapshot_lock_}; + ptr<buffer> snp_buf = s.serialize(); + last_snapshot_ = snapshot::deserialize(*snp_buf); + } + return true; +} + +auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {} + +auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> { + auto lock = std::lock_guard{last_snapshot_lock_}; + return last_snapshot_; +} + +auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; } + +auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void { + spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term()); + // Clone snapshot from `s`. + { + auto lock = std::lock_guard{last_snapshot_lock_}; + ptr<buffer> snp_buf = s.serialize(); + last_snapshot_ = snapshot::deserialize(*snp_buf); + } + ptr<std::exception> except(nullptr); + bool ret = true; + when_done(ret, except); +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/coordinator_state_manager.cpp b/src/coordination/coordinator_state_manager.cpp new file mode 100644 index 000000000..b2fb81ea1 --- /dev/null +++ b/src/coordination/coordinator_state_manager.cpp @@ -0,0 +1,68 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "nuraft/coordinator_state_manager.hpp" + +namespace memgraph::coordination { + +using nuraft::cluster_config; +using nuraft::cs_new; +using nuraft::srv_config; +using nuraft::srv_state; +using nuraft::state_mgr; + +CoordinatorStateManager::CoordinatorStateManager(int srv_id, std::string const &endpoint) + : my_id_(srv_id), my_endpoint_(endpoint), cur_log_store_(cs_new<CoordinatorLogStore>()) { + my_srv_config_ = cs_new<srv_config>(srv_id, endpoint); + + // Initial cluster config: contains only one server (myself). + cluster_config_ = cs_new<cluster_config>(); + cluster_config_->get_servers().push_back(my_srv_config_); +} + +auto CoordinatorStateManager::load_config() -> ptr<cluster_config> { + // Just return in-memory data in this example. + // May require reading from disk here, if it has been written to disk. + return cluster_config_; +} + +auto CoordinatorStateManager::save_config(cluster_config const &config) -> void { + // Just keep in memory in this example. + // Need to write to disk here, if want to make it durable. + ptr<buffer> buf = config.serialize(); + cluster_config_ = cluster_config::deserialize(*buf); +} + +auto CoordinatorStateManager::save_state(srv_state const &state) -> void { + // Just keep in memory in this example. + // Need to write to disk here, if want to make it durable. + ptr<buffer> buf = state.serialize(); + saved_state_ = srv_state::deserialize(*buf); +} + +auto CoordinatorStateManager::read_state() -> ptr<srv_state> { + // Just return in-memory data in this example. 
+ // May require reading from disk here, if it has been written to disk. + return saved_state_; +} + +auto CoordinatorStateManager::load_log_store() -> ptr<log_store> { return cur_log_store_; } + +auto CoordinatorStateManager::server_id() -> int32 { return my_id_; } + +auto CoordinatorStateManager::system_exit(int const exit_code) -> void {} + +auto CoordinatorStateManager::GetSrvConfig() const -> ptr<srv_config> { return my_srv_config_; } + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 00695acd7..76ae49a9f 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -49,12 +49,10 @@ class CoordinatorClient { auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const -> bool; - auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool; auto ReplicationClientInfo() const -> ReplClientInfo; - auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; auto RpcClient() -> rpc::Client & { return rpc_client_; } diff --git a/src/coordination/include/coordination/coordinator_data.hpp b/src/coordination/include/coordination/coordinator_data.hpp index 73bebdf7e..9f4c60297 100644 --- a/src/coordination/include/coordination/coordinator_data.hpp +++ b/src/coordination/include/coordination/coordinator_data.hpp @@ -11,36 +11,45 @@ #pragma once -#include "utils/uuid.hpp" #ifdef MG_ENTERPRISE -#include <list> #include "coordination/coordinator_instance.hpp" -#include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" +#include "coordination/instance_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" +#include "coordination/replication_instance.hpp" #include "replication_coordination_glue/handler.hpp" #include "utils/rw_lock.hpp" #include "utils/thread_pool.hpp" +#include "utils/uuid.hpp" + +#include <list> namespace memgraph::coordination { class CoordinatorData { public: CoordinatorData(); + // TODO: (andi) Probably rename to RegisterReplicationInstance [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; + auto ShowInstances() const -> std::vector<InstanceStatus>; + auto TryFailover() -> void; - auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>; + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; private: - mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ}; HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_; + // NOTE: Must be std::list because we rely on pointer stability - std::list<CoordinatorInstance> registered_instances_; + std::list<ReplicationInstance> repl_instances_; + mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ}; + + CoordinatorInstance self_; + utils::UUID main_uuid_; }; diff --git a/src/coordination/include/coordination/coordinator_exceptions.hpp b/src/coordination/include/coordination/coordinator_exceptions.hpp index c9e2dff81..5b697e371 100644 --- a/src/coordination/include/coordination/coordinator_exceptions.hpp +++ b/src/coordination/include/coordination/coordinator_exceptions.hpp @@ -28,5 +28,27 @@ 
class CoordinatorRegisterInstanceException final : public utils::BasicException SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException) }; +class RaftServerStartException final : public utils::BasicException { + public: + explicit RaftServerStartException(std::string_view what) noexcept : BasicException(what) {} + + template <class... Args> + explicit RaftServerStartException(fmt::format_string<Args...> fmt, Args &&...args) noexcept + : RaftServerStartException(fmt::format(fmt, std::forward<Args>(args)...)) {} + + SPECIALIZE_GET_EXCEPTION_NAME(RaftServerStartException) +}; + +class RaftAddServerException final : public utils::BasicException { + public: + explicit RaftAddServerException(std::string_view what) noexcept : BasicException(what) {} + + template <class... Args> + explicit RaftAddServerException(fmt::format_string<Args...> fmt, Args &&...args) noexcept + : RaftAddServerException(fmt::format(fmt, std::forward<Args>(args)...)) {} + + SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException) +}; + } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index 1f170bd61..4aa4656c3 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -31,8 +31,8 @@ class CoordinatorHandlers { slk::Builder *res_builder); static void DemoteMainToReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); - static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); - + static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index f3fd3deca..1c7af59ae 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -13,70 +13,45 @@ #ifdef MG_ENTERPRISE -#include "coordination/coordinator_client.hpp" -#include "coordination/coordinator_cluster_config.hpp" -#include "coordination/coordinator_exceptions.hpp" -#include "replication_coordination_glue/handler.hpp" -#include "replication_coordination_glue/role.hpp" +#include <flags/replication.hpp> + +#include <libnuraft/nuraft.hxx> namespace memgraph::coordination { -class CoordinatorData; +using nuraft::logger; +using nuraft::ptr; +using nuraft::raft_launcher; +using nuraft::raft_server; +using nuraft::srv_config; +using nuraft::state_machine; +using nuraft::state_mgr; class CoordinatorInstance { public: - CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb, - HealthCheckCallback fail_cb); - + CoordinatorInstance(); CoordinatorInstance(CoordinatorInstance const &other) = delete; CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete; CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete; CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete; ~CoordinatorInstance() = default; - auto OnSuccessPing() -> void; - auto OnFailPing() -> bool; - - auto IsAlive() const -> bool; - auto InstanceName() const -> std::string; - auto SocketAddress() const -> 
std::string; - - auto IsReplica() const -> bool; - auto IsMain() const -> bool; - - auto PromoteToMain(utils::UUID main_uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, - HealthCheckCallback main_fail_cb) -> bool; - auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool; - - auto PauseFrequentCheck() -> void; - auto ResumeFrequentCheck() -> void; - - auto ReplicationClientInfo() const -> ReplClientInfo; - - auto GetClient() -> CoordinatorClient &; - - void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt); - auto GetMainUUID() -> const std::optional<utils::UUID> &; - - auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool; + auto RaftSocketAddress() const -> std::string; + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; + auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>; private: - CoordinatorClient client_; - replication_coordination_glue::ReplicationRole replication_role_; - std::chrono::system_clock::time_point last_response_time_{}; - // TODO this needs to be atomic? What if instance is alive and then we read it and it has changed - bool is_alive_{false}; - // for replica this is main uuid of current main - // for "main" main this same as in CoordinatorData - // it is set to nullopt when replica is down - // TLDR; when replica is down and comes back up we reset uuid of main replica is listening to - // so we need to send swap uuid again - std::optional<utils::UUID> main_uuid_; + ptr<state_machine> state_machine_; + ptr<state_mgr> state_manager_; + ptr<raft_server> raft_server_; + ptr<logger> logger_; + raft_launcher launcher_; - friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) { - return first.client_ == second.client_ && first.replication_role_ == second.replication_role_; - } + // TODO: (andi) I think variables below can be abstracted + uint32_t raft_server_id_; + uint32_t raft_port_; + std::string raft_address_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 5f52f85e5..9ab33a04e 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -14,8 +14,8 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_data.hpp" -#include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" +#include "coordination/instance_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #include <variant> @@ -37,7 +37,9 @@ class CoordinatorState { [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; - auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>; + auto ShowInstances() const -> std::vector<InstanceStatus>; + + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; // The client code must check that the server exists before calling this method. 
auto GetCoordinatorServer() const -> CoordinatorServer &; diff --git a/src/coordination/include/coordination/coordinator_instance_status.hpp b/src/coordination/include/coordination/instance_status.hpp similarity index 71% rename from src/coordination/include/coordination/coordinator_instance_status.hpp rename to src/coordination/include/coordination/instance_status.hpp index 2a0a3a985..492410061 100644 --- a/src/coordination/include/coordination/coordinator_instance_status.hpp +++ b/src/coordination/include/coordination/instance_status.hpp @@ -19,10 +19,13 @@ namespace memgraph::coordination { -struct CoordinatorInstanceStatus { +// TODO: (andi) For phase IV. Some instances won't have raft_socket_address, coord_socket_address, replication_role and +// cluster role... At the end, all instances will have everything. +struct InstanceStatus { std::string instance_name; - std::string socket_address; - std::string replication_role; + std::string raft_socket_address; + std::string coord_socket_address; + std::string cluster_role; bool is_alive; }; diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp new file mode 100644 index 000000000..9d4765b47 --- /dev/null +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -0,0 +1,84 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
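Note: the renamed InstanceStatus struct above is what ShowInstances() now returns for coordinators and replication instances alike; coordinator entries fill raft_socket_address, replication instances fill coord_socket_address. A minimal consumer could look like the sketch below; the helper function is an assumption for illustration and presumes the process was started as a coordinator (otherwise the MG_ASSERT in CoordinatorState::ShowInstances fires).

#include <iostream>

#include "coordination/coordinator_state.hpp"
#include "coordination/instance_status.hpp"

// Hypothetical helper: print one line per known instance.
void PrintClusterOverview(const memgraph::coordination::CoordinatorState &coordinator_state) {
  for (const auto &status : coordinator_state.ShowInstances()) {
    const auto &address =
        status.cluster_role == "coordinator" ? status.raft_socket_address : status.coord_socket_address;
    std::cout << status.instance_name << " | " << status.cluster_role << " | " << address << " | "
              << (status.is_alive ? "alive" : "down") << '\n';
  }
}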
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_client.hpp" +#include "coordination/coordinator_cluster_config.hpp" +#include "coordination/coordinator_exceptions.hpp" +#include "replication_coordination_glue/role.hpp" + +#include <libnuraft/nuraft.hxx> +#include "utils/uuid.hpp" + +namespace memgraph::coordination { + +class CoordinatorData; + +class ReplicationInstance { + public: + ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb, + HealthCheckCallback fail_cb); + + ReplicationInstance(ReplicationInstance const &other) = delete; + ReplicationInstance &operator=(ReplicationInstance const &other) = delete; + ReplicationInstance(ReplicationInstance &&other) noexcept = delete; + ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete; + ~ReplicationInstance() = default; + + auto OnSuccessPing() -> void; + auto OnFailPing() -> bool; + + auto IsAlive() const -> bool; + + auto InstanceName() const -> std::string; + auto SocketAddress() const -> std::string; + + auto IsReplica() const -> bool; + auto IsMain() const -> bool; + + auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, + HealthCheckCallback main_fail_cb) -> bool; + auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool; + + auto PauseFrequentCheck() -> void; + auto ResumeFrequentCheck() -> void; + + auto ReplicationClientInfo() const -> ReplClientInfo; + + auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool; + auto GetClient() -> CoordinatorClient &; + + void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt); + auto GetMainUUID() -> const std::optional<utils::UUID> &; + + private: + CoordinatorClient client_; + replication_coordination_glue::ReplicationRole replication_role_; + std::chrono::system_clock::time_point last_response_time_{}; + bool is_alive_{false}; + + // for replica this is main uuid of current main + // for "main" main this same as in CoordinatorData + // it is set to nullopt when replica is down + // TLDR; when replica is down and comes back up we reset uuid of main replica is listening to + // so we need to send swap uuid again + std::optional<utils::UUID> main_uuid_; + + friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) { + return first.client_ == second.client_ && first.replication_role_ == second.replication_role_; + } +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/nuraft/coordinator_log_store.hpp b/src/coordination/include/nuraft/coordinator_log_store.hpp new file mode 100644 index 000000000..ce1695d2f --- /dev/null +++ b/src/coordination/include/nuraft/coordinator_log_store.hpp @@ -0,0 +1,128 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
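Note: the in-memory log store declared below (and implemented earlier in this diff) follows the usual NuRaft convention: logs_[0] holds a dummy entry, real entries live in [start_index(), next_slot()), and append() returns the index it assigned. A small standalone sketch of that bookkeeping; the function name is made up for illustration.

#include <cassert>
#include <cstdint>

#include "nuraft/coordinator_log_store.hpp"

// Illustrative only: append one entry and check the index bookkeeping.
void LogStoreIndexingSketch() {
  memgraph::coordination::CoordinatorLogStore store;  // start_index() == 1, next_slot() == 1

  nuraft::ptr<nuraft::buffer> payload = nuraft::buffer::alloc(sizeof(uint64_t));
  nuraft::ptr<nuraft::log_entry> entry = nuraft::cs_new<nuraft::log_entry>(/*term=*/1, payload);

  auto const idx = store.append(entry);
  assert(idx == 1 && store.start_index() == 1 && store.next_slot() == 2);
  // entry_at(idx) and last_entry() hand back clones, never the stored pointers.
}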
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include <libnuraft/nuraft.hxx> + +namespace memgraph::coordination { + +using nuraft::buffer; +using nuraft::int32; +using nuraft::int64; +using nuraft::log_entry; +using nuraft::log_store; +using nuraft::ptr; +using nuraft::raft_server; + +class CoordinatorLogStore : public log_store { + public: + CoordinatorLogStore(); + CoordinatorLogStore(CoordinatorLogStore const &) = delete; + CoordinatorLogStore &operator=(CoordinatorLogStore const &) = delete; + CoordinatorLogStore(CoordinatorLogStore &&) = delete; + CoordinatorLogStore &operator=(CoordinatorLogStore &&) = delete; + ~CoordinatorLogStore() override; + + ulong next_slot() const override; + + ulong start_index() const override; + + ptr<log_entry> last_entry() const override; + + ulong append(ptr<log_entry> &entry) override; + + void write_at(ulong index, ptr<log_entry> &entry) override; + + ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override; + + // NOLINTNEXTLINE + ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override; + + ptr<log_entry> entry_at(ulong index) override; + + ulong term_at(ulong index) override; + + ptr<buffer> pack(ulong index, int32 cnt) override; + + void apply_pack(ulong index, buffer &pack) override; + + bool compact(ulong last_log_index) override; + + bool flush() override; + + ulong last_durable_index() override; + + void Close(); + + void SetDiskDelay(raft_server *raft, size_t delay_ms); + + private: + static ptr<log_entry> MakeClone(ptr<log_entry> const &entry); + + void DiskEmulLoop(); + + /** + * Map of <log index, log data>. + */ + std::map<ulong, ptr<log_entry>> logs_; + + /** + * Lock for `logs_`. + */ + mutable std::mutex logs_lock_; + + /** + * The index of the first log. + */ + std::atomic<ulong> start_idx_; + + /** + * Backward pointer to Raft server. + */ + raft_server *raft_server_bwd_pointer_; + + // Testing purpose --------------- BEGIN + + /** + * If non-zero, this log store will emulate the disk write delay. + */ + std::atomic<size_t> disk_emul_delay; + + /** + * Map of <timestamp, log index>, emulating logs that is being written to disk. + * Log index will be regarded as "durable" after the corresponding timestamp. + */ + std::map<uint64_t, uint64_t> disk_emul_logs_being_written_; + + /** + * Thread that will update `last_durable_index_` and call + * `notify_log_append_completion` at proper time. + */ + std::unique_ptr<std::thread> disk_emul_thread_; + + /** + * Flag to terminate the thread. + */ + std::atomic<bool> disk_emul_thread_stop_signal_; + + /** + * Last written log index. + */ + std::atomic<uint64_t> disk_emul_last_durable_index_; + + // Testing purpose --------------- END +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp new file mode 100644 index 000000000..fd7e92401 --- /dev/null +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -0,0 +1,72 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#ifdef MG_ENTERPRISE + +#include <spdlog/spdlog.h> +#include <libnuraft/nuraft.hxx> + +namespace memgraph::coordination { + +using nuraft::async_result; +using nuraft::buffer; +using nuraft::buffer_serializer; +using nuraft::cluster_config; +using nuraft::int32; +using nuraft::ptr; +using nuraft::snapshot; +using nuraft::state_machine; + +class CoordinatorStateMachine : public state_machine { + public: + CoordinatorStateMachine() = default; + CoordinatorStateMachine(CoordinatorStateMachine const &) = delete; + CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete; + CoordinatorStateMachine(CoordinatorStateMachine &&) = delete; + CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete; + ~CoordinatorStateMachine() override {} + + auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override; + + auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override; + + auto commit_config(ulong log_idx, ptr<cluster_config> & /*new_conf*/) -> void override; + + auto rollback(ulong log_idx, buffer &data) -> void override; + + auto read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/, ptr<buffer> &data_out, + bool &is_last_obj) -> int override; + + auto save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/, bool /*is_last_obj*/) + -> void override; + + auto apply_snapshot(snapshot &s) -> bool override; + + auto free_user_snp_ctx(void *&user_snp_ctx) -> void override; + + auto last_snapshot() -> ptr<snapshot> override; + + auto last_commit_index() -> ulong override; + + auto create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void override; + + private: + std::atomic<uint64_t> last_committed_idx_{0}; + + ptr<snapshot> last_snapshot_; + + std::mutex last_snapshot_lock_; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/nuraft/coordinator_state_manager.hpp b/src/coordination/include/nuraft/coordinator_state_manager.hpp new file mode 100644 index 000000000..b6cb6599b --- /dev/null +++ b/src/coordination/include/nuraft/coordinator_state_manager.hpp @@ -0,0 +1,66 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
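Note: the state machine earlier in this diff treats every Raft log payload as a string: pre_commit()/commit() wrap the incoming buffer in a buffer_serializer and call get_str(). The patch itself does not append log entries yet; below is a sketch of how a payload could be produced on the sending side, assuming NuRaft's buffer_serializer::put_str as the length-prefixed counterpart of the get_str used above.

#include <cstdint>
#include <string>

#include <libnuraft/nuraft.hxx>

// Illustrative only: build a buffer that CoordinatorStateMachine::commit()
// would read back with buffer_serializer::get_str(). put_str writes a 4-byte
// length prefix followed by the characters, hence the alloc size.
nuraft::ptr<nuraft::buffer> EncodeLogPayload(const std::string &log) {
  auto buf = nuraft::buffer::alloc(sizeof(uint32_t) + log.size());
  nuraft::buffer_serializer bs(*buf);
  bs.put_str(log);
  return buf;
}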
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include "nuraft/coordinator_log_store.hpp" + +#include <spdlog/spdlog.h> +#include <libnuraft/nuraft.hxx> + +namespace memgraph::coordination { + +using nuraft::cluster_config; +using nuraft::cs_new; +using nuraft::srv_config; +using nuraft::srv_state; +using nuraft::state_mgr; + +class CoordinatorStateManager : public state_mgr { + public: + explicit CoordinatorStateManager(int srv_id, std::string const &endpoint); + + CoordinatorStateManager(CoordinatorStateManager const &) = delete; + CoordinatorStateManager &operator=(CoordinatorStateManager const &) = delete; + CoordinatorStateManager(CoordinatorStateManager &&) = delete; + CoordinatorStateManager &operator=(CoordinatorStateManager &&) = delete; + + ~CoordinatorStateManager() override = default; + + auto load_config() -> ptr<cluster_config> override; + + auto save_config(cluster_config const &config) -> void override; + + auto save_state(srv_state const &state) -> void override; + + auto read_state() -> ptr<srv_state> override; + + auto load_log_store() -> ptr<log_store> override; + + auto server_id() -> int32 override; + + auto system_exit(int exit_code) -> void override; + + auto GetSrvConfig() const -> ptr<srv_config>; + + private: + int my_id_; + std::string my_endpoint_; + ptr<CoordinatorLogStore> cur_log_store_; + ptr<srv_config> my_srv_config_; + ptr<cluster_config> cluster_config_; + ptr<srv_state> saved_state_; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp new file mode 100644 index 000000000..96a5c2a0e --- /dev/null +++ b/src/coordination/replication_instance.cpp @@ -0,0 +1,98 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
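+
+// ReplicationInstance is the coordinator's handle to a single data instance: it owns the CoordinatorClient used for health checks and RPCs, and tracks the instance's liveness (based on the last successful ping) and its current replication role (MAIN or REPLICA).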
+ +#ifdef MG_ENTERPRISE + +#include "coordination/replication_instance.hpp" + +#include "replication_coordination_glue/handler.hpp" + +namespace memgraph::coordination { + +ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, + HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) + : client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)), + replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), + is_alive_(true) { + if (!client_.DemoteToReplica()) { + throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); + } + client_.StartFrequentCheck(); +} + +auto ReplicationInstance::OnSuccessPing() -> void { + last_response_time_ = std::chrono::system_clock::now(); + is_alive_ = true; +} + +auto ReplicationInstance::OnFailPing() -> bool { + is_alive_ = + std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() < + CoordinatorClusterConfig::alive_response_time_difference_sec_; + return is_alive_; +} + +auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); } +auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } +auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; } + +auto ReplicationInstance::IsReplica() const -> bool { + return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; +} +auto ReplicationInstance::IsMain() const -> bool { + return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; +} + +auto ReplicationInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, + HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool { + if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) { + return false; + } + + replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; + client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb)); + + return true; +} + +auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) + -> bool { + if (!client_.DemoteToReplica()) { + return false; + } + + replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; + client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb)); + + return true; +} + +auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } +auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } + +auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo { + return client_.ReplicationClientInfo(); +} + +auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } +void ReplicationInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; } +auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; } + +auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool { + if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) { + return false; + } + SetNewMainUUID(main_uuid); + return true; +} + +} // namespace memgraph::coordination +#endif diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index 958de0f91..d1310dee2 100644 ---
a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -9,12 +9,11 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "coordination/register_main_replica_coordinator_status.hpp" #ifdef MG_ENTERPRISE #include "dbms/coordinator_handler.hpp" -#include "dbms/dbms_handler.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" namespace memgraph::dbms { @@ -31,9 +30,15 @@ auto CoordinatorHandler::SetInstanceToMain(std::string instance_name) return coordinator_state_.SetInstanceToMain(std::move(instance_name)); } -auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::CoordinatorInstanceStatus> { +auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> { return coordinator_state_.ShowInstances(); } + +auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) + -> void { + coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address)); +} + } // namespace memgraph::dbms #endif diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 04cfe8032..a2a1f19dc 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -14,8 +14,8 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" -#include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_state.hpp" +#include "coordination/instance_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #include <vector> @@ -33,7 +33,9 @@ class CoordinatorHandler { auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; - auto ShowInstances() const -> std::vector<coordination::CoordinatorInstanceStatus>; + auto ShowInstances() const -> std::vector<coordination::InstanceStatus>; + + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; private: coordination::CoordinatorState &coordinator_state_; diff --git a/src/flags/replication.cpp b/src/flags/replication.cpp index 3cd5187f3..29c7bfbda 100644 --- a/src/flags/replication.cpp +++ b/src/flags/replication.cpp @@ -13,9 +13,11 @@ #ifdef MG_ENTERPRISE // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(coordinator, false, "Controls whether the instance is a replication coordinator."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32(coordinator_server_port, 0, "Port on which coordinator servers will be started."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32(raft_server_port, 0, "Port on which raft servers will be started."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32(raft_server_id, 0, "Unique ID of the raft server."); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/replication.hpp b/src/flags/replication.hpp index 16f4c74d2..025079271 100644 --- a/src/flags/replication.hpp +++ b/src/flags/replication.hpp @@ -15,9 +15,11 @@ #ifdef MG_ENTERPRISE // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DECLARE_bool(coordinator); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint32(coordinator_server_port); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(raft_server_port); +// 
NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(raft_server_id); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/io/network/endpoint.cpp b/src/io/network/endpoint.cpp index e9032e42e..44123db6b 100644 --- a/src/io/network/endpoint.cpp +++ b/src/io/network/endpoint.cpp @@ -39,7 +39,7 @@ Endpoint::IpFamily Endpoint::GetIpFamily(const std::string &address) { } std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrIpAddress( - const std::string &address, const std::optional<uint16_t> default_port = {}) { + const std::string &address, const std::optional<uint16_t> default_port) { /// expected address format: /// - "ip_address:port_number" /// - "ip_address" diff --git a/src/io/network/endpoint.hpp b/src/io/network/endpoint.hpp index 281be2162..16d70e080 100644 --- a/src/io/network/endpoint.hpp +++ b/src/io/network/endpoint.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -61,8 +61,8 @@ struct Endpoint { * it into an ip address and a port number; even if a default port is given, * it won't be used, as we expect that it is given in the address string. */ - static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress(const std::string &address, - std::optional<uint16_t> default_port); + static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress( + const std::string &address, std::optional<uint16_t> default_port = {}); /** * Tries to parse given string as either socket address or hostname. diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index 0cbb790d0..6fe6b8c9e 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3071,11 +3071,7 @@ class CoordinatorQuery : public memgraph::query::Query { static const utils::TypeInfo kType; const utils::TypeInfo &GetTypeInfo() const override { return kType; } - enum class Action { - REGISTER_INSTANCE, - SET_INSTANCE_TO_MAIN, - SHOW_REPLICATION_CLUSTER, - }; + enum class Action { REGISTER_INSTANCE, SET_INSTANCE_TO_MAIN, SHOW_INSTANCES, ADD_COORDINATOR_INSTANCE }; enum class SyncMode { SYNC, ASYNC }; @@ -3087,6 +3083,8 @@ class CoordinatorQuery : public memgraph::query::Query { std::string instance_name_; memgraph::query::Expression *replication_socket_address_{nullptr}; memgraph::query::Expression *coordinator_socket_address_{nullptr}; + memgraph::query::Expression *raft_socket_address_{nullptr}; + memgraph::query::Expression *raft_server_id_{nullptr}; memgraph::query::CoordinatorQuery::SyncMode sync_mode_; CoordinatorQuery *Clone(AstStorage *storage) const override { @@ -3098,6 +3096,8 @@ class CoordinatorQuery : public memgraph::query::Query { object->sync_mode_ = sync_mode_; object->coordinator_socket_address_ = coordinator_socket_address_ ? coordinator_socket_address_->Clone(storage) : nullptr; + object->raft_socket_address_ = raft_socket_address_ ? raft_socket_address_->Clone(storage) : nullptr; + object->raft_server_id_ = raft_server_id_ ? 
raft_server_id_->Clone(storage) : nullptr; return object; } diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 5735326ac..1de5e55ff 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -374,7 +374,6 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe return replication_query; } -// License check is done in the interpreter. antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator( MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) { auto *coordinator_query = storage_->Create<CoordinatorQuery>(); @@ -400,10 +399,28 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator( return coordinator_query; } -// License check is done in the interpreter -antlrcpp::Any CypherMainVisitor::visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext * /*ctx*/) { +antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) { auto *coordinator_query = storage_->Create<CoordinatorQuery>(); - coordinator_query->action_ = CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER; + + if (!ctx->raftSocketAddress()->literal()->StringLiteral()) { + throw SemanticException("Raft socket address should be a string literal!"); + } + + if (!ctx->raftServerId()->literal()->numberLiteral()) { + throw SemanticException("Raft server id should be a number literal!"); + } + + coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE; + coordinator_query->raft_socket_address_ = std::any_cast<Expression *>(ctx->raftSocketAddress()->accept(this)); + coordinator_query->raft_server_id_ = std::any_cast<Expression *>(ctx->raftServerId()->accept(this)); + + return coordinator_query; +} + +// License check is done in the interpreter +antlrcpp::Any CypherMainVisitor::visitShowInstances(MemgraphCypher::ShowInstancesContext * /*ctx*/) { + auto *coordinator_query = storage_->Create<CoordinatorQuery>(); + coordinator_query->action_ = CoordinatorQuery::Action::SHOW_INSTANCES; return coordinator_query; } diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index e9da98f71..9007ec60a 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -251,7 +251,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { /** * @return CoordinatorQuery* */ - antlrcpp::Any visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext *ctx) override; + antlrcpp::Any visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) override; + + /** + * @return CoordinatorQuery* + */ + antlrcpp::Any visitShowInstances(MemgraphCypher::ShowInstancesContext *ctx) override; /** * @return LockPathQuery* diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index a99eda3e9..0597967c7 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -20,6 +20,7 @@ options { tokenVocab=MemgraphCypherLexer; } import Cypher ; memgraphCypherKeyword : cypherKeyword + | ADD | ACTIVE | AFTER | ALTER @@ -64,6 +65,7 @@ memgraphCypherKeyword : cypherKeyword | HEADER | IDENTIFIED | INSTANCE + | INSTANCES | NODE_LABELS | NULLIF | IMPORT @@ -189,7 +191,8 @@ replicationQuery : setReplicationRole 
coordinatorQuery : registerInstanceOnCoordinator | setInstanceToMain - | showReplicationCluster + | showInstances + | addCoordinatorInstance ; triggerQuery : createTrigger @@ -374,7 +377,7 @@ setReplicationRole : SET REPLICATION ROLE TO ( MAIN | REPLICA ) showReplicationRole : SHOW REPLICATION ROLE ; -showReplicationCluster : SHOW REPLICATION CLUSTER ; +showInstances : SHOW INSTANCES ; instanceName : symbolicName ; @@ -382,6 +385,7 @@ socketAddress : literal ; coordinatorSocketAddress : literal ; replicationSocketAddress : literal ; +raftSocketAddress : literal ; registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC ) TO socketAddress ; @@ -390,6 +394,10 @@ registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSoc setInstanceToMain : SET INSTANCE instanceName TO MAIN ; +raftServerId : literal ; + +addCoordinatorInstance : ADD COORDINATOR raftServerId ON raftSocketAddress ; + dropReplica : DROP REPLICA instanceName ; showReplicas : SHOW REPLICAS ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index b0febc4af..b2d4de661 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -23,6 +23,7 @@ lexer grammar MemgraphCypherLexer ; import CypherLexer ; +ADD : A D D ; ACTIVE : A C T I V E ; AFTER : A F T E R ; ALTER : A L T E R ; @@ -39,7 +40,6 @@ BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ; CALL : C A L L ; CHECK : C H E C K ; CLEAR : C L E A R ; -CLUSTER : C L U S T E R ; COMMIT : C O M M I T ; COMMITTED : C O M M I T T E D ; CONFIG : C O N F I G ; @@ -80,6 +80,7 @@ INACTIVE : I N A C T I V E ; IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ; IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ; INSTANCE : I N S T A N C E ; +INSTANCES : I N S T A N C E S ; ISOLATION : I S O L A T I O N ; KAFKA : K A F K A ; LABELS : L A B E L S ; diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index bd6ab7971..17583153b 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -219,7 +219,8 @@ const trie::Trie kKeywords = {"union", "lock", "unlock", "build", - "instance"}; + "instance", + "coordinator"}; // Unicode codepoints that are allowed at the start of the unescaped name. 
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index ea175a18e..2ddb8dd2a 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -508,6 +508,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } + auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &raft_socket_address) -> void override { + auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(raft_socket_address); + if (maybe_ip_and_port) { + auto const [ip, port] = *maybe_ip_and_port; + spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, ip, port); + coordinator_handler_.AddCoordinatorInstance(raft_server_id, port, ip); + } else { + spdlog::error("Invalid raft socket address {}.", raft_socket_address); + } + } + void SetInstanceToMain(const std::string &instance_name) override { auto status = coordinator_handler_.SetInstanceToMain(instance_name); switch (status) { @@ -526,7 +537,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - std::vector<coordination::CoordinatorInstanceStatus> ShowInstances() const override { + std::vector<coordination::InstanceStatus> ShowInstances() const override { return coordinator_handler_.ShowInstances(); } @@ -930,7 +941,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & switch (repl_query->action_) { case ReplicationQuery::Action::SET_REPLICATION_ROLE: { #ifdef MG_ENTERPRISE - if (FLAGS_coordinator) { + if (FLAGS_raft_server_id) { throw QueryRuntimeException("Coordinator can't set roles!"); } if (FLAGS_coordinator_server_port) { @@ -960,7 +971,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: { #ifdef MG_ENTERPRISE - if (FLAGS_coordinator) { + if (FLAGS_raft_server_id) { throw QueryRuntimeException("Coordinator doesn't have a replication role!"); } #endif @@ -1017,8 +1028,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICAS: { #ifdef MG_ENTERPRISE - if (FLAGS_coordinator) { - throw QueryRuntimeException("Coordinator cannot call SHOW REPLICAS! Use SHOW REPLICATION CLUSTER instead."); + if (FLAGS_raft_server_id) { + throw QueryRuntimeException("Coordinator cannot call SHOW REPLICAS! Use SHOW INSTANCES instead."); } #endif @@ -1079,6 +1090,37 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param const query::InterpreterConfig &config, std::vector<Notification> *notifications) { Callback callback; switch (coordinator_query->action_) { + case CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE: { + if (!license::global_license_checker.IsEnterpriseValidFast()) { + throw QueryException("Trying to use enterprise feature without a valid license."); + } + if constexpr (!coordination::allow_ha) { + throw QueryRuntimeException( + "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " + "be able to use this functionality."); + } + if (!FLAGS_raft_server_id) { + throw QueryRuntimeException("Only coordinator can add coordinator instance!"); + } + + // TODO: MemoryResource for EvaluationContext, it should probably be passed as + // the argument to Callback.
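+ // Evaluate the Raft server id and socket address literals up front so that the callback below can capture the resulting TypedValues by value.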
+ EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters}; + auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context}; + + auto raft_socket_address_tv = coordinator_query->raft_socket_address_->Accept(evaluator); + auto raft_server_id_tv = coordinator_query->raft_server_id_->Accept(evaluator); + callback.fn = [handler = CoordQueryHandler{*coordinator_state}, raft_socket_address_tv, + raft_server_id_tv]() mutable { + handler.AddCoordinatorInstance(raft_server_id_tv.ValueInt(), std::string(raft_socket_address_tv.ValueString())); + return std::vector<std::vector<TypedValue>>(); + }; + + notifications->emplace_back(SeverityLevel::INFO, NotificationCode::ADD_COORDINATOR_INSTANCE, + fmt::format("Coordinator has added instance {} on coordinator server {}.", + raft_server_id_tv.ValueInt(), raft_socket_address_tv.ValueString())); + return callback; + } case CoordinatorQuery::Action::REGISTER_INSTANCE: { if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); } @@ -1089,7 +1131,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " "be able to use this functionality."); } - if (!FLAGS_coordinator) { + if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } // TODO: MemoryResource for EvaluationContext, it should probably be passed as @@ -1124,7 +1166,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " "be able to use this functionality."); } - if (!FLAGS_coordinator) { + if (!FLAGS_raft_server_id) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } // TODO: MemoryResource for EvaluationContext, it should probably be passed as @@ -1140,7 +1182,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return callback; } - case CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER: { + case CoordinatorQuery::Action::SHOW_INSTANCES: { if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); } @@ -1149,11 +1191,11 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param "High availability is experimental feature.
Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " "be able to use this functionality."); } - if (!FLAGS_coordinator) { - throw QueryRuntimeException("Only coordinator can run SHOW REPLICATION CLUSTER."); + if (!FLAGS_raft_server_id) { + throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES."); } - callback.header = {"name", "socket_address", "alive", "role"}; + callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "alive", "role"}; callback.fn = [handler = CoordQueryHandler{*coordinator_state}, replica_nfields = callback.header.size()]() mutable { auto const instances = handler.ShowInstances(); @@ -1162,15 +1204,15 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param std::ranges::transform(instances, std::back_inserter(result), [](const auto &status) -> std::vector<TypedValue> { - return {TypedValue{status.instance_name}, TypedValue{status.socket_address}, - TypedValue{status.is_alive}, TypedValue{status.replication_role}}; + return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address}, + TypedValue{status.coord_socket_address}, TypedValue{status.is_alive}, + TypedValue{status.cluster_role}}; }); return result; }; return callback; } - return callback; } } #endif @@ -4175,7 +4217,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } #ifdef MG_ENTERPRISE - if (FLAGS_coordinator && !utils::Downcast<CoordinatorQuery>(parsed_query.query) && + if (FLAGS_raft_server_id && !utils::Downcast<CoordinatorQuery>(parsed_query.query) && !utils::Downcast<SettingQuery>(parsed_query.query)) { throw QueryRuntimeException("Coordinator can run only coordinator queries!"); } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index cf822d8b9..698c639fa 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -53,7 +53,7 @@ #include "utils/tsc.hpp" #ifdef MG_ENTERPRISE -#include "coordination/coordinator_instance_status.hpp" +#include "coordination/instance_status.hpp" #endif namespace memgraph::metrics { @@ -114,7 +114,11 @@ class CoordinatorQueryHandler { virtual void SetInstanceToMain(const std::string &instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual std::vector<coordination::CoordinatorInstanceStatus> ShowInstances() const = 0; + virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0; + + /// @throw QueryRuntimeException if an error ocurred. 
+ virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &coordinator_socket_address) + -> void = 0; }; #endif diff --git a/src/query/metadata.cpp b/src/query/metadata.cpp index 56ef57431..59d65e077 100644 --- a/src/query/metadata.cpp +++ b/src/query/metadata.cpp @@ -69,6 +69,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) { #ifdef MG_ENTERPRISE case NotificationCode::REGISTER_COORDINATOR_SERVER: return "RegisterCoordinatorServer"sv; + case NotificationCode::ADD_COORDINATOR_INSTANCE: + return "AddCoordinatorInstance"sv; #endif case NotificationCode::REPLICA_PORT_WARNING: return "ReplicaPortWarning"sv; diff --git a/src/query/metadata.hpp b/src/query/metadata.hpp index 8e82ad1e3..2f357a555 100644 --- a/src/query/metadata.hpp +++ b/src/query/metadata.hpp @@ -44,6 +44,7 @@ enum class NotificationCode : uint8_t { REGISTER_REPLICA, #ifdef MG_ENTERPRISE REGISTER_COORDINATOR_SERVER, + ADD_COORDINATOR_INSTANCE, #endif SET_REPLICA, START_STREAM, diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 915a14d14..e0cdc082c 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -66,8 +66,9 @@ startup_config_dict = { "Time in seconds after which inactive Bolt sessions will be closed.", ), "cartesian_product_enabled": ("true", "true", "Enable cartesian product expansion."), - "coordinator": ("false", "false", "Controls whether the instance is a replication coordinator."), "coordinator_server_port": ("0", "0", "Port on which coordinator servers will be started."), + "raft_server_port": ("0", "0", "Port on which raft servers will be started."), + "raft_server_id": ("0", "0", "Unique ID of the raft server."), "data_directory": ("mg_data", "mg_data", "Path to directory in which to save all permanent data."), "data_recovery_on_startup": ( "false", diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt index f22e24f43..424ebd08f 100644 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ b/tests/e2e/high_availability_experimental/CMakeLists.txt @@ -2,6 +2,7 @@ find_package(gflags REQUIRED) copy_e2e_python_files(ha_experimental coordinator.py) copy_e2e_python_files(ha_experimental automatic_failover.py) +copy_e2e_python_files(ha_experimental distributed_coordinators.py) copy_e2e_python_files(ha_experimental manual_setting_replicas.py) copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py) copy_e2e_python_files(ha_experimental common.py) diff --git a/tests/e2e/high_availability_experimental/automatic_failover.py b/tests/e2e/high_availability_experimental/automatic_failover.py index 23b462f45..1148075a1 100644 --- a/tests/e2e/high_availability_experimental/automatic_failover.py +++ b/tests/e2e/high_availability_experimental/automatic_failover.py @@ -13,7 +13,6 @@ import os import shutil import sys import tempfile -import time import interactive_mg_runner import pytest @@ -70,7 +69,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { "setup_queries": [], }, "coordinator": { - "args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"], + "args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"], "log_file": "coordinator.log", "setup_queries": [ "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", @@ -111,12 +110,13 @@ def test_replication_works_on_failover(): coord_cursor = connect(host="localhost", 
port=7690).cursor() def retrieve_data_show_repl_cluster(): - return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) expected_data_on_coord = [ - ("instance_1", "127.0.0.1:10011", True, "main"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", False, "unknown"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), ] mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) @@ -132,7 +132,6 @@ def test_replication_works_on_failover(): mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") - expected_data_on_new_main = [ ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"), @@ -143,13 +142,13 @@ def test_replication_works_on_failover(): execute_and_fetch_all(new_main_cursor, "CREATE ();") # 6 - alive_replica_cursor = connect(host="localhost", port=7689).cursor() - res = execute_and_fetch_all(alive_replica_cursor, "MATCH (n) RETURN count(n) as count;")[0][0] + alive_replica_cursror = connect(host="localhost", port=7689).cursor() + res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0] assert res == 1, "Vertex should be replicated" interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION) -def test_show_replication_cluster(): +def test_show_instances(): safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) @@ -159,12 +158,13 @@ def test_show_replication_cluster(): coord_cursor = connect(host="localhost", port=7690).cursor() def show_repl_cluster(): - return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) expected_data = [ - ("instance_1", "127.0.0.1:10011", True, "replica"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data, show_repl_cluster) @@ -184,18 +184,20 @@ def test_show_replication_cluster(): interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") expected_data = [ - ("instance_1", "127.0.0.1:10011", False, "unknown"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data, show_repl_cluster) interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") expected_data = [ - ("instance_1", "127.0.0.1:10011", False, "unknown"), - ("instance_2", "127.0.0.1:10012", False, "unknown"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", 
"127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data, show_repl_cluster) @@ -217,12 +219,13 @@ def test_simple_automatic_failover(): coord_cursor = connect(host="localhost", port=7690).cursor() def retrieve_data_show_repl_cluster(): - return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) expected_data_on_coord = [ - ("instance_1", "127.0.0.1:10011", True, "main"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", False, "unknown"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), ] mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) @@ -280,21 +283,23 @@ def test_replica_instance_restarts(): cursor = connect(host="localhost", port=7690).cursor() def show_repl_cluster(): - return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;"))) expected_data_up = [ - ("instance_1", "127.0.0.1:10011", True, "replica"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data_up, show_repl_cluster) interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") expected_data_down = [ - ("instance_1", "127.0.0.1:10011", False, "unknown"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data_down, show_repl_cluster) @@ -320,19 +325,21 @@ def test_automatic_failover_main_back_as_replica(): coord_cursor = connect(host="localhost", port=7690).cursor() def retrieve_data_show_repl_cluster(): - return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) expected_data_after_failover = [ - ("instance_1", "127.0.0.1:10011", True, "main"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", False, "unknown"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), ] mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster) expected_data_after_main_coming_back = [ - ("instance_1", "127.0.0.1:10011", True, "main"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "replica"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", 
"", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "replica"), ] interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") @@ -346,60 +353,68 @@ def test_automatic_failover_main_back_as_replica(): mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance3) -def test_replica_instance_restarts_replication_works(): +def test_automatic_failover_main_back_as_main(): safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - cursor = connect(host="localhost", port=7690).cursor() - - def show_repl_cluster(): - return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) - - expected_data_up = [ - ("instance_1", "127.0.0.1:10011", True, "replica"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), - ] - mg_sleep_and_assert(expected_data_up, show_repl_cluster) - interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") - expected_data_down = [ - ("instance_1", "127.0.0.1:10011", False, "unknown"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_all_down = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), ] - mg_sleep_and_assert(expected_data_down, show_repl_cluster) + + mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_main_back = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster) + + instance3_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_repl_role_instance3(): + return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;"))) + + mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3) interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") - mg_sleep_and_assert(expected_data_up, show_repl_cluster) - - expected_data_on_main_show_replicas = [ - ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), - ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + expected_data_replicas_back = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] - instance3_cursor = connect(host="localhost", port=7687).cursor() + + mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster) + instance1_cursor = connect(host="localhost", port=7688).cursor() - - def 
retrieve_data_show_repl_role_instance1(): - return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICAS;"))) - - mg_sleep_and_assert(expected_data_on_main_show_replicas, retrieve_data_show_repl_role_instance1) + instance2_cursor = connect(host="localhost", port=7689).cursor() def retrieve_data_show_repl_role_instance1(): return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;"))) - expected_data_replica = [("replica",)] - mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1) + def retrieve_data_show_repl_role_instance2(): + return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;"))) - execute_and_fetch_all(instance3_cursor, "CREATE ();") - - def retrieve_data_replica(): - return execute_and_fetch_all(instance1_cursor, "MATCH (n) RETURN count(n);")[0][0] - - expected_data_replica = 1 - mg_sleep_and_assert(expected_data_replica, retrieve_data_replica) + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1) + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2) + mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3) if __name__ == "__main__": diff --git a/tests/e2e/high_availability_experimental/coordinator.py b/tests/e2e/high_availability_experimental/coordinator.py index 9e34a4167..4330c2194 100644 --- a/tests/e2e/high_availability_experimental/coordinator.py +++ b/tests/e2e/high_availability_experimental/coordinator.py @@ -37,16 +37,17 @@ def test_coordinator_cannot_run_show_repl_role(): assert str(e.value) == "Coordinator can run only coordinator queries!" -def test_coordinator_show_replication_cluster(): +def test_coordinator_show_instances(): cursor = connect(host="localhost", port=7690).cursor() def retrieve_data(): - return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) + return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;"))) expected_data = [ - ("instance_1", "127.0.0.1:10011", True, "replica"), - ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", True, "main"), + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), ] mg_sleep_and_assert(expected_data, retrieve_data) @@ -65,8 +66,8 @@ def test_coordinator_cannot_call_show_replicas(): def test_main_and_replicas_cannot_call_show_repl_cluster(port): cursor = connect(host="localhost", port=port).cursor() with pytest.raises(Exception) as e: - execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;") - assert str(e.value) == "Only coordinator can run SHOW REPLICATION CLUSTER." + execute_and_fetch_all(cursor, "SHOW INSTANCES;") + assert str(e.value) == "Only coordinator can run SHOW INSTANCES." @pytest.mark.parametrize( diff --git a/tests/e2e/high_availability_experimental/distributed_coordinators.py b/tests/e2e/high_availability_experimental/distributed_coordinators.py new file mode 100644 index 000000000..8a9ebf3c2 --- /dev/null +++ b/tests/e2e/high_availability_experimental/distributed_coordinators.py @@ -0,0 +1,145 @@ +# Copyright 2022 Memgraph Ltd. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import os +import shutil +import sys +import tempfile + +import interactive_mg_runner +import pytest +from common import connect, execute_and_fetch_all, safe_execute +from mg_utils import mg_sleep_and_assert + +interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +interactive_mg_runner.PROJECT_DIR = os.path.normpath( + os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") +) +interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) +interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) + +TEMP_DIR = tempfile.TemporaryDirectory().name + +MEMGRAPH_INSTANCES_DESCRIPTION = { + "coordinator1": { + "args": [ + "--bolt-port", + "7687", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator1.log", + "setup_queries": [], + }, + "coordinator2": { + "args": [ + "--bolt-port", + "7688", + "--log-level=TRACE", + "--raft-server-id=2", + "--raft-server-port=10112", + ], + "log_file": "coordinator2.log", + "setup_queries": [], + }, + "coordinator3": { + "args": [ + "--bolt-port", + "7689", + "--log-level=TRACE", + "--raft-server-id=3", + "--raft-server-port=10113", + ], + "log_file": "coordinator3.log", + "setup_queries": [ + "ADD COORDINATOR 1 ON '127.0.0.1:10111'", + "ADD COORDINATOR 2 ON '127.0.0.1:10112'", + ], + }, +} + + +def test_coordinators_communication(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7689).cursor() + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + expected_cluster = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ] + mg_sleep_and_assert(expected_cluster, check_coordinator3) + + coordinator1_cursor = connect(host="localhost", port=7687).cursor() + + def check_coordinator1(): + return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster, check_coordinator1) + + coordinator2_cursor = connect(host="localhost", port=7688).cursor() + + def check_coordinator2(): + return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster, check_coordinator2) + + +def test_coordinators_communication_with_restarts(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + expected_cluster = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ] + + coordinator1_cursor = connect(host="localhost", port=7687).cursor() + + def 
check_coordinator1(): + return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster, check_coordinator1) + + coordinator2_cursor = connect(host="localhost", port=7688).cursor() + + def check_coordinator2(): + return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES"))) + + mg_sleep_and_assert(expected_cluster, check_coordinator2) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") + coordinator1_cursor = connect(host="localhost", port=7687).cursor() + + mg_sleep_and_assert(expected_cluster, check_coordinator1) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2") + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2") + coordinator1_cursor = connect(host="localhost", port=7687).cursor() + coordinator2_cursor = connect(host="localhost", port=7688).cursor() + + mg_sleep_and_assert(expected_cluster, check_coordinator1) + mg_sleep_and_assert(expected_cluster, check_coordinator2) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/workloads.yaml b/tests/e2e/high_availability_experimental/workloads.yaml index 8b617dfb5..23fa3a5db 100644 --- a/tests/e2e/high_availability_experimental/workloads.yaml +++ b/tests/e2e/high_availability_experimental/workloads.yaml @@ -13,7 +13,7 @@ ha_cluster: &ha_cluster log_file: "replication-e2e-main.log" setup_queries: [] coordinator: - args: ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"] + args: ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"] log_file: "replication-e2e-coordinator.log" setup_queries: [ "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", @@ -36,6 +36,10 @@ workloads: binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/manual_setting_replicas.py"] + - name: "Distributed coordinators" + binary: "tests/e2e/pytest_runner.sh" + args: ["high_availability_experimental/distributed_coordinators.py"] + - name: "Not replicate from old main" binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/not_replicate_from_old_main.py"] diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 1353a56dd..63cca3aa4 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2632,6 +2632,19 @@ TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) { ReplicationQuery::SyncMode::SYNC); } +#ifdef MG_ENTERPRISE +TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) { + auto &ast_generator = *GetParam(); + + std::string const correct_query = R"(ADD COORDINATOR 1 ON "127.0.0.1:10111")"; + auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query)); + + EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE); + ast_generator.CheckLiteral(parsed_query->raft_socket_address_, TypedValue("127.0.0.1:10111")); + ast_generator.CheckLiteral(parsed_query->raft_server_id_, TypedValue(1)); +} +#endif + TEST_P(CypherMainVisitorTest, TestDeleteReplica) { auto &ast_generator = *GetParam();