Forbid writing to cluster-managed main on restart (#1717)
This commit is contained in:
parent fb281459b9
commit 20b47845f0
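For orientation before the hunks: a minimal, self-contained sketch (plain standard C++, not Memgraph code; all type names below are stand-ins) of the behaviour this commit introduces: a cluster-managed main restarts with writing disabled, rejects write queries until the coordinator re-enables writing, and the coordinator does so only for the instance whose UUID matches the known main UUID.

// Simplified stand-in types; the real commit spreads this logic across ReplicationState,
// CoordinatorClient, CoordinatorInstance and Interpreter::Prepare.
#include <cstdio>
#include <stdexcept>
#include <string>

struct MainInstance {
  std::string uuid;
  bool writing_enabled{false};  // false right after a restart, mirroring RoleMainData::writing_enabled_

  void ExecuteWrite(const std::string &query) {
    if (!writing_enabled) {
      throw std::runtime_error(
          "Write query forbidden on the main! Coordinator needs to enable writing on main by sending RPC message.");
    }
    std::printf("executed: %s\n", query.c_str());
  }

  // Stand-in for the EnableWritingOnMainRpc handler on the instance side.
  bool EnableWritingOnMain() {
    writing_enabled = true;
    return true;
  }
};

struct Coordinator {
  std::string current_main_uuid;

  // Stand-in for the coordinator's health-check callback: only the instance whose UUID
  // matches the known main UUID gets its writes re-enabled.
  void OnSuccessfulPing(MainInstance &instance) {
    if (instance.uuid == current_main_uuid) {
      instance.EnableWritingOnMain();
    }
  }
};

int main() {
  MainInstance main_instance{.uuid = "abc"};       // just restarted: writes blocked
  Coordinator coordinator{.current_main_uuid = "abc"};

  try {
    main_instance.ExecuteWrite("CREATE (n)");      // rejected
  } catch (const std::runtime_error &e) {
    std::printf("rejected: %s\n", e.what());
  }

  coordinator.OnSuccessfulPing(main_instance);     // coordinator re-enables writing
  main_instance.ExecuteWrite("CREATE (n)");        // now succeeds
}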
@@ -140,5 +140,19 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_nam
    return false;
  }

auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
  try {
    auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()};
    if (!stream.AwaitResponse().success) {
      spdlog::error("Failed to receive successful RPC response for enabling writing on main!");
      return false;
    }
    return true;
  } catch (rpc::RpcFailedException const &) {
    spdlog::error("Failed to enable writing on main!");
  }
  return false;
}

}  // namespace memgraph::coordination
#endif
@@ -45,6 +45,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
        spdlog::info("Received UnregisterReplicaRpc on coordinator server");
        CoordinatorHandlers::UnregisterReplicaHandler(replication_handler, req_reader, res_builder);
      });

  server.Register<coordination::EnableWritingOnMainRpc>(
      [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
        spdlog::info("Received EnableWritingOnMainRpc on coordinator server");
        CoordinatorHandlers::EnableWritingOnMainHandler(replication_handler, req_reader, res_builder);
      });
}

void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,

@@ -148,7 +154,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
      }
    }
  }
  spdlog::error(fmt::format("FICO : Promote replica to main was success {}", std::string(req.main_uuid_)));
  spdlog::info("Promote replica to main was success {}", std::string(req.main_uuid_));
  slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
}
@@ -184,5 +190,22 @@ void CoordinatorHandlers::UnregisterReplicaHandler(replication::ReplicationHandl
  }
}

void CoordinatorHandlers::EnableWritingOnMainHandler(replication::ReplicationHandler &replication_handler,
                                                     slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
  if (!replication_handler.IsMain()) {
    spdlog::error("Enable writing on main must be performed on main!");
    slk::Save(coordination::EnableWritingOnMainRes{false}, res_builder);
    return;
  }

  if (!replication_handler.GetReplState().EnableWritingOnMain()) {
    spdlog::error("Enabling writing on main failed!");
    slk::Save(coordination::EnableWritingOnMainRes{false}, res_builder);
    return;
  }

  slk::Save(coordination::EnableWritingOnMainRes{true}, res_builder);
}

}  // namespace memgraph::dbms
#endif
@@ -17,6 +17,7 @@
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include "utils/functional.hpp"

#include <range/v3/view.hpp>
#include <shared_mutex>

@@ -87,6 +88,11 @@ CoordinatorInstance::CoordinatorInstance()
        auto const curr_main_uuid = self->GetMainUUID();
        if (curr_main_uuid == repl_instance_uuid.value()) {
          if (!repl_instance.EnableWritingOnMain()) {
            spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
            return;
          }

          repl_instance.OnSuccessPing();
          return;
        }

@@ -125,9 +131,6 @@ CoordinatorInstance::CoordinatorInstance()
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
  auto const coord_instances = raft_state_.GetAllCoordinators();

  std::vector<InstanceStatus> instances_status;
  instances_status.reserve(repl_instances_.size() + coord_instances.size());

  auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
    if (!instance.IsAlive()) return "unknown";
    if (instance.IsMain()) return "main";

@@ -149,8 +152,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
    // CoordinatorState to every instance, we can be smarter about this using our RPC.
  };

  std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);

  auto instances_status = utils::fmap(coord_instance_to_status, coord_instances);
  {
    auto lock = std::shared_lock{coord_instance_lock_};
    std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);

@@ -189,6 +191,7 @@ auto CoordinatorInstance::TryFailover() -> void {
    }
  }

  // TODO: (andi) fmap compliant
  ReplicationClientsInfo repl_clients_info;
  repl_clients_info.reserve(repl_instances_.size() - 1);
  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
@@ -68,6 +68,18 @@ void UnregisterReplicaRes::Load(UnregisterReplicaRes *self, memgraph::slk::Reade
  memgraph::slk::Load(self, reader);
}

void EnableWritingOnMainRes::Save(EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder) {
  memgraph::slk::Save(self, builder);
}

void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::Reader *reader) {
  memgraph::slk::Load(self, reader);
}

void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}

void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}

}  // namespace coordination

constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,

@@ -89,6 +101,12 @@ constexpr utils::TypeInfo coordination::UnregisterReplicaReq::kType{utils::TypeI
constexpr utils::TypeInfo coordination::UnregisterReplicaRes::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_RES,
                                                                    "UnregisterReplicaRes", nullptr};

constexpr utils::TypeInfo coordination::EnableWritingOnMainReq::kType{utils::TypeId::COORD_ENABLE_WRITING_ON_MAIN_REQ,
                                                                      "CoordEnableWritingOnMainReq", nullptr};

constexpr utils::TypeInfo coordination::EnableWritingOnMainRes::kType{utils::TypeId::COORD_ENABLE_WRITING_ON_MAIN_RES,
                                                                      "CoordEnableWritingOnMainRes", nullptr};

namespace slk {

void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) {

@@ -141,6 +159,14 @@ void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Rea
  memgraph::slk::Load(&self->success, reader);
}

void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder) {
  memgraph::slk::Save(self.success, builder);
}

void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader) {
  memgraph::slk::Load(&self->success, reader);
}

}  // namespace slk

}  // namespace memgraph
@@ -11,12 +11,12 @@

#pragma once

#include "utils/uuid.hpp"
#ifdef MG_ENTERPRISE

#include "coordination/coordinator_config.hpp"
#include "rpc/client.hpp"
#include "utils/scheduler.hpp"
#include "utils/uuid.hpp"

namespace memgraph::coordination {

@@ -54,6 +54,8 @@ class CoordinatorClient {

  auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool;

  auto SendEnableWritingOnMainRpc() const -> bool;

  auto ReplicationClientInfo() const -> ReplClientInfo;

  auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
@@ -36,6 +36,8 @@ class CoordinatorHandlers {

  static void UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
                                       slk::Builder *res_builder);
  static void EnableWritingOnMainHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
                                         slk::Builder *res_builder);
};

}  // namespace memgraph::dbms
@@ -111,6 +111,31 @@ struct UnregisterReplicaRes {

using UnregisterReplicaRpc = rpc::RequestResponse<UnregisterReplicaReq, UnregisterReplicaRes>;

struct EnableWritingOnMainReq {
  static const utils::TypeInfo kType;
  static const utils::TypeInfo &GetTypeInfo() { return kType; }

  static void Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader);
  static void Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder);

  EnableWritingOnMainReq() = default;
};

struct EnableWritingOnMainRes {
  static const utils::TypeInfo kType;
  static const utils::TypeInfo &GetTypeInfo() { return kType; }

  static void Load(EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
  static void Save(EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);

  explicit EnableWritingOnMainRes(bool success) : success(success) {}
  EnableWritingOnMainRes() = default;

  bool success;
};

using EnableWritingOnMainRpc = rpc::RequestResponse<EnableWritingOnMainReq, EnableWritingOnMainRes>;

}  // namespace memgraph::coordination

// SLK serialization declarations

@@ -134,6 +159,9 @@ void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Rea
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);

void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);

}  // namespace memgraph::slk

#endif
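A self-contained model (standard C++ only; every name suffixed with Model is a stand-in, not a Memgraph type) of the contract behind the request/response pair declared above: the request carries no payload, the response carries a single success flag, and the caller treats a transport failure the same as success == false, which is how CoordinatorClient::SendEnableWritingOnMainRpc handles rpc::RpcFailedException in the first hunk of this commit.

#include <cstdio>
#include <functional>
#include <optional>

struct EnableWritingOnMainReqModel {};                       // empty payload, like EnableWritingOnMainReq
struct EnableWritingOnMainResModel { bool success; };        // single flag, like EnableWritingOnMainRes

// The "server" side: refuses unless the instance currently acts as MAIN.
EnableWritingOnMainResModel HandleEnableWriting(bool is_main, bool &writing_enabled) {
  if (!is_main) return {false};
  writing_enabled = true;
  return {true};
}

// The "client" side: collapses transport failure and a negative response into false.
// std::nullopt stands in for a thrown RpcFailedException.
bool SendEnableWritingOnMain(std::function<std::optional<EnableWritingOnMainResModel>()> rpc) {
  auto const response = rpc();
  return response && response->success;
}

int main() {
  bool writing_enabled = false;
  bool const ok = SendEnableWritingOnMain(
      [&] { return HandleEnableWriting(/*is_main=*/true, writing_enabled); });
  std::printf("enabled: %s, flag: %s\n", ok ? "yes" : "no", writing_enabled ? "set" : "unset");
}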
@@ -15,8 +15,6 @@

#include <flags/replication.hpp>

#include <optional>

#include <libnuraft/nuraft.hxx>

namespace memgraph::coordination {

@@ -65,6 +65,8 @@ class ReplicationInstance {
  // TODO: (andi) Inconsistent API
  auto GetClient() -> CoordinatorClient &;

  auto EnableWritingOnMain() -> bool;

  auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
  auto ResetMainUUID() -> void;
  auto GetMainUUID() const -> const std::optional<utils::UUID> &;

@@ -109,5 +109,7 @@ auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_n
  return client_.SendUnregisterReplicaRpc(instance_name);
}

auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); }

}  // namespace memgraph::coordination
#endif
@@ -93,6 +93,7 @@
#include "utils/exceptions.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"
#include "utils/functional.hpp"
#include "utils/likely.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"

@@ -1264,17 +1265,13 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
  callback.fn = [handler = CoordQueryHandler{*coordinator_state},
                 replica_nfields = callback.header.size()]() mutable {
    auto const instances = handler.ShowInstances();
    std::vector<std::vector<TypedValue>> result{};
    result.reserve(result.size());
    auto const converter = [](const auto &status) -> std::vector<TypedValue> {
      return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address},
              TypedValue{status.coord_socket_address}, TypedValue{status.is_alive},
              TypedValue{status.cluster_role}};
    };

    std::ranges::transform(instances, std::back_inserter(result),
                           [](const auto &status) -> std::vector<TypedValue> {
                             return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address},
                                     TypedValue{status.coord_socket_address}, TypedValue{status.is_alive},
                                     TypedValue{status.cluster_role}};
                           });

    return result;
    return utils::fmap(converter, instances);
  };
  return callback;
}

@@ -4404,9 +4401,19 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,

  UpdateTypeCount(rw_type);

  if (interpreter_context_->repl_state->IsReplica() && IsQueryWrite(rw_type)) {
    query_execution = nullptr;
    throw QueryException("Write query forbidden on the replica!");
  bool const write_query = IsQueryWrite(rw_type);
  if (write_query) {
    if (interpreter_context_->repl_state->IsReplica()) {
      query_execution = nullptr;
      throw QueryException("Write query forbidden on the replica!");
    }
#ifdef MG_ENTERPRISE
    if (FLAGS_coordinator_server_port && !interpreter_context_->repl_state->IsMainWriteable()) {
      query_execution = nullptr;
      throw QueryException(
          "Write query forbidden on the main! Coordinator needs to enable writing on main by sending RPC message.");
    }
#endif
  }

  // Set the target db to the current db (some queries have different target from the current db)
@@ -39,7 +39,8 @@ enum class RegisterReplicaError : uint8_t { NAME_EXISTS, ENDPOINT_EXISTS, COULD_

struct RoleMainData {
  RoleMainData() = default;
  explicit RoleMainData(ReplicationEpoch e, std::optional<utils::UUID> uuid = std::nullopt) : epoch_(std::move(e)) {
  explicit RoleMainData(ReplicationEpoch e, bool writing_enabled, std::optional<utils::UUID> uuid = std::nullopt)
      : epoch_(std::move(e)), writing_enabled_(writing_enabled) {
    if (uuid) {
      uuid_ = *uuid;
    }

@@ -54,6 +55,7 @@ struct RoleMainData {
  ReplicationEpoch epoch_;
  std::list<ReplicationClient> registered_replicas_{};  // TODO: data race issues
  utils::UUID uuid_;
  bool writing_enabled_{false};
};

struct RoleReplicaData {

@@ -90,6 +92,21 @@ struct ReplicationState {
  bool IsMain() const { return GetRole() == replication_coordination_glue::ReplicationRole::MAIN; }
  bool IsReplica() const { return GetRole() == replication_coordination_glue::ReplicationRole::REPLICA; }

  auto IsMainWriteable() const -> bool {
    if (auto const *main = std::get_if<RoleMainData>(&replication_data_)) {
      return main->writing_enabled_;
    }
    return false;
  }

  auto EnableWritingOnMain() -> bool {
    if (auto *main = std::get_if<RoleMainData>(&replication_data_)) {
      main->writing_enabled_ = true;
      return true;
    }
    return false;
  }

  bool HasDurability() const { return nullptr != durability_; }

  bool TryPersistRoleMain(std::string new_epoch, utils::UUID main_uuid);
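A compilable, stripped-down restatement of the two accessors added above, with a short usage check. Only the role variant and the writing flag are modelled; everything else in ReplicationState is omitted, and the sketch namespace is purely illustrative.

#include <cassert>
#include <variant>

namespace sketch {

struct RoleMainData {
  bool writing_enabled_{false};
};
struct RoleReplicaData {};

struct ReplicationState {
  std::variant<RoleMainData, RoleReplicaData> replication_data_;

  // Same shape as the accessors in the hunk above: a replica is never writeable and a
  // restarted main stays unwriteable until the flag is flipped.
  auto IsMainWriteable() const -> bool {
    auto const *main = std::get_if<RoleMainData>(&replication_data_);
    return main != nullptr && main->writing_enabled_;
  }

  auto EnableWritingOnMain() -> bool {
    if (auto *main = std::get_if<RoleMainData>(&replication_data_)) {
      main->writing_enabled_ = true;
      return true;
    }
    return false;
  }
};

}  // namespace sketch

int main() {
  sketch::ReplicationState state{sketch::RoleMainData{}};  // MAIN role, writes still disabled
  assert(!state.IsMainWriteable());
  assert(state.EnableWritingOnMain());
  assert(state.IsMainWriteable());

  state.replication_data_ = sketch::RoleReplicaData{};     // as a replica the request is refused
  assert(!state.EnableWritingOnMain());
}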
@@ -144,8 +144,8 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t {
  return std::visit(
      utils::Overloaded{
          [&](durability::MainRole &&r) -> FetchReplicationResult_t {
            auto res =
                RoleMainData{std::move(r.epoch), r.main_uuid.has_value() ? r.main_uuid.value() : utils::UUID{}};
            auto res = RoleMainData{std::move(r.epoch), false,
                                    r.main_uuid.has_value() ? r.main_uuid.value() : utils::UUID{}};
            auto b = durability_->begin(durability::kReplicationReplicaPrefix);
            auto e = durability_->end(durability::kReplicationReplicaPrefix);
            for (; b != e; ++b) {

@@ -253,7 +253,7 @@ bool ReplicationState::SetReplicationRoleMain(const utils::UUID &main_uuid) {
    return false;
  }

  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, main_uuid};
  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, true, main_uuid};

  return true;
}
src/utils/functional.hpp (new file, 27 lines)
@@ -0,0 +1,27 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#pragma once

#include <algorithm>
#include <vector>

namespace memgraph::utils {

template <class F, class T, class R = typename std::result_of<F(T)>::type, class V = std::vector<R>>
V fmap(F &&f, const std::vector<T> &v) {
  V r;
  r.reserve(v.size());
  std::ranges::transform(v, std::back_inserter(r), std::forward<F>(f));
  return r;
}

}  // namespace memgraph::utils
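A usage sketch for the new helper. The example itself is not part of the commit; it only assumes the header above is on the include path, as the #include "utils/functional.hpp" lines elsewhere in this diff suggest. The port values are illustrative data.

#include <cassert>
#include <string>
#include <vector>

#include "utils/functional.hpp"

int main() {
  std::vector<int> const ports{7687, 7688, 7689};

  // fmap maps a vector<T> to vector<R> through the callable, reserving the output up front;
  // ShowInstances() and HandleCoordinatorQuery() in this commit use it the same way.
  auto const labels = memgraph::utils::fmap(
      [](int port) { return "localhost:" + std::to_string(port); }, ports);

  assert(labels.size() == ports.size());
  assert(labels.front() == "localhost:7687");
}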
@@ -109,6 +109,8 @@ enum class TypeId : uint64_t {
  COORD_SWAP_UUID_RES,
  COORD_UNREGISTER_REPLICA_REQ,
  COORD_UNREGISTER_REPLICA_RES,
  COORD_ENABLE_WRITING_ON_MAIN_REQ,
  COORD_ENABLE_WRITING_ON_MAIN_RES,

  // AST
  AST_LABELIX = 3000,
@@ -6,6 +6,7 @@ copy_e2e_python_files(ha_experimental coord_cluster_registration.py)
copy_e2e_python_files(ha_experimental distributed_coords.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py)
copy_e2e_python_files(ha_experimental disable_writing_on_main_after_restart.py)
copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental workloads.yaml)
@@ -30,3 +30,14 @@ def safe_execute(function, *args):
        function(*args)
    except:
        pass


# NOTE: Repeated execution because it can fail if Raft server is not up
def add_coordinator(cursor, query):
    for _ in range(10):
        try:
            execute_and_fetch_all(cursor, query)
            return True
        except Exception:
            pass
    return False
@@ -16,7 +16,7 @@ import tempfile

import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from common import add_coordinator, connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert

interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

@@ -104,17 +104,6 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}


# NOTE: Repeated execution because it can fail if Raft server is not up
def add_coordinator(cursor, query):
    for _ in range(10):
        try:
            execute_and_fetch_all(cursor, query)
            return True
        except Exception:
            pass
    return False


def test_register_repl_instances_then_coordinators():
    safe_execute(shutil.rmtree, TEMP_DIR)
    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
@@ -0,0 +1,181 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import os
import shutil
import sys
import tempfile

import interactive_mg_runner
import pytest
from common import add_coordinator, connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert

interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
    os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))

TEMP_DIR = tempfile.TemporaryDirectory().name

MEMGRAPH_INSTANCES_DESCRIPTION = {
    "instance_1": {
        "args": [
            "--bolt-port",
            "7687",
            "--log-level",
            "TRACE",
            "--coordinator-server-port",
            "10011",
            "--also-log-to-stderr",
            "--instance-health-check-frequency-sec",
            "1",
            "--instance-down-timeout-sec",
            "5",
        ],
        "log_file": "instance_1.log",
        "data_directory": f"{TEMP_DIR}/instance_1",
        "setup_queries": [],
    },
    "instance_2": {
        "args": [
            "--bolt-port",
            "7688",
            "--log-level",
            "TRACE",
            "--coordinator-server-port",
            "10012",
            "--also-log-to-stderr",
            "--instance-health-check-frequency-sec",
            "1",
            "--instance-down-timeout-sec",
            "5",
        ],
        "log_file": "instance_2.log",
        "data_directory": f"{TEMP_DIR}/instance_2",
        "setup_queries": [],
    },
    "instance_3": {
        "args": [
            "--bolt-port",
            "7689",
            "--log-level",
            "TRACE",
            "--coordinator-server-port",
            "10013",
            "--also-log-to-stderr",
            "--instance-health-check-frequency-sec",
            "5",
            "--instance-down-timeout-sec",
            "10",
        ],
        "log_file": "instance_3.log",
        "data_directory": f"{TEMP_DIR}/instance_3",
        "setup_queries": [],
    },
    "coordinator_1": {
        "args": [
            "--bolt-port",
            "7690",
            "--log-level=TRACE",
            "--raft-server-id=1",
            "--raft-server-port=10111",
        ],
        "log_file": "coordinator1.log",
        "setup_queries": [],
    },
    "coordinator_2": {
        "args": [
            "--bolt-port",
            "7691",
            "--log-level=TRACE",
            "--raft-server-id=2",
            "--raft-server-port=10112",
        ],
        "log_file": "coordinator2.log",
        "setup_queries": [],
    },
    "coordinator_3": {
        "args": [
            "--bolt-port",
            "7692",
            "--log-level=TRACE",
            "--raft-server-id=3",
            "--raft-server-port=10113",
            "--also-log-to-stderr",
        ],
        "log_file": "coordinator3.log",
        "setup_queries": [],
    },
}


def test_writing_disabled_on_main_restart():
    safe_execute(shutil.rmtree, TEMP_DIR)
    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

    coordinator3_cursor = connect(host="localhost", port=7692).cursor()

    execute_and_fetch_all(
        coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
    )
    execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")

    def check_coordinator3():
        return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))

    expected_cluster_coord3 = [
        ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
        ("instance_3", "", "127.0.0.1:10013", True, "main"),
    ]
    mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)

    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")

    expected_cluster_coord3 = [
        ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
        ("instance_3", "", "127.0.0.1:10013", False, "unknown"),
    ]

    mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)

    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")

    try:
        instance3_cursor = connect(host="localhost", port=7689).cursor()
        execute_and_fetch_all(instance3_cursor, "CREATE (n:Node {name: 'node'})")
    except Exception as e:
        assert (
            str(e)
            == "Write query forbidden on the main! Coordinator needs to enable writing on main by sending RPC message."
        )

    expected_cluster_coord3 = [
        ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
        ("instance_3", "", "127.0.0.1:10013", True, "main"),
    ]

    mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
    execute_and_fetch_all(instance3_cursor, "CREATE (n:Node {name: 'node'})")


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
@@ -44,6 +44,10 @@ workloads:
    binary: "tests/e2e/pytest_runner.sh"
    args: ["high_availability_experimental/not_replicate_from_old_main.py"]

  - name: "Disable writing on main after restart"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["high_availability_experimental/disable_writing_on_main_after_restart.py"]

  - name: "Distributed coordinators"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["high_availability_experimental/distributed_coords.py"]