Compare commits

...

7 Commits

Author SHA1 Message Date
antoniofilipovic
da8db8d064 add RPC to get data 2024-02-14 13:12:03 +01:00
antoniofilipovic
867c192363 add test for get uuid, fix replica recovery bug with coordinator 2024-02-13 12:59:07 +01:00
antoniofilipovic
88e429889e add test for get uuid 2024-02-13 11:22:41 +01:00
antoniofilipovic
6b19923214 add get uuid RPC 2024-02-13 11:22:41 +01:00
antoniofilipovic
ca20b78479 split between try register and register replica 2024-02-13 11:22:41 +01:00
antoniofilipovic
f07fd21a9e add working test for demote to replica, not replicating in new cluster 2024-02-13 11:22:41 +01:00
antoniofilipovic
0739b5f8e6 make demote replica to main idempotent 2024-02-13 11:22:36 +01:00
25 changed files with 718 additions and 81 deletions

View File

@ -16,6 +16,7 @@ target_sources(mg-coordination
include/coordination/instance_status.hpp
include/coordination/replication_instance.hpp
include/coordination/raft_state.hpp
include/coordination/rpc_errors.hpp
include/nuraft/coordinator_log_store.hpp
include/nuraft/coordinator_state_machine.hpp

View File

@ -16,7 +16,9 @@
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
@ -121,5 +123,38 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo
return false;
}
auto CoordinatorClient::SendGetInstanceUUID() const
-> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
std::optional<utils::UUID> instance_uuid;
try {
auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
auto res = stream.AwaitResponse();
if (!res.uuid) {
return instance_uuid;
}
instance_uuid = res.uuid;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
return instance_uuid;
}
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError,
std::vector<replication_coordination_glue::ReplicationTimestampResult>> {
try {
auto stream{rpc_client_.Stream<coordination::GetInstanceTimestampsRpc>()};
auto res = stream.AwaitResponse();
return res.replica_timestamps;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
}
} // namespace memgraph::coordination
#endif

View File

@ -39,6 +39,26 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received SwapMainUUIDRPC on coordinator server");
CoordinatorHandlers::SwapMainUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::GetInstanceUUIDRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::GetInstanceTimestampsRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetInstanceTimestampsRpc on coordinator server");
CoordinatorHandlers::GetInstanceTimestampsHandler(replication_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder) {
coordination::GetInstanceTimestampsReq req;
slk::Load(&req, req_reader);
slk::Save(coordination::GetInstanceTimestampsRes{replication_handler.GetTimestampsForEachDb()}, res_builder);
}
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
@ -62,12 +82,6 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan
slk::Reader *req_reader, slk::Builder *res_builder) {
spdlog::info("Executing DemoteMainToReplicaHandler");
if (!replication_handler.IsMain()) {
spdlog::error("Setting to replica must be performed on main.");
slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
return;
}
coordination::DemoteMainToReplicaReq req;
slk::Load(&req, req_reader);
@ -77,11 +91,21 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan
if (!replication_handler.SetReplicationRoleReplica(clients_config, std::nullopt)) {
spdlog::error("Demoting main to replica failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
return;
}
slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
slk::Save(coordination::DemoteMainToReplicaRes{true}, res_builder);
}
void CoordinatorHandlers::GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder) {
spdlog::info("Executing GetInstanceUUIDHandler");
coordination::GetInstanceUUIDReq req;
slk::Load(&req, req_reader);
slk::Save(coordination::GetInstanceUUIDRes{replication_handler.GetReplicaUUID()}, res_builder);
}
void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHandler &replication_handler,

View File

@ -47,6 +47,10 @@ CoordinatorInstance::CoordinatorInstance()
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
spdlog::error(
fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()));
@ -61,14 +65,6 @@ CoordinatorInstance::CoordinatorInstance()
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
// We need to restart main uuid from instance since it was "down" at least a second
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
// our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
// incoming RPCs from valid main
// TODO(antoniofilipovic) this needs here more complex logic
// We need to get id of main replica is listening to on successful ping
// and swap it to correct uuid if it failed
repl_instance.ResetMainUUID();
};
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
@ -174,6 +170,20 @@ auto CoordinatorInstance::TryFailover() -> void {
}
// TODO: Smarter choice
{
// for each DB we get one ReplicationTimestampResult, that is why we have vector here
using ReplicaTimestampsRes = std::vector<replication_coordination_glue::ReplicationTimestampResult>;
std::unordered_map<std::string, ReplicaTimestampsRes> instance_replica_timestamps_res;
std::for_each(alive_replicas.begin(), alive_replicas.end(),
[&instance_replica_timestamps_res](ReplicationInstance &replica) {
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
if (res.HasError()) {
return;
}
instance_replica_timestamps_res.emplace(replica.InstanceName(), std::move(res.GetValue()));
});
}
auto new_main = ranges::begin(alive_replicas);
new_main->PauseFrequentCheck();

View File

@ -52,6 +52,40 @@ void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::R
memgraph::slk::Load(self, reader);
}
// GetInstanceUUID
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceUUIDReq::Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void GetInstanceUUIDRes::Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
// GetInstanceUUID
void GetInstanceTimestampsReq::Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceTimestampsReq::Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void GetInstanceTimestampsRes::Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceTimestampsRes::Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -66,8 +100,22 @@ constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::Typ
constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordDemoteToReplicaRes", nullptr};
constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId::COORD_GET_UUID_REQ, "CoordGetUUIDReq",
nullptr};
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
nullptr};
constexpr utils::TypeInfo coordination::GetInstanceTimestampsReq::kType{
utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_REQ, "GetInstanceTimestampsReq", nullptr};
constexpr utils::TypeInfo coordination::GetInstanceTimestampsRes::kType{
utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_RES, "GetInstanceTimestampsRes", nullptr};
namespace slk {
// PromoteReplicaToMainRpc
void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
}
@ -86,6 +134,7 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::
memgraph::slk::Load(&self->replication_clients_info, reader);
}
// DemoteMainToReplicaRpc
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replication_client_info, builder);
}
@ -102,6 +151,42 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
memgraph::slk::Load(&self->success, reader);
}
// GetInstanceUUIDRpc
void Save(const memgraph::coordination::GetInstanceUUIDReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
/* nothing to serialize*/
}
void Load(memgraph::coordination::GetInstanceUUIDReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
/* nothing to serialize*/
}
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.uuid, builder);
}
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->uuid, reader);
}
// GetInstanceTimestampsReq
void Save(const memgraph::coordination::GetInstanceTimestampsReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
/* nothing to serialize*/
}
void Load(memgraph::coordination::GetInstanceTimestampsReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
/* nothing to serialize*/
}
void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replica_timestamps, builder);
}
void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->replica_timestamps, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -15,7 +15,10 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
#include "utils/scheduler.hpp"
namespace memgraph::coordination {
@ -51,10 +54,16 @@ class CoordinatorClient {
auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
auto SendGetInstanceUUID() const -> memgraph::utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>>;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError,
std::vector<replication_coordination_glue::ReplicationTimestampResult>>;
auto RpcClient() -> rpc::Client & { return rpc_client_; }
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {

View File

@ -16,6 +16,7 @@ namespace memgraph::coordination {
struct CoordinatorClusterConfig {
static constexpr int alive_response_time_difference_sec_{5};
static constexpr int replica_uuid_last_check_time_difference_sec_{10};
};
} // namespace memgraph::coordination

View File

@ -33,6 +33,12 @@ class CoordinatorHandlers {
slk::Builder *res_builder);
static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -15,6 +15,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
@ -82,6 +83,62 @@ struct DemoteMainToReplicaRes {
using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>;
struct GetInstanceUUIDReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder);
GetInstanceUUIDReq() = default;
bool success;
};
struct GetInstanceUUIDRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
explicit GetInstanceUUIDRes(std::optional<utils::UUID> uuid) : uuid(uuid) {}
GetInstanceUUIDRes() = default;
std::optional<utils::UUID> uuid;
};
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
struct GetInstanceTimestampsReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder);
GetInstanceTimestampsReq() = default;
bool success;
};
struct GetInstanceTimestampsRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder);
explicit GetInstanceTimestampsRes(
const std::vector<replication_coordination_glue::ReplicationTimestampResult> &replica_timestamps)
: replica_timestamps(replica_timestamps) {}
GetInstanceTimestampsRes() = default;
std::vector<replication_coordination_glue::ReplicationTimestampResult> replica_timestamps;
};
using GetInstanceTimestampsRpc = rpc::RequestResponse<GetInstanceTimestampsReq, GetInstanceTimestampsRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -99,6 +156,17 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
// GetInstanceUUIDRpc
void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
// GetInstanceTimestampsRpc
void Save(const memgraph::coordination::GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
@ -34,5 +35,20 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
Load(&obj->replication_ip_address, reader);
Load(&obj->replication_port, reader);
}
inline void Save(const replication_coordination_glue::ReplicationTimestampResult &obj, Builder *builder) {
Save(obj.db_uuid, builder);
Save(obj.history, builder);
Save(obj.last_commit_timestamp, builder);
Save(obj.epoch_id, builder);
}
inline void Load(replication_coordination_glue::ReplicationTimestampResult *obj, Reader *reader) {
Load(&obj->db_uuid, reader);
Load(&obj->history, reader);
Load(&obj->last_commit_timestamp, reader);
Load(&obj->epoch_id, reader);
}
} // namespace memgraph::slk
#endif

View File

@ -19,6 +19,7 @@
#include "replication_coordination_glue/role.hpp"
#include <libnuraft/nuraft.hxx>
#include "utils/result.hpp"
#include "utils/uuid.hpp"
namespace memgraph::coordination {
@ -38,6 +39,9 @@ class ReplicationInstance {
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto IsReadyForUUIDPing() -> bool;
void UpdateReplicaLastResponseUUID();
auto IsAlive() const -> bool;
@ -60,6 +64,7 @@ class ReplicationInstance {
auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool;
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
auto GetClient() -> CoordinatorClient &;
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
@ -71,6 +76,7 @@ class ReplicationInstance {
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};
bool is_alive_{false};
std::chrono::system_clock::time_point last_check_of_uuid_{};
// for replica this is main uuid of current main
// for "main" main this same as in CoordinatorData

View File

@ -0,0 +1,15 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
namespace memgraph::coordination {
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
} // namespace memgraph::coordination

View File

@ -14,6 +14,7 @@
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
@ -40,6 +41,11 @@ auto ReplicationInstance::OnFailPing() -> bool {
return is_alive_;
}
auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_)
.count() > CoordinatorClusterConfig::replica_uuid_last_check_time_difference_sec_;
}
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
@ -91,10 +97,20 @@ auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
if (!main_uuid_ || *main_uuid_ != curr_main_uuid) {
return SendSwapAndUpdateUUID(curr_main_uuid);
if (!IsReadyForUUIDPing()) {
return true;
}
return true;
auto res = SendGetInstanceUUID();
if (res.HasError()) {
return false;
}
UpdateReplicaLastResponseUUID();
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
return true;
}
return SendSwapAndUpdateUUID(curr_main_uuid);
}
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool {
@ -105,5 +121,12 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid
return true;
}
auto ReplicationInstance::SendGetInstanceUUID()
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
return client_.SendGetInstanceUUID();
}
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
} // namespace memgraph::coordination
#endif

View File

@ -268,10 +268,6 @@ class DbmsHandler {
bool IsMain() const { return repl_state_.IsMain(); }
bool IsReplica() const { return repl_state_.IsReplica(); }
#ifdef MG_ENTERPRISE
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
#endif
/**
* @brief Return the statistics all databases.
*

View File

@ -327,7 +327,7 @@ class ReplQueryHandler {
.port = static_cast<uint16_t>(*port),
};
if (!handler_->SetReplicationRoleReplica(config, std::nullopt)) {
if (!handler_->TrySetReplicationRoleReplica(config, std::nullopt)) {
throw QueryRuntimeException("Couldn't set role to replica!");
}
}

View File

@ -49,6 +49,9 @@ struct ReplicationQueryHandler {
virtual bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) = 0;
virtual bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) = 0;
// as MAIN, define and connect to REPLICAs
virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
-> utils::BasicResult<RegisterReplicaError> = 0;

View File

@ -62,8 +62,10 @@ ReplicationState::ReplicationState(std::optional<std::filesystem::path> durabili
}
#endif
if (std::holds_alternative<RoleReplicaData>(replication_data)) {
spdlog::trace("Recovered main's uuid for replica {}",
std::string(std::get<RoleReplicaData>(replication_data).uuid_.value()));
std::string uuid = std::get<RoleReplicaData>(replication_data).uuid_.has_value()
? std::string(std::get<RoleReplicaData>(replication_data).uuid_.value())
: "";
spdlog::trace("Recovered main's uuid for replica {}", uuid);
} else {
spdlog::trace("Recovered uuid for main {}", std::string(std::get<RoleMainData>(replication_data).uuid_));
}

View File

@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
mode.hpp
role.hpp
handler.hpp
common.hpp
PRIVATE
messages.cpp

View File

@ -0,0 +1,31 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "rpc/client.hpp"
#include "utils/uuid.hpp"
#include <deque>
#include "messages.hpp"
#include "rpc/messages.hpp"
#include "utils/uuid.hpp"
namespace memgraph::replication_coordination_glue {
struct ReplicationTimestampResult {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
uint64_t last_commit_timestamp;
std::string epoch_id;
};
} // namespace memgraph::replication_coordination_glue

View File

@ -13,6 +13,8 @@
#include "auth/auth.hpp"
#include "dbms/dbms_handler.hpp"
#include "replication/include/replication/state.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_handler/system_replication.hpp"
#include "replication_handler/system_rpc.hpp"
#include "utils/result.hpp"
@ -108,10 +110,65 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
// as REPLICA, become MAIN
bool SetReplicationRoleMain() override;
// as MAIN, become REPLICA
// as MAIN, become REPLICA, can be called on MAIN and REPLICA
bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) override;
// as MAIN, become REPLICA, can be called only on MAIN
bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) override;
// as MAIN, become REPLICA
template <bool HandleIdempotency>
bool SetReplicationRoleReplica_(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) {
if (!HandleIdempotency && repl_state_.IsReplica()) {
return false;
}
if (HandleIdempotency && repl_state_.IsReplica()) {
// We don't want to restart the server if we're already a REPLICA with correct config
auto &replica_data = std::get<memgraph::replication::RoleReplicaData>(repl_state_.ReplicationData());
if (replica_data.config == config) {
return true;
}
repl_state_.SetReplicationRoleReplica(config, main_uuid);
#ifdef MG_ENTERPRISE
return StartRpcServer(dbms_handler_, replica_data, auth_);
#else
return StartRpcServer(dbms_handler_, replica_data);
#endif
}
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
// deleting the replica.
// Remove database specific clients
dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
});
// Remove instance level clients
std::get<memgraph::replication::RoleMainData>(repl_state_.ReplicationData()).registered_replicas_.clear();
// Creates the server
repl_state_.SetReplicationRoleReplica(config, main_uuid);
// Start
const auto success = std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
// ASSERT
return false;
},
[this](memgraph::replication::RoleReplicaData &data) {
#ifdef MG_ENTERPRISE
return StartRpcServer(dbms_handler_, data, auth_);
#else
return StartRpcServer(dbms_handler_, data);
#endif
}},
repl_state_.ReplicationData());
// TODO Handle error (restore to main?)
return success;
}
// as MAIN, define and connect to REPLICAs
auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> override;
@ -132,6 +189,10 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplState() const -> const memgraph::replication::ReplicationState &;
auto GetReplState() -> memgraph::replication::ReplicationState &;
auto GetReplicaUUID() -> std::optional<utils::UUID>;
auto GetTimestampsForEachDb() -> std::vector<replication_coordination_glue::ReplicationTimestampResult>;
private:
template <bool HandleFailure>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)

View File

@ -196,40 +196,12 @@ bool ReplicationHandler::SetReplicationRoleMain() {
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) {
// We don't want to restart the server if we're already a REPLICA
if (repl_state_.IsReplica()) {
spdlog::trace("Instance has already has replica role.");
return false;
}
return SetReplicationRoleReplica_<true>(config, main_uuid);
}
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
// deleting the replica.
// Remove database specific clients
dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
});
// Remove instance level clients
std::get<memgraph::replication::RoleMainData>(repl_state_.ReplicationData()).registered_replicas_.clear();
// Creates the server
repl_state_.SetReplicationRoleReplica(config, main_uuid);
// Start
const auto success = std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) {
// ASSERT
return false;
},
[this](memgraph::replication::RoleReplicaData &data) {
#ifdef MG_ENTERPRISE
return StartRpcServer(dbms_handler_, data, auth_);
#else
return StartRpcServer(dbms_handler_, data);
#endif
}},
repl_state_.ReplicationData());
// TODO Handle error (restore to main?)
return success;
bool ReplicationHandler::TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config,
const std::optional<utils::UUID> &main_uuid) {
return SetReplicationRoleReplica_<false>(config, main_uuid);
}
bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) {
@ -300,6 +272,33 @@ auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_g
return repl_state_.GetRole();
}
auto ReplicationHandler::GetTimestampsForEachDb()
-> std::vector<replication_coordination_glue::ReplicationTimestampResult> {
std::vector<replication_coordination_glue::ReplicationTimestampResult> results;
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
auto &repl_storage_state = storage->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history;
history.reserve(repl_storage_state.history.size());
std::transform(repl_storage_state.history.begin(), repl_storage_state.history.end(), std::back_inserter(history),
[](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); });
replication_coordination_glue::ReplicationTimestampResult repl{
.db_uuid = db_acc->storage()->uuid(),
.history = history,
.last_commit_timestamp = repl_storage_state.last_commit_timestamp_.load(),
.epoch_id = std::string(repl_storage_state.epoch_.id())};
results.emplace_back(repl);
});
return results;
}
auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
MG_ASSERT(repl_state_.IsReplica());
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
}
auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; }
auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; }

View File

@ -108,6 +108,11 @@ enum class TypeId : uint64_t {
COORD_SWAP_UUID_REQ,
COORD_SWAP_UUID_RES,
COORD_GET_UUID_REQ,
COORD_GET_UUID_RES,
COORD_GET_INSTANCE_TIMESTAMPS_REQ,
COORD_GET_INSTANCE_TIMESTAMPS_RES,
// AST
AST_LABELIX = 3000,
AST_PROPERTYIX,

View File

@ -10,11 +10,13 @@
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import execute_and_fetch_all
from common import execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -38,7 +40,7 @@ MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = {
}
MEMGRAPH_INSTANCES_DESCRIPTION = {
MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = {
"replica": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE"],
"log_file": "replica.log",
@ -71,7 +73,7 @@ def test_replication_works_on_failover(connection):
assert actual_data_on_main == expected_data_on_main
# 3
interactive_mg_runner.start_all_keep_others(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION)
# 4
new_main_cursor = connection(7690, "main_2").cursor()
@ -113,5 +115,144 @@ def test_replication_works_on_failover(connection):
interactive_mg_runner.stop_all()
def test_not_replicate_old_main_register_new_cluster(connection):
# Goal of this test is to check that although replica is registered in one cluster
# it can be re-registered to new cluster
# This flow checks if Registering replica is idempotent and that old main cannot talk to replica
# 1. We start all replicas and main in one cluster
# 2. Main from first cluster can see all replicas
# 3. We start all replicas and main in second cluster, by reusing one replica from first cluster
# 4. New main should see replica. Registration should pass (idempotent registration)
# 5. Old main should not talk to new replica
# 6. New main should talk to replica
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION = {
"shared_instance": {
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/shared_instance",
"setup_queries": [],
},
"instance_2": {
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"coordinator_1": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"SET INSTANCE instance_2 TO MAIN",
],
},
}
# 1
interactive_mg_runner.start_all_keep_others(MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION)
# 2
first_cluster_coord_cursor = connection(7690, "coord_1").cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;")))
expected_data_up_first_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("shared_instance", "", "127.0.0.1:10011", True, "replica"),
]
mg_sleep_and_assert(expected_data_up_first_cluster, show_repl_cluster)
# 3
MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION = {
"instance_3": {
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_2": {
"args": ["--bolt-port", "7691", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10112"],
"log_file": "coordinator.log",
"setup_queries": [],
},
}
interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION)
second_cluster_coord_cursor = connection(7691, "coord_2").cursor()
execute_and_fetch_all(
second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';"
)
execute_and_fetch_all(
second_cluster_coord_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';"
)
execute_and_fetch_all(second_cluster_coord_cursor, "SET INSTANCE instance_3 TO MAIN")
# 4
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(second_cluster_coord_cursor, "SHOW INSTANCES;")))
expected_data_up_second_cluster = [
("coordinator_1", "127.0.0.1:10112", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("shared_instance", "", "127.0.0.1:10011", True, "replica"),
]
mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster)
# 5
main_1_cursor = connection(7689, "main_1").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_1_cursor, "CREATE ();")
assert (
str(e.value)
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
)
shared_replica_cursor = connection(7688, "shared_replica").cursor()
res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
assert res == 0, "Old main should not replicate to 'shared' replica"
# 6
main_2_cursor = connection(7687, "main_2").cursor()
execute_and_fetch_all(main_2_cursor, "CREATE ();")
shared_replica_cursor = connection(7688, "shared_replica").cursor()
res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0]
assert res == 1, "New main should replicate to 'shared' replica"
interactive_mg_runner.stop_all()
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -147,6 +147,105 @@ def test_replication_works_on_failover():
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_replication_works_on_replica_instance_restart():
# Goal of this test is to check the replication works after replica goes down and restarts
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that main has correct state
# 3. We kill replica
# 4. We check that main cannot replicate to replica
# 5. We bring replica back up
# 6. We check that replica gets data
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
# 3
coord_cursor = connect(host="localhost", port=7690).cursor()
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE ();")
assert (
str(e.value)
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
)
res_instance_1 = execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
assert res_instance_1 == 1
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 5.
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 2, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 6.
instance_2_cursor = connect(port=7689, host="localhost").cursor()
execute_and_fetch_all(main_cursor, "CREATE ();")
res_instance_2 = execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
assert res_instance_2 == 2
def test_show_instances():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

View File

@ -142,7 +142,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
MinMemgraph replica(repl_conf);
auto replica_store_handler = replica.repl_handler;
replica_store_handler.SetReplicationRoleReplica(
replica_store_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -439,13 +439,13 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
MinMemgraph replica1(repl_conf);
MinMemgraph replica2(repl2_conf);
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
},
std::nullopt);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
@ -597,7 +597,7 @@ TEST_F(ReplicationTest, RecoveryProcess) {
MinMemgraph replica(repl_conf);
auto replica_store_handler = replica.repl_handler;
replica_store_handler.SetReplicationRoleReplica(
replica_store_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -676,7 +676,7 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
MinMemgraph replica_async(repl_conf);
auto replica_store_handler = replica_async.repl_handler;
replica_store_handler.SetReplicationRoleReplica(
replica_store_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
@ -726,7 +726,7 @@ TEST_F(ReplicationTest, EpochTest) {
MinMemgraph main(main_conf);
MinMemgraph replica1(repl_conf);
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -734,7 +734,7 @@ TEST_F(ReplicationTest, EpochTest) {
std::nullopt);
MinMemgraph replica2(repl2_conf);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = 10001,
@ -819,7 +819,7 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc->Commit().HasError());
}
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -858,7 +858,7 @@ TEST_F(ReplicationTest, ReplicationInformation) {
MinMemgraph replica1(repl_conf);
uint16_t replica1_port = 10001;
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = replica1_port,
@ -867,7 +867,7 @@ TEST_F(ReplicationTest, ReplicationInformation) {
uint16_t replica2_port = 10002;
MinMemgraph replica2(repl2_conf);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = replica2_port,
@ -923,7 +923,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
MinMemgraph replica1(repl_conf);
uint16_t replica1_port = 10001;
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = replica1_port,
@ -932,7 +932,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
uint16_t replica2_port = 10002;
MinMemgraph replica2(repl2_conf);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = replica2_port,
@ -966,7 +966,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
MinMemgraph main(main_conf);
MinMemgraph replica1(repl_conf);
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = common_port,
@ -974,7 +974,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
std::nullopt);
MinMemgraph replica2(repl2_conf);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = common_port,
@ -1023,7 +1023,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
std::optional<MinMemgraph> main(main_config);
MinMemgraph replica1(replica1_config);
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -1031,7 +1031,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
std::nullopt);
MinMemgraph replica2(replica2_config);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
@ -1088,7 +1088,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
std::optional<MinMemgraph> main(main_config);
MinMemgraph replica1(repl_conf);
replica1.repl_handler.SetReplicationRoleReplica(
replica1.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
@ -1097,7 +1097,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
MinMemgraph replica2(repl2_conf);
replica2.repl_handler.SetReplicationRoleReplica(
replica2.repl_handler.TrySetReplicationRoleReplica(
ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],