diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 6f9609aaa..8d494dfb7 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -140,5 +140,19 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_nam return false; } +auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool { + try { + auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()}; + if (!stream.AwaitResponse().success) { + spdlog::error("Failed to receive successful RPC response for enabling writing on main!"); + return false; + } + return true; + } catch (rpc::RpcFailedException const &) { + spdlog::error("Failed to enable writing on main!"); + } + return false; +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index a82fbea1f..5153eeb4d 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -45,6 +45,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se spdlog::info("Received UnregisterReplicaRpc on coordinator server"); CoordinatorHandlers::UnregisterReplicaHandler(replication_handler, req_reader, res_builder); }); + + server.Register<coordination::EnableWritingOnMainRpc>( + [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received EnableWritingOnMainRpc on coordinator server"); + CoordinatorHandlers::EnableWritingOnMainHandler(replication_handler, req_reader, res_builder); + }); } void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, @@ -148,7 +154,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa } } } - spdlog::error(fmt::format("FICO : Promote replica to main was success {}", std::string(req.main_uuid_))); + spdlog::info("Promote replica to main was success {}", std::string(req.main_uuid_)); slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder); } @@ -184,5 +190,22 @@ void CoordinatorHandlers::UnregisterReplicaHandler(replication::ReplicationHandl } } +void CoordinatorHandlers::EnableWritingOnMainHandler(replication::ReplicationHandler &replication_handler, + slk::Reader * /*req_reader*/, slk::Builder *res_builder) { + if (!replication_handler.IsMain()) { + spdlog::error("Enable writing on main must be performed on main!"); + slk::Save(coordination::EnableWritingOnMainRes{false}, res_builder); + return; + } + + if (!replication_handler.GetReplState().EnableWritingOnMain()) { + spdlog::error("Enabling writing on main failed!"); + slk::Save(coordination::EnableWritingOnMainRes{false}, res_builder); + return; + } + + slk::Save(coordination::EnableWritingOnMainRes{true}, res_builder); +} + } // namespace memgraph::dbms #endif diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 070dcc49f..24bdf6df3 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -17,6 +17,7 @@ #include "nuraft/coordinator_state_machine.hpp" #include "nuraft/coordinator_state_manager.hpp" #include "utils/counter.hpp" +#include "utils/functional.hpp" #include <range/v3/view.hpp> #include <shared_mutex> @@ -87,6 +88,11 @@ CoordinatorInstance::CoordinatorInstance() auto const curr_main_uuid = self->GetMainUUID(); if (curr_main_uuid == repl_instance_uuid.value()) { + if (!repl_instance.EnableWritingOnMain()) { + spdlog::error("Failed to enable writing on main instance {}", repl_instance_name); + return; + } + repl_instance.OnSuccessPing(); return; } @@ -125,9 +131,6 @@ CoordinatorInstance::CoordinatorInstance() auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> { auto const coord_instances = raft_state_.GetAllCoordinators(); - std::vector<InstanceStatus> instances_status; - instances_status.reserve(repl_instances_.size() + coord_instances.size()); - auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string { if (!instance.IsAlive()) return "unknown"; if (instance.IsMain()) return "main"; @@ -149,8 +152,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> { // CoordinatorState to every instance, we can be smarter about this using our RPC. }; - std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status); - + auto instances_status = utils::fmap(coord_instance_to_status, coord_instances); { auto lock = std::shared_lock{coord_instance_lock_}; std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status); @@ -189,6 +191,7 @@ auto CoordinatorInstance::TryFailover() -> void { } } + // TODO: (andi) fmap compliant ReplicationClientsInfo repl_clients_info; repl_clients_info.reserve(repl_instances_.size() - 1); std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main), diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index 884eabfe2..7291c6fb0 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -68,6 +68,18 @@ void UnregisterReplicaRes::Load(UnregisterReplicaRes *self, memgraph::slk::Reade memgraph::slk::Load(self, reader); } +void EnableWritingOnMainRes::Save(EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {} + +void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -89,6 +101,12 @@ constexpr utils::TypeInfo coordination::UnregisterReplicaReq::kType{utils::TypeI constexpr utils::TypeInfo coordination::UnregisterReplicaRes::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_RES, "UnregisterReplicaRes", nullptr}; +constexpr utils::TypeInfo coordination::EnableWritingOnMainReq::kType{utils::TypeId::COORD_ENABLE_WRITING_ON_MAIN_REQ, + "CoordEnableWritingOnMainReq", nullptr}; + +constexpr utils::TypeInfo coordination::EnableWritingOnMainRes::kType{utils::TypeId::COORD_ENABLE_WRITING_ON_MAIN_RES, + "CoordEnableWritingOnMainRes", nullptr}; + namespace slk { void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) { @@ -141,6 +159,14 @@ void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Rea memgraph::slk::Load(&self->success, reader); } +void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.success, builder); +} + +void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->success, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index bc2bc99c9..cb6167540 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -11,12 +11,12 @@ #pragma once -#include "utils/uuid.hpp" #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" #include "rpc/client.hpp" #include "utils/scheduler.hpp" +#include "utils/uuid.hpp" namespace memgraph::coordination { @@ -54,6 +54,8 @@ class CoordinatorClient { auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool; + auto SendEnableWritingOnMainRpc() const -> bool; + auto ReplicationClientInfo() const -> ReplClientInfo; auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index b3c5c1580..ac7caba2e 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -36,6 +36,8 @@ class CoordinatorHandlers { static void UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); + static void EnableWritingOnMainHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index a52597dd8..137c95e73 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -111,6 +111,31 @@ struct UnregisterReplicaRes { using UnregisterReplicaRpc = rpc::RequestResponse<UnregisterReplicaReq, UnregisterReplicaRes>; +struct EnableWritingOnMainReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader); + static void Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder); + + EnableWritingOnMainReq() = default; +}; + +struct EnableWritingOnMainRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(EnableWritingOnMainRes *self, memgraph::slk::Reader *reader); + static void Save(EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder); + + explicit EnableWritingOnMainRes(bool success) : success(success) {} + EnableWritingOnMainRes() = default; + + bool success; +}; + +using EnableWritingOnMainRpc = rpc::RequestResponse<EnableWritingOnMainReq, EnableWritingOnMainRes>; + } // namespace memgraph::coordination // SLK serialization declarations @@ -134,6 +159,9 @@ void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Rea void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader); +void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader); + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 6b53197a0..b6ef06008 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -15,8 +15,6 @@ #include <flags/replication.hpp> -#include <optional> - #include <libnuraft/nuraft.hxx> namespace memgraph::coordination { diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 1a0b66402..573b96be6 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -65,6 +65,8 @@ class ReplicationInstance { // TODO: (andi) Inconsistent API auto GetClient() -> CoordinatorClient &; + auto EnableWritingOnMain() -> bool; + auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; auto ResetMainUUID() -> void; auto GetMainUUID() const -> const std::optional<utils::UUID> &; diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 6cbd24985..a550bb844 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -109,5 +109,7 @@ auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_n return client_.SendUnregisterReplicaRpc(instance_name); } +auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); } + } // namespace memgraph::coordination #endif diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 53f702e08..36725cea3 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -93,6 +93,7 @@ #include "utils/exceptions.hpp" #include "utils/file.hpp" #include "utils/flag_validation.hpp" +#include "utils/functional.hpp" #include "utils/likely.hpp" #include "utils/logging.hpp" #include "utils/memory.hpp" @@ -1264,17 +1265,13 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param callback.fn = [handler = CoordQueryHandler{*coordinator_state}, replica_nfields = callback.header.size()]() mutable { auto const instances = handler.ShowInstances(); - std::vector<std::vector<TypedValue>> result{}; - result.reserve(result.size()); + auto const converter = [](const auto &status) -> std::vector<TypedValue> { + return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address}, + TypedValue{status.coord_socket_address}, TypedValue{status.is_alive}, + TypedValue{status.cluster_role}}; + }; - std::ranges::transform(instances, std::back_inserter(result), - [](const auto &status) -> std::vector<TypedValue> { - return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address}, - TypedValue{status.coord_socket_address}, TypedValue{status.is_alive}, - TypedValue{status.cluster_role}}; - }); - - return result; + return utils::fmap(converter, instances); }; return callback; } @@ -4404,9 +4401,19 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, UpdateTypeCount(rw_type); - if (interpreter_context_->repl_state->IsReplica() && IsQueryWrite(rw_type)) { - query_execution = nullptr; - throw QueryException("Write query forbidden on the replica!"); + bool const write_query = IsQueryWrite(rw_type); + if (write_query) { + if (interpreter_context_->repl_state->IsReplica()) { + query_execution = nullptr; + throw QueryException("Write query forbidden on the replica!"); + } +#ifdef MG_ENTERPRISE + if (FLAGS_coordinator_server_port && !interpreter_context_->repl_state->IsMainWriteable()) { + query_execution = nullptr; + throw QueryException( + "Write query forbidden on the main! Coordinator needs to enable writing on main by sending RPC message."); + } +#endif } // Set the target db to the current db (some queries have different target from the current db) diff --git a/src/replication/include/replication/state.hpp b/src/replication/include/replication/state.hpp index 18f9efd4e..fb47185fc 100644 --- a/src/replication/include/replication/state.hpp +++ b/src/replication/include/replication/state.hpp @@ -39,7 +39,8 @@ enum class RegisterReplicaError : uint8_t { NAME_EXISTS, ENDPOINT_EXISTS, COULD_ struct RoleMainData { RoleMainData() = default; - explicit RoleMainData(ReplicationEpoch e, std::optional<utils::UUID> uuid = std::nullopt) : epoch_(std::move(e)) { + explicit RoleMainData(ReplicationEpoch e, bool writing_enabled, std::optional<utils::UUID> uuid = std::nullopt) + : epoch_(std::move(e)), writing_enabled_(writing_enabled) { if (uuid) { uuid_ = *uuid; } @@ -54,6 +55,7 @@ struct RoleMainData { ReplicationEpoch epoch_; std::list<ReplicationClient> registered_replicas_{}; // TODO: data race issues utils::UUID uuid_; + bool writing_enabled_{false}; }; struct RoleReplicaData { @@ -90,6 +92,21 @@ struct ReplicationState { bool IsMain() const { return GetRole() == replication_coordination_glue::ReplicationRole::MAIN; } bool IsReplica() const { return GetRole() == replication_coordination_glue::ReplicationRole::REPLICA; } + auto IsMainWriteable() const -> bool { + if (auto const *main = std::get_if<RoleMainData>(&replication_data_)) { + return main->writing_enabled_; + } + return false; + } + + auto EnableWritingOnMain() -> bool { + if (auto *main = std::get_if<RoleMainData>(&replication_data_)) { + main->writing_enabled_ = true; + return true; + } + return false; + } + bool HasDurability() const { return nullptr != durability_; } bool TryPersistRoleMain(std::string new_epoch, utils::UUID main_uuid); diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 6b1d128ec..1a505f9a4 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -144,8 +144,8 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t { return std::visit( utils::Overloaded{ [&](durability::MainRole &&r) -> FetchReplicationResult_t { - auto res = - RoleMainData{std::move(r.epoch), r.main_uuid.has_value() ? r.main_uuid.value() : utils::UUID{}}; + auto res = RoleMainData{std::move(r.epoch), false, + r.main_uuid.has_value() ? r.main_uuid.value() : utils::UUID{}}; auto b = durability_->begin(durability::kReplicationReplicaPrefix); auto e = durability_->end(durability::kReplicationReplicaPrefix); for (; b != e; ++b) { @@ -253,7 +253,7 @@ bool ReplicationState::SetReplicationRoleMain(const utils::UUID &main_uuid) { return false; } - replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, main_uuid}; + replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, true, main_uuid}; return true; } diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp new file mode 100644 index 000000000..299e49612 --- /dev/null +++ b/src/utils/functional.hpp @@ -0,0 +1,27 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include <algorithm> +#include <vector> + +namespace memgraph::utils { + +template <class F, class T, class R = typename std::result_of<F(T)>::type, class V = std::vector<R>> +V fmap(F &&f, const std::vector<T> &v) { + V r; + r.reserve(v.size()); + std::ranges::transform(v, std::back_inserter(r), std::forward<F>(f)); + return r; +} + +} // namespace memgraph::utils diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 2fd36cf41..a4068b023 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -109,6 +109,8 @@ enum class TypeId : uint64_t { COORD_SWAP_UUID_RES, COORD_UNREGISTER_REPLICA_REQ, COORD_UNREGISTER_REPLICA_RES, + COORD_ENABLE_WRITING_ON_MAIN_REQ, + COORD_ENABLE_WRITING_ON_MAIN_RES, // AST AST_LABELIX = 3000, diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt index d97080585..bbef1ebc7 100644 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ b/tests/e2e/high_availability_experimental/CMakeLists.txt @@ -6,6 +6,7 @@ copy_e2e_python_files(ha_experimental coord_cluster_registration.py) copy_e2e_python_files(ha_experimental distributed_coords.py) copy_e2e_python_files(ha_experimental manual_setting_replicas.py) copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py) +copy_e2e_python_files(ha_experimental disable_writing_on_main_after_restart.py) copy_e2e_python_files(ha_experimental common.py) copy_e2e_python_files(ha_experimental workloads.yaml) diff --git a/tests/e2e/high_availability_experimental/common.py b/tests/e2e/high_availability_experimental/common.py index adfabd87a..2157b29ca 100644 --- a/tests/e2e/high_availability_experimental/common.py +++ b/tests/e2e/high_availability_experimental/common.py @@ -30,3 +30,14 @@ def safe_execute(function, *args): function(*args) except: pass + + +# NOTE: Repeated execution because it can fail if Raft server is not up +def add_coordinator(cursor, query): + for _ in range(10): + try: + execute_and_fetch_all(cursor, query) + return True + except Exception: + pass + return False diff --git a/tests/e2e/high_availability_experimental/coord_cluster_registration.py b/tests/e2e/high_availability_experimental/coord_cluster_registration.py index 819b3ab2a..cccaec5ad 100644 --- a/tests/e2e/high_availability_experimental/coord_cluster_registration.py +++ b/tests/e2e/high_availability_experimental/coord_cluster_registration.py @@ -16,7 +16,7 @@ import tempfile import interactive_mg_runner import pytest -from common import connect, execute_and_fetch_all, safe_execute +from common import add_coordinator, connect, execute_and_fetch_all, safe_execute from mg_utils import mg_sleep_and_assert interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -104,17 +104,6 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { } -# NOTE: Repeated execution because it can fail if Raft server is not up -def add_coordinator(cursor, query): - for _ in range(10): - try: - execute_and_fetch_all(cursor, query) - return True - except Exception: - pass - return False - - def test_register_repl_instances_then_coordinators(): safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) diff --git a/tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py b/tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py new file mode 100644 index 000000000..ad3d95828 --- /dev/null +++ b/tests/e2e/high_availability_experimental/disable_writing_on_main_after_restart.py @@ -0,0 +1,181 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import os +import shutil +import sys +import tempfile + +import interactive_mg_runner +import pytest +from common import add_coordinator, connect, execute_and_fetch_all, safe_execute +from mg_utils import mg_sleep_and_assert + +interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +interactive_mg_runner.PROJECT_DIR = os.path.normpath( + os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") +) +interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) +interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) + +TEMP_DIR = tempfile.TemporaryDirectory().name + +MEMGRAPH_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--also-log-to-stderr", + "--instance-health-check-frequency-sec", + "1", + "--instance-down-timeout-sec", + "5", + ], + "log_file": "instance_1.log", + "data_directory": f"{TEMP_DIR}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--also-log-to-stderr", + "--instance-health-check-frequency-sec", + "1", + "--instance-down-timeout-sec", + "5", + ], + "log_file": "instance_2.log", + "data_directory": f"{TEMP_DIR}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--also-log-to-stderr", + "--instance-health-check-frequency-sec", + "5", + "--instance-down-timeout-sec", + "10", + ], + "log_file": "instance_3.log", + "data_directory": f"{TEMP_DIR}/instance_3", + "setup_queries": [], + }, + "coordinator_1": { + "args": [ + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator1.log", + "setup_queries": [], + }, + "coordinator_2": { + "args": [ + "--bolt-port", + "7691", + "--log-level=TRACE", + "--raft-server-id=2", + "--raft-server-port=10112", + ], + "log_file": "coordinator2.log", + "setup_queries": [], + }, + "coordinator_3": { + "args": [ + "--bolt-port", + "7692", + "--log-level=TRACE", + "--raft-server-id=3", + "--raft-server-port=10113", + "--also-log-to-stderr", + ], + "log_file": "coordinator3.log", + "setup_queries": [], + }, +} + + +def test_writing_disabled_on_main_restart(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'") + assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'") + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + expected_cluster_coord3 = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + expected_cluster_coord3 = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ] + + mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + try: + instance3_cursor = connect(host="localhost", port=7689).cursor() + execute_and_fetch_all(instance3_cursor, "CREATE (n:Node {name: 'node'})") + except Exception as e: + assert ( + str(e) + == "Write query forbidden on the main! Coordinator needs to enable writing on main by sending RPC message." + ) + + expected_cluster_coord3 = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + + mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3) + execute_and_fetch_all(instance3_cursor, "CREATE (n:Node {name: 'node'})") + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/workloads.yaml b/tests/e2e/high_availability_experimental/workloads.yaml index e624d35c0..2159c374c 100644 --- a/tests/e2e/high_availability_experimental/workloads.yaml +++ b/tests/e2e/high_availability_experimental/workloads.yaml @@ -44,6 +44,10 @@ workloads: binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/not_replicate_from_old_main.py"] + - name: "Disable writing on main after restart" + binary: "tests/e2e/pytest_runner.sh" + args: ["high_availability_experimental/disable_writing_on_main_after_restart.py"] + - name: "Distributed coordinators" binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/distributed_coords.py"]