HA: Organize Raft coordinator group (#1687)

This commit is contained in:
Andi 2024-02-08 10:11:33 +01:00 committed by GitHub
parent 2fa8e00124
commit cf80687d1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 1581 additions and 316 deletions

View File

@ -6,6 +6,7 @@ Checks: '*,
-altera-unroll-loops,
-android-*,
-cert-err58-cpp,
-cppcoreguidelines-avoid-do-while,
-cppcoreguidelines-avoid-c-arrays,
-cppcoreguidelines-avoid-goto,
-cppcoreguidelines-avoid-magic-numbers,
@ -60,6 +61,7 @@ Checks: '*,
-readability-implicit-bool-conversion,
-readability-magic-numbers,
-readability-named-parameter,
-readability-identifier-length,
-misc-no-recursion,
-concurrency-mt-unsafe,
-bugprone-easily-swappable-parameters'

View File

@ -275,7 +275,7 @@ option(MG_EXPERIMENTAL_HIGH_AVAILABILITY "Feature flag for experimental high ava
if (NOT MG_ENTERPRISE AND MG_EXPERIMENTAL_HIGH_AVAILABILITY)
set(MG_EXPERIMENTAL_HIGH_AVAILABILITY OFF)
message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY must be used with enterpise version of the code.")
message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY can only be used with enterpise version of the code.")
endif ()
if (MG_EXPERIMENTAL_HIGH_AVAILABILITY)

View File

@ -8,12 +8,18 @@ target_sources(mg-coordination
include/coordination/coordinator_server.hpp
include/coordination/coordinator_config.hpp
include/coordination/coordinator_exceptions.hpp
include/coordination/coordinator_instance.hpp
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_data.hpp
include/coordination/constants.hpp
include/coordination/coordinator_cluster_config.hpp
include/coordination/coordinator_handlers.hpp
include/coordination/coordinator_instance.hpp
include/coordination/instance_status.hpp
include/coordination/replication_instance.hpp
include/nuraft/coordinator_log_store.hpp
include/nuraft/coordinator_state_machine.hpp
include/nuraft/coordinator_state_manager.hpp
PRIVATE
coordinator_client.cpp
@ -23,6 +29,11 @@ target_sources(mg-coordination
coordinator_data.cpp
coordinator_instance.cpp
coordinator_handlers.cpp
replication_instance.cpp
coordinator_log_store.cpp
coordinator_state_machine.cpp
coordinator_state_manager.cpp
)
target_include_directories(mg-coordination PUBLIC include)

View File

@ -9,27 +9,29 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/coordinator_instance.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/uuid.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
#include "libnuraft/nuraft.hxx"
namespace memgraph::coordination {
CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
auto instance = std::ranges::find_if(
coord_data->registered_instances_,
[instance_name](CoordinatorInstance const &instance) { return instance.InstanceName() == instance_name; });
using nuraft::ptr;
using nuraft::srv_config;
MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!",
instance_name);
CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_data->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name);
return *instance;
};
@ -70,6 +72,11 @@ CoordinatorData::CoordinatorData() {
auto &instance = find_instance(coord_data, instance_name);
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (main_uuid_ == instance_uuid.value()) {
@ -110,48 +117,40 @@ CoordinatorData::CoordinatorData() {
}
auto CoordinatorData::TryFailover() -> void {
std::vector<CoordinatorInstance *> alive_registered_replica_instances{};
std::ranges::transform(registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica) |
ranges::views::filter(&CoordinatorInstance::IsAlive),
std::back_inserter(alive_registered_replica_instances),
[](CoordinatorInstance &instance) { return &instance; });
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
// TODO(antoniof) more complex logic of choosing replica instance
CoordinatorInstance *chosen_replica_instance =
!alive_registered_replica_instances.empty() ? alive_registered_replica_instances[0] : nullptr;
if (nullptr == chosen_replica_instance) {
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto chosen_replica_instance = ranges::begin(alive_replicas);
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
utils::UUID potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
auto const potential_new_main_uuid = utils::UUID{};
auto not_chosen_instance = [chosen_replica_instance](auto *instance) {
return *instance != *chosen_replica_instance;
auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
return instance != *chosen_replica_instance;
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto *other_replica_instance : alive_registered_replica_instances | ranges::views::filter(not_chosen_instance)) {
if (!other_replica_instance->SendSwapAndUpdateUUID(potential_new_main_uuid)) {
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance->InstanceName()));
other_replica_instance.InstanceName()));
return;
}
}
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
std::ranges::transform(registered_instances_ | ranges::views::filter([chosen_replica_instance](const auto &instance) {
return *chosen_replica_instance != instance;
}),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
main_fail_cb_)) {
@ -164,41 +163,53 @@ auto CoordinatorData::TryFailover() -> void {
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> {
std::vector<CoordinatorInstanceStatus> instances_status;
instances_status.reserve(registered_instances_.size());
auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
auto const stringify_repl_role = [](CoordinatorInstance const &instance) -> std::string {
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const instance_to_status =
[&stringify_repl_role](CoordinatorInstance const &instance) -> CoordinatorInstanceStatus {
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.socket_address = instance.SocketAddress(),
.replication_role = stringify_repl_role(instance),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_data_lock_};
std::ranges::transform(registered_instances_, std::back_inserter(instances_status), instance_to_status);
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
auto const is_new_main = [&instance_name](CoordinatorInstance const &instance) {
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(registered_instances_, is_new_main);
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == registered_instances_.end()) {
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
@ -208,16 +219,16 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](CoordinatorInstance const &instance) {
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
for (auto &other_instance : registered_instances_ | ranges::views::filter(is_not_new_main)) {
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
@ -225,9 +236,9 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
}
}
std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main),
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
@ -241,20 +252,20 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
try {
registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
@ -262,5 +273,10 @@ auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> Regist
}
}
auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::coordination
#endif

View File

@ -13,83 +13,85 @@
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
namespace memgraph::coordination {
CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
using nuraft::asio_service;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
auto CoordinatorInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
CoordinatorInstance::CoordinatorInstance()
: raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
logger_ = nullptr;
auto CoordinatorInstance::OnFailPing() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
// ASIO options
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; }
// RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms.
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
auto CoordinatorInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto CoordinatorInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
raft_server_ =
launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params);
auto CoordinatorInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) {
return false;
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
return true;
}
auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
if (!client_.DemoteToReplica()) {
return false;
auto maybe_stop = utils::ResettableCounter<20>();
while (!raft_server_->is_initialized() && !maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
return true;
}
auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
auto CoordinatorInstance::GetClient() -> CoordinatorClient & { return client_; }
void CoordinatorInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; }
auto CoordinatorInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
auto CoordinatorInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) {
return false;
if (!raft_server_->is_initialized()) {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
}
SetNewMainUUID(main_uuid_);
return true;
spdlog::info("Raft server started on {}", raft_endpoint);
}
auto CoordinatorInstance::InstanceName() const -> std::string {
return "coordinator_" + std::to_string(raft_server_id_);
}
auto CoordinatorInstance::RaftSocketAddress() const -> std::string {
return raft_address_ + ":" + std::to_string(raft_port_);
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
}
auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
} // namespace memgraph::coordination

View File

@ -0,0 +1,331 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_log_store.hpp"
namespace memgraph::coordination {
using nuraft::cs_new;
using nuraft::timer_helper;
CoordinatorLogStore::CoordinatorLogStore()
: start_idx_(1),
raft_server_bwd_pointer_(nullptr),
disk_emul_delay(0),
disk_emul_thread_(nullptr),
disk_emul_thread_stop_signal_(false),
disk_emul_last_durable_index_(0) {
// Dummy entry for index 0.
ptr<buffer> buf = buffer::alloc(sz_ulong);
logs_[0] = cs_new<log_entry>(0, buf);
}
CoordinatorLogStore::~CoordinatorLogStore() {
if (disk_emul_thread_) {
disk_emul_thread_stop_signal_ = true;
// disk_emul_ea_.invoke();
if (disk_emul_thread_->joinable()) {
disk_emul_thread_->join();
}
}
}
ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) {
// NOTE:
// Timestamp is used only when `replicate_log_timestamp_` option is on.
// Otherwise, log store does not need to store or load it.
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
entry->get_timestamp());
return clone;
}
ulong CoordinatorLogStore::next_slot() const {
std::lock_guard<std::mutex> l(logs_lock_);
// Exclude the dummy entry.
return start_idx_ + logs_.size() - 1;
}
ulong CoordinatorLogStore::start_index() const { return start_idx_; }
ptr<log_entry> CoordinatorLogStore::last_entry() const {
ulong next_idx = next_slot();
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(next_idx - 1);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
return MakeClone(entry->second);
}
ulong CoordinatorLogStore::append(ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
std::lock_guard<std::mutex> l(logs_lock_);
size_t idx = start_idx_ + logs_.size() - 1;
logs_[idx] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
// disk_emul_ea_.invoke();
}
return idx;
}
void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
// Discard all logs equal to or greater than `index.
std::lock_guard<std::mutex> l(logs_lock_);
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
logs_[index] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;
// Remove entries greater than `index`.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->second > index) {
entry = disk_emul_logs_being_written_.erase(entry);
} else {
entry++;
}
}
// disk_emul_ea_.invoke();
}
}
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
ret->resize(end - start);
ulong cc = 0;
for (ulong ii = start; ii < end; ++ii) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
(*ret)[cc++] = MakeClone(src);
}
return ret;
}
// NOLINTNEXTLINE(google-default-arguments)
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end,
int64 batch_size_hint_in_bytes) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
if (batch_size_hint_in_bytes < 0) {
return ret;
}
size_t accum_size = 0;
for (ulong ii = start; ii < end; ++ii) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
ret->push_back(MakeClone(src));
accum_size += src->get_buf().size();
if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break;
}
return ret;
}
ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
src = entry->second;
}
return MakeClone(src);
}
ulong CoordinatorLogStore::term_at(ulong index) {
ulong term = 0;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
term = entry->second->get_term();
}
return term;
}
ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) {
std::vector<ptr<buffer>> logs;
size_t size_total = 0;
for (ulong ii = index; ii < index + cnt; ++ii) {
ptr<log_entry> le = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
le = logs_[ii];
}
assert(le.get());
ptr<buffer> buf = le->serialize();
size_total += buf->size();
logs.push_back(buf);
}
ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
buf_out->pos(0);
buf_out->put((int32)cnt);
for (auto &entry : logs) {
ptr<buffer> &bb = entry;
buf_out->put((int32)bb->size());
buf_out->put(*bb);
}
return buf_out;
}
void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
pack.pos(0);
int32 num_logs = pack.get_int();
for (int32 ii = 0; ii < num_logs; ++ii) {
ulong cur_idx = index + ii;
int32 buf_size = pack.get_int();
ptr<buffer> buf_local = buffer::alloc(buf_size);
pack.get(buf_local);
ptr<log_entry> le = log_entry::deserialize(*buf_local);
{
std::lock_guard<std::mutex> l(logs_lock_);
logs_[cur_idx] = le;
}
}
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.upper_bound(0);
if (entry != logs_.end()) {
start_idx_ = entry->first;
} else {
start_idx_ = 1;
}
}
}
bool CoordinatorLogStore::compact(ulong last_log_index) {
std::lock_guard<std::mutex> l(logs_lock_);
for (ulong ii = start_idx_; ii <= last_log_index; ++ii) {
auto entry = logs_.find(ii);
if (entry != logs_.end()) {
logs_.erase(entry);
}
}
// WARNING:
// Even though nothing has been erased,
// we should set `start_idx_` to new index.
if (start_idx_ <= last_log_index) {
start_idx_ = last_log_index + 1;
}
return true;
}
bool CoordinatorLogStore::flush() {
disk_emul_last_durable_index_ = next_slot() - 1;
return true;
}
ulong CoordinatorLogStore::last_durable_index() {
uint64_t last_log = next_slot() - 1;
if (!disk_emul_delay) {
return last_log;
}
return disk_emul_last_durable_index_;
}
void CoordinatorLogStore::DiskEmulLoop() {
// This thread mimics async disk writes.
// uint32_t next_sleep_us = 100 * 1000;
while (!disk_emul_thread_stop_signal_) {
// disk_emul_ea_.wait_us(next_sleep_us);
// disk_emul_ea_.reset();
if (disk_emul_thread_stop_signal_) break;
uint64_t cur_time = timer_helper::get_timeofday_us();
// next_sleep_us = 100 * 1000;
bool call_notification = false;
{
std::lock_guard<std::mutex> l(logs_lock_);
// Remove all timestamps equal to or smaller than `cur_time`,
// and pick the greatest one among them.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->first <= cur_time) {
disk_emul_last_durable_index_ = entry->second;
entry = disk_emul_logs_being_written_.erase(entry);
call_notification = true;
} else {
break;
}
}
entry = disk_emul_logs_being_written_.begin();
if (entry != disk_emul_logs_being_written_.end()) {
// next_sleep_us = entry->first - cur_time;
}
}
if (call_notification) {
raft_server_bwd_pointer_->notify_log_append_completion(true);
}
}
}
void CoordinatorLogStore::Close() {}
void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) {
disk_emul_delay = delay_ms;
raft_server_bwd_pointer_ = raft;
if (!disk_emul_thread_) {
disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this);
}
}
} // namespace memgraph::coordination
#endif

View File

@ -25,7 +25,7 @@
namespace memgraph::coordination {
CoordinatorState::CoordinatorState() {
MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port),
MG_ASSERT(!(FLAGS_raft_server_id && FLAGS_coordinator_server_port),
"Instance cannot be a coordinator and have registered coordinator server.");
spdlog::info("Executing coordinator constructor");
@ -68,7 +68,7 @@ auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstan
data_);
}
auto CoordinatorState::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> {
auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
"Can't call show instances on data_, as variant holds wrong alternative");
return std::get<CoordinatorData>(data_).ShowInstances();
@ -79,5 +79,13 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
"Cannot get coordinator server since variant holds wrong alternative");
return *std::get<CoordinatorMainReplicaData>(data_).coordinator_server_;
}
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,98 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_state_machine.hpp"
namespace memgraph::coordination {
auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("pre_commit {} : {}", log_idx, str);
return nullptr;
}
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("commit {} : {}", log_idx, str);
last_committed_idx_ = log_idx;
return nullptr;
}
auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void {
last_committed_idx_ = log_idx;
}
auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
buffer_serializer bs(data);
std::string str = bs.get_str();
spdlog::info("rollback {} : {}", log_idx, str);
}
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/,
ptr<buffer> &data_out, bool &is_last_obj) -> int {
// Put dummy data.
data_out = buffer::alloc(sizeof(int32));
buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = true;
return 0;
}
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/,
bool /*is_last_obj*/) -> void {
spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
// Request next object.
obj_id++;
}
auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
{
auto lock = std::lock_guard{last_snapshot_lock_};
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
return true;
}
auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {}
auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
auto lock = std::lock_guard{last_snapshot_lock_};
return last_snapshot_;
}
auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; }
auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
// Clone snapshot from `s`.
{
auto lock = std::lock_guard{last_snapshot_lock_};
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
ptr<std::exception> except(nullptr);
bool ret = true;
when_done(ret, except);
}
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,68 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_state_manager.hpp"
namespace memgraph::coordination {
using nuraft::cluster_config;
using nuraft::cs_new;
using nuraft::srv_config;
using nuraft::srv_state;
using nuraft::state_mgr;
CoordinatorStateManager::CoordinatorStateManager(int srv_id, std::string const &endpoint)
: my_id_(srv_id), my_endpoint_(endpoint), cur_log_store_(cs_new<CoordinatorLogStore>()) {
my_srv_config_ = cs_new<srv_config>(srv_id, endpoint);
// Initial cluster config: contains only one server (myself).
cluster_config_ = cs_new<cluster_config>();
cluster_config_->get_servers().push_back(my_srv_config_);
}
auto CoordinatorStateManager::load_config() -> ptr<cluster_config> {
// Just return in-memory data in this example.
// May require reading from disk here, if it has been written to disk.
return cluster_config_;
}
auto CoordinatorStateManager::save_config(cluster_config const &config) -> void {
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
ptr<buffer> buf = config.serialize();
cluster_config_ = cluster_config::deserialize(*buf);
}
auto CoordinatorStateManager::save_state(srv_state const &state) -> void {
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
ptr<buffer> buf = state.serialize();
saved_state_ = srv_state::deserialize(*buf);
}
auto CoordinatorStateManager::read_state() -> ptr<srv_state> {
// Just return in-memory data in this example.
// May require reading from disk here, if it has been written to disk.
return saved_state_;
}
auto CoordinatorStateManager::load_log_store() -> ptr<log_store> { return cur_log_store_; }
auto CoordinatorStateManager::server_id() -> int32 { return my_id_; }
auto CoordinatorStateManager::system_exit(int const exit_code) -> void {}
auto CoordinatorStateManager::GetSrvConfig() const -> ptr<srv_config> { return my_srv_config_; }
} // namespace memgraph::coordination
#endif

View File

@ -49,12 +49,10 @@ class CoordinatorClient {
auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const
-> bool;
auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto RpcClient() -> rpc::Client & { return rpc_client_; }

View File

@ -11,36 +11,45 @@
#pragma once
#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE
#include <list>
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include "utils/uuid.hpp"
#include <list>
namespace memgraph::coordination {
class CoordinatorData {
public:
CoordinatorData();
// TODO: (andi) Probably rename to RegisterReplicationInstance
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
private:
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<CoordinatorInstance> registered_instances_;
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
CoordinatorInstance self_;
utils::UUID main_uuid_;
};

View File

@ -28,5 +28,27 @@ class CoordinatorRegisterInstanceException final : public utils::BasicException
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException)
};
class RaftServerStartException final : public utils::BasicException {
public:
explicit RaftServerStartException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftServerStartException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftServerStartException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftServerStartException)
};
class RaftAddServerException final : public utils::BasicException {
public:
explicit RaftAddServerException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftAddServerException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftAddServerException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException)
};
} // namespace memgraph::coordination
#endif

View File

@ -31,8 +31,8 @@ class CoordinatorHandlers {
slk::Builder *res_builder);
static void DemoteMainToReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder);
static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -13,70 +13,45 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "replication_coordination_glue/role.hpp"
#include <flags/replication.hpp>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
class CoordinatorData;
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
class CoordinatorInstance {
public:
CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
CoordinatorInstance();
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete;
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PromoteToMain(utils::UUID main_uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto GetClient() -> CoordinatorClient &;
void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt);
auto GetMainUUID() -> const std::optional<utils::UUID> &;
auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};
// TODO this needs to be atomic? What if instance is alive and then we read it and it has changed
bool is_alive_{false};
// for replica this is main uuid of current main
// for "main" main this same as in CoordinatorData
// it is set to nullopt when replica is down
// TLDR; when replica is down and comes back up we reset uuid of main replica is listening to
// so we need to send swap uuid again
std::optional<utils::UUID> main_uuid_;
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
}
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
};
} // namespace memgraph::coordination

View File

@ -14,8 +14,8 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include <variant>
@ -37,7 +37,9 @@ class CoordinatorState {
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
// The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;

View File

@ -19,10 +19,13 @@
namespace memgraph::coordination {
struct CoordinatorInstanceStatus {
// TODO: (andi) For phase IV. Some instances won't have raft_socket_address, coord_socket_address, replication_role and
// cluster role... At the end, all instances will have everything.
struct InstanceStatus {
std::string instance_name;
std::string socket_address;
std::string replication_role;
std::string raft_socket_address;
std::string coord_socket_address;
std::string cluster_role;
bool is_alive;
};

View File

@ -0,0 +1,84 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
#include <libnuraft/nuraft.hxx>
#include "utils/uuid.hpp"
namespace memgraph::coordination {
class CoordinatorData;
class ReplicationInstance {
public:
ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
ReplicationInstance(ReplicationInstance &&other) noexcept = delete;
ReplicationInstance &operator=(ReplicationInstance &&other) noexcept = delete;
~ReplicationInstance() = default;
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool;
auto GetClient() -> CoordinatorClient &;
void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt);
auto GetMainUUID() -> const std::optional<utils::UUID> &;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};
bool is_alive_{false};
// for replica this is main uuid of current main
// for "main" main this same as in CoordinatorData
// it is set to nullopt when replica is down
// TLDR; when replica is down and comes back up we reset uuid of main replica is listening to
// so we need to send swap uuid again
std::optional<utils::UUID> main_uuid_;
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
}
};
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,128 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using nuraft::buffer;
using nuraft::int32;
using nuraft::int64;
using nuraft::log_entry;
using nuraft::log_store;
using nuraft::ptr;
using nuraft::raft_server;
class CoordinatorLogStore : public log_store {
public:
CoordinatorLogStore();
CoordinatorLogStore(CoordinatorLogStore const &) = delete;
CoordinatorLogStore &operator=(CoordinatorLogStore const &) = delete;
CoordinatorLogStore(CoordinatorLogStore &&) = delete;
CoordinatorLogStore &operator=(CoordinatorLogStore &&) = delete;
~CoordinatorLogStore() override;
ulong next_slot() const override;
ulong start_index() const override;
ptr<log_entry> last_entry() const override;
ulong append(ptr<log_entry> &entry) override;
void write_at(ulong index, ptr<log_entry> &entry) override;
ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override;
// NOLINTNEXTLINE
ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override;
ptr<log_entry> entry_at(ulong index) override;
ulong term_at(ulong index) override;
ptr<buffer> pack(ulong index, int32 cnt) override;
void apply_pack(ulong index, buffer &pack) override;
bool compact(ulong last_log_index) override;
bool flush() override;
ulong last_durable_index() override;
void Close();
void SetDiskDelay(raft_server *raft, size_t delay_ms);
private:
static ptr<log_entry> MakeClone(ptr<log_entry> const &entry);
void DiskEmulLoop();
/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;
/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;
/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;
/**
* Backward pointer to Raft server.
*/
raft_server *raft_server_bwd_pointer_;
// Testing purpose --------------- BEGIN
/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;
/**
* Map of <timestamp, log index>, emulating logs that is being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;
/**
* Thread that will update `last_durable_index_` and call
* `notify_log_append_completion` at proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;
/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;
/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;
// Testing purpose --------------- END
};
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,72 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <spdlog/spdlog.h>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using nuraft::async_result;
using nuraft::buffer;
using nuraft::buffer_serializer;
using nuraft::cluster_config;
using nuraft::int32;
using nuraft::ptr;
using nuraft::snapshot;
using nuraft::state_machine;
class CoordinatorStateMachine : public state_machine {
public:
CoordinatorStateMachine() = default;
CoordinatorStateMachine(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
auto commit_config(ulong log_idx, ptr<cluster_config> & /*new_conf*/) -> void override;
auto rollback(ulong log_idx, buffer &data) -> void override;
auto read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/, ptr<buffer> &data_out,
bool &is_last_obj) -> int override;
auto save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/, bool /*is_last_obj*/)
-> void override;
auto apply_snapshot(snapshot &s) -> bool override;
auto free_user_snp_ctx(void *&user_snp_ctx) -> void override;
auto last_snapshot() -> ptr<snapshot> override;
auto last_commit_index() -> ulong override;
auto create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void override;
private:
std::atomic<uint64_t> last_committed_idx_{0};
ptr<snapshot> last_snapshot_;
std::mutex last_snapshot_lock_;
};
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,66 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_log_store.hpp"
#include <spdlog/spdlog.h>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using nuraft::cluster_config;
using nuraft::cs_new;
using nuraft::srv_config;
using nuraft::srv_state;
using nuraft::state_mgr;
class CoordinatorStateManager : public state_mgr {
public:
explicit CoordinatorStateManager(int srv_id, std::string const &endpoint);
CoordinatorStateManager(CoordinatorStateManager const &) = delete;
CoordinatorStateManager &operator=(CoordinatorStateManager const &) = delete;
CoordinatorStateManager(CoordinatorStateManager &&) = delete;
CoordinatorStateManager &operator=(CoordinatorStateManager &&) = delete;
~CoordinatorStateManager() override = default;
auto load_config() -> ptr<cluster_config> override;
auto save_config(cluster_config const &config) -> void override;
auto save_state(srv_state const &state) -> void override;
auto read_state() -> ptr<srv_state> override;
auto load_log_store() -> ptr<log_store> override;
auto server_id() -> int32 override;
auto system_exit(int exit_code) -> void override;
auto GetSrvConfig() const -> ptr<srv_config>;
private:
int my_id_;
std::string my_endpoint_;
ptr<CoordinatorLogStore> cur_log_store_;
ptr<srv_config> my_srv_config_;
ptr<cluster_config> cluster_config_;
ptr<srv_state> saved_state_;
};
} // namespace memgraph::coordination
#endif

View File

@ -0,0 +1,98 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
auto ReplicationInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
auto ReplicationInstance::OnFailPing() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto ReplicationInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
auto ReplicationInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
return true;
}
auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
return true;
}
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
void ReplicationInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; }
auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) {
return false;
}
SetNewMainUUID(main_uuid_);
return true;
}
} // namespace memgraph::coordination
#endif

View File

@ -9,12 +9,11 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/register_main_replica_coordinator_status.hpp"
#ifdef MG_ENTERPRISE
#include "dbms/coordinator_handler.hpp"
#include "dbms/dbms_handler.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
namespace memgraph::dbms {
@ -31,9 +30,15 @@ auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
return coordinator_state_.SetInstanceToMain(std::move(instance_name));
}
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::CoordinatorInstanceStatus> {
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> {
return coordinator_state_.ShowInstances();
}
auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::dbms
#endif

View File

@ -14,8 +14,8 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_state.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include <vector>
@ -33,7 +33,9 @@ class CoordinatorHandler {
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::CoordinatorInstanceStatus>;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
private:
coordination::CoordinatorState &coordinator_state_;

View File

@ -13,9 +13,11 @@
#ifdef MG_ENTERPRISE
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(coordinator, false, "Controls whether the instance is a replication coordinator.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(coordinator_server_port, 0, "Port on which coordinator servers will be started.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(raft_server_port, 0, "Port on which raft servers will be started.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(raft_server_id, 0, "Unique ID of the raft server.");
#endif
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -15,9 +15,11 @@
#ifdef MG_ENTERPRISE
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(coordinator);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(coordinator_server_port);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(raft_server_port);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(raft_server_id);
#endif
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -39,7 +39,7 @@ Endpoint::IpFamily Endpoint::GetIpFamily(const std::string &address) {
}
std::optional<std::pair<std::string, uint16_t>> Endpoint::ParseSocketOrIpAddress(
const std::string &address, const std::optional<uint16_t> default_port = {}) {
const std::string &address, const std::optional<uint16_t> default_port) {
/// expected address format:
/// - "ip_address:port_number"
/// - "ip_address"

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -61,8 +61,8 @@ struct Endpoint {
* it into an ip address and a port number; even if a default port is given,
* it won't be used, as we expect that it is given in the address string.
*/
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress(const std::string &address,
std::optional<uint16_t> default_port);
static std::optional<std::pair<std::string, uint16_t>> ParseSocketOrIpAddress(
const std::string &address, std::optional<uint16_t> default_port = {});
/**
* Tries to parse given string as either socket address or hostname.

View File

@ -3071,11 +3071,7 @@ class CoordinatorQuery : public memgraph::query::Query {
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Action {
REGISTER_INSTANCE,
SET_INSTANCE_TO_MAIN,
SHOW_REPLICATION_CLUSTER,
};
enum class Action { REGISTER_INSTANCE, SET_INSTANCE_TO_MAIN, SHOW_INSTANCES, ADD_COORDINATOR_INSTANCE };
enum class SyncMode { SYNC, ASYNC };
@ -3087,6 +3083,8 @@ class CoordinatorQuery : public memgraph::query::Query {
std::string instance_name_;
memgraph::query::Expression *replication_socket_address_{nullptr};
memgraph::query::Expression *coordinator_socket_address_{nullptr};
memgraph::query::Expression *raft_socket_address_{nullptr};
memgraph::query::Expression *raft_server_id_{nullptr};
memgraph::query::CoordinatorQuery::SyncMode sync_mode_;
CoordinatorQuery *Clone(AstStorage *storage) const override {
@ -3098,6 +3096,8 @@ class CoordinatorQuery : public memgraph::query::Query {
object->sync_mode_ = sync_mode_;
object->coordinator_socket_address_ =
coordinator_socket_address_ ? coordinator_socket_address_->Clone(storage) : nullptr;
object->raft_socket_address_ = raft_socket_address_ ? raft_socket_address_->Clone(storage) : nullptr;
object->raft_server_id_ = raft_server_id_ ? raft_server_id_->Clone(storage) : nullptr;
return object;
}

View File

@ -374,7 +374,6 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe
return replication_query;
}
// License check is done in the interpreter.
antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
@ -400,10 +399,28 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext * /*ctx*/) {
antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER;
if (!ctx->raftSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Raft socket address should be a string literal!");
}
if (!ctx->raftServerId()->literal()->numberLiteral()) {
throw SemanticException("Raft server id should be a number literal!");
}
coordinator_query->action_ = CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE;
coordinator_query->raft_socket_address_ = std::any_cast<Expression *>(ctx->raftSocketAddress()->accept(this));
coordinator_query->raft_server_id_ = std::any_cast<Expression *>(ctx->raftServerId()->accept(this));
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitShowInstances(MemgraphCypher::ShowInstancesContext * /*ctx*/) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::SHOW_INSTANCES;
return coordinator_query;
}

View File

@ -251,7 +251,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext *ctx) override;
antlrcpp::Any visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) override;
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitShowInstances(MemgraphCypher::ShowInstancesContext *ctx) override;
/**
* @return LockPathQuery*

View File

@ -20,6 +20,7 @@ options { tokenVocab=MemgraphCypherLexer; }
import Cypher ;
memgraphCypherKeyword : cypherKeyword
| ADD
| ACTIVE
| AFTER
| ALTER
@ -64,6 +65,7 @@ memgraphCypherKeyword : cypherKeyword
| HEADER
| IDENTIFIED
| INSTANCE
| INSTANCES
| NODE_LABELS
| NULLIF
| IMPORT
@ -189,7 +191,8 @@ replicationQuery : setReplicationRole
coordinatorQuery : registerInstanceOnCoordinator
| setInstanceToMain
| showReplicationCluster
| showInstances
| addCoordinatorInstance
;
triggerQuery : createTrigger
@ -374,7 +377,7 @@ setReplicationRole : SET REPLICATION ROLE TO ( MAIN | REPLICA )
showReplicationRole : SHOW REPLICATION ROLE ;
showReplicationCluster : SHOW REPLICATION CLUSTER ;
showInstances : SHOW INSTANCES ;
instanceName : symbolicName ;
@ -382,6 +385,7 @@ socketAddress : literal ;
coordinatorSocketAddress : literal ;
replicationSocketAddress : literal ;
raftSocketAddress : literal ;
registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC )
TO socketAddress ;
@ -390,6 +394,10 @@ registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSoc
setInstanceToMain : SET INSTANCE instanceName TO MAIN ;
raftServerId : literal ;
addCoordinatorInstance : ADD COORDINATOR raftServerId ON raftSocketAddress ;
dropReplica : DROP REPLICA instanceName ;
showReplicas : SHOW REPLICAS ;

View File

@ -23,6 +23,7 @@ lexer grammar MemgraphCypherLexer ;
import CypherLexer ;
ADD : A D D ;
ACTIVE : A C T I V E ;
AFTER : A F T E R ;
ALTER : A L T E R ;
@ -39,7 +40,6 @@ BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ;
CALL : C A L L ;
CHECK : C H E C K ;
CLEAR : C L E A R ;
CLUSTER : C L U S T E R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
@ -80,6 +80,7 @@ INACTIVE : I N A C T I V E ;
IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ;
IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ;
INSTANCE : I N S T A N C E ;
INSTANCES : I N S T A N C E S ;
ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ;
LABELS : L A B E L S ;

View File

@ -219,7 +219,8 @@ const trie::Trie kKeywords = {"union",
"lock",
"unlock",
"build",
"instance"};
"instance",
"coordinator"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -508,6 +508,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &raft_socket_address) -> void override {
auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(raft_socket_address);
if (maybe_ip_and_port) {
auto const [ip, port] = *maybe_ip_and_port;
spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, port, ip);
coordinator_handler_.AddCoordinatorInstance(raft_server_id, port, ip);
} else {
spdlog::error("Invalid raft socket address {}.", raft_socket_address);
}
}
void SetInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetInstanceToMain(instance_name);
switch (status) {
@ -526,7 +537,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
std::vector<coordination::CoordinatorInstanceStatus> ShowInstances() const override {
std::vector<coordination::InstanceStatus> ShowInstances() const override {
return coordinator_handler_.ShowInstances();
}
@ -930,7 +941,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
switch (repl_query->action_) {
case ReplicationQuery::Action::SET_REPLICATION_ROLE: {
#ifdef MG_ENTERPRISE
if (FLAGS_coordinator) {
if (FLAGS_raft_server_id) {
throw QueryRuntimeException("Coordinator can't set roles!");
}
if (FLAGS_coordinator_server_port) {
@ -960,7 +971,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: {
#ifdef MG_ENTERPRISE
if (FLAGS_coordinator) {
if (FLAGS_raft_server_id) {
throw QueryRuntimeException("Coordinator doesn't have a replication role!");
}
#endif
@ -1017,8 +1028,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
#ifdef MG_ENTERPRISE
if (FLAGS_coordinator) {
throw QueryRuntimeException("Coordinator cannot call SHOW REPLICAS! Use SHOW REPLICATION CLUSTER instead.");
if (FLAGS_raft_server_id) {
throw QueryRuntimeException("Coordinator cannot call SHOW REPLICAS! Use SHOW INSTANCES instead.");
}
#endif
@ -1079,6 +1090,37 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
const query::InterpreterConfig &config, std::vector<Notification> *notifications) {
Callback callback;
switch (coordinator_query->action_) {
case CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_raft_server_id) {
throw QueryRuntimeException("Only coordinator can add coordinator instance!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
auto raft_socket_address_tv = coordinator_query->raft_socket_address_->Accept(evaluator);
auto raft_server_id_tv = coordinator_query->raft_server_id_->Accept(evaluator);
callback.fn = [handler = CoordQueryHandler{*coordinator_state}, raft_socket_address_tv,
raft_server_id_tv]() mutable {
handler.AddCoordinatorInstance(raft_server_id_tv.ValueInt(), std::string(raft_socket_address_tv.ValueString()));
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::ADD_COORDINATOR_INSTANCE,
fmt::format("Coordinator has added instance {} on coordinator server {}.",
coordinator_query->instance_name_, raft_socket_address_tv.ValueString()));
return callback;
}
case CoordinatorQuery::Action::REGISTER_INSTANCE: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
@ -1089,7 +1131,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
if (!FLAGS_raft_server_id) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
@ -1124,7 +1166,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
if (!FLAGS_raft_server_id) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
@ -1140,7 +1182,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
return callback;
}
case CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER: {
case CoordinatorQuery::Action::SHOW_INSTANCES: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
@ -1149,11 +1191,11 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can run SHOW REPLICATION CLUSTER.");
if (!FLAGS_raft_server_id) {
throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES.");
}
callback.header = {"name", "socket_address", "alive", "role"};
callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "alive", "role"};
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
replica_nfields = callback.header.size()]() mutable {
auto const instances = handler.ShowInstances();
@ -1162,15 +1204,15 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
std::ranges::transform(instances, std::back_inserter(result),
[](const auto &status) -> std::vector<TypedValue> {
return {TypedValue{status.instance_name}, TypedValue{status.socket_address},
TypedValue{status.is_alive}, TypedValue{status.replication_role}};
return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address},
TypedValue{status.coord_socket_address}, TypedValue{status.is_alive},
TypedValue{status.cluster_role}};
});
return result;
};
return callback;
}
return callback;
}
}
#endif
@ -4175,7 +4217,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
}
#ifdef MG_ENTERPRISE
if (FLAGS_coordinator && !utils::Downcast<CoordinatorQuery>(parsed_query.query) &&
if (FLAGS_raft_server_id && !utils::Downcast<CoordinatorQuery>(parsed_query.query) &&
!utils::Downcast<SettingQuery>(parsed_query.query)) {
throw QueryRuntimeException("Coordinator can run only coordinator queries!");
}

View File

@ -53,7 +53,7 @@
#include "utils/tsc.hpp"
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/instance_status.hpp"
#endif
namespace memgraph::metrics {
@ -114,7 +114,11 @@ class CoordinatorQueryHandler {
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<coordination::CoordinatorInstanceStatus> ShowInstances() const = 0;
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &coordinator_socket_address)
-> void = 0;
};
#endif

View File

@ -69,6 +69,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
#ifdef MG_ENTERPRISE
case NotificationCode::REGISTER_COORDINATOR_SERVER:
return "RegisterCoordinatorServer"sv;
case NotificationCode::ADD_COORDINATOR_INSTANCE:
return "AddCoordinatorInstance"sv;
#endif
case NotificationCode::REPLICA_PORT_WARNING:
return "ReplicaPortWarning"sv;

View File

@ -44,6 +44,7 @@ enum class NotificationCode : uint8_t {
REGISTER_REPLICA,
#ifdef MG_ENTERPRISE
REGISTER_COORDINATOR_SERVER,
ADD_COORDINATOR_INSTANCE,
#endif
SET_REPLICA,
START_STREAM,

View File

@ -66,8 +66,9 @@ startup_config_dict = {
"Time in seconds after which inactive Bolt sessions will be closed.",
),
"cartesian_product_enabled": ("true", "true", "Enable cartesian product expansion."),
"coordinator": ("false", "false", "Controls whether the instance is a replication coordinator."),
"coordinator_server_port": ("0", "0", "Port on which coordinator servers will be started."),
"raft_server_port": ("0", "0", "Port on which raft servers will be started."),
"raft_server_id": ("0", "0", "Unique ID of the raft server."),
"data_directory": ("mg_data", "mg_data", "Path to directory in which to save all permanent data."),
"data_recovery_on_startup": (
"false",

View File

@ -2,6 +2,7 @@ find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental distributed_coordinators.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py)
copy_e2e_python_files(ha_experimental common.py)

View File

@ -13,7 +13,6 @@ import os
import shutil
import sys
import tempfile
import time
import interactive_mg_runner
import pytest
@ -70,7 +69,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
"setup_queries": [],
},
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
@ -111,12 +110,13 @@ def test_replication_works_on_failover():
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -132,7 +132,6 @@ def test_replication_works_on_failover():
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
@ -143,13 +142,13 @@ def test_replication_works_on_failover():
execute_and_fetch_all(new_main_cursor, "CREATE ();")
# 6
alive_replica_cursor = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursor, "MATCH (n) RETURN count(n) as count;")[0][0]
alive_replica_cursror = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
assert res == 1, "Vertex should be replicated"
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_show_replication_cluster():
def test_show_instances():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
@ -159,12 +158,13 @@ def test_show_replication_cluster():
coord_cursor = connect(host="localhost", port=7690).cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
@ -184,18 +184,20 @@ def test_show_replication_cluster():
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, show_repl_cluster)
@ -217,12 +219,13 @@ def test_simple_automatic_failover():
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
@ -280,21 +283,23 @@ def test_replica_instance_restarts():
cursor = connect(host="localhost", port=7690).cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_down, show_repl_cluster)
@ -320,19 +325,21 @@ def test_automatic_failover_main_back_as_replica():
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_after_failover = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster)
expected_data_after_main_coming_back = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
]
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -346,60 +353,68 @@ def test_automatic_failover_main_back_as_replica():
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance3)
def test_replica_instance_restarts_replication_works():
def test_automatic_failover_main_back_as_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connect(host="localhost", port=7690).cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_all_down = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_down, show_repl_cluster)
mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_main_back = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster)
instance3_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
expected_data_on_main_show_replicas = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
expected_data_replicas_back = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
instance3_cursor = connect(host="localhost", port=7687).cursor()
mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster)
instance1_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert(expected_data_on_main_show_replicas, retrieve_data_show_repl_role_instance1)
instance2_cursor = connect(host="localhost", port=7689).cursor()
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
expected_data_replica = [("replica",)]
mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1)
def retrieve_data_show_repl_role_instance2():
return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
execute_and_fetch_all(instance3_cursor, "CREATE ();")
def retrieve_data_replica():
return execute_and_fetch_all(instance1_cursor, "MATCH (n) RETURN count(n);")[0][0]
expected_data_replica = 1
mg_sleep_and_assert(expected_data_replica, retrieve_data_replica)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
if __name__ == "__main__":

View File

@ -37,16 +37,17 @@ def test_coordinator_cannot_run_show_repl_role():
assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_show_replication_cluster():
def test_coordinator_show_instances():
cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
@ -65,8 +66,8 @@ def test_coordinator_cannot_call_show_replicas():
def test_main_and_replicas_cannot_call_show_repl_cluster(port):
cursor = connect(host="localhost", port=port).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")
assert str(e.value) == "Only coordinator can run SHOW REPLICATION CLUSTER."
execute_and_fetch_all(cursor, "SHOW INSTANCES;")
assert str(e.value) == "Only coordinator can run SHOW INSTANCES."
@pytest.mark.parametrize(

View File

@ -0,0 +1,145 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"coordinator1": {
"args": [
"--bolt-port",
"7687",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator2": {
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator3": {
"args": [
"--bolt-port",
"7689",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
],
},
}
def test_coordinators_communication():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7689).cursor()
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
mg_sleep_and_assert(expected_cluster, check_coordinator2)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -13,7 +13,7 @@ ha_cluster: &ha_cluster
log_file: "replication-e2e-main.log"
setup_queries: []
coordinator:
args: ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"]
args: ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"]
log_file: "replication-e2e-coordinator.log"
setup_queries: [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
@ -36,6 +36,10 @@ workloads:
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/manual_setting_replicas.py"]
- name: "Distributed coordinators"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/distributed_coordinators.py"]
- name: "Not replicate from old main"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/not_replicate_from_old_main.py"]

View File

@ -2632,6 +2632,19 @@ TEST_P(CypherMainVisitorTest, TestRegisterReplicationQuery) {
ReplicationQuery::SyncMode::SYNC);
}
#ifdef MG_ENTERPRISE
TEST_P(CypherMainVisitorTest, TestAddCoordinatorInstance) {
auto &ast_generator = *GetParam();
std::string const correct_query = R"(ADD COORDINATOR 1 ON "127.0.0.1:10111")";
auto *parsed_query = dynamic_cast<CoordinatorQuery *>(ast_generator.ParseQuery(correct_query));
EXPECT_EQ(parsed_query->action_, CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE);
ast_generator.CheckLiteral(parsed_query->raft_socket_address_, TypedValue("127.0.0.1:10111"));
ast_generator.CheckLiteral(parsed_query->raft_server_id_, TypedValue(1));
}
#endif
TEST_P(CypherMainVisitorTest, TestDeleteReplica) {
auto &ast_generator = *GetParam();