Add --experimental-enabled=high-availability (#1720)
This commit is contained in:
parent
f098a9d5e3
commit
7ec648b4ce
47
.github/workflows/diff.yaml
vendored
47
.github/workflows/diff.yaml
vendored
@ -336,53 +336,6 @@ jobs:
|
||||
# multiple paths could be defined
|
||||
build/logs
|
||||
|
||||
experimental_build_ha:
|
||||
name: "High availability build"
|
||||
runs-on: [self-hosted, Linux, X64, Diff]
|
||||
env:
|
||||
THREADS: 24
|
||||
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
|
||||
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
|
||||
|
||||
steps:
|
||||
- name: Set up repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
# Number of commits to fetch. `0` indicates all history for all
|
||||
# branches and tags. (default: 1)
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Build release binaries
|
||||
run: |
|
||||
source /opt/toolchain-v4/activate
|
||||
./init
|
||||
cd build
|
||||
cmake -DCMAKE_BUILD_TYPE=Release -DMG_EXPERIMENTAL_HIGH_AVAILABILITY=ON ..
|
||||
make -j$THREADS
|
||||
- name: Run unit tests
|
||||
run: |
|
||||
source /opt/toolchain-v4/activate
|
||||
cd build
|
||||
ctest -R memgraph__unit --output-on-failure -j$THREADS
|
||||
- name: Run e2e tests
|
||||
if: false
|
||||
run: |
|
||||
cd tests
|
||||
./setup.sh /opt/toolchain-v4/activate
|
||||
source ve3/bin/activate_e2e
|
||||
cd e2e
|
||||
./run.sh "Coordinator"
|
||||
./run.sh "Client initiated failover"
|
||||
./run.sh "Uninitialized cluster"
|
||||
- name: Save test data
|
||||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: "Test data(High availability build)"
|
||||
path: |
|
||||
# multiple paths could be defined
|
||||
build/logs
|
||||
|
||||
release_jepsen_test:
|
||||
name: "Release Jepsen Test"
|
||||
runs-on: [self-hosted, Linux, X64, Debian10, JepsenControl]
|
||||
|
@ -271,18 +271,6 @@ endif()
|
||||
set(libs_dir ${CMAKE_SOURCE_DIR}/libs)
|
||||
add_subdirectory(libs EXCLUDE_FROM_ALL)
|
||||
|
||||
option(MG_EXPERIMENTAL_HIGH_AVAILABILITY "Feature flag for experimental high availability" OFF)
|
||||
|
||||
if (NOT MG_ENTERPRISE AND MG_EXPERIMENTAL_HIGH_AVAILABILITY)
|
||||
set(MG_EXPERIMENTAL_HIGH_AVAILABILITY OFF)
|
||||
message(FATAL_ERROR "MG_EXPERIMENTAL_HIGH_AVAILABILITY can only be used with enterpise version of the code.")
|
||||
endif ()
|
||||
|
||||
if (MG_EXPERIMENTAL_HIGH_AVAILABILITY)
|
||||
add_compile_definitions(MG_EXPERIMENTAL_HIGH_AVAILABILITY)
|
||||
endif ()
|
||||
|
||||
# Optional subproject configuration -------------------------------------------
|
||||
option(TEST_COVERAGE "Generate coverage reports from running memgraph" OFF)
|
||||
option(TOOLS "Build tools binaries" ON)
|
||||
option(QUERY_MODULES "Build query modules containing custom procedures" ON)
|
||||
|
@ -11,7 +11,6 @@ target_sources(mg-coordination
|
||||
include/coordination/coordinator_slk.hpp
|
||||
include/coordination/coordinator_instance.hpp
|
||||
include/coordination/coordinator_handlers.hpp
|
||||
include/coordination/constants.hpp
|
||||
include/coordination/instance_status.hpp
|
||||
include/coordination/replication_instance.hpp
|
||||
include/coordination/raft_state.hpp
|
||||
|
@ -132,7 +132,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
|
||||
|
||||
// registering replicas
|
||||
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
|
||||
auto instance_client = replication_handler.RegisterReplica(config, false);
|
||||
auto instance_client = replication_handler.RegisterReplica(config);
|
||||
if (instance_client.HasError()) {
|
||||
using enum memgraph::replication::RegisterReplicaError;
|
||||
switch (instance_client.GetError()) {
|
||||
|
@ -186,11 +186,9 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: (andi) fmap compliant
|
||||
ReplicationClientsInfo repl_clients_info;
|
||||
repl_clients_info.reserve(repl_instances_.size() - 1);
|
||||
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
|
||||
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
|
||||
spdlog::warn("Failover failed since promoting replica to main failed!");
|
||||
|
@ -1,22 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
#ifdef MG_EXPERIMENTAL_HIGH_AVAILABILITY
|
||||
constexpr bool allow_ha = true;
|
||||
#else
|
||||
constexpr bool allow_ha = false;
|
||||
#endif
|
||||
|
||||
} // namespace memgraph::coordination
|
@ -19,13 +19,14 @@
|
||||
// Bolt server flags.
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_string(experimental_enabled, "",
|
||||
"Experimental features to be used, comma seperated. Options [system-replication]");
|
||||
"Experimental features to be used, comma seperated. Options [system-replication, high-availability]");
|
||||
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
namespace memgraph::flags {
|
||||
|
||||
auto const mapping = std::map{std::pair{"system-replication"sv, Experiments::SYSTEM_REPLICATION}};
|
||||
auto const mapping = std::map{std::pair{"system-replication"sv, Experiments::SYSTEM_REPLICATION},
|
||||
std::pair{"high-availability"sv, Experiments::HIGH_AVAILABILITY}};
|
||||
|
||||
auto ExperimentsInstance() -> Experiments & {
|
||||
static auto instance = Experiments{};
|
||||
|
@ -23,6 +23,7 @@ namespace memgraph::flags {
|
||||
// old experiments can be reused once code cleanup has happened
|
||||
enum class Experiments : uint8_t {
|
||||
SYSTEM_REPLICATION = 1 << 0,
|
||||
HIGH_AVAILABILITY = 1 << 1,
|
||||
};
|
||||
|
||||
bool AreExperimentsEnabled(Experiments experiments);
|
||||
|
@ -109,7 +109,6 @@
|
||||
#include "utils/variant_helpers.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "coordination/constants.hpp"
|
||||
#include "flags/experimental.hpp"
|
||||
#endif
|
||||
|
||||
@ -370,7 +369,7 @@ class ReplQueryHandler {
|
||||
.replica_check_frequency = replica_check_frequency,
|
||||
.ssl = std::nullopt};
|
||||
|
||||
const auto error = handler_->TryRegisterReplica(replication_config, true).HasError();
|
||||
const auto error = handler_->TryRegisterReplica(replication_config).HasError();
|
||||
|
||||
if (error) {
|
||||
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
|
||||
@ -1131,17 +1130,21 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Parameters ¶meters,
|
||||
coordination::CoordinatorState *coordinator_state,
|
||||
const query::InterpreterConfig &config, std::vector<Notification> *notifications) {
|
||||
using enum memgraph::flags::Experiments;
|
||||
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryRuntimeException("High availability is only available in Memgraph Enterprise.");
|
||||
}
|
||||
|
||||
if (!flags::AreExperimentsEnabled(HIGH_AVAILABILITY)) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. If you want to use it, add high-availability option to the "
|
||||
"--experimental-enabled flag.");
|
||||
}
|
||||
|
||||
Callback callback;
|
||||
switch (coordinator_query->action_) {
|
||||
case CoordinatorQuery::Action::ADD_COORDINATOR_INSTANCE: {
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryException("Trying to use enterprise feature without a valid license.");
|
||||
}
|
||||
if constexpr (!coordination::allow_ha) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
if (!FLAGS_raft_server_id) {
|
||||
throw QueryRuntimeException("Only coordinator can add coordinator instance!");
|
||||
}
|
||||
@ -1165,15 +1168,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
return callback;
|
||||
}
|
||||
case CoordinatorQuery::Action::REGISTER_INSTANCE: {
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryException("Trying to use enterprise feature without a valid license.");
|
||||
}
|
||||
|
||||
if constexpr (!coordination::allow_ha) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
if (!FLAGS_raft_server_id) {
|
||||
throw QueryRuntimeException("Only coordinator can register coordinator server!");
|
||||
}
|
||||
@ -1205,15 +1199,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
return callback;
|
||||
}
|
||||
case CoordinatorQuery::Action::UNREGISTER_INSTANCE:
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryException("Trying to use enterprise feature without a valid license.");
|
||||
}
|
||||
|
||||
if constexpr (!coordination::allow_ha) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
if (!FLAGS_raft_server_id) {
|
||||
throw QueryRuntimeException("Only coordinator can register coordinator server!");
|
||||
}
|
||||
@ -1229,14 +1214,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
return callback;
|
||||
|
||||
case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: {
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryException("Trying to use enterprise feature without a valid license.");
|
||||
}
|
||||
if constexpr (!coordination::allow_ha) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
if (!FLAGS_raft_server_id) {
|
||||
throw QueryRuntimeException("Only coordinator can register coordinator server!");
|
||||
}
|
||||
@ -1254,14 +1231,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
return callback;
|
||||
}
|
||||
case CoordinatorQuery::Action::SHOW_INSTANCES: {
|
||||
if (!license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
throw QueryException("Trying to use enterprise feature without a valid license.");
|
||||
}
|
||||
if constexpr (!coordination::allow_ha) {
|
||||
throw QueryRuntimeException(
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
if (!FLAGS_raft_server_id) {
|
||||
throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES.");
|
||||
}
|
||||
|
@ -53,10 +53,10 @@ struct ReplicationQueryHandler {
|
||||
const std::optional<utils::UUID> &main_uuid) = 0;
|
||||
|
||||
// as MAIN, define and connect to REPLICAs
|
||||
virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> utils::BasicResult<RegisterReplicaError> = 0;
|
||||
|
||||
virtual auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
virtual auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> utils::BasicResult<RegisterReplicaError> = 0;
|
||||
|
||||
// as MAIN, remove a REPLICA connection
|
||||
|
@ -123,10 +123,10 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
const std::optional<utils::UUID> &main_uuid) override;
|
||||
|
||||
// as MAIN, define and connect to REPLICAs
|
||||
auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override;
|
||||
|
||||
auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
auto RegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override;
|
||||
|
||||
// as MAIN, remove a REPLICA connection
|
||||
@ -145,8 +145,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
auto GetReplicaUUID() -> std::optional<utils::UUID>;
|
||||
|
||||
private:
|
||||
template <bool AllowRPCFailure>
|
||||
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
template <bool SendSwapUUID>
|
||||
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> {
|
||||
MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!");
|
||||
auto maybe_client = repl_state_.RegisterReplica(config);
|
||||
@ -172,7 +172,7 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
}
|
||||
const auto main_uuid =
|
||||
std::get<memgraph::replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_;
|
||||
if (send_swap_uuid) {
|
||||
if constexpr (SendSwapUUID) {
|
||||
if (!memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_,
|
||||
main_uuid)) {
|
||||
return memgraph::query::RegisterReplicaError::ERROR_ACCEPTING_MAIN;
|
||||
@ -205,9 +205,6 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) {
|
||||
return false;
|
||||
}
|
||||
if (state == storage::replication::ReplicaState::MAYBE_BEHIND) {
|
||||
return AllowRPCFailure;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
|
@ -192,12 +192,12 @@ bool ReplicationHandler::SetReplicationRoleMain() {
|
||||
|
||||
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) {
|
||||
return SetReplicationRoleReplica_<false>(config, main_uuid);
|
||||
return SetReplicationRoleReplica_<true>(config, main_uuid);
|
||||
}
|
||||
|
||||
bool ReplicationHandler::TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
|
||||
const std::optional<utils::UUID> &main_uuid) {
|
||||
return SetReplicationRoleReplica_<true>(config, main_uuid);
|
||||
return SetReplicationRoleReplica_<false>(config, main_uuid);
|
||||
}
|
||||
|
||||
bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) {
|
||||
@ -226,16 +226,14 @@ bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid)
|
||||
};
|
||||
|
||||
// as MAIN, define and connect to REPLICAs
|
||||
auto ReplicationHandler::TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config,
|
||||
bool send_swap_uuid)
|
||||
auto ReplicationHandler::TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> {
|
||||
return RegisterReplica_<true>(config, send_swap_uuid);
|
||||
return RegisterReplica_<true>(config);
|
||||
}
|
||||
|
||||
auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config,
|
||||
bool send_swap_uuid)
|
||||
auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config)
|
||||
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> {
|
||||
return RegisterReplica_<false>(config, send_swap_uuid);
|
||||
return RegisterReplica_<false>(config);
|
||||
}
|
||||
|
||||
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::query::UnregisterReplicaResult {
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
|
@ -14,14 +14,13 @@
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
|
||||
#include <range/v3/view.hpp>
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
template <class F, class T, class R = typename std::result_of<F(T)>::type, class V = std::vector<R>>
|
||||
V fmap(F &&f, const std::vector<T> &v) {
|
||||
V r;
|
||||
r.reserve(v.size());
|
||||
std::ranges::transform(v, std::back_inserter(r), std::forward<F>(f));
|
||||
return r;
|
||||
template <class F, class T, class R = typename std::invoke_result<F, T>::type>
|
||||
auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
|
||||
return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
|
||||
}
|
||||
|
||||
} // namespace memgraph::utils
|
||||
|
@ -56,7 +56,6 @@ add_subdirectory(python_query_modules_reloading)
|
||||
add_subdirectory(analyze_graph)
|
||||
add_subdirectory(transaction_queue)
|
||||
add_subdirectory(mock_api)
|
||||
#add_subdirectory(graphql)
|
||||
add_subdirectory(disk_storage)
|
||||
add_subdirectory(load_csv)
|
||||
add_subdirectory(init_file_flags)
|
||||
@ -77,10 +76,7 @@ add_subdirectory(query_modules_storage_modes)
|
||||
add_subdirectory(garbage_collection)
|
||||
add_subdirectory(query_planning)
|
||||
add_subdirectory(awesome_functions)
|
||||
|
||||
if (MG_EXPERIMENTAL_HIGH_AVAILABILITY)
|
||||
add_subdirectory(high_availability_experimental)
|
||||
endif ()
|
||||
add_subdirectory(high_availability)
|
||||
|
||||
add_subdirectory(replication_experimental)
|
||||
|
||||
|
@ -228,6 +228,6 @@ startup_config_dict = {
|
||||
"experimental_enabled": (
|
||||
"",
|
||||
"",
|
||||
"Experimental features to be used, comma seperated. Options [system-replication]",
|
||||
"Experimental features to be used, comma seperated. Options [system-replication, high-availability]",
|
||||
),
|
||||
}
|
||||
|
15
tests/e2e/high_availability/CMakeLists.txt
Normal file
15
tests/e2e/high_availability/CMakeLists.txt
Normal file
@ -0,0 +1,15 @@
|
||||
find_package(gflags REQUIRED)
|
||||
|
||||
copy_e2e_python_files(high_availability coordinator.py)
|
||||
copy_e2e_python_files(high_availability single_coordinator.py)
|
||||
copy_e2e_python_files(high_availability coord_cluster_registration.py)
|
||||
copy_e2e_python_files(high_availability distributed_coords.py)
|
||||
copy_e2e_python_files(high_availability disable_writing_on_main_after_restart.py)
|
||||
copy_e2e_python_files(high_availability manual_setting_replicas.py)
|
||||
copy_e2e_python_files(high_availability not_replicate_from_old_main.py)
|
||||
copy_e2e_python_files(high_availability common.py)
|
||||
copy_e2e_python_files(high_availability workloads.yaml)
|
||||
|
||||
copy_e2e_python_files_from_parent_folder(high_availability ".." memgraph.py)
|
||||
copy_e2e_python_files_from_parent_folder(high_availability ".." interactive_mg_runner.py)
|
||||
copy_e2e_python_files_from_parent_folder(high_availability ".." mg_utils.py)
|
@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"instance_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
@ -44,6 +45,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
@ -57,6 +59,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
@ -70,6 +73,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
@ -81,6 +85,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7691",
|
||||
"--log-level=TRACE",
|
||||
@ -92,6 +97,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7692",
|
||||
"--log-level=TRACE",
|
@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"instance_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
@ -49,6 +50,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
@ -67,6 +69,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
@ -85,6 +88,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
@ -96,6 +100,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7691",
|
||||
"--log-level=TRACE",
|
||||
@ -107,6 +112,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7692",
|
||||
"--log-level=TRACE",
|
@ -31,6 +31,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"instance_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
@ -44,6 +45,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
@ -57,6 +59,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
@ -70,6 +73,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
@ -81,6 +85,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7691",
|
||||
"--log-level=TRACE",
|
||||
@ -92,6 +97,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"coordinator_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7692",
|
||||
"--log-level=TRACE",
|
@ -14,8 +14,7 @@ import sys
|
||||
|
||||
import interactive_mg_runner
|
||||
import pytest
|
||||
from common import execute_and_fetch_all
|
||||
from mg_utils import mg_sleep_and_assert
|
||||
from common import connect, execute_and_fetch_all
|
||||
|
||||
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
|
||||
@ -26,20 +25,28 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv
|
||||
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"instance_3": {
|
||||
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
"TRACE",
|
||||
"--coordinator-server-port",
|
||||
"10013",
|
||||
],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": [],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_no_manual_setup_on_main(connection):
|
||||
def test_no_manual_setup_on_main():
|
||||
# Goal of this test is to check that all manual registration actions are disabled on instances with coordiantor server port
|
||||
|
||||
# 1
|
||||
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||||
|
||||
any_main = connection(7687, "instance_3").cursor()
|
||||
any_main = connect(host="localhost", port=7687).cursor()
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(any_main, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';")
|
||||
assert str(e.value) == "Can't register replica manually on instance with coordinator server port."
|
@ -16,7 +16,7 @@ import tempfile
|
||||
|
||||
import interactive_mg_runner
|
||||
import pytest
|
||||
from common import execute_and_fetch_all, safe_execute
|
||||
from common import connect, execute_and_fetch_all
|
||||
from mg_utils import mg_sleep_and_assert
|
||||
|
||||
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
@ -28,12 +28,12 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv
|
||||
|
||||
MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = {
|
||||
"shared_replica": {
|
||||
"args": ["--bolt-port", "7688", "--log-level", "TRACE"],
|
||||
"args": ["--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level", "TRACE"],
|
||||
"log_file": "replica2.log",
|
||||
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
|
||||
},
|
||||
"main1": {
|
||||
"args": ["--bolt-port", "7687", "--log-level", "TRACE"],
|
||||
"args": ["--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level", "TRACE"],
|
||||
"log_file": "main.log",
|
||||
"setup_queries": ["REGISTER REPLICA shared_replica SYNC TO '127.0.0.1:10001' ;"],
|
||||
},
|
||||
@ -42,12 +42,12 @@ MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = {
|
||||
|
||||
MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = {
|
||||
"replica": {
|
||||
"args": ["--bolt-port", "7689", "--log-level", "TRACE"],
|
||||
"args": ["--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level", "TRACE"],
|
||||
"log_file": "replica.log",
|
||||
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
|
||||
},
|
||||
"main_2": {
|
||||
"args": ["--bolt-port", "7690", "--log-level", "TRACE"],
|
||||
"args": ["--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level", "TRACE"],
|
||||
"log_file": "main_2.log",
|
||||
"setup_queries": [
|
||||
"REGISTER REPLICA shared_replica SYNC TO '127.0.0.1:10001' ;",
|
||||
@ -57,7 +57,7 @@ MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = {
|
||||
}
|
||||
|
||||
|
||||
def test_replication_works_on_failover(connection):
|
||||
def test_replication_works_on_failover():
|
||||
# Goal of this test is to check that after changing `shared_replica`
|
||||
# to be part of new cluster, `main` (old cluster) can't write any more to it
|
||||
|
||||
@ -65,7 +65,7 @@ def test_replication_works_on_failover(connection):
|
||||
interactive_mg_runner.start_all_keep_others(MEMGRAPH_FIRST_CLUSTER_DESCRIPTION)
|
||||
|
||||
# 2
|
||||
main_cursor = connection(7687, "main1").cursor()
|
||||
main_cursor = connect(host="localhost", port=7687).cursor()
|
||||
expected_data_on_main = [
|
||||
("shared_replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
|
||||
]
|
||||
@ -76,7 +76,7 @@ def test_replication_works_on_failover(connection):
|
||||
interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION)
|
||||
|
||||
# 4
|
||||
new_main_cursor = connection(7690, "main_2").cursor()
|
||||
new_main_cursor = connect(host="localhost", port=7690).cursor()
|
||||
|
||||
def retrieve_data_show_replicas():
|
||||
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
|
||||
@ -88,14 +88,11 @@ def test_replication_works_on_failover(connection):
|
||||
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
|
||||
|
||||
# 5
|
||||
shared_replica_cursor = connection(7688, "shared_replica").cursor()
|
||||
shared_replica_cursor = connect(host="localhost", port=7688).cursor()
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(main_cursor, "CREATE ();")
|
||||
assert (
|
||||
str(e.value)
|
||||
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
|
||||
)
|
||||
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
|
||||
|
||||
res = execute_and_fetch_all(main_cursor, "MATCH (n) RETURN count(n) as count;")[0][0]
|
||||
assert res == 1, "Vertex should be created"
|
||||
@ -115,7 +112,7 @@ def test_replication_works_on_failover(connection):
|
||||
interactive_mg_runner.stop_all()
|
||||
|
||||
|
||||
def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
def test_not_replicate_old_main_register_new_cluster():
|
||||
# Goal of this test is to check that although replica is registered in one cluster
|
||||
# it can be re-registered to new cluster
|
||||
# This flow checks if Registering replica is idempotent and that old main cannot talk to replica
|
||||
@ -130,6 +127,7 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION = {
|
||||
"shared_instance": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
@ -143,6 +141,7 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
@ -155,7 +154,14 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_1": {
|
||||
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"],
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=1",
|
||||
"--raft-server-port=10111",
|
||||
],
|
||||
"log_file": "coordinator.log",
|
||||
"setup_queries": [
|
||||
"REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
|
||||
@ -170,7 +176,7 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
|
||||
# 2
|
||||
|
||||
first_cluster_coord_cursor = connection(7690, "coord_1").cursor()
|
||||
first_cluster_coord_cursor = connect(host="localhost", port=7690).cursor()
|
||||
|
||||
def show_repl_cluster():
|
||||
return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;")))
|
||||
@ -188,6 +194,7 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION = {
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
@ -200,14 +207,21 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_2": {
|
||||
"args": ["--bolt-port", "7691", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10112"],
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7691",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=1",
|
||||
"--raft-server-port=10112",
|
||||
],
|
||||
"log_file": "coordinator.log",
|
||||
"setup_queries": [],
|
||||
},
|
||||
}
|
||||
|
||||
interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION)
|
||||
second_cluster_coord_cursor = connection(7691, "coord_2").cursor()
|
||||
second_cluster_coord_cursor = connect(host="localhost", port=7691).cursor()
|
||||
execute_and_fetch_all(
|
||||
second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';"
|
||||
)
|
||||
@ -230,24 +244,21 @@ def test_not_replicate_old_main_register_new_cluster(connection):
|
||||
mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster)
|
||||
|
||||
# 5
|
||||
main_1_cursor = connection(7689, "main_1").cursor()
|
||||
main_1_cursor = connect(host="localhost", port=7689).cursor()
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(main_1_cursor, "CREATE ();")
|
||||
assert (
|
||||
str(e.value)
|
||||
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
|
||||
)
|
||||
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
|
||||
|
||||
shared_replica_cursor = connection(7688, "shared_replica").cursor()
|
||||
shared_replica_cursor = connect(host="localhost", port=7688).cursor()
|
||||
res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
|
||||
assert res == 0, "Old main should not replicate to 'shared' replica"
|
||||
|
||||
# 6
|
||||
main_2_cursor = connection(7687, "main_2").cursor()
|
||||
main_2_cursor = connect(host="localhost", port=7687).cursor()
|
||||
|
||||
execute_and_fetch_all(main_2_cursor, "CREATE ();")
|
||||
|
||||
shared_replica_cursor = connection(7688, "shared_replica").cursor()
|
||||
shared_replica_cursor = connect(host="localhost", port=7688).cursor()
|
||||
res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
|
||||
assert res == 1, "New main should replicate to 'shared' replica"
|
||||
|
@ -30,6 +30,7 @@ TEMP_DIR = tempfile.TemporaryDirectory().name
|
||||
MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"instance_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
@ -43,6 +44,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
@ -56,6 +58,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
},
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
@ -68,7 +71,14 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator": {
|
||||
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"],
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=1",
|
||||
"--raft-server-port=10111",
|
||||
],
|
||||
"log_file": "coordinator.log",
|
||||
"setup_queries": [
|
||||
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
|
||||
@ -198,10 +208,7 @@ def test_replication_works_on_replica_instance_restart():
|
||||
instance_1_cursor = connect(host="localhost", port=7688).cursor()
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(main_cursor, "CREATE ();")
|
||||
assert (
|
||||
str(e.value)
|
||||
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
|
||||
)
|
||||
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
|
||||
|
||||
res_instance_1 = execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
|
||||
assert res_instance_1 == 1
|
@ -1,19 +1,19 @@
|
||||
ha_cluster: &ha_cluster
|
||||
cluster:
|
||||
replica_1:
|
||||
args: ["--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"]
|
||||
args: ["--experimental-enabled=high-availability", "--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"]
|
||||
log_file: "replication-e2e-replica1.log"
|
||||
setup_queries: []
|
||||
replica_2:
|
||||
args: ["--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"]
|
||||
args: ["--experimental-enabled=high-availability", "--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"]
|
||||
log_file: "replication-e2e-replica2.log"
|
||||
setup_queries: []
|
||||
main:
|
||||
args: ["--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"]
|
||||
args: ["--experimental-enabled=high-availability", "--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"]
|
||||
log_file: "replication-e2e-main.log"
|
||||
setup_queries: []
|
||||
coordinator:
|
||||
args: ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"]
|
||||
args: ["--experimental-enabled=high-availability", "--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"]
|
||||
log_file: "replication-e2e-coordinator.log"
|
||||
setup_queries: [
|
||||
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
|
||||
@ -25,29 +25,29 @@ ha_cluster: &ha_cluster
|
||||
workloads:
|
||||
- name: "Coordinator"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/coordinator.py"]
|
||||
args: ["high_availability/coordinator.py"]
|
||||
<<: *ha_cluster
|
||||
|
||||
- name: "Single coordinator"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/single_coordinator.py"]
|
||||
args: ["high_availability/single_coordinator.py"]
|
||||
|
||||
- name: "Disabled manual setting of replication cluster"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/manual_setting_replicas.py"]
|
||||
args: ["high_availability/manual_setting_replicas.py"]
|
||||
|
||||
- name: "Coordinator cluster registration"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/coord_cluster_registration.py"]
|
||||
args: ["high_availability/coord_cluster_registration.py"]
|
||||
|
||||
- name: "Not replicate from old main"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/not_replicate_from_old_main.py"]
|
||||
args: ["high_availability/not_replicate_from_old_main.py"]
|
||||
|
||||
- name: "Disable writing on main after restart"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/disable_writing_on_main_after_restart.py"]
|
||||
args: ["high_availability/disable_writing_on_main_after_restart.py"]
|
||||
|
||||
- name: "Distributed coordinators"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["high_availability_experimental/distributed_coords.py"]
|
||||
args: ["high_availability/distributed_coords.py"]
|
@ -1,15 +0,0 @@
|
||||
find_package(gflags REQUIRED)
|
||||
|
||||
copy_e2e_python_files(ha_experimental coordinator.py)
|
||||
copy_e2e_python_files(ha_experimental single_coordinator.py)
|
||||
copy_e2e_python_files(ha_experimental coord_cluster_registration.py)
|
||||
copy_e2e_python_files(ha_experimental distributed_coords.py)
|
||||
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
|
||||
copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py)
|
||||
copy_e2e_python_files(ha_experimental disable_writing_on_main_after_restart.py)
|
||||
copy_e2e_python_files(ha_experimental common.py)
|
||||
copy_e2e_python_files(ha_experimental workloads.yaml)
|
||||
|
||||
copy_e2e_python_files_from_parent_folder(ha_experimental ".." memgraph.py)
|
||||
copy_e2e_python_files_from_parent_folder(ha_experimental ".." interactive_mg_runner.py)
|
||||
copy_e2e_python_files_from_parent_folder(ha_experimental ".." mg_utils.py)
|
@ -160,6 +160,12 @@ def kill(context, name, keep_directories=True):
|
||||
MEMGRAPH_INSTANCES.pop(name)
|
||||
|
||||
|
||||
def kill_all(context, keep_directories=True):
|
||||
for key in MEMGRAPH_INSTANCES.keys():
|
||||
MEMGRAPH_INSTANCES[key].kill(keep_directories)
|
||||
MEMGRAPH_INSTANCES.clear()
|
||||
|
||||
|
||||
def cleanup_directories_on_exit(value=True):
|
||||
CLEANUP_DIRECTORIES_ON_EXIT = value
|
||||
|
||||
|
@ -795,7 +795,7 @@ def test_async_replication_when_main_is_killed():
|
||||
"data_directory": f"{data_directory_main.name}",
|
||||
},
|
||||
}
|
||||
|
||||
interactive_mg_runner.kill_all(CONFIGURATION)
|
||||
interactive_mg_runner.start_all(CONFIGURATION)
|
||||
|
||||
# 1/
|
||||
@ -878,7 +878,7 @@ def test_sync_replication_when_main_is_killed():
|
||||
"data_directory": f"{data_directory_main.name}",
|
||||
},
|
||||
}
|
||||
|
||||
interactive_mg_runner.kill_all(CONFIGURATION)
|
||||
interactive_mg_runner.start_all(CONFIGURATION)
|
||||
|
||||
# 1/
|
||||
@ -1990,5 +1990,4 @@ def test_replication_not_messed_up_by_ShowIndexInfo(connection):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-k", "test_basic_recovery"]))
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
||||
|
@ -149,14 +149,12 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
|
||||
},
|
||||
std::nullopt);
|
||||
|
||||
const auto ® = main.repl_handler.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = "REPLICA",
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true);
|
||||
const auto ® = main.repl_handler.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = "REPLICA",
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
});
|
||||
ASSERT_FALSE(reg.HasError()) << (int)reg.GetError();
|
||||
|
||||
// vertex create
|
||||
@ -453,24 +451,20 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
|
||||
std::nullopt);
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
})
|
||||
.HasError());
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
})
|
||||
.HasError());
|
||||
|
||||
const auto *vertex_label = "label";
|
||||
@ -604,14 +598,12 @@ TEST_F(ReplicationTest, RecoveryProcess) {
|
||||
},
|
||||
std::nullopt);
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_EQ(main.db.storage()->GetReplicaState(replicas[0]), ReplicaState::RECOVERY);
|
||||
@ -684,14 +676,12 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
|
||||
std::nullopt);
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = "REPLICA_ASYNC",
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = "REPLICA_ASYNC",
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
})
|
||||
.HasError());
|
||||
|
||||
static constexpr size_t vertices_create_num = 10;
|
||||
@ -742,25 +732,21 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
std::nullopt);
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = 10001,
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = 10001,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
std::optional<Gid> vertex_gid;
|
||||
@ -789,15 +775,12 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
ASSERT_TRUE(replica1.repl_handler.SetReplicationRoleMain());
|
||||
|
||||
ASSERT_FALSE(replica1.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = 10001,
|
||||
},
|
||||
true)
|
||||
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = 10001,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
{
|
||||
@ -826,15 +809,12 @@ TEST_F(ReplicationTest, EpochTest) {
|
||||
},
|
||||
std::nullopt);
|
||||
ASSERT_TRUE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true)
|
||||
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
})
|
||||
.HasError());
|
||||
|
||||
{
|
||||
@ -875,27 +855,21 @@ TEST_F(ReplicationTest, ReplicationInformation) {
|
||||
std::nullopt);
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
},
|
||||
true)
|
||||
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
},
|
||||
true)
|
||||
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_TRUE(main.repl_state.IsMain());
|
||||
@ -939,25 +913,21 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
|
||||
},
|
||||
std::nullopt);
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica1_port,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_TRUE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = replica2_port,
|
||||
})
|
||||
.GetError() == RegisterReplicaError::NAME_EXISTS);
|
||||
}
|
||||
|
||||
@ -982,25 +952,21 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
|
||||
std::nullopt);
|
||||
|
||||
ASSERT_FALSE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
})
|
||||
.HasError());
|
||||
|
||||
ASSERT_TRUE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::ASYNC,
|
||||
.ip_address = local_host,
|
||||
.port = common_port,
|
||||
})
|
||||
.GetError() == RegisterReplicaError::ENDPOINT_EXISTS);
|
||||
}
|
||||
|
||||
@ -1038,23 +1004,19 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
|
||||
},
|
||||
std::nullopt);
|
||||
|
||||
auto res = main->repl_handler.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true);
|
||||
auto res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
});
|
||||
ASSERT_FALSE(res.HasError()) << (int)res.GetError();
|
||||
res = main->repl_handler.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
},
|
||||
true);
|
||||
res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
});
|
||||
ASSERT_FALSE(res.HasError()) << (int)res.GetError();
|
||||
|
||||
auto replica_infos = main->db.storage()->ReplicasInfo();
|
||||
@ -1103,23 +1065,19 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
|
||||
.port = ports[1],
|
||||
},
|
||||
std::nullopt);
|
||||
auto res = main->repl_handler.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true);
|
||||
auto res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[0],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
});
|
||||
ASSERT_FALSE(res.HasError());
|
||||
res = main->repl_handler.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
},
|
||||
true);
|
||||
res = main->repl_handler.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = replicas[1],
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[1],
|
||||
});
|
||||
ASSERT_FALSE(res.HasError());
|
||||
|
||||
auto replica_infos = main->db.storage()->ReplicasInfo();
|
||||
@ -1157,13 +1115,11 @@ TEST_F(ReplicationTest, AddingInvalidReplica) {
|
||||
MinMemgraph main(main_conf);
|
||||
|
||||
ASSERT_TRUE(main.repl_handler
|
||||
.TryRegisterReplica(
|
||||
ReplicationClientConfig{
|
||||
.name = "REPLICA",
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
},
|
||||
true)
|
||||
.TryRegisterReplica(ReplicationClientConfig{
|
||||
.name = "REPLICA",
|
||||
.mode = ReplicationMode::SYNC,
|
||||
.ip_address = local_host,
|
||||
.port = ports[0],
|
||||
})
|
||||
.GetError() == RegisterReplicaError::ERROR_ACCEPTING_MAIN);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user