Merge branch 'master' into text-search-integration-poc

Ante Pušić 2024-02-04 00:52:54 +01:00 committed by GitHub
commit 2ac3d85735
35 changed files with 626 additions and 408 deletions

View File

@ -24,7 +24,7 @@ jobs:
community_build:
name: "Community build"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -63,7 +63,7 @@ jobs:
THREADS: 24
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -117,7 +117,7 @@ jobs:
debug_build:
name: "Debug build"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -171,7 +171,7 @@ jobs:
debug_integration_test:
name: "Debug integration tests"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -201,7 +201,7 @@ jobs:
release_build:
name: "Release build"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -272,7 +272,7 @@ jobs:
release_benchmark_tests:
name: "Release Benchmark Tests"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -324,7 +324,7 @@ jobs:
if: false
name: "Release End-to-end Test"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -372,7 +372,7 @@ jobs:
release_durability_stress_tests:
name: "Release durability and stress tests"
runs-on: [self-hosted, Linux, X64, Debian10]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -420,7 +420,7 @@ jobs:
release_jepsen_test:
name: "Release Jepsen Test"
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository

View File

@ -24,7 +24,7 @@ jobs:
community_build:
name: "Community build"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -59,7 +59,7 @@ jobs:
coverage_build:
name: "Coverage build"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -113,7 +113,7 @@ jobs:
debug_build:
name: "Debug build"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -167,7 +167,7 @@ jobs:
debug_integration_test:
name: "Debug integration tests"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -197,7 +197,7 @@ jobs:
release_build:
name: "Release build"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -268,7 +268,7 @@ jobs:
release_benchmark_tests:
name: "Release Benchmark Tests"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -320,7 +320,7 @@ jobs:
if: false
name: "Release End-to-end Test"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository
@ -368,7 +368,7 @@ jobs:
release_durability_stress_tests:
name: "Release durability and stress tests"
runs-on: [self-hosted, Linux, X64, Ubuntu20.04]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Set up repository

View File

@ -23,7 +23,7 @@ env:
jobs:
stress_test_large:
name: "Stress test large"
timeout-minutes: 600
timeout-minutes: 720
strategy:
matrix:
os: [Debian10, Ubuntu20.04]

ADRs/003_rocksdb.md (new file, 38 additions)
View File

@ -0,0 +1,38 @@
# RocksDB ADR
**Author**
Marko Budiselic (github.com/gitbuda)
**Status**
ACCEPTED
**Date**
January 23, 2024
**Problem**
Interacting with data (reads and writes) on disk in a concurrent, safe, and
fast way is a challenging task. Implementing all the low-level primitives needed
to interact with various disk hardware efficiently consumes significant
engineering effort. Whenever Memgraph has to store data on disk (or on any
other storage system colder than RAM), the problem is how to do that in the
least amount of development time while satisfying all functional requirements
(often performance).
**Criteria**
- working efficiently in a highly concurrent environment
- easy integration with Memgraph's C++ codebase
- providing low-level key-value API
- heavily tested in production environments
- providing abstractions for the storage hardware (even for cloud-based
storage services like S3)
**Decision**
There are a few robust key-value stores, but finding one that is
production-ready and compatible with Memgraph's C++ codebase is challenging.
**We select [RocksDB](https://github.com/facebook/rocksdb)** because it
delivers a robust API to manage data on disk; it's battle-tested in many
production environments (many database systems embed RocksDB), and it's the
most compatible option.
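
For context, the "low-level key-value API" the ADR refers to looks roughly as
follows; a minimal sketch using RocksDB's basic Open/Put/Get calls (the
database path and the key/value strings are illustrative):

#include <cassert>
#include <string>

#include <rocksdb/db.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;  // create the database on first use

  rocksdb::DB *db = nullptr;
  rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/rocksdb_example", &db);
  assert(status.ok());

  // Writes and reads go through a plain key-value interface.
  status = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(status.ok());

  std::string value;
  status = db->Get(rocksdb::ReadOptions(), "key", &value);
  assert(status.ok() && value == "value");

  delete db;
  return 0;
}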

View File

@ -326,3 +326,10 @@ set_property(TARGET mgcxx_text_search PROPERTY IMPORTED_LOCATION ${MGCXX_ROOT}/l
# generated later during the build process.
file(MAKE_DIRECTORY ${MGCXX_ROOT}/include)
set_property(TARGET mgcxx_text_search PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MGCXX_ROOT}/include)
# Setup NuRaft
import_external_library(nuraft STATIC
${CMAKE_CURRENT_SOURCE_DIR}/nuraft/lib/libnuraft.a
${CMAKE_CURRENT_SOURCE_DIR}/nuraft/include/)
find_package(OpenSSL REQUIRED)
target_link_libraries(nuraft INTERFACE ${OPENSSL_LIBRARIES})

View File

@ -126,6 +126,7 @@ declare -A primary_urls=(
["absl"]="https://$local_cache_host/git/abseil-cpp.git"
["jemalloc"]="https://$local_cache_host/git/jemalloc.git"
["range-v3"]="https://$local_cache_host/git/ericniebler/range-v3.git"
["nuraft"]="https://$local_cache_host/git/eBay/NuRaft.git"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -155,6 +156,7 @@ declare -A secondary_urls=(
["absl"]="https://github.com/abseil/abseil-cpp.git"
["jemalloc"]="https://github.com/jemalloc/jemalloc.git"
["range-v3"]="https://github.com/ericniebler/range-v3.git"
["nuraft"]="https://github.com/eBay/NuRaft.git"
)
# antlr
@ -277,3 +279,10 @@ popd
#range-v3 release-0.12.0
range_v3_ref="release-0.12.0"
repo_clone_try_double "${primary_urls[range-v3]}" "${secondary_urls[range-v3]}" "rangev3" "$range_v3_ref"
# NuRaft
nuraft_tag="v2.1.0"
repo_clone_try_double "${primary_urls[nuraft]}" "${secondary_urls[nuraft]}" "nuraft" "$nuraft_tag" true
pushd nuraft
./prepare.sh
popd
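
For context, NuRaft is typically consumed through its launcher API; a sketch
following the shape of NuRaft's quick-start, where echo_state_machine and
inmem_state_mgr come from NuRaft's bundled examples and stand in for
application-provided implementations (treat the timing values and port as
assumptions):

#include <libnuraft/nuraft.hxx>

using namespace nuraft;

int main() {
  // Application-provided pieces: a replicated state machine and a state
  // manager that persists Raft's own metadata (example classes assumed here).
  ptr<state_machine> my_state_machine = cs_new<echo_state_machine>();
  ptr<state_mgr> my_state_manager = cs_new<inmem_state_mgr>(1, "localhost:12345");

  asio_service::options asio_opt;  // network layer options
  raft_params params;              // Raft timing parameters (values assumed)
  params.heart_beat_interval_ = 100;
  params.election_timeout_lower_bound_ = 200;
  params.election_timeout_upper_bound_ = 400;

  raft_launcher launcher;
  ptr<raft_server> server =
      launcher.init(my_state_machine, my_state_manager, nullptr /* logger */,
                    12345 /* port */, asio_opt, params);
  return server != nullptr ? 0 : 1;
}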

View File

@ -12,7 +12,6 @@ target_sources(mg-coordination
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_data.hpp
include/coordination/constants.hpp
include/coordination/failover_status.hpp
include/coordination/coordinator_cluster_config.hpp
PRIVATE
@ -21,9 +20,10 @@ target_sources(mg-coordination
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_data.cpp
coordinator_instance.cpp
)
target_include_directories(mg-coordination PUBLIC include)
target_link_libraries(mg-coordination
PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue lib::rangev3
PUBLIC mg::utils mg::rpc mg::slk mg::io mg::repl_coord_glue lib::rangev3 nuraft
)

View File

@ -20,7 +20,7 @@
namespace memgraph::coordination {
namespace {
auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &config)
auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &config)
-> communication::ClientContext {
return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file}
: communication::ClientContext{};
@ -45,38 +45,33 @@ void CoordinatorClient::StartFrequentCheck() {
"Health check frequency must be greater than 0");
instance_checker_.Run(
"Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().SocketAddress());
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
{ // NOTE: This is intentionally scoped so that stream lock could get released.
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
succ_cb_(coord_data_, instance_name);
} catch (const rpc::RpcFailedException &) {
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_data_, instance_name);
}
});
}
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); }
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); }
auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & {
return config_.replication_client_info;
auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ResetReplicationClientInfo() -> void {
// TODO (antoniofilipovic) Sync with Andi on this one
// config_.replication_client_info.reset();
}
auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
auto CoordinatorClient::SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool {
try {
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) {
@ -84,23 +79,24 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(
return false;
}
return true;
} catch (const rpc::RpcFailedException &) {
} catch (rpc::RpcFailedException const &) {
spdlog::error("RPC error occurred while sending failover RPC!");
}
return false;
}
auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool {
auto CoordinatorClient::DemoteToReplica() const -> bool {
auto const &instance_name = config_.instance_name;
try {
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))};
auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to set main to replica!");
spdlog::error("Failed to receive successful RPC response for setting instance {} to replica!", instance_name);
return false;
}
spdlog::info("Sent request RPC from coordinator to instance to set it as replica!");
return true;
} catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!");
} catch (rpc::RpcFailedException const &) {
spdlog::error("Failed to set instance {} to replica!", instance_name);
}
return false;
}
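
The intentionally scoped block in StartFrequentCheck above exists so the RPC
stream (and whatever lock it holds) is destroyed before the success callback
runs; a generic, self-contained sketch of that RAII-scoping pattern, with a
hypothetical Stream type:

#include <mutex>

std::mutex rpc_mutex;

struct Stream {
  std::unique_lock<std::mutex> lock{rpc_mutex};  // held for the RPC's duration
  void AwaitResponse() { /* blocking wait elided */ }
};

void FrequentCheck(void (*on_success)()) {
  {  // scope ends here, releasing the lock before the callback runs
    Stream stream;
    stream.AwaitResponse();
  }
  on_success();  // may itself open a Stream without self-deadlocking
}

int main() { FrequentCheck(+[] { /* success callback */ }); }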

View File

@ -17,6 +17,7 @@
#include <range/v3/view.hpp>
#include <shared_mutex>
#include "libnuraft/nuraft.hxx"
namespace memgraph::coordination {
@ -24,7 +25,7 @@ CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
auto instance = std::ranges::find_if(
coord_data->registered_instances_,
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
[instance_name](CoordinatorInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!",
instance_name);
@ -34,105 +35,94 @@ CoordinatorData::CoordinatorData() {
replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateLastResponseTime();
find_instance(coord_data, instance_name).OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateInstanceStatus();
find_instance(coord_data, instance_name).OnFailPing();
};
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
instance.UpdateLastResponseTime();
if (instance.IsAlive() || !coord_data->ClusterHasAliveMain_()) {
instance.OnSuccessPing();
return;
}
bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
}
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
main_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) {
spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name);
switch (auto failover_status = DoFailover(); failover_status) {
using enum DoFailoverStatus;
case ALL_REPLICAS_DOWN:
spdlog::warn("Failover aborted since all replicas are down!");
break;
case MAIN_ALIVE:
spdlog::warn("Failover aborted since main is alive!");
break;
case RPC_FAILED:
spdlog::warn("Failover aborted since promoting replica to main failed!");
break;
case SUCCESS:
break;
}
find_instance(coord_data, instance_name).OnFailPing();
if (!coord_data->ClusterHasAliveMain_()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_data->TryFailover();
}
};
}
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
auto CoordinatorData::ClusterHasAliveMain_() const -> bool {
auto const alive_main = [](CoordinatorInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(registered_instances_, alive_main);
}
auto CoordinatorData::TryFailover() -> void {
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive);
if (chosen_replica_instance == replica_instances.end()) {
return DoFailoverStatus::ALL_REPLICAS_DOWN;
spdlog::warn("Failover failed since all replicas are down!");
return;
}
chosen_replica_instance->PrepareForFailover();
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
std::vector<ReplicationClientInfo> repl_clients_info;
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(std::ranges::distance(replica_instances));
auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
auto const not_chosen_replica_instance = [&chosen_replica_instance](CoordinatorInstance const &instance) {
return instance != *chosen_replica_instance;
};
auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
// TODO (antoniofilipovic): Should we send also data on old MAIN???
// TODO: (andi) Don't send replicas which aren't alive
for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo());
std::ranges::transform(registered_instances_ | ranges::views::filter(not_chosen_replica_instance),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!chosen_replica_instance->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
chosen_replica_instance->RestoreAfterFailedFailover();
return DoFailoverStatus::RPC_FAILED;
}
auto old_main = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
// TODO: (andi) For performing restoration we will have to improve this
old_main->client_.PauseFrequentCheck();
chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_);
return DoFailoverStatus::SUCCESS;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
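
The replica-selection step above (filter to replicas, then find the first
alive one) composes a ranges view with find_if; a self-contained sketch of the
same pattern over a hypothetical Instance type:

#include <algorithm>
#include <ranges>
#include <vector>

struct Instance {
  bool replica;
  bool alive;
  bool IsReplica() const { return replica; }
  bool IsAlive() const { return alive; }
};

int main() {
  std::vector<Instance> registered{{true, false}, {true, true}, {false, true}};
  auto replicas = registered | std::views::filter(&Instance::IsReplica);
  auto chosen = std::ranges::find_if(replicas, &Instance::IsAlive);
  if (chosen == replicas.end()) {
    return 1;  // all replicas down -> failover aborted
  }
  return 0;  // *chosen would be promoted to main
}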
auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> {
std::vector<CoordinatorInstanceStatus> instances_status;
instances_status.reserve(registered_instances_.size());
auto const stringify_repl_role = [](const CoordinatorInstance &instance) -> std::string {
if (!instance.IsAlive()) return "";
auto const stringify_repl_role = [](CoordinatorInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const instance_to_status =
[&stringify_repl_role](const CoordinatorInstance &instance) -> CoordinatorInstanceStatus {
[&stringify_repl_role](CoordinatorInstance const &instance) -> CoordinatorInstanceStatus {
return {.instance_name = instance.InstanceName(),
.socket_address = instance.SocketAddress(),
.replication_role = stringify_repl_role(instance),
@ -150,70 +140,59 @@ auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceSt
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
// Find replica we already registered
auto registered_replica = std::find_if(
registered_instances_.begin(), registered_instances_.end(),
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
auto const is_new_main = [&instance_name](CoordinatorInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(registered_instances_, is_new_main);
// if replica not found...
if (registered_replica == registered_instances_.end()) {
spdlog::error("You didn't register instance with given name {}", instance_name);
if (new_main == registered_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
registered_replica->client_.PauseFrequentCheck();
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
std::ranges::for_each(registered_instances_,
[registered_replica, &repl_clients_info](const CoordinatorInstance &replica) {
if (replica != *registered_replica) {
repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo());
}
});
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
registered_replica->client_.ResumeFrequentCheck();
auto const is_not_new_main = [&instance_name](CoordinatorInstance const &instance) {
return instance.InstanceName() != instance_name;
};
std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
registered_replica->client_.SetSuccCallback(main_succ_cb_);
registered_replica->client_.SetFailCallback(main_fail_cb_);
registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
registered_replica->client_.ResumeFrequentCheck();
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
return instance.InstanceName() == config.instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress());
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS;
}
CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info;
try {
registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
// TODO (antoniofilipovic) create and then push back
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA);
if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) {
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
instance->client_.StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
} // namespace memgraph::coordination

View File

@ -0,0 +1,84 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance.hpp"
namespace memgraph::coordination {
CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
auto CoordinatorInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
auto CoordinatorInstance::OnFailPing() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; }
auto CoordinatorInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto CoordinatorInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
auto CoordinatorInstance::PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
return true;
}
auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
return true;
}
auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
} // namespace memgraph::coordination
#endif
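
OnFailPing above decides liveness by checking whether the time since the last
successful response still falls inside a configured window; a minimal sketch
of that check, where the 5-second window is an assumed stand-in for
CoordinatorClusterConfig::alive_response_time_difference_sec_:

#include <chrono>
#include <iostream>

bool StillAlive(std::chrono::system_clock::time_point last_response,
                int alive_window_sec) {
  auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(
      std::chrono::system_clock::now() - last_response);
  return elapsed.count() < alive_window_sec;
}

int main() {
  auto const last = std::chrono::system_clock::now() - std::chrono::seconds(3);
  std::cout << std::boolalpha << StillAlive(last, 5) << '\n';  // prints: true
}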

View File

@ -36,19 +36,19 @@ void PromoteReplicaToMainRes::Load(PromoteReplicaToMainRes *self, memgraph::slk:
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaReq::Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
void DemoteMainToReplicaReq::Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaReq::Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
void DemoteMainToReplicaReq::Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaRes::Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
void DemoteMainToReplicaRes::Save(const DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaRes::Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
@ -60,11 +60,11 @@ constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::Ty
constexpr utils::TypeInfo coordination::PromoteReplicaToMainRes::kType{utils::TypeId::COORD_FAILOVER_RES,
"CoordPromoteReplicaToMainRes", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ,
"CoordSetReplMainReq", nullptr};
constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ,
"CoordDemoteToReplicaReq", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordSetReplMainRes", nullptr};
constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordDemoteToReplicaRes", nullptr};
namespace slk {
@ -84,19 +84,19 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::
memgraph::slk::Load(&self->replication_clients_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replication_client_info, builder);
}
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->replication_client_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
}
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->success, reader);
}

View File

@ -74,12 +74,6 @@ auto CoordinatorState::ShowInstances() const -> std::vector<CoordinatorInstanceS
return std::get<CoordinatorData>(data_).ShowInstances();
}
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
auto &coord_state = std::get<CoordinatorData>(data_);
return coord_state.DoFailover();
}
auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
MG_ASSERT(std::holds_alternative<CoordinatorMainReplicaData>(data_),
"Cannot get coordinator server since variant holds wrong alternative");

View File

@ -21,12 +21,10 @@ namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
@ -46,15 +44,12 @@ class CoordinatorClient {
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto DemoteToReplica() const -> bool;
auto ReplicationClientInfo() const -> const ReplClientInfo &;
auto ResetReplicationClientInfo() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
return first.config_ == second.config_;

View File

@ -32,9 +32,7 @@ struct CoordinatorClientConfig {
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
// Info which coordinator will send to new main when performing failover
struct ReplicationClientInfo {
// Must be the same as CoordinatorClientConfig's instance_name
std::string instance_name;
replication_coordination_glue::ReplicationMode replication_mode{};
std::string replication_ip_address;
@ -43,7 +41,6 @@ struct CoordinatorClientConfig {
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
};
// Each instance has replication config in case it fails
ReplicationClientInfo replication_client_info;
struct SSL {
@ -58,6 +55,8 @@ struct CoordinatorClientConfig {
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
};
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
struct CoordinatorServerConfig {
std::string ip_address;
uint16_t port{};

View File

@ -16,9 +16,9 @@
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include <list>
@ -27,17 +27,20 @@ class CoordinatorData {
public:
CoordinatorData();
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto TryFailover() -> void;
auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>;
private:
auto ClusterHasAliveMain_() const -> bool;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// Must be std::list because we rely on pointer stability
// NOTE: Must be std::list because we rely on pointer stability
std::list<CoordinatorInstance> registered_instances_;
};

View File

@ -16,16 +16,16 @@
#include "utils/exceptions.hpp"
namespace memgraph::coordination {
class CoordinatorFailoverException final : public utils::BasicException {
class CoordinatorRegisterInstanceException final : public utils::BasicException {
public:
explicit CoordinatorFailoverException(const std::string_view what) noexcept
: BasicException("Failover didn't complete successfully: " + std::string(what)) {}
explicit CoordinatorRegisterInstanceException(const std::string_view what) noexcept
: BasicException("Failed to create instance: " + std::string(what)) {}
template <class... Args>
explicit CoordinatorFailoverException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorFailoverException(fmt::format(fmt, std::forward<Args>(args)...)) {}
explicit CoordinatorRegisterInstanceException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorRegisterInstanceException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorFailoverException)
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException)
};
} // namespace memgraph::coordination

View File

@ -15,6 +15,7 @@
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
namespace memgraph::coordination {
@ -24,10 +25,7 @@ class CoordinatorData;
class CoordinatorInstance {
public:
CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_role),
is_alive_(true) {}
HealthCheckCallback fail_cb);
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
@ -35,34 +33,27 @@ class CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto UpdateInstanceStatus() -> bool {
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_)
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto InstanceName() const -> std::string { return client_.InstanceName(); }
auto SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto IsAlive() const -> bool { return is_alive_; }
auto IsAlive() const -> bool;
auto IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; }
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetSuccCallback(std::move(main_succ_cb));
client_.SetFailCallback(std::move(main_fail_cb));
// Comment with Andi but we shouldn't delete this, what if this MAIN FAILS AGAIN
// client_.ResetReplicationClientInfo();
client_.ResumeFrequentCheck();
}
auto PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};

View File

@ -48,35 +48,35 @@ struct PromoteReplicaToMainRes {
using PromoteReplicaToMainRpc = rpc::RequestResponse<PromoteReplicaToMainReq, PromoteReplicaToMainRes>;
struct SetMainToReplicaReq {
struct DemoteMainToReplicaReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
static void Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info)
explicit DemoteMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info)
: replication_client_info(std::move(replication_client_info)) {}
SetMainToReplicaReq() = default;
DemoteMainToReplicaReq() = default;
CoordinatorClientConfig::ReplicationClientInfo replication_client_info;
};
struct SetMainToReplicaRes {
struct DemoteMainToReplicaRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
static void Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaRes(bool success) : success(success) {}
SetMainToReplicaRes() = default;
explicit DemoteMainToReplicaRes(bool success) : success(success) {}
DemoteMainToReplicaRes() = default;
bool success;
};
using SetMainToReplicaRpc = rpc::RequestResponse<SetMainToReplicaReq, SetMainToReplicaRes>;
using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>;
} // namespace memgraph::coordination
@ -91,13 +91,13 @@ void Save(const memgraph::coordination::PromoteReplicaToMainReq &self, memgraph:
void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk

View File

@ -16,7 +16,6 @@
#include "coordination/coordinator_data.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include <variant>
@ -28,8 +27,8 @@ class CoordinatorState {
CoordinatorState();
~CoordinatorState() = default;
CoordinatorState(const CoordinatorState &) = delete;
CoordinatorState &operator=(const CoordinatorState &) = delete;
CoordinatorState(CoordinatorState const &) = delete;
CoordinatorState &operator=(CoordinatorState const &) = delete;
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
@ -43,8 +42,6 @@ class CoordinatorState {
// The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
private:
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
};

View File

@ -1,21 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <cstdint>
namespace memgraph::coordination {
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, RPC_FAILED };
} // namespace memgraph::coordination
#endif

View File

@ -20,7 +20,6 @@ namespace memgraph::coordination {
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
COULD_NOT_BE_PERSISTED,
NOT_COORDINATOR,
RPC_FAILED,
SUCCESS

View File

@ -13,12 +13,10 @@
#ifdef MG_ENTERPRISE
#include "utils/result.hpp"
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "utils/result.hpp"
#include <cstdint>
#include <optional>

View File

@ -12,12 +12,12 @@
#ifdef MG_ENTERPRISE
#include "dbms/coordinator_handlers.hpp"
#include "dbms/utils.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "dbms/dbms_handler.hpp"
#include "dbms/replication_client.hpp"
#include "dbms/utils.hpp"
#include "range/v3/view.hpp"
@ -32,31 +32,33 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) {
CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder);
});
server.Register<coordination::SetMainToReplicaRpc>(
server.Register<coordination::DemoteMainToReplicaRpc>(
[&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received SetMainToReplicaRpc from coordinator server");
CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder);
spdlog::info("Received DemoteMainToReplicaRpc from coordinator server");
CoordinatorHandlers::DemoteMainToReplicaHandler(dbms_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder) {
void CoordinatorHandlers::DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder) {
auto &repl_state = dbms_handler.ReplicationState();
spdlog::info("Executing SetMainToReplicaHandler");
if (!repl_state.IsMain()) {
if (repl_state.IsReplica()) {
spdlog::error("Setting to replica must be performed on main.");
slk::Save(coordination::SetMainToReplicaRes{false}, res_builder);
slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
return;
}
coordination::SetMainToReplicaReq req;
coordination::DemoteMainToReplicaReq req;
slk::Load(&req, req_reader);
replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
const replication::ReplicationServerConfig clients_config{
.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Setting main to replica failed!");
if (bool const success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Demoting main to replica failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
@ -69,16 +71,14 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
auto &repl_state = dbms_handler.ReplicationState();
if (!repl_state.IsReplica()) {
spdlog::error("Failover must be performed on replica!");
spdlog::error("Only replica can be promoted to main!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
auto repl_server_config = std::get<replication::RoleReplicaData>(repl_state.ReplicationData()).config;
// This can fail because of disk. If it does, the cluster state could get inconsistent.
// We don't handle disk issues.
if (bool success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) {
if (bool const success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) {
spdlog::error("Promoting replica to main failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
@ -104,28 +104,29 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
auto instance_client = repl_state.RegisterReplica(config);
if (instance_client.HasError()) {
using enum memgraph::replication::RegisterReplicaError;
switch (instance_client.GetError()) {
// Can't happen, we are already replica
case memgraph::replication::RegisterReplicaError::NOT_MAIN:
spdlog::error("Failover must be performed to main!");
case NOT_MAIN:
spdlog::error("Failover must be performed on main!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// Can't happen, checked on the coordinator side
case memgraph::replication::RegisterReplicaError::NAME_EXISTS:
case NAME_EXISTS:
spdlog::error("Replica with the same name already exists!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// Can't happen, checked on the coordinator side
case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS:
case ENDPOINT_EXISTS:
spdlog::error("Replica with the same endpoint already exists!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// We don't handle disk issues
case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
case COULD_NOT_BE_PERSISTED:
spdlog::error("Registered replica could not be persisted!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
case memgraph::replication::RegisterReplicaError::SUCCESS:
case SUCCESS:
break;
}
}
@ -138,9 +139,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
// Update system before enabling individual storage <-> replica clients
dbms_handler.SystemRestore(instance_client_ref);
// TODO: (andi) Policy for register all databases
// Will be resolved after deciding about choosing new replica
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler, instance_client_ref);
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients<true>(dbms_handler, instance_client_ref);
MG_ASSERT(all_clients_good, "Failed to register one or more databases to the REPLICA \"{}\".", config.name);
StartReplicaClient(dbms_handler, instance_client_ref);

View File

@ -26,7 +26,7 @@ class CoordinatorHandlers {
private:
static void PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder);
static void DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -76,6 +76,7 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler,
return success;
}
template <bool AllowRPCFailure = false>
inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
replication::ReplicationClient &instance_client) {
if (!allow_mt_repl && dbms_handler.All().size() > 1) {
@ -84,7 +85,6 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
bool all_clients_good = true;
// Add database specific clients (NOTE Currently all databases are connected to each replica)
dbms_handler.ForEach([&](DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
if (!allow_mt_repl && storage->name() != kDefaultDB) {
@ -93,16 +93,14 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
// TODO: ATM only IN_MEMORY_TRANSACTIONAL, fix other modes
if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return;
using enum storage::replication::ReplicaState;
all_clients_good &= storage->repl_storage_state_.replication_clients_.WithLock(
[storage, &instance_client, db_acc = std::move(db_acc)](auto &storage_clients) mutable { // NOLINT
auto client = std::make_unique<storage::ReplicationStorageClient>(instance_client);
// All good, start replica client
client->Start(storage, std::move(db_acc));
// After start the storage <-> replica state should be READY or RECOVERING (if correctly started)
// MAYBE_BEHIND isn't a statement of the current state, this is the default value
// Failed to start due an error like branching of MAIN and REPLICA
if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
return false; // TODO: sometimes we need to still add to storage_clients
if (client->State() == MAYBE_BEHIND && !AllowRPCFailure) {
return false;
}
storage_clients.push_back(std::move(client));
return true;
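
The new AllowRPCFailure template parameter is what lets the failover path
above call RegisterAllDatabasesClients<true> and keep a replica that is still
in the MAYBE_BEHIND state; a minimal sketch of that compile-time toggle, with
illustrative names:

#include <iostream>

enum class ReplicaState { READY, RECOVERING, MAYBE_BEHIND };

// The default path rejects a possibly-diverged replica; the failover path
// instantiates AcceptClient<true> to tolerate the transient state.
template <bool AllowRPCFailure = false>
bool AcceptClient(ReplicaState state) {
  if (state == ReplicaState::MAYBE_BEHIND && !AllowRPCFailure) {
    return false;
  }
  return true;
}

int main() {
  std::cout << std::boolalpha
            << AcceptClient(ReplicaState::MAYBE_BEHIND) << ' '          // false
            << AcceptClient<true>(ReplicaState::MAYBE_BEHIND) << '\n';  // true
}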

View File

@ -386,7 +386,7 @@ replicationSocketAddress : literal ;
registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC )
TO socketAddress ;
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
setInstanceToMain : SET INSTANCE instanceName TO MAIN ;

View File

@ -500,14 +500,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case END_POINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica because promotion on replica failed! Check logs on replica to find out more "
"info!");
"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
"find out more info!");
case SUCCESS:
break;
}
@ -520,10 +518,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case NO_INSTANCE_WITH_NAME:
throw QueryRuntimeException("No instance with such name!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't set replica instance to main since this instance is not a coordinator!");
throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(
"Couldn't set replica instance to main. Check coordinator and replica for more logs");
"Couldn't set replica instance to main!. Check coordinator and replica for more logs");
case SUCCESS:
break;
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,6 +15,7 @@
#include <cstdint>
#include <filesystem>
#include "flags/replication.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage_mode.hpp"
#include "utils/exceptions.hpp"
@ -128,10 +129,15 @@ struct Config {
};
inline auto ReplicationStateRootPath(memgraph::storage::Config const &config) -> std::optional<std::filesystem::path> {
if (!config.durability.restore_replication_state_on_startup) {
if (!config.durability.restore_replication_state_on_startup
#ifdef MG_ENTERPRISE
&& !FLAGS_coordinator_server_port
#endif
) {
spdlog::warn(
"Replication configuration will NOT be stored. When the server restarts, replication state will be "
"forgotten.");
return std::nullopt;
}
return {config.durability.storage_directory};

View File

@ -35,6 +35,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
auto hb_stream{client_.rpc_client_.Stream<replication::HeartbeatRpc>(
storage->uuid(), replStorageState.last_commit_timestamp_, std::string{replStorageState.epoch_.id()})};
const auto replica = hb_stream.AwaitResponse();
#ifdef MG_ENTERPRISE // Multi-tenancy is only supported in enterprise
@ -67,7 +68,6 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
client_.name_, client_.name_, client_.name_);
// TODO: (andi) Talk about renaming MAYBE_BEHIND to branching
// State not updated, hence in MAYBE_BEHIND state
return;
}

View File

@ -32,7 +32,7 @@ namespace memgraph::utils {
* void long_function() {
* resource.enable();
* OnScopeExit on_exit([&resource] { resource.disable(); });
* // long block of code, might trow an exception
* // long block of code, might throw an exception
* }
*/
template <typename Callable>

View File

@ -4,7 +4,6 @@ copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental conftest.py)
copy_e2e_python_files(ha_experimental workloads.yaml)
copy_e2e_python_files_from_parent_folder(ha_experimental ".." memgraph.py)

View File

@ -1,4 +1,4 @@
# Copyright 2024 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,13 @@
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import execute_and_fetch_all
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -24,25 +26,51 @@ interactive_mg_runner.PROJECT_DIR = os.path.normpath(
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"log_file": "replica1.log",
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"log_file": "replica2.log",
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"log_file": "main.log",
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"log_file": "replica3.log",
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
@ -53,7 +81,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
def test_replication_works_on_failover(connection):
def test_replication_works_on_failover():
# The goal of this test is to check that replication works after the failover command.
# 1. We start all replicas, the main instance, and the coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that the main instance has the correct state.
@ -61,12 +89,13 @@ def test_replication_works_on_failover(connection):
# 4. We check that the coordinator and the new main have the correct state.
# 5. We insert one vertex on the new main.
# 6. We check that the vertex appears on the new replica.
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
main_cursor = connection(7687, "instance_3").cursor()
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
@ -78,7 +107,7 @@ def test_replication_works_on_failover(connection):
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# 4
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
@ -86,17 +115,25 @@ def test_replication_works_on_failover(connection):
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
new_main_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
@ -104,62 +141,68 @@ def test_replication_works_on_failover(connection):
execute_and_fetch_all(new_main_cursor, "CREATE ();")
# 6
alive_replica_cursror = connection(7689, "instance_2").cursor()
alive_replica_cursor = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursor, "MATCH (n) RETURN count(n) as count;")[0][0]
assert res == 1, "Vertex should be replicated"
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
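
All of these tests converge through mg_sleep_and_assert from mg_utils. Its implementation is not part of this diff; the sketch below captures only the semantics the tests rely on, with invented timeout and poll-interval values:

import time

def mg_sleep_and_assert(expected, retriever, max_wait_s=20.0, interval_s=0.5):
    # Poll `retriever` until it returns `expected`, then succeed; otherwise
    # fall through to a final assert so pytest reports the last mismatch.
    deadline = time.time() + max_wait_s
    while time.time() < deadline:
        if retriever() == expected:
            return
        time.sleep(interval_s)
    assert retriever() == expected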
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
def test_show_replication_cluster():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
instance1_cursor = connect(host="localhost", port=7688).cursor()
instance2_cursor = connect(host="localhost", port=7689).cursor()
instance3_cursor = connect(host="localhost", port=7687).cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
# 2.
# We leave some time for the coordinator to pick up the current state of the instances.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance2():
return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def test_simple_automatic_failover(connection):
def test_simple_automatic_failover():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connection(7687, "instance_3").cursor()
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
@ -169,7 +212,7 @@ def test_simple_automatic_failover(connection):
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
@ -177,46 +220,189 @@ def test_simple_automatic_failover(connection):
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
new_main_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
]
def test_registering_replica_fails_name_exists(connection):
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_registering_replica_fails_name_exists():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
shutil.rmtree(TEMP_DIR)
def test_registering_replica_fails_endpoint_exists(connection):
def test_registering_replica_fails_endpoint_exists():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';",
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';",
)
assert (
str(e.value)
== "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more info!"
)
assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"
def test_replica_instance_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connect(host="localhost", port=7690).cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_down, show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
instance1_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
expected_data_replica = [("replica",)]
mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1)
def test_automatic_failover_main_back_as_replica():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_after_failover = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster)
expected_data_after_main_coming_back = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "replica"),
]
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
mg_sleep_and_assert(expected_data_after_main_coming_back, retrieve_data_show_repl_cluster)
instance3_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance3)
def test_automatic_failover_main_back_as_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_all_down = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_main_back = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster)
instance3_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data_replicas_back = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster)
instance1_cursor = connect(host="localhost", port=7688).cursor()
instance2_cursor = connect(host="localhost", port=7689).cursor()
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance2():
return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
if __name__ == "__main__":

View File

@ -23,3 +23,10 @@ def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(**kwargs)
connection.autocommit = True
return connection
def safe_execute(function, *args):
# Run `function` and ignore any error it raises, so test setup and
# teardown stay idempotent (e.g. rmtree on a missing directory).
try:
function(*args)
except Exception:
pass
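
Typical use from the tests above: clearing a temp directory that may not exist yet, so a fresh run and a rerun behave the same. For example:

import shutil
import tempfile

TEMP_DIR = tempfile.TemporaryDirectory().name

# rmtree raises FileNotFoundError on a missing directory;
# safe_execute swallows it.
safe_execute(shutil.rmtree, TEMP_DIR)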

View File

@ -1,43 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from common import connect, execute_and_fetch_all
# The fixture here is more complex because the connection has to be
# parameterized based on the test parameters (info has to be available on both
# sides).
#
# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
# is not an elegant/feasible solution here.
#
# The solution was independently developed and then I stumbled upon the same
# approach here https://stackoverflow.com/a/68286553/4888809 which I think is
# optimal.
@pytest.fixture(scope="function")
def connection():
connection_holder = None
role_holder = None
def inner_connection(port, role):
nonlocal connection_holder, role_holder
connection_holder = connect(host="localhost", port=port)
role_holder = role
return connection_holder
yield inner_connection
# Only main instance can be cleaned up because replicas do NOT accept
# writes.
if role_holder == "main":
cursor = connection_holder.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")

View File

@ -12,33 +12,33 @@
import sys
import pytest
from common import execute_and_fetch_all
from common import connect, execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
def test_disable_cypher_queries(connection):
cursor = connection(7690, "coordinator").cursor()
def test_disable_cypher_queries():
cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "CREATE (n:TestNode {prop: 'test'})")
assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_cannot_be_replica_role(connection):
cursor = connection(7690, "coordinator").cursor()
def test_coordinator_cannot_be_replica_role():
cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_cannot_run_show_repl_role(connection):
cursor = connection(7690, "coordinator").cursor()
def test_coordinator_cannot_run_show_repl_role():
cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "SHOW REPLICATION ROLE;")
assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_show_replication_cluster(connection):
cursor = connection(7690, "coordinator").cursor()
def test_coordinator_show_replication_cluster():
cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
@ -51,30 +51,30 @@ def test_coordinator_show_replication_cluster(connection):
mg_sleep_and_assert(expected_data, retrieve_data)
def test_coordinator_cannot_call_show_replicas(connection):
cursor = connection(7690, "coordinator").cursor()
def test_coordinator_cannot_call_show_replicas():
cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "SHOW REPLICAS;")
assert str(e.value) == "Coordinator can run only coordinator queries!"
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica")],
"port",
[7687, 7688, 7689],
)
def test_main_and_replicas_cannot_call_show_repl_cluster(port, role, connection):
cursor = connection(port, role).cursor()
def test_main_and_replicas_cannot_call_show_repl_cluster(port):
cursor = connect(host="localhost", port=port).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")
assert str(e.value) == "Only coordinator can run SHOW REPLICATION CLUSTER."
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica")],
"port",
[7687, 7688, 7689],
)
def test_main_and_replicas_cannot_register_coord_server(port, role, connection):
cursor = connection(port, role).cursor()
def test_main_and_replicas_cannot_register_coord_server(port):
cursor = connect(host="localhost", port=port).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
cursor,